cluster/store: add Join and Notify
gernest committed Mar 1, 2024
1 parent f9c0151 commit 376f350
Showing 1 changed file with 142 additions and 0 deletions.
internal/cluster/store/store.go
@@ -211,6 +211,7 @@ type Store struct {
logger *slog.Logger
logIncremental bool

numIgnoredJoins int
notifyMu sync.Mutex
BootstrapExpect int
bootstrapped bool
@@ -835,6 +836,139 @@ func (s *Store) setLogInfo() error {
return nil
}

// Notify notifies this Store that a node is ready for bootstrapping at the
// given address. Once the number of known nodes reaches the expected level,
// bootstrapping will be attempted using this Store. The "expected level"
// includes this node, so this node must self-notify to ensure the cluster
// bootstraps with the *advertised Raft address* of this node, which the
// Store does not otherwise know.
//
// Notifying is idempotent. A node may repeatedly notify the Store without issue.
func (s *Store) Notify(ctx context.Context, nr *v1.Notify_Request) error {
if !s.open.Is() {
return ErrNotOpen
}

s.notifyMu.Lock()
defer s.notifyMu.Unlock()

if s.BootstrapExpect == 0 || s.bootstrapped || s.HasLeader() {
// There is no reason for this node to bootstrap:
//
// - Read-only nodes require that BootstrapExpect is set to 0, so this
//   block ensures that notifying a read-only node will not cause a bootstrap.
// - If the node is already bootstrapped, then there is nothing to do.
// - If the node already has a leader, then no bootstrapping is required.
return nil
}

if _, ok := s.notifyingNodes[nr.Id]; ok {
return nil
}

// Confirm that this node can resolve the remote address. Resolution can fail
// due to incomplete DNS records across the underlying infrastructure. If the
// address can't be resolved, don't consider this Notify attempt successful --
// the notifying node will presumably try again.
if addr, err := resolvableAddress(nr.Address); err != nil {
return fmt.Errorf("failed to resolve %s: %w", addr, err)
}

s.notifyingNodes[nr.Id] = &v1.Server{
Id: nr.Id,
Addr: nr.Address,
Suffrage: v1.Server_Voter,
}
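// Wait until the expected number of nodes, including this one, have notified.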
if len(s.notifyingNodes) < s.BootstrapExpect {
return nil
}

raftServers := make([]raft.Server, 0, len(s.notifyingNodes))
for _, n := range s.notifyingNodes {
raftServers = append(raftServers, raft.Server{
ID: raft.ServerID(n.Id),
Address: raft.ServerAddress(n.Addr),
})
}

s.logger.Info("starting cluster bootstrap",
"count", s.BootstrapExpect)
bf := s.raft.BootstrapCluster(raft.Configuration{
Servers: raftServers,
})
if bf.Error() != nil {
s.logger.Error("cluster bootstrap failed", "err", bf.Error())
} else {
s.logger.Info("cluster bootstrap successful")
}
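// Mark the Store as bootstrapped either way, so that subsequent
// notifications don't trigger repeated bootstrap attempts.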
s.bootstrapped = true
return nil
}
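
For context, a minimal sketch of how a starting node might drive Notify, written as if it lived alongside Store in this package. The helper name, the retry interval, and the use of the time package are illustrative assumptions, not part of this commit.

// notifyLoop repeatedly self-notifies with this node's advertised Raft
// address until a leader emerges. Notify is idempotent, so retrying on
// every tick is safe.
func notifyLoop(ctx context.Context, s *Store, id, advAddr string) error {
	ticker := time.NewTicker(2 * time.Second) // interval is an arbitrary choice
	defer ticker.Stop()
	for !s.HasLeader() {
		// A resolution error is not fatal here; the node simply tries
		// again on the next tick.
		_ = s.Notify(ctx, &v1.Notify_Request{Id: id, Address: advAddr})
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
	return nil
}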

// Join joins a node, identified by id and located at addr, to this store.
// The node must be ready to respond to Raft communications at that address.
func (s *Store) Join(ctx context.Context, jr *v1.Join_Request) error {
if !s.open.Is() {
return ErrNotOpen
}

if s.raft.State() != raft.Leader {
return ErrNotLeader
}

id := jr.Id
addr := jr.Address
voter := jr.Voter

// Confirm that this node can resolve the remote address. Resolution can fail
// due to incomplete DNS records across the underlying infrastructure. If the
// address can't be resolved, don't consider this join attempt successful --
// the joining node will presumably try again.
if addr, err := resolvableAddress(addr); err != nil {
return fmt.Errorf("failed to resolve %s: %w", addr, err)
}

configFuture := s.raft.GetConfiguration()
if err := configFuture.Error(); err != nil {
s.logger.Error("failed to get raft configuration", "err", err)
return err
}

for _, srv := range configFuture.Configuration().Servers {
// If a node already exists with either the joining node's ID or address,
// that node may need to be removed from the config first.
if srv.ID == raft.ServerID(id) || srv.Address == raft.ServerAddress(addr) {
// However, if *both* the ID and the address are the same, then no
// join is actually needed.
if srv.Address == raft.ServerAddress(addr) && srv.ID == raft.ServerID(id) {
s.numIgnoredJoins++
s.logger.Info("node is already member of cluster, ignoring join request", "nodeId", id, "address", addr)
return nil
}

if err := s.remove(id); err != nil {
s.logger.Error("failed to remove node ", "nodeId", id, "err", err)
return err
}
s.logger.Info("removed node prior to rejoin with changed ID or address", "nodeId", id)
}
}

var f raft.IndexFuture
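// Per the hashicorp/raft API, a zero prevIndex skips the
// configuration-index precondition and a zero timeout places no
// explicit time limit on the membership change.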
if voter {
f = s.raft.AddVoter(raft.ServerID(id), raft.ServerAddress(addr), 0, 0)
} else {
f = s.raft.AddNonvoter(raft.ServerID(id), raft.ServerAddress(addr), 0, 0)
}
if err := f.Error(); err != nil {
if err == raft.ErrNotLeader {
return ErrNotLeader
}
return err
}
s.logger.Info("node joined successfully ", "nodeId", id, "addr", addr, "suffrage", prettyVoter(voter))
return nil
}
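
A caller-side sketch, under the same assumption that it lives in this package: only the leader can apply a membership change, so ErrNotLeader is a signal to resend the request to the current leader. The helper and its (omitted) transport handling are illustrative, not part of this commit.

// joinNode asks this Store to add a node with the requested suffrage.
func joinNode(ctx context.Context, s *Store, id, addr string, voter bool) error {
	err := s.Join(ctx, &v1.Join_Request{Id: id, Address: addr, Voter: voter})
	if err == ErrNotLeader {
		// Resend the request to the current leader (transport not shown).
		return ErrNotLeader
	}
	return err
}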

// Remove removes a node from the store.
func (s *Store) Remove(ctx context.Context, rn *v1.RemoveNode_Request) error {
if !s.open.Is() {
@@ -858,6 +992,14 @@ func (s *Store) remove(id string) error {
return f.Error()
}

// prettyVoter converts a bool to "voter" or "non-voter".
func prettyVoter(v bool) string {
if v {
return "voter"
}
return "non-voter"
}

// pathExists returns true if the given path exists.
func pathExists(p string) bool {
if _, err := os.Lstat(p); err != nil && os.IsNotExist(err) {
