From df210983dc2dff1985da97dfc76d050cf1b82ae0 Mon Sep 17 00:00:00 2001 From: Misko Date: Sun, 17 Mar 2024 21:31:35 -0700 Subject: [PATCH] fix fake radio --- spf/data_collector.py | 195 ++++++++++++++++++++++++++++---- spf/mavlink_radio_collection.py | 30 ++--- tests/manual_tests.sh | 2 +- tests/rover_config.yaml | 20 +++- 4 files changed, 205 insertions(+), 42 deletions(-) diff --git a/spf/data_collector.py b/spf/data_collector.py index 67b858a2..ebb208d5 100644 --- a/spf/data_collector.py +++ b/spf/data_collector.py @@ -52,7 +52,7 @@ def prepare_record_entry_v3(ds: DataSnapshot, current_pos_heading_and_time): # _z = struct.unpack("d", struct.pack("ff", a, b))[0] return np.hstack( [ - ds.timestamp, + ds.timestamp, # 1 gps_time_1, # 1 gps_time_2, # 1 current_pos_heading_and_time["gps"], # 2 @@ -84,6 +84,109 @@ def prepare_record_entry_v2(ds: DataSnapshot, rx_pos: np.array, tx_pos: np.array ) +class FakeThreadedRX: + def __init__( + self, + thread_idx, + time_offset, + nthetas, + rx_config, + records_per_second=20, + ): + self.thread_idx = thread_idx + self.read_lock = threading.Lock() + self.ready_lock = threading.Lock() + self.ready_lock.acquire() + self.run = False + self.time_offset = time_offset + self.nthetas = nthetas + self.rx_config = rx_config + self.records_per_second = records_per_second + + def join(self): + self.t.join() + + def start_read_thread(self): + self.t = threading.Thread(target=self.read_forever, daemon=True) + self.run = True + self.t.start() + + def random_signal_matrix(self): + return np.random.uniform( + -1, 1, (self.rx_config.buffer_size,) + ) + 1.0j * np.random.uniform(-1, 1, (self.rx_config.buffer_size,)) + + def read_forever(self): + logging.info(f"{self.thread_idx} fake read_forever()") + steering_vectors = precompute_steering_vectors( + receiver_positions=self.rx_config.rx_pos, + carrier_frequency=self.rx_config.lo, + spacing=self.nthetas, + ) + + while self.run: + if self.read_lock.acquire(blocking=True, timeout=0.5): + # got the semaphore, read some data! + + tries = 0 + try: + signal_matrix = [self.random_signal_matrix() for x in range(2)] + rssis = np.random.rand( + 2, + ) + gains = np.random.rand( + 2, + ) + time.sleep(1.0 / self.records_per_second) + # rssi_and_gain = self.pplus.get_rssi_and_gain() + except Exception as e: + logging.error( + f"Failed to receive RX data! removing file : retry {tries} {e}", + ) + time.sleep(0.1) + tries += 1 + if tries > 15: + logging.error("GIVE UP") + sys.exit(1) + + # process the data + current_time = time.time() - self.time_offset # timestamp + # _, beam_sds, _ = beamformer( + # self.pplus.rx_config.rx_pos, + # signal_matrix, + # self.pplus.rx_config.lo, + # spacing=self.nthetas, + # ) + signal_matrix = np.vstack(signal_matrix) + beam_sds = beamformer_given_steering( + steering_vectors=steering_vectors, signal_matrix=signal_matrix + ) + # assert np.isclose(beam_sds, beam_sds2).all() + + avg_phase_diff = get_avg_phase(signal_matrix) + + self.data = DataSnapshot( + timestamp=current_time, + rx_center_pos=self.rx_config.rx_spacing, + rx_theta_in_pis=self.rx_config.rx_theta_in_pis, + rx_spacing=self.rx_config.rx_spacing, + beam_sds=beam_sds, + avg_phase_diff=avg_phase_diff, + signal_matrix=None, + rssis=rssis, + gains=gains, + ) + + try: + self.ready_lock.release() # tell the parent we are ready to provide + except Exception as e: + logging.error(f"Thread encountered an issue exiting {str(e)}") + self.run = False + # logging.info(f"{self.pplus.rx_config.uri} READY") + + logging.info(f"{str(self.rx_config.uri)} PPlus read_forever() exit!") + + class ThreadedRX: def __init__(self, pplus: PPlus, time_offset, nthetas): self.pplus = pplus @@ -169,7 +272,10 @@ def read_forever(self): class DataCollector: - def __init__(self, yaml_config, filename_npy, position_controller, tag=""): + def __init__( + self, yaml_config, filename_npy, position_controller, column_names, tag="" + ): + self.column_names = column_names self.yaml_config = yaml_config self.filename_npy = filename_npy Path(self.filename_npy).touch() @@ -177,6 +283,19 @@ def __init__(self, yaml_config, filename_npy, position_controller, tag=""): self.position_controller = position_controller self.finished_collecting = False + # record matrix + if not self.yaml_config["dry-run"]: + self.record_matrix = np.memmap( + self.filename_npy, + dtype="float32", + mode="w+", + shape=( + 2, # TODO should be nreceivers + self.yaml_config["n-records-per-receiver"], + len(self.column_names), + ), # t,tx,ty,rx,ry,rtheta,rspacing / avg1,avg2 / sds + ) + def radios_to_online(self): # lets open all the radios radio_uris = [] @@ -316,21 +435,11 @@ def run_collector_thread(self): class DroneDataCollector(DataCollector): def __init__(self, *args, **kwargs): - super(DroneDataCollector, self).__init__(*args, **kwargs) - self.column_names = v3rx_column_names(nthetas=self.yaml_config["n-thetas"]) - - # record matrix - if not self.yaml_config["dry-run"]: - self.record_matrix = np.memmap( - self.filename_npy, - dtype="float32", - mode="w+", - shape=( - 2, # TODO should be nreceivers - self.yaml_config["n-records-per-receiver"], - len(self.column_names), - ), # t,tx,ty,rx,ry,rtheta,rspacing / avg1,avg2 / sds - ) + super(DroneDataCollector, self).__init__( + *args, + column_names=v3rx_column_names(nthetas=kwargs["yaml_config"]["n-thetas"]), + **kwargs, + ) def write_to_record_matrix(self, thread_idx, record_idx, read_thread: ThreadedRX): current_pos_heading_and_time = ( @@ -345,10 +454,15 @@ def write_to_record_matrix(self, thread_idx, record_idx, read_thread: ThreadedRX class FakeDroneDataCollector(DataCollector): def __init__(self, *args, **kwargs): - super(FakeDroneDataCollector, self).__init__(*args, **kwargs) - self.column_names = v3rx_column_names(nthetas=self.yaml_config["n-thetas"]) + super(FakeDroneDataCollector, self).__init__( + *args, + column_names=v3rx_column_names(nthetas=kwargs["yaml_config"]["n-thetas"]), + **kwargs, + ) - def write_to_record_matrix(self, thread_idx, record_idx, read_thread: ThreadedRX): + def write_to_record_matrix( + self, thread_idx, record_idx, read_thread: FakeThreadedRX + ): self.record_matrix[thread_idx, record_idx] = prepare_record_entry_v3( ds=read_thread.data, current_pos_heading_and_time={ @@ -358,6 +472,47 @@ def write_to_record_matrix(self, thread_idx, record_idx, read_thread: ThreadedRX }, ) + def radios_to_online(self): + rx_configs = [] + for receiver in self.yaml_config["receivers"]: + rx_config = ReceiverConfig( + lo=receiver["f-carrier"], + rf_bandwidth=receiver["bandwidth"], + sample_rate=receiver["f-sampling"], + gains=[receiver["rx-gain"], receiver["rx-gain"]], + gain_control_modes=[ + receiver["rx-gain-mode"], + receiver["rx-gain-mode"], + ], + enabled_channels=[0, 1], + buffer_size=receiver["buffer-size"], + intermediate=receiver["f-intermediate"], + uri=receiver["receiver-uri"], + rx_spacing=receiver["antenna-spacing-m"], + rx_theta_in_pis=receiver["theta-in-pis"], + motor_channel=( + receiver["motor_channel"] if "motor_channel" in receiver else None + ), + rx_buffers=receiver["rx-buffers"], + ) + rx_configs.append(rx_config) + + self.read_threads = [] + time_offset = time.time() + for receiver_idx, rx_config in enumerate(rx_configs): + read_thread = FakeThreadedRX( + thread_idx=receiver_idx, + rx_config=rx_config, + time_offset=time_offset, + nthetas=self.yaml_config["n-thetas"], + ) + read_thread.start_read_thread() + self.read_threads.append(read_thread) + + self.collector_thread = threading.Thread( + target=self.run_collector_thread, daemon=True + ) + class GrblDataCollector(DataCollector): def __init__(self, *args, **kwargs): diff --git a/spf/mavlink_radio_collection.py b/spf/mavlink_radio_collection.py index 4335c233..a12b37e4 100644 --- a/spf/mavlink_radio_collection.py +++ b/spf/mavlink_radio_collection.py @@ -181,12 +181,6 @@ def filenames_from_time_in_seconds(time_in_seconds, temp_dir_name, yaml_config): ) drone.start() - - data_collector = DroneDataCollector( - filename_npy=temp_filenames["npy"], - yaml_config=yaml_config, - position_controller=drone, - ) else: drone = Drone( None, @@ -194,6 +188,14 @@ def filenames_from_time_in_seconds(time_in_seconds, temp_dir_name, yaml_config): distance_finder=distance_finder, fake=True, ) + + if not args.fake_radio: + data_collector = DroneDataCollector( + filename_npy=temp_filenames["npy"], + yaml_config=yaml_config, + position_controller=drone, + ) + else: data_collector = FakeDroneDataCollector( filename_npy=temp_filenames["npy"], yaml_config=yaml_config, @@ -201,8 +203,7 @@ def filenames_from_time_in_seconds(time_in_seconds, temp_dir_name, yaml_config): ) logging.info("MavRadioCollection: Radios online...") - if not args.fake_radio: - data_collector.radios_to_online() # blocking + data_collector.radios_to_online() # blocking while not args.fake_drone and not drone.has_planner_started_moving(): logging.info( @@ -225,16 +226,9 @@ def filenames_from_time_in_seconds(time_in_seconds, temp_dir_name, yaml_config): f"MavRadioCollection: Current system time: {system_time} current gps time {gps_time}" ) - if not args.fake_radio: - data_collector.start() - while data_collector.is_collecting(): - time.sleep(5) - else: - if args.run_for_seconds == 0: - while not args.exit: - time.sleep(5) - else: - time.sleep(args.run_for_seconds) + data_collector.start() + while data_collector.is_collecting(): + time.sleep(5) # we finished lets move files out to final positions diff --git a/tests/manual_tests.sh b/tests/manual_tests.sh index 1f2e0c96..b1b66367 100644 --- a/tests/manual_tests.sh +++ b/tests/manual_tests.sh @@ -14,7 +14,7 @@ function mavlink_controller() { } function mavlink_radio_collection () { - $py ${repo_root}/spf/mavlink_radio_collection.py -c ${repo_root}/tests/rover_config.yaml -m ${repo_root}/tests/device_mapping --fake-radio $@ > mavlink_radio_collection.log 2>&1 + $py ${repo_root}/spf/mavlink_radio_collection.py -c ${repo_root}/tests/rover_config.yaml -m ${repo_root}/tests/device_mapping --fake-radio $@ > mavlink_radio_collection.log -n 500 2>&1 } echo "Test time since boot + reboot" diff --git a/tests/rover_config.yaml b/tests/rover_config.yaml index d9d53169..d33eada5 100644 --- a/tests/rover_config.yaml +++ b/tests/rover_config.yaml @@ -15,7 +15,7 @@ distance-finder: #USB 2 (Radio A) | USB 1 (Radio B) receivers: #radio A - - receiver-uri: usb:1.1.5 + - receiver-port: 2 theta-in-pis: 0 antenna-spacing-m: 0.05075 # 50.75 mm nelements: 2 @@ -28,13 +28,27 @@ receivers: f-carrier: 2467000000 #2.5e9 f-sampling: 16000000 # 16.0e6 bandwidth: 300000 #3.0e5 + #radio B + - receiver-port: 1 + theta-in-pis: 0.5 + antenna-spacing-m: 0.05075 # 50.75 mm + nelements: 2 + array-type: linear + rx-gain-mode: fast_attack + rx-buffers: 4 + rx-gain: -3 + buffer-size: 4096 # 2**12 + f-intermediate: 100000 #1.0e5 + f-carrier: 2467000000 #2.5e9 + f-sampling: 16000000 # 16.0e6 + bandwidth: 300000 #3.0e5 n-thetas: 65 -n-records-per-receiver: 2000 +n-records-per-receiver: 500 width: 4000 calibration-frames: 800 routine: null drone-uri: tcp:127.0.0.1:14591 -dry-run: False +dry-run: False \ No newline at end of file