Skip to content

Commit

Permalink
Fix worker status message overflow
Browse files Browse the repository at this point in the history
  • Loading branch information
sharpener6 committed Jan 10, 2025
1 parent f4b4daa commit 05473d7
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 10 deletions.
16 changes: 8 additions & 8 deletions scaler/protocol/python/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

class Resource(Message):
def __init__(self, msg):
self._msg = msg
super().__init__(msg)

@property
def cpu(self) -> int:
Expand All @@ -26,7 +26,7 @@ def get_message(self):

class ObjectManagerStatus(Message):
def __init__(self, msg):
self._msg = msg
super().__init__(msg)

@property
def number_of_objects(self) -> int:
Expand All @@ -48,7 +48,7 @@ def get_message(self):

class ClientManagerStatus(Message):
def __init__(self, msg):
self._msg = msg
super().__init__(msg)

@property
def client_to_num_of_tasks(self) -> Dict[bytes, int]:
Expand All @@ -70,7 +70,7 @@ def get_message(self):

class TaskManagerStatus(Message):
def __init__(self, msg):
self._msg = msg
super().__init__(msg)

@property
def unassigned(self) -> int:
Expand Down Expand Up @@ -117,7 +117,7 @@ def get_message(self):

class ProcessorStatus(Message):
def __init__(self, msg):
self._msg = msg
super().__init__(msg)

@property
def pid(self) -> int:
Expand Down Expand Up @@ -155,7 +155,7 @@ def get_message(self):

class WorkerStatus(Message):
def __init__(self, msg):
self._msg = msg
super().__init__(msg)

@property
def worker_id(self) -> bytes:
Expand Down Expand Up @@ -237,7 +237,7 @@ def get_message(self):

class WorkerManagerStatus(Message):
def __init__(self, msg):
self._msg = msg
super().__init__(msg)

@property
def workers(self) -> List[WorkerStatus]:
Expand All @@ -253,7 +253,7 @@ def get_message(self):

class BinderStatus(Message):
def __init__(self, msg):
self._msg = msg
super().__init__(msg)

@property
def received(self) -> Dict[str, int]:
Expand Down
7 changes: 5 additions & 2 deletions scaler/scheduler/worker_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
from scaler.scheduler.mixins import TaskManager, WorkerManager
from scaler.utility.mixins import Looper, Reporter

UINT8_MAX = 2**8 - 1


class VanillaWorkerManager(WorkerManager, Looper, Reporter):
def __init__(
Expand Down Expand Up @@ -123,7 +125,8 @@ def __worker_status_from_heartbeat(
worker: bytes, worker_task_numbers: Dict, last: float, info: WorkerHeartbeat
) -> WorkerStatus:
current_processor = next((p for p in info.processors if not p.suspended), None)
suspended = len([p for p in info.processors if p.suspended])
suspended = min(len([p for p in info.processors if p.suspended]), UINT8_MAX)
last_s = min(int(time.time() - last), UINT8_MAX)

if current_processor:
debug_info = f"{int(current_processor.initialized)}{int(current_processor.has_task)}{int(info.task_lock)}"
Expand All @@ -139,7 +142,7 @@ def __worker_status_from_heartbeat(
queued=info.queued_tasks,
suspended=suspended,
lag_us=info.latency_us,
last_s=int(time.time() - last),
last_s=last_s,
itl=debug_info,
processor_statuses=[
ProcessorStatus.new_msg(
Expand Down

0 comments on commit 05473d7

Please sign in to comment.