Skip to content

Commit

Permalink
[IOTDB-6325] Support RegionScan for active metadata queries [BE Part …
Browse files Browse the repository at this point in the history
…One]
  • Loading branch information
ycycse authored May 23, 2024
1 parent f1d8fce commit 9a29af7
Show file tree
Hide file tree
Showing 48 changed files with 2,774 additions and 202 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.iotdb.db.queryengine.execution.operator.Operator;
import org.apache.iotdb.db.queryengine.execution.operator.source.DataSourceOperator;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource;

import com.google.common.util.concurrent.SettableFuture;

Expand Down Expand Up @@ -68,21 +68,23 @@ protected boolean init(SettableFuture<?> blockedFuture) {
}

/**
* Init seq file list and unseq file list in {@link QueryDataSource} and set it into each
* Init seq file list and unseq file list in {@link
* org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource} and set it into each
* SourceNode.
*
* @throws QueryProcessException while failed to init query resource, QueryProcessException will
* be thrown
* @throws IllegalStateException if {@link QueryDataSource} is null after initialization,
* IllegalStateException will be thrown
* @throws IllegalStateException if {@link
* org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource} is null after
* initialization, IllegalStateException will be thrown
*/
private void initialize() throws QueryProcessException {
long startTime = System.nanoTime();
try {
List<DataSourceOperator> sourceOperators =
((DataDriverContext) driverContext).getSourceOperators();
if (sourceOperators != null && !sourceOperators.isEmpty()) {
QueryDataSource dataSource = initQueryDataSource();
IQueryDataSource dataSource = initQueryDataSource();
if (dataSource == null) {
// If this driver is being initialized, meanwhile the whole FI was aborted or cancelled
// for some reasons, we may get null QueryDataSource here.
Expand All @@ -92,14 +94,7 @@ private void initialize() throws QueryProcessException {
sourceOperators.forEach(
sourceOperator -> {
// Construct QueryDataSource for source operator
QueryDataSource queryDataSource =
new QueryDataSource(dataSource.getSeqResources(), dataSource.getUnseqResources());

queryDataSource.setSingleDevice(dataSource.isSingleDevice());

queryDataSource.setDataTTL(dataSource.getDataTTL());

sourceOperator.initQueryDataSource(queryDataSource);
sourceOperator.initQueryDataSource(dataSource.clone());
});
}

Expand All @@ -118,12 +113,12 @@ protected void releaseResource() {

/**
* The method is called in mergeLock() when executing query. This method will get all the {@link
* QueryDataSource} needed for this query.
* org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource} needed for this query.
*
* @throws QueryProcessException while failed to init query resource, QueryProcessException will
* be thrown
*/
private QueryDataSource initQueryDataSource() throws QueryProcessException {
private IQueryDataSource initQueryDataSource() throws QueryProcessException {
return ((DataDriverContext) driverContext).getSharedQueryDataSource();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.queryengine.execution.operator.source.DataSourceOperator;
import org.apache.iotdb.db.storageengine.dataregion.IDataRegionForQuery;
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -69,7 +69,7 @@ public IDataRegionForQuery getDataRegion() {
return getFragmentInstanceContext().getDataRegion();
}

public QueryDataSource getSharedQueryDataSource() throws QueryProcessException {
public IQueryDataSource getSharedQueryDataSource() throws QueryProcessException {
return getFragmentInstanceContext().getSharedQueryDataSource();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,22 @@
import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet;
import org.apache.iotdb.db.queryengine.plan.planner.plan.TimePredicate;
import org.apache.iotdb.db.storageengine.dataregion.IDataRegionForQuery;
import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource;
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSourceForRegionScan;
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSourceType;
import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStatisticsResp;

import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.read.filter.basic.Filter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -64,8 +67,11 @@ public class FragmentInstanceContext extends QueryContext {

// it will only be used once, after sharedQueryDataSource being inited, it will be set to null
private List<PartialPath> sourcePaths;
// Used for region scan, relating methods are to be added.
private Map<IDeviceID, Boolean> devicePathsToAligned;

// Shared by all scan operators in this fragment instance to avoid memory problem
private QueryDataSource sharedQueryDataSource;
private IQueryDataSource sharedQueryDataSource;

/** closed tsfile used in this fragment instance. */
private Set<TsFileResource> closedFilePaths;
Expand All @@ -80,6 +86,8 @@ public class FragmentInstanceContext extends QueryContext {
// empty for zero time partitions
private List<Long> timePartitions;

private QueryDataSourceType queryDataSourceType = QueryDataSourceType.SERIES_SCAN;

private final AtomicLong startNanos = new AtomicLong();
private final AtomicLong endNanos = new AtomicLong();

Expand Down Expand Up @@ -153,6 +161,10 @@ public static FragmentInstanceContext createFragmentInstanceContextForCompaction
return new FragmentInstanceContext(queryId);
}

public void setQueryDataSourceType(QueryDataSourceType queryDataSourceType) {
this.queryDataSourceType = queryDataSourceType;
}

@TestOnly
public static FragmentInstanceContext createFragmentInstanceContext(
FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) {
Expand Down Expand Up @@ -374,59 +386,143 @@ public void initQueryDataSource(List<PartialPath> sourcePaths) throws QueryProce
if (sharedQueryDataSource != null) {
closedFilePaths = new HashSet<>();
unClosedFilePaths = new HashSet<>();
addUsedFilesForQuery(sharedQueryDataSource);
sharedQueryDataSource.setSingleDevice(selectedDeviceIdSet.size() == 1);
addUsedFilesForQuery((QueryDataSource) sharedQueryDataSource);
((QueryDataSource) sharedQueryDataSource).setSingleDevice(selectedDeviceIdSet.size() == 1);
}
} finally {
setInitQueryDataSourceCost(System.nanoTime() - startTime);
dataRegion.readUnlock();
}
}

public void initRegionScanQueryDataSource(Map<IDeviceID, Boolean> devicePathToAligned)
throws QueryProcessException {
long startTime = System.nanoTime();
dataRegion.readLock();
try {
this.sharedQueryDataSource =
dataRegion.queryForDeviceRegionScan(
devicePathToAligned,
this,
globalTimeFilter != null ? globalTimeFilter.copy() : null,
timePartitions);

if (sharedQueryDataSource != null) {
closedFilePaths = new HashSet<>();
unClosedFilePaths = new HashSet<>();
addUsedFilesForRegionQuery((QueryDataSourceForRegionScan) sharedQueryDataSource);
}
} finally {
setInitQueryDataSourceCost(System.nanoTime() - startTime);
dataRegion.readUnlock();
}
}

public synchronized QueryDataSource getSharedQueryDataSource() throws QueryProcessException {
public void initRegionScanQueryDataSource(List<PartialPath> pathList)
throws QueryProcessException {
long startTime = System.nanoTime();
dataRegion.readLock();
try {
this.sharedQueryDataSource =
dataRegion.queryForSeriesRegionScan(
pathList,
this,
globalTimeFilter != null ? globalTimeFilter.copy() : null,
timePartitions);

if (sharedQueryDataSource != null) {
closedFilePaths = new HashSet<>();
unClosedFilePaths = new HashSet<>();
addUsedFilesForRegionQuery((QueryDataSourceForRegionScan) sharedQueryDataSource);
}
} finally {
setInitQueryDataSourceCost(System.nanoTime() - startTime);
dataRegion.readUnlock();
}
}

public synchronized IQueryDataSource getSharedQueryDataSource() throws QueryProcessException {
if (sharedQueryDataSource == null) {
initQueryDataSource(sourcePaths);
// friendly for gc
sourcePaths = null;
switch (queryDataSourceType) {
case SERIES_SCAN:
initQueryDataSource(sourcePaths);
// Friendly for gc
sourcePaths = null;
break;
case DEVICE_REGION_SCAN:
initRegionScanQueryDataSource(devicePathsToAligned);
devicePathsToAligned = null;
break;
case TIME_SERIES_REGION_SCAN:
initRegionScanQueryDataSource(sourcePaths);
sourcePaths = null;
break;
default:
throw new QueryProcessException(
"Unsupported query data source type: " + queryDataSourceType);
}
}
return sharedQueryDataSource;
}

/** Lock and check if tsFileResource is deleted */
private boolean processTsFileResource(TsFileResource tsFileResource, boolean isClosed) {
addFilePathToMap(tsFileResource, isClosed);
// this file may be deleted just before we lock it
if (tsFileResource.isDeleted()) {
Set<TsFileResource> pathSet = isClosed ? closedFilePaths : unClosedFilePaths;
// This resource may be removed by other threads of this query.
if (pathSet.remove(tsFileResource)) {
FileReaderManager.getInstance().decreaseFileReaderReference(tsFileResource, isClosed);
}
return true;
} else {
return false;
}
}

/** Add the unique file paths to closeddFilePathsMap and unClosedFilePathsMap. */
private void addUsedFilesForQuery(QueryDataSource dataSource) {

// sequence data
addUsedFilesForQuery(dataSource.getSeqResources());
dataSource
.getSeqResources()
.removeIf(
tsFileResource -> processTsFileResource(tsFileResource, tsFileResource.isClosed()));

// Record statistics of seqFiles
unclosedSeqFileNum = unClosedFilePaths.size();
closedSeqFileNum = closedFilePaths.size();

// unsequence data
addUsedFilesForQuery(dataSource.getUnseqResources());
dataSource
.getUnseqResources()
.removeIf(
tsFileResource -> processTsFileResource(tsFileResource, tsFileResource.isClosed()));

// Record statistics of files of unseqFiles
unclosedUnseqFileNum = unClosedFilePaths.size() - unclosedSeqFileNum;
closedUnseqFileNum = closedFilePaths.size() - closedSeqFileNum;
}

private void addUsedFilesForQuery(List<TsFileResource> resources) {
Iterator<TsFileResource> iterator = resources.iterator();
while (iterator.hasNext()) {
TsFileResource tsFileResource = iterator.next();
boolean isClosed = tsFileResource.isClosed();
addFilePathToMap(tsFileResource, isClosed);

// this file may be deleted just before we lock it
if (tsFileResource.isDeleted()) {
Set<TsFileResource> pathSet = isClosed ? closedFilePaths : unClosedFilePaths;
// This resource may be removed by other threads of this query.
if (pathSet.remove(tsFileResource)) {
FileReaderManager.getInstance().decreaseFileReaderReference(tsFileResource, isClosed);
}
iterator.remove();
}
}
private void addUsedFilesForRegionQuery(QueryDataSourceForRegionScan dataSource) {
dataSource
.getSeqFileScanHandles()
.removeIf(
fileScanHandle ->
processTsFileResource(fileScanHandle.getTsResource(), fileScanHandle.isClosed()));

unclosedSeqFileNum = unClosedFilePaths.size();
closedSeqFileNum = closedFilePaths.size();

dataSource
.getUnseqFileScanHandles()
.removeIf(
fileScanHandle ->
processTsFileResource(fileScanHandle.getTsResource(), fileScanHandle.isClosed()));

unclosedUnseqFileNum = unClosedFilePaths.size() - unclosedSeqFileNum;
closedUnseqFileNum = closedFilePaths.size() - closedSeqFileNum;
}

/**
Expand Down
Loading

0 comments on commit 9a29af7

Please sign in to comment.