diff --git a/VERSION b/VERSION index bf787a30..42cdd0b5 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -5.6.5 +5.7.0 diff --git a/python/dazl/client/_network_client_impl.py b/python/dazl/client/_network_client_impl.py index d0ffacc6..5d76714d 100644 --- a/python/dazl/client/_network_client_impl.py +++ b/python/dazl/client/_network_client_impl.py @@ -420,7 +420,7 @@ async def _init( LOG.info('Reading current ledger state...') # Bring all clients up to the reported head. if party_impls and first_offset or (metadata.offset and metadata.offset != '1-'): - await gather(*[party_impl.read_transactions(first_offset or metadata.offset, False) + await gather(*[party_impl.read_acs(first_offset or metadata.offset, False) for party_impl in party_impls]) LOG.debug('Preparing to raise the "ready" event...') diff --git a/python/dazl/client/_party_client_impl.py b/python/dazl/client/_party_client_impl.py index 785f4864..cd352c4d 100644 --- a/python/dazl/client/_party_client_impl.py +++ b/python/dazl/client/_party_client_impl.py @@ -23,7 +23,7 @@ from ..model.network import connection_settings from ..model.reading import BaseEvent, TransactionStartEvent, TransactionEndEvent, OffsetEvent, \ TransactionFilter, EventKey, ContractCreateEvent, ContractArchiveEvent, \ - sortable_offset_height, InitEvent + sortable_offset_height, InitEvent, ActiveContractSetEvent from ..model.writing import CommandBuilder, CommandDefaults, CommandPayload, \ EventHandlerResponse, Command from ..protocols import LedgerNetwork, LedgerClient @@ -165,11 +165,38 @@ def find_nonempty(self, template: Any, match: ContractMatch, min_count: int = 1, # region Read Path - async def read_transactions(self, until_offset: Optional[str], raise_events: bool) \ - -> Tuple[str, Future]: + async def read_acs(self, until_offset: 'Optional[str]', raise_events: bool) \ + -> 'Tuple[str, Future]': + """ + Initial bootstrap of events from the read side. Only one instance of this coroutine + should be active at a time per client. 
An initial block of events is read using the + Active Contract Set service, and the transaction stream is then subscribed to in order to + ensure that this client is caught up to the specified offset. + + :param until_offset: + The destination ledger offset to read up to. If not included, then the client + attempts to read as many transactions as is currently available. + :param raise_events: + ``True`` to raise transaction- and contract-related events to event handlers; + ``False`` to suppress this behavior and only update local state within the client + (offset information and active contract set). + :return: + The ledger offset that the reader ended at and a Future that is resolved when all event + handlers' follow-ups have either successfully or unsuccessfully completed. + """ + LOG.info('Calling read_acs(%r, %r)', until_offset, raise_events) + client = await self._client_fut + acs = await client.active_contracts() + for acs_evt in acs: + await self._process_transaction_stream_event(acs_evt, False) + + return await self.read_transactions(until_offset, raise_events) + + async def read_transactions(self, until_offset: 'Optional[str]', raise_events: bool) \ + -> 'Tuple[str, Future]': """ Main processing method of events from the read side. Only one instance of this coroutine - should be active at a time. + should be active at a time per client. :param until_offset: The destination ledger offset to read up until to. 
If not included, then the client @@ -188,9 +215,7 @@ async def read_transactions(self, until_offset: Optional[str], raise_events: boo client = await self._client_fut metadata = await self._pool.ledger() - initial_offset = self._reader.offset event_count = 0 - futs = [] while (until_offset is None or @@ -254,6 +279,10 @@ def _process_transaction_stream_event(self, event: Any, raise_events: bool) -> F LOG.debug('evt recv: party %s, BIM %r (%s events)', self.party, event.command_id[0:7], len(event.contract_events)) + elif isinstance(event, ActiveContractSetEvent): + for contract_event in event.contract_events: + self._acs.handle_create(contract_event) + if raise_events: fut = self.emit_event(event) else: @@ -268,8 +297,11 @@ def _process_transaction_stream_event(self, event: Any, raise_events: bool) -> F # region Write path - def write_commands(self, commands: EventHandlerResponse, ignore_errors: bool = False, - workflow_id: Optional[str] = None) \ + def write_commands( + self, + commands: EventHandlerResponse, + ignore_errors: bool = False, + workflow_id: 'Optional[str]' = None) \ -> Awaitable[None]: """ Submit a command or list of commands. @@ -279,6 +311,8 @@ def write_commands(self, commands: EventHandlerResponse, ignore_errors: bool = F :param ignore_errors: Whether errors should be ignored for purposes of terminating the client. If ``True``, then a failure to send this command does not necessarily end the client. + :param workflow_id: + The workflow ID to use to tag submitted commands. :return: An ``asyncio.Future`` that is resolved right before the corresponding side effects have hit the event stream for this party. 
Synchronous errors are reported back immediately diff --git a/python/dazl/model/reading.py b/python/dazl/model/reading.py index 01de210b..ab5dc81e 100644 --- a/python/dazl/model/reading.py +++ b/python/dazl/model/reading.py @@ -76,6 +76,15 @@ class ReadyEvent(OffsetEvent): """ +class ActiveContractSetEvent(OffsetEvent): + """ + Event raised on initial read of the active contract set. + """ + def __init__(self, client, party, ledger_id, package_store, offset, contract_events): + super().__init__(client, party, None, ledger_id, package_store, offset) + self.contract_events = contract_events + + class BaseTransactionEvent(OffsetEvent): """ Event raised when dazl encounters a new transaction. This is raised before any corresponding diff --git a/python/dazl/protocols/_base.py b/python/dazl/protocols/_base.py index 9808a205..2aa6f8c3 100644 --- a/python/dazl/protocols/_base.py +++ b/python/dazl/protocols/_base.py @@ -92,7 +92,13 @@ async def commands(self, command_payload: CommandPayload) -> None: """ raise NotImplementedError('commands must be implemented') - async def events(self, transaction_filter: TransactionFilter) -> Sequence[BaseEvent]: + async def active_contracts(self) -> 'Sequence[BaseEvent]': + """ + Return the current active contract set. + """ + raise NotImplementedError('active contract set fetch must be implemented') + + async def events(self, transaction_filter: TransactionFilter) -> 'Sequence[BaseEvent]': """ Return events from a certain offset in the ledger. The number of blocks returned is implementation-defined. diff --git a/python/dazl/protocols/v1/grpc.py b/python/dazl/protocols/v1/grpc.py index d5b650fe..2a1b7eee 100644 --- a/python/dazl/protocols/v1/grpc.py +++ b/python/dazl/protocols/v1/grpc.py @@ -4,7 +4,7 @@ """ Support for the gRPC-based Ledger API. 
""" -from asyncio import gather +from asyncio import gather, get_event_loop from datetime import datetime from threading import Thread, Event from typing import Awaitable, Iterable, Optional, Sequence @@ -15,8 +15,8 @@ from ... import LOG from .._base import LedgerClient, _LedgerConnection, _LedgerConnectionContext from .grpc_time import maybe_grpc_time_stream -from .pb_parse_event import serialize_request, to_transaction_events, \ - BaseEventDeserializationContext +from .pb_parse_event import serialize_acs_request, serialize_transactions_request, \ + to_acs_events, to_transaction_events, BaseEventDeserializationContext from .pb_parse_metadata import parse_daml_metadata_pb, parse_archive_payload, find_dependencies from ...model.core import Party from ...model.ledger import LedgerMetadata, StaticTimeModel, RealTimeModel @@ -32,7 +32,7 @@ class GRPCv1LedgerClient(LedgerClient): def __init__(self, connection: 'GRPCv1Connection', ledger: LedgerMetadata, party: Party): self.connection = safe_cast(GRPCv1Connection, connection) self.ledger = safe_cast(LedgerMetadata, ledger) - self.party = safe_cast(str, party) # type: Party + self.party = Party(safe_cast(str, party)) def commands(self, commands: CommandPayload) -> None: serializer = self.ledger.serializer @@ -40,12 +40,30 @@ def commands(self, commands: CommandPayload) -> None: return self.connection.context.run_in_background( lambda: self.connection.command_service.SubmitAndWait(request)) + def active_contracts(self) -> 'Awaitable[Sequence[BaseEvent]]': + results_future = get_event_loop().create_future() + request = serialize_acs_request(self.ledger.ledger_id, self.party) + + context = BaseEventDeserializationContext( + None, self.ledger.store, self.party, self.ledger.ledger_id) + + def on_acs_done(fut): + try: + acs_stream = fut.result() + events = to_acs_events(context, acs_stream) + results_future.set_result(events) + except Exception as ex: + results_future.set_exception(ex) + + acs_future = 
self.connection.context.run_in_background( + lambda: self.connection.active_contract_set_service.GetActiveContracts(request)) + acs_future.add_done_callback(on_acs_done) + return results_future + def events(self, transaction_filter: TransactionFilter) \ -> Awaitable[Sequence[BaseEvent]]: - request = serialize_request(transaction_filter, self.party) - - import asyncio - results_future = asyncio.get_event_loop().create_future() + results_future = get_event_loop().create_future() + request = serialize_transactions_request(transaction_filter, self.party) context = BaseEventDeserializationContext( None, self.ledger.store, self.party, transaction_filter.ledger_id) @@ -244,6 +262,7 @@ def __init__(self, self._closed = Event() self._channel = grpc_create_channel(settings) + self.active_contract_set_service = G.ActiveContractsServiceStub(self._channel) self.command_service = G.CommandServiceStub(self._channel) self.transaction_service = G.TransactionServiceStub(self._channel) self.package_service = G.PackageServiceStub(self._channel) diff --git a/python/dazl/protocols/v1/model/__init__.py b/python/dazl/protocols/v1/model/__init__.py index 85f3c4ca..9af866cf 100644 --- a/python/dazl/protocols/v1/model/__init__.py +++ b/python/dazl/protocols/v1/model/__init__.py @@ -1,6 +1,9 @@ # Copyright (c) 2019 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. 
# SPDX-License-Identifier: Apache-2.0 +from ...._gen.com.digitalasset.ledger.api.v1.active_contracts_service_pb2 import \ + GetActiveContractsRequest, GetActiveContractsResponse +from ...._gen.com.digitalasset.ledger.api.v1.active_contracts_service_pb2_grpc import ActiveContractsServiceStub from ...._gen.com.digitalasset.ledger.api.v1.commands_pb2 import Command, CreateCommand, ExerciseCommand from ...._gen.com.digitalasset.ledger.api.v1.command_service_pb2_grpc import CommandServiceStub from ...._gen.com.digitalasset.ledger.api.v1.command_submission_service_pb2 import SubmitRequest diff --git a/python/dazl/protocols/v1/pb_parse_event.py b/python/dazl/protocols/v1/pb_parse_event.py index 0bb5faba..aa5bc5ef 100644 --- a/python/dazl/protocols/v1/pb_parse_event.py +++ b/python/dazl/protocols/v1/pb_parse_event.py @@ -7,7 +7,7 @@ from dataclasses import dataclass from datetime import datetime from decimal import Decimal -from typing import Any, Dict, Iterable, List, Optional, Sequence +from typing import Any, Dict, Iterable, List, Optional, Sequence, Union # noinspection PyPackageRequirements from google.protobuf.empty_pb2 import Empty @@ -16,11 +16,12 @@ from ...model.core import ContractId from ...model.reading import BaseEvent, TransactionFilter, ContractCreateEvent, \ TransactionStartEvent, TransactionEndEvent, ContractArchiveEvent, OffsetEvent, \ - ContractExercisedEvent + ContractExercisedEvent, ActiveContractSetEvent from ...model.types import RecordType, Type, VariantType, ContractIdType, ListType, \ TypeEvaluationContext, type_evaluate_dispatch_default_error, MapType, OptionalType from ...model.types_store import PackageStore from ...util.prim_types import to_date, to_datetime, to_hashable, frozendict +from ...util.typing import safe_cast @dataclass(frozen=True) @@ -36,6 +37,12 @@ class BaseEventDeserializationContext: def offset_event(self, time: datetime, offset: str) -> OffsetEvent: return OffsetEvent(self.client, self.party, time, self.ledger_id, 
self.store, offset) + def active_contract_set(self, offset: str, workflow_id: str) \ + -> 'ActiveContractSetEventDeserializationContext': + return ActiveContractSetEventDeserializationContext( + self.client, self.store, self.party, self.ledger_id, + offset, workflow_id) + def transaction(self, time: datetime, offset: str, command_id: str, workflow_id: str) \ -> 'TransactionEventDeserializationContext': return TransactionEventDeserializationContext( @@ -43,6 +50,25 @@ def transaction(self, time: datetime, offset: str, command_id: str, workflow_id: time, offset, command_id, workflow_id) +@dataclass(frozen=True) +class ActiveContractSetEventDeserializationContext(BaseEventDeserializationContext): + """ + Attributes required throughout the deserialization of the Active Contract Set. + """ + offset: str + workflow_id: str + + def active_contract_set_event(self, contract_events: 'Sequence[ContractCreateEvent]') \ + -> 'ActiveContractSetEvent': + return ActiveContractSetEvent(self.client, self.party, self.ledger_id, self.store, + self.offset, contract_events) + + def contract_created_event(self, cid, cdata, event_id, witness_parties) -> ContractCreateEvent: + return ContractCreateEvent(self.client, self.party, None, self.ledger_id, self.store, + self.offset, cid, cdata, '', self.workflow_id, + event_id, witness_parties) + + @dataclass(frozen=True) class TransactionEventDeserializationContext(BaseEventDeserializationContext): """ @@ -83,7 +109,7 @@ def contract_archived_event(self, cid, cdata, event_id, witness_parties) \ event_id, witness_parties) -def serialize_request(transaction_filter: TransactionFilter, party: str) \ +def serialize_transactions_request(transaction_filter: TransactionFilter, party: str) \ -> 'G.GetTransactionsRequest': from . 
import model as G if transaction_filter.current_offset is not None: @@ -115,6 +141,16 @@ def serialize_request(transaction_filter: TransactionFilter, party: str) \ filter=tr_filter) +def serialize_acs_request(ledger_id: str, party: str): + from . import model as G + + filters_by_party = {party: G.Filters()} + tr_filter = G.TransactionFilter(filters_by_party=filters_by_party) + return G.GetActiveContractsRequest( + ledger_id=ledger_id, + filter=tr_filter) + + def to_transaction_events( context: 'BaseEventDeserializationContext', tx_stream_pb: 'Iterable[G.GetTransactionsResponse]', @@ -162,6 +198,14 @@ def to_transaction_events( return events +def to_acs_events( + context: 'BaseEventDeserializationContext', + acs_stream_pb: 'Iterable[Any]') -> 'Sequence[ActiveContractSetEvent]': + return [acs_evt + for acs_response_pb in acs_stream_pb + for acs_evt in to_acs_event(context, acs_response_pb)] + + def from_transaction_tree(context: 'BaseEventDeserializationContext', tt_pb: 'G.TransactionTree') \ -> 'Sequence[OffsetEvent]': @@ -180,6 +224,15 @@ def from_transaction_tree(context: 'BaseEventDeserializationContext', tt_pb: 'G. 
return events +def to_acs_event(context: BaseEventDeserializationContext, acs_pb: 'G.ActiveContractSetResponse'): + acs_context = context.active_contract_set(acs_pb.offset, acs_pb.workflow_id) + contract_events = \ + [evt + for evt in (to_created_event(acs_context, evt_pb) for evt_pb in acs_pb.active_contracts) + if evt is not None] + return [acs_context.active_contract_set_event(contract_events)] + + def to_transaction_chunk(context: BaseEventDeserializationContext, tx_pb: 'G.Transaction') \ -> 'Sequence[OffsetEvent]': """ @@ -208,95 +261,117 @@ def to_transaction_chunk(context: BaseEventDeserializationContext, tx_pb: 'G.Tra t_context.transaction_end_event(contract_events)] -def to_event(context: TransactionEventDeserializationContext, evt_pb: 'G.Event') \ +def to_event( + context: 'Union[TransactionEventDeserializationContext, ActiveContractSetEventDeserializationContext]', + evt_pb: 'G.Event') \ -> 'Optional[BaseEvent]': try: event_type = evt_pb.WhichOneof('event') except ValueError: - event_type = evt_pb.WhichOneof('kind') + try: + event_type = evt_pb.WhichOneof('kind') + except ValueError: + LOG.error('Deserialization error into an event of %r', evt_pb) + raise if 'created' == event_type: - cr = evt_pb.created - search_str = f'{cr.template_id.name}@{cr.template_id.package_id}' + return to_created_event(context, evt_pb.created) + elif 'exercised' == event_type: + return to_exercised_event(context, evt_pb.exercised) + elif 'archived' == event_type: + return to_archived_event(context, evt_pb.archived) + else: + raise ValueError(f'unknown event type: {event_type}') - candidates = context.store.resolve_template_type(search_str) - if len(candidates) == 0: - LOG.warning('Could not find metadata for %s!', search_str) - return None - elif len(candidates) > 1: - LOG.error('The template identifier %s is not unique within its metadata!', candidates) - return None - ((type_ref, tt),) = candidates.items() +def to_created_event( + context: 
'Union[TransactionEventDeserializationContext, ActiveContractSetEventDeserializationContext]', + cr: 'G.CreatedEvent') \ + -> 'Optional[ContractCreateEvent]': + search_str = f'{cr.template_id.name}@{cr.template_id.package_id}' - tt_context = TypeEvaluationContext.from_store(context.store) + candidates = context.store.resolve_template_type(search_str) + if len(candidates) == 0: + LOG.warning('Could not find metadata for %s!', search_str) + return None + elif len(candidates) > 1: + LOG.error('The template identifier %s is not unique within its metadata!', candidates) + return None - cid = ContractId(cr.contract_id, type_ref) - cdata = to_record(tt_context, tt, cr.create_arguments) - event_id = cr.event_id - witness_parties = tuple(cr.witness_parties) + ((type_ref, tt),) = candidates.items() - return context.contract_created_event(cid, cdata, event_id, witness_parties) + tt_context = TypeEvaluationContext.from_store(context.store) - elif 'exercised' == event_type: - er = evt_pb.exercised - search_str = f'{er.template_id.name}@{er.template_id.package_id}' + cid = ContractId(cr.contract_id, type_ref) + cdata = to_record(tt_context, tt, cr.create_arguments) + event_id = cr.event_id + witness_parties = tuple(cr.witness_parties) - candidates = context.store.resolve_template_type(search_str) - if len(candidates) == 0: - LOG.warning('Could not find metadata for %s!', search_str) - return None - elif len(candidates) > 1: - LOG.error('The template identifier %s is not unique within its metadata!', candidates) - return None + return context.contract_created_event(cid, cdata, event_id, witness_parties) - ((type_ref, tt),) = candidates.items() - tt_context = TypeEvaluationContext.from_store(context.store) - choice_candidates = context.store.resolve_choice(type_ref, er.choice) - if len(candidates) == 0: - LOG.warning('Could not find metadata for %s!', search_str) - return None - elif len(candidates) > 1: - LOG.error('The template identifier %s is not unique within its metadata!', 
candidates) - return None +def to_exercised_event( + context: 'TransactionEventDeserializationContext', + er: 'G.ExercisedEvent') \ + -> 'Optional[ContractExercisedEvent]': + search_str = f'{er.template_id.name}@{er.template_id.package_id}' + + candidates = context.store.resolve_template_type(search_str) + if len(candidates) == 0: + LOG.warning('Could not find metadata for %s!', search_str) + return None + elif len(candidates) > 1: + LOG.error('The template identifier %s is not unique within its metadata!', candidates) + return None - ((choice_ref, cc),) = choice_candidates.items() + ((type_ref, tt),) = candidates.items() - cid = ContractId(er.contract_id, type_ref) - event_id = er.event_id - witness_parties = tuple(er.witness_parties) - contract_creating_event_id = er.contract_creating_event_id - choice = er.choice - choice_args = to_natural_type(tt_context, cc.data_type, er.choice_argument) - acting_parties = tuple(er.acting_parties) - consuming = er.consuming - child_event_ids = er.child_event_ids + tt_context = TypeEvaluationContext.from_store(context.store) + choice_candidates = context.store.resolve_choice(type_ref, er.choice) + if len(candidates) == 0: + LOG.warning('Could not find metadata for %s!', search_str) + return None + elif len(candidates) > 1: + LOG.error('The template identifier %s is not unique within its metadata!', candidates) + return None - return context.contract_exercised_event( - cid, None, event_id, witness_parties, contract_creating_event_id, - choice, choice_args, acting_parties, consuming, child_event_ids) + ((choice_ref, cc),) = choice_candidates.items() - elif 'archived' == event_type: - ar = evt_pb.archived - search_str = f'{ar.template_id.name}@{ar.template_id.package_id}' + cid = ContractId(er.contract_id, type_ref) + event_id = er.event_id + witness_parties = tuple(er.witness_parties) + contract_creating_event_id = er.contract_creating_event_id + choice = er.choice + choice_args = to_natural_type(tt_context, cc.data_type, 
er.choice_argument) + acting_parties = tuple(er.acting_parties) + consuming = er.consuming + child_event_ids = er.child_event_ids - candidates = context.store.resolve_template_type(search_str) - if len(candidates) == 0: - LOG.warning('Could not find metadata for %s!', search_str) - return None - elif len(candidates) > 1: - LOG.error('The template identifier %s is not unique within its metadata!', candidates) - return None + return context.contract_exercised_event( + cid, None, event_id, witness_parties, contract_creating_event_id, + choice, choice_args, acting_parties, consuming, child_event_ids) - ((type_ref, tt),) = candidates.items() - event_id = ar.event_id - witness_parties = tuple(ar.witness_parties) - cid = ContractId(ar.contract_id, type_ref) - return context.contract_archived_event(cid, None, event_id, witness_parties) - else: - raise ValueError(f'unknown event type: {event_type}') +def to_archived_event( + context: 'TransactionEventDeserializationContext', + ar: 'G.ArchivedEvent') \ + -> 'Optional[ContractArchiveEvent]': + search_str = f'{ar.template_id.name}@{ar.template_id.package_id}' + + candidates = context.store.resolve_template_type(search_str) + if len(candidates) == 0: + LOG.warning('Could not find metadata for %s!', search_str) + return None + elif len(candidates) > 1: + LOG.error('The template identifier %s is not unique within its metadata!', candidates) + return None + + ((type_ref, tt),) = candidates.items() + event_id = ar.event_id + witness_parties = tuple(ar.witness_parties) + + cid = ContractId(ar.contract_id, type_ref) + return context.contract_archived_event(cid, None, event_id, witness_parties) def to_natural_type(context: TypeEvaluationContext, data_type: Type, obj: 'G.Value') -> Any: diff --git a/python/dazl/protocols/v1/pb_parse_metadata.py b/python/dazl/protocols/v1/pb_parse_metadata.py index 22acc781..fae4cea2 100644 --- a/python/dazl/protocols/v1/pb_parse_metadata.py +++ b/python/dazl/protocols/v1/pb_parse_metadata.py @@ -188,7 
+188,7 @@ def parse_daml_metadata_pb(package_id: str, metadata_pb: Any) -> 'PackageStore': 'The template %s was of type %s; only records are supported for templates', tt, data_type) - LOG.debug('Fully registered all types.') + LOG.debug('Fully registered all types for package ID %r', package_id) return psb.build() diff --git a/python/pyproject.toml b/python/pyproject.toml index 2bab9e78..bfe3f71d 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "dazl" -version = "5.6.5" +version = "5.7.0" description = "high-level Ledger API client for DAML ledgers" license = "Apache-2.0" authors = ["Davin K. Tanabe "]