feat(core): Bridge node short-circuits storing a historical EDS if pruning is enabled #3283

Merged · 9 commits · Apr 10, 2024
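
The gating logic added in this PR hinges on `pruner.IsWithinAvailabilityWindow`, whose implementation is outside this diff. Below is a minimal sketch of the predicate the new code relies on, assuming `AvailabilityWindow` is a `time.Duration`-based type (the tests suggest as much by constructing one from `time.Nanosecond`):

```go
package pruner

import "time"

// AvailabilityWindow bounds how old a block may be for its data to still be
// expected from the network. Assumed here to be duration-based; sketch only.
type AvailabilityWindow time.Duration

// IsWithinAvailabilityWindow reports whether a block produced at t is recent
// enough to fall inside the window. Blocks outside it are "historic" and,
// with pruning enabled, their EDSs are not stored.
func IsWithinAvailabilityWindow(t time.Time, window AvailabilityWindow) bool {
	return time.Since(t) <= time.Duration(window)
}
```
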
52 changes: 40 additions & 12 deletions core/exchange.go
```diff
@@ -10,8 +10,10 @@ import (
 
 	libhead "github.com/celestiaorg/go-header"
 	"github.com/celestiaorg/nmt"
+	"github.com/celestiaorg/rsmt2d"
 
 	"github.com/celestiaorg/celestia-node/header"
+	"github.com/celestiaorg/celestia-node/pruner"
 	"github.com/celestiaorg/celestia-node/share/eds"
 	"github.com/celestiaorg/celestia-node/share/ipld"
 )
@@ -23,6 +25,8 @@ type Exchange struct {
 	store     *eds.Store
 	construct header.ConstructFn
 
+	availabilityWindow pruner.AvailabilityWindow
+
 	metrics *exchangeMetrics
 }
```

```diff
@@ -32,9 +36,9 @@ func NewExchange(
 	construct header.ConstructFn,
 	opts ...Option,
 ) (*Exchange, error) {
-	p := new(params)
+	p := defaultParams()
 	for _, opt := range opts {
-		opt(p)
+		opt(&p)
 	}
 
 	var (
@@ -49,10 +53,11 @@
 	}
 
 	return &Exchange{
-		fetcher:   fetcher,
-		store:     store,
-		construct: construct,
-		metrics:   metrics,
+		fetcher:            fetcher,
+		store:              store,
+		construct:          construct,
+		availabilityWindow: p.availabilityWindow,
+		metrics:            metrics,
 	}, nil
 }
```

```diff
@@ -150,11 +155,11 @@ func (ce *Exchange) Get(ctx context.Context, hash libhead.Hash) (*header.ExtendedHeader, error) {
 			&block.Height, hash, eh.Hash())
 	}
 
-	ctx = ipld.CtxWithProofsAdder(ctx, adder)
-	err = storeEDS(ctx, eh.DAH.Hash(), eds, ce.store)
+	err = ce.storeEDS(ctx, eh, eds, adder)
 	if err != nil {
-		return nil, fmt.Errorf("storing EDS to eds.Store for height %d: %w", &block.Height, err)
+		return nil, err
 	}
 
 	return eh, nil
 }
```

```diff
@@ -190,10 +195,33 @@ func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height *int64) (*header.ExtendedHeader, error) {
 		panic(fmt.Errorf("constructing extended header for height %d: %w", b.Header.Height, err))
 	}
 
-	ctx = ipld.CtxWithProofsAdder(ctx, adder)
-	err = storeEDS(ctx, eh.DAH.Hash(), eds, ce.store)
+	err = ce.storeEDS(ctx, eh, eds, adder)
 	if err != nil {
-		return nil, fmt.Errorf("storing EDS to eds.Store for block height %d: %w", b.Header.Height, err)
+		return nil, err
 	}
 
 	return eh, nil
 }
+
+func (ce *Exchange) storeEDS(
+	ctx context.Context,
+	eh *header.ExtendedHeader,
+	eds *rsmt2d.ExtendedDataSquare,
+	adder *ipld.ProofsAdder,
+) error {
+	if !pruner.IsWithinAvailabilityWindow(eh.Time(), ce.availabilityWindow) {
+		log.Debugw("skipping storage of historic block", "height", eh.Height())
+		return nil
+	}
+
+	ctx = ipld.CtxWithProofsAdder(ctx, adder) // TODO @renaynay: should we short-circuit this if pruning enabled && historic?
+
+	err := storeEDS(ctx, eh.DAH.Hash(), eds, ce.store)
+	if err != nil {
+		return fmt.Errorf("storing EDS to eds.Store for block height %d: %w", eh.Height(), err)
+	}
+
+	log.Debugw("stored EDS for height", "height", eh.Height())
+	return nil
+}
```
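
`NewExchange` now reads `p.availabilityWindow` from its options, and the new test below configures it via `WithAvailabilityWindow`. Neither that option nor `defaultParams` appears in this diff; the following is a plausible sketch of the plumbing, with the `params` field set and the option body treated as assumptions:

```go
package core

import "github.com/celestiaorg/celestia-node/pruner"

// Option configures the params shared by Exchange and Listener.
type Option func(*params)

// params holds construction-time settings. chainID is read by the Listener
// changes below, availabilityWindow by both components. Only these two
// fields are visible in this diff; the rest is assumed.
type params struct {
	chainID            string
	availabilityWindow pruner.AvailabilityWindow
}

// WithAvailabilityWindow sets the window separating recent blocks from
// historic ones. Sketch only; the real option is defined elsewhere in the
// core package.
func WithAvailabilityWindow(window pruner.AvailabilityWindow) Option {
	return func(p *params) {
		p.availabilityWindow = window
	}
}
```
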
142 changes: 131 additions & 11 deletions core/exchange_test.go
```diff
@@ -1,10 +1,12 @@
 package core
 
 import (
+	"bytes"
 	"context"
 	"testing"
 	"time"
 
+	"github.com/cosmos/cosmos-sdk/client/flags"
 	ds "github.com/ipfs/go-datastore"
 	ds_sync "github.com/ipfs/go-datastore/sync"
 	"github.com/stretchr/testify/assert"
@@ -13,6 +15,8 @@ import (
 	"github.com/celestiaorg/celestia-app/test/util/testnode"
 
 	"github.com/celestiaorg/celestia-node/header"
+	"github.com/celestiaorg/celestia-node/pruner"
+	"github.com/celestiaorg/celestia-node/share"
 	"github.com/celestiaorg/celestia-node/share/eds"
 )
```

```diff
@@ -21,11 +25,10 @@ func TestCoreExchange_RequestHeaders(t *testing.T) {
 	t.Cleanup(cancel)
 
 	cfg := DefaultTestConfig()
-	cfg.ChainID = networkID
-	fetcher, _ := createCoreFetcher(t, cfg)
+	cfg.ChainID = testChainID
+	fetcher, cctx := createCoreFetcher(t, cfg)
 
-	// generate 10 blocks
-	generateBlocks(t, fetcher)
+	generateNonEmptyBlocks(t, ctx, fetcher, cfg, cctx)
 
 	store := createStore(t)
```

```diff
@@ -39,18 +42,67 @@
 	genHeader, err := ce.Get(ctx, genBlock.Header.Hash().Bytes())
 	require.NoError(t, err)
 
-	to := uint64(10)
+	to := uint64(30)
 	expectedFirstHeightInRange := genHeader.Height() + 1
 	expectedLastHeightInRange := to - 1
 	expectedLenHeaders := to - expectedFirstHeightInRange
 
-	// request headers from height 1 to 10 [2:10)
+	// request headers from height 1 to 30 [2:30)
 	headers, err := ce.GetRangeByHeight(context.Background(), genHeader, to)
 	require.NoError(t, err)
 
 	assert.Len(t, headers, int(expectedLenHeaders))
 	assert.Equal(t, expectedFirstHeightInRange, headers[0].Height())
 	assert.Equal(t, expectedLastHeightInRange, headers[len(headers)-1].Height())
+
+	for _, h := range headers {
+		has, err := store.Has(ctx, h.DAH.Hash())
+		require.NoError(t, err)
+		assert.True(t, has)
+	}
 }
+
+// TestExchange_DoNotStoreHistoric tests that the CoreExchange will not
+// store EDSs that are historic if pruning is enabled.
+func TestExchange_DoNotStoreHistoric(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	t.Cleanup(cancel)
+
+	cfg := DefaultTestConfig()
+	cfg.ChainID = testChainID
+	fetcher, cctx := createCoreFetcher(t, cfg)
+
+	generateNonEmptyBlocks(t, ctx, fetcher, cfg, cctx)
+
+	store := createStore(t)
+
+	ce, err := NewExchange(
+		fetcher,
+		store,
+		header.MakeExtendedHeader,
+		WithAvailabilityWindow(pruner.AvailabilityWindow(time.Nanosecond)), // all blocks will be "historic"
+	)
+	require.NoError(t, err)
+
+	// initialize store with genesis block
+	genHeight := int64(1)
+	genBlock, err := fetcher.GetBlock(ctx, &genHeight)
+	require.NoError(t, err)
+	genHeader, err := ce.Get(ctx, genBlock.Header.Hash().Bytes())
+	require.NoError(t, err)
+
+	headers, err := ce.GetRangeByHeight(ctx, genHeader, 30)
+	require.NoError(t, err)
+
+	// ensure none of the "historic" EDSs were stored
+	for _, h := range headers {
+		if bytes.Equal(h.DataHash, share.EmptyRoot().Hash()) {
+			continue
+		}
+		has, err := store.Has(ctx, h.DAH.Hash())
+		require.NoError(t, err)
+		assert.False(t, has)
+	}
+}
 
 func createCoreFetcher(t *testing.T, cfg *testnode.Config) (*BlockFetcher, testnode.Context) {
```
```diff
@@ -68,14 +120,82 @@ func createStore(t *testing.T) *eds.Store {
 	storeCfg := eds.DefaultParameters()
 	store, err := eds.NewStore(storeCfg, t.TempDir(), ds_sync.MutexWrap(ds.NewMapDatastore()))
 	require.NoError(t, err)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	err = store.Start(ctx)
+	require.NoError(t, err)
+
+	// store an empty square to initialize EDS store
+	eds := share.EmptyExtendedDataSquare()
+	err = store.Put(ctx, share.EmptyRoot().Hash(), eds)
+	require.NoError(t, err)
+
+	t.Cleanup(func() {
+		err = store.Stop(ctx)
+		require.NoError(t, err)
+	})
+
 	return store
 }
 
-func generateBlocks(t *testing.T, fetcher *BlockFetcher) {
-	sub, err := fetcher.SubscribeNewBlockEvent(context.Background())
-	require.NoError(t, err)
+// fillBlocks fills blocks until the context is canceled.
+func fillBlocks(
+	t *testing.T,
+	ctx context.Context,
+	cfg *testnode.Config,
+	cctx testnode.Context,
+) {
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+		}
+
+		_, err := cctx.FillBlock(16, cfg.Accounts, flags.BroadcastBlock)
+		require.NoError(t, err)
+	}
+}
 
-	for i := 0; i < 10; i++ {
-		<-sub
+// generateNonEmptyBlocks generates at least 20 non-empty blocks
+func generateNonEmptyBlocks(
+	t *testing.T,
+	ctx context.Context,
+	fetcher *BlockFetcher,
+	cfg *testnode.Config,
+	cctx testnode.Context,
+) []share.DataHash {
+	// generate several non-empty blocks
+	generateCtx, generateCtxCancel := context.WithCancel(context.Background())
+
+	sub, err := fetcher.SubscribeNewBlockEvent(ctx)
+	require.NoError(t, err)
+	defer func() {
+		err = fetcher.UnsubscribeNewBlockEvent(ctx)
+		require.NoError(t, err)
+	}()
+
+	go fillBlocks(t, generateCtx, cfg, cctx)
+
+	hashes := make([]share.DataHash, 0, 20)
+
+	i := 0
+	for i < 20 {
+		select {
+		case b, ok := <-sub:
+			require.True(t, ok)
+
+			if !bytes.Equal(b.Data.Hash(), share.EmptyRoot().Hash()) {
+				hashes = append(hashes, share.DataHash(b.Data.Hash()))
+				i++
+			}
+		case <-ctx.Done():
+			t.Fatal("failed to fill blocks within timeout")
+		}
+	}
+	generateCtxCancel()
+
+	return hashes
+}
```
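
A note on the assertion loops above: `createStore` pre-seeds the store with the empty EDS, so `store.Has` returns true for any empty block's data root regardless of what the exchange did. `TestExchange_DoNotStoreHistoric` therefore skips empty blocks before asserting. A hypothetical helper naming the check the test inlines:

```go
// isEmptyBlock reports whether a header commits to the canonical empty data
// root, whose EDS createStore pre-seeds into the test store. Hypothetical
// helper for illustration; the test performs this comparison inline.
func isEmptyBlock(h *header.ExtendedHeader) bool {
	return bytes.Equal(h.DataHash, share.EmptyRoot().Hash())
}
```
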
43 changes: 25 additions & 18 deletions core/listener.go
```diff
@@ -15,6 +15,7 @@ import (
 	"github.com/celestiaorg/nmt"
 
 	"github.com/celestiaorg/celestia-node/header"
+	"github.com/celestiaorg/celestia-node/pruner"
 	"github.com/celestiaorg/celestia-node/share/eds"
 	"github.com/celestiaorg/celestia-node/share/ipld"
 	"github.com/celestiaorg/celestia-node/share/p2p/shrexsub"
@@ -37,8 +38,9 @@ var (
 type Listener struct {
 	fetcher *BlockFetcher
 
-	construct header.ConstructFn
-	store     *eds.Store
+	construct          header.ConstructFn
+	store              *eds.Store
+	availabilityWindow pruner.AvailabilityWindow
 
 	headerBroadcaster libhead.Broadcaster[*header.ExtendedHeader]
 	hashBroadcaster   shrexsub.BroadcastFn
@@ -60,9 +62,9 @@ func NewListener(
 	blocktime time.Duration,
 	opts ...Option,
 ) (*Listener, error) {
-	p := new(params)
+	p := defaultParams()
 	for _, opt := range opts {
-		opt(p)
+		opt(&p)
 	}
 
 	var (
@@ -77,21 +79,22 @@
 	}
 
 	return &Listener{
-		fetcher:           fetcher,
-		headerBroadcaster: bcast,
-		hashBroadcaster:   hashBroadcaster,
-		construct:         construct,
-		store:             store,
-		listenerTimeout:   5 * blocktime,
-		metrics:           metrics,
-		chainID:           p.chainID,
+		fetcher:            fetcher,
+		headerBroadcaster:  bcast,
+		hashBroadcaster:    hashBroadcaster,
+		construct:          construct,
+		store:              store,
+		availabilityWindow: p.availabilityWindow,
+		listenerTimeout:    5 * blocktime,
+		metrics:            metrics,
+		chainID:            p.chainID,
 	}, nil
 }
 
 // Start kicks off the Listener listener loop.
 func (cl *Listener) Start(context.Context) error {
 	if cl.cancel != nil {
-		return errors.New("listener: already started")
+		return fmt.Errorf("listener: already started")
 	}
 
 	ctx, cancel := context.WithCancel(context.Background())
@@ -221,11 +224,15 @@ func (cl *Listener) handleNewSignedBlock(ctx context.Context, b types.EventDataSignedBlock) error {
 		panic(fmt.Errorf("making extended header: %w", err))
 	}
 
-	// attempt to store block data if not empty
-	ctx = ipld.CtxWithProofsAdder(ctx, adder)
-	err = storeEDS(ctx, b.Header.DataHash.Bytes(), eds, cl.store)
-	if err != nil {
-		return fmt.Errorf("storing EDS: %w", err)
+	// only store EDS if the header is within the availability window
+	if pruner.IsWithinAvailabilityWindow(eh.Time(), cl.availabilityWindow) {
+		// attempt to store block data if not empty
+		ctx = ipld.CtxWithProofsAdder(ctx, adder)
+
+		err = storeEDS(ctx, b.Header.DataHash.Bytes(), eds, cl.store)
+		if err != nil {
+			return fmt.Errorf("storing EDS: %w", err)
+		}
 	}
 
 	syncing, err := cl.fetcher.IsSyncing(ctx)
```