Skip to content
This repository has been archived by the owner on Oct 11, 2024. It is now read-only.

Persistent DHT and Peerstore #907

Merged
merged 8 commits into from
Aug 25, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion cmd/mesh-bootstrap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,12 @@ func main() {
newDHT = func(h host.Host) (routing.PeerRouting, error) {
var err error
dhtDir := getDHTDir(config)
kadDHT, err = p2p.NewDHT(ctx, dhtDir, h)
// Set up the DHT to use LevelDB.
store, err := leveldbStore.NewDatastore(dhtDir, nil)
if err != nil {
return nil, err
}
kadDHT, err = dht.New(ctx, h, dhtopts.Datastore(store), dhtopts.Protocols(p2p.DHTProtocolID))
jalextowle marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
log.WithField("error", err).Fatal("could not create DHT")
}
Expand Down
2 changes: 1 addition & 1 deletion core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ func (app *App) Start() error {
RendezvousPoints: rendezvousPoints,
UseBootstrapList: app.config.UseBootstrapList,
BootstrapList: bootstrapList,
DataDir: filepath.Join(app.config.DataDir, "p2p"),
DB: app.db,
CustomMessageValidator: app.orderFilter.ValidatePubSubMessage,
MaxBytesPerSecond: app.config.MaxBytesPerSecond,
}
Expand Down
6 changes: 4 additions & 2 deletions core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func TestOrderSync(t *testing.T) {
}
for i, testCase := range testCases {
testCaseName := fmt.Sprintf("%s (test case %d)", testCase.name, i)
t.Run(testCaseName, runOrdersyncTestCase(t, testCase))
t.Run(testCaseName, runOrdersyncTestCase(testCase))
}
}

Expand All @@ -259,7 +259,7 @@ type ordersyncTestCase struct {

const defaultOrderFilter = "{}"

func runOrdersyncTestCase(t *testing.T, testCase ordersyncTestCase) func(t *testing.T) {
func runOrdersyncTestCase(testCase ordersyncTestCase) func(t *testing.T) {
return func(t *testing.T) {
teardownSubTest := setupSubTest(t)
defer teardownSubTest(t)
Expand All @@ -275,6 +275,7 @@ func runOrdersyncTestCase(t *testing.T, testCase ordersyncTestCase) func(t *test
defer wg.Done()
if err := originalNode.Start(); err != nil && err != context.Canceled {
// context.Canceled is expected. For any other error, fail the test.
fmt.Println("OrderSync Test: Checkpoint 1")
panic(fmt.Sprintf("%s %s", testCase.name, err))
}
}()
Expand Down Expand Up @@ -303,6 +304,7 @@ func runOrdersyncTestCase(t *testing.T, testCase ordersyncTestCase) func(t *test
defer wg.Done()
if err := newNode.Start(); err != nil && err != context.Canceled {
// context.Canceled is expected. For any other error, fail the test.
fmt.Println("OrderSync Test: Checkpoint 2")
panic(fmt.Sprintf("%s %s", testCase.name, err))
}
}()
Expand Down
5 changes: 4 additions & 1 deletion core/ordersync/ordersync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/0xProject/0x-mesh/constants"
"github.com/0xProject/0x-mesh/db"
"github.com/0xProject/0x-mesh/ethereum"
"github.com/0xProject/0x-mesh/p2p"
"github.com/0xProject/0x-mesh/scenario"
Expand All @@ -30,12 +31,14 @@ func TestCalculateDelayWithJitters(t *testing.T) {
}

func TestHandleRawRequest(t *testing.T) {
db, err := db.New(context.Background(), db.TestOptions())
require.NoError(t, err)
n, err := p2p.New(
context.Background(),
p2p.Config{
MessageHandler: &noopMessageHandler{},
RendezvousPoints: []string{"/test-rendezvous-point"},
DataDir: "/tmp",
DB: db,
},
)
require.NoError(t, err)
Expand Down
202 changes: 202 additions & 0 deletions db/dexie_datastore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
// +build js, wasm

package db

import (
"context"
"syscall/js"

"github.com/0xProject/0x-mesh/packages/mesh-browser/go/jsutil"
ds "github.com/ipfs/go-datastore"
dsq "github.com/ipfs/go-datastore/query"
)

var _ ds.Batching = &Datastore{}

type Datastore struct {
db *DB
ctx context.Context
dexieStore js.Value
}

// io.Closer

// FIXME - Is this what we want?
func (d *Datastore) Close() error {
// Noop
return nil
}

// Sync

func (d *Datastore) Sync(ds.Key) error {
// Noop
return nil
}

/// Write

func (d *Datastore) Put(key ds.Key, value []byte) error {
_, err := jsutil.AwaitPromiseContext(d.ctx, d.dexieStore.Call("putAsync", key.String(), string(value)))
if err != nil {
return convertJSError(err)
}
return nil
}

func (d *Datastore) Delete(key ds.Key) error {
_, err := jsutil.AwaitPromiseContext(d.ctx, d.dexieStore.Call("deleteAsync", key.String()))
if err != nil {
return convertJSError(err)
}
return nil
}

// Read

func (d *Datastore) Get(key ds.Key) ([]byte, error) {
jsResult, err := jsutil.AwaitPromiseContext(d.ctx, d.dexieStore.Call("getAsync", key.String()))
if err != nil {
return nil, convertJSError(err)
}
return []byte(jsResult.String()), nil
}

func (d *Datastore) Has(key ds.Key) (bool, error) {
jsResult, err := jsutil.AwaitPromiseContext(d.ctx, d.dexieStore.Call("hasAsync", key.String()))
if err != nil {
return false, convertJSError(err)
}
return jsResult.Bool(), nil
}

func (d *Datastore) GetSize(key ds.Key) (int, error) {
jsResult, err := jsutil.AwaitPromiseContext(d.ctx, d.dexieStore.Call("getSizeAsync", key.String()))
if err != nil {
return 0, convertJSError(err)
}
return jsResult.Int(), nil
}

func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) {
jsResults, err := jsutil.AwaitPromiseContext(d.ctx, d.dexieStore.Call("queryAsync", convertQueryToJS(q)))
if err != nil {
return nil, convertJSError(err)
}
entries := make([]dsq.Entry, jsResults.Get("length").Int())
for i := 0; i < jsResults.Get("length").Int(); i++ {
jsResult := jsResults.Index(i)
entries[i] = dsq.Entry{
Key: jsResult.Get("key").String(),
Value: []byte(jsResult.Get("value").String()),
Size: jsResult.Get("size").Int(),
}
}
return dsq.ResultsWithEntries(q, entries), nil
}

/// Batching

type OperationType byte

const (
ADDITION OperationType = iota
// FIXME - Rename to DELETION
REMOVAL
)

type Operation struct {
operationType OperationType
key ds.Key
value []byte
}

func (o *Operation) JSValue() js.Value {
return js.ValueOf(map[string]interface{}{
"operationType": int(o.operationType),
"key": o.key.String(),
"value": string(o.value),
})
}

type Batch struct {
ctx context.Context
dexieStore js.Value
operations []*Operation
}

func (d *Datastore) Batch() (ds.Batch, error) {
return &Batch{
ctx: d.ctx,
dexieStore: d.dexieStore,
}, nil
}

func (b *Batch) Commit() error {
convertibleOperations := make([]interface{}, len(b.operations))
for i, operation := range b.operations {
convertibleOperations[i] = interface{}(operation)
}
_, err := jsutil.AwaitPromiseContext(b.ctx, b.dexieStore.Call("commitAsync", convertibleOperations))
if err != nil {
return convertJSError(err)
}
return nil
}

func (b *Batch) Put(key ds.Key, value []byte) error {
b.operations = append(b.operations, &Operation{
operationType: ADDITION,
key: key,
value: value,
})
return nil
}

func (b *Batch) Delete(key ds.Key) error {
b.operations = append(b.operations, &Operation{
operationType: REMOVAL,
key: key,
})
return nil
}

/// js conversions

// FIXME - length checks and code dedupe
func convertQueryToJS(q dsq.Query) js.Value {
jsFilters := make([]interface{}, len(q.Filters))
for i, filter := range q.Filters {
jsFilters[i] = js.FuncOf(func(this js.Value, args []js.Value) interface{} {
jalextowle marked this conversation as resolved.
Show resolved Hide resolved
entry := dsq.Entry{
Key: args[0].Get("key").String(),
Value: []byte(args[0].Get("value").String()),
Size: args[0].Get("size").Int(),
}
return filter.Filter(entry)
})
}
jsOrders := make([]interface{}, len(q.Orders))
for i, order := range q.Orders {
jsOrders[i] = js.FuncOf(func(this js.Value, args []js.Value) interface{} {
a := dsq.Entry{
Key: args[0].Get("key").String(),
Value: []byte(args[0].Get("value").String()),
Size: args[0].Get("size").Int(),
}
b := dsq.Entry{
Key: args[1].Get("key").String(),
Value: []byte(args[1].Get("value").String()),
Size: args[1].Get("size").Int(),
}
return order.Compare(a, b)
})
}
return js.ValueOf(map[string]interface{}{
"prefix": q.Prefix,
"filters": jsFilters,
"orders": jsOrders,
"limit": q.Limit,
"offset": q.Offset,
})
}
17 changes: 17 additions & 0 deletions db/dexie_implementation.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/gibson042/canonicaljson-go"
"github.com/google/uuid"
ds "github.com/ipfs/go-datastore"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -90,6 +91,22 @@ func New(ctx context.Context, opts *Options) (database *DB, err error) {
}, nil
}

func (db *DB) PeerStore() ds.Batching {
dexieStore := db.dexie.Call("peerStore")
return &Datastore{
ctx: db.ctx,
dexieStore: dexieStore,
}
}

func (db *DB) DHTStore() ds.Batching {
dexieStore := db.dexie.Call("dhtStore")
return &Datastore{
ctx: db.ctx,
dexieStore: dexieStore,
}
}

func (db *DB) AddOrders(orders []*types.OrderWithMetadata) (alreadyStored []common.Hash, added []*types.OrderWithMetadata, removed []*types.OrderWithMetadata, err error) {
defer func() {
if r := recover(); r != nil {
Expand Down
22 changes: 22 additions & 0 deletions db/sql_implementation.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ import (

"github.com/0xProject/0x-mesh/common/types"
"github.com/0xProject/0x-mesh/db/sqltypes"
"github.com/0xProject/sql-datastore"
"github.com/ethereum/go-ethereum/common"
"github.com/gibson042/canonicaljson-go"
"github.com/google/uuid"
"github.com/ido50/sqlz"
ds "github.com/ipfs/go-datastore"
"github.com/jmoiron/sqlx"
_ "github.com/mattn/go-sqlite3"
)
Expand Down Expand Up @@ -96,6 +98,16 @@ func New(ctx context.Context, opts *Options) (*DB, error) {
return db, nil
}

// FIXME
func (db *DB) DHTStore() ds.Batching {
return sqlds.NewDatastore(db.sqldb.DB.DB, sqlds.NewQueriesForTable("dhtstore"))
}

// FIXME
func (db *DB) PeerStore() ds.Batching {
return sqlds.NewDatastore(db.sqldb.DB.DB, sqlds.NewQueriesForTable("peerstore"))
}

// TODO(albrow): Use a proper migration tool. We don't technically need this
// now but it will be necessary if we ever change the database schema.
// Note(albrow): If needed, we can optimize this by adding indexes to the
Expand Down Expand Up @@ -143,6 +155,16 @@ CREATE TABLE IF NOT EXISTS metadata (
ethRPCRequestsSentInCurrentUTCDay BIGINT NOT NULL,
startOfCurrentUTCDay DATETIME NOT NULL
);

CREATE TABLE IF NOT EXISTS peerstore (
key TEXT NOT NULL UNIQUE,
data BYTEA NOT NULL
);

CREATE TABLE IF NOT EXISTS dhtstore (
key TEXT NOT NULL UNIQUE,
data BYTEA NOT NULL
);
jalextowle marked this conversation as resolved.
Show resolved Hide resolved
`

// Note(albrow): If needed, we can optimize this by using prepared
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ replace (
)

require (
github.com/0xProject/sql-datastore v0.0.0-20200129193319-32397013f115
github.com/0xProject/sql-datastore v0.0.0-20200812212451-239b36c67c16
github.com/99designs/gqlgen v0.11.3
github.com/agnivade/levenshtein v1.1.0 // indirect
github.com/albrow/stringset v2.1.0+incompatible
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ github.com/0xProject/goleveldb v1.0.1-0.20200602173211-6ee893c9b83a/go.mod h1:1J
github.com/0xProject/qunit v0.0.0-20190730000255-81c18fdf7752/go.mod h1:Onz5mS+Tpffz0tyRWdHDrqKcQ1ZFNeRhYHrNAkaMgeQ=
github.com/0xProject/sql-datastore v0.0.0-20200129193319-32397013f115 h1:OHq6PP4Y8Pjmhm9UB3RtYWQSgxJBQplojLCDZZLrVDg=
github.com/0xProject/sql-datastore v0.0.0-20200129193319-32397013f115/go.mod h1:7icquWqYm+GkgQsUyBs0C0N1SyCHXQYBnoHaazVYDQ0=
github.com/0xProject/sql-datastore v0.0.0-20200812212451-239b36c67c16 h1:bgRTbK0Yv/zemVIIyoDD1MWCg6Ru7eVtE7+XfDVkxos=
github.com/0xProject/sql-datastore v0.0.0-20200812212451-239b36c67c16/go.mod h1:7icquWqYm+GkgQsUyBs0C0N1SyCHXQYBnoHaazVYDQ0=
github.com/99designs/gqlgen v0.11.3 h1:oFSxl1DFS9X///uHV3y6CEfpcXWrDUxVblR4Xib2bs4=
github.com/99designs/gqlgen v0.11.3/go.mod h1:RgX5GRRdDWNkh4pBrdzNpNPFVsdoUFY2+adM6nb1N+4=
github.com/AndreasBriese/bbloom v0.0.0-20180913140656-343706a395b7/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
Expand Down
Loading