From 48eca40fb0b1f9aa8b611f1ea1c00aa4231929bd Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Fri, 25 Oct 2024 11:29:13 -0700 Subject: [PATCH 01/30] Support streaming streaming responses for callable functions. --- common/api-review/functions.api.md | 17 ++- packages/functions-types/index.d.ts | 9 ++ packages/functions/package.json | 2 +- packages/functions/src/api.ts | 13 +- packages/functions/src/callable.test.ts | 170 +++++++++++++++++++++++- packages/functions/src/public-types.ts | 16 ++- packages/functions/src/service.ts | 156 ++++++++++++++++++++-- 7 files changed, 354 insertions(+), 29 deletions(-) diff --git a/common/api-review/functions.api.md b/common/api-review/functions.api.md index 90c26814746..ad1458c9a7d 100644 --- a/common/api-review/functions.api.md +++ b/common/api-review/functions.api.md @@ -33,13 +33,16 @@ export type FunctionsErrorCodeCore = 'ok' | 'cancelled' | 'unknown' | 'invalid-a export function getFunctions(app?: FirebaseApp, regionOrCustomDomain?: string): Functions; // @public -export type HttpsCallable = (data?: RequestData | null) => Promise>; +export type HttpsCallable = { + (data?: RequestData | null): Promise>; + stream: (data?: RequestData | null) => Promise>; +}; // @public -export function httpsCallable(functionsInstance: Functions, name: string, options?: HttpsCallableOptions): HttpsCallable; +export function httpsCallable(functionsInstance: Functions, name: string, options?: HttpsCallableOptions): HttpsCallable; // @public -export function httpsCallableFromURL(functionsInstance: Functions, url: string, options?: HttpsCallableOptions): HttpsCallable; +export function httpsCallableFromURL(functionsInstance: Functions, url: string, options?: HttpsCallableOptions): HttpsCallable; // @public export interface HttpsCallableOptions { @@ -51,5 +54,13 @@ export interface HttpsCallableResult { readonly data: ResponseData; } +// @public +export interface HttpsCallableStreamResult { + // (undocumented) + readonly data: Promise; + // (undocumented) + readonly stream: AsyncIterable; +} + ``` diff --git a/packages/functions-types/index.d.ts b/packages/functions-types/index.d.ts index 4d1770377d3..0c5499a1154 100644 --- a/packages/functions-types/index.d.ts +++ b/packages/functions-types/index.d.ts @@ -22,12 +22,21 @@ export interface HttpsCallableResult { readonly data: any; } +/** + * An HttpsCallableStreamResult wraps a single streaming result from a function call. + */ +export interface HttpsCallableStreamResult { + readonly data: Promise; + readonly stream: AsyncIterable; +} + /** * An HttpsCallable is a reference to a "callable" http trigger in * Google Cloud Functions. */ export interface HttpsCallable { (data?: {} | null): Promise; + stream(data?: {} | null): Promise; } /** diff --git a/packages/functions/package.json b/packages/functions/package.json index ec0c0cd2f25..dd2bbc6aeca 100644 --- a/packages/functions/package.json +++ b/packages/functions/package.json @@ -35,7 +35,7 @@ "test:browser": "karma start --single-run", "test:browser:debug": "karma start --browsers=Chrome --auto-watch", "test:node": "TS_NODE_COMPILER_OPTIONS='{\"module\":\"commonjs\"}' nyc --reporter lcovonly -- mocha 'src/{,!(browser)/**/}*.test.ts' --file src/index.node.ts --config ../../config/mocharc.node.js", - "test:emulator": "env FIREBASE_FUNCTIONS_EMULATOR_ORIGIN=http://localhost:5005 run-p test:node", + "test:emulator": "env FIREBASE_FUNCTIONS_EMULATOR_ORIGIN=http://127.0.0.1:5005 run-p test:node", "api-report": "api-extractor run --local --verbose", "doc": "api-documenter markdown --input temp --output docs", "build:doc": "yarn build && yarn doc", diff --git a/packages/functions/src/api.ts b/packages/functions/src/api.ts index f6b5066b9a8..cabfcf5b929 100644 --- a/packages/functions/src/api.ts +++ b/packages/functions/src/api.ts @@ -86,12 +86,12 @@ export function connectFunctionsEmulator( * @param name - The name of the trigger. * @public */ -export function httpsCallable( +export function httpsCallable( functionsInstance: Functions, name: string, options?: HttpsCallableOptions -): HttpsCallable { - return _httpsCallable( +): HttpsCallable { + return _httpsCallable( getModularInstance(functionsInstance as FunctionsService), name, options @@ -105,13 +105,14 @@ export function httpsCallable( */ export function httpsCallableFromURL< RequestData = unknown, - ResponseData = unknown + ResponseData = unknown, + StreamData = unknown, >( functionsInstance: Functions, url: string, options?: HttpsCallableOptions -): HttpsCallable { - return _httpsCallableFromURL( +): HttpsCallable { + return _httpsCallableFromURL( getModularInstance(functionsInstance as FunctionsService), url, options diff --git a/packages/functions/src/callable.test.ts b/packages/functions/src/callable.test.ts index 1d8ae0cdb31..6b7c687b4a7 100644 --- a/packages/functions/src/callable.test.ts +++ b/packages/functions/src/callable.test.ts @@ -92,13 +92,19 @@ describe('Firebase Functions > Call', () => { Record, { message: string; code: number; long: number } >(functions, 'dataTest'); - const result = await func(data); + try { + + const result = await func(data); + + expect(result.data).to.deep.equal({ + message: 'stub response', + code: 42, + long: 420 + }); + } catch (err) { + console.error(err) + } - expect(result.data).to.deep.equal({ - message: 'stub response', - code: 42, - long: 420 - }); }); it('scalars', async () => { @@ -226,3 +232,155 @@ describe('Firebase Functions > Call', () => { await expectError(func(), 'deadline-exceeded', 'deadline-exceeded'); }); }); + +describe('Firebase Functions > Stream', () => { + let app: FirebaseApp; + const region = 'us-central1'; + + before(() => { + const useEmulator = !!process.env.FIREBASE_FUNCTIONS_EMULATOR_ORIGIN; + const projectId = useEmulator + ? 'functions-integration-test' + : TEST_PROJECT.projectId; + const messagingSenderId = 'messaging-sender-id'; + app = makeFakeApp({ projectId, messagingSenderId }); + }); + + it('successfully streams data and resolves final result', async () => { + const functions = createTestService(app, region); + const mockFetch = sinon.stub(functions, 'fetchImpl' as any); + + const mockResponse = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode('data: {"message":"Hello"}\n')); + controller.enqueue(new TextEncoder().encode('data: {"message":"World"}\n')); + controller.enqueue(new TextEncoder().encode('data: {"result":"Final Result"}\n')); + controller.close(); + } + }); + + mockFetch.resolves({ + body: mockResponse, + headers: new Headers({ 'Content-Type': 'text/event-stream' }), + status: 200, + statusText: 'OK', + } as Response); + + const func = httpsCallable, string, string>(functions, 'streamTest'); + const streamResult = await func.stream({}); + + const messages: string[] = []; + for await (const message of streamResult.stream) { + messages.push(message); + } + + expect(messages).to.deep.equal(['Hello', 'World']); + expect(await streamResult.data).to.equal('Final Result'); + + mockFetch.restore(); + }); + + it('handles network errors', async () => { + const functions = createTestService(app, region); + const mockFetch = sinon.stub(functions, 'fetchImpl' as any); + + mockFetch.rejects(new Error('Network error')); + + const func = httpsCallable, string, string>(functions, 'errTest'); + const streamResult = await func.stream({}); + + let errorThrown = false; + try { + for await (const _ of streamResult.stream) { + // This should not execute + } + } catch (error: unknown) { + errorThrown = true; + expect((error as FunctionsError).code).to.equal(`${FUNCTIONS_TYPE}/internal`); + } + + expect(errorThrown).to.be.true; + expect(streamResult.data).to.be.a('promise'); + + mockFetch.restore(); + }); + + it('handles server-side errors', async () => { + const functions = createTestService(app, region); + const mockFetch = sinon.stub(functions, 'fetchImpl' as any); + + const mockResponse = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode('data: {"error":{"status":"INVALID_ARGUMENT","message":"Invalid input"}}\n')); + controller.close(); + } + }); + + mockFetch.resolves({ + body: mockResponse, + headers: new Headers({ 'Content-Type': 'text/event-stream' }), + status: 200, + statusText: 'OK', + } as Response); + + const func = httpsCallable, string, string>(functions, 'errTest'); + const streamResult = await func.stream({}); + + let errorThrown = false; + try { + for await (const _ of streamResult.stream) { + // This should not execute + } + } catch (error) { + errorThrown = true; + expect((error as FunctionsError).code).to.equal(`${FUNCTIONS_TYPE}/invalid-argument`); + expect((error as FunctionsError).message).to.equal('Invalid input'); + } + + expect(errorThrown).to.be.true; + expectError(streamResult.data, "invalid-argument", "Invalid input") + + mockFetch.restore(); + }); + + it('includes authentication and app check tokens in request headers', async () => { + const authMock: FirebaseAuthInternal = { + getToken: async () => ({ accessToken: 'auth-token' }) + } as unknown as FirebaseAuthInternal; + const authProvider = new Provider( + 'auth-internal', + new ComponentContainer('test') + ); + authProvider.setComponent( + new Component('auth-internal', () => authMock, ComponentType.PRIVATE) + ); + + const functions = createTestService(app, region, authProvider); + const mockFetch = sinon.stub(functions, 'fetchImpl' as any); + + const mockResponse = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode('data: {"result":"Success"}\n')); + controller.close(); + } + }); + + mockFetch.resolves({ + body: mockResponse, + headers: new Headers({ 'Content-Type': 'text/event-stream' }), + status: 200, + statusText: 'OK', + } as Response); + + const func = httpsCallable, string, string>(functions, 'errTest'); + await func.stream({}); + + expect(mockFetch.calledOnce).to.be.true; + const [_, options] = mockFetch.firstCall.args; + expect(options.headers['Authorization']).to.equal('Bearer auth-token'); + expect(options.headers['Content-Type']).to.equal('application/json'); + expect(options.headers['Accept']).to.equal('text/event-stream'); + + mockFetch.restore(); + }); +}); diff --git a/packages/functions/src/public-types.ts b/packages/functions/src/public-types.ts index f1c5d23ccba..2fd668adab6 100644 --- a/packages/functions/src/public-types.ts +++ b/packages/functions/src/public-types.ts @@ -28,14 +28,24 @@ export interface HttpsCallableResult { readonly data: ResponseData; } +/** + * An `HttpsCallableStreamResult` wraps a single streaming result from a function call. + * @public + */ +export interface HttpsCallableStreamResult { + readonly data: Promise; + readonly stream: AsyncIterable; +} + /** * A reference to a "callable" HTTP trigger in Google Cloud Functions. * @param data - Data to be passed to callable function. * @public */ -export type HttpsCallable = ( - data?: RequestData | null -) => Promise>; +export type HttpsCallable = { + (data?: RequestData | null): Promise>; + stream: (data?: RequestData | null) => Promise>; +}; /** * An interface for metadata about how calls should be executed. diff --git a/packages/functions/src/service.ts b/packages/functions/src/service.ts index 954954bd3fd..4137d08a2d7 100644 --- a/packages/functions/src/service.ts +++ b/packages/functions/src/service.ts @@ -19,6 +19,7 @@ import { FirebaseApp, _FirebaseService } from '@firebase/app'; import { HttpsCallable, HttpsCallableResult, + HttpsCallableStreamResult, HttpsCallableOptions } from './public-types'; import { _errorForResponse, FunctionsError } from './error'; @@ -176,14 +177,20 @@ export function connectFunctionsEmulator( * @param name - The name of the trigger. * @public */ -export function httpsCallable( +export function httpsCallable( functionsInstance: FunctionsService, name: string, options?: HttpsCallableOptions -): HttpsCallable { - return (data => { +): HttpsCallable { + const callable = (data?: RequestData | null) => { return call(functionsInstance, name, data, options || {}); - }) as HttpsCallable; + }; + + callable.stream = (data?: RequestData | null) => { + return stream(functionsInstance, name, data); + }; + + return callable as HttpsCallable; } /** @@ -191,14 +198,19 @@ export function httpsCallable( * @param url - The url of the trigger. * @public */ -export function httpsCallableFromURL( +export function httpsCallableFromURL( functionsInstance: FunctionsService, url: string, options?: HttpsCallableOptions -): HttpsCallable { - return (data => { +): HttpsCallable { + const callable = (data?: RequestData | null) => { return callAtURL(functionsInstance, url, data, options || {}); - }) as HttpsCallable; + }; + + callable.stream = (data?: RequestData | null) => { + return streamAtURL(functionsInstance, url, options || {}); + }; + return callable as HttpsCallable; } /** @@ -248,7 +260,7 @@ async function postJSON( /** * Calls a callable function asynchronously and returns the result. * @param name The name of the callable trigger. - * @param data The data to pass as params to the function.s + * @param data The data to pass as params to the function. */ function call( functionsInstance: FunctionsService, @@ -263,7 +275,7 @@ function call( /** * Calls a callable function asynchronously and returns the result. * @param url The url of the callable trigger. - * @param data The data to pass as params to the function.s + * @param data The data to pass as params to the function. */ async function callAtURL( functionsInstance: FunctionsService, @@ -271,6 +283,7 @@ async function callAtURL( data: unknown, options: HttpsCallableOptions ): Promise { + console.log(url); // Encode any special types, such as dates, in the input data. data = encode(data); const body = { data }; @@ -335,3 +348,126 @@ async function callAtURL( return { data: decodedData }; } + +/** + * Calls a callable function asynchronously and returns a streaming result. + * @param name The name of the callable trigger. + * @param data The data to pass as params to the function. + */ +function stream( + functionsInstance: FunctionsService, + name: string, + data: unknown, +): Promise { + const url = functionsInstance._url(name); + return streamAtURL(functionsInstance, url, data); +} + +/** + * Calls a callable function asynchronously and return a streaming result. + * @param url The url of the callable trigger. + * @param data The data to pass as params to the function. + */ +async function streamAtURL( + functionsInstance: FunctionsService, + url: string, + data: unknown, +): Promise { + // Encode any special types, such as dates, in the input data. + data = encode(data); + const body = { data }; + + // Add a header for the authToken. + const headers: { [key: string]: string } = {}; + const context = await functionsInstance.contextProvider.getContext(); + if (context.authToken) { + headers['Authorization'] = 'Bearer ' + context.authToken; + } + if (context.messagingToken) { + headers['Firebase-Instance-ID-Token'] = context.messagingToken; + } + if (context.appCheckToken !== null) { + headers['X-Firebase-AppCheck'] = context.appCheckToken; + } + + headers['Content-Type'] = 'application/json'; + headers['Accept'] = 'text/event-stream'; + + let response: Response; + try { + response = await functionsInstance.fetchImpl(url, { + method: 'POST', + body: JSON.stringify(body), + headers + }); + } catch (e) { + // This could be an unhandled error on the backend, or it could be a + // network error. There's no way to know, since an unhandled error on the + // backend will fail to set the proper CORS header, and thus will be + // treated as a network error by fetch. + const error = _errorForResponse(0, null) + return { + data: Promise.reject(error), + // Return an empty async iterator + stream: { + [Symbol.asyncIterator]() { + return { + next() { + return Promise.reject(error); + } + }; + } + } + }; + } + + const reader = response.body!.getReader(); + const decoder = new TextDecoder(); + + let buffer = ''; + let resultResolver: (value: unknown) => void; + let resultRejecter: (reason: any) => void; + + const resultPromise = new Promise((resolve, reject) => { + resultResolver = resolve; + resultRejecter = reject; + }); + + const stream = { + [Symbol.asyncIterator]() { + return { + async next() { + while (true) { + const { value, done } = await reader.read(); + if (done) return { done: true, value: undefined }; + + buffer += decoder.decode(value, { stream: true }); + const lines = buffer.split('\n'); + buffer = lines.pop() || ''; + + for (const line of lines) { + if (line.startsWith('data: ')) { + const jsonData = JSON.parse(line.slice(6)); + if ('result' in jsonData) { + resultResolver(decode(jsonData.result)); + return { done: true, value: undefined }; + } else if ('message' in jsonData) { + return { done: false, value: decode(jsonData.message) }; + } else if ('error' in jsonData) { + const error = _errorForResponse(0, jsonData); + resultRejecter(error) + throw error; + } + } + } + } + } + }; + } + }; + + return { + stream, + data: resultPromise, + } +} From d28863c6196c5634bc4961cf96769cb4e2ce85f5 Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Fri, 25 Oct 2024 16:00:22 -0700 Subject: [PATCH 02/30] Support AbortSignal --- packages/functions/src/callable.test.ts | 8 ++-- packages/functions/src/public-types.ts | 15 +++++++ packages/functions/src/service.ts | 59 +++++++++++++++++++++---- 3 files changed, 70 insertions(+), 12 deletions(-) diff --git a/packages/functions/src/callable.test.ts b/packages/functions/src/callable.test.ts index 4e890c0fefc..bff2ba0529e 100644 --- a/packages/functions/src/callable.test.ts +++ b/packages/functions/src/callable.test.ts @@ -321,7 +321,7 @@ describe('Firebase Functions > Stream', () => { it('successfully streams data and resolves final result', async () => { const functions = createTestService(app, region); - const mockFetch = sinon.stub(functions, 'fetchImpl' as any); + const mockFetch = sinon.stub(globalThis, 'fetch' as any); const mockResponse = new ReadableStream({ start(controller) { @@ -355,7 +355,7 @@ describe('Firebase Functions > Stream', () => { it('handles network errors', async () => { const functions = createTestService(app, region); - const mockFetch = sinon.stub(functions, 'fetchImpl' as any); + const mockFetch = sinon.stub(globalThis, 'fetch' as any); mockFetch.rejects(new Error('Network error')); @@ -380,7 +380,7 @@ describe('Firebase Functions > Stream', () => { it('handles server-side errors', async () => { const functions = createTestService(app, region); - const mockFetch = sinon.stub(functions, 'fetchImpl' as any); + const mockFetch = sinon.stub(globalThis, 'fetch' as any); const mockResponse = new ReadableStream({ start(controller) { @@ -429,7 +429,7 @@ describe('Firebase Functions > Stream', () => { ); const functions = createTestService(app, region, authProvider); - const mockFetch = sinon.stub(functions, 'fetchImpl' as any); + const mockFetch = sinon.stub(globalThis, 'fetch' as any); const mockResponse = new ReadableStream({ start(controller) { diff --git a/packages/functions/src/public-types.ts b/packages/functions/src/public-types.ts index f47e0018a6b..76cfb103257 100644 --- a/packages/functions/src/public-types.ts +++ b/packages/functions/src/public-types.ts @@ -64,6 +64,21 @@ export interface HttpsCallableOptions { limitedUseAppCheckTokens?: boolean; } + +/** + * An interface for metadata about how stream call should be executed. + * @public + */ +export interface HttpsCallableStreamOptions { + /** + * An AbortSignal that can be used to cancel the streaming response. When the signal is aborted, + * both the underlying connection and stream will be terminated. + */ + signal?: AbortSignal +} + + + /** * A `Functions` instance. * @public diff --git a/packages/functions/src/service.ts b/packages/functions/src/service.ts index a1a9e714fa3..3afbe7227a6 100644 --- a/packages/functions/src/service.ts +++ b/packages/functions/src/service.ts @@ -20,7 +20,8 @@ import { HttpsCallable, HttpsCallableResult, HttpsCallableStreamResult, - HttpsCallableOptions + HttpsCallableOptions, + HttpsCallableStreamOptions } from './public-types'; import { _errorForResponse, FunctionsError } from './error'; import { ContextProvider } from './context'; @@ -186,8 +187,8 @@ export function httpsCallable( return call(functionsInstance, name, data, options || {}); }; - callable.stream = (data?: RequestData | null) => { - return stream(functionsInstance, name, data); + callable.stream = (data?: RequestData | null, options?: HttpsCallableStreamOptions) => { + return stream(functionsInstance, name, data, options); }; return callable as HttpsCallable; @@ -207,8 +208,8 @@ export function httpsCallableFromURL { - return streamAtURL(functionsInstance, url, options || {}); + callable.stream = (data?: RequestData | null, options?: HttpsCallableStreamOptions) => { + return streamAtURL(functionsInstance, url, options); }; return callable as HttpsCallable; } @@ -354,25 +355,29 @@ async function callAtURL( * Calls a callable function asynchronously and returns a streaming result. * @param name The name of the callable trigger. * @param data The data to pass as params to the function. + * @param options Streaming request options. */ function stream( functionsInstance: FunctionsService, name: string, data: unknown, + options?: HttpsCallableStreamOptions ): Promise { const url = functionsInstance._url(name); - return streamAtURL(functionsInstance, url, data); + return streamAtURL(functionsInstance, url, options); } /** * Calls a callable function asynchronously and return a streaming result. * @param url The url of the callable trigger. * @param data The data to pass as params to the function. + * @param options Streaming request options. */ async function streamAtURL( functionsInstance: FunctionsService, url: string, data: unknown, + options?: HttpsCallableStreamOptions ): Promise { // Encode any special types, such as dates, in the input data. data = encode(data); @@ -396,12 +401,31 @@ async function streamAtURL( let response: Response; try { - response = await functionsInstance.fetchImpl(url, { + response = await fetch(url, { method: 'POST', body: JSON.stringify(body), - headers + headers, + signal: options?.signal }); } catch (e) { + if (e instanceof Error && e.name === 'AbortError') { + const error = new FunctionsError( + 'cancelled', + 'Request was cancelled.' + ); + return { + data: Promise.reject(error), + stream: { + [Symbol.asyncIterator]() { + return { + next() { + return Promise.reject(error); + } + }; + } + } + }; + } // This could be an unhandled error on the backend, or it could be a // network error. There's no way to know, since an unhandled error on the // backend will fail to set the proper CORS header, and thus will be @@ -434,10 +458,29 @@ async function streamAtURL( resultRejecter = reject; }); + // Set up abort handler for the stream + options?.signal?.addEventListener('abort', () => { + reader.cancel(); + const error = new FunctionsError( + 'cancelled', + 'Request was cancelled.' + ); + resultRejecter(error); + }); + const stream = { [Symbol.asyncIterator]() { return { async next() { + if (options?.signal?.aborted) { + const error = new FunctionsError( + 'cancelled', + 'Request was cancelled.' + ); + resultRejecter(error) + throw error; + } + while (true) { const { value, done } = await reader.read(); if (done) return { done: true, value: undefined }; From a02b25f456ec07d70c9eb79451c1986b134bdf78 Mon Sep 17 00:00:00 2001 From: taeold Date: Fri, 25 Oct 2024 23:12:33 +0000 Subject: [PATCH 03/30] Update API reports --- common/api-review/functions.api.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/common/api-review/functions.api.md b/common/api-review/functions.api.md index 55fef4c51fe..e0696d67385 100644 --- a/common/api-review/functions.api.md +++ b/common/api-review/functions.api.md @@ -57,6 +57,11 @@ export interface HttpsCallableResult { readonly data: ResponseData; } +// @public +export interface HttpsCallableStreamOptions { + signal?: AbortSignal; +} + // @public export interface HttpsCallableStreamResult { // (undocumented) From ab99970dbf81f99b08cf46d2089e95e121b20070 Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Sat, 26 Oct 2024 20:48:10 -0700 Subject: [PATCH 04/30] Re-introduce fetchImpl. --- packages/functions/src/service.ts | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/packages/functions/src/service.ts b/packages/functions/src/service.ts index 3afbe7227a6..fae2e16df1b 100644 --- a/packages/functions/src/service.ts +++ b/packages/functions/src/service.ts @@ -106,7 +106,8 @@ export class FunctionsService implements _FirebaseService { authProvider: Provider, messagingProvider: Provider, appCheckProvider: Provider, - regionOrCustomDomain: string = DEFAULT_REGION + regionOrCustomDomain: string = DEFAULT_REGION, + readonly fetchImpl: typeof fetch = fetch, ) { this.contextProvider = new ContextProvider( authProvider, @@ -224,13 +225,14 @@ export function httpsCallableFromURL { headers['Content-Type'] = 'application/json'; let response: Response; try { - response = await fetch(url, { + response = await fetchImpl(url, { method: 'POST', body: JSON.stringify(body), headers @@ -308,7 +310,7 @@ async function callAtURL( const failAfterHandle = failAfter(timeout); const response = await Promise.race([ - postJSON(url, body, headers), + postJSON(url, body, headers, functionsInstance.fetchImpl), failAfterHandle.promise, functionsInstance.cancelAllRequests ]); @@ -364,7 +366,7 @@ function stream( options?: HttpsCallableStreamOptions ): Promise { const url = functionsInstance._url(name); - return streamAtURL(functionsInstance, url, options); + return streamAtURL(functionsInstance, url, data, options); } /** @@ -401,7 +403,7 @@ async function streamAtURL( let response: Response; try { - response = await fetch(url, { + response = await functionsInstance.fetchImpl(url, { method: 'POST', body: JSON.stringify(body), headers, @@ -458,7 +460,6 @@ async function streamAtURL( resultRejecter = reject; }); - // Set up abort handler for the stream options?.signal?.addEventListener('abort', () => { reader.cancel(); const error = new FunctionsError( From 4aee03eca1e125635d5b2ebc899f106b47bbaffa Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Sat, 26 Oct 2024 22:29:22 -0700 Subject: [PATCH 05/30] Add more tests. --- packages/functions/src/callable.test.ts | 190 +++++++++++++++++++++--- packages/functions/src/public-types.ts | 2 +- packages/functions/src/service.ts | 1 - 3 files changed, 167 insertions(+), 26 deletions(-) diff --git a/packages/functions/src/callable.test.ts b/packages/functions/src/callable.test.ts index bff2ba0529e..4403799a3cb 100644 --- a/packages/functions/src/callable.test.ts +++ b/packages/functions/src/callable.test.ts @@ -37,7 +37,7 @@ import { AppCheckInternalComponentName } from '@firebase/app-check-interop-types'; import { makeFakeApp, createTestService } from '../test/utils'; -import { httpsCallable } from './service'; +import { FunctionsService, httpsCallable } from './service'; import { FUNCTIONS_TYPE } from './constants'; import { FunctionsError } from './error'; @@ -308,21 +308,26 @@ describe('Firebase Functions > Call', () => { describe('Firebase Functions > Stream', () => { let app: FirebaseApp; + let functions: FunctionsService; + let mockFetch: sinon.SinonStub; const region = 'us-central1'; - before(() => { + beforeEach(() => { const useEmulator = !!process.env.FIREBASE_FUNCTIONS_EMULATOR_ORIGIN; const projectId = useEmulator ? 'functions-integration-test' : TEST_PROJECT.projectId; const messagingSenderId = 'messaging-sender-id'; app = makeFakeApp({ projectId, messagingSenderId }); + functions = createTestService(app, region); + mockFetch = sinon.stub(functions, 'fetchImpl' as any); }); - it('successfully streams data and resolves final result', async () => { - const functions = createTestService(app, region); - const mockFetch = sinon.stub(globalThis, 'fetch' as any); + afterEach(() => { + mockFetch.restore(); + }) + it('successfully streams data and resolves final result', async () => { const mockResponse = new ReadableStream({ start(controller) { controller.enqueue(new TextEncoder().encode('data: {"message":"Hello"}\n')); @@ -349,14 +354,9 @@ describe('Firebase Functions > Stream', () => { expect(messages).to.deep.equal(['Hello', 'World']); expect(await streamResult.data).to.equal('Final Result'); - - mockFetch.restore(); }); it('handles network errors', async () => { - const functions = createTestService(app, region); - const mockFetch = sinon.stub(globalThis, 'fetch' as any); - mockFetch.rejects(new Error('Network error')); const func = httpsCallable, string, string>(functions, 'errTest'); @@ -371,17 +371,11 @@ describe('Firebase Functions > Stream', () => { errorThrown = true; expect((error as FunctionsError).code).to.equal(`${FUNCTIONS_TYPE}/internal`); } - expect(errorThrown).to.be.true; - expect(streamResult.data).to.be.a('promise'); - - mockFetch.restore(); + expectError(streamResult.data, "internal", "Internal"); }); it('handles server-side errors', async () => { - const functions = createTestService(app, region); - const mockFetch = sinon.stub(globalThis, 'fetch' as any); - const mockResponse = new ReadableStream({ start(controller) { controller.enqueue(new TextEncoder().encode('data: {"error":{"status":"INVALID_ARGUMENT","message":"Invalid input"}}\n')); @@ -396,7 +390,7 @@ describe('Firebase Functions > Stream', () => { statusText: 'OK', } as Response); - const func = httpsCallable, string, string>(functions, 'errTest'); + const func = httpsCallable, string, string>(functions, 'stream'); const streamResult = await func.stream({}); let errorThrown = false; @@ -412,8 +406,6 @@ describe('Firebase Functions > Stream', () => { expect(errorThrown).to.be.true; expectError(streamResult.data, "invalid-argument", "Invalid input") - - mockFetch.restore(); }); it('includes authentication and app check tokens in request headers', async () => { @@ -427,9 +419,23 @@ describe('Firebase Functions > Stream', () => { authProvider.setComponent( new Component('auth-internal', () => authMock, ComponentType.PRIVATE) ); + const appCheckMock: FirebaseAppCheckInternal = { + getToken: async () => ({ token: 'app-check-token' }) + } as unknown as FirebaseAppCheckInternal; + const appCheckProvider = new Provider( + 'app-check-internal', + new ComponentContainer('test') + ); + appCheckProvider.setComponent( + new Component( + 'app-check-internal', + () => appCheckMock, + ComponentType.PRIVATE + ) + ); - const functions = createTestService(app, region, authProvider); - const mockFetch = sinon.stub(globalThis, 'fetch' as any); + const functions = createTestService(app, region, authProvider, undefined, appCheckProvider); + const mockFetch = sinon.stub(functions, 'fetchImpl' as any); const mockResponse = new ReadableStream({ start(controller) { @@ -445,7 +451,7 @@ describe('Firebase Functions > Stream', () => { statusText: 'OK', } as Response); - const func = httpsCallable, string, string>(functions, 'errTest'); + const func = httpsCallable, string, string>(functions, 'stream'); await func.stream({}); expect(mockFetch.calledOnce).to.be.true; @@ -453,7 +459,143 @@ describe('Firebase Functions > Stream', () => { expect(options.headers['Authorization']).to.equal('Bearer auth-token'); expect(options.headers['Content-Type']).to.equal('application/json'); expect(options.headers['Accept']).to.equal('text/event-stream'); + }); - mockFetch.restore(); + it('aborts during initial fetch', async () => { + const controller = new AbortController(); + + // Create a fetch that rejects when aborted + const fetchPromise = new Promise((_, reject) => { + controller.signal.addEventListener('abort', () => { + const error = new Error('The operation was aborted'); + error.name = 'AbortError'; + reject(error); + }); + }); + mockFetch.returns(fetchPromise); + + const func = httpsCallable, string, string>(functions, 'streamTest'); + const streamPromise = func.stream({}, { signal: controller.signal }); + + controller.abort(); + + const streamResult = await streamPromise; + + // Verify fetch was called with abort signal + expect(mockFetch.calledOnce).to.be.true; + const [_, options] = mockFetch.firstCall.args; + expect(options.signal).to.equal(controller.signal); + + // Verify stream iteration throws AbortError + let errorThrown = false; + try { + for await (const _ of streamResult.stream) { + // Should not execute + } + } catch (error) { + errorThrown = true; + expect((error as FunctionsError).code).to.equal(`${FUNCTIONS_TYPE}/cancelled`); + } + expect(errorThrown).to.be.true; + expectError(streamResult.data, "cancelled", "Request was cancelled") + }); + + it('aborts during streaming', async () => { + const controller = new AbortController(); + + const mockResponse = new ReadableStream({ + async start(controller) { + controller.enqueue(new TextEncoder().encode('data: {"message":"First"}\n')); + // Add delay to simulate network latency + await new Promise(resolve => setTimeout(resolve, 50)); + controller.enqueue(new TextEncoder().encode('data: {"message":"Second"}\n')); + await new Promise(resolve => setTimeout(resolve, 50)); + controller.enqueue(new TextEncoder().encode('data: {"result":"Final"}\n')); + controller.close(); + } + }); + + mockFetch.resolves({ + body: mockResponse, + headers: new Headers({ 'Content-Type': 'text/event-stream' }), + status: 200, + statusText: 'OK', + } as Response); + + const func = httpsCallable, string, string>(functions, 'streamTest'); + const streamResult = await func.stream({}, { signal: controller.signal }); + + const messages: string[] = []; + try { + for await (const message of streamResult.stream) { + messages.push(message); + if (messages.length === 1) { + // Abort after receiving first message + controller.abort(); + } + } + throw new Error('Stream should have been aborted'); + } catch (error) { + expect((error as FunctionsError).code).to.equal(`${FUNCTIONS_TYPE}/cancelled`); + } + expect(messages).to.deep.equal(['First']); + expectError(streamResult.data, "cancelled", "Request was cancelled") + }); + + it('fails immediately with pre-aborted signal', async () => { + mockFetch.callsFake((url: string, options: RequestInit) => { + if (options.signal?.aborted) { + const error = new Error('The operation was aborted'); + error.name = 'AbortError'; + return Promise.reject(error); + } + return Promise.resolve(new Response()); + }); + const func = httpsCallable, string, string>(functions, 'streamTest'); + const streamResult = await func.stream({}, { signal: AbortSignal.abort() }); + + let errorThrown = false; + try { + for await (const _ of streamResult.stream) { + // Should not execute + } + } catch (error) { + errorThrown = true; + expect((error as FunctionsError).code).to.equal(`${FUNCTIONS_TYPE}/cancelled`); + } + expect(errorThrown).to.be.true; + expectError(streamResult.data, "cancelled", "Request was cancelled") + }); + + it('properly handles AbortSignal.timeout()', async () => { + const timeoutMs = 50; + const signal = AbortSignal.timeout(timeoutMs); + + mockFetch.callsFake(async (url: string, options: RequestInit) => { + await new Promise((resolve, reject) => { + options.signal?.addEventListener('abort', () => { + const error = new Error('The operation was aborted'); + error.name = 'AbortError'; + reject(error); + }); + setTimeout(resolve, timeoutMs * 3); + }); + + // If we get here, timeout didn't occur + return new Response(); + }); + + const func = httpsCallable, string, string>(functions, 'streamTest'); + const streamResult = await func.stream({}, { signal }); + + try { + for await (const message of streamResult.stream) { + // Should not execute + } + throw new Error('Stream should have timed out'); + } catch (error) { + expect((error as FunctionsError).code).to.equal(`${FUNCTIONS_TYPE}/cancelled`); + } + expectError(streamResult.data, "cancelled", "Request was cancelled") }); }); diff --git a/packages/functions/src/public-types.ts b/packages/functions/src/public-types.ts index 76cfb103257..381570eb4d9 100644 --- a/packages/functions/src/public-types.ts +++ b/packages/functions/src/public-types.ts @@ -43,7 +43,7 @@ export interface HttpsCallableStreamResult = { (data?: RequestData | null): Promise>; - stream: (data?: RequestData | null) => Promise>; + stream: (data?: RequestData | null, options?: HttpsCallableStreamOptions) => Promise>; }; /** diff --git a/packages/functions/src/service.ts b/packages/functions/src/service.ts index fae2e16df1b..23f2d292bda 100644 --- a/packages/functions/src/service.ts +++ b/packages/functions/src/service.ts @@ -285,7 +285,6 @@ async function callAtURL( data: unknown, options: HttpsCallableOptions ): Promise { - console.log(url); // Encode any special types, such as dates, in the input data. data = encode(data); const body = { data }; From fdc70d8bdb60f901fbd1e81127c3c5b9ee3c6e3a Mon Sep 17 00:00:00 2001 From: taeold Date: Sun, 27 Oct 2024 05:41:30 +0000 Subject: [PATCH 06/30] Update API reports --- common/api-review/functions.api.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/api-review/functions.api.md b/common/api-review/functions.api.md index e0696d67385..d56f522c1c7 100644 --- a/common/api-review/functions.api.md +++ b/common/api-review/functions.api.md @@ -37,7 +37,7 @@ export function getFunctions(app?: FirebaseApp, regionOrCustomDomain?: string): // @public export type HttpsCallable = { (data?: RequestData | null): Promise>; - stream: (data?: RequestData | null) => Promise>; + stream: (data?: RequestData | null, options?: HttpsCallableStreamOptions) => Promise>; }; // @public From 55d9266653291d34740c3e8db214183bba9d9d4f Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Sun, 27 Oct 2024 11:12:09 -0700 Subject: [PATCH 07/30] Add comment. --- packages/functions/src/service.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/functions/src/service.ts b/packages/functions/src/service.ts index 23f2d292bda..ab0a5934d2b 100644 --- a/packages/functions/src/service.ts +++ b/packages/functions/src/service.ts @@ -503,6 +503,7 @@ async function streamAtURL( throw error; } } + // ignore all other lines (newline, comments, etc.) } } } From 785e9061a55fda9636446a147bcdc96379bb9d74 Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Wed, 6 Nov 2024 15:34:50 -0800 Subject: [PATCH 08/30] Allow reading multiple data chunks from single read() chunk. --- packages/functions/src/service.ts | 60 ++++++++++++++++++++++--------- 1 file changed, 43 insertions(+), 17 deletions(-) diff --git a/packages/functions/src/service.ts b/packages/functions/src/service.ts index ab0a5934d2b..569a54ca3b8 100644 --- a/packages/functions/src/service.ts +++ b/packages/functions/src/service.ts @@ -107,7 +107,7 @@ export class FunctionsService implements _FirebaseService { messagingProvider: Provider, appCheckProvider: Provider, regionOrCustomDomain: string = DEFAULT_REGION, - readonly fetchImpl: typeof fetch = fetch, + readonly fetchImpl: typeof fetch = (...args) => fetch(...args), ) { this.contextProvider = new ContextProvider( authProvider, @@ -450,6 +450,7 @@ async function streamAtURL( const reader = response.body!.getReader(); const decoder = new TextDecoder(); + let pendingLines: string[] = []; let buffer = ''; let resultResolver: (value: unknown) => void; let resultRejecter: (reason: any) => void; @@ -470,6 +471,30 @@ async function streamAtURL( const stream = { [Symbol.asyncIterator]() { + + const processLine = (line: string | undefined) => { + // ignore all other lines (newline, comments, etc.) + if (!line?.startsWith('data: ')) return null; + + try { + const jsonData = JSON.parse(line.slice(6)); + if ('result' in jsonData) { + resultResolver(decode(jsonData.result)); + return { done: true, value: undefined }; + } + if ('message' in jsonData) { + return { done: false, value: decode(jsonData.message) }; + } + if ('error' in jsonData) { + const error = _errorForResponse(0, jsonData); + resultRejecter(error); + throw error; + } + return null; // Unrecognize keys. Skip this line. + } catch (error) { + // Not json. Skip this line. + } + }; return { async next() { if (options?.signal?.aborted) { @@ -481,29 +506,30 @@ async function streamAtURL( throw error; } + while (pendingLines.length > 0) { + const result = processLine(pendingLines.shift()); + if (result) return result; + } + while (true) { const { value, done } = await reader.read(); - if (done) return { done: true, value: undefined }; + + if (done) { + if (buffer.trim()) { + const result = processLine(buffer); + if (result) return result; + } + return { done: true, value: undefined }; + } buffer += decoder.decode(value, { stream: true }); const lines = buffer.split('\n'); buffer = lines.pop() || ''; + pendingLines.push(...lines.filter(line => line.trim())); - for (const line of lines) { - if (line.startsWith('data: ')) { - const jsonData = JSON.parse(line.slice(6)); - if ('result' in jsonData) { - resultResolver(decode(jsonData.result)); - return { done: true, value: undefined }; - } else if ('message' in jsonData) { - return { done: false, value: decode(jsonData.message) }; - } else if ('error' in jsonData) { - const error = _errorForResponse(0, jsonData); - resultRejecter(error) - throw error; - } - } - // ignore all other lines (newline, comments, etc.) + if (pendingLines.length > 0) { + const result = processLine(pendingLines.shift()); + if (result) return result; } } } From 861edce78b0cd501c74bce8dde414c17aa8d31e6 Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Wed, 6 Nov 2024 15:51:06 -0800 Subject: [PATCH 09/30] Fix lint errors. --- packages/functions/src/callable.test.ts | 47 +++++++++++++++++++------ packages/functions/src/public-types.ts | 4 +-- packages/functions/src/service.ts | 28 +++++++-------- 3 files changed, 53 insertions(+), 26 deletions(-) diff --git a/packages/functions/src/callable.test.ts b/packages/functions/src/callable.test.ts index 4403799a3cb..6947d3375d0 100644 --- a/packages/functions/src/callable.test.ts +++ b/packages/functions/src/callable.test.ts @@ -107,9 +107,8 @@ describe('Firebase Functions > Call', () => { long: 420 }); } catch (err) { - console.error(err) + console.error(err); } - }); it('scalars', async () => { @@ -325,7 +324,7 @@ describe('Firebase Functions > Stream', () => { afterEach(() => { mockFetch.restore(); - }) + }); it('successfully streams data and resolves final result', async () => { const mockResponse = new ReadableStream({ @@ -356,6 +355,34 @@ describe('Firebase Functions > Stream', () => { expect(await streamResult.data).to.equal('Final Result'); }); + it('successfully process request chunk with multiple events', async () => { + const mockResponse = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode('data: {"message":"Hello"}\n\ndata: {"message":"World"}\n')); + controller.enqueue(new TextEncoder().encode('data: {"result":"Final Result"}\n')); + controller.close(); + } + }); + + mockFetch.resolves({ + body: mockResponse, + headers: new Headers({ 'Content-Type': 'text/event-stream' }), + status: 200, + statusText: 'OK', + } as Response); + + const func = httpsCallable, string, string>(functions, 'streamTest'); + const streamResult = await func.stream({}); + + const messages: string[] = []; + for await (const message of streamResult.stream) { + messages.push(message); + } + + expect(messages).to.deep.equal(['Hello', 'World']); + expect(await streamResult.data).to.equal('Final Result'); + }); + it('handles network errors', async () => { mockFetch.rejects(new Error('Network error')); @@ -372,7 +399,7 @@ describe('Firebase Functions > Stream', () => { expect((error as FunctionsError).code).to.equal(`${FUNCTIONS_TYPE}/internal`); } expect(errorThrown).to.be.true; - expectError(streamResult.data, "internal", "Internal"); + await expectError(streamResult.data, "internal", "Internal"); }); it('handles server-side errors', async () => { @@ -405,7 +432,7 @@ describe('Firebase Functions > Stream', () => { } expect(errorThrown).to.be.true; - expectError(streamResult.data, "invalid-argument", "Invalid input") + await expectError(streamResult.data, "invalid-argument", "Invalid input"); }); it('includes authentication and app check tokens in request headers', async () => { @@ -497,7 +524,7 @@ describe('Firebase Functions > Stream', () => { expect((error as FunctionsError).code).to.equal(`${FUNCTIONS_TYPE}/cancelled`); } expect(errorThrown).to.be.true; - expectError(streamResult.data, "cancelled", "Request was cancelled") + await expectError(streamResult.data, "cancelled", "Request was cancelled"); }); it('aborts during streaming', async () => { @@ -539,7 +566,7 @@ describe('Firebase Functions > Stream', () => { expect((error as FunctionsError).code).to.equal(`${FUNCTIONS_TYPE}/cancelled`); } expect(messages).to.deep.equal(['First']); - expectError(streamResult.data, "cancelled", "Request was cancelled") + await expectError(streamResult.data, "cancelled", "Request was cancelled"); }); it('fails immediately with pre-aborted signal', async () => { @@ -564,7 +591,7 @@ describe('Firebase Functions > Stream', () => { expect((error as FunctionsError).code).to.equal(`${FUNCTIONS_TYPE}/cancelled`); } expect(errorThrown).to.be.true; - expectError(streamResult.data, "cancelled", "Request was cancelled") + await expectError(streamResult.data, "cancelled", "Request was cancelled"); }); it('properly handles AbortSignal.timeout()', async () => { @@ -589,13 +616,13 @@ describe('Firebase Functions > Stream', () => { const streamResult = await func.stream({}, { signal }); try { - for await (const message of streamResult.stream) { + for await (const _ of streamResult.stream) { // Should not execute } throw new Error('Stream should have timed out'); } catch (error) { expect((error as FunctionsError).code).to.equal(`${FUNCTIONS_TYPE}/cancelled`); } - expectError(streamResult.data, "cancelled", "Request was cancelled") + await expectError(streamResult.data, "cancelled", "Request was cancelled"); }); }); diff --git a/packages/functions/src/public-types.ts b/packages/functions/src/public-types.ts index 381570eb4d9..c9da0836e5c 100644 --- a/packages/functions/src/public-types.ts +++ b/packages/functions/src/public-types.ts @@ -41,10 +41,10 @@ export interface HttpsCallableStreamResult = { +export interface HttpsCallable { (data?: RequestData | null): Promise>; stream: (data?: RequestData | null, options?: HttpsCallableStreamOptions) => Promise>; -}; +} /** * An interface for metadata about how calls should be executed. diff --git a/packages/functions/src/service.ts b/packages/functions/src/service.ts index 569a54ca3b8..ac024e6df1d 100644 --- a/packages/functions/src/service.ts +++ b/packages/functions/src/service.ts @@ -184,7 +184,7 @@ export function httpsCallable( name: string, options?: HttpsCallableOptions ): HttpsCallable { - const callable = (data?: RequestData | null) => { + const callable = (data?: RequestData | null): Promise => { return call(functionsInstance, name, data, options || {}); }; @@ -205,7 +205,7 @@ export function httpsCallableFromURL { - const callable = (data?: RequestData | null) => { + const callable = (data?: RequestData | null): Promise => { return callAtURL(functionsInstance, url, data, options || {}); }; @@ -431,7 +431,7 @@ async function streamAtURL( // network error. There's no way to know, since an unhandled error on the // backend will fail to set the proper CORS header, and thus will be // treated as a network error by fetch. - const error = _errorForResponse(0, null) + const error = _errorForResponse(0, null); return { data: Promise.reject(error), // Return an empty async iterator @@ -450,10 +450,10 @@ async function streamAtURL( const reader = response.body!.getReader(); const decoder = new TextDecoder(); - let pendingLines: string[] = []; + const pendingLines: string[] = []; let buffer = ''; let resultResolver: (value: unknown) => void; - let resultRejecter: (reason: any) => void; + let resultRejecter: (reason: unknown) => void; const resultPromise = new Promise((resolve, reject) => { resultResolver = resolve; @@ -461,7 +461,7 @@ async function streamAtURL( }); options?.signal?.addEventListener('abort', () => { - reader.cancel(); + void reader.cancel(); const error = new FunctionsError( 'cancelled', 'Request was cancelled.' @@ -472,9 +472,9 @@ async function streamAtURL( const stream = { [Symbol.asyncIterator]() { - const processLine = (line: string | undefined) => { + const processLine = (line: string | undefined): { done: boolean, value: unknown } | null => { // ignore all other lines (newline, comments, etc.) - if (!line?.startsWith('data: ')) return null; + if (!line?.startsWith('data: ')) { return null; } try { const jsonData = JSON.parse(line.slice(6)); @@ -492,7 +492,7 @@ async function streamAtURL( } return null; // Unrecognize keys. Skip this line. } catch (error) { - // Not json. Skip this line. + return null; } }; return { @@ -502,13 +502,13 @@ async function streamAtURL( 'cancelled', 'Request was cancelled.' ); - resultRejecter(error) + resultRejecter(error); throw error; } while (pendingLines.length > 0) { const result = processLine(pendingLines.shift()); - if (result) return result; + if (result) { return result; } } while (true) { @@ -517,7 +517,7 @@ async function streamAtURL( if (done) { if (buffer.trim()) { const result = processLine(buffer); - if (result) return result; + if (result) { return result; } } return { done: true, value: undefined }; } @@ -529,7 +529,7 @@ async function streamAtURL( if (pendingLines.length > 0) { const result = processLine(pendingLines.shift()); - if (result) return result; + if (result) { return result; } } } } @@ -540,5 +540,5 @@ async function streamAtURL( return { stream, data: resultPromise, - } + }; } From 4b188d00d0e327a87f0d5eb669f5451332d26b91 Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Wed, 6 Nov 2024 15:59:45 -0800 Subject: [PATCH 10/30] Don't swallow errors. --- packages/functions/src/service.ts | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/packages/functions/src/service.ts b/packages/functions/src/service.ts index ac024e6df1d..ba997c15246 100644 --- a/packages/functions/src/service.ts +++ b/packages/functions/src/service.ts @@ -474,7 +474,9 @@ async function streamAtURL( const processLine = (line: string | undefined): { done: boolean, value: unknown } | null => { // ignore all other lines (newline, comments, etc.) - if (!line?.startsWith('data: ')) { return null; } + if (!line?.startsWith('data: ')) { + return null; + } try { const jsonData = JSON.parse(line.slice(6)); @@ -492,6 +494,10 @@ async function streamAtURL( } return null; // Unrecognize keys. Skip this line. } catch (error) { + if (error instanceof FunctionsError) { + throw error; + } + // ignore other parsing error return null; } }; From 39c44d5a155f227049490ea59b331fc27d7bfebc Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Wed, 6 Nov 2024 15:59:54 -0800 Subject: [PATCH 11/30] Fix linter issues. --- packages/functions/src/callable.test.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/functions/src/callable.test.ts b/packages/functions/src/callable.test.ts index 6947d3375d0..698cb25ef3d 100644 --- a/packages/functions/src/callable.test.ts +++ b/packages/functions/src/callable.test.ts @@ -399,7 +399,7 @@ describe('Firebase Functions > Stream', () => { expect((error as FunctionsError).code).to.equal(`${FUNCTIONS_TYPE}/internal`); } expect(errorThrown).to.be.true; - await expectError(streamResult.data, "internal", "Internal"); + await expectError(streamResult.data, "internal", "internal"); }); it('handles server-side errors', async () => { @@ -524,7 +524,7 @@ describe('Firebase Functions > Stream', () => { expect((error as FunctionsError).code).to.equal(`${FUNCTIONS_TYPE}/cancelled`); } expect(errorThrown).to.be.true; - await expectError(streamResult.data, "cancelled", "Request was cancelled"); + await expectError(streamResult.data, "cancelled", "Request was cancelled."); }); it('aborts during streaming', async () => { @@ -566,7 +566,7 @@ describe('Firebase Functions > Stream', () => { expect((error as FunctionsError).code).to.equal(`${FUNCTIONS_TYPE}/cancelled`); } expect(messages).to.deep.equal(['First']); - await expectError(streamResult.data, "cancelled", "Request was cancelled"); + await expectError(streamResult.data, "cancelled", "Request was cancelled."); }); it('fails immediately with pre-aborted signal', async () => { @@ -591,7 +591,7 @@ describe('Firebase Functions > Stream', () => { expect((error as FunctionsError).code).to.equal(`${FUNCTIONS_TYPE}/cancelled`); } expect(errorThrown).to.be.true; - await expectError(streamResult.data, "cancelled", "Request was cancelled"); + await expectError(streamResult.data, "cancelled", "Request was cancelled."); }); it('properly handles AbortSignal.timeout()', async () => { @@ -623,6 +623,6 @@ describe('Firebase Functions > Stream', () => { } catch (error) { expect((error as FunctionsError).code).to.equal(`${FUNCTIONS_TYPE}/cancelled`); } - await expectError(streamResult.data, "cancelled", "Request was cancelled"); + await expectError(streamResult.data, "cancelled", "Request was cancelled."); }); }); From 1c1f533882be5d53ba7f5ea42da65830436828b6 Mon Sep 17 00:00:00 2001 From: taeold Date: Thu, 7 Nov 2024 00:11:14 +0000 Subject: [PATCH 12/30] Update API reports --- common/api-review/functions.api.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/common/api-review/functions.api.md b/common/api-review/functions.api.md index d56f522c1c7..da89bb94e17 100644 --- a/common/api-review/functions.api.md +++ b/common/api-review/functions.api.md @@ -35,10 +35,12 @@ export type FunctionsErrorCodeCore = 'ok' | 'cancelled' | 'unknown' | 'invalid-a export function getFunctions(app?: FirebaseApp, regionOrCustomDomain?: string): Functions; // @public -export type HttpsCallable = { +export interface HttpsCallable { + // (undocumented) (data?: RequestData | null): Promise>; + // (undocumented) stream: (data?: RequestData | null, options?: HttpsCallableStreamOptions) => Promise>; -}; +} // @public export function httpsCallable(functionsInstance: Functions, name: string, options?: HttpsCallableOptions): HttpsCallable; From ebd74cb26d773c5bafd18447a3105d0626f3199e Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Sat, 9 Nov 2024 12:02:27 -0800 Subject: [PATCH 13/30] Add changeset. --- .changeset/bright-scissors-care.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changeset/bright-scissors-care.md diff --git a/.changeset/bright-scissors-care.md b/.changeset/bright-scissors-care.md new file mode 100644 index 00000000000..d7f94f13d19 --- /dev/null +++ b/.changeset/bright-scissors-care.md @@ -0,0 +1,6 @@ +--- +'@firebase/functions-types': minor +'@firebase/functions': minor +--- + +Add `.stream()` api for callable functions for consuming streaming responses. From cf56623e8ea51b2886a492d9518266fbaf44da55 Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Thu, 21 Nov 2024 16:25:41 -0800 Subject: [PATCH 14/30] Update code to use pump() patter. --- packages/functions/src/service.ts | 156 +++++++++++++++++------------- 1 file changed, 89 insertions(+), 67 deletions(-) diff --git a/packages/functions/src/service.ts b/packages/functions/src/service.ts index ba997c15246..e69fdb87c8b 100644 --- a/packages/functions/src/service.ts +++ b/packages/functions/src/service.ts @@ -447,11 +447,6 @@ async function streamAtURL( }; } - const reader = response.body!.getReader(); - const decoder = new TextDecoder(); - - const pendingLines: string[] = []; - let buffer = ''; let resultResolver: (value: unknown) => void; let resultRejecter: (reason: unknown) => void; @@ -461,7 +456,6 @@ async function streamAtURL( }); options?.signal?.addEventListener('abort', () => { - void reader.cancel(); const error = new FunctionsError( 'cancelled', 'Request was cancelled.' @@ -469,82 +463,110 @@ async function streamAtURL( resultRejecter(error); }); - const stream = { - [Symbol.asyncIterator]() { + const processLine = (line: string, controller: ReadableStreamDefaultController): void => { + // ignore all other lines (newline, comments, etc.) + if (!line.startsWith('data: ')) { + return; + } + try { + // Skip 'data: ' (5 chars) + const jsonData = JSON.parse(line.slice(6)); + if ('result' in jsonData) { + resultResolver(decode(jsonData.result)); + return; + } + if ('message' in jsonData) { + controller.enqueue(decode(jsonData.message)); + return; + } + if ('error' in jsonData) { + const error = _errorForResponse(0, jsonData); + controller.error(error); + resultRejecter(error); + return; + } + } catch (error) { + if (error instanceof FunctionsError) { + controller.error(error); + resultRejecter(error); + return; + } + // ignore other parsing errors + } + }; - const processLine = (line: string | undefined): { done: boolean, value: unknown } | null => { - // ignore all other lines (newline, comments, etc.) - if (!line?.startsWith('data: ')) { - return null; + const reader = response.body!.getReader(); + const decoder = new TextDecoder(); + const rstream = new ReadableStream({ + start(controller) { + let currentText = ''; + return pump(); + async function pump(): Promise { + if (options?.signal?.aborted) { + const error = new FunctionsError('cancelled', 'Request was cancelled'); + controller.error(error); + resultRejecter(error); + return Promise.resolve(); } try { - const jsonData = JSON.parse(line.slice(6)); - if ('result' in jsonData) { - resultResolver(decode(jsonData.result)); - return { done: true, value: undefined }; - } - if ('message' in jsonData) { - return { done: false, value: decode(jsonData.message) }; - } - if ('error' in jsonData) { - const error = _errorForResponse(0, jsonData); - resultRejecter(error); - throw error; - } - return null; // Unrecognize keys. Skip this line. - } catch (error) { - if (error instanceof FunctionsError) { - throw error; + const { value, done } = await reader.read(); + if (done) { + if (currentText.trim()) { + processLine(currentText.trim(), controller); + } + controller.close(); + return; } - // ignore other parsing error - return null; - } - }; - return { - async next() { + if (options?.signal?.aborted) { - const error = new FunctionsError( - 'cancelled', - 'Request was cancelled.' - ); + const error = new FunctionsError('cancelled', 'Request was cancelled'); + controller.error(error); resultRejecter(error); - throw error; - } - - while (pendingLines.length > 0) { - const result = processLine(pendingLines.shift()); - if (result) { return result; } + await reader.cancel(); + return; } - while (true) { - const { value, done } = await reader.read(); - - if (done) { - if (buffer.trim()) { - const result = processLine(buffer); - if (result) { return result; } - } - return { done: true, value: undefined }; - } - - buffer += decoder.decode(value, { stream: true }); - const lines = buffer.split('\n'); - buffer = lines.pop() || ''; - pendingLines.push(...lines.filter(line => line.trim())); + currentText += decoder.decode(value, { stream: true }); + const lines = currentText.split("\n"); + currentText = lines.pop() || ''; - if (pendingLines.length > 0) { - const result = processLine(pendingLines.shift()); - if (result) { return result; } + for (const line of lines) { + if (line.trim()) { + processLine(line.trim(), controller); } } - } - }; + return pump(); + } catch (error) { + const functionsError = error instanceof FunctionsError + ? error + : _errorForResponse(0, null); + controller.error(functionsError); + resultRejecter(functionsError); + }; + } + }, + cancel() { + return reader.cancel(); } - }; + }); return { - stream, + stream: { + [Symbol.asyncIterator]() { + const rreader = rstream.getReader(); + return { + async next() { + const { value, done } = await rreader.read(); + return { value: value as unknown, done }; + }, + async return() { + await reader.cancel(); + return { done: true, value: undefined }; + } + }; + } + }, data: resultPromise, }; } From cfbcd46335056fed645ed318aca5a91f80511d3c Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Thu, 21 Nov 2024 16:43:25 -0800 Subject: [PATCH 15/30] Refactor implementation for better readability. --- packages/functions/src/service.ts | 72 ++++++++++++++++++++----------- 1 file changed, 46 insertions(+), 26 deletions(-) diff --git a/packages/functions/src/service.ts b/packages/functions/src/service.ts index e69fdb87c8b..2065aec5502 100644 --- a/packages/functions/src/service.ts +++ b/packages/functions/src/service.ts @@ -33,6 +33,8 @@ import { AppCheckInternalComponentName } from '@firebase/app-check-interop-types export const DEFAULT_REGION = 'us-central1'; +const responseLineRE = /^data: (.*?)(?:\n|$)/; + /** * The response to an http request. */ @@ -449,7 +451,6 @@ async function streamAtURL( let resultResolver: (value: unknown) => void; let resultRejecter: (reason: unknown) => void; - const resultPromise = new Promise((resolve, reject) => { resultResolver = resolve; resultRejecter = reject; @@ -463,14 +464,51 @@ async function streamAtURL( resultRejecter(error); }); + const reader = response.body!.getReader(); + const rstream = createResponseStream( + reader, + resultResolver!, + resultRejecter!, + options?.signal + ); + + return { + stream: { + [Symbol.asyncIterator]() { + const rreader = rstream.getReader(); + return { + async next() { + const { value, done } = await rreader.read(); + return { value: value as unknown, done }; + }, + async return() { + await rreader.cancel(); + return { done: true, value: undefined }; + } + }; + } + }, + data: resultPromise, + }; +} + +function createResponseStream( + reader: ReadableStreamDefaultReader, + resultResolver: (value: unknown) => void, + resultRejecter: (reason: unknown) => void, + signal?: AbortSignal +): ReadableStream { const processLine = (line: string, controller: ReadableStreamDefaultController): void => { + const match = line.match(responseLineRE); + // // ignore all other lines (newline, comments, etc.) - if (!line.startsWith('data: ')) { + if (!match) { return; } + + const data = match[1]; try { - // Skip 'data: ' (5 chars) - const jsonData = JSON.parse(line.slice(6)); + const jsonData = JSON.parse(data); if ('result' in jsonData) { resultResolver(decode(jsonData.result)); return; @@ -495,14 +533,13 @@ async function streamAtURL( } }; - const reader = response.body!.getReader(); const decoder = new TextDecoder(); - const rstream = new ReadableStream({ + return new ReadableStream({ start(controller) { let currentText = ''; return pump(); async function pump(): Promise { - if (options?.signal?.aborted) { + if (signal?.aborted) { const error = new FunctionsError('cancelled', 'Request was cancelled'); controller.error(error); resultRejecter(error); @@ -519,7 +556,7 @@ async function streamAtURL( return; } - if (options?.signal?.aborted) { + if (signal?.aborted) { const error = new FunctionsError('cancelled', 'Request was cancelled'); controller.error(error); resultRejecter(error); @@ -551,22 +588,5 @@ async function streamAtURL( } }); - return { - stream: { - [Symbol.asyncIterator]() { - const rreader = rstream.getReader(); - return { - async next() { - const { value, done } = await rreader.read(); - return { value: value as unknown, done }; - }, - async return() { - await reader.cancel(); - return { done: true, value: undefined }; - } - }; - } - }, - data: resultPromise, - }; + } From 1801c536fc62ce4b398fe34555d2d53589b758ba Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Thu, 21 Nov 2024 16:44:54 -0800 Subject: [PATCH 16/30] Nits. --- config/.gitignore | 1 + packages/functions/src/service.ts | 12 ------------ 2 files changed, 1 insertion(+), 12 deletions(-) create mode 100644 config/.gitignore diff --git a/config/.gitignore b/config/.gitignore new file mode 100644 index 00000000000..54b6cccf34d --- /dev/null +++ b/config/.gitignore @@ -0,0 +1 @@ +# Uncomment this if you'd like others to create their own Firebase project. diff --git a/packages/functions/src/service.ts b/packages/functions/src/service.ts index 2065aec5502..dfbe2b578de 100644 --- a/packages/functions/src/service.ts +++ b/packages/functions/src/service.ts @@ -385,7 +385,6 @@ async function streamAtURL( // Encode any special types, such as dates, in the input data. data = encode(data); const body = { data }; - // Add a header for the authToken. const headers: { [key: string]: string } = {}; const context = await functionsInstance.contextProvider.getContext(); @@ -398,7 +397,6 @@ async function streamAtURL( if (context.appCheckToken !== null) { headers['X-Firebase-AppCheck'] = context.appCheckToken; } - headers['Content-Type'] = 'application/json'; headers['Accept'] = 'text/event-stream'; @@ -448,14 +446,12 @@ async function streamAtURL( } }; } - let resultResolver: (value: unknown) => void; let resultRejecter: (reason: unknown) => void; const resultPromise = new Promise((resolve, reject) => { resultResolver = resolve; resultRejecter = reject; }); - options?.signal?.addEventListener('abort', () => { const error = new FunctionsError( 'cancelled', @@ -463,7 +459,6 @@ async function streamAtURL( ); resultRejecter(error); }); - const reader = response.body!.getReader(); const rstream = createResponseStream( reader, @@ -471,7 +466,6 @@ async function streamAtURL( resultRejecter!, options?.signal ); - return { stream: { [Symbol.asyncIterator]() { @@ -500,12 +494,10 @@ function createResponseStream( ): ReadableStream { const processLine = (line: string, controller: ReadableStreamDefaultController): void => { const match = line.match(responseLineRE); - // // ignore all other lines (newline, comments, etc.) if (!match) { return; } - const data = match[1]; try { const jsonData = JSON.parse(data); @@ -545,7 +537,6 @@ function createResponseStream( resultRejecter(error); return Promise.resolve(); } - try { const { value, done } = await reader.read(); if (done) { @@ -555,7 +546,6 @@ function createResponseStream( controller.close(); return; } - if (signal?.aborted) { const error = new FunctionsError('cancelled', 'Request was cancelled'); controller.error(error); @@ -563,11 +553,9 @@ function createResponseStream( await reader.cancel(); return; } - currentText += decoder.decode(value, { stream: true }); const lines = currentText.split("\n"); currentText = lines.pop() || ''; - for (const line of lines) { if (line.trim()) { processLine(line.trim(), controller); From bf4f5e940f8f1dee430dd581defe74cf81e4c1a2 Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Thu, 21 Nov 2024 16:45:19 -0800 Subject: [PATCH 17/30] More nits. --- packages/functions/src/service.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/functions/src/service.ts b/packages/functions/src/service.ts index dfbe2b578de..76f110f9140 100644 --- a/packages/functions/src/service.ts +++ b/packages/functions/src/service.ts @@ -575,6 +575,4 @@ function createResponseStream( return reader.cancel(); } }); - - } From 31998c238f10e39b673ececaa0289c856c46c5e0 Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Fri, 22 Nov 2024 15:48:53 -0800 Subject: [PATCH 18/30] Revert changes to the functions-type package. --- .changeset/bright-scissors-care.md | 2 +- packages/functions-types/index.d.ts | 9 --------- 2 files changed, 1 insertion(+), 10 deletions(-) diff --git a/.changeset/bright-scissors-care.md b/.changeset/bright-scissors-care.md index d7f94f13d19..b97b22b7248 100644 --- a/.changeset/bright-scissors-care.md +++ b/.changeset/bright-scissors-care.md @@ -1,6 +1,6 @@ --- -'@firebase/functions-types': minor '@firebase/functions': minor +'firebase': minor --- Add `.stream()` api for callable functions for consuming streaming responses. diff --git a/packages/functions-types/index.d.ts b/packages/functions-types/index.d.ts index 0c5499a1154..4d1770377d3 100644 --- a/packages/functions-types/index.d.ts +++ b/packages/functions-types/index.d.ts @@ -22,21 +22,12 @@ export interface HttpsCallableResult { readonly data: any; } -/** - * An HttpsCallableStreamResult wraps a single streaming result from a function call. - */ -export interface HttpsCallableStreamResult { - readonly data: Promise; - readonly stream: AsyncIterable; -} - /** * An HttpsCallable is a reference to a "callable" http trigger in * Google Cloud Functions. */ export interface HttpsCallable { (data?: {} | null): Promise; - stream(data?: {} | null): Promise; } /** From 72e87b4276563187cb8f7d768c0fd432d57e6ab1 Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Fri, 22 Nov 2024 15:49:59 -0800 Subject: [PATCH 19/30] Remove duplicate entry in the package script for functions. --- packages/functions/package.json | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/functions/package.json b/packages/functions/package.json index 388de9e6c1a..cc53d9120fb 100644 --- a/packages/functions/package.json +++ b/packages/functions/package.json @@ -37,7 +37,6 @@ "test:browser": "karma start", "test:browser:debug": "karma start --browsers=Chrome --auto-watch", "test:node": "TS_NODE_COMPILER_OPTIONS='{\"module\":\"commonjs\"}' nyc --reporter lcovonly -- mocha 'src/{,!(browser)/**/}*.test.ts' --file src/index.ts --config ../../config/mocharc.node.js", - "test:emulator": "env FIREBASE_FUNCTIONS_EMULATOR_ORIGIN=http://127.0.0.1:5005 run-p test:node", "test:emulator": "env FIREBASE_FUNCTIONS_EMULATOR_ORIGIN=http://127.0.0.1:5005 run-p --npm-path npm test:node", "trusted-type-check": "tsec -p tsconfig.json --noEmit", "api-report": "api-extractor run --local --verbose", From d8ebbb83713dac48ad1150b7e479f865e06b9f9d Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Fri, 22 Nov 2024 15:50:54 -0800 Subject: [PATCH 20/30] Run formatter. --- packages/functions/src/api.ts | 8 +- packages/functions/src/callable.test.ts | 144 +++++++++++++++++------- packages/functions/src/public-types.ts | 21 ++-- packages/functions/src/service.ts | 64 +++++++---- 4 files changed, 167 insertions(+), 70 deletions(-) diff --git a/packages/functions/src/api.ts b/packages/functions/src/api.ts index 159ac26ac70..7f92cba8343 100644 --- a/packages/functions/src/api.ts +++ b/packages/functions/src/api.ts @@ -88,7 +88,11 @@ export function connectFunctionsEmulator( * @param name - The name of the trigger. * @public */ -export function httpsCallable( +export function httpsCallable< + RequestData = unknown, + ResponseData = unknown, + StreamData = unknown +>( functionsInstance: Functions, name: string, options?: HttpsCallableOptions @@ -108,7 +112,7 @@ export function httpsCallable( functionsInstance: Functions, url: string, diff --git a/packages/functions/src/callable.test.ts b/packages/functions/src/callable.test.ts index 698cb25ef3d..26d45c32ac8 100644 --- a/packages/functions/src/callable.test.ts +++ b/packages/functions/src/callable.test.ts @@ -98,7 +98,6 @@ describe('Firebase Functions > Call', () => { { message: string; code: number; long: number } >(functions, 'dataTest'); try { - const result = await func(data); expect(result.data).to.deep.equal({ @@ -329,9 +328,15 @@ describe('Firebase Functions > Stream', () => { it('successfully streams data and resolves final result', async () => { const mockResponse = new ReadableStream({ start(controller) { - controller.enqueue(new TextEncoder().encode('data: {"message":"Hello"}\n')); - controller.enqueue(new TextEncoder().encode('data: {"message":"World"}\n')); - controller.enqueue(new TextEncoder().encode('data: {"result":"Final Result"}\n')); + controller.enqueue( + new TextEncoder().encode('data: {"message":"Hello"}\n') + ); + controller.enqueue( + new TextEncoder().encode('data: {"message":"World"}\n') + ); + controller.enqueue( + new TextEncoder().encode('data: {"result":"Final Result"}\n') + ); controller.close(); } }); @@ -340,10 +345,13 @@ describe('Firebase Functions > Stream', () => { body: mockResponse, headers: new Headers({ 'Content-Type': 'text/event-stream' }), status: 200, - statusText: 'OK', + statusText: 'OK' } as Response); - const func = httpsCallable, string, string>(functions, 'streamTest'); + const func = httpsCallable, string, string>( + functions, + 'streamTest' + ); const streamResult = await func.stream({}); const messages: string[] = []; @@ -358,8 +366,14 @@ describe('Firebase Functions > Stream', () => { it('successfully process request chunk with multiple events', async () => { const mockResponse = new ReadableStream({ start(controller) { - controller.enqueue(new TextEncoder().encode('data: {"message":"Hello"}\n\ndata: {"message":"World"}\n')); - controller.enqueue(new TextEncoder().encode('data: {"result":"Final Result"}\n')); + controller.enqueue( + new TextEncoder().encode( + 'data: {"message":"Hello"}\n\ndata: {"message":"World"}\n' + ) + ); + controller.enqueue( + new TextEncoder().encode('data: {"result":"Final Result"}\n') + ); controller.close(); } }); @@ -368,10 +382,13 @@ describe('Firebase Functions > Stream', () => { body: mockResponse, headers: new Headers({ 'Content-Type': 'text/event-stream' }), status: 200, - statusText: 'OK', + statusText: 'OK' } as Response); - const func = httpsCallable, string, string>(functions, 'streamTest'); + const func = httpsCallable, string, string>( + functions, + 'streamTest' + ); const streamResult = await func.stream({}); const messages: string[] = []; @@ -386,7 +403,10 @@ describe('Firebase Functions > Stream', () => { it('handles network errors', async () => { mockFetch.rejects(new Error('Network error')); - const func = httpsCallable, string, string>(functions, 'errTest'); + const func = httpsCallable, string, string>( + functions, + 'errTest' + ); const streamResult = await func.stream({}); let errorThrown = false; @@ -396,16 +416,22 @@ describe('Firebase Functions > Stream', () => { } } catch (error: unknown) { errorThrown = true; - expect((error as FunctionsError).code).to.equal(`${FUNCTIONS_TYPE}/internal`); + expect((error as FunctionsError).code).to.equal( + `${FUNCTIONS_TYPE}/internal` + ); } expect(errorThrown).to.be.true; - await expectError(streamResult.data, "internal", "internal"); + await expectError(streamResult.data, 'internal', 'internal'); }); it('handles server-side errors', async () => { const mockResponse = new ReadableStream({ start(controller) { - controller.enqueue(new TextEncoder().encode('data: {"error":{"status":"INVALID_ARGUMENT","message":"Invalid input"}}\n')); + controller.enqueue( + new TextEncoder().encode( + 'data: {"error":{"status":"INVALID_ARGUMENT","message":"Invalid input"}}\n' + ) + ); controller.close(); } }); @@ -414,10 +440,13 @@ describe('Firebase Functions > Stream', () => { body: mockResponse, headers: new Headers({ 'Content-Type': 'text/event-stream' }), status: 200, - statusText: 'OK', + statusText: 'OK' } as Response); - const func = httpsCallable, string, string>(functions, 'stream'); + const func = httpsCallable, string, string>( + functions, + 'stream' + ); const streamResult = await func.stream({}); let errorThrown = false; @@ -427,12 +456,14 @@ describe('Firebase Functions > Stream', () => { } } catch (error) { errorThrown = true; - expect((error as FunctionsError).code).to.equal(`${FUNCTIONS_TYPE}/invalid-argument`); + expect((error as FunctionsError).code).to.equal( + `${FUNCTIONS_TYPE}/invalid-argument` + ); expect((error as FunctionsError).message).to.equal('Invalid input'); } expect(errorThrown).to.be.true; - await expectError(streamResult.data, "invalid-argument", "Invalid input"); + await expectError(streamResult.data, 'invalid-argument', 'Invalid input'); }); it('includes authentication and app check tokens in request headers', async () => { @@ -461,12 +492,20 @@ describe('Firebase Functions > Stream', () => { ) ); - const functions = createTestService(app, region, authProvider, undefined, appCheckProvider); + const functions = createTestService( + app, + region, + authProvider, + undefined, + appCheckProvider + ); const mockFetch = sinon.stub(functions, 'fetchImpl' as any); const mockResponse = new ReadableStream({ start(controller) { - controller.enqueue(new TextEncoder().encode('data: {"result":"Success"}\n')); + controller.enqueue( + new TextEncoder().encode('data: {"result":"Success"}\n') + ); controller.close(); } }); @@ -475,10 +514,13 @@ describe('Firebase Functions > Stream', () => { body: mockResponse, headers: new Headers({ 'Content-Type': 'text/event-stream' }), status: 200, - statusText: 'OK', + statusText: 'OK' } as Response); - const func = httpsCallable, string, string>(functions, 'stream'); + const func = httpsCallable, string, string>( + functions, + 'stream' + ); await func.stream({}); expect(mockFetch.calledOnce).to.be.true; @@ -501,7 +543,10 @@ describe('Firebase Functions > Stream', () => { }); mockFetch.returns(fetchPromise); - const func = httpsCallable, string, string>(functions, 'streamTest'); + const func = httpsCallable, string, string>( + functions, + 'streamTest' + ); const streamPromise = func.stream({}, { signal: controller.signal }); controller.abort(); @@ -521,10 +566,12 @@ describe('Firebase Functions > Stream', () => { } } catch (error) { errorThrown = true; - expect((error as FunctionsError).code).to.equal(`${FUNCTIONS_TYPE}/cancelled`); + expect((error as FunctionsError).code).to.equal( + `${FUNCTIONS_TYPE}/cancelled` + ); } expect(errorThrown).to.be.true; - await expectError(streamResult.data, "cancelled", "Request was cancelled."); + await expectError(streamResult.data, 'cancelled', 'Request was cancelled.'); }); it('aborts during streaming', async () => { @@ -532,12 +579,18 @@ describe('Firebase Functions > Stream', () => { const mockResponse = new ReadableStream({ async start(controller) { - controller.enqueue(new TextEncoder().encode('data: {"message":"First"}\n')); + controller.enqueue( + new TextEncoder().encode('data: {"message":"First"}\n') + ); // Add delay to simulate network latency await new Promise(resolve => setTimeout(resolve, 50)); - controller.enqueue(new TextEncoder().encode('data: {"message":"Second"}\n')); + controller.enqueue( + new TextEncoder().encode('data: {"message":"Second"}\n') + ); await new Promise(resolve => setTimeout(resolve, 50)); - controller.enqueue(new TextEncoder().encode('data: {"result":"Final"}\n')); + controller.enqueue( + new TextEncoder().encode('data: {"result":"Final"}\n') + ); controller.close(); } }); @@ -546,10 +599,13 @@ describe('Firebase Functions > Stream', () => { body: mockResponse, headers: new Headers({ 'Content-Type': 'text/event-stream' }), status: 200, - statusText: 'OK', + statusText: 'OK' } as Response); - const func = httpsCallable, string, string>(functions, 'streamTest'); + const func = httpsCallable, string, string>( + functions, + 'streamTest' + ); const streamResult = await func.stream({}, { signal: controller.signal }); const messages: string[] = []; @@ -563,10 +619,12 @@ describe('Firebase Functions > Stream', () => { } throw new Error('Stream should have been aborted'); } catch (error) { - expect((error as FunctionsError).code).to.equal(`${FUNCTIONS_TYPE}/cancelled`); + expect((error as FunctionsError).code).to.equal( + `${FUNCTIONS_TYPE}/cancelled` + ); } expect(messages).to.deep.equal(['First']); - await expectError(streamResult.data, "cancelled", "Request was cancelled."); + await expectError(streamResult.data, 'cancelled', 'Request was cancelled.'); }); it('fails immediately with pre-aborted signal', async () => { @@ -578,7 +636,10 @@ describe('Firebase Functions > Stream', () => { } return Promise.resolve(new Response()); }); - const func = httpsCallable, string, string>(functions, 'streamTest'); + const func = httpsCallable, string, string>( + functions, + 'streamTest' + ); const streamResult = await func.stream({}, { signal: AbortSignal.abort() }); let errorThrown = false; @@ -588,10 +649,12 @@ describe('Firebase Functions > Stream', () => { } } catch (error) { errorThrown = true; - expect((error as FunctionsError).code).to.equal(`${FUNCTIONS_TYPE}/cancelled`); + expect((error as FunctionsError).code).to.equal( + `${FUNCTIONS_TYPE}/cancelled` + ); } expect(errorThrown).to.be.true; - await expectError(streamResult.data, "cancelled", "Request was cancelled."); + await expectError(streamResult.data, 'cancelled', 'Request was cancelled.'); }); it('properly handles AbortSignal.timeout()', async () => { @@ -612,7 +675,10 @@ describe('Firebase Functions > Stream', () => { return new Response(); }); - const func = httpsCallable, string, string>(functions, 'streamTest'); + const func = httpsCallable, string, string>( + functions, + 'streamTest' + ); const streamResult = await func.stream({}, { signal }); try { @@ -621,8 +687,10 @@ describe('Firebase Functions > Stream', () => { } throw new Error('Stream should have timed out'); } catch (error) { - expect((error as FunctionsError).code).to.equal(`${FUNCTIONS_TYPE}/cancelled`); + expect((error as FunctionsError).code).to.equal( + `${FUNCTIONS_TYPE}/cancelled` + ); } - await expectError(streamResult.data, "cancelled", "Request was cancelled."); + await expectError(streamResult.data, 'cancelled', 'Request was cancelled.'); }); }); diff --git a/packages/functions/src/public-types.ts b/packages/functions/src/public-types.ts index c9da0836e5c..9429c44cdfa 100644 --- a/packages/functions/src/public-types.ts +++ b/packages/functions/src/public-types.ts @@ -31,7 +31,10 @@ export interface HttpsCallableResult { * An `HttpsCallableStreamResult` wraps a single streaming result from a function call. * @public */ -export interface HttpsCallableStreamResult { +export interface HttpsCallableStreamResult< + ResponseData = unknown, + StreamData = unknown +> { readonly data: Promise; readonly stream: AsyncIterable; } @@ -41,9 +44,16 @@ export interface HttpsCallableStreamResult { +export interface HttpsCallable< + RequestData = unknown, + ResponseData = unknown, + StreamData = unknown +> { (data?: RequestData | null): Promise>; - stream: (data?: RequestData | null, options?: HttpsCallableStreamOptions) => Promise>; + stream: ( + data?: RequestData | null, + options?: HttpsCallableStreamOptions + ) => Promise>; } /** @@ -64,7 +74,6 @@ export interface HttpsCallableOptions { limitedUseAppCheckTokens?: boolean; } - /** * An interface for metadata about how stream call should be executed. * @public @@ -74,11 +83,9 @@ export interface HttpsCallableStreamOptions { * An AbortSignal that can be used to cancel the streaming response. When the signal is aborted, * both the underlying connection and stream will be terminated. */ - signal?: AbortSignal + signal?: AbortSignal; } - - /** * A `Functions` instance. * @public diff --git a/packages/functions/src/service.ts b/packages/functions/src/service.ts index 76f110f9140..7c8fa546157 100644 --- a/packages/functions/src/service.ts +++ b/packages/functions/src/service.ts @@ -109,7 +109,7 @@ export class FunctionsService implements _FirebaseService { messagingProvider: Provider, appCheckProvider: Provider, regionOrCustomDomain: string = DEFAULT_REGION, - readonly fetchImpl: typeof fetch = (...args) => fetch(...args), + readonly fetchImpl: typeof fetch = (...args) => fetch(...args) ) { this.contextProvider = new ContextProvider( authProvider, @@ -186,11 +186,16 @@ export function httpsCallable( name: string, options?: HttpsCallableOptions ): HttpsCallable { - const callable = (data?: RequestData | null): Promise => { + const callable = ( + data?: RequestData | null + ): Promise => { return call(functionsInstance, name, data, options || {}); }; - callable.stream = (data?: RequestData | null, options?: HttpsCallableStreamOptions) => { + callable.stream = ( + data?: RequestData | null, + options?: HttpsCallableStreamOptions + ) => { return stream(functionsInstance, name, data, options); }; @@ -202,16 +207,25 @@ export function httpsCallable( * @param url - The url of the trigger. * @public */ -export function httpsCallableFromURL( +export function httpsCallableFromURL< + RequestData, + ResponseData, + StreamData = unknown +>( functionsInstance: FunctionsService, url: string, options?: HttpsCallableOptions ): HttpsCallable { - const callable = (data?: RequestData | null): Promise => { + const callable = ( + data?: RequestData | null + ): Promise => { return callAtURL(functionsInstance, url, data, options || {}); }; - callable.stream = (data?: RequestData | null, options?: HttpsCallableStreamOptions) => { + callable.stream = ( + data?: RequestData | null, + options?: HttpsCallableStreamOptions + ) => { return streamAtURL(functionsInstance, url, options); }; return callable as HttpsCallable; @@ -410,10 +424,7 @@ async function streamAtURL( }); } catch (e) { if (e instanceof Error && e.name === 'AbortError') { - const error = new FunctionsError( - 'cancelled', - 'Request was cancelled.' - ); + const error = new FunctionsError('cancelled', 'Request was cancelled.'); return { data: Promise.reject(error), stream: { @@ -453,10 +464,7 @@ async function streamAtURL( resultRejecter = reject; }); options?.signal?.addEventListener('abort', () => { - const error = new FunctionsError( - 'cancelled', - 'Request was cancelled.' - ); + const error = new FunctionsError('cancelled', 'Request was cancelled.'); resultRejecter(error); }); const reader = response.body!.getReader(); @@ -482,7 +490,7 @@ async function streamAtURL( }; } }, - data: resultPromise, + data: resultPromise }; } @@ -492,7 +500,10 @@ function createResponseStream( resultRejecter: (reason: unknown) => void, signal?: AbortSignal ): ReadableStream { - const processLine = (line: string, controller: ReadableStreamDefaultController): void => { + const processLine = ( + line: string, + controller: ReadableStreamDefaultController + ): void => { const match = line.match(responseLineRE); // ignore all other lines (newline, comments, etc.) if (!match) { @@ -532,7 +543,10 @@ function createResponseStream( return pump(); async function pump(): Promise { if (signal?.aborted) { - const error = new FunctionsError('cancelled', 'Request was cancelled'); + const error = new FunctionsError( + 'cancelled', + 'Request was cancelled' + ); controller.error(error); resultRejecter(error); return Promise.resolve(); @@ -547,14 +561,17 @@ function createResponseStream( return; } if (signal?.aborted) { - const error = new FunctionsError('cancelled', 'Request was cancelled'); + const error = new FunctionsError( + 'cancelled', + 'Request was cancelled' + ); controller.error(error); resultRejecter(error); await reader.cancel(); return; } currentText += decoder.decode(value, { stream: true }); - const lines = currentText.split("\n"); + const lines = currentText.split('\n'); currentText = lines.pop() || ''; for (const line of lines) { if (line.trim()) { @@ -563,12 +580,13 @@ function createResponseStream( } return pump(); } catch (error) { - const functionsError = error instanceof FunctionsError - ? error - : _errorForResponse(0, null); + const functionsError = + error instanceof FunctionsError + ? error + : _errorForResponse(0, null); controller.error(functionsError); resultRejecter(functionsError); - }; + } } }, cancel() { From fd2cb151033fdab8f4f961a0a99af9091c9bb23b Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Fri, 22 Nov 2024 15:54:53 -0800 Subject: [PATCH 21/30] Generate updated docs. --- docs-devsite/_toc.yaml | 6 +++ docs-devsite/functions.httpscallable.md | 33 +++++++++++++++ .../functions.httpscallablestreamoptions.md | 35 ++++++++++++++++ .../functions.httpscallablestreamresult.md | 42 +++++++++++++++++++ docs-devsite/functions.md | 22 ++++------ 5 files changed, 123 insertions(+), 15 deletions(-) create mode 100644 docs-devsite/functions.httpscallable.md create mode 100644 docs-devsite/functions.httpscallablestreamoptions.md create mode 100644 docs-devsite/functions.httpscallablestreamresult.md diff --git a/docs-devsite/_toc.yaml b/docs-devsite/_toc.yaml index a27f2832eb7..9d60c12906c 100644 --- a/docs-devsite/_toc.yaml +++ b/docs-devsite/_toc.yaml @@ -375,10 +375,16 @@ toc: path: /docs/reference/js/functions.functions.md - title: FunctionsError path: /docs/reference/js/functions.functionserror.md + - title: HttpsCallable + path: /docs/reference/js/functions.httpscallable.md - title: HttpsCallableOptions path: /docs/reference/js/functions.httpscallableoptions.md - title: HttpsCallableResult path: /docs/reference/js/functions.httpscallableresult.md + - title: HttpsCallableStreamOptions + path: /docs/reference/js/functions.httpscallablestreamoptions.md + - title: HttpsCallableStreamResult + path: /docs/reference/js/functions.httpscallablestreamresult.md - title: installations path: /docs/reference/js/installations.md section: diff --git a/docs-devsite/functions.httpscallable.md b/docs-devsite/functions.httpscallable.md new file mode 100644 index 00000000000..22bbca620cb --- /dev/null +++ b/docs-devsite/functions.httpscallable.md @@ -0,0 +1,33 @@ +Project: /docs/reference/js/_project.yaml +Book: /docs/reference/_book.yaml +page_type: reference + +{% comment %} +DO NOT EDIT THIS FILE! +This is generated by the JS SDK team, and any local changes will be +overwritten. Changes should be made in the source code at +https://github.com/firebase/firebase-js-sdk +{% endcomment %} + +# HttpsCallable interface +A reference to a "callable" HTTP trigger in Google Cloud Functions. + +Signature: + +```typescript +export interface HttpsCallable +``` + +## Properties + +| Property | Type | Description | +| --- | --- | --- | +| [stream](./functions.httpscallable.md#httpscallablestream) | (data?: RequestData \| null, options?: [HttpsCallableStreamOptions](./functions.httpscallablestreamoptions.md#httpscallablestreamoptions_interface)) => Promise<[HttpsCallableStreamResult](./functions.httpscallablestreamresult.md#httpscallablestreamresult_interface)<ResponseData, StreamData>> | | + +## HttpsCallable.stream + +Signature: + +```typescript +stream: (data?: RequestData | null, options?: HttpsCallableStreamOptions) => Promise>; +``` diff --git a/docs-devsite/functions.httpscallablestreamoptions.md b/docs-devsite/functions.httpscallablestreamoptions.md new file mode 100644 index 00000000000..948b229d8f7 --- /dev/null +++ b/docs-devsite/functions.httpscallablestreamoptions.md @@ -0,0 +1,35 @@ +Project: /docs/reference/js/_project.yaml +Book: /docs/reference/_book.yaml +page_type: reference + +{% comment %} +DO NOT EDIT THIS FILE! +This is generated by the JS SDK team, and any local changes will be +overwritten. Changes should be made in the source code at +https://github.com/firebase/firebase-js-sdk +{% endcomment %} + +# HttpsCallableStreamOptions interface +An interface for metadata about how stream call should be executed. + +Signature: + +```typescript +export interface HttpsCallableStreamOptions +``` + +## Properties + +| Property | Type | Description | +| --- | --- | --- | +| [signal](./functions.httpscallablestreamoptions.md#httpscallablestreamoptionssignal) | AbortSignal | An AbortSignal that can be used to cancel the streaming response. When the signal is aborted, both the underlying connection and stream will be terminated. | + +## HttpsCallableStreamOptions.signal + +An AbortSignal that can be used to cancel the streaming response. When the signal is aborted, both the underlying connection and stream will be terminated. + +Signature: + +```typescript +signal?: AbortSignal; +``` diff --git a/docs-devsite/functions.httpscallablestreamresult.md b/docs-devsite/functions.httpscallablestreamresult.md new file mode 100644 index 00000000000..ba0d852041b --- /dev/null +++ b/docs-devsite/functions.httpscallablestreamresult.md @@ -0,0 +1,42 @@ +Project: /docs/reference/js/_project.yaml +Book: /docs/reference/_book.yaml +page_type: reference + +{% comment %} +DO NOT EDIT THIS FILE! +This is generated by the JS SDK team, and any local changes will be +overwritten. Changes should be made in the source code at +https://github.com/firebase/firebase-js-sdk +{% endcomment %} + +# HttpsCallableStreamResult interface +An `HttpsCallableStreamResult` wraps a single streaming result from a function call. + +Signature: + +```typescript +export interface HttpsCallableStreamResult +``` + +## Properties + +| Property | Type | Description | +| --- | --- | --- | +| [data](./functions.httpscallablestreamresult.md#httpscallablestreamresultdata) | Promise<ResponseData> | | +| [stream](./functions.httpscallablestreamresult.md#httpscallablestreamresultstream) | AsyncIterable<StreamData> | | + +## HttpsCallableStreamResult.data + +Signature: + +```typescript +readonly data: Promise; +``` + +## HttpsCallableStreamResult.stream + +Signature: + +```typescript +readonly stream: AsyncIterable; +``` diff --git a/docs-devsite/functions.md b/docs-devsite/functions.md index 4887fcd4911..e1856319a5c 100644 --- a/docs-devsite/functions.md +++ b/docs-devsite/functions.md @@ -34,8 +34,11 @@ Cloud Functions for Firebase | Interface | Description | | --- | --- | | [Functions](./functions.functions.md#functions_interface) | A Functions instance. | +| [HttpsCallable](./functions.httpscallable.md#httpscallable_interface) | A reference to a "callable" HTTP trigger in Google Cloud Functions. | | [HttpsCallableOptions](./functions.httpscallableoptions.md#httpscallableoptions_interface) | An interface for metadata about how calls should be executed. | | [HttpsCallableResult](./functions.httpscallableresult.md#httpscallableresult_interface) | An HttpsCallableResult wraps a single result from a function call. | +| [HttpsCallableStreamOptions](./functions.httpscallablestreamoptions.md#httpscallablestreamoptions_interface) | An interface for metadata about how stream call should be executed. | +| [HttpsCallableStreamResult](./functions.httpscallablestreamresult.md#httpscallablestreamresult_interface) | An HttpsCallableStreamResult wraps a single streaming result from a function call. | ## Type Aliases @@ -43,7 +46,6 @@ Cloud Functions for Firebase | --- | --- | | [FunctionsErrorCode](./functions.md#functionserrorcode) | The set of Firebase Functions status codes. The codes are the same at the ones exposed by gRPC here: https://github.com/grpc/grpc/blob/master/doc/statuscodes.mdPossible values: - 'cancelled': The operation was cancelled (typically by the caller). - 'unknown': Unknown error or an error from a different error domain. - 'invalid-argument': Client specified an invalid argument. Note that this differs from 'failed-precondition'. 'invalid-argument' indicates arguments that are problematic regardless of the state of the system (e.g. an invalid field name). - 'deadline-exceeded': Deadline expired before operation could complete. For operations that change the state of the system, this error may be returned even if the operation has completed successfully. For example, a successful response from a server could have been delayed long enough for the deadline to expire. - 'not-found': Some requested document was not found. - 'already-exists': Some document that we attempted to create already exists. - 'permission-denied': The caller does not have permission to execute the specified operation. - 'resource-exhausted': Some resource has been exhausted, perhaps a per-user quota, or perhaps the entire file system is out of space. - 'failed-precondition': Operation was rejected because the system is not in a state required for the operation's execution. - 'aborted': The operation was aborted, typically due to a concurrency issue like transaction aborts, etc. - 'out-of-range': Operation was attempted past the valid range. - 'unimplemented': Operation is not implemented or not supported/enabled. - 'internal': Internal errors. Means some invariants expected by underlying system has been broken. If you see one of these errors, something is very broken. - 'unavailable': The service is currently unavailable. This is most likely a transient condition and may be corrected by retrying with a backoff. - 'data-loss': Unrecoverable data loss or corruption. - 'unauthenticated': The request does not have valid authentication credentials for the operation. | | [FunctionsErrorCodeCore](./functions.md#functionserrorcodecore) | Functions error code string appended after "functions/" product prefix. See [FunctionsErrorCode](./functions.md#functionserrorcode) for full documentation of codes. | -| [HttpsCallable](./functions.md#httpscallable) | A reference to a "callable" HTTP trigger in Google Cloud Functions. | ## function(app, ...) @@ -101,7 +103,7 @@ Returns a reference to the callable HTTPS trigger with the given name. Signature: ```typescript -export declare function httpsCallable(functionsInstance: Functions, name: string, options?: HttpsCallableOptions): HttpsCallable; +export declare function httpsCallable(functionsInstance: Functions, name: string, options?: HttpsCallableOptions): HttpsCallable; ``` #### Parameters @@ -114,7 +116,7 @@ export declare function httpsCallableReturns: -[HttpsCallable](./functions.md#httpscallable)<RequestData, ResponseData> +[HttpsCallable](./functions.httpscallable.md#httpscallable_interface)<RequestData, ResponseData, StreamData> ### httpsCallableFromURL(functionsInstance, url, options) {:#httpscallablefromurl_7af6987} @@ -123,7 +125,7 @@ Returns a reference to the callable HTTPS trigger with the specified url. Signature: ```typescript -export declare function httpsCallableFromURL(functionsInstance: Functions, url: string, options?: HttpsCallableOptions): HttpsCallable; +export declare function httpsCallableFromURL(functionsInstance: Functions, url: string, options?: HttpsCallableOptions): HttpsCallable; ``` #### Parameters @@ -136,7 +138,7 @@ export declare function httpsCallableFromURLReturns: -[HttpsCallable](./functions.md#httpscallable)<RequestData, ResponseData> +[HttpsCallable](./functions.httpscallable.md#httpscallable_interface)<RequestData, ResponseData, StreamData> ## FunctionsErrorCode @@ -159,13 +161,3 @@ Functions error code string appended after "functions/" product prefix. See [Fun ```typescript export type FunctionsErrorCodeCore = 'ok' | 'cancelled' | 'unknown' | 'invalid-argument' | 'deadline-exceeded' | 'not-found' | 'already-exists' | 'permission-denied' | 'resource-exhausted' | 'failed-precondition' | 'aborted' | 'out-of-range' | 'unimplemented' | 'internal' | 'unavailable' | 'data-loss' | 'unauthenticated'; ``` - -## HttpsCallable - -A reference to a "callable" HTTP trigger in Google Cloud Functions. - -Signature: - -```typescript -export type HttpsCallable = (data?: RequestData | null) => Promise>; -``` From 9a85bb251fc36748ba8baf5ab9cdf91c33dc0a39 Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Fri, 22 Nov 2024 16:08:10 -0800 Subject: [PATCH 22/30] Refactor code to make auth headers. --- packages/functions/src/public-types.ts | 6 +++ packages/functions/src/service.ts | 53 +++++++++++++------------- 2 files changed, 32 insertions(+), 27 deletions(-) diff --git a/packages/functions/src/public-types.ts b/packages/functions/src/public-types.ts index 9429c44cdfa..1ff321252f2 100644 --- a/packages/functions/src/public-types.ts +++ b/packages/functions/src/public-types.ts @@ -84,6 +84,12 @@ export interface HttpsCallableStreamOptions { * both the underlying connection and stream will be terminated. */ signal?: AbortSignal; + /** + * If set to true, uses limited-use App Check token for callable function requests from this + * instance of {@link Functions}. You must use limited-use tokens to call functions with + * replay protection enabled. By default, this is false. + */ + limitedUseAppCheckTokens?: boolean; } /** diff --git a/packages/functions/src/service.ts b/packages/functions/src/service.ts index 7c8fa546157..8355597970a 100644 --- a/packages/functions/src/service.ts +++ b/packages/functions/src/service.ts @@ -226,7 +226,7 @@ export function httpsCallableFromURL< data?: RequestData | null, options?: HttpsCallableStreamOptions ) => { - return streamAtURL(functionsInstance, url, options); + return streamAtURL(functionsInstance, url, options || {}); }; return callable as HttpsCallable; } @@ -275,6 +275,26 @@ async function postJSON( }; } +async function makeAuthHeaders( + functionsInstance: FunctionsService, + options: HttpsCallableOptions +) { + const headers: Record = {}; + const context = await functionsInstance.contextProvider.getContext( + options.limitedUseAppCheckTokens + ); + if (context.authToken) { + headers['Authorization'] = 'Bearer ' + context.authToken; + } + if (context.messagingToken) { + headers['Firebase-Instance-ID-Token'] = context.messagingToken; + } + if (context.appCheckToken !== null) { + headers['X-Firebase-AppCheck'] = context.appCheckToken; + } + return headers; +} + /** * Calls a callable function asynchronously and returns the result. * @param name The name of the callable trigger. @@ -306,19 +326,7 @@ async function callAtURL( const body = { data }; // Add a header for the authToken. - const headers: { [key: string]: string } = {}; - const context = await functionsInstance.contextProvider.getContext( - options.limitedUseAppCheckTokens - ); - if (context.authToken) { - headers['Authorization'] = 'Bearer ' + context.authToken; - } - if (context.messagingToken) { - headers['Firebase-Instance-ID-Token'] = context.messagingToken; - } - if (context.appCheckToken !== null) { - headers['X-Firebase-AppCheck'] = context.appCheckToken; - } + const headers = await makeAuthHeaders(functionsInstance, options); // Default timeout to 70s, but let the options override it. const timeout = options.timeout || 70000; @@ -381,7 +389,7 @@ function stream( options?: HttpsCallableStreamOptions ): Promise { const url = functionsInstance._url(name); - return streamAtURL(functionsInstance, url, data, options); + return streamAtURL(functionsInstance, url, data, options || {}); } /** @@ -394,23 +402,14 @@ async function streamAtURL( functionsInstance: FunctionsService, url: string, data: unknown, - options?: HttpsCallableStreamOptions + options: HttpsCallableStreamOptions ): Promise { // Encode any special types, such as dates, in the input data. data = encode(data); const body = { data }; + // // Add a header for the authToken. - const headers: { [key: string]: string } = {}; - const context = await functionsInstance.contextProvider.getContext(); - if (context.authToken) { - headers['Authorization'] = 'Bearer ' + context.authToken; - } - if (context.messagingToken) { - headers['Firebase-Instance-ID-Token'] = context.messagingToken; - } - if (context.appCheckToken !== null) { - headers['X-Firebase-AppCheck'] = context.appCheckToken; - } + const headers = await makeAuthHeaders(functionsInstance, options); headers['Content-Type'] = 'application/json'; headers['Accept'] = 'text/event-stream'; From f581b610e92bba9be2e27ddfea71e5f55a33357e Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Fri, 22 Nov 2024 16:16:32 -0800 Subject: [PATCH 23/30] Fix misc. --- packages/functions/src/service.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/functions/src/service.ts b/packages/functions/src/service.ts index 8355597970a..1a74f8891e7 100644 --- a/packages/functions/src/service.ts +++ b/packages/functions/src/service.ts @@ -226,7 +226,7 @@ export function httpsCallableFromURL< data?: RequestData | null, options?: HttpsCallableStreamOptions ) => { - return streamAtURL(functionsInstance, url, options || {}); + return streamAtURL(functionsInstance, url, data, options || {}); }; return callable as HttpsCallable; } @@ -278,7 +278,7 @@ async function postJSON( async function makeAuthHeaders( functionsInstance: FunctionsService, options: HttpsCallableOptions -) { +): Promise> { const headers: Record = {}; const context = await functionsInstance.contextProvider.getContext( options.limitedUseAppCheckTokens From b9a42e58985527771403aa353edb55fe625ae40d Mon Sep 17 00:00:00 2001 From: taeold Date: Sat, 23 Nov 2024 00:27:54 +0000 Subject: [PATCH 24/30] Update API reports --- common/api-review/functions.api.md | 1 + 1 file changed, 1 insertion(+) diff --git a/common/api-review/functions.api.md b/common/api-review/functions.api.md index da89bb94e17..883bde3bc0d 100644 --- a/common/api-review/functions.api.md +++ b/common/api-review/functions.api.md @@ -61,6 +61,7 @@ export interface HttpsCallableResult { // @public export interface HttpsCallableStreamOptions { + limitedUseAppCheckTokens?: boolean; signal?: AbortSignal; } From d0cc9071bddc32e9ba882a7c90372fc7667f54d3 Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Mon, 2 Dec 2024 20:37:46 -0800 Subject: [PATCH 25/30] Revert uninteded changes to untouched test case. --- common/api-review/functions.api.md | 1 + .../functions.httpscallablestreamoptions.md | 11 +++++++++++ packages/functions/src/callable.test.ts | 16 ++++++---------- 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/common/api-review/functions.api.md b/common/api-review/functions.api.md index da89bb94e17..883bde3bc0d 100644 --- a/common/api-review/functions.api.md +++ b/common/api-review/functions.api.md @@ -61,6 +61,7 @@ export interface HttpsCallableResult { // @public export interface HttpsCallableStreamOptions { + limitedUseAppCheckTokens?: boolean; signal?: AbortSignal; } diff --git a/docs-devsite/functions.httpscallablestreamoptions.md b/docs-devsite/functions.httpscallablestreamoptions.md index 948b229d8f7..b0088e66db7 100644 --- a/docs-devsite/functions.httpscallablestreamoptions.md +++ b/docs-devsite/functions.httpscallablestreamoptions.md @@ -22,8 +22,19 @@ export interface HttpsCallableStreamOptions | Property | Type | Description | | --- | --- | --- | +| [limitedUseAppCheckTokens](./functions.httpscallablestreamoptions.md#httpscallablestreamoptionslimiteduseappchecktokens) | boolean | If set to true, uses limited-use App Check token for callable function requests from this instance of [Functions](./functions.functions.md#functions_interface). You must use limited-use tokens to call functions with replay protection enabled. By default, this is false. | | [signal](./functions.httpscallablestreamoptions.md#httpscallablestreamoptionssignal) | AbortSignal | An AbortSignal that can be used to cancel the streaming response. When the signal is aborted, both the underlying connection and stream will be terminated. | +## HttpsCallableStreamOptions.limitedUseAppCheckTokens + +If set to true, uses limited-use App Check token for callable function requests from this instance of [Functions](./functions.functions.md#functions_interface). You must use limited-use tokens to call functions with replay protection enabled. By default, this is false. + +Signature: + +```typescript +limitedUseAppCheckTokens?: boolean; +``` + ## HttpsCallableStreamOptions.signal An AbortSignal that can be used to cancel the streaming response. When the signal is aborted, both the underlying connection and stream will be terminated. diff --git a/packages/functions/src/callable.test.ts b/packages/functions/src/callable.test.ts index 26d45c32ac8..b969304c89e 100644 --- a/packages/functions/src/callable.test.ts +++ b/packages/functions/src/callable.test.ts @@ -97,17 +97,13 @@ describe('Firebase Functions > Call', () => { Record, { message: string; code: number; long: number } >(functions, 'dataTest'); - try { - const result = await func(data); + const result = await func(data); - expect(result.data).to.deep.equal({ - message: 'stub response', - code: 42, - long: 420 - }); - } catch (err) { - console.error(err); - } + expect(result.data).to.deep.equal({ + message: 'stub response', + code: 42, + long: 420 + }); }); it('scalars', async () => { From f773110eaac792a14bc2a7454486dcdc9455e663 Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Mon, 2 Dec 2024 20:38:04 -0800 Subject: [PATCH 26/30] Add docstring comments to utility functions. --- packages/functions/src/service.ts | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/packages/functions/src/service.ts b/packages/functions/src/service.ts index 1a74f8891e7..19a4eb042fd 100644 --- a/packages/functions/src/service.ts +++ b/packages/functions/src/service.ts @@ -275,6 +275,12 @@ async function postJSON( }; } +/** + * Creates authorization headers for Firebase Functions requests. + * @param functionsInstance The Firebase Functions service instance. + * @param options Options for the callable function, including AppCheck token settings. + * @return A Promise that resolves a headers map to include in outgoing fetch request. + */ async function makeAuthHeaders( functionsInstance: FunctionsService, options: HttpsCallableOptions @@ -493,6 +499,21 @@ async function streamAtURL( }; } +/** + * Creates a ReadableStream that processes a streaming response from a streaming + * callable function that returns data in server-sent event format. + * + * @param reader The underlying reader providing raw response data + * @param resultResolver Callback to resolve the final result when received + * @param resultRejecter Callback to reject with an error if encountered + * @param signal Optional AbortSignal to cancel the stream processing + * @returns A ReadableStream that emits decoded messages from the response + * + * The returned ReadableStream: + * 1. Emits individual messages when "message" data is received + * 2. Resolves with the final result when a "result" message is received + * 3. Rejects with an error if an "error" message is received + */ function createResponseStream( reader: ReadableStreamDefaultReader, resultResolver: (value: unknown) => void, From 9ae31c075311e3afae7f8854424f8e9c16b3d98a Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Mon, 2 Dec 2024 20:44:51 -0800 Subject: [PATCH 27/30] Correct documentation on AbortSignal. --- packages/functions/src/public-types.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/functions/src/public-types.ts b/packages/functions/src/public-types.ts index 1ff321252f2..e92687b33d5 100644 --- a/packages/functions/src/public-types.ts +++ b/packages/functions/src/public-types.ts @@ -81,7 +81,7 @@ export interface HttpsCallableOptions { export interface HttpsCallableStreamOptions { /** * An AbortSignal that can be used to cancel the streaming response. When the signal is aborted, - * both the underlying connection and stream will be terminated. + * the underlying http connection will be terminated. */ signal?: AbortSignal; /** From 327c1cb4a9796a2b4572d4089f507ed634d237b1 Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Mon, 2 Dec 2024 20:47:01 -0800 Subject: [PATCH 28/30] Run formatter --- packages/functions/src/service.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/functions/src/service.ts b/packages/functions/src/service.ts index 19a4eb042fd..ec459472b5a 100644 --- a/packages/functions/src/service.ts +++ b/packages/functions/src/service.ts @@ -508,7 +508,7 @@ async function streamAtURL( * @param resultRejecter Callback to reject with an error if encountered * @param signal Optional AbortSignal to cancel the stream processing * @returns A ReadableStream that emits decoded messages from the response - * + * * The returned ReadableStream: * 1. Emits individual messages when "message" data is received * 2. Resolves with the final result when a "result" message is received From 5fb40f174496d6a0cd16dee830d0fb087547de93 Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Mon, 2 Dec 2024 20:49:56 -0800 Subject: [PATCH 29/30] Update docs. --- docs-devsite/functions.httpscallablestreamoptions.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs-devsite/functions.httpscallablestreamoptions.md b/docs-devsite/functions.httpscallablestreamoptions.md index b0088e66db7..35eb2dea55b 100644 --- a/docs-devsite/functions.httpscallablestreamoptions.md +++ b/docs-devsite/functions.httpscallablestreamoptions.md @@ -23,7 +23,7 @@ export interface HttpsCallableStreamOptions | Property | Type | Description | | --- | --- | --- | | [limitedUseAppCheckTokens](./functions.httpscallablestreamoptions.md#httpscallablestreamoptionslimiteduseappchecktokens) | boolean | If set to true, uses limited-use App Check token for callable function requests from this instance of [Functions](./functions.functions.md#functions_interface). You must use limited-use tokens to call functions with replay protection enabled. By default, this is false. | -| [signal](./functions.httpscallablestreamoptions.md#httpscallablestreamoptionssignal) | AbortSignal | An AbortSignal that can be used to cancel the streaming response. When the signal is aborted, both the underlying connection and stream will be terminated. | +| [signal](./functions.httpscallablestreamoptions.md#httpscallablestreamoptionssignal) | AbortSignal | An AbortSignal that can be used to cancel the streaming response. When the signal is aborted, the underlying http connection will be terminated. | ## HttpsCallableStreamOptions.limitedUseAppCheckTokens @@ -37,7 +37,7 @@ limitedUseAppCheckTokens?: boolean; ## HttpsCallableStreamOptions.signal -An AbortSignal that can be used to cancel the streaming response. When the signal is aborted, both the underlying connection and stream will be terminated. +An AbortSignal that can be used to cancel the streaming response. When the signal is aborted, the underlying http connection will be terminated. Signature: From 9ea463dee97ee6a5bf8127838f4f82b5a8109111 Mon Sep 17 00:00:00 2001 From: Daniel Young Lee Date: Tue, 3 Dec 2024 12:29:05 -0800 Subject: [PATCH 30/30] Update docs per comments. --- docs-devsite/functions.httpscallable.md | 2 +- docs-devsite/functions.httpscallableoptions.md | 4 ++-- docs-devsite/functions.httpscallablestreamoptions.md | 10 +++++----- docs-devsite/functions.md | 4 ++-- packages/functions/src/public-types.ts | 12 ++++++------ 5 files changed, 16 insertions(+), 16 deletions(-) diff --git a/docs-devsite/functions.httpscallable.md b/docs-devsite/functions.httpscallable.md index 22bbca620cb..3b9d70f3a05 100644 --- a/docs-devsite/functions.httpscallable.md +++ b/docs-devsite/functions.httpscallable.md @@ -10,7 +10,7 @@ https://github.com/firebase/firebase-js-sdk {% endcomment %} # HttpsCallable interface -A reference to a "callable" HTTP trigger in Google Cloud Functions. +A reference to a "callable" HTTP trigger in Cloud Functions. Signature: diff --git a/docs-devsite/functions.httpscallableoptions.md b/docs-devsite/functions.httpscallableoptions.md index b4a261918ce..22933a2f1f0 100644 --- a/docs-devsite/functions.httpscallableoptions.md +++ b/docs-devsite/functions.httpscallableoptions.md @@ -22,12 +22,12 @@ export interface HttpsCallableOptions | Property | Type | Description | | --- | --- | --- | -| [limitedUseAppCheckTokens](./functions.httpscallableoptions.md#httpscallableoptionslimiteduseappchecktokens) | boolean | If set to true, uses limited-use App Check token for callable function requests from this instance of [Functions](./functions.functions.md#functions_interface). You must use limited-use tokens to call functions with replay protection enabled. By default, this is false. | +| [limitedUseAppCheckTokens](./functions.httpscallableoptions.md#httpscallableoptionslimiteduseappchecktokens) | boolean | If set to true, uses a limited-use App Check token for callable function requests from this instance of [Functions](./functions.functions.md#functions_interface). You must use limited-use tokens to call functions with replay protection enabled. By default, this is false. | | [timeout](./functions.httpscallableoptions.md#httpscallableoptionstimeout) | number | Time in milliseconds after which to cancel if there is no response. Default is 70000. | ## HttpsCallableOptions.limitedUseAppCheckTokens -If set to true, uses limited-use App Check token for callable function requests from this instance of [Functions](./functions.functions.md#functions_interface). You must use limited-use tokens to call functions with replay protection enabled. By default, this is false. +If set to true, uses a limited-use App Check token for callable function requests from this instance of [Functions](./functions.functions.md#functions_interface). You must use limited-use tokens to call functions with replay protection enabled. By default, this is false. Signature: diff --git a/docs-devsite/functions.httpscallablestreamoptions.md b/docs-devsite/functions.httpscallablestreamoptions.md index 35eb2dea55b..6c790c7e0f9 100644 --- a/docs-devsite/functions.httpscallablestreamoptions.md +++ b/docs-devsite/functions.httpscallablestreamoptions.md @@ -10,7 +10,7 @@ https://github.com/firebase/firebase-js-sdk {% endcomment %} # HttpsCallableStreamOptions interface -An interface for metadata about how stream call should be executed. +An interface for metadata about how a stream call should be executed. Signature: @@ -22,12 +22,12 @@ export interface HttpsCallableStreamOptions | Property | Type | Description | | --- | --- | --- | -| [limitedUseAppCheckTokens](./functions.httpscallablestreamoptions.md#httpscallablestreamoptionslimiteduseappchecktokens) | boolean | If set to true, uses limited-use App Check token for callable function requests from this instance of [Functions](./functions.functions.md#functions_interface). You must use limited-use tokens to call functions with replay protection enabled. By default, this is false. | -| [signal](./functions.httpscallablestreamoptions.md#httpscallablestreamoptionssignal) | AbortSignal | An AbortSignal that can be used to cancel the streaming response. When the signal is aborted, the underlying http connection will be terminated. | +| [limitedUseAppCheckTokens](./functions.httpscallablestreamoptions.md#httpscallablestreamoptionslimiteduseappchecktokens) | boolean | If set to true, uses a limited-use App Check token for callable function requests from this instance of [Functions](./functions.functions.md#functions_interface). You must use limited-use tokens to call functions with replay protection enabled. By default, this is false. | +| [signal](./functions.httpscallablestreamoptions.md#httpscallablestreamoptionssignal) | AbortSignal | An AbortSignal that can be used to cancel the streaming response. When the signal is aborted, the underlying HTTP connection will be terminated. | ## HttpsCallableStreamOptions.limitedUseAppCheckTokens -If set to true, uses limited-use App Check token for callable function requests from this instance of [Functions](./functions.functions.md#functions_interface). You must use limited-use tokens to call functions with replay protection enabled. By default, this is false. +If set to true, uses a limited-use App Check token for callable function requests from this instance of [Functions](./functions.functions.md#functions_interface). You must use limited-use tokens to call functions with replay protection enabled. By default, this is false. Signature: @@ -37,7 +37,7 @@ limitedUseAppCheckTokens?: boolean; ## HttpsCallableStreamOptions.signal -An AbortSignal that can be used to cancel the streaming response. When the signal is aborted, the underlying http connection will be terminated. +An `AbortSignal` that can be used to cancel the streaming response. When the signal is aborted, the underlying HTTP connection will be terminated. Signature: diff --git a/docs-devsite/functions.md b/docs-devsite/functions.md index e1856319a5c..7e2eefa1569 100644 --- a/docs-devsite/functions.md +++ b/docs-devsite/functions.md @@ -34,10 +34,10 @@ Cloud Functions for Firebase | Interface | Description | | --- | --- | | [Functions](./functions.functions.md#functions_interface) | A Functions instance. | -| [HttpsCallable](./functions.httpscallable.md#httpscallable_interface) | A reference to a "callable" HTTP trigger in Google Cloud Functions. | +| [HttpsCallable](./functions.httpscallable.md#httpscallable_interface) | A reference to a "callable" HTTP trigger in Cloud Functions. | | [HttpsCallableOptions](./functions.httpscallableoptions.md#httpscallableoptions_interface) | An interface for metadata about how calls should be executed. | | [HttpsCallableResult](./functions.httpscallableresult.md#httpscallableresult_interface) | An HttpsCallableResult wraps a single result from a function call. | -| [HttpsCallableStreamOptions](./functions.httpscallablestreamoptions.md#httpscallablestreamoptions_interface) | An interface for metadata about how stream call should be executed. | +| [HttpsCallableStreamOptions](./functions.httpscallablestreamoptions.md#httpscallablestreamoptions_interface) | An interface for metadata about how a stream call should be executed. | | [HttpsCallableStreamResult](./functions.httpscallablestreamresult.md#httpscallablestreamresult_interface) | An HttpsCallableStreamResult wraps a single streaming result from a function call. | ## Type Aliases diff --git a/packages/functions/src/public-types.ts b/packages/functions/src/public-types.ts index e92687b33d5..50b2d9a9e0c 100644 --- a/packages/functions/src/public-types.ts +++ b/packages/functions/src/public-types.ts @@ -40,7 +40,7 @@ export interface HttpsCallableStreamResult< } /** - * A reference to a "callable" HTTP trigger in Google Cloud Functions. + * A reference to a "callable" HTTP trigger in Cloud Functions. * @param data - Data to be passed to callable function. * @public */ @@ -67,7 +67,7 @@ export interface HttpsCallableOptions { */ timeout?: number; /** - * If set to true, uses limited-use App Check token for callable function requests from this + * If set to true, uses a limited-use App Check token for callable function requests from this * instance of {@link Functions}. You must use limited-use tokens to call functions with * replay protection enabled. By default, this is false. */ @@ -75,17 +75,17 @@ export interface HttpsCallableOptions { } /** - * An interface for metadata about how stream call should be executed. + * An interface for metadata about how a stream call should be executed. * @public */ export interface HttpsCallableStreamOptions { /** - * An AbortSignal that can be used to cancel the streaming response. When the signal is aborted, - * the underlying http connection will be terminated. + * An `AbortSignal` that can be used to cancel the streaming response. When the signal is aborted, + * the underlying HTTP connection will be terminated. */ signal?: AbortSignal; /** - * If set to true, uses limited-use App Check token for callable function requests from this + * If set to true, uses a limited-use App Check token for callable function requests from this * instance of {@link Functions}. You must use limited-use tokens to call functions with * replay protection enabled. By default, this is false. */