Source SalesForce: move to next releaseStage #28243

Closed

Commits (28). The diff below shows changes from 8 commits.
104cf4b
Source SalesForce: bump airbyte_cdk version
artem1205 Jul 12, 2023
a536748
Source SalesForce: handle bulk API errors
artem1205 Jul 18, 2023
096ed10
Source SalesForce: update docs
artem1205 Jul 18, 2023
8916568
Source SalesForce: bump CDK version
artem1205 Jul 19, 2023
776f0b0
Merge branch 'master' into artem1205/source-salesforce-silver-certifi…
artem1205 Jul 19, 2023
8ffcf41
Source SalesForce: raise config error
artem1205 Jul 21, 2023
9d34456
Source SalesForce: add stream slice test
artem1205 Jul 24, 2023
4ddb464
Source SalesForce: add bulk pagination tests + fixes
artem1205 Jul 24, 2023
bfdc7b6
Source SalesForce: add logging + raise error instead of logs
artem1205 Jul 24, 2023
a864e5e
force use bulk api
artem1205 Jul 24, 2023
efb7337
Update streams.py
sherifnada Jul 24, 2023
95f66c9
Update source.py
sherifnada Jul 24, 2023
32a3a57
Update streams.py
sherifnada Jul 24, 2023
912757f
Update Dockerfile
sherifnada Jul 25, 2023
f45e76e
Merge branch 'master' into artem1205/source-salesforce-silver-certifi…
sherifnada Jul 25, 2023
5af5017
save
sherifnada Jul 25, 2023
8c1d9df
Merge branch 'artem1205/source-salesforce-silver-certification' of gi…
sherifnada Jul 25, 2023
f7fb626
reduce stream slice window from 30 days to 10 days
brianjlai Jul 25, 2023
ae598b6
Source SalesForce: fix start_date in BULK request params
artem1205 Jul 26, 2023
d80a1b5
Merge remote-tracking branch 'origin/artem1205/source-salesforce-silv…
artem1205 Jul 26, 2023
5e5cec4
Source SalesForce: add pattern descriptor
artem1205 Jul 26, 2023
cc3afef
Source SalesForce: update docs
artem1205 Jul 26, 2023
8e05a77
Source SalesForce: update description in spec
artem1205 Jul 26, 2023
c7e14ac
Source SalesForce: update docker base image to 3.9.17-alpine3.18
artem1205 Jul 26, 2023
669d134
Revert "Source SalesForce: update docker base image to 3.9.17-alpine3…
artem1205 Jul 26, 2023
9068516
Source SalesForce: update docs
artem1205 Jul 26, 2023
b18ddce
Source SalesForce: update inapp docs
artem1205 Jul 26, 2023
9f14dc4
reformatting
brianjlai Jul 26, 2023

@@ -34,5 +34,5 @@ RUN cp /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone

ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=2.1.1
LABEL io.airbyte.version=2.1.2
LABEL io.airbyte.name=airbyte/source-salesforce

@@ -5,15 +5,14 @@ data:
connectorSubtype: api
connectorType: source
definitionId: b117307c-14b6-41aa-9422-947e34922962
dockerImageTag: 2.1.1
dockerImageTag: 2.1.2
dockerRepository: airbyte/source-salesforce
githubIssueLabel: source-salesforce
icon: salesforce.svg
license: ELv2
name: Salesforce
registries:
cloud:
dockerImageTag: 2.0.9
enabled: true
oss:
enabled: true

2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-salesforce/setup.py
@@ -5,7 +5,7 @@

from setuptools import find_packages, setup

MAIN_REQUIREMENTS = ["airbyte-cdk~=0.2", "vcrpy==4.1.1", "pandas"]
MAIN_REQUIREMENTS = ["airbyte-cdk~=0.46", "pandas"]

TEST_REQUIREMENTS = ["pytest~=6.1", "pytest-mock~=3.6", "requests_mock", "connector-acceptance-test", "pytest-timeout"]


@@ -21,6 +21,8 @@
from .api import UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS, UNSUPPORTED_FILTERING_STREAMS, Salesforce
from .streams import BulkIncrementalSalesforceStream, BulkSalesforceStream, Describe, IncrementalRestSalesforceStream, RestSalesforceStream

logger = logging.getLogger("airbyte")


class AirbyteStopSync(AirbyteTracedException):
pass
@@ -59,13 +61,21 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
return True, None

@classmethod
def _get_api_type(cls, stream_name, properties):
def _get_api_type(cls, stream_name: str, properties: dict, force_use_bulk_api: bool) -> str:
# Salesforce BULK API currently does not support loading fields with data type base64 and compound data
properties_not_supported_by_bulk = {
key: value for key, value in properties.items() if value.get("format") == "base64" or "object" in value["type"]
}
rest_required = stream_name in UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS or properties_not_supported_by_bulk
if rest_required:
rest_only = stream_name in UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS
if rest_only:
logger.warning(f"BULK API is not supported for stream: {stream_name}")
return "rest"
if force_use_bulk_api and properties_not_supported_by_bulk:
logger.warning(
f"Following properties will be excluded from stream: {stream_name} due to BULK API limitations: {list(properties_not_supported_by_bulk)}"
)
return "bulk"
if properties_not_supported_by_bulk:
return "rest"
return "bulk"

@@ -77,15 +87,14 @@ def generate_streams(
sf_object: Salesforce,
) -> List[Stream]:
""" "Generates a list of stream by their names. It can be used for different tests too"""
logger = logging.getLogger()
authenticator = TokenAuthenticator(sf_object.access_token)
stream_properties = sf_object.generate_schemas(stream_objects)
streams = []
for stream_name, sobject_options in stream_objects.items():
streams_kwargs = {"sobject_options": sobject_options}
selected_properties = stream_properties.get(stream_name, {}).get("properties", {})

api_type = cls._get_api_type(stream_name, selected_properties)
api_type = cls._get_api_type(stream_name, selected_properties, config.get("force_use_bulk_api", False))
if api_type == "rest":
full_refresh, incremental = RestSalesforceStream, IncrementalRestSalesforceStream
elif api_type == "bulk":
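
For context on the hunks above, here is a rough standalone sketch of the REST-versus-Bulk selection they implement (not part of the diff; the function name, the sample object set, and the sample properties are illustrative):

import logging
from typing import Any, Mapping

logger = logging.getLogger("airbyte")

# Hypothetical stand-in for the connector's UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS constant.
OBJECTS_WITHOUT_BULK_SUPPORT = {"AcceptedEventRelation", "AssetTokenEvent"}


def choose_api_type(stream_name: str, properties: Mapping[str, Any], force_use_bulk_api: bool) -> str:
    # The Bulk API cannot load base64 fields or compound ("object") fields.
    not_supported_by_bulk = {
        key: value
        for key, value in properties.items()
        if value.get("format") == "base64" or "object" in value.get("type", [])
    }
    if stream_name in OBJECTS_WITHOUT_BULK_SUPPORT:
        logger.warning("BULK API is not supported for stream: %s", stream_name)
        return "rest"
    if force_use_bulk_api and not_supported_by_bulk:
        # Opting in to Bulk drops the unsupported fields from the query instead of falling back to REST.
        logger.warning("Properties excluded from %s due to BULK API limitations: %s", stream_name, list(not_supported_by_bulk))
        return "bulk"
    return "rest" if not_supported_by_bulk else "bulk"


# A stream with a base64 field falls back to REST unless the user forces Bulk.
properties = {"Id": {"type": ["string"]}, "Body": {"type": ["string", "null"], "format": "base64"}}
assert choose_api_type("Attachment", properties, force_use_bulk_api=False) == "rest"
assert choose_api_type("Attachment", properties, force_use_bulk_api=True) == "bulk"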

@@ -53,9 +53,14 @@ connectionSpecification:
- "2021-07-25T00:00:00Z"
format: date-time
order: 5
force_use_bulk_api:
type: boolean
description: Toggle to Bulk API (this might cause empty fields for some streams)
default: false
order: 6
streams_criteria:
type: array
order: 6
order: 7
items:
type: object
required:
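
The hunk above adds the force_use_bulk_api toggle to the spec. As a rough illustration (not from the PR), a config exercising it might look like the following; all values are placeholders and the other fields are shown only for context:

# Hypothetical source config; credentials are placeholders, force_use_bulk_api is the new option.
config = {
    "client_id": "<client_id>",
    "client_secret": "<client_secret>",
    "refresh_token": "<refresh_token>",
    "start_date": "2021-07-25T00:00:00Z",
    "is_sandbox": False,
    "force_use_bulk_api": True,  # defaults to False per the spec above
}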

@@ -15,14 +15,16 @@
import pandas as pd
import pendulum
import requests # type: ignore[import]
from airbyte_cdk.models import ConfiguredAirbyteCatalog, SyncMode
from airbyte_cdk.models import ConfiguredAirbyteCatalog, FailureType, SyncMode
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
from airbyte_cdk.sources.streams.core import Stream, StreamData
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
from airbyte_cdk.utils import AirbyteTracedException
from numpy import nan
from pendulum import DateTime # type: ignore[attr-defined]
from requests import codes, exceptions
from requests.models import PreparedRequest

from .api import UNSUPPORTED_FILTERING_STREAMS, Salesforce
from .availability_strategy import SalesforceAvailabilityStrategy
@@ -280,7 +282,6 @@ def _fetch_next_page_for_chunk(


class BulkSalesforceStream(SalesforceStream):
page_size = 15000
DEFAULT_WAIT_TIMEOUT_SECONDS = 86400 # 24-hour bulk job running time
MAX_CHECK_INTERVAL_SECONDS = 2.0
MAX_RETRY_NUMBER = 3
@@ -291,8 +292,8 @@ def path(self, next_page_token: Mapping[str, Any] = None, **kwargs: Any) -> str:
transformer = TypeTransformer(TransformConfig.CustomSchemaNormalization | TransformConfig.DefaultSchemaNormalization)

@default_backoff_handler(max_tries=5, factor=15)
def _send_http_request(self, method: str, url: str, json: dict = None, stream: bool = False):
headers = self.authenticator.get_auth_header()
def _send_http_request(self, method: str, url: str, json: dict = None, headers: dict = None, stream: bool = False):
headers = self.authenticator.get_auth_header() if not headers else headers | self.authenticator.get_auth_header()
response = self._session.request(method, url=url, headers=headers, json=json, stream=stream)
if response.status_code not in [200, 204]:
self.logger.error(f"error body: {response.text}, sobject options: {self.sobject_options}")
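
The reworked _send_http_request above merges caller-supplied headers with the auth header via the Python 3.9+ dict union operator, so the auth header always wins on conflicts. A minimal sketch of that merge semantics, with made-up header values:

# The right-hand operand wins on key conflicts, so a caller cannot override the auth header.
caller_headers = {"Accept-Encoding": "gzip", "Authorization": "Bearer stale-token"}
auth_header = {"Authorization": "Bearer fresh-token"}

merged = caller_headers | auth_header
assert merged == {"Accept-Encoding": "gzip", "Authorization": "Bearer fresh-token"}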
@@ -347,11 +348,16 @@ def create_stream_job(self, query: str, url: str) -> Optional[str]:
f"The stream '{self.name}' is not queryable, "
f"sobject options: {self.sobject_options}, error message: '{error_message}'"
)
elif (
error.response.status_code == codes.BAD_REQUEST
and error_code == "API_ERROR"
and error_message.startswith("Implementation restriction")
):
message = f"Unable to sync '{self.name}'. To prevent future syncs from failing, ensure the authenticated user has \"View all Data\" permissions."
raise AirbyteTracedException(message=message, failure_type=FailureType.config_error, exception=error)
elif error.response.status_code == codes.BAD_REQUEST and error_code == "LIMIT_EXCEEDED":
self.logger.error(
f"Cannot receive data for stream '{self.name}' ,"
f"sobject options: {self.sobject_options}, error message: '{error_message}'"
)
message = "Your API key for Salesforce has reached its limit for the 24-hour period. We will resume replication once the limit has elapsed."
self.logger.error(message)
else:
raise error
else:
@@ -368,7 +374,20 @@ def wait_for_job(self, url: str) -> str:
# this value was received empirically
time.sleep(0.5)
while pendulum.now() < expiration_time:
job_info = self._send_http_request("GET", url=url).json()
try:
job_info = self._send_http_request("GET", url=url).json()
except exceptions.HTTPError as error:
error_data = error.response.json()[0]
error_code = error_data.get("errorCode")
error_message = error_data.get("message", "")
if (
"We can't complete the action because enabled transaction security policies took too long to complete." in error_message
and error_code == "TXN_SECURITY_METERING_ERROR"
):
message = 'A transient authentication error occurred. To prevent future syncs from failing, assign the "Exempt from Transaction Security" user permission to the authenticated user.'
raise AirbyteTracedException(message=message, failure_type=FailureType.config_error, exception=error)
else:
raise error
job_status = job_info["state"]
if job_status in ["JobComplete", "Aborted", "Failed"]:
if job_status != "JobComplete":
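
The two hunks above follow the same pattern: inspect the Salesforce error payload and re-raise actionable cases as Airbyte config errors rather than letting a bare HTTPError surface. A condensed sketch of that pattern, assuming the AirbyteTracedException and FailureType imports shown earlier (the helper name is made up, and the LIMIT_EXCEEDED branch, which the diff only logs, is omitted):

import requests
from airbyte_cdk.models import FailureType
from airbyte_cdk.utils import AirbyteTracedException


def raise_config_error_if_actionable(error: requests.exceptions.HTTPError, stream_name: str) -> None:
    """Re-raise known Salesforce errors as config errors the user can act on (illustrative helper)."""
    error_data = error.response.json()[0]
    error_code = error_data.get("errorCode")
    error_message = error_data.get("message", "")

    if error_code == "API_ERROR" and error_message.startswith("Implementation restriction"):
        message = (
            f"Unable to sync '{stream_name}'. To prevent future syncs from failing, "
            'ensure the authenticated user has "View all Data" permissions.'
        )
        raise AirbyteTracedException(message=message, failure_type=FailureType.config_error, exception=error)

    if error_code == "TXN_SECURITY_METERING_ERROR":
        message = (
            "A transient authentication error occurred. To prevent future syncs from failing, "
            'assign the "Exempt from Transaction Security" user permission to the authenticated user.'
        )
        raise AirbyteTracedException(message=message, failure_type=FailureType.config_error, exception=error)

    raise error  # anything else keeps the original behaviour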
@@ -422,7 +441,7 @@ def filter_null_bytes(self, b: bytes):
self.logger.warning("Filter 'null' bytes from string, size reduced %d -> %d chars", len(b), len(res))
return res

def download_data(self, url: str, chunk_size: int = 1024) -> tuple[str, str]:
def download_data(self, url: str, chunk_size: int = 1024) -> tuple[str, str, dict]:
"""
Retrieves binary data result from successfully `executed_job`, using chunks, to avoid local memory limitations.
@ url: string - the url of the `executed_job`
@@ -431,13 +450,16 @@ def download_data(self, url: str, chunk_size: int = 1024) -> tuple[str, str]:
"""
# set filepath for binary data from response
tmp_file = os.path.realpath(os.path.basename(url))
with closing(self._send_http_request("GET", f"{url}/results", stream=True)) as response, open(tmp_file, "wb") as data_file:
with closing(self._send_http_request("GET", url, headers={"Accept-Encoding": "gzip"}, stream=True)) as response, open(
tmp_file, "wb"
) as data_file:
response_encoding = response.encoding or self.encoding
response_headers = response.headers
for chunk in response.iter_content(chunk_size=chunk_size):
data_file.write(self.filter_null_bytes(chunk))
# check the file exists
if os.path.isfile(tmp_file):
return tmp_file, response_encoding
return tmp_file, response_encoding, response_headers
else:
raise TmpFileIOError(f"The IO/Error occured while verifying binary data. Stream: {self.name}, file {tmp_file} doesn't exist.")

Expand Down Expand Up @@ -477,24 +499,29 @@ def availability_strategy(self) -> Optional["AvailabilityStrategy"]:
return None

def next_page_token(self, last_record: Mapping[str, Any]) -> Optional[Mapping[str, Any]]:
if self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS:
return {"next_token": f"WHERE {self.primary_key} >= '{last_record[self.primary_key]}' "} # type: ignore[index]
return None

def get_query_select_fields(self) -> str:
return ", ".join(
{
key: value
for key, value in self.get_json_schema().get("properties", {}).items()
if value.get("format") != "base64" and "object" not in value["type"]
}
)

def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
"""
Salesforce SOQL Query: https://developer.salesforce.com/docs/atlas.en-us.232.0.api_rest.meta/api_rest/dome_queryall.htm
"""

selected_properties = self.get_json_schema().get("properties", {})
query = f"SELECT {','.join(selected_properties.keys())} FROM {self.name} "
select_fields = self.get_query_select_fields()
query = f"SELECT {select_fields} FROM {self.name}"
if next_page_token:
query += next_page_token["next_token"]

if self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS:
query += f"ORDER BY {self.primary_key} ASC LIMIT {self.page_size}"
return {"q": query}

def read_records(
@@ -528,24 +555,19 @@ def read_records(
)
return
raise SalesforceException(f"Job for {self.name} stream using BULK API was failed.")
salesforce_bulk_api_locator = None
while True:
req = PreparedRequest()
req.prepare_url(f"{job_full_url}/results", {"locator": salesforce_bulk_api_locator}) # 'maxRecords': 5
tmp_file, response_encoding, response_headers = self.download_data(url=req.url)
for record in self.read_with_chunks(tmp_file, response_encoding):
yield record

count = 0
record: Mapping[str, Any] = {}
for record in self.read_with_chunks(*self.download_data(url=job_full_url)):
count += 1
yield record
if response_headers.get("Sforce-Locator", "null") == "null":
break
salesforce_bulk_api_locator = response_headers.get("Sforce-Locator")
self.delete_job(url=job_full_url)

if count < self.page_size:
# Salesforce doesn't give a next token or something to know the request was
# the last page. The connectors will sync batches in `page_size` and
# considers that batch is smaller than the `page_size` it must be the last page.
break

next_page_token = self.next_page_token(record)
if not next_page_token:
# not found a next page data.
break
break
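
For context, a simplified sketch of the new paging loop above: Bulk API results pages carry an Sforce-Locator header, and the literal string "null" marks the final page. download_data and read_with_chunks stand in for the methods shown in this file; everything else is illustrative:

from requests.models import PreparedRequest


def read_all_pages(stream, job_full_url: str):
    """Yield every record of a finished Bulk job by following Sforce-Locator (illustrative)."""
    locator = None
    while True:
        req = PreparedRequest()
        # The locator query parameter selects the next results page; None means "first page".
        req.prepare_url(f"{job_full_url}/results", {"locator": locator})
        tmp_file, encoding, headers = stream.download_data(url=req.url)
        yield from stream.read_with_chunks(tmp_file, encoding)
        if headers.get("Sforce-Locator", "null") == "null":
            break
        locator = headers.get("Sforce-Locator")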

def get_standard_instance(self) -> SalesforceStream:
"""Returns a instance of standard logic(non-BULK) with same settings"""
@@ -632,17 +654,14 @@ def request_params(
select_fields = ",".join(property_chunk.keys())
table_name = self.name
where_conditions = []
order_by_clause = ""

if start_date:
where_conditions.append(f"{self.cursor_field} >= {start_date}")
if end_date:
where_conditions.append(f"{self.cursor_field} < {end_date}")
if self.name not in UNSUPPORTED_FILTERING_STREAMS:
order_by_clause = f"ORDER BY {self.cursor_field} ASC"

where_clause = f"WHERE {' AND '.join(where_conditions)}"
query = f"SELECT {select_fields} FROM {table_name} {where_clause} {order_by_clause}"
query = f"SELECT {select_fields} FROM {table_name} {where_clause}"

return {"q": query}

@@ -662,30 +681,20 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late


class BulkIncrementalSalesforceStream(BulkSalesforceStream, IncrementalRestSalesforceStream):
def next_page_token(self, last_record: Mapping[str, Any]) -> Optional[Mapping[str, Any]]:
return None
state_checkpoint_interval = None

def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
start_date = max(
(stream_state or {}).get(self.cursor_field, ""),
(stream_slice or {}).get("start_date", ""),
(next_page_token or {}).get("start_date", ""),
)
start_date = max((stream_state or {}).get(self.cursor_field, ""), (stream_slice or {}).get("start_date", ""))
Contributor: this should handle the case where both are "", the default value should be 2017-10-01 for this dev version

Collaborator (author): seems like this is an old implementation, we always have stream slices here and therefore stream_slice.get("start_date") will be enough
end_date = stream_slice["end_date"]

select_fields = ", ".join(self.get_json_schema().get("properties", {}).keys())
select_fields = self.get_query_select_fields()
table_name = self.name
where_conditions = [f"{self.cursor_field} >= {start_date}", f"{self.cursor_field} < {end_date}"]
order_by_clause = ""

if self.name not in UNSUPPORTED_FILTERING_STREAMS:
order_by_fields = ", ".join([self.cursor_field, self.primary_key] if self.primary_key else [self.cursor_field])
order_by_clause = f"ORDER BY {order_by_fields} ASC"

where_clause = f"WHERE {' AND '.join(where_conditions)}"
query = f"SELECT {select_fields} FROM {table_name} {where_clause} {order_by_clause}"
query = f"SELECT {select_fields} FROM {table_name} {where_clause}"
return {"q": query}
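
Pulling the last two hunks together, a small illustrative example (placeholder stream, fields, and dates) of the SOQL the incremental Bulk stream now issues per slice: bounded by the slice's start and end dates, with ORDER BY and LIMIT dropped because paging is handled by the Sforce-Locator loop above:

# Illustrative slice produced by the connector's stream slicing (the window is now 10 days, down from 30).
stream_slice = {"start_date": "2023-07-01T00:00:00Z", "end_date": "2023-07-11T00:00:00Z"}

cursor_field = "SystemModstamp"
select_fields = "Id, Name, SystemModstamp"  # base64 and compound fields are excluded by get_query_select_fields
table_name = "Account"

start_date = stream_slice["start_date"]
end_date = stream_slice["end_date"]

where_clause = f"WHERE {cursor_field} >= {start_date} AND {cursor_field} < {end_date}"
query = f"SELECT {select_fields} FROM {table_name} {where_clause}"
# -> SELECT Id, Name, SystemModstamp FROM Account WHERE SystemModstamp >= ... AND SystemModstamp < ...
print({"q": query})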

