feat(share/availability): persist random samples selection in availability call (celestiaorg#3239)
This PR introduces persistence for the sample selection used in random
sampling. It stores all failed samples in the datastore so they can be
reloaded on the next sampling attempt. As a result, if the availability
call fails fully or partially, the retry samples the same preselected
random share coordinates instead of drawing a new selection.

The solution is backward compatible with the empty byte slice previously
stored on sampling success, so the change is non-breaking for existing
storage.
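Concretely, the persisted record is compact: each failed sample occupies four bytes, row then column as little-endian uint16 values, and an empty record marks a fully successful session. The following self-contained round-trip sketch mirrors the encodeSamples/decodeSamples helpers added in sample.go further down; the package layout and main function are illustrative only.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// Sample mirrors the Sample type from sample.go in this diff.
type Sample struct {
	Row, Col uint16
}

// encode mirrors encodeSamples: four bytes per sample, little endian.
func encode(samples []Sample) []byte {
	bs := make([]byte, 0, len(samples)*4)
	for _, s := range samples {
		bs = binary.LittleEndian.AppendUint16(bs, s.Row)
		bs = binary.LittleEndian.AppendUint16(bs, s.Col)
	}
	return bs
}

// decode mirrors decodeSamples. An empty input decodes to zero samples,
// which is how the pre-existing []byte{} success marker stays readable.
func decode(bs []byte) ([]Sample, error) {
	if len(bs)%4 != 0 {
		return nil, errors.New("invalid byte slice length")
	}
	samples := make([]Sample, 0, len(bs)/4)
	for i := 0; i < len(bs); i += 4 {
		samples = append(samples, Sample{
			Row: binary.LittleEndian.Uint16(bs[i : i+2]),
			Col: binary.LittleEndian.Uint16(bs[i+2 : i+4]),
		})
	}
	return samples, nil
}

func main() {
	failed := []Sample{{Row: 3, Col: 7}, {Row: 12, Col: 0}}
	bs := encode(failed)
	back, _ := decode(bs)
	fmt.Println(len(bs), back) // 8 [{3 7} {12 0}]

	// Backward compatibility: the old success marker decodes to no failures.
	old, _ := decode([]byte{})
	fmt.Println(len(old)) // 0
}
```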


Additionally, this PR includes basic refactoring that simplifies the
concurrency logic in availability. It also aligns the errors returned by
the call with the interface declaration in
[availability.go](https://github.com/celestiaorg/celestia-node/blob/main/share/availability.go),
improving code consistency and maintainability.
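For callers, the aligned errors mean share.ErrNotAvailable signals a failed (and now persisted) sampling session, while cancellation surfaces as the context's own error. The retry helper below is a hypothetical sketch of that contract, not code from this PR; it assumes the share.Availability interface shape visible in the diff below.

```go
package example

import (
	"context"
	"errors"

	"github.com/celestiaorg/celestia-node/header"
	"github.com/celestiaorg/celestia-node/share"
)

// retrySharesAvailable is a hypothetical caller, not part of this PR. It
// shows the error contract after this change: share.ErrNotAvailable means
// the failed samples were persisted, so a plain retry resumes the very
// same coordinates instead of drawing a fresh random selection.
func retrySharesAvailable(
	ctx context.Context,
	avail share.Availability,
	eh *header.ExtendedHeader,
	attempts int,
) error {
	var err error
	for i := 0; i < attempts; i++ {
		err = avail.SharesAvailable(ctx, eh)
		switch {
		case err == nil:
			return nil // success; an empty record is stored for this root
		case errors.Is(err, context.Canceled):
			return err // cancellation is propagated as-is, not converted
		case errors.Is(err, share.ErrNotAvailable):
			continue // retry reuses the persisted sample coordinates
		default:
			return err // datastore or validation errors surface unchanged
		}
	}
	return err
}
```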

Resolves celestiaorg#2780
walldiss authored and renaynay committed Apr 3, 2024
1 parent 8c35cc6 commit 7fb8fdc
Showing 5 changed files with 182 additions and 82 deletions.
97 changes: 54 additions & 43 deletions share/availability/light/availability.go
@@ -8,7 +8,6 @@ import (
 	"github.com/ipfs/go-datastore"
 	"github.com/ipfs/go-datastore/autobatch"
 	"github.com/ipfs/go-datastore/namespace"
-	ipldFormat "github.com/ipfs/go-ipld-format"
 	logging "github.com/ipfs/go-log/v2"
 
 	"github.com/celestiaorg/celestia-node/header"
@@ -67,76 +66,88 @@ func (la *ShareAvailability) SharesAvailable(ctx context.Context, header *header.ExtendedHeader) error {
 		return nil
 	}
 
-	// do not sample over Root that has already been sampled
+	// load snapshot of the last sampling errors from disk
 	key := rootKey(dah)
 
 	la.dsLk.RLock()
-	exists, err := la.ds.Has(ctx, key)
+	last, err := la.ds.Get(ctx, key)
 	la.dsLk.RUnlock()
-	if err != nil || exists {
+
+	// Check for error cases
+	var samples []Sample
+	switch {
+	case err == nil && len(last) == 0:
+		// Availability has already been validated
+		return nil
+	case err != nil && !errors.Is(err, datastore.ErrNotFound):
+		// Other error occurred
 		return err
+	case errors.Is(err, datastore.ErrNotFound):
+		// No sampling result found, select new samples
+		samples, err = SampleSquare(len(dah.RowRoots), int(la.params.SampleAmount))
+		if err != nil {
+			return err
+		}
+	default:
+		// Sampling result found, unmarshal it
+		samples, err = decodeSamples(last)
+		if err != nil {
+			return err
+		}
 	}
 
-	log.Debugw("validate availability", "root", dah.String())
 	// We assume the caller of this method has already performed basic validation on the
 	// given dah/root. If for some reason this has not happened, the node should panic.
 	if err := dah.ValidateBasic(); err != nil {
-		log.Errorw("availability validation cannot be performed on a malformed DataAvailabilityHeader",
-			"err", err)
-		panic(err)
-	}
-	samples, err := SampleSquare(len(dah.RowRoots), int(la.params.SampleAmount))
-	if err != nil {
+		log.Errorw("DAH validation failed", "error", err)
 		return err
 	}
 
 	// indicate to the share.Getter that a blockservice session should be created. This
 	// functionality is optional and must be supported by the used share.Getter.
 	ctx = getters.WithSession(ctx)
 
+	var (
+		failedSamplesLock sync.Mutex
+		failedSamples     []Sample
+	)
+
 	log.Debugw("starting sampling session", "root", dah.String())
-	errs := make(chan error, len(samples))
+	var wg sync.WaitGroup
 	for _, s := range samples {
+		wg.Add(1)
 		go func(s Sample) {
-			log.Debugw("fetching share", "root", dah.String(), "row", s.Row, "col", s.Col)
-			_, err := la.getter.GetShare(ctx, header, s.Row, s.Col)
+			defer wg.Done()
+			// check if the sample is available
+			_, err := la.getter.GetShare(ctx, header, int(s.Row), int(s.Col))
 			if err != nil {
 				log.Debugw("error fetching share", "root", dah.String(), "row", s.Row, "col", s.Col)
-			}
-			// we don't really care about Share bodies at this point
-			// it also means we now saved the Share in local storage
-			select {
-			case errs <- err:
-			case <-ctx.Done():
+				failedSamplesLock.Lock()
+				failedSamples = append(failedSamples, s)
+				failedSamplesLock.Unlock()
 			}
 		}(s)
 	}
+	wg.Wait()
 
-	for range samples {
-		var err error
-		select {
-		case err = <-errs:
-		case <-ctx.Done():
-			err = ctx.Err()
-		}
-
-		if err != nil {
-			if errors.Is(err, context.Canceled) {
-				return err
-			}
-			log.Errorw("availability validation failed", "root", dah.String(), "err", err.Error())
-			if ipldFormat.IsNotFound(err) || errors.Is(err, context.DeadlineExceeded) {
-				return share.ErrNotAvailable
-			}
-			return err
-		}
+	if errors.Is(ctx.Err(), context.Canceled) {
+		// Availability did not complete due to context cancellation, return context error instead of share.ErrNotAvailable
+		return ctx.Err()
 	}
 
+	// store the result of the sampling session
+	bs := encodeSamples(failedSamples)
 	la.dsLk.Lock()
-	err = la.ds.Put(ctx, key, []byte{})
+	err = la.ds.Put(ctx, key, bs)
 	la.dsLk.Unlock()
 	if err != nil {
-		log.Errorw("storing root of successful SharesAvailable request to disk", "err", err)
+		log.Errorw("Failed to store sampling result", "error", err)
 	}
 
+	// if any of the samples failed, return an error
+	if len(failedSamples) > 0 {
+		log.Errorw("availability validation failed",
+			"root", dah.String(),
+			"failed_samples", failedSamples,
+		)
+		return share.ErrNotAvailable
+	}
 	return nil
 }
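The refactor above replaces the buffered errs channel and its receive loop with a sync.WaitGroup plus a mutex-guarded slice of failed samples. A stripped-down, dependency-free sketch of the same fan-out/collect pattern:

```go
package main

import (
	"fmt"
	"sync"
)

// fanOutCollect runs one goroutine per job and collects only the failed
// jobs, mirroring the WaitGroup+mutex pattern used in availability.go.
func fanOutCollect(jobs []int, do func(int) error) []int {
	var (
		mu     sync.Mutex
		failed []int
		wg     sync.WaitGroup
	)
	for _, j := range jobs {
		wg.Add(1)
		go func(j int) {
			defer wg.Done()
			if err := do(j); err != nil {
				mu.Lock()
				failed = append(failed, j)
				mu.Unlock()
			}
		}(j)
	}
	wg.Wait() // every goroutine finishes before failures are read
	return failed
}

func main() {
	failed := fanOutCollect([]int{1, 2, 3, 4}, func(j int) error {
		if j%2 == 0 {
			return fmt.Errorf("job %d failed", j)
		}
		return nil
	})
	fmt.Println(failed) // e.g. [2 4]; order depends on goroutine scheduling
}
```

Collecting failures under a mutex instead of a channel removes the need to size a buffer to len(samples) and leaves the set of failed samples directly at hand for persistence.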
82 changes: 48 additions & 34 deletions share/availability/light/availability_test.go
@@ -6,7 +6,6 @@ import (
 	"strconv"
 	"testing"
 
-	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
 	"github.com/celestiaorg/celestia-node/header/headertest"
@@ -26,16 +25,18 @@ func TestSharesAvailableCaches(t *testing.T) {
 
 	// cache doesn't have dah yet
 	has, err := avail.ds.Has(ctx, rootKey(dah))
-	assert.NoError(t, err)
-	assert.False(t, has)
+	require.NoError(t, err)
+	require.False(t, has)
 
 	err = avail.SharesAvailable(ctx, eh)
-	assert.NoError(t, err)
+	require.NoError(t, err)
 
-	// is now cached
-	has, err = avail.ds.Has(ctx, rootKey(dah))
-	assert.NoError(t, err)
-	assert.True(t, has)
+	// is now stored success result
+	result, err := avail.ds.Get(ctx, rootKey(dah))
+	require.NoError(t, err)
+	failed, err := decodeSamples(result)
+	require.NoError(t, err)
+	require.Empty(t, failed)
 }
 
 func TestSharesAvailableHitsCache(t *testing.T) {
@@ -45,19 +46,16 @@ func TestSharesAvailableHitsCache(t *testing.T) {
 	getter, _ := GetterWithRandSquare(t, 16)
 	avail := TestAvailability(getter)
 
 	// create new dah, that is not available by getter
 	bServ := ipld.NewMemBlockservice()
 	dah := availability_test.RandFillBS(t, 16, bServ)
 	eh := headertest.RandExtendedHeaderWithRoot(t, dah)
 
 	// blockstore doesn't actually have the dah
 	err := avail.SharesAvailable(ctx, eh)
-	require.Error(t, err)
-
-	// cache doesn't have dah yet, since it errored
-	has, err := avail.ds.Has(ctx, rootKey(dah))
-	assert.NoError(t, err)
-	assert.False(t, has)
+	require.ErrorIs(t, err, share.ErrNotAvailable)
+
+	// put success result in cache
 	err = avail.ds.Put(ctx, rootKey(dah), []byte{})
 	require.NoError(t, err)
 
@@ -75,31 +73,47 @@ func TestSharesAvailableEmptyRoot(t *testing.T) {
 
 	eh := headertest.RandExtendedHeaderWithRoot(t, share.EmptyRoot())
 	err := avail.SharesAvailable(ctx, eh)
-	assert.NoError(t, err)
+	require.NoError(t, err)
 }
 
-func TestSharesAvailable(t *testing.T) {
+func TestSharesAvailableFailed(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	getter, dah := GetterWithRandSquare(t, 16)
+	getter, _ := GetterWithRandSquare(t, 16)
 	avail := TestAvailability(getter)
-	err := avail.SharesAvailable(ctx, dah)
-	assert.NoError(t, err)
-}
-
-func TestSharesAvailableFailed(t *testing.T) {
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
 
 	// create new dah, that is not available by getter
 	bServ := ipld.NewMemBlockservice()
 	dah := availability_test.RandFillBS(t, 16, bServ)
 	eh := headertest.RandExtendedHeaderWithRoot(t, dah)
 
-	getter, _ := GetterWithRandSquare(t, 16)
-	avail := TestAvailability(getter)
+	// blockstore doesn't actually have the dah, so it should fail
 	err := avail.SharesAvailable(ctx, eh)
-	assert.Error(t, err)
+	require.ErrorIs(t, err, share.ErrNotAvailable)
+
+	// cache should have failed results now
+	result, err := avail.ds.Get(ctx, rootKey(dah))
+	require.NoError(t, err)
+
+	failed, err := decodeSamples(result)
+	require.NoError(t, err)
+	require.Len(t, failed, int(avail.params.SampleAmount))
+
+	// ensure that retry persists the failed samples selection
+	// create new getter with only the failed samples available, and add them to the onceGetter
+	onceGetter := newOnceGetter()
+	onceGetter.AddSamples(failed)
+
+	// replace getter with the new one
+	avail.getter = onceGetter
+
+	// should be able to retrieve all the failed samples now
+	err = avail.SharesAvailable(ctx, eh)
+	require.NoError(t, err)
+
+	// onceGetter should have no more samples stored after the call
+	require.Empty(t, onceGetter.available)
 }
 
@@ -114,7 +128,7 @@ func TestShareAvailableOverMocknet_Light(t *testing.T) {
 	net.ConnectAll()
 
 	err := nd.SharesAvailable(ctx, eh)
-	assert.NoError(t, err)
+	require.NoError(t, err)
 }
 
 func TestGetShare(t *testing.T) {
@@ -127,8 +141,8 @@ func TestGetShare(t *testing.T) {
 	for i := range make([]bool, n) {
 		for j := range make([]bool, n) {
 			sh, err := getter.GetShare(ctx, eh, i, j)
-			assert.NotNil(t, sh)
-			assert.NoError(t, err)
+			require.NotNil(t, sh)
+			require.NoError(t, err)
 		}
 	}
 }
@@ -163,14 +177,14 @@ func TestService_GetSharesByNamespace(t *testing.T) {
 			require.NoError(t, err)
 			require.NoError(t, shares.Verify(root, randNamespace))
 			flattened := shares.Flatten()
-			assert.Len(t, flattened, tt.expectedShareCount)
+			require.Len(t, flattened, tt.expectedShareCount)
 			for _, value := range flattened {
-				assert.Equal(t, randNamespace, share.GetNamespace(value))
+				require.Equal(t, randNamespace, share.GetNamespace(value))
 			}
 			if tt.expectedShareCount > 1 {
 				// idx1 is always smaller than idx2
-				assert.Equal(t, randShares[idx1], flattened[0])
-				assert.Equal(t, randShares[idx2], flattened[1])
+				require.Equal(t, randShares[idx1], flattened[0])
+				require.Equal(t, randShares[idx2], flattened[1])
 			}
 		})
 		t.Run("last two rows of a 4x4 square that have the same namespace have valid NMT proofs", func(t *testing.T) {
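TestSharesAvailableFailed uses a onceGetter test helper whose definition lives in one of the changed files that is not rendered on this page (five files changed, four shown). The sketch below is inferred solely from how the test uses the helper; names, field types, and behavior are assumptions, and the actual implementation in the PR may differ.

```go
// onceGetter is a sketch of the test double inferred from the test above:
// it serves each pre-added sample exactly once. Only the parts the test
// touches (newOnceGetter, AddSamples, GetShare, available) are sketched;
// the real helper would live in this package alongside Sample.
type onceGetter struct {
	mu        sync.Mutex
	available map[Sample]struct{}
}

func newOnceGetter() *onceGetter {
	return &onceGetter{available: make(map[Sample]struct{})}
}

// AddSamples marks the given coordinates as retrievable exactly once.
func (g *onceGetter) AddSamples(samples []Sample) {
	g.mu.Lock()
	defer g.mu.Unlock()
	for _, s := range samples {
		g.available[s] = struct{}{}
	}
}

// GetShare succeeds for a known coordinate and then forgets it, so a
// repeated request for the same sample would fail.
func (g *onceGetter) GetShare(_ context.Context, _ *header.ExtendedHeader, row, col int) (share.Share, error) {
	g.mu.Lock()
	defer g.mu.Unlock()
	s := Sample{Row: uint16(row), Col: uint16(col)}
	if _, ok := g.available[s]; !ok {
		return nil, share.ErrNotAvailable
	}
	delete(g.available, s)
	return nil, nil
}
```

In the real package, the helper would also need to satisfy the rest of the share.Getter interface for the avail.getter assignment to type-check.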
34 changes: 31 additions & 3 deletions share/availability/light/sample.go
@@ -3,12 +3,14 @@ package light
 
 import (
 	crand "crypto/rand"
+	"encoding/binary"
+	"errors"
 	"math/big"
 )
 
 // Sample is a point in 2D space over square.
 type Sample struct {
-	Row, Col int
+	Row, Col uint16
 }
 
 // SampleSquare randomly picks *num* unique points from the given *width* square
@@ -66,11 +68,37 @@ func (ss *squareSampler) samples() []Sample {
 	return samples
 }
 
-func randInt(max int) int {
+func randInt(max int) uint16 {
 	n, err := crand.Int(crand.Reader, big.NewInt(int64(max)))
 	if err != nil {
 		panic(err) // won't panic as rand.Reader is endless
 	}
 
-	return int(n.Int64())
+	return uint16(n.Uint64())
 }
+
+// encodeSamples encodes a slice of samples into a byte slice using little endian encoding.
+func encodeSamples(samples []Sample) []byte {
+	bs := make([]byte, 0, len(samples)*4)
+	for _, s := range samples {
+		bs = binary.LittleEndian.AppendUint16(bs, s.Row)
+		bs = binary.LittleEndian.AppendUint16(bs, s.Col)
+	}
+	return bs
+}
+
+// decodeSamples decodes a byte slice into a slice of samples.
+func decodeSamples(bs []byte) ([]Sample, error) {
+	if len(bs)%4 != 0 {
+		return nil, errors.New("invalid byte slice length")
+	}
+
+	samples := make([]Sample, 0, len(bs)/4)
+	for i := 0; i < len(bs); i += 4 {
+		samples = append(samples, Sample{
+			Row: binary.LittleEndian.Uint16(bs[i : i+2]),
+			Col: binary.LittleEndian.Uint16(bs[i+2 : i+4]),
+		})
+	}
+	return samples, nil
+}
4 changes: 2 additions & 2 deletions share/availability/light/sample_test.go
@@ -21,8 +21,8 @@ func TestSampleSquare(t *testing.T) {
 			assert.Len(t, ss, tt.samples)
 			// check points are within width
 			for _, s := range ss {
-				assert.Less(t, s.Row, tt.width)
-				assert.Less(t, s.Col, tt.width)
+				assert.Less(t, int(s.Row), tt.width)
+				assert.Less(t, int(s.Col), tt.width)
 			}
 			// checks samples are not equal
 			for i, s1 := range ss {