Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(share/availability/light | share/availability/full): Availability implementations are aware of sampling window, removed from DASer #3957

Merged
merged 10 commits into from
Dec 10, 2024
Merged
3 changes: 2 additions & 1 deletion core/eds.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@ func storeEDS(
eds *rsmt2d.ExtendedDataSquare,
store *store.Store,
window time.Duration,
archival bool,
) error {
if !availability.IsWithinWindow(eh.Time(), window) {
if !archival && !availability.IsWithinWindow(eh.Time(), window) {
log.Debugw("skipping storage of historic block", "height", eh.Height())
return nil
}
Expand Down
6 changes: 4 additions & 2 deletions core/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Exchange struct {
construct header.ConstructFn

availabilityWindow time.Duration
archival bool

metrics *exchangeMetrics
}
Expand Down Expand Up @@ -54,6 +55,7 @@ func NewExchange(
store: store,
construct: construct,
availabilityWindow: p.availabilityWindow,
archival: p.archival,
metrics: metrics,
}, nil
}
Expand Down Expand Up @@ -147,7 +149,7 @@ func (ce *Exchange) Get(ctx context.Context, hash libhead.Hash) (*header.Extende
&block.Height, hash, eh.Hash())
}

err = storeEDS(ctx, eh, eds, ce.store, ce.availabilityWindow)
err = storeEDS(ctx, eh, eds, ce.store, ce.availabilityWindow, ce.archival)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -187,7 +189,7 @@ func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height *int64
panic(fmt.Errorf("constructing extended header for height %d: %w", b.Header.Height, err))
}

err = storeEDS(ctx, eh, eds, ce.store, ce.availabilityWindow)
err = storeEDS(ctx, eh, eds, ce.store, ce.availabilityWindow, ce.archival)
if err != nil {
return nil, err
}
Expand Down
49 changes: 49 additions & 0 deletions core/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,55 @@ func TestExchange_DoNotStoreHistoric(t *testing.T) {
}
}

// TestExchange_StoreHistoricIfArchival makes sure blocks are stored past
// sampling window if archival is enabled
func TestExchange_StoreHistoricIfArchival(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

cfg := DefaultTestConfig()
fetcher, cctx := createCoreFetcher(t, cfg)

generateNonEmptyBlocks(t, ctx, fetcher, cfg, cctx)

store, err := store.NewStore(store.DefaultParameters(), t.TempDir())
require.NoError(t, err)

ce, err := NewExchange(
fetcher,
store,
header.MakeExtendedHeader,
WithAvailabilityWindow(time.Nanosecond), // all blocks will be "historic"
WithArchivalMode(), // make sure to store them anyway
)
require.NoError(t, err)

// initialize store with genesis block
genHeight := int64(1)
genBlock, err := fetcher.GetBlock(ctx, &genHeight)
require.NoError(t, err)
genHeader, err := ce.Get(ctx, genBlock.Header.Hash().Bytes())
require.NoError(t, err)

headers, err := ce.GetRangeByHeight(ctx, genHeader, 30)
require.NoError(t, err)

// ensure all "historic" EDSs were stored
for _, h := range headers {
has, err := store.HasByHeight(ctx, h.Height())
require.NoError(t, err)
assert.True(t, has)

// empty EDSs are expected to exist in the store, so we skip them
if h.DAH.Equals(share.EmptyEDSRoots()) {
continue
}
has, err = store.HasByHash(ctx, h.DAH.Hash())
require.NoError(t, err)
assert.True(t, has)
}
}

func createCoreFetcher(t *testing.T, cfg *testnode.Config) (*BlockFetcher, testnode.Context) {
cctx := StartTestNodeWithConfig(t, cfg)
// wait for height 2 in order to be able to start submitting txs (this prevents
Expand Down
4 changes: 3 additions & 1 deletion core/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type Listener struct {
construct header.ConstructFn
store *store.Store
availabilityWindow time.Duration
archival bool

headerBroadcaster libhead.Broadcaster[*header.ExtendedHeader]
hashBroadcaster shrexsub.BroadcastFn
Expand Down Expand Up @@ -83,6 +84,7 @@ func NewListener(
construct: construct,
store: store,
availabilityWindow: p.availabilityWindow,
archival: p.archival,
listenerTimeout: 5 * blocktime,
metrics: metrics,
chainID: p.chainID,
Expand Down Expand Up @@ -237,7 +239,7 @@ func (cl *Listener) handleNewSignedBlock(ctx context.Context, b types.EventDataS
panic(fmt.Errorf("making extended header: %w", err))
}

err = storeEDS(ctx, eh, eds, cl.store, cl.availabilityWindow)
err = storeEDS(ctx, eh, eds, cl.store, cl.availabilityWindow, cl.archival)
if err != nil {
return fmt.Errorf("storing EDS: %w", err)
}
Expand Down
8 changes: 8 additions & 0 deletions core/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ type params struct {
metrics bool
chainID string
availabilityWindow time.Duration
archival bool
renaynay marked this conversation as resolved.
Show resolved Hide resolved
}

func defaultParams() params {
return params{
availabilityWindow: time.Duration(0),
archival: false,
}
}

Expand All @@ -39,3 +41,9 @@ func WithAvailabilityWindow(window time.Duration) Option {
p.availabilityWindow = window
}
}

func WithArchivalMode() Option {
return func(p *params) {
p.archival = true
}
}
14 changes: 0 additions & 14 deletions das/daser.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,12 @@ import (

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/availability"
"github.com/celestiaorg/celestia-node/share/eds/byzantine"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub"
)

var log = logging.Logger("das")

// errOutsideSamplingWindow is an error used to inform
// the caller of Sample that the given header is outside
// the sampling window.
var errOutsideSamplingWindow = fmt.Errorf("skipping header outside of sampling window")

// DASer continuously validates availability of data committed to headers.
type DASer struct {
params Parameters
Expand Down Expand Up @@ -160,14 +154,6 @@ func (d *DASer) Stop(ctx context.Context) error {
}

func (d *DASer) sample(ctx context.Context, h *header.ExtendedHeader) error {
// short-circuit if pruning is enabled and the header is outside the
// availability window
if !availability.IsWithinWindow(h.Time(), d.params.samplingWindow) {
log.Debugw("skipping header outside sampling window", "height", h.Height(),
"time", h.Time())
return errOutsideSamplingWindow
}

err := d.da.SharesAvailable(ctx, h)
if err != nil {
var byzantineErr *byzantine.ErrByzantine
Expand Down
41 changes: 0 additions & 41 deletions das/daser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package das
import (
"context"
"fmt"
"strconv"
"testing"
"time"

Expand All @@ -20,7 +19,6 @@ import (
"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/header/headertest"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/availability"
"github.com/celestiaorg/celestia-node/share/availability/mocks"
"github.com/celestiaorg/celestia-node/share/eds/edstest"
)
Expand Down Expand Up @@ -243,45 +241,6 @@ func TestDASerSampleTimeout(t *testing.T) {
}
}

// TestDASer_SamplingWindow tests the sampling window determination
// for headers.
func TestDASer_SamplingWindow(t *testing.T) {
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
sub := new(headertest.Subscriber)
fserv := &fraudtest.DummyService[*header.ExtendedHeader]{}
getter := getterStub{}
avail := mocks.NewMockAvailability(gomock.NewController(t))

// create and start DASer
daser, err := NewDASer(avail, sub, getter, ds, fserv, newBroadcastMock(1),
WithSamplingWindow(time.Second))
require.NoError(t, err)

tests := []struct {
timestamp time.Time
withinWindow bool
}{
{timestamp: time.Now().Add(-(time.Second * 5)), withinWindow: false},
{timestamp: time.Now().Add(-(time.Millisecond * 800)), withinWindow: true},
{timestamp: time.Now().Add(-(time.Hour)), withinWindow: false},
{timestamp: time.Now().Add(-(time.Hour * 24 * 30)), withinWindow: false},
{timestamp: time.Now(), withinWindow: true},
}

for i, tt := range tests {
t.Run(strconv.Itoa(i), func(t *testing.T) {
eh := headertest.RandExtendedHeader(t)
eh.RawHeader.Time = tt.timestamp

assert.Equal(
t,
tt.withinWindow,
availability.IsWithinWindow(eh.Time(), daser.params.samplingWindow),
)
})
}
}

// createDASerSubcomponents takes numGetter (number of headers
// to store in mockGetter) and numSub (number of headers to store
// in the mock header.Subscriber), returning a newly instantiated
Expand Down
13 changes: 0 additions & 13 deletions das/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,6 @@ type Parameters struct {
// divided between parallel workers. SampleTimeout should be adjusted proportionally to
// ConcurrencyLimit.
SampleTimeout time.Duration

// samplingWindow determines the time window that headers should fall into
// in order to be sampled. If set to 0, the sampling window will include
// all headers.
samplingWindow time.Duration
}

// DefaultParameters returns the default configuration values for the daser parameters
Expand Down Expand Up @@ -161,11 +156,3 @@ func WithSampleTimeout(sampleTimeout time.Duration) Option {
d.params.SampleTimeout = sampleTimeout
}
}

// WithSamplingWindow is a functional option to configure the DASer's
// `samplingWindow` parameter.
func WithSamplingWindow(samplingWindow time.Duration) Option {
return func(d *DASer) {
d.params.samplingWindow = samplingWindow
}
}
5 changes: 3 additions & 2 deletions das/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
libhead "github.com/celestiaorg/go-header"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/share/availability"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub"
)

Expand Down Expand Up @@ -83,7 +84,7 @@ func (w *worker) run(ctx context.Context, timeout time.Duration, resultCh chan<-
// sampling worker will resume upon restart
return
}
if errors.Is(err, errOutsideSamplingWindow) {
if errors.Is(err, availability.ErrOutsideSamplingWindow) {
skipped++
err = nil
}
Expand Down Expand Up @@ -119,7 +120,7 @@ func (w *worker) sample(ctx context.Context, timeout time.Duration, height uint6
defer cancel()

err = w.sampleFn(ctx, h)
if errors.Is(err, errOutsideSamplingWindow) {
if errors.Is(err, availability.ErrOutsideSamplingWindow) {
// if header is outside sampling window, do not log
// or record it.
return err
Expand Down
4 changes: 0 additions & 4 deletions nodebuilder/das/constructors.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/celestiaorg/celestia-node/das"
"github.com/celestiaorg/celestia-node/header"
modfraud "github.com/celestiaorg/celestia-node/nodebuilder/fraud"
modshare "github.com/celestiaorg/celestia-node/nodebuilder/share"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds/byzantine"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub"
Expand Down Expand Up @@ -45,11 +44,8 @@ func newDASer(
batching datastore.Batching,
fraudServ fraud.Service[*header.ExtendedHeader],
bFn shrexsub.BroadcastFn,
availWindow modshare.Window,
options ...das.Option,
) (*das.DASer, *modfraud.ServiceBreaker[*das.DASer, *header.ExtendedHeader], error) {
options = append(options, das.WithSamplingWindow(availWindow.Duration()))

ds, err := das.NewDASer(da, hsub, store, batching, fraudServ, bFn, options...)
if err != nil {
return nil, nil, err
Expand Down
13 changes: 7 additions & 6 deletions nodebuilder/pruner/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/pruner/full"
"github.com/celestiaorg/celestia-node/share/availability"
fullavail "github.com/celestiaorg/celestia-node/share/availability/full"
"github.com/celestiaorg/celestia-node/share/availability/light"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/discovery"
)
Expand Down Expand Up @@ -59,33 +60,33 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
baseComponents,
prunerService,
fxutil.ProvideAs(full.NewPruner, new(pruner.Pruner)),
fx.Supply([]fullavail.Option{}),
)
}
return fx.Module("prune",
baseComponents,
fx.Invoke(func(ctx context.Context, ds datastore.Batching) error {
return pruner.DetectPreviousRun(ctx, ds)
}),
fx.Supply([]fullavail.Option{fullavail.WithArchivalMode()}),
)
case node.Bridge:
if cfg.EnableService {
return fx.Module("prune",
baseComponents,
prunerService,
fxutil.ProvideAs(full.NewPruner, new(pruner.Pruner)),
fx.Provide(func(window modshare.Window) []core.Option {
return []core.Option{core.WithAvailabilityWindow(window.Duration())}
}),
fx.Supply([]fullavail.Option{}),
fx.Supply([]core.Option{}),
)
}
return fx.Module("prune",
baseComponents,
fx.Invoke(func(ctx context.Context, ds datastore.Batching) error {
return pruner.DetectPreviousRun(ctx, ds)
}),
fx.Provide(func() []core.Option {
return []core.Option{}
}),
fx.Supply([]fullavail.Option{fullavail.WithArchivalMode()}),
fx.Supply([]core.Option{core.WithArchivalMode()}),
)
default:
panic("unknown node type")
Expand Down
Loading
Loading