Skip to content

Commit

Permalink
cluster: basic single node loading
Browse files Browse the repository at this point in the history
  • Loading branch information
gernest committed Mar 1, 2024
1 parent 29858ab commit 4903eaa
Show file tree
Hide file tree
Showing 14 changed files with 232 additions and 535 deletions.
130 changes: 54 additions & 76 deletions gen/go/vince/v1/status.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 7 additions & 47 deletions internal/cluster/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import (
"context"
"errors"
"io"
"sync"

v1 "github.com/vinceanalytics/vince/gen/go/vince/v1"
"github.com/vinceanalytics/vince/internal/cluster/auth"
"github.com/vinceanalytics/vince/internal/cluster/connections"
"github.com/vinceanalytics/vince/internal/cluster/http"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
Expand All @@ -31,11 +31,8 @@ func CredentialsFor(credStr *auth.CredentialsStore, username string) *v1.Credent

// Client allows communicating with a remote node.
type Client struct {
dialOpts []grpc.DialOption
insecure bool
localNodeAddr string
mu sync.RWMutex
clients map[string]*Klient
insecure bool
conns *connections.Manager
}

var _ http.Cluster = (*Client)(nil)
Expand All @@ -51,25 +48,10 @@ type Klient struct {
// and removing nodes are not retried, to make it clear to the operator
// that the operation failed. In addition, higher-level code will
// usually retry these operations.
func NewClient(insecure bool, opts ...grpc.DialOption) *Client {
func NewClient(mgr *connections.Manager) *Client {
return &Client{
insecure: insecure,
dialOpts: opts,
clients: make(map[string]*Klient),
}
}

// SetLocal informs the client instance of the node address for the node
// using this client. Along with the Service instance it allows this
// client to serve requests for this node locally without the network hop.
func (c *Client) SetLocal(nodeAddr string, serv v1.InternalCLusterClient) error {
c.mu.Lock()
c.clients[nodeAddr] = &Klient{
InternalCLusterClient: serv,
conns: mgr,
}
c.localNodeAddr = nodeAddr
c.mu.Unlock()
return nil
}

// GetNodeAPIAddr retrieves the API Address for the node at nodeAddr
Expand All @@ -86,26 +68,7 @@ func (c *Client) GetNodeAPIAddr(ctx context.Context, nodeAddr string) (string, e
}

func (c *Client) node(addr string) (v1.InternalCLusterClient, error) {
c.mu.RLock()
x, ok := c.clients[addr]
if ok {
c.mu.RUnlock()
return x, nil
}
c.mu.RUnlock()

conn, err := grpc.Dial(addr, c.dialOpts...)
if err != nil {
return nil, err
}
k := &Klient{
ClientConn: conn,
InternalCLusterClient: v1.NewInternalCLusterClient(conn),
}
c.mu.Lock()
c.clients[addr] = k
return k, nil

return c.conns.ByAddress(addr)
}

func (c *Client) SendData(ctx context.Context, req *v1.Data, nodeAddr string, creds *v1.Credentials) error {
Expand Down Expand Up @@ -238,11 +201,8 @@ func (c *Client) Breakdown(ctx context.Context, req *v1.BreakDown_Request, nodeA

// Stats returns stats on the Client instance
func (c *Client) Status() *v1.Status_Cluster {
c.mu.RLock()
defer c.mu.RUnlock()

o := &v1.Status_Cluster{
LocalNodeAddress: c.localNodeAddr,
LocalNodeAddress: c.conns.LocalAddress(),
}
return o
}
11 changes: 11 additions & 0 deletions internal/cluster/connections/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
type Manager struct {
localAddr string
dialOptions []grpc.DialOption
mapping sync.Map
conns sync.Map
}

Expand All @@ -24,6 +25,14 @@ func (m *Manager) LocalAddress() string {
return m.localAddr
}

func (m *Manager) ByAddress(target string) (*Conn, error) {
a, ok := m.mapping.Load(target)
if ok {
return m.Get(a.(string), target)
}
return m.Get(target, target)
}

func (m *Manager) Get(peer, target string) (*Conn, error) {
o, ok := m.conns.Load(peer)
if ok {
Expand All @@ -39,6 +48,7 @@ func (m *Manager) Get(peer, target string) (*Conn, error) {
InternalCLusterClient: v1.NewInternalCLusterClient(x),
}
m.conns.Store(peer, conn)
m.mapping.Store(target, peer)
return conn, nil
}

Expand All @@ -48,6 +58,7 @@ func (m *Manager) Close() (err error) {
if e != nil {
err = e
}
m.conns.Delete(key)
return true
})
return
Expand Down
Loading

0 comments on commit 4903eaa

Please sign in to comment.