Skip to content

Commit

Permalink
Merge pull request #1068 from wazuh/add-mock-decorator
Browse files Browse the repository at this point in the history
Improve stop/start behavior of DB's related functions
  • Loading branch information
BraulioV authored Feb 16, 2021
2 parents 68f04cd + b5c8122 commit 6af9382
Show file tree
Hide file tree
Showing 10 changed files with 58 additions and 155 deletions.
39 changes: 38 additions & 1 deletion deps/wazuh_testing/wazuh_testing/vulnerability_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
# This program is free software; you can redistribute it and/or modify it under the terms of GPLv2

import datetime
import functools
import json
import os
import re
import sqlite3
import random
from time import time, sleep

from wazuh_testing.tools import WAZUH_PATH
from wazuh_testing.tools import WAZUH_PATH, LOG_FILE_PATH
from wazuh_testing.tools import file
from wazuh_testing.tools.services import control_service, check_if_process_is_running

Expand Down Expand Up @@ -98,6 +99,42 @@
"اختبار"]


def mock_cve_db(func):
"""Decorator used in any function that needs to mock cve.db
This function will execute `func` after stopping wazuh-modulesd and wazuh-db and cleaning the db. After that,
it will start the daemons again, clean the logs, etc.
Args:
func (callable): function that will mock the cve.db
Example:
@vd.mock_cve_db
def mock_vulnerability_scan(request, mock_agent):
"""
@functools.wraps(func)
def magic(*args, **kwargs):
control_service('stop', daemon='wazuh-modulesd')
control_service('stop', daemon='wazuh-db')

# Clean tables
clean_vd_tables(agent=kwargs['mock_agent'])

func(*args, **kwargs)

# Truncate ossec.log
file.truncate_file(LOG_FILE_PATH)

control_service('start', daemon='wazuh-modulesd')
control_service('start', daemon='wazuh-db')

yield kwargs['request'].param

clean_vuln_and_sys_programs_tables(agent=kwargs['mock_agent'])

return magic


def callback_detect_vulnerability_scan_sleeping(line):
msg = rf'{VULNERABILITY_DETECTOR_PREFIX} Sleeping for (.*)...'
match = re.match(msg, line)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,14 @@
# Created by Wazuh, Inc. <info@wazuh.com>.
# This program is free software; you can redistribute it and/or modify it under the terms of GPLv2

import json
import os
from time import sleep

import pytest

from wazuh_testing.tools import LOG_FILE_PATH
from wazuh_testing.tools.configuration import load_wazuh_configurations
from wazuh_testing.tools.monitoring import FileMonitor
from wazuh_testing.tools import file
from wazuh_testing import vulnerability_detector as vd
from wazuh_testing.tools.services import control_service

# Marks
pytestmark = pytest.mark.tier(level=0)
Expand Down Expand Up @@ -47,17 +44,11 @@ def get_configuration(request):


@pytest.fixture(scope='module', params=debian_vulnerabilities, ids=debian_data_ids)
@vd.mock_cve_db
def mock_vulnerability_scan(request, mock_agent):
"""
It allows to mock the vulnerability scan inserting custom packages, feeds and changing the host system
"""

control_service('stop', daemon='wazuh-modulesd')
control_service('stop', daemon='wazuh-db')

# Clean tables
vd.clean_vd_tables(agent=mock_agent)

# Mock system
vd.modify_system(agent_id=mock_agent, os_name=request.param['os_name'], os_major=request.param['os_major'],
os_minor=request.param['os_minor'], name=vd.MOCKED_AGENT_NAME)
Expand All @@ -68,15 +59,6 @@ def mock_vulnerability_scan(request, mock_agent):
vd.insert_vulnerability(**vulnerability['cve'], package=vulnerability['package']['name'],
target=request.param['target'])

control_service('start', daemon='wazuh-db')
control_service('start', daemon='wazuh-modulesd')

file.truncate_file(LOG_FILE_PATH)

yield request.param

vd.clean_vuln_and_sys_programs_tables(agent=mock_agent)


def test_debian_vulnerabilities_report(get_configuration, configure_environment, restart_modulesd,
mock_vulnerability_scan):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,14 @@
# Created by Wazuh, Inc. <info@wazuh.com>.
# This program is free software; you can redistribute it and/or modify it under the terms of GPLv2

import json
import os
from time import sleep

import pytest
import wazuh_testing.vulnerability_detector as vd
from wazuh_testing.tools import LOG_FILE_PATH
from wazuh_testing.tools.configuration import load_wazuh_configurations
from wazuh_testing.tools.monitoring import FileMonitor
from wazuh_testing.tools import file
from wazuh_testing.tools.services import control_service

# Marks
pytestmark = pytest.mark.tier(level=0)
Expand Down Expand Up @@ -47,16 +44,11 @@ def get_configuration(request):


@pytest.fixture(scope='module', params=macos_vulnerabilities, ids=macos_systems)
@vd.mock_cve_db
def mock_vulnerability_scan(request, mock_agent):
"""
It allows to mock the vulnerability scan inserting custom packages, feeds and changing the host system
"""
control_service('stop', daemon='wazuh-modulesd')
control_service('stop', daemon='wazuh-db')

# Clean tables
vd.clean_vd_tables(agent=mock_agent)

# Mock system
vd.modify_system(agent_id=mock_agent, os_name=request.param['os_name'], os_major=request.param['os_major'],
os_minor=request.param['os_minor'], name=vd.MOCKED_AGENT_NAME,
Expand All @@ -70,16 +62,6 @@ def mock_vulnerability_scan(request, mock_agent):
for vulnerability in request.param['vulnerabilities']:
vd.insert_package(**vulnerability['package'], agent=mock_agent, source=vulnerability['package']['name'])

control_service('start', daemon='wazuh-db')
control_service('start', daemon='wazuh-modulesd')

# Truncate ossec.log
file.truncate_file(LOG_FILE_PATH)

yield request.param

vd.clean_vuln_and_sys_programs_tables(agent=mock_agent)


def test_macos_vulnerabilities_report(get_configuration, configure_environment, restart_modulesd,
mock_vulnerability_scan):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@
# This program is free software; you can redistribute it and/or modify it under the terms of GPLv2

import os
from time import sleep

import pytest

import wazuh_testing.vulnerability_detector as vd
from wazuh_testing.tools import LOG_FILE_PATH
from wazuh_testing.tools.configuration import load_wazuh_configurations
from wazuh_testing.tools.monitoring import FileMonitor
from wazuh_testing.tools import file
from wazuh_testing.tools.services import control_service

# Marks
pytestmark = pytest.mark.tier(level=1)
Expand Down Expand Up @@ -60,17 +58,12 @@ def get_configuration(request):


@pytest.fixture(scope='module', params=system_data, ids=system_data_ids)
@vd.mock_cve_db
def mock_vulnerability_scan(request, mock_agent):
"""
It allows to mock the vulnerability scan inserting custom hotfixes, feeds and changing the host system
"""
control_service('stop', daemon='wazuh-modulesd')
control_service('stop', daemon='wazuh-db')

vd.clean_vd_tables(agent=mock_agent)

# Modify the necessary databases. The arch follows a special format rather than the
# usual x86_64.
# Modify the necessary databases. The arch follows a special format rather than the usual x86_64.
vd.modify_system(agent_id=mock_agent, os_name=request.param['os_name'], os_major=request.param['os_major'],
os_minor=request.param['os_minor'], name=vd.MOCKED_AGENT_NAME)

Expand All @@ -79,16 +72,6 @@ def mock_vulnerability_scan(request, mock_agent):
for patch in request.param["hotfixes"]:
vd.insert_hotfix(agent=mock_agent, hotfix=patch)

control_service('start', daemon='wazuh-db')
control_service('start', daemon='wazuh-modulesd')

# Truncate ossec.log
file.truncate_file(LOG_FILE_PATH)

yield request.param

vd.clean_vuln_and_sys_programs_tables(agent=mock_agent)


def is_hotfix_installed(cve_patch, dependencies, hotfixes):
"""
Expand Down Expand Up @@ -119,7 +102,7 @@ def is_hotfix_installed(cve_patch, dependencies, hotfixes):


def test_vulnerabilities_report(get_configuration, configure_environment, restart_modulesd,
mock_vulnerability_scan):
mock_vulnerability_scan, mock_agent):
"""
Check if a missing patch triggers a vulnerability(only windows).
"""
Expand All @@ -133,17 +116,16 @@ def test_vulnerabilities_report(get_configuration, configure_environment, restar
timeout=vd.VULN_DETECTOR_SCAN_TIMEOUT,
update_position=False,
callback=vd.make_vuln_callback(
f"Agent '000' has installed '{hotfix}' that corrects the vulnerability '{cve}'"
f"Agent '{mock_agent}' has installed '{hotfix}' that corrects the vulnerability '{cve}'"
),
error_message=f"Could not find the report which says that the patch {hotfix}" +
f" solves {cve}"
error_message=f"Could not find the report which says that the patch {hotfix} solves {cve}"
)
else:
wazuh_log_monitor.start(
timeout=vd.VULN_DETECTOR_SCAN_TIMEOUT,
update_position=False,
callback=vd.make_vuln_callback(
f"Agent '000' is vulnerable to '{cve}'. Condition: 'KB{hotfix} patch is not installed'"
f"Agent '{mock_agent}' is vulnerable to '{cve}'. Condition: 'KB{hotfix} patch is not installed'"
),
error_message=f"Could not find the report which says that the system" +
f" is vulnerable to {cve} due to missing {hotfix}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
# This program is free software; you can redistribute it and/or modify it under the terms of GPLv2

import os

import pytest

import wazuh_testing.vulnerability_detector as vd
from wazuh_testing.tools import LOG_FILE_PATH
from wazuh_testing.tools.configuration import load_wazuh_configurations
Expand Down Expand Up @@ -48,7 +48,7 @@ def get_configuration(request):
def test_redhat_duplicate_vulns(clean_vuln_tables, get_configuration, configure_environment, restart_modulesd):
"""
RedHat provider was duplicating vulnerabilities when it downloaded a feed to update the database.
This test check the vulnerabilites are not repeated in the database when it is update.
This test check the vulnerabilities are not repeated in the database when it is update.
"""
feed = get_configuration['metadata']['feed']
timestamp = '2020-10-31T20:46:48'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@
# This program is free software; you can redistribute it and/or modify it under the terms of GPLv2

import os
from time import sleep

import pytest

from wazuh_testing.tools import LOG_FILE_PATH
from wazuh_testing.tools.configuration import load_wazuh_configurations
from wazuh_testing.tools.monitoring import FileMonitor
from wazuh_testing.tools import file
from wazuh_testing import vulnerability_detector as vd
from wazuh_testing.tools.services import control_service

# Marks
pytestmark = pytest.mark.tier(level=0)
Expand Down Expand Up @@ -46,16 +44,11 @@ def get_configuration(request):


@pytest.fixture(scope='module', params=redhat_vulnerabilities, ids=redhat_data_ids)
@vd.mock_cve_db
def mock_vulnerability_scan(request, mock_agent):
"""
It allows to mock the vulnerability scan inserting custom packages, feeds and changing the host system
"""
control_service('stop', daemon='wazuh-modulesd')
control_service('stop', daemon='wazuh-db')

# Clean tables
vd.clean_vd_tables(agent=mock_agent)

# Mock system
vd.modify_system(agent_id=mock_agent, os_name=request.param['os_name'], os_major=request.param['os_major'],
os_minor=request.param['os_minor'], name=vd.MOCKED_AGENT_NAME)
Expand All @@ -66,15 +59,6 @@ def mock_vulnerability_scan(request, mock_agent):
vd.insert_vulnerability(**vulnerability['cve'], package=vulnerability['package']['name'],
target=request.param['target'])

control_service('start', daemon='wazuh-db')
control_service('start', daemon='wazuh-modulesd')

file.truncate_file(LOG_FILE_PATH)

yield request.param

vd.clean_vuln_and_sys_programs_tables(agent=mock_agent)


def test_redhat_vulnerabilities_report(get_configuration, configure_environment, restart_modulesd,
mock_vulnerability_scan):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@
# This program is free software; you can redistribute it and/or modify it under the terms of GPLv2

import os
from time import sleep

import pytest

import wazuh_testing.vulnerability_detector as vd
from wazuh_testing.tools import LOG_FILE_PATH
from wazuh_testing.tools.configuration import load_wazuh_configurations
from wazuh_testing.tools.monitoring import FileMonitor
from wazuh_testing.tools import file
from wazuh_testing.tools.services import control_service

# Marks
pytestmark = pytest.mark.tier(level=1)
Expand Down Expand Up @@ -83,16 +81,11 @@ def get_configuration(request):


@pytest.fixture(scope='module', params=system_data, ids=system_data_ids)
@vd.mock_cve_db
def mock_vulnerability_scan(request, mock_agent):
"""
It allows to mock the vulnerability scan inserting custom packages, feeds and changing the host system
"""
control_service('stop', daemon='wazuh-modulesd')
control_service('stop', daemon='wazuh-db')

# Clean tables
vd.clean_vd_tables(agent=mock_agent)

# Mock system
vd.modify_system(agent_id=mock_agent, os_name=request.param['os_name'], os_major=request.param['os_major'],
os_minor=request.param['os_minor'], name=vd.MOCKED_AGENT_NAME)
Expand All @@ -107,16 +100,6 @@ def mock_vulnerability_scan(request, mock_agent):
vd.insert_package(**vulnerability['package'], source=vulnerability['package']['name'],
format=request.param['format'], agent=mock_agent)

control_service('start', daemon='wazuh-db')
control_service('start', daemon='wazuh-modulesd')

# Truncate ossec.log
file.truncate_file(LOG_FILE_PATH)

yield request.param

vd.clean_vuln_and_sys_programs_tables(agent=mock_agent)


def test_vulnerabilities_report(get_configuration, configure_environment, restart_modulesd,
mock_vulnerability_scan):
Expand Down
Loading

0 comments on commit 6af9382

Please sign in to comment.