Merge pull request #1 from KenyonY/fix/high_freq_writing_data_integrity
fix: Fixed data integrity during high-frequency writing
KenyonY authored Oct 29, 2023
2 parents c63d590 + 20d53a6 commit 9ff4a3f
Showing 8 changed files with 124 additions and 77 deletions.
37 changes: 23 additions & 14 deletions README.md
@@ -1,15 +1,13 @@
-**English** | [**简体中文**](./README_ZH.md)
-
 <h1 align="center">
 <br>
 FlaxKV
 <br>
 </h1>


 <p align="center">
-Let you forget you're using a database.
-Simple and high-performance persistent database solutions.
+Let you forget you're using a database
+Simple and high-performance persistent database solution
 </p>
 <p align="center">
 <a href="https://pypi.org/project/flaxkv/">
@@ -29,15 +27,23 @@ Simple and high-performance persistent database solutions.
</a>
</p>

<h4 align="center">
<p>
<b>English</b> |
<a href="https://github.com/KenyonY/flaxkv/blob/main/README_ZH.md">简体中文</a>
</p>
</h4>

<p >
<br>
</p>



# FlaxKV

A persistent database masquerading as a dictionary.

The `flaxkv` module provides a dictionary-like interface for interacting with high-performance key-value databases (LMDB, LevelDB). It abstracts the complexities of direct database interaction, allowing users to perform CRUD (Create, Read, Update, Delete) operations in a simple and intuitive manner. You can use it just like a Python dictionary without worrying about it blocking your main process at any stage.
The `flaxkv` module provides a dictionary-like interface for interacting with high-performance key-value databases (LMDB, LevelDB).
It abstracts the complexities of direct database interaction, allowing users to perform CRUD operations in a simple and
intuitive manner. You can use it just like a Python dictionary without worrying about it blocking your main process at any stage.

**Use Cases**

@@ -51,7 +57,7 @@ The `flaxkv` module provides a dictionary-like interface for interacting with hi

 ## Key Features

-- **Always up-to-date, never blocking.**: It was designed from the ground up to ensure that no write operations block the user process, while users can always read the most recently written data.
+- **Always Up-to-date, Never Blocking**: It was designed from the ground up to ensure that no write operations block the user process, while users can always read the most recently written data.

 - **Ease of Use**: Interacting with the database feels just like using a Python dictionary! You don't even have to worry about resource release.

@@ -106,15 +112,18 @@ for key, value in d.items():
 print(len(d))
 ```

-Hey! Did you... forget to call `d.close()` to release resources here?
-No, you don't need to manage it manually! It's all just like using a dictionary!
-(Of course, you can also manually call `d.close()` to release resources immediately.)
+You might have noticed that even when the program ends, we didn't use `d.close()` to release resources!
+Everything will be handled automatically.
+More importantly, as a persistent database, it offers performance close to dictionary (in-memory) access!
+(There should be a benchmark here..)
+
+P.S.: Of course, you can also manually call `d.close()` to release resources immediately~.

 ## Citation
 If `FlaxKV` has been helpful to your research, please cite:
 ```bibtex
 @misc{flaxkv,
-title={FlaxKV: An Easy-to-use and High Performance Key-Value Database},
+title={FlaxKV: An Easy-to-use and High Performance Key-Value Database Solution},
 author={K.Y},
 howpublished = {\url{https://github.com/KenyonY/flaxkv}},
 year={2023}
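Reviewer's sketch: the README text in this diff says resources are released automatically when the program ends, but the diff does not show how FlaxKV does that. The following is only a minimal illustration of one possible approach, using the standard-library `atexit` module. `AutoClosingStore`, `persisted`, and the in-memory stand-in for the database are hypothetical names, not FlaxKV's API.

```python
import atexit


class AutoClosingStore:
    """Hypothetical dict-like store; an assumption, not FlaxKV's class."""

    def __init__(self):
        self._buffer = {}     # pending writes, not yet persisted
        self.persisted = {}   # stands in for the on-disk database
        self.closed = False
        # Register close() so buffered data is flushed at interpreter exit.
        atexit.register(self.close)

    def __setitem__(self, key, value):
        self._buffer[key] = value

    def __getitem__(self, key):
        # Reads check the buffer first so they see the newest value.
        if key in self._buffer:
            return self._buffer[key]
        return self.persisted[key]

    def close(self):
        if self.closed:
            return            # calling close() twice is harmless
        self.persisted.update(self._buffer)
        self._buffer.clear()
        self.closed = True
        atexit.unregister(self.close)


store = AutoClosingStore()
store["a"] = 1
value_before_close = store["a"]   # served from the buffer
store.close()                     # optional: would also run at exit
```

Calling `close()` explicitly flushes immediately; leaving it out defers the flush to interpreter shutdown, which matches the behaviour the README describes.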
34 changes: 21 additions & 13 deletions README_ZH.md
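@@ -1,14 +1,12 @@
-**简体中文** | [**English**](./README.md)
-
 <h1 align="center">
 <br>
 FlaxKV
 <br>
 </h1>

 <p align="center">
-Lets you forget you're using a database
-A simple, high-performance persistent database solution
+Lets you forget you're using a database:
+A simple, high-performance persistent database solution
 </p>

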
@@ -30,12 +28,22 @@
</a>
</p>

# FlaxKV
<h4 align="center">
<p>
<b>简体中文</b> |
<a href="https://github.com/KenyonY/flaxkv/blob/main/README.md">English</a>
</p>
</h4>

<p >
<br>
</p>



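The `flaxkv` module provides a dictionary-like interface for interacting with high-performance key-value databases.
It abstracts away the complexity of direct database interaction, letting users perform CRUD (create, read, update, delete) operations in a simple and intuitive way.
You can use it just like a Python dictionary without caring whether it will block your main process at any stage.
`flaxkv` provides a dictionary-like interface for interacting with high-performance key-value databases.
It abstracts away the complexity of direct database interaction, letting users perform CRUD operations in a simple and intuitive way.
You can use it just like a Python dictionary without worrying that it will block your main process at any stage.

**Use Cases**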

Expand Down Expand Up @@ -106,19 +114,19 @@ for key, value in d.items():

print(len(d))
```
也许你注意到即使到程序结束并没有使用到`d.close()`来进行资源释放!这一切都将被自动处理。
更重要的是,它(作为持久化数据库)还提供了接近字典(内存)存取的性能!(这里应存在一个benchmark..)

嘿!所以... 这里是不是忘记调用 `d.close()` 进行资源释放?
不,你并不需要手动去管理! 一切就和使用字典一样!
(当然,也可以手动调用 `d.close()` 来立即释放资源~)
PS: 当然也可以手动调用 `d.close()` 来立即释放资源~




## Citation
## 引用
如果`FlaxKV`对你的研究有帮助,欢迎引用:
```bibtex
@misc{flaxkv,
title={FlaxKV: An Easy-to-use and High Performance Key-Value Database Solutions},
title={FlaxKV: An Easy-to-use and High Performance Key-Value Database Solution},
author={K.Y},
howpublished = {\url{https://github.com/KenyonY/flaxkv}},
year={2023}
11 changes: 5 additions & 6 deletions flaxkv/__init__.py
@@ -1,19 +1,18 @@
 from .base import LevelDBDict, LMDBDict

+__version__ = "0.1.1"
+
 __all__ = [
     "dbdict",
     "LMDBDict",
     "LevelDBDict",
 ]


-def dbdict(path, backend='lmdb', recreate=False, **kwargs):
+def dbdict(path, backend='lmdb', rebuild=False, **kwargs):
     if backend == 'lmdb':
-        return LMDBDict(path, recreate=recreate, **kwargs)
+        return LMDBDict(path, rebuild=rebuild, **kwargs)
     elif backend == 'leveldb':
-        return LevelDBDict(path, recreate=recreate)
+        return LevelDBDict(path, rebuild=rebuild, **kwargs)
     else:
         raise ValueError(f"Unsupported DB type {backend}.")
-
-
-__version__ = "0.1.0"
79 changes: 45 additions & 34 deletions flaxkv/base.py
@@ -21,14 +21,15 @@
 import numpy as np
 from loguru import logger

-# from .decorators import class_measure_time
+from .decorators import class_measure_time
+from .helper import SimpleQueue
 from .log import setting_log
 from .manager import DBManager
 from .pack import decode, decode_key, encode


 class BaseDBDict(ABC):
-    MAX_BUFFER_SIZE = 1000
+    MAX_BUFFER_SIZE = 200
     _COMMIT_TIME_INTERVAL = 60 * 60 * 24
     _logger = None

@@ -44,17 +45,17 @@ class _enc_prefix:
         dict = b'd'
         array = b'a'

-    def __init__(self, db_type, path, recreate=False, **kwargs):
+    def __init__(self, db_type, path, rebuild=False, **kwargs):
         """
         Initializes the BaseDBDict class which provides a dictionary-like interface to a database.
         Args:
             db_type (str): Type of the database ("lmdb" or "leveldb").
             path (str): Path to the database.
-            recreate (bool, optional): Whether to recreate the database. Defaults to False.
+            rebuild (bool, optional): Whether to recreate the database. Defaults to False.
         """
         self._db_manager = DBManager(
-            db_type=db_type, db_path=path, new=recreate, **kwargs
+            db_type=db_type, db_path=path, rebuild=rebuild, **kwargs
         )
         self._static_view = self._db_manager.new_static_view()

@@ -64,9 +65,10 @@ def __init__(self, db_type, path, recreate=False, **kwargs):
         self._buffer_lock = threading.Lock()

         self._write_event = threading.Event()
-        self._write_activate = True  # to skip writing when close()
         self._latest_write_num = 0
+        self._write_queue = SimpleQueue(maxsize=1)
         self._thread_running = True
+
         self._thread = threading.Thread(target=self._background_worker)
         self._thread.daemon = True

@@ -118,30 +120,38 @@ def _background_worker(self):
         """
         Background worker function to periodically write buffer to the database.
         """
-        while self._thread_running:
+        while self._thread_running or not self._write_queue.empty():
+
             self._write_event.wait(timeout=self._COMMIT_TIME_INTERVAL)
             self._write_event.clear()
-            if self._write_activate:
-                try:
-                    self._write_buffer_to_db(current_write_num=self._latest_write_num)
-                except:
-                    # todo:
-                    self._logger.warning(f"Write buffer to db failed. error")

-    def write_immediately(self):
+            if not self._write_queue.empty():
+                value = self._write_queue.get()
+                if value is False:
+                    break
+
+            try:
+                self._write_buffer_to_db(current_write_num=self._latest_write_num)
+
+            except:
+                # todo:
+                self._logger.warning(f"Write buffer to db failed. error")
+
+    def write_immediately(self, write=True):
         """
         Triggers an immediate write of the buffer to the database.
         """
+        self._write_queue.put(write)
         self._write_event.set()

-    def _close_background_worker(self):
+    def _close_background_worker(self, write=True):
         """
         Stops the background worker thread.
         """
         self._latest_write_num += 1
-        self.write_immediately()
+        self.write_immediately(write=write)
         self._thread_running = False
-        self._thread.join(timeout=10)
+        self._thread.join(timeout=30)
         if self._thread.is_alive():
             self._logger.warning(
                 "Warning: Background thread did not finish in time. Some data might not be saved."
@@ -220,7 +230,7 @@ def _set(self, key, value):
         self._buffered_count += 1
         # Trigger immediate write if buffer size exceeds MAX_BUFFER_SIZE
         if self._buffered_count >= self.MAX_BUFFER_SIZE:
-            print("Trigger immediate write")
+            self._logger.debug("Trigger immediate write")
             self._latest_write_num += 1
             self._buffered_count = 0
             self.write_immediately()
@@ -305,12 +315,14 @@ def _write_buffer_to_db(
                 raise

         with self._buffer_lock:
-            self._logger.debug("reset buffer")
             self.delete_buffer_set = self.delete_buffer_set - delete_buffer_set_snapshot
             self.buffer_dict = self._diff_buffer(self.buffer_dict, buffer_dict_snapshot)

             self._db_manager.close_view(self._static_view)
             self._static_view = self._db_manager.new_static_view()
+            self._logger.info(
+                f"write {self._db_manager.db_type.upper()} buffer to db successfully-{current_write_num=}-{self._latest_write_num=}"
+            )

     def __getitem__(self, key):
         """
@@ -403,8 +415,7 @@ def clear(self):
         with self._buffer_lock:
             self._db_manager.close_view(self._static_view)
             self._db_manager.close()
-            self._write_activate = False
-            self._close_background_worker()
+            self._close_background_worker(write=False)

             self._db_manager.clear()
             self._static_view = self._db_manager.new_static_view()
@@ -413,7 +424,6 @@ def clear(self):
             self.buffer_dict = {}
             self.delete_buffer_set = set()
             self._buffer_lock = threading.Lock()
-            self._write_activate = True

             self._write_event = threading.Event()
             self._latest_write_num = 0
@@ -450,10 +460,8 @@ def close(self, write=True):
         Args:
             write (bool, optional): Whether to write the buffer to the database before closing. Defaults to True.
         """
-        if not write:
-            self._write_activate = False
-        self._close_background_worker()
-        self._write_activate = True
+        self._close_background_worker(write=write)
+
         self._db_manager.close_view(self._static_view)
         self._db_manager.close()
         self._logger.info(f"Closed ({self._db_manager.db_type.upper()}) successfully")
@@ -503,13 +511,16 @@ class LMDBDict(BaseDBDict):
     value: int, float, bool, str, list, dict, and np.ndarray,
     """

-    def __init__(self, path, map_size=1024**3, recreate=False, log=False, **kwargs):
-        setting_log(save_file=log)
+    def __init__(
+        self, path, map_size=1024**3, rebuild=False, log_level=None, **kwargs
+    ):
+
+        setting_log(level=log_level, save_file=kwargs.pop('save_log', False))
         self._logger = logger.bind(flaxkv=True)
-        if not log:
+        if not log_level:
             self._logger.remove()
         super().__init__(
-            "lmdb", path, max_dbs=1, map_size=map_size, recreate=recreate, **kwargs
+            "lmdb", path, max_dbs=1, map_size=map_size, rebuild=rebuild, **kwargs
         )

     def keys(self):
@@ -594,12 +605,12 @@ class LevelDBDict(BaseDBDict):
     value: int, float, bool, str, list, dict and np.ndarray,
     """

-    def __init__(self, path, recreate=False, log=False):
-        setting_log(save_file=log)
+    def __init__(self, path, rebuild=False, log_level=None, **kwargs):
+        setting_log(level=log_level, save_file=kwargs.pop('save_log', False))
         self._logger = logger.bind(flaxkv=True)
-        if not log:
+        if not log_level:
             self._logger.remove()
-        super().__init__("leveldb", path=path, recreate=recreate)
+        super().__init__("leveldb", path=path, rebuild=rebuild)

     def keys(self):
         with self._buffer_lock:
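Reviewer's sketch: the `flaxkv/base.py` changes above replace the `_write_activate` flag with a one-slot signal queue that the background worker reads after each wake-up. The pattern can be tried in isolation with only the standard library. `BufferedWriter`, `store`, `_signals`, and the 60-second timeout below are hypothetical names and values chosen for illustration, and a plain `dict` stands in for the database; this is not FlaxKV's implementation.

```python
import queue
import threading


class BufferedWriter:
    """Hypothetical miniature of the event-plus-signal-queue worker."""

    def __init__(self):
        self.buffer = {}
        self.store = {}                      # stands in for the database
        self._lock = threading.Lock()
        self._event = threading.Event()
        self._signals = queue.Queue(maxsize=1)
        self._running = True
        self._thread = threading.Thread(target=self._worker, daemon=True)
        self._thread.start()

    def _worker(self):
        # Keep looping while a signal is still queued, even after
        # shutdown has begun, so the last request is not dropped.
        while self._running or not self._signals.empty():
            self._event.wait(timeout=60.0)   # periodic wake-up
            self._event.clear()
            if not self._signals.empty():
                if self._signals.get() is False:
                    break                    # close(write=False): skip flush
            self._flush()

    def _flush(self):
        with self._lock:
            self.store.update(self.buffer)
            self.buffer.clear()

    def set(self, key, value):
        with self._lock:
            self.buffer[key] = value

    def _signal(self, write):
        # Replace a stale signal instead of blocking on a full queue.
        try:
            self._signals.get_nowait()
        except queue.Empty:
            pass
        self._signals.put(write)
        self._event.set()

    def close(self, write=True):
        self._signal(write)                  # same order as in the diff
        self._running = False
        self._thread.join(timeout=5)


flushed = BufferedWriter()
flushed.set("a", 1)
flushed.close()                # final flush happens in the worker

discarded = BufferedWriter()
discarded.set("b", 2)
discarded.close(write=False)   # worker exits without flushing
```

Because the `False` signal travels through the queue, the worker decides whether to flush from the value it actually dequeued rather than from a shared flag that another thread might flip back in the meantime.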
2 changes: 1 addition & 1 deletion flaxkv/decorators.py
@@ -5,7 +5,7 @@
 from rich import print
 from rich.text import Text

-ENABLED_MEASURE_TIME_DECORATOR = False
+ENABLED_MEASURE_TIME_DECORATOR = True


 def class_measure_time(logger=None, level=logging.INFO, prec=3):
19 changes: 19 additions & 0 deletions flaxkv/helper.py
@@ -0,0 +1,19 @@
+import queue
+
+
+class SimpleQueue:
+    def __init__(self, maxsize: int):
+        self.q = queue.Queue(maxsize=maxsize)
+
+    def put(self, item):
+        if not self.q.full():
+            self.q.put(item)
+        else:
+            self.q.get()
+            self.q.put(item)
+
+    def get(self, block=True, timeout=None):
+        return self.q.get(block=block, timeout=timeout)
+
+    def empty(self):
+        return self.q.empty()
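Reviewer's sketch: with `maxsize=1`, the new `SimpleQueue` behaves like a "latest signal wins" slot, since `put` on a full queue drops the pending item instead of blocking. The snippet below repeats the class from this commit with indentation restored so it runs on its own; the `signals` and `latest` names are mine. Note that the `full()` check followed by `put()` is not atomic, so this appears to assume a single producer at a time; that is my reading, not something the commit states.

```python
import queue


class SimpleQueue:
    # Same logic as flaxkv/helper.py in this commit.
    def __init__(self, maxsize: int):
        self.q = queue.Queue(maxsize=maxsize)

    def put(self, item):
        if not self.q.full():
            self.q.put(item)
        else:
            self.q.get()       # discard the oldest item to make room
            self.q.put(item)

    def get(self, block=True, timeout=None):
        return self.q.get(block=block, timeout=timeout)

    def empty(self):
        return self.q.empty()


signals = SimpleQueue(maxsize=1)
signals.put(True)     # request a flush
signals.put(False)    # replaces the pending True instead of blocking
latest = signals.get()
print(latest)         # prints False
```

This is why `close(write=False)` can override a flush request that the worker has not consumed yet.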