Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clean up some unnecessary code #99

Merged
merged 4 commits into from
Nov 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
pandas==1.4.0
pymongo==4.0.1
matplotlib==3.5.1
requests==2.27.1
requests_futures==1.0.0
boto3==1.20.44
sklearn
numpy==1.23.4
lmfit==1.0.3
scipy==1.9.3
scipy==1.9.3
164 changes: 36 additions & 128 deletions spot/Spot.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,27 @@
import time
import os
import numpy as np
import pickle
from datetime import datetime
from spot.prices.aws_price_retriever import AWSPriceRetriever
from spot.logs.aws_log_retriever import AWSLogRetriever
from spot.invocation.aws_function_invocator import AWSFunctionInvocator
from spot.invocation.aws_lambda_invoker import AWSLambdaInvoker
from spot.configs.aws_config_retriever import AWSConfigRetriever
from spot.mlModel.linear_regression import LinearRegressionModel
from spot.invocation.config_updater import ConfigUpdater
from spot.db.db import DBClient
from spot.context import Context
from spot.benchmark_config import BenchmarkConfig
from spot.constants import ROOT_DIR
from spot.visualize.Plot import Plot
from spot.recommendation_engine.recommendation_engine import RecommendationEngine
from spot.constants import *
from spot.mlModel.polynomial_regression import PolynomialRegressionModel
from spot.logs.log_propagation_waiter import LogPropagationWaiter


class Spot:
def __init__(self, config_dir: str, model: str):
def __init__(self, config_dir: str, aws_session):
# Load configuration values from config.json
self.config: BenchmarkConfig
self.path: str = config_dir
self.workload_file_path = os.path.join(self.path, "workload.json")
self.config_file_path = os.path.join(self.path, "config.json")
self.db = DBClient()

# TODO: implement checkpoint & restore on Context (loading from pickle?).
self.ctx = Context()

with open(self.config_file_path) as f:
self.config = BenchmarkConfig()
Expand All @@ -37,133 +32,46 @@ def __init__(self, config_dir: str, model: str):
json.dump(self.config.workload, json_file, indent=4)

self.benchmark_dir = self.path
self.log_prop_waiter = LogPropagationWaiter(self.config.function_name)

try:
self.last_log_timestamp = self.db.execute_max_value(
self.config.function_name, DB_NAME_LOGS, "timestamp"
)
except:
print(
"No data for the serverless function found yet. Setting last timestamp for the serverless function to 0.",
)
self.last_log_timestamp = 0
# try:
# self.last_log_timestamp = self.ctx.execute_max_value(
# self.config.function_name, DB_NAME_LOGS, "timestamp"
# )
# except:
# print(
# "No data for the serverless function found yet. Setting last timestamp for the serverless function to 0.",
# )
# self.last_log_timestamp = None
self.last_log_timestamp = None

# Create function db if not exists
self.db.create_function_db(self.config.function_name)
self.ctx.create_function_df(self.config.function_name)

# Instantiate SPOT system components
self.price_retriever = AWSPriceRetriever(self.db, self.config.region)
self.log_retriever = AWSLogRetriever(self.config.function_name)
# self.function_invocator = AWSFunctionInvocator(
# self.workload_file_path,
# self.config.function_name,
# self.config.mem_size,
# self.config.region,
# self.db,
# )
self.function_invoker = AWSLambdaInvoker(lambda_name=self.config.function_name)
self.config_retriever = AWSConfigRetriever(self.config.function_name, self.db)
self.sampler = RecommendationEngine(
self.function_invoker, self.workload_file_path, self.config.workload
self.price_retriever = AWSPriceRetriever(self.ctx, self.config.region)
self.log_retriever = AWSLogRetriever(
self.ctx, aws_session, self.config.function_name
)
function_invoker = AWSLambdaInvoker(
self.ctx, aws_session, self.config.function_name
)
self.recommendation_engine = RecommendationEngine(
function_invoker, self.workload_file_path, self.config.workload
)
# self.ml_model = self.select_model(model)
# self.recommendation_engine = RecommendationEngine(
# self.config_file_path,
# self.config,
# self.ml_model,
# self.db,
# self.benchmark_dir,
# )

def invoke(self):
# fetch configs and most up to date prices
self.config_retriever.get_latest_config()
self.price_retriever.fetch_current_pricing()

# invoke function
start = datetime.now().timestamp()
self.function_invocator.invoke_all()
self.log_prop_waiter.wait_by_count(start, self.function_invocator.invoke_cnt)

def optimize(self):
self.sampler.run()
self.recommendation_engine.run()

def collect_data(self):
# retrieve latest config, logs, pricing scheme
self.config_retriever.get_latest_config()
self.price_retriever.fetch_current_pricing()
# FIXME: now AWSLogRetriever::get_logs returns a pandas DataFrame.
self.last_log_timestamp = self.log_retriever.get_logs()

def train_model(self):
self.ml_model.fetch_data()
self.ml_model.train_model()

def select_model(self, model):
if model == "LinearRegression":
return LinearRegressionModel(
self.config.function_name,
self.config.vendor,
self.db,
self.last_log_timestamp,
self.benchmark_dir,
)
if model == "polynomial":
return PolynomialRegressionModel(
self.config.function_name,
self.config.vendor,
self.db,
self.last_log_timestamp,
self.benchmark_dir,
self.config.mem_bounds,
)
self.last_log_timestamp = self.log_retriever.get_logs(self.last_log_timestamp)

# Runs the workload with different configs to profile the serverless function
def profile(self):
mem_size = self.config.mem_bounds[0]
start = datetime.now().timestamp()
invoke_cnt = 0
while mem_size <= self.config.mem_bounds[1]:
print("Invoking sample workload with mem_size: ", mem_size)
# fetch configs and most up to date prices
self.config_retriever.get_latest_config()
self.price_retriever.fetch_current_pricing()
self.function_invocator.invoke_all(mem_size)
invoke_cnt += self.function_invocator.invoke_cnt
mem_size *= 2
self.log_prop_waiter.wait_by_count(start, invoke_cnt)
    def invoke(self, memory_mb):
        """Invoke the benchmarked Lambda function once at the given memory size.

        Delegates to the recommendation engine, which performs the actual
        invocation (presumably recording the result in the shared Context —
        TODO confirm against RecommendationEngine.invoke_once).

        :param memory_mb: Lambda memory configuration, in megabytes.
        """
        self.recommendation_engine.invoke_once(memory_mb)

    def update_config(self):
        """Apply the recommendation engine's suggested configuration.

        Thin wrapper; the actual config write-back is implemented by
        RecommendationEngine.update_config (not visible here).
        """
        self.recommendation_engine.update_config()

def plot_error_vs_epoch(self):
self.recommendation_engine.plot_error_vs_epoch()

def plot_config_vs_epoch(self):
self.recommendation_engine.plot_config_vs_epoch()

def plot_memsize_vs_cost(self):
self.ml_model.plot_memsize_vs_cost()

def recommend(self):
self.recommendation = self.recommendation_engine.recommend()

def get_prediction_error_rate(self):
# TODO: ensure it's called after update_config, or ensure memory is updated in invoke()
self.invoke()
self.collect_data()

log_cnt = self.function_invocator.invoke_cnt
self.ml_model.fetch_data(log_cnt)

# only take the last few because _df may have already contain data
costs = self.ml_model._df["Cost"].values[-log_cnt:]
pred = self.recommendation_engine.get_pred_cost()
err = sum([(cost - pred) ** 2 for cost in costs]) / len(costs)
print(f"{err=}")
self.db.add_document_to_collection(
self.config.function_name, DB_NAME_ERROR, {ERR_VAL: err}
)
self.recommendation_engine.plot_config_vs_epoch()
self.recommendation_engine.plot_error_vs_epoch()
    def teardown(self):
        """Persist the experiment Context to disk as a pickle.

        Writes ``<CTX_DIR>/<epoch-millis>.pkl`` so each run gets a unique,
        chronologically sortable snapshot. Currently this only saves the
        Context; no other resources are released.
        """
        # Just saving the Context for now.
        os.makedirs(CTX_DIR, exist_ok=True)
        # Millisecond timestamp keeps filenames unique across rapid runs.
        ctx_file = os.path.join(CTX_DIR, f"{int(time.time() * 1000)}.pkl")
        with open(ctx_file, "wb") as f:
            pickle.dump(self.ctx, f)
joehattori marked this conversation as resolved.
Show resolved Hide resolved
13 changes: 1 addition & 12 deletions spot/configs/aws_config_retriever.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import boto3
from spot.db.db import DBClient
import datetime


class AWSConfigRetriever:
def __init__(self, function_name, db: DBClient):
self.DBClient = db
def __init__(self, function_name):
self.function_name = function_name

def get_latest_config(self):
Expand All @@ -17,13 +15,4 @@ def get_latest_config(self):
)
last_modified_ms = int(last_modified.timestamp() * 1000)
config["LastModifiedInMs"] = int(last_modified_ms)

config["Architectures"] = config["Architectures"][0]
self.DBClient.add_new_config_if_changed(self.function_name, "config", config)

def print_configs(self):
iterator = self.DBClient.get_all_collection_documents(
self.function_name, "config"
)
for config in iterator:
print(config)
5 changes: 1 addition & 4 deletions spot/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
DURATION_PRICE = "duration_price"
REQUEST_PRICE = "request_price"

DB_NAME_PRICING = "pricing"
DB_NAME_CONFIG = "config"
DB_NAME_LOGS = "logs"
# TODO: maybe put config prediction and error into the same database?
DB_NAME_RECOMMENDATION = "recommendation"
Expand All @@ -26,8 +24,7 @@
DB_ID = "_id"

ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
DATA_DIR = os.path.join(ROOT_DIR, "data")

CTX_DIR = os.path.join(ROOT_DIR, "__context_cache__")

SAMPLE_POINTS = [128, 2048]
MEMORY_RANGE = [128, 3008]
Expand Down
25 changes: 25 additions & 0 deletions spot/context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import os
import subprocess
import json
import pandas as pd
import numpy as np

from spot.constants import *


class Context:
    """In-memory store of experiment data, replacing the old MongoDB client.

    Keeps one result DataFrame per benchmarked function, plus a single
    DataFrame accumulating pricing rows.
    """

    def __init__(self):
        # function name -> accumulated invocation-result DataFrame
        self.function_dfs = {}
        # accumulated pricing rows
        self.pricing_df = pd.DataFrame()

    def create_function_df(self, function_name):
        """Register an empty result DataFrame for *function_name*.

        Note: unconditionally resets any existing entry for the name.
        """
        self.function_dfs[function_name] = pd.DataFrame()

    def save_invokation_result(self, function_name, result_df):
        """Append *result_df* to the results recorded for *function_name*."""
        previous = self.function_dfs.get(function_name, pd.DataFrame())
        self.function_dfs[function_name] = pd.concat([previous, result_df])

    def record_pricing(self, row):
        """Append one pricing record (column -> values mapping) to pricing_df."""
        self.pricing_df = pd.concat([self.pricing_df, pd.DataFrame(row)])
3 changes: 0 additions & 3 deletions spot/db/README.md

This file was deleted.

89 changes: 0 additions & 89 deletions spot/db/db.py

This file was deleted.

Loading