diff --git a/dq/consumernode.go b/dq/consumernode.go index 689805d..817b841 100644 --- a/dq/consumernode.go +++ b/dq/consumernode.go @@ -1,18 +1,21 @@ package dq import ( + "sync/atomic" "time" "github.com/beanstalkd/go-beanstalk" "github.com/tal-tech/go-zero/core/logx" "github.com/tal-tech/go-zero/core/syncx" + "github.com/tal-tech/go-zero/core/threading" ) type ( consumerNode struct { - conn *connection - tube string - on *syncx.AtomicBool + conn *connection + tube string + on *syncx.AtomicBool + processingNum int64 } consumeService struct { @@ -53,8 +56,14 @@ func (c *consumerNode) consumeEvents(consume Consume) { conn.TubeSet.Name[c.tube] = true id, body, err := conn.Reserve(reserveTimeout) if err == nil { - conn.Delete(id) - consume(body) + if err := conn.Delete(id); err != nil { + logx.Error(err) + } + threading.GoSafe(func() { + atomic.AddInt64(&c.processingNum, 1) + defer atomic.AddInt64(&c.processingNum, -1) + consume(body) + }) continue } @@ -91,4 +100,12 @@ func (cs consumeService) Start() { func (cs consumeService) Stop() { cs.c.dispose() + for { + if atomic.LoadInt64(&cs.c.processingNum) == 0 { + // wait all service consumer func process complete + break + } + // wait 100 millisecond check again + time.Sleep(time.Millisecond * 100) + } }