Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/generic stream #414

Merged
merged 13 commits into from
Jul 27, 2021
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ rev = "33aa67d"
git = "https://github.com/rust-embedded/cortex-m-rt.git"
rev = "a2e3ad5"

[patch.crates-io.heapless]
git = "https://github.com/quartiq/heapless.git"
branch = "feature/assume-init"

[patch.crates-io.miniconf]
git = "https://github.com/quartiq/miniconf.git"
rev = "9c826f8"
Expand Down
163 changes: 67 additions & 96 deletions scripts/stream_throughput.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,59 @@

Description: Provides a mechanism for measuring Stabilizer stream data throughput.
"""
import argparse
import socket
import collections
import struct
import time
import logging

# Representation of a single UDP packet transmitted by Stabilizer.
Packet = collections.namedtuple('Packet', ['index', 'adc', 'dac'])
# Representation of a single data batch transmitted by Stabilizer.
Packet = collections.namedtuple('Packet', ['index', 'data'])

# The magic header half-word at the start of each packet.
MAGIC_HEADER = 0x057B

# The struct format of the header.
HEADER_FORMAT = '<HBBI'

# All supported formats by this reception script.
#
# The items in this dict are functions that will be provided the sampel batch size and will return
ryan-summers marked this conversation as resolved.
Show resolved Hide resolved
# the struct deserialization code to unpack a single batch.
FORMAT = {
1: lambda batch_size: f'<{batch_size}H{batch_size}H{batch_size}H{batch_size}H'
}

def parse_packet(buf):
""" Attempt to parse packets from the received buffer. """
# Attempt to parse a block from the buffer.
if len(buf) < struct.calcsize(HEADER_FORMAT):
return []

# Parse out the packet header
magic, format_id, batch_size, sequence_number = struct.unpack_from(HEADER_FORMAT, buf)
buf = buf[struct.calcsize(HEADER_FORMAT):]

if magic != MAGIC_HEADER:
logging.warning('Encountered bad magic header: %s', hex(magic))
return []

if format_id not in FORMAT:
ryan-summers marked this conversation as resolved.
Show resolved Hide resolved
raise Exception(f'Unknown format specifier: {format_id}')

frame_format = FORMAT[format_id](batch_size)

batch_count = int(len(buf) / struct.calcsize(frame_format))

packets = []
for offset in range(batch_count):
data = struct.unpack_from(frame_format, buf)
buf = buf[struct.calcsize(frame_format):]
packets.append(Packet(sequence_number + offset, data))
ryan-summers marked this conversation as resolved.
Show resolved Hide resolved

return packets


class Timer:
""" A basic timer for measuring elapsed time periods. """
Expand Down Expand Up @@ -52,134 +97,60 @@ def elapsed(self):
return now - self.start_time


class PacketParser:
""" Utilize class used for parsing received UDP data. """

def __init__(self):
""" Initialize the parser. """
self.buf = b''
self.total_bytes = 0


def ingress(self, data):
""" Ingress received UDP data. """
self.total_bytes += len(data)
self.buf += data


def parse_all_packets(self):
""" Parse all received packets from the receive buffer.

Returns:
A list of received Packets.
"""
packets = []
while True:
new_packets = self._parse()
if new_packets:
packets += new_packets
else:
return packets

def sequence_delta(previous_sequence, next_sequence):
""" Check the number of items between two sequence numbers. """
if previous_sequence is None:
return 0

def _parse(self):
""" Attempt to parse packets from the received buffer. """
# Attempt to parse a block from the buffer.
if len(self.buf) < 4:
return None

start_id, num_blocks, data_size = struct.unpack_from('!HBB', self.buf)

packet_size = 4 + data_size * num_blocks * 8

if len(self.buf) < packet_size:
return None

self.buf = self.buf[4:]

packets = []
for offset in range(num_blocks):
adcs_dacs = struct.unpack_from(f'!{4 * data_size}H', self.buf)
adc = [
adcs_dacs[0:data_size],
adcs_dacs[data_size:2*data_size],
]

dac = [
adcs_dacs[2*data_size: 3*data_size],
adcs_dacs[3*data_size:],
]

self.buf = self.buf[8*data_size:]
packets.append(Packet(start_id + offset, adc, dac))

return packets


def check_index(previous_index, next_index):
""" Check if two indices are sequential. """
if previous_index == -1:
return True

# Handle index roll-over. Indices are only stored in 16-bit numbers.
if next_index < previous_index:
next_index += 65536

expected_index = previous_index + 1

return next_index == expected_index
delta = next_sequence - (previous_sequence + 1)
return delta & 0xFFFFFFFF


def main():
""" Main program. """
parser = argparse.ArgumentParser(description='Measure Stabilizer livestream quality')
parser.add_argument('--port', default=1111, help='The port that stabilizer is streaming to')

args = parser.parse_args()

connection = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
connection.bind(("", 1111))
connection.bind(("", args.port))

logging.basicConfig(level=logging.INFO,
format='%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s')

last_index = -1
last_index = None

drop_count = 0
good_blocks = 0
total_bytes = 0

timer = Timer()
parser = PacketParser()

while True:
# Receive any data over UDP and parse it.
data = connection.recv(4096)
data = connection.recv(1024)
ryan-summers marked this conversation as resolved.
Show resolved Hide resolved
if data and not timer.is_started():
timer.start()

parser.ingress(data)

# Handle any received packets.
for packet in parser.parse_all_packets():

total_bytes += len(data)
for packet in parse_packet(data):
# Handle any dropped packets.
if not check_index(last_index, packet.index):
print(hex(last_index), hex(packet.index))
if packet.index < (last_index + 1):
dropped = packet.index + 65536 - (last_index + 1)
else:
dropped = packet.index - (last_index + 1)

drop_count += dropped

drop_count += sequence_delta(last_index, packet.index)
last_index = packet.index
good_blocks += 1

# Report the throughput periodically.
if timer.is_triggered():
drate = parser.total_bytes * 8 / 1e6 / timer.elapsed()
drate = total_bytes * 8 / 1e6 / timer.elapsed()

print(f'''
Data Rate: {drate:.3f} Mbps
Received Blocks: {good_blocks}
Dropped blocks: {drop_count}

Metadata: {parser.total_bytes / 1e6:.3f} MB in {timer.elapsed():.2f} s
Metadata: {total_bytes / 1e6:.3f} MB in {timer.elapsed():.2f} s
----
''')
timer.arm()
Expand Down
25 changes: 21 additions & 4 deletions src/bin/dual-iir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,15 @@ use stabilizer::{
adc::{Adc0Input, Adc1Input, AdcCode},
afe::Gain,
dac::{Dac0Output, Dac1Output, DacCode},
design_parameters::SAMPLE_BUFFER_SIZE,
embedded_hal::digital::v2::InputPin,
hal,
signal_generator::{self, SignalGenerator},
system_timer::SystemTimer,
DigitalInput0, DigitalInput1, AFE0, AFE1,
},
net::{
data_stream::{BlockGenerator, StreamTarget},
data_stream::{FrameGenerator, StreamFormat, StreamTarget},
miniconf::Miniconf,
serde::Deserialize,
telemetry::{Telemetry, TelemetryBuffer},
Expand Down Expand Up @@ -169,7 +170,7 @@ const APP: () = {
adcs: (Adc0Input, Adc1Input),
dacs: (Dac0Output, Dac1Output),
network: NetworkUsers<Settings, Telemetry>,
generator: BlockGenerator,
generator: FrameGenerator,
signal_generator: [SignalGenerator; 2],

settings: Settings,
Expand All @@ -193,7 +194,7 @@ const APP: () = {
stabilizer.net.mac_address,
);

let generator = network.enable_streaming();
let generator = network.enable_streaming(StreamFormat::AdcDacData);

// Spawn a settings update for default settings.
c.spawn.settings_update().unwrap();
Expand Down Expand Up @@ -307,7 +308,23 @@ const APP: () = {
}

// Stream the data.
generator.send(&adc_samples, &dac_samples);
const N: usize = SAMPLE_BUFFER_SIZE * core::mem::size_of::<u16>();
generator.add::<_, { N * 4 }>(|buf| {
for (data, buf) in adc_samples
.iter()
.chain(dac_samples.iter())
.zip(buf.chunks_exact_mut(N))
{
assert_eq!(core::mem::size_of_val(*data), N);
let data = unsafe {
core::slice::from_raw_parts(
data.as_ptr() as *const u8,
N,
)
};
buf.copy_from_slice(data)
}
});

// Update telemetry measurements.
telemetry.adcs =
Expand Down
27 changes: 22 additions & 5 deletions src/bin/lockin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use stabilizer::{
adc::{Adc0Input, Adc1Input, AdcCode},
afe::Gain,
dac::{Dac0Output, Dac1Output, DacCode},
design_parameters::SAMPLE_BUFFER_SIZE,
embedded_hal::digital::v2::InputPin,
hal,
input_stamper::InputStamper,
Expand All @@ -51,7 +52,7 @@ use stabilizer::{
DigitalInput0, DigitalInput1, AFE0, AFE1,
},
net::{
data_stream::{BlockGenerator, StreamTarget},
data_stream::{FrameGenerator, StreamFormat, StreamTarget},
miniconf::Miniconf,
serde::Deserialize,
telemetry::{Telemetry, TelemetryBuffer},
Expand Down Expand Up @@ -208,7 +209,7 @@ const APP: () = {
settings: Settings,
telemetry: TelemetryBuffer,
digital_inputs: (DigitalInput0, DigitalInput1),
generator: BlockGenerator,
generator: FrameGenerator,
signal_generator: signal_generator::SignalGenerator,

timestamper: InputStamper,
Expand All @@ -230,7 +231,7 @@ const APP: () = {
stabilizer.net.mac_address,
);

let generator = network.enable_streaming();
let generator = network.enable_streaming(StreamFormat::AdcDacData);

let settings = Settings::default();

Expand Down Expand Up @@ -394,8 +395,24 @@ const APP: () = {
}
}

// Stream data
generator.send(&adc_samples, &dac_samples);
// Stream the data.
const N: usize = SAMPLE_BUFFER_SIZE * core::mem::size_of::<u16>();
generator.add::<_, { N * 4 }>(|buf| {
for (data, buf) in adc_samples
.iter()
.chain(dac_samples.iter())
.zip(buf.chunks_exact_mut(N))
{
assert_eq!(core::mem::size_of_val(*data), N);
let data = unsafe {
core::slice::from_raw_parts(
data.as_ptr() as *const u8,
N,
)
};
buf.copy_from_slice(data)
}
});

// Update telemetry measurements.
telemetry.adcs =
Expand Down
9 changes: 0 additions & 9 deletions src/hardware/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,6 @@ pub struct NetStorage {
[Option<(smoltcp::wire::IpAddress, smoltcp::iface::Neighbor)>; 8],
pub routes_cache:
[Option<(smoltcp::wire::IpCidr, smoltcp::iface::Route)>; 8],

pub dhcp_rx_metadata: [smoltcp::socket::RawPacketMetadata; 1],
pub dhcp_tx_metadata: [smoltcp::socket::RawPacketMetadata; 1],
pub dhcp_tx_storage: [u8; 600],
pub dhcp_rx_storage: [u8; 600],
}

pub struct UdpSocketStorage {
Expand Down Expand Up @@ -94,10 +89,6 @@ impl Default for NetStorage {
sockets: [None, None, None, None, None, None],
tcp_socket_storage: [TcpSocketStorage::new(); NUM_TCP_SOCKETS],
udp_socket_storage: [UdpSocketStorage::new(); NUM_UDP_SOCKETS],
dhcp_tx_storage: [0; 600],
dhcp_rx_storage: [0; 600],
dhcp_rx_metadata: [smoltcp::socket::RawPacketMetadata::EMPTY; 1],
dhcp_tx_metadata: [smoltcp::socket::RawPacketMetadata::EMPTY; 1],
}
}
}
Expand Down
Loading