-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain2.py
363 lines (312 loc) · 13.6 KB
/
main2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
import os
import subprocess
import logging
import json
import tempfile
import pyttsx3
from PyQt5.QtWidgets import (
QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
QLabel, QPushButton, QTextEdit, QProgressBar, QApplication
)
from PyQt5.QtCore import Qt, QTimer, QThread, pyqtSignal
from PyQt5.QtGui import QFont, QColor, QPalette, QTextCursor
from llms.gemini import GeminiService
import numpy as np
from llms.groq import GroqService
from config import GEMINI_API_KEY, GROQ_API_KEY
from voice_recognition_thread import VoiceRecognitionThread
from rag_service import RAGService
from create_embeddings import EmbeddingProvider
import google.generativeai as genai
# Root logger setup: emit everything from DEBUG upwards, each record prefixed
# with its timestamp and level name.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,
)
class TTSThread(QThread):
    """Worker thread that speaks one piece of text with pyttsx3.

    The thread emits ``finished`` exactly once when ``run`` is done, whether
    or not speech synthesis succeeded.
    """

    # Declared here and emitted manually from run(); note that this name
    # shadows the ``finished`` signal QThread itself provides.
    finished = pyqtSignal()

    def __init__(self, text):
        """Store the text to speak.

        Args:
            text: The string handed to the speech engine in ``run``.
        """
        super().__init__()
        self.text = text

    def run(self):
        """Speak ``self.text`` and then emit ``finished``."""
        try:
            engine = pyttsx3.init()
            engine.say(self.text)
            engine.runAndWait()
            engine.stop()
        except Exception:
            # An exception escaping a reimplemented Qt virtual such as run()
            # is fatal under PyQt5 >= 5.5, so a speech failure (e.g. no TTS
            # driver available) is logged here instead of propagating.
            logging.exception("Text-to-speech failed")
        finally:
            # Always notify listeners so the UI can leave the speaking state.
            self.finished.emit()
class VoiceAssistant(QMainWindow):
    """Main window of the voice assistant.

    One voice command flows through ``process_command``: relevant context is
    fetched from the RAG index, the LLM produces a bash script, the script is
    validated (command whitelist plus ShellCheck), run under firejail, the
    output is interpreted by the LLM, and the interpretation is spoken.
    """

    # Upper bound, in seconds, for a single sandboxed script run.  Several
    # whitelisted commands (``yes``, ``top``, ...) never exit on their own, and
    # the script is executed synchronously on the GUI thread.
    SANDBOX_TIMEOUT_SECONDS = 30

    def __init__(self, llm_service='gemini', embeddings_dir='embeddings'):
        """Build the UI and initialize the voice, LLM and RAG services.

        Args:
            llm_service: ``'gemini'`` or ``'groq'``; selects the LLM backend.
            embeddings_dir: Directory passed to ``RAGService``.

        Raises:
            ValueError: If ``llm_service`` is not one of the supported names.
            FileNotFoundError: Re-raised when ``RAGService`` cannot find its
                embeddings.
        """
        super().__init__()

        # Human-readable text shown for each status.
        self.status_states = {
            'IDLE': "Idle",
            'LISTENING': "Listening for voice input...",
            'PROCESSING': "Processing command...",
            'GENERATING': "Generating script...",
            'VALIDATING': "Validating script...",
            'EXECUTING': "Executing command...",
            'INTERPRETING': "Interpreting results...",
            'SPEAKING': "Speaking response...",
            'ERROR': "Error occurred",
            'INITIALIZING': "Initializing microphone..."
        }
        # update_status() stores the display text and on_listening_stopped()
        # compares against display text, so start with the display text too
        # (not the dictionary key).
        self.current_status = self.status_states['IDLE']

        # Set up the main window
        self.setWindowTitle("Smart Voice Assistant")
        self.setGeometry(100, 100, 800, 600)
        self.setup_ui()

        # Initialize voice recognition thread
        self.voice_thread = VoiceRecognitionThread()
        self.voice_thread.status_update.connect(self.update_status)
        self.voice_thread.command_received.connect(self.process_command)
        self.voice_thread.listening_stopped.connect(self.on_listening_stopped)

        # Initialize LLM service
        if llm_service == 'gemini':
            self.llm = GeminiService(GEMINI_API_KEY)
        elif llm_service == 'groq':
            self.llm = GroqService(GROQ_API_KEY)
        else:
            raise ValueError(f"Unsupported LLM service: {llm_service}")
        self.llm.initialize()

        # Initialize RAG service
        try:
            self.rag = RAGService(embeddings_dir)
            self.rag.load_index()
        except FileNotFoundError:
            logging.error(
                "Embeddings not found. Please run create_embeddings.py first with your context file."
            )
            raise
        self.embedding_provider = EmbeddingProvider(llm_service)

        # Set up progress bar timer
        self.timer = QTimer(self)
        self.timer.timeout.connect(self.update_progress)

        # Initialize allowed commands
        self.initialize_allowed_commands()

        # Reserve a temporary file path for speech audio.  The handle is
        # closed right away so only the path is kept; closeEvent() removes
        # the file again.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp3') as speech_handle:
            self.speech_file = speech_handle.name

        logging.info("Voice Assistant initialized")

    def setup_ui(self):
        """Create the widgets: status label, listen button, progress bar, terminal."""
        self.setStyleSheet("""
            QMainWindow {
                background-color: #2C3E50;
            }
            QLabel {
                color: #ECF0F1;
                font-size: 16px;
            }
            QPushButton {
                background-color: #3498DB;
                color: white;
                border: none;
                padding: 10px;
                font-size: 16px;
                border-radius: 5px;
            }
            QPushButton:hover {
                background-color: #2980B9;
            }
            QProgressBar {
                border: 2px solid #3498DB;
                border-radius: 5px;
                text-align: center;
            }
            QProgressBar::chunk {
                background-color: #3498DB;
            }
        """)

        # Create central widget and layout
        central_widget = QWidget()
        self.setCentralWidget(central_widget)
        main_layout = QVBoxLayout(central_widget)

        # Header row: status label on the left, listen button on the right
        header_layout = QHBoxLayout()

        self.status_label = QLabel(self.status_states['IDLE'])
        self.status_label.setAlignment(Qt.AlignCenter)
        header_layout.addWidget(self.status_label)

        self.listen_button = QPushButton("Start Listening")
        self.listen_button.clicked.connect(self.start_listening)
        header_layout.addWidget(self.listen_button)

        main_layout.addLayout(header_layout)

        # Progress bar advanced by self.timer while listening
        self.progress_bar = QProgressBar()
        self.progress_bar.setRange(0, 100)
        self.progress_bar.setValue(0)
        self.progress_bar.setTextVisible(False)
        main_layout.addWidget(self.progress_bar)

        # Read-only terminal-style log
        self.terminal = QTextEdit()
        self.terminal.setReadOnly(True)
        self.terminal.setStyleSheet("""
            QTextEdit {
                background-color: #000000;
                color: #FFFFFF;
                border: none;
                font-family: 'DejaVu Sans Mono', monospace;
                font-size: 16px;
                padding: 10px;
            }
        """)
        main_layout.addWidget(self.terminal)

        # Initialize terminal prompt
        self.terminal.append("$ ")
        self.terminal.moveCursor(QTextCursor.End)

    def initialize_allowed_commands(self):
        """Populate ``self.allowed_commands``, the whitelist used by ``validate_script``."""
        # NOTE(review): this list includes privileged or destructive commands
        # (sudo, su, mkfs, fdisk, shutdown, reboot, iptables, passwd, ...).
        # It is kept unchanged here; confirm that this is intended.
        self.allowed_commands = {
            'ls', 'echo', 'cat', 'grep', 'awk', 'sed', 'cut', 'sort', 'uniq',
            'wc', 'head', 'tail', 'find', 'date', 'pwd', 'whoami', 'uname', 'mkdir',
            'rmdir', 'touch', 'cp', 'mv', 'less', 'nano', 'vim', 'more', 'diff',
            'tar', 'gzip', 'gunzip', 'zip', 'unzip', 'ping', 'curl', 'wget', 'apt',
            'dpkg', 'python', 'python3', 'g++', 'gcc', 'node', 'javac', 'java',
            'ruby', 'df', 'du', 'free', 'top', 'htop', 'uptime', 'ps', 'id',
            'hostname', 'cal', 'man', 'bc', 'time', 'xargs', 'tr', 'chmod', 'chown',
            'tee', 'split', 'dmesg', 'iostat', 'vmstat', 'sar', 'lsof', 'who',
            'last', 'mount', 'umount', 'blkid', 'fdisk', 'mkfs', 'ifconfig', 'netstat',
            'ss', 'iptables', 'traceroute', 'nmap', 'which', 'locate',
            'alias', 'unalias', 'factor', 'yes', 'shutdown', 'reboot', 'kill', 'killall',
            'history', 'clear', 'exit', 'logout', 'su', 'sudo', 'passwd', 'useradd',
        }

    def start_listening(self):
        """Start the voice recognition thread and the progress animation."""
        self.progress_bar.setValue(0)
        self.timer.start(40)  # progress tick every 40 ms
        self.voice_thread.start()
        self.update_status(self.status_states['INITIALIZING'])
        self.listen_button.setEnabled(False)
        self.listen_button.setText("Listening...")

    def _reset_listen_controls(self):
        """Stop the progress timer and restore the bar and button to idle."""
        self.timer.stop()
        self.progress_bar.setValue(0)
        self.listen_button.setEnabled(True)
        self.listen_button.setText("Start Listening")

    def on_listening_stopped(self):
        """Restore the controls once the voice thread reports it stopped."""
        self._reset_listen_controls()
        # Only fall back to idle if nothing else changed the status meanwhile.
        if self.current_status == self.status_states['LISTENING']:
            self.update_status(self.status_states['IDLE'])

    def update_progress(self):
        """Advance the progress bar by one step; reset the controls past 100."""
        value = self.progress_bar.value() + 1
        if value > 100:
            self._reset_listen_controls()
        else:
            self.progress_bar.setValue(value)

    def update_status(self, status):
        """Record ``status`` and show it in the status label and the terminal.

        Args:
            status: Display text of the new status.
        """
        self.current_status = status
        self.status_label.setText(status)
        self.terminal_print(f"Status: {status}")
        QApplication.processEvents()  # Force UI update

    def terminal_print(self, text):
        """Append ``text`` to the terminal widget followed by a fresh prompt."""
        self.terminal.moveCursor(QTextCursor.End)
        self.terminal.insertPlainText(text + "\n")
        self.terminal.moveCursor(QTextCursor.End)
        self.terminal.ensureCursorVisible()
        self.terminal.append("$ ")
        self.terminal.moveCursor(QTextCursor.End)

    def process_command(self, command):
        """Run the full pipeline for one recognized voice command.

        Any exception raised along the way is reported in the terminal,
        spoken, and logged; it is not re-raised.

        Args:
            command: The recognized command text.
        """
        logging.info(f"Processing command: {command}")
        self.update_status(self.status_states['PROCESSING'])
        self.terminal_print(f"Received command: {command}")
        try:
            # Get context from RAG model
            self.update_status(self.status_states['GENERATING'])
            context = self.rag.get_relevant_context(command)

            # Generate bash script
            bash_script, description = self.llm.generate_bash_script(command, context)
            logging.debug(f"Generated bash script: {bash_script}")
            logging.debug(f"Description: {description}")
            self.terminal_print(f"Description: {description}")

            # Validate script
            self.update_status(self.status_states['VALIDATING'])
            self.terminal_print("Validating script...")
            if self.validate_script(bash_script):
                # Execute in sandbox
                self.update_status(self.status_states['EXECUTING'])
                self.terminal_print("Executing command in sandbox...")
                command_output = self.execute_in_sandbox(bash_script)
                self.terminal_print(f"Raw output: {command_output}")

                # Interpret output
                self.update_status(self.status_states['INTERPRETING'])
                interpreted_response = self.llm.interpret_output(command, command_output)
                self.terminal_print(f"Interpreted response: {interpreted_response}")

                # Speak response
                self.update_status(self.status_states['SPEAKING'])
                self.speak_text(interpreted_response)
            else:
                self.update_status(self.status_states['ERROR'])
                error_message = "Script execution aborted due to security concerns."
                self.terminal_print(error_message)
                self.speak_text(error_message)
        except Exception as e:
            # Top-level boundary of the pipeline: report, speak and log.
            self.update_status(self.status_states['ERROR'])
            error_message = f"Error executing command: {e}"
            self.terminal_print(error_message)
            self.speak_text(error_message)
            # logging.exception keeps the traceback alongside the message.
            logging.exception(error_message)

    def validate_script(self, script_content):
        """Check a generated bash script before it is executed.

        The first word of every non-blank, non-comment line must be in
        ``self.allowed_commands``; the script must then pass ShellCheck.

        NOTE(review): only the first word of each line is inspected, so
        commands chained with ``;``, ``&&``, ``|`` or placed in ``$(...)`` are
        not checked against the whitelist.  Containment of such scripts
        relies on the firejail sandbox in ``execute_in_sandbox``.

        Args:
            script_content: The bash script text.

        Returns:
            True if both checks pass, False otherwise.
        """
        # Step 1: Check against whitelist
        for line in script_content.split('\n'):
            words = line.split()
            # Blank lines and comment lines (including a '#!' shebang) carry
            # no command; treating '#!/bin/bash' as a command would reject
            # every script that starts with a shebang.
            if not words or words[0].startswith('#'):
                continue
            command = words[0]
            if command not in self.allowed_commands:
                self.terminal_print(f"Validation failed: '{command}' is not in the whitelist of allowed commands.")
                return False

        # Step 2: Use shellcheck for static analysis
        with tempfile.NamedTemporaryFile(mode='w', suffix='.sh', delete=False) as temp_file:
            temp_file.write(script_content)
            temp_file_path = temp_file.name
        try:
            result = subprocess.run(['shellcheck', '-s', 'bash', temp_file_path],
                                    capture_output=True, text=True)
        finally:
            os.unlink(temp_file_path)

        if result.returncode != 0:
            self.terminal_print("ShellCheck found issues in the script:")
            self.terminal_print(result.stdout)
            return False
        self.terminal_print("ShellCheck validation passed.")
        return True

    def execute_in_sandbox(self, script_content):
        """Run the bash script under firejail and return its output.

        Args:
            script_content: The bash script text.

        Returns:
            The script's stdout when it exits with status 0, its stderr
            otherwise, or a short message if the run exceeded
            ``SANDBOX_TIMEOUT_SECONDS``.
        """
        with tempfile.NamedTemporaryFile(mode='w', suffix='.sh', delete=False) as temp_file:
            temp_file.write(script_content)
            temp_file_path = temp_file.name
        try:
            result = subprocess.run([
                'firejail',
                '--noprofile',
                '--quiet',
                '--private',
                '--noroot',
                '--net=none',
                '--nosound',
                '--no3d',
                'bash',
                temp_file_path
            ], capture_output=True, text=True,
                timeout=self.SANDBOX_TIMEOUT_SECONDS)
        except subprocess.TimeoutExpired:
            # subprocess.run() has already killed the process it started.
            return f"Command timed out after {self.SANDBOX_TIMEOUT_SECONDS} seconds."
        finally:
            os.unlink(temp_file_path)
        return result.stdout if result.returncode == 0 else result.stderr

    def speak_text(self, text):
        """Speak ``text`` on a background ``TTSThread``.

        The status is set back to idle when the thread emits ``finished``.

        Args:
            text: The text to speak.
        """
        # Let a previous utterance finish first: dropping the only reference
        # to a QThread that is still running is unsafe.
        previous_thread = getattr(self, 'tts_thread', None)
        if previous_thread is not None and previous_thread.isRunning():
            previous_thread.wait()

        self.tts_thread = TTSThread(text)
        self.tts_thread.finished.connect(lambda: self.update_status(self.status_states['IDLE']))
        self.tts_thread.start()

    def closeEvent(self, event):
        """Wait for pending speech, remove the temp speech file, accept the close."""
        if hasattr(self, 'tts_thread') and self.tts_thread.isRunning():
            self.tts_thread.wait()

        # Remove the temporary file reserved in __init__ (best effort).
        speech_file = getattr(self, 'speech_file', None)
        if speech_file:
            try:
                os.unlink(speech_file)
            except OSError:
                logging.debug("Could not remove temporary speech file %s", speech_file)

        event.accept()
if __name__ == "__main__":
    import sys

    # Give Qt the real command line so its standard options are honored.
    app = QApplication(sys.argv)
    window = VoiceAssistant()
    window.show()
    # Use the event loop's return code as the process exit status.
    sys.exit(app.exec_())