Skip to content

Commit

Permalink
python: Logging and clarification of ledger ID discovery.
Browse files Browse the repository at this point in the history
  • Loading branch information
da-tanabe committed Mar 17, 2021
1 parent 54a40b7 commit 040b3b4
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 63 deletions.
125 changes: 67 additions & 58 deletions python/dazl/ledger/grpc/conn_aio.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
"""
This module contains the mapping between gRPC calls and Python/dazl types.
"""
from __future__ import annotations

import asyncio
from typing import AbstractSet, Any, AsyncIterable, Collection, Mapping, Optional, Sequence, Union
import uuid
Expand Down Expand Up @@ -52,7 +54,7 @@
)
from ..._gen.com.daml.ledger.api.v1.transaction_service_pb2_grpc import TransactionServiceStub
from ...damlast.daml_lf_1 import PackageRef, TypeConName
from ...prim import ContractData, ContractId, Party
from ...prim import LEDGER_STRING_REGEX, ContractData, ContractId, Party
from ...query import Query
from ..api_types import ArchiveEvent, Boundary, Command, CreateEvent, ExerciseResponse, PartyInfo
from ..config import Config
Expand All @@ -66,27 +68,28 @@


class Connection:
def __init__(self, config: "Config"):
def __init__(self, config: Config):
self._config = config
self._logger = config.logger
self._channel = create_channel(config)
self._codec = Codec(self)

@property
def config(self) -> "Config":
def config(self) -> Config:
return self._config

@property
def channel(self) -> "Channel":
def channel(self) -> Channel:
"""
Provides access to the underlying gRPC channel.
"""
return self._channel

@property
def codec(self) -> "Codec":
def codec(self) -> Codec:
return self._codec

async def __aenter__(self) -> "Connection":
async def __aenter__(self) -> Connection:
await self.open()
return self

Expand All @@ -104,9 +107,10 @@ async def open(self) -> None:
stub = LedgerIdentityServiceStub(self._channel)
response = await stub.GetLedgerIdentity(G_GetLedgerIdentityRequest())
if isinstance(self._config.access, PropertyBasedAccessConfig):
self._logger.info("Connected to gRPC Ledger API, ledger ID: %s", response.ledger_id)
self._config.access.ledger_id = response.ledger_id
else:
raise ValueError("token-based access must supply tokens that provide ledger ID")
raise ValueError("when using token-based access, the token must contain ledger ID")

async def close(self) -> None:
"""
Expand All @@ -119,10 +123,10 @@ async def close(self) -> None:

async def do_commands(
self,
commands: "Union[Command, Sequence[Command]]",
commands: Union[Command, Sequence[Command]],
*,
workflow_id: "Optional[str]" = None,
command_id: "Optional[str]" = None,
workflow_id: Optional[str] = None,
command_id: Optional[str] = None,
) -> "None":
"""
Submit one or more commands to the Ledger API.
Expand Down Expand Up @@ -161,12 +165,12 @@ async def do_commands(

async def create(
self,
template_id: "Union[str, TypeConName]",
payload: "ContractData",
template_id: Union[str, TypeConName],
payload: ContractData,
*,
workflow_id: "Optional[str]" = None,
command_id: "Optional[str]" = None,
) -> "CreateEvent":
workflow_id: Optional[str] = None,
command_id: Optional[str] = None,
) -> CreateEvent:
"""
Create a contract for a given template.
Expand Down Expand Up @@ -195,13 +199,13 @@ async def create(

async def exercise(
self,
contract_id: "ContractId",
contract_id: ContractId,
choice_name: str,
argument: "Optional[ContractData]" = None,
argument: Optional[ContractData] = None,
*,
workflow_id: "Optional[str]" = None,
command_id: "Optional[str]" = None,
) -> "ExerciseResponse":
workflow_id: Optional[str] = None,
command_id: Optional[str] = None,
) -> ExerciseResponse:
"""
Exercise a choice on a contract identified by its contract ID.
Expand All @@ -222,14 +226,14 @@ async def exercise(

async def create_and_exercise(
self,
template_id: "Union[str, TypeConName]",
payload: "ContractData",
template_id: Union[str, TypeConName],
payload: ContractData,
choice_name: str,
argument: "Optional[ContractData]" = None,
argument: Optional[ContractData] = None,
*,
workflow_id: "Optional[str]" = None,
command_id: "Optional[str]" = None,
) -> "ExerciseResponse":
workflow_id: Optional[str] = None,
command_id: Optional[str] = None,
) -> ExerciseResponse:
stub = CommandServiceStub(self.channel)

commands = [
Expand All @@ -244,13 +248,13 @@ async def create_and_exercise(

async def exercise_by_key(
self,
template_id: "Union[str, TypeConName]",
template_id: Union[str, TypeConName],
choice_name: str,
key: "Any",
argument: "Optional[ContractData]" = None,
key: Any,
argument: Optional[ContractData] = None,
*,
workflow_id: "Optional[str]" = None,
command_id: "Optional[str]" = None,
workflow_id: Optional[str] = None,
command_id: Optional[str] = None,
) -> "ExerciseResponse":
stub = CommandServiceStub(self.channel)

Expand All @@ -264,39 +268,44 @@ async def exercise_by_key(

return await self._codec.decode_exercise_response(response.transaction)

async def archive(self, contract_id: "ContractId") -> "ArchiveEvent":
async def archive(self, contract_id: ContractId) -> ArchiveEvent:
await self.exercise(contract_id, "Archive")
return ArchiveEvent(contract_id)

async def archive_by_key(self, template_id: str, key: "Any") -> "ArchiveEvent":
async def archive_by_key(self, template_id: str, key: Any) -> ArchiveEvent:
response = await self.exercise_by_key(template_id, "Archive", key)
return next(iter(event for event in response.events if isinstance(event, ArchiveEvent)))

def _ensure_act_as(self) -> "Party":
def _ensure_act_as(self) -> Party:
act_as_party = next(iter(self._config.access.act_as), None)
if not act_as_party:
raise ValueError("current access rights do not include any act-as parties")
return act_as_party

@staticmethod
def _workflow_id(workflow_id: str) -> str:
def _workflow_id(workflow_id: Optional[str]) -> Optional[str]:
if workflow_id:
# TODO: workflow_id must be a LedgerString; we could enforce some minimal validation
# here to make for a more obvious error than failing on the server-side
if not LEDGER_STRING_REGEX.match(workflow_id):
raise ValueError("workflow_id must be a valid ledger string")
return workflow_id
else:
return None

@staticmethod
def _command_id(command_id: "Optional[str]") -> str:
# TODO: command_id must be a LedgerString; we could enforce some minimal validation
# here to make for a more obvious error than failing on the server-side
return command_id or uuid.uuid4().hex
def _command_id(command_id: Optional[str]) -> str:
if command_id:
if not LEDGER_STRING_REGEX.match(command_id):
raise ValueError("command_id must be a valid ledger string")
return command_id
else:
return uuid.uuid4().hex

def _submit_and_wait_request(
self,
commands: "Collection[G_Command]",
workflow_id: "Optional[str]" = None,
command_id: "Optional[str]" = None,
) -> "G_SubmitAndWaitRequest":
commands: Collection[G_Command],
workflow_id: Optional[str] = None,
command_id: Optional[str] = None,
) -> G_SubmitAndWaitRequest:
return G_SubmitAndWaitRequest(
commands=G_Commands(
ledger_id=self._config.access.ledger_id,
Expand All @@ -314,16 +323,16 @@ def _submit_and_wait_request(

# region Read API

def query(self, template_id: str = "*", query: "Query" = None) -> "QueryStream":
def query(self, template_id: str = "*", query: Query = None) -> QueryStream:
return QueryStream(self, {template_id: query}, False)

def query_many(self, queries: "Optional[Mapping[str, Query]]" = None) -> "QueryStream":
def query_many(self, queries: Optional[Mapping[str, Query]] = None) -> QueryStream:
return QueryStream(self, queries, False)

def stream(self, template_id: str = "*", query: "Query" = None) -> "QueryStream":
def stream(self, template_id: str = "*", query: Query = None) -> QueryStream:
return QueryStream(self, {template_id: query}, True)

def stream_many(self, queries: "Optional[Mapping[str, Query]]" = None) -> "QueryStream":
def stream_many(self, queries: Optional[Mapping[str, Query]] = None) -> QueryStream:
return QueryStream(self, queries, True)

# endregion
Expand All @@ -341,7 +350,7 @@ async def allocate_party(
response = await stub.AllocateParty(request)
return Codec.decode_party_info(response.party_details)

async def list_known_parties(self) -> "Sequence[PartyInfo]":
async def list_known_parties(self) -> Sequence[PartyInfo]:
stub = PartyManagementServiceStub(self.channel)
response = await stub.ListKnownParties()
return [Codec.decode_party_info(pd) for pd in response.party_details]
Expand All @@ -350,15 +359,15 @@ async def list_known_parties(self) -> "Sequence[PartyInfo]":

# region Package Management calls

async def get_package(self, package_id: "PackageRef") -> bytes:
async def get_package(self, package_id: PackageRef) -> bytes:
stub = PackageServiceStub(self.channel)
request = G_GetPackageRequest(
ledger_id=self._config.access.ledger_id, package_id=str(package_id)
)
response = await stub.GetPackage(request)
return response.archive_payload

async def list_package_ids(self) -> "AbstractSet[PackageRef]":
async def list_package_ids(self) -> AbstractSet[PackageRef]:
stub = PackageServiceStub(self.channel)
request = G_ListPackagesRequest(ledger_id=self._config.access.ledger_id)
response = await stub.ListPackages(request)
Expand All @@ -376,8 +385,8 @@ async def upload_package(self, contents: bytes) -> None:
class QueryStream(QueryStreamBase):
def __init__(
self,
conn: "Connection",
queries: "Optional[Mapping[str, Query]]",
conn: Connection,
queries: Optional[Mapping[str, Query]],
continue_stream: bool,
):
self.conn = conn
Expand Down Expand Up @@ -443,8 +452,8 @@ async def items(self):
await self.close()

async def _acs_events(
self, filter_pb: "G_TransactionFilter"
) -> "AsyncIterable[Union[CreateEvent, Boundary]]":
self, filter_pb: G_TransactionFilter
) -> AsyncIterable[Union[CreateEvent, Boundary]]:
stub = ActiveContractsServiceStub(self.conn.channel)

request = G_GetActiveContractsRequest(
Expand All @@ -461,8 +470,8 @@ async def _acs_events(
yield Boundary(offset)

async def _tx_events(
self, filter_pb: "G_TransactionFilter", begin_offset: "Optional[str]"
) -> "AsyncIterable[Union[CreateEvent, ArchiveEvent, Boundary]]":
self, filter_pb: G_TransactionFilter, begin_offset: Optional[str]
) -> AsyncIterable[Union[CreateEvent, ArchiveEvent, Boundary]]:
stub = TransactionServiceStub(self.conn.channel)

request = G_GetTransactionsRequest(
Expand Down
9 changes: 8 additions & 1 deletion python/dazl/prim/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,14 @@
correspond to types over the Ledger API.
"""

from .basic import to_bool, to_str
from .basic import (
LEDGER_STRING_REGEX,
NAME_STRING_REGEX,
PACKAGE_ID_STRING_REGEX,
PARTY_ID_STRING_REGEX,
to_bool,
to_str,
)
from .complex import to_record, to_variant
from .contracts import ContractData, ContractId
from .datetime import (
Expand Down
22 changes: 18 additions & 4 deletions python/dazl/prim/basic.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,26 @@
# Copyright (c) 2017-2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import re
from typing import Any

__all__ = ["to_bool", "to_str"]
__all__ = [
"LEDGER_STRING_REGEX",
"NAME_STRING_REGEX",
"PACKAGE_ID_STRING_REGEX",
"PARTY_ID_STRING_REGEX",
"to_bool",
"to_str",
]

# Standard string regexes as defined here:
# https://github.com/digital-asset/daml/blob/a6da995ecb71004c34c88a4f4211543868cfde15/ledger-api/grpc-definitions/com/daml/ledger/api/v1/value.proto#L18-L21
NAME_STRING_REGEX = re.compile(r"[A-Za-z$_][A-Za-z0-9$_]*")
PACKAGE_ID_STRING_REGEX = re.compile(r"[A-Za-z0-9\-_ ]+")
PARTY_ID_STRING_REGEX = re.compile(r"[A-Za-z0-9:\-_ ]")
LEDGER_STRING_REGEX = re.compile(r"[A-Za-z0-9#:\-_/ ]")


def to_bool(obj: "Any") -> bool:
def to_bool(obj: Any) -> bool:
"""
Convert any of the common wire representations of a ``bool`` to a ``bool``.
"""
Expand All @@ -27,7 +41,7 @@ def to_bool(obj: "Any") -> bool:
raise ValueError(f"Could not parse as a boolean: {obj!r}")


def to_str(obj: "Any") -> str:
def to_str(obj: Any) -> str:
"""
Convert any object to a string. This simply calls ``str`` on the object to produce a string
representation.
Expand Down

0 comments on commit 040b3b4

Please sign in to comment.