Skip to content

Commit

Permalink
Add clippy.toml to disallow methods (#204)
Browse files Browse the repository at this point in the history
* add clippy.toml to disallow methods

* remove use of mem::zeroed in xcoder, where possible

* remove unbounded channels in macos/video-toolbox

* remove use of zeroed in xcoder-logan

* remove use of zeroed in srt crate

* bump ffmpeg test bound up even higher to reduce test flakiness

* Change compression session to use smaller channel and never block

* Change decompression session to use a mutex instead of a channel

* video-toolbox: restructure error to better handle when the inner channel is dropped

* Document channel drop behavior for fn frames
  • Loading branch information
Xaeroxe authored Aug 20, 2024
1 parent a5d72a6 commit 89c92f9
Show file tree
Hide file tree
Showing 14 changed files with 182 additions and 114 deletions.
18 changes: 18 additions & 0 deletions clippy.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
disallowed-methods = [
# These functions are very easy to misuse, please use `MaybeUninit` instead.
"std::mem::zeroed",
"std::mem::uninitialized",
# Unbounded memory growth is not approved for use. Use a bounded channel buffer.
"std::sync::mpsc::channel",
"tokio::sync::mpsc::unbounded_channel",
"crossbeam_channel::unbounded",
# These methods poll futures in a biased manner without containing the word "biased" in their name.
# This is a footgun and can result in the system starving itself out during high load scenarios.
# See https://github.com/rust-lang/futures-rs/issues/2135 for more.
"futures::future::select",
"futures_util::future::select",
"futures::future::try_select",
"futures_util::future::try_select",
"futures::future::select_ok",
"futures_util::future::select_ok",
]
34 changes: 22 additions & 12 deletions macos/video-toolbox/src/compression_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{
};

struct CallbackContext<C> {
frames: mpsc::Sender<Result<CompressionSessionOutputFrame<C>, OSStatus>>,
frames: Option<mpsc::SyncSender<Result<CompressionSessionOutputFrame<C>, OSStatus>>>,
_pinned: PhantomPinned,
}

Expand Down Expand Up @@ -40,10 +40,10 @@ pub struct CompressionSessionConfig {

impl<C: Send> CompressionSession<C> {
pub fn new(config: CompressionSessionConfig) -> Result<Self, OSStatus> {
let (tx, rx) = mpsc::channel();
let (tx, rx) = mpsc::sync_channel(120);

let callback_context = Box::pin(CallbackContext {
frames: tx,
frames: Some(tx),
_pinned: PhantomPinned,
});

Expand All @@ -54,16 +54,24 @@ impl<C: Send> CompressionSession<C> {
_info_flags: sys::VTDecodeInfoFlags,
sample_buffer: sys::CMSampleBufferRef,
) {
let ctx = &*(output_callback_ref_con as *mut CallbackContext<C>);
// SAFETY: Panicking is not allowed across an FFI boundary. If you add code that may panic here
// then you must wrap it in `std::panic::catch_unwind`.
let ctx = &mut *(output_callback_ref_con as *mut CallbackContext<C>);
let frame_context = *Box::<C>::from_raw(source_frame_ref_con as *mut C);
let _ = ctx.frames.send(result(status.into()).map(|_| CompressionSessionOutputFrame {
sample_buffer: if sample_buffer.is_null() {
None
} else {
Some(SampleBuffer::from_get_rule(sample_buffer as _))
},
context: frame_context,
}));
if let Some(frames) = ctx.frames.as_ref() {
let r = frames.try_send(result(status.into()).map(|_| CompressionSessionOutputFrame {
sample_buffer: if sample_buffer.is_null() {
None
} else {
Some(SampleBuffer::from_get_rule(sample_buffer as _))
},
context: frame_context,
}));
if r.is_err() {
// Send correct data, or send nothing at all. Do not block.
ctx.frames = None;
}
}
}

let mut sess = std::ptr::null_mut();
Expand Down Expand Up @@ -121,6 +129,8 @@ impl<C: Send> CompressionSession<C> {
unsafe { result(sys::VTCompressionSessionPrepareToEncodeFrames(self.inner.0).into()) }
}

/// Note: The sending half of this channel will be dropped when the channel exceeds its buffer capacity.
/// If that happens then you are not polling this channel frequently enough.
pub fn frames(&self) -> &mpsc::Receiver<Result<CompressionSessionOutputFrame<C>, OSStatus>> {
&self.frames
}
Expand Down
26 changes: 17 additions & 9 deletions macos/video-toolbox/src/decompression_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ use super::sys;
use core_foundation::{result, CFType, OSStatus};
use core_media::{BlockBuffer, SampleBuffer, VideoFormatDescription};
use core_video::ImageBuffer;
use std::{pin::Pin, sync::mpsc};
use std::{
pin::Pin,
sync::{Arc, Mutex},
};

pub struct DecompressionSession(sys::VTDecompressionSessionRef);
core_foundation::trait_impls!(DecompressionSession);
Expand Down Expand Up @@ -44,14 +47,18 @@ impl DecompressionSession {
}

pub fn decode_frame(&mut self, frame_data: &[u8], format_desc: &VideoFormatDescription) -> Result<ImageBuffer, OSStatus> {
let (tx, rx) = mpsc::channel();
let image_mutex = Arc::new(Mutex::new(None));
let cb_image_mutex = image_mutex.clone();
let mut cb: Pin<Box<Callback>> = Box::pin(Box::new(move |status, image_buffer| {
tx.send(if image_buffer.is_null() {
Err(status.into())
} else {
result(status.into()).map(|_| unsafe { ImageBuffer::from_get_rule(image_buffer as _) })
})
.unwrap();
// SAFETY: Panicking is not allowed across an FFI boundary. If you add code that may panic here
// then you must wrap it in `std::panic::catch_unwind`.
if let Ok(mut lock) = cb_image_mutex.try_lock() {
*lock = Some(if image_buffer.is_null() {
Err(status.into())
} else {
result(status.into()).map(|_| unsafe { ImageBuffer::from_get_rule(image_buffer as _) })
});
}
}));
result(
unsafe {
Expand All @@ -67,7 +74,8 @@ impl DecompressionSession {
}
.into(),
)?;
rx.try_recv().unwrap()
let mut mutex_lock = image_mutex.lock().expect("lock shouldn't be poisoned");
mutex_lock.take().expect("mutex should be populated")
}
}

Expand Down
17 changes: 12 additions & 5 deletions macos/video-toolbox/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,21 @@ pub use video_encoder_list::{Encoder, EncoderList};
pub mod decompression_session;
pub use decompression_session::*;

pub struct Error {
context: &'static str,
inner: OSStatus,
pub enum Error {
OSStatus { context: &'static str, inner: OSStatus },
InnerChannelDropped,
}

impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}: {}", self.context, self.inner)
match self {
Self::OSStatus { context, inner } => {
write!(f, "{}: {}", context, inner)
}
Self::InnerChannelDropped => {
write!(f, "inner channel exceeded capacity and failed; channel must be polled more often")
}
}
}
}

Expand All @@ -50,6 +57,6 @@ trait ResultExt<T> {

impl<T> ResultExt<T> for Result<T, OSStatus> {
fn context(self, context: &'static str) -> Result<T, Error> {
self.map_err(|inner| Error { context, inner })
self.map_err(|inner| Error::OSStatus { context, inner })
}
}
56 changes: 36 additions & 20 deletions macos/video-toolbox/src/video_encoder.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use super::*;
use av_traits::{EncodedFrameType, EncodedVideoFrame, RawVideoFrame, VideoEncoderOutput};
use core_foundation::{self as cf, Array, Boolean, CFType, Dictionary, MutableDictionary, Number, OSStatus};
use core_foundation::{self as cf, Array, Boolean, CFType, Dictionary, MutableDictionary, Number};
use core_media::{Time, VideoCodecType};
use core_video::{PixelBuffer, PixelBufferPlane};
use std::pin::Pin;
use std::{pin::Pin, sync::mpsc::TryRecvError};

#[derive(Copy, Clone, Debug)]
pub enum H265ProfileLevel {
Expand Down Expand Up @@ -207,9 +207,9 @@ impl<F: Send> VideoEncoder<F> {
Ok(Self { sess, config, frame_count: 0 })
}

fn next_video_encoder_trait_frame(&mut self) -> Result<Option<VideoEncoderOutput<F>>, OSStatus> {
Ok(match self.sess.frames().try_recv().ok().transpose()? {
Some(frame) => {
fn next_video_encoder_trait_frame(&mut self) -> Result<Option<VideoEncoderOutput<F>>, Error> {
match self.sess.frames().try_recv() {
Ok(Ok(frame)) => {
let raw_frame = unsafe { *Pin::into_inner_unchecked(frame.context) };
let Some(sample_buffer) = frame.sample_buffer else {
return Ok(Some(VideoEncoderOutput {
Expand All @@ -232,13 +232,23 @@ impl<F: Send> VideoEncoder<F> {
}

let data_buffer = sample_buffer.data_buffer().expect("all frames should have data");
let data_buffer = data_buffer.create_contiguous(0, 0)?;
let mut avcc_data = data_buffer.data(0)?;
let data_buffer = data_buffer.create_contiguous(0, 0).context("create contiguous data buffer")?;
let mut avcc_data = data_buffer.data(0).context("access first entry of data buffer")?;

let format_desc = sample_buffer.format_description().expect("all frames should have format descriptions");
let prefix_len = match self.config.codec {
VideoEncoderCodec::H264 { .. } => format_desc.h264_parameter_set_at_index(0)?.nal_unit_header_length,
VideoEncoderCodec::H265 { .. } => format_desc.hevc_parameter_set_at_index(0)?.nal_unit_header_length,
VideoEncoderCodec::H264 { .. } => {
format_desc
.h264_parameter_set_at_index(0)
.context("h264_parameter_set_at_index")?
.nal_unit_header_length
}
VideoEncoderCodec::H265 { .. } => {
format_desc
.hevc_parameter_set_at_index(0)
.context("hevc_parameter_set_at_index")?
.nal_unit_header_length
}
};

let mut data = Vec::with_capacity(avcc_data.len() + 100);
Expand All @@ -248,14 +258,14 @@ impl<F: Send> VideoEncoder<F> {
match self.config.codec {
VideoEncoderCodec::H264 { .. } => {
for i in 0..2 {
let ps = format_desc.h264_parameter_set_at_index(i)?;
let ps = format_desc.h264_parameter_set_at_index(i).context("h264_parameter_set_at_index - keyframe")?;
data.extend_from_slice(&[0, 0, 0, 1]);
data.extend_from_slice(ps.data);
}
}
VideoEncoderCodec::H265 { .. } => {
for i in 0..3 {
let ps = format_desc.hevc_parameter_set_at_index(i)?;
let ps = format_desc.hevc_parameter_set_at_index(i).context("hevc_parameter_set_at_index - keyframe")?;
data.extend_from_slice(&[0, 0, 0, 1]);
data.extend_from_slice(ps.data);
}
Expand All @@ -280,15 +290,17 @@ impl<F: Send> VideoEncoder<F> {

let encoded_frame = Some(EncodedVideoFrame { data, is_keyframe });

Some(VideoEncoderOutput { encoded_frame, raw_frame })
Ok(Some(VideoEncoderOutput { encoded_frame, raw_frame }))
}
None => None,
})
Ok(Err(e)) => Err(e).context("error from os session"),
Err(TryRecvError::Empty) => Ok(None),
Err(TryRecvError::Disconnected) => Err(Error::InnerChannelDropped),
}
}
}
impl<F: RawVideoFrame<u8> + Send + Unpin> av_traits::VideoEncoder for VideoEncoder<F> {
type Error = OSStatus;
type Error = Error;
type RawVideoFrame = F;
fn encode(&mut self, input: Self::RawVideoFrame, frame_type: EncodedFrameType) -> Result<Option<VideoEncoderOutput<Self::RawVideoFrame>>, Self::Error> {
Expand Down Expand Up @@ -320,7 +332,8 @@ impl<F: RawVideoFrame<u8> + Send + Unpin> av_traits::VideoEncoder for VideoEncod
data: input.samples(2).as_ptr() as _,
},
],
)?
)
.context("Yuv420Planar - with_planar_bytes")?
},
VideoEncoderInputFormat::Yuv444Planar => unsafe {
PixelBuffer::with_planar_bytes(
Expand All @@ -347,7 +360,8 @@ impl<F: RawVideoFrame<u8> + Send + Unpin> av_traits::VideoEncoder for VideoEncod
data: input.samples(2).as_ptr() as _,
},
],
)?
)
.context("Yuv444Planar - with_planar_bytes")?
},
VideoEncoderInputFormat::Bgra => unsafe {
PixelBuffer::with_bytes(
Expand All @@ -356,7 +370,8 @@ impl<F: RawVideoFrame<u8> + Send + Unpin> av_traits::VideoEncoder for VideoEncod
sys::kCVPixelFormatType_32BGRA,
4,
input.samples(0).as_ptr() as _,
)?
)
.context("Bgra - with_bytes")?
},
};

Expand All @@ -366,12 +381,13 @@ impl<F: RawVideoFrame<u8> + Send + Unpin> av_traits::VideoEncoder for VideoEncod
let frame_number = self.frame_count;
self.frame_count += 1;
self.sess
.encode_frame(pixel_buffer.into(), Time::new((fps_den * frame_number) as _, fps_num as _), input, frame_type)?;
.encode_frame(pixel_buffer.into(), Time::new((fps_den * frame_number) as _, fps_num as _), input, frame_type)
.context("session encode frame")?;
self.next_video_encoder_trait_frame()
}

fn flush(&mut self) -> Result<Option<VideoEncoderOutput<Self::RawVideoFrame>>, Self::Error> {
self.sess.flush()?;
self.sess.flush().context("flush session")?;
self.next_video_encoder_trait_frame()
}
}
Expand Down
5 changes: 3 additions & 2 deletions srt/src/async_lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,11 @@ impl<'a, 'c> Future for Accept<'a, 'c> {
let api = self.listener.socket.api.clone();
// TODO: use epoll instead of spawn_blocking
let mut handle = spawn_blocking(move || {
let mut storage: sys::sockaddr_storage = unsafe { mem::zeroed() };
let mut storage = mem::MaybeUninit::<sys::sockaddr_storage>::zeroed();
let mut len = mem::size_of_val(&storage) as sys::socklen_t;
let sock = unsafe { sys::srt_accept(sock, &mut storage as *mut _ as *mut _, &mut len as *mut _ as *mut _) };
let sock = unsafe { sys::srt_accept(sock, storage.as_mut_ptr() as *mut _, &mut len as *mut _ as *mut _) };
let socket = Socket { api, sock };
let storage = unsafe { storage.assume_init() };
let addr = sockaddr_from_storage(&storage, len)?;
Ok((AsyncStream::new(socket.get(sys::SRT_SOCKOPT_SRTO_STREAMID)?, socket)?, addr))
});
Expand Down
27 changes: 15 additions & 12 deletions srt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,12 +277,12 @@ impl Socket {

fn raw_stats(&mut self, clear: bool, instantaneous: bool) -> Result<sys::SRT_TRACEBSTATS> {
unsafe {
let mut perf: sys::SRT_TRACEBSTATS = mem::zeroed();
let mut perf = mem::MaybeUninit::<sys::SRT_TRACEBSTATS>::zeroed();
check_code(
"srt_bistats",
sys::srt_bistats(self.raw(), &mut perf, if clear { 1 } else { 0 }, if instantaneous { 1 } else { 0 }),
sys::srt_bistats(self.raw(), perf.as_mut_ptr(), if clear { 1 } else { 0 }, if instantaneous { 1 } else { 0 }),
)?;
Ok(perf)
Ok(perf.assume_init())
}
}

Expand Down Expand Up @@ -368,24 +368,24 @@ fn sockaddr_from_storage(storage: &sys::sockaddr_storage, len: sys::socklen_t) -

fn to_sockaddr(addr: &SocketAddr) -> (sys::sockaddr_storage, sys::socklen_t) {
use libc::{AF_INET, AF_INET6};
let mut storage: sys::sockaddr_storage = unsafe { mem::zeroed() };
let mut storage = mem::MaybeUninit::<sys::sockaddr_storage>::zeroed();
let socklen = match addr {
SocketAddr::V4(ref a) => {
let storage = unsafe { &mut *(&mut storage as *mut _ as *mut sockaddr_in) };
let storage = unsafe { &mut *(storage.as_mut_ptr() as *mut sockaddr_in) };
storage.sin_family = AF_INET as _;
storage.sin_port = u16::to_be(a.port());
storage.sin_addr.s_addr = u32::from_ne_bytes(a.ip().octets());
mem::size_of::<sockaddr_in>()
}
SocketAddr::V6(ref a) => {
let storage = unsafe { &mut *(&mut storage as *mut _ as *mut sockaddr_in6) };
let storage = unsafe { &mut *(storage.as_mut_ptr() as *mut sockaddr_in6) };
storage.sin6_family = AF_INET6 as _;
storage.sin6_port = u16::to_be(a.port());
storage.sin6_addr.s6_addr.copy_from_slice(&a.ip().octets());
mem::size_of::<sockaddr_in6>()
}
};
(storage, socklen as _)
(unsafe { storage.assume_init() }, socklen as _)
}

pub trait ListenerCallback: Send + Sync {
Expand Down Expand Up @@ -495,12 +495,15 @@ impl<'c> Listener<'c> {
}

pub fn accept(&self) -> Result<(Stream, SocketAddr)> {
let mut storage: sys::sockaddr_storage = unsafe { mem::zeroed() };
let mut storage = mem::MaybeUninit::<sys::sockaddr_storage>::zeroed();
let mut len = mem::size_of_val(&storage) as sys::socklen_t;
let sock = unsafe { sys::srt_accept(self.socket.raw(), &mut storage as *mut _ as *mut _, &mut len as *mut _ as *mut _) };
let socket = Socket {
api: self.socket.api.clone(),
sock,
let (storage, socket) = unsafe {
let sock = sys::srt_accept(self.socket.raw(), storage.as_mut_ptr() as *mut _, &mut len as *mut _ as *mut _);
let socket = Socket {
api: self.socket.api.clone(),
sock,
};
(storage.assume_init(), socket)
};
let addr = sockaddr_from_storage(&storage, len)?;
let max_send_payload_size = socket
Expand Down
2 changes: 1 addition & 1 deletion x264/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ mod test {

assert_eq!(encoded_frames, 90);
assert!(encoded.len() > 5000, "encoded.len() = {}", encoded.len());
assert!(encoded.len() < 20000, "encoded.len() = {}", encoded.len());
assert!(encoded.len() < 30000, "encoded.len() = {}", encoded.len());

// To inspect the output, uncomment these lines:
//use std::io::Write;
Expand Down
Loading

0 comments on commit 89c92f9

Please sign in to comment.