Integrate into gobject.MainLoop? #11

Closed
masmu opened this issue Nov 9, 2016 · 12 comments

@masmu

masmu commented Nov 9, 2016

Hello, great project!

Is it possible to integrate this into a gobject.MainLoop?
I've read about the set_poll_func but I am not quite sure how that fits into gobject.

        import gobject
        mainloop = gobject.MainLoop()
        try:
            mainloop.run()
        except KeyboardInterrupt:
            pass

Furthermore, is that library compatible with any pulseaudio version?

@mk-fg
Owner

mk-fg commented Nov 9, 2016

Hey. Thanks.

First of all, import gobject is an obsolete way to use it - these days everything g* is done through gobject-introspection, i.e.:

from gi.repository import GLib
mainloop = GLib.MainLoop()

https://lazka.github.io/pgi-docs/#GLib-2.0

pulseaudio already uses glib event loop as its main loop by default, see src/pulse/glib-mainloop.c, so calling set_poll_func and passing fd's to glib might be redundant.

I'm not sure if it's GLib.MainLoop though - should be possible to create bunch of unrelated contexts/loops with glib - you can probably check the source or test it by e.g. registering callback via GLib.timeout_add(), running pulse.event_listen() and see if callback gets called (if yes - same loop, if not - probably diff one).

With async code though, problem should also be that one'd need to invert control flow somehow, i.e. have pulse.some_pulse_func() register its callbacks and return immediately without calling pa_mainloop_run() synchronously and waiting-for/processing results, which is how this module's public api works (so it can be easily used from blocking code).

So best options to explore (that I can think of):

  • If pulse uses same GLib.MainLoop() - use either pulse.something() calls to run the loop or mainloop.run() if there are none pending.

    I.e. have something like pending_pulse_calls = list() and have your loop_run() function look something like this:

    from collections import deque
    
    loop_running = True
    pending_pulse_calls = deque()
    
    def callback_for_something_else():
      add_pulse_call(pulse.module_list, pulse_module_list_callback)
    
    def pulse_module_list_callback(module_list):
      pass # do something with module_list
    
    def add_pulse_call(func, callback):
      # wake the loop only if the queue was empty, i.e. loop may be idling in mainloop.run()
      loop_wakeup = not pending_pulse_calls
      pending_pulse_calls.append((func, callback))
      if loop_wakeup: mainloop.quit()
    
    def loop_run():
      while loop_running:
        while pending_pulse_calls:
          call, cb = pending_pulse_calls.popleft()
          cb(call())
        mainloop.run()
    

    Again, should only work if pa_mainloop_run() and mainloop.run() are the same thing, be sure to check that first.

  • Don't use public module API and/or override blocking calls there to be coroutines or something like that.

    E.g. starting with something like this: https://github.com/mk-fg/python-pulse-control/blob/3a2be31/pulsectl/pulsectl.py#L601-L610

    Have that method accept some callback or promise-object (e.g. asyncio.Future) and instead of using with self._pulse_op_cb(raw=True) as cb: contextmanager (which invokes pa_mainloop_run()) register callback to process result and send it back to that passed callback/promise.

    Probably easier/cleaner way if you only need like 3 pulse calls and don't care about the rest - just re-implement these in such async fashion and done.

  • Use set_poll_func to invert control flow.

    This func will be called when pa_mainloop_run() gets invoked with list of fd's to wait on and timeout - run GLib.io_add_watch() on each fd, GLib.timeout_add() with timeout and then mainloop.run(), and once any of the callbacks you've added on behalf of pulse triggers (either its fd or timeout) - stop the loop, return.
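
The contract described for the last option (receive a list of fds plus a timeout, wait until one of them fires or the timeout expires, then return) can be sketched with the standard library alone. This is only an illustration under assumptions: `wait_for_fds` is a hypothetical name, `selectors` stands in for GLib.io_add_watch() / GLib.timeout_add() / mainloop.run(), and the exact arguments that pulsectl passes to a set_poll_func callback are not reproduced here and should be checked against its documentation.

```python
import os
import selectors


def wait_for_fds(fds, timeout):
    """Block until any fd in `fds` is readable or `timeout` seconds pass.

    Returns the list of readable fds (empty list on timeout).
    Hypothetical helper, not part of pulsectl or GLib.
    """
    with selectors.DefaultSelector() as sel:
        for fd in fds:
            sel.register(fd, selectors.EVENT_READ)
        return [key.fd for key, _ in sel.select(timeout)]


# Usage: a pipe plays the role of the libpulse client socket.
r, w = os.pipe()
idle = wait_for_fds([r], timeout=0.05)   # nothing written yet, so this times out
os.write(w, b'x')
ready = wait_for_fds([r], timeout=1.0)   # data is pending, returns right away
os.close(r)
os.close(w)
```

In a GLib-based version the body of the helper would register the watches and the timeout, run the loop, and quit it from whichever callback fires first.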

Probably not exhaustive list of options.
Hope it might help to get started.

Furthermore, is that library compatible with any pulseaudio version?

I suppose you might get some deprecation warnings or errors from import gobject, hence the question - just use gi.repository (PyGObject) instead.

EDIT: clarified which "main" loop I meant in "not sure if it's main loop" - GLib.MainLoop, ofc.

@mk-fg
Owner

mk-fg commented Nov 9, 2016

Furthermore, is that library compatible with any pulseaudio version?

just use gi.repository (PyGObject) instead.

Btw, not sure if you mean this module or gobject module, or some other library here.

And I do realize that just use gi.repository (PyGObject) instead might be terrible advice and involve bunch of porting work, so maybe better answer is: don't know with "gobject" module, really - haven't used it myself for years now.

@masmu
Author

masmu commented Nov 10, 2016

Thanks for your fast and detailed response!

Well, I am fairly sure those are not the same loops. When calling mainloop.run() there are no pulsectl events, and when using pulse.event_listen() there are no gobject events.

I started some experiments using set_poll_func() and added the provided fds to gobject.io_add_watch() as you suggested. But I got the feeling that this is getting more complicated than necessary. So the following is what I ended up with:

import gobject
import pulsectl

def task1():
    print('running task1')
    return True

def print_events(ev):
    print('Pulse event:', ev)

pulse = pulsectl.Pulse('test')
pulse.event_mask_set('all')
pulse.event_callback_set(print_events)

mainloop = gobject.MainLoop()
gobject.timeout_add(1000, task1)
context = mainloop.get_context()
while mainloop is not None:
    if context.pending():
        context.iteration()
    else:
        pulse.event_listen(timeout=0.01)

That works great in first tests. Basically it combines both loops to act as one. The gobject one is the main event loop, and the pulsectl event loop is checked when the main one idles. From what I read in your source, this does not seem to create and destroy any objects every 0.01 seconds, so from my perspective the performance is great, and it seems like this is the intended use for event_listen(timeout=...), right?

It does not seem to be the case, but is there a chance of missing some events while the main loop is working?

@mk-fg
Owner

mk-fg commented Nov 10, 2016

Well, I am fairly sure those are not the same loops. When calling mainloop.run() there are no pulsectl events, and when using pulse.event_listen() there are no gobject events.

Yeah, that certainly seems to be the case, unfortunately.

Though looking at src/pulse/mainloop.c implementation:

int pa_mainloop_iterate(pa_mainloop *m, int block, int *retval) {
...
    if ((r = pa_mainloop_prepare(m, block ? -1 : 0)) < 0)
        goto quit;

    if ((r = pa_mainloop_poll(m)) < 0)
        goto quit;

    if ((r = pa_mainloop_dispatch(m)) < 0)
        goto quit;
...
}

int pa_mainloop_run(pa_mainloop *m, int *retval) {
...
    while ((r = pa_mainloop_iterate(m, 1, retval)) >= 0)
        ;
...
}

I'd say it might still be the case that pulse inits all event handlers in prepare() there and removes in dispatch() or something like that (checked it because I remember re-implementing this bit in pulsectl.py somewhere), which is why you don't have these triggered when running glib mainloop, but it seems unlikely and maybe not very useful anyway.

I started some experiments using set_poll_func() and added the provided fds to gobject.io_add_watch() as you suggested. But I got the feeling that this is getting more complicated than necessary.

True, I also put it in the last place there because it seems rather complicated - but likely unavoidably so - to me as well.
Might still be the cleanest way to do it, but if something simpler works fine, I can totally see why it's probably not worth the effort.

pulse.event_listen(timeout=0.01)

Oh yeah, that's where I've kinda re-implemented pa_mainloop_iterate() - _pulse_poll() call there (which is pretty much all pulse.event_listen() does).

From what I read in your source this does not seem to create and destroy any objects every 0.01 seconds

Indeed, should only check/create some timer-related values and do few minor state-checks like if context got disconnected/closed and whether threading lock is required (not the case here).

I think you can actually use timeout=0 there, as e.g. pa_mainloop_prepare(m, block ? -1 : 0) is clearly supported in pulseaudio API as "process all pending events" with timeout=0 value.
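
To make the timeout=0 point concrete, here is a toy loop with the same prepare/poll/dispatch split as the pa_mainloop_iterate() excerpt above. It is a stand-alone sketch, not pulsectl or libpulse code: `ToyLoop` and its methods are made-up names, and a pipe stands in for the client socket.

```python
import os
import selectors


class ToyLoop:
    """Toy event loop mirroring the prepare/poll/dispatch steps."""

    def __init__(self):
        self.sel = selectors.DefaultSelector()

    def add_reader(self, fd, callback):
        self.sel.register(fd, selectors.EVENT_READ, callback)

    def iterate(self, block):
        # prepare: the C code passes -1 (wait forever) or 0 (don't wait);
        # selectors spells "wait forever" as None.
        timeout = None if block else 0
        ready = self.sel.select(timeout)     # poll
        for key, _ in ready:                 # dispatch
            key.data(key.fd)
        return len(ready)

    def close(self):
        self.sel.close()


seen = []
r, w = os.pipe()
loop = ToyLoop()
loop.add_reader(r, lambda fd: seen.append(os.read(fd, 64)))

first = loop.iterate(block=False)    # nothing pending, returns immediately
os.write(w, b'event')
second = loop.iterate(block=False)   # one fd ready, its callback runs

loop.close()
os.close(r)
os.close(w)
```

With block=False the call never waits, it only handles whatever is already queued, which is the behaviour wanted when another loop is the one doing the actual sleeping.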

it seems like this is the intended use for event_listen(timeout=...), right?

Wasn't thinking of mixing different event loops together that way, no.

Was rather thinking that timeout might be useful if one's e.g. waiting for 1s for something to happen, do something else when it does (signal success, exit), or some other thing if it does not by that time (e.g. quit with error, whatever).

But should only mean that I didn't use/test it that way myself, nothing else.

is there a chance of missing some events while the main loop is working?

Don't know details of pulseaudio "native" protocol, but I can imagine two ways it might work:

  • Server leaves "there are events pending" notification in libpulse client socket.
  • Server writes each event info to libpulse client socket as they happen.

In first case, doing such poll 1/s should work great, but if it's the second case, obviously something can get lost, client might get disconnected or something like that.
Though even if it's the second case, such overflow situation might happen to be extremely unlikely to the point where it's not worth worrying about in practice.
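
The second case can be tried out without pulseaudio at all. In this sketch a socket pair stands in for the server/client connection, and the event size and count are arbitrary values picked for illustration: the "server" writes events while nobody is reading, and the "client" picks them all up in one late poll, as long as they fit into the kernel socket buffer.

```python
import socket

# Stand-in for the pulse server <-> libpulse client connection.
server, client = socket.socketpair()
client.setblocking(False)

# "Server" writes each event as it happens, client is busy elsewhere.
events = [('event-%03d' % n).encode().ljust(16, b'.') for n in range(100)]
for ev in events:
    server.sendall(ev)

# Later the client polls once and drains everything the kernel queued up.
received = b''
while True:
    try:
        chunk = client.recv(4096)
    except BlockingIOError:
        break   # buffer is empty again
    received += chunk

server.close()
client.close()
```

What happens once the buffer is actually full depends on the sender (block, drop, or disconnect), which this sketch does not try to model.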

Again, unfortunately don't know protocol details to answer conclusively, but maybe will check the sources, or you can probably do that yourself, or e.g. ask in #pulseaudio on freenode, might get an easy answer from devs/maintainers there, or maybe on ML.

@mk-fg
Owner

mk-fg commented Nov 10, 2016

Figured I'd just connect to pulse via tcp socket with something like this:

import time
from pulsectl import Pulse

def ev_cb(ev): print('event:', ev)
with Pulse('test') as pulse:
  pulse.event_mask_set('all')
  pulse.event_callback_set(ev_cb)
  pulse.event_listen(1)
  print('done caring')
  time.sleep(600)

Then run wireshark and see what's flying there.

Bad news - definitely second case - each event sends packet with 40B payload, apparently event data.

Given that sysctl -a | grep tcp_rmem gives net.ipv4.tcp_rmem = 4096 87380 6291456, that'd be ~80 KiB buffer per tcp socket by default, and rmem_default for unix sockets is ~200K.
Sending just one event puts 80B (40B TCP header + 40B payload) in Recv-Q in ss -atn | grep 4713 output (using 4713 as port for module-native-protocol-tcp), and probably just 40B (payload only) for unix sockets.

So that'd be like 1k-5k pulse events within that 1s window that should be necessary to overflow default socket buffers on my machine - don't think it's realistic, something like 50-100 is likely the high watermark here.
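
The 1k-5k range follows from dividing the buffer sizes by the per-event sizes quoted above. A quick check, treating "~200K" as a round 200000 bytes (an assumption) and ignoring any per-packet bookkeeping the kernel adds, so these are rough upper bounds:

```python
# Figures quoted above; all sizes in bytes.
tcp_rmem_default = 87380         # middle value of net.ipv4.tcp_rmem
unix_rmem_default = 200 * 1000   # "~200K", taken as a round number (assumption)

tcp_event_size = 80              # observed Recv-Q growth per event over TCP
unix_event_size = 40             # payload only, guessed for unix sockets

tcp_events = tcp_rmem_default // tcp_event_size      # 1092
unix_events = unix_rmem_default // unix_event_size   # 5000

print('tcp: ~%d events, unix: ~%d events' % (tcp_events, unix_events))
```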

So I'd say checking on that queue once per second should be perfectly fine for real-world scenarios, unless it's a really busy pulse instance or some parameters above are different.

You can also easily check what happens when buffer overflows that way - replace sleep() with input() + event_listen() there, spam pulse events (e.g. roll volume bars or something) until it's clear in wireshark/ss that the queue is full, then send '\n' to do event_listen() and see what happens.
Best-case - nothing bad, just some events lost - and who cares with >1k/s ;)

@mk-fg mk-fg closed this as completed Nov 12, 2016
@masmu
Author

masmu commented Nov 14, 2016

Thanks again for your efforts and your awesome support!

Are you planning on supporting blocking calls? I have some legacy python2 code full of blocking calls... and integration of asyncio is not that easy in python2.

@mk-fg
Owner

mk-fg commented Nov 14, 2016

Thanks.

I don't think this module will ever support asyncio or non-blocking API - i.e. should stay as it is now.
For asyncio in particular, I think making entirely separate asyncio-only non-blocking module would make more sense than trying to shoehorn it into this one.

@mk-fg
Owner

mk-fg commented Nov 14, 2016

Just randomly thought of another thing related to this topic - you can run two separate glib loops from two separate threads just fine, so it might be another way to integrate the thing.
I didn't consider suggesting the option initially as I thought it might be the same loop, but that doesn't seem to be the case.

As both threads will be IO-bound, there shouldn't be any issue with GIL, and it should be relatively easy to keep all things pulse in a separate daemon-thread, proxying calls to it from the main one via some queue.
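
A minimal sketch of such a proxying thread, using only the standard library (Python 3). `WorkerThread`, `submit()` and `stop()` are hypothetical names. In real use the thread would create its own pulsectl.Pulse client inside run() and the submitted functions would use it; here a plain builtin is submitted so the example runs anywhere.

```python
import queue
import threading
from concurrent.futures import Future


class WorkerThread(threading.Thread):
    """Daemon thread that runs calls sent to it through a queue."""

    def __init__(self):
        super().__init__(daemon=True)
        self.calls = queue.Queue()

    def submit(self, func, *args):
        # Called from any thread, returns a Future for the result.
        fut = Future()
        self.calls.put((func, args, fut))
        return fut

    def stop(self):
        self.calls.put((None, (), None))   # sentinel, ends run()

    def run(self):
        while True:
            func, args, fut = self.calls.get()
            if func is None:
                break
            try:
                fut.set_result(func(*args))
            except Exception as err:
                fut.set_exception(err)


worker = WorkerThread()
worker.start()
result = worker.submit(sum, [1, 2, 3]).result(timeout=5)
worker.stop()
worker.join(timeout=5)
```

The main thread can either block on `.result()` as above or attach `add_done_callback()` to the Future and keep running its own loop.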

@mk-fg
Owner

mk-fg commented Nov 14, 2016

Weird that I kinda-forgot about this option, given that I did exactly that with this module in mk-fg/pulseaudio-mixer-cli and AccelerateNetworks/PagingServer projects.
Former has curses input loop in main thread, and it's pjsip/pjsua eventloop in the latter case.

@masmu
Author

masmu commented Nov 18, 2016

Sorry for the delay, was busy with some other refactoring.

I followed your advice and I am very happy with it. Not nearly as ugly as it sounded at first 😄

from gi.repository import GObject
import pulsectl
import threading
import multiprocessing
import sys
import time


class PulseThread(threading.Thread):

    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def on_event(self, event):
        self.queue.put(event)

    def run(self):
        pulse = pulsectl.Pulse('test1')
        pulse.event_mask_set('all')
        pulse.event_callback_set(self.on_event)
        pulse.event_listen()


class PulseReactor(object):

    def __init__(self):
        self.pulse_queue = multiprocessing.Queue()
        self.pulse = pulsectl.Pulse('test2')

    def task1(self):
        print('running task1')
        return True

    def on_pulse_event(self, fd, condition):
        try:
            event = self.pulse_queue.get_nowait()
        except:
            return True

        print(event)
        self.update_sinks()
        return True

    def update_sinks(self):
        for sink in self.pulse.sink_list():
            print(sink)

    def start(self):
        pulse_thread = PulseThread(self.pulse_queue)
        pulse_thread.daemon = True
        pulse_thread.start()

        mainloop = GObject.MainLoop()
        GObject.timeout_add(1000, self.task1)
        GObject.io_add_watch(
            self.pulse_queue._reader, GObject.IO_IN | GObject.IO_PRI,
            self.on_pulse_event)

        try:
            mainloop.run()
        except KeyboardInterrupt:
            pass

reactor = PulseReactor()
reactor.start()

One thing I am wondering, are there any potential side effects when using 2 instances of pulsectl? One is currently living in the main thread (test2) the other in the pulse thread (test1).

Why did you decide to deny blocking calls from the event callbacks in the first place?

@mk-fg
Owner

mk-fg commented Nov 18, 2016

I've never actually tested two client instances running in the same pid, but from all I've seen in libpulse it shouldn't be a problem there.
One concern though is that some global stuff might leak between these in the python code, if I messed something up there, as it is untested territory, but if there're such things, they're certainly unintentional and should be fixed.

Implementation looks rather straightforward, a few things that stick-out to me:

  • multiprocessing.Queue() is most likely an actual os pipe, and stuff passed through it gets pickled/unpickled, which seems to be a bit of an overkill between threads, as these can just push/pop things from the same list and use threading primitives to sync access to it, if necessary.

  • You can subscribe to sink/new events (or look for those among others) and get index of each new sink with them, without needing to run sink_list(), iirc.

  • Can get all the info on sinks or whatever in the pulse thread and return these, if only info on these is needed.

  • In both cases when I've used such "pulse worker" thread, I've used Pulse(..., threading_lock=True) and event_listen_stop() to re-use same client both as event-listener and command-runner from any thread.

    E.g. in pulseaudio-mixer-cli, "mute" does this from the main thread (while bg thread is listening for events):

    with self.menu.update_wakeup() as pulse: pulse.mute(self.obj, self.obj.mute)
    

    update_wakeup() context manager stops the event_listen() loop and returns client object for whatever thread to use, resuming the loop after context finishes doing whatever it needs to.

    Code for this contextmanager that I have in there:

    @contextmanager
    def update_wakeup(self, trap_errors=True, loop_interval=0.03):
        'Anything pulse-related MUST be done in this context.'
        with self._pulse_hold:
            for n in range(int(5.0 / loop_interval)):
                # wakeup only works when loop is actually started,
                #  which might not be the case regardless of any locks.
                self.pulse.event_listen_stop()
                if self._pulse_lock.acquire(timeout=loop_interval): break
            else:
                raise RuntimeError('poll_wakeup() hangs, likely locking issue')
            try: yield self.pulse
            except Exception as err:
                if not trap_errors:
                    self._update_wakeup_break = True
                    raise
                log.exception('Pulse interaction failure, skipping: <{}> {}', err.__class__.__name__, err)
            finally: self._pulse_lock.release()
    

    Which idk, maybe a bit ugly, but seems to avoid the need for a second client.

    Note that second lock and whole retry-loop is only there because it's not known if libpulse loop is started, which is not a thing you can know at all (maybe libpulse is taking its time or something), and pa_mainloop_wakeup() only works when poll() is running.
    But ofc, it's likely 99.9% works on the first try.

    trap_errors=True is there in case you don't want to stop the whole thing if libpulse raises error within context and nothing bothers to catch it - e.g. in "mute" example above muted object is gone right when the call is made or something like that.

  • This bit:

    try: mainloop.run()
    except KeyboardInterrupt: pass
    

    Can be replaced by signal.signal(signal.SIGINT, signal.SIG_DFL), I think, as the issue here is python's default KeyboardInterrupt-raising SIGINT handler.
    Probably not on windows though, dunno how interrupt stuff works there, maybe makes more sense in a cross-platform app.

  • import time seems unused, but can be dangerous if time.time() is used from there in a naive way - which is why py3 has time.monotonic() and _pulsectl has mono_time() for py2.

    It sounds far-fetched that time.time() will be jumping around, but actually very real thing for daemons starting on boot when it definitely tends to happen, leading to add_timeout(~46years) calls.
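
Regarding the first point in the list above (multiprocessing.Queue between threads), one possible lighter setup is a plain deque for the data plus an os.pipe() used only as a wakeup signal, so the main loop still gets a real fd to watch without reaching for the private `_reader` attribute. This is a sketch with made-up names (`producer`, `drain`), and `select` stands in for the io_add_watch() call on the read end of the pipe.

```python
import collections
import os
import select
import threading

events = collections.deque()    # append()/popleft() are thread-safe
wake_r, wake_w = os.pipe()      # main loop watches wake_r for readability


def producer():
    # Runs in the background thread, e.g. from an event callback.
    for n in range(3):
        events.append('event %d' % n)
        os.write(wake_w, b'\0')   # wake the main loop after each event


def drain():
    # Runs in the main thread once wake_r becomes readable.
    os.read(wake_r, 4096)         # clear pending wakeup bytes
    out = []
    while events:
        out.append(events.popleft())
    return out


t = threading.Thread(target=producer, daemon=True)
t.start()
t.join()

readable, _, _ = select.select([wake_r], [], [], 1.0)
drained = drain() if readable else []
os.close(wake_r)
os.close(wake_w)
```

Objects go through the deque by reference, so nothing is pickled, and the pipe only ever carries single wakeup bytes.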

@mk-fg
Owner

mk-fg commented Nov 18, 2016

import time seems unused, but can be dangerous
which is why py3 has time.monotonic() and _pulsectl has mono_time() for py2.

Forgot to mention here that glib/gobject has its own get_monotonic_time wrapper - https://lazka.github.io/pgi-docs/#GLib-2.0/functions.html#GLib.get_monotonic_time - which would probably be the best option for glib/gobject-based apps.
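
For plain Python 3 code, the same idea with time.monotonic() looks roughly like this. `wait_until` is a hypothetical helper written for this example: the deadline is computed from the monotonic clock, so a wall-clock jump (e.g. NTP sync at boot) cannot stretch or shrink the wait.

```python
import time


def wait_until(predicate, timeout, poll_interval=0.01):
    """Poll `predicate` until it returns true or `timeout` seconds elapse."""
    deadline = time.monotonic() + timeout   # unaffected by wall-clock jumps
    while True:
        if predicate():
            return True
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        time.sleep(min(poll_interval, remaining))


ok = wait_until(lambda: True, timeout=1.0)           # condition already met
timed_out = wait_until(lambda: False, timeout=0.05)  # never met, gives up
```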
