Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate from the Node to the Web ReadableStream #8410

Merged
merged 7 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/itchy-boxes-try.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'firebase': minor
dlarocque marked this conversation as resolved.
Show resolved Hide resolved
'@firebase/storage': minor
---

Migrate from the Node to Web ReadableStream interface
2 changes: 1 addition & 1 deletion common/api-review/storage.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ export function getMetadata(ref: StorageReference): Promise<FullMetadata>;
export function getStorage(app?: FirebaseApp, bucketUrl?: string): FirebaseStorage;

// @public
export function getStream(ref: StorageReference, maxDownloadSizeBytes?: number): NodeJS.ReadableStream;
export function getStream(ref: StorageReference, maxDownloadSizeBytes?: number): ReadableStream;

// @internal (undocumented)
export function _invalidArgument(message: string): StorageError;
Expand Down
4 changes: 2 additions & 2 deletions docs-devsite/storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ This API is only available in Node.
<b>Signature:</b>

```typescript
export declare function getStream(ref: StorageReference, maxDownloadSizeBytes?: number): NodeJS.ReadableStream;
export declare function getStream(ref: StorageReference, maxDownloadSizeBytes?: number): ReadableStream;
```

#### Parameters
Expand All @@ -291,7 +291,7 @@ export declare function getStream(ref: StorageReference, maxDownloadSizeBytes?:

<b>Returns:</b>

NodeJS.ReadableStream
ReadableStream

A stream with the object's data as bytes

Expand Down
2 changes: 1 addition & 1 deletion packages/storage/src/api.browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,6 @@ export function getBlob(
export function getStream(
ref: StorageReference,
maxDownloadSizeBytes?: number
): NodeJS.ReadableStream {
): ReadableStream {
throw new Error('getStream() is only supported by NodeJS builds');
}
2 changes: 1 addition & 1 deletion packages/storage/src/api.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ export function getBlob(
export function getStream(
ref: StorageReference,
maxDownloadSizeBytes?: number
): NodeJS.ReadableStream {
): ReadableStream {
ref = getModularInstance(ref);
return getStreamInternal(ref as Reference, maxDownloadSizeBytes);
}
3 changes: 1 addition & 2 deletions packages/storage/src/implementation/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/** Network headers */
export type Headers = Record<string, string>;

Expand All @@ -23,7 +22,7 @@ export type ConnectionType =
| string
| ArrayBuffer
| Blob
| NodeJS.ReadableStream;
| ReadableStream<Uint8Array>;

/**
* A lightweight wrapper around XMLHttpRequest with a
Expand Down
2 changes: 1 addition & 1 deletion packages/storage/src/platform/browser/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ export function newBlobConnection(): Connection<Blob> {
return new XhrBlobConnection();
}

export function newStreamConnection(): Connection<NodeJS.ReadableStream> {
export function newStreamConnection(): Connection<ReadableStream> {
throw new Error('Streams are only supported on Node');
}

Expand Down
2 changes: 1 addition & 1 deletion packages/storage/src/platform/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ export function newBlobConnection(): Connection<Blob> {
return nodeNewBlobConnection();
}

export function newStreamConnection(): Connection<NodeJS.ReadableStream> {
export function newStreamConnection(): Connection<ReadableStream<Uint8Array>> {
// This file is only used in Node.js tests using ts-node.
return nodeNewStreamConnection();
}
20 changes: 11 additions & 9 deletions packages/storage/src/platform/node/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ abstract class FetchConnection<T extends ConnectionType>
async send(
url: string,
method: string,
body?: ArrayBufferView | Blob | string,
body?: NodeJS.ArrayBufferView | Blob | string,
headers?: Record<string, string>
): Promise<void> {
if (this.sent_) {
Expand All @@ -62,7 +62,7 @@ abstract class FetchConnection<T extends ConnectionType>
const response = await this.fetch_(url, {
method,
headers: headers || {},
body: body as ArrayBufferView | string
body: body as NodeJS.ArrayBufferView | string
});
this.headers_ = response.headers;
this.statusCode_ = response.status;
Expand Down Expand Up @@ -146,13 +146,15 @@ export function newBytesConnection(): Connection<ArrayBuffer> {
return new FetchBytesConnection();
}

export class FetchStreamConnection extends FetchConnection<NodeJS.ReadableStream> {
private stream_: NodeJS.ReadableStream | null = null;
export class FetchStreamConnection extends FetchConnection<
ReadableStream<Uint8Array>
> {
private stream_: ReadableStream<Uint8Array> | null = null;

async send(
url: string,
method: string,
body?: ArrayBufferView | Blob | string,
body?: NodeJS.ArrayBufferView | Blob | string,
headers?: Record<string, string>
): Promise<void> {
if (this.sent_) {
Expand All @@ -164,12 +166,12 @@ export class FetchStreamConnection extends FetchConnection<NodeJS.ReadableStream
const response = await this.fetch_(url, {
method,
headers: headers || {},
body: body as ArrayBufferView | string
body: body as NodeJS.ArrayBufferView | string
});
this.headers_ = response.headers;
this.statusCode_ = response.status;
this.errorCode_ = ErrorCode.NO_ERROR;
this.stream_ = response.body;
this.stream_ = response.body as ReadableStream<Uint8Array>;
} catch (e) {
this.errorText_ = (e as Error)?.message;
// emulate XHR which sets status to 0 when encountering a network error
Expand All @@ -178,15 +180,15 @@ export class FetchStreamConnection extends FetchConnection<NodeJS.ReadableStream
}
}

getResponse(): NodeJS.ReadableStream {
getResponse(): ReadableStream {
if (!this.stream_) {
throw internalError('cannot .getResponse() before sending');
}
return this.stream_;
}
}

export function newStreamConnection(): Connection<NodeJS.ReadableStream> {
export function newStreamConnection(): Connection<ReadableStream<Uint8Array>> {
return new FetchStreamConnection();
}

Expand Down
33 changes: 16 additions & 17 deletions packages/storage/src/reference.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
* @fileoverview Defines the Firebase StorageReference class.
*/

import { PassThrough, Transform, TransformOptions } from 'stream';

import { FbsBlob } from './implementation/blob';
import { Location } from './implementation/location';
import { getMappings } from './implementation/metadata';
Expand Down Expand Up @@ -48,6 +46,7 @@ import {
newStreamConnection,
newTextConnection
} from './platform/connection';
import { RequestInfo } from './implementation/requestinfo';

/**
* Provides methods to interact with a bucket in the Firebase Storage service.
Expand Down Expand Up @@ -203,42 +202,42 @@ export function getBlobInternal(
export function getStreamInternal(
ref: Reference,
maxDownloadSizeBytes?: number
): NodeJS.ReadableStream {
): ReadableStream {
ref._throwIfRoot('getStream');
const requestInfo = getBytes(
const requestInfo: RequestInfo<ReadableStream, ReadableStream> = getBytes(
ref.storage,
ref._location,
maxDownloadSizeBytes
);

/** A transformer that passes through the first n bytes. */
const newMaxSizeTransform: (n: number) => TransformOptions = n => {
// Transforms the stream so that only `maxDownloadSizeBytes` bytes are piped to the result
const newMaxSizeTransform = (n: number): Transformer => {
let missingBytes = n;
return {
transform(chunk, encoding, callback) {
transform(chunk, controller: TransformStreamDefaultController) {
// GCS may not honor the Range header for small files
if (chunk.length < missingBytes) {
this.push(chunk);
controller.enqueue(chunk);
missingBytes -= chunk.length;
} else {
this.push(chunk.slice(0, missingBytes));
this.emit('end');
controller.enqueue(chunk.slice(0, missingBytes));
controller.terminate();
}
callback();
}
} as TransformOptions;
};
};

const result =
maxDownloadSizeBytes !== undefined
? new Transform(newMaxSizeTransform(maxDownloadSizeBytes))
: new PassThrough();
? new TransformStream(newMaxSizeTransform(maxDownloadSizeBytes))
: new TransformStream(); // The default transformer forwards all chunks to its readable side

ref.storage
.makeRequestWithTokens(requestInfo, newStreamConnection)
.then(stream => (stream as NodeJS.ReadableStream).pipe(result))
.catch(e => result.destroy(e));
return result;
.then(readableStream => readableStream.pipeThrough(result))
.catch(err => result.writable.abort(err));

return result.readable;
}

/**
Expand Down
37 changes: 25 additions & 12 deletions packages/storage/test/node/stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,27 @@ import { FirebaseApp, deleteApp } from '@firebase/app';
import { getStream, ref, uploadBytes } from '../../src/index.node';
import * as types from '../../src/public-types';

async function readData(reader: NodeJS.ReadableStream): Promise<number[]> {
return new Promise<number[]>((resolve, reject) => {
const data: number[] = [];
reader.on('error', e => reject(e));
reader.on('data', chunk => data.push(...Array.from(chunk as Buffer)));
reader.on('end', () => resolve(data));
// See: https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/getReader
async function readData(readableStream: ReadableStream): Promise<Uint8Array> {
return new Promise<Uint8Array>((resolve, reject) => {
const reader: ReadableStreamDefaultReader = readableStream.getReader();
const result: any[] = [];
reader
.read()
.then(function processBytes({ done, value }): any {
if (done) {
resolve(new Uint8Array(result));
return;
}

result.push(...value);
return reader.read().then(processBytes);
})
.catch(err => {
console.error(err);
reject(err);
return;
});
});
}

Expand All @@ -46,19 +61,17 @@ describe('Firebase Storage > getStream', () => {
it('can get stream', async () => {
const reference = ref(storage, 'public/exp-bytes');
await uploadBytes(reference, new Uint8Array([0, 1, 3, 128, 255]));
const stream = await getStream(reference);
const stream = getStream(reference);
const data = await readData(stream);
expect(new Uint8Array(data)).to.deep.equal(
new Uint8Array([0, 1, 3, 128, 255])
);
expect(data).to.deep.equal(new Uint8Array([0, 1, 3, 128, 255]));
});

it('can get first n bytes of stream', async () => {
const reference = ref(storage, 'public/exp-bytes');
await uploadBytes(reference, new Uint8Array([0, 1, 3]));
const stream = await getStream(reference, 2);
const stream = getStream(reference, 2);
const data = await readData(stream);
expect(new Uint8Array(data)).to.deep.equal(new Uint8Array([0, 1]));
expect(data).to.deep.equal(new Uint8Array([0, 1]));
});

it('getStream() throws for missing file', async () => {
Expand Down
Loading