Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prover op mode fixes #5014

Merged
merged 3 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 23 additions & 11 deletions packages/agents/lighthouse/src/tasks/prover/operations/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
RequestContext,
XMessage,
ExecStatus,
DBHelper,
} from "@connext/nxtp-utils";

import {
Expand All @@ -16,7 +17,7 @@ import {
MessageRootVerificationFailed,
} from "../../../errors";
import { sendWithRelayerWithBackup } from "../../../mockable";
import { HubDBHelper, SpokeDBHelper } from "../adapters";
import { HubDBHelper, SpokeDBHelper, OptimisticHubDBHelper } from "../adapters";
import { getContext } from "../prover";

import { BrokerMessage, ProofStruct } from "./types";
Expand All @@ -38,6 +39,7 @@ export const processMessages = async (brokerMessage: BrokerMessage, _requestCont
messageRootCount,
aggregateRoot,
aggregateRootCount,
snapshotRoots,
} = brokerMessage;

// Dedup the batch
Expand Down Expand Up @@ -65,17 +67,27 @@ export const processMessages = async (brokerMessage: BrokerMessage, _requestCont
);
const spokeSMT = new SparseMerkleTree(spokeStore);

const hubStore = new HubDBHelper(
"hub",
aggregateRootCount,
{
reader: database,
writer: databaseWriter,
},
cache.messages,
);
const hubSMT = new SparseMerkleTree(hubStore);
let hubStore: DBHelper;
if (snapshotRoots.length == 0) {
hubStore = new HubDBHelper(
"hub",
aggregateRootCount,
{
reader: database,
writer: databaseWriter,
},
cache.messages,
);
} else {
const baseAggregateRootCount = aggregateRootCount - snapshotRoots.length;
const baseAggregateRoots: string[] = await database.getAggregateRoots(baseAggregateRootCount);
const opRoots = baseAggregateRoots.concat(snapshotRoots);

// Count of leafs in aggregate tree at targetAggregateRoot.
hubStore = new OptimisticHubDBHelper(opRoots, aggregateRootCount);
}

const hubSMT = new SparseMerkleTree(hubStore);
const destinationSpokeConnector = config.chains[destinationDomain]?.deployments.spokeConnector;
if (!destinationSpokeConnector) {
throw new NoDestinationDomainForProof(destinationDomain);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
ExecStatus,
RelayerTaskStatus,
ModeType,
Snapshot,
} from "@connext/nxtp-utils";

import {
Expand Down Expand Up @@ -176,10 +177,15 @@ export const enqueue = async () => {
.filter((domain) => domain != destinationDomain)
.map(async (originDomain) => {
let latestMessageRoot: RootMessage | undefined = undefined;
let aggregateRootCount: number | undefined;
let targetMessageRoot: string;
let messageRootCount: number | undefined;
let messageRootIndex: number | undefined;
let snapshot: Snapshot | undefined;
const targetAggregateRoot: ReceivedAggregateRoot = curDestAggRoots[0];
try {
// Slowmode
if (mode == ModeType.SlowMode) {
if (mode === ModeType.SlowMode) {
// Slowmode
for (const destAggregateRoot of curDestAggRoots) {
latestMessageRoot = await database.getLatestMessageRoot(originDomain, destAggregateRoot.root);
Expand All @@ -188,9 +194,22 @@ export const enqueue = async () => {
if (!latestMessageRoot) {
throw new NoTargetMessageRoot(originDomain);
}
// Count of leafs in aggregate tree at targetAggregateRoot.
aggregateRootCount = await database.getAggregateRootCount(targetAggregateRoot.root);
if (!aggregateRootCount) {
throw new NoAggregateRootCount(targetAggregateRoot.root);
}

targetMessageRoot = latestMessageRoot.root;

// Index of messageRoot leaf node in aggregate tree.
messageRootIndex = await database.getMessageRootIndex(config.hubDomain, targetMessageRoot);
if (messageRootIndex === undefined) {
throw new NoMessageRootIndex(originDomain, targetMessageRoot);
}
} else {
// Optimistic mode
const snapshot = await database.getPendingAggregateRoot(targetAggregateRoot.root);
snapshot = await database.getPendingAggregateRoot(targetAggregateRoot.root);
if (snapshot) {
logger.debug("Got pending snapshot", requestContext, methodContext, {
snapshot,
Expand All @@ -207,36 +226,29 @@ export const enqueue = async () => {
if (domainIndex === -1) {
throw new NoDomainInSnapshot(originDomain, snapshot);
}
const messageRoot = snapshot.roots[domainIndex];
latestMessageRoot = await database.getRootMessage(originDomain, messageRoot);
if (!latestMessageRoot) {
targetMessageRoot = snapshot.roots[domainIndex];
if (!targetMessageRoot) {
throw new NoTargetMessageRoot(originDomain);
}
// Count of leafs in aggregate tree at snapshot baseAggregateRoot.
const _baseAggregateRootCount = await database.getAggregateRootCount(snapshot.baseAggregateRoot);
if (!_baseAggregateRootCount) {
throw new NoAggregateRootCount(snapshot.baseAggregateRoot);
}
aggregateRootCount = snapshot.roots.length + _baseAggregateRootCount;
messageRootIndex = _baseAggregateRootCount + domainIndex;
}

const targetMessageRoot = latestMessageRoot.root;
// Count of leaf nodes in origin domain`s outbound tree with the targetMessageRoot as root
const messageRootCount = await database.getMessageRootCount(originDomain, targetMessageRoot);
messageRootCount = await database.getMessageRootCount(originDomain, targetMessageRoot);
if (messageRootCount === undefined) {
throw new NoMessageRootCount(originDomain, targetMessageRoot);
}
// Index of messageRoot leaf node in aggregate tree.
const messageRootIndex = await database.getMessageRootIndex(config.hubDomain, targetMessageRoot);
if (messageRootIndex === undefined) {
throw new NoMessageRootIndex(originDomain, targetMessageRoot);
}

// Count of leafs in aggregate tree at targetAggregateRoot.
const aggregateRootCount = await database.getAggregateRootCount(targetAggregateRoot.root);
if (!aggregateRootCount) {
throw new NoAggregateRootCount(targetAggregateRoot.root);
}

const batchSize = config.proverBatchSize[destinationDomain] ?? DEFAULT_PROVER_BATCH_SIZE;
const pendingMessages = await getUnProcessedMessagesByIndex(
originDomain,
destinationDomain,
latestMessageRoot.count,
messageRootCount,
);

// Paginate through all unprocessed messages from the domain
Expand All @@ -246,14 +258,14 @@ export const enqueue = async () => {
const unprocessed = pendingMessages.slice(offset, offset + batchSize);
const subContext = createRequestContext(
"processUnprocessedMessages",
`${originDomain}-${destinationDomain}-${offset}-${latestMessageRoot.root}`,
`${originDomain}-${destinationDomain}-${offset}-${targetMessageRoot}`,
);
if (unprocessed.length > 0) {
logger.info("Got unprocessed messages for origin and destination pair", subContext, methodContext, {
unprocessed: unprocessed.map((message) => message.leaf),
originDomain,
destinationDomain,
endIndex: latestMessageRoot.count,
endIndex: messageRootCount,
offset,
});

Expand All @@ -266,6 +278,7 @@ export const enqueue = async () => {
messageRootCount,
targetAggregateRoot.root,
aggregateRootCount,
snapshot ? snapshot.roots : [],
subContext,
);
if (brokerMessage) {
Expand All @@ -287,7 +300,7 @@ export const enqueue = async () => {
{
originDomain,
destinationDomain,
endIndex: latestMessageRoot.count,
endIndex: messageRootCount,
offset,
brokerMessage,
},
Expand Down Expand Up @@ -357,6 +370,7 @@ export const createBrokerMessage = async (
messageRootCount: number,
targetAggregateRoot: string,
aggregateRootCount: number,
snapshotRoots: string[],
_requestContext: RequestContext,
): Promise<BrokerMessage | undefined> => {
const {
Expand Down Expand Up @@ -425,6 +439,7 @@ export const createBrokerMessage = async (
messageRootCount,
aggregateRoot: targetAggregateRoot,
aggregateRootCount,
snapshotRoots,
};
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ export type BrokerMessage = {
messageRootCount: number;
aggregateRoot: string;
aggregateRootCount: number;
snapshotRoots: string[];
};
2 changes: 2 additions & 0 deletions packages/agents/lighthouse/test/mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
RootMessage,
RelayerType,
ReceivedAggregateRoot,
ModeType,
} from "@connext/nxtp-utils";
import { Relayer } from "@connext/nxtp-adapters-relayer";
import { mockRelayer } from "@connext/nxtp-adapters-relayer/test/mock";
Expand Down Expand Up @@ -102,6 +103,7 @@ export const mock = {
},
config: mock.config(),
chainData: mock.chainData(),
mode: ModeType.SlowMode,
};
},
processFromRootCtx: (): ProcessFromRootContext => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const mockBrokerMesage: BrokerMessage = {
messageRootCount: 2,
aggregateRoot: mkBytes32("0x222"),
aggregateRootCount: 2,
snapshotRoots: [],
};

const requestContext = createRequestContext("Publisher");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import { expect, createRequestContext, mkBytes32 } from "@connext/nxtp-utils";
import { expect, createRequestContext, mkBytes32, ModeType } from "@connext/nxtp-utils";
import { SinonStub, stub } from "sinon";

import {
enqueue,
prefetch,
createBrokerMessage,
getUnProcessedMessagesByIndex,
acquireLock,
releaseLock,
} from "../../../../src/tasks/prover/operations/publisher";
import * as PublisherFns from "../../../../src/tasks/prover/operations/publisher";
import { mock, mockXMessage1, mockXMessage2, mockRootMessage, mockReceivedRoot } from "../../../mock";
Expand All @@ -22,6 +24,7 @@ const mockBrokerMesage: BrokerMessage = {
messageRootCount: 2,
aggregateRoot: mkBytes32("0x222"),
aggregateRootCount: 2,
snapshotRoots: [],
};

const requestContext = createRequestContext("Publisher");
Expand Down Expand Up @@ -70,6 +73,44 @@ describe("Operations: Publisher", () => {
expect(createBrokerMessageStub.callCount).to.be.eq(4);
});

it("happy case should enqueue a broker messages in op mode", async () => {
acquireLock();
const channel = await proverCtxMock.adapters.mqClient.createChannel();
const publishStub = (channel.publish as SinonStub).resolves();
proverCtxMock.mode = ModeType.OptimisticMode;

(proverCtxMock.adapters.database.getLatestAggregateRoots as SinonStub).resolves([mockReceivedRoot]);
(proverCtxMock.adapters.database.getLatestMessageRoot as SinonStub).resolves(mockRootMessage);
(proverCtxMock.adapters.database.getPendingAggregateRoot as SinonStub).resolves(mock.entity.snapshot());
(proverCtxMock.adapters.database.getRootMessage as SinonStub).resolves(mockRootMessage);
(proverCtxMock.adapters.database.getMessageRootCount as SinonStub).resolves(1);
(proverCtxMock.adapters.database.getMessageRootIndex as SinonStub).resolves(1);
(proverCtxMock.adapters.database.getAggregateRootCount as SinonStub).resolves(1);
(proverCtxMock.adapters.cache.messages.setStatus as SinonStub).resolves();
createBrokerMessageStub.resolves(mockBrokerMesage);
await enqueue();
releaseLock();
expect(createBrokerMessageStub.callCount).to.be.eq(4);
});

it("should not enqueue if no snapshot root in op mode", async () => {
const channel = await proverCtxMock.adapters.mqClient.createChannel();
const publishStub = (channel.publish as SinonStub).resolves();
proverCtxMock.mode = ModeType.OptimisticMode;

(proverCtxMock.adapters.database.getLatestAggregateRoots as SinonStub).resolves([mockReceivedRoot]);
(proverCtxMock.adapters.database.getLatestMessageRoot as SinonStub).resolves(mockRootMessage);
(proverCtxMock.adapters.database.getPendingAggregateRoot as SinonStub).resolves(undefined);
(proverCtxMock.adapters.database.getRootMessage as SinonStub).resolves(mockRootMessage);
(proverCtxMock.adapters.database.getMessageRootCount as SinonStub).resolves(1);
(proverCtxMock.adapters.database.getMessageRootIndex as SinonStub).resolves(1);
(proverCtxMock.adapters.database.getAggregateRootCount as SinonStub).resolves(1);
(proverCtxMock.adapters.cache.messages.setStatus as SinonStub).resolves();
createBrokerMessageStub.resolves(mockBrokerMesage);
await enqueue();
expect(createBrokerMessageStub.callCount).to.be.eq(0);
});

it("should catch error if no pending aggregate root", async () => {
(proverCtxMock.adapters.database.getLatestAggregateRoots as SinonStub).resolves([mockReceivedRoot]);
(proverCtxMock.adapters.database.getLatestMessageRoot as SinonStub).resolves(mockRootMessage);
Expand All @@ -80,7 +121,7 @@ describe("Operations: Publisher", () => {
(proverCtxMock.adapters.cache.messages.setNonce as SinonStub).resolves();
createBrokerMessageStub.resolves(mockBrokerMesage);
expect(await enqueue()).to.throw;
expect(createBrokerMessageStub.callCount).to.be.eq(0);
expect(createBrokerMessageStub.callCount).to.be.eq(4);
});
it("should catch error if no received aggregate root", async () => {
(proverCtxMock.adapters.database.getLatestAggregateRoots as SinonStub).resolves([]);
Expand Down Expand Up @@ -142,6 +183,7 @@ describe("Operations: Publisher", () => {
mockBrokerMesage.messageRootCount,
mockBrokerMesage.aggregateRoot,
mockBrokerMesage.aggregateRootCount,
mockBrokerMesage.snapshotRoots,
requestContext,
);
expect(brokerMessage).to.be.deep.eq(mockBrokerMesage);
Expand All @@ -157,6 +199,7 @@ describe("Operations: Publisher", () => {
mockBrokerMesage.messageRootCount,
mockBrokerMesage.aggregateRoot,
mockBrokerMesage.aggregateRootCount,
mockBrokerMesage.snapshotRoots,
requestContext,
),
).to.eventually.be.rejectedWith(NoDestinationDomainForProof);
Expand All @@ -175,6 +218,7 @@ describe("Operations: Publisher", () => {
mockBrokerMesage.messageRootCount,
mockBrokerMesage.aggregateRoot,
mockBrokerMesage.aggregateRootCount,
mockBrokerMesage.snapshotRoots,
requestContext,
);
expect(brokerMessage).to.be.undefined;
Expand Down