Skip to content
This repository has been archived by the owner on Sep 18, 2024. It is now read-only.

[Retiarii] Grid search, random and evolution strategy #3377

Merged
merged 13 commits into from
Feb 24, 2021
Prev Previous commit
Next Next commit
Refine evolution
  • Loading branch information
ultmaster committed Feb 10, 2021
commit d1e20b5846e52fb9f672d9d62734d561e2abfd48
162 changes: 41 additions & 121 deletions nni/retiarii/strategy/evolution.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import dataclasses
import logging
import random
import time

from ..execution import submit_models
from ..graph import ModelStatus
Expand Down Expand Up @@ -33,23 +34,34 @@ def __init__(self, optimize_mode='maximize', population_size=100, sample_size=25

self._worst = float('-inf') if self.optimize_mode == 'maximize' else float('inf')

self._succeed_count = 0
self._success_count = 0
self._population = collections.deque()
self._running_models = []
self._polling_interval = 2.

def random(self, search_space):
    """Draw one random configuration from ``search_space``.

    ``search_space`` maps each key to a sequence of candidate values; the
    returned dict maps every key to one candidate picked with
    ``random.choice``.
    """
    sampled = {}
    for name, candidates in search_space.items():
        sampled[name] = random.choice(candidates)
    return sampled

def mutate(self, parent, search_space):
    """Build a child configuration by randomly re-sampling some choices.

    Parameters
    ----------
    parent : dict
        Configuration to mutate; it is not modified.
    search_space : dict
        Maps every key of ``parent`` to the sequence of values allowed for it.

    Returns
    -------
    dict
        A new dict with the same keys as ``parent``. Each value is replaced,
        with probability ``self.mutation_prob``, by a random candidate from
        ``search_space``; otherwise the parent's value is kept.
    """
    child = {}
    for k, v in parent.items():
        if random.uniform(0, 1) < self.mutation_prob:
            # NOTE: we do not exclude the original choice here for simplicity,
            # which is slightly different from the original paper.
            child[k] = random.choice(search_space[k])
        else:
            child[k] = v
    return child

def best_parent(self):
    """Pick the parent for the next mutation from a random sample.

    Up to ``self.sample_size`` individuals are drawn at random from
    ``self._population``. The one with the highest ``y`` (when
    ``self.optimize_mode`` is ``'maximize'``) or the lowest ``y`` (otherwise)
    wins, and its ``x`` attribute is returned.

    Raises
    ------
    ValueError
        If there is nothing to sample from (empty population or a
        ``sample_size`` of zero).
    """
    # Work on a copy so that shuffling never reorders the population itself.
    samples = list(self._population)
    random.shuffle(samples)
    samples = samples[:self.sample_size]
    if not samples:
        raise ValueError('Cannot select a parent: no individuals to sample from.')
    pick = max if self.optimize_mode == 'maximize' else min
    parent = pick(samples, key=lambda sample: sample.y)
    return parent.x

def run(self, base_model, applied_mutators):
search_space = dry_run_for_search_space(base_model, applied_mutators)
Expand All @@ -58,19 +70,30 @@ def run(self, base_model, applied_mutators):
while len(self._population) + len(self._running_models) <= self._population:
# try to submit new models
if len(self._population) + len(self._running_models) < self._population:
random_config = self.random(search_space)
random_model = get_targeted_model(base_model, applied_mutators, random_config)
submit_models(random_model)
self._running_models.append((random_config, random_model))
config = self.random(search_space)
self._submit_config(config, base_model, applied_mutators)
# collect results
self._remove_failed_models_from_running_list()
self._move_succeeded_models_to_population()
time.sleep(self._polling_interval)

def _is_better(self, a, b):
if self.optimize_mode == 'maximize':
return a > b
else:
return a < b
# Resource-aware mutation of models
_logger.info('Running mutations.')
while self._success_count + len(self._running_models) <= self.cycles:
# try to submit new models
if self._success_count + len(self._running_models) < self.cycles:
config = self.mutate(self.best_parent(), search_space)
self._submit_config(config, base_model, applied_mutators)
# collect results
self._remove_failed_models_from_running_list()
self._move_succeeded_models_to_population()
time.sleep(self._polling_interval)

def _submit_config(self, config, base_model, mutators):
    """Build the model for ``config``, submit it, and track it as running.

    The model is obtained from ``get_targeted_model(base_model, mutators,
    config)`` and handed to ``submit_models``. The ``(config, model)`` pair is
    then appended to ``self._running_models`` and the model is returned.
    """
    candidate = get_targeted_model(base_model, mutators, config)
    submit_models(candidate)
    entry = (config, candidate)
    self._running_models.append(entry)
    return candidate

def _remove_failed_models_from_running_list(self):
if self.on_failure == 'ignore':
Expand All @@ -90,110 +113,7 @@ def _move_succeeded_models_to_population(self):
self._population.append(Individual(config, metric))
if len(self._population) >= self.population_size:
self._population.popleft()
completed_indices.append(i)
completed_indices.append(i)
for i in completed_indices:
self._success_count += 1
self._running_models.pop(i)


class RegularizedEvolution:
    """Regularized evolution driven by a thread pool of blocking trials.

    The search first fills the population with randomly generated
    configurations, then repeatedly mutates the best individual of a random
    sample and replaces the oldest member of the population with the child.

    Parameters
    ----------
    search_space
        Search space handed to ``SearchSpaceUtils`` for flattening/restoring.
    concurrency : int
        Number of worker threads used to run trials.
    population_size : int
        Number of individuals kept in the population.
    sample_size : int
        Number of individuals sampled when choosing a parent.
    cycles : int
        Total number of trials (random plus mutated).
    mutation_prob : float
        Per-key probability of re-sampling a value during mutation.
    reward_fn : callable
        Called as ``reward_fn(config, metrics)`` to turn metrics into a reward.
    command, setup
        Forwarded unchanged to ``submit_new_trial``.
    """

    def __init__(self, search_space,
                 concurrency, population_size, sample_size, cycles, mutation_prob,
                 reward_fn, command, setup):
        self.search_space = search_space
        self.concurrency = concurrency
        self.population_size = population_size
        self.command = command
        self.setup = setup
        self.sample_size = sample_size
        self.cycles = cycles
        self.mutation_prob = mutation_prob
        self.reward_fn = reward_fn
        assert self.cycles >= self.population_size >= self.sample_size

        self.population = collections.deque()

    def train_and_eval(self, config):
        """Submit a trial for ``config``, block until it ends, return its reward.

        Metrics are polled every five seconds. If the trial fails, ``-inf`` is
        used as the metrics value passed to ``reward_fn``.
        """
        pid = get_trial_manager().submit_new_trial(self.command, config, self.setup)

        while True:
            try:
                metrics = get_trial_manager().query_metrics(pid)
            except TrialFailed:
                _logger.warning(f'Config: {config}. Trial failed and use -inf as metrics.')
                metrics = float('-inf')
                break
            if metrics is not None:
                break
            time.sleep(5)
        return self.reward_fn(config, metrics)

    def random_config(self):
        """Generate a configuration by sampling every flattened key at random."""
        config = {}
        for k, v in SearchSpaceUtils.flatten_search_space(self.search_space).items():
            config[k] = v.random()
        _logger.info(f'Generated random config: {config}')
        return SearchSpaceUtils.restore_config(config, self.search_space)

    def mutate_config(self, parent_config):
        """Return a mutated copy of ``parent_config``.

        Each flattened key is re-sampled with probability
        ``self.mutation_prob``, excluding the parent's current value.
        """
        parent_config = SearchSpaceUtils.flatten_config(parent_config)
        config = {}
        for k, v in SearchSpaceUtils.flatten_search_space(self.search_space).items():
            config[k] = parent_config[k]
            if random.uniform(0, 1) < self.mutation_prob:
                config[k] = v.random(excludes=[parent_config[k]])
        _logger.info(f'Generated mutated config: {config}')
        return SearchSpaceUtils.restore_config(config, self.search_space)

    def import_(self, individuals):
        """Keep the ``population_size`` highest-reward individuals, shuffled.

        NOTE(review): the result is stored on ``self.individuals``, not on
        ``self.population``, so ``run`` does not pick it up by itself --
        confirm whether that is intended.
        """
        self.individuals = sorted(individuals, key=lambda i: i.reward)[-self.population_size:]
        random.shuffle(self.individuals)
        _logger.info(f'Imported individuals: {self.individuals}')

    def _run_random(self):
        """Evaluate one random configuration and add it to the population."""
        individual = Individual(self.random_config(), None)
        individual.reward = self.train_and_eval(individual.config)
        self.population.append(individual)

    def _select_parent(self, candidates):
        """Return the highest-reward individual among ``sample_size`` random candidates."""
        pool = list(candidates)  # copy: the caller's collection is left untouched
        random.shuffle(pool)
        return max(pool[:self.sample_size], key=lambda i: i.reward)

    def _run_mutation(self):
        """Mutate the best of a random sample and swap it in for the oldest member."""
        # Snapshot the population under the lock. The lock is acquired before
        # entering ``try`` so a failed acquire never triggers a release.
        _lock.acquire()
        try:
            samples = copy.deepcopy(self.population)
        finally:
            _lock.release()
        # Sample ``sample_size`` individuals; slicing by ``population_size``
        # would always keep everyone and pick the global best.
        parent = self._select_parent(samples)

        individual = Individual(self.mutate_config(parent.config), None)
        individual.reward = self.train_and_eval(individual.config)
        _lock.acquire()
        try:
            self.population.append(individual)
            self.population.popleft()
        finally:
            _lock.release()

    def _wait_for_futures_and_shutdown(self, futures, pool):
        """Wait for every future; on the first failure cancel the rest and re-raise."""
        for i in futures:
            try:
                i.result()
            except BaseException:
                # Also covers KeyboardInterrupt so pending work is cancelled
                # and the pool is shut down before the error propagates.
                traceback.print_exc()
                for k in futures:
                    k.cancel()
                pool.shutdown(wait=True)
                raise
        pool.shutdown()

    def run(self):
        """Run the random initialization phase followed by the mutation phase."""
        # Initialize the population with random models.
        pool = concurrent.futures.ThreadPoolExecutor(max_workers=self.concurrency)
        fs = [pool.submit(self._run_random) for _ in range(self.population_size - len(self.population))]
        self._wait_for_futures_and_shutdown(fs, pool)

        # Evolve: every remaining cycle evaluates one mutated child.
        pool = concurrent.futures.ThreadPoolExecutor(max_workers=self.concurrency)
        fs = [pool.submit(self._run_mutation) for _ in range(self.cycles - self.population_size)]
        self._wait_for_futures_and_shutdown(fs, pool)