From 4bd76af14be9231b4418afac51a044dc24d09413 Mon Sep 17 00:00:00 2001 From: Matteo Ferrando Date: Thu, 21 Nov 2024 12:17:42 -0400 Subject: [PATCH 1/8] feat: introduce fal cdn v3 with storage API --- libs/client/src/storage.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/libs/client/src/storage.ts b/libs/client/src/storage.ts index adb1981..ebb4186 100644 --- a/libs/client/src/storage.ts +++ b/libs/client/src/storage.ts @@ -66,7 +66,8 @@ async function initiateUpload( file.name || `${Date.now()}.${getExtensionFromContentType(contentType)}`; return await dispatchRequest({ method: "POST", - targetUrl: `${getRestApiUrl()}/storage/upload/initiate`, + // NOTE: We want to test V3 without making it the default at the API level + targetUrl: `${getRestApiUrl()}/storage/upload/initiate?storage_type=fal-cdn-v3`, input: { content_type: contentType, file_name: filename, From 39239d20e06f6d199f81bb4ffa096550ab863f91 Mon Sep 17 00:00:00 2001 From: Matteo Ferrando Date: Mon, 25 Nov 2024 15:35:34 -0400 Subject: [PATCH 2/8] feat: automatically do multi-part upload on 90+ MB files --- libs/client/src/storage.ts | 165 +++++++++++++++++++++++++++++++++++-- package-lock.json | 6 ++ package.json | 1 + 3 files changed, 167 insertions(+), 5 deletions(-) diff --git a/libs/client/src/storage.ts b/libs/client/src/storage.ts index ebb4186..4721dd8 100644 --- a/libs/client/src/storage.ts +++ b/libs/client/src/storage.ts @@ -1,7 +1,7 @@ +import { Sema } from "async-sema"; import { getRestApiUrl, RequiredConfig } from "./config"; import { dispatchRequest } from "./request"; import { isPlainObject } from "./utils"; - /** * File support for the client. This interface establishes the contract for * uploading files to the server and transforming the input to replace file @@ -53,17 +53,15 @@ function getExtensionFromContentType(contentType: string): string { /** * Initiate the upload of a file to the server. This returns the URL to upload * the file to and the URL of the file once it is uploaded. - * - * @param file the file to upload - * @returns the URL to upload the file to and the URL of the file once it is uploaded. */ async function initiateUpload( file: Blob, config: RequiredConfig, + contentType: string, ): Promise { - const contentType = file.type || "application/octet-stream"; const filename = file.name || `${Date.now()}.${getExtensionFromContentType(contentType)}`; + return await dispatchRequest({ method: "POST", // NOTE: We want to test V3 without making it the default at the API level @@ -76,6 +74,155 @@ async function initiateUpload( }); } +/** + * Initiate the multipart upload of a file to the server. This returns the URL to upload + * the file to and the URL of the file once it is uploaded. + */ +async function initiateMultipartUpload( + file: Blob, + config: RequiredConfig, + contentType: string, +): Promise { + const filename = + file.name || `${Date.now()}.${getExtensionFromContentType(contentType)}`; + + return await dispatchRequest({ + method: "POST", + targetUrl: `${getRestApiUrl()}/storage/upload/initiate-multipart?storage_type=fal-cdn-v3`, + input: { + content_type: contentType, + file_name: filename, + }, + config, + }); +} + +async function partUploadRetries( + uploadUrl: string, + chunk: Blob, + config: RequiredConfig, + cancelled: { current: boolean }, + tries: number, +) { + if (cancelled.current) { + throw new Error("Part upload failed, upload cancelled"); + } + + if (tries === 0) { + throw new Error("Part upload failed, retries exhausted"); + } + + const { fetch, responseHandler } = config; + + try { + const response = await fetch(uploadUrl, { + method: "PUT", + body: chunk, + }); + await responseHandler(response); + return response; + } catch (error) { + console.error("Part upload failed, retrying", uploadUrl, error); + return await partUploadRetries( + uploadUrl, + chunk, + config, + cancelled, + tries - 1, + ); + } +} + +async function partUpload( + uploadUrl: string, + chunk: Blob, + config: RequiredConfig, + parallelUploads: Sema, + cancelled: { current: boolean }, +) { + await parallelUploads.acquire(); + + try { + return await partUploadRetries(uploadUrl, chunk, config, cancelled, 3); + } finally { + parallelUploads.release(); + } +} + +async function multipartUpload( + file: Blob, + config: RequiredConfig, +): Promise { + const { fetch, responseHandler } = config; + const contentType = file.type || "application/octet-stream"; + const { upload_url: uploadUrl, file_url: url } = + await initiateMultipartUpload(file, config, contentType); + + // Break the file into 10MB chunks + const chunkSize = 10 * 1024 * 1024; + const chunks = Math.ceil(file.size / chunkSize); + + const parsedUrl = new URL(uploadUrl); + + // Max 5 parallel uploads + const limitedParallelUploads = new Sema(3); + + const partPromises: Promise[] = []; + + // To be able to cancel the upload if any of the parts fail + const cancelled = { current: false }; + for (let i = 0; i < chunks; i++) { + const start = i * chunkSize; + const end = Math.min(start + chunkSize, file.size); + + const chunk = file.slice(start, end); + + const partNumber = i + 1; + // {uploadUrl}/{part_number}?uploadUrlParams=... + const partUploadUrl = `${parsedUrl.origin}${parsedUrl.pathname}/${partNumber}${parsedUrl.search}`; + + partPromises.push( + partUpload( + partUploadUrl, + chunk, + config, + limitedParallelUploads, + cancelled, + ), + ); + } + + let responses: Response[]; + try { + // Does this wait for all to finish even if an early one fails? + responses = await Promise.all(partPromises); + } catch (error) { + // If any of the parts fail, cancel other uploads + cancelled.current = true; + + console.error("Multipart upload failed, aborting upload", error); + throw error; + } + + // Complete the upload + const completeUrl = `${parsedUrl.origin}${parsedUrl.pathname}/complete${parsedUrl.search}`; + const response = await fetch(completeUrl, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ + parts: responses.map((response, index) => ({ + partNumber: index + 1, // Parts are 1-indexed + etag: response.headers.get("ETag"), + })), + }), + }); + await responseHandler(response); + + return url; +} + // eslint-disable-next-line @typescript-eslint/no-explicit-any type KeyValuePair = [string, any]; @@ -88,10 +235,18 @@ export function createStorageClient({ }: StorageClientDependencies): StorageClient { const ref: StorageClient = { upload: async (file: Blob) => { + // Check for 90+ MB file size to do multipart upload + if (file.size > 90 * 1024 * 1024) { + return await multipartUpload(file, config); + } + + const contentType = file.type || "application/octet-stream"; + const { fetch, responseHandler } = config; const { upload_url: uploadUrl, file_url: url } = await initiateUpload( file, config, + contentType, ); const response = await fetch(uploadUrl, { method: "PUT", diff --git a/package-lock.json b/package-lock.json index 73d41ae..94fcf26 100644 --- a/package-lock.json +++ b/package-lock.json @@ -16,6 +16,7 @@ "@oclif/plugin-help": "^5.2.5", "@remix-run/dev": "^2.11.1", "@remix-run/node": "^2.11.1", + "async-sema": "^3.1.1", "axios": "^1.0.0", "chalk": "^5.3.0", "change-case": "^4.1.2", @@ -10422,6 +10423,11 @@ "resolved": "https://registry.npmjs.org/async/-/async-3.2.4.tgz", "integrity": "sha512-iAB+JbDEGXhyIUavoDl9WP/Jj106Kz9DEn1DPgYw5ruDn0e3Wgi3sKFm55sASdGBNOQB8F59d9qQ7deqrHA8wQ==" }, + "node_modules/async-sema": { + "version": "3.1.1", + "resolved": "https://registry.npmjs.org/async-sema/-/async-sema-3.1.1.tgz", + "integrity": "sha512-tLRNUXati5MFePdAk8dw7Qt7DpxPB60ofAgn8WRhW6a2rcimZnYBP9oxHiv0OHy+Wz7kPMG+t4LGdt31+4EmGg==" + }, "node_modules/asynciterator.prototype": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/asynciterator.prototype/-/asynciterator.prototype-1.0.0.tgz", diff --git a/package.json b/package.json index 3d6231f..1689a2c 100644 --- a/package.json +++ b/package.json @@ -36,6 +36,7 @@ "@oclif/plugin-help": "^5.2.5", "@remix-run/dev": "^2.11.1", "@remix-run/node": "^2.11.1", + "async-sema": "^3.1.1", "axios": "^1.0.0", "chalk": "^5.3.0", "change-case": "^4.1.2", From ee8ec9b1159acd35b571ec5c23ab6739c97fbc45 Mon Sep 17 00:00:00 2001 From: Matteo Ferrando Date: Mon, 25 Nov 2024 17:03:33 -0400 Subject: [PATCH 3/8] fix: use new API from CDN --- libs/client/src/storage.ts | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/libs/client/src/storage.ts b/libs/client/src/storage.ts index 4721dd8..8ade4ae 100644 --- a/libs/client/src/storage.ts +++ b/libs/client/src/storage.ts @@ -97,13 +97,18 @@ async function initiateMultipartUpload( }); } +type MultipartObject = { + partNumber: number; + etag: string; +}; + async function partUploadRetries( uploadUrl: string, chunk: Blob, config: RequiredConfig, cancelled: { current: boolean }, tries: number, -) { +): Promise { if (cancelled.current) { throw new Error("Part upload failed, upload cancelled"); } @@ -119,8 +124,8 @@ async function partUploadRetries( method: "PUT", body: chunk, }); - await responseHandler(response); - return response; + + return (await responseHandler(response)) as MultipartObject; } catch (error) { console.error("Part upload failed, retrying", uploadUrl, error); return await partUploadRetries( @@ -167,7 +172,7 @@ async function multipartUpload( // Max 5 parallel uploads const limitedParallelUploads = new Sema(3); - const partPromises: Promise[] = []; + const partPromises: Promise[] = []; // To be able to cancel the upload if any of the parts fail const cancelled = { current: false }; @@ -192,7 +197,7 @@ async function multipartUpload( ); } - let responses: Response[]; + let responses: MultipartObject[]; try { // Does this wait for all to finish even if an early one fails? responses = await Promise.all(partPromises); @@ -204,6 +209,7 @@ async function multipartUpload( throw error; } + console.log("All parts uploaded, completing upload", responses); // Complete the upload const completeUrl = `${parsedUrl.origin}${parsedUrl.pathname}/complete${parsedUrl.search}`; const response = await fetch(completeUrl, { @@ -212,9 +218,9 @@ async function multipartUpload( "Content-Type": "application/json", }, body: JSON.stringify({ - parts: responses.map((response, index) => ({ - partNumber: index + 1, // Parts are 1-indexed - etag: response.headers.get("ETag"), + parts: responses.map((mpart) => ({ + partNumber: mpart.partNumber, + etag: mpart.etag, })), }), }); From 1544ca0dea524e641c0c4c0920b8fae014a7077b Mon Sep 17 00:00:00 2001 From: Matteo Ferrando Date: Mon, 25 Nov 2024 17:45:28 -0400 Subject: [PATCH 4/8] alpha --- libs/client/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/client/package.json b/libs/client/package.json index 5b13b60..4f1b048 100644 --- a/libs/client/package.json +++ b/libs/client/package.json @@ -1,7 +1,7 @@ { "name": "@fal-ai/client", "description": "The fal.ai client for JavaScript and TypeScript", - "version": "1.1.3", + "version": "1.2.0-alpha.4", "license": "MIT", "repository": { "type": "git", From 2726a5eb5ae29d1fd280fbd63007c5094520ca5d Mon Sep 17 00:00:00 2001 From: Matteo Ferrando Date: Mon, 25 Nov 2024 17:56:46 -0400 Subject: [PATCH 5/8] fix: include dependency --- libs/client/package.json | 1 + 1 file changed, 1 insertion(+) diff --git a/libs/client/package.json b/libs/client/package.json index 4f1b048..1e4fb65 100644 --- a/libs/client/package.json +++ b/libs/client/package.json @@ -30,6 +30,7 @@ "types": "./src/index.d.ts", "dependencies": { "@msgpack/msgpack": "^3.0.0-beta2", + "async-sema": "^3.1.1", "eventsource-parser": "^1.1.2", "robot3": "^0.4.1" }, From 22b622ba569f90882fae5ff6cf6632d90f6cafb0 Mon Sep 17 00:00:00 2001 From: Matteo Ferrando Date: Mon, 25 Nov 2024 17:58:15 -0400 Subject: [PATCH 6/8] chore: bump alpha --- libs/client/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/client/package.json b/libs/client/package.json index 1e4fb65..62e4f9c 100644 --- a/libs/client/package.json +++ b/libs/client/package.json @@ -1,7 +1,7 @@ { "name": "@fal-ai/client", "description": "The fal.ai client for JavaScript and TypeScript", - "version": "1.2.0-alpha.4", + "version": "1.2.0-alpha.5", "license": "MIT", "repository": { "type": "git", From 954dc3acfb7898363be420cf21bc39599d2104d0 Mon Sep 17 00:00:00 2001 From: Matteo Ferrando Date: Mon, 25 Nov 2024 19:43:31 -0400 Subject: [PATCH 7/8] refactor: remove semaphore --- libs/client/package.json | 1 - libs/client/src/storage.ts | 74 +++++++------------------------------- package-lock.json | 6 ---- package.json | 1 - 4 files changed, 13 insertions(+), 69 deletions(-) diff --git a/libs/client/package.json b/libs/client/package.json index 62e4f9c..b197974 100644 --- a/libs/client/package.json +++ b/libs/client/package.json @@ -30,7 +30,6 @@ "types": "./src/index.d.ts", "dependencies": { "@msgpack/msgpack": "^3.0.0-beta2", - "async-sema": "^3.1.1", "eventsource-parser": "^1.1.2", "robot3": "^0.4.1" }, diff --git a/libs/client/src/storage.ts b/libs/client/src/storage.ts index 8ade4ae..7ddc0c0 100644 --- a/libs/client/src/storage.ts +++ b/libs/client/src/storage.ts @@ -1,4 +1,3 @@ -import { Sema } from "async-sema"; import { getRestApiUrl, RequiredConfig } from "./config"; import { dispatchRequest } from "./request"; import { isPlainObject } from "./utils"; @@ -106,13 +105,8 @@ async function partUploadRetries( uploadUrl: string, chunk: Blob, config: RequiredConfig, - cancelled: { current: boolean }, - tries: number, + tries: number = 3, ): Promise { - if (cancelled.current) { - throw new Error("Part upload failed, upload cancelled"); - } - if (tries === 0) { throw new Error("Part upload failed, retries exhausted"); } @@ -128,29 +122,7 @@ async function partUploadRetries( return (await responseHandler(response)) as MultipartObject; } catch (error) { console.error("Part upload failed, retrying", uploadUrl, error); - return await partUploadRetries( - uploadUrl, - chunk, - config, - cancelled, - tries - 1, - ); - } -} - -async function partUpload( - uploadUrl: string, - chunk: Blob, - config: RequiredConfig, - parallelUploads: Sema, - cancelled: { current: boolean }, -) { - await parallelUploads.acquire(); - - try { - return await partUploadRetries(uploadUrl, chunk, config, cancelled, 3); - } finally { - parallelUploads.release(); + return await partUploadRetries(uploadUrl, chunk, config, tries - 1); } } @@ -169,42 +141,22 @@ async function multipartUpload( const parsedUrl = new URL(uploadUrl); - // Max 5 parallel uploads - const limitedParallelUploads = new Sema(3); - - const partPromises: Promise[] = []; + const responses: MultipartObject[] = []; - // To be able to cancel the upload if any of the parts fail - const cancelled = { current: false }; - for (let i = 0; i < chunks; i++) { - const start = i * chunkSize; - const end = Math.min(start + chunkSize, file.size); - - const chunk = file.slice(start, end); + try { + for (let i = 0; i < chunks; i++) { + const start = i * chunkSize; + const end = Math.min(start + chunkSize, file.size); - const partNumber = i + 1; - // {uploadUrl}/{part_number}?uploadUrlParams=... - const partUploadUrl = `${parsedUrl.origin}${parsedUrl.pathname}/${partNumber}${parsedUrl.search}`; + const chunk = file.slice(start, end); - partPromises.push( - partUpload( - partUploadUrl, - chunk, - config, - limitedParallelUploads, - cancelled, - ), - ); - } + const partNumber = i + 1; + // {uploadUrl}/{part_number}?uploadUrlParams=... + const partUploadUrl = `${parsedUrl.origin}${parsedUrl.pathname}/${partNumber}${parsedUrl.search}`; - let responses: MultipartObject[]; - try { - // Does this wait for all to finish even if an early one fails? - responses = await Promise.all(partPromises); + responses.push(await partUploadRetries(partUploadUrl, chunk, config)); + } } catch (error) { - // If any of the parts fail, cancel other uploads - cancelled.current = true; - console.error("Multipart upload failed, aborting upload", error); throw error; } diff --git a/package-lock.json b/package-lock.json index 94fcf26..73d41ae 100644 --- a/package-lock.json +++ b/package-lock.json @@ -16,7 +16,6 @@ "@oclif/plugin-help": "^5.2.5", "@remix-run/dev": "^2.11.1", "@remix-run/node": "^2.11.1", - "async-sema": "^3.1.1", "axios": "^1.0.0", "chalk": "^5.3.0", "change-case": "^4.1.2", @@ -10423,11 +10422,6 @@ "resolved": "https://registry.npmjs.org/async/-/async-3.2.4.tgz", "integrity": "sha512-iAB+JbDEGXhyIUavoDl9WP/Jj106Kz9DEn1DPgYw5ruDn0e3Wgi3sKFm55sASdGBNOQB8F59d9qQ7deqrHA8wQ==" }, - "node_modules/async-sema": { - "version": "3.1.1", - "resolved": "https://registry.npmjs.org/async-sema/-/async-sema-3.1.1.tgz", - "integrity": "sha512-tLRNUXati5MFePdAk8dw7Qt7DpxPB60ofAgn8WRhW6a2rcimZnYBP9oxHiv0OHy+Wz7kPMG+t4LGdt31+4EmGg==" - }, "node_modules/asynciterator.prototype": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/asynciterator.prototype/-/asynciterator.prototype-1.0.0.tgz", diff --git a/package.json b/package.json index 1689a2c..3d6231f 100644 --- a/package.json +++ b/package.json @@ -36,7 +36,6 @@ "@oclif/plugin-help": "^5.2.5", "@remix-run/dev": "^2.11.1", "@remix-run/node": "^2.11.1", - "async-sema": "^3.1.1", "axios": "^1.0.0", "chalk": "^5.3.0", "change-case": "^4.1.2", From 82a28bb1a666f6d4357f6d1d1418c763f40a175b Mon Sep 17 00:00:00 2001 From: Matteo Ferrando Date: Mon, 25 Nov 2024 19:52:43 -0400 Subject: [PATCH 8/8] refactor: remove console --- libs/client/src/storage.ts | 3 --- 1 file changed, 3 deletions(-) diff --git a/libs/client/src/storage.ts b/libs/client/src/storage.ts index 7ddc0c0..5bd86a7 100644 --- a/libs/client/src/storage.ts +++ b/libs/client/src/storage.ts @@ -121,7 +121,6 @@ async function partUploadRetries( return (await responseHandler(response)) as MultipartObject; } catch (error) { - console.error("Part upload failed, retrying", uploadUrl, error); return await partUploadRetries(uploadUrl, chunk, config, tries - 1); } } @@ -157,11 +156,9 @@ async function multipartUpload( responses.push(await partUploadRetries(partUploadUrl, chunk, config)); } } catch (error) { - console.error("Multipart upload failed, aborting upload", error); throw error; } - console.log("All parts uploaded, completing upload", responses); // Complete the upload const completeUrl = `${parsedUrl.origin}${parsedUrl.pathname}/complete${parsedUrl.search}`; const response = await fetch(completeUrl, {