Skip to content

Commit

Permalink
Merge pull request #5070 from connext/dedup-propose-aggregateroot
Browse files Browse the repository at this point in the history
feat: dedup the aggregated root in propose
  • Loading branch information
wanglonghong authored Oct 30, 2023
2 parents fe01a45 + 993592a commit 3c0f55a
Show file tree
Hide file tree
Showing 10 changed files with 387 additions and 9 deletions.
14 changes: 12 additions & 2 deletions packages/adapters/database/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -929,12 +929,12 @@ export const getBaseAggregateRootCount = async (
};

export const getAggregateRootCount = async (
aggreateRoot: string,
aggregateRoot: string,
_pool?: Pool | db.TxnClientForRepeatableRead,
): Promise<number | undefined> => {
const poolToUse = _pool ?? pool;
// Get the leaf count at the aggregated root
const root = await db.selectOne("propagated_roots", { aggregate_root: aggreateRoot }).run(poolToUse);
const root = await db.selectOne("propagated_roots", { aggregate_root: aggregateRoot }).run(poolToUse);
return root ? convertFromDbPropagatedRoot(root).count : undefined;
};

Expand Down Expand Up @@ -1096,6 +1096,16 @@ export const getCurrentProposedOptimisticRoot = async (
return opRoot ? convertFromDbSpokeOptimisticRoot(opRoot) : undefined;
};

export const getSpokeOptimisticRoot = async (
root: string,
domain: string,
_pool?: Pool | db.TxnClientForRepeatableRead,
): Promise<SpokeOptimisticRoot | undefined> => {
const poolToUse = _pool ?? pool;
const opRoot = await db.selectOne("spoke_optimistic_roots", { root, domain }).run(poolToUse);
return opRoot ? convertFromDbSpokeOptimisticRoot(opRoot) : undefined;
};

export const savePropagatedOptimisticRoots = async (
roots: OptimisticRootPropagated[],
_pool?: Pool | db.TxnClientForRepeatableRead,
Expand Down
8 changes: 8 additions & 0 deletions packages/adapters/database/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ import {
saveProposedSpokeRoots,
saveFinalizedSpokeRoots,
getCurrentProposedOptimisticRoot,
getSpokeOptimisticRoot,
} from "./client";

export * as db from "zapatos/db";
Expand Down Expand Up @@ -355,6 +356,11 @@ export type Database = {
domain: string,
_pool?: Pool | TxnClientForRepeatableRead,
) => Promise<SpokeOptimisticRoot | undefined>;
getSpokeOptimisticRoot: (
root: string,
domain: string,
_pool?: Pool | TxnClientForRepeatableRead,
) => Promise<SpokeOptimisticRoot | undefined>;
};

export let pool: Pool;
Expand Down Expand Up @@ -443,6 +449,7 @@ export const getDatabase = async (databaseUrl: string, logger: Logger): Promise<
saveProposedSpokeRoots,
saveFinalizedSpokeRoots,
getCurrentProposedOptimisticRoot,
getSpokeOptimisticRoot,
};
};

Expand Down Expand Up @@ -536,6 +543,7 @@ export const getDatabaseAndPool = async (
saveProposedSpokeRoots,
saveFinalizedSpokeRoots,
getCurrentProposedOptimisticRoot,
getSpokeOptimisticRoot,
},
};
};
Expand Down
69 changes: 67 additions & 2 deletions packages/adapters/database/test/client.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
Snapshot,
OptimisticRootPropagated,
OptimisticRootFinalized,
SpokeOptimisticRoot,
RouterDailyTVL,
} from "@connext/nxtp-utils";
import { Pool } from "pg";
Expand All @@ -48,6 +49,8 @@ import {
saveAggregatedRoots,
saveReceivedAggregateRoot,
savePropagatedRoots,
saveProposedSpokeRoots,
saveFinalizedSpokeRoots,
getHubNode,
getHubNodes,
putRoot,
Expand Down Expand Up @@ -87,8 +90,11 @@ import {
getMessageByLeaf,
deleteNonExistTransfers,
getAggregateRoots,
getCurrentProposedOptimisticRoot,
getSpokeOptimisticRoot,
saveSnapshotRoots,
getLatestPendingSnapshotRootByDomain,
getLatestPendingSpokeOptimisticRootByDomain,
saveProposedSnapshots,
savePropagatedOptimisticRoots,
getCurrentProposedSnapshot,
Expand Down Expand Up @@ -134,6 +140,7 @@ describe("Database client", () => {
await pool.query("DELETE FROM snapshot_roots CASCADE");
await pool.query("DELETE FROM snapshots CASCADE");
await pool.query("DELETE FROM stableswap_exchanges CASCADE");
await pool.query("DELETE FROM spoke_optimistic_roots CASCADE");
await pool.end();
restore();
reset();
Expand Down Expand Up @@ -1121,6 +1128,8 @@ describe("Database client", () => {
.not.be.rejected;
await expect(saveAggregatedRoots(undefined as any, undefined as any)).to.eventually.not.be.rejected;
await expect(savePropagatedRoots(undefined as any, undefined as any)).to.eventually.not.be.rejected;
await expect(saveProposedSpokeRoots(undefined as any, undefined as any)).to.eventually.not.be.rejected;
await expect(saveFinalizedSpokeRoots(undefined as any, undefined as any)).to.eventually.not.be.rejected;
await expect(getTransfersByTransferIds(undefined as any, undefined as any)).to.eventually.not.be.rejected;
await expect(
getUnProcessedMessages(
Expand All @@ -1133,12 +1142,15 @@ describe("Database client", () => {
),
).to.eventually.not.be.rejected;
await expect(getAggregateRoot(undefined as any, undefined as any)).to.eventually.not.be.rejected;
await expect(getCurrentProposedOptimisticRoot(undefined as any, undefined as any)).to.eventually.not.be.rejected;
await expect(getSpokeOptimisticRoot(undefined as any, undefined as any)).to.eventually.not.be.rejected;
await expect(getAggregateRootCount(undefined as any, undefined as any)).to.eventually.not.be.rejected;
await expect(getBaseAggregateRootCount(undefined as any, undefined as any)).to.eventually.not.be.rejected;
await expect(getMessageRootIndex(undefined as any, undefined as any, undefined as any)).to.eventually.not.be
.rejected;
await expect(getMessageRootAggregatedFromIndex(undefined as any, undefined as any, undefined as any)).to.eventually
.not.be.rejected;
await expect(getMessageRootAggregatedFromIndex(undefined as any, undefined as any, undefined as any)).to.eventually;
await expect(getLatestPendingSpokeOptimisticRootByDomain(undefined as any, undefined as any)).to.eventually.not.be
.rejected;
await expect(getMessageRootCount(undefined as any, undefined as any, undefined as any)).to.eventually.not.be
.rejected;
await expect(getSpokeNode(undefined as any, undefined as any, undefined as any, undefined as any)).to.eventually.not
Expand Down Expand Up @@ -1670,6 +1682,59 @@ describe("Database client", () => {
expect(queryRes.rows.length).to.eq(finalizedRoots.length);
});

it("should save and get spoke optimistic roots", async () => {
const spokeOptimisticRoots: SpokeOptimisticRoot[] = [];
for (let _i = 0; _i < batchSize; _i++) {
const m = mock.entity.spokeOptimisticRoot();
m.id = `${_i}`;
m.rootTimestamp = _i;
spokeOptimisticRoots.push(m);
}
await saveProposedSpokeRoots(spokeOptimisticRoots, pool);

const latestSpokeRoot = await getCurrentProposedOptimisticRoot(spokeOptimisticRoots[batchSize - 1].domain, pool);
expect(latestSpokeRoot!.id).to.eq(spokeOptimisticRoots[batchSize - 1].id);

const missingDbSpokeRoot = await getLatestPendingSpokeOptimisticRootByDomain("", pool);
expect(missingDbSpokeRoot).to.eq(undefined);
});

it("should save and get finalized spoke optimistic roots", async () => {
const spokeOptimisticRoots: SpokeOptimisticRoot[] = [];
const finalizedRoots: OptimisticRootFinalized[] = [];
for (let _i = 0; _i < batchSize; _i++) {
const m = mock.entity.optimisticRootFinalized();
m.id = `${_i}`;
m.aggregateRoot = `${_i}`;
finalizedRoots.push(m);
const s = mock.entity.spokeOptimisticRoot();
s.id = `${_i}`;
s.aggregateRoot = `${_i}`;
spokeOptimisticRoots.push(s);
}
await saveProposedSpokeRoots(spokeOptimisticRoots, pool);
await saveFinalizedSpokeRoots(mock.domain.A, finalizedRoots, pool);

const latestSpokeRoot = await getLatestPendingSpokeOptimisticRootByDomain(
spokeOptimisticRoots[batchSize - 1].domain,
pool,
);
expect(latestSpokeRoot!.id).to.eq(spokeOptimisticRoots[batchSize - 1].id);

const missingDbLatestSpokeRoot = await getLatestPendingSpokeOptimisticRootByDomain("", pool);
expect(missingDbLatestSpokeRoot).to.eq(undefined);

const spokeRoot = await getSpokeOptimisticRoot(
spokeOptimisticRoots[batchSize - 1].aggregateRoot,
spokeOptimisticRoots[batchSize - 1].domain,
pool,
);
expect(spokeRoot!.id).to.eq(spokeOptimisticRoots[batchSize - 1].id);

const missingDbSpokeRoot = await getSpokeOptimisticRoot("", "", pool);
expect(missingDbSpokeRoot).to.eq(undefined);
});

it("should save and get RouterDailyTVL", async () => {
const tvls: RouterDailyTVL[] = [];
for (let _i = 0; _i < batchSize; _i++) {
Expand Down
1 change: 1 addition & 0 deletions packages/adapters/database/test/mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,5 +87,6 @@ export const mockDatabase = (): Database => {
getRootMessage: stub().resolves(),
getLatestPendingSnapshotRootByDomain: stub().resolves([mock.entity.snapshotRoot().root]),
getCurrentProposedOptimisticRoot: stub().resolves([mock.entity.spokeOptimisticRoot()]),
getSpokeOptimisticRoot: stub().resolves([mock.entity.spokeOptimisticRoot()]),
};
};
6 changes: 6 additions & 0 deletions packages/agents/lighthouse/src/errors/prover.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,9 @@ export class NoDomainInSnapshot extends NxtpError {
super(`No domain ${originDomain} found in snapshot ${snapshot}`, context, NoDomainInSnapshot.name);
}
}

export class AggregateRootDuplicated extends NxtpError {
constructor(aggregateRoot: string, context: any = {}) {
super(`Trying to propose same aggregate root for ${aggregateRoot}`, context, AggregateRootDuplicated.name);
}
}
30 changes: 30 additions & 0 deletions packages/agents/lighthouse/src/tasks/propose/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,33 @@ export class NoRootTimestamp extends NxtpError {
});
}
}

export class AggregateRootDuplicated extends NxtpError {
constructor(
public readonly aggregateRoot: string,
public readonly requestContext: RequestContext,
public readonly methodContext: MethodContext,
public readonly context: any = {},
) {
super(`Trying to send the same aggregateRoot: ${aggregateRoot}`, {
...context,
requestContext,
methodContext,
});
}
}

export class AggregateRootChecksFailed extends NxtpError {
constructor(
public readonly aggregateRoot: string,
public readonly requestContext: RequestContext,
public readonly methodContext: MethodContext,
public readonly context: any = {},
) {
super(`Root checks failed for aggregate root ${aggregateRoot}`, {
...context,
requestContext,
methodContext,
});
}
}
101 changes: 101 additions & 0 deletions packages/agents/lighthouse/src/tasks/propose/operations/propose.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import {
NoSnapshotRoot,
NoSpokeConnector,
NoMerkleTreeAddress,
AggregateRootDuplicated,
AggregateRootChecksFailed,
} from "../errors";
import { getContext } from "../propose";
import { OptimisticHubDBHelper } from "../adapters";
Expand Down Expand Up @@ -190,6 +192,7 @@ export const proposeSnapshot = async (
if (baseAggregateRootCount === undefined) {
throw new NoBaseAggregateRootCount(baseAggregateRoot);
}

const baseAggregateRoots: string[] = await database.getAggregateRoots(baseAggregateRootCount);
const aggregateRootCount = baseAggregateRootCount + snapshotRoots.length;
const opRoots = baseAggregateRoots.concat(snapshotRoots);
Expand All @@ -199,6 +202,16 @@ export const proposeSnapshot = async (
const hubSMT = new SparseMerkleTree(hubStore);
const aggregateRoot = await hubSMT.getRoot();

const snapshot = await database.getPendingAggregateRoot(aggregateRoot);
if (snapshot) {
throw new AggregateRootDuplicated(aggregateRoot, requestContext, methodContext);
}

const rootChecks = await aggregateRootCheck(aggregateRoot, requestContext);
if (!rootChecks) {
throw new AggregateRootChecksFailed(aggregateRoot, requestContext, methodContext);
}

const proposal = { snapshotId, aggregateRoot, snapshotRoots, orderedDomains };

// TODO: Sign the proposal -- need signature from whitelisted proposer agent
Expand Down Expand Up @@ -241,3 +254,91 @@ export const proposeSnapshot = async (
});
}
};

export const aggregateRootCheck = async (aggregateRoot: string, _requestContext: RequestContext): Promise<boolean> => {
const {
logger,
adapters: { contracts, database, chainreader },
config,
} = getContext();
const { requestContext, methodContext } = createLoggingContext("proposeSnapshot", _requestContext);

const rootManagerAddress = config.chains[config.hubDomain].deployments.rootManager;
//
const encodedTimestampData = contracts.rootManager.encodeFunctionData("lastSavedAggregateRootTimestamp");
let _rootTimestamp: any;
try {
const idResultData = await chainreader.readTx({
domain: +config.hubDomain,
to: rootManagerAddress,
data: encodedTimestampData,
});

_rootTimestamp = contracts.rootManager.decodeFunctionResult("lastSavedAggregateRootTimestamp", idResultData);
} catch (err: unknown) {
logger.error(
"Failed to read the lastSavedAggregateRootTimestamp",
requestContext,
methodContext,
jsonifyError(err as NxtpError),
{ _rootTimestamp },
);
// Cannot proceed without the latest lastSavedAggregateRootTimestamp.
return false;
}
if (!_rootTimestamp) {
// Cannot proceed without the lastSavedAggregateRootTimestamp.
return false;
}
const rootTimestamp = BigNumber.from(_rootTimestamp).toString();

const encodedData = contracts.rootManager.encodeFunctionData("validAggregateRoots", [rootTimestamp]);
let _onChainRoot: any;
try {
const idResultData = await chainreader.readTx({
domain: +config.hubDomain,
to: rootManagerAddress,
data: encodedData,
});

_onChainRoot = contracts.rootManager.decodeFunctionResult("validAggregateRoots", idResultData);
} catch (err: unknown) {
logger.error(
"Failed to read the validated aggregate root ",
requestContext,
methodContext,
jsonifyError(err as NxtpError),
{ rootTimestamp },
);
// Cannot proceed without the valid aggregate root for lastSavedAggregateRootTimestamp.
return false;
}
if (!_onChainRoot) {
// Cannot proceed without the valid aggregate root for lastSavedAggregateRootTimestamp.
return false;
}
logger.info("Got the validated aggregate root from onchain", requestContext, methodContext, {
_onChainRoot,
});

const onChainRoot = _onChainRoot as string;

if (_onChainRoot === aggregateRoot) {
logger.info("Stop propose. Found onchain root same as proposed root", requestContext, methodContext, {
aggregateRoot,
});
return false;
}

const snapshot = await database.getPendingAggregateRoot(onChainRoot);
if (!snapshot) {
// This can happen when DB and/or subgraph is out of sync
logger.info("Stop propose. Onchain root not found in db", requestContext, methodContext, {
aggregateRoot,
});
return false;
}

// All checks passed, can propose the aggregate root.
return true;
};
Loading

0 comments on commit 3c0f55a

Please sign in to comment.