Add Symphony worker implementation (#50)
* Add Symphony worker implementation

Signed-off-by: 1597463007 <squid-shape-fifty@duck.com>

* Address comments

Signed-off-by: 1597463007 <squid-shape-fifty@duck.com>

* Address rest of the comments

Signed-off-by: 1597463007 <squid-shape-fifty@duck.com>

---------

Signed-off-by: 1597463007 <squid-shape-fifty@duck.com>
1597463007 authored Jan 24, 2025
1 parent 1d36288 commit 5153efc
Showing 36 changed files with 854 additions and 72 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -13,6 +13,7 @@ eggs/
*.egg
.mypy_cache/

.venv/
venv*/
.vscode/

113 changes: 99 additions & 14 deletions README.md
@@ -23,7 +23,7 @@
<br />

**Scaler provides a simple, efficient and reliable way to perform distributed computing** using a centralized scheduler,
with a stable and language agnostic protocol for client and worker communications.
with a stable and language-agnostic protocol for client and worker communications.

```python
import math
# ...
```

@@ -47,19 +47,17 @@ of lightweight tasks while improving on load balancing, messaging and deadlocks.

## Features

- Distributed computing on **multiple cores and multiple servers**
- **Python** reference implementation, with **language agnostic messaging protocol** built on top of
- Distributed computing across **multiple cores and multiple servers**
- **Python** reference implementation, with **language-agnostic messaging protocol** built on top of
[Cap'n Proto](https://capnproto.org/) and [ZeroMQ](https://zeromq.org)
- **Graph** scheduling, which supports [Dask](https://www.dask.org)-like graph computing, optionally you
can use [GraphBLAS](https://graphblas.org) for very large graph tasks
- **Automated load balancing**. automatically balances load from busy workers to idle workers and tries to keep workers
utilized as uniformly as possible
- **Automated task recovery** from faulting workers who have died
- Supports for **nested tasks**, tasks can themselves submit new tasks
- **Graph** scheduling, which supports [Dask](https://www.dask.org)-like graph computing, with optional [GraphBLAS](https://graphblas.org)
support for very large graph tasks
- **Automated load balancing**, which automatically balances load from busy workers to idle workers, ensuring uniform utilization across workers
- **Automated task recovery** from worker-related hardware, OS or network failures
- Support for **nested tasks**, allowing tasks to submit new tasks
- `top`-like **monitoring tools**
- GUI monitoring tool

Scaler's scheduler can be run on PyPy, which can provide a performance boost

## Installation

@@ -78,9 +76,9 @@ Scaler operates around 3 components:
- A set of **workers**, or cluster. Workers are independent computing units, each capable of executing a single task
- **Clients** running inside applications, responsible for submitting tasks to the scheduler.

### Start local scheduler and cluster at the same time in code
### Start a local scheduler and cluster programmatically

A local scheduler and a local set of workers can be conveniently spawn using `SchedulerClusterCombo`:
A local scheduler and a local set of workers can be conveniently started using `SchedulerClusterCombo`:

```python
from scaler import SchedulerClusterCombo
# ...
```

@@ -98,7 +96,7 @@ This will start a scheduler with 4 task executing workers on port `2345`.

The scheduler and workers can also be started from the command line with `scaler_scheduler` and `scaler_cluster`.

First start the Scaler scheduler:
First, start the Scaler scheduler:

```bash
$ scaler_scheduler tcp://127.0.0.1:2345
...
```

Then start a set of workers (a.k.a. a Scaler *cluster*) that connect to the previously started scheduler:
Then, start a set of workers (a.k.a. a Scaler *cluster*) that connect to the previously started scheduler:

```bash
$ scaler_cluster -n 4 tcp://127.0.0.1:2345
...
```

@@ -207,6 +205,93 @@

```python
with Client(address="tcp://127.0.0.1:2345") as client:
    ...
    print(result) # 21
```

## IBM Spectrum Symphony integration

A Scaler scheduler can interface with IBM Spectrum Symphony to provide distributed computing across Symphony clusters.

```bash
$ scaler_symphony_cluster tcp://127.0.0.1:2345 ScalerService --base-concurrency 4
```

This will start a Scaler worker that connects to the Scaler scheduler at `tcp://127.0.0.1:2345` and uses the Symphony
service `ScalerService` to submit tasks.

### Symphony service

A service must be deployed in Symphony to handle task submission.

<details>

<summary>Here is an example of a service that can be used</summary>

```python
import array

import cloudpickle
import soamapi


class Message(soamapi.Message):
    def __init__(self, payload: bytes = b""):
        self.__payload = payload

    def set_payload(self, payload: bytes):
        self.__payload = payload

    def get_payload(self) -> bytes:
        return self.__payload

    def on_serialize(self, stream):
        payload_array = array.array("b", self.get_payload())
        stream.write_byte_array(payload_array, 0, len(payload_array))

    def on_deserialize(self, stream):
        self.set_payload(stream.read_byte_array("b"))


class ServiceContainer(soamapi.ServiceContainer):
    def on_create_service(self, service_context):
        return

    def on_session_enter(self, session_context):
        return

    def on_invoke(self, task_context):
        input_message = Message()
        task_context.populate_task_input(input_message)

        fn, *args = cloudpickle.loads(input_message.get_payload())
        output_payload = cloudpickle.dumps(fn(*args))

        output_message = Message(output_payload)
        task_context.set_task_output(output_message)

    def on_session_leave(self):
        return

    def on_destroy_service(self):
        return
```
</details>
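
The `on_invoke` handler above expects each task payload to be a cloudpickle-serialized `(fn, *args)` tuple, and it
returns the pickled result of calling `fn(*args)`. A minimal sketch of that round-trip outside Symphony (the callable
and arguments are arbitrary placeholders):

```python
import cloudpickle

# Pack a callable and its arguments the way on_invoke expects to unpack them.
payload = cloudpickle.dumps((pow, 2, 10))

# Mirror the body of on_invoke: unpack, execute, repack the result.
fn, *args = cloudpickle.loads(payload)
output_payload = cloudpickle.dumps(fn(*args))

assert cloudpickle.loads(output_payload) == 1024
```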

### Nested tasks

Nested tasks originating from Symphony workers must be able to reach the Scaler scheduler. This might require
modifications to the network configuration.

Nested tasks can also have unpredictable resource usage and runtimes, which can cause Symphony to kill tasks
prematurely. It is recommended to be conservative when provisioning resources and limits, and to monitor the cluster
status closely for any abnormalities.
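
Concretely, a nested task is just a task whose body opens its own `Client` connection back to the scheduler. A minimal
sketch, assuming a scheduler reachable at `tcp://127.0.0.1:2345` (the functions here are illustrative placeholders):

```python
from scaler import Client

def double(x: int) -> int:
    return x * 2

def parent(x: int) -> int:
    # This body runs on a Symphony worker, so the worker's network must allow
    # a connection back to the Scaler scheduler.
    with Client(address="tcp://127.0.0.1:2345") as client:
        return client.submit(double, x).result()

with Client(address="tcp://127.0.0.1:2345") as client:
    print(client.submit(parent, 21).result())  # 42
```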

### Base concurrency

Base concurrency is the maximum number of unnested tasks that can be executed concurrently. It is possible to surpass
this limit by submitting nested tasks, which carry a higher priority. **Important**: if your workload contains nested
tasks, the base concurrency should be set to a value less than the number of cores available on the Symphony worker, or
deadlocks may occur.

A good heuristic for setting the base concurrency is to use the following formula:

```
base_concurrency = number_of_cores - deepest_nesting_level
```

where `deepest_nesting_level` is the deepest level of nesting any task reaches in your workload. For example, if a base
task calls a nested task that itself calls another nested task, the deepest nesting level is 2.
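
For that example, an 8-core Symphony worker would be configured with:

```
base_concurrency = 8 - 2 = 6
```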

## Performance

### uvloop
1 change: 1 addition & 0 deletions pyproject.toml
@@ -24,6 +24,7 @@ Home = "https://github.com/Citi/scaler"
[project.scripts]
scaler_scheduler = "scaler.entry_points.scheduler:main"
scaler_cluster = "scaler.entry_points.cluster:main"
scaler_symphony_cluster = "scaler.entry_points.symphony_cluster:main"
scaler_top = "scaler.entry_points.top:main"
scaler_ui = "scaler.entry_points.webui:main"

4 changes: 4 additions & 0 deletions run_symphony_cluster.py
@@ -0,0 +1,4 @@
from scaler.entry_points.symphony_cluster import main

if __name__ == "__main__":
    main()
2 changes: 1 addition & 1 deletion scaler/about.py
@@ -1 +1 @@
__version__ = "1.8.17"
__version__ = "1.9.0"
6 changes: 1 addition & 5 deletions scaler/client/agent/object_manager.py
@@ -3,11 +3,7 @@
from scaler.client.agent.mixins import ObjectManager
from scaler.io.async_connector import AsyncConnector
from scaler.protocol.python.common import ObjectContent
from scaler.protocol.python.message import (
    ObjectInstruction,
    ObjectRequest,
    TaskResult,
)
from scaler.protocol.python.message import ObjectInstruction, ObjectRequest, TaskResult


class ClientObjectManager(ObjectManager):
2 changes: 1 addition & 1 deletion scaler/client/future.py
@@ -61,7 +61,7 @@ def _set_result_or_exception(
        self,
        result: Optional[Any] = None,
        exception: Optional[BaseException] = None,
        profiling_info: Optional[ProfileResult] = None
        profiling_info: Optional[ProfileResult] = None,
    ) -> None:
        with self._condition:  # type: ignore[attr-defined]
            if self.cancelled():
11 changes: 3 additions & 8 deletions scaler/client/object_buffer.py
@@ -93,9 +93,7 @@ def clear(self):

        self._connector.send(
            ObjectInstruction.new_msg(
                ObjectInstruction.ObjectInstructionType.Clear,
                self._identity,
                ObjectContent.new_msg(tuple()),
                ObjectInstruction.ObjectInstructionType.Clear, self._identity, ObjectContent.new_msg(tuple())
            )
        )

@@ -106,7 +104,7 @@ def __construct_serializer(self) -> ObjectCache:
            object_id,
            ObjectContent.ObjectContentType.Serializer,
            b"serializer",
            chunk_to_list_of_bytes(serializer_bytes)
            chunk_to_list_of_bytes(serializer_bytes),
        )

    def __construct_function(self, fn: Callable) -> ObjectCache:
@@ -125,8 +123,5 @@ def __construct_object(self, obj: Any, name: Optional[str] = None) -> ObjectCach
        object_id = generate_object_id(self._identity, object_payload)
        name_bytes = name.encode() if name else f"<obj {object_id.hex()[-6:]}>".encode()
        return ObjectCache(
            object_id,
            ObjectContent.ObjectContentType.Object,
            name_bytes,
            chunk_to_list_of_bytes(object_payload)
            object_id, ObjectContent.ObjectContentType.Object, name_bytes, chunk_to_list_of_bytes(object_payload)
        )
2 changes: 1 addition & 1 deletion scaler/entry_points/cluster.py
@@ -76,7 +76,7 @@ def get_args():
"When set, suspends worker processors using the SIGTSTP signal instead of a synchronization event, "
"fully halting computation on suspended tasks. Note that this may cause some tasks to fail if they "
"do not support being paused at the OS level (e.g. tasks requiring active network connections)."
)
),
)
parser.add_argument(
"--log-hub-address", "-la", default=None, type=ZMQConfig.from_string, help="address for Worker send logs"
106 changes: 106 additions & 0 deletions scaler/entry_points/symphony_cluster.py
@@ -0,0 +1,106 @@
import argparse
import logging
import os
import signal
import socket

from scaler.io.config import (
    DEFAULT_HEARTBEAT_INTERVAL_SECONDS,
    DEFAULT_IO_THREADS,
    DEFAULT_NUMBER_OF_WORKER,
    DEFAULT_WORKER_DEATH_TIMEOUT,
)
from scaler.utility.event_loop import EventLoopType, register_event_loop
from scaler.utility.logging.utility import setup_logger
from scaler.utility.zmq_config import ZMQConfig
from scaler.worker.symphony.worker import SymphonyWorker


def get_args():
    parser = argparse.ArgumentParser(
        "standalone symphony cluster", formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )
    parser.add_argument(
        "--base-concurrency", "-n", type=int, default=DEFAULT_NUMBER_OF_WORKER, help="base task concurrency"
    )
    parser.add_argument(
        "--worker-name", "-w", type=str, default=None, help="worker name; defaults to the hostname if not specified"
    )
    parser.add_argument(
        "--heartbeat-interval",
        "-hi",
        type=int,
        default=DEFAULT_HEARTBEAT_INTERVAL_SECONDS,
        help="number of seconds between heartbeats",
    )
    parser.add_argument(
        "--death-timeout-seconds", "-ds", type=int, default=DEFAULT_WORKER_DEATH_TIMEOUT, help="death timeout seconds"
    )
    parser.add_argument(
        "--event-loop", "-el", default="builtin", choices=EventLoopType.allowed_types(), help="select event loop type"
    )
    parser.add_argument(
        "--io-threads", "-it", type=int, default=DEFAULT_IO_THREADS, help="specify the number of io threads per worker"
    )
    parser.add_argument(
        "--logging-paths",
        "-lp",
        nargs="*",
        type=str,
        default=("/dev/stdout",),
        help='specify where the cluster log should be written; multiple paths can be given, "/dev/stdout" is the '
        "default for standard output, and each worker will have its own log file with the process id appended to "
        "the path",
    )
    parser.add_argument(
        "--logging-level",
        "-ll",
        type=str,
        choices=("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"),
        default="INFO",
        help="specify the logging level",
    )
    parser.add_argument(
        "--logging-config-file",
        type=str,
        default=None,
        help="use a standard Python logging .conf file to configure logging; this bypasses both --logging-paths "
        "and --logging-level, and does not apply to per-worker logging",
    )
    parser.add_argument("address", type=ZMQConfig.from_string, help="scheduler address to connect to")
    parser.add_argument("service_name", type=str, help="symphony service name")
    return parser.parse_args()


def main():
    args = get_args()
    register_event_loop(args.event_loop)

    if args.worker_name is None:
        args.worker_name = socket.gethostname().split(".")[0]

    setup_logger(args.logging_paths, args.logging_config_file, args.logging_level)

    worker = SymphonyWorker(
        address=args.address,
        name=args.worker_name,
        service_name=args.service_name,
        base_concurrency=args.base_concurrency,
        heartbeat_interval_seconds=args.heartbeat_interval,
        death_timeout_seconds=args.death_timeout_seconds,
        event_loop=args.event_loop,
        io_threads=args.io_threads,
    )

    def destroy(*args):
        assert args is not None
        logging.info(f"{SymphonyWorker.__name__}: shutting down Symphony worker[{worker.pid}]")
        os.kill(worker.pid, signal.SIGINT)

    signal.signal(signal.SIGINT, destroy)
    signal.signal(signal.SIGTERM, destroy)

    worker.start()
    logging.info("Symphony worker started")

    worker.join()
    logging.info("Symphony worker stopped")
2 changes: 1 addition & 1 deletion scaler/scheduler/mixins.py
@@ -34,7 +34,7 @@ def on_add_object(
        object_id: bytes,
        object_type: ObjectContent.ObjectContentType,
        object_name: bytes,
        object_bytes: List[bytes]
        object_bytes: List[bytes],
    ):
        raise NotImplementedError()

7 changes: 2 additions & 5 deletions scaler/scheduler/object_manager.py
@@ -78,7 +78,7 @@ def on_add_object(
        object_id: bytes,
        object_type: ObjectContent.ObjectContentType,
        object_name: bytes,
        object_bytes: List[bytes]
        object_bytes: List[bytes],
    ):
        creation = _ObjectCreation(object_id, object_user, object_type, object_name, object_bytes)
        logging.debug(
@@ -184,9 +184,6 @@ def __construct_response(self, request: ObjectRequest) -> ObjectResponse:
        return ObjectResponse.new_msg(
            ObjectResponse.ObjectResponseType.Content,
            ObjectContent.new_msg(
                tuple(request.object_ids),
                tuple(object_types),
                tuple(object_names),
                tuple(object_bytes)
                tuple(request.object_ids), tuple(object_types), tuple(object_names), tuple(object_bytes)
            ),
        )
6 changes: 1 addition & 5 deletions scaler/worker/agent/heartbeat_manager.py
@@ -85,9 +85,5 @@ def __get_processor_status_from_holder(processor: ProcessorHolder) -> ProcessorS
        resource = Resource.new_msg(0, 0)

        return ProcessorStatus.new_msg(
            processor.pid(),
            processor.initialized(),
            processor.task() is not None,
            processor.suspended(),
            resource,
            processor.pid(), processor.initialized(), processor.task() is not None, processor.suspended(), resource
        )
