Skip to content

Commit

Permalink
fix(share/byzantine): fix proof collection (#2957)
Browse files Browse the repository at this point in the history
This PR fixes an issue in the proof collection method. Previously, we
were sending `len(dah.RowRoots)/2` requests in order to get proofs but
there is a use case when the process can get stuck for a particular
share(out-of-order case). The new approach sends n requests, where n is
the number of non-empty shares received from rsmt2d library, and then
waits until `len(dah.RowRoots)/2` will be received.
I've also improved error handling as previously we were panicking in
case any error appeared. The new approach will rely only on
`context.DeadlineExceeded ` which basically means that for some reason
we can't get proofs, so the data is not available.

---------

Co-authored-by: ramin <raminkeene@gmail.com>
  • Loading branch information
vgonkivs and ramin authored Dec 18, 2023
1 parent 1d228f0 commit e38a7bf
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 38 deletions.
111 changes: 97 additions & 14 deletions share/eds/byzantine/bad_encoding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,15 @@ package byzantine

import (
"context"
"crypto/sha256"
"hash"
"testing"
"time"

"github.com/ipfs/boxo/blockservice"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
mhcore "github.com/multiformats/go-multihash/core"
"github.com/stretchr/testify/require"
core "github.com/tendermint/tendermint/types"

Expand Down Expand Up @@ -35,10 +41,11 @@ func TestBEFP_Validate(t *testing.T) {
err = square.Repair(dah.RowRoots, dah.ColumnRoots)
require.ErrorAs(t, err, &errRsmt2d)

errByz := NewErrByzantine(ctx, bServ, &dah, errRsmt2d)
byzantine := NewErrByzantine(ctx, bServ, &dah, errRsmt2d)
var errByz *ErrByzantine
require.ErrorAs(t, byzantine, &errByz)

befp := CreateBadEncodingProof([]byte("hash"), 0, errByz)

var test = []struct {
name string
prepareFn func() error
Expand Down Expand Up @@ -69,7 +76,9 @@ func TestBEFP_Validate(t *testing.T) {
Shares: validShares[0:4],
},
)
invalidBefp := CreateBadEncodingProof([]byte("hash"), 0, errInvalidByz)
var errInvalid *ErrByzantine
require.ErrorAs(t, errInvalidByz, &errInvalid)
invalidBefp := CreateBadEncodingProof([]byte("hash"), 0, errInvalid)
return invalidBefp.Validate(&header.ExtendedHeader{DAH: &validDah})
},
expectedResult: func(err error) {
Expand Down Expand Up @@ -198,18 +207,21 @@ func TestIncorrectBadEncodingFraudProof(t *testing.T) {
}

func TestBEFP_ValidateOutOfOrderShares(t *testing.T) {
// skipping it for now because `malicious` package has a small issue: Constructor does not apply
// passed options, so it's not possible to store shares and thus get proofs for them.
// should be ok once app team will fix it.
t.Skip()
eds := edstest.RandEDS(t, 16)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
t.Cleanup(cancel)

size := 4
eds := edstest.RandEDS(t, size)

shares := eds.Flattened()
shares[0], shares[1] = shares[1], shares[0] // corrupting eds
bServ := ipld.NewMemBlockservice()
batchAddr := ipld.NewNmtNodeAdder(context.Background(), bServ, ipld.MaxSizeBatchOption(16*2))
shares[0], shares[4] = shares[4], shares[0] // corrupting eds

bServ := newNamespacedBlockService()
batchAddr := ipld.NewNmtNodeAdder(ctx, bServ, ipld.MaxSizeBatchOption(size*2))

eds, err := rsmt2d.ImportExtendedDataSquare(shares,
share.DefaultRSMT2DCodec(),
malicious.NewConstructor(16, nmt.NodeVisitor(batchAddr.Visit)),
malicious.NewConstructor(uint64(size), nmt.NodeVisitor(batchAddr.Visit)),
)
require.NoError(t, err, "failure to recompute the extended data square")

Expand All @@ -223,9 +235,80 @@ func TestBEFP_ValidateOutOfOrderShares(t *testing.T) {
err = eds.Repair(dah.RowRoots, dah.ColumnRoots)
require.ErrorAs(t, err, &errRsmt2d)

errByz := NewErrByzantine(context.Background(), bServ, &dah, errRsmt2d)
byzantine := NewErrByzantine(ctx, bServ, &dah, errRsmt2d)
var errByz *ErrByzantine
require.ErrorAs(t, byzantine, &errByz)

befp := CreateBadEncodingProof([]byte("hash"), 0, errByz)
err = befp.Validate(&header.ExtendedHeader{DAH: &dah})
require.Error(t, err)
require.NoError(t, err)
}

// namespacedBlockService wraps `BlockService` and extends the verification part
// to avoid returning blocks that has out of order namespaces.
type namespacedBlockService struct {
blockservice.BlockService
// the data structure that is used on the networking level, in order
// to verify the order of the namespaces
prefix *cid.Prefix
}

func newNamespacedBlockService() *namespacedBlockService {
sha256NamespaceFlagged := uint64(0x7701)
// register the nmt hasher to validate the order of namespaces
mhcore.Register(sha256NamespaceFlagged, func() hash.Hash {
nh := nmt.NewNmtHasher(sha256.New(), share.NamespaceSize, true)
nh.Reset()
return nh
})

bs := &namespacedBlockService{}
bs.BlockService = ipld.NewMemBlockservice()

bs.prefix = &cid.Prefix{
Version: 1,
Codec: sha256NamespaceFlagged,
MhType: sha256NamespaceFlagged,
// equals to NmtHasher.Size()
MhLength: sha256.New().Size() + 2*share.NamespaceSize,
}
return bs
}

func (n *namespacedBlockService) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
block, err := n.BlockService.GetBlock(ctx, c)
if err != nil {
return nil, err
}

_, err = n.prefix.Sum(block.RawData())
if err != nil {
return nil, err
}
return block, nil
}

func (n *namespacedBlockService) GetBlocks(ctx context.Context, cids []cid.Cid) <-chan blocks.Block {
blockCh := n.BlockService.GetBlocks(ctx, cids)
resultCh := make(chan blocks.Block)

go func() {
for {
select {
case <-ctx.Done():
close(resultCh)
return
case block, ok := <-blockCh:
if !ok {
close(resultCh)
return
}
if _, err := n.prefix.Sum(block.RawData()); err != nil {
continue
}
resultCh <- block
}
}
}()
return resultCh
}
46 changes: 22 additions & 24 deletions share/eds/byzantine/byzantine.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@ import (
"context"
"fmt"

"github.com/ipfs/boxo/blockservice"
"golang.org/x/sync/errgroup"

"github.com/celestiaorg/celestia-app/pkg/da"
"github.com/celestiaorg/rsmt2d"
"github.com/ipfs/boxo/blockservice"

"github.com/celestiaorg/celestia-node/share/ipld"
)
Expand All @@ -35,49 +33,49 @@ func NewErrByzantine(
bGetter blockservice.BlockGetter,
dah *da.DataAvailabilityHeader,
errByz *rsmt2d.ErrByzantineData,
) *ErrByzantine {
) error {
// changing the order to collect proofs against an orthogonal axis
roots := [][][]byte{
dah.ColumnRoots,
dah.RowRoots,
}[errByz.Axis]

sharesWithProof := make([]*ShareWithProof, len(errByz.Shares))
sharesAmount := 0

errGr, ctx := errgroup.WithContext(ctx)
type result struct {
share *ShareWithProof
index int
}
resultCh := make(chan *result)
for index, share := range errByz.Shares {
// skip further shares if we already requested half of them, which is enough to recompute the row
// or col
if sharesAmount == len(dah.RowRoots)/2 {
break
}

if share == nil {
continue
}
sharesAmount++

index := index
errGr.Go(func() error {
go func() {
share, err := getProofsAt(
ctx, bGetter,
ipld.MustCidFromNamespacedSha256(roots[index]),
int(errByz.Index), len(errByz.Shares),
)
sharesWithProof[index] = share
return err
})
if err != nil {
log.Warn("requesting proof failed", "root", roots[index], "err", err)
return
}
resultCh <- &result{share, index}
}()
}

if err := errGr.Wait(); err != nil {
// Fatal as rsmt2d proved that error is byzantine,
// but we cannot properly collect the proof,
// so verification will fail and thus services won't be stopped
// while we still have to stop them.
// TODO(@Wondertan): Find a better way to handle
log.Fatalw("getting proof for ErrByzantine", "err", err)
for i := 0; i < len(dah.RowRoots)/2; i++ {
select {
case t := <-resultCh:
sharesWithProof[t.index] = t.share
case <-ctx.Done():
return ipld.ErrNodeNotFound
}
}

return &ErrByzantine{
Index: uint32(errByz.Index),
Shares: sharesWithProof,
Expand Down

0 comments on commit e38a7bf

Please sign in to comment.