From c761dcf435eafdbe57f1f4f1a48b9efc2524fd39 Mon Sep 17 00:00:00 2001 From: guyinyou Date: Tue, 27 Sep 2022 10:54:01 +0800 Subject: [PATCH 1/2] remove redundant configuration --- golang/client.go | 2 +- golang/client_test.go | 1 - golang/config.go | 9 ++++----- golang/example/consumer/simple_consumer/main.go | 6 ++---- golang/example/producer/async/main.go | 4 ---- golang/example/producer/delay/main.go | 4 ---- golang/example/producer/fifo/main.go | 4 ---- golang/example/producer/normal/main.go | 4 ---- golang/example/producer/transaction/main.go | 4 ---- golang/producer_test.go | 1 - golang/simple_consumer.go | 5 ++++- 11 files changed, 11 insertions(+), 33 deletions(-) diff --git a/golang/client.go b/golang/client.go index ee0dc6285..c094f1eae 100644 --- a/golang/client.go +++ b/golang/client.go @@ -507,7 +507,7 @@ func (cli *defaultClient) Sign(ctx context.Context) context.Context { innerMD.EncryptHeader, innerMD.Credential, cli.config.Credentials.AccessKey, - cli.config.Region, + "", innerMD.Rocketmq, innerMD.SignedHeaders, innerMD.DateTime, diff --git a/golang/client_test.go b/golang/client_test.go index b2a9328ec..25b43afec 100644 --- a/golang/client_test.go +++ b/golang/client_test.go @@ -28,7 +28,6 @@ func TestCLINewClient(t *testing.T) { endpoints := fmt.Sprintf("%s:%d", fakeHost, fakePort) cli, err := NewClient(&Config{ Endpoint: endpoints, - Group: "", Credentials: &credentials.SessionCredentials{}, }) if err != nil { diff --git a/golang/config.go b/golang/config.go index c07ce098c..42ead577d 100644 --- a/golang/config.go +++ b/golang/config.go @@ -20,9 +20,8 @@ package golang import "github.com/apache/rocketmq-clients/golang/credentials" type Config struct { - Endpoint string `validate:"required"` - Region string - NameSpace string - Group string `validate:"required"` - Credentials *credentials.SessionCredentials `validate:"required"` + Endpoint string `validate:"required"` + NameSpace string + ConsumerGroup string + Credentials *credentials.SessionCredentials `validate:"required"` } diff --git a/golang/example/consumer/simple_consumer/main.go b/golang/example/consumer/simple_consumer/main.go index c4139b22f..b6ff29c6a 100644 --- a/golang/example/consumer/simple_consumer/main.go +++ b/golang/example/consumer/simple_consumer/main.go @@ -32,7 +32,6 @@ const ( Topic = "xxxxxx" GroupName = "xxxxxx" Endpoint = "xxxxxx" - Region = "xxxxxx" AccessKey = "xxxxxx" SecretKey = "xxxxxx" ) @@ -53,9 +52,8 @@ func main() { golang.ResetLogger() // new simpleConsumer instance simpleConsumer, err := golang.NewSimpleConsumer(&golang.Config{ - Endpoint: Endpoint, - Group: GroupName, - Region: Region, + Endpoint: Endpoint, + ConsumerGroup: GroupName, Credentials: &credentials.SessionCredentials{ AccessKey: AccessKey, AccessSecret: SecretKey, diff --git a/golang/example/producer/async/main.go b/golang/example/producer/async/main.go index 6c1e1dfcf..dde45dd32 100644 --- a/golang/example/producer/async/main.go +++ b/golang/example/producer/async/main.go @@ -31,9 +31,7 @@ import ( const ( Topic = "xxxxxx" - GroupName = "xxxxxx" Endpoint = "xxxxxx" - Region = "xxxxxx" AccessKey = "xxxxxx" SecretKey = "xxxxxx" ) @@ -45,8 +43,6 @@ func main() { // new producer instance producer, err := golang.NewProducer(&golang.Config{ Endpoint: Endpoint, - Group: GroupName, - Region: Region, Credentials: &credentials.SessionCredentials{ AccessKey: AccessKey, AccessSecret: SecretKey, diff --git a/golang/example/producer/delay/main.go b/golang/example/producer/delay/main.go index 695e10718..c896cc3b7 100644 --- a/golang/example/producer/delay/main.go +++ b/golang/example/producer/delay/main.go @@ -31,9 +31,7 @@ import ( const ( Topic = "xxxxxx" - GroupName = "xxxxxx" Endpoint = "xxxxxx" - Region = "xxxxxx" AccessKey = "xxxxxx" SecretKey = "xxxxxx" ) @@ -45,8 +43,6 @@ func main() { // new producer instance producer, err := golang.NewProducer(&golang.Config{ Endpoint: Endpoint, - Group: GroupName, - Region: Region, Credentials: &credentials.SessionCredentials{ AccessKey: AccessKey, AccessSecret: SecretKey, diff --git a/golang/example/producer/fifo/main.go b/golang/example/producer/fifo/main.go index c2250d3bd..816d299ab 100644 --- a/golang/example/producer/fifo/main.go +++ b/golang/example/producer/fifo/main.go @@ -31,9 +31,7 @@ import ( const ( Topic = "xxxxxx" - GroupName = "xxxxxx" Endpoint = "xxxxxx" - Region = "xxxxxx" AccessKey = "xxxxxx" SecretKey = "xxxxxx" ) @@ -45,8 +43,6 @@ func main() { // new producer instance producer, err := golang.NewProducer(&golang.Config{ Endpoint: Endpoint, - Group: GroupName, - Region: Region, Credentials: &credentials.SessionCredentials{ AccessKey: AccessKey, AccessSecret: SecretKey, diff --git a/golang/example/producer/normal/main.go b/golang/example/producer/normal/main.go index 5be74978d..d88ac014b 100644 --- a/golang/example/producer/normal/main.go +++ b/golang/example/producer/normal/main.go @@ -31,9 +31,7 @@ import ( const ( Topic = "xxxxxx" - GroupName = "xxxxxx" Endpoint = "xxxxxx" - Region = "xxxxxx" AccessKey = "xxxxxx" SecretKey = "xxxxxx" ) @@ -44,8 +42,6 @@ func main() { // new producer instance producer, err := golang.NewProducer(&golang.Config{ Endpoint: Endpoint, - Group: GroupName, - Region: Region, Credentials: &credentials.SessionCredentials{ AccessKey: AccessKey, AccessSecret: SecretKey, diff --git a/golang/example/producer/transaction/main.go b/golang/example/producer/transaction/main.go index ac38d3836..09a5db134 100644 --- a/golang/example/producer/transaction/main.go +++ b/golang/example/producer/transaction/main.go @@ -31,9 +31,7 @@ import ( const ( Topic = "xxxxxx" - GroupName = "xxxxxx" Endpoint = "xxxxxx" - Region = "xxxxxx" AccessKey = "xxxxxx" SecretKey = "xxxxxx" ) @@ -45,8 +43,6 @@ func main() { // new producer instance producer, err := golang.NewProducer(&golang.Config{ Endpoint: Endpoint, - Group: GroupName, - Region: Region, Credentials: &credentials.SessionCredentials{ AccessKey: AccessKey, AccessSecret: SecretKey, diff --git a/golang/producer_test.go b/golang/producer_test.go index 312fcd7b8..1313e6961 100644 --- a/golang/producer_test.go +++ b/golang/producer_test.go @@ -52,7 +52,6 @@ func TestProducer(t *testing.T) { endpoints := fmt.Sprintf("%s:%d", fakeHost, fakePort) p, err := NewProducer(&Config{ Endpoint: endpoints, - Group: MOCK_GROUP, Credentials: &credentials.SessionCredentials{}, }) if err != nil { diff --git a/golang/simple_consumer.go b/golang/simple_consumer.go index 81e98b8e2..fbb884445 100644 --- a/golang/simple_consumer.go +++ b/golang/simple_consumer.go @@ -320,6 +320,9 @@ var NewSimpleConsumer = func(config *Config, opts ...SimpleConsumerOption) (Simp for _, opt := range opts { opt.apply(scOpts) } + if len(config.ConsumerGroup) == 0 { + return nil, fmt.Errorf("consumerGroup could not be nil") + } cli, err := scOpts.clientFunc(config) if err != nil { return nil, err @@ -327,7 +330,7 @@ var NewSimpleConsumer = func(config *Config, opts ...SimpleConsumerOption) (Simp sc := &defaultSimpleConsumer{ scOpts: *scOpts, cli: cli.(*defaultClient), - groupName: config.Group, + groupName: config.ConsumerGroup, awaitDuration: scOpts.awaitDuration, subscriptionExpressions: scOpts.subscriptionExpressions, From 2c4e725c83ffa5a28fece10de314fb8198dfc55f Mon Sep 17 00:00:00 2001 From: guyinyou Date: Mon, 10 Oct 2022 15:03:40 +0800 Subject: [PATCH 2/2] format code --- .../example/consumer/simple_consumer/main.go | 24 +++++++++---------- golang/example/producer/async/main.go | 12 +++++----- golang/example/producer/delay/main.go | 10 ++++---- golang/example/producer/fifo/main.go | 10 ++++---- golang/example/producer/normal/main.go | 10 ++++---- golang/example/producer/transaction/main.go | 16 ++++++------- 6 files changed, 41 insertions(+), 41 deletions(-) diff --git a/golang/example/consumer/simple_consumer/main.go b/golang/example/consumer/simple_consumer/main.go index b6ff29c6a..7be67c109 100644 --- a/golang/example/consumer/simple_consumer/main.go +++ b/golang/example/consumer/simple_consumer/main.go @@ -24,16 +24,16 @@ import ( "os" "time" - "github.com/apache/rocketmq-clients/golang" + rmq_client "github.com/apache/rocketmq-clients/golang" "github.com/apache/rocketmq-clients/golang/credentials" ) const ( - Topic = "xxxxxx" - GroupName = "xxxxxx" - Endpoint = "xxxxxx" - AccessKey = "xxxxxx" - SecretKey = "xxxxxx" + Topic = "xxxxxx" + ConsumerGroup = "xxxxxx" + Endpoint = "xxxxxx" + AccessKey = "xxxxxx" + SecretKey = "xxxxxx" ) var ( @@ -49,19 +49,19 @@ var ( func main() { // log to console os.Setenv("mq.consoleAppender.enabled", "true") - golang.ResetLogger() + rmq_client.ResetLogger() // new simpleConsumer instance - simpleConsumer, err := golang.NewSimpleConsumer(&golang.Config{ + simpleConsumer, err := rmq_client.NewSimpleConsumer(&rmq_client.Config{ Endpoint: Endpoint, - ConsumerGroup: GroupName, + ConsumerGroup: ConsumerGroup, Credentials: &credentials.SessionCredentials{ AccessKey: AccessKey, AccessSecret: SecretKey, }, }, - golang.WithAwaitDuration(awaitDuration), - golang.WithSubscriptionExpressions(map[string]*golang.FilterExpression{ - Topic: golang.SUB_ALL, + rmq_client.WithAwaitDuration(awaitDuration), + rmq_client.WithSubscriptionExpressions(map[string]*rmq_client.FilterExpression{ + Topic: rmq_client.SUB_ALL, }), ) if err != nil { diff --git a/golang/example/producer/async/main.go b/golang/example/producer/async/main.go index dde45dd32..5a1604770 100644 --- a/golang/example/producer/async/main.go +++ b/golang/example/producer/async/main.go @@ -25,7 +25,7 @@ import ( "strconv" "time" - "github.com/apache/rocketmq-clients/golang" + rmq_client "github.com/apache/rocketmq-clients/golang" "github.com/apache/rocketmq-clients/golang/credentials" ) @@ -39,16 +39,16 @@ const ( func main() { // log to console os.Setenv("mq.consoleAppender.enabled", "true") - golang.ResetLogger() + rmq_client.ResetLogger() // new producer instance - producer, err := golang.NewProducer(&golang.Config{ + producer, err := rmq_client.NewProducer(&rmq_client.Config{ Endpoint: Endpoint, Credentials: &credentials.SessionCredentials{ AccessKey: AccessKey, AccessSecret: SecretKey, }, }, - golang.WithTopics(Topic), + rmq_client.WithTopics(Topic), ) if err != nil { log.Fatal(err) @@ -62,7 +62,7 @@ func main() { defer producer.GracefulStop() for i := 0; i < 10; i++ { // new a message - msg := &golang.Message{ + msg := &rmq_client.Message{ Topic: Topic, Body: []byte("this is a message : " + strconv.Itoa(i)), } @@ -70,7 +70,7 @@ func main() { msg.SetKeys("a", "b") msg.SetTag("ab") // send message in async - producer.SendAsync(context.TODO(), msg, func(ctx context.Context, resp []*golang.SendReceipt, err error) { + producer.SendAsync(context.TODO(), msg, func(ctx context.Context, resp []*rmq_client.SendReceipt, err error) { if err != nil { log.Fatal(err) } diff --git a/golang/example/producer/delay/main.go b/golang/example/producer/delay/main.go index c896cc3b7..8792c8e65 100644 --- a/golang/example/producer/delay/main.go +++ b/golang/example/producer/delay/main.go @@ -25,7 +25,7 @@ import ( "strconv" "time" - "github.com/apache/rocketmq-clients/golang" + rmq_client "github.com/apache/rocketmq-clients/golang" "github.com/apache/rocketmq-clients/golang/credentials" ) @@ -39,16 +39,16 @@ const ( func main() { // log to console os.Setenv("mq.consoleAppender.enabled", "true") - golang.ResetLogger() + rmq_client.ResetLogger() // new producer instance - producer, err := golang.NewProducer(&golang.Config{ + producer, err := rmq_client.NewProducer(&rmq_client.Config{ Endpoint: Endpoint, Credentials: &credentials.SessionCredentials{ AccessKey: AccessKey, AccessSecret: SecretKey, }, }, - golang.WithTopics(Topic), + rmq_client.WithTopics(Topic), ) if err != nil { log.Fatal(err) @@ -62,7 +62,7 @@ func main() { defer producer.GracefulStop() for i := 0; i < 10; i++ { // new a message - msg := &golang.Message{ + msg := &rmq_client.Message{ Topic: Topic, Body: []byte("this is a message : " + strconv.Itoa(i)), } diff --git a/golang/example/producer/fifo/main.go b/golang/example/producer/fifo/main.go index 816d299ab..4ecbed9fe 100644 --- a/golang/example/producer/fifo/main.go +++ b/golang/example/producer/fifo/main.go @@ -25,7 +25,7 @@ import ( "strconv" "time" - "github.com/apache/rocketmq-clients/golang" + rmq_client "github.com/apache/rocketmq-clients/golang" "github.com/apache/rocketmq-clients/golang/credentials" ) @@ -39,16 +39,16 @@ const ( func main() { // log to console os.Setenv("mq.consoleAppender.enabled", "true") - golang.ResetLogger() + rmq_client.ResetLogger() // new producer instance - producer, err := golang.NewProducer(&golang.Config{ + producer, err := rmq_client.NewProducer(&rmq_client.Config{ Endpoint: Endpoint, Credentials: &credentials.SessionCredentials{ AccessKey: AccessKey, AccessSecret: SecretKey, }, }, - golang.WithTopics(Topic), + rmq_client.WithTopics(Topic), ) if err != nil { log.Fatal(err) @@ -62,7 +62,7 @@ func main() { defer producer.GracefulStop() for i := 0; i < 10; i++ { // new a message - msg := &golang.Message{ + msg := &rmq_client.Message{ Topic: Topic, Body: []byte("this is a message : " + strconv.Itoa(i)), } diff --git a/golang/example/producer/normal/main.go b/golang/example/producer/normal/main.go index d88ac014b..7b0aa0e37 100644 --- a/golang/example/producer/normal/main.go +++ b/golang/example/producer/normal/main.go @@ -25,7 +25,7 @@ import ( "strconv" "time" - "github.com/apache/rocketmq-clients/golang" + rmq_client "github.com/apache/rocketmq-clients/golang" "github.com/apache/rocketmq-clients/golang/credentials" ) @@ -38,16 +38,16 @@ const ( func main() { os.Setenv("mq.consoleAppender.enabled", "true") - golang.ResetLogger() + rmq_client.ResetLogger() // new producer instance - producer, err := golang.NewProducer(&golang.Config{ + producer, err := rmq_client.NewProducer(&rmq_client.Config{ Endpoint: Endpoint, Credentials: &credentials.SessionCredentials{ AccessKey: AccessKey, AccessSecret: SecretKey, }, }, - golang.WithTopics(Topic), + rmq_client.WithTopics(Topic), ) if err != nil { log.Fatal(err) @@ -62,7 +62,7 @@ func main() { for i := 0; i < 10; i++ { // new a message - msg := &golang.Message{ + msg := &rmq_client.Message{ Topic: Topic, Body: []byte("this is a message : " + strconv.Itoa(i)), } diff --git a/golang/example/producer/transaction/main.go b/golang/example/producer/transaction/main.go index 09a5db134..2424008bd 100644 --- a/golang/example/producer/transaction/main.go +++ b/golang/example/producer/transaction/main.go @@ -25,7 +25,7 @@ import ( "strconv" "time" - "github.com/apache/rocketmq-clients/golang" + rmq_client "github.com/apache/rocketmq-clients/golang" "github.com/apache/rocketmq-clients/golang/credentials" ) @@ -39,22 +39,22 @@ const ( func main() { // log to console os.Setenv("mq.consoleAppender.enabled", "true") - golang.ResetLogger() + rmq_client.ResetLogger() // new producer instance - producer, err := golang.NewProducer(&golang.Config{ + producer, err := rmq_client.NewProducer(&rmq_client.Config{ Endpoint: Endpoint, Credentials: &credentials.SessionCredentials{ AccessKey: AccessKey, AccessSecret: SecretKey, }, }, - golang.WithTransactionChecker(&golang.TransactionChecker{ - Check: func(msg *golang.MessageView) golang.TransactionResolution { + rmq_client.WithTransactionChecker(&rmq_client.TransactionChecker{ + Check: func(msg *rmq_client.MessageView) rmq_client.TransactionResolution { log.Printf("check transaction message: %v", msg) - return golang.COMMIT + return rmq_client.COMMIT }, }), - golang.WithTopics(Topic), + rmq_client.WithTopics(Topic), ) if err != nil { log.Fatal(err) @@ -68,7 +68,7 @@ func main() { defer producer.GracefulStop() for i := 0; i < 10; i++ { // new a message - msg := &golang.Message{ + msg := &rmq_client.Message{ Topic: Topic, Body: []byte("this is a message : " + strconv.Itoa(i)), }