Skip to content

Commit

Permalink
Merge pull request #182 from upstash/DX-1289-base64decode
Browse files Browse the repository at this point in the history
Add decodeBase64 and use it in workflow
  • Loading branch information
CahidArda authored Sep 13, 2024
2 parents 6f572a2 + 9b63dae commit a65ab18
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 8 deletions.
27 changes: 27 additions & 0 deletions src/client/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,30 @@ export function nanoid() {
.map((x) => NANOID_CHARS[x % NANOID_CHARS.length])
.join("");
}

/**
* When the base64 string has unicode characters, atob doesn't decode
* them correctly since it only outputs ASCII characters. Therefore,
* instead of using atob, we properly decode them.
*
* If the decoding into unicode somehow fails, returns the result of atob
*
* https://developer.mozilla.org/en-US/docs/Glossary/Base64#the_unicode_problem
*
* @param base64 encoded string
*/
export function decodeBase64(base64: string) {
try {
const binString = atob(base64);
// @ts-expect-error m will always be defined
const intArray = Uint8Array.from(binString, (m) => m.codePointAt(0));
return new TextDecoder().decode(intArray);
} catch (error) {
// this error should never happen essentially. It's only a failsafe
console.warn(
`Upstash Qstash: Failed while decoding base64 "${base64}".` +
` Decoding with atob and returning it instead. Error: ${error}`
);
return atob(base64);
}
}
29 changes: 29 additions & 0 deletions src/client/workflow/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -531,4 +531,33 @@ describe.skip("live serve tests", () => {
timeout: 22_000,
}
);

test(
"unicode payload",
async () => {
const finishState = new FinishState();
const payload = "“unicode-quotes”";
await testEndpoint({
finalCount: 3,
waitFor: 5000,
initialPayload: payload,
finishState,
routeFunction: async (context) => {
const input = context.requestPayload;

expect(input).toBe(payload);

const result = await context.run("step1", () => {
return `result: ${input}`;
});

expect(result).toBe(`result: ${payload}`);
finishState.finish();
},
});
},
{
timeout: 7000,
}
);
});
10 changes: 5 additions & 5 deletions src/client/workflow/workflow-parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import type {
import type { WorkflowLogger } from "./logger";
import { WorkflowContext } from "./context";
import { recreateUserHeaders } from "./workflow-requests";
import { nanoid } from "../utils";
import { decodeBase64, nanoid } from "../utils";

/**
* Gets the request body. If that fails, returns undefined
Expand Down Expand Up @@ -51,7 +51,7 @@ const parsePayload = (rawPayload: string) => {
const [encodedInitialPayload, ...encodedSteps] = JSON.parse(rawPayload) as RawStep[];

// decode initial payload:
const rawInitialPayload = atob(encodedInitialPayload.body);
const rawInitialPayload = decodeBase64(encodedInitialPayload.body);
const initialStep: Step = {
stepId: 0,
stepName: "init",
Expand All @@ -65,7 +65,7 @@ const parsePayload = (rawPayload: string) => {

// decode & parse other steps:
const otherSteps = stepsToDecode.map((rawStep) => {
return JSON.parse(atob(rawStep.body)) as Step;
return JSON.parse(decodeBase64(rawStep.body)) as Step;
});

// join and deduplicate steps:
Expand Down Expand Up @@ -275,15 +275,15 @@ export const handleFailure = async <TInitialPayload>(
workflowRunId: string;
};

const decodedBody = body ? atob(body) : "{}";
const decodedBody = body ? decodeBase64(body) : "{}";
const errorPayload = JSON.parse(decodedBody) as FailureFunctionPayload;

// parse steps
const {
rawInitialPayload,
steps,
isLastDuplicate: _isLastDuplicate,
} = await parseRequest(atob(sourceBody), false, debug);
} = await parseRequest(decodeBase64(sourceBody), false, debug);

// create context
const workflowContext = new WorkflowContext<TInitialPayload>({
Expand Down
7 changes: 4 additions & 3 deletions src/client/workflow/workflow-requests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import type {
} from "./types";
import { StepTypes } from "./types";
import type { WorkflowLogger } from "./logger";
import { decodeBase64 } from "../utils";

export const triggerFirstInvocation = async <TInitialPayload>(
workflowContext: WorkflowContext<TInitialPayload>,
Expand Down Expand Up @@ -155,13 +156,13 @@ export const handleThirdPartyCallResult = async (
if (!(callbackMessage.status >= 200 && callbackMessage.status < 300)) {
await debug?.log("WARN", "SUBMIT_THIRD_PARTY_RESULT", {
status: callbackMessage.status,
body: atob(callbackMessage.body),
body: decodeBase64(callbackMessage.body),
});
// this callback will be retried by the QStash, we just ignore it
console.warn(
`Workflow Warning: "context.call" failed with status ${callbackMessage.status}` +
` and will retry (if there are retries remaining).` +
` Error Message:\n${atob(callbackMessage.body)}`
` Error Message:\n${decodeBase64(callbackMessage.body)}`
);
return ok("call-will-retry");
}
Expand Down Expand Up @@ -210,7 +211,7 @@ export const handleThirdPartyCallResult = async (
stepId: Number(stepIdString),
stepName,
stepType,
out: atob(callbackMessage.body),
out: decodeBase64(callbackMessage.body),
concurrent: Number(concurrentString),
};

Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export * from "./client/messages";
export * from "./client/schedules";
export * from "./client/url-groups";
export * from "./client/types";
export { decodeBase64 } from "./client/utils";

export * from "./client/llm/chat";
export * from "./client/llm/types";
Expand Down

0 comments on commit a65ab18

Please sign in to comment.