From 69a124a1895b795ad016faf5123bc5dc315936e6 Mon Sep 17 00:00:00 2001 From: Sotatek-HuyLe3a Date: Sun, 5 Nov 2023 18:23:47 +0700 Subject: [PATCH] feat: consume data by batch --- .../listeners/BlockEventListener.java | 21 ++++++-- .../listeners/LedgerSyncEventListener.java | 50 +++++++++++++++++++ .../repository/BlockRepository.java | 2 + .../impl/block/BlockSyncServiceImpl.java | 6 +-- .../main/resources/config/application.yaml | 2 +- gradle/libs.versions.toml | 4 +- 6 files changed, 74 insertions(+), 11 deletions(-) create mode 100644 application/src/main/java/org/cardanofoundation/ledgersync/explorerconsumer/listeners/LedgerSyncEventListener.java diff --git a/application/src/main/java/org/cardanofoundation/ledgersync/explorerconsumer/listeners/BlockEventListener.java b/application/src/main/java/org/cardanofoundation/ledgersync/explorerconsumer/listeners/BlockEventListener.java index d75ffc0c4..57476cac7 100644 --- a/application/src/main/java/org/cardanofoundation/ledgersync/explorerconsumer/listeners/BlockEventListener.java +++ b/application/src/main/java/org/cardanofoundation/ledgersync/explorerconsumer/listeners/BlockEventListener.java @@ -10,6 +10,7 @@ import org.cardanofoundation.ledgersync.explorerconsumer.service.impl.block.BlockAggregatorServiceImpl; import org.cardanofoundation.ledgersync.explorerconsumer.service.impl.block.ByronEbbAggregatorServiceImpl; import org.cardanofoundation.ledgersync.explorerconsumer.service.impl.block.ByronMainAggregatorServiceImpl; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; @@ -33,10 +34,15 @@ public class BlockEventListener { private final BlockRepository blockRepository; private final MetricCollectorService metricCollectorService; - private final AtomicInteger blockCount = new AtomicInteger(0); - private AtomicLong blockHeight; + @Value("${blocks.batch-size}") + private Integer batchSize; + @Value("${blocks.commitThreshold}") + private Long commitThreshold; + + private final AtomicLong lastMessageReceivedTime = new AtomicLong(System.currentTimeMillis()); + private AtomicLong blockHeight; private long lastLog; @PostConstruct @@ -46,7 +52,6 @@ private void initBlockHeight() { log.info("Block height {}", blockNo); } - @EventListener @Transactional public void handleBlockEvent(BlockEvent blockEvent) { @@ -119,6 +124,8 @@ private boolean checkIfBlockExists(EventMetadata metadata) { private void handleAggregateBlock(EventMetadata eventMetadata, AggregatedBlock aggregatedBlock) { try { long currentTime = System.currentTimeMillis(); + long lastReceivedTimeElapsed = currentTime - lastMessageReceivedTime.getAndSet(currentTime); + if (currentTime - lastLog >= 500) {//reduce log log.info("Block number {}, slot_no {}, hash {}", eventMetadata.getBlock(), eventMetadata.getSlot(), eventMetadata.getBlockHash()); @@ -156,8 +163,12 @@ private void handleAggregateBlock(EventMetadata eventMetadata, AggregatedBlock a //AggregatedBlock aggregatedBlock = aggregatorServiceFactory.aggregateBlock(eraBlock); blockDataService.saveAggregatedBlock(aggregatedBlock); - blockSyncService.startBlockSyncing(); - blockCount.set(0); + int currentBlockCount = blockCount.incrementAndGet(); + if (currentBlockCount % batchSize == 0 || lastReceivedTimeElapsed >= commitThreshold) { + blockSyncService.startBlockSyncing(); + blockCount.set(0); + } + } catch (Exception e) { e.printStackTrace(); log.error(e.getMessage()); diff --git a/application/src/main/java/org/cardanofoundation/ledgersync/explorerconsumer/listeners/LedgerSyncEventListener.java b/application/src/main/java/org/cardanofoundation/ledgersync/explorerconsumer/listeners/LedgerSyncEventListener.java new file mode 100644 index 000000000..7d8f9505d --- /dev/null +++ b/application/src/main/java/org/cardanofoundation/ledgersync/explorerconsumer/listeners/LedgerSyncEventListener.java @@ -0,0 +1,50 @@ +package org.cardanofoundation.ledgersync.explorerconsumer.listeners; + +import com.bloxbean.cardano.yaci.store.core.domain.Cursor; +import com.bloxbean.cardano.yaci.store.core.service.CursorService; +import com.bloxbean.cardano.yaci.store.core.service.StartService; +import com.bloxbean.cardano.yaci.store.core.storage.api.CursorStorage; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.cardanofoundation.ledgersync.explorerconsumer.repository.BlockRepository; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; + +@Component +@RequiredArgsConstructor +@Slf4j +public class LedgerSyncEventListener { + private final StartService startService; + private final CursorService cursorService; + private final CursorStorage cursorStorage; + private final BlockRepository blockRepository; + + @Value("${store.sync-auto-start: true}") + private boolean syncAutoStart; + @Value("${store.event-publisher-id: 1}") + private Long eventPublisherId; + + @EventListener + public void initialize(ApplicationReadyEvent applicationReadyEvent) { + long slotHeight = blockRepository.getSlotHeight().orElse(0L); + + if (slotHeight == 0) { + if (!syncAutoStart) { + startService.start(); + } + return; + } + + Cursor currentCursor = cursorService.getCursor().orElse(null); + if (currentCursor != null && currentCursor.getSlot() > slotHeight) { + log.info("Delete cursors with slot greater than {}", slotHeight); + cursorStorage.deleteBySlotGreaterThan(eventPublisherId, slotHeight); + } + + if (!syncAutoStart) { + startService.start(); + } + } +} diff --git a/application/src/main/java/org/cardanofoundation/ledgersync/explorerconsumer/repository/BlockRepository.java b/application/src/main/java/org/cardanofoundation/ledgersync/explorerconsumer/repository/BlockRepository.java index 8dddda95a..349ed9fe6 100644 --- a/application/src/main/java/org/cardanofoundation/ledgersync/explorerconsumer/repository/BlockRepository.java +++ b/application/src/main/java/org/cardanofoundation/ledgersync/explorerconsumer/repository/BlockRepository.java @@ -20,6 +20,8 @@ public interface BlockRepository extends JpaRepository { @Query("SELECT MAX(block.blockNo) FROM Block block") Optional getBlockHeight(); + @Query("SELECT MAX(block.slotNo) FROM Block block") + Optional getSlotHeight(); @Query("SELECT MAX(block.id) FROM Block block") Optional getBlockIdHeight(); diff --git a/application/src/main/java/org/cardanofoundation/ledgersync/explorerconsumer/service/impl/block/BlockSyncServiceImpl.java b/application/src/main/java/org/cardanofoundation/ledgersync/explorerconsumer/service/impl/block/BlockSyncServiceImpl.java index b0ceac711..b6032a7f1 100644 --- a/application/src/main/java/org/cardanofoundation/ledgersync/explorerconsumer/service/impl/block/BlockSyncServiceImpl.java +++ b/application/src/main/java/org/cardanofoundation/ledgersync/explorerconsumer/service/impl/block/BlockSyncServiceImpl.java @@ -57,9 +57,9 @@ public void handleBlockSync() { AggregatedBlock firstBlock = firstAndLastBlock.getFirst(); AggregatedBlock lastBlock = firstAndLastBlock.getSecond(); -// log.trace("Commit from block {} to block {} ", -// firstBlock.getBlockNo(), -// lastBlock.getBlockNo()); + log.trace("Commit from block {} to block {} ", + firstBlock.getBlockNo(), + lastBlock.getBlockNo()); metricCollectorService.collectCountBlockProcessingMetric(firstBlock, lastBlock); metricCollectorService.collectCurrentBlockMetric(lastBlock); diff --git a/application/src/main/resources/config/application.yaml b/application/src/main/resources/config/application.yaml index ad055216d..dc0a87d65 100755 --- a/application/src/main/resources/config/application.yaml +++ b/application/src/main/resources/config/application.yaml @@ -66,7 +66,7 @@ logging: org.cardanofoundation: ${LOG:TRACE} blocks: - batch-size: ${BLOCKS_BATCH_SIZE:1000} + batch-size: ${BLOCKS_BATCH_SIZE:100} commitThreshold: ${COMMIT_THRESHOLD:3000} genesis: diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 4578c22b8..7bf859cc4 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -1,6 +1,6 @@ [libraries] -yaci-store-starter="com.bloxbean.cardano:yaci-store-spring-boot-starter:0.0.13-beta2-3e96927-SNAPSHOT" -yaci-store-remote-starter="com.bloxbean.cardano:yaci-store-remote-spring-boot-starter:0.0.13-beta2-3e96927-SNAPSHOT" +yaci-store-starter="com.bloxbean.cardano:yaci-store-spring-boot-starter:0.0.13-beta2-59c24e4-SNAPSHOT" +yaci-store-remote-starter="com.bloxbean.cardano:yaci-store-remote-spring-boot-starter:0.0.13-beta2-59c24e4-SNAPSHOT" cardano-client-lib="com.bloxbean.cardano:cardano-client-lib:0.5.0" snakeyaml="org.yaml:snakeyaml:2.0"