-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
855a82b
commit 5912460
Showing
26 changed files
with
1,552 additions
and
25 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
auto-install-peers=true |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
# attach-write-to-read | ||
|
||
> Attach Pipeline is responsible for pulling CAR files written to an S3 bucket and streaming them into R2. | ||
# Table of Contents <!-- omit in toc --> | ||
|
||
- [High level attach pipeline architecture](#high-level-attach-pipeline-architecture) | ||
- [Contributing](#contributing) | ||
- [License](#license) | ||
|
||
## High level attach pipeline architecture | ||
|
||
![High level Architecture](./attach-pipeline.png) | ||
|
||
The attach pipeline is composed of: | ||
- a Gateway API running in Cloudflare Workers | ||
- an Attach Queue to trigger requests to pull CARs | ||
- a Car Puller handler running in Cloudflare Workers | ||
- an R2 bucket | ||
|
||
The Gateway endpoint receives requests with batches of CAR URLs (from S3) to be pulled into R2. | ||
|
||
The Car Puller handler pulls CAR files from S3 and writes them into R2 (mirroring them with the same key format `carCid/carCid.car`). Note that the Car Puller API is currently opinionated: it requires the `ETag` header provided on the AWS S3 response in order to guarantee data integrity via MD5 while writing to R2. | ||
|
||
## Usage | ||
|
||
The `attach-pipeline` entry point is the Gateway API. It exposes an authenticated endpoint that receives requests with batches of CAR URLs (from S3) and delegates the batch processing to a Queue. | ||
|
||
```console | ||
$ echo -e '{"bafy0":"https://cars.s3.amazonaws.com/bafy0/bafy0.car"}' | curl -X POST -H 'Authorization: Basic ACCESS_KEY=' --data-binary @- https://attach-gateway.web3.storage.web3.storage | ||
``` | ||
|
||
# Contributing | ||
|
||
Feel free to join in. All welcome. [Open an issue](https://github.com/web3-storage/attach-pipeline/issues)! | ||
|
||
# License | ||
|
||
Dual-licensed under [MIT + Apache 2.0](https://github.com/web3-storage/attach-pipeline/blob/main/LICENSE.md) |
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
// AVA test-runner configuration.
const avaConfig = {
  // Glob of the spec files to run.
  files: ['test/*.spec.js'],
  timeout: '5m',
  // Run one test file at a time.
  concurrency: 1,
  // Extra flags passed to the Node.js processes that run the tests.
  nodeArguments: ['--experimental-vm-modules']
}

export default avaConfig
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
{ | ||
"name": "attach-pipeline", | ||
"version": "1.0.0", | ||
"description": "Attach Pipeline is responsible for pulling CAR files written to an S3 bucket and streaming them into R2", | ||
"private": true, | ||
"type": "module", | ||
"main": "./dist/worker.js", | ||
"scripts": { | ||
"lint": "standard", | ||
"build": "tsc && node scripts/cli.js build", | ||
"dev": "miniflare dist/worker.js --watch --debug -m", | ||
"test": "npm run test:setup && npm run test:worker", | ||
"test:worker": "ava --verbose test/*.spec.js", | ||
"test:setup": "npm run build" | ||
}, | ||
"dependencies": { | ||
"@web3-storage/worker-utils": "^0.4.3-dev", | ||
"itty-router": "^2.6.6", | ||
"toucan-js": "^2.7.0" | ||
}, | ||
"devDependencies": { | ||
"@cloudflare/workers-types": "^3.16.0", | ||
"@sentry/cli": "^1.71.0", | ||
"@types/git-rev-sync": "^2.0.0", | ||
"@web-std/fetch": "^4.0.0", | ||
"ava": "^3.15.0", | ||
"esbuild": "^0.15.10", | ||
"git-rev-sync": "^3.0.1", | ||
"miniflare": "^2.9.0", | ||
"sade": "^1.8.1", | ||
"standard": "^17.0.0", | ||
"typescript": "4.8.4", | ||
"toml": "^3.0.0", | ||
"undici": "^5.11.0" | ||
}, | ||
"standard": { | ||
"ignore": [ | ||
"dist" | ||
] | ||
}, | ||
"author": "Vasco Santos <santos.vasco10@gmail.com>", | ||
"license": "Apache-2.0 OR MIT" | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
import fs from 'fs' | ||
import path from 'path' | ||
import { fileURLToPath } from 'url' | ||
import { build } from 'esbuild' | ||
import git from 'git-rev-sync' | ||
import Sentry from '@sentry/cli' | ||
|
||
// Directory containing this script, derived from the module URL.
const __dirname = path.dirname(fileURLToPath(import.meta.url))

// Parsed package.json from the directory one level above this script.
const pkgJsonPath = path.join(__dirname, '..', 'package.json')
const pkg = JSON.parse(fs.readFileSync(pkgJsonPath, 'utf8'))
|
||
/**
 * Bundle the worker with esbuild and, when the `SENTRY_UPLOAD` environment
 * variable is `'true'`, register the release with Sentry and upload the
 * generated source maps.
 *
 * @param {{ env: string; }} opts
 */
export async function buildCmd (opts) {
  const { env } = opts
  const projectRoot = path.join(__dirname, '..')
  const outDir = path.join(projectRoot, 'dist')
  const shortHash = git.short(__dirname)
  const sentryRelease = `attach-pipeline@${pkg.version}-${env}+${shortHash}`
  console.log(`Building ${sentryRelease}`)

  // Identifiers esbuild replaces with these literal values in the bundle.
  const define = {
    SENTRY_RELEASE: JSON.stringify(sentryRelease),
    VERSION: JSON.stringify(pkg.version),
    COMMITHASH: JSON.stringify(git.long(__dirname)),
    BRANCH: JSON.stringify(git.branch(__dirname)),
    global: 'globalThis'
  }

  await build({
    entryPoints: [path.join(projectRoot, 'src', 'index.js')],
    bundle: true,
    format: 'esm',
    outfile: path.join(outDir, 'worker.js'),
    legalComments: 'external',
    define,
    // Only the 'dev' environment gets an unminified bundle.
    minify: env !== 'dev',
    sourcemap: 'external'
  })

  // Sentry release and sourcemap upload are opt-in.
  if (process.env.SENTRY_UPLOAD !== 'true') {
    return
  }

  const sentry = new Sentry(undefined, {
    authToken: process.env.SENTRY_TOKEN,
    org: 'protocol-labs-it',
    project: 'attach-pipeline',
    dist: shortHash
  })

  await sentry.releases.new(sentryRelease)
  await sentry.releases.setCommits(sentryRelease, {
    auto: true,
    ignoreEmpty: true,
    ignoreMissing: true
  })
  await sentry.releases.uploadSourceMaps(sentryRelease, {
    include: [outDir],
    ext: ['map', 'js']
  })
  await sentry.releases.finalize(sentryRelease)
  await sentry.releases.newDeploy(sentryRelease, { env })
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
#!/usr/bin/env node | ||
|
||
import sade from 'sade' | ||
|
||
import { buildCmd } from './build.js' | ||
|
||
// Default for --env: the ENV environment variable, or 'dev' when it is unset or empty.
const defaultEnv = process.env.ENV || 'dev'
const cli = sade('attach-pipeline')

// `build` command: hands the parsed options to buildCmd.
cli
  .command('build')
  .describe('Build the worker.')
  .option('--env', 'Environment', defaultEnv)
  .action(buildCmd)

cli.parse(process.argv)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
import { | ||
NoTokenError, | ||
ExpectedBasicStringError, | ||
NoValidTokenError | ||
} from './errors.js' | ||
|
||
/**
 * Middleware: only invoke the wrapped handler when the token carried in the
 * request's `Authorization: Basic <token>` header equals `env.SECRET`.
 *
 * @param {import('itty-router').RouteHandler<Request>} handler
 * @returns {import('itty-router').RouteHandler<Request>}
 */
export function withAuthToken (handler) {
  /**
   * @param {Request} request
   * @param {import('./env').Env} env
   * @returns {Promise<Response>}
   * @throws {NoValidTokenError} when the provided token does not match the secret
   */
  return async (request, env, ctx) => {
    // Extracting the token throws on its own when the header is missing or malformed.
    const isAuthorized = getTokenFromRequest(request) === env.SECRET
    if (isAuthorized) {
      return handler(request, env, ctx)
    }
    throw new NoValidTokenError()
  }
}
|
||
/**
 * Read the token out of the request's `Authorization` header.
 *
 * @param {Request} request
 * @throws {NoTokenError} when the header is absent/empty or carries no token
 */
function getTokenFromRequest (request) {
  const authorization = request.headers.get('Authorization')
  if (authorization) {
    const token = parseAuthorizationHeader(authorization)
    if (token) {
      return token
    }
  }
  // Either no header was sent or the scheme was not followed by a token.
  throw new NoTokenError()
}
|
||
/**
 * Extract the credentials from a `Basic <credentials>` authorization header.
 *
 * The scheme name is matched case-insensitively. Whitespace around the
 * credentials is dropped, because the scheme may be followed by more than one
 * space and the extra spaces are not part of the token.
 *
 * @param {string} header
 * @returns {string} the credentials (empty string when nothing follows the scheme)
 * @throws {ExpectedBasicStringError} when the header does not use the Basic scheme
 */
function parseAuthorizationHeader (header) {
  const schemePrefix = 'basic '
  if (!header.toLowerCase().startsWith(schemePrefix)) {
    throw new ExpectedBasicStringError()
  }

  return header.slice(schemePrefix.length).trim()
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
import Toucan from 'toucan-js' | ||
import { Logging } from '@web3-storage/worker-utils/loki' | ||
|
||
export {} | ||
|
||
/**
 * Variables and bindings declared as worker input.
 */
export interface EnvInput {
  ENV: string
  DEBUG: string
  /** Value the auth middleware compares the `Authorization: Basic` token against. */
  SECRET: string
  /** NOTE(review): not referenced by the code visible here — confirm how it differs from SECRET. */
  ATTACH_PIPELINE_SECRET: string
  SENTRY_DSN?: string
  LOKI_URL?: string
  LOKI_TOKEN?: string
  /** Queue binding. */
  ATTACH_PIPELINE_QUEUE: Queue
  /** R2 bucket binding. */
  CARPARK: R2Bucket
}

/**
 * Additional fields carried alongside the input bindings.
 * NOTE(review): presumably populated at request time from the build-time
 * globals of the same names — confirm against the code that builds the env.
 */
export interface EnvTransformed {
  VERSION: string
  BRANCH: string
  COMMITHASH: string
  SENTRY_RELEASE: string
  /** Optional Toucan (Sentry) client. */
  sentry?: Toucan
  log: Logging
}

/** Full environment: the input bindings plus the transformed fields. */
export type Env = EnvInput & EnvTransformed
|
||
declare global {
  // Build-time constants: the build script substitutes these identifiers
  // through esbuild's `define` option.
  const SENTRY_RELEASE: string
  const VERSION: string
  const COMMITHASH: string
  const BRANCH: string
  // NOTE(review): ENV is not part of the build script's `define` list —
  // confirm where this global is provided.
  const ENV: string
}
|
||
// Temporary Queue types while not in https://github.com/cloudflare/workers-types/blob/master/index.d.ts
// These come from https://github.com/cloudflare/miniflare/blob/4c1bfdb8e4da7fa87ec69fcc28531b894b858693/packages/shared/src/queues.ts

/** Symbol keying the consumer getter on a {@link Queue}. */
export const kGetConsumer = Symbol('kGetConsumer');
/** Symbol keying the consumer setter on a {@link Queue}. */
export const kSetConsumer = Symbol('kSetConsumer');

/** Callback that receives a batch of messages. */
export type QueueEventDispatcher = (batch: MessageBatch) => Promise<void>;

/** Looks up (or creates) queues by name and attaches consumers to them. */
export interface QueueBroker {
  getOrCreateQueue(name: string): Queue;

  setConsumer(queue: Queue, consumer: Consumer): void;
}

/** Consumer registration for a queue: batching settings plus the dispatcher to call. */
export interface Consumer {
  queueName: string;
  maxBatchSize: number;
  maxWaitMs: number;
  dispatcher: QueueEventDispatcher;
}

// External types (exposed to user code):

/** Per-message send options; currently has no fields. */
export type MessageSendOptions = {
  // Reserved
};

/** One entry of a batch send: the message body together with its send options. */
export type MessageSendRequest<Body = unknown> = {
  body: Body;
} & MessageSendOptions;

/** Producer side of a queue, plus symbol-keyed accessors for its consumer. */
export interface Queue<Body = unknown> {
  send(message: Body, options?: MessageSendOptions): Promise<void>;
  sendBatch(batch: Iterable<MessageSendRequest<Body>>): Promise<void>;

  [kSetConsumer](consumer: Consumer): void;
  [kGetConsumer](): Consumer | null;
}

/** A single message. */
export interface Message<Body = unknown> {
  readonly id: string;
  readonly timestamp: Date;
  readonly body: Body;
  retry(): void;
}

/** A group of messages belonging to the queue named by `queue`. */
export interface MessageBatch<Body = unknown> {
  readonly queue: string;
  readonly messages: Array<Message<Body>>;
  retryAll(): void;
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
/* eslint-env serviceworker */ | ||
|
||
/**
 * Middleware: run the wrapped handler, then decorate its response with CORS headers.
 *
 * @param {import('itty-router').RouteHandler<Request>} handler
 * @returns {import('itty-router').RouteHandler<Request>}
 */
export function withCorsHeaders (handler) {
  /**
   * @param {Request} request
   * @returns {Promise<Response>}
   */
  return async (request, ...rest) =>
    addCorsHeaders(request, await handler(request, ...rest))
}
|
||
/**
 * Return a copy of `response` carrying CORS headers.
 *
 * When the request has an `Origin` header it is echoed back in
 * `Access-Control-Allow-Origin` and `Origin` is added to `Vary`; otherwise
 * every origin (`*`) is allowed. `Link` is always exposed.
 *
 * @param {Request} request
 * @param {Response} response
 * @returns {Response}
 */
export function addCorsHeaders (request, response) {
  // Clone the response so that it's no longer immutable (like if it comes from cache or fetch)
  response = new Response(response.body, response)
  const origin = request.headers.get('origin')
  if (origin) {
    response.headers.set('Access-Control-Allow-Origin', origin)
    // Add to Vary rather than overwrite it, so values the handler already
    // set (e.g. Accept-Encoding) keep taking part in cache keying.
    const vary = response.headers.get('Vary')
    const varyFields = vary
      ? vary.split(',').map((field) => field.trim().toLowerCase())
      : []
    if (!varyFields.includes('origin') && !varyFields.includes('*')) {
      response.headers.append('Vary', 'Origin')
    }
  } else {
    response.headers.set('Access-Control-Allow-Origin', '*')
  }
  response.headers.set('Access-Control-Expose-Headers', 'Link')
  return response
}
Oops, something went wrong.