From a5b5e2b0315c27a101df7f3d1529515f393b60a5 Mon Sep 17 00:00:00 2001 From: fearlessfei <573088370@qq.com> Date: Fri, 24 May 2024 17:29:15 +0800 Subject: [PATCH 1/3] feat: consumer close log --- kq/queue.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/kq/queue.go b/kq/queue.go index b6a6b6b..afb6899 100644 --- a/kq/queue.go +++ b/kq/queue.go @@ -162,6 +162,8 @@ func (q *kafkaQueue) Start() { q.producerRoutines.Wait() close(q.channel) q.consumerRoutines.Wait() + + logx.Infof("Consumer %s is closed", q.c.Name) } func (q *kafkaQueue) Stop() { @@ -202,6 +204,7 @@ func (q *kafkaQueue) startConsumers() { func (q *kafkaQueue) startProducers() { for i := 0; i < q.c.Consumers; i++ { + i := i q.producerRoutines.Run(func() { for { msg, err := q.consumer.FetchMessage(context.Background()) @@ -209,6 +212,7 @@ func (q *kafkaQueue) startProducers() { // io.ErrClosedPipe means committing messages on the consumer, // kafka will refire the messages on uncommitted messages, ignore if err == io.EOF || err == io.ErrClosedPipe { + logx.Infof("Consumer %s-%d is closed, error: %q", q.c.Name, i, err.Error()) return } if err != nil { From 9cdcaf59edd6697f198fca007fb4cad5ab1f7a91 Mon Sep 17 00:00:00 2001 From: Kevin Wan Date: Sat, 20 Jul 2024 23:06:00 +0800 Subject: [PATCH 2/3] Update queue.go --- kq/queue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kq/queue.go b/kq/queue.go index 4f1508f..5704425 100644 --- a/kq/queue.go +++ b/kq/queue.go @@ -192,7 +192,7 @@ func (q *kafkaQueue) Start() { close(q.channel) q.consumerRoutines.Wait() - logx.Infof("Consumer %s is closed", q.c.Name) + logx.Infof("Consumer %s is closed", q.c.Name) } } From 3b897539802ba2a36b1cbe6834ed6b861e9f3f14 Mon Sep 17 00:00:00 2001 From: Kevin Wan Date: Sat, 20 Jul 2024 23:16:42 +0800 Subject: [PATCH 3/3] Update queue.go --- kq/queue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kq/queue.go b/kq/queue.go index 5704425..66451b9 100644 --- a/kq/queue.go +++ b/kq/queue.go @@ -246,6 +246,7 @@ func (q *kafkaQueue) startProducers() { if err := q.consume(func(msg kafka.Message) { q.channel <- msg }); err != nil { + logx.Infof("Consumer %s-%d is closed, error: %q", q.c.Name, i, err.Error()) return } }) @@ -259,7 +260,6 @@ func (q *kafkaQueue) consume(handle func(msg kafka.Message)) error { // io.ErrClosedPipe means committing messages on the consumer, // kafka will refire the messages on uncommitted messages, ignore if err == io.EOF || errors.Is(err, io.ErrClosedPipe) { - logx.Infof("Consumer %s-%d is closed, error: %q", q.c.Name, i, err.Error()) return err } if err != nil {