Nightly Reports: Generate markdown slack report from new nightlies (#26990)

* Add github asset

* reduce columns in asset

* Add sensor

* remove sensor calls

* Remove bucket manager

* Get nightly resources

* Add nightly report data

* TODO

* Generate usable data frame

* Add template

* Add all data to report

* Generalize gcs blob sensor

* Remove old metadata sensor

* Send report to slack

* Complete report

* Remove overly verbose comments

* Add generics

---------

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
bnchrch and Octavia Squidington III authored Jun 8, 2023
1 parent 37244b1 commit 1315f5a
Showing 21 changed files with 613 additions and 91 deletions.
@@ -1,2 +1,4 @@
METADATA_BUCKET="ben-ab-test-bucket"
CI_REPORT_BUCKET="airbyte-ci-reports"
GITHUB_METADATA_SERVICE_TOKEN=""
NIGHTLY_REPORT_SLACK_WEBHOOK_URL=""
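Two of the new variables configure the nightly report flow: CI_REPORT_BUCKET names the bucket the nightly pipelines write their reports to, and NIGHTLY_REPORT_SLACK_WEBHOOK_URL is the Slack incoming webhook the rendered report gets posted to. The send_slack_webhook op used further down is not among the files shown in this commit, but posting to an incoming webhook amounts to roughly the following sketch:

# Sketch only: the real send_slack_webhook op is not part of the files shown in this commit.
import requests


def send_slack_webhook(webhook_url: str, message: str) -> None:
    """POST a message to a Slack incoming webhook; Slack renders it as mrkdwn text."""
    response = requests.post(webhook_url, json={"text": message})
    response.raise_for_status()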
@@ -3,23 +3,33 @@
#
from dagster import Definitions, load_assets_from_modules

from orchestrator.resources.gcp import gcp_gcs_client, gcs_bucket_manager, gcs_directory_blobs, gcs_file_blob, gcs_file_manager
from orchestrator.resources.github import github_client, github_connector_repo, github_connectors_directory
from orchestrator.resources.gcp import gcp_gcs_client, gcs_directory_blobs, gcs_file_blob, gcs_file_manager
from orchestrator.resources.github import github_client, github_connector_repo, github_connectors_directory, github_workflow_runs

from orchestrator.assets import (
github,
connector_nightly_report,
specs_secrets_mask,
spec_cache,
registry,
registry_report,
metadata,
)

from orchestrator.jobs.registry import generate_registry_reports, generate_registry
from orchestrator.jobs.registry import generate_registry_reports, generate_registry, generate_nightly_reports
from orchestrator.sensors.registry import registry_updated_sensor
from orchestrator.sensors.metadata import metadata_updated_sensor
from orchestrator.sensors.gcs import new_gcs_blobs_sensor

from orchestrator.config import REPORT_FOLDER, REGISTRIES_FOLDER, CONNECTORS_PATH, CONNECTOR_REPO_NAME
from orchestrator.config import (
REPORT_FOLDER,
REGISTRIES_FOLDER,
CONNECTORS_PATH,
CONNECTOR_REPO_NAME,
NIGHTLY_FOLDER,
NIGHTLY_COMPLETE_REPORT_FILE_NAME,
NIGHTLY_INDIVIDUAL_TEST_REPORT_FILE_NAME,
NIGHTLY_GHA_WORKFLOW_ID,
)
from metadata_service.constants import METADATA_FILE_NAME, METADATA_FOLDER

ASSETS = load_assets_from_modules(
@@ -30,6 +40,7 @@
metadata,
registry,
registry_report,
connector_nightly_report,
]
)

@@ -38,22 +49,51 @@
"github_client": github_client.configured({"github_token": {"env": "GITHUB_METADATA_SERVICE_TOKEN"}}),
"github_connector_repo": github_connector_repo.configured({"connector_repo_name": CONNECTOR_REPO_NAME}),
"github_connectors_directory": github_connectors_directory.configured({"connectors_path": CONNECTORS_PATH}),
"github_connector_nightly_workflow_successes": github_workflow_runs.configured(
{
"workflow_id": NIGHTLY_GHA_WORKFLOW_ID,
"branch": "master",
"status": "success",
}
),
"gcp_gcs_client": gcp_gcs_client.configured(
{
"gcp_gcs_cred_string": {"env": "GCS_CREDENTIALS"},
}
),
"gcs_bucket_manager": gcs_bucket_manager.configured({"gcs_bucket": {"env": "METADATA_BUCKET"}}),
"registry_directory_manager": gcs_file_manager.configured({"gcs_bucket": {"env": "METADATA_BUCKET"}, "prefix": REGISTRIES_FOLDER}),
"registry_report_directory_manager": gcs_file_manager.configured({"gcs_bucket": {"env": "METADATA_BUCKET"}, "prefix": REPORT_FOLDER}),
"latest_metadata_file_blobs": gcs_directory_blobs.configured({"prefix": METADATA_FOLDER, "suffix": f"latest/{METADATA_FILE_NAME}"}),
"latest_oss_registry_gcs_blob": gcs_file_blob.configured({"prefix": REGISTRIES_FOLDER, "gcs_filename": "oss_registry.json"}),
"latest_cloud_registry_gcs_blob": gcs_file_blob.configured({"prefix": REGISTRIES_FOLDER, "gcs_filename": "cloud_registry.json"}),
"latest_metadata_file_blobs": gcs_directory_blobs.configured(
{"gcs_bucket": {"env": "METADATA_BUCKET"}, "prefix": METADATA_FOLDER, "suffix": f"latest/{METADATA_FILE_NAME}"}
),
"latest_oss_registry_gcs_blob": gcs_file_blob.configured(
{"gcs_bucket": {"env": "METADATA_BUCKET"}, "prefix": REGISTRIES_FOLDER, "gcs_filename": "oss_registry.json"}
),
"latest_cloud_registry_gcs_blob": gcs_file_blob.configured(
{"gcs_bucket": {"env": "METADATA_BUCKET"}, "prefix": REGISTRIES_FOLDER, "gcs_filename": "cloud_registry.json"}
),
"latest_nightly_complete_file_blobs": gcs_directory_blobs.configured(
{"gcs_bucket": {"env": "CI_REPORT_BUCKET"}, "prefix": NIGHTLY_FOLDER, "suffix": NIGHTLY_COMPLETE_REPORT_FILE_NAME}
),
"latest_nightly_test_output_file_blobs": gcs_directory_blobs.configured(
{"gcs_bucket": {"env": "CI_REPORT_BUCKET"}, "prefix": NIGHTLY_FOLDER, "suffix": NIGHTLY_INDIVIDUAL_TEST_REPORT_FILE_NAME}
),
}

SENSORS = [
registry_updated_sensor(job=generate_registry_reports, resources_def=RESOURCES),
metadata_updated_sensor(job=generate_registry, resources_def=RESOURCES),
new_gcs_blobs_sensor(
job=generate_registry,
resources_def=RESOURCES,
gcs_blobs_resource_key="latest_metadata_file_blobs",
interval=30,
),
new_gcs_blobs_sensor(
job=generate_nightly_reports,
resources_def=RESOURCES,
gcs_blobs_resource_key="latest_nightly_complete_file_blobs",
interval=(1 * 60 * 60),
),
]

SCHEDULES = []
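The blob resources registered above are gcs_directory_blobs (or gcs_file_blob) configured with a bucket env var, a path prefix, and a filename suffix, and new_gcs_blobs_sensor polls one of them on the given interval to trigger its job when new blobs appear. Neither helper's implementation is among the files shown, so the following is only a minimal sketch of the listing they presumably perform, using the google-cloud-storage client:

# Sketch only: the actual gcs_directory_blobs resource and new_gcs_blobs_sensor
# implementations are not among the files shown in this commit.
from typing import List

from google.cloud import storage


def list_report_blobs(bucket_name: str, prefix: str, suffix: str) -> List[storage.Blob]:
    """List blobs under `prefix` whose names end with `suffix` (e.g. complete.json)."""
    client = storage.Client()  # assumes GCS credentials are available in the environment
    return [blob for blob in client.list_blobs(bucket_name, prefix=prefix) if blob.name.endswith(suffix)]


# Roughly what "latest_nightly_complete_file_blobs" resolves to:
# list_report_blobs(<CI_REPORT_BUCKET>, NIGHTLY_FOLDER, NIGHTLY_COMPLETE_REPORT_FILE_NAME)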
@@ -0,0 +1,148 @@
import pandas as pd
import json
import os

from dagster import Output, asset, OpExecutionContext, MetadataValue
from google.cloud import storage
from typing import List, Type, TypeVar

from orchestrator.ops.slack import send_slack_webhook
from orchestrator.models.ci_report import ConnectorNightlyReport, ConnectorPipelineReport
from orchestrator.config import (
NIGHTLY_COMPLETE_REPORT_FILE_NAME,
)
from orchestrator.templates.render import (
render_connector_nightly_report_md,
)

T = TypeVar("T")

GROUP_NAME = "connector_nightly_report"

# HELPERS


def json_blob_to_model(blob: storage.Blob, Model: Type[T]) -> T:
report = blob.download_as_string()
file_path = blob.name

# parse json
report_json = json.loads(report)

# parse into pydantic model
report_model = Model(file_path=file_path, **report_json)

return report_model


def blobs_to_typed_df(blobs: List[storage.Blob], Model: Type[T]) -> pd.DataFrame:
# read each blob into a model
models = [json_blob_to_model(blob, Model) for blob in blobs]

# convert to dataframe
models_df = pd.DataFrame(models)

return models_df


def get_latest_reports(blobs: List[storage.Blob], number_to_get: int) -> List[storage.Blob]:
"""
Get the latest n reports from a list of blobs
Args:
blobs(List[storage.Blob]): A list of nightly report complete.json blobs
number_to_get(int): The number of latest reports to get
Returns:
A list of blobs
"""

# We can sort by the name to get the latest 10 nightly runs
# As the nightly reports have the timestamp in the path by design
# e.g. airbyte-ci/connectors/test/nightly_builds/master/1686132340/05d686eb0eee2888f6af010b385a4ede330a886b/complete.json
latest_nightly_complete_file_blobs = sorted(blobs, key=lambda blob: blob.name, reverse=True)[:number_to_get]
return latest_nightly_complete_file_blobs


def get_relevant_test_outputs(
latest_nightly_test_output_file_blobs: List[storage.Blob], latest_nightly_complete_file_blobs: List[storage.Blob]
) -> List[storage.Blob]:
"""
Get the test output blobs that live in the same folder as any of the latest nightly runs
Args:
latest_nightly_test_output_file_blobs (List[storage.Blob]): A list of connector report output.json blobs
latest_nightly_complete_file_blobs (List[storage.Blob]): A list of nightly report complete.json blobs
Returns:
List[storage.Blob]: A list of relevant test output blobs
"""
# get all parent file paths of latest_nightly_complete_file_blobs by removing complete.json from the end of the file path
latest_nightly_complete_file_paths = [
blob.name.replace(f"/{NIGHTLY_COMPLETE_REPORT_FILE_NAME}", "") for blob in latest_nightly_complete_file_blobs
]

# filter latest_nightly_test_output_file_blobs to only those that have a parent file path in latest_nightly_complete_file_paths
# This is to filter out incomplete, or unrelated/old connector test output files
relevant_nightly_test_output_file_blobs = [
blob
for blob in latest_nightly_test_output_file_blobs
if any([parent_prefix in blob.name for parent_prefix in latest_nightly_complete_file_paths])
]

return relevant_nightly_test_output_file_blobs


def compute_connector_nightly_report_history(
nightly_report_complete_df: pd.DataFrame, nightly_report_test_output_df: pd.DataFrame
) -> pd.DataFrame:
# Add a new column to nightly_report_complete_df that is the parent file path of the complete.json file
nightly_report_complete_df["parent_prefix"] = nightly_report_complete_df["file_path"].apply(
lambda file_path: file_path.replace(f"/{NIGHTLY_COMPLETE_REPORT_FILE_NAME}", "")
)

# Add a new column to nightly_report_test_output_df that is the nightly report file path that the test output belongs to
nightly_report_test_output_df["nightly_path"] = nightly_report_test_output_df["file_path"].apply(
lambda file_path: [parent_prefix for parent_prefix in nightly_report_complete_df["parent_prefix"] if parent_prefix in file_path][0]
)

# This will be a matrix of connector success/failure for each nightly run
matrix_df = nightly_report_test_output_df.pivot(index="connector_technical_name", columns="nightly_path", values="success")

# Sort columns by name
matrix_df = matrix_df.reindex(sorted(matrix_df.columns), axis=1)

return matrix_df


# ASSETS


@asset(required_resource_keys={"latest_nightly_complete_file_blobs", "latest_nightly_test_output_file_blobs"}, group_name=GROUP_NAME)
def generate_nightly_report(context: OpExecutionContext) -> Output[pd.DataFrame]:
"""
Generate the Connector Nightly Report from the latest 10 nightly runs
"""
latest_nightly_complete_file_blobs = context.resources.latest_nightly_complete_file_blobs
latest_nightly_test_output_file_blobs = context.resources.latest_nightly_test_output_file_blobs

latest_10_nightly_complete_file_blobs = get_latest_reports(latest_nightly_complete_file_blobs, 10)
relevant_nightly_test_output_file_blobs = get_relevant_test_outputs(
latest_nightly_test_output_file_blobs, latest_10_nightly_complete_file_blobs
)

nightly_report_complete_df = blobs_to_typed_df(latest_10_nightly_complete_file_blobs, ConnectorNightlyReport)
nightly_report_test_output_df = blobs_to_typed_df(relevant_nightly_test_output_file_blobs, ConnectorPipelineReport)

nightly_report_connector_matrix_df = compute_connector_nightly_report_history(nightly_report_complete_df, nightly_report_test_output_df)

nightly_report_complete_md = render_connector_nightly_report_md(nightly_report_connector_matrix_df, nightly_report_complete_df)

slack_webhook_url = os.getenv("NIGHTLY_REPORT_SLACK_WEBHOOK_URL")
if slack_webhook_url:
send_slack_webhook(slack_webhook_url, nightly_report_complete_md)

return Output(
nightly_report_connector_matrix_df,
metadata={"count": len(nightly_report_connector_matrix_df), "preview": MetadataValue.md(nightly_report_complete_md)},
)
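To make compute_connector_nightly_report_history concrete: each parsed output.json row carries a connector name, the nightly-run prefix it belongs to, and a success flag, and the pivot turns those rows into a connector-by-run success matrix. A toy illustration with made-up values (the real data comes from the blobs above):

import pandas as pd

# Hypothetical test outputs across two nightly runs (values are illustrative only).
test_outputs = pd.DataFrame(
    [
        {"connector_technical_name": "source-faker", "nightly_path": "nightly_builds/master/1686000000/aaa", "success": True},
        {"connector_technical_name": "source-faker", "nightly_path": "nightly_builds/master/1686100000/bbb", "success": False},
        {"connector_technical_name": "destination-dev-null", "nightly_path": "nightly_builds/master/1686000000/aaa", "success": True},
        {"connector_technical_name": "destination-dev-null", "nightly_path": "nightly_builds/master/1686100000/bbb", "success": True},
    ]
)

# One row per connector, one column per nightly run, each cell the success flag.
matrix = test_outputs.pivot(index="connector_technical_name", columns="nightly_path", values="success")

# Because the run timestamp is embedded in the path, sorting the columns by name
# orders the runs chronologically, oldest to newest.
matrix = matrix.reindex(sorted(matrix.columns), axis=1)

The rendered markdown sent to Slack is built from this matrix together with the complete-report dataframe, as in render_connector_nightly_report_md above.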
@@ -1,4 +1,7 @@
from dagster import Output, asset
from dagster import Output, asset, OpExecutionContext
import pandas as pd
from orchestrator.utils.dagster_helpers import OutputDataFrame, output_dataframe


GROUP_NAME = "github"

@@ -12,3 +15,30 @@ def github_connector_folders(context):

folder_names = [item.name for item in github_connectors_directory if item.type == "dir"]
return Output(folder_names, metadata={"preview": folder_names})


@asset(required_resource_keys={"github_connector_nightly_workflow_successes"}, group_name=GROUP_NAME)
def github_connector_nightly_workflow_successes(context: OpExecutionContext) -> OutputDataFrame:
"""
Return a list of all the latest nightly workflow runs for the connectors repo.
"""
github_connector_nightly_workflow_successes = context.resources.github_connector_nightly_workflow_successes

workflow_df = pd.DataFrame(github_connector_nightly_workflow_successes)
workflow_df = workflow_df[
[
"id",
"name",
"head_branch",
"head_sha",
"run_number",
"status",
"conclusion",
"workflow_id",
"url",
"created_at",
"updated_at",
"run_started_at",
]
]
return output_dataframe(workflow_df)
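The github_workflow_runs resource backing this asset is configured in the Definitions with NIGHTLY_GHA_WORKFLOW_ID, branch="master", and status="success". Its implementation is not among the shown files; with PyGithub the equivalent query would look roughly like the sketch below, where returning raw_data dicts is an assumption that matches the pd.DataFrame(...) column selection above:

# Sketch only, assuming PyGithub; the real github_workflow_runs resource is not shown in this commit.
import os
from typing import Dict, List

from github import Github


def successful_nightly_runs(workflow_id: str = "connector_nightly_builds_dagger.yml") -> List[Dict]:
    client = Github(os.environ["GITHUB_METADATA_SERVICE_TOKEN"])
    repo = client.get_repo("airbytehq/airbyte")
    workflow = repo.get_workflow(workflow_id)
    runs = workflow.get_runs(branch="master", status="success")
    # .raw_data yields plain dicts with keys like "id", "head_sha", "conclusion", ...
    return [run.raw_data for run in runs]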
@@ -9,7 +9,6 @@
from metadata_service.constants import METADATA_FILE_NAME, ICON_FILE_NAME

from orchestrator.utils.object_helpers import are_values_equal, merge_values
from orchestrator.utils.dagster_helpers import OutputDataFrame, output_dataframe
from orchestrator.models.metadata import PartialMetadataDefinition, MetadataDefinition, LatestMetadataEntry
from orchestrator.config import get_public_url_for_gcs_file

@@ -3,6 +3,11 @@
REGISTRIES_FOLDER = "registries/v0"
REPORT_FOLDER = "generated_reports"

NIGHTLY_FOLDER = "airbyte-ci/connectors/test/nightly_builds/master"
NIGHTLY_COMPLETE_REPORT_FILE_NAME = "complete.json"
NIGHTLY_INDIVIDUAL_TEST_REPORT_FILE_NAME = "output.json"
NIGHTLY_GHA_WORKFLOW_ID = "connector_nightly_builds_dagger.yml"

CONNECTOR_REPO_NAME = "airbytehq/airbyte"
CONNECTORS_PATH = "airbyte-integrations/connectors"
CONNECTORS_TEST_RESULT_BUCKET_URL = "https://dnsgjos7lj2fu.cloudfront.net"
@@ -7,3 +7,6 @@

registry_reports_inclusive = AssetSelection.keys("connector_registry_report").upstream()
generate_registry_reports = define_asset_job(name="generate_registry_reports", selection=registry_reports_inclusive)

nightly_reports_inclusive = AssetSelection.keys("generate_nightly_report").upstream()
generate_nightly_reports = define_asset_job(name="generate_nightly_reports", selection=nightly_reports_inclusive)
@@ -0,0 +1,46 @@
from dataclasses import dataclass
from typing import Optional

# TODO (ben): When the pipeline project is brought into the airbyte-ci folder
# we should update these models to import their twin models from the pipeline project


@dataclass(frozen=True)
class ConnectorNightlyReport:
file_path: str
pipeline_name: Optional[str] = None
run_timestamp: Optional[str] = None
run_duration: Optional[float] = None
success: Optional[bool] = None
failed_steps: Optional[list] = None
successful_steps: Optional[list] = None
skipped_steps: Optional[list] = None
gha_workflow_run_url: Optional[str] = None
pipeline_start_timestamp: Optional[int] = None
pipeline_end_timestamp: Optional[int] = None
pipeline_duration: Optional[int] = None
git_branch: Optional[str] = None
git_revision: Optional[str] = None
ci_context: Optional[str] = None
pull_request_url: Optional[str] = None


@dataclass(frozen=True)
class ConnectorPipelineReport:
file_path: str
connector_technical_name: Optional[str] = None
connector_version: Optional[str] = None
run_timestamp: Optional[str] = None
run_duration: Optional[float] = None
success: Optional[bool] = None
failed_steps: Optional[list] = None
successful_steps: Optional[list] = None
skipped_steps: Optional[list] = None
gha_workflow_run_url: Optional[str] = None
pipeline_start_timestamp: Optional[int] = None
pipeline_end_timestamp: Optional[int] = None
pipeline_duration: Optional[int] = None
git_branch: Optional[str] = None
git_revision: Optional[str] = None
ci_context: Optional[str] = None
cdk_version: Optional[str] = None
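These dataclasses mirror the JSON written by the nightly pipeline, which is why json_blob_to_model can splat a parsed blob straight into them; every field except file_path is optional, so partial payloads still load. A hypothetical, abbreviated output.json round-trips like this (path and values are illustrative only):

import json

from orchestrator.models.ci_report import ConnectorPipelineReport

# Hypothetical, abbreviated payload; real reports carry many more of the optional fields.
raw = '{"connector_technical_name": "source-faker", "connector_version": "1.0.0", "success": true}'
report = ConnectorPipelineReport(
    file_path="airbyte-ci/connectors/test/nightly_builds/master/1686132340/abc123/source-faker/output.json",
    **json.loads(raw),
)
assert report.success is True
assert report.connector_technical_name == "source-faker"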
@@ -1,8 +1,6 @@
from metadata_service.models.generated.ConnectorMetadataDefinitionV0 import ConnectorMetadataDefinitionV0
from pydantic import ValidationError
from dataclasses import dataclass

from pydantic import ValidationError
from typing import Tuple, Any, Optional


(The remaining changed files in this commit are not shown here.)
