Skip to content

Commit

Permalink
Pluggable Scheduler (#710)
Browse files Browse the repository at this point in the history
* [x] Make Scheduler class loadable from JSON configs
* [x] Update the Launcher and `run.py` to instantiate Scheduler from
JSON
* [x] Create JSON schema for the Scheduler config
* [x] Add unit tests for the new Scheduler JSON configs

Closes #700

---------

Co-authored-by: Brian Kroth <bpkroth@microsoft.com>
Co-authored-by: Brian Kroth <bpkroth@users.noreply.github.com>
  • Loading branch information
3 people authored Mar 15, 2024
1 parent 2a09a07 commit 4166e96
Show file tree
Hide file tree
Showing 20 changed files with 395 additions and 41 deletions.
9 changes: 9 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,15 @@
],
"url": "./mlos_bench/mlos_bench/config/schemas/optimizers/optimizer-schema.json"
},
{
"fileMatch": [
"mlos_bench/mlos_bench/tests/config/schemas/schedulers/test-cases/**/*.jsonc",
"mlos_bench/mlos_bench/tests/config/schemas/schedulers/test-cases/**/*.json",
"mlos_bench/mlos_bench/config/schedulers/**/*.jsonc",
"mlos_bench/mlos_bench/config/schedulers/**/*.json"
],
"url": "./mlos_bench/mlos_bench/config/schemas/schedulers/scheduler-schema.json"
},
{
"fileMatch": [
"mlos_bench/mlos_bench/tests/config/schemas/storage/test-cases/**/*.jsonc",
Expand Down
11 changes: 11 additions & 0 deletions mlos_bench/mlos_bench/config/schedulers/sync_scheduler.jsonc
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Simple synchronous scheduler config for the mlos_bench framework.
{
"$schema": "https://raw.githubusercontent.com/microsoft/MLOS/main/mlos_bench/mlos_bench/config/schemas/schedulers/scheduler-schema.json",

"class": "mlos_bench.schedulers.SyncScheduler",

"config": {
"trial_config_repeat_count": 3,
"teardown": false
}
}
1 change: 1 addition & 0 deletions mlos_bench/mlos_bench/config/schemas/config_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ class ConfigSchema(Enum):
GLOBALS = path_join(CONFIG_SCHEMA_DIR, "cli/globals-schema.json")
ENVIRONMENT = path_join(CONFIG_SCHEMA_DIR, "environments/environment-schema.json")
OPTIMIZER = path_join(CONFIG_SCHEMA_DIR, "optimizers/optimizer-schema.json")
SCHEDULER = path_join(CONFIG_SCHEMA_DIR, "schedulers/scheduler-schema.json")
SERVICE = path_join(CONFIG_SCHEMA_DIR, "services/service-schema.json")
STORAGE = path_join(CONFIG_SCHEMA_DIR, "storage/storage-schema.json")
TUNABLE_PARAMS = path_join(CONFIG_SCHEMA_DIR, "tunables/tunable-params-schema.json")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@
"type": "object",
"$ref": "./optimizers/optimizer-schema.json"
},
{
"description": "scheduler config",
"type": "object",
"$ref": "./schedulers/scheduler-schema.json"
},
{
"description": "service config",
"type": "object",
Expand Down
100 changes: 100 additions & 0 deletions mlos_bench/mlos_bench/config/schemas/schedulers/scheduler-schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "https://raw.githubusercontent.com/microsoft/MLOS/main/mlos_bench/mlos_bench/config/schemas/schedulers/scheduler-schema.json",
"title": "mlos_bench Scheduler config",

"$defs": {
"comment": {
"$comment": "This section contains reusable partial schema bits (or just split out for readability)"
},

"config_base_scheduler": {
"$comment": "config properties common to all Scheduler types.",
"type": "object",
"properties": {
"experiment_id": {
"$ref": "../cli/common-defs-subschemas.json#/$defs/experiment_id"
},
"trial_id": {
"$ref": "../cli/common-defs-subschemas.json#/$defs/trial_id"
},
"config_id": {
"$ref": "../cli/common-defs-subschemas.json#/$defs/config_id"
},
"teardown": {
"description": "Whether to teardown the experiment after running it.",
"type": "boolean"
},
"trial_config_repeat_count": {
"description": "Number of times to repeat a config.",
"type": "integer",
"minimum": 1,
"examples": [3, 5]
}
}
}
},

"description": "config for the mlos_bench scheduler",
"$comment": "top level schema document rules",
"type": "object",
"properties": {
"$schema": {
"description": "The schema to use for validating the scheduler config (accepts both URLs and local paths).",
"type": "string",
"$comment": "This is optional, but if provided, should match the name of this file.",
"pattern": "/schemas/schedulers/scheduler-schema.json$"
},

"description": {
"description": "Optional description of the config.",
"type": "string"
},

"class": {
"description": "The name of the scheduler class to use.",
"$comment": "required",
"enum": [
"mlos_bench.schedulers.SyncScheduler",
"mlos_bench.schedulers.sync_scheduler.SyncScheduler"
]
},

"config": {
"description": "The scheduler-specific config.",
"$comment": "Stub for scheduler-specific config appended with condition statements below",
"type": "object",
"minProperties": 1
}
},
"required": ["class"],

"oneOf": [
{
"$comment": "extensions to the 'config' object properties when synchronous scheduler is being used",
"if": {
"properties": {
"class": {
"enum": [
"mlos_bench.schedulers.SyncScheduler",
"mlos_bench.schedulers.sync_scheduler.SyncScheduler"
]
}
},
"required": ["class"]
},
"then": {
"properties": {
"config": {
"type": "object",
"allOf": [{ "$ref": "#/$defs/config_base_scheduler" }],
"$comment": "disallow other properties",
"unevaluatedProperties": false
}
}
},
"else": false
}
],
"unevaluatedProperties": false
}
67 changes: 54 additions & 13 deletions mlos_bench/mlos_bench/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
from mlos_bench.services.local.local_exec import LocalExecService
from mlos_bench.services.config_persistence import ConfigPersistenceService

from mlos_bench.schedulers.base_scheduler import Scheduler

from mlos_bench.services.types.config_loader_type import SupportsConfigLoading


Expand Down Expand Up @@ -76,12 +78,6 @@ def __init__(self, description: str, long_text: str = "", argv: Optional[List[st
else:
config = {}

self.trial_config_repeat_count: int = (
args.trial_config_repeat_count or config.get("trial_config_repeat_count", 1)
)
if self.trial_config_repeat_count <= 0:
raise ValueError(f"Invalid trial_config_repeat_count: {self.trial_config_repeat_count}")

log_level = args.log_level or config.get("log_level", _LOG_LEVEL)
try:
log_level = int(log_level)
Expand Down Expand Up @@ -109,12 +105,16 @@ def __init__(self, description: str, long_text: str = "", argv: Optional[List[st
# It's useful to keep it there explicitly mostly for the --help output.
if args.experiment_id:
self.global_config['experiment_id'] = args.experiment_id
self.global_config = DictTemplater(self.global_config).expand_vars(use_os_env=True)
assert isinstance(self.global_config, dict)
# trial_config_repeat_count is a scheduler property but it's convenient to set it via command line
if args.trial_config_repeat_count:
self.global_config["trial_config_repeat_count"] = args.trial_config_repeat_count
# Ensure that the trial_id is present since it gets used by some other
# configs but is typically controlled by the run optimize loop.
self.global_config.setdefault('trial_id', 1)

self.global_config = DictTemplater(self.global_config).expand_vars(use_os_env=True)
assert isinstance(self.global_config, dict)

# --service cli args should override the config file values.
service_files: List[str] = config.get("services", []) + (args.service or [])
assert isinstance(self._parent_service, SupportsConfigLoading)
Expand Down Expand Up @@ -146,6 +146,8 @@ def __init__(self, description: str, long_text: str = "", argv: Optional[List[st
_LOG.info("Init storage: %s", self.storage)

self.teardown: bool = bool(args.teardown) if args.teardown is not None else bool(config.get("teardown", True))
self.scheduler = self._load_scheduler(args.scheduler or config.get("scheduler"))
_LOG.info("Init scheduler: %s", self.scheduler)

@property
def config_loader(self) -> ConfigPersistenceService:
Expand Down Expand Up @@ -203,9 +205,14 @@ def _parse_args(parser: argparse.ArgumentParser, argv: Optional[List[str]]) -> T
' a single trial with default (or specified in --tunable_values).')

parser.add_argument(
'--trial_config_repeat_count', '--trial-config-repeat-count', required=False, type=int, default=1,
'--trial_config_repeat_count', '--trial-config-repeat-count', required=False, type=int,
help='Number of times to repeat each config. Default is 1 trial per config, though more may be advised.')

parser.add_argument(
'--scheduler', required=False,
help='Path to the scheduler configuration file. By default, use' +
' a single worker synchronous scheduler.')

parser.add_argument(
'--storage', required=False,
help='Path to the storage configuration file.' +
Expand Down Expand Up @@ -337,17 +344,13 @@ def _load_optimizer(self, args_optimizer: Optional[str]) -> Optimizer:
in the --optimizer command line option. If config file not specified,
create a one-shot optimizer to run a single benchmark trial.
"""
if 'max_iterations' in self.global_config:
self.global_config['max_iterations'] *= self.trial_config_repeat_count
if args_optimizer is None:
# global_config may contain additional properties, so we need to
# strip those out before instantiating the basic oneshot optimizer.
config = {key: val for key, val in self.global_config.items() if key in OneShotOptimizer.BASE_SUPPORTED_CONFIG_PROPS}
return OneShotOptimizer(
self.tunables, config=config, service=self._parent_service)
class_config = self._config_loader.load_config(args_optimizer, ConfigSchema.OPTIMIZER)
if 'max_iterations' in class_config:
class_config['max_iterations'] *= self.trial_config_repeat_count
assert isinstance(class_config, Dict)
optimizer = self._config_loader.build_optimizer(tunables=self.tunables,
service=self._parent_service,
Expand Down Expand Up @@ -376,3 +379,41 @@ def _load_storage(self, args_storage: Optional[str]) -> Storage:
config=class_config,
global_config=self.global_config)
return storage

def _load_scheduler(self, args_scheduler: Optional[str]) -> Scheduler:
    """
    Instantiate the Scheduler object from JSON file provided in the --scheduler
    command line parameter.
    Create a simple synchronous single-threaded scheduler if omitted.

    Parameters
    ----------
    args_scheduler : Optional[str]
        Path to the Scheduler JSON config file, or None to fall back to the
        default single-worker synchronous scheduler.

    Returns
    -------
    scheduler : Scheduler
        A new Scheduler instance.
    """
    # Set `teardown` for scheduler only to prevent conflicts with other configs.
    # BUG FIX: this copy was previously created but never passed on -- both
    # branches below handed `self.global_config` to the scheduler, silently
    # discarding the `teardown` default.  Pass the copy instead.
    global_config = self.global_config.copy()
    global_config.setdefault("teardown", self.teardown)
    if args_scheduler is None:
        # pylint: disable=import-outside-toplevel
        from mlos_bench.schedulers.sync_scheduler import SyncScheduler
        return SyncScheduler(
            # All config values can be overridden from global config
            config={
                "experiment_id": "UNDEFINED - override from global config",
                "trial_id": 0,
                "config_id": -1,
                "trial_config_repeat_count": 1,
                "teardown": self.teardown,
            },
            global_config=global_config,
            environment=self.environment,
            optimizer=self.optimizer,
            storage=self.storage,
            root_env_config=self.root_env_config,
        )
    class_config = self._config_loader.load_config(args_scheduler, ConfigSchema.SCHEDULER)
    assert isinstance(class_config, Dict)
    return self._config_loader.build_scheduler(
        config=class_config,
        global_config=global_config,
        environment=self.environment,
        optimizer=self.optimizer,
        storage=self.storage,
        root_env_config=self.root_env_config,
    )
21 changes: 2 additions & 19 deletions mlos_bench/mlos_bench/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

from mlos_bench.launcher import Launcher
from mlos_bench.tunables.tunable_groups import TunableGroups
from mlos_bench.schedulers.sync_scheduler import SyncScheduler

_LOG = logging.getLogger(__name__)

Expand All @@ -25,27 +24,11 @@ def _main(argv: Optional[List[str]] = None) -> Tuple[Optional[float], Optional[T

launcher = Launcher("mlos_bench", "Systems autotuning and benchmarking tool", argv=argv)

# TODO: Instantiate Scheduler from JSON config
scheduler = SyncScheduler(
config={
"experiment_id": "UNDEFINED - override from global config",
"trial_id": 0, # Override from global config
"config_id": -1, # Override from global config
"trial_config_repeat_count": launcher.trial_config_repeat_count,
"teardown": launcher.teardown,
},
global_config=launcher.global_config,
environment=launcher.environment,
optimizer=launcher.optimizer,
storage=launcher.storage,
root_env_config=launcher.root_env_config,
)

with scheduler as scheduler_context:
with launcher.scheduler as scheduler_context:
scheduler_context.start()
scheduler_context.teardown()

(score, _config) = result = scheduler.get_best_observation()
(score, _config) = result = launcher.scheduler.get_best_observation()
# NOTE: This log line is used in test_launch_main_app_* unit tests:
_LOG.info("Final score: %s", score)
return result
Expand Down
39 changes: 36 additions & 3 deletions mlos_bench/mlos_bench/schedulers/base_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,34 @@ def __init__(self, *,
and the derived classes is called by the persistence service
after reading the class JSON configuration. Other objects like
the Environment and Optimizer are provided by the Launcher.
Parameters
----------
config : dict
The configuration for the scheduler.
global_config : dict
The global configuration for the experiment.
environment : Environment
The environment to benchmark/optimize.
optimizer : Optimizer
The optimizer to use.
storage : Storage
The storage to use.
root_env_config : str
Path to the root environment configuration.
"""
self.global_config = global_config
config = merge_parameters(dest=config.copy(), source=global_config)
config = merge_parameters(dest=config.copy(), source=global_config,
required_keys=["experiment_id", "trial_id"])

self._experiment_id = config["experiment_id"].strip()
self._trial_id = int(config["trial_id"])
self._config_id = int(config.get("config_id", -1))
self._trial_config_repeat_count: int = config.get("trial_config_repeat_count", 1)

self._trial_config_repeat_count = int(config.get("trial_config_repeat_count", 1))
if self._trial_config_repeat_count <= 0:
raise ValueError(f"Invalid trial_config_repeat_count: {self._trial_config_repeat_count}")

self._do_teardown = bool(config.get("teardown", True))

self.experiment: Optional[Storage.Experiment] = None
Expand All @@ -58,11 +78,24 @@ def __init__(self, *,
self.storage = storage
self._root_env_config = root_env_config

_LOG.debug("Scheduler instantiated: %s :: %s", self, config)

def __repr__(self) -> str:
"""
Produce a human-readable version of the Scheduler (mostly for logging).
Returns
-------
string : str
A human-readable version of the Scheduler.
"""
return self.__class__.__name__

def __enter__(self) -> 'Scheduler':
"""
Enter the scheduler's context.
"""
_LOG.debug("Optimizer START :: %s", self)
_LOG.debug("Scheduler START :: %s", self)
assert self.experiment is None
self.environment.__enter__()
self.optimizer.__enter__()
Expand Down
Loading

0 comments on commit 4166e96

Please sign in to comment.