Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chore/more testcases compaction #326

Merged
merged 4 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.21 AS builder_src
FROM golang:1.23 AS builder_src

COPY jemalloc-install.sh .
RUN apt-get update -y
Expand Down Expand Up @@ -48,4 +48,4 @@ EXPOSE 8080 40000

ENV GOMAXPROCS=128

CMD ["./server"]
CMD ["./server"]
11 changes: 5 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/mimiro-io/datahub

go 1.21
go 1.23.1

require (
github.com/DataDog/datadog-go/v5 v5.5.0
Expand All @@ -26,20 +26,21 @@ require (
gopkg.in/yaml.v3 v3.0.1 // indirect
)

require github.com/mimiro-io/entity-graph-data-model v0.7.7
require (
github.com/dgraph-io/ristretto v0.1.1
github.com/mimiro-io/entity-graph-data-model v0.7.7
)

require (
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect
github.com/dgraph-io/ristretto v0.1.1 // indirect
github.com/dlclark/regexp2 v1.11.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-sourcemap/sourcemap v2.1.4+incompatible // indirect
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
Expand Down Expand Up @@ -82,13 +83,11 @@ require (
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/text v0.15.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.20.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/protobuf v1.34.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
)
438 changes: 2 additions & 436 deletions go.sum

Large diffs are not rendered by default.

27 changes: 15 additions & 12 deletions internal/service/dataset/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"time"

"github.com/dgraph-io/badger/v4"
"github.com/mimiro-io/datahub/internal/server"
"github.com/mimiro-io/datahub/internal/service/store"
"github.com/mimiro-io/datahub/internal/service/types"
"go.uber.org/zap"
"time"
)

type CompactionWorker struct {
Expand All @@ -25,6 +26,7 @@ func NewCompactor(store *server.Store, dsm *server.DsManager, logger *zap.Sugare
logger: logger.Named("compaction-worker"),
}
}

func (c *CompactionWorker) CompactAsync(datasetID string, strategy CompactionStrategy) error {
if !c.running {
c.running = true
Expand Down Expand Up @@ -53,10 +55,10 @@ func (c *CompactionWorker) compact(datasetID string, strategy CompactionStrategy
defer txn.Discard()

// 1. go through these:
//datasetEntitiesLatestVersionKey := make([]byte, 14)
//binary.BigEndian.PutUint16(datasetEntitiesLatestVersionKey, DatasetLatestEntities)
//binary.BigEndian.PutUint32(datasetEntitiesLatestVersionKey[2:], ds.InternalID)
//binary.BigEndian.PutUint64(datasetEntitiesLatestVersionKey[6:], rid)
// datasetEntitiesLatestVersionKey := make([]byte, 14)
// binary.BigEndian.PutUint16(datasetEntitiesLatestVersionKey, DatasetLatestEntities)
// binary.BigEndian.PutUint32(datasetEntitiesLatestVersionKey[2:], ds.InternalID)
// binary.BigEndian.PutUint64(datasetEntitiesLatestVersionKey[6:], rid)
seekLatestChanges := store.SeekLatestChanges(dsId)
opts := badger.DefaultIteratorOptions
opts.Prefix = seekLatestChanges
Expand All @@ -76,23 +78,24 @@ func (c *CompactionWorker) compact(datasetID string, strategy CompactionStrategy
return err
}
cnt++
//keysToBeDeleted = append(keysToBeDeleted, newKeys...)
// keysToBeDeleted = append(keysToBeDeleted, newKeys...)
if time.Since(ts) > 15*time.Second {
ts = time.Now()
c.logger.Infof("compacting dataset %v, processed %v entities, removed keys so far: %v", datasetID, cnt, strategy.stats())
}

}
_, err4 := flushDeletes(c.bs, ops, true, strategy)
if err4 != nil {
return err4
_, err := flushDeletes(c.bs, ops, true, strategy)
if err != nil {
return err
}
c.logger.Infof("compacted dataset %s in %v, removed keys: %v", datasetID, time.Since(startTs), strategy.stats())
return nil
}

func (c *CompactionWorker) forEntity(dsId types.InternalDatasetID, internalEntityID types.InternalID, txn *badger.Txn,
strategy CompactionStrategy, ops *compactionInstruction) error {
strategy CompactionStrategy, ops *compactionInstruction,
) error {
entityLocatorPrefixBuffer := store.SeekEntityChanges(dsId, internalEntityID)
opts1 := badger.DefaultIteratorOptions
opts1.PrefetchValues = false
Expand Down Expand Up @@ -174,14 +177,14 @@ func flushDeletes(bs store.BadgerStore, ops *compactionInstruction, finalFlush b
}
}
}
//fmt.Println("deleted", len(all), "keys")
// fmt.Println("deleted", len(all), "keys")
for i, key := range ops.RewriteKeys {
err2 := txn.Set(key, ops.RewriteValues[i])
if err2 != nil {
return err2
}
}
//fmt.Println("rewritten", len(ops.RewriteKeys), "keys")
// fmt.Println("rewritten", len(ops.RewriteKeys), "keys")
return nil
})
if err != nil {
Expand Down
11 changes: 6 additions & 5 deletions internal/service/dataset/compact_stategy_deduplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package dataset
import (
"bytes"
"encoding/binary"
"reflect"

"github.com/dgraph-io/badger/v4"
"github.com/mimiro-io/datahub/internal/server"
"github.com/mimiro-io/datahub/internal/service/entity"
"go.uber.org/zap"
"reflect"
)

type deduplicationStrategy struct {
Expand Down Expand Up @@ -50,7 +51,8 @@ func (d *deduplicationStrategy) eval(
jsonKey []byte,
isFirstVersion bool,
isLatestVersion bool,
txn *badger.Txn) (*compactionInstruction, error) {
txn *badger.Txn,
) (*compactionInstruction, error) {
// if this is the first version of the entity, we just need to keep it
if isFirstVersion {
d.prevJsonKey = jsonKey
Expand All @@ -76,8 +78,8 @@ func (d *deduplicationStrategy) eval(
}

// 3.delete change log entry (need to iterate over all change versions, match value with json key)
//ts := time.Now()
//del = append(del, d.findChangeLogKeys(jsonKey, txn, false)...)
// ts := time.Now()
// del = append(del, d.findChangeLogKeys(jsonKey, txn, false)...)
d.changeBuffer[[24]byte(jsonKey)] = 1
d.counts["changeLog"]++
//fmt.Printf("findChangeLogKeys took: %v\n", time.Since(ts))
Expand Down Expand Up @@ -113,7 +115,6 @@ func (d *deduplicationStrategy) eval(
if len(refsToDel) == len(refsToDelPrev) {
identical = true
for i, ref := range refsToDel {

if !bytes.Equal(ref, refsToDelPrev[i]) {
identical = false
break
Expand Down
27 changes: 15 additions & 12 deletions internal/service/dataset/compact_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dataset
import (
"encoding/binary"
"fmt"

"github.com/dgraph-io/badger/v4"
"github.com/mimiro-io/datahub/internal/server"
"github.com/mimiro-io/datahub/internal/service/entity"
Expand Down Expand Up @@ -62,26 +63,28 @@ func (i *compactionInstruction) reset() {
i.RewriteValues = make([][]byte, 0)
}

type recordedStrategy struct{}
type maxVersionStrategy struct{}

var (
DeduplicationStrategy = func() CompactionStrategy {
return &deduplicationStrategy{counts: make(map[string]int), changeBuffer: make(map[[24]byte]byte)}
}
//RecordedStrategy = func() CompactionStrategy { return &recordedStrategy{} }
//MaxVersionStrategy = func() CompactionStrategy { return &maxVersionStrategy{} }
// Placeholder strategy types; their constructors are currently commented out
// below and these carry no state yet.
type recordedStrategy struct{}

type maxVersionStrategy struct{}

// DeduplicationStrategy builds a fresh CompactionStrategy instance that
// tracks per-category removal counts and buffers change-log keys (24-byte
// fixed keys) seen during a compaction run.
var DeduplicationStrategy = func() CompactionStrategy {
	strategy := &deduplicationStrategy{
		counts:       make(map[string]int),
		changeBuffer: make(map[[24]byte]byte),
	}
	return strategy
}

// RecordedStrategy = func() CompactionStrategy { return &recordedStrategy{} }
// MaxVersionStrategy = func() CompactionStrategy { return &maxVersionStrategy{} }

// mkLatestKey derives the dataset-latest-entities index key corresponding to
// a json change-log key.
//
// Layout of the produced 14-byte key:
//
//	bytes 0:2  — server.DatasetLatestEntities marker (uint16, big endian)
//	bytes 2:6  — dataset id, taken from jsonKey[10:14]
//	bytes 6:14 — internal entity id, taken from jsonKey[2:10]
func mkLatestKey(jsonKey []byte) []byte {
	key := make([]byte, 14)
	binary.BigEndian.PutUint16(key, server.DatasetLatestEntities)
	copy(key[2:6], jsonKey[10:14])
	copy(key[6:14], jsonKey[2:10])
	return key
}

func findRefs(ent *server.Entity, jsonKey []byte, txn *badger.Txn, lookup entity.Lookup) ([][]byte, error) {
refsToDel := make([][]byte, 0)
for k, stringOrArrayValue := range ent.References {
Expand Down Expand Up @@ -121,7 +124,7 @@ func processRefs(
return nil, er
}

//fmt.Println("building refs for entity", ent.InternalID, "pred", k, "related", relatedid, "recorded", ent.Recorded)
// fmt.Println("building refs for entity", ent.InternalID, "pred", k, "related", relatedid, "recorded", ent.Recorded)
// delete outgoing references
// 0:2: outgoing ref index, uint16
// 2:10: this entity id, uint64
Expand Down
Loading
Loading