Skip to content

Commit

Permalink
[BDGR-198] Do not delete source from Tus until processing job succeeds (
Browse files Browse the repository at this point in the history
#671)

* [BDGR-198] Do not delete source from Tus until processing job succeeds

* Add test-force-failure to ProcessMediaJob too

* Fix failure check in test
  • Loading branch information
markspolakovs authored Sep 22, 2024
1 parent 2840cdd commit 7c9767d
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 9 deletions.
1 change: 1 addition & 0 deletions jobrunner/src/jobs/LoadAssetJob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ export class LoadAssetJob extends MediaJobCommon {
},
},
});
await this._cleanupSourceFile(params);
} catch (e) {
await this.db.asset.update({
where: {
Expand Down
23 changes: 17 additions & 6 deletions jobrunner/src/jobs/MediaJobCommon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,6 @@ export abstract class MediaJobCommon extends AbstractJob {
filePath,
);
invariant(await dl.wait(), "download did not succeed");

await got.delete(process.env.TUS_ENDPOINT + "/" + params.source, {
headers: {
"Tus-Resumable": "1.0.0",
},
});
break;
}
//fallthrough
Expand Down Expand Up @@ -103,4 +97,21 @@ export abstract class MediaJobCommon extends AbstractJob {
await upload.done();
return s3Path;
}

protected async _cleanupSourceFile(params: PrismaJson.JobPayload) {
invariant("sourceType" in params, "sourceType is required");
switch (params.sourceType) {
case "Tus": {
await got.delete(process.env.TUS_ENDPOINT + "/" + params.source, {
headers: {
"Tus-Resumable": "1.0.0",
},
});
break;
}
case "S3":
// Nothing to do here, it will already be at the expected location
break;
}
}
}
49 changes: 47 additions & 2 deletions jobrunner/src/jobs/ProcessMediaJob.integration.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { MediaState } from "@badger/prisma/client";
import { JobState, MediaState } from "@badger/prisma/client";
import { it, expect } from "vitest";
import { doOneJob } from "../index.js";
import { integrate } from "@badger/testing";
Expand Down Expand Up @@ -28,7 +28,7 @@ async function uploadTestFileToTus() {
throw new Error("Tus rejected creation");
}

const uploadReq = await got.stream.patch(createRes.headers.location!, {
const uploadReq = got.stream.patch(createRes.headers.location!, {
body: sourceFile,
headers: {
"Tus-Resumable": "1.0.0",
Expand Down Expand Up @@ -104,4 +104,49 @@ integrate("ProcessMediaJob", () => {
);
expect(tusRes.statusCode).not.toBe(200);
});

it("handles failure", async () => {
const testMediaPath = await uploadTestFileToTus();
const media = await db.media.create({
data: {
name: "__FAIL__smpte_bars_15s.mp4",
durationSeconds: 0,
rawPath: "",
continuityItems: {
create: {
name: "Test",
durationSeconds: 0,
order: 1,
show: {
create: {
name: "Test",
start: new Date(),
},
},
},
},
},
});
const job = await db.baseJob.create({
data: {
jobType: "ProcessMediaJob",
jobPayload: {
mediaId: media.id,
sourceType: "Tus",
source: testMediaPath,
},
},
});
await doOneJob();
await expect(
db.baseJob.findFirst({ where: { id: job.id } }),
).resolves.toHaveProperty("state", JobState.Failed);
// Check the file is not deleted from Tus
const res = await got.head(process.env.TUS_ENDPOINT + "/" + testMediaPath, {
headers: {
"Tus-Resumable": "1.0.0",
},
});
expect(res.statusCode).toBe(200);
});
});
8 changes: 8 additions & 0 deletions jobrunner/src/jobs/ProcessMediaJob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ export default class ProcessMediaJob extends MediaJobCommon {
});

try {
// Test only: allow testing failure handling
if (media.name.includes("__FAIL__")) {
throw new Error(
"Failing job to test error handling (I sure do hope this is a test...)",
);
}

const rawTempPath = await this._wrapTask(
media,
"Downloading source file",
Expand Down Expand Up @@ -291,6 +298,7 @@ export default class ProcessMediaJob extends MediaJobCommon {
},
},
});
await this._cleanupSourceFile(params);
} catch (e) {
await this.db.media.update({
where: {
Expand Down
2 changes: 1 addition & 1 deletion server/next.config.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// @ts-check
/* eslint-disable @typescript-eslint/no-var-requires */
/* eslint-disable @typescript-eslint/no-require-imports */
const { PrismaPlugin } = require("@prisma/nextjs-monorepo-workaround-plugin");
const { withSentryConfig } = require("@sentry/nextjs");
const { execFileSync } = require("child_process");
Expand Down

0 comments on commit 7c9767d

Please sign in to comment.