From 6f6dad636dace479f160d7d3148a74bc195c7a81 Mon Sep 17 00:00:00 2001 From: YSevenK Date: Fri, 26 Jul 2024 15:37:26 +0800 Subject: [PATCH 01/10] unittest for scene.py --- test/__init__.py | 17 +++++++++ test/common/test_scene.py | 76 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 93 insertions(+) create mode 100644 test/__init__.py create mode 100644 test/common/test_scene.py diff --git a/test/__init__.py b/test/__init__.py new file mode 100644 index 00000000..4f2405d9 --- /dev/null +++ b/test/__init__.py @@ -0,0 +1,17 @@ +#!/usr/bin/env python +# -*- coding: UTF-8 -* +# Copyright (c) 2022 OceanBase +# OceanBase Diagnostic Tool is licensed under Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +# See the Mulan PSL v2 for more details. + +""" +@time: 2022/6/20 +@file: __init__.py +@desc: +""" diff --git a/test/common/test_scene.py b/test/common/test_scene.py new file mode 100644 index 00000000..f3d866c8 --- /dev/null +++ b/test/common/test_scene.py @@ -0,0 +1,76 @@ +#!/usr/bin/env python +# -*- coding: UTF-8 -* +# Copyright (c) 2022 OceanBase +# OceanBase Diagnostic Tool is licensed under Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +# See the Mulan PSL v2 for more details. + +""" +@time: 2024/01/16 +@file: scene.py +@desc: +""" + +import unittest +from unittest.mock import MagicMock +from common.scene import filter_by_version + + +class TestFilterByVersion(unittest.TestCase): + def setUp(self): + self.stdio = MagicMock() + self.scene = [{"version": "[1.0.0,2.0.0)"}, {"version": "(1.0.0,2.0.0]"}] + self.cluster = {"version": "1.5.0"} + + def test_filter_by_version_with_valid_version(self): + # Test case where cluster version is within the range specified in the scene + result = filter_by_version(self.scene, self.cluster, self.stdio) + self.assertEqual(result, 1) + + def test_filter_by_version_with_invalid_version(self): + # Test case where cluster version is outside the range specified in the scene + self.cluster["version"] = "0.5.0" + result = filter_by_version(self.scene, self.cluster, self.stdio) + self.assertEqual(result, 0) + + def test_filter_by_version_with_wildcard_min_version(self): + # Test case where min version is wildcard (*) and cluster version is valid + self.scene[0]["version"] = "[*,2.0.0)" + result = filter_by_version(self.scene, self.cluster, self.stdio) + self.assertEqual(result, 1) + + def test_filter_by_version_with_wildcard_max_version(self): + # Test case where max version is wildcard (*) and cluster version is valid + self.scene[1]["version"] = "(1.0.0,*]" + result = filter_by_version(self.scene, self.cluster, self.stdio) + self.assertEqual(result, 2) + + def test_filter_by_version_with_non_string_version(self): + # Test case where version is not a string + self.scene[0]["version"] = str(1.0) + with self.assertRaises(Exception) as context: + filter_by_version(self.scene, self.cluster, self.stdio) + self.assertTrue( + "filter_by_version steps_version Exception" in str(context.exception) + ) + + def test_filter_by_version_no_version_in_cluster(self): + # Test case where version is not specified in the cluster + del self.cluster["version"] + result = filter_by_version(self.scene, self.cluster, self.stdio) + self.assertEqual(result, 0) + + def test_filter_by_version_no_version_in_steps(self): + # Test case where no version is specified in any steps + self.scene = [{"some_key": "some_value"}] + result = filter_by_version(self.scene, self.cluster, self.stdio) + self.assertEqual(result, -1) + + +if __name__ == "__main__": + unittest.main() From 8cdef7ec03a842965d1c5651adb401f5b248a684 Mon Sep 17 00:00:00 2001 From: YSevenK Date: Fri, 26 Jul 2024 15:49:32 +0800 Subject: [PATCH 02/10] unittest for common/scene.py --- test/common/test_scene.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/test/common/test_scene.py b/test/common/test_scene.py index f3d866c8..135447df 100644 --- a/test/common/test_scene.py +++ b/test/common/test_scene.py @@ -55,9 +55,7 @@ def test_filter_by_version_with_non_string_version(self): self.scene[0]["version"] = str(1.0) with self.assertRaises(Exception) as context: filter_by_version(self.scene, self.cluster, self.stdio) - self.assertTrue( - "filter_by_version steps_version Exception" in str(context.exception) - ) + self.assertTrue("filter_by_version steps_version Exception" in str(context.exception)) def test_filter_by_version_no_version_in_cluster(self): # Test case where version is not specified in the cluster From 5080cbbc7e68de6960f9a12efda54459cbb17a04 Mon Sep 17 00:00:00 2001 From: YSevenK Date: Thu, 1 Aug 2024 11:05:18 +0800 Subject: [PATCH 03/10] =?UTF-8?q?=E4=B8=BAcommon=E5=8C=85=E4=B8=8Bcommand?= =?UTF-8?q?=EF=BC=8Cconfig=5Fhelper=E7=94=9F=E6=88=90=E5=8D=95=E5=85=83?= =?UTF-8?q?=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- test/common/test_command.py | 109 ++++++++++++++++++++++++++++++ test/common/test_config_helper.py | 62 +++++++++++++++++ 2 files changed, 171 insertions(+) create mode 100644 test/common/test_command.py create mode 100644 test/common/test_config_helper.py diff --git a/test/common/test_command.py b/test/common/test_command.py new file mode 100644 index 00000000..8320a651 --- /dev/null +++ b/test/common/test_command.py @@ -0,0 +1,109 @@ +#!/usr/bin/env python +# -*- coding: UTF-8 -* +# Copyright (c) 2022 OceanBase +# OceanBase Diagnostic Tool is licensed under Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +# See the Mulan PSL v2 for more details. + +import unittest +from unittest.mock import MagicMock +from common.command import * + +# 测试类,用于测试common.command模块中的函数 +class TestCommonCommand(unittest.TestCase): + + # 准备测试环境,创建模拟的SSH客户端和标准输入输出对象 + def setUp(self): + self.ssh_client = MagicMock() + self.stdio = MagicMock() + + # 测试下载文件功能 + def test_download_file(self): + # 设置远程和本地文件路径 + remote_path = "/remote/path/file.txt" + local_path = "/local/path/file.txt" + + # 执行下载文件操作 + result = download_file(self.ssh_client, remote_path, local_path, self.stdio) + + # 验证是否正确调用了SSH客户端的下载方法,并检查返回结果是否为本地文件路径 + self.ssh_client.download.assert_called_once_with(remote_path, local_path) + self.assertEqual(result, local_path) + + # 测试上传文件功能 + def test_upload_file(self): + # 设置本地和远程文件路径 + local_path = "/local/path/file.txt" + remote_path = "/remote/path/file.txt" + + # 执行上传文件操作 + upload_file(self.ssh_client, local_path, remote_path, self.stdio) + + # 验证是否正确调用了SSH客户端的上传方法 + self.ssh_client.upload.assert_called_once_with(remote_path, local_path) + + # 测试远程删除文件夹功能 + def test_rm_rf_file(self): + # 设置远程文件夹路径 + dir = "/remote/path" + + # 执行删除操作 + rm_rf_file(self.ssh_client, dir, self.stdio) + + # 验证是否正确调用了SSH客户端的执行命令方法,并传入了正确的删除命令 + self.ssh_client.exec_cmd.assert_called_once_with(f"rm -rf {dir}") + + # 测试删除文件夹中文件的功能 + def test_delete_file_in_folder(self): + # 设置远程文件夹路径 + file_path = "/remote/path/folder" + + # 执行删除操作并期望抛出异常 + with self.assertRaises(Exception) as context: + delete_file_in_folder(self.ssh_client, file_path, self.stdio) + + # 验证异常中是否包含特定的字符串 + self.assertTrue("gather_pack" in str(context.exception)) + + # 测试判断远程目录是否为空 + def test_is_empty_dir(self): + # 设置远程目录路径 + dir_path = "/remote/path/dir" + + # 执行判断操作,并设置返回值为"1",表示目录不为空 + self.ssh_client.exec_cmd.return_value = "1" + result = is_empty_dir(self.ssh_client, dir_path, self.stdio) + + # 验证是否正确执行了命令,并判断结果是否为False + self.ssh_client.exec_cmd.assert_called_once_with(f"ls -A {dir_path}|wc -w") + self.assertFalse(result) + + # 执行判断操作,并设置返回值为"0",表示目录为空 + self.ssh_client.exec_cmd.return_value = "0" + result = is_empty_dir(self.ssh_client, dir_path, self.stdio) + + # 判断结果是否为True + self.assertTrue(result) + + # 测试获取文件起始时间的功能 + def test_get_file_start_time(self): + # 设置文件名和远程目录路径 + file_name = "file.log" + dir = "/remote/path" + + # 执行获取起始时间操作,并设置返回值为特定的时间字符串 + self.ssh_client.exec_cmd.return_value = "2022-01-01 00:00:00" + result = get_file_start_time(self.ssh_client, file_name, dir, self.stdio) + + # 验证是否正确执行了命令,并检查返回结果是否为预期的时间字符串 + self.ssh_client.exec_cmd.assert_called_once_with(f"head -n 1 {dir}/{file_name}") + self.assertEqual(result, "2022-01-01 00:00:00") + + +if __name__ == '__main__': + unittest.main() diff --git a/test/common/test_config_helper.py b/test/common/test_config_helper.py new file mode 100644 index 00000000..5cbf90a7 --- /dev/null +++ b/test/common/test_config_helper.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python +# -*- coding: UTF-8 -* +# Copyright (c) 2022 OceanBase +# OceanBase Diagnostic Tool is licensed under Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +# See the Mulan PSL v2 for more details. + + +import unittest +from unittest.mock import patch, MagicMock +from common.config_helper import ConfigHelper + + +class TestConfigHelper(unittest.TestCase): + def setUp(self): + self.context = MagicMock() + self.context.stdio = MagicMock() + self.context.options = MagicMock() + self.context.inner_config = MagicMock() + self.config_helper = ConfigHelper(self.context) + + @patch('common.config_helper.get_observer_version') + @patch('common.config_helper.OBConnector') + def test_get_cluster_name(self, mock_connector, mock_get_observer_version): + mock_get_observer_version.return_value = "3.0.0" + mock_connector_instance = mock_connector.return_value + mock_connector_instance.execute_sql.return_value = [("cluster_name",)] + + cluster_name = self.config_helper.get_cluster_name() + + mock_connector.assert_called_once() + mock_connector_instance.execute_sql.assert_called_once_with("select cluster_name from oceanbase.v$ob_cluster") + self.assertEqual(cluster_name, "cluster_name") + + @patch('common.config_helper.get_observer_version') + @patch('common.config_helper.OBConnector') + def test_get_host_info_list_by_cluster(self, mock_connector, mock_get_observer_version): + mock_get_observer_version.return_value = "3.0.0" + mock_connector_instance = mock_connector.return_value + mock_connector_instance.execute_sql.return_value = [("192.168.1.1", 8080, "zone1", "build_version")] + + host_info_list = self.config_helper.get_host_info_list_by_cluster() + + mock_connector.assert_called_once() + mock_connector_instance.execute_sql.assert_called_once_with("select SVR_IP, SVR_PORT, ZONE, BUILD_VERSION from oceanbase.v$ob_cluster") + self.assertEqual(len(host_info_list), 1) + self.assertEqual(host_info_list[0], {"ip": "192.168.1.1"}) + + @patch('common.config_helper.get_observer_version') + @patch('common.config_helper.OBConnector') + def test_build_configuration(self, mock_connector, mock_get_observer_version): + mock_get_observer_version.return_value = "3.0.0" + self.config_helper.build_configuration() + + +if __name__ == '__main__': + unittest.main() From 6e6e6b3cd58dd3afdeb366f92408d8c7d1eec5bd Mon Sep 17 00:00:00 2001 From: YSevenK Date: Thu, 1 Aug 2024 11:10:47 +0800 Subject: [PATCH 04/10] =?UTF-8?q?=E5=AE=8C=E5=96=84=E6=B5=8B=E8=AF=95?= =?UTF-8?q?=E7=B1=BB=E6=B3=A8=E9=87=8A,test/common/test=5Fcommand.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- test/common/test_command.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/common/test_command.py b/test/common/test_command.py index 8320a651..4dbaf57c 100644 --- a/test/common/test_command.py +++ b/test/common/test_command.py @@ -14,7 +14,7 @@ from unittest.mock import MagicMock from common.command import * -# 测试类,用于测试common.command模块中的函数 + class TestCommonCommand(unittest.TestCase): # 准备测试环境,创建模拟的SSH客户端和标准输入输出对象 From 6191d98e4f878b5adbb435de0f053aff0d100c14 Mon Sep 17 00:00:00 2001 From: YSevenK Date: Tue, 6 Aug 2024 14:18:33 +0800 Subject: [PATCH 05/10] fix test bug --- test/common/test_command.py | 158 +++++++++++++----------------------- 1 file changed, 55 insertions(+), 103 deletions(-) diff --git a/test/common/test_command.py b/test/common/test_command.py index 4dbaf57c..ee94a176 100644 --- a/test/common/test_command.py +++ b/test/common/test_command.py @@ -1,109 +1,61 @@ -#!/usr/bin/env python -# -*- coding: UTF-8 -* -# Copyright (c) 2022 OceanBase -# OceanBase Diagnostic Tool is licensed under Mulan PSL v2. -# You can use this software according to the terms and conditions of the Mulan PSL v2. -# You may obtain a copy of Mulan PSL v2 at: -# http://license.coscl.org.cn/MulanPSL2 -# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, -# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, -# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. -# See the Mulan PSL v2 for more details. - import unittest -from unittest.mock import MagicMock -from common.command import * - - -class TestCommonCommand(unittest.TestCase): +from unittest.mock import Mock, patch +import subprocess +from common.command import LocalClient # 请替换为实际的模块路径 - # 准备测试环境,创建模拟的SSH客户端和标准输入输出对象 +class TestLocalClient(unittest.TestCase): def setUp(self): - self.ssh_client = MagicMock() - self.stdio = MagicMock() - - # 测试下载文件功能 - def test_download_file(self): - # 设置远程和本地文件路径 - remote_path = "/remote/path/file.txt" - local_path = "/local/path/file.txt" - - # 执行下载文件操作 - result = download_file(self.ssh_client, remote_path, local_path, self.stdio) - - # 验证是否正确调用了SSH客户端的下载方法,并检查返回结果是否为本地文件路径 - self.ssh_client.download.assert_called_once_with(remote_path, local_path) - self.assertEqual(result, local_path) - - # 测试上传文件功能 - def test_upload_file(self): - # 设置本地和远程文件路径 - local_path = "/local/path/file.txt" - remote_path = "/remote/path/file.txt" - - # 执行上传文件操作 - upload_file(self.ssh_client, local_path, remote_path, self.stdio) - - # 验证是否正确调用了SSH客户端的上传方法 - self.ssh_client.upload.assert_called_once_with(remote_path, local_path) - - # 测试远程删除文件夹功能 - def test_rm_rf_file(self): - # 设置远程文件夹路径 - dir = "/remote/path" - - # 执行删除操作 - rm_rf_file(self.ssh_client, dir, self.stdio) - - # 验证是否正确调用了SSH客户端的执行命令方法,并传入了正确的删除命令 - self.ssh_client.exec_cmd.assert_called_once_with(f"rm -rf {dir}") - - # 测试删除文件夹中文件的功能 - def test_delete_file_in_folder(self): - # 设置远程文件夹路径 - file_path = "/remote/path/folder" - - # 执行删除操作并期望抛出异常 - with self.assertRaises(Exception) as context: - delete_file_in_folder(self.ssh_client, file_path, self.stdio) - - # 验证异常中是否包含特定的字符串 - self.assertTrue("gather_pack" in str(context.exception)) - - # 测试判断远程目录是否为空 - def test_is_empty_dir(self): - # 设置远程目录路径 - dir_path = "/remote/path/dir" - - # 执行判断操作,并设置返回值为"1",表示目录不为空 - self.ssh_client.exec_cmd.return_value = "1" - result = is_empty_dir(self.ssh_client, dir_path, self.stdio) - - # 验证是否正确执行了命令,并判断结果是否为False - self.ssh_client.exec_cmd.assert_called_once_with(f"ls -A {dir_path}|wc -w") - self.assertFalse(result) - - # 执行判断操作,并设置返回值为"0",表示目录为空 - self.ssh_client.exec_cmd.return_value = "0" - result = is_empty_dir(self.ssh_client, dir_path, self.stdio) - - # 判断结果是否为True - self.assertTrue(result) - - # 测试获取文件起始时间的功能 - def test_get_file_start_time(self): - # 设置文件名和远程目录路径 - file_name = "file.log" - dir = "/remote/path" - - # 执行获取起始时间操作,并设置返回值为特定的时间字符串 - self.ssh_client.exec_cmd.return_value = "2022-01-01 00:00:00" - result = get_file_start_time(self.ssh_client, file_name, dir, self.stdio) - - # 验证是否正确执行了命令,并检查返回结果是否为预期的时间字符串 - self.ssh_client.exec_cmd.assert_called_once_with(f"head -n 1 {dir}/{file_name}") - self.assertEqual(result, "2022-01-01 00:00:00") - + self.stdio = Mock() + self.local_client = LocalClient(stdio=self.stdio) + + @patch('subprocess.Popen') + def test_run_success(self, mock_popen): + # 模拟命令成功执行 + mock_process = Mock() + mock_process.communicate.return_value = (b'success', None) + mock_popen.return_value = mock_process + + cmd = 'echo "hello"' + result = self.local_client.run(cmd) + + # 验证 verbose 和 Popen 调用 + self.stdio.verbose.assert_called_with("[local host] run cmd = [echo \"hello\"] on localhost") + mock_popen.assert_called_with(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=True, executable='/bin/bash') + + # 验证结果 + self.assertEqual(result, b'success') + + @patch('subprocess.Popen') + def test_run_failure(self, mock_popen): + # 模拟命令执行失败 + mock_process = Mock() + mock_process.communicate.return_value = (b'', b'error') + mock_popen.return_value = mock_process + + cmd = 'echo "hello"' + result = self.local_client.run(cmd) + + # 验证 verbose 和 Popen 调用 + self.stdio.verbose.assert_called_with("[local host] run cmd = [echo \"hello\"] on localhost") + mock_popen.assert_called_with(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=True, executable='/bin/bash') + + # 验证错误处理 + self.stdio.error.assert_called_with("run cmd = [echo \"hello\"] on localhost, stderr=[b'error']") + self.assertEqual(result, b'') + + @patch('subprocess.Popen') + def test_run_exception(self, mock_popen): + # 模拟命令执行时抛出异常 + mock_popen.side_effect = Exception('Test exception') + + cmd = 'echo "hello"' + result = self.local_client.run(cmd) + + # 验证 verbose 调用和异常处理 + self.stdio.verbose.assert_called_with("[local host] run cmd = [echo \"hello\"] on localhost") + self.stdio.error.assert_called_with("run cmd = [echo \"hello\"] on localhost") + self.assertIsNone(result) + if __name__ == '__main__': unittest.main() From f897245a681d6955c7ed7ccf60df8d4423776809 Mon Sep 17 00:00:00 2001 From: YSevenK Date: Tue, 6 Aug 2024 14:43:24 +0800 Subject: [PATCH 06/10] fix bug --- test/common/test_command.py | 56 +++++++++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/test/common/test_command.py b/test/common/test_command.py index ee94a176..75fa6c96 100644 --- a/test/common/test_command.py +++ b/test/common/test_command.py @@ -55,7 +55,63 @@ def test_run_exception(self, mock_popen): self.stdio.verbose.assert_called_with("[local host] run cmd = [echo \"hello\"] on localhost") self.stdio.error.assert_called_with("run cmd = [echo \"hello\"] on localhost") self.assertIsNone(result) + + + @patch('subprocess.Popen') + def test_run_get_stderr_success(self, mock_popen): + # 模拟命令成功执行 + mock_process = Mock() + mock_process.communicate.return_value = (b'success', b'') + mock_popen.return_value = mock_process + + cmd = 'echo "hello"' + result = self.local_client.run_get_stderr(cmd) + + # 验证 verbose 和 Popen 调用 + self.stdio.verbose.assert_called_with("run cmd = [echo \"hello\"] on localhost") + mock_popen.assert_called_with(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=True, executable='/bin/bash') + # 验证结果 + self.assertEqual(result, b'') + + + @patch('subprocess.Popen') + def test_run_get_stderr_failure(self, mock_popen): + # 模拟命令执行失败 + mock_process = Mock() + mock_process.communicate.return_value = (b'', b'error') + mock_popen.return_value = mock_process + + cmd = 'echo "hello"' + result = self.local_client.run_get_stderr(cmd) + + # 验证 verbose 和 Popen 调用 + self.stdio.verbose.assert_called_with("run cmd = [echo \"hello\"] on localhost") + mock_popen.assert_called_with(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=True, executable='/bin/bash') + + # 验证错误处理 + # 因为 stdout 和 stderr 都是 b'',stderr 应该是 b'error' + self.assertEqual(result, b'error') + + # 检查 error 方法是否被调用,且调用内容是否正确 + # 注意:在正常情况下 error 方法不应该被调用,只有异常情况才会被调用。 + # 确保 error 方法在异常情况下被调用 + self.stdio.error.assert_not_called() + + + @patch('subprocess.Popen') + def test_run_get_stderr_exception(self, mock_popen): + # 模拟命令执行时抛出异常 + mock_popen.side_effect = Exception('Test exception') + + cmd = 'echo "hello"' + result = self.local_client.run_get_stderr(cmd) + + # 验证 verbose 调用和异常处理 + self.stdio.verbose.assert_called_with("run cmd = [echo \"hello\"] on localhost") + self.stdio.error.assert_called_with(f"run cmd = [{cmd}] on localhost") + self.assertIsNone(result) + if __name__ == '__main__': unittest.main() From 29680b1f4efdb6fb7e72423dbfb08daef6f60a06 Mon Sep 17 00:00:00 2001 From: YSevenK Date: Tue, 6 Aug 2024 15:14:30 +0800 Subject: [PATCH 07/10] fix test_command bug --- test/common/test_command.py | 113 +++++++++++++++++++++++++++++++++++- 1 file changed, 112 insertions(+), 1 deletion(-) diff --git a/test/common/test_command.py b/test/common/test_command.py index 75fa6c96..29950a1e 100644 --- a/test/common/test_command.py +++ b/test/common/test_command.py @@ -1,12 +1,33 @@ +#!/usr/bin/env python +# -*- coding: UTF-8 -* +# Copyright (c) 2022 OceanBase +# OceanBase Diagnostic Tool is licensed under Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +# See the Mulan PSL v2 for more details. + +""" +@time: 2024/08/06 +@file: test_command.py +@desc: 测试到command的delete_file_in_folder方法 +""" import unittest from unittest.mock import Mock, patch import subprocess -from common.command import LocalClient # 请替换为实际的模块路径 +from common.command import * + +""" +""" class TestLocalClient(unittest.TestCase): def setUp(self): self.stdio = Mock() self.local_client = LocalClient(stdio=self.stdio) + self.ssh_client = Mock() @patch('subprocess.Popen') def test_run_success(self, mock_popen): @@ -112,6 +133,96 @@ def test_run_get_stderr_exception(self, mock_popen): self.stdio.error.assert_called_with(f"run cmd = [{cmd}] on localhost") self.assertIsNone(result) + def test_download_file_success(self): + remote_path = "/remote/path/file.txt" + local_path = "/local/path/file.txt" + + result = download_file(self.ssh_client, remote_path, local_path, self.stdio) + + self.ssh_client.download.assert_called_once_with(remote_path, local_path) + self.assertEqual(result, local_path) + self.stdio.error.assert_not_called() + self.stdio.verbose.assert_not_called() + + def test_download_file_failure(self): + remote_path = "/remote/path/file.txt" + local_path = "/local/path/file.txt" + + self.ssh_client.download.side_effect = Exception("Simulated download exception") + + result = download_file(self.ssh_client, remote_path, local_path, self.stdio) + + self.ssh_client.download.assert_called_once_with(remote_path, local_path) + self.assertEqual(result, local_path) + self.stdio.error.assert_called_once_with("Download File Failed error: Simulated download exception") + self.stdio.verbose.assert_called_once() + + def test_upload_file_success(self): + local_path = "/local/path/file.txt" + remote_path = "/remote/path/file.txt" + self.ssh_client.get_name.return_value = "test_server" + + result = upload_file(self.ssh_client, local_path, remote_path, self.stdio) + + self.ssh_client.upload.assert_called_once_with(remote_path, local_path) + self.stdio.verbose.assert_called_once_with( + "Please wait a moment, upload file to server test_server, local file path /local/path/file.txt, remote file path /remote/path/file.txt" + ) + self.stdio.error.assert_not_called() + + def test_rm_rf_file_success(self): + dir_path = "/path/to/delete" + + rm_rf_file(self.ssh_client, dir_path, self.stdio) + + self.ssh_client.exec_cmd.assert_called_once_with("rm -rf /path/to/delete") + + def test_rm_rf_file_empty_dir(self): + dir_path = "" + + rm_rf_file(self.ssh_client, dir_path, self.stdio) + + self.ssh_client.exec_cmd.assert_called_once_with("rm -rf ") + + def test_rm_rf_file_special_chars(self): + dir_path = "/path/to/delete; echo 'This is a test'" + + rm_rf_file(self.ssh_client, dir_path, self.stdio) + + self.ssh_client.exec_cmd.assert_called_once_with("rm -rf /path/to/delete; echo 'This is a test'") + + def test_delete_file_in_folder_success(self): + file_path = "/path/to/gather_pack" + + delete_file_in_folder(self.ssh_client, file_path, self.stdio) + + self.ssh_client.exec_cmd.assert_called_once_with("rm -rf /path/to/gather_pack/*") + + + def test_delete_file_in_folder_none_path(self): + file_path = None + + with self.assertRaises(Exception) as context: + delete_file_in_folder(self.ssh_client, file_path, self.stdio) + + self.assertTrue("Please check file path, None" in str(context.exception)) + + def test_delete_file_in_folder_invalid_path(self): + file_path = "/path/to/invalid_folder" + + with self.assertRaises(Exception) as context: + delete_file_in_folder(self.ssh_client, file_path, self.stdio) + + self.assertTrue("Please check file path, /path/to/invalid_folder" in str(context.exception)) + + + def test_delete_file_in_folder_special_chars(self): + file_path = "/path/to/gather_pack; echo 'test'" + + delete_file_in_folder(self.ssh_client, file_path, self.stdio) + + self.ssh_client.exec_cmd.assert_called_once_with("rm -rf /path/to/gather_pack; echo 'test'/*") + if __name__ == '__main__': unittest.main() From 760141e844e39f5a0fd3a46291c94f7addf3c72c Mon Sep 17 00:00:00 2001 From: YSevenK Date: Tue, 6 Aug 2024 16:00:03 +0800 Subject: [PATCH 08/10] fix bug:test_config_helper --- test/common/test_command.py | 2 - test/common/test_config_helper.py | 188 +++++++++++++++++++++++------- 2 files changed, 145 insertions(+), 45 deletions(-) diff --git a/test/common/test_command.py b/test/common/test_command.py index 29950a1e..dce787ae 100644 --- a/test/common/test_command.py +++ b/test/common/test_command.py @@ -20,8 +20,6 @@ import subprocess from common.command import * -""" -""" class TestLocalClient(unittest.TestCase): def setUp(self): diff --git a/test/common/test_config_helper.py b/test/common/test_config_helper.py index 5cbf90a7..747a7c58 100644 --- a/test/common/test_config_helper.py +++ b/test/common/test_config_helper.py @@ -10,53 +10,155 @@ # MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. # See the Mulan PSL v2 for more details. - +""" +@time: 2024/8/6 +@file: test_config_helper.py +@desc: 测试config_helper的 get_old_configuration ~ input_choice_default 方法 +""" import unittest -from unittest.mock import patch, MagicMock +from unittest import mock from common.config_helper import ConfigHelper - class TestConfigHelper(unittest.TestCase): - def setUp(self): - self.context = MagicMock() - self.context.stdio = MagicMock() - self.context.options = MagicMock() - self.context.inner_config = MagicMock() - self.config_helper = ConfigHelper(self.context) - - @patch('common.config_helper.get_observer_version') - @patch('common.config_helper.OBConnector') - def test_get_cluster_name(self, mock_connector, mock_get_observer_version): - mock_get_observer_version.return_value = "3.0.0" - mock_connector_instance = mock_connector.return_value - mock_connector_instance.execute_sql.return_value = [("cluster_name",)] - - cluster_name = self.config_helper.get_cluster_name() - - mock_connector.assert_called_once() - mock_connector_instance.execute_sql.assert_called_once_with("select cluster_name from oceanbase.v$ob_cluster") - self.assertEqual(cluster_name, "cluster_name") - - @patch('common.config_helper.get_observer_version') - @patch('common.config_helper.OBConnector') - def test_get_host_info_list_by_cluster(self, mock_connector, mock_get_observer_version): - mock_get_observer_version.return_value = "3.0.0" - mock_connector_instance = mock_connector.return_value - mock_connector_instance.execute_sql.return_value = [("192.168.1.1", 8080, "zone1", "build_version")] - - host_info_list = self.config_helper.get_host_info_list_by_cluster() - - mock_connector.assert_called_once() - mock_connector_instance.execute_sql.assert_called_once_with("select SVR_IP, SVR_PORT, ZONE, BUILD_VERSION from oceanbase.v$ob_cluster") - self.assertEqual(len(host_info_list), 1) - self.assertEqual(host_info_list[0], {"ip": "192.168.1.1"}) - - @patch('common.config_helper.get_observer_version') - @patch('common.config_helper.OBConnector') - def test_build_configuration(self, mock_connector, mock_get_observer_version): - mock_get_observer_version.return_value = "3.0.0" - self.config_helper.build_configuration() + @mock.patch('common.config_helper.YamlUtils.write_yaml_data') + @mock.patch('common.config_helper.DirectoryUtil.mkdir') + @mock.patch('common.config_helper.os.path.expanduser') + @mock.patch('common.config_helper.TimeUtils.timestamp_to_filename_time') + def test_save_old_configuration(self, mock_timestamp_to_filename_time, mock_expanduser, mock_mkdir, mock_write_yaml_data): + # 模拟时间戳生成函数,返回一个特定的值 + mock_timestamp_to_filename_time.return_value = '20240806_123456' + + # 模拟路径扩展函数 + def mock_expanduser_path(path): + return { + '~/.obdiag/config.yml': '/mock/config.yml', + '~/mock/backup/dir': '/mock/backup/dir' + }.get(path, path) # 默认返回原路径 + + mock_expanduser.side_effect = mock_expanduser_path + + # 模拟目录创建函数 + mock_mkdir.return_value = None + + # 模拟YAML数据写入函数 + mock_write_yaml_data.return_value = None + + # 创建一个模拟的上下文对象 + context = mock.MagicMock() + context.inner_config = { + "obdiag": { + "basic": { + "config_backup_dir": "~/mock/backup/dir" + } + } + } + + # 初始化ConfigHelper对象 + config_helper = ConfigHelper(context) + + # 定义一个示例配置 + sample_config = {'key': 'value'} + + # 调用需要测试的方法 + config_helper.save_old_configuration(sample_config) + + # 验证路径扩展是否被正确调用 + mock_expanduser.assert_any_call('~/.obdiag/config.yml') + mock_expanduser.assert_any_call('~/mock/backup/dir') + + # 验证目录创建是否被正确调用 + mock_mkdir.assert_called_once_with(path='/mock/backup/dir') + + # 验证YAML数据写入是否被正确调用 + expected_backup_path = '/mock/backup/dir/config_backup_20240806_123456.yml' + mock_write_yaml_data.assert_called_once_with(sample_config, expected_backup_path) + + # 测试带有默认值输入的方法 + @mock.patch('builtins.input') + def test_input_with_default(self, mock_input): + # 创建一个模拟的上下文对象(虽然该方法并不需要它) + context = mock.Mock() + config_helper = ConfigHelper(context) + + # 测试用户输入为空的情况 + mock_input.return_value = '' + result = config_helper.input_with_default('username', 'default_user') + self.assertEqual(result, 'default_user') + + # 测试用户输入为'y'的情况(应该返回默认值) + mock_input.return_value = 'y' + result = config_helper.input_with_default('username', 'default_user') + self.assertEqual(result, 'default_user') + + # 测试用户输入为'yes'的情况(应该返回默认值) + mock_input.return_value = 'yes' + result = config_helper.input_with_default('username', 'default_user') + self.assertEqual(result, 'default_user') + + # 测试用户输入为其他值的情况(应该返回用户输入) + mock_input.return_value = 'custom_user' + result = config_helper.input_with_default('username', 'default_user') + self.assertEqual(result, 'custom_user') + + # 测试带有默认值的密码输入方法 + @mock.patch('common.config_helper.pwinput.pwinput') + def test_input_password_with_default(self, mock_pwinput): + # 创建一个模拟的上下文对象 + context = mock.MagicMock() + config_helper = ConfigHelper(context) + + # 测试密码输入为空的情况,应该返回默认值 + mock_pwinput.return_value = '' + result = config_helper.input_password_with_default("password", "default_password") + self.assertEqual(result, "default_password") + + # 测试密码输入为'y'的情况,应该返回默认值 + mock_pwinput.return_value = 'y' + result = config_helper.input_password_with_default("password", "default_password") + self.assertEqual(result, "default_password") + + # 测试密码输入为'yes'的情况,应该返回默认值 + mock_pwinput.return_value = 'yes' + result = config_helper.input_password_with_default("password", "default_password") + self.assertEqual(result, "default_password") + + # 测试密码输入为其他值的情况,应该返回输入值 + mock_pwinput.return_value = 'custom_password' + result = config_helper.input_password_with_default("password", "default_password") + self.assertEqual(result, "custom_password") + + # 测试带有默认选项的选择输入方法 + @mock.patch('common.config_helper.input') + def test_input_choice_default(self, mock_input): + # 创建一个模拟的上下文对象 + context = mock.MagicMock() + config_helper = ConfigHelper(context) + + # 测试输入为'y'的情况,应该返回True + mock_input.return_value = 'y' + result = config_helper.input_choice_default("choice", "N") + self.assertTrue(result) + + # 测试输入为'yes'的情况,应该返回True + mock_input.return_value = 'yes' + result = config_helper.input_choice_default("choice", "N") + self.assertTrue(result) + + # 测试输入为'n'的情况,应该返回False + mock_input.return_value = 'n' + result = config_helper.input_choice_default("choice", "N") + self.assertFalse(result) + + # 测试输入为'no'的情况,应该返回False + mock_input.return_value = 'no' + result = config_helper.input_choice_default("choice", "N") + self.assertFalse(result) + + # 测试输入为空字符串的情况,应该返回False + mock_input.return_value = '' + result = config_helper.input_choice_default("choice", "N") + self.assertFalse(result) if __name__ == '__main__': - unittest.main() + unittest.main() \ No newline at end of file From 716cbe89f988b4a768284518b41a3eec9b14557d Mon Sep 17 00:00:00 2001 From: YSevenK Date: Tue, 6 Aug 2024 17:52:29 +0800 Subject: [PATCH 09/10] fix bugs.add unittest workflow.reformat --- .../test_command_scene_configHelper.yml | 31 ++++ test/common/test_command.py | 95 +++++------ test/common/test_config_helper.py | 44 +++-- test/common/test_scene.py | 153 +++++++++++++----- 4 files changed, 205 insertions(+), 118 deletions(-) create mode 100644 .github/workflows/test_command_scene_configHelper.yml diff --git a/.github/workflows/test_command_scene_configHelper.yml b/.github/workflows/test_command_scene_configHelper.yml new file mode 100644 index 00000000..e5647e0a --- /dev/null +++ b/.github/workflows/test_command_scene_configHelper.yml @@ -0,0 +1,31 @@ +# common包下command、scene、config_helper的测试用例 +name: Test command_scene_configHelper + +on: + push: + branches: "*" + pull_request: + branches: "*" + +jobs: + build: + + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v3 + with: + fetch-depth: 0 # Fetch all history for proper version detection + + - name: Set up Python 3.8 + uses: actions/setup-python@v3 + with: + python-version: 3.8 + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements3.txt + + - name: Run tests + run: python -m unittest discover -s test/common -p 'test_*.py' diff --git a/test/common/test_command.py b/test/common/test_command.py index dce787ae..ac78f06e 100644 --- a/test/common/test_command.py +++ b/test/common/test_command.py @@ -18,7 +18,7 @@ import unittest from unittest.mock import Mock, patch import subprocess -from common.command import * +from common.command import * class TestLocalClient(unittest.TestCase): @@ -36,11 +36,11 @@ def test_run_success(self, mock_popen): cmd = 'echo "hello"' result = self.local_client.run(cmd) - + # 验证 verbose 和 Popen 调用 self.stdio.verbose.assert_called_with("[local host] run cmd = [echo \"hello\"] on localhost") mock_popen.assert_called_with(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=True, executable='/bin/bash') - + # 验证结果 self.assertEqual(result, b'success') @@ -53,11 +53,11 @@ def test_run_failure(self, mock_popen): cmd = 'echo "hello"' result = self.local_client.run(cmd) - + # 验证 verbose 和 Popen 调用 self.stdio.verbose.assert_called_with("[local host] run cmd = [echo \"hello\"] on localhost") mock_popen.assert_called_with(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=True, executable='/bin/bash') - + # 验证错误处理 self.stdio.error.assert_called_with("run cmd = [echo \"hello\"] on localhost, stderr=[b'error']") self.assertEqual(result, b'') @@ -69,13 +69,12 @@ def test_run_exception(self, mock_popen): cmd = 'echo "hello"' result = self.local_client.run(cmd) - + # 验证 verbose 调用和异常处理 self.stdio.verbose.assert_called_with("[local host] run cmd = [echo \"hello\"] on localhost") self.stdio.error.assert_called_with("run cmd = [echo \"hello\"] on localhost") self.assertIsNone(result) - - + @patch('subprocess.Popen') def test_run_get_stderr_success(self, mock_popen): # 模拟命令成功执行 @@ -85,15 +84,14 @@ def test_run_get_stderr_success(self, mock_popen): cmd = 'echo "hello"' result = self.local_client.run_get_stderr(cmd) - + # 验证 verbose 和 Popen 调用 self.stdio.verbose.assert_called_with("run cmd = [echo \"hello\"] on localhost") mock_popen.assert_called_with(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=True, executable='/bin/bash') - + # 验证结果 self.assertEqual(result, b'') - @patch('subprocess.Popen') def test_run_get_stderr_failure(self, mock_popen): # 模拟命令执行失败 @@ -107,16 +105,15 @@ def test_run_get_stderr_failure(self, mock_popen): # 验证 verbose 和 Popen 调用 self.stdio.verbose.assert_called_with("run cmd = [echo \"hello\"] on localhost") mock_popen.assert_called_with(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=True, executable='/bin/bash') - + # 验证错误处理 # 因为 stdout 和 stderr 都是 b'',stderr 应该是 b'error' self.assertEqual(result, b'error') - + # 检查 error 方法是否被调用,且调用内容是否正确 # 注意:在正常情况下 error 方法不应该被调用,只有异常情况才会被调用。 # 确保 error 方法在异常情况下被调用 self.stdio.error.assert_not_called() - @patch('subprocess.Popen') def test_run_get_stderr_exception(self, mock_popen): @@ -125,18 +122,18 @@ def test_run_get_stderr_exception(self, mock_popen): cmd = 'echo "hello"' result = self.local_client.run_get_stderr(cmd) - + # 验证 verbose 调用和异常处理 self.stdio.verbose.assert_called_with("run cmd = [echo \"hello\"] on localhost") self.stdio.error.assert_called_with(f"run cmd = [{cmd}] on localhost") self.assertIsNone(result) - + def test_download_file_success(self): remote_path = "/remote/path/file.txt" local_path = "/local/path/file.txt" - + result = download_file(self.ssh_client, remote_path, local_path, self.stdio) - + self.ssh_client.download.assert_called_once_with(remote_path, local_path) self.assertEqual(result, local_path) self.stdio.error.assert_not_called() @@ -145,82 +142,78 @@ def test_download_file_success(self): def test_download_file_failure(self): remote_path = "/remote/path/file.txt" local_path = "/local/path/file.txt" - + self.ssh_client.download.side_effect = Exception("Simulated download exception") - + result = download_file(self.ssh_client, remote_path, local_path, self.stdio) - + self.ssh_client.download.assert_called_once_with(remote_path, local_path) self.assertEqual(result, local_path) self.stdio.error.assert_called_once_with("Download File Failed error: Simulated download exception") self.stdio.verbose.assert_called_once() - + def test_upload_file_success(self): local_path = "/local/path/file.txt" remote_path = "/remote/path/file.txt" self.ssh_client.get_name.return_value = "test_server" - + result = upload_file(self.ssh_client, local_path, remote_path, self.stdio) - + self.ssh_client.upload.assert_called_once_with(remote_path, local_path) - self.stdio.verbose.assert_called_once_with( - "Please wait a moment, upload file to server test_server, local file path /local/path/file.txt, remote file path /remote/path/file.txt" - ) - self.stdio.error.assert_not_called() - + self.stdio.verbose.assert_called_once_with("Please wait a moment, upload file to server test_server, local file path /local/path/file.txt, remote file path /remote/path/file.txt") + self.stdio.error.assert_not_called() + def test_rm_rf_file_success(self): dir_path = "/path/to/delete" - + rm_rf_file(self.ssh_client, dir_path, self.stdio) - - self.ssh_client.exec_cmd.assert_called_once_with("rm -rf /path/to/delete") - + + self.ssh_client.exec_cmd.assert_called_once_with("rm -rf /path/to/delete") + def test_rm_rf_file_empty_dir(self): dir_path = "" - + rm_rf_file(self.ssh_client, dir_path, self.stdio) - - self.ssh_client.exec_cmd.assert_called_once_with("rm -rf ") - + + self.ssh_client.exec_cmd.assert_called_once_with("rm -rf ") + def test_rm_rf_file_special_chars(self): dir_path = "/path/to/delete; echo 'This is a test'" - + rm_rf_file(self.ssh_client, dir_path, self.stdio) - + self.ssh_client.exec_cmd.assert_called_once_with("rm -rf /path/to/delete; echo 'This is a test'") def test_delete_file_in_folder_success(self): file_path = "/path/to/gather_pack" - + delete_file_in_folder(self.ssh_client, file_path, self.stdio) - + self.ssh_client.exec_cmd.assert_called_once_with("rm -rf /path/to/gather_pack/*") - def test_delete_file_in_folder_none_path(self): file_path = None - + with self.assertRaises(Exception) as context: delete_file_in_folder(self.ssh_client, file_path, self.stdio) - + self.assertTrue("Please check file path, None" in str(context.exception)) - + def test_delete_file_in_folder_invalid_path(self): file_path = "/path/to/invalid_folder" - + with self.assertRaises(Exception) as context: delete_file_in_folder(self.ssh_client, file_path, self.stdio) - + self.assertTrue("Please check file path, /path/to/invalid_folder" in str(context.exception)) - def test_delete_file_in_folder_special_chars(self): file_path = "/path/to/gather_pack; echo 'test'" - + delete_file_in_folder(self.ssh_client, file_path, self.stdio) - + self.ssh_client.exec_cmd.assert_called_once_with("rm -rf /path/to/gather_pack; echo 'test'/*") - + if __name__ == '__main__': unittest.main() diff --git a/test/common/test_config_helper.py b/test/common/test_config_helper.py index 747a7c58..0137dd73 100644 --- a/test/common/test_config_helper.py +++ b/test/common/test_config_helper.py @@ -19,6 +19,7 @@ from unittest import mock from common.config_helper import ConfigHelper + class TestConfigHelper(unittest.TestCase): @mock.patch('common.config_helper.YamlUtils.write_yaml_data') @mock.patch('common.config_helper.DirectoryUtil.mkdir') @@ -27,48 +28,39 @@ class TestConfigHelper(unittest.TestCase): def test_save_old_configuration(self, mock_timestamp_to_filename_time, mock_expanduser, mock_mkdir, mock_write_yaml_data): # 模拟时间戳生成函数,返回一个特定的值 mock_timestamp_to_filename_time.return_value = '20240806_123456' - + # 模拟路径扩展函数 def mock_expanduser_path(path): - return { - '~/.obdiag/config.yml': '/mock/config.yml', - '~/mock/backup/dir': '/mock/backup/dir' - }.get(path, path) # 默认返回原路径 - + return {'~/.obdiag/config.yml': '/mock/config.yml', '~/mock/backup/dir': '/mock/backup/dir'}.get(path, path) # 默认返回原路径 + mock_expanduser.side_effect = mock_expanduser_path - + # 模拟目录创建函数 mock_mkdir.return_value = None # 模拟YAML数据写入函数 mock_write_yaml_data.return_value = None - + # 创建一个模拟的上下文对象 context = mock.MagicMock() - context.inner_config = { - "obdiag": { - "basic": { - "config_backup_dir": "~/mock/backup/dir" - } - } - } - + context.inner_config = {"obdiag": {"basic": {"config_backup_dir": "~/mock/backup/dir"}}} + # 初始化ConfigHelper对象 config_helper = ConfigHelper(context) # 定义一个示例配置 sample_config = {'key': 'value'} - + # 调用需要测试的方法 config_helper.save_old_configuration(sample_config) - + # 验证路径扩展是否被正确调用 mock_expanduser.assert_any_call('~/.obdiag/config.yml') mock_expanduser.assert_any_call('~/mock/backup/dir') - + # 验证目录创建是否被正确调用 mock_mkdir.assert_called_once_with(path='/mock/backup/dir') - + # 验证YAML数据写入是否被正确调用 expected_backup_path = '/mock/backup/dir/config_backup_20240806_123456.yml' mock_write_yaml_data.assert_called_once_with(sample_config, expected_backup_path) @@ -99,7 +91,7 @@ def test_input_with_default(self, mock_input): mock_input.return_value = 'custom_user' result = config_helper.input_with_default('username', 'default_user') self.assertEqual(result, 'custom_user') - + # 测试带有默认值的密码输入方法 @mock.patch('common.config_helper.pwinput.pwinput') def test_input_password_with_default(self, mock_pwinput): @@ -111,7 +103,7 @@ def test_input_password_with_default(self, mock_pwinput): mock_pwinput.return_value = '' result = config_helper.input_password_with_default("password", "default_password") self.assertEqual(result, "default_password") - + # 测试密码输入为'y'的情况,应该返回默认值 mock_pwinput.return_value = 'y' result = config_helper.input_password_with_default("password", "default_password") @@ -125,7 +117,7 @@ def test_input_password_with_default(self, mock_pwinput): # 测试密码输入为其他值的情况,应该返回输入值 mock_pwinput.return_value = 'custom_password' result = config_helper.input_password_with_default("password", "default_password") - self.assertEqual(result, "custom_password") + self.assertEqual(result, "custom_password") # 测试带有默认选项的选择输入方法 @mock.patch('common.config_helper.input') @@ -138,7 +130,7 @@ def test_input_choice_default(self, mock_input): mock_input.return_value = 'y' result = config_helper.input_choice_default("choice", "N") self.assertTrue(result) - + # 测试输入为'yes'的情况,应该返回True mock_input.return_value = 'yes' result = config_helper.input_choice_default("choice", "N") @@ -157,8 +149,8 @@ def test_input_choice_default(self, mock_input): # 测试输入为空字符串的情况,应该返回False mock_input.return_value = '' result = config_helper.input_choice_default("choice", "N") - self.assertFalse(result) + self.assertFalse(result) if __name__ == '__main__': - unittest.main() \ No newline at end of file + unittest.main() diff --git a/test/common/test_scene.py b/test/common/test_scene.py index 135447df..21ad57d3 100644 --- a/test/common/test_scene.py +++ b/test/common/test_scene.py @@ -11,64 +11,135 @@ # See the Mulan PSL v2 for more details. """ -@time: 2024/01/16 -@file: scene.py -@desc: +@time: 2024/8/6 +@file: test_scene.py +@desc: 为scene模块中filter_by_version和get_version_by_type函数进行单元测试 """ - import unittest -from unittest.mock import MagicMock -from common.scene import filter_by_version +from unittest.mock import MagicMock, patch +from common.scene import * class TestFilterByVersion(unittest.TestCase): def setUp(self): self.stdio = MagicMock() - self.scene = [{"version": "[1.0.0,2.0.0)"}, {"version": "(1.0.0,2.0.0]"}] - self.cluster = {"version": "1.5.0"} + StringUtils.compare_versions_greater = MagicMock() + self.context = MagicMock() + self.context.stdio = MagicMock() - def test_filter_by_version_with_valid_version(self): - # Test case where cluster version is within the range specified in the scene - result = filter_by_version(self.scene, self.cluster, self.stdio) - self.assertEqual(result, 1) + def test_no_version_in_cluster(self): + scene = [{"version": "[1.0,2.0]"}] + cluster = {} + result = filter_by_version(scene, cluster, self.stdio) + self.assertEqual(result, 0) - def test_filter_by_version_with_invalid_version(self): - # Test case where cluster version is outside the range specified in the scene - self.cluster["version"] = "0.5.0" - result = filter_by_version(self.scene, self.cluster, self.stdio) + def test_empty_version_in_cluster(self): + scene = [{"version": "[1.0,2.0]"}] + cluster = {"version": ""} + result = filter_by_version(scene, cluster, self.stdio) self.assertEqual(result, 0) - def test_filter_by_version_with_wildcard_min_version(self): - # Test case where min version is wildcard (*) and cluster version is valid - self.scene[0]["version"] = "[*,2.0.0)" - result = filter_by_version(self.scene, self.cluster, self.stdio) - self.assertEqual(result, 1) + def test_version_not_string(self): + scene = [{"version": 123}] + cluster = {"version": "1.5"} + with self.assertRaises(Exception): + filter_by_version(scene, cluster, self.stdio) - def test_filter_by_version_with_wildcard_max_version(self): - # Test case where max version is wildcard (*) and cluster version is valid - self.scene[1]["version"] = "(1.0.0,*]" - result = filter_by_version(self.scene, self.cluster, self.stdio) - self.assertEqual(result, 2) + def test_version_match_min(self): + scene = [{"version": "[1.0,2.0]"}] + cluster = {"version": "1.0"} + result = filter_by_version(scene, cluster, self.stdio) + self.assertEqual(result, 0) - def test_filter_by_version_with_non_string_version(self): - # Test case where version is not a string - self.scene[0]["version"] = str(1.0) - with self.assertRaises(Exception) as context: - filter_by_version(self.scene, self.cluster, self.stdio) - self.assertTrue("filter_by_version steps_version Exception" in str(context.exception)) + def test_version_match_max(self): + scene = [{"version": "[1.0,2.0]"}] + cluster = {"version": "2.0"} + result = filter_by_version(scene, cluster, self.stdio) + self.assertEqual(result, 0) + + def test_version_in_range(self): + scene = [{"version": "[1.0,2.0]"}] + cluster = {"version": "1.5"} + StringUtils.compare_versions_greater.side_effect = [True, True] + result = filter_by_version(scene, cluster, self.stdio) + self.assertEqual(result, 0) - def test_filter_by_version_no_version_in_cluster(self): - # Test case where version is not specified in the cluster - del self.cluster["version"] - result = filter_by_version(self.scene, self.cluster, self.stdio) + def test_version_out_of_range(self): + scene = [{"version": "[1.0,2.0]"}, {"version": "[2.0,3.0]"}] + cluster = {"version": "2.5"} + StringUtils.compare_versions_greater.side_effect = [False, True, True, True] + result = filter_by_version(scene, cluster, self.stdio) + self.assertEqual(result, 1) + + def test_no_version_in_steps(self): + scene = [{}] + cluster = {"version": "1.0"} + result = filter_by_version(scene, cluster, self.stdio) self.assertEqual(result, 0) - def test_filter_by_version_no_version_in_steps(self): - # Test case where no version is specified in any steps - self.scene = [{"some_key": "some_value"}] - result = filter_by_version(self.scene, self.cluster, self.stdio) + def test_no_matching_version(self): + scene = [{"version": "[1.0,2.0]"}, {"version": "[2.0,3.0]"}] + cluster = {"version": "3.5"} + StringUtils.compare_versions_greater.return_value = False + result = filter_by_version(scene, cluster, self.stdio) self.assertEqual(result, -1) + def test_wildcard_min_version(self): + scene = [{"version": "[*,2.0]"}] + cluster = {"version": "1.0"} + StringUtils.compare_versions_greater.side_effect = [True, True] + result = filter_by_version(scene, cluster, self.stdio) + self.assertEqual(result, 0) + + def test_wildcard_max_version(self): + scene = [{"version": "[1.0,*]"}] + cluster = {"version": "3.0"} + StringUtils.compare_versions_greater.side_effect = [True, True] + result = filter_by_version(scene, cluster, self.stdio) + self.assertEqual(result, 0) + + @patch('common.scene.get_observer_version') + def test_get_observer_version(self, mock_get_observer_version): + mock_get_observer_version.return_value = "1.0.0" + result = get_version_by_type(self.context, "observer") + self.assertEqual(result, "1.0.0") + mock_get_observer_version.assert_called_once_with(self.context) + + @patch('common.scene.get_observer_version') + def test_get_other_version(self, mock_get_observer_version): + mock_get_observer_version.return_value = "2.0.0" + result = get_version_by_type(self.context, "other") + self.assertEqual(result, "2.0.0") + mock_get_observer_version.assert_called_once_with(self.context) + + @patch('common.scene.get_observer_version') + def test_get_observer_version_fail(self, mock_get_observer_version): + mock_get_observer_version.side_effect = Exception("Observer error") + with self.assertRaises(Exception) as context: + get_version_by_type(self.context, "observer") + self.assertIn("can't get observer version", str(context.exception)) + self.context.stdio.warn.assert_called_once() + + @patch('common.scene.get_obproxy_version') + def test_get_obproxy_version(self, mock_get_obproxy_version): + mock_get_obproxy_version.return_value = "3.0.0" + result = get_version_by_type(self.context, "obproxy") + self.assertEqual(result, "3.0.0") + mock_get_obproxy_version.assert_called_once_with(self.context) + + def test_unsupported_type(self): + with self.assertRaises(Exception) as context: + get_version_by_type(self.context, "unsupported") + self.assertIn("No support to get the version", str(context.exception)) + + @patch('common.scene.get_observer_version') + def test_general_exception_handling(self, mock_get_observer_version): + mock_get_observer_version.side_effect = Exception("Unexpected error") + with self.assertRaises(Exception) as context: + get_version_by_type(self.context, "observer") + self.assertIn("can't get observer version", str(context.exception)) + self.context.stdio.exception.assert_called_once() + -if __name__ == "__main__": +if __name__ == '__main__': unittest.main() From 256afc4c99ccd0e0e73bf1997c57cb14425bec9d Mon Sep 17 00:00:00 2001 From: xuyan wang <35394786+wayyoungboy@users.noreply.github.com> Date: Wed, 7 Aug 2024 11:10:23 +0800 Subject: [PATCH 10/10] Delete test/__init__.py --- test/__init__.py | 17 ----------------- 1 file changed, 17 deletions(-) delete mode 100644 test/__init__.py diff --git a/test/__init__.py b/test/__init__.py deleted file mode 100644 index 4f2405d9..00000000 --- a/test/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -#!/usr/bin/env python -# -*- coding: UTF-8 -* -# Copyright (c) 2022 OceanBase -# OceanBase Diagnostic Tool is licensed under Mulan PSL v2. -# You can use this software according to the terms and conditions of the Mulan PSL v2. -# You may obtain a copy of Mulan PSL v2 at: -# http://license.coscl.org.cn/MulanPSL2 -# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, -# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, -# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. -# See the Mulan PSL v2 for more details. - -""" -@time: 2022/6/20 -@file: __init__.py -@desc: -"""