Skip to content

Commit

Permalink
[IMP] runbot: global state computation
Browse files Browse the repository at this point in the history
One of the most problematic concurrency issue is when multiple children
tries to write the state/result on the parent concurrently.

Multiple partial solutions are possible here:
- avoid to write if the state doesn't changes
- avoid to write on a build belonging to another host

A maybe overkill solution would be to add a message queue for an host,
signaling that one of the child state changed.

An intermediate solution would be to let the host check the state of the
children while there are some of them, and update the local build state
assynchronously himself.

We can actualy use the 'waiting' global state to now if we need to
continue to check the build state and result.

While a build is not done or running, we need to check all children
result and state on case they were updated.

One corner case is when rebuilding a child: a new child is added
but the parent is maybe not in the 'waiting' global state anymore.
If this is the case, we need to recusivelly change the state of the
parents to waiting so that they will update again.
  • Loading branch information
Xavier-Do committed Dec 1, 2022
1 parent 65d203b commit c790750
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 82 deletions.
110 changes: 48 additions & 62 deletions runbot/models/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,11 @@ def _compute_commit_ids(self):

def create(self, values):
params = self.new(values)
match = self._find_existing(params.fingerprint)
if match:
return match
values = self._convert_to_write(params._cache)
return super().create(values)
record = self._find_existing(params.fingerprint)
if not record:
values = self._convert_to_write(params._cache)
record = super().create(values)
return record

def _find_existing(self, fingerprint):
return self.env['runbot.build.params'].search([('fingerprint', '=', fingerprint)], limit=1)
Expand Down Expand Up @@ -156,9 +156,9 @@ class BuildResult(models.Model):
trigger_id = fields.Many2one('runbot.trigger', related='params_id.trigger_id', store=True, index=True)

# state machine
global_state = fields.Selection(make_selection(state_order), string='Status', compute='_compute_global_state', store=True, recursive=True)
global_state = fields.Selection(make_selection(state_order), string='Status', default='pending', required=True)
local_state = fields.Selection(make_selection(state_order), string='Build Status', default='pending', required=True, index=True)
global_result = fields.Selection(make_selection(result_order), string='Result', compute='_compute_global_result', store=True, recursive=True)
global_result = fields.Selection(make_selection(result_order), string='Result')
local_result = fields.Selection(make_selection(result_order), string='Build Result')
triggered_result = fields.Selection(make_selection(result_order), string='Triggered Result') # triggered by db only

Expand Down Expand Up @@ -239,20 +239,6 @@ def _compute_log_list(self): # storing this field because it will be access trh
build.log_list = ','.join({step.name for step in build.params_id.config_id.step_ids() if step._has_log()})
# TODO replace logic, add log file to list when executed (avoid 404, link log on docker start, avoid fake is_docker_step)

#@api.depends('children_ids.global_state', 'local_state')
#def _compute_global_state(self):
# for record in self:
# waiting_score = record._get_state_score('waiting')
# children_ids = [child for child in record.children_ids if not child.orphan_result]
# if record._get_state_score(record.local_state) > waiting_score and children_ids: # if finish, check children
# children_state = record._get_youngest_state([child.global_state for child in children_ids])
# if record._get_state_score(children_state) > waiting_score:
# record.global_state = record.local_state
# else:
# record.global_state = 'waiting'
# else:
# record.global_state = record.local_state

@api.depends('gc_delay', 'job_end')
def _compute_gc_date(self):
icp = self.env['ir.config_parameter'].sudo()
Expand Down Expand Up @@ -284,22 +270,6 @@ def _get_youngest_state(self, states):
def _get_state_score(self, result):
return state_order.index(result)

@api.depends('children_ids.global_result', 'local_result', 'children_ids.orphan_result')
def _compute_global_result(self):
for record in self:
if record.local_result and record._get_result_score(record.local_result) >= record._get_result_score('ko'):
record.global_result = record.local_result
else:
children_ids = [child for child in record.children_ids if not child.orphan_result]
if children_ids:
children_result = record._get_worst_result([child.global_result for child in children_ids], max_res='ko')
if record.local_result:
record.global_result = record._get_worst_result([record.local_result, children_result])
else:
record.global_result = children_result
else:
record.global_result = record.local_result

def _get_worst_result(self, results, max_res=False):
results = [result for result in results if result] # filter Falsy values
index = max([self._get_result_score(result) for result in results]) if results else 0
Expand All @@ -310,6 +280,11 @@ def _get_worst_result(self, results, max_res=False):
def _get_result_score(self, result):
return result_order.index(result)

def _set_result(self, result):
for record in self:
if self._get_result_score(record.result) < self._get_result_score(result):
record.local_result = result

@api.depends('active_step')
def _compute_job(self):
for build in self:
Expand Down Expand Up @@ -367,11 +342,6 @@ def result_multi(self):
return 'warning'
return 'ko' # ?

#def update_build_end(self):
# for build in self:
# build.build_end = now()
# if build.parent_id and build.parent_id.local_state in ('running', 'done'):
# build.parent_id.update_build_end()

@api.depends('params_id.version_id.name')
def _compute_dest(self):
Expand Down Expand Up @@ -438,15 +408,14 @@ def _rebuild(self, message=None):
values['host'] = self.host
values['keep_host'] = True
if self.parent_id:
self._check_parent_state()
values.update({
'parent_id': self.parent_id.id,
'description': self.description,
})
self.orphan_result = True

new_build = self.create(values)
if self.parent_id:
new_build._github_status()
user = request.env.user if request else self.env.user
new_build._log('rebuild', 'Rebuild initiated by %s%s' % (user.name, (' :%s' % message) if message else ''))

Expand All @@ -463,6 +432,12 @@ def _rebuild(self, message=None):
slot.active = False
return new_build

def _check_parent_state(self):
if self.parent_id and self.parent_id.global_state in ('running', 'done'):
# when adding a multi
self.parent_id.global_state = 'waiting'
self.parent_id.check_parent_state()

def _skip(self, reason=None):
"""Mark builds ids as skipped"""
if reason:
Expand Down Expand Up @@ -661,21 +636,43 @@ def _process_requested_actions(self):

def _update_globals(self):
for record in self:
if record.global_state == 'waiting':
testing_children_ids = record.children_ids.filtered(lambda child: not child.orphan_result and child.global_state not in ('running', 'done'))
if not testing_children_ids:
record.global_state = record.local_state
init_state = record.global_state
children = record.children_ids.filtered(lambda child: not child.orphan_result)
global_result = record.local_result
if children:
child_result = record._get_worst_result(children.mapped('global_result'), max_res='ko')
global_result = record._get_worst_result([record.local_result, child_result])
if global_result != record.global_result:
record.global_result = global_result
if not record.parent_id:
record._github_status() # failfast

testing_children = any(child.global_state not in ('running', 'done') for child in children)
global_state = record.local_state
if testing_children:
child_state = 'waiting'
global_state = record._get_youngest_state([record.local_state, child_state])
if global_state != record.global_state:
record.global_state = global_state

ending_build = init_state not in ('done', 'running') and record.global_state in ('done', 'running')

if ending_build:
if not record.local_result: # Set 'ok' result if no result set (no tests job on build)
record.local_result = 'ok'
record.build_end = now()
if not record.parent_id:
record._github_status()

def _schedule(self):
"""schedule the build"""
icp = self.env['ir.config_parameter'].sudo()
hosts_by_name = {h.name: h for h in self.env['runbot.host'].search([('name', 'in', self.mapped('host'))])}
hosts_by_build = {b.id: hosts_by_name[b.host] for b in self}

for build in self:
build._update_globals()
if build.local_state not in ['testing', 'running']:
return
build._update_globals()
if build.local_state in ('waiting', 'done'):
continue
if build.local_state == 'testing':
Expand All @@ -684,7 +681,6 @@ def _schedule(self):
worst_result = self._get_worst_result([build.triggered_result, build.local_result])
if worst_result != build.local_result:
build.local_result = build.triggered_result
build._github_status() # failfast
# check if current job is finished
_docker_state = docker_state(build._get_docker_name(), build._path())
if _docker_state == 'RUNNING':
Expand Down Expand Up @@ -730,17 +726,8 @@ def _schedule(self):

build_values.update(build._next_job_values()) # find next active_step or set to done


ending_build = build.local_state not in ('done', 'running') and build_values.get('local_state') in ('done', 'running')
if ending_build:
build.update_build_end()

build.write(build_values)
if ending_build:
if not build.local_result: # Set 'ok' result if no result set (no tests job on build)
build.local_result = 'ok'
build._logger("No result set, setting ok by default")
build._github_status()

build._run_job()


Expand Down Expand Up @@ -934,7 +921,6 @@ def _kill(self, result=None):
v['local_result'] = result
build.write(v)
self.env.cr.commit()
build._github_status()
self.invalidate_cache()

def _ask_kill(self, lock=True, message=None):
Expand Down
9 changes: 5 additions & 4 deletions runbot/models/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ def create(self, vals_list):
for build in builds:
build_logs = logs_by_build_id[build.id]
for ir_log in build_logs:
if ir_log['level'].upper() == 'WARNING':
build.triggered_result = 'warn'
elif ir_log['level'].upper() == 'ERROR':
build.triggered_result = 'ko'
if not build.active_step.ignore_triggered_result:
if ir_log['level'].upper() == 'WARNING':
build._set_result('warn')
elif ir_log['level'].upper() == 'ERROR':
build._set_result('ko')
return super().create(vals_list)

def _markdown(self):
Expand Down
7 changes: 7 additions & 0 deletions runbot/models/host.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,3 +241,10 @@ def process_logs(self, build_ids=None):
logs_db_name = self.env['ir.config_parameter'].get_param('runbot.logdb_name')
with local_pg_cursor(logs_db_name) as local_cr:
local_cr.execute("DELETE FROM ir_logging WHERE id in %s", [tuple(local_log_ids)])

def get_build_domain(self, domain=None):
domain = domain or []
return [('host', '=', self.name)] + domain

def get_builds(self, domain):
self.env['runbot.build'].search(self.get_build_domain(domain))
25 changes: 9 additions & 16 deletions runbot/models/runbot.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,15 @@ def _root(self):
def _scheduler(self, host):
self._gc_testing(host)
self._commit()
for build in self._get_builds_with_requested_actions(host):
for build in host.get_builds([('requested_action', 'in', ['wake_up', 'deathrow'])]):
build._process_requested_actions()
self._commit()
host.process_logs()
self._commit()
for build in self._get_builds_to_schedule(host):
for build in host.get_builds([('global_state', 'not in', ('running', 'done'))]).sorted(lambda b: -b.id):
build._update_globals()
self._commit()
for build in host.get_builds([('local_state', 'in', ['testing', 'running'])]):
build._schedule()
self._commit()
self._assign_pending_builds(host, host.nb_worker, [('build_type', '!=', 'scheduled')])
Expand All @@ -58,20 +61,10 @@ def _scheduler(self, host):
self._commit()
self._reload_nginx()

def build_domain_host(self, host, domain=None):
domain = domain or []
return [('host', '=', host.name)] + domain

def _get_builds_with_requested_actions(self, host):
return self.env['runbot.build'].search(self.build_domain_host(host, [('requested_action', 'in', ['wake_up', 'deathrow'])]))

def _get_builds_to_schedule(self, host):
return self.env['runbot.build'].search(self.build_domain_host(host, [('local_state', 'in', ['testing', 'running'])]))

def _assign_pending_builds(self, host, nb_worker, domain=None):
if host.assigned_only or nb_worker <= 0:
return
domain_host = self.build_domain_host(host)
domain_host = host.build_domain()
reserved_slots = self.env['runbot.build'].search_count(domain_host + [('local_state', 'in', ('testing', 'pending'))])
assignable_slots = (nb_worker - reserved_slots)
if assignable_slots > 0:
Expand All @@ -81,7 +74,7 @@ def _assign_pending_builds(self, host, nb_worker, domain=None):
_logger.info('Builds %s where allocated to runbot', allocated)

def _get_builds_to_init(self, host):
domain_host = self.build_domain_host(host)
domain_host = ost.build_domain()
used_slots = self.env['runbot.build'].search_count(domain_host + [('local_state', '=', 'testing')])
available_slots = host.nb_worker - used_slots
if available_slots <= 0:
Expand All @@ -90,7 +83,7 @@ def _get_builds_to_init(self, host):

def _gc_running(self, host):
running_max = host.get_running_max()
domain_host = self.build_domain_host(host)
domain_host = ost.build_domain()
Build = self.env['runbot.build']
cannot_be_killed_ids = Build.search(domain_host + [('keep_running', '=', True)]).ids
sticky_bundles = self.env['runbot.bundle'].search([('sticky', '=', True), ('project_id.keep_sticky_running', '=', True)])
Expand All @@ -106,7 +99,7 @@ def _gc_testing(self, host):
"""garbage collect builds that could be killed"""
# decide if we need room
Build = self.env['runbot.build']
domain_host = self.build_domain_host(host)
domain_host = host.build_domain()
testing_builds = Build.search(domain_host + [('local_state', 'in', ['testing', 'pending']), ('requested_action', '!=', 'deathrow')])
used_slots = len(testing_builds)
available_slots = host.nb_worker - used_slots
Expand Down
4 changes: 4 additions & 0 deletions runbot/tests/test_build.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,8 @@ def test_children(self):
'parent_id': build1_1.id,
})

builds = build1_1_1 | build1_1_2 | build1_1 | build1_2 | build1

def assert_state(global_state, build):
self.assertEqual(build.global_state, global_state)

Expand All @@ -362,6 +364,8 @@ def assert_state(global_state, build):
build1.local_state = 'done'
build1_1.local_state = 'done'

builds._update_globals()

assert_state('waiting', build1)
assert_state('waiting', build1_1)
assert_state('pending', build1_2)
Expand Down

0 comments on commit c790750

Please sign in to comment.