diff --git a/common/config.py b/common/config.py index e70b498d95..d3ad43f835 100644 --- a/common/config.py +++ b/common/config.py @@ -67,8 +67,7 @@ def set(self, key, value): obj, created = Config.objects.update_or_create( item=key, defaults={"value": db_value} ) - if created: - self.sys_config.update({key: value}) + self.sys_config.update({key: value}) def replace(self, configs): result = {"status": 0, "msg": "ok", "data": []} diff --git a/conftest.py b/conftest.py index 69ba86761a..aab8637ce6 100644 --- a/conftest.py +++ b/conftest.py @@ -11,6 +11,7 @@ SqlWorkflowContent, QueryPrivilegesApply, ArchiveConfig, + InstanceTag, ) from common.config import SysConfig from sql.utils.workflow_audit import AuditV2, AuditSetting @@ -140,3 +141,10 @@ def fake_generate_audit_setting(mocker: MockFixture): ) mock_generate_audit_setting.return_value = fake_audit_setting yield mock_generate_audit_setting + + +@pytest.fixture +def instance_tag(db): + tag = InstanceTag.objects.create(tag_code="test_tag", tag_name="测试标签") + yield tag + tag.delete() diff --git a/sql/utils/sql_review.py b/sql/utils/sql_review.py index 771747f30d..78296247f6 100644 --- a/sql/utils/sql_review.py +++ b/sql/utils/sql_review.py @@ -10,50 +10,6 @@ from sql.utils.sql_utils import remove_comments -def is_auto_review(workflow_id): - """ - 判断SQL上线是否无需审批,无需审批的提交会自动审核通过 - :param workflow_id: - :return: - """ - - workflow = SqlWorkflow.objects.get(id=workflow_id) - auto_review_tags = SysConfig().get("auto_review_tag", "").split(",") - auto_review_db_type = SysConfig().get("auto_review_db_type", "").split(",") - # TODO 这里也可以放到engine中实现,但是配置项可能会相对复杂 - if ( - workflow.instance.db_type in auto_review_db_type - and workflow.instance.instance_tag.filter( - tag_code__in=auto_review_tags - ).exists() - ): - # 获取正则表达式 - auto_review_regex = SysConfig().get( - "auto_review_regex", "^alter|^create|^drop|^truncate|^rename|^delete" - ) - p = re.compile(auto_review_regex, re.I) - - # 判断是否匹配到需要手动审核的语句 - auto_review = True - all_affected_rows = 0 - review_content = workflow.sqlworkflowcontent.review_content - for review_row in json.loads(review_content): - review_result = ReviewResult(**review_row) - # 去除SQL注释 https://github.com/hhyo/Archery/issues/949 - sql = remove_comments(review_result.sql).replace("\n", "").replace("\r", "") - # 正则匹配 - if p.match(sql): - auto_review = False - break - # 影响行数加测, 总语句影响行数超过指定数量则需要人工审核 - all_affected_rows += int(review_result.affected_rows) - if all_affected_rows > int(SysConfig().get("auto_review_max_update_rows", 50)): - auto_review = False - else: - auto_review = False - return auto_review - - def can_execute(user, workflow_id): """ 判断用户当前是否可执行,两种情况下用户有执行权限 diff --git a/sql/utils/test_workflow_audit.py b/sql/utils/test_workflow_audit.py index 22dc8e4929..da1286efb9 100644 --- a/sql/utils/test_workflow_audit.py +++ b/sql/utils/test_workflow_audit.py @@ -1,4 +1,5 @@ import datetime +import json from unittest.mock import patch import pytest @@ -468,10 +469,11 @@ def test_generate_audit_setting_auto_review( ): sql_workflow, _ = sql_workflow setup_sys_config.set("auto_review", True) - mock_is_auto_review = mocker.patch( - "sql.utils.workflow_audit.is_auto_review", return_value=True - ) + audit = AuditV2(workflow=sql_workflow, sys_config=setup_sys_config) + mock_is_auto_review = mocker.patch.object( + audit, "is_auto_review", return_value=True + ) audit_setting = audit.generate_audit_setting() assert audit_setting.auto_pass is True mock_is_auto_review.assert_called() @@ -497,3 +499,46 @@ def test_get_workflow( audit_init_with_audit = AuditV2(audit=a.audit) assert audit_init_with_audit.workflow_type == a.workflow_type assert audit_init_with_audit.workflow == a.workflow + + +def test_auto_review_non_sql_review(sql_query_apply): + """当前自动审核仅对 SQL 上线工单生效""" + audit = AuditV2(workflow=sql_query_apply) + assert audit.is_auto_review() is False + + +def test_auto_review_not_applicable( + db_instance, sql_workflow, instance_tag, setup_sys_config +): + """未启用, 实例类型不匹配, 实例无对应标签, 正则匹配, 行数超规模""" + sql_workflow, _ = sql_workflow + # 未启用 + setup_sys_config.set("auto_review", False) + audit = AuditV2(workflow=sql_workflow, sys_config=setup_sys_config) + assert audit.is_auto_review() is False + setup_sys_config.set("auto_review", True) + # 实例类型不匹配 + db_instance.db_type = "redis" + db_instance.save() + audit.sys_config.set("auto_review_db_type", "mysql") + assert audit.is_auto_review() is False + audit.sys_config.set("auto_review_db_type", "redis") + # 实例无对应标签 + audit.sys_config.set("auto_review_tag", instance_tag.tag_code) + assert audit.is_auto_review() is False + db_instance.instance_tag.add(instance_tag) + # 匹配到高危语句 + audit.sys_config.set("auto_review_regex", "^drop") + audit.workflow.sqlworkflowcontent.sql_content = "drop table" + audit.workflow.sqlworkflowcontent.review_content = json.dumps( + [{"sql": "drop table", "affected_rows": 10}] + ) + audit.workflow.sqlworkflowcontent.save() + assert audit.is_auto_review() is False + audit.sys_config.set("auto_review_regex", "^select") + # 行数超规模 + audit.sys_config.set("auto_review_max_update_rows", 1) + assert audit.is_auto_review() is False + audit.sys_config.set("auto_review_max_update_rows", 1000) + # 全部条件满足, 自动审核通过 + assert audit.is_auto_review() is True diff --git a/sql/utils/tests.py b/sql/utils/tests.py index 5e490dad18..378b931b25 100644 --- a/sql/utils/tests.py +++ b/sql/utils/tests.py @@ -30,7 +30,6 @@ ) from sql.utils.resource_group import user_groups, user_instances, auth_group_users from sql.utils.sql_review import ( - is_auto_review, can_execute, can_timingtask, can_cancel, @@ -434,189 +433,6 @@ def tearDown(self): self.master.delete() self.sys_config.replace(json.dumps({})) - @patch("sql.engines.get_engine") - def test_auto_review_hit_review_regex( - self, - _get_engine, - ): - """ - 测试自动审批通过的判定条件,命中判断正则 - :return: - """ - # 开启自动审批设置 - self.sys_config.set("auto_review", "true") - self.sys_config.set("auto_review_db_type", "mysql") - self.sys_config.set("auto_review_regex", "^drop") # drop语句需要审批 - self.sys_config.set("auto_review_max_update_rows", "50") # update影响行数大于50需要审批 - self.sys_config.get_all_config() - # 修改工单为drop - self.wfc1.sql_content = "drop table users;" - self.wfc1.save(update_fields=("sql_content",)) - r = is_auto_review(self.wfc1.workflow_id) - self.assertFalse(r) - - @patch("sql.engines.mysql.MysqlEngine.execute_check") - @patch("sql.engines.get_engine") - def test_auto_review_gt_max_update_rows(self, _get_engine, _execute_check): - """ - 测试自动审批通过的判定条件,影响行数大于auto_review_max_update_rows - :return: - """ - # 开启自动审批设置 - self.sys_config.set("auto_review", "true") - self.sys_config.set("auto_review_db_type", "mysql") - self.sys_config.set("auto_review_regex", "^drop") # drop语句需要审批 - self.sys_config.set("auto_review_max_update_rows", "2") # update影响行数大于2需要审批 - self.sys_config.get_all_config() - # 修改工单为update - self.wfc1.sql_content = "update table users set email='';" - self.wfc1.save(update_fields=("sql_content",)) - # mock返回值,update影响行数=3 - _execute_check.return_value.to_dict.return_value = [ - { - "id": 1, - "stage": "CHECKED", - "errlevel": 0, - "stagestatus": "Audit completed", - "errormessage": "None", - "sql": "use archer_test", - "affected_rows": 0, - "sequence": "'0_0_0'", - "backup_dbname": "None", - "execute_time": "0", - "sqlsha1": "", - "actual_affected_rows": "null", - }, - { - "id": 2, - "stage": "CHECKED", - "errlevel": 0, - "stagestatus": "Audit completed", - "errormessage": "None", - "sql": "update table users set email=''", - "affected_rows": 3, - "sequence": "'0_0_1'", - "backup_dbname": "mysql_3306_archer_test", - "execute_time": "0", - "sqlsha1": "", - "actual_affected_rows": "null", - }, - ] - r = is_auto_review(self.wfc1.workflow_id) - self.assertFalse(r) - - @patch("sql.engines.get_engine") - def test_auto_review_true(self, _get_engine): - """ - 测试自动审批通过的判定条件, - :return: - """ - # 开启自动审批设置 - self.sys_config.set("auto_review", "true") - self.sys_config.set("auto_review_db_type", "mysql") - self.sys_config.set("auto_review_regex", "^drop") # drop语句需要审批 - self.sys_config.set("auto_review_max_update_rows", "2") # update影响行数大于2需要审批 - self.sys_config.set("auto_review_tag", "GA") # 仅GA开启自动审批 - self.sys_config.get_all_config() - # 修改工单为update,mock返回值,update影响行数=3 - self.wfc1.sql_content = "update table users set email='';" - self.wfc1.review_content = json.dumps( - [ - { - "id": 1, - "stage": "CHECKED", - "errlevel": 0, - "stagestatus": "Audit completed", - "errormessage": "None", - "sql": "use archer_test", - "affected_rows": 0, - "sequence": "'0_0_0'", - "backup_dbname": "None", - "execute_time": "0", - "sqlsha1": "", - "actual_affected_rows": "null", - }, - { - "id": 2, - "stage": "CHECKED", - "errlevel": 0, - "stagestatus": "Audit completed", - "errormessage": "None", - "sql": "update table users set email=''", - "affected_rows": 1, - "sequence": "'0_0_1'", - "backup_dbname": "mysql_3306_archer_test", - "execute_time": "0", - "sqlsha1": "", - "actual_affected_rows": "null", - }, - ] - ) - self.wfc1.save(update_fields=("sql_content", "review_content")) - # 修改工单实例标签 - tag, is_created = InstanceTag.objects.get_or_create( - tag_code="GA", defaults={"tag_name": "生产环境", "active": True} - ) - self.wf1.instance.instance_tag.add(tag) - r = is_auto_review(self.wfc1.workflow_id) - self.assertTrue(r) - - @patch("sql.engines.get_engine") - def test_auto_review_false(self, _get_engine): - """ - 测试自动审批通过的判定条件, - :return: - """ - # 开启自动审批设置 - self.sys_config.set("auto_review", "true") - self.sys_config.set("auto_review_db_type", "") # 未配置auto_review_db_type需要审批 - self.sys_config.set("auto_review_regex", "^drop") # drop语句需要审批 - self.sys_config.set("auto_review_max_update_rows", "2") # update影响行数大于2需要审批 - self.sys_config.set("auto_review_tag", "GA") # 仅GA开启自动审批 - self.sys_config.get_all_config() - # 修改工单为update,mock返回值,update影响行数=3 - self.wfc1.sql_content = "update table users set email='';" - self.wfc1.review_content = json.dumps( - [ - { - "id": 1, - "stage": "CHECKED", - "errlevel": 0, - "stagestatus": "Audit completed", - "errormessage": "None", - "sql": "use archer_test", - "affected_rows": 0, - "sequence": "'0_0_0'", - "backup_dbname": "None", - "execute_time": "0", - "sqlsha1": "", - "actual_affected_rows": "null", - }, - { - "id": 2, - "stage": "CHECKED", - "errlevel": 0, - "stagestatus": "Audit completed", - "errormessage": "None", - "sql": "update table users set email=''", - "affected_rows": 1, - "sequence": "'0_0_1'", - "backup_dbname": "mysql_3306_archer_test", - "execute_time": "0", - "sqlsha1": "", - "actual_affected_rows": "null", - }, - ] - ) - self.wfc1.save(update_fields=("sql_content", "review_content")) - # 修改工单实例标签 - tag, is_created = InstanceTag.objects.get_or_create( - tag_code="GA", defaults={"tag_name": "生产环境", "active": True} - ) - self.wf1.instance.instance_tag.add(tag) - r = is_auto_review(self.wfc1.workflow_id) - self.assertFalse(r) - def test_can_execute_for_resource_group( self, ): diff --git a/sql/utils/workflow_audit.py b/sql/utils/workflow_audit.py index 509138ebb0..7aab5fee9b 100644 --- a/sql/utils/workflow_audit.py +++ b/sql/utils/workflow_audit.py @@ -1,6 +1,8 @@ # -*- coding: UTF-8 -*- import dataclasses import importlib +import json +import re from dataclasses import dataclass, field from typing import Union, Optional, List import logging @@ -10,8 +12,8 @@ from django.core.exceptions import ObjectDoesNotExist from django.conf import settings +from sql.engines.models import ReviewResult from sql.utils.resource_group import user_groups, auth_group_users -from sql.utils.sql_review import is_auto_review from common.utils.const import WorkflowStatus, WorkflowType, WorkflowAction from sql.models import ( WorkflowAudit, @@ -25,7 +27,7 @@ ArchiveConfig, ) from common.config import SysConfig - +from sql.utils.sql_utils import remove_comments logger = logging.getLogger("default") @@ -141,12 +143,52 @@ def get_workflow(self): self.resource_group = self.audit.group_name self.resource_group_id = self.audit.group_id + def is_auto_review(self) -> bool: + if self.workflow_type != WorkflowType.SQL_REVIEW: + # 当前自动审核仅对 sql 上线工单有用 + return False + auto_review_enabled = self.sys_config.get("auto_review", False) + if not auto_review_enabled: + return False + auto_review_tags = self.sys_config.get("auto_review_tag", "").split(",") + auto_review_db_type = self.sys_config.get("auto_review_db_type", "").split(",") + # TODO 这里也可以放到engine中实现,但是配置项可能会相对复杂 + if self.workflow.instance.db_type not in auto_review_db_type: + return False + if not self.workflow.instance.instance_tag.filter( + tag_code__in=auto_review_tags + ).exists(): + return False + + # 获取正则表达式 + auto_review_regex = self.sys_config.get( + "auto_review_regex", "^alter|^create|^drop|^truncate|^rename|^delete" + ) + p = re.compile(auto_review_regex, re.I) + + # 判断是否匹配到需要手动审核的语句 + all_affected_rows = 0 + review_content = self.workflow.sqlworkflowcontent.review_content + for review_row in json.loads(review_content): + review_result = ReviewResult(**review_row) + # 去除SQL注释 https://github.com/hhyo/Archery/issues/949 + sql = remove_comments(review_result.sql).replace("\n", "").replace("\r", "") + # 正则匹配 + if p.match(sql): + # 匹配成功, 代表需要人工复核 + return False + # 影响行数加测, 总语句影响行数超过指定数量则需要人工审核 + all_affected_rows += int(review_result.affected_rows) + if all_affected_rows > int( + self.sys_config.get("auto_review_max_update_rows", 50) + ): + # 影响行数超规模, 需要人工审核 + return False + return True + def generate_audit_setting(self) -> AuditSetting: - if self.workflow_type == WorkflowType.SQL_REVIEW: - if self.sys_config.get("auto_review", False): - # 判断是否无需审批 - if is_auto_review(self.workflow.id): - return AuditSetting(auto_pass=True, audit_auth_groups=["无需审批"]) + if self.is_auto_review(): + return AuditSetting(auto_pass=True, audit_auth_groups=["无需审批"]) if self.workflow_type in [WorkflowType.SQL_REVIEW, WorkflowType.QUERY]: group_id = self.workflow.group_id diff --git a/sql_api/serializers.py b/sql_api/serializers.py index 67834a58c2..9bffb5166f 100644 --- a/sql_api/serializers.py +++ b/sql_api/serializers.py @@ -431,7 +431,7 @@ def create(self, validated_data): # 有时候提交后自动审批通过, 在这里改写一下 workflow 状态 if auditor.audit.current_status == WorkflowStatus.PASSED: auditor.workflow.status = "workflow_review_pass" - auditor.workflow.save() + auditor.workflow.save() return workflow_content class Meta: