🐛 Source salesforce: processing of failed jobs #10141

Merged · 9 commits · Feb 10, 2022
@@ -2,7 +2,7 @@
  "sourceDefinitionId": "b117307c-14b6-41aa-9422-947e34922962",
  "name": "Salesforce",
  "dockerRepository": "airbyte/source-salesforce",
-  "dockerImageTag": "0.1.22",
+  "dockerImageTag": "0.1.23",
  "documentationUrl": "https://docs.airbyte.io/integrations/sources/salesforce",
  "icon": "salesforce.svg"
}
@@ -648,7 +648,7 @@
- name: Salesforce
  sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962
  dockerRepository: airbyte/source-salesforce
-  dockerImageTag: 0.1.22
+  dockerImageTag: 0.1.23
  documentationUrl: https://docs.airbyte.io/integrations/sources/salesforce
  icon: salesforce.svg
  sourceType: api
12 changes: 1 addition & 11 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
@@ -6801,7 +6801,7 @@
  supportsNormalization: false
  supportsDBT: false
  supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-salesforce:0.1.22"
+- dockerImage: "airbyte/source-salesforce:0.1.23"
  spec:
    documentationUrl: "https://docs.airbyte.com/integrations/sources/salesforce"
    connectionSpecification:
@@ -6879,16 +6879,6 @@
        title: "Streams filter criteria"
        description: "Add selection criteria for streams to get only streams that\
          \ are relevant to you"
-      wait_timeout:
-        title: "Response Waiting Time"
-        description: "Maximum wait time of Salesforce responses in minutes. This\
-          \ option is used for the BULK mode only. The default wait time of the\
-          \ Parent Batch in the Bulk Mode to wait for all the batches to finish\
-          \ processing is 20 minutes."
-        type: "integer"
-        minimum: 5
-        maximum: 60
-        default: 10
  supportsNormalization: false
  supportsDBT: false
  supported_destination_sync_modes: []
@@ -25,5 +25,5 @@ COPY source_salesforce ./source_salesforce
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

-LABEL io.airbyte.version=0.1.22
+LABEL io.airbyte.version=0.1.23
LABEL io.airbyte.name=airbyte/source-salesforce
@@ -7,8 +7,11 @@ tests:
  connection:
    - config_path: "secrets/config.json"
      status: "succeed"
+    - config_path: "secrets/config_sandbox.json"
+      status: "succeed"
    - config_path: "integration_tests/invalid_config.json"
      status: "failed"
+
  discovery:
    - config_path: "secrets/config.json"
  basic_read:
@@ -4,10 +4,13 @@

import json
import logging
+import re
from pathlib import Path
from typing import Any, Mapping

import pytest
+import requests_mock
+from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams import Stream
from source_salesforce.source import SourceSalesforce

@@ -20,6 +23,12 @@ def parse_input_config():
        return json.loads(file.read())


+@pytest.fixture(name="input_sandbox_config")
+def parse_input_sandbox_config():
+    with open(HERE.parent / "secrets/config_sandbox.json", "r") as file:
+        return json.loads(file.read())
+
+
def get_stream(input_config: Mapping[str, Any], stream_name: str) -> Stream:
    stream_cls = type("a", (object,), {"name": stream_name})
    configured_stream_cls = type("b", (object,), {"stream": stream_cls()})
@@ -42,3 +51,42 @@ def test_not_queryable_stream(caplog, input_config):

    # check logs
    assert "is not queryable" in caplog.records[-1].message
+
+
+@pytest.mark.parametrize(
+    "stream_name,log_messages",
+    (
+        (
+            "Dashboard",
+            ["switch to STANDARD(non-BULK) sync"],
+        ),
+        # CategoryNode has access limitations, so Salesforce returns failed job statuses
+        (
+            "CategoryNode",
+            ["insufficient access rights on cross-reference id", "switch to STANDARD(non-BULK) sync"],
+        ),
+    ),
+    ids=["successful_switching", "failed_switching"],
+)
+def test_failed_jobs_with_successful_switching(caplog, input_sandbox_config, stream_name, log_messages):
+    stream = get_stream(input_sandbox_config, stream_name)
+    expected_record_ids = set(record["Id"] for record in stream.read_records(sync_mode=SyncMode.full_refresh))
+
+    create_query_matcher = re.compile(r"jobs/query$")
+    job_matcher = re.compile(r"jobs/query/fake_id$")
+    loaded_record_ids = []
+    with requests_mock.Mocker(real_http=True) as m:
+        m.register_uri(
+            "POST",
+            create_query_matcher,
+            json={
+                "id": "fake_id",
+            },
+        )
+        m.register_uri("GET", job_matcher, json={"state": "Failed", "errorMessage": "unknown error"})
+        m.register_uri("DELETE", job_matcher, json={})
+        with caplog.at_level(logging.WARNING):
+            loaded_record_ids = set(record["Id"] for record in stream.read_records(sync_mode=SyncMode.full_refresh))
+    for i, log_message in enumerate(log_messages, 1):
+        assert log_message in caplog.records[-i].message
+    assert loaded_record_ids == expected_record_ids
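
For reviewers unfamiliar with this mocking pattern: `requests_mock.Mocker(real_http=True)` intercepts only the registered routes and lets every other request pass through to the live API, which is how the test above forces the BULK job to fail while the STANDARD fallback still reads real sandbox data. A minimal, self-contained sketch of that fallthrough behavior (the Salesforce-looking URL is illustrative only):

```python
import re

import requests
import requests_mock

# Only the "create job" endpoint is mocked; all other requests pass through.
create_job = re.compile(r"jobs/query$")

with requests_mock.Mocker(real_http=True) as m:
    m.register_uri("POST", create_job, json={"id": "fake_id"})

    # Intercepted: returns the canned payload without touching the network.
    resp = requests.post("https://example.my.salesforce.com/services/data/v52.0/jobs/query")
    assert resp.json() == {"id": "fake_id"}

    # Anything not registered (e.g. a GET to another endpoint) would go out
    # over real HTTP because of real_http=True.
```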
@@ -11,7 +11,7 @@

from .exceptions import TypeSalesforceException
from .rate_limiting import default_backoff_handler
-from .utils import filter_streams
+from .utils import filter_streams_by_criteria

STRING_TYPES = [
    "byte",
@@ -191,7 +191,9 @@ def __init__(
        self.access_token = None
        self.instance_url = None
        self.session = requests.Session()
-        self.is_sandbox = is_sandbox is True or (isinstance(is_sandbox, str) and is_sandbox.lower() == "true")
+        self.is_sandbox = is_sandbox in [True, "true"]
+        if self.is_sandbox:
+            self.logger.info("using SANDBOX of Salesforce")
        self.start_date = start_date

    def _get_standard_headers(self):
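
Worth noting about the simplified sandbox check (an observation, not something called out in the PR): the old expression lowercased string input, so `"True"` or `"TRUE"` enabled sandbox mode, while `is_sandbox in [True, "true"]` only accepts the exact lowercase string. A quick comparison:

```python
def old_check(is_sandbox):
    return is_sandbox is True or (isinstance(is_sandbox, str) and is_sandbox.lower() == "true")


def new_check(is_sandbox):
    return is_sandbox in [True, "true"]


assert old_check(True) and new_check(True)
assert old_check("true") and new_check("true")
assert old_check("True") is True
assert new_check("True") is False  # the new check is case-sensitive
```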
@@ -206,30 +208,37 @@ def filter_streams(self, stream_name: str) -> bool:
            return False
        return True

-    def get_validated_streams(self, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog = None):
-        salesforce_objects = self.describe()["sobjects"]
-        stream_objects = []
-        for stream_object in salesforce_objects:
+    def get_validated_streams(self, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog = None) -> Mapping[str, Any]:
+        """Selects all validated streams with additional filtering:
+        1) skip all sobjects with a negative value of the "queryable" flag
+        2) let the user set search criteria for the streams they need
+        3) filter by catalog settings
+        """
+        stream_objects = {}
+        for stream_object in self.describe()["sobjects"]:
            if stream_object["queryable"]:
-                stream_objects.append(stream_object)
+                stream_objects[stream_object.pop("name")] = stream_object
            else:
                self.logger.warn(f"Stream {stream_object['name']} is not queryable and will be ignored.")

-        stream_names = [stream_object["name"] for stream_object in stream_objects]
        if catalog:
-            return [configured_stream.stream.name for configured_stream in catalog.streams], stream_objects
+            return {
+                configured_stream.stream.name: stream_objects[configured_stream.stream.name]
+                for configured_stream in catalog.streams
+                if configured_stream.stream.name in stream_objects
+            }

+        stream_names = list(stream_objects.keys())
        if config.get("streams_criteria"):
            filtered_stream_list = []
            for stream_criteria in config["streams_criteria"]:
-                filtered_stream_list += filter_streams(
+                filtered_stream_list += filter_streams_by_criteria(
                    streams_list=stream_names, search_word=stream_criteria["value"], search_criteria=stream_criteria["criteria"]
                )
            stream_names = list(set(filtered_stream_list))

        validated_streams = [stream_name for stream_name in stream_names if self.filter_streams(stream_name)]
-        validated_stream_objects = [stream_object for stream_object in stream_objects if stream_object["name"] in validated_streams]
-        return validated_streams, validated_stream_objects
+        return {stream_name: sobject_options for stream_name, sobject_options in stream_objects.items() if stream_name in validated_streams}

    @default_backoff_handler(max_tries=5, factor=15)
    def _make_request(
@@ -261,20 +270,20 @@ def login(self):
        self.access_token = auth["access_token"]
        self.instance_url = auth["instance_url"]

-    def describe(self, sobject: str = None, stream_objects: List = None) -> Mapping[str, Any]:
+    def describe(self, sobject: str = None, sobject_options: Mapping[str, Any] = None) -> Mapping[str, Any]:
        """Describes all objects or a specific object"""
        headers = self._get_standard_headers()

        endpoint = "sobjects" if not sobject else f"sobjects/{sobject}/describe"

        url = f"{self.instance_url}/services/data/{self.version}/{endpoint}"
        resp = self._make_request("GET", url, headers=headers)
-        if resp.status_code == 404:
-            self.logger.error(f"Filtered stream objects: {stream_objects}")
+        if resp.status_code == 404 and sobject:
+            self.logger.error(f"No description found for the sobject '{sobject}'. Sobject options: {sobject_options}")
        return resp.json()

-    def generate_schema(self, stream_name: str = None, stream_objects: List = None) -> Mapping[str, Any]:
-        response = self.describe(stream_name, stream_objects)
+    def generate_schema(self, stream_name: str = None, stream_options: Mapping[str, Any] = None) -> Mapping[str, Any]:
+        response = self.describe(stream_name, stream_options)
        schema = {"$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "additionalProperties": True, "properties": {}}
        for field in response["fields"]:
            schema["properties"][field["name"]] = self.field_to_property_schema(field)
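`filter_streams_by_criteria` lives in `.utils`, whose body is not part of this diff. For review context, here is a sketch of what such criteria-based filtering could look like; the criteria names ("starts with", "ends with", "contains", "exacts") are assumptions drawn from the connector spec, not code from this PR:

```python
from typing import List


def filter_streams_by_criteria(streams_list: List[str], search_word: str, search_criteria: str) -> List[str]:
    """Hypothetical sketch: keep the streams whose names match the search criteria."""
    search_word = search_word.lower()
    matchers = {
        "starts with": lambda name: name.startswith(search_word),
        "ends with": lambda name: name.endswith(search_word),
        "contains": lambda name: search_word in name,
        "exacts": lambda name: name == search_word,
    }
    match = matchers.get(search_criteria, lambda name: False)
    return [stream for stream in streams_list if match(stream.lower())]


# e.g. filter_streams_by_criteria(["Account", "AccountHistory", "Lead"], "account", "starts with")
# -> ["Account", "AccountHistory"]
```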
@@ -39,19 +39,18 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
    def generate_streams(
        cls,
        config: Mapping[str, Any],
-        stream_names: List[str],
+        stream_objects: Mapping[str, Any],
        sf_object: Salesforce,
        state: Mapping[str, Any] = None,
-        stream_objects: List = None,
    ) -> List[Stream]:
        """Generates a list of streams by their names. It can be used for different tests too."""
        authenticator = TokenAuthenticator(sf_object.access_token)
        streams = []
-        for stream_name in stream_names:
-            streams_kwargs = {}
+        for stream_name, sobject_options in stream_objects.items():
+            streams_kwargs = {"sobject_options": sobject_options}
            stream_state = state.get(stream_name, {}) if state else {}

-            selected_properties = sf_object.generate_schema(stream_name, stream_objects).get("properties", {})
+            selected_properties = sf_object.generate_schema(stream_name, sobject_options).get("properties", {})
            # Salesforce BULK API currently does not support loading fields with data type base64 and compound data
            properties_not_supported_by_bulk = {
                key: value for key, value in selected_properties.items() if value.get("format") == "base64" or "object" in value["type"]
@@ -63,7 +62,6 @@ def generate_streams(
            else:
                # Use BULK API
                full_refresh, incremental = BulkSalesforceStream, BulkIncrementalSalesforceStream
-                streams_kwargs["wait_timeout"] = config.get("wait_timeout")

            json_schema = sf_object.generate_schema(stream_name, stream_objects)
            pk, replication_key = sf_object.get_pk_and_replication_key(json_schema)
@@ -77,8 +75,8 @@ def generate_streams(

    def streams(self, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog = None, state: Mapping[str, Any] = None) -> List[Stream]:
        sf = self._get_sf_object(config)
-        stream_names, stream_objects = sf.get_validated_streams(config=config, catalog=catalog)
-        return self.generate_streams(config, stream_names, sf, state=state, stream_objects=stream_objects)
+        stream_objects = sf.get_validated_streams(config=config, catalog=catalog)
+        return self.generate_streams(config, stream_objects, sf, state=state)

    def read(
        self, logger: AirbyteLogger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None
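The actual switch from BULK to STANDARD sync on failed jobs happens inside the stream classes, which this page does not show. As a rough sketch of the pattern being tested, with hypothetical names throughout:

```python
import logging
from typing import Any, Iterable, Mapping

logger = logging.getLogger("airbyte")


class BulkJobFailed(Exception):
    """Hypothetical error raised when a BULK job ends in the "Failed" state."""


def read_with_fallback(bulk_stream, rest_stream, sync_mode) -> Iterable[Mapping[str, Any]]:
    """Illustrative only: try the BULK API first and fall back to the REST
    (STANDARD) API when Salesforce reports a failed job, e.g. for sobjects
    with insufficient access rights."""
    try:
        yield from bulk_stream.read_records(sync_mode=sync_mode)
    except BulkJobFailed as error:
        logger.warning(f"BULK job failed: {error}, switch to STANDARD(non-BULK) sync")
        yield from rest_stream.read_records(sync_mode=sync_mode)
```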
@@ -4,7 +4,11 @@
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "Salesforce Source Spec",
    "type": "object",
-    "required": ["client_id", "client_secret", "refresh_token"],
+    "required": [
+      "client_id",
+      "client_secret",
+      "refresh_token"
+    ],
    "additionalProperties": true,
    "properties": {
      "auth_type": {
@@ -33,7 +37,10 @@
        "description": "Date in the format 2017-01-25. Any data before this date will not be replicated. This field uses the \"updated\" field if available, otherwise the \"created\" fields if they are available for a stream. If not set, then by default all your data is replicated.",
        "type": "string",
        "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z|[0-9]{4}-[0-9]{2}-[0-9]{2}$",
-        "examples": ["2021-07-25", "2021-07-25T00:00:00Z"]
+        "examples": [
+          "2021-07-25",
+          "2021-07-25T00:00:00Z"
+        ]
      },
      "is_sandbox": {
        "title": "Sandbox",
@@ -45,7 +52,10 @@
        "type": "array",
        "items": {
          "type": "object",
-          "required": ["criteria", "value"],
+          "required": [
+            "criteria",
+            "value"
+          ],
          "properties": {
            "criteria": {
              "type": "string",
@@ -70,20 +80,14 @@
        },
        "title": "Streams filter criteria",
        "description": "Add selection criteria for streams to get only streams that are relevant to you"
-      },
-      "wait_timeout": {
-        "title": "Response Waiting Time",
-        "description": "Maximum wait time of Salesforce responses in minutes. This option is used for the BULK mode only. The default wait time of the Parent Batch in the Bulk Mode to wait for all the batches to finish processing is 20 minutes.",
-        "type": "integer",
-        "minimum": 5,
-        "maximum": 60,
-        "default": 10
-      }
+      }
    }
  },
"advanced_auth": {
"auth_flow_type": "oauth2.0",
"predicate_key": ["auth_type"],
"predicate_key": [
"auth_type"
],
"predicate_value": "Client",
"oauth_config_specification": {
"oauth_user_input_from_connector_config_specification": {
@@ -92,7 +96,9 @@
        "properties": {
          "is_sandbox": {
            "type": "boolean",
-            "path_in_connector_config": ["is_sandbox"]
+            "path_in_connector_config": [
+              "is_sandbox"
+            ]
          }
        }
      },
@@ -102,7 +108,9 @@
        "properties": {
          "refresh_token": {
            "type": "string",
-            "path_in_connector_config": ["refresh_token"]
+            "path_in_connector_config": [
+              "refresh_token"
+            ]
          }
        }
      },
@@ -124,11 +132,15 @@
        "properties": {
          "client_id": {
            "type": "string",
-            "path_in_connector_config": ["client_id"]
+            "path_in_connector_config": [
+              "client_id"
+            ]
          },
          "client_secret": {
            "type": "string",
-            "path_in_connector_config": ["client_secret"]
+            "path_in_connector_config": [
+              "client_secret"
+            ]
          }
        }
      }
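
For context on the `path_in_connector_config` entries above: each one tells the platform where in the connector config to write a value produced by (or read for) the OAuth flow. A minimal sketch of applying such a mapping, as an illustration rather than Airbyte's actual implementation:

```python
from typing import Any, List, Mapping, MutableMapping


def inject_oauth_output(config: MutableMapping[str, Any], oauth_output: Mapping[str, Any],
                        paths: Mapping[str, List[str]]) -> None:
    """Write each OAuth output value into the connector config at its declared path."""
    for field, path in paths.items():
        target = config
        for key in path[:-1]:  # walk intermediate objects, creating them as needed
            target = target.setdefault(key, {})
        target[path[-1]] = oauth_output[field]


config: MutableMapping[str, Any] = {"is_sandbox": False}
inject_oauth_output(config, {"refresh_token": "abc123"}, {"refresh_token": ["refresh_token"]})
assert config == {"is_sandbox": False, "refresh_token": "abc123"}
```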