diff --git a/README.md b/README.md
index 0c0d0d5..6aaf67f 100644
--- a/README.md
+++ b/README.md
@@ -1,15 +1,13 @@
-**English** | [**简体中文**](./README_ZH.md)
-


 FlaxKV
-

+

-Let you forget you're using a database.
-Simple and high-performance persistent database solutions.
+Let you forget you're using a database —
+Simple and high-performance persistent database solution

@@ -29,15 +27,23 @@ Simple and high-performance persistent database solutions.

+

+

+ English |
+ 简体中文
+

+

+

+
+

-
-# FlaxKV
-
 A persistent database masquerading as a dictionary.
 
-The `flaxkv` module provides a dictionary-like interface for interacting with high-performance key-value databases (LMDB, LevelDB). It abstracts the complexities of direct database interaction, allowing users to perform CRUD (Create, Read, Update, Delete) operations in a simple and intuitive manner. You can use it just like a Python dictionary without worrying about it blocking your main process at any stage.
+The `flaxkv` module provides a dictionary-like interface for interacting with high-performance key-value databases (LMDB, LevelDB).
+It abstracts the complexities of direct database interaction, allowing users to perform CRUD operations in a simple and
+intuitive manner. You can use it just like a Python dictionary without worrying about it blocking your main process at any stage.
 
 **Use Cases**
 
@@ -51,7 +57,7 @@ The `flaxkv` module provides a dictionary-like interface for interacting with hi
 
 ## Key Features
 
-- **Always up-to-date, never blocking.**: It was designed from the ground up to ensure that no write operations block the user process, while users can always read the most recently written data.
+- **Always Up-to-date, Never Blocking**: It was designed from the ground up to ensure that no write operations block the user process, while users can always read the most recently written data.
 
 - **Ease of Use**: Interacting with the database feels just like using a Python dictionary! You don't even have to worry about resource release.
 
@@ -106,15 +112,18 @@ for key, value in d.items():
 print(len(d))
 ```
 
-Hey! Did you... forget to call `d.close()` to release resources here?
-No, you don't need to manage it manually! It's all just like using a dictionary!
-(Of course, you can also manually call `d.close()` to release resources immediately.)
+You might have noticed that even when the program ends, we didn't use `d.close()` to release resources!
+Everything will be handled automatically.
+More importantly, as a persistent database, it offers performance close to dictionary (in-memory) access!
+(There should be a benchmark here..)
+
+P.S.: Of course, you can also manually call `d.close()` to release resources immediately~.
 
 ## Citation
 
 If `FlaxKV` has been helpful to your research, please cite:
 ```bibtex
 @misc{flaxkv,
-    title={FlaxKV: An Easy-to-use and High Performance Key-Value Database},
+    title={FlaxKV: An Easy-to-use and High Performance Key-Value Database Solution},
     author={K.Y},
     howpublished = {\url{https://github.com/KenyonY/flaxkv}},
     year={2023}
diff --git a/README_ZH.md b/README_ZH.md
index 0853a62..2288bb4 100644
--- a/README_ZH.md
+++ b/README_ZH.md
@@ -1,14 +1,12 @@
-**简体中文** | [**English**](./README.md)


 FlaxKV
-

-让你忘记自己在使用数据库。
-简单且高性能的持久化数据库解决方案。
+让你忘记自己在使用数据库 ——
+简单且高性能的持久化数据库解决方案

@@ -30,12 +28,22 @@

-# FlaxKV
+

+

+ 简体中文 |
+ English
+

+

+
+

+
+

+
 
-`flaxkv` 模块提供了一个类似字典的接口,用于与高性能键值数据库进行交互。
-它抽象了直接数据库交互的复杂性,允许用户以简单直观的方式执行CRUD(创建、读取、更新、删除)操作。
-你可以直接将它当成python字典来使用而不必关心在任何阶段它会阻塞你的主进程。
+`flaxkv` 提供了一个类似字典的接口,用于与高性能键值数据库进行交互。
+抽象了直接数据库交互的复杂性,允许用户以简单直观的方式执行CRUD操作。
+你可以直接将它当成python字典来使用而不必担心在任何阶段它会阻塞你的主进程。
 
 **适用场景**
 
@@ -106,19 +114,19 @@ for key, value in d.items():
 print(len(d))
 ```
 
+也许你注意到即使到程序结束并没有使用到`d.close()`来进行资源释放!这一切都将被自动处理。
+更重要的是,它(作为持久化数据库)还提供了接近字典(内存)存取的性能!(这里应存在一个benchmark..)
 
-嘿!所以... 这里是不是忘记调用 `d.close()` 进行资源释放?
-不,你并不需要手动去管理! 一切就和使用字典一样!
-(当然,也可以手动调用 `d.close()` 来立即释放资源~)
+PS: 当然也可以手动调用 `d.close()` 来立即释放资源~
 
-## Citation
+## 引用
 
 如果`FlaxKV`对你的研究有帮助,欢迎引用:
 ```bibtex
 @misc{flaxkv,
-    title={FlaxKV: An Easy-to-use and High Performance Key-Value Database Solutions},
+    title={FlaxKV: An Easy-to-use and High Performance Key-Value Database Solution},
     author={K.Y},
     howpublished = {\url{https://github.com/KenyonY/flaxkv}},
     year={2023}
diff --git a/flaxkv/__init__.py b/flaxkv/__init__.py
index c587505..80152ab 100644
--- a/flaxkv/__init__.py
+++ b/flaxkv/__init__.py
@@ -1,5 +1,7 @@
 from .base import LevelDBDict, LMDBDict
 
+__version__ = "0.1.1"
+
 __all__ = [
     "dbdict",
     "LMDBDict",
@@ -7,13 +9,10 @@
 ]
 
 
-def dbdict(path, backend='lmdb', recreate=False, **kwargs):
+def dbdict(path, backend='lmdb', rebuild=False, **kwargs):
     if backend == 'lmdb':
-        return LMDBDict(path, recreate=recreate, **kwargs)
+        return LMDBDict(path, rebuild=rebuild, **kwargs)
     elif backend == 'leveldb':
-        return LevelDBDict(path, recreate=recreate)
+        return LevelDBDict(path, rebuild=rebuild, **kwargs)
     else:
         raise ValueError(f"Unsupported DB type {backend}.")
-
-
-__version__ = "0.1.0"
diff --git a/flaxkv/base.py b/flaxkv/base.py
index 95c7530..97c7b7e 100644
--- a/flaxkv/base.py
+++ b/flaxkv/base.py
@@ -21,14 +21,15 @@
 import numpy as np
 from loguru import logger
 
-# from .decorators import class_measure_time
+from .decorators import class_measure_time
+from .helper import SimpleQueue
 from .log import setting_log
 from .manager import DBManager
 from .pack import decode, decode_key, encode
 
 
 class BaseDBDict(ABC):
-    MAX_BUFFER_SIZE = 1000
+    MAX_BUFFER_SIZE = 200
     _COMMIT_TIME_INTERVAL = 60 * 60 * 24
     _logger = None
@@ -44,17 +45,17 @@ class _enc_prefix:
         dict = b'd'
         array = b'a'
 
-    def __init__(self, db_type, path, recreate=False, **kwargs):
+    def __init__(self, db_type, path, rebuild=False, **kwargs):
         """
         Initializes the BaseDBDict class which provides a dictionary-like interface to a database.
 
         Args:
             db_type (str): Type of the database ("lmdb" or "leveldb").
             path (str): Path to the database.
-            recreate (bool, optional): Whether to recreate the database. Defaults to False.
+            rebuild (bool, optional): Whether to recreate the database. Defaults to False.
         """
         self._db_manager = DBManager(
-            db_type=db_type, db_path=path, new=recreate, **kwargs
+            db_type=db_type, db_path=path, rebuild=rebuild, **kwargs
         )
 
         self._static_view = self._db_manager.new_static_view()
@@ -64,9 +65,10 @@ def __init__(self, db_type, path, recreate=False, **kwargs):
         self._buffer_lock = threading.Lock()
         self._write_event = threading.Event()
-        self._write_activate = True  # to skip writing when close()
         self._latest_write_num = 0
+        self._write_queue = SimpleQueue(maxsize=1)
         self._thread_running = True
+
         self._thread = threading.Thread(target=self._background_worker)
         self._thread.daemon = True
@@ -118,30 +120,38 @@ def _background_worker(self):
         """
         Background worker function to periodically write buffer to the database.
""" - while self._thread_running: + while self._thread_running or not self._write_queue.empty(): + self._write_event.wait(timeout=self._COMMIT_TIME_INTERVAL) self._write_event.clear() - if self._write_activate: - try: - self._write_buffer_to_db(current_write_num=self._latest_write_num) - except: - # todo: - self._logger.warning(f"Write buffer to db failed. error") - def write_immediately(self): + if not self._write_queue.empty(): + value = self._write_queue.get() + if value is False: + break + + try: + self._write_buffer_to_db(current_write_num=self._latest_write_num) + + except: + # todo: + self._logger.warning(f"Write buffer to db failed. error") + + def write_immediately(self, write=True): """ Triggers an immediate write of the buffer to the database. """ + self._write_queue.put(write) self._write_event.set() - def _close_background_worker(self): + def _close_background_worker(self, write=True): """ Stops the background worker thread. """ self._latest_write_num += 1 - self.write_immediately() + self.write_immediately(write=write) self._thread_running = False - self._thread.join(timeout=10) + self._thread.join(timeout=30) if self._thread.is_alive(): self._logger.warning( "Warning: Background thread did not finish in time. Some data might not be saved." @@ -220,7 +230,7 @@ def _set(self, key, value): self._buffered_count += 1 # Trigger immediate write if buffer size exceeds MAX_BUFFER_SIZE if self._buffered_count >= self.MAX_BUFFER_SIZE: - print("Trigger immediate write") + self._logger.debug("Trigger immediate write") self._latest_write_num += 1 self._buffered_count = 0 self.write_immediately() @@ -305,12 +315,14 @@ def _write_buffer_to_db( raise with self._buffer_lock: - self._logger.debug("reset buffer") self.delete_buffer_set = self.delete_buffer_set - delete_buffer_set_snapshot self.buffer_dict = self._diff_buffer(self.buffer_dict, buffer_dict_snapshot) self._db_manager.close_view(self._static_view) self._static_view = self._db_manager.new_static_view() + self._logger.info( + f"write {self._db_manager.db_type.upper()} buffer to db successfully-{current_write_num=}-{self._latest_write_num=}" + ) def __getitem__(self, key): """ @@ -403,8 +415,7 @@ def clear(self): with self._buffer_lock: self._db_manager.close_view(self._static_view) self._db_manager.close() - self._write_activate = False - self._close_background_worker() + self._close_background_worker(write=False) self._db_manager.clear() self._static_view = self._db_manager.new_static_view() @@ -413,7 +424,6 @@ def clear(self): self.buffer_dict = {} self.delete_buffer_set = set() self._buffer_lock = threading.Lock() - self._write_activate = True self._write_event = threading.Event() self._latest_write_num = 0 @@ -450,10 +460,8 @@ def close(self, write=True): Args: write (bool, optional): Whether to write the buffer to the database before closing. Defaults to True. 
""" - if not write: - self._write_activate = False - self._close_background_worker() - self._write_activate = True + self._close_background_worker(write=write) + self._db_manager.close_view(self._static_view) self._db_manager.close() self._logger.info(f"Closed ({self._db_manager.db_type.upper()}) successfully") @@ -503,13 +511,16 @@ class LMDBDict(BaseDBDict): value: int, float, bool, str, list, dict, and np.ndarray, """ - def __init__(self, path, map_size=1024**3, recreate=False, log=False, **kwargs): - setting_log(save_file=log) + def __init__( + self, path, map_size=1024**3, rebuild=False, log_level=None, **kwargs + ): + + setting_log(level=log_level, save_file=kwargs.pop('save_log', False)) self._logger = logger.bind(flaxkv=True) - if not log: + if not log_level: self._logger.remove() super().__init__( - "lmdb", path, max_dbs=1, map_size=map_size, recreate=recreate, **kwargs + "lmdb", path, max_dbs=1, map_size=map_size, rebuild=rebuild, **kwargs ) def keys(self): @@ -594,12 +605,12 @@ class LevelDBDict(BaseDBDict): value: int, float, bool, str, list, dict and np.ndarray, """ - def __init__(self, path, recreate=False, log=False): - setting_log(save_file=log) + def __init__(self, path, rebuild=False, log_level=None, **kwargs): + setting_log(level=log_level, save_file=kwargs.pop('save_log', False)) self._logger = logger.bind(flaxkv=True) - if not log: + if not log_level: self._logger.remove() - super().__init__("leveldb", path=path, recreate=recreate) + super().__init__("leveldb", path=path, rebuild=rebuild) def keys(self): with self._buffer_lock: diff --git a/flaxkv/decorators.py b/flaxkv/decorators.py index 54c876e..cc633c2 100644 --- a/flaxkv/decorators.py +++ b/flaxkv/decorators.py @@ -5,7 +5,7 @@ from rich import print from rich.text import Text -ENABLED_MEASURE_TIME_DECORATOR = False +ENABLED_MEASURE_TIME_DECORATOR = True def class_measure_time(logger=None, level=logging.INFO, prec=3): diff --git a/flaxkv/helper.py b/flaxkv/helper.py new file mode 100644 index 0000000..0bb8b1f --- /dev/null +++ b/flaxkv/helper.py @@ -0,0 +1,19 @@ +import queue + + +class SimpleQueue: + def __init__(self, maxsize: int): + self.q = queue.Queue(maxsize=maxsize) + + def put(self, item): + if not self.q.full(): + self.q.put(item) + else: + self.q.get() + self.q.put(item) + + def get(self, block=True, timeout=None): + return self.q.get(block=block, timeout=timeout) + + def empty(self): + return self.q.empty() diff --git a/flaxkv/log.py b/flaxkv/log.py index 09ce0ef..e16244f 100644 --- a/flaxkv/log.py +++ b/flaxkv/log.py @@ -7,13 +7,14 @@ from loguru import logger -def setting_log( - save_file=False, - multi_process=True, -): +def setting_log(level=None, multi_process=True, save_file=True): """ Configures the logging settings for the application. 
""" + if level is None: + # https://loguru.readthedocs.io/en/stable/api/logger.html + level = "CRITICAL" + save_file = False tz = os.environ.get("TZ", "").strip() if tz and hasattr(time, "tzset"): @@ -30,7 +31,7 @@ def get_utc_offset(timezone_str): config_handlers = [ { "sink": sys.stdout, - "level": "DEBUG", + "level": level, "filter": lambda record: "flaxkv" in record["extra"], }, ] @@ -40,7 +41,7 @@ def get_utc_offset(timezone_str): "sink": f"./Log/flaxkv.log", "enqueue": multi_process, "rotation": "100 MB", - "level": "DEBUG", + "level": level, "filter": lambda record: "flaxkv" in record["extra"], } ] diff --git a/flaxkv/manager.py b/flaxkv/manager.py index 5df0ec8..af1ec27 100644 --- a/flaxkv/manager.py +++ b/flaxkv/manager.py @@ -19,18 +19,18 @@ class DBManager: - def __init__(self, db_type: str, db_path: str, new=False, **kwargs): + def __init__(self, db_type: str, db_path: str, rebuild=False, **kwargs): """ Initializes the database manager. Args: db_type (str): Type of the database ("lmdb" or "leveldb"). db_path (str): Path to the database. - new (bool, optional): Whether to create a new database. Defaults to False. + rebuild (bool, optional): Whether to create a new database. Defaults to False. """ self.db_type = db_type.lower() self.db_path = db_path - if new: + if rebuild: self.delete_db() self.env = self.connect()