Skip to content

Commit

Permalink
feat: compatible with Python 3.8 (#17)
Browse files Browse the repository at this point in the history
* Compatible with Python 3.8

* chore update
  • Loading branch information
KenyonY authored Dec 22, 2023
1 parent 4f32a4c commit 886cf49
Show file tree
Hide file tree
Showing 11 changed files with 171 additions and 36 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ RUN apk update && \
COPY . /home/flaxkv
WORKDIR /home/flaxkv
#musl-dev
RUN apk add patch g++ libstdc++ linux-headers leveldb-dev --no-cache && pip install -e . --no-cache-dir && apk del g++ gcc && rm -rf /var/cache/apk/*
RUN apk add patch g++ libstdc++ linux-headers leveldb-dev --no-cache && pip install -e .[server] --no-cache-dir && apk del g++ gcc && rm -rf /var/cache/apk/*

EXPOSE 8000
ENTRYPOINT ["python", "-m", "flaxkv.__main__", "run"]
27 changes: 27 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
.PHONY: start build exec down log rm

image := "beidongjiedeguang/flaxkv:latest"
container := "flaxkv-container"
compose_path := "docker-compose.yaml"


start:
@docker run -d \
--restart=unless-stopped \
--name $(container) \
-p 8000:8000 \
$(image) --port=8000
@make log


exec:
docker exec -it $(container) sh

log:
docker logs -f $(container)

rm:
docker rm -f $(container)

build:
docker build --tag $(image) .
16 changes: 10 additions & 6 deletions flaxkv/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os

from __future__ import annotations

import re

from .core import LevelDBDict, LMDBDict, RemoteDBDict

__version__ = "0.2.0"
__version__ = "0.2.1-alpha"

__all__ = [
"dictdb",
"FlaxKV",
"dbdict",
"dictdb",
"LMDBDict",
"LevelDBDict",
"RemoteDBDict",
Expand All @@ -30,14 +33,14 @@
url_pattern = re.compile(r'^(http://|https://|ftp://)')


def dictdb(
def FlaxKV(
db_name: str,
root_path_or_url: str = ".",
backend='lmdb',
rebuild=False,
raw=False,
**kwargs,
):
) -> LMDBDict | LevelDBDict | RemoteDBDict:
if url_pattern.match(root_path_or_url):
return RemoteDBDict(
root_path_or_url=root_path_or_url,
Expand Down Expand Up @@ -70,4 +73,5 @@ def dictdb(
raise ValueError(f"Unsupported DB type {backend}.")


dbdict = dictdb
dbdict = FlaxKV
dictdb = FlaxKV
36 changes: 36 additions & 0 deletions flaxkv/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import atexit
import threading
import traceback
Expand Down Expand Up @@ -92,6 +94,7 @@ def __init__(
self.raw = raw
self._static_view = self._db_manager.new_static_view()

self._cache_dict = {}
self.buffer_dict = {}
self.delete_buffer_set = set()

Expand Down Expand Up @@ -275,6 +278,7 @@ def get_batch(self, keys):
if key in self.buffer_dict:
values.append(self.buffer_dict[key])
continue

key = self._encode_key(key)
value = self._static_view.get(key)
if value is not None:
Expand Down Expand Up @@ -873,3 +877,35 @@ def stat(self):
'db': db_count,
'marked_delete': len(self.delete_buffer_set),
}

def pull(self):
(
buffer_dict,
buffer_keys,
buffer_values,
delete_buffer_set,
view,
) = self._get_status_info(return_buffer_dict=True, decode_raw=True)
self._cache_dict = {}
response: Response = view.client.get(f"/dict?db_name={self._db_name}")
if not response.is_success:
raise ValueError(
f"Failed to get items from remote db: {decode(response.read())}"
)
remote_db_dict = decode(response.read())
for dk, dv in remote_db_dict.items():
if dk not in delete_buffer_set:
self._cache_dict[dk] = dv

self._cache_dict.update(buffer_dict)

def get_with_cache(self, key, default=None):

with self._buffer_lock:
if key in self.delete_buffer_set:
return default

if key in self.buffer_dict:
return self.buffer_dict[key]

return self._cache_dict.get(key, default)
56 changes: 44 additions & 12 deletions flaxkv/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.


from __future__ import annotations

import os
import re
import shutil
import traceback
from pathlib import Path

from loguru import logger

Expand All @@ -35,16 +40,24 @@ def __init__(
rebuild (bool, optional): Whether to create a new database. Defaults to False.
"""
self.db_type = db_type.lower()

self.db_root = root_path_or_url
self.db_name = db_name
self.db_path = os.path.join(root_path_or_url, f"{db_name}-{self.db_type}")
self._rebuild = rebuild
if rebuild:
if db_type == "remote":
...
else:

url_pattern = re.compile(r'^(http://|https://|ftp://)')
if url_pattern.match(root_path_or_url):
self.db_address = root_path_or_url
else:
self.db_address = os.path.join(
root_path_or_url, f"{db_name}-{self.db_type}"
)

root_path = Path(root_path_or_url)
if not root_path.exists():
root_path.mkdir(parents=True, exist_ok=True)

if rebuild:
self.destroy()

self.env = self.connect(**kwargs)

def connect(self, **kwargs):
Expand All @@ -58,19 +71,19 @@ def connect(self, **kwargs):
import lmdb

env = lmdb.open(
self.db_path,
self.db_address,
max_dbs=kwargs.get('max_dbs', 1),
map_size=kwargs.get('map_size', 2 * 1024**3),
)

elif self.db_type == "leveldb":
import plyvel

env = plyvel.DB(self.db_path, create_if_missing=True)
env = plyvel.DB(self.db_address, create_if_missing=True)

elif self.db_type == "remote":
env = RemoteTransaction(
base_url=self.db_root,
base_url=self.db_address,
db_name=self.db_name,
backend=kwargs.pop("backend", "lmdb"),
rebuild=self._rebuild,
Expand All @@ -85,7 +98,7 @@ def rmtree(self):
"""
Deletes the database at the specified path.
"""
shutil.rmtree(self.db_path, ignore_errors=True)
shutil.rmtree(self.db_address, ignore_errors=True)

def destroy(self):
"""
Expand All @@ -96,7 +109,7 @@ def destroy(self):
except:
pass
self.rmtree()
logger.info(f"Destroyed database at {self.db_path}.")
logger.info(f"Destroyed database at {self.db_address}.")

def rebuild_db(self):
"""
Expand Down Expand Up @@ -167,6 +180,17 @@ def write(self):
traceback.print_exc()
raise ValueError(f"Unsupported DB type {self.db_type}.")

# def pull(self):
# if self.db_type == "lmdb":
# ...
# elif self.db_type == "leveldb":
# ...
# elif self.db_type == "remote":
# ...
# else:
# traceback.print_exc()
# raise ValueError(f"Unsupported DB type {self.db_type}.")

def close(self):
"""
Closes the database connection.
Expand Down Expand Up @@ -268,6 +292,14 @@ def get(self, key: bytes, default=None):
return default
return raw_data

# def get_batch(self, keys: list[bytes]):
# url = f"/get_batch?db_name={self.db_name}"
# response = self.client.post(url, content=encode({"keys": keys}))
# if not response.is_success:
# raise RuntimeError
# raw_data = response.read()
# return decode(raw_data) # non bytes

def __enter__(self):
return self

Expand Down
43 changes: 33 additions & 10 deletions flaxkv/serve/app.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,34 @@
# Copyright (c) 2023 K.Y. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from __future__ import annotations

import traceback

import msgspec
from litestar import Litestar, MediaType, Request, get, post, status_codes
from litestar.exceptions import HTTPException
from litestar.openapi import OpenAPIConfig
from litestar.response import ServerSentEvent, Stream

from .. import __version__
from ..decorators import msg_encoder
from ..pack import encode
from .interface import (
AttachRequest,
DetachRequest,
StructDeleteBatchData,
StructGetBatchData,
StructSetBatchData,
StructSetData,
)
Expand Down Expand Up @@ -83,11 +100,11 @@ async def _set_batch(db_name: str, request: Request) -> None:
data = await request.body()
try:
data = msgspec.msgpack.decode(data, type=StructSetBatchData)
for key, value in data.data.items():
db[key] = value
except Exception as e:
traceback.print_exc()
raise HTTPException(status_code=500, detail=str(e))
for key, value in data.data.items():
db[key] = value


@post("/get", media_type=MediaType.TEXT)
Expand All @@ -100,6 +117,15 @@ async def _get(db_name: str, request: Request) -> bytes:
return value


@post("/get_batch", media_type=MediaType.TEXT)
async def _get_batch(db_name: str, request: Request) -> bytes:
db = get_db(db_name)
data = await request.body()
data = msgspec.msgpack.decode(data, type=StructGetBatchData)
values = db.get_batch(data.keys)
return encode(values)


@post("/delete")
async def _delete(db_name: str, request: Request) -> None:
db = get_db(db_name)
Expand All @@ -125,33 +151,30 @@ async def _delete_batch(db_name: str, request: Request) -> None:


@get("/keys", media_type=MediaType.TEXT)
@msg_encoder
async def _keys(db_name: str) -> bytes:
db = get_db(db_name)
try:
return db.keys()
return encode(db.keys())
except Exception as e:
traceback.print_exc()
raise HTTPException(status_code=500, detail=str(e))


@get("/dict", media_type=MediaType.TEXT)
@msg_encoder
async def _items(db_name: str) -> bytes:
db = get_db(db_name)
try:
return db.db_dict()
return encode(db.db_dict())
except Exception as e:
traceback.print_exc()
raise HTTPException(status_code=500, detail=str(e))


@get("/stat", media_type=MediaType.TEXT)
@msg_encoder
async def _stat(db_name: str) -> bytes:
db = get_db(db_name)
try:
return db.stat()
return encode(db.stat())
except Exception as e:
traceback.print_exc()
raise HTTPException(status_code=500, detail=str(e))
Expand Down
13 changes: 10 additions & 3 deletions flaxkv/serve/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.


from __future__ import annotations

from dataclasses import dataclass
from typing import Annotated, Any
from typing import Dict, List

import msgspec
from litestar.enums import RequestEncodingType
Expand All @@ -38,8 +41,12 @@ class StructSetData(msgspec.Struct):


class StructSetBatchData(msgspec.Struct):
data: dict[bytes, bytes]
data: Dict[bytes, bytes]


class StructGetBatchData(msgspec.Struct):
keys: List[bytes]


class StructDeleteBatchData(msgspec.Struct):
keys: list[bytes]
keys: List[bytes]
Loading

0 comments on commit 886cf49

Please sign in to comment.