-
Notifications
You must be signed in to change notification settings - Fork 4.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[low-code] convert request.body to a dict when converting to AirbyteLogMessage #20557
Changes from 2 commits
2398adf
93aabe7
6d8ccd7
0adf45c
c5547b8
f40a56c
84bd5dc
0912538
b6f62d6
a6d0a93
fa4ea8b
dcb8390
3452bba
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -408,20 +408,34 @@ def state(self, value: StreamState): | |
def parse_records_and_emit_request_and_responses(self, request, response, stream_slice, stream_state) -> Iterable[StreamData]: | ||
# Only emit requests and responses when running in debug mode | ||
if self.logger.isEnabledFor(logging.DEBUG): | ||
yield self._create_trace_message_from_request(request) | ||
yield self._create_trace_message_from_response(response) | ||
yield prepared_request_to_airbyte_message(request) | ||
yield response_to_airbyte_message(response) | ||
# Not great to need to call _read_pages which is a private method | ||
# A better approach would be to extract the HTTP client from the HttpStream and call it directly from the HttpRequester | ||
yield from self.parse_response(response, stream_slice=stream_slice, stream_state=stream_state) | ||
|
||
def _create_trace_message_from_request(self, request: requests.PreparedRequest): | ||
# FIXME: this should return some sort of trace message | ||
request_dict = {"url": request.url, "http_method": request.method, "headers": dict(request.headers), "body": request.body} | ||
log_message = filter_secrets(f"request:{json.dumps(request_dict)}") | ||
return AirbyteMessage(type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=log_message)) | ||
|
||
def _create_trace_message_from_response(self, response: requests.Response): | ||
# FIXME: this should return some sort of trace message | ||
response_dict = {"body": response.text, "headers": dict(response.headers), "status_code": response.status_code} | ||
log_message = filter_secrets(f"response:{json.dumps(response_dict)}") | ||
return AirbyteMessage(type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=log_message)) | ||
|
||
def prepared_request_to_airbyte_message(request: requests.PreparedRequest) -> AirbyteMessage: | ||
# FIXME: this should return some sort of trace message | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. need to keep them FIXMEs for now 😞 |
||
request_dict = { | ||
"url": request.url, | ||
"http_method": request.method, | ||
"headers": dict(request.headers), | ||
"body": _body_binary_string_to_dict(request.body), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. instead of assigning the body directly, delegate to |
||
} | ||
log_message = filter_secrets(f"request:{json.dumps(request_dict)}") | ||
return AirbyteMessage(type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=log_message)) | ||
|
||
|
||
def _body_binary_string_to_dict(body_str) -> Optional[Mapping[str, str]]: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we have typing for this parameter? |
||
if body_str: | ||
return json.loads(body_str.decode()) | ||
else: | ||
return None | ||
|
||
|
||
def response_to_airbyte_message(response: requests.Response) -> AirbyteMessage: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. moved to a function for consistency and ease of testing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Those methods aren't used anywhere else except from the tests though. Should we indicate them as private using the Saying that makes me think it is weird that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good call. prepended both with |
||
# FIXME: this should return some sort of trace message | ||
response_dict = {"body": response.text, "headers": dict(response.headers), "status_code": response.status_code} | ||
log_message = filter_secrets(f"response:{json.dumps(response_dict)}") | ||
return AirbyteMessage(type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=log_message)) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -155,6 +155,7 @@ def test_read_stream(): | |
request = { | ||
"url": "https://demonslayers.com/api/v1/hashiras?era=taisho", | ||
"headers": {"Content-Type": "application/json"}, | ||
"http_method": "GET", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixes the tests There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we'll be able to merge this PR when this is merged in so CI fails on broken tests #20217 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for pointing that out - was looking into it for a PR I'm working on right now 😅 |
||
"body": {"custom": "field"}, | ||
} | ||
response = {"status_code": 200, "headers": {"field": "value"}, "body": '{"name": "field"}'} | ||
|
@@ -165,6 +166,7 @@ def test_read_stream(): | |
parameters={"era": ["taisho"]}, | ||
headers={"Content-Type": "application/json"}, | ||
body={"custom": "field"}, | ||
http_method="GET", | ||
), | ||
response=HttpResponse(status=200, headers={"field": "value"}, body={"name": "field"}), | ||
records=[{"name": "Shinobu Kocho"}, {"name": "Muichiro Tokito"}], | ||
|
@@ -175,6 +177,7 @@ def test_read_stream(): | |
parameters={"era": ["taisho"]}, | ||
headers={"Content-Type": "application/json"}, | ||
body={"custom": "field"}, | ||
http_method="GET", | ||
), | ||
response=HttpResponse(status=200, headers={"field": "value"}, body={"name": "field"}), | ||
records=[{"name": "Mitsuri Kanroji"}], | ||
|
@@ -210,6 +213,7 @@ def test_read_stream_with_logs(): | |
"url": "https://demonslayers.com/api/v1/hashiras?era=taisho", | ||
"headers": {"Content-Type": "application/json"}, | ||
"body": {"custom": "field"}, | ||
"http_method": "GET", | ||
} | ||
response = {"status_code": 200, "headers": {"field": "value"}, "body": '{"name": "field"}'} | ||
expected_pages = [ | ||
|
@@ -219,6 +223,7 @@ def test_read_stream_with_logs(): | |
parameters={"era": ["taisho"]}, | ||
headers={"Content-Type": "application/json"}, | ||
body={"custom": "field"}, | ||
http_method="GET", | ||
), | ||
response=HttpResponse(status=200, headers={"field": "value"}, body={"name": "field"}), | ||
records=[{"name": "Shinobu Kocho"}, {"name": "Muichiro Tokito"}], | ||
|
@@ -229,6 +234,7 @@ def test_read_stream_with_logs(): | |
parameters={"era": ["taisho"]}, | ||
headers={"Content-Type": "application/json"}, | ||
body={"custom": "field"}, | ||
http_method="GET", | ||
), | ||
response=HttpResponse(status=200, headers={"field": "value"}, body={"name": "field"}), | ||
records=[{"name": "Mitsuri Kanroji"}], | ||
|
@@ -272,6 +278,7 @@ def test_read_stream_no_records(): | |
"url": "https://demonslayers.com/api/v1/hashiras?era=taisho", | ||
"headers": {"Content-Type": "application/json"}, | ||
"body": {"custom": "field"}, | ||
"http_method": "GET", | ||
} | ||
response = {"status_code": 200, "headers": {"field": "value"}, "body": '{"name": "field"}'} | ||
expected_pages = [ | ||
|
@@ -281,6 +288,7 @@ def test_read_stream_no_records(): | |
parameters={"era": ["taisho"]}, | ||
headers={"Content-Type": "application/json"}, | ||
body={"custom": "field"}, | ||
http_method="GET", | ||
), | ||
response=HttpResponse(status=200, headers={"field": "value"}, body={"name": "field"}), | ||
records=[], | ||
|
@@ -291,6 +299,7 @@ def test_read_stream_no_records(): | |
parameters={"era": ["taisho"]}, | ||
headers={"Content-Type": "application/json"}, | ||
body={"custom": "field"}, | ||
http_method="GET", | ||
), | ||
response=HttpResponse(status=200, headers={"field": "value"}, body={"name": "field"}), | ||
records=[], | ||
|
@@ -388,15 +397,19 @@ def test_read_stream_returns_error_if_stream_does_not_exist(): | |
pytest.param( | ||
'request:{"url": "https://nichirin.com/v1/swords?color=orange", "http_method": "PUT", "headers": {"field": "name"}, "body":{"key": "value"}}', | ||
HttpRequest( | ||
url="https://nichirin.com/v1/swords", parameters={"color": ["orange"]}, headers={"field": "name"}, body={"key": "value"}, | ||
url="https://nichirin.com/v1/swords", | ||
parameters={"color": ["orange"]}, | ||
headers={"field": "name"}, | ||
body={"key": "value"}, | ||
http_method="PUT", | ||
), | ||
id="test_create_request_with_all_fields", | ||
), | ||
pytest.param( | ||
'request:{"url": "https://nichirin.com/v1/swords?color=orange", "http_method": "GET", "headers": {"field": "name"}}', | ||
HttpRequest(url="https://nichirin.com/v1/swords", parameters={"color": ["orange"]}, headers={"field": "name"}, | ||
http_method="GET"), | ||
HttpRequest( | ||
url="https://nichirin.com/v1/swords", parameters={"color": ["orange"]}, headers={"field": "name"}, http_method="GET" | ||
), | ||
id="test_create_request_with_no_body", | ||
), | ||
pytest.param( | ||
|
@@ -409,6 +422,11 @@ def test_read_stream_returns_error_if_stream_does_not_exist(): | |
HttpRequest(url="https://nichirin.com/v1/swords", headers={"field": "name"}, body={"key": "value"}, http_method="PUT"), | ||
id="test_create_request_with_no_parameters", | ||
), | ||
pytest.param( | ||
'request:{"url": "https://nichirin.com/v1/swords", "http_method": "POST", "headers": {"field": "name"}, "body":null}', | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. add a test where |
||
HttpRequest(url="https://nichirin.com/v1/swords", headers={"field": "name"}, body=None, http_method="POST"), | ||
id="test_create_request_with_null_body", | ||
), | ||
pytest.param("request:{invalid_json: }", None, id="test_invalid_json_still_does_not_crash"), | ||
pytest.param("just a regular log message", None, id="test_no_request:_prefix_does_not_crash"), | ||
], | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename