From 753bdce41597200641daba60727ff1b53d2b512e Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Tue, 15 Aug 2023 04:37:47 -0700 Subject: [PATCH] feat(store-sync,store-indexer): consolidate sync logic, add syncToSqlite (#1240) --- .changeset/smooth-elephants-wave.md | 9 + .../client-vanilla/src/mud/setupNetwork.ts | 2 - e2e/packages/sync-test/setup/startIndexer.ts | 37 ++- .../src/ui/LoadingScreen/LoadingScreen.tsx | 3 +- packages/dev-tools/src/events/EventIcon.tsx | 21 +- .../src/events/StorageOperationsTable.tsx | 13 +- packages/store-indexer/bin/sqlite-indexer.ts | 18 +- .../store-indexer/src/sqlite/createIndexer.ts | 92 -------- packages/store-sync/src/SyncStep.ts | 6 + .../store-sync/src/blockLogsToStorage.test.ts | 2 + packages/store-sync/src/blockLogsToStorage.ts | 3 + packages/store-sync/src/common.ts | 54 ++++- packages/store-sync/src/createStoreSync.ts | 180 ++++++++++++++ packages/store-sync/src/index.ts | 1 + packages/store-sync/src/recs/common.ts | 7 - packages/store-sync/src/recs/recsStorage.ts | 18 +- .../store-sync/src/recs/syncStepToMessage.ts | 17 ++ packages/store-sync/src/recs/syncToRecs.ts | 221 +++--------------- packages/store-sync/src/sqlite/index.ts | 1 + .../store-sync/src/sqlite/sqliteStorage.ts | 13 +- .../store-sync/src/sqlite/syncToSqlite.ts | 56 +++++ .../store-sync/src/trpc-indexer/common.ts | 13 +- .../src/trpc-indexer/createAppRouter.ts | 2 +- .../src/ui/LoadingScreen/LoadingScreen.tsx | 3 +- 24 files changed, 439 insertions(+), 353 deletions(-) create mode 100644 .changeset/smooth-elephants-wave.md delete mode 100644 packages/store-indexer/src/sqlite/createIndexer.ts create mode 100644 packages/store-sync/src/SyncStep.ts create mode 100644 packages/store-sync/src/createStoreSync.ts create mode 100644 packages/store-sync/src/recs/syncStepToMessage.ts create mode 100644 packages/store-sync/src/sqlite/syncToSqlite.ts diff --git a/.changeset/smooth-elephants-wave.md b/.changeset/smooth-elephants-wave.md new file mode 100644 index 0000000000..63b368bfe4 --- /dev/null +++ b/.changeset/smooth-elephants-wave.md @@ -0,0 +1,9 @@ +--- +"@latticexyz/dev-tools": patch +"@latticexyz/store-indexer": minor +"@latticexyz/store-sync": minor +--- + +Store sync logic is now consolidated into a `createStoreSync` function exported from `@latticexyz/store-sync`. This simplifies each storage sync strategy to just a simple wrapper around the storage adapter. You can now sync to RECS with `syncToRecs` or SQLite with `syncToSqlite` and PostgreSQL support coming soon. + +There are no breaking changes if you were just using `syncToRecs` from `@latticexyz/store-sync` or running the `sqlite-indexer` binary from `@latticexyz/store-indexer`. diff --git a/e2e/packages/client-vanilla/src/mud/setupNetwork.ts b/e2e/packages/client-vanilla/src/mud/setupNetwork.ts index 4a615e6381..341c3e303d 100644 --- a/e2e/packages/client-vanilla/src/mud/setupNetwork.ts +++ b/e2e/packages/client-vanilla/src/mud/setupNetwork.ts @@ -18,8 +18,6 @@ export async function setupNetwork() { pollingInterval: 1000, } as const satisfies ClientConfig; - console.log("client options", clientOptions); - const publicClient = createPublicClient(clientOptions); const burnerAccount = createBurnerAccount(networkConfig.privateKey as Hex); diff --git a/e2e/packages/sync-test/setup/startIndexer.ts b/e2e/packages/sync-test/setup/startIndexer.ts index 38c7f1904b..d13b5a3e58 100644 --- a/e2e/packages/sync-test/setup/startIndexer.ts +++ b/e2e/packages/sync-test/setup/startIndexer.ts @@ -11,13 +11,17 @@ export function startIndexer( ) { let resolve: () => void; let reject: (reason?: string) => void; + const doneSyncing = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); console.log(chalk.magenta("[indexer]:"), "start syncing"); const proc = execa("pnpm", ["start"], { cwd: path.join(__dirname, "..", "..", "..", "..", "packages", "store-indexer"), env: { - DEBUG: "mud:store-indexer", + DEBUG: "mud:*", PORT: port.toString(), CHAIN_ID: "31337", RPC_HTTP_URL: rpcUrl, @@ -32,31 +36,23 @@ export function startIndexer( reject(errorMessage); }); - proc.stdout?.on("data", (data) => { - const dataString = data.toString(); - const errors = extractLineContaining("ERROR", dataString).join("\n"); + function onLog(data: string) { + const errors = extractLineContaining("ERROR", data).join("\n"); if (errors) { - console.log(chalk.magenta("[indexer error]:", errors)); - reject(errors); - } - console.log(chalk.magentaBright("[indexer]:", dataString)); - }); - - proc.stderr?.on("data", (data) => { - const dataString = data.toString(); - const modeErrors = extractLineContaining("ERROR", dataString).join("\n"); - if (modeErrors) { - const errorMessage = chalk.magenta("[indexer error]:", modeErrors); + const errorMessage = chalk.magenta("[indexer error]:", errors); console.log(errorMessage); reportError(errorMessage); - reject(modeErrors); + reject(errors); } if (data.toString().includes("all caught up")) { console.log(chalk.magenta("[indexer]:"), "done syncing"); resolve(); } - console.log(chalk.magentaBright("[indexer ingress]:", dataString)); - }); + console.log(chalk.magentaBright("[indexer]:", data)); + } + + proc.stdout?.on("data", (data) => onLog(data.toString())); + proc.stderr?.on("data", (data) => onLog(data.toString())); function cleanUp() { // attempt to clean up sqlite file @@ -75,10 +71,7 @@ export function startIndexer( return { url: `http://127.0.0.1:${port}`, - doneSyncing: new Promise((res, rej) => { - resolve = res; - reject = rej; - }), + doneSyncing, process: proc, kill: () => new Promise((resolve) => { diff --git a/examples/minimal/packages/client-phaser/src/ui/LoadingScreen/LoadingScreen.tsx b/examples/minimal/packages/client-phaser/src/ui/LoadingScreen/LoadingScreen.tsx index ca4cb4d058..3f42736970 100644 --- a/examples/minimal/packages/client-phaser/src/ui/LoadingScreen/LoadingScreen.tsx +++ b/examples/minimal/packages/client-phaser/src/ui/LoadingScreen/LoadingScreen.tsx @@ -4,7 +4,8 @@ import { LoadingBar } from "./LoadingBar"; import { BootScreen } from "./BootScreen"; import { useComponentValue } from "@latticexyz/react"; import { useMUD } from "../../store"; -import { SyncStep, singletonEntity } from "@latticexyz/store-sync/recs"; +import { singletonEntity } from "@latticexyz/store-sync/recs"; +import { SyncStep } from "@latticexyz/store-sync"; export const LoadingScreen = () => { const { diff --git a/packages/dev-tools/src/events/EventIcon.tsx b/packages/dev-tools/src/events/EventIcon.tsx index 6aba7d871d..8b6aa682e3 100644 --- a/packages/dev-tools/src/events/EventIcon.tsx +++ b/packages/dev-tools/src/events/EventIcon.tsx @@ -1,21 +1,22 @@ import { assertExhaustive } from "@latticexyz/common/utils"; -import { StoreEventsAbiItem } from "@latticexyz/store"; +import { StoreConfig } from "@latticexyz/store"; +import { StorageOperation } from "@latticexyz/store-sync"; type Props = { - eventName: StoreEventsAbiItem["name"]; + type: StorageOperation["type"]; }; -export function EventIcon({ eventName }: Props) { - switch (eventName) { - case "StoreSetRecord": +export function EventIcon({ type }: Props) { + switch (type) { + case "SetRecord": return =; - case "StoreSetField": + case "SetField": return +; - case "StoreDeleteRecord": + case "DeleteRecord": return -; - case "StoreEphemeralRecord": - return ~; + // case "EphemeralRecord": + // return ~; default: - return assertExhaustive(eventName, `Unexpected event name: ${eventName}`); + return assertExhaustive(type, `Unexpected storage operation type: ${type}`); } } diff --git a/packages/dev-tools/src/events/StorageOperationsTable.tsx b/packages/dev-tools/src/events/StorageOperationsTable.tsx index b305fbd779..bcf63d8876 100644 --- a/packages/dev-tools/src/events/StorageOperationsTable.tsx +++ b/packages/dev-tools/src/events/StorageOperationsTable.tsx @@ -23,16 +23,23 @@ export function StorageOperationsTable({ operations }: Props) { {operations.map((operation) => ( - + - {operation.log.blockNumber.toString()} + {operation.log?.blockNumber.toString()} {operation.namespace}:{operation.name} {serialize(operation.key)} - + {operation.type === "SetRecord" ? serialize(operation.value) : null} diff --git a/packages/store-indexer/bin/sqlite-indexer.ts b/packages/store-indexer/bin/sqlite-indexer.ts index 6ea684b819..5d34f2e2d8 100644 --- a/packages/store-indexer/bin/sqlite-indexer.ts +++ b/packages/store-indexer/bin/sqlite-indexer.ts @@ -7,13 +7,14 @@ import Database from "better-sqlite3"; import { createPublicClient, fallback, webSocket, http, Transport } from "viem"; import { createHTTPServer } from "@trpc/server/adapters/standalone"; import { createAppRouter } from "@latticexyz/store-sync/trpc-indexer"; -import { chainState, schemaVersion } from "@latticexyz/store-sync/sqlite"; -import { createIndexer } from "../src/sqlite/createIndexer"; +import { chainState, schemaVersion, syncToSqlite } from "@latticexyz/store-sync/sqlite"; import { createStorageAdapter } from "../src/sqlite/createStorageAdapter"; import type { Chain } from "viem/chains"; import * as mudChains from "@latticexyz/common/chains"; import * as chains from "viem/chains"; import { isNotNull } from "@latticexyz/common/utils"; +import { combineLatest, filter, first } from "rxjs"; +import { debug } from "../src/debug"; const possibleChains = Object.values({ ...mudChains, ...chains }) as Chain[]; @@ -89,13 +90,24 @@ try { // ignore errors, this is optional } -await createIndexer({ +const { latestBlockNumber$, blockStorageOperations$ } = await syncToSqlite({ database, publicClient, startBlock, maxBlockRange: env.MAX_BLOCK_RANGE, }); +combineLatest([latestBlockNumber$, blockStorageOperations$]) + .pipe( + filter( + ([latestBlockNumber, { blockNumber: lastBlockNumberProcessed }]) => latestBlockNumber === lastBlockNumberProcessed + ), + first() + ) + .subscribe(() => { + console.log("all caught up"); + }); + const server = createHTTPServer({ middleware: cors(), router: createAppRouter(), diff --git a/packages/store-indexer/src/sqlite/createIndexer.ts b/packages/store-indexer/src/sqlite/createIndexer.ts deleted file mode 100644 index 209dfef7c1..0000000000 --- a/packages/store-indexer/src/sqlite/createIndexer.ts +++ /dev/null @@ -1,92 +0,0 @@ -import { PublicClient } from "viem"; -import { - createBlockStream, - isNonPendingBlock, - blockRangeToLogs, - groupLogsByBlockNumber, -} from "@latticexyz/block-logs-stream"; -import { concatMap, filter, from, map, mergeMap, tap } from "rxjs"; -import { storeEventsAbi } from "@latticexyz/store"; -import { blockLogsToStorage } from "@latticexyz/store-sync"; -import { sqliteStorage } from "@latticexyz/store-sync/sqlite"; -import { BaseSQLiteDatabase } from "drizzle-orm/sqlite-core"; -import { debug } from "../debug"; - -type CreateIndexerOptions = { - /** - * [SQLite database object from Drizzle][0]. - * - * [0]: https://orm.drizzle.team/docs/installation-and-db-connection/sqlite/better-sqlite3 - */ - database: BaseSQLiteDatabase<"sync", any>; - /** - * [viem `PublicClient`][0] used for fetching logs from the RPC. - * - * [0]: https://viem.sh/docs/clients/public.html - */ - publicClient: PublicClient; - /** - * Optional block number to start indexing from. Useful for resuming the indexer from a particular point in time or starting after a particular contract deployment. - */ - startBlock?: bigint; - /** - * Optional maximum block range, if your RPC limits the amount of blocks fetched at a time. - */ - maxBlockRange?: bigint; -}; - -/** - * Creates an indexer to process and store blockchain events. - * - * @param {CreateIndexerOptions} options See `CreateIndexerOptions`. - * @returns A function to unsubscribe from the block stream, effectively stopping the indexer. - */ -export async function createIndexer({ - database, - publicClient, - startBlock = 0n, - maxBlockRange, -}: CreateIndexerOptions): Promise<() => void> { - const latestBlock$ = createBlockStream({ publicClient, blockTag: "latest" }); - - const latestBlockNumber$ = latestBlock$.pipe( - filter(isNonPendingBlock), - map((block) => block.number) - ); - - let latestBlockNumber: bigint | null = null; - const blockLogs$ = latestBlockNumber$.pipe( - tap((blockNumber) => { - latestBlockNumber = blockNumber; - debug("latest block number", blockNumber); - }), - map((blockNumber) => ({ startBlock, endBlock: blockNumber })), - blockRangeToLogs({ - publicClient, - events: storeEventsAbi, - maxBlockRange, - }), - tap(({ fromBlock, toBlock, logs }) => { - debug("found", logs.length, "logs for block", fromBlock, "-", toBlock); - }), - mergeMap(({ toBlock, logs }) => from(groupLogsByBlockNumber(logs, toBlock))) - ); - - let lastBlockNumberProcessed: bigint | null = null; - const sub = blockLogs$ - .pipe( - concatMap(blockLogsToStorage(await sqliteStorage({ database, publicClient }))), - tap(({ blockNumber, operations }) => { - lastBlockNumberProcessed = blockNumber; - debug("stored", operations.length, "operations for block", blockNumber); - if (latestBlockNumber === lastBlockNumberProcessed) { - debug("all caught up"); - } - }) - ) - .subscribe(); - - return () => { - sub.unsubscribe(); - }; -} diff --git a/packages/store-sync/src/SyncStep.ts b/packages/store-sync/src/SyncStep.ts new file mode 100644 index 0000000000..be0ce83c5b --- /dev/null +++ b/packages/store-sync/src/SyncStep.ts @@ -0,0 +1,6 @@ +export enum SyncStep { + INITIALIZE = "initialize", + SNAPSHOT = "snapshot", + RPC = "rpc", + LIVE = "live", +} diff --git a/packages/store-sync/src/blockLogsToStorage.test.ts b/packages/store-sync/src/blockLogsToStorage.test.ts index 9e93b24ffe..30a682321c 100644 --- a/packages/store-sync/src/blockLogsToStorage.test.ts +++ b/packages/store-sync/src/blockLogsToStorage.test.ts @@ -120,6 +120,7 @@ describe("blockLogsToStorage", () => { "blockNumber": 5448n, "operations": [ { + "address": "0x5FbDB2315678afecb367f032d93F642f64180aa3", "fieldName": "amount", "fieldValue": 8, "key": { @@ -173,6 +174,7 @@ describe("blockLogsToStorage", () => { "blockNumber": 5448n, "operations": [ { + "address": "0x5FbDB2315678afecb367f032d93F642f64180aa3", "fieldName": "amount", "fieldValue": 8, "key": { diff --git a/packages/store-sync/src/blockLogsToStorage.ts b/packages/store-sync/src/blockLogsToStorage.ts index 090dba77a3..652c92069b 100644 --- a/packages/store-sync/src/blockLogsToStorage.ts +++ b/packages/store-sync/src/blockLogsToStorage.ts @@ -202,6 +202,7 @@ export function blockLogsToStorage({ // they'll eventually be turned into "events", but unclear if that should translate to client storage operations return { log, + address: getAddress(log.address), type: "SetRecord", ...tableId, key, @@ -217,6 +218,7 @@ export function blockLogsToStorage({ >[typeof fieldName]; return { log, + address: getAddress(log.address), type: "SetField", ...tableId, key, @@ -228,6 +230,7 @@ export function blockLogsToStorage({ if (log.eventName === "StoreDeleteRecord") { return { log, + address: getAddress(log.address), type: "DeleteRecord", ...tableId, key, diff --git a/packages/store-sync/src/common.ts b/packages/store-sync/src/common.ts index 41f00420d7..b0d5b3ba63 100644 --- a/packages/store-sync/src/common.ts +++ b/packages/store-sync/src/common.ts @@ -1,4 +1,4 @@ -import { Address, Hex } from "viem"; +import { Address, Block, Hex, PublicClient, TransactionReceipt } from "viem"; import { GetLogsResult, GroupLogsByBlockNumberResult, NonPendingLog } from "@latticexyz/block-logs-stream"; import { StoreEventsAbi, @@ -7,7 +7,10 @@ import { ValueSchema, ConfigToKeyPrimitives as Key, ConfigToValuePrimitives as Value, + TableRecord, } from "@latticexyz/store"; +import { Observable } from "rxjs"; +import { BlockStorageOperations } from "./blockLogsToStorage"; export type ChainId = number; export type WorldId = `${ChainId}:${Address}`; @@ -24,11 +27,14 @@ export type Table = { valueSchema: ValueSchema; }; +export type TableWithRecords = Table & { records: TableRecord[] }; + export type StoreEventsLog = GetLogsResult[number]; export type BlockLogs = GroupLogsByBlockNumberResult[number]; export type BaseStorageOperation = { - log: NonPendingLog; + log?: NonPendingLog; + address: Hex; namespace: TableNamespace; name: TableName; }; @@ -70,3 +76,47 @@ export type StorageOperation = | SetFieldOperation | SetRecordOperation | DeleteRecordOperation; + +export type SyncOptions = { + /** + * MUD config + */ + config?: TConfig; + /** + * [viem `PublicClient`][0] used for fetching logs from the RPC. + * + * [0]: https://viem.sh/docs/clients/public.html + */ + publicClient: PublicClient; + /** + * MUD Store/World contract address + */ + address?: Address; + /** + * Optional block number to start indexing from. Useful for resuming the indexer from a particular point in time or starting after a particular contract deployment. + */ + startBlock?: bigint; + /** + * Optional maximum block range, if your RPC limits the amount of blocks fetched at a time. + */ + maxBlockRange?: bigint; + /** + * Optional MUD tRPC indexer URL to fetch initial state from. + */ + indexerUrl?: string; + /** + * Optional initial state to hydrate from. Useful if you're hydrating from your own indexer or cache. + */ + initialState?: { + blockNumber: bigint | null; + tables: TableWithRecords[]; + }; +}; + +export type SyncResult = { + latestBlock$: Observable; + latestBlockNumber$: Observable; + blockLogs$: Observable; + blockStorageOperations$: Observable>; + waitForTransaction: (tx: Hex) => Promise<{ receipt: TransactionReceipt }>; +}; diff --git a/packages/store-sync/src/createStoreSync.ts b/packages/store-sync/src/createStoreSync.ts new file mode 100644 index 0000000000..0e99d49685 --- /dev/null +++ b/packages/store-sync/src/createStoreSync.ts @@ -0,0 +1,180 @@ +import { ConfigToKeyPrimitives, ConfigToValuePrimitives, StoreConfig, storeEventsAbi } from "@latticexyz/store"; +import { Hex, TransactionReceipt } from "viem"; +import { SetRecordOperation, SyncOptions, SyncResult } from "./common"; +import { + createBlockStream, + isNonPendingBlock, + blockRangeToLogs, + groupLogsByBlockNumber, +} from "@latticexyz/block-logs-stream"; +import { filter, map, tap, mergeMap, from, concatMap, share, firstValueFrom } from "rxjs"; +import { blockLogsToStorage } from "./blockLogsToStorage"; +import { debug as parentDebug } from "./debug"; +import { createIndexerClient } from "./trpc-indexer"; +import { BlockLogsToStorageOptions } from "./blockLogsToStorage"; +import { SyncStep } from "./SyncStep"; + +const debug = parentDebug.extend("createStoreSync"); + +type CreateStoreSyncOptions = SyncOptions & { + storageAdapter: BlockLogsToStorageOptions; + onProgress?: (opts: { + step: SyncStep; + percentage: number; + latestBlockNumber: bigint; + lastBlockNumberProcessed: bigint; + }) => void; +}; + +type CreateStoreSyncResult = SyncResult; + +export async function createStoreSync({ + storageAdapter, + onProgress, + address, + publicClient, + startBlock = 0n, + maxBlockRange, + initialState, + indexerUrl, +}: CreateStoreSyncOptions): Promise> { + if (indexerUrl != null && initialState == null) { + try { + const indexer = createIndexerClient({ url: indexerUrl }); + const chainId = publicClient.chain?.id ?? (await publicClient.getChainId()); + initialState = await indexer.findAll.query({ chainId, address }); + } catch (error) { + debug("couldn't get initial state from indexer", error); + } + } + + if (initialState != null) { + const { blockNumber, tables } = initialState; + if (blockNumber != null) { + debug("hydrating from initial state to block", initialState.blockNumber); + startBlock = blockNumber + 1n; + + await storageAdapter.registerTables({ blockNumber, tables }); + + const numRecords = initialState.tables.reduce((sum, table) => sum + table.records.length, 0); + const recordsPerProgressUpdate = Math.floor(numRecords / 100); + let recordsProcessed = 0; + let recordsProcessedSinceLastUpdate = 0; + + for (const table of initialState.tables) { + await storageAdapter.storeOperations({ + blockNumber, + operations: table.records.map( + (record) => + ({ + type: "SetRecord", + address: table.address, + namespace: table.namespace, + name: table.name, + key: record.key as ConfigToKeyPrimitives, + value: record.value as ConfigToValuePrimitives, + } as const satisfies SetRecordOperation) + ), + }); + + recordsProcessed += table.records.length; + recordsProcessedSinceLastUpdate += table.records.length; + + if (recordsProcessedSinceLastUpdate > recordsPerProgressUpdate) { + recordsProcessedSinceLastUpdate = 0; + onProgress?.({ + step: SyncStep.SNAPSHOT, + percentage: (recordsProcessed / numRecords) * 100, + latestBlockNumber: 0n, + lastBlockNumberProcessed: blockNumber, + }); + } + + debug(`hydrated ${table.records.length} records for table ${table.namespace}:${table.name}`); + } + } + } + + // TODO: if startBlock is still 0, find via deploy event + + debug("starting sync from block", startBlock); + + const latestBlock$ = createBlockStream({ publicClient, blockTag: "latest" }).pipe(share()); + + const latestBlockNumber$ = latestBlock$.pipe( + filter(isNonPendingBlock), + map((block) => block.number), + share() + ); + + let latestBlockNumber: bigint | null = null; + const blockLogs$ = latestBlockNumber$.pipe( + tap((blockNumber) => { + debug("latest block number", blockNumber); + latestBlockNumber = blockNumber; + }), + map((blockNumber) => ({ startBlock, endBlock: blockNumber })), + blockRangeToLogs({ + publicClient, + address, + events: storeEventsAbi, + maxBlockRange, + }), + mergeMap(({ toBlock, logs }) => from(groupLogsByBlockNumber(logs, toBlock))), + share() + ); + + let lastBlockNumberProcessed: bigint | null = null; + const blockStorageOperations$ = blockLogs$.pipe( + concatMap(blockLogsToStorage(storageAdapter)), + tap(({ blockNumber, operations }) => { + debug("stored", operations.length, "operations for block", blockNumber); + lastBlockNumberProcessed = blockNumber; + + if (latestBlockNumber != null) { + if (blockNumber < latestBlockNumber) { + onProgress?.({ + step: SyncStep.RPC, + percentage: Number((lastBlockNumberProcessed * 1000n) / (latestBlockNumber * 1000n)) / 100, + latestBlockNumber, + lastBlockNumberProcessed, + }); + } else { + onProgress?.({ + step: SyncStep.LIVE, + percentage: 100, + latestBlockNumber, + lastBlockNumberProcessed, + }); + } + } + }), + share() + ); + + async function waitForTransaction(tx: Hex): Promise<{ + receipt: TransactionReceipt; + }> { + // Wait for tx to be mined + const receipt = await publicClient.waitForTransactionReceipt({ hash: tx }); + + // If we haven't processed a block yet or we haven't processed the block for the tx, wait for it + if (lastBlockNumberProcessed == null || lastBlockNumberProcessed < receipt.blockNumber) { + await firstValueFrom( + blockStorageOperations$.pipe( + filter(({ blockNumber }) => blockNumber != null && blockNumber >= receipt.blockNumber) + ) + ); + } + + return { receipt }; + } + + return { + latestBlock$, + latestBlockNumber$, + blockLogs$, + blockStorageOperations$, + waitForTransaction, + }; +} diff --git a/packages/store-sync/src/index.ts b/packages/store-sync/src/index.ts index a94b166d1b..a56300146c 100644 --- a/packages/store-sync/src/index.ts +++ b/packages/store-sync/src/index.ts @@ -1,2 +1,3 @@ export * from "./blockLogsToStorage"; export * from "./common"; +export * from "./SyncStep"; diff --git a/packages/store-sync/src/recs/common.ts b/packages/store-sync/src/recs/common.ts index 6e28f72930..132dcc8bc1 100644 --- a/packages/store-sync/src/recs/common.ts +++ b/packages/store-sync/src/recs/common.ts @@ -10,13 +10,6 @@ export type StoreComponentMetadata = { valueSchema: ValueSchema; }; -export enum SyncStep { - INITIALIZE = "initialize", - SNAPSHOT = "snapshot", - RPC = "rpc", - LIVE = "live", -} - export type ConfigToRecsComponents = { [tableName in keyof TConfig["tables"] & string]: RecsComponent< { diff --git a/packages/store-sync/src/recs/recsStorage.ts b/packages/store-sync/src/recs/recsStorage.ts index 42fefdaaa6..d0db64f0c2 100644 --- a/packages/store-sync/src/recs/recsStorage.ts +++ b/packages/store-sync/src/recs/recsStorage.ts @@ -12,14 +12,12 @@ import { updateComponent, } from "@latticexyz/recs"; import { isDefined } from "@latticexyz/common/utils"; -import { TableId } from "@latticexyz/common/deprecated"; import { schemaToDefaults } from "../schemaToDefaults"; -import { hexKeyTupleToEntity } from "./hexKeyTupleToEntity"; import { defineInternalComponents } from "./defineInternalComponents"; import { getTableKey } from "./getTableKey"; import { StoreComponentMetadata } from "./common"; - -// TODO: should we create components here from config rather than passing them in? +import { tableIdToHex } from "@latticexyz/common"; +import { encodeEntity } from "./encodeEntity"; export function recsStorage({ components, @@ -52,26 +50,24 @@ export function recsStorage({ const table = getComponentValue( components.TableMetadata, getTableKey({ - address: operation.log.address, + address: operation.address, namespace: operation.namespace, name: operation.name, }) as Entity )?.table; if (!table) { - debug( - `skipping update for unknown table: ${operation.namespace}:${operation.name} at ${operation.log.address}` - ); + debug(`skipping update for unknown table: ${operation.namespace}:${operation.name} at ${operation.address}`); continue; } - const tableId = new TableId(operation.namespace, operation.name).toString(); - const component = componentsByTableId[operation.log.args.table]; + const tableId = tableIdToHex(operation.namespace, operation.name); + const component = componentsByTableId[tableId]; if (!component) { debug(`skipping update for unknown component: ${tableId}. Available components: ${Object.keys(components)}`); continue; } - const entity = hexKeyTupleToEntity(operation.log.args.key); + const entity = encodeEntity(table.keySchema, operation.key); if (operation.type === "SetRecord") { debug("setting component", tableId, entity, operation.value); diff --git a/packages/store-sync/src/recs/syncStepToMessage.ts b/packages/store-sync/src/recs/syncStepToMessage.ts new file mode 100644 index 0000000000..8f702c3102 --- /dev/null +++ b/packages/store-sync/src/recs/syncStepToMessage.ts @@ -0,0 +1,17 @@ +import { SyncStep } from "../SyncStep"; +import { assertExhaustive } from "@latticexyz/common/utils"; + +export function syncStepToMessage(step: SyncStep): string { + switch (step) { + case SyncStep.INITIALIZE: + return "Connecting"; + case SyncStep.SNAPSHOT: + return "Hydrating from snapshot"; + case SyncStep.RPC: + return "Hydrating from RPC"; + case SyncStep.LIVE: + return "All caught up!"; + default: + assertExhaustive(step, `Unexpected sync step: ${step}`); + } +} diff --git a/packages/store-sync/src/recs/syncToRecs.ts b/packages/store-sync/src/recs/syncToRecs.ts index 2e23bfee58..858ebff912 100644 --- a/packages/store-sync/src/recs/syncToRecs.ts +++ b/packages/store-sync/src/recs/syncToRecs.ts @@ -1,54 +1,27 @@ -import { StoreConfig, storeEventsAbi } from "@latticexyz/store"; -import { Address, Block, Hex, PublicClient, TransactionReceipt } from "viem"; -import { ComponentValue, Entity, World as RecsWorld, getComponentValue, setComponent } from "@latticexyz/recs"; -import { BlockLogs, Table } from "../common"; -import { TableRecord } from "@latticexyz/store"; -import { - createBlockStream, - isNonPendingBlock, - blockRangeToLogs, - groupLogsByBlockNumber, -} from "@latticexyz/block-logs-stream"; -import { filter, map, tap, mergeMap, from, concatMap, Observable, share, firstValueFrom } from "rxjs"; -import { BlockStorageOperations, blockLogsToStorage } from "../blockLogsToStorage"; +import { StoreConfig } from "@latticexyz/store"; +import { World as RecsWorld, setComponent } from "@latticexyz/recs"; +import { SyncOptions, SyncResult } from "../common"; import { recsStorage } from "./recsStorage"; -import { debug } from "./debug"; import { defineInternalComponents } from "./defineInternalComponents"; -import { getTableKey } from "./getTableKey"; -import { ConfigToRecsComponents, SyncStep } from "./common"; -import { encodeEntity } from "./encodeEntity"; -import { createIndexerClient } from "../trpc-indexer"; -import { singletonEntity } from "./singletonEntity"; +import { createStoreSync } from "../createStoreSync"; +import { ConfigToRecsComponents } from "./common"; import storeConfig from "@latticexyz/store/mud.config"; import worldConfig from "@latticexyz/world/mud.config"; import { configToRecsComponents } from "./configToRecsComponents"; +import { singletonEntity } from "./singletonEntity"; +import { syncStepToMessage } from "./syncStepToMessage"; -type SyncToRecsOptions = { +type SyncToRecsOptions = SyncOptions & { world: RecsWorld; config: TConfig; - address: Address; - // TODO: make this optional and return one if none provided (but will need chain ID at least) - publicClient: PublicClient; - startBlock?: bigint; - indexerUrl?: string; - initialState?: { - blockNumber: bigint | null; - tables: (Table & { records: TableRecord[] })[]; - }; }; -type SyncToRecsResult = { +type SyncToRecsResult = SyncResult & { // TODO: return publicClient? components: ConfigToRecsComponents & ConfigToRecsComponents & ConfigToRecsComponents & ReturnType; - latestBlock$: Observable; - latestBlockNumber$: Observable; - blockLogs$: Observable; - blockStorageOperations$: Observable>; - waitForTransaction: (tx: Hex) => Promise<{ receipt: TransactionReceipt }>; - destroy: () => void; }; export async function syncToRecs({ @@ -56,7 +29,8 @@ export async function syncToRecs({ config, address, publicClient, - startBlock = 0n, + startBlock, + maxBlockRange, initialState, indexerUrl, }: SyncToRecsOptions): Promise> { @@ -69,160 +43,33 @@ export async function syncToRecs({ world.registerEntity({ id: singletonEntity }); - if (indexerUrl != null && initialState == null) { - try { - const indexer = createIndexerClient({ url: indexerUrl }); - const chainId = publicClient.chain?.id ?? (await publicClient.getChainId()); - initialState = await indexer.findAll.query({ chainId, address }); - } catch (error) { - debug("couldn't get initial state from indexer", error); - } - } - - if (initialState != null && initialState.blockNumber != null) { - debug("hydrating from initial state to block", initialState.blockNumber); - startBlock = initialState.blockNumber + 1n; - - setComponent(components.SyncProgress, singletonEntity, { - step: SyncStep.SNAPSHOT, - message: `Hydrating from snapshot to block ${initialState.blockNumber}`, - percentage: 0, - latestBlockNumber: 0n, - lastBlockNumberProcessed: initialState.blockNumber, - }); - - const componentList = Object.values(components); - - const numRecords = initialState.tables.reduce((sum, table) => sum + table.records.length, 0); - const recordsPerSyncProgressUpdate = Math.floor(numRecords / 100); - let recordsProcessed = 0; - - for (const table of initialState.tables) { - setComponent(components.TableMetadata, getTableKey(table) as Entity, { table }); - const component = componentList.find((component) => component.id === table.tableId); - if (component == null) { - debug(`no component found for table ${table.namespace}:${table.name}, skipping initial state`); - continue; - } - for (const record of table.records) { - const entity = encodeEntity(table.keySchema, record.key); - setComponent(component, entity, record.value as ComponentValue); - - recordsProcessed++; - if (recordsProcessed % recordsPerSyncProgressUpdate === 0) { - setComponent(components.SyncProgress, singletonEntity, { - step: SyncStep.SNAPSHOT, - message: `Hydrating from snapshot to block ${initialState.blockNumber}`, - percentage: (recordsProcessed / numRecords) * 100, - latestBlockNumber: 0n, - lastBlockNumberProcessed: initialState.blockNumber, - }); - } - } - debug(`hydrated ${table.records.length} records for table ${table.namespace}:${table.name}`); - } - - setComponent(components.SyncProgress, singletonEntity, { - step: SyncStep.SNAPSHOT, - message: `Hydrating from snapshot to block ${initialState.blockNumber}`, - percentage: (recordsProcessed / numRecords) * 100, - latestBlockNumber: 0n, - lastBlockNumberProcessed: initialState.blockNumber, - }); - } - - // TODO: if startBlock is still 0, find via deploy event - - debug("starting sync from block", startBlock); - - const latestBlock$ = createBlockStream({ publicClient, blockTag: "latest" }).pipe(share()); - - const latestBlockNumber$ = latestBlock$.pipe( - filter(isNonPendingBlock), - map((block) => block.number), - share() - ); - - let latestBlockNumber: bigint | null = null; - const blockLogs$ = latestBlockNumber$.pipe( - tap((blockNumber) => { - debug("latest block number", blockNumber); - latestBlockNumber = blockNumber; - }), - map((blockNumber) => ({ startBlock, endBlock: blockNumber })), - blockRangeToLogs({ - publicClient, - address, - events: storeEventsAbi, - }), - mergeMap(({ toBlock, logs }) => from(groupLogsByBlockNumber(logs, toBlock))), - share() - ); - - let lastBlockNumberProcessed: bigint | null = null; - const blockStorageOperations$ = blockLogs$.pipe( - concatMap(blockLogsToStorage(recsStorage({ components, config }))), - tap(({ blockNumber, operations }) => { - debug("stored", operations.length, "operations for block", blockNumber); - lastBlockNumberProcessed = blockNumber; - - if ( - latestBlockNumber != null && - getComponentValue(components.SyncProgress, singletonEntity)?.step !== SyncStep.LIVE - ) { - if (lastBlockNumberProcessed < latestBlockNumber) { - setComponent(components.SyncProgress, singletonEntity, { - step: SyncStep.RPC, - message: `Hydrating from RPC to block ${latestBlockNumber}`, - percentage: (Number(lastBlockNumberProcessed) / Number(latestBlockNumber)) * 100, - latestBlockNumber, - lastBlockNumberProcessed, - }); - } else { - setComponent(components.SyncProgress, singletonEntity, { - step: SyncStep.LIVE, - message: `All caught up!`, - percentage: 100, - latestBlockNumber, - lastBlockNumberProcessed, - }); - } - } - }), - share() - ); + const storeSync = await createStoreSync({ + storageAdapter: recsStorage({ components, config }), + config, + address, + publicClient, + startBlock, + maxBlockRange, + indexerUrl, + initialState, + onProgress: ({ step, percentage, latestBlockNumber, lastBlockNumberProcessed }) => { + console.log("got progress", step, percentage); + // TODO: stop updating once live? + setComponent(components.SyncProgress, singletonEntity, { + step, + percentage, + latestBlockNumber, + lastBlockNumberProcessed, + message: syncStepToMessage(step), + }); + }, + }); - // Start the sync - const sub = blockStorageOperations$.subscribe(); + const sub = storeSync.blockStorageOperations$.subscribe(); world.registerDisposer(() => sub.unsubscribe()); - async function waitForTransaction(tx: Hex): Promise<{ - receipt: TransactionReceipt; - }> { - // Wait for tx to be mined - const receipt = await publicClient.waitForTransactionReceipt({ hash: tx }); - - // If we haven't processed a block yet or we haven't processed the block for the tx, wait for it - if (lastBlockNumberProcessed == null || lastBlockNumberProcessed < receipt.blockNumber) { - await firstValueFrom( - blockStorageOperations$.pipe( - filter(({ blockNumber }) => blockNumber != null && blockNumber >= receipt.blockNumber) - ) - ); - } - - return { receipt }; - } - return { + ...storeSync, components, - latestBlock$, - latestBlockNumber$, - blockLogs$, - blockStorageOperations$, - waitForTransaction, - destroy: (): void => { - world.dispose(); - }, }; } diff --git a/packages/store-sync/src/sqlite/index.ts b/packages/store-sync/src/sqlite/index.ts index 9e82168c3f..bcfd3948ad 100644 --- a/packages/store-sync/src/sqlite/index.ts +++ b/packages/store-sync/src/sqlite/index.ts @@ -3,3 +3,4 @@ export * from "./getTables"; export * from "./internalTables"; export * from "./schemaVersion"; export * from "./sqliteStorage"; +export * from "./syncToSqlite"; diff --git a/packages/store-sync/src/sqlite/sqliteStorage.ts b/packages/store-sync/src/sqlite/sqliteStorage.ts index 3c78ba86ee..f5f32a2989 100644 --- a/packages/store-sync/src/sqlite/sqliteStorage.ts +++ b/packages/store-sync/src/sqlite/sqliteStorage.ts @@ -1,4 +1,4 @@ -import { Hex, PublicClient, encodePacked, getAddress } from "viem"; +import { PublicClient, concatHex, encodeAbiParameters, getAddress } from "viem"; import { BaseSQLiteDatabase } from "drizzle-orm/sqlite-core"; import { and, eq, sql } from "drizzle-orm"; import { sqliteTableToSql } from "./sqliteTableToSql"; @@ -76,7 +76,7 @@ export async function sqliteStorage({ new Set( operations.map((operation) => JSON.stringify({ - address: getAddress(operation.log.address), + address: getAddress(operation.address), namespace: operation.namespace, name: operation.name, }) @@ -102,7 +102,7 @@ export async function sqliteStorage({ for (const operation of operations) { const table = tables.find( (table) => - table.address === getAddress(operation.log.address) && + table.address === getAddress(operation.address) && table.namespace === operation.namespace && table.name === operation.name ); @@ -112,9 +112,10 @@ export async function sqliteStorage({ } const sqliteTable = createSqliteTable(table); - const key = encodePacked( - operation.log.args.key.map(() => "bytes32"), - operation.log.args.key as Hex[] + const key = concatHex( + Object.entries(table.keySchema).map(([keyName, type]) => + encodeAbiParameters([{ type }], [operation.key[keyName]]) + ) ); if (operation.type === "SetRecord") { diff --git a/packages/store-sync/src/sqlite/syncToSqlite.ts b/packages/store-sync/src/sqlite/syncToSqlite.ts new file mode 100644 index 0000000000..89ed96ca12 --- /dev/null +++ b/packages/store-sync/src/sqlite/syncToSqlite.ts @@ -0,0 +1,56 @@ +import { StoreConfig } from "@latticexyz/store"; +import { BaseSQLiteDatabase } from "drizzle-orm/sqlite-core"; +import { SyncOptions, SyncResult } from "../common"; +import { sqliteStorage } from "./sqliteStorage"; +import { createStoreSync } from "../createStoreSync"; + +type SyncToSqliteOptions = SyncOptions & { + /** + * [SQLite database object from Drizzle][0]. + * + * [0]: https://orm.drizzle.team/docs/installation-and-db-connection/sqlite/better-sqlite3 + */ + database: BaseSQLiteDatabase<"sync", any>; +}; + +type SyncToSqliteResult = SyncResult & { + destroy: () => void; +}; + +/** + * Creates an indexer to process and store blockchain events. + * + * @param {CreateIndexerOptions} options See `CreateIndexerOptions`. + * @returns A function to unsubscribe from the block stream, effectively stopping the indexer. + */ +export async function syncToSqlite({ + config, + database, + publicClient, + address, + startBlock, + maxBlockRange, + indexerUrl, + initialState, +}: SyncToSqliteOptions): Promise> { + const storeSync = await createStoreSync({ + storageAdapter: await sqliteStorage({ database, publicClient, config }), + config, + address, + publicClient, + startBlock, + maxBlockRange, + indexerUrl, + initialState, + }); + + // Start the sync + const sub = storeSync.blockStorageOperations$.subscribe(); + + return { + ...storeSync, + destroy: (): void => { + sub.unsubscribe(); + }, + }; +} diff --git a/packages/store-sync/src/trpc-indexer/common.ts b/packages/store-sync/src/trpc-indexer/common.ts index 0e71ec464a..7089b74b0b 100644 --- a/packages/store-sync/src/trpc-indexer/common.ts +++ b/packages/store-sync/src/trpc-indexer/common.ts @@ -1,9 +1,12 @@ import { Hex } from "viem"; -import type { TableRecord } from "@latticexyz/store"; -import type { Table } from "../common"; - -export type TableWithRecords = Table & { records: TableRecord[] }; +import { TableWithRecords } from "../common"; export type StorageAdapter = { - findAll: (chainId: number, address: Hex) => Promise<{ blockNumber: bigint | null; tables: TableWithRecords[] }>; + findAll: ( + chainId: number, + address?: Hex + ) => Promise<{ + blockNumber: bigint | null; + tables: TableWithRecords[]; + }>; }; diff --git a/packages/store-sync/src/trpc-indexer/createAppRouter.ts b/packages/store-sync/src/trpc-indexer/createAppRouter.ts index 2065d6a948..2e91c0c3cf 100644 --- a/packages/store-sync/src/trpc-indexer/createAppRouter.ts +++ b/packages/store-sync/src/trpc-indexer/createAppRouter.ts @@ -15,7 +15,7 @@ export function createAppRouter() { .input( z.object({ chainId: z.number(), - address: z.string().refine(isHex), + address: z.string().refine(isHex).optional(), }) ) .query(async (opts): ReturnType => { diff --git a/templates/phaser/packages/client/src/ui/LoadingScreen/LoadingScreen.tsx b/templates/phaser/packages/client/src/ui/LoadingScreen/LoadingScreen.tsx index ca4cb4d058..3f42736970 100644 --- a/templates/phaser/packages/client/src/ui/LoadingScreen/LoadingScreen.tsx +++ b/templates/phaser/packages/client/src/ui/LoadingScreen/LoadingScreen.tsx @@ -4,7 +4,8 @@ import { LoadingBar } from "./LoadingBar"; import { BootScreen } from "./BootScreen"; import { useComponentValue } from "@latticexyz/react"; import { useMUD } from "../../store"; -import { SyncStep, singletonEntity } from "@latticexyz/store-sync/recs"; +import { singletonEntity } from "@latticexyz/store-sync/recs"; +import { SyncStep } from "@latticexyz/store-sync"; export const LoadingScreen = () => { const {