Skip to content

Commit

Permalink
Source S3: keep processing but warn if an OSError happens (#21604)
Browse files Browse the repository at this point in the history
* Source S3: keep processing but warn if an OSError happens

* Source S3: bump version and update changelog

* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
1 parent 0b97ce3 commit 04a77ad
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1504,7 +1504,7 @@
- name: S3
sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
dockerRepository: airbyte/source-s3
dockerImageTag: 0.1.28
dockerImageTag: 0.1.29
documentationUrl: https://docs.airbyte.com/integrations/sources/s3
icon: s3.svg
sourceType: file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12646,7 +12646,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-s3:0.1.28"
- dockerImage: "airbyte/source-s3:0.1.29"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/s3"
changelogUrl: "https://docs.airbyte.com/integrations/sources/s3"
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ COPY source_s3 ./source_s3
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.28
LABEL io.airbyte.version=0.1.29
LABEL io.airbyte.name=airbyte/source-s3
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"aws_access_key_id": "123456",
"aws_secret_access_key": "123456key",
"path_prefix": "",
"endpoint": "http://10.0.231.175:9000"
"endpoint": "http://10.0.40.43:9000"
},
"format": {
"filetype": "csv",
Expand Down
13 changes: 11 additions & 2 deletions airbyte-integrations/connectors/source-s3/source_s3/s3file.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,17 @@ def open(self, binary: bool) -> Iterator[Union[TextIO, BinaryIO]]:
config = ClientConfig(signature_version=UNSIGNED)
params = {"client": make_s3_client(self._provider, config=config)}
self.logger.debug(f"try to open {self.file_info}")
result = smart_open.open(f"s3://{bucket}/{self.url}", transport_params=params, mode=mode)

# There are rare cases when some keys become unreachable during a sync,
# and we don't know about it, because the catalog is formed only once, at the beginning.
# This happens, for example, if a file was deleted/moved (or anything else) while we were processing another file.
try:
result = smart_open.open(f"s3://{bucket}/{self.url}", transport_params=params, mode=mode)
except OSError as e:
self.logger.warn(
f"We don't have access to {self.url}. "
f"Check whether key {self.url} exists in `{bucket}` bucket and/or has proper ACL permissions"
)
raise e
# see https://docs.python.org/3/library/contextlib.html#contextlib.contextmanager for why we do this
try:
yield result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,12 @@ def _get_master_schema(self, min_datetime: datetime = None) -> Dict[str, Any]:
continue

storagefile = self.storagefile_class(file_info, self._provider)
with storagefile.open(file_reader.is_binary) as f:
this_schema = file_reader.get_inferred_schema(f, file_info)
processed_files.append(file_info)
try:
with storagefile.open(file_reader.is_binary) as f:
this_schema = file_reader.get_inferred_schema(f, file_info)
processed_files.append(file_info)
except OSError:
continue

if this_schema == master_schema:
continue # exact schema match so go to next file
Expand Down Expand Up @@ -348,18 +351,21 @@ def _read_from_slice(
"""
for file_item in stream_slice["files"]:
storage_file: StorageFile = file_item["storage_file"]
with storage_file.open(file_reader.is_binary) as f:
# TODO: make this more efficient than mutating every record one-by-one as they stream
for record in file_reader.stream_records(f, storage_file.file_info):
schema_matched_record = self._match_target_schema(record, list(self._get_schema_map().keys()))
complete_record = self._add_extra_fields_from_map(
schema_matched_record,
{
self.ab_last_mod_col: datetime.strftime(storage_file.last_modified, self.datetime_format_string),
self.ab_file_name_col: storage_file.url,
},
)
yield complete_record
try:
with storage_file.open(file_reader.is_binary) as f:
# TODO: make this more efficient than mutating every record one-by-one as they stream
for record in file_reader.stream_records(f, storage_file.file_info):
schema_matched_record = self._match_target_schema(record, list(self._get_schema_map().keys()))
complete_record = self._add_extra_fields_from_map(
schema_matched_record,
{
self.ab_last_mod_col: datetime.strftime(storage_file.last_modified, self.datetime_format_string),
self.ab_file_name_col: storage_file.url,
},
)
yield complete_record
except OSError:
continue
LOGGER.info("finished reading a stream slice")

def read_records(
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/s3.md
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ The Jsonl parser uses pyarrow hence,only the line-delimited JSON format is suppo
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:----------------------------------------------------------------------------------------|
| 0.1.29 | 2023-01-19 | [21604](https://github.com/airbytehq/airbyte/pull/21604) | Handle `OSError`: skip unreachable keys and keep working on accessible ones; warn the customer |
| 0.1.28 | 2023-01-10 | [21210](https://github.com/airbytehq/airbyte/pull/21210) | Update block size for json file format |
| 0.1.27 | 2022-12-08 | [20262](https://github.com/airbytehq/airbyte/pull/20262) | Check config settings for CSV file format |
| 0.1.26 | 2022-11-08 | [19006](https://github.com/airbytehq/airbyte/pull/19006) | Add virtual-hosted-style option |
Expand Down

0 comments on commit 04a77ad

Please sign in to comment.