diff --git a/.gitignore b/.gitignore index 924f09fb5..4959de17d 100644 --- a/.gitignore +++ b/.gitignore @@ -41,3 +41,6 @@ rust/src/pb/*.rs composer.phar composer.lock vendor/ + +# Go +*.tests diff --git a/golang/client.go b/golang/client.go index 2b95f2bcb..313092ee5 100644 --- a/golang/client.go +++ b/golang/client.go @@ -51,11 +51,13 @@ type isClient interface { onVerifyMessageCommand(endpoints *v2.Endpoints, command *v2.VerifyMessageCommand) error } type defaultClientSession struct { - endpoints *v2.Endpoints - observer v2.MessagingService_TelemetryClient - observerLock sync.RWMutex - cli *defaultClient - timeout time.Duration + endpoints *v2.Endpoints + observer v2.MessagingService_TelemetryClient + observerLock sync.RWMutex + cli *defaultClient + timeout time.Duration + recovering bool + recoveryWaitTime time.Duration `default:"5s"` } func NewDefaultClientSession(target string, cli *defaultClient) (*defaultClientSession, error) { @@ -64,36 +66,79 @@ func NewDefaultClientSession(target string, cli *defaultClient) (*defaultClientS return nil, err } cs := &defaultClientSession{ - endpoints: endpoints, - cli: cli, - timeout: 365 * 24 * time.Hour, + endpoints: endpoints, + cli: cli, + timeout: 365 * 24 * time.Hour, + recovering: false, } cs.startUp() return cs, nil } + +func (cs *defaultClientSession) _acquire_observer() (v2.MessagingService_TelemetryClient, bool) { + cs.observerLock.RLock() + observer := cs.observer + cs.observerLock.RUnlock() + + if observer == nil { + time.Sleep(time.Second) + return nil, false + } else { + return observer, true + } + +} + +func (cs *defaultClientSession) _execute_server_telemetry_command(command *v2.TelemetryCommand) { + err := cs.handleTelemetryCommand(command) + if err != nil { + cs.cli.log.Errorf("telemetryCommand recv err=%w", err) + } else { + cs.cli.log.Info("Executed command successfully") + } +} + func (cs *defaultClientSession) startUp() { cs.cli.log.Infof("defaultClientSession is startUp! endpoints=%v", cs.endpoints) go func() { for { - cs.observerLock.RLock() - observer := cs.observer - cs.observerLock.RUnlock() - - if observer == nil { - time.Sleep(time.Second) + // ensure that observer is present, if not wait for it to be regenerated on publish. + observer, acquired_observer := cs._acquire_observer() + if !acquired_observer { continue } + response, err := observer.Recv() if err != nil { - cs.release() - - cs.cli.log.Errorf("telemetryCommand recv err=%w", err) + // we are recovering + if !cs.recovering { + cs.cli.log.Info("Encountered error while receiving TelemetryCommand, trying to recover") + // we wait five seconds to give time for the transmission error to be resolved externally before we attempt to read the message again. + time.Sleep(cs.recoveryWaitTime) + cs.recovering = true + } else { + // we are recovering but we failed to read the message again, resetting observer + cs.cli.log.Info("Failed to recover, err=%w", err) + cs.release() + cs.recovering = false + } continue } - err = cs.handleTelemetryCommand(response) - if err != nil { - cs.cli.log.Errorf("telemetryCommand recv err=%w", err) + // at this point we received the message and must confirm that the sender is healthy + if cs.recovering { + // we don't know which server sent the request so we must check that each of the servers is healthy. + // we assume that the list of the servers hasn't changed, so the server that sent the message is still present. + hearbeat_response, err := cs.cli.clientManager.HeartBeat(context.TODO(), cs.endpoints, &v2.HeartbeatRequest{}, 10*time.Second) + if err == nil && hearbeat_response.Status.Code == v2.Code_OK { + cs.cli.log.Info("Managed to recover, executing message") + cs._execute_server_telemetry_command(response) + } else { + cs.cli.log.Errorf("Failed to recover, Some of the servers are unhealthy, Heartbeat err=%w", err) + cs.release() + } + cs.recovering = false } + cs._execute_server_telemetry_command(response) } }() } @@ -199,6 +244,30 @@ var NewClient = func(config *Config, opts ...ClientOption) (Client, error) { return cli, nil } +var NewClientConcrete = func(config *Config, opts ...ClientOption) (*defaultClient, error) { + endpoints, err := utils.ParseTarget(config.Endpoint) + if err != nil { + return nil, err + } + cli := &defaultClient{ + config: config, + opts: defaultNSOptions, + clientID: utils.GenClientID(), + accessPoint: endpoints, + messageInterceptors: make([]MessageInterceptor, 0), + endpointsTelemetryClientTable: make(map[string]*defaultClientSession), + on: *atomic.NewBool(true), + clientManager: &MockClientManager{}, + } + cli.log = sugarBaseLogger.With("client_id", cli.clientID) + for _, opt := range opts { + opt.apply(&cli.opts) + } + cli.done = make(chan struct{}, 1) + cli.clientMeterProvider = NewDefaultClientMeterProvider(cli) + return cli, nil +} + func (cli *defaultClient) GetClientID() string { return cli.clientID } diff --git a/golang/client_manager.go b/golang/client_manager.go index 44e71a14b..e3e386643 100644 --- a/golang/client_manager.go +++ b/golang/client_manager.go @@ -137,7 +137,6 @@ func (cm *defaultClientManager) deleteRpcClient(rpcClient RpcClient) { } func (cm *defaultClientManager) clearIdleRpcClients() { - sugarBaseLogger.Info("clientManager start clearIdleRpcClients") cm.rpcClientTableLock.Lock() defer cm.rpcClientTableLock.Unlock() for target, rpcClient := range cm.rpcClientTable { diff --git a/golang/client_manager_test.go b/golang/client_manager_test.go index 6d794c81e..d61582887 100644 --- a/golang/client_manager_test.go +++ b/golang/client_manager_test.go @@ -38,7 +38,9 @@ var MOCK_CLIENT *MockClient var MOCK_RPC_CLIENT *MockRpcClient type MOCK_MessagingService_TelemetryClient struct { - trace []string + trace []string + recv_error_count int `default:"0"` + cli *defaultClient `default:"nil"` } // CloseSend implements v2.MessagingService_TelemetryClient @@ -80,7 +82,18 @@ func (mt *MOCK_MessagingService_TelemetryClient) Trailer() metadata.MD { // Recv implements v2.MessagingService_TelemetryClient func (mt *MOCK_MessagingService_TelemetryClient) Recv() (*v2.TelemetryCommand, error) { mt.trace = append(mt.trace, "recv") - return nil, io.EOF + if mt.recv_error_count >= 1 { + mt.recv_error_count -= 1 + return nil, io.EOF + } else { + if mt.cli == nil { + return nil, io.EOF + } else { + time.Sleep(time.Second) + command := mt.cli.getSettingsCommand() + return command, nil + } + } } // Send implements v2.MessagingService_TelemetryClient diff --git a/golang/client_test.go b/golang/client_test.go index 5265a7630..41079ccdb 100644 --- a/golang/client_test.go +++ b/golang/client_test.go @@ -18,15 +18,88 @@ package golang import ( + "context" "fmt" "testing" "time" "github.com/apache/rocketmq-clients/golang/credentials" + v2 "github.com/apache/rocketmq-clients/golang/protocol/v2" gomock "github.com/golang/mock/gomock" "github.com/prashantv/gostub" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" ) +func BuildCLient(t *testing.T) *defaultClient { + stubs := gostub.Stub(&defaultClientManagerOptions, clientManagerOptions{ + RPC_CLIENT_MAX_IDLE_DURATION: time.Second, + + RPC_CLIENT_IDLE_CHECK_INITIAL_DELAY: time.Hour, + RPC_CLIENT_IDLE_CHECK_PERIOD: time.Hour, + + HEART_BEAT_INITIAL_DELAY: time.Hour, + HEART_BEAT_PERIOD: time.Hour, + + LOG_STATS_INITIAL_DELAY: time.Hour, + LOG_STATS_PERIOD: time.Hour, + + SYNC_SETTINGS_DELAY: time.Hour, + SYNC_SETTINGS_PERIOD: time.Hour, + }) + + stubs2 := gostub.Stub(&NewRpcClient, func(target string, opts ...RpcClientOption) (RpcClient, error) { + if target == fakeAddress { + return MOCK_RPC_CLIENT, nil + } + return nil, fmt.Errorf("invalid target=%s", target) + }) + + defer func() { + stubs.Reset() + stubs2.Reset() + }() + + MOCK_RPC_CLIENT.EXPECT().Telemetry(gomock.Any()).Return(&MOCK_MessagingService_TelemetryClient{ + trace: make([]string, 0), + }, nil) + + endpoints := fmt.Sprintf("%s:%d", fakeHost, fakePort) + cli, err := NewClientConcrete(&Config{ + Endpoint: endpoints, + Credentials: &credentials.SessionCredentials{}, + }) + if err != nil { + t.Error(err) + } + sugarBaseLogger.Info(cli) + err = cli.startUp() + if err != nil { + t.Error(err) + } + + return cli +} + +func GetClientAndDefaultClientSession(t *testing.T) (*defaultClient, *defaultClientSession) { + cli := BuildCLient(t) + default_cli_session, err := cli.getDefaultClientSession(fakeAddress) + if err != nil { + t.Error(err) + } + return cli, default_cli_session +} + +func PrepareTestLogger(cli *defaultClient) *observer.ObservedLogs { + observedZapCore, observedLogs := observer.New(zap.InfoLevel) + observedLogger := zap.New(observedZapCore) + cli.log = observedLogger.Sugar() + + return observedLogs +} + func TestCLINewClient(t *testing.T) { stubs := gostub.Stub(&defaultClientManagerOptions, clientManagerOptions{ RPC_CLIENT_MAX_IDLE_DURATION: time.Second, @@ -74,3 +147,145 @@ func TestCLINewClient(t *testing.T) { t.Error(err) } } + +func Test_acquire_observer_uninitialized(t *testing.T) { + // given + _, default_cli_session := GetClientAndDefaultClientSession(t) + + // when + observer, acquired_observer := default_cli_session._acquire_observer() + + // then + if acquired_observer { + t.Error("Acquired observer even though it is uninitialized") + } + if observer != nil { + t.Error("Observer should be nil") + } +} + +func Test_acquire_observer_initialized(t *testing.T) { + // given + _, default_cli_session := GetClientAndDefaultClientSession(t) + default_cli_session.publish(context.TODO(), &v2.TelemetryCommand{}) + + // when + observer, acquired_observer := default_cli_session._acquire_observer() + + // then + if !acquired_observer { + t.Error("Failed to acquire observer even though it is uninitialized") + } + if observer == nil { + t.Error("Observer should be not nil") + } +} + +func Test_execute_server_telemetry_command_fail(t *testing.T) { + // given + cli, default_cli_session := GetClientAndDefaultClientSession(t) + default_cli_session.publish(context.TODO(), &v2.TelemetryCommand{}) + observedLogs := PrepareTestLogger(cli) + + // when + default_cli_session._execute_server_telemetry_command(&v2.TelemetryCommand{}) + + // then + require.Equal(t, 1, observedLogs.Len()) + commandExecutionLog := observedLogs.All()[0] + assert.Equal(t, "telemetryCommand recv err=%!w(*errors.errorString=&{handleTelemetryCommand err = Command is nil})", commandExecutionLog.Message) +} + +func Test_execute_server_telemetry_command(t *testing.T) { + // given + cli, default_cli_session := GetClientAndDefaultClientSession(t) + default_cli_session.publish(context.TODO(), &v2.TelemetryCommand{}) + observedLogs := PrepareTestLogger(cli) + + // when + default_cli_session._execute_server_telemetry_command(&v2.TelemetryCommand{Command: &v2.TelemetryCommand_RecoverOrphanedTransactionCommand{}}) + + // then + require.Equal(t, 2, observedLogs.Len()) + commandExecutionLog := observedLogs.All()[1] + assert.Equal(t, "Executed command successfully", commandExecutionLog.Message) +} + +func TestRestoreDefaultClientSessionZeroErrors(t *testing.T) { + // given + cli := BuildCLient(t) + default_cli_session, err := cli.getDefaultClientSession(fakeAddress) + if err != nil { + t.Error(err) + } + default_cli_session.publish(context.TODO(), &v2.TelemetryCommand{}) + observedLogs := PrepareTestLogger(cli) + default_cli_session.observer = &MOCK_MessagingService_TelemetryClient{ + recv_error_count: 0, + cli: cli, + } + default_cli_session.recoveryWaitTime = time.Second + cli.settings = &simpleConsumerSettings{} + + // when + // we wait some time while consumer goroutine runs + time.Sleep(3 * time.Second) + + // then + commandExecutionLog := observedLogs.All()[:2] + assert.Equal(t, "Executed command successfully", commandExecutionLog[0].Message) + assert.Equal(t, "Executed command successfully", commandExecutionLog[1].Message) +} + +func TestRestoreDefaultClientSessionOneError(t *testing.T) { + // given + cli := BuildCLient(t) + default_cli_session, err := cli.getDefaultClientSession(fakeAddress) + if err != nil { + t.Error(err) + } + default_cli_session.publish(context.TODO(), &v2.TelemetryCommand{}) + observedLogs := PrepareTestLogger(cli) + default_cli_session.observer = &MOCK_MessagingService_TelemetryClient{ + recv_error_count: 1, + cli: cli, + } + default_cli_session.recoveryWaitTime = time.Second + cli.settings = &simpleConsumerSettings{} + + // when + // we wait some time while consumer goroutine runs + time.Sleep(3 * time.Second) + + // then + commandExecutionLog := observedLogs.All()[:3] + assert.Equal(t, "Encountered error while receiving TelemetryCommand, trying to recover", commandExecutionLog[0].Message) + assert.Equal(t, "Managed to recover, executing message", commandExecutionLog[1].Message) + assert.Equal(t, "Executed command successfully", commandExecutionLog[2].Message) +} + +func TestRestoreDefaultClientSessionTwoErrors(t *testing.T) { + // given + cli := BuildCLient(t) + default_cli_session, err := cli.getDefaultClientSession(fakeAddress) + if err != nil { + t.Error(err) + } + default_cli_session.publish(context.TODO(), &v2.TelemetryCommand{}) + observedLogs := PrepareTestLogger(cli) + default_cli_session.observer = &MOCK_MessagingService_TelemetryClient{ + recv_error_count: 2, + cli: cli, + } + default_cli_session.recoveryWaitTime = time.Second + cli.settings = &simpleConsumerSettings{} + + // when + // we wait some time while consumer goroutine runs + time.Sleep(3 * time.Second) + + // then + commandExecutionLog := observedLogs.All()[:2] + assert.Equal(t, "Encountered error while receiving TelemetryCommand, trying to recover", commandExecutionLog[0].Message) + assert.Equal(t, "Failed to recover, err=%wEOF", commandExecutionLog[1].Message) +} diff --git a/golang/rpc_client_mock.go b/golang/rpc_client_mock.go index 0cd2372cf..fb2ddf8a4 100644 --- a/golang/rpc_client_mock.go +++ b/golang/rpc_client_mock.go @@ -215,9 +215,7 @@ func (mr *MockRpcClientMockRecorder) Telemetry(ctx interface{}) *gomock.Call { // idleDuration mocks base method. func (m *MockRpcClient) idleDuration() time.Duration { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "idleDuration") - ret0, _ := ret[0].(time.Duration) - return ret0 + return time.Hour } // idleDuration indicates an expected call of idleDuration.