Skip to content

Commit

Permalink
Add partitioned DML support (#459)
Browse files Browse the repository at this point in the history
* Add 'Database.execute_partitioned_dml' method.

* Add system test which exercises PDML, both for UPDATE (with parameter)
  and DELETE.
  • Loading branch information
tseaver committed Sep 21, 2018
1 parent 08a3399 commit 56be30f
Show file tree
Hide file tree
Showing 4 changed files with 380 additions and 194 deletions.
73 changes: 72 additions & 1 deletion spanner/google/cloud/spanner_v1/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,32 @@

"""User friendly container for Cloud Spanner Database."""

import copy
import functools
import re
import threading
import copy

from google.api_core.gapic_v1 import client_info
import google.auth.credentials
from google.protobuf.struct_pb2 import Struct
from google.cloud.exceptions import NotFound
import six

# pylint: disable=ungrouped-imports
from google.cloud.spanner_v1 import __version__
from google.cloud.spanner_v1._helpers import _make_value_pb
from google.cloud.spanner_v1._helpers import _metadata_with_prefix
from google.cloud.spanner_v1.batch import Batch
from google.cloud.spanner_v1.gapic.spanner_client import SpannerClient
from google.cloud.spanner_v1.keyset import KeySet
from google.cloud.spanner_v1.pool import BurstyPool
from google.cloud.spanner_v1.pool import SessionCheckout
from google.cloud.spanner_v1.session import Session
from google.cloud.spanner_v1.snapshot import _restart_on_unavailable
from google.cloud.spanner_v1.snapshot import Snapshot
from google.cloud.spanner_v1.streamed import StreamedResultSet
from google.cloud.spanner_v1.proto.transaction_pb2 import (
TransactionSelector, TransactionOptions)
# pylint: enable=ungrouped-imports


Expand Down Expand Up @@ -272,6 +279,70 @@ def drop(self):
metadata = _metadata_with_prefix(self.name)
api.drop_database(self.name, metadata=metadata)

def execute_partitioned_dml(
self, dml, params=None, param_types=None, query_mode=None):
"""Execute a partitionable DML statement.
:type dml: str
:param dml: SQL DML statement
:type params: dict, {str -> column value}
:param params: values for parameter replacement. Keys must match
the names used in ``dml``.
:type param_types: dict[str -> Union[dict, .types.Type]]
:param param_types:
(Optional) maps explicit types for one or more param values;
required if parameters are passed.
:type query_mode:
:class:`google.cloud.spanner_v1.proto.ExecuteSqlRequest.QueryMode`
:param query_mode: Mode governing return of results / query plan. See
https://cloud.google.com/spanner/reference/rpc/google.spanner.v1#google.spanner.v1.ExecuteSqlRequest.QueryMode1
:rtype: int
:returns: Count of rows affected by the DML statement.
"""
if params is not None:
if param_types is None:
raise ValueError(
"Specify 'param_types' when passing 'params'.")
params_pb = Struct(fields={
key: _make_value_pb(value) for key, value in params.items()})
else:
params_pb = None

api = self.spanner_api

txn_options = TransactionOptions(
partitioned_dml=TransactionOptions.PartitionedDml())

metadata = _metadata_with_prefix(self.name)

with SessionCheckout(self._pool) as session:

txn = api.begin_transaction(
session.name, txn_options, metadata=metadata)

txn_selector = TransactionSelector(id=txn.id)

restart = functools.partial(
api.execute_streaming_sql,
session.name,
dml,
transaction=txn_selector,
params=params_pb,
param_types=param_types,
query_mode=query_mode,
metadata=metadata)

iterator = _restart_on_unavailable(restart)

result_set = StreamedResultSet(iterator)
list(result_set) # consume all partials

return result_set.stats.row_count_lower_bound

def session(self, labels=None):
"""Factory to create a session for this database.
Expand Down
7 changes: 2 additions & 5 deletions spanner/google/cloud/spanner_v1/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,8 @@ def execute_update(self, dml, params=None, param_types=None,
:param query_mode: Mode governing return of results / query plan. See
https://cloud.google.com/spanner/reference/rpc/google.spanner.v1#google.spanner.v1.ExecuteSqlRequest.QueryMode1
:rtype:
:class:`google.cloud.spanner_v1.proto.ExecuteSqlRequest.ResultSetStats`
:returns:
stats object, including count of rows affected by the DML
statement.
:rtype: int
:returns: Count of rows affected by the DML statement.
"""
if params is not None:
if param_types is None:
Expand Down
56 changes: 56 additions & 0 deletions spanner/tests/system/test_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,62 @@ def test_transaction_execute_update_then_insert_commit(self):
rows = list(session.read(self.TABLE, self.COLUMNS, self.ALL))
self._check_rows_data(rows)

def test_execute_partitioned_dml(self):
retry = RetryInstanceState(_has_all_ddl)
retry(self._db.reload)()

delete_statement = 'DELETE FROM {} WHERE true'.format(self.TABLE)

def _setup_table(txn):
txn.execute_update(delete_statement)
for insert_statement in self._generate_insert_statements():
txn.execute_update(insert_statement)

committed = self._db.run_in_transaction(_setup_table)

with self._db.snapshot(read_timestamp=committed) as snapshot:
before_pdml = list(snapshot.read(
self.TABLE, self.COLUMNS, self.ALL))

self._check_rows_data(before_pdml)

nonesuch = 'nonesuch@example.com'
target = 'phred@example.com'
update_statement = (
'UPDATE {table} SET {table}.email = @email '
'WHERE {table}.email = @target').format(
table=self.TABLE)

row_count = self._db.execute_partitioned_dml(
update_statement,
params={
'email': nonesuch,
'target': target,
},
param_types={
'email': Type(code=STRING),
'target': Type(code=STRING),
},
)
self.assertEqual(row_count, 1)

row = self.ROW_DATA[0]
updated = [row[:3] + (nonesuch,)] + list(self.ROW_DATA[1:])

with self._db.snapshot(read_timestamp=committed) as snapshot:
after_update = list(snapshot.read(
self.TABLE, self.COLUMNS, self.ALL))
self._check_rows_data(after_update, updated)

row_count = self._db.execute_partitioned_dml(delete_statement)
self.assertEqual(row_count, len(self.ROW_DATA))

with self._db.snapshot(read_timestamp=committed) as snapshot:
after_delete = list(snapshot.read(
self.TABLE, self.COLUMNS, self.ALL))

self._check_rows_data(after_delete, [])

def _transaction_concurrency_helper(self, unit_of_work, pkey):
INITIAL_VALUE = 123
NUM_THREADS = 3 # conforms to equivalent Java systest.
Expand Down
Loading

0 comments on commit 56be30f

Please sign in to comment.