add batching and retry logic to datapath insert and update methods
karlcz committed Aug 21, 2024
1 parent 3565ffb commit aeaf372
Showing 1 changed file with 190 additions and 21 deletions.
211 changes: 190 additions & 21 deletions deriva/core/datapath.py
@@ -5,6 +5,7 @@
from datetime import date
import itertools
import logging
import time
import re
from requests import HTTPError
import warnings
@@ -695,6 +696,102 @@ def fetch(self, limit=None, headers=DEFAULT_HEADERS):
logger.debug("Fetched %d entities" % len(self._results_doc))
return self

def _json_size_approx(data):
    """Return approximate byte count for minimal JSON encoding of data.

    Minimal encoding has no optional whitespace/indentation.
    """
    nbytes = 0

    if isinstance(data, (list, tuple)):
        nbytes += 2
        for elem in data:
            nbytes += _json_size_approx(elem) + 1
    elif isinstance(data, dict):
        nbytes += 2
        for k, v in data.items():
            nbytes += _json_size_approx(k) + _json_size_approx(v) + 2
    elif isinstance(data, str):
        nbytes += len(data.encode("utf-8")) + 2
    else:
        nbytes += len(str(data))

    return nbytes
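
A quick sanity check of the estimator (illustrative data, not part of the commit): for simple ASCII scalars, the approximation tracks the minimal json.dumps encoding to within roughly one byte per container, since it counts a trailing separator that the real encoding omits.

import json

row = {"name": "sample-1", "value": 42}  # hypothetical row for illustration
actual = len(json.dumps(row, separators=(",", ":")).encode("utf-8"))  # 30 bytes
approx = _json_size_approx(row)                                       # 31 bytes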

def _generate_batches(entities, max_batch_rows=1000, max_batch_bytes=250*1024):
    """Generate a series of entity batches as slices of the input entities."""
    if not isinstance(entities, (list, tuple)):
        raise TypeError('invalid type %s for entities, list or tuple expected' % (type(entities),))

    if not max_batch_rows:
        logger.debug("disabling batching due to max_batch_rows=%r" % (max_batch_rows,))
        # yield the whole input as one batch; a plain "return entities" inside
        # a generator would terminate iteration without producing any batch
        yield entities
        return

    top = len(entities)
    lower = 0

    while lower < top:
        # to ensure progress, always use at least one row per batch regardless of nbytes
        upper = lower + 1
        batch_nbytes = _json_size_approx(entities[lower])

        # advance upper position until a batch size limit is reached
        while (upper - lower) < max_batch_rows:
            if upper >= top:
                break
            batch_nbytes += _json_size_approx(entities[upper])
            if batch_nbytes > max_batch_bytes:
                break
            upper += 1

        # generate one batch and advance for next batch
        logger.debug("generating batch of %d/%d entities (%d:%d)" % (upper-lower, top, lower, upper))
        yield entities[lower:upper]
        lower = upper
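
A brief, hypothetical walk-through of the batching behavior: 2,500 small rows split on the row limit alone, since each batch of 1,000 such rows stays well under the default ~250 KiB byte cap.

rows = [{"id": i, "payload": "x" * 100} for i in range(2500)]  # illustrative data
for batch in _generate_batches(rows, max_batch_rows=1000):
    print(len(batch))  # prints 1000, 1000, 500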

def _request_with_retry(request_func, retry_codes={408, 429, 500, 502, 503, 504}, backoff_factor=4, max_attempts=5):
    """Perform request_func with exponential backoff and retry.

    :param request_func: A function returning a requests.Response object or raising HTTPError
    :param retry_codes: HTTPError status codes on which to attempt retry
    :param backoff_factor: base of the exponential backoff; the delay before retry n is backoff_factor**(n-1) seconds
    :param max_attempts: Max number of request attempts.

    Retry will be attempted on HTTPError exceptions which match retry_codes and
    also on other unknown exceptions, presumed to be transport errors.
    The request_func should do the equivalent of resp.raise_for_status() so that
    it only returns a response object for successful requests.
    """
    attempt = 0
    last_ex = None

    while attempt < max_attempts:
        try:
            if attempt > 0:
                delay = backoff_factor**(attempt-1)
                logger.debug("sleeping %d seconds before retry %d..." % (delay, attempt))
                time.sleep(delay)
            attempt += 1
            return request_func()
        except HTTPError as e:
            logger.debug(e.response.text)
            last_ex = e
            if 400 <= e.response.status_code < 500:
                last_ex = DataPathException(_http_error_message(e), e)
            if e.response.status_code not in retry_codes:
                raise last_ex
        except Exception as e:
            # presumed transport error; may not carry an HTTP response at all
            logger.debug(str(e))
            last_ex = e

    # early return means we don't get here on successful requests
    logger.warning("maximum request retry limit %d exceeded" % (max_attempts,))
    if last_ex is None:
        raise ValueError('exceeded max_attempts without catching a request exception')
    raise last_ex
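
A minimal usage sketch (the URL is a placeholder, not part of deriva-py): the callable must raise HTTPError for failed responses, e.g. via raise_for_status(), so that the helper only ever returns successful responses.

import requests

def do_get():
    resp = requests.get("https://example.org/ermrest/catalog/1/schema")  # hypothetical URL
    resp.raise_for_status()  # convert 4xx/5xx responses into HTTPError for the helper
    return resp

resp = _request_with_retry(do_get, backoff_factor=2, max_attempts=3)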

class _TableWrapper (object):
    """Wraps a Table for datapath expressions.
@@ -836,15 +933,31 @@ def denormalize(self, context_name=None, heuristic=None, groupkey_name='RID'):
"""
return self.path.denormalize(context_name=context_name, heuristic=heuristic, groupkey_name=groupkey_name)

    def insert(self, entities, defaults=set(), nondefaults=set(), add_system_defaults=True, on_conflict_skip=False):
    def insert(self, entities, defaults=set(), nondefaults=set(), add_system_defaults=True, on_conflict_skip=False, retry_codes={408, 429, 500, 502, 503, 504}, backoff_factor=4, max_attempts=5, max_batch_rows=1000, max_batch_bytes=250*1024):
        """Inserts entities into the table.

        :param entities: an iterable collection of entities (i.e., rows) to be inserted into the table.
        :param defaults: optional, set of column names to be assigned the default expression value.
        :param nondefaults: optional, set of column names to override implicit system defaults
        :param add_system_defaults: flag to add system columns to the set of default columns.
        :param on_conflict_skip: flag to skip entities that violate uniqueness constraints.
        :param retry_codes: set of HTTP status codes for which retry should be considered.
        :param backoff_factor: number of seconds for base of exponential retry backoff.
        :param max_attempts: maximum number of request attempts with retry.
        :param max_batch_rows: maximum number of rows for one request, or False to disable batching.
        :param max_batch_bytes: approximate maximum number of bytes for one request.
        :return: a collection of newly created entities.

        Retry is only attempted for idempotent insertion requests, i.e., when
        on_conflict_skip=True and the table has a user-controlled, non-nullable
        key whose constituent column(s) are not listed in defaults.

        When retrying, an exponential backoff delay is introduced before each
        repeated attempt: the delay before retry n is backoff_factor**(n-1)
        seconds, for retries 1 through max_attempts-1.
        """
        # empty entities will be accepted but results are therefore an empty entity set
        if not entities:
@@ -879,17 +992,46 @@ def insert(self, entities, defaults=set(), nondefaults=set(), add_system_default
        if not hasattr(entities[0], 'keys'):
            raise TypeError('entities[0] does not look like a dictionary -- does not have a "keys()" method')

        try:
            resp = self._schema._catalog._wrapped_catalog.post(path, json=entities, headers={'Content-Type': 'application/json'})
            return _ResultSet(self.path.uri, lambda ignore1, ignore2, ignore3: resp.json())
        except HTTPError as e:
            logger.debug(e.response.text)
            if 400 <= e.response.status_code < 500:
                raise DataPathException(_http_error_message(e), e)
            else:
                raise e

        # perform one batch request in a helper we can hand to retry helper
        def request_func(batch, results):
            return self._schema._catalog._wrapped_catalog.post(path, json=batch, headers={'Content-Type': 'application/json'})

        def _has_user_pkey(table):
            """Return True if table has at least one non-nullable, non-defaulted key other than the system RID key"""
            for key in table.keys:
                if { c.name for c in key.unique_columns } != {'RID'}:
                    if all([ not c.nullok for c in key.unique_columns ]) \
                       and all([ c.name not in defaults for c in key.unique_columns ]):
                        return True
            return False

        # determine whether insert is idempotent and therefore retry safe
        retry_safe = on_conflict_skip and _has_user_pkey(self._wrapped_table)

        # perform all requests in a helper we can hand to _ResultSet
        def results_func(ignore1, ignore2, ignore3):
            results = []
            for batch in _generate_batches(
                    entities,
                    max_batch_rows=max_batch_rows,
                    max_batch_bytes=max_batch_bytes
            ):
                if retry_safe:
                    resp = _request_with_retry(
                        lambda: request_func(batch, results),
                        retry_codes=retry_codes,
                        backoff_factor=backoff_factor,
                        max_attempts=max_attempts
                    )
                else:
                    resp = request_func(batch, results)
                results.extend(resp.json())
            return results

        return _ResultSet(self.path.uri, results_func)
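
As a usage sketch (hedged: the host, schema, and table names below are hypothetical; ErmrestCatalog, get_credential, and getPathBuilder are existing deriva-py APIs), a bulk load that opts into batching and retry-safe insertion might look like:

from deriva.core import ErmrestCatalog, get_credential

credential = get_credential("example.org")  # hypothetical host
catalog = ErmrestCatalog("https", "example.org", 1, credential)
table = catalog.getPathBuilder().schemas["isa"].tables["dataset"]  # hypothetical names

rows = [{"name": "sample-%d" % i} for i in range(1200)]  # illustrative data
results = table.insert(
    rows,
    on_conflict_skip=True,   # with a user-controlled key present, this makes retry safe
    max_batch_rows=500,
    max_batch_bytes=128 * 1024,
)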

    def update(self, entities, correlation={'RID'}, targets=None):

    def update(self, entities, correlation={'RID'}, targets=None, retry_codes={408, 429, 500, 502, 503, 504}, backoff_factor=4, max_attempts=5, max_batch_rows=1000, max_batch_bytes=250*1024):
        """Update entities of a table.

        For more information see the ERMrest protocol for the `attributegroup` interface. By default, this method will
@@ -901,7 +1043,22 @@ def update(self, entities, correlation={'RID'}, targets=None):
        :param correlation: an iterable collection of column names used to correlate input set to the set of rows to be
            updated in the catalog. E.g., `{'col name'}` or `{mytable.mycolumn}` will work if you pass a _ColumnWrapper object.
        :param targets: an iterable collection of column names used as the targets of the update operation.
        :param retry_codes: set of HTTP status codes for which retry should be considered.
        :param backoff_factor: number of seconds for base of exponential retry backoff.
        :param max_attempts: maximum number of request attempts with retry.
        :param max_batch_rows: maximum number of rows for one request, or False to disable batching.
        :param max_batch_bytes: approximate maximum number of bytes for one request.
        :return: a collection of updated entities as returned by the corresponding ERMrest interface.

        Updates use the idempotent ERMrest `attributegroup` interface, so retry
        is always attempted for requests that fail with retriable errors.

        When retrying, an exponential backoff delay is introduced before each
        repeated attempt: the delay before retry n is backoff_factor**(n-1)
        seconds, for retries 1 through max_attempts-1.
        """
        # empty entities will be accepted but results are therefore an empty entity set
        if not entities:
@@ -936,16 +1093,28 @@ def update(self, entities, correlation={'RID'}, targets=None
            targets=','.join(target_cnames)
        )

        try:
            resp = self._schema._catalog._wrapped_catalog.put(path, json=entities, headers={'Content-Type': 'application/json'})
            return _ResultSet(self.path.uri, lambda ignore1, ignore2, ignore3: resp.json())
        except HTTPError as e:
            logger.debug(e.response.text)
            if 400 <= e.response.status_code < 500:
                raise DataPathException(_http_error_message(e), e)
            else:
                raise e

        # perform one batch request in a helper we can hand to retry helper
        def request_func(batch, results):
            return self._schema._catalog._wrapped_catalog.put(path, json=batch, headers={'Content-Type': 'application/json'})

        # perform all requests in a helper we can hand to _ResultSet
        def results_func(ignore1, ignore2, ignore3):
            results = []
            for batch in _generate_batches(
                    entities,
                    max_batch_rows=max_batch_rows,
                    max_batch_bytes=max_batch_bytes
            ):
                resp = _request_with_retry(
                    lambda: request_func(batch, results),
                    retry_codes=retry_codes,
                    backoff_factor=backoff_factor,
                    max_attempts=max_attempts
                )
                results.extend(resp.json())
            return results

        return _ResultSet(self.path.uri, results_func)
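
Correspondingly (reusing the hypothetical table handle from the insert sketch above), an update call can tune the same batching and retry knobs; here retry always applies because the attributegroup PUT is idempotent:

updates = [{"RID": "1-2345", "name": "revised"}]  # illustrative row and values
results = table.update(
    updates,
    correlation={"RID"},
    targets={"name"},
    max_batch_rows=200,
)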

class _TableAlias (_TableWrapper):
    """Represents a table alias in datapath expressions.
