Skip to content

Commit

Permalink
Fix: Audio streaming hangs suddenly at the end of a track (#1872)
Browse files Browse the repository at this point in the history
  • Loading branch information
marcelveldt authored Jan 14, 2025
1 parent e0d8479 commit c5cc6c5
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 65 deletions.
2 changes: 0 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@ ADD dist dist
COPY requirements_all.txt .
RUN uv pip install \
--no-cache \
--find-links "https://wheels.home-assistant.io/musllinux/" \
-r requirements_all.txt

# Install Music Assistant from prebuilt wheel
RUN uv pip install \
--no-cache \
--find-links "https://wheels.home-assistant.io/musllinux/" \
"music-assistant@dist/music_assistant-${MASS_VERSION}-py3-none-any.whl"

# Set some labels
Expand Down
47 changes: 28 additions & 19 deletions Dockerfile.base
Original file line number Diff line number Diff line change
@@ -1,44 +1,54 @@
# syntax=docker/dockerfile:1

# BASE docker image for music assistant container

FROM python:3.12-alpine3.20
# Based on Debian Trixie (unstable) because we need a newer version of ffmpeg (and snapcast)
# TODO: Switch back to regular python stable debian image + manually build ffmpeg and snapcast
FROM debian:trixie

ARG TARGETPLATFORM


RUN set -x \
&& apk add --no-cache \
&& apt-get update \
&& apt-get install -y --no-install-recommends \
ca-certificates \
jemalloc \
curl \
git \
wget \
tzdata \
python3 \
python3-venv \
python3-pip \
libsox-fmt-all \
libsox3 \
ffmpeg \
sox \
openssl \
cifs-utils \
# install ffmpeg from community repo
&& apk add --no-cache ffmpeg --repository=https://dl-cdn.alpinelinux.org/alpine/v3.20/community \
# install snapcast from community repo
&& apk add --no-cache snapcast --repository=https://dl-cdn.alpinelinux.org/alpine/edge/community \
# install libnfs from community repo
&& apk add --no-cache libnfs --repository=https://dl-cdn.alpinelinux.org/alpine/v3.20/community \
# install openssl-dev (needed for airplay)
&& apk add --no-cache openssl-dev
libnfs-utils \
libjemalloc2 \
snapserver \
# cleanup
&& rm -rf /tmp/* \
&& rm -rf /var/lib/apt/lists/*


# Copy widevine client files to container
RUN mkdir -p /usr/local/bin/widevine_cdm
COPY widevine_cdm/* /usr/local/bin/widevine_cdm/

WORKDIR /app

# Configure runtime environment variables
ENV LD_PRELOAD="/usr/lib/libjemalloc.so.2"
ENV VIRTUAL_ENV=/app/venv
# Enable jemalloc
RUN \
export LD_PRELOAD="$(find /usr/lib/ -name *libjemalloc.so.2)" \
export MALLOC_CONF="background_thread:true,metadata_thp:auto,dirty_decay_ms:20000,muzzy_decay_ms:20000"

# create python venv
RUN python3 -m venv $VIRTUAL_ENV && \
source $VIRTUAL_ENV/bin/activate && \
pip install --upgrade pip \
ENV VIRTUAL_ENV=/app/venv
RUN python3 -m venv $VIRTUAL_ENV
ENV PATH="$VIRTUAL_ENV/bin:$PATH"
RUN pip install --upgrade pip \
&& pip install uv==0.4.17

# we need to set (very permissive) permissions to the workdir
Expand All @@ -48,7 +58,6 @@ RUN python3 -m venv $VIRTUAL_ENV && \
RUN chmod -R 777 /app \
&& chmod -R 777 /tmp

ENV PATH="$VIRTUAL_ENV/bin:$PATH"
WORKDIR $VIRTUAL_ENV

LABEL \
Expand Down
53 changes: 36 additions & 17 deletions music_assistant/helpers/audio.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
from .ffmpeg import FFMpeg, get_ffmpeg_stream
from .playlists import IsHLSPlaylist, PlaylistItem, fetch_playlist, parse_m3u
from .process import AsyncProcess, check_output, communicate
from .util import create_tempfile, detect_charset
from .util import TimedAsyncGenerator, create_tempfile, detect_charset

if TYPE_CHECKING:
from music_assistant_models.config_entries import CoreConfig, PlayerConfig
Expand Down Expand Up @@ -124,7 +124,8 @@ async def crossfade_pcm_parts(
return crossfaded_audio
# no crossfade_data, return original data instead
LOGGER.debug(
"crossfade of pcm chunks failed: not enough data? " "fade_in_part: %s - fade_out_part: %s",
"crossfade of pcm chunks failed: not enough data? "
"- fade_in_part: %s - fade_out_part: %s",
len(fade_in_part),
len(fade_out_part),
)
Expand Down Expand Up @@ -287,7 +288,7 @@ async def get_media_stream(
) -> AsyncGenerator[bytes, None]:
"""Get PCM audio stream for given media details."""
logger = LOGGER.getChild("media_stream")
logger.debug("start media stream for: %s", streamdetails.uri)
logger.log(VERBOSE_LOG_LEVEL, "Starting media stream for %s", streamdetails.uri)
strip_silence_begin = streamdetails.strip_silence_begin
strip_silence_end = streamdetails.strip_silence_end
if streamdetails.fade_in:
Expand All @@ -297,18 +298,32 @@ async def get_media_stream(
chunk_number = 0
buffer: bytes = b""
finished = False

ffmpeg_proc = FFMpeg(
audio_input=audio_source,
input_format=streamdetails.audio_format,
output_format=pcm_format,
filter_params=filter_params,
extra_input_args=extra_input_args,
collect_log_history=True,
loglevel="debug" if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else "info",
)
try:
await ffmpeg_proc.start()
async for chunk in ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size):
logger.debug(
"Started media stream for %s"
" - using streamtype: %s "
" - volume normalization: %s"
" - pcm format: %s"
" - ffmpeg PID: %s",
streamdetails.uri,
streamdetails.stream_type,
streamdetails.volume_normalization_mode,
pcm_format.content_type.value,
ffmpeg_proc.proc.pid,
)
async for chunk in TimedAsyncGenerator(
ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size), timeout=30
):
# for radio streams we just yield all chunks directly
if streamdetails.media_type == MediaType.RADIO:
yield chunk
Expand Down Expand Up @@ -349,6 +364,7 @@ async def get_media_stream(
buffer = buffer[pcm_format.pcm_sample_size :]

# end of audio/track reached
logger.log(VERBOSE_LOG_LEVEL, "End of stream reached.")
if strip_silence_end and buffer:
# strip silence from end of audio
buffer = await strip_silence(
Expand All @@ -364,6 +380,7 @@ async def get_media_stream(
finished = True

finally:
logger.log(VERBOSE_LOG_LEVEL, "Closing ffmpeg...")
await ffmpeg_proc.close()

if bytes_sent == 0:
Expand All @@ -387,15 +404,17 @@ async def get_media_stream(
if finished and not streamdetails.seek_position and seconds_streamed:
streamdetails.duration = seconds_streamed

# parse loudnorm data if we have that collected
# parse loudnorm data if we have that collected (and enabled)
if (
streamdetails.loudness is None
and streamdetails.volume_normalization_mode != VolumeNormalizationMode.DISABLED
(streamdetails.loudness is None or finished)
and streamdetails.volume_normalization_mode
in (VolumeNormalizationMode.DYNAMIC, VolumeNormalizationMode.FALLBACK_DYNAMIC)
and (finished or (seconds_streamed >= 300))
):
# if dynamic volume normalization is enabled and the entire track is streamed
# the loudnorm filter will output the measurement in the log,
# so we can use those directly instead of analyzing the audio
logger.log(VERBOSE_LOG_LEVEL, "Collecting loudness measurement...")
if loudness_details := parse_loudnorm(" ".join(ffmpeg_proc.log_history)):
logger.debug(
"Loudness measurement for %s: %s dB",
Expand Down Expand Up @@ -882,16 +901,16 @@ def get_player_filter_params(
def parse_loudnorm(raw_stderr: bytes | str) -> float | None:
"""Parse Loudness measurement from ffmpeg stderr output."""
stderr_data = raw_stderr.decode() if isinstance(raw_stderr, bytes) else raw_stderr
if "[Parsed_loudnorm_" not in stderr_data:
if "[Parsed_loudnorm_0 @" not in stderr_data:
return None
stderr_data = stderr_data.split("[Parsed_loudnorm_")[1]
stderr_data = "{" + stderr_data.rsplit("{")[-1].strip()
stderr_data = stderr_data.rsplit("}")[0].strip() + "}"
try:
loudness_data = json_loads(stderr_data)
except JSON_DECODE_EXCEPTIONS:
return None
return float(loudness_data["input_i"])
for jsun_chunk in stderr_data.split(" { "):
try:
stderr_data = "{" + jsun_chunk.rsplit("}")[0].strip() + "}"
loudness_data = json_loads(stderr_data)
return float(loudness_data["input_i"])
except (*JSON_DECODE_EXCEPTIONS, KeyError, ValueError, IndexError):
continue
return None


async def analyze_loudness(
Expand Down
47 changes: 20 additions & 27 deletions music_assistant/helpers/ffmpeg.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import asyncio
import logging
import time
from collections import deque
from collections.abc import AsyncGenerator
from typing import TYPE_CHECKING
Expand All @@ -15,7 +16,7 @@
from music_assistant.constants import VERBOSE_LOG_LEVEL

from .process import AsyncProcess
from .util import close_async_generator
from .util import TimedAsyncGenerator, close_async_generator

if TYPE_CHECKING:
from music_assistant_models.media_items import AudioFormat
Expand All @@ -37,6 +38,7 @@ def __init__(
extra_input_args: list[str] | None = None,
audio_output: str | int = "-",
collect_log_history: bool = False,
loglevel: str = "error",
) -> None:
"""Initialize AsyncProcess."""
ffmpeg_args = get_ffmpeg_args(
Expand All @@ -47,7 +49,7 @@ def __init__(
input_path=audio_input if isinstance(audio_input, str) else "-",
output_path=audio_output if isinstance(audio_output, str) else "-",
extra_input_args=extra_input_args or [],
loglevel="info",
loglevel=loglevel,
)
self.audio_input = audio_input
self.input_format = input_format
Expand Down Expand Up @@ -127,36 +129,27 @@ async def _feed_stdin(self) -> None:
if TYPE_CHECKING:
self.audio_input: AsyncGenerator[bytes, None]
generator_exhausted = False
audio_received = asyncio.Event()

async def stdin_watchdog() -> None:
# this is a simple watchdog to ensure we don't get stuck forever waiting for audio data
try:
await asyncio.wait_for(audio_received.wait(), timeout=30)
except TimeoutError:
self.logger.error("No audio data received from source after timeout")
self._stdin_task.cancel()

asyncio.create_task(stdin_watchdog())

cancelled = False
try:
async for chunk in self.audio_input:
if not audio_received.is_set():
audio_received.set()
start = time.time()
self.logger.log(VERBOSE_LOG_LEVEL, "Start reading audio data from source...")
async for chunk in TimedAsyncGenerator(self.audio_input, timeout=30):
await self.write(chunk)
self.logger.log(
VERBOSE_LOG_LEVEL, "Audio data source exhausted in %.2fs", time.time() - start
)
generator_exhausted = True
if not audio_received.is_set():
raise AudioError("No audio data received from source")
except Exception as err:
if isinstance(err, asyncio.CancelledError):
return
self.logger.error(
"Stream error: %s",
str(err) or err.__class__.__name__,
exc_info=err if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else None,
)
cancelled = isinstance(err, asyncio.CancelledError)
if not cancelled:
self.logger.error(
"Stream error: %s",
str(err) or err.__class__.__name__,
exc_info=err if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else None,
)
finally:
await self.write_eof()
if not cancelled:
await self.write_eof()
# we need to ensure that we close the async generator
# if we get cancelled otherwise it keeps lingering forever
if not generator_exhausted:
Expand Down

0 comments on commit c5cc6c5

Please sign in to comment.