Skip to content

Commit

Permalink
Use threads to read from subprocess to avoid deadlocks #49
Browse files Browse the repository at this point in the history
  • Loading branch information
MrS0m30n3 committed Jul 13, 2015
1 parent 365dc13 commit b646af4
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 29 deletions.
103 changes: 74 additions & 29 deletions youtube_dl_gui/downloaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,53 @@
import locale
import subprocess

from time import sleep
from Queue import Queue
from threading import Thread


class PipeReader(Thread):
"""Helper class to avoid deadlocks when reading from subprocess pipes.
This class uses python threads and queues in order to read from subprocess
pipes in an asynchronous way.
Attributes:
WAIT_TIME (float): Time in seconds to sleep.
Args:
queue (Queue.Queue): Python queue to store the output of the subprocess.
"""

WAIT_TIME = 0.1

def __init__(self, queue):
super(PipeReader, self).__init__()
self._filedescriptor = None
self._running = True
self._queue = queue

self.start()

def run(self):
while self._running:
if self._filedescriptor is not None:
for line in iter(self._filedescriptor.readline, ''):
self._queue.put_nowait(line.rstrip())

self._filedescriptor = None

sleep(self.WAIT_TIME)

def attach_filedescriptor(self, filedesc):
"""Attach a filedescriptor to the PipeReader. """
self._filedescriptor = filedesc

def join(self, timeout=None):
self._running = False
super(PipeReader, self).join(timeout)


class YoutubeDLDownloader(object):

Expand All @@ -40,6 +87,11 @@ class YoutubeDLDownloader(object):
Note:
For available data keys check self._data under __init__().
Warnings:
The caller is responsible for calling the close() method after he has
finished with the object in order for the object to be able to properly
close down itself.
Example:
How to use YoutubeDLDownloader from a python script.
Expand Down Expand Up @@ -79,6 +131,9 @@ def __init__(self, youtubedl_path, data_hook=None, log_data=None):
'eta': None
}

self._stderr_queue = Queue()
self._stderr_reader = PipeReader(self._stderr_queue)

def download(self, url, options):
"""Download url using given options.
Expand All @@ -88,7 +143,7 @@ def download(self, url, options):
Returns:
An integer that shows the status of the download process.
Right now we support 5 different return codes.
Right now we support 6 different return codes.
OK (0): The download process completed successfully.
ERROR (1): An error occured during the download process.
Expand All @@ -104,22 +159,28 @@ def download(self, url, options):
cmd = self._get_cmd(url, options)
self._create_process(cmd)

self._stderr_reader.attach_filedescriptor(self._proc.stderr)

while self._proc_is_alive():
stdout, stderr = self._read()
stdout = self._proc.stdout.readline().rstrip().decode(self._get_encoding(), 'ignore')

if stderr:
if stdout:
self._sync_data(extract_data(stdout))
self._hook_data()

# Read stderr after download process has been completed
# We don't need to read stderr in real time
while not self._stderr_queue.empty():
stderr = self._stderr_queue.get_nowait().decode(self._get_encoding(), 'ignore')

self._log(stderr)

if self._return_code != self.STOPPED:
if self._is_warning(stderr):
self._return_code = self.WARNING
else:
self._return_code = self.ERROR

self._log(stderr)

if stdout:
self._sync_data(extract_data(stdout))
self._hook_data()

self._last_data_hook()

return self._return_code
Expand All @@ -130,6 +191,10 @@ def stop(self):
self._proc.kill()
self._return_code = self.STOPPED

def close(self):
"""Destructor like function for the object. """
self._stderr_reader.join()

def _is_warning(self, stderr):
return stderr.split(':')[0] == 'WARNING'

Expand Down Expand Up @@ -216,26 +281,6 @@ def _proc_is_alive(self):

return self._proc.poll() is None

def _read(self):
"""Read subprocess stdout, stderr.
Returns:
Python tuple that contains the STDOUT and STDERR
strings.
"""
stdout = stderr = ''

if self._proc is not None:
stdout = self._proc.stdout.readline().rstrip()

if not stdout:
stderr = self._proc.stderr.readline().rstrip()

encoding = self._get_encoding()

return stdout.decode(encoding, 'ignore'), stderr.decode(encoding, 'ignore')

def _get_cmd(self, url, options):
"""Build the subprocess command.
Expand Down
3 changes: 3 additions & 0 deletions youtube_dl_gui/downloadmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,9 @@ def run(self):

time.sleep(self.WAIT_TIME)

# Call the destructor function of YoutubeDLDownloader object
self._downloader.close()

def download(self, item):
"""Download given item.
Expand Down

0 comments on commit b646af4

Please sign in to comment.