mirror of https://github.com/yt-dlp/yt-dlp.git
5791 lines
187 KiB
Python
5791 lines
187 KiB
Python
import asyncio
|
||
import atexit
|
||
import base64
|
||
import binascii
|
||
import calendar
|
||
import codecs
|
||
import collections
|
||
import contextlib
|
||
import datetime
|
||
import email.header
|
||
import email.utils
|
||
import errno
|
||
import gzip
|
||
import hashlib
|
||
import hmac
|
||
import html.entities
|
||
import html.parser
|
||
import http.client
|
||
import http.cookiejar
|
||
import importlib.util
|
||
import inspect
|
||
import io
|
||
import itertools
|
||
import json
|
||
import locale
|
||
import math
|
||
import mimetypes
|
||
import operator
|
||
import os
|
||
import platform
|
||
import random
|
||
import re
|
||
import shlex
|
||
import socket
|
||
import ssl
|
||
import struct
|
||
import subprocess
|
||
import sys
|
||
import tempfile
|
||
import time
|
||
import traceback
|
||
import types
|
||
import unicodedata
|
||
import urllib.error
|
||
import urllib.parse
|
||
import urllib.request
|
||
import xml.etree.ElementTree
|
||
import zlib
|
||
|
||
from .compat import functools # isort: split
|
||
from .compat import (
|
||
compat_etree_fromstring,
|
||
compat_expanduser,
|
||
compat_HTMLParseError,
|
||
compat_os_name,
|
||
compat_shlex_quote,
|
||
)
|
||
from .dependencies import brotli, certifi, websockets, xattr
|
||
from .socks import ProxyType, sockssocket
|
||
|
||
|
||
def register_socks_protocols():
    """Make urllib.parse treat the SOCKS URL schemes as having a network location.

    Each of 'socks', 'socks4', 'socks4a' and 'socks5' is appended to
    urllib.parse.uses_netloc unless it is already listed there, so repeated
    calls do not add duplicates.
    """
    netloc_schemes = urllib.parse.uses_netloc
    missing = [
        scheme for scheme in ('socks', 'socks4', 'socks4a', 'socks5')
        if scheme not in netloc_schemes]
    netloc_schemes.extend(missing)
|
||
|
||
# This is not clearly defined otherwise
# (the type of the object returned by re.compile(), taken from an empty pattern)
compiled_regex_type = type(re.compile(''))
|
||
|
||
def random_user_agent():
    """Return a Chrome-on-Windows User-Agent string with a randomly picked Chrome version."""
    chrome_versions = (
        '90.0.4430.212',
        '90.0.4430.24',
        '90.0.4430.70',
        '90.0.4430.72',
        '90.0.4430.85',
        '90.0.4430.93',
        '91.0.4472.101',
        '91.0.4472.106',
        '91.0.4472.114',
        '91.0.4472.124',
        '91.0.4472.164',
        '91.0.4472.19',
        '91.0.4472.77',
        '92.0.4515.107',
        '92.0.4515.115',
        '92.0.4515.131',
        '92.0.4515.159',
        '92.0.4515.43',
        '93.0.4556.0',
        '93.0.4577.15',
        '93.0.4577.63',
        '93.0.4577.82',
        '94.0.4606.41',
        '94.0.4606.54',
        '94.0.4606.61',
        '94.0.4606.71',
        '94.0.4606.81',
        '94.0.4606.85',
        '95.0.4638.17',
        '95.0.4638.50',
        '95.0.4638.54',
        '95.0.4638.69',
        '95.0.4638.74',
        '96.0.4664.18',
        '96.0.4664.45',
        '96.0.4664.55',
        '96.0.4664.93',
        '97.0.4692.20',
    )
    template = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/%s Safari/537.36'
    version = random.choice(chrome_versions)
    return template % version
|
||
|
||
# Content-encoding names; 'br' is appended only when `brotli`
# (imported from .dependencies) is truthy
SUPPORTED_ENCODINGS = [
    'gzip', 'deflate'
]
if brotli:
    SUPPORTED_ENCODINGS.append('br')

# Default header values. random_user_agent() is called once, when this
# statement runs, so the User-Agent stays fixed afterwards
std_headers = {
    'User-Agent': random_user_agent(),
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    'Accept-Language': 'en-us,en;q=0.5',
    'Sec-Fetch-Mode': 'navigate',
}


# Fixed User-Agent strings, keyed by browser name
USER_AGENTS = {
    'Safari': 'Mozilla/5.0 (X11; Linux x86_64; rv:10.0) AppleWebKit/533.20.25 (KHTML, like Gecko) Version/5.0.4 Safari/533.20.27',
}


# Unique sentinel, compared by identity (`is NO_DEFAULT`), marking "no default supplied"
NO_DEFAULT = object()
# Function that returns its argument unchanged
IDENTITY = lambda x: x
|
||
# English month names in calendar order (index 0 is January)
ENGLISH_MONTH_NAMES = [
    'January', 'February', 'March', 'April', 'May', 'June',
    'July', 'August', 'September', 'October', 'November', 'December']

# Month names in calendar order, keyed by language code
MONTH_NAMES = {
    'en': ENGLISH_MONTH_NAMES,
    'fr': [
        'janvier', 'février', 'mars', 'avril', 'mai', 'juin',
        'juillet', 'août', 'septembre', 'octobre', 'novembre', 'décembre'],
}

# From https://github.com/python/cpython/blob/3.11/Lib/email/_parseaddr.py#L36-L42
# Maps a timezone abbreviation to its offset from UTC in whole hours
TIMEZONE_NAMES = {
    'UT': 0, 'UTC': 0, 'GMT': 0, 'Z': 0,
    'AST': -4, 'ADT': -3,  # Atlantic (used in Canada)
    'EST': -5, 'EDT': -4,  # Eastern
    'CST': -6, 'CDT': -5,  # Central
    'MST': -7, 'MDT': -6,  # Mountain
    'PST': -8, 'PDT': -7   # Pacific
}

# needed for sanitizing filenames in restricted mode
# Maps each accented character to an ASCII replacement; the list items in the
# chain ('AE', 'OE', 'TH', 'ss', ...) are the replacements longer than one character
ACCENT_CHARS = dict(zip('ÂÃÄÀÁÅÆÇÈÉÊËÌÍÎÏÐÑÒÓÔÕÖŐØŒÙÚÛÜŰÝÞßàáâãäåæçèéêëìíîïðñòóôõöőøœùúûüűýþÿ',
                        itertools.chain('AAAAAA', ['AE'], 'CEEEEIIIIDNOOOOOOO', ['OE'], 'UUUUUY', ['TH', 'ss'],
                                        'aaaaaa', ['ae'], 'ceeeeiiiionooooooo', ['oe'], 'uuuuuy', ['th'], 'y')))
|
||
# strptime-style format strings shared by both derived lists below
DATE_FORMATS = (
    '%d %B %Y',
    '%d %b %Y',
    '%B %d %Y',
    '%B %dst %Y',
    '%B %dnd %Y',
    '%B %drd %Y',
    '%B %dth %Y',
    '%b %d %Y',
    '%b %dst %Y',
    '%b %dnd %Y',
    '%b %drd %Y',
    '%b %dth %Y',
    '%b %dst %Y %I:%M',
    '%b %dnd %Y %I:%M',
    '%b %drd %Y %I:%M',
    '%b %dth %Y %I:%M',
    '%Y %m %d',
    '%Y-%m-%d',
    '%Y.%m.%d.',
    '%Y/%m/%d',
    '%Y/%m/%d %H:%M',
    '%Y/%m/%d %H:%M:%S',
    '%Y%m%d%H%M',
    '%Y%m%d%H%M%S',
    '%Y%m%d',
    '%Y-%m-%d %H:%M',
    '%Y-%m-%d %H:%M:%S',
    '%Y-%m-%d %H:%M:%S.%f',
    '%Y-%m-%d %H:%M:%S:%f',
    '%d.%m.%Y %H:%M',
    '%d.%m.%Y %H.%M',
    '%Y-%m-%dT%H:%M:%SZ',
    '%Y-%m-%dT%H:%M:%S.%fZ',
    '%Y-%m-%dT%H:%M:%S.%f0Z',
    '%Y-%m-%dT%H:%M:%S',
    '%Y-%m-%dT%H:%M:%S.%f',
    '%Y-%m-%dT%H:%M',
    '%b %d %Y at %H:%M',
    '%b %d %Y at %H:%M:%S',
    '%B %d %Y at %H:%M',
    '%B %d %Y at %H:%M:%S',
    '%H:%M %d-%b-%Y',
)

# Common formats plus the numeric formats that put the day before the month
DATE_FORMATS_DAY_FIRST = list(DATE_FORMATS)
DATE_FORMATS_DAY_FIRST.extend([
    '%d-%m-%Y',
    '%d.%m.%Y',
    '%d.%m.%y',
    '%d/%m/%Y',
    '%d/%m/%y',
    '%d/%m/%Y %H:%M:%S',
    '%d-%m-%Y %H:%M',
])

# Common formats plus the numeric formats that put the month before the day
DATE_FORMATS_MONTH_FIRST = list(DATE_FORMATS)
DATE_FORMATS_MONTH_FIRST.extend([
    '%m-%d-%Y',
    '%m.%d.%Y',
    '%m/%d/%Y',
    '%m/%d/%y',
    '%m/%d/%Y %H:%M:%S',
])
|
||
# Pattern with four groups: a quoted payload, two integers and a quoted string
# that is followed by `.split('|')`
PACKED_CODES_RE = r"}\('(.+)',(\d+),(\d+),'([^']+)'\.split\('\|'\)"
# Pattern for a <script type="application/ld+json"> element; the JSON object is
# captured in the named group `json_ld`
JSON_LD_RE = r'(?is)<script[^>]+type=(["\']?)application/ld\+json\1[^>]*>\s*(?P<json_ld>{.+?})\s*</script>'

# Pattern for an unsigned decimal number with an optional fractional part
NUMBER_RE = r'\d+(?:\.\d+)?'
|
||
|
||
@functools.cache
def preferredencoding():
    """Get preferred encoding.

    Returns locale.getpreferredencoding() when that lookup succeeds and the
    result can actually encode text; otherwise falls back to 'UTF-8'.
    The result is cached after the first call.
    """
    try:
        encoding = locale.getpreferredencoding()
        # Probe the encoding: an unknown or unusable codec raises here
        'TEST'.encode(encoding)
    except Exception:
        return 'UTF-8'
    return encoding
|
||
|
||
def write_json_file(obj, fn):
    """ Encode obj as JSON and write it to fn, atomically if possible """

    # The data is first written to a sibling temporary file, which is then
    # renamed over the destination
    tmp_file = tempfile.NamedTemporaryFile(
        mode='w', encoding='utf-8', delete=False,
        dir=os.path.dirname(fn), prefix=f'{os.path.basename(fn)}.', suffix='.tmp')
    tmp_path = tmp_file.name

    try:
        with tmp_file:
            json.dump(obj, tmp_file, ensure_ascii=False)

        if sys.platform == 'win32':
            # Need to remove existing file on Windows, else os.rename raises
            # WindowsError or FileExistsError.
            with contextlib.suppress(OSError):
                os.unlink(fn)

        with contextlib.suppress(OSError):
            # os.umask() can only be read by setting it, so set and restore
            current_mask = os.umask(0)
            os.umask(current_mask)
            os.chmod(tmp_path, 0o666 & ~current_mask)

        os.rename(tmp_path, fn)
    except Exception:
        # Best-effort cleanup of the temporary file before propagating
        with contextlib.suppress(OSError):
            os.remove(tmp_path)
        raise
|
||
|
||
def find_xpath_attr(node, xpath, key, val=None):
    """ Find the xpath xpath[@key=val] """
    assert re.match(r'^[a-zA-Z_-]+$', key)
    if val is None:
        # Only require the attribute to be present
        predicate = '[@%s]' % key
    else:
        predicate = f"[@{key}='{val}']"
    return node.find(xpath + predicate)
|
||
# On python2.6 the xml.etree.ElementTree.Element methods don't support
|
||
# the namespace parameter
|
||
|
||
|
||
def xpath_with_ns(path, ns_map):
    """Expand 'prefix:tag' components of a '/'-separated path to '{namespace}tag'.

    Components without a ':' are kept unchanged. The namespace for each prefix
    is looked up in ns_map.
    """
    def _expand(component):
        pieces = component.split(':')
        if len(pieces) == 1:
            return pieces[0]
        prefix, tag = pieces
        return '{%s}%s' % (ns_map[prefix], tag)

    return '/'.join(_expand(component) for component in path.split('/'))
|
||
|
||
def xpath_element(node, xpath, name=None, fatal=False, default=NO_DEFAULT):
    """Return the first element of node matching xpath.

    @param xpath    A single xpath string, or an iterable of xpath strings that
                    are tried in order until one matches
    @param name     Name used in the error message instead of the xpath
    @param fatal    Raise ExtractorError when nothing matches and no default is given
    @param default  Value returned when nothing matches; takes precedence over fatal
    @returns        The matching element, else default, else None
    """
    candidates = [xpath] if isinstance(xpath, str) else xpath

    # Start from None so that an empty iterable of xpaths is treated as
    # "not found" instead of leaving the result unbound
    n = None
    for xp in candidates:
        n = node.find(xp)
        if n is not None:
            return n

    if default is not NO_DEFAULT:
        return default
    if fatal:
        name = xpath if name is None else name
        # f-string instead of %-formatting: a tuple of xpaths would otherwise be
        # unpacked as format arguments and raise TypeError
        raise ExtractorError(f'Could not find XML element {name}')
    return None
|
||
|
||
def xpath_text(node, xpath, name=None, fatal=False, default=NO_DEFAULT):
    """Return the text of the first element matching xpath.

    Falls back to default (if given) when the element or its text is missing;
    otherwise raises ExtractorError if fatal is set, else returns None.
    """
    element = xpath_element(node, xpath, name, fatal=fatal, default=default)
    if element is None or element == default:
        return element

    text = element.text
    if text is not None:
        return text
    if default is not NO_DEFAULT:
        return default
    if fatal:
        name = xpath if name is None else name
        raise ExtractorError('Could not find XML element\'s text %s' % name)
    return None
|
||
|
||
def xpath_attr(node, xpath, key, name=None, fatal=False, default=NO_DEFAULT):
    """Return attribute key of the first element matching xpath that has that attribute.

    Falls back to default (if given) when no such element exists; otherwise
    raises ExtractorError if fatal is set, else returns None.
    """
    element = find_xpath_attr(node, xpath, key)
    if element is not None:
        return element.attrib[key]
    if default is not NO_DEFAULT:
        return default
    if fatal:
        name = f'{xpath}[@{key}]' if name is None else name
        raise ExtractorError('Could not find XML attribute %s' % name)
    return None
|
||
|
||
def get_element_by_id(id, html, **kwargs):
    """Return the content of the tag with the specified ID in the passed HTML document"""
    content = get_element_by_attribute('id', id, html, **kwargs)
    return content
|
||
|
||
def get_element_html_by_id(id, html, **kwargs):
    """Return the html of the tag with the specified ID in the passed HTML document"""
    element_html = get_element_html_by_attribute('id', id, html, **kwargs)
    return element_html
|
||
|
||
def get_element_by_class(class_name, html):
    """Return the content of the first tag with the specified class in the passed HTML document"""
    matches = get_elements_by_class(class_name, html)
    if not matches:
        return None
    return matches[0]
|
||
|
||
def get_element_html_by_class(class_name, html):
    """Return the html of the first tag with the specified class in the passed HTML document"""
    matches = get_elements_html_by_class(class_name, html)
    if not matches:
        return None
    return matches[0]
|
||
|
||
def get_element_by_attribute(attribute, value, html, **kwargs):
    """Return the content of the first tag with the specified attribute value, or None"""
    matches = get_elements_by_attribute(attribute, value, html, **kwargs)
    if not matches:
        return None
    return matches[0]
|
||
|
||
def get_element_html_by_attribute(attribute, value, html, **kargs):
    """Return the html of the first tag with the specified attribute value, or None"""
    matches = get_elements_html_by_attribute(attribute, value, html, **kargs)
    if not matches:
        return None
    return matches[0]
|
||
|
||
def get_elements_by_class(class_name, html, **kargs):
    """Return the content of all tags with the specified class in the passed HTML document as a list"""
    # The class attribute may hold several names, so build a pattern that
    # matches class_name as one whole entry. Note: **kargs is accepted but not used
    class_value_re = r'[^\'"]*(?<=[\'"\s])%s(?=[\'"\s])[^\'"]*' % re.escape(class_name)
    return get_elements_by_attribute('class', class_value_re, html, escape_value=False)
|
||
|
||
def get_elements_html_by_class(class_name, html):
    """Return the html of all tags with the specified class in the passed HTML document as a list"""
    # The class attribute may hold several names, so build a pattern that
    # matches class_name as one whole entry
    class_value_re = r'[^\'"]*(?<=[\'"\s])%s(?=[\'"\s])[^\'"]*' % re.escape(class_name)
    return get_elements_html_by_attribute('class', class_value_re, html, escape_value=False)
|
||
|
||
def get_elements_by_attribute(*args, **kwargs):
    """Return a list with the content of every tag with the specified attribute in the passed HTML document"""
    contents = []
    for content, _ in get_elements_text_and_html_by_attribute(*args, **kwargs):
        contents.append(content)
    return contents
|
||
|
||
def get_elements_html_by_attribute(*args, **kwargs):
    """Return a list with the html of every tag with the specified attribute in the passed HTML document"""
    elements = []
    for _, whole in get_elements_text_and_html_by_attribute(*args, **kwargs):
        elements.append(whole)
    return elements
|
||
|
||
def get_elements_text_and_html_by_attribute(attribute, value, html, escape_value=True):
    """
    Return the text (content) and the html (whole) of the tag with the specified
    attribute in the passed HTML document

    This is a generator: it yields one (content, whole) tuple per matching tag.
    When escape_value is false, value is inserted into the pattern as a regex.
    """

    # Quotes around the attribute value become mandatory ('' instead of '?')
    # when value starts with a character from this class; re.match only
    # inspects the beginning of value
    quote = '' if re.match(r'''[\s"'`=<>]''', value) else '?'

    value = re.escape(value) if escape_value else value

    # Matches an opening tag up to and including the wanted attribute;
    # (?-x:...) switches verbose mode off for the inserted value
    partial_element_re = rf'''(?x)
        <(?P<tag>[a-zA-Z0-9:._-]+)
         (?:\s(?:[^>"']|"[^"]*"|'[^']*')*)?
         \s{re.escape(attribute)}\s*=\s*(?P<_q>['"]{quote})(?-x:{value})(?P=_q)
    '''

    for m in re.finditer(partial_element_re, html):
        # Extract the complete element, starting at the matched opening tag
        content, whole = get_element_text_and_html_by_tag(m.group('tag'), html[m.start():])

        yield (
            # Strip one pair of matching quotes that encloses the whole content, then unescape
            unescapeHTML(re.sub(r'^(?P<q>["\'])(?P<content>.*)(?P=q)$', r'\g<content>', content, flags=re.DOTALL)),
            whole
        )
|
||
|
||
class HTMLBreakOnClosingTagParser(html.parser.HTMLParser):
    """
    HTML parser which raises HTMLBreakOnClosingTagException upon reaching the
    closing tag for the first opening tag it has encountered, and can be used
    as a context manager
    """

    class HTMLBreakOnClosingTagException(Exception):
        pass

    def __init__(self):
        # Stack of the currently open tags, innermost last
        self.tagstack = collections.deque()
        super().__init__()

    def __enter__(self):
        return self

    def __exit__(self, *_):
        self.close()

    def close(self):
        # handle_endtag does not return upon raising HTMLBreakOnClosingTagException,
        # so data remains buffered; we no longer have any interest in it, thus
        # override this method to discard it
        pass

    def handle_starttag(self, tag, _):
        self.tagstack.append(tag)

    def handle_endtag(self, tag):
        open_tags = self.tagstack
        if not open_tags:
            raise compat_HTMLParseError('no tags in the stack')

        # Unwind the stack until the matching opening tag has been popped
        matched = False
        while open_tags and not matched:
            matched = open_tags.pop() == tag
        if not matched:
            raise compat_HTMLParseError(f'matching opening tag for closing {tag} tag not found')

        # An empty stack means the first opened tag has just been closed
        if not open_tags:
            raise self.HTMLBreakOnClosingTagException()
|
||
|
||
def get_element_text_and_html_by_tag(tag, html):
    """
    For the first element with the specified tag in the passed HTML document
    return its content (text) and the whole element (html)

    Raises compat_HTMLParseError when the opening tag, the end of the opening
    tag or a matching closing tag cannot be found.
    """
    def find_or_raise(haystack, needle, exc, start=0):
        # Search from `start` within the original string instead of slicing it,
        # so the remainder of the document is not copied for every lookup
        try:
            return haystack.index(needle, start)
        except ValueError:
            raise exc

    closing_tag = f'</{tag}>'
    whole_start = find_or_raise(
        html, f'<{tag}', compat_HTMLParseError(f'opening {tag} tag not found'))
    # Index just past the '>' that ends the opening tag
    content_start = 1 + find_or_raise(
        html, '>', compat_HTMLParseError(f'malformed opening {tag} tag'), whole_start)

    with HTMLBreakOnClosingTagParser() as parser:
        parser.feed(html[whole_start:content_start])
        if not parser.tagstack or parser.tagstack[0] != tag:
            raise compat_HTMLParseError(f'parser did not match opening {tag} tag')

        # Feed the document one closing-tag candidate at a time; the parser
        # raises once the candidate closes the element that was opened first
        offset = content_start
        while offset < len(html):
            closing_start = find_or_raise(
                html, closing_tag,
                compat_HTMLParseError(f'closing {tag} tag not found'), offset)
            closing_end = closing_start + len(closing_tag)
            try:
                parser.feed(html[offset:closing_end])
                offset = closing_end
            except HTMLBreakOnClosingTagParser.HTMLBreakOnClosingTagException:
                return html[content_start:closing_start], html[whole_start:closing_end]
        raise compat_HTMLParseError('unexpected end of html')
|
||
|
||
class HTMLAttributeParser(html.parser.HTMLParser):
    """Trivial HTML parser to gather the attributes for a single element"""

    def __init__(self):
        # Stays empty until a start tag has been seen
        self.attrs = {}
        super().__init__()

    def handle_starttag(self, tag, attrs):
        # Each start tag replaces the previous result
        self.attrs = {attr_name: attr_value for attr_name, attr_value in attrs}
|
||
|
||
class HTMLListAttrsParser(html.parser.HTMLParser):
    """HTML parser to gather the attributes for the elements of a list"""

    def __init__(self):
        super().__init__()
        self.items = []
        # Nesting depth: number of start tags seen minus number of end tags seen
        self._level = 0

    def handle_starttag(self, tag, attrs):
        is_top_level_item = self._level == 0 and tag == 'li'
        if is_top_level_item:
            self.items.append(dict(attrs))
        self._level += 1

    def handle_endtag(self, tag):
        self._level -= 1
|
||
|
||
def extract_attributes(html_element):
    """Given a string for an HTML element such as
    <el
         a="foo" B="bar" c="&#98;az" d=boz
         empty= noval entity="&amp;"
         sq='"' dq="'"
    >
    Decode and return a dictionary of attributes.
    {
        'a': 'foo', 'b': 'bar', c: 'baz', d: 'boz',
        'empty': '', 'noval': None, 'entity': '&',
        'sq': '"', 'dq': '\''
    }.
    """
    attr_parser = HTMLAttributeParser()
    # A parse error is swallowed; whatever was gathered before it is returned
    with contextlib.suppress(compat_HTMLParseError):
        attr_parser.feed(html_element)
        attr_parser.close()
    return attr_parser.attrs
|
||
|
||
def parse_list(webpage):
    """Given a string for a series of HTML <li> elements,
    return a list with one attribute dictionary per top-level <li>"""
    list_parser = HTMLListAttrsParser()
    list_parser.feed(webpage)
    list_parser.close()
    return list_parser.items
|
||
|
||
def clean_html(html):
    """Clean an HTML snippet into a readable string"""

    if html is None:  # Convenience for sanitizing descriptions etc.
        return html

    substitutions = (
        # Collapse every run of whitespace into a single space
        (r'\s+', ' '),
        # <br> becomes a newline
        (r'(?u)\s?<\s?br\s?/?\s?>\s?', '\n'),
        # </p><p> becomes a newline
        (r'(?u)<\s?/\s?p\s?>\s?<\s?p[^>]*>', '\n'),
        # Strip html tags
        ('<.*?>', ''),
    )
    for pattern, replacement in substitutions:
        html = re.sub(pattern, replacement, html)

    # Replace html entities
    return unescapeHTML(html).strip()
|
||
|
||
class LenientJSONDecoder(json.JSONDecoder):
    """JSON decoder that can pre-process its input and tolerate trailing data.

    @param transform_source  Optional callable applied to the input before decoding
    @param ignore_extra      If true, decode only the first JSON value and ignore
                             anything that follows it
    """

    def __init__(self, *args, transform_source=None, ignore_extra=False, **kwargs):
        self.transform_source = transform_source
        self.ignore_extra = ignore_extra
        super().__init__(*args, **kwargs)

    def decode(self, s):
        if self.transform_source:
            s = self.transform_source(s)
        if not self.ignore_extra:
            return super().decode(s)
        # raw_decode() does not skip leading whitespace by itself
        value, _ = self.raw_decode(s.lstrip())
        return value
|
||
|
||
def sanitize_open(filename, open_mode):
    """Try to open the given filename, and slightly tweak it if this fails.

    Attempts to open the given filename. If this fails, it tries to change
    the filename slightly, step by step, until it's either able to open it
    or it fails and raises a final exception, like the standard open()
    function.

    It returns the tuple (stream, definitive_file_name).
    """
    # '-' selects standard output instead of a file on disk
    if filename == '-':
        if sys.platform == 'win32':
            import msvcrt

            # stdout may be any IO stream, e.g. when using contextlib.redirect_stdout
            with contextlib.suppress(io.UnsupportedOperation):
                msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
        return (sys.stdout.buffer if hasattr(sys.stdout, 'buffer') else sys.stdout, filename)

    # At most two attempts: the given name first, then the sanitized name
    for attempt in range(2):
        try:
            try:
                if sys.platform == 'win32':
                    # FIXME: An exclusive lock also locks the file from being read.
                    # Since windows locks are mandatory, don't lock the file on windows (for now).
                    # Ref: https://github.com/yt-dlp/yt-dlp/issues/3124
                    # NOTE(review): presumably LockingUnsupportedError is an OSError
                    # subclass so that the fallback below handles it - confirm
                    raise LockingUnsupportedError()
                stream = locked_file(filename, open_mode, block=False).__enter__()
            except OSError:
                # Fall back to opening the file without a lock
                stream = open(filename, open_mode)
            return stream, filename
        except OSError as err:
            # Give up on the second attempt or on a permission error
            if attempt or err.errno in (errno.EACCES,):
                raise
            old_filename, filename = filename, sanitize_path(filename)
            # Sanitizing changed nothing, so retrying cannot help
            if old_filename == filename:
                raise
|
||
|
||
def timeconvert(timestr):
    """Convert RFC 2822 defined time string into system timestamp"""
    parsed = email.utils.parsedate_tz(timestr)
    if parsed is None:
        # The string could not be parsed as a date
        return None
    return email.utils.mktime_tz(parsed)
|
||
|
||
def sanitize_filename(s, restricted=False, is_id=NO_DEFAULT):
    """Sanitizes a string so it could be used as part of a filename.
    @param restricted   Use a stricter subset of allowed characters
    @param is_id        Whether this is an ID that should be kept unchanged if possible.
                        If unset, yt-dlp's new sanitization rules are in effect
    """
    if s == '':
        return ''

    def replace_insane(char):
        # Returns the replacement for one character. A replacement prefixed
        # with '\0' is a substitute character; the prefix is used by the
        # clean-up steps below and removed afterwards
        if restricted and char in ACCENT_CHARS:
            return ACCENT_CHARS[char]
        elif not restricted and char == '\n':
            return '\0 '
        elif is_id is NO_DEFAULT and not restricted and char in '"*:<>?|/\\':
            # Replace with their full-width unicode counterparts
            return {'/': '\u29F8', '\\': '\u29f9'}.get(char, chr(ord(char) + 0xfee0))
        elif char == '?' or ord(char) < 32 or ord(char) == 127:
            return ''
        elif char == '"':
            return '' if restricted else '\''
        elif char == ':':
            return '\0_\0-' if restricted else '\0 \0-'
        elif char in '\\/|*<>':
            return '\0_'
        if restricted and (char in '!&\'()[]{}$;`^,#' or char.isspace() or ord(char) > 127):
            return '\0_'
        return char

    if restricted and is_id is NO_DEFAULT:
        s = unicodedata.normalize('NFKC', s)
    s = re.sub(r'[0-9]+(?::[0-9]+)+', lambda m: m.group(0).replace(':', '_'), s)  # Handle timestamps
    result = ''.join(map(replace_insane, s))
    if is_id is NO_DEFAULT:
        result = re.sub(r'(\0.)(?:(?=\1)..)+', r'\1', result)  # Remove repeated substitute chars
        STRIP_RE = r'(?:\0.|[ _-])*'
        result = re.sub(f'^\0.{STRIP_RE}|{STRIP_RE}\0.$', '', result)  # Remove substitute chars from start/end
    # Drop the remaining '\0' markers; an empty result becomes '_'
    result = result.replace('\0', '') or '_'

    # NO_DEFAULT is a plain object and therefore truthy, so this branch only
    # runs when is_id was passed explicitly as a false value
    if not is_id:
        while '__' in result:
            result = result.replace('__', '_')
        result = result.strip('_')
        # Common case of "Foreign band name - English song title"
        if restricted and result.startswith('-_'):
            result = result[2:]
        if result.startswith('-'):
            result = '_' + result[len('-'):]
        result = result.lstrip('.')
        if not result:
            result = '_'
    return result
|
||
|
||
def sanitize_path(s, force=False):
    """Sanitizes and normalizes path on Windows"""
    on_windows = sys.platform == 'win32'
    if not on_windows and not force:
        # Nothing to do on other platforms unless explicitly forced
        return s

    if on_windows:
        force = False
        drive_or_unc, _ = os.path.splitdrive(s)
    else:
        drive_or_unc = ''

    parts = os.path.normpath(remove_start(s, drive_or_unc)).split(os.path.sep)
    if drive_or_unc:
        parts.pop(0)

    def _clean(part):
        # '.' and '..' are kept; otherwise replace the listed characters and a
        # trailing whitespace or dot with '#'
        if part in ['.', '..']:
            return part
        return re.sub(r'(?:[/<>:"\|\\?\*]|[\s.]$)', '#', part)

    sanitized_path = [_clean(part) for part in parts]

    if drive_or_unc:
        sanitized_path.insert(0, drive_or_unc + os.path.sep)
    elif force and s and s[0] == os.path.sep:
        sanitized_path.insert(0, os.path.sep)
    return os.path.join(*sanitized_path)
|
||
|
||
def sanitize_url(url, *, scheme='http'):
    """Add a scheme to protocol-relative URLs and fix some common scheme typos.

    Returns None when url is None.
    """
    if url is None:
        return None
    # Prepend protocol-less URLs with `http:` scheme in order to mitigate
    # the number of unwanted failures due to missing protocol
    if url.startswith('//'):
        return f'{scheme}:{url}'

    # Fix some common typos seen so far
    COMMON_TYPOS = (
        # https://github.com/ytdl-org/youtube-dl/issues/15649
        (r'^httpss://', r'https://'),
        # https://bx1.be/lives/direct-tv/
        (r'^rmtp([es]?)://', r'rtmp\1://'),
    )
    for mistake, fixup in COMMON_TYPOS:
        # The patterns are anchored at the start, so at most one replacement happens
        fixed, replacements = re.subn(mistake, fixup, url)
        if replacements:
            return fixed
    return url
|
||
|
||
def extract_basic_auth(url):
    """Split embedded credentials off a URL.

    Returns (url, None) when the URL has no username. Otherwise returns the URL
    with the credentials removed from its netloc, and the matching
    'Basic ...' Authorization header value.
    """
    parts = urllib.parse.urlsplit(url)
    username = parts.username
    if username is None:
        return url, None

    if parts.port is None:
        netloc = parts.hostname
    else:
        netloc = f'{parts.hostname}:{parts.port:d}'
    stripped_url = urllib.parse.urlunsplit(parts._replace(netloc=netloc))

    # A missing password is sent as an empty string
    credentials = f"{username}:{parts.password or ''}"
    token = base64.b64encode(credentials.encode()).decode()
    return stripped_url, f'Basic {token}'
|
||
|
||
def sanitized_Request(url, *args, **kwargs):
    """Build a urllib.request.Request from a sanitized and escaped URL.

    Credentials embedded in the URL are removed from it and set as the
    Authorization header of the request instead.
    """
    clean_url, auth_header = extract_basic_auth(escape_url(sanitize_url(url)))
    if auth_header is not None:
        if len(args) >= 2:
            # Headers were passed positionally (second argument after the url)
            headers = args[1]
        else:
            headers = kwargs.setdefault('headers', {})
        headers['Authorization'] = auth_header
    return urllib.request.Request(clean_url, *args, **kwargs)
|
||
|
||
def expand_path(s):
    """Expand shell variables and ~"""
    with_home = compat_expanduser(s)
    return os.path.expandvars(with_home)
|
||
|
||
def orderedSet(iterable, *, lazy=False):
    """Remove all duplicates from the input iterable

    The order of first occurrence is kept. Returns a list, or a generator when lazy is set.
    """
    def _unique():
        # Do not use set since the items can be unhashable
        emitted = []
        for item in iterable:
            if item in emitted:
                continue
            emitted.append(item)
            yield item

    unique_items = _unique()
    if lazy:
        return unique_items
    return list(unique_items)
|
||
|
||
def _htmlentity_transform(entity_with_semicolon):
    """Transforms an HTML entity to a character.

    @param entity_with_semicolon  The entity without the leading '&' but
                                  including the trailing ';', e.g. 'amp;' or '#x2F;'
    @returns  The decoded character(s), or the literal '&...;' text when the
              entity is unknown or its number is not a valid code point
    """
    entity = entity_with_semicolon[:-1]

    # Known non-numeric HTML entity
    if entity in html.entities.name2codepoint:
        return chr(html.entities.name2codepoint[entity])

    # TODO: HTML5 allows entities without a semicolon.
    # E.g. '&Eacuteric' should be decoded as 'Éric'.
    if entity_with_semicolon in html.entities.html5:
        return html.entities.html5[entity_with_semicolon]

    mobj = re.match(r'#(x[0-9a-fA-F]+|[0-9]+)', entity)
    if mobj is not None:
        numstr = mobj.group(1)
        if numstr.startswith('x'):
            base = 16
            numstr = '0%s' % numstr
        else:
            base = 10
        # See https://github.com/ytdl-org/youtube-dl/issues/7518
        # chr() raises ValueError for out-of-range code points and
        # OverflowError for numbers too large for a C int; both fall through
        # to the literal representation below
        with contextlib.suppress(ValueError, OverflowError):
            return chr(int(numstr, base))

    # Unknown entity in name, return its literal representation
    return '&%s;' % entity
|
||
|
||
def unescapeHTML(s):
|
||
if s is None:
|
||
return None
|
||
assert isinstance(s, str)
|
||
|
||
return re.sub(
|
||
r'&([^&;]+;)', lambda m: _htmlentity_transform(m.group(1)), s)
|
||
|
||
|
||
def escapeHTML(text):
    """Escape the characters & < > " ' in text as HTML entities.

    '&' is replaced first so that the ampersands introduced by the other
    replacements are not escaped a second time.
    """
    return (
        text
        .replace('&', '&amp;')
        .replace('<', '&lt;')
        .replace('>', '&gt;')
        .replace('"', '&quot;')
        .replace("'", '&#39;')
    )
|
||
|
||
def process_communicate_or_kill(p, *args, **kwargs):
    """Deprecated wrapper: writes a deprecation notice, then calls Popen.communicate_or_kill on p."""
    deprecation_notice = (
        'DeprecationWarning: yt_dlp.utils.process_communicate_or_kill is deprecated '
        'and may be removed in a future version. Use yt_dlp.utils.Popen.communicate_or_kill instead')
    write_string(deprecation_notice)
    return Popen.communicate_or_kill(p, *args, **kwargs)
|
||
|
||
class Popen(subprocess.Popen):
    """subprocess.Popen with text-mode defaults and kill-on-error helpers."""

    # On Windows, every process is started with STARTF_USESHOWWINDOW set in its startup info
    _startupinfo = None
    if sys.platform == 'win32':
        _startupinfo = subprocess.STARTUPINFO()
        _startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW

    def __init__(self, *args, text=False, **kwargs):
        if text is True:
            kwargs['universal_newlines'] = True  # For 3.6 compatibility
            # Only fill in encoding/errors when the caller did not supply them
            for option, fallback in (('encoding', 'utf-8'), ('errors', 'replace')):
                kwargs.setdefault(option, fallback)
        super().__init__(*args, **kwargs, startupinfo=self._startupinfo)

    def communicate_or_kill(self, *args, **kwargs):
        """communicate(), but kill the process and wait for it if anything is raised"""
        try:
            return self.communicate(*args, **kwargs)
        except BaseException:  # Including KeyboardInterrupt
            self.kill(timeout=None)
            raise

    def kill(self, *, timeout=0):
        """Kill the process; unless timeout is 0, also wait for it with that timeout"""
        super().kill()
        if timeout == 0:
            return
        self.wait(timeout=timeout)

    @classmethod
    def run(cls, *args, **kwargs):
        """Run a process to completion and return (stdout, stderr, returncode).

        Output that was not captured is returned as an empty string.
        """
        with cls(*args, **kwargs) as proc:
            out, err = proc.communicate_or_kill()
            return out or '', err or '', proc.returncode
|
||
|
||
def get_subprocess_encoding():
    """Return the name of the encoding to use for subprocess calls."""
    # For subprocess calls, encode with locale encoding
    # Refer to http://stackoverflow.com/a/9951851/35070
    use_locale_encoding = sys.platform == 'win32' and sys.getwindowsversion()[0] >= 5
    if use_locale_encoding:
        encoding = preferredencoding()
    else:
        encoding = sys.getfilesystemencoding()
    return 'utf-8' if encoding is None else encoding
|
||
|
||
def encodeFilename(s, for_subprocess=False):
    """Return s unchanged after asserting that it is a str; for_subprocess is not used."""
    assert isinstance(s, str)
    filename = s
    return filename
|
||
|
||
def decodeFilename(b, for_subprocess=False):
    """Return b unchanged; for_subprocess is not used."""
    filename = b
    return filename
|
||
|
||
def encodeArgument(s):
    """Return s as a str; anything that is not a str is decoded as ASCII."""
    # Legacy code that uses byte strings
    # Uncomment the following line after fixing all post processors
    # assert isinstance(s, str), 'Internal error: %r should be of type %r, is %r' % (s, str, type(s))
    if isinstance(s, str):
        return s
    return s.decode('ascii')
|
||
|
||
def decodeArgument(b):
    """Return b unchanged."""
    argument = b
    return argument
|
||
|
||
def decodeOption(optval):
    """Return optval as a str, decoding bytes with the preferred encoding; None is passed through."""
    if optval is None:
        return optval

    decoded = optval
    if isinstance(decoded, bytes):
        decoded = decoded.decode(preferredencoding())

    assert isinstance(decoded, str)
    return decoded
|
||
|
||
_timetuple = collections.namedtuple('Time', ('hours', 'minutes', 'seconds', 'milliseconds'))


def timetuple_from_msec(msec):
    """Split a duration in milliseconds into a (hours, minutes, seconds, milliseconds) named tuple."""
    total_seconds, milliseconds = divmod(msec, 1000)
    total_minutes, seconds = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    return _timetuple(hours, minutes, seconds, milliseconds)
|
||
|
||
def formatSeconds(secs, delim=':', msec=False):
    """Format a duration in seconds as [H:][M:]S, without leading zero-valued parts.

    @param delim  Separator placed between hours, minutes and seconds
    @param msec   Append the milliseconds as '.mmm'
    """
    parts = timetuple_from_msec(secs * 1000)
    if parts.hours:
        text = '%d%s%02d%s%02d' % (parts.hours, delim, parts.minutes, delim, parts.seconds)
    elif parts.minutes:
        text = '%d%s%02d' % (parts.minutes, delim, parts.seconds)
    else:
        text = '%d' % parts.seconds

    if msec:
        return '%s.%03d' % (text, parts.milliseconds)
    return text
|
||
|
||
def _ssl_load_windows_store_certs(ssl_context, storename):
    """Load the usable certificates of a Windows certificate store into ssl_context.

    Returns without loading anything when enumerating the store raises
    PermissionError. A certificate that fails to load is skipped.
    """
    # Code adapted from _load_windows_store_certs in https://github.com/python/cpython/blob/main/Lib/ssl.py
    try:
        usable_certs = []
        for cert, encoding, trust in ssl.enum_certificates(storename):
            if encoding != 'x509_asn':
                continue
            if trust is True or ssl.Purpose.SERVER_AUTH.oid in trust:
                usable_certs.append(cert)
    except PermissionError:
        return

    for cert in usable_certs:
        with contextlib.suppress(ssl.SSLError):
            ssl_context.load_verify_locations(cadata=cert)
|
||
|
||
def make_HTTPS_handler(params, **kwargs):
    """Build a YoutubeDLHTTPSHandler whose SSL context is configured from params.

    Keys read from params: 'nocheckcertificate', 'legacyserverconnect',
    'compat_opts', 'client_certificate', 'client_certificate_key' and
    'client_certificate_password'. Extra keyword arguments are passed on to
    YoutubeDLHTTPSHandler.

    Raises YoutubeDLError if the client certificate cannot be loaded.
    """
    opts_check_certificate = not params.get('nocheckcertificate')
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.check_hostname = opts_check_certificate
    if params.get('legacyserverconnect'):
        context.options |= 4  # SSL_OP_LEGACY_SERVER_CONNECT
        # Allow use of weaker ciphers in Python 3.10+. See https://bugs.python.org/issue43998
        context.set_ciphers('DEFAULT')

    context.verify_mode = ssl.CERT_REQUIRED if opts_check_certificate else ssl.CERT_NONE
    if opts_check_certificate:
        # NOTE(review): has_certifi is not defined in the visible part of the
        # file; presumably a module-level flag for the optional certifi import - confirm
        if has_certifi and 'no-certifi' not in params.get('compat_opts', []):
            context.load_verify_locations(cafile=certifi.where())
        else:
            try:
                context.load_default_certs()
                # Work around the issue in load_default_certs when there are bad certificates. See:
                # https://github.com/yt-dlp/yt-dlp/issues/1060,
                # https://bugs.python.org/issue35665, https://bugs.python.org/issue45312
            except ssl.SSLError:
                # enum_certificates is not present in mingw python. See https://github.com/yt-dlp/yt-dlp/issues/1151
                if sys.platform == 'win32' and hasattr(ssl, 'enum_certificates'):
                    for storename in ('CA', 'ROOT'):
                        _ssl_load_windows_store_certs(context, storename)
                context.set_default_verify_paths()

    client_certfile = params.get('client_certificate')
    if client_certfile:
        try:
            context.load_cert_chain(
                client_certfile, keyfile=params.get('client_certificate_key'),
                password=params.get('client_certificate_password'))
        except ssl.SSLError:
            raise YoutubeDLError('Unable to load client certificate')

    # Some servers may reject requests if ALPN extension is not sent. See:
    # https://github.com/python/cpython/issues/85140
    # https://github.com/yt-dlp/yt-dlp/issues/3878
    with contextlib.suppress(NotImplementedError):
        context.set_alpn_protocols(['http/1.1'])

    return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
|
||
|
||
def bug_reports_message(before=';'):
    """Return the standard "please report this issue" hint.

    @param before  Text that precedes the hint. Trailing whitespace is
                   stripped; if it is empty or ends a sentence ('.', '!', '?')
                   the hint starts with a capital letter.
    """
    from .update import REPOSITORY

    msg = (f'please report this issue on https://github.com/{REPOSITORY}/issues?q= , '
           'filling out the appropriate issue template. Confirm you are on the latest version using yt-dlp -U')

    prefix = before.rstrip()
    starts_new_sentence = not prefix or prefix.endswith(('.', '!', '?'))
    if starts_new_sentence:
        msg = msg[0].title() + msg[1:]

    if prefix:
        return prefix + ' ' + msg
    return msg
|
||
|
||
|
||
class YoutubeDLError(Exception):
    """Root of the YoutubeDL exception hierarchy."""
    # Subclasses may override this with a default message
    msg = None

    def __init__(self, msg=None):
        """Use msg if given, else the class-level msg, else the class name."""
        if msg is None:
            if self.msg is None:
                self.msg = type(self).__name__
        else:
            self.msg = msg
        super().__init__(self.msg)
|
||
|
||
|
||
# Exception types that ExtractorError treats as network failures
network_exceptions = (
    urllib.error.URLError,
    http.client.HTTPException,
    socket.error,
    *((ssl.CertificateError,) if hasattr(ssl, 'CertificateError') else ()),
)
|
||
|
||
|
||
class ExtractorError(YoutubeDLError):
    """Error during info extraction."""

    def __init__(self, msg, tb=None, expected=False, cause=None, video_id=None, ie=None):
        """ tb, if given, is the original traceback (so that it can be printed out).
        If expected is set, this is a normal error message and most likely not a bug in yt-dlp.

        The final message is assembled as '[ie] video_id: msg (caused by cause)',
        followed by the bug-report hint when the error is not expected.
        """
        # An error raised while a network exception is being handled is never a bug in yt-dlp
        if sys.exc_info()[0] in network_exceptions:
            expected = True

        self.orig_msg = str(msg)
        self.traceback = tb
        self.expected = expected
        self.cause = cause
        self.video_id = video_id
        self.ie = ie
        self.exc_info = sys.exc_info()  # preserve original exception
        if isinstance(self.exc_info[1], ExtractorError):
            # Keep pointing at the innermost original exception
            self.exc_info = self.exc_info[1].exc_info

        # Join the stringified message: str.join() rejects non-str items,
        # so a raw exception object passed as msg must not be used here
        super().__init__(''.join((
            format_field(ie, None, '[%s] '),
            format_field(video_id, None, '%s: '),
            self.orig_msg,
            format_field(cause, None, ' (caused by %r)'),
            '' if expected else bug_reports_message())))

    def format_traceback(self):
        """Return the formatted traceback and cause joined by a newline, or None if both are empty."""
        return join_nonempty(
            self.traceback and ''.join(traceback.format_tb(self.traceback)),
            # [1:] skips the first element of format_exception()'s output
            self.cause and ''.join(traceback.format_exception(None, self.cause, self.cause.__traceback__)[1:]),
            delim='\n') or None
|
||
|
||
|
||
class UnsupportedError(ExtractorError):
    """Expected extraction error for a URL that is not supported."""

    def __init__(self, url):
        message = 'Unsupported URL: %s' % url
        super().__init__(message, expected=True)
        # Keep the offending URL available to callers
        self.url = url
|
||
|
||
|
||
class RegexNotFoundError(ExtractorError):
    """Error when a regex didn't match"""
|
||
|
||
|
||
class GeoRestrictedError(ExtractorError):
    """Geographic restriction Error exception.

    Raised when a video cannot be accessed from the current geographic
    location because the website restricts it by region.
    """

    def __init__(self, msg, countries=None, **kwargs):
        """Always marks the error as expected; `countries` is stored as given."""
        super().__init__(msg, **{**kwargs, 'expected': True})
        self.countries = countries
|
||
|
||
|
||
class UserNotLive(ExtractorError):
    """Error when a channel/user is not live"""

    def __init__(self, msg=None, **kwargs):
        """Always marks the error as expected; a falsy msg selects the default text."""
        message = msg or 'The channel is not currently live'
        super().__init__(message, **{**kwargs, 'expected': True})
|
||
|
||
|
||
class DownloadError(YoutubeDLError):
    """Download Error exception.

    This exception may be thrown by FileDownloader objects if they are not
    configured to continue on errors. They will contain the appropriate
    error message.
    """

    def __init__(self, msg, exc_info=None):
        """ exc_info, if given, is the original exception that caused the trouble (as returned by sys.exc_info()). """
        super().__init__(msg)
        # Stored as given so callers can inspect the underlying failure
        self.exc_info = exc_info
|
||
|
||
|
||
class EntryNotInPlaylist(YoutubeDLError):
    """Entry not in playlist exception.

    This exception will be thrown by YoutubeDL when a requested entry
    is not found in the playlist info_dict
    """
    # Default message, used when the exception is created without one
    msg = 'Entry not found in info'
|
||
|
||
|
||
class SameFileError(YoutubeDLError):
    """Same File exception.

    Raised by FileDownloader objects when several downloads would end up
    being written to the same file on disk.
    """
    msg = 'Fixed output name but more than one file to download'

    def __init__(self, filename=None):
        """Append ': <filename>' to the default message when a filename is given."""
        message = self.msg if filename is None else f'{self.msg}: {filename}'
        super().__init__(message)
|
||
|
||
|
||
class PostProcessingError(YoutubeDLError):
    """Post Processing exception.

    This exception may be raised by PostProcessor's .run() method to
    indicate an error in the postprocessing task.

    Adds no behaviour of its own; the message handling is inherited from
    YoutubeDLError.
    """
|
||
|
||
|
||
class DownloadCancelled(YoutubeDLError):
    """ Exception raised when the download queue should be interrupted """
    # Default message, used when the exception is created without one
    msg = 'The download was cancelled'
|
||
|
||
|
||
class ExistingVideoReached(DownloadCancelled):
    """ --break-on-existing triggered """
    # Overrides the default message inherited from DownloadCancelled
    msg = 'Encountered a video that is already in the archive, stopping due to --break-on-existing'
|
||
|
||
|
||
class RejectedVideoReached(DownloadCancelled):
    """ --break-on-reject triggered """
    # Overrides the default message inherited from DownloadCancelled
    msg = 'Encountered a video that did not match filter, stopping due to --break-on-reject'
|
||
|
||
|
||
class MaxDownloadsReached(DownloadCancelled):
    """ --max-downloads limit has been reached. """
    # Overrides the default message inherited from DownloadCancelled
    msg = 'Maximum number of downloads reached, stopping due to --max-downloads'
|
||
|
||
|
||
class ReExtractInfo(YoutubeDLError):
    """ Video info needs to be re-extracted. """

    def __init__(self, msg, expected=False):
        """Store `expected` on the instance next to the regular error message."""
        super().__init__(msg)
        self.expected = expected
|
||
|
||
|
||
class ThrottledDownload(ReExtractInfo):
    """ Download speed below --throttled-rate. """
    msg = 'The download speed is below throttle limit'

    def __init__(self):
        # Always raised with the fixed class message and expected=False
        super().__init__(self.msg, expected=False)
|
||
|
||
|
||
class UnavailableVideoError(YoutubeDLError):
    """Unavailable Format exception.

    Raised when a video is requested in a format that is not available
    for that video.
    """
    msg = 'Unable to download video'

    def __init__(self, err=None):
        """Append ': <err>' to the default message when err is given."""
        message = self.msg if err is None else f'{self.msg}: {err}'
        super().__init__(message)
|
||
|
||
|
||
class ContentTooShortError(YoutubeDLError):
    """Content Too Short exception.

    May be raised by FileDownloader objects when a downloaded file is
    smaller than what the server first announced, which suggests the
    connection was interrupted.
    """

    def __init__(self, downloaded, expected):
        message = f'Downloaded {downloaded} bytes, expected {expected} bytes'
        super().__init__(message)
        # Both in bytes
        self.downloaded, self.expected = downloaded, expected
|
||
|
||
|
||
class XAttrMetadataError(YoutubeDLError):
    """Error carrying an errno-style code and message, classified into a `reason`.

    `reason` is one of 'NO_SPACE', 'VALUE_TOO_LONG' or 'NOT_SUPPORTED'.
    """

    def __init__(self, code=None, msg='Unknown error'):
        super().__init__(msg)
        self.code = code
        self.msg = msg

        # Classify from the error code first, then from well-known message fragments.
        # The chained conditional keeps the checks lazy, like an if/elif ladder.
        self.reason = (
            'NO_SPACE' if (
                self.code in (errno.ENOSPC, errno.EDQUOT)
                or 'No space left' in self.msg
                or 'Disk quota exceeded' in self.msg)
            else 'VALUE_TOO_LONG' if (
                self.code == errno.E2BIG
                or 'Argument list too long' in self.msg)
            else 'NOT_SUPPORTED')
|
||
|
||
|
||
class XAttrUnavailableError(YoutubeDLError):
    # Adds no behaviour; only gives callers a distinct exception type to catch
    pass
|
||
|
||
|
||
def _create_http_connection(ydl_handler, http_class, is_https, *args, **kwargs):
|
||
hc = http_class(*args, **kwargs)
|
||
source_address = ydl_handler._params.get('source_address')
|
||
|
||
if source_address is not None:
|
||
# This is to workaround _create_connection() from socket where it will try all
|
||
# address data from getaddrinfo() including IPv6. This filters the result from
|
||
# getaddrinfo() based on the source_address value.
|
||
# This is based on the cpython socket.create_connection() function.
|
||
# https://github.com/python/cpython/blob/master/Lib/socket.py#L691
|
||
def _create_connection(address, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, source_address=None):
|
||
host, port = address
|
||
err = None
|
||
addrs = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)
|
||
af = socket.AF_INET if '.' in source_address[0] else socket.AF_INET6
|
||
ip_addrs = [addr for addr in addrs if addr[0] == af]
|
||
if addrs and not ip_addrs:
|
||
ip_version = 'v4' if af == socket.AF_INET else 'v6'
|
||
raise OSError(
|
||
"No remote IP%s addresses available for connect, can't use '%s' as source address"
|
||
% (ip_version, source_address[0]))
|
||
for res in ip_addrs:
|
||
af, socktype, proto, canonname, sa = res
|
||
sock = None
|
||
try:
|
||
sock = socket.socket(af, socktype, proto)
|
||
if timeout is not socket._GLOBAL_DEFAULT_TIMEOUT:
|
||
sock.settimeout(timeout)
|
||
sock.bind(source_address)
|
||
sock.connect(sa)
|
||
err = None # Explicitly break reference cycle
|
||
return sock
|
||
except OSError as _:
|
||
err = _
|
||
if sock is not None:
|
||
sock.close()
|
||
if err is not None:
|
||
raise err
|
||
else:
|
||
raise OSError('getaddrinfo returns an empty list')
|
||
if hasattr(hc, '_create_connection'):
|
||
hc._create_connection = _create_connection
|
||
hc.source_address = (source_address, 0)
|
||
|
||
return hc
|
||
|
||
|
||
def handle_youtubedl_headers(headers):
    """Apply the internal 'Youtubedl-no-compression' marker header.

    Without the marker the headers are returned untouched. With it, a new
    dict is returned that lacks both the marker and any Accept-Encoding
    header (matched case-insensitively).
    """
    marker = 'Youtubedl-no-compression'
    if marker not in headers:
        return headers

    cleaned = {}
    for name, value in headers.items():
        if name.lower() == 'accept-encoding':
            continue
        cleaned[name] = value
    del cleaned[marker]
    return cleaned
|
||
|
||
|
||
class YoutubeDLHandler(urllib.request.HTTPHandler):
    """HTTP handler that injects default headers and decodes compressed bodies.

    Installed in an OpenerDirector, it adds the configured standard headers
    to every request and transparently decodes gzip, deflate and brotli
    responses. A request can avoid compression by carrying the
    "Youtubedl-no-compression" header, which is removed before the real
    request is made.

    Part of this code was copied from:

    http://techknack.net/python-urllib2-handlers/

    Andrew Rowls, the author of that code, agreed to release it to the
    public domain.
    """

    def __init__(self, params, *args, **kwargs):
        urllib.request.HTTPHandler.__init__(self, *args, **kwargs)
        self._params = params

    def http_open(self, req):
        """Open the request, going through a SOCKS proxy if the request asks for one."""
        proxy_url = req.headers.get('Ytdl-socks-proxy')
        if proxy_url:
            connection_class = make_socks_conn_class(http.client.HTTPConnection, proxy_url)
            # Internal marker header, must not be sent to the server
            del req.headers['Ytdl-socks-proxy']
        else:
            connection_class = http.client.HTTPConnection

        factory = functools.partial(_create_http_connection, self, connection_class, False)
        return self.do_open(factory, req)

    @staticmethod
    def deflate(data):
        """Decompress deflate data, trying a raw stream first and a zlib-wrapped one second."""
        if data:
            try:
                return zlib.decompress(data, -zlib.MAX_WBITS)
            except zlib.error:
                return zlib.decompress(data)
        return data

    @staticmethod
    def brotli(data):
        """Decompress brotli data; empty input is returned unchanged."""
        if data:
            return brotli.decompress(data)
        return data

    def http_request(self, req):
        """Escape the request URL and fill in the default headers."""
        # According to RFC 3986, URLs can not contain non-ASCII characters, however this is not
        # always respected by websites, some tend to give out URLs with non percent-encoded
        # non-ASCII characters (see telemb.py, ard.py [#3412])
        # urllib chokes on URLs with non-ASCII characters (see http://bugs.python.org/issue3991)
        # To work around aforementioned issue we will replace request's original URL with
        # percent-encoded one
        # Since redirects are also affected (e.g. http://www.southpark.de/alle-episoden/s18e09)
        # the code of this workaround has been moved here from YoutubeDL.urlopen()
        original_url = req.get_full_url()
        escaped_url = escape_url(original_url)

        # Substitute URL if any change after escaping
        if escaped_url != original_url:
            req = update_Request(req, url=escaped_url)

        default_headers = self._params.get('http_headers', std_headers)
        for name, value in default_headers.items():
            # Capitalize is needed because of Python bug 2275: http://bugs.python.org/issue2275
            # The dict keys are capitalized because of this bug by urllib
            if name.capitalize() in req.headers:
                continue
            req.add_header(name, value)

        if 'Accept-encoding' not in req.headers:
            req.add_header('Accept-encoding', ', '.join(SUPPORTED_ENCODINGS))

        req.headers = handle_youtubedl_headers(req.headers)

        return super().do_request_(req)

    def http_response(self, req, resp):
        """Decode a compressed body and percent-encode a redirect Location header."""
        original = resp

        def with_body(stream):
            # Rebuild the response around the decoded body; the now stale
            # Content-encoding header is dropped
            decoded = urllib.request.addinfourl(stream, original.headers, original.url, original.code)
            decoded.msg = original.msg
            del decoded.headers['Content-encoding']
            return decoded

        # gzip
        if resp.headers.get('Content-encoding', '') == 'gzip':
            payload = resp.read()
            gz = gzip.GzipFile(fileobj=io.BytesIO(payload), mode='rb')
            try:
                body = io.BytesIO(gz.read())
            except OSError as original_ioerror:
                # There may be junk at the end of the file
                # See http://stackoverflow.com/q/4928560/35070 for details
                for trim in range(1, 1024):
                    try:
                        gz = gzip.GzipFile(fileobj=io.BytesIO(payload[:-trim]), mode='rb')
                        body = io.BytesIO(gz.read())
                    except OSError:
                        continue
                    break
                else:
                    raise original_ioerror
            resp = with_body(body)
        # deflate
        if resp.headers.get('Content-encoding', '') == 'deflate':
            resp = with_body(io.BytesIO(self.deflate(resp.read())))
        # brotli
        if resp.headers.get('Content-encoding', '') == 'br':
            resp = with_body(io.BytesIO(self.brotli(resp.read())))

        # Percent-encode redirect URL of Location HTTP header to satisfy RFC 3986 (see
        # https://github.com/ytdl-org/youtube-dl/issues/6457).
        if not 300 <= resp.code < 400:
            return resp
        location = resp.headers.get('Location')
        if not location:
            return resp
        # As of RFC 2616 default charset is iso-8859-1 that is respected by python 3
        location = location.encode('iso-8859-1').decode()
        location_escaped = escape_url(location)
        if location != location_escaped:
            del resp.headers['Location']
            resp.headers['Location'] = location_escaped
        return resp

    https_request = http_request
    https_response = http_response
|
||
|
||
|
||
def make_socks_conn_class(base_class, socks_proxy):
    """Return a subclass of base_class whose connect() goes through a SOCKS proxy.

    @param base_class   http.client.HTTPConnection, HTTPSConnection or a subclass
    @param socks_proxy  Proxy URL using the socks, socks4, socks4a or socks5
                        scheme. The port defaults to 1080; username and
                        password, if present, are percent-decoded.

    Raises ValueError if the proxy URL uses any other scheme.
    """
    assert issubclass(base_class, (
        http.client.HTTPConnection, http.client.HTTPSConnection))

    url_components = urllib.parse.urlparse(socks_proxy)
    scheme = url_components.scheme.lower()
    if scheme == 'socks5':
        socks_type = ProxyType.SOCKS5
    elif scheme in ('socks', 'socks4'):
        socks_type = ProxyType.SOCKS4
    elif scheme == 'socks4a':
        socks_type = ProxyType.SOCKS4A
    else:
        # Fail with a clear message instead of an UnboundLocalError further down
        raise ValueError(f'Unsupported SOCKS proxy scheme: {url_components.scheme!r}')

    def unquote_if_non_empty(s):
        if not s:
            return s
        return urllib.parse.unquote_plus(s)

    proxy_args = (
        socks_type,
        url_components.hostname, url_components.port or 1080,
        True,  # Remote DNS
        unquote_if_non_empty(url_components.username),
        unquote_if_non_empty(url_components.password),
    )

    class SocksConnection(base_class):
        def connect(self):
            self.sock = sockssocket()
            self.sock.setproxy(*proxy_args)
            if isinstance(self.timeout, (int, float)):
                self.sock.settimeout(self.timeout)
            self.sock.connect((self.host, self.port))

            # For HTTPS the TLS layer is set up on top of the proxied socket
            if isinstance(self, http.client.HTTPSConnection):
                if hasattr(self, '_context'):  # Python > 2.6
                    self.sock = self._context.wrap_socket(
                        self.sock, server_hostname=self.host)
                else:
                    self.sock = ssl.wrap_socket(self.sock)

    return SocksConnection
|
||
|
||
|
||
class YoutubeDLHTTPSHandler(urllib.request.HTTPSHandler):
    """HTTPS handler with SOCKS proxy and source address support."""

    def __init__(self, params, https_conn_class=None, *args, **kwargs):
        urllib.request.HTTPSHandler.__init__(self, *args, **kwargs)
        self._https_conn_class = https_conn_class or http.client.HTTPSConnection
        self._params = params

    def https_open(self, req):
        """Open the request, going through a SOCKS proxy if the request asks for one.

        A handshake failure reported by the server is re-raised as a
        YoutubeDLError suggesting --legacy-server-connect.
        """
        open_kwargs = {}
        if hasattr(self, '_context'):  # python > 2.6
            open_kwargs['context'] = self._context
        if hasattr(self, '_check_hostname'):  # python 3.x
            open_kwargs['check_hostname'] = self._check_hostname

        connection_class = self._https_conn_class
        proxy_url = req.headers.get('Ytdl-socks-proxy')
        if proxy_url:
            connection_class = make_socks_conn_class(connection_class, proxy_url)
            # Internal marker header, must not be sent to the server
            del req.headers['Ytdl-socks-proxy']

        factory = functools.partial(_create_http_connection, self, connection_class, True)
        try:
            return self.do_open(factory, req, **open_kwargs)
        except urllib.error.URLError as e:
            handshake_failed = (
                isinstance(e.reason, ssl.SSLError)
                and getattr(e.reason, 'reason', None) == 'SSLV3_ALERT_HANDSHAKE_FAILURE')
            if not handshake_failed:
                raise
            raise YoutubeDLError('SSLV3_ALERT_HANDSHAKE_FAILURE: Try using --legacy-server-connect')
|
||
|
||
|
||
class YoutubeDLCookieJar(http.cookiejar.MozillaCookieJar):
    """
    Netscape-format cookie jar that accepts a path or a file-like object.

    See [1] for cookie file format.

    1. https://curl.haxx.se/docs/http-cookies.html
    """
    _HTTPONLY_PREFIX = '#HttpOnly_'
    _ENTRY_LEN = 7
    _HEADER = '''# Netscape HTTP Cookie File
# This file is generated by yt-dlp. Do not edit.

'''
    _CookieFileEntry = collections.namedtuple(
        'CookieFileEntry',
        ('domain_name', 'include_subdomains', 'path', 'https_only', 'expires_at', 'name', 'value'))

    def __init__(self, filename=None, *args, **kwargs):
        # The base class is given no filename because ours may be a file-like object
        super().__init__(None, *args, **kwargs)
        if self.is_path(filename):
            filename = os.fspath(filename)
        self.filename = filename

    @staticmethod
    def _true_or_false(cndn):
        return 'TRUE' if cndn else 'FALSE'

    @staticmethod
    def is_path(file):
        """Whether `file` is a filesystem path rather than a file-like object."""
        return isinstance(file, (str, bytes, os.PathLike))

    @contextlib.contextmanager
    def open(self, file, *, write=False):
        """Yield a text stream for `file`, which is a path or an open file-like object.

        Paths are opened as UTF-8 and closed on exit. File-like objects are
        yielded as they are and left open; when writing, they are emptied first.
        """
        if self.is_path(file):
            with open(file, 'w' if write else 'r', encoding='utf-8') as f:
                yield f
        else:
            if write:
                # truncate() does not move the stream position, so rewind first;
                # otherwise the new content is written after a gap of padding
                file.seek(0)
                file.truncate(0)
            yield file

    def _really_save(self, f, ignore_discard=False, ignore_expires=False):
        """Write one tab-separated line per cookie to f, skipping discardable/expired ones unless told otherwise."""
        now = time.time()
        for cookie in self:
            if (not ignore_discard and cookie.discard
                    or not ignore_expires and cookie.is_expired(now)):
                continue
            name, value = cookie.name, cookie.value
            if value is None:
                # cookies.txt regards 'Set-Cookie: foo' as a cookie
                # with no name, whereas http.cookiejar regards it as a
                # cookie with no value.
                name, value = '', name
            f.write('%s\n' % '\t'.join((
                cookie.domain,
                self._true_or_false(cookie.domain.startswith('.')),
                cookie.path,
                self._true_or_false(cookie.secure),
                str_or_none(cookie.expires, default=''),
                name, value
            )))

    def save(self, filename=None, *args, **kwargs):
        """
        Save cookies to a file.
        Code is taken from CPython 3.6
        https://github.com/python/cpython/blob/8d999cbf4adea053be6dbb612b9844635c4dfb8e/Lib/http/cookiejar.py#L2091-L2117

        `filename` defaults to self.filename; ValueError is raised if neither is set.
        Remaining arguments are passed on to _really_save(). """

        if filename is None:
            if self.filename is not None:
                filename = self.filename
            else:
                raise ValueError(http.cookiejar.MISSING_FILENAME_TEXT)

        # Store session cookies with `expires` set to 0 instead of an empty string
        for cookie in self:
            if cookie.expires is None:
                cookie.expires = 0

        with self.open(filename, write=True) as f:
            f.write(self._HEADER)
            self._really_save(f, *args, **kwargs)

    def load(self, filename=None, ignore_discard=False, ignore_expires=False):
        """Load cookies from a file.

        `filename` defaults to self.filename; ValueError is raised if neither is set.
        Malformed entries are skipped with a warning, except that input which
        looks like JSON raises http.cookiejar.LoadError.
        """
        if filename is None:
            if self.filename is not None:
                filename = self.filename
            else:
                raise ValueError(http.cookiejar.MISSING_FILENAME_TEXT)

        def prepare_line(line):
            # Strip the #HttpOnly_ marker so the entry is parsed like any other
            if line.startswith(self._HTTPONLY_PREFIX):
                line = line[len(self._HTTPONLY_PREFIX):]
            # comments and empty lines are fine
            if line.startswith('#') or not line.strip():
                return line
            cookie_list = line.split('\t')
            if len(cookie_list) != self._ENTRY_LEN:
                raise http.cookiejar.LoadError('invalid length %d' % len(cookie_list))
            cookie = self._CookieFileEntry(*cookie_list)
            if cookie.expires_at and not cookie.expires_at.isdigit():
                raise http.cookiejar.LoadError('invalid expires at %s' % cookie.expires_at)
            return line

        # Collect the validated lines in memory and let the base class parse that
        cf = io.StringIO()
        with self.open(filename) as f:
            for line in f:
                try:
                    cf.write(prepare_line(line))
                except http.cookiejar.LoadError as e:
                    # The appended space keeps [0] valid for an empty line
                    if f'{line.strip()} '[0] in '[{"':
                        raise http.cookiejar.LoadError(
                            'Cookies file must be Netscape formatted, not JSON. See '
                            'https://github.com/ytdl-org/youtube-dl#how-do-i-pass-cookies-to-youtube-dl')
                    write_string(f'WARNING: skipping cookie file entry due to {e}: {line!r}\n')
                    continue
        cf.seek(0)
        self._really_load(cf, filename, ignore_discard, ignore_expires)
        # Session cookies are denoted by either `expires` field set to
        # an empty string or 0. MozillaCookieJar only recognizes the former
        # (see [1]). So we need force the latter to be recognized as session
        # cookies on our own.
        # Session cookies may be important for cookies-based authentication,
        # e.g. usually, when user does not check 'Remember me' check box while
        # logging in on a site, some important cookies are stored as session
        # cookies so that not recognizing them will result in failed login.
        # 1. https://bugs.python.org/issue17164
        for cookie in self:
            # Treat `expires=0` cookies as session cookies
            if cookie.expires == 0:
                cookie.expires = None
                cookie.discard = True
|
||
|
||
|
||
class YoutubeDLCookieProcessor(urllib.request.HTTPCookieProcessor):
    """Cookie processor that applies the same cookie handling to HTTP and HTTPS."""

    def __init__(self, cookiejar=None):
        super().__init__(cookiejar)

    def http_response(self, request, response):
        return super().http_response(request, response)

    # HTTPS traffic goes through the same request/response hooks as HTTP
    https_request = urllib.request.HTTPCookieProcessor.http_request
    https_response = http_response
|
||
|
||
|
||
class YoutubeDLRedirectHandler(urllib.request.HTTPRedirectHandler):
    """YoutubeDL redirect handler

    The code is based on HTTPRedirectHandler implementation from CPython [1].

    On top of the stock handler it supports the HTTP response status code
    308 Permanent Redirect [2] used by some sites [3], and it rewrites the
    request method on redirect the way browsers do.

    1. https://github.com/python/cpython/blob/master/Lib/urllib/request.py
    2. https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/308
    3. https://github.com/ytdl-org/youtube-dl/issues/28768
    """

    http_error_301 = http_error_303 = http_error_307 = http_error_308 = urllib.request.HTTPRedirectHandler.http_error_302

    def redirect_request(self, req, fp, code, msg, headers, newurl):
        """Return a Request or None in response to a redirect.

        Called by the http_error_30x methods when a redirection response
        is received. Returns the new Request to follow, or raises
        HTTPError when this code/method combination must not be followed.
        """
        method = req.get_method()
        redirect_allowed = (
            (code in (301, 302, 303, 307, 308) and method in ("GET", "HEAD"))
            or (code in (301, 302, 303) and method == "POST"))
        if not redirect_allowed:
            raise urllib.error.HTTPError(req.full_url, code, msg, headers, fp)
        # Strictly (according to RFC 2616), 301 or 302 in response to
        # a POST MUST NOT cause a redirection without confirmation
        # from the user (of urllib.request, in this case). In practice,
        # essentially all clients do redirect in this case, so we do
        # the same.

        # Be conciliant with URIs containing a space. This is mainly
        # redundant with the more complete encoding done in http_error_302(),
        # but it is kept for compatibility with other callers.
        newurl = newurl.replace(' ', '%20')

        # Headers describing the request body are not carried over
        content_headers = ("content-length", "content-type")
        forwarded_headers = {
            name: value for name, value in req.headers.items()
            if name.lower() not in content_headers}

        # A 303 must either use GET or HEAD for subsequent request
        # https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.4
        if code == 303 and method != 'HEAD':
            method = 'GET'
        # 301 and 302 redirects are commonly turned into a GET from a POST
        # for subsequent requests by browsers, so we'll do the same.
        # https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.2
        # https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.3
        if code in (301, 302) and method == 'POST':
            method = 'GET'

        return urllib.request.Request(
            newurl, headers=forwarded_headers, origin_req_host=req.origin_req_host,
            unverifiable=True, method=method)
|
||
|
||
|
||
def extract_timezone(date_str):
    """Split a trailing timezone off date_str.

    Returns (offset, remainder): offset is a datetime.timedelta (zero when
    no timezone is recognised) and remainder is date_str without the
    recognised timezone suffix.
    """
    match = re.search(
        r'''(?x)
            ^.{8,}? # >=8 char non-TZ prefix, if present
            (?P<tz>Z| # just the UTC Z, or
            (?:(?<=.\b\d{4}|\b\d{2}:\d\d)| # preceded by 4 digits or hh:mm or
            (?<!.\b[a-zA-Z]{3}|[a-zA-Z]{4}|..\b\d\d)) # not preceded by 3 alpha word or >= 4 alpha or 2 digits
            [ ]? # optional space
            (?P<sign>\+|-) # +/-
            (?P<hours>[0-9]{2}):?(?P<minutes>[0-9]{2}) # hh[:]mm
            $)
        ''', date_str)

    if match:
        # Numeric offset or a bare "Z"
        date_str = date_str[:-len(match.group('tz'))]
        if not match.group('sign'):
            return datetime.timedelta(), date_str
        direction = 1 if match.group('sign') == '+' else -1
        offset = datetime.timedelta(
            hours=direction * int(match.group('hours')),
            minutes=direction * int(match.group('minutes')))
        return offset, date_str

    # Otherwise look for a timezone abbreviation following a time of day
    match = re.search(r'\d{1,2}:\d{1,2}(?:\.\d+)?(?P<tz>\s*[A-Z]+)$', date_str)
    named_offset = TIMEZONE_NAMES.get(match and match.group('tz').strip())
    if named_offset is not None:
        date_str = date_str[:-len(match.group('tz'))]
    return datetime.timedelta(hours=named_offset or 0), date_str
|
||
|
||
|
||
def parse_iso8601(date_str, delimiter='T', timezone=None):
    """ Return a UNIX timestamp from the given date

    Fractional seconds are ignored. When timezone (a timedelta) is not
    given, it is extracted from date_str. Returns None for None input or
    when date_str does not match the expected format.
    """
    if date_str is None:
        return None

    date_str = re.sub(r'\.[0-9]+', '', date_str)
    if timezone is None:
        timezone, date_str = extract_timezone(date_str)

    try:
        local_dt = datetime.datetime.strptime(date_str, f'%Y-%m-%d{delimiter}%H:%M:%S')
        return calendar.timegm((local_dt - timezone).timetuple())
    except ValueError:
        return None
|
||
|
||
|
||
def date_formats(day_first=True):
    """Return the strptime format list for day-first or month-first dates."""
    if day_first:
        return DATE_FORMATS_DAY_FIRST
    return DATE_FORMATS_MONTH_FIRST
|
||
|
||
|
||
def unified_strdate(date_str, day_first=True):
    """Return a string with the date in the format YYYYMMDD

    Returns None when date_str is None or cannot be parsed.
    """
    if date_str is None:
        return None

    # Replace commas
    date_str = date_str.replace(',', ' ')
    # Remove AM/PM + timezone
    date_str = re.sub(r'(?i)\s*(?:AM|PM)(?:\s+[A-Z]+)?', '', date_str)
    _, date_str = extract_timezone(date_str)

    upload_date = None
    # Every format is tried; the last one that parses wins
    for fmt in date_formats(day_first):
        try:
            upload_date = datetime.datetime.strptime(date_str, fmt).strftime('%Y%m%d')
        except ValueError:
            continue

    if upload_date is None:
        # Fall back to RFC 2822 style parsing
        timetuple = email.utils.parsedate_tz(date_str)
        if timetuple:
            try:
                upload_date = datetime.datetime(*timetuple[:6]).strftime('%Y%m%d')
            except ValueError:
                pass

    if upload_date is None:
        return None
    return str(upload_date)
|
||
|
||
|
||
def unified_timestamp(date_str, day_first=True):
    """Return a UNIX timestamp parsed from a free-form date string.

    @param day_first  Passed to date_formats() to pick the day-first or
                      month-first format list.

    Returns None when date_str is None or cannot be parsed.
    """
    if date_str is None:
        return None

    # Drop commas, pipes and weekday names (Sunday included), then collapse whitespace
    date_str = re.sub(r'\s+', ' ', re.sub(
        r'(?i)[,|]|(mon|tues?|wed(nes)?|thu(rs)?|fri|sat(ur)?|sun)(day)?', '', date_str))

    # The meridiem is removed before parsing, so PM is compensated for afterwards
    pm_delta = 12 if re.search(r'(?i)PM', date_str) else 0
    timezone, date_str = extract_timezone(date_str)

    # Remove AM/PM + timezone
    date_str = re.sub(r'(?i)\s*(?:AM|PM)(?:\s+[A-Z]+)?', '', date_str)

    # Remove unrecognized timezones from ISO 8601 alike timestamps
    m = re.search(r'\d{1,2}:\d{1,2}(?:\.\d+)?(?P<tz>\s*[A-Z]+)$', date_str)
    if m:
        date_str = date_str[:-len(m.group('tz'))]

    # Python only supports microseconds, so remove nanoseconds
    m = re.search(r'^([0-9]{4,}-[0-9]{1,2}-[0-9]{1,2}T[0-9]{1,2}:[0-9]{1,2}:[0-9]{1,2}\.[0-9]{6})[0-9]+$', date_str)
    if m:
        date_str = m.group(1)

    # The first format that parses wins
    for expression in date_formats(day_first):
        with contextlib.suppress(ValueError):
            dt = datetime.datetime.strptime(date_str, expression) - timezone + datetime.timedelta(hours=pm_delta)
            return calendar.timegm(dt.timetuple())

    # Fall back to RFC 2822 style parsing
    timetuple = email.utils.parsedate_tz(date_str)
    if timetuple:
        return calendar.timegm(timetuple) + pm_delta * 3600 - timezone.total_seconds()
|
||
|
||
|
||
def determine_ext(url, default_ext='unknown_video'):
    """Guess the file extension from a URL.

    Looks at the text after the last '.' in the part before any '?'.
    Returns default_ext when url is None, has no '.', or the guess is
    neither alphanumeric nor a known extension followed by slashes.
    """
    if url is None or '.' not in url:
        return default_ext

    without_query = url.partition('?')[0]
    guess = without_query.rpartition('.')[2]
    if re.match(r'^[A-Za-z0-9]+$', guess):
        return guess

    # Try extract ext from URLs like http://example.com/foo/bar.mp4/?download
    trimmed = guess.rstrip('/')
    if trimmed in KNOWN_EXTENSIONS:
        return trimmed
    return default_ext
|
||
|
||
|
||
def subtitles_filename(filename, sub_lang, sub_format, expected_real_ext=None):
    """Return filename with its extension replaced by '<sub_lang>.<sub_format>'."""
    subtitle_ext = '.'.join((sub_lang, sub_format))
    return replace_extension(filename, subtitle_ext, expected_real_ext)
|
||
|
||
|
||
def datetime_from_str(date_str, precision='auto', format='%Y%m%d'):
    R"""
    Return a datetime object from a string.
    Supported format:
        (now|today|yesterday|DATE)([+-]\d+(microsecond|second|minute|hour|day|week|month|year)s?)?

    @param format       strftime format of DATE
    @param precision    Round the datetime object: auto|microsecond|second|minute|hour|day
                        auto: round to the unit provided in date_str (if applicable).
    """
    auto_precision = precision == 'auto'
    if auto_precision:
        precision = 'microsecond'

    today = datetime_round(datetime.datetime.utcnow(), precision)
    if date_str in ('now', 'today'):
        return today
    if date_str == 'yesterday':
        return today - datetime.timedelta(days=1)

    match = re.match(
        r'(?P<start>.+)(?P<sign>[+-])(?P<time>\d+)(?P<unit>microsecond|second|minute|hour|day|week|month|year)s?',
        date_str)
    if match is None:
        # A plain DATE without any relative offset
        return datetime_round(datetime.datetime.strptime(date_str, format), precision)

    # Resolve the base date first, then apply the signed offset
    start_time = datetime_from_str(match.group('start'), precision, format)
    amount = int(match.group('time'))
    if match.group('sign') == '-':
        amount = -amount
    unit = match.group('unit')

    if unit in ('month', 'year'):
        # Calendar-aware shift; the result is rounded to the day in auto mode
        new_date = datetime_add_months(start_time, amount * 12 if unit == 'year' else amount)
        unit = 'day'
    else:
        if unit == 'week':
            unit = 'day'
            amount *= 7
        new_date = start_time + datetime.timedelta(**{unit + 's': amount})

    if auto_precision:
        return datetime_round(new_date, unit)
    return new_date
|
||
|
||
|
||
def date_from_str(date_str, format='%Y%m%d', strict=False):
    R"""
    Return a date object from a string using datetime_from_str

    @param strict  Restrict allowed patterns to "YYYYMMDD" and
                   (now|today|yesterday)(-\d+(day|week|month|year)s?)?

    Raises ValueError in strict mode when date_str matches neither pattern.
    """
    if strict:
        allowed = re.fullmatch(r'\d{8}|(now|today|yesterday)(-\d+(day|week|month|year)s?)?', date_str)
        if not allowed:
            raise ValueError(f'Invalid date format "{date_str}"')

    parsed = datetime_from_str(date_str, precision='microsecond', format=format)
    return parsed.date()
|
||
|
||
|
||
def datetime_add_months(dt, months):
    """
    Increment/Decrement a datetime object by months.

    The day of month is clamped to the last day of the resulting month
    (e.g. Jan 31 + 1 month gives the last day of February).
    """
    # Work with a zero-based month index so that divmod handles the year carry
    year_shift, month_index = divmod(dt.month - 1 + months, 12)
    target_year = dt.year + year_shift
    target_month = month_index + 1
    last_day = calendar.monthrange(target_year, target_month)[1]
    return dt.replace(target_year, target_month, min(dt.day, last_day))
|
||
|
||
|
||
def datetime_round(dt, precision='day'):
    """
    Round a datetime object's time to a specific precision

    @param dt           datetime object; its fields are interpreted as UTC
    @param precision    microsecond|second|minute|hour|day
                        "microsecond" returns dt unchanged
    @returns            A naive datetime rounded to the nearest multiple of
                        the given unit (halves are rounded up).
                        Sub-second information is discarded
    @raises KeyError    if precision is not one of the supported units
    """
    if precision == 'microsecond':
        return dt

    unit_seconds = {
        'day': 86400,
        'hour': 3600,
        'minute': 60,
        'second': 1,
    }
    # timetuple() has a resolution of one second, so microseconds are dropped here
    timestamp = calendar.timegm(dt.timetuple())
    step = unit_seconds[precision]
    # Pure integer arithmetic; adding half a step before flooring rounds halves up
    rounded = (timestamp + step // 2) // step * step
    # datetime.utcfromtimestamp() is deprecated since Python 3.12.
    # Convert via an aware datetime and strip tzinfo to keep returning a naive value
    return datetime.datetime.fromtimestamp(rounded, datetime.timezone.utc).replace(tzinfo=None)
|
||
|
||
|
||
def hyphenate_date(date_str):
    """
    Convert a date in 'YYYYMMDD' format to 'YYYY-MM-DD' format

    Strings that do not consist of exactly eight digits are returned unchanged.
    """
    parts = re.match(r'^(\d\d\d\d)(\d\d)(\d\d)$', date_str)
    if parts is None:
        return date_str
    return '-'.join(parts.groups())
|
||
|
||
|
||
class DateRange:
    """Represents a time interval between two dates (both ends inclusive)"""

    def __init__(self, start=None, end=None):
        """
        @param start    Start of the range as a string accepted by
                        date_from_str in strict mode, or None for the earliest
                        representable date
        @param end      End of the range in the same format as start,
                        or None for the latest representable date
        @raises ValueError  if start is later than end
        """
        if start is not None:
            self.start = date_from_str(start, strict=True)
        else:
            self.start = datetime.datetime.min.date()
        if end is not None:
            self.end = date_from_str(end, strict=True)
        else:
            self.end = datetime.datetime.max.date()
        if self.start > self.end:
            raise ValueError('Date range: "%s" , the start date must be before the end date' % self)

    @classmethod
    def day(cls, day):
        """Returns a range that only contains the given day"""
        return cls(day, day)

    def __contains__(self, date):
        """
        Check if the date is in the range

        @param date     A datetime.date, a datetime.datetime (only its date
                        part is considered) or a string understood by date_from_str
        """
        if isinstance(date, datetime.datetime):
            # datetime is a subclass of date, but the two cannot be compared
            # with each other (TypeError), so reduce it to its date part first
            date = date.date()
        elif not isinstance(date, datetime.date):
            date = date_from_str(date)
        return self.start <= date <= self.end

    def __str__(self):
        return f'{self.start.isoformat()} - {self.end.isoformat()}'

    def __eq__(self, other):
        return (isinstance(other, DateRange)
                and self.start == other.start and self.end == other.end)
|
||
|
||
|
||
def platform_name():
    """
    Returns the platform name as a str

    Deprecated: a deprecation notice is emitted through write_string on
    every call. Use platform.platform instead.
    """
    notice = 'DeprecationWarning: yt_dlp.utils.platform_name is deprecated, use platform.platform instead'
    write_string(notice)
    name = platform.platform()
    return name
|
||
|
||
|
||
@functools.cache
def system_identifier():
    """
    Return a one-line description of the running interpreter and platform.

    The result has the form
    "Python <version> (<implementation> <architecture>) - <platform> <libc info>"
    and is cached after the first call.
    """
    implementation = platform.python_implementation()
    if implementation == 'PyPy' and hasattr(sys, 'pypy_version_info'):
        implementation += ' version %d.%d.%d' % sys.pypy_version_info[:3]

    version = platform.python_version()
    bits = platform.architecture()[0]
    system = platform.platform()
    # libc name and version joined by a space and formatted with "(%s)";
    # presumably empty when libc cannot be determined - see format_field/join_nonempty
    libc = format_field(join_nonempty(*platform.libc_ver(), delim=' '), None, '(%s)')

    return 'Python %s (%s %s) - %s %s' % (version, implementation, bits, system, libc)
|
||
|
||
|
||
@functools.cache
def get_windows_version():
    ''' Get Windows version. returns () if it's not running on Windows '''
    if compat_os_name != 'nt':
        return ()
    # The second field of win32_ver() is the version string
    win_version = platform.win32_ver()[1]
    return version_tuple(win_version)
|
||
|
||
|
||
def write_string(s, out=None, encoding=None):
    """
    Write the string s to the stream out and flush it.

    @param s            The text to write; must be a str
    @param out          Target stream. Defaults to sys.stderr
    @param encoding     Encoding to use when bytes have to be written.
                        Falls back to the stream's own encoding (if any)
                        and then to preferredencoding()
    """
    assert isinstance(s, str)
    if not out:
        out = sys.stderr

    if compat_os_name == 'nt' and supports_terminal_sequences(out):
        # Insert a space before every run of line breaks.
        # NOTE(review): presumably a workaround for a Windows terminal quirk - confirm
        s = re.sub(r'([\r\n]+)', r' \1', s)

    target, codec = out, None
    if 'b' in getattr(out, 'mode', ''):
        # Stream opened in binary mode: it only accepts bytes
        codec = encoding or preferredencoding()
    elif hasattr(out, 'buffer'):
        # Text stream with an underlying binary buffer: write the encoded bytes
        # directly so that unencodable characters can be dropped
        target = out.buffer
        codec = encoding or getattr(out, 'encoding', None) or preferredencoding()

    payload = s.encode(codec, 'ignore') if codec else s
    target.write(payload)
    out.flush()
|
||
|
||
|
||
def bytes_to_intlist(bs):
    """
    Convert a byte string into a list of integers.

    Sequences whose items are already ints (e.g. bytes) are simply copied
    into a list; otherwise every item is converted with ord().
    Empty input gives an empty list.
    """
    if not bs:
        return []
    first = bs[0]
    return list(bs) if isinstance(first, int) else [ord(ch) for ch in bs]
|
||
|
||
|
||
def intlist_to_bytes(xs):
    """
    Convert a sequence of integers (each in range 0-255) into bytes.

    Empty input gives b''.
    """
    if xs:
        # One unsigned char ("B") per item
        return struct.pack('%dB' % len(xs), *xs)
    return b''
|
||
|
||
|
||
class LockingUnsupportedError(OSError):
    """Raised when no file locking mechanism is available"""

    # Fixed message passed to OSError; the constructor takes no arguments
    msg = 'File locking is not supported'

    def __init__(self):
        message = self.msg
        super().__init__(message)
|
||
|
||
|
||
# Cross-platform file locking
|
||
# Defines _lock_file(f, exclusive, block) and _unlock_file(f) for the current platform:
#   * Windows: LockFileEx/UnlockFileEx from kernel32 through ctypes
#   * Elsewhere: fcntl.flock with a fallback to fcntl.lockf
#   * No fcntl module: both functions raise LockingUnsupportedError
if sys.platform == 'win32':
    import ctypes
    import ctypes.wintypes
    import msvcrt

    class OVERLAPPED(ctypes.Structure):
        # Mirror of the Win32 OVERLAPPED structure expected by (Un)LockFileEx
        _fields_ = [
            ('Internal', ctypes.wintypes.LPVOID),
            ('InternalHigh', ctypes.wintypes.LPVOID),
            ('Offset', ctypes.wintypes.DWORD),
            ('OffsetHigh', ctypes.wintypes.DWORD),
            ('hEvent', ctypes.wintypes.HANDLE),
        ]

    kernel32 = ctypes.windll.kernel32
    LockFileEx = kernel32.LockFileEx
    LockFileEx.argtypes = [
        ctypes.wintypes.HANDLE,     # hFile
        ctypes.wintypes.DWORD,      # dwFlags
        ctypes.wintypes.DWORD,      # dwReserved
        ctypes.wintypes.DWORD,      # nNumberOfBytesToLockLow
        ctypes.wintypes.DWORD,      # nNumberOfBytesToLockHigh
        ctypes.POINTER(OVERLAPPED)  # Overlapped
    ]
    LockFileEx.restype = ctypes.wintypes.BOOL
    UnlockFileEx = kernel32.UnlockFileEx
    UnlockFileEx.argtypes = [
        ctypes.wintypes.HANDLE,     # hFile
        ctypes.wintypes.DWORD,      # dwReserved
        ctypes.wintypes.DWORD,      # nNumberOfBytesToLockLow
        ctypes.wintypes.DWORD,      # nNumberOfBytesToLockHigh
        ctypes.POINTER(OVERLAPPED)  # Overlapped
    ]
    UnlockFileEx.restype = ctypes.wintypes.BOOL
    # Low/high DWORD of the length of the locked byte range (starting at offset 0)
    whole_low = 0xffffffff
    whole_high = 0x7fffffff

    def _lock_file(f, exclusive, block):
        """
        Lock the open file f using LockFileEx

        @param exclusive    Request an exclusive lock instead of a shared one
        @param block        Wait for the lock instead of failing immediately
        @raises BlockingIOError     if the lock could not be acquired
        """
        overlapped = OVERLAPPED()
        overlapped.Offset = 0
        overlapped.OffsetHigh = 0
        overlapped.hEvent = 0
        # Stored on the file object since the same structure is needed for unlocking
        f._lock_file_overlapped_p = ctypes.pointer(overlapped)

        # dwFlags: 0x2 = LOCKFILE_EXCLUSIVE_LOCK, 0x1 = LOCKFILE_FAIL_IMMEDIATELY
        if not LockFileEx(msvcrt.get_osfhandle(f.fileno()),
                          (0x2 if exclusive else 0x0) | (0x0 if block else 0x1),
                          0, whole_low, whole_high, f._lock_file_overlapped_p):
            # NB: No argument form of "ctypes.FormatError" does not work on PyPy
            raise BlockingIOError(f'Locking file failed: {ctypes.FormatError(ctypes.GetLastError())!r}')

    def _unlock_file(f):
        """
        Release the lock acquired on f by _lock_file

        @raises OSError     if UnlockFileEx reports a failure
        """
        assert f._lock_file_overlapped_p
        handle = msvcrt.get_osfhandle(f.fileno())
        if not UnlockFileEx(handle, 0, whole_low, whole_high, f._lock_file_overlapped_p):
            # NB: No argument form of "ctypes.FormatError" does not work on PyPy
            raise OSError('Unlocking file failed: %r' % ctypes.FormatError(ctypes.GetLastError()))

else:
    try:
        import fcntl

        def _lock_file(f, exclusive, block):
            """
            Lock the open file f using fcntl

            @param exclusive    Request an exclusive lock instead of a shared one
            @param block        Wait for the lock instead of failing immediately
            @raises BlockingIOError     if block is false and the lock is held elsewhere
            """
            flags = fcntl.LOCK_EX if exclusive else fcntl.LOCK_SH
            if not block:
                flags |= fcntl.LOCK_NB
            try:
                fcntl.flock(f, flags)
            except BlockingIOError:
                # The lock is held by someone else; must not fall through to lockf
                raise
            except OSError:  # AOSP does not have flock()
                fcntl.lockf(f, flags)

        def _unlock_file(f):
            """Release the lock acquired on f by _lock_file"""
            try:
                fcntl.flock(f, fcntl.LOCK_UN)
            except OSError:
                # Same fallback as in _lock_file
                fcntl.lockf(f, fcntl.LOCK_UN)

    except ImportError:

        def _lock_file(f, exclusive, block):
            """File locking is unavailable; always raises LockingUnsupportedError"""
            raise LockingUnsupportedError()

        def _unlock_file(f):
            """File locking is unavailable; always raises LockingUnsupportedError"""
            raise LockingUnsupportedError()
|
||
|
||
|
||
class locked_file:
    """
    A file that is locked for the duration of a "with" block (or between
    explicit open() and close() calls).

    The file is opened when the object is created, but it is only locked (and,
    in "w" mode, truncated) on entering. Files opened for reading get a shared
    lock; all other modes get an exclusive lock.
    Unknown attributes are forwarded to the underlying file object.
    """
    locked = False

    def __init__(self, filename, mode, block=True, encoding=None):
        """
        @param filename     Path of the file to open
        @param mode         One of r, rb, a, ab, w, wb
        @param block        Whether to wait for the lock instead of failing immediately
        @param encoding     Text encoding passed on to os.fdopen
        @raises NotImplementedError     for any other mode
        """
        if mode not in {'r', 'rb', 'a', 'ab', 'w', 'wb'}:
            raise NotImplementedError(mode)
        self.mode, self.block = mode, block

        writable = any(f in mode for f in 'wax+')
        readable = any(f in mode for f in 'r+')
        flags = functools.reduce(operator.ior, (
            getattr(os, 'O_CLOEXEC', 0),  # UNIX only
            getattr(os, 'O_BINARY', 0),  # Windows only
            getattr(os, 'O_NOINHERIT', 0),  # Windows only
            os.O_CREAT if writable else 0,  # O_TRUNC only after locking
            os.O_APPEND if 'a' in mode else 0,
            os.O_EXCL if 'x' in mode else 0,
            os.O_RDONLY if not writable else os.O_RDWR if readable else os.O_WRONLY,
        ))

        self.f = os.fdopen(os.open(filename, flags, 0o666), mode, encoding=encoding)

    def __enter__(self):
        """
        Lock the file and, in "w" mode, truncate it.

        If locking or truncating fails, the file is closed (and unlocked,
        where applicable) before the error is re-raised
        """
        exclusive = 'r' not in self.mode
        try:
            _lock_file(self.f, exclusive, self.block)
            self.locked = True
        except OSError:
            self.f.close()
            raise
        if 'w' in self.mode:
            # Truncation is deferred until the lock is held so that the
            # contents are not destroyed while someone else uses the file
            try:
                self.f.truncate()
            except OSError as e:
                if e.errno not in (
                    errno.ESPIPE,  # Illegal seek - expected for FIFO
                    errno.EINVAL,  # Invalid argument - expected for /dev/null
                ):
                    # __exit__ is not called when __enter__ raises, so the
                    # lock and the file have to be released here
                    self.__exit__()
                    raise
        return self

    def unlock(self):
        """Release the lock if it is held; the file stays open"""
        if not self.locked:
            return
        try:
            _unlock_file(self.f)
        finally:
            self.locked = False

    def __exit__(self, *_):
        """Release the lock (if held) and close the file"""
        try:
            self.unlock()
        finally:
            self.f.close()

    open = __enter__
    close = __exit__

    def __getattr__(self, attr):
        # Only called for attributes not found normally. Without this guard a
        # missing "f" (e.g. an instance created without __init__) would make
        # the lookup of self.f below recurse infinitely
        if attr == 'f':
            raise AttributeError(attr)
        return getattr(self.f, attr)

    def __iter__(self):
        return iter(self.f)
|
||
|
||
|
||
@functools.cache
def get_filesystem_encoding():
    """Return the filesystem encoding reported by sys, or 'utf-8' if it reports None"""
    fs_encoding = sys.getfilesystemencoding()
    if fs_encoding is None:
        return 'utf-8'
    return fs_encoding
|
||
|
||
|
||
def shell_quote(args):
    """
    Quote every argument with compat_shlex_quote and join them with spaces.

    @param args     Iterable of str or bytes; bytes are decoded using the
                    filesystem encoding first
    """
    encoding = get_filesystem_encoding()

    def as_text(arg):
        # We may get a filename encoded with 'encodeFilename'
        return arg.decode(encoding) if isinstance(arg, bytes) else arg

    return ' '.join([compat_shlex_quote(as_text(arg)) for arg in args])
|
||
|
||
|
||
def smuggle_url(url, data):
    """
    Pass additional data in a URL for internal use.

    The data is JSON-encoded and appended to the URL as a fragment. If the URL
    already carries smuggled data, both are combined, with the values already
    present in the URL taking precedence over those in data.

    @param url      The URL to add the data to
    @param data     A JSON-serializable dict. It is not modified
    @returns        The URL with the combined data in its fragment
    """

    url, idata = unsmuggle_url(url, {})
    # Merge into a new dict instead of calling data.update(), which would
    # leak the previously smuggled values into the caller's dict
    merged = {**data, **idata}
    sdata = urllib.parse.urlencode(
        {'__youtubedl_smuggle': json.dumps(merged)})
    return url + '#' + sdata
|
||
|
||
|
||
def unsmuggle_url(smug_url, default=None):
    """
    Split a URL into the plain URL and the data smuggled into its fragment.

    @param smug_url     URL that may carry a "__youtubedl_smuggle" fragment
    @param default      Value returned as data if nothing was smuggled
    @returns            (url, data). The URL is returned untouched if it
                        carries no smuggled data
    """
    if '#__youtubedl_smuggle' in smug_url:
        # The smuggled data is always the last fragment of the URL
        plain_url, _, fragment = smug_url.rpartition('#')
        payload = urllib.parse.parse_qs(fragment)['__youtubedl_smuggle'][0]
        return plain_url, json.loads(payload)
    return smug_url, default
|
||
|
||
|
||
def format_decimal_suffix(num, fmt='%d%s', *, factor=1000):
    """
    Formats numbers with decimal suffixes like K, M, etc

    @param num      The number to format
    @param fmt      %-format string that receives (scaled number, suffix)
    @param factor   Ratio between two consecutive suffixes.
                    With 1024, the binary suffixes (Ki, Mi, ...) are used
    @returns        The formatted string, or None if num cannot be converted
                    to a number or is negative
    """
    num, factor = float_or_none(num), float(factor)
    if num is None or num < 0:
        return None
    POSSIBLE_SUFFIXES = 'kMGTPEZY'
    if num == 0:
        exponent = 0
    else:
        # The upper bound keeps huge numbers on the largest suffix.
        # The lower bound is needed for 0 < num < 1/factor: the logarithm is
        # then below -1 and a negative exponent would index the suffix list
        # from its end, wrongly yielding "Y"
        exponent = max(0, min(int(math.log(num, factor)), len(POSSIBLE_SUFFIXES)))
    suffix = ['', *POSSIBLE_SUFFIXES][exponent]
    if factor == 1024:
        suffix = {'k': 'Ki', '': ''}.get(suffix, f'{suffix}i')
    converted = num / (factor ** exponent)
    return fmt % (converted, suffix)
|
||
|
||
|
||
def format_bytes(bytes):
    """
    Format a number of bytes with two decimals and a binary suffix
    (factor 1024), e.g. "KiB". Returns 'N/A' if format_decimal_suffix
    gives no result.
    """
    formatted = format_decimal_suffix(bytes, '%.2f%sB', factor=1024)
    return formatted or 'N/A'
|
||
|
||
|
||
def lookup_unit_table(unit_table, s):
    """
    Parse a "<number><unit>" string from the start of s and scale the number.

    @param unit_table   Mapping of unit name to its multiplier
    @param s            String to parse. The number may use either "," or "."
                        as decimal separator and may be separated from the
                        unit by whitespace
    @returns            int(number * multiplier), or None if s does not start
                        with a number followed by a known unit
    """
    alternatives = '|'.join(map(re.escape, unit_table))
    found = re.match(
        r'(?P<num>[0-9]+(?:[,.][0-9]*)?)\s*(?P<unit>%s)\b' % alternatives, s)
    if found is None:
        return None
    normalized = found.group('num').replace(',', '.')
    multiplier = unit_table[found.group('unit')]
    return int(float(normalized) * multiplier)
|
||
|
||
|
||
def parse_filesize(s):
|
||
if s is None:
|
||
return None
|
||
|
||
# The lower-case forms are of course incorrect and unofficial,
|
||
# but we support those too
|
||
_UNIT_TABLE = {
|
||
'B': 1,
|
||
'b': 1,
|
||
'bytes': 1,
|
||
'KiB': 1024,
|
||