Finish logging, add timeouts
marcelveldt committed Jan 14, 2025
1 parent 323c41a commit d8eca32
Showing 2 changed files with 45 additions and 57 deletions.
58 changes: 29 additions & 29 deletions music_assistant/helpers/audio.py
@@ -46,7 +46,7 @@
from .ffmpeg import FFMpeg, get_ffmpeg_stream
from .playlists import IsHLSPlaylist, PlaylistItem, fetch_playlist, parse_m3u
from .process import AsyncProcess, check_output, communicate
from .util import create_tempfile, detect_charset
from .util import TimedAsyncGenerator, create_tempfile, detect_charset

if TYPE_CHECKING:
from music_assistant_models.config_entries import CoreConfig, PlayerConfig
@@ -124,7 +124,8 @@ async def crossfade_pcm_parts(
return crossfaded_audio
# no crossfade_data, return original data instead
LOGGER.debug(
"crossfade of pcm chunks failed: not enough data? " "fade_in_part: %s - fade_out_part: %s",
"crossfade of pcm chunks failed: not enough data? "
"- fade_in_part: %s - fade_out_part: %s",
len(fade_in_part),
len(fade_out_part),
)
@@ -286,7 +287,8 @@ async def get_media_stream(
extra_input_args: list[str] | None = None,
) -> AsyncGenerator[bytes, None]:
"""Get PCM audio stream for given media details."""
LOGGER.debug("Starting media stream for %s", streamdetails.uri)
logger = LOGGER.getChild("media_stream")
logger.log(VERBOSE_LOG_LEVEL, "Starting media stream for %s", streamdetails.uri)
strip_silence_begin = streamdetails.strip_silence_begin
strip_silence_end = streamdetails.strip_silence_end
if streamdetails.fade_in:
@@ -296,7 +298,6 @@
chunk_number = 0
buffer: bytes = b""
finished = False

ffmpeg_proc = FFMpeg(
audio_input=audio_source,
input_format=streamdetails.audio_format,
@@ -308,23 +309,21 @@
)
try:
await ffmpeg_proc.start()
logger = LOGGER.getChild(f"media_stream_{ffmpeg_proc.proc.pid}")
logger.log(
VERBOSE_LOG_LEVEL,
"Started media stream for %s \n"
"- using streamtype: %s\n "
"- using volume normalization: %s\n"
"- using pcm format: %s\n"
"- using filter params: %s\n"
"- using extra input args: %s",
logger.debug(
"Started media stream for %s"
" - using streamtype: %s "
" - volume normalization: %s"
" - pcm format: %s"
" - ffmpeg PID: %s",
streamdetails.uri,
streamdetails.stream_type,
streamdetails.volume_normalization_mode,
pcm_format.output_format_str,
str(filter_params),
str(extra_input_args),
pcm_format.content_type.value,
ffmpeg_proc.proc.pid,
)
async for chunk in ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size):
async for chunk in TimedAsyncGenerator(
ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size), timeout=30
):
# for radio streams we just yield all chunks directly
if streamdetails.media_type == MediaType.RADIO:
yield chunk
@@ -405,10 +404,11 @@ async def get_media_stream(
if finished and not streamdetails.seek_position and seconds_streamed:
streamdetails.duration = seconds_streamed

# parse loudnorm data if we have that collected
# parse loudnorm data if we have that collected (and enabled)
if (
streamdetails.loudness is None
and streamdetails.volume_normalization_mode != VolumeNormalizationMode.DISABLED
(streamdetails.loudness is None or finished)
and streamdetails.volume_normalization_mode
in (VolumeNormalizationMode.DYNAMIC, VolumeNormalizationMode.FALLBACK_DYNAMIC)
and (finished or (seconds_streamed >= 300))
):
# if dynamic volume normalization is enabled and the entire track is streamed
@@ -901,16 +901,16 @@ def get_player_filter_params(
def parse_loudnorm(raw_stderr: bytes | str) -> float | None:
"""Parse Loudness measurement from ffmpeg stderr output."""
stderr_data = raw_stderr.decode() if isinstance(raw_stderr, bytes) else raw_stderr
if "[Parsed_loudnorm_" not in stderr_data:
if "[Parsed_loudnorm_0 @" not in stderr_data:
return None
stderr_data = stderr_data.split("[Parsed_loudnorm_")[1]
stderr_data = "{" + stderr_data.rsplit("{")[-1].strip()
stderr_data = stderr_data.rsplit("}")[0].strip() + "}"
try:
loudness_data = json_loads(stderr_data)
except JSON_DECODE_EXCEPTIONS:
return None
return float(loudness_data["input_i"])
for jsun_chunk in stderr_data.split(" { "):
try:
stderr_data = "{" + jsun_chunk.rsplit("}")[0].strip() + "}"
loudness_data = json_loads(stderr_data)
return float(loudness_data["input_i"])
except (*JSON_DECODE_EXCEPTIONS, KeyError, ValueError, IndexError):
continue
return None


async def analyze_loudness(
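The timeout handling in both files goes through TimedAsyncGenerator, newly imported from .util. Its implementation is not part of this commit, so the snippet below is only a minimal sketch of what such a wrapper might look like, assuming it simply bounds each iteration of the wrapped generator with asyncio.wait_for; everything beyond the TimedAsyncGenerator(generator, timeout=...) call shape visible in the diff is an assumption.

# Hypothetical sketch of the TimedAsyncGenerator helper; the real implementation
# lives in music_assistant/helpers/util.py and is not shown in this diff.
import asyncio
from collections.abc import AsyncGenerator
from typing import Any


class TimedAsyncGenerator:
    """Wrap an async generator so each next item must arrive within a timeout."""

    def __init__(self, wrapped: AsyncGenerator[Any, None], timeout: float) -> None:
        self._wrapped = wrapped
        self._timeout = timeout

    def __aiter__(self) -> "TimedAsyncGenerator":
        return self

    async def __anext__(self) -> Any:
        # asyncio.wait_for raises TimeoutError if the wrapped generator stalls
        # longer than the configured timeout; StopAsyncIteration passes through
        # unchanged and ends the async for loop normally.
        return await asyncio.wait_for(self._wrapped.__anext__(), self._timeout)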
44 changes: 16 additions & 28 deletions music_assistant/helpers/ffmpeg.py
@@ -16,7 +16,7 @@
from music_assistant.constants import VERBOSE_LOG_LEVEL

from .process import AsyncProcess
from .util import close_async_generator
from .util import TimedAsyncGenerator, close_async_generator

if TYPE_CHECKING:
from music_assistant_models.media_items import AudioFormat
@@ -129,39 +129,27 @@ async def _feed_stdin(self) -> None:
if TYPE_CHECKING:
self.audio_input: AsyncGenerator[bytes, None]
generator_exhausted = False
audio_received = asyncio.Event()

async def stdin_watchdog() -> None:
# this is a simple watchdog to ensure we don't get stuck forever waiting for audio data
try:
await asyncio.wait_for(audio_received.wait(), timeout=30)
except TimeoutError:
self.logger.error("No audio data received from source after timeout")
self._stdin_task.cancel()

asyncio.create_task(stdin_watchdog())

cancelled = False
try:
start = time.time()
self.logger.debug("Start reading audio data from source")
async for chunk in self.audio_input:
if not audio_received.is_set():
audio_received.set()
self.logger.log(VERBOSE_LOG_LEVEL, "Start reading audio data from source...")
async for chunk in TimedAsyncGenerator(self.audio_input, timeout=30):
await self.write(chunk)
self.logger.debug("Audio data source exhausted in %.2fs", time.time() - start)
self.logger.log(
VERBOSE_LOG_LEVEL, "Audio data source exhausted in %.2fs", time.time() - start
)
generator_exhausted = True
if not audio_received.is_set():
raise AudioError("No audio data received from source")
except Exception as err:
if isinstance(err, asyncio.CancelledError):
return
self.logger.error(
"Stream error: %s",
str(err) or err.__class__.__name__,
exc_info=err if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else None,
)
cancelled = isinstance(err, asyncio.CancelledError)
if not cancelled:
self.logger.error(
"Stream error: %s",
str(err) or err.__class__.__name__,
exc_info=err if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else None,
)
finally:
await self.write_eof()
if not cancelled:
await self.write_eof()
# we need to ensure that we close the async generator
# if we get cancelled otherwise it keeps lingering forever
if not generator_exhausted:
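With this change the manual stdin_watchdog task in _feed_stdin is gone: a stalled audio source now surfaces as a TimeoutError from the wrapped iteration itself. The short usage sketch below is hypothetical and reuses the TimedAsyncGenerator sketch above; only the 30-second timeout value comes from the diff.

# Hypothetical usage example: a source that stalls mid-stream is cut off by the
# per-iteration timeout instead of blocking the consumer forever.
import asyncio
from collections.abc import AsyncGenerator


async def stalling_source() -> AsyncGenerator[bytes, None]:
    yield b"first chunk"
    await asyncio.sleep(60)  # simulate an audio source that stops delivering data
    yield b"never delivered in time"


async def consume() -> None:
    try:
        async for chunk in TimedAsyncGenerator(stalling_source(), timeout=30):
            print(f"received {len(chunk)} bytes")
    except TimeoutError:  # asyncio.TimeoutError on Python < 3.11
        # roughly mirrors the stream error handling in _feed_stdin
        print("no audio data received from source within the timeout")


asyncio.run(consume())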
