Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Remove update_event as a single event loop for the system #160

Merged
merged 42 commits into from
Oct 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
1853270
stop automatically queueing objects for work
demoray Oct 15, 2020
f0d74c7
Merge branch 'main' into drop-queues
bmc-msft Oct 15, 2020
30d8deb
actuate state machines directly instead of via queues
demoray Oct 15, 2020
e1762a4
Merge branch 'main' into remove-queues-by-default
bmc-msft Oct 15, 2020
9a1a917
rename testquery to stop warning for ORM tests
demoray Oct 15, 2020
cfee27b
move more job/task processing into timer_tasks
demoray Oct 15, 2020
7f6ed29
get backported typing extensions
demoray Oct 15, 2020
a07d1a5
split out more things into individual queues
demoray Oct 15, 2020
144021b
Merge remote-tracking branch 'upstream/main' into remove-queues-by-de…
demoray Oct 15, 2020
ed2173d
remove schedule_tasks
demoray Oct 15, 2020
808c6b8
remove queue updates that have been split off
demoray Oct 15, 2020
ecde82d
rename timer_workers to timer_scalesets
demoray Oct 15, 2020
8320020
merge autoscaling function into the worker function
demoray Oct 15, 2020
65e11ed
move autoscale into separate module still called by scaleset code
demoray Oct 15, 2020
01418dd
speed up processing repro
demoray Oct 15, 2020
c1e718a
ensure repro VMs get shut down
demoray Oct 15, 2020
9b9d3a1
handle future queued stop
demoray Oct 15, 2020
ef43ddb
add change from #137
demoray Oct 15, 2020
e5a8b44
in, not ==
demoray Oct 15, 2020
49c354a
change from #158
demoray Oct 15, 2020
3032ec3
update linting
demoray Oct 15, 2020
0031d23
Merge branch 'main' into remove-queues-by-default
bmc-msft Oct 15, 2020
48cc55c
pull Protocol from typing_extensions
demoray Oct 15, 2020
88274f3
fix lint
demoray Oct 15, 2020
8c352d6
Merge branch 'main' into remove-queues-by-default
bmc-msft Oct 16, 2020
20865d3
handle pools not ready before scheduling work
demoray Oct 16, 2020
d04d020
lint
demoray Oct 16, 2020
093ccb7
lint
demoray Oct 16, 2020
714b88b
only start to image a node once
demoray Oct 16, 2020
8802738
Merge branch 'main' into remove-queues-by-default
bmc-msft Oct 16, 2020
65781d1
Merge branch 'main' into remove-queues-by-default
bmc-msft Oct 16, 2020
940645e
Merge branch 'main' into remove-queues-by-default
bmc-msft Oct 16, 2020
ff8ecef
re-add copyright statement
demoray Oct 16, 2020
9677a53
Merge branch 'remove-queues-by-default' of github.com:bmc-msft/onefuz…
demoray Oct 16, 2020
5229fca
add comments as to what this *should* be bound to
demoray Oct 16, 2020
68d710e
address comments
demoray Oct 16, 2020
f03c87a
fix typo
demoray Oct 16, 2020
02d4e81
indicate why the task/job is stopping
demoray Oct 16, 2020
6de9458
cleanup message
demoray Oct 16, 2020
183f511
Merge branch 'main' into remove-queues-by-default
bmc-msft Oct 16, 2020
8e51afc
actually execute the function
demoray Oct 16, 2020
07d6e2c
Merge branch 'main' into remove-queues-by-default
bmc-msft Oct 17, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/api-service/__app__/agent_registration/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def post(req: func.HttpRequest) -> func.HttpResponse:
node.version = registration_request.version
node.reimage_requested = False
node.state = NodeState.init
node.reimage_queued = False
else:
node = Node(
pool_name=registration_request.pool_name,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,151 +1,148 @@
#!/usr/bin/env python
#
# Copyright (c) Microsoft Corporation.
bmc-msft marked this conversation as resolved.
Show resolved Hide resolved
bmc-msft marked this conversation as resolved.
Show resolved Hide resolved
# Licensed under the MIT License.

import logging
import math
from typing import List

import azure.functions as func
from onefuzztypes.enums import NodeState, PoolState, ScalesetState
from onefuzztypes.models import AutoScaleConfig, TaskPool

from ..onefuzzlib.pools import Node, Pool, Scaleset
from ..onefuzzlib.tasks.main import Task


def scale_up(pool: Pool, scalesets: List[Scaleset], nodes_needed: int) -> None:
logging.info("Scaling up")
autoscale_config = pool.autoscale
if not isinstance(autoscale_config, AutoScaleConfig):
return

for scaleset in scalesets:
if scaleset.state == ScalesetState.running:

max_size = min(scaleset.max_size(), autoscale_config.scaleset_size)
logging.info(
"Sacleset id: %s, Scaleset size: %d, max_size: %d"
% (scaleset.scaleset_id, scaleset.size, max_size)
)
if scaleset.size < max_size:
current_size = scaleset.size
if nodes_needed <= max_size - current_size:
scaleset.size = current_size + nodes_needed
nodes_needed = 0
else:
scaleset.size = max_size
nodes_needed = nodes_needed - (max_size - current_size)
scaleset.state = ScalesetState.resize
scaleset.save()

else:
continue

if nodes_needed == 0:
return

for _ in range(
math.ceil(
nodes_needed
/ min(
Scaleset.scaleset_max_size(autoscale_config.image),
autoscale_config.scaleset_size,
)
)
):
logging.info("Creating Scaleset for Pool %s" % (pool.name))
max_nodes_scaleset = min(
Scaleset.scaleset_max_size(autoscale_config.image),
autoscale_config.scaleset_size,
nodes_needed,
)

if not autoscale_config.region:
raise Exception("Region is missing")

scaleset = Scaleset.create(
pool_name=pool.name,
vm_sku=autoscale_config.vm_sku,
image=autoscale_config.image,
region=autoscale_config.region,
size=max_nodes_scaleset,
spot_instances=autoscale_config.spot_instances,
tags={"pool": pool.name},
)
scaleset.save()
nodes_needed -= max_nodes_scaleset


def scale_down(scalesets: List[Scaleset], nodes_to_remove: int) -> None:
logging.info("Scaling down")
for scaleset in scalesets:
nodes = Node.search_states(
scaleset_id=scaleset.scaleset_id, states=[NodeState.free]
)
if nodes and nodes_to_remove > 0:
max_nodes_remove = min(len(nodes), nodes_to_remove)
if max_nodes_remove >= scaleset.size and len(nodes) == scaleset.size:
scaleset.state = ScalesetState.shutdown
nodes_to_remove = nodes_to_remove - scaleset.size
scaleset.save()
for node in nodes:
node.set_shutdown()
continue

scaleset.size = scaleset.size - max_nodes_remove
nodes_to_remove = nodes_to_remove - max_nodes_remove
scaleset.state = ScalesetState.resize
scaleset.save()


def get_vm_count(tasks: List[Task]) -> int:
count = 0
for task in tasks:
task_pool = task.get_pool()
if (
not task_pool
or not isinstance(task_pool, Pool)
or not isinstance(task.config.pool, TaskPool)
):
continue
count += task.config.pool.count
return count


def main(mytimer: func.TimerRequest) -> None: # noqa: F841
pools = Pool.search_states(states=PoolState.available())
for pool in pools:
logging.info("autoscale: %s" % (pool.autoscale))
if not pool.autoscale:
continue

# get all the tasks (count not stopped) for the pool
tasks = Task.get_tasks_by_pool_name(pool.name)
logging.info("Pool: %s, #Tasks %d" % (pool.name, len(tasks)))

num_of_tasks = get_vm_count(tasks)
nodes_needed = max(num_of_tasks, pool.autoscale.min_size)
if pool.autoscale.max_size:
nodes_needed = min(nodes_needed, pool.autoscale.max_size)

# do scaleset logic match with pool
# get all the scalesets for the pool
scalesets = Scaleset.search_by_pool(pool.name)
pool_resize = False
for scaleset in scalesets:
if scaleset.state in ScalesetState.modifying():
pool_resize = True
break
nodes_needed = nodes_needed - scaleset.size

if pool_resize:
continue

logging.info("Pool: %s, #Nodes Needed: %d" % (pool.name, nodes_needed))
if nodes_needed > 0:
# resizing scaleset or creating new scaleset.
scale_up(pool, scalesets, nodes_needed)
elif nodes_needed < 0:
scale_down(scalesets, abs(nodes_needed))
#!/usr/bin/env python
#
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

import logging
import math
from typing import List

from onefuzztypes.enums import NodeState, ScalesetState
from onefuzztypes.models import AutoScaleConfig, TaskPool

from .pools import Node, Pool, Scaleset
from .tasks.main import Task


def scale_up(pool: Pool, scalesets: List[Scaleset], nodes_needed: int) -> None:
logging.info("Scaling up")
autoscale_config = pool.autoscale
if not isinstance(autoscale_config, AutoScaleConfig):
return

for scaleset in scalesets:
if scaleset.state in [ScalesetState.running, ScalesetState.resize]:

max_size = min(scaleset.max_size(), autoscale_config.scaleset_size)
logging.info(
"scaleset:%s size:%d max_size:%d"
% (scaleset.scaleset_id, scaleset.size, max_size)
)
if scaleset.size < max_size:
current_size = scaleset.size
if nodes_needed <= max_size - current_size:
scaleset.size = current_size + nodes_needed
nodes_needed = 0
else:
scaleset.size = max_size
nodes_needed = nodes_needed - (max_size - current_size)
scaleset.state = ScalesetState.resize
scaleset.save()

else:
continue

if nodes_needed == 0:
return

for _ in range(
math.ceil(
nodes_needed
/ min(
Scaleset.scaleset_max_size(autoscale_config.image),
autoscale_config.scaleset_size,
)
)
):
logging.info("Creating Scaleset for Pool %s" % (pool.name))
max_nodes_scaleset = min(
Scaleset.scaleset_max_size(autoscale_config.image),
autoscale_config.scaleset_size,
nodes_needed,
)

if not autoscale_config.region:
raise Exception("Region is missing")

scaleset = Scaleset.create(
pool_name=pool.name,
vm_sku=autoscale_config.vm_sku,
image=autoscale_config.image,
region=autoscale_config.region,
size=max_nodes_scaleset,
spot_instances=autoscale_config.spot_instances,
tags={"pool": pool.name},
)
scaleset.save()
nodes_needed -= max_nodes_scaleset


def scale_down(scalesets: List[Scaleset], nodes_to_remove: int) -> None:
logging.info("Scaling down")
for scaleset in scalesets:
nodes = Node.search_states(
scaleset_id=scaleset.scaleset_id, states=[NodeState.free]
)
if nodes and nodes_to_remove > 0:
max_nodes_remove = min(len(nodes), nodes_to_remove)
if max_nodes_remove >= scaleset.size and len(nodes) == scaleset.size:
scaleset.state = ScalesetState.shutdown
nodes_to_remove = nodes_to_remove - scaleset.size
scaleset.save()
for node in nodes:
node.set_shutdown()
continue

scaleset.size = scaleset.size - max_nodes_remove
nodes_to_remove = nodes_to_remove - max_nodes_remove
scaleset.state = ScalesetState.resize
scaleset.save()


def get_vm_count(tasks: List[Task]) -> int:
count = 0
for task in tasks:
task_pool = task.get_pool()
if (
not task_pool
or not isinstance(task_pool, Pool)
or not isinstance(task.config.pool, TaskPool)
):
continue
count += task.config.pool.count
return count


def autoscale_pool(pool: Pool) -> None:
logging.info("autoscale: %s" % (pool.autoscale))
if not pool.autoscale:
return

# get all the tasks (count not stopped) for the pool
tasks = Task.get_tasks_by_pool_name(pool.name)
logging.info("Pool: %s, #Tasks %d" % (pool.name, len(tasks)))

num_of_tasks = get_vm_count(tasks)
nodes_needed = max(num_of_tasks, pool.autoscale.min_size)
if pool.autoscale.max_size:
nodes_needed = min(nodes_needed, pool.autoscale.max_size)

# do scaleset logic match with pool
# get all the scalesets for the pool
scalesets = Scaleset.search_by_pool(pool.name)
pool_resize = False
for scaleset in scalesets:
if scaleset.state in ScalesetState.modifying():
pool_resize = True
break
nodes_needed = nodes_needed - scaleset.size

if pool_resize:
return

logging.info("Pool: %s, #Nodes Needed: %d" % (pool.name, nodes_needed))
if nodes_needed > 0:
# resizing scaleset or creating new scaleset.
scale_up(pool, scalesets, nodes_needed)
elif nodes_needed < 0:
scale_down(scalesets, abs(nodes_needed))
3 changes: 0 additions & 3 deletions src/api-service/__app__/onefuzzlib/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,6 @@ def stopping(self) -> None:
self.state = JobState.stopped
self.save()

def queue_stop(self) -> None:
self.queue(method=self.stopping)

def on_start(self) -> None:
# try to keep this effectively idempotent
if self.end_time is None:
Expand Down
31 changes: 31 additions & 0 deletions src/api-service/__app__/onefuzzlib/orm.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from onefuzztypes.models import Error
from onefuzztypes.primitives import Container, PoolName, Region
from pydantic import BaseModel, Field
from typing_extensions import Protocol

from .azure.table import get_client
from .dashboard import add_event
Expand Down Expand Up @@ -66,6 +67,36 @@
HOURS = 60 * 60


class HasState(Protocol):
# TODO: this should be bound tighter than Any
bmc-msft marked this conversation as resolved.
Show resolved Hide resolved
# In the end, we want this to be an Enum. Specifically, one of
# the JobState,TaskState,etc enums.
state: Any


def process_state_update(obj: HasState) -> None:
"""
process a single state update, if the obj
implements a function for that state
"""

func = getattr(obj, obj.state.name, None)
if func is None:
return
func()


def process_state_updates(obj: HasState, max_updates: int = 5) -> None:
""" process through the state machine for an object """

for _ in range(max_updates):
state = obj.state
process_state_update(obj)
new_state = obj.state
if new_state == state:
break


def resolve(key: KEY) -> str:
if isinstance(key, str):
return key
Expand Down
Loading