Skip to content

Commit

Permalink
try to recover on server TelemetryCommand transmission error (#408)
Browse files Browse the repository at this point in the history
* try to recover on server TelemetryCommand transmission error

* refactor modyfying the recovery state

* write tests for auxilary functions

* remove binary files

* add .test files to gitignore

* refactor tests

* write the rest of  tests

* move Go .gitignore to global .gitignore
  • Loading branch information
Anakin100100 committed Mar 20, 2023
1 parent 514056c commit 550f969
Show file tree
Hide file tree
Showing 6 changed files with 323 additions and 26 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,6 @@ rust/src/pb/*.rs
composer.phar
composer.lock
vendor/

# Go
*.tests
109 changes: 89 additions & 20 deletions golang/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,13 @@ type isClient interface {
onVerifyMessageCommand(endpoints *v2.Endpoints, command *v2.VerifyMessageCommand) error
}
type defaultClientSession struct {
endpoints *v2.Endpoints
observer v2.MessagingService_TelemetryClient
observerLock sync.RWMutex
cli *defaultClient
timeout time.Duration
endpoints *v2.Endpoints
observer v2.MessagingService_TelemetryClient
observerLock sync.RWMutex
cli *defaultClient
timeout time.Duration
recovering bool
recoveryWaitTime time.Duration `default:"5s"`
}

func NewDefaultClientSession(target string, cli *defaultClient) (*defaultClientSession, error) {
Expand All @@ -64,36 +66,79 @@ func NewDefaultClientSession(target string, cli *defaultClient) (*defaultClientS
return nil, err
}
cs := &defaultClientSession{
endpoints: endpoints,
cli: cli,
timeout: 365 * 24 * time.Hour,
endpoints: endpoints,
cli: cli,
timeout: 365 * 24 * time.Hour,
recovering: false,
}
cs.startUp()
return cs, nil
}

func (cs *defaultClientSession) _acquire_observer() (v2.MessagingService_TelemetryClient, bool) {
cs.observerLock.RLock()
observer := cs.observer
cs.observerLock.RUnlock()

if observer == nil {
time.Sleep(time.Second)
return nil, false
} else {
return observer, true
}

}

func (cs *defaultClientSession) _execute_server_telemetry_command(command *v2.TelemetryCommand) {
err := cs.handleTelemetryCommand(command)
if err != nil {
cs.cli.log.Errorf("telemetryCommand recv err=%w", err)
} else {
cs.cli.log.Info("Executed command successfully")
}
}

func (cs *defaultClientSession) startUp() {
cs.cli.log.Infof("defaultClientSession is startUp! endpoints=%v", cs.endpoints)
go func() {
for {
cs.observerLock.RLock()
observer := cs.observer
cs.observerLock.RUnlock()

if observer == nil {
time.Sleep(time.Second)
// ensure that observer is present, if not wait for it to be regenerated on publish.
observer, acquired_observer := cs._acquire_observer()
if !acquired_observer {
continue
}

response, err := observer.Recv()
if err != nil {
cs.release()

cs.cli.log.Errorf("telemetryCommand recv err=%w", err)
// we are recovering
if !cs.recovering {
cs.cli.log.Info("Encountered error while receiving TelemetryCommand, trying to recover")
// we wait five seconds to give time for the transmission error to be resolved externally before we attempt to read the message again.
time.Sleep(cs.recoveryWaitTime)
cs.recovering = true
} else {
// we are recovering but we failed to read the message again, resetting observer
cs.cli.log.Info("Failed to recover, err=%w", err)
cs.release()
cs.recovering = false
}
continue
}
err = cs.handleTelemetryCommand(response)
if err != nil {
cs.cli.log.Errorf("telemetryCommand recv err=%w", err)
// at this point we received the message and must confirm that the sender is healthy
if cs.recovering {
// we don't know which server sent the request so we must check that each of the servers is healthy.
// we assume that the list of the servers hasn't changed, so the server that sent the message is still present.
hearbeat_response, err := cs.cli.clientManager.HeartBeat(context.TODO(), cs.endpoints, &v2.HeartbeatRequest{}, 10*time.Second)
if err == nil && hearbeat_response.Status.Code == v2.Code_OK {
cs.cli.log.Info("Managed to recover, executing message")
cs._execute_server_telemetry_command(response)
} else {
cs.cli.log.Errorf("Failed to recover, Some of the servers are unhealthy, Heartbeat err=%w", err)
cs.release()
}
cs.recovering = false
}
cs._execute_server_telemetry_command(response)
}
}()
}
Expand Down Expand Up @@ -199,6 +244,30 @@ var NewClient = func(config *Config, opts ...ClientOption) (Client, error) {
return cli, nil
}

var NewClientConcrete = func(config *Config, opts ...ClientOption) (*defaultClient, error) {
endpoints, err := utils.ParseTarget(config.Endpoint)
if err != nil {
return nil, err
}
cli := &defaultClient{
config: config,
opts: defaultNSOptions,
clientID: utils.GenClientID(),
accessPoint: endpoints,
messageInterceptors: make([]MessageInterceptor, 0),
endpointsTelemetryClientTable: make(map[string]*defaultClientSession),
on: *atomic.NewBool(true),
clientManager: &MockClientManager{},
}
cli.log = sugarBaseLogger.With("client_id", cli.clientID)
for _, opt := range opts {
opt.apply(&cli.opts)
}
cli.done = make(chan struct{}, 1)
cli.clientMeterProvider = NewDefaultClientMeterProvider(cli)
return cli, nil
}

func (cli *defaultClient) GetClientID() string {
return cli.clientID
}
Expand Down
1 change: 0 additions & 1 deletion golang/client_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ func (cm *defaultClientManager) deleteRpcClient(rpcClient RpcClient) {
}

func (cm *defaultClientManager) clearIdleRpcClients() {
sugarBaseLogger.Info("clientManager start clearIdleRpcClients")
cm.rpcClientTableLock.Lock()
defer cm.rpcClientTableLock.Unlock()
for target, rpcClient := range cm.rpcClientTable {
Expand Down
17 changes: 15 additions & 2 deletions golang/client_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ var MOCK_CLIENT *MockClient
var MOCK_RPC_CLIENT *MockRpcClient

type MOCK_MessagingService_TelemetryClient struct {
trace []string
trace []string
recv_error_count int `default:"0"`
cli *defaultClient `default:"nil"`
}

// CloseSend implements v2.MessagingService_TelemetryClient
Expand Down Expand Up @@ -80,7 +82,18 @@ func (mt *MOCK_MessagingService_TelemetryClient) Trailer() metadata.MD {
// Recv implements v2.MessagingService_TelemetryClient
func (mt *MOCK_MessagingService_TelemetryClient) Recv() (*v2.TelemetryCommand, error) {
mt.trace = append(mt.trace, "recv")
return nil, io.EOF
if mt.recv_error_count >= 1 {
mt.recv_error_count -= 1
return nil, io.EOF
} else {
if mt.cli == nil {
return nil, io.EOF
} else {
time.Sleep(time.Second)
command := mt.cli.getSettingsCommand()
return command, nil
}
}
}

// Send implements v2.MessagingService_TelemetryClient
Expand Down
Loading

0 comments on commit 550f969

Please sign in to comment.