Skip to content

Commit

Permalink
test(nodebuilder/tests): Archival blob sync + the most convoluted fx …
Browse files Browse the repository at this point in the history
…magic you will ever see
  • Loading branch information
renaynay committed May 6, 2024
1 parent 63d4c02 commit 586f97e
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 94 deletions.
24 changes: 18 additions & 6 deletions nodebuilder/pruner/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,23 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
// even if pruning is not enabled.
return fx.Options(
baseComponents,
fx.Supply(light.Window),
fx.Provide(func() pruner.AvailabilityWindow {
return light.Window
}),
)
case node.Full:
return fx.Options(
baseComponents,
fx.Supply(archival.Window),
fx.Provide(func() pruner.AvailabilityWindow {
return archival.Window
}),
)
case node.Bridge:
return fx.Options(
baseComponents,
fx.Supply(archival.Window),
fx.Provide(func() pruner.AvailabilityWindow {
return archival.Window
}),
fx.Provide(func() []core.Option {
return []core.Option{}
}),
Expand Down Expand Up @@ -69,15 +75,19 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
fx.Provide(func(store *eds.Store) pruner.Pruner {
return full.NewPruner(store)
}),
fx.Supply(full.Window),
fx.Provide(func() pruner.AvailabilityWindow {
return full.Window
}),
)
case node.Bridge:
return fx.Module("prune",
baseComponents,
fx.Provide(func(store *eds.Store) pruner.Pruner {
return full.NewPruner(store)
}),
fx.Supply(full.Window),
fx.Provide(func() pruner.AvailabilityWindow {
return full.Window
}),
fx.Provide(func(window pruner.AvailabilityWindow) []core.Option {
return []core.Option{core.WithAvailabilityWindow(window)}
}),
Expand All @@ -86,7 +96,9 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
// in which case, this can be enabled.
case node.Light:
return fx.Module("prune",
fx.Supply(light.Window),
fx.Provide(func() pruner.AvailabilityWindow {
return light.Window
}),
)
default:
panic("unknown node type")
Expand Down
122 changes: 76 additions & 46 deletions nodebuilder/share/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package share
import (
"context"
"fmt"
"time"

"github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/routing"
routingdisc "github.com/libp2p/go-libp2p/p2p/discovery/routing"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
"go.uber.org/fx"
"time"

libhead "github.com/celestiaorg/go-header"
"github.com/celestiaorg/go-header/sync"
Expand Down Expand Up @@ -43,6 +44,7 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
peerManagerComponents(tp, cfg),
fullDiscoveryComponents(cfg),
shrexSubComponents(),
shrexGetterComponents(cfg),
archivalComponents(cfg),
)

Expand All @@ -51,7 +53,7 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
shrexServerComponents(cfg),
edsStoreComponents(cfg),
fullAvailabilityComponents(),
shrexGetterComponents(cfg),
advertiseHooks(),
fx.Provide(func(shrexSub *shrexsub.PubSub) shrexsub.BroadcastFn {
return shrexSub.Broadcast
}),
Expand Down Expand Up @@ -87,7 +89,6 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
return fx.Module(
"share",
baseComponents,
shrexGetterComponents(cfg),
lightAvailabilityComponents(cfg),
fx.Invoke(ensureEmptyEDSInBS),
fx.Provide(getters.NewIPLDGetter),
Expand All @@ -98,8 +99,9 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
return nil
}
}),
// needed to invoke archival discovery
fx.Invoke(func(_ []*disc.Discovery) {}),
// needed to invoke archival discovery lifecycle as no other
// component takes it.
fx.Invoke(func(_ map[string]*disc.Discovery) {}),
)
default:
panic("invalid node type")
Expand Down Expand Up @@ -263,15 +265,7 @@ func edsStoreComponents(cfg *Config) fx.Option {

func fullAvailabilityComponents() fx.Option {
return fx.Options(
fx.Provide(fx.Annotate(
full.NewShareAvailability,
fx.OnStart(func(ctx context.Context, avail *full.ShareAvailability) error {
return avail.Start(ctx)
}),
fx.OnStop(func(ctx context.Context, avail *full.ShareAvailability) error {
return avail.Stop(ctx)
}),
)),
fx.Provide(full.NewShareAvailability),
fx.Provide(func(avail *full.ShareAvailability) share.Availability {
return avail
}),
Expand Down Expand Up @@ -305,46 +299,45 @@ func archivalComponents(cfg *Config) fx.Option {
}

func archivalDiscovery(cfg *Config) fx.Option {
return fx.Provide(func(
lc fx.Lifecycle,
tp node.Type,
pruneCfg *modprune.Config,
d *disc.Discovery,
h host.Host,
r routing.ContentRouting,
opt disc.Option,
) ([]*disc.Discovery, error) {
// if pruner is enabled for BN, no archival service is necessary
if tp == node.Bridge && pruneCfg.EnableService {
// TODO @renaynay: remove bc pruned nodes should still be able to query archival
// only full node discovery is needed
return []*disc.Discovery{d}, nil
}
return fx.Options(
fx.Provide(func(
lc fx.Lifecycle,
tp node.Type,
pruneCfg *modprune.Config,
d *disc.Discovery,
h host.Host,
r routing.ContentRouting,
opt disc.Option,
) (map[string]*disc.Discovery, error) {
// if pruner is enabled for BN, no archival service is necessary
if tp == node.Bridge && pruneCfg.EnableService {
return map[string]*disc.Discovery{"full": d}, nil
}

archivalDisc, err := disc.NewDiscovery(
cfg.Discovery,
h,
routingdisc.NewRoutingDiscovery(r),
archivalNodesTag,
opt,
)
if err != nil {
return nil, err
}
archivalDisc, err := disc.NewDiscovery(
cfg.Discovery,
h,
routingdisc.NewRoutingDiscovery(r),
archivalNodesTag,
opt,
)
if err != nil {
return nil, err
}

lc.Append(fx.Hook{
OnStart: archivalDisc.Start,
OnStop: archivalDisc.Stop,
})
lc.Append(fx.Hook{
OnStart: archivalDisc.Start,
OnStop: archivalDisc.Stop,
})

return []*disc.Discovery{d, archivalDisc}, nil
})
return map[string]*disc.Discovery{"full": d, "archival": archivalDisc}, nil
}),
)
}

func archivalPeerManager() fx.Option {
return fx.Provide(func(
lc fx.Lifecycle,
pruneCfg *modprune.Config,
params peers.Parameters,
h host.Host,
gater *conngater.BasicConnectionGater,
Expand All @@ -362,3 +355,40 @@ func archivalPeerManager() fx.Option {
return opts, disc.WithOnPeersUpdate(archivalPeerManager.UpdateNodePool), nil
})
}

// advertiseHooks provides the lifecycle hooks for both `full` and, if
// applicable, `archival` discovery.
func advertiseHooks() fx.Option {
return fx.Invoke(func(lc fx.Lifecycle, tp node.Type, cfg *modprune.Config, discs map[string]*disc.Discovery) error {
if tp == node.Light {
// do not advertise on `full` or `archival` topics
return nil
}

// start advertising on `full` node topic as both FNs and BNs (regardless of pruning status)
// are considered `full` on the celestia DA network
fullDisc, ok := discs["full"]
if !ok {
return fmt.Errorf("expected full discovery component to be provided")
}
lc.Append(fx.Hook{
OnStart: fullDisc.StartAdvertising,
OnStop: fullDisc.StopAdvertising,
})

// if the BN or FN is not in pruning mode, it is `archival`, so
// advertise on the `archival` topic
if !cfg.EnableService {
archivalDisc, ok := discs["archival"]
if !ok {
return fmt.Errorf("expected archival discovery component to be provided")
}
lc.Append(fx.Hook{
OnStart: archivalDisc.StartAdvertising,
OnStop: archivalDisc.StopAdvertising,
})
}

return nil
})
}
9 changes: 2 additions & 7 deletions nodebuilder/tests/prune_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,15 @@ package tests
import (
"bytes"
"context"
"go.uber.org/fx"
"testing"
"time"

"github.com/libp2p/go-libp2p/core/host"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/fx"

"github.com/celestiaorg/celestia-node/blob"
"github.com/celestiaorg/celestia-node/libs/fxutil"
"github.com/celestiaorg/celestia-node/nodebuilder"
"github.com/celestiaorg/celestia-node/nodebuilder/das"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
Expand Down Expand Up @@ -62,7 +61,7 @@ func TestArchivalBlobSync(t *testing.T) {

testAvailWindow := pruner.AvailabilityWindow(time.Millisecond)
prunerOpts := fx.Options(
fxutil.ReplaceAs(testAvailWindow, new(pruner.AvailabilityWindow)),
fx.Replace(testAvailWindow),
)

err = archivalBN.Stop(ctx)
Expand Down Expand Up @@ -93,12 +92,9 @@ func TestArchivalBlobSync(t *testing.T) {
root share.DataHash
}

t.Log("89")

archivalBlobs := make([]*archivalBlob, 0)
i := 1
for {
t.Log("getting block by height", "height", i)
eh, err := archivalFN.HeaderServ.GetByHeight(ctx, uint64(i))
require.NoError(t, err)

Expand All @@ -111,7 +107,6 @@ func TestArchivalBlobSync(t *testing.T) {
require.NoError(t, err)
ns, err := share.NamespaceFromBytes(shr[:share.NamespaceSize])
require.NoError(t, err)
t.Log("SHAREEEEEEE: ", ns.String())

blobs, err := archivalFN.BlobServ.GetAll(ctx, uint64(i), []share.Namespace{ns})
require.NoError(t, err)
Expand Down
22 changes: 0 additions & 22 deletions share/availability/full/availability.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/eds/byzantine"
"github.com/celestiaorg/celestia-node/share/ipld"
"github.com/celestiaorg/celestia-node/share/p2p/discovery"
)

var log = logging.Logger("share/full")
Expand All @@ -24,38 +23,17 @@ var log = logging.Logger("share/full")
type ShareAvailability struct {
store *eds.Store
getter share.Getter
disc []*discovery.Discovery

cancel context.CancelFunc
}

// NewShareAvailability creates a new full ShareAvailability.
func NewShareAvailability(
store *eds.Store,
getter share.Getter,
disc []*discovery.Discovery,
) *ShareAvailability {
return &ShareAvailability{
store: store,
getter: getter,
disc: disc,
}
}

func (fa *ShareAvailability) Start(context.Context) error {
ctx, cancel := context.WithCancel(context.Background())
fa.cancel = cancel

for _, disc := range fa.disc {
go disc.Advertise(ctx)
}

return nil
}

func (fa *ShareAvailability) Stop(context.Context) error {
fa.cancel()
return nil
}

// SharesAvailable reconstructs the data committed to the given Root by requesting
Expand Down
12 changes: 2 additions & 10 deletions share/availability/full/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"time"

"github.com/ipfs/go-datastore"
routinghelpers "github.com/libp2p/go-libp2p-routing-helpers"
"github.com/libp2p/go-libp2p/p2p/discovery/routing"
"github.com/stretchr/testify/require"

"github.com/celestiaorg/celestia-node/share"
Expand Down Expand Up @@ -44,13 +42,7 @@ func TestAvailability(t *testing.T, getter share.Getter) *ShareAvailability {
params := discovery.DefaultParameters()
params.AdvertiseInterval = time.Second
params.PeersLimit = 10
disc, err := discovery.NewDiscovery(
params,
nil,
routing.NewRoutingDiscovery(routinghelpers.Null{}),
"full",
)
require.NoError(t, err)

store, err := eds.NewStore(eds.DefaultParameters(), t.TempDir(), datastore.NewMapDatastore())
require.NoError(t, err)
err = store.Start(context.Background())
Expand All @@ -60,5 +52,5 @@ func TestAvailability(t *testing.T, getter share.Getter) *ShareAvailability {
err = store.Stop(context.Background())
require.NoError(t, err)
})
return NewShareAvailability(store, getter, []*discovery.Discovery{disc})
return NewShareAvailability(store, getter)
}
5 changes: 2 additions & 3 deletions share/getters/shrex.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,10 +295,9 @@ func (sg *ShrexGetter) getPeer(
ctx context.Context,
header *header.ExtendedHeader,
) (libpeer.ID, peers.DoneFunc, error) {
log.Errorw("getPeer called with", "availabilityWindow", sg.availabilityWindow)
if sg.archivalPeerManager != nil && !pruner.IsWithinAvailabilityWindow(header.Time(), sg.availabilityWindow) {
log.Errorw("RYAN THIS LOG SHOULD HIT --- HI HELLO")
return sg.archivalPeerManager.Peer(ctx, header.DAH.Hash(), header.Height())
p, df, err := sg.archivalPeerManager.Peer(ctx, header.DAH.Hash(), header.Height())
return p, df, err
}
return sg.fullPeerManager.Peer(ctx, header.DAH.Hash(), header.Height())
}
Loading

0 comments on commit 586f97e

Please sign in to comment.