Skip to content

Commit

Permalink
Merge branch 'develop' into feature/closeable
Browse files Browse the repository at this point in the history
# Conflicts:
#	packages/sequencer/src/storage/Database.ts
  • Loading branch information
rpanic committed Jan 28, 2025
2 parents 8ec5fcb + 13f9bed commit 6206bd1
Show file tree
Hide file tree
Showing 30 changed files with 485 additions and 168 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/pull-request-develop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ jobs:

- name: "Test"
run: npm run test:ci
env:
IN_CI: true

integration:
runs-on: ubuntu-latest
Expand Down
10 changes: 9 additions & 1 deletion packages/common/src/log.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ function logProvable(
}
/* eslint-enable */

// eslint-disable-next-line @typescript-eslint/strict-boolean-expressions
if (process.env?.IN_CI ?? false) {
loglevel.setLevel("ERROR");
}

const timeMap: Record<string, number> = {};

function time(label = "time") {
Expand Down Expand Up @@ -111,7 +116,10 @@ export const log = {
},

setLevel: (level: LogLevelDesc) => {
loglevel.setLevel(level);
// eslint-disable-next-line @typescript-eslint/strict-boolean-expressions
if (!(process.env?.IN_CI ?? false)) {
loglevel.setLevel(level);
}
},

get levels() {
Expand Down
4 changes: 4 additions & 0 deletions packages/common/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ export function requireTrue(
}
}

/**
* Utility function to split an array of type T into a record <K, T[]> based on a
* function T => K that determines the key of each record
*/
export function splitArray<T, K extends string | number>(
arr: T[],
split: (t: T) => K
Expand Down
4 changes: 4 additions & 0 deletions packages/persistance/src/PrismaDatabaseConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,8 @@ export class PrismaDatabaseConnection
public async close() {
await this.prismaClient.$disconnect();
}

public async executeInTransaction(f: () => Promise<void>) {
await this.prismaClient.$transaction(f);
}
}
13 changes: 13 additions & 0 deletions packages/persistance/src/PrismaRedisDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
RedisConnection,
RedisConnectionConfig,
RedisConnectionModule,
RedisTransaction,
} from "./RedisConnection";

export interface PrismaRedisCombinedConfig {
Expand Down Expand Up @@ -49,6 +50,10 @@ export class PrismaRedisDatabase
return this.redis.redisClient;
}

public get currentMulti(): RedisTransaction {
return this.redis.currentMulti;
}

public create(childContainerProvider: ChildContainerProvider) {
super.create(childContainerProvider);
this.prisma.create(childContainerProvider);
Expand Down Expand Up @@ -79,4 +84,12 @@ export class PrismaRedisDatabase
await this.prisma.pruneDatabase();
await this.redis.pruneDatabase();
}

public async executeInTransaction(f: () => Promise<void>) {
// TODO Long-term we want to somehow make sure we can rollback one data source
// if commiting the other one's transaction fails
await this.prisma.executeInTransaction(async () => {
await this.redis.executeInTransaction(f);
});
}
}
19 changes: 19 additions & 0 deletions packages/persistance/src/RedisConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@ export interface RedisConnectionConfig {
username?: string;
}

export type RedisTransaction = ReturnType<RedisClientType["multi"]>;

export interface RedisConnection {
get redisClient(): RedisClientType;
get currentMulti(): RedisTransaction;
}

export class RedisConnectionModule
Expand Down Expand Up @@ -82,4 +85,20 @@ export class RedisConnectionModule
public async pruneDatabase() {
await this.redisClient.flushDb();
}

private multi?: RedisTransaction;

public get currentMulti() {
if (this.multi === undefined) {
throw new Error("Redis multi was access outside of a transaction");
}
return this.multi;
}

public async executeInTransaction(f: () => Promise<void>) {
this.multi = this.redisClient.multi();
await f();
await this.multi.exec();
this.multi = undefined;
}
}
103 changes: 58 additions & 45 deletions packages/persistance/src/services/prisma/PrismaBlockStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
BlockStorage,
BlockWithResult,
BlockWithPreviousResult,
BlockWithMaybeResult,
} from "@proto-kit/sequencer";
import { filterNonNull, log } from "@proto-kit/common";
import {
Expand Down Expand Up @@ -39,7 +40,7 @@ export class PrismaBlockStorage

private async getBlockByQuery(
where: { height: number } | { hash: string }
): Promise<BlockWithResult | undefined> {
): Promise<BlockWithMaybeResult | undefined> {
const dbResult = await this.connection.prismaClient.block.findFirst({
where,
include: {
Expand All @@ -57,18 +58,15 @@ export class PrismaBlockStorage
const transactions = dbResult.transactions.map<TransactionExecutionResult>(
(txresult) => this.transactionResultMapper.mapIn([txresult, txresult.tx])
);
if (dbResult.result === undefined || dbResult.result === null) {
throw new Error(
`No Metadata has been set for block ${JSON.stringify(where)} yet`
);
}

return {
block: {
...this.blockMapper.mapIn(dbResult),
transactions,
},
result: this.blockResultMapper.mapIn(dbResult.result),
result: dbResult.result
? this.blockResultMapper.mapIn(dbResult.result)
: undefined,
};
}

Expand Down Expand Up @@ -100,45 +98,42 @@ export class PrismaBlockStorage

const { prismaClient } = this.connection;

await prismaClient.$transaction([
prismaClient.transaction.createMany({
data: block.transactions.map((txr) =>
this.transactionMapper.mapOut(txr.tx)
),
skipDuplicates: true,
}),

prismaClient.block.create({
data: {
...encodedBlock,
beforeNetworkState:
encodedBlock.beforeNetworkState as Prisma.InputJsonObject,
duringNetworkState:
encodedBlock.duringNetworkState as Prisma.InputJsonObject,

transactions: {
createMany: {
data: transactions.map((tx) => {
return {
status: tx.status,
statusMessage: tx.statusMessage,
txHash: tx.txHash,

stateTransitions:
tx.stateTransitions as Prisma.InputJsonArray,
protocolTransitions:
tx.protocolTransitions as Prisma.InputJsonArray,
events: tx.events as Prisma.InputJsonArray,
};
}),
skipDuplicates: true,
},
},
await prismaClient.transaction.createMany({
data: block.transactions.map((txr) =>
this.transactionMapper.mapOut(txr.tx)
),
skipDuplicates: true,
});

await prismaClient.block.create({
data: {
...encodedBlock,
beforeNetworkState:
encodedBlock.beforeNetworkState as Prisma.InputJsonObject,
duringNetworkState:
encodedBlock.duringNetworkState as Prisma.InputJsonObject,

batchHeight: undefined,
transactions: {
createMany: {
data: transactions.map((tx) => {
return {
status: tx.status,
statusMessage: tx.statusMessage,
txHash: tx.txHash,

stateTransitions: tx.stateTransitions as Prisma.InputJsonArray,
protocolTransitions:
tx.protocolTransitions as Prisma.InputJsonArray,
events: tx.events as Prisma.InputJsonArray,
};
}),
skipDuplicates: true,
},
},
}),
]);

batchHeight: undefined,
},
});
}

public async pushResult(result: BlockResult): Promise<void> {
Expand Down Expand Up @@ -169,7 +164,9 @@ export class PrismaBlockStorage
return (result?._max.height ?? -1) + 1;
}

public async getLatestBlock(): Promise<BlockWithResult | undefined> {
public async getLatestBlockAndResult(): Promise<
BlockWithMaybeResult | undefined
> {
const latestBlock = await this.connection.prismaClient.$queryRaw<
{ hash: string }[]
>`SELECT b1."hash" FROM "Block" b1
Expand All @@ -185,6 +182,22 @@ export class PrismaBlockStorage
});
}

public async getLatestBlock(): Promise<BlockWithResult | undefined> {
const result = await this.getLatestBlockAndResult();
if (result !== undefined) {
if (result.result === undefined) {
throw new Error(
`Block result for block ${result.block.height.toString()} not found`
);
}
return {
block: result.block,
result: result.result,
};
}
return result;
}

public async getNewBlocks(): Promise<BlockWithPreviousResult[]> {
const blocks = await this.connection.prismaClient.block.findMany({
where: {
Expand Down
33 changes: 16 additions & 17 deletions packages/persistance/src/services/prisma/PrismaMessageStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,25 +51,24 @@ export class PrismaMessageStorage implements MessageStorage {
);

const { prismaClient } = this.connection;
await prismaClient.$transaction([
prismaClient.transaction.createMany({
data: transactions,
skipDuplicates: true,
}),

prismaClient.incomingMessageBatch.create({
data: {
fromMessageHash,
toMessageHash,
messages: {
createMany: {
data: transactions.map((transaction) => ({
transactionHash: transaction.hash,
})),
},
await prismaClient.transaction.createMany({
data: transactions,
skipDuplicates: true,
});

await prismaClient.incomingMessageBatch.create({
data: {
fromMessageHash,
toMessageHash,
messages: {
createMany: {
data: transactions.map((transaction) => ({
transactionHash: transaction.hash,
})),
},
},
}),
]);
},
});
}
}
22 changes: 10 additions & 12 deletions packages/persistance/src/services/prisma/PrismaStateService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,17 @@ export class PrismaStateService implements AsyncStateService {
mask: this.mask,
}));

await prismaClient.$transaction([
prismaClient.state.deleteMany({
where: {
path: {
in: this.cache.map((x) => new Decimal(x.key.toString())),
},
mask: this.mask,
await prismaClient.state.deleteMany({
where: {
path: {
in: this.cache.map((x) => new Decimal(x.key.toString())),
},
}),
prismaClient.state.createMany({
data,
}),
]);
mask: this.mask,
},
});
await prismaClient.state.createMany({
data,
});

this.cache = [];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export class RedisMerkleTreeStore implements AsyncMerkleTreeStore {
}

try {
await this.connection.redisClient.mSet(array.flat(1));
this.connection.currentMulti.mSet(array.flat(1));
} catch (error) {
log.error(error);
}
Expand Down Expand Up @@ -62,8 +62,8 @@ export class RedisMerkleTreeStore implements AsyncMerkleTreeStore {
public writeNodes(nodes: MerkleTreeNode[]): void {
this.cache = this.cache.concat(nodes);
// TODO Filter distinct
// We might not even need this, since the distinctness filter might already
// be implicitely done by the layer above (i.e. cachedmtstore)
// We might not even need this, since the distinctness filter might already
// be implicitely done by the layer above (i.e. cachedmtstore)

// Leaving this for now until I get to implementing it
// const concat = this.cache.concat(nodes);
Expand Down
2 changes: 1 addition & 1 deletion packages/sdk/src/query/BlockStorageNetworkStateModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export class BlockStorageNetworkStateModule
* with afterBundle() hooks executed
*/
public async getStagedNetworkState(): Promise<NetworkState | undefined> {
const result = await this.unprovenQueue.getLatestBlock();
const result = await this.unprovenStorage.getLatestBlock();
return result?.result.afterNetworkState;
}

Expand Down
Loading

0 comments on commit 6206bd1

Please sign in to comment.