Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add d1 tiles database #83

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 1 addition & 37 deletions .github/workflows/tiles-worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -159,44 +159,8 @@ jobs:
TF_VAR_tiles_build_dir: "${{ github.workspace }}/dist"
run: op run --env-file=".env" -- terragrunt run-all plan -no-color 2>&1 | tee "${{github.workspace}}/plan_output.txt" && exit ${PIPESTATUS[0]};

kv-warming:
needs: [build, test]
name: KV Warming
runs-on: mich
if: github.event_name == 'workflow_dispatch' || github.ref == 'refs/heads/main'
defaults:
run:
working-directory: ./tiles
steps:
- name: Checkout code
uses: actions/checkout@v4

- name: Setup Node
uses: actions/setup-node@v4
with:
node-version-file: './tiles/.nvmrc'

- name: Run npm install
run: npm ci

- name: Get tiles.json
run: echo "TILES_JSON=$(jq -c . < ${{ github.workspace }}/deployment/modules/cloudflare/tiles-worker/tiles.tfvars.json)" >> $GITHUB_ENV

- name: Run kv warming
env:
S3_ACCESS_KEY: ${{ secrets.CLOUDFLARE_TILES_R2_KV_TOKEN_ID }}
S3_SECRET_KEY: ${{ secrets.CLOUDFLARE_TILES_R2_KV_TOKEN_HASHED_VALUE }}
S3_ENDPOINT: https://${{ secrets.CLOUDFLARE_ACCOUNT_ID }}.r2.cloudflarestorage.com
KV_API_KEY: ${{ secrets.CLOUDFLARE_TILES_R2_KV_TOKEN_VALUE }}
CLOUDFLARE_ACCOUNT_ID: ${{ secrets.CLOUDFLARE_ACCOUNT_ID }}
# Figure out how to extract this from terraform at some point or get it into github vars
KV_NAMESPACE_ID: 5a4b82694e8b490db8b8904cdaea4f00
BUCKET_KEY: tiles-weur
DEPLOYMENT_KEY: ${{ fromJson(env.TILES_JSON).pmtiles_deployment_key }}
run: npm run kv:warm

deploy-terragrunt:
needs: [build, test, kv-warming]
needs: [build, test]
name: Deploy Terragrunt
runs-on: ubuntu-latest
if: github.event_name == 'workflow_dispatch' || github.ref == 'refs/heads/main'
Expand Down
5 changes: 5 additions & 0 deletions deployment/modules/cloudflare/tiles-worker/workers.tf
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ resource "cloudflare_workers_script" "tiles" {
namespace_id = data.terraform_remote_state.tiles_state.outputs.kv_namespace_id
}

# Expose the tiles-lookup D1 database (declared in the tiles module and read
# here via terraform_remote_state) to the worker as env.D1_TILE_LOOKUP.
d1_database_binding {
database_id = data.terraform_remote_state.tiles_state.outputs.tiles_lookup_d1_id
name = "D1_TILE_LOOKUP"
}

compatibility_date = "2024-07-29"
compatibility_flags = ["nodejs_compat"]
}
Expand Down
16 changes: 16 additions & 0 deletions deployment/modules/cloudflare/tiles/d1.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# D1 database used by the tiles worker for tile lookups; bound to the worker
# as D1_TILE_LOOKUP in the tiles-worker module.
resource "cloudflare_d1_database" "tiles_lookup" {
account_id = var.cloudflare_account_id
name = "tiles-lookup"
lifecycle {
# The database contents are seeded out-of-band; never let Terraform
# destroy it as part of a plan.
prevent_destroy = true
}
}

# Consumed by the tiles-worker module via terraform_remote_state.
output "tiles_lookup_d1_id" {
value = cloudflare_d1_database.tiles_lookup.id
}

# Adopt the pre-existing, manually created database into state instead of
# creating a new one (requires Terraform 1.5+ import blocks).
import {
to = cloudflare_d1_database.tiles_lookup
id = "${var.cloudflare_account_id}/d498d485-7709-410a-9e16-143c8ff4f016"
}
154 changes: 154 additions & 0 deletions tiles/src/d1-warmer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
import { S3Client } from '@aws-sdk/client-s3';
import { gunzipSync } from 'fflate';
import { appendFileSync, writeFileSync } from 'fs';
import pLimit from 'p-limit';
import { AsyncFn, IMetricsRepository, IStorageRepository, Operation } from './interface';
import { DirectoryString, PMTilesService } from './pmtiles/pmtiles.service';
import { Compression, Directory, Header } from './pmtiles/types';
import { deserializeIndex } from './pmtiles/utils';
import { CloudflareD1Repository, MemCacheRepository, S3StorageRepository } from './repository';

/**
 * No-op IMetricsRepository for CLI usage: monitoring wraps nothing and
 * pushed metrics are discarded.
 */
class FakeMetricsRepository implements IMetricsRepository {
  monitorAsyncFunction<T extends AsyncFn>(
    operation: Operation,
    call: T,
  ): (...args: Parameters<T>) => Promise<Awaited<ReturnType<T>>> {
    // No instrumentation in this context — hand the function back untouched.
    return call;
  }

  push(): void {
    // Intentionally empty: there is nowhere to push metrics from the CLI.
  }
}

/**
 * Decompresses a gzip-compressed buffer.
 *
 * @param buf - The compressed bytes.
 * @param compression - PMTiles compression method; only gzip is supported.
 * @returns The decompressed bytes as an ArrayBuffer.
 * @throws Error when the compression method is anything other than gzip.
 */
export function decompress(buf: ArrayBuffer, compression: Compression): ArrayBuffer {
  if (compression !== Compression.Gzip) {
    throw new Error('Compression method not supported');
  }
  const result = gunzipSync(new Uint8Array(buf));
  // gunzipSync returns a Uint8Array, but the declared return type is
  // ArrayBuffer (the original returned the view directly, violating its own
  // signature). Slice the exact byte range out of the backing buffer so the
  // result is a true ArrayBuffer even if the view had a non-zero offset.
  return result.buffer.slice(result.byteOffset, result.byteOffset + result.byteLength) as ArrayBuffer;
}

/**
 * Fetches and decodes one PMTiles directory from a byte range of the source.
 * Assumes the directory is compressed with header.internalCompression
 * (gzip-only per decompress above).
 *
 * @throws Error when the decoded directory contains no entries.
 */
const getDirectory = async (length: number, offset: number, source: IStorageRepository, header: Header) => {
  const rawBytes = await source.getRange({ offset, length });
  const decoded = decompress(rawBytes, header.internalCompression);
  const entries = deserializeIndex(await new Response(decoded).arrayBuffer());
  if (entries.length === 0) {
    throw new Error('Empty directory is invalid');
  }
  const [first] = entries;
  const directory: Directory = {
    offsetStart: first.offset,
    tileIdStart: first.tileId,
    entries,
  };
  return directory;
};

/**
 * One-shot CLI that generates SQL seed files for the D1 tile-lookup database.
 *
 * Reads the PMTiles header and root directory from R2 (via the S3 API), walks
 * every leaf directory concurrently, serializes 50-entry chunks of each leaf,
 * and appends INSERT statements into numbered `cache_entries.<n>.sql` files.
 * The CREATE TABLE statement is written separately to `cache_entries.sql`.
 *
 * Required env vars: S3_ACCESS_KEY, S3_SECRET_KEY, S3_ENDPOINT, KV_API_KEY,
 * CLOUDFLARE_ACCOUNT_ID, KV_NAMESPACE_ID, BUCKET_KEY, DEPLOYMENT_KEY.
 *
 * @throws Error when any required environment variable is missing.
 */
const handler = async () => {
  const {
    S3_ACCESS_KEY,
    S3_SECRET_KEY,
    S3_ENDPOINT,
    KV_API_KEY,
    CLOUDFLARE_ACCOUNT_ID,
    KV_NAMESPACE_ID,
    BUCKET_KEY,
    DEPLOYMENT_KEY,
  } = process.env;
  if (
    !S3_ACCESS_KEY ||
    !S3_SECRET_KEY ||
    !S3_ENDPOINT ||
    !KV_API_KEY ||
    !CLOUDFLARE_ACCOUNT_ID ||
    !KV_NAMESPACE_ID ||
    !BUCKET_KEY ||
    !DEPLOYMENT_KEY
  ) {
    throw new Error('Missing environment variables');
  }

  console.log('Starting S3');
  const client = new S3Client({
    region: 'auto',
    endpoint: S3_ENDPOINT,
    credentials: {
      accessKeyId: S3_ACCESS_KEY,
      secretAccessKey: S3_SECRET_KEY,
    },
  });

  const storageRepository = new S3StorageRepository(client, BUCKET_KEY, DEPLOYMENT_KEY);
  const memCacheRepository = new MemCacheRepository(new Map());
  const metricsRepository = new FakeMetricsRepository();
  // NOTE(review): a null D1 repository is passed because this script only
  // reads the header/root from the source — confirm PMTilesService.init never
  // touches the repository on that path.
  const pmTilesService = await PMTilesService.init(
    storageRepository,
    memCacheRepository,
    metricsRepository,
    null as unknown as CloudflareD1Repository,
  );

  const [header, root] = await pmTilesService.getHeaderAndRootFromSource();

  // Tuning knobs: leaf entries per INSERT row, root entries per output file,
  // and concurrent leaf-directory fetches.
  const CHUNK_SIZE = 50;
  const ENTRIES_PER_FILE = 50;
  const limit = pLimit(10);

  let countR2 = 0; // completed root entries (progress reporting only)
  let total = 0; // scheduled root entries
  let entryCount = 0; // leaf entries seen so far (progress reporting only)
  const promises: Promise<void>[] = [];

  const createTableStatement = `CREATE TABLE IF NOT EXISTS cache_entries (
startTileId INTEGER NOT NULL PRIMARY KEY,
entry TEXT NOT NULL
) STRICT;`;

  writeFileSync('cache_entries.sql', createTableStatement);

  for (const entry of root.entries) {
    // Snapshot the ordinal at scheduling time so file grouping is
    // deterministic. The original derived the file index from the live
    // completion counter (countR2) while 10 tasks ran concurrently, which
    // made the file assignment racy.
    const entryIndex = total;
    const call = async () => {
      const directory = await getDirectory(
        entry.length,
        entry.offset + header.leafDirectoryOffset,
        storageRepository,
        header,
      );

      // Count leaf entries once here. (The original also incremented
      // entryCount per chunk, double-counting progress.)
      entryCount += directory.entries.length;
      console.log('Entry Progress: ' + entryCount);

      const totalChunks = Math.ceil(directory.entries.length / CHUNK_SIZE);
      for (let chunkIndex = 0; chunkIndex < totalChunks; chunkIndex++) {
        const chunkEntries = directory.entries.slice(chunkIndex * CHUNK_SIZE, (chunkIndex + 1) * CHUNK_SIZE);
        const directoryChunk = {
          offsetStart: chunkEntries[0].offset,
          tileIdStart: chunkEntries[0].tileId,
          entries: chunkEntries,
        };

        const startTileId = chunkEntries[0].tileId;

        const stream = DirectoryString.fromDirectory(directoryChunk);
        // Double any single quotes so the generated SQL literal stays valid
        // (standard SQL escaping; the original emitted the raw string).
        const entryValue = (await stream.toString()).replace(/'/g, "''");

        const insertStatement = `\nINSERT INTO cache_entries (startTileId, entry) VALUES (${startTileId}, '${entryValue}');`;
        appendFileSync(`cache_entries.${Math.floor(entryIndex / ENTRIES_PER_FILE)}.sql`, insertStatement);
      }

      countR2++;
      console.log('R2 Progress: ' + countR2 + '/' + total);
    };
    promises.push(limit(call));
    total++;
  }

  await Promise.all(promises);
};

// Fail fast with a non-zero exit on any uncaught exception so CI surfaces it.
process.on('uncaughtException', (error) => {
  console.error('UNCAUGHT EXCEPTION');
  console.error('stack', error);
  process.exit(1);
});

// Kick off the run. Failures are logged and rethrown so the process reports
// a failure instead of exiting cleanly.
handler()
  .then(() => console.log('Done'))
  .catch((error) => {
    console.error('Error', error);
    throw error;
  });
6 changes: 3 additions & 3 deletions tiles/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import { preferredBuckets, R2BucketRegion } from './buckets';
import { IMetricsRepository } from './interface';
import { PMTilesService } from './pmtiles/pmtiles.service';
import {
CloudflareD1Repository,
CloudflareDeferredRepository,
CloudflareKVRepository,
CloudflareMetricsRepository,
HeaderMetricsProvider,
InfluxMetricsProvider,
Expand Down Expand Up @@ -156,7 +156,7 @@ async function handleRequest(
}

const memCacheRepository = new MemCacheRepository(globalThis.memCache);
const kvRepository = new CloudflareKVRepository(env.KV);
const d1Repository = new CloudflareD1Repository(env.D1_TILE_LOOKUP);
const bucketMap: Record<R2BucketRegion, R2Bucket> = {
apac: env.BUCKET_APAC,
eeur: env.BUCKET_EEUR,
Expand All @@ -176,8 +176,8 @@ async function handleRequest(
const pmTilesService = await metrics.monitorAsyncFunction({ name: 'pmtiles_init' }, PMTilesService.init)(
storageRepository,
memCacheRepository,
kvRepository,
metrics,
d1Repository,
);

const respHeaders = new Headers();
Expand Down
5 changes: 2 additions & 3 deletions tiles/src/interface.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { Metric } from './repository';

export interface IKeyValueRepository {
get(key: string): Promise<string | undefined>;
getAsStream(key: string): Promise<ReadableStream | undefined>;
/**
 * Minimal database abstraction; implemented by CloudflareD1Repository.
 * NOTE(review): `values` are presumably bound as query parameters — confirm
 * against the D1 implementation before relying on it for untrusted input.
 */
export interface IDatabaseRepository {
query(query: string, ...values: unknown[]): Promise<D1Result<Record<string, unknown>>>;
}

export interface IMemCacheRepository {
Expand Down
Loading
Loading