-
Notifications
You must be signed in to change notification settings - Fork 0
/
grpc.go
101 lines (88 loc) · 2.34 KB
/
grpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package goyser
import (
	"context"
	"crypto/x509"
	"errors"
	"net"
	"net/url"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/encoding/gzip"
	"google.golang.org/grpc/keepalive"
)
// createAndObserveGRPCConn creates a new gRPC connection and observes its conn status.
//
// target must be a URL of the form http(s)://<endpoint>:<port>. The "http"
// scheme selects a plaintext transport; any other scheme selects TLS verified
// against the system certificate pool. When the URL carries no port, 80 is
// used for plaintext and 443 for TLS.
//
// The connection is configured with client keepalives (ping every 10s, 5s
// timeout, also without active streams) and gzip as the default compressor.
//
// A background goroutine watches the connection until ctx is cancelled, at
// which point it closes the connection it currently holds. Errors raised by
// that goroutine are sent on chErr, so the caller should keep draining it.
//
// It returns an error when target cannot be parsed, has no hostname, or the
// gRPC client cannot be constructed.
func createAndObserveGRPCConn(ctx context.Context, chErr chan error, target string) (*grpc.ClientConn, error) {
	address, secure, err := grpcDialAddress(target)
	if err != nil {
		return nil, err
	}

	creds := insecure.NewCredentials()
	if secure {
		// The lookup error is deliberately not fatal: a nil pool makes
		// crypto/tls fall back to the host's default root CA set.
		pool, _ := x509.SystemCertPool()
		creds = credentials.NewClientTLSFromCert(pool, "")
	}

	opts := []grpc.DialOption{
		grpc.WithTransportCredentials(creds),
		grpc.WithKeepaliveParams(keepalive.ClientParameters{
			Time:                10 * time.Second,
			Timeout:             5 * time.Second,
			PermitWithoutStream: true,
		}),
		grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name)),
	}

	conn, err := grpc.NewClient(address, opts...)
	if err != nil {
		return nil, err
	}

	// The observer receives its own copy of the pointer, so re-dialling inside
	// it never races with the value returned to the caller.
	go observeGRPCConn(ctx, chErr, conn, address, opts)

	return conn, nil
}

// grpcDialAddress converts a URL-style target into the host:port address to
// dial and reports whether the transport must use TLS (every scheme except
// "http"). An explicit port in the URL is always honoured; a missing port
// defaults to 80 for plaintext and 443 for TLS.
func grpcDialAddress(target string) (string, bool, error) {
	u, err := url.Parse(target)
	if err != nil {
		return "", false, err
	}

	hostname := u.Hostname()
	if hostname == "" {
		return "", false, errors.New("please provide URL format endpoint e.g. http(s)://<endpoint>:<port>")
	}

	secure := u.Scheme != "http"
	port := u.Port()
	if port == "" {
		port = "80"
		if secure {
			port = "443"
		}
	}

	// JoinHostPort re-adds the brackets that Hostname strips from IPv6 literals.
	return net.JoinHostPort(hostname, port), secure, nil
}

// observeGRPCConn watches conn until ctx is cancelled and then closes it.
//
// While the connection is not Ready it resets the connect backoff up to five
// times, pausing 0s, 1s, 2s, ... between attempts. After that, or when the
// connection is found in the Shutdown state, it dials a replacement with the
// same address and options.
//
// NOTE(review): the replacement connection is held only by this goroutine;
// the *grpc.ClientConn already handed to the caller stays closed. Confirm
// whether callers rely on chErr or their own logic to recover in that case.
func observeGRPCConn(ctx context.Context, chErr chan error, conn *grpc.ClientConn, address string, opts []grpc.DialOption) {
	const maxRetries = 5

	// pause waits for d but returns early when ctx is cancelled.
	pause := func(d time.Duration) {
		t := time.NewTimer(d)
		defer t.Stop()
		select {
		case <-t.C:
		case <-ctx.Done():
		}
	}

	retries := 0
	for {
		if ctx.Err() != nil {
			if err := conn.Close(); err != nil {
				// Blocking send kept from the original so a close error is
				// not dropped; the caller is expected to drain chErr.
				chErr <- err
			}
			return
		}

		state := conn.GetState()
		redial := false

		switch state {
		case connectivity.Ready:
			retries = 0
		case connectivity.TransientFailure, connectivity.Connecting, connectivity.Idle:
			if retries < maxRetries {
				pause(time.Duration(retries) * time.Second)
				conn.ResetConnectBackoff()
				retries++
			} else {
				// The connection is being discarded; its close error carries
				// no useful information.
				_ = conn.Close()
				redial = true
			}
		case connectivity.Shutdown:
			redial = true
		}

		if redial {
			fresh, err := grpc.NewClient(address, opts...)
			if err != nil {
				// Do not block forever if nobody reads chErr after cancel.
				select {
				case chErr <- err:
				case <-ctx.Done():
				}
				// The previous connection is already shut down and the same
				// arguments would fail again, so there is nothing left to
				// observe or close.
				return
			}
			conn = fresh
			retries = 0
			continue
		}

		// Blocks until the state leaves `state` or ctx is cancelled; either
		// way the next iteration re-evaluates from the top.
		conn.WaitForStateChange(ctx, state)
	}
}