From 8f525d782eceae5473c4d3392522ad675ef4d9eb Mon Sep 17 00:00:00 2001 From: Jimmy Date: Fri, 19 Jul 2024 01:09:21 +0800 Subject: [PATCH 1/7] add leveldb thread UT. --- .../StoreTest.MultiThread.cs | 132 ++++++++++++++++++ tests/Neo.Plugins.Storage.Tests/StoreTest.cs | 2 +- 2 files changed, 133 insertions(+), 1 deletion(-) create mode 100644 tests/Neo.Plugins.Storage.Tests/StoreTest.MultiThread.cs diff --git a/tests/Neo.Plugins.Storage.Tests/StoreTest.MultiThread.cs b/tests/Neo.Plugins.Storage.Tests/StoreTest.MultiThread.cs new file mode 100644 index 0000000000..c507e03c5d --- /dev/null +++ b/tests/Neo.Plugins.Storage.Tests/StoreTest.MultiThread.cs @@ -0,0 +1,132 @@ +// Copyright (C) 2015-2024 The Neo Project. +// +// StoreSnapshotTest.cs file belongs to the neo project and is free +// software distributed under the MIT software license, see the +// accompanying file LICENSE in the main directory of the +// repository or http://www.opensource.org/licenses/mit-license.php +// for more details. +// +// Redistribution and use in source and binary forms with or without +// modifications are permitted. + +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Neo.IO.Data.LevelDB; +using System; +using System.Linq; +using System.Threading.Tasks; + +namespace Neo.Plugins.Storage.Tests; + +partial class StoreTest +{ + + [TestMethod] + [ExpectedException(typeof(AggregateException))] + public void TestMultiThreadLevelDbSnapshotPut() + { + using var store = levelDbStore.GetStore(path_leveldb); + var snapshot = store.GetSnapshot(); + var testKey = new byte[] { 0x01, 0x02, 0x03 }; + + var tasks = new Task[100]; + for (var i = 0; i < tasks.Length; i++) + { + var value = new byte[] { 0x04, 0x05, 0x06, (byte)i }; + tasks[i] = Task.Run(() => + { + snapshot.Put(testKey, value); + snapshot.Commit(); + }); + } + Task.WaitAll(tasks); + snapshot.Dispose(); + } + + [TestMethod] + public void TestMultiThreadLevelDbSnapshotPutWithoutCommit() + { + using var store = levelDbStore.GetStore(path_leveldb); + var snapshot = store.GetSnapshot(); + var testKey = new byte[] { 0x01, 0x02, 0x03 }; + + var tasks = new Task[100]; + for (var i = 0; i < tasks.Length; i++) + { + var value = new byte[] { 0x04, 0x05, 0x06, (byte)i }; + tasks[i] = Task.Run(() => + { + snapshot.Put(testKey, value); + }); + } + + try + { + Task.WaitAll(tasks); + snapshot.Commit(); + } + catch (AggregateException ae) + { + var innerExceptions = ae.InnerExceptions; + var hasExpectedException = innerExceptions.Any(innerException => innerException is AggregateException or LevelDBException); + + if (!hasExpectedException) + { + // Re-throw if none of the expected exceptions were found + throw; + } + } + finally + { + snapshot.Dispose(); + } + } + + + [TestMethod] + public void TestMultiThreadLevelDbSnapshotPutWithLocker() + { + using var store = levelDbStore.GetStore(path_leveldb); + + object locker = new(); + var snapshot = store.GetSnapshot(); + + var testKey = new byte[] { 0x01, 0x02, 0x03 }; + + var tasks = new Task[100]; + for (var i = 0; i < tasks.Length; i++) + { + var value = new byte[] { 0x04, 0x05, 0x06, (byte)i }; + tasks[i] = Task.Run(() => + { + lock (locker) + { + snapshot.Put(testKey, value); + snapshot.Commit(); + } + }); + } + Task.WaitAll(tasks); + snapshot.Dispose(); + } + + [TestMethod] + public void TestOneSnapshotPerThreadLevelDbSnapshotPut() + { + using var store = levelDbStore.GetStore(path_leveldb); + var testKey = new byte[] { 0x01, 0x02, 0x03 }; + + var tasks = new Task[1000]; + for (var i = 0; i < tasks.Length; i++) + { + var value = new byte[] { 0x04, 0x05, 0x06, (byte)i }; + tasks[i] = Task.Run(() => + { + var snapshot = store.GetSnapshot(); + snapshot.Put(testKey, value); + snapshot.Commit(); + snapshot.Dispose(); + }); + } + Task.WaitAll(tasks); + } +} diff --git a/tests/Neo.Plugins.Storage.Tests/StoreTest.cs b/tests/Neo.Plugins.Storage.Tests/StoreTest.cs index 189e212bc2..7d6e914226 100644 --- a/tests/Neo.Plugins.Storage.Tests/StoreTest.cs +++ b/tests/Neo.Plugins.Storage.Tests/StoreTest.cs @@ -17,7 +17,7 @@ namespace Neo.Plugins.Storage.Tests { [TestClass] - public class StoreTest + public partial class StoreTest { private const string path_leveldb = "Data_LevelDB_UT"; private const string path_rocksdb = "Data_RocksDB_UT"; From 8b3c1db11c1555cb5ede1bfd7d8d545b56ed3b0c Mon Sep 17 00:00:00 2001 From: Jimmy Date: Fri, 19 Jul 2024 22:06:27 +0800 Subject: [PATCH 2/7] fix tests and add comments --- .../StoreTest.MultiThread.cs | 158 +++++++++++++----- 1 file changed, 112 insertions(+), 46 deletions(-) diff --git a/tests/Neo.Plugins.Storage.Tests/StoreTest.MultiThread.cs b/tests/Neo.Plugins.Storage.Tests/StoreTest.MultiThread.cs index c507e03c5d..60dc9b92c4 100644 --- a/tests/Neo.Plugins.Storage.Tests/StoreTest.MultiThread.cs +++ b/tests/Neo.Plugins.Storage.Tests/StoreTest.MultiThread.cs @@ -12,92 +12,147 @@ using Microsoft.VisualStudio.TestTools.UnitTesting; using Neo.IO.Data.LevelDB; using System; -using System.Linq; +using System.Threading; using System.Threading.Tasks; namespace Neo.Plugins.Storage.Tests; +/* + * LevelDB Thread Safety Explanation: + * + * LevelDB is designed to be a fast key-value storage library. However, it has + * some limitations regarding thread safety. Specifically, LevelDB is not thread-safe + * when multiple threads are attempting to write to the database concurrently. This can + * lead to data corruption, crashes, and other undefined behaviors. + * + * LevelDB provides snapshots and batch writes. Snapshots allow + * a consistent view of the database at a point in time, but they are not designed for + * concurrent write operations. Batch writes can be used to perform atomic updates, + * but they also need to be managed carefully to avoid concurrency issues. + * + * In this test class, we demonstrate these thread safety issues and how to mitigate + * them using different approaches such as locking mechanisms and creating separate + * snapshots for each thread. + */ partial class StoreTest { - [TestMethod] [ExpectedException(typeof(AggregateException))] public void TestMultiThreadLevelDbSnapshotPut() { using var store = levelDbStore.GetStore(path_leveldb); - var snapshot = store.GetSnapshot(); + using var snapshot = store.GetSnapshot(); var testKey = new byte[] { 0x01, 0x02, 0x03 }; - var tasks = new Task[100]; - for (var i = 0; i < tasks.Length; i++) + var threadCount = 1; + while (true) { - var value = new byte[] { 0x04, 0x05, 0x06, (byte)i }; - tasks[i] = Task.Run(() => + var tasks = new Task[threadCount]; + try { - snapshot.Put(testKey, value); - snapshot.Commit(); - }); + for (var i = 0; i < tasks.Length; i++) + { + var value = new byte[] { 0x04, 0x05, 0x06, (byte)i }; + tasks[i] = Task.Run(() => + { + // Introduce delay to increase conflict chance + Thread.Sleep(new Random().Next(1, 10)); + // Attempt to write to the snapshot and commit + snapshot.Put(testKey, value); + snapshot.Commit(); + }); + } + + // Wait for all tasks to complete + Task.WaitAll(tasks); + threadCount++; + } + catch (AggregateException) + { + // AggregateException is expected due to concurrent access + Console.WriteLine($"AggregateException caught with {threadCount} threads."); + throw; + } + catch (LevelDBException ex) + { + // LevelDBException is also possible due to LevelDB being thread-unsafe + Console.WriteLine($"LevelDBException caught with {threadCount} threads: {ex.Message}"); + break; + } + catch (Exception ex) + { + Assert.Fail("Unexpected exception: " + ex.Message); + } } - Task.WaitAll(tasks); - snapshot.Dispose(); } [TestMethod] - public void TestMultiThreadLevelDbSnapshotPutWithoutCommit() + public void TestMultiThreadLevelDbSnapshotPutUntilException() { using var store = levelDbStore.GetStore(path_leveldb); - var snapshot = store.GetSnapshot(); + using var snapshot = store.GetSnapshot(); var testKey = new byte[] { 0x01, 0x02, 0x03 }; - var tasks = new Task[100]; - for (var i = 0; i < tasks.Length; i++) + var threadCount = 1; + while (true) { - var value = new byte[] { 0x04, 0x05, 0x06, (byte)i }; - tasks[i] = Task.Run(() => + var tasks = new Task[threadCount]; + try { - snapshot.Put(testKey, value); - }); - } + for (var i = 0; i < tasks.Length; i++) + { + var value = new byte[] { 0x04, 0x05, 0x06, (byte)i }; + tasks[i] = Task.Run(() => + { + // Introduce delay to increase conflict chance + Thread.Sleep(new Random().Next(1, 100)); + // Attempt to write to the snapshot without committing + snapshot.Put(testKey, value); + }); + } - try - { - Task.WaitAll(tasks); - snapshot.Commit(); - } - catch (AggregateException ae) - { - var innerExceptions = ae.InnerExceptions; - var hasExpectedException = innerExceptions.Any(innerException => innerException is AggregateException or LevelDBException); + // Wait for all tasks to complete + Task.WaitAll(tasks); - if (!hasExpectedException) + // Attempt to commit the changes + snapshot.Commit(); + threadCount++; + } + catch (AggregateException ex) { - // Re-throw if none of the expected exceptions were found - throw; + // AggregateException is expected due to concurrent access + Console.WriteLine($"AggregateException caught with {threadCount} threads."); + break; + } + catch (LevelDBException ex) + { + // LevelDBException is also possible due to LevelDB being thread-unsafe + Console.WriteLine($"LevelDBException caught with {threadCount} threads."); + break; + } + catch (Exception ex) + { + Assert.Fail("Unexpected exception: " + ex.Message); } - } - finally - { - snapshot.Dispose(); } } - [TestMethod] public void TestMultiThreadLevelDbSnapshotPutWithLocker() { using var store = levelDbStore.GetStore(path_leveldb); object locker = new(); - var snapshot = store.GetSnapshot(); - var testKey = new byte[] { 0x01, 0x02, 0x03 }; - var tasks = new Task[100]; + var tasks = new Task[10]; for (var i = 0; i < tasks.Length; i++) { var value = new byte[] { 0x04, 0x05, 0x06, (byte)i }; tasks[i] = Task.Run(() => { + using var snapshot = store.GetSnapshot(); + // Use a lock to ensure thread-safe access to the snapshot lock (locker) { snapshot.Put(testKey, value); @@ -105,8 +160,9 @@ public void TestMultiThreadLevelDbSnapshotPutWithLocker() } }); } + + // Wait for all tasks to complete Task.WaitAll(tasks); - snapshot.Dispose(); } [TestMethod] @@ -121,12 +177,22 @@ public void TestOneSnapshotPerThreadLevelDbSnapshotPut() var value = new byte[] { 0x04, 0x05, 0x06, (byte)i }; tasks[i] = Task.Run(() => { - var snapshot = store.GetSnapshot(); - snapshot.Put(testKey, value); - snapshot.Commit(); - snapshot.Dispose(); + try + { + // Create a new snapshot for each thread to avoid concurrent access issues + using var snapshot = store.GetSnapshot(); + snapshot.Put(testKey, value); + snapshot.Commit(); + } + catch (Exception ex) + { + Console.WriteLine($"Task {i} encountered an exception: {ex}"); + throw; + } }); } + + // Wait for all tasks to complete Task.WaitAll(tasks); } } From 66a2298ce44ca65a009532b52f077eada0f5001c Mon Sep 17 00:00:00 2001 From: Jimmy Date: Fri, 19 Jul 2024 22:14:31 +0800 Subject: [PATCH 3/7] add readme --- src/Plugins/LevelDBStore/readme.md | 113 +++++++++++++++++++++++++++++ 1 file changed, 113 insertions(+) create mode 100644 src/Plugins/LevelDBStore/readme.md diff --git a/src/Plugins/LevelDBStore/readme.md b/src/Plugins/LevelDBStore/readme.md new file mode 100644 index 0000000000..2cd238a46a --- /dev/null +++ b/src/Plugins/LevelDBStore/readme.md @@ -0,0 +1,113 @@ +Sure, here's the revised README document in a more natural, native English style: + +--- + +# LevelDB and Snapshot Thread Safety Issues + +## Overview + +LevelDB is a fast key-value storage library developed by Google. It's designed to provide high performance and support for high concurrency applications. While LevelDB excels in single-threaded environments, it has some limitations when it comes to multi-threaded operations. + +## What is LevelDB? + +LevelDB offers the following key features: + +- **Key-Value Storage**: Supports storage of keys and values of any size. +- **Ordered Storage**: Key-value pairs are sorted by the key in lexicographical order. +- **Efficient Read/Write**: Optimized for disk access to improve read/write performance. +- **Snapshots**: Provides a consistent view of the database at a specific point in time. +- **Batch Writes**: Allows multiple write operations to be grouped into a single atomic operation. + +## Snapshots + +LevelDB snapshots let you capture the state of the database at a specific moment. This means you can read data from a snapshot without worrying about changes occurring during the read. Snapshots are great for read-only operations that need a consistent view of the data. + +## Thread Safety Issues + +Despite its many strengths, LevelDB has some limitations regarding multi-threaded operations, especially with writes. + +### Multi-Threaded Writes + +LevelDB is not thread-safe when it comes to concurrent writes. Multiple threads trying to write to the database at the same time can lead to data corruption, crashes, and other undefined behaviors. Thus, writing to the same database from multiple threads without proper synchronization is unsafe. + +### Snapshot Thread Safety + +Snapshots, while useful for consistent reads, are not designed for concurrent write operations. Here are the key issues: + +1. **Concurrent Write Conflicts**: Multiple threads attempting to write to a snapshot can result in data inconsistencies or corruption. +2. **Lack of Thread Safety**: Snapshots are not inherently thread-safe, so concurrent operations on a single snapshot can lead to unpredictable behavior. + +```text +A database may only be opened by one process at a time. The leveldb implementation acquires a lock from the operating system to prevent misuse. Within a single process, the same leveldb::DB object may be safely shared by multiple concurrent threads. I.e., different threads may write into or fetch iterators or call Get on the same database without any external synchronization (the leveldb implementation will automatically do the required synchronization). However other objects (like Iterator and WriteBatch) may require external synchronization. If two threads share such an object, they must protect access to it using their own locking protocol. More details are available in the public header files. +``` + +## Example + +Here's an example test class demonstrating the thread safety issues with LevelDB and snapshots: + +```csharp +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Neo.IO.Data.LevelDB; +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Neo.Plugins.Storage.Tests +{ + [TestClass] + public partial class StoreTest + { + [TestMethod] + [ExpectedException(typeof(AggregateException))] + public void TestMultiThreadLevelDbSnapshotPut() + { + using var store = levelDbStore.GetStore(path_leveldb); + using var snapshot = store.GetSnapshot(); + var testKey = new byte[] { 0x01, 0x02, 0x03 }; + + var threadCount = 1; + while (true) + { + var tasks = new Task[threadCount]; + try + { + for (var i = 0; i < tasks.Length; i++) + { + var value = new byte[] { 0x04, 0x05, 0x06, (byte)i }; + tasks[i] = Task.Run(() => + { + Thread.Sleep(new Random().Next(1, 10)); + snapshot.Put(testKey, value); + snapshot.Commit(); + }); + } + + Task.WaitAll(tasks); + threadCount++; + } + catch (AggregateException) + { + Console.WriteLine($"AggregateException caught with {threadCount} threads."); + throw; + } + catch (LevelDBException ex) + { + Console.WriteLine($"LevelDBException caught with {threadCount} threads: {ex.Message}"); + break; + } + catch (Exception ex) + { + Assert.Fail("Unexpected exception: " + ex.Message); + } + } + } + } +} +``` + +In this test, we increment the number of threads and demonstrate the exceptions that can occur when multiple threads write to a LevelDB snapshot. + +## References + +- [LevelDB Documentation](https://github.com/google/leveldb) +- [LevelDB Snapshots](https://github.com/google/leveldb/blob/main/doc/index.md) From c01ab9967b0fd018e740541239a441bc07573f61 Mon Sep 17 00:00:00 2001 From: Jimmy Date: Sun, 21 Jul 2024 01:09:39 +0800 Subject: [PATCH 4/7] Update tests/Neo.Plugins.Storage.Tests/StoreTest.MultiThread.cs Co-authored-by: Christopher Schuchardt --- tests/Neo.Plugins.Storage.Tests/StoreTest.MultiThread.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/Neo.Plugins.Storage.Tests/StoreTest.MultiThread.cs b/tests/Neo.Plugins.Storage.Tests/StoreTest.MultiThread.cs index 60dc9b92c4..146a606585 100644 --- a/tests/Neo.Plugins.Storage.Tests/StoreTest.MultiThread.cs +++ b/tests/Neo.Plugins.Storage.Tests/StoreTest.MultiThread.cs @@ -105,7 +105,7 @@ public void TestMultiThreadLevelDbSnapshotPutUntilException() tasks[i] = Task.Run(() => { // Introduce delay to increase conflict chance - Thread.Sleep(new Random().Next(1, 100)); + Thread.Sleep(Random.Shared.Next(1, 100)); // Attempt to write to the snapshot without committing snapshot.Put(testKey, value); }); From 780fd6f4d79243a5850415808bc7f84e264a4e43 Mon Sep 17 00:00:00 2001 From: Jimmy Date: Sun, 21 Jul 2024 01:09:46 +0800 Subject: [PATCH 5/7] Update tests/Neo.Plugins.Storage.Tests/StoreTest.MultiThread.cs Co-authored-by: Christopher Schuchardt --- tests/Neo.Plugins.Storage.Tests/StoreTest.MultiThread.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/Neo.Plugins.Storage.Tests/StoreTest.MultiThread.cs b/tests/Neo.Plugins.Storage.Tests/StoreTest.MultiThread.cs index 146a606585..4e2ed99391 100644 --- a/tests/Neo.Plugins.Storage.Tests/StoreTest.MultiThread.cs +++ b/tests/Neo.Plugins.Storage.Tests/StoreTest.MultiThread.cs @@ -56,7 +56,7 @@ public void TestMultiThreadLevelDbSnapshotPut() tasks[i] = Task.Run(() => { // Introduce delay to increase conflict chance - Thread.Sleep(new Random().Next(1, 10)); + Thread.Sleep(Random.Shared.Next(1, 10)); // Attempt to write to the snapshot and commit snapshot.Put(testKey, value); snapshot.Commit(); From 96fd8d6c29ac5d881cdaa15b1816064a6d20d3ef Mon Sep 17 00:00:00 2001 From: Fernando Diaz Toledano Date: Wed, 11 Sep 2024 09:35:28 +0200 Subject: [PATCH 6/7] Assert fail with AggregateException --- .../StoreTest.MultiThread.cs | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/tests/Neo.Plugins.Storage.Tests/StoreTest.MultiThread.cs b/tests/Neo.Plugins.Storage.Tests/StoreTest.MultiThread.cs index 4e2ed99391..14f455ff06 100644 --- a/tests/Neo.Plugins.Storage.Tests/StoreTest.MultiThread.cs +++ b/tests/Neo.Plugins.Storage.Tests/StoreTest.MultiThread.cs @@ -67,12 +67,6 @@ public void TestMultiThreadLevelDbSnapshotPut() Task.WaitAll(tasks); threadCount++; } - catch (AggregateException) - { - // AggregateException is expected due to concurrent access - Console.WriteLine($"AggregateException caught with {threadCount} threads."); - throw; - } catch (LevelDBException ex) { // LevelDBException is also possible due to LevelDB being thread-unsafe @@ -118,12 +112,6 @@ public void TestMultiThreadLevelDbSnapshotPutUntilException() snapshot.Commit(); threadCount++; } - catch (AggregateException ex) - { - // AggregateException is expected due to concurrent access - Console.WriteLine($"AggregateException caught with {threadCount} threads."); - break; - } catch (LevelDBException ex) { // LevelDBException is also possible due to LevelDB being thread-unsafe From 592b9a773417fcdfeb492353c90cc4adace5796f Mon Sep 17 00:00:00 2001 From: Jimmy Date: Fri, 27 Sep 2024 21:12:14 +0800 Subject: [PATCH 7/7] fix exception catch logic to make sure it can catch aggregated exception as well --- tests/Neo.Plugins.Storage.Tests/StoreTest.MultiThread.cs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/Neo.Plugins.Storage.Tests/StoreTest.MultiThread.cs b/tests/Neo.Plugins.Storage.Tests/StoreTest.MultiThread.cs index 14f455ff06..94974e53d7 100644 --- a/tests/Neo.Plugins.Storage.Tests/StoreTest.MultiThread.cs +++ b/tests/Neo.Plugins.Storage.Tests/StoreTest.MultiThread.cs @@ -75,7 +75,9 @@ public void TestMultiThreadLevelDbSnapshotPut() } catch (Exception ex) { - Assert.Fail("Unexpected exception: " + ex.Message); + // It could be aggregated exception where LevelDBException is just one of them + Console.WriteLine("Unexpected exception: " + ex.Message); + throw; } } } @@ -120,7 +122,9 @@ public void TestMultiThreadLevelDbSnapshotPutUntilException() } catch (Exception ex) { - Assert.Fail("Unexpected exception: " + ex.Message); + // It could be aggregated exception where LevelDBException is just one of them + Console.WriteLine("Unexpected exception: " + ex.Message); + break; } } }