stream.hls: refactor segment byterange calculation
- Move "Range" header value calculation to separate class:
  Don't cache offsets by segment URL, and instead refer to the previous
  sequence. Initialization maps always require an existing offset value.
- Refactor HLSStreamWriter.fetch and related methods
- Add byterange tests
bastimeyer authored and back-to committed Jan 22, 2022
1 parent 35a91ea commit 8153a6b
Showing 2 changed files with 158 additions and 27 deletions.
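
For reference: an HLS EXT-X-BYTERANGE value of the form n[@o] requests the next n bytes of a resource, starting at offset o; when @o is omitted, the sub-range starts directly after the previous sub-range. The following is a minimal, self-contained sketch of the Range-header arithmetic this commit centralizes in ByteRangeOffset (the helper name and signature are hypothetical, not part of the commit):

    from typing import Optional, Tuple

    def range_header(size: int, offset: Optional[int], prev_end: Optional[int]) -> Tuple[str, int]:
        """Build a "Range" header value and return it with the inclusive end offset."""
        if offset is not None:
            start = offset
        elif prev_end is not None:
            start = prev_end + 1  # continue right after the previous sub-range
        else:
            raise ValueError("Missing BYTERANGE offset")
        end = start + max(size - 1, 0)  # inclusive end, mirroring ByteRangeOffset._calc_end
        return f"bytes={start}-{end}", end

    # "5@3" -> bytes=3-7; a following "7" without an offset -> bytes=8-14
    assert range_header(5, 3, None) == ("bytes=3-7", 7)
    assert range_header(7, None, 7) == ("bytes=8-14", 14)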
81 changes: 56 additions & 25 deletions src/streamlink/stream/hls.py
@@ -1,10 +1,10 @@
 import logging
 import re
 import struct
-from collections import OrderedDict, defaultdict
+from collections import OrderedDict
 from concurrent.futures import Future
 from threading import Event
-from typing import List, NamedTuple, Optional, Union
+from typing import List, NamedTuple, Optional, Tuple, Union
 from urllib.parse import urlparse
 
 # noinspection PyPackageRequirements
@@ -16,7 +16,7 @@
 
 from streamlink.exceptions import StreamError
 from streamlink.stream.ffmpegmux import FFMPEGMuxer, MuxedStream
-from streamlink.stream.hls_playlist import Key, M3U8, Map, Segment, load as load_hls_playlist
+from streamlink.stream.hls_playlist import ByteRange, Key, M3U8, Map, Segment, load as load_hls_playlist
 from streamlink.stream.http import HTTPStream
 from streamlink.stream.segmented import SegmentedStreamReader, SegmentedStreamWorker, SegmentedStreamWriter
 from streamlink.utils.cache import LRUCache
@@ -30,15 +30,45 @@ class Sequence(NamedTuple):
     segment: Segment
 
 
-class HLSStreamWriter(SegmentedStreamWriter):
+class ByteRangeOffset:
+    sequence: Sequence.num = None
+    offset: int = None
+
+    @staticmethod
+    def _calc_end(start: int, size: ByteRange.range):
+        return start + max(size - 1, 0)
+
+    def cached(self, sequence: Sequence.num, byterange: ByteRange) -> Tuple[int, int]:
+        if byterange.offset is not None:
+            bytes_start = byterange.offset
+        elif self.offset is not None and self.sequence == sequence - 1:
+            bytes_start = self.offset
+        else:
+            raise StreamError("Missing BYTERANGE offset")
+
+        bytes_end = self._calc_end(bytes_start, byterange.range)
+
+        self.sequence = sequence
+        self.offset = bytes_end + 1
+
+        return bytes_start, bytes_end
+
+    def uncached(self, byterange: ByteRange) -> Tuple[int, int]:
+        bytes_start = byterange.offset
+        if bytes_start is None:
+            raise StreamError("Missing BYTERANGE offset")
+
+        return bytes_start, self._calc_end(bytes_start, byterange.range)
+
+
+class HLSStreamWriter(SegmentedStreamWriter):
     WRITE_CHUNK_SIZE = 8192
 
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
         options = self.session.options
 
-        self.byterange_offsets = defaultdict(int)
+        self.byterange: ByteRangeOffset = ByteRangeOffset()
         self.map_cache: LRUCache[Sequence.segment.map.uri, Future] = LRUCache(self.threads)
         self.key_data = None
         self.key_uri = None
@@ -90,19 +120,16 @@ def create_decryptor(self, key: Key, num: int) -> AES:
 
         return AES.new(self.key_data, AES.MODE_CBC, iv)
 
-    def create_request_params(self, segment: Union[Segment, Map]):
+    def create_request_params(self, num: Sequence.num, segment: Union[Segment, Map], is_map: bool):
         request_params = dict(self.reader.request_params)
         headers = request_params.pop("headers", {})
 
         if segment.byterange:
-            bytes_start = self.byterange_offsets[segment.uri]
-            if segment.byterange.offset is not None:
-                bytes_start = segment.byterange.offset
-
-            bytes_len = max(segment.byterange.range - 1, 0)
-            bytes_end = bytes_start + bytes_len
+            if is_map:
+                bytes_start, bytes_end = self.byterange.uncached(segment.byterange)
+            else:
+                bytes_start, bytes_end = self.byterange.cached(num, segment.byterange)
             headers["Range"] = f"bytes={bytes_start}-{bytes_end}"
-            self.byterange_offsets[segment.uri] = bytes_end + 1
 
         request_params["headers"] = headers
 
@@ -131,28 +158,33 @@ def put(self, sequence: Sequence):
 
     def fetch(self, sequence: Sequence) -> Optional[Response]:
         try:
-            return self._fetch(sequence.segment, self.stream_data and not sequence.segment.key)
-        except StreamError as err:  # pragma: no cover
+            return self._fetch(
+                sequence.segment.uri,
+                stream=self.stream_data and not sequence.segment.key,
+                **self.create_request_params(sequence.num, sequence.segment, False)
+            )
+        except StreamError as err:
             log.error(f"Failed to fetch segment {sequence.num}: {err}")
 
     def fetch_map(self, sequence: Sequence) -> Optional[Response]:
         try:
-            return self._fetch(sequence.segment.map, False)
-        except StreamError as err:  # pragma: no cover
+            return self._fetch(
+                sequence.segment.map.uri,
+                stream=False,
+                **self.create_request_params(sequence.num, sequence.segment.map, True)
+            )
+        except StreamError as err:
             log.error(f"Failed to fetch map for segment {sequence.num}: {err}")
 
-    def _fetch(self, segment: Union[Segment, Map], stream: bool) -> Optional[Response]:
+    def _fetch(self, url: str, **request_params) -> Optional[Response]:
         if self.closed or not self.retries:  # pragma: no cover
             return
 
-        request_params = self.create_request_params(segment)
-
         return self.session.http.get(
-            segment.uri,
-            stream=stream,
+            url,
             timeout=self.timeout,
-            exception=StreamError,
             retries=self.retries,
+            exception=StreamError,
             **request_params
         )
 
@@ -181,8 +213,7 @@ def _write_discard(self, sequence: Sequence, res: Response, is_map: bool):
         try:
             for _ in res.iter_content(self.WRITE_CHUNK_SIZE):
                 pass
-        except (ChunkedEncodingError, ContentDecodingError,
-                ConnectionError, StreamConsumedError):
+        except (ChunkedEncodingError, ContentDecodingError, ConnectionError, StreamConsumedError):
            pass
 
     def _write(self, sequence: Sequence, res: Response, is_map: bool):
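
To make the new class's contract concrete, here is a usage sketch (not part of the commit; the ByteRange values are constructed by hand, assuming the namedtuple from streamlink.stream.hls_playlist with "range" and "offset" fields):

    bro = ByteRangeOffset()

    bro.cached(0, ByteRange(range=3, offset=0))      # -> (0, 2); caches offset 3 for sequence 1
    bro.cached(1, ByteRange(range=5, offset=None))   # -> (3, 7); contiguous with sequence 0
    bro.uncached(ByteRange(range=42, offset=1337))   # -> (1337, 1378); maps never touch the cache

    # A gap in the sequence numbers invalidates the cached offset:
    bro.cached(3, ByteRange(range=5, offset=None))   # raises StreamError("Missing BYTERANGE offset")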
104 changes: 102 additions & 2 deletions tests/stream/test_hls.py
@@ -25,10 +25,13 @@ def __init__(self, num, key, iv, *args, padding=b"", append=b"", **kwargs):
 
 
 class TagMap(Tag):
-    def __init__(self, num, namespace):
+    def __init__(self, num, namespace, attrs=None):
         self.path = f"map{num}"
         self.content = f"[map{num}]".encode("ascii")
-        super().__init__("EXT-X-MAP", {"URI": self.val_quoted_string(self.url(namespace))})
+        super().__init__("EXT-X-MAP", {
+            "URI": self.val_quoted_string(self.url(namespace)),
+            **(attrs or {})
+        })
 
 
 class TagMapEnc(EncryptedBase, TagMap):
@@ -146,6 +149,103 @@ def test_map(self):
         self.assertTrue(self.called(map2, once=True), "Downloads second map only once")
 
 
+@patch("streamlink.stream.hls.HLSStreamWorker.wait", Mock(return_value=True))
+class TestHLSStreamByterange(TestMixinStreamHLS, unittest.TestCase):
+    __stream__ = EventedHLSStream
+
+    # The dummy segments in the error tests are required because the writer's run loop would otherwise continue forever
+    # due to the segment's future result being None (no request result), and we can't await the end of the stream
+    # without waiting for the stream's timeout error. The dummy segments ensure that we can call await_write for these
+    # successful segments, so we can close the stream afterwards and safely make the test assertions.
+    # The EventedHLSStreamWriter could also implement await_fetch, but that would make it more complex than it already is.
+
@patch("streamlink.stream.hls.log")
def test_unknown_offset(self, mock_log: Mock):
thread, _ = self.subject([
Playlist(0, [
Tag("EXT-X-BYTERANGE", "3"), Segment(0),
Segment(1)
], end=True)
])

self.await_write(2 - 1)
self.thread.close()

self.assertEqual(mock_log.error.call_args_list, [
call("Failed to fetch segment 0: Missing BYTERANGE offset")
])
self.assertFalse(self.called(Segment(0)))

@patch("streamlink.stream.hls.log")
def test_unknown_offset_map(self, mock_log: Mock):
map1 = TagMap(1, self.id(), {"BYTERANGE": "\"1234\""})
self.mock("GET", self.url(map1), content=map1.content)
thread, _ = self.subject([
Playlist(0, [
Segment(0),
map1,
Segment(1)
], end=True)
])

self.await_write(3 - 1)
self.thread.close()

self.assertEqual(mock_log.error.call_args_list, [
call("Failed to fetch map for segment 1: Missing BYTERANGE offset")
])
self.assertFalse(self.called(map1))

@patch("streamlink.stream.hls.log")
def test_invalid_offset_reference(self, mock_log: Mock):
thread, _ = self.subject([
Playlist(0, [
Tag("EXT-X-BYTERANGE", "3@0"), Segment(0),
Segment(1),
Tag("EXT-X-BYTERANGE", "5"), Segment(2),
Segment(3)
], end=True)
])

self.await_write(4 - 1)
self.thread.close()

self.assertEqual(mock_log.error.call_args_list, [
call("Failed to fetch segment 2: Missing BYTERANGE offset")
])
self.assertEqual(self.mocks[self.url(Segment(0))].last_request._request.headers["Range"], "bytes=0-2")
self.assertFalse(self.called(Segment(2)))

def test_offsets(self):
map1 = TagMap(1, self.id(), {"BYTERANGE": "\"1234@0\""})
map2 = TagMap(2, self.id(), {"BYTERANGE": "\"42@1337\""})
self.mock("GET", self.url(map1), content=map1.content)
self.mock("GET", self.url(map2), content=map2.content)
s1, s2, s3, s4, s5 = Segment(0), Segment(1), Segment(2), Segment(3), Segment(4)

self.subject([
Playlist(0, [
map1,
Tag("EXT-X-BYTERANGE", "5@3"), s1,
Tag("EXT-X-BYTERANGE", "7"), s2,
map2,
Tag("EXT-X-BYTERANGE", "11"), s3,
Tag("EXT-X-BYTERANGE", "17@13"), s4,
Tag("EXT-X-BYTERANGE", "19"), s5,
], end=True)
])

self.await_write(5 * 2)
self.await_read(read_all=True)
self.assertEqual(self.mocks[self.url(map1)].last_request._request.headers["Range"], "bytes=0-1233")
self.assertEqual(self.mocks[self.url(map2)].last_request._request.headers["Range"], "bytes=1337-1378")
self.assertEqual(self.mocks[self.url(s1)].last_request._request.headers["Range"], "bytes=3-7")
self.assertEqual(self.mocks[self.url(s2)].last_request._request.headers["Range"], "bytes=8-14")
self.assertEqual(self.mocks[self.url(s3)].last_request._request.headers["Range"], "bytes=15-25")
self.assertEqual(self.mocks[self.url(s4)].last_request._request.headers["Range"], "bytes=13-29")
self.assertEqual(self.mocks[self.url(s5)].last_request._request.headers["Range"], "bytes=30-48")


@patch("streamlink.stream.hls.HLSStreamWorker.wait", Mock(return_value=True))
class TestHLSStreamEncrypted(TestMixinStreamHLS, unittest.TestCase):
__stream__ = EventedHLSStream
Expand Down
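
The expected Range headers in test_offsets follow directly from the cached/uncached rules: an explicit n@o range spans o through o+n-1, an offset-less range continues at the previous segment's end + 1, and map requests neither read nor advance the cached segment offset. Deriving them by hand from the playlist above:

    # map1 "1234@0"  (uncached): 0    .. 0+1234-1   -> bytes=0-1233
    # s1   "5@3"     (cached):   3    .. 3+5-1      -> bytes=3-7
    # s2   "7"       (cached):   7+1  .. 8+7-1      -> bytes=8-14
    # map2 "42@1337" (uncached): 1337 .. 1337+42-1  -> bytes=1337-1378
    # s3   "11"      (cached):   14+1 .. 15+11-1    -> bytes=15-25
    # s4   "17@13"   (cached):   13   .. 13+17-1    -> bytes=13-29
    # s5   "19"      (cached):   29+1 .. 30+19-1    -> bytes=30-48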
