Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented chat message queue dumping on exit #3899

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion chatcommands.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
tell_rooms, tell_rooms_with, get_message
# noinspection PyUnresolvedReferences
from globalvars import GlobalVars
exit_mode = GlobalVars.exit_mode
import findspam
# noinspection PyUnresolvedReferences
from datetime import datetime
Expand All @@ -26,7 +27,7 @@
from ast import literal_eval
# noinspection PyCompatibility
import regex
from helpers import exit_mode, only_blacklists_changed, only_modules_changed, log, expand_shorthand_link, \
from helpers import only_blacklists_changed, only_modules_changed, log, expand_shorthand_link, \
reload_modules, chunk_list
from classes import Post
from classes.feedback import *
Expand Down
68 changes: 66 additions & 2 deletions chatcommunicate.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,39 @@ class CmdException(Exception):
_pickle_run = threading.Event()


# Ignore flake8 warning as this class is intended to be private.
class _cleanup: # noqa: N801
""" Utilities for cleaning up """
_counter = 0
_cv = threading.Condition(lock=threading.Lock())
terminate = threading.Event()
cleanup_finished = threading.Event()

@staticmethod
def t_started():
""" Indicate a new thread is started. """
with _cleanup._cv:
_cleanup._counter += 1

@staticmethod
def t_stopped():
""" Indicate a thread terminated. """
with _cleanup._cv:
if _cleanup._counter > 0:
_cleanup._counter -= 1
else:
raise ValueError("More threads terminated than started.")
_cleanup._cv.notify()

@staticmethod
def wait_t_terminate():
_cleanup.terminate.set()
with _cleanup._cv:
while _cleanup._counter != 0:
_cleanup._cv.wait()
return


def init(username, password, try_cookies=True):
global _clients
global _rooms
Expand Down Expand Up @@ -119,13 +152,35 @@ def init(username, password, try_cookies=True):
except EOFError:
pass

if os.path.isfile("messageQueue.p"):
# Restore previous session, if exists
with open("messageQueue.p", "rb") as queue_file:
try:
_msg_queue = pickle.load(queue_file)
except EOFError:
pass

# Tell GlobalVars that this process needs cleaning up.
with GlobalVars.need_cleanup_list_lock:
GlobalVars.need_cleanup.append(_cleanup.cleanup_finished)

threading.Thread(name="pickle ---rick--- runner", target=pickle_last_messages, daemon=True).start()
threading.Thread(name="message sender", target=send_messages, daemon=True).start()
threading.Thread(name="exit watcher", target=exit_watcher, daemon=True).start()

if try_cookies:
datahandling.dump_cookies()


def exit_watcher():
""" Watch for the program termination event and dump current session at exit. """
GlobalVars.terminate.wait()
_cleanup.wait_t_terminate()
with open("messageQueue.p", "wb") as pickle_file:
pickle.dump(_msg_queue, pickle_file)
_cleanup.cleanup_finished.set()


def join_command_rooms():
for site, roomid in _command_rooms:
room = _clients[site].get_room(roomid)
Expand Down Expand Up @@ -199,16 +254,20 @@ def add_room(room, roles):


def pickle_last_messages():
while True:
_cleanup.t_started()
while not _cleanup.terminate.isSet():
_pickle_run.wait()
_pickle_run.clear()

with open("messageData.p", "wb") as pickle_file:
pickle.dump(_last_messages, pickle_file)

_cleanup.t_stopped()


def send_messages():
while True:
_cleanup.t_started()
while not _cleanup.terminate.isSet():
room, msg, report_data = _msg_queue.get()
if len(msg) > 500 and "\n" not in msg:
log('warn', 'Discarded the following message because it was over 500 characters')
Expand Down Expand Up @@ -255,6 +314,8 @@ def send_messages():

_msg_queue.task_done()

_cleanup.t_stopped()


def on_msg(msg, client):
global _room_roles
Expand Down Expand Up @@ -323,6 +384,9 @@ def tell_rooms_without(prop, msg, notify_site="", report_data=None):
def tell_rooms(msg, has, hasnt, notify_site="", report_data=None):
global _rooms

if _cleanup.terminate.isSet():
return # To prevent writing to _msg_queue while dumping

msg = msg.rstrip()
target_rooms = set()

Expand Down
3 changes: 2 additions & 1 deletion excepthook.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
# noinspection PyPackageRequirements
from websocket import WebSocketConnectionClosedException
import requests
from helpers import exit_mode, log, log_exception
from helpers import log, log_exception
from globalvars import GlobalVars
exit_mode = GlobalVars.exit_mode


# noinspection PyProtectedMember
Expand Down
19 changes: 19 additions & 0 deletions globalvars.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
else:
from sh.contrib import git

from helpers import exit_mode


CommitInfo = namedtuple('CommitInfo', ['id', 'id_full', 'author', 'message'])

Expand Down Expand Up @@ -180,6 +182,23 @@ class GlobalVars:

valid_content = """This is a totally valid post that should never be caught. Any blacklist or watchlist item that triggers on this item should be avoided. java.io.BbbCccDddException: nothing wrong found. class Safe { perfect valid code(int float &#%$*v a b c =+ /* - 0 1 2 3 456789.EFGQ} English 中文Français Español Português Italiano Deustch ~@#%*-_/'()?!:;" vvv kkk www sss ttt mmm absolute std::adjacent_find (power).each do |s| bbb end ert zal l gsopsq kdowhs@ xjwk* %_sooqmzb xjwpqpxnf. Please don't blacklist disk-partition.com, it's a valid domain (though it also gets spammed rather frequently).""" # noqa: E501

# Any thread that need cleaning up before the program exits shall add an threading.Event()
# object to the following queue, and prior to all modification the lock must be acquired.
need_cleanup = []
need_cleanup_list_lock = threading.Lock()
# This event is set when the program is about to exit. Those need clean up shall be actively
# monitoring this event.
terminate = threading.Event()

@staticmethod
def exit_mode(*args, code=0):
GlobalVars.terminate.set()
with GlobalVars.need_cleanup_list_lock:
for item in GlobalVars.need_cleanup:
# Wait for every thread needing clean up to finish.
item.wait()
exit_mode(args, GlobalVars.standby_mode, code)

@staticmethod
def reload():
GlobalVars.commit = commit = git_commit_info()
Expand Down
5 changes: 2 additions & 3 deletions helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@
import sqlite3


def exit_mode(*args, code=0):
def exit_mode(*args, standby_mode, code=0):
args = set(args)

if not (args & {'standby', 'no_standby'}):
from globalvars import GlobalVars
standby = 'standby' if GlobalVars.standby_mode else 'no_standby'
standby = 'standby' if standby_mode else 'no_standby'
args.add(standby)

with open("exit.txt", "w", encoding="utf-8") as f:
Expand Down
3 changes: 2 additions & 1 deletion metasmoke.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import requests
import importlib # for .reload()
from globalvars import GlobalVars
exit_mode = GlobalVars.exit_mode
import threading
# noinspection PyPackageRequirements
import websocket
Expand All @@ -24,7 +25,7 @@
import spamhandling
import classes
import chatcommunicate
from helpers import log, exit_mode, only_blacklists_changed, \
from helpers import log, only_blacklists_changed, \
only_modules_changed, blacklist_integrity_check, reload_modules
from gitmanager import GitManager
import findspam
Expand Down
4 changes: 2 additions & 2 deletions test/test_chatcommunicate.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def test_init(room_config, client_constructor, thread):
client_constructor.assert_any_call("stackoverflow.com")
client_constructor.assert_any_call("meta.stackexchange.com")

assert thread.call_count == 2
assert thread.call_count == 3
thread.assert_any_call(name="pickle ---rick--- runner", target=chatcommunicate.pickle_last_messages, daemon=True)
thread.assert_any_call(name="message sender", target=chatcommunicate.send_messages, daemon=True)

Expand Down Expand Up @@ -145,7 +145,7 @@ def throw_every_other(*_):
client_constructor.assert_any_call("stackoverflow.com")
client_constructor.assert_any_call("meta.stackexchange.com")

assert thread.call_count == 2
assert thread.call_count == 3
thread.assert_any_call(name="pickle ---rick--- runner", target=chatcommunicate.pickle_last_messages, daemon=True)
thread.assert_any_call(name="message sender", target=chatcommunicate.send_messages, daemon=True)

Expand Down
3 changes: 2 additions & 1 deletion ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from datetime import datetime
from spamhandling import check_if_spam_json
from globalvars import GlobalVars
exit_mode = GlobalVars.exit_mode
from datahandling import _load_pickle, PICKLE_STORAGE, load_files, filter_auto_ignored_posts
from metasmoke import Metasmoke
from metasmoke_cache import MetasmokeCache
Expand All @@ -33,7 +34,7 @@
import requests
# noinspection PyPackageRequirements
from tld.utils import update_tld_names, TldIOError
from helpers import exit_mode, log, Helpers, log_exception
from helpers import log, Helpers, log_exception
from flovis import Flovis
from tasks import Tasks

Expand Down