Skip to content

Commit

Permalink
cluster: add disco
Browse files Browse the repository at this point in the history
  • Loading branch information
gernest committed Feb 24, 2024
1 parent 274f6db commit 5b6ffd9
Show file tree
Hide file tree
Showing 2 changed files with 382 additions and 0 deletions.
179 changes: 179 additions & 0 deletions internal/cluster/disco/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
package disco

import (
"fmt"
"log"
"os"
"sync"
"time"

"github.com/vinceanalytics/vince/internal/cluster/random"
)

const (
leaderChanLen = 5 // Support any fast back-to-back leadership changes.
)

// Client is the interface discovery clients must implement.
type Client interface {
// GetLeader returns the current Leader stored in the KV store. If the Leader
// is set, the returned ok flag will be true. If the Leader is not set, the
// returned ok flag will be false.
GetLeader() (id string, apiAddr string, addr string, ok bool, e error)

// InitializeLeader sets the leader to the given details, but only if no leader
// has already been set. This operation is a check-and-set type operation. If
// initialization succeeds, ok is set to true, otherwise false.
InitializeLeader(id, apiAddr, addr string) (bool, error)

// SetLeader unconditionally sets the leader to the given details.
SetLeader(id, apiAddr, addr string) error

fmt.Stringer
}

// Store is the interface the consensus system must implement.
type Store interface {
// IsLeader returns whether this node is the Leader.
IsLeader() bool

// RegisterLeaderChange registers a channel that will be notified when
// a leadership change occurs.
RegisterLeaderChange(c chan<- struct{})
}

// Suffrage is the type of suffrage -- voting or non-voting -- a node has.
type Suffrage int

const (
SuffrageUnknown Suffrage = iota
Voter
NonVoter
)

// VoterSuffrage returns a Suffrage based on the given boolean.
func VoterSuffrage(b bool) Suffrage {
if b {
return Voter
}
return NonVoter
}

// IsVoter returns whether the Suffrage indicates a Voter.
func (s Suffrage) IsVoter() bool {
return s == Voter
}

// Service represents a Discovery Service instance.
type Service struct {
RegisterInterval time.Duration
ReportInterval time.Duration

c Client
s Store
suf Suffrage

logger *log.Logger

mu sync.Mutex
lastContact time.Time
}

// NewService returns an instantiated Discovery Service.
func NewService(c Client, s Store, suf Suffrage) *Service {
return &Service{
c: c,
s: s,
suf: suf,
RegisterInterval: 3 * time.Second,
ReportInterval: 10 * time.Second,
logger: log.New(os.Stderr, "[disco] ", log.LstdFlags),
}
}

// Register registers this node with the discovery service. It will block
// until a) if the node is a voter, it registers itself, b) learns of another
// node it can use to join the cluster, or c) an unrecoverable error occurs.
func (s *Service) Register(id, apiAddr, addr string) (bool, string, error) {
for {
_, _, cRaftAddr, ok, err := s.c.GetLeader()
if err != nil {
s.logger.Printf("failed to get leader: %s", err.Error())
}
if ok {
return false, cRaftAddr, nil
}

if s.suf.IsVoter() {
ok, err = s.c.InitializeLeader(id, apiAddr, addr)
if err != nil {
s.logger.Printf("failed to initialize as Leader: %s", err.Error())
}
if ok {
s.updateContact(time.Now())
return true, addr, nil
}
}

time.Sleep(random.Jitter(s.RegisterInterval))
}
}

// StartReporting reports the details of this node to the discovery service,
// if, and only if, this node is the leader. The service will report
// anytime a leadership change is detected. It also does it periodically
// to deal with any intermittent issues that caused Leadership information
// to go stale.
func (s *Service) StartReporting(id, apiAddr, addr string) chan struct{} {
ticker := time.NewTicker(s.ReportInterval)
obCh := make(chan struct{}, leaderChanLen)
s.s.RegisterLeaderChange(obCh)

update := func(changed bool) {
if s.s.IsLeader() {
if err := s.c.SetLeader(id, apiAddr, addr); err != nil {
s.logger.Printf("failed to update discovery service with Leader details: %s",
err.Error())
}
if changed {
s.logger.Printf("updated Leader API address to %s due to leadership change",
apiAddr)
}
s.updateContact(time.Now())
}
}

done := make(chan struct{})
go func() {
for {
select {
case <-ticker.C:
update(false)
case <-obCh:
update(true)
case <-done:
return
}
}
}()
return done
}

// Stats returns diagnostic information on the disco service.
func (s *Service) Stats() (map[string]interface{}, error) {
s.mu.Lock()
defer s.mu.Unlock()

return map[string]interface{}{
"mode": s.c.String(),
"register_interval": s.RegisterInterval,
"report_interval": s.ReportInterval,
"last_contact": s.lastContact,
}, nil
}

func (s *Service) updateContact(t time.Time) {
s.mu.Lock()
defer s.mu.Unlock()
s.lastContact = t
}
203 changes: 203 additions & 0 deletions internal/cluster/disco/service_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
package disco

import (
"sync"
"testing"
"time"
)

func Test_NewServce(t *testing.T) {
s := NewService(&mockClient{}, &mockStore{}, Voter)
if s == nil {
t.Fatalf("service is nil")
}
}

func Test_RegisterGetLeaderOK(t *testing.T) {
m := &mockClient{}
m.getLeaderFn = func() (id string, apiAddr string, addr string, ok bool, e error) {
return "2", "localhost:4003", "localhost:4004", true, nil
}
m.initializeLeaderFn = func(tID, tAPIAddr, tAddr string) (bool, error) {
t.Fatalf("Leader initialized unexpectedly")
return false, nil
}
c := &mockStore{}

s := NewService(m, c, Voter)
s.RegisterInterval = 10 * time.Millisecond

ok, addr, err := s.Register("1", "localhost:4001", "localhost:4002")
if err != nil {
t.Fatalf("error registering with disco: %s", err.Error())
}
if ok {
t.Fatalf("registered as leader unexpectedly")
}
if exp, got := "localhost:4004", addr; exp != got {
t.Fatalf("returned addressed incorrect, exp %s, got %s", exp, got)
}
}

func Test_RegisterInitializeLeader(t *testing.T) {
m := &mockClient{}
m.getLeaderFn = func() (id string, apiAddr string, addr string, ok bool, e error) {
return "", "", "", false, nil
}
m.initializeLeaderFn = func(tID, tAPIAddr, tAddr string) (bool, error) {
if tID != "1" || tAPIAddr != "localhost:4001" || tAddr != "localhost:4002" {
t.Fatalf("wrong values passed to InitializeLeader")
}
return true, nil
}
c := &mockStore{}

s := NewService(m, c, Voter)
s.RegisterInterval = 10 * time.Millisecond

ok, addr, err := s.Register("1", "localhost:4001", "localhost:4002")
if err != nil {
t.Fatalf("error registering with disco: %s", err.Error())
}
if !ok {
t.Fatalf("failed to register as expected")
}
if exp, got := "localhost:4002", addr; exp != got {
t.Fatalf("returned addressed incorrect, exp %s, got %s", exp, got)
}
}

func Test_RegisterNonVoter(t *testing.T) {
m := &mockClient{}
getCalled := false
m.getLeaderFn = func() (id string, apiAddr string, addr string, ok bool, e error) {
if getCalled {
return "2", "localhost:4003", "localhost:4004", true, nil
}
getCalled = true
return "", "", "", false, nil
}
m.initializeLeaderFn = func(tID, tAPIAddr, tAddr string) (bool, error) {
t.Fatal("InitializeLeader called unexpectedly")
return false, nil
}

c := &mockStore{}
s := NewService(m, c, NonVoter)
s.RegisterInterval = 10 * time.Millisecond

ok, addr, err := s.Register("1", "localhost:4001", "localhost:4002")
if err != nil {
t.Fatalf("error registering with disco: %s", err.Error())
}
if ok {
t.Fatalf("registered incorrectly as non-voter")
}
if exp, got := "localhost:4004", addr; exp != got {
t.Fatalf("returned addressed incorrect, exp %s, got %s", exp, got)
}
}

func Test_StartReportingTimer(t *testing.T) {
// WaitGroups won't work because we don't know how many times
// setLeaderFn will be called.
calledCh := make(chan struct{}, 10)

m := &mockClient{}
m.setLeaderFn = func(id, apiAddr, addr string) error {
if id != "1" || apiAddr != "localhost:4001" || addr != "localhost:4002" {
t.Fatalf("wrong values passed to SetLeader")
}
calledCh <- struct{}{}
return nil
}
c := &mockStore{}
c.isLeaderFn = func() bool {
return true
}

s := NewService(m, c, Voter)
s.ReportInterval = 10 * time.Millisecond

go s.StartReporting("1", "localhost:4001", "localhost:4002")
<-calledCh
}

func Test_StartReportingChange(t *testing.T) {
var wg sync.WaitGroup
m := &mockClient{}
m.setLeaderFn = func(id, apiAddr, addr string) error {
defer wg.Done()
if id != "1" || apiAddr != "localhost:4001" || addr != "localhost:4002" {
t.Fatalf("wrong values passed to SetLeader")
}
return nil
}
c := &mockStore{}
c.isLeaderFn = func() bool {
return true
}
var ch chan<- struct{}
c.registerLeaderChangeFn = func(c chan<- struct{}) {
ch = c
}

wg.Add(1)
s := NewService(m, c, Voter)
s.ReportInterval = 10 * time.Minute // Nothing will happen due to timer.
done := s.StartReporting("1", "localhost:4001", "localhost:4002")

// Signal a leadership change.
ch <- struct{}{}
wg.Wait()
close(done)
}

type mockClient struct {
getLeaderFn func() (id string, apiAddr string, addr string, ok bool, e error)
initializeLeaderFn func(id, apiAddr, addr string) (bool, error)
setLeaderFn func(id, apiAddr, addr string) error
}

func (m *mockClient) GetLeader() (id string, apiAddr string, addr string, ok bool, e error) {
if m.getLeaderFn != nil {
return m.getLeaderFn()
}
return
}

func (m *mockClient) InitializeLeader(id, apiAddr, addr string) (bool, error) {
if m.initializeLeaderFn != nil {
return m.initializeLeaderFn(id, apiAddr, addr)
}
return false, nil
}

func (m *mockClient) SetLeader(id, apiAddr, addr string) error {
if m.setLeaderFn != nil {
return m.setLeaderFn(id, apiAddr, addr)
}
return nil
}

func (m *mockClient) String() string {
return "mock"
}

type mockStore struct {
isLeaderFn func() bool
registerLeaderChangeFn func(c chan<- struct{})
}

func (m *mockStore) IsLeader() bool {
if m.isLeaderFn != nil {
return m.isLeaderFn()
}
return false
}

func (m *mockStore) RegisterLeaderChange(c chan<- struct{}) {
if m.registerLeaderChangeFn != nil {
m.registerLeaderChangeFn(c)
}
}

0 comments on commit 5b6ffd9

Please sign in to comment.