Skip to content
This repository has been archived by the owner on Oct 11, 2024. It is now read-only.

Commit

Permalink
Persistent DHT and Peerstore (#907)
Browse files Browse the repository at this point in the history
* Implemented persistent datastore for peer and dht information in golang

* Implemented persistent key value store for the browser environment

* Finished the persistent storage solution

* Fixed all outstanding discrepancies between the dexie ds implementation and the tests

* Finalized new key store implementation

* Addressed issues uncovered during a personal review

* Addressed review feedback from @recmo

* Changed comment to prompt `ci/circleci:build`
  • Loading branch information
jalextowle authored Aug 25, 2020
1 parent 0677443 commit d5e4236
Show file tree
Hide file tree
Showing 21 changed files with 811 additions and 189 deletions.
14 changes: 14 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,20 @@ test-go-parallel:
go test ./... -race -timeout 30s


.PHONY: test-key-value-stores
test-key-value-stores: test-key-value-stores-go test-key-value-stores-wasm


.PHONY: test-key-value-stores-go
test-key-value-stores-go:
ENABLE_KEY_VALUE_TESTS=true go test ./db


.PHONY: test-key-value-stores-wasm
test-key-value-stores-wasm:
WASM_INIT_FILE="$$(pwd)/packages/mesh-browser-shim/dist/browser_shim.js" GOOS=js GOARCH=wasm ENABLE_KEY_VALUE_TESTS=true go test ./db -timeout 30m -tags=browser -exec="$$GOPATH/bin/wasmbrowsertest"


.PHONY: test-go-serial
test-go-serial:
go test ./zeroex/ordervalidator ./zeroex/orderwatch ./core -race -timeout 90s -p=1 --serial
Expand Down
97 changes: 18 additions & 79 deletions cmd/mesh-bootstrap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/0xProject/0x-mesh/loghooks"
"github.com/0xProject/0x-mesh/p2p"
"github.com/0xProject/0x-mesh/p2p/banner"
sqlds "github.com/0xProject/sql-datastore"
"github.com/ipfs/go-datastore"
leveldbStore "github.com/ipfs/go-ds-leveldb"
libp2p "github.com/libp2p/go-libp2p"
Expand All @@ -29,7 +28,6 @@ import (
"github.com/libp2p/go-libp2p-core/routing"
dht "github.com/libp2p/go-libp2p-kad-dht"
dhtopts "github.com/libp2p/go-libp2p-kad-dht/opts"
peerstore "github.com/libp2p/go-libp2p-peerstore"
"github.com/libp2p/go-libp2p-peerstore/pstoreds"
"github.com/libp2p/go-libp2p/p2p/host/relay"
filter "github.com/libp2p/go-maddr-filter"
Expand Down Expand Up @@ -77,29 +75,6 @@ type Config struct {
DataStoreType string `envvar:"DATA_STORE_TYPE" default:"leveldb"`
// LevelDBDataDir is the directory used for storing data when using leveldb as data store type.
LevelDBDataDir string `envvar:"LEVELDB_DATA_DIR" default:"0x_mesh"`
// SQLDBConnectionString is the connection URL used to connect to the
// database.
// NOTE: When set it has precedence over SQL_DB_HOST, SQL_DB_PORT etc.
SQLDBConnectionString string `envvar:"SQL_DB_CONNECTION_STRING" default:"" json:"-"`
// SQLDBHost is the database host used to connect to the database when
// using postgres as data store type.
SQLDBHost string `envvar:"SQL_DB_HOST" default:"localhost" json:"-"`
// SQLDBPort is the database port used to connect to the database when
// using postgres as data store type.
SQLDBPort string `envvar:"SQL_DB_PORT" default:"5432" json:"-"`
// SQLDBUser is the database user used to connect to the database when
// using postgres as data store type.
SQLDBUser string `envvar:"SQL_DB_USER" default:"postgres" json:"-"`
// SQLDBPassword is the database password used to connect to the database when
// using postgres as data store type.
SQLDBPassword string `envvar:"SQL_DB_PASSWORD" default:"" json:"-"`
// SQLDBName is the database name to connect to when using
// postgres as data store type.
SQLDBName string `envvar:"SQL_DB_NAME" default:"datastore" json:"-"`
// SQLDBEngine is the underyling database engine to use as the
// database driver.
// NOTE: Currently only `postgres` driver is supported.
SQLDBEngine string `envvar:"SQL_DB_ENGINE" default:"postgres"`
// UseBootstrapList determines whether or not to use the list of hard-coded
// peers to bootstrap the DHT for peer discovery.
UseBootstrapList bool `envvar:"USE_BOOTSTRAP_LIST" default:"true"`
Expand Down Expand Up @@ -162,66 +137,30 @@ func main() {
var kadDHT *dht.IpfsDHT
var newDHT func(h host.Host) (routing.PeerRouting, error)

var peerStore peerstore.Peerstore

// TODO(oskar) - Figure out why returning an anonymous function from
// getNewDHT() is making kadDHT.runBootstrap panicing.
// When solved this big switch case can be removed from main()
switch config.DataStoreType {
case leveldbDataStore:
newDHT = func(h host.Host) (routing.PeerRouting, error) {
var err error
dhtDir := getDHTDir(config)
kadDHT, err = p2p.NewDHT(ctx, dhtDir, h)
if err != nil {
log.WithField("error", err).Fatal("could not create DHT")
}
return kadDHT, err
}

// Set up the peerstore to use LevelDB.
store, err := leveldbStore.NewDatastore(getPeerstoreDir(config), nil)
newDHT = func(h host.Host) (routing.PeerRouting, error) {
var err error
dhtDir := getDHTDir(config)
// Set up the DHT to use LevelDB.
store, err := leveldbStore.NewDatastore(dhtDir, nil)
if err != nil {
log.Fatal(err)
}

peerStore, err = pstoreds.NewPeerstore(ctx, store, pstoreds.DefaultOpts())
if err != nil {
log.Fatal(err)
}

case sqlDataStore:
db, err := getSQLDatabase(config)
if err != nil {
log.WithField("error", err).Fatal("could not create SQL database")
}

err = prepareSQLDatabase(db)
if err != nil {
log.WithField("error", err).Fatal("failed to repare SQL tables for datastores")
}

newDHT = func(h host.Host) (routing.PeerRouting, error) {
var err error
dstore := sqlds.NewDatastore(db, sqlds.NewQueriesForTable(dhtTableName))

kadDHT, err = NewDHTWithDatastore(ctx, dstore, h)
if err != nil {
log.WithField("error", err).Fatal("could not create DHT")
}

return kadDHT, err
return nil, err
}

pstore := sqlds.NewDatastore(db, sqlds.NewQueriesForTable(peerStoreTableName))
peerStore, err = pstoreds.NewPeerstore(ctx, pstore, pstoreds.DefaultOpts())
kadDHT, err = dht.New(ctx, h, dhtopts.Datastore(store), dhtopts.Protocols(p2p.DHTProtocolID))
if err != nil {
log.WithField("error", err).Fatal("could not create peerStore")
log.WithField("error", err).Fatal("could not create DHT")
}
return kadDHT, err
}

default:
log.Fatalf("invalid datastore configured: %s. Expected either %s or %s", config.DataStoreType, leveldbDataStore, sqlDataStore)
// Set up the peerstore to use LevelDB.
store, err := leveldbStore.NewDatastore(getPeerstoreDir(config), nil)
if err != nil {
log.Fatal(err)
}

peerStore, err := pstoreds.NewPeerstore(ctx, store, pstoreds.DefaultOpts())
if err != nil {
log.Fatal(err)
}

// Parse multiaddresses given in the config
Expand Down
37 changes: 0 additions & 37 deletions cmd/mesh-bootstrap/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@
package main

import (
"database/sql"
"errors"
"fmt"
"os"
"path/filepath"

Expand Down Expand Up @@ -33,40 +30,6 @@ func getPeerstoreDir(config Config) string {
return filepath.Join(config.LevelDBDataDir, "p2p", "peerstore")
}

func getSQLDatabase(config Config) (*sql.DB, error) {
// Currently we only support the postgres driver.
if config.SQLDBEngine != "postgres" {
return nil, errors.New("sqld currently only supports postgres driver")
}

if config.SQLDBConnectionString != "" {
return sql.Open(config.SQLDBEngine, config.SQLDBConnectionString)
}

fmtStr := "postgresql:///%s?host=%s&port=%s&user=%s&password=%s&sslmode=disable"
connstr := fmt.Sprintf(fmtStr, config.SQLDBName, config.SQLDBHost, config.SQLDBPort, config.SQLDBUser, config.SQLDBPassword)

return sql.Open(config.SQLDBEngine, connstr)
}

func prepareSQLDatabase(db *sql.DB) error {
createTableString := "CREATE TABLE IF NOT EXISTS %s (key TEXT NOT NULL UNIQUE, data BYTEA NOT NULL)"
createDHTTable := fmt.Sprintf(createTableString, dhtTableName)
createPeerStoreTable := fmt.Sprintf(createTableString, peerStoreTableName)

_, err := db.Exec(createDHTTable)
if err != nil {
return err
}

_, err = db.Exec(createPeerStoreTable)
if err != nil {
return err
}

return nil
}

func initPrivateKey(path string) (p2pcrypto.PrivKey, error) {
privKey, err := keys.GetPrivateKeyFromPath(path)
if err == nil {
Expand Down
2 changes: 1 addition & 1 deletion core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ func (app *App) Start() error {
RendezvousPoints: rendezvousPoints,
UseBootstrapList: app.config.UseBootstrapList,
BootstrapList: bootstrapList,
DataDir: filepath.Join(app.config.DataDir, "p2p"),
DB: app.db,
CustomMessageValidator: app.orderFilter.ValidatePubSubMessage,
MaxBytesPerSecond: app.config.MaxBytesPerSecond,
}
Expand Down
4 changes: 2 additions & 2 deletions core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func TestOrderSync(t *testing.T) {
}
for i, testCase := range testCases {
testCaseName := fmt.Sprintf("%s (test case %d)", testCase.name, i)
t.Run(testCaseName, runOrdersyncTestCase(t, testCase))
t.Run(testCaseName, runOrdersyncTestCase(testCase))
}
}

Expand All @@ -259,7 +259,7 @@ type ordersyncTestCase struct {

const defaultOrderFilter = "{}"

func runOrdersyncTestCase(t *testing.T, testCase ordersyncTestCase) func(t *testing.T) {
func runOrdersyncTestCase(testCase ordersyncTestCase) func(t *testing.T) {
return func(t *testing.T) {
teardownSubTest := setupSubTest(t)
defer teardownSubTest(t)
Expand Down
5 changes: 4 additions & 1 deletion core/ordersync/ordersync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/0xProject/0x-mesh/constants"
"github.com/0xProject/0x-mesh/db"
"github.com/0xProject/0x-mesh/ethereum"
"github.com/0xProject/0x-mesh/p2p"
"github.com/0xProject/0x-mesh/scenario"
Expand All @@ -30,12 +31,14 @@ func TestCalculateDelayWithJitters(t *testing.T) {
}

func TestHandleRawRequest(t *testing.T) {
db, err := db.New(context.Background(), db.TestOptions())
require.NoError(t, err)
n, err := p2p.New(
context.Background(),
p2p.Config{
MessageHandler: &noopMessageHandler{},
RendezvousPoints: []string{"/test-rendezvous-point"},
DataDir: "/tmp",
DB: db,
},
)
require.NoError(t, err)
Expand Down
3 changes: 3 additions & 0 deletions db/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/0xProject/0x-mesh/zeroex"
"github.com/ethereum/go-ethereum/common"
"github.com/gibson042/canonicaljson-go"
ds "github.com/ipfs/go-datastore"
)

const (
Expand Down Expand Up @@ -48,6 +49,8 @@ type Database interface {
GetMetadata() (*types.Metadata, error)
SaveMetadata(metadata *types.Metadata) error
UpdateMetadata(updateFunc func(oldmetadata *types.Metadata) (newMetadata *types.Metadata)) error
PeerStore() ds.Batching
DHTStore() ds.Batching
}

type Options struct {
Expand Down
84 changes: 84 additions & 0 deletions db/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,32 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/math"
ethtypes "github.com/ethereum/go-ethereum/core/types"
dstest "github.com/ipfs/go-datastore/test"
"github.com/plaid/go-envvar/envvar"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var contractAddresses = ethereum.GanacheAddresses

// Since the key value store tests take a very long time to run, we disable them
// by default. These tests can be enabled with the `ENABLE_KEY_VALUE_TESTS`
// environment variable.
var keyValueTestsEnabled bool

type TestingFlags struct {
EnableKeyValueTests bool `envvar:"ENABLE_KEY_VALUE_TESTS" default:"false"`
}

func init() {
var flags TestingFlags
if err := envvar.Parse(&flags); err != nil {
panic(fmt.Sprintf("could not parse environment variables: %s", err.Error()))
}
keyValueTestsEnabled = flags.EnableKeyValueTests
testing.Init()
}

func TestAddOrders(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -1818,6 +1838,70 @@ func makeMiniHeaderFilterTestCases(t *testing.T, db *DB) ([]*types.MiniHeader, [
return storedMiniHeaders, testCases
}

func TestPeerStoreBasic(t *testing.T) {
if !keyValueTestsEnabled {
t.Skip("Key-value store tests are disabled. You can enable them with the ENABLE_KEY_VALUE_TESTS environment variable")
}
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
db := newTestDB(t, ctx)
peerstore := db.PeerStore()

for _, test := range dstest.BasicSubtests {
test(t, peerstore)
}
}

func TestPeerStoreBatch(t *testing.T) {
if !keyValueTestsEnabled {
t.Skip("Key-value store tests are disabled. You can enable them with the ENABLE_KEY_VALUE_TESTS environment variable")
}
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
db := newTestDB(t, ctx)
peerstore := db.PeerStore()

for _, test := range dstest.BatchSubtests {
test(t, peerstore)
}
}

func TestDHTStoreBasic(t *testing.T) {
if !keyValueTestsEnabled {
t.Skip("Key-value store tests are disabled. You can enable them with the ENABLE_KEY_VALUE_TESTS environment variable")
}
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
db := newTestDB(t, ctx)
dhtstore := db.DHTStore()

for _, test := range dstest.BasicSubtests {
test(t, dhtstore)
}
}

func TestDHTStoreBatch(t *testing.T) {
if !keyValueTestsEnabled {
t.Skip("Key-value store tests are disabled. You can enable them with the ENABLE_KEY_VALUE_TESTS environment variable")
}
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
db := newTestDB(t, ctx)
dhtstore := db.DHTStore()

for _, test := range dstest.BatchSubtests {
test(t, dhtstore)
}
}

// safeSubsliceOrders returns a (shallow) subslice of orders without modifying
// the original slice. Uses the same semantics as slice expressions: low is
// inclusive, hi is exclusive. The returned slice still contains pointers, it
Expand Down
Loading

0 comments on commit d5e4236

Please sign in to comment.