Skip to content

Commit

Permalink
Pipe/Subscription: Filter out non-working DR PipeTasks in CN & Reduce…
Browse files Browse the repository at this point in the history
… model judgement cost in PipeHistoricalDataRegionTsFileAndDeletionExtractor (#14059)

Co-authored-by: Steve Yurong Su <rong@apache.org>
  • Loading branch information
luoluoyuyu and SteveYurongSu authored Nov 26, 2024
1 parent 559b58a commit 68daea6
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.iotdb.confignode.procedure.impl.pipe;

import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
import org.apache.iotdb.confignode.manager.pipe.metric.PipeProcedureMetrics;
import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
Expand All @@ -30,6 +32,9 @@
import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeMetaSyncProcedure;
import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
import org.apache.iotdb.confignode.procedure.state.pipe.task.OperatePipeTaskState;
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
import org.apache.iotdb.confignode.service.ConfigNode;
import org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
Expand Down Expand Up @@ -390,9 +395,8 @@ protected Map<Integer, TPushPipeMetaResp> pushPipeMetaToDataNodes(ConfigNodeProc
throws IOException {
final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
for (PipeMeta pipeMeta : pipeTaskInfo.get().getPipeMetaList()) {
pipeMetaBinaryList.add(pipeMeta.serialize());
pipeMetaBinaryList.add(copyAndFilterOutNonWorkingDataRegionPipeTasks(pipeMeta).serialize());
}

return env.pushAllPipeMetaToDataNodes(pipeMetaBinaryList);
}

Expand All @@ -407,10 +411,9 @@ protected Map<Integer, TPushPipeMetaResp> pushPipeMetaToDataNodes(ConfigNodeProc
public static Map<Integer, TPushPipeMetaResp> pushPipeMetaToDataNodes(
ConfigNodeProcedureEnv env, AtomicReference<PipeTaskInfo> pipeTaskInfo) throws IOException {
final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
for (PipeMeta pipeMeta : pipeTaskInfo.get().getPipeMetaList()) {
pipeMetaBinaryList.add(pipeMeta.serialize());
for (final PipeMeta pipeMeta : pipeTaskInfo.get().getPipeMetaList()) {
pipeMetaBinaryList.add(copyAndFilterOutNonWorkingDataRegionPipeTasks(pipeMeta).serialize());
}

return env.pushAllPipeMetaToDataNodes(pipeMetaBinaryList);
}

Expand Down Expand Up @@ -487,7 +490,9 @@ protected void pushPipeMetaToDataNodesIgnoreException(ConfigNodeProcedureEnv env
protected Map<Integer, TPushPipeMetaResp> pushSinglePipeMetaToDataNodes(
String pipeName, ConfigNodeProcedureEnv env) throws IOException {
return env.pushSinglePipeMetaToDataNodes(
pipeTaskInfo.get().getPipeMetaByPipeName(pipeName).serialize());
copyAndFilterOutNonWorkingDataRegionPipeTasks(
pipeTaskInfo.get().getPipeMetaByPipeName(pipeName))
.serialize());
}

/**
Expand All @@ -502,6 +507,68 @@ protected Map<Integer, TPushPipeMetaResp> dropSinglePipeOnDataNodes(
return env.dropSinglePipeOnDataNodes(pipeName);
}

public static PipeMeta copyAndFilterOutNonWorkingDataRegionPipeTasks(PipeMeta originalPipeMeta)
throws IOException {
final PipeMeta copiedPipeMeta = originalPipeMeta.deepCopy4TaskAgent();

copiedPipeMeta
.getRuntimeMeta()
.getConsensusGroupId2TaskMetaMap()
.entrySet()
.removeIf(
consensusGroupId2TaskMeta -> {
final String database;
try {
database =
ConfigNode.getInstance()
.getConfigManager()
.getPartitionManager()
.getRegionDatabase(
new TConsensusGroupId(
// We assume that the consensus group id is a data region id.
TConsensusGroupType.DataRegion,
consensusGroupId2TaskMeta.getKey()));
if (database == null) {
// If the consensus group id is not a data region id, we keep it.
// If the consensus group id is a data region id, but the database is not found,
// we keep it.
return false;
}
} catch (final Exception ignore) {
// In case of any exception, we keep the consensus group id.
return false;
}

final boolean isTableModel;
try {
final TDatabaseSchema schema =
ConfigNode.getInstance()
.getConfigManager()
.getClusterSchemaManager()
.getDatabaseSchemaByName(database);
if (schema == null) {
// If the database is not found, we keep it.
return false;
}
isTableModel = schema.isIsTableModel();
} catch (final Exception ignore) {
// If the database is not found, we keep it.
return false;
}

try {
return !DataRegionListeningFilter.shouldDatabaseBeListened(
copiedPipeMeta.getStaticMeta().getExtractorParameters(),
isTableModel,
database);
} catch (final Exception e) {
return false;
}
});

return copiedPipeMeta;
}

@Override
public void serialize(DataOutputStream stream) throws IOException {
super.serialize(stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2.copyAndFilterOutNonWorkingDataRegionPipeTasks;

public abstract class AbstractOperateSubscriptionAndPipeProcedure
extends AbstractOperateSubscriptionProcedure {
private static final Logger LOGGER =
Expand Down Expand Up @@ -135,7 +137,7 @@ protected Map<Integer, TPushPipeMetaResp> pushMultiPipeMetaToDataNodes(
LOGGER.warn("Pipe {} not found in PipeTaskInfo, can not push its meta.", pipeName);
continue;
}
pipeMetaBinaryList.add(pipeMeta.serialize());
pipeMetaBinaryList.add(copyAndFilterOutNonWorkingDataRegionPipeTasks(pipeMeta).serialize());
}

return env.pushMultiPipeMetaToDataNodes(pipeMetaBinaryList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,34 @@ public class DataRegionListeningFilter {
}
}

public static boolean shouldDatabaseBeListened(
final PipeParameters parameters, final boolean isTableModel, final String databaseRawName)
throws IllegalPathException {
final Pair<Boolean, Boolean> insertionDeletionListeningOptionPair =
parseInsertionDeletionListeningOptionPair(parameters);
final boolean hasSpecificListeningOption =
insertionDeletionListeningOptionPair.getLeft()
|| insertionDeletionListeningOptionPair.getRight();
if (!hasSpecificListeningOption) {
return false;
}

if (isTableModel) {
final String databaseTableModel =
databaseRawName.startsWith("root.") ? databaseRawName.substring(5) : databaseRawName;
final TablePattern tablePattern =
TablePattern.parsePipePatternFromSourceParameters(parameters);
return tablePattern.isTableModelDataAllowedToBeCaptured()
&& tablePattern.matchesDatabase(databaseTableModel);
} else {
final String databaseTreeModel =
databaseRawName.startsWith("root.") ? databaseRawName : "root." + databaseRawName;
final TreePattern treePattern = TreePattern.parsePipePatternFromSourceParameters(parameters);
return treePattern.isTreeModelDataAllowedToBeCaptured()
&& treePattern.mayOverlapWithDb(databaseTreeModel);
}
}

public static boolean shouldDataRegionBeListened(
PipeParameters parameters, DataRegionId dataRegionId) throws IllegalPathException {
final Pair<Boolean, Boolean> insertionDeletionListeningOptionPair =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,10 @@ public class PipeHistoricalDataRegionTsFileAndDeletionExtractor

private TreePattern treePattern;
private TablePattern tablePattern;
private boolean isDbNameCoveredByPattern = false;

private boolean isModelDetected = false;
private boolean isTableModel;
private boolean isDbNameCoveredByPattern = false;

private boolean isHistoricalExtractorEnabled = false;
private long historicalDataExtractionStartTime = Long.MIN_VALUE; // Event time
Expand All @@ -153,8 +155,6 @@ public class PipeHistoricalDataRegionTsFileAndDeletionExtractor

private volatile boolean hasBeenStarted = false;

private final Map<TsFileResource, Boolean> tsfile2IsTableModelMap = new HashMap<>(0);

private Queue<PersistentResource> pendingQueue;

@Override
Expand Down Expand Up @@ -669,49 +669,34 @@ private boolean mayTsFileResourceOverlappedWithPattern(final TsFileResource reso
return deviceSet.stream()
.anyMatch(
deviceID -> {
if (deviceID instanceof PlainDeviceID
|| deviceID.getTableName().startsWith(TREE_MODEL_EVENT_TABLE_NAME_PREFIX)
|| deviceID.getTableName().equals(PATH_ROOT)) {
// In case of tree model deviceID
updateIsDbNameCoveredByPattern(resource, false);
if (treePattern.isTreeModelDataAllowedToBeCaptured()
&& treePattern.mayOverlapWithDevice(deviceID)) {
tsfile2IsTableModelMap.computeIfAbsent(
resource, (tsFileResource) -> Boolean.FALSE);
return true;
}
} else {
// In case of table model deviceID
updateIsDbNameCoveredByPattern(resource, true);
if (tablePattern.isTableModelDataAllowedToBeCaptured()
// The database name in resource is prefixed with "root."
&& tablePattern.matchesDatabase(resource.getDatabaseName().substring(5))
&& tablePattern.matchesTable(deviceID.getTableName())) {
tsfile2IsTableModelMap.computeIfAbsent(
resource, (tsFileResource) -> Boolean.TRUE);
return true;
}
if (!isModelDetected) {
detectModel(resource, deviceID);
isModelDetected = true;
}
return false;

return isTableModel
? (tablePattern.isTableModelDataAllowedToBeCaptured()
// The database name in resource is prefixed with "root."
&& tablePattern.matchesDatabase(resource.getDatabaseName().substring(5))
&& tablePattern.matchesTable(deviceID.getTableName()))
: (treePattern.isTreeModelDataAllowedToBeCaptured()
&& treePattern.mayOverlapWithDevice(deviceID));
});
}

private void updateIsDbNameCoveredByPattern(
final TsFileResource resource, final boolean isTableModel) {
if (isModelDetected) {
return;
}
private void detectModel(final TsFileResource resource, final IDeviceID deviceID) {
this.isTableModel =
!(deviceID instanceof PlainDeviceID
|| deviceID.getTableName().startsWith(TREE_MODEL_EVENT_TABLE_NAME_PREFIX)
|| deviceID.getTableName().equals(PATH_ROOT));

final String databaseName = resource.getDatabaseName();
if (Objects.nonNull(databaseName)) {
isDbNameCoveredByPattern =
isTableModel
? tablePattern.isTableModelDataAllowedToBeCaptured()
&& tablePattern.coversDb(databaseName.substring(5))
: treePattern.isTreeModelDataAllowedToBeCaptured()
&& treePattern.coversDb(databaseName);
isModelDetected = true;
}
isDbNameCoveredByPattern =
isTableModel
? tablePattern.isTableModelDataAllowedToBeCaptured()
&& tablePattern.coversDb(databaseName.substring(5))
: treePattern.isTreeModelDataAllowedToBeCaptured()
&& treePattern.coversDb(databaseName);
}

private boolean isTsFileResourceOverlappedWithTimeRange(final TsFileResource resource) {
Expand Down Expand Up @@ -808,7 +793,7 @@ private Event supplyTerminateEvent() {
private Event supplyTsFileEvent(TsFileResource resource) {
final PipeTsFileInsertionEvent event =
new PipeTsFileInsertionEvent(
tsfile2IsTableModelMap.remove(resource),
isModelDetected ? isTableModel : null,
resource.getDatabaseName(),
resource,
shouldTransferModFile,
Expand Down Expand Up @@ -909,8 +894,6 @@ public int getPendingQueueSize() {

@Override
public synchronized void close() {
tsfile2IsTableModelMap.clear();

if (Objects.nonNull(pendingQueue)) {
pendingQueue.forEach(
resource -> {
Expand Down

0 comments on commit 68daea6

Please sign in to comment.