Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨增加任务扩展字段,记录任务运行信息 #704

Merged
merged 4 commits into from
Nov 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions server/projects/main/apps/job/apis/v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ def post(self, request, **kwargs):
raise PermissionDenied("您没有执行该操作的权限,该扫描方案已私有化,您不在该方案权限配置的关联分支项目权限成员列表中!!!")
slz = self.get_serializer(data=request.data)
if slz.is_valid(raise_exception=True):
logger.info("参数校验通过,开始初始化任务,参数如下:")
logger.info(json.dumps(slz.validated_data, indent=4))
logger.info("[Project: %s] 参数校验通过,开始初始化任务,参数如下:" % project.id)
logger.info(json.dumps(slz.validated_data))
try:
job_id, scan_id, task_infos = codeproj_core.create_local_scan(
project=project, creator=UserManager.get_username(request.user),
Expand Down Expand Up @@ -152,8 +152,7 @@ def post(self, request, **kwargs):
raise PermissionDenied("您没有执行该操作的权限,该扫描方案已私有化,您不在该方案权限配置的关联分支项目权限成员列表中!!!")
slz = self.get_serializer(data=request.data)
if slz.is_valid(raise_exception=True):
logger.info("参数校验通过,开始结束任务,参数如下:")
logger.info(json.dumps(request.data, indent=4))
logger.info("[Job: %s]参数校验通过,开始结束任务" % job.id)
try:
job_id, scan_id = codeproj_core.finish_job_from_client(
job, project, slz.validated_data, puppy_create=True)
Expand Down Expand Up @@ -215,7 +214,7 @@ def _get_task(self, request, node_id, occupy=False):
if killed_task:
return killed_task
# 查询并占用节点,如果节点忙碌,则返回空
if not core.NodeTaskRegisterManager.set_node_busy_state(node_id, **request.data):
if not core.NodeTaskRegisterManager.set_node_busy_state(node, **request.data):
return None

task, processes = core.NodeTaskRegisterManager.register_task(node, occupy)
Expand Down
122 changes: 81 additions & 41 deletions server/projects/main/apps/job/core/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

# 项目内 import
from apps.codeproj.models import CodeLintInfo, CodeMetricCCInfo, CodeMetricClocInfo, \
CodeMetricDupInfo, Project, Repository
CodeMetricDupInfo, Organization, Project, ProjectTeam, Repository
from apps.job import models
from apps.nodemgr.models import Node, NodeToolProcessRelation
from util import errcode
Expand All @@ -48,6 +48,27 @@ def check_job_tag_disabled(cls, tag):
else:
return False

@classmethod
def get_enable_nodes(cls, job, tag):
    """Return the nodes that are allowed to run the given job.

    A node qualifies when its manager is one of the following users, it
    carries ``tag`` in ``exec_tags`` and it is not disabled:

    * ADMIN or USER members of the job's repository,
    * superusers,
    * ADMIN members of the repository's project team (if any),
    * ADMIN members of the repository's organization (if any).

    :param job: job whose project/repository determines the permitted users
    :param tag: execution tag the node must carry
    :return: queryset of matching ``Node`` objects
    """
    repo = job.project.repo
    team = repo.project_team
    org = repo.organization

    # Collect every user who may lend a node to this repository.
    permitted = set(repo.get_members(Repository.PermissionEnum.ADMIN))
    permitted.update(repo.get_members(Repository.PermissionEnum.USER))
    permitted.update(User.objects.filter(is_superuser=True))
    if team:
        permitted.update(team.get_members(ProjectTeam.PermissionEnum.ADMIN))
    if org:
        permitted.update(org.get_members(Organization.PermissionEnum.ADMIN))

    tagged_nodes = Node.objects.filter(manager__in=permitted, exec_tags=tag)
    return tagged_nodes.exclude(enabled=Node.EnabledEnum.DISABLED)

@classmethod
def add_job_to_queue(cls, job):
"""根据分配规则,将job下相关的task和task_process分配到相应的node节点去:
Expand All @@ -62,21 +83,13 @@ def add_job_to_queue(cls, job):
if cls.check_job_tag_disabled(tag):
message = "当前项目配置的运行环境标签[%s]已停用,请在扫描方案中调整\"运行环境标签\"后重新启动。" % tag.name
logger.warning(message)
JobCloseHandler.revoke_job(job, errcode.E_CLIENT_CONFIG_ERROR, message)
JobCloseHandler.revoke_job(job, errcode.E_USER_CONFIG_NODE_ERROR, message)
return

logger.info("[Job: %s] start to add job to queue..." % job)
project = job.project
repo = project.repo

superusers = User.objects.filter(is_superuser=True)
users = set([user for user in repo.get_members(Repository.PermissionEnum.ADMIN)] +
[user for user in repo.get_members(Repository.PermissionEnum.USER)] +
[user for user in superusers])

logger.info("[Job: %s] start to add job to queue..." % job)
# 1. Node管理员需要对Project有访问权限
# 2. 标签对应
nodes = Node.objects.filter(manager__in=users, exec_tags=tag)
nodes = cls.get_enable_nodes(job, tag)
if not nodes:
logger.warning("找不到符合运行环境标签[%s]且有权限的节点分配" % tag.name if tag else "None")
queue_set = []
Expand Down Expand Up @@ -114,7 +127,7 @@ def add_job_to_queue(cls, job):
message = "当前项目配置的运行环境标签[%s]没有机器资源可以运行工具[%s],请在扫描方案中调整\"运行环境\"的标签后重新启动。" % (
tag.name, none_node_task_str)
logger.warning(message)
JobCloseHandler.revoke_job(job, errcode.E_CLIENT_CONFIG_ERROR, message)
JobCloseHandler.revoke_job(job, errcode.E_USER_CONFIG_NODE_ERROR, message)


class JobCloseHandler(object):
Expand Down Expand Up @@ -163,7 +176,7 @@ def reclose_job(cls, job_id):
logger.exception("[Job: %s] 更新扫描任务状态失败: %s" % (job_id, err))
return
models.Job.objects.filter(id=job_id).update(
state=models.Job.StateEnum.RUNNING, result_code=None)
state=models.Job.StateEnum.CLOSING, result_code=None, closing_time=timezone.now())
cls.close_scan(job_id, reset=True)

@classmethod
Expand Down Expand Up @@ -201,10 +214,10 @@ def after_job_closed(cls, job_id, result_code, result_msg, result_data=None, res
cls.CodeMetricDupResultInfoSerializer),
("code_metric_cloc_info", CodeMetricClocInfo,
cls.CodeMetricClocResultInfoSerializer), ]
for key, model, serializer in info_list:
for key, model_cls, serializer in info_list:
info_data = result_data.get(key) if result_data else None
if info_data:
instance, _ = model.objects.get_or_create(project=project)
instance, _ = model_cls.objects.get_or_create(project=project)
slz = serializer(instance=instance, data=info_data)
if slz.is_valid():
slz.save()
Expand All @@ -230,7 +243,7 @@ def revoke_job(cls, job, result_code, result_msg):
).update(state=models.Job.StateEnum.CLOSING)
if nrows == 0:
return

logger.info("[Job: %s] revoke job, result: [%s]%s" % (job.id, result_code, result_msg))
job = models.Job.objects.get(id=job_id)
revoke_time = now()
for task in job.task_set.exclude(state=models.Task.StateEnum.CLOSED):
Expand All @@ -248,6 +261,7 @@ def revoke_job(cls, job, result_code, result_msg):
if nrow == 1: # race condition
# put this task to killingtask table
if task.node:
logger.info("[Task: %s][Node: %s] revoke job, update node to free state" % (task.id, task.node))
node_id = task.node.id # codepuppy 未上线_kill_task可能会导致继续给节点派发任务
Node.objects.filter(id=node_id).update(state=Node.StateEnum.FREE)
models.KillingTask.objects.create(node=task.node, task=task)
Expand Down Expand Up @@ -359,14 +373,17 @@ def close_job(cls, job_id, reclose=False):
logger.info("[Job: %d] checking older job unclosed..." % job_id)
try:
older_jobs = models.Job.objects.filter(
project=job.project, id__lt=job.id).exclude(state=models.Job.StateEnum.CLOSED)
project=job.project, id__lt=job.id
).exclude(
state=models.Job.StateEnum.CLOSED
)
if older_jobs:
logger.info("[Job: %d] canceling %d older scan jobs..." %
(job_id, older_jobs.count()))
logger.info("[Job: %d] canceling %d older scan jobs..." % (job_id, older_jobs.count()))
try:
for j in older_jobs:
logger.info(
"[Job: %d] canceling older job[%d]..." % (job_id, j.id))
if not j.check_redirect():
continue
logger.info("[Job: %d] canceling older job[%d]..." % (job_id, j.id))
result_msg = json.dumps(
{"job_id": job.id, "scan_id": scan_id, "msg": "plz check the other job's result"})
JobCloseHandler.revoke_job(j, errcode.CLIENT_REDIRECT, result_msg)
Expand Down Expand Up @@ -436,12 +453,21 @@ def save_task_result(cls, task_id, task_version, result_code, result_msg, result
logger.info("[Task: %s] 进程[%s]关闭数量: %s" % (task_id, processes, nrow))
if nrow == 0:
return

tp_relations = models.TaskProcessRelation.objects.filter(task_id=task_id)
if tp_relations.filter(state=models.TaskProcessRelation.StateEnum.RUNNING):
running_processes = tp_relations.filter(state=models.TaskProcessRelation.StateEnum.RUNNING)
waiting_processes = tp_relations.exclude(state=models.TaskProcessRelation.StateEnum.CLOSED)
if running_processes.count() > 0 and nrow != tp_relations.count():
logger.info("[Task: %s] 当前还有%s个进程正在运行: %s" % (
task_id, running_processes.count(), running_processes.values_list("process__name", flat=True)))
task_state = models.Task.StateEnum.RUNNING
elif tp_relations.exclude(state=models.TaskProcessRelation.StateEnum.CLOSED):
elif waiting_processes.count() > 0 and nrow != tp_relations.count():
logger.info("[Task: %s] 当前还有%s个进程处于等待状态: %s" % (
task_id, waiting_processes.count(), waiting_processes.values_list("process__name", flat=True)))
task_state = models.Task.StateEnum.WAITING
else:
logger.info("[Task: %s] 当前%s个进程全部关闭: %s" % (
task_id, tp_relations.count(), tp_relations.values_list("process__name", flat=True)))
task_state = models.Task.StateEnum.CLOSED
nrow = models.Task.objects.filter(
id=task_id,
Expand Down Expand Up @@ -471,9 +497,6 @@ def save_task_result(cls, task_id, task_version, result_code, result_msg, result
node=task.node
)
logger.info("[Job: %s][Task: %s] 关闭Task: %s" % (job_id, task_id, nrow))
if task.node: # 私有进程执行,可能不会记录node节点
Node.objects.filter(id=task.node.id, state=Node.StateEnum.BUSY).update(
state=Node.StateEnum.FREE)

else:
# 不存在processes,使用旧逻辑
Expand All @@ -498,6 +521,24 @@ def save_task_result(cls, task_id, task_version, result_code, result_msg, result
Node.objects.filter(id=task.node.id, state=Node.StateEnum.BUSY).update(
state=Node.StateEnum.FREE)

@classmethod
def check_closing_job(cls, job):
    """Check a job that is currently in the CLOSING state.

    Calls the ``scan_check`` API through ``AnalyseClient`` for the job's
    project and scan. If the response does not report ``result`` as True,
    the job is re-closed via ``reclose_job``.

    :param job: job instance to check
    :return: True when the job is no longer CLOSING, the check request
        raised an exception, or the check passed; False when the check
        failed and a re-close was triggered
    """
    still_closing = models.Job.objects.filter(
        id=job.id, state=models.Job.StateEnum.CLOSING).exists()
    if not still_closing:
        return True

    try:
        check_result = AnalyseClient().api(
            "scan_check", path_params=(job.project_id, job.scan_id,), data=None)
    except Exception as err:
        # A failed request is logged and reported as True, so no re-close happens.
        logger.exception("[Project: %s][Job: %s] scan check error, err: %s" % (job.project_id, job.id, err))
        return True

    if check_result.get("result") is not True:
        logger.warning("[Project: %s][Job: %s] job closing check failed, reset job closing..." % (
            job.project_id, job.id))
        cls.reclose_job(job.id)
        return False
    return True


class NodeTaskRegisterManager(object):
"""节点任务注册
Expand Down Expand Up @@ -527,7 +568,7 @@ def update_task_state_with_running(cls, task_id, node_id):
"""
task = models.Task.objects.get(id=task_id)
node = Node.objects.get(id=node_id)
logger.info("[Task: %s] start running with node:%s" % (task_id, node.name))
logger.info("[Task: %s] start running with node:%s(%s)" % (task_id, node.name, node.state))
current_time = now()
# 将task的process置为运行中的状态
models.TaskProcessRelation.objects.filter(
Expand All @@ -546,8 +587,7 @@ def update_task_state_with_running(cls, task_id, node_id):
if task.job.state in [models.Job.StateEnum.WAITING, models.Job.StateEnum.INITED]:
nrows = models.Job.objects.filter(id=task.job.id, state__in=[
models.Job.StateEnum.WAITING, models.Job.StateEnum.INITED
]).update(
state=models.Job.StateEnum.RUNNING)
]).update(state=models.Job.StateEnum.RUNNING)
if nrows > 0:
models.Job.objects.filter(id=task.job_id, start_time__isnull=True).update(
start_time=current_time
Expand All @@ -557,7 +597,7 @@ def update_task_state_with_running(cls, task_id, node_id):
def release_node(cls, node_id):
"""更新节点状态,调整为空闲状态
"""
models.Node.objects.select_related().filter(id=node_id, state=models.Node.StateEnum.BUSY).update(
models.Node.objects.filter(id=node_id, state=models.Node.StateEnum.BUSY).update(
state=models.Node.StateEnum.FREE)

@classmethod
Expand All @@ -568,22 +608,22 @@ def clean_closed_task_process_node(cls, job):
models.TaskProcessNodeQueue.objects.filter(task__job=job, task__state=models.Task.StateEnum.CLOSED).delete()

@classmethod
def set_node_busy_state(cls, node_id, **kwargs):
def set_node_busy_state(cls, node, **kwargs):
"""设置节点为忙碌状态
"""
# 查询并占用节点,如果节点忙碌,则返回空
if kwargs.get("free") is True:
logger.info("[NodeID: %s] update node free state" % node_id)
models.Node.objects.filter(id=node_id).update(state=models.Node.StateEnum.FREE)
logger.info("[Node: %s] update node free state" % node)
models.Node.objects.filter(id=node.id).update(state=models.Node.StateEnum.FREE)

node = models.Node.objects.select_related().filter(id=node_id, state=models.Node.StateEnum.FREE).first()
logger.debug("[NodeID: %s] filter free node: %s" % (node_id, node))
if not node:
free_node = models.Node.everything.filter(id=node.id, state=models.Node.StateEnum.FREE).first()
logger.debug("[Node: %s] filter free node %s" % (node, node.id))
if not free_node:
return False
nrows = models.Node.everything.filter(
id=node.id, state=models.Node.StateEnum.FREE
id=free_node.id, state=models.Node.StateEnum.FREE
).update(state=models.Node.StateEnum.BUSY)
logger.info("[NodeID: %s] update free node:%s busy state" % (node_id, node))
logger.info("[Node: %s] update free node %s to busy state" % (free_node, free_node.id))
if nrows == 1: # Node已有任务在执行,并未put result
return True
else:
Expand Down Expand Up @@ -759,8 +799,8 @@ def register_task_v1(cls, node, occupy):
if task:
if occupy: # 将任务相关状态更新为运行中
cls.update_task_process_state_with_running(task, processes, node)
logger.debug("[Task: %s] task with processes(%s) got by node[%d]" % (
task.id, ",".join([process.name for process in processes]), node.id))
logger.debug("[Task: %s] task with processes(%s) got by node[%s]" % (
task.id, ",".join([process.name for process in processes]), node))
else:
if occupy:
# 取不到任务则重新释放节点
Expand Down
24 changes: 24 additions & 0 deletions server/projects/main/apps/job/migrations/0005_job_ext_field.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2021-2022 THL A29 Limited
# #
# This source code file is made available under MIT License
# See LICENSE for details
# ==============================================================================
# Generated by Django 3.1.14 on 2022-07-21 13:03

from django.db import migrations, models


class Migration(migrations.Migration):
    """Add the nullable JSON ``ext_field`` column to the ``job`` model."""

    dependencies = [("job", "0004_auto_20220729_1020")]

    operations = [
        migrations.AddField(
            model_name="job",
            name="ext_field",
            field=models.JSONField(verbose_name="扩展字段", null=True, blank=True),
        ),
    ]
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2021-2022 THL A29 Limited
# #
# This source code file is made available under MIT License
# See LICENSE for details
# ==============================================================================
# Generated by Django 3.1.14 on 2022-07-26 13:53

from django.db import migrations, models


class Migration(migrations.Migration):
    """Alter ``log_url`` on ``task`` and ``taskprocessrelation`` to a nullable TextField."""

    dependencies = [("job", "0005_job_ext_field")]

    # Both models receive the same field definition; order matches the
    # original migration (task first, then taskprocessrelation).
    operations = [
        migrations.AlterField(
            model_name=target_model,
            name="log_url",
            field=models.TextField(verbose_name="日志链接", null=True, blank=True),
        )
        for target_model in ("task", "taskprocessrelation")
    ]
39 changes: 39 additions & 0 deletions server/projects/main/apps/job/models/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class StateEnum(object):
async_flag = models.BooleanField(default=False, verbose_name="异步启动标识", blank=True, null=True)
client_flag = models.BooleanField(verbose_name="客户端创建标识", blank=True, null=True)
creator = models.CharField(max_length=32, verbose_name="启动人", blank=True, null=True)
ext_field = models.JSONField(verbose_name="扩展字段", null=True, blank=True)

class Meta:
abstract = True
Expand All @@ -107,6 +108,44 @@ class Meta:
def get_project_id(self):
raise NotImplementedError

def add_field(self, field_info):
    """Merge extra key/value pairs into ``ext_field`` and save the instance.

    Existing keys in ``ext_field`` are overwritten by the values in
    ``field_info``. When ``ext_field`` is empty, a copy of ``field_info``
    becomes the new ``ext_field``.

    :param field_info: dict of entries to record in the extension field
    """
    if self.ext_field:
        self.ext_field.update(field_info)
    else:
        # Store a copy: keeping a reference to the caller's dict would let a
        # later add_field() call silently mutate the caller's object.
        self.ext_field = dict(field_info) if field_info is not None else field_info
    self.save()

def disable_redirect(self):
    """Explicitly forbid redirecting this job by recording ``redirect=False``."""
    redirect_flag = dict(redirect=False)
    self.add_field(redirect_flag)

def check_redirect(self):
    """Tell whether this job may be redirected.

    Redirecting is allowed unless ``ext_field`` explicitly stores
    ``redirect`` as ``False``.

    :return: False only when redirect has been explicitly disabled, else True
    """
    ext = self.ext_field
    if not ext:
        return True
    return ext.get("redirect") is not False

def disable_reinit(self):
    """Explicitly forbid re-initializing this job by recording ``reinit=False``."""
    reinit_flag = dict(reinit=False)
    self.add_field(reinit_flag)

def check_reinit(self):
    """Tell whether this job may be re-initialized.

    Re-initialization is allowed unless ``ext_field`` explicitly stores
    ``reinit`` as ``False``.

    :return: False only when reinit has been explicitly disabled, else True
    """
    ext = self.ext_field
    if not ext:
        return True
    return ext.get("reinit") is not False

@property
def waiting_time(self):
"""等待时间
Expand Down
Loading