Skip to content

Commit

Permalink
Extract ExecuteContext as in/out argument
Browse files Browse the repository at this point in the history
So that we can pass out the parsed capabilities to control
retries. This also allows further code optimization.
  • Loading branch information
fantix committed May 31, 2024
1 parent 780d9e7 commit bf8f328
Show file tree
Hide file tree
Showing 8 changed files with 280 additions and 195 deletions.
6 changes: 6 additions & 0 deletions edgedb/_testbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,10 +372,16 @@ def make_test_client(
database='edgedb',
user='edgedb',
password='test',
host=...,
port=...,
connection_class=...,
):
conargs = cls.get_connect_args(
cluster=cluster, database=database, user=user, password=password)
if host is not ...:
conargs['host'] = host
if port is not ...:
conargs['port'] = port
if connection_class is ...:
connection_class = (
asyncio_client.AsyncIOConnection
Expand Down
30 changes: 30 additions & 0 deletions edgedb/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,42 @@ class QueryContext(typing.NamedTuple):
retry_options: typing.Optional[options.RetryOptions]
state: typing.Optional[options.State]

def lower(
self, *, allow_capabilities: enums.Capability
) -> protocol.ExecuteContext:
return protocol.ExecuteContext(
query=self.query.query,
args=self.query.args,
kwargs=self.query.kwargs,
reg=self.cache.codecs_registry,
qc=self.cache.query_cache,
output_format=self.query_options.output_format,
expect_one=self.query_options.expect_one,
required_one=self.query_options.required_one,
allow_capabilities=allow_capabilities,
state=self.state.as_dict() if self.state else None,
)


class ExecuteContext(typing.NamedTuple):
query: QueryWithArgs
cache: QueryCache
state: typing.Optional[options.State]

def lower(
self, *, allow_capabilities: enums.Capability
) -> protocol.ExecuteContext:
return protocol.ExecuteContext(
query=self.query.query,
args=self.query.args,
kwargs=self.query.kwargs,
reg=self.cache.codecs_registry,
qc=self.cache.query_cache,
output_format=protocol.OutputFormat.NONE,
allow_capabilities=allow_capabilities,
state=self.state.as_dict() if self.state else None,
)


@dataclasses.dataclass
class DescribeContext:
Expand Down
63 changes: 10 additions & 53 deletions edgedb/base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,17 +183,7 @@ async def privileged_execute(
)
else:
await self._protocol.execute(
query=execute_context.query.query,
args=execute_context.query.args,
kwargs=execute_context.query.kwargs,
reg=execute_context.cache.codecs_registry,
qc=execute_context.cache.query_cache,
output_format=protocol.OutputFormat.NONE,
allow_capabilities=enums.Capability.ALL,
state=(
execute_context.state.as_dict()
if execute_context.state else None
),
execute_context.lower(allow_capabilities=enums.Capability.ALL)
)

def is_in_transaction(self) -> bool:
Expand All @@ -211,56 +201,31 @@ async def raw_query(self, query_context: abstract.QueryContext):
await self.connect()

reconnect = False
capabilities = None
i = 0
args = dict(
query=query_context.query.query,
args=query_context.query.args,
kwargs=query_context.query.kwargs,
reg=query_context.cache.codecs_registry,
qc=query_context.cache.query_cache,
output_format=query_context.query_options.output_format,
expect_one=query_context.query_options.expect_one,
required_one=query_context.query_options.required_one,
)
if self._protocol.is_legacy:
args["allow_capabilities"] = enums.Capability.LEGACY_EXECUTE
allow_capabilities = enums.Capability.LEGACY_EXECUTE
else:
args["allow_capabilities"] = enums.Capability.EXECUTE
if query_context.state is not None:
args["state"] = query_context.state.as_dict()
allow_capabilities = enums.Capability.EXECUTE
ctx = query_context.lower(allow_capabilities=allow_capabilities)
while True:
i += 1
try:
if reconnect:
await self.connect(single_attempt=True)
if self._protocol.is_legacy:
return await self._protocol.legacy_execute_anonymous(
**args
)
return await self._protocol.legacy_execute_anonymous(ctx)
else:
return await self._protocol.query(**args)
return await self._protocol.query(ctx)
except errors.EdgeDBError as e:
if query_context.retry_options is None:
raise
if not e.has_tag(errors.SHOULD_RETRY):
raise e
if capabilities is None:
cache_item = query_context.cache.query_cache.get(
query_context.query.query,
query_context.query_options.output_format,
implicit_limit=0,
inline_typenames=False,
inline_typeids=False,
expect_one=query_context.query_options.expect_one,
)
if cache_item is not None:
_, _, _, capabilities = cache_item
# A query is read-only if it has no capabilities i.e.
# capabilities == 0. Read-only queries are safe to retry.
# Explicit transaction conflicts as well.
if (
capabilities != 0
ctx.capabilities != 0
and not isinstance(e, errors.TransactionConflictError)
):
raise e
Expand All @@ -281,17 +246,9 @@ async def _execute(self, execute_context: abstract.ExecuteContext) -> None:
)
else:
await self._protocol.execute(
query=execute_context.query.query,
args=execute_context.query.args,
kwargs=execute_context.query.kwargs,
reg=execute_context.cache.codecs_registry,
qc=execute_context.cache.query_cache,
output_format=protocol.OutputFormat.NONE,
allow_capabilities=enums.Capability.EXECUTE,
state=(
execute_context.state.as_dict()
if execute_context.state else None
),
execute_context.lower(
allow_capabilities=enums.Capability.EXECUTE
)
)

async def describe(
Expand Down
24 changes: 24 additions & 0 deletions edgedb/protocol/protocol.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,30 @@ cdef class QueryCodecsCache:
BaseCodec in_type, BaseCodec out_type, int capabilities)


cdef class ExecuteContext:
cdef:
# Input arguments
str query
object args
object kwargs
CodecsRegistry reg
QueryCodecsCache qc
OutputFormat output_format
bint expect_one
bint required_one
int implicit_limit
bint inline_typenames
bint inline_typeids
uint64_t allow_capabilities
object state

# Contextual variables
bytes cardinality
BaseCodec in_dc
BaseCodec out_dc
readonly uint64_t capabilities


cdef class SansIOProtocol:

cdef:
Expand Down
Loading

0 comments on commit bf8f328

Please sign in to comment.