Skip to content

Commit

Permalink
making space for v4 data format
Browse files Browse the repository at this point in the history
  • Loading branch information
misko committed Mar 24, 2024
1 parent 59191c2 commit 722feac
Show file tree
Hide file tree
Showing 10 changed files with 476 additions and 173 deletions.
274 changes: 110 additions & 164 deletions spf/data_collector.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,17 @@
import argparse
import json
import logging
import os
import struct
import sys
import threading
import time
from datetime import datetime
from pathlib import Path
from typing import Optional

import numpy as np
import yaml
from attr import dataclass
from tqdm import tqdm

from spf.dataset.rover_idxs import v3rx_column_names
from spf.dataset.v4_data import v4rx_new_dataset
from spf.dataset.wall_array_v2_idxs import v2_column_names
from spf.gps.boundaries import franklin_safe
from spf.rf import beamformer_given_steering, precompute_steering_vectors
from spf.sdrpluto.sdr_controller import (
EmitterConfig,
Expand Down Expand Up @@ -209,67 +203,53 @@ def start_read_thread(self):
self.run = True
self.t.start()

def get_data(self):
def get_rx(self, max_retries=15):
tries = 0
try:
signal_matrix = self.pplus.sdr.rx()
rssis = self.pplus.rssis()
gains = self.pplus.gains()
# rssi_and_gain = self.pplus.get_rssi_and_gain()
except Exception as e:
logging.error(
f"Failed to receive RX data! removing file : retry {tries} {e}",
)
time.sleep(0.1)
tries += 1
if tries > 15:
logging.error("GIVE UP")
sys.exit(1)
while tries < max_retries:
try:
signal_matrix = self.pplus.sdr.rx()
rssis = self.pplus.rssis()
gains = self.pplus.gains()
# rssi_and_gain = self.pplus.get_rssi_and_gain()
return {"signal_matrix": signal_matrix, "rssis": rssis, "gains": gains}
except Exception as e:
logging.error(
f"Failed to receive RX data! removing file : retry {tries} {e}",
)
time.sleep(0.1)
tries += 1
if tries > max_retries:
logging.error("GIVE UP")
sys.exit(1)
return None

def get_data(self):
sdr_rx = self.get_rx()

# process the data
signal_matrix = np.vstack(signal_matrix)
signal_matrix = np.vstack(sdr_rx["signal_matrix"])
current_time = time.time() - self.time_offset # timestamp

self.data = data_to_snapshot(
current_time=current_time,
signal_matrix=signal_matrix,
steering_vectors=self.steering_vectors,
rssis=rssis,
gains=gains,
rssis=sdr_rx(["rssis"]),
gains=sdr_rx(["gains"]),
rx_config=self.pplus.rx_config,
)

# self.data = {
# "current_time": current_time,
# "signal_matrix": signal_matrix,
# "steering_vectors": steering_vectors,
# }


class DataCollector:
def __init__(
self, yaml_config, filename_npy, position_controller, column_names, tag=""
):
self.column_names = column_names
def __init__(self, yaml_config, data_filename, position_controller, tag=""):
self.yaml_config = yaml_config
self.filename_npy = filename_npy
Path(self.filename_npy).touch()
self.data_filename = data_filename
#
self.record_matrix = None
self.position_controller = position_controller
self.finished_collecting = False

# record matrix
if not self.yaml_config["dry-run"]:
self.record_matrix = np.memmap(
self.filename_npy,
dtype="float32",
mode="w+",
shape=(
2, # TODO should be nreceivers
self.yaml_config["n-records-per-receiver"],
len(self.column_names),
), # t,tx,ty,rx,ry,rtheta,rspacing / avg1,avg2 / sds
)
self.setup_record_matrix()

def radios_to_online(self):
# lets open all the radios
Expand Down Expand Up @@ -386,6 +366,9 @@ def done(self):
def is_collecting(self):
return not self.finished_collecting

def setup_record_matrix(self):
raise NotImplementedError

def write_to_record_matrix(self, thread_idx, record_idx, read_thread: ThreadedRX):
raise NotImplementedError

Expand Down Expand Up @@ -414,14 +397,63 @@ def run_collector_thread(self):
read_thread.join()


class DroneDataCollectorChunked(DataCollector):
def __init__(self, *args, **kwargs):
super(DroneDataCollectorChunked, self).__init__(
*args,
**kwargs,
)

def setup_record_matrix(self):
# make sure all receivers are sharing a common buffer size
buffer_size = None
for receiver in self.yaml_config["receivers"]:
assert "buffer_size" in receiver
if buffer_size is None:
buffer_size = self.yaml_config["buffer_size"]
else:
assert buffer_size == self.yaml_config["buffer_size"]
# record matrix
self.zarr = v4rx_new_dataset(
self.data_filename,
self.yaml_config["timesteps"],
buffer_size,
len(self.yaml_config["receivers"]),
chunk_size=4096,
compressor=None,
)

# def write_to_record_matrix(self, thread_idx, record_idx, data):
# current_pos_heading_and_time = (
# self.position_controller.get_position_bearing_and_time()
# )

# self.record_matrix[thread_idx, record_idx] = prepare_record_entry_v3(
# ds=data,
# current_pos_heading_and_time=current_pos_heading_and_time,
# )


class DroneDataCollector(DataCollector):
def __init__(self, *args, **kwargs):
super(DroneDataCollector, self).__init__(
*args,
column_names=v3rx_column_names(nthetas=kwargs["yaml_config"]["n-thetas"]),
**kwargs,
)

def setup_record_matrix(self):
# record matrix
self.record_matrix = np.memmap(
self.data_filename,
dtype="float32",
mode="w+",
shape=(
2, # TODO should be nreceivers
self.yaml_config["n-records-per-receiver"],
v3rx_column_names(nthetas=self.yaml_config["n-thetas"]),
), # t,tx,ty,rx,ry,rtheta,rspacing / avg1,avg2 / sds
)

def write_to_record_matrix(self, thread_idx, record_idx, data):
current_pos_heading_and_time = (
self.position_controller.get_position_bearing_and_time()
Expand All @@ -437,10 +469,24 @@ class FakeDroneDataCollector(DataCollector):
def __init__(self, *args, **kwargs):
super(FakeDroneDataCollector, self).__init__(
*args,
column_names=v3rx_column_names(nthetas=kwargs["yaml_config"]["n-thetas"]),
**kwargs,
)

def setup_record_matrix(self):
# record matrix
self.record_matrix = np.memmap(
self.data_filename,
dtype="float32",
mode="w+",
shape=(
2, # TODO should be nreceivers
self.yaml_config["n-records-per-receiver"],
len(
v3rx_column_names(nthetas=self.yaml_config["n-thetas"]),
), # t,tx,ty,rx,ry,rtheta,rspacing / avg1,avg2 / sds
),
)

def write_to_record_matrix(self, thread_idx, record_idx, data):
self.record_matrix[thread_idx, record_idx] = prepare_record_entry_v3(
ds=data,
Expand Down Expand Up @@ -497,10 +543,22 @@ class GrblDataCollector(DataCollector):
def __init__(self, *args, **kwargs):
super(GrblDataCollector, self).__init__(
*args,
column_names=v2_column_names(nthetas=kwargs["yaml_config"]["n-thetas"]),
**kwargs,
)

def setup_record_matrix(self):
# record matrix
self.record_matrix = np.memmap(
self.data_filename,
dtype="float32",
mode="w+",
shape=(
2, # TODO should be nreceivers
self.yaml_config["n-records-per-receiver"],
v2_column_names(nthetas=self.yaml_config["n-thetas"]),
), # t,tx,ty,rx,ry,rtheta,rspacing / avg1,avg2 / sds
)

def write_to_record_matrix(self, thread_idx, record_idx, data):
tx_pos = self.position_controller.controller.position["xy"][
self.yaml_config["emitter"]["motor_channel"]
Expand All @@ -512,115 +570,3 @@ def write_to_record_matrix(self, thread_idx, record_idx, data):
self.record_matrix[thread_idx, record_idx] = prepare_record_entry_v2(
ds=data, rx_pos=rx_pos, tx_pos=tx_pos
)


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"-c",
"--yaml-config",
type=str,
help="YAML config file",
required=True,
)
parser.add_argument(
"-t", "--tag", type=str, help="tag files", required=False, default=""
)
parser.add_argument(
"--tx-gain", type=int, help="tag files", required=False, default=None
)
parser.add_argument(
"-l",
"--logging-level",
type=str,
help="Logging level",
default="INFO",
required=False,
)
parser.add_argument(
"-m",
"--device-mapping",
type=str,
help="Device mapping file",
default=None,
required=True,
)
args = parser.parse_args()

run_started_at = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
# read YAML
with open(args.yaml_config, "r") as stream:
yaml_config = yaml.safe_load(stream)

# open device mapping and figure out URIs
with open(args.device_mapping, "r") as device_mapping:
port_to_uri = {
int(mapping[0]): f"usb:1.{mapping[1]}.5"
for mapping in [line.strip().split() for line in device_mapping]
}

for receiver in yaml_config["receivers"] + [yaml_config["emitter"]]:
if "receiver-port" in receiver:
receiver["receiver-uri"] = port_to_uri[receiver["receiver-port"]]
if "emitter-port" in yaml_config["emitter"]:
yaml_config["emitter"]["emitter-uri"] = port_to_uri[
yaml_config["emitter"]["emitter-port"]
]

if args.tx_gain is not None:
assert yaml_config["emitter"]["type"] == "sdr"
yaml_config["emitter"]["tx-gain"] = args.tx_gain

output_files_prefix = f"rover_{run_started_at}_nRX{len(yaml_config['receivers'])}_{yaml_config['routine']}"
if args.tag != "":
output_files_prefix += f"_tag_{args.tag}"

# setup filename
# tmpdir = tempfile.TemporaryDirectory()
# temp_dir_name = tmpdir.name
temp_dir_name = "./"
filename_log = f"{temp_dir_name}/{output_files_prefix}.log.tmp"
filename_yaml = f"{temp_dir_name}/{output_files_prefix}.yaml.tmp"
filename_npy = f"{temp_dir_name}/{output_files_prefix}.npy.tmp"
temp_filenames = [filename_log, filename_yaml, filename_npy]
final_filenames = [x.replace(".tmp", "") for x in temp_filenames]

logger = logging.getLogger(__name__)

# setup logging
handlers = [
logging.StreamHandler(),
logging.FileHandler(filename_log),
]
logging.basicConfig(
handlers=handlers,
format="%(asctime)s:%(levelname)s:%(message)s",
level=getattr(logging, args.logging_level.upper(), None),
)

# make a copy of the YAML
with open(filename_yaml, "w") as outfile:
yaml.dump(yaml_config, outfile, default_flow_style=False)

logging.info(json.dumps(yaml_config, sort_keys=True, indent=4))

boundary = franklin_safe

logging.info("Starting data collector...")
data_collector = FakeDroneDataCollector(
filename_npy=filename_npy, yaml_config=yaml_config, position_controller=None
)
data_collector.radios_to_online() # blocking

if len(yaml_config["receivers"]) == 0:
logging.info("EMITTER ONLINE!")
while True:
time.sleep(5)
else:
data_collector.start()
while data_collector.is_collecting():
time.sleep(5)

# we finished lets move files out to final positions
for idx in range(len(temp_filenames)):
os.rename(temp_filenames[idx], final_filenames[idx])
Loading

0 comments on commit 722feac

Please sign in to comment.