From aa641ad7cdd1b7986e15265081a0264c0d37d2b7 Mon Sep 17 00:00:00 2001 From: Sergey Bakulin Date: Sun, 20 Sep 2020 22:44:19 +0300 Subject: [PATCH] feat: add the `fetchStream` method --- src/rest.ts | 39 ++++++++++++++++++++++++ src/stream.ts | 2 ++ test/rest.spec.ts | 76 +++++++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 115 insertions(+), 2 deletions(-) diff --git a/src/rest.ts b/src/rest.ts index f779def..a9961d4 100644 --- a/src/rest.ts +++ b/src/rest.ts @@ -2,6 +2,7 @@ import { createHmac } from "crypto"; import fetch from "node-fetch"; import FetchError from "./error"; +import JSONStream from "./stream"; export const ExanteDemoURL = "https://api-demo.exante.eu/"; export const ExanteLiveURL = "https://api-live.exante.eu/"; @@ -1308,6 +1309,21 @@ export class RestClient { return order; } + /** + * Make a request and return JSONStream + */ + public async fetchStream( + url: string | URL, + options: fetch.RequestInit = {} + ): Promise { + const headers = new fetch.Headers(options.headers || this.headers); + headers.set("Accept", "application/x-json-stream"); + + const stream = RestClient.fetchStream(url, { ...options, headers }); + + return stream; + } + /** * Get a JSON Web Token */ @@ -1398,6 +1414,29 @@ export class RestClient { return jwt; } + /** + * Make a request and return a stream + */ + public static async fetchStream( + url: string | URL, + options: fetch.RequestInit = {} + ): Promise { + const response = await fetch(url.toString(), { ...options }); + + if (!response.body) { + /* istanbul ignore next */ + throw new FetchError("Empty body", response); + } else if (!response.ok) { + throw new FetchError(response.statusText, response); + } + + const stream = new JSONStream(); + + response.body.pipe(stream); + + return stream; + } + /** * Make a request and parse the body as JSON */ diff --git a/src/stream.ts b/src/stream.ts index 77f1511..4bc96b1 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -23,3 +23,5 @@ export class JSONStream extends Transform { } } } + +export default JSONStream; diff --git a/test/rest.spec.ts b/test/rest.spec.ts index e0deb7c..29c19cd 100644 --- a/test/rest.spec.ts +++ b/test/rest.spec.ts @@ -1,4 +1,5 @@ import assert from "assert"; +import { Readable } from "stream"; import nock from "nock"; import fetch from "node-fetch"; @@ -22,6 +23,7 @@ import { ITransactions, IOrder, DefaultAPIVersion, + JSONStream, } from "../"; const client_id = "d0c5340b-6d6c-49d9-b567-48c4bfca13d2"; @@ -32,6 +34,18 @@ const url = "https://some-other-api.exante.eu/"; const client = new RestClient({ client_id, shared_key, app_id, url }); +async function* StreamMessages( + messages: Record[] +): AsyncGenerator { + for (const message of messages) { + const data = Buffer.from(JSON.stringify({ ...message })); + const promise = await new Promise((resolve) => { + setTimeout(resolve, 1, data); + }); + yield promise; + } +} + suite("RestClient", () => { test("constructor", () => { assert.deepStrictEqual(client.url.href, url); @@ -71,6 +85,29 @@ suite("RestClient", () => { assert.deepStrictEqual(data, response); }); + test(".fetchStream() (passes headers)", async () => { + const response = { ok: 1 }; + const reqheaders = { + "Content-Type": "application/json", + Authorization: (value: string) => value.includes("Bearer "), + Accept: "application/x-json-stream", + }; + + nock(url, { reqheaders }) + .get("/") + .delay(1) + .reply(200, () => Readable.from(StreamMessages([response]))); + + const stream = await client.fetchStream(url); + + await new Promise((resolve) => { + stream.on("data", (data) => { + assert.deepStrictEqual(data, response); + resolve(); + }); + }); + }); + test(".getAccounts()", async () => { const version = "3.0"; const response: IUserAccount[] = [ @@ -1683,6 +1720,41 @@ suite("RestClient", () => { assert.deepStrictEqual(token, jwt); }); + test(".fetchStream()", async () => { + const heartbeat = { event: "heartbeat" }; + const pong = { event: "pong" }; + nock(url) + .get("/") + .delay(1) + .reply(200, () => Readable.from(StreamMessages([heartbeat, pong]))); + + const stream = await RestClient.fetchStream(url); + + assert.ok(stream instanceof JSONStream); + + await new Promise((resolve) => { + stream.once("data", (data) => { + assert.deepStrictEqual(data, heartbeat); + stream.once("data", (data) => { + assert.deepStrictEqual(data, pong); + stream.once("end", resolve); + }); + }); + }); + }); + + test(".fetchStream() (throws `FetchError` on non 2xx responses)", async () => { + nock(url).get("/").delay(1).reply(404); + + try { + await RestClient.fetchStream(url); + assert.fail("Should throw a FetchError"); + } catch (error) { + assert.ok(error instanceof FetchError); + assert.ok(error.response instanceof fetch.Response); + } + }); + test(".fetch()", async () => { const response = { ok: 1 }; @@ -1698,7 +1770,7 @@ suite("RestClient", () => { try { await RestClient.fetch(url); - throw new Error("Should throw a FetchError"); + assert.fail("Should throw a FetchError"); } catch (error) { assert.ok(error instanceof FetchError); assert.ok(error.response instanceof fetch.Response); @@ -1710,7 +1782,7 @@ suite("RestClient", () => { try { await RestClient.fetch(url); - throw new Error("Should throw a FetchError"); + assert.fail("Should throw a FetchError"); } catch (error) { assert.ok(error instanceof FetchError); assert.ok(error.response instanceof fetch.Response);