diff --git a/.github/workflows/ci_release.yml b/.github/workflows/ci_release.yml index 6e9293cc38..271cab472f 100644 --- a/.github/workflows/ci_release.yml +++ b/.github/workflows/ci_release.yml @@ -25,7 +25,7 @@ on: jobs: # Dockerfile Linting hadolint: - uses: celestiaorg/.github/.github/workflows/reusable_dockerfile_lint.yml@v0.2.2 # yamllint disable-line rule:line-length + uses: celestiaorg/.github/.github/workflows/reusable_dockerfile_lint.yml@v0.2.3 # yamllint disable-line rule:line-length with: dockerfile: Dockerfile @@ -33,7 +33,7 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - - uses: celestiaorg/.github/.github/actions/yamllint@v0.2.2 + - uses: celestiaorg/.github/.github/actions/yamllint@v0.2.3 markdown-lint: name: Markdown Lint @@ -65,10 +65,10 @@ jobs: # version_bump section, it would skip and not run, which would result # in goreleaser not running either. if: ${{ github.event_name == 'workflow_dispatch' }} - uses: mathieudutour/github-tag-action@v6.0 + uses: mathieudutour/github-tag-action@v6.1 - name: Version Release - uses: celestiaorg/.github/.github/actions/version-release@v0.2.2 + uses: celestiaorg/.github/.github/actions/version-release@v0.2.3 with: github_token: ${{ secrets.GITHUB_TOKEN }} default_bump: ${{ inputs.version }} @@ -88,7 +88,7 @@ jobs: with: go-version: 1.21 # Generate the binaries and release - - uses: goreleaser/goreleaser-action@v4 + - uses: goreleaser/goreleaser-action@v5 with: distribution: goreleaser version: latest diff --git a/go.mod b/go.mod index 8cda25d380..dc7aeb1676 100644 --- a/go.mod +++ b/go.mod @@ -28,7 +28,7 @@ require ( github.com/golang/mock v1.6.0 github.com/gorilla/mux v1.8.0 github.com/hashicorp/go-retryablehttp v0.7.4 - github.com/hashicorp/golang-lru/v2 v2.0.5 + github.com/hashicorp/golang-lru/v2 v2.0.6 github.com/imdario/mergo v0.3.16 github.com/ipfs/boxo v0.12.0 github.com/ipfs/go-block-format v0.2.0 @@ -59,8 +59,8 @@ require ( go.opentelemetry.io/contrib/instrumentation/runtime v0.43.0 go.opentelemetry.io/otel v1.18.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.41.0 - go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.17.0 - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.17.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.18.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.18.0 go.opentelemetry.io/otel/metric v1.18.0 go.opentelemetry.io/otel/sdk v1.18.0 go.opentelemetry.io/otel/sdk/metric v0.41.0 diff --git a/go.sum b/go.sum index 354b14ac32..2959221c98 100644 --- a/go.sum +++ b/go.sum @@ -980,8 +980,8 @@ github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uG github.com/hashicorp/golang-lru/arc/v2 v2.0.5 h1:l2zaLDubNhW4XO3LnliVj0GXO3+/CGNJAg1dcN2Fpfw= github.com/hashicorp/golang-lru/arc/v2 v2.0.5/go.mod h1:ny6zBSQZi2JxIeYcv7kt2sH2PXJtirBN7RDhRpxPkxU= github.com/hashicorp/golang-lru/v2 v2.0.1/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= -github.com/hashicorp/golang-lru/v2 v2.0.5 h1:wW7h1TG88eUIJ2i69gaE3uNVtEPIagzhGvHgwfx2Vm4= -github.com/hashicorp/golang-lru/v2 v2.0.5/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= +github.com/hashicorp/golang-lru/v2 v2.0.6 h1:3xi/Cafd1NaoEnS/yDssIiuVeDVywU0QdFGl3aQaQHM= +github.com/hashicorp/golang-lru/v2 v2.0.6/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= @@ -2386,10 +2386,10 @@ go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.41.0 h1:k0k7hFNDd8K4iOMJXj go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.41.0/go.mod h1:hG4Fj/y8TR/tlEDREo8tWstl9fO9gcFkn4xrx0Io8xU= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.41.0 h1:iV3BOgW4fry1Riw9dwypigqlIYWXvSRVT2RJmblzo40= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.41.0/go.mod h1:7PGzqlKrxIRmbj5tlNW0nTkYZ5fHXDgk6Fy8/KjR0CI= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.17.0 h1:U5GYackKpVKlPrd/5gKMlrTlP2dCESAAFU682VCpieY= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.17.0/go.mod h1:aFsJfCEnLzEu9vRRAcUiB/cpRTbVsNdF3OHSPpdjxZQ= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.17.0 h1:kvWMtSUNVylLVrOE4WLUmBtgziYoCIYUNSpTYtMzVJI= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.17.0/go.mod h1:SExUrRYIXhDgEKG4tkiQovd2HTaELiHUsuK08s5Nqx4= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.18.0 h1:IAtl+7gua134xcV3NieDhJHjjOVeJhXAnYf/0hswjUY= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.18.0/go.mod h1:w+pXobnBzh95MNIkeIuAKcHe/Uu/CX2PKIvBP6ipKRA= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.18.0 h1:6pu8ttx76BxHf+xz/H77AUZkPF3cwWzXqAUsXhVKI18= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.18.0/go.mod h1:IOmXxPrxoxFMXdNy7lfDmE8MzE61YPcurbUm0SMjerI= go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.15.1 h1:2PunuO5SbkN5MhCbuHCd3tC6qrcaj+uDAkX/qBU5BAs= go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.15.1/go.mod h1:q8+Tha+5LThjeSU8BW93uUC5w5/+DnYHMKBMpRCsui0= go.opentelemetry.io/otel/metric v0.20.0/go.mod h1:598I5tYlH1vzBjn+BTuhzTCSb/9debfNp6R3s7Pr1eU= diff --git a/share/eds/blockstore.go b/share/eds/blockstore.go index 349d6f58ba..9cbb3f4e8a 100644 --- a/share/eds/blockstore.go +++ b/share/eds/blockstore.go @@ -10,9 +10,8 @@ import ( blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/namespace" ipld "github.com/ipfs/go-ipld-format" - - "github.com/celestiaorg/celestia-node/share/eds/cache" ) var _ bstore.Blockstore = (*blockstore)(nil) @@ -35,6 +34,13 @@ type blockstore struct { ds datastore.Batching } +func newBlockstore(store *Store, ds datastore.Batching) *blockstore { + return &blockstore{ + store: store, + ds: namespace.Wrap(ds, blockstoreCacheKey), + } +} + func (bs *blockstore) Has(ctx context.Context, cid cid.Cid) (bool, error) { keys, err := bs.store.dgstr.ShardsContainingMultihash(ctx, cid.Hash()) if errors.Is(err, ErrNotFound) || errors.Is(err, ErrNotFoundInIndex) { @@ -146,28 +152,17 @@ func (bs *blockstore) getReadOnlyBlockstore(ctx context.Context, cid cid.Cid) (* return nil, fmt.Errorf("failed to find shards containing multihash: %w", err) } - // check if cache contains any of accessors + // check if either cache contains an accessor shardKey := keys[0] - if accessor, err := bs.store.cache.Get(shardKey); err == nil { + accessor, err := bs.store.cache.Get(shardKey) + if err == nil { return blockstoreCloser(accessor) } - // load accessor to the cache and use it as blockstoreCloser - accessor, err := bs.store.cache.GetOrLoad(ctx, shardKey, bs.store.getAccessor) + // load accessor to the blockstore cache and use it as blockstoreCloser + accessor, err = bs.store.cache.Second().GetOrLoad(ctx, shardKey, bs.store.getAccessor) if err != nil { return nil, fmt.Errorf("failed to get accessor for shard %s: %w", shardKey, err) } return blockstoreCloser(accessor) } - -// blockstoreCloser constructs new BlockstoreCloser from cache.Accessor -func blockstoreCloser(ac cache.Accessor) (*BlockstoreCloser, error) { - bs, err := ac.Blockstore() - if err != nil { - return nil, fmt.Errorf("eds/store: failed to get blockstore: %w", err) - } - return &BlockstoreCloser{ - ReadBlockstore: bs, - Closer: ac, - }, nil -} diff --git a/share/eds/cache/doublecache.go b/share/eds/cache/doublecache.go new file mode 100644 index 0000000000..a63eadee9e --- /dev/null +++ b/share/eds/cache/doublecache.go @@ -0,0 +1,51 @@ +package cache + +import ( + "errors" + + "github.com/filecoin-project/dagstore/shard" +) + +// DoubleCache represents a Cache that looks into multiple caches one by one. +type DoubleCache struct { + first, second Cache +} + +// NewDoubleCache creates a new DoubleCache with the provided caches. +func NewDoubleCache(first, second Cache) *DoubleCache { + return &DoubleCache{ + first: first, + second: second, + } +} + +// Get looks for an item in all the caches one by one and returns the Cache found item. +func (mc *DoubleCache) Get(key shard.Key) (Accessor, error) { + ac, err := mc.first.Get(key) + if err == nil { + return ac, nil + } + return mc.second.Get(key) +} + +// Remove removes an item from all underlying caches +func (mc *DoubleCache) Remove(key shard.Key) error { + err1 := mc.first.Remove(key) + err2 := mc.second.Remove(key) + return errors.Join(err1, err2) +} + +func (mc *DoubleCache) First() Cache { + return mc.first +} + +func (mc *DoubleCache) Second() Cache { + return mc.second +} + +func (mc *DoubleCache) EnableMetrics() error { + if err := mc.first.EnableMetrics(); err != nil { + return err + } + return mc.second.EnableMetrics() +} diff --git a/share/eds/store.go b/share/eds/store.go index e8caf4c35a..14df4a4bee 100644 --- a/share/eds/store.go +++ b/share/eds/store.go @@ -40,7 +40,8 @@ const ( // We don't use transient files right now, so GC is turned off by default. defaultGCInterval = 0 - defaultCacheSize = 128 + defaultRecentBlocksCacheSize = 10 + defaultBlockstoreCacheSize = 128 ) var ErrNotFound = errors.New("eds not found in store") @@ -56,7 +57,7 @@ type Store struct { mounts *mount.Registry bs *blockstore - cache cache.Cache + cache *cache.DoubleCache carIdx index.FullIndexRepo invertedIdx *simpleInvertedIndex @@ -114,11 +115,16 @@ func NewStore(basepath string, ds datastore.Batching) (*Store, error) { return nil, fmt.Errorf("failed to create DAGStore: %w", err) } - accessorCache, err := cache.NewAccessorCache("cache", defaultCacheSize) + recentBlocksCache, err := cache.NewAccessorCache("recent", defaultRecentBlocksCacheSize) if err != nil { return nil, fmt.Errorf("failed to create recent blocks cache: %w", err) } + blockstoreCache, err := cache.NewAccessorCache("blockstore", defaultBlockstoreCacheSize) + if err != nil { + return nil, fmt.Errorf("failed to create blockstore blocks cache: %w", err) + } + store := &Store{ basepath: basepath, dgstr: dagStore, @@ -127,7 +133,7 @@ func NewStore(basepath string, ds datastore.Batching) (*Store, error) { gcInterval: defaultGCInterval, mounts: r, shardFailures: failureChan, - cache: accessorCache, + cache: cache.NewDoubleCache(recentBlocksCache, blockstoreCache), } store.bs = newBlockstore(store, ds) return store, nil @@ -284,7 +290,7 @@ func (s *Store) put(ctx context.Context, root share.DataHash, square *rsmt2d.Ext go func() { ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() - _, err := s.cache.GetOrLoad(ctx, result.Key, s.getAccessor) + _, err := s.cache.First().GetOrLoad(ctx, result.Key, s.getAccessor) if err != nil { log.Warnw("unable to put accessor to recent blocks accessors cache", "err", err) return diff --git a/share/eds/store_test.go b/share/eds/store_test.go index 0d5283e2f2..b38a25c827 100644 --- a/share/eds/store_test.go +++ b/share/eds/store_test.go @@ -303,7 +303,7 @@ func Test_BlockstoreCache(t *testing.T) { // store eds to the store with noopCache to allow clean cache after put swap := edsStore.cache - edsStore.cache = cache.NoopCache{} + edsStore.cache = cache.NewDoubleCache(cache.NoopCache{}, cache.NoopCache{}) eds, dah := randomEDS(t) err = edsStore.Put(ctx, dah.Hash(), eds) require.NoError(t, err) @@ -388,7 +388,7 @@ func Test_NotCachedAccessor(t *testing.T) { err = edsStore.Start(ctx) require.NoError(t, err) // replace cache with noopCache to - edsStore.cache = cache.NoopCache{} + edsStore.cache = cache.NewDoubleCache(cache.NoopCache{}, cache.NoopCache{}) eds, dah := randomEDS(t) err = edsStore.Put(ctx, dah.Hash(), eds) diff --git a/share/eds/utils.go b/share/eds/utils.go index e7b24a9aee..3417a2aa62 100644 --- a/share/eds/utils.go +++ b/share/eds/utils.go @@ -1,11 +1,10 @@ package eds import ( + "fmt" "io" "github.com/filecoin-project/dagstore" - "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/namespace" "github.com/celestiaorg/celestia-node/share/eds/cache" ) @@ -30,11 +29,16 @@ func newReadCloser(ac cache.Accessor) io.ReadCloser { } } -func newBlockstore(store *Store, ds datastore.Batching) *blockstore { - return &blockstore{ - store: store, - ds: namespace.Wrap(ds, blockstoreCacheKey), +// blockstoreCloser constructs new BlockstoreCloser from cache.Accessor +func blockstoreCloser(ac cache.Accessor) (*BlockstoreCloser, error) { + bs, err := ac.Blockstore() + if err != nil { + return nil, fmt.Errorf("eds/store: failed to get blockstore: %w", err) } + return &BlockstoreCloser{ + ReadBlockstore: bs, + Closer: ac, + }, nil } func closeAndLog(name string, closer io.Closer) {