diff --git a/mvcc/watchable_store.go b/mvcc/watchable_store.go index 68d9ab71d27..5b00bcba997 100644 --- a/mvcc/watchable_store.go +++ b/mvcc/watchable_store.go @@ -188,7 +188,7 @@ func (s *watchableStore) Restore(b backend.Backend) error { } for wa := range s.synced.watchers { - s.unsynced.watchers.add(wa) + s.unsynced.add(wa) } s.synced = newWatcherGroup() return nil diff --git a/mvcc/watchable_store_test.go b/mvcc/watchable_store_test.go index 60fe949a3be..141627c8be6 100644 --- a/mvcc/watchable_store_test.go +++ b/mvcc/watchable_store_test.go @@ -295,36 +295,45 @@ func TestWatchFutureRev(t *testing.T) { } func TestWatchRestore(t *testing.T) { - b, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(b, &lease.FakeLessor{}, nil) - defer cleanup(s, b, tmpPath) - - testKey := []byte("foo") - testValue := []byte("bar") - rev := s.Put(testKey, testValue, lease.NoLease) - - newBackend, newPath := backend.NewDefaultTmpBackend() - newStore := newWatchableStore(newBackend, &lease.FakeLessor{}, nil) - defer cleanup(newStore, newBackend, newPath) - - w := newStore.NewWatchStream() - w.Watch(testKey, nil, rev-1) - - newStore.Restore(b) - select { - case resp := <-w.Chan(): - if resp.Revision != rev { - t.Fatalf("rev = %d, want %d", resp.Revision, rev) - } - if len(resp.Events) != 1 { - t.Fatalf("failed to get events from the response") - } - if resp.Events[0].Kv.ModRevision != rev { - t.Fatalf("kv.rev = %d, want %d", resp.Events[0].Kv.ModRevision, rev) + test := func(delay time.Duration) func(t *testing.T) { + return func(t *testing.T) { + b, tmpPath := backend.NewDefaultTmpBackend() + s := newWatchableStore(b, &lease.FakeLessor{}, nil) + defer cleanup(s, b, tmpPath) + + testKey := []byte("foo") + testValue := []byte("bar") + rev := s.Put(testKey, testValue, lease.NoLease) + + newBackend, newPath := backend.NewDefaultTmpBackend() + newStore := newWatchableStore(newBackend, &lease.FakeLessor{}, nil) + defer cleanup(newStore, newBackend, newPath) + + w := newStore.NewWatchStream() + w.Watch(testKey, nil, rev-1) + + time.Sleep(delay) + + newStore.Restore(b) + select { + case resp := <-w.Chan(): + if resp.Revision != rev { + t.Fatalf("rev = %d, want %d", resp.Revision, rev) + } + if len(resp.Events) != 1 { + t.Fatalf("failed to get events from the response") + } + if resp.Events[0].Kv.ModRevision != rev { + t.Fatalf("kv.rev = %d, want %d", resp.Events[0].Kv.ModRevision, rev) + } + case <-time.After(time.Second): + t.Fatal("failed to receive event in 1 second.") + } } - case <-time.After(time.Second): - t.Fatal("failed to receive event in 1 second.") } + + t.Run("Normal", test(0)) + t.Run("RunSyncWatchLoopBeforeRestore", test(time.Millisecond*120)) // longer than default waitDuration } // TestWatchBatchUnsynced tests batching on unsynced watchers