Skip to content

Commit

Permalink
RabbitMQ增加手动确认功能
Browse files Browse the repository at this point in the history
  • Loading branch information
L-fushen committed May 21, 2023
1 parent 19cc7bf commit a6f5113
Show file tree
Hide file tree
Showing 7 changed files with 15 additions and 9 deletions.
2 changes: 1 addition & 1 deletion example/rabbitmq/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

func main() {
conf := rabbitmq.RabbitConf{
Host: "192.168.253.100",
Host: "127.0.0.1",
Port: 5672,
Username: "guest",
Password: "guest",
Expand Down
2 changes: 1 addition & 1 deletion example/rabbitmq/listener/listener.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
ListenerConf:
Username: guest
Password: guest
Host: 192.168.253.100
Host: 127.0.0.1
Port: 5672
ListenerQueues:
-
Expand Down
5 changes: 3 additions & 2 deletions example/rabbitmq/listener/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"flag"
"fmt"
amqp "github.com/rabbitmq/amqp091-go"

"github.com/zeromicro/go-queue/example/rabbitmq/listener/config"
"github.com/zeromicro/go-queue/rabbitmq"
Expand All @@ -27,7 +28,7 @@ func main() {
type Handler struct {
}

func (h Handler) Consume(message string) error {
fmt.Printf("listener %s\n", message)
func (h Handler) Consume(d amqp.Delivery) error {
fmt.Printf("listener %s\n", d.Body)
return nil
}
2 changes: 1 addition & 1 deletion example/rabbitmq/sender/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

func main() {
conf := rabbitmq.RabbitSenderConf{RabbitConf: rabbitmq.RabbitConf{
Host: "192.168.253.100",
Host: "127.0.0.1",
Port: 5672,
Username: "guest",
Password: "guest",
Expand Down
File renamed without changes.
3 changes: 2 additions & 1 deletion rabbitmq/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ type ConsumerConf struct {
// cannot be delivered to consumers in this connection.
NoLocal bool `json:",default=false"`
// Whether to block processing
NoWait bool `json:",default=false"`
NoWait bool `json:",default=false"`
PrefetchCount int `json:",default=0"`
}

type RabbitSenderConf struct {
Expand Down
10 changes: 7 additions & 3 deletions rabbitmq/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import (
)

type (
ConsumeHandle func(message string) error
ConsumeHandle func(d amqp.Delivery) error

ConsumeHandler interface {
Consume(message string) error
Consume(d amqp.Delivery) error
}

RabbitListener struct {
Expand Down Expand Up @@ -43,6 +43,10 @@ func MustNewListener(listenerConf RabbitListenerConf, handler ConsumeHandler) qu

func (q RabbitListener) Start() {
for _, que := range q.queues.ListenerQueues {
err := q.channel.Qos(que.PrefetchCount, 0, false)
if err != nil {
log.Fatalf("failed to qos, error: %v", err)
}
msg, err := q.channel.Consume(
que.Name,
"",
Expand All @@ -58,7 +62,7 @@ func (q RabbitListener) Start() {

go func() {
for d := range msg {
if err := q.handler.Consume(string(d.Body)); err != nil {
if err := q.handler.Consume(d); err != nil {
logx.Errorf("Error on consuming: %s, error: %v", string(d.Body), err)
}
}
Expand Down

0 comments on commit a6f5113

Please sign in to comment.