Skip to content

Commit

Permalink
feat: add the fetchStream method
Browse files Browse the repository at this point in the history
  • Loading branch information
vansergen committed Sep 20, 2020
1 parent 95b8978 commit aa641ad
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 2 deletions.
39 changes: 39 additions & 0 deletions src/rest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { createHmac } from "crypto";
import fetch from "node-fetch";

import FetchError from "./error";
import JSONStream from "./stream";

export const ExanteDemoURL = "https://api-demo.exante.eu/";
export const ExanteLiveURL = "https://api-live.exante.eu/";
Expand Down Expand Up @@ -1308,6 +1309,21 @@ export class RestClient {
return order;
}

/**
* Make a request and return JSONStream
*/
public async fetchStream(
url: string | URL,
options: fetch.RequestInit = {}
): Promise<JSONStream> {
const headers = new fetch.Headers(options.headers || this.headers);
headers.set("Accept", "application/x-json-stream");

const stream = RestClient.fetchStream(url, { ...options, headers });

return stream;
}

/**
* Get a JSON Web Token
*/
Expand Down Expand Up @@ -1398,6 +1414,29 @@ export class RestClient {
return jwt;
}

/**
* Make a request and return a stream
*/
public static async fetchStream(
url: string | URL,
options: fetch.RequestInit = {}
): Promise<JSONStream> {
const response = await fetch(url.toString(), { ...options });

if (!response.body) {
/* istanbul ignore next */
throw new FetchError("Empty body", response);
} else if (!response.ok) {
throw new FetchError(response.statusText, response);
}

const stream = new JSONStream();

response.body.pipe(stream);

return stream;
}

/**
* Make a request and parse the body as JSON
*/
Expand Down
2 changes: 2 additions & 0 deletions src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,5 @@ export class JSONStream extends Transform {
}
}
}

export default JSONStream;
76 changes: 74 additions & 2 deletions test/rest.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import assert from "assert";
import { Readable } from "stream";
import nock from "nock";
import fetch from "node-fetch";

Expand All @@ -22,6 +23,7 @@ import {
ITransactions,
IOrder,
DefaultAPIVersion,
JSONStream,
} from "../";

const client_id = "d0c5340b-6d6c-49d9-b567-48c4bfca13d2";
Expand All @@ -32,6 +34,18 @@ const url = "https://some-other-api.exante.eu/";

const client = new RestClient({ client_id, shared_key, app_id, url });

async function* StreamMessages(
messages: Record<string, unknown>[]
): AsyncGenerator<Buffer> {
for (const message of messages) {
const data = Buffer.from(JSON.stringify({ ...message }));
const promise = await new Promise<Buffer>((resolve) => {
setTimeout(resolve, 1, data);
});
yield promise;
}
}

suite("RestClient", () => {
test("constructor", () => {
assert.deepStrictEqual(client.url.href, url);
Expand Down Expand Up @@ -71,6 +85,29 @@ suite("RestClient", () => {
assert.deepStrictEqual(data, response);
});

test(".fetchStream() (passes headers)", async () => {
const response = { ok: 1 };
const reqheaders = {
"Content-Type": "application/json",
Authorization: (value: string) => value.includes("Bearer "),
Accept: "application/x-json-stream",
};

nock(url, { reqheaders })
.get("/")
.delay(1)
.reply(200, () => Readable.from(StreamMessages([response])));

const stream = await client.fetchStream(url);

await new Promise((resolve) => {
stream.on("data", (data) => {
assert.deepStrictEqual(data, response);
resolve();
});
});
});

test(".getAccounts()", async () => {
const version = "3.0";
const response: IUserAccount[] = [
Expand Down Expand Up @@ -1683,6 +1720,41 @@ suite("RestClient", () => {
assert.deepStrictEqual(token, jwt);
});

test(".fetchStream()", async () => {
const heartbeat = { event: "heartbeat" };
const pong = { event: "pong" };
nock(url)
.get("/")
.delay(1)
.reply(200, () => Readable.from(StreamMessages([heartbeat, pong])));

const stream = await RestClient.fetchStream(url);

assert.ok(stream instanceof JSONStream);

await new Promise((resolve) => {
stream.once("data", (data) => {
assert.deepStrictEqual(data, heartbeat);
stream.once("data", (data) => {
assert.deepStrictEqual(data, pong);
stream.once("end", resolve);
});
});
});
});

test(".fetchStream() (throws `FetchError` on non 2xx responses)", async () => {
nock(url).get("/").delay(1).reply(404);

try {
await RestClient.fetchStream(url);
assert.fail("Should throw a FetchError");
} catch (error) {
assert.ok(error instanceof FetchError);
assert.ok(error.response instanceof fetch.Response);
}
});

test(".fetch()", async () => {
const response = { ok: 1 };

Expand All @@ -1698,7 +1770,7 @@ suite("RestClient", () => {

try {
await RestClient.fetch(url);
throw new Error("Should throw a FetchError");
assert.fail("Should throw a FetchError");
} catch (error) {
assert.ok(error instanceof FetchError);
assert.ok(error.response instanceof fetch.Response);
Expand All @@ -1710,7 +1782,7 @@ suite("RestClient", () => {

try {
await RestClient.fetch(url);
throw new Error("Should throw a FetchError");
assert.fail("Should throw a FetchError");
} catch (error) {
assert.ok(error instanceof FetchError);
assert.ok(error.response instanceof fetch.Response);
Expand Down

0 comments on commit aa641ad

Please sign in to comment.