Skip to content

Commit

Permalink
feat: Add option to wait until anchor requests are durably created on…
Browse files Browse the repository at this point in the history
… the CAS (#2907)
  • Loading branch information
stbrody authored Aug 18, 2023
1 parent d9115f1 commit a36ebf7
Show file tree
Hide file tree
Showing 12 changed files with 140 additions and 77 deletions.
9 changes: 5 additions & 4 deletions packages/common/src/anchor-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,12 @@ export interface AnchorService {
url: string

/**
* Request anchor commit on blockchain
* @param streamId - Stream ID
* @param tip - CID tip
* Send request to the anchoring service
* @param carFile - CAR file containing all necessary data for the CAS to anchor
* @param waitForConfirmation - if true, waits until the CAS has acknowledged receipt of the anchor
* request before returning.
*/
requestAnchor(carFile: CAR): Observable<CASResponse>
requestAnchor(carFile: CAR, waitForConfirmation: boolean): Promise<Observable<CASResponse>>

/**
* Start polling the anchor service to learn of the results of an existing anchor request for the
Expand Down
4 changes: 2 additions & 2 deletions packages/common/src/ceramic-api.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { DID } from 'dids'
import type { Stream, StreamHandler, CeramicCommit, StreamState } from './stream.js'
import type { CreateOpts, LoadOpts, PublishOpts, UpdateOpts } from './streamopts.js'
import type { AnchorOpts, CreateOpts, LoadOpts, PublishOpts, UpdateOpts } from './streamopts.js'
import type { StreamID, CommitID } from '@ceramicnetwork/streamid'
import type { LoggerProvider } from './logger-provider.js'
import type { GenesisCommit } from './index.js'
Expand Down Expand Up @@ -209,7 +209,7 @@ export interface CeramicApi extends CeramicSigner {
* @param streamId
* @param opts used to load the current Stream state
*/
requestAnchor(streamId: StreamID | string, opts?: LoadOpts): Promise<AnchorStatus>
requestAnchor(streamId: StreamID | string, opts?: LoadOpts & AnchorOpts): Promise<AnchorStatus>

/**
* Sets the DID instance that will be used to author commits to stream. The DID instance
Expand Down
7 changes: 7 additions & 0 deletions packages/common/src/streamopts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,13 @@ export interface AnchorOpts {
* Whether or not to request an anchor after performing the operation.
*/
anchor?: boolean

/**
* If true, the write operation won't return until the anchor request has been successfully and
* durably created on the Anchor Service. Note this does NOT wait for the write to be fully
* anchored, it only waits until we are confident that the write will *eventually* be anchored.
*/
waitForAnchorConfirmation?: boolean
}

/**
Expand Down
48 changes: 24 additions & 24 deletions packages/core/src/__tests__/state-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ describe('anchor', () => {
const stream1 = await TileDocument.create(ceramic, INITIAL_CONTENT, null, { anchor: false })
await stream1.subscribe()
const streamState1 = await ceramic.repository.load(stream1.id, {})
await ceramic.repository.stateManager.anchor(streamState1)
await ceramic.repository.stateManager.anchor(streamState1, {})

const ceramic2 = await createCeramic(ipfs)
const stream2 = await ceramic2.loadStream<TileDocument>(stream1.id, { syncTimeoutSeconds: 0 })
Expand Down Expand Up @@ -646,9 +646,9 @@ describe('anchor', () => {
)
// Emulate CAS responses to the 1st commit
const fauxCASResponse$ = new Subject<CASResponse>()
requestAnchorSpy.mockReturnValueOnce(fauxCASResponse$)
requestAnchorSpy.mockReturnValueOnce(Promise.resolve(fauxCASResponse$))
// Subscription for the 1st commit
const stillProcessingFirst = await ceramic.repository.stateManager.anchor(stream$)
const stillProcessingFirst = await ceramic.repository.stateManager.anchor(stream$, {})
// The emulated CAS accepts the request
fauxCASResponse$.next({
status: AnchorRequestStatusName.PENDING,
Expand All @@ -661,7 +661,7 @@ describe('anchor', () => {

// Now do the 2nd commit, anchor it
await tile.update({ abc: 456, def: 789 }, null, { anchor: false })
const stillProcessingSecond = await ceramic.repository.stateManager.anchor(stream$)
const stillProcessingSecond = await ceramic.repository.stateManager.anchor(stream$, {})
await expectAnchorStatus(tile, AnchorStatus.PENDING)

// The emulated CAS informs Ceramic, that the 1st tip got REPLACED
Expand Down Expand Up @@ -692,7 +692,7 @@ describe('anchor', () => {
const stream = await TileDocument.create(ceramic, INITIAL_CONTENT, null, { anchor: false })
const stream$ = await ceramic.repository.load(stream.id, {})

await ceramic.repository.stateManager.anchor(stream$)
await ceramic.repository.stateManager.anchor(stream$, {})
expect(stream$.value.anchorStatus).toEqual(AnchorStatus.PENDING)

await TestUtils.anchorUpdate(ceramic, stream)
Expand All @@ -711,7 +711,7 @@ describe('anchor', () => {
const stream = await TileDocument.create(ceramic, INITIAL_CONTENT, null, { anchor: false })
const stream$ = await ceramic.repository.load(stream.id, {})

await ceramic.repository.stateManager.anchor(stream$)
await ceramic.repository.stateManager.anchor(stream$, {})
expect(stream$.value.anchorStatus).toEqual(AnchorStatus.PENDING)

await TestUtils.anchorUpdate(ceramic, stream)
Expand All @@ -720,7 +720,7 @@ describe('anchor', () => {
expect(stream$.value.log.length).toEqual(2)

// Now re-request an anchor when the stream is already anchored. Should be a no-op
await ceramic.repository.stateManager.anchor(stream$)
await ceramic.repository.stateManager.anchor(stream$, {})
expect(stream$.value.log.length).toEqual(2)
})

Expand All @@ -740,7 +740,7 @@ describe('anchor', () => {
// and stop the retrying mechanism
fakeHandleTip.mockImplementationOnce(realHandleTip)

await ceramic.repository.stateManager.anchor(stream$)
await ceramic.repository.stateManager.anchor(stream$, {})

expect(stream$.value.anchorStatus).toEqual(AnchorStatus.PENDING)

Expand All @@ -763,7 +763,7 @@ describe('anchor', () => {
// Mock fakeHandleTip to always throw
fakeHandleTip.mockRejectedValue(new Error('Handle tip failed'))

await ceramic.repository.stateManager.anchor(stream$)
await ceramic.repository.stateManager.anchor(stream$, {})

expect(stream$.value.anchorStatus).toEqual(AnchorStatus.PENDING)

Expand All @@ -783,7 +783,7 @@ describe('anchor', () => {
const anchorRequestStore = ceramic.repository.stateManager.anchorRequestStore

expect(await anchorRequestStore.load(stream.id)).toBeNull()
await ceramic.repository.stateManager.anchor(stream$)
await ceramic.repository.stateManager.anchor(stream$, {})
expect(stream$.value.anchorStatus).toEqual(AnchorStatus.PENDING)
expect(await anchorRequestStore.load(stream.id)).not.toBeNull()

Expand Down Expand Up @@ -870,13 +870,13 @@ describe('anchor', () => {

// Emulate CAS responses to the 1st commit
const firstCASResponse$ = new Subject<CASResponse>()
requestAnchorSpy.mockImplementationOnce((streamId, commit) => {
requestAnchorSpy.mockImplementationOnce(async (streamId, commit) => {
originalPollForAnchorResponse(streamId, commit)
return firstCASResponse$
})

// Anchor the first commit and subscribe
const firstAnchorResponseSub = await ceramic.repository.stateManager.anchor(stream$)
const firstAnchorResponseSub = await ceramic.repository.stateManager.anchor(stream$, {})
expect(await anchorRequestStore.load(tile.id).then((ar) => ar.cid.toString())).toEqual(
stream$.tip.toString()
)
Expand Down Expand Up @@ -907,9 +907,9 @@ describe('anchor', () => {
await tile.update({ abc: 456, def: 789 }, null, { anchor: false })
// Emulate CAS responses to the 2nd commit
const secondCASResponse$ = new Subject<CASResponse>()
requestAnchorSpy.mockReturnValueOnce(secondCASResponse$)
requestAnchorSpy.mockReturnValueOnce(Promise.resolve(secondCASResponse$))
// Anchor the 2nd commit and subscribe
await ceramic.repository.stateManager.anchor(stream$)
await ceramic.repository.stateManager.anchor(stream$, {})
expect(await anchorRequestStore.load(tile.id).then((ar) => ar.cid.toString())).toEqual(
stream$.tip.toString()
)
Expand Down Expand Up @@ -951,9 +951,9 @@ describe('anchor', () => {

// Emulate CAS responses to the 1st commit
const firstCASResponse$ = new Subject<CASResponse>()
requestAnchorSpy.mockReturnValueOnce(firstCASResponse$)
requestAnchorSpy.mockReturnValueOnce(Promise.resolve(firstCASResponse$))
// Anchor the 1st commit and subscribe
const firstAnchorResponseSub = await ceramic.repository.stateManager.anchor(stream$)
const firstAnchorResponseSub = await ceramic.repository.stateManager.anchor(stream$, {})
expect(await anchorRequestStore.load(tile.id).then((ar) => ar.cid.toString())).toEqual(
stream$.tip.toString()
)
Expand Down Expand Up @@ -1009,15 +1009,15 @@ describe('anchor', () => {
const stream$ = await ceramic.repository.load(tile.id, {})

// anchor the first commit
const firstAnchorResponseSub = await ceramic.repository.stateManager.anchor(stream$)
const firstAnchorResponseSub = await ceramic.repository.stateManager.anchor(stream$, {})
expect(await anchorRequestStore.load(tile.id).then((ar) => ar.cid.toString())).toEqual(
stream$.tip.toString()
)
await expectAnchorStatus(tile, AnchorStatus.PENDING)

await tile.update({ x: 2 }, null, { anchor: false })
// anchor the second commit
const secondAnchorRequestSub = await ceramic.repository.stateManager.anchor(stream$)
const secondAnchorRequestSub = await ceramic.repository.stateManager.anchor(stream$, {})
expect(await anchorRequestStore.load(tile.id).then((ar) => ar.cid.toString())).toEqual(
stream$.tip.toString()
)
Expand Down Expand Up @@ -1059,9 +1059,9 @@ describe('anchor', () => {

// Emulate CAS responses to the 1st commit
const firstCASResponse$ = new Subject<CASResponse>()
requestAnchorSpy.mockReturnValueOnce(firstCASResponse$)
requestAnchorSpy.mockReturnValueOnce(Promise.resolve(firstCASResponse$))
// Anchor the first commit and subscribe
const firstAnchorResponseSub = await ceramic.repository.stateManager.anchor(stream$)
const firstAnchorResponseSub = await ceramic.repository.stateManager.anchor(stream$, {})
expect(await anchorRequestStore.load(tile.id).then((ar) => ar.cid.toString())).toEqual(
stream$.tip.toString()
)
Expand All @@ -1087,9 +1087,9 @@ describe('anchor', () => {
await tile.update({ abc: 456, def: 789 }, null, { anchor: false })
// Emulate CAS responses to the 2nd commit
const secondCASResponse$ = new Subject<CASResponse>()
requestAnchorSpy.mockReturnValueOnce(secondCASResponse$)
requestAnchorSpy.mockReturnValueOnce(Promise.resolve(secondCASResponse$))
// Anchor the 2nd commit and subscribe
const secondAnchorRequestSub = await ceramic.repository.stateManager.anchor(stream$)
const secondAnchorRequestSub = await ceramic.repository.stateManager.anchor(stream$, {})
expect(await anchorRequestStore.load(tile.id).then((ar) => ar.cid.toString())).toEqual(
stream$.tip.toString()
)
Expand Down Expand Up @@ -1146,9 +1146,9 @@ describe('anchor', () => {

// Emulate CAS responses for the 1st commit
const fauxCASResponse$ = new Subject<CASResponse>()
requestAnchorSpy.mockReturnValueOnce(fauxCASResponse$)
requestAnchorSpy.mockReturnValueOnce(Promise.resolve(fauxCASResponse$))
// anchor the first commit and subscribe
const firstAnchorResponseSub = await ceramic.repository.stateManager.anchor(stream$)
const firstAnchorResponseSub = await ceramic.repository.stateManager.anchor(stream$, {})
expect(await anchorRequestStore.load(tile.id).then((ar) => ar.cid.toString())).toEqual(
stream$.tip.toString()
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ describe('AuthenticatedEthereumAnchorServiceTest', () => {
return { status: AnchorRequestStatusName.PENDING }
})

const observable = anchorService.requestAnchor(generateFakeCarFile())
const observable = await anchorService.requestAnchor(generateFakeCarFile(), false)
const anchorStatus = await lastValueFrom(observable)
expect(anchorStatus.status).toEqual(AnchorRequestStatusName.FAILED) // because the response didn't match the expected format

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,14 @@ test('re-request an anchor till get a response', async () => {
POLL_INTERVAL
)
let lastResponse: any
const subscription = anchorService.requestAnchor(generateFakeCarFile()).subscribe((response) => {
if (response.status === AnchorRequestStatusName.PROCESSING) {
lastResponse = response
subscription.unsubscribe()
const subscription = (await anchorService.requestAnchor(generateFakeCarFile(), false)).subscribe(
(response) => {
if (response.status === AnchorRequestStatusName.PROCESSING) {
lastResponse = response
subscription.unsubscribe()
}
}
})
)
await whenSubscriptionDone(subscription)
expect(lastResponse.message).toEqual(casProcessingResponse.message)
expect(errSpy).toBeCalledTimes(3)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,14 @@ test('re-request an anchor till get a response', async () => {
)

let lastResponse: any
const subscription = anchorService.requestAnchor(generateFakeCarFile()).subscribe((response) => {
if (response.status === codecs.AnchorRequestStatusName.PROCESSING) {
lastResponse = response
subscription.unsubscribe()
const subscription = (await anchorService.requestAnchor(generateFakeCarFile(), false)).subscribe(
(response) => {
if (response.status === codecs.AnchorRequestStatusName.PROCESSING) {
lastResponse = response
subscription.unsubscribe()
}
}
})
)
await whenSubscriptionDone(subscription)
expect(lastResponse.message).toEqual(casProcessingResponse.message)
expect(errSpy).toBeCalledTimes(3)
Expand Down
89 changes: 62 additions & 27 deletions packages/core/src/anchor/ethereum/ethereum-anchor-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
FetchRequest,
} from '@ceramicnetwork/common'
import { StreamID } from '@ceramicnetwork/streamid'
import { Observable, concat, timer, of, defer, expand, interval } from 'rxjs'
import { Observable, concat, timer, of, defer, expand, interval, lastValueFrom } from 'rxjs'
import { concatMap, catchError, map, retry } from 'rxjs/operators'
import { CAR } from 'cartonne'
import { AnchorRequestCarFileReader } from '../anchor-request-car-file-reader.js'
Expand Down Expand Up @@ -87,26 +87,42 @@ export class EthereumAnchorService implements AnchorService {
}

/**
* Requests anchoring service for current tip of the stream
* Send request to the anchoring service
* @param carFile - CAR file containing all necessary data for the CAS to anchor
* @param waitForConfirmation - if true, waits until the CAS has acknowledged receipt of the anchor
* request before returning.
*/
requestAnchor(carFile: CAR): Observable<CASResponse> {
async requestAnchor(
carFile: CAR,
waitForConfirmation: boolean
): Promise<Observable<CASResponse>> {
const carFileReader = new AnchorRequestCarFileReader(carFile)
const cidStreamPair: CidAndStream = { cid: carFileReader.tip, streamId: carFileReader.streamId }
return concat(

const requestCreated$ = concat(
this._announcePending(cidStreamPair),
this._makeAnchorRequest(carFileReader),
this.pollForAnchorResponse(carFileReader.streamId, carFileReader.tip)
).pipe(
catchError((error) =>
of<CASResponse>({
id: '',
status: AnchorRequestStatusName.FAILED,
streamId: carFileReader.streamId,
cid: carFileReader.tip,
message: error.message,
})
)
this._makeAnchorRequest(carFileReader, !waitForConfirmation)
)

const anchorCompleted$ = this.pollForAnchorResponse(carFileReader.streamId, carFileReader.tip)

const streamId = carFileReader.streamId
const tip = carFileReader.tip
const errHandler = (error) =>
of<CASResponse>({
id: '',
status: AnchorRequestStatusName.FAILED,
streamId: streamId,
cid: tip,
message: error.message,
})

if (waitForConfirmation) {
await lastValueFrom(requestCreated$)
return anchorCompleted$.pipe(catchError(errHandler))
} else {
return concat(requestCreated$, anchorCompleted$).pipe(catchError(errHandler))
}
}

/**
Expand All @@ -130,26 +146,45 @@ export class EthereumAnchorService implements AnchorService {
/**
* Send requests to an external Ceramic Anchor Service
*/
private _makeAnchorRequest(carFileReader: AnchorRequestCarFileReader): Observable<CASResponse> {
return defer(() =>
private _makeAnchorRequest(
carFileReader: AnchorRequestCarFileReader,
shouldRetry: boolean
): Observable<CASResponse> {
let sendRequest$ = defer(() =>
this.sendRequest(this.requestsApiEndpoint, {
method: 'POST',
headers: {
'Content-Type': 'application/vnd.ipld.car',
},
body: carFileReader.carFile.bytes,
})
).pipe(
retry({
delay: (error) => {
this._logger.err(
new Error(
`Error connecting to CAS while attempting to anchor ${carFileReader.streamId} at commit ${carFileReader.tip}: ${error.message}`
)

if (shouldRetry) {
sendRequest$ = sendRequest$.pipe(
retry({
delay: (error) => {
this._logger.err(
new Error(
`Error connecting to CAS while attempting to anchor ${carFileReader.streamId} at commit ${carFileReader.tip}: ${error.message}`
)
)
return timer(this.pollInterval)
},
})
)
} else {
sendRequest$ = sendRequest$.pipe(
catchError((error) => {
// clean up the error message to have more context
throw new Error(
`Error connecting to CAS while attempting to anchor ${carFileReader.streamId} at commit ${carFileReader.tip}: ${error.message}`
)
return timer(this.pollInterval)
},
}),
})
)
}

return sendRequest$.pipe(
map((response) => {
return this.parseResponse(
{ streamId: carFileReader.streamId, cid: carFileReader.tip },
Expand Down
Loading

0 comments on commit a36ebf7

Please sign in to comment.