Skip to content

Commit

Permalink
golang: fast to the ok state (#295)
Browse files Browse the repository at this point in the history
* golang: fast to the ok state

* fix utest

Co-authored-by: guyinyou <[email protected]>
  • Loading branch information
2 people authored and lizhanhui committed Jan 10, 2023
1 parent afda894 commit cb19057
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 82 deletions.
12 changes: 10 additions & 2 deletions golang/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,14 @@ func (cli *defaultClient) getQueryRouteRequest(topic string) *v2.QueryRouteReque
func (cli *defaultClient) getTotalTargets() []string {
endpoints := make([]string, 0)
endpointsSet := make(map[string]bool)
for _, address := range cli.accessPoint.GetAddresses() {
target := utils.ParseAddress(address)
if _, ok := endpointsSet[target]; ok {
continue
}
endpointsSet[target] = true
endpoints = append(endpoints, target)
}
cli.router.Range(func(_, v interface{}) bool {
messageQueues := v.([]*v2.MessageQueue)
for _, messageQueue := range messageQueues {
Expand Down Expand Up @@ -372,7 +380,7 @@ func (cli *defaultClient) Heartbeat() {
}

func (cli *defaultClient) trySyncSettings() {
cli.log.Info("start syncSetting")
cli.log.Info("start trySyncSettings")
command := cli.getSettingsCommand()
targets := cli.getTotalTargets()
for _, target := range targets {
Expand All @@ -381,7 +389,7 @@ func (cli *defaultClient) trySyncSettings() {
}

func (cli *defaultClient) mustSyncSettings() error {
cli.log.Info("start syncSetting")
cli.log.Info("start mustSyncSettings")
command := cli.getSettingsCommand()
targets := cli.getTotalTargets()
for _, target := range targets {
Expand Down
57 changes: 57 additions & 0 deletions golang/client_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ package golang
import (
"context"
"fmt"
"io"
"os"
"testing"
"time"

v2 "github.com/apache/rocketmq-clients/golang/protocol/v2"
gomock "github.com/golang/mock/gomock"
"github.com/prashantv/gostub"
"google.golang.org/grpc/metadata"
)

var MOCK_CLIENT_ID = "mock_client_id"
Expand All @@ -35,6 +37,60 @@ var MOCK_GROUP = "mock_group"
var MOCK_CLIENT *MockClient
var MOCK_RPC_CLIENT *MockRpcClient

type MOCK_MessagingService_TelemetryClient struct {
trace []string
}

// CloseSend implements v2.MessagingService_TelemetryClient
func (mt *MOCK_MessagingService_TelemetryClient) CloseSend() error {
mt.trace = append(mt.trace, "closesend")
return nil
}

// Context implements v2.MessagingService_TelemetryClient
func (mt *MOCK_MessagingService_TelemetryClient) Context() context.Context {
mt.trace = append(mt.trace, "context")
return nil
}

// Header implements v2.MessagingService_TelemetryClient
func (mt *MOCK_MessagingService_TelemetryClient) Header() (metadata.MD, error) {
mt.trace = append(mt.trace, "header")
return nil, nil
}

// RecvMsg implements v2.MessagingService_TelemetryClient
func (mt *MOCK_MessagingService_TelemetryClient) RecvMsg(m interface{}) error {
mt.trace = append(mt.trace, "recvmsg")
return nil
}

// SendMsg implements v2.MessagingService_TelemetryClient
func (mt *MOCK_MessagingService_TelemetryClient) SendMsg(m interface{}) error {
mt.trace = append(mt.trace, "sendmsg")
return nil
}

// Trailer implements v2.MessagingService_TelemetryClient
func (mt *MOCK_MessagingService_TelemetryClient) Trailer() metadata.MD {
mt.trace = append(mt.trace, "trailer")
return nil
}

// Recv implements v2.MessagingService_TelemetryClient
func (mt *MOCK_MessagingService_TelemetryClient) Recv() (*v2.TelemetryCommand, error) {
mt.trace = append(mt.trace, "recv")
return nil, io.EOF
}

// Send implements v2.MessagingService_TelemetryClient
func (mt *MOCK_MessagingService_TelemetryClient) Send(*v2.TelemetryCommand) error {
mt.trace = append(mt.trace, "send")
return nil
}

var _ = v2.MessagingService_TelemetryClient(&MOCK_MessagingService_TelemetryClient{})

func TestMain(m *testing.M) {
os.Setenv("mq.consoleAppender.enabled", "true")
ResetLogger()
Expand All @@ -50,6 +106,7 @@ func TestMain(m *testing.M) {
Code: v2.Code_OK,
},
}, nil).AnyTimes()

MOCK_RPC_CLIENT.EXPECT().GracefulStop().Return(nil).AnyTimes()
MOCK_RPC_CLIENT.EXPECT().GetTarget().Return(fakeAddresss).AnyTimes()
stubs := gostub.Stub(&NewRpcClient, func(target string, opts ...RpcClientOption) (RpcClient, error) {
Expand Down
35 changes: 35 additions & 0 deletions golang/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,46 @@ package golang
import (
"fmt"
"testing"
"time"

"github.com/apache/rocketmq-clients/golang/credentials"
gomock "github.com/golang/mock/gomock"
"github.com/prashantv/gostub"
)

func TestCLINewClient(t *testing.T) {
stubs := gostub.Stub(&defaultClientManagerOptions, clientManagerOptions{
RPC_CLIENT_MAX_IDLE_DURATION: time.Second,

RPC_CLIENT_IDLE_CHECK_INITIAL_DELAY: time.Hour,
RPC_CLIENT_IDLE_CHECK_PERIOD: time.Hour,

HEART_BEAT_INITIAL_DELAY: time.Hour,
HEART_BEAT_PERIOD: time.Hour,

LOG_STATS_INITIAL_DELAY: time.Hour,
LOG_STATS_PERIOD: time.Hour,

SYNC_SETTINGS_DELAY: time.Hour,
SYNC_SETTINGS_PERIOD: time.Hour,
})

stubs2 := gostub.Stub(&NewRpcClient, func(target string, opts ...RpcClientOption) (RpcClient, error) {
if target == fakeAddresss {
return MOCK_RPC_CLIENT, nil
}
return nil, fmt.Errorf("invalid target=%s", target)
})

defer func() {
stubs.Reset()
stubs2.Reset()
}()

MOCK_RPC_CLIENT.EXPECT().Telemetry(gomock.Any()).Return(&MOCK_MessagingService_TelemetryClient{
trace: make([]string, 0),
}, nil)

endpoints := fmt.Sprintf("%s:%d", fakeHost, fakePort)
cli, err := NewClient(&Config{
Endpoint: endpoints,
Expand Down
104 changes: 25 additions & 79 deletions golang/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,13 @@ package golang
import (
"context"
"fmt"
"io"
"testing"
"time"

"github.com/apache/rocketmq-clients/golang/credentials"
v2 "github.com/apache/rocketmq-clients/golang/protocol/v2"
gomock "github.com/golang/mock/gomock"
"github.com/prashantv/gostub"
"google.golang.org/grpc/metadata"
)

func TestProducer(t *testing.T) {
Expand All @@ -47,7 +45,22 @@ func TestProducer(t *testing.T) {
SYNC_SETTINGS_DELAY: time.Hour,
SYNC_SETTINGS_PERIOD: time.Hour,
})
defer stubs.Reset()

stubs2 := gostub.Stub(&NewRpcClient, func(target string, opts ...RpcClientOption) (RpcClient, error) {
if target == fakeAddresss {
return MOCK_RPC_CLIENT, nil
}
return nil, fmt.Errorf("invalid target=%s", target)
})

defer func() {
stubs.Reset()
stubs2.Reset()
}()

MOCK_RPC_CLIENT.EXPECT().Telemetry(gomock.Any()).Return(&MOCK_MessagingService_TelemetryClient{
trace: make([]string, 0),
}, nil).AnyTimes()

endpoints := fmt.Sprintf("%s:%d", fakeHost, fakePort)
p, err := NewProducer(&Config{
Expand All @@ -72,7 +85,7 @@ func TestProducer(t *testing.T) {
v2.MessageType_TRANSACTION,
},
}},
}, nil)
}, nil).AnyTimes()
err = p.Start()
if err != nil {
t.Error(err)
Expand All @@ -88,7 +101,7 @@ func TestProducer(t *testing.T) {
Code: v2.Code_OK,
},
Entries: []*v2.SendResultEntry{{}},
}, nil)
}, nil).AnyTimes()

_, err := p.Send(context.TODO(), msg)
if err != nil {
Expand All @@ -101,7 +114,7 @@ func TestProducer(t *testing.T) {
Code: v2.Code_OK,
},
Entries: []*v2.SendResultEntry{{}},
}, nil)
}, nil).AnyTimes()

done := make(chan bool)
p.SendAsync(context.TODO(), msg, func(ctx context.Context, sr []*SendReceipt, err error) {
Expand All @@ -118,12 +131,12 @@ func TestProducer(t *testing.T) {
Code: v2.Code_OK,
},
Entries: []*v2.SendResultEntry{{}},
}, nil)
}, nil).AnyTimes()
MOCK_RPC_CLIENT.EXPECT().EndTransaction(gomock.Any(), gomock.Any()).Return(&v2.EndTransactionResponse{
Status: &v2.Status{
Code: v2.Code_OK,
},
}, nil)
}, nil).AnyTimes()

transaction := p.BeginTransaction()
_, err := p.SendWithTransaction(context.TODO(), msg, transaction)
Expand All @@ -141,12 +154,12 @@ func TestProducer(t *testing.T) {
Code: v2.Code_OK,
},
Entries: []*v2.SendResultEntry{{}},
}, nil)
}, nil).AnyTimes()
MOCK_RPC_CLIENT.EXPECT().EndTransaction(gomock.Any(), gomock.Any()).Return(&v2.EndTransactionResponse{
Status: &v2.Status{
Code: v2.Code_OK,
},
}, nil)
}, nil).AnyTimes()

transaction := p.BeginTransaction()
_, err := p.SendWithTransaction(context.TODO(), msg, transaction)
Expand All @@ -164,7 +177,7 @@ func TestProducer(t *testing.T) {
Code: v2.Code_OK,
},
Entries: []*v2.SendResultEntry{{}},
}, nil)
}, nil).AnyTimes()
msg.SetMessageGroup(MOCK_GROUP)
defer func() { msg.messageGroup = nil }()
_, err := p.Send(context.TODO(), msg)
Expand All @@ -178,7 +191,7 @@ func TestProducer(t *testing.T) {
Code: v2.Code_OK,
},
Entries: []*v2.SendResultEntry{{}},
}, nil)
}, nil).AnyTimes()
msg.SetDelayTimestamp(time.Now().Add(time.Hour))
defer func() { msg.deliveryTimestamp = nil }()
_, err := p.Send(context.TODO(), msg)
Expand All @@ -204,77 +217,10 @@ func TestProducer(t *testing.T) {
t.Error(err)
}
})
t.Run("syncsettings", func(t *testing.T) {
mt := &MOCK_MessagingService_TelemetryClient{
trace: make([]string, 0),
}
MOCK_RPC_CLIENT.EXPECT().Telemetry(gomock.Any()).Return(mt, nil)
p.(*defaultProducer).cli.clientManager.(*defaultClientManager).syncSettings()
for {
time.Sleep(time.Duration(100))
if len(mt.trace) >= 3 && mt.trace[0] == "send" && mt.trace[1] == "recv" && mt.trace[2] == "closesend" {
break
}
}
})
t.Run("do heartbeat", func(t *testing.T) {
err := p.(*defaultProducer).cli.doHeartbeat(endpoints, nil)
if err != nil {
t.Error(err)
}
})
}

type MOCK_MessagingService_TelemetryClient struct {
trace []string
}

// CloseSend implements v2.MessagingService_TelemetryClient
func (mt *MOCK_MessagingService_TelemetryClient) CloseSend() error {
mt.trace = append(mt.trace, "closesend")
return nil
}

// Context implements v2.MessagingService_TelemetryClient
func (mt *MOCK_MessagingService_TelemetryClient) Context() context.Context {
mt.trace = append(mt.trace, "context")
return nil
}

// Header implements v2.MessagingService_TelemetryClient
func (mt *MOCK_MessagingService_TelemetryClient) Header() (metadata.MD, error) {
mt.trace = append(mt.trace, "header")
return nil, nil
}

// RecvMsg implements v2.MessagingService_TelemetryClient
func (mt *MOCK_MessagingService_TelemetryClient) RecvMsg(m interface{}) error {
mt.trace = append(mt.trace, "recvmsg")
return nil
}

// SendMsg implements v2.MessagingService_TelemetryClient
func (mt *MOCK_MessagingService_TelemetryClient) SendMsg(m interface{}) error {
mt.trace = append(mt.trace, "sendmsg")
return nil
}

// Trailer implements v2.MessagingService_TelemetryClient
func (mt *MOCK_MessagingService_TelemetryClient) Trailer() metadata.MD {
mt.trace = append(mt.trace, "trailer")
return nil
}

// Recv implements v2.MessagingService_TelemetryClient
func (mt *MOCK_MessagingService_TelemetryClient) Recv() (*v2.TelemetryCommand, error) {
mt.trace = append(mt.trace, "recv")
return nil, io.EOF
}

// Send implements v2.MessagingService_TelemetryClient
func (mt *MOCK_MessagingService_TelemetryClient) Send(*v2.TelemetryCommand) error {
mt.trace = append(mt.trace, "send")
return nil
}

var _ = v2.MessagingService_TelemetryClient(&MOCK_MessagingService_TelemetryClient{})
2 changes: 1 addition & 1 deletion golang/simple_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (sc *defaultSimpleConsumer) receiveMessage(ctx context.Context, request *v2
break
}
if err != nil {
sc.cli.log.Errorf("simpleConsumer recv msg err=%w, requestId=%s", err, utils.GetRequestID(ctx))
sc.cli.log.Errorf("simpleConsumer recv msg err=%v, requestId=%s", err, utils.GetRequestID(ctx))
break
}
sugarBaseLogger.Debugf("receiveMessage response: %v", resp)
Expand Down

0 comments on commit cb19057

Please sign in to comment.