Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filter data sent to gsteamer using magic bytes #50

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
266fd67
Use magick headers for deciding what to do with the data
QuantumEntangledAndy Jul 28, 2020
1acb893
Include whole frame even the magick header
QuantumEntangledAndy Jul 28, 2020
67f42fe
Refractor video magic bytes into BinaryData
QuantumEntangledAndy Jul 29, 2020
317f4cd
Adding more structure to the deserialised video frames
QuantumEntangledAndy Jul 30, 2020
3ee6cdd
Using getters for the VideoI and PFrames
QuantumEntangledAndy Jul 30, 2020
152282a
Attempt to keep track of chunked video frames
QuantumEntangledAndy Jul 30, 2020
7ecd089
Use little endian and some bug fixes
QuantumEntangledAndy Jul 30, 2020
9a3b831
Refractor assuming extra binary bytes are junk and use single BinaryData
QuantumEntangledAndy Jul 31, 2020
011a639
A basic continuation model using contex, also rustfmt-ed
QuantumEntangledAndy Jul 31, 2020
1f98cf5
Ok bug fixes to my continuation model and rustfmt again
QuantumEntangledAndy Jul 31, 2020
9d4ebe0
Hashmaps in the contiuation model
QuantumEntangledAndy Jul 31, 2020
cf7efca
Refractor continuation logic to seperate subscription getter
QuantumEntangledAndy Aug 2, 2020
c5948f3
Rustfmt-ed
QuantumEntangledAndy Aug 2, 2020
8ee375f
Merge branch 'master' into magic_headers
QuantumEntangledAndy Aug 3, 2020
963e479
Use a deque and process media packets as a substream
QuantumEntangledAndy Aug 3, 2020
e973b1e
Merge branch 'master' into magic_headers
QuantumEntangledAndy Aug 3, 2020
832907f
Rust fmt-ed
QuantumEntangledAndy Aug 3, 2020
b5584bb
Clean up to continuation code
QuantumEntangledAndy Aug 3, 2020
eb3abb4
Seperate MediaData logic into its own module
QuantumEntangledAndy Aug 3, 2020
66fbd99
Using own dockerhub
QuantumEntangledAndy Aug 5, 2020
25ba342
Merge branch 'master' into magic_headers
QuantumEntangledAndy Aug 5, 2020
e580985
Make docker hub usernames etc based on repo names
QuantumEntangledAndy Aug 5, 2020
f6dd3b7
Move media packet to bc_protocol module and implement adv header checks
QuantumEntangledAndy Aug 5, 2020
00e4678
String formatting for docker publish
QuantumEntangledAndy Aug 5, 2020
024659e
Rustfmt-ed
QuantumEntangledAndy Aug 5, 2020
07257ed
Remove uneeded function from media_packet.rs
QuantumEntangledAndy Aug 9, 2020
4dd4ffe
Removed unused expected_num_packets
QuantumEntangledAndy Aug 11, 2020
7ad7706
Use const to explain the pad_size
QuantumEntangledAndy Aug 11, 2020
018c70a
Remove continue packet type
QuantumEntangledAndy Aug 11, 2020
e5f5798
Removed uneeded pup from media_packet
QuantumEntangledAndy Aug 11, 2020
11396df
Using bytes instead of string for header checks
QuantumEntangledAndy Aug 11, 2020
b0378e0
Clippy linting
QuantumEntangledAndy Aug 11, 2020
d69f1cc
Remove dead code and vars from media_packet
QuantumEntangledAndy Aug 11, 2020
efd79ba
Use simple magic header checks and remove Invalid Media Packet
QuantumEntangledAndy Aug 11, 2020
d44b1dc
Rename MAGIC_LEN to MAGIC_SIZE
QuantumEntangledAndy Aug 12, 2020
9ca38b1
Add warning when hitting unexpected data in the stream
QuantumEntangledAndy Aug 12, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions .github/workflows/docker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,22 @@ jobs:
- name: Check out the repo
uses: actions/checkout@v2
if: ${{ steps.vars.outputs.HAS_SECRET_TOKEN }}
- name: Convert username to lower case for docker
id: string_user
uses: ASzc/change-string-case-action@v1
with:
string: ${{ github.repository_owner }}
- name: Convert repo to lower case for docker
id: string_repo
uses: ASzc/change-string-case-action@v1
with:
string: ${{ github.repository }}
- name: Push to Docker Hub
uses: docker/build-push-action@v1
if: ${{ steps.vars.outputs.HAS_SECRET_TOKEN }}
with:
username: thirtythreeforty
username: ${{ steps.string_user.outputs.lowercase }}
password: ${{ secrets.DOCKER_TOKEN }}
registry: docker.io
repository: thirtythreeforty/neolink
repository: ${{ steps.string_repo.outputs.lowercase }}
tag_with_ref: true
6 changes: 1 addition & 5 deletions build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,7 @@ fn git_ver() -> Option<String> {
let mut git_cmd = Command::new("git");
git_cmd.args(&["describe", "--tags"]);

if let Some(true) = git_cmd
.status()
.ok()
.map(|exit| exit.success())
{
if let Some(true) = git_cmd.status().ok().map(|exit| exit.success()) {
println!("cargo:rerun-if-changed=.git/HEAD");
git_cmd
.output()
Expand Down
29 changes: 16 additions & 13 deletions src/bc_protocol.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use self::connection::BcConnection;
use self::media_packet::{MediaDataKind, MediaDataSubscriber};
use crate::bc;
use crate::bc::{model::*, xml::*};
use err_derive::Error;
Expand All @@ -11,6 +12,7 @@ use std::time::Duration;
use Md5Trunc::*;

mod connection;
mod media_packet;
mod time;

pub struct BcCamera {
Expand Down Expand Up @@ -48,6 +50,9 @@ pub enum Error {
#[error(display = "Timeout")]
Timeout(#[error(source)] std::sync::mpsc::RecvTimeoutError),

#[error(display = "Media")]
MediaPacket(#[error(source)] self::media_packet::Error),

#[error(display = "Credential error")]
AuthFailed,

Expand Down Expand Up @@ -266,20 +271,18 @@ impl BcCamera {

sub_video.send(start_video)?;

let mut media_sub = MediaDataSubscriber::from_bc_sub(&sub_video);

loop {
trace!("Getting video message...");
let msg = sub_video.rx.recv_timeout(RX_TIMEOUT)?;
if let BcBody::ModernMsg(ModernMsg {
binary: Some(binary),
..
}) = msg.body
{
trace!("Got {} bytes of video data", binary.len());
data_out.write_all(binary.as_slice())?;
} else {
warn!("Ignoring weird video message");
debug!("Contents: {:?}", msg);
}
let binary_data = media_sub.next_media_packet(RX_TIMEOUT)?;
// We now have a complete interesting packet. Send it to gst.
// Process the packet
match binary_data.kind() {
MediaDataKind::VideoDataIframe | MediaDataKind::VideoDataPframe => {
data_out.write_all(binary_data.body())?;
}
_ => {}
};
}
}
}
Expand Down
248 changes: 248 additions & 0 deletions src/bc_protocol/media_packet.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
use crate::bc::model::*;
use crate::bc_protocol::connection::BcSubscription;
use err_derive::Error;
use log::trace;
use log::*;
use std::collections::VecDeque;
use std::convert::TryInto;
use std::time::Duration;

const INVALID_MEDIA_PACKETS: &[MediaDataKind] = &[
MediaDataKind::Unknown,
];

// MAGIC_SIZE: Number of bytes needed to get magic header type, represets minimum bytes to pull from the
// stream
const MAGIC_SIZE: usize = 4;
// PAD_SIZE: Media packets use 8 byte padding
const PAD_SIZE: usize = 8;

type Result<T> = std::result::Result<T, Error>;

#[derive(Debug, Error)]
pub enum Error {
#[error(display = "Timeout")]
Timeout(#[error(source)] std::sync::mpsc::RecvTimeoutError),
}

#[derive(Debug, PartialEq, Eq, Hash, Copy, Clone)]
pub enum MediaDataKind {
VideoDataIframe,
VideoDataPframe,
AudioDataAac,
AudioDataAdpcm,
InfoData,
Unknown,
}

#[derive(Debug, PartialEq, Eq)]
pub struct MediaData {
data: Vec<u8>,
}

impl MediaData {
pub fn body(&self) -> &[u8] {
let lower_limit = self.header_size();
let upper_limit = self.data_size() + lower_limit;
&self.data[lower_limit..upper_limit]
}

fn header_size_from_kind(kind: MediaDataKind) -> usize {
match kind {
MediaDataKind::VideoDataIframe => 32,
MediaDataKind::VideoDataPframe => 24,
MediaDataKind::AudioDataAac => 8,
MediaDataKind::AudioDataAdpcm => 16,
MediaDataKind::InfoData => 32,
MediaDataKind::Unknown => 0,
}
}

fn header_size_from_raw(data: &[u8]) -> usize {
let kind = MediaData::kind_from_raw(data);
MediaData::header_size_from_kind(kind)
}

fn header_size(&self) -> usize {
MediaData::header_size_from_raw(&self.data)
}

fn data_size_from_raw(data: &[u8]) -> usize {
let kind = MediaData::kind_from_raw(data);
match kind {
MediaDataKind::VideoDataIframe => MediaData::bytes_to_size(&data[8..12]),
MediaDataKind::VideoDataPframe => MediaData::bytes_to_size(&data[8..12]),
MediaDataKind::AudioDataAac => MediaData::bytes_to_size(&data[4..6]),
MediaDataKind::AudioDataAdpcm => MediaData::bytes_to_size(&data[4..6]),
MediaDataKind::InfoData => 0, // The bytes in MediaData::bytes_to_size(&data[4..8]) seem to be the size of the header
MediaDataKind::Unknown => data.len(),
}
}

fn data_size(&self) -> usize {
MediaData::data_size_from_raw(&self.data)
}

fn pad_size_from_raw(data: &[u8]) -> usize {
let data_size = MediaData::data_size_from_raw(data);
match data_size % PAD_SIZE {
0 => 0,
n => PAD_SIZE - n,
}
}

fn bytes_to_size(bytes: &[u8]) -> usize {
match bytes.len() {
// 8 Won't fit into usize on a 32-bit machine
4 => u32::from_le_bytes(bytes.try_into().expect("slice with incorrect length"))
.try_into()
.expect("u32 won't fit into usize"),
2 => u16::from_le_bytes(bytes.try_into().expect("slice with incorrect length"))
.try_into()
.expect("u16 won't fit into usize"),
1 => u8::from_le_bytes(bytes.try_into().expect("slice with incorrect length"))
.try_into()
.expect("u8 won't fit into usize"),
_ => unreachable!(),
}
}

fn kind_from_raw(data: &[u8]) -> MediaDataKind {
// When calling this ensure you have enough data for header_size +2
// Else full_header_check_from_kind will fail because we check the
// First two bytes after the header for the audio stream
// Since AAC and ADMPC streams start in a predicatble manner
assert!(data.len() >= MAGIC_SIZE, "At least four bytes needed to get media packet type");
const MAGIC_VIDEO_INFO_V1: &[u8] = &[0x31, 0x30, 0x30, 0x31];
const MAGIC_VIDEO_INFO_V2: &[u8] = &[0x31, 0x30, 0x30, 0x32];
const MAGIC_AAC: &[u8] = &[0x30, 0x35, 0x77, 0x62];
const MAGIC_ADPCM: &[u8] = &[0x30, 0x31, 0x77, 0x62];
const MAGIC_IFRAME: &[u8] = &[0x30, 0x30, 0x64, 0x63];
const MAGIC_PFRAME: &[u8] = &[0x30, 0x31, 0x64, 0x63];

let magic = &data[..MAGIC_SIZE];
match magic {
MAGIC_VIDEO_INFO_V1 | MAGIC_VIDEO_INFO_V2 => MediaDataKind::InfoData,
MAGIC_AAC => MediaDataKind::AudioDataAac,
MAGIC_ADPCM => MediaDataKind::AudioDataAdpcm,
MAGIC_IFRAME => MediaDataKind::VideoDataIframe,
MAGIC_PFRAME => MediaDataKind::VideoDataPframe,
_ => {
trace!("Unknown magic kind: {:x?}", &magic);
MediaDataKind::Unknown
}
}
}

pub fn kind(&self) -> MediaDataKind {
MediaData::kind_from_raw(&self.data)
}
}

pub struct MediaDataSubscriber<'a> {
binary_buffer: VecDeque<u8>,
bc_sub: &'a BcSubscription<'a>,
}

impl<'a> MediaDataSubscriber<'a> {
pub fn from_bc_sub<'b>(bc_sub: &'b BcSubscription) -> MediaDataSubscriber<'b> {
MediaDataSubscriber {
binary_buffer: VecDeque::new(),
bc_sub,
}
}

fn fill_binary_buffer(&mut self, rx_timeout: Duration) -> Result<()> {
// Loop messages until we get binary add that data and return
loop {
let msg = self.bc_sub.rx.recv_timeout(rx_timeout)?;
if let BcBody::ModernMsg(ModernMsg {
binary: Some(binary),
..
}) = msg.body
{
// Add the new binary to the buffer and return
self.binary_buffer.extend(binary);
break;
}
}
Ok(())
}

fn advance_to_media_packet(&mut self, rx_timeout: Duration) -> Result<()> {
// In the event we get an unknown packet we advance by brute force
// reading of bytes to the next valid magic
while self.binary_buffer.len() < MAGIC_SIZE {
self.fill_binary_buffer(rx_timeout)?;
}

// Check the kind, if its invalid use pop a byte and try again
let mut magic =
MediaDataSubscriber::get_first_n_deque(&self.binary_buffer, MAGIC_SIZE);
if INVALID_MEDIA_PACKETS.contains(&MediaData::kind_from_raw(&magic)) {
warn!("Possibly truncated packet or unknown magic in stream");
trace!("Unknown magic was: {:x?}", &magic);
}
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you need this if block - it does the same check as the while beneath it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This if check is to print once only at the beginning of the loop not on every step of the loop.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to keep it in as its a good way to detect new magic and see when the code is skipping a packet. But if you still want to remove that sort of thing from the trace let me know and I'll axe it.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can leave it but I reserve the right to refactor at some point if I change my mind ;)

while INVALID_MEDIA_PACKETS.contains(&MediaData::kind_from_raw(&magic)) {
self.binary_buffer.pop_front();
while self.binary_buffer.len() < MAGIC_SIZE {
self.fill_binary_buffer(rx_timeout)?;
}
magic = MediaDataSubscriber::get_first_n_deque(&self.binary_buffer, MAGIC_SIZE);
}

Ok(())
}

fn get_first_n_deque<T: std::clone::Clone>(deque: &VecDeque<T>, n: usize) -> Vec<T> {
// NOTE: I want to use make_contiguous
// This will make this func unneeded as we can use
// make_contiguous then as_slices.0
// We won't need the clone in this case either.
// This is an experimental feature.
// It is about to be moved to stable though
// As can be seen from this PR
// https://github.com/rust-lang/rust/pull/74559
thirtythreeforty marked this conversation as resolved.
Show resolved Hide resolved
let slice0 = deque.as_slices().0;
let slice1 = deque.as_slices().1;
if slice0.len() >= n {
slice0[0..n].to_vec()
} else {
let remain = n - slice0.len();
slice0.iter().chain(&slice1[0..remain]).cloned().collect()
}
}

pub fn next_media_packet(
&mut self,
rx_timeout: Duration,
) -> std::result::Result<MediaData, Error> {
// Find the first packet (does nothing if already at one)
self.advance_to_media_packet(rx_timeout)?;

// Get the magic bytes (guaranteed by advance_to_media_packet)
let magic = MediaDataSubscriber::get_first_n_deque(&self.binary_buffer, MAGIC_SIZE);

// Get enough for the full header
let header_size = MediaData::header_size_from_raw(&magic);
while self.binary_buffer.len() < header_size {
self.fill_binary_buffer(rx_timeout)?;
}

// Get enough for the full data + 8 byte buffer
let header = MediaDataSubscriber::get_first_n_deque(&self.binary_buffer, header_size);
let data_size = MediaData::data_size_from_raw(&header);
let pad_size = MediaData::pad_size_from_raw(&header);
let full_size = header_size + data_size + pad_size;
while self.binary_buffer.len() < full_size {
self.fill_binary_buffer(rx_timeout)?;
}

// Pop the full binary buffer
let binary = self.binary_buffer.drain(..full_size);

Ok(MediaData {
data: binary.collect(),
})
}
}
21 changes: 11 additions & 10 deletions src/bc_protocol/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,22 @@ impl BcCamera {
..
}) = msg.body
{
let datetime = match try_build_timestamp(
time_zone, year, month, day, hour, minute, second
) {
Ok(dt) => dt,
Err(e) => return Err(Error::UnintelligibleReply {
reply: msg,
why: "Could not parse date",
})
};
let datetime =
match try_build_timestamp(time_zone, year, month, day, hour, minute, second) {
Ok(dt) => dt,
Err(e) => {
return Err(Error::UnintelligibleReply {
reply: msg,
why: "Could not parse date",
})
}
};

// This code was written in 2020; I'm trying to catch all the possible epochs that
// cameras might reset themselves to. My B800 resets to Jan 1, 1999, but I can't
// guarantee that Reolink won't pick some newer date. Therefore, last year ought
// to be new enough, yet still distant enough that it won't interfere with anything
const BOUNDARY: Date = date!(2019-01-01);
const BOUNDARY: Date = date!(2019 - 01 - 01);

// detect if no time is actually set, and return Ok(None): that is, operation
// succeeded, and there is no time set
Expand Down
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ fn default_tls_client_auth() -> String {
pub static RESERVED_NAMES: &[&str] = &["anyone", "anonymous"];
fn validate_username(name: &str) -> Result<(), ValidationError> {
if name.trim().is_empty() {
return Err(ValidationError::new("username cannot be empty"))
return Err(ValidationError::new("username cannot be empty"));
}
if RESERVED_NAMES.contains(&name) {
return Err(ValidationError::new("This is a reserved username"));
Expand Down
Loading