Skip to content

Commit

Permalink
fix telemeter logic (#348)
Browse files Browse the repository at this point in the history
Co-authored-by: guyinyou <[email protected]>
  • Loading branch information
guyinyou and guyinyou committed Feb 3, 2023
1 parent 83493ea commit 70d4382
Showing 1 changed file with 22 additions and 36 deletions.
58 changes: 22 additions & 36 deletions golang/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,22 @@ func (cli *defaultClient) getMessageQueues(ctx context.Context, topic string) ([
if err != nil {
return nil, err
}

// telemeter to all messageQueues
endpointsSet := make(map[string]bool)
for _, messageQueue := range route {
for _, address := range messageQueue.GetBroker().GetEndpoints().GetAddresses() {
target := utils.ParseAddress(address)
if _, ok := endpointsSet[target]; ok {
continue
}
endpointsSet[target] = true
if err = cli.mustSyncSettingsToTargert(target); err != nil {
return nil, err
}
}
}

cli.router.Store(topic, route)
return route, nil
}
Expand Down Expand Up @@ -299,14 +315,7 @@ func (cli *defaultClient) getQueryRouteRequest(topic string) *v2.QueryRouteReque
func (cli *defaultClient) getTotalTargets() []string {
endpoints := make([]string, 0)
endpointsSet := make(map[string]bool)
for _, address := range cli.accessPoint.GetAddresses() {
target := utils.ParseAddress(address)
if _, ok := endpointsSet[target]; ok {
continue
}
endpointsSet[target] = true
endpoints = append(endpoints, target)
}

cli.router.Range(func(_, v interface{}) bool {
messageQueues := v.([]*v2.MessageQueue)
for _, messageQueue := range messageQueues {
Expand Down Expand Up @@ -388,16 +397,9 @@ func (cli *defaultClient) trySyncSettings() {
}
}

func (cli *defaultClient) mustSyncSettings() error {
cli.log.Info("start mustSyncSettings")
func (cli *defaultClient) mustSyncSettingsToTargert(target string) error {
command := cli.getSettingsCommand()
targets := cli.getTotalTargets()
for _, target := range targets {
if err := cli.telemeter(target, command); err != nil {
return err
}
}
return nil
return cli.telemeter(target, command)
}

func (cli *defaultClient) telemeter(target string, command *v2.TelemetryCommand) error {
Expand All @@ -423,27 +425,11 @@ func (cli *defaultClient) startUp() error {
cm.RegisterClient(cli)
cli.clientManager = cm
for _, topic := range cli.initTopics {
maxAttempts := int(cli.settings.GetRetryPolicy().GetMaxAttempts())
for i := 0; i < maxAttempts; i++ {
_, err := cli.getMessageQueues(context.Background(), topic)
if err != nil {
if i == maxAttempts-1 {
return fmt.Errorf("failed to get topic route data result from remote during client startup, clientId=%s, topics=%v, err=%v", cli.clientID, cli.initTopics, err)
} else {
cli.log.Errorf("failed to get topic route data result from remote during client startup, topics=%v, err=%v. retry attempt=%d", cli.initTopics, err, i)
time.Sleep(time.Second * 3)
}
} else {
if i > 0 {
cli.log.Infof("retry to get topic route data success, attempts=%d\n", i)
}
break
}
_, err := cli.getMessageQueues(context.Background(), topic)
if err != nil {
return fmt.Errorf("failed to get topic route data result from remote during client startup, clientId=%s, topics=%v, err=%v", cli.clientID, cli.initTopics, err)
}
}
if err := cli.mustSyncSettings(); err != nil {
return err
}
f := func() {
cli.router.Range(func(k, v interface{}) bool {
topic := k.(string)
Expand Down

0 comments on commit 70d4382

Please sign in to comment.