Skip to content
This repository has been archived by the owner on Jan 19, 2024. It is now read-only.

Commit

Permalink
Merge pull request #29 from rgraphql/master
Browse files Browse the repository at this point in the history
release: rewrite encoding algorithm
  • Loading branch information
paralin authored Jun 5, 2017
2 parents dbcac2e + 2286150 commit 39d4fb5
Show file tree
Hide file tree
Showing 22 changed files with 913 additions and 1,067 deletions.
44 changes: 42 additions & 2 deletions PROTOCOL.md → docs/DESIGN_GENERAL_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
Protocol
========
# Query-Tree Negotiation

When we initially connect to the server, we build a snapshot of the tree and send it to the server. Following, every update to the tree gets transmitted to the server as queries are applied to the store.

Expand Down Expand Up @@ -124,3 +123,44 @@ query myQuery($a: Int!) {
```

The server can be given variable values at the same time as query tree additions. However, the server will forget any variables that aren't relevant to the current query. This way the server keeps only the data it needs in memory with minimal messaging overhead and de-duplicated variable values.

# Distributing data to queries efficiently

Issue: we have a result tree that looks like:

```
0: {
1: {
2: "John",
3: "Doe",
4: 28
5: [153, 148, 140, 133]
}
}
```

The array in qnode 5 needs to be shared between the queries. We can detect when we have reached a leaf when there are no child query nodes.

Here we have the issue. Cursors often extend into the values, past the qnode child leaves. We need to detach the cursors in the queries when reaching a value node.

- Cursors will need to build a "path so far" array (unfortunately).
- This is necessary for caching.
- Array of tuples, with `[Kind, (ArrayIndex/QueryNodeID), ValueKind]`
- Examples (note that "value" in these points to location, is not a copy):
- `ResultTree: {1: {2: [6123, 1523]}}`
- Sequence:
- `[Kind.QNodeID, 1, ValueKind.Object]`
- `[Kind.QNodeID, 2, ValueKind.Value, [6123, 1623]]`
- `ResultTree: {1: [{2: "test"}, {2: "what"}]}`
- Sequence:
- `[Kind.QNodeID, 1, ValueKind.Array]`
- `[Kind.ArrayIndex, 0, ValueKind.Object]`
- When reaching a value, drop the path so far and result tree.
- Allow passing a map of query ID -> location pointers to a result tree cursor.
- When adding a new query:
- Iterate `DFS` over the current known result tree, applying values to the result.
- Iterate over all known cursors, and add query by:
- If the path sequence terminates in a Value then ignore cursor.
- Iterate over path sequence, updating location.
- When removing a query:
- Iterate over all known cursors, remove query ID from map.
8 changes: 5 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@
"main": "./lib/index.js",
"types": "./lib/index.d.ts",
"peerDependencies": {
"rgraphql": "^0.3.0",
"rgraphql": "^0.4.5",
"rxjs": "^5.0.0"
},
"dependencies": {
"lodash": "^4.17.0",
"graphql": "^0.9.0",
"rgraphql": "^0.3.0",
"lodash": "^4.0.0",
"lru_map": "^0.3.0",
"rgraphql": "^0.4.0",
"rxjs": "^5.0.0"
},
"scripts": {
Expand All @@ -22,6 +23,7 @@
"lint": "tslint -c tslint.json --project tsconfig.json --type-check",
"mocha": "ts-node node_modules/istanbul/lib/cli.js cover -e .ts -x \"*.d.ts\" -x \"*.spec.ts\" test/run_tests.js",
"mocha-nocover": "ts-node test/run_tests.js",
"mocha-nocover-debug": "ts-node debug test/run_tests.js",
"semantic-release": "semantic-release pre && npm publish && semantic-release post"
},
"devDependencies": {
Expand Down
34 changes: 22 additions & 12 deletions src/client.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,18 @@ import { ITransport } from './transport';
import {
IRGQLServerMessage,
IRGQLClientMessage,
CacheStrategy,
IRGQLValue,
RGQLValue,
Kind,
} from 'rgraphql';
import {
parse,
} from 'graphql';

class MockTransport implements ITransport {
public messageHandler: (mes: IRGQLServerMessage) => void;
private queryIdCtr = 1;

public onMessage(cb: (mes: IRGQLServerMessage) => void) {
this.messageHandler = cb;
Expand All @@ -18,6 +23,10 @@ class MockTransport implements ITransport {
public send(msg: IRGQLClientMessage) {
console.log(`Sending: ${JSON.stringify(msg)}`);
}

public nextQueryId(): number {
return this.queryIdCtr++;
}
}

describe('SoyuzClient', () => {
Expand Down Expand Up @@ -49,24 +58,25 @@ query myQuery($age: Int) {
});
sub.subscribe((val) => {
console.log(`Query returned value: ${JSON.stringify(val)}`);
if (val.data && val.data.allPeople && val.data.allPeople.length) {
if (val.data && val.data.allPeople && val.data.allPeople.length && val.data.allPeople[0].name === 'Test') {
done();
}
});
console.log('Setting transport.');
client.setTransport(mt);
let batchValues: IRGQLValue[] = [
{queryNodeId: 1},
{arrayIndex: 1},
{queryNodeId: 2, value: {kind: Kind.PRIMITIVE_KIND_STRING, stringValue: 'Test'}},
];
let encValues: Uint8Array[] = [];
for (let v of batchValues) {
encValues.push(RGQLValue.encode(v).finish());
}
let msgs: IRGQLServerMessage[] = [
{mutateValue: {valueNodeId: 1, queryNodeId: 1, isArray: true}},
{mutateValue: {valueNodeId: 4, queryNodeId: 1, parentValueNodeId: 1}},
{
mutateValue: {
valueNodeId: 5,
parentValueNodeId: 4,
queryNodeId: 2,
valueJson: '"John"',
hasValue: true,
},
},
{valueInit: {queryId: 1, resultId: 1, cacheSize: 200, cacheStrategy: CacheStrategy.CACHE_LRU}},
{valueBatch: {resultId: 1, values: encValues}},
{valueFinalize: {resultId: 1}},
];
for (let msg of msgs) {
mt.messageHandler(msg);
Expand Down
114 changes: 62 additions & 52 deletions src/client.ts
Original file line number Diff line number Diff line change
@@ -1,62 +1,73 @@
import { QueryTreeNode } from './query-tree';
import { ValueTreeNode } from './value-tree';
import { ResultTree } from './result';
import { ITransport } from './transport';
import { ClientBus } from './client-bus';
import { RunningQuery } from './running-query';
import {
ObservableQuery,
IQueryOptions,
} from './query';
import {
Mutation,
IMutationOptions,
} from './mutation';
import {
ISoyuzClientContext,
ISoyuzSerialOperation,
} from './interfaces';
import { parse, OperationDefinitionNode } from 'graphql';
import { simplifyQueryAst } from './util/graphql';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';
import { Subscription } from 'rxjs/Subscription';

// Soyuz client.
export class SoyuzClient {
private queryTree: QueryTreeNode;
private context = new BehaviorSubject<ISoyuzClientContext>(null);
// queryTree holds the global live query tree.
private queryTree: QueryTreeNode = new QueryTreeNode();
private transportIdCounter = 0;

// Active serial operations in-flight.
private serialOperations: { [operationId: number]: ISoyuzSerialOperation } = {};
private serialOperationIdCounter = 0;

constructor() {
this.queryTree = new QueryTreeNode();
this.initHandlers();
}
private transport: ITransport;
private queries: { [id: number]: RunningQuery } = {};
private primaryQueryId: number;
private primaryResultTree: BehaviorSubject<ResultTree> = new BehaviorSubject<ResultTree>(null);
private transportSubs: Subscription[] = [];

// Set transport causes the client to start using a new transport to talk to the server.
// Pass null to stop using the previous transport.
public setTransport(transport: ITransport) {
if (this.context.value && this.context.value.transport === transport) {
if (this.transport === transport) {
return;
}

if (this.transport) {
for (let queryId in this.queries) {
if (!this.queries.hasOwnProperty(queryId)) {
continue;
}
this.queries[queryId].dispose();
}
this.queries = {};
for (let sub of this.transportSubs) {
sub.unsubscribe();
}
this.transportSubs.length = 0;
}

if (!transport) {
this.context.next(null);
return;
}

let tid: number = this.transportIdCounter++;
let vtr = new ValueTreeNode(this.queryTree);
let clib = new ClientBus(transport, this.queryTree, vtr, this.serialOperations);
this.context.next({
transport: transport,
valueTree: vtr,
clientBus: clib,
});
// start the initial root query
let query = new RunningQuery(transport, this.queryTree, 'query');
this.transportSubs.push(query.resultTree.subscribe((tree) => {
this.primaryResultTree.next(tree);
}));
this.primaryQueryId = query.id;
this.queries = {};
this.queries[query.id] = query;
query.resultTree.subscribe({complete: () => {
if (this.queries[query.id] === query) {
delete this.queries[query.id];
}
}});
}

// Build a query against the system.
public query<T>(options: IQueryOptions): ObservableQuery<T> {
public query<T>(options: IQueryOptions): ObservableQuery {
if (!options || !options.query) {
throw new Error('You must specify a options object and query.');
}
Expand All @@ -70,10 +81,7 @@ export class SoyuzClient {
if (!odef) {
throw new Error('Your provided query document did not contain a query definition.');
}
return new ObservableQuery<T>(this.context,
this.queryTree,
odef,
options.variables);
return new ObservableQuery(this.primaryResultTree, this.queryTree, odef, options.variables);
}

// Execute a mutation against the system.
Expand All @@ -91,27 +99,29 @@ export class SoyuzClient {
if (!odef) {
throw new Error('Your provided mutation document did not contain a mutation definition.');
}
let operationId = ++this.serialOperationIdCounter;
let mutation: ISoyuzSerialOperation = new Mutation<T>(operationId, this.context, odef, options.variables);
this.startSerialOperation(operationId, mutation);
return mutation.asPromise();
}

// startSerialOperation begins a already-built operation.
private startSerialOperation(id: number, operation: ISoyuzSerialOperation) {
this.serialOperations[id] = operation;
operation.init();
}

private initHandlers() {
let lastContext: ISoyuzClientContext;
this.context.subscribe((ctx) => {
if (lastContext) {
lastContext.valueTree.dispose();
lastContext.clientBus.dispose();
// start the query.
let qt = new QueryTreeNode();
let qr = new RunningQuery(this.transport, qt, 'mutation');
let uqr = qt.buildQuery(odef, options.variables || {});
this.queries[qr.id] = qr;
let lrt: any;
let data: any;
let rtsub = qr.resultTree.subscribe((rt) => {
if (!rt || rt === lrt) {
return;
}

lastContext = ctx;
lrt = rt;
data = rt.addQuery(uqr.id, (id) => {});
});
return new Promise<T>((reject, resolve) => {
qr.resultTree.subscribe({
error: (err) => {
reject(err);
},
complete: () => {
resolve(data);
},
});
});
}
}
18 changes: 0 additions & 18 deletions src/interfaces.ts

This file was deleted.

Loading

0 comments on commit 39d4fb5

Please sign in to comment.