Skip to content

Commit

Permalink
qa-engine: implement fetch_adoption_metrics_per_connector_version (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
alafanechere authored Jan 26, 2023
1 parent 74b5dbf commit 3f15459
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 10 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/run-qa-engine.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,6 @@ jobs:
- name: Install ci-connector-ops package
run: pip install --quiet -e ./tools/ci_connector_ops
- name: Run QA Engine
env:
QA_ENGINE_AIRBYTE_DATA_PROD_SA: '${{ secrets.QA_ENGINE_AIRBYTE_DATA_PROD_SA }}'
run: run-qa-engine
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
-- Adoption and success-rate metrics per connector version.
-- Joins per-version distinct user/connection counts with per-version
-- sync success/failure counts, all derived from airbyte_warehouse.connector_sync.

-- Only keep syncs of officially published connectors that reached a terminal
-- state ("failed" or "succeeded"); rows with any other job_status are excluded.
WITH official_connector_syncs AS (
SELECT * FROM airbyte_warehouse.connector_sync WHERE is_officially_published and (job_status = "failed" or job_status = "succeeded")
),
-- Adoption: distinct users and distinct connections that ran each
-- (connector_definition_id, docker_repository, connector_version).
adoption_per_version AS (
SELECT
connector_definition_id,
docker_repository,
connector_version,
COUNT(distinct(user_id)) as number_of_users,
COUNT(distinct(connection_id)) as number_of_connections
FROM
official_connector_syncs
GROUP BY connector_definition_id, docker_repository, connector_version
),
-- Sync counts per (version, job_status): at most one row per status value
-- ("failed" / "succeeded") for each connector version.
job_status_per_version AS (
SELECT
connector_definition_id,
docker_repository,
connector_version,
job_status,
COUNT(1) as sync_count
FROM official_connector_syncs
GROUP BY connector_definition_id, docker_repository, connector_version, job_status
),
-- Pivot the per-status rows into `failed` / `succeeded` columns, then derive
-- totals and the success rate. IFNULL(..., 0) covers versions that have only
-- one of the two statuses (the pivoted column is NULL for the missing status).
-- SAFE_DIVIDE returns NULL instead of erroring when the denominator is 0.
success_failure_by_connector_version AS (
SELECT
connector_definition_id,
docker_repository,
connector_version,
ifnull(failed, 0) as failed_syncs_count,
ifnull(succeeded, 0) as succeeded_syncs_count,
ifnull(succeeded, 0) + ifnull(failed, 0) as total_syncs_count,
SAFE_DIVIDE(ifnull(succeeded, 0), ifnull(succeeded, 0) + ifnull(failed, 0)) as sync_success_rate
FROM job_status_per_version
PIVOT
(
-- max() is a formality here: each (version, job_status) group in the
-- pivoted input already has exactly one sync_count row.
max(sync_count)
FOR job_status in ('failed', 'succeeded')
)
)
-- Final result: one row per connector version with adoption and sync metrics.
-- LEFT JOIN keeps every version present in adoption_per_version; in practice
-- both CTEs are built from the same filtered rows, so the keys should match.
SELECT
*
FROM
adoption_per_version
LEFT JOIN success_failure_by_connector_version
USING (connector_definition_id, docker_repository, connector_version);
19 changes: 13 additions & 6 deletions tools/ci_connector_ops/ci_connector_ops/qa_engine/inputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import os
from importlib.resources import files
import json

from google.oauth2 import service_account
import requests
import pandas as pd

Expand Down Expand Up @@ -31,18 +35,21 @@ def fetch_adoption_metrics_per_connector_version() -> pd.DataFrame:
"""Retrieve adoptions metrics for each connector version from our data warehouse.
Returns:
pd.DataFrame: A Dataframe with adoption metrics per connector version.
pd.DataFrame: A dataframe with adoption metrics per connector version.
"""
# TODO: directly query BigQuery
# use query in https://airbyte.metabaseapp.com/question/1642-adoption-and-success-rate-per-connector-version-oss-cloud
return pd.DataFrame(columns=[
connector_adoption_sql = files("ci_connector_ops.qa_engine").joinpath("connector_adoption.sql").read_text()
bq_credentials = service_account.Credentials.from_service_account_info(json.loads(os.environ["QA_ENGINE_AIRBYTE_DATA_PROD_SA"]))
adoption_metrics = pd.read_gbq(connector_adoption_sql, project_id="airbyte-data-prod", credentials=bq_credentials)
return adoption_metrics[[
"connector_definition_id",
"connector_version",
"number_of_connections",
"number_of_users",
"succeeded_syncs_count",
"failed_syncs_count",
"total_syncs_count",
"sync_success_rate",
])
]]

CLOUD_CATALOG = fetch_remote_catalog(CLOUD_CATALOG_URL)
OSS_CATALOG = fetch_remote_catalog(OSS_CATALOG_URL)
ADOPTION_METRICS_PER_CONNECTOR_VERSION = fetch_adoption_metrics_per_connector_version()
4 changes: 3 additions & 1 deletion tools/ci_connector_ops/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"PyYAML~=6.0",
"GitPython~=3.1.29",
"pandas~=1.5.3",
"pandas-gbq~=0.19.0",
"pydantic~=1.10.4",
"fsspec~=2023.1.0",
"gcsfs~=2023.1.0"
Expand All @@ -22,7 +23,7 @@


setup(
version="0.1.4",
version="0.1.5",
name="ci_connector_ops",
description="Packaged maintained by the connector operations team to perform CI for connectors",
author="Airbyte",
Expand All @@ -33,6 +34,7 @@
"tests": TEST_REQUIREMENTS,
},
python_requires=">=3.9",
package_data={"ci_connector_ops.qa_engine": ["connector_adoption.sql"]},
entry_points={
"console_scripts": [
"check-test-strictness-level = ci_connector_ops.sat_config_checks:check_test_strictness_level",
Expand Down
34 changes: 31 additions & 3 deletions tools/ci_connector_ops/tests/test_qa_engine/test_inputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#


from importlib.resources import files

import pandas as pd
import pytest

Expand All @@ -16,15 +18,41 @@ def test_fetch_remote_catalog(catalog_url):
assert all(expected_column in catalog.columns for expected_column in expected_columns)
assert set(catalog.connector_type.unique()) == {"source", "destination"}

def test_fetch_adoption_metrics_per_connector_version():
def test_fetch_adoption_metrics_per_connector_version(mocker):
fake_bigquery_results = pd.DataFrame([{
"connector_definition_id": "abcdefgh",
"connector_version": "0.0.0",
"number_of_connections": 10,
"number_of_users": 2,
"succeeded_syncs_count": 12,
"failed_syncs_count": 1,
"total_syncs_count": 3,
"sync_success_rate": .99,
"unexpected_column": "foobar"
}])
mocker.patch.object(inputs.pd, "read_gbq", mocker.Mock(return_value=fake_bigquery_results))
mocker.patch.object(inputs.os, "environ", {"QA_ENGINE_AIRBYTE_DATA_PROD_SA": '{"type": "fake_service_account"}'})
mocker.patch.object(inputs.service_account.Credentials, "from_service_account_info")
expected_columns = {
"connector_definition_id",
"connector_version",
"number_of_connections",
"number_of_users",
"succeeded_syncs_count",
"failed_syncs_count",
"total_syncs_count",
"sync_success_rate",
}

expected_sql_query = files("ci_connector_ops.qa_engine").joinpath("connector_adoption.sql").read_text()
expected_project_id = "airbyte-data-prod"
adoption_metrics_per_connector_version = inputs.fetch_adoption_metrics_per_connector_version()
assert len(adoption_metrics_per_connector_version) == 0
assert isinstance(adoption_metrics_per_connector_version, pd.DataFrame)
assert set(adoption_metrics_per_connector_version.columns) == expected_columns
inputs.service_account.Credentials.from_service_account_info.assert_called_with(
{"type": "fake_service_account"}
)
inputs.pd.read_gbq.assert_called_with(
expected_sql_query,
project_id=expected_project_id,
credentials=inputs.service_account.Credentials.from_service_account_info.return_value
)

0 comments on commit 3f15459

Please sign in to comment.