diff --git a/python/dazl/__init__.py b/python/dazl/__init__.py index c1989838..2e73a446 100644 --- a/python/dazl/__init__.py +++ b/python/dazl/__init__.py @@ -13,6 +13,7 @@ __all__ = [ "AIOPartyClient", "Command", + "connect", "ContractData", "ContractId", "CreateAndExerciseCommand", @@ -50,6 +51,7 @@ exercise_by_key, ) from .ledger import Command +from .ledger.grpc import connect from .pretty.table import write_acs from .prim import ContractData, ContractId, DazlError, FrozenDict as frozendict, Party from .util.logging import setup_default_logger diff --git a/python/dazl/ledger/config/access.py b/python/dazl/ledger/config/access.py index 9578b392..1b64fe16 100644 --- a/python/dazl/ledger/config/access.py +++ b/python/dazl/ledger/config/access.py @@ -5,6 +5,7 @@ import base64 from collections.abc import MutableSet as MutableSetBase, Set as SetBase import json +from logging import Logger import os from pathlib import Path from typing import ( @@ -64,12 +65,12 @@ def create_access( application_name: Optional[str] = None, oauth_token: Optional[str] = None, oauth_token_file: Optional[str] = None, + logger: Optional[Logger] = None, ) -> AccessConfig: """ Create an appropriate instance of :class:`AccessConfig`. See :meth:`Config.create` for a more detailed description of these parameters. - """ # admin = None is effectively the same as admin = False in this context is_property_based = read_as or act_as or admin or ledger_id or application_name @@ -88,7 +89,7 @@ def create_access( raise ConfigError("no oauth token access or read_as/act_as/admin was specified") # how do they configure thee? let me count the ways... - if sum(map(int, (is_property_based, oauth_token, oauth_token_file))): + if sum(map(int, (bool(is_property_based), bool(oauth_token), bool(oauth_token_file)))) > 1: raise ConfigError( "must specify ONE of read_as/act_as/admin, oauth_token, or oauth_token_file" ) diff --git a/python/dazl/ledger/config/ssl.py b/python/dazl/ledger/config/ssl.py index cd8c9bfd..2ab48b12 100644 --- a/python/dazl/ledger/config/ssl.py +++ b/python/dazl/ledger/config/ssl.py @@ -1,5 +1,6 @@ # Copyright (c) 2017-2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. # SPDX-License-Identifier: Apache-2.0 +from logging import Logger from os import PathLike, fspath from typing import TYPE_CHECKING, Optional @@ -25,6 +26,7 @@ def __init__( cert_file: "Optional[PathLike]" = None, cert_key: "Optional[bytes]" = None, cert_key_file: "Optional[PathLike]" = None, + logger: Optional[Logger] = None, ): self._ca: Optional[bytes] self._cert: Optional[bytes] diff --git a/python/dazl/ledger/errors.py b/python/dazl/ledger/errors.py new file mode 100644 index 00000000..32a9b61e --- /dev/null +++ b/python/dazl/ledger/errors.py @@ -0,0 +1,22 @@ +# Copyright (c) 2017-2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +__all__ = ["CallbackReturnWarning", "ProtocolWarning"] + + +class CallbackReturnWarning(Warning): + """ + Raised when a user callback on a stream returns a value. These objects have no meaning and are + ignored by dazl. + + This warning is raised primarily because older versions of dazl interpreted returning commands + from a callback as a request to send commands to the underlying ledger, and this is not + supported in newer APIs. + """ + + +class ProtocolWarning(Warning): + """ + Warnings that are raised when dazl detects incompatibilities between the Ledger API server-side + implementation and dazl. + """ diff --git a/python/dazl/ledger/grpc/__init__.py b/python/dazl/ledger/grpc/__init__.py new file mode 100644 index 00000000..c2515944 --- /dev/null +++ b/python/dazl/ledger/grpc/__init__.py @@ -0,0 +1,42 @@ +# Copyright (c) 2017-2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +from os import PathLike +from typing import Collection, Optional, Union, overload + +from ...prim import Party, TimeDeltaLike +from ..config import Config +from .conn_aio import Connection + +__all__ = ["connect", "Connection"] + + +# TODO: Figure out clever ways to make this function's type signature easier to maintain while +# preserving its ease of use to callers. +@overload +def connect( + url: str, + *, + read_as: "Union[None, Party, Collection[Party]]" = None, + act_as: "Union[None, Party, Collection[Party]]" = None, + admin: "Optional[bool]" = False, + ledger_id: "Optional[str]" = None, + application_name: "Optional[str]" = None, + oauth_token: "Optional[str]" = None, + ca: "Optional[bytes]" = None, + ca_file: "Optional[PathLike]" = None, + cert: "Optional[bytes]" = None, + cert_file: "Optional[PathLike]" = None, + cert_key: "Optional[bytes]" = None, + cert_key_file: "Optional[PathLike]" = None, + connect_timeout: "Optional[TimeDeltaLike]" = None, + enable_http_proxy: "bool" = True, +) -> Connection: + ... + + +def connect(**kwargs): + """ + Connect to a gRPC Ledger API implementation and return a connection that uses asyncio. + """ + config = Config.create(**kwargs) + return Connection(config) diff --git a/python/dazl/ledger/grpc/channel.py b/python/dazl/ledger/grpc/channel.py new file mode 100644 index 00000000..90c7f441 --- /dev/null +++ b/python/dazl/ledger/grpc/channel.py @@ -0,0 +1,60 @@ +# Copyright (c) 2017-2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +from urllib.parse import urlparse + +from grpc import ( + AuthMetadataContext, + AuthMetadataPlugin, + AuthMetadataPluginCallback, + composite_channel_credentials, + metadata_call_credentials, + ssl_channel_credentials, +) +from grpc.aio import Channel, insecure_channel, secure_channel + +from ..config import Config + +__all__ = ["create_channel"] + + +def create_channel(config: "Config") -> "Channel": + """ + Create a :class:`Channel` for the specified configuration. + """ + u = urlparse(config.url.url) + + options = [ + ("grpc.max_send_message_length", -1), + ("grpc.max_receive_message_length", -1), + ] + if not config.url.use_http_proxy: + options.append(("grpc.enable_http_proxy", 0)) + + if (u.scheme in ("https", "grpcs")) or config.ssl: + credentials = ssl_channel_credentials( + root_certificates=config.ssl.ca, + private_key=config.ssl.cert_key, + certificate_chain=config.ssl.cert, + ) + if config.access.token: + credentials = composite_channel_credentials( + credentials, metadata_call_credentials(GrpcAuth(config)) + ) + return secure_channel(u.netloc, credentials, options) + else: + return insecure_channel(u.netloc, options) + + +class GrpcAuth(AuthMetadataPlugin): + def __init__(self, config: "Config"): + self._config = config + + def __call__(self, context: "AuthMetadataContext", callback: "AuthMetadataPluginCallback"): + options = [] + + # TODO: Add support here for refresh tokens + token = self._config.access.token + if token: + options.append(("Authorization", "Bearer " + self._config.access.token)) + + callback(options, None) diff --git a/python/dazl/ledger/grpc/codec_aio.py b/python/dazl/ledger/grpc/codec_aio.py index 7f109394..d0ea23a4 100644 --- a/python/dazl/ledger/grpc/codec_aio.py +++ b/python/dazl/ledger/grpc/codec_aio.py @@ -226,7 +226,7 @@ async def decode_created_event(self, event: G_CreatedEvent) -> CreateEvent: key = await self.decode_value(template.key.type, event.key) return CreateEvent( - cid, cdata, event.signatories, event.observers, event.agreement_text.Value, key + cid, cdata, event.signatories, event.observers, event.agreement_text.value, key ) async def decode_archived_event(self, event: G_ArchivedEvent) -> ArchiveEvent: diff --git a/python/dazl/ledger/grpc/conn_aio.py b/python/dazl/ledger/grpc/conn_aio.py new file mode 100644 index 00000000..16d19ced --- /dev/null +++ b/python/dazl/ledger/grpc/conn_aio.py @@ -0,0 +1,496 @@ +# Copyright (c) 2017-2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +""" +This module contains the mapping between gRPC calls and Python/dazl types. +""" +from __future__ import annotations + +import asyncio +from typing import AbstractSet, Any, AsyncIterable, Collection, Mapping, Optional, Sequence, Union +import uuid +import warnings + +from grpc.aio import Channel + +from ..._gen.com.daml.ledger.api.v1.active_contracts_service_pb2 import ( + GetActiveContractsRequest as G_GetActiveContractsRequest, +) +from ..._gen.com.daml.ledger.api.v1.active_contracts_service_pb2_grpc import ( + ActiveContractsServiceStub, +) +from ..._gen.com.daml.ledger.api.v1.admin.package_management_service_pb2 import ( + UploadDarFileRequest as G_UploadDarFileRequest, +) +from ..._gen.com.daml.ledger.api.v1.admin.package_management_service_pb2_grpc import ( + PackageManagementServiceStub, +) +from ..._gen.com.daml.ledger.api.v1.admin.party_management_service_pb2 import ( + AllocatePartyRequest as G_AllocatePartyRequest, +) +from ..._gen.com.daml.ledger.api.v1.admin.party_management_service_pb2_grpc import ( + PartyManagementServiceStub, +) +from ..._gen.com.daml.ledger.api.v1.command_service_pb2 import ( + SubmitAndWaitRequest as G_SubmitAndWaitRequest, +) +from ..._gen.com.daml.ledger.api.v1.command_service_pb2_grpc import CommandServiceStub +from ..._gen.com.daml.ledger.api.v1.commands_pb2 import Command as G_Command, Commands as G_Commands +from ..._gen.com.daml.ledger.api.v1.ledger_identity_service_pb2 import ( + GetLedgerIdentityRequest as G_GetLedgerIdentityRequest, +) +from ..._gen.com.daml.ledger.api.v1.ledger_identity_service_pb2_grpc import ( + LedgerIdentityServiceStub, +) +from ..._gen.com.daml.ledger.api.v1.package_service_pb2 import ( + GetPackageRequest as G_GetPackageRequest, + ListPackagesRequest as G_ListPackagesRequest, +) +from ..._gen.com.daml.ledger.api.v1.package_service_pb2_grpc import PackageServiceStub +from ..._gen.com.daml.ledger.api.v1.transaction_filter_pb2 import ( + TransactionFilter as G_TransactionFilter, +) +from ..._gen.com.daml.ledger.api.v1.transaction_service_pb2 import ( + GetTransactionsRequest as G_GetTransactionsRequest, +) +from ..._gen.com.daml.ledger.api.v1.transaction_service_pb2_grpc import TransactionServiceStub +from ...damlast.daml_lf_1 import PackageRef, TypeConName +from ...prim import LEDGER_STRING_REGEX, ContractData, ContractId, Party +from ...query import Query +from ..api_types import ArchiveEvent, Boundary, Command, CreateEvent, ExerciseResponse, PartyInfo +from ..config import Config +from ..config.access import PropertyBasedAccessConfig +from ..errors import ProtocolWarning +from ..stream_aio import QueryStreamBase +from .channel import create_channel +from .codec_aio import Codec + +__all__ = ["Connection"] + + +class Connection: + def __init__(self, config: Config): + self._config = config + self._logger = config.logger + self._channel = create_channel(config) + self._codec = Codec(self) + + @property + def config(self) -> Config: + return self._config + + @property + def channel(self) -> Channel: + """ + Provides access to the underlying gRPC channel. + """ + return self._channel + + @property + def codec(self) -> Codec: + return self._codec + + async def __aenter__(self) -> Connection: + await self.open() + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: + await self.close() + + async def open(self) -> None: + """ + Does final validation of the token, including possibly fetching the ledger ID if it is not + yet known. + """ + if not self._config.access.ledger_id: + # most calls require a ledger ID; if it wasn't supplied as part of our token or we were + # never given a token in the first place, fetch the ledger ID from the destination + stub = LedgerIdentityServiceStub(self._channel) + response = await stub.GetLedgerIdentity(G_GetLedgerIdentityRequest()) + if isinstance(self._config.access, PropertyBasedAccessConfig): + self._logger.info("Connected to gRPC Ledger API, ledger ID: %s", response.ledger_id) + self._config.access.ledger_id = response.ledger_id + else: + raise ValueError("when using token-based access, the token must contain ledger ID") + + async def close(self) -> None: + """ + Close the underlying channel. Once the channel is closed, future command submissions, + streams in progress, and any future streams will fail. + """ + await self._channel.close() + + # region Write API + + async def do_commands( + self, + commands: Union[Command, Sequence[Command]], + *, + workflow_id: Optional[str] = None, + command_id: Optional[str] = None, + ) -> "None": + """ + Submit one or more commands to the Ledger API. + + You should generally prefer trying to use :meth:`create`, :meth:`exercise`, + :meth:`exercise_by_key`, or :meth:`create_and_exercise`, as they are available over both + the gRPC Ledger API and HTTP JSON API; additionally those methods can provide more + information about what happened. + + This method can be used to submit multiple disparate commands as a single transaction, but + if you find yourself needing to do this, you may want to consider moving more of your logic + into Daml so that only a single command is needed from the outside in order to satisfy your + use case. + """ + if commands is None: + return + elif isinstance(commands, Command): + commands = [commands] + + stub = CommandServiceStub(self.channel) + + commands_pb = await asyncio.gather(*map(self._codec.encode_command, commands)) + request = G_SubmitAndWaitRequest( + commands=G_Commands( + ledger_id=self._config.access.ledger_id, + application_id=self._config.access.application_name, + command_id=self._command_id(command_id), + workflow_id=self._workflow_id(workflow_id), + party=self._ensure_act_as(), + commands=commands_pb, + act_as=self._config.access.act_as, + read_as=self._config.access.read_only_as, + ) + ) + await stub.SubmitAndWait(request) + + async def create( + self, + template_id: Union[str, TypeConName], + payload: ContractData, + *, + workflow_id: Optional[str] = None, + command_id: Optional[str] = None, + ) -> CreateEvent: + """ + Create a contract for a given template. + + :param template_id: The template of the contract to be created. + :param payload: Template arguments for the contract to be created. + :param workflow_id: An optional workflow ID. + :param command_id: An optional command ID. If unspecified, a random one will be created. + """ + stub = CommandServiceStub(self.channel) + + request = G_SubmitAndWaitRequest( + commands=G_Commands( + ledger_id=self._config.access.ledger_id, + application_id=self._config.access.application_name, + command_id=self._command_id(command_id), + workflow_id=self._workflow_id(workflow_id), + party=self._ensure_act_as(), + commands=[await self._codec.encode_create_command(template_id, payload)], + act_as=self._config.access.act_as, + read_as=self._config.access.read_only_as, + ) + ) + response = await stub.SubmitAndWaitForTransaction(request) + + return await self._codec.decode_created_event(response.transaction.events[0].created) + + async def exercise( + self, + contract_id: ContractId, + choice_name: str, + argument: Optional[ContractData] = None, + *, + workflow_id: Optional[str] = None, + command_id: Optional[str] = None, + ) -> ExerciseResponse: + """ + Exercise a choice on a contract identified by its contract ID. + + :param contract_id: The contract ID of the contract to exercise. + :param choice_name: The name of the choice to exercise. + :param argument: The choice arguments. Can be omitted for choices that take no argument. + :param workflow_id: An optional workflow ID. + :param command_id: An optional command ID. If unspecified, a random one will be created. + :return: A response + """ + stub = CommandServiceStub(self.channel) + + commands = [await self._codec.encode_exercise_command(contract_id, choice_name, argument)] + request = self._submit_and_wait_request(commands, workflow_id, command_id) + response = await stub.SubmitAndWaitForTransactionTree(request) + + return await self._codec.decode_exercise_response(response.transaction) + + async def create_and_exercise( + self, + template_id: Union[str, TypeConName], + payload: ContractData, + choice_name: str, + argument: Optional[ContractData] = None, + *, + workflow_id: Optional[str] = None, + command_id: Optional[str] = None, + ) -> ExerciseResponse: + stub = CommandServiceStub(self.channel) + + commands = [ + await self._codec.encode_create_and_exercise_command( + template_id, payload, choice_name, argument + ) + ] + request = self._submit_and_wait_request(commands, workflow_id, command_id) + response = await stub.SubmitAndWaitForTransactionTree(request) + + return await self._codec.decode_exercise_response(response.transaction) + + async def exercise_by_key( + self, + template_id: Union[str, TypeConName], + choice_name: str, + key: Any, + argument: Optional[ContractData] = None, + *, + workflow_id: Optional[str] = None, + command_id: Optional[str] = None, + ) -> "ExerciseResponse": + stub = CommandServiceStub(self.channel) + + commands = [ + await self._codec.encode_exercise_by_key_command( + template_id, choice_name, key, argument + ) + ] + request = await self._submit_and_wait_request(commands, workflow_id, command_id) + response = await stub.SubmitAndWaitForTransactionTree(request) + + return await self._codec.decode_exercise_response(response.transaction) + + async def archive(self, contract_id: ContractId) -> ArchiveEvent: + await self.exercise(contract_id, "Archive") + return ArchiveEvent(contract_id) + + async def archive_by_key(self, template_id: str, key: Any) -> ArchiveEvent: + response = await self.exercise_by_key(template_id, "Archive", key) + return next(iter(event for event in response.events if isinstance(event, ArchiveEvent))) + + def _ensure_act_as(self) -> Party: + act_as_party = next(iter(self._config.access.act_as), None) + if not act_as_party: + raise ValueError("current access rights do not include any act-as parties") + return act_as_party + + @staticmethod + def _workflow_id(workflow_id: Optional[str]) -> Optional[str]: + if workflow_id: + if not LEDGER_STRING_REGEX.match(workflow_id): + raise ValueError("workflow_id must be a valid ledger string") + return workflow_id + else: + return None + + @staticmethod + def _command_id(command_id: Optional[str]) -> str: + if command_id: + if not LEDGER_STRING_REGEX.match(command_id): + raise ValueError("command_id must be a valid ledger string") + return command_id + else: + return uuid.uuid4().hex + + def _submit_and_wait_request( + self, + commands: Collection[G_Command], + workflow_id: Optional[str] = None, + command_id: Optional[str] = None, + ) -> G_SubmitAndWaitRequest: + return G_SubmitAndWaitRequest( + commands=G_Commands( + ledger_id=self._config.access.ledger_id, + application_id=self._config.access.application_name, + command_id=self._command_id(command_id), + workflow_id=self._workflow_id(workflow_id), + party=self._ensure_act_as(), + commands=commands, + act_as=self._config.access.act_as, + read_as=self._config.access.read_only_as, + ) + ) + + # endregion + + # region Read API + + def query(self, template_id: str = "*", query: Query = None) -> QueryStream: + return QueryStream(self, {template_id: query}, False) + + def query_many(self, queries: Optional[Mapping[str, Query]] = None) -> QueryStream: + return QueryStream(self, queries, False) + + def stream(self, template_id: str = "*", query: Query = None) -> QueryStream: + return QueryStream(self, {template_id: query}, True) + + def stream_many(self, queries: Optional[Mapping[str, Query]] = None) -> QueryStream: + return QueryStream(self, queries, True) + + # endregion + + # region Party Management calls + + async def allocate_party( + self, identifier_hint: str = None, display_name: str = None + ) -> "PartyInfo": + """ + Allocate a new party. + """ + stub = PartyManagementServiceStub(self.channel) + request = G_AllocatePartyRequest(party_id_hint=identifier_hint, display_name=display_name) + response = await stub.AllocateParty(request) + return Codec.decode_party_info(response.party_details) + + async def list_known_parties(self) -> Sequence[PartyInfo]: + stub = PartyManagementServiceStub(self.channel) + response = await stub.ListKnownParties() + return [Codec.decode_party_info(pd) for pd in response.party_details] + + # endregion + + # region Package Management calls + + async def get_package(self, package_id: PackageRef) -> bytes: + stub = PackageServiceStub(self.channel) + request = G_GetPackageRequest( + ledger_id=self._config.access.ledger_id, package_id=str(package_id) + ) + response = await stub.GetPackage(request) + return response.archive_payload + + async def list_package_ids(self) -> AbstractSet[PackageRef]: + stub = PackageServiceStub(self.channel) + request = G_ListPackagesRequest(ledger_id=self._config.access.ledger_id) + response = await stub.ListPackages(request) + return frozenset({PackageRef(pkg_id) for pkg_id in response.package_ids}) + + async def upload_package(self, contents: bytes) -> None: + stub = PackageManagementServiceStub(self.channel) + request = G_UploadDarFileRequest(dar_file=contents) + await stub.UploadDarFile(request) + return + + # endregion + + +class QueryStream(QueryStreamBase): + def __init__( + self, + conn: Connection, + queries: Optional[Mapping[str, Query]], + continue_stream: bool, + ): + self.conn = conn + self._queries = queries + self._continue_stream = continue_stream + + self._offset = None + self._filter = None + self._response_stream = None + + async def close(self) -> None: + if self._response_stream is not None: + self._response_stream.cancel() + self._response_stream = None + + async def items(self): + """ + Return an asynchronous stream of events. + + .. code-block:: python + + async with conn.query('SampleApp:Iou') as query: + async for r in query: + print(f"Offset: {r.offset}") + for event in r.events: + print(f" Event: {event}") + + :return: + A stream of responses, where each response contains one or more events at a particular + offset. + + At least one initial :class:`Boundary` is always returned, even if the stream is empty. + In this case, the first returned object is a :class:`Boundary` with ``offset=None``. + """ + filters = await self.conn.codec.encode_filters(self._queries) + filters_by_party = {party: filters for party in self.conn.config.access.read_as} + tx_filter_pb = G_TransactionFilter(filters_by_party=filters_by_party) + + try: + offset = None + async for event in self._acs_events(tx_filter_pb): + if isinstance(event, CreateEvent): + await self._emit_create(event) + elif isinstance(event, Boundary): + offset = event.offset + await self._emit_boundary(event) + else: + warnings.warn(f"Received an unknown event: {event}", ProtocolWarning) + yield event + + if self._continue_stream: + # now start returning events as they come off the transaction stream; note this + # stream will never naturally close, so it's on the caller to call close() or to + # otherwise exit our current context + async for event in self._tx_events(tx_filter_pb, offset): + if isinstance(event, CreateEvent): + await self._emit_create(event) + elif isinstance(event, ArchiveEvent): + await self._emit_archive(event) + elif isinstance(event, Boundary): + await self._emit_boundary(event) + else: + warnings.warn(f"Received an unknown event: {event}", ProtocolWarning) + yield event + finally: + await self.close() + + async def _acs_events( + self, filter_pb: G_TransactionFilter + ) -> AsyncIterable[Union[CreateEvent, Boundary]]: + stub = ActiveContractsServiceStub(self.conn.channel) + + request = G_GetActiveContractsRequest( + ledger_id=self.conn.config.access.ledger_id, filter=filter_pb + ) + self._response_stream = stub.GetActiveContracts(request) + + offset = None + async for response in self._response_stream: + for event in response.active_contracts: + yield await self.conn.codec.decode_created_event(event) + # for ActiveContractSetResponse messages, only the last offset is actually relevant + offset = response.offset + yield Boundary(offset) + + async def _tx_events( + self, filter_pb: G_TransactionFilter, begin_offset: Optional[str] + ) -> AsyncIterable[Union[CreateEvent, ArchiveEvent, Boundary]]: + stub = TransactionServiceStub(self.conn.channel) + + request = G_GetTransactionsRequest( + ledger_id=self.conn.config.access.ledger_id, + filter=filter_pb, + begin=self.conn.codec.encode_begin_offset(begin_offset), + ) + + self._response_stream = stub.GetTransactions(request) + async for response in self._response_stream: + for event in response.events: + event_type = event.WhichOneof("event") + if event_type == "created": + yield await self.conn.codec.decode_created_event(event.created) + elif event_type == "archived": + yield await self.conn.codec.decode_archived_event(event.archived) + else: + warnings.warn(f"Unknown Event({event_type}=...)", ProtocolWarning) + yield Boundary(response.offset) diff --git a/python/dazl/ledger/stream_aio.py b/python/dazl/ledger/stream_aio.py new file mode 100644 index 00000000..3cf63c4d --- /dev/null +++ b/python/dazl/ledger/stream_aio.py @@ -0,0 +1,163 @@ +# Copyright (c) 2017-2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from collections import defaultdict +from inspect import iscoroutine +from typing import ( + Any, + AsyncIterator, + Awaitable, + Callable, + DefaultDict, + List, + TypeVar, + Union, + overload, +) +import warnings + +from .api_types import ArchiveEvent, Boundary, CreateEvent +from .errors import CallbackReturnWarning + +__all__ = ["QueryStreamBase"] + +CREATE_EVENT = "create" +ARCHIVE_EVENT = "archive" +BOUNDARY = "boundary" + +Self = TypeVar("Self") + + +class QueryStreamBase: + @property + def _callbacks( + self, + ) -> "DefaultDict[str, List[Union[Callable[[Any], None], Callable[[Any], Awaitable[None]]]]]": + cb = getattr(self, "__cb", None) + if cb is None: + cb = defaultdict(list) + object.__setattr__(self, "__cb", cb) + return cb + + @overload + def on_boundary(self, fn: "Callable[[Boundary], None]") -> "Callable[[Boundary], None]": + ... + + @overload + def on_boundary( + self, fn: "Callable[[Boundary], Awaitable[None]]" + ) -> "Callable[[Boundary], Awaitable[None]]": + ... + + def on_boundary(self, fn): + if not callable(fn): + raise ValueError("fn must be a callable") + + self._callbacks[BOUNDARY].append(fn) + + @overload + def on_create(self, fn: "Callable[[CreateEvent], None]") -> "Callable[[CreateEvent], None]": + ... + + @overload + def on_create( + self, fn: "Callable[[CreateEvent], Awaitable[None]]" + ) -> "Callable[[CreateEvent], Awaitable[None]]": + ... + + def on_create(self, fn): + if not callable(fn): + raise ValueError("fn must be a callable") + + self._callbacks[CREATE_EVENT].append(fn) + + @overload + def on_archive(self, fn: "Callable[[ArchiveEvent], None]") -> "Callable[[ArchiveEvent], None]": + ... + + @overload + def on_archive( + self, fn: "Callable[[ArchiveEvent], Awaitable[None]]" + ) -> "Callable[[ArchiveEvent], Awaitable[None]]": + ... + + def on_archive(self, fn): + if not callable(fn): + raise ValueError("fn must be a callable") + + self._callbacks[ARCHIVE_EVENT].append(fn) + + async def __aenter__(self: Self) -> "Self": + """ + Prepare the stream. + """ + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: + """ + Close the stream. + """ + + async def close(self) -> None: + """ + Close and dispose of any resources used by this stream. + """ + + async def run(self) -> "None": + """ + "Runs" the stream. This can be called as an alternative to :meth:`items` when using + callback-based APIs. + """ + async for _ in self: + pass + + async def creates(self) -> "AsyncIterator[CreateEvent]": + """ + Return a stream of :class:`CreateEvent`s. This will include the contracts of the + Active Contract Set, as well as create events over subsequent transactions. + """ + async for item in self.items(): + if isinstance(item, CreateEvent): + yield item + + async def events(self) -> "AsyncIterator[Union[CreateEvent, ArchiveEvent]]": + """ + Return a stream of :class:`CreateEvent`s. This will include the contracts of the + Active Contract Set, as well as create and archive events over subsequent transactions. + """ + async for item in self.items(): + if isinstance(item, CreateEvent) or isinstance(item, ArchiveEvent): + yield item + + def items(self) -> "AsyncIterator[Union[CreateEvent, ArchiveEvent, Boundary]]": + """ + Must be overridden by subclasses to provide a stream of events. The implementation is + expected to call :meth:`_emit_create` and :meth:`_emit_archive` for every encountered event. + """ + raise NotImplementedError + + def __aiter__(self) -> "AsyncIterator[Union[CreateEvent, ArchiveEvent, Boundary]]": + """ + Returns :meth:`items`, which includes all create and archive events, and boundaries. + """ + return self.items() + + async def _emit(self, name: str, obj: "Any"): + for cb in self._callbacks[name]: + result = cb(obj) + if result is not None and iscoroutine(result): + result = await result + if result is not None: + warnings.warn( + "callbacks should not return anything; the result will be ignored", + CallbackReturnWarning, + ) + + async def _emit_create(self, event: "CreateEvent"): + await self._emit(CREATE_EVENT, event) + + async def _emit_archive(self, event: "ArchiveEvent"): + await self._emit(ARCHIVE_EVENT, event) + + async def _emit_boundary(self, event: "Boundary"): + await self._emit(BOUNDARY, event) diff --git a/python/dazl/prim/__init__.py b/python/dazl/prim/__init__.py index b203e7e4..dbc0ecba 100644 --- a/python/dazl/prim/__init__.py +++ b/python/dazl/prim/__init__.py @@ -6,7 +6,14 @@ correspond to types over the Ledger API. """ -from .basic import to_bool, to_str +from .basic import ( + LEDGER_STRING_REGEX, + NAME_STRING_REGEX, + PACKAGE_ID_STRING_REGEX, + PARTY_ID_STRING_REGEX, + to_bool, + to_str, +) from .complex import to_record, to_variant from .contracts import ContractData, ContractId from .datetime import ( diff --git a/python/dazl/prim/basic.py b/python/dazl/prim/basic.py index 9a82605b..64343f9a 100644 --- a/python/dazl/prim/basic.py +++ b/python/dazl/prim/basic.py @@ -1,12 +1,26 @@ # Copyright (c) 2017-2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. # SPDX-License-Identifier: Apache-2.0 - +import re from typing import Any -__all__ = ["to_bool", "to_str"] +__all__ = [ + "LEDGER_STRING_REGEX", + "NAME_STRING_REGEX", + "PACKAGE_ID_STRING_REGEX", + "PARTY_ID_STRING_REGEX", + "to_bool", + "to_str", +] + +# Standard string regexes as defined here: +# https://github.com/digital-asset/daml/blob/a6da995ecb71004c34c88a4f4211543868cfde15/ledger-api/grpc-definitions/com/daml/ledger/api/v1/value.proto#L18-L21 +NAME_STRING_REGEX = re.compile(r"[A-Za-z$_][A-Za-z0-9$_]*") +PACKAGE_ID_STRING_REGEX = re.compile(r"[A-Za-z0-9\-_ ]+") +PARTY_ID_STRING_REGEX = re.compile(r"[A-Za-z0-9:\-_ ]") +LEDGER_STRING_REGEX = re.compile(r"[A-Za-z0-9#:\-_/ ]") -def to_bool(obj: "Any") -> bool: +def to_bool(obj: Any) -> bool: """ Convert any of the common wire representations of a ``bool`` to a ``bool``. """ @@ -27,7 +41,7 @@ def to_bool(obj: "Any") -> bool: raise ValueError(f"Could not parse as a boolean: {obj!r}") -def to_str(obj: "Any") -> str: +def to_str(obj: Any) -> str: """ Convert any object to a string. This simply calls ``str`` on the object to produce a string representation. diff --git a/python/dazl/protocols/core.py b/python/dazl/protocols/core.py new file mode 100644 index 00000000..d75bcc79 --- /dev/null +++ b/python/dazl/protocols/core.py @@ -0,0 +1,126 @@ +# Copyright (c) 2017-2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +from typing import Any, Awaitable, Callable, Collection, Mapping, Optional, Sequence, TypeVar, Union + +from ..prim import ContractData, ContractId, Party + +__all__ = [ + "Event", + "InitEvent", + "ReadyEvent", + "CreateEvent", + "ArchiveEvent", + "ExerciseResponse", + "PartyInfo", + "Query", + "EventHandler", + "AEventHandler", +] + +Query = Union[None, Mapping[str, Any], Collection[Mapping[str, Any]]] + + +class Event: + """ + Superclass of all dazl events. + """ + + __slots__ = () + + def __setattr__(self, key, value): + raise AttributeError("Event instances are read-only") + + +E = TypeVar("E", bound=Event) + +EventHandler = Callable[[E], None] +AEventHandler = Union[Callable[[E], None], Callable[[E], Awaitable[None]]] + + +class InitEvent(Event): + pass + + +class ReadyEvent(Event): + pass + + +class CreateEvent(Event): + __slots__ = ("_cid", "_cdata") + + _cid: "ContractId" + _cdata: "ContractData" + + def __init__(self, cid: "ContractId", cdata: "ContractData"): + object.__setattr__(self, "_cid", cid) + object.__setattr__(self, "_cdata", cdata) + + @property + def cid(self) -> "ContractId": + return self._cid + + @property + def cdata(self) -> "ContractData": + return self._cdata + + +class ArchiveEvent(Event): + __slots__ = ("_cid",) + + _cid: "ContractId" + + def __init__(self, cid: "ContractId"): + object.__setattr__(self, "_cid", cid) + + @property + def cid(self) -> "ContractId": + return self._cid + + +class ExerciseResponse: + __slots__ = "_result", "_events" + + _result: "Optional[Any]" + _events: "Sequence[Union[CreateEvent, ArchiveEvent]]" + + def __init__( + self, result: "Optional[Any]", events: "Sequence[Union[CreateEvent, ArchiveEvent]]" + ): + object.__setattr__(self, "_result", result) + object.__setattr__(self, "_events", tuple(events)) + + @property + def result(self) -> "Optional[Any]": + return self._result + + @property + def events(self) -> "Sequence[Union[CreateEvent, ArchiveEvent]]": + return self._events + + def __repr__(self): + return f"ExerciseResponse(result={self.result}, events={self.events})" + + +class PartyInfo: + __slots__ = "_party", "_display_name", "_is_local" + + _party: "Party" + _display_name: str + _is_local: bool + + def __init__(self, party: "Party", display_name: str, is_local: bool): + object.__setattr__(self, "_party", party) + object.__setattr__(self, "_display_name", display_name) + object.__setattr__(self, "_is_local", is_local) + + @property + def party(self) -> "Party": + return self._party + + @property + def display_name(self) -> str: + return self._display_name + + @property + def is_local(self) -> bool: + return self._is_local diff --git a/python/dazl/protocols/errors.py b/python/dazl/protocols/errors.py index ece29db5..8646c5e6 100644 --- a/python/dazl/protocols/errors.py +++ b/python/dazl/protocols/errors.py @@ -2,7 +2,13 @@ # SPDX-License-Identifier: Apache-2.0 from ..prim import DazlError -__all__ = ["ConnectionTimeoutError", "UserTerminateRequest"] +__all__ = [ + "ConnectionTimeoutError", + "UserTerminateRequest", + "StreamError", + "ProtocolWarning", + "CallbackReturnWarning", +] class ConnectionTimeoutError(DazlError): @@ -15,3 +21,16 @@ class UserTerminateRequest(DazlError): """ Raised when the user has initiated a request to terminate the application. """ + + +class StreamError: + """ + An error that arises when trying to read from a query stream. + """ + + +class ProtocolWarning(Warning): + """ + Warnings that are raised when dazl detects incompatibilities between the Ledger API server-side + implementation and dazl. + """ diff --git a/python/dazl/query/__init__.py b/python/dazl/query/__init__.py index 924d8235..85cae290 100644 --- a/python/dazl/query/__init__.py +++ b/python/dazl/query/__init__.py @@ -2,6 +2,6 @@ # SPDX-License-Identifier: Apache-2.0 from __future__ import annotations -__all__ = ["ContractMatch", "is_match"] +__all__ = ["ContractMatch", "is_match", "Query"] -from .query import ContractMatch, is_match +from .query import ContractMatch, Query, is_match diff --git a/python/docs/index.rst b/python/docs/index.rst index f0b0aa6f..444241ab 100644 --- a/python/docs/index.rst +++ b/python/docs/index.rst @@ -9,45 +9,48 @@ dazl: DA client library for Python Dependencies ------------ -You will need Python 3.6 or later and a Digital Asset ledger implementation (DA Sandbox or -DA Ledger Server). - -Build-time dependencies are handled using `Poetry `_. +You will need Python 3.6 or later and a Daml Ledger. Getting Started --------------- -This section assumes that you already have a running ledger with the standard `daml new` model loaded, and have imported `dazl`. +This section assumes that you already have a running ledger with the standard `daml new` model +loaded, and have imported `dazl`. Connect to the ledger and submit a single command:: - with dazl.simple_client('http://localhost:6865', 'Alice') as client: - contract = { 'issuer' : 'Alice', 'owner' : 'Alice', 'name' : 'hello world!' } - client.ready() - client.submit_create('Main.Asset', contract) + import dazl + + async with dazl.connect('http://localhost:6865', 'Alice') as conn: + conn.create('Main:Asset', {'issuer': 'Alice', 'owner': 'Alice', 'name': 'hello world!'}) Connect to the ledger as a single party, print all contracts, and close:: - with dazl.simple_client('http://localhost:7600', 'Alice') as client: - # wait for the ACS to be fully read - client.ready() - contract_dict = client.find_active('*') - print(contract_dict) + import dazl + + async with dazl.connect('http://localhost:6865', 'Alice') as conn: + contracts = {} + async for event in conn.query(): + contracts[event.cid] = event.cdata + print(contracts) Connect to the ledger using asynchronous callbacks:: - network = dazl.Network() - network.set_config(url='http://localhost:6865') + import dazl + + async with dazl.connect('http://localhost:6865', 'Alice') as conn: + contracts = {} + @conn.on_create + def _(event): + contracts[event.cid] = event.cdata + print(contracts) - alice = network.aio_party('Alice') +Code +---- - @alice.ledger_ready() - async def onReady(event): - contracts = await event.acs_find_one('Main.Asset') - print(contracts) +Build-time dependencies are handled using `Poetry `_. - network.run_until_complete() Table of Contents ----------------- diff --git a/python/docs/migrating.rst b/python/docs/migrating.rst index 1e95a0e2..9967b74c 100644 --- a/python/docs/migrating.rst +++ b/python/docs/migrating.rst @@ -49,8 +49,184 @@ Commands such as ``dazl ls``, ``dazl tail``, or options provided for your applic - ``--use-acs-service`` -Migrating from dazl v6 from v7 -============================== +Migrating to dazl v7.5 +====================== + +dazl 7.5 introduced a new API for connecting to the Ledger API that embraces design patterns and +technical capabilities that have been introduced to Daml and some of the underlying Python libraries +over the last few years. + +* Daml Multi-party submissions (as of Daml Connect 1.9): + https://daml.com/blog/engineering/roles-in-daml-introducing-multi-party-submissions/ + + Occasionally it had been necessary for applications to listen to multiple streams as different + parties and submit commands based on information. With multi-party submissions, relationships + such as public information and group membership is easier to model with parties, which removes + the need for clients to attempt to keep otherwise independent streams correlated. As such, + :class:`dazl.Network` is deprecated in favor of a lighterweight API that is more explicitly + focused on single connections. + +* Daml HTTP JSON API (stable as of DAML SDK 1.3.0) + + While not yet directly supported (this is planned for dazl v8), the HTTP JSON API supports most + use-cases for applications that need to communicate with a ledger. The new API is slightly + restructured for both compatibility with the HTTP JSON API and parity with the JavaScript + `@daml/ledger `_ library. + +* Daml Authentication/authorization using JSON Web Tokens (JWT) + + dazl was originally built with the simple assumption that a ``Party`` exists 1:1 to a set of + credentials. As the Daml Ledger API evolved, this assumption no longer holds. The new dazl API + treats tokens more as a + +* Daml return values from exercise choices + + Exercise return values have been available from the Ledger API since Daml was open sourced back + in April 2019. dazl's API has finally been updated to take this feature into account. + +* "Unsubscribing" from streams + + ``dazl`` has never had a straightforward way of abandoning event callbacks that were no longer + needed. The new API makes stream lifecycle more explicit and the responsibility of the user of the + library. Disposing of streams is now simpler to reason about. + +* Native gRPC support for asyncio: https://github.com/grpc/proposal/pull/155 + + As of gRPC 1.32.0, the Python gRPC libraries natively support ``asyncio``. This, combined with + client streams that are no longer coupled to each other, means the internals of ``dazl`` are + significantly simpler while also improving performance. + +The changes: + +``dazl.Network``, which has been the primary entry point for dazl code since dazl v5, is now +deprecated. Transitional releases (starting with v7.5) will include both APIs, and ``dazl.Network`` +will be fully removed in dazl v8. + +To ease the transition, you can simply replace ``dazl.Network`` with ``dazl.ConnectionFactory``, +but there are some important semantic differences between these APIs: + +* Old-style template names are not supported with ``dazl.ConnectionFactory``. If you were using + template names such as "Some.Module.Contract" instead of "Some.Module:Contract", this is the time + to change. +* Callbacks from a ``dazl.ConnectionFactory`` that _return_ commands will raise warnings + (though they will still function as expected). These warnings are raised to help you find + examples of callbacks that will need to be reworked when transitioning to the new API. +* Multiple calls to aio_party or simple_party for the same ``Party`` will still share an underlying + connection, but a warning will be raised. These warnings are raised to help you find examples of + places where you may be relying on connection sharing; connections are no longer automatically + shared in the new API. +* Data streams will no longer be synchronized across Parties. If you were building up state from the + perspective of one party and using that information as a different party, you will experience + different behavior. This behavior is anyway generally frowned upon, but prior to the introduction + of multi-party submissions, occasionally necessary. + + +.. code-block:: python + + # dazl v5-v7 + import dazl + + network = dazl.Network() + network.set_config(url="http://localhost:6865") + client = network.aio_party("Alice") + + @client.on_ledger_create("Some:Request") + def auto_accept(event): + return dazl.exercise(event.cid, "Accept") + + network.run_forever() + + # dazl v7.5 or later, transitional API + import dazl + + network = dazl.ConnectionFactory() + network.set_config(url="http://localhost:6865") + client = network.aio_party("Alice") + + @client.on_ledger_create("Some:Request") + def auto_accept(event): + return dazl.exercise(event.cid, "Accept") + + network.run_forever() + + # dazl v7.5 or later, new API + import asyncio, dazl + + async def main(): + async with dazl.connect("http://localhost:6865", "Alice") as conn: + async for event in conn.stream("Some:Request"): + await conn.exercise(event.cid, "Accept") + + asyncio.run(main()) + +A multi-party example. Note that because there is no more ``Network`` to tie connections together, +there are no guarantees that ``Alice`` and ``Bob`` receive events at around the same time. You +should generally + +.. code-block:: python + + # dazl v5-v7 + import dazl + + network = dazl.Network() + network.set_config(url="http://localhost:6865") + + client_alice = network.aio_party("Alice") + client_bob = network.aio_party("Bob") + + @client_alice.on_ledger_create("Some:Request") + def auto_accept(event): + return dazl.exercise(event.cid, "Accept") + + @client_bob.on_ledger_create("Some:Request") + def auto_accept(event): + return dazl.exercise(event.cid, "Accept")) + + network.run() + + # dazl v7.5 or later, transitional API + import dazl + + network = dazl.ConnectionFactory() + network.set_config(url="http://localhost:6865") + + client_alice = network.aio_party("Alice") + client_bob = network.aio_party("Bob") + + @client_alice.on_ledger_create("Some:Request") + def auto_accept(event): + # changed to avoid warnings, even though it still works the old way + # return dazl.exercise(event.cid, "Accept") + return client_alice.submit_exercise(event.cid, "Accept") + + @client_bob.on_ledger_create("Some:Request") + def auto_accept(event): + # changed to avoid warnings, even though it still works the old way + # return dazl.exercise(event.cid, "Accept")) + return client_bob.submit_exercise(event.cid, "Accept") + + network.run() + + # dazl v7.5 or later, new API + import asyncio, dazl + + async def main_alice(): + async with dazl.connect("http://localhost:6865", "Alice") as conn: + async for event in conn.stream("Some:Request"): + await conn.exercise(event.cid, "Accept") + + async def main_bob(): + async with dazl.connect("http://localhost:6865", "Bob") as conn: + async for event in conn.stream("Some:Request"): + await conn.exercise(event.cid, "Accept") + + # Python 3.7+ + asyncio.run(asyncio.gather(main_alice(), main_bob())) + + + +Migrating to dazl v7 +==================== Template formats ---------------- @@ -116,16 +292,14 @@ Deprecated symbols in the `dazl.damlast` and `dazl.model` packages have been rem | ``dazl.model.types.TypeReference.full_name_unambiguous`` property | :func:`dazl.damlast.util.package_local_name()` | +-----------------------------------------------------------------------+------------------------------------------------+ -Migrating from dazl v5 from v6 -============================== +Migrating to v6 +=============== No major breaking API changes were introduced in the v6 release. -Migrating from dazl v5 from v4 -============================== - - +Migrating to v5 +=============== Library Initialization ---------------------- diff --git a/python/tests/unit/test_protocol_ledgerapi.py b/python/tests/unit/test_protocol_ledgerapi.py new file mode 100644 index 00000000..d8ea3c77 --- /dev/null +++ b/python/tests/unit/test_protocol_ledgerapi.py @@ -0,0 +1,48 @@ +# Copyright (c) 2017-2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +import logging + +import pytest + +from dazl import connect + +from .dars import PostOffice + + +@pytest.mark.asyncio +async def test_protocol_ledger_api(sandbox): + # first, administrative stuff--upload the DAR and allocate two parties that we'll use later + async with connect(url=sandbox, admin=True) as conn: + await conn.upload_package(PostOffice.read_bytes()) + postman = (await conn.allocate_party()).party + participant = (await conn.allocate_party()).party + + async with connect(url=sandbox, act_as=postman) as conn: + event = await conn.create("Main:PostmanRole", {"postman": postman}) + result = await conn.exercise( + event.contract_id, "InviteParticipant", {"party": participant, "address": "Somewhere!"} + ) + logging.info("Result of inviting a participant: %s", result) + + async with connect(url=sandbox, act_as=participant) as conn: + # Stream results for Main:InviteAuthorRole, and then Main:InviteReceiverRole. Then break the + # stream once we find the first contract. + # + # We do NOT use query() here, because in a distributed ledger setting, the result of the + # postman inviting participants may not yet have been observed by the clients. Instead, use + # stream() since it remains open until explicitly closed. We break the never-ending iterator + # as soon as we see one of each contract. + async with conn.stream("Main:InviteAuthorRole") as query: + async for event in query: + result = await conn.exercise(event.contract_id, "AcceptInviteAuthorRole") + logging.info("The result of AcceptInviteAuthorRole: %s", result) + break + + async with conn.stream("Main:InviteReceiverRole") as query: + async for event in query: + result = await conn.exercise(event.contract_id, "AcceptInviteReceiverRole") + logging.info("The result of AcceptInviteReceiverRole: %s", result) + break + + logging.info("Done!")