Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow horizontal scalability of lighthouse prover #4347

Merged
merged 59 commits into from
Jun 9, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
2b04d69
feat: install amqplib
wanglonghong May 26, 2023
3112591
build: fix block-scoped variable
wanglonghong May 26, 2023
a9b49c6
feat: make pub/sub
wanglonghong May 30, 2023
0d29963
feat: add cache
wanglonghong May 30, 2023
9b51b23
fix: cache and config
wanglonghong May 30, 2023
89cd6fc
Merge branch 'main' into feat/lh-prover-pub-sub
wanglonghong Jun 1, 2023
86ae94b
feat: mvp of mq consumer
wanglonghong Jun 1, 2023
d9d85fa
Merge branch 'main' into feat/lh-prover-pub-sub
rhlsthrm Jun 2, 2023
ccff694
feat: lambda subscriber (#4372)
carlomazzaferro Jun 2, 2023
730d581
Merge branch 'main' into feat/lh-prover-pub-sub
rhlsthrm Jun 2, 2023
e1a8285
chore: yarn
rhlsthrm Jun 2, 2023
6719fa2
ci: fix tf and build docker
rhlsthrm Jun 2, 2023
f265b4a
ci: always build lh
rhlsthrm Jun 2, 2023
ecea6e9
ci: try adding package
rhlsthrm Jun 2, 2023
49bd57e
ci: secrets
rhlsthrm Jun 2, 2023
2d0a6d3
ci: fix lh config
rhlsthrm Jun 2, 2023
40d0417
ci: fix mq
rhlsthrm Jun 2, 2023
58a218a
test: missing functions and branches
wanglonghong Jun 2, 2023
fa067f6
chore: Yarn lock
wanglonghong Jun 2, 2023
49c054b
chore: eslint
wanglonghong Jun 2, 2023
6d31546
ci: check the latest commit
wanglonghong Jun 2, 2023
75f1ec9
fix: index
wanglonghong Jun 5, 2023
6335584
ci: ops
wanglonghong Jun 5, 2023
d1ca170
feat: create a queue and binding on publisher side
wanglonghong Jun 5, 2023
1f14aad
ci: bump memory
rhlsthrm Jun 5, 2023
f5656f7
feat: get event from process.argv
wanglonghong Jun 5, 2023
c321799
fix: type
wanglonghong Jun 5, 2023
c1ecc31
chore: logging arg1 and arg2
wanglonghong Jun 5, 2023
a2abc49
chore: move logging location
wanglonghong Jun 5, 2023
0674bf9
chore: console
wanglonghong Jun 5, 2023
590169e
chore: optional
wanglonghong Jun 5, 2023
413f212
feat: decode base64 string
wanglonghong Jun 5, 2023
d83649c
fix: pass event param
wanglonghong Jun 5, 2023
ee6fca6
ci: change secret
rhlsthrm Jun 6, 2023
235b588
fix: decode properly
wanglonghong Jun 6, 2023
0303cc1
Merge branch 'feat/lh-prover-pub-sub' of https://github.com/connext/m…
wanglonghong Jun 6, 2023
07b4132
ci: secret
rhlsthrm Jun 6, 2023
3ed6549
ci: secret
rhlsthrm Jun 6, 2023
ceef615
ci: ops for lh prover
rhlsthrm Jun 6, 2023
fb25c54
ci: remove lighthouse lambda subscriber
rhlsthrm Jun 6, 2023
a97b90e
ci: var
rhlsthrm Jun 6, 2023
72fa480
feat: implement message cache
wanglonghong Jun 6, 2023
f41ee26
feat: update message cache per pair
wanglonghong Jun 6, 2023
8900d12
feat: store processed messages
wanglonghong Jun 6, 2023
a58e8b8
choe: print logs
wanglonghong Jun 6, 2023
4ab5787
chore: remove console
wanglonghong Jun 6, 2023
41534cb
fix: use lighthouse directory
preethamr Jun 6, 2023
8204f7a
fix: disable lh prover caching temporarily
preethamr Jun 6, 2023
336b8cf
fix: update tf config for lh prover sub
preethamr Jun 7, 2023
8c04409
fix: make auto scaling more responsive to load
preethamr Jun 7, 2023
7680bdd
fix: add lh redis config to main and staging tf
preethamr Jun 7, 2023
6266bf2
Revert "fix: disable lh prover caching temporarily"
wanglonghong Jun 7, 2023
e83f2ea
Merge branch 'main' into feat/lh-prover-pub-sub
wanglonghong Jun 7, 2023
f92f008
fix: update config to connect lambda to redis
preethamr Jun 8, 2023
83e5b98
lh prover in vpc
carlomazzaferro Jun 8, 2023
8f2774a
fix: lh prover lambda network config
preethamr Jun 8, 2023
d97e7dc
fix: attach private subnets only
preethamr Jun 8, 2023
2c375b0
ci: fix
rhlsthrm Jun 9, 2023
1cb101f
ci: ops
rhlsthrm Jun 9, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion packages/agents/lighthouse/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
"src/**/*"
],
"scripts": {
"start:prover": "LIGHTHOUSE_SERVICE=prover node --enable-source-maps dist/tasks/run.js",
"start:prover:pub": "LIGHTHOUSE_SERVICE=prover-pub node --enable-source-maps dist/tasks/run.js",
"start:prover:sub": "LIGHTHOUSE_SERVICE=prover-sub node --enable-source-maps dist/tasks/run.js",
"start:process": "LIGHTHOUSE_SERVICE=process node --enable-source-maps dist/tasks/run.js",
"start:propagate": "LIGHTHOUSE_SERVICE=propagate node --enable-source-maps dist/tasks/run.js",
"start:sendoutboundroot": "LIGHTHOUSE_SERVICE=sendoutboundroot node --enable-source-maps dist/tasks/run.js",
Expand All @@ -34,6 +35,7 @@
"@eth-optimism/sdk": "2.0.1",
"@sinclair/typebox": "0.25.21",
"@types/aws-lambda": "8.10.110",
"amqplib": "0.10.3",
"datadog-lambda-js": "6.86.0",
"dd-trace": "3.13.2",
"dotenv": "16.0.3",
Expand Down
55 changes: 48 additions & 7 deletions packages/agents/lighthouse/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ const MIN_CARTOGRAPHER_POLL_INTERVAL = 30_000;
const DEFAULT_CARTOGRAPHER_POLL_INTERVAL = 60_000;
export const DEFAULT_PROVER_BATCH_SIZE = 1;
export const DEFAULT_RELAYER_WAIT_TIME = 60_000 * 3600; // 1 hour
export const DEFAULT_CONCURRENCY = 1;

dotenvConfig();

Expand All @@ -35,6 +34,30 @@ export const TPollingConfig = Type.Object({
cartographer: Type.Integer({ minimum: MIN_CARTOGRAPHER_POLL_INTERVAL }),
});

/**
 * TypeBox schema for the RabbitMQ (amqplib) message-queue configuration.
 * Validated as part of NxtpLighthouseConfigSchema under the `messageQueue` key.
 */
export const TMQConfig = Type.Object({
  // Broker connection settings.
  connection: Type.Object({
    // AMQP connection string, e.g. "amqp://user:pass@host:5672".
    uri: Type.String(),
  }),
  // Exchange the prover publisher publishes to.
  exchange: Type.Object({
    name: Type.String(),
    // Routing strategy; restricted to the three standard AMQP exchange types.
    type: Type.Union([Type.Literal("fanout"), Type.Literal("topic"), Type.Literal("direct")]),
    // Presumably a publish-confirm timeout in milliseconds — TODO confirm units against publisher usage.
    publishTimeout: Type.Integer(),
    // Whether published messages are marked persistent (survive a broker restart).
    persistent: Type.Boolean(),
    // Whether the exchange itself is declared durable on the broker.
    durable: Type.Boolean(),
  }),
  // NOTE(review): subscriber semantics are not visible here — presumably a queue
  // name for the consumer side; confirm against the subscriber implementation.
  subscriber: Type.Optional(Type.String()),
  // Presumably a max-length cap applied when declaring the queue — verify where used.
  queueLimit: Type.Optional(Type.Number()),
  // Presumably maps to channel.prefetch(count) on the consumer — verify where used.
  prefetchSize: Type.Optional(Type.Number()),
});

/**
 * TypeBox schema for the lighthouse HTTP server configuration
 * (health/admin endpoints of the prover subscriber).
 */
export const TServerConfig = Type.Object({
  prover: Type.Object({
    // Listen port; constrained to the valid TCP port range.
    port: Type.Integer({ minimum: 1, maximum: 65535 }),
    // Bind address; must be a well-formed IPv4 string (e.g. "0.0.0.0").
    host: Type.String({ format: "ipv4" }),
  }),
  // Shared secret required by admin endpoints (compared against the request body's adminToken).
  adminToken: Type.String(),
});

export const NxtpLighthouseConfigSchema = Type.Object({
hubDomain: Type.String(),
chains: Type.Record(Type.String(), TChainConfig),
Expand Down Expand Up @@ -63,13 +86,15 @@ export const NxtpLighthouseConfigSchema = Type.Object({
),
proverBatchSize: Type.Record(Type.String(), Type.Integer({ minimum: 1, maximum: 100 })),
relayerWaitTime: Type.Integer({ minimum: 0 }),
concurrency: Type.Integer({ minimum: 0 }),
service: Type.Union([
Type.Literal("prover"),
Type.Literal("prover-pub"),
Type.Literal("prover-sub"),
Type.Literal("propagate"),
Type.Literal("process"),
Type.Literal("sendoutboundroot"),
]),
messageQueue: TMQConfig,
server: TServerConfig,
});

export type NxtpLighthouseConfig = Static<typeof NxtpLighthouseConfigSchema>;
Expand Down Expand Up @@ -162,11 +187,27 @@ export const getEnvConfig = (
configJson.relayerWaitTime ||
configFile.relayerWaitTime ||
DEFAULT_RELAYER_WAIT_TIME,
concurrency: process.env.NXTP_PROVER_CONCURRENCY
? +process.env.NXTP_PROVER_CONCURRENCY
: undefined || configJson.concurrency || configFile.concurrency || DEFAULT_CONCURRENCY,
messageQueue: process.env.MESSAGE_QUEUE
? JSON.parse(process.env.MESSAGE_QUEUE)
: configJson.messageQueue ?? configFile.messageQueue,
server: {
prover: {
host:
process.env.PROVER_SUB_SERVER_HOST ||
configJson.server?.prover?.host ||
configFile.server?.prover?.host ||
"0.0.0.0",
port: process.env.PROVER_SUB_SERVER_PORT
? +process.env.PROVER_SUB_SERVER_PORT
: undefined || configJson.server?.prover?.port || configFile.server?.prover?.port || 7072,
},
adminToken:
process.env.LH_SERVER_ADMIN_TOKEN ||
configJson.server?.adminToken ||
configFile.server?.adminToken ||
"blahblah",
},
};

nxtpConfig.cartographerUrl =
nxtpConfig.cartographerUrl ??
(nxtpConfig.environment === "production"
Expand Down
14 changes: 9 additions & 5 deletions packages/agents/lighthouse/src/tasks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,24 @@ import { contractDeployments } from "@connext/nxtp-txservice";

import { getConfig } from "../config";

import { makeProver } from "./prover";
import { makeProverPublisher, makeProverSubscriber } from "./prover";
import { makePropagate } from "./propagate";
import { makeProcessFromRoot } from "./processFromRoot";
import { makeSendOutboundRoot } from "./sendOutboundRoot";

export const makeLighthouse = async () => {
export const makeLighthouse = async (_service?: string) => {
const chainData = await getChainData();
if (!chainData) {
throw new Error("Could not get chain data");
}
const config = await getConfig(chainData, contractDeployments);
switch (process.env.LIGHTHOUSE_SERVICE) {
case "prover":
await makeProver(config, chainData);
const service = _service ?? process.env.LIGHTHOUSE_SERVICE;
switch (service) {
case "prover-pub":
await makeProverPublisher(config, chainData);
break;
case "prover-sub":
await makeProverSubscriber(config, chainData);
break;
case "propagate":
await makePropagate(config, chainData);
Expand Down
34 changes: 34 additions & 0 deletions packages/agents/lighthouse/src/tasks/prover/bindings/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import { AdminRequest } from "@connext/nxtp-utils";
import fastify, { FastifyInstance, FastifyReply } from "fastify";

import { getContext } from "../prover";

/**
 * Start the prover subscriber's health-check HTTP server.
 *
 * Exposes a single GET /ping liveness endpoint and listens on the
 * host/port taken from `config.server.prover`.
 *
 * @returns The started Fastify instance.
 */
export const bindHealthServer = async (): Promise<FastifyInstance> => {
  const { config, logger } = getContext();
  const { host, port } = config.server.prover;

  const app = fastify();

  // Liveness probe — delegates to the shared api handler table.
  app.get("/ping", (_, res) => api.get.ping(res));

  const address = await app.listen({ port, host });
  logger.info(`Server listening at ${address}`);
  return app;
};

/**
 * Handler table for the health/admin HTTP server.
 *
 * - auth.admin: gate a nested handler behind the configured admin token,
 *   replying 401 when the token in the request body does not match.
 * - get.ping: trivial liveness response.
 */
export const api = {
  auth: {
    admin: (body: AdminRequest, res: FastifyReply, nested: (res: FastifyReply) => Promise<void>) => {
      const { config } = getContext();
      // Only run the nested handler when the caller presents the configured token.
      if (body.adminToken === config.server.adminToken) {
        return nested(res);
      }
      return res.status(401).send("Unauthorized to perform this operation");
    },
  },
  get: {
    ping: async (res: FastifyReply) => res.status(200).send("pong\n"),
  },
};
2 changes: 2 additions & 0 deletions packages/agents/lighthouse/src/tasks/prover/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { ChainReader, ConnextContractInterfaces } from "@connext/nxtp-txservice"
import { ChainData, Logger, RelayerType } from "@connext/nxtp-utils";
import { Database } from "@connext/nxtp-adapters-database";
import { Relayer } from "@connext/nxtp-adapters-relayer";
import Broker from "amqplib";

import { NxtpLighthouseConfig } from "../../config";

Expand All @@ -13,6 +14,7 @@ export type ProverContext = {
contracts: ConnextContractInterfaces; // Used to read and write to smart contracts.
relayers: { instance: Relayer; apiKey: string; type: RelayerType }[]; // Used to send txs to relayer.
database: Database;
mqClient: Broker.Connection;
};
config: NxtpLighthouseConfig;
chainData: Map<string, ChainData>;
Expand Down
72 changes: 72 additions & 0 deletions packages/agents/lighthouse/src/tasks/prover/handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import { createLoggingContext, getChainData } from "@connext/nxtp-utils";
import { contractDeployments } from "@connext/nxtp-txservice";

import { getConfig } from "../../config";

import { getContext, makeProver } from "./prover";
import { BrokerMessage } from "./operations/publisher";
import { processMessages } from "./operations/consumer";

/**
 * AWS Lambda entrypoint for consuming prover messages from an AmazonMQ
 * (RabbitMQ) event source.
 *
 * Boots the prover context (chain data + config), flattens every message in
 * `event.rmqMessagesByQueue`, decodes each payload, and processes the
 * resulting BrokerMessages one at a time.
 *
 * @param event - Lambda event delivered by the aws:rmq event source.
 * @returns HTTP-style status object: 200 with a processed count on success,
 *          500 when chain data cannot be fetched or processing throws.
 */
export const handler = async (event: any): Promise<{ statusCode: number; body: string }> => {
  const { requestContext, methodContext } = createLoggingContext("AmazonMQ.consumer");
  try {
    const chainData = await getChainData();
    if (!chainData) {
      return {
        statusCode: 500,
        // Fixed typo in the error message (was "chaind data").
        body: "Failed to get chain data",
      };
    }
    const config = await getConfig(chainData, contractDeployments);
    await makeProver(config, chainData);

    const { logger } = getContext();

    /*
     * Example Rabbit MQ record events
     * In the RabbitMQ example, pizzaQueue is the name of the RabbitMQ queue, and / is the name of the virtual host.
     * When receiving messages, the event source lists messages under pizzaQueue::/.
     *
     * {
     *   "eventSource": "aws:rmq",
     *   "eventSourceArn": "arn:aws:mq:us-west-2:111122223333:broker:pizzaBroker:b-9bcfa592-423a-4942-879d-eb284b418fc8",
     *   "rmqMessagesByQueue": {
     *     "pizzaQueue::/": [
     *       {
     *         "basicProperties": {
     *           ...
     *         },
     *         "redelivered": false,
     *         "data": "eyJ0aW1lb3V0IjowLCJkYXRhIjoiQ1pybWYwR3c4T3Y0YnFMUXhENEUifQ=="
     *       }
     *     ]
     *   }
     * }
     */
    logger.info("Received an event from mq", requestContext, methodContext, { event });

    // Guard: tolerate events that do not carry the expected payload shape.
    const rmqMessagesByQueue = (event?.rmqMessagesByQueue ?? {}) as Record<string, any[]>;
    const brokerMessagesToProcess: BrokerMessage[] = [];
    for (const queue of Object.keys(rmqMessagesByQueue)) {
      // The aws:rmq event source delivers `data` base64-encoded (see the example
      // above), so decode before parsing the JSON payload.
      const queueMessages = rmqMessagesByQueue[queue].map(
        (msg: any) => JSON.parse(Buffer.from(msg.data as string, "base64").toString()) as BrokerMessage,
      );
      brokerMessagesToProcess.push(...queueMessages);
    }

    // Handle messages one at a time (matching the queue delivery order).
    for (const brokerMessage of brokerMessagesToProcess) {
      await processMessages(brokerMessage, requestContext);
    }
    return {
      statusCode: 200,
      body: `Processed ${brokerMessagesToProcess.length} messages`,
    };
  } catch (err: unknown) {
    // `logger` may not exist if boot itself failed, so fall back to console.
    console.error(`Message processing failed, error: ${err}`);
    return {
      statusCode: 500,
      body: `Message processing failed, error: ${err}`,
    };
  }
};
2 changes: 1 addition & 1 deletion packages/agents/lighthouse/src/tasks/prover/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export { makeProver } from "./prover";
// Barrel re-export of the prover publisher/subscriber entrypoints
// (consumed via `import { ... } from "./prover"` in src/tasks/index.ts).
export { makeProverPublisher, makeProverSubscriber } from "./prover";
Loading