feat: Improve code robustness (#9)
* Improve code robustness.

* Update README

* Add api tests

* fix pytest
KenyonY authored Dec 9, 2023
1 parent ba4698c commit f2617e4
Showing 10 changed files with 156 additions and 78 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -4,6 +4,7 @@ CHACHE_SQLITE3.db
 CACHE_LEVELDB
 Log
 FLAXKV_DB
+test_db/

 .idea
 .ipynb_checkpoints
38 changes: 19 additions & 19 deletions README.md
@@ -38,18 +38,8 @@ A high-performance dictionary database.
 </p>


-The `flaxkv` module provides a dictionary-like interface for interacting with high-performance key-value databases (LMDB, LevelDB).
-It abstracts the complexities of direct database interaction, allowing users to perform CRUD operations in a simple and
-intuitive manner. You can use it just like a Python dictionary without worrying about it blocking your main process at any stage.
-
-**Use Cases**
-
-- **Key-Value Structure**: `flaxkv` is suitable for storing simple key-value structured datasets.
-
-- **High-Frequency Writing**: `flaxkv` is very suitable for scenarios that require high-frequency insertion/updating of data.
-
-- **Machine Learning**: `flaxkv` is perfect for storing various embeddings, images, texts, and other large datasets with key-value structures in machine learning.
-
+The `flaxkv` provides an interface very similar to a dictionary for interacting with high-performance key-value databases. More importantly, as a persistent database, it offers performance close to that of native dictionaries (in-memory access).
+You can use it just like a Python dictionary without having to worry about blocking your user process when operating the database at any time.
 ---

 ## Key Features
@@ -86,7 +76,7 @@ import numpy as np

 db = dictdb('./test_db')
 # or run server `flaxkv run --port 8000`, then:
-# db = dictdb('http://localhost:8000', remote=True)
+# db = dictdb('http://localhost:8000', remote=True, db_name='test_db', rebuild=False)

 db[1] = 1
 db[1.1] = 1 / 3
@@ -112,12 +102,22 @@ for key, value in db.items():
 print(len(db))
 ```

-You might have noticed that even when the program ends, we didn't use `db.close()` to release resources!
-Everything will be handled automatically.
-More importantly, as a persistent database, it offers performance close to dictionary (in-memory) access!
-(There should be a benchmark here.)
-
-P.S.: Of course, you can also manually call `db.close()` to release resources immediately.
+### Tips
+- `flaxkv` provides performance close to native dictionary (in-memory) access as a persistent database! (There should be a benchmark here)
+- You may have noticed that in the previous example code, `db.close()` was not used to release resources! Because all this will be automatically handled by `flaxkv`. Of course, you can also manually call db.close() to immediately release resources.
+- Since `flaxkv` saves data by buffered writing, this feature of delayed writing may not write data to the disk in time in some scenarios (such as in Jupyter),
+in this case, you can use `db.write_immediately()` to immediately trigger a write operation.
+
+### Benchmark
+todo
+
+### Use Cases
+- **Key-Value Structure:**
+Used to save simple key-value structure data.
+- **High-Frequency Writing:**
+Very suitable for scenarios that require high-frequency insertion/update of data.
+- **Machine Learning:**
+`flaxkv` is very suitable for saving various large datasets of embeddings, images, texts, and other key-value structures in machine learning.

 ## Citation
 If `FlaxKV` has been helpful to your research, please cite:
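The Tips added in this hunk describe buffered (delayed) writes and an explicit `db.write_immediately()` call. As a rough illustration of that idea only, the sketch below uses a hypothetical `BufferedStore` class with a plain dict standing in for the on-disk database. None of these names come from `flaxkv`, and the real library flushes from a background thread rather than synchronously as shown here.

```python
class BufferedStore:
    """Toy stand-in: a dict plays the role of the on-disk database."""

    def __init__(self, flush_threshold=3):
        self.disk = {}      # pretend persistent storage
        self.buffer = {}    # pending writes, not yet "on disk"
        self.flush_threshold = flush_threshold

    def __setitem__(self, key, value):
        self.buffer[key] = value
        # Flush automatically only once enough writes have accumulated.
        if len(self.buffer) >= self.flush_threshold:
            self.write_immediately()

    def __getitem__(self, key):
        # Reads see buffered values first, then fall back to "disk".
        if key in self.buffer:
            return self.buffer[key]
        return self.disk[key]

    def write_immediately(self):
        """Move everything pending in the buffer to "disk" right now."""
        self.disk.update(self.buffer)
        self.buffer.clear()


store = BufferedStore()
store["a"] = 1
pending = "a" not in store.disk    # the write is still only buffered
store.write_immediately()
flushed = store.disk["a"] == 1     # the write has now reached "disk"
```

In an interactive session such as Jupyter, where the process may be interrupted before an automatic flush happens, the explicit call is what guarantees the pending writes are persisted.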
39 changes: 23 additions & 16 deletions README_ZH.md
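@@ -40,18 +40,8 @@



-`flaxkv` provides a dictionary-like interface for interacting with high-performance key-value databases.
-It abstracts away the complexity of direct database interaction, allowing users to perform CRUD operations in a simple and intuitive way.
-You can use it directly as a Python dictionary without worrying that it will block your main process at any stage.
-
-**Use Cases**
-
-- **Key-Value Structure**
-`flaxkv` is suitable for storing simple key-value structured datasets.
-- **High-Frequency Writing**
-`flaxkv` is well suited to scenarios that require high-frequency insertion/updating of data.
-- **Machine Learning**
-`flaxkv` is well suited to storing embeddings, images, texts, and other large key-value structured datasets in machine learning.
+`flaxkv` provides an interface very similar to a dictionary for interacting with high-performance key-value databases. More importantly, as a persistent database it offers performance close to native dictionary (in-memory) access.
+You can use it directly as a Python dictionary without worrying that operating on the database will block your user process at any time.

 ---
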
@@ -91,7 +81,7 @@ import numpy as np

 db = dictdb('./test_db')
 # or run server `flaxkv run --port 8000`, then:
-# db = dictdb('http://localhost:8000', remote=True)
+# db = dictdb('http://localhost:8000', remote=True, db_name='test_db', rebuild=False)

 db[1] = 1
 db[1.1] = 1 / 3
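@@ -116,10 +106,27 @@ for key, value in db.items():

 print(len(db))
 ```
-You may have noticed that `db.close()` was never called to release resources, even at the end of the program! All of this is handled automatically.
-More importantly, as a persistent database it also offers performance close to dictionary (in-memory) access! (There should be a benchmark here..)
-
-PS: Of course, you can also call `db.close()` manually to release resources immediately
+
+### Tips
+
+- As a persistent database, `flaxkv` offers performance close to native dictionary (in-memory) access! (There should be a benchmark here)
+- You may have noticed that the example code above never calls `db.close()` to release resources! That is because `flaxkv` handles all of this automatically. Of course, you can also call `db.close()` manually to release resources immediately
+- Because `flaxkv` saves data through buffered writes, this delayed-write behavior can fail to write data to disk in time in some scenarios (such as in Jupyter),
+in that case, call `db.write_immediately()` to trigger a write immediately.
+
+### Benchmark
+todo
+
+
+### Use Cases
+
+- **Key-Value Structure**
+Used to save simple key-value structured data
+- **High-Frequency Writing**
+Well suited to scenarios that require high-frequency insertion/updating of data
+- **Machine Learning**
+`flaxkv` is well suited to storing embeddings, images, texts, and other large key-value structured datasets in machine learning.


 ## Citation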
2 changes: 1 addition & 1 deletion flaxkv/__init__.py
@@ -15,7 +15,7 @@
 from .core import LevelDBDict, LMDBDict
 from .serve.client import RemoteDictDB

-__version__ = "0.1.6"
+__version__ = "0.1.7"

 __all__ = [
     "dictdb",
82 changes: 48 additions & 34 deletions flaxkv/core.py
@@ -183,6 +183,18 @@ def _close_background_worker(self, write=True):
                 "Warning: Background thread did not finish in time. Some data might not be saved."
             )

+    def _encode_key(self, key):
+        if self.raw:
+            return key
+        else:
+            return encode(key)
+
+    def _encode_value(self, value):
+        if self.raw:
+            return value
+        else:
+            return encode(value)
+
     def get(self, key: Any, default=None):
         """
         Retrieves the value associated with the given key.
@@ -201,16 +213,13 @@ def get(self, key: Any, default=None):
         if key in self.buffer_dict:
             return self.buffer_dict[key]

-        if self.raw:
-            value = self._static_view.get(key)
-        else:
-            value = self._static_view.get(encode(key))
+        key = self._encode_key(key)
+        value = self._static_view.get(key)
+
         if value is None:
             return default
-        if self.raw:
-            return value
-        else:
-            return decode(value)
+
+        return value if self.raw else decode(value)

     def get_db_value(self, key: str):
         """
@@ -222,10 +231,8 @@ def get_db_value(self, key: str):
         Returns:
             value: The encoded value associated with the key.
         """
-        if self.raw:
-            return self._static_view.get(key)
-        else:
-            return self._static_view.get(encode(key))
+        key = self._encode_key(key)
+        return self._static_view.get(key)

     def get_batch(self, keys):
         """
@@ -245,7 +252,8 @@ def get_batch(self, keys):
             if key in self.buffer_dict:
                 values.append(self.buffer_dict[key])
                 continue
-            value = self._static_view.get(encode(key))
+            key = self._encode_key(key)
+            value = self._static_view.get(key)
             if value is not None:
                 value = decode(value)
             values.append(value)
@@ -340,15 +348,11 @@ def _write_buffer_to_db(
         with self._db_manager.write() as wb:
             try:
                 for key, value in buffer_dict_snapshot.items():
-                    if self.raw:
-                        wb.put(key, value)
-                    else:
-                        wb.put(encode(key), encode(value))
+                    key, value = self._encode_key(key), self._encode_value(value)
+                    wb.put(key, value)
                 for key in delete_buffer_set_snapshot:
-                    if self.raw:
-                        wb.delete(key)
-                    else:
-                        wb.delete(encode(key))
+                    key = self._encode_key(key)
+                    wb.delete(key)

             except Exception as e:
                 traceback.print_exc()
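The hunks above replace repeated `if self.raw: ... else: encode(...)` branches with the `_encode_key` and `_encode_value` helpers. The sketch below illustrates the same refactor in isolation. It assumes `pickle.dumps` as a stand-in for `flaxkv`'s own `encode` function, whose implementation is not shown in this diff, and `Codec` and `write_all` are hypothetical names.

```python
import pickle


class Codec:
    """Hypothetical holder for the raw/encoded switch."""

    def __init__(self, raw=False):
        self.raw = raw

    def _encode_key(self, key):
        # In raw mode the caller already supplies bytes; pass them through.
        return key if self.raw else pickle.dumps(key)

    def _encode_value(self, value):
        return value if self.raw else pickle.dumps(value)


def write_all(codec, pending, put):
    """Encode each pending pair once, then hand it to a `put` callable."""
    for key, value in pending.items():
        put(codec._encode_key(key), codec._encode_value(value))


written = {}
write_all(Codec(raw=False), {"a": 1}, written.__setitem__)

raw_written = {}
write_all(Codec(raw=True), {b"k": b"v"}, raw_written.__setitem__)
```

Keeping the raw check in one place means each call site only has to encode and then act, which is the shape the rewritten `_write_buffer_to_db` loop takes.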
@@ -368,6 +372,12 @@ def _write_buffer_to_db(
                 f"write {self._db_manager.db_type.upper()} buffer to db successfully-{current_write_num=}-{self._latest_write_num=}"
             )

+    def __iter__(self):
+        """
+        Returns an iterator over the keys.
+        """
+        return iter(self.keys())
+
     def __getitem__(self, key):
         """
         Retrieves the value for a given key using the dictionary access syntax.
@@ -378,8 +388,9 @@ def __getitem__(self, key):
         Returns:
             value: The value associated with the key.
         """
-        value = self.get(key)
-        if value is None:
+
+        value = self.get(key, b'iamnone')
+        if isinstance(value, bytes) and value == b'iamnone':
             raise KeyError(f"Key `{key}` not found in the database.")
         return value
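The `__getitem__` change passes a sentinel default to `get`, so that a stored `None` no longer raises `KeyError`. The commit uses the bytes value `b'iamnone'` as that sentinel. The sketch below shows the same idea with a unique `object()` sentinel instead, which cannot collide with any stored value. `SentinelDict` and `_MISSING` are hypothetical names, and a plain dict stands in for the database.

```python
_MISSING = object()  # unique marker; no stored value can be identical to it


class SentinelDict:
    def __init__(self):
        self._data = {}

    def __setitem__(self, key, value):
        self._data[key] = value

    def get(self, key, default=None):
        return self._data.get(key, default)

    def __getitem__(self, key):
        value = self.get(key, _MISSING)
        # Identity check: only the sentinel itself means "not found".
        if value is _MISSING:
            raise KeyError(f"Key `{key}` not found in the database.")
        return value


d = SentinelDict()
d["x"] = None
stored_none = d["x"]    # a stored None is returned, not treated as missing

try:
    d["y"]
    missing_raises = False
except KeyError:
    missing_raises = True
```

With a bytes sentinel, a user who happens to store exactly that bytes value would still see a spurious `KeyError`; an identity-checked object avoids that edge case.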

@@ -432,10 +443,8 @@ def pop(self, key, default=None):
                     else:
                         return value
                 else:
-                    if self.raw:
-                        value = self._static_view.get(key)
-                    else:
-                        value = self._static_view.get(encode(key))
+                    key = self._encode_key(key)
+                    value = self._static_view.get(key)
                     return decode(value)
         else:
             return default
@@ -455,11 +464,8 @@ def __contains__(self, key):
             return True
         if key in self.delete_buffer_set:
             return False
-
-        if self.raw:
-            return self._static_view.get(key) is not None
-        else:
-            return self._static_view.get(encode(key)) is not None
+        key = self._encode_key(key)
+        return self._static_view.get(key) is not None

     def clear(self):
         """
@@ -616,7 +622,7 @@ def keys(self, decode_raw=True):

         return list(lmdb_keys.union(buffer_keys) - delete_buffer_set)

-    def items(self, decode_raw=True):
+    def db_dict(self, decode_raw=True):
         (
             buffer_dict,
             buffer_keys,
@@ -638,6 +644,10 @@ def items(self, decode_raw=True):

         self._db_manager.close_static_view(session)

+        return _db_dict
+
+    def items(self, decode_raw=True):
+        _db_dict = self.db_dict(decode_raw=decode_raw)
         return _db_dict.items()

     def set_mapsize(self, map_size):
@@ -688,7 +698,7 @@ def keys(self, decode_raw=True):

         return list(db_keys.union(buffer_keys) - delete_buffer_set)

-    def items(self, decode_raw=True):
+    def db_dict(self, decode_raw=True):
         (
             buffer_dict,
             buffer_keys,
@@ -708,6 +718,10 @@ def items(self, decode_raw=True):
         _db_dict.update(buffer_dict)

         self._db_manager.close_static_view(snapshot)
+        return _db_dict
+
+    def items(self, decode_raw=True):
+        _db_dict = self.db_dict(decode_raw=decode_raw)
         return _db_dict.items()

     def stat(self):
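In both backends, this refactor moves the merge of the on-disk snapshot with the write buffer into `db_dict()`, leaves `items()` as a thin wrapper, and lets the new `__iter__` delegate to `keys()`. A minimal sketch of that layering follows, assuming a plain dict for the snapshot and hypothetical names (`MergedView`, `_disk`). The real methods also handle decoding and static-view sessions, which are omitted here.

```python
class MergedView:
    def __init__(self, disk, buffer_dict, delete_buffer_set):
        self._disk = disk                            # stand-in for the on-disk snapshot
        self.buffer_dict = buffer_dict               # writes not yet flushed
        self.delete_buffer_set = delete_buffer_set   # deletes not yet flushed

    def db_dict(self):
        # Pending deletes hide on-disk entries; buffered writes overlay the rest.
        merged = {
            key: value
            for key, value in self._disk.items()
            if key not in self.delete_buffer_set
        }
        merged.update(self.buffer_dict)
        return merged

    def keys(self):
        return list(self.db_dict())

    def items(self):
        # Thin wrapper, mirroring the shape of the refactored items().
        return self.db_dict().items()

    def __iter__(self):
        return iter(self.keys())


view = MergedView(
    disk={"a": 1, "b": 2},
    buffer_dict={"b": 20, "c": 3},
    delete_buffer_set={"a"},
)
merged = view.db_dict()
iterated = sorted(view)
```

Returning the merged dict directly lets a caller such as the server's `get_items` handler use it without rebuilding a dict from an items view.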
12 changes: 7 additions & 5 deletions flaxkv/serve/app.py
@@ -20,6 +20,7 @@

 @post(path="/attach")
 async def attach(data: AttachRequest) -> dict:
+    # todo switch `post` to `get`
     db = db_manager.get(data.db_name)
     if db is None or data.rebuild:
         db_manager.set_db(
@@ -30,6 +31,7 @@ async def attach(data: AttachRequest) -> dict:

 @post(path="/detach")
 async def detach(data: DetachRequest) -> dict:
+    # todo switch `post` to `get`
     db = db_manager.detach(db_name=data.db_name)
     if db is None:
         return {"success": False, "info": "db not found"}
@@ -75,7 +77,7 @@ async def update_raw(db_name: str, request: Request) -> dict:


 @post("/get_raw", media_type=MediaType.TEXT)
-async def get_raw(db_name: str, request: Request) -> bytes:
+async def _get(db_name: str, request: Request) -> bytes:
     db = db_manager.get(db_name)
     if db is None:
         raise ValueError("db not found")
@@ -124,7 +126,7 @@ async def get_keys(db_name: str) -> dict:

 @get("/values", media_type=MediaType.TEXT)
 @msg_encoder
-async def get_values(db_name: str) -> bytes:
+async def _values(db_name: str) -> bytes:
     db = db_manager.get(db_name)
     if db is None:
         return {"success": False, "info": "db not found"}
@@ -142,7 +144,7 @@ async def get_items(db_name: str) -> bytes:
     if db is None:
         return {"success": False, "info": "db not found"}
     try:
-        return {"success": True, "data": dict(db.items())}
+        return {"success": True, "data": db.db_dict()}
     except Exception as e:
         traceback.print_exc()
         return {"success": False, "info": str(e)}
@@ -158,9 +160,9 @@ def on_shutdown():
         detach,
         set_raw,
         update_raw,
-        get_raw,
+        _get,
         get_items,
-        get_values,
+        _values,
         set_value,
         contains,
         pop,
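The `get_items` handler wraps its payload in a `{"success": ..., "data": ...}` envelope, and the client change in this commit reads the `"data"` field from it. A small sketch of that contract follows, using hypothetical helper names (`build_items_response`, `unpack_items`) and plain dicts instead of the real request and encoding layers. Checking `success` before unpacking is an addition of this sketch, not something the commit does.

```python
def build_items_response(db):
    """Server side: wrap the result (or the failure) in one envelope shape."""
    if db is None:
        return {"success": False, "info": "db not found"}
    return {"success": True, "data": dict(db)}


def unpack_items(response):
    """Client side: surface failures instead of hitting a bare KeyError."""
    if not response.get("success"):
        raise LookupError(response.get("info", "unknown error"))
    return response["data"]


ok = unpack_items(build_items_response({"a": 1}))

try:
    unpack_items(build_items_response(None))
    failed = False
except LookupError as exc:
    failed = str(exc) == "db not found"
```

Without such a check, a failure envelope has no `"data"` key, so indexing it directly would raise a `KeyError` that hides the server's `info` message.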
12 changes: 9 additions & 3 deletions flaxkv/serve/client.py
@@ -5,11 +5,17 @@

 class RemoteDictDB:
     def __init__(
-        self, url: str, db_name: str, rebuild=False, backend="leveldb", timeout=6
+        self,
+        url: str,
+        db_name: str,
+        rebuild=False,
+        backend="leveldb",
+        timeout=6,
+        **kwargs,
     ):
         self._url = url
         self._db_name = db_name
-        self._client = httpx.Client(timeout=timeout)
+        self._client = kwargs.pop("client", httpx.Client(timeout=timeout))
         self._attach_db(rebuild=rebuild, backend=backend)

     def _attach_db(self, rebuild=False, backend="lmdb"):
@@ -59,7 +65,7 @@ def pop(self, key, default=None):
     def _items_dict(self):
         url = f"{self._url}/items?db_name={self._db_name}"
         response = self._client.get(url)
-        return decode(response.read())
+        return decode(response.read())["data"]

     def items(self):
         return self._items_dict().items()
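The constructor change lets a caller pass its own HTTP client through `**kwargs`, presumably so that tests can substitute a test client. Note that in `kwargs.pop("client", httpx.Client(timeout=timeout))` the default argument is evaluated eagerly, so a default client is constructed even when one is injected. The sketch below shows a lazy variant of the same pattern. `DefaultClient`, `FakeClient`, and `Remote` are hypothetical stand-ins, and no real HTTP library is used.

```python
class DefaultClient:
    """Stand-in for a real HTTP client; counts how often it is built."""

    created = 0

    def __init__(self, timeout):
        DefaultClient.created += 1
        self.timeout = timeout


class FakeClient:
    """Stand-in for a test double injected by the caller."""


class Remote:
    def __init__(self, url, timeout=6, **kwargs):
        self._url = url
        client = kwargs.pop("client", None)
        # Build the default only when the caller did not inject one.
        self._client = client if client is not None else DefaultClient(timeout)


injected = Remote("http://localhost:8000", client=FakeClient())
built_for_injected = DefaultClient.created   # no default client was constructed

default = Remote("http://localhost:8000")
```

The lazy form avoids creating, and then never closing, a client object that is immediately discarded.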