Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/generic stream #414

Merged
merged 13 commits into from
Jul 27, 2021
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ rev = "33aa67d"
git = "https://github.com/rust-embedded/cortex-m-rt.git"
rev = "a2e3ad5"

[patch.crates-io.heapless]
git = "https://github.com/quartiq/heapless.git"
branch = "feature/assume-init"

[patch.crates-io.miniconf]
git = "https://github.com/quartiq/miniconf.git"
rev = "9c826f8"
Expand Down
61 changes: 38 additions & 23 deletions scripts/stream_throughput.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,28 @@

Description: Provides a mechanism for measuring Stabilizer stream data throughput.
"""
import argparse
import socket
import collections
import struct
import time
import logging

# Representation of a single UDP packet transmitted by Stabilizer.
Packet = collections.namedtuple('Packet', ['index', 'adc', 'dac'])
# Representation of a single data batch transmitted by Stabilizer.
Packet = collections.namedtuple('Packet', ['index', 'data'])

# Specifies a known format for incoming packets.
#
# * `sample_size_bytes` is the number of bytes required for each sample in the batch.
# * `batch_format` is a `struct` format string that will be provided the `batch_size` as an named
# argument. This format string will be used to deserialize each batch of data from the frame.
Format = collections.namedtuple('Format', ['sample_size_bytes', 'batch_format'])

# All supported formats by this reception script.
FORMAT = {
0: Format(sample_size_bytes=8,
batch_format='<{batch_size}H{batch_size}H{batch_size}H{batch_size}H')
}

class Timer:
""" A basic timer for measuring elapsed time periods. """
Expand Down Expand Up @@ -85,33 +99,29 @@ def parse_all_packets(self):
def _parse(self):
""" Attempt to parse packets from the received buffer. """
# Attempt to parse a block from the buffer.
if len(self.buf) < 4:
if len(self.buf) < 7:
return None

start_id, num_blocks, data_size = struct.unpack_from('!HBB', self.buf)
# Parse out the packet header
start_id, format_id, batch_count, batch_size = struct.unpack_from('<HHHB', self.buf)

packet_size = 4 + data_size * num_blocks * 8
if format_id not in FORMAT:
raise Exception(f'Unknown format specifier: {format_id}')

if len(self.buf) < packet_size:
frame_format = FORMAT[format_id]
required_length = 7 + batch_count * frame_format.sample_size_bytes * batch_size

if len(self.buf) < required_length:
return None

self.buf = self.buf[4:]
self.buf = self.buf[7:]

packets = []
for offset in range(num_blocks):
adcs_dacs = struct.unpack_from(f'!{4 * data_size}H', self.buf)
adc = [
adcs_dacs[0:data_size],
adcs_dacs[data_size:2*data_size],
]

dac = [
adcs_dacs[2*data_size: 3*data_size],
adcs_dacs[3*data_size:],
]

self.buf = self.buf[8*data_size:]
packets.append(Packet(start_id + offset, adc, dac))
for offset in range(batch_count):
format_string = frame_format.batch_format.format(batch_size=batch_size)
data = struct.unpack_from(format_string, self.buf)
self.buf = self.buf[struct.calcsize(format_string):]
packets.append(Packet(start_id + offset, data))

return packets

Expand All @@ -132,8 +142,13 @@ def check_index(previous_index, next_index):

def main():
""" Main program. """
parser = argparse.ArgumentParser(description='Measure Stabilizer livestream quality')
parser.add_argument('--port', default=1111, help='The port that stabilizer is streaming to')

args = parser.parse_args()

connection = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
connection.bind(("", 1111))
connection.bind(("", args.port))

logging.basicConfig(level=logging.INFO,
format='%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s')
Expand All @@ -159,7 +174,7 @@ def main():

# Handle any dropped packets.
if not check_index(last_index, packet.index):
print(hex(last_index), hex(packet.index))
print(f'Drop from {hex(last_index)} to {hex(packet.index)}')
if packet.index < (last_index + 1):
dropped = packet.index + 65536 - (last_index + 1)
else:
Expand Down
43 changes: 40 additions & 3 deletions src/bin/dual-iir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,15 @@ use stabilizer::{
adc::{Adc0Input, Adc1Input, AdcCode},
afe::Gain,
dac::{Dac0Output, Dac1Output, DacCode},
design_parameters::SAMPLE_BUFFER_SIZE,
embedded_hal::digital::v2::InputPin,
hal,
signal_generator::{self, SignalGenerator},
system_timer::SystemTimer,
DigitalInput0, DigitalInput1, AFE0, AFE1,
},
net::{
data_stream::{BlockGenerator, StreamTarget},
data_stream::{FrameGenerator, StreamFormat, StreamTarget},
miniconf::Miniconf,
serde::Deserialize,
telemetry::{Telemetry, TelemetryBuffer},
Expand Down Expand Up @@ -169,7 +170,7 @@ const APP: () = {
adcs: (Adc0Input, Adc1Input),
dacs: (Dac0Output, Dac1Output),
network: NetworkUsers<Settings, Telemetry>,
generator: BlockGenerator,
generator: FrameGenerator,
signal_generator: [SignalGenerator; 2],

settings: Settings,
Expand Down Expand Up @@ -307,7 +308,43 @@ const APP: () = {
}

// Stream the data.
generator.send(&adc_samples, &dac_samples);
generator.add::<_, { SAMPLE_BUFFER_SIZE * 8 }>(
StreamFormat::AdcDacData,
|buf| unsafe {
let dst = buf.as_ptr() as usize as *mut u16;

let adc0 = &adc_samples[0][0] as *const u16;
core::ptr::copy_nonoverlapping(
adc0,
dst,
SAMPLE_BUFFER_SIZE,
);

let dst = dst.add(SAMPLE_BUFFER_SIZE);
let adc1 = &adc_samples[1][0] as *const u16;
core::ptr::copy_nonoverlapping(
adc1,
dst,
SAMPLE_BUFFER_SIZE,
);

let dst = dst.add(SAMPLE_BUFFER_SIZE);
let dac0 = &dac_samples[0][0] as *const u16;
core::ptr::copy_nonoverlapping(
dac0,
dst,
SAMPLE_BUFFER_SIZE,
);

let dst = dst.add(SAMPLE_BUFFER_SIZE);
let dac1 = &dac_samples[1][0] as *const u16;
core::ptr::copy_nonoverlapping(
dac1,
dst,
SAMPLE_BUFFER_SIZE,
);
},
);
ryan-summers marked this conversation as resolved.
Show resolved Hide resolved

// Update telemetry measurements.
telemetry.adcs =
Expand Down
45 changes: 41 additions & 4 deletions src/bin/lockin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use stabilizer::{
adc::{Adc0Input, Adc1Input, AdcCode},
afe::Gain,
dac::{Dac0Output, Dac1Output, DacCode},
design_parameters::SAMPLE_BUFFER_SIZE,
embedded_hal::digital::v2::InputPin,
hal,
input_stamper::InputStamper,
Expand All @@ -51,7 +52,7 @@ use stabilizer::{
DigitalInput0, DigitalInput1, AFE0, AFE1,
},
net::{
data_stream::{BlockGenerator, StreamTarget},
data_stream::{FrameGenerator, StreamFormat, StreamTarget},
miniconf::Miniconf,
serde::Deserialize,
telemetry::{Telemetry, TelemetryBuffer},
Expand Down Expand Up @@ -208,7 +209,7 @@ const APP: () = {
settings: Settings,
telemetry: TelemetryBuffer,
digital_inputs: (DigitalInput0, DigitalInput1),
generator: BlockGenerator,
generator: FrameGenerator,
signal_generator: signal_generator::SignalGenerator,

timestamper: InputStamper,
Expand Down Expand Up @@ -394,8 +395,44 @@ const APP: () = {
}
}

// Stream data
generator.send(&adc_samples, &dac_samples);
// Stream the data.
generator.add::<_, { SAMPLE_BUFFER_SIZE * 8 }>(
StreamFormat::AdcDacData,
|buf| unsafe {
let dst = buf.as_ptr() as usize as *mut u16;

let adc0 = &adc_samples[0][0] as *const u16;
core::ptr::copy_nonoverlapping(
adc0,
dst,
SAMPLE_BUFFER_SIZE,
);

let dst = dst.add(SAMPLE_BUFFER_SIZE);
let adc1 = &adc_samples[1][0] as *const u16;
core::ptr::copy_nonoverlapping(
adc1,
dst,
SAMPLE_BUFFER_SIZE,
);

let dst = dst.add(SAMPLE_BUFFER_SIZE);
let dac0 = &dac_samples[0][0] as *const u16;
core::ptr::copy_nonoverlapping(
dac0,
dst,
SAMPLE_BUFFER_SIZE,
);

let dst = dst.add(SAMPLE_BUFFER_SIZE);
let dac1 = &dac_samples[1][0] as *const u16;
core::ptr::copy_nonoverlapping(
dac1,
dst,
SAMPLE_BUFFER_SIZE,
);
},
);

// Update telemetry measurements.
telemetry.adcs =
Expand Down
9 changes: 0 additions & 9 deletions src/hardware/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,6 @@ pub struct NetStorage {
[Option<(smoltcp::wire::IpAddress, smoltcp::iface::Neighbor)>; 8],
pub routes_cache:
[Option<(smoltcp::wire::IpCidr, smoltcp::iface::Route)>; 8],

pub dhcp_rx_metadata: [smoltcp::socket::RawPacketMetadata; 1],
pub dhcp_tx_metadata: [smoltcp::socket::RawPacketMetadata; 1],
pub dhcp_tx_storage: [u8; 600],
pub dhcp_rx_storage: [u8; 600],
}

pub struct UdpSocketStorage {
Expand Down Expand Up @@ -94,10 +89,6 @@ impl Default for NetStorage {
sockets: [None, None, None, None, None, None],
tcp_socket_storage: [TcpSocketStorage::new(); NUM_TCP_SOCKETS],
udp_socket_storage: [UdpSocketStorage::new(); NUM_UDP_SOCKETS],
dhcp_tx_storage: [0; 600],
dhcp_rx_storage: [0; 600],
dhcp_rx_metadata: [smoltcp::socket::RawPacketMetadata::EMPTY; 1],
dhcp_tx_metadata: [smoltcp::socket::RawPacketMetadata::EMPTY; 1],
}
}
}
Expand Down
Loading