Skip to content

Commit

Permalink
fix: transaction tracker now compares transactions in linear time bef…
Browse files Browse the repository at this point in the history
…ore emission

- rollback logic optimized to only check transactions in reverse order

Co-Authored-By: mirceahasegan <mircea.hasegan@iohk.io>
  • Loading branch information
AngelCastilloB and mirceahasegan committed Nov 18, 2024
1 parent 7c8b6e9 commit 2306f10
Show file tree
Hide file tree
Showing 4 changed files with 532 additions and 68 deletions.
14 changes: 12 additions & 2 deletions packages/util-dev/src/mockProviders/mockChainHistoryProvider.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as AssetId from '../assetId';
import * as Crypto from '@cardano-sdk/crypto';
import { Cardano, Paginated } from '@cardano-sdk/core';
import { Cardano, Paginated, TransactionsByAddressesArgs } from '@cardano-sdk/core';
import { currentEpoch, handleAssetId, ledgerTip, stakeCredential } from './mockData';
import { somePartialStakePools } from '../createStubStakePoolProvider';
import delay from 'delay';
Expand Down Expand Up @@ -219,10 +219,20 @@ export const mockChainHistoryProvider2 = (delayMs: number) => {
const delayedJestFn = <T>(resolvedValue: T) =>
jest.fn().mockImplementationOnce(() => delay(delayMs).then(() => resolvedValue));

const blockRangeTransactions = (blockRangeStart: number): Paginated<Cardano.HydratedTx> => {
const pageResults = queryTransactionsResult2.pageResults.filter(
(res) => res.blockHeader.blockNo >= blockRangeStart
);

return { pageResults, totalResultCount: pageResults.length };
};

return {
blocksByHashes: delayedJestFn(blocksByHashes),
healthCheck: delayedJestFn({ ok: true }),
transactionsByAddresses: delayedJestFn(queryTransactionsResult2),
transactionsByAddresses: jest.fn(({ blockRange }: TransactionsByAddressesArgs) =>
delay(delayMs).then(() => blockRangeTransactions(blockRange?.lowerBound || 0))
),
transactionsByHashes: delayedJestFn(queryTransactionsResult2)
};
};
Expand Down
225 changes: 161 additions & 64 deletions packages/wallet/src/services/TransactionsTracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ import { distinctBlock, signedTxsEquals, transactionsEquals, txInEquals } from '
import { WitnessedTx } from '@cardano-sdk/key-management';
import { newAndStoredMulticast } from './util/newAndStoredMulticast';
import chunk from 'lodash/chunk.js';
import intersectionBy from 'lodash/intersectionBy.js';
import sortBy from 'lodash/sortBy.js';
import unionBy from 'lodash/unionBy.js';

export interface TransactionsTrackerProps {
chainHistoryProvider: ChainHistoryProvider;
Expand Down Expand Up @@ -107,15 +105,159 @@ const allTransactionsByAddresses = async (
return response;
};

export const createAddressTransactionsProvider = ({
const getLastTransactionsAtBlock = (
transactions: Cardano.HydratedTx[],
blockNo: Cardano.BlockNo
): Cardano.HydratedTx[] => {
const txsFromSameBlock = [];

for (let i = transactions.length - 1; i >= 0; --i) {
const tx = transactions[i];
if (tx.blockHeader.blockNo === blockNo) {
txsFromSameBlock.push(tx);
} else {
break;
}
}

return txsFromSameBlock;
};

export const revertLastBlock = (
localTransactions: Cardano.HydratedTx[],
blockNo: Cardano.BlockNo,
rollback$: Subject<Cardano.HydratedTx>,
newTransactions: Cardano.HydratedTx[],
logger: Logger
) => {
const result = [...localTransactions];

while (result.length > 0) {
const lastKnownTx = result[result.length - 1];

if (lastKnownTx.blockHeader.blockNo === blockNo) {
// only emit if the tx is also not present in the new transactions to be added
if (newTransactions.findIndex((tx) => tx.id === lastKnownTx.id) === -1) {
logger.debug(`Transaction ${lastKnownTx.id} was rolled back`);
rollback$.next(lastKnownTx);
}

result.pop();
} else {
break;
}
}

return result;
};

const findIntersectionAndUpdateTxStore = ({
chainHistoryProvider,
addresses$,
logger,
store,
retryBackoffConfig,
onFatalError,
tipBlockHeight$,
store,
logger,
onFatalError
}: TransactionsTrackerInternalsProps): TransactionsTrackerInternals => {
rollback$,
localTransactions,
addresses
}: Pick<
TransactionsTrackerInternalsProps,
'chainHistoryProvider' | 'logger' | 'store' | 'retryBackoffConfig' | 'onFatalError' | 'tipBlockHeight$'
> & {
localTransactions: Cardano.HydratedTx[];
rollback$: Subject<Cardano.HydratedTx>;
addresses: Cardano.PaymentAddress[];
}) =>
coldObservableProvider({
// Do not re-fetch transactions twice on load when tipBlockHeight$ loads from storage first
// It should also help when using poor internet connection.
// Caveat is that local transactions might get out of date...
combinator: exhaustMap,
equals: transactionsEquals,
onFatalError,
// eslint-disable-next-line sonarjs/cognitive-complexity
provider: async () => {
let rollbackOcurred = false;
// eslint-disable-next-line no-constant-condition
while (true) {
const lastStoredTransaction: Cardano.HydratedTx | undefined = localTransactions[localTransactions.length - 1];

lastStoredTransaction &&
logger.debug(
`Last stored tx: ${lastStoredTransaction?.id} block:${lastStoredTransaction?.blockHeader.blockNo}`
);

const lowerBound = lastStoredTransaction?.blockHeader.blockNo;
const newTransactions = await allTransactionsByAddresses(chainHistoryProvider, {
addresses,
blockRange: { lowerBound }
});

logger.debug(
`chainHistoryProvider returned ${newTransactions.length} transactions`,
lowerBound !== undefined && `since block ${lowerBound}`
);

// Fetching transactions from scratch, nothing else to do here.
if (lowerBound === undefined) {
if (newTransactions.length > 0) {
localTransactions = newTransactions;
store.setAll(newTransactions);
}

return newTransactions;
}

// If no transactions found from that block range, it means the last known block has been rolled back.
if (newTransactions.length === 0) {
localTransactions = revertLastBlock(localTransactions, lowerBound, rollback$, newTransactions, logger);
rollbackOcurred = true;

continue;
}

const localTxsFromSameBlock = getLastTransactionsAtBlock(localTransactions, lowerBound);
const firstSegmentOfNewTransactions = newTransactions.slice(0, localTxsFromSameBlock.length);

// The first segment of new transaction should match exactly (same txs and same order) our last know TXs. Otherwise
// roll them back and re-apply in new order.
const sameTxAndOrder = localTxsFromSameBlock.every(
(tx, index) => tx.id === firstSegmentOfNewTransactions[index].id
);

if (!sameTxAndOrder) {
localTransactions = revertLastBlock(localTransactions, lowerBound, rollback$, newTransactions, logger);
rollbackOcurred = true;

continue;
}

// No rollbacks, if they overlap 100% do nothing, otherwise add the difference.
const areTransactionsSame =
newTransactions.length === localTxsFromSameBlock.length &&
localTxsFromSameBlock.every((tx, index) => tx.id === newTransactions[index].id);

if (!areTransactionsSame) {
// Skip overlapping transactions to avoid duplicates
localTransactions = [...localTransactions, ...newTransactions.slice(localTxsFromSameBlock.length)];
store.setAll(localTransactions);
} else if (rollbackOcurred) {
// This case handles rollbacks without new additions
store.setAll(localTransactions);
}

return localTransactions;
}
},
retryBackoffConfig,
trigger$: tipBlockHeight$
});

export const createAddressTransactionsProvider = (
props: TransactionsTrackerInternalsProps
): TransactionsTrackerInternals => {
const { addresses$, store, logger } = props;
const rollback$ = new Subject<Cardano.HydratedTx>();
const storedTransactions$ = store.getAll().pipe(share());
return {
Expand All @@ -127,61 +269,14 @@ export const createAddressTransactionsProvider = ({
)
),
combineLatest([addresses$, storedTransactions$.pipe(defaultIfEmpty([] as Cardano.HydratedTx[]))]).pipe(
switchMap(([addresses, storedTransactions]) => {
let localTransactions: Cardano.HydratedTx[] = [...storedTransactions];

return coldObservableProvider({
// Do not re-fetch transactions twice on load when tipBlockHeight$ loads from storage first
// It should also help when using poor internet connection.
// Caveat is that local transactions might get out of date...
combinator: exhaustMap,
equals: transactionsEquals,
onFatalError,
provider: async () => {
// eslint-disable-next-line no-constant-condition
while (true) {
const lastStoredTransaction: Cardano.HydratedTx | undefined =
localTransactions[localTransactions.length - 1];

lastStoredTransaction &&
logger.debug(
`Last stored tx: ${lastStoredTransaction?.id} block:${lastStoredTransaction?.blockHeader.blockNo}`
);

const lowerBound = lastStoredTransaction?.blockHeader.blockNo;
const newTransactions = await allTransactionsByAddresses(chainHistoryProvider, {
addresses,
blockRange: { lowerBound }
});

logger.debug(
`chainHistoryProvider returned ${newTransactions.length} transactions`,
lowerBound !== undefined && `since block ${lowerBound}`
);
const duplicateTransactions =
lastStoredTransaction && intersectionBy(localTransactions, newTransactions, (tx) => tx.id);
if (typeof duplicateTransactions !== 'undefined' && duplicateTransactions.length === 0) {
const rollbackTransactions = localTransactions.filter(
({ blockHeader: { blockNo } }) => blockNo >= lowerBound
);

from(rollbackTransactions)
.pipe(tap((tx) => logger.debug(`Transaction ${tx.id} was rolled back`)))
.subscribe((v) => rollback$.next(v));

// Rollback by 1 block, try again in next loop iteration
localTransactions = localTransactions.filter(({ blockHeader: { blockNo } }) => blockNo < lowerBound);
} else {
localTransactions = unionBy(localTransactions, newTransactions, (tx) => tx.id);
store.setAll(localTransactions);
return localTransactions;
}
}
},
retryBackoffConfig,
trigger$: tipBlockHeight$
});
})
switchMap(([addresses, storedTransactions]) =>
findIntersectionAndUpdateTxStore({
addresses,
localTransactions: [...storedTransactions],
rollback$,
...props
})
)
)
)
};
Expand Down Expand Up @@ -247,7 +342,9 @@ export const createTransactionsTracker = (

const transactionsSource$ = new TrackerSubject(txSource$);

const historicalTransactions$ = createHistoricalTransactionsTrackerSubject(transactionsSource$);
const historicalTransactions$ = createHistoricalTransactionsTrackerSubject(transactionsSource$).pipe(
tap((transactions) => logger.debug(`History transactions count: ${transactions?.length || 0}`))
);

const [onChainNewTxPhase2Failed$, onChainNewTxSuccess$] = partition(
newTransactions$(historicalTransactions$).pipe(share()),
Expand Down
6 changes: 5 additions & 1 deletion packages/wallet/src/services/util/equals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ export const sameSortedArrayItems = <T>(arrayA: T[], arrayB: T[], itemEquals: (a
return true;
};

export const transactionsEquals = (a: Cardano.HydratedTx[], b: Cardano.HydratedTx[]) => sameArrayItems(a, b, txEquals);
export const transactionsEquals = (a: Cardano.HydratedTx[], b: Cardano.HydratedTx[]) => {
if (a === b) return true;

return sameSortedArrayItems(a, b, txEquals);
};

export const txInEquals = (a: Cardano.TxIn, b: Cardano.TxIn) => a.txId === b.txId && a.index === b.index;

Expand Down
Loading

0 comments on commit 2306f10

Please sign in to comment.