From 637b723ded12c358c4541e17c7dd36d9402b9ab6 Mon Sep 17 00:00:00 2001 From: fengshaobao 00231050 Date: Thu, 17 Aug 2017 20:38:34 +0800 Subject: [PATCH] mvcc: sending events after restore Fixes: #8411 --- mvcc/watchable_store.go | 23 +++++++++++++++++ mvcc/watchable_store_test.go | 48 ++++++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+) mode change 100644 => 100755 mvcc/watchable_store.go mode change 100644 => 100755 mvcc/watchable_store_test.go diff --git a/mvcc/watchable_store.go b/mvcc/watchable_store.go old mode 100644 new mode 100755 index 3205cf895214..81781b4365a7 --- a/mvcc/watchable_store.go +++ b/mvcc/watchable_store.go @@ -180,6 +180,29 @@ func (s *watchableStore) cancelWatcher(wa *watcher) { s.mu.Unlock() } +func (s *watchableStore) Restore(b backend.Backend) error { + var err error + var needSync bool + func() { + s.mu.Lock() + defer s.mu.Unlock() + err = s.store.Restore(b) + if err != nil { + return + } + + for wa := range s.synced.watchers { + s.unsynced.watchers.add(wa) + } + s.synced = newWatcherGroup() + needSync = true + }() + if needSync { + s.syncWatchers() + } + return err +} + // syncWatchersLoop syncs the watcher in the unsynced map every 100ms. func (s *watchableStore) syncWatchersLoop() { defer s.wg.Done() diff --git a/mvcc/watchable_store_test.go b/mvcc/watchable_store_test.go old mode 100644 new mode 100755 index a72be9cd9038..d1130be953cd --- a/mvcc/watchable_store_test.go +++ b/mvcc/watchable_store_test.go @@ -26,6 +26,8 @@ import ( "github.com/coreos/etcd/lease" "github.com/coreos/etcd/mvcc/backend" "github.com/coreos/etcd/mvcc/mvccpb" + "io/ioutil" + "path/filepath" ) func TestWatch(t *testing.T) { @@ -296,6 +298,52 @@ func TestWatchFutureRev(t *testing.T) { } } +func TestWatchRestoreRevs(t *testing.T) { + b, tmpPath := backend.NewDefaultTmpBackend() + s := newWatchableStore(b, &lease.FakeLessor{}, nil) + defer func() { + s.Close() + os.Remove(tmpPath) + }() + + testKey := []byte("foo") + testValue := []byte("bar") + rev := s.Put(testKey, testValue, lease.NoLease) + + dir, err := ioutil.TempDir(os.TempDir(), "etcd_backend_test") + if err != nil { + plog.Fatal(err) + } + newPath := filepath.Join(dir, "database_new") + + newBeckend := backend.NewDefaultBackend(newPath) + newStore := newWatchableStore(newBeckend, &lease.FakeLessor{}, nil) + defer func() { + newStore.Close() + os.Remove(newPath) + }() + + w := newStore.NewWatchStream() + w.Watch(testKey, nil, rev-1) + + newStore.Restore(b) + + select { + case resp := <-w.Chan(): + if resp.Revision != rev { + t.Fatalf("rev = %d, want %d", resp.Revision, rev) + } + if len(resp.Events) != 1 { + t.Fatalf("failed to get events from the response") + } + if resp.Events[0].Kv.ModRevision != rev { + t.Fatalf("kv.rev = %d, want %d", resp.Events[0].Kv.ModRevision, rev) + } + case <-time.After(time.Second): + t.Fatal("failed to receive event in 1 second.") + } +} + // TestWatchBatchUnsynced tests batching on unsynced watchers func TestWatchBatchUnsynced(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend()