Skip to content

Commit

Permalink
Merge pull request #10 from digital-asset/python-acs-integration
Browse files Browse the repository at this point in the history
python: Make initial data reads come from the ACS
  • Loading branch information
da-tanabe authored Jun 17, 2019
2 parents 72b000d + 2481ab8 commit 1758480
Show file tree
Hide file tree
Showing 10 changed files with 237 additions and 91 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
5.6.5
5.7.0
2 changes: 1 addition & 1 deletion python/dazl/client/_network_client_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ async def _init(
LOG.info('Reading current ledger state...')
# Bring all clients up to the reported head.
if party_impls and first_offset or (metadata.offset and metadata.offset != '1-'):
await gather(*[party_impl.read_transactions(first_offset or metadata.offset, False)
await gather(*[party_impl.read_acs(first_offset or metadata.offset, False)
for party_impl in party_impls])

LOG.debug('Preparing to raise the "ready" event...')
Expand Down
50 changes: 42 additions & 8 deletions python/dazl/client/_party_client_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from ..model.network import connection_settings
from ..model.reading import BaseEvent, TransactionStartEvent, TransactionEndEvent, OffsetEvent, \
TransactionFilter, EventKey, ContractCreateEvent, ContractArchiveEvent, \
sortable_offset_height, InitEvent
sortable_offset_height, InitEvent, ActiveContractSetEvent
from ..model.writing import CommandBuilder, CommandDefaults, CommandPayload, \
EventHandlerResponse, Command
from ..protocols import LedgerNetwork, LedgerClient
Expand Down Expand Up @@ -165,11 +165,38 @@ def find_nonempty(self, template: Any, match: ContractMatch, min_count: int = 1,

# region Read Path

async def read_transactions(self, until_offset: Optional[str], raise_events: bool) \
-> Tuple[str, Future]:
async def read_acs(self, until_offset: 'Optional[str]', raise_events: bool) \
-> 'Tuple[str, Future]':
"""
Initial bootstrap of events from the read side. Only one instance of this coroutine
should be active at a time per client. An initial block of events is read using the
Active Contract Set service, and the transaction stream is then subscribed to in order to
ensure that this client is caught up to the specified offset.
:param until_offset:
The destination ledger offset to read up until to. If not included, then the client
attempts to read as many transactions as is currently available.
:param raise_events:
``True`` to raise transaction- and contract-related events to event handlers;
``False`` to suppress this behavior and only update local state within the client
(offset information and active contract set).
:return:
The ledger offset that the reader ended at and a Future that is resolved when all event
handlers' follow-ups have either successfully or unsuccessfully completed.
"""
LOG.info('Calling read_acs(%r, %r)', until_offset, raise_events)
client = await self._client_fut
acs = await client.active_contracts()
for acs_evt in acs:
await self._process_transaction_stream_event(acs_evt, False)

return await self.read_transactions(until_offset, raise_events)

async def read_transactions(self, until_offset: 'Optional[str]', raise_events: bool) \
-> 'Tuple[str, Future]':
"""
Main processing method of events from the read side. Only one instance of this coroutine
should be active at a time.
should be active at a time per client.
:param until_offset:
The destination ledger offset to read up until to. If not included, then the client
Expand All @@ -188,9 +215,7 @@ async def read_transactions(self, until_offset: Optional[str], raise_events: boo
client = await self._client_fut
metadata = await self._pool.ledger()

initial_offset = self._reader.offset
event_count = 0

futs = []

while (until_offset is None or
Expand Down Expand Up @@ -254,6 +279,10 @@ def _process_transaction_stream_event(self, event: Any, raise_events: bool) -> F
LOG.debug('evt recv: party %s, BIM %r (%s events)',
self.party, event.command_id[0:7], len(event.contract_events))

elif isinstance(event, ActiveContractSetEvent):
for contract_event in event.contract_events:
self._acs.handle_create(contract_event)

if raise_events:
fut = self.emit_event(event)
else:
Expand All @@ -268,8 +297,11 @@ def _process_transaction_stream_event(self, event: Any, raise_events: bool) -> F

# region Write path

def write_commands(self, commands: EventHandlerResponse, ignore_errors: bool = False,
workflow_id: Optional[str] = None) \
def write_commands(
self,
commands: EventHandlerResponse,
ignore_errors: bool = False,
workflow_id: 'Optional[str]' = None) \
-> Awaitable[None]:
"""
Submit a command or list of commands.
Expand All @@ -279,6 +311,8 @@ def write_commands(self, commands: EventHandlerResponse, ignore_errors: bool = F
:param ignore_errors:
Whether errors should be ignored for purposes of terminating the client. If ``True``,
then a failure to send this command does not necessarily end the client.
:param workflow_id:
The workflow ID to use to tag submitted commands.
:return:
An ``asyncio.Future`` that is resolved right before the corresponding side effects have
hit the event stream for this party. Synchronous errors are reported back immediately
Expand Down
9 changes: 9 additions & 0 deletions python/dazl/model/reading.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,15 @@ class ReadyEvent(OffsetEvent):
"""


class ActiveContractSetEvent(OffsetEvent):
"""
Event raised on initial read of the active contract set.
"""
def __init__(self, client, party, ledger_id, package_store, offset, contract_events):
super().__init__(client, party, None, ledger_id, package_store, offset)
self.contract_events = contract_events


class BaseTransactionEvent(OffsetEvent):
"""
Event raised when dazl encounters a new transaction. This is raised before any corresponding
Expand Down
8 changes: 7 additions & 1 deletion python/dazl/protocols/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,13 @@ async def commands(self, command_payload: CommandPayload) -> None:
"""
raise NotImplementedError('commands must be implemented')

async def events(self, transaction_filter: TransactionFilter) -> Sequence[BaseEvent]:
async def active_contracts(self) -> 'Sequence[BaseEvent]':
"""
Return the current active contract set.
"""
raise NotImplementedError('active contract set fetch must be implemented')

async def events(self, transaction_filter: TransactionFilter) -> 'Sequence[BaseEvent]':
"""
Return events from a certain offset in the ledger. The number of blocks
returned is implementation-defined.
Expand Down
35 changes: 27 additions & 8 deletions python/dazl/protocols/v1/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"""
Support for the gRPC-based Ledger API.
"""
from asyncio import gather
from asyncio import gather, get_event_loop
from datetime import datetime
from threading import Thread, Event
from typing import Awaitable, Iterable, Optional, Sequence
Expand All @@ -15,8 +15,8 @@
from ... import LOG
from .._base import LedgerClient, _LedgerConnection, _LedgerConnectionContext
from .grpc_time import maybe_grpc_time_stream
from .pb_parse_event import serialize_request, to_transaction_events, \
BaseEventDeserializationContext
from .pb_parse_event import serialize_acs_request, serialize_transactions_request, \
to_acs_events, to_transaction_events, BaseEventDeserializationContext
from .pb_parse_metadata import parse_daml_metadata_pb, parse_archive_payload, find_dependencies
from ...model.core import Party
from ...model.ledger import LedgerMetadata, StaticTimeModel, RealTimeModel
Expand All @@ -32,20 +32,38 @@ class GRPCv1LedgerClient(LedgerClient):
def __init__(self, connection: 'GRPCv1Connection', ledger: LedgerMetadata, party: Party):
self.connection = safe_cast(GRPCv1Connection, connection)
self.ledger = safe_cast(LedgerMetadata, ledger)
self.party = safe_cast(str, party) # type: Party
self.party = Party(safe_cast(str, party))

def commands(self, commands: CommandPayload) -> None:
serializer = self.ledger.serializer
request = serializer.serialize_command_request(commands)
return self.connection.context.run_in_background(
lambda: self.connection.command_service.SubmitAndWait(request))

def active_contracts(self) -> 'Awaitable[Sequence[BaseEvent]]':
results_future = get_event_loop().create_future()
request = serialize_acs_request(self.ledger.ledger_id, self.party)

context = BaseEventDeserializationContext(
None, self.ledger.store, self.party, self.ledger.ledger_id)

def on_acs_done(fut):
try:
acs_stream = fut.result()
events = to_acs_events(context, acs_stream)
results_future.set_result(events)
except Exception as ex:
results_future.set_exception(ex)

acs_future = self.connection.context.run_in_background(
lambda: self.connection.active_contract_set_service.GetActiveContracts(request))
acs_future.add_done_callback(on_acs_done)
return results_future

def events(self, transaction_filter: TransactionFilter) \
-> Awaitable[Sequence[BaseEvent]]:
request = serialize_request(transaction_filter, self.party)

import asyncio
results_future = asyncio.get_event_loop().create_future()
results_future = get_event_loop().create_future()
request = serialize_transactions_request(transaction_filter, self.party)

context = BaseEventDeserializationContext(
None, self.ledger.store, self.party, transaction_filter.ledger_id)
Expand Down Expand Up @@ -244,6 +262,7 @@ def __init__(self,

self._closed = Event()
self._channel = grpc_create_channel(settings)
self.active_contract_set_service = G.ActiveContractsServiceStub(self._channel)
self.command_service = G.CommandServiceStub(self._channel)
self.transaction_service = G.TransactionServiceStub(self._channel)
self.package_service = G.PackageServiceStub(self._channel)
Expand Down
3 changes: 3 additions & 0 deletions python/dazl/protocols/v1/model/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# Copyright (c) 2019 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

from ...._gen.com.digitalasset.ledger.api.v1.active_contracts_service_pb2 import \
GetActiveContractsRequest, GetActiveContractsResponse
from ...._gen.com.digitalasset.ledger.api.v1.active_contracts_service_pb2_grpc import ActiveContractsServiceStub
from ...._gen.com.digitalasset.ledger.api.v1.commands_pb2 import Command, CreateCommand, ExerciseCommand
from ...._gen.com.digitalasset.ledger.api.v1.command_service_pb2_grpc import CommandServiceStub
from ...._gen.com.digitalasset.ledger.api.v1.command_submission_service_pb2 import SubmitRequest
Expand Down
Loading

0 comments on commit 1758480

Please sign in to comment.