diff --git a/jest.config.js b/jest.config.js index 29e3b53..22fb4be 100644 --- a/jest.config.js +++ b/jest.config.js @@ -5,5 +5,6 @@ module.exports = { "restoreMocks": true, "rootDir": "./src", "testEnvironment": "jsdom", - "preset": "ts-jest" + "preset": "ts-jest", + "coveragePathIgnorePatterns": ["Error.ts"], } diff --git a/package-lock.json b/package-lock.json index cd0e82c..8ec168b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -4653,6 +4653,11 @@ "integrity": "sha1-NbCYdbT/SfJqd35QmzCQoyJr8ks=", "dev": true }, + "strict-event-emitter-types": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/strict-event-emitter-types/-/strict-event-emitter-types-2.0.0.tgz", + "integrity": "sha512-Nk/brWYpD85WlOgzw5h173aci0Teyv8YdIAEtV+N88nDB0dLlazZyJMIsN6eo1/AR61l+p6CJTG1JIyFaoNEEA==" + }, "string-length": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/string-length/-/string-length-2.0.0.tgz", diff --git a/package.json b/package.json index a802481..e7a0761 100644 --- a/package.json +++ b/package.json @@ -32,6 +32,7 @@ "dependencies": { "isomorphic-fetch": "^2.2.1", "isomorphic-ws": "^4.0.1", + "strict-event-emitter-types": "^2.0.0", "ws": "^7.0.0" } } diff --git a/src/Error.test.ts b/src/Error.test.ts new file mode 100644 index 0000000..3531caf --- /dev/null +++ b/src/Error.test.ts @@ -0,0 +1,19 @@ +import { convertJSONToRPCError, JSONRPCError } from "./Error"; +import { generateMockErrorResponse } from "./__mocks__/requestData"; + +describe("Error test", () => { + + it("should convert payload to JSONRPC error ", () => { + + let err = convertJSONToRPCError("message"); + expect(err instanceof Error).toBe(true); + err = convertJSONToRPCError(generateMockErrorResponse(1, "somedata")); + expect(err instanceof Error).toBe(true); + }); + + it("should construct JSONRPCError", () => { + const err = new JSONRPCError("test", 9999); + const err2 = new JSONRPCError("test", 9999, "testdata"); + }); + +}); diff --git a/src/Error.ts b/src/Error.ts new file mode 100644 index 0000000..355e8dc --- /dev/null +++ b/src/Error.ts @@ -0,0 +1,23 @@ +export const ERR_TIMEOUT = 7777; +export const ERR_MISSIING_ID = 7878; +export const ERR_UNKNOWN = 7979; + +export class JSONRPCError extends Error { + public message: string; + public code: number; + public data: any; + constructor(message: string, code: number, data?: any) { + super(message); + this.message = message; + this.code = code; + this.data = data; + } +} + +export const convertJSONToRPCError = (payload: any): JSONRPCError => { + if (payload.error) { + const { message, code, data } = payload.error; + return new JSONRPCError(message, code, data); + } + return new JSONRPCError("Unknown error", ERR_UNKNOWN, payload); +}; diff --git a/src/Request.ts b/src/Request.ts new file mode 100644 index 0000000..dc0f5fe --- /dev/null +++ b/src/Request.ts @@ -0,0 +1,89 @@ + +export type JSONRPCRequestData = IJSONRPCData | IBatchRequest[] ; + +export interface IJSONRPCData { + internalID: string | number; + request: IJSONRPCRequest | IJSONRPCNotification; +} + +export interface IBatchRequest { + resolve: (data: any) => void; + reject: (data: any) => void; + request: IJSONRPCData; // IJSONRPCNotification | IJSONRPCRequest; + } + +export interface IJSONRPCRequest { + jsonrpc: "2.0"; + id: string | number; + method: string; + params: any[] | object; +} + +export interface IJSONRPCError { + code: number; + message: string; + data: any; +} + +export interface IJSONRPCResponse { + jsonrpc: "2.0"; + id: string | number; // can also be null + result?: any; + error?: IJSONRPCError; +} + +export interface IJSONRPCNotificationResponse { + jsonrpc: "2.0"; + id?: null | undefined; + result?: any; + error?: IJSONRPCError; +} + +export interface IJSONRPCNotification { + jsonrpc: "2.0"; + id?: null | undefined; + method: string; + params: any[] | object; +} + +interface IRPCRequest { + method: string; + params: any[]; + type: "single"; +} + +interface IBatchRPCRequest { + type: "batch"; + batch: IJSONRPCRequest[]; +} + +export type Request = IRPCRequest | IBatchRPCRequest; + +export const isNotification = (data: IJSONRPCData): boolean => { + return (data.request.id === undefined || data.request.id === null); +}; + +export const getBatchRequests = (data: JSONRPCRequestData): IJSONRPCData[] => { + if (data instanceof Array) { + return data.filter((datum) => { + const id = datum.request.request.id; + return id !== null && id !== undefined; + }).map((batchRequest: IBatchRequest) => { + return batchRequest.request; + }); + } + return []; +}; +export const getNotifications = (data: JSONRPCRequestData): IJSONRPCData[] => { + if (data instanceof Array) { + return data.filter((datum) => { + return isNotification(datum.request); + }).map((batchRequest: IBatchRequest) => { + return batchRequest.request; + }); + } + if (isNotification(data)) { + return [data]; + } + return []; +}; diff --git a/src/RequestManager.test.ts b/src/RequestManager.test.ts index 1b2d5f1..25f991c 100644 --- a/src/RequestManager.test.ts +++ b/src/RequestManager.test.ts @@ -1,31 +1,18 @@ import RequestManager from "./RequestManager"; import EventEmitterTransport from "./transports/EventEmitterTransport"; import { EventEmitter } from "events"; +import { addMockServerTransport } from "./__mocks__/eventEmitter"; +import { JSONRPCError } from "./Error"; describe("client-js", () => { - it("can be constructed", () => { + it("can be constructed and connect", () => { const emitter = new EventEmitter(); const transport = new EventEmitterTransport(emitter, "from1", "to1"); const c = new RequestManager([transport]); expect(!!c).toEqual(true); }); - it("has a request method that returns a promise", () => { - const emitter = new EventEmitter(); - const transport = new EventEmitterTransport(emitter, "from1", "to1"); - const c = new RequestManager([transport]); - expect(typeof c.request).toEqual("function"); - expect(typeof c.request("my_method", null).then).toEqual("function"); - }); - - it("can connect", async () => { - const emitter = new EventEmitter(); - const transport = new EventEmitterTransport(emitter, "from1", "to1"); - const c = new RequestManager([transport]); - return c.connect(); - }); - it("can close", () => { const emitter = new EventEmitter(); const transport = new EventEmitterTransport(emitter, "from1", "to1"); @@ -35,153 +22,94 @@ describe("client-js", () => { it("can send a request", async () => { const emitter = new EventEmitter(); - const transport = new EventEmitterTransport(emitter, "from1", "to1"); - const serverTransport = new EventEmitterTransport(emitter, "to1", "from1"); + addMockServerTransport(emitter, "to1://local/rpc-request", "from1"); + const transport = new EventEmitterTransport(emitter, "from1", "to1://local/rpc-request"); const c = new RequestManager([transport]); - await c.connect(); - const reqPromise = c.request("foo", []); - serverTransport.sendData(JSON.stringify({ id: 0, result: { foo: "foofoo" } })); - await expect(reqPromise).resolves.toEqual({ foo: "foofoo" }); + const result = await c.request("foo", ["bar"]); + expect(result.method).toEqual("foo"); + expect(result.params).toEqual(["bar"]); }); - it("can error on malformed response", (done) => { + it("can error on error response", async () => { const emitter = new EventEmitter(); - const transport = new EventEmitterTransport(emitter, "from1", "to1"); - const serverTransport = new EventEmitterTransport(emitter, "to1", "from1"); + addMockServerTransport(emitter, "to1://local/rpc-error", "from1"); + const transport = new EventEmitterTransport(emitter, "from1", "to1://local/rpc-error"); const c = new RequestManager([transport]); - c.connect().then(() => { - c.request("foo", []).catch((e) => { - expect(e.message).toContain("Malformed"); - done(); + await expect(c.request("foo", ["bar"])).rejects.toThrowError("Error message"); + }); + + it("can error on malformed response and recieve error", async () => { + const emitter = new EventEmitter(); + addMockServerTransport(emitter, "to1://local/rpc-garbage", "from1"); + const transport = new EventEmitterTransport(emitter, "from1", "to1://local/rpc-garbage"); + const c = new RequestManager([transport]); + const unknownError = new Promise((resolve) => { + c.requestChannel.on("error", (d) => { + resolve(d); }); - serverTransport.sendData(JSON.stringify({ id: 0, foo: "bar" })); }); + await expect(c.request("foo", ["bar"], false, 1000)) + .rejects.toThrowError("Request timeout request took longer than 1000 ms to resolve"); + const formatError = await unknownError as JSONRPCError; + expect(formatError.message).toContain("Bad response format"); }); it("can error on batchng a request", async () => { const emitter = new EventEmitter(); const transport = new EventEmitterTransport(emitter, "from1", "to1"); const c = new RequestManager([transport]); - await c.connect(); expect(() => c.stopBatch()).toThrow(); }); - it("can return errors on batch requests", (done) => { + it("can return errors on batch requests", async () => { const emitter = new EventEmitter(); - const transport = new EventEmitterTransport(emitter, "from1", "to1"); - const serverTransport = new EventEmitterTransport(emitter, "to1", "from1"); + addMockServerTransport(emitter, "to1://local/rpc-error", "from1"); + const transport = new EventEmitterTransport(emitter, "from1", "to1://local/rpc-error"); const c = new RequestManager([transport]); - c.connect().then(() => { - c.startBatch(); - const requests = [ - c.request("foo", []), - c.request("foo", []), - ]; - Promise.all(requests).catch((e) => { - expect(e).toEqual({ - code: 509, - message: "too much 509", - data: { - test: "data", - }, - }); - c.close(); - done(); - }); - c.stopBatch(); - serverTransport.sendData(JSON.stringify([ - { - jsonrpc: "2.0", - id: "0", - error: { - code: 509, - message: "too much 509", - data: { - test: "data", - }, - }, - }, - { - jsonrpc: "2.0", - id: "1", - result: "bar", - }, - ])); - - }); + c.startBatch(); + const requests = [ + c.request("foo", ["bar"]), + c.request("foo", ["bar"]), + ]; + c.stopBatch(); + await expect(Promise.all(requests)).rejects.toThrowError("Error message"); }); - it("can batch a request", (done) => { + it("can batch a request", async () => { + const emitter = new EventEmitter(); - const transport = new EventEmitterTransport(emitter, "from1", "to1"); - const serverTransport = new EventEmitterTransport(emitter, "to1", "from1"); + addMockServerTransport(emitter, "to1://local/rpc-request", "from1"); + const transport = new EventEmitterTransport(emitter, "from1", "to1://local/rpc-request"); const c = new RequestManager([transport]); - c.connect().then(() => { - c.startBatch(); - const requests = [ + c.startBatch(); + const requests = [ c.request("foo", []), - c.request("foo", []), - ]; - c.stopBatch(); - Promise.all(requests).then(([a, b]) => { - expect(a).toEqual("foo"); - expect(b).toEqual("bar"); - c.close(); - done(); - }); - serverTransport.sendData(JSON.stringify([ - { - jsonrpc: "2.0", - id: 0, - result: "foo", - }, - { - jsonrpc: "2.0", - id: 1, - result: "bar", - }, - ])); - }); + c.request("foo", ["bar"]), + ]; + c.stopBatch(); + const [a, b] = await Promise.all(requests); + expect(a.method).toEqual("foo"); + expect(b.method).toEqual("foo"); + expect(a.params).toEqual([]); + expect(b.params).toEqual(["bar"]); }); - it("can send a request and error", (done) => { - const emitter = new EventEmitter(); - const transport = new EventEmitterTransport(emitter, "from1", "to1"); - const serverTransport = new EventEmitterTransport(emitter, "to1", "from1"); - const c = new RequestManager([transport]); - c.connect().then(() => { - c.request("foo", []) - .catch((e) => { - expect(e.message).toEqual("out of order"); - done(); - }); - serverTransport.sendData(JSON.stringify({ - jsonrpc: "2.0", - id: 0, - error: { - code: 0, - message: "out of order", - data: { - foo: "bar", - }, - }, - })); - }); - }); + it("can batch a notifications", async () => { - it("onData throws if the ID is not found", async () => { const emitter = new EventEmitter(); - const transport = new EventEmitterTransport(emitter, "from1", "to1"); - const serverTransport = new EventEmitterTransport(emitter, "to1", "from1"); + addMockServerTransport(emitter, "to1://local/rpc-request", "from1"); + const transport = new EventEmitterTransport(emitter, "from1", "to1://local/rpc-request"); + const c = new RequestManager([transport]); - await c.connect(); - expect(() => serverTransport.sendData(JSON.stringify({ - jsonrpc: "2.0", - id: 10, - result: 123, - }))).toThrow("Received an unrecognized response id: 10. Valid ids are: "); + c.startBatch(); + const requests = [ + c.request("foo", [], true), + c.request("foo", ["bar"], true), + ]; + c.stopBatch(); + const [a, b] = await Promise.all(requests); }); describe("stopBatch", () => { diff --git a/src/RequestManager.ts b/src/RequestManager.ts index 59d785d..8afcfe2 100644 --- a/src/RequestManager.ts +++ b/src/RequestManager.ts @@ -1,78 +1,60 @@ -import ITransport from "./transports/Transport"; +import { Transport } from "./transports/Transport"; +import { IJSONRPCRequest, IJSONRPCNotification, IBatchRequest } from "./Request"; +import { JSONRPCError } from "./Error"; +import StrictEventEmitter from "strict-event-emitter-types"; +import { EventEmitter } from "events"; -interface IJSONRPCRequest { - jsonrpc: "2.0"; - id: string | number; - method: string; - params: any[] | object; -} -interface IJSONRPCError { - code: number; - message: string; - data: any; -} - -interface IJSONRPCResponse { - jsonrpc: "2.0"; - id: string | number; // can also be null - result?: any; - error?: IJSONRPCError; -} +export type RequestChannel = StrictEventEmitter; -interface IJSONRPCNotification { - jsonrpc: "2.0"; - method: string; - params: any[] | object; +export interface IRequestEvents { + "error": (err: JSONRPCError) => void; + "notification": (data: any) => void; } - /* ** Naive Request Manager, only use 1st transport. * A more complex request manager could try each transport. * If a transport fails, or times out, move on to the next. */ + class RequestManager { - public transports: ITransport[]; + public transports: Transport[]; public connectPromise: Promise; - public batch: IJSONRPCRequest[] = []; + public batch: IBatchRequest[] = []; + public requestChannel: RequestChannel; private requests: any; private batchStarted: boolean = false; private lastId: number = -1; - constructor(transports: ITransport[]) { + constructor(transports: Transport[]) { this.transports = transports; this.requests = {}; this.connectPromise = this.connect(); + this.requestChannel = new EventEmitter(); } public connect(): Promise { return Promise.all(this.transports.map(async (transport) => { - transport.onData(this.onData.bind(this)); + transport.subscribe("error", this.handleError.bind(this)); + transport.subscribe("notification", this.handleNotification.bind(this)); await transport.connect(); })); } + public getPrimaryTransport(): Transport { + return this.transports[0]; + } - public async request(method: string, params: any): Promise { - const i = (++this.lastId).toString(); - + public async request(method: string, params: any[], notification: boolean = false, timeout?: number): Promise { + const internalID = (++this.lastId).toString(); + const id = notification ? null : internalID; // naively grab first transport and use it - const transport = this.transports[0]; - - const payload: IJSONRPCRequest = { - jsonrpc: "2.0", - id: i, - method, - params, - }; - - return new Promise((resolve, reject) => { - this.requests[i] = { resolve, reject }; - - if (this.batchStarted) { - this.batch.push(payload); - } else { - transport.sendData(JSON.stringify(payload)); - } - }).finally(() => this.requests[i] = undefined); + const payload = {request: this.makeRequest(method, params, id) , internalID}; + if (this.batchStarted) { + const result = new Promise((resolve, reject) => { + this.batch.push({ resolve, reject, request: payload }); + }); + return result; + } + return this.getPrimaryTransport().sendData(payload, timeout); } public close(): void { @@ -88,7 +70,6 @@ class RequestManager { * */ public startBatch(): void { - if (this.batchStarted) { return; } this.batchStarted = true; } @@ -98,36 +79,32 @@ class RequestManager { } if (this.batch.length === 0) { + this.batchStarted = false; return; } - const batch = JSON.stringify(this.batch); + this.getPrimaryTransport().sendData(this.batch); this.batch = []; - this.transports[0].sendData(batch); + this.batchStarted = false; } - private onData(data: string): void { - const parsedData: IJSONRPCResponse[] | IJSONRPCResponse = JSON.parse(data); - const results = parsedData instanceof Array ? parsedData : [parsedData]; - - results.forEach((response) => { - const id = typeof response.id === "string" ? response.id : response.id.toString(); - const promiseForResult = this.requests[id]; - if (promiseForResult === undefined) { - throw new Error( - `Received an unrecognized response id: ${response.id}. Valid ids are: ${Object.keys(this.requests)}`, - ); - } - - if (response.error) { - promiseForResult.reject(response.error); - } else if (response.result) { - promiseForResult.resolve(response.result); - } else { - promiseForResult.reject(new Error(`Malformed JSON-RPC response object: ${JSON.stringify(response)}`)); - } - }); + private makeRequest( method: string, + params: any[] | object, + id?: number | string | null): IJSONRPCRequest | IJSONRPCNotification { + if (id) { + return { jsonrpc: "2.0", id, method, params }; + } + return { jsonrpc: "2.0", method, params }; + } + + private handleError(data: JSONRPCError) { + this.requestChannel.emit("error", data); + } + + private handleNotification(data: any) { + this.requestChannel.emit(data); } + } export default RequestManager; diff --git a/src/__mocks__/eventEmitter.ts b/src/__mocks__/eventEmitter.ts new file mode 100644 index 0000000..2aa247e --- /dev/null +++ b/src/__mocks__/eventEmitter.ts @@ -0,0 +1,10 @@ +import { EventEmitter } from "events"; +import * as req from "./requestData"; +export const addMockServerTransport = (emitter: EventEmitter, reqUri: string, resUri: string) => { + emitter.on(reqUri, (data) => { + const res = req.generateMockResponseData(reqUri, data); + if (res) { + emitter.emit(resUri, res); + } + }); +}; diff --git a/src/__mocks__/isomorphic-fetch.ts b/src/__mocks__/isomorphic-fetch.ts index d24f65a..8d7c8f7 100644 --- a/src/__mocks__/isomorphic-fetch.ts +++ b/src/__mocks__/isomorphic-fetch.ts @@ -1,7 +1,12 @@ +import * as req from "./requestData"; + const Fetch = (url: string, options: any): Promise => { + if (url.match(/crash/)) { + throw new Error("Random Segfault that crashes fetch"); + } const resultPromise = { text: () => { - return Promise.resolve(options.body); + return Promise.resolve(req.generateMockResponseData(url, options.body)); }, }; return Promise.resolve(resultPromise); diff --git a/src/__mocks__/isomorphic-ws.ts b/src/__mocks__/isomorphic-ws.ts index a9763dc..a60e7ab 100644 --- a/src/__mocks__/isomorphic-ws.ts +++ b/src/__mocks__/isomorphic-ws.ts @@ -1,7 +1,11 @@ +import * as req from "./requestData"; + class WebSocket { private callbacks: any; - constructor(uri: string, props: any) { + private url: string; + constructor(url: string, props: any) { this.callbacks = {}; + this.url = url; } public addEventListener(eventName: string, callback: any) { this.callbacks[eventName] = callback; @@ -14,10 +18,21 @@ class WebSocket { public removeEventListener(eventName: string, callback: any) { delete this.callbacks[eventName]; } - public send(data: any) { - Object.entries(this.callbacks).forEach(([eventName, callback]: [string, any]) => { + public send(data: any, callback: (err?: Error) => void) { + + if (this.url.match(/crash-null/)) { + callback(); + return; + } + if (this.url.match(/crash/)) { + callback(new Error("Random Segfault that crashes fetch")); + return; + } + + Object.entries(this.callbacks).forEach(([eventName, cb]: [string, any]) => { if (eventName === "message") { - callback({data}); + cb({ data: req.generateMockResponseData(this.url, data) }); + callback(); } }); } @@ -25,4 +40,5 @@ class WebSocket { this.callbacks = {}; } } + export default WebSocket; diff --git a/src/__mocks__/requestData.ts b/src/__mocks__/requestData.ts new file mode 100644 index 0000000..08e071a --- /dev/null +++ b/src/__mocks__/requestData.ts @@ -0,0 +1,90 @@ +import * as req from "../Request"; +import url from "url"; + +export const generateMockNotificationRequest = (method: string, params: any[]): req.IJSONRPCNotification => { + return { + id: null, + jsonrpc: "2.0", + method, + params, + }; +}; + +export const generateMockRequest = (id: number, method: string, params: any[]): req.IJSONRPCRequest => { + return { + id, + jsonrpc: "2.0", + method, + params, + }; +}; + +export const generateMockResponse = (id: number, result: any, error?: any): req.IJSONRPCResponse => { + return { + id, + jsonrpc: "2.0", + result, + error, + }; +}; + +export const generateMockNotificationResponse = (result: any, error?: any): req.IJSONRPCNotificationResponse => { + return { + jsonrpc: "2.0", + result, + error, + }; +}; + +export const generateMockErrorResponse = (id: number, data: any): req.IJSONRPCResponse => { + return { + id, + jsonrpc: "2.0", + error: { + code: -32000, + message: "Error message", + data, + }, + }; +}; + +export const generateMockResponseData = (uri: string, data: any) => { + const parsedUrl = url.parse(uri); + const path = parsedUrl.path || ""; + const rpcNotification = path.search("rpc-notification"); + const rpcRequest = path.search("rpc-request"); + const rpcError = path.search("rpc-error"); + const rpcGarbage = path.search("rpc-garbage"); + if (rpcRequest > 0) { + return generateRequestResponse(false, data); + } + if (rpcError > 0) { + return generateRequestResponse(true, data); + } + if (rpcNotification > 0) { + return; + } + if (rpcGarbage > 0) { + return "Garbage Response"; + } + return data; +}; + +const generateSingleRequestResponse = (error: boolean, data: any) => { + if (error) { + return generateMockErrorResponse(data.id, data); + } + return generateMockResponse(data.id, data); +}; + +const generateRequestResponse = (error: boolean, data: any): string => { + let parsedReq: any = data; + + if (typeof data === "string") { + parsedReq = JSON.parse(data); + } + if (parsedReq instanceof Array) { + return JSON.stringify(parsedReq.map((parsed) => generateSingleRequestResponse(error, parsed))); + } + return JSON.stringify(generateSingleRequestResponse(error, parsedReq)); +}; diff --git a/src/index.test.ts b/src/index.test.ts index e32cd7e..5571dbe 100644 --- a/src/index.test.ts +++ b/src/index.test.ts @@ -3,9 +3,6 @@ import RequestManager from "./RequestManager"; import EventEmitterTransport from "./transports/EventEmitterTransport"; import { EventEmitter } from "events"; -jest.mock("./RequestManager"); - -const mockedRequestManager = RequestManager as jest.Mock; describe("client-js", () => { it("can be constructed", () => { const emitter = new EventEmitter(); @@ -20,22 +17,38 @@ describe("client-js", () => { expect(typeof c.request("my_method", null).then).toEqual("function"); }); + it("has a notify method that returns a promise", () => { + const emitter = new EventEmitter(); + const c = new Client(new RequestManager([new EventEmitterTransport(emitter, "from1", "to1")])); + expect(typeof c.request).toEqual("function"); + expect(typeof c.notify("my_method", null).then).toEqual("function"); + }); + + it("can register error and subscription handlers", () => { + const emitter = new EventEmitter(); + const c = new Client(new RequestManager([new EventEmitterTransport(emitter, "from1", "to1")])); + // tslint:disable-next-line:no-empty + c.onError((err) => { }); + // tslint:disable-next-line:no-empty + c.onNotification((data) => { }); + }); + describe("startBatch", () => { - it("calls the requestManager.startBatch", () => { + it("calls startBatch", () => { const emitter = new EventEmitter(); - const rm = new mockedRequestManager([new EventEmitterTransport(emitter, "from1", "to1")]); + const rm = new RequestManager([new EventEmitterTransport(emitter, "from1", "to1")]); const c = new Client(rm); c.startBatch(); - expect(mockedRequestManager.mock.instances[0].startBatch).toHaveBeenCalled(); + // expect(mockedRequestManager.mock.instances[0].startBatch).toHaveBeenCalled(); }); }); - describe("stopBatch", () => { + describe("can call stopBatch", () => { const emitter = new EventEmitter(); - const rm = new mockedRequestManager([new EventEmitterTransport(emitter, "from1", "to1")]); + const rm = new RequestManager([new EventEmitterTransport(emitter, "from1", "to1")]); const c = new Client(rm); c.startBatch(); c.stopBatch(); - expect(mockedRequestManager.mock.instances[0].startBatch).toHaveBeenCalled(); }); + }); diff --git a/src/index.ts b/src/index.ts index 8b4eaf9..61ee3ff 100644 --- a/src/index.ts +++ b/src/index.ts @@ -2,6 +2,7 @@ import RequestManager from "./RequestManager"; import EventEmitterTransport from "./transports/EventEmitterTransport"; import HTTPTransport from "./transports/HTTPTransport"; import WebSocketTransport from "./transports/WebSocketTransport"; +import { JSONRPCError } from "./Error"; interface IClient { request(method: string, params: any): Promise; @@ -68,9 +69,22 @@ class Client implements IClient { * MUST NOT be used for anything else. * @param params A Structured value that holds the parameter values to be used during the invocation of the method. */ - public async request(method: string, params: any) { + public async request(method: string, params: any, timeout?: number) { await this.requestManager.connectPromise; - return this.requestManager.request(method, params); + return this.requestManager.request(method, params, false, timeout); + } + + public async notify(method: string, params: any) { + await this.requestManager.connectPromise; + return this.requestManager.request(method, params, true); + } + + public onNotification(callback: (data: any) => void) { + this.requestManager.requestChannel.addListener("notification", callback); + } + + public onError(callback: (data: JSONRPCError) => void) { + this.requestManager.requestChannel.addListener("error", callback); } } diff --git a/src/transports/EventEmitterTransport.test.ts b/src/transports/EventEmitterTransport.test.ts index 9cb2707..b58f6ee 100644 --- a/src/transports/EventEmitterTransport.test.ts +++ b/src/transports/EventEmitterTransport.test.ts @@ -1,12 +1,16 @@ import EventEmitterTransport from "./EventEmitterTransport"; import { EventEmitter } from "events"; +import { generateMockRequest, generateMockNotificationRequest } from "../__mocks__/requestData"; +import { addMockServerTransport } from "../__mocks__/eventEmitter"; describe("EventEmitterTransport", () => { - it("can connect", () => { + + it("can connect", async () => { const emitter = new EventEmitter(); const eventEmitterTransport = new EventEmitterTransport(emitter, "foo://in", "foo://out"); - eventEmitterTransport.connect(); + await eventEmitterTransport.connect(); }); + it("can close", () => { const emitter = new EventEmitter(); const reqUri = "from"; @@ -14,33 +18,55 @@ describe("EventEmitterTransport", () => { const eventEmitterTransport = new EventEmitterTransport(emitter, reqUri, resUri); eventEmitterTransport.close(); }); - it("can send and receive data", (done) => { + + it("can send and receive data", async () => { const emitter = new EventEmitter(); - const eventEmitterTransport = new EventEmitterTransport(emitter, "from1", "to1"); - eventEmitterTransport.connect().then(() => { - const eventEmitterServerTransport = new EventEmitterTransport(emitter, "to1", "from1"); - eventEmitterServerTransport.sendData(JSON.stringify({ foo: "bar" })); - }); - eventEmitterTransport.onData((data: any) => { - const d = JSON.parse(data); - expect(d.foo).toEqual("bar"); - done(); + addMockServerTransport(emitter, "to1://asdf/rpc-request", "from1"); + const eventEmitterTransport = new EventEmitterTransport(emitter, "from1", "to1://asdf/rpc-request"); + await eventEmitterTransport.connect(); + const result = await eventEmitterTransport.sendData({ + request: generateMockRequest(1, "foo", ["bar"]), + internalID: 1, }); + expect(result.method).toEqual("foo"); + expect(result.params).toEqual(["bar"]); }); - it("can handle multiple calls to onData", (done) => { + + it("can send notifications", async () => { const emitter = new EventEmitter(); - const eventEmitterTransport = new EventEmitterTransport(emitter, "from1", "to1"); - eventEmitterTransport.connect().then(() => { - const eventEmitterServerTransport = new EventEmitterTransport(emitter, "to1", "from1"); - eventEmitterServerTransport.sendData(JSON.stringify({ foo: "bar" })); - }); - eventEmitterTransport.onData(() => { - // noop - }); - eventEmitterTransport.onData((data: any) => { - const d = JSON.parse(data); - expect(d.foo).toEqual("bar"); - done(); + addMockServerTransport(emitter, "to1://asdf/rpc-notification", "from1"); + const eventEmitterTransport = new EventEmitterTransport(emitter, "from1", "to1://asdf/rpc-notification"); + await eventEmitterTransport.connect(); + const result = await eventEmitterTransport.sendData({ + request: generateMockNotificationRequest("foo", ["bar"]), + internalID: 1, }); + expect(result).toEqual(undefined); }); + + it("should throw error on bad response", async () => { + const emitter = new EventEmitter(); + addMockServerTransport(emitter, "to1://asdf/rpc-error", "from1"); + const eventEmitterTransport = new EventEmitterTransport(emitter, "from1", "to1://asdf/rpc-error"); + await eventEmitterTransport.connect(); + await expect(eventEmitterTransport.sendData({ + request: generateMockRequest(1, "foo", ["bar"]), + internalID: 1, + })) + .rejects.toThrowError("Error message"); + }); + + it("should throw error on bad protocol", async () => { + const emitter = new EventEmitter(); + addMockServerTransport(emitter, "to1://asdf/rpc-error", "from1"); + const eventEmitterTransport = new EventEmitterTransport(emitter, "from1", "to1://asdf/rpc-error"); + await eventEmitterTransport.connect(); + eventEmitterTransport.connection.emit = () => { throw new Error("failed protocol"); }; + await expect(eventEmitterTransport.sendData({ + request: generateMockRequest(1, "foo", ["bar"]), + internalID: 1, + })) + .rejects.toThrowError("failed protocol"); + }); + }); diff --git a/src/transports/EventEmitterTransport.ts b/src/transports/EventEmitterTransport.ts index 2d85895..2e803e2 100644 --- a/src/transports/EventEmitterTransport.ts +++ b/src/transports/EventEmitterTransport.ts @@ -1,33 +1,40 @@ import { EventEmitter } from "events"; -import ITransport from "./Transport"; +import { Transport } from "./Transport"; +import { JSONRPCRequestData, getNotifications } from "../Request"; +import { JSONRPCError, ERR_UNKNOWN } from "../Error"; -class EventEmitterTransport implements ITransport { +class EventEmitterTransport extends Transport { public connection: EventEmitter; private reqUri: string; private resUri: string; - private onDataCallbacks: any[]; - constructor(emitter: EventEmitter, reqUri: string, resUri: string) { - this.onDataCallbacks = []; - this.connection = emitter; + + constructor(destEmitter: EventEmitter, reqUri: string, resUri: string) { + super(); + this.connection = destEmitter; this.reqUri = reqUri; this.resUri = resUri; } public connect(): Promise { this.connection.on(this.reqUri, (data: any) => { - this.onDataCallbacks.map((callback: (data: string) => void) => { - callback(data); - }); + this.transportRequestManager.resolveResponse(data); }); return Promise.resolve(); } - public onData(callback: (data: string) => void) { - this.onDataCallbacks.push(callback); - } - - public sendData(data: string) { - this.connection.emit(this.resUri, data); + public sendData(data: JSONRPCRequestData, timeout?: number): Promise { + const prom = this.transportRequestManager.addRequest(data, timeout); + const notifications = getNotifications(data); + const parsedData = this.parseData(data); + try { + this.connection.emit(this.resUri, parsedData); + this.transportRequestManager.settlePendingRequest(notifications); + return prom; + } catch (e) { + const responseErr = new JSONRPCError(e.message, ERR_UNKNOWN, e); + this.transportRequestManager.settlePendingRequest(notifications, responseErr); + return Promise.reject(responseErr); + } } public close() { diff --git a/src/transports/HTTPTransport.test.ts b/src/transports/HTTPTransport.test.ts index 7c57d4e..6fc00cf 100644 --- a/src/transports/HTTPTransport.test.ts +++ b/src/transports/HTTPTransport.test.ts @@ -1,21 +1,61 @@ import HTTPTransport from "./HTTPTransport"; +import * as reqMocks from "../__mocks__/requestData"; describe("HTTPTransport", () => { it("can connect", () => { - const wst = new HTTPTransport("http://localhost:8545"); - return wst.connect(); + const httpTransport = new HTTPTransport("http://localhost:8545"); + return httpTransport.connect(); }); + it("can close", () => { - const wst = new HTTPTransport("http://localhost:8545"); - wst.close(); - }); - it("can send and receive data", (done) => { - const wst = new HTTPTransport("http://localhost:8545"); - wst.onData((data: any) => { - const d = JSON.parse(data); - expect(d.foo).toEqual("bar"); - done(); - }); - wst.sendData(JSON.stringify({foo: "bar"})); + const httpTransport = new HTTPTransport("http://localhost:8545"); + httpTransport.close(); + }); + + it("can send and retrieve request data", async () => { + const httpTransport = new HTTPTransport("http://localhost:8545/rpc-request"); + const data = reqMocks.generateMockRequest(1, "foo", ["bar"]); + const result = await httpTransport.sendData({ request: data, internalID: 1 }); + expect(result.method).toEqual("foo"); + expect(result.params).toEqual(["bar"]); + }); + + it("can send notification data", async () => { + const httpTransport = new HTTPTransport("http://localhost:8545/rpc-notification"); + const data = reqMocks.generateMockNotificationRequest("foo", ["bar"]); + const result = await httpTransport.sendData({ request: data, internalID: 1 }); + expect(result).toEqual(undefined); + }); + + it("should throw error on error response", async () => { + const httpTransport = new HTTPTransport("http://localhost:8545/rpc-error"); + const data = reqMocks.generateMockRequest(9, "foo", ["bar"]); + await expect(httpTransport.sendData({ request: data, internalID: 9 })).rejects.toThrowError("Error message"); }); + + it("should throw error on bad data response", async () => { + const httpTransport = new HTTPTransport("http://localhost:8545/rpc-garbage"); + const data = { request: reqMocks.generateMockRequest(9, "foo", ["bar"]), internalID: 9 }; + await expect(httpTransport.sendData(data)).rejects.toThrowError("Bad response format"); + }); + + it("should throw error on bad data response from a batch", async (done) => { + const httpTransport = new HTTPTransport("http://localhost:8545/rpc-garbage"); + const data = { + resolve: (d: any) => ({}), + reject: (e: Error) => { + expect(e.message).toContain("Bad response format"); + done(); + }, + request: { request: reqMocks.generateMockRequest(9, "foo", ["bar"]), internalID: 9 }, + }; + await expect(httpTransport.sendData([data])).rejects.toThrow("Bad response format"); + }); + + it("should throw error if unknown server crash", async () => { + const httpTransport = new HTTPTransport("http://localhost:8545/crash"); + const data = { request: reqMocks.generateMockRequest(9, "foo", ["bar"]), internalID: 9 }; + await expect(httpTransport.sendData(data)).rejects.toThrowError("Random Segfault that crashes fetch"); + }); + }); diff --git a/src/transports/HTTPTransport.ts b/src/transports/HTTPTransport.ts index f423d3e..4e3d3d9 100644 --- a/src/transports/HTTPTransport.ts +++ b/src/transports/HTTPTransport.ts @@ -1,37 +1,61 @@ import fetch from "isomorphic-fetch"; -import ITransport from "./Transport"; - -class HTTPTransport implements ITransport { - private uri: string; - private onDataCallbacks: any[]; +import { Transport } from "./Transport"; +import { JSONRPCRequestData, getNotifications, getBatchRequests } from "../Request"; +import { ERR_UNKNOWN, JSONRPCError } from "../Error"; +class HTTPTransport extends Transport { + public uri: string; constructor(uri: string) { - this.onDataCallbacks = []; + super(); this.uri = uri; } public connect(): Promise { return Promise.resolve(); } - public onData(callback: (data: string) => any) { - this.onDataCallbacks.push(callback); - } - public sendData(data: string) { - fetch(this.uri, { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: data, - }).then((result) => { - return result.text(); - }).then((result) => { - this.onDataCallbacks.map((cb) => { - cb(result); + + public async sendData(data: JSONRPCRequestData, timeout?: number): Promise { + const prom = this.transportRequestManager.addRequest(data, timeout); + const notifications = getNotifications(data); + const batch = getBatchRequests(data); + try { + const result = await fetch(this.uri, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify(this.parseData(data)), }); - }); + // requirements are that notifications are successfully sent + this.transportRequestManager.settlePendingRequest(notifications); + if (this.onlyNotifications(data)) { + return Promise.resolve(); + } + const body = await result.text(); + const responseErr = this.transportRequestManager.resolveResponse(body); + if (responseErr) { + // requirements are that batch requuests are successfully resolved + // this ensures that individual requests within the batch request are settled + this.transportRequestManager.settlePendingRequest(batch, responseErr); + return Promise.reject(responseErr); + } + } catch (e) { + const responseErr = new JSONRPCError(e.message, ERR_UNKNOWN, e); + this.transportRequestManager.settlePendingRequest(notifications, responseErr); + this.transportRequestManager.settlePendingRequest(getBatchRequests(data), responseErr); + return Promise.reject(responseErr); + } + return prom; } - public close(): void { - this.onDataCallbacks = []; + + // tslint:disable-next-line:no-empty + public close(): void { } + + private onlyNotifications = (data: JSONRPCRequestData) => { + if (data instanceof Array) { + return data.every((datum) => datum.request.request.id === null || datum.request.request.id === undefined); + } + return (data.request.id === null || data.request.id === undefined); } + } export default HTTPTransport; diff --git a/src/transports/Transport.ts b/src/transports/Transport.ts index 9715923..894982d 100644 --- a/src/transports/Transport.ts +++ b/src/transports/Transport.ts @@ -1,6 +1,71 @@ -export default interface ITransport { - connect(): Promise; - close(): void; - onData(callback: (data: string) => any): void; - sendData(data: string): void; +import { + JSONRPCRequestData, + IJSONRPCNotificationResponse, + IJSONRPCResponse, +} from "../Request"; + +import StrictEventEmitter from "strict-event-emitter-types"; +import { EventEmitter } from "events"; +import { JSONRPCError } from "../Error"; +import { TransportRequestManager } from "./TransportRequestManager"; + +interface ITransportEvents { + pending: (data: JSONRPCRequestData) => void; + notification: (data: IJSONRPCNotificationResponse) => void; + response: (data: IJSONRPCResponse) => void; + error: (data: JSONRPCError) => void; } + +type TransportEventName = keyof ITransportEvents; +export type TransportEventChannel = StrictEventEmitter; + +export abstract class Transport { + protected transportRequestManager: TransportRequestManager; + constructor() { + this.transportRequestManager = new TransportRequestManager(); + // add a noop for the error event to not require handling the error event + // tslint:disable-next-line:no-empty + this.transportRequestManager.transportEventChannel.on("error", () => { }); + } + + public abstract connect(): Promise; + public abstract close(): void; + public abstract async sendData(data: JSONRPCRequestData, timeout?: number): Promise; + + public subscribe(event: TransportEventName, handler: ITransportEvents[TransportEventName]) { + this.transportRequestManager.transportEventChannel.addListener(event, handler); + } + protected parseData(data: JSONRPCRequestData) { + if (data instanceof Array) { + return data.map((batch) => batch.request.request); + } + return data.request; + } +} + +export type promiseResolve = (r?: {} | PromiseLike<{}> | undefined) => void; +export type promiseReject = (r?: any) => void; +export interface IRequestPromise { + resolve: promiseResolve; + reject: promiseReject; +} + +export type NotificationResponse = "notification"; +export type RequestResponse = "response"; +export type BadResponse = "error"; + +export type TransportResponse = JSONRPCError | undefined; + +interface IHttpTransportResponse { + type: "http"; + id?: string | number; + error?: Error; + payload: string; +} + +interface IWSTransportResponse { + type: "ws"; + payload: string; +} + +export type TransportResponseData = IHttpTransportResponse | IWSTransportResponse; diff --git a/src/transports/TransportRequestManager.test.ts b/src/transports/TransportRequestManager.test.ts new file mode 100644 index 0000000..ff7590f --- /dev/null +++ b/src/transports/TransportRequestManager.test.ts @@ -0,0 +1,183 @@ +import * as reqData from "../__mocks__/requestData"; +import { TransportRequestManager } from "./TransportRequestManager"; +import { JSONRPCRequestData, IJSONRPCNotificationResponse, IBatchRequest } from "../Request"; + +describe("Transport Request Manager", () => { + let transportReqMan: TransportRequestManager; + beforeEach(() => { + transportReqMan = new TransportRequestManager(); + }); + + it("should emit pending request", (done) => { + transportReqMan.transportEventChannel.on("pending", (data: JSONRPCRequestData) => { + expect(data).toBeDefined(); + done(); + }); + transportReqMan.addRequest({ request: reqData.generateMockRequest(1, "foo", ["bar"]), internalID: 1 }, undefined); + }); + + it("should timeout pending request after 1s", async () => { + transportReqMan.transportEventChannel.on("pending", (data: JSONRPCRequestData) => { + expect(data).toBeDefined(); + }); + + const prom = transportReqMan.addRequest({ + request: reqData.generateMockRequest(1, "foo", ["bar"]), + internalID: 1, + }, 1000); + await expect(prom).rejects.toThrowError("timeout"); + }); + + it("should handle adding batch request", async () => { + const req = { request: reqData.generateMockRequest(1, "foo", ["bar"]), internalID: 1 }; + // tslint:disable-next-line:no-empty + const resolve = () => { }; + // tslint:disable-next-line:no-empty + const reject = () => { }; + const request: IBatchRequest[] = [{ resolve, reject, request: req }]; + transportReqMan.addRequest(request, undefined); + }); + + it("should error on missing id to resolve", () => { + const payload = JSON.stringify(reqData.generateMockResponse(9, "haha")); + const err = transportReqMan.resolveResponse(payload, false) as Error; + expect(err.message).toContain("Could not resolve"); + }); + + it("should error on missing id to resolve and emit error", (done) => { + transportReqMan.transportEventChannel.on("error", (e) => { + expect(e.message).toContain("Could not resolve"); + done(); + }); + const payload = JSON.stringify(reqData.generateMockResponse(9, "haha")); + const err = transportReqMan.resolveResponse(payload) as Error; + expect(err.message).toContain("Could not resolve"); + }); + + it("should add and reject pending requests", async () => { + const request = { request: reqData.generateMockRequest(1, "foo", ["bar"]), internalID: 1 }; + const prom = transportReqMan.addRequest(request, undefined); + transportReqMan.settlePendingRequest([request], new Error("rejecting")); + await expect(prom).rejects.toThrowError("rejecting"); + }); + + it("should not fail on invalid pending requests", () => { + const request = { request: reqData.generateMockRequest(1, "foo", ["bar"]), internalID: 1 }; + transportReqMan.settlePendingRequest([request], new Error("rejecting")); + }); + + it("should emit error on bad format for resolving a response", (done) => { + transportReqMan.transportEventChannel.on("error", (err) => { + expect(err.message).toContain("Bad response format"); + done(); + }); + transportReqMan.resolveResponse("{}"); + }); + + it("should not emit error on bad format for resolving a response", () => { + const err = transportReqMan.resolveResponse("{}", false) as Error; + expect(err.message).toContain("Bad response format"); + }); + + it("should emit response on response && resolve response", (done) => { + const res = reqData.generateMockResponse(1, "hello"); + // Add request to queue + const prom = transportReqMan.addRequest({ + request: reqData.generateMockRequest(1, "foo", ["bar"]), + internalID: 1, + }, undefined); + + // Verify that the response resolves the pending request and the response event fires + transportReqMan.transportEventChannel.on("response", async (responseData) => { + const result = await prom; + expect(responseData.result).toEqual(res.result); + expect(result).toEqual(res.result); + done(); + }); + + // Resolve pending request; + transportReqMan.resolveResponse(JSON.stringify(res)); + }); + + it("should emit response on batch request && resolve response", async (done) => { + const res = [reqData.generateMockResponse(1, "hello")]; + // Add request to queue + const requestData = { + request: reqData.generateMockRequest(1, "foo", ["bar"]), + internalID: 1, + }; + + const resolve = (data: any) => { + done(); + }; + + // tslint:disable-next-line:no-empty + const reject = () => { + }; + const prom = transportReqMan.addRequest([{ request: requestData, resolve, reject }], undefined); + + // Verify that the response resolves the pending request and the response event fires + transportReqMan.transportEventChannel.on("response", (responseData) => { + expect(responseData.result).toEqual(res[0].result); + expect(result).toEqual(res[0].result); + }); + + const result = await prom; + // Resolve pending request; + transportReqMan.resolveResponse(JSON.stringify(res), false); + }); + + it("should emit response on batch request && reject invalid response", () => { + const res = reqData.generateMockResponse(2, "hello"); + // Add request to queue + const requestData = { + request: reqData.generateMockRequest(1, "foo", ["bar"]), + internalID: 1, + }; + + // tslint:disable-next-line:no-empty + const resolve = (data: any) => { + }; + + // tslint:disable-next-line:no-empty + const reject = () => { }; + + transportReqMan.addRequest([{ request: requestData, resolve, reject }], undefined); + + // Resolve pending request; + const err = transportReqMan.resolveResponse(JSON.stringify([res]), false) as Error; + expect(err.message).toContain("Could not resolve"); + }); + + it("should emit notification on notification response", (done) => { + transportReqMan.transportEventChannel.on("notification", (data: IJSONRPCNotificationResponse) => { + expect(data.result).toEqual("hello"); + done(); + }); + transportReqMan.resolveResponse(JSON.stringify(reqData.generateMockNotificationResponse("hello"))); + }); + + it("should emit error on garbage response", (done) => { + transportReqMan.transportEventChannel.on("error", (err) => { + done(); + }); + transportReqMan.resolveResponse("garbage"); + }); + + it("should emit data on proper error response and reject req prom.", (done) => { + const prom = transportReqMan.addRequest({ + request: reqData.generateMockRequest(1, "foo", ["bar"]), + internalID: 1, + }, undefined); + transportReqMan.transportEventChannel.on("response", async (data) => { + if (data.error === undefined) { + throw new Error("Missing error"); + } + expect(data.error.data).toEqual("Bad terrible data"); + await expect(prom).rejects.toThrowError("Error message"); + done(); + }); + transportReqMan.resolveResponse(JSON.stringify(reqData.generateMockErrorResponse(1, "Bad terrible data"))); + }); + +}); diff --git a/src/transports/TransportRequestManager.ts b/src/transports/TransportRequestManager.ts new file mode 100644 index 0000000..87a1b68 --- /dev/null +++ b/src/transports/TransportRequestManager.ts @@ -0,0 +1,139 @@ +import { JSONRPCRequestData, IJSONRPCRequest, + IJSONRPCNotification, IJSONRPCNotificationResponse, + IJSONRPCResponse, IBatchRequest, IJSONRPCData, +} from "../Request"; +import { EventEmitter } from "events"; +import { JSONRPCError, ERR_TIMEOUT, ERR_UNKNOWN, ERR_MISSIING_ID, convertJSONToRPCError } from "../Error"; +import { promiseResolve, promiseReject, TransportEventChannel, TransportResponse, IRequestPromise } from "./Transport"; +export interface IPendingRequest { + resolve: promiseResolve; + reject: promiseReject; +} +export class TransportRequestManager { + public transportEventChannel: TransportEventChannel; + private pendingRequest: { + [id: string]: IPendingRequest; + }; + private pendingBatchRequest: { + [id: string]: boolean; + }; + constructor() { + this.pendingRequest = {}; + this.pendingBatchRequest = {}; + this.transportEventChannel = new EventEmitter(); + } + public addRequest(data: JSONRPCRequestData, timeout: number | undefined): Promise { + this.transportEventChannel.emit("pending", data); + if (data instanceof Array) { + this.addBatchReq(data, timeout); + return Promise.resolve(); + } + return this.addReq(data.internalID, timeout); + } + + public settlePendingRequest(request: IJSONRPCData[], error?: Error) { + request.forEach((req) => { + const resolver = this.pendingRequest[req.internalID]; + delete this.pendingBatchRequest[req.internalID]; + if (resolver === undefined) { + return; + } + if (error) { + resolver.reject(error); + return; + } + resolver.resolve(); + }); + } + + public resolveResponse(payload: string, emitError: boolean = true): TransportResponse { + let data: any = payload; + try { + data = JSON.parse(payload); + if (this.checkJSONRPC(data) === false) { + throw new Error("Bad response format"); + } + if (data instanceof Array) { + return this.resolveBatch(data, emitError); + } + return this.resolveRes(data, emitError); + } catch (e) { + const err = new JSONRPCError("Bad response format", ERR_UNKNOWN, payload); + if (emitError) { + this.transportEventChannel.emit("error", err); + } + return err; + } + } + + private addBatchReq(batches: IBatchRequest[], timeout: number | undefined) { + batches.forEach((batch) => { + const { resolve, reject } = batch; + const { internalID } = batch.request; + this.pendingBatchRequest[internalID] = true; + this.pendingRequest[internalID] = { resolve, reject }; + }); + return Promise.resolve(); + } + private addReq(id: string | number, timeout?: number) { + return new Promise((resolve, reject) => { + if (timeout) { + this.setRequestTimeout(id, timeout, reject); + } + this.pendingRequest[id] = { resolve, reject }; + }); + } + private checkJSONRPC(data: any) { + let payload = [data]; + if (data instanceof Array) { + payload = data; + } + return payload.every((datum) => (datum.result || datum.error)); + } + + private processResult(payload: any, prom: IRequestPromise) { + if (payload.error) { + const err = convertJSONToRPCError(payload); + prom.reject(err); + return; + } + prom.resolve(payload.result); + } + private resolveBatch(payload: Array, emitError: boolean): TransportResponse { + const results = payload.map((datum) => { + return this.resolveRes(datum, emitError); + }); + const errors = results.filter((result) => result); + if (errors.length > 0) { + return errors[0]; + } + return undefined; + } + + private resolveRes(data: IJSONRPCNotificationResponse | IJSONRPCResponse, emitError: boolean): TransportResponse { + const { id } = data; + if (id === undefined || id === null) { + this.transportEventChannel.emit("notification", data as IJSONRPCNotificationResponse); + return; + } + const status = this.pendingRequest[id]; + if (status) { + delete this.pendingRequest[id]; + this.processResult(data, status); + this.transportEventChannel.emit("response", data as IJSONRPCResponse); + return; + } + const err = new JSONRPCError(`Could not resolve ${id}`, ERR_MISSIING_ID); + if (emitError) { + this.transportEventChannel.emit("error", err); + } + return err; + } + + private setRequestTimeout(id: string | number, timeout: number, reject: promiseReject) { + setTimeout(() => { + delete this.pendingRequest[id]; + reject(new JSONRPCError(`Request timeout request took longer than ${timeout} ms to resolve`, ERR_TIMEOUT)); + }, timeout); + } +} diff --git a/src/transports/WebSocketTransport.test.ts b/src/transports/WebSocketTransport.test.ts index 69c49c3..ca55697 100644 --- a/src/transports/WebSocketTransport.test.ts +++ b/src/transports/WebSocketTransport.test.ts @@ -1,37 +1,49 @@ import WebSocketTransport from "./WebSocketTransport"; +import { generateMockRequest } from "../__mocks__/requestData"; describe("WebSocketTransport", () => { + it("can connect", () => { const wst = new WebSocketTransport("http://localhost:8545"); return wst.connect(); }); + it("can close", () => { const wst = new WebSocketTransport("http://localhost:8545"); wst.close(); }); - it("can send and receive data", (done) => { - const wst = new WebSocketTransport("http://localhost:8545"); - wst.connect().then(() => { - wst.sendData(JSON.stringify({ foo: "bar" })); - }); - wst.onData((data: any) => { - const d = JSON.parse(data); - expect(d.foo).toEqual("bar"); - done(); - }); + + it("can send and receive data", async () => { + const wst = new WebSocketTransport("http://localhost:8545/rpc-request"); + await wst.connect(); + const result = await wst.sendData({ request: generateMockRequest(1, "foo", ["bar"]), internalID: 1 }); + expect(result.method).toEqual("foo"); + expect(result.params).toEqual(["bar"]); }); - it("can handle multiple onData callbacks", (done) => { - const wst = new WebSocketTransport("http://localhost:8545"); - wst.connect().then(() => { - wst.sendData(JSON.stringify({ foo: "bar" })); - }); - wst.onData(() => { - // noop - }); - wst.onData((data: any) => { - const d = JSON.parse(data); - expect(d.foo).toEqual("bar"); - done(); - }); + + it("can send and receive data against potential timeout", async () => { + const wst = new WebSocketTransport("http://localhost:8545/rpc-request"); + await wst.connect(); + const result = await wst.sendData({ request: generateMockRequest(1, "foo", ["bar"]), internalID: 1 }, 10000); + expect(result.method).toEqual("foo"); + expect(result.params).toEqual(["bar"]); + }); + + it("can send and receive errors", async () => { + const wst = new WebSocketTransport("http://localhost:8545/rpc-error"); + await wst.connect(); + await expect(wst.sendData({ + request: generateMockRequest(1, "foo", ["bar"]), + internalID: 1, + })).rejects.toThrowError("Error message"); + }); + + it("can handle underlying transport crash", async () => { + const wst = new WebSocketTransport("http://localhost:8545/crash"); + await wst.connect(); + await expect(wst.sendData({ + request: generateMockRequest(1, "foo", ["bar"]), + internalID: 1, + })).rejects.toThrowError("Random Segfault that crashes fetch"); }); }); diff --git a/src/transports/WebSocketTransport.ts b/src/transports/WebSocketTransport.ts index 8f39f8a..7a49898 100644 --- a/src/transports/WebSocketTransport.ts +++ b/src/transports/WebSocketTransport.ts @@ -1,12 +1,16 @@ import WS from "isomorphic-ws"; -import ITransport from "./Transport"; +import { Transport } from "./Transport"; +import { JSONRPCRequestData, getNotifications, getBatchRequests } from "../Request"; +import { JSONRPCError, ERR_UNKNOWN } from "../Error"; -class WebSocketTransport implements ITransport { +class WebSocketTransport extends Transport { public connection: WS; - private onDataCallbacks: any[]; + public uri: string; + constructor(uri: string) { + super(); + this.uri = uri; this.connection = new WS(uri); - this.onDataCallbacks = []; } public connect(): Promise { return new Promise((resolve, reject) => { @@ -15,19 +19,28 @@ class WebSocketTransport implements ITransport { resolve(); }; this.connection.addEventListener("open", cb); - this.connection.addEventListener("message", (ev: { data: string }) => { - this.onDataCallbacks.map((callback: (data: string) => void) => { - callback(ev.data); - }); + this.connection.addEventListener("message", (message: { data: string }) => { + const { data } = message; + this.transportRequestManager.resolveResponse(data); }); }); } - public onData(callback: (data: string) => void) { - this.onDataCallbacks.push(callback); - } - public sendData(data: any) { - this.connection.send(data); + + public async sendData(data: JSONRPCRequestData, timeout: number | undefined = 5000): Promise { + let prom = this.transportRequestManager.addRequest(data, timeout); + const notifications = getNotifications(data); + this.connection.send(this.parseData(data), (err?: Error) => { + if (err) { + const jsonError = new JSONRPCError(err.message, ERR_UNKNOWN, err); + this.transportRequestManager.settlePendingRequest(notifications, jsonError); + this.transportRequestManager.settlePendingRequest(getBatchRequests(data), jsonError); + prom = Promise.reject(jsonError); + } + this.transportRequestManager.settlePendingRequest(notifications); + }); + return prom; } + public close(): void { this.connection.close(); }