🎉 Python CDK: Allow setting network adapter args on outgoing HTTP requests #4493

Merged
merged 12 commits into from
Jul 7, 2021
2 changes: 1 addition & 1 deletion .github/workflows/publish-cdk-command.yml
@@ -27,7 +27,7 @@ jobs:
- name: Checkout Airbyte
uses: actions/checkout@v2
- name: Build CDK Package
-   run: ./gradlew --no-daemon :airbyte-cdk:python:build
+   run: ./gradlew --no-daemon --no-build-cache :airbyte-cdk:python:build
Contributor Author: see #4507

- name: Add Failure Comment
if: github.event.inputs.comment-id && !success()
uses: peter-evans/create-or-update-comment@v1
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
@@ -1,5 +1,8 @@
# Changelog

## 0.1.5
Allow specifying keyword arguments to be sent on a request made by an HTTP stream: https://github.com/airbytehq/airbyte/pull/4493

## 0.1.4
Allow to use Python 3.7.0: https://github.com/airbytehq/airbyte/pull/3566

24 changes: 18 additions & 6 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py
@@ -118,6 +118,19 @@ def request_body_json(
"""
return None

def request_kwargs(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> Mapping[str, Any]:
"""
Override to return a mapping of keyword arguments to be used when creating the HTTP request.
Any option listed in https://docs.python-requests.org/en/latest/api/#requests.adapters.BaseAdapter.send can be returned from
this method. Note that these options do not conflict with request-level options such as headers, request params, etc.
"""
return {}

@abstractmethod
def parse_response(
self,
@@ -166,13 +179,13 @@ def _create_prepared_request(
# TODO support non-json bodies
args["json"] = json

- return requests.Request(**args).prepare()
+ return self._session.prepare_request(requests.Request(**args))

# TODO allow configuring these parameters. If we can get this into the requests library, then we can do it without the ugly exception hacks
# see https://github.com/litl/backoff/pull/122
@default_backoff_handler(max_tries=5, factor=5)
@user_defined_backoff_handler(max_tries=5)
- def _send_request(self, request: requests.PreparedRequest) -> requests.Response:
+ def _send_request(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Any]) -> requests.Response:
"""
Wraps sending the request in rate limit and error handlers.

@@ -190,9 +203,8 @@ def _send_request(self, request: requests.PreparedRequest) -> requests.Response:
Unexpected transient exceptions use the default backoff parameters.
Unexpected persistent exceptions are not handled and will cause the sync to fail.
"""
- response: requests.Response = self._session.send(request)
+ response: requests.Response = self._session.send(request, **request_kwargs)
Contributor:

I see these changes as potentially destructive.
The problem is that we now give users multiple ways to define request parameters (headers, path, params, auth). A user who sets them through request_kwargs would have to reimplement the logic of request_params, request_headers, etc., because the default values are not available inside the request_kwargs call.

Contributor:

OK, I checked the source code, and it seems that the only parameters that can be used here are:

  • allow_redirects
  • stream
  • timeout
  • verify
  • cert
  • proxies

So there is no conflict with the request args, but it is still not possible to customize auth (FYI).
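
The list above suggests a simple guard. As a minimal sketch only (the helper `check_request_kwargs` and the constant `SEND_LEVEL_KWARGS` are hypothetical names, not part of the CDK or of `requests`), a connector could check that whatever its `request_kwargs` override returns stays within the send-level options named in this comment:

```python
from typing import Any, Mapping

# Send-level options listed in the review comment above.
SEND_LEVEL_KWARGS = frozenset({"allow_redirects", "stream", "timeout", "verify", "cert", "proxies"})


def check_request_kwargs(kwargs: Mapping[str, Any]) -> Mapping[str, Any]:
    """Hypothetical guard: reject keys that belong to the request itself (headers, params, auth, ...)."""
    unexpected = sorted(set(kwargs) - SEND_LEVEL_KWARGS)
    if unexpected:
        raise ValueError(f"not send-level options: {unexpected}")
    return kwargs
```

Under these assumptions, `check_request_kwargs({"timeout": 30})` returns the mapping unchanged, while a mapping containing `auth` raises `ValueError`, which matches the observation that auth cannot be customized through this hook.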

if self.should_retry(response):

custom_backoff_time = self.backoff_time(response)
if custom_backoff_time:
raise UserDefinedBackoffException(backoff=custom_backoff_time, request=request, response=response)
@@ -224,8 +236,8 @@ def read_records(
params=self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
json=self.request_body_json(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
)

- response = self._send_request(request)
+ request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
Contributor:

I'm not sure I understand why the kwargs are split out here. Can't you pass them directly to PreparedRequest?

Contributor @keu, Jul 4, 2021:

OK, I see you tried to mimic the Session.request() logic. What is the reason for using PreparedRequest?
Is it just to avoid calling request_body_json and the other functions multiple times on retry? What if that is what the user wants, i.e. what if the request fails because of one of its parameters, which then needs to change?

Contributor:

Using Session.prepare_request instead of request.prepare(), or just using Session.request(), has the benefit of sharing session state such as cookies across requests.

Contributor Author:

We need a prepared request for Session.send.

Good idea, changed to use Session.prepare_request.
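
The difference discussed in this thread can be observed without sending anything over the network. The sketch below is illustrative only: the URL, the `X-Demo` header, and the `sid` cookie are made-up placeholders, and the only library calls used are `requests.Request.prepare` and `requests.Session.prepare_request`.

```python
import requests

session = requests.Session()
# Session-level state that should apply to every request made through this session.
session.headers.update({"X-Demo": "1"})
session.cookies.set("sid", "abc", domain="example.com", path="/")

request = requests.Request("GET", "https://example.com/items")

# Prepared in isolation: the session's headers and cookies are not applied.
bare = request.prepare()

# Prepared through the session: session headers and matching cookies are merged in.
merged = session.prepare_request(request)

print("X-Demo" in bare.headers, "X-Demo" in merged.headers)
print(merged.headers.get("Cookie"))
```

Either prepared request can then be passed to `Session.send` together with send-level keyword arguments, which is the shape the merged change ends up with.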

+ response = self._send_request(request, request_kwargs)
yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)

next_page_token = self.next_page_token(response)
5 changes: 5 additions & 0 deletions airbyte-cdk/python/docs/concepts/http-streams.md
@@ -71,3 +71,8 @@ errors. It is not currently possible to specify a rate limit Airbyte should adhe
### Stream Slicing

When implementing [stream slicing](incremental-stream.md#streamstream_slices) in an `HttpStream`, each slice is equivalent to an HTTP request; the stream makes one request per element returned by the `stream_slices` function. The current slice is passed into every other method in `HttpStream` (e.g. `request_params`, `request_headers`, `path`) so that it can be injected into the request. This lets `request_params`, `path`, and the other functions read the input slice and return the appropriate value.
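
As a rough illustration of "one request per slice", the sketch below uses plain functions rather than the CDK classes. The month-based slices, the `reports/...` path, and the `planned_requests` helper are all hypothetical; only the idea that each slice is handed to `path` and `request_params` comes from the text above.

```python
from typing import Any, Iterable, List, Mapping, Tuple


def stream_slices() -> Iterable[Mapping[str, Any]]:
    # Hypothetical slices: one per month to fetch.
    return [{"month": "2021-05"}, {"month": "2021-06"}]


def path(stream_slice: Mapping[str, Any]) -> str:
    # The slice decides which (made-up) endpoint is requested.
    return f"reports/{stream_slice['month']}"


def request_params(stream_slice: Mapping[str, Any]) -> Mapping[str, Any]:
    # The same slice is also available when building query parameters.
    return {"month": stream_slice["month"], "format": "json"}


def planned_requests() -> List[Tuple[str, Mapping[str, Any]]]:
    # One (path, params) pair per slice, mirroring one HTTP request per slice.
    return [(path(s), request_params(s)) for s in stream_slices()]
```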

### Network Adapter Keyword Arguments
If you need to set network-adapter keyword arguments such as `allow_redirects`, `stream`, `verify`, or `cert` on outgoing HTTP requests,
override the `request_kwargs` method. Any option listed in [BaseAdapter.send](https://docs.python-requests.org/en/latest/api/#requests.adapters.BaseAdapter.send) can
be returned as a keyword argument.
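
A minimal sketch of such an override follows. To stay self-contained it does not import the CDK: `HttpStreamStandIn` is a reduced stand-in that copies only the `request_kwargs` signature from this PR, and `ReportsStream` with its chosen values is hypothetical.

```python
from typing import Any, Mapping, Optional


class HttpStreamStandIn:
    """Reduced stand-in for the CDK's HttpStream: only the hook added in this PR."""

    def request_kwargs(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        # Default from the PR: no extra keyword arguments.
        return {}


class ReportsStream(HttpStreamStandIn):
    """Hypothetical stream that downloads large report files."""

    def request_kwargs(self, stream_state, stream_slice=None, next_page_token=None):
        # Do not follow redirects, wait up to 60 seconds, and stream the response body.
        return {"allow_redirects": False, "timeout": 60, "stream": True}
```

In the real CDK the returned mapping is forwarded to `self._session.send(request, **request_kwargs)`, so request-level settings such as headers and params still go through their own methods.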
11 changes: 2 additions & 9 deletions airbyte-cdk/python/setup.py
@@ -35,7 +35,7 @@

setup(
name="airbyte-cdk",
- version="0.1.4",
+ version="0.1.5",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
@@ -73,14 +73,7 @@
"requests",
],
python_requires=">=3.7.0",
- extras_require={
- "dev": [
- "MyPy==0.812",
- "pytest",
- "pytest-cov",
- "pytest-mock",
- ]
- },
+ extras_require={"dev": ["MyPy==0.812", "pytest", "pytest-cov", "pytest-mock", "requests-mock"]},
entry_points={
"console_scripts": ["base-python=base_python.entrypoint:main"],
},
13 changes: 13 additions & 0 deletions airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py
@@ -24,6 +24,7 @@


from typing import Any, Iterable, Mapping, Optional
from unittest.mock import ANY

import pytest
import requests
@@ -60,6 +61,18 @@ def parse_response(
yield stubResp


def test_request_kwargs_used(mocker, requests_mock):
stream = StubBasicReadHttpStream()
request_kwargs = {"cert": None, "proxies": "google.com"}
mocker.patch.object(stream, "request_kwargs", return_value=request_kwargs)
mocker.patch.object(stream._session, "send", wraps=stream._session.send)
requests_mock.register_uri("GET", stream.url_base)

list(stream.read_records(sync_mode=SyncMode.full_refresh))

stream._session.send.assert_any_call(ANY, **request_kwargs)
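
The spying technique used in this test (patching `send` with `wraps=` so the real method still runs, then asserting on the forwarded keyword arguments) can be tried in isolation with only the standard library. In the sketch below, `FakeSession` and `Client` are hypothetical stand-ins for `requests.Session` and the stream; they are not CDK classes.

```python
from unittest.mock import ANY, patch


class FakeSession:
    """Hypothetical stand-in for requests.Session; only `send` matters here."""

    def send(self, request, **kwargs):
        return f"sent {request}"


class Client:
    """Hypothetical caller that forwards send-level kwargs, like `_send_request` does."""

    def __init__(self):
        self._session = FakeSession()

    def fetch(self, request, request_kwargs):
        return self._session.send(request, **request_kwargs)


client = Client()
request_kwargs = {"timeout": 5, "verify": False}

# wraps= keeps the real behaviour while recording every call on the mock.
with patch.object(client._session, "send", wraps=client._session.send) as spy:
    result = client.fetch("GET /items", request_kwargs)
    # ANY matches the positional request; the kwargs must match exactly.
    spy.assert_any_call(ANY, **request_kwargs)
```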


def test_stub_basic_read_http_stream_read_records(mocker):
stream = StubBasicReadHttpStream()
blank_response = {} # Sending a blank response is fine, as we ignore the response in `parse_response` anyway.