Skip to content

Commit

Permalink
cluster/store: update fsmRestore
Browse files Browse the repository at this point in the history
  • Loading branch information
gernest committed Feb 29, 2024
1 parent 51bf780 commit 363fe1b
Showing 1 changed file with 31 additions and 25 deletions.
56 changes: 31 additions & 25 deletions internal/cluster/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,14 +203,6 @@ type Store struct {
snapshotStore *snapshots.Store // Snapshot store.
boltStore *log.Log

// Raft changes observer
leaderObserversMu sync.RWMutex
leaderObservers []chan<- struct{}
observerClose chan struct{}
observerDone chan struct{}
observerChan chan raft.Observation
observer *raft.Observer

firstIdxOnOpen uint64 // First index on log when Store opens.
lastIdxOnOpen uint64 // Last index on log when Store opens.
lastCommandIdxOnOpen uint64 // Last command index before applied index when Store opens.
Expand Down Expand Up @@ -267,19 +259,18 @@ func NewStore(base *v1.Config, ly Layer) (*Store, error) {
),
)
return &Store{
ly: ly,
raftDir: base.Data,
snapshotDir: filepath.Join(base.Data, snapshotsDirName),
peersPath: filepath.Join(base.Data, peersPath),
peersInfoPath: filepath.Join(base.Data, peersInfoPath),
dbPath: filepath.Join(base.Data, dbName),
restoreDoneCh: make(chan struct{}),
leaderObservers: make([]chan<- struct{}, 0),
logger: slog.Default().With("component", "store"),
notifyingNodes: make(map[string]*v1.Server),
ApplyTimeout: applyTimeout,
session: sess,
db: store,
ly: ly,
raftDir: base.Data,
snapshotDir: filepath.Join(base.Data, snapshotsDirName),
peersPath: filepath.Join(base.Data, peersPath),
peersInfoPath: filepath.Join(base.Data, peersInfoPath),
dbPath: filepath.Join(base.Data, dbName),
restoreDoneCh: make(chan struct{}),
logger: slog.Default().With("component", "store"),
notifyingNodes: make(map[string]*v1.Server),
ApplyTimeout: applyTimeout,
session: sess,
db: store,
}, nil
}

Expand Down Expand Up @@ -572,9 +563,6 @@ func (s *Store) Close(wait bool) (retErr error) {
return nil
}

close(s.observerClose)
<-s.observerDone

close(s.snapshotWClose)
<-s.snapshotWDone

Expand Down Expand Up @@ -806,7 +794,25 @@ func (s *Store) fsmSnapshot() (raft.FSMSnapshot, error) {
}

func (s *Store) fsmRestore(w io.ReadCloser) error {
return snapshots.NewBadger(s.db.DB).Restore(w)
s.logger.Info("initiating node restore", "nodeId", s.config.NodeId)
startT := time.Now()

err := snapshots.NewBadger(s.db.DB).Restore(w)
if err != nil {
return err
}
meta, err := s.snapshotStore.List()
if err != nil {
return fmt.Errorf("failed to get latest snapshot index post restore: %s", err)
}
idx := meta[len(meta)-1].Index
if err := s.boltStore.SetAppliedIndex(idx); err != nil {
return fmt.Errorf("failed to set applied index: %s", err)
}
s.fsmIdx.Store(idx)
elapsed := time.Since(startT)
s.logger.Info("node restored", "elapsed", elapsed)
return nil
}

// setLogInfo records some key indexes about the log.
Expand Down

0 comments on commit 363fe1b

Please sign in to comment.