Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

包装使用 #13

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 136 additions & 0 deletions delaymqwarp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/**
* @Author: 陈建君
* @Date: 2021/5/17 2:59 下午
* @Description: 延迟消息队列的包装
*/

package go_queue

import (
"encoding/json"
"fmt"
"time"

"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/service"
"github.com/tal-tech/go-zero/core/stores/redis"
"github.com/tal-tech/go-zero/core/threading"

"github.com/tal-tech/go-queue/internal/dq"
)

type (
DelayMqConsumeFn = dq.Consume

DelayMqWarp interface {
SetUp(adds []string, group string, redisConfig redis.RedisConf)
RegConsumer(key int, consume DelayMqConsumeFn)
At(key int, body []byte, at time.Time) (string, error)
Delay(key int, body []byte, delay time.Duration) (string, error)
Revoke(ids string) error
Stop()
Start()
}

defaultDelayMq struct {
cfg DqConf
producer dq.Producer
consumer dq.Consumer
consumes map[int]DelayMqConsumeFn
consumeSrv *service.ServiceGroup
}

SendMsg struct {
Key int `json:"key"`
Body []byte `json:"body"`
}
)

func NewDelayMq() DelayMqWarp {
return &defaultDelayMq{
consumes: map[int]DelayMqConsumeFn{},
}
}

// 初始化设置
func (k *defaultDelayMq) SetUp(adds []string, group string, redisConfig redis.RedisConf) {
// 生产端
var beanstalks []Beanstalk
for _, v := range adds {
beanstalks = append(beanstalks, dq.Beanstalk{
Endpoint: v,
Tube: group,
})
}
k.producer = dq.NewProducer(beanstalks)

// 消费端
dqCfg := DqConf{}
dqCfg.Beanstalks = beanstalks
dqCfg.Redis.Host = redisConfig.Host
dqCfg.Redis.Type = redisConfig.Type
dqCfg.Redis.Pass = redisConfig.Pass
k.consumer = dq.NewConsumer(dqCfg)
}

// 注册消费端
func (k *defaultDelayMq) RegConsumer(key int, consume DelayMqConsumeFn) {
k.consumes[key] = consume
}

// 定时消费
func (k *defaultDelayMq) At(key int, body []byte, at time.Time) (string, error) {
msg := &SendMsg{
Key: key,
Body: body,
}
m, _ := json.Marshal(msg)

return k.producer.At(m, at)
}

// 延迟消费
func (k *defaultDelayMq) Delay(key int, body []byte, delay time.Duration) (string, error) {
msg := &SendMsg{
Key: key,
Body: body,
}
m, _ := json.Marshal(msg)

return k.producer.Delay(m, delay)
}

// 撤销
func (k *defaultDelayMq) Revoke(ids string) error {
return k.producer.Revoke(ids)
}

// 开启
func (k *defaultDelayMq) Start() {
k.consumeSrv = k.consumer.Consume(k.consumeFun)
threading.GoSafe(func() {
k.consumeSrv.Start()
})
}

func (k *defaultDelayMq) Stop() {
_ = k.producer.Close()
if k.consumeSrv != nil {
k.consumeSrv.Stop()
}
}

func (k *defaultDelayMq) consumeFun(body []byte) {
msg := &SendMsg{}
err := json.Unmarshal(body, msg)
if err != nil {
logx.Error("消费消息失败", err)
}

if c, ok := k.consumes[msg.Key]; ok {
logx.Info(fmt.Sprintf("beanstalkd收到延迟消息%s", msg.Body))
c(msg.Body)
} else {
logx.Error("无效的消息", msg)
}
}
30 changes: 13 additions & 17 deletions example/dq/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,24 @@ package main
import (
"fmt"

"github.com/tal-tech/go-queue/dq"
queue "github.com/tal-tech/go-queue"

"github.com/tal-tech/go-zero/core/stores/redis"
)

func main() {
consumer := dq.NewConsumer(dq.DqConf{
Beanstalks: []dq.Beanstalk{
{
Endpoint: "localhost:11300",
Tube: "tube",
},
{
Endpoint: "localhost:11300",
Tube: "tube",
},
},
Redis: redis.RedisConf{
Host: "localhost:6379",
Type: redis.NodeType,
},
consumer := queue.NewDelayMq()
consumer.SetUp([]string{"localhost:11300", "localhost:11301"}, "tube", redis.RedisConf{
Host: "localhost:6379",
Type: redis.NodeType,
})
consumer.Consume(func(body []byte) {

consumer.RegConsumer(1, func(body []byte) {
fmt.Println(string(body))
})

consumer.Start()
defer consumer.Stop()

select {}
}
22 changes: 11 additions & 11 deletions example/dq/producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,24 @@ import (
"strconv"
"time"

"github.com/tal-tech/go-queue/dq"
"github.com/tal-tech/go-zero/core/stores/redis"

queue "github.com/tal-tech/go-queue"
)

func main() {
producer := dq.NewProducer([]dq.Beanstalk{
{
Endpoint: "localhost:11300",
Tube: "tube",
},
{
Endpoint: "localhost:11300",
Tube: "tube",
},
producer := queue.NewDelayMq()
producer.SetUp([]string{"localhost:11300", "localhost:11301"}, "tube", redis.RedisConf{
Host: "localhost:6379",
Type: redis.NodeType,
})

for i := 1000; i < 1005; i++ {
_, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second*5)
taskId, err := producer.Delay(1, []byte(strconv.Itoa(i)), time.Second*5)
if err != nil {
fmt.Println(err)
} else {
fmt.Println(taskId)
}
}
}
6 changes: 3 additions & 3 deletions example/kq/consumer/config.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
Name: kq
Brokers:
- 127.0.0.1:19092
- 127.0.0.1:19092
- 127.0.0.1:19092
- 127.0.0.1:9092
- 127.0.0.1:9092
- 127.0.0.1:9092
Group: adhoc
Topic: kq
Offset: first
Expand Down
16 changes: 11 additions & 5 deletions example/kq/consumer/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,24 @@ package main
import (
"fmt"

"github.com/tal-tech/go-queue/kq"
"github.com/tal-tech/go-zero/core/conf"

queue "github.com/tal-tech/go-queue"
)

func main() {
var c kq.KqConf
var c queue.KqConf
conf.MustLoad("config.yaml", &c)

q := kq.MustNewQueue(c, kq.WithHandle(func(k, v string) error {
fmt.Printf("=> %s\n", v)
q := queue.NewKafka()
q.SetUp(c.Brokers, c.Topic)
q.RegConsumer("test", func(key, value string) error {
fmt.Printf("=> %s:%s\n", key, value)
return nil
}))
})

defer q.Stop()
q.Start()

select {}
}
9 changes: 9 additions & 0 deletions example/kq/producer/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
Name: kq
Brokers:
- 127.0.0.1:9092
- 127.0.0.1:9092
- 127.0.0.1:9092
Group: adhoc
Topic: kq
Offset: first
Consumers: 1
22 changes: 9 additions & 13 deletions example/kq/producer/produce.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package main

import (
"encoding/json"
"fmt"
"log"
"math/rand"
"strconv"
"time"

"github.com/tal-tech/go-queue/kq"
"github.com/tal-tech/go-zero/core/cmdline"
"github.com/tal-tech/go-zero/core/conf"

queue "github.com/tal-tech/go-queue"
)

type message struct {
Expand All @@ -19,11 +20,11 @@ type message struct {
}

func main() {
pusher := kq.NewPusher([]string{
"127.0.0.1:19092",
"127.0.0.1:19092",
"127.0.0.1:19092",
}, "kq")
var c queue.KqConf
conf.MustLoad("config.yaml", &c)

pusher := queue.NewKafka()
pusher.SetUp(c.Brokers, c.Topic)

ticker := time.NewTicker(time.Millisecond)
for round := 0; round < 3; round++ {
Expand All @@ -35,13 +36,8 @@ func main() {
Value: fmt.Sprintf("%d,%d", round, count),
Payload: fmt.Sprintf("%d,%d", round, count),
}
body, err := json.Marshal(m)
if err != nil {
log.Fatal(err)
}

fmt.Println(string(body))
if err := pusher.Push(string(body)); err != nil {
if err := pusher.SendMsg(m); err != nil {
log.Fatal(err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion dq/config.go → internal/dq/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ type (
Tube string
}

DqConf struct {
Conf struct {
Beanstalks []Beanstalk
Redis redis.RedisConf
}
Expand Down
File renamed without changes.
9 changes: 5 additions & 4 deletions dq/consumer.go → internal/dq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type (
Consume func(body []byte)

Consumer interface {
Consume(consume Consume)
Consume(consume Consume) *service.ServiceGroup
}

consumerCluster struct {
Expand All @@ -31,7 +31,7 @@ type (
}
)

func NewConsumer(c DqConf) Consumer {
func NewConsumer(c Conf) Consumer {
var nodes []*consumerNode
for _, node := range c.Beanstalks {
nodes = append(nodes, newConsumerNode(node.Endpoint, node.Tube))
Expand All @@ -42,7 +42,7 @@ func NewConsumer(c DqConf) Consumer {
}
}

func (c *consumerCluster) Consume(consume Consume) {
func (c *consumerCluster) Consume(consume Consume) *service.ServiceGroup {
guardedConsume := func(body []byte) {
key := hash.Md5Hex(body)
body, ok := c.unwrap(body)
Expand All @@ -66,7 +66,8 @@ func (c *consumerCluster) Consume(consume Consume) {
consume: guardedConsume,
})
}
group.Start()

return group
}

func (c *consumerCluster) unwrap(body []byte) ([]byte, bool) {
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion kq/config.go → internal/kq/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const (
lastOffset = "last"
)

type KqConf struct {
type Conf struct {
service.ServiceConf
Brokers []string
Group string
Expand Down
File renamed without changes.
Loading