Skip to content

Commit

Permalink
[IMP] runbot: global state computation
Browse files Browse the repository at this point in the history
One of the most problematic concurrency issues is when multiple children
try to write the state/result on the parent concurrently.

Multiple partial solutions are possible here:
- avoid writing if the state doesn't change
- avoid writing on a build belonging to another host

A possibly overkill solution would be to add a message queue for a host,
signaling that the state of one of the children changed.

An intermediate solution would be to let the host check the state of the
children while there are some of them, and update the local build state
asynchronously itself.

We can actually use the 'waiting' global state to know whether we need to
continue to check the build state and result.

While a build is not done or running, we need to check all children's
result and state in case they were updated.

One corner case is when rebuilding a child: a new child is added
but the parent may no longer be in the 'waiting' global state.
If this is the case, we need to recursively change the state of the
parents to waiting so that they will update again.
  • Loading branch information
Xavier-Do committed Mar 17, 2023
1 parent e7c63d2 commit b18d187
Show file tree
Hide file tree
Showing 23 changed files with 616 additions and 263 deletions.
3 changes: 3 additions & 0 deletions runbot/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,9 @@ def _docker_stop(container_name, build_dir):
docker_stop_failures[container_name] = time.time()


docker_stop_failures[container_name] = time.time()


def docker_state(container_name, build_dir):
container_name = sanitize_container_name(container_name)
exist = os.path.exists(os.path.join(build_dir, 'exist-%s' % container_name))
Expand Down
2 changes: 1 addition & 1 deletion runbot/models/branch.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class Branch(models.Model):
pr_title = fields.Char('Pr Title')
pr_body = fields.Char('Pr Body')
pr_author = fields.Char('Pr Author')

pull_head_name = fields.Char(compute='_compute_branch_infos', string='PR HEAD name', readonly=1, store=True)
pull_head_remote_id = fields.Many2one('runbot.remote', 'Pull head repository', compute='_compute_branch_infos', store=True, index=True)
target_branch_name = fields.Char(compute='_compute_branch_infos', string='PR target branch', store=True)
Expand Down
188 changes: 121 additions & 67 deletions runbot/models/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,18 +107,25 @@ def _compute_commit_ids(self):
for params in self:
params.commit_ids = params.commit_link_ids.commit_id

def create(self, values):
params = self.new(values)
match = self._find_existing(params.fingerprint)
if match:
return match
values = self._convert_to_write(params._cache)
return super().create(values)
@api.model_create_multi
def create(self, values_list):
records = self.browse()
for values in values_list:
params = self.new(values)
record = self._find_existing(params.fingerprint)
if record:
records |= record
else:
values = self._convert_to_write(params._cache)
records |= super().create(values)
return records

def _find_existing(self, fingerprint):
    """Look up a stored params record by its fingerprint.

    Searches the 'runbot.build.params' model for a record whose
    ``fingerprint`` field equals the given value and returns the
    search result, limited to one record.
    """
    params_model = self.env['runbot.build.params']
    domain = [('fingerprint', '=', fingerprint)]
    return params_model.search(domain, limit=1)

def write(self, vals):
    """Reject modification of params records.

    While the registry is not loaded the call is a silent no-op
    (returns None); once it is loaded a UserError is raised. ``vals``
    is never applied.
    """
    if self.env.registry.loaded:
        raise UserError('Params cannot be modified')


Expand Down Expand Up @@ -156,15 +163,16 @@ class BuildResult(models.Model):
trigger_id = fields.Many2one('runbot.trigger', related='params_id.trigger_id', store=True, index=True)

# state machine
global_state = fields.Selection(make_selection(state_order), string='Status', compute='_compute_global_state', store=True, recursive=True)
global_state = fields.Selection(make_selection(state_order), string='Status', default='pending', required=True)
local_state = fields.Selection(make_selection(state_order), string='Build Status', default='pending', required=True, index=True)
global_result = fields.Selection(make_selection(result_order), string='Result', compute='_compute_global_result', store=True, recursive=True)
local_result = fields.Selection(make_selection(result_order), string='Build Result')
global_result = fields.Selection(make_selection(result_order), string='Result', default='ok')
local_result = fields.Selection(make_selection(result_order), string='Build Result', default='ok')
triggered_result = fields.Selection(make_selection(result_order), string='Triggered Result') # triggered by db only

requested_action = fields.Selection([('wake_up', 'To wake up'), ('deathrow', 'To kill')], string='Action requested', index=True)
# web infos
host = fields.Char('Host')
host = fields.Char('Host name')
host_id = fields.Many2one('runbot.host', string="Host", compute='_compute_host_id')
keep_host = fields.Boolean('Keep host on rebuild and for children')

port = fields.Integer('Port')
Expand Down Expand Up @@ -208,8 +216,8 @@ class BuildResult(models.Model):

parent_id = fields.Many2one('runbot.build', 'Parent Build', index=True)
parent_path = fields.Char('Parent path', index=True)
top_parent = fields.Many2one('runbot.build', compute='_compute_top_parent')
ancestors = fields.Many2many('runbot.build', compute='_compute_ancestors')
top_parent = fields.Many2one('runbot.build', compute='_compute_top_parent')
ancestors = fields.Many2many('runbot.build', compute='_compute_ancestors')
# should we add a has children stored boolean?
children_ids = fields.One2many('runbot.build', 'parent_id')

Expand All @@ -226,6 +234,7 @@ class BuildResult(models.Model):
killable = fields.Boolean('Killable')

database_ids = fields.One2many('runbot.database', 'build_id')
commit_export_ids = fields.One2many('runbot.commit.export', 'build_id')

static_run = fields.Char('Static run URL')

Expand All @@ -234,26 +243,18 @@ def _compute_display_name(self):
for build in self:
build.display_name = build.description or build.config_id.name

@api.depends('host')
def _compute_host_id(self):
    """Set ``host_id`` from the host name stored in ``host``.

    The value is whatever ``runbot.host._get_host`` returns for the
    build's ``host`` field.
    """
    host_model = self.env['runbot.host']
    for build in self:
        build.host_id = host_model._get_host(build.host)

@api.depends('params_id.config_id')
def _compute_log_list(self):
    """Fill ``log_list`` with the comma-joined names of the steps having a log.

    The names are collected from the steps returned by the params config's
    ``step_ids()`` for which ``_has_log()`` is truthy; duplicates are
    removed by going through a set. The field is stored because it is
    accessed through the repo view, and to keep track of the list as it
    was at create time.
    """
    # TODO replace logic, add log file to list when executed (avoid 404, link log on docker start, avoid fake is_docker_step)
    for build in self:
        config = build.params_id.config_id
        step_names = {step.name for step in config.step_ids() if step._has_log()}
        build.log_list = ','.join(step_names)

@api.depends('children_ids.global_state', 'local_state')
def _compute_global_state(self):
    """Derive ``global_state`` from the local state and the children states.

    The global state is the local state, except when the local state
    scores above 'waiting', the build has non-orphan children, and the
    youngest state among those children does not score above 'waiting':
    in that case the global state is 'waiting'.
    """
    for build in self:
        waiting_score = build._get_state_score('waiting')
        counted_children = [kid for kid in build.children_ids if not kid.orphan_result]
        locally_past_waiting = build._get_state_score(build.local_state) > waiting_score
        must_wait = False
        if locally_past_waiting and counted_children:
            youngest = build._get_youngest_state([kid.global_state for kid in counted_children])
            must_wait = not build._get_state_score(youngest) > waiting_score
        if must_wait:
            build.global_state = 'waiting'
        else:
            build.global_state = build.local_state

@api.depends('gc_delay', 'job_end')
def _compute_gc_date(self):
icp = self.env['ir.config_parameter'].sudo()
Expand Down Expand Up @@ -285,22 +286,6 @@ def _get_youngest_state(self, states):
def _get_state_score(self, result):
    """Return the position of the given state name in ``state_order``."""
    return state_order.index(result)

@api.depends('children_ids.global_result', 'local_result', 'children_ids.orphan_result')
def _compute_global_result(self):
    """Derive ``global_result`` from the local result and the children results.

    - A local result scoring at least as high as 'ko' is used as is.
    - Otherwise, without non-orphan children, the local result is used.
    - Otherwise the worst child global result (computed with
      ``max_res='ko'``) is combined with the local result when there is
      one, or used alone when there is none.
    """
    for build in self:
        own_result = build.local_result
        if own_result and build._get_result_score(own_result) >= build._get_result_score('ko'):
            build.global_result = own_result
            continue

        counted_children = [kid for kid in build.children_ids if not kid.orphan_result]
        if not counted_children:
            build.global_result = own_result
            continue

        worst_child = build._get_worst_result(
            [kid.global_result for kid in counted_children], max_res='ko')
        if own_result:
            build.global_result = build._get_worst_result([own_result, worst_child])
        else:
            build.global_result = worst_child

def _get_worst_result(self, results, max_res=False):
results = [result for result in results if result] # filter Falsy values
index = max([self._get_result_score(result) for result in results]) if results else 0
Expand All @@ -326,20 +311,50 @@ def copy_data(self, default=None):
})
return [values]

@api.model_create_multi
def create(self, vals_list):
    """Create builds, seeding the global values from the local ones.

    For each values dict, a provided ``local_state`` is copied to
    ``global_state`` and a provided ``local_result`` to ``global_result``
    (the dicts are modified in place). After creation, the parents of the
    new records, if any, get ``_update_globals()`` called on them.
    """
    local_to_global = (
        ('local_state', 'global_state'),
        ('local_result', 'global_result'),
    )
    for vals in vals_list:
        for local_key, global_key in local_to_global:
            if local_key in vals:
                vals[global_key] = vals[local_key]
    builds = super().create(vals_list)
    parents = builds.parent_id
    if parents:
        parents._update_globals()
    return builds

def write(self, values):
# some validation to ensure db consistency
if 'local_state' in values:
if values['local_state'] == 'done':
self.env['runbot.commit.export'].search([('build_id', 'in', self.ids)]).unlink()
local_result = values.get('local_result')
for build in self:
if local_result and local_result != self._get_worst_result([build.local_result, local_result]): # dont write ok on a warn/error build
if len(self) == 1:
values.pop('local_result')
else:
raise ValidationError('Local result cannot be set to a less critical level')
res = super(BuildResult, self).write(values)
return res
self.filtered(lambda b: b.local_state != 'done').commit_export_ids.unlink()

# don't update if the value doesn't change to avoid triggering concurrent updates
def minimal_update(records, field_name):
updated = self.browse()
if field_name in values:
field_result = values.pop(field_name)
updated = records.filtered(lambda b: (b[field_name] != field_result))
if updated:
super(BuildResult, updated).write({field_name: field_result})
return updated

# local result is a special case since we don't only want to avoid an update if the value didn't change, but also if the value is less than the previous one
# example: don't write 'ok' if result is 'ko' or 'warn'
updated = self.browse()
if 'local_result' in values:
to_update = self.filtered(lambda b: (self._get_result_score(values['local_result']) > self._get_result_score(b.local_result)))
updated = minimal_update(to_update, 'local_result')
updated |= minimal_update(self, 'local_state')
updated._update_globals()
parents_to_update = minimal_update(self, 'global_result').parent_id
parents_to_update |= minimal_update(self, 'global_state').parent_id
parents_to_update |= minimal_update(self, 'orphan_result').parent_id
parents_to_update._notify_global_update()

if values:
super().write(values)
return True

def _add_child(self, param_values, orphan=False, description=False, additionnal_commit_links=False):

Expand Down Expand Up @@ -373,11 +388,6 @@ def result_multi(self):
return 'warning'
return 'ko' # ?

def update_build_end(self):
    """Set ``build_end`` to ``now()`` on each build and propagate upward.

    The parent is updated recursively only when it exists and its local
    state is 'running' or 'done'.
    """
    propagating_states = ('running', 'done')
    for build in self:
        build.build_end = now()
        parent = build.parent_id
        if parent and parent.local_state in propagating_states:
            parent.update_build_end()

@api.depends('params_id.version_id.name')
def _compute_dest(self):
Expand Down Expand Up @@ -451,8 +461,6 @@ def _rebuild(self, message=None):
self.orphan_result = True

new_build = self.create(values)
if self.parent_id:
new_build._github_status()
user = request.env.user if request else self.env.user
new_build._log('rebuild', 'Rebuild initiated by %s%s' % (user.name, (' :%s' % message) if message else ''))

Expand Down Expand Up @@ -616,12 +624,15 @@ def _process_requested_actions(self):
continue

if build.requested_action == 'wake_up':
if docker_state(build._get_docker_name(), build._path()) == 'RUNNING':
build.write({'requested_action': False, 'local_state': 'running'})
build._log('wake_up', 'Waking up failed, **docker is already running**', log_type='markdown', level='SEPARATOR')
if build.local_state != 'done':
build.requested_action = False
build._log('wake_up', 'Impossible to wake-up, build is not done', log_type='markdown', level='SEPARATOR')
elif not os.path.exists(build._path()):
build.write({'requested_action': False, 'local_state': 'done'})
build.requested_action = False
build._log('wake_up', 'Impossible to wake-up, **build dir does not exists anymore**', log_type='markdown', level='SEPARATOR')
elif docker_state(build._get_docker_name(), build._path()) == 'RUNNING':
build.write({'requested_action': False, 'local_state': 'running'})
build._log('wake_up', 'Waking up failed, **docker is already running**', log_type='markdown', level='SEPARATOR')
else:
try:
log_path = build._path('logs', 'wake_up.txt')
Expand All @@ -641,14 +652,56 @@ def _process_requested_actions(self):
run_step = step_ids[-1]
else:
run_step = self.env.ref('runbot.runbot_build_config_step_run')
run_step._run_step(build, log_path, force=True)
run_step._run_step(build, log_path, force=True)()
# reload_nginx will be triggered by _run_run_odoo
except Exception:
_logger.exception('Failed to wake up build %s', build.dest)
build._log('_schedule', 'Failed waking up build', level='ERROR')
build.write({'requested_action': False, 'local_state': 'done'})
continue

def _notify_global_update(self):
    """Request a refresh of the global values of each build.

    A build with a ``host_id`` gets a 'runbot.host.message' record created
    for that host, with the message 'global_updated'. A build without a
    ``host_id`` is refreshed immediately through ``_update_globals()``.
    """
    for build in self:
        if build.host_id:
            self.env['runbot.host.message'].create({
                'host_id': build.host_id.id,
                'build_id': build.id,
                'message': 'global_updated',
            })
            continue
        build._update_globals()

def _update_globals(self):
    """Recompute ``global_result`` and ``global_state`` of each build.

    Both values are derived from the build's own local values and from its
    children, ignoring children flagged ``orphan_result``. A field is only
    assigned when the computed value differs from the current one.
    When the global state moves into 'done' or 'running' from any other
    state, ``build_end`` is set and a missing local result becomes 'ok'.
    ``_github_status()`` is called on builds without a parent, both when
    the global result changes and when the build ends.
    """
    for record in self:
        # orphan children take no part in the parent's result and state
        children = record.children_ids.filtered(lambda child: not child.orphan_result)
        global_result = record.local_result
        if children:
            # worst child result (computed with max_res='ko'), then combined with the local result
            child_result = record._get_worst_result(children.mapped('global_result'), max_res='ko')
            global_result = record._get_worst_result([record.local_result, child_result])
        # assign only on change
        if global_result != record.global_result:
            record.global_result = global_result
            if not record.parent_id:
                record._github_status()  # failfast

        # remember the state before the update to detect the transition to done/running below
        init_state = record.global_state
        # a child is still "testing" while its global state is neither 'running' nor 'done'
        testing_children = any(child.global_state not in ('running', 'done') for child in children)
        global_state = record.local_state
        if testing_children:
            # unfinished children count as 'waiting' when picking the youngest state
            child_state = 'waiting'
            global_state = record._get_youngest_state([record.local_state, child_state])

        if global_state != record.global_state:
            record.global_state = global_state

        # True only when this call moved the build into done/running
        ending_build = init_state not in ('done', 'running') and record.global_state in ('done', 'running')

        if ending_build:
            if not record.local_result:  # Set 'ok' result if no result set (no tests job on build)
                record.local_result = 'ok'
            record.build_end = now()
            if not record.parent_id:
                record._github_status()

def _schedule(self):
"""schedule the build"""
icp = self.env['ir.config_parameter'].sudo()
Expand Down Expand Up @@ -734,6 +787,7 @@ def _schedule(self):
break
build.active_step = new_step.id
build.local_state = new_step._step_state()

return build._run_job()

def _run_job(self):
Expand Down Expand Up @@ -926,7 +980,6 @@ def _kill(self, result=None):
v['local_result'] = result
build.write(v)
self.env.cr.commit()
build._github_status()
self.invalidate_cache()

def _ask_kill(self, lock=True, message=None):
Expand Down Expand Up @@ -1024,8 +1077,9 @@ def _cmd(self, python_params=None, py_version=None, local_only=True, sub_command
elif grep(config_path, "--xmlrpc-interface"):
command.add_config_tuple("xmlrpc_interface", "127.0.0.1")

log_db = self.env['ir.config_parameter'].get_param('runbot.logdb_name')
if grep(config_path, "log-db"):
command.add_config_tuple("log_db", "runbot_logs")
command.add_config_tuple("log_db", log_db)
if grep(config_path, 'log-db-level'):
command.add_config_tuple("log_db_level", '25')

Expand Down
1 change: 0 additions & 1 deletion runbot/models/build_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@ class ConfigStep(models.Model):
# python
python_code = fields.Text('Python code', tracking=True, default=PYTHON_DEFAULT)
python_result_code = fields.Text('Python code for result', tracking=True, default=PYTHON_DEFAULT)
ignore_triggered_result = fields.Boolean('Ignore error triggered in logs', tracking=True, default=False)
running_job = fields.Boolean('Job final state is running', default=False, help="Docker won't be killed if checked")
# create_build
create_config_ids = fields.Many2many('runbot.build.config', 'runbot_build_config_step_ids_create_config_ids_rel', string='New Build Configs', tracking=True, index=True)
Expand Down
1 change: 0 additions & 1 deletion runbot/models/bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ class Bundle(models.Model):
# extra_info
for_next_freeze = fields.Boolean('Should be in next freeze')


@api.depends('name')
def _compute_host_id(self):
assigned_only = None
Expand Down
9 changes: 5 additions & 4 deletions runbot/models/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ def create(self, vals_list):
build_logs = logs_by_build_id[build.id]
for ir_log in build_logs:
ir_log['active_step_id'] = build.active_step.id
if ir_log['level'].upper() == 'WARNING':
build.triggered_result = 'warn'
elif ir_log['level'].upper() == 'ERROR':
build.triggered_result = 'ko'
if build.local_state != 'running':
if ir_log['level'].upper() == 'WARNING':
build.local_result = 'warn'
elif ir_log['level'].upper() == 'ERROR':
build.local_result = 'ko'
return super().create(vals_list)

def _markdown(self):
Expand Down
Loading

0 comments on commit b18d187

Please sign in to comment.