Skip to content

Commit

Permalink
cluster/store: add state recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
gernest committed Feb 28, 2024
1 parent 705a7b1 commit d9afbb2
Show file tree
Hide file tree
Showing 12 changed files with 512 additions and 44 deletions.
36 changes: 23 additions & 13 deletions gen/go/vince/v1/events.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions internal/cluster/events/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ func (b *Builder) NewRecord() arrow.Record {
func (b *Builder) Release() {
b.r.Release()
}
func (b *Builder) Len() int {
return b.r.Field(0).Len()
}

func (b *Builder) Write(list *v1.Data_List) arrow.Record {
ls := list.GetItems()
Expand All @@ -81,12 +84,12 @@ func (b *Builder) Write(list *v1.Data_List) arrow.Record {
})
b.r.Reserve(len(ls))
for _, e := range ls {
b.writeData(e)
b.WriteData(e)
}
return b.NewRecord()
}

func (b *Builder) writeData(data *v1.Data) {
func (b *Builder) WriteData(data *v1.Data) {
fs := data.ProtoReflect()
fds := fs.Descriptor().Fields()
for i := 0; i < fds.Len(); i++ {
Expand Down
6 changes: 6 additions & 0 deletions internal/cluster/http/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,12 @@ func (s *Service) handleApiEvent(w http.ResponseWriter, r *http.Request, params

func (s *Service) process(w http.ResponseWriter, r *http.Request, e *v1.Data) {
ctx := r.Context()
tenantId := s.tenants.TenantBySiteID(ctx, e.Domain)
if tenantId == "" {
w.WriteHeader(http.StatusUnauthorized)
return
}
e.TenantId = tenantId
err := s.store.Data(ctx, e)
if err == nil {
w.WriteHeader(http.StatusAccepted)
Expand Down
23 changes: 23 additions & 0 deletions internal/cluster/snapshots/badger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package snapshots

import (
"github.com/dgraph-io/badger/v4"
"github.com/hashicorp/raft"
)

type Badger struct {
DB *badger.DB
}

func NewBadger(db *badger.DB) *Badger {
return &Badger{DB: db}
}

var _ raft.FSMSnapshot = (*Badger)(nil)

func (b *Badger) Persist(sink raft.SnapshotSink) error {
_, err := b.DB.Backup(sink, 0)
return err
}

func (Badger) Release() {}
File renamed without changes.
Loading

0 comments on commit d9afbb2

Please sign in to comment.