Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[options] Added workaround option to execute "n_function" #31187

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion test/test_youtube_signature.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,10 @@ def make_tfunc(url, sig_input, expected_sig):
test_id = m.group('id')

def test_func(self):
basename = 'player-{0}-{1}.js'.format(name, test_id)
tn = name
if name.endswith('_wd'):
tn = name[:-3]
basename = 'player-{0}-{1}.js'.format(tn, test_id)
fn = os.path.join(self.TESTDATA_DIR, basename)

if not os.path.exists(fn):
Expand All @@ -241,6 +244,10 @@ def n_sig(jscode, sig_input):
return JSInterpreter(jscode).call_function(funcname, sig_input)


def n_sig_wd(jscode, sig_input):
    """Decode `sig_input` with the player's n-function via a 'chrome' WebDriver.

    Counterpart of n_sig() that delegates to
    YoutubeIE._call_n_function_with_webdriver() instead of JSInterpreter.
    """
    extractor = YoutubeIE(FakeYDL())
    return extractor._call_n_function_with_webdriver('chrome', jscode, sig_input)


make_sig_test = t_factory(
'signature', signature, re.compile(r'.*-(?P<id>[a-zA-Z0-9_-]+)(?:/watch_as3|/html5player)?\.[a-z]+$'))
for test_spec in _SIG_TESTS:
Expand All @@ -251,6 +258,17 @@ def n_sig(jscode, sig_input):
for test_spec in _NSIG_TESTS:
make_nsig_test(*test_spec)

# The WebDriver-backed nsig tests are opt-in: they are only generated when
# '--test_wd' is given on the command line.
test_wd = '--test_wd' in sys.argv
if test_wd:
    # Drop the custom flag again (presumably so that unittest.main() below
    # does not see an option it does not know -- confirm)
    sys.argv = [a for a in sys.argv if a != '--test_wd']
    make_nsig_wd_test = t_factory(
        'nsig_wd', n_sig_wd,
        re.compile(r'.+/player/(?P<id>[a-zA-Z0-9_-]+)/.+.js$'))
    for wd_test_spec in _NSIG_TESTS:
        make_nsig_wd_test(*wd_test_spec)

if __name__ == '__main__':
unittest.main()
1 change: 1 addition & 0 deletions youtube_dl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,7 @@ def parse_retries(retries):
'call_home': opts.call_home,
'sleep_interval': opts.sleep_interval,
'max_sleep_interval': opts.max_sleep_interval,
'webdriver': opts.webdriver,
'external_downloader': opts.external_downloader,
'list_thumbnails': opts.list_thumbnails,
'playlist_items': opts.playlist_items,
Expand Down
6 changes: 6 additions & 0 deletions youtube_dl/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -2448,6 +2448,11 @@ class compat_HTMLParseError(Exception):
except ImportError:
import BaseHTTPServer as compat_http_server

try:
from urllib.parse import quote as compat_urllib_quote
except ImportError: # Python 2
from urllib import quote as compat_urllib_quote

try:
from urllib.parse import unquote_to_bytes as compat_urllib_parse_unquote_to_bytes
from urllib.parse import unquote as compat_urllib_parse_unquote
Expand Down Expand Up @@ -3560,6 +3565,7 @@ def compat_datetime_timedelta_total_seconds(td):
'compat_tokenize_tokenize',
'compat_urllib_error',
'compat_urllib_parse',
'compat_urllib_quote',
'compat_urllib_request',
'compat_urllib_request_DataHandler',
'compat_urllib_response',
Expand Down
113 changes: 113 additions & 0 deletions youtube_dl/extractor/youtube.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import random
import re
import traceback
import importlib

from .common import InfoExtractor, SearchInfoExtractor
from ..compat import (
Expand All @@ -19,11 +20,13 @@
compat_urllib_parse,
compat_urllib_parse_parse_qs as compat_parse_qs,
compat_urllib_parse_unquote_plus,
compat_urllib_quote,
compat_urllib_parse_urlparse,
compat_zip as zip,
)
from ..jsinterp import JSInterpreter
from ..utils import (
check_executable,
clean_html,
dict_get,
error_to_compat_str,
Expand Down Expand Up @@ -1464,6 +1467,7 @@ def __init__(self, *args, **kwargs):
super(YoutubeIE, self).__init__(*args, **kwargs)
self._code_cache = {}
self._player_cache = {}
self._webdriver_wrapper = None

# *ytcfgs, webpage=None
def _extract_player_url(self, *ytcfgs, **kw_webpage):
Expand Down Expand Up @@ -1633,6 +1637,22 @@ def _decrypt_nsig(self, n, video_id, player_url):
if player_url is None:
raise ExtractorError('Cannot decrypt nsig without player_url')

webdriver_type = self._downloader.params.get('webdriver', None)
if webdriver_type is not None:
try:
jscode = self._load_player(video_id, player_url)
ret = self._call_n_function_with_webdriver(webdriver_type, jscode, n)
except Exception as e:
self.report_warning(
'%s (%s %s)' % (
'Unable to decode n-parameter: download likely to be throttled',
error_to_compat_str(e),
traceback.format_exc()),
video_id=video_id)
return
self.write_debug('Decrypted nsig(with webdriver) {0} => {1}'.format(n, ret))
return ret

try:
jsi, player_id, func_code = self._extract_n_function_code(video_id, player_url)
except ExtractorError as e:
Expand All @@ -1656,6 +1676,27 @@ def _decrypt_nsig(self, n, video_id, player_url):
self.write_debug('Decrypted nsig {0} => {1}'.format(n, ret))
return ret

def _call_n_function_with_webdriver(self, webdriver_type, jscode, n_param):
if self._webdriver_wrapper is None:
self._webdriver_wrapper = WebDriverJSWrapper(webdriver_type)
self._webdriver_wrapper.get('about:blank')
funcname = self._extract_n_function_name(jscode)
alphabet = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
dummyfunc = ''.join(random.choice(alphabet) for _ in range(8))
f = ('return ((e) => {{'
'const d = decodeURIComponent(e);'
'const p = d.lastIndexOf("}}");'
'const th = d.substring(0, p);'
'const bh = d.substring(p);'
'const m = "var {0};" + th + ";{0} = {1};" + bh;'
'const s = document.createElement("script");'
's.innerHTML = m;'
'document.body.append(s);'
'return {0}("{2}");'
'}})("{3}");').format(dummyfunc, funcname, n_param, compat_urllib_quote(jscode))
Comment on lines +1686 to +1696
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this run the entire js?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, when the JS is loaded, an anonymous function is called with _yt_player as an argument.
This is wasteful, but it is what the browser normally does when browsing YouTube.
Also, the JS that allows the desired function to be referenced from the outside is inserted at the time of this call, so there is no need for complicated parsing. :)

n = self._webdriver_wrapper.executeJS(f)
return n

def _extract_n_function_name(self, jscode):
func_name, idx = self._search_regex(
# new: (b=String.fromCharCode(110),c=a.get(b))&&c=nfunc[idx](c)
Expand Down Expand Up @@ -3809,3 +3850,75 @@ def _real_extract(self, url):
raise ExtractorError(
'Incomplete YouTube ID %s. URL %s looks truncated.' % (video_id, url),
expected=True)


class WebDriverJSWrapper(object):
    """Thin wrapper around a Selenium WebDriver session used to run JavaScript.

    The constructor imports ``selenium.webdriver`` lazily, checks the driver
    executable for the requested browser with check_executable(), and starts
    the browser (headless for Firefox, Chrome and Edge).  Problems detected by
    the wrapper itself are raised as ExtractorError.
    """

    def __init__(self, webdriver_type, pageload_timeout=10, script_timeout=5):
        """Start a browser session.

        webdriver_type   -- one of 'firefox', 'chrome', 'edge' or 'safari'
        pageload_timeout -- value passed to set_page_load_timeout()
        script_timeout   -- value passed to set_script_timeout()

        Raises ExtractorError if selenium.webdriver cannot be imported, the
        driver executable check fails, or webdriver_type is not supported.
        """
        # Set first so that __del__ is safe even if construction fails below
        self._webdriver = None
        try:
            wd = importlib.import_module('selenium.webdriver')
        except ImportError as e:
            self._raise_exception('Failed to import module "selenium.webdriver"', cause=e)

        # NB: headless mode is requested with the browser's command-line
        # switch rather than `options.headless = True`: that property was
        # deprecated in Selenium 4.8 and later removed, after which assigning
        # it is silently ignored and a visible browser window is opened.
        if webdriver_type == 'firefox':  # geckodriver
            self._require_driver('geckodriver')
            o = wd.FirefoxOptions()
            o.add_argument('-headless')
            s = wd.firefox.service.Service(log_path=os.path.devnull)
            self._webdriver = wd.Firefox(options=o, service=s)
        elif webdriver_type == 'chrome':  # chromedriver
            self._require_driver('chromedriver')
            o = wd.ChromeOptions()
            o.add_argument('--headless')
            # If you are using the snap version of the chromium, chromedriver
            # is included in the snap package. You should use that driver.
            #   $ cd /snap/bin && sudo ln -s -T chromium.chromedriver chromedriver
            # or
            #   s = wd.chrome.service.Service(executable_path='chromium.chromedriver')
            #   self._webdriver = wd.Chrome(options=o, service=s)
            self._webdriver = wd.Chrome(options=o)
        elif webdriver_type == 'edge':  # msedgedriver
            self._require_driver('msedgedriver')
            o = wd.EdgeOptions()
            o.add_argument('--headless')
            self._webdriver = wd.Edge(options=o)
        elif webdriver_type == 'safari':  # safaridriver
            self._require_driver('safaridriver')
            # safaridriver does not have headless-mode. :(
            # But macOS includes safaridriver by default.
            # To enable automation on safaridriver, run the following command
            # once from the admin terminal.
            #   # safaridriver --enable
            self._webdriver = wd.Safari()
        else:
            self._raise_exception('unsupported type: %s' % (webdriver_type))
        self._webdriver.set_page_load_timeout(pageload_timeout)
        self._webdriver.set_script_timeout(script_timeout)

    def __del__(self):
        # getattr(): the attribute is missing if __init__ never ran
        driver = getattr(self, '_webdriver', None)
        if driver is None:
            return
        self._webdriver = None
        try:
            driver.quit()
        except Exception:
            # Best effort only: an exception cannot usefully propagate out of
            # a finalizer, and the browser may already have gone away
            pass

    def _require_driver(self, executable):
        """Raise ExtractorError unless check_executable() accepts `executable`"""
        if not check_executable(executable, ['--version']):
            self._raise_exception('%s not found in PATH' % (executable))

    def _raise_exception(self, msg, cause=None):
        """Raise ExtractorError with `msg` prefixed by the wrapper's tag"""
        raise ExtractorError('[WebDriverJSWrapper] %s' % (msg), cause=cause)

    def get(self, url):
        """Loads a web page in the current browser session"""
        self._webdriver.get(url)

    def executeJS(self, jscode):
        """Execute JS and return value

        Any exception from the driver is re-raised as ExtractorError with the
        original exception as its cause.
        """
        try:
            return self._webdriver.execute_script(jscode)
        except Exception as e:
            self._raise_exception('Failed to execute JS', cause=e)
3 changes: 3 additions & 0 deletions youtube_dl/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,9 @@ def _comma_separated_values_options_callback(option, opt_str, value, parser):
'Upper bound of a range for randomized sleep before each download '
'(maximum possible number of seconds to sleep). Must only be used '
'along with --min-sleep-interval.'))
workarounds.add_option(
'--webdriver', metavar='TYPE', dest='webdriver', default=None,
help='Specify webdriver type when you want to use Selenium to execute YouTube\'s "n_function" in order to avoid throttling: "firefox", "chrome", "edge", or "safari"')

verbosity = optparse.OptionGroup(parser, 'Verbosity / Simulation Options')
verbosity.add_option(
Expand Down
Loading