Skip to content

Commit

Permalink
Fixed #244
Browse files Browse the repository at this point in the history
  • Loading branch information
sakno committed Jul 5, 2024
1 parent 292ea41 commit 38e8395
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,7 @@ public static async Task SequentialCompaction(bool useCaching)
Equal(entries.Length + 41L, state.Value);
checker = static (readResult, snapshotIndex, token) =>
{
Equal(8, snapshotIndex);
Equal(7, snapshotIndex);
True(Single(readResult).IsSnapshot);
return default;
};
Expand All @@ -679,7 +679,7 @@ public static async Task SequentialCompaction(bool useCaching)
checker = static (readResult, snapshotIndex, token) =>
{
NotEmpty(readResult);
Equal(8, snapshotIndex);
Equal(7, snapshotIndex);
True(readResult[0].IsSnapshot);
False(readResult[1].IsSnapshot);
return default;
Expand All @@ -701,7 +701,7 @@ public static async Task SequentialCompaction(bool useCaching)
checker = static (readResult, snapshotIndex, token) =>
{
NotEmpty(readResult);
Equal(8, snapshotIndex);
Equal(7, snapshotIndex);
return default;
};
await state.As<IRaftLog>().ReadAsync(new LogEntryConsumer(checker), 1, CancellationToken.None);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,22 +394,28 @@ private sealed class Table : Partition, IReadOnlyList<ReadOnlyMemory<byte>>
private MemoryOwner<byte> header, footer;
private (ReadOnlyMemory<byte>, ReadOnlyMemory<byte>) bufferTuple;

internal Table(DirectoryInfo location, int bufferSize, int recordsPerPartition, long partitionNumber, in BufferManager manager, int readersCount, WriteMode writeMode, long initialSize)
internal Table(DirectoryInfo location, int bufferSize, int recordsPerPartition, long partitionNumber, in BufferManager manager, int readersCount, WriteMode writeMode, long initialSize, ReadOnlySpan<byte> initialFooter)
: base(location, HeaderSize, bufferSize, recordsPerPartition, partitionNumber, in manager, readersCount, writeMode, initialSize)
{
footer = manager.BufferAllocator.AllocateExactly(recordsPerPartition * LogEntryMetadata.Size);
#if DEBUG
footer.Span.Clear();
#endif
initialFooter.CopyTo(footer.Span);

header = manager.BufferAllocator.AllocateExactly(HeaderSize);
header.Span.Clear();
}

internal static MemoryOwner<byte> AllocateInitializedFooter(MemoryAllocator<byte> allocator, int recordsPerPartition)
{
var footer = allocator.AllocateExactly(recordsPerPartition * LogEntryMetadata.Size);

// init ephemeral 0 entry
if (PartitionNumber is 0L)
for (var index = 0; index < recordsPerPartition; index++)
{
LogEntryMetadata.Create(LogEntry.Initial, HeaderSize + LogEntryMetadata.Size, length: 0L).Format(footer.Span);
var writeAddress = index * LogEntryMetadata.Size;
var metadata = new LogEntryMetadata(default, 0L, writeAddress + HeaderSize + LogEntryMetadata.Size, 0L);
metadata.Format(footer.Span.Slice(writeAddress));
}

return footer;
}

private bool IsSealed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public abstract partial class PersistentState : Disposable, IPersistentState
private protected readonly WriteMode writeMode;
private readonly bool parallelIO;
private readonly long maxLogEntrySize; // 0 - modern partition, > 0 - sparse partition, < 0 - legacy partition
private MemoryOwner<byte> initializedFooter;

static PersistentState()
{
Expand Down Expand Up @@ -86,6 +87,7 @@ private protected PersistentState(DirectoryInfo path, int recordsPerPartition, O
maxLogEntrySize);
break;
case 0L:
initializedFooter = Table.AllocateInitializedFooter(bufferManager.BufferAllocator, recordsPerPartition);
CreateTables(
partitionTable,
path,
Expand All @@ -95,7 +97,8 @@ private protected PersistentState(DirectoryInfo path, int recordsPerPartition, O
concurrentReads,
writeMode,
initialSize,
state.LastIndex);
state.LastIndex,
initializedFooter.Span);
break;
case < 0L:
#pragma warning disable CS0618,CS0612
Expand Down Expand Up @@ -133,13 +136,13 @@ private protected PersistentState(DirectoryInfo path, int recordsPerPartition, O

static int ComparePartitions(Partition x, Partition y) => x.PartitionNumber.CompareTo(y.PartitionNumber);

static void CreateTables(SortedSet<Partition> partitionTable, DirectoryInfo path, int bufferSize, int recordsPerPartition, in BufferManager manager, int concurrentReads, WriteMode writeMode, long initialSize, long lastIndex)
static void CreateTables(SortedSet<Partition> partitionTable, DirectoryInfo path, int bufferSize, int recordsPerPartition, in BufferManager manager, int concurrentReads, WriteMode writeMode, long initialSize, long lastIndex, ReadOnlySpan<byte> initializedFooter)
{
foreach (var file in path.EnumerateFiles())
{
if (long.TryParse(file.Name, out var partitionNumber))
{
var partition = new Table(file.Directory!, bufferSize, recordsPerPartition, partitionNumber, in manager, concurrentReads, writeMode, initialSize);
var partition = new Table(file.Directory!, bufferSize, recordsPerPartition, partitionNumber, in manager, concurrentReads, writeMode, initialSize, initializedFooter);
partition.Initialize(lastIndex);
partitionTable.Add(partition);
}
Expand Down Expand Up @@ -187,7 +190,7 @@ static void CreateLegacyPartitions(SortedSet<Partition> partitionTable, Director
private partial Partition CreatePartition(long partitionNumber) => maxLogEntrySize switch
{
> 0L => new SparsePartition(Location, bufferSize, recordsPerPartition, partitionNumber, in bufferManager, concurrentReads, writeMode, initialSize, maxLogEntrySize),
0L => new Table(Location, bufferSize, recordsPerPartition, partitionNumber, in bufferManager, concurrentReads, writeMode, initialSize),
0L => new Table(Location, bufferSize, recordsPerPartition, partitionNumber, in bufferManager, concurrentReads, writeMode, initialSize, initializedFooter.Span),
#pragma warning disable CS0618,CS0612
< 0L => new LegacyPartition(Location, bufferSize, recordsPerPartition, partitionNumber, in bufferManager, concurrentReads, writeMode, initialSize),
#pragma warning restore CS0618,CS0612
Expand Down Expand Up @@ -1031,6 +1034,7 @@ protected override void Dispose(bool disposing)
commitEvent.Dispose();
syncRoot.Dispose();
bufferingConsumer?.Clear();
initializedFooter.Dispose();
}

base.Dispose(disposing);
Expand Down

0 comments on commit 38e8395

Please sign in to comment.