-
Notifications
You must be signed in to change notification settings - Fork 2.2k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
5 changed files
with
107 additions
and
2 deletions.
There are no files selected for viewing
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
from ..proto import BasePbIndexer | ||
import json | ||
from typing import Tuple, List, Union | ||
from ..proto import jina_pb2 | ||
from google.protobuf.json_format import Parse | ||
|
||
|
||
class LeveldbIndexer(BasePbIndexer): | ||
""" | ||
:class:`LeveldbIndexer` use `LevelDB` to save and query protobuf chunk/document. | ||
""" | ||
|
||
def _get_db_handler(self, create_if_missing): | ||
import plyvel | ||
return plyvel.DB(self.index_abspath, create_if_missing=create_if_missing) | ||
|
||
def get_add_handler(self): | ||
"""Get the database handler | ||
""" | ||
return self._get_db_handler(create_if_missing=True) | ||
|
||
def add(self, objs): | ||
"""Add a JSON-friendly object to the indexer | ||
:param objs: objects can be serialized into JSON format | ||
""" | ||
with self.write_handler.write_batch() as h: | ||
for k, obj in objs.items(): | ||
key = k.encode('utf8') | ||
value = json.dumps(obj).encode('utf8') | ||
h.put(key, value) | ||
|
||
def get_query_handler(self): | ||
"""Get the database handler | ||
""" | ||
return self._get_db_handler(create_if_missing=False) | ||
|
||
def query(self, key: str, *args, **kwargs) -> Union['jina_pb2.Chunk', 'jina_pb2.Document']: | ||
"""Find the protobuf chunk/doc using id | ||
:param key: ``chunk_id`` or ``doc_id`` | ||
:return: protobuf chunk or protobuf document | ||
""" | ||
v = self.query_handler.get(key.encode('utf8')) | ||
value = None | ||
if v is not None: | ||
_parser = jina_pb2.Chunk if key[0] == 'c' else jina_pb2.Document | ||
value = Parse(json.loads(v.decode('utf8')), _parser()) | ||
return value | ||
|
||
def close(self): | ||
"""Close the database handler | ||
""" | ||
super().close() | ||
self.write_handler.close() | ||
|
||
|
||
class ChunkLeveldbIndexer(LeveldbIndexer): | ||
"""""" | ||
|
||
|
||
class DocLeveldbIndexer(LeveldbIndexer): | ||
"""""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
import unittest | ||
import os | ||
from tests import JinaTestCase | ||
from jina.executors.indexers.keyvalue.leveldb import LeveldbIndexer | ||
import jina.proto.jina_pb2 as jina_pb2 | ||
from jina.executors.indexers import BaseIndexer | ||
from google.protobuf.json_format import MessageToJson | ||
|
||
|
||
class MyTestCase(JinaTestCase): | ||
def _create_Document(self, doc_id, text, weight, length): | ||
d = jina_pb2.Document() | ||
d.doc_id = doc_id | ||
d.raw_bytes = text.encode('utf8') | ||
d.weight = weight | ||
d.length = length | ||
return d | ||
|
||
def test_add_query(self): | ||
indexer = LeveldbIndexer(index_filename='leveldb.db') | ||
data = { | ||
'd1': MessageToJson(self._create_Document(1, 'cat', 0.1, 3)), | ||
'd2': MessageToJson(self._create_Document(2, 'dog', 0.2, 3)), | ||
'd3': MessageToJson(self._create_Document(3, 'bird', 0.3, 3)), | ||
} | ||
indexer.add(data) | ||
indexer.save() | ||
indexer.close() | ||
self.assertTrue(os.path.exists(indexer.index_abspath)) | ||
|
||
searcher = BaseIndexer.load(indexer.save_abspath) | ||
doc = searcher.query('d2') | ||
self.assertEqual(doc.doc_id, 2) | ||
self.assertEqual(doc.length, 3) | ||
self.add_tmpfile(indexer.save_abspath, indexer.index_abspath) | ||
|
||
|
||
if __name__ == '__main__': | ||
unittest.main() |