Skip to content
This repository has been archived by the owner on Oct 11, 2024. It is now read-only.

Commit

Permalink
Finished the persistent storage solution
Browse files Browse the repository at this point in the history
  • Loading branch information
jalextowle committed Aug 17, 2020
1 parent bb63f4c commit 4f70271
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 180 deletions.
2 changes: 0 additions & 2 deletions core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,6 @@ func runOrdersyncTestCase(testCase ordersyncTestCase) func(t *testing.T) {
defer wg.Done()
if err := originalNode.Start(); err != nil && err != context.Canceled {
// context.Canceled is expected. For any other error, fail the test.
fmt.Println("OrderSync Test: Checkpoint 1")
panic(fmt.Sprintf("%s %s", testCase.name, err))
}
}()
Expand Down Expand Up @@ -304,7 +303,6 @@ func runOrdersyncTestCase(testCase ordersyncTestCase) func(t *testing.T) {
defer wg.Done()
if err := newNode.Start(); err != nil && err != context.Canceled {
// context.Canceled is expected. For any other error, fail the test.
fmt.Println("OrderSync Test: Checkpoint 2")
panic(fmt.Sprintf("%s %s", testCase.name, err))
}
}()
Expand Down
216 changes: 99 additions & 117 deletions db/dexie_datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,49 +11,102 @@ import (
dsq "github.com/ipfs/go-datastore/query"
)

// Ensure that we are implementing the ds.Batching interface.
var _ ds.Batching = &Datastore{}

// NOTE(jalextowle): Close is a noop in this implementation. We do not want a close
// operation to shut down the database connection.
func (d *Datastore) Close() error {
return nil
}

// NOTE(jalextowle): Sync is not needed in this implementation since operations
// such as Put and Delete are completed before a result is returned.
func (d *Datastore) Sync(ds.Key) error {
return nil
}

// Datastore provides a Dexie implementation of the ds.Batching interface. The
// corresponding javascript bindings can be found in
// packages/mesh-browser-lite/src/datastore.ts, which is where the bulk of the
// implementation can be found.
type Datastore struct {
db *DB
ctx context.Context
dexieStore js.Value
}

// io.Closer
type OperationType byte

// FIXME - Is this what we want?
func (d *Datastore) Close() error {
// Noop
return nil
const (
PUT OperationType = iota
DELETE
)

// Operation contains all of the data needed to communicate with the Javascript
// bindings that control access to the Dexie datastore. The Javascript bindings
// need to know what the operation should do (put or delete) and the data that
// should be used in the operation.
type Operation struct {
operationType OperationType
key ds.Key
value []byte
}

// Sync
func (o *Operation) JSValue() js.Value {
return js.ValueOf(map[string]interface{}{
"operationType": int(o.operationType),
"key": o.key.String(),
"value": string(o.value),
})
}

func (d *Datastore) Sync(ds.Key) error {
// Noop
return nil
// Batch implements the ds.Batch interface, which allows Put and Delete operations
// to be queued and then committed all at once.
type Batch struct {
ctx context.Context
dexieStore js.Value
operations []*Operation
}

func (d *Datastore) Batch() (ds.Batch, error) {
return &Batch{
ctx: d.ctx,
dexieStore: d.dexieStore,
}, nil
}

/// Write
func (b *Batch) Put(key ds.Key, value []byte) error {
b.operations = append(b.operations, &Operation{
operationType: PUT,
key: key,
value: value,
})
return nil
}

func (d *Datastore) Put(key ds.Key, value []byte) error {
_, err := jsutil.AwaitPromiseContext(d.ctx, d.dexieStore.Call("putAsync", key.String(), string(value)))
if err != nil {
return convertJSError(err)
}
func (b *Batch) Delete(key ds.Key) error {
b.operations = append(b.operations, &Operation{
operationType: DELETE,
key: key,
})
return nil
}

func (d *Datastore) Delete(key ds.Key) error {
_, err := jsutil.AwaitPromiseContext(d.ctx, d.dexieStore.Call("deleteAsync", key.String()))
// Commit performs a batch of operations on the Dexie datastore. In this implementation,
// all of these operations occur in the same transactional context.
func (b *Batch) Commit() error {
convertibleOperations := make([]interface{}, len(b.operations))
for i, operation := range b.operations {
convertibleOperations[i] = interface{}(operation)
}
_, err := jsutil.AwaitPromiseContext(b.ctx, b.dexieStore.Call("commitAsync", convertibleOperations))
if err != nil {
return convertJSError(err)
}
return nil
}

// Read

func (d *Datastore) Get(key ds.Key) ([]byte, error) {
jsResult, err := jsutil.AwaitPromiseContext(d.ctx, d.dexieStore.Call("getAsync", key.String()))
if err != nil {
Expand All @@ -79,7 +132,12 @@ func (d *Datastore) GetSize(key ds.Key) (int, error) {
}

func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) {
jsResults, err := jsutil.AwaitPromiseContext(d.ctx, d.dexieStore.Call("queryAsync", convertQueryToJS(q)))
jsQuery := js.ValueOf(map[string]interface{}{
"prefix": q.Prefix,
"offset": q.Offset,
"limit": q.Limit,
})
jsResults, err := jsutil.AwaitPromiseContext(d.ctx, d.dexieStore.Call("queryAsync", jsQuery))
if err != nil {
return nil, convertJSError(err)
}
Expand All @@ -92,111 +150,35 @@ func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) {
Size: jsResult.Get("size").Int(),
}
}
filteredEntries := []dsq.Entry{}
for _, entry := range entries {
passes := true
for _, filter := range q.Filters {
if !filter.Filter(entry) {
passes = false
break
}
}
if passes {
filteredEntries = append(filteredEntries, entry)
}
}
dsq.Sort(q.Orders, entries)
return dsq.ResultsWithEntries(q, entries), nil
}

/// Batching

type OperationType byte

const (
ADDITION OperationType = iota
// FIXME - Rename to DELETION
REMOVAL
)

type Operation struct {
operationType OperationType
key ds.Key
value []byte
}

func (o *Operation) JSValue() js.Value {
return js.ValueOf(map[string]interface{}{
"operationType": int(o.operationType),
"key": o.key.String(),
"value": string(o.value),
})
}

type Batch struct {
ctx context.Context
dexieStore js.Value
operations []*Operation
}

func (d *Datastore) Batch() (ds.Batch, error) {
return &Batch{
ctx: d.ctx,
dexieStore: d.dexieStore,
}, nil
}

func (b *Batch) Commit() error {
convertibleOperations := make([]interface{}, len(b.operations))
for i, operation := range b.operations {
convertibleOperations[i] = interface{}(operation)
}
_, err := jsutil.AwaitPromiseContext(b.ctx, b.dexieStore.Call("commitAsync", convertibleOperations))
func (d *Datastore) Put(key ds.Key, value []byte) error {
_, err := jsutil.AwaitPromiseContext(d.ctx, d.dexieStore.Call("putAsync", key.String(), string(value)))
if err != nil {
return convertJSError(err)
}
return nil
}

func (b *Batch) Put(key ds.Key, value []byte) error {
b.operations = append(b.operations, &Operation{
operationType: ADDITION,
key: key,
value: value,
})
return nil
}

func (b *Batch) Delete(key ds.Key) error {
b.operations = append(b.operations, &Operation{
operationType: REMOVAL,
key: key,
})
return nil
}

/// js conversions

// FIXME - length checks and code dedupe
func convertQueryToJS(q dsq.Query) js.Value {
jsFilters := make([]interface{}, len(q.Filters))
for i, filter := range q.Filters {
jsFilters[i] = js.FuncOf(func(this js.Value, args []js.Value) interface{} {
entry := dsq.Entry{
Key: args[0].Get("key").String(),
Value: []byte(args[0].Get("value").String()),
Size: args[0].Get("size").Int(),
}
return filter.Filter(entry)
})
}
jsOrders := make([]interface{}, len(q.Orders))
for i, order := range q.Orders {
jsOrders[i] = js.FuncOf(func(this js.Value, args []js.Value) interface{} {
a := dsq.Entry{
Key: args[0].Get("key").String(),
Value: []byte(args[0].Get("value").String()),
Size: args[0].Get("size").Int(),
}
b := dsq.Entry{
Key: args[1].Get("key").String(),
Value: []byte(args[1].Get("value").String()),
Size: args[1].Get("size").Int(),
}
return order.Compare(a, b)
})
func (d *Datastore) Delete(key ds.Key) error {
_, err := jsutil.AwaitPromiseContext(d.ctx, d.dexieStore.Call("deleteAsync", key.String()))
if err != nil {
return convertJSError(err)
}
return js.ValueOf(map[string]interface{}{
"prefix": q.Prefix,
"filters": jsFilters,
"orders": jsOrders,
"limit": q.Limit,
"offset": q.Offset,
})
return nil
}
1 change: 0 additions & 1 deletion p2p/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ func getHostOptions(ctx context.Context, config Config) ([]libp2p.Option, error)
}
advertiseAddrs := []ma.Multiaddr{tcpAdvertiseAddr, wsAdvertiseAddr}

// FIXME(Replace Comment) - Set up the peerstore to use LevelDB.
pstore, err := pstoreds.NewPeerstore(ctx, config.DB.PeerStore(), pstoreds.DefaultOpts())
if err != nil {
return nil, err
Expand Down
1 change: 0 additions & 1 deletion p2p/opts_js.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,5 @@ func getPubSubOptions() []pubsub.Option {
// NewDHT returns a new Kademlia DHT instance configured to work with 0x Mesh
// in browser environments.
func NewDHT(ctx context.Context, db *db.DB, host host.Host) (*dht.IpfsDHT, error) {
// FIXME - Add a key value to Dexie datastore
return dht.New(ctx, host, dhtopts.Client(true), dhtopts.Datastore(db.DHTStore()), dhtopts.Protocols(DHTProtocolID))
}
2 changes: 1 addition & 1 deletion packages/mesh-browser-lite/src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

import Dexie from 'dexie';

import { BatchingDatastore } from './key_value_store';
import { BatchingDatastore } from './datastore';

export type Record = Order | MiniHeader | Metadata;

Expand Down
Loading

0 comments on commit 4f70271

Please sign in to comment.