Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: automatically do multi-part upload on 90+ MB files #111

Merged
merged 8 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion libs/client/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@fal-ai/client",
"description": "The fal.ai client for JavaScript and TypeScript",
"version": "1.1.3",
"version": "1.2.0-alpha.5",
"license": "MIT",
"repository": {
"type": "git",
Expand Down Expand Up @@ -30,6 +30,7 @@
"types": "./src/index.d.ts",
"dependencies": {
"@msgpack/msgpack": "^3.0.0-beta2",
"async-sema": "^3.1.1",
chamini2 marked this conversation as resolved.
Show resolved Hide resolved
"eventsource-parser": "^1.1.2",
"robot3": "^0.4.1"
},
Expand Down
174 changes: 168 additions & 6 deletions libs/client/src/storage.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Sema } from "async-sema";
import { getRestApiUrl, RequiredConfig } from "./config";
import { dispatchRequest } from "./request";
import { isPlainObject } from "./utils";

/**
* File support for the client. This interface establishes the contract for
* uploading files to the server and transforming the input to replace file
Expand Down Expand Up @@ -53,20 +53,42 @@ function getExtensionFromContentType(contentType: string): string {
/**
* Initiate the upload of a file to the server. This returns the URL to upload
* the file to and the URL of the file once it is uploaded.
*
* @param file the file to upload
* @returns the URL to upload the file to and the URL of the file once it is uploaded.
*/
async function initiateUpload(
file: Blob,
config: RequiredConfig,
contentType: string,
): Promise<InitiateUploadResult> {
const filename =
file.name || `${Date.now()}.${getExtensionFromContentType(contentType)}`;

return await dispatchRequest<InitiateUploadData, InitiateUploadResult>({
method: "POST",
// NOTE: We want to test V3 without making it the default at the API level
targetUrl: `${getRestApiUrl()}/storage/upload/initiate?storage_type=fal-cdn-v3`,
input: {
content_type: contentType,
file_name: filename,
},
config,
});
}

/**
* Initiate the multipart upload of a file to the server. This returns the URL to upload
* the file to and the URL of the file once it is uploaded.
*/
async function initiateMultipartUpload(
file: Blob,
config: RequiredConfig,
contentType: string,
): Promise<InitiateUploadResult> {
const contentType = file.type || "application/octet-stream";
const filename =
file.name || `${Date.now()}.${getExtensionFromContentType(contentType)}`;

return await dispatchRequest<InitiateUploadData, InitiateUploadResult>({
method: "POST",
targetUrl: `${getRestApiUrl()}/storage/upload/initiate`,
targetUrl: `${getRestApiUrl()}/storage/upload/initiate-multipart?storage_type=fal-cdn-v3`,
input: {
content_type: contentType,
file_name: filename,
Expand All @@ -75,6 +97,138 @@ async function initiateUpload(
});
}

type MultipartObject = {
partNumber: number;
etag: string;
};

async function partUploadRetries(
uploadUrl: string,
chunk: Blob,
config: RequiredConfig,
cancelled: { current: boolean },
tries: number,
chamini2 marked this conversation as resolved.
Show resolved Hide resolved
): Promise<MultipartObject> {
if (cancelled.current) {
throw new Error("Part upload failed, upload cancelled");
}

if (tries === 0) {
throw new Error("Part upload failed, retries exhausted");
}

const { fetch, responseHandler } = config;

try {
const response = await fetch(uploadUrl, {
method: "PUT",
body: chunk,
});

return (await responseHandler(response)) as MultipartObject;
} catch (error) {
console.error("Part upload failed, retrying", uploadUrl, error);
chamini2 marked this conversation as resolved.
Show resolved Hide resolved
return await partUploadRetries(
uploadUrl,
chunk,
config,
cancelled,
tries - 1,
);
}
}

async function partUpload(
uploadUrl: string,
chunk: Blob,
config: RequiredConfig,
parallelUploads: Sema,
cancelled: { current: boolean },
) {
await parallelUploads.acquire();

try {
return await partUploadRetries(uploadUrl, chunk, config, cancelled, 3);
} finally {
parallelUploads.release();
}
}

async function multipartUpload(
file: Blob,
config: RequiredConfig,
): Promise<string> {
const { fetch, responseHandler } = config;
const contentType = file.type || "application/octet-stream";
const { upload_url: uploadUrl, file_url: url } =
await initiateMultipartUpload(file, config, contentType);

// Break the file into 10MB chunks
const chunkSize = 10 * 1024 * 1024;
const chunks = Math.ceil(file.size / chunkSize);

const parsedUrl = new URL(uploadUrl);

// Max 5 parallel uploads
const limitedParallelUploads = new Sema(3);

const partPromises: Promise<MultipartObject>[] = [];

// To be able to cancel the upload if any of the parts fail
const cancelled = { current: false };
for (let i = 0; i < chunks; i++) {
const start = i * chunkSize;
const end = Math.min(start + chunkSize, file.size);

const chunk = file.slice(start, end);

const partNumber = i + 1;
// {uploadUrl}/{part_number}?uploadUrlParams=...
const partUploadUrl = `${parsedUrl.origin}${parsedUrl.pathname}/${partNumber}${parsedUrl.search}`;

partPromises.push(
partUpload(
partUploadUrl,
chunk,
config,
limitedParallelUploads,
cancelled,
),
);
}

let responses: MultipartObject[];
try {
// Does this wait for all to finish even if an early one fails?
responses = await Promise.all(partPromises);
} catch (error) {
// If any of the parts fail, cancel other uploads
cancelled.current = true;

console.error("Multipart upload failed, aborting upload", error);
throw error;
chamini2 marked this conversation as resolved.
Show resolved Hide resolved
}

console.log("All parts uploaded, completing upload", responses);
chamini2 marked this conversation as resolved.
Show resolved Hide resolved
// Complete the upload
const completeUrl = `${parsedUrl.origin}${parsedUrl.pathname}/complete${parsedUrl.search}`;
const response = await fetch(completeUrl, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
parts: responses.map((mpart) => ({
partNumber: mpart.partNumber,
etag: mpart.etag,
})),
chamini2 marked this conversation as resolved.
Show resolved Hide resolved
}),
});
await responseHandler(response);

return url;
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
type KeyValuePair = [string, any];

Expand All @@ -87,10 +241,18 @@ export function createStorageClient({
}: StorageClientDependencies): StorageClient {
const ref: StorageClient = {
upload: async (file: Blob) => {
// Check for 90+ MB file size to do multipart upload
if (file.size > 90 * 1024 * 1024) {
return await multipartUpload(file, config);
}

const contentType = file.type || "application/octet-stream";

const { fetch, responseHandler } = config;
const { upload_url: uploadUrl, file_url: url } = await initiateUpload(
file,
config,
contentType,
);
const response = await fetch(uploadUrl, {
method: "PUT",
Expand Down
6 changes: 6 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
"@oclif/plugin-help": "^5.2.5",
"@remix-run/dev": "^2.11.1",
"@remix-run/node": "^2.11.1",
"async-sema": "^3.1.1",
"axios": "^1.0.0",
"chalk": "^5.3.0",
"change-case": "^4.1.2",
Expand Down