Skip to content

Commit

Permalink
Merge pull request #94 from F33RNI/next
Browse files Browse the repository at this point in the history
Next
  • Loading branch information
F33RNI authored Nov 30, 2023
2 parents 6ec6dcd + 023aa33 commit 04358fb
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 18 deletions.
38 changes: 27 additions & 11 deletions BotHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import QueueHandler
import RequestResponseContainer
import UsersHandler
from JSONReaderWriter import load_json
from main import __version__

# User commands
Expand Down Expand Up @@ -216,7 +217,8 @@ async def send_message_async(config: dict, messages: List[Dict],
# Collect media group
media_group = []
for url in request_response.response:
media_group.append(InputMediaPhoto(media=url))
if not url.lower().endswith(".svg"):
media_group.append(InputMediaPhoto(media=url))

# Send it
media_group_message_id = (await (telegram.Bot(config["telegram"]["api_key"]).sendMediaGroup(
Expand Down Expand Up @@ -532,13 +534,14 @@ def clear_conversation_process(logging_queue: multiprocessing.Queue, str_or_exce


class BotHandler:
def __init__(self, config: dict, messages: List[Dict],
def __init__(self, config: dict, config_file: str, messages: List[Dict],
users_handler: UsersHandler.UsersHandler,
queue_handler: QueueHandler.QueueHandler,
proxy_automation: ProxyAutomation.ProxyAutomation,
logging_queue: multiprocessing.Queue,
chatgpt_module, bard_module, edgegpt_module):
self.config = config
self.config_file = config_file
self.messages = messages
self.users_handler = users_handler
self.queue_handler = queue_handler
Expand Down Expand Up @@ -619,7 +622,7 @@ def start_bot(self):
# Bot error?
except Exception as e:
if "Event loop is closed" in str(e):
if not self._restart_requested_flag:
if not self._restart_requested_flag and not self.queue_handler.prevent_shutdown_flag:
logging.warning("Stopping telegram bot")
break
else:
Expand Down Expand Up @@ -1009,33 +1012,46 @@ async def bot_command_restart(self, update: Update, context: ContextTypes.DEFAUL
logging.info("Stopping ProxyAutomation")
self.proxy_automation.stop_automation_loop()

# Reload config
logging.info("Reloading config from {} file".format(self.config_file))
config_new = load_json(self.config_file)
for key, value in config_new.items():
self.config[key] = value

# Make sure queue is empty
if self.queue_handler.request_response_queue.qsize() > 0:
logging.info("Waiting for all requests to finish")
while self.queue_handler.request_response_queue.qsize() > 0:
# Cancel all active containers
with self.queue_handler.lock:
queue_list = QueueHandler.queue_to_list(self.queue_handler.request_response_queue)
for container in queue_list:
container.processing_state = RequestResponseContainer.PROCESSING_STATE_CANCEL

# Check every 100ms
time.sleep(0.1)
# Cancel all active containers (clear the queue)
self.queue_handler.lock.acquire(block=True)
queue_list = QueueHandler.queue_to_list(self.queue_handler.request_response_queue)
for container in queue_list:
if container.processing_state != RequestResponseContainer.PROCESSING_STATE_ABORT:
container.processing_state = RequestResponseContainer.PROCESSING_STATE_ABORT
QueueHandler.put_container_to_queue(self.queue_handler.request_response_queue, None, container)
self.queue_handler.lock.release()

# Check every 1s
time.sleep(1)

# Start proxy automation
logging.info("Starting back ProxyAutomation")
self.proxy_automation.start_automation_loop()

# Restart telegram bot
self._restart_requested_flag = True
logging.info("Stopping event loop to restart Telegram bot")
self._event_loop.stop()
time.sleep(1)
try:
logging.info("Closing event loop to restart Telegram bot")
self._event_loop.close()
except:
pass

def send_message_after_restart():
# Sleep while restarting
logging.info("Waiting for _restart_requested_flag")
while self._restart_requested_flag:
time.sleep(1)

Expand Down
3 changes: 3 additions & 0 deletions LoggingHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ def worker_configurer(queue: multiprocessing.Queue):
# Setup queue handler
queue_handler = logging.handlers.QueueHandler(queue)
root_logger = logging.getLogger()
if root_logger.handlers:
for handler in root_logger.handlers:
root_logger.removeHandler(handler)
root_logger.addHandler(queue_handler)
root_logger.setLevel(logging.INFO)

Expand Down
26 changes: 22 additions & 4 deletions QueueHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
import RequestResponseContainer
import UsersHandler

# After how long (seconds) clear self.prevent_shutdown_flag
CLEAR_PREVENT_SHUTDOWN_FLAG_AFTER = 5.


def get_container_from_queue(request_response_queue: multiprocessing.Queue, lock: multiprocessing.Lock,
container_id: int) -> RequestResponseContainer.RequestResponseContainer | None:
Expand Down Expand Up @@ -415,6 +418,10 @@ def __init__(self, config: dict,
self.request_response_queue = multiprocessing.Queue(maxsize=-1)
self.lock = multiprocessing.Lock()

# Prevent bot shutdown in case of event loop close after process.kill()
self.prevent_shutdown_flag = False
self._prevent_shutdown_flag_clear_timer = 0

self._exit_flag = False
self._processing_loop_thread = None
self._log_filename = ""
Expand Down Expand Up @@ -447,6 +454,14 @@ def _queue_processing_loop(self) -> None:
self._exit_flag = False
while not self._exit_flag:
try:
# Clear prevent shutdown flag
if self._prevent_shutdown_flag_clear_timer > 0 and \
time.time() - self._prevent_shutdown_flag_clear_timer > CLEAR_PREVENT_SHUTDOWN_FLAG_AFTER and \
self.prevent_shutdown_flag:
logging.info("Clearing prevent_shutdown_flag")
self.prevent_shutdown_flag = False
self._prevent_shutdown_flag_clear_timer = 0

# Skip one cycle in queue is empty
if self.request_response_queue.qsize() == 0:
time.sleep(0.1)
Expand Down Expand Up @@ -562,13 +577,17 @@ def _queue_processing_loop(self) -> None:
# Update
put_container_to_queue(self.request_response_queue, None, request_)

# Done processing / Timed out -> log data and finally remove it
# Done processing / Timed out / abort requested -> log data and finally remove it
if request_.processing_state == RequestResponseContainer.PROCESSING_STATE_DONE \
or request_.processing_state == RequestResponseContainer.PROCESSING_STATE_TIMED_OUT:
or request_.processing_state == RequestResponseContainer.PROCESSING_STATE_TIMED_OUT \
or request_.processing_state == RequestResponseContainer.PROCESSING_STATE_ABORT:
# Kill process if it is active
if request_.pid > 0 and psutil.pid_exists(request_.pid):
logging.info("Trying to kill process with PID {}".format(request_.pid))
try:
logging.info("Setting prevent_shutdown_flag")
self.prevent_shutdown_flag = True
self._prevent_shutdown_flag_clear_timer = time.time()
process = psutil.Process(request_.pid)
process.terminate()
process.kill()
Expand Down Expand Up @@ -614,7 +633,6 @@ def _queue_processing_loop(self) -> None:
logging.info("Trying to kill process with PID {}".format(container.pid))
try:
process = psutil.Process(container.pid)
process.terminate()
process.kill()
process.wait(timeout=5)
except Exception as e:
Expand Down Expand Up @@ -700,7 +718,7 @@ def _collect_data(self, request_response: RequestResponseContainer, log_request=
if (request_response.request_type == RequestResponseContainer.REQUEST_TYPE_DALLE
or request_response.request_type == RequestResponseContainer.REQUEST_TYPE_BING_IMAGEGEN) \
and not request_response.error:
response_url = request_response.response if type(request_response.response) == str\
response_url = request_response.response if type(request_response.response) == str \
else request_response.response[0]
response = base64.b64encode(requests.get(response_url, timeout=120).content) \
.decode("utf-8")
Expand Down
3 changes: 2 additions & 1 deletion RequestResponseContainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
PROCESSING_STATE_DONE = 3
PROCESSING_STATE_TIMED_OUT = 4
PROCESSING_STATE_CANCEL = 5
PROCESSING_STATE_CANCELING = 5
PROCESSING_STATE_CANCELING = 6
PROCESSING_STATE_ABORT = 7

REQUEST_NAMES = ["ChatGPT", "DALL-E", "EdgeGPT", "Bard", "Bing ImageGen"]
PROCESSING_STATE_NAMES = ["Waiting", "Starting", "Active", "Done", "Timed out", "Canceling", "Canceling"]
Expand Down
4 changes: 2 additions & 2 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from JSONReaderWriter import load_json

# GPT-Telegramus version
__version__ = "3.6.1"
__version__ = "3.6.7"

# Logging level
LOGGING_LEVEL = logging.INFO
Expand Down Expand Up @@ -109,7 +109,7 @@ def main():
bing_image_gen_module)

# Initialize Telegram bot class
bot_handler = BotHandler.BotHandler(config, messages, user_handler, queue_handler, proxy_automation,
bot_handler = BotHandler.BotHandler(config, args.config, messages, user_handler, queue_handler, proxy_automation,
logging_handler.queue,
chatgpt_module, bard_module, edgegpt_module)

Expand Down

0 comments on commit 04358fb

Please sign in to comment.