Mvdb/support record duplication #325

Open · wants to merge 17 commits into master
175 changes: 148 additions & 27 deletions algoliasearch_django/models.py
@@ -4,10 +4,15 @@
from functools import partial
from itertools import chain
import logging
import types

import sys
from algoliasearch.exceptions import AlgoliaException
from django.db.models.query_utils import DeferredAttribute
try:
from django.utils.inspect import func_supports_parameter, func_accepts_kwargs
except ImportError: # Django 1.7
from .utils import func_supports_parameter, func_accepts_kwargs

from .settings import DEBUG

@@ -61,6 +66,9 @@ class AlgoliaIndex(object):
# Use to specify the settings of the index.
settings = None

# Used to specify a duplication method
duplication_method = None

# Used to specify if the instance should be indexed.
# The attribute should be either:
# - a callable that returns a boolean.
@@ -71,6 +79,8 @@ class AlgoliaIndex(object):
# Name of the attribute to check on instances if should_index is not a callable
_should_index_is_method = False

DEFAULT_BATCH_SIZE = 1000

def __init__(self, model, client, settings):
"""Initializes the index."""
self.__init_index(client, model, settings)
@@ -145,6 +155,23 @@ def __init__(self, model, client, settings):
if self.geo_field:
self.geo_field = check_and_get_attr(model, self.geo_field)

# Check duplication_method
if self.duplication_method:
self.duplication_method = check_and_get_attr(model, self.duplication_method)
if (
not func_accepts_kwargs(self.duplication_method) and not (
func_supports_parameter(self.duplication_method, 'raw_record') and
func_supports_parameter(self.duplication_method, 'only_duplicated_ids')
)
):
raise AlgoliaIndexError(
'{} must accept `raw_record` and'
' `only_duplicated_ids` parameters (or **kwargs).'.format(
self.duplication_method
))
if not self.settings or not self.settings.get('attributeForDistinct'):
raise AlgoliaIndexError('Missing attributeForDistinct setting.')

# Check should_index + get the callable or attribute/field name
if self.should_index:
if hasattr(model, self.should_index):
@@ -196,13 +223,16 @@ def _validate_geolocation(geolocation):
)
)

def get_raw_record(self, instance, update_fields=None):
def get_raw_record(self, instance, update_fields=None, only_duplicated_ids=None):
"""
Gets the raw record.

If `update_fields` is set, the raw record will be built with only
the objectID and the given fields. Also, `_geoloc` and `_tags` will
not be included.

If `only_duplicated_ids` is set, it must be a list of the objectIDs
of the duplicated records to build.
"""
tmp = {'objectID': self.objectID(instance)}

@@ -237,8 +267,37 @@ def get_raw_record(self, instance, update_fields=None):
tmp['_tags'] = list(tmp['_tags'])

logger.debug('BUILD %s FROM %s', tmp['objectID'], self.model)
if self._has_duplication_method():
logger.debug('DUPLICATING MODEL %s', self.model)
records = self.duplication_method(
instance, raw_record=tmp,
only_duplicated_ids=only_duplicated_ids
)
if isinstance(records, (list, tuple, types.GeneratorType)):
return records

# unsupported type
raise TypeError(
'{} should return a list, tuple or generator.'.format(
self.duplication_method
)
)

return tmp

def _get_duplicated_raw_records(self, *args, **kwargs):
"""
Internal helper: return the instance's raw records as a list,
duplicated when `_has_duplication_method()` returns True.
"""
objs = self.get_raw_record(*args, **kwargs)
return objs if self._has_duplication_method() else [objs]

def _has_duplication_method(self):
"""Return True if this AlgoliaIndex has a duplication_method method"""
return self.duplication_method is not None

def _has_should_index(self):
"""Return True if this AlgoliaIndex has a should_index method or attribute"""
return self.should_index is not None
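
Taken together, the duplication hooks above require two things of an index: a model method that accepts `raw_record` and `only_duplicated_ids` (or plain `**kwargs`), and an `attributeForDistinct` entry in its settings. A minimal sketch of such a declaration follows; every name in it (`Article`, `ArticleIndex`, `get_records_per_chapter`) is illustrative, not part of this PR:

from django.db import models

from algoliasearch_django import AlgoliaIndex


class Article(models.Model):
    title = models.CharField(max_length=200)
    body = models.TextField()

    def get_records_per_chapter(self, raw_record, only_duplicated_ids=None):
        # Yield one record per chapter, deriving objectIDs from the parent's.
        for i, chapter in enumerate(self.body.split('\n\n'), start=1):
            record = dict(raw_record, chapter=chapter)
            record['objectID'] = '{}-{}'.format(raw_record['objectID'], i)
            if not only_duplicated_ids or record['objectID'] in only_duplicated_ids:
                yield record


class ArticleIndex(AlgoliaIndex):
    fields = ('title',)
    duplication_method = 'get_records_per_chapter'  # resolved on the model
    settings = {
        'attributeForDistinct': 'title',  # mandatory once duplication is on
        'distinct': True,
    }
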
@@ -283,14 +342,17 @@ def _should_really_index(self, instance):
instance.__class__.__name__, self.should_index))
return attr_value

def save_record(self, instance, update_fields=None, **kwargs):
def save_record(self, instance, update_fields=None, only_duplicated_ids=None, **kwargs):
"""Saves the record.

If `update_fields` is set, this method will use partial_update_object()
and will update only the given fields (never `_geoloc` and `_tags`).

For more information about partial_update_object:
https://github.com/algolia/algoliasearch-client-python#update-an-existing-object-in-the-index

If `only_duplicated_ids` is set, it must be a list of the objectIDs
of the duplicated records to save.
"""
if not self._should_index(instance):
# Should not index, but since we don't know the state of the
@@ -301,35 +363,55 @@ def save_record(self, instance, update_fields=None, **kwargs):

try:
if update_fields:
obj = self.get_raw_record(instance,
update_fields=update_fields)
result = self.__index.partial_update_object(obj)
index_method = self.__index.partial_update_objects
else:
obj = self.get_raw_record(instance)
result = self.__index.save_object(obj)
logger.info('SAVE %s FROM %s', obj['objectID'], self.model)
index_method = self.__index.save_objects

objs = self._get_duplicated_raw_records(
instance, update_fields=update_fields,
only_duplicated_ids=only_duplicated_ids
)
batch = AlgoliaIndexBatch(
index_method, self.DEFAULT_BATCH_SIZE, objs
)
result = batch.flush()
logger.info('SAVE %s FROM %s', self.objectID(instance), self.model)
return result
except AlgoliaException as e:
if DEBUG:
raise e
else:
logger.warning('%s FROM %s NOT SAVED: %s', obj['objectID'],
logger.warning('%s FROM %s NOT SAVED: %s', self.objectID(instance),
self.model, e)

def delete_record(self, instance):
def delete_record(self, instance, only_duplicated_ids=None):
"""Deletes the record."""
objectID = self.objectID(instance)
if self._has_duplication_method():
if only_duplicated_ids:
object_ids = only_duplicated_ids
else:
object_ids = [
obj['objectID']
for obj in self.get_raw_record(instance)
]
else:
object_ids = [self.objectID(instance)]
try:
self.__index.delete_object(objectID)
logger.info('DELETE %s FROM %s', objectID, self.model)
batch = AlgoliaIndexBatch(
self.__index.delete_objects,
self.DEFAULT_BATCH_SIZE, object_ids
)
result = batch.flush()
logger.info('DELETE %s FROM %s', self.objectID(instance), self.model)
return result
except AlgoliaException as e:
if DEBUG:
raise e
else:
logger.warning('%s FROM %s NOT DELETED: %s', objectID,
logger.warning('%s FROM %s NOT DELETED: %s', self.objectID(instance),
self.model, e)

def update_records(self, qs, batch_size=1000, **kwargs):
def update_records(self, qs, batch_size=DEFAULT_BATCH_SIZE, **kwargs):
"""
Updates multiple records.

@@ -342,6 +424,11 @@ def update_records(self, qs, batch_size=DEFAULT_BATCH_SIZE, **kwargs):
>>> update_records(MyModel, qs, myField=True)
>>> qs.update(myField=True)
"""
if self._has_duplication_method():
raise AlgoliaIndexError(
'update_records() with record duplication is not supported yet'
)

tmp = {}
for key, value in kwargs.items():
name = self.__translate_fields.get(key, None)
@@ -432,7 +519,7 @@ def delete(self):
if self.__tmp_index:
self.__tmp_index.delete()

def reindex_all(self, batch_size=1000):
def reindex_all(self, batch_size=DEFAULT_BATCH_SIZE):
"""
Reindex all the records.

@@ -487,7 +574,7 @@ def reindex_all(self, batch_size=1000):
logger.debug('CLEAR INDEX %s_tmp', self.index_name)

counts = 0
batch = []
batch = AlgoliaIndexBatch(self.__tmp_index.save_objects, batch_size)

if hasattr(self, 'get_queryset'):
qs = self.get_queryset()
@@ -498,17 +585,14 @@
if not self._should_index(instance):
continue # should not index

batch.append(self.get_raw_record(instance))
if len(batch) >= batch_size:
self.__tmp_index.save_objects(batch)
logger.info('SAVE %d OBJECTS TO %s_tmp', len(batch),
self.index_name)
batch = []

objs = self._get_duplicated_raw_records(instance)
batch.add(objs)
counts += 1
if len(batch) > 0:
self.__tmp_index.save_objects(batch)
logger.info('SAVE %d OBJECTS TO %s_tmp', len(batch),
self.index_name)

logger.info('SAVE %d OBJECTS TO %s_tmp', len(batch),
self.index_name)
batch.flush()

self.__client.move_index(self.tmp_index_name,
self.index_name)
@@ -539,3 +623,40 @@ def reindex_all(self, batch_size=1000):
else:
logger.warning('ERROR DURING REINDEXING %s: %s', self.model,
e)


class AlgoliaIndexBatch(object):
"""Helper to construct batches of requests and send them."""

def __init__(self, method, size, queue=None):
"""
Initialize the batch.

:param method: the method used to flush the batch
:param size: the batch size
:param queue: optional queue to initialize the batch
"""
self.method = method
self.size = size
# cast the queue to a list so flush() can slice it
self._queue = list(queue) if queue else []

def __len__(self):
"""Return the length of the batch."""
return len(self._queue)

def add(self, objs):
"""Add objects to the batch."""
self._queue.extend(objs)

def flush(self):
"""
Flush the batch, i.e. perform the requests, by ensuring that
each batched request doesn't contain more than `self.size` operations.
"""
results = []
while self._queue:
results.append(self.method(self._queue[:self.size]))
self._queue = self._queue[self.size:]

return results
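
As a sanity check on the slicing in `flush()`, here is a self-contained sketch; `fake_save_objects` is a hypothetical stand-in for `index.save_objects` that simply records each chunk it receives:

sent_chunks = []


def fake_save_objects(objs):
    # Stand-in for index.save_objects: remember the chunk, fake a response.
    sent_chunks.append(list(objs))
    return {'taskID': len(sent_chunks)}


records = [{'objectID': str(i)} for i in range(2500)]
batch = AlgoliaIndexBatch(fake_save_objects, 1000, records)
results = batch.flush()

assert [len(chunk) for chunk in sent_chunks] == [1000, 1000, 500]
assert len(results) == 3  # one response per request sent
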
22 changes: 22 additions & 0 deletions algoliasearch_django/registration.py
@@ -179,6 +179,28 @@ def reset(self, settings=None):
"""
self.__init__(settings=settings if settings is not None else SETTINGS)

# Record duplication specifics

def add_duplicated_records(self, instance, new_record_ids):
"""
Add new duplicated records for an instance.

See documentation here:
https://www.algolia.com/doc/guides/ranking/distinct/#distinct-to-index-large-records
"""
adapter = self.get_adapter_from_instance(instance)
return adapter.save_record(instance, only_duplicated_ids=new_record_ids)

def delete_duplicated_records(self, instance, old_record_ids, **kwargs):
"""
Delete old duplicated records of an instance.

See documentation here:
https://www.algolia.com/doc/guides/ranking/distinct/#distinct-to-index-large-records
"""
adapter = self.get_adapter_from_instance(instance)
return adapter.delete_record(instance, only_duplicated_ids=old_record_ids, **kwargs)

# Signalling hooks.

def __post_save_receiver(self, instance, **kwargs):
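
A usage sketch for the two methods above, assuming `engine` is the `AlgoliaEngine` instance the model was registered with, that the model's index sets `duplication_method`, and that the method derives objectIDs as `<parent objectID>-<suffix>` (as the test model further down does); `MyModel` is illustrative:

instance = MyModel.objects.get(pk=1)

# Build and save only these two duplicated records of the instance.
engine.add_duplicated_records(instance, ['1-1', '1-2'])

# Later, drop a duplicate that should no longer be searchable.
engine.delete_duplicated_records(instance, ['1-2'])
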
42 changes: 42 additions & 0 deletions algoliasearch_django/utils.py
@@ -0,0 +1,42 @@
import inspect

from django.utils import six


def func_accepts_kwargs(func):
"""
Shameless copy-paste from django.utils.inspect.

FIXME: when dropping support for Django 1.7, remove this code
"""
if six.PY2:
# Not all callables are inspectable with getargspec, so we'll
# try a couple different ways but in the end fall back on assuming
# it is -- we don't want to prevent registration of valid but weird
# callables.
try:
argspec = inspect.getargspec(func)
except TypeError:
try:
argspec = inspect.getargspec(func.__call__)
except (TypeError, AttributeError):
argspec = None
return not argspec or argspec[2] is not None

return any(
p for p in inspect.signature(func).parameters.values()
if p.kind == p.VAR_KEYWORD
)


def func_supports_parameter(func, parameter):
"""
Shameless copy-paste from django.utils.inspect.

FIXME: when dropping support for Django 1.7, remove this code
"""
if six.PY3:
return parameter in inspect.signature(func).parameters
else:
args, varargs, varkw, defaults = inspect.getargspec(func)
return parameter in args
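
Both helpers back the `duplication_method` signature check in `models.py`; either sketch signature below passes it (names are illustrative):

def dup_explicit(instance, raw_record, only_duplicated_ids=None):
    return []


def dup_kwargs(instance, **kwargs):
    return []


assert func_supports_parameter(dup_explicit, 'raw_record')
assert func_supports_parameter(dup_explicit, 'only_duplicated_ids')
assert func_accepts_kwargs(dup_kwargs)
assert not func_accepts_kwargs(dup_explicit)
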
18 changes: 18 additions & 0 deletions tests/models.py
@@ -68,6 +68,24 @@ def property_should_not_index(self):
def property_string(self):
return "foo"

def duplication_get_locations(self):
return [
{'id': '1', 'name': 'Paris'},
{'id': '2', 'name': 'San Francisco'},
{'id': '3', 'name': 'Berlin'}
]

def duplication_get_records_per_location(self, raw_record, only_duplicated_ids=None):
for loc in self.duplication_get_locations():
loc_record = raw_record.copy()
loc_record['objectID'] = '{}-{}'.format(loc_record['objectID'], loc['id'])
loc_record['location'] = loc['name']
if not only_duplicated_ids or loc_record['objectID'] in only_duplicated_ids:
yield loc_record

def duplication_get_records_per_location_wrong_type(self, **kwargs):
return 'oops'

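For reference, a quick sketch of what the generator above yields, assuming `obj` is an instance of the enclosing test model (its class name sits outside this hunk):

raw = {'objectID': '7', 'name': 'Algolia'}

records = list(obj.duplication_get_records_per_location(raw))
assert [r['objectID'] for r in records] == ['7-1', '7-2', '7-3']
assert records[1]['location'] == 'San Francisco'

# `only_duplicated_ids` filters the yielded records by objectID.
subset = list(obj.duplication_get_records_per_location(
    raw, only_duplicated_ids=['7-2']))
assert [r['objectID'] for r in subset] == ['7-2']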

class BlogPost(models.Model):
author = models.ForeignKey(