diff --git a/share/shwap/p2p/bitswap/block_fetch.go b/share/shwap/p2p/bitswap/block_fetch.go index d741f89c5a..0cdd4ed85b 100644 --- a/share/shwap/p2p/bitswap/block_fetch.go +++ b/share/shwap/p2p/bitswap/block_fetch.go @@ -50,7 +50,6 @@ const maxPerFetch = 1024 // See [Fetch] for detailed description. func fetch(ctx context.Context, exchg exchange.Interface, root *share.Root, blks ...Block) error { fetcher := getFetcher(ctx, exchg) - cids := make([]cid.Cid, 0, len(blks)) duplicates := make(map[cid.Cid]Block) for _, blk := range blks { @@ -122,14 +121,14 @@ func unmarshal(unmarshalFn UnmarshalFn, data []byte) ([]byte, error) { if err != nil { return nil, fmt.Errorf("casting cid: %w", err) } - // get ID out of CID validating it + // getBlock ID out of CID validating it id, err := extractCID(cid) if err != nil { return nil, fmt.Errorf("validating cid: %w", err) } if unmarshalFn == nil { - // get registered UnmarshalFn and use it to check data validity and + // getBlock registered UnmarshalFn and use it to check data validity and // pass it to Fetch caller val, ok := unmarshalFns.Load(cid) if !ok { diff --git a/share/shwap/p2p/bitswap/block_registry.go b/share/shwap/p2p/bitswap/block_registry.go index 0b0d7bd1f4..bdf10d0861 100644 --- a/share/shwap/p2p/bitswap/block_registry.go +++ b/share/shwap/p2p/bitswap/block_registry.go @@ -1,6 +1,7 @@ package bitswap import ( + "fmt" "hash" "github.com/ipfs/go-cid" @@ -26,4 +27,8 @@ type blockSpec struct { builder func(cid.Cid) (Block, error) } +func (spec *blockSpec) String() string { + return fmt.Sprintf("BlockSpec{size: %d, codec: %d}", spec.size, spec.codec) +} + var specRegistry = make(map[uint64]blockSpec) diff --git a/share/shwap/p2p/bitswap/block_store.go b/share/shwap/p2p/bitswap/block_store.go index 72994301d5..9a5418054d 100644 --- a/share/shwap/p2p/bitswap/block_store.go +++ b/share/shwap/p2p/bitswap/block_store.go @@ -11,32 +11,36 @@ import ( bitswappb "github.com/celestiaorg/celestia-node/share/shwap/p2p/bitswap/pb" ) +// Accessors abstracts storage system that indexes and manages multiple eds.Accessors by network height. type Accessors interface { + // Get returns an Accessor by its height. Get(ctx context.Context, height uint64) (eds.Accessor, error) } +// Blockstore implements generalized Bitswap compatible storage over Shwap containers +// that operates with Block and accesses data through Accessors. type Blockstore struct { Accessors } -func (b *Blockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) { +func (b *Blockstore) getBlock(ctx context.Context, cid cid.Cid) (blocks.Block, error) { spec, ok := specRegistry[cid.Prefix().MhType] if !ok { - return nil, fmt.Errorf("unsupported codec") + return nil, fmt.Errorf("unsupported Block type: %v", cid.Prefix().MhType) } blk, err := spec.builder(cid) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to build a Block for %s: %w", spec.String(), err) } eds, err := b.Accessors.Get(ctx, blk.Height()) if err != nil { - return nil, err + return nil, fmt.Errorf("getting EDS Accessor for height %v: %w", blk.Height(), err) } if err = blk.Populate(ctx, eds); err != nil { - return nil, fmt.Errorf("failed to unmarshal Shwap Block: %w", err) + return nil, fmt.Errorf("failed to populate Shwap Block on height %v for %s: %w", blk.Height(), spec.String(), err) } containerData, err := blk.Marshal() @@ -62,6 +66,16 @@ func (b *Blockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) return bitswapBlk, nil } +func (b *Blockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) { + blk, err := b.getBlock(ctx, cid) + if err != nil { + log.Errorf("blockstore: getting local block(%s): %s", cid, err) + return nil, err + } + + return blk, nil +} + func (b *Blockstore) GetSize(ctx context.Context, cid cid.Cid) (int, error) { // TODO(@Wondertan): There must be a way to derive size without reading, proving, serializing and // allocating Sample's block.Block or we could do hashing