Skip to content

Commit

Permalink
cluster/store: implement http.Database
Browse files Browse the repository at this point in the history
Restrict querying to only leader node. Use raft for Data method
  • Loading branch information
gernest committed Feb 27, 2024
1 parent f6d9f1d commit 705a7b1
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 0 deletions.
2 changes: 2 additions & 0 deletions internal/cluster/http/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ type Database interface {
Load(ctx context.Context, req *v1.Load_Request) error
}

var _ Database = (*store.Store)(nil)

// Store is the interface the Raft-based database must implement.
type Store interface {
Database
Expand Down
34 changes: 34 additions & 0 deletions internal/cluster/store/store.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package store

import (
"context"
"errors"
"log/slog"
"os"
Expand All @@ -14,6 +15,7 @@ import (
v1 "github.com/vinceanalytics/vince/gen/go/vince/v1"
"github.com/vinceanalytics/vince/internal/cluster/log"
"github.com/vinceanalytics/vince/internal/cluster/snapshots"
"github.com/vinceanalytics/vince/internal/compute"
"github.com/vinceanalytics/vince/internal/db"
"github.com/vinceanalytics/vince/internal/index/primary"
"github.com/vinceanalytics/vince/internal/indexer"
Expand Down Expand Up @@ -269,6 +271,38 @@ func (s *Store) Open() error {
return nil
}

func (s *Store) Data(ctx context.Context, req *v1.Data) error { return nil }

func (s *Store) Realtime(ctx context.Context, req *v1.Realtime_Request) (*v1.Realtime_Response, error) {
if s.raft.State() != raft.Leader {
return nil, ErrNotLeader
}
return compute.Realtime(ctx, s.session, req)
}

func (s *Store) Aggregate(ctx context.Context, req *v1.Aggregate_Request) (*v1.Aggregate_Response, error) {
if s.raft.State() != raft.Leader {
return nil, ErrNotLeader
}
return compute.Aggregate(ctx, s.session, req)
}

func (s *Store) Timeseries(ctx context.Context, req *v1.Timeseries_Request) (*v1.Timeseries_Response, error) {
if s.raft.State() != raft.Leader {
return nil, ErrNotLeader
}
return compute.Timeseries(ctx, s.session, req)
}

func (s *Store) Breakdown(ctx context.Context, req *v1.BreakDown_Request) (*v1.BreakDown_Response, error) {
if s.raft.State() != raft.Leader {
return nil, ErrNotLeader
}
return compute.Breakdown(ctx, s.session, req)
}

func (s *Store) Load(ctx context.Context, req *v1.Load_Request) error { return nil }

// pathExists returns true if the given path exists.
func pathExists(p string) bool {
if _, err := os.Lstat(p); err != nil && os.IsNotExist(err) {
Expand Down

0 comments on commit 705a7b1

Please sign in to comment.