Skip to content

Commit

Permalink
manage the current state of a cluster connection
Browse files Browse the repository at this point in the history
  • Loading branch information
ftl committed Apr 28, 2024
1 parent 7994775 commit 6d303c2
Showing 1 changed file with 81 additions and 35 deletions.
116 changes: 81 additions & 35 deletions core/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"log"
"net"
"strings"
"sync"
"time"

"github.com/ftl/clusterix"
Expand Down Expand Up @@ -59,7 +60,7 @@ func NewClusters(sources []core.SpotSource, bandmap Bandmap, bandplan bandplan.B
if isDemoSource(spotSource.Name) {
cluster = newDemoCluster(result, spotSource, bandmap, bandplan, clock)
} else {
cluster = newCluster(result, spotSource, bandmap, bandplan, clock)
cluster = newClusterixCluster(result, spotSource, bandmap, bandplan, clock)
}
result.clusters = append(result.clusters, cluster)
}
Expand Down Expand Up @@ -155,13 +156,21 @@ func (c *Clusters) clusterConnected(name string, connected bool) {
}
log.Printf("Cluster %s %s", name, status)

if !connected && c.view != nil {
if c.view != nil {
c.doIgnoreUpdates(func() {
c.view.SetSpotSourceEnabled(name, false)
c.view.SetSpotSourceEnabled(name, connected)
})
}
}

type ConnectionState int

const (
Disconnected ConnectionState = iota
ConnectionPending
Connected
)

type clusterClient interface {
Connected() bool
Notify(any)
Expand All @@ -170,73 +179,110 @@ type clusterClient interface {

type openClusterFunc func(host *net.TCPAddr, username string, password string, trace bool) (clusterClient, error)

func openClusterix(host *net.TCPAddr, username string, password string, trace bool) (clusterClient, error) {
return clusterix.Open(host, username, password, trace)
}

type cluster struct {
parent *Clusters
source core.SpotSource
bandmap Bandmap
bandplan bandplan.Bandplan
clock core.Clock

client clusterClient
openCluster openClusterFunc
client clusterClient
clientMutex *sync.RWMutex
openCluster openClusterFunc
connectionState ConnectionState
}

func newCluster(parent *Clusters, source core.SpotSource, bandmap Bandmap, bandplan bandplan.Bandplan, clock core.Clock) *cluster {
func newCluster(parent *Clusters, source core.SpotSource, bandmap Bandmap, bandplan bandplan.Bandplan, clock core.Clock, openCluster openClusterFunc) *cluster {
return &cluster{
parent: parent,
source: source,
bandmap: bandmap,
bandplan: bandplan,
clock: clock,

openCluster: openClusterix,
clientMutex: &sync.RWMutex{},
connectionState: Disconnected,
openCluster: openCluster,
}
}

func (c *cluster) Active() bool {
return c.client != nil && c.client.Connected()
c.clientMutex.RLock()
defer c.clientMutex.RUnlock()
return c.connectionState == Connected
}

func (c *cluster) Enable() error {
if c.client != nil {
c.clientMutex.Lock()
defer c.clientMutex.Unlock()

if c.connectionState != Disconnected {
return nil
}

hostAddress, err := network.ParseTCPAddr(c.source.HostAddress)
if err != nil {
return err
}
c.connectionState = ConnectionPending

c.client, err = c.openCluster(hostAddress, c.source.Username, c.source.Password, traceClusterix)
if err != nil {
c.client = nil
return err
}
go func() {
client, err := c.openCluster(hostAddress, c.source.Username, c.source.Password, traceClusterix)

c.clientMutex.Lock()
defer c.clientMutex.Unlock()

c.client.Notify(c)
if err != nil {
log.Printf("Connection to cluster %s failed: %v", c.source.Name, err)
c.client = nil
c.connectionState = Disconnected
c.parent.clusterConnected(c.source.Name, false)
return
}
if c.connectionState == Disconnected {
log.Printf("Connection to cluster %s aborted", c.source.Name)
c.parent.clusterConnected(c.source.Name, false)
return
}

c.client = client
c.client.Notify(c)
c.connectionState = Connected
c.parent.clusterConnected(c.source.Name, true)
}()

return nil
}

func (c *cluster) Disable() error {
if c.client == nil {
c.clientMutex.Lock()
defer c.clientMutex.Unlock()

if c.connectionState == Disconnected {
return nil
}
c.client.Disconnect()
c.client = nil
c.connectionState = Disconnected
if c.client != nil {
c.client.Disconnect()
c.client = nil
}

return nil
}

func (c *cluster) Connected(connected bool) {
if !connected {
c.client = nil
}
c.parent.clusterConnected(c.source.Name, connected)
go func() {
c.clientMutex.Lock()
if connected {
c.connectionState = Connected
} else {
c.connectionState = Disconnected
c.client = nil
}
c.clientMutex.Unlock()

c.parent.clusterConnected(c.source.Name, connected)
}()
}

func (c *cluster) DX(msg clusterix.DXMessage) {
Expand Down Expand Up @@ -339,16 +385,16 @@ func toCoreBand(bandName bandplan.BandName) core.Band {
return core.Band(bandName)
}

func newDemoCluster(parent *Clusters, source core.SpotSource, bandmap Bandmap, bandplan bandplan.Bandplan, clock core.Clock) *cluster {
return &cluster{
parent: parent,
source: source,
bandmap: bandmap,
bandplan: bandplan,
clock: clock,
func newClusterixCluster(parent *Clusters, source core.SpotSource, bandmap Bandmap, bandplan bandplan.Bandplan, clock core.Clock) *cluster {
return newCluster(parent, source, bandmap, bandplan, clock, openClusterix)
}

openCluster: openDemoCluster(bandmap),
}
func openClusterix(host *net.TCPAddr, username string, password string, trace bool) (clusterClient, error) {
return clusterix.Open(host, username, password, trace)
}

func newDemoCluster(parent *Clusters, source core.SpotSource, bandmap Bandmap, bandplan bandplan.Bandplan, clock core.Clock) *cluster {
return newCluster(parent, source, bandmap, bandplan, clock, openDemoCluster(bandmap))
}

func openDemoCluster(bandmap Bandmap) openClusterFunc {
Expand Down

0 comments on commit 6d303c2

Please sign in to comment.