Use apache-airflow-providers-openlineage, bump min Airflow version to 2.7 and Python version to 3.8 #2103

Merged · 6 commits · Jan 24, 2024
87 changes: 2 additions & 85 deletions .github/workflows/ci-python-sdk.yaml
@@ -538,96 +538,14 @@ jobs:
           AZURE_WASB_CONN_STRING: ${{ secrets.AZURE_WASB_CONN_STRING }}
           AZURE_WASB_ACCESS_KEY: ${{ secrets.AZURE_WASB_ACCESS_KEY }}

-  Run-example-dag-tests-Airflow-2-2-5:
-    if: >-
-      github.event_name == 'push' ||
-      (
-        github.event_name == 'pull_request' &&
-        github.event.pull_request.head.repo.fork == false
-      ) ||
-      (
-        github.event_name == 'pull_request_target' &&
-        contains(github.event.pull_request.labels.*.name, 'safe to test')
-      )||
-      (
-        github.event_name == 'release'
-      )
-    runs-on: ubuntu-latest
-    services:
-      postgres:
-        # Docker Hub image
-        image: dimberman/pagila-test
-        env:
-          POSTGRES_PASSWORD: postgres
-        # Set health checks to wait until postgres has started
-        options: >-
-          --health-cmd pg_isready
-          --health-interval 10s
-          --health-timeout 5s
-          --health-retries 5
-        ports:
-          - 5432:5432
-      sftp:
-        image: ghcr.io/astronomer/astro-sdk/sftp_docker
-        ports:
-          - 2222:22
-      ftp:
-        image: ghcr.io/astronomer/astro-sdk/ftp_docker
-        ports:
-          - 21:21
-          - 30000-30009:30000-30009
-        env:
-          FTP_USER_NAME: ${{ secrets.SFTP_USERNAME }}
-          FTP_USER_PASS: ${{ secrets.SFTP_PASSWORD }}
-          FTP_USER_HOME: /home/foo
-          PUBLICHOST: "localhost"
-    steps:
-      - uses: actions/checkout@v3
-        if: github.event_name != 'pull_request_target'
-
-      - name: Checkout pull/${{ github.event.number }}
-        uses: actions/checkout@v3
-        with:
-          ref: ${{ github.event.pull_request.head.sha }}
-        if: github.event_name == 'pull_request_target'
-
-      - uses: actions/setup-python@v3
-        with:
-          python-version: '3.8'
-          architecture: 'x64'
-      - uses: actions/cache@v3
-        with:
-          path: |
-            ~/.cache/pip
-            .nox
-          key: ${{ runner.os }}-2.2.5-${{ hashFiles('python-sdk/pyproject.toml') }}-${{ hashFiles('python-sdk/src/astro/__init__.py') }}
-      - run: cat ../.github/ci-test-connections.yaml > test-connections.yaml
-      - run: python -c 'import os; print(os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON", "").strip())' > ${{ env.GOOGLE_APPLICATION_CREDENTIALS }}
-      - run: sqlite3 /tmp/sqlite_default.db "VACUUM;"
-      - run: pip3 install nox
-      - run: nox -s "test-3.8(airflow='2.2.5')" -- "tests_integration/test_example_dags.py" "tests_integration/integration_test_dag.py" -k "not redshift"
-        env:
-          GOOGLE_APPLICATION_CREDENTIALS_JSON: ${{ secrets.GOOGLE_APPLICATION_CREDENTIALS_JSON }}
-          GOOGLE_APPLICATION_CREDENTIALS: /tmp/google_credentials.json
-          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
-          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
-          REDSHIFT_NATIVE_LOAD_IAM_ROLE_ARN: ${{ secrets.REDSHIFT_NATIVE_LOAD_IAM_ROLE_ARN }}
-          REDSHIFT_USERNAME: ${{ secrets.REDSHIFT_USERNAME }}
-          REDSHIFT_PASSWORD: ${{ secrets.REDSHIFT_PASSWORD }}
-          SNOWFLAKE_ACCOUNT_NAME: ${{ secrets.SNOWFLAKE_UNAME }}
-          SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_PASSWORD }}
-          DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }}
-          AIRFLOW__ASTRO_SDK__DATABRICKS_CLUSTER_ID: ${{ secrets.DATABRICKS_CLUSTER_ID }}
-          AZURE_WASB_CONN_STRING: ${{ secrets.AZURE_WASB_CONN_STRING }}
-          AZURE_WASB_ACCESS_KEY: ${{ secrets.AZURE_WASB_ACCESS_KEY }}

   Generate-Constraints:
     if: (github.event_name == 'release' || github.event_name == 'push')
     strategy:
       fail-fast: false
       matrix:
-        python: [ '3.7', '3.8', '3.9', '3.10' ]
-        airflow: [ '2.2.5', '2.3', '2.4', '2.5', '2.6', '2.7', '2.8']
+        python: [ '3.8', '3.9', '3.10', '3.11' ]
+        airflow: [ '2.7', '2.8']
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v3
@@ -724,7 +642,6 @@ jobs:
     name: Build and publish Python 🐍 distributions 📦 to PyPI
     needs:
       - Run-Unit-tests-Airflow-2-8
-      - Run-example-dag-tests-Airflow-2-2-5
       - Run-Integration-tests-Airflow-2-8
       - Run-load-file-Integration-Airflow-2-8
       - Run-example-dag-Integration-Airflow-2-8
31 changes: 7 additions & 24 deletions python-sdk/noxfile.py
@@ -19,34 +19,17 @@ def dev(session: nox.Session) -> None:
     session.install("-e", ".[all,tests]")


-@nox.session(python=["3.7", "3.8", "3.9", "3.10", "3.11"])
-@nox.parametrize("airflow", ["2.2.5", "2.4", "2.5", "2.6", "2.7", "2.8"])
+@nox.session(python=["3.8", "3.9", "3.10", "3.11"])
+@nox.parametrize("airflow", ["2.7", "2.8"])
 def test(session: nox.Session, airflow) -> None:
     """Run both unit and integration tests."""
     env = {
         "AIRFLOW_HOME": f"~/airflow-{airflow}-python-{session.python}",
+        "AIRFLOW__CORE__ALLOWED_DESERIALIZATION_CLASSES": "airflow\\.* astro\\.*",
     }

-    if airflow == "2.2.5":
-        env[
-            "AIRFLOW__CORE__XCOM_BACKEND"
-        ] = "astro.custom_backend.astro_custom_backend.AstroCustomXcomBackend"
-        env["AIRFLOW__ASTRO_SDK__STORE_DATA_LOCAL_DEV"] = "True"
-
-        # If you need a pinned version of a provider to be present in a nox session then
-        # update the constraints file used below with that version of provider
-        # For example as part of MSSQL support we need apache-airflow-providers-microsoft-mssql>=3.2 and this
-        # has been updated in the below constraint file.
-        session.install(f"apache-airflow=={airflow}", "-c", "tests/modified_constraint_file.txt")
-        session.install("-e", ".[all,tests]", "-c", "tests/modified_constraint_file.txt")
-        session.install("apache-airflow-providers-common-sql==1.2.0")
-        # install smart-open 6.3.0 since it has FTP implementation
-        session.install("smart-open>=6.3.0")
-    else:
-        env["AIRFLOW__CORE__ALLOWED_DESERIALIZATION_CLASSES"] = "airflow\\.* astro\\.*"
-
-        session.install(f"apache-airflow~={airflow}")
-        session.install("-e", ".[all,tests]")
+    session.install(f"apache-airflow~={airflow}")
+    session.install("-e", ".[all,tests]")

     # Log all the installed dependencies
     session.log("Installed Dependencies:")
@@ -150,8 +133,8 @@ def build_docs(session: nox.Session) -> None:
     session.run("make", "html")


-@nox.session(python=["3.7", "3.8", "3.9", "3.10", "3.11"])
-@nox.parametrize("airflow", ["2.2.5", "2.3", "2.4", "2.5", "2.6", "2.7", "2.8"])
+@nox.session(python=["3.8", "3.9", "3.10", "3.11"])
+@nox.parametrize("airflow", ["2.7", "2.8"])
 def generate_constraints(session: nox.Session, airflow) -> None:
     """Generate constraints file"""
     session.install("wheel")
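With the trimmed matrices, the `test` session now expands to eight parametrized sessions (four Python versions times two Airflow versions). A quick illustrative sketch of the session IDs nox generates, following the `test-3.8(airflow='2.2.5')` naming visible in the removed CI job:

# Illustration only: enumerate the session names produced by the new matrices.
pythons = ["3.8", "3.9", "3.10", "3.11"]
airflows = ["2.7", "2.8"]

for py in pythons:
    for af in airflows:
        # Invoked as, e.g.: nox -s "test-3.10(airflow='2.8')"
        print(f"test-{py}(airflow='{af}')")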
10 changes: 4 additions & 6 deletions python-sdk/pyproject.toml
@@ -15,16 +15,15 @@ authors = [
 readme = "README.md"
 license = { file = "LICENSE" }

-requires-python = ">=3.7"
+requires-python = ">=3.8"
 dependencies = [
-    "apache-airflow>=2.0",
+    "apache-airflow>=2.7",
     "attrs>=20.3.0",
     "pandas",
     "pyarrow",
     "python-frontmatter",
     "smart-open",
     "SQLAlchemy>=1.3.18",
-    "cached_property>=1.5.0;python_version<='3.7'",
     "Flask-Session<0.6.0" # This release breaking our tests, let's pin it as a temporary workaround
 ]

@@ -36,7 +35,6 @@ classifiers = [
     "Framework :: Apache Airflow",
     "Programming Language :: Python :: 3",
     "Programming Language :: Python :: 3 :: Only",
-    "Programming Language :: Python :: 3.7",
     "Programming Language :: Python :: 3.8",
     "Programming Language :: Python :: 3.9",
     "Programming Language :: Python :: 3.10",
@@ -87,7 +85,7 @@ ftp = [
     "apache-airflow-providers-ftp>=3.0.0",
     "smart-open>=5.2.1",
 ]
-openlineage = ["openlineage-airflow>=0.17.0"]
+openlineage = ["apache-airflow-providers-openlineage>=1.4.0"]

Review discussion on this change:

JDarDagran: Note that the OpenLineage provider only supports Airflow versions greater than or equal to 2.7.0. For compatibility with older Airflow versions, wouldn't there be a dependency conflict with the strict requirement in the [openlineage] extra?

pankajkoti (Contributor, Jan 24, 2024): Yes, thanks a lot @JDarDagran for the suggestion. Bumping the minimum Airflow version to 2.7 and the minimum Python version to 3.8.

pankajkoti (Author): Most Airflow providers have apache-airflow>=2.6.0 as a dependency; hoping that bumping to 2.7.0 will not impact users.

Contributor: We will release a minor version of the Astro SDK, and it should not cause a problem IMHO. Users who want the new minor version will need Airflow 2.7; on earlier Airflow versions they can continue using prior releases of the Astro SDK. A similar approach is followed by Airflow OSS providers when bumping dependency versions.

pankajkoti (Contributor, Jan 24, 2024): Besides, we cannot have apache-airflow-providers-openlineage without bumping Airflow to 2.7.
 databricks = [
     "databricks-cli",
@@ -125,7 +123,7 @@ all = [
     "databricks-sql-connector<2.9.0",
     "s3fs",
     "protobuf",
-    "openlineage-airflow>=0.17.0",
+    "apache-airflow-providers-openlineage>=1.4.0",
     "apache-airflow-providers-microsoft-azure",
     "azure-storage-blob",
     "apache-airflow-providers-microsoft-mssql>=3.2",
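As the review thread notes, the `[openlineage]` extra now resolves to `apache-airflow-providers-openlineage>=1.4.0`, which is only supported on Airflow 2.7+. A minimal sketch of a fail-fast guard a deployment could add before relying on lineage; the helper function is hypothetical, and only the version floors come from this PR:

# Hypothetical fail-fast guard; the 3.8/2.7 floors mirror this PR's pyproject.toml.
import sys

from airflow import __version__ as airflow_version
from packaging.version import Version


def assert_astro_sdk_compatible() -> None:
    if sys.version_info < (3, 8):
        raise RuntimeError("astro-sdk now requires Python >= 3.8 (requires-python = '>=3.8')")
    if Version(airflow_version) < Version("2.7.0"):
        raise RuntimeError(
            "The [openlineage] extra depends on apache-airflow-providers-openlineage>=1.4.0, "
            "which requires Airflow >= 2.7"
        )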
4 changes: 1 addition & 3 deletions python-sdk/src/astro/lineage/__init__.py
@@ -3,9 +3,7 @@
 log = logging.getLogger(__name__)

 try:
-    from openlineage.airflow.extractors import TaskMetadata
-    from openlineage.airflow.extractors.base import BaseExtractor, OperatorLineage
-    from openlineage.airflow.utils import get_job_name
+    from airflow.providers.openlineage.extractors import OperatorLineage
     from openlineage.client.facet import (
         BaseFacet,
         DataQualityMetricsInputDatasetFacet,
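The hunk above swaps the old `openlineage.airflow` imports for the provider package inside an existing try block. A minimal sketch of how such a guard typically degrades when the provider is missing; the `OPENLINEAGE_AVAILABLE` flag and the warning message are illustrative, not part of this diff:

import logging

log = logging.getLogger(__name__)

try:
    # New import path introduced by this PR.
    from airflow.providers.openlineage.extractors import OperatorLineage  # noqa: F401

    OPENLINEAGE_AVAILABLE = True  # illustrative flag for gating lineage features
except ImportError:
    log.warning("apache-airflow-providers-openlineage is not installed; lineage is disabled.")
    OPENLINEAGE_AVAILABLE = False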
25 changes: 8 additions & 17 deletions python-sdk/tests_integration/extractors/test_extractor.py
@@ -4,9 +4,9 @@
 import pendulum
 import pytest
 from airflow.models.taskinstance import TaskInstance
+from airflow.providers.openlineage.extractors.base import DefaultExtractor
+from airflow.providers.openlineage.extractors.manager import ExtractorManager
 from airflow.utils import timezone
-from openlineage.airflow.extractors import Extractors
-from openlineage.airflow.extractors.base import DefaultExtractor
 from openlineage.client.facet import DataQualityMetricsInputDatasetFacet, OutputStatisticsOutputDatasetFacet
 from openlineage.client.run import Dataset as OpenlineageDataset

@@ -114,14 +114,13 @@ def test_python_sdk_load_file_extract_on_complete(mock_xcom_pull):
     tzinfo = pendulum.timezone("UTC")
     execution_date = timezone.datetime(2022, 1, 1, 1, 0, 0, tzinfo=tzinfo)
     task_instance = TaskInstance(task=load_file_operator, run_id=execution_date)
-    python_sdk_extractor = Extractors().get_extractor_class(LoadFileOperator)
+    python_sdk_extractor = ExtractorManager().get_extractor_class(LoadFileOperator)
     assert python_sdk_extractor is DefaultExtractor

     task_meta_extract = python_sdk_extractor(load_file_operator).extract()
     assert task_meta_extract is None

     task_meta = python_sdk_extractor(load_file_operator).extract_on_complete(task_instance=task_instance)
-    assert task_meta.name == f"adhoc_airflow.{task_id}"
     assert task_meta.inputs[0].facets["input_file_facet"] == INPUT_STATS[0].facets["input_file_facet"]
     assert task_meta.job_facets == {}
     assert task_meta.run_facets == {}
@@ -156,12 +155,11 @@ def test_python_sdk_export_file_extract_on_complete():
     )

     task_instance = TaskInstance(task=export_file_operator)
-    python_sdk_extractor = Extractors().get_extractor_class(ExportToFileOperator)
+    python_sdk_extractor = ExtractorManager().get_extractor_class(ExportToFileOperator)
     assert python_sdk_extractor is DefaultExtractor
     task_meta_extract = python_sdk_extractor(export_file_operator).extract()
     assert task_meta_extract is None
     task_meta = python_sdk_extractor(export_file_operator).extract_on_complete(task_instance=task_instance)
-    assert task_meta.name == f"adhoc_airflow.{task_id}"
     assert (
         task_meta.inputs[0].facets["dataQualityMetrics"]
         == INPUT_STATS_FOR_EXPORT_FILE[0].facets["dataQualityMetrics"]
@@ -179,7 +177,6 @@ def test_append_op_extract_on_complete():
     """
     Test extractor ``extract_on_complete`` get called and collect lineage for append operator
     """
-    task_id = "append_table"

     src_table_operator = LoadFileOperator(
         task_id="load_file",
@@ -203,12 +200,11 @@ def test_append_op_extract_on_complete():
     tzinfo = pendulum.timezone("UTC")
     execution_date = timezone.datetime(2022, 1, 1, 1, 0, 0, tzinfo=tzinfo)
     task_instance = TaskInstance(task=op, run_id=execution_date)
-    python_sdk_extractor = Extractors().get_extractor_class(AppendOperator)
+    python_sdk_extractor = ExtractorManager().get_extractor_class(AppendOperator)
     assert python_sdk_extractor is DefaultExtractor
     task_meta_extract = python_sdk_extractor(op).extract()
     assert task_meta_extract is None
     task_meta = python_sdk_extractor(op).extract_on_complete(task_instance=task_instance)
-    assert task_meta.name == f"adhoc_airflow.{task_id}"
     assert task_meta.inputs[0].name == f"astronomer-dag-authoring.astronomer-dag-authoring.{src_table.name}"
     assert task_meta.inputs[0].namespace == "bigquery"
     assert task_meta.inputs[0].facets is not None
@@ -221,7 +217,6 @@ def test_merge_op_extract_on_complete():
     """
     Test extractor ``extract_on_complete`` get called and collect lineage for merge operator
     """
-    task_id = "merge"
     src_table_operator = LoadFileOperator(
         task_id="load_file",
         input_file=File(path="gs://astro-sdk/workspace/sample_pattern.csv", filetype=FileType.CSV),
@@ -246,13 +241,12 @@ def test_merge_op_extract_on_complete():
     execution_date = timezone.datetime(2022, 1, 1, 1, 0, 0, tzinfo=tzinfo)
     task_instance = TaskInstance(task=op, run_id=execution_date)

-    python_sdk_extractor = Extractors().get_extractor_class(MergeOperator)
+    python_sdk_extractor = ExtractorManager().get_extractor_class(MergeOperator)
     assert python_sdk_extractor is DefaultExtractor
     task_meta_extract = python_sdk_extractor(op).extract()
     assert task_meta_extract is None
     task_meta = python_sdk_extractor(op).extract_on_complete(task_instance=task_instance)

-    assert task_meta.name == f"adhoc_airflow.{task_id}"
     assert task_meta.inputs[0].name == f"astronomer-dag-authoring.astro.{src_table.name}"
     assert task_meta.inputs[0].namespace == "bigquery"
     assert task_meta.inputs[0].facets is not None
@@ -277,7 +271,6 @@ def test_python_sdk_transform_extract_on_complete():
     imdb_table = load_file.execute(context=create_context(load_file))

     output_table = Table(name="test_name", conn_id="gcp_conn", metadata=Metadata(schema="astro"))
-    task_id = "top_five_animations"

     @aql.transform
     def top_five_animations(input_table: Table) -> str:
@@ -290,12 +283,11 @@ def top_five_animations(input_table: Table) -> str:
     execution_date = timezone.datetime(2022, 1, 1, 1, 0, 0, tzinfo=tzinfo)
     task_instance = TaskInstance(task=task.operator, run_id=execution_date)

-    python_sdk_extractor = Extractors().get_extractor_class(TransformOperator)
+    python_sdk_extractor = ExtractorManager().get_extractor_class(TransformOperator)
     assert python_sdk_extractor is DefaultExtractor
     task_meta_extract = python_sdk_extractor(task.operator).extract()
     assert task_meta_extract is None
     task_meta = python_sdk_extractor(task.operator).extract_on_complete(task_instance=task_instance)
-    assert task_meta.name == f"adhoc_airflow.{task_id}"
     source_code = task_meta.job_facets.get("sourceCode")
     # check for transform code return is present in source code facet.
     validate_string = """return "SELECT title, rating FROM {{ input_table }} LIMIT 5;"""
@@ -343,12 +335,11 @@ def aggregate_data(df: pd.DataFrame):
     tzinfo = pendulum.timezone("UTC")
     execution_date = timezone.datetime(2022, 1, 1, 1, 0, 0, tzinfo=tzinfo)
     task_instance = TaskInstance(task=task[0].operator, run_id=execution_date)
-    python_sdk_extractor = Extractors().get_extractor_class(DataframeOperator)
+    python_sdk_extractor = ExtractorManager().get_extractor_class(DataframeOperator)
     assert python_sdk_extractor is DefaultExtractor
     task_meta_extract = python_sdk_extractor(task[0].operator).extract()
     assert task_meta_extract is None
     task_meta = python_sdk_extractor(task[0].operator).extract_on_complete(task_instance=task_instance)
-    assert task_meta.name == "adhoc_airflow.aggregate_data"
     assert task_meta.outputs[0].facets["schema"].fields[0].name == test_schema_name
     assert task_meta.outputs[0].facets["schema"].fields[0].type == test_db_name
     assert task_meta.outputs[0].facets["dataSource"].name == test_tbl_name
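The common thread in these test updates is the lookup API: `Extractors().get_extractor_class(...)` from the old `openlineage-airflow` package becomes `ExtractorManager().get_extractor_class(...)` from the Airflow provider, and the job-name assertions on `task_meta.name` are dropped. A condensed sketch of the new lookup path as the tests above exercise it; the `LoadFileOperator` import path is assumed from the python-sdk package layout:

from airflow.providers.openlineage.extractors.base import DefaultExtractor
from airflow.providers.openlineage.extractors.manager import ExtractorManager

# Import path assumed from the python-sdk package layout.
from astro.sql.operators.load_file import LoadFileOperator

# The provider resolves SDK operators to DefaultExtractor, as the tests assert.
extractor_cls = ExtractorManager().get_extractor_class(LoadFileOperator)
assert extractor_cls is DefaultExtractor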