Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
AnandInguva committed Nov 4, 2022
1 parent 6afba48 commit 27fe681
Showing 1 changed file with 62 additions and 70 deletions.
132 changes: 62 additions & 70 deletions sdks/python/apache_beam/testing/analyzers/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import time
import uuid

import pandas
import pandas as pd
import requests
import sys
import yaml
Expand Down Expand Up @@ -188,77 +188,55 @@ def search_issue_with_title(self, title, labels=None):
class ChangePointAnalysis:
def __init__(
self,
data: pandas.DataFrame,
data: Union[List[float], List[List[float]], np.ndarray],
metric_name: str,
):
self.data = data
self.metric_name = metric_name

@staticmethod
def edivisive_means(
    series: Union[List[float], List[List[float]], np.ndarray],
    pvalue: float = 0.05,
    permutations: int = 100):
  """Run e-divisive means change point detection on *series*.

  Args:
    series: The ordered metric observations to scan for change points.
    pvalue: Significance level passed to the permutation test.
    permutations: Number of permutations used by the permutation test.

  Returns:
    Whatever ``e_divisive`` returns for the given inputs (the detected
    change points).
  """
  detected = e_divisive(series, pvalue, permutations)
  return detected

def find_change_points(self, analysis='edivisive') -> List:
if analysis == 'edivisive':
metric_values = self.data[load_test_metrics_utils.VALUE_LABEL].tolist()
change_points = ChangePointAnalysis.edivisive_means(metric_values)
return change_points
else:
raise NotImplementedError
self, pvalue: float = 0.05, permutations: int = 100) -> List:
"""
Args:
pvalue: p value for the permutation test.
permutations: Number of permutations for the permutation test.
Returns:
The indices of change points.
"""
return e_divisive(self.data, pvalue, permutations)

def get_failing_test_description(self):
  """Build a human-readable description of the failing metric.

  Starts with the metric-level header (followed by a blank line) and then
  appends one formatted line per stored data row, using the row's submit
  timestamp, metric type and value.

  Returns:
    The assembled multi-line description string.
  """
  parts = [_METRIC_DESCRIPTION.format(self.metric_name) + 2 * '\n']
  for row in self.data:
    parts.append(
        _METRIC_INFO.format(
            row[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL],
            row[load_test_metrics_utils.METRICS_TYPE_LABEL],
            row[load_test_metrics_utils.VALUE_LABEL]) + '\n')
  return ''.join(parts)
# def get_failing_test_description(self):


# metric_description = _METRIC_DESCRIPTION.format(self.metric_name) + 2 * '\n'
# for data in self.data:
# metric_description += _METRIC_INFO.format(
# data[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL],
# data[load_test_metrics_utils.METRICS_TYPE_LABEL],
# data[load_test_metrics_utils.VALUE_LABEL]) + '\n'
# return metric_description


class RunChangePointAnalysis:
def __init__(self, args):
self.change_point_sibling_distance = args.change_point_sibling_distance
self.changepoint_to_recent_run_window = (
args.changepoint_to_recent_run_window)
self.test_name = sys.argv[0]

def has_latest_sibling_change_point(
    self, metric_values, metric_name, sibling_indexes_to_search) -> bool:
  """Check whether a candidate sibling run matches the stored change point.

  Fetches the most recent rows for *metric_name* from BigQuery and compares
  the latest recorded change-point value against the metric values at each
  candidate sibling index.

  Args:
    metric_values: Ordered metric values for the runs under analysis.
    metric_name: Name of the metric to look up in BigQuery.
    sibling_indexes_to_search: Indices of metric_values to compare against
      the latest stored change point.

  Returns:
    True if any candidate sibling value equals the most recently stored
    change-point value, False otherwise.
  """
  query_template = """
  SELECT * FROM {project}.{dataset}.{table}
  WHERE {metric_name_id} = '{metric_name}'
  ORDER BY {timestamp} DESC
  LIMIT 10
  """.format(
      project=BQ_PROJECT_NAME,
      dataset=BQ_DATASET,
      metric_name_id=METRIC_NAME,
      metric_name=metric_name,
      timestamp=SUBMIT_TIMESTAMP_LABEL,
      table=self.test_name)
  df = FetchMetrics.fetch_from_bq(query_template=query_template)
  # df[CHANGE_POINT_LABEL] is a whole pandas column (Series); comparing a
  # scalar against it and using the result in `if` raises "truth value of
  # a Series is ambiguous". Take the scalar from the most recent row
  # instead (the query is ordered by timestamp DESC).
  latest_change_point = df[CHANGE_POINT_LABEL].tolist()[0]

  for sibling_index in sibling_indexes_to_search:
    if metric_values[sibling_index] == latest_change_point:
      return True

  return False

def is_changepoint_in_valid_window(self, change_point_index: int) -> bool:
# If the change point is more than N runs behind the most recent run
# then don't raise an alert for it.
if self.changepoint_to_recent_run_window > change_point_index:
if self.changepoint_to_recent_run_window >= change_point_index:
return True
return False

def has_sibling_change_point(
self, changepoint_index, metric_values, metric_name) -> Optional[int]:
self,
change_point_index: int,
metric_values: List,
metric_name: str,
test_name: str) -> Optional[int]:
"""
Finds the sibling change point index. If not,
returns the original changepoint index.
Expand All @@ -270,38 +248,54 @@ def has_sibling_change_point(

# Search backward from the current changepoint
sibling_indexes_to_search = []
for i in range(changepoint_index - 1, -1, -1):
if changepoint_index - i >= self.change_point_sibling_distance:
for i in range(change_point_index - 1, -1, -1):
if change_point_index - i <= self.change_point_sibling_distance:
sibling_indexes_to_search.append(i)
# Search forward from the current changepoint
for i in range(changepoint_index + 1, len(metric_values)):
if i - changepoint_index <= self.change_point_sibling_distance:
for i in range(change_point_index + 1, len(metric_values)):
if i - change_point_index <= self.change_point_sibling_distance:
sibling_indexes_to_search.append(i)
# Look for change points within change_point_sibling_distance.
# Return the first change point found.
alert_new_issue = self.has_latest_sibling_change_point(
metric_values=metric_values,
query_template = """
SELECT * FROM {project}.{dataset}.{table}
WHERE {metric_name_id} = '{metric_name}'
ORDER BY {timestamp} DESC
LIMIT 10
""".format(
project=BQ_PROJECT_NAME,
dataset=BQ_DATASET,
metric_name_id=METRIC_NAME,
metric_name=metric_name,
sibling_indexes_to_search=sibling_indexes_to_search)
timestamp=SUBMIT_TIMESTAMP_LABEL,
table=test_name)
df = FetchMetrics.fetch_from_bq(query_template=query_template)
latest_change_point = df[CHANGE_POINT_LABEL].tolist()[0]

if not alert_new_issue:
return False
return True
alert_new_issue = True
for sibling_index in sibling_indexes_to_search:
# Verify the logic.
if metric_values[sibling_index] == latest_change_point:
alert_new_issue = False
break
return alert_new_issue

def run(self, file_path):
with open(file_path, 'r') as stream:
config = yaml.safe_load(stream)
metric_name = config['metric_name']
test_name = config['test_name']
if config['source'] == 'big_query':
metric_values = FetchMetrics.fetch_from_bq(
data: pd.DataFrame = FetchMetrics.fetch_from_bq(
project_name=config['project'],
dataset=config['metrics_dataset'],
table=config['metrics_table'],
metric_name=metric_name)

metric_values = data[load_test_metrics_utils.VALUE_LABEL].to_list()
change_point_analyzer = ChangePointAnalysis(
metric_values, metric_name=metric_name)
change_points_idx = change_point_analyzer.find_change_points()
change_points_idx = change_point_analyzer.edivisive_means()

if not change_points_idx:
return
Expand All @@ -327,12 +321,10 @@ def run(self, file_path):
labels=GH_ISSUE_LABELS)

bq_metrics_publisher = BigQueryMetricsPublisher(
project_name=BQ_PROJECT_NAME,
dataset=BQ_DATASET,
table=self.test_name)
project_name=BQ_PROJECT_NAME, dataset=BQ_DATASET, table=test_name)
metric_dict = Metric(
submit_timestamp=time.time(),
test_name=self.test_name,
test_name=test_name,
metric_name=metric_name,
test_id=uuid.uuid4().hex,
change_point=metric_values[
Expand Down Expand Up @@ -362,15 +354,15 @@ def run(self, file_path):
help='Search for a sibling changepoint in both directions '
'from the current change point index.')
parser.add_argument(
'changepoint_to_recent_run_window',
'--changepoint_to_recent_run_window',
type=int,
default=7,
help='Only allow creating alerts when regression '
'happens if the run corresponding to the regressions is '
'within changepoint_to_recent_run_window.')
known_args, unknown_args = parser.parse_known_args()

_LOGGER.warning('Discarding unknown arguments : %s ' % unknown_args)
if unknown_args:
_LOGGER.warning('Discarding unknown arguments : %s ' % unknown_args)
file_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)), 'tests_config.yaml')
RunChangePointAnalysis(known_args).run(file_path=file_path, )
RunChangePointAnalysis(known_args).run(file_path=file_path)

0 comments on commit 27fe681

Please sign in to comment.