Skip to content

Commit

Permalink
feat: consumer close log
Browse files Browse the repository at this point in the history
  • Loading branch information
fearlessfei committed May 24, 2024
1 parent 328f46c commit fb83318
Showing 1 changed file with 3 additions and 0 deletions.
3 changes: 3 additions & 0 deletions kq/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ func (q *kafkaQueue) Start() {
q.producerRoutines.Wait()
close(q.channel)
q.consumerRoutines.Wait()

logx.Infof("Consumer %s is closed", q.c.Name)
}

func (q *kafkaQueue) Stop() {
Expand Down Expand Up @@ -209,6 +211,7 @@ func (q *kafkaQueue) startProducers() {
// io.ErrClosedPipe means committing messages on the consumer,
// kafka will refire the messages on uncommitted messages, ignore
if err == io.EOF || err == io.ErrClosedPipe {
logx.Infof("Consumer %s-%d is closed, error: %q", q.c.Name, i, err.Error())
return
}
if err != nil {
Expand Down

0 comments on commit fb83318

Please sign in to comment.