Skip to content

Commit

Permalink
mvcc: sending events after restore
Browse files Browse the repository at this point in the history
  • Loading branch information
fengshaobao 00231050 committed Aug 17, 2017
1 parent f33d64a commit 637b723
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 0 deletions.
23 changes: 23 additions & 0 deletions mvcc/watchable_store.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,29 @@ func (s *watchableStore) cancelWatcher(wa *watcher) {
s.mu.Unlock()
}

func (s *watchableStore) Restore(b backend.Backend) error {
var err error
var needSync bool
func() {
s.mu.Lock()
defer s.mu.Unlock()
err = s.store.Restore(b)
if err != nil {
return
}

for wa := range s.synced.watchers {
s.unsynced.watchers.add(wa)
}
s.synced = newWatcherGroup()
needSync = true
}()
if needSync {
s.syncWatchers()
}
return err
}

// syncWatchersLoop syncs the watcher in the unsynced map every 100ms.
func (s *watchableStore) syncWatchersLoop() {
defer s.wg.Done()
Expand Down
48 changes: 48 additions & 0 deletions mvcc/watchable_store_test.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/mvcc/mvccpb"
"io/ioutil"
"path/filepath"
)

func TestWatch(t *testing.T) {
Expand Down Expand Up @@ -296,6 +298,52 @@ func TestWatchFutureRev(t *testing.T) {
}
}

func TestWatchRestoreRevs(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := newWatchableStore(b, &lease.FakeLessor{}, nil)
defer func() {
s.Close()
os.Remove(tmpPath)
}()

testKey := []byte("foo")
testValue := []byte("bar")
rev := s.Put(testKey, testValue, lease.NoLease)

dir, err := ioutil.TempDir(os.TempDir(), "etcd_backend_test")
if err != nil {
plog.Fatal(err)
}
newPath := filepath.Join(dir, "database_new")

newBeckend := backend.NewDefaultBackend(newPath)
newStore := newWatchableStore(newBeckend, &lease.FakeLessor{}, nil)
defer func() {
newStore.Close()
os.Remove(newPath)
}()

w := newStore.NewWatchStream()
w.Watch(testKey, nil, rev-1)

newStore.Restore(b)

select {
case resp := <-w.Chan():
if resp.Revision != rev {
t.Fatalf("rev = %d, want %d", resp.Revision, rev)
}
if len(resp.Events) != 1 {
t.Fatalf("failed to get events from the response")
}
if resp.Events[0].Kv.ModRevision != rev {
t.Fatalf("kv.rev = %d, want %d", resp.Events[0].Kv.ModRevision, rev)
}
case <-time.After(time.Second):
t.Fatal("failed to receive event in 1 second.")
}
}

// TestWatchBatchUnsynced tests batching on unsynced watchers
func TestWatchBatchUnsynced(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
Expand Down

0 comments on commit 637b723

Please sign in to comment.