diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java index 213e517d16ef..b0872ace299b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java @@ -39,6 +39,7 @@ public class WALInputStream extends InputStream implements AutoCloseable { private ByteBuffer dataBuffer = null; private long fileSize; File logFile; + private long endOffset = -1; enum FileVersion { V1, @@ -52,9 +53,50 @@ public WALInputStream(File logFile) throws IOException { channel = FileChannel.open(logFile.toPath()); fileSize = channel.size(); analyzeFileVersion(); + getEndOffset(); this.logFile = logFile; } + private void getEndOffset() throws IOException { + if (channel.size() < WALWriter.MAGIC_STRING_BYTES + Integer.BYTES) { + endOffset = channel.size(); + return; + } + ByteBuffer metadataSizeBuf = ByteBuffer.allocate(Integer.BYTES); + long position; + try { + if (version == FileVersion.V2) { + ByteBuffer magicStringBuffer = ByteBuffer.allocate(WALWriter.MAGIC_STRING_BYTES); + channel.read(magicStringBuffer, channel.size() - WALWriter.MAGIC_STRING_BYTES); + magicStringBuffer.flip(); + if (!new String(magicStringBuffer.array()).equals(WALWriter.MAGIC_STRING)) { + // this is a broken wal file + endOffset = channel.size(); + return; + } + position = channel.size() - WALWriter.MAGIC_STRING_BYTES - Integer.BYTES; + } else { + ByteBuffer magicStringBuffer = + ByteBuffer.allocate(WALWriter.MAGIC_STRING_V1.getBytes().length); + channel.read( + magicStringBuffer, channel.size() - WALWriter.MAGIC_STRING_V1.getBytes().length); + magicStringBuffer.flip(); + if (!new String(magicStringBuffer.array()).equals(WALWriter.MAGIC_STRING_V1)) { + // this is a broken wal file + endOffset = channel.size(); + return; + } + position = channel.size() - WALWriter.MAGIC_STRING_V1.getBytes().length - Integer.BYTES; + } + channel.read(metadataSizeBuf, position); + metadataSizeBuf.flip(); + int metadataSize = metadataSizeBuf.getInt(); + endOffset = channel.size() - WALWriter.MAGIC_STRING_BYTES - Integer.BYTES - metadataSize - 1; + } finally { + channel.position(WALWriter.MAGIC_STRING_BYTES); + } + } + private void analyzeFileVersion() throws IOException { if (channel.size() < WALWriter.MAGIC_STRING_BYTES) { version = FileVersion.UNKNOWN; @@ -113,7 +155,7 @@ public void close() throws IOException { @Override public int available() throws IOException { - long size = (channel.size() - channel.position()); + long size = (endOffset - channel.position()); if (!Objects.isNull(dataBuffer)) { size += dataBuffer.limit() - dataBuffer.position(); } @@ -121,6 +163,9 @@ public int available() throws IOException { } private void loadNextSegment() throws IOException { + if (channel.position() >= endOffset) { + throw new IOException("End of file"); + } if (version == FileVersion.V2) { loadNextSegmentV2(); } else if (version == FileVersion.V1) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/WalChecker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/WalChecker.java index cbf2ead7464e..5e53e9f664a6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/WalChecker.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/WalChecker.java @@ -22,15 +22,14 @@ import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryType; import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALException; +import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALInputStream; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedInputStream; import java.io.DataInputStream; import java.io.File; -import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; @@ -86,8 +85,7 @@ public List doCheck() throws WALException { } private boolean checkFile(File walFile) { - try (DataInputStream logStream = - new DataInputStream(new BufferedInputStream(new FileInputStream(walFile)))) { + try (DataInputStream logStream = new DataInputStream(new WALInputStream(walFile))) { while (logStream.available() > 0) { WALEntry walEntry = WALEntry.deserialize(logStream); if (walEntry.getType() == WALEntryType.WAL_FILE_INFO_END_MARKER) {