From ad087c14ab3356529a44fc11c70e5068b6378be4 Mon Sep 17 00:00:00 2001 From: sunjun Date: Thu, 24 Jun 2021 16:22:23 +0800 Subject: [PATCH 1/6] support graceful shutdown, stop wait all processing completed --- dq/consumernode.go | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/dq/consumernode.go b/dq/consumernode.go index 689805d..a53c067 100644 --- a/dq/consumernode.go +++ b/dq/consumernode.go @@ -1,6 +1,7 @@ package dq import ( + "sync/atomic" "time" "github.com/beanstalkd/go-beanstalk" @@ -10,9 +11,10 @@ import ( type ( consumerNode struct { - conn *connection - tube string - on *syncx.AtomicBool + conn *connection + tube string + on *syncx.AtomicBool + processingNum uint64 } consumeService struct { @@ -34,6 +36,13 @@ func (c *consumerNode) dispose() { } func (c *consumerNode) consumeEvents(consume Consume) { + defer func() { + if err := recover(); err != nil { + logx.Error(err) + // prevent accidental crashes leading to inaccurate counting + atomic.StoreUint64(&c.processingNum, 0) + } + }() for c.on.True() { conn, err := c.conn.get() if err != nil { @@ -54,7 +63,9 @@ func (c *consumerNode) consumeEvents(consume Consume) { id, body, err := conn.Reserve(reserveTimeout) if err == nil { conn.Delete(id) + atomic.AddUint64(&c.processingNum, 1) consume(body) + atomic.AddUint64(&c.processingNum, -1) continue } @@ -91,4 +102,12 @@ func (cs consumeService) Start() { func (cs consumeService) Stop() { cs.c.dispose() + for { + if atomic.LoadUint64(&cs.c.processingNum) == 0 { + // wait all service consumer func process complete + break + } + // wait 100 millisecond check again + time.Sleep(time.Millisecond * 100) + } } From 2b9c584c64365243aafa6a22b700cf07254d869f Mon Sep 17 00:00:00 2001 From: sunjun Date: Thu, 24 Jun 2021 16:38:33 +0800 Subject: [PATCH 2/6] support graceful shutdown, stop wait all processing completed --- dq/consumernode.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dq/consumernode.go b/dq/consumernode.go index a53c067..9485001 100644 --- a/dq/consumernode.go +++ b/dq/consumernode.go @@ -40,7 +40,7 @@ func (c *consumerNode) consumeEvents(consume Consume) { if err := recover(); err != nil { logx.Error(err) // prevent accidental crashes leading to inaccurate counting - atomic.StoreUint64(&c.processingNum, 0) + atomic.AddUint64(&c.processingNum, -1) } }() for c.on.True() { From 500d8e97a3dc5d23ba44bbac815bcce02e093b60 Mon Sep 17 00:00:00 2001 From: sunjun Date: Thu, 24 Jun 2021 16:42:39 +0800 Subject: [PATCH 3/6] 1 --- dq/consumernode.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/dq/consumernode.go b/dq/consumernode.go index 9485001..42955ce 100644 --- a/dq/consumernode.go +++ b/dq/consumernode.go @@ -14,7 +14,7 @@ type ( conn *connection tube string on *syncx.AtomicBool - processingNum uint64 + processingNum int64 } consumeService struct { @@ -40,7 +40,7 @@ func (c *consumerNode) consumeEvents(consume Consume) { if err := recover(); err != nil { logx.Error(err) // prevent accidental crashes leading to inaccurate counting - atomic.AddUint64(&c.processingNum, -1) + atomic.AddInt64(&c.processingNum, -1) } }() for c.on.True() { @@ -63,9 +63,9 @@ func (c *consumerNode) consumeEvents(consume Consume) { id, body, err := conn.Reserve(reserveTimeout) if err == nil { conn.Delete(id) - atomic.AddUint64(&c.processingNum, 1) + atomic.AddInt64(&c.processingNum, 1) consume(body) - atomic.AddUint64(&c.processingNum, -1) + atomic.AddInt64(&c.processingNum, -1) continue } @@ -103,7 +103,7 @@ func (cs consumeService) Start() { func (cs consumeService) Stop() { cs.c.dispose() for { - if atomic.LoadUint64(&cs.c.processingNum) == 0 { + if atomic.LoadInt64(&cs.c.processingNum) == 0 { // wait all service consumer func process complete break } From 3fa69947ff45d004b3a2c22c79fbe398def007ec Mon Sep 17 00:00:00 2001 From: sunjun Date: Wed, 30 Jun 2021 17:50:20 +0800 Subject: [PATCH 4/6] consume run in threading.GoSafe --- dq/consumernode.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/dq/consumernode.go b/dq/consumernode.go index 42955ce..86c0b25 100644 --- a/dq/consumernode.go +++ b/dq/consumernode.go @@ -7,6 +7,7 @@ import ( "github.com/beanstalkd/go-beanstalk" "github.com/tal-tech/go-zero/core/logx" "github.com/tal-tech/go-zero/core/syncx" + "github.com/tal-tech/go-zero/core/threading" ) type ( @@ -36,13 +37,6 @@ func (c *consumerNode) dispose() { } func (c *consumerNode) consumeEvents(consume Consume) { - defer func() { - if err := recover(); err != nil { - logx.Error(err) - // prevent accidental crashes leading to inaccurate counting - atomic.AddInt64(&c.processingNum, -1) - } - }() for c.on.True() { conn, err := c.conn.get() if err != nil { @@ -64,7 +58,9 @@ func (c *consumerNode) consumeEvents(consume Consume) { if err == nil { conn.Delete(id) atomic.AddInt64(&c.processingNum, 1) - consume(body) + threading.GoSafe(func() { + consume(body) + }) atomic.AddInt64(&c.processingNum, -1) continue } From e24b55f176010f0462a30b60ed3d138a84be2123 Mon Sep 17 00:00:00 2001 From: sunjun Date: Wed, 30 Jun 2021 17:58:16 +0800 Subject: [PATCH 5/6] consume run in threading.GoSafe --- dq/consumernode.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dq/consumernode.go b/dq/consumernode.go index 86c0b25..0e3ca47 100644 --- a/dq/consumernode.go +++ b/dq/consumernode.go @@ -57,11 +57,11 @@ func (c *consumerNode) consumeEvents(consume Consume) { id, body, err := conn.Reserve(reserveTimeout) if err == nil { conn.Delete(id) - atomic.AddInt64(&c.processingNum, 1) threading.GoSafe(func() { + atomic.AddInt64(&c.processingNum, 1) + defer atomic.AddInt64(&c.processingNum, -1) consume(body) }) - atomic.AddInt64(&c.processingNum, -1) continue } From 5ceaa2679d7e0cbac3942bc7a6d98870d5665355 Mon Sep 17 00:00:00 2001 From: sunjun Date: Wed, 30 Jun 2021 17:59:55 +0800 Subject: [PATCH 6/6] add err log --- dq/consumernode.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dq/consumernode.go b/dq/consumernode.go index 0e3ca47..817b841 100644 --- a/dq/consumernode.go +++ b/dq/consumernode.go @@ -56,7 +56,9 @@ func (c *consumerNode) consumeEvents(consume Consume) { conn.TubeSet.Name[c.tube] = true id, body, err := conn.Reserve(reserveTimeout) if err == nil { - conn.Delete(id) + if err := conn.Delete(id); err != nil { + logx.Error(err) + } threading.GoSafe(func() { atomic.AddInt64(&c.processingNum, 1) defer atomic.AddInt64(&c.processingNum, -1)