Skip to content

Commit

Permalink
#249: gtk2 solution for large UI thread delays on osx - which can als…
Browse files Browse the repository at this point in the history
…o be re-used for other purposes (ie: pause window refresh):

* monitor the UI thread latency (enabled for osx only, or via XPRA_FAKE_UI_LOCKUPS env var)
* if we find the UI thread is blocked for more than 2 seconds, we tell the server to "suspend" the "UI" events and pixel refresh for all the windows we know about
* when it resumes, we tell the server to "resume" operations
* new server capability for these new packet types is "suspend-resume"
* the 'ServerSource' will stop the sound, prevent bell events, notifications and such from being sent - when we get the resume event it will re-start the sound if it was running before
* the 'WindowSource's specified will also cancel all damage requests (and the pending acks which may take a while to come) and wait for the resume message (and we drop all damage packets that were being compressed), on resume we issue a brand new damage request at 100% quality to refresh all the windows
* flags added to "xpra info"

git-svn-id: https://xpra.org/svn/Xpra/trunk@3786 3bb7dfac-3a0b-4e04-842a-767bc560f471
  • Loading branch information
totaam committed Jul 7, 2013
1 parent 393ab07 commit 753f681
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 11 deletions.
42 changes: 42 additions & 0 deletions src/xpra/client/gtk2/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
# Xpra is released under the terms of the GNU GPL v2, or, at your option, any
# later version. See the file COPYING for details.

import time
import os
import sys
import gobject
try:
Expand All @@ -30,6 +32,9 @@
"custom" : CustomClientWindow,
}

FAKE_UI_LOCKUPS = int(os.environ.get("XPRA_FAKE_UI_LOCKUPS", "0"))


class XpraClient(GTKXpraClient):

WINDOW_TOPLEVEL = gdk.WINDOW_TOPLEVEL
Expand Down Expand Up @@ -192,6 +197,43 @@ def process_ui_capabilities(self, capabilities):
screen = display.get_screen(i)
screen.connect("size-changed", self._screen_size_changed)
i += 1
#if server supports it, enable UI thread monitoring workaround when needed:
if self.suspend_resume and (sys.platform.startswith("darwin") or FAKE_UI_LOCKUPS>0):
self.start_UI_thread_polling()
if FAKE_UI_LOCKUPS>0:
def sleep_in_ui_thread(*args):
time.sleep(FAKE_UI_LOCKUPS)
return True
gobject.timeout_add((10+FAKE_UI_LOCKUPS)*1000, sleep_in_ui_thread)

def start_UI_thread_polling(self):
log("start_UI_thread_polling()")
import thread
self.UI_blocked_sent = False
def UI_thread_wakeup(*args):
log("UI_thread_wakeup()")
self.last_UI_thread_time = time.time()
#UI thread was blocked?
if self.UI_blocked_sent:
log.info("UI thread is running again, resuming")
self.send("resume", True, self._id_to_window.keys())
self.UI_blocked_sent = False
return self.exit_code is None
UI_thread_wakeup()
gobject.timeout_add(1000, UI_thread_wakeup)
def poll_UI_loop(*args):
log("poll_UI_loop() running")
while self.exit_code is None:
delta = time.time()-self.last_UI_thread_time
log("poll_UI_loop() last_UI_thread_time was %.1f seconds ago, UI_blocked_sent=%s", delta, self.UI_blocked_sent)
if delta>2.0:
#UI thread is (still?) blocked:
if not self.UI_blocked_sent:
log.info("UI thread is blocked, pausing server")
self.send("suspend", True, self._id_to_window.keys())
self.UI_blocked_sent = True
time.sleep(1.0)
thread.start_new_thread(poll_UI_loop, ())

def _screen_size_changed(self, *args):
def update_size(current=None):
Expand Down
1 change: 1 addition & 0 deletions src/xpra/client/ui_client_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,7 @@ def parse_server_capabilities(self, capabilities):
self.X11_OR_focus = capabilities.get("X11.OR_focus", False)
self.window_configure = capabilities.get("window_configure", False)
self.window_unmap = capabilities.get("window_unmap", False)
self.suspend_resume = capabilities.get("suspend-resume", False)
self.server_supports_notifications = capabilities.get("notifications", False)
self.notifications_enabled = self.server_supports_notifications and self.client_supports_notifications
self.server_supports_cursors = capabilities.get("cursors", True) #added in 0.5, default to True!
Expand Down
27 changes: 27 additions & 0 deletions src/xpra/server/server_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,8 @@ def init_packet_handlers(self):
"set_deflate": self._process_set_deflate,
"desktop_size": self._process_desktop_size,
"encoding": self._process_encoding,
"suspend": self._process_suspend,
"resume": self._process_resume,
#sound:
"sound-control": self._process_sound_control,
"sound-data": self._process_sound_data,
Expand Down Expand Up @@ -761,6 +763,7 @@ def make_hello(self):
capabilities["sound_sequence"] = True
capabilities["info-request"] = True
capabilities["notify-startup-complete"] = True
capabilities["suspend-resume"] = True
if self._reverse_aliases:
capabilities["aliases"] = self._reverse_aliases
capabilities["server_type"] = "base"
Expand Down Expand Up @@ -1051,6 +1054,30 @@ def _process_encoding(self, proto, packet):
self.refresh_windows(proto, wid_windows)


def _get_window_dict(self, wids):
wd = {}
for wid in wids:
window = self._id_to_window.get(wid)
if window:
wd[wid] = window
return wd

def _process_suspend(self, proto, packet):
log("suspend(%s)", packet[1:])
ui = packet[1]
wd = self._get_window_dict(packet[2])
ss = self._server_sources.get(proto)
if ss:
ss.suspend(ui, wd)

def _process_resume(self, proto, packet):
log("resume(%s)", packet[1:])
ui = packet[1]
wd = self._get_window_dict(packet[2])
ss = self._server_sources.get(proto)
if ss:
ss.resume(ui, wd)

def send_ping(self):
for ss in self._server_sources.values():
ss.ping()
Expand Down
53 changes: 44 additions & 9 deletions src/xpra/server/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ def __init__(self, protocol, disconnect_cb, idle_add, timeout_add, source_remove
self.default_encoding_options = {}

self.window_sources = {} #WindowSource for each Window ID
self.suspended = False
self.resume_sound = False

self.uuid = ""
self.hostname = ""
Expand Down Expand Up @@ -361,6 +363,31 @@ def close(self):
self.protocol.close()
self.protocol = None

def suspend(self, ui, wd):
if ui:
self.suspended = True
if self.sound_source is None:
self.resume_sound = False
else:
self.resume_sound = True
self.stop_sending_sound()
for wid in wd.keys():
ws = self.window_sources.get(wid)
if ws:
ws.suspend()

def resume(self, ui, wd):
if not ui:
self.suspended = False
for wid, window in wd.items():
ws = self.window_sources.get(wid)
if ws:
ws.resume(window)
if self.resume_sound:
self.start_sending_sound()
self.do_send_cursor()


def user_event(self):
self.last_user_event = time.time()

Expand Down Expand Up @@ -529,6 +556,7 @@ def startup_complete(self):
self.send("startup-complete")

def start_sending_sound(self):
assert not self.suspended, "cannot start sound when UI is suspended"
assert self.supports_speaker, "cannot send sound: support not enabled on the server"
assert self.sound_source is None, "a sound source already exists"
assert self.sound_receive, "cannot send sound: support is not enabled on the client"
Expand All @@ -549,6 +577,8 @@ def stop_sending_sound(self):

def new_sound_buffer(self, sound_source, data, metadata):
assert self.sound_source
if self.suspended:
return
if self.sound_source_sequence>0:
metadata["sequence"] = self.sound_source_sequence
self.send("sound-data", self.sound_source.codec, Compressed(self.sound_source.codec, data), metadata)
Expand Down Expand Up @@ -781,6 +811,7 @@ def addattr(k, name):
info["client.desktop_size" + suffix] = self.desktop_size or ""
info["client.connection_time" + suffix] = int(self.connection_time)
info["client.elapsed_time" + suffix] = int(time.time()-self.connection_time)
info["client.suspended" + suffix] = self.suspended
#= time.time()
#self.start_time = time.time()
if self.screen_sizes:
Expand Down Expand Up @@ -840,11 +871,12 @@ def send_info_response(self, info):
self.send("info-response", info)

def send_clipboard(self, packet):
if self.clipboard_enabled:
self.send(*packet)
if not self.clipboard_enabled or self.suspended:
return
self.send(*packet)

def send_cursor(self, cursor_data):
if not self.send_cursors:
if not self.send_cursors or self.suspended:
return
self.cursor_data = cursor_data
if not self.send_cursor_pending:
Expand All @@ -863,16 +895,19 @@ def do_send_cursor(self):
self.send("cursor", "")

def bell(self, wid, device, percent, pitch, duration, bell_class, bell_id, bell_name):
if self.send_bell:
self.send("bell", wid, device, percent, pitch, duration, bell_class, bell_id, bell_name)
if not self.send_bell or self.suspended:
return
self.send("bell", wid, device, percent, pitch, duration, bell_class, bell_id, bell_name)

def notify(self, dbus_id, nid, app_name, replaces_nid, app_icon, summary, body, expire_timeout):
if self.send_notifications:
self.send("notify_show", dbus_id, int(nid), str(app_name), int(replaces_nid), str(app_icon), str(summary), str(body), int(expire_timeout))
if not self.send_notifications or self.suspended:
return
self.send("notify_show", dbus_id, int(nid), str(app_name), int(replaces_nid), str(app_icon), str(summary), str(body), int(expire_timeout))

def notify_close(self, nid):
if self.send_notifications:
self.send("notify_close", nid)
if not self.send_notifications or self.suspended:
return
self.send("notify_close", nid)

def set_deflate(self, level):
self.send("set_deflate", level)
Expand Down
25 changes: 23 additions & 2 deletions src/xpra/server/window_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ def __init__(self, idle_add, timeout_add, source_remove,
self.supports_delta = [x for x in encoding_options.get("supports_delta", []) if x in ("png", "rgb24", "rgb32")]
self.last_pixmap_data = None
self.batch_config = batch_config
self.suspended = False
#auto-refresh:
self.auto_refresh_delay = auto_refresh_delay
self.refresh_timer = None
Expand Down Expand Up @@ -154,6 +155,19 @@ def cleanup(self):
self.statistics.reset()
debug("encoding_totals for wid=%s with primary encoding=%s : %s", self.wid, self.encoding, self.statistics.encoding_totals)

def suspend(self):
self.cancel_damage()
self.statistics.reset()
self.suspended = True

def resume(self, window):
self.cancel_damage()
self.statistics.reset()
self.suspended = False
w, h = window.get_dimensions()
self.damage(window, 0, 0, w, h, {"quality" : 100})


def set_new_encoding(self, encoding):
""" Changes the encoder for the given 'window_ids',
or for all windows if 'window_ids' is None.
Expand Down Expand Up @@ -230,6 +244,7 @@ def add_stats(self, info, suffix=""):
#no suffix for metadata (as it is the same for all clients):
info[prefix+"dimensions"] = self.window_dimensions
info[prefix+"encoding"+suffix] = self.encoding
info[prefix+"suspended"+suffix] = self.suspended
self.statistics.add_stats(info, prefix, suffix)

#batch delay stats:
Expand Down Expand Up @@ -259,6 +274,8 @@ def calculate_batch_delay(self):
calculate_batch_delay(self.window_dimensions, self.wid, self.batch_config, self.global_statistics, self.statistics)

def update_speed(self):
if self.suspended:
return
speed = self.default_encoding_options.get("speed", -1)
if speed<0:
min_speed = self.get_min_speed()
Expand Down Expand Up @@ -286,6 +303,8 @@ def get_current_speed(self):
return max(ms, self._encoding_speed[-1][-1])

def update_quality(self):
if self.suspended:
return
quality = self.default_encoding_options.get("quality", -1)
if quality<0:
min_quality = self.default_encoding_options.get("min-quality", -1)
Expand Down Expand Up @@ -330,6 +349,8 @@ def damage(self, window, x, y, w, h, options={}):
force the current options to override the old ones,
otherwise they are only merged.
"""
if self.suspended:
return
if w==0 or h==0:
#we may fire damage ourselves,
#in which case the dimensions may be zero (if so configured by the client)
Expand Down Expand Up @@ -760,7 +781,7 @@ def make_data_packet(self, damage_time, process_damage_time, wid, image, coding,
* 'x264' and 'vpx' use 'video_encode'
* 'rgb24' and 'rgb32' use 'rgb_encode' and the 'Compressed' wrapper to tell the network layer it is already zlibbed
"""
if self.is_cancelled(sequence):
if self.is_cancelled(sequence) or self.suspended:
debug("make_data_packet: dropping data packet for window %s with sequence=%s", wid, sequence)
return None
x, y, w, h, _ = image.get_geometry()
Expand Down Expand Up @@ -797,7 +818,7 @@ def make_data_packet(self, damage_time, process_damage_time, wid, image, coding,
data, client_options, outw, outh, outstride = encoder(coding, image, options)
#check cancellation list again since the code above may take some time:
#but always send mmap data so we can reclaim the space!
if coding!="mmap" and self.is_cancelled(sequence):
if coding!="mmap" and (self.is_cancelled(sequence) or self.suspended):
debug("make_data_packet: dropping data packet for window %s with sequence=%s", wid, sequence)
return None
if data is None:
Expand Down

0 comments on commit 753f681

Please sign in to comment.