Skip to content

Commit

Permalink
Forward IPC to unique client socket
Browse files Browse the repository at this point in the history
  • Loading branch information
killown committed Sep 8, 2024
1 parent c60c358 commit 96dec36
Show file tree
Hide file tree
Showing 5 changed files with 239 additions and 46 deletions.
66 changes: 64 additions & 2 deletions waypanel/src/core/utils.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import os
import math
import gi
import orjson as json
import numpy as np
from time import sleep
import subprocess
from gi.repository import Gtk, Adw, Gio, Gdk, GLib
from subprocess import check_output
import toml
import socket
import aiohttp
import asyncio
from aiohttp import ClientTimeout
Expand All @@ -15,6 +17,7 @@
from wayfire.extra.ipc_utils import WayfireUtils
from wayfire.extra.stipc import Stipc
from subprocess import call
from waypanel.src.ipc_server.ipc_client import WayfireClientIPC

gi.require_version("Gtk", "4.0")
gi.require_version("Adw", "1")
Expand All @@ -32,8 +35,11 @@ def __init__(self, **kwargs):
self.fd = None
self.watch_id = None
self.sock = WayfireSocket()
self.sock.watch()
GLib.io_add_watch(self.sock.client, GLib.IO_IN, self.on_event_ready)

self.ipc_client = WayfireClientIPC(self.handle_event)
# here is where the ipc events happen
self.ipc_client.wayfire_events_setup("/tmp/waypanel-utils.sock")

self.wf_utils = WayfireUtils(self.sock)
self.stipc = Stipc(self.sock)

Expand Down Expand Up @@ -68,6 +74,55 @@ def wrapper(*args, **kwargs):
return None
return wrapper

def connect_socket(self):
"""Establish a connection to the Unix socket."""
socket_path = "/tmp/waypanel-utils.sock"
self.client_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.client_socket.connect(socket_path)

# Create a GLib IO Watcher
self.source = GLib.io_add_watch(self.client_socket, GLib.PRIORITY_DEFAULT, GLib.IO_IN, self.handle_socket_event)

def handle_socket_event(self, fd, condition):
"""Read from the socket and process events."""
chunk = fd.recv(1024).decode()
if not chunk:
return GLib.SOURCE_REMOVE # Remove source if no data is received

self.buffer += chunk

# Process the complete events in the buffer
while '\n' in self.buffer:
event_str, self.buffer = self.buffer.split('\n', 1)
if event_str:
try:
event = json.loads(event_str)
self.process_event(event)
except json.JSONDecodeError as e:
print(f"JSON decode error: {e}")

return GLib.SOURCE_CONTINUE # Continue receiving data

def process_event(self, event):
"""Process the event dictionary."""
print(f"Received event: {event}")
self.handle_event(event)

def disconnect_socket(self):
"""Clean up resources."""
if self.source:
self.source.remove() # Remove the source when done
if self.client_socket:
self.client_socket.close()
def wayfire_events_setup(self):
"""Initialize the Wayfire event listener within a GTK application."""
# Create a GTK application
app = Gtk.Application(application_id="com.example.GtkApplication")

# Define the path for the Unix socket
self.connect_socket()


def run_app(self, cmd, wclass=None, initial_title=None, cmd_mode=True):
if [c for c in self.terminal_emulators if cmd in c] and cmd_mode:
#**note-taking**
Expand Down Expand Up @@ -240,6 +295,13 @@ def take_note_app(self, *_):
# Run the note-taking application using the specified command
self.run_app(config["take_note_app"]["cmd"])

def reconnect_client(self, socket):
socket.close()
sock = WayfireSocket()
utils = WayfireUtils(sock)
stipc = Stipc(sock)
return sock, utils, stipc

def on_event_ready(self, fd, condition):
msg = self.sock.read_next_event()
if msg is None:
Expand Down
98 changes: 98 additions & 0 deletions waypanel/src/ipc_server/ipc-async-server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import asyncio
import os
import orjson as json
import time
from concurrent.futures import ThreadPoolExecutor
from wayfire import WayfireSocket
from wayfire.extra.ipc_utils import WayfireUtils


#why forward ipc for socket files, why not just one?
#because this is the most reliable way to provide ipc through GLib which in rare cases the code would break/hang
#this is the best choice until there is a more reliable way to provide IPC without breaking the GTK main loop with freezing issues.

socket_paths = ['/tmp/waypanel.sock', '/tmp/waypanel-dockbar.sock', '/tmp/waypanel-utils.sock']

for path in socket_paths:
if os.path.exists(path):
os.remove(path)

sock = WayfireSocket()
utils = WayfireUtils(sock)
sock.watch()

executor = ThreadPoolExecutor()

event_queue = asyncio.Queue()


def reconnect_wayfire_socket():
sock = WayfireSocket()
utils = WayfireUtils(sock)
sock.watch()

def is_socket_active():
if not sock.is_connected():
try:
reconnect_wayfire_socket()
except Exception as e:
print(f"retrying in 10 seconds: {e}")
time.sleep(10)
return False
return True

def read_events():
while True:
if not is_socket_active():
continue

try:
event = sock.read_next_event() # blocking call
asyncio.run_coroutine_threadsafe(event_queue.put(event), loop)
except Exception as e:
print(f"skiping the event: {e}")
continue

# handle events and send them to connected clients
async def handle_event(clients):
while True:
event = await event_queue.get()
serialized_event = json.dumps(event)
# Broadcast the event to all connected clients
for client in clients:
try:
client.write((serialized_event + b'\n'))
await client.drain()
except (ConnectionResetError, BrokenPipeError):
clients.remove(client) # Remove clients that have disconnected

async def handle_client(reader, writer, clients):
clients.append(writer)
try:
while True:
await asyncio.sleep(3600) # Keep the client connection alive
except (ConnectionResetError, BrokenPipeError):
pass
finally:
clients.remove(writer)
writer.close()
await writer.wait_closed()

# start a server for a given path
async def start_server(path, clients):
server = await asyncio.start_unix_server(lambda r, w: handle_client(r, w, clients), path=path)
async with server:
await server.serve_forever()

async def main():
clients = []
servers = [start_server(path, clients) for path in socket_paths]
# Start the event reader in a separate thread
global loop
loop = asyncio.get_running_loop()
executor.submit(read_events)
await asyncio.gather(*servers, handle_event(clients))

if __name__ == '__main__':
asyncio.run(main())

54 changes: 54 additions & 0 deletions waypanel/src/ipc_server/ipc_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import socket
import orjson
from gi.repository import GLib

class WayfireClientIPC:
def __init__(self, handle_event):
self.client_socket = None
self.source = None
self.buffer = ""
self.socket_path = None
self.handle_event = handle_event # Store the custom handle_event function

def connect_socket(self, socket_path):
self.socket_path = socket_path
self.client_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.client_socket.connect(socket_path)
self.source = GLib.io_add_watch(self.client_socket, GLib.PRIORITY_DEFAULT, GLib.IO_IN, self.handle_socket_event)

def handle_socket_event(self, fd, condition):
# try decode before actually handle the event
# if the code fail, glib will stop watching
try:
chunk = fd.recv(1024).decode()
if not chunk:
return GLib.SOURCE_REMOVE

self.buffer += chunk

while '\n' in self.buffer:
event_str, self.buffer = self.buffer.split('\n', 1)
if event_str:
try:
event = orjson.loads(event_str)
self.process_event(event)
except orjson.JSONDecodeError as e:
print(f"JSON decode error: {e}")
except UnicodeDecodeError as e:
print(f"{e}")


return GLib.SOURCE_CONTINUE

def process_event(self, event):
self.handle_event(event) # Call the custom handle_event function

def disconnect_socket(self):
if self.source:
self.source.remove()
if self.client_socket:
self.client_socket.close()

def wayfire_events_setup(self, socket_path):
self.connect_socket(socket_path)

42 changes: 10 additions & 32 deletions waypanel/src/panel.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import netifaces
import soundcard as sc
import os
import socket
from pathlib import Path
from subprocess import Popen
from subprocess import check_output
Expand All @@ -14,7 +15,7 @@
import time
from collections import ChainMap
from waypanel.src.core.utils import Utils as utils
from waypanel.src.core.create_panel import CreatePanel, set_layer_position_exclusive
from waypanel.src.core.create_panel import CreatePanel, set_layer_position_exclusive, unset_layer_position_exclusive
from gi.repository import Gtk, Adw, GLib, Gio, Gdk
from waypanel.src.plugins.dockbar import Dockbar
from waypanel.src.plugins.bookmarksPopover import PopoverBookmarks
Expand All @@ -26,11 +27,11 @@
from waypanel.src.plugins.clipboardMenu import MenuClipboard
from waypanel.src.plugins.launcherMenu import MenuLauncher
from waypanel.src.core.utils import Utils
from waypanel.src.core.background import Background
from wayfire import WayfireSocket as OriginalWayfireSocket
from wayfire.core.template import get_msg_template
from wayfire.extra.ipc_utils import WayfireUtils
from wayfire.extra.stipc import Stipc
from waypanel.src.ipc_server.ipc_client import WayfireClientIPC

#to get the gtk and gdk completions
#pip install pygobject-stubs --no-cache-dir --config-settings=config=Gtk4,Gdk
Expand Down Expand Up @@ -71,9 +72,12 @@ def __init__(self, application_id):
self.panel_cfg = self.load_topbar_config()

self.sock = WayfireSocket()
self.sock.watch()
GLib.io_add_watch(self.sock.client, GLib.IO_IN, self.on_event_ready)
#self.sock.watch()
#GLib.io_add_watch(self.sock.client, GLib.IO_IN, self.on_event_ready)
self.wf_utils = WayfireUtils(self.sock)
self.ipc_client = WayfireClientIPC(self.handle_event)
# here is where the ipc events happen
self.ipc_client.wayfire_events_setup("/tmp/waypanel.sock")

self.fd = None

Expand Down Expand Up @@ -716,43 +720,17 @@ def on_expo_desactivated(self):
# just disable this function call
return True

# events that will make the dockbars clickable or not
def on_scale_activated(self):
# call it once to update without waiting timeout_add
# then timeout_add until scale is false

#if self.background_panel_enabled:
# self.update_background_panel()
# self.set_cpu_usage()
# GLib.timeout_add_seconds(1, self.set_cpu_usage)
# GLib.timeout_add_seconds(1, self.update_background_panel)
# set_layer_position_exclusive(self.update_background_panel)
self.is_scale_active = True
#self.utils.run_app('wofi --yoffset 20 --search ....')

def on_scale_desactivated(self):
#if self.background_panel_enabled:
# unset_layer_position_exclusive(self.update_background_panel)
self.is_scale_active = False
#self.utils.run_app("pkill wofi")

def on_view_focused(self):
self.update_title_top_panel()

def start_thread_all_events(self):
self.all_events = Background(
self.all_events_watch, lambda: self.on_events_finished
)
self.all_events.start()

def on_events_finished(self):
# non working code
try:
self.all_events_watch.finish()
except Exception as err:
print(err)

def list_views(self):

return self.sock.list_views()

# this function need a rework, get active monitor
Expand Down Expand Up @@ -2003,7 +1981,7 @@ def start_panel():
append_to_env("output_id", utils.get_output_id_by_name(monitor_name))

app.run(None)
sock.watch(["event"])
#sock.watch(["event"])

while True:
msg = sock.read_message()
Expand Down
Loading

0 comments on commit 96dec36

Please sign in to comment.