Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk): implement cloud.Bucket inflight method signedUrl for sim target #7137

Merged
merged 17 commits into from
Sep 17, 2024
8 changes: 8 additions & 0 deletions packages/@winglang/sdk/.projen/deps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions packages/@winglang/sdk/.projenrc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ const project = new cdk.JsiiProject({
"protobufjs@7.2.5",
// simulator dependencies
"express",
"busboy",
"uuid",
// using version 3 because starting from version 4, it no longer works with CommonJS.
"nanoid@^3.3.7",
Expand Down Expand Up @@ -114,6 +115,7 @@ const project = new cdk.JsiiProject({
"eslint-plugin-sort-exports",
"fs-extra",
"vitest",
"@types/busboy",
"@types/uuid",
"nanoid", // for ESM import test in target-sim/function.test.ts
"chalk",
Expand Down
3 changes: 3 additions & 0 deletions packages/@winglang/sdk/package.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

249 changes: 234 additions & 15 deletions packages/@winglang/sdk/src/target-sim/bucket.inflight.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import * as crypto from "crypto";
import * as fs from "fs";
import { Server } from "http";
import { AddressInfo, Socket } from "net";
import { dirname, join } from "path";
import * as url from "url";
import { pathToFileURL } from "url";
import busboy, { FileInfo } from "busboy";
import express from "express";
import mime from "mime-types";
import { BucketAttributes, BucketSchema } from "./schema-resources";
import { exists } from "./util";
Expand All @@ -16,6 +20,8 @@ import {
BucketGetOptions,
BucketTryGetOptions,
BUCKET_FQN,
BucketSignedUrlAction,
CorsHeaders,
} from "../cloud";
import { deserialize, serialize } from "../simulator/serialization";
import {
Expand All @@ -27,19 +33,128 @@ import { Datetime, Json, LogLevel, TraceType } from "../std";

export const METADATA_FILENAME = "metadata.json";

const LOCALHOST_ADDRESS = "127.0.0.1";

const STATE_FILENAME = "state.json";

/**
* Contents of the state file for this resource.
*/
interface StateFileContents {
/**
* The last port used by the API server on a previous simulator run.
*/
readonly lastPort?: number;
}

export class Bucket implements IBucketClient, ISimulatorResourceInstance {
private _fileDir!: string;
private _context: ISimulatorContext | undefined;
private readonly initialObjects: Record<string, string>;
private readonly _public: boolean;
private readonly topicHandlers: Partial<Record<BucketEventType, string>>;
private _metadata: Map<string, ObjectMetadata>;
private readonly app: express.Application;
private server: Server | undefined;
private url: string | undefined;
private port: number | undefined;

public constructor(props: BucketSchema) {
this.initialObjects = props.initialObjects ?? {};
this._public = props.public ?? false;
this.topicHandlers = props.topics;
this._metadata = new Map();

this.app = express();
// Enable cors for all requests.
this.app.use((req, res, next) => {
const corsHeaders: CorsHeaders = {
defaultResponse: {
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "GET, PUT",
"Access-Control-Allow-Headers": "*",
},
optionsResponse: {
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "GET, PUT",
"Access-Control-Allow-Headers": "*",
"Access-Control-Max-Age": "86400",
},
};
const method =
req.method && req.method.toUpperCase && req.method.toUpperCase();

if (method === "OPTIONS") {
for (const [key, value] of Object.entries(
corsHeaders.optionsResponse
)) {
res.setHeader(key, value);
}
res.status(204).send();
} else {
for (const [key, value] of Object.entries(
corsHeaders.defaultResponse
)) {
res.setHeader(key, value);
}
next();
}
});
this.app.put("*", (req, res) => {
const action = req.query.action;
Chriscbr marked this conversation as resolved.
Show resolved Hide resolved
if (action === BucketSignedUrlAction.DOWNLOAD) {
return res.status(403).send("Operation not allowed");
}

const validUntil = req.query.validUntil?.toString();
if (validUntil) {
const validUntilMs = parseInt(validUntil) * 1000;
if (Date.now() > validUntilMs) {
return res.status(403).send("Signed URL has expired");
}
}

const key = req.path;
const hash = this.hashKey(key);
const filename = join(this._fileDir, hash);

const actionType: BucketEventType = this._metadata.has(key)
? BucketEventType.UPDATE
: BucketEventType.CREATE;

let fileInfo: FileInfo | undefined;

const bb = busboy({ headers: req.headers });
bb.on("file", (_name, file, _info) => {
fileInfo = _info;
file.pipe(fs.createWriteStream(filename));
});
bb.on("close", () => {
void this.updateMetadataAndNotify(key, actionType, fileInfo?.mimeType);
res.writeHead(200, { Connection: "close" });
res.end();
});
req.pipe(bb);
return;
});
this.app.get("*", (req, res) => {
const action = req.query.action;
if (action === BucketSignedUrlAction.UPLOAD) {
return res.status(403).send("Operation not allowed");
}

const validUntil = req.query.validUntil?.toString();
if (validUntil) {
const validUntilMs = parseInt(validUntil) * 1000;
if (Date.now() > validUntilMs) {
return res.status(403).send("Signed URL has expired");
}
}

const hash = this.hashKey(req.path);
const filename = join(this._fileDir, hash);
return res.download(filename);
});
}

private get context(): ISimulatorContext {
Expand Down Expand Up @@ -86,22 +201,90 @@ export class Bucket implements IBucketClient, ISimulatorResourceInstance {
});
}

return {};
// Check for a previous state file to see if there was a port that was previously being used
// if so, try to use it out of convenience
let lastPort: number | undefined;
const state: StateFileContents = await this.loadState();
if (state.lastPort) {
const portAvailable = await isPortAvailable(state.lastPort);
if (portAvailable) {
lastPort = state.lastPort;
}
}

// `server.address()` returns `null` until the server is listening
// on a port. We use a promise to wait for the server to start
// listening before returning the URL.
const addrInfo: AddressInfo = await new Promise((resolve, reject) => {
this.server = this.app.listen(lastPort ?? 0, LOCALHOST_ADDRESS, () => {
const addr = this.server?.address();
if (addr && typeof addr === "object" && (addr as AddressInfo).port) {
resolve(addr);
} else {
reject(new Error("No address found"));
}
});
});
this.port = addrInfo.port;
this.url = `http://${addrInfo.address}:${addrInfo.port}`;

this.addTrace(`Server listening on ${this.url}`, LogLevel.VERBOSE);

return {
url: this.url,
};
}

public async cleanup(): Promise<void> {}
public async cleanup(): Promise<void> {
this.addTrace(`Closing server on ${this.url}`, LogLevel.VERBOSE);

return new Promise((resolve, reject) => {
this.server?.close((err) => {
if (err) {
return reject(err);
}

this.server?.closeAllConnections();
return resolve();
});
});
}

public async plan() {
return UpdatePlan.AUTO;
}

private async loadState(): Promise<StateFileContents> {
const stateFileExists = await exists(
join(this.context.statedir, STATE_FILENAME)
);
if (stateFileExists) {
const stateFileContents = await fs.promises.readFile(
join(this.context.statedir, STATE_FILENAME),
"utf-8"
);
return JSON.parse(stateFileContents);
} else {
return {};
}
}

private async saveState(state: StateFileContents): Promise<void> {
fs.writeFileSync(
join(this.context.statedir, STATE_FILENAME),
JSON.stringify(state)
);
}

public async save(): Promise<void> {
// no need to save individual files, since they are already persisted in the state dir
// during the bucket's lifecycle
fs.writeFileSync(
join(this.context.statedir, METADATA_FILENAME),
serialize(Array.from(this._metadata.entries())) // metadata contains Datetime values, so we need to serialize it
);

await this.saveState({ lastPort: this.port });
}

private async notifyListeners(
Expand Down Expand Up @@ -291,21 +474,25 @@ export class Bucket implements IBucketClient, ISimulatorResourceInstance {
);
}

return url.pathToFileURL(filePath).href;
return pathToFileURL(filePath).href;
},
});
}

public async signedUrl(key: string, options?: BucketSignedUrlOptions) {
options;
return this.context.withTrace({
message: `Signed URL (key=${key})`,
activity: async () => {
throw new Error(
`signedUrl is not implemented yet for the simulator (key=${key})`
);
},
});
const url = new URL(key, this.url);
if (options?.action) {
url.searchParams.set("action", options.action);
}
if (options?.duration) {
// BUG: The `options?.duration` is supposed to be an instance of `Duration` but it is not. It's just
// a POJO with seconds, but TypeScript thinks otherwise.
url.searchParams.set(
"validUntil",
String(Datetime.utcNow().ms + options.duration.seconds * 1000)
);
}
return url.toString();
}

/**
Expand Down Expand Up @@ -370,10 +557,19 @@ export class Bucket implements IBucketClient, ISimulatorResourceInstance {
await fs.promises.mkdir(dirName, { recursive: true });
await fs.promises.writeFile(filename, value);

await this.updateMetadataAndNotify(key, actionType, contentType);
}

private async updateMetadataAndNotify(
key: string,
actionType: BucketEventType,
contentType?: string
): Promise<void> {
const hash = this.hashKey(key);
const filename = join(this._fileDir, hash);
const filestat = await fs.promises.stat(filename);
const determinedContentType =
(contentType ?? mime.lookup(key)) || "application/octet-stream";

contentType ?? (mime.lookup(key) || "application/octet-stream");
this._metadata.set(key, {
size: filestat.size,
lastModified: Datetime.fromDate(filestat.mtime),
Expand All @@ -398,3 +594,26 @@ export class Bucket implements IBucketClient, ISimulatorResourceInstance {
});
}
}

async function isPortAvailable(port: number): Promise<boolean> {
return new Promise((resolve, _reject) => {
const s = new Socket();
s.once("error", (err) => {
s.destroy();
if ((err as any).code !== "ECONNREFUSED") {
resolve(false);
} else {
// connection refused means the port is not used
resolve(true);
}
});

s.once("connect", () => {
s.destroy();
// connection successful means the port is used
resolve(false);
});

s.connect(port, LOCALHOST_ADDRESS);
});
}
Chriscbr marked this conversation as resolved.
Show resolved Hide resolved
Loading
Loading