Skip to content

Commit

Permalink
fix(blob/service): fix handling of the padding shares (#3404)
Browse files Browse the repository at this point in the history
Fixes the issue with the missing blob. Initially, there was an incorrect assumption that only one padding share is possible between two blobs. So, the Blob service was skipping only one share instead of multiple. 

* Fixed the parsing logic so that multiple consecutive padding shares are skipped;
* Added a test case that retrieves the EDS and finds the correct number of blobs;
  • Loading branch information
vgonkivs authored and Wondertan committed May 20, 2024
1 parent 081da17 commit 72111f4
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 44 deletions.
26 changes: 14 additions & 12 deletions blob/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,19 +117,21 @@ func (p *parser) skipPadding(shares []shares.Share) ([]shares.Share, error) {
return nil, errEmptyShares
}

isPadding, err := shares[0].IsPadding()
if err != nil {
return nil, err
}

if !isPadding {
return shares, nil
offset := 0
for _, sh := range shares {
isPadding, err := sh.IsPadding()
if err != nil {
return nil, err
}
if !isPadding {
break
}
offset++
}

// update blob index if we are going to skip one share
p.index++
if len(shares) > 1 {
return shares[1:], nil
// set start index
p.index = offset
if len(shares) > offset {
return shares[offset:], nil
}
return nil, nil
}
Expand Down
2 changes: 2 additions & 0 deletions blob/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,8 @@ func (s *Service) retrieve(
shrs, err = sharesParser.set(rowIndex*len(header.DAH.RowRoots)+index, appShares)
if err != nil {
if errors.Is(err, errEmptyShares) {
// reset parser as `skipPadding` can update next blob's index
sharesParser.reset()
appShares = nil
break
}
Expand Down
141 changes: 109 additions & 32 deletions blob/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,15 @@ import (
"testing"
"time"

"github.com/celestiaorg/celestia-app/pkg/appconsts"
"github.com/celestiaorg/celestia-app/pkg/shares"
"github.com/celestiaorg/go-header/store"
ds "github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
tmrand "github.com/tendermint/tendermint/libs/rand"

"github.com/celestiaorg/celestia-app/pkg/appconsts"
"github.com/celestiaorg/celestia-app/pkg/shares"
"github.com/celestiaorg/go-header/store"

"github.com/celestiaorg/celestia-node/blob/blobtest"
"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/header/headertest"
Expand Down Expand Up @@ -425,64 +424,68 @@ func TestService_Get(t *testing.T) {
}
}

// TestService_GetAllWithoutPadding retrieves all blobs under the given namespace.
// The number of blobs is known and equal to 5. It then ensures that each blob has the correct index
// inside the EDS by requesting the corresponding shares and comparing them.
func TestService_GetAllWithoutPadding(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
t.Cleanup(cancel)

appBlob, err := blobtest.GenerateV0Blobs([]int{9, 5}, true)
appBlob, err := blobtest.GenerateV0Blobs([]int{9, 5, 15, 4, 24}, true)
require.NoError(t, err)
blobs, err := convertBlobs(appBlob...)
require.NoError(t, err)

ns1, ns2 := blobs[0].Namespace().ToAppNamespace(), blobs[1].Namespace().ToAppNamespace()
var (
ns = blobs[0].Namespace().ToAppNamespace()
rawShares = make([][]byte, 0)
)

padding0, err := shares.NamespacePaddingShare(ns1, appconsts.ShareVersionZero)
require.NoError(t, err)
padding1, err := shares.NamespacePaddingShare(ns2, appconsts.ShareVersionZero)
padding, err := shares.NamespacePaddingShare(ns, appconsts.ShareVersionZero)
require.NoError(t, err)
rawShares0, err := BlobsToShares(blobs[0])

for i := 0; i < 2; i++ {
sh, err := BlobsToShares(blobs[i])
require.NoError(t, err)
rawShares = append(rawShares, append(sh, padding.ToBytes())...)
}

sh, err := BlobsToShares(blobs[2])
require.NoError(t, err)
rawShares1, err := BlobsToShares(blobs[1])
rawShares = append(rawShares, append(sh, padding.ToBytes(), padding.ToBytes())...)

sh, err = BlobsToShares(blobs[3])
require.NoError(t, err)
rawShares := make([][]byte, 0)
rawShares = append(rawShares, append(sh, padding.ToBytes(), padding.ToBytes(), padding.ToBytes())...)

// create shares in correct order with padding shares
if bytes.Compare(blobs[0].Namespace(), blobs[1].Namespace()) <= 0 {
rawShares = append(rawShares, append(rawShares0, padding0.ToBytes())...)
rawShares = append(rawShares, append(rawShares1, padding1.ToBytes())...)
} else {
rawShares = append(rawShares, append(rawShares1, padding1.ToBytes())...)
rawShares = append(rawShares, append(rawShares0, padding0.ToBytes())...)
}
sh, err = BlobsToShares(blobs[4])
require.NoError(t, err)
rawShares = append(rawShares, sh...)

bs := ipld.NewMemBlockservice()
batching := ds_sync.MutexWrap(ds.NewMapDatastore())
headerStore, err := store.NewStore[*header.ExtendedHeader](batching)
require.NoError(t, err)
eds, err := ipld.AddShares(ctx, rawShares, bs)
require.NoError(t, err)

h := headertest.ExtendedHeaderFromEDS(t, 1, eds)
err = headerStore.Init(ctx, h)
require.NoError(t, err)

fn := func(ctx context.Context, height uint64) (*header.ExtendedHeader, error) {
return headerStore.GetByHeight(ctx, height)
return h, nil
}

service := NewService(nil, getters.NewIPLDGetter(bs), fn)

blobs, err = service.GetAll(ctx, 1, []share.Namespace{blobs[0].Namespace(), blobs[1].Namespace()})
newBlobs, err := service.GetAll(ctx, 1, []share.Namespace{blobs[0].Namespace()})
require.NoError(t, err)
assert.Equal(t, len(newBlobs), len(blobs))

resultShares, err := BlobsToShares(blobs...)
resultShares, err := BlobsToShares(newBlobs...)
require.NoError(t, err)
sort.Slice(blobs, func(i, j int) bool {
val := bytes.Compare(blobs[i].NamespaceId, blobs[j].NamespaceId)
return val < 0
})

shareOffset := 0
for _, blob := range blobs {
for i, blob := range newBlobs {
require.True(t, blobs[i].compareCommitments(blob.Commitment))

row, col := calculateIndex(len(h.DAH.RowRoots), blob.index)
sh, err := service.shareGetter.GetShare(ctx, h, row, col)
require.NoError(t, err)
Expand All @@ -492,6 +495,80 @@ func TestService_GetAllWithoutPadding(t *testing.T) {
}
}

// TestAllPaddingSharesInEDS builds an EDS that contains only namespace padding
// shares under the requested namespace and asserts that GetAll fails: padding
// shares carry no blob data, so no blob can be retrieved from them.
func TestAllPaddingSharesInEDS(t *testing.T) {
	nid, err := share.NewBlobNamespaceV0(tmrand.Bytes(7))
	require.NoError(t, err)
	padding, err := shares.NamespacePaddingShare(nid.ToAppNamespace(), appconsts.ShareVersionZero)
	require.NoError(t, err)

	// Fill a full 4x4 original square with padding shares only.
	rawShares := make([]share.Share, 16)
	for i := 0; i < 16; i++ {
		rawShares[i] = padding.ToBytes()
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	t.Cleanup(cancel)

	bs := ipld.NewMemBlockservice()
	eds, err := ipld.AddShares(ctx, rawShares, bs)
	require.NoError(t, err)

	h := headertest.ExtendedHeaderFromEDS(t, 1, eds)

	fn := func(ctx context.Context, height uint64) (*header.ExtendedHeader, error) {
		return h, nil
	}

	service := NewService(nil, getters.NewIPLDGetter(bs), fn)
	// The namespace exists in the EDS but holds no blobs, so retrieval must error out.
	_, err = service.GetAll(ctx, 1, []share.Namespace{nid})
	require.Error(t, err)
}

// TestSkipPaddingsAndRetrieveBlob places a long run of namespace padding shares
// (58 of them) in front of a single real blob under the same namespace and
// asserts that GetAll skips all of the padding and still finds the blob. This
// is a regression test for the bug where only one padding share was skipped.
func TestSkipPaddingsAndRetrieveBlob(t *testing.T) {
	nid, err := share.NewBlobNamespaceV0(tmrand.Bytes(7))
	require.NoError(t, err)
	padding, err := shares.NamespacePaddingShare(nid.ToAppNamespace(), appconsts.ShareVersionZero)
	require.NoError(t, err)

	// Prepend 58 padding shares so the blob sits deep behind the padding run.
	rawShares := make([]share.Share, 0, 64)
	for i := 0; i < 58; i++ {
		rawShares = append(rawShares, padding.ToBytes())
	}

	appBlob, err := blobtest.GenerateV0Blobs([]int{6}, true)
	require.NoError(t, err)
	// Force the generated blob into the same namespace as the padding shares.
	appBlob[0].NamespaceVersion = nid[0]
	appBlob[0].NamespaceID = nid[1:]

	blobs, err := convertBlobs(appBlob...)
	require.NoError(t, err)
	sh, err := BlobsToShares(blobs[0])
	require.NoError(t, err)

	rawShares = append(rawShares, sh...)

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	t.Cleanup(cancel)

	bs := ipld.NewMemBlockservice()
	eds, err := ipld.AddShares(ctx, rawShares, bs)
	require.NoError(t, err)

	h := headertest.ExtendedHeaderFromEDS(t, 1, eds)

	fn := func(ctx context.Context, height uint64) (*header.ExtendedHeader, error) {
		return h, nil
	}

	service := NewService(nil, getters.NewIPLDGetter(bs), fn)
	newBlobs, err := service.GetAll(ctx, 1, []share.Namespace{nid})
	require.NoError(t, err)
	require.Len(t, newBlobs, 1)
	// The retrieved blob must be the one we stored behind the padding run.
	require.True(t, newBlobs[0].compareCommitments(blobs[0].Commitment))
}

// BenchmarkGetByCommitment-12 1869 571663 ns/op 1085371 B/op 6414 allocs/op
func BenchmarkGetByCommitment(b *testing.B) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
Expand Down

0 comments on commit 72111f4

Please sign in to comment.