Skip to content

Commit

Permalink
feat: consume data by batch
Browse files Browse the repository at this point in the history
  • Loading branch information
Sotatek-HuyLe3a committed Nov 9, 2023
1 parent 5575b05 commit 69a124a
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.cardanofoundation.ledgersync.explorerconsumer.service.impl.block.BlockAggregatorServiceImpl;
import org.cardanofoundation.ledgersync.explorerconsumer.service.impl.block.ByronEbbAggregatorServiceImpl;
import org.cardanofoundation.ledgersync.explorerconsumer.service.impl.block.ByronMainAggregatorServiceImpl;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
Expand All @@ -33,10 +34,15 @@ public class BlockEventListener {

private final BlockRepository blockRepository;
private final MetricCollectorService metricCollectorService;

private final AtomicInteger blockCount = new AtomicInteger(0);
private AtomicLong blockHeight;

@Value("${blocks.batch-size}")
private Integer batchSize;
@Value("${blocks.commitThreshold}")
private Long commitThreshold;

private final AtomicLong lastMessageReceivedTime = new AtomicLong(System.currentTimeMillis());
private AtomicLong blockHeight;
private long lastLog;

@PostConstruct
Expand All @@ -46,7 +52,6 @@ private void initBlockHeight() {
log.info("Block height {}", blockNo);
}


@EventListener
@Transactional
public void handleBlockEvent(BlockEvent blockEvent) {
Expand Down Expand Up @@ -119,6 +124,8 @@ private boolean checkIfBlockExists(EventMetadata metadata) {
private void handleAggregateBlock(EventMetadata eventMetadata, AggregatedBlock aggregatedBlock) {
try {
long currentTime = System.currentTimeMillis();
long lastReceivedTimeElapsed = currentTime - lastMessageReceivedTime.getAndSet(currentTime);

if (currentTime - lastLog >= 500) {//reduce log
log.info("Block number {}, slot_no {}, hash {}",
eventMetadata.getBlock(), eventMetadata.getSlot(), eventMetadata.getBlockHash());
Expand Down Expand Up @@ -156,8 +163,12 @@ private void handleAggregateBlock(EventMetadata eventMetadata, AggregatedBlock a

//AggregatedBlock aggregatedBlock = aggregatorServiceFactory.aggregateBlock(eraBlock);
blockDataService.saveAggregatedBlock(aggregatedBlock);
blockSyncService.startBlockSyncing();
blockCount.set(0);
int currentBlockCount = blockCount.incrementAndGet();
if (currentBlockCount % batchSize == 0 || lastReceivedTimeElapsed >= commitThreshold) {
blockSyncService.startBlockSyncing();
blockCount.set(0);
}

} catch (Exception e) {
e.printStackTrace();
log.error(e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package org.cardanofoundation.ledgersync.explorerconsumer.listeners;

import com.bloxbean.cardano.yaci.store.core.domain.Cursor;
import com.bloxbean.cardano.yaci.store.core.service.CursorService;
import com.bloxbean.cardano.yaci.store.core.service.StartService;
import com.bloxbean.cardano.yaci.store.core.storage.api.CursorStorage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.cardanofoundation.ledgersync.explorerconsumer.repository.BlockRepository;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
@Slf4j
public class LedgerSyncEventListener {
private final StartService startService;
private final CursorService cursorService;
private final CursorStorage cursorStorage;
private final BlockRepository blockRepository;

@Value("${store.sync-auto-start: true}")
private boolean syncAutoStart;
@Value("${store.event-publisher-id: 1}")
private Long eventPublisherId;

@EventListener
public void initialize(ApplicationReadyEvent applicationReadyEvent) {
long slotHeight = blockRepository.getSlotHeight().orElse(0L);

if (slotHeight == 0) {
if (!syncAutoStart) {
startService.start();
}
return;
}

Cursor currentCursor = cursorService.getCursor().orElse(null);
if (currentCursor != null && currentCursor.getSlot() > slotHeight) {
log.info("Delete cursors with slot greater than {}", slotHeight);
cursorStorage.deleteBySlotGreaterThan(eventPublisherId, slotHeight);
}

if (!syncAutoStart) {
startService.start();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public interface BlockRepository extends JpaRepository<Block, Long> {

@Query("SELECT MAX(block.blockNo) FROM Block block")
Optional<Long> getBlockHeight();
@Query("SELECT MAX(block.slotNo) FROM Block block")
Optional<Long> getSlotHeight();

@Query("SELECT MAX(block.id) FROM Block block")
Optional<Long> getBlockIdHeight();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ public void handleBlockSync() {
AggregatedBlock firstBlock = firstAndLastBlock.getFirst();
AggregatedBlock lastBlock = firstAndLastBlock.getSecond();

// log.trace("Commit from block {} to block {} ",
// firstBlock.getBlockNo(),
// lastBlock.getBlockNo());
log.trace("Commit from block {} to block {} ",
firstBlock.getBlockNo(),
lastBlock.getBlockNo());

metricCollectorService.collectCountBlockProcessingMetric(firstBlock, lastBlock);
metricCollectorService.collectCurrentBlockMetric(lastBlock);
Expand Down
2 changes: 1 addition & 1 deletion application/src/main/resources/config/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ logging:
org.cardanofoundation: ${LOG:TRACE}

blocks:
batch-size: ${BLOCKS_BATCH_SIZE:1000}
batch-size: ${BLOCKS_BATCH_SIZE:100}
commitThreshold: ${COMMIT_THRESHOLD:3000}

genesis:
Expand Down
4 changes: 2 additions & 2 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[libraries]
yaci-store-starter="com.bloxbean.cardano:yaci-store-spring-boot-starter:0.0.13-beta2-3e96927-SNAPSHOT"
yaci-store-remote-starter="com.bloxbean.cardano:yaci-store-remote-spring-boot-starter:0.0.13-beta2-3e96927-SNAPSHOT"
yaci-store-starter="com.bloxbean.cardano:yaci-store-spring-boot-starter:0.0.13-beta2-59c24e4-SNAPSHOT"
yaci-store-remote-starter="com.bloxbean.cardano:yaci-store-remote-spring-boot-starter:0.0.13-beta2-59c24e4-SNAPSHOT"

cardano-client-lib="com.bloxbean.cardano:cardano-client-lib:0.5.0"
snakeyaml="org.yaml:snakeyaml:2.0"
Expand Down

0 comments on commit 69a124a

Please sign in to comment.