diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b117307c-14b6-41aa-9422-947e34922962.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b117307c-14b6-41aa-9422-947e34922962.json index 928565cd216e..165218eded0b 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b117307c-14b6-41aa-9422-947e34922962.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b117307c-14b6-41aa-9422-947e34922962.json @@ -2,7 +2,7 @@ "sourceDefinitionId": "b117307c-14b6-41aa-9422-947e34922962", "name": "Salesforce", "dockerRepository": "airbyte/source-salesforce", - "dockerImageTag": "0.1.22", + "dockerImageTag": "0.1.23", "documentationUrl": "https://docs.airbyte.io/integrations/sources/salesforce", "icon": "salesforce.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index b7774b0ce986..406dd48c4406 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -648,7 +648,7 @@ - name: Salesforce sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962 dockerRepository: airbyte/source-salesforce - dockerImageTag: 0.1.22 + dockerImageTag: 0.1.23 documentationUrl: https://docs.airbyte.io/integrations/sources/salesforce icon: salesforce.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 9fb1316fe936..b95ff6221753 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -6801,7 +6801,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-salesforce:0.1.22" +- dockerImage: "airbyte/source-salesforce:0.1.23" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/salesforce" connectionSpecification: @@ -6879,16 +6879,6 @@ title: "Streams filter criteria" description: "Add selection criteria for streams to get only streams that\ \ are relevant to you" - wait_timeout: - title: "Response Waiting Time" - description: "Maximum wait time of Salesforce responses in minutes. This\ - \ option is used for the BULK mode only. The default wait time of the\ - \ Parent Batch in the Bulk Mode to wait for all the batches to finish\ - \ processing is 20 minutes." 
- type: "integer" - minimum: 5 - maximum: 60 - default: 10 supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] diff --git a/airbyte-integrations/connectors/source-salesforce/Dockerfile b/airbyte-integrations/connectors/source-salesforce/Dockerfile index aadc9c50ced8..babf0c6ebf10 100644 --- a/airbyte-integrations/connectors/source-salesforce/Dockerfile +++ b/airbyte-integrations/connectors/source-salesforce/Dockerfile @@ -25,5 +25,5 @@ COPY source_salesforce ./source_salesforce ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.22 +LABEL io.airbyte.version=0.1.23 LABEL io.airbyte.name=airbyte/source-salesforce diff --git a/airbyte-integrations/connectors/source-salesforce/acceptance-test-config.yml b/airbyte-integrations/connectors/source-salesforce/acceptance-test-config.yml index 6d379470c03e..836c822330c1 100644 --- a/airbyte-integrations/connectors/source-salesforce/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-salesforce/acceptance-test-config.yml @@ -7,8 +7,11 @@ tests: connection: - config_path: "secrets/config.json" status: "succeed" + - config_path: "secrets/config_sandbox.json" + status: "succeed" - config_path: "integration_tests/invalid_config.json" status: "failed" + discovery: - config_path: "secrets/config.json" basic_read: diff --git a/airbyte-integrations/connectors/source-salesforce/integration_tests/bulk_error_test.py b/airbyte-integrations/connectors/source-salesforce/integration_tests/bulk_error_test.py index fb6f63f2ec27..14e6001a02c2 100644 --- a/airbyte-integrations/connectors/source-salesforce/integration_tests/bulk_error_test.py +++ b/airbyte-integrations/connectors/source-salesforce/integration_tests/bulk_error_test.py @@ -4,10 +4,13 @@ import json import logging +import re from pathlib import Path from typing import Any, Mapping import pytest +import requests_mock +from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.streams import Stream from source_salesforce.source import SourceSalesforce @@ -20,6 +23,12 @@ def parse_input_config(): return json.loads(file.read()) +@pytest.fixture(name="input_sandbox_config") +def parse_input_sandbox_config(): + with open(HERE.parent / "secrets/config_sandbox.json", "r") as file: + return json.loads(file.read()) + + def get_stream(input_config: Mapping[str, Any], stream_name: str) -> Stream: stream_cls = type("a", (object,), {"name": stream_name}) configured_stream_cls = type("b", (object,), {"stream": stream_cls()}) @@ -42,3 +51,42 @@ def test_not_queryable_stream(caplog, input_config): # check logs assert "is not queryable" in caplog.records[-1].message + + +@pytest.mark.parametrize( + "stream_name,log_messages", + ( + ( + "Dashboard", + ["switch to STANDARD(non-BULK) sync"], + ), + # CategoryNode has access limitation thus SF returns failed job statuses + ( + "CategoryNode", + ["insufficient access rights on cross-reference id", "switch to STANDARD(non-BULK) sync"], + ), + ), + ids=["successful_switching", "failed_switching"], +) +def test_failed_jobs_with_successful_switching(caplog, input_sandbox_config, stream_name, log_messages): + stream = get_stream(input_sandbox_config, stream_name) + expected_record_ids = set(record["Id"] for record in stream.read_records(sync_mode=SyncMode.full_refresh)) + + create_query_matcher = re.compile(r"jobs/query$") + job_matcher = re.compile(r"jobs/query/fake_id$") + loaded_record_ids = [] + with 
requests_mock.Mocker(real_http=True) as m: + m.register_uri( + "POST", + create_query_matcher, + json={ + "id": "fake_id", + }, + ) + m.register_uri("GET", job_matcher, json={"state": "Failed", "errorMessage": "unknown error"}) + m.register_uri("DELETE", job_matcher, json={}) + with caplog.at_level(logging.WARNING): + loaded_record_ids = set(record["Id"] for record in stream.read_records(sync_mode=SyncMode.full_refresh)) + for i, log_message in enumerate(log_messages, 1): + assert log_message in caplog.records[-i].message + assert loaded_record_ids == expected_record_ids diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/api.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/api.py index ad5300c29d00..b6b53f42e8f6 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/api.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/api.py @@ -11,7 +11,7 @@ from .exceptions import TypeSalesforceException from .rate_limiting import default_backoff_handler -from .utils import filter_streams +from .utils import filter_streams_by_criteria STRING_TYPES = [ "byte", @@ -191,7 +191,9 @@ def __init__( self.access_token = None self.instance_url = None self.session = requests.Session() - self.is_sandbox = is_sandbox is True or (isinstance(is_sandbox, str) and is_sandbox.lower() == "true") + self.is_sandbox = is_sandbox in [True, "true"] + if self.is_sandbox: + self.logger.info("using SANDBOX of Salesforce") self.start_date = start_date def _get_standard_headers(self): @@ -206,30 +208,37 @@ def filter_streams(self, stream_name: str) -> bool: return False return True - def get_validated_streams(self, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog = None): - salesforce_objects = self.describe()["sobjects"] - stream_objects = [] - for stream_object in salesforce_objects: + def get_validated_streams(self, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog = None) -> Mapping[str, Any]: + """Selects all validated streams with additional filtering: + 1) skip all sobjects with negative value of the flag "queryable" + 2) user can set search criterias of necessary streams + 3) selection by catalog settings + """ + stream_objects = {} + for stream_object in self.describe()["sobjects"]: if stream_object["queryable"]: - stream_objects.append(stream_object) + stream_objects[stream_object.pop("name")] = stream_object else: self.logger.warn(f"Stream {stream_object['name']} is not queryable and will be ignored.") - stream_names = [stream_object["name"] for stream_object in stream_objects] if catalog: - return [configured_stream.stream.name for configured_stream in catalog.streams], stream_objects + return { + configured_stream.stream.name: stream_objects[configured_stream.stream.name] + for configured_stream in catalog.streams + if configured_stream.stream.name in stream_objects + } + stream_names = list(stream_objects.keys()) if config.get("streams_criteria"): filtered_stream_list = [] for stream_criteria in config["streams_criteria"]: - filtered_stream_list += filter_streams( + filtered_stream_list += filter_streams_by_criteria( streams_list=stream_names, search_word=stream_criteria["value"], search_criteria=stream_criteria["criteria"] ) stream_names = list(set(filtered_stream_list)) validated_streams = [stream_name for stream_name in stream_names if self.filter_streams(stream_name)] - validated_stream_objects = [stream_object for stream_object in stream_objects if stream_object["name"] in 
validated_streams] - return validated_streams, validated_stream_objects + return {stream_name: sobject_options for stream_name, sobject_options in stream_objects.items() if stream_name in validated_streams} @default_backoff_handler(max_tries=5, factor=15) def _make_request( @@ -261,7 +270,7 @@ def login(self): self.access_token = auth["access_token"] self.instance_url = auth["instance_url"] - def describe(self, sobject: str = None, stream_objects: List = None) -> Mapping[str, Any]: + def describe(self, sobject: str = None, sobject_options: Mapping[str, Any] = None) -> Mapping[str, Any]: """Describes all objects or a specific object""" headers = self._get_standard_headers() @@ -269,12 +278,12 @@ def describe(self, sobject: str = None, stream_objects: List = None) -> Mapping[ url = f"{self.instance_url}/services/data/{self.version}/{endpoint}" resp = self._make_request("GET", url, headers=headers) - if resp.status_code == 404: - self.logger.error(f"Filtered stream objects: {stream_objects}") + if resp.status_code == 404 and sobject: + self.logger.error(f"not found a description for the sobject '{sobject}'. Sobject options: {sobject_options}") return resp.json() - def generate_schema(self, stream_name: str = None, stream_objects: List = None) -> Mapping[str, Any]: - response = self.describe(stream_name, stream_objects) + def generate_schema(self, stream_name: str = None, stream_options: Mapping[str, Any] = None) -> Mapping[str, Any]: + response = self.describe(stream_name, stream_options) schema = {"$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "additionalProperties": True, "properties": {}} for field in response["fields"]: schema["properties"][field["name"]] = self.field_to_property_schema(field) diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py index e7edfb658f6c..15cb754b9efb 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py @@ -39,19 +39,18 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> def generate_streams( cls, config: Mapping[str, Any], - stream_names: List[str], + stream_objects: Mapping[str, Any], sf_object: Salesforce, state: Mapping[str, Any] = None, - stream_objects: List = None, ) -> List[Stream]: """ "Generates a list of stream by their names. 
It can be used for different tests too""" authenticator = TokenAuthenticator(sf_object.access_token) streams = [] - for stream_name in stream_names: - streams_kwargs = {} + for stream_name, sobject_options in stream_objects.items(): + streams_kwargs = {"sobject_options": sobject_options} stream_state = state.get(stream_name, {}) if state else {} - selected_properties = sf_object.generate_schema(stream_name, stream_objects).get("properties", {}) + selected_properties = sf_object.generate_schema(stream_name, sobject_options).get("properties", {}) # Salesforce BULK API currently does not support loading fields with data type base64 and compound data properties_not_supported_by_bulk = { key: value for key, value in selected_properties.items() if value.get("format") == "base64" or "object" in value["type"] @@ -63,7 +62,6 @@ def generate_streams( else: # Use BULK API full_refresh, incremental = BulkSalesforceStream, BulkIncrementalSalesforceStream - streams_kwargs["wait_timeout"] = config.get("wait_timeout") json_schema = sf_object.generate_schema(stream_name, stream_objects) pk, replication_key = sf_object.get_pk_and_replication_key(json_schema) @@ -77,8 +75,8 @@ def generate_streams( def streams(self, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog = None, state: Mapping[str, Any] = None) -> List[Stream]: sf = self._get_sf_object(config) - stream_names, stream_objects = sf.get_validated_streams(config=config, catalog=catalog) - return self.generate_streams(config, stream_names, sf, state=state, stream_objects=stream_objects) + stream_objects = sf.get_validated_streams(config=config, catalog=catalog) + return self.generate_streams(config, stream_objects, sf, state=state) def read( self, logger: AirbyteLogger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.json b/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.json index f9d68a3c39a3..18147ab260b8 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.json +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.json @@ -4,7 +4,11 @@ "$schema": "http://json-schema.org/draft-07/schema#", "title": "Salesforce Source Spec", "type": "object", - "required": ["client_id", "client_secret", "refresh_token"], + "required": [ + "client_id", + "client_secret", + "refresh_token" + ], "additionalProperties": true, "properties": { "auth_type": { @@ -33,7 +37,10 @@ "description": "Date in the format 2017-01-25. Any data before this date will not be replicated. This field uses the \"updated\" field if available, otherwise the \"created\" fields if they are available for a stream. 
If not set, then by default all your data is replicated.", "type": "string", "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z|[0-9]{4}-[0-9]{2}-[0-9]{2}$", - "examples": ["2021-07-25", "2021-07-25T00:00:00Z"] + "examples": [ + "2021-07-25", + "2021-07-25T00:00:00Z" + ] }, "is_sandbox": { "title": "Sandbox", @@ -45,7 +52,10 @@ "type": "array", "items": { "type": "object", - "required": ["criteria", "value"], + "required": [ + "criteria", + "value" + ], "properties": { "criteria": { "type": "string", @@ -70,20 +80,14 @@ }, "title": "Streams filter criteria", "description": "Add selection criteria for streams to get only streams that are relevant to you" - }, - "wait_timeout": { - "title": "Response Waiting Time", - "description": "Maximum wait time of Salesforce responses in minutes. This option is used for the BULK mode only. The default wait time of the Parent Batch in the Bulk Mode to wait for all the batches to finish processing is 20 minutes.", - "type": "integer", - "minimum": 5, - "maximum": 60, - "default": 10 } } }, "advanced_auth": { "auth_flow_type": "oauth2.0", - "predicate_key": ["auth_type"], + "predicate_key": [ + "auth_type" + ], "predicate_value": "Client", "oauth_config_specification": { "oauth_user_input_from_connector_config_specification": { @@ -92,7 +96,9 @@ "properties": { "is_sandbox": { "type": "boolean", - "path_in_connector_config": ["is_sandbox"] + "path_in_connector_config": [ + "is_sandbox" + ] } } }, @@ -102,7 +108,9 @@ "properties": { "refresh_token": { "type": "string", - "path_in_connector_config": ["refresh_token"] + "path_in_connector_config": [ + "refresh_token" + ] } } }, @@ -124,11 +132,15 @@ "properties": { "client_id": { "type": "string", - "path_in_connector_config": ["client_id"] + "path_in_connector_config": [ + "client_id" + ] }, "client_secret": { "type": "string", - "path_in_connector_config": ["client_secret"] + "path_in_connector_config": [ + "client_secret" + ] } } } diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py index bbd54a89470f..3851f41a0e18 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py @@ -30,12 +30,15 @@ class SalesforceStream(HttpStream, ABC): page_size = 2000 transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization) - def __init__(self, sf_api: Salesforce, pk: str, stream_name: str, schema: dict = None, **kwargs): + def __init__( + self, sf_api: Salesforce, pk: str, stream_name: str, sobject_options: Mapping[str, Any] = None, schema: dict = None, **kwargs + ): super().__init__(**kwargs) self.sf_api = sf_api self.pk = pk self.stream_name = stream_name self.schema = schema + self.sobject_options = sobject_options @property def name(self) -> str: @@ -112,14 +115,10 @@ def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]: class BulkSalesforceStream(SalesforceStream): page_size = 30000 - DEFAULT_WAIT_TIMEOUT_MINS = 10 + DEFAULT_WAIT_TIMEOUT_SECONDS = 600 MAX_CHECK_INTERVAL_SECONDS = 2.0 MAX_RETRY_NUMBER = 3 - def __init__(self, wait_timeout: Optional[int], **kwargs): - super().__init__(**kwargs) - self._wait_timeout = wait_timeout or self.DEFAULT_WAIT_TIMEOUT_MINS - def path(self, **kwargs) -> str: return f"/services/data/{self.sf_api.version}/jobs/query" @@ -140,7 +139,7 @@ def _send_http_request(self, method: str, url: str, json: dict = 
None): headers = self.authenticator.get_auth_header() response = self._session.request(method, url=url, headers=headers, json=json) if response.status_code not in [200, 204]: - self.logger.error(f"error body: {response.text}") + self.logger.error(f"error body: {response.text}, sobject options: {self.sobject_options}") response.raise_for_status() return response @@ -149,11 +148,9 @@ def create_stream_job(self, query: str, url: str) -> Optional[str]: docs: https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/create_job.htm """ json = {"operation": "queryAll", "query": query, "contentType": "CSV", "columnDelimiter": "COMMA", "lineEnding": "LF"} - try: response = self._send_http_request("POST", url, json=json) job_id = response.json()["id"] - self.logger.info(f"Created Job: {job_id} to sync {self.name}") return job_id except exceptions.HTTPError as error: if error.response.status_code in [codes.FORBIDDEN, codes.BAD_REQUEST]: @@ -175,11 +172,20 @@ def create_stream_job(self, query: str, url: str) -> Optional[str]: if error_message == "Selecting compound data not supported in Bulk Query" or ( error_code == "INVALIDENTITY" and "is not supported by the Bulk API" in error_message ): - self.logger.error(f"Cannot receive data for stream '{self.name}' using BULK API, error message: '{error_message}'") + self.logger.error( + f"Cannot receive data for stream '{self.name}' using BULK API, " + f"sobject options: {self.sobject_options}, error message: '{error_message}'" + ) elif error.response.status_code == codes.FORBIDDEN and error_code != "REQUEST_LIMIT_EXCEEDED": - self.logger.error(f"Cannot receive data for stream '{self.name}', error message: '{error_message}'") + self.logger.error( + f"Cannot receive data for stream '{self.name}' ," + f"sobject options: {self.sobject_options}, error message: '{error_message}'" + ) elif error.response.status_code == codes.BAD_REQUEST and error_message.endswith("does not support query"): - self.logger.error(f"The stream '{self.name}' is not queryable, error message: '{error_message}'") + self.logger.error( + f"The stream '{self.name}' is not queryable, " + f"sobject options: {self.sobject_options}, error message: '{error_message}'" + ) else: raise error else: @@ -187,8 +193,7 @@ def create_stream_job(self, query: str, url: str) -> Optional[str]: return None def wait_for_job(self, url: str) -> str: - # using "seconds" argument because self._wait_timeout can be changed by tests - expiration_time: DateTime = pendulum.now().add(seconds=int(self._wait_timeout * 60.0)) + expiration_time: DateTime = pendulum.now().add(seconds=self.DEFAULT_WAIT_TIMEOUT_SECONDS) job_status = "InProgress" delay_timeout = 0 delay_cnt = 0 @@ -202,7 +207,13 @@ def wait_for_job(self, url: str) -> str: if job_status in ["JobComplete", "Aborted", "Failed"]: if job_status != "JobComplete": # this is only job metadata without payload - self.logger.error(f"JobStatus: {job_status}, full job response: {job_info}") + error_message = job_info.get("errorMessage") + if not error_message: + # not all failed response can have "errorMessage" and we need to print full response body + error_message = job_info + self.logger.error( + f"JobStatus: {job_status}, " f"sobject options: {self.sobject_options}, error message: '{error_message}'" + ) return job_status @@ -216,15 +227,15 @@ def wait_for_job(self, url: str) -> str: f"Sleeping {delay_timeout} seconds while waiting for Job: {self.name}/{job_id}" f" to complete. 
Current state: {job_status}" ) - self.logger.warning(f"Not wait the {self.name} data for {self._wait_timeout} minutes, data: {job_info}!!") + self.logger.warning(f"Not wait the {self.name} data for {self.DEFAULT_WAIT_TIMEOUT_SECONDS} seconds, data: {job_info}!!") return job_status - def execute_job(self, query: str, url: str) -> str: + def execute_job(self, query: str, url: str) -> Tuple[Optional[str], Optional[str]]: job_status = "Failed" for i in range(0, self.MAX_RETRY_NUMBER): job_id = self.create_stream_job(query=query, url=url) if not job_id: - return None + return None, None job_full_url = f"{url}/{job_id}" job_status = self.wait_for_job(url=job_full_url) if job_status not in ["UploadComplete", "InProgress"]: @@ -235,8 +246,8 @@ def execute_job(self, query: str, url: str) -> str: if job_status in ["Aborted", "Failed"]: self.delete_job(url=job_full_url) - raise Exception(f"Job for {self.name} stream using BULK API was failed.") - return job_full_url + return None, job_status + return job_full_url, job_status def filter_null_bytes(self, s: str): """ @@ -297,9 +308,20 @@ def read_records( while True: params = self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) path = self.path(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) - job_full_url = self.execute_job(query=params["q"], url=f"{self.url_base}{path}") + job_full_url, job_status = self.execute_job(query=params["q"], url=f"{self.url_base}{path}") if not job_full_url: - return + if job_status == "Failed": + # As rule as BULK logic returns unhandled error. For instance: + # error message: 'Unexpected exception encountered in query processing. + # Please contact support with the following id: 326566388-63578 (-436445966)'" + # Thus we can try to switch to GET sync request because its response returns obvious error message + standard_instance = self.get_standard_instance() + self.logger.warning("switch to STANDARD(non-BULK) sync. Because the SalesForce BULK job has returned a failed status") + yield from standard_instance.read_records( + sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state + ) + return + raise Exception(f"Job for {self.name} stream using BULK API was failed.") count = 0 for count, record in self.download_data(url=job_full_url): @@ -315,6 +337,25 @@ def read_records( # not found a next page data. 
break + def get_standard_instance(self) -> SalesforceStream: + """Returns a instance of standard logic(non-BULK) with same settings""" + stream_kwargs = dict( + sf_api=self.sf_api, + pk=self.pk, + stream_name=self.stream_name, + schema=self.schema, + sobject_options=self.sobject_options, + authenticator=self.authenticator, + ) + + if isinstance(self, BulkIncrementalSalesforceStream): + stream_kwargs.update({"replication_key": self.replication_key, "start_date": self.start_date}) + new_cls = IncrementalSalesforceStream + else: + new_cls = SalesforceStream + + return new_cls(**stream_kwargs) + class IncrementalSalesforceStream(SalesforceStream, ABC): state_checkpoint_interval = 500 diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/utils.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/utils.py index eacf33298a24..ec73eaceb0ad 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/utils.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/utils.py @@ -3,7 +3,7 @@ # -def filter_streams(streams_list: list, search_word: str, search_criteria: str): +def filter_streams_by_criteria(streams_list: list, search_word: str, search_criteria: str): search_word = search_word.lower() criteria_mapping = { "starts with": lambda stream_name: stream_name.startswith(search_word), diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py index 1fab2b1583df..779f748e58fd 100644 --- a/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py +++ b/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py @@ -5,12 +5,13 @@ import csv import io import json +import re from unittest.mock import Mock import pytest import requests_mock from airbyte_cdk.logger import AirbyteLogger -from airbyte_cdk.models import ConfiguredAirbyteCatalog, SyncMode, Type +from airbyte_cdk.models import AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, DestinationSyncMode, SyncMode, Type from requests.exceptions import HTTPError from source_salesforce.api import Salesforce from source_salesforce.source import SourceSalesforce @@ -99,7 +100,7 @@ def stream_api_v2(stream_config): def _generate_stream(stream_name, stream_config, stream_api, state=None): - return SourceSalesforce.generate_streams(stream_config, [stream_name], stream_api, state=state)[0] + return SourceSalesforce.generate_streams(stream_config, {stream_name: None}, stream_api, state=state)[0] def test_bulk_sync_creation_failed(stream_config, stream_api): @@ -217,7 +218,8 @@ def test_bulk_sync_successful_long_response(stream_config, stream_api): @pytest.mark.timeout(17) def test_bulk_sync_successful_retry(stream_config, stream_api): stream: BulkIncrementalSalesforceStream = _generate_stream("Account", stream_config, stream_api) - stream._wait_timeout = 0.1 # maximum wait timeout will be 6 seconds + stream.DEFAULT_WAIT_TIMEOUT_SECONDS = 6 # maximum wait timeout will be 6 seconds + with requests_mock.Mocker() as m: job_id = _prepare_mock(m, stream) # 2 failed attempts, 3rd one should be successful @@ -231,7 +233,7 @@ def test_bulk_sync_successful_retry(stream_config, stream_api): @pytest.mark.timeout(30) def test_bulk_sync_failed_retry(stream_config, stream_api): stream: BulkIncrementalSalesforceStream = _generate_stream("Account", stream_config, stream_api) - stream._wait_timeout = 0.1 # maximum wait timeout will be 6 seconds + 
stream.DEFAULT_WAIT_TIMEOUT_SECONDS = 6 # maximum wait timeout will be 6 seconds with requests_mock.Mocker() as m: job_id = _prepare_mock(m, stream) m.register_uri("GET", stream.path() + f"/{job_id}", json={"state": "InProgress", "id": job_id}) @@ -336,8 +338,8 @@ def test_discover_with_streams_criteria_param(streams_criteria, predicted_filter ] } ) - filtered_streams, _ = sf_object.get_validated_streams(config=updated_config) - assert sorted(filtered_streams) == sorted(predicted_filtered_streams) + filtered_streams = sf_object.get_validated_streams(config=updated_config) + assert sorted(filtered_streams.keys()) == sorted(predicted_filtered_streams) def test_check_connection_rate_limit(stream_config): @@ -498,8 +500,8 @@ def test_discover_only_queryable(stream_config): ] } ) - filtered_streams, _ = sf_object.get_validated_streams(config=stream_config) - assert filtered_streams == ["Account"] + filtered_streams = sf_object.get_validated_streams(config=stream_config) + assert list(filtered_streams.keys()) == ["Account"] def test_pagination_rest(stream_config, stream_api): @@ -507,7 +509,7 @@ def test_pagination_rest(stream_config, stream_api): state = {stream_name: {"SystemModstamp": "2122-08-22T05:08:29.000Z"}} stream: SalesforceStream = _generate_stream(stream_name, stream_config, stream_api, state=state) - stream._wait_timeout = 0.1 # maximum wait timeout will be 6 seconds + stream.DEFAULT_WAIT_TIMEOUT_SECONDS = 6 # maximum wait timeout will be 6 seconds next_page_url = "/services/data/v52.0/query/012345" with requests_mock.Mocker() as m: resp_1 = { @@ -548,14 +550,13 @@ def test_pagination_rest(stream_config, stream_api): def test_csv_reader_dialect_unix(): - stream: BulkSalesforceStream = BulkSalesforceStream(stream_name=None, wait_timeout=None, sf_api=None, pk=None) + stream: BulkSalesforceStream = BulkSalesforceStream(stream_name=None, sf_api=None, pk=None) url = "https://fake-account.salesforce.com/services/data/v52.0/jobs/query/7504W00000bkgnpQAA" data = [ {"Id": "1", "Name": '"first_name" "last_name"'}, {"Id": "2", "Name": "'" + 'first_name"\n' + "'" + 'last_name\n"'}, - {"Id": "3", "Name": "first_name last_name" + 1024 * 1024 * "e"}, - {"Id": "4", "Name": "first_name last_name"}, + {"Id": "3", "Name": "first_name last_name"}, ] with io.StringIO("", newline="") as csvfile: @@ -571,6 +572,76 @@ def test_csv_reader_dialect_unix(): assert result == data +@pytest.mark.parametrize( + "stream_names,catalog_stream_names,", + ( + ( + ["stream_1", "stream_2"], + None, + ), + ( + ["stream_1", "stream_2"], + ["stream_1", "stream_2"], + ), + ( + ["stream_1", "stream_2", "stream_3"], + ["stream_1"], + ), + ), +) +def test_forwarding_sobject_options(stream_config, stream_names, catalog_stream_names) -> None: + sobjects_matcher = re.compile("/sobjects$") + token_matcher = re.compile("/token$") + describe_matcher = re.compile("/describe$") + catalog = None + if catalog_stream_names: + catalog = ConfiguredAirbyteCatalog( + streams=[ + ConfiguredAirbyteStream( + stream=AirbyteStream(name=catalog_stream_name, json_schema={"type": "object"}), + sync_mode=SyncMode.full_refresh, + destination_sync_mode=DestinationSyncMode.overwrite, + ) + for catalog_stream_name in catalog_stream_names + ] + ) + with requests_mock.Mocker() as m: + m.register_uri("POST", token_matcher, json={"instance_url": "https://fake-url.com", "access_token": "fake-token"}) + m.register_uri( + "GET", + describe_matcher, + json={ + "fields": [ + { + "name": "field", + "type": "string", + } + ] + }, + ) + m.register_uri( + "GET", + 
sobjects_matcher, + json={ + "sobjects": [ + { + "name": stream_name, + "flag1": True, + "queryable": True, + } + for stream_name in stream_names + ], + }, + ) + streams = SourceSalesforce().streams(config=stream_config, catalog=catalog) + expected_names = catalog_stream_names if catalog else stream_names + assert not set(expected_names).symmetric_difference(set(stream.name for stream in streams)), "doesn't match excepted streams" + + for stream in streams: + assert stream.sobject_options == {"flag1": True, "queryable": True} + return + + def test_csv_field_size_limit(): DEFAULT_CSV_FIELD_SIZE_LIMIT = 1024 * 128 diff --git a/docs/integrations/sources/salesforce.md b/docs/integrations/sources/salesforce.md index 899f208e27d5..af05c2eee268 100644 --- a/docs/integrations/sources/salesforce.md +++ b/docs/integrations/sources/salesforce.md @@ -739,6 +739,7 @@ List of available streams: | Version | Date | Pull Request | Subject | |:--------|:-----------| :--- |:--------------------------------------------------------------------------| +| 0.1.23 | 2022-02-10 | [10141](https://github.com/airbytehq/airbyte/pull/10141) | processing of failed jobs | | 0.1.22 | 2022-02-02 | [10012](https://github.com/airbytehq/airbyte/pull/10012) | Increase CSV field_size_limit | | 0.1.21 | 2022-01-28 | [9499](https://github.com/airbytehq/airbyte/pull/9499) | If a sync reaches daily rate limit it ends the sync early with success status. Read more in `Performance considerations` section | | 0.1.20 | 2022-01-26 | [9757](https://github.com/airbytehq/airbyte/pull/9757) | Parse CSV with "unix" dialect |
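The central change in streams.py is that execute_job() now returns a (job_full_url, job_status) tuple instead of raising as soon as a Bulk job ends up Aborted or Failed, and read_records() uses the Failed status to retry the slice through the plain REST stream returned by get_standard_instance(). The stand-alone sketch below mirrors only that control flow; submit_bulk_job, read_via_rest and download_bulk_results are hypothetical stand-ins, not the connector's methods.

```python
from typing import Iterable, Mapping, Optional, Tuple

# Hypothetical stand-ins for the connector's Bulk and REST paths; only the
# control flow of the new execute_job()/read_records() pairing is mirrored.

def submit_bulk_job(query: str) -> Tuple[Optional[str], str]:
    """Pretend job submission + polling; in this sketch every job fails."""
    return None, "Failed"


def read_via_rest(query: str) -> Iterable[Mapping]:
    """Pretend REST (non-Bulk) sync of the same slice."""
    yield {"Id": "001", "source": "rest", "query": query}


def download_bulk_results(job_url: str) -> Iterable[Mapping]:
    yield {"Id": "001", "source": "bulk"}


def read_records(query: str) -> Iterable[Mapping]:
    job_url, job_status = submit_bulk_job(query)
    if job_url is None:
        if job_status == "Failed":
            # Bulk failures are often opaque ("Unexpected exception encountered
            # in query processing ..."), so the slice is retried through the
            # REST endpoint, which either succeeds or returns a readable error.
            yield from read_via_rest(query)
            return
        raise Exception("Bulk job could not be created")
    yield from download_bulk_results(job_url)


if __name__ == "__main__":
    records = list(read_records("SELECT Id FROM Dashboard"))
    assert [r["source"] for r in records] == ["rest"]
```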
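get_standard_instance() builds the REST twin by reusing the Bulk stream's constructor kwargs and swapping the class, adding replication_key and start_date only for the incremental variant. A toy model of that kwarg plumbing, with dummy classes in place of the four SalesforceStream variants:

```python
# Toy classes standing in for the four stream variants; only the kwarg
# plumbing of get_standard_instance() is reproduced here.
class RestStream:
    def __init__(self, name, schema, sobject_options=None, **kwargs):
        self.name, self.schema, self.sobject_options = name, schema, sobject_options


class RestIncrementalStream(RestStream):
    def __init__(self, replication_key, start_date, **kwargs):
        super().__init__(**kwargs)
        self.replication_key, self.start_date = replication_key, start_date


class BulkStream(RestStream):
    def get_standard_instance(self) -> RestStream:
        # reuse the same settings, but instantiate the non-Bulk class
        kwargs = dict(name=self.name, schema=self.schema, sobject_options=self.sobject_options)
        if isinstance(self, BulkIncrementalStream):
            kwargs.update(replication_key=self.replication_key, start_date=self.start_date)
            return RestIncrementalStream(**kwargs)
        return RestStream(**kwargs)


class BulkIncrementalStream(BulkStream, RestIncrementalStream):
    pass


bulk = BulkIncrementalStream(name="Account", schema={}, replication_key="SystemModstamp", start_date="2021-01-01")
rest = bulk.get_standard_instance()
assert isinstance(rest, RestIncrementalStream) and not isinstance(rest, BulkStream)
assert rest.replication_key == "SystemModstamp"
```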
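With the wait_timeout spec option gone, Bulk polling is bounded by the class constant DEFAULT_WAIT_TIMEOUT_SECONDS = 600, which the unit tests now override directly on the instance. The loop below is a minimal stdlib-only approximation of wait_for_job() under those assumptions; fetch_job_state is a hypothetical callable, and the connector itself computes the deadline with pendulum.

```python
import time
from typing import Callable

DEFAULT_WAIT_TIMEOUT_SECONDS = 600
MAX_CHECK_INTERVAL_SECONDS = 2.0


def wait_for_job(fetch_job_state: Callable[[], str],
                 timeout_seconds: float = DEFAULT_WAIT_TIMEOUT_SECONDS) -> str:
    """Poll a job until it reaches a terminal state or the deadline passes."""
    deadline = time.monotonic() + timeout_seconds
    delay, state = 0.0, "InProgress"
    while time.monotonic() < deadline:
        state = fetch_job_state()
        if state in ("JobComplete", "Aborted", "Failed"):
            return state
        # back off gradually, but never sleep longer than the cap
        delay = min(delay + 0.5, MAX_CHECK_INTERVAL_SECONDS)
        time.sleep(delay)
    return state  # still in progress: the caller treats this as a timeout


# Example: a job that completes on the third poll.
states = iter(["UploadComplete", "InProgress", "JobComplete"])
assert wait_for_job(lambda: next(states), timeout_seconds=5) == "JobComplete"
```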
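get_validated_streams() now returns a single mapping of stream name to its sobject description (the "sobject options" threaded through the streams and error logs) instead of two parallel lists. The function below approximates the new selection order — queryable filter, then catalog, then user criteria — using illustrative payloads; UNSUPPORTED_STREAMS and the select_streams signature are assumptions, not the connector's API.

```python
from typing import Any, Iterable, Mapping, Optional

# Illustrative names and payloads; not copied from the connector.
UNSUPPORTED_STREAMS = {"ContentDocumentLink"}


def select_streams(
    sobjects: Iterable[Mapping[str, Any]],
    catalog_names: Optional[set] = None,
    name_filter=None,
) -> Mapping[str, Mapping[str, Any]]:
    """Return {stream_name: sobject_options} instead of two parallel lists."""
    # 1) drop everything Salesforce marks as not queryable
    options = {s["name"]: s for s in sobjects if s.get("queryable")}
    # 2) a configured catalog wins outright
    if catalog_names is not None:
        return {name: opts for name, opts in options.items() if name in catalog_names}
    # 3) otherwise apply the user's streams_criteria-style filter, if any
    if name_filter is not None:
        options = {name: opts for name, opts in options.items() if name_filter(name)}
    # 4) finally drop streams the connector cannot handle
    return {name: opts for name, opts in options.items() if name not in UNSUPPORTED_STREAMS}


sobjects = [
    {"name": "Account", "queryable": True},
    {"name": "Dashboard", "queryable": True},
    {"name": "Hidden", "queryable": False},
]
assert set(select_streams(sobjects)) == {"Account", "Dashboard"}
assert set(select_streams(sobjects, catalog_names={"Account"})) == {"Account"}
```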
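utils.filter_streams is renamed to filter_streams_by_criteria to make its purpose explicit. Only the "starts with" branch is visible in this hunk, so the version below is a hedged reconstruction: the remaining criteria names and the case handling are plausible guesses, not the connector's definitive list.

```python
# Hedged reconstruction of filter_streams_by_criteria(); only "starts with"
# appears in the diff, the other criteria here are illustrative guesses.
def filter_streams_by_criteria(streams_list: list, search_word: str, search_criteria: str) -> list:
    search_word = search_word.lower()
    criteria_mapping = {
        "starts with": lambda name: name.lower().startswith(search_word),
        "ends with": lambda name: name.lower().endswith(search_word),
        "contains": lambda name: search_word in name.lower(),
        "exacts": lambda name: name.lower() == search_word,
    }
    predicate = criteria_mapping.get(search_criteria)
    if predicate is None:
        raise ValueError(f"Unknown search criteria: {search_criteria!r}")
    return [name for name in streams_list if predicate(name)]


# Example: keep only the streams whose names start with "account".
assert filter_streams_by_criteria(
    ["Account", "AccountHistory", "Lead"], "account", "starts with"
) == ["Account", "AccountHistory"]
```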
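generate_streams() still chooses the REST or Bulk classes per stream: if the generated schema contains base64 or compound (object-typed) properties that the Bulk API cannot load, the REST pair is used. A condensed, self-contained version of that predicate:

```python
from typing import Any, Mapping


def properties_not_supported_by_bulk(properties: Mapping[str, Any]) -> Mapping[str, Any]:
    """Salesforce Bulk API cannot load base64 fields or compound (object) fields."""
    return {
        name: prop
        for name, prop in properties.items()
        if prop.get("format") == "base64" or "object" in prop["type"]
    }


def use_bulk_api(properties: Mapping[str, Any]) -> bool:
    return not properties_not_supported_by_bulk(properties)


schema_props = {
    "Id": {"type": ["string", "null"]},
    "Body": {"type": ["string", "null"], "format": "base64"},
}
assert use_bulk_api({"Id": {"type": ["string", "null"]}}) is True
assert use_bulk_api(schema_props) is False
```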
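The new bulk_error_test.py forces the Bulk path to fail while every other request still hits the live sandbox, by combining requests_mock.Mocker(real_http=True) with regex matchers for the jobs/query endpoints. Reduced to a self-contained example (the Salesforce host below is a placeholder and no unmatched request is actually made, so nothing leaves the machine):

```python
import re

import requests
import requests_mock

# Regex matchers intercept only the Bulk job endpoints; with real_http=True
# any other request would pass through to the real API.
job_create = re.compile(r"jobs/query$")
job_status = re.compile(r"jobs/query/fake_id$")

with requests_mock.Mocker(real_http=True) as m:
    m.register_uri("POST", job_create, json={"id": "fake_id"})
    m.register_uri("GET", job_status, json={"state": "Failed", "errorMessage": "unknown error"})
    m.register_uri("DELETE", job_status, json={})

    base = "https://example.my.salesforce.com/services/data/v52.0"
    created = requests.post(f"{base}/jobs/query", json={"operation": "queryAll"}).json()
    status = requests.get(f"{base}/jobs/query/{created['id']}").json()

assert status == {"state": "Failed", "errorMessage": "unknown error"}
```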