Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: dedup the aggregated root in propose #5070

Merged
merged 5 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions packages/adapters/database/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -929,12 +929,12 @@ export const getBaseAggregateRootCount = async (
};

export const getAggregateRootCount = async (
aggreateRoot: string,
aggregateRoot: string,
_pool?: Pool | db.TxnClientForRepeatableRead,
): Promise<number | undefined> => {
const poolToUse = _pool ?? pool;
// Get the leaf count at the aggregated root
const root = await db.selectOne("propagated_roots", { aggregate_root: aggreateRoot }).run(poolToUse);
const root = await db.selectOne("propagated_roots", { aggregate_root: aggregateRoot }).run(poolToUse);
return root ? convertFromDbPropagatedRoot(root).count : undefined;
};

Expand Down Expand Up @@ -1096,6 +1096,16 @@ export const getCurrentProposedOptimisticRoot = async (
return opRoot ? convertFromDbSpokeOptimisticRoot(opRoot) : undefined;
};

export const getSpokeOptimisticRoot = async (
root: string,
domain: string,
_pool?: Pool | db.TxnClientForRepeatableRead,
): Promise<SpokeOptimisticRoot | undefined> => {
const poolToUse = _pool ?? pool;
const opRoot = await db.selectOne("spoke_optimistic_roots", { root, domain }).run(poolToUse);
return opRoot ? convertFromDbSpokeOptimisticRoot(opRoot) : undefined;
};

export const savePropagatedOptimisticRoots = async (
roots: OptimisticRootPropagated[],
_pool?: Pool | db.TxnClientForRepeatableRead,
Expand Down
8 changes: 8 additions & 0 deletions packages/adapters/database/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ import {
saveProposedSpokeRoots,
saveFinalizedSpokeRoots,
getCurrentProposedOptimisticRoot,
getSpokeOptimisticRoot,
} from "./client";

export * as db from "zapatos/db";
Expand Down Expand Up @@ -355,6 +356,11 @@ export type Database = {
domain: string,
_pool?: Pool | TxnClientForRepeatableRead,
) => Promise<SpokeOptimisticRoot | undefined>;
getSpokeOptimisticRoot: (
root: string,
domain: string,
_pool?: Pool | TxnClientForRepeatableRead,
) => Promise<SpokeOptimisticRoot | undefined>;
};

export let pool: Pool;
Expand Down Expand Up @@ -443,6 +449,7 @@ export const getDatabase = async (databaseUrl: string, logger: Logger): Promise<
saveProposedSpokeRoots,
saveFinalizedSpokeRoots,
getCurrentProposedOptimisticRoot,
getSpokeOptimisticRoot,
};
};

Expand Down Expand Up @@ -536,6 +543,7 @@ export const getDatabaseAndPool = async (
saveProposedSpokeRoots,
saveFinalizedSpokeRoots,
getCurrentProposedOptimisticRoot,
getSpokeOptimisticRoot,
},
};
};
Expand Down
69 changes: 67 additions & 2 deletions packages/adapters/database/test/client.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
Snapshot,
OptimisticRootPropagated,
OptimisticRootFinalized,
SpokeOptimisticRoot,
RouterDailyTVL,
} from "@connext/nxtp-utils";
import { Pool } from "pg";
Expand All @@ -48,6 +49,8 @@ import {
saveAggregatedRoots,
saveReceivedAggregateRoot,
savePropagatedRoots,
saveProposedSpokeRoots,
saveFinalizedSpokeRoots,
getHubNode,
getHubNodes,
putRoot,
Expand Down Expand Up @@ -87,8 +90,11 @@ import {
getMessageByLeaf,
deleteNonExistTransfers,
getAggregateRoots,
getCurrentProposedOptimisticRoot,
getSpokeOptimisticRoot,
saveSnapshotRoots,
getLatestPendingSnapshotRootByDomain,
getLatestPendingSpokeOptimisticRootByDomain,
saveProposedSnapshots,
savePropagatedOptimisticRoots,
getCurrentProposedSnapshot,
Expand Down Expand Up @@ -134,6 +140,7 @@ describe("Database client", () => {
await pool.query("DELETE FROM snapshot_roots CASCADE");
await pool.query("DELETE FROM snapshots CASCADE");
await pool.query("DELETE FROM stableswap_exchanges CASCADE");
await pool.query("DELETE FROM spoke_optimistic_roots CASCADE");
await pool.end();
restore();
reset();
Expand Down Expand Up @@ -1121,6 +1128,8 @@ describe("Database client", () => {
.not.be.rejected;
await expect(saveAggregatedRoots(undefined as any, undefined as any)).to.eventually.not.be.rejected;
await expect(savePropagatedRoots(undefined as any, undefined as any)).to.eventually.not.be.rejected;
await expect(saveProposedSpokeRoots(undefined as any, undefined as any)).to.eventually.not.be.rejected;
await expect(saveFinalizedSpokeRoots(undefined as any, undefined as any)).to.eventually.not.be.rejected;
await expect(getTransfersByTransferIds(undefined as any, undefined as any)).to.eventually.not.be.rejected;
await expect(
getUnProcessedMessages(
Expand All @@ -1133,12 +1142,15 @@ describe("Database client", () => {
),
).to.eventually.not.be.rejected;
await expect(getAggregateRoot(undefined as any, undefined as any)).to.eventually.not.be.rejected;
await expect(getCurrentProposedOptimisticRoot(undefined as any, undefined as any)).to.eventually.not.be.rejected;
await expect(getSpokeOptimisticRoot(undefined as any, undefined as any)).to.eventually.not.be.rejected;
await expect(getAggregateRootCount(undefined as any, undefined as any)).to.eventually.not.be.rejected;
await expect(getBaseAggregateRootCount(undefined as any, undefined as any)).to.eventually.not.be.rejected;
await expect(getMessageRootIndex(undefined as any, undefined as any, undefined as any)).to.eventually.not.be
.rejected;
await expect(getMessageRootAggregatedFromIndex(undefined as any, undefined as any, undefined as any)).to.eventually
.not.be.rejected;
await expect(getMessageRootAggregatedFromIndex(undefined as any, undefined as any, undefined as any)).to.eventually;
await expect(getLatestPendingSpokeOptimisticRootByDomain(undefined as any, undefined as any)).to.eventually.not.be
.rejected;
await expect(getMessageRootCount(undefined as any, undefined as any, undefined as any)).to.eventually.not.be
.rejected;
await expect(getSpokeNode(undefined as any, undefined as any, undefined as any, undefined as any)).to.eventually.not
Expand Down Expand Up @@ -1670,6 +1682,59 @@ describe("Database client", () => {
expect(queryRes.rows.length).to.eq(finalizedRoots.length);
});

it("should save and get spoke optimistic roots", async () => {
const spokeOptimisticRoots: SpokeOptimisticRoot[] = [];
for (let _i = 0; _i < batchSize; _i++) {
const m = mock.entity.spokeOptimisticRoot();
m.id = `${_i}`;
m.rootTimestamp = _i;
spokeOptimisticRoots.push(m);
}
await saveProposedSpokeRoots(spokeOptimisticRoots, pool);

const latestSpokeRoot = await getCurrentProposedOptimisticRoot(spokeOptimisticRoots[batchSize - 1].domain, pool);
expect(latestSpokeRoot!.id).to.eq(spokeOptimisticRoots[batchSize - 1].id);

const missingDbSpokeRoot = await getLatestPendingSpokeOptimisticRootByDomain("", pool);
expect(missingDbSpokeRoot).to.eq(undefined);
});

it("should save and get finalized spoke optimistic roots", async () => {
const spokeOptimisticRoots: SpokeOptimisticRoot[] = [];
const finalizedRoots: OptimisticRootFinalized[] = [];
for (let _i = 0; _i < batchSize; _i++) {
const m = mock.entity.optimisticRootFinalized();
m.id = `${_i}`;
m.aggregateRoot = `${_i}`;
finalizedRoots.push(m);
const s = mock.entity.spokeOptimisticRoot();
s.id = `${_i}`;
s.aggregateRoot = `${_i}`;
spokeOptimisticRoots.push(s);
}
await saveProposedSpokeRoots(spokeOptimisticRoots, pool);
await saveFinalizedSpokeRoots(mock.domain.A, finalizedRoots, pool);

const latestSpokeRoot = await getLatestPendingSpokeOptimisticRootByDomain(
spokeOptimisticRoots[batchSize - 1].domain,
pool,
);
expect(latestSpokeRoot!.id).to.eq(spokeOptimisticRoots[batchSize - 1].id);

const missingDbLatestSpokeRoot = await getLatestPendingSpokeOptimisticRootByDomain("", pool);
expect(missingDbLatestSpokeRoot).to.eq(undefined);

const spokeRoot = await getSpokeOptimisticRoot(
spokeOptimisticRoots[batchSize - 1].aggregateRoot,
spokeOptimisticRoots[batchSize - 1].domain,
pool,
);
expect(spokeRoot!.id).to.eq(spokeOptimisticRoots[batchSize - 1].id);

const missingDbSpokeRoot = await getSpokeOptimisticRoot("", "", pool);
expect(missingDbSpokeRoot).to.eq(undefined);
});

it("should save and get RouterDailyTVL", async () => {
const tvls: RouterDailyTVL[] = [];
for (let _i = 0; _i < batchSize; _i++) {
Expand Down
1 change: 1 addition & 0 deletions packages/adapters/database/test/mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,5 +87,6 @@ export const mockDatabase = (): Database => {
getRootMessage: stub().resolves(),
getLatestPendingSnapshotRootByDomain: stub().resolves([mock.entity.snapshotRoot().root]),
getCurrentProposedOptimisticRoot: stub().resolves([mock.entity.spokeOptimisticRoot()]),
getSpokeOptimisticRoot: stub().resolves([mock.entity.spokeOptimisticRoot()]),
};
};
6 changes: 6 additions & 0 deletions packages/agents/lighthouse/src/errors/prover.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,9 @@ export class NoDomainInSnapshot extends NxtpError {
super(`No domain ${originDomain} found in snapshot ${snapshot}`, context, NoDomainInSnapshot.name);
}
}

export class AggregateRootDuplicated extends NxtpError {
constructor(aggregateRoot: string, context: any = {}) {
super(`Trying to propose same aggregate root for ${aggregateRoot}`, context, AggregateRootDuplicated.name);
}
}
30 changes: 30 additions & 0 deletions packages/agents/lighthouse/src/tasks/propose/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,33 @@ export class NoRootTimestamp extends NxtpError {
});
}
}

export class AggregateRootDuplicated extends NxtpError {
constructor(
public readonly aggregateRoot: string,
public readonly requestContext: RequestContext,
public readonly methodContext: MethodContext,
public readonly context: any = {},
) {
super(`Trying to send the same aggregateRoot: ${aggregateRoot}`, {
...context,
requestContext,
methodContext,
});
}
}

export class AggregateRootChecksFailed extends NxtpError {
constructor(
public readonly aggregateRoot: string,
public readonly requestContext: RequestContext,
public readonly methodContext: MethodContext,
public readonly context: any = {},
) {
super(`Root checks failed for aggregate root ${aggregateRoot}`, {
...context,
requestContext,
methodContext,
});
}
}
101 changes: 101 additions & 0 deletions packages/agents/lighthouse/src/tasks/propose/operations/propose.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import {
NoSnapshotRoot,
NoSpokeConnector,
NoMerkleTreeAddress,
AggregateRootDuplicated,
AggregateRootChecksFailed,
} from "../errors";
import { getContext } from "../propose";
import { OptimisticHubDBHelper } from "../adapters";
Expand Down Expand Up @@ -190,6 +192,7 @@ export const proposeSnapshot = async (
if (baseAggregateRootCount === undefined) {
throw new NoBaseAggregateRootCount(baseAggregateRoot);
}

const baseAggregateRoots: string[] = await database.getAggregateRoots(baseAggregateRootCount);
const aggregateRootCount = baseAggregateRootCount + snapshotRoots.length;
const opRoots = baseAggregateRoots.concat(snapshotRoots);
Expand All @@ -199,6 +202,16 @@ export const proposeSnapshot = async (
const hubSMT = new SparseMerkleTree(hubStore);
const aggregateRoot = await hubSMT.getRoot();

const snapshot = await database.getPendingAggregateRoot(aggregateRoot);
if (snapshot) {
throw new AggregateRootDuplicated(aggregateRoot, requestContext, methodContext);
}

const rootChecks = await aggregateRootCheck(aggregateRoot, requestContext);
if (!rootChecks) {
throw new AggregateRootChecksFailed(aggregateRoot, requestContext, methodContext);
}

const proposal = { snapshotId, aggregateRoot, snapshotRoots, orderedDomains };

// TODO: Sign the proposal -- need signature from whitelisted proposer agent
Expand Down Expand Up @@ -241,3 +254,91 @@ export const proposeSnapshot = async (
});
}
};

export const aggregateRootCheck = async (aggregateRoot: string, _requestContext: RequestContext): Promise<boolean> => {
const {
logger,
adapters: { contracts, database, chainreader },
config,
} = getContext();
const { requestContext, methodContext } = createLoggingContext("proposeSnapshot", _requestContext);

const rootManagerAddress = config.chains[config.hubDomain].deployments.rootManager;
//
const encodedTimestampData = contracts.rootManager.encodeFunctionData("lastSavedAggregateRootTimestamp");
let _rootTimestamp: any;
try {
const idResultData = await chainreader.readTx({
domain: +config.hubDomain,
to: rootManagerAddress,
data: encodedTimestampData,
});

_rootTimestamp = contracts.rootManager.decodeFunctionResult("lastSavedAggregateRootTimestamp", idResultData);
} catch (err: unknown) {
logger.error(
"Failed to read the lastSavedAggregateRootTimestamp",
requestContext,
methodContext,
jsonifyError(err as NxtpError),
{ _rootTimestamp },
);
// Cannot proceed without the latest lastSavedAggregateRootTimestamp.
return false;
}
if (!_rootTimestamp) {
// Cannot proceed without the lastSavedAggregateRootTimestamp.
return false;
}
const rootTimestamp = BigNumber.from(_rootTimestamp).toString();

const encodedData = contracts.rootManager.encodeFunctionData("validAggregateRoots", [rootTimestamp]);
let _onChainRoot: any;
try {
const idResultData = await chainreader.readTx({
domain: +config.hubDomain,
to: rootManagerAddress,
data: encodedData,
});

_onChainRoot = contracts.rootManager.decodeFunctionResult("validAggregateRoots", idResultData);
} catch (err: unknown) {
logger.error(
"Failed to read the validated aggregate root ",
requestContext,
methodContext,
jsonifyError(err as NxtpError),
{ rootTimestamp },
);
// Cannot proceed without the valid aggregate root for lastSavedAggregateRootTimestamp.
return false;
}
if (!_onChainRoot) {
// Cannot proceed without the valid aggregate root for lastSavedAggregateRootTimestamp.
return false;
}
logger.info("Got the validated aggregate root from onchain", requestContext, methodContext, {
_onChainRoot,
});

const onChainRoot = _onChainRoot as string;

if (_onChainRoot === aggregateRoot) {
logger.info("Stop propose. Found onchain root same as proposed root", requestContext, methodContext, {
aggregateRoot,
});
return false;
}

const snapshot = await database.getPendingAggregateRoot(onChainRoot);
if (!snapshot) {
// This can happen when DB and/or subgraph is out of sync
logger.info("Stop propose. Onchain root not found in db", requestContext, methodContext, {
aggregateRoot,
});
return false;
}

// All checks passed, can propose the aggregate root.
return true;
};
Loading