diff --git a/context/config_test.go b/context/config_test.go index dd4d400e19..4dd90ce66a 100644 --- a/context/config_test.go +++ b/context/config_test.go @@ -4,22 +4,76 @@ package context import ( - "encoding/json" "testing" "github.com/stretchr/testify/require" ) -func TestConfig(t *testing.T) { - r := require.New(t) - c := make(Config) +type testConfig struct { + TxFee uint64 `json:"txFee"` + MinFee uint64 `json:"minFee"` +} - type VMConfig struct { - TxFee uint64 `json:"txFee"` +func TestConfigC(t *testing.T) { + type test struct { + name string + providedStr string + defaultConfig testConfig + wantConfig testConfig + } + for _, test := range []test{ + { + name: "default want non-zero values", + providedStr: "", + defaultConfig: testConfig{TxFee: 100}, + wantConfig: testConfig{TxFee: 100}, + }, + { + name: "default want zero values", + providedStr: "", + defaultConfig: testConfig{}, + wantConfig: testConfig{}, + }, + { + name: "override default with zero values", + providedStr: `{ + "test": { + "txFee": 0, + "minFee": 0 + } + }`, + defaultConfig: testConfig{TxFee: 100, MinFee: 100}, + wantConfig: testConfig{TxFee: 0, MinFee: 0}, + }, + { + name: "override non-zero defaults", + providedStr: `{ + "test": { + "txFee": 1000, + "minFee": 1000 + } + }`, + defaultConfig: testConfig{TxFee: 100, MinFee: 100}, + wantConfig: testConfig{TxFee: 1000, MinFee: 1000}, + }, + { + name: "override one default value", + providedStr: `{ + "test": { + "txFee": 1000 + } + }`, + defaultConfig: testConfig{TxFee: 100, MinFee: 100}, + wantConfig: testConfig{TxFee: 1000, MinFee: 100}, + }, + } { + t.Run(test.name, func(t *testing.T) { + r := require.New(t) + c, err := NewConfig([]byte(test.providedStr)) + r.NoError(err) + testConfig, err := GetConfig(c, "test", test.defaultConfig) + r.NoError(err) + r.Equal(test.wantConfig, testConfig) + }) } - configStr := `{"vm": {"txFee": 1000}}` - r.NoError(json.Unmarshal([]byte(configStr), &c)) - vmConfig, err := GetConfig(c, "vm", VMConfig{}) - r.NoError(err) - r.Equal(uint64(1000), vmConfig.TxFee) } diff --git a/examples/morpheusvm/go.mod b/examples/morpheusvm/go.mod index 47dfd92c7b..1d40751e46 100644 --- a/examples/morpheusvm/go.mod +++ b/examples/morpheusvm/go.mod @@ -102,8 +102,8 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect github.com/nbutton23/zxcvbn-go v0.0.0-20180912185939-ae427f1e4c1d // indirect + github.com/neilotoole/errgroup v0.1.6 // indirect github.com/olekukonko/tablewriter v0.0.5 // indirect - github.com/openzipkin/zipkin-go v0.4.1 // indirect github.com/pelletier/go-toml v1.9.5 // indirect github.com/pelletier/go-toml/v2 v2.0.5 // indirect github.com/pires/go-proxyproto v0.6.2 // indirect @@ -138,7 +138,6 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.22.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.22.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.22.0 // indirect - go.opentelemetry.io/otel/exporters/zipkin v1.11.2 // indirect go.opentelemetry.io/otel/metric v1.22.0 // indirect go.opentelemetry.io/otel/sdk v1.22.0 // indirect go.opentelemetry.io/otel/trace v1.22.0 // indirect diff --git a/lifecycle/ready.go b/lifecycle/ready.go index 67406f9d19..af8709bc07 100644 --- a/lifecycle/ready.go +++ b/lifecycle/ready.go @@ -32,6 +32,15 @@ func (c *ChanReady) Ready() bool { } } +func (c *ChanReady) AwaitReady(done <-chan struct{}) bool { + select { + case 
<-c.ready: + return true + case <-done: + return false + } +} + func (c *ChanReady) MarkReady() { c.readyOnce.Do(func() { close(c.ready) }) } diff --git a/snow/application.go b/snow/application.go index 15069e9e6d..34740cae21 100644 --- a/snow/application.go +++ b/snow/application.go @@ -9,9 +9,9 @@ import ( "github.com/ava-labs/avalanchego/api/health" "github.com/ava-labs/avalanchego/network/p2p" + "github.com/ava-labs/avalanchego/snow/engine/snowman/block" "github.com/ava-labs/hypersdk/event" "github.com/ava-labs/hypersdk/lifecycle" - "github.com/ava-labs/hypersdk/statesync" ) type Application[I Block, O Block, A Block] struct { @@ -21,8 +21,7 @@ type Application[I Block, O Block, A Block] struct { Handlers map[string]http.Handler HealthChecker health.Checker Network *p2p.Network - StateSyncClient *statesync.Client[*StatefulBlock[I, O, A]] - StateSyncServer *statesync.Server[*StatefulBlock[I, O, A]] + StateSyncableVM block.StateSyncableVM Closers []func() error Ready *lifecycle.AtomicBoolReady @@ -36,6 +35,10 @@ type Application[I Block, O Block, A Block] struct { PreReadyAcceptedSubs []event.Subscription[I] } +func (a *Application[I, O, A]) GetCovariantVM() *CovariantVM[I, O, A] { + return a.vm.covariantVM +} + func (a *Application[I, O, A]) WithAcceptedSub(sub ...event.Subscription[A]) { a.AcceptedSubs = append(a.AcceptedSubs, sub...) } diff --git a/snow/statesync.go b/snow/statesync.go index edde24f715..a3b4912fac 100644 --- a/snow/statesync.go +++ b/snow/statesync.go @@ -7,85 +7,13 @@ import ( "context" "fmt" - "github.com/ava-labs/avalanchego/database" "github.com/ava-labs/avalanchego/snow/engine/snowman/block" - "github.com/ava-labs/avalanchego/x/merkledb" - hcontext "github.com/ava-labs/hypersdk/context" - "github.com/ava-labs/hypersdk/statesync" - "github.com/prometheus/client_golang/prometheus" ) var _ block.StateSyncableVM = (*VM[Block, Block, Block])(nil) -func (a *Application[I, O, A]) WithStateSyncableVM( - client *statesync.Client[*StatefulBlock[I, O, A]], - server *statesync.Server[*StatefulBlock[I, O, A]], -) { - a.StateSyncClient = client - a.StateSyncServer = server -} - -type StateSyncConfig struct { - MinBlocks uint64 `json:"minBlocks"` - Parallelism int `json:"parallelism"` -} - -func NewDefaultStateSyncConfig() StateSyncConfig { - return StateSyncConfig{ - MinBlocks: 768, - Parallelism: 4, - } -} - -func GetStateSyncConfig(ctx *hcontext.Context) (StateSyncConfig, error) { - return hcontext.GetConfigFromContext(ctx, "statesync", NewDefaultStateSyncConfig()) -} - -func (a *Application[I, O, A]) WithStateSyncer( - db database.Database, - stateDB merkledb.MerkleDB, - rangeProofHandlerID uint64, - changeProofHandlerID uint64, - branchFactor merkledb.BranchFactor, -) error { - server := statesync.NewServer[*StatefulBlock[I, O, A]]( - a.vm.log, - a.vm.covariantVM, - ) - a.StateSyncServer = server - - syncerRegistry := prometheus.NewRegistry() - if err := a.vm.snowCtx.Metrics.Register("syncer", syncerRegistry); err != nil { - return err - } - stateSyncConfig, err := GetStateSyncConfig(a.vm.hctx) - if err != nil { - return err - } - client := statesync.NewClient[*StatefulBlock[I, O, A]]( - a.vm.covariantVM, - a.vm.snowCtx.Log, - syncerRegistry, - db, - stateDB, - a.Network, - rangeProofHandlerID, - changeProofHandlerID, - branchFactor, - stateSyncConfig.MinBlocks, - stateSyncConfig.Parallelism, - ) - a.StateSyncClient = client - a.OnNormalOperationStarted = append(a.OnNormalOperationStarted, client.StartBootstrapping) - // Note: this is not perfect because we may need to get 
a notification of a block between finishing state sync - // and when the engine/VM has received the notification and switched over. - // a.WithPreReadyAcceptedSub(event.SubscriptionFunc[I]{ - // NotifyF: func(ctx context.Context, block I) error { - // _, err := client.UpdateSyncTarget(block) - // return err - // }, - // }) - return statesync.RegisterHandlers(a.vm.log, a.Network, rangeProofHandlerID, changeProofHandlerID, stateDB) +func (a *Application[I, O, A]) WithStateSyncableVM(stateSyncableVM block.StateSyncableVM) { + a.StateSyncableVM = stateSyncableVM } // StartStateSync marks the VM as "not ready" so that blocks are verified / accepted vaccuously @@ -141,21 +69,21 @@ func (v *VM[I, O, A]) FinishStateSync(ctx context.Context, input I, output O, ac } func (v *VM[I, O, A]) StateSyncEnabled(ctx context.Context) (bool, error) { - return v.app.StateSyncClient.StateSyncEnabled(ctx) + return v.app.StateSyncableVM.StateSyncEnabled(ctx) } func (v *VM[I, O, A]) GetOngoingSyncStateSummary(ctx context.Context) (block.StateSummary, error) { - return v.app.StateSyncClient.GetOngoingSyncStateSummary(ctx) + return v.app.StateSyncableVM.GetOngoingSyncStateSummary(ctx) } func (v *VM[I, O, A]) GetLastStateSummary(ctx context.Context) (block.StateSummary, error) { - return v.app.StateSyncServer.GetLastStateSummary(ctx) + return v.app.StateSyncableVM.GetLastStateSummary(ctx) } func (v *VM[I, O, A]) ParseStateSummary(ctx context.Context, summaryBytes []byte) (block.StateSummary, error) { - return v.app.StateSyncClient.ParseStateSummary(ctx, summaryBytes) + return v.app.StateSyncableVM.ParseStateSummary(ctx, summaryBytes) } func (v *VM[I, O, A]) GetStateSummary(ctx context.Context, summaryHeight uint64) (block.StateSummary, error) { - return v.app.StateSyncServer.GetStateSummary(ctx, summaryHeight) + return v.app.StateSyncableVM.GetStateSummary(ctx, summaryHeight) } diff --git a/snow/vm.go b/snow/vm.go index fe31380af8..5902d18eaf 100644 --- a/snow/vm.go +++ b/snow/vm.go @@ -26,7 +26,6 @@ import ( "github.com/ava-labs/hypersdk/event" "github.com/ava-labs/hypersdk/internal/cache" "github.com/ava-labs/hypersdk/lifecycle" - "go.uber.org/zap" ) var ( @@ -285,7 +284,7 @@ func (v *VM[I, O, A]) SetState(ctx context.Context, state snow.State) error { } return nil case snow.NormalOp: - v.log.Info("Starting normal operation", zap.Bool("stateSyncStarted", v.app.StateSyncClient.Started())) + v.log.Info("Starting normal operation") for _, startNormalOpF := range v.app.OnNormalOperationStarted { if err := startNormalOpF(ctx); err != nil { return err diff --git a/statesync/block.go b/statesync/block.go index 16b4b078db..0bb8865c7d 100644 --- a/statesync/block.go +++ b/statesync/block.go @@ -19,15 +19,14 @@ type StateSummaryBlock interface { Height() uint64 Bytes() []byte GetStateRoot() ids.ID - AcceptSyncTarget(context.Context) error } type SyncableBlock[T StateSummaryBlock] struct { container T - accepter Accepter[T] // accepter is nil if the SyncableBlock is constructed by the server + accepter *Client[T] } -func NewSyncableBlock[T StateSummaryBlock](container T, accepter Accepter[T]) *SyncableBlock[T] { +func NewSyncableBlock[T StateSummaryBlock](container T, accepter *Client[T]) *SyncableBlock[T] { return &SyncableBlock[T]{ container: container, accepter: accepter, @@ -50,10 +49,6 @@ func (sb *SyncableBlock[T]) Accept(ctx context.Context) (block.StateSyncMode, er return sb.accepter.Accept(ctx, sb.container) } -func (sb *SyncableBlock[T]) AcceptSyncTarget(ctx context.Context) error { - return 
sb.container.AcceptSyncTarget(ctx) -} - func (sb *SyncableBlock[T]) String() string { return sb.container.String() } diff --git a/statesync/block_window_syncer.go b/statesync/block_window_syncer.go new file mode 100644 index 0000000000..b87f382b2e --- /dev/null +++ b/statesync/block_window_syncer.go @@ -0,0 +1,62 @@ +// Copyright (C) 2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package statesync + +import ( + "context" + "fmt" + "sync" +) + +var _ Syncer[interface{}] = (*BlockWindowSyncer[interface{}])(nil) + +type BlockSyncer[T any] interface { + Accept(ctx context.Context, block T) (bool, error) +} + +type BlockWindowSyncer[T any] struct { + syncer BlockSyncer[T] + doneOnce sync.Once + done chan struct{} +} + +func NewBlockWindowSyncer[T any](syncer BlockSyncer[T]) *BlockWindowSyncer[T] { + return &BlockWindowSyncer[T]{ + syncer: syncer, + done: make(chan struct{}), + } +} + +func (b *BlockWindowSyncer[T]) Start(ctx context.Context, target T) error { + done, err := b.syncer.Accept(ctx, target) + if done { + b.doneOnce.Do(func() { + close(b.done) + }) + } + return err +} + +func (b *BlockWindowSyncer[T]) Wait(ctx context.Context) error { + select { + case <-b.done: + return nil + case <-ctx.Done(): + return fmt.Errorf("failed to await full block window: %w", ctx.Err()) + } +} + +func (b *BlockWindowSyncer[T]) Close() error { + return nil +} + +func (b *BlockWindowSyncer[T]) UpdateSyncTarget(ctx context.Context, target T) error { + done, err := b.syncer.Accept(ctx, target) + if done { + b.doneOnce.Do(func() { + close(b.done) + }) + } + return err +} diff --git a/statesync/client.go b/statesync/client.go index 132da87e56..4aeac2b240 100644 --- a/statesync/client.go +++ b/statesync/client.go @@ -6,21 +6,14 @@ package statesync import ( "context" "errors" - "sync" "github.com/ava-labs/avalanchego/database" - "github.com/ava-labs/avalanchego/network/p2p" "github.com/ava-labs/avalanchego/snow/engine/snowman/block" "github.com/ava-labs/avalanchego/utils/logging" - "github.com/ava-labs/avalanchego/x/merkledb" - "github.com/prometheus/client_golang/prometheus" + "github.com/neilotoole/errgroup" "go.uber.org/zap" - - avasync "github.com/ava-labs/avalanchego/x/sync" ) -var _ Accepter[StateSummaryBlock] = (*Client[StateSummaryBlock])(nil) - var isSyncing = []byte("is_syncing") type ChainClient[T StateSummaryBlock] interface { @@ -28,325 +21,138 @@ type ChainClient[T StateSummaryBlock] interface { ParseBlock(ctx context.Context, bytes []byte) (T, error) } -type Accepter[T StateSummaryBlock] interface { - Accept(ctx context.Context, block T) (block.StateSyncMode, error) +type Syncer[T any] interface { + Start(ctx context.Context, target T) error + Wait(ctx context.Context) error + Close() error + UpdateSyncTarget(ctx context.Context, target T) error } type Client[T StateSummaryBlock] struct { - chain ChainClient[T] - log logging.Logger - registerer prometheus.Registerer - db database.Database - merkleDB merkledb.MerkleDB - network *p2p.Network - rangeProofHandlerID, changeProofHandlerID uint64 - syncManager *avasync.Manager - merkleBranchFactor merkledb.BranchFactor - minBlocks uint64 - simultaneousWorkLimit int - - // tracks the sync target so we can update last accepted - // block when sync completes. 
- target T - targetUpdated bool - - // State Sync results - init bool - startedSync bool - stateSyncErr error - doneOnce sync.Once - done chan struct{} + log logging.Logger + chain ChainClient[T] + db database.Database + syncers []Syncer[T] + onStart func(context.Context) error + onFinish func(context.Context) error + minBlocks uint64 + mustStateSync bool } -func NewClient[T StateSummaryBlock]( - chain ChainClient[T], +func NewAggregateClient[T StateSummaryBlock]( log logging.Logger, - registerer prometheus.Registerer, + chain ChainClient[T], db database.Database, - merkleDB merkledb.MerkleDB, - network *p2p.Network, - rangeProofHandlerID uint64, - changeProofHandlerID uint64, - merkleBranchFactor merkledb.BranchFactor, + syncers []Syncer[T], + onStart func(context.Context) error, + onFinish func(context.Context) error, minBlocks uint64, - simultaneousWorkLimit int, -) *Client[T] { - return &Client[T]{ - chain: chain, - log: log, - registerer: registerer, - db: db, - merkleDB: merkleDB, - network: network, - rangeProofHandlerID: rangeProofHandlerID, - changeProofHandlerID: changeProofHandlerID, - merkleBranchFactor: merkleBranchFactor, - minBlocks: minBlocks, - simultaneousWorkLimit: simultaneousWorkLimit, - done: make(chan struct{}), +) (*Client[T], error) { + c := &Client[T]{ + log: log, + chain: chain, + db: db, + syncers: syncers, + minBlocks: minBlocks, + onStart: onStart, + onFinish: onFinish, + } + var err error + c.mustStateSync, err = c.GetDiskIsSyncing() + if err != nil { + return nil, err } + return c, nil } -func (*Client[T]) StateSyncEnabled(context.Context) (bool, error) { - // We always start the state syncer and may fallback to normal bootstrapping - // if we are close to tip. - // - // There is no way to trigger a full bootstrap from genesis. - return true, nil -} +func (c *Client[T]) StateSyncEnabled(context.Context) (bool, error) { return true, nil } -func (*Client[T]) GetOngoingSyncStateSummary( - context.Context, -) (block.StateSummary, error) { - // Because the history of MerkleDB change proofs tends to be short, we always - // restart syncing from scratch. - // - // This is unlike other DB implementations where roots are persisted - // indefinitely (and it means we can continue from where we left off). +func (c *Client[T]) GetOngoingSyncStateSummary(context.Context) (block.StateSummary, error) { return nil, database.ErrNotFound } -func (s *Client[T]) ParseStateSummary(ctx context.Context, bytes []byte) (block.StateSummary, error) { - sb, err := s.chain.ParseBlock(ctx, bytes) +func (c *Client[T]) ParseStateSummary(ctx context.Context, bytes []byte) (block.StateSummary, error) { + blk, err := c.chain.ParseBlock(ctx, bytes) if err != nil { return nil, err } - summary := NewSyncableBlock(sb, s) - s.log.Info("parsed state summary", zap.Stringer("summary", summary)) + summary := NewSyncableBlock(blk, c) + c.log.Info("parsed state summary", zap.Stringer("summary", summary)) return summary, nil } -func (s *Client[T]) Accept( +func (c *Client[T]) Accept( ctx context.Context, - sb T, + target T, ) (block.StateSyncMode, error) { - s.init = true - s.log.Info("accepted syncable block", - zap.Uint64("height", sb.Height()), - zap.Stringer("blockID", sb.ID()), - ) - - // If we did not finish syncing, we must state sync. 
- syncing, err := s.GetDiskIsSyncing() - if err != nil { - s.log.Warn("could not determine if syncing", zap.Error(err)) - return block.StateSyncSkipped, err - } - lastAcceptedBlk := s.chain.LastAcceptedBlock(ctx) - if !syncing && (lastAcceptedBlk.Height()+s.minBlocks > sb.Height()) { - s.log.Info( - "bypassing state sync", - zap.Uint64("lastAccepted", lastAcceptedBlk.Height()), - zap.Uint64("syncableHeight", sb.Height()), - ) - s.startedSync = true - - // We trigger [done] immediately so we let the engine know we are - // synced as soon as the [ValidityWindow] worth of txs are verified. - s.doneOnce.Do(func() { - close(s.done) - }) - - // Even when we do normal bootstrapping, we mark syncing as dynamic to - // ensure we fill [vm.seen] before transitioning to normal operation. - // - // If there is no last accepted block above genesis, we will perform normal - // bootstrapping before transitioning into normal operation. - return block.StateSyncDynamic, nil + c.log.Info("Accepting state sync", zap.Stringer("target", target)) + lastAcceptedBlk := c.chain.LastAcceptedBlock(ctx) + if !c.mustStateSync && lastAcceptedBlk.Height()+c.minBlocks > target.Height() { + c.log.Info("Skipping state sync", zap.Stringer("lastAccepted", lastAcceptedBlk), zap.Stringer("target", target)) + return block.StateSyncSkipped, nil } - // When state syncing after restart (whether successful or not), we restart - // from scratch. - // - // MerkleDB will handle clearing any keys on-disk that are no - // longer necessary. - s.target = sb - s.log.Info( - "starting state sync", - zap.Uint64("height", s.target.Height()), - zap.Stringer("summary", sb), - zap.Bool("already syncing", syncing), - ) - s.startedSync = true - - s.syncManager, err = avasync.NewManager(avasync.ManagerConfig{ - BranchFactor: s.merkleBranchFactor, - DB: s.merkleDB, - RangeProofClient: s.network.NewClient(s.rangeProofHandlerID), - ChangeProofClient: s.network.NewClient(s.changeProofHandlerID), - SimultaneousWorkLimit: s.simultaneousWorkLimit, - Log: s.log, - TargetRoot: sb.GetStateRoot(), - }, s.registerer) - if err != nil { - return block.StateSyncSkipped, err - } + c.log.Info("Starting state sync", zap.Stringer("lastAccepted", lastAcceptedBlk), zap.Stringer("target", target)) + return block.StateSyncDynamic, c.startDynamicStateSync(ctx, target) +} - // Persist that the node has started syncing. - // - // This is necessary since last accepted will be modified without - // the VM having state, so it must resume only in state-sync - // mode if interrupted. - // - // Since the sync will write directly into the state trie, - // the node cannot continue from the previous state once - // it starts state syncing. - if err := s.PutDiskIsSyncing(true); err != nil { - return block.StateSyncSkipped, err +func (c *Client[T]) startDynamicStateSync(ctx context.Context, target T) error { + if c.onStart != nil { + if err := c.onStart(ctx); err != nil { + return err + } } - - // Update the last accepted to the state target block, - // since we don't want bootstrapping to fetch all the blocks - // from genesis to the sync target. 
- if err := s.target.AcceptSyncTarget(context.Background()); err != nil { - return block.StateSyncSkipped, err + for _, syncer := range c.syncers { + if err := syncer.Start(ctx, target); err != nil { + return err + } } - // Kickoff state syncing from [s.target] - if err := s.syncManager.Start(context.Background()); err != nil { - s.log.Warn("not starting state syncing", zap.Error(err)) - return block.StateSyncSkipped, err + c.log.Info("Starting state syncer(s)", zap.Int("numSyncers", len(c.syncers))) + awaitCtx := context.WithoutCancel(ctx) + eg, egCtx := errgroup.WithContext(awaitCtx) + for _, syncer := range c.syncers { + eg.Go(func() error { + return syncer.Wait(egCtx) + }) } go func() { - // wait for the work to complete on this goroutine - // - // [syncManager] guarantees this will always return so it isn't possible to - // deadlock. - s.stateSyncErr = s.syncManager.Wait(context.Background()) - s.log.Info("state sync done", zap.Error(s.stateSyncErr)) - if s.stateSyncErr == nil { - // if the sync was successful, update the last accepted pointers. - s.stateSyncErr = s.finishSync() + c.log.Info("Waiting for state syncers to complete") + err := eg.Wait() + if err != nil { + c.log.Error("state sync failed", zap.Error(err)) + panic(err) } - // notify the engine the VM is ready to participate - // in voting and it can verify blocks. - // - // This function will send a message to the VM when it has processed at least - // [ValidityWindow] blocks. - s.doneOnce.Do(func() { - close(s.done) - }) - }() - // TODO: engine will mark VM as ready when we return - // [block.StateSyncDynamic]. This should change in v1.9.11. - return block.StateSyncDynamic, nil -} -// finishSync is responsible for updating disk and memory pointers -func (s *Client[T]) finishSync() error { - if s.targetUpdated { - // Will look like block on start accepted then last block before beginning - // bootstrapping is accepted. - // - // NOTE: There may be a number of verified but unaccepted blocks above this - // block. - if err := s.target.AcceptSyncTarget(context.Background()); err != nil { - return err + c.log.Info("state sync completed") + if c.onFinish != nil { + if err := c.onFinish(ctx); err != nil { + c.log.Error("state sync finish failed", zap.Error(err)) + return + } } - } - return s.PutDiskIsSyncing(false) -} - -func (s *Client[T]) Started() bool { - return s.startedSync -} - -var ErrStateSyncing = errors.New("state syncing") -func (s *Client[T]) StartBootstrapping(ctx context.Context) error { - // Ensure state sync client marks itself as done if it was never started - syncStarted := s.Started() - if syncStarted { - return nil - } - - // We must check if we finished syncing before starting bootstrapping. - // This should only ever occur if we began a state sync, restarted, and - // were unable to find any acceptable summaries. - syncing, err := s.GetDiskIsSyncing() - if err != nil { - s.log.Error("could not determine if syncing", zap.Error(err)) - return err - } - if syncing { - s.log.Error("cannot start bootstrapping", zap.Error(ErrStateSyncing)) - // This is a fatal error that will require retrying sync or deleting the - // node database. - return ErrStateSyncing - } - - // If we weren't previously syncing, we force state syncer completion so - // that the node will mark itself as ready. 
- s.ForceDone() + if err := c.PutDiskIsSyncing(false); err != nil { + c.log.Error("failed to mark state sync as complete", zap.Error(err)) + return + } + c.log.Info("state sync finished and marked itself complete") + }() - // TODO: add a config to FATAL here if could not state sync (likely won't be - // able to recover in networks where no one has the full state, bypass - // still starts sync): https://github.com/ava-labs/hypersdk/issues/438 return nil } -// ForceDone is used by the [VM] to skip the sync process or to close the -// channel if the sync process never started (i.e. [AcceptedSyncableBlock] will -// never be called) -func (s *Client[T]) ForceDone() { - if s.startedSync { - // If we started sync, we must wait for it to finish - return - } - s.doneOnce.Do(func() { - close(s.done) - }) -} - -func (s *Client[T]) Done() <-chan struct{} { - return s.done -} - -// Shutdown can be called to abort an ongoing sync. -func (s *Client[T]) Shutdown() error { - if s.syncManager != nil { - s.syncManager.Close() - <-s.done // wait for goroutine to exit - } - return s.stateSyncErr // will be nil if [syncManager] is nil -} - -// Error returns a non-nil error if one occurred during the sync. -func (s *Client[T]) Error() error { return s.stateSyncErr } - -func (s *Client[T]) Ready() bool { - select { - case <-s.done: - return true - default: - } - // If we have not yet invoked [AcceptedSyncableBlock] we should return - // false until it has been called or we invoke [ForceDone]. - if !s.init { - return false - } - // Cover the case where initialization failed - return s.syncManager == nil -} - -// UpdateSyncTarget returns a boolean indicating if the root was -// updated and an error if one occurred while updating the root. -func (s *Client[T]) UpdateSyncTarget(b T) (bool, error) { - err := s.syncManager.UpdateSyncTarget(b.GetStateRoot()) - if errors.Is(err, avasync.ErrAlreadyClosed) { - <-s.done // Wait for goroutine to exit for consistent return values with IsSyncing - return false, nil // Sync finished before update - } - if err != nil { - return false, err // Unexpected error +func (c *Client[T]) UpdateSyncTarget(ctx context.Context, target T) error { + for _, syncer := range c.syncers { + if err := syncer.UpdateSyncTarget(ctx, target); err != nil { + return err + } } - s.target = b // Remember the new target - s.targetUpdated = true // Set [targetUpdated] so we call SetLastAccepted on finish - return true, nil // Sync root target updated successfully + return nil } -func (s *Client[T]) GetDiskIsSyncing() (bool, error) { - v, err := s.db.Get(isSyncing) +func (c *Client[T]) GetDiskIsSyncing() (bool, error) { + v, err := c.db.Get(isSyncing) if errors.Is(err, database.ErrNotFound) { return false, nil } @@ -356,9 +162,9 @@ func (s *Client[T]) GetDiskIsSyncing() (bool, error) { return v[0] == 0x1, nil } -func (s *Client[T]) PutDiskIsSyncing(v bool) error { +func (c *Client[T]) PutDiskIsSyncing(v bool) error { if v { - return s.db.Put(isSyncing, []byte{0x1}) + return c.db.Put(isSyncing, []byte{0x1}) } - return s.db.Put(isSyncing, []byte{0x0}) + return c.db.Put(isSyncing, []byte{0x0}) } diff --git a/statesync/handler.go b/statesync/handler.go deleted file mode 100644 index 11f1afcd96..0000000000 --- a/statesync/handler.go +++ /dev/null @@ -1,32 +0,0 @@ -// Copyright (C) 2024, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. 
- -package statesync - -import ( - "errors" - - "github.com/ava-labs/avalanchego/network/p2p" - "github.com/ava-labs/avalanchego/utils/logging" - "github.com/ava-labs/avalanchego/x/merkledb" - "github.com/ava-labs/avalanchego/x/sync" -) - -func RegisterHandlers( - log logging.Logger, - network *p2p.Network, - rangeProofHandlerID uint64, - changeProofHandlerID uint64, - db merkledb.MerkleDB, -) error { - return errors.Join( - network.AddHandler( - rangeProofHandlerID, - sync.NewGetRangeProofHandler(log, db), - ), - network.AddHandler( - changeProofHandlerID, - sync.NewGetChangeProofHandler(log, db), - ), - ) -} diff --git a/statesync/merkledb_adapter.go b/statesync/merkledb_adapter.go new file mode 100644 index 0000000000..bb65bec342 --- /dev/null +++ b/statesync/merkledb_adapter.go @@ -0,0 +1,105 @@ +// Copyright (C) 2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package statesync + +import ( + "context" + "errors" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/network/p2p" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/x/merkledb" + "github.com/ava-labs/avalanchego/x/sync" + "github.com/prometheus/client_golang/prometheus" +) + +const namespace = "merklesyncer" + +var _ Syncer[MerkleSyncerBlock] = (*MerkleSyncer[MerkleSyncerBlock])(nil) + +type MerkleSyncerBlock interface { + GetStateRoot() ids.ID +} + +type MerkleSyncer[T MerkleSyncerBlock] struct { + log logging.Logger + registerer prometheus.Registerer + merkleDB merkledb.MerkleDB + network *p2p.Network + rangeProofHandlerID, changeProofHandlerID uint64 + merkleBranchFactor merkledb.BranchFactor + simultaneousWorkLimit int + + syncManager *sync.Manager +} + +func NewMerkleSyncer[T MerkleSyncerBlock]( + log logging.Logger, + merkleDB merkledb.MerkleDB, + network *p2p.Network, + rangeProofHandlerID uint64, + changeProofHandlerID uint64, + merkleBranchFactor merkledb.BranchFactor, + simultaneousWorkLimit int, + registry prometheus.Registerer, +) (*MerkleSyncer[T], error) { + return &MerkleSyncer[T]{ + log: log, + registerer: registry, + merkleDB: merkleDB, + network: network, + rangeProofHandlerID: rangeProofHandlerID, + changeProofHandlerID: changeProofHandlerID, + merkleBranchFactor: merkleBranchFactor, + simultaneousWorkLimit: simultaneousWorkLimit, + }, nil +} + +func (m *MerkleSyncer[T]) Start(ctx context.Context, target T) error { + syncManager, err := sync.NewManager(sync.ManagerConfig{ + BranchFactor: m.merkleBranchFactor, + DB: m.merkleDB, + RangeProofClient: m.network.NewClient(m.rangeProofHandlerID), + ChangeProofClient: m.network.NewClient(m.changeProofHandlerID), + SimultaneousWorkLimit: m.simultaneousWorkLimit, + Log: m.log, + TargetRoot: target.GetStateRoot(), + }, m.registerer) + if err != nil { + return err + } + m.syncManager = syncManager + return m.syncManager.Start(ctx) +} + +func (m *MerkleSyncer[T]) Wait(ctx context.Context) error { return m.syncManager.Wait(ctx) } + +func (m *MerkleSyncer[T]) Close() error { + m.syncManager.Close() + return nil +} + +func (m *MerkleSyncer[T]) UpdateSyncTarget(_ context.Context, target T) error { + return m.syncManager.UpdateSyncTarget(target.GetStateRoot()) +} + +func RegisterHandlers( + log logging.Logger, + network *p2p.Network, + rangeProofHandlerID uint64, + changeProofHandlerID uint64, + db merkledb.MerkleDB, +) error { + return errors.Join( + network.AddHandler( + rangeProofHandlerID, + sync.NewGetRangeProofHandler(log, db), + ), + network.AddHandler( + 
changeProofHandlerID, + sync.NewGetChangeProofHandler(log, db), + ), + ) +} diff --git a/statesync/vm.go b/statesync/vm.go new file mode 100644 index 0000000000..b3b3bde86c --- /dev/null +++ b/statesync/vm.go @@ -0,0 +1,47 @@ +// Copyright (C) 2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package statesync + +import ( + "context" + + "github.com/ava-labs/avalanchego/snow/engine/snowman/block" +) + +var _ block.StateSyncableVM = (*VM[StateSummaryBlock])(nil) + +type VM[T StateSummaryBlock] struct { + client *Client[T] + server *Server[T] +} + +func NewStateSyncableVM[T StateSummaryBlock]( + client *Client[T], + server *Server[T], +) *VM[T] { + return &VM[T]{ + client: client, + server: server, + } +} + +func (v *VM[T]) StateSyncEnabled(ctx context.Context) (bool, error) { + return v.client.StateSyncEnabled(ctx) +} + +func (v *VM[T]) GetOngoingSyncStateSummary(ctx context.Context) (block.StateSummary, error) { + return v.client.GetOngoingSyncStateSummary(ctx) +} + +func (v *VM[T]) GetLastStateSummary(ctx context.Context) (block.StateSummary, error) { + return v.server.GetLastStateSummary(ctx) +} + +func (v *VM[T]) ParseStateSummary(ctx context.Context, summaryBytes []byte) (block.StateSummary, error) { + return v.client.ParseStateSummary(ctx, summaryBytes) +} + +func (v *VM[T]) GetStateSummary(ctx context.Context, summaryHeight uint64) (block.StateSummary, error) { + return v.server.GetStateSummary(ctx, summaryHeight) +} diff --git a/vm/config.go b/vm/config.go index 90ef12a99e..cdb90cf6d0 100644 --- a/vm/config.go +++ b/vm/config.go @@ -24,9 +24,6 @@ type Config struct { StateIntermediateWriteBatchSize int `json:"stateIntermediateWriteBatchSize"` // how many bytes to write from intermediate cache at once ValueNodeCacheSize int `json:"valueNodeCacheSize"` // how many bytes to keep in value cache AcceptorSize int `json:"acceptorSize"` // how far back we can fall in processing accepted blocks - StateSyncParallelism int `json:"stateSyncParallelism"` - StateSyncMinBlocks uint64 `json:"stateSyncMinBlocks"` - StateSyncServerDelay time.Duration `json:"stateSyncServerDelay"` ParsedBlockCacheSize int `json:"parsedBlockCacheSize"` AcceptedBlockWindow int `json:"acceptedBlockWindow"` AcceptedBlockWindowCache int `json:"acceptedBlockWindowCache"` @@ -50,9 +47,6 @@ func NewConfig() Config { StateIntermediateWriteBatchSize: 4 * units.MiB, ValueNodeCacheSize: 2 * units.GiB, AcceptorSize: 64, - StateSyncParallelism: 4, - StateSyncMinBlocks: 768, // set to max int for archive nodes to ensure no skips - StateSyncServerDelay: 0, // used for testing ParsedBlockCacheSize: 128, AcceptedBlockWindow: 50_000, // ~3.5hr with 250ms block time (100GB at 2MB) AcceptedBlockWindowCache: 128, // 256MB at 2MB blocks diff --git a/vm/statesync.go b/vm/statesync.go new file mode 100644 index 0000000000..638ea4340f --- /dev/null +++ b/vm/statesync.go @@ -0,0 +1,117 @@ +// Copyright (C) 2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. 
+ +package vm + +import ( + "context" + "fmt" + + "github.com/ava-labs/hypersdk/chain" + hcontext "github.com/ava-labs/hypersdk/context" + "github.com/ava-labs/hypersdk/internal/pebble" + "github.com/ava-labs/hypersdk/internal/validitywindow" + "github.com/ava-labs/hypersdk/snow" + "github.com/ava-labs/hypersdk/statesync" + "github.com/ava-labs/hypersdk/storage" +) + +const StateSyncNamespace = "statesync" + +type validityWindowAdapter struct { + *validitywindow.Syncer[*chain.Transaction] +} + +type StateSyncConfig struct { + MerkleSimultaneousWorkLimit int `json:"merkleSimultaneousWorkLimit"` + MinBlocks uint64 `json:"minBlocks"` +} + +func NewDefaultStateSyncConfig() StateSyncConfig { + return StateSyncConfig{ + MerkleSimultaneousWorkLimit: 4, + MinBlocks: 768, + } +} + +func (v validityWindowAdapter) Accept(ctx context.Context, blk *snow.StatefulBlock[*chain.ExecutionBlock, *chain.OutputBlock, *chain.OutputBlock]) (bool, error) { + return v.Syncer.Accept(ctx, blk.Input) +} + +func (vm *VM) initStateSync(ctx context.Context) error { + stateSyncConfig, err := hcontext.GetConfigFromContext(vm.snowInput.Context, StateSyncNamespace, NewDefaultStateSyncConfig()) + if err != nil { + return err + } + stateSyncRegistry, err := vm.snowInput.Context.MakeRegistry("statesync") + if err != nil { + return err + } + if err := statesync.RegisterHandlers( + vm.snowCtx.Log, + vm.network, + rangeProofHandlerID, + changeProofHandlerID, + vm.stateDB, + ); err != nil { + return err + } + + vm.syncer = validitywindow.NewSyncer(vm, vm.chainTimeValidityWindow, func(time int64) int64 { + return vm.ruleFactory.GetRules(time).GetValidityWindow() + }) + // blockWindowSyncer := statesync.NewBlockWindowSyncer[*snow.StatefulBlock[*chain.ExecutionBlock, *chain.OutputBlock, *chain.OutputBlock]](validityWindowAdapter{vm.syncer}) + + merkleSyncer, err := statesync.NewMerkleSyncer[*snow.StatefulBlock[*chain.ExecutionBlock, *chain.OutputBlock, *chain.OutputBlock]]( + vm.snowCtx.Log, + vm.stateDB, + vm.network, + rangeProofHandlerID, + changeProofHandlerID, + vm.genesis.GetStateBranchFactor(), + stateSyncConfig.MerkleSimultaneousWorkLimit, + stateSyncRegistry, + ) + if err != nil { + return err + } + + pebbleConfig := pebble.NewDefaultConfig() + syncerDB, err := storage.New(pebbleConfig, vm.snowCtx.ChainDataDir, syncerDB, stateSyncRegistry) + if err != nil { + return err + } + vm.snowApp.WithCloser(func() error { + if err := syncerDB.Close(); err != nil { + return fmt.Errorf("failed to close syncer db: %w", err) + } + return nil + }) + + covariantVM := vm.snowApp.GetCovariantVM() + client, err := statesync.NewAggregateClient[*snow.StatefulBlock[*chain.ExecutionBlock, *chain.OutputBlock, *chain.OutputBlock]]( + vm.snowCtx.Log, + covariantVM, + syncerDB, + []statesync.Syncer[*snow.StatefulBlock[*chain.ExecutionBlock, *chain.OutputBlock, *chain.OutputBlock]]{ + // blockWindowSyncer, + merkleSyncer, + }, + vm.snowApp.StartStateSync, + func(ctx context.Context) error { + outputBlock, err := vm.extractLatestOutputBlock(ctx) + if err != nil { + return fmt.Errorf("failed to extract latest output block while finishing state sync: %w", err) + } + return vm.snowApp.FinishStateSync(ctx, outputBlock.ExecutionBlock, outputBlock, outputBlock) + }, + stateSyncConfig.MinBlocks, + ) + if err != nil { + return err + } + server := statesync.NewServer[*snow.StatefulBlock[*chain.ExecutionBlock, *chain.OutputBlock, *chain.OutputBlock]](vm.snowCtx.Log, covariantVM) + stateSyncableVM := statesync.NewStateSyncableVM(client, server) + 
vm.snowApp.WithStateSyncableVM(stateSyncableVM) + return nil +} diff --git a/vm/vm.go b/vm/vm.go index f4e500e9ef..ee11080a1c 100644 --- a/vm/vm.go +++ b/vm/vm.go @@ -34,7 +34,6 @@ import ( "github.com/ava-labs/hypersdk/internal/validators" "github.com/ava-labs/hypersdk/internal/validitywindow" "github.com/ava-labs/hypersdk/internal/workers" - "github.com/ava-labs/hypersdk/lifecycle" hsnow "github.com/ava-labs/hypersdk/snow" "github.com/ava-labs/hypersdk/state" "github.com/ava-labs/hypersdk/storage" @@ -174,22 +173,6 @@ func (vm *VM) Initialize( vm.network = snowApp.Network - pebbleConfig := pebble.NewDefaultConfig() - rawStateDBRegistry := prometheus.NewRegistry() - if err := vm.snowCtx.Metrics.Register("rawstatedb", rawStateDBRegistry); err != nil { - return nil, fmt.Errorf("failed to register rawstatedb metrics: %w", err) - } - vm.rawStateDB, err = storage.New(pebbleConfig, vm.snowCtx.ChainDataDir, stateDB, rawStateDBRegistry) - if err != nil { - return nil, err - } - snowApp.WithCloser(func() error { - if err := vm.rawStateDB.Close(); err != nil { - return fmt.Errorf("failed to close raw state db: %w", err) - } - return nil - }) - vm.genesis, vm.ruleFactory, err = vm.genesisAndRuleFactory.Load(genesisBytes, upgradeBytes, vm.snowCtx.NetworkID, vm.snowCtx.ChainID) vm.GenesisBytes = genesisBytes if err != nil { @@ -232,6 +215,15 @@ func (vm *VM) Initialize( }) // Instantiate DBs + pebbleConfig := pebble.NewDefaultConfig() + rawStateDBRegistry := prometheus.NewRegistry() + if err := vm.snowCtx.Metrics.Register("rawstatedb", rawStateDBRegistry); err != nil { + return nil, fmt.Errorf("failed to register rawstatedb metrics: %w", err) + } + vm.rawStateDB, err = storage.New(pebbleConfig, vm.snowCtx.ChainDataDir, stateDB, rawStateDBRegistry) + if err != nil { + return nil, err + } merkleRegistry := prometheus.NewRegistry() vm.stateDB, err = merkledb.New(ctx, vm.rawStateDB, merkledb.Config{ BranchFactor: vm.genesis.GetStateBranchFactor(), @@ -254,6 +246,9 @@ func (vm *VM) Initialize( if err := vm.stateDB.Close(); err != nil { return fmt.Errorf("failed to close state db: %w", err) } + if err := vm.rawStateDB.Close(); err != nil { + return fmt.Errorf("failed to close raw state db: %w", err) + } return nil }) if err := vm.snowCtx.Metrics.Register("state", merkleRegistry); err != nil { @@ -323,25 +318,6 @@ func (vm *VM) Initialize( if err != nil { return nil, err } - vm.syncer = validitywindow.NewSyncer(vm, vm.chainTimeValidityWindow, func(time int64) int64 { - return vm.ruleFactory.GetRules(time).GetValidityWindow() - }) - validityWindowReady := lifecycle.NewChanReady() - // TODO: combine time validity window and state syncer to call FinishStateSync correclty - snowApp.WithPreReadyAcceptedSub(event.SubscriptionFunc[*chain.ExecutionBlock]{ - NotifyF: func(ctx context.Context, b *chain.ExecutionBlock) error { - vm.metrics.txsAccepted.Add(float64(len(b.StatelessBlock.Txs))) - seenValidityWindow, err := vm.syncer.Accept(ctx, b) - if err != nil { - return fmt.Errorf("syncer failed to accept block: %w", err) - } - if seenValidityWindow { - validityWindowReady.MarkReady() - } - - return nil - }, - }) snowApp.WithVerifiedSub(event.SubscriptionFunc[*chain.OutputBlock]{ NotifyF: func(ctx context.Context, b *chain.OutputBlock) error { vm.metrics.txsVerified.Add(float64(len(b.StatelessBlock.Txs))) @@ -349,30 +325,6 @@ func (vm *VM) Initialize( }, }) - syncerDBRegistry := prometheus.NewRegistry() - if err := vm.snowCtx.Metrics.Register(syncerDB, syncerDBRegistry); err != nil { - return nil, fmt.Errorf("failed to 
register syncerdb metrics: %w", err) - } - syncerDB, err := storage.New(pebbleConfig, vm.snowCtx.ChainDataDir, syncerDB, syncerDBRegistry) - if err != nil { - return nil, err - } - snowApp.WithCloser(func() error { - if err := syncerDB.Close(); err != nil { - return fmt.Errorf("failed to close syncer db: %w", err) - } - return nil - }) - if err := snowApp.WithStateSyncer( - syncerDB, - vm.stateDB, - rangeProofHandlerID, - changeProofHandlerID, - vm.genesis.GetStateBranchFactor(), - ); err != nil { - return nil, err - } - if err := vm.network.AddHandler( txGossipHandlerID, gossiper.NewTxGossipHandler( @@ -393,12 +345,17 @@ func (vm *VM) Initialize( if err != nil { return nil, err } + // Switch away from true chainIndex, err := makeChainIndex(ctx, vm.chainStore, lastAccepted, lastAccepted, true) if err != nil { return nil, err } vm.chainIndex = chainIndex + if err := vm.initStateSync(ctx); err != nil { + return nil, err + } + // Initialize the syncer with the last accepted block snowApp.WithStateSyncStarted(func(ctx context.Context) error { lastAccepted := vm.chainIndex.GetLastAccepted(ctx) @@ -453,6 +410,10 @@ func (vm *VM) initLastAccepted(ctx context.Context) (*chain.OutputBlock, error) // If the chain store is initialized, return the output block that matches with the latest // state. + return vm.extractLatestOutputBlock(ctx) +} + +func (vm *VM) extractLatestOutputBlock(ctx context.Context) (*chain.OutputBlock, error) { heightBytes, err := vm.stateDB.Get(chain.HeightKey(vm.metadataManager.HeightPrefix())) if err != nil { return nil, err @@ -461,6 +422,8 @@ func (vm *VM) initLastAccepted(ctx context.Context) (*chain.OutputBlock, error) if err != nil { return nil, err } + // We should always have the block at the height matching the state height + // because we always keep the chain store tip >= state tip. 
blk, err := vm.chainStore.GetBlockByHeight(ctx, stateHeight) if err != nil { return nil, err diff --git a/vm/vm_test.go b/vm/vm_test.go index 8d287458bc..9cce08384b 100644 --- a/vm/vm_test.go +++ b/vm/vm_test.go @@ -17,9 +17,11 @@ import ( avasnow "github.com/ava-labs/avalanchego/snow" "github.com/ava-labs/avalanchego/snow/engine/common" "github.com/ava-labs/avalanchego/snow/engine/enginetest" + "github.com/ava-labs/avalanchego/snow/engine/snowman/block" "github.com/ava-labs/avalanchego/snow/snowtest" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/utils/set" + "github.com/ava-labs/avalanchego/version" "github.com/ava-labs/avalanchego/vms/rpcchainvm/grpcutils" "github.com/ava-labs/avalanchego/x/merkledb" "github.com/ava-labs/hypersdk/api/indexer" @@ -67,8 +69,8 @@ func NewTestVM( ctx context.Context, t *testing.T, chainID ids.ID, + genesisBytes []byte, configBytes []byte, - allocations []*genesis.CustomAllocation, ) *VM { r := require.New(t) var ( @@ -98,18 +100,11 @@ func NewTestVM( snowVM := snow.NewVM(vm) toEngine := make(chan common.Message, 1) - testRules := genesis.NewDefaultRules() - testRules.MinBlockGap = 0 - testRules.MinEmptyBlockGap = 0 - genesis := genesis.DefaultGenesis{ - StateBranchFactor: merkledb.BranchFactor16, - CustomAllocation: allocations, - Rules: testRules, - } - genesisBytes, err := json.Marshal(genesis) - r.NoError(err) + snowCtx := snowtest.Context(t, chainID) + snowCtx.Log = logging.NewLogger("vmtest") snowCtx.ChainDataDir = t.TempDir() + snowCtx.NodeID = ids.GenerateTestNodeID() appSender := &enginetest.Sender{T: t} r.NoError(snowVM.Initialize(ctx, snowCtx, nil, genesisBytes, nil, configBytes, toEngine, nil, appSender)) @@ -131,13 +126,17 @@ func NewTestVM( } type TestNetwork struct { - chainID ids.ID - require *require.Assertions + t *testing.T + require *require.Assertions + + chainID ids.ID + genesisBytes []byte + authFactory chain.AuthFactory + VMs []*VM nodeIDToVM map[ids.NodeID]*VM + uris []string - authFactory chain.AuthFactory - uris []string configuration workload.TestNetworkConfiguration } @@ -153,113 +152,143 @@ func NewTestNetwork( r.NoError(err) authFactory := auth.NewED25519Factory(privKey) funds := uint64(1_000_000_000) - vms := make([]*VM, numVMs) allocations := []*genesis.CustomAllocation{ { Address: authFactory.Address(), Balance: funds, }, } + testRules := genesis.NewDefaultRules() + testRules.MinBlockGap = 0 + testRules.MinEmptyBlockGap = 0 + testRules.ValidityWindow = time.Second.Milliseconds() + genesis := genesis.DefaultGenesis{ + StateBranchFactor: merkledb.BranchFactor16, + CustomAllocation: allocations, + Rules: testRules, + } + genesisBytes, err := json.Marshal(genesis) + r.NoError(err) + + testNetwork := &TestNetwork{ + t: t, + require: r, + chainID: chainID, + genesisBytes: genesisBytes, + authFactory: authFactory, + } + + vms := make([]*VM, numVMs) nodeIDToVM := make(map[ids.NodeID]*VM) uris := make([]string, len(vms)) for i := range vms { - vm := NewTestVM(ctx, t, chainID, configBytes, allocations) + vm := NewTestVM(ctx, t, chainID, genesisBytes, configBytes) vms[i] = vm uris[i] = vm.server.URL nodeIDToVM[vm.nodeID] = vm } + testNetwork.VMs = vms + testNetwork.nodeIDToVM = nodeIDToVM + testNetwork.uris = uris configuration := workload.NewDefaultTestNetworkConfiguration( vms[0].VM.GenesisBytes, "hypervmtests", vms[0].VM, []chain.AuthFactory{authFactory}, ) - testNetwork := &TestNetwork{ - chainID: chainID, - require: r, - VMs: vms, - authFactory: authFactory, - uris: uris, - configuration: 
configuration, - nodeIDToVM: nodeIDToVM, - } - testNetwork.initAppNetwork() + testNetwork.configuration = configuration + testNetwork.initAppNetwork(ctx) return testNetwork } -func (n *TestNetwork) initAppNetwork() { - for _, vm := range n.VMs { - myNodeID := vm.nodeID - vm.appSender.SendAppRequestF = func(ctx context.Context, nodeIDs set.Set[ids.NodeID], u uint32, b []byte) error { - for nodeID := range nodeIDs { - if nodeID == myNodeID { - go func() { - err := vm.SnowVM.AppRequest(ctx, nodeID, u, time.Now().Add(time.Second), b) - n.require.NoError(err) - }() - } else { - err := n.nodeIDToVM[nodeID].SnowVM.AppRequest(ctx, nodeID, u, time.Now().Add(time.Second), b) - n.require.NoError(err) - } - } - return nil - } - vm.appSender.SendAppResponseF = func(ctx context.Context, nodeID ids.NodeID, requestID uint32, response []byte) error { +func (n *TestNetwork) AddVM(ctx context.Context, configBytes []byte) *VM { + vm := NewTestVM(ctx, n.t, n.chainID, n.genesisBytes, configBytes) + n.VMs = append(n.VMs, vm) + n.nodeIDToVM[vm.nodeID] = vm + n.uris = append(n.uris, vm.server.URL) + n.initVMNetwork(ctx, vm) + return vm +} + +func (n *TestNetwork) initVMNetwork(ctx context.Context, vm *VM) { + myNodeID := vm.nodeID + vm.appSender.SendAppRequestF = func(ctx context.Context, nodeIDs set.Set[ids.NodeID], u uint32, b []byte) error { + for nodeID := range nodeIDs { if nodeID == myNodeID { go func() { - err := vm.SnowVM.AppResponse(ctx, nodeID, requestID, response) + err := vm.SnowVM.AppRequest(ctx, nodeID, u, time.Now().Add(time.Second), b) n.require.NoError(err) }() - return nil + } else { + err := n.nodeIDToVM[nodeID].SnowVM.AppRequest(ctx, nodeID, u, time.Now().Add(time.Second), b) + n.require.NoError(err) } + } + return nil + } + vm.appSender.SendAppResponseF = func(ctx context.Context, nodeID ids.NodeID, requestID uint32, response []byte) error { + if nodeID == myNodeID { + go func() { + err := vm.SnowVM.AppResponse(ctx, nodeID, requestID, response) + n.require.NoError(err) + }() + return nil + } - return n.nodeIDToVM[nodeID].SnowVM.AppResponse(ctx, nodeID, requestID, response) + return n.nodeIDToVM[nodeID].SnowVM.AppResponse(ctx, nodeID, requestID, response) + } + vm.appSender.SendAppErrorF = func(ctx context.Context, nodeID ids.NodeID, requestID uint32, code int32, message string) error { + if nodeID == myNodeID { + go func() { + err := vm.SnowVM.AppRequestFailed(ctx, nodeID, requestID, &common.AppError{ + Code: code, + Message: message, + }) + n.require.NoError(err) + }() + return nil } - vm.appSender.SendAppErrorF = func(ctx context.Context, nodeID ids.NodeID, requestID uint32, code int32, message string) error { + return n.nodeIDToVM[nodeID].SnowVM.AppRequestFailed(ctx, nodeID, requestID, &common.AppError{ + Code: code, + Message: message, + }) + } + vm.appSender.SendAppGossipF = func(ctx context.Context, sendConfig common.SendConfig, b []byte) error { + nodeIDs := sendConfig.NodeIDs + nodeIDs.Remove(myNodeID) + // Select numSend nodes excluding myNodeID and gossip to them + numSend := sendConfig.Validators + sendConfig.NonValidators + sendConfig.Peers + nodes := set.NewSet[ids.NodeID](numSend) + for nodeID := range n.nodeIDToVM { if nodeID == myNodeID { - go func() { - err := vm.SnowVM.AppRequestFailed(ctx, nodeID, requestID, &common.AppError{ - Code: code, - Message: message, - }) - n.require.NoError(err) - }() - return nil + continue } - return n.nodeIDToVM[nodeID].SnowVM.AppRequestFailed(ctx, nodeID, requestID, &common.AppError{ - Code: code, - Message: message, - }) - } - 
vm.appSender.SendAppGossipF = func(ctx context.Context, sendConfig common.SendConfig, b []byte) error { - nodeIDs := sendConfig.NodeIDs - nodeIDs.Remove(myNodeID) - // Select numSend nodes excluding myNodeID and gossip to them - numSend := sendConfig.Validators + sendConfig.NonValidators + sendConfig.Peers - nodes := set.NewSet[ids.NodeID](numSend) - for nodeID := range n.nodeIDToVM { - if nodeID == myNodeID { - continue - } - nodes.Add(nodeID) - if nodes.Len() >= numSend { - break - } + nodes.Add(nodeID) + if nodes.Len() >= numSend { + break } + } - // Send to specified nodes - for nodeID := range nodeIDs { - err := n.nodeIDToVM[nodeID].SnowVM.AppGossip(ctx, nodeID, b) - n.require.NoError(err) - } - return nil + // Send to specified nodes + for nodeID := range nodeIDs { + err := n.nodeIDToVM[nodeID].SnowVM.AppGossip(ctx, nodeID, b) + n.require.NoError(err) + } + return nil + } + + for _, peer := range n.VMs { + if peer.nodeID == vm.nodeID { + continue } + n.require.NoError(vm.SnowVM.Connected(ctx, peer.nodeID, version.CurrentApp)) + n.require.NoError(peer.SnowVM.Connected(ctx, vm.nodeID, version.CurrentApp)) } } -func (n *TestNetwork) SetState(ctx context.Context, state avasnow.State) { +func (n *TestNetwork) initAppNetwork(ctx context.Context) { for _, vm := range n.VMs { - n.require.NoError(vm.SnowVM.SetState(ctx, state)) + n.initVMNetwork(ctx, vm) } } @@ -335,6 +364,12 @@ func (n *TestNetwork) GenerateTx(ctx context.Context, actions []chain.Action, au return tx, err } +func (n *TestNetwork) ConfirmBlocks(ctx context.Context, numBlocks int, generateTxs func(i int) []*chain.Transaction) { + for i := 0; i < numBlocks; i++ { + n.require.NoError(n.ConfirmTxs(ctx, generateTxs(i))) + } +} + func (n *TestNetwork) URIs() []string { return n.uris } @@ -574,7 +609,9 @@ func TestForceGossip(t *testing.T) { r := require.New(t) chainID := ids.GenerateTestID() network := NewTestNetwork(ctx, t, chainID, 2, nil) - network.SetState(ctx, avasnow.NormalOp) + for _, vm := range network.VMs { + vm.SnowVM.SetState(ctx, avasnow.NormalOp) + } tx0, err := network.GenerateTx(ctx, []chain.Action{&chaintest.TestAction{ NumComputeUnits: 1, @@ -803,3 +840,79 @@ func TestWebsocketAPI(t *testing.T) { r.Equal(lastAccepted.ExecutionResults.Results, wsResults) r.Equal(lastAccepted.ExecutionResults.UnitPrices, wsUnitPrices) } + +func TestStateSync(t *testing.T) { + ctx := context.Background() + r := require.New(t) + chainID := ids.GenerateTestID() + network := NewTestNetwork(ctx, t, chainID, 1, nil) + + initialVM := network.VMs[0] + initialVMGenesisBlock := initialVM.SnowVM.GetChainIndex().GetLastAccepted(ctx) + + numBlocks := 5 + network.ConfirmBlocks(ctx, numBlocks, func(i int) []*chain.Transaction { + tx, err := network.GenerateTx(ctx, []chain.Action{&chaintest.TestAction{ + NumComputeUnits: 1, + Nonce: uint64(i), + SpecifiedStateKeys: state.Keys{ + string(keys.EncodeChunks([]byte{byte(i)}, 1)): state.All, + }, + WriteKeys: [][]byte{keys.EncodeChunks([]byte{byte(i)}, 1)}, + WriteValues: [][]byte{{byte(i)}}, + }}, network.authFactory) + r.NoError(err) + return []*chain.Transaction{tx} + }) + + stateSyncConfig := vm.StateSyncConfig{ + MinBlocks: uint64(numBlocks - 1), + MerkleSimultaneousWorkLimit: 1, + } + stateSyncConfigBytes, err := json.Marshal(stateSyncConfig) + r.NoError(err) + config := map[string]json.RawMessage{ + vm.StateSyncNamespace: stateSyncConfigBytes, + } + configBytes, err := json.Marshal(config) + r.NoError(err) + + vm := network.AddVM(ctx, configBytes) + lastAccepted := 
vm.SnowVM.GetChainIndex().GetLastAccepted(ctx) + r.Equal(lastAccepted.ID(), initialVMGenesisBlock.ID()) + + stateSummary, err := initialVM.SnowVM.GetLastStateSummary(ctx) + r.NoError(err) + parsedStateSummary, err := vm.SnowVM.ParseStateSummary(ctx, stateSummary.Bytes()) + r.NoError(err) + + // Accepting the state sync summary kicks off an async process + stateSyncMode, err := parsedStateSummary.Accept(ctx) + r.NoError(err) + r.Equal(block.StateSyncDynamic, stateSyncMode) + + network.ConfirmBlocks(ctx, numBlocks, func(i int) []*chain.Transaction { + time.Sleep(time.Second) + tx, err := network.GenerateTx(ctx, []chain.Action{&chaintest.TestAction{ + NumComputeUnits: 1, + Nonce: uint64(i + numBlocks), + SpecifiedStateKeys: state.Keys{ + string(keys.EncodeChunks([]byte{byte(i)}, 1)): state.All, + }, + WriteKeys: [][]byte{keys.EncodeChunks([]byte{byte(i)}, 1)}, + WriteValues: [][]byte{{byte(i)}}, + }}, network.authFactory) + r.NoError(err) + return []*chain.Transaction{tx} + }) + + // Make sure we can sync the block window + // Confirm we can sync the block window and merkle trie individually + // Block window may require processing more blocks + // Confirm FinishStateSync is called with a correct block + // Add cases for each state sync case + + // time.Sleep(5 * time.Second) + updatedLastAcceptedBlock := vm.SnowVM.GetChainIndex().GetLastAccepted(ctx) + r.Equal(uint64(numBlocks), updatedLastAcceptedBlock.Height()) +}
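
The reworked context/config_test.go at the top of this diff exercises NewConfig and GetConfig from the hypersdk context package. Below is a minimal sketch (not part of the patch) of how a namespace would read its configuration the same way; the "myvm" namespace and myConfig type are illustrative only, while the NewConfig/GetConfig calls mirror the test above.

package main

import (
	"fmt"

	hcontext "github.com/ava-labs/hypersdk/context"
)

// myConfig is a hypothetical per-namespace config; any field the provided
// JSON does not set keeps the default passed to GetConfig.
type myConfig struct {
	TxFee  uint64 `json:"txFee"`
	MinFee uint64 `json:"minFee"`
}

func main() {
	// Raw node config: only the "myvm" namespace overrides txFee.
	raw := []byte(`{"myvm": {"txFee": 1000}}`)

	c, err := hcontext.NewConfig(raw)
	if err != nil {
		panic(err)
	}
	cfg, err := hcontext.GetConfig(c, "myvm", myConfig{TxFee: 100, MinFee: 50})
	if err != nil {
		panic(err)
	}
	fmt.Println(cfg.TxFee, cfg.MinFee) // 1000 50
}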
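
lifecycle/ready.go gains AwaitReady, which blocks until either MarkReady is called or the supplied done channel closes. A small usage sketch follows; it assumes the NewChanReady constructor already referenced elsewhere in the hypersdk lifecycle package.

package main

import (
	"fmt"
	"time"

	"github.com/ava-labs/hypersdk/lifecycle"
)

func main() {
	ready := lifecycle.NewChanReady()
	shutdown := make(chan struct{})

	// Simulate state sync finishing shortly after startup.
	go func() {
		time.Sleep(100 * time.Millisecond)
		ready.MarkReady()
	}()

	// Blocks until MarkReady fires or shutdown closes; the return value
	// reports which of the two happened first.
	if ready.AwaitReady(shutdown) {
		fmt.Println("VM is ready")
	} else {
		fmt.Println("shut down before becoming ready")
	}
}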
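
statesync/client.go replaces the monolithic merkle client with a pluggable Syncer[T] interface (Start, Wait, UpdateSyncTarget, Close) that NewAggregateClient drives. The sketch below, assuming only the interface shape shown in the diff, implements a no-op syncer that reports completion as soon as it is started.

package statesyncexample

import (
	"context"

	"github.com/ava-labs/hypersdk/statesync"
)

// noopSyncer is an illustrative Syncer with no work to do: it signals
// completion the moment Start is called.
type noopSyncer[T any] struct {
	done chan struct{}
}

func newNoopSyncer[T any]() *noopSyncer[T] {
	return &noopSyncer[T]{done: make(chan struct{})}
}

func (s *noopSyncer[T]) Start(context.Context, T) error {
	close(s.done) // nothing to fetch, so Wait unblocks immediately
	return nil
}

func (s *noopSyncer[T]) Wait(ctx context.Context) error {
	select {
	case <-s.done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func (s *noopSyncer[T]) Close() error { return nil }

func (s *noopSyncer[T]) UpdateSyncTarget(context.Context, T) error { return nil }

// Compile-time check against the interface introduced in this diff.
var _ statesync.Syncer[struct{}] = (*noopSyncer[struct{}])(nil)

An implementation like this would simply be appended to the syncers slice handed to NewAggregateClient, the same way vm/statesync.go registers MerkleSyncer.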
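
The new BlockWindowSyncer wraps any BlockSyncer whose Accept reports when a full block window has been seen; Wait unblocks once that happens, fed either by Start or by later UpdateSyncTarget calls. A toy sketch, with a hypothetical counter-based BlockSyncer standing in for the validity-window syncer that the vm package adapts:

package main

import (
	"context"
	"fmt"

	"github.com/ava-labs/hypersdk/statesync"
)

// countingSyncer is a stand-in BlockSyncer: it reports completion once it has
// accepted `need` blocks.
type countingSyncer struct {
	need, seen int
}

func (c *countingSyncer) Accept(_ context.Context, _ uint64) (bool, error) {
	c.seen++
	return c.seen >= c.need, nil
}

func main() {
	ctx := context.Background()
	ws := statesync.NewBlockWindowSyncer[uint64](&countingSyncer{need: 3})

	_ = ws.Start(ctx, 1)            // first block of the window
	_ = ws.UpdateSyncTarget(ctx, 2) // not enough yet
	_ = ws.UpdateSyncTarget(ctx, 3) // third Accept returns done=true, closing the done channel

	if err := ws.Wait(ctx); err != nil { // returns immediately: the window is already complete
		panic(err)
	}
	fmt.Println("block window filled")
}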