diff --git a/music_assistant/helpers/audio.py b/music_assistant/helpers/audio.py index 583354b0b..3d82d92d0 100644 --- a/music_assistant/helpers/audio.py +++ b/music_assistant/helpers/audio.py @@ -46,7 +46,7 @@ from .ffmpeg import FFMpeg, get_ffmpeg_stream from .playlists import IsHLSPlaylist, PlaylistItem, fetch_playlist, parse_m3u from .process import AsyncProcess, check_output, communicate -from .util import create_tempfile, detect_charset +from .util import TimedAsyncGenerator, create_tempfile, detect_charset if TYPE_CHECKING: from music_assistant_models.config_entries import CoreConfig, PlayerConfig @@ -124,7 +124,8 @@ async def crossfade_pcm_parts( return crossfaded_audio # no crossfade_data, return original data instead LOGGER.debug( - "crossfade of pcm chunks failed: not enough data? " "fade_in_part: %s - fade_out_part: %s", + "crossfade of pcm chunks failed: not enough data? " + "- fade_in_part: %s - fade_out_part: %s", len(fade_in_part), len(fade_out_part), ) @@ -286,7 +287,8 @@ async def get_media_stream( extra_input_args: list[str] | None = None, ) -> AsyncGenerator[bytes, None]: """Get PCM audio stream for given media details.""" - LOGGER.debug("Starting media stream for %s", streamdetails.uri) + logger = LOGGER.getChild("media_stream") + logger.log(VERBOSE_LOG_LEVEL, "Starting media stream for %s", streamdetails.uri) strip_silence_begin = streamdetails.strip_silence_begin strip_silence_end = streamdetails.strip_silence_end if streamdetails.fade_in: @@ -296,7 +298,6 @@ async def get_media_stream( chunk_number = 0 buffer: bytes = b"" finished = False - ffmpeg_proc = FFMpeg( audio_input=audio_source, input_format=streamdetails.audio_format, @@ -308,23 +309,21 @@ async def get_media_stream( ) try: await ffmpeg_proc.start() - logger = LOGGER.getChild(f"media_stream_{ffmpeg_proc.proc.pid}") - logger.log( - VERBOSE_LOG_LEVEL, - "Started media stream for %s \n" - "- using streamtype: %s\n " - "- using volume normalization: %s\n" - "- using pcm format: %s\n" - "- using filter params: %s\n" - "- using extra input args: %s", + logger.debug( + "Started media stream for %s" + " - using streamtype: %s " + " - volume normalization: %s" + " - pcm format: %s" + " - ffmpeg PID: %s", streamdetails.uri, streamdetails.stream_type, streamdetails.volume_normalization_mode, - pcm_format.output_format_str, - str(filter_params), - str(extra_input_args), + pcm_format.content_type.value, + ffmpeg_proc.proc.pid, ) - async for chunk in ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size): + async for chunk in TimedAsyncGenerator( + ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size), timeout=30 + ): # for radio streams we just yield all chunks directly if streamdetails.media_type == MediaType.RADIO: yield chunk @@ -405,10 +404,11 @@ async def get_media_stream( if finished and not streamdetails.seek_position and seconds_streamed: streamdetails.duration = seconds_streamed - # parse loudnorm data if we have that collected + # parse loudnorm data if we have that collected (and enabled) if ( - streamdetails.loudness is None - and streamdetails.volume_normalization_mode != VolumeNormalizationMode.DISABLED + (streamdetails.loudness is None or finished) + and streamdetails.volume_normalization_mode + in (VolumeNormalizationMode.DYNAMIC, VolumeNormalizationMode.FALLBACK_DYNAMIC) and (finished or (seconds_streamed >= 300)) ): # if dynamic volume normalization is enabled and the entire track is streamed @@ -901,16 +901,16 @@ def get_player_filter_params( def parse_loudnorm(raw_stderr: bytes | str) -> float | None: """Parse Loudness measurement from ffmpeg stderr output.""" stderr_data = raw_stderr.decode() if isinstance(raw_stderr, bytes) else raw_stderr - if "[Parsed_loudnorm_" not in stderr_data: + if "[Parsed_loudnorm_0 @" not in stderr_data: return None - stderr_data = stderr_data.split("[Parsed_loudnorm_")[1] - stderr_data = "{" + stderr_data.rsplit("{")[-1].strip() - stderr_data = stderr_data.rsplit("}")[0].strip() + "}" - try: - loudness_data = json_loads(stderr_data) - except JSON_DECODE_EXCEPTIONS: - return None - return float(loudness_data["input_i"]) + for jsun_chunk in stderr_data.split(" { "): + try: + stderr_data = "{" + jsun_chunk.rsplit("}")[0].strip() + "}" + loudness_data = json_loads(stderr_data) + return float(loudness_data["input_i"]) + except (*JSON_DECODE_EXCEPTIONS, KeyError, ValueError, IndexError): + continue + return None async def analyze_loudness( diff --git a/music_assistant/helpers/ffmpeg.py b/music_assistant/helpers/ffmpeg.py index 67a8a15b7..99e3d8970 100644 --- a/music_assistant/helpers/ffmpeg.py +++ b/music_assistant/helpers/ffmpeg.py @@ -16,7 +16,7 @@ from music_assistant.constants import VERBOSE_LOG_LEVEL from .process import AsyncProcess -from .util import close_async_generator +from .util import TimedAsyncGenerator, close_async_generator if TYPE_CHECKING: from music_assistant_models.media_items import AudioFormat @@ -129,39 +129,27 @@ async def _feed_stdin(self) -> None: if TYPE_CHECKING: self.audio_input: AsyncGenerator[bytes, None] generator_exhausted = False - audio_received = asyncio.Event() - - async def stdin_watchdog() -> None: - # this is a simple watchdog to ensure we don't get stuck forever waiting for audio data - try: - await asyncio.wait_for(audio_received.wait(), timeout=30) - except TimeoutError: - self.logger.error("No audio data received from source after timeout") - self._stdin_task.cancel() - - asyncio.create_task(stdin_watchdog()) - + cancelled = False try: start = time.time() - self.logger.debug("Start reading audio data from source") - async for chunk in self.audio_input: - if not audio_received.is_set(): - audio_received.set() + self.logger.log(VERBOSE_LOG_LEVEL, "Start reading audio data from source...") + async for chunk in TimedAsyncGenerator(self.audio_input, timeout=30): await self.write(chunk) - self.logger.debug("Audio data source exhausted in %.2fs", time.time() - start) + self.logger.log( + VERBOSE_LOG_LEVEL, "Audio data source exhausted in %.2fs", time.time() - start + ) generator_exhausted = True - if not audio_received.is_set(): - raise AudioError("No audio data received from source") except Exception as err: - if isinstance(err, asyncio.CancelledError): - return - self.logger.error( - "Stream error: %s", - str(err) or err.__class__.__name__, - exc_info=err if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else None, - ) + cancelled = isinstance(err, asyncio.CancelledError) + if not cancelled: + self.logger.error( + "Stream error: %s", + str(err) or err.__class__.__name__, + exc_info=err if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else None, + ) finally: - await self.write_eof() + if not cancelled: + await self.write_eof() # we need to ensure that we close the async generator # if we get cancelled otherwise it keeps lingering forever if not generator_exhausted: