Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

golang: fast to the ok state #295

Merged
merged 2 commits into from
Dec 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions golang/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,14 @@ func (cli *defaultClient) getQueryRouteRequest(topic string) *v2.QueryRouteReque
func (cli *defaultClient) getTotalTargets() []string {
endpoints := make([]string, 0)
endpointsSet := make(map[string]bool)
for _, address := range cli.accessPoint.GetAddresses() {
target := utils.ParseAddress(address)
if _, ok := endpointsSet[target]; ok {
continue
}
endpointsSet[target] = true
endpoints = append(endpoints, target)
}
cli.router.Range(func(_, v interface{}) bool {
messageQueues := v.([]*v2.MessageQueue)
for _, messageQueue := range messageQueues {
Expand Down Expand Up @@ -372,7 +380,7 @@ func (cli *defaultClient) Heartbeat() {
}

func (cli *defaultClient) trySyncSettings() {
cli.log.Info("start syncSetting")
cli.log.Info("start trySyncSettings")
command := cli.getSettingsCommand()
targets := cli.getTotalTargets()
for _, target := range targets {
Expand All @@ -381,7 +389,7 @@ func (cli *defaultClient) trySyncSettings() {
}

func (cli *defaultClient) mustSyncSettings() error {
cli.log.Info("start syncSetting")
cli.log.Info("start mustSyncSettings")
command := cli.getSettingsCommand()
targets := cli.getTotalTargets()
for _, target := range targets {
Expand Down
57 changes: 57 additions & 0 deletions golang/client_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ package golang
import (
"context"
"fmt"
"io"
"os"
"testing"
"time"

v2 "github.com/apache/rocketmq-clients/golang/protocol/v2"
gomock "github.com/golang/mock/gomock"
"github.com/prashantv/gostub"
"google.golang.org/grpc/metadata"
)

var MOCK_CLIENT_ID = "mock_client_id"
Expand All @@ -35,6 +37,60 @@ var MOCK_GROUP = "mock_group"
var MOCK_CLIENT *MockClient
var MOCK_RPC_CLIENT *MockRpcClient

type MOCK_MessagingService_TelemetryClient struct {
trace []string
}

// CloseSend implements v2.MessagingService_TelemetryClient
func (mt *MOCK_MessagingService_TelemetryClient) CloseSend() error {
mt.trace = append(mt.trace, "closesend")
return nil
}

// Context implements v2.MessagingService_TelemetryClient
func (mt *MOCK_MessagingService_TelemetryClient) Context() context.Context {
mt.trace = append(mt.trace, "context")
return nil
}

// Header implements v2.MessagingService_TelemetryClient
func (mt *MOCK_MessagingService_TelemetryClient) Header() (metadata.MD, error) {
mt.trace = append(mt.trace, "header")
return nil, nil
}

// RecvMsg implements v2.MessagingService_TelemetryClient
func (mt *MOCK_MessagingService_TelemetryClient) RecvMsg(m interface{}) error {
mt.trace = append(mt.trace, "recvmsg")
return nil
}

// SendMsg implements v2.MessagingService_TelemetryClient
func (mt *MOCK_MessagingService_TelemetryClient) SendMsg(m interface{}) error {
mt.trace = append(mt.trace, "sendmsg")
return nil
}

// Trailer implements v2.MessagingService_TelemetryClient
func (mt *MOCK_MessagingService_TelemetryClient) Trailer() metadata.MD {
mt.trace = append(mt.trace, "trailer")
return nil
}

// Recv implements v2.MessagingService_TelemetryClient
func (mt *MOCK_MessagingService_TelemetryClient) Recv() (*v2.TelemetryCommand, error) {
mt.trace = append(mt.trace, "recv")
return nil, io.EOF
}

// Send implements v2.MessagingService_TelemetryClient
func (mt *MOCK_MessagingService_TelemetryClient) Send(*v2.TelemetryCommand) error {
mt.trace = append(mt.trace, "send")
return nil
}

var _ = v2.MessagingService_TelemetryClient(&MOCK_MessagingService_TelemetryClient{})

func TestMain(m *testing.M) {
os.Setenv("mq.consoleAppender.enabled", "true")
ResetLogger()
Expand All @@ -50,6 +106,7 @@ func TestMain(m *testing.M) {
Code: v2.Code_OK,
},
}, nil).AnyTimes()

MOCK_RPC_CLIENT.EXPECT().GracefulStop().Return(nil).AnyTimes()
MOCK_RPC_CLIENT.EXPECT().GetTarget().Return(fakeAddresss).AnyTimes()
stubs := gostub.Stub(&NewRpcClient, func(target string, opts ...RpcClientOption) (RpcClient, error) {
Expand Down
35 changes: 35 additions & 0 deletions golang/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,46 @@ package golang
import (
"fmt"
"testing"
"time"

"github.com/apache/rocketmq-clients/golang/credentials"
gomock "github.com/golang/mock/gomock"
"github.com/prashantv/gostub"
)

func TestCLINewClient(t *testing.T) {
stubs := gostub.Stub(&defaultClientManagerOptions, clientManagerOptions{
RPC_CLIENT_MAX_IDLE_DURATION: time.Second,

RPC_CLIENT_IDLE_CHECK_INITIAL_DELAY: time.Hour,
RPC_CLIENT_IDLE_CHECK_PERIOD: time.Hour,

HEART_BEAT_INITIAL_DELAY: time.Hour,
HEART_BEAT_PERIOD: time.Hour,

LOG_STATS_INITIAL_DELAY: time.Hour,
LOG_STATS_PERIOD: time.Hour,

SYNC_SETTINGS_DELAY: time.Hour,
SYNC_SETTINGS_PERIOD: time.Hour,
})

stubs2 := gostub.Stub(&NewRpcClient, func(target string, opts ...RpcClientOption) (RpcClient, error) {
if target == fakeAddresss {
return MOCK_RPC_CLIENT, nil
}
return nil, fmt.Errorf("invalid target=%s", target)
})

defer func() {
stubs.Reset()
stubs2.Reset()
}()

MOCK_RPC_CLIENT.EXPECT().Telemetry(gomock.Any()).Return(&MOCK_MessagingService_TelemetryClient{
trace: make([]string, 0),
}, nil)

endpoints := fmt.Sprintf("%s:%d", fakeHost, fakePort)
cli, err := NewClient(&Config{
Endpoint: endpoints,
Expand Down
104 changes: 25 additions & 79 deletions golang/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,13 @@ package golang
import (
"context"
"fmt"
"io"
"testing"
"time"

"github.com/apache/rocketmq-clients/golang/credentials"
v2 "github.com/apache/rocketmq-clients/golang/protocol/v2"
gomock "github.com/golang/mock/gomock"
"github.com/prashantv/gostub"
"google.golang.org/grpc/metadata"
)

func TestProducer(t *testing.T) {
Expand All @@ -47,7 +45,22 @@ func TestProducer(t *testing.T) {
SYNC_SETTINGS_DELAY: time.Hour,
SYNC_SETTINGS_PERIOD: time.Hour,
})
defer stubs.Reset()

stubs2 := gostub.Stub(&NewRpcClient, func(target string, opts ...RpcClientOption) (RpcClient, error) {
if target == fakeAddresss {
return MOCK_RPC_CLIENT, nil
}
return nil, fmt.Errorf("invalid target=%s", target)
})

defer func() {
stubs.Reset()
stubs2.Reset()
}()

MOCK_RPC_CLIENT.EXPECT().Telemetry(gomock.Any()).Return(&MOCK_MessagingService_TelemetryClient{
trace: make([]string, 0),
}, nil).AnyTimes()

endpoints := fmt.Sprintf("%s:%d", fakeHost, fakePort)
p, err := NewProducer(&Config{
Expand All @@ -72,7 +85,7 @@ func TestProducer(t *testing.T) {
v2.MessageType_TRANSACTION,
},
}},
}, nil)
}, nil).AnyTimes()
err = p.Start()
if err != nil {
t.Error(err)
Expand All @@ -88,7 +101,7 @@ func TestProducer(t *testing.T) {
Code: v2.Code_OK,
},
Entries: []*v2.SendResultEntry{{}},
}, nil)
}, nil).AnyTimes()

_, err := p.Send(context.TODO(), msg)
if err != nil {
Expand All @@ -101,7 +114,7 @@ func TestProducer(t *testing.T) {
Code: v2.Code_OK,
},
Entries: []*v2.SendResultEntry{{}},
}, nil)
}, nil).AnyTimes()

done := make(chan bool)
p.SendAsync(context.TODO(), msg, func(ctx context.Context, sr []*SendReceipt, err error) {
Expand All @@ -118,12 +131,12 @@ func TestProducer(t *testing.T) {
Code: v2.Code_OK,
},
Entries: []*v2.SendResultEntry{{}},
}, nil)
}, nil).AnyTimes()
MOCK_RPC_CLIENT.EXPECT().EndTransaction(gomock.Any(), gomock.Any()).Return(&v2.EndTransactionResponse{
Status: &v2.Status{
Code: v2.Code_OK,
},
}, nil)
}, nil).AnyTimes()

transaction := p.BeginTransaction()
_, err := p.SendWithTransaction(context.TODO(), msg, transaction)
Expand All @@ -141,12 +154,12 @@ func TestProducer(t *testing.T) {
Code: v2.Code_OK,
},
Entries: []*v2.SendResultEntry{{}},
}, nil)
}, nil).AnyTimes()
MOCK_RPC_CLIENT.EXPECT().EndTransaction(gomock.Any(), gomock.Any()).Return(&v2.EndTransactionResponse{
Status: &v2.Status{
Code: v2.Code_OK,
},
}, nil)
}, nil).AnyTimes()

transaction := p.BeginTransaction()
_, err := p.SendWithTransaction(context.TODO(), msg, transaction)
Expand All @@ -164,7 +177,7 @@ func TestProducer(t *testing.T) {
Code: v2.Code_OK,
},
Entries: []*v2.SendResultEntry{{}},
}, nil)
}, nil).AnyTimes()
msg.SetMessageGroup(MOCK_GROUP)
defer func() { msg.messageGroup = nil }()
_, err := p.Send(context.TODO(), msg)
Expand All @@ -178,7 +191,7 @@ func TestProducer(t *testing.T) {
Code: v2.Code_OK,
},
Entries: []*v2.SendResultEntry{{}},
}, nil)
}, nil).AnyTimes()
msg.SetDelayTimestamp(time.Now().Add(time.Hour))
defer func() { msg.deliveryTimestamp = nil }()
_, err := p.Send(context.TODO(), msg)
Expand All @@ -204,77 +217,10 @@ func TestProducer(t *testing.T) {
t.Error(err)
}
})
t.Run("syncsettings", func(t *testing.T) {
mt := &MOCK_MessagingService_TelemetryClient{
trace: make([]string, 0),
}
MOCK_RPC_CLIENT.EXPECT().Telemetry(gomock.Any()).Return(mt, nil)
p.(*defaultProducer).cli.clientManager.(*defaultClientManager).syncSettings()
for {
time.Sleep(time.Duration(100))
if len(mt.trace) >= 3 && mt.trace[0] == "send" && mt.trace[1] == "recv" && mt.trace[2] == "closesend" {
break
}
}
})
t.Run("do heartbeat", func(t *testing.T) {
err := p.(*defaultProducer).cli.doHeartbeat(endpoints, nil)
if err != nil {
t.Error(err)
}
})
}

type MOCK_MessagingService_TelemetryClient struct {
trace []string
}

// CloseSend implements v2.MessagingService_TelemetryClient
func (mt *MOCK_MessagingService_TelemetryClient) CloseSend() error {
mt.trace = append(mt.trace, "closesend")
return nil
}

// Context implements v2.MessagingService_TelemetryClient
func (mt *MOCK_MessagingService_TelemetryClient) Context() context.Context {
mt.trace = append(mt.trace, "context")
return nil
}

// Header implements v2.MessagingService_TelemetryClient
func (mt *MOCK_MessagingService_TelemetryClient) Header() (metadata.MD, error) {
mt.trace = append(mt.trace, "header")
return nil, nil
}

// RecvMsg implements v2.MessagingService_TelemetryClient
func (mt *MOCK_MessagingService_TelemetryClient) RecvMsg(m interface{}) error {
mt.trace = append(mt.trace, "recvmsg")
return nil
}

// SendMsg implements v2.MessagingService_TelemetryClient
func (mt *MOCK_MessagingService_TelemetryClient) SendMsg(m interface{}) error {
mt.trace = append(mt.trace, "sendmsg")
return nil
}

// Trailer implements v2.MessagingService_TelemetryClient
func (mt *MOCK_MessagingService_TelemetryClient) Trailer() metadata.MD {
mt.trace = append(mt.trace, "trailer")
return nil
}

// Recv implements v2.MessagingService_TelemetryClient
func (mt *MOCK_MessagingService_TelemetryClient) Recv() (*v2.TelemetryCommand, error) {
mt.trace = append(mt.trace, "recv")
return nil, io.EOF
}

// Send implements v2.MessagingService_TelemetryClient
func (mt *MOCK_MessagingService_TelemetryClient) Send(*v2.TelemetryCommand) error {
mt.trace = append(mt.trace, "send")
return nil
}

var _ = v2.MessagingService_TelemetryClient(&MOCK_MessagingService_TelemetryClient{})
2 changes: 1 addition & 1 deletion golang/simple_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (sc *defaultSimpleConsumer) receiveMessage(ctx context.Context, request *v2
break
}
if err != nil {
sc.cli.log.Errorf("simpleConsumer recv msg err=%w, requestId=%s", err, utils.GetRequestID(ctx))
sc.cli.log.Errorf("simpleConsumer recv msg err=%v, requestId=%s", err, utils.GetRequestID(ctx))
break
}
sugarBaseLogger.Debugf("receiveMessage response: %v", resp)
Expand Down