Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose an AsyncReadOnlySource #256

Merged
merged 1 commit into from
Sep 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 3 additions & 55 deletions src/input/sources/hls.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,14 @@
use std::{
io::{ErrorKind as IoErrorKind, Result as IoResult, SeekFrom},
pin::Pin,
task::{Context, Poll},
};

use async_trait::async_trait;
use bytes::Bytes;
use futures::StreamExt;
use pin_project::pin_project;
use reqwest::{header::HeaderMap, Client};
use stream_lib::Event;
use symphonia_core::io::MediaSource;
use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
use tokio_util::io::StreamReader;

use crate::input::{
AsyncAdapterStream,
AsyncMediaSource,
AsyncReadOnlySource,
AudioStream,
AudioStreamError,
Compose,
Expand Down Expand Up @@ -51,7 +43,7 @@ impl HlsRequest {
}
}

fn create_stream(&mut self) -> Result<HlsStream, AudioStreamError> {
fn create_stream(&mut self) -> Result<AsyncReadOnlySource, AudioStreamError> {
let request = self
.client
.get(&self.request)
Expand All @@ -70,51 +62,7 @@ impl HlsRequest {
)),
})));

Ok(HlsStream { stream })
}
}

#[pin_project]
struct HlsStream {
#[pin]
stream: Box<dyn AsyncRead + Send + Sync + Unpin>,
}

impl AsyncRead for HlsStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<IoResult<()>> {
AsyncRead::poll_read(self.project().stream, cx, buf)
}
}

impl AsyncSeek for HlsStream {
fn start_seek(self: Pin<&mut Self>, _position: SeekFrom) -> IoResult<()> {
Err(IoErrorKind::Unsupported.into())
}

fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<IoResult<u64>> {
unreachable!()
}
}

#[async_trait]
impl AsyncMediaSource for HlsStream {
fn is_seekable(&self) -> bool {
false
}

async fn byte_len(&self) -> Option<u64> {
None
}

async fn try_resume(
&mut self,
_offset: u64,
) -> Result<Box<dyn AsyncMediaSource>, AudioStreamError> {
Err(AudioStreamError::Unsupported)
Ok(AsyncReadOnlySource { stream })
}
}

Expand Down
81 changes: 81 additions & 0 deletions src/input/sources/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,84 @@ mod http;
mod ytdl;

pub use self::{file::*, hls::*, http::*, ytdl::*};

use std::{
io::{ErrorKind as IoErrorKind, Result as IoResult, SeekFrom},
pin::Pin,
task::{Context, Poll},
};

use async_trait::async_trait;
use pin_project::pin_project;
use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};

use crate::input::{AsyncMediaSource, AudioStreamError};

/// `AsyncReadOnlySource` wraps any source implementing [`tokio::io::AsyncRead`] in an unseekable
/// [`symphonia_core::io::MediaSource`], similar to [`symphonia_core::io::ReadOnlySource`]
#[pin_project]
pub struct AsyncReadOnlySource {
#[pin]
stream: Box<dyn AsyncRead + Send + Sync + Unpin>,
}

impl AsyncReadOnlySource {
/// Instantiates a new `AsyncReadOnlySource` by taking ownership and wrapping the provided
/// `Read`er.
pub fn new<R>(inner: R) -> Self
where
R: AsyncRead + Send + Sync + Unpin + 'static,
{
AsyncReadOnlySource {
stream: Box::new(inner),
}
}

/// Gets a reference to the underlying reader.
pub fn get_ref(&self) -> &Box<dyn AsyncRead + Send + Sync + Unpin> {
&self.stream
}

/// Unwraps this `AsyncReadOnlySource`, returning the underlying reader.
pub fn into_inner<R>(self) -> Box<dyn AsyncRead + Send + Sync + Unpin> {
self.stream.into()
}
}

impl AsyncRead for AsyncReadOnlySource {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<IoResult<()>> {
AsyncRead::poll_read(self.project().stream, cx, buf)
}
}

impl AsyncSeek for AsyncReadOnlySource {
fn start_seek(self: Pin<&mut Self>, _position: SeekFrom) -> IoResult<()> {
Err(IoErrorKind::Unsupported.into())
}

fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<IoResult<u64>> {
unreachable!()
}
}

#[async_trait]
impl AsyncMediaSource for AsyncReadOnlySource {
fn is_seekable(&self) -> bool {
false
}

async fn byte_len(&self) -> Option<u64> {
None
}

async fn try_resume(
&mut self,
_offset: u64,
) -> Result<Box<dyn AsyncMediaSource>, AudioStreamError> {
Err(AudioStreamError::Unsupported)
}
}
Loading