diff --git a/.changeset/flat-trainers-marry.md b/.changeset/flat-trainers-marry.md new file mode 100644 index 0000000000..54ff1653ea --- /dev/null +++ b/.changeset/flat-trainers-marry.md @@ -0,0 +1,23 @@ +--- +"@latticexyz/common": patch +"@latticexyz/store-sync": patch +--- + +Initial sync from indexer no longer blocks the promise returning from `createStoreSync`, `syncToRecs`, and `syncToSqlite`. This should help with rendering loading screens using the `SyncProgress` RECS component and avoid the long flashes of no content in templates. + +By default, `syncToRecs` and `syncToSqlite` will start syncing (via observable subscription) immediately after called. + +If your app needs to control when syncing starts, you can use the `startSync: false` option and then `blockStorageOperations$.subscribe()` to start the sync yourself. Just be sure to unsubscribe to avoid memory leaks. + +```ts +const { blockStorageOperations$ } = syncToRecs({ + ... + startSync: false, +}); + +// start sync manually by subscribing to `blockStorageOperations$` +const subscription = blockStorageOperations$.subscribe(); + +// clean up subscription +subscription.unsubscribe(); +``` diff --git a/e2e/packages/sync-test/indexerSync.test.ts b/e2e/packages/sync-test/indexerSync.test.ts index 1d0bb9a986..e86398ba2f 100644 --- a/e2e/packages/sync-test/indexerSync.test.ts +++ b/e2e/packages/sync-test/indexerSync.test.ts @@ -51,7 +51,7 @@ describe("Sync from indexer", async () => { await waitForInitialSync(page); expect(asyncErrorHandler.getErrors()).toHaveLength(1); - expect(asyncErrorHandler.getErrors()[0]).toContain("couldn't get initial state from indexer"); + expect(asyncErrorHandler.getErrors()[0]).toContain("error fetching initial state from indexer"); }); describe("indexer online", () => { diff --git a/packages/common/src/utils/index.ts b/packages/common/src/utils/index.ts index 7ba406179b..5f06e434e0 100644 --- a/packages/common/src/utils/index.ts +++ b/packages/common/src/utils/index.ts @@ -7,3 +7,4 
@@ export * from "./curry"; export * from "./isDefined"; export * from "./isNotNull"; export * from "./wait"; +export * from "./waitForIdle"; diff --git a/packages/common/src/utils/wait.ts b/packages/common/src/utils/wait.ts index 7b18fbe5dd..49439c80c0 100644 --- a/packages/common/src/utils/wait.ts +++ b/packages/common/src/utils/wait.ts @@ -1,3 +1,3 @@ export function wait(ms: number): Promise { - return new Promise((resolve) => setTimeout(resolve, ms)); + return new Promise((resolve) => setTimeout(() => resolve(), ms)); } diff --git a/packages/common/src/utils/waitForIdle.ts b/packages/common/src/utils/waitForIdle.ts new file mode 100644 index 0000000000..7dd4763cf8 --- /dev/null +++ b/packages/common/src/utils/waitForIdle.ts @@ -0,0 +1,5 @@ +export function waitForIdle(): Promise { + return new Promise((resolve) => { + requestIdleCallback(() => resolve()); + }); +} diff --git a/packages/store-sync/src/blockLogsToStorage.ts b/packages/store-sync/src/blockLogsToStorage.ts index 5651f254a3..41658a5ceb 100644 --- a/packages/store-sync/src/blockLogsToStorage.ts +++ b/packages/store-sync/src/blockLogsToStorage.ts @@ -83,10 +83,12 @@ export function blockLogsToStorage({ .filter(isDefined); // Then register tables before we start storing data in them - await registerTables({ - blockNumber: block.blockNumber, - tables: newTables, - }); + if (newTables.length > 0) { + await registerTables({ + blockNumber: block.blockNumber, + tables: newTables, + }); + } const tablesToFetch = Array.from( new Set( diff --git a/packages/store-sync/src/createStoreSync.ts b/packages/store-sync/src/createStoreSync.ts index 97480a68d0..2c52b81f7e 100644 --- a/packages/store-sync/src/createStoreSync.ts +++ b/packages/store-sync/src/createStoreSync.ts @@ -1,14 +1,30 @@ import { ConfigToKeyPrimitives, ConfigToValuePrimitives, StoreConfig, storeEventsAbi } from "@latticexyz/store"; -import { Hex, TransactionReceipt } from "viem"; -import { SetRecordOperation, SyncOptions, SyncResult } from 
"./common"; +import { Hex, TransactionReceipt, WaitForTransactionReceiptTimeoutError } from "viem"; +import { SetRecordOperation, SyncOptions, SyncResult, TableWithRecords } from "./common"; import { createBlockStream, blockRangeToLogs, groupLogsByBlockNumber } from "@latticexyz/block-logs-stream"; -import { filter, map, tap, mergeMap, from, concatMap, share, firstValueFrom } from "rxjs"; +import { + filter, + map, + tap, + mergeMap, + from, + concat, + concatMap, + share, + firstValueFrom, + defer, + of, + catchError, + shareReplay, + combineLatest, +} from "rxjs"; import pRetry from "p-retry"; import { blockLogsToStorage } from "./blockLogsToStorage"; import { debug as parentDebug } from "./debug"; import { createIndexerClient } from "./trpc-indexer"; import { BlockLogsToStorageOptions } from "./blockLogsToStorage"; import { SyncStep } from "./SyncStep"; +import { chunk } from "@latticexyz/common/utils"; const debug = parentDebug.extend("createStoreSync"); @@ -19,6 +35,7 @@ type CreateStoreSyncOptions = SyncOpt percentage: number; latestBlockNumber: bigint; lastBlockNumberProcessed: bigint; + message: string; }) => void; }; @@ -29,85 +46,145 @@ export async function createStoreSync onProgress, address, publicClient, - startBlock = 0n, + startBlock: initialStartBlock = 0n, maxBlockRange, initialState, indexerUrl, }: CreateStoreSyncOptions): Promise> { - if (indexerUrl != null && initialState == null) { - try { + const initialState$ = defer( + async (): Promise< + | { + blockNumber: bigint | null; + tables: TableWithRecords[]; + } + | undefined + > => { + if (initialState) return initialState; + if (!indexerUrl) return; + + debug("fetching initial state from indexer", indexerUrl); + + onProgress?.({ + step: SyncStep.SNAPSHOT, + percentage: 0, + latestBlockNumber: 0n, + lastBlockNumberProcessed: 0n, + message: "Fetching snapshot from indexer", + }); + const indexer = createIndexerClient({ url: indexerUrl }); const chainId = publicClient.chain?.id ?? 
(await publicClient.getChainId()); - initialState = await indexer.findAll.query({ chainId, address }); - } catch (error) { - debug("couldn't get initial state from indexer", error); + const result = await indexer.findAll.query({ chainId, address }); + + onProgress?.({ + step: SyncStep.SNAPSHOT, + percentage: 100, + latestBlockNumber: 0n, + lastBlockNumberProcessed: 0n, + message: "Fetched snapshot from indexer", + }); + + return result; } - } + ).pipe( + catchError((error) => { + debug("error fetching initial state from indexer", error); - if (initialState != null) { - const { blockNumber, tables } = initialState; - if (blockNumber != null) { - debug("hydrating from initial state to block", initialState.blockNumber); - startBlock = blockNumber + 1n; + onProgress?.({ + step: SyncStep.SNAPSHOT, + percentage: 100, + latestBlockNumber: 0n, + lastBlockNumberProcessed: initialStartBlock, + message: "Failed to fetch snapshot from indexer", + }); - await storageAdapter.registerTables({ blockNumber, tables }); + return of(undefined); + }), + shareReplay(1) + ); - const numRecords = initialState.tables.reduce((sum, table) => sum + table.records.length, 0); - const recordsPerProgressUpdate = Math.floor(numRecords / 100); - let recordsProcessed = 0; - let recordsProcessedSinceLastUpdate = 0; - - for (const table of initialState.tables) { - await storageAdapter.storeOperations({ - blockNumber, - operations: table.records.map( - (record) => - ({ - type: "SetRecord", - address: table.address, - namespace: table.namespace, - name: table.name, - key: record.key as ConfigToKeyPrimitives, - value: record.value as ConfigToValuePrimitives, - } as const satisfies SetRecordOperation) - ), - }); + const startBlock$ = initialState$.pipe( + map((initialState) => initialState?.blockNumber ?? 
initialStartBlock), + // TODO: if start block is still 0, find via deploy event + tap((startBlock) => debug("starting sync from block", startBlock)) + ); - recordsProcessed += table.records.length; - recordsProcessedSinceLastUpdate += table.records.length; - - if (recordsProcessedSinceLastUpdate > recordsPerProgressUpdate) { - recordsProcessedSinceLastUpdate = 0; - onProgress?.({ - step: SyncStep.SNAPSHOT, - percentage: (recordsProcessed / numRecords) * 100, - latestBlockNumber: 0n, - lastBlockNumberProcessed: blockNumber, - }); - } + const initialStorageOperations$ = initialState$.pipe( + filter( + (initialState): initialState is { blockNumber: bigint; tables: TableWithRecords[] } => + initialState != null && initialState.blockNumber != null && initialState.tables.length > 0 + ), + concatMap(async ({ blockNumber, tables }) => { + debug("hydrating from initial state to block", blockNumber); + + onProgress?.({ + step: SyncStep.SNAPSHOT, + percentage: 0, + latestBlockNumber: 0n, + lastBlockNumberProcessed: blockNumber, + message: "Hydrating from snapshot", + }); + + await storageAdapter.registerTables({ blockNumber, tables }); + + const operations: SetRecordOperation[] = tables.flatMap((table) => + table.records.map((record) => ({ + type: "SetRecord", + address: table.address, + namespace: table.namespace, + name: table.name, + key: record.key as ConfigToKeyPrimitives, + value: record.value as ConfigToValuePrimitives, + })) + ); - debug(`hydrated ${table.records.length} records for table ${table.namespace}:${table.name}`); + // Split snapshot operations into chunks so we can update the progress callback (and ultimately render visual progress for the user). + // This isn't ideal if we want to e.g. batch load these into a DB in a single DB tx, but we'll take it. 
+ // + // Split into 50 equal chunks (for better `onProgress` updates) but only if we have 100+ items per chunk + const chunkSize = Math.max(100, Math.floor(operations.length / 50)); + const chunks = Array.from(chunk(operations, chunkSize)); + for (const [i, chunk] of chunks.entries()) { + await storageAdapter.storeOperations({ blockNumber, operations: chunk }); + onProgress?.({ + step: SyncStep.SNAPSHOT, + percentage: (i + chunk.length) / chunks.length, + latestBlockNumber: 0n, + lastBlockNumberProcessed: blockNumber, + message: "Hydrating from snapshot", + }); } - } - } - // TODO: if startBlock is still 0, find via deploy event + onProgress?.({ + step: SyncStep.SNAPSHOT, + percentage: 100, + latestBlockNumber: 0n, + lastBlockNumberProcessed: blockNumber, + message: "Hydrated from snapshot", + }); - debug("starting sync from block", startBlock); + return { blockNumber, operations }; + }), + shareReplay(1) + ); - const latestBlock$ = createBlockStream({ publicClient, blockTag: "latest" }).pipe(share()); + const latestBlock$ = createBlockStream({ publicClient, blockTag: "latest" }).pipe(shareReplay(1)); const latestBlockNumber$ = latestBlock$.pipe( map((block) => block.number), - share() - ); - - let latestBlockNumber: bigint | null = null; - const blockLogs$ = latestBlockNumber$.pipe( tap((blockNumber) => { debug("latest block number", blockNumber); - latestBlockNumber = blockNumber; }), - map((blockNumber) => ({ startBlock, endBlock: blockNumber })), + shareReplay(1) + ); + + let startBlock: bigint | null = null; + let endBlock: bigint | null = null; + const blockLogs$ = combineLatest([startBlock$, latestBlockNumber$]).pipe( + map(([startBlock, endBlock]) => ({ startBlock, endBlock })), + tap((range) => { + startBlock = range.startBlock; + endBlock = range.endBlock; + }), blockRangeToLogs({ publicClient, address, @@ -119,32 +196,38 @@ export async function createStoreSync ); let lastBlockNumberProcessed: bigint | null = null; - const blockStorageOperations$ = 
blockLogs$.pipe( - concatMap(blockLogsToStorage(storageAdapter)), - tap(({ blockNumber, operations }) => { - debug("stored", operations.length, "operations for block", blockNumber); - lastBlockNumberProcessed = blockNumber; - - if (latestBlockNumber != null) { - if (blockNumber < latestBlockNumber) { - onProgress?.({ - step: SyncStep.RPC, - percentage: Number((lastBlockNumberProcessed * 1000n) / (latestBlockNumber * 1000n)) / 100, - latestBlockNumber, - lastBlockNumberProcessed, - }); - } else { - onProgress?.({ - step: SyncStep.LIVE, - percentage: 100, - latestBlockNumber, - lastBlockNumberProcessed, - }); + const blockStorageOperations$ = concat( + initialStorageOperations$, + blockLogs$.pipe( + concatMap(blockLogsToStorage(storageAdapter)), + tap(({ blockNumber, operations }) => { + debug("stored", operations.length, "operations for block", blockNumber); + lastBlockNumberProcessed = blockNumber; + + if (startBlock != null && endBlock != null) { + if (blockNumber < endBlock) { + const totalBlocks = endBlock - startBlock; + const processedBlocks = lastBlockNumberProcessed - startBlock; + onProgress?.({ + step: SyncStep.RPC, + percentage: Number((processedBlocks * 1000n) / totalBlocks) / 1000, + latestBlockNumber: endBlock, + lastBlockNumberProcessed, + message: "Hydrating from RPC", + }); + } else { + onProgress?.({ + step: SyncStep.LIVE, + percentage: 100, + latestBlockNumber: endBlock, + lastBlockNumberProcessed, + message: "All caught up!", + }); + } } - } - }), - share() - ); + }) + ) + ).pipe(share()); async function waitForTransaction(tx: Hex): Promise<{ receipt: TransactionReceipt; @@ -159,7 +242,16 @@ export async function createStoreSync timeout: publicClient.pollingInterval * 2 * attempt, }); }, - { retries: 3 } + { + retries: 3, + onFailedAttempt: (error) => { + if (error instanceof WaitForTransactionReceiptTimeoutError) { + debug("timed out waiting for tx receipt, trying again", tx); + return; + } + throw error; + }, + } ); debug("got tx receipt", tx, 
receipt); diff --git a/packages/store-sync/src/recs/common.ts b/packages/store-sync/src/recs/common.ts index 132dcc8bc1..79292ac176 100644 --- a/packages/store-sync/src/recs/common.ts +++ b/packages/store-sync/src/recs/common.ts @@ -1,9 +1,9 @@ import { KeySchema, StoreConfig, ValueSchema } from "@latticexyz/store"; -import { Component as RecsComponent, Type as RecsType } from "@latticexyz/recs"; +import { Component as RecsComponent, Metadata as RecsMetadata, Type as RecsType } from "@latticexyz/recs"; import { SchemaAbiTypeToRecsType } from "./schemaAbiTypeToRecsType"; import { SchemaAbiType } from "@latticexyz/schema-type"; -export type StoreComponentMetadata = { +export type StoreComponentMetadata = RecsMetadata & { componentName: string; tableName: string; keySchema: KeySchema; diff --git a/packages/store-sync/src/recs/getTableEntity.ts b/packages/store-sync/src/recs/getTableEntity.ts new file mode 100644 index 0000000000..83b746e89b --- /dev/null +++ b/packages/store-sync/src/recs/getTableEntity.ts @@ -0,0 +1,15 @@ +import { Address, Hex, getAddress, stringToHex } from "viem"; +import { Table } from "../common"; +import { Entity } from "@latticexyz/recs"; +import { encodeEntity } from "./encodeEntity"; + +export function getTableEntity(table: Pick): Entity { + return encodeEntity( + { address: "address", namespace: "bytes16", name: "bytes16" }, + { + address: table.address, + namespace: stringToHex(table.namespace, { size: 16 }), + name: stringToHex(table.name, { size: 16 }), + } + ); +} diff --git a/packages/store-sync/src/recs/getTableKey.ts b/packages/store-sync/src/recs/getTableKey.ts deleted file mode 100644 index 5a8edb9f35..0000000000 --- a/packages/store-sync/src/recs/getTableKey.ts +++ /dev/null @@ -1,8 +0,0 @@ -import { Address, getAddress } from "viem"; -import { Table, TableName, TableNamespace } from "../common"; - -export type TableKey = `${Address}:${TableNamespace}:${TableName}`; - -export function getTableKey(table: Pick): TableKey { - return 
`${getAddress(table.address)}:${table.namespace}:${table.name}`; -} diff --git a/packages/store-sync/src/recs/recsStorage.ts b/packages/store-sync/src/recs/recsStorage.ts index d41548c485..71cee4dd4e 100644 --- a/packages/store-sync/src/recs/recsStorage.ts +++ b/packages/store-sync/src/recs/recsStorage.ts @@ -3,7 +3,6 @@ import { StoreConfig } from "@latticexyz/store"; import { debug } from "./debug"; import { ComponentValue, - Entity, Component as RecsComponent, Schema as RecsSchema, getComponentValue, @@ -14,7 +13,7 @@ import { import { isDefined } from "@latticexyz/common/utils"; import { schemaToDefaults } from "../schemaToDefaults"; import { defineInternalComponents } from "./defineInternalComponents"; -import { getTableKey } from "./getTableKey"; +import { getTableEntity } from "./getTableEntity"; import { StoreComponentMetadata } from "./common"; import { tableIdToHex } from "@latticexyz/common"; import { encodeEntity } from "./encodeEntity"; @@ -36,24 +35,24 @@ export function recsStorage({ async registerTables({ tables }) { for (const table of tables) { // TODO: check if table exists already and skip/warn? - setComponent(components.RegisteredTables, getTableKey(table) as Entity, { table }); + setComponent(components.RegisteredTables, getTableEntity(table), { table }); } }, async getTables({ tables }) { // TODO: fetch schema from RPC if table not found? 
return tables - .map((table) => getComponentValue(components.RegisteredTables, getTableKey(table) as Entity)?.table) + .map((table) => getComponentValue(components.RegisteredTables, getTableEntity(table))?.table) .filter(isDefined); }, async storeOperations({ operations }) { for (const operation of operations) { const table = getComponentValue( components.RegisteredTables, - getTableKey({ + getTableEntity({ address: operation.address, namespace: operation.namespace, name: operation.name, - }) as Entity + }) )?.table; if (!table) { debug(`skipping update for unknown table: ${operation.namespace}:${operation.name} at ${operation.address}`); diff --git a/packages/store-sync/src/recs/syncStepToMessage.ts b/packages/store-sync/src/recs/syncStepToMessage.ts deleted file mode 100644 index 8f702c3102..0000000000 --- a/packages/store-sync/src/recs/syncStepToMessage.ts +++ /dev/null @@ -1,17 +0,0 @@ -import { SyncStep } from "../SyncStep"; -import { assertExhaustive } from "@latticexyz/common/utils"; - -export function syncStepToMessage(step: SyncStep): string { - switch (step) { - case SyncStep.INITIALIZE: - return "Connecting"; - case SyncStep.SNAPSHOT: - return "Hydrating from snapshot"; - case SyncStep.RPC: - return "Hydrating from RPC"; - case SyncStep.LIVE: - return "All caught up!"; - default: - assertExhaustive(step, `Unexpected sync step: ${step}`); - } -} diff --git a/packages/store-sync/src/recs/syncToRecs.ts b/packages/store-sync/src/recs/syncToRecs.ts index 858ebff912..4483fd5817 100644 --- a/packages/store-sync/src/recs/syncToRecs.ts +++ b/packages/store-sync/src/recs/syncToRecs.ts @@ -1,5 +1,5 @@ import { StoreConfig } from "@latticexyz/store"; -import { World as RecsWorld, setComponent } from "@latticexyz/recs"; +import { World as RecsWorld, getComponentValue, setComponent } from "@latticexyz/recs"; import { SyncOptions, SyncResult } from "../common"; import { recsStorage } from "./recsStorage"; import { defineInternalComponents } from 
"./defineInternalComponents"; @@ -9,19 +9,20 @@ import storeConfig from "@latticexyz/store/mud.config"; import worldConfig from "@latticexyz/world/mud.config"; import { configToRecsComponents } from "./configToRecsComponents"; import { singletonEntity } from "./singletonEntity"; -import { syncStepToMessage } from "./syncStepToMessage"; +import { SyncStep } from "../SyncStep"; type SyncToRecsOptions = SyncOptions & { world: RecsWorld; config: TConfig; + startSync?: boolean; }; type SyncToRecsResult = SyncResult & { - // TODO: return publicClient? components: ConfigToRecsComponents & ConfigToRecsComponents & ConfigToRecsComponents & ReturnType; + stopSync: () => void; }; export async function syncToRecs({ @@ -33,6 +34,7 @@ export async function syncToRecs({ maxBlockRange, initialState, indexerUrl, + startSync = true, }: SyncToRecsOptions): Promise> { const components = { ...configToRecsComponents(world, config), @@ -52,24 +54,29 @@ export async function syncToRecs({ maxBlockRange, indexerUrl, initialState, - onProgress: ({ step, percentage, latestBlockNumber, lastBlockNumberProcessed }) => { - console.log("got progress", step, percentage); - // TODO: stop updating once live? - setComponent(components.SyncProgress, singletonEntity, { - step, - percentage, - latestBlockNumber, - lastBlockNumberProcessed, - message: syncStepToMessage(step), - }); + onProgress: ({ step, percentage, latestBlockNumber, lastBlockNumberProcessed, message }) => { + if (getComponentValue(components.SyncProgress, singletonEntity)?.step !== SyncStep.LIVE) { + setComponent(components.SyncProgress, singletonEntity, { + step, + percentage, + latestBlockNumber, + lastBlockNumberProcessed, + message, + }); + } }, }); - const sub = storeSync.blockStorageOperations$.subscribe(); - world.registerDisposer(() => sub.unsubscribe()); + const sub = startSync ? 
storeSync.blockStorageOperations$.subscribe() : null; + const stopSync = (): void => { + sub?.unsubscribe(); + }; + + world.registerDisposer(stopSync); return { ...storeSync, components, + stopSync, }; } diff --git a/packages/store-sync/src/sqlite/syncToSqlite.ts b/packages/store-sync/src/sqlite/syncToSqlite.ts index 89ed96ca12..2da2185bf9 100644 --- a/packages/store-sync/src/sqlite/syncToSqlite.ts +++ b/packages/store-sync/src/sqlite/syncToSqlite.ts @@ -11,10 +11,11 @@ type SyncToSqliteOptions = SyncOption * [0]: https://orm.drizzle.team/docs/installation-and-db-connection/sqlite/better-sqlite3 */ database: BaseSQLiteDatabase<"sync", any>; + startSync?: boolean; }; type SyncToSqliteResult = SyncResult & { - destroy: () => void; + stopSync: () => void; }; /** @@ -32,6 +33,7 @@ export async function syncToSqlite({ maxBlockRange, indexerUrl, initialState, + startSync = true, }: SyncToSqliteOptions): Promise> { const storeSync = await createStoreSync({ storageAdapter: await sqliteStorage({ database, publicClient, config }), @@ -44,13 +46,13 @@ export async function syncToSqlite({ initialState, }); - // Start the sync - const sub = storeSync.blockStorageOperations$.subscribe(); + const sub = startSync ? storeSync.blockStorageOperations$.subscribe() : null; + const stopSync = (): void => { + sub?.unsubscribe(); + }; return { ...storeSync, - destroy: (): void => { - sub.unsubscribe(); - }, + stopSync, }; }