Skip to content

Commit

Permalink
Merge branch 'main' into vincbeck/fab_provider
Browse files Browse the repository at this point in the history
  • Loading branch information
vincbeck committed Dec 6, 2023
2 parents b0934b1 + fba682b commit 2d707f4
Show file tree
Hide file tree
Showing 74 changed files with 2,101 additions and 1,367 deletions.
78 changes: 38 additions & 40 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ jobs:
default-constraints-branch: ${{ steps.selective-checks.outputs.default-constraints-branch }}
docs-list-as-string: ${{ steps.selective-checks.outputs.docs-list-as-string }}
skip-pre-commits: ${{ steps.selective-checks.outputs.skip-pre-commits }}
providers-compatibility-checks: ${{ steps.selective-checks.outputs.providers-compatibility-checks }}
helm-test-packages: ${{ steps.selective-checks.outputs.helm-test-packages }}
debug-resources: ${{ steps.selective-checks.outputs.debug-resources }}
runs-on: ${{steps.selective-checks.outputs.runs-on}}
Expand Down Expand Up @@ -780,71 +781,68 @@ jobs:
breeze release-management prepare-provider-packages --skip-tag-check
--package-format wheel ${{ needs.build-info.outputs.affected-providers-list-as-string }}
provider-airflow-compatibility-check:
providers-compatibility-checks:
timeout-minutes: 80
name: "Providers Airflow 2.5 compatibility check"
name: >
Compat ${{matrix.airflow-version}}:P${{matrix.python-version}} provider check
runs-on: ${{fromJSON(needs.build-info.outputs.runs-on)}}
needs: [build-info, wait-for-ci-images]
strategy:
fail-fast: false
matrix:
include: ${{fromJson(needs.build-info.outputs.providers-compatibility-checks)}}
env:
RUNS_ON: "${{needs.build-info.outputs.runs-on}}"
PYTHON_MAJOR_MINOR_VERSION: "${{needs.build-info.outputs.default-python-version}}"
PYTHON_MAJOR_MINOR_VERSION: "${{matrix.python-version}}"
VERSION_SUFFIX_FOR_PYPI: "dev0"
if: needs.build-info.outputs.skip-provider-tests != 'true'
steps:
- name: Cleanup repo
run: docker run -v "${GITHUB_WORKSPACE}:/workspace" -u 0:0 bash -c "rm -rf /workspace/*"
if: contains(fromJson(needs.build-info.outputs.python-versions),matrix.python-version)
- name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )"
uses: actions/checkout@v4
with:
persist-credentials: false
- name: >
Prepare breeze & CI image: ${{needs.build-info.outputs.default-python-version}}:${{env.IMAGE_TAG}}
Prepare breeze & CI image: ${{matrix.python-version}}:${{env.IMAGE_TAG}}
uses: ./.github/actions/prepare_breeze_and_image
- name: "Cleanup dist files"
run: rm -fv ./dist/*
- name: "Prepare provider packages: wheel"
run: >
breeze release-management prepare-provider-packages --version-suffix-for-pypi dev0
breeze release-management prepare-provider-packages
--package-format wheel ${{ needs.build-info.outputs.affected-providers-list-as-string }}
- name: "Fix incompatible 2.5 provider packages"
- name: >
Remove incompatible Airflow
${{matrix.airflow-version}}:Python ${{matrix.python-version}} provider packages
run: |
# This step should remove the provider packages that are not compatible with 2.5
# or replace them with 2.5 compatible versions. Sometimes we have good reasons to bump
# the min airflow versions for some providers and then we need to add exclusions here.
#
# The Removal can be done with:
#
# rm -vf dist/apache_airflow_providers_<PROVIDER>*.whl
#
# Then it can be followed by downloading a compatible version from PyPI in case other
# providers depend on it and fail with import errors (you need to download compatible version):
#
# pip download --no-deps --dest dist apache-airflow-providers-<PROVIDER>==3.1.0
#
rm -vf dist/apache_airflow_providers_openlineage*.whl
rm -rf dist/apache_airflow_providers_common_io*.whl
rm -rf dist/apache_airflow_providers_fab*.whl
- name: "Get all provider extras as AIRFLOW_EXTRAS env variable"
# Extras might be different on S3 so rather than relying on "all" we should get the list of
# packages to be installed from the current provider_dependencies.json file
rm -vf ${{ matrix.remove-providers }}
working-directory: ./dist
if: matrix.remove-providers != ''
- name: "Checkout ${{matrix.airflow-version}} of Airflow"
uses: actions/checkout@v4
with:
persist-credentials: false
ref: ${{matrix.airflow-version}}
path: old-airflow
- name: "Prepare airflow package: wheel"
run: |
python -c 'from pathlib import Path; import json
providers = json.loads(Path("generated/provider_dependencies.json").read_text())
provider_keys = ",".join(providers.keys())
print("AIRFLOW_EXTRAS={}".format(provider_keys))' >> $GITHUB_ENV
- name: "Install and verify all provider packages and airflow on Airflow 2.5 files"
pip install pip==23.3.1 wheel==0.36.2 gitpython==3.1.40
python setup.py egg_info --tag-build ".dev0" bdist_wheel -d ../dist
working-directory: ./old-airflow
- name: >
Install and verify all provider packages and airflow on
Airflow ${{matrix.airflow-version}}:Python ${{matrix.python-version}}
run: >
breeze release-management verify-provider-packages --use-airflow-version 2.5.0
--use-packages-from-dist --airflow-constraints-reference constraints-2.5.0
breeze release-management verify-provider-packages --use-packages-from-dist
if: needs.build-info.outputs.affected-providers-list-as-string == ''
- name: "Install affected provider packages and airflow on Airflow 2.5 files"
- name: >
Install affected provider packages and airflow on
Airflow ${{matrix.airflow-version}}:Python ${{matrix.python-version}}
run: >
breeze release-management install-provider-packages --use-airflow-version 2.5.0
--airflow-constraints-reference constraints-2.5.0 --run-in-parallel
# Make sure to skip the run if the only provider to be installed has been removed
# in the previous step
if: >
needs.build-info.outputs.affected-providers-list-as-string != '' &&
needs.build-info.outputs.affected-providers-list-as-string != 'openlineage'
breeze release-management install-provider-packages --run-in-parallel
if: needs.build-info.outputs.affected-providers-list-as-string != ''
prepare-install-provider-packages-sdist:
timeout-minutes: 80
Expand Down
7 changes: 7 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,13 @@ repos:
files: ^Dockerfile$
pass_filenames: false
additional_dependencies: ['rich>=12.4.4']
- id: generate-airflow-diagrams
name: Generate airflow diagrams
entry: ./scripts/ci/pre_commit/pre_commit_generate_airflow_diagrams.py
language: python
files: ^scripts/ci/pre_commit/pre_commit_generate_airflow_diagrams.py
pass_filenames: false
additional_dependencies: ['rich>=12.4.4', "diagrams>=0.23.4"]
- id: update-supported-versions
name: Updates supported versions in documentation
entry: ./scripts/ci/pre_commit/pre_commit_supported_versions.py
Expand Down
2 changes: 2 additions & 0 deletions STATIC_CODE_CHECKS.rst
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,8 @@ require Breeze Docker image to be built locally.
+-----------------------------------------------------------+--------------------------------------------------------------+---------+
| flynt | Run flynt string format converter for Python | |
+-----------------------------------------------------------+--------------------------------------------------------------+---------+
| generate-airflow-diagrams | Generate airflow diagrams | |
+-----------------------------------------------------------+--------------------------------------------------------------+---------+
| generate-pypi-readme | Generate PyPI README | |
+-----------------------------------------------------------+--------------------------------------------------------------+---------+
| identity | Print input to the static check hooks for troubleshooting | |
Expand Down
7 changes: 0 additions & 7 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3738,8 +3738,6 @@ components:
readOnly: true
weight_rule:
$ref: "#/components/schemas/WeightRule"
priority_weight_strategy:
$ref: "#/components/schemas/PriorityWeightStrategy"
ui_color:
$ref: "#/components/schemas/Color"
ui_fgcolor:
Expand Down Expand Up @@ -4769,16 +4767,11 @@ components:
WeightRule:
description: Weight rule.
type: string
nullable: true
enum:
- downstream
- upstream
- absolute

PriorityWeightStrategy:
description: Priority weight strategy.
type: string

HealthStatus:
description: Health status
type: string
Expand Down
1 change: 0 additions & 1 deletion airflow/api_connexion/schemas/task_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ class TaskSchema(Schema):
retry_exponential_backoff = fields.Boolean(dump_only=True)
priority_weight = fields.Number(dump_only=True)
weight_rule = WeightRuleField(dump_only=True)
priority_weight_strategy = fields.String(dump_only=True)
ui_color = ColorField(dump_only=True)
ui_fgcolor = ColorField(dump_only=True)
template_fields = fields.List(fields.String(), dump_only=True)
Expand Down
11 changes: 0 additions & 11 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -306,17 +306,6 @@ core:
description: |
The weighting method used for the effective total priority weight of the task
version_added: 2.2.0
version_deprecated: 2.8.0
deprecation_reason: |
This option is deprecated and will be removed in Airflow 3.0.
Please use ``default_task_priority_weight_strategy`` instead.
type: string
example: ~
default: ~
default_task_priority_weight_strategy:
description: |
The strategy used for the effective total priority weight of the task
version_added: 2.8.0
type: string
example: ~
default: "downstream"
Expand Down
69 changes: 0 additions & 69 deletions airflow/example_dags/example_priority_weight_strategy.py

This file was deleted.

135 changes: 135 additions & 0 deletions airflow/example_dags/example_python_decorator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example DAG demonstrating the usage of the TaskFlow API to execute Python functions natively and within a
virtual environment.
"""
from __future__ import annotations

import logging
import sys
import time
from pprint import pprint

import pendulum

from airflow.decorators import dag, task
from airflow.operators.python import is_venv_installed

log = logging.getLogger(__name__)

PATH_TO_PYTHON_BINARY = sys.executable


@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def example_python_decorator():
    """
    Example DAG wiring TaskFlow tasks together: printing the task context,
    logging a templated SQL query, a fan-out of sleeping tasks, and — when
    virtualenv is available — virtualenv and external-python tasks.
    """
    # [START howto_operator_python]
    @task(task_id="print_the_context")
    def print_context(ds=None, **kwargs):
        """Print the Airflow context and ds variable from the context."""
        pprint(kwargs)
        print(ds)
        return "Whatever you return gets printed in the logs"

    run_this = print_context()
    # [END howto_operator_python]

    # [START howto_operator_python_render_sql]
    @task(task_id="log_sql_query", templates_dict={"query": "sql/sample.sql"}, templates_exts=[".sql"])
    def log_sql(**kwargs):
        # templates_dict values are rendered by Airflow before execution
        logging.info("Python task decorator query: %s", str(kwargs["templates_dict"]["query"]))

    log_the_sql = log_sql()
    # [END howto_operator_python_render_sql]

    # [START howto_operator_python_kwargs]
    # Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
    @task
    def my_sleeping_function(random_base):
        """This is a function that will run within the DAG execution"""
        time.sleep(random_base)

    for i in range(5):
        sleeping_task = my_sleeping_function.override(task_id=f"sleep_for_{i}")(random_base=i / 10)

        run_this >> log_the_sql >> sleeping_task
    # [END howto_operator_python_kwargs]

    if not is_venv_installed():
        # fixed typo in the warning message: "virtalenv" -> "virtualenv"
        log.warning("The virtualenv_python example task requires virtualenv, please install it.")
    else:
        # [START howto_operator_python_venv]
        @task.virtualenv(
            task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=False
        )
        def callable_virtualenv():
            """
            Example function that will be performed in a virtual environment.

            Importing at the module level ensures that it will not attempt to import the
            library before it is installed.
            """
            from time import sleep

            from colorama import Back, Fore, Style

            print(Fore.RED + "some red text")
            print(Back.GREEN + "and with a green background")
            print(Style.DIM + "and in dim text")
            print(Style.RESET_ALL)
            for _ in range(4):
                print(Style.DIM + "Please wait...", flush=True)
                sleep(1)
            print("Finished")

        virtualenv_task = callable_virtualenv()
        # [END howto_operator_python_venv]

        # `sleeping_task` here is the last task bound by the loop above
        sleeping_task >> virtualenv_task

        # [START howto_operator_external_python]
        @task.external_python(task_id="external_python", python=PATH_TO_PYTHON_BINARY)
        def callable_external_python():
            """
            Example function that will be performed in a virtual environment.

            Importing at the module level ensures that it will not attempt to import the
            library before it is installed.
            """
            import sys
            from time import sleep

            print(f"Running task via {sys.executable}")
            print("Sleeping")
            for _ in range(4):
                print("Please wait...", flush=True)
                sleep(1)
            print("Finished")

        external_python_task = callable_external_python()
        # [END howto_operator_external_python]

        # NOTE(review): the virtualenv/external-python section must stay inside
        # the `else` branch — `virtualenv_task` is only defined there.
        run_this >> external_python_task >> virtualenv_task


example_python_decorator()
Loading

0 comments on commit 2d707f4

Please sign in to comment.