-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmodel.py
345 lines (294 loc) · 14.9 KB
/
model.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
import paramiko
from paramiko.ssh_exception import *
import logging
import sys
import socket
from ipaddress import IPv4Network
import re
import subprocess
import tempfile
import os
import pickle
import util
# Module-level logger; every log record emitted by this file goes through the "recon.model" logger.
logger = logging.getLogger("recon.model")
# paramiko log level can be set to DEBUG for more detailed logging, otherwise it'll follow the root logger level
# logging.getLogger("paramiko").setLevel(logging.DEBUG)
# Alias for "list of strings", used to annotate the HostInfo list attributes.
strList = list[str]
class ReCon:
    """
    Remote reconnaissance and management of a Windows host over SSH.

    The first connection to a host is made with paramiko using a username and
    password. A throw-away RSA key is then deployed to the host so that the
    external ``ssh`` program can be used for tunnels and interactive consoles.

    Attributes:
        HOST_DATA_PATH (str): The file path used to persist the host pool.
        host_pool (dict): Known hosts keyed by address, restored from disk on start.
        current_host (HostInfo | None): The host of the most recent successful connection.
        rsa_key: The most recently deployed paramiko RSA key, or None.

    Public methods:
        bind(event, handler): Binds an event name to a handler function.
        start(): Emits the "initialized" event.
        setup(host, username, password): Connects to the host and loads its consoles and networks.
        execute_command(command): Executes a command on the connected host.
        clear_consoles() / enumerate_consoles(): Manage the serial console list.
        clear_local_networks() / enumerate_local_networks(): Manage the local network list.
        clear_nodes() / enumerate_nodes(): Manage the list of reachable nodes.
        toggle_https_tunnel(): Opens or closes the port-443 tunnels to the nodes.
        spawn_console(console): Opens a serial console in a new window.
        spawn_shell(): Opens a PowerShell session in a new window.
        stop(): Closes connections and cleans up temporary resources.

    Events passed to bound handlers:
        "initialized", "activity" (msg), "host_establishment" (is_ok[, error]),
        "consoles_loaded" (consoles), "local_networks_loaded" (networks),
        "node_found" (node), "nodes_loaded", "tunnel_established" (port_mapping),
        "tunnel_closed".
    """
    HOST_DATA_PATH = util.get_dyn_path("user/hosts.dat")

    # Every spawned console window title starts with this prefix. stop() relies on
    # it to find and kill those windows, so both sides must use the same constant.
    _CONSOLE_TITLE_PREFIX = "[ReCon]sole"

    # Extracts (ip, subnet mask) pairs from English-language `ipconfig` output.
    _IPCONFIG_PATTERN = re.compile(
        r"IPv4 Address\D+(\d+\.\d+\.\d+\.\d+)\s+Subnet Mask\D+(\d+\.\d+\.\d+\.\d+)"
    )

    class HostInfo:
        """
        Represents information about a host.

        Instances of this class are added to host_pool and pickled to the disk.

        Attributes:
            address (str): The address of the host.
            username (str): The username associated with the host.
            active (bool): Indicates if the host is active.
            consoles (list): Captions of the serial consoles found on the host.
            networks (list): Local networks of the host, stored as IPv4Network objects.
            nodes (list): Addresses of the nodes that answered a ping from the host.
        """

        def __init__(self, address: str, username: str):
            self.address: str = address
            self.username: str = username
            self.active: bool = True
            self.consoles: strList = []
            self.networks: list[IPv4Network] = []
            self.nodes: strList = []

    def __init__(self):
        """Create the SSH client, reset all state and restore the saved host pool."""
        # For some of SSH connections we'll use paramiko and for some we'll use subprocess and ssh util.
        self._ssh_client = paramiko.SSHClient()
        self._sftp_client = None
        self._ssh_tunnel_proc = None
        # After connecting to a host using credentials, we'll generate a key and
        # deploy it to the host for the further operations.
        self._key_file: str | None = None
        self.rsa_key = None
        # event handlers are stored in a dictionary with event names as keys and handler functions as values
        self._event_handlers: dict[str, callable] = {}
        # Each host info is saved to the disk and restored when the tool is started
        # again, so recent host connections are loaded automatically.
        # A host is added to the host_pool when it is connected successfully.
        self.host_pool: dict[str, "ReCon.HostInfo"] = {}
        # as there may be multiple hosts in the pool, we'll keep track of the current host.
        self.current_host = None
        # prompt is to ensure that we're connected to the right host
        self._prompt = ""
        # try to restore host data from the disk if exists
        self._restore_host_data()

    def _restore_host_data(self):
        """
        Restore the host pool from the pickled file, if it exists.

        An unreadable or corrupt file is logged and ignored so that the tool can
        still start with an empty host pool.
        """
        if not os.path.isfile(self.HOST_DATA_PATH):
            return
        # NOTE(security): pickle can execute arbitrary code while loading. This is
        # only acceptable as long as the file is written exclusively by this tool.
        try:
            with open(self.HOST_DATA_PATH, 'rb') as file:
                restored = pickle.load(file)
        except (OSError, EOFError, pickle.UnpicklingError, AttributeError, ImportError) as exc:
            logger.warning("Ignoring unreadable host data file %s: %s", self.HOST_DATA_PATH, exc)
            return
        if isinstance(restored, dict):
            self.host_pool = restored
        else:
            logger.warning("Ignoring host data file %s: unexpected content", self.HOST_DATA_PATH)

    def _save_host_data(self):
        """Pickle the host pool to HOST_DATA_PATH, creating the directory if needed."""
        os.makedirs(os.path.dirname(self.HOST_DATA_PATH), exist_ok=True)
        with open(self.HOST_DATA_PATH, 'wb') as file:
            pickle.dump(self.host_pool, file)

    def _call_handler(self, handler, **kwargs):
        """Call the handler bound to the event name `handler`; do nothing if none is bound."""
        callback = self._event_handlers.get(handler)
        if callback is not None:
            callback(**kwargs)

    def _activity(self, msg):
        """Log an activity message and pass it to the "activity" handler."""
        logger.info(msg)
        self._call_handler("activity", msg=msg)

    def _fill_prompt(self):
        """
        Read the initial data of a fresh session channel and store it as the prompt.

        The prompt stays empty when nothing arrives within two seconds.
        """
        # open a session on the SSH channel
        channel = self._ssh_client.get_transport().open_session()
        prompt = ""
        try:
            channel.settimeout(2)
            # read the initial data if available. This will be prompt.
            # errors="replace": a 1024 byte chunk may end in the middle of a character.
            prompt = channel.recv(1024).decode('utf-8', errors='replace')
        except socket.timeout:
            pass
        finally:
            # close the channel even if recv fails with an unexpected error
            channel.close()
        self._prompt = prompt

    def _remove_key_file(self):
        """Delete the temporary private key file, if any, and forget its path."""
        if not self._key_file:
            return
        try:
            os.remove(self._key_file)
        except OSError as exc:
            # the file may already be gone; cleanup is best effort
            logger.debug("Could not remove key file %s: %s", self._key_file, exc)
        self._key_file = None

    def _deploy_key(self):
        """
        Generate and deploy a RSA key to the host. The public part of the key will be
        saved to the remote host authorized_keys file, the private part to a local
        temporary file whose path is kept in self._key_file.
        """
        rsa_key = paramiko.RSAKey.generate(bits=2048)
        # a previous connection may have left a key file behind; do not leak it
        self._remove_key_file()
        # create a temporary file and dump the private key to it
        key_fd, key_path = tempfile.mkstemp()
        os.close(key_fd)
        rsa_key.write_private_key_file(key_path)
        self.rsa_key = rsa_key
        self._key_file = key_path
        # deploy the public key (key type + base64 encoded key) to the remote host
        # NOTE(review): '>' replaces the whole authorized_keys file, so any key that
        # was already authorized for this user is lost - confirm this is intended.
        command = f'echo {rsa_key.get_name()} {rsa_key.get_base64()} > C:\\Users\\{self.current_host.username}\\.ssh\\authorized_keys'
        _out, err, code = self.execute_command(command)
        if code != 0:
            # keep going: the paramiko session still works, only the key based
            # ssh features (tunnel, consoles) will be affected
            logger.warning("Key deployment exited with code %s: %s", code, err.strip())

    def bind(self, event, handler):
        """Bind an event to a handler function, replacing any handler bound before."""
        self._event_handlers[event] = handler

    def start(self):
        """Emit the "initialized" event."""
        # this doesn't do anything yet, but it's a good practice to have a start method
        self._call_handler("initialized")

    def _connect(self, host, username, password):
        """
        Connect to a host using SSH, register it in the host pool and deploy a key.

        Returns:
            tuple: (True, None) on success, (False, exception) on failure.
        """
        # NOTE(security): AutoAddPolicy accepts the key of an unknown host without
        # any verification.
        self._ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            self._activity(f"Connecting to {host}...")
            self._ssh_client.connect(host, 22, username, password,
                                     timeout=3, look_for_keys=False)
            # Create a new HostInfo object and add it to the host pool or update the existing one
            host_info = self.host_pool.get(host)
            if host_info is None:
                host_info = self.host_pool[host] = self.HostInfo(address=host, username=username)
            else:
                # the stored username is used for the key path and the ssh commands,
                # so it has to follow the credentials that were just accepted
                host_info.username = username
            self.current_host = host_info
            self._save_host_data()
            self._activity(f"Checking prompt...")
            self._fill_prompt()
            self._activity(f"Deploying key...")
            self._deploy_key()
            self._call_handler("host_establishment", is_ok=True)
            return True, None
        except (paramiko.AuthenticationException, TimeoutError, socket.error) as e:
            # Return a tuple indicating unsuccessful connection (False) and the raised exception
            return False, e
        except Exception as e:
            # unexpected failure: keep the traceback in the debug log
            logger.debug("%s occurred while connecting to %s: %s",
                         type(e).__name__, host, e, exc_info=True)
            return False, e

    def execute_command(self, command):
        """
        Run the command on the connected host.

        Returns:
            tuple: (output, error, exit_code) - decoded stdout, decoded stderr and
            the exit status of the command.
        """
        logger.info(f"Sending command via SSH: {command}")
        _stdin, stdout, stderr = self._ssh_client.exec_command(command)
        # The remote console does not necessarily emit valid UTF-8; replace
        # undecodable bytes instead of failing the whole command.
        output = stdout.read().decode('utf-8', errors='replace')
        error = stderr.read().decode('utf-8', errors='replace')
        exit_code = stdout.channel.recv_exit_status()
        logger.info(f"Response: {output}")
        return output, error, exit_code

    def clear_consoles(self):
        """Clear console list"""
        self.current_host.consoles.clear()

    def enumerate_consoles(self):
        """
        Enumerate the available serial consoles on the host.

        The host is only queried when no console is cached. The "consoles_loaded"
        event is emitted in both cases.
        """
        host_info = self.current_host
        if not host_info.consoles:
            self._activity(f"Enumerating consoles...")
            output, _, _ = self.execute_command("wmic path Win32_SerialPort get Caption")
            # the first line is the column header; strip before filtering so that
            # whitespace-only lines do not end up as empty console names
            captions = (line.strip() for line in output.splitlines()[1:])
            host_info.consoles.extend(caption for caption in captions if caption)
            self._save_host_data()
        self._call_handler("consoles_loaded", consoles=host_info.consoles)

    def clear_local_networks(self):
        """Clear local network list"""
        self.current_host.networks.clear()

    def enumerate_local_networks(self):
        """
        Enumerate the available local networks on the host.

        The host is only queried when no network is cached. The
        "local_networks_loaded" event receives the networks as strings.
        """
        host_info = self.current_host
        if not host_info.networks:
            self._activity(f"Enumerating local networks...")
            output, _, _ = self.execute_command("ipconfig")
            for ip, subnet_mask in self._IPCONFIG_PATTERN.findall(output):
                # strict=False: the address is a host address, not a network address
                network = IPv4Network(f"{ip}/{subnet_mask}", False)
                # keep private networks only and leave out loopback
                if network.is_private and not network.is_loopback:
                    host_info.networks.append(network)
            self._save_host_data()
        self._call_handler("local_networks_loaded",
                           networks=[str(network) for network in host_info.networks])

    def clear_nodes(self):
        """Clear node list"""
        self.current_host.nodes.clear()

    def enumerate_nodes(self):
        """
        Enumerate the IP reachable nodes of the first local network of the host.

        Cached nodes are replayed through "node_found" without a new scan.
        Otherwise every address of the first network is pinged from the host.
        "nodes_loaded" is emitted at the end in every case.
        """
        host_info = self.current_host
        if host_info.nodes:
            for node in host_info.nodes:
                self._call_handler("node_found", node=node)
        elif host_info.networks:
            for candidate in host_info.networks[0].hosts():
                address = str(candidate)
                self._activity(f"Querying... {address}")
                output, _err, exit_code = self.execute_command(f"ping -n 1 -w 25 {address}")
                # Windows ping can exit with 0 for a "Destination host unreachable"
                # reply sent by another machine; a real echo reply carries a TTL.
                if exit_code == 0 and "TTL=" in output:
                    self._call_handler("node_found", node=address)
                    host_info.nodes.append(address)
            self._save_host_data()
        else:
            # nothing to scan; the original code failed with an IndexError here
            self._activity("No local network available to scan for nodes.")
        self._call_handler("nodes_loaded")

    @staticmethod
    def _find_free_local_port():
        """Return a local TCP port that was free at the time of the call."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            # port 0 lets the operating system pick an unused port
            probe.bind(("127.0.0.1", 0))
            return probe.getsockname()[1]

    def toggle_https_tunnel(self):
        """
        Toggle the HTTPS tunnel for the nodes.

        When no tunnel is running, one ssh process is started that forwards a free
        local port to port 443 of every node, and "tunnel_established" receives
        the node -> local port mapping. Otherwise the running tunnel process is
        terminated and "tunnel_closed" is emitted.
        """
        if self._ssh_tunnel_proc is not None:
            self._activity(f"Closing tunnel...")
            self._ssh_tunnel_proc.terminate()
            self._ssh_tunnel_proc = None
            self._call_handler("tunnel_closed")
            return

        self._activity(f"Establishing tunnel...")
        port_mapping = {}
        # use deployed key to establish a tunnel. -N flag is used to not execute a remote command.
        # StrictHostKeyChecking=no matches the other ssh invocations of this class.
        args = ["ssh", "-N", "-o", "StrictHostKeyChecking=no", "-i", self._key_file]
        # for each node, find a local port and map it to the node's 443 port.
        for node in self.current_host.nodes:
            local_port = self._find_free_local_port()
            args += ["-L", f"{local_port}:{node}:443"]
            port_mapping[node] = local_port
        args.append(f"{self.current_host.username}@{self.current_host.address}")
        # execute ssh subprocess for tunneling
        self._ssh_tunnel_proc = subprocess.Popen(args)
        self._call_handler("tunnel_established", port_mapping=port_mapping)

    @staticmethod
    def _decorated_powershell(title, fcolor, post_cmd="", no_exit=False):
        """Build a remote powershell command that sets window title and color, then runs post_cmd."""
        flags = "-NoExit " if no_exit else ""
        tail = f" {post_cmd}" if post_cmd else ""
        return (
            f'powershell {flags}-Command "$Host.UI.RawUI.WindowTitle = \'{title}\'; '
            f'$Host.UI.RawUI.ForegroundColor = \'{fcolor}\';{tail}"'
        )

    def _spawn_ssh_window(self, remote_command):
        """Run remote_command on the current host through ssh in a new console window."""
        args = [
            # local
            "ssh",
            "-o", "StrictHostKeyChecking=no", "-t", "-i", self._key_file,
            f"{self.current_host.username}@{self.current_host.address}",
            # remote
            remote_command,
        ]
        subprocess.Popen(args, creationflags=subprocess.CREATE_NEW_CONSOLE)

    def spawn_console(self, console):
        """
        Spawn a console for the specified console name like 'USB to UART Bridge (COM6)'.

        Raises:
            ValueError: If the console name does not contain a COM port.
        """
        # grab the COMx part from the console name
        match = re.search(r"(COM\d+)", console)
        if match is None:
            raise ValueError(f"No COM port found in console name: {console!r}")
        port = match.group()
        title = f"{self._CONSOLE_TITLE_PREFIX} serial {port} on {self.current_host.address}"
        # this is the plink command that connects to the serial port
        powershell_post_cmd = f"plink -serial {port} -sercfg 115200 8,n,1,X"
        # by using powershell as remote command we can set the window title and
        # color before running the actual serial console utility
        self._spawn_ssh_window(
            self._decorated_powershell(title, "darkcyan", post_cmd=powershell_post_cmd)
        )

    def spawn_shell(self):
        """Spawn a PowerShell session on the current host in a new console window."""
        title = f"{self._CONSOLE_TITLE_PREFIX} powershell on {self.current_host.address}"
        # by using powershell as remote command we can set the window title and color before running the shell
        # by providing -NoExit, powershell will not exit and wait for user input
        self._spawn_ssh_window(self._decorated_powershell(title, "green", no_exit=True))

    def stop(self):
        """Close the SSH connection and the tunnel, delete the key file and kill spawned consoles."""
        if self._ssh_client:
            self._ssh_client.close()
        if self._ssh_tunnel_proc is not None:
            self._ssh_tunnel_proc.terminate()
            self._ssh_tunnel_proc = None
        self._remove_key_file()
        # kill spawned ssh and serial consoles before exiting; the filter has to
        # match the title prefix used by spawn_console() and spawn_shell()
        subprocess.run(['taskkill', '/f', '/fi', f"WINDOWTITLE eq {self._CONSOLE_TITLE_PREFIX}*"])

    def setup(self, host, username, password):
        """
        Connect to the host and load its consoles and local networks.

        A failed connection is reported through "host_establishment" with
        is_ok=False and the raised error. Nodes are only replayed when the host
        already has cached nodes; a new scan is not started from here.
        """
        connected, err = self._connect(host, username, password)
        if not connected:
            self._call_handler("host_establishment", is_ok=False, error=err)
            return
        self.enumerate_consoles()
        self.enumerate_local_networks()
        if self.current_host.nodes:
            self.enumerate_nodes()