Skip to content

Commit

Permalink
cleaning progress
Browse files Browse the repository at this point in the history
  • Loading branch information
Wondertan committed May 29, 2024
1 parent 5d295ca commit 51feefd
Show file tree
Hide file tree
Showing 11 changed files with 231 additions and 208 deletions.
82 changes: 30 additions & 52 deletions share/shwap/p2p/bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,62 +3,60 @@ package bitswap
import (
"context"
"fmt"
"hash"
"sync"

"github.com/ipfs/boxo/exchange"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
logger "github.com/ipfs/go-log/v2"
mh "github.com/multiformats/go-multihash"

"github.com/celestiaorg/celestia-node/share"
)

var log = logger.Logger("shwap/bitswap")

// TODO:
// * Synchronization for GetContainers
// * Synchronization for Fetch
// * Test with race and count 100
// * Hasher test
// * Coverage
// * godoc
// * document steps required to add new id/container type

type ID interface {
String() string
CID() cid.Cid
Verifier(root *share.Root) verify
}
// PopulateFn is a closure that validates given bytes and populates
// Blocks with serialized shwap container in those bytes on success.
type PopulateFn func([]byte) error

type verify func(data []byte) error
type Block interface {
blockBuilder

func RegisterID(mhcode, codec uint64, size int, bldrFn func(cid2 cid.Cid) (blockBuilder, error)) {
mh.Register(mhcode, func() hash.Hash {
return &hasher{IDSize: size}
})
specRegistry[mhcode] = idSpec{
size: size,
codec: codec,
builder: bldrFn,
}
// String returns string representation of the Block
// to be used as map key. Might not be human-readable
String() string
// CID returns shwap ID of the Block formatted as CID.
CID() cid.Cid
// IsEmpty reports whether the Block has the shwap container.
// If the Block is empty, it can be populated with Fetch.
IsEmpty() bool
// Populate returns closure that fills up the Block with shwap container.
// Population involves data validation against the Root.
Populate(*share.Root) PopulateFn
}

// GetContainers
// Does not guarantee synchronization. Calling this func simultaneously with the same ID may cause
// issues. TODO: Describe motivation
func GetContainers(ctx context.Context, fetcher exchange.Fetcher, root *share.Root, ids ...ID) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

cids := make([]cid.Cid, len(ids))
for i, id := range ids {
i := i
cids[i] = id.CID()
// Fetch
// Does not guarantee synchronization. Calling this func simultaneously with the same Block may
// cause issues. TODO: Describe motivation
func Fetch(ctx context.Context, fetcher exchange.Fetcher, root *share.Root, ids ...Block) error {
cids := make([]cid.Cid, 0, len(ids))
for _, id := range ids {
if !id.IsEmpty() {
continue
}
cids = append(cids, id.CID())

idStr := id.String()
globalVerifiers.add(idStr, id.Verifier(root))
defer globalVerifiers.release(idStr)
populators.Store(idStr, id.Populate(root))
defer populators.Delete(idStr)
}

// must start getting only after verifiers are registered
Expand All @@ -82,24 +80,4 @@ func GetContainers(ctx context.Context, fetcher exchange.Fetcher, root *share.Ro
return nil
}

var globalVerifiers verifiers

type verifiers struct {
sync.Map
}

func (vs *verifiers) add(key string, v func([]byte) error) {
vs.Store(key, v)
}

func (vs *verifiers) get(key string) func([]byte) error {
v, ok := vs.Load(key)
if !ok {
return nil
}
return v.(func([]byte) error)
}

func (vs *verifiers) release(key string) {
vs.Delete(key)
}
var populators sync.Map
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,10 @@ import (
"encoding"
"fmt"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
mh "github.com/multiformats/go-multihash"

"github.com/celestiaorg/rsmt2d"
)

var specRegistry = make(map[uint64]idSpec)

type idSpec struct {
size int
codec uint64
builder func(cid.Cid) (blockBuilder, error)
}

type blockBuilder interface {
BlockFromEDS(*rsmt2d.ExtendedDataSquare) (blocks.Block, error)
}

// DefaultAllowlist keeps default list of multihashes allowed in the network.
// TODO(@Wondertan): Make it private and instead provide Blockservice constructor with injected
// allowlist
Expand Down
11 changes: 6 additions & 5 deletions share/shwap/p2p/bitswap/hasher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,27 @@ func (h *hasher) Write(data []byte) (int, error) {
log.Error()
return 0, err
}
// extract ID out of data
// extract Block out of data
// we do this on the raw data to:
// * Avoid complicating hasher with generalized bytes -> type unmarshalling
// * Avoid type allocations
id := data[pbOffset : h.IDSize+pbOffset]
// get registered verifier and use it to check data validity
ver := globalVerifiers.get(string(id))
if ver == nil {
val, ok := populators.Load(string(id))
if !ok {
err := fmt.Errorf("shwap/bitswap hasher: no verifier registered")
log.Error(err)
return 0, err
}
err := ver(data)
populate := val.(PopulateFn)
err := populate(data)
if err != nil {
err = fmt.Errorf("shwap/bitswap hasher: verifying data: %w", err)
log.Error(err)
return 0, err
}
// if correct set the id as resulting sum
// it's required for the sum to match the original ID
// it's required for the sum to match the original Block
// to satisfy hash contract
h.sum = id
return len(data), err
Expand Down
36 changes: 36 additions & 0 deletions share/shwap/p2p/bitswap/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package bitswap

import (
"hash"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
mh "github.com/multiformats/go-multihash"

"github.com/celestiaorg/rsmt2d"
)

// RegisterBlock registers the new Block type.
func RegisterBlock(mhcode, codec uint64, size int, bldrFn func(cid.Cid) (blockBuilder, error)) {
mh.Register(mhcode, func() hash.Hash {
return &hasher{IDSize: size}
})
specRegistry[mhcode] = idSpec{
size: size,
codec: codec,
builder: bldrFn,
}
}

var specRegistry = make(map[uint64]idSpec)

type idSpec struct {
size int
codec uint64
builder func(cid.Cid) (blockBuilder, error)
}

type blockBuilder interface {
// BlockFromEDS gets Bitswap Block out of the EDS.
BlockFromEDS(*rsmt2d.ExtendedDataSquare) (blocks.Block, error)
}
76 changes: 42 additions & 34 deletions share/shwap/p2p/bitswap/row.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,92 +22,100 @@ const (
)

func init() {
RegisterID(
RegisterBlock(
rowMultihashCode,
rowCodec,
shwap.RowIDSize,
func(cid cid.Cid) (blockBuilder, error) {
return RowIDFromCID(cid)
return EmptyRowBlockFromCID(cid)
},
)
}

type RowID shwap.RowID
type RowBlock struct {
ID shwap.RowID
Container *shwap.Row
}

func NewEmptyRowBlock(height uint64, rowIdx int, root *share.Root) (*RowBlock, error) {
id, err := shwap.NewRowID(height, rowIdx, root)
if err != nil {
return nil, err
}

return &RowBlock{ID: id}, nil
}

// RowIDFromCID coverts CID to RowID.
func RowIDFromCID(cid cid.Cid) (RowID, error) {
// EmptyRowBlockFromCID coverts CID to RowBlock.
func EmptyRowBlockFromCID(cid cid.Cid) (*RowBlock, error) {
ridData, err := extractCID(cid)
if err != nil {
return RowID{}, err
return nil, err
}

rid, err := shwap.RowIDFromBinary(ridData)
if err != nil {
return RowID{}, fmt.Errorf("while unmarhaling RowID: %w", err)
return nil, fmt.Errorf("while unmarhaling RowBlock: %w", err)
}
return RowID(rid), nil
return &RowBlock{ID: rid}, nil
}

func (rid RowID) String() string {
data, err := rid.MarshalBinary()
func (rb *RowBlock) IsEmpty() bool {
return rb.Container == nil
}

func (rb *RowBlock) String() string {
data, err := rb.ID.MarshalBinary()
if err != nil {
panic(fmt.Errorf("marshaling RowID: %w", err))
panic(fmt.Errorf("marshaling RowBlock: %w", err))
}
return string(data)
}

func (rid RowID) MarshalBinary() ([]byte, error) {
return shwap.RowID(rid).MarshalBinary()
}

func (rid RowID) CID() cid.Cid {
return encodeCID(rid, rowMultihashCode, rowCodec)
func (rb *RowBlock) CID() cid.Cid {
return encodeCID(rb.ID, rowMultihashCode, rowCodec)
}

func (rid RowID) BlockFromEDS(eds *rsmt2d.ExtendedDataSquare) (blocks.Block, error) {
row := shwap.RowFromEDS(eds, rid.RowIndex, shwap.Left)
func (rb *RowBlock) BlockFromEDS(eds *rsmt2d.ExtendedDataSquare) (blocks.Block, error) {
row := shwap.RowFromEDS(eds, rb.ID.RowIndex, shwap.Left)

dataID, err := rid.MarshalBinary()
rowID, err := rb.ID.MarshalBinary()
if err != nil {
return nil, fmt.Errorf("marshaling RowID: %w", err)
return nil, fmt.Errorf("marshaling RowBlock: %w", err)
}

rowBlk := bitswappb.RowBlock{
RowId: dataID,
RowId: rowID,
Row: row.ToProto(),
}

dataBlk, err := rowBlk.Marshal()
blkData, err := rowBlk.Marshal()
if err != nil {
return nil, fmt.Errorf("marshaling RowBlock: %w", err)
}

blk, err := blocks.NewBlockWithCid(dataBlk, rid.CID())
blk, err := blocks.NewBlockWithCid(blkData, rb.CID())
if err != nil {
return nil, fmt.Errorf("assembling block: %w", err)
}

return blk, nil
}

type RowBlock struct {
RowID
Row shwap.Row
}

func (r *RowBlock) Verifier(root *share.Root) verify {
func (rb *RowBlock) Populate(root *share.Root) PopulateFn {
return func(data []byte) error {
var rowBlk bitswappb.RowBlock
if err := rowBlk.Unmarshal(data); err != nil {
return fmt.Errorf("unmarshaling RowBlock: %w", err)
}

r.Row = shwap.RowFromProto(rowBlk.Row)
if err := r.Row.Validate(root, r.RowID.RowIndex); err != nil {
fmt.Errorf("validating Row: %w", err)
cntr := shwap.RowFromProto(rowBlk.Row)
if err := cntr.Validate(root, rb.ID.RowIndex); err != nil {
return fmt.Errorf("validating Row: %w", err)
}
rb.Container = &cntr

// NOTE: We don't have to validate ID in the RowBlock, as it's implicitly verified by string
// NOTE: We don't have to validate Block in the RowBlock, as it's implicitly verified by string
// equality of globalVerifiers entry key(requesting side) and hasher accessing the entry(response
// verification)
return nil
Expand Down
Loading

0 comments on commit 51feefd

Please sign in to comment.