Skip to content

Commit

Permalink
Merge pull request #4593 from connext/clear_cache_locally
Browse files Browse the repository at this point in the history
fix: delete cache only locally and update config
  • Loading branch information
preethamr authored Jun 27, 2023
2 parents 14f2ef2 + 9384722 commit a0752d6
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 6 deletions.
4 changes: 2 additions & 2 deletions ops/testnet/prod/core/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ module "lighthouse_prover_cron" {
container_env_vars = merge(local.lighthouse_env_vars, {
LIGHTHOUSE_SERVICE = "prover-pub"
})
schedule_expression = "rate(15 minutes)"
schedule_expression = "rate(5 minutes)"
timeout = 900
memory_size = 10240
lambda_in_vpc = true
Expand Down Expand Up @@ -337,7 +337,7 @@ module "lighthouse_prover_subscriber" {
health_check_path = "/ping"
container_port = 7072
loadbalancer_port = 80
cpu = 1024
cpu = 2048
memory = 4096
instance_count = 10
timeout = 180
Expand Down
36 changes: 36 additions & 0 deletions packages/adapters/cache/src/lib/caches/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,17 @@ export class MessagesCache extends Cache {
return (await this.data.hget(`${this.prefix}:${domain}`, index.toString())) ?? undefined;
}

/**
* Deletes leaf of a domain at index.
*
* @param domain - Domain.
* @param index - Index.
* @returns leaf if exists, undefined if not.
*/
public async delNode(domain: string, index: number): Promise<number> {
return await this.data.hdel(`${this.prefix}:${domain}`, index.toString());
}

/**
* Stores leafs of a domain at in a range.
*
Expand All @@ -190,6 +201,19 @@ export class MessagesCache extends Cache {
return result ? (JSON.parse(result) as string[]) : undefined;
}

/**
* Deletes leafs of a domain at in a range.
*
* @param domain - Domain.
* @param start - Range start.
* @param end - Range end.
* @param nodes - Leaf string array.
* @returns 1 if deleted, 0 if not.
*/
public async delNodes(domain: string, start: number, end: number): Promise<number> {
return await this.data.hdel(`${this.prefix}:${domain}`, `${start}-${end}`);
}

/**
* Stores root of a domain at path.
*
Expand All @@ -213,6 +237,18 @@ export class MessagesCache extends Cache {
return (await this.data.hget(`${this.prefix}:${domain}`, path)) ?? undefined;
}

/**
* Deletes root of a domain at path.
*
* @param domain - Domain.
* @param path - Path in the tree to the root.
* @param root - root string.
* @returns 1 if added, 0 if not.
*/
public async delRoot(domain: string, path: string): Promise<number> {
return await this.data.hdel(`${this.prefix}:${domain}`, path);
}

/**
* Clears all leaf and roots of a domain in the cache.
*
Expand Down
27 changes: 27 additions & 0 deletions packages/adapters/cache/test/lib/caches/messages.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ describe("MessagesCache", () => {
const result = await messagesCache.getNode("1111", index);
expect(result).to.be.equal(leaf);
});
it("should del leaf if exists", async () => {
const leaf = "0x1a1";
const index = 1;
await messagesCache.putNode("1111", index, leaf);
await messagesCache.delNode("1111", index);
const result = await messagesCache.getNode("1111", index);
expect(result).to.be.equal(undefined);
});

it("should get undefined if not exist", async () => {
const result = await messagesCache.getNode("1111", 1);
Expand All @@ -77,6 +85,16 @@ describe("MessagesCache", () => {
expect(result![2]).to.be.equal(leafs[2]);
});

it("should del nodes if exists", async () => {
const leafs = ["0x1a1", "0x1a2", "0x1a3"];
const start = 1;
const end = 10;
await messagesCache.putNodes("1111", start, end, leafs);
await messagesCache.delNodes("1111", start, end);
const result = await messagesCache.getNodes("1111", start, end);
expect(result?.length).to.be.equal(undefined);
});

it("should get undefined if not exist", async () => {
const result = await messagesCache.getNodes("1111", 1, 10);
expect(result).to.be.equal(undefined);
Expand All @@ -90,6 +108,15 @@ describe("MessagesCache", () => {
expect(result).to.be.equal(root);
});

it("should del root if exists", async () => {
const root = "0x1a1";
const path = "000111000110";
await messagesCache.putRoot("1111", path, root);
await messagesCache.delRoot("1111", path);
const result = await messagesCache.getRoot("1111", path);
expect(result).to.be.equal(undefined);
});

it("should get undefined if not exist", async () => {
const result = await messagesCache.getRoot("1111", "");
expect(result).to.be.equal(undefined);
Expand Down
2 changes: 1 addition & 1 deletion packages/agents/lighthouse/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const MIN_CARTOGRAPHER_POLL_INTERVAL = 30_000;
const DEFAULT_CARTOGRAPHER_POLL_INTERVAL = 60_000;
export const DEFAULT_PROVER_BATCH_SIZE = 1;
export const DEFAULT_RELAYER_WAIT_TIME = 60_000 * 3600; // 1 hour
export const DEFAULT_PROVER_PUB_MAX = 5000;
export const DEFAULT_PROVER_PUB_MAX = 1500;

dotenvConfig();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,18 @@ export class SpokeDBHelper implements DBHelper {
}

public async clearLocalCache(): Promise<void> {
this.cachedNode;
try {
Object.keys(this.cachedNode).map(async (key) => await this.cache.delNode(this.domain, parseInt(key)));
Object.keys(this.cachedNodes).map(
async (key) => await this.cache.delNodes(this.domain, parseInt(key.split("-")[0]), parseInt(key.split("-")[1])),
);
Object.keys(this.cachedRoot).map(async (key) => await this.cache.delRoot(this.domain, key));
} catch (err: unknown) {
this.cachedNode = {};
this.cachedNodes = {};
this.cachedRoot = {};
}
this.cachedNode = {};
this.cachedNodes = {};
this.cachedRoot = {};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { createLoggingContext } from "@connext/nxtp-utils";
import { createLoggingContext, ExecStatus } from "@connext/nxtp-utils";

import { getContext } from "../prover";

Expand All @@ -11,7 +11,7 @@ export const consume = async () => {
const { requestContext, methodContext } = createLoggingContext(consume.name);
const {
logger,
adapters: { mqClient },
adapters: { mqClient, cache },
config,
} = getContext();
const prefetchSize = config.messageQueue.prefetchSize ?? DEFAULT_PREFETCH_SIZE;
Expand Down Expand Up @@ -43,6 +43,8 @@ export const consume = async () => {
} catch (err: unknown) {
logger.error("Processing messaages failed", requestContext, methodContext, undefined, { err });
channel.reject(message, false);
const statuses = brokerMessage.messages.map((it) => ({ leaf: it.leaf, status: ExecStatus.None }));
await cache.messages.setStatus(statuses);
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ export const processMessages = async (brokerMessage: BrokerMessage, _requestCont
messages,
});
// Clear global tree cache
spokeStore.clearCache();
// spokeStore.clearCache();
}
logger.info("Empty message proofs", requestContext, methodContext, {
originDomain,
Expand Down
3 changes: 3 additions & 0 deletions packages/agents/lighthouse/test/mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,12 @@ export const mockCache = () => {
getNode: stub().resolves(),
getNodes: stub().resolves(),
putNode: stub().resolves(),
delNode: stub().resolves(),
putNodes: stub().resolves(),
delNodes: stub().resolves(),
getRoot: stub().resolves(),
putRoot: stub().resolves(),
delRoot: stub().resolves(),
clearDomain: stub().resolves(),
},
};
Expand Down

0 comments on commit a0752d6

Please sign in to comment.