Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMP] runbot: avoid concurrent update when updating build status #692

Open
wants to merge 2 commits into
base: 16.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 82 additions & 60 deletions runbot/models/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,9 @@ class BuildResult(models.Model):
trigger_id = fields.Many2one('runbot.trigger', related='params_id.trigger_id', store=True, index=True)

# state machine
global_state = fields.Selection(make_selection(state_order), string='Status', compute='_compute_global_state', store=True, recursive=True)
global_state = fields.Selection(make_selection(state_order), string='Status', default='pending', required=True)
local_state = fields.Selection(make_selection(state_order), string='Build Status', default='pending', required=True, index=True)
global_result = fields.Selection(make_selection(result_order), string='Result', compute='_compute_global_result', store=True, recursive=True)
global_result = fields.Selection(make_selection(result_order), string='Result', default='ok')
local_result = fields.Selection(make_selection(result_order), string='Build Result', default='ok')

requested_action = fields.Selection([('wake_up', 'To wake up'), ('deathrow', 'To kill')], string='Action requested', index=True)
Expand Down Expand Up @@ -254,20 +254,6 @@ def _compute_log_list(self): # storing this field because it will be access trh
build.log_list = ','.join({step.name for step in build.params_id.config_id.step_ids() if step._has_log()})
# TODO replace logic, add log file to list when executed (avoid 404, link log on docker start, avoid fake is_docker_step)

@api.depends('children_ids.global_state', 'local_state')
def _compute_global_state(self):
for record in self:
waiting_score = record._get_state_score('waiting')
children_ids = [child for child in record.children_ids if not child.orphan_result]
if record._get_state_score(record.local_state) > waiting_score and children_ids: # if finish, check children
children_state = record._get_youngest_state([child.global_state for child in children_ids])
if record._get_state_score(children_state) > waiting_score:
record.global_state = record.local_state
else:
record.global_state = 'waiting'
else:
record.global_state = record.local_state

@api.depends('gc_delay', 'job_end')
def _compute_gc_date(self):
icp = self.env['ir.config_parameter'].sudo()
Expand Down Expand Up @@ -299,22 +285,6 @@ def _get_youngest_state(self, states):
def _get_state_score(self, result):
return state_order.index(result)

@api.depends('children_ids.global_result', 'local_result', 'children_ids.orphan_result')
def _compute_global_result(self):
for record in self:
if record.local_result and record._get_result_score(record.local_result) >= record._get_result_score('ko'):
record.global_result = record.local_result
else:
children_ids = [child for child in record.children_ids if not child.orphan_result]
if children_ids:
children_result = record._get_worst_result([child.global_result for child in children_ids], max_res='ko')
if record.local_result:
record.global_result = record._get_worst_result([record.local_result, children_result])
else:
record.global_result = children_result
else:
record.global_result = record.local_result

def _get_worst_result(self, results, max_res=False):
results = [result for result in results if result] # filter Falsy values
index = max([self._get_result_score(result) for result in results]) if results else 0
Expand All @@ -340,37 +310,50 @@ def copy_data(self, default=None):
})
return [values]

@api.model_create_multi
def create(self, vals_list):
for values in vals_list:
if 'local_state' in values:
values['global_state'] = values['local_state']
if 'local_result' in values:
values['global_result'] = values['local_result']
records = super().create(vals_list)
if records.parent_id:
records.parent_id._update_globals()
return records

def write(self, values):
# some validation to ensure db consistency
if 'local_state' in values:
if values['local_state'] == 'done':
self.filtered(lambda b: b.local_state != 'done').commit_export_ids.unlink()

local_result = values.get('local_result')
for build in self:
if local_result and local_result != self._get_worst_result([build.local_result, local_result]): # dont write ok on a warn/error build
if len(self) == 1:
values.pop('local_result')
else:
raise ValidationError('Local result cannot be set to a less critical level')
init_global_results = self.mapped('global_result')
init_global_states = self.mapped('global_state')
init_local_states = self.mapped('local_state')

res = super(BuildResult, self).write(values)
for init_global_result, build in zip(init_global_results, self):
if init_global_result != build.global_result:
build._github_status()

for init_local_state, build in zip(init_local_states, self):
if init_local_state not in ('done', 'running') and build.local_state in ('done', 'running'):
build.build_end = now()

for init_global_state, build in zip(init_global_states, self):
if init_global_state not in ('done', 'running') and build.global_state in ('done', 'running'):
build._github_status()

return res
# don't update if the value doesn't change to avoid triggering concurrent updates
def minimal_update(records, field_name):
updated = self.browse()
if field_name in values:
field_result = values.pop(field_name)
updated = records.filtered(lambda b: (b[field_name] != field_result))
if updated:
super(BuildResult, updated).write({field_name: field_result})
return updated

# local result is a special case since we don't only want to avoid an update if the value didn't change, but also if the value is less than the previous one
# example: don't write 'ok' if result is 'ko' or 'warn'
updated = self.browse()
if 'local_result' in values:
to_update = self.filtered(lambda b: (self._get_result_score(values['local_result']) > self._get_result_score(b.local_result)))
updated = minimal_update(to_update, 'local_result')
updated |= minimal_update(self, 'local_state')
updated._update_globals()
parents_to_update = minimal_update(self, 'global_result').parent_id
parents_to_update |= minimal_update(self, 'global_state').parent_id
parents_to_update |= minimal_update(self, 'orphan_result').parent_id
parents_to_update._notify_global_update()

if values:
super().write(values)
return True

def _add_child(self, param_values, orphan=False, description=False, additionnal_commit_links=False):

Expand Down Expand Up @@ -476,8 +459,6 @@ def _rebuild(self, message=None):
self.orphan_result = True

new_build = self.create(values)
if self.parent_id:
new_build._github_status()
user = request.env.user if request else self.env.user
new_build._log('rebuild', 'Rebuild initiated by %s%s' % (user.name, (' :%s' % message) if message else ''))

Expand Down Expand Up @@ -676,6 +657,48 @@ def _process_requested_actions(self):
build.write({'requested_action': False, 'local_state': 'done'})
continue

def _notify_global_update(self):
for record in self:
if not record.host_id:
record._update_globals()
else:
self.env['runbot.host.message'].create({
'host_id': record.host_id.id,
'build_id': record.id,
'message': 'global_updated',
})

def _update_globals(self):
for record in self:
children = record.children_ids.filtered(lambda child: not child.orphan_result)
global_result = record.local_result
if children:
child_result = record._get_worst_result(children.mapped('global_result'), max_res='ko')
global_result = record._get_worst_result([record.local_result, child_result])
if global_result != record.global_result:
record.global_result = global_result
if not record.parent_id:
record._github_status() # failfast

init_state = record.global_state
testing_children = any(child.global_state not in ('running', 'done') for child in children)
global_state = record.local_state
if testing_children:
child_state = 'waiting'
global_state = record._get_youngest_state([record.local_state, child_state])

if global_state != record.global_state:
record.global_state = global_state

ending_build = init_state not in ('done', 'running') and record.global_state in ('done', 'running')

if ending_build:
if not record.local_result: # Set 'ok' result if no result set (no tests job on build)
record.local_result = 'ok'
record.build_end = now()
if not record.parent_id:
record._github_status()

def _schedule(self):
"""schedule the build"""
icp = self.env['ir.config_parameter'].sudo()
Expand Down Expand Up @@ -958,7 +981,6 @@ def _kill(self, result=None):
v['local_result'] = result
build.write(v)
self.env.cr.commit()
build._github_status()
self.invalidate_cache()

def _ask_kill(self, lock=True, message=None):
Expand Down
7 changes: 6 additions & 1 deletion runbot/models/host.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,12 @@ class MessageQueue(models.Model):

def _process(self):
records = self
# todo consume messages here
global_updates = records.filtered(lambda r: r.message == 'global_updated')
global_updates.build_id._update_globals()
records -= global_updates
# ask_kills = records.filtered(lambda r: r.message == 'ask_kill')
# ask_kills.build_id._ask_kill()
# records -= ask_kills
if records:
for record in records:
self.env['runbot.runbot'].warning(f'Host {record.host_id.name} got an unexpected message {record.message}')
Expand Down
1 change: 0 additions & 1 deletion runbot/models/runbot.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,6 @@ def _fetch_loop_turn(self, host, pull_info_failures, default_sleep=1):
_logger.warning('Removing %s from pull_info_failures', pr_number)
del pull_info_failures[pr_number]


return manager.get('sleep', default_sleep)

def _scheduler_loop_turn(self, host, sleep=5):
Expand Down
19 changes: 9 additions & 10 deletions runbot/tests/test_build.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,9 @@ def test_base_fields(self):

# test a bulk write, that one cannot change from 'ko' to 'ok'
builds = self.Build.browse([build.id, other.id])
with self.assertRaises(ValidationError):
builds.write({'local_result': 'warn'})
# self.assertEqual(build.local_result, 'warn')
# self.assertEqual(other.local_result, 'ko')
builds.write({'local_result': 'warn'})
self.assertEqual(build.local_result, 'warn')
self.assertEqual(other.local_result, 'ko')


def test_markdown_description(self):
Expand Down Expand Up @@ -385,16 +384,16 @@ def test_children(self):
with self.assertQueries([]): # no change should be triggered
build1_2.local_state = "testing"

# with self.assertQueries(['''UPDATE "runbot_build" SET "global_state"=%s,"local_state"=%s,"write_date"=%s,"write_uid"=%s WHERE id IN %s''']):
build1.local_state = 'done'
build1.flush()
with self.assertQueries(['''UPDATE "runbot_build" SET "global_state"=%s,"local_state"=%s,"write_date"=%s,"write_uid"=%s WHERE id IN %s''']):
build1.local_state = 'done'
build1.flush()

self.assertEqual('waiting', build1.global_state)
self.assertEqual('testing', build1_1.global_state)

# with self.assertQueries([]): # write the same value, no update should be triggered
build1.local_state = 'done'
build1.flush()
with self.assertQueries([]): # write the same value, no update should be triggered
build1.local_state = 'done'
build1.flush()

build1_1.local_state = 'done'

Expand Down
46 changes: 45 additions & 1 deletion runbot_populate/demo/runbot_demo.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@
<field name="repo_id" ref="repo_runbot"/>
</record>

<record id="version_16" model="runbot.version">
<field name="name">16.0</field>
</record>

<!-- BUNDLES -->
<record id="bundle_16_1" model="runbot.bundle">
<field name="name">saas-16.1</field>
Expand Down Expand Up @@ -146,14 +150,54 @@
<field name="ci_context"/>
</record>


<record id="runbot_build_config_linting" model="runbot.build.config">
<field name="name">Linting</field>
</record>
<record id="runbot_build_config_security" model="runbot.build.config">
<field name="name">Security</field>
</record>

<record id="runbot_build_config_split" model="runbot.build.config">
<field name="name">Split</field>
</record>

<record id="runbot_build_config_post_install" model="runbot.build.config">
<field name="name">Post install</field>
</record>

<record id="main_host" model="runbot.host">
<field name="name">main host</field>
</record>

<record id="build_base_params" model="runbot.build.params">
<field name="config_id" ref="runbot_build_config_split"/>
<field name="version_id" ref="version_16"/>
<field name="project_id" ref="project_runbot"/>
</record>

<record id="build_base" model="runbot.build">
<field name="params_id" ref="build_base_params"/>
<field name="host">main host</field>
</record>

<record id="build_child_params" model="runbot.build.params">
<field name="config_id" ref="runbot_build_config_post_install"/>
<field name="version_id" ref="version_16"/>
<field name="project_id" ref="project_runbot"/>
</record>

<record id="build_child1" model="runbot.build">
<field name="params_id" ref="build_child_params"/>
<field name="parent_id" ref="build_base"/>
<field name="host">main host</field>
</record>

<record id="build_child2" model="runbot.build">
<field name="params_id" ref="build_child_params"/>
<field name="parent_id" ref="build_base"/>
<field name="host">main host</field>
</record>

<function model="runbot.runbot" name="_create_demo_data">
</function>

Expand Down
1 change: 1 addition & 0 deletions runbot_populate/tests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from . import test_concurrency
55 changes: 55 additions & 0 deletions runbot_populate/tests/test_concurrency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from odoo import api, SUPERUSER_ID
from odoo.tests import TransactionCase
from unittest.mock import patch


class TestConcurrency(TransactionCase):
def test_local_status_update(self):
"""
This test ensures that a parent build global state will eventually be updated
even if updated concurrenctly in 2 different transactions, without transaction error
"""
with self.registry.cursor() as cr0:
env0 = api.Environment(cr0, SUPERUSER_ID, {})
host = env0.ref('runbot_populate.main_host')
host._process_messages() # ensure queue is empty
parent_build = env0.ref('runbot_populate.build_base')
build_child1 = env0.ref('runbot_populate.build_child1')
build_child2 = env0.ref('runbot_populate.build_child2')
parent_build.local_state = 'done'
self.assertEqual(host.host_message_ids.mapped('message'), [])
build_child1.local_state = 'testing'
build_child2.local_state = 'testing'
self.assertEqual(host.host_message_ids.mapped('message'), ['global_updated', 'global_updated'])
self.assertEqual(host.host_message_ids.build_id, parent_build)
host._process_messages()
self.assertEqual(parent_build.global_state, 'waiting')
env0.cr.commit() # youplahé

with self.registry.cursor() as cr1:
env1 = api.Environment(cr1, SUPERUSER_ID, {})
with self.registry.cursor() as cr2:
env2 = api.Environment(cr2, SUPERUSER_ID, {})
build_child_cr1 = env1['runbot.build'].browse(build_child1.id)
build_child_cr2 = env2['runbot.build'].browse(build_child2.id)
self.assertEqual(build_child_cr1.parent_id.global_state, 'waiting')
self.assertEqual(build_child_cr1.parent_id.children_ids.mapped('local_state'), ['testing', 'testing'])
self.assertEqual(build_child_cr2.parent_id.global_state, 'waiting')
self.assertEqual(build_child_cr2.parent_id.children_ids.mapped('local_state'), ['testing', 'testing'])
build_child_cr1.local_state = 'done'
build_child_cr2.local_state = 'done'
# from the point of view of each transaction, the other one local_state didn't changed
self.assertEqual(build_child_cr1.parent_id.children_ids.mapped('local_state'), ['testing', 'done'])
self.assertEqual(build_child_cr2.parent_id.global_state, 'waiting')
self.assertEqual(build_child_cr2.parent_id.children_ids.mapped('local_state'), ['done', 'testing'])
env1.cr.commit()
env2.cr.commit()
env0.cr.commit() # not usefull just to ensure we have efefct of other transactions
env0.cache.invalidate()
self.assertEqual(parent_build.children_ids.mapped('local_state'), ['done', 'done'])
self.assertEqual(parent_build.children_ids.mapped('global_state'), ['done', 'done'])
self.assertEqual(host.host_message_ids.mapped('message'), ['global_updated', 'global_updated'])
self.assertEqual(host.host_message_ids.build_id, parent_build)
# at this point, this assertion is true, but not expected : self.assertEqual(parent_build.global_state, 'waiting')
host._process_messages()
self.assertEqual(parent_build.global_state, 'done')