🎉 Source Salesforce: Add the ability to filter streams #8871

Merged
merged 7 commits into from
Dec 23, 2021
Original file line number Diff line number Diff line change
@@ -25,5 +25,5 @@ COPY source_salesforce ./source_salesforce
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

-LABEL io.airbyte.version=0.1.9
+LABEL io.airbyte.version=0.1.10
 LABEL io.airbyte.name=airbyte/source-salesforce
Original file line number Diff line number Diff line change
@@ -11,16 +11,13 @@ tests:
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
-# Discover test is disabled for this connector, because each time it starts, about 700 requests are consumed, and we have a Salesforce limit of 15,000 requests per day.
-# discovery:
-# - config_path: "secrets/config.json"
+discovery:
+- config_path: "secrets/config.json"
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog_rest.json"
timeout_seconds: 600
- config_path: "secrets/config_bulk.json"
configured_catalog_path: "integration_tests/configured_catalog_bulk.json"
timeout_seconds: 600
incremental:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog_rest.json"
Original file line number Diff line number Diff line change
@@ -1,27 +1,6 @@
 #
-# MIT License
+# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
 #
-# Copyright (c) 2020 Airbyte
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in all
-# copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
-# SOFTWARE.
-#


 from .source import SourceSalesforce

Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@

 from .exceptions import TypeSalesforceException
 from .rate_limiting import default_backoff_handler
+from .utils import filter_streams

 STRING_TYPES = [
     "byte",
@@ -208,19 +209,21 @@ def filter_streams(self, stream_name: str) -> bool:
             return False
         return True

-    def get_validated_streams(self, catalog: ConfiguredAirbyteCatalog = None):
+    def get_validated_streams(self, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog = None):
         salesforce_objects = self.describe()["sobjects"]
-        validated_streams = []
+        stream_names = [stream_object["name"] for stream_object in salesforce_objects]
         if catalog:
-            streams_for_read = [configured_stream.stream.name for configured_stream in catalog.streams]
+            return [configured_stream.stream.name for configured_stream in catalog.streams]

-        for stream_object in salesforce_objects:
-            stream_name = stream_object["name"]
-            if catalog and stream_name not in streams_for_read:
-                continue
-            if self.filter_streams(stream_name):
-                validated_streams.append(stream_name)
+        if config.get("streams_criteria"):
+            filtered_stream_list = []
+            for stream_criteria in config["streams_criteria"]:
+                filtered_stream_list += filter_streams(
+                    streams_list=stream_names, search_word=stream_criteria["value"], search_criteria=stream_criteria["criteria"]
+                )
+            stream_names = list(set(filtered_stream_list))

+        validated_streams = [stream_name for stream_name in stream_names if self.filter_streams(stream_name)]
         return validated_streams

     @default_backoff_handler(max_tries=5, factor=15)
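The loop in `get_validated_streams` concatenates the matches of every criterion and then deduplicates them with `set`, so several criteria appear to behave as a union (logical OR), and the order of the names is not preserved. The sketch below illustrates that combination rule only. `matches` and `union_filter` are hypothetical names that support just two criteria; they are not the connector's code.

```python
def matches(name: str, criteria: str, value: str) -> bool:
    # Hypothetical, simplified stand-in for the connector's per-criterion check.
    name, value = name.lower(), value.lower()
    if criteria == "contains":
        return value in name
    if criteria == "not contains":
        return value not in name
    raise ValueError(f"unsupported criteria: {criteria}")


def union_filter(stream_names, streams_criteria):
    # Same combination rule as the diff: collect matches per criterion, then deduplicate.
    selected = []
    for item in streams_criteria:
        selected += [n for n in stream_names if matches(n, item["criteria"], item["value"])]
    return sorted(set(selected))  # sorted only to make the output deterministic


names = ["Account", "AccountHistory", "Contact", "ContactHistory"]

# Two positive criteria widen the selection.
print(union_filter(names, [
    {"criteria": "contains", "value": "account"},
    {"criteria": "contains", "value": "history"},
]))  # ['Account', 'AccountHistory', 'ContactHistory']

# Two negative criteria are OR-ed as well: a name is dropped only if it fails every criterion.
print(union_filter(names, [
    {"criteria": "not contains", "value": "account"},
    {"criteria": "not contains", "value": "history"},
]))  # ['Account', 'Contact', 'ContactHistory']
```

If intersection semantics were wanted instead, the criteria would have to be applied one after another to the shrinking list; the diff as shown does not do that.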
Original file line number Diff line number Diff line change
@@ -52,7 +52,7 @@ def generate_streams(cls, config: Mapping[str, Any], stream_names: List[str], sf

     def streams(self, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog = None) -> List[Stream]:
         sf = self._get_sf_object(config)
-        stream_names = sf.get_validated_streams(catalog=catalog)
+        stream_names = sf.get_validated_streams(config=config, catalog=catalog)
         return self.generate_streams(config, stream_names, sf)

     def read(
Original file line number Diff line number Diff line change
@@ -44,6 +44,37 @@
         "enum": ["BULK", "REST"],
         "default": "BULK"
       },
+      "streams_criteria": {
+        "type": "array",
+        "items": {
+          "type": "object",
+          "required": ["criteria", "value"],
+          "properties": {
+            "criteria": {
+              "type": "string",
+              "title": "Search criteria",
+              "enum": [
+                "starts with",
+                "ends with",
+                "contains",
+                "exacts",
+                "starts not with",
+                "ends not with",
+                "not contains",
+                "not exacts"
+              ],
+              "default": "contains"
+            },
+            "value": {
+              "type": "string",
+              "title": "Search value"
+            }
+          }
+        },
+        "title": "Streams filter criteria",
+        "description": "Add selection criteria for streams to get only streams that are relevant to you",
+        "examples": ["https://example1.com", "https://example2.com"]
+      },
       "wait_timeout": {
         "title": "Response Waiting Time",
         "description": "Maximum wait time of Safesforce responses in minutes. This option is used for the BULK mode only",
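For orientation, a config that satisfies the `streams_criteria` schema above might look like the dictionary below. The checker is a hand-written sketch, not part of the PR: `check_streams_criteria` and `ALLOWED_CRITERIA` are hypothetical names, the other connector settings are omitted, and a real deployment would rely on JSON Schema validation instead.

```python
ALLOWED_CRITERIA = {
    "starts with", "ends with", "contains", "exacts",
    "starts not with", "ends not with", "not contains", "not exacts",
}


def check_streams_criteria(config: dict) -> list:
    """Return a list of problems found in config["streams_criteria"] (empty if none)."""
    problems = []
    for index, item in enumerate(config.get("streams_criteria", [])):
        for key in ("criteria", "value"):  # both keys are listed under "required"
            if not isinstance(item.get(key), str):
                problems.append(f"item {index}: '{key}' must be a string")
        if isinstance(item.get("criteria"), str) and item["criteria"] not in ALLOWED_CRITERIA:
            problems.append(f"item {index}: unknown criteria '{item['criteria']}'")
    return problems


# Illustrative values only; credentials and other settings are left out.
config = {
    "api_type": "BULK",
    "streams_criteria": [
        {"criteria": "starts with", "value": "Account"},
        {"criteria": "not contains", "value": "History"},
    ],
}
print(check_streams_criteria(config))  # []

# A missing "value" and an unsupported "criteria" are both reported.
for problem in check_streams_criteria({"streams_criteria": [{"criteria": "matches"}]}):
    print(problem)
```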
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
+#
+# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+#
+
+
+def filter_streams(streams_list: list, search_word: str, search_criteria: str):
+    search_word = search_word.lower()
+    criteria_mapping = {
+        "starts with": lambda stream_name: stream_name.startswith(search_word),
+        "starts not with": lambda stream_name: not stream_name.startswith(search_word),
+        "ends with": lambda stream_name: stream_name.endswith(search_word),
+        "ends not with": lambda stream_name: not stream_name.endswith(search_word),
+        "contains": lambda stream_name: search_word in stream_name,
+        "not contains": lambda stream_name: search_word not in stream_name,
+        "exacts": lambda stream_name: search_word == stream_name,
+        "not exacts": lambda stream_name: search_word != stream_name,
+    }
+    new_streams_list = []
+    for stream in streams_list:
+        if criteria_mapping[search_criteria](stream.lower()):
+            new_streams_list.append(stream)
+    return new_streams_list
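Two properties of the helper above are easy to miss: matching is case-insensitive because both the search word and each stream name are lowercased, and a criteria string that is not a key of `criteria_mapping` raises `KeyError`. The following condensed restatement shows both. `filter_streams_sketch` is a hypothetical name and keeps only three of the eight criteria.

```python
def filter_streams_sketch(streams_list, search_word, search_criteria):
    # Condensed restatement of the PR's helper; only three criteria are kept here.
    search_word = search_word.lower()
    criteria_mapping = {
        "starts with": lambda name: name.startswith(search_word),
        "contains": lambda name: search_word in name,
        "not exacts": lambda name: search_word != name,
    }
    predicate = criteria_mapping[search_criteria]  # KeyError for unknown criteria
    return [stream for stream in streams_list if predicate(stream.lower())]


streams = ["Account", "AccountHistory", "Contact"]

# Both the search word and the stream names are lowercased before comparing.
print(filter_streams_sketch(streams, "ACCOUNT", "starts with"))  # ['Account', 'AccountHistory']
print(filter_streams_sketch(streams, "Account", "not exacts"))   # ['AccountHistory', 'Contact']

try:
    filter_streams_sketch(streams, "Account", "matches")
except KeyError as err:
    print(f"unsupported criteria: {err}")
```

Since the spec restricts `criteria` to an enum, the `KeyError` path should only be reachable when the config bypasses schema validation.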
Original file line number Diff line number Diff line change
@@ -298,3 +298,30 @@ def test_download_data_filter_null_bytes(stream_bulk_config, stream_bulk_api):
         m.register_uri("GET", f"{job_full_url}/results", content=b'"Id","IsDeleted"\n\x00"0014W000027f6UwQAI","false"\n\x00\x00')
         res = list(stream.download_data(url=job_full_url))
         assert res == [(1, {"Id": "0014W000027f6UwQAI", "IsDeleted": "false"})]
+
+
+@pytest.mark.parametrize(
+    "streams_criteria,number_of_filtered_streams",
+    [
+        ([{"criteria": "exacts", "value": "FakeStream30"}], 1),
+        ([{"criteria": "not exacts", "value": "Fakestream30"}], 99),
+        ([{"criteria": "starts with", "value": "fakeStream3"}], 11),
+        ([{"criteria": "starts not with", "value": "mystream"}], 100),
+        ([{"criteria": "ends with", "value": "stream10"}], 1),
+        ([{"criteria": "ends not with", "value": "0"}], 90),
+        ([{"criteria": "contains", "value": "keStr"}], 100),
+        ([{"criteria": "contains", "value": "eam10"}], 1),
+        ([{"criteria": "not contains", "value": "string"}], 100),
+        ([{"criteria": "not contains", "value": "Mystream"}], 100),
+        ([{"criteria": "not contains", "value": "stream23"}], 99),
+    ],
+)
+def test_discover_with_streams_criteria_param(streams_criteria, number_of_filtered_streams, stream_rest_config):
+    updated_config = {**stream_rest_config, **{"streams_criteria": streams_criteria}}
+    sf_object = Salesforce(**stream_rest_config)
+    sf_object.login = Mock()
+    sf_object.access_token = Mock()
+    sf_object.instance_url = "https://fase-account.salesforce.com"
+    sf_object.describe = Mock(return_value={"sobjects": [{"name": f"FakeStream{i}"} for i in range(100)]})
+    filtered_streams = sf_object.get_validated_streams(config=updated_config)
+    assert len(filtered_streams) == number_of_filtered_streams
Contributor


Perhaps we could check the streams by name instead of just counting them?

Contributor Author


So you are proposing that the test use the same method to filter the stream names and then compare the two results with each other? That seems strange.
Could you describe in more detail how you see this implementation?

Contributor


I am only suggesting that the test compare filtered_streams == expected_filtered_streams,
e.g. ['stream1', 'stream2'] == ['stream1'].
Is that possible?
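A minimal sketch of what such a name-based check could look like, under a few assumptions: the helper is restated here in abbreviated form (two criteria only) so the snippet runs on its own, the expected names are written out by hand instead of being produced by the code under test, and a plain loop stands in for `pytest.mark.parametrize`.

```python
def filter_streams(streams_list, search_word, search_criteria):
    # Abbreviated copy of the helper under review (two criteria only).
    search_word = search_word.lower()
    criteria_mapping = {
        "ends with": lambda name: name.endswith(search_word),
        "exacts": lambda name: search_word == name,
    }
    return [s for s in streams_list if criteria_mapping[search_criteria](s.lower())]


ALL_STREAMS = [f"FakeStream{i}" for i in range(100)]

# (criteria, value, expected names) instead of (criteria, value, expected count)
CASES = [
    ("exacts", "FakeStream30", ["FakeStream30"]),
    ("ends with", "stream10", ["FakeStream10"]),
    ("ends with", "0", [f"FakeStream{i}" for i in range(0, 100, 10)]),
]

for criteria, value, expected in CASES:
    result = filter_streams(ALL_STREAMS, value, criteria)
    # Compared as sets: get_validated_streams() deduplicates via set(), so order is not guaranteed there.
    assert set(result) == set(expected), (criteria, value, result)
print("all cases passed")
```

Because the expected lists are literal, this does not compare the filter with itself, which was the concern raised earlier in the thread.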

1 change: 1 addition & 0 deletions docs/integrations/sources/salesforce.md
Original file line number Diff line number Diff line change
@@ -735,6 +735,7 @@ List of available streams:
 | Version | Date | Pull Request | Subject |
 | :--- | :--- | :--- | :--- |
+| 0.1.10 | 2021-12-?? | [????](https://github.com/airbytehq/airbyte/pull/????) | Add the ability to filter streams by user |
 | 0.1.9 | 2021-12-07 | [8405](https://github.com/airbytehq/airbyte/pull/8405) | Filter 'null' byte(s) in HTTP responses |
 | 0.1.8 | 2021-11-30 | [8191](https://github.com/airbytehq/airbyte/pull/8191) | Make `start_date` optional and change its format to `YYYY-MM-DD` |
 | 0.1.7 | 2021-11-24 | [8206](https://github.com/airbytehq/airbyte/pull/8206) | Handling 400 error when trying to create a job for sync using Bulk API. |