From e773b2ffa6991ad8ae037c3fad24d25847fb834e Mon Sep 17 00:00:00 2001 From: Alex Towle Date: Mon, 17 Aug 2020 15:36:11 -0500 Subject: [PATCH] Finished the persistent storage solution --- db/dexie_datastore.go | 216 ++++++++---------- p2p/opts.go | 1 - p2p/opts_js.go | 1 - packages/mesh-browser-lite/src/database.ts | 2 +- .../src/{key_value_store.ts => datastore.ts} | 83 ++----- 5 files changed, 125 insertions(+), 178 deletions(-) rename packages/mesh-browser-lite/src/{key_value_store.ts => datastore.ts} (66%) diff --git a/db/dexie_datastore.go b/db/dexie_datastore.go index 2dc71cf5c..7ebf84405 100644 --- a/db/dexie_datastore.go +++ b/db/dexie_datastore.go @@ -11,49 +11,102 @@ import ( dsq "github.com/ipfs/go-datastore/query" ) +// Ensure that we are implementing the ds.Batching interface. var _ ds.Batching = &Datastore{} +// NOTE(jalextowle): Close is a noop in this implementation. We do not want a close +// operation to shut down the database connection. +func (d *Datastore) Close() error { + return nil +} + +// NOTE(jalextowle): Sync is not needed in this implementation since operations +// such as Put and Delete are completed before a result is returned. +func (d *Datastore) Sync(ds.Key) error { + return nil +} + +// Datastore provides a Dexie implementation of the ds.Batching interface. The +// corresponding javascript bindings can be found in +// packages/mesh-browser-lite/src/datastore.ts, which is where the bulk of the +// implementation can be found. type Datastore struct { db *DB ctx context.Context dexieStore js.Value } -// io.Closer +type OperationType byte -// FIXME - Is this what we want? -func (d *Datastore) Close() error { - // Noop - return nil +const ( + PUT OperationType = iota + DELETE +) + +// Operation contains all of the data needed to communicate with the Javascript +// bindings that control access to the Dexie datastore. The Javascript bindings +// need to know what the operation should do (put or delete) and the data that +// should be used in the operation. +type Operation struct { + operationType OperationType + key ds.Key + value []byte } -// Sync +func (o *Operation) JSValue() js.Value { + return js.ValueOf(map[string]interface{}{ + "operationType": int(o.operationType), + "key": o.key.String(), + "value": string(o.value), + }) +} -func (d *Datastore) Sync(ds.Key) error { - // Noop - return nil +// Batch implements the ds.Batch interface, which allows Put and Delete operations +// to be queued and then committed all at once. +type Batch struct { + ctx context.Context + dexieStore js.Value + operations []*Operation +} + +func (d *Datastore) Batch() (ds.Batch, error) { + return &Batch{ + ctx: d.ctx, + dexieStore: d.dexieStore, + }, nil } -/// Write +func (b *Batch) Put(key ds.Key, value []byte) error { + b.operations = append(b.operations, &Operation{ + operationType: PUT, + key: key, + value: value, + }) + return nil +} -func (d *Datastore) Put(key ds.Key, value []byte) error { - _, err := jsutil.AwaitPromiseContext(d.ctx, d.dexieStore.Call("putAsync", key.String(), string(value))) - if err != nil { - return convertJSError(err) - } +func (b *Batch) Delete(key ds.Key) error { + b.operations = append(b.operations, &Operation{ + operationType: DELETE, + key: key, + }) return nil } -func (d *Datastore) Delete(key ds.Key) error { - _, err := jsutil.AwaitPromiseContext(d.ctx, d.dexieStore.Call("deleteAsync", key.String())) +// Commit performs a batch of operations on the Dexie datastore. 
In this implementation,
+// all of these operations occur in the same transactional context.
+func (b *Batch) Commit() error {
+	convertibleOperations := make([]interface{}, len(b.operations))
+	for i, operation := range b.operations {
+		convertibleOperations[i] = interface{}(operation)
+	}
+	_, err := jsutil.AwaitPromiseContext(b.ctx, b.dexieStore.Call("commitAsync", convertibleOperations))
	if err != nil {
		return convertJSError(err)
	}
	return nil
}

-// Read
-
func (d *Datastore) Get(key ds.Key) ([]byte, error) {
	jsResult, err := jsutil.AwaitPromiseContext(d.ctx, d.dexieStore.Call("getAsync", key.String()))
	if err != nil {
@@ -79,7 +132,12 @@ func (d *Datastore) GetSize(key ds.Key) (int, error) {
}

func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) {
-	jsResults, err := jsutil.AwaitPromiseContext(d.ctx, d.dexieStore.Call("queryAsync", convertQueryToJS(q)))
+	jsQuery := js.ValueOf(map[string]interface{}{
+		"prefix": q.Prefix,
+		"offset": q.Offset,
+		"limit":  q.Limit,
+	})
+	jsResults, err := jsutil.AwaitPromiseContext(d.ctx, d.dexieStore.Call("queryAsync", jsQuery))
	if err != nil {
		return nil, convertJSError(err)
	}
@@ -92,111 +150,35 @@ func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) {
			Size:  jsResult.Get("size").Int(),
		}
	}
+	filteredEntries := []dsq.Entry{}
+	for _, entry := range entries {
+		passes := true
+		for _, filter := range q.Filters {
+			if !filter.Filter(entry) {
+				passes = false
+				break
+			}
+		}
+		if passes {
+			filteredEntries = append(filteredEntries, entry)
+		}
+	}
+	entries = filteredEntries
+	dsq.Sort(q.Orders, entries)
	return dsq.ResultsWithEntries(q, entries), nil
}

-/// Batching
-
-type OperationType byte
-
-const (
-	ADDITION OperationType = iota
-	// FIXME - Rename to DELETION
-	REMOVAL
-)
-
-type Operation struct {
-	operationType OperationType
-	key           ds.Key
-	value         []byte
-}
-
-func (o *Operation) JSValue() js.Value {
-	return js.ValueOf(map[string]interface{}{
-		"operationType": int(o.operationType),
-		"key":           o.key.String(),
-		"value":         string(o.value),
-	})
-}
-
-type Batch struct {
-	ctx        context.Context
-	dexieStore js.Value
-	operations []*Operation
-}
-
-func (d *Datastore) Batch() (ds.Batch, error) {
-	return &Batch{
-		ctx:        d.ctx,
-		dexieStore: d.dexieStore,
-	}, nil
-}
-
-func (b *Batch) Commit() error {
-	convertibleOperations := make([]interface{}, len(b.operations))
-	for i, operation := range b.operations {
-		convertibleOperations[i] = interface{}(operation)
-	}
-	_, err := jsutil.AwaitPromiseContext(b.ctx, b.dexieStore.Call("commitAsync", convertibleOperations))
+func (d *Datastore) Put(key ds.Key, value []byte) error {
+	_, err := jsutil.AwaitPromiseContext(d.ctx, d.dexieStore.Call("putAsync", key.String(), string(value)))
	if err != nil {
		return convertJSError(err)
	}
	return nil
}

-func (b *Batch) Put(key ds.Key, value []byte) error {
-	b.operations = append(b.operations, &Operation{
-		operationType: ADDITION,
-		key:           key,
-		value:         value,
-	})
-	return nil
-}
-
-func (b *Batch) Delete(key ds.Key) error {
-	b.operations = append(b.operations, &Operation{
-		operationType: REMOVAL,
-		key:           key,
-	})
-	return nil
-}
-
-/// js conversions
-
-// FIXME - length checks and code dedupe
-func convertQueryToJS(q dsq.Query) js.Value {
-	jsFilters := make([]interface{}, len(q.Filters))
-	for i, filter := range q.Filters {
-		jsFilters[i] = js.FuncOf(func(this js.Value, args []js.Value) interface{} {
-			entry := dsq.Entry{
-				Key:   args[0].Get("key").String(),
-				Value: []byte(args[0].Get("value").String()),
-				Size:  args[0].Get("size").Int(),
-			}
-			return filter.Filter(entry)
-		})
-	}
-
jsOrders := make([]interface{}, len(q.Orders)) - for i, order := range q.Orders { - jsOrders[i] = js.FuncOf(func(this js.Value, args []js.Value) interface{} { - a := dsq.Entry{ - Key: args[0].Get("key").String(), - Value: []byte(args[0].Get("value").String()), - Size: args[0].Get("size").Int(), - } - b := dsq.Entry{ - Key: args[1].Get("key").String(), - Value: []byte(args[1].Get("value").String()), - Size: args[1].Get("size").Int(), - } - return order.Compare(a, b) - }) +func (d *Datastore) Delete(key ds.Key) error { + _, err := jsutil.AwaitPromiseContext(d.ctx, d.dexieStore.Call("deleteAsync", key.String())) + if err != nil { + return convertJSError(err) } - return js.ValueOf(map[string]interface{}{ - "prefix": q.Prefix, - "filters": jsFilters, - "orders": jsOrders, - "limit": q.Limit, - "offset": q.Offset, - }) + return nil } diff --git a/p2p/opts.go b/p2p/opts.go index 3c47c7d4c..6031a7b4e 100644 --- a/p2p/opts.go +++ b/p2p/opts.go @@ -63,7 +63,6 @@ func getHostOptions(ctx context.Context, config Config) ([]libp2p.Option, error) } advertiseAddrs := []ma.Multiaddr{tcpAdvertiseAddr, wsAdvertiseAddr} - // FIXME(Replace Comment) - Set up the peerstore to use LevelDB. pstore, err := pstoreds.NewPeerstore(ctx, config.DB.PeerStore(), pstoreds.DefaultOpts()) if err != nil { return nil, err diff --git a/p2p/opts_js.go b/p2p/opts_js.go index 6de392f36..9ca945fb1 100644 --- a/p2p/opts_js.go +++ b/p2p/opts_js.go @@ -46,6 +46,5 @@ func getPubSubOptions() []pubsub.Option { // NewDHT returns a new Kademlia DHT instance configured to work with 0x Mesh // in browser environments. func NewDHT(ctx context.Context, db *db.DB, host host.Host) (*dht.IpfsDHT, error) { - // FIXME - Add a key value to Dexie datastore return dht.New(ctx, host, dhtopts.Client(true), dhtopts.Datastore(db.DHTStore()), dhtopts.Protocols(DHTProtocolID)) } diff --git a/packages/mesh-browser-lite/src/database.ts b/packages/mesh-browser-lite/src/database.ts index 770436cbe..20508fa01 100644 --- a/packages/mesh-browser-lite/src/database.ts +++ b/packages/mesh-browser-lite/src/database.ts @@ -11,7 +11,7 @@ import Dexie from 'dexie'; -import { BatchingDatastore } from './key_value_store'; +import { BatchingDatastore } from './datastore'; export type Record = Order | MiniHeader | Metadata; diff --git a/packages/mesh-browser-lite/src/key_value_store.ts b/packages/mesh-browser-lite/src/datastore.ts similarity index 66% rename from packages/mesh-browser-lite/src/key_value_store.ts rename to packages/mesh-browser-lite/src/datastore.ts index 3a9713371..2cca065da 100644 --- a/packages/mesh-browser-lite/src/key_value_store.ts +++ b/packages/mesh-browser-lite/src/datastore.ts @@ -8,42 +8,43 @@ * NOTE(jalextowle): This comment must be here so that typedoc knows that the above * comment is a module comment */ -// FIXME - Add better comments - import Dexie from 'dexie'; -export enum OperationType { +interface Entry { + key: string; + value: string; + size: number; +} + +enum OperationType { Addition, - Removal, + Deletion, } -export interface Operation { +interface Operation { operationType: OperationType; key: string; value?: string; } -export type Filter = (entry: Entry) => boolean; -export type Order = (a: Entry, b: Entry) => number; - -export interface Entry { - key: string; - value: string; - size: number; -} - // NOTE(jalextowle): This is missing several fields from the Query interface in // https://github.com/ipfs/go-datastore. 
These fields include `returnsSizes` and
// `returnExpirations`, which are excluded because we are only satisfying the
-// ds.Batching interface.
+// ds.Batching interface. Additionally, we exclude any items that require iterating
+// through each key and value in a Dexie transaction. We handle that logic on the
+// Go side.
-export interface Query {
+interface Query {
    prefix: string; // namespaces the query to results whose keys have Prefix
-    filters: Filter[]; // filter results. apply sequentially
-    orders: Order[]; // order results. apply hierarchically
    limit: number; // maximum number of results
    offset: number; // skip given number of results
}

+// This implements the subset of the ds.Batching interface that should be implemented
+// on the Dexie side. The Go bindings for this system can be found in db/dexie_datastore.go.
+// Some aspects of the ds.Batching interface make more sense to implement in Go
+// for performance or dependency reasons. The most important example of this is
+// that query filtering and ordering are performed on the Go side to avoid converting
+// Go functions into JavaScript functions.
export class BatchingDatastore {
    private readonly _db: Dexie;
    private readonly _table: Dexie.Table;
@@ -56,16 +57,14 @@ export class BatchingDatastore {
        }
    }

-    /*** ds.Batching ***/
-
    public async commitAsync(operations: Operation[]): Promise<void> {
        await this._db.transaction('rw!', this._table, async () => {
            for (const operation of operations) {
                if (operation.operationType === OperationType.Addition) {
                    if (!operation.value) {
-                        throw new Error('commitDHTAsync: no value for key');
+                        throw new Error('commitAsync: no value for key');
                    }
-                    await this._table.add(operation.value, operation.key);
+                    await this._table.put({ key: operation.key, value: operation.value });
                } else {
                    await this._table.delete(operation.key);
                }
@@ -73,18 +72,14 @@ export class BatchingDatastore {
        });
    }

-    /*** ds.Write ***/
-
    public async putAsync(key: string, value: string): Promise<void> {
-        await this._table.put(value, key);
+        await this._table.put({ key, value });
    }

    public async deleteAsync(key: string): Promise<void> {
        await this._table.delete(key);
    }

-    /*** ds.Read ***/
-
    public async getAsync(key: string): Promise<string> {
-        const value = await this._table.get(key);
-        return value || '';
+        const entry = await this._table.get(key);
+        return entry ? entry.value : '';
@@ -105,13 +100,12 @@
            query.prefix === ''
                ? this._table.toCollection()
                : await this._table.where('key').startsWith(query.prefix);
-            // FIXME - Is this the correct order for the limit and order fields?
-            if (query.limit !== 0) {
-                col = col.limit(query.limit);
-            }
            if (query.offset !== 0) {
-                col = col.offset(query.limit);
+                col = col.offset(query.offset);
            }
+            if (query.limit !== 0) {
+                col = col.limit(query.limit);
+            }
            const values = await col.toArray();
            const entries = (await col.keys()).map((key, i) => {
                return {
                    key: key.toString(),
-                    value: values[i],
-                    size: computeByteSize(values[i]),
+                    value: values[i].value,
+                    size: computeByteSize(values[i].value),
                };
            });
-            for (const entry of entries) {
-                let passes = true;
-                for (const filter of query.filters) {
-                    if (!filter(entry)) {
-                        passes = false;
-                        break;
-                    }
-                }
-                if (passes) {
-                    filteredEntries.push(entry);
-                }
-            }
-            const masterComparator = createMasterComparator(query.orders);
-            filteredEntries.sort(masterComparator);
+            filteredEntries.push(...entries);
        });
        return filteredEntries;
    }
@@ -142,16 +122,3 @@
function computeByteSize(value: string): number {
    return new TextEncoder().encode(value).length;
}
-
-function createMasterComparator(orders: Order[]): (a: Entry, b: Entry) => number {
-    return (a: Entry, b: Entry) => {
-        let comparison = 0;
-        for (const order of orders) {
-            comparison = order(a, b);
-            if (comparison !== 0) {
-                return comparison;
-            }
-        }
-        return comparison;
-    };
-}
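Usage note: the following is a minimal, hypothetical sketch (not part of the patch) of how a consumer such as the peerstore or DHT would drive the Dexie-backed Datastore through the standard ds.Batching interface from github.com/ipfs/go-datastore. The function name putExampleRecords and the key/value literals are made up for illustration; only the Batch, Put, Delete, and Commit calls come from the interface implemented above. Put and Delete merely queue Operations in memory, and Commit marshals them to js.Values for the commitAsync binding, which applies them inside a single Dexie transaction.

// Hypothetical usage sketch: assumes store is a ds.Batching implementation
// such as the Dexie-backed Datastore above. Keys and values are illustrative.
package example

import (
	ds "github.com/ipfs/go-datastore"
)

func putExampleRecords(store ds.Batching) error {
	batch, err := store.Batch()
	if err != nil {
		return err
	}
	// Put and Delete only append Operations to the batch's in-memory queue.
	if err := batch.Put(ds.NewKey("/example/peer-a"), []byte("addr-1")); err != nil {
		return err
	}
	if err := batch.Delete(ds.NewKey("/example/peer-b")); err != nil {
		return err
	}
	// Commit converts the queued operations to js.Values and calls the
	// commitAsync binding, which applies them in one Dexie transaction.
	return batch.Commit()
}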