Skip to content

Commit

Permalink
feat: add support for response streams (#134)
Browse files Browse the repository at this point in the history
* feat: add support for response streams

* ci: enable AbortController on Node 14

* feat: support streaming request body

fix #25
  • Loading branch information
alexghr authored Apr 10, 2022
1 parent f1925a2 commit abc1732
Show file tree
Hide file tree
Showing 9 changed files with 241 additions and 158 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ on:
pull_request:
branches: [main]


jobs:
test:
runs-on: ubuntu-latest
Expand All @@ -22,4 +23,5 @@ jobs:
- run: yarn lint
- run: yarn run test
env:
NODE_OPTIONS: "--experimental-abortcontroller"
CI: true
2 changes: 2 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ on:
- 'next'
- '4.x'


jobs:
test:
runs-on: ubuntu-latest
Expand All @@ -25,6 +26,7 @@ jobs:
- run: yarn run test
env:
CI: true
NODE_OPTIONS: "--experimental-abortcontroller"

release:
runs-on: ubuntu-latest
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
"lint": "eslint .",
"prepublish": "$npm_execpath run build",
"pretest": "$npm_execpath run build:test",
"test": "NODE_OPTIONS=--experimental-vm-modules jest",
"test": "NODE_OPTIONS=\"$NODE_OPTIONS --experimental-vm-modules\" jest",
"pretest:watch": "$npm_execpath run build:test",
"test:watch": "concurrently '$npm_execpath run build:test:watch' 'jest --watchAll'",
"release": "semantic-release",
Expand All @@ -52,7 +52,7 @@
"eslint": "^8.4.1",
"got": "^12.0.0",
"jest": "^27.4.5",
"nock": "^13.2.1",
"nock": "^13.2.4",
"semantic-release": "^19.0.2",
"typescript": "^4.5.4"
},
Expand Down
3 changes: 3 additions & 0 deletions src/lib/body-type.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import { Readable } from "node:stream";

export type Body = Readable | Blob | BufferSource | FormData | URLSearchParams | string;
123 changes: 82 additions & 41 deletions src/lib/fetch.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,26 @@
import { CancelableRequest, Got, Method, OptionsOfUnknownResponseBody, Response } from 'got';
import { Readable } from 'stream';
import { Got, Method, OptionsInit, OptionsOfUnknownResponseBody, Request } from 'got';
import { URL, URLSearchParams } from 'url';
import { format } from 'util';
import { GotFetchResponse } from './response.js';
import { finished, Readable } from "node:stream";
import { once } from "node:events";
import { Body } from "./body-type.js";

type GotFetchRequestInit = Omit<RequestInit, 'body'> & {
body?: Body | null;
};

export type GotFetch = typeof fetch;
export type GotFetch = (
input: string | (GotFetchRequestInit & { url: string }),
init?: GotFetchRequestInit
) => Promise<GotFetchResponse>;

export function createFetch(got: Got): GotFetch {
const globalCache = new Map();

return async (input, opts) => {
const url = new URL(typeof input === 'string' ? input : input.url);
const request: RequestInit = typeof input === 'object' ? input : opts || {};
const request: GotFetchRequestInit = typeof input === 'object' ? input : opts || {};

if (request.mode === 'no-cors' || request.mode === 'same-origin' || request.mode === 'navigate') {
throw new TypeError(format('request.mode not supported: %s', request.mode));
Expand All @@ -39,69 +47,102 @@ export function createFetch(got: Got): GotFetch {
const searchParams = new URLSearchParams(url.searchParams);
url.search = '';

const gotOpts: OptionsOfUnknownResponseBody = {
const { body = "", headers: bodyHeaders } = serializeBody(request.body);

const gotOpts: OptionsInit = {
// url needs to be stringified to support UNIX domain sockets, and
// For more info see https://github.com/alexghr/got-fetch/pull/8
url: url.toString(),
searchParams,
followRedirect: true,
throwHttpErrors: false,
method: (request.method as Method) || 'get',
isStream: false,
method: (request.method as Method) ?? "get",
resolveBodyOnly: false,
// we'll do our own response parsing in `GotFetchResponse`
responseType: undefined
responseType: undefined,
allowGetBody:
["GET", "HEAD"].includes(request.method?.toLowerCase() ?? "") && Boolean(body),
headers: {
...bodyHeaders,
...(request.headers as object),
},
};

const { body, headers: bodyHeaders } = serializeBody(request.body);

// only set the `body` key on the options if a body is sent
// otherwise got crashes
if (body) {
gotOpts.body = body;
}

if (bodyHeaders || request.headers) {
gotOpts.headers = {
...bodyHeaders,
...(request.headers as object)
};
}
console.log(gotOpts);

// there's a bug in got where it crashes if we send both a body and cache
// https://github.com/sindresorhus/got/issues/1021
if ((typeof request.cache === 'undefined' || request.cache === 'default') && !gotOpts.body) {
gotOpts.cache = globalCache;
}

const response = got(gotOpts) as CancelableRequest<Response<string | Buffer>>;
const gotReq = got({ ...gotOpts, isStream: true });

if (request.signal) {
const abortHandler = () => response.cancel()
const abortHandler = () => gotReq.destroy()
request.signal.addEventListener('abort', abortHandler);
response.then(() => request.signal!.removeEventListener('abort', abortHandler))
const cleanup = finished(gotReq, () => {
request.signal!.removeEventListener('abort', abortHandler);
cleanup();
})
}

return response.then(r => {
return new GotFetchResponse(r.body, {
headers: r.headers,
redirected: r.redirectUrls && r.redirectUrls.length > 0,
status: r.statusCode,
statusText: r.statusMessage,
type: "default",
// according to spec this should be the final URL, after all redirects
url:
r.redirectUrls.length > 0
// using Array.prototype.at would've been nice but it's not
// supported by anything below Node 16.8
? r.redirectUrls[r.redirectUrls.length - 1].href
: url.href,
});
try {
// got creates a Duplex stream of the request but it only allows writing
// to it sometimes. It's list of methods which accept a payload is
// incomplete so alwasy try to close the request and swallow any errors
if (body instanceof Readable) {
body.pipe(gotReq);
} else {
gotReq.end(body);
}
} catch {
// noop
// I hate this
}

const raceController = new AbortController();
// wait for the first chunk to arrive so that got gives us back the status
// one of two things could happen: either we get back a body with a
// length > 0, in which case at least one 'data' event is emitted; OR
// we get an empty body (lenght === 0) in which case got will emit 'end'
const [firstChunk] = await Promise.race([
once(gotReq, "data", { signal: raceController.signal }),
once(gotReq, "end", { signal: raceController.signal }),
]);
// cancel whoever lost
raceController.abort();

const response = gotReq.response!;
// put back the chunk we got (if any) or create an empty ReadableStream
const responseBody = firstChunk ? restream(firstChunk, gotReq) : Readable.from([]);

return new GotFetchResponse(responseBody, {
headers: response.headers,
redirected: response.redirectUrls && response.redirectUrls.length > 0,
status: response.statusCode,
statusText: response.statusMessage,
type: "default",
// according to spec this should be the final URL, after all redirects
url:
response.redirectUrls.length > 0
// using Array.prototype.at would've been nice but it's not
// supported by anything below Node 16.8
? response.redirectUrls[response.redirectUrls.length - 1].href
: url.href,
});
}
}

function serializeBody(body: BodyInit | null | undefined): Pick<OptionsOfUnknownResponseBody, 'body' | 'headers'> {
async function* restream(firstChunk: any, req: Request): any {
yield firstChunk;

for await (const chunk of req) {
yield chunk;
}
}

function serializeBody(body: Body | null | undefined): Pick<OptionsOfUnknownResponseBody, 'body' | 'headers'> {
if (!body) {
return {};
} else if (body instanceof URLSearchParams) {
Expand Down
24 changes: 18 additions & 6 deletions src/lib/response.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
*/

import { IncomingHttpHeaders } from 'http2';
import {Readable, Stream} from 'stream';
import { format } from 'util';
import {Body} from './body-type.js';

import { GotHeaders } from './headers.js';

Expand All @@ -26,10 +28,15 @@ export class GotFetchResponse implements Response {
readonly url: string;
readonly type: ResponseType;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
/**
* This is a Node Readable stream
*/
readonly body: any | null;

#bodyUsed = false;

constructor(
body: BodyInit | null,
body: Body | null,
init?: ResponseInit | null
) {
if (init && typeof init.status === 'number' && (init.status < 200 || init.status > 599)) {
Expand All @@ -49,9 +56,7 @@ export class GotFetchResponse implements Response {
}

get bodyUsed(): boolean {
// if it's a string or a Buffer then we've already read the full body in memory
// and the stream this body came from is "disturbed"
return typeof this.body === 'string' || Buffer.isBuffer(this.body);
return this.#bodyUsed
}

get ok(): boolean {
Expand Down Expand Up @@ -79,7 +84,9 @@ export class GotFetchResponse implements Response {
return this.text().then(JSON.parse);
}

text(): Promise<string> {
async text(): Promise<string> {
this.#bodyUsed = true;

if (this.body === null) {
return Promise.resolve('');
}
Expand All @@ -89,7 +96,12 @@ export class GotFetchResponse implements Response {
} else if (Buffer.isBuffer(this.body)) {
return Promise.resolve(this.body.toString('utf8'));
} else {
return Promise.reject(new TypeError('Unsupported body type'));
let body = '';
for await (const chunk of this.body) {
body += chunk;
}

return body;
}
}

Expand Down
36 changes: 29 additions & 7 deletions src/test/fetch.request.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import got from 'got';
import {Readable} from 'stream';
import { URLSearchParams } from 'url';

import { createFetch } from '../lib/fetch.js';
Expand All @@ -12,7 +13,7 @@ describe('fetch request', () => {
});

describe('method', () => {
it.each(['get', 'post', 'put', 'delete', 'options'])('%s', async (method) => {
it.each(['get', 'post', 'put', 'patch', 'delete', 'options', 'trace', 'head'])('%s', async (method) => {
expect.assertions(1);
interceptor.intercept('/', method).reply(200);

Expand Down Expand Up @@ -104,16 +105,37 @@ describe('fetch request', () => {
});

describe('body', () => {
const tests: ReadonlyArray<[string, string | URLSearchParams | Buffer, RegExp]> = [
const tests: ReadonlyArray<
[string, string | URLSearchParams | Buffer | Readable, string, RegExp]
> = [
// test name, body, expected content type
['string', 'foo', /^text\/plain/],
['querystring', new URLSearchParams({ foo: 'foo' }), /^application\/x-www-form-urlencoded/],
['buffer', Buffer.from('foo', 'utf-8'), /^application\/octet-stream/]
["string", "foo", "foo", /^text\/plain/],
[
"querystring",
new URLSearchParams({ foo: "foo" }),
"foo=foo",
/^application\/x-www-form-urlencoded/,
],
[
"buffer",
Buffer.from("foo", "utf-8"),
"foo",
/^application\/octet-stream/,
],
[
"stream",
Readable.from(["foo", "bar", "baz"]),
"foobarbaz",
/^application\/octet-stream/,
],
];

it.each(tests)('sends %s body', async (_, body) => {
it.only.each(tests)('sends %s body', async (_, body, bodyMatch, contentType) => {
expect.assertions(1);
interceptor.intercept('/', 'post', String(body)).reply(200);
interceptor
.intercept("/", "post", bodyMatch)
.matchHeader("content-type", contentType)
.reply(200);

const fetch = createFetch(got);
await assert200(fetch(url('/'), {
Expand Down
Loading

0 comments on commit abc1732

Please sign in to comment.