diff --git a/docarray/array/document.py b/docarray/array/document.py index 826e8ec2bf8..fd920fbb5f8 100644 --- a/docarray/array/document.py +++ b/docarray/array/document.py @@ -41,6 +41,10 @@ def __new__(cls, *args, storage: str = 'memory', **kwargs) -> 'DocumentArrayLike from .sqlite import DocumentArraySqlite instance = super().__new__(DocumentArraySqlite) + elif storage == 'pqlite': + from .pqlite import DocumentArrayPqlite + + instance = super().__new__(DocumentArrayPqlite) elif storage == 'weaviate': from .weaviate import DocumentArrayWeaviate diff --git a/docarray/array/pqlite.py b/docarray/array/pqlite.py new file mode 100644 index 00000000000..1cd3ef5c2da --- /dev/null +++ b/docarray/array/pqlite.py @@ -0,0 +1,7 @@ +from .document import DocumentArray +from .storage.pqlite import StorageMixins + + +class DocumentArrayPqlite(StorageMixins, DocumentArray): + def __new__(cls, *args, **kwargs): + return super().__new__(cls) diff --git a/docarray/array/storage/pqlite/__init__.py b/docarray/array/storage/pqlite/__init__.py new file mode 100644 index 00000000000..f07b096a031 --- /dev/null +++ b/docarray/array/storage/pqlite/__init__.py @@ -0,0 +1,10 @@ +from .backend import BackendMixin +from .getsetdel import GetSetDelMixin +from .seqlike import SequenceLikeMixin +from abc import ABC + +__all__ = ['StorageMixins'] + + +class StorageMixins(BackendMixin, GetSetDelMixin, SequenceLikeMixin, ABC): + ... diff --git a/docarray/array/storage/pqlite/backend.py b/docarray/array/storage/pqlite/backend.py new file mode 100644 index 00000000000..461032b1c7d --- /dev/null +++ b/docarray/array/storage/pqlite/backend.py @@ -0,0 +1,63 @@ +from dataclasses import dataclass, asdict +from typing import ( + Union, + Dict, + Optional, + TYPE_CHECKING, +) + +from ..base.backend import BaseBackendMixin +from ....helper import dataclass_from_dict + +if TYPE_CHECKING: + from ....types import ( + DocumentArraySourceType, + ) + + +@dataclass +class PqliteConfig: + n_dim: int = 1 + metric: str = 'cosine' + serialize_protocol: str = 'pickle' + data_path: Optional[str] = None + + +class BackendMixin(BaseBackendMixin): + """Provide necessary functions to enable this storage backend. """ + + def _init_storage( + self, + docs: Optional['DocumentArraySourceType'] = None, + config: Optional[Union[PqliteConfig, Dict]] = None, + ): + if not config: + config = PqliteConfig() + if isinstance(config, dict): + config = dataclass_from_dict(PqliteConfig, config) + + self._persist = bool(config.data_path) + + if not self._persist: + from tempfile import TemporaryDirectory + + config.data_path = TemporaryDirectory().name + + self._config = config + + from pqlite import PQLite + from .helper import OffsetMapping + + config = asdict(config) + n_dim = config.pop('n_dim') + + self._pqlite = PQLite(n_dim, **config) + self._offset2ids = OffsetMapping( + name='docarray', + data_path=config['data_path'], + in_memory=False, + ) + + if docs is not None: + self.clear() + self.extend(docs) diff --git a/docarray/array/storage/pqlite/getsetdel.py b/docarray/array/storage/pqlite/getsetdel.py new file mode 100644 index 00000000000..bed10a3e56d --- /dev/null +++ b/docarray/array/storage/pqlite/getsetdel.py @@ -0,0 +1,88 @@ +from typing import ( + Sequence, + Iterable, +) +import numpy as np +from ...memory import DocumentArrayInMemory +from ..base.getsetdel import BaseGetSetDelMixin +from .... import Document + + +class GetSetDelMixin(BaseGetSetDelMixin): + """Implement required and derived functions that power `getitem`, `setitem`, `delitem`""" + + # essential methods start + + def _get_doc_by_offset(self, offset: int) -> 'Document': + offset = len(self) + offset if offset < 0 else offset + doc_id = self._offset2ids.get_id_by_offset(offset) + doc = self._pqlite.get_doc_by_id(doc_id) if doc_id else None + if doc is None: + raise IndexError('index out of range') + return doc + + def _get_doc_by_id(self, _id: str) -> 'Document': + doc = self._pqlite.get_doc_by_id(_id) + if doc is None: + raise KeyError(f'Can not find Document with id=`{_id}`') + return doc + + def _get_docs_by_offsets(self, offsets: Sequence[int]) -> Iterable['Document']: + ids = self._offset2ids.get_ids_by_offsets(offsets) + return self._get_docs_by_ids(ids) + + def _get_docs_by_ids(self, ids: str) -> Iterable['Document']: + for _id in ids: + yield self._get_doc_by_id(_id) + + def _get_docs_by_slice(self, _slice: slice) -> Iterable['Document']: + return self._get_docs_by_offsets(range(len(self))[_slice]) + + def _get_docs_by_mask(self, mask: Sequence[bool]): + offsets = [i for i, m in enumerate(mask) if m is True] + return self._get_docs_by_offsets(offsets) + + def _set_doc_by_offset(self, offset: int, value: 'Document'): + offset = len(self) + offset if offset < 0 else offset + self._offset2ids.set_at_offset(offset, value.id) + docs = DocumentArrayInMemory([value]) + if docs.embeddings is None: + docs.embeddings = np.zeros((1, self._pqlite.dim)) + self._pqlite.update(docs) + + def _set_doc_by_id(self, _id: str, value: 'Document'): + offset = self._offset2ids.get_offset_by_id(_id) + self._set_doc_by_offset(offset, value) + + def _set_doc_value_pairs( + self, docs: Iterable['Document'], values: Iterable['Document'] + ): + for _d, _v in zip(docs, values): + self._set_doc_by_id(_d.id, _v) + + def _del_doc_by_id(self, _id: str): + offset = self._offset2ids.get_offset_by_id(_id) + self._offset2ids.del_at_offset(offset, commit=True) + self._pqlite.delete([_id]) + + def _del_doc_by_offset(self, offset: int): + offset = len(self) + offset if offset < 0 else offset + _id = self._offset2ids.get_id_by_offset(offset) + self._offset2ids.del_at_offset(offset) + self._pqlite.delete([_id]) + + def _del_doc_by_offsets(self, offsets: Sequence[int]): + ids = [] + for offset in offsets: + ids.append(self._offset2ids.get_id_by_offset(offset)) + + self._offset2ids.del_at_offsets(offsets) + self._pqlite.delete(ids) + + def _del_docs_by_slice(self, _slice: slice): + offsets = range(len(self))[_slice] + self._del_doc_by_offsets(offsets) + + def _del_docs_by_mask(self, mask: Sequence[bool]): + offsets = [i for i, m in enumerate(mask) if m is True] + self._del_doc_by_offsets(offsets) diff --git a/docarray/array/storage/pqlite/helper.py b/docarray/array/storage/pqlite/helper.py new file mode 100644 index 00000000000..e3658eea246 --- /dev/null +++ b/docarray/array/storage/pqlite/helper.py @@ -0,0 +1,115 @@ +from typing import Optional, List, Tuple + +from pqlite.storage.table import Table + + +class OffsetMapping(Table): + def __init__( + self, + name: str = 'offset2ids', + data_path: Optional[str] = None, + in_memory: bool = True, + ): + super().__init__(name, data_path, in_memory) + self.create_table() + self._size = None + + def create_table(self): + sql = f'''CREATE TABLE IF NOT EXISTS {self.name} + (offset INTEGER NOT NULL PRIMARY KEY, + doc_id INTEGER TEXT NOT NULL)''' + + self.execute(sql, commit=True) + + def clear(self): + super().clear() + self._size = None + + def __len__(self): + return self.size + + @property + def size(self): + if self._size is None: + sql = f'SELECT MAX(offset) from {self.name} LIMIT 1;' + result = self._conn.execute(sql).fetchone() + self._size = result[0] + 1 if result[0] else 0 + + return self._size + + def extend_doc_ids(self, doc_ids: List[str], commit: bool = True): + offsets = [self.size + i for i in range(len(doc_ids))] + offset_ids = list(zip(offsets, doc_ids)) + self._insert(offset_ids, commit=commit) + + def _insert(self, offset_ids: List[Tuple[int, str]], commit: bool = True): + sql = f'INSERT INTO {self.name}(offset, doc_id) VALUES (?, ?);' + self.execute_many(sql, offset_ids, commit=commit) + self._size = self.size + len(offset_ids) + + def get_id_by_offset(self, offset: int): + offset = len(self) + offset if offset < 0 else offset + sql = f'SELECT doc_id FROM {self.name} WHERE offset = ? LIMIT 1;' + result = self._conn.execute(sql, (offset,)).fetchone() + return str(result[0]) if result is not None else None + + def get_ids_by_offsets(self, offsets: List[int]) -> List[str]: + return [self.get_id_by_offset(offset) for offset in offsets] + + def get_offsets_by_ids(self, ids: List[str]) -> List[int]: + return [self.get_offset_by_id(k) for k in ids] + + def get_offset_by_id(self, doc_id: str): + sql = f'SELECT offset FROM {self.name} WHERE doc_id=? LIMIT 1;' + result = self._conn.execute(sql, (doc_id,)).fetchone() + return result[0] if result else None + + def del_at_offset(self, offset: int, commit: bool = True): + offset = len(self) + offset if offset < 0 else offset + sql = f'DELETE FROM {self.name} WHERE offset=?' + self._conn.execute(sql, (offset,)) + self.shift_offset(offset, shift_step=1, direction='left', commit=commit) + + self._size -= 1 + + def del_at_offsets(self, offsets: List[int], commit: bool = True): + for offset in sorted(offsets, reverse=True): + self.del_at_offset(offset, commit=False) + if commit: + self.commit() + + def insert_at_offset(self, offset: int, doc_id: str, commit: bool = True): + offset = len(self) + offset if offset < 0 else offset + self.shift_offset(offset - 1, shift_step=1, direction='right', commit=False) + self._insert([(offset, doc_id)], commit=commit) + + def set_at_offset(self, offset: int, doc_id: str, commit: bool = True): + offset = len(self) + offset if offset < 0 else offset + sql = f'UPDATE {self.name} SET doc_id=? WHERE offset = ?' + self._conn.execute( + sql, + ( + doc_id, + offset, + ), + ) + if commit: + self.commit() + + def shift_offset( + self, + shift_from: int, + shift_step: int = 1, + direction: str = 'left', + commit: bool = True, + ): + if direction == 'left': + sql = f'UPDATE {self.name} SET offset=offset-{shift_step} WHERE offset > ?' + elif direction == 'right': + sql = f'UPDATE {self.name} SET offset=offset+{shift_step} WHERE offset > ?' + else: + raise ValueError(f'The shit_offset directory {direction} is not supported!') + + self._conn.execute(sql, (shift_from,)) + if commit: + self._conn.commit() diff --git a/docarray/array/storage/pqlite/seqlike.py b/docarray/array/storage/pqlite/seqlike.py new file mode 100644 index 00000000000..17f80d13606 --- /dev/null +++ b/docarray/array/storage/pqlite/seqlike.py @@ -0,0 +1,80 @@ +from typing import Iterator, Union, Iterable, Sequence, MutableSequence +import numpy as np +from .... import Document +from ...memory import DocumentArrayInMemory + + +class SequenceLikeMixin(MutableSequence[Document]): + """Implement sequence-like methods""" + + def insert(self, index: int, value: 'Document'): + """Insert `doc` at `index`. + + :param index: Position of the insertion. + :param value: The doc needs to be inserted. + """ + if value.embedding is None: + value.embedding = np.zeros(self._pqlite.dim, dtype=np.float32) + + self._pqlite.index(DocumentArrayInMemory([value])) + self._offset2ids.insert_at_offset(index, value.id) + + def append(self, value: 'Document') -> None: + self._pqlite.index(DocumentArrayInMemory([value])) + self._offset2ids.extend_doc_ids([value.id]) + + def extend(self, values: Iterable['Document']) -> None: + docs = DocumentArrayInMemory(values) + for doc in docs: + if doc.embedding is None: + doc.embedding = np.zeros(self._pqlite.dim, dtype=np.float32) + self._pqlite.index(docs) + self._offset2ids.extend_doc_ids([doc.id for doc in docs]) + + def clear(self): + """Clear the data of :class:`DocumentArray`""" + self._offset2ids.clear() + self._pqlite.clear() + + def __del__(self) -> None: + if not self._persist: + self._offset2ids.clear() + self._pqlite.clear() + + def __eq__(self, other): + """In pqlite backend, data are considered as identical if configs point to the same database source""" + return ( + type(self) is type(other) + and type(self._config) is type(other._config) + and self._config == other._config + ) + + def __len__(self): + return self._offset2ids.size + + def __iter__(self) -> Iterator['Document']: + for i in range(len(self)): + yield self[i] + + def __contains__(self, x: Union[str, 'Document']): + if isinstance(x, str): + return self._offset2ids.get_offset_by_id(x) is not None + elif isinstance(x, Document): + return self._offset2ids.get_offset_by_id(x.id) is not None + else: + return False + + def __bool__(self): + """To simulate ```l = []; if l: ...``` + + :return: returns true if the length of the array is larger than 0 + """ + return len(self) > 0 + + def __repr__(self): + return f'' + + def __add__(self, other: Union['Document', Sequence['Document']]): + v = type(self)(self) + v.extend(other) + return v diff --git a/setup.py b/setup.py index b645f8bd90c..7083bad4da0 100644 --- a/setup.py +++ b/setup.py @@ -54,6 +54,7 @@ 'fastapi', 'uvicorn', 'weaviate-client~=3.3.0', + 'pqlite>=0.2.1', ], 'test': [ 'pytest', diff --git a/tests/unit/array/test_advance_indexing.py b/tests/unit/array/test_advance_indexing.py index b3288fc9c3b..79d39e606e6 100644 --- a/tests/unit/array/test_advance_indexing.py +++ b/tests/unit/array/test_advance_indexing.py @@ -14,7 +14,7 @@ def indices(): yield (i for i in [-2, 0, 2]) -@pytest.mark.parametrize('storage', ['memory', 'sqlite', 'weaviate']) +@pytest.mark.parametrize('storage', ['memory', 'sqlite', 'weaviate', 'pqlite']) def test_getter_int_str(docs, storage, start_weaviate): docs = DocumentArray(docs, storage=storage) # getter @@ -34,7 +34,7 @@ def test_getter_int_str(docs, storage, start_weaviate): docs['adsad'] -@pytest.mark.parametrize('storage', ['memory', 'sqlite', 'weaviate']) +@pytest.mark.parametrize('storage', ['memory', 'sqlite', 'weaviate', 'pqlite']) def test_setter_int_str(docs, storage, start_weaviate): docs = DocumentArray(docs, storage=storage) # setter @@ -50,7 +50,7 @@ def test_setter_int_str(docs, storage, start_weaviate): assert docs[docs[2].id].text == 'doc2' -@pytest.mark.parametrize('storage', ['memory', 'sqlite', 'weaviate']) +@pytest.mark.parametrize('storage', ['memory', 'sqlite', 'weaviate', 'pqlite']) def test_del_int_str(docs, storage, indices): docs = DocumentArray(docs, storage=storage) initial_len = len(docs) @@ -71,7 +71,7 @@ def test_del_int_str(docs, storage, indices): assert new_doc_zero not in docs -@pytest.mark.parametrize('storage', ['memory', 'sqlite', 'weaviate']) +@pytest.mark.parametrize('storage', ['memory', 'sqlite', 'weaviate', 'pqlite']) def test_slice(docs, storage, start_weaviate): docs = DocumentArray(docs, storage=storage) # getter @@ -96,7 +96,7 @@ def test_slice(docs, storage, start_weaviate): assert twenty_doc in docs -@pytest.mark.parametrize('storage', ['memory', 'sqlite', 'weaviate']) +@pytest.mark.parametrize('storage', ['memory', 'sqlite', 'weaviate', 'pqlite']) def test_sequence_bool_index(docs, storage, start_weaviate): docs = DocumentArray(docs, storage=storage) # getter @@ -129,7 +129,7 @@ def test_sequence_bool_index(docs, storage, start_weaviate): @pytest.mark.parametrize('nparray', [lambda x: x, np.array, tuple]) -@pytest.mark.parametrize('storage', ['memory', 'sqlite', 'weaviate']) +@pytest.mark.parametrize('storage', ['memory', 'sqlite', 'weaviate', 'pqlite']) def test_sequence_int(docs, nparray, storage, start_weaviate): docs = DocumentArray(docs, storage=storage) # getter @@ -152,7 +152,7 @@ def test_sequence_int(docs, nparray, storage, start_weaviate): assert docs[9].text == 'new' -@pytest.mark.parametrize('storage', ['memory', 'sqlite', 'weaviate']) +@pytest.mark.parametrize('storage', ['memory', 'sqlite', 'weaviate', 'pqlite']) def test_sequence_str(docs, storage, start_weaviate): docs = DocumentArray(docs, storage=storage) # getter @@ -173,14 +173,14 @@ def test_sequence_str(docs, storage, start_weaviate): assert len(docs) == 100 - len(idx) -@pytest.mark.parametrize('storage', ['memory', 'sqlite', 'weaviate']) +@pytest.mark.parametrize('storage', ['memory', 'sqlite', 'weaviate', 'pqlite']) def test_docarray_list_tuple(docs, storage, start_weaviate): docs = DocumentArray(docs, storage=storage) assert isinstance(docs[99, 98], DocumentArray) assert len(docs[99, 98]) == 2 -@pytest.mark.parametrize('storage', ['memory', 'sqlite', 'weaviate']) +@pytest.mark.parametrize('storage', ['memory', 'sqlite', 'weaviate', 'pqlite']) def test_path_syntax_indexing(storage, start_weaviate): da = DocumentArray.empty(3) for d in da: @@ -278,7 +278,7 @@ def test_path_syntax_indexing_set(storage, start_weaviate): @pytest.mark.parametrize('size', [1, 5]) -@pytest.mark.parametrize('storage', ['memory', 'sqlite', 'weaviate']) +@pytest.mark.parametrize('storage', ['memory', 'sqlite', 'weaviate', 'pqlite']) def test_attribute_indexing(storage, start_weaviate, size): da = DocumentArray(storage=storage) da.extend(DocumentArray.empty(size)) @@ -303,7 +303,7 @@ def test_attribute_indexing(storage, start_weaviate, size): assert vv -@pytest.mark.parametrize('storage', ['memory', 'sqlite', 'weaviate']) +@pytest.mark.parametrize('storage', ['memory', 'sqlite', 'weaviate', 'pqlite']) def test_tensor_attribute_selector(storage): import scipy.sparse @@ -311,7 +311,11 @@ def test_tensor_attribute_selector(storage): sp_embed[sp_embed > 0.1] = 0 sp_embed = scipy.sparse.coo_matrix(sp_embed) - da = DocumentArray(storage=storage) + if storage == 'pqlite': + da = DocumentArray(storage=storage, config={'n_dim': 10}) + else: + da = DocumentArray(storage=storage) + da.extend(DocumentArray.empty(3)) da[:, 'embedding'] = sp_embed @@ -333,9 +337,13 @@ def test_tensor_attribute_selector(storage): # TODO: since match function is not implemented, this test will # not work with weaviate storage atm, will be addressed in # next version -@pytest.mark.parametrize('storage', ['memory', 'sqlite']) +@pytest.mark.parametrize('storage', ['memory', 'sqlite', 'pqlite']) def test_advance_selector_mixed(storage): + da = DocumentArray(storage=storage) + if storage == 'pqlite': + da = DocumentArray(storage=storage, config={'n_dim': 3}) + da.extend(DocumentArray.empty(10)) da.embeddings = np.random.random([10, 3]) @@ -345,7 +353,7 @@ def test_advance_selector_mixed(storage): assert len(da[:, ('id', 'embedding', 'matches')][0]) == 10 -@pytest.mark.parametrize('storage', ['memory', 'sqlite', 'weaviate']) +@pytest.mark.parametrize('storage', ['memory', 'sqlite', 'weaviate', 'pqlite']) def test_single_boolean_and_padding(storage, start_weaviate): da = DocumentArray(storage=storage) da.extend(DocumentArray.empty(3)) @@ -364,7 +372,7 @@ def test_single_boolean_and_padding(storage, start_weaviate): assert len(da[True, False, False]) == 1 -@pytest.mark.parametrize('storage', ['memory', 'sqlite', 'weaviate']) +@pytest.mark.parametrize('storage', ['memory', 'sqlite', 'weaviate', 'pqlite']) def test_edge_case_two_strings(storage, start_weaviate): # getitem da = DocumentArray( diff --git a/tests/unit/array/test_pqlite_indexing.py b/tests/unit/array/test_pqlite_indexing.py new file mode 100644 index 00000000000..bdd01798ef1 --- /dev/null +++ b/tests/unit/array/test_pqlite_indexing.py @@ -0,0 +1,14 @@ +import numpy as np +import pytest + +from docarray import DocumentArray, Document + + +@pytest.fixture +def docs(): + yield (Document(text=j) for j in range(100)) + + +@pytest.fixture +def indices(): + yield (i for i in [-2, 0, 2])