Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Bridge node skips storing EDS if historic on sync #3261

Closed
wants to merge 12 commits into from
Closed
2 changes: 0 additions & 2 deletions api/docgen/openrpc.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
// Package docgen generates an OpenRPC spec for the Celestia Node. It has been inspired by and
// adapted from Filecoin's Lotus API implementation.

//nolint:revive
package docgen

import (
Expand Down Expand Up @@ -163,7 +161,7 @@
}

// remove the default implementation from the method descriptions
appReflector.FnGetMethodDescription = func(r reflect.Value, m reflect.Method, funcDecl *ast.FuncDecl) (string, error) {

Check warning on line 164 in api/docgen/openrpc.go

View workflow job for this annotation

GitHub Actions / go-ci / Lint

unused-parameter: parameter 'r' seems to be unused, consider removing or renaming it as _ (revive)
if v, ok := permissions[m.Name]; ok {
return "Auth level: " + v, nil
}
Expand All @@ -172,14 +170,14 @@

appReflector.FnGetMethodName = func(
moduleName string,
r reflect.Value,

Check warning on line 173 in api/docgen/openrpc.go

View workflow job for this annotation

GitHub Actions / go-ci / Lint

unused-parameter: parameter 'r' seems to be unused, consider removing or renaming it as _ (revive)
m reflect.Method,
funcDecl *ast.FuncDecl,

Check warning on line 175 in api/docgen/openrpc.go

View workflow job for this annotation

GitHub Actions / go-ci / Lint

unused-parameter: parameter 'funcDecl' seems to be unused, consider removing or renaming it as _ (revive)
) (string, error) {
return moduleName + "." + m.Name, nil
}

appReflector.FnGetMethodSummary = func(r reflect.Value, m reflect.Method, funcDecl *ast.FuncDecl) (string, error) {

Check warning on line 180 in api/docgen/openrpc.go

View workflow job for this annotation

GitHub Actions / go-ci / Lint

unused-parameter: parameter 'r' seems to be unused, consider removing or renaming it as _ (revive)
if v, ok := comments[extractPackageNameFromAPIMethod(m)+m.Name]; ok {
return v, nil
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/cel-shed/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
var p2pNewKeyCmd = &cobra.Command{
Use: "new-key",
Short: "Generate and print new Ed25519 private key for p2p networking",
RunE: func(cmd *cobra.Command, _ []string) error { //nolint:revive
RunE: func(cmd *cobra.Command, _ []string) error {

Check warning on line 25 in cmd/cel-shed/p2p.go

View workflow job for this annotation

GitHub Actions / go-ci / Lint

unused-parameter: parameter 'cmd' seems to be unused, consider removing or renaming it as _ (revive)
privkey, _, err := crypto.GenerateEd25519Key(rand.Reader)
if err != nil {
return err
Expand All @@ -42,7 +42,7 @@
var p2pPeerIDCmd = &cobra.Command{
Use: "peer-id",
Short: "Get peer-id out of public or private Ed25519 key",
RunE: func(cmd *cobra.Command, args []string) error { //nolint:revive
RunE: func(cmd *cobra.Command, args []string) error {

Check warning on line 45 in cmd/cel-shed/p2p.go

View workflow job for this annotation

GitHub Actions / go-ci / Lint

unused-parameter: parameter 'cmd' seems to be unused, consider removing or renaming it as _ (revive)
decKey, err := hex.DecodeString(args[0])
if err != nil {
return err
Expand Down
3 changes: 3 additions & 0 deletions cmd/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/celestiaorg/celestia-node/nodebuilder/header"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/celestiaorg/celestia-node/nodebuilder/pruner"
"github.com/celestiaorg/celestia-node/nodebuilder/rpc"
"github.com/celestiaorg/celestia-node/nodebuilder/state"
)
Expand All @@ -22,6 +23,7 @@ func NewBridge(options ...func(*cobra.Command, []*pflag.FlagSet)) *cobra.Command
rpc.Flags(),
gateway.Flags(),
state.Flags(),
pruner.Flags(),
}
cmd := &cobra.Command{
Use: "bridge [subcommand]",
Expand Down Expand Up @@ -72,6 +74,7 @@ func NewFull(options ...func(*cobra.Command, []*pflag.FlagSet)) *cobra.Command {
rpc.Flags(),
gateway.Flags(),
state.Flags(),
pruner.Flags(),
}
cmd := &cobra.Command{
Use: "full [subcommand]",
Expand Down
26 changes: 19 additions & 7 deletions cmd/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/celestiaorg/celestia-node/nodebuilder/header"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/celestiaorg/celestia-node/nodebuilder/pruner"
rpc_cfg "github.com/celestiaorg/celestia-node/nodebuilder/rpc"
"github.com/celestiaorg/celestia-node/nodebuilder/state"
"github.com/celestiaorg/celestia-node/share"
Expand Down Expand Up @@ -105,13 +106,6 @@ func PersistentPreRunEnv(cmd *cobra.Command, nodeType node.Type, _ []string) err
return err
}

if nodeType != node.Bridge {
err = header.ParseFlags(cmd, &cfg.Header)
if err != nil {
return err
}
}

ctx, err = ParseMiscFlags(ctx, cmd)
if err != nil {
return err
Expand All @@ -121,6 +115,24 @@ func PersistentPreRunEnv(cmd *cobra.Command, nodeType node.Type, _ []string) err
gateway.ParseFlags(cmd, &cfg.Gateway)
state.ParseFlags(cmd, &cfg.State)

switch nodeType {
case node.Light:
err = header.ParseFlags(cmd, &cfg.Header)
if err != nil {
return err
}
case node.Full:
err = header.ParseFlags(cmd, &cfg.Header)
if err != nil {
return err
}
pruner.ParseFlags(cmd, &cfg.Pruner)
case node.Bridge:
pruner.ParseFlags(cmd, &cfg.Pruner)
default:
panic(fmt.Sprintf("invalid node type: %v", nodeType))
}

// set config
ctx = WithNodeConfig(ctx, &cfg)
cmd.SetContext(ctx)
Expand Down
49 changes: 37 additions & 12 deletions core/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (

libhead "github.com/celestiaorg/go-header"
"github.com/celestiaorg/nmt"
"github.com/celestiaorg/rsmt2d"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/ipld"
)
Expand All @@ -23,6 +25,8 @@ type Exchange struct {
store *eds.Store
construct header.ConstructFn

availabilityWindow pruner.AvailabilityWindow

metrics *exchangeMetrics
}

Expand All @@ -32,9 +36,9 @@ func NewExchange(
construct header.ConstructFn,
opts ...Option,
) (*Exchange, error) {
p := new(params)
p := defaultParams()
for _, opt := range opts {
opt(p)
opt(&p)
}

var (
Expand All @@ -49,10 +53,11 @@ func NewExchange(
}

return &Exchange{
fetcher: fetcher,
store: store,
construct: construct,
metrics: metrics,
fetcher: fetcher,
store: store,
construct: construct,
availabilityWindow: p.availabilityWindow,
metrics: metrics,
}, nil
}

Expand Down Expand Up @@ -150,11 +155,14 @@ func (ce *Exchange) Get(ctx context.Context, hash libhead.Hash) (*header.Extende
&block.Height, hash, eh.Hash())
}

ctx = ipld.CtxWithProofsAdder(ctx, adder)
err = storeEDS(ctx, eh.DAH.Hash(), eds, ce.store)
ctx = ipld.CtxWithProofsAdder(ctx, adder) // TODO @renaynay: should we short-circuit this if pruning enabled
// && historic?

err = ce.storeEDS(ctx, eh, eds)
if err != nil {
return nil, fmt.Errorf("storing EDS to eds.Store for height %d: %w", &block.Height, err)
return nil, err
}

return eh, nil
}

Expand Down Expand Up @@ -190,10 +198,27 @@ func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height *int64
panic(fmt.Errorf("constructing extended header for height %d: %w", b.Header.Height, err))
}

ctx = ipld.CtxWithProofsAdder(ctx, adder)
err = storeEDS(ctx, eh.DAH.Hash(), eds, ce.store)
ctx = ipld.CtxWithProofsAdder(ctx, adder) // TODO @renaynay: refer to above comment ^

err = ce.storeEDS(ctx, eh, eds)
if err != nil {
return nil, fmt.Errorf("storing EDS to eds.Store for block height %d: %w", b.Header.Height, err)
return nil, err
}

return eh, nil
}

func (ce *Exchange) storeEDS(ctx context.Context, eh *header.ExtendedHeader, eds *rsmt2d.ExtendedDataSquare) error {
if !pruner.IsWithinAvailabilityWindow(eh.Time(), ce.availabilityWindow) {
log.Debugw("skipping storage of historic block", "height", eh.Height())
return nil
}

err := storeEDS(ctx, eh.DAH.Hash(), eds, ce.store)
if err != nil {
return fmt.Errorf("storing EDS to eds.Store for block height %d: %w", eh.Height(), err)
}

log.Debugw("stored EDS for height", "height", eh.Height())
return nil
}
96 changes: 88 additions & 8 deletions core/exchange_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package core

import (
"bytes"
"context"
"testing"
"time"

"github.com/cosmos/cosmos-sdk/client/flags"
ds "github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
"github.com/stretchr/testify/assert"
Expand All @@ -13,6 +15,8 @@ import (
"github.com/celestiaorg/celestia-app/test/util/testnode"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds"
)

Expand All @@ -22,10 +26,10 @@ func TestCoreExchange_RequestHeaders(t *testing.T) {

cfg := DefaultTestConfig()
cfg.ChainID = networkID
fetcher, _ := createCoreFetcher(t, cfg)
fetcher, cctx := createCoreFetcher(t, cfg)

// generate 10 blocks
generateBlocks(t, fetcher)
// generate several blocks
generateBlocks(t, fetcher, cfg, cctx)

store := createStore(t)

Expand All @@ -39,18 +43,69 @@ func TestCoreExchange_RequestHeaders(t *testing.T) {
genHeader, err := ce.Get(ctx, genBlock.Header.Hash().Bytes())
require.NoError(t, err)

to := uint64(10)
to := uint64(40) // ensures some blocks will be non-empty
expectedFirstHeightInRange := genHeader.Height() + 1
expectedLastHeightInRange := to - 1
expectedLenHeaders := to - expectedFirstHeightInRange

// request headers from height 1 to 10 [2:10)
// request headers from height 1 to 10 [2:35)
headers, err := ce.GetRangeByHeight(context.Background(), genHeader, to)
require.NoError(t, err)

assert.Len(t, headers, int(expectedLenHeaders))
assert.Equal(t, expectedFirstHeightInRange, headers[0].Height())
assert.Equal(t, expectedLastHeightInRange, headers[len(headers)-1].Height())

for _, h := range headers {
has, err := store.Has(ctx, h.DAH.Hash())
require.NoError(t, err)
assert.True(t, has)
}
}

// TestExchange_DoNotStoreHistoric tests that the CoreExchange will not
// store EDSs that are historic if pruning is enabled.
func TestExchange_DoNotStoreHistoric(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

cfg := DefaultTestConfig()
cfg.ChainID = networkID
fetcher, cctx := createCoreFetcher(t, cfg)

// generate 10 blocks
generateBlocks(t, fetcher, cfg, cctx)

store := createStore(t)

ce, err := NewExchange(
fetcher,
store,
header.MakeExtendedHeader,
WithAvailabilityWindow(pruner.AvailabilityWindow(time.Nanosecond)), // all blocks will be "historic"
)
require.NoError(t, err)

// initialize store with genesis block
genHeight := int64(1)
genBlock, err := fetcher.GetBlock(ctx, &genHeight)
require.NoError(t, err)
genHeader, err := ce.Get(ctx, genBlock.Header.Hash().Bytes())
require.NoError(t, err)

// ensures some blocks will be non-empty
headers, err := ce.GetRangeByHeight(ctx, genHeader, 40)
require.NoError(t, err)

// ensure none of the "historic" EDSs were stored
for _, h := range headers {
if bytes.Equal(h.DataHash, share.EmptyRoot().Hash()) {
continue
}
has, err := store.Has(ctx, h.DAH.Hash())
require.NoError(t, err)
assert.False(t, has)
}
}

func createCoreFetcher(t *testing.T, cfg *testnode.Config) (*BlockFetcher, testnode.Context) {
Expand All @@ -68,14 +123,39 @@ func createStore(t *testing.T) *eds.Store {
storeCfg := eds.DefaultParameters()
store, err := eds.NewStore(storeCfg, t.TempDir(), ds_sync.MutexWrap(ds.NewMapDatastore()))
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

err = store.Start(ctx)
require.NoError(t, err)

// store an empty square to initialize EDS store
eds := share.EmptyExtendedDataSquare()
err = store.Put(ctx, share.EmptyRoot().Hash(), eds)
require.NoError(t, err)

t.Cleanup(func() {
err = store.Stop(ctx)
require.NoError(t, err)
})

return store
}

func generateBlocks(t *testing.T, fetcher *BlockFetcher) {
func generateBlocks(t *testing.T, fetcher *BlockFetcher, cfg *testnode.Config, cctx testnode.Context) {
sub, err := fetcher.SubscribeNewBlockEvent(context.Background())
require.NoError(t, err)

for i := 0; i < 10; i++ {
<-sub
i := 0
for i < 20 {
_, err := cctx.FillBlock(16, cfg.Accounts, flags.BroadcastBlock)
require.NoError(t, err)

b := <-sub
if bytes.Equal(b.Header.DataHash, share.EmptyRoot().Hash()) {
continue
}
i++
}
}
Loading
Loading