Skip to content

Commit

Permalink
[IMP] runbot: global state computation
Browse files Browse the repository at this point in the history
One of the most problematic concurrency issue is when multiple children
tries to write the state/result on the parent concurrently.

Multiple partial solutions are possible here:
- avoid to write if the state doesn't changes
- avoid to write on a build belonging to another host

A maybe overkill solution would be to add a message queue for an host,
signaling that one of the child state changed.

An intermediate solution would be to let the host check the state of the
children while there are some of them, and update the local build state
assynchronously himself.

We can actualy use the 'waiting' global state to now if we need to
continue to check the build state and result.

While a build is not done or running, we need to check all children
result and state on case they were updated.

One corner case is when rebuilding a child: a new child is added
but the parent is maybe not in the 'waiting' global state anymore.
If this is the case, we need to recusivelly change the state of the
parents to waiting so that they will update again.
  • Loading branch information
Xavier-Do committed Mar 3, 2023
1 parent b05b943 commit 96ca0ff
Show file tree
Hide file tree
Showing 16 changed files with 543 additions and 372 deletions.
15 changes: 13 additions & 2 deletions runbot/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import os
import re
import subprocess
import time
import warnings

# unsolved issue https://github.com/docker/docker-py/issues/2928
Expand All @@ -27,7 +28,7 @@


_logger = logging.getLogger(__name__)

docker_stop_failures = {}

class Command():

Expand Down Expand Up @@ -114,7 +115,6 @@ def _docker_build(build_dir, image_tag):
_logger.error('Build of image %s failed with this BUILD error:', image_tag)
msg = f"{e.msg}\n{''.join(l.get('stream') or '' for l in e.build_log)}"
return (False, msg)
_logger.info('Dockerfile %s finished build', image_tag)
return (True, None)


Expand Down Expand Up @@ -203,6 +203,13 @@ def _docker_stop(container_name, build_dir):
"""Stops the container named container_name"""
container_name = sanitize_container_name(container_name)
_logger.info('Stopping container %s', container_name)
if container_name in docker_stop_failures:
if docker_stop_failures[container_name] + 60 * 60 < time.time():
_logger.warning('Removing %s from docker_stop_failures', container_name)
del docker_stop_failures[container_name]
else:
_logger.warning('Skipping %s, is in failure', container_name)
return
docker_client = docker.from_env()
if build_dir:
end_file = os.path.join(build_dir, 'end-%s' % container_name)
Expand All @@ -212,11 +219,15 @@ def _docker_stop(container_name, build_dir):
try:
container = docker_client.containers.get(container_name)
container.stop(timeout=1)
return
except docker.errors.NotFound:
_logger.error('Cannnot stop container %s. Container not found', container_name)
except docker.errors.APIError as e:
_logger.error('Cannnot stop container %s. API Error "%s"', container_name, e)

docker_stop_failures[container_name] = time.time()


def docker_state(container_name, build_dir):
container_name = sanitize_container_name(container_name)
exist = os.path.exists(os.path.join(build_dir, 'exist-%s' % container_name))
Expand Down
353 changes: 178 additions & 175 deletions runbot/models/build.py

Large diffs are not rendered by default.

22 changes: 10 additions & 12 deletions runbot/models/build_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@ class ConfigStep(models.Model):
# python
python_code = fields.Text('Python code', tracking=True, default=PYTHON_DEFAULT)
python_result_code = fields.Text('Python code for result', tracking=True, default=PYTHON_DEFAULT)
ignore_triggered_result = fields.Boolean('Ignore error triggered in logs', tracking=True, default=False)
running_job = fields.Boolean('Job final state is running', default=False, help="Docker won't be killed if checked")
# create_build
create_config_ids = fields.Many2many('runbot.build.config', 'runbot_build_config_step_ids_create_config_ids_rel', string='New Build Configs', tracking=True, index=True)
Expand Down Expand Up @@ -275,14 +274,16 @@ def _run(self, build):
url = f"{log_url}/runbot/static/build/{build.dest}/logs/{self.name}.txt"
log_link = f'[@icon-file-text]({url})'
build._log('run', 'Starting step **%s** from config **%s** %s' % (self.name, build.params_id.config_id.name, log_link), log_type='markdown', level='SEPARATOR')
self._run_step(build, log_path)
return self._run_step(build, log_path)

def _run_step(self, build, log_path, **kwargs):
build.log_counter = self.env['ir.config_parameter'].sudo().get_param('runbot.runbot_maxlogs', 100)
run_method = getattr(self, '_run_%s' % self.job_type)
docker_params = run_method(build, log_path, **kwargs)
if docker_params:
build._docker_run(**docker_params)
def start_docker():
build._docker_run(**docker_params)
return start_docker

def _run_create_build(self, build, log_path):
count = 0
Expand Down Expand Up @@ -909,23 +910,20 @@ def _coverage_params(self, build, modules_to_install):
return ['--omit', ','.join(pattern_to_omit)]

def _make_results(self, build):
build_values = {}
log_time = self._get_log_last_write(build)
if log_time:
build_values['job_end'] = log_time
build.job_end = log_time
if self.job_type == 'python' and self.python_result_code and self.python_result_code != PYTHON_DEFAULT:
build_values.update(self._make_python_results(build))
build.write(self._make_python_results(build))
elif self.job_type in ['install_odoo', 'python']:
if self.coverage:
build_values.update(self._make_coverage_results(build))
build.write(self._make_coverage_results(build))
if self.test_enable or self.test_tags:
build_values.update(self._make_tests_results(build))
build.write(self._make_tests_results(build))
elif self.job_type == 'test_upgrade':
build_values.update(self._make_upgrade_results(build))
build.write(self._make_upgrade_results(build))
elif self.job_type == 'restore':
build_values.update(self._make_restore_results(build))

return build_values
build.write(self._make_restore_results(build))

def _make_python_results(self, build):
eval_ctx = self.make_python_ctx(build)
Expand Down
9 changes: 5 additions & 4 deletions runbot/models/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ def create(self, vals_list):
build_logs = logs_by_build_id[build.id]
for ir_log in build_logs:
ir_log['active_step_id'] = build.active_step.id
if ir_log['level'].upper() == 'WARNING':
build.triggered_result = 'warn'
elif ir_log['level'].upper() == 'ERROR':
build.triggered_result = 'ko'
if build.local_state != 'running':
if ir_log['level'].upper() == 'WARNING':
build.local_result = 'warn'
elif ir_log['level'].upper() == 'ERROR':
build.local_result = 'ko'
return super().create(vals_list)

def _markdown(self):
Expand Down
16 changes: 15 additions & 1 deletion runbot/models/host.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import getpass
import time

from collections import defaultdict

Expand Down Expand Up @@ -112,9 +113,11 @@ def _docker_build(self):
self.clear_caches() # needed to ensure that content is updated on all hosts
for dockerfile in self.env['runbot.dockerfile'].search([('to_build', '=', True)]):
self._docker_build_dockerfile(dockerfile, static_path)
_logger.info('Done...')

def _docker_build_dockerfile(self, dockerfile, workdir):
_logger.info('Building %s, %s', dockerfile.name, hash(str(dockerfile.dockerfile)))
start = time.time()
# _logger.info('Building %s, %s', dockerfile.name, hash(str(dockerfile.dockerfile)))
docker_build_path = os.path.join(workdir, 'docker', dockerfile.image_tag)
os.makedirs(docker_build_path, exist_ok=True)

Expand All @@ -137,6 +140,10 @@ def _docker_build_dockerfile(self, dockerfile, workdir):
dockerfile.to_build = False
dockerfile.message_post(body=f'Build failure:\n{msg}')
self.env['runbot.runbot'].warning(f'Dockerfile build "{dockerfile.image_tag}" failed on host {self.name}')
else:
duration = time.time() - start
if duration > 1:
_logger.info('Dockerfile %s finished build in %s', dockerfile.image_tag, duration)

def _get_work_path(self):
return os.path.abspath(os.path.join(os.path.dirname(__file__), '../static'))
Expand Down Expand Up @@ -244,3 +251,10 @@ def process_logs(self, build_ids=None):
logs_db_name = self.env['ir.config_parameter'].get_param('runbot.logdb_name')
with local_pg_cursor(logs_db_name) as local_cr:
local_cr.execute("DELETE FROM ir_logging WHERE id in %s", [tuple(local_log_ids)])

def get_build_domain(self, domain=None):
domain = domain or []
return [('host', '=', self.name)] + domain

def get_builds(self, domain, order=None):
return self.env['runbot.build'].search(self.get_build_domain(domain), order=order)
76 changes: 38 additions & 38 deletions runbot/models/runbot.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,78 +39,79 @@ def _root(self):
def _scheduler(self, host):
self._gc_testing(host)
self._commit()
for build in self._get_builds_with_requested_actions(host):
build = build.browse(build.id) # remove preftech ids, manage build one by one
processed = 0
for build in host.get_builds([('requested_action', 'in', ['wake_up', 'deathrow'])]):
processed += 1
build._process_requested_actions()
self._commit()
host.process_logs()
self._commit()
for build in self._get_builds_to_schedule(host):
#for build in host.get_builds([('global_state', 'not in', ('running', 'done'))]).sorted(lambda b: -b.id):
# # processed += 1 # not sure if we should update
# build = build.browse(build.id) # remove preftech ids, manage build one by one

# self._commit()
for build in host.get_builds([('local_state', 'in', ['testing', 'running'])]) | self._get_builds_to_init(host):
build = build.browse(build.id) # remove preftech ids, manage build one by one
build._schedule()
result = build._schedule()
if result:
processed += 1
self._commit()
self._assign_pending_builds(host, host.nb_worker, [('build_type', '!=', 'scheduled')])
if callable(result):
result() # start docker
processed += self._assign_pending_builds(host, host.nb_worker, [('build_type', '!=', 'scheduled')])
self._commit()
self._assign_pending_builds(host, host.nb_worker - 1 or host.nb_worker)
processed += self._assign_pending_builds(host, host.nb_worker - 1 or host.nb_worker)
self._commit()
self._assign_pending_builds(host, host.nb_worker and host.nb_worker + 1, [('build_type', '=', 'priority')])
processed += self._assign_pending_builds(host, host.nb_worker and host.nb_worker + 1, [('build_type', '=', 'priority')])
self._commit()
for build in self._get_builds_to_init(host):
build = build.browse(build.id) # remove preftech ids, manage build one by one
build._init_pendings(host)
self._commit()
self._gc_running(host)
self._commit()
self._reload_nginx()

def build_domain_host(self, host, domain=None):
domain = domain or []
return [('host', '=', host.name)] + domain

def _get_builds_with_requested_actions(self, host):
return self.env['runbot.build'].search(self.build_domain_host(host, [('requested_action', 'in', ['wake_up', 'deathrow'])]))

def _get_builds_to_schedule(self, host):
return self.env['runbot.build'].search(self.build_domain_host(host, [('local_state', 'in', ['testing', 'running'])]))
self._commit()
return processed

def _assign_pending_builds(self, host, nb_worker, domain=None):
if host.assigned_only or nb_worker <= 0:
return
domain_host = self.build_domain_host(host)
reserved_slots = self.env['runbot.build'].search_count(domain_host + [('local_state', 'in', ('testing', 'pending'))])
return 0
reserved_slots = len(host.get_builds([('local_state', 'in', ('testing', 'pending'))]))
assignable_slots = (nb_worker - reserved_slots)
if assignable_slots > 0:
allocated = self._allocate_builds(host, assignable_slots, domain)
if allocated:
_logger.info('Builds %s where allocated to runbot', allocated)
return len(allocated)
return 0

def _get_builds_to_init(self, host):
domain_host = self.build_domain_host(host)
used_slots = self.env['runbot.build'].search_count(domain_host + [('local_state', '=', 'testing')])
domain_host = host.get_build_domain()
used_slots = len(host.get_builds([('local_state', '=', 'testing')]))
available_slots = host.nb_worker - used_slots
if available_slots <= 0:
return self.env['runbot.build']
return self.env['runbot.build'].search(domain_host + [('local_state', '=', 'pending')], limit=available_slots)
build_to_init = self.env['runbot.build']
if available_slots > 0:
build_to_init |= self.env['runbot.build'].search(domain_host + [('local_state', '=', 'pending')], limit=available_slots)
if available_slots + 1 > 0:
build_to_init |= self.env['runbot.build'].search(domain_host + [('local_state', '=', 'pending'), ('build_type', '=', 'priority')], limit=1)
return build_to_init

def _gc_running(self, host):
running_max = host.get_running_max()
domain_host = self.build_domain_host(host)
cannot_be_killed_ids = host.get_builds([('keep_running', '=', True)]).ids
Build = self.env['runbot.build']
cannot_be_killed_ids = Build.search(domain_host + [('keep_running', '=', True)]).ids
sticky_bundles = self.env['runbot.bundle'].search([('sticky', '=', True), ('project_id.keep_sticky_running', '=', True)])
cannot_be_killed_ids += [
build.id
for build in sticky_bundles.mapped('last_batchs.slot_ids.build_id')
if build.host == host.name
][:running_max]
build_ids = Build.search(domain_host + [('local_state', '=', 'running'), ('id', 'not in', cannot_be_killed_ids)], order='job_start desc').ids
build_ids = host.get_builds([('local_state', '=', 'running'), ('id', 'not in', cannot_be_killed_ids)], order='job_start desc').ids
Build.browse(build_ids)[running_max:]._kill()

def _gc_testing(self, host):
"""garbage collect builds that could be killed"""
# decide if we need room
Build = self.env['runbot.build']
domain_host = self.build_domain_host(host)
domain_host = host.get_build_domain()
testing_builds = Build.search(domain_host + [('local_state', 'in', ['testing', 'pending']), ('requested_action', '!=', 'deathrow')])
used_slots = len(testing_builds)
available_slots = host.nb_worker - used_slots
Expand Down Expand Up @@ -277,14 +278,13 @@ def _fetch_loop_turn(self, host, pull_info_failures, default_sleep=1):
_logger.warning('Removing %s from pull_info_failures', pr_number)
del pull_info_failures[pr_number]


return manager.get('sleep', default_sleep)

def _scheduler_loop_turn(self, host, default_sleep=5):
_logger.info('Scheduling...')
def _scheduler_loop_turn(self, host, sleep=5):
with self.manage_host_exception(host) as manager:
self._scheduler(host)
return manager.get('sleep', default_sleep)
if self._scheduler(host):
sleep = 0.1
return manager.get('sleep', sleep)

@contextmanager
def manage_host_exception(self, host):
Expand Down
6 changes: 5 additions & 1 deletion runbot/tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ def setUp(self):
self.start_patcher('docker_stop', 'odoo.addons.runbot.container._docker_stop')
self.start_patcher('docker_get_gateway_ip', 'odoo.addons.runbot.models.build_config.docker_get_gateway_ip', None)

self.start_patcher('cr_commit', 'odoo.sql_db.Cursor.commit', None)
self.start_patcher('repo_commit', 'odoo.addons.runbot.models.runbot.Runbot._commit', None)
self.start_patcher('_local_cleanup_patcher', 'odoo.addons.runbot.models.build.BuildResult._local_cleanup')
self.start_patcher('_local_pg_dropdb_patcher', 'odoo.addons.runbot.models.build.BuildResult._local_pg_dropdb')
Expand All @@ -194,6 +193,11 @@ def setUp(self):

self.start_patcher('_get_py_version', 'odoo.addons.runbot.models.build.BuildResult._get_py_version', 3)

def no_commit(*_args, **_kwargs):
_logger.info('Skipping commit')

self.patch(self.env.cr, 'commit', no_commit)


def start_patcher(self, patcher_name, patcher_path, return_value=DEFAULT, side_effect=DEFAULT, new=DEFAULT):

Expand Down
Loading

0 comments on commit 96ca0ff

Please sign in to comment.