Skip to content

Commit

Permalink
WIP: add state sync integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronbuchwald committed Dec 23, 2024
1 parent 7f7afd2 commit 78079a2
Show file tree
Hide file tree
Showing 16 changed files with 737 additions and 575 deletions.
76 changes: 65 additions & 11 deletions context/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,76 @@
package context

import (
"encoding/json"
"testing"

"github.com/stretchr/testify/require"
)

func TestConfig(t *testing.T) {
r := require.New(t)
c := make(Config)
type testConfig struct {
TxFee uint64 `json:"txFee"`
MinFee uint64 `json:"minFee"`
}

type VMConfig struct {
TxFee uint64 `json:"txFee"`
func TestConfigC(t *testing.T) {
type test struct {
name string
providedStr string
defaultConfig testConfig
wantConfig testConfig
}
for _, test := range []test{
{
name: "default want non-zero values",
providedStr: "",
defaultConfig: testConfig{TxFee: 100},
wantConfig: testConfig{TxFee: 100},
},
{
name: "default want zero values",
providedStr: "",
defaultConfig: testConfig{},
wantConfig: testConfig{},
},
{
name: "override default with zero values",
providedStr: `{
"test": {
"txFee": 0,
"minFee": 0
}
}`,
defaultConfig: testConfig{TxFee: 100, MinFee: 100},
wantConfig: testConfig{TxFee: 0, MinFee: 0},
},
{
name: "override non-zero defaults",
providedStr: `{
"test": {
"txFee": 1000,
"minFee": 1000
}
}`,
defaultConfig: testConfig{TxFee: 100, MinFee: 100},
wantConfig: testConfig{TxFee: 1000, MinFee: 1000},
},
{
name: "override one default value",
providedStr: `{
"test": {
"txFee": 1000
}
}`,
defaultConfig: testConfig{TxFee: 100, MinFee: 100},
wantConfig: testConfig{TxFee: 1000, MinFee: 100},
},
} {
t.Run(test.name, func(t *testing.T) {
r := require.New(t)
c, err := NewConfig([]byte(test.providedStr))
r.NoError(err)
testConfig, err := GetConfig(c, "test", test.defaultConfig)
r.NoError(err)
r.Equal(test.wantConfig, testConfig)
})
}
configStr := `{"vm": {"txFee": 1000}}`
r.NoError(json.Unmarshal([]byte(configStr), &c))
vmConfig, err := GetConfig(c, "vm", VMConfig{})
r.NoError(err)
r.Equal(uint64(1000), vmConfig.TxFee)
}
3 changes: 1 addition & 2 deletions examples/morpheusvm/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
github.com/nbutton23/zxcvbn-go v0.0.0-20180912185939-ae427f1e4c1d // indirect
github.com/neilotoole/errgroup v0.1.6 // indirect
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/openzipkin/zipkin-go v0.4.1 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.0.5 // indirect
github.com/pires/go-proxyproto v0.6.2 // indirect
Expand Down Expand Up @@ -138,7 +138,6 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.22.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.22.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.22.0 // indirect
go.opentelemetry.io/otel/exporters/zipkin v1.11.2 // indirect
go.opentelemetry.io/otel/metric v1.22.0 // indirect
go.opentelemetry.io/otel/sdk v1.22.0 // indirect
go.opentelemetry.io/otel/trace v1.22.0 // indirect
Expand Down
9 changes: 9 additions & 0 deletions lifecycle/ready.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ func (c *ChanReady) Ready() bool {
}
}

func (c *ChanReady) AwaitReady(done <-chan struct{}) bool {
select {
case <-c.ready:
return true
case <-done:
return false
}
}

func (c *ChanReady) MarkReady() {
c.readyOnce.Do(func() { close(c.ready) })
}
Expand Down
9 changes: 6 additions & 3 deletions snow/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (

"github.com/ava-labs/avalanchego/api/health"
"github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/snow/engine/snowman/block"
"github.com/ava-labs/hypersdk/event"
"github.com/ava-labs/hypersdk/lifecycle"
"github.com/ava-labs/hypersdk/statesync"
)

type Application[I Block, O Block, A Block] struct {
Expand All @@ -21,8 +21,7 @@ type Application[I Block, O Block, A Block] struct {
Handlers map[string]http.Handler
HealthChecker health.Checker
Network *p2p.Network
StateSyncClient *statesync.Client[*StatefulBlock[I, O, A]]
StateSyncServer *statesync.Server[*StatefulBlock[I, O, A]]
StateSyncableVM block.StateSyncableVM
Closers []func() error

Ready *lifecycle.AtomicBoolReady
Expand All @@ -36,6 +35,10 @@ type Application[I Block, O Block, A Block] struct {
PreReadyAcceptedSubs []event.Subscription[I]
}

func (a *Application[I, O, A]) GetCovariantVM() *CovariantVM[I, O, A] {
return a.vm.covariantVM
}

func (a *Application[I, O, A]) WithAcceptedSub(sub ...event.Subscription[A]) {
a.AcceptedSubs = append(a.AcceptedSubs, sub...)
}
Expand Down
86 changes: 7 additions & 79 deletions snow/statesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,85 +7,13 @@ import (
"context"
"fmt"

"github.com/ava-labs/avalanchego/database"
"github.com/ava-labs/avalanchego/snow/engine/snowman/block"
"github.com/ava-labs/avalanchego/x/merkledb"
hcontext "github.com/ava-labs/hypersdk/context"
"github.com/ava-labs/hypersdk/statesync"
"github.com/prometheus/client_golang/prometheus"
)

var _ block.StateSyncableVM = (*VM[Block, Block, Block])(nil)

func (a *Application[I, O, A]) WithStateSyncableVM(
client *statesync.Client[*StatefulBlock[I, O, A]],
server *statesync.Server[*StatefulBlock[I, O, A]],
) {
a.StateSyncClient = client
a.StateSyncServer = server
}

type StateSyncConfig struct {
MinBlocks uint64 `json:"minBlocks"`
Parallelism int `json:"parallelism"`
}

func NewDefaultStateSyncConfig() StateSyncConfig {
return StateSyncConfig{
MinBlocks: 768,
Parallelism: 4,
}
}

func GetStateSyncConfig(ctx *hcontext.Context) (StateSyncConfig, error) {
return hcontext.GetConfigFromContext(ctx, "statesync", NewDefaultStateSyncConfig())
}

func (a *Application[I, O, A]) WithStateSyncer(
db database.Database,
stateDB merkledb.MerkleDB,
rangeProofHandlerID uint64,
changeProofHandlerID uint64,
branchFactor merkledb.BranchFactor,
) error {
server := statesync.NewServer[*StatefulBlock[I, O, A]](
a.vm.log,
a.vm.covariantVM,
)
a.StateSyncServer = server

syncerRegistry := prometheus.NewRegistry()
if err := a.vm.snowCtx.Metrics.Register("syncer", syncerRegistry); err != nil {
return err
}
stateSyncConfig, err := GetStateSyncConfig(a.vm.hctx)
if err != nil {
return err
}
client := statesync.NewClient[*StatefulBlock[I, O, A]](
a.vm.covariantVM,
a.vm.snowCtx.Log,
syncerRegistry,
db,
stateDB,
a.Network,
rangeProofHandlerID,
changeProofHandlerID,
branchFactor,
stateSyncConfig.MinBlocks,
stateSyncConfig.Parallelism,
)
a.StateSyncClient = client
a.OnNormalOperationStarted = append(a.OnNormalOperationStarted, client.StartBootstrapping)
// Note: this is not perfect because we may need to get a notification of a block between finishing state sync
// and when the engine/VM has received the notification and switched over.
// a.WithPreReadyAcceptedSub(event.SubscriptionFunc[I]{
// NotifyF: func(ctx context.Context, block I) error {
// _, err := client.UpdateSyncTarget(block)
// return err
// },
// })
return statesync.RegisterHandlers(a.vm.log, a.Network, rangeProofHandlerID, changeProofHandlerID, stateDB)
func (a *Application[I, O, A]) WithStateSyncableVM(stateSyncableVM block.StateSyncableVM) {
a.StateSyncableVM = stateSyncableVM
}

// StartStateSync marks the VM as "not ready" so that blocks are verified / accepted vaccuously
Expand Down Expand Up @@ -141,21 +69,21 @@ func (v *VM[I, O, A]) FinishStateSync(ctx context.Context, input I, output O, ac
}

func (v *VM[I, O, A]) StateSyncEnabled(ctx context.Context) (bool, error) {
return v.app.StateSyncClient.StateSyncEnabled(ctx)
return v.app.StateSyncableVM.StateSyncEnabled(ctx)
}

func (v *VM[I, O, A]) GetOngoingSyncStateSummary(ctx context.Context) (block.StateSummary, error) {
return v.app.StateSyncClient.GetOngoingSyncStateSummary(ctx)
return v.app.StateSyncableVM.GetOngoingSyncStateSummary(ctx)
}

func (v *VM[I, O, A]) GetLastStateSummary(ctx context.Context) (block.StateSummary, error) {
return v.app.StateSyncServer.GetLastStateSummary(ctx)
return v.app.StateSyncableVM.GetLastStateSummary(ctx)
}

func (v *VM[I, O, A]) ParseStateSummary(ctx context.Context, summaryBytes []byte) (block.StateSummary, error) {
return v.app.StateSyncClient.ParseStateSummary(ctx, summaryBytes)
return v.app.StateSyncableVM.ParseStateSummary(ctx, summaryBytes)
}

func (v *VM[I, O, A]) GetStateSummary(ctx context.Context, summaryHeight uint64) (block.StateSummary, error) {
return v.app.StateSyncServer.GetStateSummary(ctx, summaryHeight)
return v.app.StateSyncableVM.GetStateSummary(ctx, summaryHeight)
}
3 changes: 1 addition & 2 deletions snow/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/ava-labs/hypersdk/event"
"github.com/ava-labs/hypersdk/internal/cache"
"github.com/ava-labs/hypersdk/lifecycle"
"go.uber.org/zap"
)

var (
Expand Down Expand Up @@ -285,7 +284,7 @@ func (v *VM[I, O, A]) SetState(ctx context.Context, state snow.State) error {
}
return nil
case snow.NormalOp:
v.log.Info("Starting normal operation", zap.Bool("stateSyncStarted", v.app.StateSyncClient.Started()))
v.log.Info("Starting normal operation")
for _, startNormalOpF := range v.app.OnNormalOperationStarted {
if err := startNormalOpF(ctx); err != nil {
return err
Expand Down
9 changes: 2 additions & 7 deletions statesync/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@ type StateSummaryBlock interface {
Height() uint64
Bytes() []byte
GetStateRoot() ids.ID
AcceptSyncTarget(context.Context) error
}

type SyncableBlock[T StateSummaryBlock] struct {
container T
accepter Accepter[T] // accepter is nil if the SyncableBlock is constructed by the server
accepter *Client[T]
}

func NewSyncableBlock[T StateSummaryBlock](container T, accepter Accepter[T]) *SyncableBlock[T] {
func NewSyncableBlock[T StateSummaryBlock](container T, accepter *Client[T]) *SyncableBlock[T] {
return &SyncableBlock[T]{
container: container,
accepter: accepter,
Expand All @@ -50,10 +49,6 @@ func (sb *SyncableBlock[T]) Accept(ctx context.Context) (block.StateSyncMode, er
return sb.accepter.Accept(ctx, sb.container)
}

func (sb *SyncableBlock[T]) AcceptSyncTarget(ctx context.Context) error {
return sb.container.AcceptSyncTarget(ctx)
}

func (sb *SyncableBlock[T]) String() string {
return sb.container.String()
}
62 changes: 62 additions & 0 deletions statesync/block_window_syncer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright (C) 2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package statesync

import (
"context"
"fmt"
"sync"
)

var _ Syncer[interface{}] = (*BlockWindowSyncer[interface{}])(nil)

type BlockSyncer[T any] interface {
Accept(ctx context.Context, block T) (bool, error)
}

type BlockWindowSyncer[T any] struct {
syncer BlockSyncer[T]
doneOnce sync.Once
done chan struct{}
}

func NewBlockWindowSyncer[T any](syncer BlockSyncer[T]) *BlockWindowSyncer[T] {
return &BlockWindowSyncer[T]{
syncer: syncer,
done: make(chan struct{}),
}
}

func (b *BlockWindowSyncer[T]) Start(ctx context.Context, target T) error {
done, err := b.syncer.Accept(ctx, target)
if done {
b.doneOnce.Do(func() {
close(b.done)
})
}
return err
}

func (b *BlockWindowSyncer[T]) Wait(ctx context.Context) error {
select {
case <-b.done:
return nil
case <-ctx.Done():
return fmt.Errorf("failed to await full block window: %w", ctx.Err())
}
}

func (b *BlockWindowSyncer[T]) Close() error {
return nil
}

func (b *BlockWindowSyncer[T]) UpdateSyncTarget(ctx context.Context, target T) error {
done, err := b.syncer.Accept(ctx, target)
if done {
b.doneOnce.Do(func() {
close(b.done)
})
}
return err
}
Loading

0 comments on commit 78079a2

Please sign in to comment.