diff --git a/package.json b/package.json index 760b8e3..e5edb76 100644 --- a/package.json +++ b/package.json @@ -6,12 +6,13 @@ "main": "./lib/index.js", "types": "./lib/index.d.ts", "peerDependencies": { - "rgraphql": "^0.3.0", + "rgraphql": "^0.4.5", "rxjs": "^5.0.0" }, "dependencies": { - "lodash": "^4.17.0", "graphql": "^0.9.0", + "lodash": "^4.17.0", + "lru_map": "^0.3.3", "rgraphql": "^0.3.0", "rxjs": "^5.0.0" }, @@ -22,6 +23,7 @@ "lint": "tslint -c tslint.json --project tsconfig.json --type-check", "mocha": "ts-node node_modules/istanbul/lib/cli.js cover -e .ts -x \"*.d.ts\" -x \"*.spec.ts\" test/run_tests.js", "mocha-nocover": "ts-node test/run_tests.js", + "mocha-nocover-debug": "ts-node debug test/run_tests.js", "semantic-release": "semantic-release pre && npm publish && semantic-release post" }, "devDependencies": { diff --git a/src/client.spec.ts b/src/client.spec.ts index 9d546a9..7f8f443 100644 --- a/src/client.spec.ts +++ b/src/client.spec.ts @@ -3,6 +3,10 @@ import { ITransport } from './transport'; import { IRGQLServerMessage, IRGQLClientMessage, + CacheStrategy, + IRGQLValue, + RGQLValue, + Kind, } from 'rgraphql'; import { parse, @@ -10,6 +14,7 @@ import { class MockTransport implements ITransport { public messageHandler: (mes: IRGQLServerMessage) => void; + private queryIdCtr = 1; public onMessage(cb: (mes: IRGQLServerMessage) => void) { this.messageHandler = cb; @@ -18,6 +23,10 @@ class MockTransport implements ITransport { public send(msg: IRGQLClientMessage) { console.log(`Sending: ${JSON.stringify(msg)}`); } + + public nextQueryId(): number { + return this.queryIdCtr++; + } } describe('SoyuzClient', () => { @@ -49,24 +58,25 @@ query myQuery($age: Int) { }); sub.subscribe((val) => { console.log(`Query returned value: ${JSON.stringify(val)}`); - if (val.data && val.data.allPeople && val.data.allPeople.length) { + if (val.data && val.data.allPeople && val.data.allPeople.length && val.data.allPeople[0].name === 'Test') { done(); } }); console.log('Setting transport.'); client.setTransport(mt); + let batchValues: IRGQLValue[] = [ + {queryNodeId: 1}, + {arrayIndex: 1}, + {queryNodeId: 2, value: {kind: Kind.PRIMITIVE_KIND_STRING, stringValue: 'Test'}}, + ]; + let encValues: Uint8Array[] = []; + for (let v of batchValues) { + encValues.push(RGQLValue.encode(v).finish()); + } let msgs: IRGQLServerMessage[] = [ - {mutateValue: {valueNodeId: 1, queryNodeId: 1, isArray: true}}, - {mutateValue: {valueNodeId: 4, queryNodeId: 1, parentValueNodeId: 1, arrayIdx: 1}}, - { - mutateValue: { - valueNodeId: 5, - parentValueNodeId: 4, - queryNodeId: 2, - valueJson: '"John"', - hasValue: true, - }, - }, + {valueInit: {queryId: 1, resultId: 1, cacheSize: 200, cacheStrategy: CacheStrategy.CACHE_LRU}}, + {valueBatch: {resultId: 1, values: encValues}}, + {valueFinalize: {resultId: 1}}, ]; for (let msg of msgs) { mt.messageHandler(msg); diff --git a/src/client.ts b/src/client.ts index eb07cac..e58681c 100644 --- a/src/client.ts +++ b/src/client.ts @@ -1,62 +1,73 @@ import { QueryTreeNode } from './query-tree'; -import { ValueTreeNode } from './value-tree'; +import { ResultTree } from './result'; import { ITransport } from './transport'; -import { ClientBus } from './client-bus'; +import { RunningQuery } from './running-query'; import { ObservableQuery, IQueryOptions, } from './query'; import { - Mutation, IMutationOptions, } from './mutation'; -import { - ISoyuzClientContext, - ISoyuzSerialOperation, -} from './interfaces'; import { parse, OperationDefinitionNode } from 'graphql'; import { simplifyQueryAst } from './util/graphql'; import { BehaviorSubject } from 'rxjs/BehaviorSubject'; +import { Subscription } from 'rxjs/Subscription'; // Soyuz client. export class SoyuzClient { - private queryTree: QueryTreeNode; - private context = new BehaviorSubject(null); + // queryTree holds the global live query tree. + private queryTree: QueryTreeNode = new QueryTreeNode(); private transportIdCounter = 0; - - // Active serial operations in-flight. - private serialOperations: { [operationId: number]: ISoyuzSerialOperation } = {}; - private serialOperationIdCounter = 0; - - constructor() { - this.queryTree = new QueryTreeNode(); - this.initHandlers(); - } + private transport: ITransport; + private queries: { [id: number]: RunningQuery } = {}; + private primaryQueryId: number; + private primaryResultTree: BehaviorSubject = new BehaviorSubject(null); + private transportSubs: Subscription[] = []; // Set transport causes the client to start using a new transport to talk to the server. // Pass null to stop using the previous transport. public setTransport(transport: ITransport) { - if (this.context.value && this.context.value.transport === transport) { + if (this.transport === transport) { return; } + if (this.transport) { + for (let queryId in this.queries) { + if (!this.queries.hasOwnProperty(queryId)) { + continue; + } + this.queries[queryId].dispose(); + } + this.queries = {}; + for (let sub of this.transportSubs) { + sub.unsubscribe(); + } + this.transportSubs.length = 0; + } + if (!transport) { - this.context.next(null); return; } let tid: number = this.transportIdCounter++; - let vtr = new ValueTreeNode(this.queryTree); - let clib = new ClientBus(transport, this.queryTree, vtr, this.serialOperations); - this.context.next({ - transport: transport, - valueTree: vtr, - clientBus: clib, - }); + // start the initial root query + let query = new RunningQuery(transport, this.queryTree, 'query'); + this.transportSubs.push(query.resultTree.subscribe((tree) => { + this.primaryResultTree.next(tree); + })); + this.primaryQueryId = query.id; + this.queries = {}; + this.queries[query.id] = query; + query.resultTree.subscribe({complete: () => { + if (this.queries[query.id] === query) { + delete this.queries[query.id]; + } + }}); } // Build a query against the system. - public query(options: IQueryOptions): ObservableQuery { + public query(options: IQueryOptions): ObservableQuery { if (!options || !options.query) { throw new Error('You must specify a options object and query.'); } @@ -70,10 +81,7 @@ export class SoyuzClient { if (!odef) { throw new Error('Your provided query document did not contain a query definition.'); } - return new ObservableQuery(this.context, - this.queryTree, - odef, - options.variables); + return new ObservableQuery(this.primaryResultTree, this.queryTree, odef, options.variables); } // Execute a mutation against the system. @@ -91,27 +99,29 @@ export class SoyuzClient { if (!odef) { throw new Error('Your provided mutation document did not contain a mutation definition.'); } - let operationId = ++this.serialOperationIdCounter; - let mutation: ISoyuzSerialOperation = new Mutation(operationId, this.context, odef, options.variables); - this.startSerialOperation(operationId, mutation); - return mutation.asPromise(); - } - - // startSerialOperation begins a already-built operation. - private startSerialOperation(id: number, operation: ISoyuzSerialOperation) { - this.serialOperations[id] = operation; - operation.init(); - } - - private initHandlers() { - let lastContext: ISoyuzClientContext; - this.context.subscribe((ctx) => { - if (lastContext) { - lastContext.valueTree.dispose(); - lastContext.clientBus.dispose(); + // start the query. + let qt = new QueryTreeNode(); + let qr = new RunningQuery(this.transport, qt, 'mutation'); + let uqr = qt.buildQuery(odef, options.variables || {}); + this.queries[qr.id] = qr; + let lrt: any; + let data: any; + let rtsub = qr.resultTree.subscribe((rt) => { + if (!rt || rt === lrt) { + return; } - - lastContext = ctx; + lrt = rt; + data = rt.addQuery(uqr.id, (id) => {}); + }); + return new Promise((reject, resolve) => { + qr.resultTree.subscribe({ + error: (err) => { + reject(err); + }, + complete: () => { + resolve(data); + }, + }); }); } } diff --git a/src/interfaces.ts b/src/interfaces.ts deleted file mode 100644 index 453ef08..0000000 --- a/src/interfaces.ts +++ /dev/null @@ -1,18 +0,0 @@ -import { ValueTreeNode } from './value-tree'; -import { ITransport } from './transport'; -import { ClientBus } from './client-bus'; -import { IRGQLSerialResponse } from 'rgraphql'; - -// Client context, including transport and bus. -export interface ISoyuzClientContext { - valueTree: ValueTreeNode; - transport: ITransport; - clientBus: ClientBus; -} - -// General interface for a serial operation handler. -export interface ISoyuzSerialOperation { - init(): void; - asPromise(): Promise; - handleResult(result: IRGQLSerialResponse): void; -} diff --git a/src/mutation.ts b/src/mutation.ts index 4aa4e81..423a065 100644 --- a/src/mutation.ts +++ b/src/mutation.ts @@ -1,27 +1,4 @@ -import { Subscription } from 'rxjs/Subscription'; -import { Subject } from 'rxjs/Subject'; -import { BehaviorSubject } from 'rxjs/BehaviorSubject'; -import { ISoyuzClientContext } from './interfaces'; -import { ITransport } from './transport'; -import { - Query, - QueryError, -} from './query-tree/query'; -import { QueryTreeNode } from './query-tree/query-tree'; -import { - OperationDefinitionNode, - DocumentNode, -} from 'graphql'; -import { - SerialOperationType, - IRGQLSerialResponse, - IRGQLQueryTreeNode, - IASTVariable, -} from 'rgraphql'; -import { Variable } from './var-store/var-store'; - -// An error in the input query. -export type MutationError = QueryError; +import { DocumentNode } from 'graphql'; // Options when starting a mutation. export interface IMutationOptions { @@ -30,154 +7,3 @@ export interface IMutationOptions { // Data to fill variables with. variables?: { [name: string]: any }; } - -interface ISoyuzMutationContext { - transport: ITransport; -} - -interface PromiseResolver { - promise?: Promise; - resolve?: (value: T) => void; - reject?: (error?: any) => void; -} - -function buildPromiseResolver(): PromiseResolver { - let pr: PromiseResolver = {}; - pr.promise = new Promise((resolve, reject) => { - let resolved = false; - pr.resolve = (value: T) => { - if (resolved) { - return; - } - resolved = true; - resolve(value); - }; - pr.reject = (error?: any) => { - if (resolved) { - return; - } - resolved = true; - reject(error); - }; - }); - return pr; -} - -// A mutation (promise pattern). -export class Mutation { - // Operation ID - private operationId: number; - // Promise resolver. - private promiseResolver: PromiseResolver; - // AST - private ast: OperationDefinitionNode; - // Variables - private variables: { [name: string]: any }; - // The query tree reference. - private queryTree: QueryTreeNode; - // The query in the query tree. - private query: Query; - // Any handles we should clear when dropping this.query. - private querySubHandles: Subscription[]; - // Client context subject. - private clientContext: BehaviorSubject; - // Mutation context. - private mutationContext: BehaviorSubject; - - constructor(operationId: number, - clientContext: BehaviorSubject, - ast: OperationDefinitionNode, - variables: { [name: string]: any }) { - // Initialize the promise. - this.promiseResolver = buildPromiseResolver(); - this.mutationContext = new BehaviorSubject(null); - - this.operationId = operationId; - this.ast = ast; - this.variables = variables || {}; - this.querySubHandles = []; - this.clientContext = clientContext; - } - - // Returns the promise for this mutation. - public asPromise(): Promise { - return this.promiseResolver.promise; - } - - // Apply this mutation to the transport and begin resolution. - public init() { - let queryRoot: IRGQLQueryTreeNode; - let queryVariables: IASTVariable[] = []; - this.queryTree = new QueryTreeNode(); - try { - this.query = this.queryTree.buildQuery(this.ast, this.variables); - queryRoot = this.queryTree.buildRGQLTree(true); - this.queryTree.variableStore.forEach((vb: Variable) => { - queryVariables.push(vb.toProto()); - }); - } catch (ex) { - this.promiseResolver.reject(ex); - return; - } - this.querySubHandles.push(this.clientContext.subscribe((ctx) => { - let currentMutationContext = this.mutationContext.value; - if (!ctx || !ctx.transport) { - if (currentMutationContext) { - this.mutationContext.next(null); - } - return; - } - if (currentMutationContext && - currentMutationContext.transport === ctx.transport) { - return; - } - let nctx: ISoyuzMutationContext = { - transport: ctx.transport, - }; - this.mutationContext.next(nctx); - ctx.transport.send({ - serialOperation: { - operationId: this.operationId, - operationType: SerialOperationType.MUTATION, - variables: queryVariables, - queryRoot, - }, - }); - })); - } - - // Handle the result for this mutation. - public handleResult(result: IRGQLSerialResponse) { - this.dispose(); - try { - if (result.queryError && - result.queryError.errorJson && - result.queryError.errorJson.length) { - this.promiseResolver.reject( - JSON.parse(result.queryError.errorJson), - ); - return; - } - if (result.resolveError && - result.resolveError.errorJson && - result.resolveError.errorJson.length) { - this.promiseResolver.reject( - JSON.parse(result.resolveError.errorJson), - ); - return; - } - this.promiseResolver.resolve( - JSON.parse(result.responseJson), - ); - } catch (ex) { - this.promiseResolver.reject(ex); - } - } - - public dispose() { - for (let sub of this.querySubHandles) { - sub.unsubscribe(); - } - this.querySubHandles.length = 0; - } -} diff --git a/src/query-tree/query-tree.ts b/src/query-tree/query-tree.ts index f859513..ea94512 100644 --- a/src/query-tree/query-tree.ts +++ b/src/query-tree/query-tree.ts @@ -293,6 +293,13 @@ export class QueryTreeNode { } } + public fieldNameForQuery(query: number): string { + if (this.queriesAlias.hasOwnProperty(query + '')) { + return this.queriesAlias[query]; + } + return this.fieldName; + } + public buildQuery(query: OperationDefinitionNode, variables: { [name: string]: any }): Query { if (query.kind !== 'OperationDefinition' || @@ -300,7 +307,7 @@ export class QueryTreeNode { throw new Error('buildQuery expects a query or mutation operation.'); } - let result = new Query(this.queryIdCounter++, query, this, this.variableStore); + let result = new Query(++this.queryIdCounter, query, this, this.variableStore); query = result.transformVariables(variables); let self = this; this.addQuery(result, null, null); @@ -405,7 +412,7 @@ export class QueryTreeNode { if (!nod) { return; } - nod.error.next(JSON.parse(err.errorJson)); + nod.error.next(err.error); } public dispose() { diff --git a/src/query.spec.ts b/src/query.spec.ts deleted file mode 100644 index 67e0f29..0000000 --- a/src/query.spec.ts +++ /dev/null @@ -1,67 +0,0 @@ -import { parse } from 'graphql'; -import { ObservableQuery } from './query'; -import { QueryTreeNode } from './query-tree'; -import { ValueTreeNode } from './value-tree'; -import { IChangeBus, ITreeMutation } from './query-tree/change-bus'; -import { IRGQLValueMutation } from 'rgraphql'; -import { ISoyuzClientContext } from './interfaces'; -import { BehaviorSubject } from 'rxjs/BehaviorSubject'; - -import * as _ from 'lodash'; - -function mockAst() { - return parse( -`query myQuery($age: Int) { - allPeople(age: $age) { - name - } -} -`); -} - -describe('ObservableQuery', () => { - it('should build result values correctly', (done) => { - let ast = mockAst(); - let qt = new QueryTreeNode(); - let changeBus: IChangeBus = { - applyTreeMutation: (mutation: ITreeMutation) => { - console.log('Applying:'); - console.log(mutation); - }, - }; - qt.addChangeBus(changeBus); - let vt = new ValueTreeNode(qt); - let ctxSubject = new BehaviorSubject({ - valueTree: vt, - transport: null, - clientBus: null, - }); - let query = new ObservableQuery(ctxSubject, qt, ast.definitions[0], {age: 40}); - query.subscribe((value) => { - let jsonValue = JSON.stringify(value); - console.log(`Got value: ${jsonValue}`); - let expectedVal = - `{"data":{"allPeople":[{"name":"Jane"},{"name":"Bill"}]},"errors":[]}`; - if (jsonValue === expectedVal) { - done(); - } - }); - // This will build: - // {"allPeople":[{"name":"John"},{"name":"Jane"},{"name":"Bill"}]} - // However, due to debouncing we won't see the entire thing on the output without delays. - let mutations: IRGQLValueMutation[] = [ - {valueNodeId: 1, queryNodeId: 1, isArray: true}, - {valueNodeId: 4, parentValueNodeId: 1, queryNodeId: 1, arrayIdx: 1}, - {valueNodeId: 5, parentValueNodeId: 4, queryNodeId: 2, valueJson: '"John"', hasValue: true}, - {valueNodeId: 2, parentValueNodeId: 1, queryNodeId: 1, arrayIdx: 2}, - {valueNodeId: 6, parentValueNodeId: 2, queryNodeId: 2, valueJson: '"Jane"', hasValue: true}, - {valueNodeId: 3, parentValueNodeId: 1, queryNodeId: 1, arrayIdx: 3}, - {valueNodeId: 7, parentValueNodeId: 3, queryNodeId: 2, valueJson: '"Bill"', hasValue: true}, - {valueNodeId: 5, parentValueNodeId: 4, queryNodeId: 2, operation: 2}, - {valueNodeId: 4, parentValueNodeId: 1, queryNodeId: 1, operation: 2}, - ]; - for (let mut of mutations) { - vt.applyValueMutation(mut); - } - }); -}); diff --git a/src/query.ts b/src/query.ts index fd4b153..b9509a4 100644 --- a/src/query.ts +++ b/src/query.ts @@ -3,13 +3,12 @@ import { Observer } from 'rxjs/Observer'; import { Subscription } from 'rxjs/Subscription'; import { Subject } from 'rxjs/Subject'; import { BehaviorSubject } from 'rxjs/BehaviorSubject'; -import { ISoyuzClientContext } from './interfaces'; import { Query, QueryError, } from './query-tree/query'; import { QueryTreeNode } from './query-tree/query-tree'; -import { ValueTreeNode } from './value-tree'; +import { ResultTree } from './result'; import { OperationDefinitionNode, DocumentNode, @@ -25,9 +24,9 @@ import * as _ from 'lodash'; export type QueryError = QueryError; // Result / ongoing status of a query. -export type QueryResult = { - data: T; - errors: QueryError[]; +export type QueryResult = { + data: any; + errors: any[]; }; // Options when starting a query. @@ -39,13 +38,13 @@ export interface IQueryOptions { } interface ISoyuzQueryContext { - valueTree: ValueTreeNode; + resultTree: ResultTree; } // An observable query. -export class ObservableQuery extends Observable> { +export class ObservableQuery extends Observable { // Any observers listening to this query's result. - private observers: Observer>[]; + private observers: Observer[]; // AST private ast: OperationDefinitionNode; // Variables @@ -59,29 +58,31 @@ export class ObservableQuery extends Observable> { // Any handles we should clear when dropping this.query. private querySubHandles: Subscription[]; // Result data storage - private lastResult: QueryResult; - // Client context subject - private clientContext: BehaviorSubject; + private lastResult: QueryResult; + // Result tree + private resultTree: ResultTree; + // Result tree update subject + private resultTreeSubj: BehaviorSubject; private emitResult: Function; - constructor(clientContext: BehaviorSubject, + constructor(resultTreeSubj: BehaviorSubject, queryTree: QueryTreeNode, ast: OperationDefinitionNode, variables: { [name: string]: any }) { // Initialize the Observable - const subscriberFn = (observer: Observer>) => { + const subscriberFn = (observer: Observer) => { return this.onSubscribe(observer); }; super(subscriberFn); this.queryTree = queryTree; - this.clientContext = clientContext; this.queryContext = new BehaviorSubject(null); this.ast = ast; this.variables = variables || {}; this.observers = []; this.querySubHandles = []; + this.resultTreeSubj = resultTreeSubj; this.lastResult = { data: {}, errors: [], @@ -92,10 +93,10 @@ export class ObservableQuery extends Observable> { obs.next(this.lastResult); } } - }); + }, 10, {maxWait: 50, leading: false}); } - private onSubscribe(observer: Observer>) { + private onSubscribe(observer: Observer) { this.observers.push(observer); if (this.observers.length === 1) { @@ -132,206 +133,33 @@ export class ObservableQuery extends Observable> { this.emitResult(); })); - this.querySubHandles.push(this.clientContext.subscribe((ctx) => { - let currentQueryContext = this.queryContext.value; - let hasObservers = this.observers.length; - if (!ctx || !ctx.valueTree) { - if (currentQueryContext) { - this.queryContext.next(null); - } - return; - } - if (currentQueryContext && - currentQueryContext.valueTree === ctx.valueTree) { - return; - } - let nctx: ISoyuzQueryContext = { - valueTree: ctx.valueTree, - }; - this.queryContext.next(nctx); - let sub = this.hookValueTree(ctx.valueTree).subscribe((val: any) => { - this.emitResult(); - }); - let subb = this.queryContext.subscribe((rctx) => { - if (rctx !== nctx) { - sub.unsubscribe(); - subb.unsubscribe(); - } - }); + this.querySubHandles.push(this.resultTreeSubj.subscribe((rt) => { + this.setResultTree(rt); })); } - // Traverse the value tree and register new hooks. - private hookValueTree(vtree: ValueTreeNode, - parentVal: any = this.lastResult.data, - parentIdxMarker: number[] = [], - reuseSubject: Subject = null): Subject { - let context = this.queryContext.value; - if (!context || context.valueTree !== vtree.root) { + private setResultTree(rt: ResultTree) { + if (rt === this.resultTree) { return; } - // Disposed prevents multiple errant re-evaluations - let disposed = false; - let changed = reuseSubject || new Subject(); - let subHandles: Subscription[] = []; - let cleanupFuncs: (() => void)[] = []; - let qnode = vtree.queryNode; - let cleanup = () => { - if (disposed) { - return; - } - for (let sh of subHandles) { - sh.unsubscribe(); - } - subHandles.length = 0; - for (let cu of cleanupFuncs) { - cu(); - } - cleanupFuncs.length = 0; - }; - let reevaluate = () => { - cleanup(); - if (disposed) { - return; - } - disposed = true; - this.hookValueTree(vtree, parentVal, parentIdxMarker, changed); - }; - subHandles.push(this.queryContext.subscribe((ctx) => { - if (!ctx || ctx.valueTree !== vtree.root) { - cleanup(); - } - })); - - // If we're not interested in this query node, subscribe in case we become interested. - if (!qnode.queries[this.query.id]) { - subHandles.push(qnode.queryAdded.subscribe((query: Query) => { - if (query === this.query) { - reevaluate(); - } - })); - return changed; - } - - // Handle what happens if we remove this query (lose interest). - subHandles.push(qnode.queryRemoved.subscribe((query: Query) => { - if (query === this.query) { - reevaluate(); - } - })); - - let fieldName = qnode.queriesAlias[this.query.id] || qnode.fieldName; - let isArray = vtree.isArray; - - let pv: any; - let pvChildIdxMarker: any[]; - let applyPv: Function; - - if (qnode.id === 0 || !fieldName) { - pv = parentVal; - } else if (isArray) { - pv = []; - pvChildIdxMarker = []; - } else { - pv = undefined; - } - - if (qnode.id !== 0) { - if (typeof parentVal === 'object' && parentVal.constructor !== Array) { - applyPv = (val: any) => { - parentVal[fieldName] = val; - }; - if (pv !== undefined) { - applyPv(pv); - } - cleanupFuncs.push(() => { - if (!parentVal.hasOwnProperty(fieldName)) { - return; - } - delete parentVal[fieldName]; - changed.next(); - }); - subHandles.push(vtree.value.subscribe((val) => { - if (val === undefined) { - return; - } - applyPv(val); - changed.next(); - })); - } else { - let aidx = vtree.arrayIdx || 0; - applyPv = (value: any) => { - let idx = insertionIndex(parentIdxMarker, aidx); - parentIdxMarker.splice(idx, 0, aidx); - parentVal.splice(idx, 0, value); - }; - if (pv !== undefined) { - applyPv(pv); - } - cleanupFuncs.push(() => { - let idx = binarySearch(parentIdxMarker, aidx); - if (parentIdxMarker[idx] !== aidx) { - return; - } - parentVal.splice(idx, 1); - parentIdxMarker.splice(idx, 1); - changed.next(); - }); - subHandles.push(vtree.value.subscribe((val) => { - if (val === undefined) { - return; - } - if (pv === undefined) { - pv = val; - applyPv(val); - changed.next(); - return; - } - let idx = binarySearch(parentIdxMarker, aidx); - if (parentIdxMarker[idx] !== aidx) { - return; - } - parentVal[idx] = val; - changed.next(); - })); - } - } else { - applyPv = () => {}; + if (this.resultTree && this.query) { + this.resultTree.removeQuery(this.query.id); } - let addChild = (child: ValueTreeNode) => { - let cqnode = child.queryNode; - if (pv === undefined) { - pv = {}; - applyPv(pv); - } - let childSubj = this.hookValueTree(child, pv, pvChildIdxMarker); - if (childSubj) { - childSubj.subscribe(() => { - changed.next(); - }); - } - }; - - for (let child of vtree.children) { - if (child) { - addChild(child); - } + let hasObservers = this.observers.length; + if (rt) { + this.lastResult.data = rt.addQuery(this.query.id, (id: number) => { + this.emitResult(); + }); } - vtree.childAdded.subscribe((vchild: ValueTreeNode) => { - addChild(vchild); - }); - - subHandles.push(vtree.value.subscribe(null, null, () => { - cleanup(); - })); - - return changed; } private cancelQuery() { if (this.query) { + if (this.resultTree) { + this.resultTree.removeQuery(this.query.id); + } for (let hand of this.querySubHandles) { hand.unsubscribe(); } diff --git a/src/result/index.ts b/src/result/index.ts new file mode 100644 index 0000000..9041617 --- /dev/null +++ b/src/result/index.ts @@ -0,0 +1 @@ +export { ResultTree } from './result-tree'; diff --git a/src/result/result-tree.spec.ts b/src/result/result-tree.spec.ts new file mode 100644 index 0000000..8015af7 --- /dev/null +++ b/src/result/result-tree.spec.ts @@ -0,0 +1,135 @@ +import { + parse, +} from 'graphql'; +import { + Kind, + CacheStrategy, + IRGQLValue, +} from 'rgraphql'; +import { + ResultTree, + ResultTreeCursor, + CachedResultTreeCursor, +} from './result-tree'; +import { + QueryTreeNode, +} from '../query-tree/query-tree'; +import { + Query, +} from '../query-tree/query'; + +describe('ResultTreeCursor', () => { + it('should handle a root-level object value', () => { + let root = {}; + let cursor = new ResultTreeCursor(); + cursor.path = []; + cursor.resultLocations = {0: [root, (id) => {}]}; + cursor.apply({queryNodeId: 1}); + cursor.apply({queryNodeId: 2, value: {intValue: 1, kind: Kind.PRIMITIVE_KIND_INT}}); + expect(root).toEqual({1: {2: 1}}); + }); + it('should handle a root-level primitive array', () => { + let root = {}; + let cursor = new ResultTreeCursor(); + cursor.path = []; + cursor.resultLocations = {0: [root, (id) => {}]}; + cursor.apply({queryNodeId: 1}); + cursor.apply({arrayIndex: 1, value: {intValue: 1, kind: Kind.PRIMITIVE_KIND_INT}}); + expect(root).toEqual({1: [1]}); + }); + it('should handle a basic object array, with checkpointing', () => { + let root = {}; + let cursor = new ResultTreeCursor(); + cursor.path = []; + cursor.resultLocations = {0: [root, (id) => {}]}; + cursor.apply({queryNodeId: 1}); + cursor.apply({arrayIndex: 1}); // checkpoint here. + + let cache = new CachedResultTreeCursor(cursor); + cursor.apply({queryNodeId: 2, value: {stringValue: 'test', kind: Kind.PRIMITIVE_KIND_STRING}}); + expect(root).toEqual({1: [{2: 'test'}]}); + + cursor = cache.cursor(); + cursor.apply({queryNodeId: 3, value: {kind: Kind.PRIMITIVE_KIND_NULL}}); + expect(root).toEqual({1: [{2: 'test', 3: null}]}); + }); + it('should handle aliasing a live value', () => { + let root = {}; + let cursor = new ResultTreeCursor(); + cursor.path = []; + cursor.resultLocations = {0: [root, (id) => {}]}; + cursor.apply({queryNodeId: 1}); + cursor.apply({arrayIndex: 1, value: {kind: Kind.PRIMITIVE_KIND_INT, intValue: 5}}); + let cache = new CachedResultTreeCursor(cursor); + expect(root).toEqual({1: [5]}); + + cursor = cache.cursor(); + cursor.apply({posIdentifier: 1, value: {stringValue: 'test', kind: Kind.PRIMITIVE_KIND_STRING}}); + expect(root).toEqual({1: ['test']}); + }); +}); + +describe('ResultTree', () => { + it('should handle a complex object', () => { + let qt = new QueryTreeNode(); + let q: Query = qt.buildQuery(parse(` +query { + allPeople { + name + age + parents + } +} + `).definitions[0], {}); + let rt = new ResultTree(0, qt, CacheStrategy.CACHE_LRU, 2); + let qres = rt.addQuery(q.id, (id) => {}); + let segments: IRGQLValue[] = [ + {queryNodeId: 1}, + {arrayIndex: 1}, + {queryNodeId: 4, posIdentifier: 1}, + {arrayIndex: 3, value: {kind: Kind.PRIMITIVE_KIND_STRING, stringValue: 'Parent3'}}, + {posIdentifier: 1, arrayIndex: 1, value: {kind: Kind.PRIMITIVE_KIND_STRING, stringValue: 'Parent1'}}, + {posIdentifier: 1, arrayIndex: 2, value: {kind: Kind.PRIMITIVE_KIND_STRING, stringValue: 'Parent2'}}, + {queryNodeId: 1}, + {arrayIndex: 1}, + {queryNodeId: 3, value: {kind: Kind.PRIMITIVE_KIND_INT, intValue: 5}}, + {queryNodeId: 1}, + {arrayIndex: 1}, + {queryNodeId: 2, value: {kind: Kind.PRIMITIVE_KIND_STRING, stringValue: 'Jane'}}, + ]; + + for (let seg of segments) { + rt.handleSegment(seg); + } + + expect(rt.result).toEqual({ + 1: [{ + 2: 'Jane', + 3: 5, + 4: ['Parent1', 'Parent2', 'Parent3'], + }], + }); + expect(qres).toEqual({ + 'allPeople': [{ + 'name': 'Jane', + 'age': 5, + 'parents': ['Parent1', 'Parent2', 'Parent3'], + }], + }); + + // Attempt to build a second query to read the cache. + q = qt.buildQuery(parse(` +query { + allPeople { + parents + } +} + `).definitions[0], {}); + qres = rt.addQuery(q.id, (id) => {}); + expect(qres).toEqual({ + 'allPeople': [{ + 'parents': ['Parent1', 'Parent2', 'Parent3'], + }], + }); + }); +}); diff --git a/src/result/result-tree.ts b/src/result/result-tree.ts new file mode 100644 index 0000000..a772180 --- /dev/null +++ b/src/result/result-tree.ts @@ -0,0 +1,339 @@ +import { QueryTreeNode } from '../query-tree/query-tree'; +import { + IRGQLValue, + IRGQLPrimitive, + CacheStrategy, + UnpackPrimitive, +} from 'rgraphql'; +import { + LRUMap, +} from 'lru_map'; + +import * as _ from 'lodash'; + +// CursorPathElementKind represents if a path element is a qnode ID or array idx. +enum CursorPathElementKind { + QUERY_NODE = 1, + ARRAY_IDX = 2, +} + +// ResultLocation stores a pointer to a location and a changed callback. +export type ResultLocation = [any, (id: number) => void]; + +// ResultTreeCursor handles pointing to a location in a result tree. +export class ResultTreeCursor { + // Attached result locations. + public resultLocations: { [id: number]: ResultLocation }; + // If we have an index in addition to the location, set here. 1-based. + public arrayIndex: number; + // If we have a query node ID in addition to the location, set here. + public queryNodeId: number; + // The query tree node. + public queryNode: QueryTreeNode; + // Path to this cursor, if not a value cursor. + public path: number[]; + + // Copy duplicates the cursor. + public copy(): ResultTreeCursor { + let result = new ResultTreeCursor(); + result.arrayIndex = this.arrayIndex; + result.queryNode = this.queryNode; + result.queryNodeId = this.queryNodeId; + result.resultLocations = {}; + for (let rid in this.resultLocations) { + if (!this.resultLocations.hasOwnProperty(rid)) { + continue; + } + result.resultLocations[+rid] = _.clone(this.resultLocations[rid]); + } + if (this.path) { + result.path = this.path.slice(); + } + return result; + } + + // ApplyValue assumes this is a leaf and applies a value primitive. + public applyValue(unpk: any) { + for (let key in this.resultLocations) { + if (!this.resultLocations.hasOwnProperty(key)) { + continue; + } + let qid = +key; + let loc = this.resultLocations[key][0]; + if (this.arrayIndex) { + loc[this.arrayIndex - 1] = unpk; + } else { + let qnk: any; + if (qid === 0) { + qnk = this.queryNodeId; + } else { + qnk = this.queryNode.root.rootNodeMap[this.queryNodeId].fieldNameForQuery(qid); + } + loc[qnk] = unpk; + } + } + } + + // Apply applies a value segment. + public apply(segment: IRGQLValue) { + if (segment.queryNodeId || segment.arrayIndex) { + this.resolvePendingLocation(segment.queryNodeId, segment.arrayIndex); + } + + if (segment.value) { + this.applyValue(UnpackPrimitive(segment.value)); + segment.value = undefined; // clear in case we are used as a pos identifier. + } else if (segment.error && segment.error.length) { + this.applyValue({'$error': segment.error}); + segment.error = undefined; + } + } + + // addQuery checks if a query should be included in this cursor, and adds it if so. + public addQuery(qid: number, result: any, resultCb: (id: number) => void) { + // make sure we don't double register + if (this.resultLocations.hasOwnProperty(qid + '')) { + return; + } + + // check if the qnode has this query + if (!this.path || !this.queryNode || !this.queryNode.queries.hasOwnProperty(qid + '')) { + return; + } + + let qnr = this.queryNode.root; + for (let element of this.path) { + if (result instanceof Array) { + result = result[element - 1]; + } else { + let qn = qnr.rootNodeMap[element]; + if (!qn) { + return; + } + let fieldName = qn.fieldNameForQuery(element); + result = result[fieldName]; + } + } + this.resultLocations[qid] = [result, resultCb]; + } + + // removeQuery removes a query from the cursor. + public removeQuery(qid: number) { + delete this.resultLocations[qid]; + } + + // resolvePendingLocation updates location once we know if we have a array or object. + private resolvePendingLocation(queryNodeId: number, arrayIndex: number) { + // Detect if we're reaching a leaf array. + let leafLoc: Array; + let isLeaf = this.queryNodeId && this.queryNode && !this.queryNode.children.length; + if (isLeaf && arrayIndex) { + leafLoc = []; + } + for (let locid in this.resultLocations) { + if (!this.resultLocations.hasOwnProperty(locid)) { + continue; + } + let queryId = +locid; + let location = this.resultLocations[locid][0]; + if (!leafLoc && this.arrayIndex) { + let eloc = location[this.arrayIndex - 1]; + if (eloc) { + location = eloc; + } else { + location = location[this.arrayIndex - 1] = queryNodeId ? {} : []; + } + } else if (this.queryNodeId) { + let qn = this.queryNode ? this.queryNode.root.rootNodeMap[this.queryNodeId] : undefined; + let fieldId = queryId === 0 ? this.queryNodeId : qn.fieldNameForQuery(queryId); + let nloc = leafLoc || (location.hasOwnProperty(fieldId) ? + location[fieldId] : (queryNodeId ? {} : [])); + location = location[fieldId] = nloc; + this.queryNode = qn; + } + this.resultLocations[locid][0] = location; + if (isLeaf) { + // issue a changed notification + this.resultLocations[locid][1](queryId); + } + } + if (leafLoc) { + this.resultLocations = {0: this.resultLocations[0]}; + this.queryNode = undefined; + } + if (this.arrayIndex || this.queryNodeId) { + this.path.push(this.arrayIndex || this.queryNodeId); + } + this.arrayIndex = arrayIndex; + this.queryNodeId = queryNodeId; + } +} + +// A CachedResultTreeCursor contains a tree cursor in the cache. +export class CachedResultTreeCursor { + private _cursor: ResultTreeCursor; + + // Construct the result tree with the cursor. + constructor(cursor: ResultTreeCursor) { + this._cursor = cursor.copy(); + } + + // cursor returns the cached cursor. + public cursor(): ResultTreeCursor { + return this._cursor.copy(); + } + + // addQuery adds a query to the cached cursor. + public addQuery(qid: number, result: Object, resultCb: (id: number) => void) { + this._cursor.addQuery(qid, result, resultCb); + } + + // removeQuery removes a query to the cached cursor. + public removeQuery(qid: number) { + this._cursor.removeQuery(qid); + } +} + +// ResultTree builds and manages results. +export class ResultTree { + // result holds the result thus far. + public result: Object = {}; + + // cursor holds the current position selected by the segments processed so far. + private cursor: ResultTreeCursor; + // rootCursor is cloned to make a new cursor. + private rootCursor: ResultTreeCursor; + // LRU cache + private cache: LRUMap; + // pendingPathComponent holds any previous unresolved path component. + private pendingPathComponent: IRGQLValue; + + constructor(public id: number, + public qtree: QueryTreeNode, + public cacheStrategy: CacheStrategy, + public cacheSize: number) { + if (cacheStrategy !== CacheStrategy.CACHE_LRU) { + throw new Error('Cache strategy not supported.'); + } + this.cache = new LRUMap(cacheSize); + + this.rootCursor = new ResultTreeCursor(); + this.rootCursor.resultLocations = {0: [this.result, (_) => {}]}; + this.rootCursor.path = []; + this.rootCursor.queryNode = qtree; + this.rootCursor.queryNodeId = 0; + } + + // addQuery registers a query by ID, returning the result object. + public addQuery(id: number, changedCb: (id: number) => void): Object { + let result: Object = {}; + this.rootCursor.resultLocations[id] = [result, changedCb]; + + // DFS over the current result tree, apply to result. + this.addQueryDFS(id, this.result, result); + this.cache.forEach((value: CachedResultTreeCursor, key: number, _: any) => { + value.addQuery(id, result, changedCb); + }); + + return result; + } + + // removeQuery removes a query by ID. + public removeQuery(id: number) { + if (!this.rootCursor.resultLocations.hasOwnProperty(id + '')) { + return; + } + delete this.rootCursor.resultLocations[id]; + this.cache.forEach((value: CachedResultTreeCursor, key: number, _: any) => { + value.removeQuery(id); + }); + } + + // addQueryDFS recursively copies the result tree to the query result. + private addQueryDFS(qid: number, srcLocation: any, targetLocation: any) { + for (let qnids in srcLocation) { + if (!srcLocation.hasOwnProperty(qnids)) { + continue; + } + let qnid = +qnids; + let qn = this.qtree.rootNodeMap[qnid]; + if (!qn) { + continue; + } + if (!qn.queries[qid]) { + continue; + } + let srcVal = srcLocation[qnids]; + let val: any = undefined; + if (!qn.children || !qn.children.length) { + val = srcVal; + } else { + // Recurse copy. + // [[[{5: "test"}]]] -> copy recursively until we hit the object, then dfs + if (srcVal instanceof Array) { + this.addQueryArrDFS(qid, srcVal, val = []); + } else { + this.addQueryDFS(qid, srcVal, val = {}); + } + } + targetLocation[qn.fieldNameForQuery(qid)] = val; + } + } + + // srcLocation is source array, targetLocation is target array. + private addQueryArrDFS(qid: number, srcLocation: Array, targetLocation: Array) { + for (let i = 0; i < srcLocation.length; i++) { + let srcVal: any = srcLocation[i]; + if (srcVal instanceof Array) { + let tval: Array = []; + this.addQueryArrDFS(qid, srcVal, targetLocation[i] = tval); + } else { + this.addQueryDFS(qid, srcVal, targetLocation[i] = {}); + } + } + } + + // handleSegment handles an incoming path segment. + public handleSegment(val: IRGQLValue) { + // query_node_id=1 + // - set this.cursor to root {} + // - add root[1]->? + // array_index=1 + // - root[1]->[] + // - root[1][0] = ? + // query_node_id=4, pos_identifier=1 + // - root[1][0] = {} + // - root[1][0][4] = ? + // - pos_identifier[1] -> root[1][0][4] (currently ?) + // array_index=3, value="parent1" + // - root[1][0][4] = [] + // - root[1][0][4][2] = "parent1" + // result: {1: [{4: [-, -, "parent1"]}]} + // ^ + // 1 + // + // note: all the (?) can be resolved in tuples. + let isFirst = this.cursor === undefined; + if (isFirst) { + if (val.posIdentifier) { + this.cursor = this.cache.get(val.posIdentifier).cursor(); + val.posIdentifier = 0; + } else { + this.cursor = this.rootCursor.copy(); + } + } + + let hadValue = (!!val.value) || (!!val.error && !!val.error.length); + this.cursor.apply(val); + + // Register any new position identifiers. + if (!isFirst && val.posIdentifier) { + this.cache.set(val.posIdentifier, new CachedResultTreeCursor(this.cursor)); + } + + // One we process a value, reset the state machine. + if (hadValue) { + this.cursor = undefined; + } + } +} diff --git a/src/client-bus.ts b/src/running-query.ts similarity index 54% rename from src/client-bus.ts rename to src/running-query.ts index f26eddf..083d01e 100644 --- a/src/client-bus.ts +++ b/src/running-query.ts @@ -1,3 +1,4 @@ +import { BehaviorSubject } from 'rxjs/BehaviorSubject'; import { ITransport, } from './transport'; @@ -6,47 +7,70 @@ import { IRGQLServerMessage, INodeMutation, SubtreeOperation, + RGQLValue, + IRGQLValue, } from 'rgraphql'; import { IChangeBus, ITreeMutation, } from './query-tree/change-bus'; -import { - ISoyuzClientContext, - ISoyuzSerialOperation, -} from './interfaces'; import { QueryTreeNode } from './query-tree'; -import { ValueTreeNode } from './value-tree'; +import { ResultTree } from './result'; +import { ObservableQuery } from './query'; + +// RunningQuery applies query-tree changes to a RGQL transport and manages a result tree. +export class RunningQuery implements IChangeBus { + // query ID + public id: number; + // value tree + public resultTree: BehaviorSubject = new BehaviorSubject(null); -// ClientBus applies query-tree changes to a RGQL transport. -export class ClientBus implements IChangeBus { constructor(public transport: ITransport, public queryTree: QueryTreeNode, - public valueTree: ValueTreeNode, - public serialOperations: { [operationId: number]: ISoyuzSerialOperation }) { + public operation: string) { + this.id = transport.nextQueryId(); transport.onMessage((msg: IRGQLServerMessage) => { this.handleMessage(msg); }); queryTree.addChangeBus(this); + transport.send({ + initQuery: { + queryId: this.id, + operationType: operation, + }, + }); } public handleMessage(msg: IRGQLServerMessage) { - if (!this.queryTree || !this.valueTree) { + if (!this.queryTree) { return; } - if (msg.queryError) { - this.queryTree.applyQueryError(msg.queryError); + + if (msg.valueInit && msg.valueInit.queryId === this.id) { + let vi = msg.valueInit; + let vt = new ResultTree(vi.resultId, this.queryTree, vi.cacheStrategy, vi.cacheSize); + this.resultTree.next(vt); } - if (msg.mutateValue) { - this.valueTree.applyValueMutation(msg.mutateValue); + + let rt = this.resultTree.value; + if (!rt) { + return; } - if (msg.serialResponse) { - let operation = this.serialOperations[msg.serialResponse.operationId]; - if (operation) { - delete this.serialOperations[msg.serialResponse.operationId]; - operation.handleResult(msg.serialResponse); + + if (msg.valueBatch && msg.valueBatch.resultId === rt.id) { + for (let seg of msg.valueBatch.values) { + let val: IRGQLValue = RGQLValue.decode(seg).toObject(); + this.resultTree.value.handleSegment(val); } } + + if (msg.queryError && msg.queryError.queryId === this.id) { + this.queryTree.applyQueryError(msg.queryError); + } + + if (msg.valueFinalize && msg.valueFinalize.resultId === rt.id) { + this.dispose(); + } } // A mutation was made to the tree. @@ -85,6 +109,6 @@ export class ClientBus implements IChangeBus { this.queryTree.removeChangeBus(this); this.transport = null; this.queryTree = null; - this.valueTree = null; + this.resultTree.complete(); } } diff --git a/src/transport/transport.ts b/src/transport/transport.ts index 80b9000..b050c3d 100644 --- a/src/transport/transport.ts +++ b/src/transport/transport.ts @@ -9,4 +9,6 @@ export interface ITransport { onMessage(cb: (mes: IRGQLServerMessage) => void): void; // Soyuz will call this function with outgoing messages for the server. send(msg: IRGQLClientMessage): void; + // nextQueryId gets the next available query ID. + nextQueryId(): number; } diff --git a/src/value-tree/index.ts b/src/value-tree/index.ts deleted file mode 100644 index 3456dd1..0000000 --- a/src/value-tree/index.ts +++ /dev/null @@ -1 +0,0 @@ -export * from './value-tree'; diff --git a/src/value-tree/value-tree.ts b/src/value-tree/value-tree.ts deleted file mode 100644 index e40495e..0000000 --- a/src/value-tree/value-tree.ts +++ /dev/null @@ -1,181 +0,0 @@ -import { - IRGQLValueMutation, - ValueOperation, -} from 'rgraphql'; -import { - QueryTreeNode, -} from '../query-tree'; -import { BehaviorSubject } from 'rxjs/BehaviorSubject'; -import { Subject } from 'rxjs/Subject'; -import { Subscription } from 'rxjs/Subscription'; - -export class ValueTreeNode { - public id: number; - public root: ValueTreeNode; - public parent: ValueTreeNode; - public children: ValueTreeNode[] = []; - - public isArray: boolean; - public arrayIdx: number; - - public value = new BehaviorSubject(undefined); - public error = new BehaviorSubject(undefined); - public valueUpdated = new BehaviorSubject(new Date()); - public childAdded = new Subject(); - - // All nodes in the tree listed by ID. Only on the root. - public rootNodeMap: { [id: number]: ValueTreeNode }; - // Orphaned mutations with parents waiting to come down the wire. - public rootPendingNodeMap: { [parentId: number]: IRGQLValueMutation[] }; - private cleanupSubscriptions: Subscription[]; - - constructor(public queryNode: QueryTreeNode, - root: ValueTreeNode = null, - parent: ValueTreeNode = null, - id = 0) { - this.root = root || this; - this.parent = parent || null; - this.id = id; - this.cleanupSubscriptions = []; - - if (this.root === this) { - this.rootNodeMap = {}; - this.rootPendingNodeMap = {}; - } - this.root.rootNodeMap[id] = this; - this.cleanupSubscriptions.push( - queryNode.error.subscribe((err) => { - if (err) { - this.dispose(); - } - }, null, () => { - this.dispose(); - }), - ); - } - - public get isRoot() { - return this.root === this; - } - - // Apply a value mutation to the tree. - public applyValueMutation(mutation: IRGQLValueMutation) { - // Find the referenced node. - let node: ValueTreeNode = this.root.rootNodeMap[mutation.valueNodeId || 0]; - if (!node) { - if (mutation.operation === ValueOperation.VALUE_DELETE) { - return; - } - - // Create the node. First, find the query tree node for this resolver. - let qnode: QueryTreeNode = this.queryNode.root.rootNodeMap[mutation.queryNodeId || 0]; - if (!qnode) { - // We have already unsubscribed from this. - return; - } - - // Find the parent of the new resolver - let pnode: ValueTreeNode = this.root.rootNodeMap[mutation.parentValueNodeId || 0]; - if (!pnode) { - // throw new Error('Value tree node (parent) ' + mutation.parentValueNodeId + ' not found.'); - // console.log(`Orphan: ${JSON.stringify(mutation.valueNodeId)}`); - let a = this.root.rootPendingNodeMap[mutation.parentValueNodeId] || []; - a.push(mutation); - this.root.rootPendingNodeMap[mutation.parentValueNodeId] = a; - return; - } - - // Push the new node - node = new ValueTreeNode(qnode, this.root, pnode, mutation.valueNodeId || 0); - - // These checks are checking zero-value. - // They shouldn't use terniary, to avoid an unnecessary assignment. - if (mutation.isArray) { - node.isArray = true; - } - if (mutation.arrayLen) { - pnode.children.length = mutation.arrayLen; - } - if (mutation.arrayIdx) { - node.arrayIdx = mutation.arrayIdx - 1; - pnode.children[node.arrayIdx] = node; - } else { - node.arrayIdx = pnode.children.length; - if (pnode.value.value === undefined) { - pnode.value.next({}); - } - pnode.children.push(node); - } - pnode.childAdded.next(node); - - // Find any orphaned nodes tied to this parent. - let orphans = this.root.rootPendingNodeMap[mutation.valueNodeId]; - if (orphans) { - delete this.root.rootPendingNodeMap[mutation.valueNodeId]; - setTimeout(() => { - for (let orphan of orphans) { - this.applyValueMutation(orphan); - } - }, 0); - } - } - - let nval: any = undefined; - if (mutation.hasValue && mutation.valueJson && mutation.valueJson.length) { - nval = JSON.parse(mutation.valueJson); - } - - switch (mutation.operation || 0) { - case ValueOperation.VALUE_SET: - node.value.next(nval); - node.valueUpdated.next(new Date()); - node.error.next(undefined); - break; - case ValueOperation.VALUE_DELETE: - node.dispose(); - break; - case ValueOperation.VALUE_ERROR: - node.error.next(nval); - break; - default: - return; - } - } - - public removeChild(child: ValueTreeNode, disposeChild = true) { - let idx = this.children.indexOf(child); - if (idx === -1) { - return; - } - this.children.splice(idx, 1); - if (disposeChild && child) { - child.dispose(false); - } - } - - public dispose(informParent = true) { - if (this.value.isStopped) { - return; - } - - for (let sub of this.cleanupSubscriptions) { - sub.unsubscribe(); - } - this.cleanupSubscriptions.length = 0; - - // Dispose each child first. - for (let child of this.children) { - if (child) { - child.dispose(); - } - } - this.children.length = 0; - - // Terminate this value stream. - this.value.complete(); - - if (informParent && this.parent) { - this.parent.removeChild(this, false); - } - } -} diff --git a/src/var-store/var-store.ts b/src/var-store/var-store.ts index 2b3a8cf..5af4f9c 100644 --- a/src/var-store/var-store.ts +++ b/src/var-store/var-store.ts @@ -3,6 +3,7 @@ import { Subject } from 'rxjs/Subject'; import { ValueNode } from 'graphql'; import { IASTVariable, + PackPrimitive, } from 'rgraphql'; import * as _ from 'lodash'; @@ -39,7 +40,7 @@ export class Variable { public toProto(): IASTVariable { return { id: this.id, - jsonValue: JSON.stringify(this.value), + value: PackPrimitive(this.value), }; } diff --git a/yarn.lock b/yarn.lock index b0f4a42..d2e33e7 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1213,6 +1213,10 @@ loud-rejection@^1.0.0: currently-unhandled "^0.4.1" signal-exit "^3.0.0" +lru_map@^0.3.3: + version "0.3.3" + resolved "https://registry.yarnpkg.com/lru_map/-/lru_map-0.3.3.tgz#b5c8351b9464cbd750335a79650a0ec0e56118dd" + make-error@^1.1.1: version "1.2.2" resolved "https://registry.yarnpkg.com/make-error/-/make-error-1.2.2.tgz#e4e270e474f642cca20fa126fe441163957832ef"