From 07faac075d7d35ce7778947fed16a252a3aa3ba8 Mon Sep 17 00:00:00 2001 From: antixar Date: Fri, 4 Feb 2022 11:09:17 +0200 Subject: [PATCH 1/6] restruct streams descriptions --- .../source_salesforce/api.py | 58 ++++++++++--------- .../source_salesforce/source.py | 34 ++++++----- .../source_salesforce/streams.py | 53 +++++++++++------ .../source_salesforce/utils.py | 2 +- .../source-salesforce/unit_tests/unit_test.py | 44 +++++++------- 5 files changed, 111 insertions(+), 80 deletions(-) diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/api.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/api.py index ad5300c29d00..f469a19cc292 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/api.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/api.py @@ -11,7 +11,7 @@ from .exceptions import TypeSalesforceException from .rate_limiting import default_backoff_handler -from .utils import filter_streams +from .utils import filter_streams_by_criteria STRING_TYPES = [ "byte", @@ -175,14 +175,14 @@ class Salesforce: version = "v52.0" def __init__( - self, - refresh_token: str = None, - token: str = None, - client_id: str = None, - client_secret: str = None, - is_sandbox: bool = None, - start_date: str = None, - **kwargs, + self, + refresh_token: str = None, + token: str = None, + client_id: str = None, + client_secret: str = None, + is_sandbox: bool = None, + start_date: str = None, + **kwargs, ): self.refresh_token = refresh_token self.token = token @@ -191,7 +191,9 @@ def __init__( self.access_token = None self.instance_url = None self.session = requests.Session() - self.is_sandbox = is_sandbox is True or (isinstance(is_sandbox, str) and is_sandbox.lower() == "true") + self.is_sandbox = is_sandbox in [True, "true"] + if self.is_sandbox: + self.logger.info("using SANDBOX of Salesforce") self.start_date = start_date def _get_standard_headers(self): @@ -206,34 +208,38 @@ def filter_streams(self, stream_name: str) -> bool: return False return True - def get_validated_streams(self, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog = None): - salesforce_objects = self.describe()["sobjects"] - stream_objects = [] - for stream_object in salesforce_objects: + def get_validated_streams(self, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog = None) -> Mapping[str, Any]: + """Selects all validated streams with additional filtering: + 1) skip all sobjects with negative value of the flag "queryable" + 2) user can set search criterias of necessary streams + 3) selection by catalog settings + """ + stream_objects = {} + for stream_object in self.describe()["sobjects"]: if stream_object["queryable"]: - stream_objects.append(stream_object) + stream_objects[stream_object.pop("name")] = stream_object else: self.logger.warn(f"Stream {stream_object['name']} is not queryable and will be ignored.") - stream_names = [stream_object["name"] for stream_object in stream_objects] if catalog: - return [configured_stream.stream.name for configured_stream in catalog.streams], stream_objects + return {configured_stream.stream.name: stream_objects[configured_stream.stream.name] for configured_stream in catalog.streams if + configured_stream.stream.name in stream_objects} + stream_names = list(stream_objects.keys()) if config.get("streams_criteria"): filtered_stream_list = [] for stream_criteria in config["streams_criteria"]: - filtered_stream_list += filter_streams( + filtered_stream_list += 
filter_streams_by_criteria( streams_list=stream_names, search_word=stream_criteria["value"], search_criteria=stream_criteria["criteria"] ) stream_names = list(set(filtered_stream_list)) validated_streams = [stream_name for stream_name in stream_names if self.filter_streams(stream_name)] - validated_stream_objects = [stream_object for stream_object in stream_objects if stream_object["name"] in validated_streams] - return validated_streams, validated_stream_objects + return {stream_name: sobject_options for stream_name, sobject_options in stream_objects.items() if stream_name in validated_streams} @default_backoff_handler(max_tries=5, factor=15) def _make_request( - self, http_method: str, url: str, headers: dict = None, body: dict = None, stream: bool = False, params: dict = None + self, http_method: str, url: str, headers: dict = None, body: dict = None, stream: bool = False, params: dict = None ) -> requests.models.Response: try: if http_method == "GET": @@ -261,7 +267,7 @@ def login(self): self.access_token = auth["access_token"] self.instance_url = auth["instance_url"] - def describe(self, sobject: str = None, stream_objects: List = None) -> Mapping[str, Any]: + def describe(self, sobject: str = None, sobject_options: Mapping[str, Any] = None) -> Mapping[str, Any]: """Describes all objects or a specific object""" headers = self._get_standard_headers() @@ -269,12 +275,12 @@ def describe(self, sobject: str = None, stream_objects: List = None) -> Mapping[ url = f"{self.instance_url}/services/data/{self.version}/{endpoint}" resp = self._make_request("GET", url, headers=headers) - if resp.status_code == 404: - self.logger.error(f"Filtered stream objects: {stream_objects}") + if resp.status_code == 404 and sobject: + self.logger.error(f"not found a description for the sobject '{sobject}'. 
Sobject options: {sobject_options}") return resp.json() - def generate_schema(self, stream_name: str = None, stream_objects: List = None) -> Mapping[str, Any]: - response = self.describe(stream_name, stream_objects) + def generate_schema(self, stream_name: str = None, stream_options: Mapping[str, Any] = None) -> Mapping[str, Any]: + response = self.describe(stream_name, stream_options) schema = {"$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "additionalProperties": True, "properties": {}} for field in response["fields"]: schema["properties"][field["name"]] = self.field_to_property_schema(field) diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py index e7edfb658f6c..a13d7218e1b1 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py @@ -14,7 +14,8 @@ from requests import codes, exceptions from .api import UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS, UNSUPPORTED_FILTERING_STREAMS, Salesforce -from .streams import BulkIncrementalSalesforceStream, BulkSalesforceStream, IncrementalSalesforceStream, SalesforceStream +from .streams import BulkIncrementalSalesforceStream, BulkSalesforceStream, IncrementalSalesforceStream, \ + SalesforceStream class SourceSalesforce(AbstractSource): @@ -37,21 +38,22 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> @classmethod def generate_streams( - cls, - config: Mapping[str, Any], - stream_names: List[str], - sf_object: Salesforce, - state: Mapping[str, Any] = None, - stream_objects: List = None, + cls, + config: Mapping[str, Any], + stream_objects: Mapping[str, Any], + sf_object: Salesforce, + state: Mapping[str, Any] = None, ) -> List[Stream]: """ "Generates a list of stream by their names. 
It can be used for different tests too""" authenticator = TokenAuthenticator(sf_object.access_token) streams = [] - for stream_name in stream_names: - streams_kwargs = {} + for stream_name, sobject_options in stream_objects.items(): + streams_kwargs = { + "sobject_options": sobject_options + } stream_state = state.get(stream_name, {}) if state else {} - selected_properties = sf_object.generate_schema(stream_name, stream_objects).get("properties", {}) + selected_properties = sf_object.generate_schema(stream_name, sobject_options).get("properties", {}) # Salesforce BULK API currently does not support loading fields with data type base64 and compound data properties_not_supported_by_bulk = { key: value for key, value in selected_properties.items() if value.get("format") == "base64" or "object" in value["type"] @@ -77,11 +79,17 @@ def generate_streams( def streams(self, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog = None, state: Mapping[str, Any] = None) -> List[Stream]: sf = self._get_sf_object(config) - stream_names, stream_objects = sf.get_validated_streams(config=config, catalog=catalog) - return self.generate_streams(config, stream_names, sf, state=state, stream_objects=stream_objects) + stream_objects = sf.get_validated_streams(config=config, catalog=catalog) + + stream_objects = {"CategoryNode": stream_objects.pop("CategoryNode")} + # for stream_name in stream_objects: + # if stream_name["name"] in stream_names: + # raise Exception(str(stream_name)) + return self.generate_streams(config, stream_objects, sf, state=state) def read( - self, logger: AirbyteLogger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None + self, logger: AirbyteLogger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, + state: MutableMapping[str, Any] = None ) -> Iterator[AirbyteMessage]: """ Overwritten to dynamically receive only those streams that are necessary for reading for significant speed gains diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py index dc10483aa099..7a8f48e302db 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py @@ -25,12 +25,14 @@ class SalesforceStream(HttpStream, ABC): page_size = 2000 transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization) - def __init__(self, sf_api: Salesforce, pk: str, stream_name: str, schema: dict = None, **kwargs): + def __init__(self, sf_api: Salesforce, pk: str, stream_name: str, sobject_options: Mapping[str, Any] = None, schema: dict = None, + **kwargs): super().__init__(**kwargs) self.sf_api = sf_api self.pk = pk self.stream_name = stream_name self.schema = schema + self.sobject_options = sobject_options @property def name(self) -> str: @@ -57,7 +59,7 @@ def next_page_token(self, response: requests.Response) -> str: return response_data.get("nextRecordsUrl") def request_params( - self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None ) -> MutableMapping[str, Any]: """ Salesforce SOQL Query: https://developer.salesforce.com/docs/atlas.en-us.232.0.api_rest.meta/api_rest/dome_queryall.htm @@ -106,7 +108,8 @@ def read_records(self, **kwargs) 
-> Iterable[Mapping[str, Any]]: class BulkSalesforceStream(SalesforceStream): - page_size = 30000 + # page_size = 30000 + page_size = 10 DEFAULT_WAIT_TIMEOUT_MINS = 10 MAX_CHECK_INTERVAL_SECONDS = 2.0 MAX_RETRY_NUMBER = 3 @@ -135,7 +138,7 @@ def _send_http_request(self, method: str, url: str, json: dict = None): headers = self.authenticator.get_auth_header() response = self._session.request(method, url=url, headers=headers, json=json) if response.status_code not in [200, 204]: - self.logger.error(f"error body: {response.text}") + self.logger.error(f"error body: {response.text}, sobject options: {self.sobject_options}") response.raise_for_status() return response @@ -144,11 +147,15 @@ def create_stream_job(self, query: str, url: str) -> Optional[str]: docs: https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/create_job.htm """ json = {"operation": "queryAll", "query": query, "contentType": "CSV", "columnDelimiter": "COMMA", "lineEnding": "LF"} - + self.logger.warning(str(json)) + # json = {'operation': 'queryAll', + # 'query': 'SELECT Id,ParentId,MasterLabel,SortOrder,SortStyle,CreatedDate,CreatedById,LastModifiedDate,LastModifiedById,SystemModstamp FROM CategoryNode ORDER BY SystemModstamp ASC LIMIT 10', + # 'contentType': 'CSV', 'columnDelimiter': 'COMMA', 'lineEnding': 'LF'} try: response = self._send_http_request("POST", url, json=json) job_id = response.json()["id"] - self.logger.info(f"Created Job: {job_id} to sync {self.name}") + # self.logger.info(f"Created Job: {job_id} to sync {self.name}") + self.logger.warning(f"Created Job: {job_id} to sync {self.name}, {json}") return job_id except exceptions.HTTPError as error: if error.response.status_code in [codes.FORBIDDEN, codes.BAD_REQUEST]: @@ -168,13 +175,16 @@ def create_stream_job(self, query: str, url: str) -> Optional[str]: error_code = error_data.get("errorCode") error_message = error_data.get("message", "") if error_message == "Selecting compound data not supported in Bulk Query" or ( - error_code == "INVALIDENTITY" and "is not supported by the Bulk API" in error_message + error_code == "INVALIDENTITY" and "is not supported by the Bulk API" in error_message ): - self.logger.error(f"Cannot receive data for stream '{self.name}' using BULK API, error message: '{error_message}'") + self.logger.error(f"Cannot receive data for stream '{self.name}' using BULK API, " + f"sobject options: {self.sobject_options}, error message: '{error_message}'") elif error.response.status_code == codes.FORBIDDEN and error_code != "REQUEST_LIMIT_EXCEEDED": - self.logger.error(f"Cannot receive data for stream '{self.name}', error message: '{error_message}'") + self.logger.error(f"Cannot receive data for stream '{self.name}' ," + f"sobject options: {self.sobject_options}, error message: '{error_message}'") elif error.response.status_code == codes.BAD_REQUEST and error_message.endswith("does not support query"): - self.logger.error(f"The stream '{self.name}' is not queryable, error message: '{error_message}'") + self.logger.error(f"The stream '{self.name}' is not queryable, " + f"sobject options: {self.sobject_options}, error message: '{error_message}'") else: raise error else: @@ -197,7 +207,12 @@ def wait_for_job(self, url: str) -> str: if job_status in ["JobComplete", "Aborted", "Failed"]: if job_status != "JobComplete": # this is only job metadata without payload - self.logger.error(f"JobStatus: {job_status}, full job response: {job_info}") + error_message = job_info.get("errorMessage") + if not error_message: + # not all failed 
response can have "errorMessage" and we need to print full response body + error_message = job_info + self.logger.error(f"JobStatus: {job_status}, " + f"sobject options: {self.sobject_options}, error message: '{error_message}'") return job_status @@ -263,7 +278,7 @@ def next_page_token(self, last_record: dict) -> str: return f"WHERE {self.primary_key} >= '{last_record[self.primary_key]}' " def request_params( - self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None ) -> MutableMapping[str, Any]: """ Salesforce SOQL Query: https://developer.salesforce.com/docs/atlas.en-us.232.0.api_rest.meta/api_rest/dome_queryall.htm @@ -280,11 +295,11 @@ def request_params( return {"q": query} def read_records( - self, - sync_mode: SyncMode, - cursor_field: List[str] = None, - stream_slice: Mapping[str, Any] = None, - stream_state: Mapping[str, Any] = None, + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_slice: Mapping[str, Any] = None, + stream_state: Mapping[str, Any] = None, ) -> Iterable[Mapping[str, Any]]: stream_state = stream_state or {} next_page_token = None @@ -326,7 +341,7 @@ def format_start_date(start_date: Optional[str]) -> Optional[str]: return pendulum.parse(start_date).strftime("%Y-%m-%dT%H:%M:%SZ") def request_params( - self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None ) -> MutableMapping[str, Any]: if next_page_token: """ @@ -367,7 +382,7 @@ def next_page_token(self, last_record: dict) -> str: return last_record[self.cursor_field] def request_params( - self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None ) -> MutableMapping[str, Any]: selected_properties = self.get_json_schema().get("properties", {}) diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/utils.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/utils.py index eacf33298a24..ec73eaceb0ad 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/utils.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/utils.py @@ -3,7 +3,7 @@ # -def filter_streams(streams_list: list, search_word: str, search_criteria: str): +def filter_streams_by_criteria(streams_list: list, search_word: str, search_criteria: str): search_word = search_word.lower() criteria_mapping = { "starts with": lambda stream_name: stream_name.startswith(search_word), diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py index b70b9ddbf130..ce89342a9565 100644 --- a/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py +++ b/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py @@ -12,9 +12,11 @@ from airbyte_cdk.logger import AirbyteLogger from airbyte_cdk.models import ConfiguredAirbyteCatalog, SyncMode, Type from requests.exceptions import HTTPError + from source_salesforce.api import Salesforce from source_salesforce.source import 
SourceSalesforce -from source_salesforce.streams import BulkIncrementalSalesforceStream, BulkSalesforceStream, IncrementalSalesforceStream, SalesforceStream +from source_salesforce.streams import BulkIncrementalSalesforceStream, BulkSalesforceStream, \ + IncrementalSalesforceStream, SalesforceStream @pytest.fixture(scope="module") @@ -93,7 +95,7 @@ def stream_api_v2(stream_config): def _generate_stream(stream_name, stream_config, stream_api, state=None): - return SourceSalesforce.generate_streams(stream_config, [stream_name], stream_api, state=state)[0] + return SourceSalesforce.generate_streams(stream_config, {stream_name: None}, stream_api, state=state)[0] def test_bulk_sync_creation_failed(stream_config, stream_api): @@ -148,7 +150,7 @@ def test_stream_has_no_state_bulk_api_should_be_used(stream_config, stream_api): def test_bulk_sync_pagination(item_number, stream_config, stream_api): stream: BulkIncrementalSalesforceStream = _generate_stream("Account", stream_config, stream_api) test_ids = [i for i in range(1, item_number)] - pages = [test_ids[i : i + stream.page_size] for i in range(0, len(test_ids), stream.page_size)] + pages = [test_ids[i: i + stream.page_size] for i in range(0, len(test_ids), stream.page_size)] if not pages: pages = [[]] with requests_mock.Mocker() as m: @@ -244,12 +246,12 @@ def test_bulk_sync_failed_retry(stream_config, stream_api): ], ) def test_stream_start_date( - start_date_provided, - stream_name, - expected_start_date, - stream_config, - stream_api, - stream_config_without_start_date, + start_date_provided, + stream_name, + expected_start_date, + stream_config, + stream_api, + stream_config_without_start_date, ): if start_date_provided: stream = _generate_stream(stream_name, stream_config, stream_api) @@ -288,25 +290,25 @@ def test_download_data_filter_null_bytes(stream_config, stream_api): [ ([{"criteria": "exacts", "value": "Account"}], ["Account"]), ( - [{"criteria": "not exacts", "value": "CustomStreamHistory"}], - ["Account", "AIApplications", "Leads", "LeadHistory", "Orders", "OrderHistory", "CustomStream"], + [{"criteria": "not exacts", "value": "CustomStreamHistory"}], + ["Account", "AIApplications", "Leads", "LeadHistory", "Orders", "OrderHistory", "CustomStream"], ), ([{"criteria": "starts with", "value": "lead"}], ["Leads", "LeadHistory"]), ( - [{"criteria": "starts not with", "value": "custom"}], - ["Account", "AIApplications", "Leads", "LeadHistory", "Orders", "OrderHistory"], + [{"criteria": "starts not with", "value": "custom"}], + ["Account", "AIApplications", "Leads", "LeadHistory", "Orders", "OrderHistory"], ), ([{"criteria": "ends with", "value": "story"}], ["LeadHistory", "OrderHistory", "CustomStreamHistory"]), ([{"criteria": "ends not with", "value": "s"}], ["Account", "LeadHistory", "OrderHistory", "CustomStream", "CustomStreamHistory"]), ([{"criteria": "contains", "value": "applicat"}], ["AIApplications"]), ([{"criteria": "contains", "value": "hist"}], ["LeadHistory", "OrderHistory", "CustomStreamHistory"]), ( - [{"criteria": "not contains", "value": "stream"}], - ["Account", "AIApplications", "Leads", "LeadHistory", "Orders", "OrderHistory"], + [{"criteria": "not contains", "value": "stream"}], + ["Account", "AIApplications", "Leads", "LeadHistory", "Orders", "OrderHistory"], ), ( - [{"criteria": "not contains", "value": "Account"}], - ["AIApplications", "Leads", "LeadHistory", "Orders", "OrderHistory", "CustomStream", "CustomStreamHistory"], + [{"criteria": "not contains", "value": "Account"}], + ["AIApplications", "Leads", 
"LeadHistory", "Orders", "OrderHistory", "CustomStream", "CustomStreamHistory"], ), ], ) @@ -330,8 +332,8 @@ def test_discover_with_streams_criteria_param(streams_criteria, predicted_filter ] } ) - filtered_streams, _ = sf_object.get_validated_streams(config=updated_config) - assert sorted(filtered_streams) == sorted(predicted_filtered_streams) + filtered_streams = sf_object.get_validated_streams(config=updated_config) + assert sorted(filtered_streams.keys()) == sorted(predicted_filtered_streams) def test_check_connection_rate_limit(stream_config): @@ -492,8 +494,8 @@ def test_discover_only_queryable(stream_config): ] } ) - filtered_streams, _ = sf_object.get_validated_streams(config=stream_config) - assert filtered_streams == ["Account"] + filtered_streams = sf_object.get_validated_streams(config=stream_config) + assert list(filtered_streams.keys()) == ["Account"] def test_pagination_rest(stream_config, stream_api): From 63c4e029de22e73283ab3f75d9d9067b269ae10e Mon Sep 17 00:00:00 2001 From: antixar Date: Fri, 4 Feb 2022 12:44:23 +0200 Subject: [PATCH 2/6] save state --- .../source_salesforce/streams.py | 3 +-- .../source-salesforce/unit_tests/unit_test.py | 17 +++++++++++++++++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py index 7a8f48e302db..d0c35626bdd6 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py @@ -108,8 +108,7 @@ def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]: class BulkSalesforceStream(SalesforceStream): - # page_size = 30000 - page_size = 10 + page_size = 30000 DEFAULT_WAIT_TIMEOUT_MINS = 10 MAX_CHECK_INTERVAL_SECONDS = 2.0 MAX_RETRY_NUMBER = 3 diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py index ce89342a9565..b93a47dec733 100644 --- a/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py +++ b/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py @@ -564,3 +564,20 @@ def test_csv_reader_dialect_unix(): m.register_uri("GET", url + "/results", text=text) result = [dict(i[1]) for i in stream.download_data(url)] assert result == data + + +def test_forwarding_sobject_options(stream_config): + sf_object = Salesforce(**stream_config) + sf_object.login = Mock() + sf_object.access_token = Mock() + sf_object.instance_url = "https://fase-account.salesforce.com" + sf_object.describe = Mock( + return_value={ + "sobjects": [ + {"name": "Account", "queryable": True}, + {"name": "Leads", "queryable": False}, + ] + } + ) + filtered_streams = sf_object.get_validated_streams(config=stream_config) + assert list(filtered_streams.keys()) == ["Account"] From 532420132f59e00339ba5cebe2d41492896405f6 Mon Sep 17 00:00:00 2001 From: antixar Date: Fri, 4 Feb 2022 14:16:05 +0200 Subject: [PATCH 3/6] add test checking sobject_options --- .../source-salesforce/unit_tests/unit_test.py | 68 +++++++++++++++---- 1 file changed, 54 insertions(+), 14 deletions(-) diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py index b93a47dec733..ea56c2cd8187 100644 --- a/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py 
+++ b/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py @@ -5,12 +5,14 @@ import csv import io import json +import re from unittest.mock import Mock import pytest import requests_mock from airbyte_cdk.logger import AirbyteLogger from airbyte_cdk.models import ConfiguredAirbyteCatalog, SyncMode, Type +from airbyte_cdk.models import ConfiguredAirbyteStream, AirbyteStream, DestinationSyncMode from requests.exceptions import HTTPError from source_salesforce.api import Salesforce @@ -566,18 +568,56 @@ def test_csv_reader_dialect_unix(): assert result == data -def test_forwarding_sobject_options(stream_config): - sf_object = Salesforce(**stream_config) - sf_object.login = Mock() - sf_object.access_token = Mock() - sf_object.instance_url = "https://fase-account.salesforce.com" - sf_object.describe = Mock( - return_value={ - "sobjects": [ - {"name": "Account", "queryable": True}, - {"name": "Leads", "queryable": False}, - ] - } +@pytest.mark.order(1) +@pytest.mark.parametrize( + "stream_names,catalog_stream_names,", ( + (["stream_1", "stream_2"], None,), + (["stream_1", "stream_2"], ["stream_1", "stream_2"],), + (["stream_1", "stream_2", "stream_3"], ["stream_1"],), + ) - filtered_streams = sf_object.get_validated_streams(config=stream_config) - assert list(filtered_streams.keys()) == ["Account"] +) +def test_forwarding_sobject_options(stream_config, stream_names, catalog_stream_names) -> None: + sobjects_matcher = re.compile('/sobjects$') + token_matcher = re.compile('/token$') + describe_matcher = re.compile('/describe$') + catalog = None + if catalog_stream_names: + catalog = ConfiguredAirbyteCatalog( + streams=[ + ConfiguredAirbyteStream( + stream=AirbyteStream(name=catalog_stream_name, json_schema={"type": "object"}), + sync_mode=SyncMode.full_refresh, + destination_sync_mode=DestinationSyncMode.overwrite, + ) for catalog_stream_name in catalog_stream_names] + ) + with requests_mock.Mocker() as m: + m.register_uri("POST", token_matcher, json={ + "instance_url": "https://fake-url.com", + "access_token": "fake-token" + }) + m.register_uri("GET", describe_matcher, json={ + "fields": [ + { + "name": "field", + "type": "string", + } + ] + }) + m.register_uri("GET", sobjects_matcher, json={ + "sobjects": [ + { + "name": stream_name, + "flag1": True, + "queryable": True, + } for stream_name in stream_names + ], + + }) + streams = SourceSalesforce().streams(config=stream_config, catalog=catalog) + expected_names = catalog_stream_names if catalog else stream_names + assert not set(expected_names).symmetric_difference(set(stream.name for stream in streams)), "doesn't match excepted streams" + + for stream in streams: + assert stream.sobject_options == {"flag1": True, "queryable": True} + return From 0d9f96e37d3908e92afcc79d619547ed05020197 Mon Sep 17 00:00:00 2001 From: antixar Date: Mon, 7 Feb 2022 12:21:16 +0200 Subject: [PATCH 4/6] add switching to non-bulk logic --- .../acceptance-test-config.yml | 3 + .../source_salesforce/api.py | 31 +++-- .../source_salesforce/source.py | 27 ++-- .../source_salesforce/spec.json | 44 +++--- .../source_salesforce/streams.py | 107 +++++++++------ .../source-salesforce/unit_tests/unit_test.py | 126 ++++++++++-------- 6 files changed, 194 insertions(+), 144 deletions(-) diff --git a/airbyte-integrations/connectors/source-salesforce/acceptance-test-config.yml b/airbyte-integrations/connectors/source-salesforce/acceptance-test-config.yml index 6d379470c03e..836c822330c1 100644 --- 
a/airbyte-integrations/connectors/source-salesforce/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-salesforce/acceptance-test-config.yml @@ -7,8 +7,11 @@ tests: connection: - config_path: "secrets/config.json" status: "succeed" + - config_path: "secrets/config_sandbox.json" + status: "succeed" - config_path: "integration_tests/invalid_config.json" status: "failed" + discovery: - config_path: "secrets/config.json" basic_read: diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/api.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/api.py index f469a19cc292..b6b53f42e8f6 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/api.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/api.py @@ -175,14 +175,14 @@ class Salesforce: version = "v52.0" def __init__( - self, - refresh_token: str = None, - token: str = None, - client_id: str = None, - client_secret: str = None, - is_sandbox: bool = None, - start_date: str = None, - **kwargs, + self, + refresh_token: str = None, + token: str = None, + client_id: str = None, + client_secret: str = None, + is_sandbox: bool = None, + start_date: str = None, + **kwargs, ): self.refresh_token = refresh_token self.token = token @@ -210,9 +210,9 @@ def filter_streams(self, stream_name: str) -> bool: def get_validated_streams(self, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog = None) -> Mapping[str, Any]: """Selects all validated streams with additional filtering: - 1) skip all sobjects with negative value of the flag "queryable" - 2) user can set search criterias of necessary streams - 3) selection by catalog settings + 1) skip all sobjects with negative value of the flag "queryable" + 2) user can set search criterias of necessary streams + 3) selection by catalog settings """ stream_objects = {} for stream_object in self.describe()["sobjects"]: @@ -222,8 +222,11 @@ def get_validated_streams(self, config: Mapping[str, Any], catalog: ConfiguredAi self.logger.warn(f"Stream {stream_object['name']} is not queryable and will be ignored.") if catalog: - return {configured_stream.stream.name: stream_objects[configured_stream.stream.name] for configured_stream in catalog.streams if - configured_stream.stream.name in stream_objects} + return { + configured_stream.stream.name: stream_objects[configured_stream.stream.name] + for configured_stream in catalog.streams + if configured_stream.stream.name in stream_objects + } stream_names = list(stream_objects.keys()) if config.get("streams_criteria"): @@ -239,7 +242,7 @@ def get_validated_streams(self, config: Mapping[str, Any], catalog: ConfiguredAi @default_backoff_handler(max_tries=5, factor=15) def _make_request( - self, http_method: str, url: str, headers: dict = None, body: dict = None, stream: bool = False, params: dict = None + self, http_method: str, url: str, headers: dict = None, body: dict = None, stream: bool = False, params: dict = None ) -> requests.models.Response: try: if http_method == "GET": diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py index a13d7218e1b1..14ea8c12221f 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py @@ -14,8 +14,7 @@ from requests import codes, exceptions from .api import 
UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS, UNSUPPORTED_FILTERING_STREAMS, Salesforce -from .streams import BulkIncrementalSalesforceStream, BulkSalesforceStream, IncrementalSalesforceStream, \ - SalesforceStream +from .streams import BulkIncrementalSalesforceStream, BulkSalesforceStream, IncrementalSalesforceStream, SalesforceStream class SourceSalesforce(AbstractSource): @@ -38,19 +37,17 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> @classmethod def generate_streams( - cls, - config: Mapping[str, Any], - stream_objects: Mapping[str, Any], - sf_object: Salesforce, - state: Mapping[str, Any] = None, + cls, + config: Mapping[str, Any], + stream_objects: Mapping[str, Any], + sf_object: Salesforce, + state: Mapping[str, Any] = None, ) -> List[Stream]: """ "Generates a list of stream by their names. It can be used for different tests too""" authenticator = TokenAuthenticator(sf_object.access_token) streams = [] for stream_name, sobject_options in stream_objects.items(): - streams_kwargs = { - "sobject_options": sobject_options - } + streams_kwargs = {"sobject_options": sobject_options} stream_state = state.get(stream_name, {}) if state else {} selected_properties = sf_object.generate_schema(stream_name, sobject_options).get("properties", {}) @@ -65,7 +62,6 @@ def generate_streams( else: # Use BULK API full_refresh, incremental = BulkSalesforceStream, BulkIncrementalSalesforceStream - streams_kwargs["wait_timeout"] = config.get("wait_timeout") json_schema = sf_object.generate_schema(stream_name, stream_objects) pk, replication_key = sf_object.get_pk_and_replication_key(json_schema) @@ -80,16 +76,11 @@ def generate_streams( def streams(self, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog = None, state: Mapping[str, Any] = None) -> List[Stream]: sf = self._get_sf_object(config) stream_objects = sf.get_validated_streams(config=config, catalog=catalog) - - stream_objects = {"CategoryNode": stream_objects.pop("CategoryNode")} - # for stream_name in stream_objects: - # if stream_name["name"] in stream_names: - # raise Exception(str(stream_name)) + # stream_objects = {"CategoryNode": stream_objects.pop("CategoryNode")} return self.generate_streams(config, stream_objects, sf, state=state) def read( - self, logger: AirbyteLogger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, - state: MutableMapping[str, Any] = None + self, logger: AirbyteLogger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None ) -> Iterator[AirbyteMessage]: """ Overwritten to dynamically receive only those streams that are necessary for reading for significant speed gains diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.json b/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.json index f9d68a3c39a3..18147ab260b8 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.json +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.json @@ -4,7 +4,11 @@ "$schema": "http://json-schema.org/draft-07/schema#", "title": "Salesforce Source Spec", "type": "object", - "required": ["client_id", "client_secret", "refresh_token"], + "required": [ + "client_id", + "client_secret", + "refresh_token" + ], "additionalProperties": true, "properties": { "auth_type": { @@ -33,7 +37,10 @@ "description": "Date in the format 2017-01-25. Any data before this date will not be replicated. 
This field uses the \"updated\" field if available, otherwise the \"created\" fields if they are available for a stream. If not set, then by default all your data is replicated.", "type": "string", "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z|[0-9]{4}-[0-9]{2}-[0-9]{2}$", - "examples": ["2021-07-25", "2021-07-25T00:00:00Z"] + "examples": [ + "2021-07-25", + "2021-07-25T00:00:00Z" + ] }, "is_sandbox": { "title": "Sandbox", @@ -45,7 +52,10 @@ "type": "array", "items": { "type": "object", - "required": ["criteria", "value"], + "required": [ + "criteria", + "value" + ], "properties": { "criteria": { "type": "string", @@ -70,20 +80,14 @@ }, "title": "Streams filter criteria", "description": "Add selection criteria for streams to get only streams that are relevant to you" - }, - "wait_timeout": { - "title": "Response Waiting Time", - "description": "Maximum wait time of Salesforce responses in minutes. This option is used for the BULK mode only. The default wait time of the Parent Batch in the Bulk Mode to wait for all the batches to finish processing is 20 minutes.", - "type": "integer", - "minimum": 5, - "maximum": 60, - "default": 10 } } }, "advanced_auth": { "auth_flow_type": "oauth2.0", - "predicate_key": ["auth_type"], + "predicate_key": [ + "auth_type" + ], "predicate_value": "Client", "oauth_config_specification": { "oauth_user_input_from_connector_config_specification": { @@ -92,7 +96,9 @@ "properties": { "is_sandbox": { "type": "boolean", - "path_in_connector_config": ["is_sandbox"] + "path_in_connector_config": [ + "is_sandbox" + ] } } }, @@ -102,7 +108,9 @@ "properties": { "refresh_token": { "type": "string", - "path_in_connector_config": ["refresh_token"] + "path_in_connector_config": [ + "refresh_token" + ] } } }, @@ -124,11 +132,15 @@ "properties": { "client_id": { "type": "string", - "path_in_connector_config": ["client_id"] + "path_in_connector_config": [ + "client_id" + ] }, "client_secret": { "type": "string", - "path_in_connector_config": ["client_secret"] + "path_in_connector_config": [ + "client_secret" + ] } } } diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py index d0c35626bdd6..13980a5ba264 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py @@ -25,8 +25,9 @@ class SalesforceStream(HttpStream, ABC): page_size = 2000 transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization) - def __init__(self, sf_api: Salesforce, pk: str, stream_name: str, sobject_options: Mapping[str, Any] = None, schema: dict = None, - **kwargs): + def __init__( + self, sf_api: Salesforce, pk: str, stream_name: str, sobject_options: Mapping[str, Any] = None, schema: dict = None, **kwargs + ): super().__init__(**kwargs) self.sf_api = sf_api self.pk = pk @@ -59,7 +60,7 @@ def next_page_token(self, response: requests.Response) -> str: return response_data.get("nextRecordsUrl") def request_params( - self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None ) -> MutableMapping[str, Any]: """ Salesforce SOQL Query: https://developer.salesforce.com/docs/atlas.en-us.232.0.api_rest.meta/api_rest/dome_queryall.htm @@ -109,14 +110,10 @@ def 
read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]: class BulkSalesforceStream(SalesforceStream): page_size = 30000 - DEFAULT_WAIT_TIMEOUT_MINS = 10 + DEFAULT_WAIT_TIMEOUT_SECONDS = 600 MAX_CHECK_INTERVAL_SECONDS = 2.0 MAX_RETRY_NUMBER = 3 - def __init__(self, wait_timeout: Optional[int], **kwargs): - super().__init__(**kwargs) - self._wait_timeout = wait_timeout or self.DEFAULT_WAIT_TIMEOUT_MINS - def path(self, **kwargs) -> str: return f"/services/data/{self.sf_api.version}/jobs/query" @@ -146,15 +143,9 @@ def create_stream_job(self, query: str, url: str) -> Optional[str]: docs: https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/create_job.htm """ json = {"operation": "queryAll", "query": query, "contentType": "CSV", "columnDelimiter": "COMMA", "lineEnding": "LF"} - self.logger.warning(str(json)) - # json = {'operation': 'queryAll', - # 'query': 'SELECT Id,ParentId,MasterLabel,SortOrder,SortStyle,CreatedDate,CreatedById,LastModifiedDate,LastModifiedById,SystemModstamp FROM CategoryNode ORDER BY SystemModstamp ASC LIMIT 10', - # 'contentType': 'CSV', 'columnDelimiter': 'COMMA', 'lineEnding': 'LF'} try: response = self._send_http_request("POST", url, json=json) job_id = response.json()["id"] - # self.logger.info(f"Created Job: {job_id} to sync {self.name}") - self.logger.warning(f"Created Job: {job_id} to sync {self.name}, {json}") return job_id except exceptions.HTTPError as error: if error.response.status_code in [codes.FORBIDDEN, codes.BAD_REQUEST]: @@ -174,16 +165,22 @@ def create_stream_job(self, query: str, url: str) -> Optional[str]: error_code = error_data.get("errorCode") error_message = error_data.get("message", "") if error_message == "Selecting compound data not supported in Bulk Query" or ( - error_code == "INVALIDENTITY" and "is not supported by the Bulk API" in error_message + error_code == "INVALIDENTITY" and "is not supported by the Bulk API" in error_message ): - self.logger.error(f"Cannot receive data for stream '{self.name}' using BULK API, " - f"sobject options: {self.sobject_options}, error message: '{error_message}'") + self.logger.error( + f"Cannot receive data for stream '{self.name}' using BULK API, " + f"sobject options: {self.sobject_options}, error message: '{error_message}'" + ) elif error.response.status_code == codes.FORBIDDEN and error_code != "REQUEST_LIMIT_EXCEEDED": - self.logger.error(f"Cannot receive data for stream '{self.name}' ," - f"sobject options: {self.sobject_options}, error message: '{error_message}'") + self.logger.error( + f"Cannot receive data for stream '{self.name}' ," + f"sobject options: {self.sobject_options}, error message: '{error_message}'" + ) elif error.response.status_code == codes.BAD_REQUEST and error_message.endswith("does not support query"): - self.logger.error(f"The stream '{self.name}' is not queryable, " - f"sobject options: {self.sobject_options}, error message: '{error_message}'") + self.logger.error( + f"The stream '{self.name}' is not queryable, " + f"sobject options: {self.sobject_options}, error message: '{error_message}'" + ) else: raise error else: @@ -191,8 +188,7 @@ def create_stream_job(self, query: str, url: str) -> Optional[str]: return None def wait_for_job(self, url: str) -> str: - # using "seconds" argument because self._wait_timeout can be changed by tests - expiration_time: DateTime = pendulum.now().add(seconds=int(self._wait_timeout * 60.0)) + expiration_time: DateTime = pendulum.now().add(seconds=self.DEFAULT_WAIT_TIMEOUT_SECONDS) job_status = "InProgress" 
delay_timeout = 0 delay_cnt = 0 @@ -210,8 +206,9 @@ def wait_for_job(self, url: str) -> str: if not error_message: # not all failed response can have "errorMessage" and we need to print full response body error_message = job_info - self.logger.error(f"JobStatus: {job_status}, " - f"sobject options: {self.sobject_options}, error message: '{error_message}'") + self.logger.error( + f"JobStatus: {job_status}, " f"sobject options: {self.sobject_options}, error message: '{error_message}'" + ) return job_status @@ -225,15 +222,15 @@ def wait_for_job(self, url: str) -> str: f"Sleeping {delay_timeout} seconds while waiting for Job: {self.name}/{job_id}" f" to complete. Current state: {job_status}" ) - self.logger.warning(f"Not wait the {self.name} data for {self._wait_timeout} minutes, data: {job_info}!!") + self.logger.warning(f"Not wait the {self.name} data for {self.DEFAULT_WAIT_TIMEOUT_SECONDS} seconds, data: {job_info}!!") return job_status - def execute_job(self, query: str, url: str) -> str: + def execute_job(self, query: str, url: str) -> Tuple[Optional[str], Optional[str]]: job_status = "Failed" for i in range(0, self.MAX_RETRY_NUMBER): job_id = self.create_stream_job(query=query, url=url) if not job_id: - return None + return None, None job_full_url = f"{url}/{job_id}" job_status = self.wait_for_job(url=job_full_url) if job_status not in ["UploadComplete", "InProgress"]: @@ -244,8 +241,8 @@ def execute_job(self, query: str, url: str) -> str: if job_status in ["Aborted", "Failed"]: self.delete_job(url=job_full_url) - raise Exception(f"Job for {self.name} stream using BULK API was failed.") - return job_full_url + return None, job_status + return job_full_url, job_status def filter_null_bytes(self, s: str): """ @@ -277,7 +274,7 @@ def next_page_token(self, last_record: dict) -> str: return f"WHERE {self.primary_key} >= '{last_record[self.primary_key]}' " def request_params( - self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None ) -> MutableMapping[str, Any]: """ Salesforce SOQL Query: https://developer.salesforce.com/docs/atlas.en-us.232.0.api_rest.meta/api_rest/dome_queryall.htm @@ -294,11 +291,11 @@ def request_params( return {"q": query} def read_records( - self, - sync_mode: SyncMode, - cursor_field: List[str] = None, - stream_slice: Mapping[str, Any] = None, - stream_state: Mapping[str, Any] = None, + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_slice: Mapping[str, Any] = None, + stream_state: Mapping[str, Any] = None, ) -> Iterable[Mapping[str, Any]]: stream_state = stream_state or {} next_page_token = None @@ -306,9 +303,20 @@ def read_records( while True: params = self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) path = self.path(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) - job_full_url = self.execute_job(query=params["q"], url=f"{self.url_base}{path}") + job_full_url, job_status = self.execute_job(query=params["q"], url=f"{self.url_base}{path}") if not job_full_url: - return + if job_status == "Failed": + # As rule as BULK logic returns unhandled error. For instance: + # error message: 'Unexpected exception encountered in query processing. 
+ # Please contact support with the following id: 326566388-63578 (-436445966)'" + # Thus we can try to switch to GET sync request because its response returns obvious error message + standard_instance = self.get_standard_instance() + self.logger.info("switch to STANDARD(non-BULK) sync. Because the SalesForce BULK job has returned a failed status") + yield from standard_instance.read_records( + sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state + ) + return + raise Exception(f"Job for {self.name} stream using BULK API was failed.") count = 0 for count, record in self.download_data(url=job_full_url): @@ -324,6 +332,25 @@ def read_records( # not found a next page data. break + def get_standard_instance(self) -> SalesforceStream: + """Returns a instance of standard logic(non-BULK) with same settings""" + stream_kwargs = dict( + sf_api=self.sf_api, + pk=self.pk, + stream_name=self.stream_name, + schema=self.schema, + sobject_options=self.sobject_options, + authenticator=self.authenticator, + ) + + if isinstance(self, BulkIncrementalSalesforceStream): + stream_kwargs.update({"replication_key": self.replication_key, "start_date": self.start_date}) + new_cls = IncrementalSalesforceStream + else: + new_cls = SalesforceStream + + return new_cls(**stream_kwargs) + class IncrementalSalesforceStream(SalesforceStream, ABC): state_checkpoint_interval = 500 @@ -340,7 +367,7 @@ def format_start_date(start_date: Optional[str]) -> Optional[str]: return pendulum.parse(start_date).strftime("%Y-%m-%dT%H:%M:%SZ") def request_params( - self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None ) -> MutableMapping[str, Any]: if next_page_token: """ @@ -381,7 +408,7 @@ def next_page_token(self, last_record: dict) -> str: return last_record[self.cursor_field] def request_params( - self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None ) -> MutableMapping[str, Any]: selected_properties = self.get_json_schema().get("properties", {}) diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py index ea56c2cd8187..9457d455f858 100644 --- a/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py +++ b/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py @@ -11,14 +11,11 @@ import pytest import requests_mock from airbyte_cdk.logger import AirbyteLogger -from airbyte_cdk.models import ConfiguredAirbyteCatalog, SyncMode, Type -from airbyte_cdk.models import ConfiguredAirbyteStream, AirbyteStream, DestinationSyncMode +from airbyte_cdk.models import AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, DestinationSyncMode, SyncMode, Type from requests.exceptions import HTTPError - from source_salesforce.api import Salesforce from source_salesforce.source import SourceSalesforce -from source_salesforce.streams import BulkIncrementalSalesforceStream, BulkSalesforceStream, \ - IncrementalSalesforceStream, SalesforceStream +from source_salesforce.streams import BulkIncrementalSalesforceStream, BulkSalesforceStream, IncrementalSalesforceStream, SalesforceStream 
@pytest.fixture(scope="module") @@ -152,7 +149,7 @@ def test_stream_has_no_state_bulk_api_should_be_used(stream_config, stream_api): def test_bulk_sync_pagination(item_number, stream_config, stream_api): stream: BulkIncrementalSalesforceStream = _generate_stream("Account", stream_config, stream_api) test_ids = [i for i in range(1, item_number)] - pages = [test_ids[i: i + stream.page_size] for i in range(0, len(test_ids), stream.page_size)] + pages = [test_ids[i : i + stream.page_size] for i in range(0, len(test_ids), stream.page_size)] if not pages: pages = [[]] with requests_mock.Mocker() as m: @@ -215,7 +212,8 @@ def test_bulk_sync_successful_long_response(stream_config, stream_api): @pytest.mark.timeout(17) def test_bulk_sync_successful_retry(stream_config, stream_api): stream: BulkIncrementalSalesforceStream = _generate_stream("Account", stream_config, stream_api) - stream._wait_timeout = 0.1 # maximum wait timeout will be 6 seconds + stream.DEFAULT_WAIT_TIMEOUT_SECONDS = 6 # maximum wait timeout will be 6 seconds + with requests_mock.Mocker() as m: job_id = _prepare_mock(m, stream) # 2 failed attempts, 3rd one should be successful @@ -229,7 +227,7 @@ def test_bulk_sync_successful_retry(stream_config, stream_api): @pytest.mark.timeout(30) def test_bulk_sync_failed_retry(stream_config, stream_api): stream: BulkIncrementalSalesforceStream = _generate_stream("Account", stream_config, stream_api) - stream._wait_timeout = 0.1 # maximum wait timeout will be 6 seconds + stream.DEFAULT_WAIT_TIMEOUT_SECONDS = 6 # maximum wait timeout will be 6 seconds with requests_mock.Mocker() as m: job_id = _prepare_mock(m, stream) m.register_uri("GET", stream.path() + f"/{job_id}", json={"state": "InProgress", "id": job_id}) @@ -248,12 +246,12 @@ def test_bulk_sync_failed_retry(stream_config, stream_api): ], ) def test_stream_start_date( - start_date_provided, - stream_name, - expected_start_date, - stream_config, - stream_api, - stream_config_without_start_date, + start_date_provided, + stream_name, + expected_start_date, + stream_config, + stream_api, + stream_config_without_start_date, ): if start_date_provided: stream = _generate_stream(stream_name, stream_config, stream_api) @@ -292,25 +290,25 @@ def test_download_data_filter_null_bytes(stream_config, stream_api): [ ([{"criteria": "exacts", "value": "Account"}], ["Account"]), ( - [{"criteria": "not exacts", "value": "CustomStreamHistory"}], - ["Account", "AIApplications", "Leads", "LeadHistory", "Orders", "OrderHistory", "CustomStream"], + [{"criteria": "not exacts", "value": "CustomStreamHistory"}], + ["Account", "AIApplications", "Leads", "LeadHistory", "Orders", "OrderHistory", "CustomStream"], ), ([{"criteria": "starts with", "value": "lead"}], ["Leads", "LeadHistory"]), ( - [{"criteria": "starts not with", "value": "custom"}], - ["Account", "AIApplications", "Leads", "LeadHistory", "Orders", "OrderHistory"], + [{"criteria": "starts not with", "value": "custom"}], + ["Account", "AIApplications", "Leads", "LeadHistory", "Orders", "OrderHistory"], ), ([{"criteria": "ends with", "value": "story"}], ["LeadHistory", "OrderHistory", "CustomStreamHistory"]), ([{"criteria": "ends not with", "value": "s"}], ["Account", "LeadHistory", "OrderHistory", "CustomStream", "CustomStreamHistory"]), ([{"criteria": "contains", "value": "applicat"}], ["AIApplications"]), ([{"criteria": "contains", "value": "hist"}], ["LeadHistory", "OrderHistory", "CustomStreamHistory"]), ( - [{"criteria": "not contains", "value": "stream"}], - ["Account", "AIApplications", "Leads", 
"LeadHistory", "Orders", "OrderHistory"], + [{"criteria": "not contains", "value": "stream"}], + ["Account", "AIApplications", "Leads", "LeadHistory", "Orders", "OrderHistory"], ), ( - [{"criteria": "not contains", "value": "Account"}], - ["AIApplications", "Leads", "LeadHistory", "Orders", "OrderHistory", "CustomStream", "CustomStreamHistory"], + [{"criteria": "not contains", "value": "Account"}], + ["AIApplications", "Leads", "LeadHistory", "Orders", "OrderHistory", "CustomStream", "CustomStreamHistory"], ), ], ) @@ -505,7 +503,7 @@ def test_pagination_rest(stream_config, stream_api): state = {stream_name: {"SystemModstamp": "2122-08-22T05:08:29.000Z"}} stream: SalesforceStream = _generate_stream(stream_name, stream_config, stream_api, state=state) - stream._wait_timeout = 0.1 # maximum wait timeout will be 6 seconds + stream.DEFAULT_WAIT_TIMEOUT_SECONDS = 6 # maximum wait timeout will be 6 seconds next_page_url = "/services/data/v52.0/query/012345" with requests_mock.Mocker() as m: resp_1 = { @@ -546,7 +544,7 @@ def test_pagination_rest(stream_config, stream_api): def test_csv_reader_dialect_unix(): - stream: BulkSalesforceStream = BulkSalesforceStream(stream_name=None, wait_timeout=None, sf_api=None, pk=None) + stream: BulkSalesforceStream = BulkSalesforceStream(stream_name=None, sf_api=None, pk=None) url = "https://fake-account.salesforce.com/services/data/v52.0/jobs/query/7504W00000bkgnpQAA" data = [ @@ -570,17 +568,26 @@ def test_csv_reader_dialect_unix(): @pytest.mark.order(1) @pytest.mark.parametrize( - "stream_names,catalog_stream_names,", ( - (["stream_1", "stream_2"], None,), - (["stream_1", "stream_2"], ["stream_1", "stream_2"],), - (["stream_1", "stream_2", "stream_3"], ["stream_1"],), - - ) + "stream_names,catalog_stream_names,", + ( + ( + ["stream_1", "stream_2"], + None, + ), + ( + ["stream_1", "stream_2"], + ["stream_1", "stream_2"], + ), + ( + ["stream_1", "stream_2", "stream_3"], + ["stream_1"], + ), + ), ) def test_forwarding_sobject_options(stream_config, stream_names, catalog_stream_names) -> None: - sobjects_matcher = re.compile('/sobjects$') - token_matcher = re.compile('/token$') - describe_matcher = re.compile('/describe$') + sobjects_matcher = re.compile("/sobjects$") + token_matcher = re.compile("/token$") + describe_matcher = re.compile("/describe$") catalog = None if catalog_stream_names: catalog = ConfiguredAirbyteCatalog( @@ -589,31 +596,38 @@ def test_forwarding_sobject_options(stream_config, stream_names, catalog_stream_ stream=AirbyteStream(name=catalog_stream_name, json_schema={"type": "object"}), sync_mode=SyncMode.full_refresh, destination_sync_mode=DestinationSyncMode.overwrite, - ) for catalog_stream_name in catalog_stream_names] + ) + for catalog_stream_name in catalog_stream_names + ] ) with requests_mock.Mocker() as m: - m.register_uri("POST", token_matcher, json={ - "instance_url": "https://fake-url.com", - "access_token": "fake-token" - }) - m.register_uri("GET", describe_matcher, json={ - "fields": [ - { - "name": "field", - "type": "string", - } - ] - }) - m.register_uri("GET", sobjects_matcher, json={ - "sobjects": [ - { - "name": stream_name, - "flag1": True, - "queryable": True, - } for stream_name in stream_names - ], - - }) + m.register_uri("POST", token_matcher, json={"instance_url": "https://fake-url.com", "access_token": "fake-token"}) + m.register_uri( + "GET", + describe_matcher, + json={ + "fields": [ + { + "name": "field", + "type": "string", + } + ] + }, + ) + m.register_uri( + "GET", + sobjects_matcher, + json={ + "sobjects": [ 
+ { + "name": stream_name, + "flag1": True, + "queryable": True, + } + for stream_name in stream_names + ], + }, + ) streams = SourceSalesforce().streams(config=stream_config, catalog=catalog) expected_names = catalog_stream_names if catalog else stream_names assert not set(expected_names).symmetric_difference(set(stream.name for stream in streams)), "doesn't match excepted streams" From ab15b95bc313f415c1b207db6005dff7b5345651 Mon Sep 17 00:00:00 2001 From: antixar Date: Thu, 10 Feb 2022 19:21:26 +0200 Subject: [PATCH 5/6] bump version --- airbyte-integrations/connectors/source-salesforce/Dockerfile | 2 +- docs/integrations/sources/salesforce.md | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-salesforce/Dockerfile b/airbyte-integrations/connectors/source-salesforce/Dockerfile index aadc9c50ced8..babf0c6ebf10 100644 --- a/airbyte-integrations/connectors/source-salesforce/Dockerfile +++ b/airbyte-integrations/connectors/source-salesforce/Dockerfile @@ -25,5 +25,5 @@ COPY source_salesforce ./source_salesforce ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.22 +LABEL io.airbyte.version=0.1.23 LABEL io.airbyte.name=airbyte/source-salesforce diff --git a/docs/integrations/sources/salesforce.md b/docs/integrations/sources/salesforce.md index 899f208e27d5..bbafe00792b9 100644 --- a/docs/integrations/sources/salesforce.md +++ b/docs/integrations/sources/salesforce.md @@ -739,6 +739,7 @@ List of available streams: | Version | Date | Pull Request | Subject | |:--------|:-----------| :--- |:--------------------------------------------------------------------------| +| 0.1.22 | 2022-02-10 | [10141](https://github.com/airbytehq/airbyte/pull/10141) | processing of failed jobs | | 0.1.22 | 2022-02-02 | [10012](https://github.com/airbytehq/airbyte/pull/10012) | Increase CSV field_size_limit | | 0.1.21 | 2022-01-28 | [9499](https://github.com/airbytehq/airbyte/pull/9499) | If a sync reaches daily rate limit it ends the sync early with success status. 
Read more in `Performance considerations` section | | 0.1.20 | 2022-01-26 | [9757](https://github.com/airbytehq/airbyte/pull/9757) | Parse CSV with "unix" dialect | From d05e67fad1be9d3752eee1248052457c63b596d8 Mon Sep 17 00:00:00 2001 From: antixar Date: Thu, 10 Feb 2022 19:43:39 +0200 Subject: [PATCH 6/6] update spec file --- .../b117307c-14b6-41aa-9422-947e34922962.json | 2 +- .../src/main/resources/seed/source_definitions.yaml | 2 +- .../init/src/main/resources/seed/source_specs.yaml | 12 +----------- docs/integrations/sources/salesforce.md | 2 +- 4 files changed, 4 insertions(+), 14 deletions(-) diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b117307c-14b6-41aa-9422-947e34922962.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b117307c-14b6-41aa-9422-947e34922962.json index 928565cd216e..165218eded0b 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b117307c-14b6-41aa-9422-947e34922962.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b117307c-14b6-41aa-9422-947e34922962.json @@ -2,7 +2,7 @@ "sourceDefinitionId": "b117307c-14b6-41aa-9422-947e34922962", "name": "Salesforce", "dockerRepository": "airbyte/source-salesforce", - "dockerImageTag": "0.1.22", + "dockerImageTag": "0.1.23", "documentationUrl": "https://docs.airbyte.io/integrations/sources/salesforce", "icon": "salesforce.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index b7774b0ce986..406dd48c4406 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -648,7 +648,7 @@ - name: Salesforce sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962 dockerRepository: airbyte/source-salesforce - dockerImageTag: 0.1.22 + dockerImageTag: 0.1.23 documentationUrl: https://docs.airbyte.io/integrations/sources/salesforce icon: salesforce.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 9fb1316fe936..b95ff6221753 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -6801,7 +6801,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-salesforce:0.1.22" +- dockerImage: "airbyte/source-salesforce:0.1.23" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/salesforce" connectionSpecification: @@ -6879,16 +6879,6 @@ title: "Streams filter criteria" description: "Add selection criteria for streams to get only streams that\ \ are relevant to you" - wait_timeout: - title: "Response Waiting Time" - description: "Maximum wait time of Salesforce responses in minutes. This\ - \ option is used for the BULK mode only. The default wait time of the\ - \ Parent Batch in the Bulk Mode to wait for all the batches to finish\ - \ processing is 20 minutes." 
- type: "integer" - minimum: 5 - maximum: 60 - default: 10 supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] diff --git a/docs/integrations/sources/salesforce.md b/docs/integrations/sources/salesforce.md index bbafe00792b9..af05c2eee268 100644 --- a/docs/integrations/sources/salesforce.md +++ b/docs/integrations/sources/salesforce.md @@ -739,7 +739,7 @@ List of available streams: | Version | Date | Pull Request | Subject | |:--------|:-----------| :--- |:--------------------------------------------------------------------------| -| 0.1.22 | 2022-02-10 | [10141](https://github.com/airbytehq/airbyte/pull/10141) | processing of failed jobs | +| 0.1.23 | 2022-02-10 | [10141](https://github.com/airbytehq/airbyte/pull/10141) | processing of failed jobs | | 0.1.22 | 2022-02-02 | [10012](https://github.com/airbytehq/airbyte/pull/10012) | Increase CSV field_size_limit | | 0.1.21 | 2022-01-28 | [9499](https://github.com/airbytehq/airbyte/pull/9499) | If a sync reaches daily rate limit it ends the sync early with success status. Read more in `Performance considerations` section | | 0.1.20 | 2022-01-26 | [9757](https://github.com/airbytehq/airbyte/pull/9757) | Parse CSV with "unix" dialect |