diff --git a/benchmark/BDN.benchmark/RecoveryBenchmark.cs b/benchmark/BDN.benchmark/RecoveryBenchmark.cs new file mode 100644 index 0000000000..af6144b4c9 --- /dev/null +++ b/benchmark/BDN.benchmark/RecoveryBenchmark.cs @@ -0,0 +1,69 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +using BenchmarkDotNet.Attributes; +using BenchmarkDotNet.Columns; +using BenchmarkDotNet.Configs; +using Embedded.perftest; +using Garnet.server; + +namespace BDN.benchmark +{ + public class CustomConfig : ManualConfig + { + public CustomConfig() + { + AddColumn(StatisticColumn.Mean); + AddColumn(StatisticColumn.StdDev); + AddColumn(StatisticColumn.Median); + AddColumn(StatisticColumn.P90); + AddColumn(StatisticColumn.P95); + } + } + + [Config(typeof(CustomConfig))] + public class RecoveryBenchmark + { + [ParamsSource(nameof(CommandLineArgsProvider))] + public string LogDir { get; set; } + + public IEnumerable CommandLineArgsProvider() + { + // Return the command line arguments as an enumerable + return Environment.GetCommandLineArgs().Skip(1); + } + + [Params("100m")] + public string MemorySize { get; set; } + + EmbeddedRespServer server; + + [IterationSetup] + public void Setup() + { + Console.WriteLine($"LogDir: {LogDir}"); + server = new EmbeddedRespServer(new GarnetServerOptions() + { + EnableStorageTier = true, + LogDir = LogDir, + CheckpointDir = LogDir, + IndexSize = "1m", + DisableObjects = true, + MemorySize = MemorySize, + PageSize = "32k", + }); + } + + [IterationCleanup] + public void Cleanup() + { + server.Dispose(); + } + + [Benchmark] + public void Recover() + { + server.StoreWrapper.RecoverCheckpoint(); + } + } +} \ No newline at end of file diff --git a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs index 359a6b8aa1..b6dfac758f 100644 --- a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs +++ b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs @@ -1054,6 +1054,11 @@ public virtual void Dispose() /// public int AllocatedPageCount; + /// + /// Max number of pages that have been allocated at any point in time + /// + public int MaxAllocatedPageCount; + /// /// Maximum possible number of empty pages in circular buffer /// @@ -1116,6 +1121,22 @@ public int EmptyPageCount /// internal abstract void DeleteFromMemory(); + /// + /// Increments AllocatedPageCount + /// Update MaxAllocatedPageCount, if a higher number of pages have been allocated. + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + protected void IncrementAllocatedPageCount() + { + var newAllocatedPageCount = Interlocked.Increment(ref AllocatedPageCount); + var currMaxAllocatedPageCount = MaxAllocatedPageCount; + while (currMaxAllocatedPageCount < newAllocatedPageCount) + { + if (Interlocked.CompareExchange(ref MaxAllocatedPageCount, newAllocatedPageCount, currMaxAllocatedPageCount) == currMaxAllocatedPageCount) + return; + currMaxAllocatedPageCount = MaxAllocatedPageCount; + } + } /// /// Segment size diff --git a/libs/storage/Tsavorite/cs/src/core/Allocator/BlittableAllocator.cs b/libs/storage/Tsavorite/cs/src/core/Allocator/BlittableAllocator.cs index 103b6b7fbd..f835cabbff 100644 --- a/libs/storage/Tsavorite/cs/src/core/Allocator/BlittableAllocator.cs +++ b/libs/storage/Tsavorite/cs/src/core/Allocator/BlittableAllocator.cs @@ -138,7 +138,7 @@ public override void Dispose() /// internal override void AllocatePage(int index) { - Interlocked.Increment(ref AllocatedPageCount); + IncrementAllocatedPageCount(); if (overflowPagePool.TryGet(out var item)) { diff --git a/libs/storage/Tsavorite/cs/src/core/Allocator/GenericAllocator.cs b/libs/storage/Tsavorite/cs/src/core/Allocator/GenericAllocator.cs index f142e3e9b3..b96081735d 100644 --- a/libs/storage/Tsavorite/cs/src/core/Allocator/GenericAllocator.cs +++ b/libs/storage/Tsavorite/cs/src/core/Allocator/GenericAllocator.cs @@ -281,7 +281,7 @@ internal override void AllocatePage(int index) internal Record[] AllocatePage() { - Interlocked.Increment(ref AllocatedPageCount); + IncrementAllocatedPageCount(); if (overflowPagePool.TryGet(out var item)) return item; diff --git a/libs/storage/Tsavorite/cs/src/core/Allocator/SpanByteAllocator.cs b/libs/storage/Tsavorite/cs/src/core/Allocator/SpanByteAllocator.cs index aba531ad30..1eaca3327d 100644 --- a/libs/storage/Tsavorite/cs/src/core/Allocator/SpanByteAllocator.cs +++ b/libs/storage/Tsavorite/cs/src/core/Allocator/SpanByteAllocator.cs @@ -205,7 +205,7 @@ public override void Dispose() /// internal override void AllocatePage(int index) { - Interlocked.Increment(ref AllocatedPageCount); + IncrementAllocatedPageCount(); if (overflowPagePool.TryGet(out var item)) { diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Recovery/Recovery.cs b/libs/storage/Tsavorite/cs/src/core/Index/Recovery/Recovery.cs index 9877c83ad8..bb6c16019e 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Recovery/Recovery.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Recovery/Recovery.cs @@ -20,6 +20,7 @@ internal sealed class RecoveryStatus public long snapshotEndPage; public long untilAddress; public int capacity; + public int usableCapacity; public CheckpointType checkpointType; public IDevice recoveryDevice; @@ -33,10 +34,11 @@ internal sealed class RecoveryStatus private readonly SemaphoreSlim readSemaphore = new(0); private readonly SemaphoreSlim flushSemaphore = new(0); - public RecoveryStatus(int capacity, + public RecoveryStatus(int capacity, int emptyPageCount, long endPage, long untilAddress, CheckpointType checkpointType) { this.capacity = capacity; + this.usableCapacity = capacity - emptyPageCount; this.endPage = endPage; this.untilAddress = untilAddress; this.checkpointType = checkpointType; @@ -501,7 +503,7 @@ await RecoverHybridLogFromSnapshotFileAsync(recoveredHLCInfo.info.flushedLogical private void DoPostRecovery(IndexCheckpointInfo recoveredICInfo, HybridLogCheckpointInfo recoveredHLCInfo, long tailAddress, ref long headAddress, ref long readOnlyAddress) { // Adjust head and read-only address post-recovery - var _head = (1 + (tailAddress >> hlog.LogPageSizeBits) - hlog.GetCapacityNumPages()) << hlog.LogPageSizeBits; + var _head = (1 + (tailAddress >> hlog.LogPageSizeBits) - (hlog.GetCapacityNumPages() - hlog.MinEmptyPageCount)) << hlog.LogPageSizeBits; if (_head > headAddress) headAddress = _head; if (readOnlyAddress < headAddress) @@ -607,25 +609,100 @@ private bool SetRecoveryPageRanges(HybridLogCheckpointInfo recoveredHLCInfo, int return true; } + /// + /// This method ensures that before 'pagesToRead' number of pages are read into memory, any previously allocated pages + /// that would cause total number of pages in memory to go beyond usableCapacity are freed. This is to ensure that + /// memory size constraint is maintained during recovery. + /// Illustration with capacity 32, usableCapacity 20, pagesToRead 2: + /// beg: startPage - 32 + /// end: startPage - 18 + /// We free these 14 pages, leaving 18 allocated, and then read 2, which fills up usableCapacity. + /// The beg, end can only be zero on the first pass through the buffer, as the page number continuously increases + /// + private void FreePagesBeyondUsableCapacity(long startPage, int capacity, int usableCapacity, int pagesToRead, RecoveryStatus recoveryStatus) + { + var beg = startPage - capacity; + var end = startPage - (usableCapacity - pagesToRead); + if (beg < 0) beg = 0; + if (end < 0) end = 0; + + WaitUntilAllPagesHaveBeenFlushed(beg, end, recoveryStatus); + for (var page = beg; page < end; page++) + { + if (hlog.IsAllocated(hlog.GetPageIndexForPage(page))) + hlog.FreePage(page); + } + } + + private void ReadPagesWithMemoryConstraint(long endAddress, int capacity, RecoveryStatus recoveryStatus, long page, long endPage, int numPagesToRead) + { + // Before reading in additional pages, make sure that any previously allocated pages that would violate the memory size + // constraint are freed. + FreePagesBeyondUsableCapacity(startPage: page, capacity: capacity, usableCapacity: capacity - hlog.MinEmptyPageCount, pagesToRead: numPagesToRead, recoveryStatus); + + // Issue request to read pages as much as possible + for (var p = page; p < endPage; p++) recoveryStatus.readStatus[hlog.GetPageIndexForPage(p)] = ReadStatus.Pending; + hlog.AsyncReadPagesFromDevice(page, numPagesToRead, endAddress, + hlog.AsyncReadPagesCallbackForRecovery, + recoveryStatus, recoveryStatus.recoveryDevicePageOffset, + recoveryStatus.recoveryDevice, recoveryStatus.objectLogRecoveryDevice); + } + + private long ReadPagesForRecovery(long untilAddress, RecoveryStatus recoveryStatus, long endPage, int capacity, int numPagesToReadPerIteration, long page) + { + var readEndPage = Math.Min(page + numPagesToReadPerIteration, endPage); + if (page < readEndPage) + { + var numPagesToRead = (int)(readEndPage - page); + + // Ensure that page slots that will be read into, have been flushed from previous reads. Due to the use of a single read semaphore, + // this must be done in batches of "all flushes' followed by "all reads" to ensure proper sequencing of reads when + // usableCapacity != capacity (and thus the page-read index is not equal to the page-flush index). + WaitUntilAllPagesHaveBeenFlushed(page, readEndPage, recoveryStatus); + ReadPagesWithMemoryConstraint(untilAddress, capacity, recoveryStatus, page, readEndPage, numPagesToRead); + } + + return readEndPage; + } + + private async ValueTask ReadPagesForRecoveryAsync(long untilAddress, RecoveryStatus recoveryStatus, long endPage, int capacity, int numPagesToReadPerIteration, long page, CancellationToken cancellationToken) + { + var readEndPage = Math.Min(page + numPagesToReadPerIteration, endPage); + if (page < readEndPage) + { + var numPagesToRead = (int)(readEndPage - page); + + // Ensure that page slots that will be read into, have been flushed from previous reads. Due to the use of a single read semaphore, + // this must be done in batches of "all flushes' followed by "all reads" to ensure proper sequencing of reads when + // usableCapacity != capacity (and thus the page-read index is not equal to the page-flush index). + await WaitUntilAllPagesHaveBeenFlushedAsync(page, readEndPage, recoveryStatus, cancellationToken).ConfigureAwait(false); + ReadPagesWithMemoryConstraint(untilAddress, capacity, recoveryStatus, page, readEndPage, numPagesToRead); + } + + return readEndPage; + } + private void RecoverHybridLog(long scanFromAddress, long recoverFromAddress, long untilAddress, long nextVersion, CheckpointType checkpointType, RecoveryOptions options) { if (untilAddress <= scanFromAddress) return; - var recoveryStatus = GetPageRangesToRead(scanFromAddress, untilAddress, checkpointType, out long startPage, out long endPage, out int capacity, out int numPagesToReadFirst); - - // Issue request to read pages as much as possible - hlog.AsyncReadPagesFromDevice(startPage, numPagesToReadFirst, untilAddress, hlog.AsyncReadPagesCallbackForRecovery, recoveryStatus); + var recoveryStatus = GetPageRangesToRead(scanFromAddress, untilAddress, checkpointType, out long startPage, out long endPage, out int capacity, out int numPagesToReadPerIteration); - for (long page = startPage; page < endPage; page++) + for (long page = startPage; page < endPage; page += numPagesToReadPerIteration) { - // Ensure page has been read into memory - int pageIndex = hlog.GetPageIndexForPage(page); - recoveryStatus.WaitRead(pageIndex); + var end = ReadPagesForRecovery(untilAddress, recoveryStatus, endPage, capacity, numPagesToReadPerIteration, page); + + for (var p = page; p < end; p++) + { + // Ensure page has been read into memory + int pageIndex = hlog.GetPageIndexForPage(p); + recoveryStatus.WaitRead(pageIndex); - // We make an extra pass to clear locks when reading every page back into memory - ClearLocksOnPage(page, options); + // We make an extra pass to clear locks when reading every page back into memory + ClearLocksOnPage(p, options); - ProcessReadPage(recoverFromAddress, untilAddress, nextVersion, options, recoveryStatus, endPage, capacity, page, pageIndex); + ProcessReadPageAndFlush(recoverFromAddress, untilAddress, nextVersion, options, recoveryStatus, p, pageIndex); + } } WaitUntilAllPagesHaveBeenFlushed(startPage, endPage, recoveryStatus); @@ -635,21 +712,23 @@ private async ValueTask RecoverHybridLogAsync(long scanFromAddress, long recover { if (untilAddress <= scanFromAddress) return; - var recoveryStatus = GetPageRangesToRead(scanFromAddress, untilAddress, checkpointType, out long startPage, out long endPage, out int capacity, out int numPagesToReadFirst); - - // Issue request to read pages as much as possible - hlog.AsyncReadPagesFromDevice(startPage, numPagesToReadFirst, untilAddress, hlog.AsyncReadPagesCallbackForRecovery, recoveryStatus); + var recoveryStatus = GetPageRangesToRead(scanFromAddress, untilAddress, checkpointType, out long startPage, out long endPage, out int capacity, out int numPagesToReadPerIteration); - for (long page = startPage; page < endPage; page++) + for (long page = startPage; page < endPage; page += numPagesToReadPerIteration) { - // Ensure page has been read into memory - int pageIndex = hlog.GetPageIndexForPage(page); - await recoveryStatus.WaitReadAsync(pageIndex, cancellationToken).ConfigureAwait(false); + var end = await ReadPagesForRecoveryAsync(untilAddress, recoveryStatus, endPage, capacity, numPagesToReadPerIteration, page, cancellationToken).ConfigureAwait(false); - // We make an extra pass to clear locks when reading every page back into memory - ClearLocksOnPage(page, options); + for (var p = page; p < end; p++) + { + // Ensure page has been read into memory + int pageIndex = hlog.GetPageIndexForPage(p); + await recoveryStatus.WaitReadAsync(pageIndex, cancellationToken).ConfigureAwait(false); + + // We make an extra pass to clear locks when reading every page back into memory + ClearLocksOnPage(p, options); - ProcessReadPage(recoverFromAddress, untilAddress, nextVersion, options, recoveryStatus, endPage, capacity, page, pageIndex); + ProcessReadPageAndFlush(recoverFromAddress, untilAddress, nextVersion, options, recoveryStatus, p, pageIndex); + } } await WaitUntilAllPagesHaveBeenFlushedAsync(startPage, endPage, recoveryStatus, cancellationToken).ConfigureAwait(false); @@ -666,11 +745,13 @@ private RecoveryStatus GetPageRangesToRead(long scanFromAddress, long untilAddre capacity = hlog.GetCapacityNumPages(); int totalPagesToRead = (int)(endPage - startPage); - numPagesToReadFirst = Math.Min(capacity, totalPagesToRead); - return new RecoveryStatus(capacity, endPage, untilAddress, checkpointType); + + // Leave out at least MinEmptyPageCount pages to maintain memory size during recovery + numPagesToReadFirst = Math.Min(capacity - hlog.MinEmptyPageCount, totalPagesToRead); + return new RecoveryStatus(capacity, hlog.MinEmptyPageCount, endPage, untilAddress, checkpointType); } - private void ProcessReadPage(long recoverFromAddress, long untilAddress, long nextVersion, RecoveryOptions options, RecoveryStatus recoveryStatus, long endPage, int capacity, long page, int pageIndex) + private void ProcessReadPageAndFlush(long recoverFromAddress, long untilAddress, long nextVersion, RecoveryOptions options, RecoveryStatus recoveryStatus, long page, int pageIndex) { if (ProcessReadPage(recoverFromAddress, untilAddress, nextVersion, options, recoveryStatus, page, pageIndex)) { @@ -681,13 +762,6 @@ private void ProcessReadPage(long recoverFromAddress, long untilAddress, long ne // We do not need to flush recoveryStatus.flushStatus[pageIndex] = FlushStatus.Done; - - // Issue next read if there are more pages past 'capacity' from this one. - if (page + capacity < endPage) - { - recoveryStatus.readStatus[pageIndex] = ReadStatus.Pending; - hlog.AsyncReadPagesFromDevice(page + capacity, 1, untilAddress, hlog.AsyncReadPagesCallbackForRecovery, recoveryStatus); - } } private bool ProcessReadPage(long recoverFromAddress, long untilAddress, long nextVersion, RecoveryOptions options, RecoveryStatus recoveryStatus, long page, int pageIndex) @@ -733,16 +807,13 @@ private async ValueTask WaitUntilAllPagesHaveBeenFlushedAsync(long startPage, lo private void RecoverHybridLogFromSnapshotFile(long scanFromAddress, long recoverFromAddress, long untilAddress, long snapshotStartAddress, long snapshotEndAddress, long nextVersion, Guid guid, RecoveryOptions options, DeltaLog deltaLog, long recoverTo) { - GetSnapshotPageRangesToRead(scanFromAddress, untilAddress, snapshotStartAddress, snapshotEndAddress, guid, out long startPage, out long endPage, out long snapshotEndPage, out int capacity, out var recoveryStatus, out int numPagesToReadFirst); + GetSnapshotPageRangesToRead(scanFromAddress, untilAddress, snapshotStartAddress, snapshotEndAddress, guid, out long startPage, out long endPage, out long snapshotEndPage, out int capacity, out var recoveryStatus, out int numPagesToReadPerIteration); - hlog.AsyncReadPagesFromDevice(startPage, numPagesToReadFirst, snapshotEndAddress, - hlog.AsyncReadPagesCallbackForRecovery, - recoveryStatus, recoveryStatus.recoveryDevicePageOffset, - recoveryStatus.recoveryDevice, recoveryStatus.objectLogRecoveryDevice); - - for (long page = startPage; page < endPage; page += capacity) + for (long page = startPage; page < endPage; page += numPagesToReadPerIteration) { - long end = Math.Min(page + capacity, endPage); + ReadPagesForRecovery(snapshotEndAddress, recoveryStatus, snapshotEndPage, capacity, numPagesToReadPerIteration, page); + + long end = Math.Min(page + numPagesToReadPerIteration, endPage); for (long p = page; p < end; p++) { int pageIndex = hlog.GetPageIndexForPage(p); @@ -764,7 +835,7 @@ private void RecoverHybridLogFromSnapshotFile(long scanFromAddress, long recover } } - ApplyDelta(scanFromAddress, recoverFromAddress, untilAddress, nextVersion, options, deltaLog, recoverTo, endPage, snapshotEndPage, capacity, recoveryStatus, page, end); + ApplyDelta(scanFromAddress, recoverFromAddress, untilAddress, nextVersion, options, deltaLog, recoverTo, endPage, snapshotEndPage, capacity, numPagesToReadPerIteration, recoveryStatus, page, end); } WaitUntilAllPagesHaveBeenFlushed(startPage, endPage, recoveryStatus); @@ -773,16 +844,13 @@ private void RecoverHybridLogFromSnapshotFile(long scanFromAddress, long recover private async ValueTask RecoverHybridLogFromSnapshotFileAsync(long scanFromAddress, long recoverFromAddress, long untilAddress, long snapshotStartAddress, long snapshotEndAddress, long nextVersion, Guid guid, RecoveryOptions options, DeltaLog deltaLog, long recoverTo, CancellationToken cancellationToken) { - GetSnapshotPageRangesToRead(scanFromAddress, untilAddress, snapshotStartAddress, snapshotEndAddress, guid, out long startPage, out long endPage, out long snapshotEndPage, out int capacity, out var recoveryStatus, out int numPagesToReadFirst); + GetSnapshotPageRangesToRead(scanFromAddress, untilAddress, snapshotStartAddress, snapshotEndAddress, guid, out long startPage, out long endPage, out long snapshotEndPage, out int capacity, out var recoveryStatus, out int numPagesToReadPerIteration); - hlog.AsyncReadPagesFromDevice(startPage, numPagesToReadFirst, snapshotEndAddress, - hlog.AsyncReadPagesCallbackForRecovery, - recoveryStatus, recoveryStatus.recoveryDevicePageOffset, - recoveryStatus.recoveryDevice, recoveryStatus.objectLogRecoveryDevice); - - for (long page = startPage; page < endPage; page += capacity) + for (long page = startPage; page < endPage; page += numPagesToReadPerIteration) { - long end = Math.Min(page + capacity, endPage); + await ReadPagesForRecoveryAsync(snapshotEndAddress, recoveryStatus, snapshotEndPage, capacity, numPagesToReadPerIteration, page, cancellationToken).ConfigureAwait(false); + + long end = Math.Min(page + numPagesToReadPerIteration, endPage); for (long p = page; p < end; p++) { int pageIndex = hlog.GetPageIndexForPage(p); @@ -804,14 +872,14 @@ private async ValueTask RecoverHybridLogFromSnapshotFileAsync(long scanFromAddre } } - ApplyDelta(scanFromAddress, recoverFromAddress, untilAddress, nextVersion, options, deltaLog, recoverTo, endPage, snapshotEndPage, capacity, recoveryStatus, page, end); + ApplyDelta(scanFromAddress, recoverFromAddress, untilAddress, nextVersion, options, deltaLog, recoverTo, endPage, snapshotEndPage, capacity, numPagesToReadPerIteration, recoveryStatus, page, end); } await WaitUntilAllPagesHaveBeenFlushedAsync(startPage, endPage, recoveryStatus, cancellationToken).ConfigureAwait(false); recoveryStatus.Dispose(); } - private void ApplyDelta(long scanFromAddress, long recoverFromAddress, long untilAddress, long nextVersion, RecoveryOptions options, DeltaLog deltaLog, long recoverTo, long endPage, long snapshotEndPage, int capacity, RecoveryStatus recoveryStatus, long page, long end) + private void ApplyDelta(long scanFromAddress, long recoverFromAddress, long untilAddress, long nextVersion, RecoveryOptions options, DeltaLog deltaLog, long recoverTo, long endPage, long snapshotEndPage, int capacity, int numPagesToRead, RecoveryStatus recoveryStatus, long page, long end) { hlog.ApplyDelta(deltaLog, page, end, recoverTo); @@ -824,13 +892,10 @@ private void ApplyDelta(long scanFromAddress, long recoverFromAddress, long unti ProcessReadSnapshotPage(recoverFromAddress, untilAddress, nextVersion, options, recoveryStatus, p, pageIndex); // Issue next read - if (p + capacity < endPage) + if (p + numPagesToRead < endPage) { // Flush snapshot page to main log - // Flush callback will issue further reads or page clears recoveryStatus.flushStatus[pageIndex] = FlushStatus.Pending; - if (p + capacity < snapshotEndPage) - recoveryStatus.readStatus[pageIndex] = ReadStatus.Pending; hlog.AsyncFlushPages(p, 1, AsyncFlushPageCallbackForRecovery, recoveryStatus); } } @@ -856,7 +921,7 @@ private void GetSnapshotPageRangesToRead(long fromAddress, long untilAddress, lo recoveryDevice.Initialize(hlog.GetSegmentSize()); objectLogRecoveryDevice.Initialize(-1); - recoveryStatus = new RecoveryStatus(capacity, endPage, untilAddress, CheckpointType.Snapshot) + recoveryStatus = new RecoveryStatus(capacity, hlog.MinEmptyPageCount, endPage, untilAddress, CheckpointType.Snapshot) { recoveryDevice = recoveryDevice, objectLogRecoveryDevice = objectLogRecoveryDevice, @@ -866,7 +931,7 @@ private void GetSnapshotPageRangesToRead(long fromAddress, long untilAddress, lo // Initially issue read request for all pages that can be held in memory int totalPagesToRead = (int)(snapshotEndPage - startPage); - numPagesToReadFirst = Math.Min(capacity, totalPagesToRead); + numPagesToReadFirst = Math.Min(capacity - hlog.MinEmptyPageCount, totalPagesToRead); } private void ProcessReadSnapshotPage(long fromAddress, long untilAddress, long nextVersion, RecoveryOptions options, RecoveryStatus recoveryStatus, long page, int pageIndex) @@ -993,7 +1058,6 @@ private unsafe bool RecoverFromPage(long startRecoveryAddress, return touched; } - private void AsyncFlushPageCallbackForRecovery(uint errorCode, uint numBytes, object context) { if (errorCode != 0) @@ -1007,29 +1071,12 @@ private void AsyncFlushPageCallbackForRecovery(uint errorCode, uint numBytes, ob if (Interlocked.Decrement(ref result.count) == 0) { int pageIndex = hlog.GetPageIndexForPage(result.page); + if (errorCode != 0) result.context.SignalFlushedError(pageIndex); else result.context.SignalFlushed(pageIndex); - if (result.page + result.context.capacity < result.context.endPage) - { - long readPage = result.page + result.context.capacity; - if (result.context.checkpointType == CheckpointType.FoldOver) - { - hlog.AsyncReadPagesFromDevice(readPage, 1, result.context.untilAddress, hlog.AsyncReadPagesCallbackForRecovery, result.context); - } - else - { - if (readPage < result.context.snapshotEndPage) - { - // If next page is in snapshot, issue retrieval for it - hlog.AsyncReadPagesFromDevice(readPage, 1, result.context.untilAddress, hlog.AsyncReadPagesCallbackForRecovery, - result.context, - result.context.recoveryDevicePageOffset, - result.context.recoveryDevice, result.context.objectLogRecoveryDevice); - } - } - } + result.Free(); } } @@ -1126,7 +1173,7 @@ private bool RestoreHybridLogInitializePages(long beginAddress, long headAddress tailPage = GetPage(fromAddress); headPage = GetPage(headAddress); - recoveryStatus = new RecoveryStatus(GetCapacityNumPages(), tailPage, untilAddress, 0); + recoveryStatus = new RecoveryStatus(GetCapacityNumPages(), MinEmptyPageCount, tailPage, untilAddress, 0); for (int i = 0; i < recoveryStatus.capacity; i++) { recoveryStatus.readStatus[i] = ReadStatus.Done; diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Tsavorite.cs b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Tsavorite.cs index fcafda5577..47119a2704 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Tsavorite.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Tsavorite.cs @@ -33,6 +33,11 @@ public partial class TsavoriteKV : TsavoriteBase, IDisposable /// public long EntryCount => GetEntryCount(); + /// + /// Maximum number of memory pages ever allocated + /// + public long MaxAllocatedPageCount => hlog.MaxAllocatedPageCount; + /// /// Size of index in #cache lines (64 bytes each) /// diff --git a/playground/Embedded.perftest/EmbeddedRespServer.cs b/playground/Embedded.perftest/EmbeddedRespServer.cs index 9c0adc5117..30eb5af8b6 100644 --- a/playground/Embedded.perftest/EmbeddedRespServer.cs +++ b/playground/Embedded.perftest/EmbeddedRespServer.cs @@ -28,6 +28,8 @@ public EmbeddedRespServer(GarnetServerOptions opts, ILoggerFactory loggerFactory /// public new void Dispose() => base.Dispose(); + public StoreWrapper StoreWrapper => storeWrapper; + /// /// Return a RESP session to this server /// diff --git a/test/Garnet.test/RespAdminCommandsTests.cs b/test/Garnet.test/RespAdminCommandsTests.cs index 72a1256b2c..23695cbb2f 100644 --- a/test/Garnet.test/RespAdminCommandsTests.cs +++ b/test/Garnet.test/RespAdminCommandsTests.cs @@ -217,6 +217,120 @@ public void SeSaveRecoverObjectTest() Assert.AreEqual(ldata, returnedData); } } + [Test] + [TestCase(63, 15, 1)] + [TestCase(63, 1, 1)] + [TestCase(16, 16, 1)] + [TestCase(5, 64, 1)] + public void SeSaveRecoverMultipleObjectsTest(int memorySize, int recoveryMemorySize, int pageSize) + { + string sizeToString(int size) => size + "k"; + + server.Dispose(); + server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, lowMemory: true, MemorySize: sizeToString(memorySize), PageSize: sizeToString(pageSize)); + server.Start(); + + var ldata = new RedisValue[] { "a", "b", "c", "d" }; + var ldataArr = ldata.Select(x => x).Reverse().ToArray(); + using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig(allowAdmin: true))) + { + var db = redis.GetDatabase(0); + for (int i = 0; i < 3000; i++) + { + var key = $"SeSaveRecoverTestKey{i:0000}"; + db.ListLeftPush(key, ldata); + var retval = db.ListRange(key); + Assert.AreEqual(ldataArr, retval, $"key {key}"); + } + + // Issue and wait for DB save + var server = redis.GetServer($"{TestUtils.Address}:{TestUtils.Port}"); + server.Save(SaveType.BackgroundSave); + while (server.LastSave().Ticks == DateTimeOffset.FromUnixTimeSeconds(0).Ticks) Thread.Sleep(10); + } + + server.Dispose(false); + server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, tryRecover: true, lowMemory: true, MemorySize: sizeToString(recoveryMemorySize), PageSize: sizeToString(pageSize)); + server.Start(); + + Assert.LessOrEqual(server.Provider.StoreWrapper.objectStore.MaxAllocatedPageCount, (recoveryMemorySize / pageSize) + 1); + using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig(allowAdmin: true))) + { + var db = redis.GetDatabase(0); + for (int i = 0; i < 3000; i++) + { + var key = $"SeSaveRecoverTestKey{i:0000}"; + var returnedData = db.ListRange(key); + Assert.AreEqual(ldataArr, returnedData, $"key {key}"); + } + } + } + + [Test] + [TestCase("63k", "15k")] + [TestCase("63k", "3k")] + [TestCase("63k", "1k")] + [TestCase("8k", "5k")] + [TestCase("16k", "16k")] + [TestCase("5k", "8k")] + [TestCase("5k", "64k")] + public void SeSaveRecoverMultipleKeysTest(string memorySize, string recoveryMemorySize) + { + bool disableObj = true; + + server.Dispose(); + server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, DisableObjects: disableObj, lowMemory: true, MemorySize: memorySize, PageSize: "1k", enableAOF: true); + server.Start(); + + using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig(allowAdmin: true))) + { + var db = redis.GetDatabase(0); + for (int i = 0; i < 1000; i++) + { + db.StringSet($"SeSaveRecoverTestKey{i:0000}", $"SeSaveRecoverTestValue"); + } + + for (int i = 0; i < 1000; i++) + { + var recoveredValue = db.StringGet($"SeSaveRecoverTestKey{i:0000}"); + Assert.AreEqual("SeSaveRecoverTestValue", recoveredValue.ToString()); + } + + var inforesult = db.Execute("INFO"); + + // Issue and wait for DB save + var server = redis.GetServer($"{TestUtils.Address}:{TestUtils.Port}"); + server.Save(SaveType.BackgroundSave); + while (server.LastSave().Ticks == DateTimeOffset.FromUnixTimeSeconds(0).Ticks) Thread.Sleep(10); + + for (int i = 1000; i < 2000; i++) + { + db.StringSet($"SeSaveRecoverTestKey{i:0000}", $"SeSaveRecoverTestValue"); + } + + for (int i = 1000; i < 2000; i++) + { + var recoveredValue = db.StringGet($"SeSaveRecoverTestKey{i:0000}"); + Assert.AreEqual("SeSaveRecoverTestValue", recoveredValue.ToString()); + } + + db.Execute("COMMITAOF"); + } + + server.Dispose(false); + server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, DisableObjects: disableObj, tryRecover: true, lowMemory: true, MemorySize: recoveryMemorySize, PageSize: "1k", enableAOF: true); + server.Start(); + + using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig(allowAdmin: true))) + { + var db = redis.GetDatabase(0); + for (int i = 0; i < 2000; i++) + { + var recoveredValue = db.StringGet($"SeSaveRecoverTestKey{i:0000}"); + Assert.AreEqual("SeSaveRecoverTestValue", recoveredValue.ToString(), $"Key SeSaveRecoverTestKey{i:0000}"); + } + } + } [Test] public void SeAofRecoverTest() diff --git a/test/Garnet.test/TestUtils.cs b/test/Garnet.test/TestUtils.cs index de33187eb1..305aa5fb79 100644 --- a/test/Garnet.test/TestUtils.cs +++ b/test/Garnet.test/TestUtils.cs @@ -36,6 +36,11 @@ internal static class TestUtils /// public static int Port = 33278; + /// + /// Whether to use a test progress logger + /// + static readonly bool useTestLogger = false; + internal static string AzureTestContainer { get @@ -197,7 +202,20 @@ public static GarnetServer CreateGarnetServer( opts.PageSize = opts.ObjectStorePageSize = PageSize == default ? "512" : PageSize; } - return new GarnetServer(opts); + if (useTestLogger) + { + var loggerFactory = LoggerFactory.Create(builder => + { + builder.AddProvider(new NUnitLoggerProvider(TestContext.Progress, "GarnetServer", null, false, false, LogLevel.Trace)); + builder.SetMinimumLevel(LogLevel.Trace); + }); + + return new GarnetServer(opts, loggerFactory); + } + else + { + return new GarnetServer(opts); + } } ///