Skip to content

Commit

Permalink
feat: consumer close log
Browse files Browse the repository at this point in the history
  • Loading branch information
fearlessfei committed May 24, 2024
1 parent 328f46c commit a5b5e2b
Showing 1 changed file with 4 additions and 0 deletions.
4 changes: 4 additions & 0 deletions kq/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ func (q *kafkaQueue) Start() {
q.producerRoutines.Wait()
close(q.channel)
q.consumerRoutines.Wait()

logx.Infof("Consumer %s is closed", q.c.Name)
}

func (q *kafkaQueue) Stop() {
Expand Down Expand Up @@ -202,13 +204,15 @@ func (q *kafkaQueue) startConsumers() {

func (q *kafkaQueue) startProducers() {
for i := 0; i < q.c.Consumers; i++ {
i := i
q.producerRoutines.Run(func() {
for {
msg, err := q.consumer.FetchMessage(context.Background())
// io.EOF means consumer closed
// io.ErrClosedPipe means committing messages on the consumer,
// kafka will refire the messages on uncommitted messages, ignore
if err == io.EOF || err == io.ErrClosedPipe {
logx.Infof("Consumer %s-%d is closed, error: %q", q.c.Name, i, err.Error())
return
}
if err != nil {
Expand Down

0 comments on commit a5b5e2b

Please sign in to comment.