fix: Fixed data integrity during high-frequency writing #1

Merged 2 commits on Oct 29, 2023
37 changes: 23 additions & 14 deletions README.md
@@ -1,15 +1,13 @@

**English** | [**简体中文**](./README_ZH.md)

<h1 align="center">
<br>
FlaxKV
<br>
</h1>


<p align="center">
Let you forget you're using a database.
Simple and high-performance persistent database solutions.
Let you forget you're using a database
Simple and high-performance persistent database solution
</p>
<p align="center">
<a href="https://pypi.org/project/flaxkv/">
@@ -29,15 +27,23 @@ Simple and high-performance persistent database solutions.
</a>
</p>

<h4 align="center">
<p>
<b>English</b> |
<a href="https://github.com/KenyonY/flaxkv/blob/main/README_ZH.md">简体中文</a>
</p>
</h4>

<p >
<br>
</p>



# FlaxKV

A persistent database masquerading as a dictionary.

The `flaxkv` module provides a dictionary-like interface for interacting with high-performance key-value databases (LMDB, LevelDB). It abstracts the complexities of direct database interaction, allowing users to perform CRUD (Create, Read, Update, Delete) operations in a simple and intuitive manner. You can use it just like a Python dictionary without worrying about it blocking your main process at any stage.
The `flaxkv` module provides a dictionary-like interface for interacting with high-performance key-value databases (LMDB, LevelDB).
It abstracts the complexities of direct database interaction, allowing users to perform CRUD operations in a simple and
intuitive manner. You can use it just like a Python dictionary without worrying about it blocking your main process at any stage.

**Use Cases**

@@ -51,7 +57,7 @@ The `flaxkv` module provides a dictionary-like interface for interacting with hi

## Key Features

- **Always up-to-date, never blocking.**: It was designed from the ground up to ensure that no write operations block the user process, while users can always read the most recently written data.
- **Always Up-to-date, Never Blocking**: It was designed from the ground up to ensure that no write operations block the user process, while users can always read the most recently written data.

- **Ease of Use**: Interacting with the database feels just like using a Python dictionary! You don't even have to worry about resource release.

@@ -106,15 +112,18 @@ for key, value in d.items():
print(len(d))
```

Hey! Did you... forget to call `d.close()` to release resources here?
No, you don't need to manage it manually! It's all just like using a dictionary!
(Of course, you can also manually call `d.close()` to release resources immediately.)
You might have noticed that even when the program ends, we didn't use `d.close()` to release resources!
Everything will be handled automatically.
More importantly, as a persistent database, it offers performance close to dictionary (in-memory) access!
(There should be a benchmark here..)

P.S.: Of course, you can also manually call `d.close()` to release resources immediately~.

## Citation
If `FlaxKV` has been helpful to your research, please cite:
```bibtex
@misc{flaxkv,
title={FlaxKV: An Easy-to-use and High Performance Key-Value Database},
title={FlaxKV: An Easy-to-use and High Performance Key-Value Database Solution},
author={K.Y},
howpublished = {\url{https://github.com/KenyonY/flaxkv}},
year={2023}
34 changes: 21 additions & 13 deletions README_ZH.md
@@ -1,14 +1,12 @@
**简体中文** | [**English**](./README.md)

<h1 align="center">
<br>
FlaxKV
<br>
</h1>

<p align="center">
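Lets you forget you're using a database
A simple, high-performance persistent database solution
Lets you forget you're using a database:
A simple, high-performance persistent database solution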
</p>


@@ -30,12 +28,22 @@
</a>
</p>

# FlaxKV
<h4 align="center">
<p>
<b>简体中文</b> |
<a href="https://github.com/KenyonY/flaxkv/blob/main/README.md">English</a>
</p>
</h4>

<p >
<br>
</p>



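The `flaxkv` module provides a dictionary-like interface for interacting with high-performance key-value databases.
It abstracts away the complexity of direct database interaction, allowing users to perform CRUD (create, read, update, delete) operations in a simple and intuitive way
You can use it directly as a Python dictionary without caring whether it will block your main process at any stage
`flaxkv` provides a dictionary-like interface for interacting with high-performance key-value databases.
It abstracts away the complexity of direct database interaction, allowing users to perform CRUD operations in a simple and intuitive way
You can use it directly as a Python dictionary without worrying that it will block your main process at any stage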

**Use Cases**

@@ -106,19 +114,19 @@ for key, value in d.items():

print(len(d))
```
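You may have noticed that `d.close()` was never called to release resources, even by the end of the program! All of this is handled automatically.
More importantly, as a persistent database it also offers performance close to dictionary (in-memory) access! (There should be a benchmark here..)

Hey! So... did we forget to call `d.close()` here to release resources?
No, you don't need to manage it manually! Everything works just like a dictionary!
(Of course, you can also call `d.close()` manually to release resources immediately~)
PS: Of course, you can also call `d.close()` manually to release resources immediately~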




## Citation
## Citation (引用)
If `FlaxKV` has been helpful to your research, feel free to cite it:
```bibtex
@misc{flaxkv,
title={FlaxKV: An Easy-to-use and High Performance Key-Value Database Solutions},
title={FlaxKV: An Easy-to-use and High Performance Key-Value Database Solution},
author={K.Y},
howpublished = {\url{https://github.com/KenyonY/flaxkv}},
year={2023}
11 changes: 5 additions & 6 deletions flaxkv/__init__.py
@@ -1,19 +1,18 @@
from .base import LevelDBDict, LMDBDict

__version__ = "0.1.1"

__all__ = [
"dbdict",
"LMDBDict",
"LevelDBDict",
]


def dbdict(path, backend='lmdb', recreate=False, **kwargs):
def dbdict(path, backend='lmdb', rebuild=False, **kwargs):
    if backend == 'lmdb':
        return LMDBDict(path, recreate=recreate, **kwargs)
        return LMDBDict(path, rebuild=rebuild, **kwargs)
    elif backend == 'leveldb':
        return LevelDBDict(path, recreate=recreate)
        return LevelDBDict(path, rebuild=rebuild, **kwargs)
    else:
        raise ValueError(f"Unsupported DB type {backend}.")


__version__ = "0.1.0"
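The renamed `rebuild` flag and the newly forwarded `**kwargs` for the LevelDB backend can be checked in isolation. The sketch below is not flaxkv code: `FakeLMDBDict` and `FakeLevelDBDict` are hypothetical stand-ins that only record their arguments, and the `./demo_db` path and `log_level='INFO'` value are illustrative. The `dbdict` body mirrors the post-change version shown in this diff.

```python
class FakeLMDBDict:
    """Hypothetical stand-in for flaxkv's LMDBDict; it only records its arguments."""

    def __init__(self, path, rebuild=False, **kwargs):
        self.path = path
        self.rebuild = rebuild
        self.kwargs = kwargs


class FakeLevelDBDict(FakeLMDBDict):
    """Hypothetical stand-in for flaxkv's LevelDBDict."""


def dbdict(path, backend='lmdb', rebuild=False, **kwargs):
    # Same dispatch shape as the post-change factory in this diff,
    # but pointed at the stand-in classes above.
    if backend == 'lmdb':
        return FakeLMDBDict(path, rebuild=rebuild, **kwargs)
    elif backend == 'leveldb':
        return FakeLevelDBDict(path, rebuild=rebuild, **kwargs)
    else:
        raise ValueError(f"Unsupported DB type {backend}.")


# Extra keyword arguments now reach the LevelDB stand-in as well.
db = dbdict('./demo_db', backend='leveldb', rebuild=True, log_level='INFO')
```

Before this change the LevelDB branch dropped `**kwargs`, so an option such as a log level would have been silently ignored for that backend.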
79 changes: 45 additions & 34 deletions flaxkv/base.py
@@ -21,14 +21,15 @@
import numpy as np
from loguru import logger

# from .decorators import class_measure_time
from .decorators import class_measure_time
from .helper import SimpleQueue
from .log import setting_log
from .manager import DBManager
from .pack import decode, decode_key, encode


class BaseDBDict(ABC):
MAX_BUFFER_SIZE = 1000
MAX_BUFFER_SIZE = 200
_COMMIT_TIME_INTERVAL = 60 * 60 * 24
_logger = None

@@ -44,17 +45,17 @@ class _enc_prefix:
dict = b'd'
array = b'a'

def __init__(self, db_type, path, recreate=False, **kwargs):
def __init__(self, db_type, path, rebuild=False, **kwargs):
"""
Initializes the BaseDBDict class which provides a dictionary-like interface to a database.

Args:
db_type (str): Type of the database ("lmdb" or "leveldb").
path (str): Path to the database.
recreate (bool, optional): Whether to recreate the database. Defaults to False.
rebuild (bool, optional): Whether to recreate the database. Defaults to False.
"""
self._db_manager = DBManager(
db_type=db_type, db_path=path, new=recreate, **kwargs
db_type=db_type, db_path=path, rebuild=rebuild, **kwargs
)
self._static_view = self._db_manager.new_static_view()

Expand All @@ -64,9 +65,10 @@ def __init__(self, db_type, path, recreate=False, **kwargs):
self._buffer_lock = threading.Lock()

self._write_event = threading.Event()
self._write_activate = True # to skip writing when close()
self._latest_write_num = 0
self._write_queue = SimpleQueue(maxsize=1)
self._thread_running = True

self._thread = threading.Thread(target=self._background_worker)
self._thread.daemon = True

@@ -118,30 +120,38 @@ def _background_worker(self):
"""
Background worker function to periodically write buffer to the database.
"""
while self._thread_running:
while self._thread_running or not self._write_queue.empty():

self._write_event.wait(timeout=self._COMMIT_TIME_INTERVAL)
self._write_event.clear()
if self._write_activate:
try:
self._write_buffer_to_db(current_write_num=self._latest_write_num)
except:
# todo:
self._logger.warning(f"Write buffer to db failed. error")

def write_immediately(self):
if not self._write_queue.empty():
value = self._write_queue.get()
if value is False:
break

try:
self._write_buffer_to_db(current_write_num=self._latest_write_num)

except:
# todo:
self._logger.warning(f"Write buffer to db failed. error")

def write_immediately(self, write=True):
"""
Triggers an immediate write of the buffer to the database.
"""
self._write_queue.put(write)
self._write_event.set()

def _close_background_worker(self):
def _close_background_worker(self, write=True):
"""
Stops the background worker thread.
"""
self._latest_write_num += 1
self.write_immediately()
self.write_immediately(write=write)
self._thread_running = False
self._thread.join(timeout=10)
self._thread.join(timeout=30)
if self._thread.is_alive():
self._logger.warning(
"Warning: Background thread did not finish in time. Some data might not be saved."
@@ -220,7 +230,7 @@ def _set(self, key, value):
self._buffered_count += 1
# Trigger immediate write if buffer size exceeds MAX_BUFFER_SIZE
if self._buffered_count >= self.MAX_BUFFER_SIZE:
print("Trigger immediate write")
self._logger.debug("Trigger immediate write")
self._latest_write_num += 1
self._buffered_count = 0
self.write_immediately()
@@ -305,12 +315,14 @@ def _write_buffer_to_db(
raise

with self._buffer_lock:
self._logger.debug("reset buffer")
self.delete_buffer_set = self.delete_buffer_set - delete_buffer_set_snapshot
self.buffer_dict = self._diff_buffer(self.buffer_dict, buffer_dict_snapshot)

self._db_manager.close_view(self._static_view)
self._static_view = self._db_manager.new_static_view()
self._logger.info(
f"write {self._db_manager.db_type.upper()} buffer to db successfully-{current_write_num=}-{self._latest_write_num=}"
)

def __getitem__(self, key):
"""
@@ -403,8 +415,7 @@ def clear(self):
with self._buffer_lock:
self._db_manager.close_view(self._static_view)
self._db_manager.close()
self._write_activate = False
self._close_background_worker()
self._close_background_worker(write=False)

self._db_manager.clear()
self._static_view = self._db_manager.new_static_view()
@@ -413,7 +424,6 @@ def clear(self):
self.buffer_dict = {}
self.delete_buffer_set = set()
self._buffer_lock = threading.Lock()
self._write_activate = True

self._write_event = threading.Event()
self._latest_write_num = 0
@@ -450,10 +460,8 @@ def close(self, write=True):
Args:
write (bool, optional): Whether to write the buffer to the database before closing. Defaults to True.
"""
if not write:
self._write_activate = False
self._close_background_worker()
self._write_activate = True
self._close_background_worker(write=write)

self._db_manager.close_view(self._static_view)
self._db_manager.close()
self._logger.info(f"Closed ({self._db_manager.db_type.upper()}) successfully")
@@ -503,13 +511,16 @@ class LMDBDict(BaseDBDict):
value: int, float, bool, str, list, dict, and np.ndarray,
"""

def __init__(self, path, map_size=1024**3, recreate=False, log=False, **kwargs):
setting_log(save_file=log)
def __init__(
self, path, map_size=1024**3, rebuild=False, log_level=None, **kwargs
):

setting_log(level=log_level, save_file=kwargs.pop('save_log', False))
self._logger = logger.bind(flaxkv=True)
if not log:
if not log_level:
self._logger.remove()
super().__init__(
"lmdb", path, max_dbs=1, map_size=map_size, recreate=recreate, **kwargs
"lmdb", path, max_dbs=1, map_size=map_size, rebuild=rebuild, **kwargs
)

def keys(self):
@@ -594,12 +605,12 @@ class LevelDBDict(BaseDBDict):
value: int, float, bool, str, list, dict and np.ndarray,
"""

def __init__(self, path, recreate=False, log=False):
setting_log(save_file=log)
def __init__(self, path, rebuild=False, log_level=None, **kwargs):
setting_log(level=log_level, save_file=kwargs.pop('save_log', False))
self._logger = logger.bind(flaxkv=True)
if not log:
if not log_level:
self._logger.remove()
super().__init__("leveldb", path=path, recreate=recreate)
super().__init__("leveldb", path=path, rebuild=rebuild)

def keys(self):
with self._buffer_lock:
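The core of the `base.py` change is that `close(write=...)` now hands its decision to the worker through a one-slot queue instead of toggling a shared `_write_activate` flag. The following is a minimal sketch of that pattern using only the standard library. `BufferedWriter`, its `store` dict (standing in for the on-disk database), and the 60-second wait are assumptions made for illustration, not flaxkv's actual classes or values. The sketch also sets the running flag before waking the worker, which is a choice made here and differs from the ordering in the diff.

```python
import queue
import threading


class BufferedWriter:
    """Hypothetical stand-in for the buffered write path; not flaxkv code."""

    def __init__(self):
        self.buffer = {}                        # pending writes
        self.store = {}                         # stands in for the database
        self._lock = threading.Lock()
        self._event = threading.Event()
        self._signals = queue.Queue(maxsize=1)  # carries the write/skip decision
        self._running = True
        self._thread = threading.Thread(target=self._worker, daemon=True)
        self._thread.start()

    def __setitem__(self, key, value):
        with self._lock:
            self.buffer[key] = value

    def _flush(self):
        # Swap the buffer out under the lock, then write the snapshot.
        with self._lock:
            snapshot, self.buffer = self.buffer, {}
        self.store.update(snapshot)

    def _worker(self):
        # Keep looping while running, or while a signal is still unread.
        while self._running or not self._signals.empty():
            self._event.wait(timeout=60)
            self._event.clear()
            if not self._signals.empty():
                if self._signals.get() is False:
                    break                       # close(write=False): skip the flush
            self._flush()

    def close(self, write=True):
        self._signals.put(write)                # decision travels with the wake-up
        self._running = False
        self._event.set()
        self._thread.join(timeout=5)


writer = BufferedWriter()
writer['a'] = 1
writer.close(write=True)      # worker reads True and flushes the buffer

discarded = BufferedWriter()
discarded['a'] = 1
discarded.close(write=False)  # worker reads False and exits without flushing
```

Because the decision is read by the worker itself, there is no window in which the caller has to restore a flag after the thread exits, which is what the removed `_write_activate` lines in `clear()` and `close()` had to do.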
2 changes: 1 addition & 1 deletion flaxkv/decorators.py
@@ -5,7 +5,7 @@
from rich import print
from rich.text import Text

ENABLED_MEASURE_TIME_DECORATOR = False
ENABLED_MEASURE_TIME_DECORATOR = True


def class_measure_time(logger=None, level=logging.INFO, prec=3):
19 changes: 19 additions & 0 deletions flaxkv/helper.py
@@ -0,0 +1,19 @@
import queue


class SimpleQueue:
    def __init__(self, maxsize: int):
        self.q = queue.Queue(maxsize=maxsize)

    def put(self, item):
        if not self.q.full():
            self.q.put(item)
        else:
            self.q.get()
            self.q.put(item)

    def get(self, block=True, timeout=None):
        return self.q.get(block=block, timeout=timeout)

    def empty(self):
        return self.q.empty()
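With `maxsize=1`, this wrapper behaves as a "latest value wins" slot: a second `put` evicts the unread item instead of blocking. A small demonstration follows; `LatestOnlyQueue` is a renamed copy made for illustration so the snippet runs on its own, and the `True`/`False` items simply echo the write signals used in `base.py`.

```python
import queue


class LatestOnlyQueue:
    """Illustrative copy of the SimpleQueue idea from this diff, renamed for the demo."""

    def __init__(self, maxsize: int):
        self.q = queue.Queue(maxsize=maxsize)

    def put(self, item):
        # When full, drop the oldest unread item to make room.
        if self.q.full():
            self.q.get()
        self.q.put(item)

    def get(self, block=True, timeout=None):
        return self.q.get(block=block, timeout=timeout)

    def empty(self):
        return self.q.empty()


signals = LatestOnlyQueue(maxsize=1)
signals.put(True)
signals.put(False)      # slot is full, so the earlier True is discarded
latest = signals.get()  # only the most recent signal is left to read
```

One caveat worth keeping in mind: `full()` followed by `get()` is a check-then-act sequence rather than a single atomic step, so if a consumer drains the queue between the two calls, the inner `get()` would block. Whether that can happen depends on how many threads call `put` and `get` concurrently in the real code.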