Skip to content

Commit

Permalink
Support WAL Compression (#12476)
Browse files Browse the repository at this point in the history
* enable wal compression

remove metrics in mem table flush task, cache hash code in partial path, use gzip to compress wal

batch update metrics

* fix bug

* fix compilation problem

* remove useless code

* recover some code

* support compression type in WAL Compress Header

* support multi version WAL

* edit configuration item

* add log for WAL size

* temp for debug

* fix bug

* remove useless log

* remove one configuration

* use compression rate to update wal disk usage

* fix ut

* fix test

* set default to uncompress

* fix wal ut

* optimize calculating of wal size

* close wal file when the origin size of wal buffer is larger than threshold

* add the size of magic string

* may be fix the bug

* fix with comment

* edit with review

* fix test

* add test for wal compression

* add hot reload

* clean the code to make it more readable

* reuse the byte buffer if possible

* Indicate the encoding of String

* Edit according to comment

* spotless
  • Loading branch information
THUMarkLau authored Jun 18, 2024
1 parent d2169a8 commit 0bb4619
Show file tree
Hide file tree
Showing 26 changed files with 1,107 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.common.constant.TsFileConstant;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.fileSystem.FSType;
import org.apache.tsfile.utils.FSUtils;
Expand Down Expand Up @@ -1137,6 +1138,8 @@ public class IoTDBConfig {
*/
private String RateLimiterType = "FixedIntervalRateLimiter";

private CompressionType WALCompressionAlgorithm = CompressionType.UNCOMPRESSED;

IoTDBConfig() {}

public int getMaxLogEntriesNumPerBatch() {
Expand Down Expand Up @@ -1881,7 +1884,7 @@ public long getWalFileSizeThresholdInByte() {
return walFileSizeThresholdInByte;
}

void setWalFileSizeThresholdInByte(long walFileSizeThresholdInByte) {
public void setWalFileSizeThresholdInByte(long walFileSizeThresholdInByte) {
this.walFileSizeThresholdInByte = walFileSizeThresholdInByte;
}

Expand Down Expand Up @@ -3984,4 +3987,12 @@ public TDataNodeLocation generateLocalDataNodeLocation() {
new TEndPoint(getInternalAddress(), getSchemaRegionConsensusPort()));
return result;
}

public CompressionType getWALCompressionAlgorithm() {
return WALCompressionAlgorithm;
}

public void setWALCompressionAlgorithm(CompressionType WALCompressionAlgorithm) {
this.WALCompressionAlgorithm = WALCompressionAlgorithm;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@

import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.fileSystem.FSType;
import org.apache.tsfile.utils.FilePathUtils;
Expand Down Expand Up @@ -416,6 +417,10 @@ public void loadProperties(Properties properties) throws BadNodeUrlException, IO
properties.getProperty(
"io_task_queue_size_for_flushing",
Integer.toString(conf.getIoTaskQueueSizeForFlushing()))));
boolean enableWALCompression =
Boolean.parseBoolean(properties.getProperty("enable_wal_compression", "false"));
conf.setWALCompressionAlgorithm(
enableWALCompression ? CompressionType.LZ4 : CompressionType.UNCOMPRESSED);

conf.setCompactionScheduleIntervalInMs(
Long.parseLong(
Expand Down Expand Up @@ -1793,6 +1798,10 @@ public void loadHotModifiedProps(Properties properties) throws QueryProcessExcep
properties.getProperty(
"merge_threshold_of_explain_analyze",
String.valueOf(conf.getMergeThresholdOfExplainAnalyze()))));
boolean enableWALCompression =
Boolean.parseBoolean(properties.getProperty("enable_wal_compression", "false"));
conf.setWALCompressionAlgorithm(
enableWALCompression ? CompressionType.LZ4 : CompressionType.UNCOMPRESSED);

// update Consensus config
reloadConsensusProps(properties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;

public abstract class AbstractNodeAllocationStrategy implements NodeAllocationStrategy {
Expand Down Expand Up @@ -72,8 +72,8 @@ protected IWALNode createWALNode(String identifier) {
protected IWALNode createWALNode(String identifier, String folder) {
try {
return new WALNode(identifier, folder);
} catch (FileNotFoundException e) {
logger.error("Fail to create wal node", e);
} catch (IOException e) {
logger.error("Meet exception when creating wal node", e);
return WALFakeNode.getFailureInstance(e);
}
}
Expand All @@ -82,7 +82,7 @@ protected IWALNode createWALNode(
String identifier, String folder, long startFileVersion, long startSearchIndex) {
try {
return new WALNode(identifier, folder, startFileVersion, startSearchIndex);
} catch (FileNotFoundException e) {
} catch (IOException e) {
logger.error("Fail to create wal node", e);
return WALFakeNode.getFailureInstance(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
Expand Down Expand Up @@ -58,7 +57,7 @@ public abstract class AbstractWALBuffer implements IWALBuffer {

protected AbstractWALBuffer(
String identifier, String logDirectory, long startFileVersion, long startSearchIndex)
throws FileNotFoundException {
throws IOException {
this.identifier = identifier;
this.logDirectory = logDirectory;
File logDirFile = SystemFileFactory.INSTANCE.getFile(logDirectory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
Expand Down Expand Up @@ -119,7 +118,7 @@ public class WALBuffer extends AbstractWALBuffer {
// manage wal files which have MemTableIds
private final Map<Long, Set<Long>> memTableIdsOfWal = new ConcurrentHashMap<>();

public WALBuffer(String identifier, String logDirectory) throws FileNotFoundException {
public WALBuffer(String identifier, String logDirectory) throws IOException {
this(identifier, logDirectory, new CheckpointManager(identifier, logDirectory), 0, 0L);
}

Expand All @@ -129,7 +128,7 @@ public WALBuffer(
CheckpointManager checkpointManager,
long startFileVersion,
long startSearchIndex)
throws FileNotFoundException {
throws IOException {
super(identifier, logDirectory, startFileVersion, startSearchIndex);
this.checkpointManager = checkpointManager;
currentFileStatus = WALFileStatus.CONTAINS_NONE_SEARCH_INDEX;
Expand Down Expand Up @@ -521,8 +520,9 @@ public void run() {
forceFlag, syncingBuffer.position(), syncingBuffer.capacity(), usedRatio * 100);

// flush buffer to os
double compressionRatio = 1.0;
try {
currentWALFileWriter.write(syncingBuffer, info.metaData);
compressionRatio = currentWALFileWriter.write(syncingBuffer, info.metaData);
} catch (Throwable e) {
logger.error(
"Fail to sync wal node-{}'s buffer, change system mode to error.", identifier, e);
Expand All @@ -535,12 +535,14 @@ public void run() {
memTableIdsOfWal
.computeIfAbsent(currentWALFileVersion, memTableIds -> new HashSet<>())
.addAll(info.metaData.getMemTablesId());
checkpointManager.updateCostOfActiveMemTables(info.memTableId2WalDiskUsage);
checkpointManager.updateCostOfActiveMemTables(info.memTableId2WalDiskUsage, compressionRatio);

boolean forceSuccess = false;
// try to roll log writer
if (info.rollWALFileWriterListener != null
|| (forceFlag && currentWALFileWriter.size() >= config.getWalFileSizeThresholdInByte())) {
// TODO: Control the wal file by the number of WALEntry
|| (forceFlag
&& currentWALFileWriter.originalSize() >= config.getWalFileSizeThresholdInByte())) {
try {
rollLogWriter(searchIndex, currentWALFileWriter.getWalFileStatus());
forceSuccess = true;
Expand Down Expand Up @@ -582,7 +584,7 @@ public void run() {
position += fsyncListener.getWalEntryHandler().getSize();
}
}
lastFsyncPosition = currentWALFileWriter.size();
lastFsyncPosition = currentWALFileWriter.originalSize();
}
WRITING_METRICS.recordWALBufferEntriesCount(info.fsyncListeners.size());
WRITING_METRICS.recordSyncWALBufferCost(System.nanoTime() - startTime, forceFlag);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
Expand Down Expand Up @@ -78,7 +77,7 @@ public class CheckpointManager implements AutoCloseable {

// endregion

public CheckpointManager(String identifier, String logDirectory) throws FileNotFoundException {
public CheckpointManager(String identifier, String logDirectory) throws IOException {
this.identifier = identifier;
this.logDirectory = logDirectory;
File logDirFile = SystemFileFactory.INSTANCE.getFile(logDirectory);
Expand Down Expand Up @@ -345,12 +344,13 @@ public long getFirstValidWALVersionId() {
}

/** Update wal disk cost of active memTables. */
public void updateCostOfActiveMemTables(Map<Long, Long> memTableId2WalDiskUsage) {
public void updateCostOfActiveMemTables(
Map<Long, Long> memTableId2WalDiskUsage, double compressionRate) {
for (Map.Entry<Long, Long> memTableWalUsage : memTableId2WalDiskUsage.entrySet()) {
memTableId2Info.computeIfPresent(
memTableWalUsage.getKey(),
(k, v) -> {
v.addWalDiskUsage(memTableWalUsage.getValue());
v.addWalDiskUsage((long) (memTableWalUsage.getValue() * compressionRate));
return v;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -47,8 +45,7 @@ public CheckpointReader(File logFile) {

private void init() {
checkpoints = new ArrayList<>();
try (DataInputStream logStream =
new DataInputStream(new BufferedInputStream(new FileInputStream(logFile)))) {
try (DataInputStream logStream = new DataInputStream(new WALInputStream(logFile))) {
maxMemTableId = logStream.readLong();
while (logStream.available() > 0) {
Checkpoint checkpoint = Checkpoint.deserialize(logStream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
import org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.Checkpoint;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;

/** CheckpointWriter writes the binary {@link Checkpoint} into .checkpoint file. */
public class CheckpointWriter extends LogWriter {
public CheckpointWriter(File logFile) throws FileNotFoundException {
public CheckpointWriter(File logFile) throws IOException {
super(logFile);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ public interface ILogWriter extends Closeable {
*
* @param buffer content that have been converted to bytes
* @throws IOException if an I/O error occurs
* @return Compression rate of the buffer after compression
*/
void write(ByteBuffer buffer) throws IOException;
double write(ByteBuffer buffer) throws IOException;

/**
* Forces any updates to this file to be written to the storage device that contains it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,23 @@

package org.apache.iotdb.db.storageengine.dataregion.wal.io;

import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
import org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.Checkpoint;

import org.apache.tsfile.compress.ICompressor;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

/**
* LogWriter writes the binary logs into a file, including writing {@link WALEntry} into .wal file
Expand All @@ -43,23 +47,89 @@ public abstract class LogWriter implements ILogWriter {
protected final File logFile;
protected final FileOutputStream logStream;
protected final FileChannel logChannel;
protected long size;
protected long size = 0;
protected long originalSize = 0;
private final ByteBuffer headerBuffer = ByteBuffer.allocate(Integer.BYTES * 2 + 1);
private ICompressor compressor =
ICompressor.getCompressor(
IoTDBDescriptor.getInstance().getConfig().getWALCompressionAlgorithm());
private ByteBuffer compressedByteBuffer;
// Minimum size to compress, default is 32 KB
private static long minCompressionSize = 32 * 1024L;

protected LogWriter(File logFile) throws FileNotFoundException {
protected LogWriter(File logFile) throws IOException {
this.logFile = logFile;
this.logStream = new FileOutputStream(logFile, true);
this.logChannel = this.logStream.getChannel();
if (!logFile.exists() || logFile.length() == 0) {
this.logChannel.write(
ByteBuffer.wrap(WALWriter.MAGIC_STRING.getBytes(StandardCharsets.UTF_8)));
size += logChannel.position();
}
if (IoTDBDescriptor.getInstance().getConfig().getWALCompressionAlgorithm()
!= CompressionType.UNCOMPRESSED) {
// TODO: Use a dynamic strategy to enlarge the buffer size
compressedByteBuffer =
ByteBuffer.allocate(
compressor.getMaxBytesForCompression(
IoTDBDescriptor.getInstance().getConfig().getWalBufferSize()));
} else {
compressedByteBuffer = null;
}
}

@Override
public void write(ByteBuffer buffer) throws IOException {
size += buffer.position();
public double write(ByteBuffer buffer) throws IOException {
CompressionType compressionType =
IoTDBDescriptor.getInstance().getConfig().getWALCompressionAlgorithm();
int bufferSize = buffer.position();
if (bufferSize == 0) {
return 1.0;
}
originalSize += bufferSize;
buffer.flip();
boolean compressed = false;
int uncompressedSize = bufferSize;
if (compressionType != CompressionType.UNCOMPRESSED
/* Do not compress buffer that is less than min size */
&& bufferSize > minCompressionSize) {
if (Objects.isNull(compressedByteBuffer)) {
compressedByteBuffer =
ByteBuffer.allocate(
compressor.getMaxBytesForCompression(
IoTDBDescriptor.getInstance().getConfig().getWalBufferSize()));
}
compressedByteBuffer.clear();
if (compressor.getType() != compressionType) {
compressor = ICompressor.getCompressor(compressionType);
}
compressor.compress(buffer, compressedByteBuffer);
buffer = compressedByteBuffer;
bufferSize = buffer.position();
buffer.flip();
compressed = true;
}
size += bufferSize;
/*
Header structure:
[CompressionType(1 byte)][dataBufferSize(4 bytes)][uncompressedSize(4 bytes)]
*/
headerBuffer.clear();
headerBuffer.put(
compressed ? compressionType.serialize() : CompressionType.UNCOMPRESSED.serialize());
headerBuffer.putInt(bufferSize);
if (compressed) {
headerBuffer.putInt(uncompressedSize);
}
size += headerBuffer.position();
try {
headerBuffer.flip();
logChannel.write(headerBuffer);
logChannel.write(buffer);
} catch (ClosedChannelException e) {
logger.warn("Cannot write to {}", logFile, e);
}
return ((double) bufferSize / uncompressedSize);
}

@Override
Expand All @@ -79,6 +149,10 @@ public long size() {
return size;
}

public long originalSize() {
return originalSize;
}

@Override
public File getLogFile() {
return logFile;
Expand All @@ -97,4 +171,8 @@ public void close() throws IOException {
}
}
}

public long getOffset() throws IOException {
return logChannel.position();
}
}
Loading

0 comments on commit 0bb4619

Please sign in to comment.