
Commit

Merge branch 'main' into py-3-11
pankajastro authored Jul 25, 2023
2 parents f181cd8 + 3666c5d commit 4e472e8
Showing 29 changed files with 339 additions and 62 deletions.
4 changes: 2 additions & 2 deletions .circleci/config.yml
@@ -76,7 +76,7 @@ executors:
    parameters:
      python_version:
        type: string
-        default: "3.7"
+        default: "3.8"
    docker:
      - image: cimg/python:<<parameters.python_version>>

@@ -96,7 +96,7 @@ jobs:
      - run:
          name: Run pre-commit
          command: |
-            pip install pre-commit
+            pip install "pre-commit>=3.0.0"
            pre-commit run markdown-link-check --all-files || { git --no-pager diff && false ; }
      - save_cache:
          key: v1-pc-cache-{{ checksum "pre-commit-cache-key.txt" }}-
2 changes: 1 addition & 1 deletion .circleci/integration-tests/Dockerfile.astro_cloud
@@ -1,5 +1,5 @@
# Deploy from astro runtime image
-FROM quay.io/astronomer/astro-runtime:8.6.0-base
+FROM quay.io/astronomer/astro-runtime:8.8.0-base

ENV AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION=False
ENV AWS_NUKE_VERSION=v2.17.0
70 changes: 70 additions & 0 deletions .circleci/integration-tests/airflow_dag_introspection.py
@@ -0,0 +1,70 @@
import time

from airflow.models.taskinstance import TaskInstance
from airflow.utils.log.log_reader import TaskLogReader
from airflow.utils.session import create_session
from sqlalchemy.orm import joinedload


def check_log_retries(max_retries: int, log_container: list):
    """Fetch task logs with retry."""
    logs = None  # ensure a defined return value if every retry fails
    retries = 0
    while retries < max_retries:
        try:
            if log_container and log_container[0] and log_container[0][0]:
                logs = log_container[0][0][1]
                break
            else:
                print("Invalid log_container structure. Unable to retrieve logs.")
                retries += 1
        except IndexError:
            print("IndexError occurred. Retrying...")
            retries += 1
        except Exception as e:
            print(f"An exception occurred: {e}")
            if retries < max_retries:
                # Wait for some time before retrying
                time.sleep(60)
                print("Retrying...")
                # Increment the number of retries
                retries += 1
            else:
                print("Maximum number of retries reached.")
    return logs


def check_log(ti_id: str, expected: str, notexpected: str, try_number: int = 1, **context: dict):
    """Fetch the logs of a given task instance and validate that an expected
    substring is present and an unexpected substring is absent.
    """
    time.sleep(30)
    dagrun = context["dag_run"]
    task_instances = dagrun.get_task_instances()
    this_task_instance = next(filter(lambda ti: ti.task_id == ti_id, task_instances))  # ti_call

    def check(ti, expect: str, notexpect: str):
        with create_session() as session:
            ti = (
                session.query(TaskInstance)
                .filter(
                    TaskInstance.task_id == ti.task_id,
                    TaskInstance.dag_id == ti.dag_id,
                    TaskInstance.run_id == ti.run_id,
                    TaskInstance.map_index == ti.map_index,
                )
                .join(TaskInstance.dag_run)
                .options(joinedload("trigger"))
                .options(joinedload("trigger.triggerer_job"))
            ).first()
        task_log_reader = TaskLogReader()

        log_container, _ = task_log_reader.read_log_chunks(ti, try_number, {"download_logs": True})
        logs = check_log_retries(10, log_container)
        print(f"Found logs: {logs}")
        if notexpect in logs:
            raise Exception(f"{notexpect} is present in the 'logs'.")
        if expect not in logs:
            raise Exception(f"{expect} is not present in the 'logs'.")
        print(f"Found {expect} but not {notexpect}")

    check(this_task_instance, expected, notexpected)
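
Note: the triple indexing in check_log_retries mirrors the nested structure that TaskLogReader.read_log_chunks returns in Airflow 2.x: a list per requested try number, each holding (host, message) tuples, alongside a metadata dict. A minimal sketch of that assumed shape, with illustrative values only:

    # Assumed shape of log_container (the first of the two values returned
    # by read_log_chunks); all values below are illustrative, not real logs.
    log_container = [                # one entry per requested try number
        [                            # one (host, message) tuple per log source
            ("worker-host-1", "full log text for this try ..."),
        ],
    ]
    message = log_container[0][0][1]  # first try, first source, the log text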
69 changes: 59 additions & 10 deletions .circleci/integration-tests/master_dag.py
@@ -10,10 +10,11 @@
from airflow.models import DagRun
from airflow.models.baseoperator import chain
from airflow.operators.bash import BashOperator
-from airflow.operators.dummy import DummyOperator
+from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.session import create_session
+from airflow_dag_introspection import check_log

SLACK_CHANNEL = os.getenv("SLACK_CHANNEL", "#provider-alert")
SLACK_WEBHOOK_CONN = os.getenv("SLACK_WEBHOOK_CONN", "http_slack")
@@ -63,11 +64,21 @@ def get_report(dag_run_ids: List[str], **context: Any) -> None:
    airflow_version = context["ti"].xcom_pull(task_ids="get_airflow_version")
    airflow_executor = context["ti"].xcom_pull(task_ids="get_airflow_executor")
    astronomer_providers_version = context["ti"].xcom_pull(task_ids="get_astronomer_providers_version")
+    astro_cloud_provider = context["ti"].xcom_pull(task_ids="get_astro_cloud_provider")

+    report_details = [
+        f"*{header}:* `{value}`\n"
+        for header, value in [
+            ("Runtime version", os.getenv("ASTRONOMER_RUNTIME_VERSION", "N/A")),
+            ("Airflow version", airflow_version),
+            ("Executor", airflow_executor),
+            ("astronomer-providers version", astronomer_providers_version),
+            ("Cloud provider", astro_cloud_provider),
+        ]
+    ]

-    if IS_RUNTIME_RELEASE:
-        airflow_version_message = f"Results generated for Runtime version `{os.environ['ASTRONOMER_RUNTIME_VERSION']}` with `{airflow_executor}` and astronomer-providers version `{astronomer_providers_version}` \n\n"
-    else:
-        airflow_version_message = f"The below run is on Airflow version `{airflow_version}` with `{airflow_executor}` and astronomer-providers version `{astronomer_providers_version}`\n\n"
+    report_details.insert(0, "Results generated for:\n\n")
+    report_details.append("\n")  # Adding an additional newline at the end

    master_dag_deployment_link = f"{os.environ['AIRFLOW__WEBSERVER__BASE_URL']}/dags/example_master_dag/grid?search=example_master_dag"
    deployment_message = f"\n <{master_dag_deployment_link}|Link> to the master DAG for the above run on Astro Cloud deployment \n"
@@ -94,15 +105,28 @@ def get_report(dag_run_ids: List[str], **context: Any) -> None:
            message_list.append(dr_status)
            message_list.extend(failed_tasks)
            failed_dag_count += 1

    output_list = [
-        airflow_version_message,
        f"*Total DAGS*: {dag_count} \n",
        f"*Success DAGS*: {dag_count-failed_dag_count} :green_apple: \n",
        f"*Failed DAGS*: {failed_dag_count} :apple: \n \n",
    ]
+    output_list = report_details + output_list
    if failed_dag_count > 0:
        output_list.append("*Failure Details:* \n")
        output_list.extend(message_list)
+    dag_run = context["dag_run"]
+    task_instances = dag_run.get_task_instances()

+    task_failure_message_list: List[str] = [
+        f":red_circle: {ti.task_id} \n" for ti in task_instances if ti.state == "failed"
+    ]

+    if task_failure_message_list:
+        output_list.append(
+            "\nSome of the master DAG's tasks failed, please check the deployment link below \n"
+        )
+        output_list.extend(task_failure_message_list)
    output_list.append(deployment_message)
    logging.info("%s", "".join(output_list))
    # Send dag run report on Slack
@@ -136,7 +160,7 @@ def prepare_dag_dependency(task_info, execution_time):
                wait_for_completion=True,
                reset_dag_run=True,
                execution_date=execution_time,
-                allowed_states=["success", "failed", "skipped"],
+                allowed_states=["success", "failed"],
                trigger_rule="all_done",
            )
        )
@@ -163,6 +187,18 @@ def prepare_dag_dependency(task_info, execution_time):
    get_airflow_version = BashOperator(
        task_id="get_airflow_version", bash_command="airflow version", do_xcom_push=True
    )
+    check_logs_data = PythonOperator(
+        task_id="check_logs",
+        python_callable=check_log,
+        op_args=[
+            "get_airflow_version",
+            "{{ ti.xcom_pull(task_ids='get_airflow_version') }}",
+            "this_string_should_not_be_present_in_logs",
+        ],
+    )

+    airflow_version_check = (get_airflow_version, check_logs_data)
+    chain(*airflow_version_check)

    get_airflow_executor = BashOperator(
        task_id="get_airflow_executor",
@@ -176,6 +212,17 @@ def prepare_dag_dependency(task_info, execution_time):
        do_xcom_push=True,
    )

+    get_astro_cloud_provider = BashOperator(
+        task_id="get_astro_cloud_provider",
+        bash_command=(
+            "[[ $AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID == *azure* ]] && echo 'azure' ||"
+            "([[ $AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID == *s3* ]] && echo 'aws' ||"
+            "([[ $AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID == *gcs* ]] && echo 'gcs' ||"
+            "echo 'unknown'))"
+        ),
+        do_xcom_push=True,
+    )

    dag_run_ids = []

    # AWS sagemaker and batch
@@ -310,16 +357,17 @@ def prepare_dag_dependency(task_info, execution_time):
        provide_context=True,
    )

-    end = DummyOperator(
+    end = EmptyOperator(
        task_id="end",
        trigger_rule="all_success",
    )

    start >> [
        list_installed_pip_packages,
-        get_airflow_version,
+        airflow_version_check[0],
        get_airflow_executor,
        get_astronomer_providers_version,
+        get_astro_cloud_provider,
        emr_eks_trigger_tasks[0],
        emr_sensor_trigger_tasks[0],
        aws_misc_dags_tasks[0],
@@ -339,9 +387,10 @@ def prepare_dag_dependency(task_info, execution_time):

    last_task = [
        list_installed_pip_packages,
-        get_airflow_version,
+        airflow_version_check[-1],
        get_airflow_executor,
        get_astronomer_providers_version,
+        get_astro_cloud_provider,
        amazon_trigger_tasks[-1],
        emr_eks_trigger_tasks[-1],
        emr_sensor_trigger_tasks[-1],
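Note: the get_astro_cloud_provider one-liner infers the cloud provider from the remote-logging connection ID. A Python rendering of the same logic, as a sketch only, assuming the repo's naming convention (connection IDs containing "azure", "s3", or "gcs"):

    import os

    # Sketch: mirrors the bash conditional above, not code from this commit.
    conn_id = os.getenv("AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID", "")
    if "azure" in conn_id:
        provider = "azure"
    elif "s3" in conn_id:
        provider = "aws"
    elif "gcs" in conn_id:
        provider = "gcs"
    else:
        provider = "unknown"
    print(provider)  # e.g. "aws" for a hypothetical ID like "aws_s3_default_conn"
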
14 changes: 14 additions & 0 deletions .github/scripts/validate_release_commit.py
@@ -0,0 +1,14 @@
import re
import sys

if __name__ == "__main__":
    pattern = r"^Release \d{1,}\.\d{1,}\.\d{1,}$"
    if len(sys.argv) == 2:
        commit_message = sys.argv[1]
        if not re.fullmatch(pattern, commit_message):
            print(f"{commit_message} does not match '{pattern}'")
            sys.exit(1)
        else:
            print(f"{commit_message} matches '{pattern}'")
    else:
        raise ValueError("One positional argument 'commit message' is required")
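
A quick spot-check of the pattern this script enforces (sketch for illustration, not part of the commit):

    import re

    pattern = r"^Release \d{1,}\.\d{1,}\.\d{1,}$"
    assert re.fullmatch(pattern, "Release 1.19.0")          # plain X.Y.Z titles pass
    assert not re.fullmatch(pattern, "Release 1.19.0rc1")   # suffixes are rejected
    assert not re.fullmatch(pattern, "release 1.19.0")      # match is case-sensitive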
6 changes: 4 additions & 2 deletions .github/workflows/delete-branch-on-pr-close.yaml
@@ -15,13 +15,15 @@ jobs:
        uses: actions/checkout@v3

      - name: Verify and Delete RC testing branch
+        env:
+          branch_name: ${{ github.event.pull_request.head.ref }}
        run: |
          git config --global user.name "GitHub Actions"
          git config --global user.email "action@github.com"
          git fetch --prune --all
-          if [[ "${{ github.event.pull_request.head.ref }}" == rc-test-* ]]; then
-            git push origin --delete ${{ github.event.pull_request.head.ref }}
+          if [[ "$branch_name" == rc-test-* ]]; then
+            git push origin --delete $branch_name
          else
            echo "Branch does not have the required RC testing prefix. Skipping branch deletion."
          fi
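
Note: routing the branch name through an env: entry instead of interpolating ${{ github.event.pull_request.head.ref }} directly into the script is the standard GitHub Actions hardening pattern; the expression is expanded into an environment variable rather than pasted into the shell text, so a crafted branch name cannot inject commands into the run: block.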
6 changes: 5 additions & 1 deletion .github/workflows/deploy-integration-tests.yaml
@@ -1,5 +1,5 @@
---
-name: deploy-integration-tests-to-astro-cloud
+name: Deploy integration tests to astro cloud

on: # yamllint disable-line rule:truthy
  schedule:
@@ -57,6 +57,8 @@ jobs:
      deployment_id: ${{ secrets.PROVIDER_INTEGRATION_TESTS_DEPLOYMENT_ID }}
      astronomer_key_id: ${{ secrets.PROVIDER_INTEGRATION_TESTS_ASTRONOMER_KEY_ID }}
      astronomer_key_secret: ${{ secrets.PROVIDER_INTEGRATION_TESTS_ASTRONOMER_KEY_SECRET }}
+      organization_id: ${{ secrets.ORGANIZATION_ID }}
+      bearer_token: ${{ secrets.BEARER_TOKEN }}

  deploy-to-providers-integration-tests-on-KE:
    if: |
Expand Down Expand Up @@ -88,3 +90,5 @@ jobs:
      deployment_id: ${{ secrets.PROVIDER_INTEGRATION_TESTS_ON_KE_DEPLOYMENT_ID }}
      astronomer_key_id: ${{ secrets.PROVIDER_INTEGRATION_TESTS_ON_KE_ASTRONOMER_KEY_ID }}
      astronomer_key_secret: ${{ secrets.PROVIDER_INTEGRATION_TESTS_ON_KE_ASTRONOMER_KEY_SECRET }}
+      organization_id: ${{ secrets.ORGANIZATION_ID }}
+      bearer_token: ${{ secrets.BEARER_TOKEN }}
2 changes: 1 addition & 1 deletion .github/workflows/deploy-to-astro-cloud-reuse-wf.yaml
@@ -1,5 +1,5 @@
---
-name: deploy-to-astro-cloud
+name: Deploy to astro cloud

on: # yamllint disable-line rule:truthy
  workflow_call:
43 changes: 43 additions & 0 deletions .github/workflows/release.yaml
@@ -0,0 +1,43 @@
---
name: Trigger release and bump version

on: # yamllint disable-line rule:truthy
  pull_request:
    types:
      - closed

jobs:
  trigger-release-process:
    if: github.event.pull_request.merged == true && startswith(github.event.head_commit.message, 'Release ')
    runs-on: ubuntu-latest
    steps:
      - name: Check out
        uses: actions/checkout@v3
        with:
          token: ${{ secrets.GITHUB_TOKEN }}

      - name: Check head commit message
        env:
          head_commit_title: ${{ github.event.head_commit.message }}
        run: |
          python ./.github/scripts/validate_release_commit.py "$head_commit_title"
      - name: Install commitizen
        run: |
          pip3 install commitizen
      - name: Setup Github Actions git user
        run: |
          git config --global user.email "airflow-oss-bot@astronomer.io"
          git config --global user.name "airflow-oss-bot"
      - name: Tag and push tag for triggering release
        run: |
          git tag `cz version -p`
          git push origin `cz version -p`
      - name: Bump version for new development
        run: |
          make ASTRO_PROVIDER_VERSION=dev bump-version
          git add .
          git push origin main
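
For context, cz version -p prints the project's current version, so the tagging step tags the merge commit with that version and pushes the tag to trigger the release. A Python rendering of that step as a sketch (the version value is illustrative):

    import subprocess

    # Sketch of the "Tag and push tag" step, assuming `cz version -p`
    # prints the current project version (e.g. "1.19.0").
    version = subprocess.run(
        ["cz", "version", "-p"], capture_output=True, text=True, check=True
    ).stdout.strip()
    subprocess.run(["git", "tag", version], check=True)
    subprocess.run(["git", "push", "origin", version], check=True)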