Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Algo: Introduce estimation threshold control to replicas estimation #89

Merged
merged 2 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions algorithm/kapacity/portrait/horizontal/predictive/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@ def parse_args():
parser.add_argument('--re-test-dataset-size-in-seconds',
help='size of test dataset in seconds for replicas estimation model',
required=False, default=86400)
# NOTE(review): Python concatenates adjacent string literals with no separator,
# so the original help texts rendered as e.g. "model,the estimation would fail…"
# and "frequencyof the time series…". Trailing spaces added inside each
# fragment so --help output reads correctly; flag names, defaults and
# requiredness are unchanged.
parser.add_argument('--re-min-correlation-allowed',
                    help='minimum allowed correlation of replicas estimation model, '
                         'the estimation would fail if the model\'s correlation is lower than this threshold, '
                         'this arg should be a float number within range [0, 1]',
                    required=False, default=0.9)
parser.add_argument('--re-max-mse-allowed',
                    help='maximum allowed MSE of replicas estimation model, '
                         'the estimation would fail if the model\'s MSE is larger than this threshold, '
                         'this arg should be a float number within range [0, +∞)',
                    required=False, default=10.0)
parser.add_argument('--scaling-freq', help='frequency of scaling, the duration should be larger than the frequency '
                                           'of the time series forecasting model',
                    required=True)
Expand Down Expand Up @@ -131,12 +141,14 @@ def predict_replicas(args, metric_ctx, pred_traffics):
traffic_col,
metric_ctx.resource_target,
int(args.re_time_delta_hours),
int(args.re_test_dataset_size_in_seconds))
int(args.re_test_dataset_size_in_seconds),
float(args.re_min_correlation_allowed),
float(args.re_max_mse_allowed))
if 'NO_RESULT' in pred['rule_code'].unique():
raise RuntimeError('there exist points that no replica number would meet the resource target, please consider setting a more reasonable resource target')
return pred
except estimator.EstimationException as e:
raise RuntimeError("replicas estimation failed, this may be caused by insufficient or irregular history data") from e
raise RuntimeError(f'replicas estimation failed, this may be caused by insufficient or irregular history data, detailed estimation info: {e.info}') from e


def merge_history_dict(history_dict):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,13 @@ def preprocess_data(self):
df.sort_values(by=self.time_col, inplace=True)
df = df.reset_index(drop=True)

# scale resource to 0~100
resource_max = df[self.resource_col].max()
resource_scaling_factor = 1 if resource_max <= 100 else 10**np.ceil(np.log10(resource_max / 100))
self.logger.info(f'resource scaling factor: {resource_scaling_factor}')
df[self.resource_col] = df[self.resource_col] / resource_scaling_factor
self.resource_target = self.resource_target / resource_scaling_factor

features = self.traffic_cols

self.logger.info(f'checkout before filtering NaN: '
Expand Down Expand Up @@ -628,7 +635,12 @@ def bin2str(x):


class EstimationException(Exception):
    """Raised when no estimation policy meets the configured quality thresholds.

    Carries a human-readable message plus a structured ``info`` payload so
    callers (e.g. predict_replicas) can surface the detailed per-policy
    metrics that caused the rejection.
    """

    def __init__(self, message, info):
        # Chain to Exception.__init__ so ``e.args`` is populated and the
        # exception reprs/pickles like a normal Exception (the original
        # skipped this, leaving args empty).
        super().__init__(message)
        # message: human-readable description of the failure.
        self.message = message
        # info: dict of quality metrics (correlation, significance,
        # big_e_10, mse) per candidate policy.
        self.info = info

    def __str__(self):
        # Preserve original behavior: str(e) is exactly the message.
        return self.message


def estimate(data: pd.DataFrame,
Expand All @@ -639,7 +651,9 @@ def estimate(data: pd.DataFrame,
traffic_cols: list[str],
resource_target: float,
time_delta_hours: int,
test_dataset_size_in_seconds: int = 86400) -> pd.DataFrame:
test_dataset_size_in_seconds: int = 86400,
min_correlation_allowed: float = 0.9,
max_mse_allowed: float = 10.0) -> pd.DataFrame:
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s: %(message)s')
logger = logging.getLogger()
Expand All @@ -660,19 +674,30 @@ def estimate(data: pd.DataFrame,
estimator.test()
logger.info(f'********* testing cost time: {time.time() - st10} *********')

if (estimator.pearsonr[0] >= 0.9 and estimator.pearsonr[1] < 0.01
and estimator.big_e_10 == 0 and estimator.mse < 10):
logger.info(f'********* [linear] correlation: {estimator.pearsonr[0]}, significance: {estimator.pearsonr[1]}, big_e_10: {estimator.big_e_10}, mse: {estimator.mse} *********')
logger.info(f'********* [residual] correlation: {estimator.pearsonr_rf[0]}, significance: {estimator.pearsonr_rf[1]}, big_e_10: {estimator.big_e_10_rf}, mse: {estimator.mse_rf} *********')

if (estimator.pearsonr[0] >= min_correlation_allowed and estimator.pearsonr[1] < 0.01
and estimator.big_e_10 == 0 and estimator.mse <= max_mse_allowed):
st10 = time.time()
estimator.policy_linear()
logger.info(f'********* linear policy cost time: {time.time() - st10} *********')
return estimator.output

elif (estimator.pearsonr_rf[0] >= 0.9 and estimator.pearsonr_rf[1] < 0.01 and estimator.big_e_10_rf == 0
and estimator.mse_rf < 10 and estimator.pearsonr[0] >= 0.6 and estimator.pearsonr[1] < 0.01):
elif (estimator.pearsonr_rf[0] >= min_correlation_allowed and estimator.pearsonr_rf[1] < 0.01 and estimator.big_e_10_rf == 0
and estimator.mse_rf <= max_mse_allowed and estimator.pearsonr[0] >= 0.6 and estimator.pearsonr[1] < 0.01):
st10 = time.time()
estimator.policy_residual()
logger.info(f'********* residual policy cost time: {time.time() - st10} *********')
return estimator.output

else:
raise EstimationException("no policy fits")
raise EstimationException('no policy fits',
{'linear': {'correlation': estimator.pearsonr[0],
'significance': estimator.pearsonr[1],
'big_e_10': estimator.big_e_10,
'mse': estimator.mse},
'residual': {'correlation': estimator.pearsonr_rf[0],
'significance': estimator.pearsonr_rf[1],
'big_e_10': estimator.big_e_10_rf,
'mse': estimator.mse_rf}})
Loading