Skip to content

Commit

Permalink
fix: exception propagation for asynchronous QueryApi (#512)
Browse files Browse the repository at this point in the history
  • Loading branch information
bednar authored Oct 12, 2022
1 parent 42432f9 commit ddb0f77
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 17 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
### Features
1. [#510](https://github.com/influxdata/influxdb-client-python/pull/510): Allow to use client's optional configs for initialization from file or environment properties

### Bug Fixes
1. [#512](https://github.com/influxdata/influxdb-client-python/pull/512): Exception propagation for asynchronous `QueryApi` [async/await]

## 1.33.0 [2022-09-29]

### Features
Expand Down
36 changes: 19 additions & 17 deletions influxdb_client/client/query_api_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
from influxdb_client.client._base import _BaseQueryApi
from influxdb_client.client.flux_table import FluxRecord, TableList
from influxdb_client.client.query_api import QueryOptions
from influxdb_client.rest import _UTF_8_encoding
from influxdb_client.rest import _UTF_8_encoding, ApiException
from .._async.rest import RESTResponseAsync


class QueryApiAsync(_BaseQueryApi):
Expand Down Expand Up @@ -98,10 +99,7 @@ async def query(self, query: str, org=None, params: dict = None) -> TableList:
""" # noqa: E501
org = self._org_param(org)

response = await self._query_api.post_query_async(org=org,
query=self._create_query(query, self.default_dialect, params),
async_req=False, _preload_content=False,
_return_http_data_only=True)
response = await self._post_query(org=org, query=self._create_query(query, self.default_dialect, params))

return await self._to_tables_async(response, query_options=self._get_query_options())

Expand All @@ -118,10 +116,7 @@ async def query_stream(self, query: str, org=None, params: dict = None) -> Async
""" # noqa: E501
org = self._org_param(org)

response = await self._query_api.post_query_async(org=org,
query=self._create_query(query, self.default_dialect, params),
async_req=False, _preload_content=False,
_return_http_data_only=True)
response = await self._post_query(org=org, query=self._create_query(query, self.default_dialect, params))

return await self._to_flux_record_stream_async(response, query_options=self._get_query_options())

Expand Down Expand Up @@ -193,11 +188,8 @@ async def query_data_frame_stream(self, query: str, org=None, data_frame_index:
""" # noqa: E501
org = self._org_param(org)

response = await self._query_api.post_query_async(org=org,
query=self._create_query(query, self.default_dialect, params,
dataframe_query=True),
async_req=False, _preload_content=False,
_return_http_data_only=True)
response = await self._post_query(org=org, query=self._create_query(query, self.default_dialect, params,
dataframe_query=True))

return await self._to_data_frame_stream_async(data_frame_index=data_frame_index, response=response,
query_options=self._get_query_options())
Expand All @@ -215,8 +207,18 @@ async def query_raw(self, query: str, org=None, dialect=_BaseQueryApi.default_di
:return: :class:`~str`
"""
org = self._org_param(org)
result = await self._query_api.post_query_async(org=org, query=self._create_query(query, dialect, params),
async_req=False, _preload_content=False,
_return_http_data_only=True)
result = await self._post_query(org=org, query=self._create_query(query, dialect, params))
raw_bytes = await result.read()
return raw_bytes.decode(_UTF_8_encoding)

async def _post_query(self, org, query):
response = await self._query_api.post_query_async(org=org,
query=query,
async_req=False,
_preload_content=False,
_return_http_data_only=True)
if not 200 <= response.status <= 299:
data = await response.read()
raise ApiException(http_resp=RESTResponseAsync(response, data))

return response
9 changes: 9 additions & 0 deletions tests/test_InfluxDBClientAsync.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,15 @@ async def test_parse_csv_with_new_lines_in_column(self, mocked):

self.assertEqual(4, len(records))

@async_test
async def test_query_exception_propagation(self):
await self.client.close()
self.client = InfluxDBClientAsync(url="http://localhost:8086", token="wrong", org="my-org")

with pytest.raises(InfluxDBError) as e:
await self.client.query_api().query("buckets()", "my-org")
self.assertEqual("unauthorized access", e.value.message)

async def _prepare_data(self, measurement: str):
_point1 = Point(measurement).tag("location", "Prague").field("temperature", 25.3)
_point2 = Point(measurement).tag("location", "New York").field("temperature", 24.3)
Expand Down

0 comments on commit ddb0f77

Please sign in to comment.