From 3ee7a94214d7f6fbd22607436076527ee0eb2e36 Mon Sep 17 00:00:00 2001 From: skjerns <14980558+skjerns@users.noreply.github.com> Date: Wed, 17 Oct 2018 14:27:29 +0200 Subject: [PATCH 01/24] Update test_edfwriter.py --- pyedflib/tests/test_edfwriter.py | 51 ++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/pyedflib/tests/test_edfwriter.py b/pyedflib/tests/test_edfwriter.py index bc234f1..b38893e 100644 --- a/pyedflib/tests/test_edfwriter.py +++ b/pyedflib/tests/test_edfwriter.py @@ -297,6 +297,57 @@ def test_SampleWriting2(self): np.testing.assert_equal(len(data2), len(data2_read)) np.testing.assert_almost_equal(data1, data1_read) np.testing.assert_almost_equal(data2, data2_read) + + def test_SampleWriting_digital(self): + + dmin, dmax = [0, 1024] + pmin, pmax = [0, 1.0] + channel_info1 = {'label':'test_label1', 'dimension':'mV', 'sample_rate':100, + 'physical_max':pmax,'physical_min':pmin, + 'digital_max':dmax,'digital_min':dmin, + 'prefilter':'pre1','transducer':'trans1'} + channel_info2 = {'label':'test_label2', 'dimension':'mV', 'sample_rate':100, + 'physical_max':pmax,'physical_min':pmin, + 'digital_max':dmax,'digital_min':dmin, + 'prefilter':'pre2','transducer':'trans2'} + + + f = pyedflib.EdfWriter(self.bdfplus_data_file, 2, + file_type=pyedflib.FILETYPE_EDFPLUS) + f.setSignalHeader(0,channel_info1) + f.setSignalHeader(1,channel_info2) + + data1 = np.arange(500, dtype=np.float) + data2 = np.arange(500, dtype=np.float) + data_list = [] + data_list.append(data1) + data_list.append(data2) + with np.testing.assert_raises(TypeError): + f.writeSamples(data_list, digital=True) + del f + + f = pyedflib.EdfWriter(self.bdfplus_data_file, 2, + file_type=pyedflib.FILETYPE_EDFPLUS) + f.setSignalHeader(0,channel_info1) + f.setSignalHeader(1,channel_info2) + + data1 = np.arange(500, dtype=np.int) + data2 = np.arange(500, dtype=np.int) + data_list = [] + data_list.append(data1) + data_list.append(data2) + f.writeSamples(data_list, digital=True) + del f + + f = pyedflib.EdfReader(self.bdfplus_data_file) + data1_read = (f.readSignal(0) - pmin)/((pmax-pmin)/(dmax-dmin)) # converting back to digital + data2_read = (f.readSignal(1) - pmin)/((pmax-pmin)/(dmax-dmin)) # converting back to digital + del f + + np.testing.assert_equal(len(data1), len(data1_read)) + np.testing.assert_equal(len(data2), len(data2_read)) + np.testing.assert_almost_equal(data1, data1_read) + np.testing.assert_almost_equal(data2, data2_read) def test_TestRoundingEDF(self): channel_info1 = {'label':'test_label1', 'dimension':'mV', 'sample_rate':100, From cff49a81fc12bbbad692a2c7e19626c69959cb0a Mon Sep 17 00:00:00 2001 From: Simon Kern Date: Tue, 7 Jan 2020 14:18:45 +0100 Subject: [PATCH 02/24] added some high level functions for convenience --- pyedflib/highlevel.py | 328 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 328 insertions(+) create mode 100644 pyedflib/highlevel.py diff --git a/pyedflib/highlevel.py b/pyedflib/highlevel.py new file mode 100644 index 0000000..e9a401f --- /dev/null +++ b/pyedflib/highlevel.py @@ -0,0 +1,328 @@ + +# -*- coding: utf-8 -*- +# Copyright (c) 2015 - 2017 Holger Nahrstaedt +# Copyright (c) 2011, 2015, Chris Lee-Messer +# Copyright (c) 2016-2017 The pyedflib Developers +# +# See LICENSE for license details. +""" +Created on Tue Jan 7 12:13:47 2020 + +@author: skjerns +""" + +import os +import numpy as np +import warnings +import pyedflib +from tqdm import tqdm +# from . import EdfWriter +# from . import EdfReader + +def make_header(technician='', recording_additional='', patientname='', + patient_additional='', patientcode= '', equipment= '', + admincode= '', gender= '', startdate=None, birthdate= ''): + """ + A convenience function to create an EDF header (a dictionary) that + can be used by pyedflib to update the main header of the EDF + """ + if not( startdate is None or isinstance(startdate, datetime)): + warnings.warn('must be datetime or None, is {}: {},attempting convert'\ + .format(type(startdate), startdate)) + startdate = dateparser.parse(startdate) + if not (birthdate is '' or isinstance(birthdate, (datetime,str))): + warnings.warn('must be datetime or empty, is {}, {}'\ + .format(type(birthdate), birthdate)) + birthdate = dateparser.parse(birthdate) + if startdate is None: + now = datetime.now() + startdate = datetime(now.year, now.month, now.day, + now.hour, now.minute, now.second) + del now + if isinstance(birthdate, datetime): + birthdate = birthdate.strftime('%d %b %Y') + local = locals() + header = {} + for var in local: + if isinstance(local[var], datetime): + header[var] = local[var] + else: + header[var] = str(local[var]) + return header + +def make_signal_header(label, dimension='uV', sample_rate=256, + physical_min=-200, physical_max=200, digital_min=-32768, + digital_max=32767, transducer='', prefiler=''): + """ + A convenience function that creates a signal header for a given signal. + This can be used to create a list of signal headers that is used by + pyedflib to create an edf. With this, different sampling frequencies + can be indicated. + + :param label: the name of the channel + """ + signal_header = {'label': label, + 'dimension': dimension, + 'sample_rate': sample_rate, + 'physical_min': physical_min, + 'physical_max': physical_max, + 'digital_min': digital_min, + 'digital_max': digital_max, + 'transducer': transducer, + 'prefilter': prefiler} + return signal_header + + +def make_signal_headers(list_of_labels, dimension='uV', sample_rate=256, + physical_min=-200, physical_max=200, digital_min=-32768, + digital_max=32767, transducer='', prefiler=''): + """ + A function that creates signal headers for a given list of channel labels. + This can only be used if each channel has the same sampling frequency + + :param list_of_labels: A list with labels for each channel. + :returns: A dictionary that can be used by pyedflib to update the header + """ + signal_headers = [] + for label in list_of_labels: + header = make_signal_header(label, dimension=dimension, sample_rate=sample_rate, + physical_min=physical_min, physical_max=physical_max, + digital_min=digital_min, digital_max=digital_max, + transducer=transducer, prefiler=prefiler) + signal_headers.append(header) + return signal_headers + + +def read_edf(edf_file, ch_nrs=None, ch_names=None, digital=False, verbose=True): + """ + Reading EDF+/BDF data with pyedflib. + + Will load the edf and return the signals, the headers of the signals + and the header of the EDF. If all signals have the same sample frequency + will return a numpy array, else a list with the individual signals + + :param edf_file: link to an edf file + :param ch_nrs: The numbers of channels to read (optional) + :param ch_names: The names of channels to read (optional) + :returns: signals, signal_headers, header + """ + assert os.path.exists(edf_file), 'file {} does not exist'.format(edf_file) + assert (ch_nrs is None) or (ch_names is None), \ + 'names xor numbers should be supplied' + if ch_nrs is not None and not isinstance(ch_nrs, list): ch_nrs = [ch_nrs] + if ch_names is not None and \ + not isinstance(ch_names, list): ch_names = [ch_names] + + with pyedflib.EdfReader(edf_file) as f: + # see which channels we want to load + available_chs = [ch.upper() for ch in f.getSignalLabels()] + n_chrs = f.signals_in_file + + # find out which number corresponds to which channel + if ch_names is not None: + ch_nrs = [] + for ch in ch_names: + if not ch.upper() in available_chs: + warnings.warn('{} is not in source file (contains {})'\ + .format(ch, available_chs)) + print('will be ignored.') + else: + ch_nrs.append(available_chs.index(ch.upper())) + + # if there ch_nrs is not given, load all channels + + if ch_nrs is None: # no numbers means we load all + ch_nrs = range(n_chrs) + + # convert negative numbers into positives + ch_nrs = [n_chrs+ch if ch<0 else ch for ch in ch_nrs] + + # load headers, signal information and + header = f.getHeader() + signal_headers = [f.getSignalHeaders()[c] for c in ch_nrs] + + signals = [] + for i,c in enumerate(tqdm(ch_nrs, desc='Reading Channels', + disable=not verbose)): + signal = f.readSignal(c, digital=digital) + signals.append(signal) + + # we can only return a np.array if all signals have the same samplefreq + sfreqs = [header['sample_rate'] for header in signal_headers] + all_sfreq_same = sfreqs[1:]==sfreqs[:-1] + if all_sfreq_same: + dtype = np.int if digital else np.float + signals = np.array(signals, dtype=dtype) + elif verbose: + warnings.warn('Not all sampling frequencies are the same ({}). '\ + .format(sfreqs)) + assert len(signals)==len(signal_headers), 'Something went wrong, lengths'\ + ' of headers is not length of signals' + return signals, signal_headers, header + + +def write_edf(edf_file, signals, signal_headers, header, digital=False): + """ + Write signals to an edf_file. Header can be generated on the fly. + + :param signals: The signals as a list of arrays or a ndarray + :param signal_headers: a list with one signal header(dict) for each signal. + See pyedflib.EdfWriter.setSignalHeader + :param header: a main header (dict) for the EDF file, see + pyedflib.EdfWriter.setHeader for details + :param digital: whether signals are presented digitally + or in physical values + + :returns: True if successful, False if failed + """ + assert header is None or isinstance(header, dict), \ + 'header must be dictioniary' + assert isinstance(signal_headers, list), \ + 'signal headers must be list' + assert len(signal_headers)==len(signals), \ + 'signals and signal_headers must be same length' + + n_channels = len(signals) + + with pyedflib.EdfWriter(edf_file, n_channels=n_channels) as f: + f.setSignalHeaders(signal_headers) + f.setHeader(header) + f.writeSamples(signals, digital=digital) + + return os.path.isfile(edf_file) + + +def write_edf_quick(edf_file, signals, sfreq, digital=False): + """ + wrapper for write_pyedf without creating headers. + Use this if you don't care about headers or channel names and just + want to dump some signals with the same sampling freq. to an edf + + :param edf_file: where to store the data/edf + :param signals: The signals you want to store as numpy array + :param sfreq: the sampling frequency of the signals + :param digital: if the data is present digitally (int) or as mV/uV + """ + labels = ['CH_{}'.format(i) for i in range(len(signals))] + signal_headers = make_signal_headers(labels, sample_rate = sfreq) + return write_pyedf(edf_file, signals, signal_headers, digital=digital) + + +def read_edf_header(edf_file): + """ + Reads the header and signal headers of an EDF file + + :returns: header of the edf file (dict) + """ + assert os.path.isfile(edf_file), 'file {} does not exist'.format(edf_file) + with pyedflib.EdfReader(edf_file) as f: + summary = f.getHeader() + summary['Duration'] = f.getFileDuration + summary['SignalHeaders'] = f.getSignalHeaders() + summary['channels'] = f.getSignalLabels() + del f + return summary + + + +def drop_channels(edf_source, edf_target=None, to_keep=None, to_drop=None): + """ + Remove channels from an edf file using pyedflib. + Save the file as edf_target. + For safety reasons, no source files can be overwritten. + + :param edf_source: The source edf file + :param edf_target: Where to save the file. + If None, will be edf_source+'dropped.edf' + :param to_keep: A list of channel names or indices that will be kept. + Strings will always be interpreted as channel names. + 'to_keep' will overwrite any droppings proposed by to_drop + :param to_drop: A list of channel names/indices that should be dropped. + Strings will be interpreted as channel names. + :returns: the target filename with the dropped channels + """ + # convert to list if necessary + if isinstance(to_keep, (int, str)): to_keep = [to_keep] + if isinstance(to_drop, (int, str)): to_drop = [to_drop] + + # check all parameters are good + assert to_keep is None or to_drop is None,'Supply only to_keep xor to_drop' + if to_keep is not None: + assert all([isinstance(ch, (str, int)) for ch in to_keep]),\ + 'channels must be int or string' + if to_drop is not None: + assert all([isinstance(ch, (str, int)) for ch in to_drop]),\ + 'channels must be int or string' + assert os.path.exists(edf_source), 'source file {} does not exist'\ + .format(edf_source) + assert edf_source!=edf_target, 'For safet, target must not be source file.' + + if edf_target is None: + edf_target = os.path.splitext(edf_source)[0] + '_dropped.edf' + if os.path.exists(edf_target): + warnings.warn('Target file will be overwritten') + + ch_names = read_edf_header(edf_source)['channels'] + # convert to all lowercase for compatibility + ch_names = [ch.lower() for ch in ch_names] + ch_nrs = list(range(len(ch_names))) + + if to_keep is not None: + for i,ch in enumerate(to_keep): + if isinstance(ch,str): + ch_idx = ch_names.index(ch.lower()) + to_keep[i] = ch_idx + load_channels = to_keep.copy() + elif to_drop is not None: + for i,ch in enumerate(to_drop): + if isinstance(ch,str): + ch_idx = ch_names.index(ch.lower()) + to_drop[i] = ch_idx + to_drop = [len(ch_nrs)+ch if ch<0 else ch for ch in to_drop] + + [ch_nrs.remove(ch) for ch in to_drop] + load_channels = ch_nrs.copy() + else: + raise ValueError + + signals, signal_headers, header = read_edf(edf_source, + ch_nrs=load_channels, + digital=True) + + write_edf(edf_target, signals, signal_headers, header, digital=True) + return edf_target + + +def anonymize_edf(edf_file, new_file=None, + to_remove = ['patientname', 'birthdate'], + new_values = ['xxx', '']): + """ + Anonymizes an EDF file, that means it strips all header information + that is patient specific, ie. birthdate and patientname as well as XXX + + :param edf_file: a string with a filename of an EDF/BDF + :param new_file: where to save the anonymized edf file + :param to_remove: a list of attributes to remove from the file + :param new_values: a list of values that should be given instead to the edf + :returns: True if successful, False if failed + """ + assert len(to_remove)==len(new_values), \ + 'Each to_remove must have one new_value' + header = read_edf_header(edf_file) + + for new_val, attr in zip(new_values, to_remove): + header[attr] = new_val + + if new_file is None: + file, ext = os.path.splitext(edf_file) + new_file = file + '_anonymized' + ext + n_chs = len(header['channels']) + signal_headers = [] + signals = [] + for ch_nr in tqdm(range(n_chs)): + signal, signal_header, _ = read_edf(edf_file, digital=True, + ch_nrs=ch_nr, verbose=False) + signal_headers.append(signal_header[0]) + signals.append(signal.squeeze()) + + return write_edf(new_file, signals, signal_headers, header,digital=True) From 9a502cc5fd256cfa892875e0d773042c97f0ee4b Mon Sep 17 00:00:00 2001 From: Simon Kern Date: Tue, 7 Jan 2020 14:26:38 +0100 Subject: [PATCH 03/24] added rename function --- pyedflib/highlevel.py | 35 ++++++++++++++++++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/pyedflib/highlevel.py b/pyedflib/highlevel.py index e9a401f..7cfaf18 100644 --- a/pyedflib/highlevel.py +++ b/pyedflib/highlevel.py @@ -205,7 +205,7 @@ def write_edf_quick(edf_file, signals, sfreq, digital=False): """ labels = ['CH_{}'.format(i) for i in range(len(signals))] signal_headers = make_signal_headers(labels, sample_rate = sfreq) - return write_pyedf(edf_file, signals, signal_headers, digital=digital) + return write_edf(edf_file, signals, signal_headers, digital=digital) def read_edf_header(edf_file): @@ -326,3 +326,36 @@ def anonymize_edf(edf_file, new_file=None, signals.append(signal.squeeze()) return write_edf(new_file, signals, signal_headers, header,digital=True) + + +def rename_channels(edf_file, mapping, new_file=None): + """ + A convenience function to rename channels in an EDF file. + + :param edf_file: an string pointing to an edf file + :param mapping: a dictionary with channel mappings as key:value + :param new_file: the new filename + """ + header = sleep_utils.read_edf_header(edf_file) + channels = header['channels'] + if new_file is None: + file, ext = os.path.splitext(edf_file) + new_file = file + '_renamed' + ext + + signal_headers = [] + signals = [] + for ch_nr in tqdm(range(len(channels))): + signal, signal_header, _ = read_edf(file, digital=True, + ch_nrs=ch_nr, verbose=False) + ch = signal_header[0]['label'] + if ch in ch_mapping : + print('{} to {}'.format(ch, ch_mapping[ch])) + ch = ch_mapping[ch] + signal_header[0]['label']=ch + else: + print('no mapping for {}, leave as it is'.format(ch)) + signal_headers.append(signal_header[0]) + signals.append(signal.squeeze()) + + write_edf(new_file, signals, signal_headers, header,digital=True) + \ No newline at end of file From 260be26de2ec99d801eb8c143b450db7221a8c2b Mon Sep 17 00:00:00 2001 From: Simon Kern Date: Tue, 7 Jan 2020 14:34:05 +0100 Subject: [PATCH 04/24] add dependencies --- pyedflib/highlevel.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/pyedflib/highlevel.py b/pyedflib/highlevel.py index 7cfaf18..78a4f18 100644 --- a/pyedflib/highlevel.py +++ b/pyedflib/highlevel.py @@ -15,6 +15,8 @@ import numpy as np import warnings import pyedflib +import dateparser +from datetime import datetime from tqdm import tqdm # from . import EdfWriter # from . import EdfReader @@ -30,7 +32,7 @@ def make_header(technician='', recording_additional='', patientname='', warnings.warn('must be datetime or None, is {}: {},attempting convert'\ .format(type(startdate), startdate)) startdate = dateparser.parse(startdate) - if not (birthdate is '' or isinstance(birthdate, (datetime,str))): + if not (birthdate == '' or isinstance(birthdate, (datetime,str))): warnings.warn('must be datetime or empty, is {}, {}'\ .format(type(birthdate), birthdate)) birthdate = dateparser.parse(birthdate) @@ -336,7 +338,7 @@ def rename_channels(edf_file, mapping, new_file=None): :param mapping: a dictionary with channel mappings as key:value :param new_file: the new filename """ - header = sleep_utils.read_edf_header(edf_file) + header = read_edf_header(edf_file) channels = header['channels'] if new_file is None: file, ext = os.path.splitext(edf_file) @@ -348,9 +350,9 @@ def rename_channels(edf_file, mapping, new_file=None): signal, signal_header, _ = read_edf(file, digital=True, ch_nrs=ch_nr, verbose=False) ch = signal_header[0]['label'] - if ch in ch_mapping : - print('{} to {}'.format(ch, ch_mapping[ch])) - ch = ch_mapping[ch] + if ch in mapping : + print('{} to {}'.format(ch, mapping[ch])) + ch = mapping[ch] signal_header[0]['label']=ch else: print('no mapping for {}, leave as it is'.format(ch)) From c16e4b76b8fe4f9c5b3cf861ab6e78549c798b04 Mon Sep 17 00:00:00 2001 From: Simon Kern Date: Tue, 14 Jan 2020 12:24:54 +0100 Subject: [PATCH 05/24] add dependencies --- requirements-test.txt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/requirements-test.txt b/requirements-test.txt index 3477888..e3999b6 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -4,4 +4,6 @@ coverage cython numpy matplotlib -pytest \ No newline at end of file +pytest +dateparser +tqdm \ No newline at end of file From faa0d01e17a42ed1e352813c1f83c7ddc61b7a08 Mon Sep 17 00:00:00 2001 From: Simon Kern Date: Tue, 14 Jan 2020 13:55:20 +0100 Subject: [PATCH 06/24] add reqs --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index bd0cdfd..0df58db 100644 --- a/setup.py +++ b/setup.py @@ -283,6 +283,6 @@ def install_for_development(self): libraries=[c_lib], cmdclass={'develop': develop_build_clib}, test_suite='nose.collector', - install_requires=["numpy>=1.9.1"], + install_requires=["numpy>=1.9.1", "dateparser", "tqdm"], ) From b7c84284538b5f0aaac7cb191e63f98200572b28 Mon Sep 17 00:00:00 2001 From: Simon Kern Date: Tue, 14 Jan 2020 14:09:05 +0100 Subject: [PATCH 07/24] add deps to circle --- circle.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/circle.yml b/circle.yml index 8c2c3c3..651c369 100644 --- a/circle.yml +++ b/circle.yml @@ -3,7 +3,7 @@ dependencies: - sudo apt-get install texlive-fonts-recommended - pip install -q --install-option="--no-cython-compile" Cython==0.23.4 - pip install -q numpy - - pip install -q nose mpmath argparse Pillow codecov matplotlib Sphinx==1.5.5 + - pip install -q nose mpmath argparse Pillow codecov matplotlib Sphinx==1.5.5 dateparser tqdm - git submodule init - git submodule update - python setup.py build From 08411d24f78d6b27c0b6977ce60779abc43119cf Mon Sep 17 00:00:00 2001 From: Simon Kern Date: Tue, 14 Jan 2020 14:17:27 +0100 Subject: [PATCH 08/24] add more deps for appveyor --- appveyor.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/appveyor.yml b/appveyor.yml index 66c1155..8382c8a 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -25,7 +25,7 @@ install: - "util\\appveyor\\build.cmd %PYTHON%\\python.exe -m pip install numpy --cache-dir c:\\tmp\\pip-cache" - "util\\appveyor\\build.cmd %PYTHON%\\python.exe -m pip install - Cython nose coverage matplotlib --cache-dir c:\\tmp\\pip-cache" + Cython nose coverage matplotlib dateparser tqdm --cache-dir c:\\tmp\\pip-cache" test_script: - "util\\appveyor\\build.cmd %PYTHON%\\python.exe setup.py build --build-lib build\\lib\\" From 5c2c33968e3a3d111c1e2955004416d46feb6db9 Mon Sep 17 00:00:00 2001 From: skjerns <14980558+skjerns@users.noreply.github.com> Date: Sat, 25 Jan 2020 00:00:09 +0100 Subject: [PATCH 09/24] removed dependencies --- circle.yml | 2 +- pyedflib/highlevel.py | 40 ++++++++++++++++++++++++++++++---------- setup.py | 2 +- 3 files changed, 32 insertions(+), 12 deletions(-) diff --git a/circle.yml b/circle.yml index 651c369..564d94e 100644 --- a/circle.yml +++ b/circle.yml @@ -3,7 +3,7 @@ dependencies: - sudo apt-get install texlive-fonts-recommended - pip install -q --install-option="--no-cython-compile" Cython==0.23.4 - pip install -q numpy - - pip install -q nose mpmath argparse Pillow codecov matplotlib Sphinx==1.5.5 dateparser tqdm + - pip install -q nose mpmath argparse Pillow codecov matplotlib Sphinx==1.5.5 tqdm - git submodule init - git submodule update - python setup.py build diff --git a/pyedflib/highlevel.py b/pyedflib/highlevel.py index 78a4f18..5f8a0e1 100644 --- a/pyedflib/highlevel.py +++ b/pyedflib/highlevel.py @@ -15,12 +15,37 @@ import numpy as np import warnings import pyedflib -import dateparser from datetime import datetime -from tqdm import tqdm # from . import EdfWriter # from . import EdfReader +def tqdm(*args, **kwargs): + """ + These are optional dependecies that show a progress bar + for some of the functions, e.g. loading. + + if not installed this is just a pass through iterator + """ + try: + from tqd2m import tqdm as iterator + return iterator(*args, **kwargs) + except: + return list(args[0]) + +def _parse_date(string): + # some common formats. + formats = ['%Y-%m-%d', '%d-%m-%Y', '%d.%m.%Y', '%Y.%m.%d', '%d %b %Y', + '%Y/%m/%d', '%d/%m/%Y'] + for f in formats: + try: + return datetime.strptime(string, f) + except: + pass + print('dateparser is not installed. to convert strings to dates'\ + 'install via `pip install dateparser`.') + raise ValueError('birthdate must be datetime object or of format'\ + ' `%d-%m-%Y`, eg. `24-01-2020`') + def make_header(technician='', recording_additional='', patientname='', patient_additional='', patientcode= '', equipment= '', admincode= '', gender= '', startdate=None, birthdate= ''): @@ -28,14 +53,8 @@ def make_header(technician='', recording_additional='', patientname='', A convenience function to create an EDF header (a dictionary) that can be used by pyedflib to update the main header of the EDF """ - if not( startdate is None or isinstance(startdate, datetime)): - warnings.warn('must be datetime or None, is {}: {},attempting convert'\ - .format(type(startdate), startdate)) - startdate = dateparser.parse(startdate) - if not (birthdate == '' or isinstance(birthdate, (datetime,str))): - warnings.warn('must be datetime or empty, is {}, {}'\ - .format(type(birthdate), birthdate)) - birthdate = dateparser.parse(birthdate) + if not birthdate=='' and isinstance(birthdate, str): + birthdate = _parse_date(birthdate) if startdate is None: now = datetime.now() startdate = datetime(now.year, now.month, now.day, @@ -52,6 +71,7 @@ def make_header(technician='', recording_additional='', patientname='', header[var] = str(local[var]) return header + def make_signal_header(label, dimension='uV', sample_rate=256, physical_min=-200, physical_max=200, digital_min=-32768, digital_max=32767, transducer='', prefiler=''): diff --git a/setup.py b/setup.py index 0df58db..bd0cdfd 100644 --- a/setup.py +++ b/setup.py @@ -283,6 +283,6 @@ def install_for_development(self): libraries=[c_lib], cmdclass={'develop': develop_build_clib}, test_suite='nose.collector', - install_requires=["numpy>=1.9.1", "dateparser", "tqdm"], + install_requires=["numpy>=1.9.1"], ) From d977f96246915616700bc1e25489e6e181f1fe55 Mon Sep 17 00:00:00 2001 From: skjerns <14980558+skjerns@users.noreply.github.com> Date: Sat, 25 Jan 2020 00:12:57 +0100 Subject: [PATCH 10/24] add digital signal converter --- pyedflib/highlevel.py | 79 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 78 insertions(+), 1 deletion(-) diff --git a/pyedflib/highlevel.py b/pyedflib/highlevel.py index 5f8a0e1..3f9fdc0 100644 --- a/pyedflib/highlevel.py +++ b/pyedflib/highlevel.py @@ -45,7 +45,39 @@ def _parse_date(string): 'install via `pip install dateparser`.') raise ValueError('birthdate must be datetime object or of format'\ ' `%d-%m-%Y`, eg. `24-01-2020`') - + +def dig2phys(signal, dmin, dmax, pmin, pmax): + """ + converts digital edf values to analogue values + + :param signal: A numpy array with int values (digital values) or an int + :param dmin: digital minimum value of the edf file (eg -2048) + :param dmax: digital maximum value of the edf file (eg 2048) + :param pmin: physical maximum value of the edf file (eg -200.0) + :param pmax: physical maximum value of the edf file (eg 200.0) + :returns: converted physical values + """ + m = (pmax-pmin) / (dmax-dmin) + physical = m * signal + return physical + +def phys2dig(signal, dmin, dmax, pmin, pmax): + """ + converts physical edf values to digital values + + :param signal: A numpy array with int values (digital values) or an int + :param dmin: digital minimum value of the edf file (eg -2048) + :param dmax: digital maximum value of the edf file (eg 2048) + :param pmin: physical maximum value of the edf file (eg -200.0) + :param pmax: physical maximum value of the edf file (eg 200.0) + :returns: converted digital values + """ + m = (dmax-dmin)/(pmax-pmin) + digital = (m * signal) + return digital + + + def make_header(technician='', recording_additional='', patientname='', patient_additional='', patientcode= '', equipment= '', admincode= '', gender= '', startdate=None, birthdate= ''): @@ -245,6 +277,51 @@ def read_edf_header(edf_file): del f return summary +def compare_edf(edf_file1, edf_file2, verbose=True): + """ + Loads two edf files and checks whether the values contained in + them are the same. Does not check the header data. + + :param edf_file1: First edf file to check + :param edf_file2: second edf file to compare against + :param verbose: print update messages or not. + """ + if verbose: print('verifying data') + files = [(edf_file1, True), (edf_file2, True), + (edf_file1, False), (edf_file2, False)] + results = Parallel(n_jobs=4, backend='loky')(delayed(read_edf)\ + (file, digital=digital, verbose=False) for file, \ + digital in tqdm(files, disable=not verbose)) + + signals1, signal_headers1, _ = results[0] + signals2, signal_headers2, _ = results[1] + signals3, signal_headers3, _ = results[0] + signals4, signal_headers4, _ = results[1] + + for i, sigs in enumerate(zip(signals1, signals2)): + s1, s2 = sigs + s1 = np.abs(s1) + s2 = np.abs(s2) + assert np.allclose(s1, s2), 'Error, digital values of {}'\ + ' and {} for ch {}: {} are not the same'.format( + edf_file1, edf_file2, signal_headers1[i]['label'], + signal_headers2[i]['label']) + + for i, sigs in enumerate(zip(signals3, signals4)): + s1, s2 = sigs + # compare absolutes in case of inverted signals + s1 = np.abs(s1) + s2 = np.abs(s2) + dmin, dmax = signal_headers3[i]['digital_min'], signal_headers3[i]['digital_max'] + pmin, pmax = signal_headers3[i]['physical_min'], signal_headers3[i]['physical_max'] + min_dist = np.abs(dig2phys(1, dmin, dmax, pmin, pmax)) + close = np.mean(np.isclose(s1, s2, atol=min_dist)) + assert close>0.99, 'Error, physical values of {}'\ + ' and {} for ch {}: {} are not the same: {:.3f}'.format( + edf_file1, edf_file2, signal_headers1[i]['label'], + signal_headers2[i]['label'], close) + gc.collect() + return True def drop_channels(edf_source, edf_target=None, to_keep=None, to_drop=None): From daa3f37c0098e838e0101f65e3ac5e342851e082 Mon Sep 17 00:00:00 2001 From: skjerns <14980558+skjerns@users.noreply.github.com> Date: Wed, 12 Feb 2020 15:55:51 +0100 Subject: [PATCH 11/24] added docstrings and support for annotations --- pyedflib/highlevel.py | 488 ++++++++++++++++++++++++++++++++---------- 1 file changed, 379 insertions(+), 109 deletions(-) diff --git a/pyedflib/highlevel.py b/pyedflib/highlevel.py index 3f9fdc0..d9fc3ec 100644 --- a/pyedflib/highlevel.py +++ b/pyedflib/highlevel.py @@ -8,6 +8,15 @@ """ Created on Tue Jan 7 12:13:47 2020 +This file contains high-level functions to work with pyedflib. + +Includes + - Reading and writing EDFs + - Anonymizing EDFs + - Comparing EDFs + - Renaming Channels from EDF files + - Dropping Channels from EDF files + @author: skjerns """ @@ -19,20 +28,37 @@ # from . import EdfWriter # from . import EdfReader -def tqdm(*args, **kwargs): +def tqdm(iteratable, *args, **kwargs): """ - These are optional dependecies that show a progress bar - for some of the functions, e.g. loading. + These is an optional dependecies that shows a progress bar for some + of the functions, e.g. loading. + + install this dependency with `pip install tqdm` if not installed this is just a pass through iterator """ try: - from tqd2m import tqdm as iterator - return iterator(*args, **kwargs) + from tqd3m import tqdm as iterator + return iterator(iteratable, *args, **kwargs) except: - return list(args[0]) + return iteratable + def _parse_date(string): + """ + A simple dateparser that detects common date formats + + Parameters + ---------- + string : str + a date string in format as denoted below. + + Returns + ------- + datetime.datetime + datetime object of a time. + + """ # some common formats. formats = ['%Y-%m-%d', '%d-%m-%Y', '%d.%m.%Y', '%Y.%m.%d', '%d %b %Y', '%Y/%m/%d', '%d/%m/%Y'] @@ -41,21 +67,37 @@ def _parse_date(string): return datetime.strptime(string, f) except: pass - print('dateparser is not installed. to convert strings to dates'\ - 'install via `pip install dateparser`.') - raise ValueError('birthdate must be datetime object or of format'\ - ' `%d-%m-%Y`, eg. `24-01-2020`') + try: + import dateparser + return dateparser.parse(string) + except: + print('dateparser is not installed. to convert strings to dates'\ + 'install via `pip install dateparser`.') + raise ValueError('birthdate must be datetime object or of format'\ + ' `%d-%m-%Y`, eg. `24-01-2020`') def dig2phys(signal, dmin, dmax, pmin, pmax): """ - converts digital edf values to analogue values - - :param signal: A numpy array with int values (digital values) or an int - :param dmin: digital minimum value of the edf file (eg -2048) - :param dmax: digital maximum value of the edf file (eg 2048) - :param pmin: physical maximum value of the edf file (eg -200.0) - :param pmax: physical maximum value of the edf file (eg 200.0) - :returns: converted physical values + converts digital edf values to physical values + + Parameters + ---------- + signal : np.ndarray or int + A numpy array with int values (digital values) or an int. + dmin : int + digital minimum value of the edf file (eg -2048). + dmax : int + digital maximum value of the edf file (eg 2048). + pmin : float + physical maximum value of the edf file (eg -200.0). + pmax : float + physical maximum value of the edf file (eg 200.0). + + Returns + ------- + physical : np.ndarray or float + converted physical values + """ m = (pmax-pmin) / (dmax-dmin) physical = m * signal @@ -63,14 +105,26 @@ def dig2phys(signal, dmin, dmax, pmin, pmax): def phys2dig(signal, dmin, dmax, pmin, pmax): """ - converts physical edf values to digital values - - :param signal: A numpy array with int values (digital values) or an int - :param dmin: digital minimum value of the edf file (eg -2048) - :param dmax: digital maximum value of the edf file (eg 2048) - :param pmin: physical maximum value of the edf file (eg -200.0) - :param pmax: physical maximum value of the edf file (eg 200.0) - :returns: converted digital values + converts physical values to digital values + + Parameters + ---------- + signal : np.ndarray or int + A numpy array with int values (digital values) or an int. + dmin : int + digital minimum value of the edf file (eg -2048). + dmax : int + digital maximum value of the edf file (eg 2048). + pmin : float + physical maximum value of the edf file (eg -200.0). + pmax : float + physical maximum value of the edf file (eg 200.0). + + Returns + ------- + digital : np.ndarray or int + converted digital values + """ m = (dmax-dmin)/(pmax-pmin) digital = (m * signal) @@ -84,7 +138,37 @@ def make_header(technician='', recording_additional='', patientname='', """ A convenience function to create an EDF header (a dictionary) that can be used by pyedflib to update the main header of the EDF + + Parameters + ---------- + technician : str, optional + name of the technician. The default is ''. + recording_additional : str, optional + comments etc. The default is ''. + patientname : str, optional + the name of the patient. The default is ''. + patient_additional : TYPE, optional + more info about the patient. The default is ''. + patientcode : str, optional + alphanumeric code. The default is ''. + equipment : str, optional + which system was used. The default is ''. + admincode : str, optional + code of the admin. The default is ''. + gender : str, optional + gender of patient. The default is ''. + startdate : datetime.datetime, optional + startdate of recording. The default is None. + birthdate : str/datetime.datetime, optional + date of birth of the patient. The default is ''. + + Returns + ------- + header : dict + a dictionary with the values given filled in. + """ + if not birthdate=='' and isinstance(birthdate, str): birthdate = _parse_date(birthdate) if startdate is None: @@ -112,9 +196,35 @@ def make_signal_header(label, dimension='uV', sample_rate=256, This can be used to create a list of signal headers that is used by pyedflib to create an edf. With this, different sampling frequencies can be indicated. - - :param label: the name of the channel + + Parameters + ---------- + label : str + the name of the channel. + dimension : str, optional + dimension, eg mV. The default is 'uV'. + sample_rate : int, optional + sampling frequency. The default is 256. + physical_min : float, optional + minimum value in dimension. The default is -200. + physical_max : float, optional + maximum value in dimension. The default is 200. + digital_min : int, optional + digital minimum of the ADC. The default is -32768. + digital_max : int, optional + digital maximum of the ADC. The default is 32767. + transducer : str, optional + electrode type that was used. The default is ''. + prefiler : str, optional + filtering and sampling method. The default is ''. + + Returns + ------- + signal_header : dict + a signal header that can be used to save a channel to an EDF. + """ + signal_header = {'label': label, 'dimension': dimension, 'sample_rate': sample_rate, @@ -133,9 +243,33 @@ def make_signal_headers(list_of_labels, dimension='uV', sample_rate=256, """ A function that creates signal headers for a given list of channel labels. This can only be used if each channel has the same sampling frequency - - :param list_of_labels: A list with labels for each channel. - :returns: A dictionary that can be used by pyedflib to update the header + + Parameters + ---------- + list_of_labels : list of str + A list with labels for each channel. + dimension : str, optional + dimension, eg mV. The default is 'uV'. + sample_rate : int, optional + sampling frequency. The default is 256. + physical_min : float, optional + minimum value in dimension. The default is -200. + physical_max : float, optional + maximum value in dimension. The default is 200. + digital_min : int, optional + digital minimum of the ADC. The default is -32768. + digital_max : int, optional + digital maximum of the ADC. The default is 32767. + transducer : str, optional + electrode type that was used. The default is ''. + prefiler : str, optional + filtering and sampling method. The default is ''. + + Returns + ------- + signal_headers : list of dict + returns n signal headers as a list to save several signal headers. + """ signal_headers = [] for label in list_of_labels: @@ -149,17 +283,36 @@ def make_signal_headers(list_of_labels, dimension='uV', sample_rate=256, def read_edf(edf_file, ch_nrs=None, ch_names=None, digital=False, verbose=True): """ - Reading EDF+/BDF data with pyedflib. + Convenience function for reading EDF+/BDF data with pyedflib. Will load the edf and return the signals, the headers of the signals and the header of the EDF. If all signals have the same sample frequency will return a numpy array, else a list with the individual signals - :param edf_file: link to an edf file - :param ch_nrs: The numbers of channels to read (optional) - :param ch_names: The names of channels to read (optional) - :returns: signals, signal_headers, header - """ + + Parameters + ---------- + edf_file : str + link to an edf file. + ch_nrs : list of int, optional + The indices of the channels to read. The default is None. + ch_names : list of str, optional + The names of channels to read. The default is None. + digital : bool, optional + will return the signals as digital values (ADC). The default is False. + verbose : bool, optional + DESCRIPTION. The default is True. + + Returns + ------- + signals : np.ndarray or list + the signals of the chosen channels contained in the EDF. + signal_headers : list + one signal header for each channel in the EDF. + header : dict + the main header of the EDF file containing meta information. + + """ assert os.path.exists(edf_file), 'file {} does not exist'.format(edf_file) assert (ch_nrs is None) or (ch_names is None), \ 'names xor numbers should be supplied' @@ -192,8 +345,13 @@ def read_edf(edf_file, ch_nrs=None, ch_names=None, digital=False, verbose=True): ch_nrs = [n_chrs+ch if ch<0 else ch for ch in ch_nrs] # load headers, signal information and - header = f.getHeader() + header = f.getHeader() signal_headers = [f.getSignalHeaders()[c] for c in ch_nrs] + + # add annotations to header + annotations = f.read_annotation() + annotations = [[t/10000000, d if d else -1, x.decode()] for t,d,x in annotations] + header['annotations'] = annotations signals = [] for i,c in enumerate(tqdm(ch_nrs, desc='Reading Channels', @@ -217,17 +375,29 @@ def read_edf(edf_file, ch_nrs=None, ch_names=None, digital=False, verbose=True): def write_edf(edf_file, signals, signal_headers, header, digital=False): """ - Write signals to an edf_file. Header can be generated on the fly. - - :param signals: The signals as a list of arrays or a ndarray - :param signal_headers: a list with one signal header(dict) for each signal. - See pyedflib.EdfWriter.setSignalHeader - :param header: a main header (dict) for the EDF file, see - pyedflib.EdfWriter.setHeader for details - :param digital: whether signals are presented digitally - or in physical values - - :returns: True if successful, False if failed + Write signals to an edf_file. Header can be generated on the fly with + generic values. + + Parameters + ---------- + edf_file : np.ndarray or list + where to save the EDF file + signals : list + The signals as a list of arrays or a ndarray. + + signal_headers : list of dict + a list with one signal header(dict) for each signal. + See pyedflib.EdfWriter.setSignalHeader.. + header : dict + a main header (dict) for the EDF file, see + pyedflib.EdfWriter.setHeader for details. + digital : bool, optional + whether the signals are in digital format (ADC). The default is False. + + Returns + ------- + bool + True if successful, False if failed. """ assert header is None or isinstance(header, dict), \ 'header must be dictioniary' @@ -238,11 +408,19 @@ def write_edf(edf_file, signals, signal_headers, header, digital=False): n_channels = len(signals) + default_header = make_header() + default_header.update(header) + header = default_header + + annotations = header.get('annotations', '') + with pyedflib.EdfWriter(edf_file, n_channels=n_channels) as f: f.setSignalHeaders(signal_headers) - f.setHeader(header) + f.setHeader(header) + for annotation in annotations: + f.writeAnnotation(*annotation) f.writeSamples(signals, digital=digital) - + return os.path.isfile(edf_file) @@ -251,11 +429,23 @@ def write_edf_quick(edf_file, signals, sfreq, digital=False): wrapper for write_pyedf without creating headers. Use this if you don't care about headers or channel names and just want to dump some signals with the same sampling freq. to an edf - - :param edf_file: where to store the data/edf - :param signals: The signals you want to store as numpy array - :param sfreq: the sampling frequency of the signals - :param digital: if the data is present digitally (int) or as mV/uV + + Parameters + ---------- + edf_file : str + where to store the data/edf. + signals : np.ndarray + The signals you want to store as numpy array. + sfreq : int + the sampling frequency of the signals. + digital : bool, optional + if the data is present digitally (int) or as mV/uV.The default is False. + + Returns + ------- + bool + True if successful, else False or raise Error. + """ labels = ['CH_{}'.format(i) for i in range(len(signals))] signal_headers = make_signal_headers(labels, sample_rate = sfreq) @@ -264,82 +454,131 @@ def write_edf_quick(edf_file, signals, sfreq, digital=False): def read_edf_header(edf_file): """ - Reads the header and signal headers of an EDF file - - :returns: header of the edf file (dict) + Reads the header and signal headers of an EDF file and it's annotations + + Parameters + ---------- + edf_file : str + EDF/BDF file to read. + + Returns + ------- + summary : dict + header of the edf file as dictionary. + """ assert os.path.isfile(edf_file), 'file {} does not exist'.format(edf_file) with pyedflib.EdfReader(edf_file) as f: + annotations = f.read_annotation() + annotations = [[t/10000, d if d else -1, x] for t,d,x in annotations] summary = f.getHeader() summary['Duration'] = f.getFileDuration summary['SignalHeaders'] = f.getSignalHeaders() summary['channels'] = f.getSignalLabels() + summary['annotations'] = annotations del f return summary + def compare_edf(edf_file1, edf_file2, verbose=True): """ Loads two edf files and checks whether the values contained in - them are the same. Does not check the header data. + them are the same. Does not check the header or annotations data. - :param edf_file1: First edf file to check - :param edf_file2: second edf file to compare against - :param verbose: print update messages or not. + Mainly to verify that other options (eg anonymization) produce the + same EDF file. + + Parameters + ---------- + edf_file1 : str + edf file 1 to compare. + edf_file2 : str + edf file 2 to compare. + verbose : bool, optional + print progress or not. The default is True. + + Returns + ------- + bool + True if signals are equal, else raises error. """ if verbose: print('verifying data') - files = [(edf_file1, True), (edf_file2, True), - (edf_file1, False), (edf_file2, False)] - results = Parallel(n_jobs=4, backend='loky')(delayed(read_edf)\ - (file, digital=digital, verbose=False) for file, \ - digital in tqdm(files, disable=not verbose)) - - signals1, signal_headers1, _ = results[0] - signals2, signal_headers2, _ = results[1] - signals3, signal_headers3, _ = results[0] - signals4, signal_headers4, _ = results[1] + signals1, shead1, _ = read_edf(edf_file1, digital=True, verbose=verbose, + return_list=True) + signals2, shead2, _ = read_edf(edf_file2, digital=True, verbose=verbose, + return_list=True) + for i, sigs in enumerate(zip(signals1, signals2)): s1, s2 = sigs + if np.array_equal(s1, s2): continue # early stopping s1 = np.abs(s1) s2 = np.abs(s2) - assert np.allclose(s1, s2), 'Error, digital values of {}'\ - ' and {} for ch {}: {} are not the same'.format( - edf_file1, edf_file2, signal_headers1[i]['label'], - signal_headers2[i]['label']) + if np.array_equal(s1, s2): continue # early stopping + close = np.mean(np.isclose(s1, s2)) + assert close>0.99, 'Error, digital values of {}'\ + ' and {} for ch {}: {} are not the same: {:.3f}'.format( + edf_file1, edf_file2, shead1[i]['label'], + shead2[i]['label'], close) + + dmin1, dmax1 = shead1[i]['digital_min'], shead1[i]['digital_max'] + pmin1, pmax1 = shead1[i]['physical_min'], shead1[i]['physical_max'] + dmin2, dmax2 = shead2[i]['digital_min'], shead2[i]['digital_max'] + pmin2, pmax2 = shead2[i]['physical_min'], shead2[i]['physical_max'] - for i, sigs in enumerate(zip(signals3, signals4)): + for i, sigs in enumerate(zip(signals1, signals2)): s1, s2 = sigs + + # convert to physical values, no need to load all data again + s1 = dig2phys(s1, dmin1, dmax1, pmin1, pmax1) + s2 = dig2phys(s2, dmin2, dmax2, pmin2, pmax2) + + # now we can remove the signals from the list to save memory + signals1[i] = None + signals2[i] = None + # compare absolutes in case of inverted signals + if np.array_equal(s1, s2): continue # early stopping s1 = np.abs(s1) s2 = np.abs(s2) - dmin, dmax = signal_headers3[i]['digital_min'], signal_headers3[i]['digital_max'] - pmin, pmax = signal_headers3[i]['physical_min'], signal_headers3[i]['physical_max'] - min_dist = np.abs(dig2phys(1, dmin, dmax, pmin, pmax)) + if np.array_equal(s1, s2): continue # early stopping + min_dist = np.abs(dig2phys(1, dmin1, dmax1, pmin1, pmax1)) close = np.mean(np.isclose(s1, s2, atol=min_dist)) assert close>0.99, 'Error, physical values of {}'\ ' and {} for ch {}: {} are not the same: {:.3f}'.format( - edf_file1, edf_file2, signal_headers1[i]['label'], - signal_headers2[i]['label'], close) - gc.collect() + edf_file1, edf_file2, shead1[i]['label'], + shead2[i]['label'], close) return True def drop_channels(edf_source, edf_target=None, to_keep=None, to_drop=None): """ - Remove channels from an edf file using pyedflib. - Save the file as edf_target. + Remove channels from an edf file. Save the file. For safety reasons, no source files can be overwritten. - - :param edf_source: The source edf file - :param edf_target: Where to save the file. - If None, will be edf_source+'dropped.edf' - :param to_keep: A list of channel names or indices that will be kept. - Strings will always be interpreted as channel names. - 'to_keep' will overwrite any droppings proposed by to_drop - :param to_drop: A list of channel names/indices that should be dropped. - Strings will be interpreted as channel names. - :returns: the target filename with the dropped channels + + Parameters + ---------- + edf_source : str + The source edf file from which to drop channels. + edf_target : str, optional + Where to save the file.If None, will be edf_source+'dropped.edf'. + The default is None. + to_keep : list, optional + A list of channel names or indices that will be kept. + Strings will always be interpreted as channel names. + 'to_keep' will overwrite any droppings proposed by to_drop. + The default is None. + to_drop : list, optional + A list of channel names/indices that should be dropped. + Strings will be interpreted as channel names. The default is None. + + Returns + ------- + edf_target : str + the target filename with the dropped channels. + """ + # convert to list if necessary if isinstance(to_keep, (int, str)): to_keep = [to_keep] if isinstance(to_drop, (int, str)): to_drop = [to_drop] @@ -394,17 +633,33 @@ def drop_channels(edf_source, edf_target=None, to_keep=None, to_drop=None): def anonymize_edf(edf_file, new_file=None, to_remove = ['patientname', 'birthdate'], - new_values = ['xxx', '']): + new_values = ['xxx', ''], verify=False): """ Anonymizes an EDF file, that means it strips all header information that is patient specific, ie. birthdate and patientname as well as XXX - - :param edf_file: a string with a filename of an EDF/BDF - :param new_file: where to save the anonymized edf file - :param to_remove: a list of attributes to remove from the file - :param new_values: a list of values that should be given instead to the edf - :returns: True if successful, False if failed + + Parameters + ---------- + edf_file : str + a string with a filename of an EDF/BDF. + new_file : str, optional + a string with the new filename of an EDF/BDF. The default is None. + to_remove : list of str, optional + a list of attributes to remove from the file. + The default is ['patientname', 'birthdate']. + new_values : list of str, optional + a list of values that should be given instead to the edf. + Each to_remove value must have one new_value. + The default is ['xxx', '']. + verify : bool + compare the two edf files for equality (double check values are same) + + Returns + ------- + bool + True if successful, False if failed. """ + assert len(to_remove)==len(new_values), \ 'Each to_remove must have one new_value' header = read_edf_header(edf_file) @@ -423,17 +678,32 @@ def anonymize_edf(edf_file, new_file=None, ch_nrs=ch_nr, verbose=False) signal_headers.append(signal_header[0]) signals.append(signal.squeeze()) + if verify: + compare_edf(edf_file, new_file) + return write_edf(new_file, signals, signal_headers, header, digital=True) - return write_edf(new_file, signals, signal_headers, header,digital=True) def rename_channels(edf_file, mapping, new_file=None): """ A convenience function to rename channels in an EDF file. - - :param edf_file: an string pointing to an edf file - :param mapping: a dictionary with channel mappings as key:value - :param new_file: the new filename + + Parameters + ---------- + edf_file : str + an string pointing to an edf file. + mapping : dict + a dictionary with channel mappings as key:value. + eg: {'M1-O2':'A1-O2'} + new_file : str, optional + the new filename. If None will be edf_file + '_renamed' + The default is None. + + Returns + ------- + bool + True if successful, False if failed. + """ header = read_edf_header(edf_file) channels = header['channels'] @@ -456,5 +726,5 @@ def rename_channels(edf_file, mapping, new_file=None): signal_headers.append(signal_header[0]) signals.append(signal.squeeze()) - write_edf(new_file, signals, signal_headers, header,digital=True) + return write_edf(new_file, signals, signal_headers, header, digital=True) \ No newline at end of file From 7e98a839f32fbafec1fb4f2ce33fed788fa6d0df Mon Sep 17 00:00:00 2001 From: skjerns <14980558+skjerns@users.noreply.github.com> Date: Wed, 12 Feb 2020 16:42:59 +0100 Subject: [PATCH 12/24] add first test --- pyedflib/highlevel.py | 45 ++++++++++++++++++++++++++++++-- pyedflib/tests/test_highlevel.py | 43 ++++++++++++++++++++++++++++++ 2 files changed, 86 insertions(+), 2 deletions(-) create mode 100644 pyedflib/tests/test_highlevel.py diff --git a/pyedflib/highlevel.py b/pyedflib/highlevel.py index d9fc3ec..9db9b40 100644 --- a/pyedflib/highlevel.py +++ b/pyedflib/highlevel.py @@ -370,6 +370,7 @@ def read_edf(edf_file, ch_nrs=None, ch_names=None, digital=False, verbose=True): .format(sfreqs)) assert len(signals)==len(signal_headers), 'Something went wrong, lengths'\ ' of headers is not length of signals' + del f return signals, signal_headers, header @@ -420,7 +421,7 @@ def write_edf(edf_file, signals, signal_headers, header, digital=False): for annotation in annotations: f.writeAnnotation(*annotation) f.writeSamples(signals, digital=digital) - + del f return os.path.isfile(edf_file) @@ -727,4 +728,44 @@ def rename_channels(edf_file, mapping, new_file=None): signals.append(signal.squeeze()) return write_edf(new_file, signals, signal_headers, header, digital=True) - \ No newline at end of file + + +def change_polarity(edf_file, channels, new_file=None, verify=True, verbose=True): + """ + Change polarity of certain channels + + Parameters + ---------- + edf_file : str + from which file to change polarity. + channels : list of int + the indices of the channels. + new_file : str, optional + where to save the edf with inverted channels. The default is None. + verify : bool, optional + whether to verify the two edfs for similarity. The default is True. + verbose : str, optional + print progress or not. The default is True. + + Returns + ------- + bool + True if success. + + """ + + if new_file is None: + new_file = os.path.splitext(edf_file)[0] + '.edf' + + if isinstance(channels, str): channels=[channels] + channels = [c.lower() for c in channels] + + signals, signal_headers, header = read_edf(edf_file, digital=True, verbose=verbose) + for i,sig in enumerate(signals): + label = signal_headers[i]['label'].lower() + if label in channels: + if verbose: print('inverting {}'.format(label)) + signals[i] = -sig + write_edf(new_file, signals, signal_headers, header, digital=True, correct=False) + if verify: compare_edf(edf_file, new_file) + return True diff --git a/pyedflib/tests/test_highlevel.py b/pyedflib/tests/test_highlevel.py new file mode 100644 index 0000000..41f7fb5 --- /dev/null +++ b/pyedflib/tests/test_highlevel.py @@ -0,0 +1,43 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2015 Holger Nahrstaedt +from __future__ import division, print_function, absolute_import + +import os +import numpy as np +# from numpy.testing import (assert_raises, run_module_suite, +# assert_equal, assert_allclose, assert_almost_equal) +import unittest +from pyedflib import highlevel +from datetime import datetime, date + + +class TestEdfWriter(unittest.TestCase): + def setUp(self): + data_dir = os.path.join(os.path.dirname(__file__), 'data') + self.bdfplus_data_file = os.path.join(data_dir, 'tmp_test_file_plus.bdf') + self.edfplus_data_file = os.path.join(data_dir, 'tmp_test_file_plus.edf') + self.bdf_data_file = os.path.join(data_dir, 'tmp_test_file.bdf') + self.edf_data_file = os.path.join(data_dir, 'tmp_test_file.edf') + + def test_read_write_edf(self): + startdate = datetime.now() + header = highlevel.make_header(technician='tech', recording_additional='radd', + patientname='name', patient_additional='padd', + patientcode='42', equipment='eeg', admincode='420', + gender='male', startdate=startdate,birthdate='05.09.1980') + annotations = [[50, -1, 'begin'],[150, -1, 'end']] + header['annotations'] = annotations + signal_headers = highlevel.make_signal_headers(['ch'+str(i) for i in range(5)]) + signals = np.random.rand(5, 256*300) #5 minutes of eeg + + success = highlevel.write_edf(self.edfplus_data_file,signals, signal_headers, header) + self.assertTrue(os.path.isfile(self.edfplus_data_file)) + self.assertGreater(os.path.getsize(self.edfplus_data_file), 0) + self.assertTrue(success) + + + + +if __name__ == '__main__': + # run_module_suite(argv=sys.argv) + unittest.main() From dc511360bc84aa08fa0f7b72039af32c060437a8 Mon Sep 17 00:00:00 2001 From: skjerns <14980558+skjerns@users.noreply.github.com> Date: Wed, 12 Feb 2020 17:17:09 +0100 Subject: [PATCH 13/24] test read/write --- pyedflib/__init__.py | 1 + pyedflib/highlevel.py | 2 +- pyedflib/tests/test_highlevel.py | 38 +++++++++++++++++++++++++------- 3 files changed, 32 insertions(+), 9 deletions(-) diff --git a/pyedflib/__init__.py b/pyedflib/__init__.py index a28a4a2..02b7e24 100644 --- a/pyedflib/__init__.py +++ b/pyedflib/__init__.py @@ -10,6 +10,7 @@ from ._extensions._pyedflib import * from .edfwriter import * from .edfreader import * +from . import highlevel from . import data diff --git a/pyedflib/highlevel.py b/pyedflib/highlevel.py index 9db9b40..05e9581 100644 --- a/pyedflib/highlevel.py +++ b/pyedflib/highlevel.py @@ -177,7 +177,7 @@ def make_header(technician='', recording_additional='', patientname='', now.hour, now.minute, now.second) del now if isinstance(birthdate, datetime): - birthdate = birthdate.strftime('%d %b %Y') + birthdate = birthdate.strftime('%d %b %Y').lower() local = locals() header = {} for var in local: diff --git a/pyedflib/tests/test_highlevel.py b/pyedflib/tests/test_highlevel.py index 41f7fb5..8e63303 100644 --- a/pyedflib/tests/test_highlevel.py +++ b/pyedflib/tests/test_highlevel.py @@ -14,29 +14,51 @@ class TestEdfWriter(unittest.TestCase): def setUp(self): data_dir = os.path.join(os.path.dirname(__file__), 'data') - self.bdfplus_data_file = os.path.join(data_dir, 'tmp_test_file_plus.bdf') self.edfplus_data_file = os.path.join(data_dir, 'tmp_test_file_plus.edf') - self.bdf_data_file = os.path.join(data_dir, 'tmp_test_file.bdf') - self.edf_data_file = os.path.join(data_dir, 'tmp_test_file.edf') def test_read_write_edf(self): startdate = datetime.now() header = highlevel.make_header(technician='tech', recording_additional='radd', patientname='name', patient_additional='padd', patientcode='42', equipment='eeg', admincode='420', - gender='male', startdate=startdate,birthdate='05.09.1980') + gender='Male', startdate=startdate,birthdate='05.09.1980') annotations = [[50, -1, 'begin'],[150, -1, 'end']] header['annotations'] = annotations - signal_headers = highlevel.make_signal_headers(['ch'+str(i) for i in range(5)]) - signals = np.random.rand(5, 256*300) #5 minutes of eeg + signal_headers1 = highlevel.make_signal_headers(['ch'+str(i) for i in range(5)]) + signals = np.random.rand(5, 256*300)*200 #5 minutes of eeg - success = highlevel.write_edf(self.edfplus_data_file,signals, signal_headers, header) + success = highlevel.write_edf(self.edfplus_data_file, signals, signal_headers1, header) self.assertTrue(os.path.isfile(self.edfplus_data_file)) self.assertGreater(os.path.getsize(self.edfplus_data_file), 0) self.assertTrue(success) + signals2, signal_headers2, header2 = highlevel.read_edf(self.edfplus_data_file) + t = header['startdate'] + header['startdate'] = datetime(t.year,t.month,t.day,t.hour, t.minute,t.second) + + self.assertEqual(len(signals2), 5) + self.assertEqual(len(signals2), len(signal_headers2)) + for shead1, shead2 in zip(signal_headers1, signal_headers2): + self.assertDictEqual(shead1, shead2) + self.assertEqual(header, header2) + np.testing.assert_allclose(signals, signals2, atol=0.01) - + signals = (signals*100).astype(int) + success = highlevel.write_edf(self.edfplus_data_file, signals, signal_headers1, header, digital=True) + self.assertTrue(os.path.isfile(self.edfplus_data_file)) + self.assertGreater(os.path.getsize(self.edfplus_data_file), 0) + self.assertTrue(success) + + signals2, signal_headers2, header2 = highlevel.read_edf(self.edfplus_data_file, digital=True) + t = header['startdate'] + header['startdate'] = datetime(t.year, t.month, t.day, t.hour, t.minute, t.second) + + self.assertEqual(len(signals2), 5) + self.assertEqual(len(signals2), len(signal_headers2)) + for shead1, shead2 in zip(signal_headers1, signal_headers2): + self.assertDictEqual(shead1, shead2) + self.assertEqual(header, header2) + np.testing.assert_array_equal(signals, signals2) if __name__ == '__main__': # run_module_suite(argv=sys.argv) From df313acc728a1a1a60672e6e5d2d88639d1ff108 Mon Sep 17 00:00:00 2001 From: skjerns <14980558+skjerns@users.noreply.github.com> Date: Wed, 12 Feb 2020 17:29:03 +0100 Subject: [PATCH 14/24] added quickstart to readme --- README.rst | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/README.rst b/README.rst index a6ba67a..95aa465 100644 --- a/README.rst +++ b/README.rst @@ -76,6 +76,30 @@ The latest release, including source and binary packages for Linux, macOS and Windows, is available for download from the `Python Package Index`_. You can find source releases at the `Releases Page`_. + +Highlevel interface +------------ + +pyEDFlib includes an highlevel interface for easy access to read and write edf files. +Additionally functionality as anonymizing, dropping or renaming channels can be found there. + + from pyedflib import highlevel + # write an edf file + signals = np.random.rand(5, 256*300)*200 # 5 minutes of random signal + channel_names = ['ch1', 'ch2', 'ch3', 'ch4', 'ch5'] + signal_headers = highlevel.make_signal_headers(channel_names, sample_rate=256) + header = highlevel.make_header(patientname='patient_x', gender='Female') + highlevel.write_edf('edf_file.edf', signals, signal_headers, header) + + # read an edf file + signals, signal_headers, header = highlevel.read_edf('edf_file.edf') + print(signal_headers[0]['sample_rate']) # prints 256 + + # drop a channel from the file and anonymize + highlevel.drop_channels('edf_file.edf', to_drop=['ch2', 'ch4']) + highlevel.anonymize('edf_file.edf') + + License ------- From 75714e20540ffee81d3f3f677d9034cc771d2f3b Mon Sep 17 00:00:00 2001 From: skjerns <14980558+skjerns@users.noreply.github.com> Date: Wed, 12 Feb 2020 17:31:19 +0100 Subject: [PATCH 15/24] added python syntax --- README.rst | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/README.rst b/README.rst index 95aa465..6435ada 100644 --- a/README.rst +++ b/README.rst @@ -83,6 +83,8 @@ Highlevel interface pyEDFlib includes an highlevel interface for easy access to read and write edf files. Additionally functionality as anonymizing, dropping or renaming channels can be found there. +.. code-block:: Python + from pyedflib import highlevel # write an edf file signals = np.random.rand(5, 256*300)*200 # 5 minutes of random signal @@ -98,8 +100,8 @@ Additionally functionality as anonymizing, dropping or renaming channels can be # drop a channel from the file and anonymize highlevel.drop_channels('edf_file.edf', to_drop=['ch2', 'ch4']) highlevel.anonymize('edf_file.edf') - - + + License ------- From 983dd08b369bc427131e74d138b4b7cf4324d863 Mon Sep 17 00:00:00 2001 From: skjerns <14980558+skjerns@users.noreply.github.com> Date: Wed, 12 Feb 2020 17:32:05 +0100 Subject: [PATCH 16/24] fix --- README.rst | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.rst b/README.rst index 6435ada..84ebdbd 100644 --- a/README.rst +++ b/README.rst @@ -86,6 +86,7 @@ Additionally functionality as anonymizing, dropping or renaming channels can be .. code-block:: Python from pyedflib import highlevel + # write an edf file signals = np.random.rand(5, 256*300)*200 # 5 minutes of random signal channel_names = ['ch1', 'ch2', 'ch3', 'ch4', 'ch5'] @@ -96,7 +97,7 @@ Additionally functionality as anonymizing, dropping or renaming channels can be # read an edf file signals, signal_headers, header = highlevel.read_edf('edf_file.edf') print(signal_headers[0]['sample_rate']) # prints 256 - + # drop a channel from the file and anonymize highlevel.drop_channels('edf_file.edf', to_drop=['ch2', 'ch4']) highlevel.anonymize('edf_file.edf') From addfb183dc160496fc47fa6200094bde8e87169c Mon Sep 17 00:00:00 2001 From: skjerns <14980558+skjerns@users.noreply.github.com> Date: Wed, 12 Feb 2020 17:32:44 +0100 Subject: [PATCH 17/24] fix? --- README.rst | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/README.rst b/README.rst index 84ebdbd..e4427ce 100644 --- a/README.rst +++ b/README.rst @@ -85,22 +85,22 @@ Additionally functionality as anonymizing, dropping or renaming channels can be .. code-block:: Python - from pyedflib import highlevel - - # write an edf file - signals = np.random.rand(5, 256*300)*200 # 5 minutes of random signal - channel_names = ['ch1', 'ch2', 'ch3', 'ch4', 'ch5'] - signal_headers = highlevel.make_signal_headers(channel_names, sample_rate=256) - header = highlevel.make_header(patientname='patient_x', gender='Female') - highlevel.write_edf('edf_file.edf', signals, signal_headers, header) +from pyedflib import highlevel + +# write an edf file +signals = np.random.rand(5, 256*300)*200 # 5 minutes of random signal +channel_names = ['ch1', 'ch2', 'ch3', 'ch4', 'ch5'] +signal_headers = highlevel.make_signal_headers(channel_names, sample_rate=256) +header = highlevel.make_header(patientname='patient_x', gender='Female') +highlevel.write_edf('edf_file.edf', signals, signal_headers, header) - # read an edf file - signals, signal_headers, header = highlevel.read_edf('edf_file.edf') - print(signal_headers[0]['sample_rate']) # prints 256 +# read an edf file +signals, signal_headers, header = highlevel.read_edf('edf_file.edf') +print(signal_headers[0]['sample_rate']) # prints 256 - # drop a channel from the file and anonymize - highlevel.drop_channels('edf_file.edf', to_drop=['ch2', 'ch4']) - highlevel.anonymize('edf_file.edf') +# drop a channel from the file and anonymize +highlevel.drop_channels('edf_file.edf', to_drop=['ch2', 'ch4']) +highlevel.anonymize('edf_file.edf') License From 9be88c431e2c469142fdb00ca4f53c7f212003aa Mon Sep 17 00:00:00 2001 From: skjerns <14980558+skjerns@users.noreply.github.com> Date: Wed, 12 Feb 2020 17:33:52 +0100 Subject: [PATCH 18/24] rst fix... --- README.rst | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/README.rst b/README.rst index e4427ce..2695ee2 100644 --- a/README.rst +++ b/README.rst @@ -85,22 +85,22 @@ Additionally functionality as anonymizing, dropping or renaming channels can be .. code-block:: Python -from pyedflib import highlevel - -# write an edf file -signals = np.random.rand(5, 256*300)*200 # 5 minutes of random signal -channel_names = ['ch1', 'ch2', 'ch3', 'ch4', 'ch5'] -signal_headers = highlevel.make_signal_headers(channel_names, sample_rate=256) -header = highlevel.make_header(patientname='patient_x', gender='Female') -highlevel.write_edf('edf_file.edf', signals, signal_headers, header) - -# read an edf file -signals, signal_headers, header = highlevel.read_edf('edf_file.edf') -print(signal_headers[0]['sample_rate']) # prints 256 - -# drop a channel from the file and anonymize -highlevel.drop_channels('edf_file.edf', to_drop=['ch2', 'ch4']) -highlevel.anonymize('edf_file.edf') + from pyedflib import highlevel + + # write an edf file + signals = np.random.rand(5, 256*300)*200 # 5 minutes of random signal + channel_names = ['ch1', 'ch2', 'ch3', 'ch4', 'ch5'] + signal_headers = highlevel.make_signal_headers(channel_names, sample_rate=256) + header = highlevel.make_header(patientname='patient_x', gender='Female') + highlevel.write_edf('edf_file.edf', signals, signal_headers, header) + + # read an edf file + signals, signal_headers, header = highlevel.read_edf('edf_file.edf') + print(signal_headers[0]['sample_rate']) # prints 256 + + # drop a channel from the file and anonymize + highlevel.drop_channels('edf_file.edf', to_drop=['ch2', 'ch4']) + highlevel.anonymize('edf_file.edf') License From c6f3fdfc7beedf691e2a144c419797abb4f3954c Mon Sep 17 00:00:00 2001 From: skjerns <14980558+skjerns@users.noreply.github.com> Date: Wed, 12 Feb 2020 18:24:46 +0100 Subject: [PATCH 19/24] changed int to long --- pyedflib/tests/test_highlevel.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyedflib/tests/test_highlevel.py b/pyedflib/tests/test_highlevel.py index 8e63303..2b4bab0 100644 --- a/pyedflib/tests/test_highlevel.py +++ b/pyedflib/tests/test_highlevel.py @@ -43,7 +43,7 @@ def test_read_write_edf(self): self.assertEqual(header, header2) np.testing.assert_allclose(signals, signals2, atol=0.01) - signals = (signals*100).astype(int) + signals = (signals*100).astype(np.long) success = highlevel.write_edf(self.edfplus_data_file, signals, signal_headers1, header, digital=True) self.assertTrue(os.path.isfile(self.edfplus_data_file)) self.assertGreater(os.path.getsize(self.edfplus_data_file), 0) From 17c8ae0b82fd90e744949f93ce5a962d2297a5d3 Mon Sep 17 00:00:00 2001 From: skjerns <14980558+skjerns@users.noreply.github.com> Date: Wed, 12 Feb 2020 19:37:43 +0100 Subject: [PATCH 20/24] this damn xcode test --- pyedflib/tests/test_highlevel.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyedflib/tests/test_highlevel.py b/pyedflib/tests/test_highlevel.py index 2b4bab0..3504fe5 100644 --- a/pyedflib/tests/test_highlevel.py +++ b/pyedflib/tests/test_highlevel.py @@ -43,7 +43,7 @@ def test_read_write_edf(self): self.assertEqual(header, header2) np.testing.assert_allclose(signals, signals2, atol=0.01) - signals = (signals*100).astype(np.long) + signals = (signals*100).astype(np.int8) success = highlevel.write_edf(self.edfplus_data_file, signals, signal_headers1, header, digital=True) self.assertTrue(os.path.isfile(self.edfplus_data_file)) self.assertGreater(os.path.getsize(self.edfplus_data_file), 0) From b5310b14b2232b778207be89dade8b7c2980c941 Mon Sep 17 00:00:00 2001 From: skjerns <14980558+skjerns@users.noreply.github.com> Date: Wed, 12 Feb 2020 19:38:19 +0100 Subject: [PATCH 21/24] now fix 2.7? --- pyedflib/tests/test_highlevel.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyedflib/tests/test_highlevel.py b/pyedflib/tests/test_highlevel.py index 3504fe5..f038ff8 100644 --- a/pyedflib/tests/test_highlevel.py +++ b/pyedflib/tests/test_highlevel.py @@ -57,7 +57,7 @@ def test_read_write_edf(self): self.assertEqual(len(signals2), len(signal_headers2)) for shead1, shead2 in zip(signal_headers1, signal_headers2): self.assertDictEqual(shead1, shead2) - self.assertEqual(header, header2) + self.assertDictEqual(header, header2) np.testing.assert_array_equal(signals, signals2) if __name__ == '__main__': From 4f5a8d480bd2635104f02056d68be44847f8fe7d Mon Sep 17 00:00:00 2001 From: skjerns <14980558+skjerns@users.noreply.github.com> Date: Wed, 12 Feb 2020 20:13:19 +0100 Subject: [PATCH 22/24] fix py27 --- pyedflib/highlevel.py | 7 +++---- pyedflib/tests/test_highlevel.py | 13 ++++++------- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/pyedflib/highlevel.py b/pyedflib/highlevel.py index 05e9581..9ccb32f 100644 --- a/pyedflib/highlevel.py +++ b/pyedflib/highlevel.py @@ -350,9 +350,8 @@ def read_edf(edf_file, ch_nrs=None, ch_names=None, digital=False, verbose=True): # add annotations to header annotations = f.read_annotation() - annotations = [[t/10000000, d if d else -1, x.decode()] for t,d,x in annotations] + annotations = [[t//10000000, d if d else -1, x.decode()] for t,d,x in annotations] header['annotations'] = annotations - signals = [] for i,c in enumerate(tqdm(ch_nrs, desc='Reading Channels', disable=not verbose)): @@ -360,7 +359,7 @@ def read_edf(edf_file, ch_nrs=None, ch_names=None, digital=False, verbose=True): signals.append(signal) # we can only return a np.array if all signals have the same samplefreq - sfreqs = [header['sample_rate'] for header in signal_headers] + sfreqs = [shead['sample_rate'] for shead in signal_headers] all_sfreq_same = sfreqs[1:]==sfreqs[:-1] if all_sfreq_same: dtype = np.int if digital else np.float @@ -471,7 +470,7 @@ def read_edf_header(edf_file): assert os.path.isfile(edf_file), 'file {} does not exist'.format(edf_file) with pyedflib.EdfReader(edf_file) as f: annotations = f.read_annotation() - annotations = [[t/10000, d if d else -1, x] for t,d,x in annotations] + annotations = [[t//10000000, d if d else -1, x] for t,d,x in annotations] summary = f.getHeader() summary['Duration'] = f.getFileDuration summary['SignalHeaders'] = f.getSignalHeaders() diff --git a/pyedflib/tests/test_highlevel.py b/pyedflib/tests/test_highlevel.py index f038ff8..8ea43e9 100644 --- a/pyedflib/tests/test_highlevel.py +++ b/pyedflib/tests/test_highlevel.py @@ -18,6 +18,9 @@ def setUp(self): def test_read_write_edf(self): startdate = datetime.now() + t = startdate + startdate = datetime(t.year,t.month,t.day,t.hour, t.minute,t.second) + header = highlevel.make_header(technician='tech', recording_additional='radd', patientname='name', patient_additional='padd', patientcode='42', equipment='eeg', admincode='420', @@ -33,14 +36,12 @@ def test_read_write_edf(self): self.assertTrue(success) signals2, signal_headers2, header2 = highlevel.read_edf(self.edfplus_data_file) - t = header['startdate'] - header['startdate'] = datetime(t.year,t.month,t.day,t.hour, t.minute,t.second) - + self.assertEqual(len(signals2), 5) self.assertEqual(len(signals2), len(signal_headers2)) for shead1, shead2 in zip(signal_headers1, signal_headers2): self.assertDictEqual(shead1, shead2) - self.assertEqual(header, header2) + self.assertDictEqual(header, header2) np.testing.assert_allclose(signals, signals2, atol=0.01) signals = (signals*100).astype(np.int8) @@ -50,9 +51,7 @@ def test_read_write_edf(self): self.assertTrue(success) signals2, signal_headers2, header2 = highlevel.read_edf(self.edfplus_data_file, digital=True) - t = header['startdate'] - header['startdate'] = datetime(t.year, t.month, t.day, t.hour, t.minute, t.second) - + self.assertEqual(len(signals2), 5) self.assertEqual(len(signals2), len(signal_headers2)) for shead1, shead2 in zip(signal_headers1, signal_headers2): From 240ec625c8b5b86068c470c942a131e900d7f8a8 Mon Sep 17 00:00:00 2001 From: skjerns <14980558+skjerns@users.noreply.github.com> Date: Thu, 13 Feb 2020 12:06:17 +0100 Subject: [PATCH 23/24] fix annotations ms --- pyedflib/highlevel.py | 2 +- pyedflib/tests/test_edfwriter.py | 78 ++++++++++++++++---------------- pyedflib/tests/test_highlevel.py | 11 ++++- 3 files changed, 49 insertions(+), 42 deletions(-) diff --git a/pyedflib/highlevel.py b/pyedflib/highlevel.py index 9ccb32f..17059e1 100644 --- a/pyedflib/highlevel.py +++ b/pyedflib/highlevel.py @@ -350,7 +350,7 @@ def read_edf(edf_file, ch_nrs=None, ch_names=None, digital=False, verbose=True): # add annotations to header annotations = f.read_annotation() - annotations = [[t//10000000, d if d else -1, x.decode()] for t,d,x in annotations] + annotations = [[float(t)/10000000, d if d else -1, x.decode()] for t,d,x in annotations] header['annotations'] = annotations signals = [] for i,c in enumerate(tqdm(ch_nrs, desc='Reading Channels', diff --git a/pyedflib/tests/test_edfwriter.py b/pyedflib/tests/test_edfwriter.py index d8a4732..adb9651 100644 --- a/pyedflib/tests/test_edfwriter.py +++ b/pyedflib/tests/test_edfwriter.py @@ -25,11 +25,11 @@ def test_EdfWriter_BDFplus(self): 'digital_max': 8388607, 'digital_min': -8388608, 'prefilter': 'pre1', 'transducer': 'trans1'} channel_info2 = {'label': 'test_label', 'dimension': 'mV', 'sample_rate': 100, - 'physical_max': 1.0, 'physical_min': -1.0, + 'physical_max': 1.0, 'physical_min': -1.0, 'digital_max': 8388607, 'digital_min': -8388608, 'prefilter': 'pre1', 'transducer': 'trans1'} f = pyedflib.EdfWriter(self.bdfplus_data_file, 2, - file_type=pyedflib.FILETYPE_BDFPLUS) + file_type=pyedflib.FILETYPE_BDFPLUS) f.setSignalHeader(0,channel_info1) f.setSignalHeader(1,channel_info2) f.setTechnician('tec1') @@ -90,11 +90,11 @@ def test_EdfWriter_BDFplus2(self): 'digital_max': 8388607, 'digital_min': -8388608, 'prefilter': 'pre1', 'transducer': 'trans1'} channel_info2 = {'label': 'test_label', 'dimension': 'mV', 'sample_rate': 100, - 'physical_max': 1.0, 'physical_min': -1.0, + 'physical_max': 1.0, 'physical_min': -1.0, 'digital_max': 8388607, 'digital_min': -8388608, 'prefilter': 'pre1', 'transducer': 'trans1'} f = pyedflib.EdfWriter(self.bdfplus_data_file, 2, - file_type=pyedflib.FILETYPE_BDFPLUS) + file_type=pyedflib.FILETYPE_BDFPLUS) f.setSignalHeader(0,channel_info1) f.setSignalHeader(1,channel_info2) f.setTechnician('tec1') @@ -152,7 +152,7 @@ def test_EdfWriter_BDF(self): 'digital_max': 8388607, 'digital_min': -8388608, 'prefilter': 'pre1', 'transducer': 'trans1'} f = pyedflib.EdfWriter(self.bdf_data_file, 2, - file_type=pyedflib.FILETYPE_BDF) + file_type=pyedflib.FILETYPE_BDF) f.setSignalHeader(0,channel_info1) f.setSignalHeader(1,channel_info2) @@ -178,7 +178,7 @@ def test_EdfWriter_EDFplus(self): 'digital_max': 32767, 'digital_min': -32768, 'prefilter': 'pre1', 'transducer': 'trans1'} f = pyedflib.EdfWriter(self.edfplus_data_file, 1, - file_type=pyedflib.FILETYPE_EDFPLUS) + file_type=pyedflib.FILETYPE_EDFPLUS) header = {'technician': 'tec1', 'recording_additional': 'recAdd1', 'patientname': 'pat1', 'patient_additional': 'patAdd1', 'patientcode': 'code1', 'equipment': 'eq1', @@ -215,11 +215,11 @@ def test_EdfWriter_EDF(self): 'digital_max': 32767, 'digital_min': -32768, 'prefilter': 'pre1', 'transducer': 'trans1'} channel_info2 = {'label': 'test_label', 'dimension': 'mV', 'sample_rate': 100, - 'physical_max': 1.0, 'physical_min': -1.0, + 'physical_max': 1.0, 'physical_min': -1.0, 'digital_max': 32767, 'digital_min': -32768, 'prefilter': 'pre1', 'transducer': 'trans1'} f = pyedflib.EdfWriter(self.edf_data_file, 2, - file_type=pyedflib.FILETYPE_EDF) + file_type=pyedflib.FILETYPE_EDF) f.setSignalHeader(0,channel_info1) f.setSignalHeader(1,channel_info2) data = np.ones(100) * 0.1 @@ -238,13 +238,13 @@ def test_EdfWriter_EDF(self): def test_SampleWriting(self): channel_info1 = {'label':'test_label1', 'dimension':'mV', 'sample_rate':100, - 'physical_max':1.0,'physical_min':-1.0, - 'digital_max':8388607,'digital_min':-8388608, - 'prefilter':'pre1','transducer':'trans1'} + 'physical_max':1.0,'physical_min':-1.0, + 'digital_max':8388607,'digital_min':-8388608, + 'prefilter':'pre1','transducer':'trans1'} channel_info2 = {'label':'test_label2', 'dimension':'mV', 'sample_rate':100, - 'physical_max':1.0,'physical_min':-1.0, - 'digital_max':8388607,'digital_min':-8388608, - 'prefilter':'pre2','transducer':'trans2'} + 'physical_max':1.0,'physical_min':-1.0, + 'digital_max':8388607,'digital_min':-8388608, + 'prefilter':'pre2','transducer':'trans2'} f = pyedflib.EdfWriter(self.bdfplus_data_file, 2, file_type=pyedflib.FILETYPE_BDFPLUS) f.setSignalHeader(0,channel_info1) @@ -273,7 +273,7 @@ def test_EdfWriter_EDF_contextmanager(self): 'digital_max': 32767, 'digital_min': -32768, 'prefilter': 'pre1', 'transducer': 'trans1'} channel_info2 = {'label': 'test_label', 'dimension': 'mV', 'sample_rate': 100, - 'physical_max': 1.0, 'physical_min': -1.0, + 'physical_max': 1.0, 'physical_min': -1.0, 'digital_max': 32767, 'digital_min': -32768, 'prefilter': 'pre1', 'transducer': 'trans1'} with pyedflib.EdfWriter(self.edf_data_file, 2, file_type=pyedflib.FILETYPE_EDF) as f: @@ -292,13 +292,13 @@ def test_EdfWriter_EDF_contextmanager(self): def test_SampleWritingContextManager(self): channel_info1 = {'label':'test_label1', 'dimension':'mV', 'sample_rate':100, - 'physical_max':1.0,'physical_min':-1.0, - 'digital_max':8388607,'digital_min':-8388608, - 'prefilter':'pre1','transducer':'trans1'} + 'physical_max':1.0,'physical_min':-1.0, + 'digital_max':8388607,'digital_min':-8388608, + 'prefilter':'pre1','transducer':'trans1'} channel_info2 = {'label':'test_label2', 'dimension':'mV', 'sample_rate':100, - 'physical_max':1.0,'physical_min':-1.0, - 'digital_max':8388607,'digital_min':-8388608, - 'prefilter':'pre2','transducer':'trans2'} + 'physical_max':1.0,'physical_min':-1.0, + 'digital_max':8388607,'digital_min':-8388608, + 'prefilter':'pre2','transducer':'trans2'} with pyedflib.EdfWriter(self.bdfplus_data_file, 2, file_type=pyedflib.FILETYPE_BDFPLUS) as f: @@ -326,13 +326,13 @@ def test_SampleWritingContextManager(self): def test_SampleWriting2(self): channel_info1 = {'label':'test_label1', 'dimension':'mV', 'sample_rate':100, - 'physical_max':1.0,'physical_min':-1.0, - 'digital_max':8388607,'digital_min':-8388608, - 'prefilter':'pre1','transducer':'trans1'} + 'physical_max':1.0,'physical_min':-1.0, + 'digital_max':8388607,'digital_min':-8388608, + 'prefilter':'pre1','transducer':'trans1'} channel_info2 = {'label':'test_label2', 'dimension':'mV', 'sample_rate':100, - 'physical_max':1.0,'physical_min':-1.0, - 'digital_max':8388607,'digital_min':-8388608, - 'prefilter':'pre2','transducer':'trans2'} + 'physical_max':1.0,'physical_min':-1.0, + 'digital_max':8388607,'digital_min':-8388608, + 'prefilter':'pre2','transducer':'trans2'} f = pyedflib.EdfWriter(self.bdfplus_data_file, 2, file_type=pyedflib.FILETYPE_BDFPLUS) f.setSignalHeader(0,channel_info1) @@ -360,13 +360,13 @@ def test_SampleWriting_digital(self): dmin, dmax = [0, 1024] pmin, pmax = [0, 1.0] channel_info1 = {'label':'test_label1', 'dimension':'mV', 'sample_rate':100, - 'physical_max':pmax,'physical_min':pmin, - 'digital_max':dmax,'digital_min':dmin, - 'prefilter':'pre1','transducer':'trans1'} + 'physical_max':pmax,'physical_min':pmin, + 'digital_max':dmax,'digital_min':dmin, + 'prefilter':'pre1','transducer':'trans1'} channel_info2 = {'label':'test_label2', 'dimension':'mV', 'sample_rate':100, - 'physical_max':pmax,'physical_min':pmin, - 'digital_max':dmax,'digital_min':dmin, - 'prefilter':'pre2','transducer':'trans2'} + 'physical_max':pmax,'physical_min':pmin, + 'digital_max':dmax,'digital_min':dmin, + 'prefilter':'pre2','transducer':'trans2'} f = pyedflib.EdfWriter(self.bdfplus_data_file, 2, @@ -409,9 +409,9 @@ def test_SampleWriting_digital(self): def test_TestRoundingEDF(self): channel_info1 = {'label':'test_label1', 'dimension':'mV', 'sample_rate':100, - 'physical_max':1.0,'physical_min':-1.0, - 'digital_max':32767,'digital_min':-32768, - 'prefilter':'pre1','transducer':'trans1'} + 'physical_max':1.0,'physical_min':-1.0, + 'digital_max':32767,'digital_min':-32768, + 'prefilter':'pre1','transducer':'trans1'} f = pyedflib.EdfWriter(self.edfplus_data_file, 1, file_type=pyedflib.FILETYPE_EDFPLUS) f.setSignalHeader(0,channel_info1) @@ -430,7 +430,7 @@ def test_TestRoundingEDF(self): np.testing.assert_almost_equal(data1, data1_read,decimal=4) f = pyedflib.EdfWriter(self.edfplus_data_file, 1, - file_type=pyedflib.FILETYPE_EDFPLUS) + file_type=pyedflib.FILETYPE_EDFPLUS) f.setSignalHeader(0,channel_info1) data_list = [] @@ -485,7 +485,7 @@ def test_AnnotationWritingUTF8(self): 'digital_max': 8388607, 'digital_min': -8388608, 'prefilter': u'test', 'transducer': 'trans1'} f = pyedflib.EdfWriter(self.bdf_data_file, 1, - file_type=pyedflib.FILETYPE_BDFPLUS) + file_type=pyedflib.FILETYPE_BDFPLUS) f.setSignalHeader(0,channel_info) data = np.ones(100) * 0.1 f.writePhysicalSamples(data) @@ -516,7 +516,7 @@ def test_BytesChars(self): 'digital_max': 8388607, 'digital_min': -8388608, 'prefilter': b' ', 'transducer': b'trans1'} f = pyedflib.EdfWriter(self.bdf_data_file, 1, - file_type=pyedflib.FILETYPE_BDFPLUS) + file_type=pyedflib.FILETYPE_BDFPLUS) f.setSignalHeader(0,channel_info) data = np.ones(100) * 0.1 f.writePhysicalSamples(data) diff --git a/pyedflib/tests/test_highlevel.py b/pyedflib/tests/test_highlevel.py index 8ea43e9..4a1bfdc 100644 --- a/pyedflib/tests/test_highlevel.py +++ b/pyedflib/tests/test_highlevel.py @@ -2,7 +2,7 @@ # Copyright (c) 2015 Holger Nahrstaedt from __future__ import division, print_function, absolute_import -import os +import os, sys import numpy as np # from numpy.testing import (assert_raises, run_module_suite, # assert_equal, assert_allclose, assert_almost_equal) @@ -25,7 +25,7 @@ def test_read_write_edf(self): patientname='name', patient_additional='padd', patientcode='42', equipment='eeg', admincode='420', gender='Male', startdate=startdate,birthdate='05.09.1980') - annotations = [[50, -1, 'begin'],[150, -1, 'end']] + annotations = [[0.01, -1, u'begin'],[0.5, -1, u'middle'],[10, -1, u'end']] header['annotations'] = annotations signal_headers1 = highlevel.make_signal_headers(['ch'+str(i) for i in range(5)]) signals = np.random.rand(5, 256*300)*200 #5 minutes of eeg @@ -41,6 +41,7 @@ def test_read_write_edf(self): self.assertEqual(len(signals2), len(signal_headers2)) for shead1, shead2 in zip(signal_headers1, signal_headers2): self.assertDictEqual(shead1, shead2) + self.assertDictEqual(header, header2) np.testing.assert_allclose(signals, signals2, atol=0.01) @@ -56,9 +57,15 @@ def test_read_write_edf(self): self.assertEqual(len(signals2), len(signal_headers2)) for shead1, shead2 in zip(signal_headers1, signal_headers2): self.assertDictEqual(shead1, shead2) + self.assertDictEqual(header, header2) np.testing.assert_array_equal(signals, signals2) + + # def test_annotations_accuracy(self): + + + if __name__ == '__main__': # run_module_suite(argv=sys.argv) unittest.main() From ec6ec41513dd6db13479e28d2ed82fbafabe90e8 Mon Sep 17 00:00:00 2001 From: skjerns <14980558+skjerns@users.noreply.github.com> Date: Thu, 13 Feb 2020 15:40:41 +0100 Subject: [PATCH 24/24] write annotations after signals, else breaks EDF --- pyedflib/highlevel.py | 4 +++- pyedflib/tests/test_highlevel.py | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pyedflib/highlevel.py b/pyedflib/highlevel.py index 17059e1..34eb7cb 100644 --- a/pyedflib/highlevel.py +++ b/pyedflib/highlevel.py @@ -413,14 +413,16 @@ def write_edf(edf_file, signals, signal_headers, header, digital=False): header = default_header annotations = header.get('annotations', '') + print(annotations) with pyedflib.EdfWriter(edf_file, n_channels=n_channels) as f: f.setSignalHeaders(signal_headers) f.setHeader(header) + f.writeSamples(signals, digital=digital) for annotation in annotations: f.writeAnnotation(*annotation) - f.writeSamples(signals, digital=digital) del f + return os.path.isfile(edf_file) diff --git a/pyedflib/tests/test_highlevel.py b/pyedflib/tests/test_highlevel.py index 4a1bfdc..dfd77e8 100644 --- a/pyedflib/tests/test_highlevel.py +++ b/pyedflib/tests/test_highlevel.py @@ -25,7 +25,7 @@ def test_read_write_edf(self): patientname='name', patient_additional='padd', patientcode='42', equipment='eeg', admincode='420', gender='Male', startdate=startdate,birthdate='05.09.1980') - annotations = [[0.01, -1, u'begin'],[0.5, -1, u'middle'],[10, -1, u'end']] + annotations = [[0.01, -1, 'begin'],[0.5, -1, 'middle'],[10, -1, 'end']] header['annotations'] = annotations signal_headers1 = highlevel.make_signal_headers(['ch'+str(i) for i in range(5)]) signals = np.random.rand(5, 256*300)*200 #5 minutes of eeg