diff --git a/google/cloud/bigtable/_mutate_rows.py b/google/cloud/bigtable/_mutate_rows.py index a422c99b2..e34ebaeb6 100644 --- a/google/cloud/bigtable/_mutate_rows.py +++ b/google/cloud/bigtable/_mutate_rows.py @@ -31,6 +31,9 @@ from google.cloud.bigtable.client import Table from google.cloud.bigtable.mutations import RowMutationEntry +# mutate_rows requests are limited to this value +MUTATE_ROWS_REQUEST_MUTATION_LIMIT = 100_000 + class _MutateRowsIncomplete(RuntimeError): """ @@ -68,6 +71,14 @@ def __init__( - per_request_timeout: the timeoutto use for each mutate_rows attempt, in seconds. If not specified, the request will run until operation_timeout is reached. """ + # check that mutations are within limits + total_mutations = sum(len(entry.mutations) for entry in mutation_entries) + if total_mutations > MUTATE_ROWS_REQUEST_MUTATION_LIMIT: + raise ValueError( + "mutate_rows requests can contain at most " + f"{MUTATE_ROWS_REQUEST_MUTATION_LIMIT} mutations across " + f"all entries. Found {total_mutations}." + ) # create partial function to pass to trigger rpc call metadata = _make_metadata(table.table_name, table.app_profile_id) self._gapic_fn = functools.partial( @@ -119,7 +130,7 @@ async def start(self): self._handle_entry_error(idx, exc) finally: # raise exception detailing incomplete mutations - all_errors = [] + all_errors: list[Exception] = [] for idx, exc_list in self.errors.items(): if len(exc_list) == 0: raise core_exceptions.ClientError( diff --git a/google/cloud/bigtable/client.py b/google/cloud/bigtable/client.py index 3d33eebf9..4ec3cea27 100644 --- a/google/cloud/bigtable/client.py +++ b/google/cloud/bigtable/client.py @@ -20,8 +20,6 @@ Any, Optional, Set, - Callable, - Coroutine, TYPE_CHECKING, ) @@ -60,6 +58,8 @@ from google.cloud.bigtable._mutate_rows import _MutateRowsOperation from google.cloud.bigtable._helpers import _make_metadata from google.cloud.bigtable._helpers import _convert_retry_deadline +from google.cloud.bigtable.mutations_batcher import MutationsBatcher +from google.cloud.bigtable.mutations_batcher import _MB_SIZE from google.cloud.bigtable._helpers import _attempt_timeout_generator from google.cloud.bigtable.read_modify_write_rules import ReadModifyWriteRule @@ -69,7 +69,6 @@ from google.cloud.bigtable.row_filters import RowFilterChain if TYPE_CHECKING: - from google.cloud.bigtable.mutations_batcher import MutationsBatcher from google.cloud.bigtable import RowKeySamples from google.cloud.bigtable import ShardedQuery @@ -753,17 +752,48 @@ async def execute_rpc(): ) return await wrapped_fn() - def mutations_batcher(self, **kwargs) -> MutationsBatcher: + def mutations_batcher( + self, + *, + flush_interval: float | None = 5, + flush_limit_mutation_count: int | None = 1000, + flush_limit_bytes: int = 20 * _MB_SIZE, + flow_control_max_mutation_count: int = 100_000, + flow_control_max_bytes: int = 100 * _MB_SIZE, + batch_operation_timeout: float | None = None, + batch_per_request_timeout: float | None = None, + ) -> MutationsBatcher: """ Returns a new mutations batcher instance. Can be used to iteratively add mutations that are flushed as a group, to avoid excess network calls + Args: + - flush_interval: Automatically flush every flush_interval seconds. If None, + a table default will be used + - flush_limit_mutation_count: Flush immediately after flush_limit_mutation_count + mutations are added across all entries. If None, this limit is ignored. + - flush_limit_bytes: Flush immediately after flush_limit_bytes bytes are added. 
+ - flow_control_max_mutation_count: Maximum number of inflight mutations. + - flow_control_max_bytes: Maximum number of inflight bytes. + - batch_operation_timeout: timeout for each mutate_rows operation, in seconds. If None, + table default_operation_timeout will be used + - batch_per_request_timeout: timeout for each individual request, in seconds. If None, + table default_per_request_timeout will be used Returns: - a MutationsBatcher context manager that can batch requests """ - return MutationsBatcher(self, **kwargs) + return MutationsBatcher( + self, + flush_interval=flush_interval, + flush_limit_mutation_count=flush_limit_mutation_count, + flush_limit_bytes=flush_limit_bytes, + flow_control_max_mutation_count=flow_control_max_mutation_count, + flow_control_max_bytes=flow_control_max_bytes, + batch_operation_timeout=batch_operation_timeout, + batch_per_request_timeout=batch_per_request_timeout, + ) async def mutate_row( self, @@ -861,10 +891,6 @@ async def bulk_mutate_rows( *, operation_timeout: float | None = 60, per_request_timeout: float | None = None, - on_success: Callable[ - [int, RowMutationEntry], None | Coroutine[None, None, None] - ] - | None = None, ): """ Applies mutations for multiple rows in a single batched request. @@ -890,9 +916,6 @@ async def bulk_mutate_rows( in seconds. If it takes longer than this time to complete, the request will be cancelled with a DeadlineExceeded exception, and a retry will be attempted if within operation_timeout budget - - on_success: a callback function that will be called when each mutation - entry is confirmed to be applied successfully. Will be passed the - index and the entry itself. Raises: - MutationsExceptionGroup if one or more mutations fails Contains details about any failed entries in .exceptions diff --git a/google/cloud/bigtable/exceptions.py b/google/cloud/bigtable/exceptions.py index b2cf0ce6b..fc4e368b9 100644 --- a/google/cloud/bigtable/exceptions.py +++ b/google/cloud/bigtable/exceptions.py @@ -85,19 +85,96 @@ def __str__(self): class MutationsExceptionGroup(BigtableExceptionGroup): """ Represents one or more exceptions that occur during a bulk mutation operation + + Exceptions will typically be of type FailedMutationEntryError, but other exceptions may + be included if they are raised during the mutation operation """ @staticmethod - def _format_message(excs: list[FailedMutationEntryError], total_entries: int): - entry_str = "entry" if total_entries == 1 else "entries" - plural_str = "" if len(excs) == 1 else "s" - return f"{len(excs)} sub-exception{plural_str} (from {total_entries} {entry_str} attempted)" + def _format_message( + excs: list[Exception], total_entries: int, exc_count: int | None = None + ) -> str: + """ + Format a message for the exception group + + Args: + - excs: the exceptions in the group + - total_entries: the total number of entries attempted, successful or not + - exc_count: the number of exceptions associated with the request + if None, this will be len(excs) + """ + exc_count = exc_count if exc_count is not None else len(excs) + entry_str = "entry" if exc_count == 1 else "entries" + return f"{exc_count} failed {entry_str} from {total_entries} attempted." + + def __init__( + self, excs: list[Exception], total_entries: int, message: str | None = None + ): + """ + Args: + - excs: the exceptions in the group + - total_entries: the total number of entries attempted, successful or not + - message: the message for the exception group. 
If None, a default message + will be generated + """ + message = ( + message + if message is not None + else self._format_message(excs, total_entries) + ) + super().__init__(message, excs) + self.total_entries_attempted = total_entries - def __init__(self, excs: list[FailedMutationEntryError], total_entries: int): - super().__init__(self._format_message(excs, total_entries), excs) + def __new__( + cls, excs: list[Exception], total_entries: int, message: str | None = None + ): + """ + Args: + - excs: the exceptions in the group + - total_entries: the total number of entries attempted, successful or not + - message: the message for the exception group. If None, a default message + """ + message = ( + message if message is not None else cls._format_message(excs, total_entries) + ) + instance = super().__new__(cls, message, excs) + instance.total_entries_attempted = total_entries + return instance - def __new__(cls, excs: list[FailedMutationEntryError], total_entries: int): - return super().__new__(cls, cls._format_message(excs, total_entries), excs) + @classmethod + def from_truncated_lists( + cls, + first_list: list[Exception], + last_list: list[Exception], + total_excs: int, + entry_count: int, + ) -> MutationsExceptionGroup: + """ + Create a MutationsExceptionGroup from two lists of exceptions, representing + a larger set that has been truncated. The MutationsExceptionGroup will + contain the union of the two lists as sub-exceptions, and the error message + describe the number of exceptions that were truncated. + + Args: + - first_list: the set of oldest exceptions to add to the ExceptionGroup + - last_list: the set of newest exceptions to add to the ExceptionGroup + - total_excs: the total number of exceptions associated with the request + Should be len(first_list) + len(last_list) + number of dropped exceptions + in the middle + - entry_count: the total number of entries attempted, successful or not + """ + first_count, last_count = len(first_list), len(last_list) + if first_count + last_count >= total_excs: + # no exceptions were dropped + return cls(first_list + last_list, entry_count) + excs = first_list + last_list + truncation_count = total_excs - (first_count + last_count) + base_message = cls._format_message(excs, entry_count, total_excs) + first_message = f"first {first_count}" if first_count else "" + last_message = f"last {last_count}" if last_count else "" + conjunction = " and " if first_message and last_message else "" + message = f"{base_message} ({first_message}{conjunction}{last_message} attached as sub-exceptions; {truncation_count} truncated)" + return cls(excs, entry_count, message) class FailedMutationEntryError(Exception): @@ -108,14 +185,17 @@ class FailedMutationEntryError(Exception): def __init__( self, - failed_idx: int, + failed_idx: int | None, failed_mutation_entry: "RowMutationEntry", cause: Exception, ): idempotent_msg = ( "idempotent" if failed_mutation_entry.is_idempotent() else "non-idempotent" ) - message = f"Failed {idempotent_msg} mutation entry at index {failed_idx} with cause: {cause!r}" + index_msg = f" at index {failed_idx} " if failed_idx is not None else " " + message = ( + f"Failed {idempotent_msg} mutation entry{index_msg}with cause: {cause!r}" + ) super().__init__(message) self.index = failed_idx self.entry = failed_mutation_entry diff --git a/google/cloud/bigtable/mutations.py b/google/cloud/bigtable/mutations.py index fe136f8d9..a4c02cd74 100644 --- a/google/cloud/bigtable/mutations.py +++ b/google/cloud/bigtable/mutations.py @@ -17,6 +17,11 
@@ import time from dataclasses import dataclass from abc import ABC, abstractmethod +from sys import getsizeof + +# mutation entries above this should be rejected +from google.cloud.bigtable._mutate_rows import MUTATE_ROWS_REQUEST_MUTATION_LIMIT + from google.cloud.bigtable.read_modify_write_rules import MAX_INCREMENT_VALUE @@ -41,6 +46,12 @@ def is_idempotent(self) -> bool: def __str__(self) -> str: return str(self._to_dict()) + def size(self) -> int: + """ + Get the size of the mutation in bytes + """ + return getsizeof(self._to_dict()) + @classmethod def _from_dict(cls, input_dict: dict[str, Any]) -> Mutation: instance: Mutation | None = None @@ -195,6 +206,12 @@ def __init__(self, row_key: bytes | str, mutations: Mutation | list[Mutation]): row_key = row_key.encode("utf-8") if isinstance(mutations, Mutation): mutations = [mutations] + if len(mutations) == 0: + raise ValueError("mutations must not be empty") + elif len(mutations) > MUTATE_ROWS_REQUEST_MUTATION_LIMIT: + raise ValueError( + f"entries must have <= {MUTATE_ROWS_REQUEST_MUTATION_LIMIT} mutations" + ) self.row_key = row_key self.mutations = tuple(mutations) @@ -208,6 +225,12 @@ def is_idempotent(self) -> bool: """Check if the mutation is idempotent""" return all(mutation.is_idempotent() for mutation in self.mutations) + def size(self) -> int: + """ + Get the size of the mutation in bytes + """ + return getsizeof(self._to_dict()) + @classmethod def _from_dict(cls, input_dict: dict[str, Any]) -> RowMutationEntry: return RowMutationEntry( diff --git a/google/cloud/bigtable/mutations_batcher.py b/google/cloud/bigtable/mutations_batcher.py index 9681f4382..68c3f9fbe 100644 --- a/google/cloud/bigtable/mutations_batcher.py +++ b/google/cloud/bigtable/mutations_batcher.py @@ -14,17 +14,149 @@ # from __future__ import annotations +from typing import Any, TYPE_CHECKING import asyncio -from typing import TYPE_CHECKING +import atexit +import warnings +from collections import deque +from google.cloud.bigtable.mutations import RowMutationEntry +from google.cloud.bigtable.exceptions import MutationsExceptionGroup +from google.cloud.bigtable.exceptions import FailedMutationEntryError + +from google.cloud.bigtable._mutate_rows import _MutateRowsOperation +from google.cloud.bigtable._mutate_rows import MUTATE_ROWS_REQUEST_MUTATION_LIMIT from google.cloud.bigtable.mutations import Mutation -from google.cloud.bigtable.row_filters import RowFilter if TYPE_CHECKING: from google.cloud.bigtable.client import Table # pragma: no cover -# Type alias used internally for readability. -_row_key_type = bytes +# used to make more readable default values +_MB_SIZE = 1024 * 1024 + + +class _FlowControl: + """ + Manages flow control for batched mutations. Mutations are registered against + the FlowControl object before being sent, which will block if size or count + limits have reached capacity. As mutations completed, they are removed from + the FlowControl object, which will notify any blocked requests that there + is additional capacity. + + Flow limits are not hard limits. If a single mutation exceeds the configured + limits, it will be allowed as a single batch when the capacity is available. + """ + + def __init__( + self, + max_mutation_count: int, + max_mutation_bytes: int, + ): + """ + Args: + - max_mutation_count: maximum number of mutations to send in a single rpc. + This corresponds to individual mutations in a single RowMutationEntry. + - max_mutation_bytes: maximum number of bytes to send in a single rpc. 
+ """ + self._max_mutation_count = max_mutation_count + self._max_mutation_bytes = max_mutation_bytes + if self._max_mutation_count < 1: + raise ValueError("max_mutation_count must be greater than 0") + if self._max_mutation_bytes < 1: + raise ValueError("max_mutation_bytes must be greater than 0") + self._capacity_condition = asyncio.Condition() + self._in_flight_mutation_count = 0 + self._in_flight_mutation_bytes = 0 + + def _has_capacity(self, additional_count: int, additional_size: int) -> bool: + """ + Checks if there is capacity to send a new entry with the given size and count + + FlowControl limits are not hard limits. If a single mutation exceeds + the configured flow limits, it will be sent in a single batch when + previous batches have completed. + + Args: + - additional_count: number of mutations in the pending entry + - additional_size: size of the pending entry + Returns: + - True if there is capacity to send the pending entry, False otherwise + """ + # adjust limits to allow overly large mutations + acceptable_size = max(self._max_mutation_bytes, additional_size) + acceptable_count = max(self._max_mutation_count, additional_count) + # check if we have capacity for new mutation + new_size = self._in_flight_mutation_bytes + additional_size + new_count = self._in_flight_mutation_count + additional_count + return new_size <= acceptable_size and new_count <= acceptable_count + + async def remove_from_flow( + self, mutations: RowMutationEntry | list[RowMutationEntry] + ) -> None: + """ + Removes mutations from flow control. This method should be called once + for each mutation that was sent to add_to_flow, after the corresponding + operation is complete. + + Args: + - mutations: mutation or list of mutations to remove from flow control + """ + if not isinstance(mutations, list): + mutations = [mutations] + total_count = sum(len(entry.mutations) for entry in mutations) + total_size = sum(entry.size() for entry in mutations) + self._in_flight_mutation_count -= total_count + self._in_flight_mutation_bytes -= total_size + # notify any blocked requests that there is additional capacity + async with self._capacity_condition: + self._capacity_condition.notify_all() + + async def add_to_flow(self, mutations: RowMutationEntry | list[RowMutationEntry]): + """ + Generator function that registers mutations with flow control. As mutations + are accepted into the flow control, they are yielded back to the caller, + to be sent in a batch. If the flow control is at capacity, the generator + will block until there is capacity available. + + Args: + - mutations: list mutations to break up into batches + Yields: + - list of mutations that have reserved space in the flow control. + Each batch contains at least one mutation. 
+ """ + if not isinstance(mutations, list): + mutations = [mutations] + start_idx = 0 + end_idx = 0 + while end_idx < len(mutations): + start_idx = end_idx + batch_mutation_count = 0 + # fill up batch until we hit capacity + async with self._capacity_condition: + while end_idx < len(mutations): + next_entry = mutations[end_idx] + next_size = next_entry.size() + next_count = len(next_entry.mutations) + if ( + self._has_capacity(next_count, next_size) + # make sure not to exceed per-request mutation count limits + and (batch_mutation_count + next_count) + <= MUTATE_ROWS_REQUEST_MUTATION_LIMIT + ): + # room for new mutation; add to batch + end_idx += 1 + batch_mutation_count += next_count + self._in_flight_mutation_bytes += next_size + self._in_flight_mutation_count += next_count + elif start_idx != end_idx: + # we have at least one mutation in the batch, so send it + break + else: + # batch is empty. Block until we have capacity + await self._capacity_condition.wait_for( + lambda: self._has_capacity(next_count, next_size) + ) + yield mutations[start_idx:end_idx] class MutationsBatcher: @@ -35,7 +167,6 @@ class MutationsBatcher: to use as few network requests as required Flushes: - - manually - every flush_interval seconds - after queue reaches flush_count in quantity - after queue reaches flush_size_bytes in storage size @@ -46,61 +177,323 @@ class MutationsBatcher: batcher.add(row, mut) """ - queue: asyncio.Queue[tuple[_row_key_type, list[Mutation]]] - conditional_queues: dict[RowFilter, tuple[list[Mutation], list[Mutation]]] - - MB_SIZE = 1024 * 1024 - def __init__( self, table: "Table", - flush_count: int = 100, - flush_size_bytes: int = 100 * MB_SIZE, - max_mutation_bytes: int = 20 * MB_SIZE, - flush_interval: int = 5, - metadata: list[tuple[str, str]] | None = None, + *, + flush_interval: float | None = 5, + flush_limit_mutation_count: int | None = 1000, + flush_limit_bytes: int = 20 * _MB_SIZE, + flow_control_max_mutation_count: int = 100_000, + flow_control_max_bytes: int = 100 * _MB_SIZE, + batch_operation_timeout: float | None = None, + batch_per_request_timeout: float | None = None, ): - raise NotImplementedError + """ + Args: + - table: Table to preform rpc calls + - flush_interval: Automatically flush every flush_interval seconds. + If None, no time-based flushing is performed. + - flush_limit_mutation_count: Flush immediately after flush_limit_mutation_count + mutations are added across all entries. If None, this limit is ignored. + - flush_limit_bytes: Flush immediately after flush_limit_bytes bytes are added. + - flow_control_max_mutation_count: Maximum number of inflight mutations. + - flow_control_max_bytes: Maximum number of inflight bytes. + - batch_operation_timeout: timeout for each mutate_rows operation, in seconds. If None, + table default_operation_timeout will be used + - batch_per_request_timeout: timeout for each individual request, in seconds. 
If None, + table default_per_request_timeout will be used + """ + self._operation_timeout: float = ( + batch_operation_timeout or table.default_operation_timeout + ) + self._per_request_timeout: float = ( + batch_per_request_timeout + or table.default_per_request_timeout + or self._operation_timeout + ) + if self._operation_timeout <= 0: + raise ValueError("batch_operation_timeout must be greater than 0") + if self._per_request_timeout <= 0: + raise ValueError("batch_per_request_timeout must be greater than 0") + if self._per_request_timeout > self._operation_timeout: + raise ValueError( + "batch_per_request_timeout must be less than batch_operation_timeout" + ) + self.closed: bool = False + self._table = table + self._staged_entries: list[RowMutationEntry] = [] + self._staged_count, self._staged_bytes = 0, 0 + self._flow_control = _FlowControl( + flow_control_max_mutation_count, flow_control_max_bytes + ) + self._flush_limit_bytes = flush_limit_bytes + self._flush_limit_count = ( + flush_limit_mutation_count + if flush_limit_mutation_count is not None + else float("inf") + ) + self._flush_timer = self._start_flush_timer(flush_interval) + self._flush_jobs: set[asyncio.Future[None]] = set() + # MutationExceptionGroup reports number of successful entries along with failures + self._entries_processed_since_last_raise: int = 0 + self._exceptions_since_last_raise: int = 0 + # keep track of the first and last _exception_list_limit exceptions + self._exception_list_limit: int = 10 + self._oldest_exceptions: list[Exception] = [] + self._newest_exceptions: deque[Exception] = deque( + maxlen=self._exception_list_limit + ) + # clean up on program exit + atexit.register(self._on_exit) - async def append(self, row_key: str | bytes, mutation: Mutation | list[Mutation]): + def _start_flush_timer(self, interval: float | None) -> asyncio.Future[None]: """ - Add a new mutation to the internal queue + Set up a background task to flush the batcher every interval seconds + + If interval is None, an empty future is returned + + Args: + - flush_interval: Automatically flush every flush_interval seconds. + If None, no time-based flushing is performed. 
+ Returns: + - asyncio.Future that represents the background task """ - raise NotImplementedError + if interval is None or self.closed: + empty_future: asyncio.Future[None] = asyncio.Future() + empty_future.set_result(None) + return empty_future - async def append_conditional( - self, - predicate_filter: RowFilter, - row_key: str | bytes, - if_true_mutations: Mutation | list[Mutation] | None = None, - if_false_mutations: Mutation | list[Mutation] | None = None, - ): + async def timer_routine(self, interval: float): + """ + Triggers new flush tasks every `interval` seconds + """ + while not self.closed: + await asyncio.sleep(interval) + # add new flush task to list + if not self.closed and self._staged_entries: + self._schedule_flush() + + timer_task = asyncio.create_task(timer_routine(self, interval)) + return timer_task + + async def append(self, mutation_entry: RowMutationEntry): + """ + Add a new set of mutations to the internal queue + + TODO: return a future to track completion of this entry + + Args: + - mutation_entry: new entry to add to flush queue + Raises: + - RuntimeError if batcher is closed + - ValueError if an invalid mutation type is added + """ + if self.closed: + raise RuntimeError("Cannot append to closed MutationsBatcher") + if isinstance(mutation_entry, Mutation): # type: ignore + raise ValueError( + f"invalid mutation type: {type(mutation_entry).__name__}. Only RowMutationEntry objects are supported by batcher" + ) + self._staged_entries.append(mutation_entry) + # start a new flush task if limits exceeded + self._staged_count += len(mutation_entry.mutations) + self._staged_bytes += mutation_entry.size() + if ( + self._staged_count >= self._flush_limit_count + or self._staged_bytes >= self._flush_limit_bytes + ): + self._schedule_flush() + # yield to the event loop to allow flush to run + await asyncio.sleep(0) + + def _schedule_flush(self) -> asyncio.Future[None] | None: + """Update the flush task to include the latest staged entries""" + if self._staged_entries: + entries, self._staged_entries = self._staged_entries, [] + self._staged_count, self._staged_bytes = 0, 0 + new_task = self._create_bg_task(self._flush_internal, entries) + new_task.add_done_callback(self._flush_jobs.remove) + self._flush_jobs.add(new_task) + return new_task + return None + + async def _flush_internal(self, new_entries: list[RowMutationEntry]): + """ + Flushes a set of mutations to the server, and updates internal state + + Args: + - new_entries: list of RowMutationEntry objects to flush + """ + # flush new entries + in_process_requests: list[asyncio.Future[list[FailedMutationEntryError]]] = [] + async for batch in self._flow_control.add_to_flow(new_entries): + batch_task = self._create_bg_task(self._execute_mutate_rows, batch) + in_process_requests.append(batch_task) + # wait for all inflight requests to complete + found_exceptions = await self._wait_for_batch_results(*in_process_requests) + # update exception data to reflect any new errors + self._entries_processed_since_last_raise += len(new_entries) + self._add_exceptions(found_exceptions) + + async def _execute_mutate_rows( + self, batch: list[RowMutationEntry] + ) -> list[FailedMutationEntryError]: """ - Apply a different set of mutations based on the outcome of predicate_filter + Helper to execute mutation operation on a batch - Calls check_and_mutate_row internally on flush + Args: + - batch: list of RowMutationEntry objects to send to server + - timeout: timeout in seconds. Used as operation_timeout and per_request_timeout. 
+ If not given, will use table defaults + Returns: + - list of FailedMutationEntryError objects for mutations that failed. + FailedMutationEntryError objects will not contain index information """ - raise NotImplementedError + request = {"table_name": self._table.table_name} + if self._table.app_profile_id: + request["app_profile_id"] = self._table.app_profile_id + try: + operation = _MutateRowsOperation( + self._table.client._gapic_client, + self._table, + batch, + operation_timeout=self._operation_timeout, + per_request_timeout=self._per_request_timeout, + ) + await operation.start() + except MutationsExceptionGroup as e: + # strip index information from exceptions, since it is not useful in a batch context + for subexc in e.exceptions: + subexc.index = None + return list(e.exceptions) + finally: + # mark batch as complete in flow control + await self._flow_control.remove_from_flow(batch) + return [] - async def flush(self): + def _add_exceptions(self, excs: list[Exception]): """ - Send queue over network in as few calls as possible + Add new list of exceptions to internal store. To avoid unbounded memory, + the batcher will store the first and last _exception_list_limit exceptions, + and discard any in between. + """ + self._exceptions_since_last_raise += len(excs) + if excs and len(self._oldest_exceptions) < self._exception_list_limit: + # populate oldest_exceptions with found_exceptions + addition_count = self._exception_list_limit - len(self._oldest_exceptions) + self._oldest_exceptions.extend(excs[:addition_count]) + excs = excs[addition_count:] + if excs: + # populate newest_exceptions with remaining found_exceptions + self._newest_exceptions.extend(excs[-self._exception_list_limit :]) + + def _raise_exceptions(self): + """ + Raise any unreported exceptions from background flush operations Raises: - - MutationsExceptionGroup if any mutation in the batch fails + - MutationsExceptionGroup with all unreported exceptions """ - raise NotImplementedError + if self._oldest_exceptions or self._newest_exceptions: + oldest, self._oldest_exceptions = self._oldest_exceptions, [] + newest = list(self._newest_exceptions) + self._newest_exceptions.clear() + entry_count, self._entries_processed_since_last_raise = ( + self._entries_processed_since_last_raise, + 0, + ) + exc_count, self._exceptions_since_last_raise = ( + self._exceptions_since_last_raise, + 0, + ) + raise MutationsExceptionGroup.from_truncated_lists( + first_list=oldest, + last_list=newest, + total_excs=exc_count, + entry_count=entry_count, + ) async def __aenter__(self): """For context manager API""" - raise NotImplementedError + return self async def __aexit__(self, exc_type, exc, tb): """For context manager API""" - raise NotImplementedError + await self.close() async def close(self): """ Flush queue and clean up resources """ - raise NotImplementedError + self.closed = True + self._flush_timer.cancel() + self._schedule_flush() + if self._flush_jobs: + await asyncio.gather(*self._flush_jobs, return_exceptions=True) + try: + await self._flush_timer + except asyncio.CancelledError: + pass + atexit.unregister(self._on_exit) + # raise unreported exceptions + self._raise_exceptions() + + def _on_exit(self): + """ + Called when program is exited. Raises warning if unflushed mutations remain + """ + if not self.closed and self._staged_entries: + warnings.warn( + f"MutationsBatcher for table {self._table.table_name} was not closed. " + f"{len(self._staged_entries)} Unflushed mutations will not be sent to the server." 
+ ) + + @staticmethod + def _create_bg_task(func, *args, **kwargs) -> asyncio.Future[Any]: + """ + Create a new background task, and return a future + + This method wraps asyncio to make it easier to maintain subclasses + with different concurrency models. + + Args: + - func: function to execute in background task + - *args: positional arguments to pass to func + - **kwargs: keyword arguments to pass to func + Returns: + - Future object representing the background task + """ + return asyncio.create_task(func(*args, **kwargs)) + + @staticmethod + async def _wait_for_batch_results( + *tasks: asyncio.Future[list[FailedMutationEntryError]] | asyncio.Future[None], + ) -> list[Exception]: + """ + Takes in a list of futures representing _execute_mutate_rows tasks, + waits for them to complete, and returns a list of errors encountered. + + Args: + - *tasks: futures representing _execute_mutate_rows or _flush_internal tasks + Returns: + - list of Exceptions encountered by any of the tasks. Errors are expected + to be FailedMutationEntryError, representing a failed mutation operation. + If a task fails with a different exception, it will be included in the + output list. Successful tasks will not be represented in the output list. + """ + if not tasks: + return [] + all_results = await asyncio.gather(*tasks, return_exceptions=True) + found_errors = [] + for result in all_results: + if isinstance(result, Exception): + # will receive direct Exception objects if request task fails + found_errors.append(result) + elif result: + # completed requests will return a list of FailedMutationEntryError + for e in result: + # strip index information + e.index = None + found_errors.extend(result) + return found_errors diff --git a/tests/system/test_system.py b/tests/system/test_system.py index 45a3e17d2..e1771202a 100644 --- a/tests/system/test_system.py +++ b/tests/system/test_system.py @@ -16,6 +16,7 @@ import pytest_asyncio import os import asyncio +import uuid from google.api_core import retry from google.api_core.exceptions import ClientError @@ -27,7 +28,10 @@ @pytest.fixture(scope="session") def event_loop(): - return asyncio.get_event_loop() + loop = asyncio.get_event_loop() + yield loop + loop.stop() + loop.close() @pytest.fixture(scope="session") @@ -206,6 +210,27 @@ async def _retrieve_cell_value(table, row_key): return cell.value +async def _create_row_and_mutation( + table, temp_rows, *, start_value=b"start", new_value=b"new_value" +): + """ + Helper to create a new row, and a sample set_cell mutation to change its value + """ + from google.cloud.bigtable.mutations import SetCell + + row_key = uuid.uuid4().hex.encode() + family = TEST_FAMILY + qualifier = b"test-qualifier" + await temp_rows.add_row( + row_key, family=family, qualifier=qualifier, value=start_value + ) + # ensure cell is initialized + assert (await _retrieve_cell_value(table, row_key)) == start_value + + mutation = SetCell(family=TEST_FAMILY, qualifier=qualifier, new_value=new_value) + return row_key, mutation + + @pytest_asyncio.fixture(scope="function") async def temp_rows(table): builder = TempRowBuilder(table) @@ -213,7 +238,7 @@ async def temp_rows(table): await builder.delete_rows() -@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) +@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=10) @pytest.mark.asyncio async def test_ping_and_warm_gapic(client, table): """ @@ -246,27 +271,15 @@ async def test_mutation_set_cell(table, temp_rows): """ Ensure cells can be set 
properly """ - from google.cloud.bigtable.mutations import SetCell - - row_key = b"mutate" - family = TEST_FAMILY - qualifier = b"test-qualifier" - start_value = b"start" - await temp_rows.add_row( - row_key, family=family, qualifier=qualifier, value=start_value - ) - - # ensure cell is initialized - assert (await _retrieve_cell_value(table, row_key)) == start_value - - expected_value = b"new-value" - mutation = SetCell( - family=TEST_FAMILY, qualifier=b"test-qualifier", new_value=expected_value + row_key = b"bulk_mutate" + new_value = uuid.uuid4().hex.encode() + row_key, mutation = await _create_row_and_mutation( + table, temp_rows, new_value=new_value ) await table.mutate_row(row_key, mutation) # ensure cell is updated - assert (await _retrieve_cell_value(table, row_key)) == expected_value + assert (await _retrieve_cell_value(table, row_key)) == new_value @retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) @@ -290,28 +303,173 @@ async def test_bulk_mutations_set_cell(client, table, temp_rows): """ Ensure cells can be set properly """ - from google.cloud.bigtable.mutations import SetCell, RowMutationEntry + from google.cloud.bigtable.mutations import RowMutationEntry - row_key = b"bulk_mutate" - family = TEST_FAMILY - qualifier = b"test-qualifier" - start_value = b"start" - await temp_rows.add_row( - row_key, family=family, qualifier=qualifier, value=start_value + new_value = uuid.uuid4().hex.encode() + row_key, mutation = await _create_row_and_mutation( + table, temp_rows, new_value=new_value ) + bulk_mutation = RowMutationEntry(row_key, [mutation]) - # ensure cell is initialized - assert (await _retrieve_cell_value(table, row_key)) == start_value + await table.bulk_mutate_rows([bulk_mutation]) + + # ensure cell is updated + assert (await _retrieve_cell_value(table, row_key)) == new_value + + +@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) +@pytest.mark.asyncio +async def test_mutations_batcher_context_manager(client, table, temp_rows): + """ + test batcher with context manager. 
Should flush on exit + """ + from google.cloud.bigtable.mutations import RowMutationEntry - expected_value = b"new-value" - mutation = SetCell( - family=TEST_FAMILY, qualifier=b"test-qualifier", new_value=expected_value + new_value, new_value2 = [uuid.uuid4().hex.encode() for _ in range(2)] + row_key, mutation = await _create_row_and_mutation( + table, temp_rows, new_value=new_value + ) + row_key2, mutation2 = await _create_row_and_mutation( + table, temp_rows, new_value=new_value2 ) bulk_mutation = RowMutationEntry(row_key, [mutation]) - await table.bulk_mutate_rows([bulk_mutation]) + bulk_mutation2 = RowMutationEntry(row_key2, [mutation2]) + async with table.mutations_batcher() as batcher: + await batcher.append(bulk_mutation) + await batcher.append(bulk_mutation2) # ensure cell is updated - assert (await _retrieve_cell_value(table, row_key)) == expected_value + assert (await _retrieve_cell_value(table, row_key)) == new_value + assert len(batcher._staged_entries) == 0 + + +@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) +@pytest.mark.asyncio +async def test_mutations_batcher_timer_flush(client, table, temp_rows): + """ + batch should occur after flush_interval seconds + """ + from google.cloud.bigtable.mutations import RowMutationEntry + + new_value = uuid.uuid4().hex.encode() + row_key, mutation = await _create_row_and_mutation( + table, temp_rows, new_value=new_value + ) + bulk_mutation = RowMutationEntry(row_key, [mutation]) + flush_interval = 0.1 + async with table.mutations_batcher(flush_interval=flush_interval) as batcher: + await batcher.append(bulk_mutation) + await asyncio.sleep(0) + assert len(batcher._staged_entries) == 1 + await asyncio.sleep(flush_interval + 0.1) + assert len(batcher._staged_entries) == 0 + # ensure cell is updated + assert (await _retrieve_cell_value(table, row_key)) == new_value + + +@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) +@pytest.mark.asyncio +async def test_mutations_batcher_count_flush(client, table, temp_rows): + """ + batch should flush after flush_limit_mutation_count mutations + """ + from google.cloud.bigtable.mutations import RowMutationEntry + + new_value, new_value2 = [uuid.uuid4().hex.encode() for _ in range(2)] + row_key, mutation = await _create_row_and_mutation( + table, temp_rows, new_value=new_value + ) + bulk_mutation = RowMutationEntry(row_key, [mutation]) + row_key2, mutation2 = await _create_row_and_mutation( + table, temp_rows, new_value=new_value2 + ) + bulk_mutation2 = RowMutationEntry(row_key2, [mutation2]) + + async with table.mutations_batcher(flush_limit_mutation_count=2) as batcher: + await batcher.append(bulk_mutation) + assert len(batcher._flush_jobs) == 0 + # should be noop; flush not scheduled + assert len(batcher._staged_entries) == 1 + await batcher.append(bulk_mutation2) + # task should now be scheduled + assert len(batcher._flush_jobs) == 1 + await asyncio.gather(*batcher._flush_jobs) + assert len(batcher._staged_entries) == 0 + assert len(batcher._flush_jobs) == 0 + # ensure cells were updated + assert (await _retrieve_cell_value(table, row_key)) == new_value + assert (await _retrieve_cell_value(table, row_key2)) == new_value2 + + +@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) +@pytest.mark.asyncio +async def test_mutations_batcher_bytes_flush(client, table, temp_rows): + """ + batch should flush after flush_limit_bytes bytes + """ + from google.cloud.bigtable.mutations import RowMutationEntry + + 
new_value, new_value2 = [uuid.uuid4().hex.encode() for _ in range(2)] + row_key, mutation = await _create_row_and_mutation( + table, temp_rows, new_value=new_value + ) + bulk_mutation = RowMutationEntry(row_key, [mutation]) + row_key2, mutation2 = await _create_row_and_mutation( + table, temp_rows, new_value=new_value2 + ) + bulk_mutation2 = RowMutationEntry(row_key2, [mutation2]) + + flush_limit = bulk_mutation.size() + bulk_mutation2.size() - 1 + + async with table.mutations_batcher(flush_limit_bytes=flush_limit) as batcher: + await batcher.append(bulk_mutation) + assert len(batcher._flush_jobs) == 0 + assert len(batcher._staged_entries) == 1 + await batcher.append(bulk_mutation2) + # task should now be scheduled + assert len(batcher._flush_jobs) == 1 + assert len(batcher._staged_entries) == 0 + # let flush complete + await asyncio.gather(*batcher._flush_jobs) + # ensure cells were updated + assert (await _retrieve_cell_value(table, row_key)) == new_value + assert (await _retrieve_cell_value(table, row_key2)) == new_value2 + + +@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) +@pytest.mark.asyncio +async def test_mutations_batcher_no_flush(client, table, temp_rows): + """ + test with no flush requirements met + """ + from google.cloud.bigtable.mutations import RowMutationEntry + + new_value = uuid.uuid4().hex.encode() + start_value = b"unchanged" + row_key, mutation = await _create_row_and_mutation( + table, temp_rows, start_value=start_value, new_value=new_value + ) + bulk_mutation = RowMutationEntry(row_key, [mutation]) + row_key2, mutation2 = await _create_row_and_mutation( + table, temp_rows, start_value=start_value, new_value=new_value + ) + bulk_mutation2 = RowMutationEntry(row_key2, [mutation2]) + + size_limit = bulk_mutation.size() + bulk_mutation2.size() + 1 + async with table.mutations_batcher( + flush_limit_bytes=size_limit, flush_limit_mutation_count=3, flush_interval=1 + ) as batcher: + await batcher.append(bulk_mutation) + assert len(batcher._staged_entries) == 1 + await batcher.append(bulk_mutation2) + # flush not scheduled + assert len(batcher._flush_jobs) == 0 + await asyncio.sleep(0.01) + assert len(batcher._staged_entries) == 2 + assert len(batcher._flush_jobs) == 0 + # ensure cells were not updated + assert (await _retrieve_cell_value(table, row_key)) == start_value + assert (await _retrieve_cell_value(table, row_key2)) == start_value @pytest.mark.parametrize( diff --git a/tests/unit/test__mutate_rows.py b/tests/unit/test__mutate_rows.py index 4fba16f23..18b2beede 100644 --- a/tests/unit/test__mutate_rows.py +++ b/tests/unit/test__mutate_rows.py @@ -27,6 +27,13 @@ from mock import AsyncMock # type: ignore +def _make_mutation(count=1, size=1): + mutation = mock.Mock() + mutation.size.return_value = size + mutation.mutations = [mock.Mock()] * count + return mutation + + class TestMutateRowsOperation: def _target_class(self): from google.cloud.bigtable._mutate_rows import _MutateRowsOperation @@ -72,7 +79,7 @@ def test_ctor(self): client = mock.Mock() table = mock.Mock() - entries = [mock.Mock(), mock.Mock()] + entries = [_make_mutation(), _make_mutation()] operation_timeout = 0.05 attempt_timeout = 0.01 instance = self._make_one( @@ -105,6 +112,37 @@ def test_ctor(self): assert instance.remaining_indices == list(range(len(entries))) assert instance.errors == {} + def test_ctor_too_many_entries(self): + """ + should raise an error if an operation is created with more than 100,000 entries + """ + from google.cloud.bigtable._mutate_rows 
import ( + MUTATE_ROWS_REQUEST_MUTATION_LIMIT, + ) + + assert MUTATE_ROWS_REQUEST_MUTATION_LIMIT == 100_000 + + client = mock.Mock() + table = mock.Mock() + entries = [_make_mutation()] * MUTATE_ROWS_REQUEST_MUTATION_LIMIT + operation_timeout = 0.05 + attempt_timeout = 0.01 + # no errors if at limit + self._make_one(client, table, entries, operation_timeout, attempt_timeout) + # raise error after crossing + with pytest.raises(ValueError) as e: + self._make_one( + client, + table, + entries + [_make_mutation()], + operation_timeout, + attempt_timeout, + ) + assert "mutate_rows requests can contain at most 100000 mutations" in str( + e.value + ) + assert "Found 100001" in str(e.value) + @pytest.mark.asyncio async def test_mutate_rows_operation(self): """ @@ -112,7 +150,7 @@ async def test_mutate_rows_operation(self): """ client = mock.Mock() table = mock.Mock() - entries = [mock.Mock(), mock.Mock()] + entries = [_make_mutation(), _make_mutation()] operation_timeout = 0.05 instance = self._make_one( client, table, entries, operation_timeout, operation_timeout @@ -135,7 +173,7 @@ async def test_mutate_rows_exception(self, exc_type): client = mock.Mock() table = mock.Mock() - entries = [mock.Mock()] + entries = [_make_mutation()] operation_timeout = 0.05 expected_cause = exc_type("abort") with mock.patch.object( @@ -170,7 +208,7 @@ async def test_mutate_rows_exception_retryable_eventually_pass(self, exc_type): client = mock.Mock() table = mock.Mock() - entries = [mock.Mock()] + entries = [_make_mutation()] operation_timeout = 1 expected_cause = exc_type("retry") num_retries = 2 @@ -197,7 +235,7 @@ async def test_mutate_rows_incomplete_ignored(self): client = mock.Mock() table = mock.Mock() - entries = [mock.Mock()] + entries = [_make_mutation()] operation_timeout = 0.05 with mock.patch.object( self._target_class(), @@ -220,12 +258,11 @@ async def test_mutate_rows_incomplete_ignored(self): @pytest.mark.asyncio async def test_run_attempt_single_entry_success(self): """Test mutating a single entry""" - mutation = mock.Mock() - mutations = {0: mutation} + mutation = _make_mutation() expected_timeout = 1.3 - mock_gapic_fn = self._make_mock_gapic(mutations) + mock_gapic_fn = self._make_mock_gapic({0: mutation}) instance = self._make_one( - mutation_entries=mutations, + mutation_entries=[mutation], per_request_timeout=expected_timeout, ) with mock.patch.object(instance, "_gapic_fn", mock_gapic_fn): @@ -251,9 +288,9 @@ async def test_run_attempt_partial_success_retryable(self): """Some entries succeed, but one fails. Should report the proper index, and raise incomplete exception""" from google.cloud.bigtable._mutate_rows import _MutateRowsIncomplete - success_mutation = mock.Mock() - success_mutation_2 = mock.Mock() - failure_mutation = mock.Mock() + success_mutation = _make_mutation() + success_mutation_2 = _make_mutation() + failure_mutation = _make_mutation() mutations = [success_mutation, failure_mutation, success_mutation_2] mock_gapic_fn = self._make_mock_gapic(mutations, error_dict={1: 300}) instance = self._make_one( @@ -272,9 +309,9 @@ async def test_run_attempt_partial_success_retryable(self): @pytest.mark.asyncio async def test_run_attempt_partial_success_non_retryable(self): """Some entries succeed, but one fails. Exception marked as non-retryable. 
Do not raise incomplete error""" - success_mutation = mock.Mock() - success_mutation_2 = mock.Mock() - failure_mutation = mock.Mock() + success_mutation = _make_mutation() + success_mutation_2 = _make_mutation() + failure_mutation = _make_mutation() mutations = [success_mutation, failure_mutation, success_mutation_2] mock_gapic_fn = self._make_mock_gapic(mutations, error_dict={1: 300}) instance = self._make_one( diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 805a6340d..3557c1c16 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -2552,14 +2552,18 @@ async def test_bulk_mutate_row_metadata(self, include_app_profile): async with client.get_table("i", "t", app_profile_id=profile) as table: with mock.patch.object( client._gapic_client, "mutate_rows", AsyncMock() - ) as read_rows: - read_rows.side_effect = core_exceptions.Aborted("mock") + ) as mutate_rows: + mutate_rows.side_effect = core_exceptions.Aborted("mock") + mutation = mock.Mock() + mutation.size.return_value = 1 + entry = mock.Mock() + entry.mutations = [mutation] try: - await table.bulk_mutate_rows([mock.Mock()]) + await table.bulk_mutate_rows([entry]) except Exception: # exception used to end early pass - kwargs = read_rows.call_args_list[0].kwargs + kwargs = mutate_rows.call_args_list[0].kwargs metadata = kwargs["metadata"] goog_metadata = None for key, value in metadata: diff --git a/tests/unit/test_exceptions.py b/tests/unit/test_exceptions.py index e68ccf5e8..ef186a47c 100644 --- a/tests/unit/test_exceptions.py +++ b/tests/unit/test_exceptions.py @@ -136,12 +136,12 @@ def _make_one(self, excs=None, num_entries=3): @pytest.mark.parametrize( "exception_list,total_entries,expected_message", [ - ([Exception()], 1, "1 sub-exception (from 1 entry attempted)"), - ([Exception()], 2, "1 sub-exception (from 2 entries attempted)"), + ([Exception()], 1, "1 failed entry from 1 attempted."), + ([Exception()], 2, "1 failed entry from 2 attempted."), ( [Exception(), RuntimeError()], 2, - "2 sub-exceptions (from 2 entries attempted)", + "2 failed entries from 2 attempted.", ), ], ) @@ -154,6 +154,77 @@ def test_raise(self, exception_list, total_entries, expected_message): assert str(e.value) == expected_message assert list(e.value.exceptions) == exception_list + def test_raise_custom_message(self): + """ + should be able to set a custom error message + """ + custom_message = "custom message" + exception_list = [Exception()] + with pytest.raises(self._get_class()) as e: + raise self._get_class()(exception_list, 5, message=custom_message) + assert str(e.value) == custom_message + assert list(e.value.exceptions) == exception_list + + @pytest.mark.parametrize( + "first_list_len,second_list_len,total_excs,entry_count,expected_message", + [ + (3, 0, 3, 4, "3 failed entries from 4 attempted."), + (1, 0, 1, 2, "1 failed entry from 2 attempted."), + (0, 1, 1, 2, "1 failed entry from 2 attempted."), + (2, 2, 4, 4, "4 failed entries from 4 attempted."), + ( + 1, + 1, + 3, + 2, + "3 failed entries from 2 attempted. (first 1 and last 1 attached as sub-exceptions; 1 truncated)", + ), + ( + 1, + 2, + 100, + 2, + "100 failed entries from 2 attempted. (first 1 and last 2 attached as sub-exceptions; 97 truncated)", + ), + ( + 2, + 1, + 4, + 9, + "4 failed entries from 9 attempted. (first 2 and last 1 attached as sub-exceptions; 1 truncated)", + ), + ( + 3, + 0, + 10, + 10, + "10 failed entries from 10 attempted. 
(first 3 attached as sub-exceptions; 7 truncated)", + ), + ( + 0, + 3, + 10, + 10, + "10 failed entries from 10 attempted. (last 3 attached as sub-exceptions; 7 truncated)", + ), + ], + ) + def test_from_truncated_lists( + self, first_list_len, second_list_len, total_excs, entry_count, expected_message + ): + """ + Should be able to make MutationsExceptionGroup using a pair of + lists representing a larger truncated list of exceptions + """ + first_list = [Exception()] * first_list_len + second_list = [Exception()] * second_list_len + with pytest.raises(self._get_class()) as e: + raise self._get_class().from_truncated_lists( + first_list, second_list, total_excs, entry_count + ) + assert str(e.value) == expected_message + assert list(e.value.exceptions) == first_list + second_list + class TestRetryExceptionGroup(TestBigtableExceptionGroup): def _get_class(self): @@ -281,6 +352,25 @@ def test_raise_idempotent(self): assert e.value.__cause__ == test_exc assert test_entry.is_idempotent.call_count == 1 + def test_no_index(self): + """ + Instances without an index should display different error string + """ + test_idx = None + test_entry = unittest.mock.Mock() + test_exc = ValueError("test") + with pytest.raises(self._get_class()) as e: + raise self._get_class()(test_idx, test_entry, test_exc) + assert ( + str(e.value) + == "Failed idempotent mutation entry with cause: ValueError('test')" + ) + assert e.value.index == test_idx + assert e.value.entry == test_entry + assert e.value.__cause__ == test_exc + assert isinstance(e.value, Exception) + assert test_entry.is_idempotent.call_count == 1 + class TestFailedQueryShardError: def _get_class(self): diff --git a/tests/unit/test_mutations.py b/tests/unit/test_mutations.py index 5730c53c9..c8c6788b1 100644 --- a/tests/unit/test_mutations.py +++ b/tests/unit/test_mutations.py @@ -45,6 +45,16 @@ def test___str__(self): assert self_mock._to_dict.called assert str_value == str(self_mock._to_dict.return_value) + @pytest.mark.parametrize("test_dict", [{}, {"key": "value"}]) + def test_size(self, test_dict): + from sys import getsizeof + + """Size should return size of dict representation""" + self_mock = mock.Mock() + self_mock._to_dict.return_value = test_dict + size_value = self._target_class().size(self_mock) + assert size_value == getsizeof(test_dict) + @pytest.mark.parametrize( "expected_class,input_dict", [ @@ -494,6 +504,21 @@ def test_ctor(self): assert instance.row_key == expected_key assert list(instance.mutations) == expected_mutations + def test_ctor_over_limit(self): + """Should raise error if mutations exceed MAX_MUTATIONS_PER_ENTRY""" + from google.cloud.bigtable._mutate_rows import ( + MUTATE_ROWS_REQUEST_MUTATION_LIMIT, + ) + + assert MUTATE_ROWS_REQUEST_MUTATION_LIMIT == 100_000 + # no errors at limit + expected_mutations = [None for _ in range(MUTATE_ROWS_REQUEST_MUTATION_LIMIT)] + self._make_one(b"row_key", expected_mutations) + # error if over limit + with pytest.raises(ValueError) as e: + self._make_one("key", expected_mutations + [mock.Mock()]) + assert "entries must have <= 100000 mutations" in str(e.value) + def test_ctor_str_key(self): expected_key = "row_key" expected_mutations = [mock.Mock(), mock.Mock()] @@ -528,7 +553,6 @@ def test__to_dict(self): @pytest.mark.parametrize( "mutations,result", [ - ([], True), ([mock.Mock(is_idempotent=lambda: True)], True), ([mock.Mock(is_idempotent=lambda: False)], False), ( @@ -551,6 +575,21 @@ def test_is_idempotent(self, mutations, result): instance = self._make_one("row_key", mutations) 
assert instance.is_idempotent() == result + def test_empty_mutations(self): + with pytest.raises(ValueError) as e: + self._make_one("row_key", []) + assert "must not be empty" in str(e.value) + + @pytest.mark.parametrize("test_dict", [{}, {"key": "value"}]) + def test_size(self, test_dict): + from sys import getsizeof + + """Size should return size of dict representation""" + self_mock = mock.Mock() + self_mock._to_dict.return_value = test_dict + size_value = self._target_class().size(self_mock) + assert size_value == getsizeof(test_dict) + def test__from_dict_mock(self): """ test creating instance from entry dict, with mocked mutation._from_dict diff --git a/tests/unit/test_mutations_batcher.py b/tests/unit/test_mutations_batcher.py new file mode 100644 index 000000000..a900468d5 --- /dev/null +++ b/tests/unit/test_mutations_batcher.py @@ -0,0 +1,1097 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +import asyncio + +# try/except added for compatibility with python < 3.8 +try: + from unittest import mock + from unittest.mock import AsyncMock +except ImportError: # pragma: NO COVER + import mock # type: ignore + from mock import AsyncMock # type: ignore + + +def _make_mutation(count=1, size=1): + mutation = mock.Mock() + mutation.size.return_value = size + mutation.mutations = [mock.Mock()] * count + return mutation + + +class Test_FlowControl: + def _make_one(self, max_mutation_count=10, max_mutation_bytes=100): + from google.cloud.bigtable.mutations_batcher import _FlowControl + + return _FlowControl(max_mutation_count, max_mutation_bytes) + + def test_ctor(self): + max_mutation_count = 9 + max_mutation_bytes = 19 + instance = self._make_one(max_mutation_count, max_mutation_bytes) + assert instance._max_mutation_count == max_mutation_count + assert instance._max_mutation_bytes == max_mutation_bytes + assert instance._in_flight_mutation_count == 0 + assert instance._in_flight_mutation_bytes == 0 + assert isinstance(instance._capacity_condition, asyncio.Condition) + + def test_ctor_invalid_values(self): + """Test that values are positive, and fit within expected limits""" + with pytest.raises(ValueError) as e: + self._make_one(0, 1) + assert "max_mutation_count must be greater than 0" in str(e.value) + with pytest.raises(ValueError) as e: + self._make_one(1, 0) + assert "max_mutation_bytes must be greater than 0" in str(e.value) + + @pytest.mark.parametrize( + "max_count,max_size,existing_count,existing_size,new_count,new_size,expected", + [ + (1, 1, 0, 0, 0, 0, True), + (1, 1, 1, 1, 1, 1, False), + (10, 10, 0, 0, 0, 0, True), + (10, 10, 0, 0, 9, 9, True), + (10, 10, 0, 0, 11, 9, True), + (10, 10, 0, 1, 11, 9, True), + (10, 10, 1, 0, 11, 9, False), + (10, 10, 0, 0, 9, 11, True), + (10, 10, 1, 0, 9, 11, True), + (10, 10, 0, 1, 9, 11, False), + (10, 1, 0, 0, 1, 0, True), + (1, 10, 0, 0, 0, 8, True), + (float("inf"), float("inf"), 0, 0, 1e10, 1e10, True), + (8, 8, 0, 0, 1e10, 1e10, True), + (12, 12, 6, 6, 5, 5, True), + (12, 12, 5, 5, 6, 6, True), 
+ (12, 12, 6, 6, 6, 6, True), + (12, 12, 6, 6, 7, 7, False), + # allow capacity check if new_count or new_size exceeds limits + (12, 12, 0, 0, 13, 13, True), + (12, 12, 12, 0, 0, 13, True), + (12, 12, 0, 12, 13, 0, True), + # but not if there's already values in flight + (12, 12, 1, 1, 13, 13, False), + (12, 12, 1, 1, 0, 13, False), + (12, 12, 1, 1, 13, 0, False), + ], + ) + def test__has_capacity( + self, + max_count, + max_size, + existing_count, + existing_size, + new_count, + new_size, + expected, + ): + """ + _has_capacity should return True if the new mutation will will not exceed the max count or size + """ + instance = self._make_one(max_count, max_size) + instance._in_flight_mutation_count = existing_count + instance._in_flight_mutation_bytes = existing_size + assert instance._has_capacity(new_count, new_size) == expected + + @pytest.mark.asyncio + @pytest.mark.parametrize( + "existing_count,existing_size,added_count,added_size,new_count,new_size", + [ + (0, 0, 0, 0, 0, 0), + (2, 2, 1, 1, 1, 1), + (2, 0, 1, 0, 1, 0), + (0, 2, 0, 1, 0, 1), + (10, 10, 0, 0, 10, 10), + (10, 10, 5, 5, 5, 5), + (0, 0, 1, 1, -1, -1), + ], + ) + async def test_remove_from_flow_value_update( + self, + existing_count, + existing_size, + added_count, + added_size, + new_count, + new_size, + ): + """ + completed mutations should lower the inflight values + """ + instance = self._make_one() + instance._in_flight_mutation_count = existing_count + instance._in_flight_mutation_bytes = existing_size + mutation = _make_mutation(added_count, added_size) + await instance.remove_from_flow(mutation) + assert instance._in_flight_mutation_count == new_count + assert instance._in_flight_mutation_bytes == new_size + + @pytest.mark.asyncio + async def test__remove_from_flow_unlock(self): + """capacity condition should notify after mutation is complete""" + instance = self._make_one(10, 10) + instance._in_flight_mutation_count = 10 + instance._in_flight_mutation_bytes = 10 + + async def task_routine(): + async with instance._capacity_condition: + await instance._capacity_condition.wait_for( + lambda: instance._has_capacity(1, 1) + ) + + task = asyncio.create_task(task_routine()) + await asyncio.sleep(0.05) + # should be blocked due to capacity + assert task.done() is False + # try changing size + mutation = _make_mutation(count=0, size=5) + await instance.remove_from_flow([mutation]) + await asyncio.sleep(0.05) + assert instance._in_flight_mutation_count == 10 + assert instance._in_flight_mutation_bytes == 5 + assert task.done() is False + # try changing count + instance._in_flight_mutation_bytes = 10 + mutation = _make_mutation(count=5, size=0) + await instance.remove_from_flow([mutation]) + await asyncio.sleep(0.05) + assert instance._in_flight_mutation_count == 5 + assert instance._in_flight_mutation_bytes == 10 + assert task.done() is False + # try changing both + instance._in_flight_mutation_count = 10 + mutation = _make_mutation(count=5, size=5) + await instance.remove_from_flow([mutation]) + await asyncio.sleep(0.05) + assert instance._in_flight_mutation_count == 5 + assert instance._in_flight_mutation_bytes == 5 + # task should be complete + assert task.done() is True + + @pytest.mark.asyncio + @pytest.mark.parametrize( + "mutations,count_cap,size_cap,expected_results", + [ + # high capacity results in no batching + ([(5, 5), (1, 1), (1, 1)], 10, 10, [[(5, 5), (1, 1), (1, 1)]]), + # low capacity splits up into batches + ([(1, 1), (1, 1), (1, 1)], 1, 1, [[(1, 1)], [(1, 1)], [(1, 1)]]), + # test count as limiting 
factor + ([(1, 1), (1, 1), (1, 1)], 2, 10, [[(1, 1), (1, 1)], [(1, 1)]]), + # test size as limiting factor + ([(1, 1), (1, 1), (1, 1)], 10, 2, [[(1, 1), (1, 1)], [(1, 1)]]), + # test with some bloackages and some flows + ( + [(1, 1), (5, 5), (4, 1), (1, 4), (1, 1)], + 5, + 5, + [[(1, 1)], [(5, 5)], [(4, 1), (1, 4)], [(1, 1)]], + ), + ], + ) + async def test_add_to_flow(self, mutations, count_cap, size_cap, expected_results): + """ + Test batching with various flow control settings + """ + mutation_objs = [_make_mutation(count=m[0], size=m[1]) for m in mutations] + instance = self._make_one(count_cap, size_cap) + i = 0 + async for batch in instance.add_to_flow(mutation_objs): + expected_batch = expected_results[i] + assert len(batch) == len(expected_batch) + for j in range(len(expected_batch)): + # check counts + assert len(batch[j].mutations) == expected_batch[j][0] + # check sizes + assert batch[j].size() == expected_batch[j][1] + # update lock + await instance.remove_from_flow(batch) + i += 1 + assert i == len(expected_results) + + @pytest.mark.asyncio + @pytest.mark.parametrize( + "mutations,max_limit,expected_results", + [ + ([(1, 1)] * 11, 10, [[(1, 1)] * 10, [(1, 1)]]), + ([(1, 1)] * 10, 1, [[(1, 1)] for _ in range(10)]), + ([(1, 1)] * 10, 2, [[(1, 1), (1, 1)] for _ in range(5)]), + ], + ) + async def test_add_to_flow_max_mutation_limits( + self, mutations, max_limit, expected_results + ): + """ + Test flow control running up against the max API limit + Should submit request early, even if the flow control has room for more + """ + with mock.patch( + "google.cloud.bigtable.mutations_batcher.MUTATE_ROWS_REQUEST_MUTATION_LIMIT", + max_limit, + ): + mutation_objs = [_make_mutation(count=m[0], size=m[1]) for m in mutations] + # flow control has no limits except API restrictions + instance = self._make_one(float("inf"), float("inf")) + i = 0 + async for batch in instance.add_to_flow(mutation_objs): + expected_batch = expected_results[i] + assert len(batch) == len(expected_batch) + for j in range(len(expected_batch)): + # check counts + assert len(batch[j].mutations) == expected_batch[j][0] + # check sizes + assert batch[j].size() == expected_batch[j][1] + # update lock + await instance.remove_from_flow(batch) + i += 1 + assert i == len(expected_results) + + @pytest.mark.asyncio + async def test_add_to_flow_oversize(self): + """ + mutations over the flow control limits should still be accepted + """ + instance = self._make_one(2, 3) + large_size_mutation = _make_mutation(count=1, size=10) + large_count_mutation = _make_mutation(count=10, size=1) + results = [out async for out in instance.add_to_flow([large_size_mutation])] + assert len(results) == 1 + await instance.remove_from_flow(results[0]) + count_results = [ + out async for out in instance.add_to_flow(large_count_mutation) + ] + assert len(count_results) == 1 + + +class TestMutationsBatcher: + def _get_target_class(self): + from google.cloud.bigtable.mutations_batcher import MutationsBatcher + + return MutationsBatcher + + def _make_one(self, table=None, **kwargs): + if table is None: + table = mock.Mock() + table.default_operation_timeout = 10 + table.default_per_request_timeout = 10 + + return self._get_target_class()(table, **kwargs) + + @mock.patch( + "google.cloud.bigtable.mutations_batcher.MutationsBatcher._start_flush_timer" + ) + @pytest.mark.asyncio + async def test_ctor_defaults(self, flush_timer_mock): + flush_timer_mock.return_value = asyncio.create_task(asyncio.sleep(0)) + table = mock.Mock() + 
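# the batcher is expected to fall back to these table defaults when no batch timeouts are given +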
table.default_operation_timeout = 10 + table.default_per_request_timeout = 8 + async with self._make_one(table) as instance: + assert instance._table == table + assert instance.closed is False + assert instance._flush_jobs == set() + assert len(instance._staged_entries) == 0 + assert len(instance._oldest_exceptions) == 0 + assert len(instance._newest_exceptions) == 0 + assert instance._exception_list_limit == 10 + assert instance._exceptions_since_last_raise == 0 + assert instance._flow_control._max_mutation_count == 100000 + assert instance._flow_control._max_mutation_bytes == 104857600 + assert instance._flow_control._in_flight_mutation_count == 0 + assert instance._flow_control._in_flight_mutation_bytes == 0 + assert instance._entries_processed_since_last_raise == 0 + assert instance._operation_timeout == table.default_operation_timeout + assert instance._per_request_timeout == table.default_per_request_timeout + await asyncio.sleep(0) + assert flush_timer_mock.call_count == 1 + assert flush_timer_mock.call_args[0][0] == 5 + assert isinstance(instance._flush_timer, asyncio.Future) + + @mock.patch( + "google.cloud.bigtable.mutations_batcher.MutationsBatcher._start_flush_timer", + ) + @pytest.mark.asyncio + async def test_ctor_explicit(self, flush_timer_mock): + """Test with explicit parameters""" + flush_timer_mock.return_value = asyncio.create_task(asyncio.sleep(0)) + table = mock.Mock() + flush_interval = 20 + flush_limit_count = 17 + flush_limit_bytes = 19 + flow_control_max_mutation_count = 1001 + flow_control_max_bytes = 12 + operation_timeout = 11 + per_request_timeout = 2 + async with self._make_one( + table, + flush_interval=flush_interval, + flush_limit_mutation_count=flush_limit_count, + flush_limit_bytes=flush_limit_bytes, + flow_control_max_mutation_count=flow_control_max_mutation_count, + flow_control_max_bytes=flow_control_max_bytes, + batch_operation_timeout=operation_timeout, + batch_per_request_timeout=per_request_timeout, + ) as instance: + assert instance._table == table + assert instance.closed is False + assert instance._flush_jobs == set() + assert len(instance._staged_entries) == 0 + assert len(instance._oldest_exceptions) == 0 + assert len(instance._newest_exceptions) == 0 + assert instance._exception_list_limit == 10 + assert instance._exceptions_since_last_raise == 0 + assert ( + instance._flow_control._max_mutation_count + == flow_control_max_mutation_count + ) + assert instance._flow_control._max_mutation_bytes == flow_control_max_bytes + assert instance._flow_control._in_flight_mutation_count == 0 + assert instance._flow_control._in_flight_mutation_bytes == 0 + assert instance._entries_processed_since_last_raise == 0 + assert instance._operation_timeout == operation_timeout + assert instance._per_request_timeout == per_request_timeout + await asyncio.sleep(0) + assert flush_timer_mock.call_count == 1 + assert flush_timer_mock.call_args[0][0] == flush_interval + assert isinstance(instance._flush_timer, asyncio.Future) + + @mock.patch( + "google.cloud.bigtable.mutations_batcher.MutationsBatcher._start_flush_timer" + ) + @pytest.mark.asyncio + async def test_ctor_no_flush_limits(self, flush_timer_mock): + """Test with None for flush limits""" + flush_timer_mock.return_value = asyncio.create_task(asyncio.sleep(0)) + table = mock.Mock() + table.default_operation_timeout = 10 + table.default_per_request_timeout = 8 + flush_interval = None + flush_limit_count = None + flush_limit_bytes = None + async with self._make_one( + table, + flush_interval=flush_interval, + 
flush_limit_mutation_count=flush_limit_count, + flush_limit_bytes=flush_limit_bytes, + ) as instance: + assert instance._table == table + assert instance.closed is False + assert instance._staged_entries == [] + assert len(instance._oldest_exceptions) == 0 + assert len(instance._newest_exceptions) == 0 + assert instance._exception_list_limit == 10 + assert instance._exceptions_since_last_raise == 0 + assert instance._flow_control._in_flight_mutation_count == 0 + assert instance._flow_control._in_flight_mutation_bytes == 0 + assert instance._entries_processed_since_last_raise == 0 + await asyncio.sleep(0) + assert flush_timer_mock.call_count == 1 + assert flush_timer_mock.call_args[0][0] is None + assert isinstance(instance._flush_timer, asyncio.Future) + + @pytest.mark.asyncio + async def test_ctor_invalid_values(self): + """Test that timeout values are positive, and fit within expected limits""" + with pytest.raises(ValueError) as e: + self._make_one(batch_operation_timeout=-1) + assert "batch_operation_timeout must be greater than 0" in str(e.value) + with pytest.raises(ValueError) as e: + self._make_one(batch_per_request_timeout=-1) + assert "batch_per_request_timeout must be greater than 0" in str(e.value) + with pytest.raises(ValueError) as e: + self._make_one(batch_operation_timeout=1, batch_per_request_timeout=2) + assert ( + "batch_per_request_timeout must be less than batch_operation_timeout" + in str(e.value) + ) + + def test_default_argument_consistency(self): + """ + We supply default arguments in MutationsBatcher.__init__, and in + table.mutations_batcher. Make sure any changes to defaults are applied to + both places + """ + from google.cloud.bigtable.client import Table + from google.cloud.bigtable.mutations_batcher import MutationsBatcher + import inspect + + get_batcher_signature = dict( + inspect.signature(Table.mutations_batcher).parameters + ) + get_batcher_signature.pop("self") + batcher_init_signature = dict(inspect.signature(MutationsBatcher).parameters) + batcher_init_signature.pop("table") + # both should have same number of arguments + assert len(get_batcher_signature.keys()) == len(batcher_init_signature.keys()) + assert len(get_batcher_signature) == 7 # update if expected params change + # both should have same argument names + assert set(get_batcher_signature.keys()) == set(batcher_init_signature.keys()) + # both should have same default values + for arg_name in get_batcher_signature.keys(): + assert ( + get_batcher_signature[arg_name].default + == batcher_init_signature[arg_name].default + ) + + @mock.patch( + "google.cloud.bigtable.mutations_batcher.MutationsBatcher._schedule_flush" + ) + @pytest.mark.asyncio + async def test__start_flush_timer_w_None(self, flush_mock): + """Empty timer should return immediately""" + async with self._make_one() as instance: + with mock.patch("asyncio.sleep") as sleep_mock: + await instance._start_flush_timer(None) + assert sleep_mock.call_count == 0 + assert flush_mock.call_count == 0 + + @mock.patch( + "google.cloud.bigtable.mutations_batcher.MutationsBatcher._schedule_flush" + ) + @pytest.mark.asyncio + async def test__start_flush_timer_call_when_closed(self, flush_mock): + """closed batcher's timer should return immediately""" + async with self._make_one() as instance: + await instance.close() + flush_mock.reset_mock() + with mock.patch("asyncio.sleep") as sleep_mock: + await instance._start_flush_timer(1) + assert sleep_mock.call_count == 0 + assert flush_mock.call_count == 0 + + @mock.patch( + 
"google.cloud.bigtable.mutations_batcher.MutationsBatcher._schedule_flush" + ) + @pytest.mark.asyncio + async def test__flush_timer(self, flush_mock): + """Timer should continue to call _schedule_flush in a loop""" + expected_sleep = 12 + async with self._make_one(flush_interval=expected_sleep) as instance: + instance._staged_entries = [mock.Mock()] + loop_num = 3 + with mock.patch("asyncio.sleep") as sleep_mock: + sleep_mock.side_effect = [None] * loop_num + [asyncio.CancelledError()] + try: + await instance._flush_timer + except asyncio.CancelledError: + pass + assert sleep_mock.call_count == loop_num + 1 + sleep_mock.assert_called_with(expected_sleep) + assert flush_mock.call_count == loop_num + + @mock.patch( + "google.cloud.bigtable.mutations_batcher.MutationsBatcher._schedule_flush" + ) + @pytest.mark.asyncio + async def test__flush_timer_no_mutations(self, flush_mock): + """Timer should not flush if no new mutations have been staged""" + expected_sleep = 12 + async with self._make_one(flush_interval=expected_sleep) as instance: + loop_num = 3 + with mock.patch("asyncio.sleep") as sleep_mock: + sleep_mock.side_effect = [None] * loop_num + [asyncio.CancelledError()] + try: + await instance._flush_timer + except asyncio.CancelledError: + pass + assert sleep_mock.call_count == loop_num + 1 + sleep_mock.assert_called_with(expected_sleep) + assert flush_mock.call_count == 0 + + @mock.patch( + "google.cloud.bigtable.mutations_batcher.MutationsBatcher._schedule_flush" + ) + @pytest.mark.asyncio + async def test__flush_timer_close(self, flush_mock): + """Timer should continue terminate after close""" + async with self._make_one() as instance: + with mock.patch("asyncio.sleep"): + # let task run in background + await asyncio.sleep(0.5) + assert instance._flush_timer.done() is False + # close the batcher + await instance.close() + await asyncio.sleep(0.1) + # task should be complete + assert instance._flush_timer.done() is True + + @pytest.mark.asyncio + async def test_append_closed(self): + """Should raise exception""" + with pytest.raises(RuntimeError): + instance = self._make_one() + await instance.close() + await instance.append(mock.Mock()) + + @pytest.mark.asyncio + async def test_append_wrong_mutation(self): + """ + Mutation objects should raise an exception. + Only support RowMutationEntry + """ + from google.cloud.bigtable.mutations import DeleteAllFromRow + + async with self._make_one() as instance: + expected_error = "invalid mutation type: DeleteAllFromRow. 
Only RowMutationEntry objects are supported by batcher" + with pytest.raises(ValueError) as e: + await instance.append(DeleteAllFromRow()) + assert str(e.value) == expected_error + + @pytest.mark.asyncio + async def test_append_outside_flow_limits(self): + """entries larger than mutation limits are still processed""" + async with self._make_one( + flow_control_max_mutation_count=1, flow_control_max_bytes=1 + ) as instance: + oversized_entry = _make_mutation(count=0, size=2) + await instance.append(oversized_entry) + assert instance._staged_entries == [oversized_entry] + assert instance._staged_count == 0 + assert instance._staged_bytes == 2 + instance._staged_entries = [] + async with self._make_one( + flow_control_max_mutation_count=1, flow_control_max_bytes=1 + ) as instance: + overcount_entry = _make_mutation(count=2, size=0) + await instance.append(overcount_entry) + assert instance._staged_entries == [overcount_entry] + assert instance._staged_count == 2 + assert instance._staged_bytes == 0 + instance._staged_entries = [] + + @pytest.mark.asyncio + async def test_append_flush_runs_after_limit_hit(self): + """ + If the user appends a bunch of entries above the flush limits back-to-back, + it should still flush in a single task + """ + from google.cloud.bigtable.mutations_batcher import MutationsBatcher + + with mock.patch.object(MutationsBatcher, "_execute_mutate_rows") as op_mock: + async with self._make_one(flush_limit_bytes=100) as instance: + # mock network calls + async def mock_call(*args, **kwargs): + return [] + + op_mock.side_effect = mock_call + # append a mutation just under the size limit + await instance.append(_make_mutation(size=99)) + # append a bunch of entries back-to-back in a loop + num_entries = 10 + for _ in range(num_entries): + await instance.append(_make_mutation(size=1)) + # let any flush jobs finish + await asyncio.gather(*instance._flush_jobs) + # should have only flushed once, with large mutation and first mutation in loop + assert op_mock.call_count == 1 + sent_batch = op_mock.call_args[0][0] + assert len(sent_batch) == 2 + # others should still be pending + assert len(instance._staged_entries) == num_entries - 1 + + @pytest.mark.parametrize( + "flush_count,flush_bytes,mutation_count,mutation_bytes,expect_flush", + [ + (10, 10, 1, 1, False), + (10, 10, 9, 9, False), + (10, 10, 10, 1, True), + (10, 10, 1, 10, True), + (10, 10, 10, 10, True), + (1, 1, 10, 10, True), + (1, 1, 0, 0, False), + ], + ) + @pytest.mark.asyncio + async def test_append( + self, flush_count, flush_bytes, mutation_count, mutation_bytes, expect_flush + ): + """test appending different mutations, and checking if it causes a flush""" + async with self._make_one( + flush_limit_mutation_count=flush_count, flush_limit_bytes=flush_bytes + ) as instance: + assert instance._staged_count == 0 + assert instance._staged_bytes == 0 + assert instance._staged_entries == [] + mutation = _make_mutation(count=mutation_count, size=mutation_bytes) + with mock.patch.object(instance, "_schedule_flush") as flush_mock: + await instance.append(mutation) + assert flush_mock.call_count == bool(expect_flush) + assert instance._staged_count == mutation_count + assert instance._staged_bytes == mutation_bytes + assert instance._staged_entries == [mutation] + instance._staged_entries = [] + + @pytest.mark.asyncio + async def test_append_multiple_sequentially(self): + """Append multiple mutations""" + async with self._make_one( + flush_limit_mutation_count=8, flush_limit_bytes=8 + ) as instance: + assert 
instance._staged_count == 0 + assert instance._staged_bytes == 0 + assert instance._staged_entries == [] + mutation = _make_mutation(count=2, size=3) + with mock.patch.object(instance, "_schedule_flush") as flush_mock: + await instance.append(mutation) + assert flush_mock.call_count == 0 + assert instance._staged_count == 2 + assert instance._staged_bytes == 3 + assert len(instance._staged_entries) == 1 + await instance.append(mutation) + assert flush_mock.call_count == 0 + assert instance._staged_count == 4 + assert instance._staged_bytes == 6 + assert len(instance._staged_entries) == 2 + await instance.append(mutation) + assert flush_mock.call_count == 1 + assert instance._staged_count == 6 + assert instance._staged_bytes == 9 + assert len(instance._staged_entries) == 3 + instance._staged_entries = [] + + @pytest.mark.asyncio + async def test_flush_flow_control_concurrent_requests(self): + """ + requests should happen in parallel if flow control breaks up single flush into batches + """ + import time + + num_calls = 10 + fake_mutations = [_make_mutation(count=1) for _ in range(num_calls)] + async with self._make_one(flow_control_max_mutation_count=1) as instance: + with mock.patch.object( + instance, "_execute_mutate_rows", AsyncMock() + ) as op_mock: + # mock network calls + async def mock_call(*args, **kwargs): + await asyncio.sleep(0.1) + return [] + + op_mock.side_effect = mock_call + start_time = time.monotonic() + # flush one large batch, that will be broken up into smaller batches + instance._staged_entries = fake_mutations + instance._schedule_flush() + await asyncio.sleep(0.01) + # make room for new mutations + for i in range(num_calls): + await instance._flow_control.remove_from_flow( + [_make_mutation(count=1)] + ) + await asyncio.sleep(0.01) + # allow flushes to complete + await asyncio.gather(*instance._flush_jobs) + duration = time.monotonic() - start_time + assert len(instance._oldest_exceptions) == 0 + assert len(instance._newest_exceptions) == 0 + # if flushes were sequential, total duration would be 1s + assert duration < 0.25 + assert op_mock.call_count == num_calls + + @pytest.mark.asyncio + async def test_schedule_flush_no_mutations(self): + """schedule flush should return None if no staged mutations""" + async with self._make_one() as instance: + with mock.patch.object(instance, "_flush_internal") as flush_mock: + for i in range(3): + assert instance._schedule_flush() is None + assert flush_mock.call_count == 0 + + @pytest.mark.asyncio + async def test_schedule_flush_with_mutations(self): + """if new mutations exist, should add a new flush task to _flush_jobs""" + async with self._make_one() as instance: + with mock.patch.object(instance, "_flush_internal") as flush_mock: + for i in range(1, 4): + mutation = mock.Mock() + instance._staged_entries = [mutation] + instance._schedule_flush() + assert instance._staged_entries == [] + # let flush task run + await asyncio.sleep(0) + assert instance._staged_entries == [] + assert instance._staged_count == 0 + assert instance._staged_bytes == 0 + assert flush_mock.call_count == i + + @pytest.mark.asyncio + async def test__flush_internal(self): + """ + _flush_internal should: + - await previous flush call + - delegate batching to _flow_control + - call _execute_mutate_rows on each batch + - update self.exceptions and self._entries_processed_since_last_raise + """ + num_entries = 10 + async with self._make_one() as instance: + with mock.patch.object(instance, "_execute_mutate_rows") as execute_mock: + with mock.patch.object( + 
instance._flow_control, "add_to_flow" + ) as flow_mock: + # mock flow control to always return a single batch + async def gen(x): + yield x + + flow_mock.side_effect = lambda x: gen(x) + mutations = [_make_mutation(count=1, size=1)] * num_entries + await instance._flush_internal(mutations) + assert instance._entries_processed_since_last_raise == num_entries + assert execute_mock.call_count == 1 + assert flow_mock.call_count == 1 + instance._oldest_exceptions.clear() + instance._newest_exceptions.clear() + + @pytest.mark.asyncio + async def test_flush_clears_job_list(self): + """ + a job should be added to _flush_jobs when _schedule_flush is called, + and removed when it completes + """ + async with self._make_one() as instance: + with mock.patch.object(instance, "_flush_internal", AsyncMock()): + mutations = [_make_mutation(count=1, size=1)] + instance._staged_entries = mutations + assert instance._flush_jobs == set() + new_job = instance._schedule_flush() + assert instance._flush_jobs == {new_job} + await new_job + assert instance._flush_jobs == set() + + @pytest.mark.parametrize( + "num_starting,num_new_errors,expected_total_errors", + [ + (0, 0, 0), + (0, 1, 1), + (0, 2, 2), + (1, 0, 1), + (1, 1, 2), + (10, 2, 12), + (10, 20, 20), # should cap at 20 + ], + ) + @pytest.mark.asyncio + async def test__flush_internal_with_errors( + self, num_starting, num_new_errors, expected_total_errors + ): + """ + errors returned from _execute_mutate_rows should be added to internal exceptions + """ + from google.cloud.bigtable import exceptions + + num_entries = 10 + expected_errors = [ + exceptions.FailedMutationEntryError(mock.Mock(), mock.Mock(), ValueError()) + ] * num_new_errors + async with self._make_one() as instance: + instance._oldest_exceptions = [mock.Mock()] * num_starting + with mock.patch.object(instance, "_execute_mutate_rows") as execute_mock: + execute_mock.return_value = expected_errors + with mock.patch.object( + instance._flow_control, "add_to_flow" + ) as flow_mock: + # mock flow control to always return a single batch + async def gen(x): + yield x + + flow_mock.side_effect = lambda x: gen(x) + mutations = [_make_mutation(count=1, size=1)] * num_entries + await instance._flush_internal(mutations) + assert instance._entries_processed_since_last_raise == num_entries + assert execute_mock.call_count == 1 + assert flow_mock.call_count == 1 + found_exceptions = instance._oldest_exceptions + list( + instance._newest_exceptions + ) + assert len(found_exceptions) == expected_total_errors + for i in range(num_starting, expected_total_errors): + assert found_exceptions[i] == expected_errors[i - num_starting] + # errors should have index stripped + assert found_exceptions[i].index is None + # clear out exceptions + instance._oldest_exceptions.clear() + instance._newest_exceptions.clear() + + async def _mock_gapic_return(self, num=5): + from google.cloud.bigtable_v2.types import MutateRowsResponse + from google.rpc import status_pb2 + + async def gen(num): + for i in range(num): + entry = MutateRowsResponse.Entry( + index=i, status=status_pb2.Status(code=0) + ) + yield MutateRowsResponse(entries=[entry]) + + return gen(num) + + @pytest.mark.asyncio + async def test_timer_flush_end_to_end(self): + """Flush should automatically trigger after flush_interval""" + num_nutations = 10 + mutations = [_make_mutation(count=2, size=2)] * num_nutations + + async with self._make_one(flush_interval=0.05) as instance: + instance._table.default_operation_timeout = 10 + 
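# give the mocked table concrete timeout values so the real flush and mutate_rows machinery has numbers to work with +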
instance._table.default_per_request_timeout = 9 + with mock.patch.object( + instance._table.client._gapic_client, "mutate_rows" + ) as gapic_mock: + gapic_mock.side_effect = ( + lambda *args, **kwargs: self._mock_gapic_return(num_nutations) + ) + for m in mutations: + await instance.append(m) + assert instance._entries_processed_since_last_raise == 0 + # let flush trigger due to timer + await asyncio.sleep(0.1) + assert instance._entries_processed_since_last_raise == num_nutations + + @pytest.mark.asyncio + @mock.patch( + "google.cloud.bigtable.mutations_batcher._MutateRowsOperation", + ) + async def test__execute_mutate_rows(self, mutate_rows): + mutate_rows.return_value = AsyncMock() + start_operation = mutate_rows().start + table = mock.Mock() + table.table_name = "test-table" + table.app_profile_id = "test-app-profile" + table.default_operation_timeout = 17 + table.default_per_request_timeout = 13 + async with self._make_one(table) as instance: + batch = [_make_mutation()] + result = await instance._execute_mutate_rows(batch) + assert start_operation.call_count == 1 + args, kwargs = mutate_rows.call_args + assert args[0] == table.client._gapic_client + assert args[1] == table + assert args[2] == batch + assert kwargs["operation_timeout"] == 17 + assert kwargs["per_request_timeout"] == 13 + assert result == [] + + @pytest.mark.asyncio + @mock.patch("google.cloud.bigtable.mutations_batcher._MutateRowsOperation.start") + async def test__execute_mutate_rows_returns_errors(self, mutate_rows): + """Errors from the operation should be returned as a list""" + from google.cloud.bigtable.exceptions import ( + MutationsExceptionGroup, + FailedMutationEntryError, + ) + + err1 = FailedMutationEntryError(0, mock.Mock(), RuntimeError("test error")) + err2 = FailedMutationEntryError(1, mock.Mock(), RuntimeError("test error")) + mutate_rows.side_effect = MutationsExceptionGroup([err1, err2], 10) + table = mock.Mock() + table.default_operation_timeout = 17 + table.default_per_request_timeout = 13 + async with self._make_one(table) as instance: + batch = [_make_mutation()] + result = await instance._execute_mutate_rows(batch) + assert len(result) == 2 + assert result[0] == err1 + assert result[1] == err2 + # indices should be set to None + assert result[0].index is None + assert result[1].index is None + + @pytest.mark.asyncio + async def test__raise_exceptions(self): + """Raise exceptions and reset error state""" + from google.cloud.bigtable import exceptions + + expected_total = 1201 + expected_exceptions = [RuntimeError("mock")] * 3 + async with self._make_one() as instance: + instance._oldest_exceptions = expected_exceptions + instance._entries_processed_since_last_raise = expected_total + try: + instance._raise_exceptions() + except exceptions.MutationsExceptionGroup as exc: + assert list(exc.exceptions) == expected_exceptions + assert str(expected_total) in str(exc) + assert instance._entries_processed_since_last_raise == 0 + instance._oldest_exceptions, instance._newest_exceptions = ([], []) + # try calling again + instance._raise_exceptions() + + @pytest.mark.asyncio + async def test___aenter__(self): + """Should return self""" + async with self._make_one() as instance: + assert await instance.__aenter__() == instance + + @pytest.mark.asyncio + async def test___aexit__(self): + """aexit should call close""" + async with self._make_one() as instance: + with mock.patch.object(instance, "close") as close_mock: + await instance.__aexit__(None, None, None) + assert close_mock.call_count == 1 + + @pytest.mark.asyncio +
async def test_close(self): + """Should clean up all resources""" + async with self._make_one() as instance: + with mock.patch.object(instance, "_schedule_flush") as flush_mock: + with mock.patch.object(instance, "_raise_exceptions") as raise_mock: + await instance.close() + assert instance.closed is True + assert instance._flush_timer.done() is True + assert instance._flush_jobs == set() + assert flush_mock.call_count == 1 + assert raise_mock.call_count == 1 + + @pytest.mark.asyncio + async def test_close_w_exceptions(self): + """Raise exceptions on close""" + from google.cloud.bigtable import exceptions + + expected_total = 10 + expected_exceptions = [RuntimeError("mock")] + async with self._make_one() as instance: + instance._oldest_exceptions = expected_exceptions + instance._entries_processed_since_last_raise = expected_total + try: + await instance.close() + except exceptions.MutationsExceptionGroup as exc: + assert list(exc.exceptions) == expected_exceptions + assert str(expected_total) in str(exc) + assert instance._entries_processed_since_last_raise == 0 + # clear out exceptions + instance._oldest_exceptions, instance._newest_exceptions = ([], []) + + @pytest.mark.asyncio + async def test__on_exit(self, recwarn): + """Should raise warnings if unflushed mutations exist""" + async with self._make_one() as instance: + # calling without mutations is noop + instance._on_exit() + assert len(recwarn) == 0 + # calling with existing mutations should raise warning + num_left = 4 + instance._staged_entries = [mock.Mock()] * num_left + with pytest.warns(UserWarning) as w: + instance._on_exit() + assert len(w) == 1 + assert "unflushed mutations" in str(w[0].message).lower() + assert str(num_left) in str(w[0].message) + # calling while closed is noop + instance.closed = True + instance._on_exit() + assert len(recwarn) == 0 + # reset staged mutations for cleanup + instance._staged_entries = [] + + @pytest.mark.asyncio + async def test_atexit_registration(self): + """Should run _on_exit on program termination""" + import atexit + + with mock.patch( + "google.cloud.bigtable.mutations_batcher.MutationsBatcher._on_exit" + ) as on_exit_mock: + async with self._make_one(): + assert on_exit_mock.call_count == 0 + atexit._run_exitfuncs() + assert on_exit_mock.call_count == 1 + # should not call after close + atexit._run_exitfuncs() + assert on_exit_mock.call_count == 1 + + @pytest.mark.asyncio + @mock.patch( + "google.cloud.bigtable.mutations_batcher._MutateRowsOperation", + ) + async def test_timeout_args_passed(self, mutate_rows): + """ + batch_operation_timeout and batch_per_request_timeout should be used + in api calls + """ + mutate_rows.return_value = AsyncMock() + expected_operation_timeout = 17 + expected_per_request_timeout = 13 + async with self._make_one( + batch_operation_timeout=expected_operation_timeout, + batch_per_request_timeout=expected_per_request_timeout, + ) as instance: + assert instance._operation_timeout == expected_operation_timeout + assert instance._per_request_timeout == expected_per_request_timeout + # make simulated gapic call + await instance._execute_mutate_rows([_make_mutation()]) + assert mutate_rows.call_count == 1 + kwargs = mutate_rows.call_args[1] + assert kwargs["operation_timeout"] == expected_operation_timeout + assert kwargs["per_request_timeout"] == expected_per_request_timeout + + @pytest.mark.parametrize( + "limit,in_e,start_e,end_e", + [ + (10, 0, (10, 0), (10, 0)), + (1, 10, (0, 0), (1, 1)), + (10, 1, (0, 0), (1, 0)), + (10, 10, (0, 0), (10, 0)), + (10, 
11, (0, 0), (10, 1)), + (3, 20, (0, 0), (3, 3)), + (10, 20, (0, 0), (10, 10)), + (10, 21, (0, 0), (10, 10)), + (2, 1, (2, 0), (2, 1)), + (2, 1, (1, 0), (2, 0)), + (2, 2, (1, 0), (2, 1)), + (3, 1, (3, 1), (3, 2)), + (3, 3, (3, 1), (3, 3)), + (1000, 5, (999, 0), (1000, 4)), + (1000, 5, (0, 0), (5, 0)), + (1000, 5, (1000, 0), (1000, 5)), + ], + ) + def test__add_exceptions(self, limit, in_e, start_e, end_e): + """ + Test that the _add_exceptions function properly updates the + _oldest_exceptions and _newest_exceptions lists + Args: + - limit: the _exception_list_limit representing the max size of either list + - in_e: size of list of exceptions to send to _add_exceptions + - start_e: a tuple of ints representing the initial sizes of _oldest_exceptions and _newest_exceptions + - end_e: a tuple of ints representing the expected sizes of _oldest_exceptions and _newest_exceptions + """ + from collections import deque + + input_list = [RuntimeError(f"mock {i}") for i in range(in_e)] + mock_batcher = mock.Mock() + mock_batcher._oldest_exceptions = [ + RuntimeError(f"starting mock {i}") for i in range(start_e[0]) + ] + mock_batcher._newest_exceptions = deque( + [RuntimeError(f"starting mock {i}") for i in range(start_e[1])], + maxlen=limit, + ) + mock_batcher._exception_list_limit = limit + mock_batcher._exceptions_since_last_raise = 0 + self._get_target_class()._add_exceptions(mock_batcher, input_list) + assert len(mock_batcher._oldest_exceptions) == end_e[0] + assert len(mock_batcher._newest_exceptions) == end_e[1] + assert mock_batcher._exceptions_since_last_raise == in_e + # make sure that the right items ended up in the right spots + # should fill the oldest slots first + oldest_list_diff = end_e[0] - start_e[0] + # new items should be added on top of the starting list + newest_list_diff = min(max(in_e - oldest_list_diff, 0), limit) + for i in range(oldest_list_diff): + assert mock_batcher._oldest_exceptions[i + start_e[0]] == input_list[i] + # then, the newest slots should be filled with the last items of the input list + for i in range(1, newest_list_diff + 1): + assert mock_batcher._newest_exceptions[-i] == input_list[-i]
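
Note: the tests above drive the batcher entirely through mocks. The snippet below is a minimal usage sketch of the surface they exercise (Table.mutations_batcher, append, close-on-exit, and MutationsExceptionGroup). The helper clear_rows and its arguments are hypothetical, and DeleteAllFromRow simply stands in for whatever mutations a caller would actually batch:

    from google.cloud.bigtable.exceptions import MutationsExceptionGroup
    from google.cloud.bigtable.mutations import DeleteAllFromRow, RowMutationEntry

    async def clear_rows(table, row_keys):
        # Batch one delete-all entry per row. Exiting the context manager closes
        # the batcher, which flushes any staged entries and raises accumulated
        # failures as a MutationsExceptionGroup.
        try:
            async with table.mutations_batcher(flush_interval=1) as batcher:
                for key in row_keys:
                    # each entry wraps one row key and a non-empty list of mutations
                    await batcher.append(RowMutationEntry(key, [DeleteAllFromRow()]))
        except MutationsExceptionGroup as group:
            # per-entry failures are exposed on .exceptions
            for failed in group.exceptions:
                print(f"entry failed: {failed!r}")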