Skip to content

Commit

Permalink
fix: don't ignore deleted state when compacting refs
Browse files Browse the repository at this point in the history
  • Loading branch information
rompetroll committed Aug 30, 2024
1 parent 1cd2a06 commit 4c27c40
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 16 deletions.
2 changes: 1 addition & 1 deletion internal/service/dataset/compact_stategy_deduplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (d *deduplicationStrategy) eval(
}
del = append(del, refs...)
d.counts["refs"] += len(refs)
} else {
} else if e.IsDeleted == d.prev.IsDeleted {
// if the entity is not equal to the previous entity, we can still check for just reference duplicates
for k, stringOrArrayValue := range e.References {
if reflect.DeepEqual(d.prev.References[k], stringOrArrayValue) {
Expand Down
167 changes: 152 additions & 15 deletions internal/service/dataset/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package dataset
import (
"encoding/binary"
"encoding/json"
"fmt"
"github.com/DataDog/datadog-go/v5/statsd"
"github.com/dgraph-io/badger/v4"
"github.com/mimiro-io/datahub/internal/conf"
Expand Down Expand Up @@ -162,12 +161,15 @@ func TestCompact(t *testing.T) {
}
t.Run("using deduplication", func(t *testing.T) {
for _, flushThreshold := range []int{1, 2, 100000} {
// if true {
//flushThreshold := 1
strat := func() CompactionStrategy {
s := DeduplicationStrategy()
s.(*deduplicationStrategy).flushAfter = flushThreshold
return s
}
t.Run("with flush threshold "+strconv.Itoa(flushThreshold), func(t *testing.T) {
//t.Run("with flush threshold 1", func(t *testing.T) {

t.Run("empty dataset", func(t *testing.T) {
defer setup()()
Expand Down Expand Up @@ -265,13 +267,16 @@ func TestCompact(t *testing.T) {
t.Run("stored in different batches", func(t *testing.T) {
defer setup()()
// create a dataset with 3 entities
// entity 2 is deleted without prior version, so datahub should not store any refs for it
mkDs(t, "people", store,
[]any{"http://ns/1", false, nil, `{"ns3:ref1": "ns3:2", "ns4:r1": "ns4:2"}`},
[]any{"http://ns/2", true},
[]any{"http://ns/2", true, nil, `{ "ns4:r1": ["ns4:2", "ns4:3"]}`},
[]any{"http://ns/3", false})
time.Sleep(1 * time.Millisecond) // make sure the txTime is different
// now update entity 2 to be not deleted
mkDs(t, "people", store,
[]any{"http://ns/1", false, `{"p": "a"}`, `{"ns3:ref1": "ns3:2", "ns4:r1": "ns4:2"}`},
[]any{"http://ns/2", false, nil, `{ "ns4:r1": ["ns4:2", "ns4:3"]}`},
)
time.Sleep(1 * time.Millisecond) // make sure the txTime is different
mkDs(t, "people", store,
Expand All @@ -281,9 +286,10 @@ func TestCompact(t *testing.T) {
// verify that the duplicate is present
checkChanges(t, store, "people", []any{
[]any{"ns3:1", false, nil, `{"ns3:ref1":"ns3:2", "ns4:r1":"ns4:2"}`},
[]any{"ns3:2", true},
[]any{"ns3:2", true, nil, `{"ns4:r1":["ns4:2", "ns4:3"]}`},
[]any{"ns3:3", false},
[]any{"ns3:1", false, nil, `{"ns3:ref1":"ns3:2", "ns4:r1":"ns4:2"}`},
[]any{"ns3:2", false, nil, `{"ns4:r1":["ns4:2", "ns4:3"]}`},
[]any{"ns3:1", false, nil, `{"ns3:ref1":"ns3:2", "ns4:r1":"ns4:2"}`},
})
if rcOut, rcIn := refCount(t, store, ba, "ns3:1", "ns3:ref1"); rcOut != 3 || rcIn != 3 {
Expand All @@ -292,15 +298,24 @@ func TestCompact(t *testing.T) {
if rcOut, rcIn := refCount(t, store, ba, "ns3:1", "ns4:r1"); rcOut != 3 || rcIn != 3 {
t.Fatalf("expected 3 ref out and 3 ref in, got %d and %d", rcOut, rcIn)
}
if rcOut, rcIn := refCount(t, store, ba, "ns3:2", "ns4:r1"); rcOut != 2 || rcIn != 2 {
t.Fatalf("expected 2 ref out and 2 ref in, got %d and %d", rcOut, rcIn)
}

newStats := getStats(t, ba, store, log)
peopleIncoming := newStats["refs"].(map[string]any)["INCOMING_REF_INDEX"].(map[string]any)["people"].(map[string]any)
peopleOutgoing := newStats["refs"].(map[string]any)["OUTGOING_REF_INDEX"].(map[string]any)["people"].(map[string]any)
if peopleIncoming["keys"] != 6.0 {
t.Fatalf("expected 6 incoming ref keys, got %.0f", peopleIncoming["keys"])
if peopleIncoming["keys"] != 8.0 {
t.Fatalf("expected 8 incoming ref keys, got %.0f", peopleIncoming["keys"])
}
if peopleOutgoing["keys"] != 6.0 {
t.Fatalf("expected 6 outgoing ref keys, got %.0f", peopleOutgoing["keys"])
if peopleOutgoing["keys"] != 8.0 {
t.Fatalf("expected 8 outgoing ref keys, got %.0f", peopleOutgoing["keys"])
}
if newStats["refs"].(map[string]any)["INCOMING_REF_INDEX"].(map[string]any)["people (deleted)"] != nil {
t.Fatalf("not expected incoming ref index for deleted entities")
}
if newStats["refs"].(map[string]any)["OUTGOING_REF_INDEX"].(map[string]any)["people (deleted)"] != nil {
t.Fatalf("not expected outgoing ref index for deleted entities")
}

// query for the entity before the compaction
Expand All @@ -318,9 +333,10 @@ func TestCompact(t *testing.T) {
// verify that the duplicate is gone
checkChanges(t, store, "people", []any{
[]any{"ns3:1", false, nil, `{"ns3:ref1":"ns3:2", "ns4:r1":"ns4:2"}`},
[]any{"ns3:2", true},
[]any{"ns3:2", true, nil, `{"ns4:r1":["ns4:2", "ns4:3"]}`},
[]any{"ns3:3", false},
[]any{"ns3:1", false, nil, `{"ns3:ref1":"ns3:2", "ns4:r1":"ns4:2"}`},
[]any{"ns3:2", false, nil, `{"ns4:r1":["ns4:2", "ns4:3"]}`},
[]any{"ns3:1", false, nil, `{"ns3:ref1":"ns3:2", "ns4:r1":"ns4:2"}`},
})

Expand All @@ -331,21 +347,29 @@ func TestCompact(t *testing.T) {
if rcOut, rcIn := refCount(t, store, ba, "ns3:1", "ns4:r1"); rcOut != 1 || rcIn != 1 {
t.Fatalf("expected 1 ref out and 1 ref in, got %d and %d", rcOut, rcIn)
}
if rcOut, rcIn := refCount(t, store, ba, "ns3:2", "ns4:r1"); rcOut != 2 || rcIn != 2 {
//t.Fatalf("expected 2 ref out and 2 ref in, got %d and %d", rcOut, rcIn)
}
newStats = getStats(t, ba, store, log)
peopleIncoming = newStats["refs"].(map[string]any)["INCOMING_REF_INDEX"].(map[string]any)["people"].(map[string]any)
peopleOutgoing = newStats["refs"].(map[string]any)["OUTGOING_REF_INDEX"].(map[string]any)["people"].(map[string]any)
if peopleIncoming["keys"] != 2.0 {
t.Fatalf("expected 2 incoming ref keys, got %.0f", peopleIncoming["keys"])
if peopleIncoming["keys"] != 4.0 {
t.Fatalf("expected 4 incoming ref keys, got %.0f", peopleIncoming["keys"])
}
if peopleOutgoing["keys"] != 2.0 {
t.Fatalf("expected 2 outgoing ref keys, got %.0f", peopleOutgoing["keys"])
if peopleOutgoing["keys"] != 4.0 {
t.Fatalf("expected 4 outgoing ref keys, got %.0f", peopleOutgoing["keys"])
}
if newStats["refs"].(map[string]any)["INCOMING_REF_INDEX"].(map[string]any)["people (deleted)"] != nil {
t.Fatalf("not expected incoming ref index for deleted entities")
}
if newStats["refs"].(map[string]any)["OUTGOING_REF_INDEX"].(map[string]any)["people (deleted)"] != nil {
t.Fatalf("not expected outgoing ref index for deleted entities")
}
})
t.Run("stored in same batch", func(t *testing.T) {
defer setup()()
// create a dataset with 3 entities
mkDs(t, "people", store,
[]any{"http://ns/1", false, nil, `{"ns3:ref1": "ns3:2"}`},
mkDs(t, "people", store, []any{"http://ns/1", false, nil, `{"ns3:ref1": "ns3:2"}`},
[]any{"http://ns/2", true},
[]any{"http://ns/1", false, `{"p": "a"}`, `{"ns3:ref1": "ns3:2"}`},
[]any{"http://ns/1", false, `{"p": "b"}`, `{"ns3:ref1": "ns3:2"}`},
Expand Down Expand Up @@ -408,6 +432,119 @@ func TestCompact(t *testing.T) {
}

})
t.Run("with alternating delete state", func(t *testing.T) {
defer setup()()
for _, e := range [][]any{
[]any{"http://ns/1", false, nil, `{"ns3:ref1": ["ns3:2", "ns3:3"]}`},
[]any{"http://ns/1", false, `{"p": "a"}`, `{"ns3:ref1": ["ns3:2", "ns3:3"]}`},
[]any{"http://ns/1", true, `{"p": "b"}`, `{"ns3:ref1": ["ns3:2", "ns3:3"]}`},
[]any{"http://ns/1", true, `{"p": "c"}`, `{"ns3:ref1": ["ns3:2", "ns3:3"]}`},
[]any{"http://ns/1", true, `{"p": "d"}`, `{"ns3:ref1": ["ns3:2", "ns3:3"]}`},
[]any{"http://ns/1", false, `{"p": "e"}`, `{"ns3:ref1": ["ns3:2", "ns3:3"]}`},
[]any{"http://ns/1", false, `{"p": "f"}`, `{"ns3:ref1": ["ns3:2", "ns3:3"]}`},
} {
mkDs(t, "people", store, e)
time.Sleep(1 * time.Millisecond) // make sure the txTime is different
}

// verify that the duplicate is present
checkChanges(t, store, "people", []any{
[]any{"ns3:1", false, nil, `{"ns3:ref1": ["ns3:2", "ns3:3"]}`},
[]any{"ns3:1", false, `{"p": "a"}`, `{"ns3:ref1": ["ns3:2", "ns3:3"]}`},
[]any{"ns3:1", true, `{"p": "b"}`, `{"ns3:ref1": ["ns3:2", "ns3:3"]}`},
[]any{"ns3:1", true, `{"p": "c"}`, `{"ns3:ref1": ["ns3:2", "ns3:3"]}`},
[]any{"ns3:1", true, `{"p": "d"}`, `{"ns3:ref1": ["ns3:2", "ns3:3"]}`},
[]any{"ns3:1", false, `{"p": "e"}`, `{"ns3:ref1": ["ns3:2", "ns3:3"]}`},
[]any{"ns3:1", false, `{"p": "f"}`, `{"ns3:ref1": ["ns3:2", "ns3:3"]}`},
})
if rcOut, rcIn := refCount(t, store, ba, "ns3:1", "ns3:ref1"); rcOut != 14 || rcIn != 14 {
t.Fatalf("expected 14 ref out and 14 ref in, got %d and %d", rcOut, rcIn)
}
newStats := getStats(t, ba, store, log)
peopleIncoming := newStats["refs"].(map[string]any)["INCOMING_REF_INDEX"].(map[string]any)["people"].(map[string]any)
peopleOutgoing := newStats["refs"].(map[string]any)["OUTGOING_REF_INDEX"].(map[string]any)["people"].(map[string]any)
if peopleIncoming["keys"] != 8.0 {
t.Fatalf("expected 8 incoming ref keys, got %.0f", peopleIncoming["keys"])
}
if peopleOutgoing["keys"] != 8.0 {
t.Fatalf("expected 8 outgoing ref keys, got %.0f", peopleOutgoing["keys"])
}
peopleIncoming = newStats["refs"].(map[string]any)["INCOMING_REF_INDEX"].(map[string]any)["people (deleted)"].(map[string]any)
peopleOutgoing = newStats["refs"].(map[string]any)["OUTGOING_REF_INDEX"].(map[string]any)["people (deleted)"].(map[string]any)
if peopleIncoming["keys"] != 6.0 {
t.Fatalf("expected 6 incoming deleted ref keys, got %.0f", peopleIncoming["keys"])
}
if peopleOutgoing["keys"] != 6.0 {
t.Fatalf("expected 6 outgoing ref keys, got %.0f", peopleOutgoing["keys"])
}

// query for the entity before the compaction
queryResult, _ := store.GetManyRelatedEntities([]string{"ns3:1"}, "ns3:ref1", false, nil, false)
if len(queryResult) != 2 {
t.Fatalf("expected 2 related entities, got %d", len(queryResult))
}
if queryResult[0][2].(*server.Entity).ID != "ns3:3" {
t.Fatalf("expected related entity id ns3:3, got %s", queryResult[0][2].(*server.Entity).ID)
}
if queryResult[1][2].(*server.Entity).ID != "ns3:2" {
t.Fatalf("expected related entity id ns3:2, got %s", queryResult[1][2].(*server.Entity).ID)
}
checkQuery(t, store, "ns3:2", "ns3:ref1", true, "ns3:1")
checkQuery(t, store, "ns3:3", "ns3:ref1", true, "ns3:1")

// Now do the compaction. It should remove duplicate refs, but keep one set of refs for every change in deleted state
if err := compactor.compact("people", strat()); err != nil {
t.Fatalf("error compacting dataset: %v", err)
}

// query for the entity after the compaction
queryResult, _ = store.GetManyRelatedEntities([]string{"ns3:1"}, "ns3:ref1", false, nil, false)
if len(queryResult) != 2 {
t.Fatalf("expected 2 related entities, got %d", len(queryResult))
}
if queryResult[0][2].(*server.Entity).ID != "ns3:3" {
t.Fatalf("expected related entity id ns3:3, got %s", queryResult[0][2].(*server.Entity).ID)
}
if queryResult[1][2].(*server.Entity).ID != "ns3:2" {
t.Fatalf("expected related entity id ns3:2, got %s", queryResult[1][2].(*server.Entity).ID)
}
checkQuery(t, store, "ns3:2", "ns3:ref1", true, "ns3:1")
checkQuery(t, store, "ns3:3", "ns3:ref1", true, "ns3:1")

// verify that the changes are still there
checkChanges(t, store, "people", []any{
[]any{"ns3:1", false, nil, `{"ns3:ref1": ["ns3:2", "ns3:3"]}`},
[]any{"ns3:1", false, `{"p": "a"}`, `{"ns3:ref1": ["ns3:2", "ns3:3"]}`},
[]any{"ns3:1", true, `{"p": "b"}`, `{"ns3:ref1": ["ns3:2", "ns3:3"]}`},
[]any{"ns3:1", true, `{"p": "c"}`, `{"ns3:ref1": ["ns3:2", "ns3:3"]}`},
[]any{"ns3:1", true, `{"p": "d"}`, `{"ns3:ref1": ["ns3:2", "ns3:3"]}`},
[]any{"ns3:1", false, `{"p": "e"}`, `{"ns3:ref1": ["ns3:2", "ns3:3"]}`},
[]any{"ns3:1", false, `{"p": "f"}`, `{"ns3:ref1": ["ns3:2", "ns3:3"]}`},
})

// check that reference duplicates are gone.
// 6 expected = 2 from first change, 2 from first delete (change nr3), 2 from undelete (change nr6)
if rcOut, rcIn := refCount(t, store, ba, "ns3:1", "ns3:ref1"); rcOut != 6 || rcIn != 6 {
t.Fatalf("expected 6 ref out and 6 ref in, got %d and %d", rcOut, rcIn)
}
newStats = getStats(t, ba, store, log)
peopleIncoming = newStats["refs"].(map[string]any)["INCOMING_REF_INDEX"].(map[string]any)["people"].(map[string]any)
peopleOutgoing = newStats["refs"].(map[string]any)["OUTGOING_REF_INDEX"].(map[string]any)["people"].(map[string]any)
if peopleIncoming["keys"] != 4.0 {
t.Fatalf("expected 4 incoming ref keys, got %.0f", peopleIncoming["keys"])
}
if peopleOutgoing["keys"] != 4.0 {
t.Fatalf("expected 4 outgoing ref keys, got %.0f", peopleOutgoing["keys"])
}
peopleIncoming = newStats["refs"].(map[string]any)["INCOMING_REF_INDEX"].(map[string]any)["people (deleted)"].(map[string]any)
peopleOutgoing = newStats["refs"].(map[string]any)["OUTGOING_REF_INDEX"].(map[string]any)["people (deleted)"].(map[string]any)
if peopleIncoming["keys"] != 2.0 {
t.Fatalf("expected 2 incoming deleted ref keys, got %.0f", peopleIncoming["keys"])
}
if peopleOutgoing["keys"] != 2.0 {
t.Fatalf("expected 2 outgoing ref keys, got %.0f", peopleOutgoing["keys"])
}
})
})

// TODO: try to cancel/abort mid compaction, make sure no change is semi-deleted (transactional integrity)
Expand Down Expand Up @@ -437,7 +574,7 @@ func getStats(t *testing.T, ba server.BadgerAccess, store *server.Store, log *za
su := scheduler.NewStatisticsUpdater(log, ba)
su.Run()
for su.State() != scheduler.TaskStateScheduled {
fmt.Println("waiting for stats update")
//fmt.Println("waiting for stats update")
time.Sleep(100 * time.Millisecond)
}
sr := &server.Statistics{
Expand Down

0 comments on commit 4c27c40

Please sign in to comment.