Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Add Sftp source and Prefect tasks #1039

Merged
merged 43 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
a6bfa95
✨ added sftp source file.
Diego-H-S Sep 3, 2024
692337d
✨ added sftp task file.
Diego-H-S Sep 3, 2024
2afb525
✨ added salesforce flow file.
Diego-H-S Sep 3, 2024
cbff314
✨ added integration test file.
Diego-H-S Sep 3, 2024
2deb23b
✨ added unit test file.
Diego-H-S Sep 4, 2024
851f335
📌 added paramiko package in pyproject.
Diego-H-S Sep 4, 2024
45b1225
📝 updated docstrings.
Diego-H-S Sep 4, 2024
68a73e4
✅ updated unit test file.
Diego-H-S Sep 5, 2024
635c23b
📝 updated commented code.
Diego-H-S Sep 5, 2024
969ee28
sftplist bug related to search patterns, solved and columns packed is…
fdelgadodyvenia Sep 10, 2024
30f4ef9
🐛 fixed a raise error bug.
Diego-H-S Sep 12, 2024
a66be5e
⚡️ removed unusded function.
Diego-H-S Sep 12, 2024
d03a2bc
✅ updated integration test folder.
Diego-H-S Sep 18, 2024
c89265f
📝 updated comments.
Diego-H-S Sep 18, 2024
094637a
📝 added feedback.
Diego-H-S Sep 20, 2024
a5818ee
📝 updated comments.
Diego-H-S Sep 20, 2024
48f7438
🚧 Modified `rsa_key` description
Rafalz13 Sep 25, 2024
492e39b
🔥 Removed `credentials` param
Rafalz13 Sep 25, 2024
3145552
♻️ Changed function name from `_get_file_object_file` to `_get_file_o…
Rafalz13 Sep 25, 2024
fd1712a
♻️ Changed function name from `_list_directory` to `_ls`
Rafalz13 Sep 25, 2024
770140a
♻️ Changed the way of handling file listing and recursive option
Rafalz13 Sep 25, 2024
9aa2314
✅ Cleaned up tests for SFTP
Rafalz13 Sep 25, 2024
e150b0c
🐛 Changed imported class name from `SftpCredentials` to `Sftp`
Rafalz13 Sep 25, 2024
352ee7d
♻️ Adjusted `_ls` function
Rafalz13 Sep 26, 2024
43fd128
✅ Modified unit tests
Rafalz13 Sep 26, 2024
d96c7a7
⚡️ Removed `time.sleep()` from the `sftp` task
Rafalz13 Sep 26, 2024
9984cf0
Merge branch '2.0' into sftp_2.0
Rafalz13 Sep 26, 2024
8fcc8eb
🐛 Added missing comma
Rafalz13 Sep 26, 2024
d35de9f
🚧 Added requirements
Rafalz13 Sep 26, 2024
f26e432
✅ Updated tests and _ls function
Rafalz13 Sep 26, 2024
3e79413
🔥 Removed integration tests for SFTP
Rafalz13 Sep 26, 2024
77fc32a
🎨 Formatted the code
Rafalz13 Sep 26, 2024
bfd205c
🎨 Added `allowlist-secret`
Rafalz13 Sep 26, 2024
08cf5da
Update tests/unit/test_sftp.py
Rafalz13 Sep 27, 2024
97d8700
🎨 Moved `pytest-mock` to dev-dependencies
Rafalz13 Sep 27, 2024
e3185f7
removed code
Rafalz13 Sep 27, 2024
d75c9ff
🐛 Updated `dummy_rsa_key` value
Rafalz13 Sep 27, 2024
e3bc335
✅ Added tests to SFTP source
Rafalz13 Sep 27, 2024
3c3b7f8
Merge branch '2.0' into sftp_2.0
Rafalz13 Sep 27, 2024
0ecd363
🎨 Removed extra lines
Rafalz13 Sep 27, 2024
9a4402d
🚧Added `noqa`
Rafalz13 Sep 27, 2024
6bd3f17
🚧 Added `pragma: allowlist secret`
Rafalz13 Sep 27, 2024
04bf6cc
🔥 Removed `noqa: RUF100, S608`
Rafalz13 Sep 27, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ dependencies = [
"pyarrow>=10.0, <10.1.0",
# numpy>=2.0 is not compatible with the old pyarrow v10.x.
"numpy>=1.23.4, <2.0",
"paramiko==2.11.0",
"defusedxml>=0.7.1",
"aiohttp>=3.10.5",
"pytest-mock>=3.14.0",
trymzet marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
9 changes: 9 additions & 0 deletions requirements-dev.lock
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ attrs==24.2.0
# via visions
babel==2.16.0
# via mkdocs-material
bcrypt==4.2.0
# via paramiko
beautifulsoup4==4.12.3
# via mkdocs-mermaid2-plugin
# via nbconvert
Expand Down Expand Up @@ -73,6 +75,7 @@ cffi==1.17.0
# via cairocffi
# via cryptography
# via pygit2
# via pynacl
charset-normalizer==3.3.2
# via requests
click==8.1.7
Expand All @@ -97,6 +100,7 @@ croniter==2.0.7
# via prefect
cryptography==43.0.0
# via moto
# via paramiko
# via prefect
cssselect2==0.7.0
# via cairosvg
Expand Down Expand Up @@ -362,6 +366,8 @@ pandas==2.2.2
# via visions
pandocfilters==1.5.1
# via nbconvert
paramiko==2.11.0
# via viadot2
parso==0.8.4
# via jedi
pathspec==0.12.1
Expand Down Expand Up @@ -426,6 +432,8 @@ pymdown-extensions==10.9
# via mkdocs-material
# via mkdocs-mermaid2-plugin
# via mkdocstrings
pynacl==1.5.0
# via paramiko
pyodbc==5.1.0
# via viadot2
pyparsing==3.1.2
Expand Down Expand Up @@ -553,6 +561,7 @@ six==1.16.0
# via bleach
# via jsbeautifier
# via kubernetes
# via paramiko
# via python-dateutil
# via rfc3339-validator
smmap==5.0.1
Expand Down
9 changes: 9 additions & 0 deletions requirements.lock
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ attrs==24.2.0
# via jsonschema
# via referencing
# via visions
bcrypt==4.2.0
# via paramiko
beautifulsoup4==4.12.3
# via o365
cachetools==5.5.0
Expand All @@ -52,6 +54,7 @@ certifi==2024.7.4
cffi==1.17.0
# via cryptography
# via pygit2
# via pynacl
charset-normalizer==3.3.2
# via requests
click==8.1.7
Expand All @@ -68,6 +71,7 @@ coolname==2.2.0
croniter==2.0.7
# via prefect
cryptography==43.0.0
# via paramiko
# via prefect
dateparser==1.2.0
# via prefect
Expand Down Expand Up @@ -192,6 +196,8 @@ packaging==24.1
pandas==2.2.2
# via viadot2
# via visions
paramiko==2.11.0
# via viadot2
pathspec==0.12.1
# via prefect
pendulum==2.1.2
Expand Down Expand Up @@ -229,6 +235,8 @@ pygit2==1.14.1
# via viadot2
pygments==2.18.0
# via rich
pynacl==1.5.0
# via paramiko
pyodbc==5.1.0
# via viadot2
pytest==8.3.3
Expand Down Expand Up @@ -315,6 +323,7 @@ shellingham==1.5.4
# via typer
six==1.16.0
# via kubernetes
# via paramiko
# via python-dateutil
# via rfc3339-validator
sniffio==1.3.1
Expand Down
2 changes: 2 additions & 0 deletions src/viadot/orchestration/prefect/flows/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from .outlook_to_adls import outlook_to_adls
from .sap_to_parquet import sap_to_parquet
from .sap_to_redshift_spectrum import sap_to_redshift_spectrum
from .sftp_to_adls import sftp_to_adls
from .sharepoint_to_adls import sharepoint_to_adls
from .sharepoint_to_databricks import sharepoint_to_databricks
from .sharepoint_to_redshift_spectrum import sharepoint_to_redshift_spectrum
Expand Down Expand Up @@ -42,6 +43,7 @@
"outlook_to_adls",
"sap_to_parquet",
"sap_to_redshift_spectrum",
"sftp_to_adls",
"sharepoint_to_adls",
"sharepoint_to_databricks",
"sharepoint_to_redshift_spectrum",
Expand Down
64 changes: 64 additions & 0 deletions src/viadot/orchestration/prefect/flows/sftp_to_adls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""Download data from a SFTP server to Azure Data Lake Storage."""

from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner

from viadot.orchestration.prefect.tasks import df_to_adls, sftp_to_df


@flow(
name="SFTP extraction to ADLS",
description="Extract data from a SFTP server and "
+ "load it into Azure Data Lake Storage.",
retries=1,
retry_delay_seconds=60,
task_runner=ConcurrentTaskRunner,
)
def sftp_to_adls(
config_key: str | None = None,
azure_key_vault_secret: str | None = None,
file_name: str | None = None,
sep: str = "\t",
columns: list[str] | None = None,
adls_config_key: str | None = None,
adls_azure_key_vault_secret: str | None = None,
adls_path: str | None = None,
adls_path_overwrite: bool = False,
) -> None:
r"""Flow to download data from a SFTP server to Azure Data Lake.

Args:
config_key (str, optional): The key in the viadot config holding relevant
credentials. Defaults to None.
azure_key_vault_secret (str, optional): The name of the Azure Key Vault secret
where credentials are stored. Defaults to None.
file_name (str, optional): Path to the file in SFTP server. Defaults to None.
sep (str, optional): The separator to use to read the CSV file.
Defaults to "\t".
columns (List[str], optional): Columns to read from the file. Defaults to None.
adls_config_key (str, optional): The key in the viadot config holding
relevant credentials. Defaults to None.
adls_azure_key_vault_secret (str, optional): The name of the Azure Key
Vault secret containing a dictionary with ACCOUNT_NAME and Service Principal
credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake.
Defaults to None.
adls_path (str, optional): Azure Data Lake destination file path
(with file name). Defaults to None.
adls_path_overwrite (bool, optional): Whether to overwrite the file in ADLS.
Defaults to True.
"""
data_frame = sftp_to_df(
config_key=config_key,
azure_key_vault_secret=azure_key_vault_secret,
file_name=file_name,
sep=sep,
columns=columns,
)

return df_to_adls(
df=data_frame,
path=adls_path,
credentials_secret=adls_azure_key_vault_secret,
config_key=adls_config_key,
overwrite=adls_path_overwrite,
)
8 changes: 4 additions & 4 deletions src/viadot/orchestration/prefect/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@
from .redshift_spectrum import df_to_redshift_spectrum
from .s3 import s3_upload_file
from .sap_rfc import sap_rfc_to_df
from .sharepoint import (
sharepoint_download_file,
sharepoint_to_df,
)
from .sftp import sftp_list, sftp_to_df
from .sharepoint import sharepoint_download_file, sharepoint_to_df
from .sql_server import create_sql_server_table, sql_server_query, sql_server_to_df
from .supermetrics import supermetrics_to_df

Expand All @@ -47,6 +45,8 @@
"outlook_to_df",
"s3_upload_file",
"sap_rfc_to_df",
"sftp_list",
"sftp_to_df",
"sharepoint_download_file",
"sharepoint_to_df",
"sql_server_query",
Expand Down
88 changes: 88 additions & 0 deletions src/viadot/orchestration/prefect/tasks/sftp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
"""Tasks from SFTP API."""

import pandas as pd
from prefect import task

from viadot.orchestration.prefect.exceptions import MissingSourceCredentialsError
from viadot.orchestration.prefect.utils import get_credentials
from viadot.sources import Sftp


@task(retries=3, log_prints=True, retry_delay_seconds=10, timeout_seconds=60 * 60)
def sftp_to_df(
config_key: str | None = None,
azure_key_vault_secret: str | None = None,
file_name: str | None = None,
sep: str = "\t",
columns: list[str] | None = None,
) -> pd.DataFrame:
r"""Querying SFTP server and saving data as the data frame.

Args:
config_key (str, optional): The key in the viadot config holding relevant
credentials. Defaults to None.
azure_key_vault_secret (str, optional): The name of the Azure Key Vault secret
where credentials are stored. Defaults to None.
file_name (str, optional): Path to the file in SFTP server. Defaults to None.
sep (str, optional): The separator to use to read the CSV file.
Defaults to "\t".
columns (List[str], optional): Columns to read from the file. Defaults to None.

Returns:
pd.DataFrame: The response data as a pandas DataFrame.
"""
if not (azure_key_vault_secret or config_key):
raise MissingSourceCredentialsError

if not config_key:
credentials = get_credentials(azure_key_vault_secret)

sftp = Sftp(
credentials=credentials,
config_key=config_key,
)
sftp.get_connection()

return sftp.to_df(file_name=file_name, sep=sep, columns=columns)


@task(retries=3, log_prints=True, retry_delay_seconds=10, timeout_seconds=60 * 60)
def sftp_list(
config_key: str | None = None,
azure_key_vault_secret: str | None = None,
path: str | None = None,
recursive: bool = False,
matching_path: str | None = None,
) -> list[str]:
"""Listing files in the SFTP server.

Args:
config_key (str, optional): The key in the viadot config holding relevant
credentials. Defaults to None.
azure_key_vault_secret (str, optional): The name of the Azure Key Vault secret
where credentials are stored. Defaults to None.
path (str, optional): Destination path from where to get the structure.
Defaults to None.
recursive (bool, optional): Get the structure in deeper folders.
Defaults to False.
matching_path (str, optional): Filtering folders to return by a regex pattern.
Defaults to None.

Returns:
files_list (list[str]): List of files in the SFTP server.
"""
if not (azure_key_vault_secret or config_key):
raise MissingSourceCredentialsError

if not config_key:
credentials = get_credentials(azure_key_vault_secret)

sftp = Sftp(
credentials=credentials,
config_key=config_key,
)
sftp.get_connection()

return sftp.get_files_list(
path=path, recursive=recursive, matching_path=matching_path
)
2 changes: 2 additions & 0 deletions src/viadot/sources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from .hubspot import Hubspot
from .mindful import Mindful
from .outlook import Outlook
from .sftp import Sftp
from .sharepoint import Sharepoint
from .sql_server import SQLServer
from .supermetrics import Supermetrics, SupermetricsCredentials
Expand All @@ -24,6 +25,7 @@
"Genesys",
"Hubspot",
"Mindful",
"Sftp",
"Outlook",
"SQLServer",
"Sharepoint",
Expand Down
Loading
Loading