diff --git a/golang/client.go b/golang/client.go index ee0dc6285..c094f1eae 100644 --- a/golang/client.go +++ b/golang/client.go @@ -507,7 +507,7 @@ func (cli *defaultClient) Sign(ctx context.Context) context.Context { innerMD.EncryptHeader, innerMD.Credential, cli.config.Credentials.AccessKey, - cli.config.Region, + "", innerMD.Rocketmq, innerMD.SignedHeaders, innerMD.DateTime, diff --git a/golang/client_test.go b/golang/client_test.go index b2a9328ec..25b43afec 100644 --- a/golang/client_test.go +++ b/golang/client_test.go @@ -28,7 +28,6 @@ func TestCLINewClient(t *testing.T) { endpoints := fmt.Sprintf("%s:%d", fakeHost, fakePort) cli, err := NewClient(&Config{ Endpoint: endpoints, - Group: "", Credentials: &credentials.SessionCredentials{}, }) if err != nil { diff --git a/golang/config.go b/golang/config.go index c07ce098c..42ead577d 100644 --- a/golang/config.go +++ b/golang/config.go @@ -20,9 +20,8 @@ package golang import "github.com/apache/rocketmq-clients/golang/credentials" type Config struct { - Endpoint string `validate:"required"` - Region string - NameSpace string - Group string `validate:"required"` - Credentials *credentials.SessionCredentials `validate:"required"` + Endpoint string `validate:"required"` + NameSpace string + ConsumerGroup string + Credentials *credentials.SessionCredentials `validate:"required"` } diff --git a/golang/example/consumer/simple_consumer/main.go b/golang/example/consumer/simple_consumer/main.go index c4139b22f..7be67c109 100644 --- a/golang/example/consumer/simple_consumer/main.go +++ b/golang/example/consumer/simple_consumer/main.go @@ -24,17 +24,16 @@ import ( "os" "time" - "github.com/apache/rocketmq-clients/golang" + rmq_client "github.com/apache/rocketmq-clients/golang" "github.com/apache/rocketmq-clients/golang/credentials" ) const ( - Topic = "xxxxxx" - GroupName = "xxxxxx" - Endpoint = "xxxxxx" - Region = "xxxxxx" - AccessKey = "xxxxxx" - SecretKey = "xxxxxx" + Topic = "xxxxxx" + ConsumerGroup = "xxxxxx" + Endpoint = "xxxxxx" + AccessKey = "xxxxxx" + SecretKey = "xxxxxx" ) var ( @@ -50,20 +49,19 @@ var ( func main() { // log to console os.Setenv("mq.consoleAppender.enabled", "true") - golang.ResetLogger() + rmq_client.ResetLogger() // new simpleConsumer instance - simpleConsumer, err := golang.NewSimpleConsumer(&golang.Config{ - Endpoint: Endpoint, - Group: GroupName, - Region: Region, + simpleConsumer, err := rmq_client.NewSimpleConsumer(&rmq_client.Config{ + Endpoint: Endpoint, + ConsumerGroup: ConsumerGroup, Credentials: &credentials.SessionCredentials{ AccessKey: AccessKey, AccessSecret: SecretKey, }, }, - golang.WithAwaitDuration(awaitDuration), - golang.WithSubscriptionExpressions(map[string]*golang.FilterExpression{ - Topic: golang.SUB_ALL, + rmq_client.WithAwaitDuration(awaitDuration), + rmq_client.WithSubscriptionExpressions(map[string]*rmq_client.FilterExpression{ + Topic: rmq_client.SUB_ALL, }), ) if err != nil { diff --git a/golang/example/producer/async/main.go b/golang/example/producer/async/main.go index 6c1e1dfcf..5a1604770 100644 --- a/golang/example/producer/async/main.go +++ b/golang/example/producer/async/main.go @@ -25,15 +25,13 @@ import ( "strconv" "time" - "github.com/apache/rocketmq-clients/golang" + rmq_client "github.com/apache/rocketmq-clients/golang" "github.com/apache/rocketmq-clients/golang/credentials" ) const ( Topic = "xxxxxx" - GroupName = "xxxxxx" Endpoint = "xxxxxx" - Region = "xxxxxx" AccessKey = "xxxxxx" SecretKey = "xxxxxx" ) @@ -41,18 +39,16 @@ const ( func main() { // log to console os.Setenv("mq.consoleAppender.enabled", "true") - golang.ResetLogger() + rmq_client.ResetLogger() // new producer instance - producer, err := golang.NewProducer(&golang.Config{ + producer, err := rmq_client.NewProducer(&rmq_client.Config{ Endpoint: Endpoint, - Group: GroupName, - Region: Region, Credentials: &credentials.SessionCredentials{ AccessKey: AccessKey, AccessSecret: SecretKey, }, }, - golang.WithTopics(Topic), + rmq_client.WithTopics(Topic), ) if err != nil { log.Fatal(err) @@ -66,7 +62,7 @@ func main() { defer producer.GracefulStop() for i := 0; i < 10; i++ { // new a message - msg := &golang.Message{ + msg := &rmq_client.Message{ Topic: Topic, Body: []byte("this is a message : " + strconv.Itoa(i)), } @@ -74,7 +70,7 @@ func main() { msg.SetKeys("a", "b") msg.SetTag("ab") // send message in async - producer.SendAsync(context.TODO(), msg, func(ctx context.Context, resp []*golang.SendReceipt, err error) { + producer.SendAsync(context.TODO(), msg, func(ctx context.Context, resp []*rmq_client.SendReceipt, err error) { if err != nil { log.Fatal(err) } diff --git a/golang/example/producer/delay/main.go b/golang/example/producer/delay/main.go index 695e10718..8792c8e65 100644 --- a/golang/example/producer/delay/main.go +++ b/golang/example/producer/delay/main.go @@ -25,15 +25,13 @@ import ( "strconv" "time" - "github.com/apache/rocketmq-clients/golang" + rmq_client "github.com/apache/rocketmq-clients/golang" "github.com/apache/rocketmq-clients/golang/credentials" ) const ( Topic = "xxxxxx" - GroupName = "xxxxxx" Endpoint = "xxxxxx" - Region = "xxxxxx" AccessKey = "xxxxxx" SecretKey = "xxxxxx" ) @@ -41,18 +39,16 @@ const ( func main() { // log to console os.Setenv("mq.consoleAppender.enabled", "true") - golang.ResetLogger() + rmq_client.ResetLogger() // new producer instance - producer, err := golang.NewProducer(&golang.Config{ + producer, err := rmq_client.NewProducer(&rmq_client.Config{ Endpoint: Endpoint, - Group: GroupName, - Region: Region, Credentials: &credentials.SessionCredentials{ AccessKey: AccessKey, AccessSecret: SecretKey, }, }, - golang.WithTopics(Topic), + rmq_client.WithTopics(Topic), ) if err != nil { log.Fatal(err) @@ -66,7 +62,7 @@ func main() { defer producer.GracefulStop() for i := 0; i < 10; i++ { // new a message - msg := &golang.Message{ + msg := &rmq_client.Message{ Topic: Topic, Body: []byte("this is a message : " + strconv.Itoa(i)), } diff --git a/golang/example/producer/fifo/main.go b/golang/example/producer/fifo/main.go index c2250d3bd..4ecbed9fe 100644 --- a/golang/example/producer/fifo/main.go +++ b/golang/example/producer/fifo/main.go @@ -25,15 +25,13 @@ import ( "strconv" "time" - "github.com/apache/rocketmq-clients/golang" + rmq_client "github.com/apache/rocketmq-clients/golang" "github.com/apache/rocketmq-clients/golang/credentials" ) const ( Topic = "xxxxxx" - GroupName = "xxxxxx" Endpoint = "xxxxxx" - Region = "xxxxxx" AccessKey = "xxxxxx" SecretKey = "xxxxxx" ) @@ -41,18 +39,16 @@ const ( func main() { // log to console os.Setenv("mq.consoleAppender.enabled", "true") - golang.ResetLogger() + rmq_client.ResetLogger() // new producer instance - producer, err := golang.NewProducer(&golang.Config{ + producer, err := rmq_client.NewProducer(&rmq_client.Config{ Endpoint: Endpoint, - Group: GroupName, - Region: Region, Credentials: &credentials.SessionCredentials{ AccessKey: AccessKey, AccessSecret: SecretKey, }, }, - golang.WithTopics(Topic), + rmq_client.WithTopics(Topic), ) if err != nil { log.Fatal(err) @@ -66,7 +62,7 @@ func main() { defer producer.GracefulStop() for i := 0; i < 10; i++ { // new a message - msg := &golang.Message{ + msg := &rmq_client.Message{ Topic: Topic, Body: []byte("this is a message : " + strconv.Itoa(i)), } diff --git a/golang/example/producer/normal/main.go b/golang/example/producer/normal/main.go index 5be74978d..7b0aa0e37 100644 --- a/golang/example/producer/normal/main.go +++ b/golang/example/producer/normal/main.go @@ -25,33 +25,29 @@ import ( "strconv" "time" - "github.com/apache/rocketmq-clients/golang" + rmq_client "github.com/apache/rocketmq-clients/golang" "github.com/apache/rocketmq-clients/golang/credentials" ) const ( Topic = "xxxxxx" - GroupName = "xxxxxx" Endpoint = "xxxxxx" - Region = "xxxxxx" AccessKey = "xxxxxx" SecretKey = "xxxxxx" ) func main() { os.Setenv("mq.consoleAppender.enabled", "true") - golang.ResetLogger() + rmq_client.ResetLogger() // new producer instance - producer, err := golang.NewProducer(&golang.Config{ + producer, err := rmq_client.NewProducer(&rmq_client.Config{ Endpoint: Endpoint, - Group: GroupName, - Region: Region, Credentials: &credentials.SessionCredentials{ AccessKey: AccessKey, AccessSecret: SecretKey, }, }, - golang.WithTopics(Topic), + rmq_client.WithTopics(Topic), ) if err != nil { log.Fatal(err) @@ -66,7 +62,7 @@ func main() { for i := 0; i < 10; i++ { // new a message - msg := &golang.Message{ + msg := &rmq_client.Message{ Topic: Topic, Body: []byte("this is a message : " + strconv.Itoa(i)), } diff --git a/golang/example/producer/transaction/main.go b/golang/example/producer/transaction/main.go index ac38d3836..2424008bd 100644 --- a/golang/example/producer/transaction/main.go +++ b/golang/example/producer/transaction/main.go @@ -25,15 +25,13 @@ import ( "strconv" "time" - "github.com/apache/rocketmq-clients/golang" + rmq_client "github.com/apache/rocketmq-clients/golang" "github.com/apache/rocketmq-clients/golang/credentials" ) const ( Topic = "xxxxxx" - GroupName = "xxxxxx" Endpoint = "xxxxxx" - Region = "xxxxxx" AccessKey = "xxxxxx" SecretKey = "xxxxxx" ) @@ -41,24 +39,22 @@ const ( func main() { // log to console os.Setenv("mq.consoleAppender.enabled", "true") - golang.ResetLogger() + rmq_client.ResetLogger() // new producer instance - producer, err := golang.NewProducer(&golang.Config{ + producer, err := rmq_client.NewProducer(&rmq_client.Config{ Endpoint: Endpoint, - Group: GroupName, - Region: Region, Credentials: &credentials.SessionCredentials{ AccessKey: AccessKey, AccessSecret: SecretKey, }, }, - golang.WithTransactionChecker(&golang.TransactionChecker{ - Check: func(msg *golang.MessageView) golang.TransactionResolution { + rmq_client.WithTransactionChecker(&rmq_client.TransactionChecker{ + Check: func(msg *rmq_client.MessageView) rmq_client.TransactionResolution { log.Printf("check transaction message: %v", msg) - return golang.COMMIT + return rmq_client.COMMIT }, }), - golang.WithTopics(Topic), + rmq_client.WithTopics(Topic), ) if err != nil { log.Fatal(err) @@ -72,7 +68,7 @@ func main() { defer producer.GracefulStop() for i := 0; i < 10; i++ { // new a message - msg := &golang.Message{ + msg := &rmq_client.Message{ Topic: Topic, Body: []byte("this is a message : " + strconv.Itoa(i)), } diff --git a/golang/producer_test.go b/golang/producer_test.go index 312fcd7b8..1313e6961 100644 --- a/golang/producer_test.go +++ b/golang/producer_test.go @@ -52,7 +52,6 @@ func TestProducer(t *testing.T) { endpoints := fmt.Sprintf("%s:%d", fakeHost, fakePort) p, err := NewProducer(&Config{ Endpoint: endpoints, - Group: MOCK_GROUP, Credentials: &credentials.SessionCredentials{}, }) if err != nil { diff --git a/golang/simple_consumer.go b/golang/simple_consumer.go index 81e98b8e2..fbb884445 100644 --- a/golang/simple_consumer.go +++ b/golang/simple_consumer.go @@ -320,6 +320,9 @@ var NewSimpleConsumer = func(config *Config, opts ...SimpleConsumerOption) (Simp for _, opt := range opts { opt.apply(scOpts) } + if len(config.ConsumerGroup) == 0 { + return nil, fmt.Errorf("consumerGroup could not be nil") + } cli, err := scOpts.clientFunc(config) if err != nil { return nil, err @@ -327,7 +330,7 @@ var NewSimpleConsumer = func(config *Config, opts ...SimpleConsumerOption) (Simp sc := &defaultSimpleConsumer{ scOpts: *scOpts, cli: cli.(*defaultClient), - groupName: config.Group, + groupName: config.ConsumerGroup, awaitDuration: scOpts.awaitDuration, subscriptionExpressions: scOpts.subscriptionExpressions,