Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add or modify encryption related codes #13364

Merged
merged 21 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
jt2594838 marked this conversation as resolved.
Show resolved Hide resolved

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.event.common.tsfile.container.scan;

import org.apache.tsfile.encoding.decoder.Decoder;
import org.apache.tsfile.encrypt.IDecryptor;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.header.ChunkHeader;
import org.apache.tsfile.file.header.PageHeader;
Expand Down Expand Up @@ -47,6 +48,8 @@ public class AlignedSinglePageWholeChunkReader extends AbstractChunkReader {
// chunk data of the time column
private final ByteBuffer timeChunkDataBuffer;

private final IDecryptor decryptor;

// chunk headers of all the sub sensors
private final List<ChunkHeader> valueChunkHeaderList = new ArrayList<>();
// chunk data of all the sub sensors
Expand All @@ -59,6 +62,7 @@ public AlignedSinglePageWholeChunkReader(Chunk timeChunk, List<Chunk> valueChunk
super(Long.MIN_VALUE, null);
this.timeChunkHeader = timeChunk.getHeader();
this.timeChunkDataBuffer = timeChunk.getData();
this.decryptor = timeChunk.getDecryptor();

valueChunkList.forEach(
chunk -> {
Expand Down Expand Up @@ -121,7 +125,8 @@ private void skipCurrentPage(PageHeader timePageHeader, List<PageHeader> valuePa
private AlignedPageReader constructAlignedPageReader(
PageHeader timePageHeader, List<PageHeader> rawValuePageHeaderList) throws IOException {
ByteBuffer timePageData =
ChunkReader.deserializePageData(timePageHeader, timeChunkDataBuffer, timeChunkHeader);
ChunkReader.deserializePageData(
timePageHeader, timeChunkDataBuffer, timeChunkHeader, decryptor);

List<PageHeader> valuePageHeaderList = new ArrayList<>();
List<ByteBuffer> valuePageDataList = new ArrayList<>();
Expand All @@ -143,7 +148,7 @@ private AlignedPageReader constructAlignedPageReader(
valuePageHeaderList.add(valuePageHeader);
valuePageDataList.add(
ChunkReader.deserializePageData(
valuePageHeader, valueChunkDataBufferList.get(i), valueChunkHeader));
valuePageHeader, valueChunkDataBufferList.get(i), valueChunkHeader, decryptor));
valueDataTypeList.add(valueChunkHeader.getDataType());
valueDecoderList.add(
Decoder.getDecoderByType(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@

import org.apache.tsfile.compress.IUnCompressor;
import org.apache.tsfile.encoding.decoder.Decoder;
import org.apache.tsfile.encrypt.IDecryptor;
import org.apache.tsfile.file.header.ChunkHeader;
import org.apache.tsfile.file.header.PageHeader;
import org.apache.tsfile.file.metadata.enums.EncryptionType;
import org.apache.tsfile.file.metadata.statistics.Statistics;
import org.apache.tsfile.read.common.Chunk;
import org.apache.tsfile.read.reader.chunk.AbstractChunkReader;
Expand All @@ -35,13 +37,14 @@
public class SinglePageWholeChunkReader extends AbstractChunkReader {
private final ChunkHeader chunkHeader;
private final ByteBuffer chunkDataBuffer;
private final IDecryptor decryptor;

public SinglePageWholeChunkReader(Chunk chunk) throws IOException {
super(Long.MIN_VALUE, null);

this.chunkHeader = chunk.getHeader();
this.chunkDataBuffer = chunk.getData();

this.decryptor = chunk.getDecryptor();
initAllPageReaders();
}

Expand All @@ -58,7 +61,7 @@ private void initAllPageReaders() throws IOException {
private PageReader constructPageReader(PageHeader pageHeader) throws IOException {
return new PageReader(
pageHeader,
deserializePageData(pageHeader, chunkDataBuffer, chunkHeader),
deserializePageData(pageHeader, chunkDataBuffer, chunkHeader, decryptor),
chunkHeader.getDataType(),
Decoder.getDecoderByType(chunkHeader.getEncodingType(), chunkHeader.getDataType()),
defaultTimeDecoder,
Expand Down Expand Up @@ -107,10 +110,42 @@ public static ByteBuffer uncompressPageData(
return ByteBuffer.wrap(uncompressedPageData);
}

public static ByteBuffer decryptAndUncompressPageData(
PageHeader pageHeader,
IUnCompressor unCompressor,
ByteBuffer compressedPageData,
IDecryptor decryptor)
throws IOException {
int compressedPageBodyLength = pageHeader.getCompressedSize();
byte[] uncompressedPageData = new byte[pageHeader.getUncompressedSize()];
try {
byte[] decryptedPageData =
decryptor.decrypt(compressedPageData.array(), 0, compressedPageBodyLength);
unCompressor.uncompress(
decryptedPageData, 0, compressedPageBodyLength, uncompressedPageData, 0);
} catch (Exception e) {
throw new IOException(
"Decrypt and Uncompress error! uncompress size: "
+ pageHeader.getUncompressedSize()
+ "compressed size: "
+ pageHeader.getCompressedSize()
+ "page header: "
+ pageHeader
+ e.getMessage());
jt2594838 marked this conversation as resolved.
Show resolved Hide resolved
}
jt2594838 marked this conversation as resolved.
Show resolved Hide resolved

return ByteBuffer.wrap(uncompressedPageData);
}

public static ByteBuffer deserializePageData(
PageHeader pageHeader, ByteBuffer chunkBuffer, ChunkHeader chunkHeader) throws IOException {
PageHeader pageHeader, ByteBuffer chunkBuffer, ChunkHeader chunkHeader, IDecryptor decryptor)
throws IOException {
IUnCompressor unCompressor = IUnCompressor.getUnCompressor(chunkHeader.getCompressionType());
ByteBuffer compressedPageBody = readCompressedPageData(pageHeader, chunkBuffer);
return uncompressPageData(pageHeader, unCompressor, compressedPageBody);
if (decryptor == null || decryptor.getEncryptionType() == EncryptionType.UNENCRYPTED) {
return uncompressPageData(pageHeader, unCompressor, compressedPageBody);
} else {
return decryptAndUncompressPageData(pageHeader, unCompressor, compressedPageBody, decryptor);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,9 @@ private void moveToNextChunkReader() throws IOException, IllegalStateException {
== TsFileConstant.TIME_COLUMN_MASK) {
timeChunkList.add(
new Chunk(
chunkHeader, tsFileSequenceReader.readChunk(-1, chunkHeader.getDataSize())));
chunkHeader,
tsFileSequenceReader.readChunk(-1, chunkHeader.getDataSize()),
tsFileSequenceReader.getDecryptor()));
break;
}

Expand All @@ -353,12 +355,14 @@ private void moveToNextChunkReader() throws IOException, IllegalStateException {
? new ChunkReader(
new Chunk(
chunkHeader,
tsFileSequenceReader.readChunk(-1, chunkHeader.getDataSize())),
tsFileSequenceReader.readChunk(-1, chunkHeader.getDataSize()),
tsFileSequenceReader.getDecryptor()),
filter)
: new SinglePageWholeChunkReader(
new Chunk(
chunkHeader,
tsFileSequenceReader.readChunk(-1, chunkHeader.getDataSize())));
tsFileSequenceReader.readChunk(-1, chunkHeader.getDataSize()),
tsFileSequenceReader.getDecryptor()));
currentIsAligned = false;
currentMeasurements.add(
new MeasurementSchema(chunkHeader.getMeasurementID(), chunkHeader.getDataType()));
Expand Down Expand Up @@ -402,7 +406,9 @@ private void moveToNextChunkReader() throws IOException, IllegalStateException {

valueChunkList.add(
new Chunk(
chunkHeader, tsFileSequenceReader.readChunk(-1, chunkHeader.getDataSize())));
chunkHeader,
tsFileSequenceReader.readChunk(-1, chunkHeader.getDataSize()),
tsFileSequenceReader.getDecryptor()));
currentMeasurements.add(
new MeasurementSchema(chunkHeader.getMeasurementID(), chunkHeader.getDataType()));
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,11 @@ public Chunk get(
FileReaderManager.getInstance().get(chunkCacheKey.getFilePath(), true);
Chunk chunk = reader.readMemChunk(chunkCacheKey.offsetOfChunkHeader);
return new Chunk(
chunk.getHeader(), chunk.getData().duplicate(), timeRangeList, chunkStatistic);
chunk.getHeader(),
chunk.getData().duplicate(),
timeRangeList,
chunkStatistic,
chunk.getDecryptor());
}

Chunk chunk = lruCache.get(chunkCacheKey);
Expand All @@ -129,7 +133,11 @@ public Chunk get(
}

return new Chunk(
chunk.getHeader(), chunk.getData().duplicate(), timeRangeList, chunkStatistic);
chunk.getHeader(),
chunk.getData().duplicate(),
timeRangeList,
chunkStatistic,
chunk.getDecryptor());
} finally {
SERIES_SCAN_COST_METRIC_SET.recordSeriesScanCost(
READ_CHUNK_ALL, System.nanoTime() - startTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.compress.IUnCompressor;
import org.apache.tsfile.encoding.decoder.Decoder;
import org.apache.tsfile.encrypt.IDecryptor;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.header.ChunkHeader;
import org.apache.tsfile.file.header.PageHeader;
Expand All @@ -38,14 +39,16 @@
import java.util.ArrayList;
import java.util.List;

import static org.apache.tsfile.read.reader.chunk.ChunkReader.uncompressPageData;
import static org.apache.tsfile.read.reader.chunk.ChunkReader.decryptAndUncompressPageData;

public class CompactionAlignedChunkReader {

// chunk headers of all the sub sensors
private final List<ChunkHeader> valueChunkHeaderList = new ArrayList<>();

private final IUnCompressor timeUnCompressor;

private final IDecryptor decryptor;
private final Decoder timeDecoder =
Decoder.getDecoderByType(
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
Expand All @@ -61,7 +64,7 @@ public class CompactionAlignedChunkReader {
public CompactionAlignedChunkReader(Chunk timeChunk, List<Chunk> valueChunkList) {
ChunkHeader timeChunkHeader = timeChunk.getHeader();
this.timeUnCompressor = IUnCompressor.getUnCompressor(timeChunkHeader.getCompressionType());

this.decryptor = timeChunk.getDecryptor();
valueChunkList.forEach(
chunk -> {
this.valueChunkHeaderList.add(chunk == null ? null : chunk.getHeader());
Expand Down Expand Up @@ -106,9 +109,10 @@ public AlignedPageReader getAlignedPageReader(
List<ByteBuffer> compressedValuePageDatas)
throws IOException {

// uncompress time page data
// decrypt and uncompress time page data
ByteBuffer uncompressedTimePageData =
uncompressPageData(timePageHeader, timeUnCompressor, compressedTimePageData);
decryptAndUncompressPageData(
timePageHeader, timeUnCompressor, compressedTimePageData, decryptor);
// uncompress value page datas
List<ByteBuffer> uncompressedValuePageDatas = new ArrayList<>();
List<TSDataType> valueTypes = new ArrayList<>();
Expand All @@ -121,10 +125,11 @@ public AlignedPageReader getAlignedPageReader(
} else {
ChunkHeader valueChunkHeader = valueChunkHeaderList.get(i);
uncompressedValuePageDatas.add(
uncompressPageData(
decryptAndUncompressPageData(
valuePageHeaders.get(i),
IUnCompressor.getUnCompressor(valueChunkHeader.getCompressionType()),
compressedValuePageDatas.get(i)));
compressedValuePageDatas.get(i),
decryptor));
TSDataType valueType = valueChunkHeader.getDataType();
valueDecoders.add(Decoder.getDecoderByType(valueChunkHeader.getEncodingType(), valueType));
valueTypes.add(valueType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.compress.IUnCompressor;
import org.apache.tsfile.encoding.decoder.Decoder;
import org.apache.tsfile.encrypt.IDecryptor;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.MetaMarker;
import org.apache.tsfile.file.header.ChunkHeader;
Expand All @@ -39,13 +40,15 @@
import java.util.ArrayList;
import java.util.List;

import static org.apache.tsfile.read.reader.chunk.ChunkReader.uncompressPageData;
import static org.apache.tsfile.read.reader.chunk.ChunkReader.decryptAndUncompressPageData;

public class CompactionChunkReader {

private final ChunkHeader chunkHeader;
private ByteBuffer chunkDataBuffer;
private final IUnCompressor unCompressor;

private final IDecryptor decryptor;
private final Decoder timeDecoder =
Decoder.getDecoderByType(
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
Expand All @@ -66,6 +69,7 @@ public CompactionChunkReader(Chunk chunk) {
this.unCompressor = IUnCompressor.getUnCompressor(chunkHeader.getCompressionType());
this.deleteIntervalList = chunk.getDeleteIntervalList();
this.chunkStatistic = chunk.getChunkStatistic();
this.decryptor = chunk.getDecryptor();
}

/**
Expand Down Expand Up @@ -126,8 +130,10 @@ public static ByteBuffer readCompressedPageData(PageHeader pageHeader, ByteBuffe
*/
public TsBlock readPageData(PageHeader pageHeader, ByteBuffer compressedPageData)
throws IOException {
// uncompress page data
ByteBuffer pageData = uncompressPageData(pageHeader, unCompressor, compressedPageData);
// decrypt and uncompress page data
ByteBuffer pageData =
decryptAndUncompressPageData(pageHeader, unCompressor, compressedPageData, decryptor);
;
zhujt20 marked this conversation as resolved.
Show resolved Hide resolved

// decode page data
TSDataType dataType = chunkHeader.getDataType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ public List<PageLoader> getPages() {
chunk.getHeader().getDataType(),
chunk.getHeader().getEncodingType(),
chunkMetadata,
pageModifiedStatus));
pageModifiedStatus,
chunk.getDecryptor()));
}
return pageList;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,28 @@
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.ModifiedStatus;

import org.apache.tsfile.compress.IUnCompressor;
import org.apache.tsfile.encrypt.IDecryptor;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.write.PageException;
import org.apache.tsfile.file.header.PageHeader;
import org.apache.tsfile.file.metadata.ChunkMetadata;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.EncryptionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;

import java.io.IOException;
import java.nio.ByteBuffer;

import static org.apache.tsfile.read.reader.chunk.ChunkReader.decryptAndUncompressPageData;
import static org.apache.tsfile.read.reader.chunk.ChunkReader.uncompressPageData;

public class InstantPageLoader extends PageLoader {

private ByteBuffer pageData;

private IDecryptor decryptor;

public InstantPageLoader() {}

public InstantPageLoader(
Expand All @@ -49,9 +54,11 @@ public InstantPageLoader(
TSDataType dataType,
TSEncoding encoding,
ChunkMetadata chunkMetadata,
ModifiedStatus modifiedStatus) {
ModifiedStatus modifiedStatus,
IDecryptor decryptor) {
super(file, pageHeader, compressionType, dataType, encoding, chunkMetadata, modifiedStatus);
this.pageData = pageData;
this.decryptor = decryptor;
}

@Override
Expand All @@ -62,7 +69,11 @@ public ByteBuffer getCompressedData() {
@Override
public ByteBuffer getUnCompressedData() throws IOException {
IUnCompressor unCompressor = IUnCompressor.getUnCompressor(compressionType);
return uncompressPageData(pageHeader, unCompressor, pageData);
if (decryptor == null || decryptor.getEncryptionType() == EncryptionType.UNENCRYPTED) {
return uncompressPageData(pageHeader, unCompressor, pageData);
} else {
return decryptAndUncompressPageData(pageHeader, unCompressor, pageData, decryptor);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
import java.util.Map;
import java.util.Set;

import static org.apache.tsfile.read.reader.chunk.ChunkReader.uncompressPageData;
import static org.apache.tsfile.read.reader.chunk.ChunkReader.decryptAndUncompressPageData;

public class RepairDataFileScanUtil {
private static final Logger logger = LoggerFactory.getLogger(RepairDataFileScanUtil.class);
Expand Down Expand Up @@ -136,10 +136,11 @@ private void checkAlignedDeviceSeries(
ByteBuffer pageData = chunkReader.readPageDataWithoutUncompressing(pageHeader);

ByteBuffer uncompressedPageData =
uncompressPageData(
decryptAndUncompressPageData(
pageHeader,
IUnCompressor.getUnCompressor(chunkHeader.getCompressionType()),
pageData);
pageData,
timeChunk.getDecryptor());
Decoder decoder =
Decoder.getDecoderByType(chunkHeader.getEncodingType(), chunkHeader.getDataType());
while (decoder.hasNext(uncompressedPageData)) {
Expand Down Expand Up @@ -194,10 +195,11 @@ private void checkSingleNonAlignedSeries(
ByteBuffer pageData = chunkReader.readPageDataWithoutUncompressing(pageHeader);

ByteBuffer uncompressedPageData =
uncompressPageData(
decryptAndUncompressPageData(
pageHeader,
IUnCompressor.getUnCompressor(chunkHeader.getCompressionType()),
pageData);
pageData,
chunk.getDecryptor());
ByteBuffer timeBuffer = getTimeBufferFromNonAlignedPage(uncompressedPageData);
Decoder timeDecoder =
Decoder.getDecoderByType(
Expand Down
Loading