From 1853270d25ba6cceed559ccad7c7c328ade3477c Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Thu, 15 Oct 2020 10:03:09 -0400 Subject: [PATCH 01/31] stop automatically queueing objects for work --- src/api-service/__app__/onefuzzlib/orm.py | 19 ------------------- .../__init__.py | 0 .../function.json | 0 .../__init__.py | 0 .../function.json | 0 .../__init__.py | 0 .../function.json | 0 .../{daily_events => timer_daily}/__init__.py | 0 .../function.json | 0 .../__init__.py | 0 .../function.json | 0 .../__init__.py | 0 .../function.json | 0 .../__init__.py | 0 .../function.json | 0 .../__init__.py | 0 .../function.json | 0 17 files changed, 19 deletions(-) rename src/api-service/__app__/{events => queue_file_changes}/__init__.py (100%) rename src/api-service/__app__/{events => queue_file_changes}/function.json (100%) rename src/api-service/__app__/{heartbeat => queue_heartbeat}/__init__.py (100%) rename src/api-service/__app__/{heartbeat => queue_heartbeat}/function.json (100%) rename src/api-service/__app__/{update_queue => queue_updates}/__init__.py (100%) rename src/api-service/__app__/{update_queue => queue_updates}/function.json (100%) rename src/api-service/__app__/{daily_events => timer_daily}/__init__.py (100%) rename src/api-service/__app__/{daily_events => timer_daily}/function.json (100%) rename src/api-service/__app__/{scheduled_events => timer_events}/__init__.py (100%) rename src/api-service/__app__/{scheduled_events => timer_events}/function.json (100%) rename src/api-service/__app__/{pool_resize => timer_pool_resize}/__init__.py (100%) rename src/api-service/__app__/{pool_resize => timer_pool_resize}/function.json (100%) rename src/api-service/__app__/{scaleset_events => timer_scalesets}/__init__.py (100%) rename src/api-service/__app__/{scaleset_events => timer_scalesets}/function.json (100%) rename src/api-service/__app__/{schedule_tasks => timer_schedule_tasks}/__init__.py (100%) rename src/api-service/__app__/{schedule_tasks => 
timer_schedule_tasks}/function.json (100%) diff --git a/src/api-service/__app__/onefuzzlib/orm.py b/src/api-service/__app__/onefuzzlib/orm.py index 0dd77eed98..d0c0b9ae89 100644 --- a/src/api-service/__app__/onefuzzlib/orm.py +++ b/src/api-service/__app__/onefuzzlib/orm.py @@ -63,8 +63,6 @@ SAFE_STRINGS = (UUID, Container, Region, PoolName) KEY = Union[int, str, UUID, Enum] -QUEUE_DELAY_STOPPING_SECONDS = 30 -QUEUE_DELAY_CREATE_SECONDS = 5 HOURS = 60 * 60 @@ -228,22 +226,6 @@ def event(self) -> Any: def telemetry(self) -> Any: return self.raw(exclude_none=True, include=self.telemetry_include()) - def _queue_as_needed(self) -> None: - # Upon ORM save with state, if the object has a state that needs work, - # automatically queue it - state = getattr(self, "state", None) - if state is None: - return - needs_work = getattr(state, "needs_work", None) - if needs_work is None: - return - if state not in needs_work(): - return - if state.name in ["stopping", "stop", "shutdown"]: - self.queue(visibility_timeout=QUEUE_DELAY_STOPPING_SECONDS) - else: - self.queue(visibility_timeout=QUEUE_DELAY_CREATE_SECONDS) - def _event_as_needed(self) -> None: # Upon ORM save, if the object returns event data, we'll send it to the # dashboard event subsystem @@ -306,7 +288,6 @@ def save(self, new: bool = False, require_etag: bool = False) -> Optional[Error] else: self.etag = client.insert_or_replace_entity(self.table_name(), raw) - self._queue_as_needed() if self.table_name() in TelemetryEvent.__members__: telem = self.telemetry() if telem: diff --git a/src/api-service/__app__/events/__init__.py b/src/api-service/__app__/queue_file_changes/__init__.py similarity index 100% rename from src/api-service/__app__/events/__init__.py rename to src/api-service/__app__/queue_file_changes/__init__.py diff --git a/src/api-service/__app__/events/function.json b/src/api-service/__app__/queue_file_changes/function.json similarity index 100% rename from src/api-service/__app__/events/function.json rename 
to src/api-service/__app__/queue_file_changes/function.json diff --git a/src/api-service/__app__/heartbeat/__init__.py b/src/api-service/__app__/queue_heartbeat/__init__.py similarity index 100% rename from src/api-service/__app__/heartbeat/__init__.py rename to src/api-service/__app__/queue_heartbeat/__init__.py diff --git a/src/api-service/__app__/heartbeat/function.json b/src/api-service/__app__/queue_heartbeat/function.json similarity index 100% rename from src/api-service/__app__/heartbeat/function.json rename to src/api-service/__app__/queue_heartbeat/function.json diff --git a/src/api-service/__app__/update_queue/__init__.py b/src/api-service/__app__/queue_updates/__init__.py similarity index 100% rename from src/api-service/__app__/update_queue/__init__.py rename to src/api-service/__app__/queue_updates/__init__.py diff --git a/src/api-service/__app__/update_queue/function.json b/src/api-service/__app__/queue_updates/function.json similarity index 100% rename from src/api-service/__app__/update_queue/function.json rename to src/api-service/__app__/queue_updates/function.json diff --git a/src/api-service/__app__/daily_events/__init__.py b/src/api-service/__app__/timer_daily/__init__.py similarity index 100% rename from src/api-service/__app__/daily_events/__init__.py rename to src/api-service/__app__/timer_daily/__init__.py diff --git a/src/api-service/__app__/daily_events/function.json b/src/api-service/__app__/timer_daily/function.json similarity index 100% rename from src/api-service/__app__/daily_events/function.json rename to src/api-service/__app__/timer_daily/function.json diff --git a/src/api-service/__app__/scheduled_events/__init__.py b/src/api-service/__app__/timer_events/__init__.py similarity index 100% rename from src/api-service/__app__/scheduled_events/__init__.py rename to src/api-service/__app__/timer_events/__init__.py diff --git a/src/api-service/__app__/scheduled_events/function.json b/src/api-service/__app__/timer_events/function.json 
similarity index 100% rename from src/api-service/__app__/scheduled_events/function.json rename to src/api-service/__app__/timer_events/function.json diff --git a/src/api-service/__app__/pool_resize/__init__.py b/src/api-service/__app__/timer_pool_resize/__init__.py similarity index 100% rename from src/api-service/__app__/pool_resize/__init__.py rename to src/api-service/__app__/timer_pool_resize/__init__.py diff --git a/src/api-service/__app__/pool_resize/function.json b/src/api-service/__app__/timer_pool_resize/function.json similarity index 100% rename from src/api-service/__app__/pool_resize/function.json rename to src/api-service/__app__/timer_pool_resize/function.json diff --git a/src/api-service/__app__/scaleset_events/__init__.py b/src/api-service/__app__/timer_scalesets/__init__.py similarity index 100% rename from src/api-service/__app__/scaleset_events/__init__.py rename to src/api-service/__app__/timer_scalesets/__init__.py diff --git a/src/api-service/__app__/scaleset_events/function.json b/src/api-service/__app__/timer_scalesets/function.json similarity index 100% rename from src/api-service/__app__/scaleset_events/function.json rename to src/api-service/__app__/timer_scalesets/function.json diff --git a/src/api-service/__app__/schedule_tasks/__init__.py b/src/api-service/__app__/timer_schedule_tasks/__init__.py similarity index 100% rename from src/api-service/__app__/schedule_tasks/__init__.py rename to src/api-service/__app__/timer_schedule_tasks/__init__.py diff --git a/src/api-service/__app__/schedule_tasks/function.json b/src/api-service/__app__/timer_schedule_tasks/function.json similarity index 100% rename from src/api-service/__app__/schedule_tasks/function.json rename to src/api-service/__app__/timer_schedule_tasks/function.json From 30d8debb2bb07ad738977177cd5011db417ae465 Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Thu, 15 Oct 2020 13:41:14 -0400 Subject: [PATCH 02/31] actuate state machines directly instead of via queues --- 
src/api-service/__app__/onefuzzlib/orm.py | 21 ++++++++++++ .../__app__/timer_events/__init__.py | 33 ++++++++++--------- .../__app__/timer_events/function.json | 4 +-- .../__init__.py | 0 .../function.json | 0 5 files changed, 40 insertions(+), 18 deletions(-) rename src/api-service/__app__/{timer_schedule_tasks => timer_tasks}/__init__.py (100%) rename src/api-service/__app__/{timer_schedule_tasks => timer_tasks}/function.json (100%) diff --git a/src/api-service/__app__/onefuzzlib/orm.py b/src/api-service/__app__/onefuzzlib/orm.py index d0c0b9ae89..dde0fac0bd 100644 --- a/src/api-service/__app__/onefuzzlib/orm.py +++ b/src/api-service/__app__/onefuzzlib/orm.py @@ -14,6 +14,7 @@ List, Mapping, Optional, + Protocol, Tuple, Type, TypeVar, @@ -66,6 +67,26 @@ HOURS = 60 * 60 +class HasState(Protocol): + # TODO: this should be bound tighter than Any + state: Any + + +def process_update(obj: HasState) -> None: + """ process through the state machine for an object """ + + # process the state machine up to 5 times unless it's stopped changing + for _ in range(0, 5): + state = obj.state + func = getattr(obj, state.name, None) + if func is None: + return + func() + new_state = obj.state + if new_state == state: + break + + def resolve(key: KEY) -> str: if isinstance(key, str): return key diff --git a/src/api-service/__app__/timer_events/__init__.py b/src/api-service/__app__/timer_events/__init__.py index 13e33e162e..a6f711dfff 100644 --- a/src/api-service/__app__/timer_events/__init__.py +++ b/src/api-service/__app__/timer_events/__init__.py @@ -10,6 +10,7 @@ from ..onefuzzlib.dashboard import get_event from ..onefuzzlib.jobs import Job +from ..onefuzzlib.orm import process_update from ..onefuzzlib.pools import Node, Pool from ..onefuzzlib.proxy import Proxy from ..onefuzzlib.repro import Repro @@ -19,43 +20,43 @@ def main(mytimer: func.TimerRequest, dashboard: func.Out[str]) -> None: # noqa: F841 proxies = Proxy.search_states(states=VmState.needs_work()) for proxy in 
proxies: - logging.info("requeueing update proxy vm: %s", proxy.region) - proxy.queue() + logging.info("update proxy vm: %s", proxy.region) + process_update(proxy) vms = Repro.search_states(states=VmState.needs_work()) for vm in vms: - logging.info("requeueing update vm: %s", vm.vm_id) - vm.queue() + logging.info("update vm: %s", vm.vm_id) + process_update(vm) tasks = Task.search_states(states=TaskState.needs_work()) for task in tasks: - logging.info("requeueing update task: %s", task.task_id) - task.queue() + logging.info("update task: %s", task.task_id) + process_update(task) jobs = Job.search_states(states=JobState.needs_work()) for job in jobs: - logging.info("requeueing update job: %s", job.job_id) - job.queue() + logging.info("update job: %s", job.job_id) + process_update(job) pools = Pool.search_states(states=PoolState.needs_work()) for pool in pools: - logging.info("queuing update pool: %s (%s)", pool.pool_id, pool.name) - pool.queue() + logging.info("update pool: %s (%s)", pool.pool_id, pool.name) + process_update(pool) nodes = Node.search_states(states=NodeState.needs_work()) for node in nodes: - logging.info("queuing update node: %s", node.machine_id) - node.queue() + logging.info("update node: %s", node.machine_id) + process_update(node) expired_tasks = Task.search_expired() for task in expired_tasks: - logging.info("queuing stop for task: %s", task.job_id) - task.queue_stop() + logging.info("stopping task: %s", task.job_id) + task.stopping() expired_jobs = Job.search_expired() for job in expired_jobs: - logging.info("queuing stop for job: %s", job.job_id) - job.queue_stop() + logging.info("stopping job: %s", job.job_id) + job.stopping() # Reminder, proxies are created on-demand. If something is "wrong" with # a proxy, the plan is: delete and recreate it. 
diff --git a/src/api-service/__app__/timer_events/function.json b/src/api-service/__app__/timer_events/function.json index 414338e48d..f975832043 100644 --- a/src/api-service/__app__/timer_events/function.json +++ b/src/api-service/__app__/timer_events/function.json @@ -3,7 +3,7 @@ { "direction": "in", "name": "mytimer", - "schedule": "0 */5 * * * *", + "schedule": "00:01:00", "type": "timerTrigger" }, { @@ -14,4 +14,4 @@ } ], "scriptFile": "__init__.py" -} +} \ No newline at end of file diff --git a/src/api-service/__app__/timer_schedule_tasks/__init__.py b/src/api-service/__app__/timer_tasks/__init__.py similarity index 100% rename from src/api-service/__app__/timer_schedule_tasks/__init__.py rename to src/api-service/__app__/timer_tasks/__init__.py diff --git a/src/api-service/__app__/timer_schedule_tasks/function.json b/src/api-service/__app__/timer_tasks/function.json similarity index 100% rename from src/api-service/__app__/timer_schedule_tasks/function.json rename to src/api-service/__app__/timer_tasks/function.json From 9a1a917d9a54dcd9e8690a8a92deac567fe1dffe Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Thu, 15 Oct 2020 14:07:02 -0400 Subject: [PATCH 03/31] rename testquery to stop warning for ORM tests --- src/api-service/tests/test_query_builder.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/api-service/tests/test_query_builder.py b/src/api-service/tests/test_query_builder.py index adbc702634..5f3a148dc6 100755 --- a/src/api-service/tests/test_query_builder.py +++ b/src/api-service/tests/test_query_builder.py @@ -10,7 +10,7 @@ from __app__.onefuzzlib.orm import ORMMixin, build_filters -class TestOrm(ORMMixin): +class BasicOrm(ORMMixin): a: int b: UUID c: str @@ -27,38 +27,38 @@ def test_filter(self) -> None: self.maxDiff = 999999999999999 self.assertEqual( - build_filters(TestOrm, {"a": [1]}), ("a eq 1", {}), "handle integer" + build_filters(BasicOrm, {"a": [1]}), ("a eq 1", {}), "handle integer" ) 
self.assertEqual( build_filters( - TestOrm, {"b": [UUID("06aa1e71-b025-4325-9983-4b3ce2de12ea")]} + BasicOrm, {"b": [UUID("06aa1e71-b025-4325-9983-4b3ce2de12ea")]} ), ("b eq '06aa1e71-b025-4325-9983-4b3ce2de12ea'", {}), "handle UUID", ) self.assertEqual( - build_filters(TestOrm, {"a": ["a"]}), (None, {"a": ["a"]}), "handle str" + build_filters(BasicOrm, {"a": ["a"]}), (None, {"a": ["a"]}), "handle str" ) self.assertEqual( - build_filters(TestOrm, {"a": [1, 2]}), + build_filters(BasicOrm, {"a": [1, 2]}), ("(a eq 1 or a eq 2)", {}), "multiple values", ) self.assertEqual( - build_filters(TestOrm, {"a": ["b"], "c": ["d"]}), + build_filters(BasicOrm, {"a": ["b"], "c": ["d"]}), (None, {"a": ["b"], "c": ["d"]}), "multiple fields", ) self.assertEqual( - build_filters(TestOrm, {"a": [1, 2], "c": [3]}), + build_filters(BasicOrm, {"a": [1, 2], "c": [3]}), ("(a eq 1 or a eq 2) and c eq 3", {}), "multiple fields and values", ) self.assertEqual( build_filters( - TestOrm, + BasicOrm, { "a": ["b"], "b": [1], @@ -70,13 +70,13 @@ def test_filter(self) -> None: ) self.assertEqual( - build_filters(TestOrm, {"d": [1, 2], "e": [3]}), + build_filters(BasicOrm, {"d": [1, 2], "e": [3]}), ("(PartitionKey eq 1 or PartitionKey eq 2) and RowKey eq 3", {}), "query on keyfields", ) with self.assertRaises(ValueError): - build_filters(TestOrm, {"test1": ["b", "c"], "test2": ["d"]}) + build_filters(BasicOrm, {"test1": ["b", "c"], "test2": ["d"]}) if __name__ == "__main__": From cfee27bd2d7856625096044a4c0b9fd9c2bd1566 Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Thu, 15 Oct 2020 14:14:16 -0400 Subject: [PATCH 04/31] move more job/task processing into timer_tasks --- src/api-service/__app__/onefuzzlib/jobs.py | 3 --- .../__app__/onefuzzlib/tasks/main.py | 3 --- .../__app__/timer_events/__init__.py | 24 +---------------- .../__app__/timer_tasks/__init__.py | 26 +++++++++++++++++++ 4 files changed, 27 insertions(+), 29 deletions(-) diff --git a/src/api-service/__app__/onefuzzlib/jobs.py 
b/src/api-service/__app__/onefuzzlib/jobs.py index a3ba108062..ca6076f331 100644 --- a/src/api-service/__app__/onefuzzlib/jobs.py +++ b/src/api-service/__app__/onefuzzlib/jobs.py @@ -73,9 +73,6 @@ def stopping(self) -> None: self.state = JobState.stopped self.save() - def queue_stop(self) -> None: - self.queue(method=self.stopping) - def on_start(self) -> None: # try to keep this effectively idempotent if self.end_time is None: diff --git a/src/api-service/__app__/onefuzzlib/tasks/main.py b/src/api-service/__app__/onefuzzlib/tasks/main.py index daff974af7..1189162de8 100644 --- a/src/api-service/__app__/onefuzzlib/tasks/main.py +++ b/src/api-service/__app__/onefuzzlib/tasks/main.py @@ -118,9 +118,6 @@ def stopping(self) -> None: self.state = TaskState.stopped self.save() - def queue_stop(self) -> None: - self.queue(method=self.stopping) - @classmethod def search_states( cls, *, job_id: Optional[UUID] = None, states: Optional[List[TaskState]] = None diff --git a/src/api-service/__app__/timer_events/__init__.py b/src/api-service/__app__/timer_events/__init__.py index a6f711dfff..0d8f4f4501 100644 --- a/src/api-service/__app__/timer_events/__init__.py +++ b/src/api-service/__app__/timer_events/__init__.py @@ -6,15 +6,13 @@ import logging import azure.functions as func -from onefuzztypes.enums import JobState, NodeState, PoolState, TaskState, VmState +from onefuzztypes.enums import NodeState, PoolState, VmState from ..onefuzzlib.dashboard import get_event -from ..onefuzzlib.jobs import Job from ..onefuzzlib.orm import process_update from ..onefuzzlib.pools import Node, Pool from ..onefuzzlib.proxy import Proxy from ..onefuzzlib.repro import Repro -from ..onefuzzlib.tasks.main import Task def main(mytimer: func.TimerRequest, dashboard: func.Out[str]) -> None: # noqa: F841 @@ -28,16 +26,6 @@ def main(mytimer: func.TimerRequest, dashboard: func.Out[str]) -> None: # noqa: logging.info("update vm: %s", vm.vm_id) process_update(vm) - tasks = 
Task.search_states(states=TaskState.needs_work()) - for task in tasks: - logging.info("update task: %s", task.task_id) - process_update(task) - - jobs = Job.search_states(states=JobState.needs_work()) - for job in jobs: - logging.info("update job: %s", job.job_id) - process_update(job) - pools = Pool.search_states(states=PoolState.needs_work()) for pool in pools: logging.info("update pool: %s (%s)", pool.pool_id, pool.name) @@ -48,16 +36,6 @@ def main(mytimer: func.TimerRequest, dashboard: func.Out[str]) -> None: # noqa: logging.info("update node: %s", node.machine_id) process_update(node) - expired_tasks = Task.search_expired() - for task in expired_tasks: - logging.info("stopping task: %s", task.job_id) - task.stopping() - - expired_jobs = Job.search_expired() - for job in expired_jobs: - logging.info("stopping job: %s", job.job_id) - job.stopping() - # Reminder, proxies are created on-demand. If something is "wrong" with # a proxy, the plan is: delete and recreate it. for proxy in Proxy.search(): diff --git a/src/api-service/__app__/timer_tasks/__init__.py b/src/api-service/__app__/timer_tasks/__init__.py index 57fab2fc4d..120d3e5d2b 100644 --- a/src/api-service/__app__/timer_tasks/__init__.py +++ b/src/api-service/__app__/timer_tasks/__init__.py @@ -3,13 +3,39 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. 
+import logging + import azure.functions as func +from onefuzztypes.enums import JobState, TaskState from ..onefuzzlib.dashboard import get_event +from ..onefuzzlib.jobs import Job +from ..onefuzzlib.orm import process_update +from ..onefuzzlib.tasks.main import Task from ..onefuzzlib.tasks.scheduler import schedule_tasks def main(mytimer: func.TimerRequest, dashboard: func.Out[str]) -> None: # noqa: F841 + expired_tasks = Task.search_expired() + for task in expired_tasks: + logging.info("stopping task: %s", task.job_id) + task.stopping() + + expired_jobs = Job.search_expired() + for job in expired_jobs: + logging.info("stopping job: %s", job.job_id) + job.stopping() + + jobs = Job.search_states(states=JobState.needs_work()) + for job in jobs: + logging.info("update job: %s", job.job_id) + process_update(job) + + tasks = Task.search_states(states=TaskState.needs_work()) + for task in tasks: + logging.info("update task: %s", task.task_id) + process_update(task) + schedule_tasks() event = get_event() From 7f6ed299ad93c357604028a1451cba6c872b30c0 Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Thu, 15 Oct 2020 14:24:05 -0400 Subject: [PATCH 05/31] get backported typing extensions --- src/api-service/__app__/requirements.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/api-service/__app__/requirements.txt b/src/api-service/__app__/requirements.txt index e701f9b6f7..5080d5e5c6 100644 --- a/src/api-service/__app__/requirements.txt +++ b/src/api-service/__app__/requirements.txt @@ -32,5 +32,6 @@ PyJWT~=1.7.1 requests~=2.24.0 memoization~=0.3.1 github3.py~=1.3.0 +typing-extensions~=3.7.4.3 # onefuzz types version is set during build -onefuzztypes==0.0.0 \ No newline at end of file +onefuzztypes==0.0.0 From a07d1a5dd5e4012e090bb1a0ede1a53f8d510508 Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Thu, 15 Oct 2020 14:35:09 -0400 Subject: [PATCH 06/31] split out more things into individual queues --- .../__app__/timer_events/__init__.py | 22 
------------- .../__app__/timer_proxy/__init__.py | 33 +++++++++++++++++++ .../__app__/timer_proxy/function.json | 17 ++++++++++ .../__app__/timer_repro/__init__.py | 24 ++++++++++++++ .../__app__/timer_repro/function.json | 17 ++++++++++ 5 files changed, 91 insertions(+), 22 deletions(-) create mode 100644 src/api-service/__app__/timer_proxy/__init__.py create mode 100644 src/api-service/__app__/timer_proxy/function.json create mode 100644 src/api-service/__app__/timer_repro/__init__.py create mode 100644 src/api-service/__app__/timer_repro/function.json diff --git a/src/api-service/__app__/timer_events/__init__.py b/src/api-service/__app__/timer_events/__init__.py index 0d8f4f4501..8351cf22d6 100644 --- a/src/api-service/__app__/timer_events/__init__.py +++ b/src/api-service/__app__/timer_events/__init__.py @@ -11,21 +11,9 @@ from ..onefuzzlib.dashboard import get_event from ..onefuzzlib.orm import process_update from ..onefuzzlib.pools import Node, Pool -from ..onefuzzlib.proxy import Proxy -from ..onefuzzlib.repro import Repro def main(mytimer: func.TimerRequest, dashboard: func.Out[str]) -> None: # noqa: F841 - proxies = Proxy.search_states(states=VmState.needs_work()) - for proxy in proxies: - logging.info("update proxy vm: %s", proxy.region) - process_update(proxy) - - vms = Repro.search_states(states=VmState.needs_work()) - for vm in vms: - logging.info("update vm: %s", vm.vm_id) - process_update(vm) - pools = Pool.search_states(states=PoolState.needs_work()) for pool in pools: logging.info("update pool: %s (%s)", pool.pool_id, pool.name) @@ -36,16 +24,6 @@ def main(mytimer: func.TimerRequest, dashboard: func.Out[str]) -> None: # noqa: logging.info("update node: %s", node.machine_id) process_update(node) - # Reminder, proxies are created on-demand. If something is "wrong" with - # a proxy, the plan is: delete and recreate it. 
- for proxy in Proxy.search(): - if not proxy.is_alive(): - logging.error("proxy alive check failed, stopping: %s", proxy.region) - proxy.state = VmState.stopping - proxy.save() - else: - proxy.save_proxy_config() - event = get_event() if event: dashboard.set(event) diff --git a/src/api-service/__app__/timer_proxy/__init__.py b/src/api-service/__app__/timer_proxy/__init__.py new file mode 100644 index 0000000000..afff5d35b4 --- /dev/null +++ b/src/api-service/__app__/timer_proxy/__init__.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python +# +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +import logging + +import azure.functions as func +from onefuzztypes.enums import VmState + +from ..onefuzzlib.dashboard import get_event +from ..onefuzzlib.orm import process_update +from ..onefuzzlib.proxy import Proxy + + +def main(mytimer: func.TimerRequest, dashboard: func.Out[str]) -> None: # noqa: F841 + # Reminder, proxies are created on-demand. If something is "wrong" with + # a proxy, the plan is: delete and recreate it. 
+ for proxy in Proxy.search(): + if not proxy.is_alive(): + logging.error("proxy alive check failed, stopping: %s", proxy.region) + proxy.state = VmState.stopping + proxy.save() + else: + proxy.save_proxy_config() + + if proxy.state in VmState.needs_work(): + logging.info("update proxy vm: %s", proxy.region) + process_update(proxy) + + event = get_event() + if event: + dashboard.set(event) diff --git a/src/api-service/__app__/timer_proxy/function.json b/src/api-service/__app__/timer_proxy/function.json new file mode 100644 index 0000000000..8ed6b1ea60 --- /dev/null +++ b/src/api-service/__app__/timer_proxy/function.json @@ -0,0 +1,17 @@ +{ + "bindings": [ + { + "direction": "in", + "name": "mytimer", + "schedule": "00:00:30", + "type": "timerTrigger" + }, + { + "type": "signalR", + "direction": "out", + "name": "dashboard", + "hubName": "dashboard" + } + ], + "scriptFile": "__init__.py" +} \ No newline at end of file diff --git a/src/api-service/__app__/timer_repro/__init__.py b/src/api-service/__app__/timer_repro/__init__.py new file mode 100644 index 0000000000..2725e54379 --- /dev/null +++ b/src/api-service/__app__/timer_repro/__init__.py @@ -0,0 +1,24 @@ +#!/usr/bin/env python +# +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. 
+ +import logging + +import azure.functions as func +from onefuzztypes.enums import VmState + +from ..onefuzzlib.dashboard import get_event +from ..onefuzzlib.orm import process_update +from ..onefuzzlib.repro import Repro + + +def main(mytimer: func.TimerRequest, dashboard: func.Out[str]) -> None: # noqa: F841 + vms = Repro.search_states(states=VmState.needs_work()) + for vm in vms: + logging.info("update vm: %s", vm.vm_id) + process_update(vm) + + event = get_event() + if event: + dashboard.set(event) diff --git a/src/api-service/__app__/timer_repro/function.json b/src/api-service/__app__/timer_repro/function.json new file mode 100644 index 0000000000..f975832043 --- /dev/null +++ b/src/api-service/__app__/timer_repro/function.json @@ -0,0 +1,17 @@ +{ + "bindings": [ + { + "direction": "in", + "name": "mytimer", + "schedule": "00:01:00", + "type": "timerTrigger" + }, + { + "type": "signalR", + "direction": "out", + "name": "dashboard", + "hubName": "dashboard" + } + ], + "scriptFile": "__init__.py" +} \ No newline at end of file From ed2173d32e07dd82482587594ef0a930fbfed718 Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Thu, 15 Oct 2020 14:43:00 -0400 Subject: [PATCH 07/31] remove schedule_tasks --- .../__app__/timer_schedule_tasks/__init__.py | 24 ------------------- .../timer_schedule_tasks/function.json | 17 ------------- 2 files changed, 41 deletions(-) delete mode 100644 src/api-service/__app__/timer_schedule_tasks/__init__.py delete mode 100644 src/api-service/__app__/timer_schedule_tasks/function.json diff --git a/src/api-service/__app__/timer_schedule_tasks/__init__.py b/src/api-service/__app__/timer_schedule_tasks/__init__.py deleted file mode 100644 index 2725e54379..0000000000 --- a/src/api-service/__app__/timer_schedule_tasks/__init__.py +++ /dev/null @@ -1,24 +0,0 @@ -#!/usr/bin/env python -# -# Copyright (c) Microsoft Corporation. -# Licensed under the MIT License. 
- -import logging - -import azure.functions as func -from onefuzztypes.enums import VmState - -from ..onefuzzlib.dashboard import get_event -from ..onefuzzlib.orm import process_update -from ..onefuzzlib.repro import Repro - - -def main(mytimer: func.TimerRequest, dashboard: func.Out[str]) -> None: # noqa: F841 - vms = Repro.search_states(states=VmState.needs_work()) - for vm in vms: - logging.info("update vm: %s", vm.vm_id) - process_update(vm) - - event = get_event() - if event: - dashboard.set(event) diff --git a/src/api-service/__app__/timer_schedule_tasks/function.json b/src/api-service/__app__/timer_schedule_tasks/function.json deleted file mode 100644 index 009043c96c..0000000000 --- a/src/api-service/__app__/timer_schedule_tasks/function.json +++ /dev/null @@ -1,17 +0,0 @@ -{ - "bindings": [ - { - "direction": "in", - "name": "mytimer", - "schedule": "00:00:15", - "type": "timerTrigger" - }, - { - "type": "signalR", - "direction": "out", - "name": "dashboard", - "hubName": "dashboard" - } - ], - "scriptFile": "__init__.py" -} \ No newline at end of file From 808c6b8d6d910f52cda048a49aff3a96b53ddf3f Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Thu, 15 Oct 2020 14:48:39 -0400 Subject: [PATCH 08/31] remove queue updates that have been split off --- .../__app__/timer_events/__init__.py | 55 ++----------------- 1 file changed, 6 insertions(+), 49 deletions(-) diff --git a/src/api-service/__app__/timer_events/__init__.py b/src/api-service/__app__/timer_events/__init__.py index 13e33e162e..e3626de78c 100644 --- a/src/api-service/__app__/timer_events/__init__.py +++ b/src/api-service/__app__/timer_events/__init__.py @@ -6,66 +6,23 @@ import logging import azure.functions as func -from onefuzztypes.enums import JobState, NodeState, PoolState, TaskState, VmState +from onefuzztypes.enums import NodeState, PoolState from ..onefuzzlib.dashboard import get_event -from ..onefuzzlib.jobs import Job +from ..onefuzzlib.orm import process_update from ..onefuzzlib.pools 
import Node, Pool -from ..onefuzzlib.proxy import Proxy -from ..onefuzzlib.repro import Repro -from ..onefuzzlib.tasks.main import Task def main(mytimer: func.TimerRequest, dashboard: func.Out[str]) -> None: # noqa: F841 - proxies = Proxy.search_states(states=VmState.needs_work()) - for proxy in proxies: - logging.info("requeueing update proxy vm: %s", proxy.region) - proxy.queue() - - vms = Repro.search_states(states=VmState.needs_work()) - for vm in vms: - logging.info("requeueing update vm: %s", vm.vm_id) - vm.queue() - - tasks = Task.search_states(states=TaskState.needs_work()) - for task in tasks: - logging.info("requeueing update task: %s", task.task_id) - task.queue() - - jobs = Job.search_states(states=JobState.needs_work()) - for job in jobs: - logging.info("requeueing update job: %s", job.job_id) - job.queue() - pools = Pool.search_states(states=PoolState.needs_work()) for pool in pools: - logging.info("queuing update pool: %s (%s)", pool.pool_id, pool.name) - pool.queue() + logging.info("update pool: %s (%s)", pool.pool_id, pool.name) + process_update(pool) nodes = Node.search_states(states=NodeState.needs_work()) for node in nodes: - logging.info("queuing update node: %s", node.machine_id) - node.queue() - - expired_tasks = Task.search_expired() - for task in expired_tasks: - logging.info("queuing stop for task: %s", task.job_id) - task.queue_stop() - - expired_jobs = Job.search_expired() - for job in expired_jobs: - logging.info("queuing stop for job: %s", job.job_id) - job.queue_stop() - - # Reminder, proxies are created on-demand. If something is "wrong" with - # a proxy, the plan is: delete and recreate it. 
- for proxy in Proxy.search(): - if not proxy.is_alive(): - logging.error("proxy alive check failed, stopping: %s", proxy.region) - proxy.state = VmState.stopping - proxy.save() - else: - proxy.save_proxy_config() + logging.info("update node: %s", node.machine_id) + process_update(node) event = get_event() if event: From ecde82d2ae23f54433885d26467bb0b1131f2d8e Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Thu, 15 Oct 2020 14:51:39 -0400 Subject: [PATCH 09/31] rename timer_scalesets to timer_workers --- .../__app__/{timer_scalesets => timer_workers}/__init__.py | 0 .../__app__/{timer_scalesets => timer_workers}/function.json | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename src/api-service/__app__/{timer_scalesets => timer_workers}/__init__.py (100%) rename src/api-service/__app__/{timer_scalesets => timer_workers}/function.json (100%) diff --git a/src/api-service/__app__/timer_scalesets/__init__.py b/src/api-service/__app__/timer_workers/__init__.py similarity index 100% rename from src/api-service/__app__/timer_scalesets/__init__.py rename to src/api-service/__app__/timer_workers/__init__.py diff --git a/src/api-service/__app__/timer_scalesets/function.json b/src/api-service/__app__/timer_workers/function.json similarity index 100% rename from src/api-service/__app__/timer_scalesets/function.json rename to src/api-service/__app__/timer_workers/function.json From 8320020fa8fe14d8c15a42bb8de7c1c28ccf8662 Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Thu, 15 Oct 2020 15:01:00 -0400 Subject: [PATCH 10/31] merge autoscaling function into the worker function --- .../__app__/timer_events/__init__.py | 29 ---- .../__app__/timer_events/function.json | 17 -- .../__app__/timer_pool_resize/__init__.py | 151 ----------------- .../__app__/timer_pool_resize/function.json | 11 -- .../__app__/timer_workers/__init__.py | 155 +++++++++++++++++- 5 files changed, 153 insertions(+), 210 deletions(-) delete mode 100644 
src/api-service/__app__/timer_events/__init__.py delete mode 100644 src/api-service/__app__/timer_events/function.json delete mode 100644 src/api-service/__app__/timer_pool_resize/__init__.py delete mode 100644 src/api-service/__app__/timer_pool_resize/function.json diff --git a/src/api-service/__app__/timer_events/__init__.py b/src/api-service/__app__/timer_events/__init__.py deleted file mode 100644 index e3626de78c..0000000000 --- a/src/api-service/__app__/timer_events/__init__.py +++ /dev/null @@ -1,29 +0,0 @@ -#!/usr/bin/env python -# -# Copyright (c) Microsoft Corporation. -# Licensed under the MIT License. - -import logging - -import azure.functions as func -from onefuzztypes.enums import NodeState, PoolState - -from ..onefuzzlib.dashboard import get_event -from ..onefuzzlib.orm import process_update -from ..onefuzzlib.pools import Node, Pool - - -def main(mytimer: func.TimerRequest, dashboard: func.Out[str]) -> None: # noqa: F841 - pools = Pool.search_states(states=PoolState.needs_work()) - for pool in pools: - logging.info("update pool: %s (%s)", pool.pool_id, pool.name) - process_update(pool) - - nodes = Node.search_states(states=NodeState.needs_work()) - for node in nodes: - logging.info("update node: %s", node.machine_id) - process_update(node) - - event = get_event() - if event: - dashboard.set(event) diff --git a/src/api-service/__app__/timer_events/function.json b/src/api-service/__app__/timer_events/function.json deleted file mode 100644 index f975832043..0000000000 --- a/src/api-service/__app__/timer_events/function.json +++ /dev/null @@ -1,17 +0,0 @@ -{ - "bindings": [ - { - "direction": "in", - "name": "mytimer", - "schedule": "00:01:00", - "type": "timerTrigger" - }, - { - "type": "signalR", - "direction": "out", - "name": "dashboard", - "hubName": "dashboard" - } - ], - "scriptFile": "__init__.py" -} \ No newline at end of file diff --git a/src/api-service/__app__/timer_pool_resize/__init__.py 
b/src/api-service/__app__/timer_pool_resize/__init__.py deleted file mode 100644 index 10034d0bd2..0000000000 --- a/src/api-service/__app__/timer_pool_resize/__init__.py +++ /dev/null @@ -1,151 +0,0 @@ -#!/usr/bin/env python -# -# Copyright (c) Microsoft Corporation. -# Licensed under the MIT License. - -import logging -import math -from typing import List - -import azure.functions as func -from onefuzztypes.enums import NodeState, PoolState, ScalesetState -from onefuzztypes.models import AutoScaleConfig, TaskPool - -from ..onefuzzlib.pools import Node, Pool, Scaleset -from ..onefuzzlib.tasks.main import Task - - -def scale_up(pool: Pool, scalesets: List[Scaleset], nodes_needed: int) -> None: - logging.info("Scaling up") - autoscale_config = pool.autoscale - if not isinstance(autoscale_config, AutoScaleConfig): - return - - for scaleset in scalesets: - if scaleset.state == ScalesetState.running: - - max_size = min(scaleset.max_size(), autoscale_config.scaleset_size) - logging.info( - "Sacleset id: %s, Scaleset size: %d, max_size: %d" - % (scaleset.scaleset_id, scaleset.size, max_size) - ) - if scaleset.size < max_size: - current_size = scaleset.size - if nodes_needed <= max_size - current_size: - scaleset.size = current_size + nodes_needed - nodes_needed = 0 - else: - scaleset.size = max_size - nodes_needed = nodes_needed - (max_size - current_size) - scaleset.state = ScalesetState.resize - scaleset.save() - - else: - continue - - if nodes_needed == 0: - return - - for _ in range( - math.ceil( - nodes_needed - / min( - Scaleset.scaleset_max_size(autoscale_config.image), - autoscale_config.scaleset_size, - ) - ) - ): - logging.info("Creating Scaleset for Pool %s" % (pool.name)) - max_nodes_scaleset = min( - Scaleset.scaleset_max_size(autoscale_config.image), - autoscale_config.scaleset_size, - nodes_needed, - ) - - if not autoscale_config.region: - raise Exception("Region is missing") - - scaleset = Scaleset.create( - pool_name=pool.name, - 
vm_sku=autoscale_config.vm_sku, - image=autoscale_config.image, - region=autoscale_config.region, - size=max_nodes_scaleset, - spot_instances=autoscale_config.spot_instances, - tags={"pool": pool.name}, - ) - scaleset.save() - nodes_needed -= max_nodes_scaleset - - -def scale_down(scalesets: List[Scaleset], nodes_to_remove: int) -> None: - logging.info("Scaling down") - for scaleset in scalesets: - nodes = Node.search_states( - scaleset_id=scaleset.scaleset_id, states=[NodeState.free] - ) - if nodes and nodes_to_remove > 0: - max_nodes_remove = min(len(nodes), nodes_to_remove) - if max_nodes_remove >= scaleset.size and len(nodes) == scaleset.size: - scaleset.state = ScalesetState.shutdown - nodes_to_remove = nodes_to_remove - scaleset.size - scaleset.save() - for node in nodes: - node.set_shutdown() - continue - - scaleset.size = scaleset.size - max_nodes_remove - nodes_to_remove = nodes_to_remove - max_nodes_remove - scaleset.state = ScalesetState.resize - scaleset.save() - - -def get_vm_count(tasks: List[Task]) -> int: - count = 0 - for task in tasks: - task_pool = task.get_pool() - if ( - not task_pool - or not isinstance(task_pool, Pool) - or not isinstance(task.config.pool, TaskPool) - ): - continue - count += task.config.pool.count - return count - - -def main(mytimer: func.TimerRequest) -> None: # noqa: F841 - pools = Pool.search_states(states=PoolState.available()) - for pool in pools: - logging.info("autoscale: %s" % (pool.autoscale)) - if not pool.autoscale: - continue - - # get all the tasks (count not stopped) for the pool - tasks = Task.get_tasks_by_pool_name(pool.name) - logging.info("Pool: %s, #Tasks %d" % (pool.name, len(tasks))) - - num_of_tasks = get_vm_count(tasks) - nodes_needed = max(num_of_tasks, pool.autoscale.min_size) - if pool.autoscale.max_size: - nodes_needed = min(nodes_needed, pool.autoscale.max_size) - - # do scaleset logic match with pool - # get all the scalesets for the pool - scalesets = Scaleset.search_by_pool(pool.name) - 
pool_resize = False - for scaleset in scalesets: - if scaleset.state in ScalesetState.modifying(): - pool_resize = True - break - nodes_needed = nodes_needed - scaleset.size - - if pool_resize: - continue - - logging.info("Pool: %s, #Nodes Needed: %d" % (pool.name, nodes_needed)) - if nodes_needed > 0: - # resizing scaleset or creating new scaleset. - scale_up(pool, scalesets, nodes_needed) - elif nodes_needed < 0: - scale_down(scalesets, abs(nodes_needed)) diff --git a/src/api-service/__app__/timer_pool_resize/function.json b/src/api-service/__app__/timer_pool_resize/function.json deleted file mode 100644 index a2a21e3ef4..0000000000 --- a/src/api-service/__app__/timer_pool_resize/function.json +++ /dev/null @@ -1,11 +0,0 @@ -{ - "scriptFile": "__init__.py", - "bindings": [ - { - "name": "mytimer", - "type": "timerTrigger", - "direction": "in", - "schedule": "00:01:00" - } - ] -} diff --git a/src/api-service/__app__/timer_workers/__init__.py b/src/api-service/__app__/timer_workers/__init__.py index 0513de4a3e..917ebf2d33 100644 --- a/src/api-service/__app__/timer_workers/__init__.py +++ b/src/api-service/__app__/timer_workers/__init__.py @@ -4,12 +4,17 @@ # Licensed under the MIT License. 
import logging +import math +from typing import List import azure.functions as func -from onefuzztypes.enums import ScalesetState +from onefuzztypes.enums import NodeState, PoolState, ScalesetState +from onefuzztypes.models import AutoScaleConfig, TaskPool from ..onefuzzlib.dashboard import get_event -from ..onefuzzlib.pools import Scaleset +from ..onefuzzlib.orm import process_update +from ..onefuzzlib.pools import Node, Pool, Scaleset +from ..onefuzzlib.tasks.main import Task def process_scaleset(scaleset: Scaleset) -> None: @@ -38,11 +43,157 @@ def process_scaleset(scaleset: Scaleset) -> None: return +def scale_up(pool: Pool, scalesets: List[Scaleset], nodes_needed: int) -> None: + logging.info("Scaling up") + autoscale_config = pool.autoscale + if not isinstance(autoscale_config, AutoScaleConfig): + return + + for scaleset in scalesets: + if scaleset.state == ScalesetState.running: + + max_size = min(scaleset.max_size(), autoscale_config.scaleset_size) + logging.info( + "Sacleset id: %s, Scaleset size: %d, max_size: %d" + % (scaleset.scaleset_id, scaleset.size, max_size) + ) + if scaleset.size < max_size: + current_size = scaleset.size + if nodes_needed <= max_size - current_size: + scaleset.size = current_size + nodes_needed + nodes_needed = 0 + else: + scaleset.size = max_size + nodes_needed = nodes_needed - (max_size - current_size) + scaleset.state = ScalesetState.resize + scaleset.save() + + else: + continue + + if nodes_needed == 0: + return + + for _ in range( + math.ceil( + nodes_needed + / min( + Scaleset.scaleset_max_size(autoscale_config.image), + autoscale_config.scaleset_size, + ) + ) + ): + logging.info("Creating Scaleset for Pool %s" % (pool.name)) + max_nodes_scaleset = min( + Scaleset.scaleset_max_size(autoscale_config.image), + autoscale_config.scaleset_size, + nodes_needed, + ) + + if not autoscale_config.region: + raise Exception("Region is missing") + + scaleset = Scaleset.create( + pool_name=pool.name, + vm_sku=autoscale_config.vm_sku, + 
image=autoscale_config.image, + region=autoscale_config.region, + size=max_nodes_scaleset, + spot_instances=autoscale_config.spot_instances, + tags={"pool": pool.name}, + ) + scaleset.save() + nodes_needed -= max_nodes_scaleset + + +def scale_down(scalesets: List[Scaleset], nodes_to_remove: int) -> None: + logging.info("Scaling down") + for scaleset in scalesets: + nodes = Node.search_states( + scaleset_id=scaleset.scaleset_id, states=[NodeState.free] + ) + if nodes and nodes_to_remove > 0: + max_nodes_remove = min(len(nodes), nodes_to_remove) + if max_nodes_remove >= scaleset.size and len(nodes) == scaleset.size: + scaleset.state = ScalesetState.shutdown + nodes_to_remove = nodes_to_remove - scaleset.size + scaleset.save() + for node in nodes: + node.set_shutdown() + continue + + scaleset.size = scaleset.size - max_nodes_remove + nodes_to_remove = nodes_to_remove - max_nodes_remove + scaleset.state = ScalesetState.resize + scaleset.save() + + +def get_vm_count(tasks: List[Task]) -> int: + count = 0 + for task in tasks: + task_pool = task.get_pool() + if ( + not task_pool + or not isinstance(task_pool, Pool) + or not isinstance(task.config.pool, TaskPool) + ): + continue + count += task.config.pool.count + return count + + +def resize_pools() -> None: + pools = Pool.search_states(states=PoolState.available()) + for pool in pools: + logging.info("autoscale: %s" % (pool.autoscale)) + if not pool.autoscale: + continue + + # get all the tasks (count not stopped) for the pool + tasks = Task.get_tasks_by_pool_name(pool.name) + logging.info("Pool: %s, #Tasks %d" % (pool.name, len(tasks))) + + num_of_tasks = get_vm_count(tasks) + nodes_needed = max(num_of_tasks, pool.autoscale.min_size) + if pool.autoscale.max_size: + nodes_needed = min(nodes_needed, pool.autoscale.max_size) + + # do scaleset logic match with pool + # get all the scalesets for the pool + scalesets = Scaleset.search_by_pool(pool.name) + pool_resize = False + for scaleset in scalesets: + if scaleset.state in 
ScalesetState.modifying(): + pool_resize = True + break + nodes_needed = nodes_needed - scaleset.size + + if pool_resize: + continue + + logging.info("Pool: %s, #Nodes Needed: %d" % (pool.name, nodes_needed)) + if nodes_needed > 0: + # resizing scaleset or creating new scaleset. + scale_up(pool, scalesets, nodes_needed) + elif nodes_needed < 0: + scale_down(scalesets, abs(nodes_needed)) + + def main(mytimer: func.TimerRequest, dashboard: func.Out[str]) -> None: # noqa: F841 + nodes = Node.search_states(states=NodeState.needs_work()) + for node in nodes: + logging.info("update node: %s", node.machine_id) + process_update(node) + scalesets = Scaleset.search() for scaleset in scalesets: process_scaleset(scaleset) + pools = Pool.search_states(states=PoolState.needs_work()) + for pool in pools: + logging.info("update pool: %s (%s)", pool.pool_id, pool.name) + process_update(pool) + event = get_event() if event: dashboard.set(event) From 65e11ed83d1c77a2e4a5f1c22743df9edcfbb04a Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Thu, 15 Oct 2020 15:34:39 -0400 Subject: [PATCH 11/31] move autoscale into separate module still called by scaleset code --- .../__app__/onefuzzlib/autoscale.py | 141 +++++++++++++++ src/api-service/__app__/onefuzzlib/pools.py | 32 ++-- .../__app__/timer_workers/__init__.py | 168 +----------------- 3 files changed, 166 insertions(+), 175 deletions(-) create mode 100644 src/api-service/__app__/onefuzzlib/autoscale.py diff --git a/src/api-service/__app__/onefuzzlib/autoscale.py b/src/api-service/__app__/onefuzzlib/autoscale.py new file mode 100644 index 0000000000..e896e01b84 --- /dev/null +++ b/src/api-service/__app__/onefuzzlib/autoscale.py @@ -0,0 +1,141 @@ +import logging +from typing import List +from .pools import Pool, Node, Scaleset +from onefuzztypes.models import AutoScaleConfig, TaskPool +from onefuzztypes.enums import ScalesetState, PoolState, NodeState +import math +from .tasks.main import Task + + +def scale_up(pool: Pool, scalesets: 
List[Scaleset], nodes_needed: int) -> None: + logging.info("Scaling up") + autoscale_config = pool.autoscale + if not isinstance(autoscale_config, AutoScaleConfig): + return + + for scaleset in scalesets: + if scaleset.state == ScalesetState.running: + + max_size = min(scaleset.max_size(), autoscale_config.scaleset_size) + logging.info( + "scaleset:%s size:%d max_size:%d" + % (scaleset.scaleset_id, scaleset.size, max_size) + ) + if scaleset.size < max_size: + current_size = scaleset.size + if nodes_needed <= max_size - current_size: + scaleset.size = current_size + nodes_needed + nodes_needed = 0 + else: + scaleset.size = max_size + nodes_needed = nodes_needed - (max_size - current_size) + scaleset.state = ScalesetState.resize + scaleset.save() + + else: + continue + + if nodes_needed == 0: + return + + for _ in range( + math.ceil( + nodes_needed + / min( + Scaleset.scaleset_max_size(autoscale_config.image), + autoscale_config.scaleset_size, + ) + ) + ): + logging.info("Creating Scaleset for Pool %s" % (pool.name)) + max_nodes_scaleset = min( + Scaleset.scaleset_max_size(autoscale_config.image), + autoscale_config.scaleset_size, + nodes_needed, + ) + + if not autoscale_config.region: + raise Exception("Region is missing") + + scaleset = Scaleset.create( + pool_name=pool.name, + vm_sku=autoscale_config.vm_sku, + image=autoscale_config.image, + region=autoscale_config.region, + size=max_nodes_scaleset, + spot_instances=autoscale_config.spot_instances, + tags={"pool": pool.name}, + ) + scaleset.save() + nodes_needed -= max_nodes_scaleset + + +def scale_down(scalesets: List[Scaleset], nodes_to_remove: int) -> None: + logging.info("Scaling down") + for scaleset in scalesets: + nodes = Node.search_states( + scaleset_id=scaleset.scaleset_id, states=[NodeState.free] + ) + if nodes and nodes_to_remove > 0: + max_nodes_remove = min(len(nodes), nodes_to_remove) + if max_nodes_remove >= scaleset.size and len(nodes) == scaleset.size: + scaleset.state = ScalesetState.shutdown + 
nodes_to_remove = nodes_to_remove - scaleset.size + scaleset.save() + for node in nodes: + node.set_shutdown() + continue + + scaleset.size = scaleset.size - max_nodes_remove + nodes_to_remove = nodes_to_remove - max_nodes_remove + scaleset.state = ScalesetState.resize + scaleset.save() + + +def get_vm_count(tasks: List[Task]) -> int: + count = 0 + for task in tasks: + task_pool = task.get_pool() + if ( + not task_pool + or not isinstance(task_pool, Pool) + or not isinstance(task.config.pool, TaskPool) + ): + continue + count += task.config.pool.count + return count + + +def autoscale_pool(pool: Pool) -> None: + logging.info("autoscale: %s" % (pool.autoscale)) + if not pool.autoscale: + return + + # get all the tasks (count not stopped) for the pool + tasks = Task.get_tasks_by_pool_name(pool.name) + logging.info("Pool: %s, #Tasks %d" % (pool.name, len(tasks))) + + num_of_tasks = get_vm_count(tasks) + nodes_needed = max(num_of_tasks, pool.autoscale.min_size) + if pool.autoscale.max_size: + nodes_needed = min(nodes_needed, pool.autoscale.max_size) + + # do scaleset logic match with pool + # get all the scalesets for the pool + scalesets = Scaleset.search_by_pool(pool.name) + pool_resize = False + for scaleset in scalesets: + if scaleset.state in ScalesetState.modifying(): + pool_resize = True + break + nodes_needed = nodes_needed - scaleset.size + + if pool_resize: + return + + logging.info("Pool: %s, #Nodes Needed: %d" % (pool.name, nodes_needed)) + if nodes_needed > 0: + # resizing scaleset or creating new scaleset. 
+ scale_up(pool, scalesets, nodes_needed) + elif nodes_needed < 0: + scale_down(scalesets, abs(nodes_needed)) \ No newline at end of file diff --git a/src/api-service/__app__/onefuzzlib/pools.py b/src/api-service/__app__/onefuzzlib/pools.py index 2df8d895ff..b53e2615ba 100644 --- a/src/api-service/__app__/onefuzzlib/pools.py +++ b/src/api-service/__app__/onefuzzlib/pools.py @@ -108,6 +108,21 @@ def search_outdated( version_query = "not (version eq '%s')" % __version__ return cls.search(query=query, raw_unchecked_filter=version_query) + @classmethod + def mark_oudated_nodes(cls) -> None: + outdated = cls.search_outdated() + for node in outdated: + logging.info( + "node is outdated: %s - node_version:%s api_version:%s", + node.machine_id, + node.version, + __version__, + ) + if node.version == "1.0.0": + node.to_reimage(done=True) + else: + node.to_reimage() + @classmethod def get_by_machine_id(cls, machine_id: UUID) -> Optional["Node"]: nodes = cls.search(query={"machine_id": [machine_id]}) @@ -682,25 +697,11 @@ def cleanup_nodes(self) -> bool: to_reimage = [] to_delete = [] - outdated = Node.search_outdated(scaleset_id=self.scaleset_id) - for node in outdated: - logging.info( - "node is outdated: %s - node_version:%s api_version:%s", - node.machine_id, - node.version, - __version__, - ) - if node.version == "1.0.0": - node.state = NodeState.done - to_reimage.append(node) - else: - node.to_reimage() - nodes = Node.search_states( scaleset_id=self.scaleset_id, states=NodeState.ready_for_reset() ) - if not outdated and not nodes: + if not nodes: logging.info("no nodes need updating: %s", self.scaleset_id) return False @@ -855,7 +856,6 @@ def halt(self) -> None: self.save() else: logging.info("scaleset deleted: %s", self.scaleset_id) - self.state = ScalesetState.halt self.delete() @classmethod diff --git a/src/api-service/__app__/timer_workers/__init__.py b/src/api-service/__app__/timer_workers/__init__.py index 917ebf2d33..f2c309ff4f 100644 --- 
a/src/api-service/__app__/timer_workers/__init__.py +++ b/src/api-service/__app__/timer_workers/__init__.py @@ -4,182 +4,29 @@ # Licensed under the MIT License. import logging -import math -from typing import List import azure.functions as func from onefuzztypes.enums import NodeState, PoolState, ScalesetState -from onefuzztypes.models import AutoScaleConfig, TaskPool from ..onefuzzlib.dashboard import get_event from ..onefuzzlib.orm import process_update from ..onefuzzlib.pools import Node, Pool, Scaleset -from ..onefuzzlib.tasks.main import Task +from ..onefuzzlib.autoscale import autoscale_pool def process_scaleset(scaleset: Scaleset) -> None: logging.debug("checking scaleset for updates: %s", scaleset.scaleset_id) - if scaleset.state == ScalesetState.resize: - scaleset.resize() - # if the scaleset is touched during cleanup, don't continue to process it if scaleset.cleanup_nodes(): logging.debug("scaleset needed cleanup: %s", scaleset.scaleset_id) return - if ( - scaleset.state in ScalesetState.needs_work() - and scaleset.state != ScalesetState.resize - ): - logging.info( - "exec scaleset state: %s - %s", - scaleset.scaleset_id, - scaleset.state, - ) - - if hasattr(scaleset, scaleset.state.name): - getattr(scaleset, scaleset.state.name)() - return - - -def scale_up(pool: Pool, scalesets: List[Scaleset], nodes_needed: int) -> None: - logging.info("Scaling up") - autoscale_config = pool.autoscale - if not isinstance(autoscale_config, AutoScaleConfig): - return - - for scaleset in scalesets: - if scaleset.state == ScalesetState.running: - - max_size = min(scaleset.max_size(), autoscale_config.scaleset_size) - logging.info( - "Sacleset id: %s, Scaleset size: %d, max_size: %d" - % (scaleset.scaleset_id, scaleset.size, max_size) - ) - if scaleset.size < max_size: - current_size = scaleset.size - if nodes_needed <= max_size - current_size: - scaleset.size = current_size + nodes_needed - nodes_needed = 0 - else: - scaleset.size = max_size - nodes_needed = nodes_needed - 
(max_size - current_size) - scaleset.state = ScalesetState.resize - scaleset.save() - - else: - continue - - if nodes_needed == 0: - return - - for _ in range( - math.ceil( - nodes_needed - / min( - Scaleset.scaleset_max_size(autoscale_config.image), - autoscale_config.scaleset_size, - ) - ) - ): - logging.info("Creating Scaleset for Pool %s" % (pool.name)) - max_nodes_scaleset = min( - Scaleset.scaleset_max_size(autoscale_config.image), - autoscale_config.scaleset_size, - nodes_needed, - ) - - if not autoscale_config.region: - raise Exception("Region is missing") - - scaleset = Scaleset.create( - pool_name=pool.name, - vm_sku=autoscale_config.vm_sku, - image=autoscale_config.image, - region=autoscale_config.region, - size=max_nodes_scaleset, - spot_instances=autoscale_config.spot_instances, - tags={"pool": pool.name}, - ) - scaleset.save() - nodes_needed -= max_nodes_scaleset - - -def scale_down(scalesets: List[Scaleset], nodes_to_remove: int) -> None: - logging.info("Scaling down") - for scaleset in scalesets: - nodes = Node.search_states( - scaleset_id=scaleset.scaleset_id, states=[NodeState.free] - ) - if nodes and nodes_to_remove > 0: - max_nodes_remove = min(len(nodes), nodes_to_remove) - if max_nodes_remove >= scaleset.size and len(nodes) == scaleset.size: - scaleset.state = ScalesetState.shutdown - nodes_to_remove = nodes_to_remove - scaleset.size - scaleset.save() - for node in nodes: - node.set_shutdown() - continue - - scaleset.size = scaleset.size - max_nodes_remove - nodes_to_remove = nodes_to_remove - max_nodes_remove - scaleset.state = ScalesetState.resize - scaleset.save() - - -def get_vm_count(tasks: List[Task]) -> int: - count = 0 - for task in tasks: - task_pool = task.get_pool() - if ( - not task_pool - or not isinstance(task_pool, Pool) - or not isinstance(task.config.pool, TaskPool) - ): - continue - count += task.config.pool.count - return count - - -def resize_pools() -> None: - pools = Pool.search_states(states=PoolState.available()) - for 
pool in pools: - logging.info("autoscale: %s" % (pool.autoscale)) - if not pool.autoscale: - continue - - # get all the tasks (count not stopped) for the pool - tasks = Task.get_tasks_by_pool_name(pool.name) - logging.info("Pool: %s, #Tasks %d" % (pool.name, len(tasks))) - - num_of_tasks = get_vm_count(tasks) - nodes_needed = max(num_of_tasks, pool.autoscale.min_size) - if pool.autoscale.max_size: - nodes_needed = min(nodes_needed, pool.autoscale.max_size) - - # do scaleset logic match with pool - # get all the scalesets for the pool - scalesets = Scaleset.search_by_pool(pool.name) - pool_resize = False - for scaleset in scalesets: - if scaleset.state in ScalesetState.modifying(): - pool_resize = True - break - nodes_needed = nodes_needed - scaleset.size - - if pool_resize: - continue - - logging.info("Pool: %s, #Nodes Needed: %d" % (pool.name, nodes_needed)) - if nodes_needed > 0: - # resizing scaleset or creating new scaleset. - scale_up(pool, scalesets, nodes_needed) - elif nodes_needed < 0: - scale_down(scalesets, abs(nodes_needed)) + process_update(scaleset) def main(mytimer: func.TimerRequest, dashboard: func.Out[str]) -> None: # noqa: F841 + Node.mark_oudated_nodes() nodes = Node.search_states(states=NodeState.needs_work()) for node in nodes: logging.info("update node: %s", node.machine_id) @@ -189,10 +36,13 @@ def main(mytimer: func.TimerRequest, dashboard: func.Out[str]) -> None: # noqa: for scaleset in scalesets: process_scaleset(scaleset) - pools = Pool.search_states(states=PoolState.needs_work()) + pools = Pool.search() for pool in pools: - logging.info("update pool: %s (%s)", pool.pool_id, pool.name) - process_update(pool) + if pool.state in PoolState.needs_work(): + logging.info("update pool: %s (%s)", pool.pool_id, pool.name) + process_update(pool) + elif pool.state in PoolState.available() and pool.autoscale: + autoscale_pool(pool) event = get_event() if event: From 01418dda0c0e93346624f77a1201b0afc68f4075 Mon Sep 17 00:00:00 2001 From: Brian 
Caswell Date: Thu, 15 Oct 2020 15:56:51 -0400 Subject: [PATCH 12/31] speed up processing repro --- src/api-service/__app__/timer_repro/function.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/api-service/__app__/timer_repro/function.json b/src/api-service/__app__/timer_repro/function.json index f975832043..ebe98447c3 100644 --- a/src/api-service/__app__/timer_repro/function.json +++ b/src/api-service/__app__/timer_repro/function.json @@ -3,7 +3,7 @@ { "direction": "in", "name": "mytimer", - "schedule": "00:01:00", + "schedule": "00:00:30", "type": "timerTrigger" }, { From c1e718aca73937e7f8144e59fe40ab02fdb9e1d3 Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Thu, 15 Oct 2020 16:08:35 -0400 Subject: [PATCH 13/31] ensure repro VMs get shut down --- .../__app__/onefuzzlib/autoscale.py | 2 +- src/api-service/__app__/onefuzzlib/repro.py | 14 ++++++++++---- .../__app__/timer_repro/__init__.py | 19 ++++++++++++++----- src/pytypes/onefuzztypes/models.py | 1 + 4 files changed, 26 insertions(+), 10 deletions(-) diff --git a/src/api-service/__app__/onefuzzlib/autoscale.py b/src/api-service/__app__/onefuzzlib/autoscale.py index e896e01b84..694df6883a 100644 --- a/src/api-service/__app__/onefuzzlib/autoscale.py +++ b/src/api-service/__app__/onefuzzlib/autoscale.py @@ -138,4 +138,4 @@ def autoscale_pool(pool: Pool) -> None: # resizing scaleset or creating new scaleset. 
scale_up(pool, scalesets, nodes_needed) elif nodes_needed < 0: - scale_down(scalesets, abs(nodes_needed)) \ No newline at end of file + scale_down(scalesets, abs(nodes_needed)) diff --git a/src/api-service/__app__/onefuzzlib/repro.py b/src/api-service/__app__/onefuzzlib/repro.py index 52ef6da021..b143651b2b 100644 --- a/src/api-service/__app__/onefuzzlib/repro.py +++ b/src/api-service/__app__/onefuzzlib/repro.py @@ -5,6 +5,7 @@ import logging from typing import List, Optional, Tuple, Union +from datetime import datetime, timedelta from azure.mgmt.compute.models import VirtualMachine from onefuzztypes.enums import OS, ContainerType, ErrorCode, VmState @@ -205,9 +206,6 @@ def build_repro_script(self) -> Optional[Error]: logging.info("saved repro script") return None - def queue_stop(self, count: int) -> None: - self.queue(method=self.stopping, visibility_timeout=count * HOURS) - @classmethod def search_states(cls, *, states: Optional[List[VmState]] = None) -> List["Repro"]: query: QueryFilter = {} @@ -228,10 +226,18 @@ def create(cls, config: ReproConfig) -> Union[Error, "Repro"]: return task vm = cls(config=config, task_id=task.task_id, os=task.os, auth=build_auth()) + if vm.end_time is None: + vm.end_time = datetime.utcnow() + timedelta(hours=config.duration) vm.save() - vm.queue_stop(config.duration) + return vm + @classmethod + def search_expired(cls) -> List["Repro"]: + # unlike jobs/tasks, the entry is deleted from the backing table upon stop + time_filter = "end_time lt datetime'%s'" % datetime.utcnow().isoformat() + return cls.search(raw_unchecked_filter=time_filter) + @classmethod def key_fields(cls) -> Tuple[str, Optional[str]]: return ("vm_id", None) diff --git a/src/api-service/__app__/timer_repro/__init__.py b/src/api-service/__app__/timer_repro/__init__.py index 2725e54379..29a3e22a21 100644 --- a/src/api-service/__app__/timer_repro/__init__.py +++ b/src/api-service/__app__/timer_repro/__init__.py @@ -14,11 +14,20 @@ def main(mytimer: func.TimerRequest, 
dashboard: func.Out[str]) -> None: # noqa: F841 - vms = Repro.search_states(states=VmState.needs_work()) - for vm in vms: - logging.info("update vm: %s", vm.vm_id) - process_update(vm) + expired = Repro.search_expired() + for repro in expired: + logging.info("stopping repro: %s", repro.vm_id) + repro.stopping() + + expired_vm_ids = [x.vm_id for x in expired] + + for repro in Repro.search_states(states=VmState.needs_work()): + if repro.vm_id in expired_vm_ids: + # this VM already got processed during the expired phase + continue + logging.info("update repro: %s", repro.vm_id) + process_update(repro) event = get_event() if event: - dashboard.set(event) + dashboard.set(event) \ No newline at end of file diff --git a/src/pytypes/onefuzztypes/models.py b/src/pytypes/onefuzztypes/models.py index 39399434d1..b510511970 100644 --- a/src/pytypes/onefuzztypes/models.py +++ b/src/pytypes/onefuzztypes/models.py @@ -582,6 +582,7 @@ class Repro(BaseModel): os: OS error: Optional[Error] ip: Optional[str] + end_time: Optional[datetime] class ExitStatus(BaseModel): From 9b9d3a1af526f681439a0e01d21cc77b707f8fc3 Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Thu, 15 Oct 2020 16:09:52 -0400 Subject: [PATCH 14/31] handle future queued stop --- src/api-service/__app__/onefuzzlib/proxy.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/api-service/__app__/onefuzzlib/proxy.py b/src/api-service/__app__/onefuzzlib/proxy.py index e6986e8d62..9652d18e56 100644 --- a/src/api-service/__app__/onefuzzlib/proxy.py +++ b/src/api-service/__app__/onefuzzlib/proxy.py @@ -210,9 +210,6 @@ def save_proxy_config(self) -> None: account_id=os.environ["ONEFUZZ_FUNC_STORAGE"], ) - def queue_stop(self, count: int) -> None: - self.queue(method=self.stopping, visibility_timeout=count * HOURS) - @classmethod def search_states(cls, *, states: Optional[List[VmState]] = None) -> List["Proxy"]: query: QueryFilter = {} From ef43ddba6d8f98f149a8d8a22cb5e1e1015b647f Mon Sep 17 00:00:00 2001 From: Brian 
Caswell Date: Thu, 15 Oct 2020 16:19:53 -0400 Subject: [PATCH 15/31] add change from #137 --- src/api-service/__app__/onefuzzlib/autoscale.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/api-service/__app__/onefuzzlib/autoscale.py b/src/api-service/__app__/onefuzzlib/autoscale.py index 694df6883a..faf6a0d910 100644 --- a/src/api-service/__app__/onefuzzlib/autoscale.py +++ b/src/api-service/__app__/onefuzzlib/autoscale.py @@ -14,7 +14,7 @@ def scale_up(pool: Pool, scalesets: List[Scaleset], nodes_needed: int) -> None: return for scaleset in scalesets: - if scaleset.state == ScalesetState.running: + if scaleset.state == [ScalesetState.running, ScalesetState.resize]: max_size = min(scaleset.max_size(), autoscale_config.scaleset_size) logging.info( From e5a8b445b22b4abd897b5059465f863b753ecd21 Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Thu, 15 Oct 2020 16:25:11 -0400 Subject: [PATCH 16/31] in, not == --- src/api-service/__app__/onefuzzlib/autoscale.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/api-service/__app__/onefuzzlib/autoscale.py b/src/api-service/__app__/onefuzzlib/autoscale.py index faf6a0d910..eb1d82318f 100644 --- a/src/api-service/__app__/onefuzzlib/autoscale.py +++ b/src/api-service/__app__/onefuzzlib/autoscale.py @@ -14,7 +14,7 @@ def scale_up(pool: Pool, scalesets: List[Scaleset], nodes_needed: int) -> None: return for scaleset in scalesets: - if scaleset.state == [ScalesetState.running, ScalesetState.resize]: + if scaleset.state in [ScalesetState.running, ScalesetState.resize]: max_size = min(scaleset.max_size(), autoscale_config.scaleset_size) logging.info( From 49c354aa1bb241ccf19b46aec355ca052ebd1a2b Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Thu, 15 Oct 2020 16:27:15 -0400 Subject: [PATCH 17/31] change from #158 --- src/api-service/__app__/onefuzzlib/tasks/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/api-service/__app__/onefuzzlib/tasks/main.py 
b/src/api-service/__app__/onefuzzlib/tasks/main.py index 1189162de8..dd1a055346 100644 --- a/src/api-service/__app__/onefuzzlib/tasks/main.py +++ b/src/api-service/__app__/onefuzzlib/tasks/main.py @@ -162,7 +162,7 @@ def get_tasks_by_pool_name(cls, pool_name: str) -> List["Task"]: task_pool = task.get_pool() if not task_pool: continue - if pool_name == task_pool.name and task.state in TaskState.available(): + if pool_name == task_pool.name: pool_tasks.append(task) return pool_tasks From 3032ec38c587565790978dc4e36a7824a30b3c58 Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Thu, 15 Oct 2020 16:43:49 -0400 Subject: [PATCH 18/31] update linting --- src/api-service/__app__/onefuzzlib/autoscale.py | 8 +++++--- src/api-service/__app__/onefuzzlib/proxy.py | 2 +- src/api-service/__app__/onefuzzlib/repro.py | 4 ++-- src/api-service/__app__/timer_repro/__init__.py | 2 +- src/api-service/__app__/timer_workers/__init__.py | 4 ++-- 5 files changed, 11 insertions(+), 9 deletions(-) diff --git a/src/api-service/__app__/onefuzzlib/autoscale.py b/src/api-service/__app__/onefuzzlib/autoscale.py index eb1d82318f..2e964722da 100644 --- a/src/api-service/__app__/onefuzzlib/autoscale.py +++ b/src/api-service/__app__/onefuzzlib/autoscale.py @@ -1,9 +1,11 @@ import logging +import math from typing import List -from .pools import Pool, Node, Scaleset + +from onefuzztypes.enums import NodeState, ScalesetState from onefuzztypes.models import AutoScaleConfig, TaskPool -from onefuzztypes.enums import ScalesetState, PoolState, NodeState -import math + +from .pools import Node, Pool, Scaleset from .tasks.main import Task diff --git a/src/api-service/__app__/onefuzzlib/proxy.py b/src/api-service/__app__/onefuzzlib/proxy.py index 9652d18e56..4f93921375 100644 --- a/src/api-service/__app__/onefuzzlib/proxy.py +++ b/src/api-service/__app__/onefuzzlib/proxy.py @@ -27,7 +27,7 @@ from .azure.queue import get_queue_sas from .azure.vm import VM from .extension import proxy_manager_extensions -from 
.orm import HOURS, MappingIntStrAny, ORMMixin, QueryFilter +from .orm import MappingIntStrAny, ORMMixin, QueryFilter from .proxy_forward import ProxyForward PROXY_SKU = "Standard_B2s" diff --git a/src/api-service/__app__/onefuzzlib/repro.py b/src/api-service/__app__/onefuzzlib/repro.py index b143651b2b..f261722255 100644 --- a/src/api-service/__app__/onefuzzlib/repro.py +++ b/src/api-service/__app__/onefuzzlib/repro.py @@ -4,8 +4,8 @@ # Licensed under the MIT License. import logging -from typing import List, Optional, Tuple, Union from datetime import datetime, timedelta +from typing import List, Optional, Tuple, Union from azure.mgmt.compute.models import VirtualMachine from onefuzztypes.enums import OS, ContainerType, ErrorCode, VmState @@ -19,7 +19,7 @@ from .azure.ip import get_public_ip from .azure.vm import VM from .extension import repro_extensions -from .orm import HOURS, ORMMixin, QueryFilter +from .orm import ORMMixin, QueryFilter from .reports import get_report from .tasks.main import Task diff --git a/src/api-service/__app__/timer_repro/__init__.py b/src/api-service/__app__/timer_repro/__init__.py index 29a3e22a21..f59edec5d9 100644 --- a/src/api-service/__app__/timer_repro/__init__.py +++ b/src/api-service/__app__/timer_repro/__init__.py @@ -30,4 +30,4 @@ def main(mytimer: func.TimerRequest, dashboard: func.Out[str]) -> None: # noqa: event = get_event() if event: - dashboard.set(event) \ No newline at end of file + dashboard.set(event) diff --git a/src/api-service/__app__/timer_workers/__init__.py b/src/api-service/__app__/timer_workers/__init__.py index f2c309ff4f..4d2458e80a 100644 --- a/src/api-service/__app__/timer_workers/__init__.py +++ b/src/api-service/__app__/timer_workers/__init__.py @@ -6,12 +6,12 @@ import logging import azure.functions as func -from onefuzztypes.enums import NodeState, PoolState, ScalesetState +from onefuzztypes.enums import NodeState, PoolState +from ..onefuzzlib.autoscale import autoscale_pool from ..onefuzzlib.dashboard 
import get_event from ..onefuzzlib.orm import process_update from ..onefuzzlib.pools import Node, Pool, Scaleset -from ..onefuzzlib.autoscale import autoscale_pool def process_scaleset(scaleset: Scaleset) -> None: From 48cc55ce76b5b9cb2840c575be7089db860528f8 Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Thu, 15 Oct 2020 17:22:42 -0400 Subject: [PATCH 19/31] pull Protocol from typing_extensions --- src/api-service/__app__/onefuzzlib/orm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/api-service/__app__/onefuzzlib/orm.py b/src/api-service/__app__/onefuzzlib/orm.py index dde0fac0bd..c833b6584d 100644 --- a/src/api-service/__app__/onefuzzlib/orm.py +++ b/src/api-service/__app__/onefuzzlib/orm.py @@ -7,6 +7,7 @@ import json from datetime import datetime from enum import Enum +from typing_extensions import Protocol from typing import ( Any, Callable, @@ -14,7 +15,6 @@ List, Mapping, Optional, - Protocol, Tuple, Type, TypeVar, From 88274f372bc8fdb35115830883016c02a946b38e Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Thu, 15 Oct 2020 17:32:46 -0400 Subject: [PATCH 20/31] fix lint --- src/api-service/__app__/onefuzzlib/orm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/api-service/__app__/onefuzzlib/orm.py b/src/api-service/__app__/onefuzzlib/orm.py index c833b6584d..21e114e1b6 100644 --- a/src/api-service/__app__/onefuzzlib/orm.py +++ b/src/api-service/__app__/onefuzzlib/orm.py @@ -7,7 +7,6 @@ import json from datetime import datetime from enum import Enum -from typing_extensions import Protocol from typing import ( Any, Callable, @@ -37,6 +36,7 @@ from onefuzztypes.models import Error from onefuzztypes.primitives import Container, PoolName, Region from pydantic import BaseModel, Field +from typing_extensions import Protocol from .azure.table import get_client from .dashboard import add_event From 20865d32e85c813ac297ffa4a48eaed0f8044de8 Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Fri, 16 Oct 2020 
10:12:01 -0400 Subject: [PATCH 21/31] handle pools not ready before scheduling work --- .../__app__/onefuzzlib/tasks/scheduler.py | 28 +++++++++++++++---- src/pytypes/onefuzztypes/enums.py | 2 +- 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/src/api-service/__app__/onefuzzlib/tasks/scheduler.py b/src/api-service/__app__/onefuzzlib/tasks/scheduler.py index a24e9e353c..67a441a363 100644 --- a/src/api-service/__app__/onefuzzlib/tasks/scheduler.py +++ b/src/api-service/__app__/onefuzzlib/tasks/scheduler.py @@ -7,9 +7,10 @@ from typing import Dict, List from uuid import UUID -from onefuzztypes.enums import OS, TaskState +from onefuzztypes.enums import OS, TaskState, PoolState from onefuzztypes.models import WorkSet, WorkUnit +from .pools import Pool from ..azure.containers import blob_exists, get_container_sas_url, save_blob from ..azure.creds import get_func_storage from .config import build_task_config, get_setup_container @@ -18,6 +19,22 @@ HOURS = 60 * 60 +def schedule_workset(workset: WorkSet, pool: Pool, count: int) -> bool: + if pool.state not in PoolState.available(): + logging.info( + "pool not available for work: %s state: %s", pool.name, pool.state.name + ) + return False + + for _ in range(count): + if not pool.schedule_workset(workset): + logging.error( + "unable to schedule workset. pool:%s workset:%s", pool.name, workset + ) + return False + return True + + def schedule_tasks() -> None: to_schedule: Dict[UUID, List[Task]] = {} @@ -82,7 +99,7 @@ def schedule_tasks() -> None: ) # For now, only offer singleton work sets. 
- work_set = WorkSet( + workset = WorkSet( reboot=reboot, script=(setup_script is not None), setup_url=setup_url, @@ -94,7 +111,6 @@ def schedule_tasks() -> None: logging.info("unable to find pool for task: %s", task.task_id) continue - for _ in range(count): - pool.schedule_workset(work_set) - task.state = TaskState.scheduled - task.save() + if schedule_workset(workset, pool, count): + task.state = TaskState.scheduled + task.save() \ No newline at end of file diff --git a/src/pytypes/onefuzztypes/enums.py b/src/pytypes/onefuzztypes/enums.py index e9eccc78dd..911b049df5 100644 --- a/src/pytypes/onefuzztypes/enums.py +++ b/src/pytypes/onefuzztypes/enums.py @@ -256,7 +256,7 @@ def needs_work(cls) -> List["PoolState"]: @classmethod def available(cls) -> List["PoolState"]: """ set of states that indicate if it's available for work """ - return [cls.init, cls.running] + return [cls.running] class ScalesetState(Enum): From d04d020d2af389e33a9dddac506eab61ea88835d Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Fri, 16 Oct 2020 10:26:47 -0400 Subject: [PATCH 22/31] lint --- src/api-service/__app__/onefuzzlib/tasks/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/api-service/__app__/onefuzzlib/tasks/scheduler.py b/src/api-service/__app__/onefuzzlib/tasks/scheduler.py index 67a441a363..5929d2030a 100644 --- a/src/api-service/__app__/onefuzzlib/tasks/scheduler.py +++ b/src/api-service/__app__/onefuzzlib/tasks/scheduler.py @@ -113,4 +113,4 @@ def schedule_tasks() -> None: if schedule_workset(workset, pool, count): task.state = TaskState.scheduled - task.save() \ No newline at end of file + task.save() From 093ccb743b6ec0871bf2bc402739e0784334aead Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Fri, 16 Oct 2020 10:29:36 -0400 Subject: [PATCH 23/31] lint --- src/api-service/__app__/onefuzzlib/tasks/scheduler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/api-service/__app__/onefuzzlib/tasks/scheduler.py 
b/src/api-service/__app__/onefuzzlib/tasks/scheduler.py index 5929d2030a..b3203d3dd5 100644 --- a/src/api-service/__app__/onefuzzlib/tasks/scheduler.py +++ b/src/api-service/__app__/onefuzzlib/tasks/scheduler.py @@ -7,12 +7,12 @@ from typing import Dict, List from uuid import UUID -from onefuzztypes.enums import OS, TaskState, PoolState +from onefuzztypes.enums import OS, PoolState, TaskState from onefuzztypes.models import WorkSet, WorkUnit -from .pools import Pool from ..azure.containers import blob_exists, get_container_sas_url, save_blob from ..azure.creds import get_func_storage +from ..pools import Pool from .config import build_task_config, get_setup_container from .main import Task From 714b88b4524f8ea0f63a434a724fdd94f5bc5359 Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Fri, 16 Oct 2020 11:36:48 -0400 Subject: [PATCH 24/31] only start to image a node once --- .../__app__/agent_registration/__init__.py | 1 + src/api-service/__app__/onefuzzlib/pools.py | 16 +++++++++++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/src/api-service/__app__/agent_registration/__init__.py b/src/api-service/__app__/agent_registration/__init__.py index 27f8cd5afd..0e36f8a3b8 100644 --- a/src/api-service/__app__/agent_registration/__init__.py +++ b/src/api-service/__app__/agent_registration/__init__.py @@ -98,6 +98,7 @@ def post(req: func.HttpRequest) -> func.HttpResponse: node.version = registration_request.version node.reimage_requested = False node.state = NodeState.init + node.reimage_queued = False else: node = Node( pool_name=registration_request.pool_name, diff --git a/src/api-service/__app__/onefuzzlib/pools.py b/src/api-service/__app__/onefuzzlib/pools.py index b53e2615ba..4a042ba74d 100644 --- a/src/api-service/__app__/onefuzzlib/pools.py +++ b/src/api-service/__app__/onefuzzlib/pools.py @@ -69,6 +69,10 @@ class Node(BASE_NODE, ORMMixin): + # should only be set by Scaleset.reimage_nodes + # should only be unset during agent_registration POST + 
reimage_queued: bool = Field(default=False) + @classmethod def search_states( cls, @@ -210,6 +214,12 @@ def can_process_new_work(self) -> bool: self.stop() return False + if self.state in NodeState.ready_for_reset(): + logging.info( + "can_schedule should be recycled. machine_id:%s", self.machine_id + ) + return False + if self.delete_requested or self.reimage_requested: logging.info( "can_schedule should be recycled. machine_id:%s", self.machine_id @@ -720,7 +730,8 @@ def cleanup_nodes(self) -> bool: if ScalesetShrinkQueue(self.scaleset_id).should_shrink(): node.set_halt() to_delete.append(node) - else: + elif not node.reimage_queued: + # only add nodes that are not already set to reschedule to_reimage.append(node) # Perform operations until they fail due to scaleset getting locked @@ -834,6 +845,9 @@ def reimage_nodes(self, nodes: List[Node]) -> None: "unable to reimage nodes: %s:%s - %s" % (self.scaleset_id, machine_ids, result) ) + for node in nodes: + node.reimage_queued = True + node.save() def shutdown(self) -> None: size = get_vmss_size(self.scaleset_id) From ff8ecef218d5bcc0e226b64ba46fd5b2fef8e65b Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Fri, 16 Oct 2020 17:07:33 -0400 Subject: [PATCH 25/31] re-add copyright statement --- src/api-service/__app__/onefuzzlib/autoscale.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/api-service/__app__/onefuzzlib/autoscale.py b/src/api-service/__app__/onefuzzlib/autoscale.py index 2e964722da..8c985b0fde 100644 --- a/src/api-service/__app__/onefuzzlib/autoscale.py +++ b/src/api-service/__app__/onefuzzlib/autoscale.py @@ -1,3 +1,8 @@ +#!/usr/bin/env python +# +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. 
+ import logging import math from typing import List From 5229fcaebde94fda2526c25a4c718597d92c8f09 Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Fri, 16 Oct 2020 17:09:36 -0400 Subject: [PATCH 26/31] add comments as to what this *should* be bound to --- src/api-service/__app__/onefuzzlib/orm.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/api-service/__app__/onefuzzlib/orm.py b/src/api-service/__app__/onefuzzlib/orm.py index 21e114e1b6..962e0a4949 100644 --- a/src/api-service/__app__/onefuzzlib/orm.py +++ b/src/api-service/__app__/onefuzzlib/orm.py @@ -69,6 +69,8 @@ class HasState(Protocol): # TODO: this should be bound tighter than Any + # In the end, we want this to be an Enum. Specifically, one of + # the JobState,TaskState,etc enums. state: Any From 68d710e8facbf2d92a833b457809c72c55eb1eca Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Fri, 16 Oct 2020 17:16:25 -0400 Subject: [PATCH 27/31] address comments --- src/api-service/__app__/onefuzzlib/orm.py | 21 ++++++++++++------- .../__app__/timer_proxy/__init__.py | 4 ++-- .../__app__/timer_repro/__init__.py | 4 ++-- .../__app__/timer_tasks/__init__.py | 6 +++--- .../__app__/timer_workers/__init__.py | 8 +++---- 5 files changed, 25 insertions(+), 18 deletions(-) diff --git a/src/api-service/__app__/onefuzzlib/orm.py b/src/api-service/__app__/onefuzzlib/orm.py index 962e0a4949..142581f7d3 100644 --- a/src/api-service/__app__/onefuzzlib/orm.py +++ b/src/api-service/__app__/onefuzzlib/orm.py @@ -74,16 +74,23 @@ class HasState(Protocol): state: Any -def process_update(obj: HasState) -> None: +def process_state_update(obj: HasState) -> None: + """ + process a single state update, if the obj + implements a function for that state + """ + + func = getattr(obj, obj.state.name, None) + if func is None: + return + + +def process_state_updates(obj: HasState, max_updates: int = 5) -> None: """ process through the state machine for an object """ - # process the state machine up to 5 times unless it's 
stopped changing - for _ in range(0, 5): + for _ in range(max_updates): state = obj.state - func = getattr(obj, state.name, None) - if func is None: - return - func() + process_state_update(obj) new_state = obj.state if new_state == state: break diff --git a/src/api-service/__app__/timer_proxy/__init__.py b/src/api-service/__app__/timer_proxy/__init__.py index afff5d35b4..13e5c166f1 100644 --- a/src/api-service/__app__/timer_proxy/__init__.py +++ b/src/api-service/__app__/timer_proxy/__init__.py @@ -9,7 +9,7 @@ from onefuzztypes.enums import VmState from ..onefuzzlib.dashboard import get_event -from ..onefuzzlib.orm import process_update +from ..onefuzzlib.orm import process_state_updates from ..onefuzzlib.proxy import Proxy @@ -26,7 +26,7 @@ def main(mytimer: func.TimerRequest, dashboard: func.Out[str]) -> None: # noqa: if proxy.state in VmState.needs_work(): logging.info("update proxy vm: %s", proxy.region) - process_update(proxy) + process_state_updates(proxy) event = get_event() if event: diff --git a/src/api-service/__app__/timer_repro/__init__.py b/src/api-service/__app__/timer_repro/__init__.py index f59edec5d9..c6fe21e669 100644 --- a/src/api-service/__app__/timer_repro/__init__.py +++ b/src/api-service/__app__/timer_repro/__init__.py @@ -9,7 +9,7 @@ from onefuzztypes.enums import VmState from ..onefuzzlib.dashboard import get_event -from ..onefuzzlib.orm import process_update +from ..onefuzzlib.orm import process_state_updates from ..onefuzzlib.repro import Repro @@ -26,7 +26,7 @@ def main(mytimer: func.TimerRequest, dashboard: func.Out[str]) -> None: # noqa: # this VM already got processed during the expired phase continue logging.info("update repro: %s", repro.vm_id) - process_update(repro) + process_state_updates(repro) event = get_event() if event: diff --git a/src/api-service/__app__/timer_tasks/__init__.py b/src/api-service/__app__/timer_tasks/__init__.py index 120d3e5d2b..b0e503579b 100644 --- a/src/api-service/__app__/timer_tasks/__init__.py +++ 
b/src/api-service/__app__/timer_tasks/__init__.py @@ -10,7 +10,7 @@ from ..onefuzzlib.dashboard import get_event from ..onefuzzlib.jobs import Job -from ..onefuzzlib.orm import process_update +from ..onefuzzlib.orm import process_state_updates from ..onefuzzlib.tasks.main import Task from ..onefuzzlib.tasks.scheduler import schedule_tasks @@ -29,12 +29,12 @@ def main(mytimer: func.TimerRequest, dashboard: func.Out[str]) -> None: # noqa: jobs = Job.search_states(states=JobState.needs_work()) for job in jobs: logging.info("update job: %s", job.job_id) - process_update(job) + process_state_updates(job) tasks = Task.search_states(states=TaskState.needs_work()) for task in tasks: logging.info("update task: %s", task.task_id) - process_update(task) + process_state_updates(task) schedule_tasks() diff --git a/src/api-service/__app__/timer_workers/__init__.py b/src/api-service/__app__/timer_workers/__init__.py index 4d2458e80a..070ed6aeb3 100644 --- a/src/api-service/__app__/timer_workers/__init__.py +++ b/src/api-service/__app__/timer_workers/__init__.py @@ -10,7 +10,7 @@ from ..onefuzzlib.autoscale import autoscale_pool from ..onefuzzlib.dashboard import get_event -from ..onefuzzlib.orm import process_update +from ..onefuzzlib.orm import process_state_updates from ..onefuzzlib.pools import Node, Pool, Scaleset @@ -22,7 +22,7 @@ def process_scaleset(scaleset: Scaleset) -> None: logging.debug("scaleset needed cleanup: %s", scaleset.scaleset_id) return - process_update(scaleset) + process_state_updates(scaleset) def main(mytimer: func.TimerRequest, dashboard: func.Out[str]) -> None: # noqa: F841 @@ -30,7 +30,7 @@ def main(mytimer: func.TimerRequest, dashboard: func.Out[str]) -> None: # noqa: nodes = Node.search_states(states=NodeState.needs_work()) for node in nodes: logging.info("update node: %s", node.machine_id) - process_update(node) + process_state_updates(node) scalesets = Scaleset.search() for scaleset in scalesets: @@ -40,7 +40,7 @@ def main(mytimer: 
func.TimerRequest, dashboard: func.Out[str]) -> None: # noqa: for pool in pools: if pool.state in PoolState.needs_work(): logging.info("update pool: %s (%s)", pool.pool_id, pool.name) - process_update(pool) + process_state_updates(pool) elif pool.state in PoolState.available() and pool.autoscale: autoscale_pool(pool) From f03c87acdd5ffe212178ef46b7d164c676de8412 Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Fri, 16 Oct 2020 17:24:47 -0400 Subject: [PATCH 28/31] fix typo --- src/api-service/__app__/onefuzzlib/pools.py | 2 +- src/api-service/__app__/timer_workers/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/api-service/__app__/onefuzzlib/pools.py b/src/api-service/__app__/onefuzzlib/pools.py index 4a042ba74d..a0f585562d 100644 --- a/src/api-service/__app__/onefuzzlib/pools.py +++ b/src/api-service/__app__/onefuzzlib/pools.py @@ -113,7 +113,7 @@ def search_outdated( return cls.search(query=query, raw_unchecked_filter=version_query) @classmethod - def mark_oudated_nodes(cls) -> None: + def mark_outdated_nodes(cls) -> None: outdated = cls.search_outdated() for node in outdated: logging.info( diff --git a/src/api-service/__app__/timer_workers/__init__.py b/src/api-service/__app__/timer_workers/__init__.py index 070ed6aeb3..bb1390db52 100644 --- a/src/api-service/__app__/timer_workers/__init__.py +++ b/src/api-service/__app__/timer_workers/__init__.py @@ -26,7 +26,7 @@ def process_scaleset(scaleset: Scaleset) -> None: def main(mytimer: func.TimerRequest, dashboard: func.Out[str]) -> None: # noqa: F841 - Node.mark_oudated_nodes() + Node.mark_outdated_nodes() nodes = Node.search_states(states=NodeState.needs_work()) for node in nodes: logging.info("update node: %s", node.machine_id) From 02d4e81cef209cc4525583c7655e01deba7a754f Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Fri, 16 Oct 2020 17:25:36 -0400 Subject: [PATCH 29/31] indicate why the task/job is stopping --- src/api-service/__app__/timer_tasks/__init__.py | 4 ++-- 
1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/api-service/__app__/timer_tasks/__init__.py b/src/api-service/__app__/timer_tasks/__init__.py index b0e503579b..61a65f87d4 100644 --- a/src/api-service/__app__/timer_tasks/__init__.py +++ b/src/api-service/__app__/timer_tasks/__init__.py @@ -18,12 +18,12 @@ def main(mytimer: func.TimerRequest, dashboard: func.Out[str]) -> None: # noqa: F841 expired_tasks = Task.search_expired() for task in expired_tasks: - logging.info("stopping task: %s", task.job_id) + logging.info("stopping expired task: %s", task.job_id) task.stopping() expired_jobs = Job.search_expired() for job in expired_jobs: - logging.info("stopping job: %s", job.job_id) + logging.info("stopping expired job: %s", job.job_id) job.stopping() jobs = Job.search_states(states=JobState.needs_work()) From 6de9458f5f505ff31ae5552d02427772ee7f3857 Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Fri, 16 Oct 2020 17:28:10 -0400 Subject: [PATCH 30/31] cleanup message --- src/api-service/__app__/onefuzzlib/pools.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/src/api-service/__app__/onefuzzlib/pools.py b/src/api-service/__app__/onefuzzlib/pools.py index a0f585562d..25ece1ca5d 100644 --- a/src/api-service/__app__/onefuzzlib/pools.py +++ b/src/api-service/__app__/onefuzzlib/pools.py @@ -216,13 +216,22 @@ def can_process_new_work(self) -> bool: if self.state in NodeState.ready_for_reset(): logging.info( - "can_schedule should be recycled. machine_id:%s", self.machine_id + "can_schedule node is set for reset. machine_id:%s", self.machine_id ) return False - if self.delete_requested or self.reimage_requested: + if self.delete_requested: logging.info( - "can_schedule should be recycled. machine_id:%s", self.machine_id + "can_schedule is set to be deleted. machine_id:%s", + self.machine_id, + ) + self.stop() + return False + + if self.reimage_requested: + logging.info( + "can_schedule is set to be reimaged. 
machine_id:%s", + self.machine_id, ) self.stop() return False From 8e51afca3fbbeb3db23c66296f052bbf9ee6b2f0 Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Fri, 16 Oct 2020 19:43:27 -0400 Subject: [PATCH 31/31] actually execute the function --- src/api-service/__app__/onefuzzlib/orm.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/api-service/__app__/onefuzzlib/orm.py b/src/api-service/__app__/onefuzzlib/orm.py index 142581f7d3..f66e36d836 100644 --- a/src/api-service/__app__/onefuzzlib/orm.py +++ b/src/api-service/__app__/onefuzzlib/orm.py @@ -83,6 +83,7 @@ def process_state_update(obj: HasState) -> None: func = getattr(obj, obj.state.name, None) if func is None: return + func() def process_state_updates(obj: HasState, max_updates: int = 5) -> None: