Binary Search: show matching strings #1227

Closed
wants to merge 8 commits
70 changes: 53 additions & 17 deletions src/helperFunctions/yara_binary_search.py
@@ -1,7 +1,7 @@
 from __future__ import annotations
 
+import re
 import subprocess
-from os.path import basename
 from pathlib import Path
 from subprocess import PIPE, STDOUT, CalledProcessError
 from tempfile import NamedTemporaryFile
@@ -37,7 +37,8 @@ def _execute_yara_search(self, rule_file_path: str, target_path: str | None = No
         :return: The output from the yara scan.
         """
         compiled_flag = '-C' if Path(rule_file_path).read_bytes().startswith(b'YARA') else ''
-        command = f'yara -r {compiled_flag} {rule_file_path} {target_path or self.db_path}'
+        # -r: recursive, -s: print strings, -N: no follow symlinks
+        command = f'yara -r -s -N {compiled_flag} {rule_file_path} {target_path or self.db_path}'
         yara_process = subprocess.run(command, shell=True, stdout=PIPE, stderr=STDOUT, text=True, check=False)
         return yara_process.stdout
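Note: the command is assembled with an f-string and run with shell=True, so paths containing spaces or shell metacharacters could break the call. A minimal sketch of an equivalent shell-free invocation, assuming the same flags and attributes (not part of this diff):

    flags = ['-r', '-s', '-N']  # recursive, print matching strings, don't follow symlinks
    if Path(rule_file_path).read_bytes().startswith(b'YARA'):
        flags.append('-C')  # the rule file is pre-compiled
    yara_process = subprocess.run(
        ['yara', *flags, rule_file_path, str(target_path or self.db_path)],
        stdout=PIPE, stderr=STDOUT, text=True, check=False,
    )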

@@ -50,24 +51,61 @@ def _get_file_paths_of_files_included_in_fw(self, fw_uid: str) -> list[str]:
         return [self.fs_organizer.generate_path_from_uid(uid) for uid in self.db.get_all_files_in_fw(fw_uid)]
 
     @staticmethod
-    def _parse_raw_result(raw_result: str) -> dict[str, list[str]]:
+    def _parse_raw_result(
+        raw_result: str, match_limit: int = 20, match_len_limit: int = 50
+    ) -> dict[str, dict[str, list[dict]]]:
         """
+        YARA scan results have the following structure:
+        <rule_name> <matching_file_path>
+        <offset>:<condition>: <matching_string>
+        <offset>:<condition>: <matching_string>
+        ...
+        <rule_name> <matching_file_path>
+        ...
+
+        We parse the results and put them into a dictionary of the following form:
+        {
+            <uid:str>: {
+                <rule:str>: [
+                    {
+                        "offset": <offset in hex:str>,
+                        "condition": <condition name:str>,
+                        "match": <matching string:str>,
+                    },
+                    ... (max match_limit)
+                ]
+            },
+            ...
+        }
+
         :param raw_result: raw yara scan result
-        :return: dict of matching rules with lists of matched UIDs as values
+        :param match_limit: maximum number of stored strings per rule
+        :param match_len_limit: maximum length of stored strings
+        :return: dict of matching files, rules and strings
         """
         results = {}
-        for line in raw_result.split('\n'):
-            if line and 'warning' not in line:
-                rule, match = line.split(' ')
-                results.setdefault(rule, []).append(basename(match))  # noqa: PTH119
+        for result_str in re.findall(
+            # <rule_name> <path> <offset> <condition> <string>
+            r'[a-zA-Z_][a-zA-Z0-9_]+ [^\n]+\n(?:0x[0-9a-f]+:\$[a-zA-Z0-9_]+: .+\n)+',
+            raw_result,
+        ):
+            rule_str, *match_lines = result_str.splitlines()
+            rule, path_str = rule_str.split(' ', maxsplit=1)
+            uid = Path(path_str).name
+            results.setdefault(uid, {}).setdefault(rule, [])
+            for match_line in match_lines:
+                offset, condition, match_str = match_line.split(':', maxsplit=2)
+                match_str = match_str[1:]  # remove the space at the beginning
+                if len(match_str) > match_len_limit:
+                    match_str = match_str[:match_len_limit] + '...'
+                results[uid][rule].append({'offset': offset, 'condition': condition, 'match': match_str})
+                if len(results[uid][rule]) >= match_limit:
+                    # only collect at most <match_limit> matching strings to avoid storing loads of unnecessary data
+                    # in case of very general rules with lots of matches
+                    break
         return results
 
-    @staticmethod
-    def _eliminate_duplicates(result_dict: dict[str, list[str]]):
-        for key in result_dict:
-            result_dict[key] = sorted(set(result_dict[key]))
-
-    def get_binary_search_result(self, task: tuple[bytes, str | None]) -> dict[str, list[str]] | str:
+    def get_binary_search_result(self, task: tuple[bytes, str | None]) -> dict[str, dict[str, list[dict]]] | str:
         """
         Perform a yara search on the files in the database.
 
@@ -80,9 +118,7 @@ def get_binary_search_result(self, task: tuple[bytes, str | None]) -> dict[str,
         try:
             self._prepare_temp_rule_file(temp_rule_file, yara_rules)
             raw_result = self._get_raw_result(firmware_uid, temp_rule_file)
-            results = self._parse_raw_result(raw_result)
-            self._eliminate_duplicates(results)
-            return results
+            return self._parse_raw_result(raw_result)
         except yara.SyntaxError as yara_error:
            return f'There seems to be an error in the rule file:\n{yara_error}'
         except CalledProcessError as process_error:
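Taken together, a successful search now returns nested match details instead of the previous flat rule-to-UID mapping. A hypothetical call for illustration (UID and offset values invented):

    scanner = YaraBinarySearchScanner()
    rule = b'rule test_rule {strings: $a = "test" condition: $a}'
    result = scanner.get_binary_search_result((rule, None))  # None: search all files in the database
    # e.g. {'<uid>': {'test_rule': [{'offset': '0x17', 'condition': '$a', 'match': 'test'}]}}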
4 changes: 2 additions & 2 deletions src/intercom/back_end_binding.py
@@ -200,8 +200,8 @@ class InterComBackEndBinarySearchTask(InterComListenerAndResponder):

     def get_response(self, task):
         yara_binary_searcher = YaraBinarySearchScanner()
-        uid_list = yara_binary_searcher.get_binary_search_result(task)
-        return uid_list, task
+        search_result = yara_binary_searcher.get_binary_search_result(task)
+        return search_result, task
 
 
 class InterComBackEndDeleteFile(InterComListenerAndResponder):
7 changes: 6 additions & 1 deletion src/storage/db_interface_frontend.py
@@ -37,6 +37,7 @@ class MetaEntry(NamedTuple):
 class CachedQuery(NamedTuple):
     query: str
     yara_rule: str
+    match_data: dict[str, dict[str, list[dict]]] | None
 
 
 class FrontEndDbInterface(DbInterfaceCommon):
@@ -369,7 +370,11 @@ def get_query_from_cache(self, query_id: str) -> CachedQuery | None:
             entry: SearchCacheEntry = session.get(SearchCacheEntry, query_id)
             if entry is None:
                 return None
-            return CachedQuery(query=entry.query, yara_rule=entry.yara_rule)
+            return CachedQuery(
+                query=entry.query,
+                yara_rule=entry.yara_rule,
+                match_data=entry.match_data,
+            )
 
     def get_total_cached_query_count(self):
         with self.get_read_only_session() as session:
9 changes: 7 additions & 2 deletions src/storage/db_interface_frontend_editing.py
@@ -17,12 +17,17 @@ def delete_comment(self, uid, timestamp):
             fo_entry: FileObjectEntry = session.get(FileObjectEntry, uid)
             fo_entry.comments = [comment for comment in fo_entry.comments if comment['time'] != timestamp]
 
-    def add_to_search_query_cache(self, search_query: str, query_title: str | None = None) -> str:
+    def add_to_search_query_cache(self, search_query: str, match_data: dict, query_title: str | None = None) -> str:
         query_uid = create_uid(query_title.encode())
         with self.get_read_write_session() as session:
             old_entry = session.get(SearchCacheEntry, query_uid)
             if old_entry is not None:  # update existing entry
                 session.delete(old_entry)
-            new_entry = SearchCacheEntry(uid=query_uid, query=search_query, yara_rule=query_title)
+            new_entry = SearchCacheEntry(
+                uid=query_uid,
+                query=search_query,
+                yara_rule=query_title,
+                match_data=match_data,
+            )
             session.add(new_entry)
         return query_uid
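Worth noting for reviewers: `query_uid` is derived from `query_title` (the YARA rule text) via `create_uid`, so re-submitting the same rule replaces the cached entry, including its `match_data`, rather than creating a duplicate; this is exactly what `test_search_cache_update` further below exercises.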
@@ -0,0 +1,27 @@
"""Added matching strings to binary search cache

Revision ID: 81a549a2be95
Revises: 05d8effce8b3
Create Date: 2024-06-24 17:00:37.464098

"""
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = '81a549a2be95'
down_revision = '05d8effce8b3'
branch_labels = None
depends_on = None


def upgrade() -> None:
op.add_column(
'search_cache',
sa.Column('match_data', postgresql.JSONB(astext_type=sa.Text()), nullable=True),
)


def downgrade() -> None:
op.drop_column('search_cache', 'match_data')
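Since the new `match_data` column is nullable, existing `search_cache` rows remain valid after upgrading; queries cached before this change simply carry no stored match strings. Applying it should work through FACT's usual Alembic workflow (presumably `alembic upgrade head`), and downgrading just drops the column again.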
1 change: 1 addition & 0 deletions src/storage/schema.py
@@ -171,6 +171,7 @@ class SearchCacheEntry(Base):
     uid = mapped_column(UID, primary_key=True)
     query = mapped_column(VARCHAR, nullable=False)  # the query that searches for the files that the YARA rule matched
     yara_rule = mapped_column(VARCHAR, nullable=False)
+    match_data = mapped_column(MutableDict.as_mutable(JSONB), nullable=True)
 
 
 class WebInterfaceTemplateEntry(Base):
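The `MutableDict.as_mutable(JSONB)` wrapper makes SQLAlchemy track in-place mutations of the stored dict, so match data could later be updated without reassigning the whole attribute. A sketch of what this enables (hypothetical usage, not part of this PR):

    with self.get_read_write_session() as session:
        entry = session.get(SearchCacheEntry, query_uid)
        entry.match_data['added_uid'] = {'some_rule': []}  # in-place change is detected and flushed on commit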
12 changes: 7 additions & 5 deletions src/test/integration/storage/test_db_interface_frontend.py
@@ -568,28 +568,30 @@ def test_get_tag_list(frontend_db, backend_db):
 def test_get_query_from_cache(frontend_db, frontend_editing_db):
     assert frontend_db.get_query_from_cache('non-existent') is None
 
-    id_ = frontend_editing_db.add_to_search_query_cache('foo', 'bar')
+    match_data = {'uid': {'rule': []}}
+    id_ = frontend_editing_db.add_to_search_query_cache('foo', match_data, 'bar')
     entry = frontend_db.get_query_from_cache(id_)
     assert isinstance(entry, CachedQuery)
     assert entry.query == 'foo'
     assert entry.yara_rule == 'bar'
+    assert entry.match_data == match_data
 
 
 def test_get_cached_count(frontend_db, frontend_editing_db):
     assert frontend_db.get_total_cached_query_count() == 0
 
-    frontend_editing_db.add_to_search_query_cache('foo', 'bar')
+    frontend_editing_db.add_to_search_query_cache('foo', {}, 'bar')
     assert frontend_db.get_total_cached_query_count() == 1
 
-    frontend_editing_db.add_to_search_query_cache('bar', 'foo')
+    frontend_editing_db.add_to_search_query_cache('bar', {}, 'foo')
     assert frontend_db.get_total_cached_query_count() == 2  # noqa: PLR2004
 
 
 def test_search_query_cache(frontend_db, frontend_editing_db):
     assert frontend_db.search_query_cache(offset=0, limit=10) == []
 
-    id1 = frontend_editing_db.add_to_search_query_cache('foo', 'rule bar{}')
-    id2 = frontend_editing_db.add_to_search_query_cache('bar', 'rule foo{}')
+    id1 = frontend_editing_db.add_to_search_query_cache('foo', {}, 'rule bar{}')
+    id2 = frontend_editing_db.add_to_search_query_cache('bar', {}, 'rule foo{}')
     assert sorted(frontend_db.search_query_cache(offset=0, limit=10)) == [
         (id1, 'rule bar{}', ['bar']),
         (id2, 'rule foo{}', ['foo']),
@@ -35,18 +35,20 @@ def test_search_cache_insert(frontend_editing_db, frontend_db):
     result = frontend_db.get_query_from_cache(RULE_UID)
     assert result is None
 
-    result = frontend_editing_db.add_to_search_query_cache('{"foo": "bar"}', 'rule foo{}')
+    match_data = {'some_uid': {'foo': []}}
+    result = frontend_editing_db.add_to_search_query_cache('{"foo": "bar"}', match_data, 'rule foo{}')
     assert result == RULE_UID
 
     result = frontend_db.get_query_from_cache(RULE_UID)
     assert isinstance(result, CachedQuery)
     assert result.query == '{"foo": "bar"}'
     assert result.yara_rule == 'rule foo{}'
+    assert result.match_data == match_data
 
 
 def test_search_cache_update(frontend_editing_db, frontend_db):
-    assert frontend_editing_db.add_to_search_query_cache('{"uid": "some uid"}', 'rule foo{}') == RULE_UID
+    assert frontend_editing_db.add_to_search_query_cache('{"uid": "some uid"}', {}, 'rule foo{}') == RULE_UID
     # update
-    assert frontend_editing_db.add_to_search_query_cache('{"uid": "some other uid"}', 'rule foo{}') == RULE_UID
+    assert frontend_editing_db.add_to_search_query_cache('{"uid": "some other uid"}', {}, 'rule foo{}') == RULE_UID
 
     assert frontend_db.get_query_from_cache(RULE_UID).query == '{"uid": "some other uid"}'
45 changes: 36 additions & 9 deletions src/test/unit/helperFunctions/test_yara_binary_search.py
@@ -12,6 +12,7 @@
 TEST_FILE_1 = 'binary_search_test'
 TEST_FILE_2 = 'binary_search_test_2'
 TEST_FILE_3 = 'binary_search_test_3'
+MATCH_DATA_KEYS = {'condition', 'match', 'offset'}
 
 
 class MockCommonDbInterface:
@@ -37,12 +38,20 @@ def setUp(self):

     def test_get_binary_search_result(self):
         result = self.yara_binary_scanner.get_binary_search_result((self.yara_rule, None))
-        assert result == {'test_rule': [TEST_FILE_1]}
+        assert TEST_FILE_1 in result
+        assert 'test_rule' in result[TEST_FILE_1]
+        match_data = result[TEST_FILE_1]['test_rule']
+        assert len(match_data) == 1
+        assert all(k in m for k in MATCH_DATA_KEYS for m in match_data)
 
     def test_get_binary_search_result_for_single_firmware(self):
         yara_rule = b'rule test_rule_2 {strings: $a = "TEST_STRING!" condition: $a}'
         result = self.yara_binary_scanner.get_binary_search_result((yara_rule, 'single_firmware'))
-        assert result == {'test_rule_2': [TEST_FILE_2]}
+        assert TEST_FILE_2 in result
+        assert 'test_rule_2' in result[TEST_FILE_2]
+        match_data = result[TEST_FILE_2]['test_rule_2']
+        assert len(match_data) == 1
+        assert all(k in m for k in MATCH_DATA_KEYS for m in match_data)
 
         result = self.yara_binary_scanner.get_binary_search_result((yara_rule, 'foobar'))
         assert result == {}
@@ -58,15 +67,33 @@ def test_get_binary_search_yara_error(self, _):  # noqa: PT019
         assert isinstance(result, str)
         assert 'Error when calling YARA' in result
 
-    def test_eliminate_duplicates(self):
-        test_dict = {1: [1, 2, 3, 3], 2: [1, 1, 2, 3]}
-        self.yara_binary_scanner._eliminate_duplicates(test_dict)
-        assert test_dict == {1: [1, 2, 3], 2: [1, 2, 3]}
-
     def test_parse_raw_result(self):
-        raw_result = 'rule_1 match_1\nrule_1 match_2\nrule_2 match_1'
+        raw_result = (
+            'rule_1 /media/data/fact_fw_data/00/uid1\n'
+            '0x123:$a: foo\n'
+            '0x456:$a: bar\n'
+            'rule_1 /media/data/fact_fw_data/99/uid2\n'
+            '0x321:$b: test123\n'
+            'rule_2 /media/data/fact_fw_data/00/uid1\n'
+            '0x666:$c: deadbeef\n'
+        )
         result = self.yara_binary_scanner._parse_raw_result(raw_result)
-        assert result == {'rule_1': ['match_1', 'match_2'], 'rule_2': ['match_1']}
+        assert result == {
+            'uid1': {
+                'rule_1': [
+                    {'condition': '$a', 'match': 'foo', 'offset': '0x123'},
+                    {'condition': '$a', 'match': 'bar', 'offset': '0x456'},
+                ],
+                'rule_2': [
+                    {'condition': '$c', 'match': 'deadbeef', 'offset': '0x666'},
+                ],
+            },
+            'uid2': {
+                'rule_1': [
+                    {'condition': '$b', 'match': 'test123', 'offset': '0x321'},
+                ],
+            },
+        }
 
     def test_execute_yara_search(self):
         test_rule_path = path.join(get_test_data_dir(), 'yara_binary_search_test_rule')  # noqa: PTH118
2 changes: 1 addition & 1 deletion src/test/unit/web_interface/test_app_binary_search.py
@@ -22,7 +22,7 @@ def add_to_search_query_cache(*_, **__):
     @staticmethod
     def get_query_from_cache(query_id):
         if query_id == QUERY_CACHE_UID:
-            return CachedQuery(query='{"uid": {"$in": ["test_uid"]}}', yara_rule='some yara rule')
+            return CachedQuery(query='{"uid": {"$in": ["test_uid"]}}', yara_rule='some yara rule', match_data={})
         return None
 
 
7 changes: 0 additions & 7 deletions src/test/unit/web_interface/test_app_jinja_filter.py
@@ -1,7 +1,6 @@
 import pytest
 from flask import render_template_string
 
-from storage.db_interface_frontend import MetaEntry
 from web_interface.components.jinja_filter import FilterClass
 
 
@@ -19,12 +18,6 @@ def test_filter_replace_uid_with_file_name(self, web_frontend, filter_class):
         result = _get_template_filter_output(web_frontend, test_string, 'replace_uid_with_file_name')
         assert '>test_name<' in result
 
-    def test_filter_firmware_detail_tabular_field(self, web_frontend, filter_class):
-        test_firmware_meta_data = MetaEntry('UID', 'HID', {'tag1': 'danger', 'tag2': 'default'}, 0)
-        result = _get_template_filter_output(web_frontend, test_firmware_meta_data, 'firmware_detail_tabular_field')
-        for expected_part in ['/analysis/UID', 'HID', 'tag1<', 'tag2<']:
-            assert expected_part in result
-
     def test_filter_replace_uid_with_hid(self, filter_class):
         one_uid = f'{"a" * 64}_1234'
         assert filter_class._filter_replace_uid_with_hid(f'{one_uid}_{one_uid}') == 'TEST_FW_HID_TEST_FW_HID'