Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SPOT suggesting non-minimal configs and fixed log retriever #62

Merged
merged 5 commits into from
Apr 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
453 changes: 453 additions & 0 deletions notebooks/explore.ipynb

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions spot/Spot.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ def __init__(self, config_dir: str, model: str):
self.config.function_name,
self.config.mem_size,
self.config.region,
self.db,
)
self.config_retriever = AWSConfigRetriever(self.config.function_name, self.db)
self.ml_model = self.select_model(model)
Expand Down
5 changes: 5 additions & 0 deletions spot/db/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ def add_document_to_collection_if_not_exists(
if not collection.find_one(criteria):
collection.insert_one(document)

def remove_document_from_collection(self, function_name, collection_name, query):
function_db = self.client[function_name]
collection = function_db[collection_name]
collection.delete_one(query)

def add_new_config_if_changed(self, function_name, collection_name, document):
function_db = self.client[function_name]
collection = function_db[collection_name]
Expand Down
27 changes: 26 additions & 1 deletion spot/invocation/aws_function_invocator.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from spot.invocation.WorkloadChecker import CheckWorkloadValidity
from spot.invocation.EventGenerator import GenericEventGenerator
from spot.invocation.config_updater import ConfigUpdater
from spot.db.db import DBClient


class InvalidWorkloadFileException(Exception):
Expand All @@ -34,15 +35,22 @@ class AWSFunctionInvocator:
"""

def __init__(
self, workload_path: str, function_name: str, mem_size: int, region: str
self,
workload_path: str,
function_name: str,
mem_size: int,
region: str,
db: DBClient,
) -> None:
self._read_workload(workload_path)
self._workload_path: str = os.path.dirname(workload_path)
self._config = ConfigUpdater(function_name, mem_size, region)
self._config.set_mem_size(mem_size)
self._all_events, _ = GenericEventGenerator(self._workload)
self.function_name = function_name
self._futures = []
self._thread = []
self.DBClient = db
self.invoke_cnt = 0

def _read_workload(self, path: str) -> None:
Expand Down Expand Up @@ -106,6 +114,7 @@ def invoke_all(self, mem: int = -1) -> None:
"""Invoke the function with user specified inputs and parameters asynchronously"""
self.invoke_cnt = 0
self._threads = []
request_ids = []
for (instance, instance_times) in self._all_events.items():
self._config.set_instance(
self._workload["instances"][instance]["application"]
Expand All @@ -119,3 +128,19 @@ def invoke_all(self, mem: int = -1) -> None:
thread.join()
for future in self._futures:
res = future.result()
req_id = res["ResponseMetadata"]["RequestId"]
status = res["StatusCode"]
error = False
if status < 200 or status >= 300:
print(f"WARNING: Status code {status} for request id {req_id}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about using the logging module for warnings

if "FunctionError" in res:
error = True
print(
"WARNING: Function error for request id {req_id}. The memory configuration being used may be too low"
)
print(res["FunctionError"])
request_ids.append({"_id": req_id, "status": status, "error": error})
for request in request_ids:
self.DBClient.add_document_to_collection_if_not_exists(
self.function_name, "requests", request, {"_id": request["_id"]}
)
36 changes: 29 additions & 7 deletions spot/logs/aws_log_retriever.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,33 @@ def get_logs(self):
client = boto3.client("logs")
new_timestamp = self.last_log_timestamp

# TODO: determine how many log streams to read, boto3 client by default returns 50
# get log streams
streams = []
response = client.describe_log_streams(
logGroupName=path, orderBy="LastEventTime", descending=True
)
for stream in response["logStreams"]:
if stream["lastEventTimestamp"] > self.last_log_timestamp:
streams.append((stream["logStreamName"]))
next_token = ""
new_logs = True
while new_logs:
response = (
client.describe_log_streams(
logGroupName=path,
orderBy="LastEventTime",
descending=True,
nextToken=next_token,
)
if next_token != ""
else client.describe_log_streams(
logGroupName=path, orderBy="LastEventTime", descending=True
)
)

for stream in response["logStreams"]:
if stream["lastEventTimestamp"] > self.last_log_timestamp:
streams.append((stream["logStreamName"]))
else:
new_logs = False
break
if next_token == response["nextToken"] or response["nextToken"] == "":
break
next_token = response["nextToken"]

# get log events and save it to DB
for stream in streams:
Expand Down Expand Up @@ -52,6 +70,10 @@ def get_logs(self):
self.DBClient.add_document_to_collection_if_not_exists(
self.function_name, DB_NAME_LOGS, log, {REQUEST_ID: requestId}
)
# Remove from request db if invoked using invocator to confirm all invoked logs present
self.DBClient.remove_document_from_collection(
self.function_name, "requests", {"_id": log[REQUEST_ID]}
)

return new_timestamp

Expand Down
2 changes: 0 additions & 2 deletions spot/mlModel/ml_model_base_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ def _fetch_configs(self):
{
RUNTIME: 1,
TIMEOUT: 1,
MEM_SIZE: 1,
ARCH: 1,
LAST_MODIFIED_MS: 1,
DB_ID: 0,
Expand Down Expand Up @@ -136,7 +135,6 @@ def _associate_logs_with_config_and_pricing(self):
* float(int(log[MEM_SIZE]) / 128)
)

# self._df = self._df.append(new_row, ignore_index=True)
self._df = pd.concat(
[self._df, pd.DataFrame.from_records([new_row])], ignore_index=True
)
Expand Down
22 changes: 17 additions & 5 deletions spot/mlModel/polynomial_regression.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def __init__(
last_log_timestamp: int,
benchmark_dir: str,
mem_bounds: list,
polynomial_degree=2,
polynomial_degree=3,
):
super().__init__(function_name, vendor, db, last_log_timestamp)
self._degree = polynomial_degree
Expand All @@ -43,6 +43,10 @@ def __init__(

def _preprocess(self):
self._df[MEM_SIZE] = self._df[MEM_SIZE].astype(int)
self._df = self._df[
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is quite important addition!

(self._df[MEM_SIZE] >= self.mem_bounds[0])
& (self._df[MEM_SIZE] <= self.mem_bounds[1])
]
X_mem = self._df[MEM_SIZE].values
y = self._df[COST].values
X_labels = np.unique(X_mem)
Expand All @@ -68,7 +72,9 @@ def train_model(self):
print("No data available to train the model")
exit()

self._model = np.polyfit(self._x, self._y, self._degree)
self._model, self._residual, _, _, _ = np.polyfit(
self._x, self._y, self._degree, full=True
)
self._save_model()

"""
Expand All @@ -92,7 +98,7 @@ def plot_memsize_vs_cost(self):
plt.scatter(self._x, self._y)

# Add linear regression line
xvars = np.linspace(128, 10240, 1024)
xvars = np.linspace(self._x.min(), self._x.max(), 1024)
plt.plot(
xvars,
np.polyval(self._model, xvars),
Expand All @@ -102,9 +108,15 @@ def plot_memsize_vs_cost(self):

# Get optimal config
x_min, y_min = self.get_optimal_config()
mem_recommend = int(round(x_min, 0))
rsme = self._residual

print(f"Minimum cost of {y_min} found at {mem_recommend} MB")
print(f"Residual: {rsme}")

# Plot best mem size, data points and polynomial regression fit
plt.plot(x_min, y_min, "x")
plt.text(x_min, y_min, f"{mem_recommend} MB", fontweight=700)
plt.legend()
plt.show()

Expand All @@ -124,9 +136,9 @@ def plot_memsize_vs_cost(self):

def get_polynomial_equation_string(self):
ret_val = ""
for degree in range(len(self._model) - 1, 0, -1):
for degree in range(len(self._model) - 1, -1, -1):
if degree == 0:
ret_val += str(self._model[degree])
ret_val += str("{:.2E}".format(self._model[degree]))
else:
ret_val += (
str("{:.2E}".format(self._model[degree]))
Expand Down
2 changes: 1 addition & 1 deletion spot/recommendation_engine/recommendation_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def __init__(self, config_file_path, config, model, db, benchmark_dir):
def recommend(self):
self.x_min, self.y_min = self._model.get_optimal_config()
print("Best memory config: ", self.x_min, " ", "Cost: ", self.y_min)
return self.x_min
return round(self.x_min, 0)

def get_pred_cost(self):
return self.y_min
Expand Down
6 changes: 3 additions & 3 deletions spot/serverless_functions/ChromeScreenshot/config.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
{
"function_name": "screenshot-service-dev-capture",
"mem_bounds": [
128,
10280
256,
8192
],
"mem_size": 256,
"mem_size": 512,
"region": "us-east-2",
"vendor": "AWS",
"workload": {
Expand Down
28 changes: 16 additions & 12 deletions spot/serverless_functions/aes/config.json
Original file line number Diff line number Diff line change
@@ -1,27 +1,31 @@
{
"function_name": "pyaes",
"vendor": "AWS",
"region": "us-east-2",
"mem_bounds": [
128,
1024
],
"mem_size": 128,
"region": "us-east-2",
"vendor": "AWS",
"workload": {
"test_name": "IntegrationTest1",
"test_duration_in_seconds": 15,
"random_seed": 100,
"blocking_cli": false,
"instances": {
"instance1": {
"application": "pyaes",
"distribution": "Poisson",
"rate": 5,
"activity_window": [
5,
10
],
"payload": "payload.json",
"application": "pyaes",
"distribution": "Poisson",
"host": "x8siu0es68.execute-api.us-east-2.amazonaws.com",
"stage": "default",
"resource": "/pyaes?format=json"
"payload": "payload.json",
"rate": 5,
"resource": "/pyaes?format=json",
"stage": "default"
}
}
},
"random_seed": 100,
"test_duration_in_seconds": 15,
"test_name": "IntegrationTest1"
}
}
28 changes: 16 additions & 12 deletions spot/serverless_functions/img_proc/config.json
Original file line number Diff line number Diff line change
@@ -1,27 +1,31 @@
{
"function_name": "image_processing",
"vendor": "AWS",
"mem_bounds": [
256,
10280
],
"mem_size": 128,
"region": "us-east-2",
"mem_size": 512,
"vendor": "AWS",
"workload": {
"test_name": "IntegrationTest1",
"test_duration_in_seconds": 5,
"random_seed": 100,
"blocking_cli": false,
"instances": {
"instance1": {
"application": "image_processing",
"distribution": "Poisson",
"rate": 1,
"activity_window": [
1,
5
],
"payload": "payload.json",
"application": "image_processing",
"distribution": "Poisson",
"host": "ts7p12m27c.execute-api.us-east-2.amazonaws.com",
"stage": "default",
"resource": "/image_processing?format=json"
"payload": "payload.json",
"rate": 1,
"resource": "/image_processing?format=json",
"stage": "default"
}
}
},
"random_seed": 100,
"test_duration_in_seconds": 5,
"test_name": "IntegrationTest1"
}
}