diff --git a/share/eds/inverted_index.go b/share/eds/inverted_index.go index 0e3665d973..d302932369 100644 --- a/share/eds/inverted_index.go +++ b/share/eds/inverted_index.go @@ -8,10 +8,13 @@ import ( "github.com/filecoin-project/dagstore/index" "github.com/filecoin-project/dagstore/shard" ds "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/namespace" "github.com/multiformats/go-multihash" + + dsbadger "github.com/celestiaorg/go-ds-badger4" ) +const invertedIndexPath = "/inverted_index/" + // simpleInvertedIndex is an inverted index that only stores a single shard key per multihash. Its // implementation is modified from the default upstream implementation in dagstore/index. type simpleInvertedIndex struct { @@ -21,10 +24,21 @@ type simpleInvertedIndex struct { // newSimpleInvertedIndex returns a new inverted index that only stores a single shard key per // multihash. This is because we use badger as a storage backend, so updates are expensive, and we // don't care which shard is used to serve a cid. -func newSimpleInvertedIndex(dts ds.Batching) *simpleInvertedIndex { - return &simpleInvertedIndex{ - ds: namespace.Wrap(dts, ds.NewKey("/inverted/index")), +func newSimpleInvertedIndex(storePath string) (*simpleInvertedIndex, error) { + opts := dsbadger.DefaultOptions // this should be copied + // turn off value log GC + opts.GcInterval = 0 + // 20 compactors show to have no hangups on put operation up to 40k blocks with eds size 128. + opts.NumCompactors = 20 + // use minimum amount of NumLevelZeroTables to trigger L0 compaction faster + opts.NumLevelZeroTables = 1 + + ds, err := dsbadger.NewDatastore(storePath+invertedIndexPath, &opts) + if err != nil { + return nil, fmt.Errorf("can't open Badger Datastore: %w", err) } + + return &simpleInvertedIndex{ds: ds}, nil } func (s *simpleInvertedIndex) AddMultihashesForShard( @@ -75,3 +89,7 @@ func (s *simpleInvertedIndex) GetShardsForMultihash(ctx context.Context, mh mult return []shard.Key{shardKey}, nil } + +func (s *simpleInvertedIndex) close() error { + return s.ds.Close() +} diff --git a/share/eds/inverted_index_test.go b/share/eds/inverted_index_test.go index 8fb037bb92..e83c2be267 100644 --- a/share/eds/inverted_index_test.go +++ b/share/eds/inverted_index_test.go @@ -5,8 +5,6 @@ import ( "testing" "github.com/filecoin-project/dagstore/shard" - "github.com/ipfs/go-datastore" - ds_sync "github.com/ipfs/go-datastore/sync" "github.com/multiformats/go-multihash" "github.com/stretchr/testify/require" ) @@ -37,11 +35,12 @@ func TestMultihashesForShard(t *testing.T) { } mi := &mockIterator{mhs: mhs} - ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) - invertedIndex := newSimpleInvertedIndex(ds) + path := t.TempDir() + invertedIndex, err := newSimpleInvertedIndex(path) + require.NoError(t, err) // 1. Add all 3 multihashes to shard1 - err := invertedIndex.AddMultihashesForShard(ctx, mi, shard.KeyFromString("shard1")) + err = invertedIndex.AddMultihashesForShard(ctx, mi, shard.KeyFromString("shard1")) require.NoError(t, err) shardKeys, err := invertedIndex.GetShardsForMultihash(ctx, mhs[0]) require.NoError(t, err) diff --git a/share/eds/store.go b/share/eds/store.go index cd85380ba1..2218e13954 100644 --- a/share/eds/store.go +++ b/share/eds/store.go @@ -56,8 +56,8 @@ type Store struct { cache *blockstoreCache bs bstore.Blockstore - topIdx index.Inverted - carIdx index.FullIndexRepo + carIdx index.FullIndexRepo + invertedIdx *simpleInvertedIndex basepath string gcInterval time.Duration @@ -83,14 +83,17 @@ func NewStore(basepath string, ds datastore.Batching) (*Store, error) { return nil, fmt.Errorf("failed to create index repository: %w", err) } - invertedRepo := newSimpleInvertedIndex(ds) + invertedIdx, err := newSimpleInvertedIndex(basepath) + if err != nil { + return nil, fmt.Errorf("failed to create index: %w", err) + } dagStore, err := dagstore.NewDAGStore( dagstore.Config{ TransientsDir: basepath + transientsPath, IndexRepo: fsRepo, Datastore: ds, MountRegistry: r, - TopLevelIndex: invertedRepo, + TopLevelIndex: invertedIdx, }, ) if err != nil { @@ -103,13 +106,13 @@ func NewStore(basepath string, ds datastore.Batching) (*Store, error) { } store := &Store{ - basepath: basepath, - dgstr: dagStore, - topIdx: invertedRepo, - carIdx: fsRepo, - gcInterval: defaultGCInterval, - mounts: r, - cache: cache, + basepath: basepath, + dgstr: dagStore, + carIdx: fsRepo, + invertedIdx: invertedIdx, + gcInterval: defaultGCInterval, + mounts: r, + cache: cache, } store.bs = newBlockstore(store, cache) return store, nil @@ -137,6 +140,9 @@ func (s *Store) Start(ctx context.Context) error { // Stop stops the underlying DAGStore. func (s *Store) Stop(context.Context) error { defer s.cancel() + if err := s.invertedIdx.close(); err != nil { + return err + } return s.dgstr.Close() }