Skip to content

Commit

Permalink
feat: automatically do multi-part upload on 90+ MB files
Browse files Browse the repository at this point in the history
  • Loading branch information
chamini2 committed Nov 25, 2024
1 parent 4bd76af commit 39239d2
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 5 deletions.
165 changes: 160 additions & 5 deletions libs/client/src/storage.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Sema } from "async-sema";
import { getRestApiUrl, RequiredConfig } from "./config";
import { dispatchRequest } from "./request";
import { isPlainObject } from "./utils";

/**
* File support for the client. This interface establishes the contract for
* uploading files to the server and transforming the input to replace file
Expand Down Expand Up @@ -53,17 +53,15 @@ function getExtensionFromContentType(contentType: string): string {
/**
* Initiate the upload of a file to the server. This returns the URL to upload
* the file to and the URL of the file once it is uploaded.
*
* @param file the file to upload
* @returns the URL to upload the file to and the URL of the file once it is uploaded.
*/
async function initiateUpload(
file: Blob,
config: RequiredConfig,
contentType: string,
): Promise<InitiateUploadResult> {
const contentType = file.type || "application/octet-stream";
const filename =
file.name || `${Date.now()}.${getExtensionFromContentType(contentType)}`;

return await dispatchRequest<InitiateUploadData, InitiateUploadResult>({
method: "POST",
// NOTE: We want to test V3 without making it the default at the API level
Expand All @@ -76,6 +74,155 @@ async function initiateUpload(
});
}

/**
* Initiate the multipart upload of a file to the server. This returns the URL to upload
* the file to and the URL of the file once it is uploaded.
*/
async function initiateMultipartUpload(
file: Blob,
config: RequiredConfig,
contentType: string,
): Promise<InitiateUploadResult> {
const filename =
file.name || `${Date.now()}.${getExtensionFromContentType(contentType)}`;

return await dispatchRequest<InitiateUploadData, InitiateUploadResult>({
method: "POST",
targetUrl: `${getRestApiUrl()}/storage/upload/initiate-multipart?storage_type=fal-cdn-v3`,
input: {
content_type: contentType,
file_name: filename,
},
config,
});
}

async function partUploadRetries(
uploadUrl: string,
chunk: Blob,
config: RequiredConfig,
cancelled: { current: boolean },
tries: number,
) {
if (cancelled.current) {
throw new Error("Part upload failed, upload cancelled");
}

if (tries === 0) {
throw new Error("Part upload failed, retries exhausted");
}

const { fetch, responseHandler } = config;

try {
const response = await fetch(uploadUrl, {
method: "PUT",
body: chunk,
});
await responseHandler(response);
return response;
} catch (error) {
console.error("Part upload failed, retrying", uploadUrl, error);
return await partUploadRetries(
uploadUrl,
chunk,
config,
cancelled,
tries - 1,
);
}
}

async function partUpload(
uploadUrl: string,
chunk: Blob,
config: RequiredConfig,
parallelUploads: Sema,
cancelled: { current: boolean },
) {
await parallelUploads.acquire();

try {
return await partUploadRetries(uploadUrl, chunk, config, cancelled, 3);
} finally {
parallelUploads.release();
}
}

async function multipartUpload(
file: Blob,
config: RequiredConfig,
): Promise<string> {
const { fetch, responseHandler } = config;
const contentType = file.type || "application/octet-stream";
const { upload_url: uploadUrl, file_url: url } =
await initiateMultipartUpload(file, config, contentType);

// Break the file into 10MB chunks
const chunkSize = 10 * 1024 * 1024;
const chunks = Math.ceil(file.size / chunkSize);

const parsedUrl = new URL(uploadUrl);

// Max 5 parallel uploads
const limitedParallelUploads = new Sema(3);

const partPromises: Promise<Response>[] = [];

// To be able to cancel the upload if any of the parts fail
const cancelled = { current: false };
for (let i = 0; i < chunks; i++) {
const start = i * chunkSize;
const end = Math.min(start + chunkSize, file.size);

const chunk = file.slice(start, end);

const partNumber = i + 1;
// {uploadUrl}/{part_number}?uploadUrlParams=...
const partUploadUrl = `${parsedUrl.origin}${parsedUrl.pathname}/${partNumber}${parsedUrl.search}`;

partPromises.push(
partUpload(
partUploadUrl,
chunk,
config,
limitedParallelUploads,
cancelled,
),
);
}

let responses: Response[];
try {
// Does this wait for all to finish even if an early one fails?
responses = await Promise.all(partPromises);
} catch (error) {
// If any of the parts fail, cancel other uploads
cancelled.current = true;

console.error("Multipart upload failed, aborting upload", error);
throw error;
}

// Complete the upload
const completeUrl = `${parsedUrl.origin}${parsedUrl.pathname}/complete${parsedUrl.search}`;
const response = await fetch(completeUrl, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
parts: responses.map((response, index) => ({
partNumber: index + 1, // Parts are 1-indexed
etag: response.headers.get("ETag"),
})),
}),
});
await responseHandler(response);

return url;
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
type KeyValuePair = [string, any];

Expand All @@ -88,10 +235,18 @@ export function createStorageClient({
}: StorageClientDependencies): StorageClient {
const ref: StorageClient = {
upload: async (file: Blob) => {
// Check for 90+ MB file size to do multipart upload
if (file.size > 90 * 1024 * 1024) {
return await multipartUpload(file, config);
}

const contentType = file.type || "application/octet-stream";

const { fetch, responseHandler } = config;
const { upload_url: uploadUrl, file_url: url } = await initiateUpload(
file,
config,
contentType,
);
const response = await fetch(uploadUrl, {
method: "PUT",
Expand Down
6 changes: 6 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
"@oclif/plugin-help": "^5.2.5",
"@remix-run/dev": "^2.11.1",
"@remix-run/node": "^2.11.1",
"async-sema": "^3.1.1",
"axios": "^1.0.0",
"chalk": "^5.3.0",
"change-case": "^4.1.2",
Expand Down

0 comments on commit 39239d2

Please sign in to comment.