Skip to content

Commit

Permalink
Use groupcache pool for state db access (#91)
Browse files Browse the repository at this point in the history
* Use groupcache pool for state db access

* Group cache config and logging stats on timer

* Integrate state validator into server

* Use tagged ipfs-ethdb

* groupcache config for tests

* Work around duplicate registration of groupcache error in tests

* Use tagged version of eth-ipfs-state-validator

* State validation command.

* Validator for static replicas to keep cache warm

* Update docker go-version and go.mod.

* Address comments and self review.

* Fix ipfs-ethdb version.

Co-authored-by: Arijit Das <arijitad.in@gmail.com>
  • Loading branch information
ashwinphatak and arijitAD authored Sep 21, 2021
1 parent 838ed03 commit 2de9c5b
Show file tree
Hide file tree
Showing 14 changed files with 1,208 additions and 101 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.13-alpine as builder
FROM golang:1.14-alpine as builder

RUN apk --update --no-cache add make git g++ linux-headers
# DEBUG
Expand Down
37 changes: 37 additions & 0 deletions cmd/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright © 2021 Vulcanize, Inc
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package cmd

import (
"github.com/spf13/cobra"
"github.com/spf13/viper"
)

func addDatabaseFlags(command *cobra.Command) {
// database flags
command.PersistentFlags().String("database-name", "vulcanize_public", "database name")
command.PersistentFlags().Int("database-port", 5432, "database port")
command.PersistentFlags().String("database-hostname", "localhost", "database hostname")
command.PersistentFlags().String("database-user", "", "database user")
command.PersistentFlags().String("database-password", "", "database password")

// database flag bindings
viper.BindPFlag("database.name", command.PersistentFlags().Lookup("database-name"))
viper.BindPFlag("database.port", command.PersistentFlags().Lookup("database-port"))
viper.BindPFlag("database.hostname", command.PersistentFlags().Lookup("database-hostname"))
viper.BindPFlag("database.user", command.PersistentFlags().Lookup("database-user"))
viper.BindPFlag("database.password", command.PersistentFlags().Lookup("database-password"))
}
133 changes: 118 additions & 15 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,16 @@ import (
"os/signal"
"strings"
"sync"
"time"

"github.com/ethereum/go-ethereum/rpc"
"github.com/mailgun/groupcache/v2"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/vulcanize/ipld-eth-server/pkg/eth"

"github.com/vulcanize/gap-filler/pkg/mux"

"github.com/vulcanize/ipld-eth-server/pkg/eth"
"github.com/vulcanize/ipld-eth-server/pkg/graphql"
srpc "github.com/vulcanize/ipld-eth-server/pkg/rpc"
s "github.com/vulcanize/ipld-eth-server/pkg/serve"
Expand Down Expand Up @@ -86,6 +88,18 @@ func serve() {
logWithCommand.Fatal(err)
}

err = startGroupCacheService(serverConfig)
if err != nil {
logWithCommand.Fatal(err)
}

if serverConfig.StateValidationEnabled {
go startStateTrieValidator(serverConfig, server)
logWithCommand.Info("state validator enabled")
} else {
logWithCommand.Info("state validator disabled")
}

shutdown := make(chan os.Signal)
signal.Notify(shutdown, os.Interrupt)
<-shutdown
Expand Down Expand Up @@ -200,6 +214,83 @@ func startIpldGraphQL(settings *s.Config) error {
return nil
}

func startGroupCacheService(settings *s.Config) error {
gcc := settings.GroupCache

if gcc.Pool.Enabled {
logWithCommand.Info("starting up groupcache pool HTTTP server")

pool := groupcache.NewHTTPPoolOpts(gcc.Pool.HttpEndpoint, &groupcache.HTTPPoolOptions{})
pool.Set(gcc.Pool.PeerHttpEndpoints...)

httpURL, err := url.Parse(gcc.Pool.HttpEndpoint)
if err != nil {
return err
}

server := http.Server{
Addr: httpURL.Host,
Handler: pool,
}

// Start a HTTP server to listen for peer requests from the groupcache
go server.ListenAndServe()

logWithCommand.Infof("groupcache pool endpoint opened for url %s", httpURL)
} else {
logWithCommand.Info("Groupcache pool is disabled")
}

return nil
}

func startStateTrieValidator(config *s.Config, server s.Server) {
validateEveryNthBlock := config.StateValidationEveryNthBlock

var lastBlockNumber uint64
backend := server.Backend()

for {
time.Sleep(5 * time.Second)

block, err := backend.CurrentBlock()
if err != nil {
log.Errorln("Error fetching current block for state trie validator")
continue
}

stateRoot := block.Root()
blockNumber := block.NumberU64()
blockHash := block.Hash()

if validateEveryNthBlock <= 0 || // Used for static replicas where block number doesn't progress.
(blockNumber > lastBlockNumber) && (blockNumber%validateEveryNthBlock == 0) {

// The validate trie call will take a long time on mainnet, e.g. a few hours.
if err = backend.ValidateTrie(stateRoot); err != nil {
log.Fatalf("Error validating trie for block number %d hash %s state root %s",
blockNumber,
blockHash,
stateRoot,
)
}

log.Infof("Successfully validated trie for block number %d hash %s state root %s",
blockNumber,
blockHash,
stateRoot,
)

if validateEveryNthBlock <= 0 {
// Static replica, sleep a long-ish time (1/2 of cache expiry time) since we only need to keep the cache warm.
time.Sleep((time.Minute * time.Duration(config.GroupCache.StateDB.CacheExpiryInMins)) / 2)
}

lastBlockNumber = blockNumber
}
}
}

func parseRpcAddresses(value string) ([]*rpc.Client, error) {
rpcAddresses := strings.Split(value, ",")
rpcClients := make([]*rpc.Client, 0, len(rpcAddresses))
Expand All @@ -224,12 +315,7 @@ func parseRpcAddresses(value string) ([]*rpc.Client, error) {
func init() {
rootCmd.AddCommand(serveCmd)

// database credentials
serveCmd.PersistentFlags().String("database-name", "vulcanize_public", "database name")
serveCmd.PersistentFlags().Int("database-port", 5432, "database port")
serveCmd.PersistentFlags().String("database-hostname", "localhost", "database hostname")
serveCmd.PersistentFlags().String("database-user", "", "database user")
serveCmd.PersistentFlags().String("database-password", "", "database password")
addDatabaseFlags(serveCmd)

// flags for all config variables
// eth graphql and json-rpc parameters
Expand Down Expand Up @@ -260,14 +346,19 @@ func init() {
serveCmd.PersistentFlags().String("eth-chain-config", "", "json chain config file location")
serveCmd.PersistentFlags().Bool("eth-supports-state-diff", false, "whether or not the proxy ethereum client supports statediffing endpoints")

// and their bindings
// database
viper.BindPFlag("database.name", serveCmd.PersistentFlags().Lookup("database-name"))
viper.BindPFlag("database.port", serveCmd.PersistentFlags().Lookup("database-port"))
viper.BindPFlag("database.hostname", serveCmd.PersistentFlags().Lookup("database-hostname"))
viper.BindPFlag("database.user", serveCmd.PersistentFlags().Lookup("database-user"))
viper.BindPFlag("database.password", serveCmd.PersistentFlags().Lookup("database-password"))
// groupcache flags
serveCmd.PersistentFlags().Bool("gcache-pool-enabled", false, "turn on the groupcache pool")
serveCmd.PersistentFlags().String("gcache-pool-http-path", "", "http url for groupcache node")
serveCmd.PersistentFlags().StringArray("gcache-pool-http-peers", []string{}, "http urls for groupcache peers")
serveCmd.PersistentFlags().Int("gcache-statedb-cache-size", 16, "state DB cache size in MB")
serveCmd.PersistentFlags().Int("gcache-statedb-cache-expiry", 60, "state DB cache expiry time in mins")
serveCmd.PersistentFlags().Int("gcache-statedb-log-stats-interval", 60, "state DB cache stats log interval in secs")

// state validator flags
serveCmd.PersistentFlags().Bool("validator-enabled", false, "turn on the state validator")
serveCmd.PersistentFlags().Uint("validator-every-nth-block", 1500, "only validate every Nth block")

// and their bindings
// eth graphql server
viper.BindPFlag("eth.server.graphql", serveCmd.PersistentFlags().Lookup("eth-server-graphql"))
viper.BindPFlag("eth.server.graphqlPath", serveCmd.PersistentFlags().Lookup("eth-server-graphql-path"))
Expand Down Expand Up @@ -301,4 +392,16 @@ func init() {
viper.BindPFlag("ethereum.rpcGasCap", serveCmd.PersistentFlags().Lookup("eth-rpc-gas-cap"))
viper.BindPFlag("ethereum.chainConfig", serveCmd.PersistentFlags().Lookup("eth-chain-config"))
viper.BindPFlag("ethereum.supportsStateDiff", serveCmd.PersistentFlags().Lookup("eth-supports-state-diff"))

// groupcache flags
viper.BindPFlag("groupcache.pool.enabled", serveCmd.PersistentFlags().Lookup("gcache-pool-enabled"))
viper.BindPFlag("groupcache.pool.httpEndpoint", serveCmd.PersistentFlags().Lookup("gcache-pool-http-path"))
viper.BindPFlag("groupcache.pool.peerHttpEndpoints", serveCmd.PersistentFlags().Lookup("gcache-pool-http-peers"))
viper.BindPFlag("groupcache.statedb.cacheSizeInMB", serveCmd.PersistentFlags().Lookup("gcache-statedb-cache-size"))
viper.BindPFlag("groupcache.statedb.cacheExpiryInMins", serveCmd.PersistentFlags().Lookup("gcache-statedb-cache-expiry"))
viper.BindPFlag("groupcache.statedb.logStatsIntervalInSecs", serveCmd.PersistentFlags().Lookup("gcache-statedb-log-stats-interval"))

// state validator flags
viper.BindPFlag("validator.enabled", serveCmd.PersistentFlags().Lookup("validator-enabled"))
viper.BindPFlag("validator.everyNthBlock", serveCmd.PersistentFlags().Lookup("validator-every-nth-block"))
}
87 changes: 87 additions & 0 deletions cmd/validate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright © 2021 Vulcanize, Inc
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package cmd

import (
"time"

"github.com/ethereum/go-ethereum/common"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
validator "github.com/vulcanize/eth-ipfs-state-validator/pkg"
ipfsethdb "github.com/vulcanize/ipfs-ethdb/postgres"

s "github.com/vulcanize/ipld-eth-server/pkg/serve"
)

const GroupName = "statedb-validate"
const CacheExpiryInMins = 8 * 60 // 8 hours
const CacheSizeInMB = 16 // 16 MB

var validateCmd = &cobra.Command{
Use: "validate",
Short: "valdiate state",
Long: `This command validates the trie for the given state root`,
Run: func(cmd *cobra.Command, args []string) {
subCommand = cmd.CalledAs()
logWithCommand = *log.WithField("SubCommand", subCommand)
validate()
},
}

func validate() {
config, err := s.NewConfig()
if err != nil {
logWithCommand.Fatal(err)
}

stateRootStr := viper.GetString("stateRoot")
if stateRootStr == "" {
logWithCommand.Fatal("must provide a state root for state validation")
}

stateRoot := common.HexToHash(stateRootStr)
cacheSize := viper.GetInt("cacheSize")

ethDB := ipfsethdb.NewDatabase(config.DB.DB, ipfsethdb.CacheConfig{
Name: GroupName,
Size: cacheSize * 1024 * 1024,
ExpiryDuration: time.Minute * time.Duration(CacheExpiryInMins),
})

validator := validator.NewValidator(nil, ethDB)
if err = validator.ValidateTrie(stateRoot); err != nil {
log.Fatalln("Error validating state root")
}

stats := ethDB.GetCacheStats()
log.Debugf("groupcache stats %+v", stats)

log.Infoln("Successfully validated state root")
}

func init() {
rootCmd.AddCommand(validateCmd)

addDatabaseFlags(validateCmd)

validateCmd.PersistentFlags().String("state-root", "", "root of the state trie we wish to validate")
viper.BindPFlag("stateRoot", validateCmd.PersistentFlags().Lookup("state-root"))

validateCmd.PersistentFlags().Int("cache-size", CacheSizeInMB, "cache size in MB")
viper.BindPFlag("cacheSize", validateCmd.PersistentFlags().Lookup("cache-size"))
}
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,20 @@ require (
github.com/jmoiron/sqlx v1.2.0
github.com/lib/pq v1.10.2
github.com/machinebox/graphql v0.2.2
github.com/mailgun/groupcache/v2 v2.2.1
github.com/matryer/is v1.4.0 // indirect
github.com/multiformats/go-multihash v0.0.14
github.com/onsi/ginkgo v1.16.2
github.com/onsi/gomega v1.10.1
github.com/prometheus/client_golang v1.5.1
github.com/prometheus/client_golang v1.7.1
github.com/shirou/gopsutil v3.21.5+incompatible // indirect
github.com/sirupsen/logrus v1.7.0
github.com/spf13/cobra v1.1.1
github.com/spf13/viper v1.7.0
github.com/tklauser/go-sysconf v0.3.6 // indirect
github.com/vulcanize/eth-ipfs-state-validator v0.0.1
github.com/vulcanize/gap-filler v0.3.1
github.com/vulcanize/ipfs-ethdb v0.0.4-0.20210824131459-7bb49801fc12
github.com/vulcanize/ipfs-ethdb v0.0.4
)

replace github.com/ethereum/go-ethereum v1.10.8 => github.com/vulcanize/go-ethereum v1.10.8-statediff-0.0.26

replace github.com/vulcanize/ipfs-ethdb v0.0.2-alpha => github.com/vulcanize/pg-ipfs-ethdb v0.0.2-alpha
Loading

0 comments on commit 2de9c5b

Please sign in to comment.