diff --git a/example/rabbitmq/admin/admin.go b/example/rabbitmq/admin/admin.go index d0ee705..f34f1b8 100644 --- a/example/rabbitmq/admin/admin.go +++ b/example/rabbitmq/admin/admin.go @@ -8,7 +8,7 @@ import ( func main() { conf := rabbitmq.RabbitConf{ - Host: "192.168.253.100", + Host: "127.0.0.1", Port: 5672, Username: "guest", Password: "guest", diff --git a/example/rabbitmq/listener/listener.yaml b/example/rabbitmq/listener/listener.yaml index 0b0a516..00a57b0 100644 --- a/example/rabbitmq/listener/listener.yaml +++ b/example/rabbitmq/listener/listener.yaml @@ -1,7 +1,7 @@ ListenerConf: Username: guest Password: guest - Host: 192.168.253.100 + Host: 127.0.0.1 Port: 5672 ListenerQueues: - diff --git a/example/rabbitmq/listener/main.go b/example/rabbitmq/listener/main.go index c8fb934..bc64540 100644 --- a/example/rabbitmq/listener/main.go +++ b/example/rabbitmq/listener/main.go @@ -3,6 +3,7 @@ package main import ( "flag" "fmt" + amqp "github.com/rabbitmq/amqp091-go" "github.com/zeromicro/go-queue/example/rabbitmq/listener/config" "github.com/zeromicro/go-queue/rabbitmq" @@ -27,7 +28,7 @@ func main() { type Handler struct { } -func (h Handler) Consume(message string) error { - fmt.Printf("listener %s\n", message) +func (h Handler) Consume(d amqp.Delivery) error { + fmt.Printf("listener %s\n", d.Body) return nil } diff --git a/example/rabbitmq/sender/main.go b/example/rabbitmq/sender/main.go index 0eb3161..edd4f26 100644 --- a/example/rabbitmq/sender/main.go +++ b/example/rabbitmq/sender/main.go @@ -9,7 +9,7 @@ import ( func main() { conf := rabbitmq.RabbitSenderConf{RabbitConf: rabbitmq.RabbitConf{ - Host: "192.168.253.100", + Host: "127.0.0.1", Port: 5672, Username: "guest", Password: "guest", diff --git a/rabbitmq/rabbitmqadmin.go b/rabbitmq/admin.go similarity index 100% rename from rabbitmq/rabbitmqadmin.go rename to rabbitmq/admin.go diff --git a/rabbitmq/config.go b/rabbitmq/config.go index 3b1f1ff..3ca34a1 100644 --- a/rabbitmq/config.go +++ b/rabbitmq/config.go @@ -23,7 +23,8 @@ type ConsumerConf struct { // cannot be delivered to consumers in this connection. NoLocal bool `json:",default=false"` // Whether to block processing - NoWait bool `json:",default=false"` + NoWait bool `json:",default=false"` + PrefetchCount int `json:",default=0"` } type RabbitSenderConf struct { diff --git a/rabbitmq/listener.go b/rabbitmq/listener.go index 90de462..13131e5 100644 --- a/rabbitmq/listener.go +++ b/rabbitmq/listener.go @@ -9,10 +9,10 @@ import ( ) type ( - ConsumeHandle func(message string) error + ConsumeHandle func(d amqp.Delivery) error ConsumeHandler interface { - Consume(message string) error + Consume(d amqp.Delivery) error } RabbitListener struct { @@ -43,6 +43,10 @@ func MustNewListener(listenerConf RabbitListenerConf, handler ConsumeHandler) qu func (q RabbitListener) Start() { for _, que := range q.queues.ListenerQueues { + err := q.channel.Qos(que.PrefetchCount, 0, false) + if err != nil { + log.Fatalf("failed to qos, error: %v", err) + } msg, err := q.channel.Consume( que.Name, "", @@ -58,7 +62,7 @@ func (q RabbitListener) Start() { go func() { for d := range msg { - if err := q.handler.Consume(string(d.Body)); err != nil { + if err := q.handler.Consume(d); err != nil { logx.Errorf("Error on consuming: %s, error: %v", string(d.Body), err) } }