Skip to content

Commit

Permalink
Remove update_event as a single event loop for the system (microsoft#160)
Browse files Browse the repository at this point in the history
  • Loading branch information
anshuman-goel committed Oct 19, 2020
1 parent fa48ca2 commit dc37c86
Show file tree
Hide file tree
Showing 24 changed files with 308 additions and 219 deletions.
1 change: 1 addition & 0 deletions src/api-service/__app__/agent_registration/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def post(req: func.HttpRequest) -> func.HttpResponse:
node.version = registration_request.version
node.reimage_requested = False
node.state = NodeState.init
node.reimage_queued = False
else:
node = Node(
pool_name=registration_request.pool_name,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
#!/usr/bin/env python
#
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

import logging
import math
from typing import List

import azure.functions as func
from onefuzztypes.enums import NodeState, PoolState, ScalesetState
from onefuzztypes.enums import NodeState, ScalesetState
from onefuzztypes.models import AutoScaleConfig, TaskPool

from ..onefuzzlib.pools import Node, Pool, Scaleset
from ..onefuzzlib.tasks.main import Task
from .pools import Node, Pool, Scaleset
from .tasks.main import Task


def scale_up(pool: Pool, scalesets: List[Scaleset], nodes_needed: int) -> None:
Expand All @@ -22,11 +16,11 @@ def scale_up(pool: Pool, scalesets: List[Scaleset], nodes_needed: int) -> None:
return

for scaleset in scalesets:
if scaleset.state == ScalesetState.running:
if scaleset.state in [ScalesetState.running, ScalesetState.resize]:

max_size = min(scaleset.max_size(), autoscale_config.scaleset_size)
logging.info(
"Sacleset id: %s, Scaleset size: %d, max_size: %d"
"scaleset:%s size:%d max_size:%d"
% (scaleset.scaleset_id, scaleset.size, max_size)
)
if scaleset.size < max_size:
Expand Down Expand Up @@ -134,44 +128,42 @@ def get_vm_count(tasks: List[Task]) -> int:
return count


def main(mytimer: func.TimerRequest) -> None: # noqa: F841
pools = Pool.search_states(states=PoolState.available())
for pool in pools:
logging.info("autoscale: %s" % (pool.autoscale))
if not pool.autoscale:
continue
def autoscale_pool(pool: Pool) -> None:
logging.info("autoscale: %s" % (pool.autoscale))
if not pool.autoscale:
return

# get all the tasks (count not stopped) for the pool
tasks = Task.get_tasks_by_pool_name(pool.name)
logging.info("Pool: %s, #Tasks %d" % (pool.name, len(tasks)))
# get all the tasks (count not stopped) for the pool
tasks = Task.get_tasks_by_pool_name(pool.name)
logging.info("Pool: %s, #Tasks %d" % (pool.name, len(tasks)))

num_of_tasks = get_vm_count(tasks)
nodes_needed = max(num_of_tasks, pool.autoscale.min_size)
if pool.autoscale.max_size:
nodes_needed = min(nodes_needed, pool.autoscale.max_size)
num_of_tasks = get_vm_count(tasks)
nodes_needed = max(num_of_tasks, pool.autoscale.min_size)
if pool.autoscale.max_size:
nodes_needed = min(nodes_needed, pool.autoscale.max_size)

# do scaleset logic match with pool
# get all the scalesets for the pool
scalesets = Scaleset.search_by_pool(pool.name)
pool_resize = False
for scaleset in scalesets:
if scaleset.state in ScalesetState.modifying():
pool_resize = True
break
nodes_needed = nodes_needed - scaleset.size
# do scaleset logic match with pool
# get all the scalesets for the pool
scalesets = Scaleset.search_by_pool(pool.name)
pool_resize = False
for scaleset in scalesets:
if scaleset.state in ScalesetState.modifying():
pool_resize = True
break
nodes_needed = nodes_needed - scaleset.size

if pool_resize:
continue
if pool_resize:
return

logging.info("Pool: %s, #Nodes Needed: %d" % (pool.name, nodes_needed))
if nodes_needed > 0:
# resizing scaleset or creating new scaleset.
scale_up(pool, scalesets, nodes_needed)
elif nodes_needed < 0:
for scaleset in scalesets:
nodes = Node.search_states(scaleset_id=scaleset.scaleset_id)
for node in nodes:
if node.delete_requested:
nodes_needed += 1
if nodes_needed < 0:
scale_down(scalesets, abs(nodes_needed))
logging.info("Pool: %s, #Nodes Needed: %d" % (pool.name, nodes_needed))
if nodes_needed > 0:
# resizing scaleset or creating new scaleset.
scale_up(pool, scalesets, nodes_needed)
elif nodes_needed < 0:
for scaleset in scalesets:
nodes = Node.search_states(scaleset_id=scaleset.scaleset_id)
for node in nodes:
if node.delete_requested:
nodes_needed += 1
if nodes_needed < 0:
scale_down(scalesets, abs(nodes_needed))
3 changes: 0 additions & 3 deletions src/api-service/__app__/onefuzzlib/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,6 @@ def stopping(self) -> None:
self.state = JobState.stopped
self.save()

def queue_stop(self) -> None:
self.queue(method=self.stopping)

def on_start(self) -> None:
# try to keep this effectively idempotent
if self.end_time is None:
Expand Down
31 changes: 31 additions & 0 deletions src/api-service/__app__/onefuzzlib/orm.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from onefuzztypes.models import Error
from onefuzztypes.primitives import Container, PoolName, Region
from pydantic import BaseModel, Field
from typing_extensions import Protocol

from .azure.table import get_client
from .dashboard import add_event
Expand Down Expand Up @@ -66,6 +67,36 @@
HOURS = 60 * 60


class HasState(Protocol):
    """Structural (duck-typed) interface for ORM objects that carry a
    state-machine field, consumed by process_state_update(s) below."""

    # TODO: this should be bound tighter than Any
    # In the end, we want this to be an Enum. Specifically, one of
    # the JobState,TaskState,etc enums.
    state: Any


def process_state_update(obj: HasState) -> None:
    """Perform a single state-machine step on *obj*.

    Dispatches to the method whose name matches the current state enum
    member (e.g. ``obj.init()`` while in the ``init`` state).  Objects
    opt out of handling a given state simply by not defining the method.
    """

    handler = getattr(obj, obj.state.name, None)
    if handler is not None:
        handler()


def process_state_updates(obj: HasState, max_updates: int = 5) -> None:
    """Drive *obj* through its state machine until it stabilizes.

    Applies process_state_update repeatedly, stopping as soon as a step
    leaves the state unchanged, or after *max_updates* steps as a guard
    against livelock in a cyclic state machine.
    """

    for _ in range(max_updates):
        previous = obj.state
        process_state_update(obj)
        if obj.state == previous:
            # the handler did not transition; nothing further to do
            break


def resolve(key: KEY) -> str:
if isinstance(key, str):
return key
Expand Down
61 changes: 42 additions & 19 deletions src/api-service/__app__/onefuzzlib/pools.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@


class Node(BASE_NODE, ORMMixin):
# should only be set by Scaleset.reimage_nodes
# should only be unset during agent_registration POST
reimage_queued: bool = Field(default=False)

@classmethod
def search_states(
cls,
Expand Down Expand Up @@ -108,6 +112,21 @@ def search_outdated(
version_query = "not (version eq '%s')" % __version__
return cls.search(query=query, raw_unchecked_filter=version_query)

@classmethod
def mark_outdated_nodes(cls) -> None:
    """Queue a reimage for every node running a mismatched agent version."""

    for node in cls.search_outdated():
        logging.info(
            "node is outdated: %s - node_version:%s api_version:%s",
            node.machine_id,
            node.version,
            __version__,
        )
        if node.version == "1.0.0":
            # 1.0.0 agents predate the reimage handshake; mark done directly
            node.to_reimage(done=True)
        else:
            node.to_reimage()

@classmethod
def get_by_machine_id(cls, machine_id: UUID) -> Optional["Node"]:
nodes = cls.search(query={"machine_id": [machine_id]})
Expand Down Expand Up @@ -195,9 +214,24 @@ def can_process_new_work(self) -> bool:
self.stop()
return False

if self.delete_requested or self.reimage_requested:
if self.state in NodeState.ready_for_reset():
logging.info(
"can_schedule node is set for reset. machine_id:%s", self.machine_id
)
return False

if self.delete_requested:
logging.info(
"can_schedule is set to be deleted. machine_id:%s",
self.machine_id,
)
self.stop()
return False

if self.reimage_requested:
logging.info(
"can_schedule should be recycled. machine_id:%s", self.machine_id
"can_schedule is set to be reimaged. machine_id:%s",
self.machine_id,
)
self.stop()
return False
Expand Down Expand Up @@ -682,25 +716,11 @@ def cleanup_nodes(self) -> bool:
to_reimage = []
to_delete = []

outdated = Node.search_outdated(scaleset_id=self.scaleset_id)
for node in outdated:
logging.info(
"node is outdated: %s - node_version:%s api_version:%s",
node.machine_id,
node.version,
__version__,
)
if node.version == "1.0.0":
node.state = NodeState.done
to_reimage.append(node)
else:
node.to_reimage()

nodes = Node.search_states(
scaleset_id=self.scaleset_id, states=NodeState.ready_for_reset()
)

if not outdated and not nodes:
if not nodes:
logging.info("no nodes need updating: %s", self.scaleset_id)
return False

Expand All @@ -719,7 +739,8 @@ def cleanup_nodes(self) -> bool:
if ScalesetShrinkQueue(self.scaleset_id).should_shrink():
node.set_halt()
to_delete.append(node)
else:
elif not node.reimage_queued:
# only add nodes that are not already set to reschedule
to_reimage.append(node)

# Perform operations until they fail due to scaleset getting locked
Expand Down Expand Up @@ -833,6 +854,9 @@ def reimage_nodes(self, nodes: List[Node]) -> None:
"unable to reimage nodes: %s:%s - %s"
% (self.scaleset_id, machine_ids, result)
)
for node in nodes:
node.reimage_queued = True
node.save()

def shutdown(self) -> None:
size = get_vmss_size(self.scaleset_id)
Expand All @@ -855,7 +879,6 @@ def halt(self) -> None:
self.save()
else:
logging.info("scaleset deleted: %s", self.scaleset_id)
self.state = ScalesetState.halt
self.delete()

@classmethod
Expand Down
5 changes: 1 addition & 4 deletions src/api-service/__app__/onefuzzlib/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from .azure.queue import get_queue_sas
from .azure.vm import VM
from .extension import proxy_manager_extensions
from .orm import HOURS, MappingIntStrAny, ORMMixin, QueryFilter
from .orm import MappingIntStrAny, ORMMixin, QueryFilter
from .proxy_forward import ProxyForward

PROXY_SKU = "Standard_B2s"
Expand Down Expand Up @@ -210,9 +210,6 @@ def save_proxy_config(self) -> None:
account_id=os.environ["ONEFUZZ_FUNC_STORAGE"],
)

def queue_stop(self, count: int) -> None:
self.queue(method=self.stopping, visibility_timeout=count * HOURS)

@classmethod
def search_states(cls, *, states: Optional[List[VmState]] = None) -> List["Proxy"]:
query: QueryFilter = {}
Expand Down
16 changes: 11 additions & 5 deletions src/api-service/__app__/onefuzzlib/repro.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# Licensed under the MIT License.

import logging
from datetime import datetime, timedelta
from typing import List, Optional, Tuple, Union

from azure.mgmt.compute.models import VirtualMachine
Expand All @@ -18,7 +19,7 @@
from .azure.ip import get_public_ip
from .azure.vm import VM
from .extension import repro_extensions
from .orm import HOURS, ORMMixin, QueryFilter
from .orm import ORMMixin, QueryFilter
from .reports import get_report
from .tasks.main import Task

Expand Down Expand Up @@ -205,9 +206,6 @@ def build_repro_script(self) -> Optional[Error]:
logging.info("saved repro script")
return None

def queue_stop(self, count: int) -> None:
self.queue(method=self.stopping, visibility_timeout=count * HOURS)

@classmethod
def search_states(cls, *, states: Optional[List[VmState]] = None) -> List["Repro"]:
query: QueryFilter = {}
Expand All @@ -228,10 +226,18 @@ def create(cls, config: ReproConfig) -> Union[Error, "Repro"]:
return task

vm = cls(config=config, task_id=task.task_id, os=task.os, auth=build_auth())
if vm.end_time is None:
vm.end_time = datetime.utcnow() + timedelta(hours=config.duration)
vm.save()
vm.queue_stop(config.duration)

return vm

@classmethod
def search_expired(cls) -> List["Repro"]:
    """Return repro VMs whose end_time has already passed."""
    # unlike jobs/tasks, the entry is deleted from the backing table upon stop
    cutoff = datetime.utcnow().isoformat()
    return cls.search(raw_unchecked_filter="end_time lt datetime'%s'" % cutoff)

@classmethod
def key_fields(cls) -> Tuple[str, Optional[str]]:
    # ORM table-key layout: vm_id is the partition key; no row key is used
    return ("vm_id", None)
3 changes: 0 additions & 3 deletions src/api-service/__app__/onefuzzlib/tasks/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,6 @@ def stopping(self) -> None:
self.state = TaskState.stopped
self.save()

def queue_stop(self) -> None:
self.queue(method=self.stopping)

@classmethod
def search_states(
cls, *, job_id: Optional[UUID] = None, states: Optional[List[TaskState]] = None
Expand Down
Loading

0 comments on commit dc37c86

Please sign in to comment.