-
Notifications
You must be signed in to change notification settings - Fork 3
/
local.py
95 lines (84 loc) · 4.52 KB
/
local.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# -*- coding: utf-8 -*-
"""
This file implements the core functionality to hunt for any sensitive files on the local file system.
"""
__author__ = "Lukas Reiter"
__license__ = "GPL v3.0"
__copyright__ = """Copyright 2018 Lukas Reiter
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
__version__ = 0.1
import os
import glob
import logging
import argparse
from datetime import datetime
from datetime import timezone
from database.model import Path
from database.model import File
from database.model import HunterType
from hunters.modules.core import BaseSensitiveFileHunter
# Module-level logger used by the hunter defined in this file.
# NOTE(review): the logger is named 'nfs' although this module enumerates the local file system -
# confirm whether the name is intentional before renaming it (logging configuration may depend on it).
logger = logging.getLogger('nfs')
class LocalSensitiveFileHunter(BaseSensitiveFileHunter):
    """
    This class implements the core functionality to hunt for files on the local file system.
    """

    def __init__(self, args: argparse.Namespace, **kwargs):
        """
        Initializes the hunter for the local file system.
        :param args: The parsed command line arguments; args.path holds the directories to enumerate.
        :param kwargs: Additional keyword arguments that are passed on to the base class.
        """
        super().__init__(args, address="127.0.0.1", service_name=HunterType.local, **kwargs)
        # All directories are stored as absolute paths.
        self.path = [os.path.abspath(item) for item in args.path]

    @staticmethod
    def add_argparse_arguments(parser: argparse.ArgumentParser) -> None:
        """
        This method initializes command line arguments that are required by the current module.
        :param parser: The argument parser to which the required command line arguments shall be added.
        :return:
        """
        BaseSensitiveFileHunter.add_argparse_arguments(parser)
        parser.add_argument('path', nargs="+", help='directories to enumerate')
        parser.add_argument('--domains', type=str, nargs="*", metavar="USERDOMAIN",
                            help='the name of the domain name of existing microsoft active directories. if specified, '
                                 'then the specified values become additional file content matching rules with '
                                 'search pattern: "USERDOMAIN[/\\]\\w+". the objective is the identification domain '
                                 'user names in files.')

    def _enumerate(self) -> None:
        """
        This method enumerates all files below the configured directories.

        Each regular file is wrapped in a Path object. If is_file_size_below_threshold accepts the file, then
        its content is read and the Path object is put on self.file_queue. Otherwise, non-empty files obtain a
        placeholder content and only their path name is analyzed via _analyze_path_name.

        Files that cannot be inspected or read (e.g., due to missing permissions or because they disappeared
        during the enumeration) are logged and skipped, so that a single file does not abort the enumeration.
        :return:
        """
        for root in self.path:
            root = root if root.endswith("/") else root + "/"
            # Escape glob meta characters ("*", "?", "[") in the directory name; otherwise, directories
            # containing them would be interpreted as wildcard patterns and not be enumerated.
            # NOTE(review): the "**" pattern does not match entries starting with a dot (e.g., ".ssh");
            # glob only includes them with include_hidden=True (Python 3.11+) - confirm whether hidden
            # files shall be hunted as well.
            pattern = glob.escape(root) + "**"
            for item in glob.iglob(pattern, recursive=True):
                # Directories and dangling symbolic links are skipped.
                if not os.path.isfile(item):
                    continue
                try:
                    stats = os.stat(item)
                except OSError:
                    # The file might have vanished or become inaccessible since it was listed.
                    logger.error("cannot stat file: {}".format(item), exc_info=self._args.verbose)
                    continue
                # NOTE(review): on Unix, st_ctime is the time of the last metadata change and not the
                # creation time - confirm that this is acceptable for creation_time.
                path = Path(service=self.service,
                            full_path=item,
                            access_time=datetime.fromtimestamp(stats.st_atime, tz=timezone.utc),
                            modified_time=datetime.fromtimestamp(stats.st_mtime, tz=timezone.utc),
                            creation_time=datetime.fromtimestamp(stats.st_ctime, tz=timezone.utc))
                if self.is_file_size_below_threshold(path, stats.st_size):
                    try:
                        with open(item, "rb") as file:
                            content = file.read()
                    except OSError:
                        # Catch I/O exceptions, e.g., if the current user does not have read permission
                        # on a certain file
                        logger.error("cannot read file: {}".format(str(path)), exc_info=self._args.verbose)
                    else:
                        path.file = File(content=content)
                        # Add file to queue
                        self.file_queue.put(path)
                elif stats.st_size > 0:
                    # The content is not imported; a placeholder text documents the reason instead.
                    path.file = File(content="[file ({}) not imported as file size ({}) "
                                             "is above threshold]".format(str(path), stats.st_size).encode('utf-8'))
                    path.file.size_bytes = stats.st_size
                    relevance = self._analyze_path_name(path)
                    if self._args.debug and not relevance:
                        logger.debug("ignoring file (threshold: above, size: {}): {}".format(stats.st_size,
                                                                                             str(path)))