Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Account balance aggregation app #139

Merged
merged 29 commits into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
8476296
feat: Account balance application
satran004 Mar 5, 2024
3edf97b
chore: Dummy invalidtransaction storage
satran004 Mar 6, 2024
1e4c492
chore: Update yaci store dependency and auto index management
satran004 Mar 6, 2024
ca5b51b
chore: Update yaci store dependency and auto index management
satran004 Mar 6, 2024
6a11337
chore: Add balance views
satran004 Mar 6, 2024
715bf5b
fix: address balance view query
satran004 Mar 6, 2024
5fdf46b
chore: Convert to lower case
satran004 Mar 6, 2024
1834d54
chore: Handle Genesis Utxos
satran004 Mar 6, 2024
f26fcbb
chore: Add index for slot
satran004 Mar 7, 2024
7da972b
chore: Replace JPA with JOOQ for AddressTxAmount
satran004 Mar 7, 2024
79a6a9e
chore: Replace JPA with JOOQ for AddressTxAmount
satran004 Mar 7, 2024
93aec7d
chore: Save by block if # of records > 500
satran004 Mar 7, 2024
b658069
chore: Save by block if # of records > 200
satran004 Mar 7, 2024
73add72
chore: Save by block if # of records > 200
satran004 Mar 7, 2024
5cc37f7
chore: Update
satran004 Mar 7, 2024
9d0e186
chore: Update
satran004 Mar 7, 2024
b9dc6a9
chore: Update
satran004 Mar 7, 2024
654c7d9
chore: Update
satran004 Mar 7, 2024
a26fa5d
chore: Update
satran004 Mar 7, 2024
5a8a01d
chore: Update
satran004 Mar 7, 2024
f62d427
chore: Update
satran004 Mar 7, 2024
d1b167e
chore: Update
satran004 Mar 7, 2024
9c86b09
chore: Update
satran004 Mar 7, 2024
1f34b5b
chore: Update
satran004 Mar 7, 2024
adfda88
chore: Update
satran004 Mar 8, 2024
ae394d7
chore: Rollback
satran004 Mar 8, 2024
a71e257
chore: Virtual thread
satran004 Mar 8, 2024
1609ec2
chore: Virtual thread
satran004 Mar 8, 2024
0ca3045
chore: Remove extra columns
satran004 Mar 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions aggregates/account/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
dependencies {
implementation project(":components:common")
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation(libs.cardano.client.lib)
api(libs.yaci.store.starter)
implementation(libs.yaci.store.utxo.starter)
implementation(libs.yaci.store.account.starter)

testImplementation 'org.springframework.boot:spring-boot-starter-test'
}

publishing {
publications {
mavenJava(MavenPublication) {
pom {
name = 'Ledger Sync Account'
description = 'Ledger Sync Account Module'
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.cardanofoundation.ledgersync.account;

import com.bloxbean.cardano.yaci.store.utxo.domain.InvalidTransaction;
import com.bloxbean.cardano.yaci.store.utxo.storage.impl.InvalidTransactionStorageImpl;
import com.bloxbean.cardano.yaci.store.utxo.storage.impl.repository.InvalidTransactionRepository;
import org.springframework.stereotype.Component;

//TODO -- remove this class later
//Overrides the save method to do nothing to fix the NUL char issue in invalid transaction
//https://github.com/bloxbean/yaci-store/issues/209
@Component
public class DummyInvalidTransactionStorage extends InvalidTransactionStorageImpl {
public DummyInvalidTransactionStorage(InvalidTransactionRepository repository) {
super(repository);
}

@Override
public InvalidTransaction save(InvalidTransaction invalidTransaction) {
return invalidTransaction; //do nothing
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.cardanofoundation.ledgersync.account.domain;

import com.bloxbean.cardano.yaci.store.common.domain.BlockAwareDomain;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;

import java.math.BigInteger;

@Data
@NoArgsConstructor
@AllArgsConstructor
@SuperBuilder(toBuilder = true)
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public class AddressTxAmount extends BlockAwareDomain {
private String address;
private String unit;
private String txHash;
private Long slot;
private BigInteger quantity;
private String stakeAddress;
private Integer epoch;
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package org.cardanofoundation.ledgersync.account.processor;

import com.bloxbean.cardano.yaci.store.client.utxo.UtxoClient;
import com.bloxbean.cardano.yaci.store.common.domain.AddressUtxo;
import com.bloxbean.cardano.yaci.store.common.domain.Amt;
import com.bloxbean.cardano.yaci.store.common.domain.UtxoKey;
import com.bloxbean.cardano.yaci.store.events.EventMetadata;
import com.bloxbean.cardano.yaci.store.events.RollbackEvent;
import com.bloxbean.cardano.yaci.store.events.internal.ReadyForBalanceAggregationEvent;
import com.bloxbean.cardano.yaci.store.utxo.domain.AddressUtxoEvent;
import com.bloxbean.cardano.yaci.store.utxo.domain.TxInputOutput;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.cardanofoundation.ledgersync.account.domain.AddressTxAmount;
import org.cardanofoundation.ledgersync.account.storage.AddressTxAmountStorage;
import org.springframework.context.event.EventListener;
import org.springframework.data.util.Pair;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.math.BigInteger;
import java.util.*;

import static org.cardanofoundation.ledgersync.account.util.AddressUtil.getAddress;

@Component
@RequiredArgsConstructor
@Slf4j
public class AddressTxAmountProcessor {
public static final int BLOCK_ADDRESS_TX_AMT_THRESHOLD = 100; //Threshold to save address_tx_amounts records for block

private final AddressTxAmountStorage addressTxAmountStorage;
private final UtxoClient utxoClient;

private List<Pair<EventMetadata, TxInputOutput>> pendingTxInputOutputListCache = Collections.synchronizedList(new ArrayList<>());
private List<AddressTxAmount> addressTxAmountListCache = Collections.synchronizedList(new ArrayList<>());

@EventListener
@Transactional
public void processAddressUtxoEvent(AddressUtxoEvent addressUtxoEvent) {
//Ignore Genesis Txs as it's handled by GEnesisBlockAddressTxAmtProcessor
if (addressUtxoEvent.getEventMetadata().getSlot() == -1)
return;

var txInputOutputList = addressUtxoEvent.getTxInputOutputs();
if (txInputOutputList == null || txInputOutputList.isEmpty())
return;

List<AddressTxAmount> addressTxAmountList = new ArrayList<>();

for (var txInputOutput : txInputOutputList) {
var txAddressTxAmountEntities = processAddressAmountForTx(addressUtxoEvent.getEventMetadata(), txInputOutput, false);
if (txAddressTxAmountEntities == null || txAddressTxAmountEntities.isEmpty()) continue;

addressTxAmountList.addAll(txAddressTxAmountEntities);
}

if (addressTxAmountList.size() > BLOCK_ADDRESS_TX_AMT_THRESHOLD) {
if (log.isDebugEnabled())
log.debug("Saving address_tx_amounts records : {} -- {}", addressTxAmountList.size(), addressUtxoEvent.getEventMetadata().getBlock());
addressTxAmountStorage.save(addressTxAmountList); //Save
} else if (addressTxAmountList.size() > 0) {
addressTxAmountListCache.addAll(addressTxAmountList);
}
}

private List<AddressTxAmount> processAddressAmountForTx(EventMetadata metadata, TxInputOutput txInputOutput,
boolean throwExceptionOnFailure) {
var txHash = txInputOutput.getTxHash();
var inputs = txInputOutput.getInputs();
var outputs = txInputOutput.getOutputs();
if (inputs == null || inputs.isEmpty() || outputs == null || outputs.isEmpty())
return null;

var inputUtxoKeys = inputs.stream()
.map(input -> new UtxoKey(input.getTxHash(), input.getOutputIndex()))
.toList();

var inputAddressUtxos = utxoClient.getUtxosByIds(inputUtxoKeys)
.stream()
.filter(Objects::nonNull)
.toList();

if (inputAddressUtxos.size() != inputUtxoKeys.size()) {
log.debug("Unable to get inputs for all input keys for account balance calculation. Add this Tx to cache to process later : " + txHash);
if (throwExceptionOnFailure)
throw new IllegalStateException("Unable to get inputs for all input keys for account balance calculation : " + inputUtxoKeys);
else
pendingTxInputOutputListCache.add(Pair.of(metadata, txInputOutput));

return Collections.emptyList();
}

var txAddressTxAmount =
processTxAmount(txHash, metadata, inputAddressUtxos, outputs);
return txAddressTxAmount;
}

private List<AddressTxAmount> processTxAmount(String txHash, EventMetadata metadata, List<AddressUtxo> inputs, List<AddressUtxo> outputs) {
Map<Pair<String, String>, BigInteger> addressTxAmountMap = new HashMap<>();
Map<String, AddressDetails> addressToAddressDetailsMap = new HashMap<>();
Map<String, AssetDetails> unitToAssetDetailsMap = new HashMap<>();

//Subtract input amounts
for (var input : inputs) {
for (Amt amt : input.getAmounts()) {
var key = Pair.of(input.getOwnerAddr(), amt.getUnit());
var amount = addressTxAmountMap.getOrDefault(key, BigInteger.ZERO);
amount = amount.subtract(amt.getQuantity());
addressTxAmountMap.put(key, amount);

var addressDetails = new AddressDetails(input.getOwnerStakeAddr(), input.getOwnerPaymentCredential(), input.getOwnerStakeCredential());
addressToAddressDetailsMap.put(input.getOwnerAddr(), addressDetails);

var assetDetails = new AssetDetails(amt.getPolicyId(), amt.getAssetName());
unitToAssetDetailsMap.put(amt.getUnit(), assetDetails);
}
}

//Add output amounts
for (var output : outputs) {
for (Amt amt : output.getAmounts()) {
var key = Pair.of(output.getOwnerAddr(), amt.getUnit());
var amount = addressTxAmountMap.getOrDefault(key, BigInteger.ZERO);
amount = amount.add(amt.getQuantity());
addressTxAmountMap.put(key, amount);

var addressDetails = new AddressDetails(output.getOwnerStakeAddr(), output.getOwnerPaymentCredential(), output.getOwnerStakeCredential());
addressToAddressDetailsMap.put(output.getOwnerAddr(), addressDetails);

var assetDetails = new AssetDetails(amt.getPolicyId(), amt.getAssetName());
unitToAssetDetailsMap.put(amt.getUnit(), assetDetails);
}
}

return (List<AddressTxAmount>) addressTxAmountMap.entrySet()
.stream()
.filter(entry -> entry.getValue().compareTo(BigInteger.ZERO) != 0)
.map(entry -> {
var addressDetails = addressToAddressDetailsMap.get(entry.getKey().getFirst());
var assetDetails = unitToAssetDetailsMap.get(entry.getKey().getSecond());

//address and full address if the address is too long
var addressTuple = getAddress(entry.getKey().getFirst());

return AddressTxAmount.builder()
.address(addressTuple._1)
.unit(entry.getKey().getSecond())
.txHash(txHash)
.slot(metadata.getSlot())
.quantity(entry.getValue())
.stakeAddress(addressDetails.ownerStakeAddress)
.epoch(metadata.getEpochNumber())
.blockNumber(metadata.getBlock())
.blockTime(metadata.getBlockTime())
.build();
}).toList();
}

@EventListener
@Transactional //We can also listen to CommitEvent here
public void handleRemainingTxInputOuputs(ReadyForBalanceAggregationEvent readyForBalanceAggregationEvent) {
try {
List<AddressTxAmount> addressTxAmountList = new ArrayList<>();
for (var pair : pendingTxInputOutputListCache) {
EventMetadata metadata = pair.getFirst();
TxInputOutput txInputOutput = pair.getSecond();

var addrAmountEntitiesForTx = processAddressAmountForTx(metadata, txInputOutput, true);

if (addrAmountEntitiesForTx != null) {
addressTxAmountList.addAll(addrAmountEntitiesForTx);
}
}

if (addressTxAmountList.size() > 0) {
addressTxAmountListCache.addAll(addressTxAmountList);
}

long t1 = System.currentTimeMillis();
if (addressTxAmountListCache.size() > 0) {
addressTxAmountStorage.save(addressTxAmountListCache);
}

long t2 = System.currentTimeMillis();
log.info("Time taken to save additional address_tx_amounts records : {}, time: {} ms", addressTxAmountListCache.size(), (t2 - t1));

} finally {
pendingTxInputOutputListCache.clear();
addressTxAmountListCache.clear();
}
}

@EventListener
@Transactional
public void handleRollback(RollbackEvent rollbackEvent) {
int addressTxAmountDeleted = addressTxAmountStorage.deleteAddressBalanceBySlotGreaterThan(rollbackEvent.getRollbackTo().getSlot());

log.info("Rollback -- {} address_tx_amounts records", addressTxAmountDeleted);
}

record AssetDetails(String policy, String assetName) {
}

record AddressDetails(String ownerStakeAddress, String ownerPaymentCredential, String ownerStakeCredential) {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package org.cardanofoundation.ledgersync.account.processor;

import com.bloxbean.cardano.client.address.Address;
import com.bloxbean.cardano.client.address.AddressProvider;
import com.bloxbean.cardano.yaci.core.util.HexUtil;
import com.bloxbean.cardano.yaci.store.events.GenesisBlockEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.cardanofoundation.ledgersync.account.domain.AddressTxAmount;
import org.cardanofoundation.ledgersync.account.storage.AddressTxAmountStorage;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

import static com.bloxbean.cardano.yaci.core.util.Constants.LOVELACE;
import static org.cardanofoundation.ledgersync.account.util.AddressUtil.getAddress;

@Component
@RequiredArgsConstructor
@Slf4j
public class GensisBlockAddressTxAmtProcessor {
private final AddressTxAmountStorage addressTxAmountStorage;

@EventListener
@Transactional
public void handleAddressTxAmtForGenesisBlock(GenesisBlockEvent genesisBlockEvent) {
var genesisBalances = genesisBlockEvent.getGenesisBalances();
if (genesisBalances == null || genesisBalances.isEmpty()) {
log.info("No genesis balances found");
return;
}

List<AddressTxAmount> genesisAddressTxAmtList = new ArrayList<>();
for (var genesisBalance : genesisBalances) {
var receiverAddress = genesisBalance.getAddress();
var txnHash = genesisBalance.getTxnHash();
var balance = genesisBalance.getBalance();

if (balance == null || balance.compareTo(BigInteger.ZERO) == 0) {
continue;
}

//address and full address if the address is too long
var addressTuple = getAddress(receiverAddress);

String ownerPaymentCredential = null;
String stakeAddress = null;
if (genesisBalance.getAddress() != null &&
genesisBalance.getAddress().startsWith("addr")) { //If shelley address
try {
Address address = new Address(genesisBalance.getAddress());
ownerPaymentCredential = address.getPaymentCredential().map(paymentKeyHash -> HexUtil.encodeHexString(paymentKeyHash.getBytes()))
.orElse(null);
stakeAddress = address.getDelegationCredential().map(delegationHash -> AddressProvider.getStakeAddress(address).toBech32())
.orElse(null);
} catch (Exception e) {
log.warn("Error getting address details: " + genesisBalance.getAddress());
}
}

var addressTxAmount = AddressTxAmount.builder()
.address(addressTuple._1)
.unit(LOVELACE)
.txHash(txnHash)
.slot(genesisBlockEvent.getSlot())
.quantity(balance)
.stakeAddress(stakeAddress)
.epoch(0)
.blockNumber(genesisBlockEvent.getBlock())
.blockTime(genesisBlockEvent.getBlockTime())
.build();

genesisAddressTxAmtList.add(addressTxAmount);
}

if (genesisAddressTxAmtList.size() > 0) {
addressTxAmountStorage.save(genesisAddressTxAmtList);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.cardanofoundation.ledgersync.account.storage;

import org.cardanofoundation.ledgersync.account.domain.AddressTxAmount;

import java.util.List;

public interface AddressTxAmountStorage {
void save(List<AddressTxAmount> addressTxAmount);
int deleteAddressBalanceBySlotGreaterThan(Long slot);
}
Loading
Loading