From 8476296c9ac9ed4dedb8d2b74556cf709abbbdac Mon Sep 17 00:00:00 2001 From: Satya Date: Wed, 6 Mar 2024 00:42:06 +0800 Subject: [PATCH 01/29] feat: Account balance application --- aggregates/account/build.gradle | 25 ++ .../account/model/AddressTxAmountEntity.java | 61 +++++ .../account/model/AddressTxAmountId.java | 20 ++ .../processor/AddressTxAmountProcessor.java | 218 ++++++++++++++++++ .../repository/AddressTxAmountRepository.java | 11 + .../resources/db/account/V2_1_1__init.sql | 20 ++ aggregates/build.gradle | 3 + aggregation-app/build.gradle | 38 +++ .../app/AggregationApplication.java | 20 ++ .../src/main/resources/application.yml | 33 +++ gradle/libs.versions.toml | 6 +- settings.gradle | 2 + 12 files changed, 455 insertions(+), 2 deletions(-) create mode 100644 aggregates/account/build.gradle create mode 100644 aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/model/AddressTxAmountEntity.java create mode 100644 aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/model/AddressTxAmountId.java create mode 100644 aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java create mode 100644 aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/repository/AddressTxAmountRepository.java create mode 100644 aggregates/account/src/main/resources/db/account/V2_1_1__init.sql create mode 100644 aggregates/build.gradle create mode 100644 aggregation-app/build.gradle create mode 100644 aggregation-app/src/main/java/org/cardanofoundation/ledgersync/aggregation/app/AggregationApplication.java create mode 100644 aggregation-app/src/main/resources/application.yml diff --git a/aggregates/account/build.gradle b/aggregates/account/build.gradle new file mode 100644 index 00000000..9f035dce --- /dev/null +++ b/aggregates/account/build.gradle @@ -0,0 +1,25 @@ +dependencies { + implementation project(":components:common") + implementation 'org.springframework.boot:spring-boot-starter-data-jpa' + implementation 'org.springframework.boot:spring-boot-starter-webflux' + implementation(libs.cardano.client.lib) + implementation(libs.yaci.store.starter) + implementation(libs.yaci.store.utxo.starter) + implementation(libs.yaci.store.account.starter) + + compileOnly(libs.lombok) + annotationProcessor(libs.lombok) + + testImplementation 'org.springframework.boot:spring-boot-starter-test' +} + +publishing { + publications { + mavenJava(MavenPublication) { + pom { + name = 'Ledger Sync Account' + description = 'Ledger Sync Account Module' + } + } + } +} diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/model/AddressTxAmountEntity.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/model/AddressTxAmountEntity.java new file mode 100644 index 00000000..130c934e --- /dev/null +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/model/AddressTxAmountEntity.java @@ -0,0 +1,61 @@ +package org.cardanofoundation.ledgersync.account.model; + +import com.bloxbean.cardano.yaci.store.common.model.BlockAwareEntity; +import jakarta.persistence.*; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; +import org.hibernate.annotations.DynamicUpdate; + +import java.math.BigInteger; + +@Data +@NoArgsConstructor +@AllArgsConstructor +@SuperBuilder +@Entity +@Table(name = "address_tx_amount") +@IdClass(AddressTxAmountId.class) +@DynamicUpdate +public class AddressTxAmountEntity extends BlockAwareEntity { + @Id + @Column(name = "address") + private String address; + + @Id + @Column(name = "unit") + private String unit; + + @Column(name = "tx_hash") + private String txHash; + + @Column(name = "slot") + private Long slot; + + @Column(name = "quantity") + private BigInteger quantity; + + //Only set if address doesn't fit in ownerAddr field. Required for few Byron Era addr + @Column(name = "addr_full") + private String addrFull; + + @Column(name = "policy") + private String policy; + + @Column(name = "asset_name") + private String assetName; + + @Column(name = "payment_credential") + private String paymentCredential; + + @Column(name = "stake_address") + private String stakeAddress; + + @Column(name = "block_hash") + private String blockHash; + + @Column(name = "epoch") + private Integer epoch; + +} diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/model/AddressTxAmountId.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/model/AddressTxAmountId.java new file mode 100644 index 00000000..d6cf16fd --- /dev/null +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/model/AddressTxAmountId.java @@ -0,0 +1,20 @@ +package org.cardanofoundation.ledgersync.account.model; + +import jakarta.persistence.Column; +import lombok.*; + +import java.io.Serializable; + +@Getter +@NoArgsConstructor +@AllArgsConstructor +@EqualsAndHashCode +@Builder +public class AddressTxAmountId implements Serializable { + @Column(name = "address") + private String address; + @Column(name = "unit") + private String unit; + @Column(name = "tx_hash") + private String txHash; +} diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java new file mode 100644 index 00000000..233be290 --- /dev/null +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java @@ -0,0 +1,218 @@ +package org.cardanofoundation.ledgersync.account.processor; + +import com.bloxbean.cardano.yaci.store.client.utxo.UtxoClient; +import com.bloxbean.cardano.yaci.store.common.domain.AddressUtxo; +import com.bloxbean.cardano.yaci.store.common.domain.Amt; +import com.bloxbean.cardano.yaci.store.common.domain.UtxoKey; +import com.bloxbean.cardano.yaci.store.common.util.Tuple; +import com.bloxbean.cardano.yaci.store.events.EventMetadata; +import com.bloxbean.cardano.yaci.store.events.RollbackEvent; +import com.bloxbean.cardano.yaci.store.events.internal.ReadyForBalanceAggregationEvent; +import com.bloxbean.cardano.yaci.store.utxo.domain.AddressUtxoEvent; +import com.bloxbean.cardano.yaci.store.utxo.domain.TxInputOutput; +import jakarta.annotation.PostConstruct; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.cardanofoundation.ledgersync.account.model.AddressTxAmountEntity; +import org.cardanofoundation.ledgersync.account.repository.AddressTxAmountRepository; +import org.springframework.context.event.EventListener; +import org.springframework.data.util.Pair; +import org.springframework.stereotype.Component; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.support.TransactionTemplate; + +import java.math.BigInteger; +import java.util.*; + +@Component +@RequiredArgsConstructor +@Slf4j +public class AddressTxAmountProcessor { + private static final int MAX_ADDR_SIZE = 500; //Required for Byron Addresses + + private final AddressTxAmountRepository addressTxAmountRepository; + private final UtxoClient utxoClient; + + private List> txInputOutputListCache = Collections.synchronizedList(new ArrayList<>()); + + private final PlatformTransactionManager transactionManager; + private TransactionTemplate transactionTemplate; + + @PostConstruct + void init() { + transactionTemplate = new TransactionTemplate(transactionManager); + transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); + } + + @EventListener + @Transactional + public void processAddressUtxoEvent(AddressUtxoEvent addressUtxoEvent) { + //Ignore Genesis Txs for now -- TODO + if (addressUtxoEvent.getEventMetadata().getSlot() == -1) + return; + + var txInputOutputList = addressUtxoEvent.getTxInputOutputs(); + if (txInputOutputList == null || txInputOutputList.isEmpty()) + return; + + List addressTxAmountEntities = new ArrayList<>(); + + for (var txInputOutput : txInputOutputList) { + var txAddressTxAmountEntities = processAddressAmountForTx(addressUtxoEvent.getEventMetadata(), txInputOutput, false); + if (txAddressTxAmountEntities == null) continue; + + addressTxAmountEntities.addAll(txAddressTxAmountEntities); + } + + if (addressTxAmountEntities.size() > 0) { + addressTxAmountRepository.saveAll(addressTxAmountEntities); + } + } + + private List processAddressAmountForTx(EventMetadata metadata, TxInputOutput txInputOutput, + boolean throwExceptionOnFailure) { + var txHash = txInputOutput.getTxHash(); + var inputs = txInputOutput.getInputs(); + var outputs = txInputOutput.getOutputs(); + if (inputs == null || inputs.isEmpty() || outputs == null || outputs.isEmpty()) + return null; + + var inputUtxoKeys = inputs.stream() + .map(input -> new UtxoKey(input.getTxHash(), input.getOutputIndex())) + .toList(); + + var inputAddressUtxos = utxoClient.getUtxosByIds(inputUtxoKeys) + .stream() + .filter(Objects::nonNull) + .toList(); + + if (inputAddressUtxos.size() != inputUtxoKeys.size()) { + log.debug("Unable to get inputs for all input keys for account balance calculation. Add this Tx to cache to process later : " + txHash); + if (throwExceptionOnFailure) + throw new IllegalStateException("Unable to get inputs for all input keys for account balance calculation : " + inputUtxoKeys); + else + txInputOutputListCache.add(Pair.of(metadata, txInputOutput)); + } + + var txAddressTxAmountEntities = + processTxAmount(txHash, metadata, inputAddressUtxos, outputs); + return txAddressTxAmountEntities; + } + + private List processTxAmount(String txHash, EventMetadata metadata, List inputs, List outputs) { + Map, BigInteger> addressTxAmountMap = new HashMap<>(); + Map addressToAddressDetailsMap = new HashMap<>(); + Map unitToAssetDetailsMap = new HashMap<>(); + + //Subtract input amounts + for (var input : inputs) { + for (Amt amt : input.getAmounts()) { + var key = Pair.of(input.getOwnerAddr(), amt.getUnit()); + var amount = addressTxAmountMap.getOrDefault(key, BigInteger.ZERO); + amount = amount.subtract(amt.getQuantity()); + addressTxAmountMap.put(key, amount); + + var addressDetails = new AddressDetails(input.getOwnerStakeAddr(), input.getOwnerPaymentCredential(), input.getOwnerStakeCredential()); + addressToAddressDetailsMap.put(input.getOwnerAddr(), addressDetails); + + var assetDetails = new AssetDetails(amt.getPolicyId(), amt.getAssetName()); + unitToAssetDetailsMap.put(amt.getUnit(), assetDetails); + } + } + + //Add output amounts + for (var output : outputs) { + for (Amt amt : output.getAmounts()) { + var key = Pair.of(output.getOwnerAddr(), amt.getUnit()); + var amount = addressTxAmountMap.getOrDefault(key, BigInteger.ZERO); + amount = amount.add(amt.getQuantity()); + addressTxAmountMap.put(key, amount); + + var addressDetails = new AddressDetails(output.getOwnerStakeAddr(), output.getOwnerPaymentCredential(), output.getOwnerStakeCredential()); + addressToAddressDetailsMap.put(output.getOwnerAddr(), addressDetails); + + var assetDetails = new AssetDetails(amt.getPolicyId(), amt.getAssetName()); + unitToAssetDetailsMap.put(amt.getUnit(), assetDetails); + } + } + + return (List) addressTxAmountMap.entrySet() + .stream() + .map(entry -> { + var addressDetails = addressToAddressDetailsMap.get(entry.getKey().getFirst()); + var assetDetails = unitToAssetDetailsMap.get(entry.getKey().getSecond()); + + //address and full address if the address is too long + var addressTuple = getAddress(entry.getKey().getFirst()); + + return AddressTxAmountEntity.builder() + .address(addressTuple._1) + .unit(entry.getKey().getSecond()) + .txHash(txHash) + .slot(metadata.getSlot()) + .quantity(entry.getValue()) + .addrFull(addressTuple._2) + .stakeAddress(addressDetails.ownerStakeAddress) + .assetName(assetDetails.assetName) + .policy(assetDetails.policy) + .paymentCredential(addressDetails.ownerPaymentCredential) + .epoch(metadata.getEpochNumber()) + .blockNumber(metadata.getBlock()) + .blockHash(metadata.getBlockHash()) + .blockTime(metadata.getBlockTime()) + .build(); + }).toList(); + } + + //Return the address and full address if the address is too long + //Using Tuple as Pair doesn't allow null values + private Tuple getAddress(String address) { + if (address != null && address.length() > MAX_ADDR_SIZE) { + String addr = address.substring(0, MAX_ADDR_SIZE); + String fullAddr = address; + return new Tuple<>(addr, fullAddr); + } else { + return new Tuple<>(address, null); + } + } + + @EventListener + @Transactional //We can also listen to CommitEvent here + public void handleRemainingTxInputOuputs(ReadyForBalanceAggregationEvent readyForBalanceAggregationEvent) { + try { + List addressTxAmountEntities = new ArrayList<>(); + for (var pair : txInputOutputListCache) { + EventMetadata metadata = pair.getFirst(); + TxInputOutput txInputOutput = pair.getSecond(); + + var addrAmountEntitiesForTx = processAddressAmountForTx(metadata, txInputOutput, true); + + if (addrAmountEntitiesForTx != null) { + addressTxAmountEntities.addAll(addrAmountEntitiesForTx); + } + } + + if (addressTxAmountEntities.size() > 0) { + addressTxAmountRepository.saveAll(addressTxAmountEntities); + } + } finally { + txInputOutputListCache.clear(); + } + } + + @EventListener + @Transactional + public void handleRollback(RollbackEvent rollbackEvent) { + int addressTxAmountDeleted = addressTxAmountRepository.deleteAddressBalanceBySlotGreaterThan(rollbackEvent.getRollbackTo().getSlot()); + + log.info("Rollback -- {} address_tx_amounts records", addressTxAmountDeleted); + } + + record AssetDetails(String policy, String assetName) { + } + + record AddressDetails(String ownerStakeAddress, String ownerPaymentCredential, String ownerStakeCredential) { + } +} diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/repository/AddressTxAmountRepository.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/repository/AddressTxAmountRepository.java new file mode 100644 index 00000000..04f4032d --- /dev/null +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/repository/AddressTxAmountRepository.java @@ -0,0 +1,11 @@ +package org.cardanofoundation.ledgersync.account.repository; + +import org.cardanofoundation.ledgersync.account.model.AddressTxAmountEntity; +import org.cardanofoundation.ledgersync.account.model.AddressTxAmountId; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.stereotype.Repository; + +@Repository +public interface AddressTxAmountRepository extends JpaRepository { + int deleteAddressBalanceBySlotGreaterThan(long slot); +} diff --git a/aggregates/account/src/main/resources/db/account/V2_1_1__init.sql b/aggregates/account/src/main/resources/db/account/V2_1_1__init.sql new file mode 100644 index 00000000..dfa0ffb9 --- /dev/null +++ b/aggregates/account/src/main/resources/db/account/V2_1_1__init.sql @@ -0,0 +1,20 @@ +drop table if exists address_tx_amount; +create table address_tx_amount +( + address varchar(500), + unit varchar(255), + tx_hash varchar(64), + slot bigint, + quantity numeric(38) null, + addr_full text, + policy varchar(56), + asset_name varchar(255), + payment_credential varchar(56), + stake_address varchar(255), + block_hash varchar(64), + block bigint, + block_time bigint, + epoch integer, + update_datetime timestamp, + primary key (address, unit, tx_hash) +); diff --git a/aggregates/build.gradle b/aggregates/build.gradle new file mode 100644 index 00000000..8e6ee257 --- /dev/null +++ b/aggregates/build.gradle @@ -0,0 +1,3 @@ +subprojects { + apply from: "../../publish-common.gradle" +} diff --git a/aggregation-app/build.gradle b/aggregation-app/build.gradle new file mode 100644 index 00000000..77a7ea0d --- /dev/null +++ b/aggregation-app/build.gradle @@ -0,0 +1,38 @@ +plugins { + id 'org.springframework.boot' version '3.2.2' +} + +dependencies { + implementation 'org.springframework.boot:spring-boot-starter' + implementation 'org.springframework.boot:spring-boot-starter-validation' + implementation 'org.springframework.boot:spring-boot-starter-data-jpa' + implementation 'org.springframework.boot:spring-boot-starter-webflux' + implementation 'org.springframework.boot:spring-boot-configuration-processor' + implementation 'org.springframework.boot:spring-boot-starter-actuator' + + implementation(libs.yaci.store.starter) + implementation(libs.yaci.store.utxo.starter) + implementation(libs.yaci.store.account.starter) + + implementation project(':aggregates:account') + runtimeOnly 'org.postgresql:postgresql' + + compileOnly(libs.lombok) + annotationProcessor(libs.lombok) + annotationProcessor(libs.lombok.mapstruct.binding) + annotationProcessor(libs.mapstruct.processor) + + testImplementation 'org.springframework.boot:spring-boot-starter-test' + testImplementation('org.hsqldb:hsqldb') + testImplementation(libs.pitest) + testCompileOnly(libs.lombok) + testAnnotationProcessor(libs.lombok) +} + +compileJava { + options.compilerArgs += ['-Amapstruct.defaultComponentModel=spring'] +} + +jar { + enabled = false +} diff --git a/aggregation-app/src/main/java/org/cardanofoundation/ledgersync/aggregation/app/AggregationApplication.java b/aggregation-app/src/main/java/org/cardanofoundation/ledgersync/aggregation/app/AggregationApplication.java new file mode 100644 index 00000000..25e4dc05 --- /dev/null +++ b/aggregation-app/src/main/java/org/cardanofoundation/ledgersync/aggregation/app/AggregationApplication.java @@ -0,0 +1,20 @@ +package org.cardanofoundation.ledgersync.aggregation.app; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.domain.EntityScan; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.data.jpa.repository.config.EnableJpaRepositories; + +@SpringBootApplication +@ComponentScan(basePackages = "org.cardanofoundation.*") +@ComponentScan(basePackages = "org.cardanofoundation.*") +@EntityScan("org.cardanofoundation.*") +@EnableJpaRepositories("org.cardanofoundation.*") +public class AggregationApplication { + + public static void main(String[] args) { + SpringApplication.run(AggregationApplication.class, args); + } + +} diff --git a/aggregation-app/src/main/resources/application.yml b/aggregation-app/src/main/resources/application.yml new file mode 100644 index 00000000..8ca9f767 --- /dev/null +++ b/aggregation-app/src/main/resources/application.yml @@ -0,0 +1,33 @@ +spring: +# banner: +# location: classpath:/banner.txt + application: + name: Ledger Sync Aggregation App + + flyway: + locations: + - classpath:db/store/{vendor} + - classpath:db/account + out-of-order: true + +apiPrefix: /api/v1 + +logging: + file: + name: ./logs/yaci-store.log + +store: + event-publisher-id: 1000 + account: + enabled: true + balance-aggregation-enabled: true + history-cleanup-enabled: false + api-enabled: true + executor: + enable-parallel-processing: true + block-processing-threads: 15 + event-processing-threads: 30 + blocks-batch-size: 100 + blocks-partition-size: 10 + use-virtual-thread-for-batch-processing: false + use-virtual-thread-for-event-processing: true diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index bce986b3..3bb64543 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -1,8 +1,10 @@ [libraries] yaci-store-starter="com.bloxbean.cardano:yaci-store-spring-boot-starter:0.1.0-rc2-891f739-SNAPSHOT" +yaci-store-utxo-starter="com.bloxbean.cardano:yaci-store-utxo-spring-boot-starter:0.1.0-rc2-891f739-SNAPSHOT" +yaci-store-account-starter="com.bloxbean.cardano:yaci-store-account-spring-boot-starter:0.1.0-rc2-891f739-SNAPSHOT" yaci-store-remote-starter="com.bloxbean.cardano:yaci-store-remote-spring-boot-starter:0.1.0-rc2-891f739-SNAPSHOT" -cardano-client-lib="com.bloxbean.cardano:cardano-client-lib:0.5.0" +cardano-client-lib="com.bloxbean.cardano:cardano-client-lib:0.5.1" snakeyaml="org.yaml:snakeyaml:2.0" guava="com.google.guava:guava:33.0.0-jre" log4j-api="org.apache.logging.log4j:log4j-api:2.22.1" @@ -26,4 +28,4 @@ junit-engine="org.junit.jupiter:junit-jupiter-engine:5.9.0" junit-api="org.junit.jupiter:junit-jupiter-api:5.9.0" assertj-core="org.assertj:assertj-core:3.25.1" -hypersistence-utils-hibernate="io.hypersistence:hypersistence-utils-hibernate-63:3.7.1" \ No newline at end of file +hypersistence-utils-hibernate="io.hypersistence:hypersistence-utils-hibernate-63:3.7.1" diff --git a/settings.gradle b/settings.gradle index 99927d67..37a4f1aa 100644 --- a/settings.gradle +++ b/settings.gradle @@ -10,7 +10,9 @@ rootProject.name = 'ledger-sync' include 'components:common' include 'components:consumer-common' include 'components:scheduler' +include 'aggregates:account' include 'application' include 'scheduler-app' include 'streamer-app' +include 'aggregation-app' From 3edf97b60b52e007903f78e8f943668dc68900d6 Mon Sep 17 00:00:00 2001 From: Satya Date: Wed, 6 Mar 2024 08:30:14 +0800 Subject: [PATCH 02/29] chore: Dummy invalidtransaction storage --- .../DummyInvalidTransactionStorage.java | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/DummyInvalidTransactionStorage.java diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/DummyInvalidTransactionStorage.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/DummyInvalidTransactionStorage.java new file mode 100644 index 00000000..0694b0ad --- /dev/null +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/DummyInvalidTransactionStorage.java @@ -0,0 +1,21 @@ +package org.cardanofoundation.ledgersync.account; + +import com.bloxbean.cardano.yaci.store.utxo.domain.InvalidTransaction; +import com.bloxbean.cardano.yaci.store.utxo.storage.impl.InvalidTransactionStorageImpl; +import com.bloxbean.cardano.yaci.store.utxo.storage.impl.repository.InvalidTransactionRepository; +import org.springframework.stereotype.Component; + +//TODO -- remove this class later +//Overrides the save method to do nothing to fix the NUL char issue in invalid transaction +//https://github.com/bloxbean/yaci-store/issues/209 +@Component +public class DummyInvalidTransactionStorage extends InvalidTransactionStorageImpl { + public DummyInvalidTransactionStorage(InvalidTransactionRepository repository) { + super(repository); + } + + @Override + public InvalidTransaction save(InvalidTransaction invalidTransaction) { + return invalidTransaction; //do nothing + } +} From 1e4c492790dce315c79a4cd0c92ed77402f89a02 Mon Sep 17 00:00:00 2001 From: Satya Date: Wed, 6 Mar 2024 12:20:23 +0800 Subject: [PATCH 03/29] chore: Update yaci store dependency and auto index management --- .../aggregation/app/DBIndexService.java | 106 ++++++++++++++++++ .../src/main/resources/application.yml | 10 +- aggregation-app/src/main/resources/banner.txt | 12 ++ .../src/main/resources/sql/drop-index.sql | 23 ++++ gradle/libs.versions.toml | 8 +- 5 files changed, 152 insertions(+), 7 deletions(-) create mode 100644 aggregation-app/src/main/java/org/cardanofoundation/ledgersync/aggregation/app/DBIndexService.java create mode 100644 aggregation-app/src/main/resources/banner.txt create mode 100644 aggregation-app/src/main/resources/sql/drop-index.sql diff --git a/aggregation-app/src/main/java/org/cardanofoundation/ledgersync/aggregation/app/DBIndexService.java b/aggregation-app/src/main/java/org/cardanofoundation/ledgersync/aggregation/app/DBIndexService.java new file mode 100644 index 00000000..bd1175e9 --- /dev/null +++ b/aggregation-app/src/main/java/org/cardanofoundation/ledgersync/aggregation/app/DBIndexService.java @@ -0,0 +1,106 @@ +package org.cardanofoundation.ledgersync.aggregation.app; + +import com.bloxbean.cardano.yaci.store.events.BlockHeaderEvent; +import com.bloxbean.cardano.yaci.store.events.ByronMainBlockEvent; +import jakarta.annotation.PostConstruct; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.event.EventListener; +import org.springframework.core.io.ClassPathResource; +import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; + +import javax.sql.DataSource; +import java.sql.SQLException; +import java.util.concurrent.atomic.AtomicBoolean; + +@Component +@RequiredArgsConstructor +@Slf4j +@ConditionalOnProperty(name = "store.auto-index-management", havingValue = "true", matchIfMissing = false) +public class DBIndexService { + private final DataSource dataSource; + + private AtomicBoolean indexRemoved = new AtomicBoolean(false); + private AtomicBoolean indexApplied = new AtomicBoolean(false); + + @PostConstruct + public void init() { + log.info("<< Enable DBIndexService >>"); + } + + @EventListener + @Transactional(propagation = Propagation.REQUIRES_NEW) + public void handleFirstBlockEvent(BlockHeaderEvent blockHeaderEvent) { + if (indexRemoved.get() || blockHeaderEvent.getMetadata().getBlock() > 1 + || blockHeaderEvent.getMetadata().isSyncMode()) + return; + + runDeleteIndexes(); + } + + @EventListener + @Transactional(propagation = Propagation.REQUIRES_NEW) + public void handleFirstBlockEventToCreateIndex(BlockHeaderEvent blockHeaderEvent) { + if (blockHeaderEvent.getMetadata().isSyncMode() && !indexApplied.get()) { + if (blockHeaderEvent.getMetadata().getBlock() < 50000) { + reApplyIndexes(); + } else { + log.info("<< I can't manage the creation of automatic indexes because the number of actual blocks in database exceeds the # of blocks threshold for automatic index application >>"); + log.info("Please manually reapply the required indexes if not done yet. For more details, refer to the 'create-index.sql' file !!!"); + indexApplied.set(true); + } + } + } + + @EventListener + @Transactional(propagation = Propagation.REQUIRES_NEW) + @SneakyThrows + public void handleFirstBlockEvent(ByronMainBlockEvent byronMainBlockEvent) { + if (indexRemoved.get() || byronMainBlockEvent.getMetadata().getBlock() > 1 + || byronMainBlockEvent.getMetadata().isSyncMode()) + return; + + runDeleteIndexes(); + } + + private void runDeleteIndexes() { + try { + String scriptPath = "sql/drop-index.sql"; + + log.info("Deleting optional indexes to speed-up the sync process ..... " + scriptPath); + indexRemoved.set(true); + ResourceDatabasePopulator populator = new ResourceDatabasePopulator(); + populator.addScripts( + new ClassPathResource(scriptPath)); + populator.execute(this.dataSource); + + log.info("Optional indexes have been removed successfully."); + } catch (Exception e) { + log.error("Index deletion failed.", e); + } + } + + private void reApplyIndexes() { + try { + String scriptPath = "sql/create-index.sql"; + + log.info("Re-applying optional indexes after sync process ..... " + scriptPath); + indexApplied.set(true); + + ResourceDatabasePopulator populator = new ResourceDatabasePopulator(); + populator.addScripts( + new ClassPathResource(scriptPath)); + populator.execute(this.dataSource); + + log.info("Optional indexes have been re-applied successfully."); + } catch (Exception e) { + log.error("Filed to re-apply indexes.", e); + } + } + +} diff --git a/aggregation-app/src/main/resources/application.yml b/aggregation-app/src/main/resources/application.yml index 8ca9f767..c8bff9bd 100644 --- a/aggregation-app/src/main/resources/application.yml +++ b/aggregation-app/src/main/resources/application.yml @@ -1,6 +1,6 @@ spring: -# banner: -# location: classpath:/banner.txt + banner: + location: classpath:/banner.txt application: name: Ledger Sync Aggregation App @@ -14,15 +14,19 @@ apiPrefix: /api/v1 logging: file: - name: ./logs/yaci-store.log + name: ./logs/ls-aggregation.log store: event-publisher-id: 1000 + auto-index-management: true account: enabled: true balance-aggregation-enabled: true history-cleanup-enabled: false api-enabled: true + parallel-write: true + per-thread-batch-size: 6000 + jooq-write-batch-size: 3000 executor: enable-parallel-processing: true block-processing-threads: 15 diff --git a/aggregation-app/src/main/resources/banner.txt b/aggregation-app/src/main/resources/banner.txt new file mode 100644 index 00000000..d201825b --- /dev/null +++ b/aggregation-app/src/main/resources/banner.txt @@ -0,0 +1,12 @@ + █████╗ ██████╗ ██████╗ ██████╗ ███████╗ ██████╗ █████╗ ████████╗██╗ ██████╗ ███╗ ██╗ █████╗ ██████╗ ██████╗ +██╔══██╗██╔════╝ ██╔════╝ ██╔══██╗██╔════╝██╔════╝ ██╔══██╗╚══██╔══╝██║██╔═══██╗████╗ ██║ ██╔══██╗██╔══██╗██╔══██╗ +███████║██║ ███╗██║ ███╗██████╔╝█████╗ ██║ ███╗███████║ ██║ ██║██║ ██║██╔██╗ ██║ ███████║██████╔╝██████╔╝ +██╔══██║██║ ██║██║ ██║██╔══██╗██╔══╝ ██║ ██║██╔══██║ ██║ ██║██║ ██║██║╚██╗██║ ██╔══██║██╔═══╝ ██╔═══╝ +██║ ██║╚██████╔╝╚██████╔╝██║ ██║███████╗╚██████╔╝██║ ██║ ██║ ██║╚██████╔╝██║ ╚████║ ██║ ██║██║ ██║ +╚═╝ ╚═╝ ╚═════╝ ╚═════╝ ╚═╝ ╚═╝╚══════╝ ╚═════╝ ╚═╝ ╚═╝ ╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═══╝ ╚═╝ ╚═╝╚═╝ ╚═╝ + +██╗ ███████╗██████╗ ██████╗ ███████╗██████╗ ███████╗██╗ ██╗███╗ ██╗ ██████╗ +██║ ██╔════╝██╔══██╗██╔════╝ ██╔════╝██╔══██╗ ██╔════╝╚██╗ ██╔╝████╗ ██║██╔════╝ +██║ █████╗ ██║ ██║██║ ███╗█████╗ ██████╔╝ ███████╗ ╚████╔╝ ██╔██╗ ██║██║ +██║ ██╔══╝ ██║ ██║██║ ██║██╔══╝ ██╔══██╗ ╚════██║ ╚██╔╝ ██║╚██╗██║██║ +███████╗███████╗██████╔╝╚██████╔╝███████╗██║ ██║ ███████║ ██║ ██║ ╚████║╚██████╗ diff --git a/aggregation-app/src/main/resources/sql/drop-index.sql b/aggregation-app/src/main/resources/sql/drop-index.sql new file mode 100644 index 00000000..1d8903ed --- /dev/null +++ b/aggregation-app/src/main/resources/sql/drop-index.sql @@ -0,0 +1,23 @@ +-- set search_path to mainnet; + +-- utxo store +drop index idx_address_utxo_owner_addr; +drop index idx_address_utxo_owner_stake_addr; +drop index idx_address_utxo_owner_paykey_hash; +drop index idx_address_utxo_owner_stakekey_hash; +drop index idx_address_utxo_epoch; + +-- account balance +drop index idx_address_balance_address; +drop index idx_address_balance_block_time; +drop index idx_address_balance_epoch; +drop index idx_address_balance_unit; +drop index idx_address_balance_policy; +drop index idx_address_stake_address; +drop index idx_address_balance_policy_asset; + +-- stake address balance + +drop index idx_stake_addr_balance_stake_addr; +drop index idx_stake_addr_balance_block_time; +drop index idx_stake_addr_balance_epoch; diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 3bb64543..66e490a7 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -1,8 +1,8 @@ [libraries] -yaci-store-starter="com.bloxbean.cardano:yaci-store-spring-boot-starter:0.1.0-rc2-891f739-SNAPSHOT" -yaci-store-utxo-starter="com.bloxbean.cardano:yaci-store-utxo-spring-boot-starter:0.1.0-rc2-891f739-SNAPSHOT" -yaci-store-account-starter="com.bloxbean.cardano:yaci-store-account-spring-boot-starter:0.1.0-rc2-891f739-SNAPSHOT" -yaci-store-remote-starter="com.bloxbean.cardano:yaci-store-remote-spring-boot-starter:0.1.0-rc2-891f739-SNAPSHOT" +yaci-store-starter="com.bloxbean.cardano:yaci-store-spring-boot-starter:0.1.0-rc2-94c9d24-SNAPSHOT" +yaci-store-utxo-starter="com.bloxbean.cardano:yaci-store-utxo-spring-boot-starter:0.1.0-rc2-94c9d24-SNAPSHOT" +yaci-store-account-starter="com.bloxbean.cardano:yaci-store-account-spring-boot-starter:0.1.0-rc2-94c9d24-SNAPSHOT" +yaci-store-remote-starter="com.bloxbean.cardano:yaci-store-remote-spring-boot-starter:0.1.0-rc2-94c9d24-SNAPSHOT" cardano-client-lib="com.bloxbean.cardano:cardano-client-lib:0.5.1" snakeyaml="org.yaml:snakeyaml:2.0" From ca5b51b648bf99d912b6eb52aa5a56a02f20c960 Mon Sep 17 00:00:00 2001 From: Satya Date: Wed, 6 Mar 2024 12:24:36 +0800 Subject: [PATCH 04/29] chore: Update yaci store dependency and auto index management --- .../src/main/resources/sql/create-index.sql | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 aggregation-app/src/main/resources/sql/create-index.sql diff --git a/aggregation-app/src/main/resources/sql/create-index.sql b/aggregation-app/src/main/resources/sql/create-index.sql new file mode 100644 index 00000000..73b40679 --- /dev/null +++ b/aggregation-app/src/main/resources/sql/create-index.sql @@ -0,0 +1,52 @@ +-- set search_path to mainnet; + +-- utxo store + +CREATE INDEX if not exists idx_address_utxo_owner_addr + ON address_utxo(owner_addr); + +CREATE INDEX if not exists idx_address_utxo_owner_stake_addr + ON address_utxo(owner_stake_addr); + +CREATE INDEX if not exists idx_address_utxo_owner_paykey_hash + ON address_utxo(owner_payment_credential); + +CREATE INDEX if not exists idx_address_utxo_owner_stakekey_hash + ON address_utxo(owner_stake_credential); + +CREATE INDEX if not exists idx_address_utxo_epoch + ON address_utxo(epoch); + +-- account balance + +CREATE INDEX if not exists idx_address_balance_address + ON address_balance (address); + +CREATE INDEX if not exists idx_address_balance_block_time + ON address_balance (block_time); + +CREATE INDEX if not exists idx_address_balance_epoch + ON address_balance (epoch); + +CREATE INDEX if not exists idx_address_balance_unit + ON address_balance (unit); + +CREATE INDEX if not exists idx_address_balance_policy + ON address_balance (policy); + +CREATE INDEX if not exists idx_address_stake_address + ON address_balance (stake_address); + +CREATE INDEX if not exists idx_address_balance_policy_asset + ON address_balance (policy, asset_name); + +-- stake address balance + +CREATE INDEX if not exists idx_stake_addr_balance_stake_addr + ON stake_address_balance (address); + +CREATE INDEX if not exists idx_stake_addr_balance_block_time + ON stake_address_balance (block_time); + +CREATE INDEX if not exists idx_stake_addr_balance_epoch + ON stake_address_balance (epoch); From 6a113378944187df80388484807c2d80478797ba Mon Sep 17 00:00:00 2001 From: Satya Date: Wed, 6 Mar 2024 13:01:58 +0800 Subject: [PATCH 05/29] chore: Add balance views --- .../resources/db/account/V2_1_1__init.sql | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/aggregates/account/src/main/resources/db/account/V2_1_1__init.sql b/aggregates/account/src/main/resources/db/account/V2_1_1__init.sql index dfa0ffb9..f656e670 100644 --- a/aggregates/account/src/main/resources/db/account/V2_1_1__init.sql +++ b/aggregates/account/src/main/resources/db/account/V2_1_1__init.sql @@ -5,7 +5,7 @@ create table address_tx_amount unit varchar(255), tx_hash varchar(64), slot bigint, - quantity numeric(38) null, + quantity numeric(38) null, addr_full text, policy varchar(56), asset_name varchar(255), @@ -18,3 +18,21 @@ create table address_tx_amount update_datetime timestamp, primary key (address, unit, tx_hash) ); + +-- address_balance_view +drop view if exists address_balance_view; +create view address_balance_view AS +select ab.* +from address_balance ab + inner join (select address, MAX(slot) as max_slot + from address_balance ab2 + group by address) max_ab on ab.address = max_ab.address and ab.slot = max_ab.max_slot; + +-- stake_address_balance_view +drop view if exists stake_address_balance_view; +create view stake_address_balance_view AS +select sb.* +from stake_address_balance sb + inner join (select address, MAX(slot) as max_slot + from stake_address_balance sb2 + group by address) max_sb on sb.address = max_sb.address and sb.slot = max_sb.max_slot; From 715bf5b0c0ca1451a132a189bb01251858848ee2 Mon Sep 17 00:00:00 2001 From: Satya Date: Wed, 6 Mar 2024 16:48:56 +0800 Subject: [PATCH 06/29] fix: address balance view query --- .../src/main/resources/db/account/V2_1_1__init.sql | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/aggregates/account/src/main/resources/db/account/V2_1_1__init.sql b/aggregates/account/src/main/resources/db/account/V2_1_1__init.sql index f656e670..47a67326 100644 --- a/aggregates/account/src/main/resources/db/account/V2_1_1__init.sql +++ b/aggregates/account/src/main/resources/db/account/V2_1_1__init.sql @@ -21,12 +21,13 @@ create table address_tx_amount -- address_balance_view drop view if exists address_balance_view; -create view address_balance_view AS -select ab.* -from address_balance ab - inner join (select address, MAX(slot) as max_slot - from address_balance ab2 - group by address) max_ab on ab.address = max_ab.address and ab.slot = max_ab.max_slot; +CREATE VIEW address_balance_view AS +SELECT ab.* +FROM address_balance ab + INNER JOIN (SELECT address, unit, MAX(slot) AS max_slot + FROM address_balance ab2 + GROUP BY address, unit) max_ab + ON ab.address = max_ab.address AND ab.unit = max_ab.unit AND ab.slot = max_ab.max_slot; -- stake_address_balance_view drop view if exists stake_address_balance_view; From 5fdf46b9e2a76f69a1c1563a4c17ce2bcbd3549b Mon Sep 17 00:00:00 2001 From: Satya Date: Wed, 6 Mar 2024 16:54:23 +0800 Subject: [PATCH 07/29] chore: Convert to lower case --- .../src/main/resources/db/account/V2_1_1__init.sql | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/aggregates/account/src/main/resources/db/account/V2_1_1__init.sql b/aggregates/account/src/main/resources/db/account/V2_1_1__init.sql index 47a67326..39afca89 100644 --- a/aggregates/account/src/main/resources/db/account/V2_1_1__init.sql +++ b/aggregates/account/src/main/resources/db/account/V2_1_1__init.sql @@ -21,13 +21,13 @@ create table address_tx_amount -- address_balance_view drop view if exists address_balance_view; -CREATE VIEW address_balance_view AS -SELECT ab.* -FROM address_balance ab - INNER JOIN (SELECT address, unit, MAX(slot) AS max_slot - FROM address_balance ab2 - GROUP BY address, unit) max_ab - ON ab.address = max_ab.address AND ab.unit = max_ab.unit AND ab.slot = max_ab.max_slot; +create view address_balance_view as +select ab.* +from address_balance ab + inner join (select address, unit, max(slot) as max_slot + from address_balance ab2 + group by address, unit) max_ab + on ab.address = max_ab.address and ab.unit = max_ab.unit and ab.slot = max_ab.max_slot; -- stake_address_balance_view drop view if exists stake_address_balance_view; From 1834d548a70f678d6d39e91b95184629f6d0a2a9 Mon Sep 17 00:00:00 2001 From: Satya Date: Wed, 6 Mar 2024 22:02:39 +0800 Subject: [PATCH 08/29] chore: Handle Genesis Utxos --- .../processor/AddressTxAmountProcessor.java | 18 +--- .../GensisBlockAddressTxAmtProcessor.java | 90 +++++++++++++++++++ .../ledgersync/account/util/AddressUtil.java | 19 ++++ 3 files changed, 112 insertions(+), 15 deletions(-) create mode 100644 aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/GensisBlockAddressTxAmtProcessor.java create mode 100644 aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/util/AddressUtil.java diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java index 233be290..e907e81b 100644 --- a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java @@ -4,7 +4,6 @@ import com.bloxbean.cardano.yaci.store.common.domain.AddressUtxo; import com.bloxbean.cardano.yaci.store.common.domain.Amt; import com.bloxbean.cardano.yaci.store.common.domain.UtxoKey; -import com.bloxbean.cardano.yaci.store.common.util.Tuple; import com.bloxbean.cardano.yaci.store.events.EventMetadata; import com.bloxbean.cardano.yaci.store.events.RollbackEvent; import com.bloxbean.cardano.yaci.store.events.internal.ReadyForBalanceAggregationEvent; @@ -26,12 +25,12 @@ import java.math.BigInteger; import java.util.*; +import static org.cardanofoundation.ledgersync.account.util.AddressUtil.getAddress; + @Component @RequiredArgsConstructor @Slf4j public class AddressTxAmountProcessor { - private static final int MAX_ADDR_SIZE = 500; //Required for Byron Addresses - private final AddressTxAmountRepository addressTxAmountRepository; private final UtxoClient utxoClient; @@ -140,6 +139,7 @@ private List processTxAmount(String txHash, EventMetadata return (List) addressTxAmountMap.entrySet() .stream() + .filter(entry -> entry.getValue().compareTo(BigInteger.ZERO) != 0) .map(entry -> { var addressDetails = addressToAddressDetailsMap.get(entry.getKey().getFirst()); var assetDetails = unitToAssetDetailsMap.get(entry.getKey().getSecond()); @@ -166,18 +166,6 @@ private List processTxAmount(String txHash, EventMetadata }).toList(); } - //Return the address and full address if the address is too long - //Using Tuple as Pair doesn't allow null values - private Tuple getAddress(String address) { - if (address != null && address.length() > MAX_ADDR_SIZE) { - String addr = address.substring(0, MAX_ADDR_SIZE); - String fullAddr = address; - return new Tuple<>(addr, fullAddr); - } else { - return new Tuple<>(address, null); - } - } - @EventListener @Transactional //We can also listen to CommitEvent here public void handleRemainingTxInputOuputs(ReadyForBalanceAggregationEvent readyForBalanceAggregationEvent) { diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/GensisBlockAddressTxAmtProcessor.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/GensisBlockAddressTxAmtProcessor.java new file mode 100644 index 00000000..d58cfb59 --- /dev/null +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/GensisBlockAddressTxAmtProcessor.java @@ -0,0 +1,90 @@ +package org.cardanofoundation.ledgersync.account.processor; + +import com.bloxbean.cardano.client.address.Address; +import com.bloxbean.cardano.client.address.AddressProvider; +import com.bloxbean.cardano.yaci.core.util.HexUtil; +import com.bloxbean.cardano.yaci.store.events.GenesisBlockEvent; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.cardanofoundation.ledgersync.account.model.AddressTxAmountEntity; +import org.cardanofoundation.ledgersync.account.repository.AddressTxAmountRepository; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.List; + +import static com.bloxbean.cardano.yaci.core.util.Constants.LOVELACE; +import static org.cardanofoundation.ledgersync.account.util.AddressUtil.getAddress; + +@Component +@RequiredArgsConstructor +@Slf4j +public class GensisBlockAddressTxAmtProcessor { + private final AddressTxAmountRepository addressTxAmountRepository; + + @EventListener + @Transactional + public void handleAddressTxAmtForGenesisBlock(GenesisBlockEvent genesisBlockEvent) { + var genesisBalances = genesisBlockEvent.getGenesisBalances(); + if (genesisBalances == null || genesisBalances.isEmpty()) { + log.info("No genesis balances found"); + return; + } + + List genesisAddressTxAmtList = new ArrayList<>(); + for (var genesisBalance : genesisBalances) { + var receiverAddress = genesisBalance.getAddress(); + var txnHash = genesisBalance.getTxnHash(); + var balance = genesisBalance.getBalance(); + + if (balance == null || balance.compareTo(BigInteger.ZERO) == 0) { + continue; + } + + //address and full address if the address is too long + var addressTuple = getAddress(receiverAddress); + + String ownerPaymentCredential = null; + String stakeAddress = null; + if (genesisBalance.getAddress() != null && + genesisBalance.getAddress().startsWith("addr")) { //If shelley address + try { + Address address = new Address(genesisBalance.getAddress()); + ownerPaymentCredential = address.getPaymentCredential().map(paymentKeyHash -> HexUtil.encodeHexString(paymentKeyHash.getBytes())) + .orElse(null); + stakeAddress = address.getDelegationCredential().map(delegationHash -> AddressProvider.getStakeAddress(address).toBech32()) + .orElse(null); + } catch (Exception e) { + log.warn("Error getting address details: " + genesisBalance.getAddress()); + } + } + + var addressTxAmountEntity = AddressTxAmountEntity.builder() + .address(addressTuple._1) + .unit(LOVELACE) + .txHash(txnHash) + .slot(genesisBlockEvent.getSlot()) + .quantity(balance) + .addrFull(addressTuple._2) + .stakeAddress(stakeAddress) + .assetName(LOVELACE) + .policy(null) + .paymentCredential(ownerPaymentCredential) + .epoch(0) + .blockNumber(genesisBlockEvent.getBlock()) + .blockHash(genesisBlockEvent.getBlockHash()) + .blockTime(genesisBlockEvent.getBlockTime()) + .build(); + + genesisAddressTxAmtList.add(addressTxAmountEntity); + } + + if (genesisAddressTxAmtList.size() > 0) { + addressTxAmountRepository.saveAll(genesisAddressTxAmtList); + } + } + +} diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/util/AddressUtil.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/util/AddressUtil.java new file mode 100644 index 00000000..dec22558 --- /dev/null +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/util/AddressUtil.java @@ -0,0 +1,19 @@ +package org.cardanofoundation.ledgersync.account.util; + +import com.bloxbean.cardano.yaci.store.common.util.Tuple; + +public class AddressUtil { + private static final int MAX_ADDR_SIZE = 500; //Required for Byron Addresses + + //Return the address and full address if the address is too long + //Using Tuple as Pair doesn't allow null values + public static Tuple getAddress(String address) { + if (address != null && address.length() > MAX_ADDR_SIZE) { + String addr = address.substring(0, MAX_ADDR_SIZE); + String fullAddr = address; + return new Tuple<>(addr, fullAddr); + } else { + return new Tuple<>(address, null); + } + } +} From f26fcbb4eda08d3868a18037e06527566380b0d4 Mon Sep 17 00:00:00 2001 From: Satya Date: Thu, 7 Mar 2024 10:58:44 +0800 Subject: [PATCH 09/29] chore: Add index for slot --- .../account/src/main/resources/db/account/V2_1_1__init.sql | 3 +++ 1 file changed, 3 insertions(+) diff --git a/aggregates/account/src/main/resources/db/account/V2_1_1__init.sql b/aggregates/account/src/main/resources/db/account/V2_1_1__init.sql index 39afca89..a8b4539e 100644 --- a/aggregates/account/src/main/resources/db/account/V2_1_1__init.sql +++ b/aggregates/account/src/main/resources/db/account/V2_1_1__init.sql @@ -19,6 +19,9 @@ create table address_tx_amount primary key (address, unit, tx_hash) ); +CREATE INDEX idx_address_tx_amount_slot + ON address_tx_amount(slot); + -- address_balance_view drop view if exists address_balance_view; create view address_balance_view as From 7da972b8c0332779411acc1480d5c64d412185aa Mon Sep 17 00:00:00 2001 From: Satya Date: Thu, 7 Mar 2024 21:53:26 +0800 Subject: [PATCH 10/29] chore: Replace JPA with JOOQ for AddressTxAmount --- aggregates/account/build.gradle | 5 +- .../account/domain/AddressTxAmount.java | 33 ++++ .../processor/AddressTxAmountProcessor.java | 56 ++++--- .../GensisBlockAddressTxAmtProcessor.java | 15 +- .../storage/AddressTxAmountStorage.java | 10 ++ .../impl/AddressTxAmountStorageImpl.java | 142 ++++++++++++++++++ .../storage/impl/mapper/AggrMapper.java | 17 +++ .../impl/mapper/AggrMapperDecorator.java | 36 +++++ .../impl}/model/AddressTxAmountEntity.java | 4 +- .../impl}/model/AddressTxAmountId.java | 2 +- .../repository/AddressTxAmountRepository.java | 6 +- .../resources/db/account/V2_1_1__init.sql | 42 +++--- aggregation-app/build.gradle | 1 - build.gradle | 5 + publish-common.gradle | 91 ++++++++++- 15 files changed, 402 insertions(+), 63 deletions(-) create mode 100644 aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/domain/AddressTxAmount.java create mode 100644 aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/AddressTxAmountStorage.java create mode 100644 aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java create mode 100644 aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/mapper/AggrMapper.java create mode 100644 aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/mapper/AggrMapperDecorator.java rename aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/{ => storage/impl}/model/AddressTxAmountEntity.java (94%) rename aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/{ => storage/impl}/model/AddressTxAmountId.java (84%) rename aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/{ => storage/impl}/repository/AddressTxAmountRepository.java (54%) diff --git a/aggregates/account/build.gradle b/aggregates/account/build.gradle index 9f035dce..87bd018b 100644 --- a/aggregates/account/build.gradle +++ b/aggregates/account/build.gradle @@ -3,13 +3,10 @@ dependencies { implementation 'org.springframework.boot:spring-boot-starter-data-jpa' implementation 'org.springframework.boot:spring-boot-starter-webflux' implementation(libs.cardano.client.lib) - implementation(libs.yaci.store.starter) + api(libs.yaci.store.starter) implementation(libs.yaci.store.utxo.starter) implementation(libs.yaci.store.account.starter) - compileOnly(libs.lombok) - annotationProcessor(libs.lombok) - testImplementation 'org.springframework.boot:spring-boot-starter-test' } diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/domain/AddressTxAmount.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/domain/AddressTxAmount.java new file mode 100644 index 00000000..a5c1ac22 --- /dev/null +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/domain/AddressTxAmount.java @@ -0,0 +1,33 @@ +package org.cardanofoundation.ledgersync.account.domain; + +import com.bloxbean.cardano.yaci.store.common.domain.BlockAwareDomain; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; +import com.fasterxml.jackson.databind.annotation.JsonNaming; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +import java.math.BigInteger; + +@Data +@NoArgsConstructor +@AllArgsConstructor +@SuperBuilder(toBuilder = true) +@JsonIgnoreProperties(ignoreUnknown = true) +@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class) +public class AddressTxAmount extends BlockAwareDomain { + private String address; + private String unit; + private String txHash; + private Long slot; + private BigInteger quantity; + private String policy; + private String assetName; + private String paymentCredential; + private String stakeAddress; + private String blockHash; + private Integer epoch; +} + diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java index e907e81b..65ca2520 100644 --- a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java @@ -12,8 +12,8 @@ import jakarta.annotation.PostConstruct; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.cardanofoundation.ledgersync.account.model.AddressTxAmountEntity; -import org.cardanofoundation.ledgersync.account.repository.AddressTxAmountRepository; +import org.cardanofoundation.ledgersync.account.domain.AddressTxAmount; +import org.cardanofoundation.ledgersync.account.storage.AddressTxAmountStorage; import org.springframework.context.event.EventListener; import org.springframework.data.util.Pair; import org.springframework.stereotype.Component; @@ -31,10 +31,11 @@ @RequiredArgsConstructor @Slf4j public class AddressTxAmountProcessor { - private final AddressTxAmountRepository addressTxAmountRepository; + private final AddressTxAmountStorage addressTxAmountStorage; private final UtxoClient utxoClient; private List> txInputOutputListCache = Collections.synchronizedList(new ArrayList<>()); + private List addressTxAmountListCache = Collections.synchronizedList(new ArrayList<>()); private final PlatformTransactionManager transactionManager; private TransactionTemplate transactionTemplate; @@ -48,7 +49,7 @@ void init() { @EventListener @Transactional public void processAddressUtxoEvent(AddressUtxoEvent addressUtxoEvent) { - //Ignore Genesis Txs for now -- TODO + //Ignore Genesis Txs as it's handled by GEnesisBlockAddressTxAmtProcessor if (addressUtxoEvent.getEventMetadata().getSlot() == -1) return; @@ -56,22 +57,22 @@ public void processAddressUtxoEvent(AddressUtxoEvent addressUtxoEvent) { if (txInputOutputList == null || txInputOutputList.isEmpty()) return; - List addressTxAmountEntities = new ArrayList<>(); + List addressTxAmountList = new ArrayList<>(); for (var txInputOutput : txInputOutputList) { var txAddressTxAmountEntities = processAddressAmountForTx(addressUtxoEvent.getEventMetadata(), txInputOutput, false); - if (txAddressTxAmountEntities == null) continue; + if (txAddressTxAmountEntities == null || txAddressTxAmountEntities.isEmpty()) continue; - addressTxAmountEntities.addAll(txAddressTxAmountEntities); + addressTxAmountList.addAll(txAddressTxAmountEntities); } - if (addressTxAmountEntities.size() > 0) { - addressTxAmountRepository.saveAll(addressTxAmountEntities); + if (addressTxAmountList.size() > 0) { + addressTxAmountListCache.addAll(addressTxAmountList); } } - private List processAddressAmountForTx(EventMetadata metadata, TxInputOutput txInputOutput, - boolean throwExceptionOnFailure) { + private List processAddressAmountForTx(EventMetadata metadata, TxInputOutput txInputOutput, + boolean throwExceptionOnFailure) { var txHash = txInputOutput.getTxHash(); var inputs = txInputOutput.getInputs(); var outputs = txInputOutput.getOutputs(); @@ -93,14 +94,16 @@ private List processAddressAmountForTx(EventMetadata meta throw new IllegalStateException("Unable to get inputs for all input keys for account balance calculation : " + inputUtxoKeys); else txInputOutputListCache.add(Pair.of(metadata, txInputOutput)); + + return Collections.emptyList(); } - var txAddressTxAmountEntities = + var txAddressTxAmount = processTxAmount(txHash, metadata, inputAddressUtxos, outputs); - return txAddressTxAmountEntities; + return txAddressTxAmount; } - private List processTxAmount(String txHash, EventMetadata metadata, List inputs, List outputs) { + private List processTxAmount(String txHash, EventMetadata metadata, List inputs, List outputs) { Map, BigInteger> addressTxAmountMap = new HashMap<>(); Map addressToAddressDetailsMap = new HashMap<>(); Map unitToAssetDetailsMap = new HashMap<>(); @@ -137,7 +140,7 @@ private List processTxAmount(String txHash, EventMetadata } } - return (List) addressTxAmountMap.entrySet() + return (List) addressTxAmountMap.entrySet() .stream() .filter(entry -> entry.getValue().compareTo(BigInteger.ZERO) != 0) .map(entry -> { @@ -147,13 +150,12 @@ private List processTxAmount(String txHash, EventMetadata //address and full address if the address is too long var addressTuple = getAddress(entry.getKey().getFirst()); - return AddressTxAmountEntity.builder() + return AddressTxAmount.builder() .address(addressTuple._1) .unit(entry.getKey().getSecond()) .txHash(txHash) .slot(metadata.getSlot()) .quantity(entry.getValue()) - .addrFull(addressTuple._2) .stakeAddress(addressDetails.ownerStakeAddress) .assetName(assetDetails.assetName) .policy(assetDetails.policy) @@ -170,7 +172,7 @@ private List processTxAmount(String txHash, EventMetadata @Transactional //We can also listen to CommitEvent here public void handleRemainingTxInputOuputs(ReadyForBalanceAggregationEvent readyForBalanceAggregationEvent) { try { - List addressTxAmountEntities = new ArrayList<>(); + List addressTxAmountList = new ArrayList<>(); for (var pair : txInputOutputListCache) { EventMetadata metadata = pair.getFirst(); TxInputOutput txInputOutput = pair.getSecond(); @@ -178,22 +180,32 @@ public void handleRemainingTxInputOuputs(ReadyForBalanceAggregationEvent readyFo var addrAmountEntitiesForTx = processAddressAmountForTx(metadata, txInputOutput, true); if (addrAmountEntitiesForTx != null) { - addressTxAmountEntities.addAll(addrAmountEntitiesForTx); + addressTxAmountList.addAll(addrAmountEntitiesForTx); } } - if (addressTxAmountEntities.size() > 0) { - addressTxAmountRepository.saveAll(addressTxAmountEntities); + if (addressTxAmountList.size() > 0) { + addressTxAmountListCache.addAll(addressTxAmountList); + } + + long t1 = System.currentTimeMillis(); + if (addressTxAmountListCache.size() > 0) { + addressTxAmountStorage.save(addressTxAmountListCache); + log.info("Total {} address_tx_amounts records saved", addressTxAmountListCache.size()); } + long t2 = System.currentTimeMillis(); + log.info("Time taken to save address_tx_amounts records : " + (t2 - t1) + " ms"); + } finally { txInputOutputListCache.clear(); + addressTxAmountListCache.clear(); } } @EventListener @Transactional public void handleRollback(RollbackEvent rollbackEvent) { - int addressTxAmountDeleted = addressTxAmountRepository.deleteAddressBalanceBySlotGreaterThan(rollbackEvent.getRollbackTo().getSlot()); + int addressTxAmountDeleted = addressTxAmountStorage.deleteAddressBalanceBySlotGreaterThan(rollbackEvent.getRollbackTo().getSlot()); log.info("Rollback -- {} address_tx_amounts records", addressTxAmountDeleted); } diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/GensisBlockAddressTxAmtProcessor.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/GensisBlockAddressTxAmtProcessor.java index d58cfb59..1ca05d9b 100644 --- a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/GensisBlockAddressTxAmtProcessor.java +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/GensisBlockAddressTxAmtProcessor.java @@ -6,8 +6,8 @@ import com.bloxbean.cardano.yaci.store.events.GenesisBlockEvent; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.cardanofoundation.ledgersync.account.model.AddressTxAmountEntity; -import org.cardanofoundation.ledgersync.account.repository.AddressTxAmountRepository; +import org.cardanofoundation.ledgersync.account.domain.AddressTxAmount; +import org.cardanofoundation.ledgersync.account.storage.AddressTxAmountStorage; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; @@ -23,7 +23,7 @@ @RequiredArgsConstructor @Slf4j public class GensisBlockAddressTxAmtProcessor { - private final AddressTxAmountRepository addressTxAmountRepository; + private final AddressTxAmountStorage addressTxAmountStorage; @EventListener @Transactional @@ -34,7 +34,7 @@ public void handleAddressTxAmtForGenesisBlock(GenesisBlockEvent genesisBlockEven return; } - List genesisAddressTxAmtList = new ArrayList<>(); + List genesisAddressTxAmtList = new ArrayList<>(); for (var genesisBalance : genesisBalances) { var receiverAddress = genesisBalance.getAddress(); var txnHash = genesisBalance.getTxnHash(); @@ -62,13 +62,12 @@ public void handleAddressTxAmtForGenesisBlock(GenesisBlockEvent genesisBlockEven } } - var addressTxAmountEntity = AddressTxAmountEntity.builder() + var addressTxAmount = AddressTxAmount.builder() .address(addressTuple._1) .unit(LOVELACE) .txHash(txnHash) .slot(genesisBlockEvent.getSlot()) .quantity(balance) - .addrFull(addressTuple._2) .stakeAddress(stakeAddress) .assetName(LOVELACE) .policy(null) @@ -79,11 +78,11 @@ public void handleAddressTxAmtForGenesisBlock(GenesisBlockEvent genesisBlockEven .blockTime(genesisBlockEvent.getBlockTime()) .build(); - genesisAddressTxAmtList.add(addressTxAmountEntity); + genesisAddressTxAmtList.add(addressTxAmount); } if (genesisAddressTxAmtList.size() > 0) { - addressTxAmountRepository.saveAll(genesisAddressTxAmtList); + addressTxAmountStorage.save(genesisAddressTxAmtList); } } diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/AddressTxAmountStorage.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/AddressTxAmountStorage.java new file mode 100644 index 00000000..a18ae2f8 --- /dev/null +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/AddressTxAmountStorage.java @@ -0,0 +1,10 @@ +package org.cardanofoundation.ledgersync.account.storage; + +import org.cardanofoundation.ledgersync.account.domain.AddressTxAmount; + +import java.util.List; + +public interface AddressTxAmountStorage { + void save(List addressTxAmount); + int deleteAddressBalanceBySlotGreaterThan(Long slot); +} diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java new file mode 100644 index 00000000..f54f962e --- /dev/null +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java @@ -0,0 +1,142 @@ +package org.cardanofoundation.ledgersync.account.storage.impl; + +import com.bloxbean.cardano.yaci.store.account.AccountStoreProperties; +import com.bloxbean.cardano.yaci.store.common.util.ListUtil; +import jakarta.annotation.PostConstruct; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.cardanofoundation.ledgersync.account.domain.AddressTxAmount; +import org.cardanofoundation.ledgersync.account.storage.AddressTxAmountStorage; +import org.cardanofoundation.ledgersync.account.storage.impl.mapper.AggrMapper; +import org.cardanofoundation.ledgersync.account.storage.impl.model.AddressTxAmountEntity; +import org.cardanofoundation.ledgersync.account.storage.impl.repository.AddressTxAmountRepository; +import org.jooq.DSLContext; +import org.springframework.stereotype.Component; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.support.TransactionTemplate; + +import java.time.LocalDateTime; +import java.util.List; + +import static org.cardanofoundation.ledgersync.account.jooq.Tables.ADDRESS_TX_AMOUNT; + +@Component +@RequiredArgsConstructor +@Slf4j +public class AddressTxAmountStorageImpl implements AddressTxAmountStorage { + private final AddressTxAmountRepository addressTxAmountRepository; + private final DSLContext dsl; + private final AccountStoreProperties accountStoreProperties; + private final PlatformTransactionManager transactionManager; + + private final AggrMapper aggrMapper = AggrMapper.INSTANCE; + private TransactionTemplate transactionTemplate; + + @PostConstruct + public void postConstruct() { + this.dsl.settings().setBatchSize(accountStoreProperties.getJooqWriteBatchSize()); + + transactionTemplate = new TransactionTemplate(transactionManager); + transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); + } + + @Override + @Transactional + public void save(List addressTxAmount) { + var addressTxAmtEntities = addressTxAmount.stream() + .map(addressTxAmount1 -> aggrMapper.toAddressTxAmountEntity(addressTxAmount1)) + .toList(); + + if (accountStoreProperties.isParallelWrite()) { + transactionTemplate.execute(status -> { + ListUtil.partitionAndApplyInParallel(addressTxAmtEntities, accountStoreProperties.getPerThreadBatchSize(), this::doSave); + return null; + }); + } else { + doSave(addressTxAmtEntities); + } + } + + private void doSave(List addressTxAmountEntities) { + LocalDateTime localDateTime = LocalDateTime.now(); + + var inserts = addressTxAmountEntities.stream() + .map(addressTxAmount -> { + return dsl.insertInto(ADDRESS_TX_AMOUNT) + .set(ADDRESS_TX_AMOUNT.ADDRESS, addressTxAmount.getAddress()) + .set(ADDRESS_TX_AMOUNT.UNIT, addressTxAmount.getUnit()) + .set(ADDRESS_TX_AMOUNT.TX_HASH, addressTxAmount.getTxHash()) + .set(ADDRESS_TX_AMOUNT.SLOT, addressTxAmount.getSlot()) + .set(ADDRESS_TX_AMOUNT.QUANTITY, addressTxAmount.getQuantity()) + .set(ADDRESS_TX_AMOUNT.ADDR_FULL, addressTxAmount.getAddrFull()) + .set(ADDRESS_TX_AMOUNT.POLICY, addressTxAmount.getPolicy()) + .set(ADDRESS_TX_AMOUNT.ASSET_NAME, addressTxAmount.getAssetName()) + .set(ADDRESS_TX_AMOUNT.PAYMENT_CREDENTIAL, addressTxAmount.getPaymentCredential()) + .set(ADDRESS_TX_AMOUNT.STAKE_ADDRESS, addressTxAmount.getStakeAddress()) + .set(ADDRESS_TX_AMOUNT.BLOCK_HASH, addressTxAmount.getBlockHash()) + .set(ADDRESS_TX_AMOUNT.BLOCK, addressTxAmount.getBlockNumber()) + .set(ADDRESS_TX_AMOUNT.BLOCK_TIME, addressTxAmount.getBlockTime()) + .set(ADDRESS_TX_AMOUNT.EPOCH, addressTxAmount.getEpoch()) + .set(ADDRESS_TX_AMOUNT.UPDATE_DATETIME, localDateTime) + .onDuplicateKeyUpdate() + .set(ADDRESS_TX_AMOUNT.SLOT, addressTxAmount.getSlot()) + .set(ADDRESS_TX_AMOUNT.QUANTITY, addressTxAmount.getQuantity()) + .set(ADDRESS_TX_AMOUNT.ADDR_FULL, addressTxAmount.getAddrFull()) + .set(ADDRESS_TX_AMOUNT.POLICY, addressTxAmount.getPolicy()) + .set(ADDRESS_TX_AMOUNT.ASSET_NAME, addressTxAmount.getAssetName()) + .set(ADDRESS_TX_AMOUNT.PAYMENT_CREDENTIAL, addressTxAmount.getPaymentCredential()) + .set(ADDRESS_TX_AMOUNT.STAKE_ADDRESS, addressTxAmount.getStakeAddress()) + .set(ADDRESS_TX_AMOUNT.BLOCK_HASH, addressTxAmount.getBlockHash()) + .set(ADDRESS_TX_AMOUNT.BLOCK, addressTxAmount.getBlockNumber()) + .set(ADDRESS_TX_AMOUNT.BLOCK_TIME, addressTxAmount.getBlockTime()) + .set(ADDRESS_TX_AMOUNT.EPOCH, addressTxAmount.getEpoch()) + .set(ADDRESS_TX_AMOUNT.UPDATE_DATETIME, localDateTime); + }).toList(); + dsl.batch(inserts).execute(); + + + /** + dsl.batched(c -> { + for (var addressTxAmount : addressTxAmountEntities) { + c.dsl().insertInto(ADDRESS_TX_AMOUNT) + .set(ADDRESS_TX_AMOUNT.ADDRESS, addressTxAmount.getAddress()) + .set(ADDRESS_TX_AMOUNT.UNIT, addressTxAmount.getUnit()) + .set(ADDRESS_TX_AMOUNT.TX_HASH, addressTxAmount.getTxHash()) + .set(ADDRESS_TX_AMOUNT.SLOT, addressTxAmount.getSlot()) + .set(ADDRESS_TX_AMOUNT.QUANTITY, addressTxAmount.getQuantity()) + .set(ADDRESS_TX_AMOUNT.ADDR_FULL, addressTxAmount.getAddrFull()) + .set(ADDRESS_TX_AMOUNT.POLICY, addressTxAmount.getPolicy()) + .set(ADDRESS_TX_AMOUNT.ASSET_NAME, addressTxAmount.getAssetName()) + .set(ADDRESS_TX_AMOUNT.PAYMENT_CREDENTIAL, addressTxAmount.getPaymentCredential()) + .set(ADDRESS_TX_AMOUNT.STAKE_ADDRESS, addressTxAmount.getStakeAddress()) + .set(ADDRESS_TX_AMOUNT.BLOCK_HASH, addressTxAmount.getBlockHash()) + .set(ADDRESS_TX_AMOUNT.BLOCK, addressTxAmount.getBlockNumber()) + .set(ADDRESS_TX_AMOUNT.BLOCK_TIME, addressTxAmount.getBlockTime()) + .set(ADDRESS_TX_AMOUNT.EPOCH, addressTxAmount.getEpoch()) + .set(ADDRESS_TX_AMOUNT.UPDATE_DATETIME, localDateTime) + .onDuplicateKeyUpdate() + .set(ADDRESS_TX_AMOUNT.SLOT, addressTxAmount.getSlot()) + .set(ADDRESS_TX_AMOUNT.QUANTITY, addressTxAmount.getQuantity()) + .set(ADDRESS_TX_AMOUNT.ADDR_FULL, addressTxAmount.getAddrFull()) + .set(ADDRESS_TX_AMOUNT.POLICY, addressTxAmount.getPolicy()) + .set(ADDRESS_TX_AMOUNT.ASSET_NAME, addressTxAmount.getAssetName()) + .set(ADDRESS_TX_AMOUNT.PAYMENT_CREDENTIAL, addressTxAmount.getPaymentCredential()) + .set(ADDRESS_TX_AMOUNT.STAKE_ADDRESS, addressTxAmount.getStakeAddress()) + .set(ADDRESS_TX_AMOUNT.BLOCK_HASH, addressTxAmount.getBlockHash()) + .set(ADDRESS_TX_AMOUNT.BLOCK, addressTxAmount.getBlockNumber()) + .set(ADDRESS_TX_AMOUNT.BLOCK_TIME, addressTxAmount.getBlockTime()) + .set(ADDRESS_TX_AMOUNT.EPOCH, addressTxAmount.getEpoch()) + .set(ADDRESS_TX_AMOUNT.UPDATE_DATETIME, localDateTime) + .execute(); + } + }); + **/ + } + + @Override + public int deleteAddressBalanceBySlotGreaterThan(Long slot) { + return addressTxAmountRepository.deleteAddressBalanceBySlotGreaterThan(slot); + } +} diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/mapper/AggrMapper.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/mapper/AggrMapper.java new file mode 100644 index 00000000..cc9ec167 --- /dev/null +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/mapper/AggrMapper.java @@ -0,0 +1,17 @@ +package org.cardanofoundation.ledgersync.account.storage.impl.mapper; + +import org.cardanofoundation.ledgersync.account.domain.AddressTxAmount; +import org.cardanofoundation.ledgersync.account.storage.impl.model.AddressTxAmountEntity; +import org.mapstruct.DecoratedWith; +import org.mapstruct.Mapper; +import org.mapstruct.factory.Mappers; + +@Mapper(componentModel = "default") +@DecoratedWith(AggrMapperDecorator.class) +public interface AggrMapper { + AggrMapper INSTANCE = Mappers.getMapper(AggrMapper.class); + + AddressTxAmount toAddressTxAmount(AddressTxAmountEntity entity); + AddressTxAmountEntity toAddressTxAmountEntity(AddressTxAmount addressTxAmount); +} + diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/mapper/AggrMapperDecorator.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/mapper/AggrMapperDecorator.java new file mode 100644 index 00000000..673f019c --- /dev/null +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/mapper/AggrMapperDecorator.java @@ -0,0 +1,36 @@ +package org.cardanofoundation.ledgersync.account.storage.impl.mapper; + +import org.cardanofoundation.ledgersync.account.domain.AddressTxAmount; +import org.cardanofoundation.ledgersync.account.storage.impl.model.AddressTxAmountEntity; + +public class AggrMapperDecorator implements AggrMapper { + public static final int MAX_ADDR_SIZE = 500; + private final AggrMapper delegate; + + public AggrMapperDecorator(AggrMapper delegate) { + this.delegate = delegate; + } + + @Override + public AddressTxAmount toAddressTxAmount(AddressTxAmountEntity entity) { + AddressTxAmount addrTxAmount = delegate.toAddressTxAmount(entity); + + if (entity.getAddrFull() != null && entity.getAddrFull().length() > 0) + addrTxAmount.setAddress(entity.getAddrFull()); + + return addrTxAmount; + } + + @Override + public AddressTxAmountEntity toAddressTxAmountEntity(AddressTxAmount addressTxAmount) { + AddressTxAmountEntity entity = delegate.toAddressTxAmountEntity(addressTxAmount); + + if (addressTxAmount.getAddress() != null && addressTxAmount.getAddress().length() > MAX_ADDR_SIZE) { + entity.setAddress(addressTxAmount.getAddress().substring(0, MAX_ADDR_SIZE)); + entity.setAddrFull(addressTxAmount.getAddress()); + } + + return entity; + } + +} diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/model/AddressTxAmountEntity.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/model/AddressTxAmountEntity.java similarity index 94% rename from aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/model/AddressTxAmountEntity.java rename to aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/model/AddressTxAmountEntity.java index 130c934e..cdd83420 100644 --- a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/model/AddressTxAmountEntity.java +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/model/AddressTxAmountEntity.java @@ -1,4 +1,4 @@ -package org.cardanofoundation.ledgersync.account.model; +package org.cardanofoundation.ledgersync.account.storage.impl.model; import com.bloxbean.cardano.yaci.store.common.model.BlockAwareEntity; import jakarta.persistence.*; @@ -27,6 +27,7 @@ public class AddressTxAmountEntity extends BlockAwareEntity { @Column(name = "unit") private String unit; + @Id @Column(name = "tx_hash") private String txHash; @@ -57,5 +58,4 @@ public class AddressTxAmountEntity extends BlockAwareEntity { @Column(name = "epoch") private Integer epoch; - } diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/model/AddressTxAmountId.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/model/AddressTxAmountId.java similarity index 84% rename from aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/model/AddressTxAmountId.java rename to aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/model/AddressTxAmountId.java index d6cf16fd..0fc63378 100644 --- a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/model/AddressTxAmountId.java +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/model/AddressTxAmountId.java @@ -1,4 +1,4 @@ -package org.cardanofoundation.ledgersync.account.model; +package org.cardanofoundation.ledgersync.account.storage.impl.model; import jakarta.persistence.Column; import lombok.*; diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/repository/AddressTxAmountRepository.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/repository/AddressTxAmountRepository.java similarity index 54% rename from aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/repository/AddressTxAmountRepository.java rename to aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/repository/AddressTxAmountRepository.java index 04f4032d..bb3ade6d 100644 --- a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/repository/AddressTxAmountRepository.java +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/repository/AddressTxAmountRepository.java @@ -1,7 +1,7 @@ -package org.cardanofoundation.ledgersync.account.repository; +package org.cardanofoundation.ledgersync.account.storage.impl.repository; -import org.cardanofoundation.ledgersync.account.model.AddressTxAmountEntity; -import org.cardanofoundation.ledgersync.account.model.AddressTxAmountId; +import org.cardanofoundation.ledgersync.account.storage.impl.model.AddressTxAmountEntity; +import org.cardanofoundation.ledgersync.account.storage.impl.model.AddressTxAmountId; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.stereotype.Repository; diff --git a/aggregates/account/src/main/resources/db/account/V2_1_1__init.sql b/aggregates/account/src/main/resources/db/account/V2_1_1__init.sql index a8b4539e..b5e4e039 100644 --- a/aggregates/account/src/main/resources/db/account/V2_1_1__init.sql +++ b/aggregates/account/src/main/resources/db/account/V2_1_1__init.sql @@ -19,24 +19,24 @@ create table address_tx_amount primary key (address, unit, tx_hash) ); -CREATE INDEX idx_address_tx_amount_slot - ON address_tx_amount(slot); - --- address_balance_view -drop view if exists address_balance_view; -create view address_balance_view as -select ab.* -from address_balance ab - inner join (select address, unit, max(slot) as max_slot - from address_balance ab2 - group by address, unit) max_ab - on ab.address = max_ab.address and ab.unit = max_ab.unit and ab.slot = max_ab.max_slot; - --- stake_address_balance_view -drop view if exists stake_address_balance_view; -create view stake_address_balance_view AS -select sb.* -from stake_address_balance sb - inner join (select address, MAX(slot) as max_slot - from stake_address_balance sb2 - group by address) max_sb on sb.address = max_sb.address and sb.slot = max_sb.max_slot; +-- CREATE INDEX idx_address_tx_amount_slot +-- ON address_tx_amount(slot); +-- +-- -- address_balance_view +-- drop view if exists address_balance_view; +-- create view address_balance_view as +-- select ab.* +-- from address_balance ab +-- inner join (select address, unit, max(slot) as max_slot +-- from address_balance ab2 +-- group by address, unit) max_ab +-- on ab.address = max_ab.address and ab.unit = max_ab.unit and ab.slot = max_ab.max_slot; +-- +-- -- stake_address_balance_view +-- drop view if exists stake_address_balance_view; +-- create view stake_address_balance_view AS +-- select sb.* +-- from stake_address_balance sb +-- inner join (select address, MAX(slot) as max_slot +-- from stake_address_balance sb2 +-- group by address) max_sb on sb.address = max_sb.address and sb.slot = max_sb.max_slot; diff --git a/aggregation-app/build.gradle b/aggregation-app/build.gradle index 77a7ea0d..7116b316 100644 --- a/aggregation-app/build.gradle +++ b/aggregation-app/build.gradle @@ -19,7 +19,6 @@ dependencies { compileOnly(libs.lombok) annotationProcessor(libs.lombok) - annotationProcessor(libs.lombok.mapstruct.binding) annotationProcessor(libs.mapstruct.processor) testImplementation 'org.springframework.boot:spring-boot-starter-test' diff --git a/build.gradle b/build.gradle index 4c15f01e..1bcd3fc6 100644 --- a/build.gradle +++ b/build.gradle @@ -37,7 +37,12 @@ subprojects { } dependencies { + implementation(libs.map.struct) + compileOnly(libs.lombok) + annotationProcessor(libs.lombok) + annotationProcessor(libs.lombok.mapstruct.binding) + annotationProcessor(libs.mapstruct.processor) } compileJava { diff --git a/publish-common.gradle b/publish-common.gradle index 8ccd6ad7..eb375f2d 100644 --- a/publish-common.gradle +++ b/publish-common.gradle @@ -1,7 +1,7 @@ apply plugin: 'maven-publish' apply plugin: 'signing' apply plugin: 'org.flywaydb.flyway' -//apply plugin: 'nu.studer.jooq' +apply plugin: 'nu.studer.jooq' if (!name.equalsIgnoreCase("components")) { publishing { @@ -57,6 +57,95 @@ if (!name.equalsIgnoreCase("components")) { } } +//JOOQ generator +//Add store modules to this array +def jooq_modules = ['account'] +if (jooq_modules.contains(name)) { + configurations { + flywayMigration + } + + dependencies { + flywayMigration 'com.h2database:h2' + jooqGenerator 'com.h2database:h2' + } + + flyway { + configurations = ['flywayMigration'] + url = 'jdbc:h2:' + project.buildDir.absolutePath + File.separator + 'testdb;DATABASE_TO_LOWER=TRUE;CASE_INSENSITIVE_IDENTIFIERS=TRUE;INIT=CREATE SCHEMA IF NOT EXISTS \"PUBLIC\"\\;' + user = 'sa' + password = '' + locations = ['filesystem:src/main/resources/db/'] + createSchemas = true + schemas = ['PUBLIC'] +// cleanOnValidationError = true +// cleanDisabled = false + } + + jooq { + version = '3.18.9' + configurations { + main { + generationTool { +// logging = org.jooq.meta.jaxb.Logging.WARN + jdbc { + driver = 'org.h2.Driver' + url = flyway.url + user = flyway.user + password = flyway.password + } + generator { + name = 'org.jooq.codegen.DefaultGenerator' + database { + name = 'org.jooq.meta.h2.H2Database' + includes = 'PUBLIC.*' + excludes = 'FLYWAY_SCHEMA_HISTORY | UNUSED_TABLE | PREFIX_.* | SECRET_SCHEMA.SECRET_TABLE | SECRET_ROUTINE' + + schemata { + schema { + inputSchema = 'PUBLIC' + outputSchemaToDefault = true + } + } + } + target { + packageName = 'org.cardanofoundation.ledgersync.' + project.name + ".jooq"; + } + + generate { + // Generate the DAO classes + daos = true + // Annotate DAOs (and other types) with spring annotations, such as @Repository and @Autowired + // for auto-wiring the Configuration instance, e.g. from Spring Boot's jOOQ starter + springAnnotations = true + // Generate Spring-specific DAOs containing @Transactional annotations + springDao = true + } + } + } + } + } + } + + // configure jOOQ task such that it only executes when something has changed that potentially affects the generated JOOQ sources + // - the jOOQ configuration has changed (Jdbc, Generator, Strategy, etc.) + // - the classpath used to execute the jOOQ generation tool has changed (jOOQ library, database driver, strategy classes, etc.) + // - the schema files from which the schema is generated and which is used by jOOQ to generate the sources have changed (scripts added, modified, etc.) + tasks.named('generateJooq').configure { + // ensure database schema has been prepared by Flyway before generating the jOOQ sources + dependsOn tasks.named('flywayMigrate') + + // declare Flyway migration scripts as inputs on the jOOQ task + inputs.files(fileTree('src/main/resources/db/')) + .withPropertyName('migrations') + .withPathSensitivity(PathSensitivity.RELATIVE) + + // make jOOQ task participate in incremental builds (and build caching) + allInputsDeclared = true + } +} + + ext.isReleaseVersion = !version.endsWith("SNAPSHOT") if (isReleaseVersion && !project.hasProperty("skipSigning")) { From 79a6a9e9324c6dbc3904527ba1f6c3fc189a1021 Mon Sep 17 00:00:00 2001 From: Satya Date: Thu, 7 Mar 2024 22:10:54 +0800 Subject: [PATCH 11/29] chore: Replace JPA with JOOQ for AddressTxAmount --- .../account/storage/impl/AddressTxAmountStorageImpl.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java index f54f962e..6f8a22bc 100644 --- a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java @@ -62,6 +62,7 @@ public void save(List addressTxAmount) { private void doSave(List addressTxAmountEntities) { LocalDateTime localDateTime = LocalDateTime.now(); + /** var inserts = addressTxAmountEntities.stream() .map(addressTxAmount -> { return dsl.insertInto(ADDRESS_TX_AMOUNT) @@ -95,9 +96,9 @@ private void doSave(List addressTxAmountEntities) { .set(ADDRESS_TX_AMOUNT.UPDATE_DATETIME, localDateTime); }).toList(); dsl.batch(inserts).execute(); + **/ - /** dsl.batched(c -> { for (var addressTxAmount : addressTxAmountEntities) { c.dsl().insertInto(ADDRESS_TX_AMOUNT) @@ -132,7 +133,7 @@ private void doSave(List addressTxAmountEntities) { .execute(); } }); - **/ + } @Override From 93aec7d9e7a4edb1fffb33db374fdf936429d7e7 Mon Sep 17 00:00:00 2001 From: Satya Date: Thu, 7 Mar 2024 22:21:39 +0800 Subject: [PATCH 12/29] chore: Save by block if # of records > 500 --- .../account/processor/AddressTxAmountProcessor.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java index 65ca2520..733eed83 100644 --- a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java @@ -66,6 +66,11 @@ public void processAddressUtxoEvent(AddressUtxoEvent addressUtxoEvent) { addressTxAmountList.addAll(txAddressTxAmountEntities); } + if (addressTxAmountList.size() > 500) { + addressTxAmountStorage.save(addressTxAmountList); //Save + return; + } + if (addressTxAmountList.size() > 0) { addressTxAmountListCache.addAll(addressTxAmountList); } From b6580690cdaaf24210024df2913f6581f1558d0c Mon Sep 17 00:00:00 2001 From: Satya Date: Thu, 7 Mar 2024 22:35:36 +0800 Subject: [PATCH 13/29] chore: Save by block if # of records > 200 --- .../account/processor/AddressTxAmountProcessor.java | 2 +- .../account/storage/impl/AddressTxAmountStorageImpl.java | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java index 733eed83..f1371ccf 100644 --- a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java @@ -66,7 +66,7 @@ public void processAddressUtxoEvent(AddressUtxoEvent addressUtxoEvent) { addressTxAmountList.addAll(txAddressTxAmountEntities); } - if (addressTxAmountList.size() > 500) { + if (addressTxAmountList.size() > 200) { addressTxAmountStorage.save(addressTxAmountList); //Save return; } diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java index 6f8a22bc..946ba0e8 100644 --- a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java @@ -62,7 +62,6 @@ public void save(List addressTxAmount) { private void doSave(List addressTxAmountEntities) { LocalDateTime localDateTime = LocalDateTime.now(); - /** var inserts = addressTxAmountEntities.stream() .map(addressTxAmount -> { return dsl.insertInto(ADDRESS_TX_AMOUNT) @@ -96,9 +95,8 @@ private void doSave(List addressTxAmountEntities) { .set(ADDRESS_TX_AMOUNT.UPDATE_DATETIME, localDateTime); }).toList(); dsl.batch(inserts).execute(); - **/ - + /** dsl.batched(c -> { for (var addressTxAmount : addressTxAmountEntities) { c.dsl().insertInto(ADDRESS_TX_AMOUNT) @@ -133,6 +131,7 @@ private void doSave(List addressTxAmountEntities) { .execute(); } }); + **/ } From 73add72165c959840efc252817efe8d261be97c2 Mon Sep 17 00:00:00 2001 From: Satya Date: Thu, 7 Mar 2024 22:49:33 +0800 Subject: [PATCH 14/29] chore: Save by block if # of records > 200 --- .../account/processor/AddressTxAmountProcessor.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java index f1371ccf..0e6b0a88 100644 --- a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java @@ -66,10 +66,10 @@ public void processAddressUtxoEvent(AddressUtxoEvent addressUtxoEvent) { addressTxAmountList.addAll(txAddressTxAmountEntities); } - if (addressTxAmountList.size() > 200) { - addressTxAmountStorage.save(addressTxAmountList); //Save - return; - } +// if (addressTxAmountList.size() > 200) { +// addressTxAmountStorage.save(addressTxAmountList); //Save +// return; +// } if (addressTxAmountList.size() > 0) { addressTxAmountListCache.addAll(addressTxAmountList); From 5cc37f71c5586cecee430311521ff9b299481287 Mon Sep 17 00:00:00 2001 From: Satya Date: Thu, 7 Mar 2024 22:56:23 +0800 Subject: [PATCH 15/29] chore: Update --- .../account/storage/impl/AddressTxAmountStorageImpl.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java index 946ba0e8..f165bbfe 100644 --- a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java @@ -62,6 +62,7 @@ public void save(List addressTxAmount) { private void doSave(List addressTxAmountEntities) { LocalDateTime localDateTime = LocalDateTime.now(); + /** var inserts = addressTxAmountEntities.stream() .map(addressTxAmount -> { return dsl.insertInto(ADDRESS_TX_AMOUNT) @@ -95,8 +96,8 @@ private void doSave(List addressTxAmountEntities) { .set(ADDRESS_TX_AMOUNT.UPDATE_DATETIME, localDateTime); }).toList(); dsl.batch(inserts).execute(); + **/ - /** dsl.batched(c -> { for (var addressTxAmount : addressTxAmountEntities) { c.dsl().insertInto(ADDRESS_TX_AMOUNT) @@ -131,7 +132,6 @@ private void doSave(List addressTxAmountEntities) { .execute(); } }); - **/ } From 9d0e18698f24b284a70af0845cece1dfe745fa33 Mon Sep 17 00:00:00 2001 From: Satya Date: Thu, 7 Mar 2024 23:00:01 +0800 Subject: [PATCH 16/29] chore: Update --- .../account/processor/AddressTxAmountProcessor.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java index 0e6b0a88..f1371ccf 100644 --- a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java @@ -66,10 +66,10 @@ public void processAddressUtxoEvent(AddressUtxoEvent addressUtxoEvent) { addressTxAmountList.addAll(txAddressTxAmountEntities); } -// if (addressTxAmountList.size() > 200) { -// addressTxAmountStorage.save(addressTxAmountList); //Save -// return; -// } + if (addressTxAmountList.size() > 200) { + addressTxAmountStorage.save(addressTxAmountList); //Save + return; + } if (addressTxAmountList.size() > 0) { addressTxAmountListCache.addAll(addressTxAmountList); From b9dc6a99222721bc0b6826b81e7ab2b815210d8f Mon Sep 17 00:00:00 2001 From: Satya Date: Thu, 7 Mar 2024 23:07:30 +0800 Subject: [PATCH 17/29] chore: Update --- .../processor/AddressTxAmountProcessor.java | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java index f1371ccf..69b2992d 100644 --- a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java @@ -66,14 +66,16 @@ public void processAddressUtxoEvent(AddressUtxoEvent addressUtxoEvent) { addressTxAmountList.addAll(txAddressTxAmountEntities); } - if (addressTxAmountList.size() > 200) { - addressTxAmountStorage.save(addressTxAmountList); //Save - return; - } - - if (addressTxAmountList.size() > 0) { - addressTxAmountListCache.addAll(addressTxAmountList); - } + addressTxAmountStorage.save(addressTxAmountList); + +// if (addressTxAmountList.size() > 200) { +// addressTxAmountStorage.save(addressTxAmountList); //Save +// return; +// } +// +// if (addressTxAmountList.size() > 0) { +// addressTxAmountListCache.addAll(addressTxAmountList); +// } } private List processAddressAmountForTx(EventMetadata metadata, TxInputOutput txInputOutput, From 654c7d91362af986c2d4d4e74c990495569b3b65 Mon Sep 17 00:00:00 2001 From: Satya Date: Thu, 7 Mar 2024 23:16:49 +0800 Subject: [PATCH 18/29] chore: Update --- .../processor/AddressTxAmountProcessor.java | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java index 69b2992d..f1371ccf 100644 --- a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java @@ -66,16 +66,14 @@ public void processAddressUtxoEvent(AddressUtxoEvent addressUtxoEvent) { addressTxAmountList.addAll(txAddressTxAmountEntities); } - addressTxAmountStorage.save(addressTxAmountList); - -// if (addressTxAmountList.size() > 200) { -// addressTxAmountStorage.save(addressTxAmountList); //Save -// return; -// } -// -// if (addressTxAmountList.size() > 0) { -// addressTxAmountListCache.addAll(addressTxAmountList); -// } + if (addressTxAmountList.size() > 200) { + addressTxAmountStorage.save(addressTxAmountList); //Save + return; + } + + if (addressTxAmountList.size() > 0) { + addressTxAmountListCache.addAll(addressTxAmountList); + } } private List processAddressAmountForTx(EventMetadata metadata, TxInputOutput txInputOutput, From a26fa5dbd5ed6581fb7f3a5c747d05bc90dde57c Mon Sep 17 00:00:00 2001 From: Satya Date: Thu, 7 Mar 2024 23:22:47 +0800 Subject: [PATCH 19/29] chore: Update --- .../account/processor/AddressTxAmountProcessor.java | 2 ++ .../account/storage/impl/AddressTxAmountStorageImpl.java | 6 +++--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java index f1371ccf..565ffd69 100644 --- a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java @@ -37,6 +37,8 @@ public class AddressTxAmountProcessor { private List> txInputOutputListCache = Collections.synchronizedList(new ArrayList<>()); private List addressTxAmountListCache = Collections.synchronizedList(new ArrayList<>()); +// private List addressUtxoEvents = Collections.synchronizedList(new ArrayList<>()); + private final PlatformTransactionManager transactionManager; private TransactionTemplate transactionTemplate; diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java index f165bbfe..8f222f1e 100644 --- a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java @@ -50,10 +50,10 @@ public void save(List addressTxAmount) { .toList(); if (accountStoreProperties.isParallelWrite()) { - transactionTemplate.execute(status -> { + //transactionTemplate.execute(status -> { ListUtil.partitionAndApplyInParallel(addressTxAmtEntities, accountStoreProperties.getPerThreadBatchSize(), this::doSave); - return null; - }); + // return null; + // }); } else { doSave(addressTxAmtEntities); } From 5a8a01d66fa7fa03cbed4fd57e5aaddb31c5a62f Mon Sep 17 00:00:00 2001 From: Satya Date: Thu, 7 Mar 2024 23:27:32 +0800 Subject: [PATCH 20/29] chore: Update --- .../account/processor/AddressTxAmountProcessor.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java index 565ffd69..e290fb45 100644 --- a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java @@ -68,10 +68,10 @@ public void processAddressUtxoEvent(AddressUtxoEvent addressUtxoEvent) { addressTxAmountList.addAll(txAddressTxAmountEntities); } - if (addressTxAmountList.size() > 200) { - addressTxAmountStorage.save(addressTxAmountList); //Save - return; - } +// if (addressTxAmountList.size() > 200) { +// addressTxAmountStorage.save(addressTxAmountList); //Save +// return; +// } if (addressTxAmountList.size() > 0) { addressTxAmountListCache.addAll(addressTxAmountList); From f62d427e68047a306d9e5436c877d16877a95748 Mon Sep 17 00:00:00 2001 From: Satya Date: Thu, 7 Mar 2024 23:32:50 +0800 Subject: [PATCH 21/29] chore: Update --- .../account/processor/AddressTxAmountProcessor.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java index e290fb45..ad38e008 100644 --- a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java @@ -68,10 +68,10 @@ public void processAddressUtxoEvent(AddressUtxoEvent addressUtxoEvent) { addressTxAmountList.addAll(txAddressTxAmountEntities); } -// if (addressTxAmountList.size() > 200) { -// addressTxAmountStorage.save(addressTxAmountList); //Save -// return; -// } + if (addressTxAmountList.size() > 300) { + addressTxAmountStorage.save(addressTxAmountList); //Save + return; + } if (addressTxAmountList.size() > 0) { addressTxAmountListCache.addAll(addressTxAmountList); From d1b167ea3ac74c20c6c70411a8a76d4e6f7b3f79 Mon Sep 17 00:00:00 2001 From: Satya Date: Thu, 7 Mar 2024 23:36:49 +0800 Subject: [PATCH 22/29] chore: Update --- .../account/processor/AddressTxAmountProcessor.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java index ad38e008..d5bbe6df 100644 --- a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java @@ -48,8 +48,8 @@ void init() { transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); } - @EventListener - @Transactional +// @EventListener +// @Transactional public void processAddressUtxoEvent(AddressUtxoEvent addressUtxoEvent) { //Ignore Genesis Txs as it's handled by GEnesisBlockAddressTxAmtProcessor if (addressUtxoEvent.getEventMetadata().getSlot() == -1) From 9c86b09ad14f0ed45f7d92076f3178ecb6590c5f Mon Sep 17 00:00:00 2001 From: Satya Date: Thu, 7 Mar 2024 23:47:35 +0800 Subject: [PATCH 23/29] chore: Update --- .../processor/AddressTxAmountProcessor.java | 15 ++++++++++++--- .../storage/impl/AddressTxAmountStorageImpl.java | 6 +++--- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java index d5bbe6df..72da233b 100644 --- a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java @@ -37,7 +37,7 @@ public class AddressTxAmountProcessor { private List> txInputOutputListCache = Collections.synchronizedList(new ArrayList<>()); private List addressTxAmountListCache = Collections.synchronizedList(new ArrayList<>()); -// private List addressUtxoEvents = Collections.synchronizedList(new ArrayList<>()); + private List addressUtxoEvents = Collections.synchronizedList(new ArrayList<>()); private final PlatformTransactionManager transactionManager; private TransactionTemplate transactionTemplate; @@ -48,8 +48,12 @@ void init() { transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); } -// @EventListener -// @Transactional + @EventListener + @Transactional + public void handleEvent(AddressUtxoEvent addressUtxoEvent) { + addressUtxoEvents.add(addressUtxoEvent); + } + public void processAddressUtxoEvent(AddressUtxoEvent addressUtxoEvent) { //Ignore Genesis Txs as it's handled by GEnesisBlockAddressTxAmtProcessor if (addressUtxoEvent.getEventMetadata().getSlot() == -1) @@ -179,6 +183,9 @@ private List processTxAmount(String txHash, EventMetadata metad @Transactional //We can also listen to CommitEvent here public void handleRemainingTxInputOuputs(ReadyForBalanceAggregationEvent readyForBalanceAggregationEvent) { try { + addressUtxoEvents.parallelStream().forEach(this::processAddressUtxoEvent); + + /** List addressTxAmountList = new ArrayList<>(); for (var pair : txInputOutputListCache) { EventMetadata metadata = pair.getFirst(); @@ -194,6 +201,7 @@ public void handleRemainingTxInputOuputs(ReadyForBalanceAggregationEvent readyFo if (addressTxAmountList.size() > 0) { addressTxAmountListCache.addAll(addressTxAmountList); } + **/ long t1 = System.currentTimeMillis(); if (addressTxAmountListCache.size() > 0) { @@ -206,6 +214,7 @@ public void handleRemainingTxInputOuputs(ReadyForBalanceAggregationEvent readyFo } finally { txInputOutputListCache.clear(); addressTxAmountListCache.clear(); + addressUtxoEvents.clear(); } } diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java index 8f222f1e..f165bbfe 100644 --- a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java @@ -50,10 +50,10 @@ public void save(List addressTxAmount) { .toList(); if (accountStoreProperties.isParallelWrite()) { - //transactionTemplate.execute(status -> { + transactionTemplate.execute(status -> { ListUtil.partitionAndApplyInParallel(addressTxAmtEntities, accountStoreProperties.getPerThreadBatchSize(), this::doSave); - // return null; - // }); + return null; + }); } else { doSave(addressTxAmtEntities); } From 1f34b5bfebe75ffbfc2c936f1db6970eb29a62b5 Mon Sep 17 00:00:00 2001 From: Satya Date: Thu, 7 Mar 2024 23:53:37 +0800 Subject: [PATCH 24/29] chore: Update --- .../account/processor/AddressTxAmountProcessor.java | 13 +------------ .../storage/impl/AddressTxAmountStorageImpl.java | 6 +++--- 2 files changed, 4 insertions(+), 15 deletions(-) diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java index 72da233b..21fa950c 100644 --- a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java @@ -37,8 +37,6 @@ public class AddressTxAmountProcessor { private List> txInputOutputListCache = Collections.synchronizedList(new ArrayList<>()); private List addressTxAmountListCache = Collections.synchronizedList(new ArrayList<>()); - private List addressUtxoEvents = Collections.synchronizedList(new ArrayList<>()); - private final PlatformTransactionManager transactionManager; private TransactionTemplate transactionTemplate; @@ -50,10 +48,6 @@ void init() { @EventListener @Transactional - public void handleEvent(AddressUtxoEvent addressUtxoEvent) { - addressUtxoEvents.add(addressUtxoEvent); - } - public void processAddressUtxoEvent(AddressUtxoEvent addressUtxoEvent) { //Ignore Genesis Txs as it's handled by GEnesisBlockAddressTxAmtProcessor if (addressUtxoEvent.getEventMetadata().getSlot() == -1) @@ -72,7 +66,7 @@ public void processAddressUtxoEvent(AddressUtxoEvent addressUtxoEvent) { addressTxAmountList.addAll(txAddressTxAmountEntities); } - if (addressTxAmountList.size() > 300) { + if (addressTxAmountList.size() > 100) { addressTxAmountStorage.save(addressTxAmountList); //Save return; } @@ -183,9 +177,6 @@ private List processTxAmount(String txHash, EventMetadata metad @Transactional //We can also listen to CommitEvent here public void handleRemainingTxInputOuputs(ReadyForBalanceAggregationEvent readyForBalanceAggregationEvent) { try { - addressUtxoEvents.parallelStream().forEach(this::processAddressUtxoEvent); - - /** List addressTxAmountList = new ArrayList<>(); for (var pair : txInputOutputListCache) { EventMetadata metadata = pair.getFirst(); @@ -201,7 +192,6 @@ public void handleRemainingTxInputOuputs(ReadyForBalanceAggregationEvent readyFo if (addressTxAmountList.size() > 0) { addressTxAmountListCache.addAll(addressTxAmountList); } - **/ long t1 = System.currentTimeMillis(); if (addressTxAmountListCache.size() > 0) { @@ -214,7 +204,6 @@ public void handleRemainingTxInputOuputs(ReadyForBalanceAggregationEvent readyFo } finally { txInputOutputListCache.clear(); addressTxAmountListCache.clear(); - addressUtxoEvents.clear(); } } diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java index f165bbfe..57a5e9a8 100644 --- a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java @@ -50,10 +50,10 @@ public void save(List addressTxAmount) { .toList(); if (accountStoreProperties.isParallelWrite()) { - transactionTemplate.execute(status -> { +// transactionTemplate.execute(status -> { ListUtil.partitionAndApplyInParallel(addressTxAmtEntities, accountStoreProperties.getPerThreadBatchSize(), this::doSave); - return null; - }); +// return null; +// }); } else { doSave(addressTxAmtEntities); } From adfda88178e4d1da0910f6ec54ccb46c9093302b Mon Sep 17 00:00:00 2001 From: Satya Date: Fri, 8 Mar 2024 09:31:50 +0800 Subject: [PATCH 25/29] chore: Update --- .../processor/AddressTxAmountProcessor.java | 59 ++++++++++++++++--- 1 file changed, 52 insertions(+), 7 deletions(-) diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java index 21fa950c..e80340a3 100644 --- a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java @@ -37,6 +37,9 @@ public class AddressTxAmountProcessor { private List> txInputOutputListCache = Collections.synchronizedList(new ArrayList<>()); private List addressTxAmountListCache = Collections.synchronizedList(new ArrayList<>()); + private Map> addressTxAmountListCacheMap = new HashMap<>(); + private int bucketSize = 10; + private final PlatformTransactionManager transactionManager; private TransactionTemplate transactionTemplate; @@ -44,6 +47,39 @@ public class AddressTxAmountProcessor { void init() { transactionTemplate = new TransactionTemplate(transactionManager); transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); + + initAddressTxAmtCache(); + } + + private void initAddressTxAmtCache() { + for (int i = 0; i < bucketSize; i++) { + addressTxAmountListCacheMap.put(i, Collections.synchronizedList(new ArrayList<>())); + } + } + + private void save(long blockNo, List addressTxAmounts) { + int index = (int) (blockNo % bucketSize); + List addressTxAmountList = addressTxAmountListCacheMap.get(index); + addressTxAmountList.addAll(addressTxAmounts); + + if (addressTxAmountList.size() > 1000) { + synchronized (addressTxAmountList) { + addressTxAmountStorage.save(addressTxAmountList); + if (log.isDebugEnabled()) + log.debug("-- Saved address_tx_amounts records : {}", addressTxAmountList.size()); + addressTxAmountList.clear(); + } + } + } + + private void clearCache() { + for (int i = 0; i < bucketSize; i++) { + List addressTxAmountList = addressTxAmountListCacheMap.get(i); + if (addressTxAmountList.size() > 0) { + //addressTxAmountStorage.save(addressTxAmountList); + addressTxAmountList.clear(); + } + } } @EventListener @@ -66,13 +102,14 @@ public void processAddressUtxoEvent(AddressUtxoEvent addressUtxoEvent) { addressTxAmountList.addAll(txAddressTxAmountEntities); } - if (addressTxAmountList.size() > 100) { - addressTxAmountStorage.save(addressTxAmountList); //Save - return; - } +// if (addressTxAmountList.size() > 100) { +// addressTxAmountStorage.save(addressTxAmountList); //Save +// return; +// } if (addressTxAmountList.size() > 0) { - addressTxAmountListCache.addAll(addressTxAmountList); + //TODO -- addressTxAmountListCache.addAll(addressTxAmountList); + save(addressUtxoEvent.getEventMetadata().getBlock(), addressTxAmountList); } } @@ -193,17 +230,25 @@ public void handleRemainingTxInputOuputs(ReadyForBalanceAggregationEvent readyFo addressTxAmountListCache.addAll(addressTxAmountList); } + var remainingAddressAmtList = addressTxAmountListCacheMap.values().stream() + .flatMap(List::stream) + .toList(); + if (remainingAddressAmtList.size() > 0) { + addressTxAmountListCache.addAll(remainingAddressAmtList); + } + long t1 = System.currentTimeMillis(); if (addressTxAmountListCache.size() > 0) { addressTxAmountStorage.save(addressTxAmountListCache); - log.info("Total {} address_tx_amounts records saved", addressTxAmountListCache.size()); } + long t2 = System.currentTimeMillis(); - log.info("Time taken to save address_tx_amounts records : " + (t2 - t1) + " ms"); + log.info("Time taken to save address_tx_amounts records : {}, time: {} ms", addressTxAmountListCache.size(), (t2 - t1)); } finally { txInputOutputListCache.clear(); addressTxAmountListCache.clear(); + clearCache(); } } From ae394d74a7f14b4a99068fc684b142667db7faba Mon Sep 17 00:00:00 2001 From: Satya Date: Fri, 8 Mar 2024 09:41:02 +0800 Subject: [PATCH 26/29] chore: Rollback --- .../processor/AddressTxAmountProcessor.java | 55 ++----------------- 1 file changed, 5 insertions(+), 50 deletions(-) diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java index e80340a3..f0da54dc 100644 --- a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java @@ -37,9 +37,6 @@ public class AddressTxAmountProcessor { private List> txInputOutputListCache = Collections.synchronizedList(new ArrayList<>()); private List addressTxAmountListCache = Collections.synchronizedList(new ArrayList<>()); - private Map> addressTxAmountListCacheMap = new HashMap<>(); - private int bucketSize = 10; - private final PlatformTransactionManager transactionManager; private TransactionTemplate transactionTemplate; @@ -47,39 +44,6 @@ public class AddressTxAmountProcessor { void init() { transactionTemplate = new TransactionTemplate(transactionManager); transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); - - initAddressTxAmtCache(); - } - - private void initAddressTxAmtCache() { - for (int i = 0; i < bucketSize; i++) { - addressTxAmountListCacheMap.put(i, Collections.synchronizedList(new ArrayList<>())); - } - } - - private void save(long blockNo, List addressTxAmounts) { - int index = (int) (blockNo % bucketSize); - List addressTxAmountList = addressTxAmountListCacheMap.get(index); - addressTxAmountList.addAll(addressTxAmounts); - - if (addressTxAmountList.size() > 1000) { - synchronized (addressTxAmountList) { - addressTxAmountStorage.save(addressTxAmountList); - if (log.isDebugEnabled()) - log.debug("-- Saved address_tx_amounts records : {}", addressTxAmountList.size()); - addressTxAmountList.clear(); - } - } - } - - private void clearCache() { - for (int i = 0; i < bucketSize; i++) { - List addressTxAmountList = addressTxAmountListCacheMap.get(i); - if (addressTxAmountList.size() > 0) { - //addressTxAmountStorage.save(addressTxAmountList); - addressTxAmountList.clear(); - } - } } @EventListener @@ -102,14 +66,13 @@ public void processAddressUtxoEvent(AddressUtxoEvent addressUtxoEvent) { addressTxAmountList.addAll(txAddressTxAmountEntities); } -// if (addressTxAmountList.size() > 100) { -// addressTxAmountStorage.save(addressTxAmountList); //Save -// return; -// } + if (addressTxAmountList.size() > 100) { + addressTxAmountStorage.save(addressTxAmountList); //Save + return; + } if (addressTxAmountList.size() > 0) { - //TODO -- addressTxAmountListCache.addAll(addressTxAmountList); - save(addressUtxoEvent.getEventMetadata().getBlock(), addressTxAmountList); + addressTxAmountListCache.addAll(addressTxAmountList); } } @@ -230,13 +193,6 @@ public void handleRemainingTxInputOuputs(ReadyForBalanceAggregationEvent readyFo addressTxAmountListCache.addAll(addressTxAmountList); } - var remainingAddressAmtList = addressTxAmountListCacheMap.values().stream() - .flatMap(List::stream) - .toList(); - if (remainingAddressAmtList.size() > 0) { - addressTxAmountListCache.addAll(remainingAddressAmtList); - } - long t1 = System.currentTimeMillis(); if (addressTxAmountListCache.size() > 0) { addressTxAmountStorage.save(addressTxAmountListCache); @@ -248,7 +204,6 @@ public void handleRemainingTxInputOuputs(ReadyForBalanceAggregationEvent readyFo } finally { txInputOutputListCache.clear(); addressTxAmountListCache.clear(); - clearCache(); } } From a71e257f8e84bf9dcd2ff7a44b6d0204f227564e Mon Sep 17 00:00:00 2001 From: Satya Date: Fri, 8 Mar 2024 09:51:44 +0800 Subject: [PATCH 27/29] chore: Virtual thread --- .../account/storage/impl/AddressTxAmountStorageImpl.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java index 57a5e9a8..bcae8840 100644 --- a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java @@ -1,6 +1,7 @@ package org.cardanofoundation.ledgersync.account.storage.impl; import com.bloxbean.cardano.yaci.store.account.AccountStoreProperties; +import com.bloxbean.cardano.yaci.store.common.executor.ParallelExecutor; import com.bloxbean.cardano.yaci.store.common.util.ListUtil; import jakarta.annotation.PostConstruct; import lombok.RequiredArgsConstructor; @@ -30,6 +31,7 @@ public class AddressTxAmountStorageImpl implements AddressTxAmountStorage { private final DSLContext dsl; private final AccountStoreProperties accountStoreProperties; private final PlatformTransactionManager transactionManager; + private final ParallelExecutor parallelExecutor; private final AggrMapper aggrMapper = AggrMapper.INSTANCE; private TransactionTemplate transactionTemplate; @@ -51,7 +53,7 @@ public void save(List addressTxAmount) { if (accountStoreProperties.isParallelWrite()) { // transactionTemplate.execute(status -> { - ListUtil.partitionAndApplyInParallel(addressTxAmtEntities, accountStoreProperties.getPerThreadBatchSize(), this::doSave); + ListUtil.partitionAndApplyInParallel(addressTxAmtEntities, accountStoreProperties.getPerThreadBatchSize(), this::doSave, parallelExecutor.getVirtualThreadExecutor()); // return null; // }); } else { @@ -98,6 +100,7 @@ private void doSave(List addressTxAmountEntities) { dsl.batch(inserts).execute(); **/ + transactionTemplate.execute(status -> { dsl.batched(c -> { for (var addressTxAmount : addressTxAmountEntities) { c.dsl().insertInto(ADDRESS_TX_AMOUNT) @@ -132,6 +135,8 @@ private void doSave(List addressTxAmountEntities) { .execute(); } }); + return null; + }); } From 1609ec2e7eb4c5488826ac1ef95a6470072433c8 Mon Sep 17 00:00:00 2001 From: Satya Date: Fri, 8 Mar 2024 09:58:57 +0800 Subject: [PATCH 28/29] chore: Virtual thread --- .../processor/AddressTxAmountProcessor.java | 28 +++++++++++++++---- .../impl/AddressTxAmountStorageImpl.java | 2 +- 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java index f0da54dc..3bf6f619 100644 --- a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java @@ -4,6 +4,7 @@ import com.bloxbean.cardano.yaci.store.common.domain.AddressUtxo; import com.bloxbean.cardano.yaci.store.common.domain.Amt; import com.bloxbean.cardano.yaci.store.common.domain.UtxoKey; +import com.bloxbean.cardano.yaci.store.common.executor.ParallelExecutor; import com.bloxbean.cardano.yaci.store.events.EventMetadata; import com.bloxbean.cardano.yaci.store.events.RollbackEvent; import com.bloxbean.cardano.yaci.store.events.internal.ReadyForBalanceAggregationEvent; @@ -24,6 +25,8 @@ import java.math.BigInteger; import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import static org.cardanofoundation.ledgersync.account.util.AddressUtil.getAddress; @@ -38,6 +41,7 @@ public class AddressTxAmountProcessor { private List addressTxAmountListCache = Collections.synchronizedList(new ArrayList<>()); private final PlatformTransactionManager transactionManager; + private final ParallelExecutor parallelExecutor; private TransactionTemplate transactionTemplate; @PostConstruct @@ -193,13 +197,25 @@ public void handleRemainingTxInputOuputs(ReadyForBalanceAggregationEvent readyFo addressTxAmountListCache.addAll(addressTxAmountList); } - long t1 = System.currentTimeMillis(); - if (addressTxAmountListCache.size() > 0) { - addressTxAmountStorage.save(addressTxAmountListCache); - } + var future = CompletableFuture.supplyAsync(() -> { + long t1 = System.currentTimeMillis(); + if (addressTxAmountListCache.size() > 0) { + addressTxAmountStorage.save(addressTxAmountListCache); + } + + long t2 = System.currentTimeMillis(); + log.info("Time taken to save address_tx_amounts records : {}, time: {} ms", addressTxAmountListCache.size(), (t2 - t1)); - long t2 = System.currentTimeMillis(); - log.info("Time taken to save address_tx_amounts records : {}, time: {} ms", addressTxAmountListCache.size(), (t2 - t1)); + return null; + }, parallelExecutor.getVirtualThreadExecutor()); + + try { + future.get(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } } finally { txInputOutputListCache.clear(); diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java index bcae8840..4756c349 100644 --- a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java @@ -53,7 +53,7 @@ public void save(List addressTxAmount) { if (accountStoreProperties.isParallelWrite()) { // transactionTemplate.execute(status -> { - ListUtil.partitionAndApplyInParallel(addressTxAmtEntities, accountStoreProperties.getPerThreadBatchSize(), this::doSave, parallelExecutor.getVirtualThreadExecutor()); + ListUtil.partitionAndApplyInParallel(addressTxAmtEntities, accountStoreProperties.getPerThreadBatchSize(), this::doSave); // return null; // }); } else { From 0ca3045a540e9778bcdd6f10d78ec7d160615818 Mon Sep 17 00:00:00 2001 From: Satya Date: Fri, 8 Mar 2024 17:43:45 +0800 Subject: [PATCH 29/29] chore: Remove extra columns --- .../account/domain/AddressTxAmount.java | 4 - .../processor/AddressTxAmountProcessor.java | 64 ++------ .../GensisBlockAddressTxAmtProcessor.java | 4 - .../impl/AddressTxAmountStorageImpl.java | 139 ++++++------------ .../impl/model/AddressTxAmountEntity.java | 25 ++-- .../resources/db/account/V2_1_1__init.sql | 28 +--- .../src/main/resources/application.yml | 18 ++- 7 files changed, 92 insertions(+), 190 deletions(-) diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/domain/AddressTxAmount.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/domain/AddressTxAmount.java index a5c1ac22..00d89a0c 100644 --- a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/domain/AddressTxAmount.java +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/domain/AddressTxAmount.java @@ -23,11 +23,7 @@ public class AddressTxAmount extends BlockAwareDomain { private String txHash; private Long slot; private BigInteger quantity; - private String policy; - private String assetName; - private String paymentCredential; private String stakeAddress; - private String blockHash; private Integer epoch; } diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java index 3bf6f619..2d7420ca 100644 --- a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/AddressTxAmountProcessor.java @@ -4,13 +4,11 @@ import com.bloxbean.cardano.yaci.store.common.domain.AddressUtxo; import com.bloxbean.cardano.yaci.store.common.domain.Amt; import com.bloxbean.cardano.yaci.store.common.domain.UtxoKey; -import com.bloxbean.cardano.yaci.store.common.executor.ParallelExecutor; import com.bloxbean.cardano.yaci.store.events.EventMetadata; import com.bloxbean.cardano.yaci.store.events.RollbackEvent; import com.bloxbean.cardano.yaci.store.events.internal.ReadyForBalanceAggregationEvent; import com.bloxbean.cardano.yaci.store.utxo.domain.AddressUtxoEvent; import com.bloxbean.cardano.yaci.store.utxo.domain.TxInputOutput; -import jakarta.annotation.PostConstruct; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.cardanofoundation.ledgersync.account.domain.AddressTxAmount; @@ -18,15 +16,10 @@ import org.springframework.context.event.EventListener; import org.springframework.data.util.Pair; import org.springframework.stereotype.Component; -import org.springframework.transaction.PlatformTransactionManager; -import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.annotation.Transactional; -import org.springframework.transaction.support.TransactionTemplate; import java.math.BigInteger; import java.util.*; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import static org.cardanofoundation.ledgersync.account.util.AddressUtil.getAddress; @@ -34,22 +27,14 @@ @RequiredArgsConstructor @Slf4j public class AddressTxAmountProcessor { + public static final int BLOCK_ADDRESS_TX_AMT_THRESHOLD = 100; //Threshold to save address_tx_amounts records for block + private final AddressTxAmountStorage addressTxAmountStorage; private final UtxoClient utxoClient; - private List> txInputOutputListCache = Collections.synchronizedList(new ArrayList<>()); + private List> pendingTxInputOutputListCache = Collections.synchronizedList(new ArrayList<>()); private List addressTxAmountListCache = Collections.synchronizedList(new ArrayList<>()); - private final PlatformTransactionManager transactionManager; - private final ParallelExecutor parallelExecutor; - private TransactionTemplate transactionTemplate; - - @PostConstruct - void init() { - transactionTemplate = new TransactionTemplate(transactionManager); - transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); - } - @EventListener @Transactional public void processAddressUtxoEvent(AddressUtxoEvent addressUtxoEvent) { @@ -70,12 +55,11 @@ public void processAddressUtxoEvent(AddressUtxoEvent addressUtxoEvent) { addressTxAmountList.addAll(txAddressTxAmountEntities); } - if (addressTxAmountList.size() > 100) { + if (addressTxAmountList.size() > BLOCK_ADDRESS_TX_AMT_THRESHOLD) { + if (log.isDebugEnabled()) + log.debug("Saving address_tx_amounts records : {} -- {}", addressTxAmountList.size(), addressUtxoEvent.getEventMetadata().getBlock()); addressTxAmountStorage.save(addressTxAmountList); //Save - return; - } - - if (addressTxAmountList.size() > 0) { + } else if (addressTxAmountList.size() > 0) { addressTxAmountListCache.addAll(addressTxAmountList); } } @@ -102,7 +86,7 @@ private List processAddressAmountForTx(EventMetadata metadata, if (throwExceptionOnFailure) throw new IllegalStateException("Unable to get inputs for all input keys for account balance calculation : " + inputUtxoKeys); else - txInputOutputListCache.add(Pair.of(metadata, txInputOutput)); + pendingTxInputOutputListCache.add(Pair.of(metadata, txInputOutput)); return Collections.emptyList(); } @@ -166,12 +150,8 @@ private List processTxAmount(String txHash, EventMetadata metad .slot(metadata.getSlot()) .quantity(entry.getValue()) .stakeAddress(addressDetails.ownerStakeAddress) - .assetName(assetDetails.assetName) - .policy(assetDetails.policy) - .paymentCredential(addressDetails.ownerPaymentCredential) .epoch(metadata.getEpochNumber()) .blockNumber(metadata.getBlock()) - .blockHash(metadata.getBlockHash()) .blockTime(metadata.getBlockTime()) .build(); }).toList(); @@ -182,7 +162,7 @@ private List processTxAmount(String txHash, EventMetadata metad public void handleRemainingTxInputOuputs(ReadyForBalanceAggregationEvent readyForBalanceAggregationEvent) { try { List addressTxAmountList = new ArrayList<>(); - for (var pair : txInputOutputListCache) { + for (var pair : pendingTxInputOutputListCache) { EventMetadata metadata = pair.getFirst(); TxInputOutput txInputOutput = pair.getSecond(); @@ -197,28 +177,16 @@ public void handleRemainingTxInputOuputs(ReadyForBalanceAggregationEvent readyFo addressTxAmountListCache.addAll(addressTxAmountList); } - var future = CompletableFuture.supplyAsync(() -> { - long t1 = System.currentTimeMillis(); - if (addressTxAmountListCache.size() > 0) { - addressTxAmountStorage.save(addressTxAmountListCache); - } - - long t2 = System.currentTimeMillis(); - log.info("Time taken to save address_tx_amounts records : {}, time: {} ms", addressTxAmountListCache.size(), (t2 - t1)); - - return null; - }, parallelExecutor.getVirtualThreadExecutor()); - - try { - future.get(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } catch (ExecutionException e) { - throw new RuntimeException(e); + long t1 = System.currentTimeMillis(); + if (addressTxAmountListCache.size() > 0) { + addressTxAmountStorage.save(addressTxAmountListCache); } + long t2 = System.currentTimeMillis(); + log.info("Time taken to save additional address_tx_amounts records : {}, time: {} ms", addressTxAmountListCache.size(), (t2 - t1)); + } finally { - txInputOutputListCache.clear(); + pendingTxInputOutputListCache.clear(); addressTxAmountListCache.clear(); } } diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/GensisBlockAddressTxAmtProcessor.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/GensisBlockAddressTxAmtProcessor.java index 1ca05d9b..e9cf050d 100644 --- a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/GensisBlockAddressTxAmtProcessor.java +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/processor/GensisBlockAddressTxAmtProcessor.java @@ -69,12 +69,8 @@ public void handleAddressTxAmtForGenesisBlock(GenesisBlockEvent genesisBlockEven .slot(genesisBlockEvent.getSlot()) .quantity(balance) .stakeAddress(stakeAddress) - .assetName(LOVELACE) - .policy(null) - .paymentCredential(ownerPaymentCredential) .epoch(0) .blockNumber(genesisBlockEvent.getBlock()) - .blockHash(genesisBlockEvent.getBlockHash()) .blockTime(genesisBlockEvent.getBlockTime()) .build(); diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java index 4756c349..61fa6b2c 100644 --- a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/AddressTxAmountStorageImpl.java @@ -1,7 +1,6 @@ package org.cardanofoundation.ledgersync.account.storage.impl; import com.bloxbean.cardano.yaci.store.account.AccountStoreProperties; -import com.bloxbean.cardano.yaci.store.common.executor.ParallelExecutor; import com.bloxbean.cardano.yaci.store.common.util.ListUtil; import jakarta.annotation.PostConstruct; import lombok.RequiredArgsConstructor; @@ -13,12 +12,8 @@ import org.cardanofoundation.ledgersync.account.storage.impl.repository.AddressTxAmountRepository; import org.jooq.DSLContext; import org.springframework.stereotype.Component; -import org.springframework.transaction.PlatformTransactionManager; -import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.annotation.Transactional; -import org.springframework.transaction.support.TransactionTemplate; -import java.time.LocalDateTime; import java.util.List; import static org.cardanofoundation.ledgersync.account.jooq.Tables.ADDRESS_TX_AMOUNT; @@ -30,18 +25,12 @@ public class AddressTxAmountStorageImpl implements AddressTxAmountStorage { private final AddressTxAmountRepository addressTxAmountRepository; private final DSLContext dsl; private final AccountStoreProperties accountStoreProperties; - private final PlatformTransactionManager transactionManager; - private final ParallelExecutor parallelExecutor; private final AggrMapper aggrMapper = AggrMapper.INSTANCE; - private TransactionTemplate transactionTemplate; @PostConstruct public void postConstruct() { this.dsl.settings().setBatchSize(accountStoreProperties.getJooqWriteBatchSize()); - - transactionTemplate = new TransactionTemplate(transactionManager); - transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); } @Override @@ -51,93 +40,63 @@ public void save(List addressTxAmount) { .map(addressTxAmount1 -> aggrMapper.toAddressTxAmountEntity(addressTxAmount1)) .toList(); - if (accountStoreProperties.isParallelWrite()) { -// transactionTemplate.execute(status -> { - ListUtil.partitionAndApplyInParallel(addressTxAmtEntities, accountStoreProperties.getPerThreadBatchSize(), this::doSave); -// return null; -// }); + if (accountStoreProperties.isParallelWrite() + && addressTxAmtEntities.size() > accountStoreProperties.getPerThreadBatchSize()) { + ListUtil.partitionAndApplyInParallel(addressTxAmtEntities, accountStoreProperties.getPerThreadBatchSize(), this::saveBatch); } else { - doSave(addressTxAmtEntities); + saveBatch(addressTxAmtEntities); } } - private void doSave(List addressTxAmountEntities) { - LocalDateTime localDateTime = LocalDateTime.now(); - - /** + private void saveBatch(List addressTxAmountEntities) { var inserts = addressTxAmountEntities.stream() - .map(addressTxAmount -> { - return dsl.insertInto(ADDRESS_TX_AMOUNT) - .set(ADDRESS_TX_AMOUNT.ADDRESS, addressTxAmount.getAddress()) - .set(ADDRESS_TX_AMOUNT.UNIT, addressTxAmount.getUnit()) - .set(ADDRESS_TX_AMOUNT.TX_HASH, addressTxAmount.getTxHash()) - .set(ADDRESS_TX_AMOUNT.SLOT, addressTxAmount.getSlot()) - .set(ADDRESS_TX_AMOUNT.QUANTITY, addressTxAmount.getQuantity()) - .set(ADDRESS_TX_AMOUNT.ADDR_FULL, addressTxAmount.getAddrFull()) - .set(ADDRESS_TX_AMOUNT.POLICY, addressTxAmount.getPolicy()) - .set(ADDRESS_TX_AMOUNT.ASSET_NAME, addressTxAmount.getAssetName()) - .set(ADDRESS_TX_AMOUNT.PAYMENT_CREDENTIAL, addressTxAmount.getPaymentCredential()) - .set(ADDRESS_TX_AMOUNT.STAKE_ADDRESS, addressTxAmount.getStakeAddress()) - .set(ADDRESS_TX_AMOUNT.BLOCK_HASH, addressTxAmount.getBlockHash()) - .set(ADDRESS_TX_AMOUNT.BLOCK, addressTxAmount.getBlockNumber()) - .set(ADDRESS_TX_AMOUNT.BLOCK_TIME, addressTxAmount.getBlockTime()) - .set(ADDRESS_TX_AMOUNT.EPOCH, addressTxAmount.getEpoch()) - .set(ADDRESS_TX_AMOUNT.UPDATE_DATETIME, localDateTime) - .onDuplicateKeyUpdate() - .set(ADDRESS_TX_AMOUNT.SLOT, addressTxAmount.getSlot()) - .set(ADDRESS_TX_AMOUNT.QUANTITY, addressTxAmount.getQuantity()) - .set(ADDRESS_TX_AMOUNT.ADDR_FULL, addressTxAmount.getAddrFull()) - .set(ADDRESS_TX_AMOUNT.POLICY, addressTxAmount.getPolicy()) - .set(ADDRESS_TX_AMOUNT.ASSET_NAME, addressTxAmount.getAssetName()) - .set(ADDRESS_TX_AMOUNT.PAYMENT_CREDENTIAL, addressTxAmount.getPaymentCredential()) - .set(ADDRESS_TX_AMOUNT.STAKE_ADDRESS, addressTxAmount.getStakeAddress()) - .set(ADDRESS_TX_AMOUNT.BLOCK_HASH, addressTxAmount.getBlockHash()) - .set(ADDRESS_TX_AMOUNT.BLOCK, addressTxAmount.getBlockNumber()) - .set(ADDRESS_TX_AMOUNT.BLOCK_TIME, addressTxAmount.getBlockTime()) - .set(ADDRESS_TX_AMOUNT.EPOCH, addressTxAmount.getEpoch()) - .set(ADDRESS_TX_AMOUNT.UPDATE_DATETIME, localDateTime); - }).toList(); + .map(addressTxAmount -> dsl.insertInto(ADDRESS_TX_AMOUNT) + .set(ADDRESS_TX_AMOUNT.ADDRESS, addressTxAmount.getAddress()) + .set(ADDRESS_TX_AMOUNT.UNIT, addressTxAmount.getUnit()) + .set(ADDRESS_TX_AMOUNT.TX_HASH, addressTxAmount.getTxHash()) + .set(ADDRESS_TX_AMOUNT.SLOT, addressTxAmount.getSlot()) + .set(ADDRESS_TX_AMOUNT.QUANTITY, addressTxAmount.getQuantity()) + .set(ADDRESS_TX_AMOUNT.ADDR_FULL, addressTxAmount.getAddrFull()) + .set(ADDRESS_TX_AMOUNT.STAKE_ADDRESS, addressTxAmount.getStakeAddress()) + .set(ADDRESS_TX_AMOUNT.BLOCK, addressTxAmount.getBlockNumber()) + .set(ADDRESS_TX_AMOUNT.BLOCK_TIME, addressTxAmount.getBlockTime()) + .set(ADDRESS_TX_AMOUNT.EPOCH, addressTxAmount.getEpoch()) + .onDuplicateKeyUpdate() + .set(ADDRESS_TX_AMOUNT.SLOT, addressTxAmount.getSlot()) + .set(ADDRESS_TX_AMOUNT.QUANTITY, addressTxAmount.getQuantity()) + .set(ADDRESS_TX_AMOUNT.ADDR_FULL, addressTxAmount.getAddrFull()) + .set(ADDRESS_TX_AMOUNT.STAKE_ADDRESS, addressTxAmount.getStakeAddress()) + .set(ADDRESS_TX_AMOUNT.BLOCK, addressTxAmount.getBlockNumber()) + .set(ADDRESS_TX_AMOUNT.BLOCK_TIME, addressTxAmount.getBlockTime()) + .set(ADDRESS_TX_AMOUNT.EPOCH, addressTxAmount.getEpoch())).toList(); dsl.batch(inserts).execute(); - **/ - transactionTemplate.execute(status -> { - dsl.batched(c -> { - for (var addressTxAmount : addressTxAmountEntities) { - c.dsl().insertInto(ADDRESS_TX_AMOUNT) - .set(ADDRESS_TX_AMOUNT.ADDRESS, addressTxAmount.getAddress()) - .set(ADDRESS_TX_AMOUNT.UNIT, addressTxAmount.getUnit()) - .set(ADDRESS_TX_AMOUNT.TX_HASH, addressTxAmount.getTxHash()) - .set(ADDRESS_TX_AMOUNT.SLOT, addressTxAmount.getSlot()) - .set(ADDRESS_TX_AMOUNT.QUANTITY, addressTxAmount.getQuantity()) - .set(ADDRESS_TX_AMOUNT.ADDR_FULL, addressTxAmount.getAddrFull()) - .set(ADDRESS_TX_AMOUNT.POLICY, addressTxAmount.getPolicy()) - .set(ADDRESS_TX_AMOUNT.ASSET_NAME, addressTxAmount.getAssetName()) - .set(ADDRESS_TX_AMOUNT.PAYMENT_CREDENTIAL, addressTxAmount.getPaymentCredential()) - .set(ADDRESS_TX_AMOUNT.STAKE_ADDRESS, addressTxAmount.getStakeAddress()) - .set(ADDRESS_TX_AMOUNT.BLOCK_HASH, addressTxAmount.getBlockHash()) - .set(ADDRESS_TX_AMOUNT.BLOCK, addressTxAmount.getBlockNumber()) - .set(ADDRESS_TX_AMOUNT.BLOCK_TIME, addressTxAmount.getBlockTime()) - .set(ADDRESS_TX_AMOUNT.EPOCH, addressTxAmount.getEpoch()) - .set(ADDRESS_TX_AMOUNT.UPDATE_DATETIME, localDateTime) - .onDuplicateKeyUpdate() - .set(ADDRESS_TX_AMOUNT.SLOT, addressTxAmount.getSlot()) - .set(ADDRESS_TX_AMOUNT.QUANTITY, addressTxAmount.getQuantity()) - .set(ADDRESS_TX_AMOUNT.ADDR_FULL, addressTxAmount.getAddrFull()) - .set(ADDRESS_TX_AMOUNT.POLICY, addressTxAmount.getPolicy()) - .set(ADDRESS_TX_AMOUNT.ASSET_NAME, addressTxAmount.getAssetName()) - .set(ADDRESS_TX_AMOUNT.PAYMENT_CREDENTIAL, addressTxAmount.getPaymentCredential()) - .set(ADDRESS_TX_AMOUNT.STAKE_ADDRESS, addressTxAmount.getStakeAddress()) - .set(ADDRESS_TX_AMOUNT.BLOCK_HASH, addressTxAmount.getBlockHash()) - .set(ADDRESS_TX_AMOUNT.BLOCK, addressTxAmount.getBlockNumber()) - .set(ADDRESS_TX_AMOUNT.BLOCK_TIME, addressTxAmount.getBlockTime()) - .set(ADDRESS_TX_AMOUNT.EPOCH, addressTxAmount.getEpoch()) - .set(ADDRESS_TX_AMOUNT.UPDATE_DATETIME, localDateTime) - .execute(); - } - }); - return null; + /** + dsl.batched(c -> { + for (var addressTxAmount : addressTxAmountEntities) { + c.dsl().insertInto(ADDRESS_TX_AMOUNT) + .set(ADDRESS_TX_AMOUNT.ADDRESS, addressTxAmount.getAddress()) + .set(ADDRESS_TX_AMOUNT.UNIT, addressTxAmount.getUnit()) + .set(ADDRESS_TX_AMOUNT.TX_HASH, addressTxAmount.getTxHash()) + .set(ADDRESS_TX_AMOUNT.SLOT, addressTxAmount.getSlot()) + .set(ADDRESS_TX_AMOUNT.QUANTITY, addressTxAmount.getQuantity()) + .set(ADDRESS_TX_AMOUNT.ADDR_FULL, addressTxAmount.getAddrFull()) + .set(ADDRESS_TX_AMOUNT.STAKE_ADDRESS, addressTxAmount.getStakeAddress()) + .set(ADDRESS_TX_AMOUNT.BLOCK, addressTxAmount.getBlockNumber()) + .set(ADDRESS_TX_AMOUNT.BLOCK_TIME, addressTxAmount.getBlockTime()) + .set(ADDRESS_TX_AMOUNT.EPOCH, addressTxAmount.getEpoch()) + .onDuplicateKeyUpdate() + .set(ADDRESS_TX_AMOUNT.SLOT, addressTxAmount.getSlot()) + .set(ADDRESS_TX_AMOUNT.QUANTITY, addressTxAmount.getQuantity()) + .set(ADDRESS_TX_AMOUNT.ADDR_FULL, addressTxAmount.getAddrFull()) + .set(ADDRESS_TX_AMOUNT.STAKE_ADDRESS, addressTxAmount.getStakeAddress()) + .set(ADDRESS_TX_AMOUNT.BLOCK, addressTxAmount.getBlockNumber()) + .set(ADDRESS_TX_AMOUNT.BLOCK_TIME, addressTxAmount.getBlockTime()) + .set(ADDRESS_TX_AMOUNT.EPOCH, addressTxAmount.getEpoch()) + .execute(); + } }); - + **/ } @Override diff --git a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/model/AddressTxAmountEntity.java b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/model/AddressTxAmountEntity.java index cdd83420..bff2d895 100644 --- a/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/model/AddressTxAmountEntity.java +++ b/aggregates/account/src/main/java/org/cardanofoundation/ledgersync/account/storage/impl/model/AddressTxAmountEntity.java @@ -1,11 +1,10 @@ package org.cardanofoundation.ledgersync.account.storage.impl.model; -import com.bloxbean.cardano.yaci.store.common.model.BlockAwareEntity; import jakarta.persistence.*; import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; -import lombok.experimental.SuperBuilder; import org.hibernate.annotations.DynamicUpdate; import java.math.BigInteger; @@ -13,12 +12,12 @@ @Data @NoArgsConstructor @AllArgsConstructor -@SuperBuilder +@Builder @Entity @Table(name = "address_tx_amount") @IdClass(AddressTxAmountId.class) @DynamicUpdate -public class AddressTxAmountEntity extends BlockAwareEntity { +public class AddressTxAmountEntity { @Id @Column(name = "address") private String address; @@ -41,21 +40,15 @@ public class AddressTxAmountEntity extends BlockAwareEntity { @Column(name = "addr_full") private String addrFull; - @Column(name = "policy") - private String policy; - - @Column(name = "asset_name") - private String assetName; - - @Column(name = "payment_credential") - private String paymentCredential; - @Column(name = "stake_address") private String stakeAddress; - @Column(name = "block_hash") - private String blockHash; - @Column(name = "epoch") private Integer epoch; + + @Column(name = "block") + private Long blockNumber; + + @Column(name = "block_time") + private Long blockTime; } diff --git a/aggregates/account/src/main/resources/db/account/V2_1_1__init.sql b/aggregates/account/src/main/resources/db/account/V2_1_1__init.sql index b5e4e039..bc42aea3 100644 --- a/aggregates/account/src/main/resources/db/account/V2_1_1__init.sql +++ b/aggregates/account/src/main/resources/db/account/V2_1_1__init.sql @@ -7,36 +7,12 @@ create table address_tx_amount slot bigint, quantity numeric(38) null, addr_full text, - policy varchar(56), - asset_name varchar(255), - payment_credential varchar(56), stake_address varchar(255), - block_hash varchar(64), block bigint, block_time bigint, epoch integer, - update_datetime timestamp, primary key (address, unit, tx_hash) ); --- CREATE INDEX idx_address_tx_amount_slot --- ON address_tx_amount(slot); --- --- -- address_balance_view --- drop view if exists address_balance_view; --- create view address_balance_view as --- select ab.* --- from address_balance ab --- inner join (select address, unit, max(slot) as max_slot --- from address_balance ab2 --- group by address, unit) max_ab --- on ab.address = max_ab.address and ab.unit = max_ab.unit and ab.slot = max_ab.max_slot; --- --- -- stake_address_balance_view --- drop view if exists stake_address_balance_view; --- create view stake_address_balance_view AS --- select sb.* --- from stake_address_balance sb --- inner join (select address, MAX(slot) as max_slot --- from stake_address_balance sb2 --- group by address) max_sb on sb.address = max_sb.address and sb.slot = max_sb.max_slot; +CREATE INDEX idx_address_tx_amount_slot + ON address_tx_amount(slot); diff --git a/aggregation-app/src/main/resources/application.yml b/aggregation-app/src/main/resources/application.yml index c8bff9bd..cad62292 100644 --- a/aggregation-app/src/main/resources/application.yml +++ b/aggregation-app/src/main/resources/application.yml @@ -9,6 +9,16 @@ spring: - classpath:db/store/{vendor} - classpath:db/account out-of-order: true + datasource: + hikari: + maximum-pool-size: 30 + minimum-idle: 5 + jpa: + properties: + hibernate: + jdbc: + batch_size: 100 + order_inserts: true apiPrefix: /api/v1 @@ -19,10 +29,14 @@ logging: store: event-publisher-id: 1000 auto-index-management: true + cardano: + keep-alive-interval: 3000 account: enabled: true balance-aggregation-enabled: true - history-cleanup-enabled: false + history-cleanup-enabled: true + # 3 months + balance-cleanup-slot-count: 7889238 api-enabled: true parallel-write: true per-thread-batch-size: 6000 @@ -34,4 +48,4 @@ store: blocks-batch-size: 100 blocks-partition-size: 10 use-virtual-thread-for-batch-processing: false - use-virtual-thread-for-event-processing: true + use-virtual-thread-for-event-processing: false