fix: retry issue on prod #5160

Merged: 3 commits, Nov 13, 2023
7 changes: 5 additions & 2 deletions packages/adapters/cache/src/lib/caches/messages.ts
@@ -85,7 +85,7 @@ export class MessagesCache extends Cache {
for (const value of values) {
const message = await this.getMessage(value.leaf);
if (message) {
await this.storeMessage(message.data, value.status, message.attempt);
await this.storeMessage(message.data, value.status, value.status == ExecStatus.None ? 0 : message.attempt);
}
}
}
@@ -122,7 +122,10 @@ export class MessagesCache extends Cache {
*/
private async addPending(originDomain: string, destinationDomain: string, leaf: string) {
const pendingKey = `${originDomain}-${destinationDomain}`;
await this.data.rpush(`${this.prefix}:pending:${pendingKey}`, leaf);
const message = await this.getMessage(leaf);
if (!message) {
await this.data.rpush(`${this.prefix}:pending:${pendingKey}`, leaf);
}
}

/**
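To make the retry semantics of these two hunks concrete, here is a minimal in-memory sketch; the enum, class, and method shapes below are simplified stand-ins for the Redis-backed MessagesCache, not its real API. It models the two behaviours introduced here: a status reset to ExecStatus.None also zeroes the attempt counter, and a leaf is only pushed onto the pending list if its message is not already stored.

```typescript
// Simplified, in-memory stand-in for the Redis-backed MessagesCache; all names
// and shapes here are illustrative only.
enum ExecStatus {
  None = "None",
  Sent = "Sent",
  Completed = "Completed",
}

type CachedMessage = { data: unknown; status: ExecStatus; attempt: number };

class InMemoryMessagesCache {
  private messages = new Map<string, CachedMessage>();
  private pending: string[] = [];

  // Mirrors the addPending guard: only enqueue a leaf that has not been stored
  // yet, so re-storing a message cannot duplicate it in the pending list.
  storeMessage(leaf: string, data: unknown): void {
    if (!this.messages.has(leaf)) {
      this.pending.push(leaf);
      this.messages.set(leaf, { data, status: ExecStatus.None, attempt: 0 });
    }
  }

  // Mirrors the setStatus change: a reset to ExecStatus.None also zeroes the
  // attempt counter, so the message is treated as fresh on the next cycle.
  setStatus(values: { leaf: string; status: ExecStatus }[]): void {
    for (const { leaf, status } of values) {
      const message = this.messages.get(leaf);
      if (!message) continue;
      message.status = status;
      message.attempt = status === ExecStatus.None ? 0 : message.attempt;
    }
  }

  getPending(): string[] {
    return [...this.pending];
  }
}
```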
47 changes: 46 additions & 1 deletion packages/adapters/cache/test/lib/caches/messages.spec.ts
@@ -1,4 +1,13 @@
import { ExecStatus, Logger, RelayerType, XMessage, expect, mkBytes32, mock } from "@connext/nxtp-utils";
import {
ExecStatus,
Logger,
RelayerType,
XMessage,
expect,
getRandomBytes32,
mkBytes32,
mock,
} from "@connext/nxtp-utils";
import { MessagesCache } from "../../../src/index";

const logger = new Logger({ level: "debug" });
@@ -10,6 +19,23 @@ const mockXMessages: XMessage[] = [
{ ...mock.entity.xMessage(), originDomain, destinationDomain, leaf: mkBytes32("0x222") },
];

const genMockXMessages = (count: number): XMessage[] => {
const xMessages: XMessage[] = [];
for (let i = 0; i < count; i++) {
const leaf = getRandomBytes32();
const root = getRandomBytes32();
xMessages.push({
...mock.entity.xMessage(),
originDomain,
destinationDomain,
origin: { index: 100 + i, root, message: leaf },
leaf,
});
}

return xMessages;
};

describe("MessagesCache", () => {
beforeEach(async () => {
messagesCache = new MessagesCache({ host: "mock", port: 1234, mock: true, logger });
@@ -140,6 +166,25 @@ describe("MessagesCache", () => {
const message2 = await messagesCache.getMessage(mockXMessages[1].leaf);
expect(message2?.status).to.be.deep.eq(ExecStatus.Completed);
});

it("shouldn't add the message to the pending list if exists", async () => {
const mockXMessages = genMockXMessages(10);
await messagesCache.storeMessages(mockXMessages);

const xMessage1 = mockXMessages[0];
await messagesCache.setStatus([{ leaf: xMessage1.leaf, status: ExecStatus.Sent }]);
const message = await messagesCache.getMessage(xMessage1.leaf);
expect(message?.status).to.be.eq(ExecStatus.Sent);

let pendings = await messagesCache.getPending(originDomain, destinationDomain, 0, 100);
expect(pendings.length).to.be.eq(10);
expect(pendings).to.be.deep.eq(mockXMessages.map((it) => it.leaf));

await messagesCache.removePending(originDomain, destinationDomain, [xMessage1.leaf]);
pendings = await messagesCache.getPending(originDomain, destinationDomain, 0, 100);
expect(pendings.length).to.be.eq(9);
expect(pendings).to.be.deep.eq(mockXMessages.slice(1).map((it) => it.leaf));
});
});

describe("#nonce", () => {
10 changes: 10 additions & 0 deletions packages/agents/lighthouse/src/errors/prover.ts
@@ -59,3 +59,13 @@ export class NoMessageProof extends NxtpError {
super(`No index ${index} for message hash ${leaf}`, context, NoMessageProof.name);
}
}

export class EmptyMessageProofs extends NxtpError {
constructor(originDomain: string, destinationDomain: string, context: any = {}) {
super(
`Empty message proofs for origin: ${originDomain} and destination: ${destinationDomain}`,
context,
EmptyMessageProofs.name,
);
}
}
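A small, hedged usage sketch for the new error; the guard condition and surrounding names are illustrative, though in process.ts below the error is thrown when a broker message yields no provable leaves.

```typescript
// Illustrative guard: surface an empty batch as a hard failure rather than a
// silent return, so the consumer's catch block can reset and retry the leaves.
if (provenMessages.length === 0) {
  throw new EmptyMessageProofs(originDomain, destinationDomain);
}
```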
@@ -41,7 +41,7 @@ export const consume = async () => {
await processMessages(brokerMessage, requestContext);
channel.ack(message);
} catch (err: unknown) {
logger.error("Processing messaages failed", requestContext, methodContext, undefined, { err });
logger.error("Processing messages failed", requestContext, methodContext, undefined, { err });
channel.reject(message, false);
const statuses = brokerMessage.messages.map((it) => ({ leaf: it.leaf, status: ExecStatus.None }));
await cache.messages.setStatus(statuses);
37 changes: 21 additions & 16 deletions packages/agents/lighthouse/src/tasks/prover/operations/process.ts
@@ -14,6 +14,8 @@ import {
NoMessageRootProof,
NoMessageProof,
MessageRootVerificationFailed,
EmptyMessageProofs,
RelayerSendFailed,
} from "../../../errors";
import { sendWithRelayerWithBackup } from "../../../mockable";
import { HubDBHelper, SpokeDBHelper } from "../adapters";
@@ -86,7 +88,8 @@ export const processMessages = async (brokerMessage: BrokerMessage, _requestCont
let failCount = 0;
for (const message of messages) {
// If message has been removed. Skip processing it.
if (!cache.messages.getMessage(message.leaf)) continue;
const _message = await cache.messages.getMessage(message.leaf);
if (!_message) continue;

const messageEncodedData = contracts.spokeConnector.encodeFunctionData("messages", [message.leaf]);
try {
@@ -150,6 +153,9 @@ export const processMessages = async (brokerMessage: BrokerMessage, _requestCont
});
// Do not process message if proof verification fails.
failCount += 1;

// Before skipping this message, reset its status so it can be retried in the next cycle.
await cache.messages.setStatus([{ leaf: message.leaf, status: ExecStatus.None }]);
continue;
}
}
@@ -171,7 +177,8 @@ export const processMessages = async (brokerMessage: BrokerMessage, _requestCont
originDomain,
destinationDomain,
});
return;

throw new EmptyMessageProofs(originDomain, destinationDomain);
}

// Proof path for proving inclusion of messageRoot in aggregateRoot.
@@ -271,20 +278,18 @@ export const processMessages = async (brokerMessage: BrokerMessage, _requestCont
requestContext,
);
logger.info("Proved and processed message sent to relayer", requestContext, methodContext, { taskId });
if (taskId) {
await cache.messages.addTaskPending(
taskId,
relayerType,
originDomain,
destinationDomain,
provenMessages.map((it) => it.leaf),
);
const statuses = messages.map((it) => ({ leaf: it.leaf, status: ExecStatus.Sent }));
await cache.messages.setStatus(statuses);

return;
}
await cache.messages.addTaskPending(
taskId,
relayerType,
originDomain,
destinationDomain,
provenMessages.map((it) => it.leaf),
);
const statuses = messages.map((it) => ({ leaf: it.leaf, status: ExecStatus.Sent }));
await cache.messages.setStatus(statuses);
} catch (err: unknown) {
logger.error("Error sending proofs to relayer", requestContext, methodContext, jsonifyError(err as NxtpError));
throw new RelayerSendFailed({
error: jsonifyError(err as Error),
});
}
};
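Taken together with the consume() change above, the flow is now: processMessages throws (EmptyMessageProofs, NoMessageProof, RelayerSendFailed, ...) instead of returning silently on failure, and the consumer rejects the broker message and resets its leaves to ExecStatus.None. Below is a hedged sketch of one iteration of that loop, with the queue channel and cache reduced to plain callbacks; the names and types are simplified stand-ins, not the real lighthouse wiring.

```typescript
// Simplified stand-ins for the real broker message and consumer dependencies.
type BrokerMessage = {
  originDomain: string;
  destinationDomain: string;
  messages: { leaf: string }[];
};

interface ConsumeDeps {
  processMessages: (m: BrokerMessage) => Promise<void>; // throws on failure
  ack: () => void;
  reject: () => void;
  setStatus: (statuses: { leaf: string; status: "None" }[]) => Promise<void>;
}

// One iteration of the consume loop after this PR: any throw from
// processMessages rejects the broker message and resets its leaves to
// ExecStatus.None; with the cache change above, that also zeroes the attempt
// counter, so the leaves are picked up fresh on the next prover cycle.
async function consumeOnce(brokerMessage: BrokerMessage, deps: ConsumeDeps): Promise<void> {
  try {
    await deps.processMessages(brokerMessage);
    deps.ack();
  } catch {
    deps.reject();
    await deps.setStatus(
      brokerMessage.messages.map((it) => ({ leaf: it.leaf, status: "None" as const })),
    );
  }
}
```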
2 changes: 2 additions & 0 deletions packages/agents/lighthouse/test/mock.ts
@@ -57,7 +57,9 @@ export const mockCache = () => {
storeMessages: stub().resolves(),
getPending: stub().resolves(),
getPendingTasks: stub().resolves(),
addTaskPending: stub().resolves(),
getMessage: stub().resolves(),
setStatus: stub().resolves(),
increaseAttempt: stub().resolves(),
removePending: stub().resolves(),
getNode: stub().resolves(),
@@ -1,9 +1,9 @@
import { expect, createRequestContext, SparseMerkleTree, mkBytes32, XMessage } from "@connext/nxtp-utils";
import { SinonStub, stub } from "sinon";
import { SinonStub, stub, restore, reset } from "sinon";

import { mock, mockXMessage1, mockXMessage2 } from "../../../mock";
import { proverCtxMock } from "../../../globalTestHook";
import { NoDestinationDomainForProof, NoMessageProof } from "../../../../src/errors";
import { proverCtxMock, sendWithRelayerWithBackupStub } from "../../../globalTestHook";
import { EmptyMessageProofs, NoDestinationDomainForProof, NoMessageProof } from "../../../../src/errors";
import { BrokerMessage } from "../../../../src/tasks/prover/operations/types";
import { processMessages } from "../../../../src/tasks/prover/operations";

@@ -23,14 +23,36 @@ describe("Operations: Process", () => {
describe("#processMessages", () => {
let getProofStub: SinonStub;
let verifyStub: SinonStub;
let getMessageStub: SinonStub;
let addTaskPendingStub: SinonStub;
let setStatusStub: SinonStub;
let encodeFunctionDataStubSC: SinonStub;

beforeEach(() => {
getProofStub = stub(SparseMerkleTree.prototype, "getProof");
verifyStub = stub(SparseMerkleTree.prototype, "verify");
encodeFunctionDataStubSC = proverCtxMock.adapters.contracts.spokeConnector.encodeFunctionData as SinonStub;
getMessageStub = proverCtxMock.adapters.cache.messages.getMessage as SinonStub;
addTaskPendingStub = proverCtxMock.adapters.cache.messages.addTaskPending as SinonStub;
setStatusStub = proverCtxMock.adapters.cache.messages.setStatus as SinonStub;
});
it("should dedup and be fulfilled", async () => {
getProofStub.resolves(["0x1"]);
verifyStub.resolves({ verified: true });
(proverCtxMock.adapters.contracts.spokeConnector.encodeFunctionData as SinonStub).returns("0x");

afterEach(() => {
restore();
reset();
});

it("happy case should work", async () => {
getProofStub.returns(["0x1"]);
verifyStub.returns({ calculated: "0x", verified: true });
getMessageStub.resolves(mockXMessage1);
addTaskPendingStub.resolves();
setStatusStub.resolves();
encodeFunctionDataStubSC.returns("0x");
sendWithRelayerWithBackupStub.resolves({
taskId: "0x123",
});

mockBrokerMesage.messages.push(mockBrokerMesage.messages[0]);
mockBrokerMesage.messages.push(mockBrokerMesage.messages[2]);
const mockXMessage2: XMessage = {
@@ -42,13 +64,17 @@ describe("Operations: Process", () => {
mockXMessage2.leaf = mockBrokerMesage.messages[0].leaf;
mockBrokerMesage.messages.push(mockXMessage2);
await processMessages(mockBrokerMesage, requestContext);
expect(getProofStub.callCount).to.equal(2);

// 2 getProof calls for leaves, 1 getProof call for root
expect(getProofStub.callCount).to.equal(3);
});

it("should be fulfilled", async () => {
getProofStub.resolves(["0x1"]);
verifyStub.resolves({ verified: true });
(proverCtxMock.adapters.contracts.spokeConnector.encodeFunctionData as SinonStub).returns("0x");
getProofStub.returns(["0x1"]);
getMessageStub.resolves(mockXMessage1);
addTaskPendingStub.resolves();
setStatusStub.resolves();
verifyStub.returns({ verified: true });
await processMessages(mockBrokerMesage, requestContext);
});
it("should catch error if no destination domain proof", async () => {
@@ -60,31 +86,36 @@ describe("Operations: Process", () => {
).to.eventually.be.rejectedWith(NoDestinationDomainForProof);
});
it("should catch error if no message proof", async () => {
getProofStub.resolves(undefined);
verifyStub.resolves({ verified: true });
(proverCtxMock.adapters.contracts.spokeConnector.encodeFunctionData as SinonStub).returns("0x");
(proverCtxMock.adapters.contracts.spokeConnector.decodeFunctionResult as SinonStub).returns([0]);
getProofStub.returns(undefined);
verifyStub.returns({ verified: true });
getMessageStub.resolves(mockXMessage1);
addTaskPendingStub.resolves();
setStatusStub.resolves();
await expect(processMessages(mockBrokerMesage, requestContext)).to.eventually.be.rejectedWith(NoMessageProof);
});
it("should do nothing if empty message proof", async () => {
getProofStub.resolves(["0x"]);
verifyStub.resolves({ verified: false });
(proverCtxMock.adapters.contracts.spokeConnector.encodeFunctionData as SinonStub).returns("0x");
await processMessages(mockBrokerMesage, requestContext);
getProofStub.returns(["0x"]);
verifyStub.returns({ verified: false });
getMessageStub.resolves(mockXMessage1);
addTaskPendingStub.resolves();
setStatusStub.resolves();
await expect(processMessages(mockBrokerMesage, requestContext)).to.eventually.be.rejectedWith(EmptyMessageProofs);
});
it("should do nothing if already processed", async () => {
getProofStub.resolves(["0x"]);
verifyStub.resolves({ verified: false });
(proverCtxMock.adapters.contracts.spokeConnector.encodeFunctionData as SinonStub).returns("0x");
(proverCtxMock.adapters.contracts.spokeConnector.decodeFunctionResult as SinonStub).returns([2]);
await processMessages(mockBrokerMesage, requestContext);
getProofStub.returns(["0x"]);
verifyStub.returns({ verified: false });
getMessageStub.resolves(mockXMessage1);
addTaskPendingStub.resolves();
setStatusStub.resolves();
await expect(processMessages(mockBrokerMesage, requestContext)).to.eventually.be.rejectedWith(EmptyMessageProofs);
});
it("should do nothing if status unused", async () => {
getProofStub.resolves(["0x"]);
verifyStub.resolves({ verified: false });
(proverCtxMock.adapters.contracts.spokeConnector.encodeFunctionData as SinonStub).returns("0x");
(proverCtxMock.adapters.contracts.spokeConnector.decodeFunctionResult as SinonStub).returns([1]);
await processMessages(mockBrokerMesage, requestContext);
getProofStub.returns(["0x"]);
verifyStub.returns({ verified: false });
getMessageStub.resolves(mockXMessage1);
addTaskPendingStub.resolves();
setStatusStub.resolves();
await expect(processMessages(mockBrokerMesage, requestContext)).to.eventually.be.rejectedWith(EmptyMessageProofs);
});
});
});