Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
Maddiaa0 committed Dec 13, 2024
1 parent 075171f commit 2c27b95
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 44 deletions.
5 changes: 1 addition & 4 deletions yarn-project/aztec-node/src/aztec-node/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -845,8 +845,6 @@ export class AztecNodeService implements AztecNode, Traceable {
}
}

// TODO(md): change this to run in parrel the same as in the p2p client - maybe not required
// as everything is sub ms apart from the double spend validator
public async isValidTx(tx: Tx, isSimulation: boolean = false): Promise<boolean> {
const blockNumber = (await this.blockSource.getBlockNumber()) + 1;
const db = this.worldStateSynchronizer.getCommitted();
Expand All @@ -857,8 +855,7 @@ export class AztecNodeService implements AztecNode, Traceable {
new DataTxValidator(),
new MetadataTxValidator(new Fr(this.l1ChainId), new Fr(blockNumber)),
new DoubleSpendTxValidator({
getNullifierIndices: nullifiers =>
db.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, nullifiers).then(x => x.filter(index => index !== undefined) as bigint[]),
getNullifierIndices: nullifiers => db.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, nullifiers),
}),
];

Expand Down
58 changes: 29 additions & 29 deletions yarn-project/p2p/src/services/libp2p/libp2p_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import { createPeerScoreParams, createTopicScoreParams } from '@chainsafe/libp2p
import { noise } from '@chainsafe/libp2p-noise';
import { yamux } from '@chainsafe/libp2p-yamux';
import { identify } from '@libp2p/identify';
import { Message, PeerId, TopicValidatorResult } from '@libp2p/interface';
import { type Message, type PeerId, TopicValidatorResult } from '@libp2p/interface';
import '@libp2p/kad-dht';
import { mplex } from '@libp2p/mplex';
import { tcp } from '@libp2p/tcp';
Expand Down Expand Up @@ -61,7 +61,6 @@ import {
} from '../reqresp/interface.js';
import { ReqResp } from '../reqresp/reqresp.js';
import type { P2PService, PeerDiscoveryService } from '../service.js';
import { Timer } from '@aztec/foundation/timer';

interface MessageValidator {
validator: {
Expand All @@ -76,9 +75,7 @@ interface ValidationResult {
severity: PeerErrorSeverity;
}

type ValidationOutcome =
| { allPassed: true }
| { allPassed: false; failure: ValidationResult };
type ValidationOutcome = { allPassed: true } | { allPassed: false; failure: ValidationResult };

/**
* Lib P2P implementation of the P2PService interface.
Expand Down Expand Up @@ -160,10 +157,10 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement

// add GossipSub listener
this.node.services.pubsub.addEventListener('gossipsub:message', async e => {
const { msg, propagationSource: peerId } = e.detail;
const { msg } = e.detail;
this.logger.trace(`Received PUBSUB message.`);

await this.jobQueue.put(() => this.handleNewGossipMessage(msg, peerId));
await this.jobQueue.put(() => this.handleNewGossipMessage(msg));
});

// Start running promise for peer discovery
Expand Down Expand Up @@ -306,7 +303,6 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement
},
});


// Create request response protocol handlers
/**
* Handler for tx requests
Expand Down Expand Up @@ -405,10 +401,10 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement
* @param topic - The message's topic.
* @param data - The message data
*/
private async handleNewGossipMessage(message: RawGossipMessage, peerId: PeerId) {
private async handleNewGossipMessage(message: RawGossipMessage) {
if (message.topic === Tx.p2pTopic) {
const tx = Tx.fromBuffer(Buffer.from(message.data));
await this.processTxFromPeer(tx, peerId);
await this.processTxFromPeer(tx);
}
if (message.topic === BlockAttestation.p2pTopic && this.clientType === P2PClientType.Full) {
const attestation = BlockAttestation.fromBuffer(Buffer.from(message.data));
Expand Down Expand Up @@ -497,12 +493,11 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement
});
}

private async processTxFromPeer(tx: Tx, _peerId: PeerId): Promise<void> {
private async processTxFromPeer(tx: Tx): Promise<void> {
const txHash = tx.getTxHash();
const txHashString = txHash.toString();
this.logger.verbose(`Received tx ${txHashString} from external peer.`);


await this.mempools.txPool.addTxs([tx]);
}

/**
Expand Down Expand Up @@ -542,18 +537,25 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement
return true;
}

private async validatePropagatedTxFromMessage(propagationSource: PeerId, msg: Message): Promise<TopicValidatorResult> {
private async validatePropagatedTxFromMessage(
propagationSource: PeerId,
msg: Message,
): Promise<TopicValidatorResult> {
const tx = Tx.fromBuffer(Buffer.from(msg.data));
const timer = new Timer();
const isValid = await this.validatePropagatedTx(tx, propagationSource);
this.logger.info(`\n\n\n validatePropagatedTx took ${timer.ms()}ms \n\n\n`);
this.logger.trace(`validatePropagatedTx: ${isValid}`, {
[Attributes.TX_HASH]: tx.getTxHash().toString(),
[Attributes.P2P_ID]: propagationSource.toString(),
});
return isValid ? TopicValidatorResult.Accept : TopicValidatorResult.Reject;
}

/**
* Validate a tx that has been propagated from a peer.
* @param tx - The tx to validate.
* @param peerId - The peer ID of the peer that sent the tx.
* @returns True if the tx is valid, false otherwise.
*/
@trackSpan('Libp2pService.validatePropagatedTx', tx => ({
[Attributes.TX_HASH]: tx.getTxHash().toString(),
}))
Expand All @@ -571,7 +573,9 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement
// Double spend validator has a special case handler
if (name === 'doubleSpendValidator') {
const isValid = await this.handleDoubleSpendFailure(tx, blockNumber, peerId);
if (isValid) return true;
if (isValid) {
return true;
}
}

this.peerManager.penalizePeer(peerId, severity);
Expand Down Expand Up @@ -603,10 +607,9 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement
},
doubleSpendValidator: {
validator: new DoubleSpendTxValidator({
getNullifierIndices: async (nullifiers: Buffer[]) => {
getNullifierIndices: (nullifiers: Buffer[]) => {
const merkleTree = this.worldStateSynchronizer.getCommitted();
const indices = await merkleTree.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, nullifiers);
return indices.filter(index => index !== undefined) as bigint[];
return merkleTree.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, nullifiers);
},
}),
severity: PeerErrorSeverity.HighToleranceError,
Expand All @@ -622,12 +625,10 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement
*/
private async runValidations(
tx: Tx,
messageValidators: Record<string, MessageValidator>
messageValidators: Record<string, MessageValidator>,
): Promise<ValidationOutcome> {
const validationPromises = Object.entries(messageValidators).map(async ([name, { validator, severity }]) => {
const timer = new Timer();
const isValid = await validator.validateTx(tx);
this.logger.info(`\n\n\n VALIDATOR: ${name} took ${timer.ms()}ms \n\n\n`);
return { name, isValid, severity };
});

Expand All @@ -636,16 +637,16 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement

// A promise that resolves when the first validation fails
const firstFailure = Promise.race(
validationPromises.map(async (promise) => {
validationPromises.map(async promise => {
const result = await promise;
return result.isValid ? new Promise(() => {}) : result;
})
}),
);

// Wait for the first validation to fail or all validations to pass
const result = await Promise.race([
allValidations.then(() => ({ allPassed: true as const })),
firstFailure.then(failure => ({ allPassed: false as const, failure: failure as ValidationResult }))
firstFailure.then(failure => ({ allPassed: false as const, failure: failure as ValidationResult })),
]);

// If all validations pass, allPassed will be true, if failed, then the failure will be the first validation to fail
Expand All @@ -669,12 +670,11 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement
}

const snapshotValidator = new DoubleSpendTxValidator({
getNullifierIndices: async (nullifiers: Buffer[]) => {
getNullifierIndices: (nullifiers: Buffer[]) => {
const merkleTree = this.worldStateSynchronizer.getSnapshot(
blockNumber - this.config.severePeerPenaltyBlockLength,
);
const indices = await merkleTree.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, nullifiers);
return indices.filter(index => index !== undefined) as bigint[];
return merkleTree.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, nullifiers);
},
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ describe('DoubleSpendTxValidator', () => {
beforeEach(() => {
nullifierSource = mock<NullifierSource>({
getNullifierIndices: mockFn().mockImplementation(() => {
return Promise.resolve(undefined);
return Promise.resolve([undefined]);
}),
});
txValidator = new DoubleSpendTxValidator(nullifierSource);
Expand Down
13 changes: 6 additions & 7 deletions yarn-project/p2p/src/tx_validator/double_spend_validator.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { type AnyTx, Tx, type TxValidator } from '@aztec/circuit-types';
import { Fr } from '@aztec/circuits.js';
import { createLogger } from '@aztec/foundation/log';

export interface NullifierSource {
getNullifierIndices: (nullifiers: Buffer[]) => Promise<bigint[]>;
getNullifierIndices: (nullifiers: Buffer[]) => Promise<(bigint | undefined)[]>;
}

export class DoubleSpendTxValidator<T extends AnyTx> implements TxValidator<T> {
Expand Down Expand Up @@ -36,7 +35,7 @@ export class DoubleSpendTxValidator<T extends AnyTx> implements TxValidator<T> {
}

async #uniqueNullifiers(tx: AnyTx, thisBlockNullifiers: Set<bigint>): Promise<boolean> {
const nullifiers = (tx instanceof Tx ? tx.data.getNonEmptyNullifiers() : tx.txEffect.nullifiers);
const nullifiers = tx instanceof Tx ? tx.data.getNonEmptyNullifiers() : tx.txEffect.nullifiers;

// Ditch this tx if it has repeated nullifiers
const uniqueNullifiers = new Set(nullifiers);
Expand All @@ -46,14 +45,14 @@ export class DoubleSpendTxValidator<T extends AnyTx> implements TxValidator<T> {
}

if (this.isValidatingBlock) {
// TODO: remove all this type casting
for (const nullifier of nullifiers.map(n => n.toBigInt())) {
if (thisBlockNullifiers.has(nullifier)) {
for (const nullifier of nullifiers) {
const nullifierBigInt = nullifier.toBigInt();
if (thisBlockNullifiers.has(nullifierBigInt)) {
this.#log.warn(`Rejecting tx ${Tx.getHash(tx)} for repeating a nullifier in the same block`);
return false;
}

thisBlockNullifiers.add(nullifier);
thisBlockNullifiers.add(nullifierBigInt);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ export class TxValidatorFactory {
) {
this.nullifierSource = {
getNullifierIndices: nullifiers =>
this.committedDb.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, nullifiers).then(x => x.filter(index => index !== undefined) as bigint[]),
this.committedDb
.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, nullifiers)
.then(x => x.filter(index => index !== undefined) as bigint[]),
};

this.publicStateSource = {
Expand All @@ -52,8 +54,7 @@ export class TxValidatorFactory {

validatorForProcessedTxs(fork: MerkleTreeReadOperations): TxValidator<ProcessedTx> {
return new DoubleSpendTxValidator({
getNullifierIndices: nullifiers =>
fork.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, nullifiers).then(x => x.filter(index => index !== undefined) as bigint[]),
getNullifierIndices: nullifiers => fork.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, nullifiers),
});
}
}

0 comments on commit 2c27b95

Please sign in to comment.