Skip to content

Commit

Permalink
feat: use d1 for tile lookups
Browse files Browse the repository at this point in the history
  • Loading branch information
zackpollard committed Feb 19, 2025
1 parent 568084b commit a3ed5ed
Show file tree
Hide file tree
Showing 7 changed files with 369 additions and 30 deletions.
5 changes: 5 additions & 0 deletions deployment/modules/cloudflare/tiles-worker/workers.tf
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ resource "cloudflare_workers_script" "tiles" {
namespace_id = data.terraform_remote_state.tiles_state.outputs.kv_namespace_id
}

d1_database_binding {
database_id = data.terraform_remote_state.tiles_state.outputs.tiles_lookup_d1_id
name = "D1_TILE_LOOKUP"
}

compatibility_date = "2024-07-29"
compatibility_flags = ["nodejs_compat"]
}
Expand Down
269 changes: 269 additions & 0 deletions tiles/src/d1-warmer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
import { S3Client } from '@aws-sdk/client-s3';
import fetch_retry from 'fetch-retry';
import { gunzipSync } from 'fflate';
import { appendFileSync, writeFileSync } from 'fs';
import pLimit from 'p-limit';
import { AsyncFn, IKeyValueRepository, IMetricsRepository, IStorageRepository, Operation } from './interface';
import { DirectoryStream, PMTilesService } from './pmtiles/pmtiles.service';
import { Compression, Directory, Header } from './pmtiles/types';
import { deserializeIndex } from './pmtiles/utils';
import { MemCacheRepository, S3StorageRepository } from './repository';

const fetch = fetch_retry(global.fetch);

class FakeKVRepository implements IKeyValueRepository {
constructor() {}

async get(): Promise<string | undefined> {
return undefined;
}

async getAsStream(): Promise<ReadableStream | undefined> {
return undefined;
}
}

class FakeMetricsRepository implements IMetricsRepository {
constructor() {}

monitorAsyncFunction<T extends AsyncFn>(
operation: Operation,
call: T,
): (...args: Parameters<T>) => Promise<Awaited<ReturnType<T>>> {
return call;
}
push(): void {}
}

export function decompress(buf: ArrayBuffer, compression: Compression): ArrayBuffer {
if (compression !== Compression.Gzip) {
throw new Error('Compression method not supported');
}
const result = gunzipSync(new Uint8Array(buf));
return result;
}

const getDirectory = async (length: number, offset: number, source: IStorageRepository, header: Header) => {
const resp = await source.getRange({ offset, length });
const data = decompress(resp, header.internalCompression);
const entries = deserializeIndex(await new Response(data).arrayBuffer());
if (entries.length === 0) {
throw new Error('Empty directory is invalid');
}
const directory: Directory = { offsetStart: entries[0].offset, tileIdStart: entries[0].tileId, entries };
return directory;
};

const handler = async () => {
const {
S3_ACCESS_KEY,
S3_SECRET_KEY,
S3_ENDPOINT,
KV_API_KEY,
CLOUDFLARE_ACCOUNT_ID,
KV_NAMESPACE_ID,
BUCKET_KEY,
DEPLOYMENT_KEY,
} = process.env;
if (
!S3_ACCESS_KEY ||
!S3_SECRET_KEY ||
!S3_ENDPOINT ||
!KV_API_KEY ||
!CLOUDFLARE_ACCOUNT_ID ||
!KV_NAMESPACE_ID ||
!BUCKET_KEY ||
!DEPLOYMENT_KEY
) {
throw new Error('Missing environment variables');
}

console.log('Starting S3');
const client = new S3Client({
region: 'auto',
endpoint: S3_ENDPOINT,
credentials: {
accessKeyId: S3_ACCESS_KEY,
secretAccessKey: S3_SECRET_KEY,
},
});

const storageRepository = new S3StorageRepository(client, BUCKET_KEY, DEPLOYMENT_KEY);
const memCacheRepository = new MemCacheRepository(new Map());
const kvRepository = new FakeKVRepository();
const metricsRepository = new FakeMetricsRepository();
const pmTilesService = await PMTilesService.init(
storageRepository,
memCacheRepository,
kvRepository,
metricsRepository,
null as unknown as D1Database,
);

console.log('Checking if already warmed');
const kvCheckKey = encodeURIComponent(`${DEPLOYMENT_KEY}|kv-warmed`);
console.log(kvCheckKey);
console.log(
'url',
`https://api.cloudflare.com/client/v4/accounts/${CLOUDFLARE_ACCOUNT_ID}/storage/kv/namespaces/${KV_NAMESPACE_ID}/values/${kvCheckKey}`,
);
// const kvCheckResponse = await fetch(
// `https://api.cloudflare.com/client/v4/accounts/${CLOUDFLARE_ACCOUNT_ID}/storage/kv/namespaces/${KV_NAMESPACE_ID}/values/${kvCheckKey}`,
// {
// method: 'GET',
// headers: {
// Authorization: `Bearer ${KV_API_KEY}`,
// 'Content-Type': 'application/json',
// },
// },
// );
// if (kvCheckResponse.status === 200) {
// console.log('Already warmed');
// return;
// }

// if (kvCheckResponse.status !== 404) {
// console.error('KV Check Failed');
// throw new Error('KV Check Failed, status code ' + kvCheckResponse.status);
// }
//
// console.log('Not warmed', kvCheckResponse.status);

const [header, root] = await pmTilesService.getHeaderAndRootFromSource();
let countR2 = 0;
let total = 0;
let countKV = 0;
const promises: Promise<void>[] = [];
const toPushToKV: { value: string; key: string }[] = [];
const kvPromises: Promise<void>[] = [];
const limit = pLimit(10);
const kvLimit = pLimit(30);

const bulkKVPush = async (toPushOverride?: object[]) => {
if (toPushToKV.length < 10 && !toPushOverride) {
return;
}
const toPush = toPushOverride ?? toPushToKV.splice(0, 10);
const kvResponse = await fetch(
`https://api.cloudflare.com/client/v4/accounts/${CLOUDFLARE_ACCOUNT_ID}/storage/kv/namespaces/${KV_NAMESPACE_ID}/bulk`,
{
retries: 5,
retryDelay: 2000,
retryOn: function (attempt, error, response) {
// retry on any network error, or 4xx or 5xx status codes
if (error !== null || (!!response && response.status >= 400)) {
console.log(`retrying, attempt number ${attempt + 1}`);
return true;
}
return false;
},
method: 'PUT',
headers: {
Authorization: `Bearer ${KV_API_KEY}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(toPush),
},
);
if (!kvResponse.ok) {
console.error('KV Put Failed', kvResponse);
throw new Error('KV Put Failed with non-200 status code');
}
const kvResponseBody = (await kvResponse.json()) as { success: boolean };
if (!kvResponseBody.success) {
console.error('KV Put Failed', kvResponseBody);
throw new Error('KV Put Failed');
}
if (toPushToKV.length !== 0) {
console.log('Remaining in KV queue', toPushToKV.length);
}

countKV += toPush.length;
console.log('KV Progress: ' + countKV + '/' + total);
};

let entryCount = 0;

const createTableStatement = `CREATE TABLE IF NOT EXISTS cache_entries (
startTileId INTEGER NOT NULL PRIMARY KEY,
entry TEXT NOT NULL
) STRICT;`;

writeFileSync('cache_entries.sql', createTableStatement);

for (const entry of root.entries) {
const call = async () => {
const directory = await getDirectory(
entry.length,
entry.offset + header.leafDirectoryOffset,
storageRepository,
header,
);

entryCount += directory.entries.length;

console.log('Entry Progress: ' + entryCount);

const totalChunks = Math.ceil(directory.entries.length / 50);
for (let chunkIndex = 0; chunkIndex < totalChunks; chunkIndex++) {
const chunkEntries = directory.entries.slice(chunkIndex * 50, (chunkIndex + 1) * 50);
const directoryChunk = {
offsetStart: chunkEntries[0].offset,
tileIdStart: chunkEntries[0].tileId,
entries: chunkEntries,
};

const startTileId = chunkEntries[0].tileId;

const stream = DirectoryStream.fromDirectory(directoryChunk);
const entryValue = await stream.toString();

const insertStatement = `\nINSERT INTO cache_entries (startTileId, endTileId, entry) VALUES (${startTileId}, '${entryValue}');`;
appendFileSync(`cache_entries.${Math.floor(countR2 / 50)}.sql`, insertStatement);
entryCount++;
}

countR2++;
console.log('R2 Progress: ' + countR2 + '/' + total);
};
promises.push(limit(call));
total++;
}

await Promise.all(promises);
while (toPushToKV.length > 0) {
const kvToPush = toPushToKV.splice(0, 10);
kvPromises.push(kvLimit(() => bulkKVPush(kvToPush)));
console.log('Remaining in KV queue', toPushToKV.length);
}
await Promise.all(kvPromises);
console.log('Done');
const kvResponse = await fetch(
`https://api.cloudflare.com/client/v4/accounts/${CLOUDFLARE_ACCOUNT_ID}/storage/kv/namespaces/${KV_NAMESPACE_ID}/values/${kvCheckKey}`,
{
method: 'PUT',
headers: {
Authorization: `Bearer ${KV_API_KEY}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({ value: 'warmed' }),
},
);
if (!kvResponse.ok) {
console.error('Write KV Success Failed');
throw new Error('Write KV Success Failed');
}
};

process.on('uncaughtException', (e) => {
console.error('UNCAUGHT EXCEPTION');
console.error('stack', e);
process.exit(1);
});

handler()
.then(() => console.log('Done'))
.catch((e) => {
console.error('Error', e);
throw e;
});
1 change: 1 addition & 0 deletions tiles/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ async function handleRequest(
memCacheRepository,
kvRepository,
metrics,
env.D1_TILE_LOOKUP,
);

const respHeaders = new Headers();
Expand Down
Loading

0 comments on commit a3ed5ed

Please sign in to comment.