Skip to content

Commit

Permalink
setup
Browse files Browse the repository at this point in the history
  • Loading branch information
VGalaxies committed Nov 18, 2024
1 parent 393327b commit afee1ef
Showing 1 changed file with 9 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.TimeWindowStateProgressIndex;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
Expand Down Expand Up @@ -506,7 +507,14 @@ private void flushTsFilesForExtraction(
final long lastFlushedByPipeTime = DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.get(dataRegionId);
if (System.currentTimeMillis() - lastFlushedByPipeTime >= PIPE_MIN_FLUSH_INTERVAL_IN_MS) {
dataRegion.syncCloseAllWorkingTsFileProcessors();
DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.replace(dataRegionId, System.currentTimeMillis());
// Consider the scenario: a consensus pipe comes to the same region, followed by a user pipe
// **immediately**.
// Since a large number of consensus pipes are not created at the same time, resulting in no
// serious waiting for locks, the lastFlushedByPipeTime timestamp is not updated for the
// consensus pipe.
if (!pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.replace(dataRegionId, System.currentTimeMillis());
}
LOGGER.info(
"Pipe {}@{}: finish to flush data region, took {} ms",
pipeName,
Expand Down

0 comments on commit afee1ef

Please sign in to comment.