Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv: recursive blocking queries are not triggered by partial nested deletes #195

Closed
abursavich opened this issue Jun 6, 2014 · 6 comments · Fixed by #577
Closed

kv: recursive blocking queries are not triggered by partial nested deletes #195

abursavich opened this issue Jun 6, 2014 · 6 comments · Fixed by #577
Labels
type/bug Feature does not function as expected

Comments

@abursavich
Copy link
Contributor

I figured it'd be easiest to add some quick and dirty test cases to consul-kv to demonstrate.

// TestClient_WatchList_DeleteKey deletes a single key under a watched prefix
// and expects the blocking WatchList call to return before WaitTime elapses.
// delete a key -- fails :(
func TestClient_WatchList_DeleteKey(t *testing.T) {
    client := testClient(t)
    client.config.WaitTime = 250 * time.Millisecond
    prefix := testKey()
    key := ""
    value := []byte("testing")
    defer client.DeleteTree(prefix)
    // Populate the prefix; key ends up holding the last key written.
    for i := 0; i < 10; i++ {
        key = path.Join(prefix, testKey())
        if err := client.Put(key, value, 0); err != nil {
            t.Fatalf("err: %v", err)
        }
    }
    meta, _, err := client.List(prefix)
    if err != nil {
        t.Fatalf("err: %v", err)
    }
    // done is closed once the background delete has finished, so the test
    // never returns while the goroutine may still report through t.
    done := make(chan struct{})
    go func() {
        defer close(done)
        time.Sleep(100 * time.Millisecond)
        if err := client.Delete(key); err != nil {
            t.Errorf("err: %v", err)
        }
    }()
    start := time.Now()
    client.WatchList(prefix, meta.ModifyIndex)
    // Capture the elapsed time before joining the goroutine so that the
    // join does not inflate the measurement.
    elapsed := time.Since(start)
    <-done
    if elapsed > client.config.WaitTime {
        t.Errorf("Wait timed out")
    }
}

// TestClient_WatchList_DeletePrefix deletes the whole watched prefix and
// expects the blocking WatchList call to return before WaitTime elapses.
// delete the prefix -- works!
func TestClient_WatchList_DeletePrefix(t *testing.T) {
    client := testClient(t)
    client.config.WaitTime = 250 * time.Millisecond
    prefix := testKey()
    value := []byte("testing")
    defer client.DeleteTree(prefix)
    // Populate the prefix with ten keys.
    for i := 0; i < 10; i++ {
        key := path.Join(prefix, testKey())
        if err := client.Put(key, value, 0); err != nil {
            t.Fatalf("err: %v", err)
        }
    }
    meta, _, err := client.List(prefix)
    if err != nil {
        t.Fatalf("err: %v", err)
    }
    // done is closed once the background delete has finished, so the test
    // never returns while the goroutine may still report through t.
    done := make(chan struct{})
    go func() {
        defer close(done)
        time.Sleep(100 * time.Millisecond)
        if err := client.DeleteTree(prefix); err != nil {
            t.Errorf("err: %v", err)
        }
    }()
    start := time.Now()
    client.WatchList(prefix, meta.ModifyIndex)
    // Capture the elapsed time before joining the goroutine so that the
    // join does not inflate the measurement.
    elapsed := time.Since(start)
    <-done
    if elapsed > client.config.WaitTime {
        t.Errorf("Wait timed out")
    }
}

// TestClient_WatchList_Add creates a new key under a watched prefix and
// expects the blocking WatchList call to return before WaitTime elapses.
// add a new key -- works!
func TestClient_WatchList_Add(t *testing.T) {
    client := testClient(t)
    client.config.WaitTime = 250 * time.Millisecond
    prefix := testKey()
    value := []byte("testing")
    defer client.DeleteTree(prefix)
    // Populate the prefix with ten keys.
    for i := 0; i < 10; i++ {
        key := path.Join(prefix, testKey())
        if err := client.Put(key, value, 0); err != nil {
            t.Fatalf("err: %v", err)
        }
    }
    meta, _, err := client.List(prefix)
    if err != nil {
        t.Fatalf("err: %v", err)
    }
    // done is closed once the background write has finished, so the test
    // never returns while the goroutine may still report through t.
    done := make(chan struct{})
    go func() {
        defer close(done)
        time.Sleep(100 * time.Millisecond)
        key := path.Join(prefix, testKey())
        if ok, err := client.CAS(key, value, 0, 0); err != nil {
            t.Errorf("err: %v", err)
        } else if !ok {
            t.Errorf("CAS failure")
        }
    }()
    start := time.Now()
    client.WatchList(prefix, meta.ModifyIndex)
    // Capture the elapsed time before joining the goroutine so that the
    // join does not inflate the measurement.
    elapsed := time.Since(start)
    <-done
    if elapsed > client.config.WaitTime {
        t.Errorf("Wait timed out")
    }
}

// TestClient_WatchList_Put overwrites an existing key under a watched prefix
// and expects the blocking WatchList call to return before WaitTime elapses.
// overwrite an existing key -- works!
func TestClient_WatchList_Put(t *testing.T) {
    client := testClient(t)
    client.config.WaitTime = 250 * time.Millisecond
    prefix := testKey()
    value := []byte("testing")
    defer client.DeleteTree(prefix)
    key := ""
    // Populate the prefix; key ends up holding the last key written.
    for i := 0; i < 10; i++ {
        key = path.Join(prefix, testKey())
        if err := client.Put(key, value, 0); err != nil {
            t.Fatalf("err: %v", err)
        }
    }
    meta, _, err := client.List(prefix)
    if err != nil {
        t.Fatalf("err: %v", err)
    }
    // done is closed once the background write has finished, so the test
    // never returns while the goroutine may still report through t.
    done := make(chan struct{})
    go func() {
        defer close(done)
        time.Sleep(100 * time.Millisecond)
        if err := client.Put(key, value, 0); err != nil {
            t.Errorf("err: %v", err)
        }
    }()
    start := time.Now()
    client.WatchList(prefix, meta.ModifyIndex)
    // Capture the elapsed time before joining the goroutine so that the
    // join does not inflate the measurement.
    elapsed := time.Since(start)
    <-done
    if elapsed > client.config.WaitTime {
        t.Errorf("Wait timed out")
    }
}
$ go test -run="WatchList"
--- FAIL: TestClient_WatchList_DeleteKey (0.32 seconds)
    client_test.go:253: Wait timed out
FAIL
exit status 1
FAIL    github.com/armon/consul-kv  0.806s
@armon
Copy link
Member

armon commented Jun 6, 2014

Yeah this is a current limitation, haven't found a great way to fix it yet. Probably need to implement tombstones, which are then periodically GC'd.

Issue is this:

  1. Index of a key lookup is max(index for key in result)
  2. Blocking query waits for index > i
  3. Delete removes the key
  4. The max index can now go backwards (absence of a key)
  5. Blocking query never detects change

To fix this, there must be a mechanism by which the index rolls forward.
Detecting the absence of information is difficult to say the least, so instead
we must put a "tombstone" record indicating that the key was deleted. We do not
return this to the client, but process it locally, allowing the clock to be monotonic.

This introduces a new problem: tombstones now accumulate.
At some point they must be cleared, and then you have the same issue.

@abursavich
Copy link
Contributor Author

That makes sense. I skimmed through the code that runs the query this evening and added something that will complete a blocking query early if the length of the entries drops, but it's so hacky and not technically correct (the delete may occur before the first run) that I don't want to submit a pull request.

@abursavich
Copy link
Contributor Author

Well, ok... Here's the diff in case you're interested in seeing the hack ;o)

diff --git a/consul/kvs_endpoint.go b/consul/kvs_endpoint.go
index 91d8f3b..43f3778 100644
--- a/consul/kvs_endpoint.go
+++ b/consul/kvs_endpoint.go
@@ -100,7 +100,10 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e

    // Get the local state
    state := k.srv.fsm.State()
-   return k.srv.blockingRPC(&args.QueryOptions,
+   var maxIndex uint64
+   var prevLen int
+   var terminated bool
+   err := k.srv.blockingRPC(&args.QueryOptions,
        &reply.QueryMeta,
        state.QueryTables("KVSList"),
        func() error {
@@ -119,18 +122,29 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e
                reply.Entries = nil
            } else {
                // Determine the maximum affected index
-               var maxIndex uint64
+               maxIndex = 0
                for _, e := range ent {
                    if e.ModifyIndex > maxIndex {
                        maxIndex = e.ModifyIndex
                    }
                }
-
-               reply.Index = maxIndex
                reply.Entries = ent
+               // ugly hack to terminate the query upon deletion of key
+               if len(ent) < prevLen {
+                   reply.Index = args.MinQueryIndex + 1
+                   terminated = true
+               } else {
+                   reply.Index = maxIndex
+                   prevLen = len(ent)
+               }
            }
            return nil
        })
+   // recover from ugly hack
+   if terminated {
+       reply.Index = maxIndex
+   }
+   return err
 }

 // ListKeys is used to list all keys with a given prefix to a seperator

@abursavich
Copy link
Contributor Author

^ diff edited to be slightly less hacky, but it is still hacky

@armon
Copy link
Member

armon commented Jun 6, 2014

Yeah this is a bit sketchy since it doesn't work well depending on the ordering of commands. e.g. if the delete happens before the watch, it will still hang. Also the index returned is not actually accurate.

@prepor
Copy link

prepor commented Dec 10, 2014

+1

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/bug Feature does not function as expected
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants