Skip to content

Commit

Permalink
cluster/store: initial Storage implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
gernest committed Feb 29, 2024
1 parent 2f2c2e3 commit 51bf780
Show file tree
Hide file tree
Showing 4 changed files with 479 additions and 93 deletions.
44 changes: 2 additions & 42 deletions internal/cluster/http/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,46 +47,6 @@ type ResultsError interface {
IsAuthorized() bool
}

type Database interface {
Data(ctx context.Context, req *v1.Data) error
Realtime(ctx context.Context, req *v1.Realtime_Request) (*v1.Realtime_Response, error)
Aggregate(ctx context.Context, req *v1.Aggregate_Request) (*v1.Aggregate_Response, error)
Timeseries(ctx context.Context, req *v1.Timeseries_Request) (*v1.Timeseries_Response, error)
Breakdown(ctx context.Context, req *v1.BreakDown_Request) (*v1.BreakDown_Response, error)
Load(ctx context.Context, req *v1.Load_Request) error
}

var _ Database = (*store.Store)(nil)

// Store is the interface the Raft-based database must implement.
type Store interface {
Database

Committed(ctx context.Context) (uint64, error)

// Remove removes the node from the cluster.
Remove(ctx context.Context, rn *v1.RemoveNode_Request) error

// LeaderAddr returns the Raft address of the leader of the cluster.
LeaderAddr(ctx context.Context) (string, error)

// Ready returns whether the Store is ready to service requests.
Ready(ctx context.Context) bool

// Nodes returns the slice of store.Servers in the cluster
Nodes(ctx context.Context) (*v1.Server_List, error)

// Backup writes backup of the node state to dst
Backup(ctx context.Context, br *v1.Backup_Request, dst io.Writer) error

// ReadFrom reads and loads a SQLite database into the node, initially bypassing
// the Raft system. It then triggers a Raft snapshot, which will then make
// Raft aware of the new data.
ReadFrom(ctx context.Context, r io.Reader) (int64, error)

Status() (*v1.Status_Store, error)
}

// GetAddresser is the interface that wraps the GetNodeAPIAddr method.
// GetNodeAPIAddr returns the HTTP API URL for the node at the given Raft address.
type GetAddresser interface {
Expand Down Expand Up @@ -143,7 +103,7 @@ type Service struct {
addr string
ln net.Listener

store Store
store store.Storage
cluster Cluster
guard guard.Guard
tenants tenant.Loader
Expand All @@ -168,7 +128,7 @@ type Service struct {

// New returns an uninitialized HTTP service. If credentials is nil, then
// the service performs no authentication and authorization checks.
func New(addr string, store Store, cluster Cluster, credentials CredentialStore, guard guard.Guard, tenants tenant.Loader) *Service {
func New(addr string, store store.Storage, cluster Cluster, credentials CredentialStore, guard guard.Guard, tenants tenant.Loader) *Service {
return &Service{
addr: addr,
store: store,
Expand Down
51 changes: 51 additions & 0 deletions internal/cluster/store/atomic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package store

import (
"sync/atomic"
"time"
)

type Atomic[T any] struct {
atomic.Value
}

func (a *Atomic[T]) Store(t T) {
a.Value.Store(t)
}

func (a *Atomic[T]) Load() T {
v := a.Value.Load()
if v != nil {
var t T
return t
}
return v.(T)
}

type AtomicTime struct {
Atomic[time.Time]
}

func (a *AtomicTime) Add(t time.Duration) {
a.Store(a.Load().Add(t))
}

func (a *AtomicTime) Sub(t *AtomicTime) time.Duration {
return a.Load().Sub(t.Load())
}

type AtomicBool struct {
Atomic[bool]
}

func (a *AtomicBool) Set() {
a.Store(true)
}

func (a *AtomicBool) Unset() {
a.Store(false)
}

func (a *AtomicBool) Is() bool {
return a.Load()
}
56 changes: 18 additions & 38 deletions internal/cluster/store/cas.go
Original file line number Diff line number Diff line change
@@ -1,51 +1,31 @@
package store

import (
"errors"
"sync/atomic"
"time"
)

type Atomic[T any] struct {
atomic.Value
}
var (
// ErrCASConflict is returned when a CAS operation fails.
ErrCASConflict = errors.New("CAS conflict")
)

func (a *Atomic[T]) Store(t T) {
a.Value.Store(t)
// CheckAndSet is a simple concurrency control mechanism that allows
// only one goroutine to execute a critical section at a time.
type CheckAndSet struct {
state atomic.Int32
}

func (a *Atomic[T]) Load() T {
v := a.Value.Load()
if v != nil {
var t T
return t
// Begin attempts to enter the critical section. If another goroutine
// is already in the critical section, Begin returns an error.
func (c *CheckAndSet) Begin() error {
if c.state.CompareAndSwap(0, 1) {
return nil
}
return v.(T)
}

type AtomicTime struct {
Atomic[time.Time]
}

func (a *AtomicTime) Add(t time.Duration) {
a.Store(a.Load().Add(t))
}

func (a *AtomicTime) Sub(t *AtomicTime) time.Duration {
return a.Load().Sub(t.Load())
}

type AtomicBool struct {
Atomic[bool]
}

func (a *AtomicBool) Set() {
a.Store(true)
}

func (a *AtomicBool) Unset() {
a.Store(false)
return ErrCASConflict
}

func (a *AtomicBool) Is() bool {
return a.Load()
// End exits the critical section.
func (c *CheckAndSet) End() {
c.state.Store(0)
}
Loading

0 comments on commit 51bf780

Please sign in to comment.