From ba2646059ea8c6c7fa5fc561bbba00e066fb9b75 Mon Sep 17 00:00:00 2001
From: Steve Yurong Su
Date: Mon, 2 Dec 2024 20:05:19 +0800
Subject: [PATCH] Pipe: Introduce restart strategy to control resources' memory only used by pipe hardlinked files (#14279)

---
 .../agent/task/PipeDataNodeTaskAgent.java     | 18 +++++++++++++++---
 ...PipeRealtimeDataRegionHybridExtractor.java |  4 +---
 .../resource/memory/PipeMemoryManager.java    |  4 ++++
 .../resource/tsfile/PipeTsFileResource.java   |  4 ++++
 .../tsfile/PipeTsFileResourceManager.java     | 19 +++++++++++++++++++
 5 files changed, 43 insertions(+), 6 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index a25029f6a583..518b4b204248 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -497,6 +497,19 @@ private Set<PipeMeta> findAllStuckPipes() {
       for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
         stuckPipes.add(pipeMeta);
       }
+      LOGGER.warn(
+          "All {} pipe(s) will be restarted because of forced restart policy.", stuckPipes.size());
+      return stuckPipes;
+    }
+
+    if (3 * PipeDataNodeResourceManager.tsfile().getTotalLinkedButDeletedTsfileResourceRamSize()
+        >= 2 * PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes()) {
+      for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
+        stuckPipes.add(pipeMeta);
+      }
+      LOGGER.warn(
+          "All {} pipe(s) will be restarted because linked tsfiles' resource size exceeds memory limit.",
+          stuckPipes.size());
       return stuckPipes;
     }
 
@@ -527,7 +540,7 @@ && mayDeletedTsFileSizeReachDangerousThreshold()) {
         continue;
       }
 
-      // Only restart the stream mode pipes for releasing memTables.
+      // Try to restart the stream mode pipes for releasing memTables.
       if (extractors.get(0).isStreamMode()) {
         if (extractors.stream().anyMatch(IoTDBDataRegionExtractor::hasConsumedAllHistoricalTsFiles)
             && (mayMemTablePinnedCountReachDangerousThreshold()
@@ -538,8 +551,7 @@ && mayDeletedTsFileSizeReachDangerousThreshold()) {
               pipeMeta.getStaticMeta());
           stuckPipes.add(pipeMeta);
         } else if (getFloatingMemoryUsageInByte(pipeName)
-            >= (PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes()
-                    - PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes())
+            >= PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes()
                 / pipeMetaKeeper.getPipeMetaCount()) {
           // Extractors of this pipe may have too many insert nodes
           LOGGER.warn(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 95437f162b08..f3e88b03dd36 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -250,9 +250,7 @@ private boolean mayInsertNodeMemoryReachDangerousThreshold() {
     return 3
             * PipeDataNodeAgent.task().getFloatingMemoryUsageInByte(pipeName)
             * PipeDataNodeAgent.task().getPipeCount()
-        >= (PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes()
-                - PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes())
-            * 2;
+        >= 2 * PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes();
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index 8ae6235099c7..2bda0e32a10c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -499,6 +499,10 @@ public long getUsedMemorySizeInBytes() {
     return usedMemorySizeInBytes;
   }
 
+  public long getFreeMemorySizeInBytes() {
+    return TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes;
+  }
+
   public long getTotalMemorySizeInBytes() {
     return TOTAL_MEMORY_SIZE_IN_BYTES;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
index 7bb67e781f2b..1c2a46e9b595 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
@@ -97,6 +97,10 @@ public long getFileSize() {
     return fileSize;
   }
 
+  public long getTsFileResourceSize() {
+    return Objects.nonNull(tsFileResource) ? tsFileResource.calculateRamSize() : 0;
+  }
+
   ///////////////////// Reference Count /////////////////////
 
   public int getReferenceCount() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index e9f58753a042..9607c41a94b7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -355,4 +355,23 @@ public long getTotalLinkedButDeletedTsfileSize() {
       return 0;
     }
   }
+
+  public long getTotalLinkedButDeletedTsfileResourceRamSize() {
+    long totalLinkedButDeletedTsfileResourceRamSize = 0;
+    try {
+      for (final Map.Entry<String, PipeTsFileResource> resourceEntry :
+          hardlinkOrCopiedFileToPipeTsFileResourceMap.entrySet()) {
+        final PipeTsFileResource pipeTsFileResource = resourceEntry.getValue();
+        // If the original TsFile is not deleted, the memory of the resource is not counted
+        // because the memory of the resource is controlled by TsFileResourceManager.
+        if (pipeTsFileResource.isOriginalTsFileDeleted()) {
+          totalLinkedButDeletedTsfileResourceRamSize += pipeTsFileResource.getTsFileResourceSize();
+        }
+      }
+      return totalLinkedButDeletedTsfileResourceRamSize;
+    } catch (final Exception e) {
+      LOGGER.warn("failed to get total size of linked but deleted TsFiles resource ram size: ", e);
+      return totalLinkedButDeletedTsfileResourceRamSize;
+    }
+  }
 }