Skip to content

Commit

Permalink
Fix warnings and errors with xds
Browse files Browse the repository at this point in the history
  • Loading branch information
ajroetker committed May 22, 2023
1 parent 57c7a59 commit 3449b2b
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 18 deletions.
5 changes: 4 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/lytics/retry"
etcdv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc"
grpcBackoff "google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
xdscreds "google.golang.org/grpc/credentials/xds"
Expand Down Expand Up @@ -289,7 +290,9 @@ func (c *Client) getCCLocked(ctx context.Context, nsReceiver string) (*clientAnd
c.cs.Inc(numGRPCDial)

// Dial the destination.
conn, err := grpc.Dial(address, grpc.WithTransportCredentials(c.creds), grpc.WithBackoffMaxDelay(20*time.Second))
conn, err := grpc.Dial(address, grpc.WithTransportCredentials(c.creds), grpc.WithConnectParams(grpc.ConnectParams{
Backoff: grpcBackoff.Config{MaxDelay: 20 * time.Second},
}))
if err != nil {
return nil, noID, err
}
Expand Down
22 changes: 8 additions & 14 deletions client_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,19 @@ func (c *Client) Check(ctx context.Context, peer string) (*healthpb.HealthCheckR
}

var resp *healthpb.HealthCheckResponse
retry.X(3, time.Second, func() bool {
_ = retry.XWithContext(ctx, 3, time.Second, func(ctx context.Context) error {
var client healthpb.HealthClient
client, _, err = c.getHealthClient(ctx, nsReceiver)
if err != nil {
return false
return nil
}

resp, err = client.Check(ctx, &healthpb.HealthCheckRequest{})
if err != nil {
if ctx.Err() != nil {
return false
}
return true
return err
}

return false
return nil
})
if err != nil {
return nil, fmt.Errorf("checking health: %w", err)
Expand All @@ -49,22 +46,19 @@ func (c *Client) Watch(ctx context.Context, peer string) (healthpb.Health_WatchC
}

var recv healthpb.Health_WatchClient
retry.X(3, time.Second, func() bool {
_ = retry.XWithContext(ctx, 3, time.Second, func(ctx context.Context) error {
var client healthpb.HealthClient
client, _, err = c.getHealthClient(ctx, nsReceiver)
if err != nil {
return false
return nil
}

recv, err = client.Watch(ctx, &healthpb.HealthCheckRequest{})
if err != nil {
if ctx.Err() != nil {
return false
}
return true
return err
}

return false
return nil
})
if err != nil {
return nil, fmt.Errorf("checking health: %w", err)
Expand Down
17 changes: 14 additions & 3 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type Server struct {
cancel func()
cfg ServerCfg
etcd *etcdv3.Client
grpc *xds.GRPCServer
grpc grpcServer
health *health.Server
stop sync.Once
fatalErr chan error
Expand All @@ -56,6 +56,12 @@ type Server struct {
creds credentials.TransportCredentials
}

type grpcServer interface {
RegisterService(desc *grpc.ServiceDesc, impl interface{})
Serve(lis net.Listener) error
Stop()
}

// NewServer for the grid. The namespace must contain only characters
// in the set: [a-zA-Z0-9-_] and no other.
func NewServer(etcd *etcdv3.Client, cfg ServerCfg) (*Server, error) {
Expand Down Expand Up @@ -90,11 +96,16 @@ func NewServer(etcd *etcdv3.Client, cfg ServerCfg) (*Server, error) {
r.Logger = cfg.Logger
}

grpcServer := xds.NewGRPCServer(grpc.Creds(creds))
var ser grpcServer
if cfg.XDSCreds {
ser = xds.NewGRPCServer(grpc.Creds(creds))
} else {
ser = grpc.NewServer(grpc.Creds(creds))
}
server := &Server{
cfg: cfg,
etcd: etcd,
grpc: grpcServer,
grpc: ser,
health: health.NewServer(),
actors: newMakeActorRegistry(),
fatalErr: make(chan error, 1),
Expand Down

0 comments on commit 3449b2b

Please sign in to comment.