diff --git a/.github/workflows/publish-cdk-command.yml b/.github/workflows/publish-cdk-command.yml index ee12e965cb3c..7e9b33f3318b 100644 --- a/.github/workflows/publish-cdk-command.yml +++ b/.github/workflows/publish-cdk-command.yml @@ -27,7 +27,7 @@ jobs: - name: Checkout Airbyte uses: actions/checkout@v2 - name: Build CDK Package - run: ./gradlew --no-daemon :airbyte-cdk:python:build + run: ./gradlew --no-daemon --no-build-cache :airbyte-cdk:python:build - name: Add Failure Comment if: github.event.inputs.comment-id && !success() uses: peter-evans/create-or-update-comment@v1 diff --git a/airbyte-cdk/python/CHANGELOG.md b/airbyte-cdk/python/CHANGELOG.md index a8f04ed36fcb..fda59b2cd649 100644 --- a/airbyte-cdk/python/CHANGELOG.md +++ b/airbyte-cdk/python/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 0.1.5 +Allow specifying keyword arguments to be sent on a request made by an HTTP stream: https://github.com/airbytehq/airbyte/pull/4493 + ## 0.1.4 Allow to use Python 3.7.0: https://github.com/airbytehq/airbyte/pull/3566 diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py index e248a9fe262d..60aaa95e153f 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py @@ -118,6 +118,19 @@ def request_body_json( """ return None + def request_kwargs( + self, + stream_state: Mapping[str, Any], + stream_slice: Mapping[str, Any] = None, + next_page_token: Mapping[str, Any] = None, + ) -> Mapping[str, Any]: + """ + Override to return a mapping of keyword arguments to be used when creating the HTTP request. + Any option listed in https://docs.python-requests.org/en/latest/api/#requests.adapters.BaseAdapter.send for can be returned from + this method. Note that these options do not conflict with request-level options such as headers, request params, etc.. + """ + return {} + @abstractmethod def parse_response( self, @@ -166,13 +179,13 @@ def _create_prepared_request( # TODO support non-json bodies args["json"] = json - return requests.Request(**args).prepare() + return self._session.prepare_request(requests.Request(**args)) # TODO allow configuring these parameters. If we can get this into the requests library, then we can do it without the ugly exception hacks # see https://github.com/litl/backoff/pull/122 @default_backoff_handler(max_tries=5, factor=5) @user_defined_backoff_handler(max_tries=5) - def _send_request(self, request: requests.PreparedRequest) -> requests.Response: + def _send_request(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Any]) -> requests.Response: """ Wraps sending the request in rate limit and error handlers. @@ -190,9 +203,8 @@ def _send_request(self, request: requests.PreparedRequest) -> requests.Response: Unexpected transient exceptions use the default backoff parameters. Unexpected persistent exceptions are not handled and will cause the sync to fail. """ - response: requests.Response = self._session.send(request) + response: requests.Response = self._session.send(request, **request_kwargs) if self.should_retry(response): - custom_backoff_time = self.backoff_time(response) if custom_backoff_time: raise UserDefinedBackoffException(backoff=custom_backoff_time, request=request, response=response) @@ -224,8 +236,8 @@ def read_records( params=self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token), json=self.request_body_json(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token), ) - - response = self._send_request(request) + request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) + response = self._send_request(request, request_kwargs) yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice) next_page_token = self.next_page_token(response) diff --git a/airbyte-cdk/python/docs/concepts/http-streams.md b/airbyte-cdk/python/docs/concepts/http-streams.md index 9d15c2f72e1f..12fda0eca2cb 100644 --- a/airbyte-cdk/python/docs/concepts/http-streams.md +++ b/airbyte-cdk/python/docs/concepts/http-streams.md @@ -71,3 +71,8 @@ errors. It is not currently possible to specify a rate limit Airbyte should adhe ### Stream Slicing When implementing [stream slicing](incremental-stream.md#streamstream_slices) in an `HTTPStream` each Slice is equivalent to a HTTP request; the stream will make one request per element returned by the `stream_slices` function. The current slice being read is passed into every other method in `HttpStream` e.g: `request_params`, `request_headers`, `path`, etc.. to be injected into a request. This allows you to dynamically determine the output of the `request_params`, `path`, and other functions to read the input slice and return the appropriate value. + +### Network Adapter Keyword arguments +If you need to set any network-adapter keyword args on the outgoing HTTP requests such as `allow_redirects`, `stream`, `verify`, `cert`, etc.. +override the `request_kwargs` method. Any option listed in [BaseAdapter.send](https://docs.python-requests.org/en/latest/api/#requests.adapters.BaseAdapter.send) can +be returned as a keyword argument. diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index b45170db9095..3f193b944ad7 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -35,7 +35,7 @@ setup( name="airbyte-cdk", - version="0.1.4", + version="0.1.5", description="A framework for writing Airbyte Connectors.", long_description=README, long_description_content_type="text/markdown", @@ -73,14 +73,7 @@ "requests", ], python_requires=">=3.7.0", - extras_require={ - "dev": [ - "MyPy==0.812", - "pytest", - "pytest-cov", - "pytest-mock", - ] - }, + extras_require={"dev": ["MyPy==0.812", "pytest", "pytest-cov", "pytest-mock", "requests-mock"]}, entry_points={ "console_scripts": ["base-python=base_python.entrypoint:main"], }, diff --git a/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py b/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py index d70fa2b0d560..997157c2da08 100644 --- a/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py +++ b/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py @@ -24,6 +24,7 @@ from typing import Any, Iterable, Mapping, Optional +from unittest.mock import ANY import pytest import requests @@ -60,6 +61,18 @@ def parse_response( yield stubResp +def test_request_kwargs_used(mocker, requests_mock): + stream = StubBasicReadHttpStream() + request_kwargs = {"cert": None, "proxies": "google.com"} + mocker.patch.object(stream, "request_kwargs", return_value=request_kwargs) + mocker.patch.object(stream._session, "send", wraps=stream._session.send) + requests_mock.register_uri("GET", stream.url_base) + + list(stream.read_records(sync_mode=SyncMode.full_refresh)) + + stream._session.send.assert_any_call(ANY, **request_kwargs) + + def test_stub_basic_read_http_stream_read_records(mocker): stream = StubBasicReadHttpStream() blank_response = {} # Send a blank response is fine as we ignore the response in `parse_response anyway.