Skip to content

Commit

Permalink
cluster/transport: use connections.Manager
Browse files Browse the repository at this point in the history
  • Loading branch information
gernest committed Mar 1, 2024
1 parent 42ee6d4 commit 29858ab
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 41 deletions.
60 changes: 60 additions & 0 deletions internal/cluster/connections/connections.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package connections

import (
"sync"

v1 "github.com/vinceanalytics/vince/gen/go/vince/v1"
"google.golang.org/grpc"
)

type Manager struct {
localAddr string
dialOptions []grpc.DialOption
conns sync.Map
}

func New(localAddr string, opts ...grpc.DialOption) *Manager {
return &Manager{
localAddr: localAddr,
dialOptions: opts,
}
}

func (m *Manager) LocalAddress() string {
return m.localAddr
}

func (m *Manager) Get(peer, target string) (*Conn, error) {
o, ok := m.conns.Load(peer)
if ok {
return o.(*Conn), nil
}
x, err := grpc.Dial(target, m.dialOptions...)
if err != nil {
return nil, err
}
conn := &Conn{
ClientConn: x,
RaftTransportClient: v1.NewRaftTransportClient(x),
InternalCLusterClient: v1.NewInternalCLusterClient(x),
}
m.conns.Store(peer, conn)
return conn, nil
}

func (m *Manager) Close() (err error) {
m.conns.Range(func(key, value any) bool {
e := value.(*Conn).Close()
if e != nil {
err = e
}
return true
})
return
}

type Conn struct {
*grpc.ClientConn
v1.RaftTransportClient
v1.InternalCLusterClient
}
9 changes: 5 additions & 4 deletions internal/cluster/transport/pair_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"

"github.com/hashicorp/raft"
"github.com/vinceanalytics/vince/internal/cluster/connections"
"go.uber.org/goleak"
"google.golang.org/grpc"
"google.golang.org/grpc/test/bufconn"
Expand All @@ -21,12 +22,12 @@ func makeTestPair(ctx context.Context, t *testing.T) (raft.Transport, raft.Trans
t2Listen := bufconn.Listen(1024)
shutdownSig := make(chan struct{})

t1 := New(raft.ServerAddress("t1"), []grpc.DialOption{grpc.WithInsecure(), grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
t1 := New(connections.New("t1", grpc.WithInsecure(), grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
return t2Listen.Dial()
})})
t2 := New(raft.ServerAddress("t2"), []grpc.DialOption{grpc.WithInsecure(), grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
})))
t2 := New(connections.New("t2", grpc.WithInsecure(), grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
return t1Listen.Dial()
})})
})))

s1 := grpc.NewServer()
t1.Register(s1)
Expand Down
23 changes: 2 additions & 21 deletions internal/cluster/transport/raftapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/hashicorp/raft"
v1 "github.com/vinceanalytics/vince/gen/go/vince/v1"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
)

Expand All @@ -18,36 +17,18 @@ type raftAPI struct {
manager *Manager
}

type conn struct {
*grpc.ClientConn
v1.RaftTransportClient
}

// Consumer returns a channel that can be used to consume and respond to RPC requests.
func (r raftAPI) Consumer() <-chan raft.RPC {
return r.manager.rpcChan
}

// LocalAddr is used to return our local address to distinguish from our peers.
func (r raftAPI) LocalAddr() raft.ServerAddress {
return r.manager.localAddress
return raft.ServerAddress(r.manager.LocalAddress())
}

func (r raftAPI) getPeer(id raft.ServerID, target raft.ServerAddress) (v1.RaftTransportClient, error) {
c, ok := r.manager.connections.Load(id)
if ok {
return c.(*conn), nil
}
x, err := grpc.Dial(string(target), r.manager.dialOptions...)
if err != nil {
return nil, err
}
xc := &conn{
ClientConn: x,
RaftTransportClient: v1.NewRaftTransportClient(x),
}
r.manager.connections.Store(id, x)
return xc, nil
return r.manager.Get(string(id), string(target))
}

// AppendEntries sends the appropriate RPC to the target node.
Expand Down
22 changes: 6 additions & 16 deletions internal/cluster/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,27 @@
package transport

import (
"sync"
"sync/atomic"
"time"

"github.com/hashicorp/raft"
v1 "github.com/vinceanalytics/vince/gen/go/vince/v1"
"github.com/vinceanalytics/vince/internal/cluster/connections"
"google.golang.org/grpc"
)

type Manager struct {
localAddress raft.ServerAddress
dialOptions []grpc.DialOption

*connections.Manager
rpcChan chan raft.RPC
heartbeatFunc atomic.Value
heartbeatTimeout time.Duration
connections sync.Map
}

// New creates both components of raft-grpc-transport: a gRPC service and a Raft Transport.
func New(localAddress raft.ServerAddress, dialOptions []grpc.DialOption, options ...Option) *Manager {
func New(conns *connections.Manager, options ...Option) *Manager {
m := &Manager{
localAddress: localAddress,
dialOptions: dialOptions,
rpcChan: make(chan raft.RPC),
Manager: conns,
rpcChan: make(chan raft.RPC),
}
for _, opt := range options {
opt(m)
Expand All @@ -45,12 +41,6 @@ func (m *Manager) Transport() raft.Transport {
}

func (m *Manager) Close() (err error) {
m.connections.Range(func(key, value any) bool {
e := value.(*conn).Close()
if e != nil {
err = e
}
return true
})
m.Manager.Close()
return
}

0 comments on commit 29858ab

Please sign in to comment.