Skip to content

Commit

Permalink
Improve logging and refactor (#4337)
Browse files Browse the repository at this point in the history
* feat: expose nonce api and refactors

* fix(utils):  mock

* test(fix): make optional
  • Loading branch information
wanglonghong authored May 25, 2023
1 parent c2e21f3 commit ca03018
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
import { jsonifyError, NxtpError, ClearCacheRequestSchema, ClearCacheRequest, AdminRequest } from "@connext/nxtp-utils";
import {
jsonifyError,
NxtpError,
ClearCacheRequestSchema,
ClearCacheRequest,
AdminRequest,
AdminSchema,
} from "@connext/nxtp-utils";
import fastify, { FastifyInstance, FastifyReply } from "fastify";
import { register } from "prom-client";

Expand All @@ -11,6 +18,8 @@ export const bindServer = async (): Promise<FastifyInstance> => {

server.get("/ping", (_, res) => api.get.ping(res));

server.get("/nonce", (_, res) => api.get.nonce(res));

server.get("/metrics", (_, res) => api.get.metrics(res));

server.post<{ Body: ClearCacheRequest }>(
Expand All @@ -19,20 +28,24 @@ export const bindServer = async (): Promise<FastifyInstance> => {
async (req, res) => api.auth.admin(req.body, res, api.post.clearCache),
);

server.post<{ Body: AdminRequest }>("/nonce", { schema: { body: AdminSchema } }, async (req, res) =>
api.auth.admin(req.body, res, api.post.nonce),
);

const address = await server.listen({ port: config.server.pub.port, host: config.server.pub.host });
logger.info(`Server listening at ${address}`);
return server;
};

export const api = {
auth: {
admin: (body: AdminRequest, res: FastifyReply, nested: (res: FastifyReply) => Promise<void>) => {
admin: (body: AdminRequest, res: FastifyReply, nested: (res: FastifyReply, req?: any) => Promise<void>) => {
const { config } = getContext();
const { adminToken } = body;
if (adminToken !== config.server.adminToken) {
return res.status(401).send("Unauthorized to perform this operation");
}
return nested(res);
return nested(res, body.additions);
},
},
get: {
Expand All @@ -50,6 +63,24 @@ export const api = {
return res.status(500).send(json);
}
},
nonce: async (res: FastifyReply) => {
const {
config: { chains },
logger,
adapters: { cache },
} = getContext();
try {
const result: any = {};
for (const domain of Object.keys(chains)) {
result[domain] = await cache.transfers.getLatestNonce(domain);
}
return res.status(200).send(JSON.stringify(result));
} catch (e: unknown) {
const json = jsonifyError(e as NxtpError);
logger.error("Failed to getLatestNonce", undefined, undefined, json);
return res.status(500).send(json);
}
},
},
post: {
clearCache: async (res: FastifyReply) => {
Expand All @@ -59,5 +90,13 @@ export const api = {
await cache.transfers.clear();
return res.status(200).send();
},
nonce: async (res: FastifyReply, req: any) => {
const {
adapters: { cache },
} = getContext();
const { domain, nonce } = req;
await cache.transfers.setLatestNonce(domain as string, nonce as number);
return res.status(200).send();
},
},
};
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export const bindSubscriber = async (queueName: string, channel: Broker.Channel)
logger.info("Binding subscriber for queue", requestContext, methodContext, { queue: queueName });
interval(async () => {
if (numberOfChild < maxChildCount) {
logger.debug("Trying to pull data from the queue", requestContext, methodContext);
logger.debug("Trying to pull data from the queue", requestContext, methodContext, { waitPeriod });
try {
const messages: Broker.GetMessage[] = [];
for (let i = 0; i < batchSize; i++) {
Expand Down
8 changes: 5 additions & 3 deletions packages/agents/sequencer/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,13 @@ export const getEnvConfig = (
executer: {
batchSize: process.env.EXECUTER_BATCH_SIZE
? Number(process.env.EXECUTER_BATCH_SIZE)
: DEFAULT_EXECUTER_BATCH_SIZE,
: undefined || configFile?.executer?.batchSize || DEFAULT_EXECUTER_BATCH_SIZE,
maxChildCount: process.env.MAX_CHILD_PROCESS
? Number(process.env.MAX_CHILD_PROCESS)
: DEFAULT_CHILD_PROCESS_COUNT,
waitPeriod: process.env.WAIT_PERIOD ? Number(process.env.WAIT_PERIOD) : DEFAULT_EXECUTER_WAIT_PERIOD,
: undefined || configFile?.executer?.maxChildCount || DEFAULT_CHILD_PROCESS_COUNT,
waitPeriod: process.env.WAIT_PERIOD
? Number(process.env.WAIT_PERIOD)
: undefined || configFile?.executer?.waitPeriod || DEFAULT_EXECUTER_WAIT_PERIOD,
},
};

Expand Down
2 changes: 1 addition & 1 deletion packages/agents/sequencer/src/lib/entities/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export const TMQExchangeConfig = Type.Object({
export const TMQQueueConfig = Type.Object({
name: Type.String(),
limit: Type.Integer(),
queueLimit: Type.Integer(),
queueLimit: Type.Optional(Type.Integer()),
deadLetter: Type.Optional(Type.String()),
subscribe: Type.Boolean(),
});
Expand Down
5 changes: 5 additions & 0 deletions packages/agents/sequencer/test/mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ export const mock = {
database: {
url: "http://example.com",
},
executer: {
batchSize: 100,
maxChildCount: 5,
waitPeriod: 3000,
},
}),
adapters: {
cache: (): SinonStubbedInstance<StoreManager> => {
Expand Down
1 change: 1 addition & 0 deletions packages/utils/src/types/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export enum ExecStatus {
/// MARK - Shared API
export const AdminSchema = Type.Object({
adminToken: Type.String(),
additions: Type.Optional(Type.Any()),
});
export type AdminRequest = Static<typeof AdminSchema>;

Expand Down

0 comments on commit ca03018

Please sign in to comment.