Skip to content

Commit

Permalink
ipld: move for dag service to block service (#730)
Browse files Browse the repository at this point in the history
* ipld: move for dag service to block service

* chore: rename dag to bServ
  • Loading branch information
vgonkivs authored May 30, 2022
1 parent 67250a1 commit 1c45a66
Show file tree
Hide file tree
Showing 29 changed files with 214 additions and 189 deletions.
40 changes: 20 additions & 20 deletions das/daser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"testing"
"time"

"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
format "github.com/ipfs/go-ipld-format"
mdutils "github.com/ipfs/go-merkledag/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -23,10 +23,10 @@ var timeout = time.Second * 15
// the DASer checkpoint is updated to network head.
func TestDASerLifecycle(t *testing.T) {
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
dag := mdutils.Mock()
bServ := mdutils.Bserv()

// 15 headers from the past and 15 future headers
mockGet, shareServ, sub := createDASerSubcomponents(t, dag, 15, 15)
mockGet, shareServ, sub := createDASerSubcomponents(t, bServ, 15, 15)

ctx, cancel := context.WithTimeout(context.Background(), timeout)
t.Cleanup(cancel)
Expand Down Expand Up @@ -61,10 +61,10 @@ func TestDASerLifecycle(t *testing.T) {

func TestDASer_Restart(t *testing.T) {
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
dag := mdutils.Mock()
bServ := mdutils.Bserv()

// 15 headers from the past and 15 future headers
mockGet, shareServ, sub := createDASerSubcomponents(t, dag, 15, 15)
mockGet, shareServ, sub := createDASerSubcomponents(t, bServ, 15, 15)

ctx, cancel := context.WithTimeout(context.Background(), timeout)
t.Cleanup(cancel)
Expand All @@ -85,10 +85,10 @@ func TestDASer_Restart(t *testing.T) {
require.NoError(t, err)

// reset mockGet, generate 15 "past" headers, building off chain head which is 30
mockGet.generateHeaders(t, dag, 30, 45)
mockGet.generateHeaders(t, bServ, 30, 45)
mockGet.doneCh = make(chan struct{})
// reset dummy subscriber
mockGet.fillSubWithHeaders(t, sub, dag, 45, 60)
mockGet.fillSubWithHeaders(t, sub, bServ, 45, 60)
// manually set mockGet head to trigger stop at 45
mockGet.head = int64(45)

Expand Down Expand Up @@ -124,9 +124,9 @@ func TestDASer_Restart(t *testing.T) {

func TestDASer_catchUp(t *testing.T) {
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
dag := mdutils.Mock()
bServ := mdutils.Bserv()

mockGet, shareServ, _ := createDASerSubcomponents(t, dag, 5, 0)
mockGet, shareServ, _ := createDASerSubcomponents(t, bServ, 5, 0)

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
Expand Down Expand Up @@ -165,9 +165,9 @@ func TestDASer_catchUp(t *testing.T) {
// difference of 1
func TestDASer_catchUp_oneHeader(t *testing.T) {
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
dag := mdutils.Mock()
bServ := mdutils.Bserv()

mockGet, shareServ, _ := createDASerSubcomponents(t, dag, 6, 0)
mockGet, shareServ, _ := createDASerSubcomponents(t, bServ, 6, 0)
daser := NewDASer(shareServ, nil, mockGet, ds)

// store checkpoint
Expand Down Expand Up @@ -209,7 +209,7 @@ func TestDASer_catchUp_oneHeader(t *testing.T) {

func TestDASer_catchUp_fails(t *testing.T) {
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
dag := mdutils.Mock()
dag := mdutils.Bserv()

mockGet, _, _ := createDASerSubcomponents(t, dag, 6, 0)
daser := NewDASer(share.NewBrokenAvailability(), nil, mockGet, ds)
Expand Down Expand Up @@ -256,21 +256,21 @@ func TestDASer_catchUp_fails(t *testing.T) {
// mockGetter, share.Service, and mock header.Subscriber.
func createDASerSubcomponents(
t *testing.T,
dag format.DAGService,
bServ blockservice.BlockService,
numGetter,
numSub int,
) (*mockGetter, *share.Service, *header.DummySubscriber) {
shareServ := share.NewService(dag, share.NewLightAvailability(dag))
shareServ := share.NewService(bServ, share.NewLightAvailability(bServ))

mockGet := &mockGetter{
headers: make(map[int64]*header.ExtendedHeader),
doneCh: make(chan struct{}),
}

mockGet.generateHeaders(t, dag, 0, numGetter)
mockGet.generateHeaders(t, bServ, 0, numGetter)

sub := new(header.DummySubscriber)
mockGet.fillSubWithHeaders(t, sub, dag, numGetter, numGetter+numSub)
mockGet.fillSubWithHeaders(t, sub, bServ, numGetter, numGetter+numSub)

return mockGet, shareServ, sub
}
Expand All @@ -279,15 +279,15 @@ func createDASerSubcomponents(
func (m *mockGetter) fillSubWithHeaders(
t *testing.T,
sub *header.DummySubscriber,
dag format.DAGService,
bServ blockservice.BlockService,
startHeight,
endHeight int,
) {
sub.Headers = make([]*header.ExtendedHeader, endHeight-startHeight)

index := 0
for i := startHeight; i < endHeight; i++ {
dah := share.RandFillDAG(t, 16, dag)
dah := share.RandFillDAG(t, 16, bServ)

randHeader := header.RandExtendedHeader(t)
randHeader.DataHash = dah.Hash()
Expand All @@ -309,9 +309,9 @@ type mockGetter struct {
headers map[int64]*header.ExtendedHeader
}

func (m *mockGetter) generateHeaders(t *testing.T, dag format.DAGService, startHeight, endHeight int) {
func (m *mockGetter) generateHeaders(t *testing.T, bServ blockservice.BlockService, startHeight, endHeight int) {
for i := startHeight; i < endHeight; i++ {
dah := share.RandFillDAG(t, 16, dag)
dah := share.RandFillDAG(t, 16, bServ)

randHeader := header.RandExtendedHeader(t)
randHeader.DataHash = dah.Hash()
Expand Down
6 changes: 3 additions & 3 deletions fraud/bad_encoding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ import (
)

func TestFraudProofValidation(t *testing.T) {
dag := mdutils.Mock()
bServ := mdutils.Bserv()
eds := ipld.RandEDS(t, 2)

shares := ipld.ExtractEDS(eds)
copy(shares[3][8:], shares[4][8:])
eds, err := ipld.ImportShares(context.Background(), shares, dag)
eds, err := ipld.ImportShares(context.Background(), shares, bServ)
require.NoError(t, err)
da := da.NewDataAvailabilityHeader(eds)
r := ipld.NewRetriever(dag)
r := ipld.NewRetriever(bServ)
_, err = r.Retrieve(context.Background(), &da)
var errByz *ipld.ErrByzantine
require.True(t, errors.As(err, &errByz))
Expand Down
8 changes: 4 additions & 4 deletions header/core/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"context"
"fmt"

format "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-blockservice"
logging "github.com/ipfs/go-log/v2"

tmbytes "github.com/tendermint/tendermint/libs/bytes"
Expand All @@ -18,14 +18,14 @@ var log = logging.Logger("header/core")

type Exchange struct {
fetcher *core.BlockFetcher
shareStore format.DAGService
shareStore blockservice.BlockService
construct header.ConstructFn
}

func NewExchange(fetcher *core.BlockFetcher, dag format.DAGService, construct header.ConstructFn) *Exchange {
func NewExchange(fetcher *core.BlockFetcher, bServ blockservice.BlockService, construct header.ConstructFn) *Exchange {
return &Exchange{
fetcher: fetcher,
shareStore: dag,
shareStore: bServ,
construct: construct,
}
}
Expand Down
2 changes: 1 addition & 1 deletion header/core/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestCoreExchange_RequestHeaders(t *testing.T) {
t.Cleanup(cancel)

fetcher := createCoreFetcher(ctx, t)
store := mdutils.Mock()
store := mdutils.Bserv()

// generate 10 blocks
generateBlocks(t, fetcher)
Expand Down
10 changes: 5 additions & 5 deletions header/core/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"fmt"

format "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-blockservice"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/tendermint/tendermint/types"

Expand All @@ -22,21 +22,21 @@ import (
type Listener struct {
bcast header.Broadcaster
fetcher *core.BlockFetcher
dag format.DAGService
bServ blockservice.BlockService
construct header.ConstructFn
cancel context.CancelFunc
}

func NewListener(
bcast header.Broadcaster,
fetcher *core.BlockFetcher,
dag format.DAGService,
bServ blockservice.BlockService,
construct header.ConstructFn,
) *Listener {
return &Listener{
bcast: bcast,
fetcher: fetcher,
dag: dag,
bServ: bServ,
construct: construct,
}
}
Expand Down Expand Up @@ -89,7 +89,7 @@ func (cl *Listener) listen(ctx context.Context, sub <-chan *types.Block) {
return
}

eh, err := cl.construct(ctx, b, comm, vals, cl.dag)
eh, err := cl.construct(ctx, b, comm, vals, cl.bServ)
if err != nil {
log.Errorw("listener: making extended header", "err", err)
return
Expand Down
2 changes: 1 addition & 1 deletion header/core/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,5 +102,5 @@ func createListener(
require.NoError(t, err)
})

return NewListener(p2pSub, fetcher, mdutils.Mock(), header.MakeExtendedHeader)
return NewListener(p2pSub, fetcher, mdutils.Bserv(), header.MakeExtendedHeader)
}
6 changes: 3 additions & 3 deletions header/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (
"context"
"fmt"

"github.com/ipfs/go-blockservice"
logging "github.com/ipfs/go-log/v2"

format "github.com/ipfs/go-ipld-format"
bts "github.com/tendermint/tendermint/libs/bytes"
"github.com/tendermint/tendermint/pkg/da"
core "github.com/tendermint/tendermint/types"
Expand Down Expand Up @@ -43,15 +43,15 @@ func MakeExtendedHeader(
b *core.Block,
comm *core.Commit,
vals *core.ValidatorSet,
dag format.NodeAdder,
bServ blockservice.BlockService,
) (*ExtendedHeader, error) {
var dah DataAvailabilityHeader
if len(b.Txs) > 0 {
namespacedShares, _, err := b.Data.ComputeShares(b.OriginalSquareSize)
if err != nil {
return nil, err
}
extended, err := ipld.AddShares(ctx, namespacedShares.RawShares(), dag)
extended, err := ipld.AddShares(ctx, namespacedShares.RawShares(), bServ)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion header/header_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestMakeExtendedHeaderForEmptyBlock(t *testing.T) {
_, client := core.StartTestClient(ctx, t)
fetcher := core.NewBlockFetcher(client)

store := mdutils.Mock()
store := mdutils.Bserv()

sub, err := fetcher.SubscribeNewBlockEvent(ctx)
require.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions header/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"errors"
"fmt"

format "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-blockservice"
pubsub "github.com/libp2p/go-libp2p-pubsub"
tmbytes "github.com/tendermint/tendermint/libs/bytes"
core "github.com/tendermint/tendermint/types"
Expand All @@ -17,7 +17,7 @@ type ConstructFn = func(
*core.Block,
*core.Commit,
*core.ValidatorSet,
format.NodeAdder,
blockservice.BlockService,
) (*ExtendedHeader, error)

// Validator aliases a func that validates ExtendedHeader.
Expand Down
20 changes: 14 additions & 6 deletions ipld/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"math"

"github.com/ipfs/go-blockservice"
ipld "github.com/ipfs/go-ipld-format"

"github.com/celestiaorg/nmt"
Expand All @@ -13,15 +14,19 @@ import (
"github.com/tendermint/tendermint/pkg/wrapper"
)

// AddShares erasures and extends shares to IPLD DAG using the provided ipld.NodeAdder.
func AddShares(ctx context.Context, shares []Share, adder ipld.NodeAdder) (*rsmt2d.ExtendedDataSquare, error) {
// AddShares erasures and extends shares to blockservice.BlockService using the provided ipld.NodeAdder.
func AddShares(
ctx context.Context,
shares []Share,
adder blockservice.BlockService,
) (*rsmt2d.ExtendedDataSquare, error) {
if len(shares) == 0 {
return nil, fmt.Errorf("empty data") // empty block is not an empty Data
}
squareSize := int(math.Sqrt(float64(len(shares))))
// create nmt adder wrapping batch adder with calculated size
bs := batchSize(squareSize * 2)
batchAdder := NewNmtNodeAdder(ctx, ipld.NewBatch(ctx, adder, ipld.MaxSizeBatchOption(bs)))
batchAdder := NewNmtNodeAdder(ctx, adder, ipld.MaxSizeBatchOption(bs))
// create the nmt wrapper to generate row and col commitments
tree := wrapper.NewErasuredNamespacedMerkleTree(uint64(squareSize), nmt.NodeVisitor(batchAdder.Visit))
// recompute the eds
Expand All @@ -35,15 +40,18 @@ func AddShares(ctx context.Context, shares []Share, adder ipld.NodeAdder) (*rsmt
return eds, batchAdder.Commit()
}

// ImportShares imports flattend chunks of data into Extended Data square and saves it in IPLD DAG
func ImportShares(ctx context.Context, shares [][]byte, na ipld.NodeAdder) (*rsmt2d.ExtendedDataSquare, error) {
// ImportShares imports flattend chunks of data into Extended Data square and saves it in blockservice.BlockService
func ImportShares(
ctx context.Context,
shares [][]byte,
adder blockservice.BlockService) (*rsmt2d.ExtendedDataSquare, error) {
if len(shares) == 0 {
return nil, fmt.Errorf("ipld: importing empty data")
}
squareSize := int(math.Sqrt(float64(len(shares))))
// create nmt adder wrapping batch adder with calculated size
bs := batchSize(squareSize * 2)
batchAdder := NewNmtNodeAdder(ctx, ipld.NewBatch(ctx, na, ipld.MaxSizeBatchOption(bs)))
batchAdder := NewNmtNodeAdder(ctx, adder, ipld.MaxSizeBatchOption(bs))
// create the nmt wrapper to generate row and col commitments
tree := wrapper.NewErasuredNamespacedMerkleTree(uint64(squareSize/2), nmt.NodeVisitor(batchAdder.Visit))
// recompute the eds
Expand Down
Loading

0 comments on commit 1c45a66

Please sign in to comment.