Skip to content

Commit

Permalink
Merge pull request #5150 from connext/fix-retry-issue-in-lh-on-staging
Browse files Browse the repository at this point in the history
fix: retry issue in lh on staging
  • Loading branch information
wanglonghong authored Nov 10, 2023
2 parents f857e3c + a9e703e commit 32c8281
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 25 deletions.
7 changes: 5 additions & 2 deletions packages/adapters/cache/src/lib/caches/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ export class MessagesCache extends Cache {
for (const value of values) {
const message = await this.getMessage(value.leaf);
if (message) {
await this.storeMessage(message.data, value.status, message.attempt);
await this.storeMessage(message.data, value.status, value.status == ExecStatus.None ? 0 : message.attempt);
}
}
}
Expand Down Expand Up @@ -122,7 +122,10 @@ export class MessagesCache extends Cache {
*/
private async addPending(originDomain: string, destinationDomain: string, leaf: string) {
const pendingKey = `${originDomain}-${destinationDomain}`;
await this.data.rpush(`${this.prefix}:pending:${pendingKey}`, leaf);
const message = await this.getMessage(leaf);
if (!message) {
await this.data.rpush(`${this.prefix}:pending:${pendingKey}`, leaf);
}
}

/**
Expand Down
47 changes: 46 additions & 1 deletion packages/adapters/cache/test/lib/caches/messages.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
import { ExecStatus, Logger, RelayerType, XMessage, expect, mkBytes32, mock } from "@connext/nxtp-utils";
import {
ExecStatus,
Logger,
RelayerType,
XMessage,
expect,
getRandomBytes32,
mkBytes32,
mock,
} from "@connext/nxtp-utils";
import { MessagesCache } from "../../../src/index";

const logger = new Logger({ level: "debug" });
Expand All @@ -10,6 +19,23 @@ const mockXMessages: XMessage[] = [
{ ...mock.entity.xMessage(), originDomain, destinationDomain, leaf: mkBytes32("0x222") },
];

const genMockXMessages = (count: number): XMessage[] => {
const xMessages: XMessage[] = [];
for (let i = 0; i < count; i++) {
const leaf = getRandomBytes32();
const root = getRandomBytes32();
xMessages.push({
...mock.entity.xMessage(),
originDomain,
destinationDomain,
origin: { index: 100 + i, root, message: leaf },
leaf,
});
}

return xMessages;
};

describe("MessagesCache", () => {
beforeEach(async () => {
messagesCache = new MessagesCache({ host: "mock", port: 1234, mock: true, logger });
Expand Down Expand Up @@ -140,6 +166,25 @@ describe("MessagesCache", () => {
const message2 = await messagesCache.getMessage(mockXMessages[1].leaf);
expect(message2?.status).to.be.deep.eq(ExecStatus.Completed);
});

it("shouldn't add the message to the pending list if exists", async () => {
const mockXMessages = genMockXMessages(10);
await messagesCache.storeMessages(mockXMessages);

const xMessage1 = mockXMessages[0];
await messagesCache.setStatus([{ leaf: xMessage1.leaf, status: ExecStatus.Sent }]);
const message = await messagesCache.getMessage(xMessage1.leaf);
expect(message?.status).to.be.eq(ExecStatus.Sent);

let pendings = await messagesCache.getPending(originDomain, destinationDomain, 0, 100);
expect(pendings.length).to.be.eq(10);
expect(pendings).to.be.deep.eq(mockXMessages.map((it) => it.leaf));

await messagesCache.removePending(originDomain, destinationDomain, [xMessage1.leaf]);
pendings = await messagesCache.getPending(originDomain, destinationDomain, 0, 100);
expect(pendings.length).to.be.eq(9);
expect(pendings).to.be.deep.eq(mockXMessages.slice(1).map((it) => it.leaf));
});
});

describe("#nonce", () => {
Expand Down
10 changes: 10 additions & 0 deletions packages/agents/lighthouse/src/errors/prover.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,13 @@ export class AggregateRootDuplicated extends NxtpError {
super(`Trying to propose same aggregate root for ${aggregateRoot}`, context, AggregateRootDuplicated.name);
}
}

export class EmptyMessageProofs extends NxtpError {
constructor(originDomain: string, destinationDomain: string, context: any = {}) {
super(
`Empty message proofs for origin: ${originDomain} and destination: ${destinationDomain}`,
context,
EmptyMessageProofs.name,
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export const consume = async () => {
await processMessages(brokerMessage, requestContext);
channel.ack(message);
} catch (err: unknown) {
logger.error("Processing messaages failed", requestContext, methodContext, undefined, { err });
logger.error("Processing messages failed", requestContext, methodContext, undefined, { err });
channel.reject(message, false);
const statuses = brokerMessage.messages.map((it) => ({ leaf: it.leaf, status: ExecStatus.None }));
await cache.messages.setStatus(statuses);
Expand Down
37 changes: 21 additions & 16 deletions packages/agents/lighthouse/src/tasks/prover/operations/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import {
NoMessageRootProof,
NoMessageProof,
MessageRootVerificationFailed,
EmptyMessageProofs,
RelayerSendFailed,
} from "../../../errors";
import { sendWithRelayerWithBackup } from "../../../mockable";
import { HubDBHelper, SpokeDBHelper, OptimisticHubDBHelper } from "../adapters";
Expand Down Expand Up @@ -98,7 +100,8 @@ export const processMessages = async (brokerMessage: BrokerMessage, _requestCont
let failCount = 0;
for (const message of messages) {
// If message has been removed. Skip processing it.
if (!cache.messages.getMessage(message.leaf)) continue;
const _message = await cache.messages.getMessage(message.leaf);
if (!_message) continue;

const messageEncodedData = contracts.merkleTreeManager.encodeFunctionData("leaves", [message.leaf]);
try {
Expand Down Expand Up @@ -162,6 +165,9 @@ export const processMessages = async (brokerMessage: BrokerMessage, _requestCont
});
// Do not process message if proof verification fails.
failCount += 1;

// Before you skip to process a message, the status needs to be reset so it can be retried in the next cycle.
await cache.messages.setStatus([{ leaf: message.leaf, status: ExecStatus.None }]);
continue;
}
}
Expand All @@ -183,7 +189,8 @@ export const processMessages = async (brokerMessage: BrokerMessage, _requestCont
originDomain,
destinationDomain,
});
return;

throw new EmptyMessageProofs(originDomain, destinationDomain);
}

// Proof path for proving inclusion of messageRoot in aggregateRoot.
Expand Down Expand Up @@ -264,20 +271,18 @@ export const processMessages = async (brokerMessage: BrokerMessage, _requestCont
requestContext,
);
logger.info("Proved and processed message sent to relayer", requestContext, methodContext, { taskId });
if (taskId) {
await cache.messages.addTaskPending(
taskId,
relayerType,
originDomain,
destinationDomain,
provenMessages.map((it) => it.leaf),
);
const statuses = messages.map((it) => ({ leaf: it.leaf, status: ExecStatus.Sent }));
await cache.messages.setStatus(statuses);

return;
}
await cache.messages.addTaskPending(
taskId,
relayerType,
originDomain,
destinationDomain,
provenMessages.map((it) => it.leaf),
);
const statuses = messages.map((it) => ({ leaf: it.leaf, status: ExecStatus.Sent }));
await cache.messages.setStatus(statuses);
} catch (err: unknown) {
logger.error("Error sending proofs to relayer", requestContext, methodContext, jsonifyError(err as NxtpError));
throw new RelayerSendFailed({
error: jsonifyError(err as Error),
});
}
};
1 change: 1 addition & 0 deletions packages/agents/lighthouse/test/mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ export const mockCache = () => {
storeMessages: stub().resolves(),
getPending: stub().resolves(),
getPendingTasks: stub().resolves(),
addTaskPending: stub().resolves(),
getMessage: stub().resolves(),
increaseAttempt: stub().resolves(),
removePending: stub().resolves(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { expect, createRequestContext, SparseMerkleTree, mkBytes32, XMessage } from "@connext/nxtp-utils";
import { SinonStub, stub } from "sinon";
import { SinonStub, stub, restore, reset } from "sinon";

import { mock, mockXMessage1, mockXMessage2 } from "../../../mock";
import { proverCtxMock, sendWithRelayerWithBackupStub } from "../../../globalTestHook";
import { NoDestinationDomainForProof, NoMessageProof } from "../../../../src/errors";
import { EmptyMessageProofs, NoDestinationDomainForProof, NoMessageProof } from "../../../../src/errors";
import { BrokerMessage } from "../../../../src/tasks/prover/operations/types";
import { processMessages } from "../../../../src/tasks/prover/operations";

Expand All @@ -24,6 +24,9 @@ describe("Operations: Process", () => {
describe("#processMessages", () => {
let getProofStub: SinonStub;
let verifyStub: SinonStub;
let getMessageStub: SinonStub;
let addTaskPendingStub: SinonStub;
let setStatusStub: SinonStub;
let encodeFunctionDataStubMT: SinonStub;
let encodeFunctionDataStubSC: SinonStub;

Expand All @@ -32,16 +35,28 @@ describe("Operations: Process", () => {
verifyStub = stub(SparseMerkleTree.prototype, "verify");
encodeFunctionDataStubMT = proverCtxMock.adapters.contracts.merkleTreeManager.encodeFunctionData as SinonStub;
encodeFunctionDataStubSC = proverCtxMock.adapters.contracts.spokeConnector.encodeFunctionData as SinonStub;
getMessageStub = proverCtxMock.adapters.cache.messages.getMessage as SinonStub;
addTaskPendingStub = proverCtxMock.adapters.cache.messages.addTaskPending as SinonStub;
setStatusStub = proverCtxMock.adapters.cache.messages.setStatus as SinonStub;
});

afterEach(() => {
restore();
reset();
});

it("happy case should work", async () => {
getProofStub.returns(["0x1"]);
verifyStub.returns({ calculated: "0x", verified: true });
getMessageStub.resolves(mockXMessage1);
addTaskPendingStub.resolves();
setStatusStub.resolves();
encodeFunctionDataStubSC.returns("0x");
encodeFunctionDataStubMT.returns("0x");
sendWithRelayerWithBackupStub.resolves({
taskId: "0x123",
});

mockBrokerMesage.messages.push(mockBrokerMesage.messages[0]);
mockBrokerMesage.messages.push(mockBrokerMesage.messages[2]);
const mockXMessage2: XMessage = {
Expand All @@ -60,6 +75,9 @@ describe("Operations: Process", () => {

it("should be fulfilled", async () => {
getProofStub.returns(["0x1"]);
getMessageStub.resolves(mockXMessage1);
addTaskPendingStub.resolves();
setStatusStub.resolves();
verifyStub.returns({ verified: true });
(proverCtxMock.adapters.contracts.merkleTreeManager.encodeFunctionData as SinonStub).returns("0x");
await processMessages(mockBrokerMesage, requestContext);
Expand All @@ -77,6 +95,9 @@ describe("Operations: Process", () => {
it("should catch error if no message proof", async () => {
getProofStub.returns(undefined);
verifyStub.returns({ verified: true });
getMessageStub.resolves(mockXMessage1);
addTaskPendingStub.resolves();
setStatusStub.resolves();
(proverCtxMock.adapters.contracts.merkleTreeManager.encodeFunctionData as SinonStub).returns("0x");
(proverCtxMock.adapters.contracts.merkleTreeManager.decodeFunctionResult as SinonStub).returns([0]);
await expect(processMessages(mockBrokerMesage, requestContext)).to.eventually.be.rejectedWith(NoMessageProof);
Expand All @@ -85,24 +106,33 @@ describe("Operations: Process", () => {
it("should do nothing if empty message proof", async () => {
getProofStub.returns(["0x"]);
verifyStub.returns({ verified: false });
getMessageStub.resolves(mockXMessage1);
addTaskPendingStub.resolves();
setStatusStub.resolves();
(proverCtxMock.adapters.contracts.merkleTreeManager.encodeFunctionData as SinonStub).returns("0x");
await processMessages(mockBrokerMesage, requestContext);
await expect(processMessages(mockBrokerMesage, requestContext)).to.eventually.be.rejectedWith(EmptyMessageProofs);
});

it("should do nothing if already processed", async () => {
getProofStub.returns(["0x"]);
verifyStub.returns({ verified: false });
getMessageStub.resolves(mockXMessage1);
addTaskPendingStub.resolves();
setStatusStub.resolves();
(proverCtxMock.adapters.contracts.merkleTreeManager.encodeFunctionData as SinonStub).returns("0x");
(proverCtxMock.adapters.contracts.merkleTreeManager.decodeFunctionResult as SinonStub).returns([2]);
await processMessages(mockBrokerMesage, requestContext);
await expect(processMessages(mockBrokerMesage, requestContext)).to.eventually.be.rejectedWith(EmptyMessageProofs);
});

it("should do nothing if status unused", async () => {
getProofStub.returns(["0x"]);
verifyStub.returns({ verified: false });
getMessageStub.resolves(mockXMessage1);
addTaskPendingStub.resolves();
setStatusStub.resolves();
(proverCtxMock.adapters.contracts.merkleTreeManager.encodeFunctionData as SinonStub).returns("0x");
(proverCtxMock.adapters.contracts.merkleTreeManager.decodeFunctionResult as SinonStub).returns([1]);
await processMessages(mockBrokerMesage, requestContext);
await expect(processMessages(mockBrokerMesage, requestContext)).to.eventually.be.rejectedWith(EmptyMessageProofs);
});
});
});

0 comments on commit 32c8281

Please sign in to comment.