diff --git a/src/RequestManager.test.ts b/src/RequestManager.test.ts index 6450bd8..1b2d5f1 100644 --- a/src/RequestManager.test.ts +++ b/src/RequestManager.test.ts @@ -1,70 +1,213 @@ import RequestManager from "./RequestManager"; import EventEmitterTransport from "./transports/EventEmitterTransport"; +import { EventEmitter } from "events"; describe("client-js", () => { + it("can be constructed", () => { - const transport = new EventEmitterTransport("foo://unique-uri"); + const emitter = new EventEmitter(); + const transport = new EventEmitterTransport(emitter, "from1", "to1"); const c = new RequestManager([transport]); expect(!!c).toEqual(true); }); it("has a request method that returns a promise", () => { - const transport = new EventEmitterTransport("foo://unique-uri"); + const emitter = new EventEmitter(); + const transport = new EventEmitterTransport(emitter, "from1", "to1"); const c = new RequestManager([transport]); expect(typeof c.request).toEqual("function"); expect(typeof c.request("my_method", null).then).toEqual("function"); }); - it("can connect", () => { - const transport = new EventEmitterTransport("foo://unique-uri"); + it("can connect", async () => { + const emitter = new EventEmitter(); + const transport = new EventEmitterTransport(emitter, "from1", "to1"); const c = new RequestManager([transport]); return c.connect(); }); it("can close", () => { - const transport = new EventEmitterTransport("foo://unique-uri"); + const emitter = new EventEmitter(); + const transport = new EventEmitterTransport(emitter, "from1", "to1"); const c = new RequestManager([transport]); c.close(); }); - it("can send a request", (done) => { - const transport = new EventEmitterTransport("foo://unique-uri"); + it("can send a request", async () => { + const emitter = new EventEmitter(); + const transport = new EventEmitterTransport(emitter, "from1", "to1"); + const serverTransport = new EventEmitterTransport(emitter, "to1", "from1"); const c = new RequestManager([transport]); - c.connect(); - transport.onData((data: any) => { - const d = JSON.parse(data); - expect(d.method).toEqual("foo"); - done(); + await c.connect(); + const reqPromise = c.request("foo", []); + serverTransport.sendData(JSON.stringify({ id: 0, result: { foo: "foofoo" } })); + await expect(reqPromise).resolves.toEqual({ foo: "foofoo" }); + }); + + it("can error on malformed response", (done) => { + const emitter = new EventEmitter(); + const transport = new EventEmitterTransport(emitter, "from1", "to1"); + const serverTransport = new EventEmitterTransport(emitter, "to1", "from1"); + const c = new RequestManager([transport]); + c.connect().then(() => { + c.request("foo", []).catch((e) => { + expect(e.message).toContain("Malformed"); + done(); + }); + serverTransport.sendData(JSON.stringify({ id: 0, foo: "bar" })); }); - c.request("foo", []); }); - it("can send a request and error", () => { - const transport = new EventEmitterTransport("foo://unique-uri"); + it("can error on batchng a request", async () => { + const emitter = new EventEmitter(); + const transport = new EventEmitterTransport(emitter, "from1", "to1"); + const c = new RequestManager([transport]); + await c.connect(); + expect(() => c.stopBatch()).toThrow(); + }); + + it("can return errors on batch requests", (done) => { + const emitter = new EventEmitter(); + const transport = new EventEmitterTransport(emitter, "from1", "to1"); + const serverTransport = new EventEmitterTransport(emitter, "to1", "from1"); + const c = new RequestManager([transport]); - transport.onData = (fn) => { - transport.connection.on("message", () => { - fn(JSON.stringify({ + c.connect().then(() => { + c.startBatch(); + const requests = [ + c.request("foo", []), + c.request("foo", []), + ]; + Promise.all(requests).catch((e) => { + expect(e).toEqual({ + code: 509, + message: "too much 509", + data: { + test: "data", + }, + }); + c.close(); + done(); + }); + c.stopBatch(); + serverTransport.sendData(JSON.stringify([ + { jsonrpc: "2.0", - id: 3, + id: "0", error: { - code: 0, - message: "out of order", + code: 509, + message: "too much 509", data: { - foo: "bar", + test: "data", }, }, - })); + }, + { + jsonrpc: "2.0", + id: "1", + result: "bar", + }, + ])); + + }); + }); + + it("can batch a request", (done) => { + const emitter = new EventEmitter(); + const transport = new EventEmitterTransport(emitter, "from1", "to1"); + const serverTransport = new EventEmitterTransport(emitter, "to1", "from1"); + + const c = new RequestManager([transport]); + c.connect().then(() => { + c.startBatch(); + const requests = [ + c.request("foo", []), + c.request("foo", []), + ]; + c.stopBatch(); + Promise.all(requests).then(([a, b]) => { + expect(a).toEqual("foo"); + expect(b).toEqual("bar"); + c.close(); + done(); }); - }; - c.connect(); - expect(c.request("foo", [])).rejects.toBe({ - code: 0, - message: "out of order", - data: { - foo: "bar", - }, + serverTransport.sendData(JSON.stringify([ + { + jsonrpc: "2.0", + id: 0, + result: "foo", + }, + { + jsonrpc: "2.0", + id: 1, + result: "bar", + }, + ])); + }); + }); + + it("can send a request and error", (done) => { + const emitter = new EventEmitter(); + const transport = new EventEmitterTransport(emitter, "from1", "to1"); + const serverTransport = new EventEmitterTransport(emitter, "to1", "from1"); + const c = new RequestManager([transport]); + c.connect().then(() => { + c.request("foo", []) + .catch((e) => { + expect(e.message).toEqual("out of order"); + done(); + }); + serverTransport.sendData(JSON.stringify({ + jsonrpc: "2.0", + id: 0, + error: { + code: 0, + message: "out of order", + data: { + foo: "bar", + }, + }, + })); }); }); + it("onData throws if the ID is not found", async () => { + const emitter = new EventEmitter(); + const transport = new EventEmitterTransport(emitter, "from1", "to1"); + const serverTransport = new EventEmitterTransport(emitter, "to1", "from1"); + const c = new RequestManager([transport]); + await c.connect(); + expect(() => serverTransport.sendData(JSON.stringify({ + jsonrpc: "2.0", + id: 10, + result: 123, + }))).toThrow("Received an unrecognized response id: 10. Valid ids are: "); + }); + + describe("stopBatch", () => { + it("does nothing if the batch is empty", () => { + const emitter = new EventEmitter(); + const transport = new EventEmitterTransport(emitter, "from1", "to1"); + transport.sendData = jest.fn(); + const c = new RequestManager([transport]); + c.startBatch(); + c.stopBatch(); + expect(transport.sendData).not.toHaveBeenCalled(); + }); + }); + + describe("startBatch", () => { + it("it does nothing if a batch is already started", async () => { + const emitter = new EventEmitter(); + const transport = new EventEmitterTransport(emitter, "from1", "to1"); + const c = new RequestManager([transport]); + await c.connect(); + c.startBatch(); + c.request("foo", []); + expect(c.batch.length).toBe(1); + c.startBatch(); + c.request("foo", []); + expect(c.batch.length).toBe(2); + }); + }); }); diff --git a/src/RequestManager.ts b/src/RequestManager.ts index a81026a..59d785d 100644 --- a/src/RequestManager.ts +++ b/src/RequestManager.ts @@ -1,5 +1,29 @@ import ITransport from "./transports/Transport"; -let id = 1; + +interface IJSONRPCRequest { + jsonrpc: "2.0"; + id: string | number; + method: string; + params: any[] | object; +} +interface IJSONRPCError { + code: number; + message: string; + data: any; +} + +interface IJSONRPCResponse { + jsonrpc: "2.0"; + id: string | number; // can also be null + result?: any; + error?: IJSONRPCError; +} + +interface IJSONRPCNotification { + jsonrpc: "2.0"; + method: string; + params: any[] | object; +} /* ** Naive Request Manager, only use 1st transport. @@ -8,64 +32,101 @@ let id = 1; */ class RequestManager { public transports: ITransport[]; + public connectPromise: Promise; + public batch: IJSONRPCRequest[] = []; private requests: any; - private connectPromise: Promise; + private batchStarted: boolean = false; + private lastId: number = -1; + constructor(transports: ITransport[]) { this.transports = transports; this.requests = {}; this.connectPromise = this.connect(); } + public connect(): Promise { - const promises = this.transports.map((transport) => { - return new Promise(async (resolve, reject) => { - await transport.connect(); - transport.onData((data: any) => { - this.onData(data); - }); - resolve(); - }); - }); - return Promise.all(promises); + return Promise.all(this.transports.map(async (transport) => { + transport.onData(this.onData.bind(this)); + await transport.connect(); + })); } + public async request(method: string, params: any): Promise { - await this.connectPromise; + const i = (++this.lastId).toString(); + + // naively grab first transport and use it + const transport = this.transports[0]; + + const payload: IJSONRPCRequest = { + jsonrpc: "2.0", + id: i, + method, + params, + }; + return new Promise((resolve, reject) => { - const i = id++; - // naively grab first transport and use it - const transport = this.transports[0]; - this.requests[i] = { - resolve, - reject, - }; - transport.sendData(JSON.stringify({ - jsonrpc: "2.0", - id: i, - method, - params, - })); - }); + this.requests[i] = { resolve, reject }; + + if (this.batchStarted) { + this.batch.push(payload); + } else { + transport.sendData(JSON.stringify(payload)); + } + }).finally(() => this.requests[i] = undefined); } + public close(): void { this.transports.forEach((transport) => { transport.close(); }); } - private onData(data: string): void { - const parsedData = JSON.parse(data); - if (typeof parsedData.result === "undefined" && typeof parsedData.error === "undefined") { - return; + + /** + * Begins a batch call by setting the [[RequestManager.batchStarted]] flag to `true`. + * + * [[RequestManager.batch]] is a singleton - only one batch can exist at a given time, per [[RequestManager]]. + * + */ + public startBatch(): void { + if (this.batchStarted) { return; } + this.batchStarted = true; + } + + public stopBatch(): void { + if (this.batchStarted === false) { + throw new Error("cannot end that which has never started"); } - const req = this.requests[parsedData.id]; - if (req === undefined) { + + if (this.batch.length === 0) { return; } - // resolve promise for id - if (parsedData.error) { - req.reject(parsedData.error); - } else { - req.resolve(parsedData.result); - } - delete this.requests[parsedData.id]; + + const batch = JSON.stringify(this.batch); + this.batch = []; + this.transports[0].sendData(batch); + } + + private onData(data: string): void { + const parsedData: IJSONRPCResponse[] | IJSONRPCResponse = JSON.parse(data); + const results = parsedData instanceof Array ? parsedData : [parsedData]; + + results.forEach((response) => { + const id = typeof response.id === "string" ? response.id : response.id.toString(); + const promiseForResult = this.requests[id]; + if (promiseForResult === undefined) { + throw new Error( + `Received an unrecognized response id: ${response.id}. Valid ids are: ${Object.keys(this.requests)}`, + ); + } + + if (response.error) { + promiseForResult.reject(response.error); + } else if (response.result) { + promiseForResult.resolve(response.result); + } else { + promiseForResult.reject(new Error(`Malformed JSON-RPC response object: ${JSON.stringify(response)}`)); + } + }); } } diff --git a/src/index.test.ts b/src/index.test.ts index 40deb68..e32cd7e 100644 --- a/src/index.test.ts +++ b/src/index.test.ts @@ -1,17 +1,41 @@ import Client from "."; import RequestManager from "./RequestManager"; import EventEmitterTransport from "./transports/EventEmitterTransport"; +import { EventEmitter } from "events"; +jest.mock("./RequestManager"); + +const mockedRequestManager = RequestManager as jest.Mock; describe("client-js", () => { it("can be constructed", () => { - const c = new Client(new RequestManager([new EventEmitterTransport("foo://unique")])); + const emitter = new EventEmitter(); + const c = new Client(new RequestManager([new EventEmitterTransport(emitter, "from1", "to1")])); expect(!!c).toEqual(true); }); it("has a request method that returns a promise", () => { - const c = new Client(new RequestManager([new EventEmitterTransport("foo://unique")])); + const emitter = new EventEmitter(); + const c = new Client(new RequestManager([new EventEmitterTransport(emitter, "from1", "to1")])); expect(typeof c.request).toEqual("function"); expect(typeof c.request("my_method", null).then).toEqual("function"); }); + describe("startBatch", () => { + it("calls the requestManager.startBatch", () => { + const emitter = new EventEmitter(); + const rm = new mockedRequestManager([new EventEmitterTransport(emitter, "from1", "to1")]); + const c = new Client(rm); + c.startBatch(); + expect(mockedRequestManager.mock.instances[0].startBatch).toHaveBeenCalled(); + }); + }); + + describe("stopBatch", () => { + const emitter = new EventEmitter(); + const rm = new mockedRequestManager([new EventEmitterTransport(emitter, "from1", "to1")]); + const c = new Client(rm); + c.startBatch(); + c.stopBatch(); + expect(mockedRequestManager.mock.instances[0].startBatch).toHaveBeenCalled(); + }); }); diff --git a/src/index.ts b/src/index.ts index ff88926..8b4eaf9 100644 --- a/src/index.ts +++ b/src/index.ts @@ -10,6 +10,7 @@ interface IClient { /** * OpenRPC Client JS is a browser-compatible JSON-RPC client with multiple transports and * multiple request managers to enable features like round-robin or fallback-by-position. + * * @example * ```typescript * import { RequestManager, HTTPTransport, Client } from '@open-rpc/client-js'; @@ -18,24 +19,57 @@ interface IClient { * const result = await client.request(‘addition’, [2, 2]); * // => { jsonrpc: '2.0', id: 1, result: 4 } * ``` + * */ - class Client implements IClient { public requestManager: RequestManager; constructor(requestManager: RequestManager) { this.requestManager = requestManager; } + /** + * Initiates [[RequestManager.startBatch]] in order to build a batch call. + * + * Subsequent calls to [[Client.request]] will be added to the batch. Once [[Client.stopBatch]] is called, the + * promises for the [[Client.request]] will then be resolved. If the [[RequestManager]] already has a batch in + * progress, this method is a noop. + * + * @example + * myClient.startBatch(); + * myClient.request("foo", ["bar"]).then(() => console.log('foobar')); + * myClient.request("foo", ["baz"]).then(() => console.log('foobaz')); + * myClient.stopBatch(); + */ + public startBatch(): void { + return this.requestManager.startBatch(); + } + + /** + * Initiates [[RequestManager.stopBatch]] in order to finalize and send the batch to the underlying transport. + * + * [[Client.stopBatch]] will send the [[Client.request]] calls made since the last [[Client.startBatch]] call. For + * that reason, [[Client.startBatch]] MUST be called before [[Client.stopBatch]]. + * + * @example + * myClient.startBatch(); + * myClient.request("foo", ["bar"]).then(() => console.log('foobar')); + * myClient.request("foo", ["baz"]).then(() => console.log('foobaz')); + * myClient.stopBatch(); + */ + public stopBatch(): void { + return this.requestManager.stopBatch(); + } + /** * A JSON-RPC call is represented by sending a Request object to a Server. * - * @param method A String containing the name of the method to be invoked. - * Method names that begin with the word rpc followed by a - * period character (U+002E or ASCII 46) are reserved for rpc-internal - * methods and extensions and MUST NOT be used for anything else. + * @param method A String containing the name of the method to be invoked. Method names that begin with the word rpc + * followed by a period character (U+002E or ASCII 46) are reserved for rpc-internal methods and extensions and + * MUST NOT be used for anything else. * @param params A Structured value that holds the parameter values to be used during the invocation of the method. */ - public request(method: string, params: any) { + public async request(method: string, params: any) { + await this.requestManager.connectPromise; return this.requestManager.request(method, params); } } diff --git a/src/transports/EventEmitterTransport.test.ts b/src/transports/EventEmitterTransport.test.ts index 5875836..4417126 100644 --- a/src/transports/EventEmitterTransport.test.ts +++ b/src/transports/EventEmitterTransport.test.ts @@ -1,21 +1,29 @@ import EventEmitterTransport from "./EventEmitterTransport"; +import { EventEmitter } from "events"; describe("EventEmitterTransport", () => { it("can connect", () => { - const eventEmitterTransport = new EventEmitterTransport("foo://bar"); + const emitter = new EventEmitter(); + const eventEmitterTransport = new EventEmitterTransport(emitter, "foo://in", "foo://out"); eventEmitterTransport.connect(); }); it("can close", () => { - const eventEmitterTransport = new EventEmitterTransport("foo://bar"); + const emitter = new EventEmitter(); + const reqUri = "from"; + const resUri = "to"; + const eventEmitterTransport = new EventEmitterTransport(emitter, reqUri, resUri); eventEmitterTransport.close(); }); it("can send and receive data", (done) => { - const eventEmitterTransport = new EventEmitterTransport("foo://bar"); + const emitter = new EventEmitter(); + const eventEmitterTransport = new EventEmitterTransport(emitter, "from1", "to1"); eventEmitterTransport.onData((data: any) => { const d = JSON.parse(data); expect(d.foo).toEqual("bar"); done(); }); - eventEmitterTransport.sendData(JSON.stringify({foo: "bar"})); + + const eventEmitterServerTransport = new EventEmitterTransport(emitter, "to1", "from1"); + eventEmitterServerTransport.sendData(JSON.stringify({foo: "bar"})); }); }); diff --git a/src/transports/EventEmitterTransport.ts b/src/transports/EventEmitterTransport.ts index 47c2702..fd0521e 100644 --- a/src/transports/EventEmitterTransport.ts +++ b/src/transports/EventEmitterTransport.ts @@ -3,20 +3,28 @@ import ITransport from "./Transport"; class EventEmitterTransport implements ITransport { public connection: EventEmitter; - constructor(uri: string) { - this.connection = new EventEmitter(); + private reqUri: string; + private resUri: string; + constructor(emitter: EventEmitter, reqUri: string, resUri: string) { + this.connection = emitter; + this.reqUri = reqUri; + this.resUri = resUri; } + public connect(): Promise { return Promise.resolve(); } + public onData(callback: (data: string) => any) { - this.connection.addListener("message", (data: any) => { + this.connection.on(this.reqUri, (data: any) => { callback(data); }); } + public sendData(data: string) { - this.connection.emit("message", data); + this.connection.emit(this.resUri, data); } + public close() { this.connection.removeAllListeners(); }