diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java index ec0a33ee677b..6aa530b28604 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java @@ -19,6 +19,8 @@ package org.apache.iotdb.confignode.procedure.impl.pipe; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta; import org.apache.iotdb.confignode.manager.pipe.metric.PipeProcedureMetrics; import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo; @@ -30,6 +32,9 @@ import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeMetaSyncProcedure; import org.apache.iotdb.confignode.procedure.state.ProcedureLockState; import org.apache.iotdb.confignode.procedure.state.pipe.task.OperatePipeTaskState; +import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema; +import org.apache.iotdb.confignode.service.ConfigNode; +import org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter; import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp; import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.rpc.TSStatusCode; @@ -390,9 +395,8 @@ protected Map pushPipeMetaToDataNodes(ConfigNodeProc throws IOException { final List pipeMetaBinaryList = new ArrayList<>(); for (PipeMeta pipeMeta : pipeTaskInfo.get().getPipeMetaList()) { - pipeMetaBinaryList.add(pipeMeta.serialize()); + pipeMetaBinaryList.add(copyAndFilterOutNonWorkingDataRegionPipeTasks(pipeMeta).serialize()); } - return env.pushAllPipeMetaToDataNodes(pipeMetaBinaryList); } @@ -407,10 +411,9 @@ protected Map pushPipeMetaToDataNodes(ConfigNodeProc public static Map pushPipeMetaToDataNodes( ConfigNodeProcedureEnv env, AtomicReference pipeTaskInfo) throws IOException { final List pipeMetaBinaryList = new ArrayList<>(); - for (PipeMeta pipeMeta : pipeTaskInfo.get().getPipeMetaList()) { - pipeMetaBinaryList.add(pipeMeta.serialize()); + for (final PipeMeta pipeMeta : pipeTaskInfo.get().getPipeMetaList()) { + pipeMetaBinaryList.add(copyAndFilterOutNonWorkingDataRegionPipeTasks(pipeMeta).serialize()); } - return env.pushAllPipeMetaToDataNodes(pipeMetaBinaryList); } @@ -487,7 +490,9 @@ protected void pushPipeMetaToDataNodesIgnoreException(ConfigNodeProcedureEnv env protected Map pushSinglePipeMetaToDataNodes( String pipeName, ConfigNodeProcedureEnv env) throws IOException { return env.pushSinglePipeMetaToDataNodes( - pipeTaskInfo.get().getPipeMetaByPipeName(pipeName).serialize()); + copyAndFilterOutNonWorkingDataRegionPipeTasks( + pipeTaskInfo.get().getPipeMetaByPipeName(pipeName)) + .serialize()); } /** @@ -502,6 +507,68 @@ protected Map dropSinglePipeOnDataNodes( return env.dropSinglePipeOnDataNodes(pipeName); } + public static PipeMeta copyAndFilterOutNonWorkingDataRegionPipeTasks(PipeMeta originalPipeMeta) + throws IOException { + final PipeMeta copiedPipeMeta = originalPipeMeta.deepCopy4TaskAgent(); + + copiedPipeMeta + .getRuntimeMeta() + .getConsensusGroupId2TaskMetaMap() + .entrySet() + .removeIf( + consensusGroupId2TaskMeta -> { + final String database; + try { + database = + ConfigNode.getInstance() + .getConfigManager() + .getPartitionManager() + .getRegionDatabase( + new TConsensusGroupId( + // We assume that the consensus group id is a data region id. + TConsensusGroupType.DataRegion, + consensusGroupId2TaskMeta.getKey())); + if (database == null) { + // If the consensus group id is not a data region id, we keep it. + // If the consensus group id is a data region id, but the database is not found, + // we keep it. + return false; + } + } catch (final Exception ignore) { + // In case of any exception, we keep the consensus group id. + return false; + } + + final boolean isTableModel; + try { + final TDatabaseSchema schema = + ConfigNode.getInstance() + .getConfigManager() + .getClusterSchemaManager() + .getDatabaseSchemaByName(database); + if (schema == null) { + // If the database is not found, we keep it. + return false; + } + isTableModel = schema.isIsTableModel(); + } catch (final Exception ignore) { + // If the database is not found, we keep it. + return false; + } + + try { + return !DataRegionListeningFilter.shouldDatabaseBeListened( + copiedPipeMeta.getStaticMeta().getExtractorParameters(), + isTableModel, + database); + } catch (final Exception e) { + return false; + } + }); + + return copiedPipeMeta; + } + @Override public void serialize(DataOutputStream stream) throws IOException { super.serialize(stream); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java index 2b420136253a..de90951262cf 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java @@ -36,6 +36,8 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicReference; +import static org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2.copyAndFilterOutNonWorkingDataRegionPipeTasks; + public abstract class AbstractOperateSubscriptionAndPipeProcedure extends AbstractOperateSubscriptionProcedure { private static final Logger LOGGER = @@ -135,7 +137,7 @@ protected Map pushMultiPipeMetaToDataNodes( LOGGER.warn("Pipe {} not found in PipeTaskInfo, can not push its meta.", pipeName); continue; } - pipeMetaBinaryList.add(pipeMeta.serialize()); + pipeMetaBinaryList.add(copyAndFilterOutNonWorkingDataRegionPipeTasks(pipeMeta).serialize()); } return env.pushMultiPipeMetaToDataNodes(pipeMetaBinaryList); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/DataRegionListeningFilter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/DataRegionListeningFilter.java index 18de48cfc326..3c0ab3594ad2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/DataRegionListeningFilter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/DataRegionListeningFilter.java @@ -61,6 +61,34 @@ public class DataRegionListeningFilter { } } + public static boolean shouldDatabaseBeListened( + final PipeParameters parameters, final boolean isTableModel, final String databaseRawName) + throws IllegalPathException { + final Pair insertionDeletionListeningOptionPair = + parseInsertionDeletionListeningOptionPair(parameters); + final boolean hasSpecificListeningOption = + insertionDeletionListeningOptionPair.getLeft() + || insertionDeletionListeningOptionPair.getRight(); + if (!hasSpecificListeningOption) { + return false; + } + + if (isTableModel) { + final String databaseTableModel = + databaseRawName.startsWith("root.") ? databaseRawName.substring(5) : databaseRawName; + final TablePattern tablePattern = + TablePattern.parsePipePatternFromSourceParameters(parameters); + return tablePattern.isTableModelDataAllowedToBeCaptured() + && tablePattern.matchesDatabase(databaseTableModel); + } else { + final String databaseTreeModel = + databaseRawName.startsWith("root.") ? databaseRawName : "root." + databaseRawName; + final TreePattern treePattern = TreePattern.parsePipePatternFromSourceParameters(parameters); + return treePattern.isTreeModelDataAllowedToBeCaptured() + && treePattern.mayOverlapWithDb(databaseTreeModel); + } + } + public static boolean shouldDataRegionBeListened( PipeParameters parameters, DataRegionId dataRegionId) throws IllegalPathException { final Pair insertionDeletionListeningOptionPair = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java index c10d2ebefc3e..ae08fa8b0cb7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java @@ -132,8 +132,10 @@ public class PipeHistoricalDataRegionTsFileAndDeletionExtractor private TreePattern treePattern; private TablePattern tablePattern; - private boolean isDbNameCoveredByPattern = false; + private boolean isModelDetected = false; + private boolean isTableModel; + private boolean isDbNameCoveredByPattern = false; private boolean isHistoricalExtractorEnabled = false; private long historicalDataExtractionStartTime = Long.MIN_VALUE; // Event time @@ -153,8 +155,6 @@ public class PipeHistoricalDataRegionTsFileAndDeletionExtractor private volatile boolean hasBeenStarted = false; - private final Map tsfile2IsTableModelMap = new HashMap<>(0); - private Queue pendingQueue; @Override @@ -669,49 +669,34 @@ private boolean mayTsFileResourceOverlappedWithPattern(final TsFileResource reso return deviceSet.stream() .anyMatch( deviceID -> { - if (deviceID instanceof PlainDeviceID - || deviceID.getTableName().startsWith(TREE_MODEL_EVENT_TABLE_NAME_PREFIX) - || deviceID.getTableName().equals(PATH_ROOT)) { - // In case of tree model deviceID - updateIsDbNameCoveredByPattern(resource, false); - if (treePattern.isTreeModelDataAllowedToBeCaptured() - && treePattern.mayOverlapWithDevice(deviceID)) { - tsfile2IsTableModelMap.computeIfAbsent( - resource, (tsFileResource) -> Boolean.FALSE); - return true; - } - } else { - // In case of table model deviceID - updateIsDbNameCoveredByPattern(resource, true); - if (tablePattern.isTableModelDataAllowedToBeCaptured() - // The database name in resource is prefixed with "root." - && tablePattern.matchesDatabase(resource.getDatabaseName().substring(5)) - && tablePattern.matchesTable(deviceID.getTableName())) { - tsfile2IsTableModelMap.computeIfAbsent( - resource, (tsFileResource) -> Boolean.TRUE); - return true; - } + if (!isModelDetected) { + detectModel(resource, deviceID); + isModelDetected = true; } - return false; + + return isTableModel + ? (tablePattern.isTableModelDataAllowedToBeCaptured() + // The database name in resource is prefixed with "root." + && tablePattern.matchesDatabase(resource.getDatabaseName().substring(5)) + && tablePattern.matchesTable(deviceID.getTableName())) + : (treePattern.isTreeModelDataAllowedToBeCaptured() + && treePattern.mayOverlapWithDevice(deviceID)); }); } - private void updateIsDbNameCoveredByPattern( - final TsFileResource resource, final boolean isTableModel) { - if (isModelDetected) { - return; - } + private void detectModel(final TsFileResource resource, final IDeviceID deviceID) { + this.isTableModel = + !(deviceID instanceof PlainDeviceID + || deviceID.getTableName().startsWith(TREE_MODEL_EVENT_TABLE_NAME_PREFIX) + || deviceID.getTableName().equals(PATH_ROOT)); final String databaseName = resource.getDatabaseName(); - if (Objects.nonNull(databaseName)) { - isDbNameCoveredByPattern = - isTableModel - ? tablePattern.isTableModelDataAllowedToBeCaptured() - && tablePattern.coversDb(databaseName.substring(5)) - : treePattern.isTreeModelDataAllowedToBeCaptured() - && treePattern.coversDb(databaseName); - isModelDetected = true; - } + isDbNameCoveredByPattern = + isTableModel + ? tablePattern.isTableModelDataAllowedToBeCaptured() + && tablePattern.coversDb(databaseName.substring(5)) + : treePattern.isTreeModelDataAllowedToBeCaptured() + && treePattern.coversDb(databaseName); } private boolean isTsFileResourceOverlappedWithTimeRange(final TsFileResource resource) { @@ -808,7 +793,7 @@ private Event supplyTerminateEvent() { private Event supplyTsFileEvent(TsFileResource resource) { final PipeTsFileInsertionEvent event = new PipeTsFileInsertionEvent( - tsfile2IsTableModelMap.remove(resource), + isModelDetected ? isTableModel : null, resource.getDatabaseName(), resource, shouldTransferModFile, @@ -909,8 +894,6 @@ public int getPendingQueueSize() { @Override public synchronized void close() { - tsfile2IsTableModelMap.clear(); - if (Objects.nonNull(pendingQueue)) { pendingQueue.forEach( resource -> {