Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
THUMarkLau committed May 26, 2024
1 parent f372da1 commit 069795d
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class WALInputStream extends InputStream implements AutoCloseable {
private ByteBuffer dataBuffer = null;
private long fileSize;
File logFile;
private long endOffset = -1;

enum FileVersion {
V1,
Expand All @@ -52,9 +53,50 @@ public WALInputStream(File logFile) throws IOException {
channel = FileChannel.open(logFile.toPath());
fileSize = channel.size();
analyzeFileVersion();
getEndOffset();
this.logFile = logFile;
}

private void getEndOffset() throws IOException {
if (channel.size() < WALWriter.MAGIC_STRING_BYTES + Integer.BYTES) {
endOffset = channel.size();
return;
}
ByteBuffer metadataSizeBuf = ByteBuffer.allocate(Integer.BYTES);
long position;
try {
if (version == FileVersion.V2) {
ByteBuffer magicStringBuffer = ByteBuffer.allocate(WALWriter.MAGIC_STRING_BYTES);
channel.read(magicStringBuffer, channel.size() - WALWriter.MAGIC_STRING_BYTES);
magicStringBuffer.flip();
if (!new String(magicStringBuffer.array()).equals(WALWriter.MAGIC_STRING)) {
// this is a broken wal file
endOffset = channel.size();
return;
}
position = channel.size() - WALWriter.MAGIC_STRING_BYTES - Integer.BYTES;
} else {
ByteBuffer magicStringBuffer =
ByteBuffer.allocate(WALWriter.MAGIC_STRING_V1.getBytes().length);
channel.read(
magicStringBuffer, channel.size() - WALWriter.MAGIC_STRING_V1.getBytes().length);
magicStringBuffer.flip();
if (!new String(magicStringBuffer.array()).equals(WALWriter.MAGIC_STRING_V1)) {
// this is a broken wal file
endOffset = channel.size();
return;
}
position = channel.size() - WALWriter.MAGIC_STRING_V1.getBytes().length - Integer.BYTES;
}
channel.read(metadataSizeBuf, position);
metadataSizeBuf.flip();
int metadataSize = metadataSizeBuf.getInt();
endOffset = channel.size() - WALWriter.MAGIC_STRING_BYTES - Integer.BYTES - metadataSize - 1;
} finally {
channel.position(WALWriter.MAGIC_STRING_BYTES);
}
}

private void analyzeFileVersion() throws IOException {
if (channel.size() < WALWriter.MAGIC_STRING_BYTES) {
version = FileVersion.UNKNOWN;
Expand Down Expand Up @@ -113,14 +155,17 @@ public void close() throws IOException {

@Override
public int available() throws IOException {
long size = (channel.size() - channel.position());
long size = (endOffset - channel.position());
if (!Objects.isNull(dataBuffer)) {
size += dataBuffer.limit() - dataBuffer.position();
}
return (int) size;
}

private void loadNextSegment() throws IOException {
if (channel.position() >= endOffset) {
throw new IOException("End of file");
}
if (version == FileVersion.V2) {
loadNextSegmentV2();
} else if (version == FileVersion.V1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,14 @@
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryType;
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALException;
import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALInputStream;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -86,8 +85,7 @@ public List<File> doCheck() throws WALException {
}

private boolean checkFile(File walFile) {
try (DataInputStream logStream =
new DataInputStream(new BufferedInputStream(new FileInputStream(walFile)))) {
try (DataInputStream logStream = new DataInputStream(new WALInputStream(walFile))) {
while (logStream.available() > 0) {
WALEntry walEntry = WALEntry.deserialize(logStream);
if (walEntry.getType() == WALEntryType.WAL_FILE_INFO_END_MARKER) {
Expand Down

0 comments on commit 069795d

Please sign in to comment.