diff --git a/das/daser_test.go b/das/daser_test.go index 458c99e51d..7a1e0007d2 100644 --- a/das/daser_test.go +++ b/das/daser_test.go @@ -6,9 +6,9 @@ import ( "testing" "time" + "github.com/ipfs/go-blockservice" "github.com/ipfs/go-datastore" ds_sync "github.com/ipfs/go-datastore/sync" - format "github.com/ipfs/go-ipld-format" mdutils "github.com/ipfs/go-merkledag/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -23,10 +23,10 @@ var timeout = time.Second * 15 // the DASer checkpoint is updated to network head. func TestDASerLifecycle(t *testing.T) { ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) - dag := mdutils.Mock() + bServ := mdutils.Bserv() // 15 headers from the past and 15 future headers - mockGet, shareServ, sub := createDASerSubcomponents(t, dag, 15, 15) + mockGet, shareServ, sub := createDASerSubcomponents(t, bServ, 15, 15) ctx, cancel := context.WithTimeout(context.Background(), timeout) t.Cleanup(cancel) @@ -61,10 +61,10 @@ func TestDASerLifecycle(t *testing.T) { func TestDASer_Restart(t *testing.T) { ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) - dag := mdutils.Mock() + bServ := mdutils.Bserv() // 15 headers from the past and 15 future headers - mockGet, shareServ, sub := createDASerSubcomponents(t, dag, 15, 15) + mockGet, shareServ, sub := createDASerSubcomponents(t, bServ, 15, 15) ctx, cancel := context.WithTimeout(context.Background(), timeout) t.Cleanup(cancel) @@ -85,10 +85,10 @@ func TestDASer_Restart(t *testing.T) { require.NoError(t, err) // reset mockGet, generate 15 "past" headers, building off chain head which is 30 - mockGet.generateHeaders(t, dag, 30, 45) + mockGet.generateHeaders(t, bServ, 30, 45) mockGet.doneCh = make(chan struct{}) // reset dummy subscriber - mockGet.fillSubWithHeaders(t, sub, dag, 45, 60) + mockGet.fillSubWithHeaders(t, sub, bServ, 45, 60) // manually set mockGet head to trigger stop at 45 mockGet.head = int64(45) @@ -124,9 +124,9 @@ func TestDASer_Restart(t *testing.T) { func TestDASer_catchUp(t *testing.T) { ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) - dag := mdutils.Mock() + bServ := mdutils.Bserv() - mockGet, shareServ, _ := createDASerSubcomponents(t, dag, 5, 0) + mockGet, shareServ, _ := createDASerSubcomponents(t, bServ, 5, 0) ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) @@ -165,9 +165,9 @@ func TestDASer_catchUp(t *testing.T) { // difference of 1 func TestDASer_catchUp_oneHeader(t *testing.T) { ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) - dag := mdutils.Mock() + bServ := mdutils.Bserv() - mockGet, shareServ, _ := createDASerSubcomponents(t, dag, 6, 0) + mockGet, shareServ, _ := createDASerSubcomponents(t, bServ, 6, 0) daser := NewDASer(shareServ, nil, mockGet, ds) // store checkpoint @@ -209,7 +209,7 @@ func TestDASer_catchUp_oneHeader(t *testing.T) { func TestDASer_catchUp_fails(t *testing.T) { ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) - dag := mdutils.Mock() + dag := mdutils.Bserv() mockGet, _, _ := createDASerSubcomponents(t, dag, 6, 0) daser := NewDASer(share.NewBrokenAvailability(), nil, mockGet, ds) @@ -256,21 +256,21 @@ func TestDASer_catchUp_fails(t *testing.T) { // mockGetter, share.Service, and mock header.Subscriber. 
func createDASerSubcomponents( t *testing.T, - dag format.DAGService, + bServ blockservice.BlockService, numGetter, numSub int, ) (*mockGetter, *share.Service, *header.DummySubscriber) { - shareServ := share.NewService(dag, share.NewLightAvailability(dag)) + shareServ := share.NewService(bServ, share.NewLightAvailability(bServ)) mockGet := &mockGetter{ headers: make(map[int64]*header.ExtendedHeader), doneCh: make(chan struct{}), } - mockGet.generateHeaders(t, dag, 0, numGetter) + mockGet.generateHeaders(t, bServ, 0, numGetter) sub := new(header.DummySubscriber) - mockGet.fillSubWithHeaders(t, sub, dag, numGetter, numGetter+numSub) + mockGet.fillSubWithHeaders(t, sub, bServ, numGetter, numGetter+numSub) return mockGet, shareServ, sub } @@ -279,7 +279,7 @@ func createDASerSubcomponents( func (m *mockGetter) fillSubWithHeaders( t *testing.T, sub *header.DummySubscriber, - dag format.DAGService, + bServ blockservice.BlockService, startHeight, endHeight int, ) { @@ -287,7 +287,7 @@ func (m *mockGetter) fillSubWithHeaders( index := 0 for i := startHeight; i < endHeight; i++ { - dah := share.RandFillDAG(t, 16, dag) + dah := share.RandFillDAG(t, 16, bServ) randHeader := header.RandExtendedHeader(t) randHeader.DataHash = dah.Hash() @@ -309,9 +309,9 @@ type mockGetter struct { headers map[int64]*header.ExtendedHeader } -func (m *mockGetter) generateHeaders(t *testing.T, dag format.DAGService, startHeight, endHeight int) { +func (m *mockGetter) generateHeaders(t *testing.T, bServ blockservice.BlockService, startHeight, endHeight int) { for i := startHeight; i < endHeight; i++ { - dah := share.RandFillDAG(t, 16, dag) + dah := share.RandFillDAG(t, 16, bServ) randHeader := header.RandExtendedHeader(t) randHeader.DataHash = dah.Hash() diff --git a/fraud/bad_encoding_test.go b/fraud/bad_encoding_test.go index 460438827b..ab8dcf95d1 100644 --- a/fraud/bad_encoding_test.go +++ b/fraud/bad_encoding_test.go @@ -14,15 +14,15 @@ import ( ) func TestFraudProofValidation(t *testing.T) { - dag := mdutils.Mock() + bServ := mdutils.Bserv() eds := ipld.RandEDS(t, 2) shares := ipld.ExtractEDS(eds) copy(shares[3][8:], shares[4][8:]) - eds, err := ipld.ImportShares(context.Background(), shares, dag) + eds, err := ipld.ImportShares(context.Background(), shares, bServ) require.NoError(t, err) da := da.NewDataAvailabilityHeader(eds) - r := ipld.NewRetriever(dag) + r := ipld.NewRetriever(bServ) _, err = r.Retrieve(context.Background(), &da) var errByz *ipld.ErrByzantine require.True(t, errors.As(err, &errByz)) diff --git a/header/core/exchange.go b/header/core/exchange.go index c86ee8f8d6..d7811d2193 100644 --- a/header/core/exchange.go +++ b/header/core/exchange.go @@ -5,7 +5,7 @@ import ( "context" "fmt" - format "github.com/ipfs/go-ipld-format" + "github.com/ipfs/go-blockservice" logging "github.com/ipfs/go-log/v2" tmbytes "github.com/tendermint/tendermint/libs/bytes" @@ -18,14 +18,14 @@ var log = logging.Logger("header/core") type Exchange struct { fetcher *core.BlockFetcher - shareStore format.DAGService + shareStore blockservice.BlockService construct header.ConstructFn } -func NewExchange(fetcher *core.BlockFetcher, dag format.DAGService, construct header.ConstructFn) *Exchange { +func NewExchange(fetcher *core.BlockFetcher, bServ blockservice.BlockService, construct header.ConstructFn) *Exchange { return &Exchange{ fetcher: fetcher, - shareStore: dag, + shareStore: bServ, construct: construct, } } diff --git a/header/core/exchange_test.go b/header/core/exchange_test.go index 495da86b71..d1d3fe52a6 100644 --- 
a/header/core/exchange_test.go +++ b/header/core/exchange_test.go @@ -18,7 +18,7 @@ func TestCoreExchange_RequestHeaders(t *testing.T) { t.Cleanup(cancel) fetcher := createCoreFetcher(ctx, t) - store := mdutils.Mock() + store := mdutils.Bserv() // generate 10 blocks generateBlocks(t, fetcher) diff --git a/header/core/listener.go b/header/core/listener.go index c5096e1010..ee11021580 100644 --- a/header/core/listener.go +++ b/header/core/listener.go @@ -4,7 +4,7 @@ import ( "context" "fmt" - format "github.com/ipfs/go-ipld-format" + "github.com/ipfs/go-blockservice" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/tendermint/tendermint/types" @@ -22,7 +22,7 @@ import ( type Listener struct { bcast header.Broadcaster fetcher *core.BlockFetcher - dag format.DAGService + bServ blockservice.BlockService construct header.ConstructFn cancel context.CancelFunc } @@ -30,13 +30,13 @@ type Listener struct { func NewListener( bcast header.Broadcaster, fetcher *core.BlockFetcher, - dag format.DAGService, + bServ blockservice.BlockService, construct header.ConstructFn, ) *Listener { return &Listener{ bcast: bcast, fetcher: fetcher, - dag: dag, + bServ: bServ, construct: construct, } } @@ -89,7 +89,7 @@ func (cl *Listener) listen(ctx context.Context, sub <-chan *types.Block) { return } - eh, err := cl.construct(ctx, b, comm, vals, cl.dag) + eh, err := cl.construct(ctx, b, comm, vals, cl.bServ) if err != nil { log.Errorw("listener: making extended header", "err", err) return diff --git a/header/core/listener_test.go b/header/core/listener_test.go index 2ac9365f6f..c4c9177260 100644 --- a/header/core/listener_test.go +++ b/header/core/listener_test.go @@ -102,5 +102,5 @@ func createListener( require.NoError(t, err) }) - return NewListener(p2pSub, fetcher, mdutils.Mock(), header.MakeExtendedHeader) + return NewListener(p2pSub, fetcher, mdutils.Bserv(), header.MakeExtendedHeader) } diff --git a/header/header.go b/header/header.go index d095919887..c0240caf29 100644 --- a/header/header.go +++ b/header/header.go @@ -5,9 +5,9 @@ import ( "context" "fmt" + "github.com/ipfs/go-blockservice" logging "github.com/ipfs/go-log/v2" - format "github.com/ipfs/go-ipld-format" bts "github.com/tendermint/tendermint/libs/bytes" "github.com/tendermint/tendermint/pkg/da" core "github.com/tendermint/tendermint/types" @@ -43,7 +43,7 @@ func MakeExtendedHeader( b *core.Block, comm *core.Commit, vals *core.ValidatorSet, - dag format.NodeAdder, + bServ blockservice.BlockService, ) (*ExtendedHeader, error) { var dah DataAvailabilityHeader if len(b.Txs) > 0 { @@ -51,7 +51,7 @@ func MakeExtendedHeader( if err != nil { return nil, err } - extended, err := ipld.AddShares(ctx, namespacedShares.RawShares(), dag) + extended, err := ipld.AddShares(ctx, namespacedShares.RawShares(), bServ) if err != nil { return nil, err } diff --git a/header/header_test.go b/header/header_test.go index f10a331140..419f6e93f9 100644 --- a/header/header_test.go +++ b/header/header_test.go @@ -18,7 +18,7 @@ func TestMakeExtendedHeaderForEmptyBlock(t *testing.T) { _, client := core.StartTestClient(ctx, t) fetcher := core.NewBlockFetcher(client) - store := mdutils.Mock() + store := mdutils.Bserv() sub, err := fetcher.SubscribeNewBlockEvent(ctx) require.NoError(t, err) diff --git a/header/interface.go b/header/interface.go index b7b150e797..faf67dbd7f 100644 --- a/header/interface.go +++ b/header/interface.go @@ -5,7 +5,7 @@ import ( "errors" "fmt" - format "github.com/ipfs/go-ipld-format" + "github.com/ipfs/go-blockservice" pubsub 
"github.com/libp2p/go-libp2p-pubsub" tmbytes "github.com/tendermint/tendermint/libs/bytes" core "github.com/tendermint/tendermint/types" @@ -17,7 +17,7 @@ type ConstructFn = func( *core.Block, *core.Commit, *core.ValidatorSet, - format.NodeAdder, + blockservice.BlockService, ) (*ExtendedHeader, error) // Validator aliases a func that validates ExtendedHeader. diff --git a/ipld/add.go b/ipld/add.go index 2147df8d88..6b768cb3a0 100644 --- a/ipld/add.go +++ b/ipld/add.go @@ -5,6 +5,7 @@ import ( "fmt" "math" + "github.com/ipfs/go-blockservice" ipld "github.com/ipfs/go-ipld-format" "github.com/celestiaorg/nmt" @@ -13,15 +14,19 @@ import ( "github.com/tendermint/tendermint/pkg/wrapper" ) -// AddShares erasures and extends shares to IPLD DAG using the provided ipld.NodeAdder. -func AddShares(ctx context.Context, shares []Share, adder ipld.NodeAdder) (*rsmt2d.ExtendedDataSquare, error) { +// AddShares erasures and extends shares to blockservice.BlockService using the provided ipld.NodeAdder. +func AddShares( + ctx context.Context, + shares []Share, + adder blockservice.BlockService, +) (*rsmt2d.ExtendedDataSquare, error) { if len(shares) == 0 { return nil, fmt.Errorf("empty data") // empty block is not an empty Data } squareSize := int(math.Sqrt(float64(len(shares)))) // create nmt adder wrapping batch adder with calculated size bs := batchSize(squareSize * 2) - batchAdder := NewNmtNodeAdder(ctx, ipld.NewBatch(ctx, adder, ipld.MaxSizeBatchOption(bs))) + batchAdder := NewNmtNodeAdder(ctx, adder, ipld.MaxSizeBatchOption(bs)) // create the nmt wrapper to generate row and col commitments tree := wrapper.NewErasuredNamespacedMerkleTree(uint64(squareSize), nmt.NodeVisitor(batchAdder.Visit)) // recompute the eds @@ -35,15 +40,18 @@ func AddShares(ctx context.Context, shares []Share, adder ipld.NodeAdder) (*rsmt return eds, batchAdder.Commit() } -// ImportShares imports flattend chunks of data into Extended Data square and saves it in IPLD DAG -func ImportShares(ctx context.Context, shares [][]byte, na ipld.NodeAdder) (*rsmt2d.ExtendedDataSquare, error) { +// ImportShares imports flattend chunks of data into Extended Data square and saves it in blockservice.BlockService +func ImportShares( + ctx context.Context, + shares [][]byte, + adder blockservice.BlockService) (*rsmt2d.ExtendedDataSquare, error) { if len(shares) == 0 { return nil, fmt.Errorf("ipld: importing empty data") } squareSize := int(math.Sqrt(float64(len(shares)))) // create nmt adder wrapping batch adder with calculated size bs := batchSize(squareSize * 2) - batchAdder := NewNmtNodeAdder(ctx, ipld.NewBatch(ctx, na, ipld.MaxSizeBatchOption(bs))) + batchAdder := NewNmtNodeAdder(ctx, adder, ipld.MaxSizeBatchOption(bs)) // create the nmt wrapper to generate row and col commitments tree := wrapper.NewErasuredNamespacedMerkleTree(uint64(squareSize/2), nmt.NodeVisitor(batchAdder.Visit)) // recompute the eds diff --git a/ipld/get.go b/ipld/get.go index beb37d3afb..6e00ca5f32 100644 --- a/ipld/get.go +++ b/ipld/get.go @@ -3,6 +3,7 @@ package ipld import ( "context" + "github.com/ipfs/go-blockservice" "github.com/ipfs/go-cid" ipld "github.com/ipfs/go-ipld-format" @@ -15,12 +16,12 @@ import ( // GetShare fetches and returns the data for leaf `leafIndex` of root `rootCid`. 
func GetShare( ctx context.Context, - dag ipld.NodeGetter, + bGetter blockservice.BlockGetter, rootCid cid.Cid, leafIndex int, totalLeafs int, // this corresponds to the extended square width ) (Share, error) { - nd, err := GetLeaf(ctx, dag, rootCid, leafIndex, totalLeafs) + nd, err := GetLeaf(ctx, bGetter, rootCid, leafIndex, totalLeafs) if err != nil { return nil, err } @@ -30,9 +31,9 @@ func GetShare( // GetLeaf fetches and returns the raw leaf. // It walks down the IPLD NMT tree until it finds the requested one. -func GetLeaf(ctx context.Context, dag ipld.NodeGetter, root cid.Cid, leaf, total int) (ipld.Node, error) { +func GetLeaf(ctx context.Context, bGetter blockservice.BlockGetter, root cid.Cid, leaf, total int) (ipld.Node, error) { // request the node - nd, err := dag.Get(ctx, root) + nd, err := plugin.GetNode(ctx, bGetter, root) if err != nil { return nil, err } @@ -41,7 +42,7 @@ func GetLeaf(ctx context.Context, dag ipld.NodeGetter, root cid.Cid, leaf, total lnks := nd.Links() if len(lnks) == 1 { // in case there is only one we reached tree's bottom, so finally request the leaf. - return dag.Get(ctx, lnks[0].Cid) + return plugin.GetNode(ctx, bGetter, lnks[0].Cid) } // route walk to appropriate children @@ -53,14 +54,14 @@ func GetLeaf(ctx context.Context, dag ipld.NodeGetter, root cid.Cid, leaf, total } // recursively walk down through selected children - return GetLeaf(ctx, dag, root, leaf, total) + return GetLeaf(ctx, bGetter, root, leaf, total) } // GetProofsForShares fetches Merkle proofs for the given shares // and returns the result as an array of ShareWithProof. func GetProofsForShares( ctx context.Context, - dag ipld.NodeGetter, + bGetter blockservice.BlockGetter, root cid.Cid, shares [][]byte, ) ([]*ShareWithProof, error) { @@ -70,11 +71,11 @@ func GetProofsForShares( proof := make([]cid.Cid, 0) // TODO(@vgonkivs): Combine GetLeafData and GetProof in one function as the are traversing the same tree. // Add options that will control what data will be fetched. - s, err := GetLeaf(ctx, dag, root, index, len(shares)) + s, err := GetLeaf(ctx, bGetter, root, index, len(shares)) if err != nil { return nil, err } - proof, err = GetProof(ctx, dag, root, proof, index, len(shares)) + proof, err = GetProof(ctx, bGetter, root, proof, index, len(shares)) if err != nil { return nil, err } @@ -89,13 +90,13 @@ func GetProofsForShares( // It walks down the IPLD NMT tree until it reaches the leaf and returns collected proof func GetProof( ctx context.Context, - dag ipld.NodeGetter, + bGetter blockservice.BlockGetter, root cid.Cid, proof []cid.Cid, leaf, total int, ) ([]cid.Cid, error) { // request the node - nd, err := dag.Get(ctx, root) + nd, err := plugin.GetNode(ctx, bGetter, root) if err != nil { return nil, err } @@ -114,7 +115,7 @@ func GetProof( proof = append(proof, lnks[1].Cid) } else { root, leaf = lnks[1].Cid, leaf-total // otherwise go down the second - proof, err = GetProof(ctx, dag, root, proof, leaf, total) + proof, err = GetProof(ctx, bGetter, root, proof, leaf, total) if err != nil { return nil, err } @@ -122,18 +123,18 @@ func GetProof( } // recursively walk down through selected children - return GetProof(ctx, dag, root, proof, leaf, total) + return GetProof(ctx, bGetter, root, proof, leaf, total) } // GetSharesByNamespace returns all the shares from the given root // with the given namespace.ID. 
func GetSharesByNamespace( ctx context.Context, - dag ipld.NodeGetter, + bGetter blockservice.BlockGetter, root cid.Cid, nID namespace.ID, ) ([]Share, error) { - leaves, err := GetLeavesByNamespace(ctx, dag, root, nID) + leaves, err := GetLeavesByNamespace(ctx, bGetter, root, nID) if err != nil { return nil, err } @@ -150,7 +151,7 @@ func GetSharesByNamespace( // If nothing is found it returns both data and err as nil. func GetLeavesByNamespace( ctx context.Context, - dag ipld.NodeGetter, + bGetter blockservice.BlockGetter, root cid.Cid, nID namespace.ID, ) ([]ipld.Node, error) { @@ -163,7 +164,7 @@ func GetLeavesByNamespace( return nil, nil } // request the node - nd, err := dag.Get(ctx, root) + nd, err := plugin.GetNode(ctx, bGetter, root) if err != nil { return nil, err } @@ -176,7 +177,7 @@ func GetLeavesByNamespace( // if there are some links, then traverse them var out []ipld.Node for _, lnk := range nd.Links() { - nds, err := GetLeavesByNamespace(ctx, dag, lnk.Cid, nID) + nds, err := GetLeavesByNamespace(ctx, bGetter, lnk.Cid, nID) if err != nil { return out, err } diff --git a/ipld/get_shares.go b/ipld/get_shares.go index dc4a04e817..8b4b6cd1d3 100644 --- a/ipld/get_shares.go +++ b/ipld/get_shares.go @@ -5,8 +5,10 @@ import ( "sync" "github.com/gammazero/workerpool" + "github.com/ipfs/go-blockservice" "github.com/ipfs/go-cid" - format "github.com/ipfs/go-ipld-format" + + "github.com/celestiaorg/celestia-node/ipld/plugin" ) // NumWorkersLimit sets global limit for workers spawned by GetShares. @@ -47,7 +49,7 @@ var pool = workerpool.New(NumWorkersLimit) // tree, so it's not suitable for anything else besides that. Parts on the // implementation that rely on this property are explicitly tagged with // (bin-tree-feat). -func GetShares(ctx context.Context, dag format.NodeGetter, root cid.Cid, shares int, put func(int, Share)) { +func GetShares(ctx context.Context, bGetter blockservice.BlockGetter, root cid.Cid, shares int, put func(int, Share)) { // job is not used anywhere else, so can be kept here type job struct { id cid.Cid @@ -70,7 +72,7 @@ func GetShares(ctx context.Context, dag format.NodeGetter, root cid.Cid, shares // processing of each other pool.Submit(func() { defer wg.Done() - nd, err := dag.Get(ctx, j.id) + nd, err := plugin.GetNode(ctx, bGetter, j.id) if err != nil { // we don't really care about errors here // just fetch as much as possible @@ -81,7 +83,7 @@ func GetShares(ctx context.Context, dag format.NodeGetter, root cid.Cid, shares if len(lnks) == 1 { // so we are almost there // the reason why the comment on 'total' is lying, as each // leaf has its own additional leaf(hack) so get it - nd, err := dag.Get(ctx, lnks[0].Cid) + nd, err := plugin.GetNode(ctx, bGetter, lnks[0].Cid) if err != nil { // again, we don't care return diff --git a/ipld/get_test.go b/ipld/get_test.go index f4d08cd3ff..bb9f22a228 100644 --- a/ipld/get_test.go +++ b/ipld/get_test.go @@ -15,7 +15,6 @@ import ( blockstore "github.com/ipfs/go-ipfs-blockstore" offline "github.com/ipfs/go-ipfs-exchange-offline" format "github.com/ipfs/go-ipld-format" - "github.com/ipfs/go-merkledag" mdutils "github.com/ipfs/go-merkledag/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -34,17 +33,17 @@ func TestGetShare(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - dag := mdutils.Mock() + bServ := mdutils.Bserv() // generate random shares for the nmt shares := RandShares(t, size*size) - eds, err := AddShares(ctx, shares, dag) + 
eds, err := AddShares(ctx, shares, bServ) require.NoError(t, err) for i, leaf := range shares { row := i / size pos := i - (size * row) - share, err := GetShare(ctx, dag, plugin.MustCidFromNamespacedSha256(eds.RowRoots()[row]), pos, size*2) + share, err := GetShare(ctx, bServ, plugin.MustCidFromNamespacedSha256(eds.RowRoots()[row]), pos, size*2) require.NoError(t, err) assert.Equal(t, leaf, share) } @@ -149,7 +148,7 @@ func removeRandShares(data [][]byte, d int) [][]byte { func TestGetSharesByNamespace(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dag := mdutils.Mock() + bServ := mdutils.Bserv() var tests = []struct { rawData []Share @@ -167,13 +166,13 @@ func TestGetSharesByNamespace(t *testing.T) { // change rawData to contain several shares with same nID tt.rawData[(len(tt.rawData)/2)+1] = expected - // put raw data in DAG - eds, err := AddShares(ctx, tt.rawData, dag) + // put raw data in BlockService + eds, err := AddShares(ctx, tt.rawData, bServ) require.NoError(t, err) for _, row := range eds.RowRoots() { rcid := plugin.MustCidFromNamespacedSha256(row) - shares, err := GetSharesByNamespace(ctx, dag, rcid, nID) + shares, err := GetSharesByNamespace(ctx, bServ, rcid, nID) require.NoError(t, err) for _, share := range shares { @@ -187,7 +186,7 @@ func TestGetSharesByNamespace(t *testing.T) { func TestGetLeavesByNamespace_AbsentNamespaceId(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dag := mdutils.Mock() + bServ := mdutils.Bserv() shares := RandShares(t, 16) @@ -228,9 +227,9 @@ func TestGetLeavesByNamespace_AbsentNamespaceId(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - eds, err := AddShares(ctx, shares, dag) + eds, err := AddShares(ctx, shares, bServ) require.NoError(t, err) - assertNoRowContainsNID(t, dag, eds, tt.missingNid) + assertNoRowContainsNID(t, bServ, eds, tt.missingNid) }) } } @@ -238,7 +237,7 @@ func TestGetLeavesByNamespace_AbsentNamespaceId(t *testing.T) { func TestGetLeavesByNamespace_MultipleRowsContainingSameNamespaceId(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dag := mdutils.Mock() + bServ := mdutils.Bserv() shares := RandShares(t, 16) @@ -254,12 +253,12 @@ func TestGetLeavesByNamespace_MultipleRowsContainingSameNamespaceId(t *testing.T copy(nspace, commonNamespaceData) } - eds, err := AddShares(ctx, shares, dag) + eds, err := AddShares(ctx, shares, bServ) require.NoError(t, err) for _, row := range eds.RowRoots() { rcid := plugin.MustCidFromNamespacedSha256(row) - nodes, err := GetLeavesByNamespace(ctx, dag, rcid, nid) + nodes, err := GetLeavesByNamespace(ctx, bServ, rcid, nid) require.NoError(t, err) for _, node := range nodes { @@ -289,10 +288,9 @@ func TestBatchSize(t *testing.T) { defer cancel() bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) - dag := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))) eds := RandEDS(t, tt.origWidth) - _, err := AddShares(ctx, ExtractODS(eds), dag) + _, err := AddShares(ctx, ExtractODS(eds), blockservice.New(bs, offline.Exchange(bs))) require.NoError(t, err) out, err := bs.AllKeysChan(ctx) @@ -310,7 +308,7 @@ func TestBatchSize(t *testing.T) { func assertNoRowContainsNID( t *testing.T, - dag format.DAGService, + bServ blockservice.BlockService, eds *rsmt2d.ExtendedDataSquare, nID namespace.ID, ) { @@ -322,7 +320,7 @@ func assertNoRowContainsNID( // for each row root cid check if the minNID exists for _, rowCID := range 
rowRootCIDs { - data, err := GetLeavesByNamespace(context.Background(), dag, rowCID, nID) + data, err := GetLeavesByNamespace(context.Background(), bServ, rowCID, nID) assert.Nil(t, data) assert.Nil(t, err) } @@ -333,10 +331,10 @@ func TestGetProof(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) defer cancel() - dag := mdutils.Mock() + bServ := mdutils.Bserv() shares := RandShares(t, width*width) - in, err := AddShares(ctx, shares, dag) + in, err := AddShares(ctx, shares, bServ) require.NoError(t, err) dah := da.NewDataAvailabilityHeader(in) @@ -353,9 +351,9 @@ func TestGetProof(t *testing.T) { rootCid := plugin.MustCidFromNamespacedSha256(root) for index := 0; uint(index) < in.Width(); index++ { proof := make([]cid.Cid, 0) - proof, err = GetProof(ctx, dag, rootCid, proof, index, int(in.Width())) + proof, err = GetProof(ctx, bServ, rootCid, proof, index, int(in.Width())) require.NoError(t, err) - node, err := GetLeaf(ctx, dag, rootCid, index, int(in.Width())) + node, err := GetLeaf(ctx, bServ, rootCid, index, int(in.Width())) require.NoError(t, err) inclusion := NewShareWithProof(index, node.RawData()[1:], proof) require.True(t, inclusion.Validate(rootCid)) @@ -369,10 +367,10 @@ func TestGetProofs(t *testing.T) { const width = 4 ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) defer cancel() - dag := mdutils.Mock() + bServ := mdutils.Bserv() shares := RandShares(t, width*width) - in, err := AddShares(ctx, shares, dag) + in, err := AddShares(ctx, shares, bServ) require.NoError(t, err) dah := da.NewDataAvailabilityHeader(in) @@ -380,12 +378,12 @@ func TestGetProofs(t *testing.T) { rootCid := plugin.MustCidFromNamespacedSha256(root) data := make([][]byte, 0, in.Width()) for index := 0; uint(index) < in.Width(); index++ { - node, err := GetLeaf(ctx, dag, rootCid, index, int(in.Width())) + node, err := GetLeaf(ctx, bServ, rootCid, index, int(in.Width())) require.NoError(t, err) data = append(data, node.RawData()[9:]) } - proves, err := GetProofsForShares(ctx, dag, rootCid, data) + proves, err := GetProofsForShares(ctx, bServ, rootCid, data) require.NoError(t, err) for _, proof := range proves { require.True(t, proof.Validate(rootCid)) @@ -398,7 +396,7 @@ func TestRetrieveDataFailedWithByzantineError(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() - dag := mdutils.Mock() + bServ := mdutils.Bserv() eds := RandEDS(t, width) shares := ExtractEDS(eds) @@ -406,7 +404,11 @@ func TestRetrieveDataFailedWithByzantineError(t *testing.T) { copy(shares[14][8:], shares[15][8:]) // import corrupted eds - batchAdder := NewNmtNodeAdder(ctx, format.NewBatch(ctx, dag, format.MaxSizeBatchOption(batchSize(width*2)))) + batchAdder := NewNmtNodeAdder( + ctx, + bServ, + format.MaxSizeBatchOption(batchSize(width*2)), + ) tree := wrapper.NewErasuredNamespacedMerkleTree(uint64(width), nmt.NodeVisitor(batchAdder.Visit)) attackerEDS, err := rsmt2d.ImportExtendedDataSquare(shares, DefaultRSMT2DCodec(), tree.Constructor) require.NoError(t, err) @@ -415,7 +417,7 @@ func TestRetrieveDataFailedWithByzantineError(t *testing.T) { // ensure we rcv an error da := da.NewDataAvailabilityHeader(attackerEDS) - r := NewRetriever(dag) + r := NewRetriever(bServ) _, err = r.Retrieve(ctx, &da) var errByz *ErrByzantine require.ErrorAs(t, err, &errByz) diff --git a/ipld/nmt_adder.go b/ipld/nmt_adder.go index 28ab49facd..069ad8c182 100644 --- a/ipld/nmt_adder.go +++ b/ipld/nmt_adder.go @@ -3,6 +3,9 @@ package ipld import ( 
"context" + "github.com/ipfs/go-blockservice" + "github.com/ipfs/go-merkledag" + "github.com/ipfs/go-cid" ipld "github.com/ipfs/go-ipld-format" @@ -21,9 +24,9 @@ type NmtNodeAdder struct { // NewNmtNodeAdder returns a new NmtNodeAdder with the provided context and // batch. Note that the context provided should have a timeout // It is not thread-safe. -func NewNmtNodeAdder(ctx context.Context, add ipld.NodeAdder) *NmtNodeAdder { +func NewNmtNodeAdder(ctx context.Context, bs blockservice.BlockService, opts ...ipld.BatchOption) *NmtNodeAdder { return &NmtNodeAdder{ - add: add, + add: ipld.NewBatch(ctx, merkledag.NewDAGService(bs), opts...), ctx: ctx, leaves: cid.NewSet(), } diff --git a/ipld/plugin/nmt.go b/ipld/plugin/nmt.go index d651f64a20..600f2a7e52 100644 --- a/ipld/plugin/nmt.go +++ b/ipld/plugin/nmt.go @@ -3,6 +3,7 @@ package plugin import ( "bufio" "bytes" + "context" "crypto/sha256" "errors" "fmt" @@ -10,6 +11,7 @@ import ( "io" blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-blockservice" "github.com/ipfs/go-cid" ipld "github.com/ipfs/go-ipld-format" mh "github.com/multiformats/go-multihash" @@ -51,8 +53,6 @@ func init() { return NewNamespaceHasher(nmt.NewNmtHasher(sha256.New(), nmt.DefaultNamespaceIDLen, true)) }, ) - // this should already happen when the plugin is injected but it doesn't for some CI tests - ipld.DefaultBlockDecoder.Register(NmtCodec, NmtNodeParser) // register the codecs in the global maps cid.Codecs[NmtCodecName] = NmtCodec cid.CodecToStr[NmtCodec] = NmtCodecName @@ -184,7 +184,16 @@ func prependNode(newNode ipld.Node, nodes []ipld.Node) []ipld.Node { return prepended } -func NmtNodeParser(block blocks.Block) (ipld.Node, error) { +func GetNode(ctx context.Context, bGetter blockservice.BlockGetter, root cid.Cid) (ipld.Node, error) { + block, err := bGetter.GetBlock(ctx, root) + if err != nil { + return nil, err + } + + return decodeBlock(block) +} + +func decodeBlock(block blocks.Block) (ipld.Node, error) { // length of the domain separator for leaf and inner nodes: const prefixOffset = 1 var ( diff --git a/ipld/retriever.go b/ipld/retriever.go index a8de671552..4dbbbdc32d 100644 --- a/ipld/retriever.go +++ b/ipld/retriever.go @@ -7,13 +7,14 @@ import ( "sync" "time" + "github.com/ipfs/go-blockservice" "github.com/ipfs/go-cid" format "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log/v2" - "github.com/ipfs/go-merkledag" "github.com/tendermint/tendermint/pkg/da" "github.com/tendermint/tendermint/pkg/wrapper" + "github.com/celestiaorg/celestia-node/ipld/plugin" "github.com/celestiaorg/nmt" "github.com/celestiaorg/rsmt2d" ) @@ -36,12 +37,12 @@ var RetrieveQuadrantTimeout = time.Minute * 5 // Retriever randomly picks one of the data square quadrants and tries to request them one by one until it is able to // reconstruct the whole square. type Retriever struct { - dag format.DAGService + bServ blockservice.BlockService } // NewRetriever creates a new instance of the Retriever over IPLD Service and rmst2d.Codec -func NewRetriever(dag format.DAGService) *Retriever { - return &Retriever{dag: dag} +func NewRetriever(bServ blockservice.BlockService) *Retriever { + return &Retriever{bServ: bServ} } // Retrieve retrieves all the data committed to DataAvailabilityHeader. 
@@ -63,7 +64,7 @@ func (r *Retriever) Retrieve(ctx context.Context, dah *da.DataAvailabilityHeader var errByz *rsmt2d.ErrByzantineData if errors.As(err, &errByz) { - return nil, NewErrByzantine(ctx, r.dag, dah, errByz) + return nil, NewErrByzantine(ctx, r.bServ, dah, errByz) } log.Warnw("not enough shares to reconstruct data square, requesting more...", "err", err) // retry quadrants until we can reconstruct the EDS or error out @@ -73,8 +74,8 @@ func (r *Retriever) Retrieve(ctx context.Context, dah *da.DataAvailabilityHeader } type retrieverSession struct { - dag format.NodeGetter - adder *NmtNodeAdder + session blockservice.BlockGetter + adder *NmtNodeAdder treeFn rsmt2d.TreeConstructorFn codec rsmt2d.Codec @@ -87,10 +88,14 @@ type retrieverSession struct { func (r *Retriever) newSession(ctx context.Context, dah *da.DataAvailabilityHeader) (*retrieverSession, error) { size := len(dah.RowsRoots) - adder := NewNmtNodeAdder(ctx, format.NewBatch(ctx, r.dag, format.MaxSizeBatchOption(batchSize(size)))) + adder := NewNmtNodeAdder( + ctx, + r.bServ, + format.MaxSizeBatchOption(batchSize(size)), + ) ses := &retrieverSession{ - dag: merkledag.NewSession(ctx, r.dag), - adder: adder, + session: blockservice.NewSession(ctx, r.bServ), + adder: adder, treeFn: func() rsmt2d.Tree { tree := wrapper.NewErasuredNamespacedMerkleTree(uint64(size)/2, nmt.NodeVisitor(adder.Visit)) return &tree @@ -148,14 +153,14 @@ func (rs *retrieverSession) request(ctx context.Context, q *quadrant) { go func(i int, root cid.Cid) { defer wg.Done() // get the root node - nd, err := rs.dag.Get(ctx, root) + nd, err := plugin.GetNode(ctx, rs.session, root) if err != nil { return } // and go get shares of left or the right side of the whole col/row axis // the left or the right side of the tree represent some portion of the quadrant // which we put into the rs.square share-by-share by calculating shares' indexes using q.index - GetShares(ctx, rs.dag, nd.Links()[q.x].Cid, size, func(j int, share Share) { + GetShares(ctx, rs.session, nd.Links()[q.x].Cid, size, func(j int, share Share) { // the R lock here is *not* to protect rs.square from multiple concurrent shares writes // but to avoid races between share writes and repairing attempts // shares are written atomically in their own slice slot diff --git a/ipld/retriever_byzantine.go b/ipld/retriever_byzantine.go index fb27d801f9..1d18df432b 100644 --- a/ipld/retriever_byzantine.go +++ b/ipld/retriever_byzantine.go @@ -4,7 +4,7 @@ import ( "context" "fmt" - format "github.com/ipfs/go-ipld-format" + "github.com/ipfs/go-blockservice" "github.com/tendermint/tendermint/pkg/da" "github.com/celestiaorg/celestia-node/ipld/plugin" @@ -33,7 +33,7 @@ func (e *ErrByzantine) Error() string { // TODO(@Wondertan): Migrate to ErrByzantineData in the newest rsmt2d func NewErrByzantine( ctx context.Context, - dag format.NodeGetter, + bGetter blockservice.BlockGetter, dah *da.DataAvailabilityHeader, errByz *rsmt2d.ErrByzantineData, ) *ErrByzantine { @@ -43,7 +43,7 @@ func NewErrByzantine( }[errByz.Axis][errByz.Index] sharesWithProof, err := GetProofsForShares( ctx, - dag, + bGetter, plugin.MustCidFromNamespacedSha256(root), errByz.Shares, ) diff --git a/ipld/retriever_test.go b/ipld/retriever_test.go index c8b4c6e74a..2126f4845e 100644 --- a/ipld/retriever_test.go +++ b/ipld/retriever_test.go @@ -18,8 +18,8 @@ func TestRetriever_Retrieve(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dag := mdutils.Mock() - r := NewRetriever(dag) + bServ := mdutils.Bserv() + 
r := NewRetriever(bServ) type test struct { name string @@ -36,7 +36,7 @@ func TestRetriever_Retrieve(t *testing.T) { t.Run(tc.name, func(t *testing.T) { // generate EDS shares := RandShares(t, tc.squareSize*tc.squareSize) - in, err := AddShares(ctx, shares, dag) + in, err := AddShares(ctx, shares, bServ) require.NoError(t, err) // limit with timeout, specifically retrieval diff --git a/node/core/core.go b/node/core/core.go index 60ff112169..f797fd605e 100644 --- a/node/core/core.go +++ b/node/core/core.go @@ -4,7 +4,7 @@ import ( "context" "fmt" - format "github.com/ipfs/go-ipld-format" + "github.com/ipfs/go-blockservice" "go.uber.org/fx" "github.com/celestiaorg/celestia-node/libs/fxutil" @@ -58,10 +58,10 @@ func HeaderListener( lc fx.Lifecycle, ex *core.BlockFetcher, bcast header.Broadcaster, - dag format.DAGService, + bServ blockservice.BlockService, construct header.ConstructFn, ) *headercore.Listener { - cl := headercore.NewListener(bcast, ex, dag, construct) + cl := headercore.NewListener(bcast, ex, bServ, construct) lc.Append(fx.Hook{ OnStart: cl.Start, OnStop: cl.Stop, diff --git a/node/node.go b/node/node.go index d86dffea37..a5844dfe43 100644 --- a/node/node.go +++ b/node/node.go @@ -6,8 +6,8 @@ import ( "strings" "time" + "github.com/ipfs/go-blockservice" exchange "github.com/ipfs/go-ipfs-exchange-interface" - format "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p-core/connmgr" "github.com/libp2p/go-libp2p-core/host" @@ -50,7 +50,7 @@ type Node struct { ConnGater connmgr.ConnectionGater Routing routing.PeerRouting DataExchange exchange.Interface - DAG format.DAGService + BlockService blockservice.BlockService // p2p protocols PubSub *pubsub.PubSub // services diff --git a/node/p2p/ipld.go b/node/p2p/ipld.go index 22b6a64f35..5835618a43 100644 --- a/node/p2p/ipld.go +++ b/node/p2p/ipld.go @@ -4,11 +4,9 @@ import ( "github.com/ipfs/go-blockservice" blockstore "github.com/ipfs/go-ipfs-blockstore" exchange "github.com/ipfs/go-ipfs-exchange-interface" - format "github.com/ipfs/go-ipld-format" - "github.com/ipfs/go-merkledag" ) -// DAG constructs IPFS's DAG Service for fetching arbitrary Merkle structures. -func DAG(bs blockstore.Blockstore, ex exchange.Interface) format.DAGService { - return merkledag.NewDAGService(blockservice.New(bs, ex)) +// BlockService constructs IPFS's BlockService for fetching arbitrary Merkle structures. 
+func BlockService(bs blockstore.Blockstore, ex exchange.Interface) blockservice.BlockService { + return blockservice.New(bs, ex) } diff --git a/node/p2p/p2p.go b/node/p2p/p2p.go index 5c1c2d09dd..f239169005 100644 --- a/node/p2p/p2p.go +++ b/node/p2p/p2p.go @@ -63,7 +63,7 @@ func Components(cfg Config) fx.Option { fx.Provide(RoutedHost), fx.Provide(PubSub(cfg)), fx.Provide(DataExchange(cfg)), - fx.Provide(DAG), + fx.Provide(BlockService), fx.Provide(PeerRouting(cfg)), fx.Provide(ContentRouting), fx.Provide(AddrsFactory(cfg.AnnounceAddresses, cfg.NoAnnounceAddresses)), diff --git a/node/services/service.go b/node/services/service.go index 9dede97497..41058fa2cc 100644 --- a/node/services/service.go +++ b/node/services/service.go @@ -3,9 +3,8 @@ package services import ( "context" + "github.com/ipfs/go-blockservice" "github.com/ipfs/go-datastore" - ipld "github.com/ipfs/go-ipld-format" - "github.com/ipfs/go-merkledag" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" @@ -122,8 +121,8 @@ func HeaderStoreInit(cfg *Config) func(context.Context, params.Network, header.S } // ShareService constructs new share.Service. -func ShareService(lc fx.Lifecycle, dag ipld.DAGService, avail share.Availability) *share.Service { - service := share.NewService(dag, avail) +func ShareService(lc fx.Lifecycle, bServ blockservice.BlockService, avail share.Availability) *share.Service { + service := share.NewService(bServ, avail) lc.Append(fx.Hook{ OnStart: service.Start, OnStop: service.Stop, @@ -148,10 +147,10 @@ func DASer( } // LightAvailability constructs light share availability. -func LightAvailability(ctx context.Context, lc fx.Lifecycle, dag ipld.DAGService) share.Availability { - return share.NewLightAvailability(merkledag.NewSession(fxutil.WithLifecycle(ctx, lc), dag)) +func LightAvailability(ctx context.Context, lc fx.Lifecycle, bServ blockservice.BlockService) share.Availability { + return share.NewLightAvailability(blockservice.NewSession(fxutil.WithLifecycle(ctx, lc), bServ)) } -func FullAvailability(dag ipld.DAGService) share.Availability { - return share.NewFullAvailability(dag) +func FullAvailability(bServ blockservice.BlockService) share.Availability { + return share.NewFullAvailability(bServ) } diff --git a/service/share/empty.go b/service/share/empty.go index 6eb3b99b16..931f505043 100644 --- a/service/share/empty.go +++ b/service/share/empty.go @@ -4,7 +4,7 @@ import ( "bytes" "context" - format "github.com/ipfs/go-ipld-format" + "github.com/ipfs/go-blockservice" "github.com/tendermint/tendermint/pkg/consts" "github.com/celestiaorg/celestia-node/ipld" @@ -14,13 +14,13 @@ import ( // If it does not, it stores an empty block. This optimization exists to prevent // redundant storing of empty block data so that it is only stored once and returned // upon request for a block with an empty data square. 
Ref: header/header.go#L56 -func EnsureEmptySquareExists(ctx context.Context, dag format.DAGService) error { +func EnsureEmptySquareExists(ctx context.Context, bServ blockservice.BlockService) error { shares := make([][]byte, consts.MinSharecount) for i := 0; i < consts.MinSharecount; i++ { shares[i] = tailPaddingShare } - _, err := ipld.AddShares(ctx, shares, dag) + _, err := ipld.AddShares(ctx, shares, bServ) return err } diff --git a/service/share/full_availability.go b/service/share/full_availability.go index 271bceed5f..34dc10d8c6 100644 --- a/service/share/full_availability.go +++ b/service/share/full_availability.go @@ -4,6 +4,7 @@ import ( "context" "errors" + "github.com/ipfs/go-blockservice" format "github.com/ipfs/go-ipld-format" "github.com/celestiaorg/celestia-node/ipld" @@ -17,9 +18,9 @@ type fullAvailability struct { } // NewFullAvailability creates a new full Availability. -func NewFullAvailability(dag format.DAGService) Availability { +func NewFullAvailability(bServ blockservice.BlockService) Availability { return &fullAvailability{ - rtrv: ipld.NewRetriever(dag), + rtrv: ipld.NewRetriever(bServ), } } diff --git a/service/share/light_availability.go b/service/share/light_availability.go index 9013f9f284..5c1651aa22 100644 --- a/service/share/light_availability.go +++ b/service/share/light_availability.go @@ -4,8 +4,8 @@ import ( "context" "errors" + "github.com/ipfs/go-blockservice" format "github.com/ipfs/go-ipld-format" - "github.com/ipfs/go-merkledag" "github.com/celestiaorg/celestia-node/ipld" ) @@ -18,13 +18,13 @@ var DefaultSampleAmount = 16 // its availability. It is assumed that there are a lot of lightAvailability instances // on the network doing sampling over the same Root to collectively verify its availability. type lightAvailability struct { - getter format.NodeGetter + getter blockservice.BlockGetter } // NewLightAvailability creates a new light Availability. -func NewLightAvailability(get format.NodeGetter) Availability { +func NewLightAvailability(getter blockservice.BlockGetter) Availability { return &lightAvailability{ - getter: get, + getter: getter, } } @@ -40,12 +40,11 @@ func (la *lightAvailability) SharesAvailable(ctx context.Context, dah *Root) err ctx, cancel := context.WithTimeout(ctx, AvailabilityTimeout) defer cancel() - ses := merkledag.NewSession(ctx, la.getter) errs := make(chan error, len(samples)) for _, s := range samples { go func(s Sample) { root, leaf := translate(dah, s.Row, s.Col) - _, err := ipld.GetShare(ctx, ses, root, leaf, len(dah.RowsRoots)) + _, err := ipld.GetShare(ctx, la.getter, root, leaf, len(dah.RowsRoots)) // we don't really care about Share bodies at this point // it also means we now saved the Share in local storage select { diff --git a/service/share/share.go b/service/share/share.go index c48a3a6855..c985cf674a 100644 --- a/service/share/share.go +++ b/service/share/share.go @@ -7,10 +7,9 @@ import ( "golang.org/x/sync/errgroup" + "github.com/ipfs/go-blockservice" "github.com/ipfs/go-cid" - format "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log/v2" - "github.com/ipfs/go-merkledag" "github.com/tendermint/tendermint/pkg/da" "github.com/celestiaorg/celestia-node/ipld" @@ -50,21 +49,21 @@ type Root = da.DataAvailabilityHeader // TODO(@Wondertan): Simple thread safety for Start and Stop would not hurt. 
type Service struct { Availability - rtrv *ipld.Retriever - dag format.DAGService - // session is dag sub-session that applies optimization for fetching/loading related nodes, like shares - // prefer session over dag for fetching nodes. - session format.NodeGetter + rtrv *ipld.Retriever + bServ blockservice.BlockService + // session is blockservice sub-session that applies optimization for fetching/loading related nodes, like shares + // prefer session over blockservice for fetching nodes. + session blockservice.BlockGetter // cancel controls lifecycle of the session cancel context.CancelFunc } // NewService creates new basic share.Service. -func NewService(dag format.DAGService, avail Availability) *Service { +func NewService(bServ blockservice.BlockService, avail Availability) *Service { return &Service{ - rtrv: ipld.NewRetriever(dag), + rtrv: ipld.NewRetriever(bServ), Availability: avail, - dag: dag, + bServ: bServ, } } @@ -79,7 +78,7 @@ func (s *Service) Start(context.Context) error { // The newer context here is created to control lifecycle of the session. ctx, cancel := context.WithCancel(context.Background()) s.cancel = cancel - s.session = merkledag.NewSession(ctx, s.dag) + s.session = blockservice.NewSession(ctx, s.bServ) return nil } @@ -96,7 +95,7 @@ func (s *Service) Stop(context.Context) error { func (s *Service) GetShare(ctx context.Context, dah *Root, row, col int) (Share, error) { root, leaf := translate(dah, row, col) - nd, err := ipld.GetShare(ctx, s.dag, root, leaf, len(dah.RowsRoots)) + nd, err := ipld.GetShare(ctx, s.bServ, root, leaf, len(dah.RowsRoots)) if err != nil { return nil, err } @@ -145,7 +144,7 @@ func (s *Service) GetSharesByNamespace(ctx context.Context, root *Root, nID name // shadow loop variables, to ensure correct values are captured i, rootCID := i, rootCID errGroup.Go(func() (err error) { - shares[i], err = ipld.GetSharesByNamespace(ctx, s.dag, rootCID, nID) + shares[i], err = ipld.GetSharesByNamespace(ctx, s.bServ, rootCID, nID) return }) } diff --git a/service/share/share_test.go b/service/share/share_test.go index 36dc7faf65..ddd7857e4c 100644 --- a/service/share/share_test.go +++ b/service/share/share_test.go @@ -52,7 +52,7 @@ func TestService_GetSharesByNamespace(t *testing.T) { for _, tt := range tests { t.Run("size: "+strconv.Itoa(tt.squareSize), func(t *testing.T) { - serv, dag := RandLightService() + serv, bServ := RandLightService() n := tt.squareSize * tt.squareSize randShares := RandShares(t, n) idx1 := (n - 1) / 2 @@ -61,7 +61,7 @@ func TestService_GetSharesByNamespace(t *testing.T) { // make it so that two rows have the same namespace ID copy(randShares[idx2][:8], randShares[idx1][:8]) } - root := FillDag(t, dag, randShares) + root := FillDag(t, bServ, randShares) randNID := randShares[idx1][:8] shares, err := serv.GetSharesByNamespace(context.Background(), root, randNID) diff --git a/service/share/testing.go b/service/share/testing.go index 25b7c51382..25192de889 100644 --- a/service/share/testing.go +++ b/service/share/testing.go @@ -13,7 +13,6 @@ import ( blockstore "github.com/ipfs/go-ipfs-blockstore" "github.com/ipfs/go-ipfs-routing/offline" format "github.com/ipfs/go-ipld-format" - "github.com/ipfs/go-merkledag" mdutils "github.com/ipfs/go-merkledag/test" record "github.com/libp2p/go-libp2p-record" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" @@ -31,31 +30,31 @@ import ( // RandLightServiceWithSquare provides a share.Service filled with 'n' NMT // trees of 'n' random shares, essentially storing a whole square. 
func RandLightServiceWithSquare(t *testing.T, n int) (*Service, *Root) { - dag := mdutils.Mock() - return NewService(dag, NewLightAvailability(dag)), RandFillDAG(t, n, dag) + bServ := mdutils.Bserv() + return NewService(bServ, NewLightAvailability(bServ)), RandFillDAG(t, n, bServ) } // RandLightService provides an unfilled share.Service with corresponding -// format.DAGService than can be filled by the test. -func RandLightService() (*Service, format.DAGService) { - dag := mdutils.Mock() - return NewService(dag, NewLightAvailability(dag)), dag +// blockservice.BlockService than can be filled by the test. +func RandLightService() (*Service, blockservice.BlockService) { + bServ := mdutils.Bserv() + return NewService(bServ, NewLightAvailability(bServ)), bServ } // RandFullServiceWithSquare provides a share.Service filled with 'n' NMT // trees of 'n' random shares, essentially storing a whole square. func RandFullServiceWithSquare(t *testing.T, n int) (*Service, *Root) { - dag := mdutils.Mock() - return NewService(dag, NewFullAvailability(dag)), RandFillDAG(t, n, dag) + bServ := mdutils.Bserv() + return NewService(bServ, NewFullAvailability(bServ)), RandFillDAG(t, n, bServ) } -func RandFillDAG(t *testing.T, n int, dag format.DAGService) *Root { +func RandFillDAG(t *testing.T, n int, bServ blockservice.BlockService) *Root { shares := RandShares(t, n*n) - return FillDag(t, dag, shares) + return FillDag(t, bServ, shares) } -func FillDag(t *testing.T, dag format.DAGService, shares []Share) *Root { - na := ipld.NewNmtNodeAdder(context.TODO(), dag) +func FillDag(t *testing.T, bServ blockservice.BlockService, shares []Share) *Root { + na := ipld.NewNmtNodeAdder(context.TODO(), bServ, format.MaxSizeBatchOption(len(shares))) squareSize := uint32(math.Sqrt(float64(len(shares)))) tree := wrapper.NewErasuredNamespacedMerkleTree(uint64(squareSize), nmt.NodeVisitor(na.Visit)) @@ -90,26 +89,26 @@ func NewDAGNet(ctx context.Context, t *testing.T) *DAGNet { } func (dn *DAGNet) RandLightService(n int) (*Service, *Root) { - dag, root := dn.RandDAG(n) - return NewService(dag, NewLightAvailability(dag)), root + bServ, root := dn.RandDAG(n) + return NewService(bServ, NewLightAvailability(bServ)), root } func (dn *DAGNet) RandFullService(n int) (*Service, *Root) { - dag, root := dn.RandDAG(n) - return NewService(dag, NewFullAvailability(dag)), root + bServ, root := dn.RandDAG(n) + return NewService(bServ, NewFullAvailability(bServ)), root } -func (dn *DAGNet) RandDAG(n int) (format.DAGService, *Root) { - dag := dn.CleanDAG() - return dag, RandFillDAG(dn.t, n, dag) +func (dn *DAGNet) RandDAG(n int) (blockservice.BlockService, *Root) { + bServ := dn.CleanDAG() + return bServ, RandFillDAG(dn.t, n, bServ) } func (dn *DAGNet) CleanService() *Service { - dag := dn.CleanDAG() - return NewService(dag, NewLightAvailability(dag)) + bServ := dn.CleanDAG() + return NewService(bServ, NewLightAvailability(bServ)) } -func (dn *DAGNet) CleanDAG() format.DAGService { +func (dn *DAGNet) CleanDAG() blockservice.BlockService { nd, err := dn.net.GenPeer() require.NoError(dn.t, err) @@ -117,7 +116,7 @@ func (dn *DAGNet) CleanDAG() format.DAGService { bstore := blockstore.NewBlockstore(dstore) routing := offline.NewOfflineRouter(dstore, record.NamespacedValidator{}) bs := bitswap.New(dn.ctx, network.NewFromIpfsHost(nd, routing), bstore, bitswap.ProvideEnabled(false)) - return merkledag.NewDAGService(blockservice.New(bstore, bs)) + return blockservice.New(bstore, bs) } func (dn *DAGNet) ConnectAll() {
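Editor's note (illustrative only, not part of the patch): the sketch below shows how the refactored constructors fit together after this change, mirroring node/p2p/ipld.go and node/services/service.go above. It assumes the import paths of this revision of celestia-node and its go-ipfs dependencies; the file and package names are hypothetical. The point is that one blockservice.BlockService value now replaces the former format.DAGService everywhere downstream.

package main

import (
	"github.com/ipfs/go-blockservice"
	"github.com/ipfs/go-datastore"
	ds_sync "github.com/ipfs/go-datastore/sync"
	blockstore "github.com/ipfs/go-ipfs-blockstore"
	offline "github.com/ipfs/go-ipfs-exchange-offline"

	"github.com/celestiaorg/celestia-node/ipld"
	"github.com/celestiaorg/celestia-node/service/share"
)

func main() {
	// In-memory blockstore over a map datastore, as in the tests in this diff.
	store := blockstore.NewBlockstore(ds_sync.MutexWrap(datastore.NewMapDatastore()))

	// Previously: merkledag.NewDAGService(blockservice.New(bs, ex)).
	// Now the BlockService is handed out directly, without the DAGService wrapper.
	bServ := blockservice.New(store, offline.Exchange(store))

	// The same BlockService value feeds both the share service and the retriever;
	// BlockService embeds BlockGetter, so it also satisfies NewLightAvailability.
	_ = share.NewService(bServ, share.NewLightAvailability(bServ))
	_ = ipld.NewRetriever(bServ)
}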
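Editor's note (illustrative only, not part of the patch): a test-style sketch of the new write/read path, assuming the helpers already used in this diff (mdutils.Bserv, ipld.RandShares, plugin.MustCidFromNamespacedSha256) and a hypothetical package/file name. Writes go through AddShares, which batches NMT nodes into the BlockService via NewNmtNodeAdder; reads go through a blockservice session, which satisfies blockservice.BlockGetter and is resolved node-by-node with plugin.GetNode inside GetShare/GetLeaf, the same pattern share.Service.Start and the retriever now use instead of merkledag.NewSession.

package example

import (
	"context"
	"testing"
	"time"

	"github.com/ipfs/go-blockservice"
	mdutils "github.com/ipfs/go-merkledag/test"
	"github.com/stretchr/testify/require"

	"github.com/celestiaorg/celestia-node/ipld"
	"github.com/celestiaorg/celestia-node/ipld/plugin"
)

func TestShareRoundTrip(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	bServ := mdutils.Bserv()

	// Store a 4x4 original square; AddShares erasure-codes it to 8x8
	// and commits the NMT nodes through the BlockService.
	shares := ipld.RandShares(t, 16)
	eds, err := ipld.AddShares(ctx, shares, bServ)
	require.NoError(t, err)

	// A session is a BlockGetter scoped to related fetches.
	ses := blockservice.NewSession(ctx, bServ)
	rowRoot := plugin.MustCidFromNamespacedSha256(eds.RowRoots()[0])
	share, err := ipld.GetShare(ctx, ses, rowRoot, 0, int(eds.Width()))
	require.NoError(t, err)
	require.Equal(t, shares[0], share)
}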