Skip to content

Commit

Permalink
Expose TrustPeriod flag for snapshots (#1063)
Browse files Browse the repository at this point in the history
* Expose TrustPeriod flag for snapshots

* fix lint issues
  • Loading branch information
charithabandi authored Oct 7, 2024
1 parent 301c5fa commit ca809d6
Show file tree
Hide file tree
Showing 10 changed files with 39 additions and 17 deletions.
1 change: 1 addition & 0 deletions cmd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ var DefaultConfig = func() *commonConfig.KwildConfig {
Enable: false,
DiscoveryTime: commonConfig.Duration(15 * time.Second),
ChunkRequestTimeout: commonConfig.Duration(10 * time.Second),
TrustPeriod: commonConfig.Duration(730 * time.Hour), // 1 month
},
Consensus: &commonConfig.ConsensusConfig{
TimeoutPropose: commonConfig.Duration(3 * time.Second),
Expand Down
4 changes: 4 additions & 0 deletions cmd/kwil-admin/nodecfg/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,4 +395,8 @@ chunk_request_timeout = "{{ .ChainConfig.StateSync.ChunkRequestTimeout }}"
# Note: If the requested chunk is not received for a duration of 2 minutes (hard-coded default),
# the state sync process is aborted and the node will fall back to the regular block sync process.
# Trust period is the duration for which the node trusts the state sync snapshots.
# Snapshots older than the trust period are considered to be expired and are not used for state sync.
trust_period = "{{ .ChainConfig.StateSync.TrustPeriod }}"
`
4 changes: 4 additions & 0 deletions cmd/kwild/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"os"
"path/filepath"
"testing"
"time"

"github.com/kwilteam/kwil-db/cmd"
"github.com/kwilteam/kwil-db/common/config"
Expand Down Expand Up @@ -45,6 +46,9 @@ func Test_Config_Toml(t *testing.T) {
assert.Equal(t, "9.8.7.6:20660", cfg.Instrumentation.PromListenAddr)

// TODO: Add bunch of other validations for different types
// 1h -> 3600s
assert.Equal(t, 1*time.Hour, cfg.ChainConfig.StateSync.TrustPeriod.Dur())
assert.Equal(t, config.Duration(1*time.Hour), cfg.ChainConfig.StateSync.TrustPeriod)
}

func Test_cleanListenAddr(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions cmd/kwild/config/default_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,10 @@ chunk_request_timeout = "10s"
# Note: If the requested chunk is not received for a duration of 2 minutes (hard-coded default),
# the state sync process is aborted and the node will fall back to the regular block sync process.

# Trust period is the duration for which the node trusts the state sync snapshots.
# Snapshots older than the trust period are considered to be expired and are not used for state sync.
trust_period = "36000h"

[instrumentation]

# collect and serve are served under /metrics
Expand Down
1 change: 1 addition & 0 deletions cmd/kwild/config/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ to instead run a dedicated seeder like https://github.com/kwilteam/cometseed.`))
flagSet.StringVar(&cfg.ChainConfig.StateSync.RPCServers, "chain.statesync.rpc-servers", cfg.ChainConfig.StateSync.RPCServers, "Chain state sync rpc servers")
flagSet.Var(&cfg.ChainConfig.StateSync.DiscoveryTime, "chain.statesync.discovery-time", "Chain state sync discovery time")
flagSet.Var(&cfg.ChainConfig.StateSync.ChunkRequestTimeout, "chain.statesync.chunk-request-timeout", "Chain state sync chunk request timeout")
flagSet.Var(&cfg.ChainConfig.StateSync.TrustPeriod, "chain.statesync.trust-period", "Duration for which the state sync snapshots are considered trustworthy.")

// Instrumentation flags
flagSet.BoolVar(&cfg.Instrumentation.Prometheus, "instrumentation.prometheus", cfg.Instrumentation.Prometheus, "collect and serve prometheus metrics")
Expand Down
1 change: 1 addition & 0 deletions cmd/kwild/config/test_data/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ chunk_request_timeout = "10s"

# Note: If the requested chunk is not received for a duration of 2 minutes (hard-coded default),
# the state sync process is aborted and the node will fall back to the regular block sync process.
trust_period = "1h"

[instrumentation]

Expand Down
22 changes: 14 additions & 8 deletions cmd/kwild/server/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -768,20 +768,30 @@ func buildStatesyncer(d *coreDependencies, db sql.ReadTxMaker) *statesync.StateS

providers := strings.Split(d.cfg.ChainConfig.StateSync.RPCServers, ",")

if len(providers) == 0 {
if len(providers) == 0 || d.cfg.ChainConfig.StateSync.RPCServers == "" {
failBuild(nil, "failed to configure state syncer, no remote servers provided.")
}

if len(providers) == 1 {
// Duplicating the same provider to satisfy cometbft statesync requirement of having at least 2 providers.
// Statesynce module doesn't have the same requirements and
// can work with a single provider (providers are passed as is)
d.cfg.ChainConfig.StateSync.RPCServers += "," + providers[0]
providers[0] = strings.TrimSpace(providers[0])
d.cfg.ChainConfig.StateSync.RPCServers = providers[0] + "," + providers[0]
} else {
// sanitize the providers
for i, p := range providers {
providers[i] = strings.TrimSpace(p)
}
d.cfg.ChainConfig.StateSync.RPCServers = strings.Join(providers, ",")
}

// create state syncer
return statesync.NewStateSyncer(d.ctx, dbCfg, kwildcfg.ReceivedSnapshotsDir(d.cfg.RootDir),
providers, db, *d.log.Named("state-syncer"))
ss, err := statesync.NewStateSyncer(d.ctx, dbCfg, kwildcfg.ReceivedSnapshotsDir(d.cfg.RootDir), providers, db, *d.log.Named("state-syncer"))
if err != nil {
failBuild(err, "failed to build state syncer")
}
return ss
}

// retrieveLightClientTrustOptions fetches the trust options (Trusted Height and Hash) from the
Expand All @@ -791,10 +801,6 @@ func buildStatesyncer(d *coreDependencies, db sql.ReadTxMaker) *statesync.StateS
func retrieveLightClientTrustOptions(d *coreDependencies) (height int64, hash string, err error) {
providers := strings.Split(d.cfg.ChainConfig.StateSync.RPCServers, ",")

if len(providers) == 0 {
failBuild(nil, "failed to configure state syncer, no remote servers provided.")
}

configDone := false
for _, p := range providers {
clt, err := statesync.ChainRPCClient(p)
Expand Down
2 changes: 1 addition & 1 deletion cmd/kwild/server/cometbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func newCometConfig(cfg *config.KwildConfig) *cmtCfg.Config {
}

// Light client verification
nodeCfg.StateSync.TrustPeriod = 168 * time.Hour // 7 days
nodeCfg.StateSync.TrustPeriod = time.Duration(userChainCfg.StateSync.TrustPeriod)

// Standardize the paths.
nodeCfg.DBPath = cometbft.DataDir // i.e. "data", we do not allow users to change
Expand Down
4 changes: 4 additions & 0 deletions common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ type StateSyncConfig struct {
// The timeout duration before re-requesting a chunk, possibly from a different
// peer (default: 1 minute).
ChunkRequestTimeout Duration `mapstructure:"chunk_request_timeout"`

// Trust period is the duration for which the node trusts the state sync snapshots.
// Snapshots older than the trust period are considered to be expired and are not used for state sync.
TrustPeriod Duration `mapstructure:"trust_period"`
}

type ChainConfig struct {
Expand Down
13 changes: 5 additions & 8 deletions internal/statesync/statesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type StateSyncer struct {
// It takes the database configuration, snapshot directory, and the trusted snapshot providers.
// Trusted snapshot providers are special nodes in the network trusted by the nodes and
// have snapshot creation enabled. These nodes are responsible for creating and validating snapshots.
func NewStateSyncer(ctx context.Context, cfg *DBConfig, snapshotDir string, providers []string, db sql.ReadTxMaker, logger log.Logger) *StateSyncer {
func NewStateSyncer(ctx context.Context, cfg *DBConfig, snapshotDir string, providers []string, db sql.ReadTxMaker, logger log.Logger) (*StateSyncer, error) {

ss := &StateSyncer{
dbConfig: cfg,
Expand All @@ -73,23 +73,20 @@ func NewStateSyncer(ctx context.Context, cfg *DBConfig, snapshotDir string, prov
for _, s := range providers {
clt, err := ChainRPCClient(s)
if err != nil {
logger.Error("Failed to create rpc client", log.String("server", s), log.Error(err))
return nil
return nil, fmt.Errorf("failed to create rpc client: %s, error %w", s, err)
}
ss.snapshotProviders = append(ss.snapshotProviders, clt)
}

// Ensure that the snapshot directory exists and is empty
if err := os.RemoveAll(snapshotDir); err != nil {
logger.Error("Failed to delete snapshot directory", log.String("dir", snapshotDir), log.Error(err))
return nil
return nil, fmt.Errorf("failed to delete snapshot directory: %s with error: %w", snapshotDir, err)
}
if err := os.MkdirAll(snapshotDir, 0755); err != nil {
logger.Error("Failed to create snapshot directory", log.String("dir", snapshotDir), log.Error(err))
return nil
return nil, fmt.Errorf("failed to create snapshot directory: %s with error: %w", snapshotDir, err)
}

return ss
return ss, nil
}

// OfferSnapshot checks if the snapshot is valid and kicks off the state sync process
Expand Down

0 comments on commit ca809d6

Please sign in to comment.