From 2536079f6f0dcfaf2a2274b1223571ab621130f3 Mon Sep 17 00:00:00 2001 From: mbdavid Date: Wed, 26 Jun 2024 10:27:23 -0300 Subject: [PATCH 1/2] Fix Person_Tests to run each test in a new database instance --- LiteDB.Tests/Query/OrderBy_Tests.cs | 21 ++++++++++++++++++++- LiteDB.Tests/Query/Person_Tests.cs | 21 ++++++++++++--------- LiteDB.Tests/Query/QueryApi_Tests.cs | 8 ++++++++ LiteDB.Tests/Query/Select_Tests.cs | 17 +++++++++++++++++ LiteDB.Tests/Query/Where_Tests.cs | 20 ++++++++++++++++++++ 5 files changed, 77 insertions(+), 10 deletions(-) diff --git a/LiteDB.Tests/Query/OrderBy_Tests.cs b/LiteDB.Tests/Query/OrderBy_Tests.cs index 09a084a65..e0e17a9e5 100644 --- a/LiteDB.Tests/Query/OrderBy_Tests.cs +++ b/LiteDB.Tests/Query/OrderBy_Tests.cs @@ -4,11 +4,15 @@ namespace LiteDB.Tests.QueryTest { - public class OrderBy_Tests : Person_Tests + public class OrderBy_Tests { [Fact] public void Query_OrderBy_Using_Index() { + using var db = new Person_Tests(); + var collection = db.GetCollection(); + var local = db.GetLocal(); + collection.EnsureIndex(x => x.Name); var r0 = local @@ -27,6 +31,10 @@ public void Query_OrderBy_Using_Index() [Fact] public void Query_OrderBy_Using_Index_Desc() { + using var db = new Person_Tests(); + var collection = db.GetCollection(); + var local = db.GetLocal(); + collection.EnsureIndex(x => x.Name); var r0 = local @@ -45,6 +53,10 @@ public void Query_OrderBy_Using_Index_Desc() [Fact] public void Query_OrderBy_With_Func() { + using var db = new Person_Tests(); + var collection = db.GetCollection(); + var local = db.GetLocal(); + collection.EnsureIndex(x => x.Date.Day); var r0 = local @@ -63,6 +75,10 @@ public void Query_OrderBy_With_Func() [Fact] public void Query_OrderBy_With_Offset_Limit() { + using var db = new Person_Tests(); + var collection = db.GetCollection(); + var local = db.GetLocal(); + // no index var r0 = local @@ -85,6 +101,9 @@ public void Query_OrderBy_With_Offset_Limit() [Fact] public void Query_Asc_Desc() { + using var db = new Person_Tests(); + var collection = db.GetCollection(); + var asc = collection.Find(Query.All(Query.Ascending)).ToArray(); var desc = collection.Find(Query.All(Query.Descending)).ToArray(); diff --git a/LiteDB.Tests/Query/Person_Tests.cs b/LiteDB.Tests/Query/Person_Tests.cs index d55ee1053..d0fb6a852 100644 --- a/LiteDB.Tests/Query/Person_Tests.cs +++ b/LiteDB.Tests/Query/Person_Tests.cs @@ -5,23 +5,26 @@ namespace LiteDB.Tests.QueryTest { public class Person_Tests : IDisposable { - protected readonly Person[] local; - - protected ILiteDatabase db; - protected ILiteCollection collection; + private readonly Person[] _local; + private readonly ILiteDatabase _db; + private readonly ILiteCollection _collection; public Person_Tests() { - this.local = DataGen.Person().ToArray(); + this._local = DataGen.Person().ToArray(); - db = new LiteDatabase(":memory:"); - collection = db.GetCollection("person"); - collection.Insert(this.local); + _db = new LiteDatabase(":memory:"); + _collection = _db.GetCollection("person"); + _collection.Insert(this._local); } + public ILiteCollection GetCollection() => _collection; + + public Person[] GetLocal() => _local; + public void Dispose() { - db?.Dispose(); + _db?.Dispose(); } } } \ No newline at end of file diff --git a/LiteDB.Tests/Query/QueryApi_Tests.cs b/LiteDB.Tests/Query/QueryApi_Tests.cs index d92ece994..a6a158d20 100644 --- a/LiteDB.Tests/Query/QueryApi_Tests.cs +++ b/LiteDB.Tests/Query/QueryApi_Tests.cs @@ -10,6 +10,10 @@ public class QueryApi_Tests : Person_Tests [Fact] public void Query_And() { + using var db = new Person_Tests(); + var collection = db.GetCollection(); + var local = db.GetLocal(); + var r0 = local.Where(x => x.Age == 22 && x.Active == true).ToArray(); var r1 = collection.Find(Query.And(Query.EQ("Age", 22), Query.EQ("Active", true))).ToArray(); @@ -20,6 +24,10 @@ public void Query_And() [Fact] public void Query_And_Same_Field() { + using var db = new Person_Tests(); + var collection = db.GetCollection(); + var local = db.GetLocal(); + var r0 = local.Where(x => x.Age > 22 && x.Age < 25).ToArray(); var r1 = collection.Find(Query.And(Query.GT("Age", 22), Query.LT("Age", 25))).ToArray(); diff --git a/LiteDB.Tests/Query/Select_Tests.cs b/LiteDB.Tests/Query/Select_Tests.cs index 66117640c..b2e67342d 100644 --- a/LiteDB.Tests/Query/Select_Tests.cs +++ b/LiteDB.Tests/Query/Select_Tests.cs @@ -10,6 +10,10 @@ public class Select_Tests : Person_Tests [Fact] public void Query_Select_Key_Only() { + using var db = new Person_Tests(); + var collection = db.GetCollection(); + var local = db.GetLocal(); + collection.EnsureIndex(x => x.Address.City); // must orderBy mem data because index will be sorted @@ -30,6 +34,10 @@ public void Query_Select_Key_Only() [Fact] public void Query_Select_New_Document() { + using var db = new Person_Tests(); + var collection = db.GetCollection(); + var local = db.GetLocal(); + var r0 = local .Select(x => new {city = x.Address.City.ToUpper(), phone0 = x.Phones[0], address = new Address {Street = x.Name}}) .ToArray(); @@ -49,6 +57,9 @@ public void Query_Select_New_Document() [Fact] public void Query_Or_With_Null() { + using var db = new Person_Tests(); + var collection = db.GetCollection(); + var r = collection.Find(Query.Or( Query.GTE("Date", new DateTime(2001, 1, 1)), Query.EQ("Date", null) @@ -58,6 +69,10 @@ public void Query_Or_With_Null() [Fact] public void Query_Find_All_Predicate() { + using var db = new Person_Tests(); + var collection = db.GetCollection(); + var local = db.GetLocal(); + var r = collection.Find(x => true).ToArray(); r.Should().HaveCount(1000); @@ -66,6 +81,8 @@ public void Query_Find_All_Predicate() [Fact] public void Query_With_No_Collection() { + using var db = new LiteDatabase(":memory:"); + using (var r = db.Execute("SELECT DAY(NOW()) as DIA")) { while(r.Read()) diff --git a/LiteDB.Tests/Query/Where_Tests.cs b/LiteDB.Tests/Query/Where_Tests.cs index e411bc30f..07a2eb955 100644 --- a/LiteDB.Tests/Query/Where_Tests.cs +++ b/LiteDB.Tests/Query/Where_Tests.cs @@ -15,6 +15,10 @@ class Entity [Fact] public void Query_Where_With_Parameter() { + using var db = new Person_Tests(); + var collection = db.GetCollection(); + var local = db.GetLocal(); + var r0 = local .Where(x => x.Address.State == "FL") .ToArray(); @@ -29,6 +33,10 @@ public void Query_Where_With_Parameter() [Fact] public void Query_Multi_Where_With_Like() { + using var db = new Person_Tests(); + var collection = db.GetCollection(); + var local = db.GetLocal(); + var r0 = local .Where(x => x.Age >= 10 && x.Age <= 40) .Where(x => x.Name.StartsWith("Ge")) @@ -45,6 +53,10 @@ public void Query_Multi_Where_With_Like() [Fact] public void Query_Single_Where_With_And() { + using var db = new Person_Tests(); + var collection = db.GetCollection(); + var local = db.GetLocal(); + var r0 = local .Where(x => x.Age == 25 && x.Active) .ToArray(); @@ -59,6 +71,10 @@ public void Query_Single_Where_With_And() [Fact] public void Query_Single_Where_With_Or_And_In() { + using var db = new Person_Tests(); + var collection = db.GetCollection(); + var local = db.GetLocal(); + var r0 = local .Where(x => x.Age == 25 || x.Age == 26 || x.Age == 27) .ToArray(); @@ -78,6 +94,10 @@ public void Query_Single_Where_With_Or_And_In() [Fact] public void Query_With_Array_Ids() { + using var db = new Person_Tests(); + var collection = db.GetCollection(); + var local = db.GetLocal(); + var ids = new int[] { 1, 2, 3 }; var r0 = local From 198f0a672fb71480a8215f3f365f80e480a33965 Mon Sep 17 00:00:00 2001 From: mbdavid Date: Wed, 26 Jun 2024 11:02:39 -0300 Subject: [PATCH 2/2] Remove DiskWriterQueue and use same thread to write on disk --- LiteDB.Tests/Internals/CacheAsync_Tests.cs | 4 +- LiteDB.Tests/Internals/Disk_Tests.cs | 5 +- LiteDB.Tests/Query/Data/PersonGroupByData.cs | 32 ++++ .../PersonQueryData.cs} | 12 +- LiteDB.Tests/Query/GroupBy_Tests.cs | 62 +++----- LiteDB.Tests/Query/OrderBy_Tests.cs | 25 ++- LiteDB.Tests/Query/QueryApi_Tests.cs | 12 +- LiteDB.Tests/Query/Select_Tests.cs | 21 ++- LiteDB.Tests/Query/Where_Tests.cs | 27 ++-- LiteDB/Engine/Disk/DiskService.cs | 91 ++++++----- LiteDB/Engine/Disk/DiskWriterQueue.cs | 142 ------------------ .../Engine/Disk/StreamFactory/StreamPool.cs | 2 +- LiteDB/Engine/Engine/Transaction.cs | 2 +- LiteDB/Engine/LiteEngine.cs | 13 +- LiteDB/Engine/Services/TransactionService.cs | 4 +- LiteDB/Engine/Services/WalIndexService.cs | 11 +- LiteDB/Engine/Sort/SortDisk.cs | 2 +- .../Engine/SystemCollections/SysDatabase.cs | 5 +- 18 files changed, 152 insertions(+), 320 deletions(-) create mode 100644 LiteDB.Tests/Query/Data/PersonGroupByData.cs rename LiteDB.Tests/Query/{Person_Tests.cs => Data/PersonQueryData.cs} (62%) delete mode 100644 LiteDB/Engine/Disk/DiskWriterQueue.cs diff --git a/LiteDB.Tests/Internals/CacheAsync_Tests.cs b/LiteDB.Tests/Internals/CacheAsync_Tests.cs index 59199725d..5f541f881 100644 --- a/LiteDB.Tests/Internals/CacheAsync_Tests.cs +++ b/LiteDB.Tests/Internals/CacheAsync_Tests.cs @@ -39,14 +39,12 @@ void serialize(ManualResetEventSlim toBlock, ManualResetEventSlim toFree) // test starts here!!! var p0 = new HeaderPage(r.NewPage(), 0); - disk.WriteAsync(new PageBuffer[] {p0.UpdateBuffer()}); + disk.WriteLogDisk(new PageBuffer[] { p0.UpdateBuffer() }); // (1 ->) jump to thread B serialize(wa, wb); // (2 <-) continue from thread B - disk.Queue.Value.Wait(); - // (3 ->) jump to thread B serialize(wa, wb); }); diff --git a/LiteDB.Tests/Internals/Disk_Tests.cs b/LiteDB.Tests/Internals/Disk_Tests.cs index 142edaaa1..1a7928c9b 100644 --- a/LiteDB.Tests/Internals/Disk_Tests.cs +++ b/LiteDB.Tests/Internals/Disk_Tests.cs @@ -35,10 +35,7 @@ public void Disk_Read_Write() } // page will be saved in LOG file in PagePosition order (0-99) - disk.WriteAsync(pages); - - // wait for async queue writes - disk.Queue.Value.Wait(); + disk.WriteLogDisk(pages); // after release, no page can be read/write pages.Clear(); diff --git a/LiteDB.Tests/Query/Data/PersonGroupByData.cs b/LiteDB.Tests/Query/Data/PersonGroupByData.cs new file mode 100644 index 000000000..7e7a31cf6 --- /dev/null +++ b/LiteDB.Tests/Query/Data/PersonGroupByData.cs @@ -0,0 +1,32 @@ +using FluentAssertions; +using System; +using System.IO; +using System.Linq; +using Xunit; + +namespace LiteDB.Tests.QueryTest +{ + public class PersonGroupByData : IDisposable + { + private readonly Person[] _local; + private readonly ILiteDatabase _db; + private readonly ILiteCollection _collection; + + public PersonGroupByData() + { + _local = DataGen.Person(1, 1000).ToArray(); + _db = new LiteDatabase(new MemoryStream()); + _collection = _db.GetCollection(); + + _collection.Insert(_local); + _collection.EnsureIndex(x => x.Age); + } + + public (ILiteCollection, Person[]) GetData() => (_collection, _local); + + public void Dispose() + { + _db.Dispose(); + } + } +} \ No newline at end of file diff --git a/LiteDB.Tests/Query/Person_Tests.cs b/LiteDB.Tests/Query/Data/PersonQueryData.cs similarity index 62% rename from LiteDB.Tests/Query/Person_Tests.cs rename to LiteDB.Tests/Query/Data/PersonQueryData.cs index d0fb6a852..2e266b0cf 100644 --- a/LiteDB.Tests/Query/Person_Tests.cs +++ b/LiteDB.Tests/Query/Data/PersonQueryData.cs @@ -3,28 +3,26 @@ namespace LiteDB.Tests.QueryTest { - public class Person_Tests : IDisposable + public class PersonQueryData : IDisposable { private readonly Person[] _local; private readonly ILiteDatabase _db; private readonly ILiteCollection _collection; - public Person_Tests() + public PersonQueryData() { - this._local = DataGen.Person().ToArray(); + _local = DataGen.Person().ToArray(); _db = new LiteDatabase(":memory:"); _collection = _db.GetCollection("person"); _collection.Insert(this._local); } - public ILiteCollection GetCollection() => _collection; - - public Person[] GetLocal() => _local; + public (ILiteCollection, Person[]) GetData() => (_collection, _local); public void Dispose() { - _db?.Dispose(); + _db.Dispose(); } } } \ No newline at end of file diff --git a/LiteDB.Tests/Query/GroupBy_Tests.cs b/LiteDB.Tests/Query/GroupBy_Tests.cs index a388cc48a..8f84dd430 100644 --- a/LiteDB.Tests/Query/GroupBy_Tests.cs +++ b/LiteDB.Tests/Query/GroupBy_Tests.cs @@ -6,46 +6,33 @@ namespace LiteDB.Tests.QueryTest { - public class GroupBy_Tests : IDisposable + public class GroupBy_Tests { - private readonly Person[] local; - - private readonly ILiteDatabase db; - private readonly ILiteCollection collection; - - public GroupBy_Tests() - { - local = DataGen.Person(1, 1000).ToArray(); - - db = new LiteDatabase(new MemoryStream()); - collection = db.GetCollection(); - - collection.Insert(local); - collection.EnsureIndex(x => x.Age); - } - - [Fact(Skip = "Commented out")] + [Fact(Skip = "Missing implement LINQ for GroupBy")] public void Query_GroupBy_Age_With_Count() { - //** var r0 = local - //** .GroupBy(x => x.Age) - //** .Select(x => new { Age = x.Key, Count = x.Count() }) - //** .OrderBy(x => x.Age) - //** .ToArray(); + //**using var db = new PersonGroupByData(); + //**var (collection, local) = db.GetData(); //** - //** var r1 = collection.Query() - //** .GroupBy(x => x.Age) - //** .Select(x => new { Age = x.Key, Count = x.Count() }) - //** .ToArray(); + //**var r0 = local + //** .GroupBy(x => x.Age) + //** .Select(x => new { Age = x.Key, Count = x.Count() }) + //** .OrderBy(x => x.Age) + //** .ToArray(); //** - //** foreach (var r in r0.Zip(r1, (l, r) => new { left = l, right = r })) - //** { - //** r.left.Age.Should().Be(r.right.Age); - //** r.left.Count.Should().Be(r.right.Count); - //** } + //**var r1 = collection.Query() + //** .GroupBy("$.Age") + //** .Select(x => new { Age = x.Key, Count = x.Count() }) + //** .ToArray(); + //** + //**foreach (var r in r0.Zip(r1, (l, r) => new { left = l, right = r })) + //**{ + //** r.left.Age.Should().Be(r.right.Age); + //** r.left.Count.Should().Be(r.right.Count); + //**} } - [Fact(Skip = "Commented out")] + [Fact(Skip = "Missing implement LINQ for GroupBy")] public void Query_GroupBy_Year_With_Sum_Age() { //** var r0 = local @@ -66,7 +53,7 @@ public void Query_GroupBy_Year_With_Sum_Age() //** } } - [Fact(Skip = "Commented out")] + [Fact(Skip = "Missing implement LINQ for GroupBy")] public void Query_GroupBy_Func() { //** var r0 = local @@ -87,7 +74,7 @@ public void Query_GroupBy_Func() //** } } - [Fact(Skip = "Commented out")] + [Fact(Skip = "Missing implement LINQ for GroupBy")] public void Query_GroupBy_With_Array_Aggregation() { //** // quite complex group by query @@ -113,10 +100,5 @@ public void Query_GroupBy_With_Array_Aggregation() //** Assert.Equal("Dahlia Warren", r[0].Users[0].Name); //** Assert.Equal(24, r[0].Users[0].Age); } - - public void Dispose() - { - db?.Dispose(); - } } } \ No newline at end of file diff --git a/LiteDB.Tests/Query/OrderBy_Tests.cs b/LiteDB.Tests/Query/OrderBy_Tests.cs index e0e17a9e5..83fb6c8e7 100644 --- a/LiteDB.Tests/Query/OrderBy_Tests.cs +++ b/LiteDB.Tests/Query/OrderBy_Tests.cs @@ -9,9 +9,8 @@ public class OrderBy_Tests [Fact] public void Query_OrderBy_Using_Index() { - using var db = new Person_Tests(); - var collection = db.GetCollection(); - var local = db.GetLocal(); + using var db = new PersonQueryData(); + var (collection, local) = db.GetData(); collection.EnsureIndex(x => x.Name); @@ -31,9 +30,8 @@ public void Query_OrderBy_Using_Index() [Fact] public void Query_OrderBy_Using_Index_Desc() { - using var db = new Person_Tests(); - var collection = db.GetCollection(); - var local = db.GetLocal(); + using var db = new PersonQueryData(); + var (collection, local) = db.GetData(); collection.EnsureIndex(x => x.Name); @@ -53,9 +51,8 @@ public void Query_OrderBy_Using_Index_Desc() [Fact] public void Query_OrderBy_With_Func() { - using var db = new Person_Tests(); - var collection = db.GetCollection(); - var local = db.GetLocal(); + using var db = new PersonQueryData(); + var (collection, local) = db.GetData(); collection.EnsureIndex(x => x.Date.Day); @@ -75,9 +72,8 @@ public void Query_OrderBy_With_Func() [Fact] public void Query_OrderBy_With_Offset_Limit() { - using var db = new Person_Tests(); - var collection = db.GetCollection(); - var local = db.GetLocal(); + using var db = new PersonQueryData(); + var (collection, local) = db.GetData(); // no index @@ -101,15 +97,14 @@ public void Query_OrderBy_With_Offset_Limit() [Fact] public void Query_Asc_Desc() { - using var db = new Person_Tests(); - var collection = db.GetCollection(); + using var db = new PersonQueryData(); + var (collection, _) = db.GetData(); var asc = collection.Find(Query.All(Query.Ascending)).ToArray(); var desc = collection.Find(Query.All(Query.Descending)).ToArray(); asc[0].Id.Should().Be(1); desc[0].Id.Should().Be(1000); - } } } \ No newline at end of file diff --git a/LiteDB.Tests/Query/QueryApi_Tests.cs b/LiteDB.Tests/Query/QueryApi_Tests.cs index a6a158d20..c958074fa 100644 --- a/LiteDB.Tests/Query/QueryApi_Tests.cs +++ b/LiteDB.Tests/Query/QueryApi_Tests.cs @@ -5,14 +5,13 @@ namespace LiteDB.Tests.QueryTest { - public class QueryApi_Tests : Person_Tests + public class QueryApi_Tests : PersonQueryData { [Fact] public void Query_And() { - using var db = new Person_Tests(); - var collection = db.GetCollection(); - var local = db.GetLocal(); + using var db = new PersonQueryData(); + var (collection, local) = db.GetData(); var r0 = local.Where(x => x.Age == 22 && x.Active == true).ToArray(); @@ -24,9 +23,8 @@ public void Query_And() [Fact] public void Query_And_Same_Field() { - using var db = new Person_Tests(); - var collection = db.GetCollection(); - var local = db.GetLocal(); + using var db = new PersonQueryData(); + var (collection, local) = db.GetData(); var r0 = local.Where(x => x.Age > 22 && x.Age < 25).ToArray(); diff --git a/LiteDB.Tests/Query/Select_Tests.cs b/LiteDB.Tests/Query/Select_Tests.cs index b2e67342d..6049ad0ac 100644 --- a/LiteDB.Tests/Query/Select_Tests.cs +++ b/LiteDB.Tests/Query/Select_Tests.cs @@ -5,14 +5,13 @@ namespace LiteDB.Tests.QueryTest { - public class Select_Tests : Person_Tests + public class Select_Tests : PersonQueryData { [Fact] public void Query_Select_Key_Only() { - using var db = new Person_Tests(); - var collection = db.GetCollection(); - var local = db.GetLocal(); + using var db = new PersonQueryData(); + var (collection, local) = db.GetData(); collection.EnsureIndex(x => x.Address.City); @@ -34,9 +33,8 @@ public void Query_Select_Key_Only() [Fact] public void Query_Select_New_Document() { - using var db = new Person_Tests(); - var collection = db.GetCollection(); - var local = db.GetLocal(); + using var db = new PersonQueryData(); + var (collection, local) = db.GetData(); var r0 = local .Select(x => new {city = x.Address.City.ToUpper(), phone0 = x.Phones[0], address = new Address {Street = x.Name}}) @@ -57,8 +55,8 @@ public void Query_Select_New_Document() [Fact] public void Query_Or_With_Null() { - using var db = new Person_Tests(); - var collection = db.GetCollection(); + using var db = new PersonQueryData(); + var (collection, _) = db.GetData(); var r = collection.Find(Query.Or( Query.GTE("Date", new DateTime(2001, 1, 1)), @@ -69,9 +67,8 @@ public void Query_Or_With_Null() [Fact] public void Query_Find_All_Predicate() { - using var db = new Person_Tests(); - var collection = db.GetCollection(); - var local = db.GetLocal(); + using var db = new PersonQueryData(); + var (collection, _) = db.GetData(); var r = collection.Find(x => true).ToArray(); diff --git a/LiteDB.Tests/Query/Where_Tests.cs b/LiteDB.Tests/Query/Where_Tests.cs index 07a2eb955..00869f139 100644 --- a/LiteDB.Tests/Query/Where_Tests.cs +++ b/LiteDB.Tests/Query/Where_Tests.cs @@ -4,7 +4,7 @@ namespace LiteDB.Tests.QueryTest { - public class Where_Tests : Person_Tests + public class Where_Tests : PersonQueryData { class Entity { @@ -15,9 +15,8 @@ class Entity [Fact] public void Query_Where_With_Parameter() { - using var db = new Person_Tests(); - var collection = db.GetCollection(); - var local = db.GetLocal(); + using var db = new PersonQueryData(); + var (collection, local) = db.GetData(); var r0 = local .Where(x => x.Address.State == "FL") @@ -33,9 +32,8 @@ public void Query_Where_With_Parameter() [Fact] public void Query_Multi_Where_With_Like() { - using var db = new Person_Tests(); - var collection = db.GetCollection(); - var local = db.GetLocal(); + using var db = new PersonQueryData(); + var (collection, local) = db.GetData(); var r0 = local .Where(x => x.Age >= 10 && x.Age <= 40) @@ -53,9 +51,8 @@ public void Query_Multi_Where_With_Like() [Fact] public void Query_Single_Where_With_And() { - using var db = new Person_Tests(); - var collection = db.GetCollection(); - var local = db.GetLocal(); + using var db = new PersonQueryData(); + var (collection, local) = db.GetData(); var r0 = local .Where(x => x.Age == 25 && x.Active) @@ -71,9 +68,8 @@ public void Query_Single_Where_With_And() [Fact] public void Query_Single_Where_With_Or_And_In() { - using var db = new Person_Tests(); - var collection = db.GetCollection(); - var local = db.GetLocal(); + using var db = new PersonQueryData(); + var (collection, local) = db.GetData(); var r0 = local .Where(x => x.Age == 25 || x.Age == 26 || x.Age == 27) @@ -94,9 +90,8 @@ public void Query_Single_Where_With_Or_And_In() [Fact] public void Query_With_Array_Ids() { - using var db = new Person_Tests(); - var collection = db.GetCollection(); - var local = db.GetLocal(); + using var db = new PersonQueryData(); + var (collection, local) = db.GetData(); var ids = new int[] { 1, 2, 3 }; diff --git a/LiteDB/Engine/Disk/DiskService.cs b/LiteDB/Engine/Disk/DiskService.cs index 567689afd..73e7910b5 100644 --- a/LiteDB/Engine/Disk/DiskService.cs +++ b/LiteDB/Engine/Disk/DiskService.cs @@ -14,7 +14,6 @@ namespace LiteDB.Engine internal class DiskService : IDisposable { private readonly MemoryCache _cache; - private readonly Lazy _queue; private readonly EngineState _state; private IStreamFactory _dataFactory; @@ -22,6 +21,7 @@ internal class DiskService : IDisposable private StreamPool _dataPool; private readonly StreamPool _logPool; + private readonly Lazy _writer; private long _dataLength; private long _logLength; @@ -36,6 +36,7 @@ public DiskService( _cache = new MemoryCache(memorySegmentSizes); _state = state; + // get new stream factory based on settings _dataFactory = settings.CreateDataFactory(); _logFactory = settings.CreateLogFactory(); @@ -44,23 +45,23 @@ public DiskService( _dataPool = new StreamPool(_dataFactory, false); _logPool = new StreamPool(_logFactory, true); - var isNew = _dataFactory.GetLength() == 0L; + // get lazy disk writer (log file) - created only when used + _writer = _logPool.Writer; - // create lazy async writer queue for log file - _queue = new Lazy(() => new DiskWriterQueue(_logPool.Writer, state)); + var isNew = _dataFactory.GetLength() == 0L; // create new database if not exist yet if (isNew) { LOG($"creating new database: '{Path.GetFileName(_dataFactory.Name)}'", "DISK"); - this.Initialize(_dataPool.Writer, settings.Collation, settings.InitialSize); + this.Initialize(_dataPool.Writer.Value, settings.Collation, settings.InitialSize); } // if not readonly, force open writable datafile if (settings.ReadOnly == false) { - _ = _dataPool.Writer.CanRead; + _ = _dataPool.Writer.Value.CanRead; } // get initial data file length @@ -77,21 +78,11 @@ public DiskService( } } - /// - /// Get async queue writer - /// - public Lazy Queue => _queue; - /// /// Get memory cache instance /// public MemoryCache Cache => _cache; - /// - /// Get Stream pool used inside disk service - /// - public StreamPool GetPool(FileOrigin origin) => origin == FileOrigin.Data ? _dataPool : _logPool; - /// /// Create a new empty database (use synced mode) /// @@ -171,40 +162,55 @@ public PageBuffer NewPage() } /// - /// Write pages inside file origin using async queue - WORKS ONLY FOR LOG FILE - returns how many pages are inside "pages" + /// Write all pages inside log file in a thread safe operation /// - public int WriteAsync(IEnumerable pages) + public int WriteLogDisk(IEnumerable pages) { var count = 0; + var stream = _writer.Value; - foreach (var page in pages) + // do a global write lock - only 1 thread can write on disk at time + lock(stream) { - ENSURE(page.ShareCounter == BUFFER_WRITABLE, "to enqueue page, page must be writable"); + foreach (var page in pages) + { + ENSURE(page.ShareCounter == BUFFER_WRITABLE, "to enqueue page, page must be writable"); + + // adding this page into file AS new page (at end of file) + // must add into cache to be sure that new readers can see this page + page.Position = Interlocked.Add(ref _logLength, PAGE_SIZE); + + // should mark page origin to log because async queue works only for log file + // if this page came from data file, must be changed before MoveToReadable + page.Origin = FileOrigin.Log; + + // mark this page as readable and get cached paged to enqueue + var readable = _cache.MoveToReadable(page); - // adding this page into file AS new page (at end of file) - // must add into cache to be sure that new readers can see this page - page.Position = Interlocked.Add(ref _logLength, PAGE_SIZE); + // set log stream position to page + stream.Position = page.Position; - // should mark page origin to log because async queue works only for log file - // if this page came from data file, must be changed before MoveToReadable - page.Origin = FileOrigin.Log; +#if DEBUG + _state.SimulateDiskWriteFail?.Invoke(page); +#endif - // mark this page as readable and get cached paged to enqueue - var readable = _cache.MoveToReadable(page); + // and write to disk in a sync mode + stream.Write(page.Array, page.Offset, PAGE_SIZE); - _queue.Value.EnqueuePage(readable); + // release page here (no page use after this) + page.Release(); - count++; + count++; + } } return count; } /// - /// Get virtual file length: real file can be small because async thread can still writing on disk - /// and incrementing file size (Log file) + /// Get file length based on data/log length variables (no direct on disk) /// - public long GetVirtualLength(FileOrigin origin) + public long GetFileLength(FileOrigin origin) { if (origin == FileOrigin.Log) { @@ -252,7 +258,7 @@ public IEnumerable ReadFull(FileOrigin origin) try { // get length before starts (avoid grow during loop) - var length = this.GetVirtualLength(origin); + var length = this.GetFileLength(origin); stream.Position = 0; @@ -279,13 +285,11 @@ public IEnumerable ReadFull(FileOrigin origin) } /// - /// Write pages DIRECT in disk with NO queue. This pages are not cached and are not shared - WORKS FOR DATA FILE ONLY + /// Write pages DIRECT in disk. This pages are not cached and are not shared - WORKS FOR DATA FILE ONLY /// - public void Write(IEnumerable pages, FileOrigin origin) + public void WriteDataDisk(IEnumerable pages) { - ENSURE(origin == FileOrigin.Data); - - var stream = origin == FileOrigin.Data ? _dataPool.Writer : _logPool.Writer; + var stream = _dataPool.Writer.Value; foreach (var page in pages) { @@ -310,8 +314,6 @@ public void SetLength(long length, FileOrigin origin) if (origin == FileOrigin.Log) { - ENSURE(_queue.Value.Length == 0, "queue must be empty before set new length"); - Interlocked.Exchange(ref _logLength, length - PAGE_SIZE); } else @@ -319,7 +321,7 @@ public void SetLength(long length, FileOrigin origin) Interlocked.Exchange(ref _dataLength, length - PAGE_SIZE); } - stream.SetLength(length); + stream.Value.SetLength(length); } /// @@ -334,12 +336,9 @@ public string GetName(FileOrigin origin) public void Dispose() { - // dispose queue (wait finish) - if (_queue.IsValueCreated) _queue.Value.Dispose(); - // get stream length from writer - is safe because only this instance // can change file size - var delete = _logFactory.Exists() && _logPool.Writer.Length == 0; + var delete = _logFactory.Exists() && _logPool.Writer.Value.Length == 0; // dispose Stream pools _dataPool.Dispose(); diff --git a/LiteDB/Engine/Disk/DiskWriterQueue.cs b/LiteDB/Engine/Disk/DiskWriterQueue.cs deleted file mode 100644 index 40a3e024f..000000000 --- a/LiteDB/Engine/Disk/DiskWriterQueue.cs +++ /dev/null @@ -1,142 +0,0 @@ -using System; -using System.Collections.Concurrent; -using System.IO; -using System.Threading; -using System.Threading.Tasks; - -using static LiteDB.Constants; - -namespace LiteDB.Engine -{ - /// - /// Implement disk write queue and async writer thread - used only for write on LOG file - /// [ThreadSafe] - /// - internal class DiskWriterQueue : IDisposable - { - private readonly Stream _stream; - private readonly EngineState _state; - - // async thread controls - private Task _task; - private bool _shouldClose = false; - - private readonly ConcurrentQueue _queue = new ConcurrentQueue(); - private readonly object _queueSync = new object(); - private readonly AsyncManualResetEvent _queueHasItems = new AsyncManualResetEvent(); - private readonly ManualResetEventSlim _queueIsEmpty = new ManualResetEventSlim(true); - - private Exception _exception = null; // store last exception in async running task - - public DiskWriterQueue(Stream stream, EngineState state) - { - _stream = stream; - _state = state; - } - - /// - /// Get how many pages are waiting for store - /// - public int Length => _queue.Count; - - /// - /// Add page into writer queue and will be saved in disk by another thread. If page.Position = MaxValue, store at end of file (will get final Position) - /// After this method, this page will be available into reader as a clean page - /// - public void EnqueuePage(PageBuffer page) - { - ENSURE(page.Origin == FileOrigin.Log, "async writer must use only for Log file"); - - // throw last exception that stop running queue - if (_exception != null) throw _exception; - - _queueIsEmpty.Reset(); - _queue.Enqueue(page); - _queueHasItems.Set(); - - lock (_queueSync) - { - if (_task == null) - { - _task = Task.Factory.StartNew(ExecuteQueue, TaskCreationOptions.LongRunning); - } - } - } - - /// - /// Wait until all queue be executed and no more pending pages are waiting for write - be sure you do a full lock database before call this - /// - public void Wait() - { - _queueIsEmpty.Wait(); - - ENSURE(_queue.Count == 0, "queue should be empty after wait() call"); - } - - /// - /// Execute all items in queue sync - /// - private void ExecuteQueue() - { - try - { - while (true) - { - if (_queue.TryDequeue(out var page)) - { - WritePageToStream(page); - } - else - { - - if (_queue.Count > 0) continue; - - _queueIsEmpty.Set(); - _queueHasItems.Reset(); - - if (_shouldClose) return; - - _stream.FlushToDisk(); - - _queueHasItems.WaitAsync().GetAwaiter().GetResult(); - } - } - } - catch (Exception ex) - { - _state.Handle(ex); - _exception = ex; - } - } - - private void WritePageToStream(PageBuffer page) - { - if (page == null) return; - - ENSURE(page.ShareCounter > 0, "page must be shared at least 1"); - - // set stream position according to page - _stream.Position = page.Position; - -#if DEBUG - _state.SimulateDiskWriteFail?.Invoke(page); -#endif - - _stream.Write(page.Array, page.Offset, PAGE_SIZE); - - // release page here (no page use after this) - page.Release(); - } - - public void Dispose() - { - LOG($"disposing disk writer queue (with {_queue.Count} pages in queue)", "DISK"); - - _shouldClose = true; - _queueHasItems.Set(); // unblock the running loop in case there are no items - - _task?.GetAwaiter().GetResult(); - _task = null; - } - } -} \ No newline at end of file diff --git a/LiteDB/Engine/Disk/StreamFactory/StreamPool.cs b/LiteDB/Engine/Disk/StreamFactory/StreamPool.cs index f7f1355d3..da378692e 100644 --- a/LiteDB/Engine/Disk/StreamFactory/StreamPool.cs +++ b/LiteDB/Engine/Disk/StreamFactory/StreamPool.cs @@ -32,7 +32,7 @@ public StreamPool(IStreamFactory factory, bool appendOnly) /// /// Get single Stream writer instance /// - public Stream Writer => _writer.Value; + public Lazy Writer => _writer; /// /// Rent a Stream reader instance diff --git a/LiteDB/Engine/Engine/Transaction.cs b/LiteDB/Engine/Engine/Transaction.cs index d5f5927ff..4db9f57b3 100644 --- a/LiteDB/Engine/Engine/Transaction.cs +++ b/LiteDB/Engine/Engine/Transaction.cs @@ -113,7 +113,7 @@ private void CommitAndReleaseTransaction(TransactionService transaction) // try checkpoint when finish transaction and log file are bigger than checkpoint pragma value (in pages) if (_header.Pragmas.Checkpoint > 0 && - _disk.GetVirtualLength(FileOrigin.Log) > (_header.Pragmas.Checkpoint * PAGE_SIZE)) + _disk.GetFileLength(FileOrigin.Log) > (_header.Pragmas.Checkpoint * PAGE_SIZE)) { _walIndex.TryCheckpoint(); } diff --git a/LiteDB/Engine/LiteEngine.cs b/LiteDB/Engine/LiteEngine.cs index ce5f1987a..59bb84b4c 100644 --- a/LiteDB/Engine/LiteEngine.cs +++ b/LiteDB/Engine/LiteEngine.cs @@ -140,7 +140,7 @@ internal bool Open() _walIndex = new WalIndexService(_disk, _locker); // if exists log file, restore wal index references (can update full _header instance) - if (_disk.GetVirtualLength(FileOrigin.Log) > 0) + if (_disk.GetFileLength(FileOrigin.Log) > 0) { _walIndex.RestoreIndex(ref _header); } @@ -186,12 +186,6 @@ internal List Close() // stop running all transactions tc.Catch(() => _monitor?.Dispose()); - // wait for writer queue - if (_disk != null && _disk.Queue.IsValueCreated) - { - tc.Catch(() => _disk.Queue.Value.Wait()); - } - if (_header?.Pragmas.Checkpoint > 0) { // do a soft checkpoint (only if exclusive lock is possible) @@ -228,11 +222,6 @@ internal List Close(Exception ex) tc.Catch(() => _monitor?.Dispose()); - if (_disk != null && _disk.Queue.IsValueCreated) - { - tc.Catch(() => _disk.Queue.Value.Dispose()); - } - // close disks streams tc.Catch(() => _disk?.Dispose()); diff --git a/LiteDB/Engine/Services/TransactionService.cs b/LiteDB/Engine/Services/TransactionService.cs index 31c2a6e42..4c139cd5d 100644 --- a/LiteDB/Engine/Services/TransactionService.cs +++ b/LiteDB/Engine/Services/TransactionService.cs @@ -232,7 +232,7 @@ IEnumerable source() // write all dirty pages, in sequence on log-file and store references into log pages on transPages // (works only for Write snapshots) - var count = _disk.WriteAsync(source()); + var count = _disk.WriteLogDisk(source()); // now, discard all clean pages (because those pages are writable and must be readable) // from write snapshots @@ -368,7 +368,7 @@ IEnumerable source() try { // write all pages (including new header) - _disk.WriteAsync(source()); + _disk.WriteLogDisk(source()); } catch { diff --git a/LiteDB/Engine/Services/WalIndexService.cs b/LiteDB/Engine/Services/WalIndexService.cs index b17d9da92..94185eb4c 100644 --- a/LiteDB/Engine/Services/WalIndexService.cs +++ b/LiteDB/Engine/Services/WalIndexService.cs @@ -246,7 +246,7 @@ public void RestoreIndex(ref HeaderPage header) public int Checkpoint() { // no log file or no confirmed transaction, just exit - if (_disk.GetVirtualLength(FileOrigin.Log) == 0 || _confirmTransactions.Count == 0) return 0; + if (_disk.GetFileLength(FileOrigin.Log) == 0 || _confirmTransactions.Count == 0) return 0; var mustExit = _locker.EnterExclusive(); @@ -269,7 +269,7 @@ public int Checkpoint() public int TryCheckpoint() { // no log file or no confirmed transaction, just exit - if (_disk.GetVirtualLength(FileOrigin.Log) == 0 || _confirmTransactions.Count == 0) return 0; + if (_disk.GetFileLength(FileOrigin.Log) == 0 || _confirmTransactions.Count == 0) return 0; if (_locker.TryEnterExclusive(out var mustExit) == false) return 0; @@ -295,13 +295,8 @@ private int CheckpointInternal() { LOG($"checkpoint", "WAL"); - // wait all pages write on disk - _disk.Queue.Value.Wait(); - var counter = 0; - ENSURE(_disk.Queue.Value.Length == 0, "no pages on queue when checkpoint"); - // getting all "good" pages from log file to be copied into data file IEnumerable source() { @@ -336,7 +331,7 @@ IEnumerable source() } // write all log pages into data file (sync) - _disk.Write(source(), FileOrigin.Data); + _disk.WriteDataDisk(source()); // clear log file, clear wal index, memory cache, this.Clear(); diff --git a/LiteDB/Engine/Sort/SortDisk.cs b/LiteDB/Engine/Sort/SortDisk.cs index 7641259d9..f43722eb6 100644 --- a/LiteDB/Engine/Sort/SortDisk.cs +++ b/LiteDB/Engine/Sort/SortDisk.cs @@ -83,7 +83,7 @@ public long GetContainerPosition() /// public void Write(long position, BufferSlice buffer) { - var writer = _pool.Writer; + var writer = _pool.Writer.Value; // there is only a single writer instance, must be lock to ensure only 1 single thread are writing lock(writer) diff --git a/LiteDB/Engine/SystemCollections/SysDatabase.cs b/LiteDB/Engine/SystemCollections/SysDatabase.cs index 1161e05d9..c08c234ab 100644 --- a/LiteDB/Engine/SystemCollections/SysDatabase.cs +++ b/LiteDB/Engine/SystemCollections/SysDatabase.cs @@ -24,9 +24,8 @@ private IEnumerable SysDatabase() ["creationTime"] = _header.CreationTime, - ["dataFileSize"] = (int)_disk.GetVirtualLength(FileOrigin.Data), - ["logFileSize"] = (int)_disk.GetVirtualLength(FileOrigin.Log), - ["asyncQueueLength"] = _disk.Queue.IsValueCreated ? _disk.Queue.Value.Length : 0, + ["dataFileSize"] = (int)_disk.GetFileLength(FileOrigin.Data), + ["logFileSize"] = (int)_disk.GetFileLength(FileOrigin.Log), ["currentReadVersion"] = _walIndex.CurrentReadVersion, ["lastTransactionID"] = _walIndex.LastTransactionID,