Skip to content

Commit

Permalink
mvcc: sending events after restore
Browse files Browse the repository at this point in the history
  • Loading branch information
fengshaobao 00231050 committed Aug 18, 2017
1 parent f33d64a commit 0d723d5
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 0 deletions.
15 changes: 15 additions & 0 deletions mvcc/watchable_store.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,21 @@ func (s *watchableStore) cancelWatcher(wa *watcher) {
s.mu.Unlock()
}

func (s *watchableStore) Restore(b backend.Backend) error {
s.mu.Lock()
defer s.mu.Unlock()
err := s.store.Restore(b)
if err != nil {
return err
}

for wa := range s.synced.watchers {
s.unsynced.watchers.add(wa)
}
s.synced = newWatcherGroup()
return nil
}

// syncWatchersLoop syncs the watcher in the unsynced map every 100ms.
func (s *watchableStore) syncWatchersLoop() {
defer s.wg.Done()
Expand Down
42 changes: 42 additions & 0 deletions mvcc/watchable_store_test.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ package mvcc
import (
"bytes"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"reflect"
"sync"
"testing"
Expand Down Expand Up @@ -296,6 +298,46 @@ func TestWatchFutureRev(t *testing.T) {
}
}

func TestWatchRestore(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := newWatchableStore(b, &lease.FakeLessor{}, nil)
defer cleanup(s, b, tmpPath)

testKey := []byte("foo")
testValue := []byte("bar")
rev := s.Put(testKey, testValue, lease.NoLease)

dir, err := ioutil.TempDir(os.TempDir(), "etcd_backend_test")
if err != nil {
plog.Fatal(err)
}
newPath := filepath.Join(dir, "database_new")

newBackend := backend.NewDefaultBackend(newPath)
newStore := newWatchableStore(newBackend, &lease.FakeLessor{}, nil)
defer cleanup(newStore, newBackend, newPath)

w := newStore.NewWatchStream()
w.Watch(testKey, nil, rev-1)

newStore.Restore(b)

select {
case resp := <-w.Chan():
if resp.Revision != rev {
t.Fatalf("rev = %d, want %d", resp.Revision, rev)
}
if len(resp.Events) != 1 {
t.Fatalf("failed to get events from the response")
}
if resp.Events[0].Kv.ModRevision != rev {
t.Fatalf("kv.rev = %d, want %d", resp.Events[0].Kv.ModRevision, rev)
}
case <-time.After(time.Second):
t.Fatal("failed to receive event in 1 second.")
}
}

// TestWatchBatchUnsynced tests batching on unsynced watchers
func TestWatchBatchUnsynced(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
Expand Down

0 comments on commit 0d723d5

Please sign in to comment.