Skip to content
This repository has been archived by the owner on Oct 11, 2024. It is now read-only.

Persistent DHT and Peerstore #907

Merged
merged 8 commits into from
Aug 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,20 @@ test-go-parallel:
go test ./... -race -timeout 30s


.PHONY: test-key-value-stores
test-key-value-stores: test-key-value-stores-go test-key-value-stores-wasm


.PHONY: test-key-value-stores-go
test-key-value-stores-go:
ENABLE_KEY_VALUE_TESTS=true go test ./db


.PHONY: test-key-value-stores-wasm
test-key-value-stores-wasm:
WASM_INIT_FILE="$$(pwd)/packages/mesh-browser-shim/dist/browser_shim.js" GOOS=js GOARCH=wasm ENABLE_KEY_VALUE_TESTS=true go test ./db -timeout 30m -tags=browser -exec="$$GOPATH/bin/wasmbrowsertest"


.PHONY: test-go-serial
test-go-serial:
go test ./zeroex/ordervalidator ./zeroex/orderwatch ./core -race -timeout 90s -p=1 --serial
Expand Down
97 changes: 18 additions & 79 deletions cmd/mesh-bootstrap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/0xProject/0x-mesh/loghooks"
"github.com/0xProject/0x-mesh/p2p"
"github.com/0xProject/0x-mesh/p2p/banner"
sqlds "github.com/0xProject/sql-datastore"
"github.com/ipfs/go-datastore"
leveldbStore "github.com/ipfs/go-ds-leveldb"
libp2p "github.com/libp2p/go-libp2p"
Expand All @@ -29,7 +28,6 @@ import (
"github.com/libp2p/go-libp2p-core/routing"
dht "github.com/libp2p/go-libp2p-kad-dht"
dhtopts "github.com/libp2p/go-libp2p-kad-dht/opts"
peerstore "github.com/libp2p/go-libp2p-peerstore"
"github.com/libp2p/go-libp2p-peerstore/pstoreds"
"github.com/libp2p/go-libp2p/p2p/host/relay"
filter "github.com/libp2p/go-maddr-filter"
Expand Down Expand Up @@ -77,29 +75,6 @@ type Config struct {
DataStoreType string `envvar:"DATA_STORE_TYPE" default:"leveldb"`
// LevelDBDataDir is the directory used for storing data when using leveldb as data store type.
LevelDBDataDir string `envvar:"LEVELDB_DATA_DIR" default:"0x_mesh"`
// SQLDBConnectionString is the connection URL used to connect to the
// database.
// NOTE: When set it has precedence over SQL_DB_HOST, SQL_DB_PORT etc.
SQLDBConnectionString string `envvar:"SQL_DB_CONNECTION_STRING" default:"" json:"-"`
// SQLDBHost is the database host used to connect to the database when
// using postgres as data store type.
SQLDBHost string `envvar:"SQL_DB_HOST" default:"localhost" json:"-"`
// SQLDBPort is the database port used to connect to the database when
// using postgres as data store type.
SQLDBPort string `envvar:"SQL_DB_PORT" default:"5432" json:"-"`
// SQLDBUser is the database user used to connect to the database when
// using postgres as data store type.
SQLDBUser string `envvar:"SQL_DB_USER" default:"postgres" json:"-"`
// SQLDBPassword is the database password used to connect to the database when
// using postgres as data store type.
SQLDBPassword string `envvar:"SQL_DB_PASSWORD" default:"" json:"-"`
// SQLDBName is the database name to connect to when using
// postgres as data store type.
SQLDBName string `envvar:"SQL_DB_NAME" default:"datastore" json:"-"`
// SQLDBEngine is the underyling database engine to use as the
// database driver.
// NOTE: Currently only `postgres` driver is supported.
SQLDBEngine string `envvar:"SQL_DB_ENGINE" default:"postgres"`
// UseBootstrapList determines whether or not to use the list of hard-coded
// peers to bootstrap the DHT for peer discovery.
UseBootstrapList bool `envvar:"USE_BOOTSTRAP_LIST" default:"true"`
Expand Down Expand Up @@ -162,66 +137,30 @@ func main() {
var kadDHT *dht.IpfsDHT
var newDHT func(h host.Host) (routing.PeerRouting, error)

var peerStore peerstore.Peerstore

// TODO(oskar) - Figure out why returning an anonymous function from
// getNewDHT() is making kadDHT.runBootstrap panicing.
// When solved this big switch case can be removed from main()
switch config.DataStoreType {
case leveldbDataStore:
newDHT = func(h host.Host) (routing.PeerRouting, error) {
var err error
dhtDir := getDHTDir(config)
kadDHT, err = p2p.NewDHT(ctx, dhtDir, h)
if err != nil {
log.WithField("error", err).Fatal("could not create DHT")
}
return kadDHT, err
}

// Set up the peerstore to use LevelDB.
store, err := leveldbStore.NewDatastore(getPeerstoreDir(config), nil)
newDHT = func(h host.Host) (routing.PeerRouting, error) {
var err error
dhtDir := getDHTDir(config)
// Set up the DHT to use LevelDB.
store, err := leveldbStore.NewDatastore(dhtDir, nil)
if err != nil {
log.Fatal(err)
}

peerStore, err = pstoreds.NewPeerstore(ctx, store, pstoreds.DefaultOpts())
if err != nil {
log.Fatal(err)
}

case sqlDataStore:
db, err := getSQLDatabase(config)
if err != nil {
log.WithField("error", err).Fatal("could not create SQL database")
}

err = prepareSQLDatabase(db)
if err != nil {
log.WithField("error", err).Fatal("failed to repare SQL tables for datastores")
}

newDHT = func(h host.Host) (routing.PeerRouting, error) {
var err error
dstore := sqlds.NewDatastore(db, sqlds.NewQueriesForTable(dhtTableName))

kadDHT, err = NewDHTWithDatastore(ctx, dstore, h)
if err != nil {
log.WithField("error", err).Fatal("could not create DHT")
}

return kadDHT, err
return nil, err
}

pstore := sqlds.NewDatastore(db, sqlds.NewQueriesForTable(peerStoreTableName))
peerStore, err = pstoreds.NewPeerstore(ctx, pstore, pstoreds.DefaultOpts())
kadDHT, err = dht.New(ctx, h, dhtopts.Datastore(store), dhtopts.Protocols(p2p.DHTProtocolID))
if err != nil {
log.WithField("error", err).Fatal("could not create peerStore")
log.WithField("error", err).Fatal("could not create DHT")
}
return kadDHT, err
}

default:
log.Fatalf("invalid datastore configured: %s. Expected either %s or %s", config.DataStoreType, leveldbDataStore, sqlDataStore)
// Set up the peerstore to use LevelDB.
store, err := leveldbStore.NewDatastore(getPeerstoreDir(config), nil)
if err != nil {
log.Fatal(err)
}

peerStore, err := pstoreds.NewPeerstore(ctx, store, pstoreds.DefaultOpts())
if err != nil {
log.Fatal(err)
}

// Parse multiaddresses given in the config
Expand Down
37 changes: 0 additions & 37 deletions cmd/mesh-bootstrap/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@
package main

import (
"database/sql"
"errors"
"fmt"
"os"
"path/filepath"

Expand Down Expand Up @@ -33,40 +30,6 @@ func getPeerstoreDir(config Config) string {
return filepath.Join(config.LevelDBDataDir, "p2p", "peerstore")
}

func getSQLDatabase(config Config) (*sql.DB, error) {
// Currently we only support the postgres driver.
if config.SQLDBEngine != "postgres" {
return nil, errors.New("sqld currently only supports postgres driver")
}

if config.SQLDBConnectionString != "" {
return sql.Open(config.SQLDBEngine, config.SQLDBConnectionString)
}

fmtStr := "postgresql:///%s?host=%s&port=%s&user=%s&password=%s&sslmode=disable"
connstr := fmt.Sprintf(fmtStr, config.SQLDBName, config.SQLDBHost, config.SQLDBPort, config.SQLDBUser, config.SQLDBPassword)

return sql.Open(config.SQLDBEngine, connstr)
}

func prepareSQLDatabase(db *sql.DB) error {
createTableString := "CREATE TABLE IF NOT EXISTS %s (key TEXT NOT NULL UNIQUE, data BYTEA NOT NULL)"
createDHTTable := fmt.Sprintf(createTableString, dhtTableName)
createPeerStoreTable := fmt.Sprintf(createTableString, peerStoreTableName)

_, err := db.Exec(createDHTTable)
if err != nil {
return err
}

_, err = db.Exec(createPeerStoreTable)
if err != nil {
return err
}

return nil
}

func initPrivateKey(path string) (p2pcrypto.PrivKey, error) {
privKey, err := keys.GetPrivateKeyFromPath(path)
if err == nil {
Expand Down
2 changes: 1 addition & 1 deletion core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ func (app *App) Start() error {
RendezvousPoints: rendezvousPoints,
UseBootstrapList: app.config.UseBootstrapList,
BootstrapList: bootstrapList,
DataDir: filepath.Join(app.config.DataDir, "p2p"),
DB: app.db,
CustomMessageValidator: app.orderFilter.ValidatePubSubMessage,
MaxBytesPerSecond: app.config.MaxBytesPerSecond,
}
Expand Down
4 changes: 2 additions & 2 deletions core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func TestOrderSync(t *testing.T) {
}
for i, testCase := range testCases {
testCaseName := fmt.Sprintf("%s (test case %d)", testCase.name, i)
t.Run(testCaseName, runOrdersyncTestCase(t, testCase))
t.Run(testCaseName, runOrdersyncTestCase(testCase))
}
}

Expand All @@ -259,7 +259,7 @@ type ordersyncTestCase struct {

const defaultOrderFilter = "{}"

func runOrdersyncTestCase(t *testing.T, testCase ordersyncTestCase) func(t *testing.T) {
func runOrdersyncTestCase(testCase ordersyncTestCase) func(t *testing.T) {
return func(t *testing.T) {
teardownSubTest := setupSubTest(t)
defer teardownSubTest(t)
Expand Down
5 changes: 4 additions & 1 deletion core/ordersync/ordersync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/0xProject/0x-mesh/constants"
"github.com/0xProject/0x-mesh/db"
"github.com/0xProject/0x-mesh/ethereum"
"github.com/0xProject/0x-mesh/p2p"
"github.com/0xProject/0x-mesh/scenario"
Expand All @@ -30,12 +31,14 @@ func TestCalculateDelayWithJitters(t *testing.T) {
}

func TestHandleRawRequest(t *testing.T) {
db, err := db.New(context.Background(), db.TestOptions())
require.NoError(t, err)
n, err := p2p.New(
context.Background(),
p2p.Config{
MessageHandler: &noopMessageHandler{},
RendezvousPoints: []string{"/test-rendezvous-point"},
DataDir: "/tmp",
DB: db,
},
)
require.NoError(t, err)
Expand Down
3 changes: 3 additions & 0 deletions db/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/0xProject/0x-mesh/zeroex"
"github.com/ethereum/go-ethereum/common"
"github.com/gibson042/canonicaljson-go"
ds "github.com/ipfs/go-datastore"
)

const (
Expand Down Expand Up @@ -48,6 +49,8 @@ type Database interface {
GetMetadata() (*types.Metadata, error)
SaveMetadata(metadata *types.Metadata) error
UpdateMetadata(updateFunc func(oldmetadata *types.Metadata) (newMetadata *types.Metadata)) error
PeerStore() ds.Batching
DHTStore() ds.Batching
}

type Options struct {
Expand Down
84 changes: 84 additions & 0 deletions db/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,32 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/math"
ethtypes "github.com/ethereum/go-ethereum/core/types"
dstest "github.com/ipfs/go-datastore/test"
"github.com/plaid/go-envvar/envvar"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var contractAddresses = ethereum.GanacheAddresses

// Since the key value store tests take a very long time to run, we disable them
// by default. These tests can be enabled with the `ENABLE_KEY_VALUE_TESTS`
// environment variable.
var keyValueTestsEnabled bool

type TestingFlags struct {
EnableKeyValueTests bool `envvar:"ENABLE_KEY_VALUE_TESTS" default:"false"`
}

func init() {
var flags TestingFlags
if err := envvar.Parse(&flags); err != nil {
panic(fmt.Sprintf("could not parse environment variables: %s", err.Error()))
}
keyValueTestsEnabled = flags.EnableKeyValueTests
testing.Init()
}

func TestAddOrders(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -1818,6 +1838,70 @@ func makeMiniHeaderFilterTestCases(t *testing.T, db *DB) ([]*types.MiniHeader, [
return storedMiniHeaders, testCases
}

func TestPeerStoreBasic(t *testing.T) {
if !keyValueTestsEnabled {
t.Skip("Key-value store tests are disabled. You can enable them with the ENABLE_KEY_VALUE_TESTS environment variable")
}
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
db := newTestDB(t, ctx)
peerstore := db.PeerStore()

for _, test := range dstest.BasicSubtests {
test(t, peerstore)
}
}

func TestPeerStoreBatch(t *testing.T) {
if !keyValueTestsEnabled {
t.Skip("Key-value store tests are disabled. You can enable them with the ENABLE_KEY_VALUE_TESTS environment variable")
}
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
db := newTestDB(t, ctx)
peerstore := db.PeerStore()

for _, test := range dstest.BatchSubtests {
test(t, peerstore)
}
}

func TestDHTStoreBasic(t *testing.T) {
if !keyValueTestsEnabled {
t.Skip("Key-value store tests are disabled. You can enable them with the ENABLE_KEY_VALUE_TESTS environment variable")
}
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
db := newTestDB(t, ctx)
dhtstore := db.DHTStore()

for _, test := range dstest.BasicSubtests {
test(t, dhtstore)
}
}

func TestDHTStoreBatch(t *testing.T) {
if !keyValueTestsEnabled {
t.Skip("Key-value store tests are disabled. You can enable them with the ENABLE_KEY_VALUE_TESTS environment variable")
}
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
db := newTestDB(t, ctx)
dhtstore := db.DHTStore()

for _, test := range dstest.BatchSubtests {
test(t, dhtstore)
}
}

// safeSubsliceOrders returns a (shallow) subslice of orders without modifying
// the original slice. Uses the same semantics as slice expressions: low is
// inclusive, hi is exclusive. The returned slice still contains pointers, it
Expand Down
Loading