diff --git a/python/dazl/__init__.py b/python/dazl/__init__.py
index c1989838..d6d409c6 100644
--- a/python/dazl/__init__.py
+++ b/python/dazl/__init__.py
@@ -50,6 +50,7 @@
exercise_by_key,
)
from .ledger import Command
+from .ledger.grpc import connect
from .pretty.table import write_acs
from .prim import ContractData, ContractId, DazlError, FrozenDict as frozendict, Party
from .util.logging import setup_default_logger
diff --git a/python/dazl/client/api.py b/python/dazl/client/api.py
index b6567f90..3efe9368 100644
--- a/python/dazl/client/api.py
+++ b/python/dazl/client/api.py
@@ -142,10 +142,19 @@ class async_network:
"""
def __init__(
- self, url: "Optional[str]" = None, dars: "Optional[Union[Dar, Collection[Dar]]]" = None
+ self,
+ url: "Optional[str]" = None,
+ dars: "Optional[Union[Dar, Collection[Dar]]]" = None,
+ future_api: bool = False,
):
LOG.debug("async_network.__init__")
- self.network = Network()
+ if future_api:
+ # inline import here to avoid import cycles
+ from ..compat.v8_network import Network as V8Network
+
+ self.network = V8Network()
+ else:
+ self.network = Network()
if url:
self.network.set_config(url=url)
self.dars = as_list(dars) # type: List[Dar]
@@ -294,7 +303,7 @@ def start_in_background(
"""
if validate_install_signal_handlers(install_signal_handlers):
self._impl.invoker.install_signal_handlers()
- return self._impl.start(daemon)
+ return self._impl.start_in_background(daemon, install_signal_handlers=False)
def shutdown(self) -> "Optional[Awaitable[None]]":
"""
diff --git a/python/dazl/client/state.py b/python/dazl/client/state.py
index 97e372e0..5af1fc06 100644
--- a/python/dazl/client/state.py
+++ b/python/dazl/client/state.py
@@ -1,7 +1,7 @@
# Copyright (c) 2017-2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
-from asyncio import Future, sleep
+from asyncio import Future, get_event_loop, sleep
from collections import defaultdict
from dataclasses import dataclass
import datetime
@@ -60,7 +60,7 @@ class ContractContextualData:
class ActiveContractSet:
- def __init__(self, invoker: "Invoker", lookup: "SymbolLookup"):
+ def __init__(self, invoker: "Optional[Invoker]", lookup: "SymbolLookup"):
self.invoker = invoker
self.lookup = lookup
self._tcdata = defaultdict(
@@ -207,8 +207,13 @@ def register_query(self, query: "PendingQuery") -> None:
class PendingQuery:
- def __init__(self, invoker: "Invoker", match, min_count: int):
- self.future = invoker.create_future() # type: Awaitable[Collection[ContractContextualData]]
+ def __init__(self, invoker: "Optional[Invoker]", match, min_count: int):
+ if invoker is not None:
+ self.future = (
+ invoker.create_future()
+ ) # type: Awaitable[Collection[ContractContextualData]]
+ else:
+ self.future = get_event_loop().create_future()
self.match = match
self.min_count = min_count
diff --git a/python/dazl/compat/__init__.py b/python/dazl/compat/__init__.py
new file mode 100644
index 00000000..79204ba6
--- /dev/null
+++ b/python/dazl/compat/__init__.py
@@ -0,0 +1,9 @@
+# Copyright (c) 2017-2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+__all__ = ["AIOGlobalClient", "AIOPartyClient", "Network", "NotSupportedError"]
+
+
+from .v8 import NotSupportedError
+from .v8_client_aio import AIOGlobalClient, AIOPartyClient
+from .v8_network import Network
diff --git a/python/dazl/compat/_messages.py b/python/dazl/compat/_messages.py
new file mode 100644
index 00000000..5a01b8f4
--- /dev/null
+++ b/python/dazl/compat/_messages.py
@@ -0,0 +1,16 @@
+_NS_LEDGER_ID_DEPRECATED = (
+ "fetching ledger_id from an event is deprecated; instead get it from the connection (or "
+ "consider if knowing the ledger ID is important to your usecase)"
+)
+
+_NS_ACS_DEPRECATED = (
+ "acs functions are deprecated; you should keep your own store, or use contract keys to avoid "
+ "needing to work with local state"
+)
+
+
+COMMAND_ID = "command_id is no longer accessible in the new API"
+WORKFLOW_ID = "workflow_id is no longer accessible in the new API"
+EVENT_ID = "event_id is no longer accessible in the new API"
+WITNESS_PARTIES = "witness_parties is no longer accessible in the new API"
+PARTY = "reading `party` from a connection is ambiguous when multi-party submissions are being used; consider an alternate way of determining an appropriate Party in this context"
diff --git a/python/dazl/compat/v8.py b/python/dazl/compat/v8.py
new file mode 100644
index 00000000..b5613202
--- /dev/null
+++ b/python/dazl/compat/v8.py
@@ -0,0 +1,40 @@
+# Copyright (c) 2017-2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+"""
+This module contains symbols to aid in a gradual migration of v5-v7 code to v8. These symbols will
+be marked as deprecated in v8, and be removed in v9.
+"""
+from typing import TypeVar
+
+from ..protocols.core import AEventHandler, ArchiveEvent, CreateEvent, InitEvent, ReadyEvent
+
+__all__ = ["ConnectionReuseWarning", "CallbackReturnValueWarning", "NotSupportedError"]
+
+InitFn = TypeVar("InitFn", bound=AEventHandler[InitEvent])
+ReadyFn = TypeVar("ReadyFn", bound=AEventHandler[ReadyEvent])
+CreateFn = TypeVar("CreateFn", bound=AEventHandler[CreateEvent])
+ArchiveFn = TypeVar("ArchiveFn", bound=AEventHandler[ArchiveEvent])
+DEFAULT_TIMEOUT_SECONDS = 30
+
+
+class NotSupportedError(Exception):
+ """
+ Error raised when calling an API that exists on :class:`Network` but is not supported on
+ :class:`ConnectionFactory`.
+ """
+
+
+class ConnectionReuseWarning(DeprecationWarning):
+ """
+ Warning raised when Network.aio_party or Network.simple_party are called more than once with
+ the same party literal. Connection sharing will no longer explicitly be provided by dazl in the
+ v8 API; any connection sharing must be instead managed by your application.
+ """
+
+
+class CallbackReturnValueWarning(DeprecationWarning):
+ """
+ Warning raised when a callback returns a value instead of directly submitting a command itself.
+ This style of callback is not supported by the dazl v8 API.
+ """
diff --git a/python/dazl/compat/v8_client_aio.py b/python/dazl/compat/v8_client_aio.py
new file mode 100644
index 00000000..8aeb4c6f
--- /dev/null
+++ b/python/dazl/compat/v8_client_aio.py
@@ -0,0 +1,462 @@
+# Copyright (c) 2017-2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+import asyncio
+import functools
+import inspect
+from pathlib import Path
+from typing import (
+ Any,
+ Awaitable,
+ BinaryIO,
+ Callable,
+ Collection,
+ List,
+ NoReturn,
+ Optional,
+ Set,
+ Tuple,
+ TypeVar,
+ Union,
+)
+import warnings
+
+from ..client.state import ActiveContractSet
+from ..damlast.daml_lf_1 import TypeConName
+from ..damlast.lookup import parse_type_con_name
+from ..model.core import (
+ ContractContextualData,
+ ContractContextualDataCollection,
+ ContractMatch,
+ ContractsState,
+)
+from ..prim import ContractId, Party, TimeDeltaLike
+from ..protocols.core import AEventHandler, ArchiveEvent, CreateEvent, ExerciseResponse
+from ..protocols.ledgerapi import Connection
+from ..protocols.ledgerapi.conn_aio import QueryStream
+from ..util.io import get_bytes
+from .v8 import CallbackReturnValueWarning, NotSupportedError
+from .v8_events import ContractArchiveEvent, ContractCreateEvent, InitEvent, ReadyEvent
+
+__all__ = ["AIOGlobalClient", "AIOPartyClient"]
+
+InitFn = TypeVar("InitFn", bound=AEventHandler[InitEvent])
+ReadyFn = TypeVar("ReadyFn", bound=AEventHandler[ReadyEvent])
+CreateFn = TypeVar("CreateFn", bound=AEventHandler[CreateEvent])
+ArchiveFn = TypeVar("ArchiveFn", bound=AEventHandler[ArchiveEvent])
+DEFAULT_TIMEOUT_SECONDS = 30
+
+_NS_SERIALIZER = "Serializer and PackageStore are not supported; use Connection.codec instead."
+_NS_LEDGER_ID = (
+ "LedgerMetadata.ledger_id is deprecated; use Connection.config.access.ledger_id instead."
+)
+
+
+class AIOClientBase:
+ def __init__(self, conn: "Connection"):
+ self._conn = conn
+
+ async def ensure_dar(
+ self,
+ contents: "Union[str, Path, bytes, BinaryIO]",
+ timeout: "TimeDeltaLike" = DEFAULT_TIMEOUT_SECONDS,
+ ) -> None:
+ """
+ Validate that the ledger has the packages specified by the given contents (as a byte array).
+ Throw an exception if the specified DARs do not exist within the specified timeout.
+
+ :param contents: The DAR or DALF to ensure.
+ :param timeout: The maximum length of time to wait before giving up.
+ """
+ await asyncio.wait_for(self._conn.upload_package(get_bytes(contents)), timeout=timeout)
+
+
+class AIOGlobalClient(AIOClientBase):
+ """
+ A transitional replacement for :class:`dazl.client.api.AIOGlobalClient` that keeps the same
+ superficial API, but changes some of the semantics to match the newer API.
+ """
+
+ async def ensure_packages(self, package_ids: "Collection[str]"):
+ pass
+
+ async def metadata(self) -> "LedgerMetadata":
+ return LedgerMetadata(self._conn.config.access.ledger_id)
+
+
+class LedgerMetadata:
+ def __init__(self, ledger_id: str):
+ self._ledger_id = ledger_id
+
+ def ledger_id(self) -> str:
+ warnings.warn(_NS_LEDGER_ID, DeprecationWarning, stacklevel=2)
+ return self._ledger_id
+
+ @property
+ def store(self) -> "NoReturn":
+ warnings.warn(_NS_SERIALIZER, DeprecationWarning, stacklevel=2)
+ raise NotSupportedError(_NS_SERIALIZER)
+
+
+class AIOPartyClient(AIOClientBase):
+ """
+ A transitional replacement for :class:`dazl.client.AIOPartyClient` that keeps the same
+ superficial API, but changes some of the semantics to match the newer API.
+ """
+
+ def __init__(self, conn: "Connection"):
+ super().__init__(conn)
+ self._acs = ActiveContractSet(None, conn.codec.lookup)
+ self._ready = asyncio.Event()
+ self._templates = set() # type: Set[TypeConName]
+ self._init_callbacks = [] # type: List[AEventHandler[InitEvent]]
+ self._ready_callbacks = [] # type: List[AEventHandler[ReadyEvent]]
+ self._create_callbacks = (
+ []
+ ) # type: List[Tuple[Callable[[CreateEvent], bool], AEventHandler[CreateEvent]]]
+ self._archive_callbacks = (
+ []
+ ) # type: List[Tuple[Callable[[ArchiveEvent], bool], AEventHandler[ArchiveEvent]]]
+ self._stream = None
+
+ # region Event handler registration
+
+ def ledger_init(self) -> "Callable[[InitFn], InitFn]":
+ def register(fn: "InitFn") -> "InitFn":
+ self.add_ledger_init(fn)
+ return fn
+
+ return register
+
+ def add_ledger_init(self, handler: "InitFn") -> None:
+ self._init_callbacks.append(handler)
+
+ def ledger_ready(self) -> "Callable[[ReadyFn], ReadyFn]":
+ def register(fn: "ReadyFn") -> "ReadyFn":
+ self.add_ledger_ready(fn)
+ return fn
+
+ return register
+
+ def add_ledger_ready(self, handler) -> None:
+ self._ready_callbacks.append(handler)
+
+ @staticmethod
+ def ledger_packages_added(*_, **__):
+ """
+ This function is no longer supported.
+ """
+ warnings.warn("ledger_packages_added is not supported", DeprecationWarning, stacklevel=2)
+ raise Exception("ledger_packages_added is not supported")
+
+ @staticmethod
+ def add_ledger_packages_added(*_, **__):
+ """
+ This function is no longer supported.
+ """
+ warnings.warn(
+ "add_ledger_packages_added is not supported", DeprecationWarning, stacklevel=2
+ )
+ raise Exception("add_ledger_packages_added is not supported")
+
+ @staticmethod
+ def ledger_exercised(*_, **__):
+ """
+ This function is no longer supported.
+ """
+ warnings.warn("ledger_exercised is not supported", DeprecationWarning, stacklevel=2)
+ raise Exception("ledger_exercised is not supported")
+
+ @staticmethod
+ def add_ledger_exercised(*_, **__):
+ """
+ This function is no longer supported.
+ """
+ warnings.warn("add_ledger_exercised is not supported", DeprecationWarning, stacklevel=2)
+ raise Exception("add_ledger_exercised is not supported")
+
+ def ledger_created(
+ self, template: "Union[str, TypeConName]", match: "Optional[ContractMatch]" = None
+ ) -> "CreateFn":
+ def register(fn: "CreateFn") -> "CreateFn":
+ self.add_ledger_created(template, fn, match=match)
+ return fn
+
+ return register
+
+ def add_ledger_created(
+ self,
+ template: "Union[str, TypeConName]",
+ handler: "CreateFn",
+ match: "Optional[ContractMatch]" = None,
+ ) -> None:
+ name = parse_type_con_name(template)
+ self._templates.add(name)
+ self._create_callbacks.append((create_filter(name, match), handler))
+
+ def ledger_archived(
+ self, template: "Union[str, TypeConName]", match: "Optional[ContractMatch]" = None
+ ) -> "ArchiveFn":
+ def register(fn: "ArchiveFn") -> "ArchiveFn":
+ self.add_ledger_archived(template, fn, match=match)
+ return fn
+
+ return register
+
+ def add_ledger_archived(
+ self,
+ template: "Union[str, TypeConName]",
+ handler: "ArchiveFn",
+ match: "Optional[ContractMatch]" = None,
+ ) -> None:
+ name = parse_type_con_name(template)
+ self._templates.add(name)
+ self._create_callbacks.append((create_filter(name, match), handler))
+
+ # endregion
+
+ # region Command Submission
+
+ async def submit(self, commands, workflow_id: "Optional[str]" = None):
+ return await self._conn.do_commands(commands, workflow_id=workflow_id)
+
+ async def submit_create(
+ self,
+ template_name: "Union[str, TypeConName]",
+ arguments: "Optional[dict]" = None,
+ workflow_id: "Optional[str]" = None,
+ ):
+ """
+ Submit a single create command.
+
+ :param template_name:
+ The name of the template.
+ :param arguments:
+ The arguments to the create (as a ``dict``).
+ :param workflow_id:
+ The optional workflow ID to stamp on the outgoing command.
+ :return:
+ A future that resolves when the command has made it to the ledger _or_ an error
+ occurred when trying to process them.
+ """
+ return await self._conn.create(template_name, arguments, workflow_id=workflow_id)
+
+ async def submit_exercise(
+ self,
+ cid: "ContractId",
+ choice_name: str,
+ arguments: "Optional[dict]" = None,
+ workflow_id: "Optional[str]" = None,
+ ) -> "ExerciseResponse":
+ """
+ Submit an exercise choice.
+
+ :param cid:
+ The :class:`ContractId` on which a choice is being exercised.
+ :param choice_name:
+ The name of the choice to exercise.
+ :param arguments:
+ The arguments to the exercise (as a ``dict``). Can be omitted (``None``) for no-argument
+ choices.
+ :param workflow_id:
+ The optional workflow ID to stamp on the outgoing command.
+ :return:
+ A future that resolves when the command has made it to the ledger _or_ an error
+ occurred when trying to process them.
+ """
+ return await self._conn.exercise(cid, choice_name, arguments, workflow_id=workflow_id)
+
+ async def submit_exercise_by_key(
+ self,
+ template_name: "Union[str, TypeConName]",
+ contract_key: "Any",
+ choice_name: str,
+ arguments: "Optional[dict]" = None,
+ workflow_id: "Optional[str]" = None,
+ ) -> "ExerciseResponse":
+ """
+ Synchronously submit a single exercise choice. Equivalent to calling :meth:`submit` with a
+ single ``exercise_by_key``.
+
+ :param template_name:
+ The name of the template on which to do an exercise-by-key.
+ :param contract_key:
+ The value that should uniquely identify a contract for the specified template.
+ :param choice_name:
+ The name of the choice to exercise.
+ :param arguments:
+ The arguments to the create (as a ``dict``). Can be omitted (``None``) for no-argument
+ choices.
+ :param workflow_id:
+ The optional workflow ID to stamp on the outgoing command.
+ """
+ return await self._conn.exercise_by_key(
+ template_name, contract_key, choice_name, arguments, workflow_id=workflow_id
+ )
+
+ async def submit_create_and_exercise(
+ self,
+ template_name: "Union[str, TypeConName]",
+ arguments: "dict",
+ choice_name: str,
+ choice_arguments: "Optional[dict]" = None,
+ workflow_id: "Optional[str]" = None,
+ ) -> "ExerciseResponse":
+ """
+ Synchronously submit a single create-and-exercise command. Equivalent to calling
+ :meth:`submit` with a single ``create_and_exercise``.
+
+ :param template_name:
+ The name of the template on which to do an exercise-by-key.
+ :param arguments:
+ The arguments to the create (as a ``dict``).
+ :param choice_name:
+ The name of the choice to exercise.
+ :param choice_arguments:
+ The arguments to the exercise (as a ``dict``). Can be omitted (``None``) for no-argument
+ :param workflow_id:
+ The optional workflow ID to stamp on the outgoing command.
+ """
+ return await self._conn.create_and_exercise(
+ template_name, arguments, choice_name, choice_arguments, workflow_id=workflow_id
+ )
+
+ # endregion
+
+ # region Active contract set
+
+ def find_by_id(self, cid: "Union[str, ContractId]") -> "Optional[ContractContextualData]":
+ warnings.warn("ACS functions are deprecated", DeprecationWarning, stacklevel=2)
+ return self._acs.get(cid)
+
+ def find(
+ self, template: "Any", match: "ContractMatch" = None, include_archived: bool = False
+ ) -> "ContractContextualDataCollection":
+ warnings.warn("ACS functions are deprecated", DeprecationWarning, stacklevel=2)
+ return self._acs.read_full(template, match, include_archived=include_archived)
+
+ def find_one(self, template: Any, match: ContractMatch = None) -> "ContractsState":
+ warnings.warn("ACS functions are deprecated", DeprecationWarning, stacklevel=2)
+ return self._acs.read_active(template, match)
+
+ def find_historical(
+ self, template: Any, match: ContractMatch = None
+ ) -> "ContractContextualDataCollection":
+ warnings.warn("ACS functions are deprecated", DeprecationWarning, stacklevel=2)
+ return self._acs.read_full(template, match, include_archived=True)
+
+ def find_nonempty(
+ self, template: Any, match: ContractMatch, min_count: int = 1, timeout: float = 30
+ ):
+ warnings.warn("ACS functions are deprecated", DeprecationWarning, stacklevel=2)
+ return self._acs.read_async(template, match, min_count=min_count)
+
+ # endregion
+
+ # region Ledger/client metadata
+
+ @staticmethod
+ def set_config(*_, **__):
+ """
+ This function is no longer supported.
+ """
+ warnings.warn("set_config is not supported", DeprecationWarning, stacklevel=2)
+ raise Exception("set_config is not supported")
+
+ async def ready(self) -> None:
+ """
+ Block until the underlying stream has finished reading its active contract set.
+ """
+ await self._ready.wait()
+
+ @property
+ def party(self) -> "Party":
+ """
+ Return the party for the connection that triggered this event.
+
+ Note that with multi-party submissions, there may not be a such thing as a singular "Party"
+ for a connection any more; if you are using this property, you will need to find another
+ mechanism for identifying the relevant Party in order to be compatible with multi-party
+ submissions. As such, this property is deprecated.
+ """
+ warnings.warn(
+ "reading `party` from a connection is ambiguous when multi-party submissions are "
+ "being used; consider an alternate way of determining the relevant party",
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ return next(iter(self._conn.config.access.act_as))
+
+ # endregion
+
+ async def _open(self) -> None:
+ await self._conn.open()
+
+ async def _run(self, keep_open: bool) -> None:
+ """
+ Called by :class:`ConnectionFactory` to start our internal stream.
+ """
+ query = {t: None for t in self._templates}
+
+ try:
+ async with self._create_stream(query, keep_open) as stream:
+ self._stream = stream
+ for cb in self._init_callbacks:
+ stream.on_init(self._new_style_callback(None, cb, lambda _: InitEvent(self)))
+ stream.on_ready(lambda _: self._ready.set())
+ for cb in self._ready_callbacks:
+ stream.on_ready(self._new_style_callback(None, cb, lambda _: ReadyEvent(self)))
+ for f, cb in self._create_callbacks:
+ stream.on_create(
+ self._new_style_callback(f, cb, lambda ev: ContractCreateEvent(self, ev))
+ )
+ for f, cb in self._archive_callbacks:
+ stream.on_archive(
+ self._new_style_callback(f, cb, lambda ev: ContractArchiveEvent(self, ev))
+ )
+ await stream.run()
+ finally:
+ self._stream = None
+
+ async def _stop(self):
+ if self._stream is not None:
+ await self._stream.close()
+
+ def _create_stream(self, query, keep_open) -> "QueryStream[Union[CreateEvent, ArchiveEvent]]":
+ return self._conn.stream_many(query) if keep_open else self._conn.query_many(query)
+
+ def _new_style_callback(
+ self, event_filter, callback: "Callable[[E1], Any]", event_mapper: "Callable[[E0], E1]"
+ ) -> "Callable[[E0], Awaitable[None]]":
+ """
+ Wrap an old-style callback (that can return commands) into the form a new-style callback
+ (that does not do anything with returned values).
+ """
+
+ @functools.wraps(callback)
+ async def impl(event):
+ if event_filter is None or event_filter(event):
+ compat_event = event_mapper(event)
+ ret = callback(compat_event)
+ if inspect.iscoroutine(ret):
+ ret = await ret
+ if ret is not None:
+ warnings.warn(
+ "callbacks should not return anything", CallbackReturnValueWarning
+ )
+ await self._conn.do_commands(ret)
+
+ return impl
+
+
+E0 = TypeVar("E0")
+E1 = TypeVar("E1")
+
+
+def create_filter(name: "TypeConName", match: "ContractMatch") -> "Callable[[CreateEvent], bool]":
+ def event_filter(event: "CreateEvent") -> bool:
+ if event.cid.value_type == name:
+ return True
+ return False
+
+ return event_filter
diff --git a/python/dazl/compat/v8_config.py b/python/dazl/compat/v8_config.py
new file mode 100644
index 00000000..1de3af6f
--- /dev/null
+++ b/python/dazl/compat/v8_config.py
@@ -0,0 +1,11 @@
+from typing import Any, Dict
+
+from ..protocols.config import Config
+
+
+class NetworkConfig:
+ def __init__(self):
+ self._config = dict() # type: Dict[str, Any]
+
+ def new_style_config(self) -> "Config":
+ return Config.create(**self._config)
diff --git a/python/dazl/compat/v8_events.py b/python/dazl/compat/v8_events.py
new file mode 100644
index 00000000..6416d3ec
--- /dev/null
+++ b/python/dazl/compat/v8_events.py
@@ -0,0 +1,183 @@
+# Copyright (c) 2017-2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+from typing import TYPE_CHECKING, Any, NoReturn, Optional, Union
+import warnings
+
+from . import NotSupportedError, _messages
+from ..damlast.protocols import SymbolLookup
+from ..model.core import ContractContextualData
+from ..prim import ContractData, ContractId
+from ..protocols.core import ArchiveEvent, CreateEvent
+
+if TYPE_CHECKING:
+ from ..model.types import TypeReference
+ from .v8_client_aio import AIOPartyClient
+
+
+class BaseEvent:
+ """
+ Base class for an event hierarchy that exposes the same API as dazl.model.reading events, but
+ with appropriate deprecation warnings that help guide consumers to better alternatives.
+ """
+
+ def __init__(self, client: "AIOPartyClient"):
+ self._client = client
+
+ @property
+ def client(self) -> "AIOPartyClient":
+ return self._client
+
+ @property
+ def party(self):
+ """
+ Return the party for the connection that triggered this event.
+
+ Note that with multi-party submissions, there may not be a such thing as a singular "Party"
+ for a connection any more; if you are using this property, you will need to find another
+ mechanism for identifying the relevant Party in order to be compatible with multi-party
+ submissions. As such, this property is deprecated.
+ """
+ warnings.warn(
+ "reading `party` from a connection is ambiguous when multi-party submissions are "
+ "being used; consider an alternate way of determining the relevant party",
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", DeprecationWarning)
+ return self._client.party
+
+ @property
+ def time(self) -> "None":
+ warnings.warn("time is not exposed in the new API", DeprecationWarning, stacklevel=2)
+ return None
+
+ @property
+ def ledger_id(self) -> str:
+ """
+ Return the ledger ID of the connected ledger.
+ """
+ return self._client._conn.config.access.ledger_id
+
+ @property
+ def lookup(self) -> "SymbolLookup":
+ """
+ Return the :class:`SymbolLookup` that provides access to package metadata.
+ """
+ warnings.warn("use Connection.codec.lookup instead", DeprecationWarning, stacklevel=2)
+ # noinspection PyProtectedMember
+ return self._client._conn.codec.lookup
+
+ @property
+ def package_store(self) -> "NoReturn":
+ warnings.warn("package_store is not supported", DeprecationWarning, stacklevel=2)
+ raise NotSupportedError("package_store is not supported")
+
+ def acs_find_active(self, template: "Union[TypeReference, str]", match=None):
+ warnings.warn("ACS functions are deprecated", DeprecationWarning, stacklevel=2)
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", DeprecationWarning)
+ # noinspection PyDeprecation
+ return self.client.find_one(template, match)
+
+ def acs_find_by_id(self, cid: "Union[str, ContractId]") -> "Optional[ContractContextualData]":
+ warnings.warn("ACS functions are deprecated", DeprecationWarning, stacklevel=2)
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", DeprecationWarning)
+ # noinspection PyDeprecation
+ return self.client.find_by_id(cid)
+
+ def acs_find_one(self, template: "Union[TypeReference, str]", match=None):
+ warnings.warn("ACS functions are deprecated", DeprecationWarning, stacklevel=2)
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", DeprecationWarning)
+ # noinspection PyDeprecation
+ return self.client.find_one(template, match=match)
+
+ def acs_find_historical(self, template: "Union[TypeReference, str]", match=None):
+ warnings.warn("ACS functions are deprecated", DeprecationWarning, stacklevel=2)
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", DeprecationWarning)
+ # noinspection PyDeprecation
+ return self.client.find_historical(template, match)
+
+ def acs_find_nonempty(self, template: "Union[TypeReference, str]", match=None):
+ warnings.warn("ACS functions are deprecated", DeprecationWarning, stacklevel=2)
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", DeprecationWarning)
+ # noinspection PyDeprecation
+ return self.client.find_nonempty(template, match=match)
+
+
+class InitEvent(BaseEvent):
+ pass
+
+
+class OffsetEvent(BaseEvent):
+ @property
+ def offset(self) -> "NoReturn":
+ warnings.warn("offset is no longer accessible in the new API")
+ raise NotSupportedError("offset is no longer accessible in the new API")
+
+
+class ReadyEvent(OffsetEvent):
+ pass
+
+
+class ContractEvent(OffsetEvent):
+ @property
+ def command_id(self) -> "NoReturn":
+ """
+ Raises NotSupportedError; ``command_id`` is no longer accessible in the new API.
+ """
+ warnings.warn(_messages.COMMAND_ID, DeprecationWarning, stacklevel=2)
+ raise NotSupportedError(_messages.COMMAND_ID)
+
+ @property
+ def workflow_id(self) -> "NoReturn":
+ """
+ Raises NotSupportedError; ``workflow_id`` is no longer accessible in the new API.
+ """
+ warnings.warn(_messages.WORKFLOW_ID, DeprecationWarning, stacklevel=2)
+ raise NotSupportedError(_messages.WORKFLOW_ID)
+
+ @property
+ def event_id(self) -> "NoReturn":
+ """
+ Raises NotSupportedError; ``event_id`` is no longer accessible in the new API.
+ """
+ warnings.warn(_messages.EVENT_ID, DeprecationWarning, stacklevel=2)
+ raise NotSupportedError(_messages.EVENT_ID)
+
+ @property
+ def witness_parties(self) -> "NoReturn":
+ """
+ Raises NotSupportedError; ``witness_parties`` is no longer accessible in the new API.
+ """
+ warnings.warn(_messages.WITNESS_PARTIES, DeprecationWarning, stacklevel=2)
+ raise NotSupportedError(_messages.WITNESS_PARTIES)
+
+
+class ContractCreateEvent(ContractEvent):
+ def __init__(self, client: "Any", base_event: "CreateEvent"):
+ super().__init__(client)
+ self._base_event = base_event
+
+ @property
+ def cid(self) -> "ContractId":
+ return self._base_event.cid
+
+ @property
+ def cdata(self) -> "ContractData":
+ return self._base_event.cdata
+
+
+class ContractArchiveEvent(ContractEvent):
+ def __init__(self, client: "Any", base_event: "ArchiveEvent"):
+ super().__init__(client)
+ self._base_event = base_event
+
+ @property
+ def cid(self) -> "ContractId":
+ return self._base_event.cid
diff --git a/python/dazl/compat/v8_network.py b/python/dazl/compat/v8_network.py
new file mode 100644
index 00000000..c5abf862
--- /dev/null
+++ b/python/dazl/compat/v8_network.py
@@ -0,0 +1,390 @@
+# Copyright (c) 2017-2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+import asyncio
+import signal
+from typing import AbstractSet, Any, Awaitable, Collection, Dict, NoReturn, Optional, Union
+from uuid import uuid4
+import warnings
+
+from ..damlast.protocols import SymbolLookup
+from ..prim import Party
+from ..protocols.ledgerapi import connect
+from ..protocols.pkgcache import SHARED_PACKAGE_DATABASE
+from .v8 import ConnectionReuseWarning, NotSupportedError
+from .v8_client_aio import AIOGlobalClient, AIOPartyClient
+
+__all__ = ["Network"]
+
+from ..scheduler import validate_install_signal_handlers
+
+_NS_CONFIG_CHANGE = (
+ "configuration is handled differently in the new API and introspection "
+ "of configuration through resolved_config is no longer supported"
+)
+_NS_BOT_INTROSPECTION = "bot introspection is not supported with Network"
+_NS_BLOCKING_CALLS = "blocking thread connections are not yet supported"
+
+
+class Network:
+ """
+ A transitional replacement for :class:`Network` that keeps the same superficial API, but changes
+ some of the semantics to match the newer API.
+
+ This is an "almost" compatible implementation that can be used in place of :class:`Network` in
+ most simple dazl applications, and can be used to help find potential issues when trying to
+ transition to the new connection API with a small investment.
+ """
+
+ def __init__(self):
+ super().__init__()
+ self._async_clients = {} # type: Dict[Party, AIOPartyClient]
+ self._admin_client = None # type: Optional[AIOGlobalClient]
+ self._config = {"admin": True} # type: Dict[str, Any]
+
+ def set_config(
+ self,
+ url: "Optional[str]" = None,
+ admin_url: "Optional[str]" = None,
+ max_connection_count: "Optional[int]" = None,
+ quiet_timeout: "Optional[float]" = None,
+ use_acs_service: bool = True,
+ ca_file: "Optional[str]" = None,
+ cert_file: "Optional[str]" = None,
+ cert_key_file: "Optional[str]" = None,
+ verify_ssl: "Optional[str]" = None,
+ max_consequence_depth: "Optional[int]" = None,
+ party_groups: "Optional[Union[str, Party], Collection[Union[str, Party]]]" = None,
+ package_ids: "Optional[Collection[str]]" = None,
+ poll_interval: "Optional[float]" = None,
+ connect_timeout: "Optional[float]" = None,
+ eager_package_fetch: "Optional[bool]" = None,
+ enable_http_proxy: "Optional[bool]" = None,
+ ):
+ """
+ Sets configuration parameters that are shared among all the clients created from this
+ configuration. Some of these configuration parameters behave differently in the
+ v8 Network API; please pay close attention to the documentation of each flag for more
+ information.
+
+ :param url:
+ The URL to use for all clients created from this ``Network``.
+ :param admin_url:
+ This parameter is unused.
+ :param max_connection_count:
+ This parameter is unused.
+ :param quiet_timeout:
+ This parameter is unused.
+ :param use_acs_service:
+ This value must be unspecified or True; the active contract set service is always used
+ and this is not configurable.
+ :param ca_file:
+ Path to a certificate authority file.
+ :param cert_file:
+ Path to a public key cert file.
+ :param cert_key_file:
+ Path to a private key cert file.
+ :param verify_ssl:
+ This parameter is unused.
+ :param max_consequence_depth:
+ This parameter is unused, as dazl does not attempt to correlate the behavior of
+ streams across parties.
+ :param party_groups:
+ A set of read-as parties to add to every connection created on this Network. This field
+ behaves differently than in the old API, as _submissions_ take this read-as set into
+ account in Daml Connect 1.9 and later.
+ :param package_ids:
+ This parameter is unused. Daml packages are always loaded on-demand with this
+ implementation.
+ :param poll_interval:
+ This parameter is unused. In the new API, dazl does not internally poll for any
+ reason.
+ :param connect_timeout:
+ Number of seconds before giving upon a connection.
+ :param eager_package_fetch:
+ This parameter is unused. Daml packages are always loaded on-demand with this
+ implementation.
+ :param enable_http_proxy:
+ True to allow gRPC to use HTTP proxy server settings; False to disallow it.
+ """
+ # These parameters are obsoleted by the new API and ignored.
+ if url:
+ self._config["url"] = url
+ if admin_url:
+ warnings.warn("admin_url has no effect", DeprecationWarning, stacklevel=2)
+ if max_connection_count:
+ warnings.warn("max_connection_count has no effect", DeprecationWarning, stacklevel=2)
+ if quiet_timeout:
+ warnings.warn("quiet_timeout has no effect", DeprecationWarning, stacklevel=2)
+ if poll_interval is not None:
+ warnings.warn("poll_interval has no effect", DeprecationWarning, stacklevel=2)
+ if max_consequence_depth is not None:
+ warnings.warn("max_consequence_depth has no effect", DeprecationWarning, stacklevel=2)
+ if package_ids:
+ warnings.warn("package_ids has no effect", DeprecationWarning, stacklevel=2)
+ if eager_package_fetch is not None or not eager_package_fetch:
+ warnings.warn("eager_package_fetch has no effect", DeprecationWarning, stacklevel=2)
+ if not use_acs_service:
+ warnings.warn(
+ "use_acs_service has no effect and cannot be disabled",
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ if ca_file:
+ self._config["ca_file"] = ca_file
+ if cert_file:
+ self._config["cert_file"] = cert_file
+ if cert_key_file:
+ self._config["cert_key_file"] = cert_key_file
+ if verify_ssl:
+ warnings.warn("verify_ssl has no effect", DeprecationWarning, stacklevel=2)
+ if party_groups:
+ self._config["read_as"] = party_groups
+ if enable_http_proxy is not None:
+ self._config["enable_http_proxy"] = enable_http_proxy
+ if connect_timeout is not None:
+ self._config["connect_timeout"] = connect_timeout
+
+ @property
+ def lookup(self) -> "SymbolLookup":
+ """
+ Return a :class:`SymbolLookup` that provides type and package information for known
+ packages. Unlike :class:`Network`, this package database is globally shared and is NOT
+ scoped only to this :class:`ConnectionFactory`.
+
+ See the comments in :package:`dazl.protocols.pkgcache` for more information on why this is
+ a globally shared database instead of being individually scoped.
+ """
+ return SHARED_PACKAGE_DATABASE
+
+ def resolved_config(self) -> "NoReturn":
+ """
+ Reading configuration in this way is no longer supported.
+ """
+ raise NotSupportedError("resolved_config() is not supported")
+
+ def simple_global(self) -> "NoReturn":
+ """
+ Thread-blocking implementations are not yet supported in :class:`ConnectionFactory`.
+ Support for this method will be added in dazl v7.6.
+ """
+ raise NotSupportedError("blocking thread connections are not yet supported")
+
+ def aio_global(self) -> "AIOGlobalClient":
+ """
+ Return a client that can be used to access globally-available information, such as package
+ information.
+
+ Unlike :class:`Network`, the returned object does _not_ attempt to reuse a connection from
+ an existing party, and instead tries to create an explicit connection as an admin. But like
+ :class:`Network`, invoking this function multiple times returns the same client.
+ """
+ if self._admin_client is None:
+ self._admin_client = AIOGlobalClient(connect(**self._config))
+ return self._admin_client
+
+ def simple_party(self, party: "Union[str, Party]") -> "NoReturn":
+ """
+ Thread-blocking implementations are not yet supported in :class:`ConnectionFactory`.
+ Support for this method will be added in dazl v7.6.
+ """
+ raise NotSupportedError("blocking thread connections are not yet supported")
+
+ def simple_new_party(self) -> "NoReturn":
+ """
+ Thread-blocking implementations are not yet supported in :class:`Network`.
+ Support for this method will be added in dazl v7.6.
+ """
+ raise NotSupportedError("blocking thread connections are not yet supported")
+
+ def aio_party(self, party: "Union[str, Party]") -> "AIOPartyClient":
+ """
+ Return a party client that works on an asyncio event loop.
+
+ :param party: The party to get a client for.
+ """
+ client = self._async_clients.get(party)
+ if client is not None:
+ warnings.warn(
+ f"ConnectionFactory.aio_party({party!r}) called more than once",
+ ConnectionReuseWarning,
+ stacklevel=2,
+ )
+ return client
+
+ client = AIOPartyClient(connect(act_as=party, **self._config))
+ self._async_clients[party] = client
+ return client
+
+ def aio_new_party(self) -> "AIOPartyClient":
+ """
+ Return a :class:`PartyClient` for a random party that works on an asyncio event loop.
+ This will never return the same object twice.
+ """
+ return self.aio_party(str(uuid4()))
+
+ # noinspection PyMethodMayBeStatic,PyUnusedLocal
+ def party_bots(self, party: "Union[str, Party]", if_missing: int = 0) -> "NoReturn":
+ """
+ Bot introspection is not supported in :class:`ConnectionFactory`.
+ """
+ warnings.warn(_NS_BOT_INTROSPECTION, DeprecationWarning, stacklevel=2)
+ raise NotSupportedError(_NS_BOT_INTROSPECTION)
+
+ # noinspection PyMethodMayBeStatic
+ def bots(self) -> "NoReturn":
+ """
+ Bot introspection is not supported in :class:`ConnectionFactory`.
+ """
+ warnings.warn(_NS_BOT_INTROSPECTION, DeprecationWarning, stacklevel=2)
+ raise NotSupportedError(_NS_BOT_INTROSPECTION)
+
+ # noinspection PyMethodMayBeStatic
+ def start(self) -> None:
+ """
+ This method is not supported, as ``aio_run()`` can be used as an alternative:
+
+ .. code-block:: python
+ import asyncio
+
+ # Python 3.6
+ asyncio.ensure_future(network.aio_run())
+
+ # Python 3.7 or later
+ asyncio.create_task(network.aio_run())
+ """
+ warnings.warn("start() is not supported; use aio_run() instead")
+ raise NotSupportedError()
+
+ def start_in_background(self):
+ """
+ Thread-blocking implementations are not yet supported in :class:`Network`.
+ Support for this method will be added in dazl v7.6.
+ """
+ raise NotSupportedError(_NS_BLOCKING_CALLS)
+
+ def join(self):
+ """
+ Thread-blocking implementations are not yet supported in :class:`Network`.
+ Support for this method will be added in dazl v7.6.
+ """
+ raise NotSupportedError(_NS_BLOCKING_CALLS)
+
+ def shutdown(self) -> "asyncio.Future":
+ # noinspection PyProtectedMember
+ return asyncio.ensure_future(
+ asyncio.gather(*(client._stop() for client in self._async_clients.values()))
+ )
+
+ async def aio_run(self, *coroutines, keep_open: bool = True) -> None:
+ """
+ Coroutine that can be used to schedule and run all connections.
+
+ Note: Unlike the v5 Network API, there are no guarantees that parties are synchronized in
+ any way! In particular, calling ``network.aio_run(keep_open=False)`` does NOT guarantee that
+ all parties have read an active contract set up to the same transaction.
+
+ If a consistent view across parties is required, you must either use the slower, deprecated
+ dazl v5 API, or use a multi-party subscription using the new dazl v8 API (which was
+ introduced in dazl v7.5).
+
+ :param coroutines:
+ Optional additional coroutines to run. ``aio_run`` is only considered complete once all
+ of the additional coroutines are also finished.
+ :param keep_open:
+ ``True`` to use never-ending streams internally; ``False`` in order for all individual
+ party clients to read up to their current ACS and quit. Note that this flag does not
+ work exactly the same as the dazl v5 API, as it does not guarantee any consistency
+ between individual Party streams.
+ """
+ # noinspection PyProtectedMember
+ await asyncio.gather(*(client._open() for client in self._async_clients.values()))
+ all_coroutines = [client._run(keep_open) for client in self._async_clients.values()]
+ all_coroutines.extend(coroutines)
+ await asyncio.gather(*all_coroutines)
+
+ def run_until_complete(
+ self, *coroutines: "Awaitable[None]", install_signal_handlers: "Optional[bool]" = None
+ ) -> None:
+ """
+ Block the current thread and run the application in an event loop. The loop terminates when
+ the given (optional) coroutines terminate OR :meth:`shutdown` is called.
+
+ Note: Unlike the v5 Network API, there are no guarantees that parties are synchronized in
+ any way! Follow-up commands are also not necessarily sent.
+
+ If a consistent view across parties is required, you must either use the slower, deprecated
+ dazl v5 API, or use a multi-party subscription using the new dazl v8 API (which was
+ introduced in dazl v7.5).
+
+ :param coroutines:
+ Optional additional coroutines to run concurrently with dazl reading from Party streams.
+ :param install_signal_handlers:
+ ``True`` to install SIGINT and SIGQUIT event handlers (CTRL+C and CTRL+\\);
+ ``False`` to skip installation. The default value is ``None``, which installs signal
+ handlers only when called from the main thread (default). If signal handlers are
+ requested to be installed and the thread is NOT the main thread, this method throws.
+ """
+ run(asyncio.get_event_loop(), self, coroutines, install_signal_handlers, False)
+
+ def run_forever(
+ self, *coroutines: "Awaitable[None]", install_signal_handlers: "Optional[bool]" = None
+ ) -> None:
+ """
+ Block the current thread and run the application in an event loop.
+
+ Note: Unlike the v5 Network API, there are no guarantees that parties are synchronized in
+ any way!
+
+ If a consistent view across parties is required, you must either use the slower, deprecated
+ dazl v5 API, or use a multi-party subscription using the new dazl v8 API (which was
+ introduced in dazl v7.5).
+
+ :param coroutines:
+ Optional additional coroutines to run concurrently with dazl reading from Party streams.
+ :param install_signal_handlers:
+ ``True`` to install SIGINT and SIGQUIT event handlers (CTRL+C and CTRL+\\);
+ ``False`` to skip installation. The default value is ``None``, which installs signal
+ handlers only when called from the main thread (default). If signal handlers are
+ requested to be installed and the thread is NOT the main thread, this method throws.
+ """
+ run(asyncio.get_event_loop(), self, coroutines, install_signal_handlers, True)
+
+ @property
+ def bots(self):
+ err_str = "introspection of event handlers is not supported in ConnectionFactory"
+ warnings.warn(err_str, DeprecationWarning, stacklevel=2)
+ raise NotSupportedError(err_str)
+
+ @property
+ def party_bots(self):
+ err_str = "introspection of event handlers is not supported in ConnectionFactory"
+ warnings.warn(err_str, DeprecationWarning, stacklevel=2)
+ raise NotSupportedError(err_str)
+
+ def parties(self) -> "AbstractSet[Party]":
+ warnings.warn(
+ "parties is deprecated and there is no planned replacement",
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ return frozenset(self._async_clients.keys())
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ pass
+
+
+def run(
+ loop: "asyncio.AbstractEventLoop",
+ network: "Network",
+ coroutines: "Collection[Awaitable[None]]",
+ install_signal_handlers: "Optional[bool]" = None,
+ keep_open: bool = False,
+) -> None:
+ if validate_install_signal_handlers(install_signal_handlers):
+ loop.add_signal_handler(signal.SIGINT, network.shutdown)
+ loop.add_signal_handler(signal.SIGQUIT, network.shutdown)
+ loop.run_until_complete(network.aio_run(coroutines, keep_open=keep_open))
diff --git a/python/dazl/compat/v8_threading.py b/python/dazl/compat/v8_threading.py
new file mode 100644
index 00000000..b387ce4a
--- /dev/null
+++ b/python/dazl/compat/v8_threading.py
@@ -0,0 +1,68 @@
+"""
+This module contains threading/asyncio code that is intended to be dropped once
+:class:`dazl.client.api.Network` and associated symbols are fully removed.
+"""
+from threading import RLock, Thread, current_thread
+from typing import Optional
+import warnings
+
+from .. import LOG
+from ..scheduler import Invoker
+
+__all__ = ["ThreadManager"]
+
+
+class ThreadManager:
+ """
+ Enforces the complex thread policy in the dazl v5 API.
+ """
+
+ def __init__(self):
+ self._lock = RLock()
+ self._invoker = Invoker()
+ self._main_thread = None # type: Optional[Thread]
+
+ def start_in_background(
+ self, daemon: bool, install_signal_handlers: "Optional[bool]"
+ ) -> "None":
+ """
+ Create a background thread, start an event loop, and run the application.
+ """
+ from asyncio import new_event_loop, set_event_loop
+
+ def background_main():
+ LOG.info("Starting an event loop on a background thread %r...", current_thread().name)
+
+ try:
+ loop = new_event_loop()
+ set_event_loop(loop)
+
+ loop.run_until_complete(self.aio_run())
+ except:
+ LOG.exception("The main event loop died!")
+
+ LOG.info("The main event loop has finished.")
+
+ with self._lock:
+ if self._main_thread is not None:
+ raise RuntimeError("start() called more than once")
+ self._main_thread = Thread(
+ target=background_main, daemon=daemon, name=f"dazl:main-{id(self):016x}"
+ )
+
+ self._main_thread.start()
+
+ def join(self, timeout: "Optional[float]" = None) -> None:
+ """
+ Wait for the background thread started by :meth:`start` to finish.
+
+ This method does nothing if :meth:`start` has not yet been called.
+ """
+ with self._lock:
+ thread = self._main_thread
+
+ if thread is not None:
+ thread.join(timeout=timeout)
+
+ async def aio_run(self, *coroutines) -> None:
+ raise NotImplementedError
diff --git a/python/dazl/damlast/lookup.py b/python/dazl/damlast/lookup.py
index 56da59a9..e6d55785 100644
--- a/python/dazl/damlast/lookup.py
+++ b/python/dazl/damlast/lookup.py
@@ -15,7 +15,7 @@
import threading
from types import MappingProxyType
-from typing import AbstractSet, Any, Collection, Dict, Iterable, NoReturn, Optional, Tuple
+from typing import AbstractSet, Any, Collection, Dict, Iterable, NoReturn, Optional, Tuple, Union
from .daml_lf_1 import (
Archive,
@@ -148,8 +148,18 @@ class PackageLookup(SymbolLookup):
Caching structure to make lookups on type names within a :class:`Package` faster.
"""
- def __init__(self, archive: "Archive"):
+ def __init__(self, archive: "Archive", allow_legacy_identifiers: bool = False):
+ """
+ Initialize a :class:`PackageLookup`.
+
+ :param archive:
+ The archive to lookup.
+ :param allow_legacy_identifiers:
+ ``True`` to support lookups over legacy types and non-colon-delimited field names;
+ ``False`` to drop support for legacy symbols.
+ """
self.archive = archive
+ self._allow_legacy_identifiers = allow_legacy_identifiers
data_types = {} # type: Dict[str, Tuple[TypeConName, DefDataType]]
values = {} # type: Dict[str, Tuple[ValName, DefValue]]
diff --git a/python/dazl/ledger/config/__init__.py b/python/dazl/ledger/config/__init__.py
new file mode 100644
index 00000000..2ab6b1b5
--- /dev/null
+++ b/python/dazl/ledger/config/__init__.py
@@ -0,0 +1,63 @@
+# Copyright (c) 2017-2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+from os import PathLike
+from typing import Collection, Optional, Union
+
+from ...prim import Party, TimeDeltaLike
+from .access import AccessConfig, create_access
+from .ssl import SSLConfig
+from .url import URLConfig
+
+__all__ = ["Config", "AccessConfig", "SSLConfig", "URLConfig"]
+
+
+class Config:
+ @classmethod
+ def create(
+ cls,
+ url: "Optional[str]" = None,
+ read_as: "Union[None, Party, Collection[Party]]" = None,
+ act_as: "Union[None, Party, Collection[Party]]" = None,
+ admin: "Optional[bool]" = False,
+ ledger_id: "Optional[str]" = None,
+ application_name: "Optional[str]" = None,
+ oauth_token: "Optional[str]" = None,
+ ca: "Optional[bytes]" = None,
+ ca_file: "Optional[PathLike]" = None,
+ cert: "Optional[bytes]" = None,
+ cert_file: "Optional[PathLike]" = None,
+ cert_key: "Optional[bytes]" = None,
+ cert_key_file: "Optional[PathLike]" = None,
+ connect_timeout: "Optional[TimeDeltaLike]" = None,
+ enable_http_proxy: "bool" = True,
+ ) -> "Config":
+ url_config = URLConfig(
+ url=url,
+ connect_timeout=connect_timeout,
+ enable_http_proxy=enable_http_proxy,
+ )
+
+ access_config = create_access(
+ read_as=read_as,
+ act_as=act_as,
+ admin=admin,
+ ledger_id=ledger_id,
+ application_name=application_name,
+ oauth_token=oauth_token,
+ )
+
+ ssl_config = SSLConfig(
+ ca=ca,
+ ca_file=ca_file,
+ cert=cert,
+ cert_file=cert_file,
+ cert_key=cert_key,
+ cert_key_file=cert_key_file,
+ )
+
+ return cls(access_config, ssl_config, url_config)
+
+ def __init__(self, access: "AccessConfig", ssl: "SSLConfig", url: "URLConfig"):
+ self.access = access
+ self.ssl = ssl
+ self.url = url
diff --git a/python/dazl/ledger/config/access.py b/python/dazl/ledger/config/access.py
new file mode 100644
index 00000000..b010727e
--- /dev/null
+++ b/python/dazl/ledger/config/access.py
@@ -0,0 +1,360 @@
+# Copyright (c) 2017-2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+import base64
+from collections.abc import MutableSet as MutableSetBase, Set as SetBase
+import json
+from typing import AbstractSet, Any, Collection, Iterator, Mapping, MutableSet, Optional, Union
+
+from ...prim import Party
+
+
+def create_access(
+ *,
+ read_as: "Union[None, Party, Collection[Party]]" = None,
+ act_as: "Union[None, Party, Collection[Party]]" = None,
+ admin: "Optional[bool]" = None,
+ ledger_id: "Optional[str]" = None,
+ application_name: "Optional[str]" = None,
+ oauth_token: "Optional[str]" = None,
+) -> "AccessConfig":
+ if oauth_token:
+ # if a token is supplied, none of the other arguments are allowed
+ if (
+ read_as is not None
+ or act_as is not None
+ or admin is not None
+ or ledger_id is not None
+ or application_name is not None
+ ):
+ raise ValueError(
+ "cannot configure access with both tokens and "
+ "read_as/act_as/admin/ledger_id/application_name configuration options"
+ )
+ return TokenBasedAccessConfig(oauth_token)
+ else:
+ return PropertyBasedAccessConfig(
+ read_as=read_as,
+ act_as=act_as,
+ admin=admin,
+ ledger_id=ledger_id,
+ application_name=application_name,
+ )
+
+
+class AccessConfig:
+ """
+ Configuration parameters for providing access to a ledger.
+ """
+
+ @property
+ def read_as(self) -> "AbstractSet[Party]":
+ """
+ The set of parties that can be used to read data from the ledger. This also includes the
+ set of parties that can be used to write data to the ledger.
+ """
+ raise NotImplementedError
+
+ @property
+ def read_only_as(self) -> "AbstractSet[Party]":
+ """
+ The set of parties that have read-only access to the underlying ledger.
+ """
+ raise NotImplementedError
+
+ @property
+ def act_as(self) -> "AbstractSet[Party]":
+ """
+ The set of parties that can be used to write data to the ledger.
+ """
+ raise NotImplementedError
+
+ @property
+ def admin(self) -> bool:
+ """
+ ``True`` if the token grants "admin" access.
+ """
+ raise NotImplementedError
+
+ @property
+ def ledger_id(self) -> "Optional[str]":
+ """
+ The ledger ID. For non-token based access methods, this can be queried from the ledger.
+ """
+ raise NotImplementedError
+
+ @property
+ def application_name(self) -> str:
+ """
+ The application name.
+ """
+ raise NotImplementedError
+
+ @property
+ def token(self) -> str:
+ """
+ The bearer token that provides authorization and authentication to a ledger.
+ """
+ raise NotImplementedError
+
+
+class TokenBasedAccessConfig(AccessConfig):
+ """
+ Access configuration that is inherently token-based. The token can be changed at any time, and
+ party rights, the application name, and ledger ID are all derived off of the token.
+ """
+
+ def __init__(self, oauth_token: str):
+ self.token = oauth_token
+
+ @property
+ def token(self) -> str:
+ """
+ The bearer token that provides authorization and authentication to a ledger.
+ """
+ return self._token
+
+ @token.setter
+ def token(self, value: str) -> None:
+ self._token = value
+ claims = decode_token(self._token)
+
+ read_as = frozenset(claims.get("readAs", ()))
+ act_as = frozenset(claims.get("actAs", ()))
+
+ self._act_as = act_as
+ self._read_only_as = read_as - act_as
+ self._read_as = read_as.union(act_as)
+ self._admin = bool(claims.get("admin", False))
+ self._ledger_id = claims.get("ledgerId", None)
+ self._application_name = claims.get("applicationId", None)
+
+ def read_as(self) -> "AbstractSet[Party]":
+ return self._read_as
+
+ def read_only_as(self) -> "AbstractSet[Party]":
+ return self._read_only_as
+
+ def act_as(self) -> "AbstractSet[Party]":
+ return self._act_as
+
+ def admin(self) -> bool:
+ return self._admin
+
+ def ledger_id(self) -> "Optional[str]":
+ return self._ledger_id
+
+ def application_name(self) -> str:
+ return self._application_name
+
+
+class PropertyBasedAccessConfig(AccessConfig):
+ """
+ Access configuration that is manually specified outside of an authentication/authorization
+ framework. Suitable for local testing or when no auth server is available, and the Ledger API
+ inherently trusts any caller to provide its own authentication and authorization.
+ """
+
+ def __init__(
+ self,
+ party: "Optional[Party]" = None,
+ read_as: "Union[None, Party, Collection[Party]]" = None,
+ act_as: "Union[None, Party, Collection[Party]]" = None,
+ admin: "Optional[bool]" = False,
+ ledger_id: "Optional[str]" = None,
+ application_name: "Optional[str]" = None,
+ ):
+ """
+
+ :param party:
+ The singular party to use for reading and writing. This parameter is a convenience
+ parameter for the common case where "read as" and "act as" parties are the same,
+ and there is only one of them. If you specify this parameter, you CANNOT supply
+ ``read_as`` or ``act_as``, nor can you supply an access token When connecting to the
+ HTTP JSON API, ``ledger_id`` must _also_ be supplied when using this parameter..
+ :param read_as:
+ :param act_as:
+ A party of set of parties that can be used to submit commands to the ledger. In a
+ Daml-based ledger, act-as rights imply read-as rights. If you specify this parameter,
+ you CANNOT supply ``party``, nor can you supply an access token. When connecting to the
+ HTTP JSON API, ``ledger_id`` must _also_ be supplied when using this parameter.
+ :param ledger_id:
+ The
+ """
+ self._parties = PartyRights()
+ self._parties.maybe_add(read_as, False)
+ self._parties.maybe_add(act_as, True)
+ self._parties.maybe_add(party, True)
+ self._admin = bool(admin)
+ self._ledger_id = ledger_id
+ self._application_name = application_name or "dazl-client"
+
+ @property
+ def token(self):
+ """
+ Produces a token without signing, utilizing our parameters.
+ """
+ return encode_unsigned_token(
+ self.read_as, self.act_as, self.ledger_id, self.application_name, self.admin
+ )
+
+ @property
+ def ledger_id(self) -> "Optional[str]":
+ return self._ledger_id
+
+ @ledger_id.setter
+ def ledger_id(self, value: "Optional[str]") -> None:
+ self._ledger_id = value
+
+ @property
+ def application_name(self) -> str:
+ return self._application_name
+
+ @property
+ def read_as(self) -> "AbstractSet[Party]":
+ """
+ Return the set of parties for which read rights are granted.
+
+ This set always includes the act_as parties. For the set of parties that can be read as
+ but NOT acted as, use :property:`read_only_as`.
+ """
+ return self._parties
+
+ @property
+ def read_only_as(self) -> "MutableSet[Party]":
+ """"""
+ return self._parties.read_as
+
+ @property
+ def act_as(self) -> "MutableSet[Party]":
+ """
+ Return the set of parties for which act-as rights are granted. This collection can be
+ modified.
+ """
+ return self._parties.act_as
+
+ @property
+ def admin(self) -> bool:
+ return self._admin
+
+
+def parties(p: "Union[None, Party, Collection[Party]]") -> "Collection[Party]":
+ if p is None:
+ return []
+ elif isinstance(p, str):
+ return [Party(p)]
+ else:
+ return p
+
+
+DamlLedgerApiNamespace = "https://daml.com/ledger-api"
+
+
+def decode_token(token: str) -> "Mapping[str, Any]":
+ components = token.split(".", 3)
+ if len(components) != 3:
+ raise ValueError("not a JWT")
+ claim_str = base64.urlsafe_b64decode(components[1])
+ claims = json.loads(claim_str)
+ claims_dict = claims.get(DamlLedgerApiNamespace)
+ if claims_dict is None:
+ raise ValueError(f"JWT is missing claim namespace: {DamlLedgerApiNamespace!r}")
+ return claims_dict
+
+
+def encode_unsigned_token(
+ read_as: "Collection[Party]",
+ act_as: "Collection[Party]",
+ ledger_id: str,
+ application_id: str,
+ admin: bool = True,
+) -> bytes:
+ header = {
+ "alg": "none",
+ "typ": "JWT",
+ }
+ payload = {
+ DamlLedgerApiNamespace: {
+ "ledgerId": ledger_id,
+ "applicationId": application_id,
+ "actAs": sorted(act_as),
+ "readAs": sorted(read_as),
+ "admin": admin,
+ }
+ }
+
+ return (
+ base64.urlsafe_b64encode(json.dumps(header).encode("utf-8"))
+ + b"."
+ + base64.urlsafe_b64encode(json.dumps(payload).encode("utf-8"))
+ + b"."
+ )
+
+
+class PartyRights(SetBase):
+ __slots__ = ("_rights", "read_as", "act_as")
+
+ def __init__(self):
+ self._rights = dict()
+ self.read_as = PartyRightsSet(self, False)
+ self.act_as = PartyRightsSet(self, True)
+
+ def maybe_add(
+ self, value: "Union[None, Party, Collection[Party]]", has_act_rights: bool
+ ) -> None:
+ if value is None:
+ return
+
+ # Party is a fake Python newtype, so isinstance checks don't work on it
+ if isinstance(value, str):
+ self.add(Party(value), has_act_rights)
+ else:
+ for party in value:
+ self.add(party, has_act_rights)
+
+ def add(self, value: "Party", has_act_rights: bool) -> None:
+ """
+ Add/replace a ``Party`` and its rights.
+ """
+ self._rights[value] = has_act_rights
+
+ def discard(self, value: "Party") -> None:
+ self._rights.pop(value)
+
+ def get(self, value: "Party") -> "Optional[bool]":
+ return self._rights.get(value)
+
+ def count(self, act_as: bool) -> int:
+ return sum(1 for p, a in self._rights.items() if act_as == a)
+
+ def __contains__(self, party: object) -> bool:
+ return party in self._rights
+
+ def __len__(self) -> int:
+ return len(self._rights)
+
+ def __iter__(self) -> "Iterator[Party]":
+ return iter(sorted(self._rights))
+
+ def iter(self, act_as: bool) -> "Iterator[Party]":
+ return iter(p for p, a in sorted(self._rights.items()) if a == act_as)
+
+
+class PartyRightsSet(MutableSetBase):
+ def __init__(self, rights: "PartyRights", act_as: bool):
+ self._rights = rights
+ self._act_as = act_as
+
+ def add(self, value: "Party") -> None:
+ self._rights.add(value, self._act_as)
+
+ def discard(self, value: "Party") -> None:
+ self._rights.discard(value)
+
+ def __contains__(self, obj: "object") -> bool:
+ return isinstance(obj, str) and (self._rights.get(Party(obj)) == self._act_as)
+
+ def __len__(self) -> int:
+ return self._rights.count(self._act_as)
+
+ def __iter__(self) -> "Iterator[Party]":
+ return self._rights.iter(self._act_as)
diff --git a/python/dazl/ledger/config/ssl.py b/python/dazl/ledger/config/ssl.py
new file mode 100644
index 00000000..a6ec21ef
--- /dev/null
+++ b/python/dazl/ledger/config/ssl.py
@@ -0,0 +1,77 @@
+# Copyright (c) 2017-2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+from os import PathLike, fspath
+from typing import Optional
+
+
+class SSLConfig:
+ """
+ Configuration parameters that affect SSL connections.
+ """
+
+ def __init__(
+ self,
+ ca: "Optional[bytes]" = None,
+ ca_file: "Optional[PathLike]" = None,
+ cert: "Optional[bytes]" = None,
+ cert_file: "Optional[PathLike]" = None,
+ cert_key: "Optional[bytes]" = None,
+ cert_key_file: "Optional[PathLike]" = None,
+ ):
+ self._ca: Optional[bytes]
+ self._cert: Optional[bytes]
+ self._cert_key: Optional[bytes]
+
+ if ca_file:
+ if ca:
+ raise ValueError("ca and ca_file cannot both be specified at the same time")
+ with open(fspath(ca_file), "rb") as f:
+ self._ca = f.read()
+ else:
+ self._ca = ca
+
+ if cert_file:
+ if cert:
+ raise ValueError("cert and cert_file cannot both be specified at the same time")
+ with open(fspath(cert_file), "rb") as f:
+ self._cert = f.read()
+ else:
+ self._cert = cert
+
+ if cert_key_file:
+ if cert_key:
+ raise ValueError(
+ "cert_key and cert_key_file cannot both be specified at the same time"
+ )
+ with open(fspath(cert_key_file), "rb") as f:
+ self._cert_key = f.read()
+ else:
+ self._cert_key = cert_key
+
+ def __bool__(self):
+ """
+ True if SSL settings are supplied; otherwise False if no SSL settings of any kind were
+ supplied.
+ """
+ return bool(self._ca or self._cert or self._cert_key)
+
+ @property
+ def ca(self) -> "Optional[bytes]":
+ """
+ Server certificate authority file.
+ """
+ return self._ca
+
+ @property
+ def cert(self) -> "Optional[bytes]":
+ """
+ Client certificate file.
+ """
+ return self._cert
+
+ @property
+ def cert_key(self) -> "Optional[bytes]":
+ """
+ Client certificate and key file.
+ """
+ return self._cert_key
diff --git a/python/dazl/ledger/config/url.py b/python/dazl/ledger/config/url.py
new file mode 100644
index 00000000..d74839e8
--- /dev/null
+++ b/python/dazl/ledger/config/url.py
@@ -0,0 +1,100 @@
+# Copyright (c) 2017-2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+from datetime import timedelta
+import os
+from typing import Optional
+from urllib.parse import urlparse
+
+from ...prim import TimeDeltaLike, to_timedelta
+
+__all__ = ["URLConfig"]
+
+DEFAULT_CONNECT_TIMEOUT = timedelta(30)
+
+
+class URLConfig:
+ def __init__(
+ self,
+ url: "Optional[str]" = None,
+ host: "Optional[str]" = None,
+ port: "Optional[int]" = None,
+ scheme: "Optional[str]" = None,
+ connect_timeout: "Optional[TimeDeltaLike]" = DEFAULT_CONNECT_TIMEOUT,
+ enable_http_proxy: bool = True,
+ ):
+ url = url if url is not None else os.getenv("DAML_LEDGER_URL", "//localhost:6865")
+ if url:
+ if host or port or scheme:
+ raise ValueError("url or host/port/scheme must be specified, but not both")
+ self._url = sanitize_url(url)
+ components = urlparse(self._url, allow_fragments=False)
+ self._host = components.hostname
+ self._port = components.port
+ self._scheme = components.scheme
+ else:
+ self._scheme = scheme or ""
+ self._host = host or "localhost"
+ self._port = port or 6865
+ self._url = f"{self._scheme}//{self._host}:{self._port}"
+ self._connect_timeout = (
+ to_timedelta(connect_timeout)
+ if connect_timeout is not None
+ else DEFAULT_CONNECT_TIMEOUT
+ )
+ self._enable_http_proxy = enable_http_proxy
+
+ @property
+ def url(self) -> str:
+ """
+ The full URL to connect to, including a protocol, host, and port.
+ """
+ return self._url
+
+ @property
+ def scheme(self) -> "Optional[str]":
+ return self._scheme
+
+ @property
+ def host(self) -> "Optional[str]":
+ return self._host
+
+ @property
+ def port(self) -> "Optional[int]":
+ return self._port
+
+ @property
+ def enable_http_proxy(self) -> bool:
+ """
+ Whether to allow the use of HTTP proxies.
+ """
+ return self._enable_http_proxy
+
+ @property
+ def connect_timeout(self) -> "timedelta":
+ """
+ How long to wait for a connection before giving up.
+
+ The default is 30 seconds.
+ """
+ return self._connect_timeout
+
+
+def sanitize_url(url: str) -> str:
+ """
+ Perform some basic sanitization on a URL string:
+ * Convert a URL with no specified protocol to one with a blank protocol
+ * Strip out any trailing slashes
+
+ >>> sanitize_url("somewhere:1000")
+ '//somewhere:1000'
+
+ >>> sanitize_url("http://somewhere:1000")
+ 'http://somewhere:1000'
+
+ >>> sanitize_url("http://somewhere:1000/")
+ 'http://somewhere:1000/'
+ """
+ first_slash = url.find("/")
+ if first_slash == -1 or first_slash != url.find("//"):
+ url = "//" + url
+ return url
diff --git a/python/dazl/ledger/errors.py b/python/dazl/ledger/errors.py
new file mode 100644
index 00000000..e8d456e9
--- /dev/null
+++ b/python/dazl/ledger/errors.py
@@ -0,0 +1,15 @@
+# Copyright (c) 2017-2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+__all__ = ["CallbackReturnWarning"]
+
+
+class CallbackReturnWarning(Warning):
+ """
+ Raised when a user callback on a stream returns a value. These objects have no meaning and are
+ ignored by dazl.
+
+ This warning is raised primarily because older versions of dazl interpreted returning commands
+ from a callback as a request to send commands to the underlying ledger, and this is not
+ supported in newer APIs.
+ """
diff --git a/python/dazl/ledger/grpc/__init__.py b/python/dazl/ledger/grpc/__init__.py
new file mode 100644
index 00000000..c2515944
--- /dev/null
+++ b/python/dazl/ledger/grpc/__init__.py
@@ -0,0 +1,42 @@
+# Copyright (c) 2017-2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+from os import PathLike
+from typing import Collection, Optional, Union, overload
+
+from ...prim import Party, TimeDeltaLike
+from ..config import Config
+from .conn_aio import Connection
+
+__all__ = ["connect", "Connection"]
+
+
+# TODO: Figure out clever ways to make this function's type signature easier to maintain while
+# preserving its ease of use to callers.
+@overload
+def connect(
+ url: str,
+ *,
+ read_as: "Union[None, Party, Collection[Party]]" = None,
+ act_as: "Union[None, Party, Collection[Party]]" = None,
+ admin: "Optional[bool]" = False,
+ ledger_id: "Optional[str]" = None,
+ application_name: "Optional[str]" = None,
+ oauth_token: "Optional[str]" = None,
+ ca: "Optional[bytes]" = None,
+ ca_file: "Optional[PathLike]" = None,
+ cert: "Optional[bytes]" = None,
+ cert_file: "Optional[PathLike]" = None,
+ cert_key: "Optional[bytes]" = None,
+ cert_key_file: "Optional[PathLike]" = None,
+ connect_timeout: "Optional[TimeDeltaLike]" = None,
+ enable_http_proxy: "bool" = True,
+) -> Connection:
+ ...
+
+
+def connect(**kwargs):
+ """
+ Connect to a gRPC Ledger API implementation and return a connection that uses asyncio.
+ """
+ config = Config.create(**kwargs)
+ return Connection(config)
diff --git a/python/dazl/ledger/grpc/channel.py b/python/dazl/ledger/grpc/channel.py
new file mode 100644
index 00000000..49e941ab
--- /dev/null
+++ b/python/dazl/ledger/grpc/channel.py
@@ -0,0 +1,58 @@
+# Copyright (c) 2017-2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+from grpc import (
+ AuthMetadataContext,
+ AuthMetadataPlugin,
+ AuthMetadataPluginCallback,
+ composite_channel_credentials,
+ metadata_call_credentials,
+ ssl_channel_credentials,
+)
+from grpc.aio import Channel, insecure_channel, secure_channel
+
+from ..config import Config
+
+__all__ = ["create_channel"]
+
+
+def create_channel(config: "Config") -> "Channel":
+ """
+ Create a :class:`Channel` for the specified configuration.
+ """
+ target = f"{config.url.host}:{config.url.port}"
+ options = [
+ ("grpc.max_send_message_length", -1),
+ ("grpc.max_receive_message_length", -1),
+ ]
+ if config.url.enable_http_proxy:
+ options.append(("grpc.enable_http_proxy", 0))
+
+ if (config.url.scheme in ("https", "grpcs")) or config.ssl:
+ credentials = ssl_channel_credentials(
+ root_certificates=config.ssl.ca,
+ private_key=config.ssl.cert_key,
+ certificate_chain=config.ssl.cert,
+ )
+ if config.access.token:
+ credentials = composite_channel_credentials(
+ credentials, metadata_call_credentials(GrpcAuth(config))
+ )
+ return secure_channel(target, credentials, options)
+ else:
+ return insecure_channel(target, options)
+
+
+class GrpcAuth(AuthMetadataPlugin):
+ def __init__(self, config: "Config"):
+ self._config = config
+
+ def __call__(self, context: "AuthMetadataContext", callback: "AuthMetadataPluginCallback"):
+ options = []
+
+ # TODO: Add support here for refresh tokens
+ token = self._config.access.token
+ if token:
+ options.append(("Authorization", "Bearer " + self._config.access.token))
+
+ callback(options, None)
diff --git a/python/dazl/ledger/grpc/codec_aio.py b/python/dazl/ledger/grpc/codec_aio.py
index 03a13f7a..4f0a0ba9 100644
--- a/python/dazl/ledger/grpc/codec_aio.py
+++ b/python/dazl/ledger/grpc/codec_aio.py
@@ -11,7 +11,7 @@
# References:
# * https://github.com/digital-asset/daml/blob/main/ledger-service/http-json/src/main/scala/com/digitalasset/http/CommandService.scala
-from typing import Any, List, Optional, Sequence, Tuple, Union
+from typing import TYPE_CHECKING, Any, List, Optional, Sequence, Tuple, Union
from ..._gen.com.daml.ledger.api.v1.admin.party_management_service_pb2 import (
PartyDetails as G_PartyDetails,
@@ -61,7 +61,10 @@
PartyInfo,
)
from ..pkgcache import SHARED_PACKAGE_DATABASE
-from ..pkgloader_aio import PackageLoader, PackageService
+from ..pkgloader_aio import PackageLoader
+
+if TYPE_CHECKING:
+ from .conn_aio import Connection
__all__ = ["Codec"]
@@ -78,7 +81,7 @@ class Codec:
identify package contents.
"""
- def __init__(self, conn: "PackageService", lookup: "Optional[SymbolLookup]" = None):
+ def __init__(self, conn: "Connection", lookup: "Optional[SymbolLookup]" = None):
self.conn = conn
self._lookup = lookup or SHARED_PACKAGE_DATABASE
self._loader = PackageLoader(self._lookup, conn)
diff --git a/python/dazl/ledger/grpc/conn_aio.py b/python/dazl/ledger/grpc/conn_aio.py
new file mode 100644
index 00000000..faa0f71b
--- /dev/null
+++ b/python/dazl/ledger/grpc/conn_aio.py
@@ -0,0 +1,493 @@
+# Copyright (c) 2017-2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+"""
+This module contains the mapping between gRPC calls and Python/dazl types.
+"""
+import asyncio
+from typing import (
+ AbstractSet,
+ Any,
+ AsyncIterable,
+ Collection,
+ Generic,
+ Mapping,
+ Optional,
+ Sequence,
+ TypeVar,
+ Union,
+)
+import uuid
+import warnings
+
+from grpc.aio import Channel
+
+from ..._gen.com.daml.ledger.api.v1.active_contracts_service_pb2 import (
+ GetActiveContractsRequest as G_GetActiveContractsRequest,
+)
+from ..._gen.com.daml.ledger.api.v1.active_contracts_service_pb2_grpc import (
+ ActiveContractsServiceStub,
+)
+from ..._gen.com.daml.ledger.api.v1.admin.package_management_service_pb2 import (
+ UploadDarFileRequest as G_UploadDarFileRequest,
+)
+from ..._gen.com.daml.ledger.api.v1.admin.package_management_service_pb2_grpc import (
+ PackageManagementServiceStub,
+)
+from ..._gen.com.daml.ledger.api.v1.admin.party_management_service_pb2 import (
+ AllocatePartyRequest as G_AllocatePartyRequest,
+)
+from ..._gen.com.daml.ledger.api.v1.admin.party_management_service_pb2_grpc import (
+ PartyManagementServiceStub,
+)
+from ..._gen.com.daml.ledger.api.v1.command_service_pb2 import (
+ SubmitAndWaitRequest as G_SubmitAndWaitRequest,
+)
+from ..._gen.com.daml.ledger.api.v1.command_service_pb2_grpc import CommandServiceStub
+from ..._gen.com.daml.ledger.api.v1.commands_pb2 import Command as G_Command, Commands as G_Commands
+from ..._gen.com.daml.ledger.api.v1.ledger_identity_service_pb2 import (
+ GetLedgerIdentityRequest as G_GetLedgerIdentityRequest,
+)
+from ..._gen.com.daml.ledger.api.v1.ledger_identity_service_pb2_grpc import (
+ LedgerIdentityServiceStub,
+)
+from ..._gen.com.daml.ledger.api.v1.package_service_pb2 import (
+ GetPackageRequest as G_GetPackageRequest,
+ ListPackagesRequest as G_ListPackagesRequest,
+)
+from ..._gen.com.daml.ledger.api.v1.package_service_pb2_grpc import PackageServiceStub
+from ..._gen.com.daml.ledger.api.v1.transaction_filter_pb2 import (
+ TransactionFilter as G_TransactionFilter,
+)
+from ..._gen.com.daml.ledger.api.v1.transaction_service_pb2 import (
+ GetTransactionsRequest as G_GetTransactionsRequest,
+)
+from ..._gen.com.daml.ledger.api.v1.transaction_service_pb2_grpc import TransactionServiceStub
+from ...damlast.daml_lf_1 import PackageRef, TypeConName
+from ...prim import ContractData, ContractId, Party
+from ..commands import Command
+from ..config import Config
+from ..config.access import PropertyBasedAccessConfig
+from ..core import (
+ ArchiveEvent,
+ CreateEvent,
+ ExerciseResponse,
+ InitEvent,
+ PartyInfo,
+ Query,
+ ReadyEvent,
+)
+from ..errors import ProtocolWarning
+from ..stream_aio import QueryStreamBase
+from .channel import create_channel
+from .codec_aio import Codec
+
+__all__ = ["Connection"]
+
+T = TypeVar("T")
+
+
+class Connection:
+ def __init__(self, config: "Config"):
+ self._config = config
+ self._channel = create_channel(config)
+ self._codec = Codec(self)
+
+ @property
+ def config(self) -> "Config":
+ return self._config
+
+ @property
+ def channel(self) -> "Channel":
+ """
+ Provides access to the underlying gRPC channel.
+ """
+ return self._channel
+
+ @property
+ def codec(self) -> "Codec":
+ return self._codec
+
+ async def __aenter__(self) -> "Connection":
+ await self.open()
+ return self
+
+ async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
+ await self.close()
+
+ async def open(self) -> None:
+ """
+ Does final validation of the token, including possibly fetching the ledger ID if it is not
+ yet known.
+ """
+ if not self._config.access.ledger_id:
+ # most calls require a ledger ID; if it wasn't supplied as part of our token or we were
+ # never given a token in the first place, fetch the ledger ID from the destination
+ stub = LedgerIdentityServiceStub(self._channel)
+ response = await stub.GetLedgerIdentity(G_GetLedgerIdentityRequest())
+ if isinstance(self._config.access, PropertyBasedAccessConfig):
+ self._config.access.ledger_id = response.ledger_id
+ else:
+ raise ValueError("token-based access must supply tokens that provide ledger ID")
+
+ async def close(self) -> None:
+ """
+ Close the underlying channel. Once the channel is closed, future command submissions,
+ streams in progress, and any future streams will fail.
+ """
+ await self._channel.close()
+
+ # region Write API
+
+ async def do_commands(
+ self,
+ commands: "Union[Command, Sequence[Command]]",
+ *,
+ workflow_id: "Optional[str]" = None,
+ command_id: "Optional[str]" = None,
+ ) -> "None":
+ """
+ Submit one or more commands to the Ledger API.
+
+ You should generally prefer trying to use :meth:`create`, :meth:`exercise`,
+ :meth:`exercise_by_key`, or :meth:`create_and_exercise`, as they are available over both
+ the gRPC Ledger API and HTTP JSON API; additionally those methods can provide more
+ information about what happened.
+
+ This method can be used to submit multiple disparate commands as a single transaction, but
+ if you find yourself needing to do this, you may want to consider moving more of your logic
+ into Daml so that only a single command is needed from the outside in order to satisfy your
+ use case.
+ """
+ if commands is None:
+ return
+ elif isinstance(commands, Command):
+ commands = [commands]
+
+ stub = CommandServiceStub(self.channel)
+
+ commands_pb = await asyncio.gather(*map(self._codec.encode_command, commands))
+ request = G_SubmitAndWaitRequest(
+ commands=G_Commands(
+ ledger_id=self._config.access.ledger_id,
+ application_id=self._config.access.application_name,
+ command_id=self._command_id(command_id),
+ workflow_id=self._workflow_id(workflow_id),
+ party=self._ensure_act_as(),
+ commands=commands_pb,
+ act_as=self._config.access.act_as,
+ read_as=self._config.access.read_only_as,
+ )
+ )
+ await stub.SubmitAndWait(request)
+
+ async def create(
+ self,
+ template_id: "Union[str, TypeConName]",
+ payload: "ContractData",
+ *,
+ workflow_id: "Optional[str]" = None,
+ command_id: "Optional[str]" = None,
+ ) -> "CreateEvent":
+ """
+ Create a contract for a given template.
+
+ :param template_id: The template of the contract to be created.
+ :param payload: Template arguments for the contract to be created.
+ :param workflow_id: An optional workflow ID.
+ :param command_id: An optional command ID. If unspecified, a random one will be created.
+ """
+ stub = CommandServiceStub(self.channel)
+
+ request = G_SubmitAndWaitRequest(
+ commands=G_Commands(
+ ledger_id=self._config.access.ledger_id,
+ application_id=self._config.access.application_name,
+ command_id=self._command_id(command_id),
+ workflow_id=self._workflow_id(workflow_id),
+ party=self._ensure_act_as(),
+ commands=[await self._codec.encode_create_command(template_id, payload)],
+ act_as=self._config.access.act_as,
+ read_as=self._config.access.read_only_as,
+ )
+ )
+ response = await stub.SubmitAndWaitForTransaction(request)
+
+ return await self._codec.decode_created_event(response.transaction.events[0].created)
+
+ async def exercise(
+ self,
+ contract_id: "ContractId",
+ choice_name: str,
+ argument: "Optional[ContractData]" = None,
+ *,
+ workflow_id: "Optional[str]" = None,
+ command_id: "Optional[str]" = None,
+ ) -> "ExerciseResponse":
+ """
+ Exercise a choice on a contract identified by its contract ID.
+
+ :param contract_id: The contract ID of the contract to exercise.
+ :param choice_name: The name of the choice to exercise.
+ :param argument: The choice arguments. Can be omitted for choices that take no argument.
+ :param workflow_id: An optional workflow ID.
+ :param command_id: An optional command ID. If unspecified, a random one will be created.
+ :return: A response
+ """
+ stub = CommandServiceStub(self.channel)
+
+ commands = [await self._codec.encode_exercise_command(contract_id, choice_name, argument)]
+ request = self._submit_and_wait_request(commands, workflow_id, command_id)
+ response = await stub.SubmitAndWaitForTransactionTree(request)
+
+ return await self._codec.decode_exercise_response(response.transaction)
+
+ async def create_and_exercise(
+ self,
+ template_id: "Union[str, TypeConName]",
+ payload: "ContractData",
+ choice_name: str,
+ argument: "Optional[ContractData]" = None,
+ *,
+ workflow_id: "Optional[str]" = None,
+ command_id: "Optional[str]" = None,
+ ) -> "ExerciseResponse":
+ stub = CommandServiceStub(self.channel)
+
+ commands = [
+ await self._codec.encode_create_and_exercise_command(
+ template_id, payload, choice_name, argument
+ )
+ ]
+ request = self._submit_and_wait_request(commands, workflow_id, command_id)
+ response = await stub.SubmitAndWaitForTransactionTree(request)
+
+ return await self._codec.decode_exercise_response(response.transaction)
+
+ async def exercise_by_key(
+ self,
+ template_id: "Union[str, TypeConName]",
+ choice_name: str,
+ key: "Any",
+ argument: "Optional[ContractData]" = None,
+ *,
+ workflow_id: "Optional[str]" = None,
+ command_id: "Optional[str]" = None,
+ ) -> "ExerciseResponse":
+ stub = CommandServiceStub(self.channel)
+
+ commands = [
+ await self._codec.encode_exercise_by_key_command(
+ template_id, choice_name, key, argument
+ )
+ ]
+ request = await self._submit_and_wait_request(commands, workflow_id, command_id)
+ response = await stub.SubmitAndWaitForTransactionTree(request)
+
+ return await self._codec.decode_exercise_response(response.transaction)
+
+ async def archive(self, contract_id: "ContractId") -> "ArchiveEvent":
+ await self.exercise(contract_id, "Archive")
+ return ArchiveEvent(contract_id)
+
+ async def archive_by_key(self, template_id: str, key: "Any") -> "ArchiveEvent":
+ response = await self.exercise_by_key(template_id, "Archive", key)
+ return next(iter(event for event in response.events if isinstance(event, ArchiveEvent)))
+
+ def _ensure_act_as(self) -> "Party":
+ act_as_party = next(iter(self._config.access.act_as), None)
+ if not act_as_party:
+ raise ValueError("current access rights do not include any act-as parties")
+ return act_as_party
+
+ @staticmethod
+ def _workflow_id(workflow_id: str) -> str:
+ if workflow_id:
+ # TODO: workflow_id must be a LedgerString; we could enforce some minimal validation
+ # here to make for a more obvious error than failing on the server-side
+ return workflow_id
+
+ @staticmethod
+ def _command_id(command_id: "Optional[str]") -> str:
+ # TODO: command_id must be a LedgerString; we could enforce some minimal validation
+ # here to make for a more obvious error than failing on the server-side
+ return command_id or uuid.uuid4().hex
+
+ def _submit_and_wait_request(
+ self,
+ commands: "Collection[G_Command]",
+ workflow_id: "Optional[str]" = None,
+ command_id: "Optional[str]" = None,
+ ) -> "G_SubmitAndWaitRequest":
+ return G_SubmitAndWaitRequest(
+ commands=G_Commands(
+ ledger_id=self._config.access.ledger_id,
+ application_id=self._config.access.application_name,
+ command_id=self._command_id(command_id),
+ workflow_id=self._workflow_id(workflow_id),
+ party=self._ensure_act_as(),
+ commands=commands,
+ act_as=self._config.access.act_as,
+ read_as=self._config.access.read_only_as,
+ )
+ )
+
+ # endregion
+
+ # region Read API
+
+ def query(self, template_id: str = "*", query: "Query" = None) -> "QueryStream[CreateEvent]":
+ return QueryStream(self, {template_id: query}, False)
+
+ def query_many(
+ self, queries: "Optional[Mapping[str, Query]]" = None
+ ) -> "QueryStream[CreateEvent]":
+ return QueryStream(self, queries, False)
+
+ def stream(
+ self, template_id: str = "*", query: "Query" = None
+ ) -> "QueryStream[Union[CreateEvent, ArchiveEvent]]":
+ return QueryStream(self, {template_id: query}, True)
+
+ def stream_many(
+ self, queries: "Optional[Mapping[str, Query]]" = None
+ ) -> "QueryStream[Union[CreateEvent, ArchiveEvent]]":
+ return QueryStream(self, queries, True)
+
+ # endregion
+
+ # region Party Management calls
+
+ async def allocate_party(
+ self, identifier_hint: str = None, display_name: str = None
+ ) -> "PartyInfo":
+ """
+ Allocate a new party.
+ """
+ stub = PartyManagementServiceStub(self.channel)
+ request = G_AllocatePartyRequest(party_id_hint=identifier_hint, display_name=display_name)
+ response = await stub.AllocateParty(request)
+ return Codec.decode_party_info(response.party_details)
+
+ async def list_known_parties(self) -> "Sequence[PartyInfo]":
+ stub = PartyManagementServiceStub(self.channel)
+ response = await stub.ListKnownParties()
+ return [Codec.decode_party_info(pd) for pd in response.party_details]
+
+ # endregion
+
+ # region Package Management calls
+
+ async def get_package(self, package_id: "PackageRef") -> bytes:
+ stub = PackageServiceStub(self.channel)
+ request = G_GetPackageRequest(
+ ledger_id=self._config.access.ledger_id, package_id=str(package_id)
+ )
+ response = await stub.GetPackage(request)
+ return response.archive_payload
+
+ async def list_package_ids(self) -> "AbstractSet[PackageRef]":
+ stub = PackageServiceStub(self.channel)
+ request = G_ListPackagesRequest(ledger_id=self._config.access.ledger_id)
+ response = await stub.ListPackages(request)
+ return frozenset({PackageRef(pkg_id) for pkg_id in response.package_ids})
+
+ async def upload_package(self, contents: bytes) -> None:
+ stub = PackageManagementServiceStub(self.channel)
+ request = G_UploadDarFileRequest(dar_file=contents)
+ await stub.UploadDarFile(request)
+ return
+
+ # endregion
+
+
+class QueryStream(Generic[T], QueryStreamBase):
+ def __init__(
+ self,
+ conn: "Connection",
+ queries: "Optional[Mapping[str, Query]]",
+ continue_stream: bool,
+ ):
+ self.conn = conn
+ self._queries = queries
+ self._continue_stream = continue_stream
+
+ self._offset = None
+ self._filter = None
+ self._response_stream = None
+
+ async def close(self) -> None:
+ if self._response_stream is not None:
+ self._response_stream.cancel()
+ self._response_stream = None
+
+ async def items(self):
+ """
+ Return an asynchronous stream of events.
+
+ .. code-block:: python
+
+ async with conn.query('SampleApp:Iou') as query:
+ async for r in query:
+ print(f"Offset: {r.offset}")
+ for event in r.events:
+ print(f" Event: {event}")
+
+ :return:
+ A stream of responses, where each response contains one or more events at a particular
+ offset.
+ """
+ filters = await self.conn.codec.encode_filters(self._queries)
+ filters_by_party = {party: filters for party in self.conn.config.access.read_as}
+ tx_filter_pb = G_TransactionFilter(filters_by_party=filters_by_party)
+
+ try:
+ await self._emit_init(InitEvent())
+ async for event in self._acs_events(tx_filter_pb):
+ await self._emit_create(event)
+ yield event
+
+ await self._emit_ready(ReadyEvent())
+ if self._continue_stream:
+ # now start returning events as they come off the transaction stream; note this
+ # stream will never naturally close, so it's on the caller to call close() or to
+ # otherwise exit our current context
+ async for event in self._tx_events(tx_filter_pb):
+ if isinstance(event, CreateEvent):
+ await self._emit_create(event)
+ elif isinstance(event, ArchiveEvent):
+ await self._emit_archive(event)
+ else:
+ warnings.warn(f"Received an unknown event: {event}", ProtocolWarning)
+ yield event
+ finally:
+ await self.close()
+
+ async def _acs_events(self, filter_pb: "G_TransactionFilter") -> "AsyncIterable[CreateEvent]":
+ stub = ActiveContractsServiceStub(self.conn.channel)
+
+ request = G_GetActiveContractsRequest(
+ ledger_id=self.conn.config.access.ledger_id, filter=filter_pb
+ )
+ self._response_stream = stub.GetActiveContracts(request)
+ async for response in self._response_stream:
+ self._offset = response.offset
+ for event in response.active_contracts:
+ yield await self.conn.codec.decode_created_event(event)
+
+ async def _tx_events(self, filter_pb: "G_TransactionFilter") -> "AsyncIterable[CreateEvent]":
+ stub = TransactionServiceStub(self.conn.channel)
+
+ request = G_GetTransactionsRequest(
+ ledger_id=self.conn.config.access.ledger_id, filter=filter_pb
+ )
+ self._response_stream = stub.GetTransactions(request)
+ async for response in self._response_stream:
+ self._offset = response.offset
+ for event in response.active_contracts:
+ event_type = event.WhichOneof("event")
+ if event_type == "created":
+ yield await self.conn.codec.decode_created_event(event.created)
+ elif event_type == "archived":
+ yield await self.conn.codec.decode_archived_event(event.archived)
+ else:
+ warnings.warn(f"Unknown Event({event_type}=...)", ProtocolWarning)
diff --git a/python/dazl/ledger/stream_aio.py b/python/dazl/ledger/stream_aio.py
new file mode 100644
index 00000000..50929f0c
--- /dev/null
+++ b/python/dazl/ledger/stream_aio.py
@@ -0,0 +1,162 @@
+# Copyright (c) 2017-2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+from collections import defaultdict
+from inspect import iscoroutine
+from typing import (
+ Any,
+ AsyncIterator,
+ Awaitable,
+ Callable,
+ DefaultDict,
+ List,
+ TypeVar,
+ Union,
+ overload,
+)
+import warnings
+
+from .api_types import ArchiveEvent, CreateEvent
+from .errors import CallbackReturnWarning
+
+__all__ = ["QueryStreamBase"]
+
+INIT_EVENT = "init"
+READY_EVENT = "ready"
+CREATE_EVENT = "create"
+ARCHIVE_EVENT = "archive"
+
+Self = TypeVar("Self")
+
+
+class QueryStreamBase:
+ @property
+ def _callbacks(
+ self,
+ ) -> "DefaultDict[str, List[Union[Callable[[Any], None], Callable[[Any], Awaitable[None]]]]]":
+ cb = getattr(self, "__cb", None)
+ if cb is None:
+ cb = defaultdict(list)
+ object.__setattr__(self, "__cb", cb)
+ return cb
+
+ @overload
+ def on_init(self, fn: "Callable[[object], None]") -> "Callable[[object], None]":
+ ...
+
+ @overload
+ def on_init(
+ self, fn: "Callable[[InitEvent], Awaitable[None]]"
+ ) -> "Callable[[InitEvent], Awaitable[None]]":
+ ...
+
+ def on_init(self, fn):
+ if not callable(fn):
+ raise ValueError("fn must be a callable")
+
+ self._callbacks[INIT_EVENT].append(fn)
+
+ @overload
+ def on_ready(self, fn: "Callable[[object], None]") -> "Callable[[object], None]":
+ ...
+
+ @overload
+ def on_ready(
+ self, fn: "Callable[[ReadyEvent], Awaitable[None]]"
+ ) -> "Callable[[ReadyEvent], Awaitable[None]]":
+ ...
+
+ def on_ready(self, fn):
+ if not callable(fn):
+ raise ValueError("fn must be a callable")
+
+ self._callbacks[READY_EVENT].append(fn)
+
+ @overload
+ def on_create(self, fn: "Callable[[CreateEvent], None]") -> "Callable[[CreateEvent], None]":
+ ...
+
+ @overload
+ def on_create(
+ self, fn: "Callable[[CreateEvent], Awaitable[None]]"
+ ) -> "Callable[[CreateEvent], Awaitable[None]]":
+ ...
+
+ def on_create(self, fn):
+ if not callable(fn):
+ raise ValueError("fn must be a callable")
+
+ self._callbacks[CREATE_EVENT].append(fn)
+
+ @overload
+ def on_archive(self, fn: "Callable[[ArchiveEvent], None]") -> "Callable[[ArchiveEvent], None]":
+ ...
+
+ @overload
+ def on_archive(
+ self, fn: "Callable[[ArchiveEvent], Awaitable[None]]"
+ ) -> "Callable[[ArchiveEvent], Awaitable[None]]":
+ ...
+
+ def on_archive(self, fn):
+ if not callable(fn):
+ raise ValueError("fn must be a callable")
+
+ self._callbacks[ARCHIVE_EVENT].append(fn)
+
+ async def __aenter__(self: Self) -> "Self":
+ """
+ Prepare the stream.
+ """
+ return self
+
+ async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
+ """
+ Close the stream.
+ """
+
+ async def close(self) -> None:
+ """
+ Close and dispose of any resources used by this stream.
+ """
+
+ async def run(self) -> "None":
+ """
+ "Runs" the stream. This can be called as an alternative to :meth:`items` when using
+ callback-based APIs.
+ """
+ async for _ in self:
+ pass
+
+ def items(self) -> "AsyncIterator[Event]":
+ """
+ Must be overridden by subclasses to provide a stream of events. The implementation is
+ expected to call :meth:`_emit_create` and :meth:`_emit_archive` for every encountered event.
+ """
+ raise NotImplementedError
+
+ def __aiter__(self) -> "AsyncIterator[Event]":
+ return self.items()
+
+ async def _emit(self, name: str, obj: "Any"):
+ for cb in self._callbacks[name]:
+ result = cb(obj)
+ if result is not None and iscoroutine(result):
+ result = await result
+ if result is not None:
+ warnings.warn(
+ "callbacks should not return anything; the result will be ignored",
+ CallbackReturnWarning,
+ )
+
+ async def _emit_init(self, event: "Any"):
+ await self._emit(INIT_EVENT, event)
+
+ async def _emit_ready(self, event: "Any"):
+ await self._emit(READY_EVENT, event)
+
+ async def _emit_create(self, event: "CreateEvent"):
+ await self._emit(CREATE_EVENT, event)
+
+ async def _emit_archive(self, event: "ArchiveEvent"):
+ await self._emit(ARCHIVE_EVENT, event)
diff --git a/python/dazl/protocols/core.py b/python/dazl/protocols/core.py
new file mode 100644
index 00000000..d75bcc79
--- /dev/null
+++ b/python/dazl/protocols/core.py
@@ -0,0 +1,126 @@
+# Copyright (c) 2017-2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+from typing import Any, Awaitable, Callable, Collection, Mapping, Optional, Sequence, TypeVar, Union
+
+from ..prim import ContractData, ContractId, Party
+
+__all__ = [
+ "Event",
+ "InitEvent",
+ "ReadyEvent",
+ "CreateEvent",
+ "ArchiveEvent",
+ "ExerciseResponse",
+ "PartyInfo",
+ "Query",
+ "EventHandler",
+ "AEventHandler",
+]
+
+Query = Union[None, Mapping[str, Any], Collection[Mapping[str, Any]]]
+
+
+class Event:
+ """
+ Superclass of all dazl events.
+ """
+
+ __slots__ = ()
+
+ def __setattr__(self, key, value):
+ raise AttributeError("Event instances are read-only")
+
+
+E = TypeVar("E", bound=Event)
+
+EventHandler = Callable[[E], None]
+AEventHandler = Union[Callable[[E], None], Callable[[E], Awaitable[None]]]
+
+
+class InitEvent(Event):
+ pass
+
+
+class ReadyEvent(Event):
+ pass
+
+
+class CreateEvent(Event):
+ __slots__ = ("_cid", "_cdata")
+
+ _cid: "ContractId"
+ _cdata: "ContractData"
+
+ def __init__(self, cid: "ContractId", cdata: "ContractData"):
+ object.__setattr__(self, "_cid", cid)
+ object.__setattr__(self, "_cdata", cdata)
+
+ @property
+ def cid(self) -> "ContractId":
+ return self._cid
+
+ @property
+ def cdata(self) -> "ContractData":
+ return self._cdata
+
+
+class ArchiveEvent(Event):
+ __slots__ = ("_cid",)
+
+ _cid: "ContractId"
+
+ def __init__(self, cid: "ContractId"):
+ object.__setattr__(self, "_cid", cid)
+
+ @property
+ def cid(self) -> "ContractId":
+ return self._cid
+
+
+class ExerciseResponse:
+ __slots__ = "_result", "_events"
+
+ _result: "Optional[Any]"
+ _events: "Sequence[Union[CreateEvent, ArchiveEvent]]"
+
+ def __init__(
+ self, result: "Optional[Any]", events: "Sequence[Union[CreateEvent, ArchiveEvent]]"
+ ):
+ object.__setattr__(self, "_result", result)
+ object.__setattr__(self, "_events", tuple(events))
+
+ @property
+ def result(self) -> "Optional[Any]":
+ return self._result
+
+ @property
+ def events(self) -> "Sequence[Union[CreateEvent, ArchiveEvent]]":
+ return self._events
+
+ def __repr__(self):
+ return f"ExerciseResponse(result={self.result}, events={self.events})"
+
+
+class PartyInfo:
+ __slots__ = "_party", "_display_name", "_is_local"
+
+ _party: "Party"
+ _display_name: str
+ _is_local: bool
+
+ def __init__(self, party: "Party", display_name: str, is_local: bool):
+ object.__setattr__(self, "_party", party)
+ object.__setattr__(self, "_display_name", display_name)
+ object.__setattr__(self, "_is_local", is_local)
+
+ @property
+ def party(self) -> "Party":
+ return self._party
+
+ @property
+ def display_name(self) -> str:
+ return self._display_name
+
+ @property
+ def is_local(self) -> bool:
+ return self._is_local
diff --git a/python/dazl/protocols/errors.py b/python/dazl/protocols/errors.py
index ece29db5..8646c5e6 100644
--- a/python/dazl/protocols/errors.py
+++ b/python/dazl/protocols/errors.py
@@ -2,7 +2,13 @@
# SPDX-License-Identifier: Apache-2.0
from ..prim import DazlError
-__all__ = ["ConnectionTimeoutError", "UserTerminateRequest"]
+__all__ = [
+ "ConnectionTimeoutError",
+ "UserTerminateRequest",
+ "StreamError",
+ "ProtocolWarning",
+ "CallbackReturnWarning",
+]
class ConnectionTimeoutError(DazlError):
@@ -15,3 +21,16 @@ class UserTerminateRequest(DazlError):
"""
Raised when the user has initiated a request to terminate the application.
"""
+
+
+class StreamError:
+ """
+ An error that arises when trying to read from a query stream.
+ """
+
+
+class ProtocolWarning(Warning):
+ """
+ Warnings that are raised when dazl detects incompatibilities between the Ledger API server-side
+ implementation and dazl.
+ """
diff --git a/python/dazl/protocols/v1/pb_parse_event.py b/python/dazl/protocols/v1/pb_parse_event.py
index b33fadc5..bbe0bb4c 100644
--- a/python/dazl/protocols/v1/pb_parse_event.py
+++ b/python/dazl/protocols/v1/pb_parse_event.py
@@ -38,6 +38,7 @@
TransactionFilter,
TransactionStartEvent,
)
+from ..ledgerapi.codec_aio import Codec
DECODER = ProtobufDecoder()
diff --git a/python/docs/index.rst b/python/docs/index.rst
index f0b0aa6f..444241ab 100644
--- a/python/docs/index.rst
+++ b/python/docs/index.rst
@@ -9,45 +9,48 @@ dazl: DA client library for Python
Dependencies
------------
-You will need Python 3.6 or later and a Digital Asset ledger implementation (DA Sandbox or
-DA Ledger Server).
-
-Build-time dependencies are handled using `Poetry `_.
+You will need Python 3.6 or later and a Daml Ledger.
Getting Started
---------------
-This section assumes that you already have a running ledger with the standard `daml new` model loaded, and have imported `dazl`.
+This section assumes that you already have a running ledger with the standard `daml new` model
+loaded, and have imported `dazl`.
Connect to the ledger and submit a single command::
- with dazl.simple_client('http://localhost:6865', 'Alice') as client:
- contract = { 'issuer' : 'Alice', 'owner' : 'Alice', 'name' : 'hello world!' }
- client.ready()
- client.submit_create('Main.Asset', contract)
+ import dazl
+
+ async with dazl.connect('http://localhost:6865', 'Alice') as conn:
+ conn.create('Main:Asset', {'issuer': 'Alice', 'owner': 'Alice', 'name': 'hello world!'})
Connect to the ledger as a single party, print all contracts, and close::
- with dazl.simple_client('http://localhost:7600', 'Alice') as client:
- # wait for the ACS to be fully read
- client.ready()
- contract_dict = client.find_active('*')
- print(contract_dict)
+ import dazl
+
+ async with dazl.connect('http://localhost:6865', 'Alice') as conn:
+ contracts = {}
+ async for event in conn.query():
+ contracts[event.cid] = event.cdata
+ print(contracts)
Connect to the ledger using asynchronous callbacks::
- network = dazl.Network()
- network.set_config(url='http://localhost:6865')
+ import dazl
+
+ async with dazl.connect('http://localhost:6865', 'Alice') as conn:
+ contracts = {}
+ @conn.on_create
+ def _(event):
+ contracts[event.cid] = event.cdata
+ print(contracts)
- alice = network.aio_party('Alice')
+Code
+----
- @alice.ledger_ready()
- async def onReady(event):
- contracts = await event.acs_find_one('Main.Asset')
- print(contracts)
+Build-time dependencies are handled using `Poetry `_.
- network.run_until_complete()
Table of Contents
-----------------
diff --git a/python/docs/migrating.rst b/python/docs/migrating.rst
index c3f7137c..9c6f1df4 100644
--- a/python/docs/migrating.rst
+++ b/python/docs/migrating.rst
@@ -5,8 +5,184 @@
Migrate
#######
-Migrating from dazl v6 from v7
-==============================
+Migrating to dazl v7.5
+======================
+
+dazl 7.5 introduced a new API for connecting to the Ledger API that embraces design patterns and
+technical capabilities that have been introduced to Daml and some of the underlying Python libraries
+over the last few years.
+
+* Daml Multi-party submissions (as of Daml Connect 1.9):
+ https://daml.com/blog/engineering/roles-in-daml-introducing-multi-party-submissions/
+
+ Occasionally it had been necessary for applications to listen to multiple streams as different
+ parties and submit commands based on information. With multi-party submissions, relationships
+ such as public information and group membership is easier to model with parties, which removes
+ the need for clients to attempt to keep otherwise independent streams correlated. As such,
+ :class:`dazl.Network` is deprecated in favor of a lighterweight API that is more explicitly
+ focused on single connections.
+
+* Daml HTTP JSON API (stable as of DAML SDK 1.3.0)
+
+ While not yet directly supported (this is planned for dazl v8), the HTTP JSON API supports most
+ use-cases for applications that need to communicate with a ledger. The new API is slightly
+ restructured for both compatibility with the HTTP JSON API and parity with the JavaScript
+ `@daml/ledger `_ library.
+
+* Daml Authentication/authorization using JSON Web Tokens (JWT)
+
+ dazl was originally built with the simple assumption that a ``Party`` exists 1:1 to a set of
+ credentials. As the Daml Ledger API evolved, this assumption no longer holds. The new dazl API
+ treats tokens more as a
+
+* Daml return values from exercise choices
+
+ Exercise return values have been available from the Ledger API since Daml was open sourced back
+ in April 2019. dazl's API has finally been updated to take this feature into account.
+
+* "Unsubscribing" from streams
+
+ ``dazl`` has never had a straightforward way of abandoning event callbacks that were no longer
+ needed. The new API makes stream lifecycle more explicit and the responsibility of the user of the
+ library. Disposing of streams is now simpler to reason about.
+
+* Native gRPC support for asyncio: https://github.com/grpc/proposal/pull/155
+
+ As of gRPC 1.32.0, the Python gRPC libraries natively support ``asyncio``. This, combined with
+ client streams that are no longer coupled to each other, means the internals of ``dazl`` are
+ significantly simpler while also improving performance.
+
+The changes:
+
+``dazl.Network``, which has been the primary entry point for dazl code since dazl v5, is now
+deprecated. Transitional releases (starting with v7.5) will include both APIs, and ``dazl.Network``
+will be fully removed in dazl v8.
+
+To ease the transition, you can simply replace ``dazl.Network`` with ``dazl.ConnectionFactory``,
+but there are some important semantic differences between these APIs:
+
+* Old-style template names are not supported with ``dazl.ConnectionFactory``. If you were using
+ template names such as "Some.Module.Contract" instead of "Some.Module:Contract", this is the time
+ to change.
+* Callbacks from a ``dazl.ConnectionFactory`` that _return_ commands will raise warnings
+ (though they will still function as expected). These warnings are raised to help you find
+ examples of callbacks that will need to be reworked when transitioning to the new API.
+* Multiple calls to aio_party or simple_party for the same ``Party`` will still share an underlying
+ connection, but a warning will be raised. These warnings are raised to help you find examples of
+ places where you may be relying on connection sharing; connections are no longer automatically
+ shared in the new API.
+* Data streams will no longer be synchronized across Parties. If you were building up state from the
+ perspective of one party and using that information as a different party, you will experience
+ different behavior. This behavior is anyway generally frowned upon, but prior to the introduction
+ of multi-party submissions, occasionally necessary.
+
+
+.. code-block:: python
+
+ # dazl v5-v7
+ import dazl
+
+ network = dazl.Network()
+ network.set_config(url="http://localhost:6865")
+ client = network.aio_party("Alice")
+
+ @client.on_ledger_create("Some:Request")
+ def auto_accept(event):
+ return dazl.exercise(event.cid, "Accept")
+
+ network.run_forever()
+
+ # dazl v7.5 or later, transitional API
+ import dazl
+
+ network = dazl.ConnectionFactory()
+ network.set_config(url="http://localhost:6865")
+ client = network.aio_party("Alice")
+
+ @client.on_ledger_create("Some:Request")
+ def auto_accept(event):
+ return dazl.exercise(event.cid, "Accept")
+
+ network.run_forever()
+
+ # dazl v7.5 or later, new API
+ import asyncio, dazl
+
+ async def main():
+ async with dazl.connect("http://localhost:6865", "Alice") as conn:
+ async for event in conn.stream("Some:Request"):
+ await conn.exercise(event.cid, "Accept")
+
+ asyncio.run(main())
+
+A multi-party example. Note that because there is no more ``Network`` to tie connections together,
+there are no guarantees that ``Alice`` and ``Bob`` receive events at around the same time. You
+should generally
+
+.. code-block:: python
+
+ # dazl v5-v7
+ import dazl
+
+ network = dazl.Network()
+ network.set_config(url="http://localhost:6865")
+
+ client_alice = network.aio_party("Alice")
+ client_bob = network.aio_party("Bob")
+
+ @client_alice.on_ledger_create("Some:Request")
+ def auto_accept(event):
+ return dazl.exercise(event.cid, "Accept")
+
+ @client_bob.on_ledger_create("Some:Request")
+ def auto_accept(event):
+ return dazl.exercise(event.cid, "Accept"))
+
+ network.run()
+
+ # dazl v7.5 or later, transitional API
+ import dazl
+
+ network = dazl.ConnectionFactory()
+ network.set_config(url="http://localhost:6865")
+
+ client_alice = network.aio_party("Alice")
+ client_bob = network.aio_party("Bob")
+
+ @client_alice.on_ledger_create("Some:Request")
+ def auto_accept(event):
+ # changed to avoid warnings, even though it still works the old way
+ # return dazl.exercise(event.cid, "Accept")
+ return client_alice.submit_exercise(event.cid, "Accept")
+
+ @client_bob.on_ledger_create("Some:Request")
+ def auto_accept(event):
+ # changed to avoid warnings, even though it still works the old way
+ # return dazl.exercise(event.cid, "Accept"))
+ return client_bob.submit_exercise(event.cid, "Accept")
+
+ network.run()
+
+ # dazl v7.5 or later, new API
+ import asyncio, dazl
+
+ async def main_alice():
+ async with dazl.connect("http://localhost:6865", "Alice") as conn:
+ async for event in conn.stream("Some:Request"):
+ await conn.exercise(event.cid, "Accept")
+
+ async def main_bob():
+ async with dazl.connect("http://localhost:6865", "Bob") as conn:
+ async for event in conn.stream("Some:Request"):
+ await conn.exercise(event.cid, "Accept")
+
+ # Python 3.7+
+ asyncio.run(asyncio.gather(main_alice(), main_bob()))
+
+
+
+Migrating to dazl v7
+====================
Template formats
----------------
@@ -72,16 +248,14 @@ Deprecated symbols in the `dazl.damlast` and `dazl.model` packages have been rem
| ``dazl.model.types.TypeReference.full_name_unambiguous`` property | :func:`dazl.damlast.util.package_local_name()` |
+-----------------------------------------------------------------------+------------------------------------------------+
-Migrating from dazl v5 from v6
-==============================
+Migrating to v6
+===============
No major breaking API changes were introduced in the v6 release.
-Migrating from dazl v5 from v4
-==============================
-
-
+Migrating to v5
+===============
Library Initialization
----------------------
diff --git a/python/mypy.ini b/python/mypy.ini
index fa25d222..34c32b5c 100644
--- a/python/mypy.ini
+++ b/python/mypy.ini
@@ -1,5 +1,8 @@
[mypy]
+[mypy-grpc.*]
+ignore_missing_imports = True
+
[mypy-google.protobuf.pyext.*]
ignore_missing_imports = True
diff --git a/python/tests/unit/test_all_party.py b/python/tests/unit/test_all_party.py
index 00473618..31a115b1 100644
--- a/python/tests/unit/test_all_party.py
+++ b/python/tests/unit/test_all_party.py
@@ -1,12 +1,13 @@
# Copyright (c) 2017-2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
-
+import asyncio
import logging
import uuid
import pytest
-from dazl import Party, async_network, create
+from dazl import Party, async_network, connect, create
+from dazl.util.io import get_bytes
from .dars import AllParty as AllPartyDar
@@ -15,14 +16,15 @@
@pytest.mark.asyncio
-async def test_some_party_receives_public_contract(sandbox):
+@pytest.mark.parametrize("future_api", [False, True])
+async def test_some_party_receives_public_contract(sandbox, future_api):
some_party_cids = []
publisher_cids = []
# TODO: Switch to a Party allocation API when available.
all_party = Party(str(uuid.uuid4()))
- async with async_network(url=sandbox, dars=AllPartyDar) as network:
+ async with async_network(url=sandbox, dars=AllPartyDar, future_api=future_api) as network:
network.set_config(party_groups=[all_party])
some_client = network.aio_new_party()
@@ -43,7 +45,20 @@ async def test_some_party_receives_public_contract(sandbox):
publisher_client.add_ledger_created(PublicContract, lambda e: publisher_cids.append(e.cid))
publisher_client.add_ledger_created(PrivateContract, lambda e: publisher_cids.append(e.cid))
- network.start()
+ if future_api:
+ # the new API can't make guarantees that events and consequence commands are correlated
+ # any more, so the caller now has to do this
+ async def wait_for_contracts():
+ while not (some_party_cids == 2 and publisher_cids == 1):
+ await asyncio.sleep(1.0)
+
+ await asyncio.wait_for(
+ network.aio_run(wait_for_contracts(), keep_open=False), timeout=5.0
+ )
+ else:
+ # the old API tracks follow-up commands and subsequent events that those commands have
+ # triggered, but at a higher implementation cost and less flexibility
+ network.start()
logging.info(
"got to the end with some_party contracts: %s and publisher contracts: %s",
@@ -53,3 +68,36 @@ async def test_some_party_receives_public_contract(sandbox):
assert len(some_party_cids) == 2
assert len(publisher_cids) == 1
+
+
+@pytest.mark.asyncio
+async def test_multi_party_receives_all_contracts(sandbox):
+ async with connect(url=sandbox, admin=True) as conn:
+ await conn.upload_package(get_bytes(AllPartyDar))
+ some_party = await conn.allocate_party("someParty")
+ all_party = await conn.allocate_party("allParty")
+ publisher = await conn.allocate_party("publisher")
+
+ async with connect(url=sandbox, act_as=publisher.party) as publisher_conn, connect(
+ url=sandbox, act_as=some_party.party, read_as=all_party.party
+ ) as some_conn:
+ # create two contracts; the underlying submit call is guaranteed to block so from the
+ # perspective of the caller, waiting for these calls to complete is enough to ensure these
+ # contracts are actually there
+ await some_conn.create(PrivateContract, {"someParty": some_party.party})
+ await publisher_conn.create(
+ PublicContract, {"publisher": publisher.party, "allParty": all_party.party}
+ )
+
+ assert await count_contracts(some_conn, PublicContract) == 1
+ assert await count_contracts(some_conn, PrivateContract) == 1
+ assert await count_contracts(publisher_conn, PublicContract) == 1
+ assert await count_contracts(publisher_conn, PrivateContract) == 0
+
+
+async def count_contracts(conn, template_name) -> int:
+ async with conn.query(template_name) as stream:
+ count = 0
+ async for _ in stream:
+ count += 1
+ return count
diff --git a/python/tests/unit/test_api_consistency.py b/python/tests/unit/test_api_consistency.py
index 2764fac3..bc0dcd2f 100644
--- a/python/tests/unit/test_api_consistency.py
+++ b/python/tests/unit/test_api_consistency.py
@@ -1,8 +1,10 @@
# Copyright (c) 2017-2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
+import warnings
from dazl import Party
-from dazl.client.api import AIOPartyClient, SimplePartyClient
+from dazl.client.api import AIOPartyClient, Network as V5_Network, SimplePartyClient
+from dazl.compat import Network as V8_Network, NotSupportedError
def test_api_consistency():
@@ -14,15 +16,36 @@ def test_api_consistency():
apc = AIOPartyClient(impl)
tpc = SimplePartyClient(impl)
- callable_apcs = {
- key: getattr(apc, key)
- for key in dir(apc)
- if not key.startswith("_") and callable(getattr(apc, key))
- }
- callable_tpcs = {
- key: getattr(tpc, key)
- for key in dir(tpc)
- if not key.startswith("_") and callable(getattr(tpc, key))
- }
-
- assert sorted(set(callable_apcs.keys())) == sorted(set(callable_tpcs.keys()))
+ callable_apcs = {key for key in dir(apc) if is_public_symbol(apc, key)}
+ callable_tpcs = {key for key in dir(tpc) if is_public_symbol(tpc, key)}
+
+ assert sorted(callable_apcs) == sorted(callable_tpcs)
+
+
+def test_api_consistency_v8_compat():
+ """
+ Ensure the dazl.compat.v8.Network class exposes the same symbols as dazl.client.api.Network.
+ """
+ n5 = V5_Network()
+ n8 = V8_Network()
+
+ callable_v5_symbols = {key for key in dir(n5) if is_public_symbol(n5, key)}
+ callable_v8_symbols = {key for key in dir(n8) if is_public_symbol(n8, key)}
+
+ assert sorted(callable_v5_symbols) == sorted(callable_v8_symbols)
+
+
+def is_public_symbol(obj, key):
+ if key.startswith("_"):
+ return False
+
+ try:
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", DeprecationWarning)
+ val = getattr(obj, key)
+ except NotSupportedError:
+ # we use NotSupportedError as a signal that an API is not supported, even in the
+ # transitional API; however we're
+ return True
+
+ return callable(val)
diff --git a/python/tests/unit/test_protocol_ledgerapi.py b/python/tests/unit/test_protocol_ledgerapi.py
new file mode 100644
index 00000000..f1d0158e
--- /dev/null
+++ b/python/tests/unit/test_protocol_ledgerapi.py
@@ -0,0 +1,48 @@
+# Copyright (c) 2017-2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+import logging
+
+import pytest
+
+from dazl.protocols.ledgerapi import connect
+
+from .dars import PostOffice
+
+
+@pytest.mark.asyncio
+async def test_protocol_ledger_api(sandbox):
+ # first, administrative stuff--upload the DAR and allocate two parties that we'll use later
+ async with connect(url=sandbox, admin=True) as conn:
+ await conn.upload_package(PostOffice.read_bytes())
+ postman = (await conn.allocate_party()).party
+ participant = (await conn.allocate_party()).party
+
+ async with connect(url=sandbox, act_as=postman, oauth_token=...) as conn:
+ event = await conn.create("Main:PostmanRole", {"postman": postman})
+ result = await conn.exercise(
+ event.cid, "InviteParticipant", {"party": participant, "address": "Somewhere!"}
+ )
+ logging.info("Result of inviting a participant: %s", result)
+
+ async with connect(url=sandbox, act_as=participant) as conn:
+ # Stream results for Main:InviteAuthorRole, and then Main:InviteReceiverRole. Then break the
+ # stream once we find the first contract.
+ #
+ # We do NOT use query() here, because in a distributed ledger setting, the result of the
+ # postman inviting participants may not yet have been observed by the clients. Instead, use
+ # stream() since it remains open until explicitly closed. We break the never-ending iterator
+ # as soon as we see one of each contract.
+ async with conn.stream("Main:InviteAuthorRole") as query:
+ async for event in query:
+ result = await conn.exercise(event.cid, "AcceptInviteAuthorRole")
+ logging.info("The result of AcceptInviteAuthorRole: %s", result)
+ break
+
+ async with conn.stream("Main:InviteReceiverRole") as query:
+ async for event in query:
+ result = await conn.exercise(event.cid, "AcceptInviteReceiverRole")
+ logging.info("The result of AcceptInviteReceiverRole: %s", result)
+ break
+
+ logging.info("Done!")
diff --git a/python/tests/unit/test_v8_compat_events.py b/python/tests/unit/test_v8_compat_events.py
new file mode 100644
index 00000000..593ccd62
--- /dev/null
+++ b/python/tests/unit/test_v8_compat_events.py
@@ -0,0 +1,5 @@
+from dazl.compat.v8_events import ReadyEvent
+
+
+def test():
+ ReadyEvent(None)