diff --git a/packages/core/src/__tests__/create-dispatcher.ts b/packages/core/src/__tests__/create-dispatcher.ts index 3506c8ecd2..6029195966 100644 --- a/packages/core/src/__tests__/create-dispatcher.ts +++ b/packages/core/src/__tests__/create-dispatcher.ts @@ -32,6 +32,7 @@ export async function createDispatcher(ipfs: IpfsApi, pubsubTopic: string): Prom } as unknown as AnchorRequestStore const index = { init: () => Promise.resolve(), + indexedModels: () => [], } as unknown as IndexApi repository.setDeps({ pinStore, diff --git a/packages/core/src/__tests__/recon.test.ts b/packages/core/src/__tests__/recon.test.ts index b9c99bd42a..c02763c7ef 100644 --- a/packages/core/src/__tests__/recon.test.ts +++ b/packages/core/src/__tests__/recon.test.ts @@ -5,6 +5,7 @@ import { jest } from '@jest/globals' import { CARFactory, type CAR } from 'cartonne' import { toArray, take, lastValueFrom, firstValueFrom, race, timer } from 'rxjs' import { CommonTestUtils } from '@ceramicnetwork/common-test-utils' +import { BaseTestUtils as TestUtils } from '@ceramicnetwork/base-test-utils' const RECON_URL = 'http://example.com' const LOGGER = new LoggerProvider().getDiagnosticsLogger() @@ -80,6 +81,22 @@ describe('ReconApi', () => { await firstValueFrom(race(reconApi, timer(1000))) expect(mockSendRequest).toHaveBeenCalledTimes(1) }) + + test('should register interests on init', async () => { + const mockSendRequest = jest.fn(() => Promise.resolve()) + const reconApi = new ReconApi( + { enabled: true, url: RECON_URL, feedEnabled: false }, + LOGGER, + mockSendRequest + ) + const fakeInterest0 = TestUtils.randomStreamID() + const fakeInterest1 = TestUtils.randomStreamID() + await reconApi.init('testInitialCursor', [fakeInterest0, fakeInterest1]) + expect(mockSendRequest).toHaveBeenCalledTimes(3) + expect(mockSendRequest.mock.calls[1][0]).toContain(fakeInterest0.toString()) + expect(mockSendRequest.mock.calls[2][0]).toContain(fakeInterest1.toString()) + reconApi.stop() + }) }) describe('registerInterest', () => { diff --git a/packages/core/src/recon.ts b/packages/core/src/recon.ts index c1a4efa4f9..090c15746d 100644 --- a/packages/core/src/recon.ts +++ b/packages/core/src/recon.ts @@ -56,7 +56,7 @@ export interface ReconEventFeedResponse { * Recon API Interface */ export interface IReconApi extends Observable { - init(initialCursor?: string): Promise + init(initialCursor?: string, initialInterests?: Array): Promise registerInterest(model: StreamID): Promise put(car: CAR, opts?: AbortOptions): Promise enabled: boolean @@ -96,7 +96,7 @@ export class ReconApi extends Observable implements IRec * @param initialCursor * @returns */ - async init(initialCursor = ''): Promise { + async init(initialCursor = '', initialInterests: Array = []): Promise { if (this.#initialized) { return } @@ -110,6 +110,10 @@ export class ReconApi extends Observable implements IRec this.#url = await this.#config.url await this.registerInterest(Model.MODEL) + for (const interest of initialInterests) { + await this.registerInterest(interest) + } + if (this.#config.feedEnabled) { this.#eventsSubscription = this.createSubscription(initialCursor).subscribe(this.#feed$) } diff --git a/packages/core/src/state-management/repository.ts b/packages/core/src/state-management/repository.ts index 2d705afe4e..64321f6d2e 100644 --- a/packages/core/src/state-management/repository.ts +++ b/packages/core/src/state-management/repository.ts @@ -186,7 +186,9 @@ export class Repository { const cursor = (await reconStore.exists(RECON_STORE_CURSOR_KEY)) ? await reconStore.get(RECON_STORE_CURSOR_KEY) : '0' - await this.recon.init(cursor) + + const interests = this.index.indexedModels().map((modelData) => modelData.streamID) + await this.recon.init(cursor, interests) this.reconEventFeedSubscription = this.recon .pipe(concatMap(this.handleReconEvents.bind(this))) .subscribe() diff --git a/packages/stream-tests/src/__tests__/model-instance-document.test.ts b/packages/stream-tests/src/__tests__/model-instance-document.test.ts index d114fcc020..0b6318ef28 100644 --- a/packages/stream-tests/src/__tests__/model-instance-document.test.ts +++ b/packages/stream-tests/src/__tests__/model-instance-document.test.ts @@ -19,6 +19,7 @@ import { CeramicDaemon, DaemonConfig } from '@ceramicnetwork/cli' import { CeramicClient } from '@ceramicnetwork/http-client' import { Model, ModelDefinition } from '@ceramicnetwork/stream-model' import { CommonTestUtils as TestUtils } from '@ceramicnetwork/common-test-utils' +import tmp from 'tmp-promise' const CONTENT0 = { myData: 0 } const CONTENT1 = { myData: 1 } @@ -508,6 +509,7 @@ describe('ModelInstanceDocument API multi-node tests', () => { let ceramic1: Ceramic let model: Model let midMetadata: ModelInstanceDocumentMetadataArgs + let ceramic1StateStore: string beforeAll(async () => { ipfs0 = await createIPFS() @@ -515,7 +517,16 @@ describe('ModelInstanceDocument API multi-node tests', () => { await swarmConnect(ipfs0, ipfs1) ceramic0 = await createCeramic(ipfs0) - ceramic1 = await createCeramic(ipfs1) + ceramic1StateStore = await tmp.tmpName() + ceramic1 = await createCeramic(ipfs1, { + indexing: { + db: `sqlite://${ceramic1StateStore}/ceramic.sqlite`, + allowQueriesBeforeHistoricalSync: false, + disableComposedb: false, + enableHistoricalSync: false, + }, + stateStoreDirectory: ceramic1StateStore, + }) model = await Model.create(ceramic0, MODEL_DEFINITION) midMetadata = { model: model.id } @@ -593,4 +604,33 @@ describe('ModelInstanceDocument API multi-node tests', () => { expect(loaded.state.log.length).toEqual(3) expect(JSON.stringify(loaded.state)).toEqual(JSON.stringify(doc.state)) }) + + test('can load doc after restart', async () => { + //restart ceramic1 + await ceramic1.close() + ceramic1 = await createCeramic(ipfs1, { + indexing: { + db: `sqlite://${ceramic1StateStore}/ceramic.sqlite`, + allowQueriesBeforeHistoricalSync: false, + disableComposedb: false, + enableHistoricalSync: false, + }, + stateStoreDirectory: ceramic1StateStore, + }) + + const doc = await ModelInstanceDocument.create(ceramic0, CONTENT0, midMetadata) + if (EnvironmentUtils.useRustCeramic()) + await TestUtils.waitForEvent(ceramic1.repository.recon, doc.tip) + + const loaded = await ModelInstanceDocument.load(ceramic1, doc.id) + + const docState = doc.state + const loadedState = loaded.state + expect(docState.anchorStatus).toEqual(AnchorStatus.PENDING) + expect(loadedState.anchorStatus).toEqual(AnchorStatus.NOT_REQUESTED) + delete docState.anchorStatus + delete loadedState.anchorStatus + expect(loadedState.log.length).toEqual(1) + expect(JSON.stringify(loadedState)).toEqual(JSON.stringify(docState)) + }) })