Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Main < Staging Sync #5398

Merged
merged 10 commits into from
Dec 20, 2023
1 change: 0 additions & 1 deletion ops/testnet/staging/backend/config.tf
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ locals {
"1735356532" = { confirmations = 1 }
"1735353714" = { confirmations = 10 }
"9991" = { confirmations = 200 }
"1734439522" = { confirmations = 1 }
}
environment = var.stage
})
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- migrate:up
ALTER TABLE snapshot_roots DROP CONSTRAINT snapshot_roots_pkey;
ALTER TABLE snapshot_roots
ADD CONSTRAINT snapshot_roots_pkey PRIMARY KEY (spoke_domain, root);
-- migrate:down
ALTER TABLE snapshot_roots DROP CONSTRAINT snapshot_roots_pkey;
ALTER TABLE snapshot_roots
ADD CONSTRAINT snapshot_roots_pkey PRIMARY KEY (id, spoke_domain);
5 changes: 3 additions & 2 deletions packages/adapters/database/db/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -1250,7 +1250,7 @@ ALTER TABLE ONLY public.schema_migrations
--

ALTER TABLE ONLY public.snapshot_roots
ADD CONSTRAINT snapshot_roots_pkey PRIMARY KEY (id, spoke_domain);
ADD CONSTRAINT snapshot_roots_pkey PRIMARY KEY (spoke_domain, root);


--
Expand Down Expand Up @@ -1639,4 +1639,5 @@ INSERT INTO public.schema_migrations (version) VALUES
('20231128023332'),
('20231130084431'),
('20231219013906'),
('20231219072355');
('20231219072355'),
('20231219231640');
9 changes: 7 additions & 2 deletions packages/adapters/database/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ export const saveSnapshotRoots = async (
const roots: s.snapshot_roots.Insertable[] = _roots.map((r) => convertToDbSnapshotRoot(r)).map(sanitizeNull);

// use upsert here. if the root exists, we don't want to overwrite anything
await db.upsert("snapshot_roots", roots, ["id", "spoke_domain"], { updateColumns: [] }).run(poolToUse);
await db.upsert("snapshot_roots", roots, ["spoke_domain", "root"], { updateColumns: [] }).run(poolToUse);
};

export const saveCheckPoint = async (
Expand Down Expand Up @@ -1064,11 +1064,16 @@ export const getFinalizedSnapshot = async (

export const getLatestPendingSnapshotRootByDomain = async (
spoke_domain: number,
id: string,
_pool?: Pool | db.TxnClientForRepeatableRead,
): Promise<SnapshotRoot | undefined> => {
const poolToUse = _pool ?? pool;
const snapshot = await db
.select("snapshot_roots", { processed: false, spoke_domain }, { limit: 1, order: { by: "id", direction: "DESC" } })
.select(
"snapshot_roots",
{ processed: false, spoke_domain, id },
{ limit: 1, order: { by: "id", direction: "DESC" } },
)
.run(poolToUse);
return snapshot.length > 0 ? convertFromDbSnapshotRoot(snapshot[0]) : undefined;
};
Expand Down
1 change: 1 addition & 0 deletions packages/adapters/database/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ export type Database = {
) => Promise<Snapshot | undefined>;
getLatestPendingSnapshotRootByDomain: (
spoke_domain: number,
id: string,
_pool?: Pool | TxnClientForRepeatableRead,
) => Promise<SnapshotRoot | undefined>;
getAggregateRootByRootAndDomain: (
Expand Down
6 changes: 5 additions & 1 deletion packages/adapters/database/test/client.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1644,7 +1644,11 @@ describe("Database client", () => {
}
await saveSnapshotRoots(roots, pool);

const dbRootLast = await getLatestPendingSnapshotRootByDomain(+roots[batchSize - 1].spokeDomain, pool);
const dbRootLast = await getLatestPendingSnapshotRootByDomain(
+roots[batchSize - 1].spokeDomain,
(batchSize - 1).toString(),
pool,
);
expect(dbRootLast?.root).to.eq(roots[batchSize - 1].root);
});

Expand Down
6 changes: 3 additions & 3 deletions packages/agents/lighthouse/src/tasks/propose/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,14 @@ export class AggregateRootChecksFailed extends NxtpError {
}
}

export class SubgraphDelayed extends NxtpError {
export class WaitTimeNotCompleted extends NxtpError {
constructor(
public readonly hubDomain: string,
public readonly domain: string,
public readonly requestContext: RequestContext,
public readonly methodContext: MethodContext,
public readonly context: any = {},
) {
super(`Subgraph is more than 1 snapshot behind the latest snapshot for domain ${hubDomain}`, {
super(`Wait time not completed for ${domain}`, {
...context,
requestContext,
methodContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ import {
MissingRequiredDomain,
NoSnapshotRoot,
NoSpokeConnector,
SubgraphDelayed,
NoMerkleTreeAddress,
AggregateRootDuplicated,
AggregateRootChecksFailed,
WaitTimeNotCompleted,
} from "../errors";
import { getContext } from "../propose";
import { OptimisticHubDBHelper } from "../adapters";
Expand Down Expand Up @@ -70,9 +70,23 @@ export const proposeHub = async () => {
throw new NoSpokeConnector(config.hubDomain, requestContext, methodContext);
}

const latestSnapshotId: number = Math.floor(getNtpTimeSeconds() / config.snapshotDuration);
// Use N snapshot ID to ensure that the proposal submitted.
const currentTimestamp = getNtpTimeSeconds();
const latestSnapshotId: number = Math.floor(currentTimestamp / config.snapshotDuration);
const latestSnapshotTimestamp = latestSnapshotId * config.snapshotDuration;

const timeSinceSnapshotStart = currentTimestamp - latestSnapshotTimestamp;
const waitTime = config.snapshotDuration / 3;
if (timeSinceSnapshotStart < waitTime) {
// Exit if earlier than 1/3 of the snapshot duration to help accommodate time boundary conditions
logger.info("Skipping ProposeHub. Wait time not completed", requestContext, methodContext, {
remainingTime: waitTime - timeSinceSnapshotStart,
currentTimestamp,
latestSnapshotTimestamp,
});
throw new WaitTimeNotCompleted(config.hubDomain, requestContext, methodContext);
}

logger.info("Using latest snapshot ID", requestContext, methodContext, {
latestSnapshotId,
});
Expand All @@ -81,18 +95,17 @@ export const proposeHub = async () => {
const orderedSnapshotRoots: string[] = [];
const snapshotRoots: Map<string, string> = new Map();

// Sort the snapshot roots in the order of root manager domains
await Promise.all(
rootManagerDomains.map(async (domain) => {
let latestSnapshotRoot: string;
const snapshotRoot = await database.getLatestPendingSnapshotRootByDomain(+domain);
if (!snapshotRoot) {
throw new NoSnapshotRoot(domain, requestContext, methodContext);
}
const latestDbSnapshotId = +snapshotRoot.id;
latestSnapshotRoot = await getCurrentOutboundRoot(domain, requestContext);
let domainSnapshotRoot;
const snapshotRoot = await database.getLatestPendingSnapshotRootByDomain(+domain, latestSnapshotId.toString());
if (snapshotRoot) {
logger.debug("Found snapshot root in db", requestContext, methodContext, { snapshotRoot, domain });
domainSnapshotRoot = snapshotRoot.root;
} else {
const domainOutboundRoot = await getCurrentOutboundRoot(domain, requestContext);

/* Handle boundary case where snapshot for the outbound root is not yet saved.
/* Handle boundary case where snapshot for the outbound root is not yet saved.
There is a window between the time LH proposes the outbound root and watcher reads the spoke connector contract,
in which if there is a new xcall the outbound root gets updated. We were exposed to false alarm by watcher.

Expand All @@ -102,57 +115,63 @@ export const proposeHub = async () => {
which will be the outboundRroot because LH waited
and ensured it was the outboundRoot that will be saved as snapshotRoot if and when there is an xcall
2. Watcher does not find the snapshotId of the proposal, as there was no xcall and hence no save,
and instead gets exact same outbound root from the spoke connector contract
and instead gets the exact same outbound root from the spoke connector contract
3. Ensure that the snapshot root is not older than the latest snapshot timestamp
*/
const outboundRootTimestamp = await database.getOutboundRootTimestamp(domain, latestSnapshotRoot);
if (
latestSnapshotId > latestDbSnapshotId &&
outboundRootTimestamp &&
outboundRootTimestamp < latestSnapshotTimestamp
) {
const messageRootCount = await database.getMessageRootCount(domain, latestSnapshotRoot);
if (
messageRootCount &&
snapshotRoot.root.toLowerCase() != latestSnapshotRoot.toLowerCase() &&
snapshotRoot.count != messageRootCount
) {
logger.debug("Storing the virtual snapshot root in the db", requestContext, methodContext, {
domain,
count: messageRootCount + 1,
latestDbSnapshotId,
latestSnapshotId,
});
await database.saveSnapshotRoots([
{
id: latestSnapshotId.toString(),
spokeDomain: +domain,
root: latestSnapshotRoot,
const outboundRootTimestamp = await database.getOutboundRootTimestamp(domain, domainOutboundRoot);
if (outboundRootTimestamp && outboundRootTimestamp < latestSnapshotTimestamp) {
const messageRootCount = await database.getMessageRootCount(domain, domainOutboundRoot);
if (messageRootCount) {
logger.debug("Storing the virtual snapshot root in the db", requestContext, methodContext, {
domain,
count: messageRootCount + 1,
timestamp: latestSnapshotTimestamp,
snapshotId: latestSnapshotId,
});
await database.saveSnapshotRoots([
{
id: latestSnapshotId.toString(),
spokeDomain: +domain,
root: domainOutboundRoot,
count: messageRootCount + 1,
timestamp: latestSnapshotTimestamp,
},
]);
domainSnapshotRoot = domainOutboundRoot;
} else {
logger.warn("No message root count found for domain", requestContext, methodContext, {
domain,
domainOutboundRoot,
});
}
} else {
logger.warn(
"Likely subgraph or cartographer delay. No qualified outbound root timestamp found for domain.",
requestContext,
methodContext,
{
domain,
domainOutboundRoot,
outboundRootTimestamp,
cooledDown: outboundRootTimestamp ? outboundRootTimestamp < latestSnapshotTimestamp : false,
},
]);
);
}
}
if (domainSnapshotRoot) {
snapshotRoots.set(domain, domainSnapshotRoot);
} else {
// If the latest db snapshot id for the domain is more than 1 snapshot behind latest,
// and ouboundroot has not cooled down, stop.
// This can happen when the subgraph or carto data is stale.
// And if there is infact a proposal at n-1 on the spoke, it will be picked up by the watcher.
// This is to avoid false alarms.
if (latestDbSnapshotId < latestSnapshotId - 1) {
throw new SubgraphDelayed(config.hubDomain, requestContext, methodContext, {
snapshotRoot,
latestSnapshotId,
latestDbSnapshotId,
outboundRootTimestamp,
latestSnapshotTimestamp,
});
}
latestSnapshotRoot = snapshotRoot.root;
logger.warn("No snapshot root found for domain", requestContext, methodContext, {
domain,
});
throw new NoSnapshotRoot(domain, requestContext, methodContext, {
latestSnapshotId,
latestSnapshotTimestamp,
});
}
snapshotRoots.set(domain, latestSnapshotRoot);
}),
);

// Sort the snapshot roots in the order of root manager domains
rootManagerDomains.forEach((domain) => {
orderedSnapshotRoots.push(snapshotRoots.get(domain)!);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ describe("Operations: Propose", () => {
);

await proposeHub();
expect(getCurrentOutboundRootStub).callCount(2);
expect(getCurrentOutboundRootStub).callCount(0);
expect(proposeSnapshotStub).callCount(1);
});
});
Expand Down
Loading