From a5b5e2b0315c27a101df7f3d1529515f393b60a5 Mon Sep 17 00:00:00 2001 From: fearlessfei <573088370@qq.com> Date: Fri, 24 May 2024 17:29:15 +0800 Subject: [PATCH] feat: consumer close log --- kq/queue.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/kq/queue.go b/kq/queue.go index b6a6b6b..afb6899 100644 --- a/kq/queue.go +++ b/kq/queue.go @@ -162,6 +162,8 @@ func (q *kafkaQueue) Start() { q.producerRoutines.Wait() close(q.channel) q.consumerRoutines.Wait() + + logx.Infof("Consumer %s is closed", q.c.Name) } func (q *kafkaQueue) Stop() { @@ -202,6 +204,7 @@ func (q *kafkaQueue) startConsumers() { func (q *kafkaQueue) startProducers() { for i := 0; i < q.c.Consumers; i++ { + i := i q.producerRoutines.Run(func() { for { msg, err := q.consumer.FetchMessage(context.Background()) @@ -209,6 +212,7 @@ func (q *kafkaQueue) startProducers() { // io.ErrClosedPipe means committing messages on the consumer, // kafka will refire the messages on uncommitted messages, ignore if err == io.EOF || err == io.ErrClosedPipe { + logx.Infof("Consumer %s-%d is closed, error: %q", q.c.Name, i, err.Error()) return } if err != nil {