diff --git a/README.md b/README.md index 09be82b..78b97a0 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -[![Build Status](https://travis-ci.org/CINPLA/pyopenephys.svg?branch=master)](https://travis-ci.org/CINPLA/pyopenephys) +[![Build Status](https://github.com/CINPLA/pyopenephys/actions/workflows/python-package.yml/badge.svg)](https://github.com/CINPLA/pyopenephys/actions/workflows/python-package.yml/badge.svg) [![Project Status: Active - The project has reached a stable, usable state and is being actively developed.](http://www.repostatus.org/badges/latest/active.svg)](http://www.repostatus.org/#active) # pyopenephys diff --git a/pyopenephys/core.py b/pyopenephys/core.py index e4b1aa5..e123e3c 100644 --- a/pyopenephys/core.py +++ b/pyopenephys/core.py @@ -15,21 +15,37 @@ import struct import platform import xmltodict -from distutils.version import LooseVersion +from packaging.version import parse from pathlib import Path import warnings import json from natsort import natsorted import re -from .tools import * -from .openephys_tools import * +from .openephys_tools import loadContinuous, loadEvents, loadSpikes +from .tools import read_analog_binary_signals, clip_times, clip_anas, clip_events, clip_spiketrains, clip_tracking # For settings.xml files prior to 0.4.x, the sampling rate was an enumeration of these options -_enumerated_sample_rates = (1000, 1250, 1500, 2000, 2500, 3000, 1e4 / 3, - 4000, 5000, 6250, 8000, 10000, 12500, 15000, - 20000, 25000, 30000) +_enumerated_sample_rates = ( + 1000, + 1250, + 1500, + 2000, + 2500, + 3000, + 1e4 / 3, + 4000, + 5000, + 6250, + 8000, + 10000, + 12500, + 15000, + 20000, + 25000, + 30000, +) class AnalogSignal: @@ -42,9 +58,7 @@ def __init__(self, channel_ids, signal, times, gains, channel_names=None, sample self.channel_names = channel_names def __str__(self): - return "".format( - self.signal.shape - ) + return "".format(self.signal.shape) class TrackingData: @@ -58,9 +72,7 @@ def __init__(self, times, x, y, width, height, channels, metadata): self.metadata = metadata def __str__(self): - return "".format( - self.times.shape, self.x.shape - ) + return "".format(self.times.shape, self.x.shape) class EventData: @@ -88,8 +100,7 @@ def __str__(self): class SpikeTrain: - def __init__(self, times, waveforms, - electrode_indices, cluster, metadata): + def __init__(self, times, waveforms, electrode_indices, cluster, metadata): assert len(waveforms.shape) == 3 or len(waveforms.shape) == 2 self.times = times self.waveforms = waveforms @@ -130,47 +141,54 @@ class File: Class for reading experimental data from an OpenEphys dataset. """ - def __init__(self, foldername): + def __init__(self, foldername, verbose=False): self._absolute_foldername = foldername self._path, self.relative_foldername = os.path.split(foldername) + self._verbose = verbose # figure out format files = [f for f in sorted(os.listdir(self._absolute_foldername))] - if np.any([f.startswith('Continuous') for f in files]): - self.format = 'openephys' - cont_files = [f for f in sorted(os.listdir(self._absolute_foldername)) - if f.startswith('Continuous')] + if np.any([f.startswith("Continuous") for f in files]): + self.format = "openephys" + cont_files = [f for f in sorted(os.listdir(self._absolute_foldername)) if f.startswith("Continuous")] exp_ids = [] for con in cont_files: - if len(con.split('_')) == 2: + if len(con.split("_")) == 2: exp_ids.append(1) else: - exp_ids.append(int(con.split('_')[-1][0])) + exp_ids.append(int(con.split("_")[-1][0])) self._experiments = [] for id in exp_ids: - self._experiments.append(Experiment(self._absolute_foldername, id, self)) - - elif np.any([f.startswith('experiment') for f in files]): - self.format = 'binary' - experiments_names = [f for f in sorted(os.listdir(self._absolute_foldername)) - if os.path.isdir(op.join(self._absolute_foldername, f)) - and 'experiment' in f] + self._experiments.append(Experiment(self._absolute_foldername, id, self, verbose=verbose)) + + elif np.any([f.startswith("experiment") for f in files]): + self.format = "binary" + experiments_names = [ + f + for f in sorted(os.listdir(self._absolute_foldername)) + if os.path.isdir(op.join(self._absolute_foldername, f)) and "experiment" in f + ] exp_ids = [int(exp[-1]) for exp in experiments_names] self._experiments = [] - for (rel_path, id) in zip(experiments_names, exp_ids): + for rel_path, id in zip(experiments_names, exp_ids): self._experiments.append(Experiment(op.join(self._absolute_foldername, rel_path), id, self)) - elif list(Path(self._absolute_foldername).rglob('structure.oebin')): + elif list(Path(self._absolute_foldername).rglob("structure.oebin")): # 'binary' format could also be detected with the existence of `structure.oebin` and `continuous` folder under recordings - oebin_files = list(Path(self._absolute_foldername).rglob('structure.oebin')) - if not np.all([(oebin_file.parent / 'continuous').exists() for oebin_file in oebin_files]): - raise FileNotFoundError(f"Could not find paired 'continuous' file for each oebin in {oebin_file.parent}") + oebin_files = list(Path(self._absolute_foldername).rglob("structure.oebin")) + if not np.all([(oebin_file.parent / "continuous").exists() for oebin_file in oebin_files]): + raise FileNotFoundError( + f"Could not find paired 'continuous' file for each oebin in {oebin_files[0].parent}" + ) - self.format = 'binary' + self.format = "binary" experiments_names = sorted(set([oebin_file.parent.parent.name for oebin_file in oebin_files])) - exp_ids = [int(exp[-1]) if exp.startswith('experiment') else exp_idx for exp_idx, exp in enumerate(experiments_names)] + exp_ids = [ + int(exp[-1]) if exp.startswith("experiment") else exp_idx + for exp_idx, exp in enumerate(experiments_names) + ] self._experiments = [] - for (rel_path, id) in zip(experiments_names, exp_ids): + for rel_path, id in zip(experiments_names, exp_ids): self._experiments.append(Experiment(op.join(self._absolute_foldername, rel_path), id, self)) else: raise Exception("Only 'binary' and 'openephys' format are supported by pyopenephys") @@ -189,7 +207,7 @@ def experiments(self): class Experiment: - def __init__(self, path, id, file): + def __init__(self, path, id, file, verbose=False): self.file = file self.id = id self.sig_chain = dict() @@ -197,41 +215,53 @@ def __init__(self, path, id, file): self._recordings = [] self.settings = None self.acquisition_system = None + self._verbose = verbose - if self.file.format == 'openephys': + if self.file.format == "openephys": self._path = self._absolute_foldername self._read_settings(id) # retrieve number of recordings if self.acquisition_system is not None: if self.id == 1: - contFile = [f for f in os.listdir(self._absolute_foldername) if 'continuous' in f and 'CH' in f - and len(f.split('_')) == 2][0] + contFile = [ + f + for f in os.listdir(self._absolute_foldername) + if "continuous" in f and "CH" in f and len(f.split("_")) == 2 + ][0] else: - contFile = [f for f in os.listdir(self._absolute_foldername) if 'continuous' in f and 'CH' in f - and '_' + str(self.id) in f][0] + contFile = [ + f + for f in os.listdir(self._absolute_foldername) + if "continuous" in f and "CH" in f and "_" + str(self.id) in f + ][0] data = loadContinuous(op.join(self._absolute_foldername, contFile)) - rec_ids = np.unique(data['recordingNumber']) + rec_ids = np.unique(data["recordingNumber"]) for rec_id in rec_ids: - self._recordings.append(Recording(self._absolute_foldername, int(rec_id), self)) + self._recordings.append(Recording(self._absolute_foldername, int(rec_id), self, verbose=verbose)) else: - self._recordings.append(Recording(self._absolute_foldername, int(self.id), self)) + self._recordings.append(Recording(self._absolute_foldername, int(self.id), self, verbose=verbose)) - elif self.file.format == 'binary': - if (Path(path) / 'settings.xml').exists(): + elif self.file.format == "binary": + if (Path(path) / "settings.xml").exists(): self._path = path self._read_settings(1) else: self._path = op.dirname(path) self._read_settings(id) - recording_names = natsorted([f for f in os.listdir(self._absolute_foldername) - if os.path.isdir(op.join(self._absolute_foldername, f)) - and 'recording' in f]) + recording_names = natsorted( + [ + f + for f in os.listdir(self._absolute_foldername) + if os.path.isdir(op.join(self._absolute_foldername, f)) and "recording" in f + ] + ) rec_ids = [int(rec[-1]) for rec in recording_names] - for (rel_path, id) in zip(recording_names, rec_ids): - self._recordings.append(Recording(op.join(self._absolute_foldername, rel_path), id, - self)) + for rel_path, id in zip(recording_names, rec_ids): + self._recordings.append( + Recording(op.join(self._absolute_foldername, rel_path), id, self, verbose=verbose) + ) @property def absolute_foldername(self): @@ -250,19 +280,23 @@ def recordings(self): return self._recordings def _read_settings(self, id): - print('Loading Open-Ephys: reading settings...') + if self._verbose: + print("Loading Open-Ephys: reading settings...") if id == 1: - set_fname = [fname for fname in os.listdir(self._path) - if fname == 'settings.xml'] + set_fname = [fname for fname in os.listdir(self._path) if fname == "settings.xml"] else: - set_fname = [fname for fname in os.listdir(self._path) - if fname.startswith('settings') and fname.endswith('.xml') and str(id) in fname] + set_fname = [ + fname + for fname in os.listdir(self._path) + if fname.startswith("settings") and fname.endswith(".xml") and str(id) in fname + ] if not len(set_fname) == 1: - if self.file.format == 'binary': - raise IOError(f'Unique settings file not found in {self._path}') + if self.file.format == "binary": + raise IOError(f"Unique settings file not found in {self._path}") else: - print("settings.xml not found. Can't load signal chain information") + if self._verbose: + print("settings.xml not found. Can't load signal chain information") self._set_fname = None self.sig_chain = None self.setting = None @@ -273,99 +307,102 @@ def _read_settings(self, id): self._set_fname = op.join(self._path, set_fname[0]) with open(self._set_fname) as f: xmldata = f.read() - self.settings = xmltodict.parse(xmldata)['SETTINGS'] - is_v4 = LooseVersion(self.settings['INFO']['VERSION']) >= LooseVersion('0.4.0.0') - is_v6 = LooseVersion(self.settings['INFO']['VERSION']) >= LooseVersion('0.6.0') + self.settings = xmltodict.parse(xmldata)["SETTINGS"] + is_v4 = parse(self.settings["INFO"]["VERSION"]) >= parse("0.4.0.0") + is_v6 = parse(self.settings["INFO"]["VERSION"]) >= parse("0.6.0") # read date in US format - if platform.system() == 'Windows': - locale.setlocale(locale.LC_ALL, 'english') - elif platform.system() == 'Darwin': + if platform.system() == "Windows": + locale.setlocale(locale.LC_ALL, "english") + elif platform.system() == "Darwin": # bad hack... try: - locale.setlocale(locale.LC_ALL, 'en_US.UTF8') + locale.setlocale(locale.LC_ALL, "en_US.UTF8") except Exception: pass else: - locale.setlocale(locale.LC_ALL, 'en_US.UTF8') - self._start_datetime = datetime.strptime(self.settings['INFO']['DATE'], '%d %b %Y %H:%M:%S') + locale.setlocale(locale.LC_ALL, "en_US.UTF8") + self._start_datetime = datetime.strptime(self.settings["INFO"]["DATE"], "%d %b %Y %H:%M:%S") self._channel_info = {} self.nchan = 0 - if isinstance(self.settings['SIGNALCHAIN'], list): - sigchain_iter = self.settings['SIGNALCHAIN'] + if isinstance(self.settings["SIGNALCHAIN"], list): + sigchain_iter = self.settings["SIGNALCHAIN"] else: - sigchain_iter = [self.settings['SIGNALCHAIN']] + sigchain_iter = [self.settings["SIGNALCHAIN"]] for sigchain in sigchain_iter: - if isinstance(sigchain['PROCESSOR'], list): - processor_iter = sigchain['PROCESSOR'] + if isinstance(sigchain["PROCESSOR"], list): + processor_iter = sigchain["PROCESSOR"] else: - processor_iter = [sigchain['PROCESSOR']] + processor_iter = [sigchain["PROCESSOR"]] for processor in processor_iter: processor_node_id = processor.get("@nodeId", processor.get("@NodeId")) if processor_node_id is None: raise KeyError('Neither "@nodeId" nor "@NodeId" key found') - self.sig_chain.update({processor['@name']: processor_node_id}) + self.sig_chain.update({processor["@name"]: processor_node_id}) - if is_v6 and 'Neuropix-PXI' in processor['@name']: + if is_v6 and "Neuropix-PXI" in processor["@name"]: # No explicit "is_source" or "is_sink" in v0.6.0+ # no "CHANNELS" details, thus the "gain" has to be inferred elsewhere - self.acquisition_system = processor['@name'].split('/')[-1] - self._channel_info['gain'] = {} + self.acquisition_system = processor["@name"].split("/")[-1] + self._channel_info["gain"] = {} continue if is_v4: - is_source = 'CHANNEL_INFO' in processor.keys() and processor['@isSource'] == '1' - is_source_alt = 'CHANNEL' in processor.keys() and processor['@isSource'] == '1' + is_source = "CHANNEL_INFO" in processor.keys() and processor["@isSource"] == "1" + is_source_alt = "CHANNEL" in processor.keys() and processor["@isSource"] == "1" else: - is_source = 'CHANNEL_INFO' in processor.keys() and 'Source' in processor['@name'] - is_source_alt = 'CHANNEL' in processor.keys() and 'Source' in processor['@name'] + is_source = "CHANNEL_INFO" in processor.keys() and "Source" in processor["@name"] + is_source_alt = "CHANNEL" in processor.keys() and "Source" in processor["@name"] if is_source: # recorder - self.acquisition_system = processor['@name'].split('/')[-1] - self._channel_info['gain'] = {} + self.acquisition_system = processor["@name"].split("/")[-1] + self._channel_info["gain"] = {} # gain for all channels - gain = {ch['@number']: float(ch['@gain']) - for chs in processor['CHANNEL_INFO'].values() - for ch in chs} - for chan in processor['CHANNEL']: - if chan['SELECTIONSTATE']['@record'] == '1': + gain = { + ch["@number"]: float(ch["@gain"]) + for chs in processor["CHANNEL_INFO"].values() + for ch in chs + } + for chan in processor["CHANNEL"]: + if chan["SELECTIONSTATE"]["@record"] == "1": self.nchan += 1 - chnum = chan['@number'] - self._channel_info['gain'][chnum] = gain[chnum] + chnum = chan["@number"] + self._channel_info["gain"][chnum] = gain[chnum] elif is_source_alt: # recorder self._ephys = True - self.acquisition_system = processor['@name'].split('/')[-1] - self._channel_info['gain'] = {} + self.acquisition_system = processor["@name"].split("/")[-1] + self._channel_info["gain"] = {} - for chan in processor['CHANNEL']: - if chan['SELECTIONSTATE']['@record'] == '1': + for chan in processor["CHANNEL"]: + if chan["SELECTIONSTATE"]["@record"] == "1": self.nchan += 1 - chnum = chan['@number'] - self._channel_info['gain'][chnum] = 1 + chnum = chan["@number"] + self._channel_info["gain"][chnum] = 1 # Check openephys format if is_v4: - recorder = self.settings['CONTROLPANEL']['@recordEngine'] + recorder = self.settings["CONTROLPANEL"]["@recordEngine"] else: - recorder_idx = int(self.settings['CONTROLPANEL']['@recordEngine']) - 1 - recorder = self.settings['RECORDENGINES']['ENGINE'][recorder_idx]['@id'] - if recorder == 'OPENEPHYS': - self.format = 'openephys' - elif recorder in ('BINARY', 'RAWBINARY'): - self.format = 'binary' + recorder_idx = int(self.settings["CONTROLPANEL"]["@recordEngine"]) - 1 + recorder = self.settings["RECORDENGINES"]["ENGINE"][recorder_idx]["@id"] + if recorder == "OPENEPHYS": + self.format = "openephys" + elif recorder in ("BINARY", "RAWBINARY"): + self.format = "binary" else: self.format = None - print('Decoding data from ', self.format, ' format') + if self._verbose: + print("Decoding data from ", self.format, " format") if self.acquisition_system is not None: - recorded_channels = sorted([int(chan) for chan in - self._channel_info['gain'].keys()]) - self._channel_info['channels'] = recorded_channels + recorded_channels = sorted([int(chan) for chan in self._channel_info["gain"].keys()]) + self._channel_info["channels"] = recorded_channels + class Recording: - def __init__(self, path, id, experiment): + def __init__(self, path, id, experiment, verbose=False): self.experiment = experiment self.absolute_foldername = Path(path) self.format = experiment.format @@ -373,12 +410,13 @@ def __init__(self, path, id, experiment): self.nchan = experiment.nchan self.sig_chain = experiment.sig_chain self.id = id + self._verbose = verbose self._oebin = None if self.format == "binary": - events_folders = [f for f in self.absolute_foldername.iterdir() if 'events' in f.name] - continuous_folders = [f for f in self.absolute_foldername.iterdir() if 'continuous' in f.name] - spikes_folders = [f for f in self.absolute_foldername.iterdir() if 'spikes' in f.name] + events_folders = [f for f in self.absolute_foldername.iterdir() if "events" in f.name] + continuous_folders = [f for f in self.absolute_foldername.iterdir() if "continuous" in f.name] + spikes_folders = [f for f in self.absolute_foldername.iterdir() if "spikes" in f.name] self._events_folder = None if len(events_folders) == 1: @@ -396,15 +434,18 @@ def __init__(self, path, id, experiment): elif len(spikes_folders) > 1: raise Exception("More than one spikes folder found!") - if LooseVersion(self.experiment.settings['INFO']['VERSION']) >= LooseVersion('0.4.4.0'): - oebin_files = [f for f in self.absolute_foldername.iterdir() if 'oebin' in f.name] + if parse(self.experiment.settings["INFO"]["VERSION"]) >= parse("0.4.4.0"): + oebin_files = [f for f in self.absolute_foldername.iterdir() if "oebin" in f.name] if len(oebin_files) == 1: - print("Reading oebin file") - with oebin_files[0].open('r') as f: + if self._verbose: + print("Reading oebin file") + with oebin_files[0].open("r") as f: self._oebin = json.load(f) elif len(oebin_files) == 0: - raise FileNotFoundError(f"'structure.oebin' file not found in ({self.absolute_foldername})! Impossible to retrieve configuration " - "information") + raise FileNotFoundError( + f"'structure.oebin' file not found in ({self.absolute_foldername})! Impossible to retrieve configuration " + "information" + ) else: raise Exception("Multiple oebin files found. Impossible to retrieve configuration information") @@ -447,7 +488,7 @@ def duration(self): if self.experiment.acquisition_system is not None: self._duration = self.analog_signals[0].times[-1] - self.analog_signals[0].times[0] return self._duration - if 'Sources/Tracking Port' in self.sig_chain.keys(): + if "Sources/Tracking Port" in self.sig_chain.keys(): self._duration = self.tracking[0].times[-1] - self.tracking[0].times[0] return self._duration else: @@ -463,9 +504,11 @@ def sample_rate(self): if np.all([np.isclose(self._processor_sample_rates[0], sr) for sr in self._processor_sample_rates[1:]]): return self._processor_sample_rates[0] * pq.Hz else: - warnings.warn("Multiple streams with different sample rates found. To access the sample rate for " - "each stream use the 'sample_rate' field of the AnalogSignal object. " - "Returning maximum of the first stream") + warnings.warn( + "Multiple streams with different sample rates found. To access the sample rate for " + "each stream use the 'sample_rate' field of the AnalogSignal object. " + "Returning maximum of the first stream" + ) return np.max(self._processor_sample_rates) * pq.Hz else: return self._software_sample_rate * pq.Hz @@ -478,13 +521,16 @@ def start_time(self): if len(self._start_times) == 1: return self._start_times[0] else: - if np.all([np.isclose(self._start_times[0].magnitude, stime.magnitude) - for stime in self._start_times[1:]]): + if np.all( + [np.isclose(self._start_times[0].magnitude, stime.magnitude) for stime in self._start_times[1:]] + ): return self._start_times[0] else: - warnings.warn("Multiple streams with different start times found. To access the sample rate for each " - "stream use the 'start_time' field of the AnalogSignal object." - "Returning start_time of first stream") + warnings.warn( + "Multiple streams with different start times found. To access the sample rate for each " + "stream use the 'start_time' field of the AnalogSignal object." + "Returning start_time of first stream" + ) return self._start_times[0] else: return self._software_start_frame / self._software_sample_rate * pq.s @@ -547,90 +593,89 @@ def _read_sync_message(self): info = dict() stimes = [] - if self.format == 'binary': - sync_messagefile = [f for f in self.absolute_foldername.iterdir() if 'sync_messages' in f.name] + if self.format == "binary": + sync_messagefile = [f for f in self.absolute_foldername.iterdir() if "sync_messages" in f.name] if sync_messagefile: sync_messagefile = sync_messagefile[0] else: warnings.warn(f'No "sync_messages" file found for binary format in {self.absolute_foldername}') return info - elif self.format == 'openephys': + elif self.format == "openephys": if self.experiment.id == 1: - sync_messagefile = self.absolute_foldername / 'messages.events' + sync_messagefile = self.absolute_foldername / "messages.events" else: - sync_messagefile = self.absolute_foldername / f'messages_{self.experiment.id}.events' + sync_messagefile = self.absolute_foldername / f"messages_{self.experiment.id}.events" - is_v4 = LooseVersion(self.experiment.settings['INFO']['VERSION']) >= LooseVersion('0.4.0.0') - is_v6 = LooseVersion(self.experiment.settings['INFO']['VERSION']) >= LooseVersion('0.6.0') + is_v4 = parse(self.experiment.settings["INFO"]["VERSION"]) >= parse("0.4.0.0") + is_v6 = parse(self.experiment.settings["INFO"]["VERSION"]) >= parse("0.6.0") with sync_messagefile.open("r") as fh: - info['_processor_names'] = [] - info['_processor_sample_rates'] = [] - info['_processor_start_frames'] = [] - info['messages'] = [] - info['_software_sample_rate'] = None - info['_software_start_frame'] = None + info["_processor_names"] = [] + info["_processor_sample_rates"] = [] + info["_processor_start_frames"] = [] + info["messages"] = [] + info["_software_sample_rate"] = None + info["_software_start_frame"] = None while True: sync_msg_line = fh.readline() - spl = [s.strip('\x00') for s in sync_msg_line.split()] + spl = [s.strip("\x00") for s in sync_msg_line.split()] if not spl: break - if 'Software' in spl: + if "Software" in spl: self.processor = False if is_v4: - stime = spl[-1].split('@') - hz_start = stime[-1].find('Hz') + stime = spl[-1].split("@") + hz_start = stime[-1].find("Hz") sr = float(stime[-1][:hz_start]) - info['_software_sample_rate'] = sr - info['_software_start_frame'] = int(stime[0]) + info["_software_sample_rate"] = sr + info["_software_start_frame"] = int(stime[0]) else: # There's no apparent encoding of a distinct software sampling rate, # so assume it is the maximum processor rate (set later) - info['_software_start_frame'] = int(spl[0]) - elif 'Processor:' in spl: + info["_software_start_frame"] = int(spl[0]) + elif "Processor:" in spl: self.processor = True if is_v4: - stime = spl[-1].split('@') - hz_start = stime[-1].find('Hz') + stime = spl[-1].split("@") + hz_start = stime[-1].find("Hz") stimes.append(float(stime[-1][:hz_start])) sr = float(stime[-1][:hz_start]) - info['_processor_sample_rates'].append(sr) - info['_processor_start_frames'].append(int(stime[0])) + info["_processor_sample_rates"].append(sr) + info["_processor_start_frames"].append(int(stime[0])) else: proc_id = spl[2] - for proc in self.experiment.settings['SIGNALCHAIN']['PROCESSOR']: - if proc['@NodeId'] != proc_id: + for proc in self.experiment.settings["SIGNALCHAIN"]["PROCESSOR"]: + if proc["@NodeId"] != proc_id: continue - encoded_rate = proc['EDITOR']['@SampleRate'] + encoded_rate = proc["EDITOR"]["@SampleRate"] sr = float(_enumerated_sample_rates[int(encoded_rate) - 1]) - info['_processor_sample_rates'].append(sr) - info['_processor_start_frames'].append(int(spl[-1])) - elif sync_msg_line.startswith('Start Time for') and is_v6: + info["_processor_sample_rates"].append(sr) + info["_processor_start_frames"].append(int(spl[-1])) + elif sync_msg_line.startswith("Start Time for") and is_v6: self.processor = True - match = re.match('Start Time for (.*) @ (\d+) Hz: (\d+)', sync_msg_line) + match = re.match(r"Start Time for (.*) @ (\d+) Hz: (\d+)", sync_msg_line) p_name, sr, stime = match.groups() - info['_processor_names'].append(p_name) - info['_processor_sample_rates'].append(float(sr)) - info['_processor_start_frames'].append(int(stime)) + info["_processor_names"].append(p_name) + info["_processor_sample_rates"].append(float(sr)) + info["_processor_start_frames"].append(int(stime)) else: - message = {'time': int(spl[0]), - 'message': ' '.join(spl[1:])} - info['messages'].append(message) + message = {"time": int(spl[0]), "message": " ".join(spl[1:])} + info["messages"].append(message) if not is_v4: - info['_software_sample_rate'] = max(info['_processor_sample_rates']) + info["_software_sample_rate"] = max(info["_processor_sample_rates"]) return info def _read_messages(self): - if self.format == 'binary': + if self.format == "binary": if self._events_folder is not None: - message_folder = [f for f in self._events_folder.iterdir() if 'Message_Center' in f.name][0] - text_groups = [f.parent for f in Path(message_folder).rglob('*text.npy')] - - if self.format == 'binary': + message_folder = [f for f in self._events_folder.iterdir() if "Message_Center" in f.name][0] + text_groups = [f.parent for f in Path(message_folder).rglob("*text.npy")] + + if self.format == "binary": for tg in text_groups: - text = np.load(tg / 'text.npy') - channels = np.load(tg / 'channels.npy') - ts = _load_timestamps(tg / 'timestamps.npy', self.sample_rate) + text = np.load(tg / "text.npy") + channels = np.load(tg / "channels.npy") + ts = _load_timestamps(tg / "timestamps.npy", self.sample_rate) ts -= self.start_time if len(text) > 0: @@ -641,41 +686,46 @@ def _read_messages(self): text=t.decode("utf-8"), ) self._messages.append(message_data) - elif self.format == 'openephys': + elif self.format == "openephys": pass self._message_dirty = False def _read_events(self): - if self.format == 'binary': + if self.format == "binary": if self._events_folder is not None: events = [] processor_folders = [] if self._oebin is not None: - if 'events' in self._oebin.keys(): + if "events" in self._oebin.keys(): events = self._oebin["events"] if len(events) > 0: processor_folders = [] for ev in events: # other methods to read tracking and messages - if 'Tracking_Port' not in ev['folder_name'] and 'Message_Center' not in ev['folder_name']: - processor_folders.append((self._events_folder / ev['folder_name']).parent) + if "Tracking_Port" not in ev["folder_name"] and "Message_Center" not in ev["folder_name"]: + processor_folders.append((self._events_folder / ev["folder_name"]).parent) else: - processor_folders = [f for f in self._events_folder.iterdir() if 'Tracking_Port' not in f.name and - 'Message_Center' not in f.name and not f.name.startswith('.')] + processor_folders = [ + f + for f in self._events_folder.iterdir() + if "Tracking_Port" not in f.name + and "Message_Center" not in f.name + and not f.name.startswith(".") + ] for processor_folder in processor_folders: # Read TTL groups - TTL_groups = [f for f in processor_folder.iterdir() if 'TTL' in f.name] + TTL_groups = [f for f in processor_folder.iterdir() if "TTL" in f.name] for ttl in TTL_groups: - full_words = np.load(ttl / 'full_words.npy') - ts = _load_timestamps(ttl / 'timestamps.npy', self.sample_rate) - channels = np.load(ttl / 'channels.npy').astype(int) + full_words = np.load(ttl / "full_words.npy") + ts = _load_timestamps(ttl / "timestamps.npy", self.sample_rate) + channels = np.load(ttl / "channels.npy").astype(int) unique_channels = np.unique(channels) - channel_states = np.load(ttl / 'channel_states.npy') - metadata_file = ttl / 'metadata.npy' + channel_states = np.load(ttl / "channel_states.npy") + metadata_file = ttl / "metadata.npy" if metadata_file.is_file(): metadata = np.load(metadata_file) else: @@ -706,18 +756,18 @@ def _read_events(self): full_words=fw_chans, processor=processor_folder_split[0], node_id=int(float(processor_folder_split[1])), - metadata=metadata_chan + metadata=metadata_chan, ) self._event_signals.append(event_data) # Read Binary groups - binary_groups = [f for f in processor_folder.iterdir() if 'binary' in f.name] + binary_groups = [f for f in processor_folder.iterdir() if "binary" in f.name] for bg in binary_groups: - full_words = np.load(bg / 'full_words.npy') - channels = np.load(bg / 'channels.npy').astype(int) - channel_states = np.load(bg / 'channel_states.npy') + full_words = np.load(bg / "full_words.npy") + channels = np.load(bg / "channels.npy").astype(int) + channel_states = np.load(bg / "channel_states.npy") channel_states = channel_states / np.max(channel_states).astype(int) - metadata_file = bg / 'metadata.npy' + metadata_file = bg / "metadata.npy" if metadata_file.is_file(): metadata = np.load(metadata_file) else: @@ -728,7 +778,7 @@ def _read_events(self): else: sample_rate = self.sample_rate - ts = _load_timestamps(bg / 'timestamps.npy', sample_rate) + ts = _load_timestamps(bg / "timestamps.npy", sample_rate) ts -= self.start_time processor_folder_split = processor_folder.name.split("-") @@ -741,23 +791,23 @@ def _read_events(self): full_words=full_words, processor=processor_folder_split[0], node_id=int(float(processor_folder_split[1])), - metadata=metadata + metadata=metadata, ) self._event_signals.append(event_data) - elif self.format == 'openephys': + elif self.format == "openephys": if self.experiment.id == 1: - ev_file = op.join(self.absolute_foldername, 'all_channels.events') + ev_file = op.join(self.absolute_foldername, "all_channels.events") else: - ev_file = op.join(self.absolute_foldername, 'all_channels_' + str(int(self.experiment.id)) + '.events') + ev_file = op.join(self.absolute_foldername, "all_channels_" + str(int(self.experiment.id)) + ".events") data = loadEvents(ev_file) - node_ids = np.unique(data['nodeId']).astype(int) + node_ids = np.unique(data["nodeId"]).astype(int) for node in node_ids: - idx_ev = np.where(data['nodeId'] == node)[0] - ts = data['timestamps'][idx_ev] / self.software_sample_rate - channels = data['channel'][idx_ev].astype(int) - channel_states = data['eventId'][idx_ev].astype(int) + idx_ev = np.where(data["nodeId"] == node)[0] + ts = data["timestamps"][idx_ev] / self.software_sample_rate + channels = data["channel"][idx_ev].astype(int) + channel_states = data["eventId"][idx_ev].astype(int) channel_states[channel_states == 0] = -1 node_id = int(float(node)) full_words = None @@ -771,7 +821,7 @@ def _read_events(self): full_words=full_words, processor=None, node_id=node_id, - metadata=metadata + metadata=metadata, ) self._event_signals.append(event_data) @@ -779,50 +829,44 @@ def _read_events(self): self._events_dirty = False def _read_tracking(self): - if 'Sources/Tracking Port' in self.sig_chain.keys(): - if self.format == 'binary': + if "Sources/Tracking Port" in self.sig_chain.keys(): + if self.format == "binary": # Check and decode files if self._events_folder is not None: - tracking_folder = [f for f in self._events_folder.iterdir() if 'Tracking_Port' in f.name][0] + tracking_folder = [f for f in self._events_folder.iterdir() if "Tracking_Port" in f.name][0] binary_groups = [f for f in tracking_folder.iterdir()] for bg in binary_groups: - data_array = np.load(bg / 'data_array.npy') - channels = np.load(bg / 'channels.npy') - metadata = np.load(bg / 'metadata.npy') - data_array = np.array([struct.unpack('4f', d) for d in data_array]) + data_array = np.load(bg / "data_array.npy") + channels = np.load(bg / "channels.npy") + metadata = np.load(bg / "metadata.npy") + data_array = np.array([struct.unpack("4f", d) for d in data_array]) if self.software_sample_rate is not None: sample_rate = self.software_sample_rate else: sample_rate = self.sample_rate - ts = _load_timestamps(bg / 'timestamps.npy', sample_rate) + ts = _load_timestamps(bg / "timestamps.npy", sample_rate) ts -= self.start_time if len(ts) > 0: x, y, w, h = data_array[:, 0], data_array[:, 1], data_array[:, 2], data_array[:, 3] tracking_data = TrackingData( - times=ts, - x=x, - y=y, - channels=channels, - metadata=metadata, - width=w, - height=h + times=ts, x=x, y=y, channels=channels, metadata=metadata, width=w, height=h ) self._tracking_signals.append(tracking_data) - elif self.format == 'openephys': - print("Unfortunately, tracking is not saved in 'openephys' format. Use 'binary' instead!") + elif self.format == "openephys": + warnings.warn("tracking is not saved in 'openephys' format. Use 'binary' instead!") else: - print("Tracking is not found!") + warnings.warn("Tracking is not found!") self._tracking_dirty = False def _read_analog_signals(self): self._analog_signals = [] if self.experiment.acquisition_system is not None: - if self.format == 'binary': + if self.format == "binary": # Check and decode files if self._continuous_folder is not None: # Fix THIS! @@ -833,16 +877,16 @@ def _read_analog_signals(self): data_folder = self._continuous_folder / cont["folder_name"] nchan = cont["num_channels"] sample_rate = cont["sample_rate"] - datfiles = [f for f in data_folder.iterdir() if f.name == 'continuous.dat'] + datfiles = [f for f in data_folder.iterdir() if f.name == "continuous.dat"] if len(datfiles) == 1: datfile = datfiles[0] with datfile.open("rb") as fh: anas, nsamples = read_analog_binary_signals(fh, nchan) - ts = _load_timestamps(data_folder / 'timestamps.npy', sample_rate) + ts = _load_timestamps(data_folder / "timestamps.npy", sample_rate) self._start_times.append(ts[0] * pq.s) if len(ts) != nsamples: - warnings.warn('timestamps and nsamples are different ({})!'.format(data_folder)) + warnings.warn("timestamps and nsamples are different ({})!".format(data_folder)) ts = np.arange(nsamples) / sample_rate else: ts -= ts[0] @@ -855,14 +899,16 @@ def _read_analog_signals(self): gains.append(ch["bit_volts"]) ts = ts * pq.s - self._analog_signals.append(AnalogSignal( - channel_ids=range(anas.shape[0]), - channel_names=channel_names, - signal=anas, - times=ts, - gains=gains, - sample_rate=sample_rate - )) + self._analog_signals.append( + AnalogSignal( + channel_ids=range(anas.shape[0]), + channel_names=channel_names, + signal=anas, + times=ts, + gains=gains, + sample_rate=sample_rate, + ) + ) elif len(datfiles) > 1: raise ValueError("Multiple '.dat' files in folder, expected 1") else: @@ -874,20 +920,20 @@ def _read_analog_signals(self): if len(processor_folders) > 1: for c in processor_folders: # only get source continuous processors - if 'Rhythm_FPGA' in c or 'Intan' in c or 'File' in c or "NPIX" in c: + if "Rhythm_FPGA" in c or "Intan" in c or "File" in c or "NPIX" in c: processor_folder = c else: processor_folder = processor_folders[0] filenames = [f for f in os.listdir(processor_folder)] - if any('.dat' in f for f in filenames): - datfile = [f for f in filenames if '.dat' in f and 'continuous' in f][0] + if any(".dat" in f for f in filenames): + datfile = [f for f in filenames if ".dat" in f and "continuous" in f][0] with open(op.join(processor_folder, datfile), "rb") as fh: anas, nsamples = read_analog_binary_signals(fh, self.nchan) - ts = _load_timestamps(processor_folder / 'timestamps.npy', sample_rate) + ts = _load_timestamps(processor_folder / "timestamps.npy", sample_rate) self._start_times.append(ts[0] * pq.s) if len(ts) != nsamples: - warnings.warn('timestamps and nsamples are different!') + warnings.warn("timestamps and nsamples are different!") ts = np.arange(nsamples) / self.sample_rate.magnitude else: ts -= ts[0] @@ -895,26 +941,34 @@ def _read_analog_signals(self): raise ValueError("'continuous.dat' should be in the folder") ts = ts * pq.s - self._analog_signals.append(AnalogSignal( - channel_ids=range(anas.shape[0]), - signal=anas, - times=ts, - sample_rate=self.sample_rate.magnitude, - gains=np.ones(anas.shape[0]) * fixed_gain - )) + self._analog_signals.append( + AnalogSignal( + channel_ids=range(anas.shape[0]), + signal=anas, + times=ts, + sample_rate=self.sample_rate.magnitude, + gains=np.ones(anas.shape[0]) * fixed_gain, + ) + ) - elif self.format == 'openephys': + elif self.format == "openephys": fixed_gain = 0.195 # Find continuous CH data if self.experiment.id == 1: - cont_files = [f for f in self.absolute_foldername.iterdir() if 'continuous' in f.name and - 'CH' in f.name and len(f.name.split('_')) == 2] + cont_files = [ + f + for f in self.absolute_foldername.iterdir() + if "continuous" in f.name and "CH" in f.name and len(f.name.split("_")) == 2 + ] else: - cont_files = [f for f in self.absolute_foldername.iterdir() if 'continuous' in f.name and - 'CH' in f.name and '_' + str(self.experiment.id) in f.name] + cont_files = [ + f + for f in self.absolute_foldername.iterdir() + if "continuous" in f.name and "CH" in f.name and "_" + str(self.experiment.id) in f.name + ] # sort channels - idxs = [int(x.name[x.name.find('CH') + 2: x.name.find('.')]) for x in cont_files] + idxs = [int(x.name[x.name.find("CH") + 2 : x.name.find(".")]) for x in cont_files] cont_files = list(np.array(cont_files)[np.argsort(idxs)]) if len(cont_files) > 0: @@ -923,20 +977,20 @@ def _read_analog_signals(self): for i_f, f in enumerate(cont_files): fullpath = f sig = loadContinuous(str(fullpath)) - block_len = int(sig['header']['blockLength']) - sample_rate = float(sig['header']['sampleRate']) + block_len = int(sig["header"]["blockLength"]) + sample_rate = float(sig["header"]["sampleRate"]) if anas.shape[0] < 1: - anas = sig['data'][None, :] + anas = sig["data"][None, :] else: - if sig['data'].size == anas[-1].size: - anas = np.append(anas, sig['data'][None, :], axis=0) + if sig["data"].size == anas[-1].size: + anas = np.append(anas, sig["data"][None, :], axis=0) else: - raise Exception('Channels must have the same number of samples') + raise Exception("Channels must have the same number of samples") if i_f == len(cont_files) - 1: # Recordings number - rec_num = sig['recordingNumber'] - timestamps = sig['timestamps'] + rec_num = sig["recordingNumber"] + timestamps = sig["timestamps"] idx_rec = np.where(rec_num == self.id)[0] if len(idx_rec) > 0: idx_start = idx_rec[0] @@ -951,39 +1005,39 @@ def _read_analog_signals(self): anas = anas[:, anas_start:anas_end] self._processor_sample_rates = [sample_rate] - self._analog_signals = [AnalogSignal( - channel_ids=range(anas.shape[0]), - signal=anas, - times=ts, - sample_rate=self.sample_rate.magnitude, - gains=np.ones(anas.shape[0]) * fixed_gain, - )] + self._analog_signals = [ + AnalogSignal( + channel_ids=range(anas.shape[0]), + signal=anas, + times=ts, + sample_rate=self.sample_rate.magnitude, + gains=np.ones(anas.shape[0]) * fixed_gain, + ) + ] else: - self._analog_signals = [AnalogSignal( - channel_ids=np.array([]), - signal=np.array([]), - times=np.array([]), - gains=0 - )] + self._analog_signals = [ + AnalogSignal(channel_ids=np.array([]), signal=np.array([]), times=np.array([]), gains=0) + ] self._analog_signals_dirty = False def _read_spiketrains(self): - if self.format == 'binary': + if self.format == "binary": # Check and decode files if self._spikes_folder is not None: - processor_folders = [f for f in self._spikes_folder.iterdir() if f.is_dir() and - not f.name.startswith('.')] + processor_folders = [ + f for f in self._spikes_folder.iterdir() if f.is_dir() and not f.name.startswith(".") + ] for processor_folder in processor_folders: - spike_groups = [f for f in processor_folder.iterdir() if not f.name.startswith('.')] + spike_groups = [f for f in processor_folder.iterdir() if not f.name.startswith(".")] for sg in spike_groups: - spike_clusters = np.load(sg / 'spike_clusters.npy') - spike_electrode_indices = np.load(sg / 'spike_electrode_indices.npy') - spike_times = np.load(sg / 'spike_times.npy') - spike_waveforms = np.load(sg / 'spike_waveforms.npy') + spike_clusters = np.load(sg / "spike_clusters.npy") + spike_electrode_indices = np.load(sg / "spike_electrode_indices.npy") + spike_times = np.load(sg / "spike_times.npy") + spike_waveforms = np.load(sg / "spike_waveforms.npy") - metadata_file = sg / 'metadata.npy' + metadata_file = sg / "metadata.npy" if metadata_file.is_file(): metadata = np.load(metadata_file) else: @@ -994,17 +1048,19 @@ def _read_spiketrains(self): clusters = np.unique(spike_clusters) for clust in clusters: idx = np.where(spike_clusters == clust)[0] - spiketrain = SpikeTrain(times=spike_times[idx], - waveforms=spike_waveforms[idx], - electrode_indices=spike_electrode_indices[idx], - cluster=clust, - metadata=metadata) + spiketrain = SpikeTrain( + times=spike_times[idx], + waveforms=spike_waveforms[idx], + electrode_indices=spike_electrode_indices[idx], + cluster=clust, + metadata=metadata, + ) self._spiketrains.append(spiketrain) - elif self.format == 'openephys': - filenames = [f for f in self.absolute_foldername.iterdir() if f.suffix == '.spikes'] + elif self.format == "openephys": + filenames = [f for f in self.absolute_foldername.iterdir() if f.suffix == ".spikes"] # order channels - idxs = [int(x.name.split('.')[1][x.name.split('.')[1].find('0n') + 2:]) for x in filenames] + idxs = [int(x.name.split(".")[1][x.name.split(".")[1].find("0n") + 2 :]) for x in filenames] filenames = list(np.array(filenames)[np.argsort(idxs)]) if len(filenames) != 0: @@ -1020,34 +1076,38 @@ def _read_spiketrains(self): data = loadSpikes(str(fpath)) if i_f == 0: - spike_clusters = np.max(data['sortedId'], axis=1).astype(int) - spike_times = data['timestamps'] - spike_electrode_indices = np.array([int(fname[fname.find('0n') + 2]) + 1] - * len(spike_clusters)) - spike_waveforms = data['spikes'].swapaxes(1, 2) + spike_clusters = np.max(data["sortedId"], axis=1).astype(int) + spike_times = data["timestamps"] + spike_electrode_indices = np.array([int(fname[fname.find("0n") + 2]) + 1] * len(spike_clusters)) + spike_waveforms = data["spikes"].swapaxes(1, 2) else: - spike_clusters = np.hstack((spike_clusters, np.max(data['sortedId'], axis=1).astype(int))) - spike_times = np.hstack((spike_times, data['timestamps'])) - spike_electrode_indices = np.hstack((spike_electrode_indices, - np.array([int(fname[fname.find('0n') + 2]) + 1] - * len(data['sortedId'])))) - spike_waveforms = np.vstack((spike_waveforms, data['spikes'].swapaxes(1, 2))) + spike_clusters = np.hstack((spike_clusters, np.max(data["sortedId"], axis=1).astype(int))) + spike_times = np.hstack((spike_times, data["timestamps"])) + spike_electrode_indices = np.hstack( + ( + spike_electrode_indices, + np.array([int(fname[fname.find("0n") + 2]) + 1] * len(data["sortedId"])), + ) + ) + spike_waveforms = np.vstack((spike_waveforms, data["spikes"].swapaxes(1, 2))) clusters = np.unique(spike_clusters) spike_times = spike_times / self.sample_rate spike_times -= self.start_time for clust in clusters: idx = np.where(spike_clusters == clust)[0] - spiketrain = SpikeTrain(times=spike_times[idx], - waveforms=spike_waveforms[idx], - electrode_indices=spike_electrode_indices[idx], - cluster=clust, - metadata=None) + spiketrain = SpikeTrain( + times=spike_times[idx], + waveforms=spike_waveforms[idx], + electrode_indices=spike_electrode_indices[idx], + cluster=clust, + metadata=None, + ) self._spiketrains.append(spiketrain) self._spiketrains_dirty = False - def clip_recording(self, clipping_times, start_end='start'): + def clip_recording(self, clipping_times, start_end="start"): """ Clips recording, including analog signals, events, spike trains, and tracking @@ -1088,19 +1148,19 @@ def clip_recording(self, clipping_times, start_end='start'): else: print("Clipping times are outside of timestamps range") else: - print('Empty clipping times list.') + print("Empty clipping times list.") def export_matlab(self, filename): from scipy import io as sio - dict_to_save = {'duration': self.duration.rescale('s'), 'timestamps': self.times.rescale('s')} + dict_to_save = {"duration": self.duration.rescale("s"), "timestamps": self.times.rescale("s")} if len(self.tracking) != 0: - dict_to_save.update({'tracking': np.array([[tr.x, tr.y] for tr in self.tracking])}) + dict_to_save.update({"tracking": np.array([[tr.x, tr.y] for tr in self.tracking])}) if len(self.analog_signals) != 0: - dict_to_save.update({'analog': np.array([sig.signal for sig in self.analog_signals])}) + dict_to_save.update({"analog": np.array([sig.signal for sig in self.analog_signals])}) if len(self.events) != 0: - dict_to_save.update({'events': np.array([ev.times for ev in self.events])}) + dict_to_save.update({"events": np.array([ev.times for ev in self.events])}) sio.savemat(filename, dict_to_save) @@ -1118,14 +1178,20 @@ def _load_timestamps(ts_npy_file, sample_rate): ts_diff = np.diff(ts) if any(ts_diff <= 0): - warnings.warn('Loaded timestamps ({}) not monotonically increasing - constructing timestamps from sample rate instead!'.format(ts_npy_file)) + warnings.warn( + "Loaded timestamps ({}) not monotonically increasing - constructing timestamps from sample rate instead!".format( + ts_npy_file + ) + ) return np.arange(len(ts)) / sample_rate period = np.median(ts_diff) if period == 1: return ts / sample_rate - fs = 1/period + fs = 1 / period if not np.isclose(sample_rate, fs, rtol=3e-4): - raise ValueError(f'Error loading timestamps ({ts_npy_file})\nSignificant discrepancy found in the provided sample rate ({sample_rate}) and that computed from the data ({fs})') + raise ValueError( + f"Error loading timestamps ({ts_npy_file})\nSignificant discrepancy found in the provided sample rate ({sample_rate}) and that computed from the data ({fs})" + ) return ts diff --git a/pyopenephys/openephys_tools.py b/pyopenephys/openephys_tools.py index da415b7..a35ebee 100644 --- a/pyopenephys/openephys_tools.py +++ b/pyopenephys/openephys_tools.py @@ -1,6 +1,6 @@ -''' +""" This code was adapted from OpenEphys.py (https://github.com/open-ephys/analysis-tools/blob/master/Python3/OpenEphys.py) -''' +""" import os import numpy as np @@ -17,16 +17,16 @@ MAX_NUMBER_OF_RECORDS = int(1e6) MAX_NUMBER_OF_EVENTS = int(1e6) -def loadContinuous(filepath, dtype='int16'): - assert dtype in ('float', 'int16'), \ - 'Invalid data type specified for loadContinous, valid types are float and int16' + +def loadContinuous(filepath, dtype="int16"): + assert dtype in ("float", "int16"), "Invalid data type specified for loadContinous, valid types are float and int16" # print("Loading continuous data...") ch = {} # read in the data - f = open(filepath, 'rb') + f = open(filepath, "rb") fileLength = os.fstat(f.fileno()).st_size @@ -47,53 +47,54 @@ def loadContinuous(filepath, dtype='int16'): recIndices = np.arange(0, nrec) for recordNumber in recIndices: - - timestamps[recordNumber] = np.fromfile(f, np.dtype('u2'), 1)) # big-endian 16-bit unsigned integer + recordingNumbers[recordNumber] = np.fromfile(f, np.dtype(">u2"), 1) # big-endian 16-bit unsigned integer - if dtype == 'float': # Convert data to float array and convert bits to voltage. - data = np.fromfile(f, np.dtype('>i2'), N) * float(header['bitVolts']) # big-endian 16-bit signed integer, multiplied by bitVolts + if dtype == "float": # Convert data to float array and convert bits to voltage. + data = np.fromfile(f, np.dtype(">i2"), N) * float( + header["bitVolts"] + ) # big-endian 16-bit signed integer, multiplied by bitVolts else: # Keep data in signed 16 bit integer format. - data = np.fromfile(f, np.dtype('>i2'), N) # big-endian 16-bit signed integer - samples[indices[recordNumber]:indices[recordNumber + 1]] = data + data = np.fromfile(f, np.dtype(">i2"), N) # big-endian 16-bit signed integer + samples[indices[recordNumber] : indices[recordNumber + 1]] = data marker = f.read(10) # dump # print recordNumber # print index - ch['header'] = header - ch['timestamps'] = timestamps - ch['data'] = samples # OR use downsample(samples,1), to save space - ch['recordingNumber'] = recordingNumbers + ch["header"] = header + ch["timestamps"] = timestamps + ch["data"] = samples # OR use downsample(samples,1), to save space + ch["recordingNumber"] = recordingNumbers f.close() return ch def loadSpikes(filepath): - ''' + """ Loads spike waveforms and timestamps from filepath (should be .spikes file) - ''' + """ data = {} # print('loading spikes...') - f = open(filepath, 'rb') + f = open(filepath, "rb") header = readHeader(f) - if float(header[' version']) < 0.4: - raise Exception('Loader is only compatible with .spikes files with version 0.4 or higher') + if float(header[" version"]) < 0.4: + raise Exception("Loader is only compatible with .spikes files with version 0.4 or higher") - data['header'] = header - numChannels = int(header['num_channels']) + data["header"] = header + numChannels = int(header["num_channels"]) numSamples = 40 # **NOT CURRENTLY WRITTEN TO HEADER** spikes = np.zeros((MAX_NUMBER_OF_SPIKES, numSamples, numChannels)) @@ -108,23 +109,23 @@ def loadSpikes(filepath): currentSpike = 0 while f.tell() < os.fstat(f.fileno()).st_size: - eventType = np.fromfile(f, np.dtype(' clipping_times[0]) & (times < clipping_times[1])) elif len(clipping_times) == 1: - if start_end == 'start': + if start_end == "start": idx = np.where(times > clipping_times[0]) - elif start_end == 'end': + elif start_end == "end": idx = np.where(times < clipping_times[0]) else: - raise AttributeError('clipping_times must be of length 1 or 2') + raise AttributeError("clipping_times must be of length 1 or 2") if len(analog_signals.signal.shape) == 2: analog_signals.signal = analog_signals.signal[:, idx[0]] @@ -54,12 +54,12 @@ def clip_events(events, clipping_times, start_end): if len(clipping_times) == 2: idx = np.where((times > clipping_times[0]) & (times < clipping_times[1])) elif len(clipping_times) == 1: - if start_end == 'start': + if start_end == "start": idx = np.where(times > clipping_times[0]) - elif start_end == 'end': + elif start_end == "end": idx = np.where(times < clipping_times[0]) else: - raise AttributeError('clipping_times must be of length 1 or 2') + raise AttributeError("clipping_times must be of length 1 or 2") events.times = times[idx] events.channel_states = events.channel_states[idx] @@ -88,12 +88,12 @@ def clip_tracking(tracking, clipping_times, start_end): if len(clipping_times) == 2: idx = np.where((times > clipping_times[0]) & (times < clipping_times[1])) elif len(clipping_times) == 1: - if start_end == 'start': + if start_end == "start": idx = np.where(times > clipping_times[0]) - elif start_end == 'end': + elif start_end == "end": idx = np.where(times < clipping_times[0]) else: - raise AttributeError('clipping_times must be of length 1 or 2') + raise AttributeError("clipping_times must be of length 1 or 2") tracking.times = times[idx] tracking.x = tracking.x[idx] @@ -123,12 +123,12 @@ def clip_spiketrains(sptr, clipping_times, start_end): if len(clipping_times) == 2: idx = np.where((times > clipping_times[0]) & (times < clipping_times[1])) elif len(clipping_times) == 1: - if start_end == 'start': + if start_end == "start": idx = np.where(times > clipping_times[0]) - elif start_end == 'end': + elif start_end == "end": idx = np.where(times < clipping_times[0]) else: - raise AttributeError('clipping_times must be of length 1 or 2') + raise AttributeError("clipping_times must be of length 1 or 2") sptr.times = times[idx] sptr.waveforms = sptr.waveforms[idx] @@ -137,7 +137,7 @@ def clip_spiketrains(sptr, clipping_times, start_end): sptr.metadata = sptr.metadata[idx] -def clip_times(times, clipping_times, start_end='start'): +def clip_times(times, clipping_times, start_end="start"): """ Clips timestamps @@ -155,23 +155,21 @@ def clip_times(times, clipping_times, start_end='start'): if len(clipping_times) == 2: idx = np.where((times > clipping_times[0]) & (times <= clipping_times[1])) elif len(clipping_times) == 1: - if start_end == 'start': + if start_end == "start": idx = np.where(times >= clipping_times[0]) - elif start_end == 'end': + elif start_end == "end": idx = np.where(times <= clipping_times[0]) else: - raise AttributeError('clipping_times must be of length 1 or 2') + raise AttributeError("clipping_times must be of length 1 or 2") times_clip = times[idx] return times_clip def read_analog_binary_signals(filehandle, numchan): - numchan=int(numchan) - nsamples = os.fstat(filehandle.fileno()).st_size // (numchan*2) - samples = np.memmap(filehandle, np.dtype('i2'), mode='r', - shape=(nsamples, numchan)) + numchan = int(numchan) + nsamples = os.fstat(filehandle.fileno()).st_size // (numchan * 2) + samples = np.memmap(filehandle, np.dtype("i2"), mode="r", shape=(nsamples, numchan)) samples = np.transpose(samples) return samples, nsamples - diff --git a/pyopenephys/version.py b/pyopenephys/version.py index 6e486e4..c203c5d 100644 --- a/pyopenephys/version.py +++ b/pyopenephys/version.py @@ -1 +1 @@ -version = '1.1.6' +version = "1.2.0" diff --git a/tests/test_pyopenephys_GIN.py b/tests/test_pyopenephys_GIN.py index 12dd1a2..5b14d5d 100644 --- a/tests/test_pyopenephys_GIN.py +++ b/tests/test_pyopenephys_GIN.py @@ -1,21 +1,19 @@ -import numpy as np import pyopenephys import tempfile import unittest from pathlib import Path -import quantities as pq from datalad.api import install, Dataset from parameterized import parameterized class TestPyopenephysConversions(unittest.TestCase): def setUp(self): - pt = Path.cwd() / 'ephy_testing_data' + pt = Path.cwd() / "ephy_testing_data" if pt.exists(): self.dataset = Dataset(pt) else: - self.dataset = install('https://gin.g-node.org/NeuralEnsemble/ephy_testing_data') + self.dataset = install("https://gin.g-node.org/NeuralEnsemble/ephy_testing_data") self.savedir = Path(tempfile.mkdtemp()) def get_data(self, rt_write_fname, rt_read_fname, save_fname, dataset_path): @@ -24,37 +22,43 @@ def get_data(self, rt_write_fname, rt_read_fname, save_fname, dataset_path): save_path = self.savedir / save_fname rt_write_path = self.savedir / rt_write_fname rt_read_path = self.savedir / rt_read_fname - resp = self.dataset.get(dataset_path) + _ = self.dataset.get(dataset_path) return rt_write_path, rt_read_path, save_path - @parameterized.expand([ - ( + @parameterized.expand( + [ + ( True, "openephys/OpenEphys_SampleData_1", - Path.cwd() / "ephy_testing_data" / "openephys" / "OpenEphys_SampleData_1" - ), - ( + Path.cwd() / "ephy_testing_data" / "openephys" / "OpenEphys_SampleData_1", + ), + ( True, "openephys/OpenEphys_SampleData_2_(multiple_starts)", - Path.cwd() / "ephy_testing_data" / "openephys" / "OpenEphys_SampleData_2_(multiple_starts)" - ), - ( + Path.cwd() / "ephy_testing_data" / "openephys" / "OpenEphys_SampleData_2_(multiple_starts)", + ), + ( True, "openephys/OpenEphys_SampleData_3", - Path.cwd() / "ephy_testing_data" / "openephys" / "OpenEphys_SampleData_1" - ), - ( + Path.cwd() / "ephy_testing_data" / "openephys" / "OpenEphys_SampleData_1", + ), + ( True, "openephysbinary/v0.4.4.1_with_video_tracking", - Path.cwd() / "ephy_testing_data" / "openephysbinary" / "v0.4.4.1_with_video_tracking" - ), - ( + Path.cwd() / "ephy_testing_data" / "openephysbinary" / "v0.4.4.1_with_video_tracking", + ), + ( True, "openephysbinary/v0.5.3_two_neuropixels_stream", - Path.cwd() / "ephy_testing_data" / "openephysbinary" / "v0.5.3_two_neuropixels_stream" / "Record_Node_107" - ), - ]) + Path.cwd() + / "ephy_testing_data" + / "openephysbinary" + / "v0.5.3_two_neuropixels_stream" + / "Record_Node_107", + ), + ] + ) def test_open_file(self, download, dataset_path, foldername): if download: print(f"Testing GIN {dataset_path}") @@ -74,7 +78,8 @@ def test_open_file(self, download, dataset_path, foldername): for r, rec in enumerate(recordings): print( - f"\nRecording {r} - duration {rec.duration} - acquisition {rec.experiment.acquisition_system}") + f"\nRecording {r} - duration {rec.duration} - acquisition {rec.experiment.acquisition_system}" + ) analog = rec.analog_signals gains = [an.gains for an in analog] signal_shapes = [an.signal.shape for an in analog] @@ -89,8 +94,7 @@ def test_open_file(self, download, dataset_path, foldername): print(f"N tracking: {len(tracking)}") # test clipping - print(f"Test clipping") - duration = rec.duration + print("Test clipping") clip_times = 0.2 rec.clip_recording(clip_times) clip_times = [0.3, 0.8] @@ -99,5 +103,5 @@ def test_open_file(self, download, dataset_path, foldername): print(f"{foldername} not found!") -if __name__ == '__main__': +if __name__ == "__main__": unittest.main()