diff --git a/README.rst b/README.rst index a6ba67a..2695ee2 100644 --- a/README.rst +++ b/README.rst @@ -76,6 +76,33 @@ The latest release, including source and binary packages for Linux, macOS and Windows, is available for download from the `Python Package Index`_. You can find source releases at the `Releases Page`_. + +Highlevel interface +------------ + +pyEDFlib includes an highlevel interface for easy access to read and write edf files. +Additionally functionality as anonymizing, dropping or renaming channels can be found there. + +.. code-block:: Python + + from pyedflib import highlevel + + # write an edf file + signals = np.random.rand(5, 256*300)*200 # 5 minutes of random signal + channel_names = ['ch1', 'ch2', 'ch3', 'ch4', 'ch5'] + signal_headers = highlevel.make_signal_headers(channel_names, sample_rate=256) + header = highlevel.make_header(patientname='patient_x', gender='Female') + highlevel.write_edf('edf_file.edf', signals, signal_headers, header) + + # read an edf file + signals, signal_headers, header = highlevel.read_edf('edf_file.edf') + print(signal_headers[0]['sample_rate']) # prints 256 + + # drop a channel from the file and anonymize + highlevel.drop_channels('edf_file.edf', to_drop=['ch2', 'ch4']) + highlevel.anonymize('edf_file.edf') + + License ------- diff --git a/appveyor.yml b/appveyor.yml index 66c1155..8382c8a 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -25,7 +25,7 @@ install: - "util\\appveyor\\build.cmd %PYTHON%\\python.exe -m pip install numpy --cache-dir c:\\tmp\\pip-cache" - "util\\appveyor\\build.cmd %PYTHON%\\python.exe -m pip install - Cython nose coverage matplotlib --cache-dir c:\\tmp\\pip-cache" + Cython nose coverage matplotlib dateparser tqdm --cache-dir c:\\tmp\\pip-cache" test_script: - "util\\appveyor\\build.cmd %PYTHON%\\python.exe setup.py build --build-lib build\\lib\\" diff --git a/circle.yml b/circle.yml index 8c2c3c3..564d94e 100644 --- a/circle.yml +++ b/circle.yml @@ -3,7 +3,7 @@ dependencies: - sudo apt-get install texlive-fonts-recommended - pip install -q --install-option="--no-cython-compile" Cython==0.23.4 - pip install -q numpy - - pip install -q nose mpmath argparse Pillow codecov matplotlib Sphinx==1.5.5 + - pip install -q nose mpmath argparse Pillow codecov matplotlib Sphinx==1.5.5 tqdm - git submodule init - git submodule update - python setup.py build diff --git a/pyedflib/__init__.py b/pyedflib/__init__.py index a28a4a2..02b7e24 100644 --- a/pyedflib/__init__.py +++ b/pyedflib/__init__.py @@ -10,6 +10,7 @@ from ._extensions._pyedflib import * from .edfwriter import * from .edfreader import * +from . import highlevel from . import data diff --git a/pyedflib/highlevel.py b/pyedflib/highlevel.py new file mode 100644 index 0000000..34eb7cb --- /dev/null +++ b/pyedflib/highlevel.py @@ -0,0 +1,772 @@ + +# -*- coding: utf-8 -*- +# Copyright (c) 2015 - 2017 Holger Nahrstaedt +# Copyright (c) 2011, 2015, Chris Lee-Messer +# Copyright (c) 2016-2017 The pyedflib Developers +# +# See LICENSE for license details. +""" +Created on Tue Jan 7 12:13:47 2020 + +This file contains high-level functions to work with pyedflib. + +Includes + - Reading and writing EDFs + - Anonymizing EDFs + - Comparing EDFs + - Renaming Channels from EDF files + - Dropping Channels from EDF files + +@author: skjerns +""" + +import os +import numpy as np +import warnings +import pyedflib +from datetime import datetime +# from . import EdfWriter +# from . import EdfReader + +def tqdm(iteratable, *args, **kwargs): + """ + These is an optional dependecies that shows a progress bar for some + of the functions, e.g. loading. + + install this dependency with `pip install tqdm` + + if not installed this is just a pass through iterator + """ + try: + from tqd3m import tqdm as iterator + return iterator(iteratable, *args, **kwargs) + except: + return iteratable + + +def _parse_date(string): + """ + A simple dateparser that detects common date formats + + Parameters + ---------- + string : str + a date string in format as denoted below. + + Returns + ------- + datetime.datetime + datetime object of a time. + + """ + # some common formats. + formats = ['%Y-%m-%d', '%d-%m-%Y', '%d.%m.%Y', '%Y.%m.%d', '%d %b %Y', + '%Y/%m/%d', '%d/%m/%Y'] + for f in formats: + try: + return datetime.strptime(string, f) + except: + pass + try: + import dateparser + return dateparser.parse(string) + except: + print('dateparser is not installed. to convert strings to dates'\ + 'install via `pip install dateparser`.') + raise ValueError('birthdate must be datetime object or of format'\ + ' `%d-%m-%Y`, eg. `24-01-2020`') + +def dig2phys(signal, dmin, dmax, pmin, pmax): + """ + converts digital edf values to physical values + + Parameters + ---------- + signal : np.ndarray or int + A numpy array with int values (digital values) or an int. + dmin : int + digital minimum value of the edf file (eg -2048). + dmax : int + digital maximum value of the edf file (eg 2048). + pmin : float + physical maximum value of the edf file (eg -200.0). + pmax : float + physical maximum value of the edf file (eg 200.0). + + Returns + ------- + physical : np.ndarray or float + converted physical values + + """ + m = (pmax-pmin) / (dmax-dmin) + physical = m * signal + return physical + +def phys2dig(signal, dmin, dmax, pmin, pmax): + """ + converts physical values to digital values + + Parameters + ---------- + signal : np.ndarray or int + A numpy array with int values (digital values) or an int. + dmin : int + digital minimum value of the edf file (eg -2048). + dmax : int + digital maximum value of the edf file (eg 2048). + pmin : float + physical maximum value of the edf file (eg -200.0). + pmax : float + physical maximum value of the edf file (eg 200.0). + + Returns + ------- + digital : np.ndarray or int + converted digital values + + """ + m = (dmax-dmin)/(pmax-pmin) + digital = (m * signal) + return digital + + + +def make_header(technician='', recording_additional='', patientname='', + patient_additional='', patientcode= '', equipment= '', + admincode= '', gender= '', startdate=None, birthdate= ''): + """ + A convenience function to create an EDF header (a dictionary) that + can be used by pyedflib to update the main header of the EDF + + Parameters + ---------- + technician : str, optional + name of the technician. The default is ''. + recording_additional : str, optional + comments etc. The default is ''. + patientname : str, optional + the name of the patient. The default is ''. + patient_additional : TYPE, optional + more info about the patient. The default is ''. + patientcode : str, optional + alphanumeric code. The default is ''. + equipment : str, optional + which system was used. The default is ''. + admincode : str, optional + code of the admin. The default is ''. + gender : str, optional + gender of patient. The default is ''. + startdate : datetime.datetime, optional + startdate of recording. The default is None. + birthdate : str/datetime.datetime, optional + date of birth of the patient. The default is ''. + + Returns + ------- + header : dict + a dictionary with the values given filled in. + + """ + + if not birthdate=='' and isinstance(birthdate, str): + birthdate = _parse_date(birthdate) + if startdate is None: + now = datetime.now() + startdate = datetime(now.year, now.month, now.day, + now.hour, now.minute, now.second) + del now + if isinstance(birthdate, datetime): + birthdate = birthdate.strftime('%d %b %Y').lower() + local = locals() + header = {} + for var in local: + if isinstance(local[var], datetime): + header[var] = local[var] + else: + header[var] = str(local[var]) + return header + + +def make_signal_header(label, dimension='uV', sample_rate=256, + physical_min=-200, physical_max=200, digital_min=-32768, + digital_max=32767, transducer='', prefiler=''): + """ + A convenience function that creates a signal header for a given signal. + This can be used to create a list of signal headers that is used by + pyedflib to create an edf. With this, different sampling frequencies + can be indicated. + + Parameters + ---------- + label : str + the name of the channel. + dimension : str, optional + dimension, eg mV. The default is 'uV'. + sample_rate : int, optional + sampling frequency. The default is 256. + physical_min : float, optional + minimum value in dimension. The default is -200. + physical_max : float, optional + maximum value in dimension. The default is 200. + digital_min : int, optional + digital minimum of the ADC. The default is -32768. + digital_max : int, optional + digital maximum of the ADC. The default is 32767. + transducer : str, optional + electrode type that was used. The default is ''. + prefiler : str, optional + filtering and sampling method. The default is ''. + + Returns + ------- + signal_header : dict + a signal header that can be used to save a channel to an EDF. + + """ + + signal_header = {'label': label, + 'dimension': dimension, + 'sample_rate': sample_rate, + 'physical_min': physical_min, + 'physical_max': physical_max, + 'digital_min': digital_min, + 'digital_max': digital_max, + 'transducer': transducer, + 'prefilter': prefiler} + return signal_header + + +def make_signal_headers(list_of_labels, dimension='uV', sample_rate=256, + physical_min=-200, physical_max=200, digital_min=-32768, + digital_max=32767, transducer='', prefiler=''): + """ + A function that creates signal headers for a given list of channel labels. + This can only be used if each channel has the same sampling frequency + + Parameters + ---------- + list_of_labels : list of str + A list with labels for each channel. + dimension : str, optional + dimension, eg mV. The default is 'uV'. + sample_rate : int, optional + sampling frequency. The default is 256. + physical_min : float, optional + minimum value in dimension. The default is -200. + physical_max : float, optional + maximum value in dimension. The default is 200. + digital_min : int, optional + digital minimum of the ADC. The default is -32768. + digital_max : int, optional + digital maximum of the ADC. The default is 32767. + transducer : str, optional + electrode type that was used. The default is ''. + prefiler : str, optional + filtering and sampling method. The default is ''. + + Returns + ------- + signal_headers : list of dict + returns n signal headers as a list to save several signal headers. + + """ + signal_headers = [] + for label in list_of_labels: + header = make_signal_header(label, dimension=dimension, sample_rate=sample_rate, + physical_min=physical_min, physical_max=physical_max, + digital_min=digital_min, digital_max=digital_max, + transducer=transducer, prefiler=prefiler) + signal_headers.append(header) + return signal_headers + + +def read_edf(edf_file, ch_nrs=None, ch_names=None, digital=False, verbose=True): + """ + Convenience function for reading EDF+/BDF data with pyedflib. + + Will load the edf and return the signals, the headers of the signals + and the header of the EDF. If all signals have the same sample frequency + will return a numpy array, else a list with the individual signals + + + Parameters + ---------- + edf_file : str + link to an edf file. + ch_nrs : list of int, optional + The indices of the channels to read. The default is None. + ch_names : list of str, optional + The names of channels to read. The default is None. + digital : bool, optional + will return the signals as digital values (ADC). The default is False. + verbose : bool, optional + DESCRIPTION. The default is True. + + Returns + ------- + signals : np.ndarray or list + the signals of the chosen channels contained in the EDF. + signal_headers : list + one signal header for each channel in the EDF. + header : dict + the main header of the EDF file containing meta information. + + """ + assert os.path.exists(edf_file), 'file {} does not exist'.format(edf_file) + assert (ch_nrs is None) or (ch_names is None), \ + 'names xor numbers should be supplied' + if ch_nrs is not None and not isinstance(ch_nrs, list): ch_nrs = [ch_nrs] + if ch_names is not None and \ + not isinstance(ch_names, list): ch_names = [ch_names] + + with pyedflib.EdfReader(edf_file) as f: + # see which channels we want to load + available_chs = [ch.upper() for ch in f.getSignalLabels()] + n_chrs = f.signals_in_file + + # find out which number corresponds to which channel + if ch_names is not None: + ch_nrs = [] + for ch in ch_names: + if not ch.upper() in available_chs: + warnings.warn('{} is not in source file (contains {})'\ + .format(ch, available_chs)) + print('will be ignored.') + else: + ch_nrs.append(available_chs.index(ch.upper())) + + # if there ch_nrs is not given, load all channels + + if ch_nrs is None: # no numbers means we load all + ch_nrs = range(n_chrs) + + # convert negative numbers into positives + ch_nrs = [n_chrs+ch if ch<0 else ch for ch in ch_nrs] + + # load headers, signal information and + header = f.getHeader() + signal_headers = [f.getSignalHeaders()[c] for c in ch_nrs] + + # add annotations to header + annotations = f.read_annotation() + annotations = [[float(t)/10000000, d if d else -1, x.decode()] for t,d,x in annotations] + header['annotations'] = annotations + signals = [] + for i,c in enumerate(tqdm(ch_nrs, desc='Reading Channels', + disable=not verbose)): + signal = f.readSignal(c, digital=digital) + signals.append(signal) + + # we can only return a np.array if all signals have the same samplefreq + sfreqs = [shead['sample_rate'] for shead in signal_headers] + all_sfreq_same = sfreqs[1:]==sfreqs[:-1] + if all_sfreq_same: + dtype = np.int if digital else np.float + signals = np.array(signals, dtype=dtype) + elif verbose: + warnings.warn('Not all sampling frequencies are the same ({}). '\ + .format(sfreqs)) + assert len(signals)==len(signal_headers), 'Something went wrong, lengths'\ + ' of headers is not length of signals' + del f + return signals, signal_headers, header + + +def write_edf(edf_file, signals, signal_headers, header, digital=False): + """ + Write signals to an edf_file. Header can be generated on the fly with + generic values. + + Parameters + ---------- + edf_file : np.ndarray or list + where to save the EDF file + signals : list + The signals as a list of arrays or a ndarray. + + signal_headers : list of dict + a list with one signal header(dict) for each signal. + See pyedflib.EdfWriter.setSignalHeader.. + header : dict + a main header (dict) for the EDF file, see + pyedflib.EdfWriter.setHeader for details. + digital : bool, optional + whether the signals are in digital format (ADC). The default is False. + + Returns + ------- + bool + True if successful, False if failed. + """ + assert header is None or isinstance(header, dict), \ + 'header must be dictioniary' + assert isinstance(signal_headers, list), \ + 'signal headers must be list' + assert len(signal_headers)==len(signals), \ + 'signals and signal_headers must be same length' + + n_channels = len(signals) + + default_header = make_header() + default_header.update(header) + header = default_header + + annotations = header.get('annotations', '') + print(annotations) + + with pyedflib.EdfWriter(edf_file, n_channels=n_channels) as f: + f.setSignalHeaders(signal_headers) + f.setHeader(header) + f.writeSamples(signals, digital=digital) + for annotation in annotations: + f.writeAnnotation(*annotation) + del f + + return os.path.isfile(edf_file) + + +def write_edf_quick(edf_file, signals, sfreq, digital=False): + """ + wrapper for write_pyedf without creating headers. + Use this if you don't care about headers or channel names and just + want to dump some signals with the same sampling freq. to an edf + + Parameters + ---------- + edf_file : str + where to store the data/edf. + signals : np.ndarray + The signals you want to store as numpy array. + sfreq : int + the sampling frequency of the signals. + digital : bool, optional + if the data is present digitally (int) or as mV/uV.The default is False. + + Returns + ------- + bool + True if successful, else False or raise Error. + + """ + labels = ['CH_{}'.format(i) for i in range(len(signals))] + signal_headers = make_signal_headers(labels, sample_rate = sfreq) + return write_edf(edf_file, signals, signal_headers, digital=digital) + + +def read_edf_header(edf_file): + """ + Reads the header and signal headers of an EDF file and it's annotations + + Parameters + ---------- + edf_file : str + EDF/BDF file to read. + + Returns + ------- + summary : dict + header of the edf file as dictionary. + + """ + assert os.path.isfile(edf_file), 'file {} does not exist'.format(edf_file) + with pyedflib.EdfReader(edf_file) as f: + annotations = f.read_annotation() + annotations = [[t//10000000, d if d else -1, x] for t,d,x in annotations] + summary = f.getHeader() + summary['Duration'] = f.getFileDuration + summary['SignalHeaders'] = f.getSignalHeaders() + summary['channels'] = f.getSignalLabels() + summary['annotations'] = annotations + del f + return summary + + +def compare_edf(edf_file1, edf_file2, verbose=True): + """ + Loads two edf files and checks whether the values contained in + them are the same. Does not check the header or annotations data. + + Mainly to verify that other options (eg anonymization) produce the + same EDF file. + + Parameters + ---------- + edf_file1 : str + edf file 1 to compare. + edf_file2 : str + edf file 2 to compare. + verbose : bool, optional + print progress or not. The default is True. + + Returns + ------- + bool + True if signals are equal, else raises error. + """ + if verbose: print('verifying data') + + signals1, shead1, _ = read_edf(edf_file1, digital=True, verbose=verbose, + return_list=True) + signals2, shead2, _ = read_edf(edf_file2, digital=True, verbose=verbose, + return_list=True) + + for i, sigs in enumerate(zip(signals1, signals2)): + s1, s2 = sigs + if np.array_equal(s1, s2): continue # early stopping + s1 = np.abs(s1) + s2 = np.abs(s2) + if np.array_equal(s1, s2): continue # early stopping + close = np.mean(np.isclose(s1, s2)) + assert close>0.99, 'Error, digital values of {}'\ + ' and {} for ch {}: {} are not the same: {:.3f}'.format( + edf_file1, edf_file2, shead1[i]['label'], + shead2[i]['label'], close) + + dmin1, dmax1 = shead1[i]['digital_min'], shead1[i]['digital_max'] + pmin1, pmax1 = shead1[i]['physical_min'], shead1[i]['physical_max'] + dmin2, dmax2 = shead2[i]['digital_min'], shead2[i]['digital_max'] + pmin2, pmax2 = shead2[i]['physical_min'], shead2[i]['physical_max'] + + for i, sigs in enumerate(zip(signals1, signals2)): + s1, s2 = sigs + + # convert to physical values, no need to load all data again + s1 = dig2phys(s1, dmin1, dmax1, pmin1, pmax1) + s2 = dig2phys(s2, dmin2, dmax2, pmin2, pmax2) + + # now we can remove the signals from the list to save memory + signals1[i] = None + signals2[i] = None + + # compare absolutes in case of inverted signals + if np.array_equal(s1, s2): continue # early stopping + s1 = np.abs(s1) + s2 = np.abs(s2) + if np.array_equal(s1, s2): continue # early stopping + min_dist = np.abs(dig2phys(1, dmin1, dmax1, pmin1, pmax1)) + close = np.mean(np.isclose(s1, s2, atol=min_dist)) + assert close>0.99, 'Error, physical values of {}'\ + ' and {} for ch {}: {} are not the same: {:.3f}'.format( + edf_file1, edf_file2, shead1[i]['label'], + shead2[i]['label'], close) + return True + + +def drop_channels(edf_source, edf_target=None, to_keep=None, to_drop=None): + """ + Remove channels from an edf file. Save the file. + For safety reasons, no source files can be overwritten. + + Parameters + ---------- + edf_source : str + The source edf file from which to drop channels. + edf_target : str, optional + Where to save the file.If None, will be edf_source+'dropped.edf'. + The default is None. + to_keep : list, optional + A list of channel names or indices that will be kept. + Strings will always be interpreted as channel names. + 'to_keep' will overwrite any droppings proposed by to_drop. + The default is None. + to_drop : list, optional + A list of channel names/indices that should be dropped. + Strings will be interpreted as channel names. The default is None. + + Returns + ------- + edf_target : str + the target filename with the dropped channels. + + """ + + # convert to list if necessary + if isinstance(to_keep, (int, str)): to_keep = [to_keep] + if isinstance(to_drop, (int, str)): to_drop = [to_drop] + + # check all parameters are good + assert to_keep is None or to_drop is None,'Supply only to_keep xor to_drop' + if to_keep is not None: + assert all([isinstance(ch, (str, int)) for ch in to_keep]),\ + 'channels must be int or string' + if to_drop is not None: + assert all([isinstance(ch, (str, int)) for ch in to_drop]),\ + 'channels must be int or string' + assert os.path.exists(edf_source), 'source file {} does not exist'\ + .format(edf_source) + assert edf_source!=edf_target, 'For safet, target must not be source file.' + + if edf_target is None: + edf_target = os.path.splitext(edf_source)[0] + '_dropped.edf' + if os.path.exists(edf_target): + warnings.warn('Target file will be overwritten') + + ch_names = read_edf_header(edf_source)['channels'] + # convert to all lowercase for compatibility + ch_names = [ch.lower() for ch in ch_names] + ch_nrs = list(range(len(ch_names))) + + if to_keep is not None: + for i,ch in enumerate(to_keep): + if isinstance(ch,str): + ch_idx = ch_names.index(ch.lower()) + to_keep[i] = ch_idx + load_channels = to_keep.copy() + elif to_drop is not None: + for i,ch in enumerate(to_drop): + if isinstance(ch,str): + ch_idx = ch_names.index(ch.lower()) + to_drop[i] = ch_idx + to_drop = [len(ch_nrs)+ch if ch<0 else ch for ch in to_drop] + + [ch_nrs.remove(ch) for ch in to_drop] + load_channels = ch_nrs.copy() + else: + raise ValueError + + signals, signal_headers, header = read_edf(edf_source, + ch_nrs=load_channels, + digital=True) + + write_edf(edf_target, signals, signal_headers, header, digital=True) + return edf_target + + +def anonymize_edf(edf_file, new_file=None, + to_remove = ['patientname', 'birthdate'], + new_values = ['xxx', ''], verify=False): + """ + Anonymizes an EDF file, that means it strips all header information + that is patient specific, ie. birthdate and patientname as well as XXX + + Parameters + ---------- + edf_file : str + a string with a filename of an EDF/BDF. + new_file : str, optional + a string with the new filename of an EDF/BDF. The default is None. + to_remove : list of str, optional + a list of attributes to remove from the file. + The default is ['patientname', 'birthdate']. + new_values : list of str, optional + a list of values that should be given instead to the edf. + Each to_remove value must have one new_value. + The default is ['xxx', '']. + verify : bool + compare the two edf files for equality (double check values are same) + + Returns + ------- + bool + True if successful, False if failed. + """ + + assert len(to_remove)==len(new_values), \ + 'Each to_remove must have one new_value' + header = read_edf_header(edf_file) + + for new_val, attr in zip(new_values, to_remove): + header[attr] = new_val + + if new_file is None: + file, ext = os.path.splitext(edf_file) + new_file = file + '_anonymized' + ext + n_chs = len(header['channels']) + signal_headers = [] + signals = [] + for ch_nr in tqdm(range(n_chs)): + signal, signal_header, _ = read_edf(edf_file, digital=True, + ch_nrs=ch_nr, verbose=False) + signal_headers.append(signal_header[0]) + signals.append(signal.squeeze()) + if verify: + compare_edf(edf_file, new_file) + return write_edf(new_file, signals, signal_headers, header, digital=True) + + + +def rename_channels(edf_file, mapping, new_file=None): + """ + A convenience function to rename channels in an EDF file. + + Parameters + ---------- + edf_file : str + an string pointing to an edf file. + mapping : dict + a dictionary with channel mappings as key:value. + eg: {'M1-O2':'A1-O2'} + new_file : str, optional + the new filename. If None will be edf_file + '_renamed' + The default is None. + + Returns + ------- + bool + True if successful, False if failed. + + """ + header = read_edf_header(edf_file) + channels = header['channels'] + if new_file is None: + file, ext = os.path.splitext(edf_file) + new_file = file + '_renamed' + ext + + signal_headers = [] + signals = [] + for ch_nr in tqdm(range(len(channels))): + signal, signal_header, _ = read_edf(file, digital=True, + ch_nrs=ch_nr, verbose=False) + ch = signal_header[0]['label'] + if ch in mapping : + print('{} to {}'.format(ch, mapping[ch])) + ch = mapping[ch] + signal_header[0]['label']=ch + else: + print('no mapping for {}, leave as it is'.format(ch)) + signal_headers.append(signal_header[0]) + signals.append(signal.squeeze()) + + return write_edf(new_file, signals, signal_headers, header, digital=True) + + +def change_polarity(edf_file, channels, new_file=None, verify=True, verbose=True): + """ + Change polarity of certain channels + + Parameters + ---------- + edf_file : str + from which file to change polarity. + channels : list of int + the indices of the channels. + new_file : str, optional + where to save the edf with inverted channels. The default is None. + verify : bool, optional + whether to verify the two edfs for similarity. The default is True. + verbose : str, optional + print progress or not. The default is True. + + Returns + ------- + bool + True if success. + + """ + + if new_file is None: + new_file = os.path.splitext(edf_file)[0] + '.edf' + + if isinstance(channels, str): channels=[channels] + channels = [c.lower() for c in channels] + + signals, signal_headers, header = read_edf(edf_file, digital=True, verbose=verbose) + for i,sig in enumerate(signals): + label = signal_headers[i]['label'].lower() + if label in channels: + if verbose: print('inverting {}'.format(label)) + signals[i] = -sig + write_edf(new_file, signals, signal_headers, header, digital=True, correct=False) + if verify: compare_edf(edf_file, new_file) + return True diff --git a/pyedflib/tests/test_edfwriter.py b/pyedflib/tests/test_edfwriter.py index d8a4732..adb9651 100644 --- a/pyedflib/tests/test_edfwriter.py +++ b/pyedflib/tests/test_edfwriter.py @@ -25,11 +25,11 @@ def test_EdfWriter_BDFplus(self): 'digital_max': 8388607, 'digital_min': -8388608, 'prefilter': 'pre1', 'transducer': 'trans1'} channel_info2 = {'label': 'test_label', 'dimension': 'mV', 'sample_rate': 100, - 'physical_max': 1.0, 'physical_min': -1.0, + 'physical_max': 1.0, 'physical_min': -1.0, 'digital_max': 8388607, 'digital_min': -8388608, 'prefilter': 'pre1', 'transducer': 'trans1'} f = pyedflib.EdfWriter(self.bdfplus_data_file, 2, - file_type=pyedflib.FILETYPE_BDFPLUS) + file_type=pyedflib.FILETYPE_BDFPLUS) f.setSignalHeader(0,channel_info1) f.setSignalHeader(1,channel_info2) f.setTechnician('tec1') @@ -90,11 +90,11 @@ def test_EdfWriter_BDFplus2(self): 'digital_max': 8388607, 'digital_min': -8388608, 'prefilter': 'pre1', 'transducer': 'trans1'} channel_info2 = {'label': 'test_label', 'dimension': 'mV', 'sample_rate': 100, - 'physical_max': 1.0, 'physical_min': -1.0, + 'physical_max': 1.0, 'physical_min': -1.0, 'digital_max': 8388607, 'digital_min': -8388608, 'prefilter': 'pre1', 'transducer': 'trans1'} f = pyedflib.EdfWriter(self.bdfplus_data_file, 2, - file_type=pyedflib.FILETYPE_BDFPLUS) + file_type=pyedflib.FILETYPE_BDFPLUS) f.setSignalHeader(0,channel_info1) f.setSignalHeader(1,channel_info2) f.setTechnician('tec1') @@ -152,7 +152,7 @@ def test_EdfWriter_BDF(self): 'digital_max': 8388607, 'digital_min': -8388608, 'prefilter': 'pre1', 'transducer': 'trans1'} f = pyedflib.EdfWriter(self.bdf_data_file, 2, - file_type=pyedflib.FILETYPE_BDF) + file_type=pyedflib.FILETYPE_BDF) f.setSignalHeader(0,channel_info1) f.setSignalHeader(1,channel_info2) @@ -178,7 +178,7 @@ def test_EdfWriter_EDFplus(self): 'digital_max': 32767, 'digital_min': -32768, 'prefilter': 'pre1', 'transducer': 'trans1'} f = pyedflib.EdfWriter(self.edfplus_data_file, 1, - file_type=pyedflib.FILETYPE_EDFPLUS) + file_type=pyedflib.FILETYPE_EDFPLUS) header = {'technician': 'tec1', 'recording_additional': 'recAdd1', 'patientname': 'pat1', 'patient_additional': 'patAdd1', 'patientcode': 'code1', 'equipment': 'eq1', @@ -215,11 +215,11 @@ def test_EdfWriter_EDF(self): 'digital_max': 32767, 'digital_min': -32768, 'prefilter': 'pre1', 'transducer': 'trans1'} channel_info2 = {'label': 'test_label', 'dimension': 'mV', 'sample_rate': 100, - 'physical_max': 1.0, 'physical_min': -1.0, + 'physical_max': 1.0, 'physical_min': -1.0, 'digital_max': 32767, 'digital_min': -32768, 'prefilter': 'pre1', 'transducer': 'trans1'} f = pyedflib.EdfWriter(self.edf_data_file, 2, - file_type=pyedflib.FILETYPE_EDF) + file_type=pyedflib.FILETYPE_EDF) f.setSignalHeader(0,channel_info1) f.setSignalHeader(1,channel_info2) data = np.ones(100) * 0.1 @@ -238,13 +238,13 @@ def test_EdfWriter_EDF(self): def test_SampleWriting(self): channel_info1 = {'label':'test_label1', 'dimension':'mV', 'sample_rate':100, - 'physical_max':1.0,'physical_min':-1.0, - 'digital_max':8388607,'digital_min':-8388608, - 'prefilter':'pre1','transducer':'trans1'} + 'physical_max':1.0,'physical_min':-1.0, + 'digital_max':8388607,'digital_min':-8388608, + 'prefilter':'pre1','transducer':'trans1'} channel_info2 = {'label':'test_label2', 'dimension':'mV', 'sample_rate':100, - 'physical_max':1.0,'physical_min':-1.0, - 'digital_max':8388607,'digital_min':-8388608, - 'prefilter':'pre2','transducer':'trans2'} + 'physical_max':1.0,'physical_min':-1.0, + 'digital_max':8388607,'digital_min':-8388608, + 'prefilter':'pre2','transducer':'trans2'} f = pyedflib.EdfWriter(self.bdfplus_data_file, 2, file_type=pyedflib.FILETYPE_BDFPLUS) f.setSignalHeader(0,channel_info1) @@ -273,7 +273,7 @@ def test_EdfWriter_EDF_contextmanager(self): 'digital_max': 32767, 'digital_min': -32768, 'prefilter': 'pre1', 'transducer': 'trans1'} channel_info2 = {'label': 'test_label', 'dimension': 'mV', 'sample_rate': 100, - 'physical_max': 1.0, 'physical_min': -1.0, + 'physical_max': 1.0, 'physical_min': -1.0, 'digital_max': 32767, 'digital_min': -32768, 'prefilter': 'pre1', 'transducer': 'trans1'} with pyedflib.EdfWriter(self.edf_data_file, 2, file_type=pyedflib.FILETYPE_EDF) as f: @@ -292,13 +292,13 @@ def test_EdfWriter_EDF_contextmanager(self): def test_SampleWritingContextManager(self): channel_info1 = {'label':'test_label1', 'dimension':'mV', 'sample_rate':100, - 'physical_max':1.0,'physical_min':-1.0, - 'digital_max':8388607,'digital_min':-8388608, - 'prefilter':'pre1','transducer':'trans1'} + 'physical_max':1.0,'physical_min':-1.0, + 'digital_max':8388607,'digital_min':-8388608, + 'prefilter':'pre1','transducer':'trans1'} channel_info2 = {'label':'test_label2', 'dimension':'mV', 'sample_rate':100, - 'physical_max':1.0,'physical_min':-1.0, - 'digital_max':8388607,'digital_min':-8388608, - 'prefilter':'pre2','transducer':'trans2'} + 'physical_max':1.0,'physical_min':-1.0, + 'digital_max':8388607,'digital_min':-8388608, + 'prefilter':'pre2','transducer':'trans2'} with pyedflib.EdfWriter(self.bdfplus_data_file, 2, file_type=pyedflib.FILETYPE_BDFPLUS) as f: @@ -326,13 +326,13 @@ def test_SampleWritingContextManager(self): def test_SampleWriting2(self): channel_info1 = {'label':'test_label1', 'dimension':'mV', 'sample_rate':100, - 'physical_max':1.0,'physical_min':-1.0, - 'digital_max':8388607,'digital_min':-8388608, - 'prefilter':'pre1','transducer':'trans1'} + 'physical_max':1.0,'physical_min':-1.0, + 'digital_max':8388607,'digital_min':-8388608, + 'prefilter':'pre1','transducer':'trans1'} channel_info2 = {'label':'test_label2', 'dimension':'mV', 'sample_rate':100, - 'physical_max':1.0,'physical_min':-1.0, - 'digital_max':8388607,'digital_min':-8388608, - 'prefilter':'pre2','transducer':'trans2'} + 'physical_max':1.0,'physical_min':-1.0, + 'digital_max':8388607,'digital_min':-8388608, + 'prefilter':'pre2','transducer':'trans2'} f = pyedflib.EdfWriter(self.bdfplus_data_file, 2, file_type=pyedflib.FILETYPE_BDFPLUS) f.setSignalHeader(0,channel_info1) @@ -360,13 +360,13 @@ def test_SampleWriting_digital(self): dmin, dmax = [0, 1024] pmin, pmax = [0, 1.0] channel_info1 = {'label':'test_label1', 'dimension':'mV', 'sample_rate':100, - 'physical_max':pmax,'physical_min':pmin, - 'digital_max':dmax,'digital_min':dmin, - 'prefilter':'pre1','transducer':'trans1'} + 'physical_max':pmax,'physical_min':pmin, + 'digital_max':dmax,'digital_min':dmin, + 'prefilter':'pre1','transducer':'trans1'} channel_info2 = {'label':'test_label2', 'dimension':'mV', 'sample_rate':100, - 'physical_max':pmax,'physical_min':pmin, - 'digital_max':dmax,'digital_min':dmin, - 'prefilter':'pre2','transducer':'trans2'} + 'physical_max':pmax,'physical_min':pmin, + 'digital_max':dmax,'digital_min':dmin, + 'prefilter':'pre2','transducer':'trans2'} f = pyedflib.EdfWriter(self.bdfplus_data_file, 2, @@ -409,9 +409,9 @@ def test_SampleWriting_digital(self): def test_TestRoundingEDF(self): channel_info1 = {'label':'test_label1', 'dimension':'mV', 'sample_rate':100, - 'physical_max':1.0,'physical_min':-1.0, - 'digital_max':32767,'digital_min':-32768, - 'prefilter':'pre1','transducer':'trans1'} + 'physical_max':1.0,'physical_min':-1.0, + 'digital_max':32767,'digital_min':-32768, + 'prefilter':'pre1','transducer':'trans1'} f = pyedflib.EdfWriter(self.edfplus_data_file, 1, file_type=pyedflib.FILETYPE_EDFPLUS) f.setSignalHeader(0,channel_info1) @@ -430,7 +430,7 @@ def test_TestRoundingEDF(self): np.testing.assert_almost_equal(data1, data1_read,decimal=4) f = pyedflib.EdfWriter(self.edfplus_data_file, 1, - file_type=pyedflib.FILETYPE_EDFPLUS) + file_type=pyedflib.FILETYPE_EDFPLUS) f.setSignalHeader(0,channel_info1) data_list = [] @@ -485,7 +485,7 @@ def test_AnnotationWritingUTF8(self): 'digital_max': 8388607, 'digital_min': -8388608, 'prefilter': u'test', 'transducer': 'trans1'} f = pyedflib.EdfWriter(self.bdf_data_file, 1, - file_type=pyedflib.FILETYPE_BDFPLUS) + file_type=pyedflib.FILETYPE_BDFPLUS) f.setSignalHeader(0,channel_info) data = np.ones(100) * 0.1 f.writePhysicalSamples(data) @@ -516,7 +516,7 @@ def test_BytesChars(self): 'digital_max': 8388607, 'digital_min': -8388608, 'prefilter': b' ', 'transducer': b'trans1'} f = pyedflib.EdfWriter(self.bdf_data_file, 1, - file_type=pyedflib.FILETYPE_BDFPLUS) + file_type=pyedflib.FILETYPE_BDFPLUS) f.setSignalHeader(0,channel_info) data = np.ones(100) * 0.1 f.writePhysicalSamples(data) diff --git a/pyedflib/tests/test_highlevel.py b/pyedflib/tests/test_highlevel.py new file mode 100644 index 0000000..dfd77e8 --- /dev/null +++ b/pyedflib/tests/test_highlevel.py @@ -0,0 +1,71 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2015 Holger Nahrstaedt +from __future__ import division, print_function, absolute_import + +import os, sys +import numpy as np +# from numpy.testing import (assert_raises, run_module_suite, +# assert_equal, assert_allclose, assert_almost_equal) +import unittest +from pyedflib import highlevel +from datetime import datetime, date + + +class TestEdfWriter(unittest.TestCase): + def setUp(self): + data_dir = os.path.join(os.path.dirname(__file__), 'data') + self.edfplus_data_file = os.path.join(data_dir, 'tmp_test_file_plus.edf') + + def test_read_write_edf(self): + startdate = datetime.now() + t = startdate + startdate = datetime(t.year,t.month,t.day,t.hour, t.minute,t.second) + + header = highlevel.make_header(technician='tech', recording_additional='radd', + patientname='name', patient_additional='padd', + patientcode='42', equipment='eeg', admincode='420', + gender='Male', startdate=startdate,birthdate='05.09.1980') + annotations = [[0.01, -1, 'begin'],[0.5, -1, 'middle'],[10, -1, 'end']] + header['annotations'] = annotations + signal_headers1 = highlevel.make_signal_headers(['ch'+str(i) for i in range(5)]) + signals = np.random.rand(5, 256*300)*200 #5 minutes of eeg + + success = highlevel.write_edf(self.edfplus_data_file, signals, signal_headers1, header) + self.assertTrue(os.path.isfile(self.edfplus_data_file)) + self.assertGreater(os.path.getsize(self.edfplus_data_file), 0) + self.assertTrue(success) + + signals2, signal_headers2, header2 = highlevel.read_edf(self.edfplus_data_file) + + self.assertEqual(len(signals2), 5) + self.assertEqual(len(signals2), len(signal_headers2)) + for shead1, shead2 in zip(signal_headers1, signal_headers2): + self.assertDictEqual(shead1, shead2) + + self.assertDictEqual(header, header2) + np.testing.assert_allclose(signals, signals2, atol=0.01) + + signals = (signals*100).astype(np.int8) + success = highlevel.write_edf(self.edfplus_data_file, signals, signal_headers1, header, digital=True) + self.assertTrue(os.path.isfile(self.edfplus_data_file)) + self.assertGreater(os.path.getsize(self.edfplus_data_file), 0) + self.assertTrue(success) + + signals2, signal_headers2, header2 = highlevel.read_edf(self.edfplus_data_file, digital=True) + + self.assertEqual(len(signals2), 5) + self.assertEqual(len(signals2), len(signal_headers2)) + for shead1, shead2 in zip(signal_headers1, signal_headers2): + self.assertDictEqual(shead1, shead2) + + self.assertDictEqual(header, header2) + np.testing.assert_array_equal(signals, signals2) + + + # def test_annotations_accuracy(self): + + + +if __name__ == '__main__': + # run_module_suite(argv=sys.argv) + unittest.main() diff --git a/requirements-test.txt b/requirements-test.txt index 3477888..e3999b6 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -4,4 +4,6 @@ coverage cython numpy matplotlib -pytest \ No newline at end of file +pytest +dateparser +tqdm \ No newline at end of file