From 559b58a2183f4bc0b9079573214a3b44e8141292 Mon Sep 17 00:00:00 2001 From: shuwenwei <55970239+shuwenwei@users.noreply.github.com> Date: Tue, 26 Nov 2024 10:11:39 +0800 Subject: [PATCH] Enhance repair data file scan util (#14167) * enhance repair data file scan util * add ut * add log * remove duplicate code --- ...pactionStatisticsCheckFailedException.java | 87 +++++ .../RepairUnsortedFileCompactionTask.java | 4 +- .../repair/RepairDataFileScanUtil.java | 338 +++++++++++++----- .../repair/RepairTimePartitionScanTask.java | 4 +- .../repair/RepairDataFileScanUtilTest.java | 128 ++++++- 5 files changed, 462 insertions(+), 99 deletions(-) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/CompactionStatisticsCheckFailedException.java diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/CompactionStatisticsCheckFailedException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/CompactionStatisticsCheckFailedException.java new file mode 100644 index 000000000000..9fe58c760424 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/CompactionStatisticsCheckFailedException.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception; + +import org.apache.tsfile.file.header.PageHeader; +import org.apache.tsfile.file.metadata.ChunkMetadata; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.TimeseriesMetadata; +import org.apache.tsfile.read.common.TimeRange; + +public class CompactionStatisticsCheckFailedException extends RuntimeException { + + public CompactionStatisticsCheckFailedException(String msg) { + super(msg); + } + + public CompactionStatisticsCheckFailedException( + IDeviceID deviceID, TimeRange deviceTimeRange, TimeRange actualDeviceTimeRange) { + super( + getExceptionMsg( + deviceID, + "The time range of current device is " + + deviceTimeRange + + ", which should equals actual device time range " + + actualDeviceTimeRange)); + } + + public CompactionStatisticsCheckFailedException( + IDeviceID deviceID, TimeseriesMetadata timeseriesMetadata, TimeRange actualTimeRange) { + super( + getExceptionMsg( + deviceID, + "Current timeseriesMetadata is " + + timeseriesMetadata + + ", which should equals actual time range " + + actualTimeRange)); + } + + public CompactionStatisticsCheckFailedException( + IDeviceID deviceID, ChunkMetadata chunkMetadata, TimeRange actualChunkTimeRange) { + super( + getExceptionMsg( + deviceID, + "Current chunkMetadata is " + + chunkMetadata + + ", which should equals actual chunk time range " + + actualChunkTimeRange)); + } + + public CompactionStatisticsCheckFailedException( + IDeviceID deviceID, PageHeader pageHeader, TimeRange pageDataTimeRange) { + super( + getExceptionMsg( + deviceID, + "Current page is " + + pageHeader + + ", which should contains actual page data time range " + + pageDataTimeRange)); + } + + private static String getExceptionMsg(IDeviceID deviceID, String detail) { + return "The device(" + deviceID + ")'s time range verification failed. " + detail; + } + + @Override + @SuppressWarnings("java:S3551") + public Throwable fillInStackTrace() { + return this; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java index 48df5c4e295f..3f76b6a538fd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java @@ -198,12 +198,12 @@ private void calculateRepairMethod() { return; } RepairDataFileScanUtil repairDataFileScanUtil = new RepairDataFileScanUtil(sourceFile, true); - repairDataFileScanUtil.scanTsFile(); + repairDataFileScanUtil.scanTsFile(true); if (repairDataFileScanUtil.isBrokenFile()) { sourceFile.setTsFileRepairStatus(TsFileRepairStatus.CAN_NOT_REPAIR); return; } - if (repairDataFileScanUtil.hasUnsortedData()) { + if (repairDataFileScanUtil.hasUnsortedDataOrWrongStatistics()) { sourceFile.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE); return; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java index c11a7468a712..cffe49793163 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.repair; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionLastTimeCheckFailedException; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionStatisticsCheckFailedException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.reader.CompactionChunkReader; import org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileReader; @@ -38,15 +39,16 @@ import org.apache.tsfile.file.MetaMarker; import org.apache.tsfile.file.header.ChunkHeader; import org.apache.tsfile.file.header.PageHeader; -import org.apache.tsfile.file.metadata.AlignedChunkMetadata; import org.apache.tsfile.file.metadata.ChunkMetadata; import org.apache.tsfile.file.metadata.IChunkMetadata; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.MetadataIndexNode; +import org.apache.tsfile.file.metadata.TimeseriesMetadata; import org.apache.tsfile.file.metadata.enums.TSEncoding; import org.apache.tsfile.read.TsFileDeviceIterator; import org.apache.tsfile.read.TsFileSequenceReader; import org.apache.tsfile.read.common.Chunk; +import org.apache.tsfile.read.common.TimeRange; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.ReadWriteForEncodingUtils; import org.slf4j.Logger; @@ -56,8 +58,9 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -67,7 +70,8 @@ public class RepairDataFileScanUtil { private static final Logger logger = LoggerFactory.getLogger(RepairDataFileScanUtil.class); private final TsFileResource resource; - private boolean hasUnsortedData; + private ArrayDeviceTimeIndex timeIndex; + private boolean hasUnsortedDataOrWrongStatistics; private boolean isBrokenFile; private long previousTime; private boolean printLog; @@ -78,13 +82,27 @@ public RepairDataFileScanUtil(TsFileResource resource) { public RepairDataFileScanUtil(TsFileResource resource, boolean printLog) { this.resource = resource; - this.hasUnsortedData = false; + this.hasUnsortedDataOrWrongStatistics = false; this.previousTime = Long.MIN_VALUE; this.printLog = printLog; } public void scanTsFile() { + scanTsFile(false); + } + + public void scanTsFile(boolean checkTsFileResource) { File tsfile = resource.getTsFile(); + try { + timeIndex = checkTsFileResource ? getDeviceTimeIndex(resource) : null; + } catch (IOException e) { + logger.warn( + "Meet error when read tsfile resource file {}, it may be repaired after reboot", + tsfile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX, + e); + isBrokenFile = true; + return; + } try (TsFileSequenceReader reader = new CompactionTsFileReader( tsfile.getPath(), @@ -92,26 +110,53 @@ public void scanTsFile() { ? CompactionType.INNER_SEQ_COMPACTION : CompactionType.INNER_UNSEQ_COMPACTION)) { TsFileDeviceIterator deviceIterator = reader.getAllDevicesIteratorWithIsAligned(); + Set deviceIdsInTimeIndex = + checkTsFileResource ? new HashSet<>(timeIndex.getDevices()) : Collections.emptySet(); while (deviceIterator.hasNext()) { Pair deviceIsAlignedPair = deviceIterator.next(); IDeviceID device = deviceIsAlignedPair.getLeft(); + if (checkTsFileResource) { + if (!deviceIdsInTimeIndex.contains(device)) { + throw new CompactionStatisticsCheckFailedException( + device + " does not exist in the resource file"); + } + deviceIdsInTimeIndex.remove(device); + } MetadataIndexNode metadataIndexNode = deviceIterator.getFirstMeasurementNodeOfCurrentDevice(); + TimeRange deviceTimeRangeInResource = + checkTsFileResource + ? new TimeRange(timeIndex.getStartTime(device), timeIndex.getEndTime(device)) + : null; boolean isAligned = deviceIsAlignedPair.getRight(); if (isAligned) { - checkAlignedDeviceSeries(reader, device, metadataIndexNode); + checkAlignedDeviceSeries( + reader, device, metadataIndexNode, deviceTimeRangeInResource, checkTsFileResource); } else { - checkNonAlignedDeviceSeries(reader, device, metadataIndexNode); + checkNonAlignedDeviceSeries( + reader, device, metadataIndexNode, deviceTimeRangeInResource, checkTsFileResource); } } + if (!deviceIdsInTimeIndex.isEmpty()) { + throw new CompactionStatisticsCheckFailedException( + "These devices (" + deviceIdsInTimeIndex + ") do not exist in the tsfile"); + } } catch (CompactionLastTimeCheckFailedException lastTimeCheckFailedException) { - this.hasUnsortedData = true; + this.hasUnsortedDataOrWrongStatistics = true; if (printLog) { logger.error( "File {} has unsorted data: ", resource.getTsFile().getPath(), lastTimeCheckFailedException); } + } catch (CompactionStatisticsCheckFailedException compactionStatisticsCheckFailedException) { + this.hasUnsortedDataOrWrongStatistics = true; + if (printLog) { + logger.error( + "File {} has wrong time statistics: ", + resource.getTsFile().getPath(), + compactionStatisticsCheckFailedException); + } } catch (Exception e) { // ignored the exception caused by thread interrupt if (Thread.currentThread().isInterrupted()) { @@ -127,102 +172,188 @@ public void scanTsFile() { } private void checkAlignedDeviceSeries( - TsFileSequenceReader reader, IDeviceID device, MetadataIndexNode metadataIndexNode) + TsFileSequenceReader reader, + IDeviceID device, + MetadataIndexNode metadataIndexNode, + TimeRange deviceTimeRangeInResource, + boolean checkTsFileResource) throws IOException { - List chunkMetadataList = - reader.getAlignedChunkMetadataByMetadataIndexNode(device, metadataIndexNode, false); - for (AlignedChunkMetadata alignedChunkMetadata : chunkMetadataList) { - IChunkMetadata timeChunkMetadata = alignedChunkMetadata.getTimeChunkMetadata(); - Chunk timeChunk = reader.readMemChunk((ChunkMetadata) timeChunkMetadata); + List timeColumnTimeseriesMetadata = new ArrayList<>(1); + reader.readITimeseriesMetadata(timeColumnTimeseriesMetadata, metadataIndexNode, ""); + TimeseriesMetadata timeseriesMetadata = timeColumnTimeseriesMetadata.get(0); - CompactionChunkReader chunkReader = new CompactionChunkReader(timeChunk); - ByteBuffer chunkDataBuffer = timeChunk.getData(); - ChunkHeader chunkHeader = timeChunk.getHeader(); - while (chunkDataBuffer.hasRemaining()) { - // deserialize a PageHeader from chunkDataBuffer - PageHeader pageHeader = null; - if (((byte) (chunkHeader.getChunkType() & 0x3F)) == MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER) { - pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, timeChunk.getChunkStatistic()); - } else { - pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, chunkHeader.getDataType()); - } - ByteBuffer pageData = chunkReader.readPageDataWithoutUncompressing(pageHeader); - IDecryptor decryptor = IDecryptor.getDecryptor(timeChunk.getEncryptParam()); - ByteBuffer uncompressedPageData = - decryptAndUncompressPageData( - pageHeader, - IUnCompressor.getUnCompressor(chunkHeader.getCompressionType()), - pageData, - decryptor); - Decoder decoder = - Decoder.getDecoderByType(chunkHeader.getEncodingType(), chunkHeader.getDataType()); - while (decoder.hasNext(uncompressedPageData)) { - long currentTime = decoder.readLong(uncompressedPageData); - checkPreviousTimeAndUpdate(device, currentTime); - } - } + // check device time range + TimeRange timeseriesTimeRange = + new TimeRange( + timeseriesMetadata.getStatistics().getStartTime(), + timeseriesMetadata.getStatistics().getEndTime()); + if (checkTsFileResource && !timeseriesTimeRange.equals(deviceTimeRangeInResource)) { + throw new CompactionStatisticsCheckFailedException( + device, deviceTimeRangeInResource, timeseriesTimeRange); + } + + long actualTimeseriesStartTime = Long.MAX_VALUE; + long actualTimeseriesEndTime = Long.MIN_VALUE; + List timeChunkMetadataList = + reader.readChunkMetaDataList(timeColumnTimeseriesMetadata.get(0)); + for (ChunkMetadata timeChunkMetadata : timeChunkMetadataList) { + actualTimeseriesStartTime = + Math.min(actualTimeseriesStartTime, timeChunkMetadata.getStartTime()); + actualTimeseriesEndTime = Math.max(actualTimeseriesEndTime, timeChunkMetadata.getEndTime()); + checkTimeChunkInAlignedSeries(reader, device, timeChunkMetadata); } + + // reset previousTime previousTime = Long.MIN_VALUE; + + // check timeseries time range + if (actualTimeseriesStartTime > actualTimeseriesEndTime) { + return; + } + TimeRange actualTimeseriesTimeRange = + new TimeRange(actualTimeseriesStartTime, actualTimeseriesEndTime); + if (!actualTimeseriesTimeRange.equals(timeseriesTimeRange)) { + throw new CompactionStatisticsCheckFailedException( + device, timeseriesMetadata, actualTimeseriesTimeRange); + } } - private void checkNonAlignedDeviceSeries( - TsFileSequenceReader reader, IDeviceID device, MetadataIndexNode metadataIndexNode) + private void checkTimeChunkInAlignedSeries( + TsFileSequenceReader reader, IDeviceID device, ChunkMetadata timeChunkMetadata) throws IOException { - Iterator>> measurementChunkMetadataListMapIterator = - reader.getMeasurementChunkMetadataListMapIterator(metadataIndexNode); - while (measurementChunkMetadataListMapIterator.hasNext()) { - Map> measurementChunkMetadataListMap = - measurementChunkMetadataListMapIterator.next(); - for (Map.Entry> measurementChunkMetadataListEntry : - measurementChunkMetadataListMap.entrySet()) { - List chunkMetadataList = measurementChunkMetadataListEntry.getValue(); - checkSingleNonAlignedSeries( - reader, device, measurementChunkMetadataListEntry.getKey(), chunkMetadataList); - previousTime = Long.MIN_VALUE; + Chunk timeChunk = reader.readMemChunk(timeChunkMetadata); + + CompactionChunkReader chunkReader = new CompactionChunkReader(timeChunk); + ByteBuffer chunkDataBuffer = timeChunk.getData(); + ChunkHeader chunkHeader = timeChunk.getHeader(); + long actualChunkStartTime = Long.MAX_VALUE; + long actualChunkEndTime = Long.MIN_VALUE; + while (chunkDataBuffer.hasRemaining()) { + // deserialize a PageHeader from chunkDataBuffer + PageHeader pageHeader = null; + if (((byte) (chunkHeader.getChunkType() & 0x3F)) == MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER) { + pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, timeChunk.getChunkStatistic()); + } else { + pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, chunkHeader.getDataType()); } + actualChunkStartTime = Math.min(actualChunkStartTime, pageHeader.getStartTime()); + actualChunkEndTime = Math.max(actualChunkEndTime, pageHeader.getEndTime()); + ByteBuffer pageData = chunkReader.readPageDataWithoutUncompressing(pageHeader); + IDecryptor decryptor = IDecryptor.getDecryptor(timeChunk.getEncryptParam()); + ByteBuffer uncompressedPageData = + decryptAndUncompressPageData( + pageHeader, + IUnCompressor.getUnCompressor(chunkHeader.getCompressionType()), + pageData, + decryptor); + validateTimeData(device, uncompressedPageData, pageHeader); + } + if (actualChunkStartTime > actualChunkEndTime) { + return; + } + TimeRange actualChunkTimeRange = new TimeRange(actualChunkStartTime, actualChunkEndTime); + if (!actualChunkTimeRange.equals( + new TimeRange(timeChunkMetadata.getStartTime(), timeChunkMetadata.getEndTime()))) { + throw new CompactionStatisticsCheckFailedException( + device, timeChunkMetadata, actualChunkTimeRange); } } - private void checkSingleNonAlignedSeries( + private void checkNonAlignedDeviceSeries( TsFileSequenceReader reader, - IDeviceID deviceID, - String measurementId, - List chunkMetadataList) + IDeviceID device, + MetadataIndexNode metadataIndexNode, + TimeRange deviceTimeRangeInResource, + boolean checkTsFileResource) throws IOException { - for (ChunkMetadata chunkMetadata : chunkMetadataList) { - if (chunkMetadata == null || chunkMetadata.getStatistics().getCount() == 0) { - continue; - } - Chunk chunk = reader.readMemChunk(chunkMetadata); - ChunkHeader chunkHeader = chunk.getHeader(); - CompactionChunkReader chunkReader = new CompactionChunkReader(chunk); - ByteBuffer chunkDataBuffer = chunk.getData(); - while (chunkDataBuffer.hasRemaining()) { - // deserialize a PageHeader from chunkDataBuffer - PageHeader pageHeader = null; - if (((byte) (chunkHeader.getChunkType() & 0x3F)) == MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER) { - pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, chunk.getChunkStatistic()); - } else { - pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, chunkHeader.getDataType()); - } - ByteBuffer pageData = chunkReader.readPageDataWithoutUncompressing(pageHeader); - IDecryptor decryptor = IDecryptor.getDecryptor(chunk.getEncryptParam()); - ByteBuffer uncompressedPageData = - decryptAndUncompressPageData( - pageHeader, - IUnCompressor.getUnCompressor(chunkHeader.getCompressionType()), - pageData, - decryptor); - ByteBuffer timeBuffer = getTimeBufferFromNonAlignedPage(uncompressedPageData); - Decoder timeDecoder = - Decoder.getDecoderByType( - TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()), - TSDataType.INT64); - while (timeDecoder.hasNext(timeBuffer)) { - long currentTime = timeDecoder.readLong(timeBuffer); - checkPreviousTimeAndUpdate(deviceID, measurementId, currentTime); - } + List timeseriesMetadataList = new ArrayList<>(); + reader.getDeviceTimeseriesMetadata( + timeseriesMetadataList, metadataIndexNode, Collections.emptySet(), true); + long actualDeviceStartTime = Long.MAX_VALUE; + long actualDeviceEndTime = Long.MIN_VALUE; + for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) { + actualDeviceStartTime = + Math.min(actualDeviceStartTime, timeseriesMetadata.getStatistics().getStartTime()); + actualDeviceEndTime = + Math.max(actualDeviceStartTime, timeseriesMetadata.getStatistics().getEndTime()); + checkSingleNonAlignedSeries(reader, device, timeseriesMetadata); + previousTime = Long.MIN_VALUE; + } + + if (!checkTsFileResource || actualDeviceStartTime > actualDeviceEndTime) { + return; + } + TimeRange actualDeviceTimeRange = new TimeRange(actualDeviceStartTime, actualDeviceEndTime); + if (!actualDeviceTimeRange.equals(deviceTimeRangeInResource)) { + throw new CompactionStatisticsCheckFailedException( + device, deviceTimeRangeInResource, actualDeviceTimeRange); + } + } + + private void checkSingleNonAlignedSeries( + TsFileSequenceReader reader, IDeviceID deviceID, TimeseriesMetadata timeseriesMetadata) + throws IOException { + TimeRange timeseriesTimeRange = + new TimeRange( + timeseriesMetadata.getStatistics().getStartTime(), + timeseriesMetadata.getStatistics().getEndTime()); + long actualTimeseriesStartTime = Long.MAX_VALUE; + long actualTimeseriesEndTime = Long.MIN_VALUE; + for (IChunkMetadata iChunkMetadata : timeseriesMetadata.getChunkMetadataList()) { + ChunkMetadata chunkMetadata = (ChunkMetadata) iChunkMetadata; + actualTimeseriesStartTime = Math.min(actualTimeseriesStartTime, chunkMetadata.getStartTime()); + actualTimeseriesEndTime = Math.max(actualTimeseriesEndTime, chunkMetadata.getEndTime()); + checkChunkOfNonAlignedSeries(reader, deviceID, chunkMetadata); + } + if (actualTimeseriesStartTime > actualTimeseriesEndTime) { + return; + } + TimeRange actualTimeseriesTimeRange = + new TimeRange(actualTimeseriesStartTime, actualTimeseriesEndTime); + if (!actualTimeseriesTimeRange.equals(timeseriesTimeRange)) { + throw new CompactionStatisticsCheckFailedException( + deviceID, timeseriesMetadata, actualTimeseriesTimeRange); + } + } + + private void checkChunkOfNonAlignedSeries( + TsFileSequenceReader reader, IDeviceID deviceID, ChunkMetadata chunkMetadata) + throws IOException { + Chunk chunk = reader.readMemChunk(chunkMetadata); + ChunkHeader chunkHeader = chunk.getHeader(); + CompactionChunkReader chunkReader = new CompactionChunkReader(chunk); + ByteBuffer chunkDataBuffer = chunk.getData(); + long actualChunkStartTime = Long.MAX_VALUE; + long actualChunkEndTime = Long.MIN_VALUE; + while (chunkDataBuffer.hasRemaining()) { + // deserialize a PageHeader from chunkDataBuffer + PageHeader pageHeader = null; + if (((byte) (chunkHeader.getChunkType() & 0x3F)) == MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER) { + pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, chunk.getChunkStatistic()); + } else { + pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, chunkHeader.getDataType()); } + actualChunkStartTime = Math.min(actualChunkStartTime, pageHeader.getStartTime()); + actualChunkEndTime = Math.max(actualChunkEndTime, pageHeader.getEndTime()); + ByteBuffer pageData = chunkReader.readPageDataWithoutUncompressing(pageHeader); + IDecryptor decryptor = IDecryptor.getDecryptor(chunk.getEncryptParam()); + ByteBuffer uncompressedPageData = + decryptAndUncompressPageData( + pageHeader, + IUnCompressor.getUnCompressor(chunkHeader.getCompressionType()), + pageData, + decryptor); + ByteBuffer timeBuffer = getTimeBufferFromNonAlignedPage(uncompressedPageData); + validateTimeData(deviceID, timeBuffer, pageHeader); + } + if (actualChunkStartTime > actualChunkEndTime) { + return; + } + TimeRange actualChunkTimeRange = new TimeRange(actualChunkStartTime, actualChunkEndTime); + if (!actualChunkTimeRange.equals( + new TimeRange(chunkMetadata.getStartTime(), chunkMetadata.getEndTime()))) { + throw new CompactionStatisticsCheckFailedException( + deviceID, chunkMetadata, actualChunkTimeRange); } } @@ -234,6 +365,31 @@ private ByteBuffer getTimeBufferFromNonAlignedPage(ByteBuffer uncompressedPageDa return timeBuffer; } + private void validateTimeData( + IDeviceID device, ByteBuffer uncompressedTimeData, PageHeader pageHeader) throws IOException { + Decoder decoder = + Decoder.getDecoderByType( + TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()), + TSDataType.INT64); + TimeRange pageHeaderTimeRange = + new TimeRange(pageHeader.getStartTime(), pageHeader.getEndTime()); + long actualStartTime = Long.MAX_VALUE; + long actualEndTime = Long.MIN_VALUE; + while (decoder.hasNext(uncompressedTimeData)) { + long currentTime = decoder.readLong(uncompressedTimeData); + actualStartTime = Math.min(actualStartTime, currentTime); + actualEndTime = Math.max(actualEndTime, currentTime); + checkPreviousTimeAndUpdate(device, currentTime); + } + if (actualStartTime > actualEndTime) { + return; + } + TimeRange actualPageTimeRange = new TimeRange(actualStartTime, actualEndTime); + if (!actualPageTimeRange.equals(pageHeaderTimeRange)) { + throw new CompactionStatisticsCheckFailedException(device, pageHeader, actualPageTimeRange); + } + } + private void checkPreviousTimeAndUpdate(IDeviceID deviceID, String measurementId, long time) { if (previousTime >= time) { throw new CompactionLastTimeCheckFailedException( @@ -249,8 +405,8 @@ private void checkPreviousTimeAndUpdate(IDeviceID deviceID, long time) { previousTime = time; } - public boolean hasUnsortedData() { - return hasUnsortedData; + public boolean hasUnsortedDataOrWrongStatistics() { + return hasUnsortedDataOrWrongStatistics; } public boolean isBrokenFile() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairTimePartitionScanTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairTimePartitionScanTask.java index d044824a3f28..a73d237705e7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairTimePartitionScanTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairTimePartitionScanTask.java @@ -103,7 +103,7 @@ private void checkInternalUnsortedFileAndRepair(RepairTimePartition timePartitio } LOGGER.info("[RepairScheduler] start check tsfile: {}", sourceFile); RepairDataFileScanUtil scanUtil = new RepairDataFileScanUtil(sourceFile); - scanUtil.scanTsFile(); + scanUtil.scanTsFile(true); checkTaskStatusAndMayStop(); if (scanUtil.isBrokenFile()) { LOGGER.warn("[RepairScheduler] {} is skipped because it is broken", sourceFile); @@ -111,7 +111,7 @@ private void checkInternalUnsortedFileAndRepair(RepairTimePartition timePartitio latch.countDown(); continue; } - if (!scanUtil.hasUnsortedData()) { + if (!scanUtil.hasUnsortedDataOrWrongStatistics()) { latch.countDown(); continue; } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtilTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtilTest.java index 70c7813a26d0..a4fe8e20e13e 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtilTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtilTest.java @@ -24,8 +24,11 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest; import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ArrayDeviceTimeIndex; import org.apache.tsfile.exception.write.WriteProcessException; +import org.apache.tsfile.file.metadata.ChunkMetadata; +import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.enums.CompressionType; import org.apache.tsfile.file.metadata.enums.TSEncoding; import org.apache.tsfile.read.common.TimeRange; @@ -36,6 +39,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.List; public class RepairDataFileScanUtilTest extends AbstractCompactionTest { @@ -76,7 +80,123 @@ public void testScanNormalFile() throws IOException { RepairDataFileScanUtil scanUtil = new RepairDataFileScanUtil(resource); scanUtil.scanTsFile(); Assert.assertFalse(scanUtil.isBrokenFile()); - Assert.assertFalse(scanUtil.hasUnsortedData()); + Assert.assertFalse(scanUtil.hasUnsortedDataOrWrongStatistics()); + } + + @Test + public void testWrongChunkStatisticsWithNonAlignedSeries() throws IOException { + TsFileResource resource = createEmptyFileAndResource(true); + try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource)) { + writer.startChunkGroup("d2"); + writer.generateSimpleNonAlignedSeriesToCurrentDevice( + "s0", new TimeRange[] {new TimeRange(10, 40)}, TSEncoding.PLAIN, CompressionType.LZ4); + writer.generateSimpleNonAlignedSeriesToCurrentDevice( + "s1", + new TimeRange[] {new TimeRange(40, 40), new TimeRange(50, 70)}, + TSEncoding.PLAIN, + CompressionType.LZ4); + List chunkMetadataListInMemory = + writer.getFileWriter().getChunkMetadataListOfCurrentDeviceInMemory(); + ChunkMetadata originChunkMetadata = chunkMetadataListInMemory.get(0); + originChunkMetadata.getStatistics().setStartTime(4); + writer.endChunkGroup(); + writer.endFile(); + } + RepairDataFileScanUtil scanUtil = new RepairDataFileScanUtil(resource); + scanUtil.scanTsFile(); + Assert.assertFalse(scanUtil.isBrokenFile()); + Assert.assertTrue(scanUtil.hasUnsortedDataOrWrongStatistics()); + } + + @Test + public void testWrongChunkStatisticsWithAlignedSeries() throws IOException { + TsFileResource resource = createEmptyFileAndResource(true); + try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource)) { + writer.startChunkGroup("d1"); + writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue( + Arrays.asList("s0", "s1", "s2"), + new TimeRange[] {new TimeRange(10, 40)}, + TSEncoding.PLAIN, + CompressionType.LZ4, + Arrays.asList(false, false, true)); + List chunkMetadataListInMemory = + writer.getFileWriter().getChunkMetadataListOfCurrentDeviceInMemory(); + ChunkMetadata originChunkMetadata = chunkMetadataListInMemory.get(0); + originChunkMetadata.getStatistics().setStartTime(20); + writer.endChunkGroup(); + writer.endFile(); + } + RepairDataFileScanUtil scanUtil = new RepairDataFileScanUtil(resource); + scanUtil.scanTsFile(); + Assert.assertFalse(scanUtil.isBrokenFile()); + Assert.assertTrue(scanUtil.hasUnsortedDataOrWrongStatistics()); + } + + @Test + public void testWrongResourceStatistics() throws IOException { + TsFileResource resource = createEmptyFileAndResource(true); + try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource)) { + writer.startChunkGroup("d1"); + writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue( + Arrays.asList("s0", "s1", "s2"), + new TimeRange[] {new TimeRange(10, 40)}, + TSEncoding.PLAIN, + CompressionType.LZ4, + Arrays.asList(false, false, true)); + writer.endChunkGroup(); + writer.endFile(); + } + resource + .getTimeIndex() + .updateStartTime(IDeviceID.Factory.DEFAULT_FACTORY.create("root.testsg.d1"), 1); + RepairDataFileScanUtil scanUtil = new RepairDataFileScanUtil(resource); + scanUtil.scanTsFile(true); + Assert.assertFalse(scanUtil.isBrokenFile()); + Assert.assertTrue(scanUtil.hasUnsortedDataOrWrongStatistics()); + } + + @Test + public void testDeviceNotExistsInTsFile() throws IOException { + TsFileResource resource = createEmptyFileAndResource(true); + try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource)) { + writer.startChunkGroup("d1"); + writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue( + Arrays.asList("s0", "s1", "s2"), + new TimeRange[] {new TimeRange(10, 40)}, + TSEncoding.PLAIN, + CompressionType.LZ4, + Arrays.asList(false, false, true)); + writer.endChunkGroup(); + writer.endFile(); + } + resource + .getTimeIndex() + .updateStartTime(IDeviceID.Factory.DEFAULT_FACTORY.create("root.testsg.d2"), 1); + RepairDataFileScanUtil scanUtil = new RepairDataFileScanUtil(resource); + scanUtil.scanTsFile(true); + Assert.assertFalse(scanUtil.isBrokenFile()); + Assert.assertTrue(scanUtil.hasUnsortedDataOrWrongStatistics()); + } + + @Test + public void testDeviceDoesNotExistInResourceFile() throws IOException { + TsFileResource resource = createEmptyFileAndResource(true); + try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource)) { + writer.startChunkGroup("d1"); + writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue( + Arrays.asList("s0", "s1", "s2"), + new TimeRange[] {new TimeRange(10, 40)}, + TSEncoding.PLAIN, + CompressionType.LZ4, + Arrays.asList(false, false, true)); + writer.endChunkGroup(); + writer.endFile(); + } + resource.setTimeIndex(new ArrayDeviceTimeIndex()); + RepairDataFileScanUtil scanUtil = new RepairDataFileScanUtil(resource); + scanUtil.scanTsFile(true); + Assert.assertFalse(scanUtil.isBrokenFile()); + Assert.assertTrue(scanUtil.hasUnsortedDataOrWrongStatistics()); } @Test @@ -104,7 +224,7 @@ public void testScanUnsortedFile() throws IOException { RepairDataFileScanUtil scanUtil1 = new RepairDataFileScanUtil(resource1); scanUtil1.scanTsFile(); Assert.assertFalse(scanUtil1.isBrokenFile()); - Assert.assertTrue(scanUtil1.hasUnsortedData()); + Assert.assertTrue(scanUtil1.hasUnsortedDataOrWrongStatistics()); TsFileResource resource2 = createEmptyFileAndResource(true); try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource2)) { @@ -133,7 +253,7 @@ public void testScanUnsortedFile() throws IOException { RepairDataFileScanUtil scanUtil2 = new RepairDataFileScanUtil(resource2); scanUtil2.scanTsFile(); Assert.assertFalse(scanUtil2.isBrokenFile()); - Assert.assertTrue(scanUtil2.hasUnsortedData()); + Assert.assertTrue(scanUtil2.hasUnsortedDataOrWrongStatistics()); } @Test @@ -162,6 +282,6 @@ public void testScanFileWithDifferentCompressionTypes() throws IOException { RepairDataFileScanUtil scanUtil = new RepairDataFileScanUtil(resource); scanUtil.scanTsFile(); Assert.assertFalse(scanUtil.isBrokenFile()); - Assert.assertTrue(scanUtil.hasUnsortedData()); + Assert.assertTrue(scanUtil.hasUnsortedDataOrWrongStatistics()); } }