Skip to content

Commit

Permalink
Pipe: Introduce restart strategy to control resources' memory only us…
Browse files Browse the repository at this point in the history
…ed by pipe hardlinked files (#14279)
  • Loading branch information
SteveYurongSu authored Dec 2, 2024
1 parent d101d76 commit ba26460
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,19 @@ private Set<PipeMeta> findAllStuckPipes() {
for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
stuckPipes.add(pipeMeta);
}
LOGGER.warn(
"All {} pipe(s) will be restarted because of forced restart policy.", stuckPipes.size());
return stuckPipes;
}

if (3 * PipeDataNodeResourceManager.tsfile().getTotalLinkedButDeletedTsfileResourceRamSize()
>= 2 * PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes()) {
for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
stuckPipes.add(pipeMeta);
}
LOGGER.warn(
"All {} pipe(s) will be restarted because linked tsfiles' resource size exceeds memory limit.",
stuckPipes.size());
return stuckPipes;
}

Expand Down Expand Up @@ -527,7 +540,7 @@ && mayDeletedTsFileSizeReachDangerousThreshold()) {
continue;
}

// Only restart the stream mode pipes for releasing memTables.
// Try to restart the stream mode pipes for releasing memTables.
if (extractors.get(0).isStreamMode()) {
if (extractors.stream().anyMatch(IoTDBDataRegionExtractor::hasConsumedAllHistoricalTsFiles)
&& (mayMemTablePinnedCountReachDangerousThreshold()
Expand All @@ -538,8 +551,7 @@ && mayDeletedTsFileSizeReachDangerousThreshold()) {
pipeMeta.getStaticMeta());
stuckPipes.add(pipeMeta);
} else if (getFloatingMemoryUsageInByte(pipeName)
>= (PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes()
- PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes())
>= PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes()
/ pipeMetaKeeper.getPipeMetaCount()) {
// Extractors of this pipe may have too many insert nodes
LOGGER.warn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,7 @@ private boolean mayInsertNodeMemoryReachDangerousThreshold() {
return 3
* PipeDataNodeAgent.task().getFloatingMemoryUsageInByte(pipeName)
* PipeDataNodeAgent.task().getPipeCount()
>= (PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes()
- PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes())
* 2;
>= 2 * PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,10 @@ public long getUsedMemorySizeInBytes() {
return usedMemorySizeInBytes;
}

public long getFreeMemorySizeInBytes() {
return TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes;
}

public long getTotalMemorySizeInBytes() {
return TOTAL_MEMORY_SIZE_IN_BYTES;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ public long getFileSize() {
return fileSize;
}

public long getTsFileResourceSize() {
return Objects.nonNull(tsFileResource) ? tsFileResource.calculateRamSize() : 0;
}

///////////////////// Reference Count /////////////////////

public int getReferenceCount() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,4 +355,23 @@ public long getTotalLinkedButDeletedTsfileSize() {
return 0;
}
}

public long getTotalLinkedButDeletedTsfileResourceRamSize() {
long totalLinkedButDeletedTsfileResourceRamSize = 0;
try {
for (final Map.Entry<String, PipeTsFileResource> resourceEntry :
hardlinkOrCopiedFileToPipeTsFileResourceMap.entrySet()) {
final PipeTsFileResource pipeTsFileResource = resourceEntry.getValue();
// If the original TsFile is not deleted, the memory of the resource is not counted
// because the memory of the resource is controlled by TsFileResourceManager.
if (pipeTsFileResource.isOriginalTsFileDeleted()) {
totalLinkedButDeletedTsfileResourceRamSize += pipeTsFileResource.getTsFileResourceSize();
}
}
return totalLinkedButDeletedTsfileResourceRamSize;
} catch (final Exception e) {
LOGGER.warn("failed to get total size of linked but deleted TsFiles resource ram size: ", e);
return totalLinkedButDeletedTsfileResourceRamSize;
}
}
}

0 comments on commit ba26460

Please sign in to comment.