Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

修复审批流创建时的若干bug #2400

Merged
merged 1 commit into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions common/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ def set(self, key, value):
obj, created = Config.objects.update_or_create(
item=key, defaults={"value": db_value}
)
if created:
self.sys_config.update({key: value})
self.sys_config.update({key: value})

def replace(self, configs):
result = {"status": 0, "msg": "ok", "data": []}
Expand Down
8 changes: 8 additions & 0 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
SqlWorkflowContent,
QueryPrivilegesApply,
ArchiveConfig,
InstanceTag,
)
from common.config import SysConfig
from sql.utils.workflow_audit import AuditV2, AuditSetting
Expand Down Expand Up @@ -140,3 +141,10 @@ def fake_generate_audit_setting(mocker: MockFixture):
)
mock_generate_audit_setting.return_value = fake_audit_setting
yield mock_generate_audit_setting


@pytest.fixture
def instance_tag(db):
tag = InstanceTag.objects.create(tag_code="test_tag", tag_name="测试标签")
yield tag
tag.delete()
44 changes: 0 additions & 44 deletions sql/utils/sql_review.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,50 +10,6 @@
from sql.utils.sql_utils import remove_comments


def is_auto_review(workflow_id):
"""
判断SQL上线是否无需审批,无需审批的提交会自动审核通过
:param workflow_id:
:return:
"""

workflow = SqlWorkflow.objects.get(id=workflow_id)
auto_review_tags = SysConfig().get("auto_review_tag", "").split(",")
auto_review_db_type = SysConfig().get("auto_review_db_type", "").split(",")
# TODO 这里也可以放到engine中实现,但是配置项可能会相对复杂
if (
workflow.instance.db_type in auto_review_db_type
and workflow.instance.instance_tag.filter(
tag_code__in=auto_review_tags
).exists()
):
# 获取正则表达式
auto_review_regex = SysConfig().get(
"auto_review_regex", "^alter|^create|^drop|^truncate|^rename|^delete"
)
p = re.compile(auto_review_regex, re.I)

# 判断是否匹配到需要手动审核的语句
auto_review = True
all_affected_rows = 0
review_content = workflow.sqlworkflowcontent.review_content
for review_row in json.loads(review_content):
review_result = ReviewResult(**review_row)
# 去除SQL注释 https://github.com/hhyo/Archery/issues/949
sql = remove_comments(review_result.sql).replace("\n", "").replace("\r", "")
# 正则匹配
if p.match(sql):
auto_review = False
break
# 影响行数加测, 总语句影响行数超过指定数量则需要人工审核
all_affected_rows += int(review_result.affected_rows)
if all_affected_rows > int(SysConfig().get("auto_review_max_update_rows", 50)):
auto_review = False
else:
auto_review = False
return auto_review


def can_execute(user, workflow_id):
"""
判断用户当前是否可执行,两种情况下用户有执行权限
Expand Down
51 changes: 48 additions & 3 deletions sql/utils/test_workflow_audit.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import datetime
import json
from unittest.mock import patch

import pytest
Expand Down Expand Up @@ -468,10 +469,11 @@ def test_generate_audit_setting_auto_review(
):
sql_workflow, _ = sql_workflow
setup_sys_config.set("auto_review", True)
mock_is_auto_review = mocker.patch(
"sql.utils.workflow_audit.is_auto_review", return_value=True
)

audit = AuditV2(workflow=sql_workflow, sys_config=setup_sys_config)
mock_is_auto_review = mocker.patch.object(
audit, "is_auto_review", return_value=True
)
audit_setting = audit.generate_audit_setting()
assert audit_setting.auto_pass is True
mock_is_auto_review.assert_called()
Expand All @@ -497,3 +499,46 @@ def test_get_workflow(
audit_init_with_audit = AuditV2(audit=a.audit)
assert audit_init_with_audit.workflow_type == a.workflow_type
assert audit_init_with_audit.workflow == a.workflow


def test_auto_review_non_sql_review(sql_query_apply):
"""当前自动审核仅对 SQL 上线工单生效"""
audit = AuditV2(workflow=sql_query_apply)
assert audit.is_auto_review() is False


def test_auto_review_not_applicable(
db_instance, sql_workflow, instance_tag, setup_sys_config
):
"""未启用, 实例类型不匹配, 实例无对应标签, 正则匹配, 行数超规模"""
sql_workflow, _ = sql_workflow
# 未启用
setup_sys_config.set("auto_review", False)
audit = AuditV2(workflow=sql_workflow, sys_config=setup_sys_config)
assert audit.is_auto_review() is False
setup_sys_config.set("auto_review", True)
# 实例类型不匹配
db_instance.db_type = "redis"
db_instance.save()
audit.sys_config.set("auto_review_db_type", "mysql")
assert audit.is_auto_review() is False
audit.sys_config.set("auto_review_db_type", "redis")
# 实例无对应标签
audit.sys_config.set("auto_review_tag", instance_tag.tag_code)
assert audit.is_auto_review() is False
db_instance.instance_tag.add(instance_tag)
# 匹配到高危语句
audit.sys_config.set("auto_review_regex", "^drop")
audit.workflow.sqlworkflowcontent.sql_content = "drop table"
audit.workflow.sqlworkflowcontent.review_content = json.dumps(
[{"sql": "drop table", "affected_rows": 10}]
)
audit.workflow.sqlworkflowcontent.save()
assert audit.is_auto_review() is False
audit.sys_config.set("auto_review_regex", "^select")
# 行数超规模
audit.sys_config.set("auto_review_max_update_rows", 1)
assert audit.is_auto_review() is False
audit.sys_config.set("auto_review_max_update_rows", 1000)
# 全部条件满足, 自动审核通过
assert audit.is_auto_review() is True
184 changes: 0 additions & 184 deletions sql/utils/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
)
from sql.utils.resource_group import user_groups, user_instances, auth_group_users
from sql.utils.sql_review import (
is_auto_review,
can_execute,
can_timingtask,
can_cancel,
Expand Down Expand Up @@ -434,189 +433,6 @@ def tearDown(self):
self.master.delete()
self.sys_config.replace(json.dumps({}))

@patch("sql.engines.get_engine")
def test_auto_review_hit_review_regex(
self,
_get_engine,
):
"""
测试自动审批通过的判定条件,命中判断正则
:return:
"""
# 开启自动审批设置
self.sys_config.set("auto_review", "true")
self.sys_config.set("auto_review_db_type", "mysql")
self.sys_config.set("auto_review_regex", "^drop") # drop语句需要审批
self.sys_config.set("auto_review_max_update_rows", "50") # update影响行数大于50需要审批
self.sys_config.get_all_config()
# 修改工单为drop
self.wfc1.sql_content = "drop table users;"
self.wfc1.save(update_fields=("sql_content",))
r = is_auto_review(self.wfc1.workflow_id)
self.assertFalse(r)

@patch("sql.engines.mysql.MysqlEngine.execute_check")
@patch("sql.engines.get_engine")
def test_auto_review_gt_max_update_rows(self, _get_engine, _execute_check):
"""
测试自动审批通过的判定条件,影响行数大于auto_review_max_update_rows
:return:
"""
# 开启自动审批设置
self.sys_config.set("auto_review", "true")
self.sys_config.set("auto_review_db_type", "mysql")
self.sys_config.set("auto_review_regex", "^drop") # drop语句需要审批
self.sys_config.set("auto_review_max_update_rows", "2") # update影响行数大于2需要审批
self.sys_config.get_all_config()
# 修改工单为update
self.wfc1.sql_content = "update table users set email='';"
self.wfc1.save(update_fields=("sql_content",))
# mock返回值,update影响行数=3
_execute_check.return_value.to_dict.return_value = [
{
"id": 1,
"stage": "CHECKED",
"errlevel": 0,
"stagestatus": "Audit completed",
"errormessage": "None",
"sql": "use archer_test",
"affected_rows": 0,
"sequence": "'0_0_0'",
"backup_dbname": "None",
"execute_time": "0",
"sqlsha1": "",
"actual_affected_rows": "null",
},
{
"id": 2,
"stage": "CHECKED",
"errlevel": 0,
"stagestatus": "Audit completed",
"errormessage": "None",
"sql": "update table users set email=''",
"affected_rows": 3,
"sequence": "'0_0_1'",
"backup_dbname": "mysql_3306_archer_test",
"execute_time": "0",
"sqlsha1": "",
"actual_affected_rows": "null",
},
]
r = is_auto_review(self.wfc1.workflow_id)
self.assertFalse(r)

@patch("sql.engines.get_engine")
def test_auto_review_true(self, _get_engine):
"""
测试自动审批通过的判定条件,
:return:
"""
# 开启自动审批设置
self.sys_config.set("auto_review", "true")
self.sys_config.set("auto_review_db_type", "mysql")
self.sys_config.set("auto_review_regex", "^drop") # drop语句需要审批
self.sys_config.set("auto_review_max_update_rows", "2") # update影响行数大于2需要审批
self.sys_config.set("auto_review_tag", "GA") # 仅GA开启自动审批
self.sys_config.get_all_config()
# 修改工单为update,mock返回值,update影响行数=3
self.wfc1.sql_content = "update table users set email='';"
self.wfc1.review_content = json.dumps(
[
{
"id": 1,
"stage": "CHECKED",
"errlevel": 0,
"stagestatus": "Audit completed",
"errormessage": "None",
"sql": "use archer_test",
"affected_rows": 0,
"sequence": "'0_0_0'",
"backup_dbname": "None",
"execute_time": "0",
"sqlsha1": "",
"actual_affected_rows": "null",
},
{
"id": 2,
"stage": "CHECKED",
"errlevel": 0,
"stagestatus": "Audit completed",
"errormessage": "None",
"sql": "update table users set email=''",
"affected_rows": 1,
"sequence": "'0_0_1'",
"backup_dbname": "mysql_3306_archer_test",
"execute_time": "0",
"sqlsha1": "",
"actual_affected_rows": "null",
},
]
)
self.wfc1.save(update_fields=("sql_content", "review_content"))
# 修改工单实例标签
tag, is_created = InstanceTag.objects.get_or_create(
tag_code="GA", defaults={"tag_name": "生产环境", "active": True}
)
self.wf1.instance.instance_tag.add(tag)
r = is_auto_review(self.wfc1.workflow_id)
self.assertTrue(r)

@patch("sql.engines.get_engine")
def test_auto_review_false(self, _get_engine):
"""
测试自动审批通过的判定条件,
:return:
"""
# 开启自动审批设置
self.sys_config.set("auto_review", "true")
self.sys_config.set("auto_review_db_type", "") # 未配置auto_review_db_type需要审批
self.sys_config.set("auto_review_regex", "^drop") # drop语句需要审批
self.sys_config.set("auto_review_max_update_rows", "2") # update影响行数大于2需要审批
self.sys_config.set("auto_review_tag", "GA") # 仅GA开启自动审批
self.sys_config.get_all_config()
# 修改工单为update,mock返回值,update影响行数=3
self.wfc1.sql_content = "update table users set email='';"
self.wfc1.review_content = json.dumps(
[
{
"id": 1,
"stage": "CHECKED",
"errlevel": 0,
"stagestatus": "Audit completed",
"errormessage": "None",
"sql": "use archer_test",
"affected_rows": 0,
"sequence": "'0_0_0'",
"backup_dbname": "None",
"execute_time": "0",
"sqlsha1": "",
"actual_affected_rows": "null",
},
{
"id": 2,
"stage": "CHECKED",
"errlevel": 0,
"stagestatus": "Audit completed",
"errormessage": "None",
"sql": "update table users set email=''",
"affected_rows": 1,
"sequence": "'0_0_1'",
"backup_dbname": "mysql_3306_archer_test",
"execute_time": "0",
"sqlsha1": "",
"actual_affected_rows": "null",
},
]
)
self.wfc1.save(update_fields=("sql_content", "review_content"))
# 修改工单实例标签
tag, is_created = InstanceTag.objects.get_or_create(
tag_code="GA", defaults={"tag_name": "生产环境", "active": True}
)
self.wf1.instance.instance_tag.add(tag)
r = is_auto_review(self.wfc1.workflow_id)
self.assertFalse(r)

def test_can_execute_for_resource_group(
self,
):
Expand Down
Loading
Loading