make sure we emit only entities existing in the main dataset in multi… #207

Merged 1 commit on Mar 7, 2023
25 changes: 24 additions & 1 deletion internal/jobs/source/multi_source.go
@@ -210,7 +210,30 @@ func (multiSource *MultiSource) processDependency(dep Dependency, d *MultiDatase
e := r[2].(*server.Entity)
if _, ok := dedupCache[e.InternalID]; !ok {
dedupCache[e.InternalID] = true
entities = append(entities, e)
// we need to load the target entity with target-dataset scope only, else we risk merged entities as a result.
//
// Normally this is rather costly, since GetEntity creates a new read transaction per call.
// But MultiSource is a slow source to begin with, so it's not that noticeable in the sum of things going on.
//
// To make the whole source more performant, we could consider performing all read operations
// in a single read transaction while also accessing badger indexes with more specific read keys
// (minimizing the number of badger keys we iterate over in store.GetX operations for this source).
//
// For example:
// GetEntity here goes through all datasets containing the desired entity ID. If we create
// a new API function which includes the target dataset in the badger search key, this would be more performant in
// cases where the ID exists in many datasets. Even better if we can have an API function version that allows
// reuse of a common read transaction.
mainEntity, err := multiSource.Store.GetEntity(e.ID, []string{multiSource.DatasetName})
if err != nil {
return fmt.Errorf("Could not load entity %+v from dataset %v", e, multiSource.DatasetName)
}
// GetEntity is a Query Function. As such, it returns skeleton Entities which contain only an ID for
// references to non-existing entities. Checking Recorded tells us whether this is a real entity or just a hit
// on an "open graph" reference.
if mainEntity.Recorded > 0 {
entities = append(entities, mainEntity)
}
}
}
} else {
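The inline comment above proposes a possible optimization: a dataset-scoped lookup that reuses a single read transaction. The following is a minimal, hypothetical sketch of that idea; GetEntityInDataset, the key layout, and the Entity fields are assumptions made for illustration, not the actual datahub store API.

// Hypothetical sketch of the dataset-scoped lookup proposed in the comment above.
// Key layout, names, and signatures are assumptions; the real datahub store differs.
package store

import (
	"encoding/json"
	"fmt"

	badger "github.com/dgraph-io/badger/v3"
)

// Entity mirrors the two fields the diff relies on: Recorded == 0 marks a
// skeleton entity, i.e. an "open graph" reference with no stored payload.
type Entity struct {
	ID       string                 `json:"id"`
	Recorded uint64                 `json:"recorded"`
	Props    map[string]interface{} `json:"props"`
}

// GetEntityInDataset looks up an id inside one dataset by building a key that
// already contains the dataset name, so badger never touches versions of the
// entity stored in other datasets (no merging). It takes an open read
// transaction, allowing callers to batch many lookups into a single txn.
func GetEntityInDataset(txn *badger.Txn, dataset, id string) (*Entity, error) {
	key := []byte(fmt.Sprintf("entity:%s:%s", dataset, id)) // assumed key layout
	item, err := txn.Get(key)
	if err == badger.ErrKeyNotFound {
		// no hit in this dataset: return a skeleton with Recorded == 0,
		// matching the convention checked in the diff above
		return &Entity{ID: id}, nil
	}
	if err != nil {
		return nil, err
	}
	e := &Entity{}
	if err := item.Value(func(val []byte) error { return json.Unmarshal(val, e) }); err != nil {
		return nil, err
	}
	return e, nil
}

A caller could then wrap a whole batch of dependency lookups in one db.View callback instead of paying for a fresh transaction per GetEntity call, which is exactly the cost the comment describes.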
71 changes: 67 additions & 4 deletions internal/jobs/source/multi_source_test.go
@@ -19,13 +19,12 @@ import (
"encoding/json"
"fmt"
"github.com/DataDog/datadog-go/v5/statsd"
"github.com/mimiro-io/datahub/internal"
"os"
"testing"

"github.com/franela/goblin"
"github.com/mimiro-io/datahub/internal"
"go.uber.org/fx/fxtest"
"go.uber.org/zap"
"os"
"testing"

"github.com/mimiro-io/datahub/internal/conf"
"github.com/mimiro-io/datahub/internal/jobs/source"
@@ -1011,6 +1010,70 @@ func TestMultiSource(t *testing.T) {
g.Assert(m["Hank"]).IsTrue()
g.Assert(m["Lisa"]).IsTrue()
})
g.It("Should only emit main entities that exist in main dataset", func() {
// people <- employment
_, peoplePrefix := createTestDataset("people", []string{"Bob", "Alice"}, nil, dsm, g, store)
copyDs, _ := dsm.CreateDataset("peopleTwo", nil)
copyDs.StoreEntities([]*server.Entity{
server.NewEntityFromMap(map[string]interface{}{
"id": peoplePrefix + ":Bob",
"props": map[string]interface{}{"name": "Bob"},
"refs": map[string]interface{}{},
}),
})
employmentDs, employmentPrefix := createTestDataset("employment",
[]string{"MediumCorp", "LittleSweatshop", "YardSale"}, map[string]map[string]interface{}{
"MediumCorp": {peoplePrefix + ":employment": peoplePrefix + ":Bob"},
"LittleSweatshop": {peoplePrefix + ":employment": peoplePrefix + ":Alice"},
}, dsm, g, store)

testSource := source.MultiSource{DatasetName: "people", Store: store, DatasetManager: dsm}
srcJSON := `{ "Type" : "MultiSource", "Name" : "people", "Dependencies": [ {
"dataset": "employment",
"joins": [ { "dataset": "people", "predicate": "http://people/employment", "inverse": false } ]
} ] }`

srcConfig := map[string]interface{}{}
_ = json.Unmarshal([]byte(srcJSON), &srcConfig)
_ = testSource.ParseDependencies(srcConfig["Dependencies"])

// full sync
var recordedEntities []server.Entity
token := &source.MultiDatasetContinuation{}
var lastToken source.DatasetContinuation
testSource.StartFullSync()
err := testSource.ReadEntities(token, 1000, func(entities []*server.Entity, token source.DatasetContinuation) error {
lastToken = token
for _, e := range entities {
recordedEntities = append(recordedEntities, *e)
}
return nil
})
g.Assert(err).IsNil()
testSource.EndFullSync()

// now, add a new Employment entity with refs to Bob (exists) and Franz (does not exist)
err = employmentDs.StoreEntities([]*server.Entity{
server.NewEntityFromMap(map[string]interface{}{
"id": employmentPrefix + ":YardSale",
"props": map[string]interface{}{"name": "YardSale"},
"refs": map[string]interface{}{peoplePrefix + ":employment": []string{peoplePrefix + ":Franz", peoplePrefix + ":Bob"}},
}),
})
g.Assert(err).IsNil()

recordedEntities = []server.Entity{}
err = testSource.ReadEntities(lastToken, 1000, func(entities []*server.Entity, token source.DatasetContinuation) error {
lastToken = token
for _, e := range entities {
recordedEntities = append(recordedEntities, *e)
}
return nil
})
g.Assert(err).IsNil()
g.Assert(len(recordedEntities)).Eql(1, "YardSale change points to non-existing entity 'Franz' in people, therefore only Bob should be emitted")
g.Assert(recordedEntities[0].ID).Eql(peoplePrefix + ":Bob")
g.Assert(recordedEntities[0].Properties["name"]).Eql("Bob", "Bob exists in two datasets; making sure we don't get a merged result")
})

g.Describe("parseDependencies", func() {
g.It("should translate json to config", func() {