Skip to content

Commit

Permalink
feat: advanced pq indexer (#87)
Browse files Browse the repository at this point in the history
Co-authored-by: Han Xiao <han.xiao@jina.ai>
Co-authored-by: David Buchaca Prats <davidbuchaca@gmail.com>
  • Loading branch information
3 people authored Feb 2, 2022
1 parent 63573ad commit 2a80abf
Show file tree
Hide file tree
Showing 10 changed files with 405 additions and 15 deletions.
4 changes: 4 additions & 0 deletions docarray/array/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ def __new__(cls, *args, storage: str = 'memory', **kwargs) -> 'DocumentArrayLike
from .sqlite import DocumentArraySqlite

instance = super().__new__(DocumentArraySqlite)
elif storage == 'pqlite':
from .pqlite import DocumentArrayPqlite

instance = super().__new__(DocumentArrayPqlite)
elif storage == 'weaviate':
from .weaviate import DocumentArrayWeaviate

Expand Down
7 changes: 7 additions & 0 deletions docarray/array/pqlite.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from .document import DocumentArray
from .storage.pqlite import StorageMixins


class DocumentArrayPqlite(StorageMixins, DocumentArray):
def __new__(cls, *args, **kwargs):
return super().__new__(cls)
10 changes: 10 additions & 0 deletions docarray/array/storage/pqlite/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from .backend import BackendMixin
from .getsetdel import GetSetDelMixin
from .seqlike import SequenceLikeMixin
from abc import ABC

__all__ = ['StorageMixins']


class StorageMixins(BackendMixin, GetSetDelMixin, SequenceLikeMixin, ABC):
...
63 changes: 63 additions & 0 deletions docarray/array/storage/pqlite/backend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from dataclasses import dataclass, asdict
from typing import (
Union,
Dict,
Optional,
TYPE_CHECKING,
)

from ..base.backend import BaseBackendMixin
from ....helper import dataclass_from_dict

if TYPE_CHECKING:
from ....types import (
DocumentArraySourceType,
)


@dataclass
class PqliteConfig:
n_dim: int = 1
metric: str = 'cosine'
serialize_protocol: str = 'pickle'
data_path: Optional[str] = None


class BackendMixin(BaseBackendMixin):
"""Provide necessary functions to enable this storage backend. """

def _init_storage(
self,
docs: Optional['DocumentArraySourceType'] = None,
config: Optional[Union[PqliteConfig, Dict]] = None,
):
if not config:
config = PqliteConfig()
if isinstance(config, dict):
config = dataclass_from_dict(PqliteConfig, config)

self._persist = bool(config.data_path)

if not self._persist:
from tempfile import TemporaryDirectory

config.data_path = TemporaryDirectory().name

self._config = config

from pqlite import PQLite
from .helper import OffsetMapping

config = asdict(config)
n_dim = config.pop('n_dim')

self._pqlite = PQLite(n_dim, **config)
self._offset2ids = OffsetMapping(
name='docarray',
data_path=config['data_path'],
in_memory=False,
)

if docs is not None:
self.clear()
self.extend(docs)
88 changes: 88 additions & 0 deletions docarray/array/storage/pqlite/getsetdel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
from typing import (
Sequence,
Iterable,
)
import numpy as np
from ...memory import DocumentArrayInMemory
from ..base.getsetdel import BaseGetSetDelMixin
from .... import Document


class GetSetDelMixin(BaseGetSetDelMixin):
"""Implement required and derived functions that power `getitem`, `setitem`, `delitem`"""

# essential methods start

def _get_doc_by_offset(self, offset: int) -> 'Document':
offset = len(self) + offset if offset < 0 else offset
doc_id = self._offset2ids.get_id_by_offset(offset)
doc = self._pqlite.get_doc_by_id(doc_id) if doc_id else None
if doc is None:
raise IndexError('index out of range')
return doc

def _get_doc_by_id(self, _id: str) -> 'Document':
doc = self._pqlite.get_doc_by_id(_id)
if doc is None:
raise KeyError(f'Can not find Document with id=`{_id}`')
return doc

def _get_docs_by_offsets(self, offsets: Sequence[int]) -> Iterable['Document']:
ids = self._offset2ids.get_ids_by_offsets(offsets)
return self._get_docs_by_ids(ids)

def _get_docs_by_ids(self, ids: str) -> Iterable['Document']:
for _id in ids:
yield self._get_doc_by_id(_id)

def _get_docs_by_slice(self, _slice: slice) -> Iterable['Document']:
return self._get_docs_by_offsets(range(len(self))[_slice])

def _get_docs_by_mask(self, mask: Sequence[bool]):
offsets = [i for i, m in enumerate(mask) if m is True]
return self._get_docs_by_offsets(offsets)

def _set_doc_by_offset(self, offset: int, value: 'Document'):
offset = len(self) + offset if offset < 0 else offset
self._offset2ids.set_at_offset(offset, value.id)
docs = DocumentArrayInMemory([value])
if docs.embeddings is None:
docs.embeddings = np.zeros((1, self._pqlite.dim))
self._pqlite.update(docs)

def _set_doc_by_id(self, _id: str, value: 'Document'):
offset = self._offset2ids.get_offset_by_id(_id)
self._set_doc_by_offset(offset, value)

def _set_doc_value_pairs(
self, docs: Iterable['Document'], values: Iterable['Document']
):
for _d, _v in zip(docs, values):
self._set_doc_by_id(_d.id, _v)

def _del_doc_by_id(self, _id: str):
offset = self._offset2ids.get_offset_by_id(_id)
self._offset2ids.del_at_offset(offset, commit=True)
self._pqlite.delete([_id])

def _del_doc_by_offset(self, offset: int):
offset = len(self) + offset if offset < 0 else offset
_id = self._offset2ids.get_id_by_offset(offset)
self._offset2ids.del_at_offset(offset)
self._pqlite.delete([_id])

def _del_doc_by_offsets(self, offsets: Sequence[int]):
ids = []
for offset in offsets:
ids.append(self._offset2ids.get_id_by_offset(offset))

self._offset2ids.del_at_offsets(offsets)
self._pqlite.delete(ids)

def _del_docs_by_slice(self, _slice: slice):
offsets = range(len(self))[_slice]
self._del_doc_by_offsets(offsets)

def _del_docs_by_mask(self, mask: Sequence[bool]):
offsets = [i for i, m in enumerate(mask) if m is True]
self._del_doc_by_offsets(offsets)
115 changes: 115 additions & 0 deletions docarray/array/storage/pqlite/helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
from typing import Optional, List, Tuple

from pqlite.storage.table import Table


class OffsetMapping(Table):
def __init__(
self,
name: str = 'offset2ids',
data_path: Optional[str] = None,
in_memory: bool = True,
):
super().__init__(name, data_path, in_memory)
self.create_table()
self._size = None

def create_table(self):
sql = f'''CREATE TABLE IF NOT EXISTS {self.name}
(offset INTEGER NOT NULL PRIMARY KEY,
doc_id INTEGER TEXT NOT NULL)'''

self.execute(sql, commit=True)

def clear(self):
super().clear()
self._size = None

def __len__(self):
return self.size

@property
def size(self):
if self._size is None:
sql = f'SELECT MAX(offset) from {self.name} LIMIT 1;'
result = self._conn.execute(sql).fetchone()
self._size = result[0] + 1 if result[0] else 0

return self._size

def extend_doc_ids(self, doc_ids: List[str], commit: bool = True):
offsets = [self.size + i for i in range(len(doc_ids))]
offset_ids = list(zip(offsets, doc_ids))
self._insert(offset_ids, commit=commit)

def _insert(self, offset_ids: List[Tuple[int, str]], commit: bool = True):
sql = f'INSERT INTO {self.name}(offset, doc_id) VALUES (?, ?);'
self.execute_many(sql, offset_ids, commit=commit)
self._size = self.size + len(offset_ids)

def get_id_by_offset(self, offset: int):
offset = len(self) + offset if offset < 0 else offset
sql = f'SELECT doc_id FROM {self.name} WHERE offset = ? LIMIT 1;'
result = self._conn.execute(sql, (offset,)).fetchone()
return str(result[0]) if result is not None else None

def get_ids_by_offsets(self, offsets: List[int]) -> List[str]:
return [self.get_id_by_offset(offset) for offset in offsets]

def get_offsets_by_ids(self, ids: List[str]) -> List[int]:
return [self.get_offset_by_id(k) for k in ids]

def get_offset_by_id(self, doc_id: str):
sql = f'SELECT offset FROM {self.name} WHERE doc_id=? LIMIT 1;'
result = self._conn.execute(sql, (doc_id,)).fetchone()
return result[0] if result else None

def del_at_offset(self, offset: int, commit: bool = True):
offset = len(self) + offset if offset < 0 else offset
sql = f'DELETE FROM {self.name} WHERE offset=?'
self._conn.execute(sql, (offset,))
self.shift_offset(offset, shift_step=1, direction='left', commit=commit)

self._size -= 1

def del_at_offsets(self, offsets: List[int], commit: bool = True):
for offset in sorted(offsets, reverse=True):
self.del_at_offset(offset, commit=False)
if commit:
self.commit()

def insert_at_offset(self, offset: int, doc_id: str, commit: bool = True):
offset = len(self) + offset if offset < 0 else offset
self.shift_offset(offset - 1, shift_step=1, direction='right', commit=False)
self._insert([(offset, doc_id)], commit=commit)

def set_at_offset(self, offset: int, doc_id: str, commit: bool = True):
offset = len(self) + offset if offset < 0 else offset
sql = f'UPDATE {self.name} SET doc_id=? WHERE offset = ?'
self._conn.execute(
sql,
(
doc_id,
offset,
),
)
if commit:
self.commit()

def shift_offset(
self,
shift_from: int,
shift_step: int = 1,
direction: str = 'left',
commit: bool = True,
):
if direction == 'left':
sql = f'UPDATE {self.name} SET offset=offset-{shift_step} WHERE offset > ?'
elif direction == 'right':
sql = f'UPDATE {self.name} SET offset=offset+{shift_step} WHERE offset > ?'
else:
raise ValueError(f'The shit_offset directory {direction} is not supported!')

self._conn.execute(sql, (shift_from,))
if commit:
self._conn.commit()
80 changes: 80 additions & 0 deletions docarray/array/storage/pqlite/seqlike.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
from typing import Iterator, Union, Iterable, Sequence, MutableSequence
import numpy as np
from .... import Document
from ...memory import DocumentArrayInMemory


class SequenceLikeMixin(MutableSequence[Document]):
"""Implement sequence-like methods"""

def insert(self, index: int, value: 'Document'):
"""Insert `doc` at `index`.
:param index: Position of the insertion.
:param value: The doc needs to be inserted.
"""
if value.embedding is None:
value.embedding = np.zeros(self._pqlite.dim, dtype=np.float32)

self._pqlite.index(DocumentArrayInMemory([value]))
self._offset2ids.insert_at_offset(index, value.id)

def append(self, value: 'Document') -> None:
self._pqlite.index(DocumentArrayInMemory([value]))
self._offset2ids.extend_doc_ids([value.id])

def extend(self, values: Iterable['Document']) -> None:
docs = DocumentArrayInMemory(values)
for doc in docs:
if doc.embedding is None:
doc.embedding = np.zeros(self._pqlite.dim, dtype=np.float32)
self._pqlite.index(docs)
self._offset2ids.extend_doc_ids([doc.id for doc in docs])

def clear(self):
"""Clear the data of :class:`DocumentArray`"""
self._offset2ids.clear()
self._pqlite.clear()

def __del__(self) -> None:
if not self._persist:
self._offset2ids.clear()
self._pqlite.clear()

def __eq__(self, other):
"""In pqlite backend, data are considered as identical if configs point to the same database source"""
return (
type(self) is type(other)
and type(self._config) is type(other._config)
and self._config == other._config
)

def __len__(self):
return self._offset2ids.size

def __iter__(self) -> Iterator['Document']:
for i in range(len(self)):
yield self[i]

def __contains__(self, x: Union[str, 'Document']):
if isinstance(x, str):
return self._offset2ids.get_offset_by_id(x) is not None
elif isinstance(x, Document):
return self._offset2ids.get_offset_by_id(x.id) is not None
else:
return False

def __bool__(self):
"""To simulate ```l = []; if l: ...```
:return: returns true if the length of the array is larger than 0
"""
return len(self) > 0

def __repr__(self):
return f'<DocumentArray[PQLite] (length={len(self)}) at {id(self)}>'

def __add__(self, other: Union['Document', Sequence['Document']]):
v = type(self)(self)
v.extend(other)
return v
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
'fastapi',
'uvicorn',
'weaviate-client~=3.3.0',
'pqlite>=0.2.1',
],
'test': [
'pytest',
Expand Down
Loading

0 comments on commit 2a80abf

Please sign in to comment.