Skip to content

Commit

Permalink
🎉 Source File: cache binary stream to file (#15501)
Browse files Browse the repository at this point in the history
Signed-off-by: Sergey Chvalyuk <grubberr@gmail.com>
  • Loading branch information
grubberr authored and girarda committed Aug 11, 2022
1 parent 7024ecc commit e1e4551
Show file tree
Hide file tree
Showing 7 changed files with 20 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@
- name: File
sourceDefinitionId: 778daa7c-feaf-4db6-96f3-70fd645acc77
dockerRepository: airbyte/source-file
dockerImageTag: 0.2.16
dockerImageTag: 0.2.17
documentationUrl: https://docs.airbyte.io/integrations/sources/file
icon: file.svg
sourceType: file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2255,7 +2255,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-file:0.2.16"
- dockerImage: "airbyte/source-file:0.2.17"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/file"
connectionSpecification:
Expand Down
4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-file-secure/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM airbyte/source-file:0.2.16
FROM airbyte/source-file:0.2.17

WORKDIR /airbyte/integration_code
COPY source_file_secure ./source_file_secure
Expand All @@ -9,5 +9,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.16
LABEL io.airbyte.version=0.2.17
LABEL io.airbyte.name=airbyte/source-file-secure
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-file/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ COPY source_file ./source_file
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.16
LABEL io.airbyte.version=0.2.17
LABEL io.airbyte.name=airbyte/source-file
3 changes: 2 additions & 1 deletion airbyte-integrations/connectors/source-file/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"pandas==1.4.3",
"paramiko==2.11.0",
"s3fs==2022.7.1",
"boto3==1.21.21",
"smart-open[all]==6.0.0",
"lxml==4.9.1",
"html5lib==1.1",
Expand All @@ -23,7 +24,7 @@
"pyxlsb==1.0.9",
]

TEST_REQUIREMENTS = ["boto3==1.21.21", "pytest==7.1.2", "pytest-docker==1.0.0", "pytest-mock~=3.8.2"]
TEST_REQUIREMENTS = ["pytest~=6.2", "pytest-docker==1.0.0", "pytest-mock~=3.6.1"]

setup(
name="source_file",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@


import json
import tempfile
import traceback
from os import environ
from typing import Iterable
Expand Down Expand Up @@ -352,10 +353,20 @@ def read(self, fields: Iterable = None) -> Iterable[dict]:
yield from df[columns].to_dict(orient="records")
else:
fields = set(fields) if fields else None
if self.binary_source:
fp = self._cache_stream(fp)
for df in self.load_dataframes(fp):
columns = fields.intersection(set(df.columns)) if fields else df.columns
df = df.where(pd.notnull(df), None)
yield from df[columns].to_dict(orient="records")
yield from df[list(columns)].to_dict(orient="records")

def _cache_stream(self, fp):
"""cache stream to file"""
fp_tmp = tempfile.TemporaryFile(mode="w+b")
fp_tmp.write(fp.read())
fp_tmp.seek(0)
fp.close()
return fp_tmp

def _stream_properties(self, fp):
if self._reader_format == "yaml":
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/file.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ In order to read large files from a remote location, this connector uses the [sm

| Version | Date | Pull Request | Subject |
|---------|------------|----------------------------------------------------------|---------------------------------------------------|
| 0.2.17 | 2022-08-11 | [15501](https://github.com/airbytehq/airbyte/pull/15501) | Cache binary stream to file |
| 0.2.16 | 2022-08-10 | [15293](https://github.com/airbytehq/airbyte/pull/15293) | added support for encoding reader option |
| 0.2.15 | 2022-08-05 | [15269](https://github.com/airbytehq/airbyte/pull/15269) | Bump `smart-open` version to 6.0.0 |
| 0.2.12 | 2022-07-12 | [14535](https://github.com/airbytehq/airbyte/pull/14535) | Fix invalid schema generation for JSON files |
Expand Down

0 comments on commit e1e4551

Please sign in to comment.