diff --git a/rt1/__init__.py b/rt1/__init__.py
index 8ef7656b..54cf6eb2 100644
--- a/rt1/__init__.py
+++ b/rt1/__init__.py
@@ -2,5 +2,5 @@ Import module for RT1 module
 """
-__version__ = '1.1.9'
+__version__ = '1.2'
 __author__ = 'Raphael Quast'
diff --git a/rt1/general_functions.py b/rt1/general_functions.py
index b44bc025..bdbf663b 100644
--- a/rt1/general_functions.py
+++ b/rt1/general_functions.py
@@ -276,3 +276,55 @@ def groupby_unsorted(a, key=lambda x: x, sort=False, get=lambda x: x):
         return OrderedDict(sorted(d.items()))
     else:
         return d
+
+
+def interpolate_to_index(data, index, data_index=None, **interp1d_kwargs):
+    '''
+    A wrapper around scipy.interpolate.interp1d to interpolate a dataset
+    to a given index.
+
+    Parameters
+    ----------
+    data : list, array-like, pandas.Series or pandas.DataFrame
+        The input data as list, array, pandas.Series or pandas.DataFrame.
+        If the data is provided as pandas Series or DataFrame, the index
+        must support a method .to_julian_date() to convert the timestamps
+        into numerical values.
+    index : array-like
+        The index to which the dataset should be interpolated.
+        It must support a method .to_julian_date().
+    data_index : array-like, optional
+        The index corresponding to the provided data. It must support a
+        method .to_julian_date() and is only required if the data is
+        provided as a list or array. The default is None.
+    **interp1d_kwargs :
+        Additional keyword-arguments passed to scipy.interpolate.interp1d.
+        The default is (fill_value=None, bounds_error=False).
+
+    Returns
+    -------
+    pandas.Series or pandas.DataFrame
+        The dataset interpolated to the provided index.
+    '''
+    from pandas import Series, DataFrame
+    from scipy.interpolate import interp1d
+
+    kwargs = dict(fill_value=None, bounds_error=False)
+    kwargs.update(interp1d_kwargs)
+
+    if isinstance(data, Series):
+        # perform a linear interpolation to the timestamps of the
+        # auxiliary data
+        f = interp1d(data.index.to_julian_date(), data.values, **kwargs)
+        x = f(index.to_julian_date())
+        return Series(x, index)
+    elif isinstance(data, DataFrame):
+        f = interp1d(data.index.to_julian_date(), data.values, axis=0,
+                     **kwargs)
+        x = f(index.to_julian_date())
+        return DataFrame(x, index, columns=data.columns)
+    elif isinstance(data, (list, np.ndarray)):
+        assert data_index is not None, ('you must provide "data_index" ' +
+                                        'if data is provided as list or array')
+        # plain lists and arrays have no ".values" attribute, so use the
+        # data directly
+        f = interp1d(data_index.to_julian_date(), data, **kwargs)
+        x = f(index.to_julian_date())
+        return Series(x, index)
diff --git a/rt1/processing_config.py b/rt1/processing_config.py
index 9434a2bb..a580747c 100644
--- a/rt1/processing_config.py
+++ b/rt1/processing_config.py
@@ -192,7 +192,7 @@ def preprocess(self, **kwargs):
         return
 
-    def reader(self, **reader_arg):
+    def reader(self, reader_arg):
         '''
         a function that is called for each site to obtain the dataset
         '''
diff --git a/rt1/rtparse.py b/rt1/rtparse.py
index edfc8343..4832c7e4 100644
--- a/rt1/rtparse.py
+++ b/rt1/rtparse.py
@@ -225,13 +225,17 @@ def _parse_dict(self, section, int_keys=[], float_keys=[], bool_keys=[],
                 #assert inp[key].startswith('['), f'{key} must start with "[" '
                 #assert inp[key].endswith(']'), f'{key} must end with "]" '
                 #val = inp[key][1:-1].replace(' ', '').split(',')
-                if inp[key] is None:
+                if inp[key] == 'None':
                     val = []
                 else:
                     val = inp[key].replace(' ', '').split(',')
                     val = [i for i in val if len(i) > 0]
             else:
-                val = inp[key]
+                # parse the string 'None' as a "real" None
+                if inp[key] == 'None':
+                    val = None
+                else:
+                    val = inp[key]
 
             parsed_dict[key] = val
         return parsed_dict
@@ -432,6 +436,10 @@ def get_process_specs(self):
         process_specs = dict()
         for key, val in inp.items():
+            # parse the string 'None' as a 'real' None and not as a string
+            if val == 'None':
+                val = None
+
             if key.startswith('datetime__'):
                 date = dict(zip(['s', 'fmt'],
                                 [i.strip() for i in val.split('fmt=')]))
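For reference, a minimal usage sketch of the new `interpolate_to_index` helper added to `general_functions.py` above. The timestamps and values are made up for illustration only:

```python
# minimal usage sketch for interpolate_to_index (illustrative data)
import pandas as pd
from rt1.general_functions import interpolate_to_index

# daily auxiliary data
data = pd.Series([1.0, 2.0, 3.0],
                 index=pd.date_range('2020-01-01', periods=3, freq='D'))
# the 12-hourly index we want to interpolate to (within the data range,
# so the default fill_value is never used)
target = pd.date_range('2020-01-01', periods=5, freq='12h')

# linearly interpolate the daily values to the 12-hourly timestamps
result = interpolate_to_index(data, target)
```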
diff --git a/rt1/rtprocess.py b/rt1/rtprocess.py
index 3aeec2a5..58a0ebfc 100644
--- a/rt1/rtprocess.py
+++ b/rt1/rtprocess.py
@@ -4,6 +4,8 @@
 from itertools import repeat
 import ctypes
 import sys
+from textwrap import dedent
+import warnings
 from pathlib import Path
 import shutil
@@ -140,8 +142,8 @@ def __init__(self, config_path=None, autocontinue=False,
     def setup(self, copy=True):
         '''
         perform necessary tasks to run a processing-routine
-        - initialize the folderstructure
-        - copy modules and .ini files (if copy=True)
+        - initialize the folderstructure (only from MainProcess!)
+        - copy modules and .ini files (if copy=True) (only from MainProcess!)
         - load modules and set parent-fit-object
 
         Parameters
         ----------
         copy : bool, optional
             indicator if the used config-file and module should be copied to
             the dumppath/cfg folder or not. The default is True.
         '''
         if self._config_path is not None and self._proc_cls is None:
+            self.config_path = Path(self._config_path)
             assert self.config_path.exists(), (f'the file {self.config_path} ' +
                                                'does not exist!')
@@ -160,9 +163,10 @@
             # update specs with init_kwargs
             for key, val in self.init_kwargs.items():
                 if key in self.cfg.config['PROCESS_SPECS']:
-                    print(f'"{key} = {self.cfg.config["PROCESS_SPECS"][key]}"',
-                          'will be overwritten by the definition provided via',
-                          f'"init_kwargs": "{key} = {val}" ')
+                    warnings.warn(
+                        f'"{key} = {self.cfg.config["PROCESS_SPECS"][key]}" ' +
+                        'will be overwritten by the definition provided via ' +
+                        f'"init_kwargs": "{key} = {val}" ')
                     # update the parsed config (for import of modules etc.)
                     self.cfg.config['PROCESS_SPECS'][key] = val
@@ -170,31 +174,38 @@
             self.dumppath = specs['save_path'] / specs['dumpfolder']
 
-            if self.autocontinue is False:
-                if self.dumppath.exists():
-                    def remove_folder():
-                        shutil.rmtree(specs['save_path'] / specs['dumpfolder'])
-                        print(f'"{specs["save_path"] / specs["dumpfolder"]}"',
-                              '\nhas successfully been removed.\n')
-
-                    _confirm_input(
-                        msg=(f'the path \n "{self.dumppath}"\n' +
-                             ' already exists...' +
-                             '\n- to continue type YES or Y' +
-                             '\n- to abort type NO or N' +
-                             '\n- to remove the existing directory and all' +
-                             'subdirectories type REMOVE \n \n'),
-                        callbackdict={'REMOVE':[
-                            (f'\n"{self.dumppath}"\n will be removed!' +
-                             ' are you sure? (y, n): '),
-                            remove_folder]})
-
-            # initialize the folderstructure
-            _make_folderstructure(specs['save_path'] / specs['dumpfolder'],
-                                  ['results', 'cfg', 'dumps'])
-
-            if copy is True:
-                self._copy_cfg_and_modules()
+
+            if mp.current_process().name == 'MainProcess':
+                # initialize the folderstructure and copy the files
+                # only from the main process
+                if self.autocontinue is False:
+
+                    if self.dumppath.exists():
+                        def remove_folder():
+                            shutil.rmtree(
+                                specs['save_path'] / specs['dumpfolder'])
+                            warnings.warn(
+                                f'"{specs["save_path"]/specs["dumpfolder"]}"' +
+                                '\nhas successfully been removed.\n')
+
+                        _confirm_input(
+                            msg=(f'the path \n "{self.dumppath}"\n' +
+                                 ' already exists...' +
+                                 '\n- to continue type YES or Y' +
+                                 '\n- to abort type NO or N' +
+                                 '\n- to remove the existing directory and ' +
+                                 'all subdirectories type REMOVE \n \n'),
+                            callbackdict={'REMOVE':[
+                                (f'\n"{self.dumppath}"\n will be removed!' +
+                                 ' are you sure? (y, n): '),
+                                remove_folder]})
+
+                # initialize the folderstructure
+                _make_folderstructure(specs['save_path'] / specs['dumpfolder'],
+                                      ['results', 'cfg', 'dumps'])
+
+                if copy is True:
+                    self._copy_cfg_and_modules()
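The `MainProcess` guard introduced above restricts filesystem side effects (folder creation, file copying, interactive prompts) to the parent process, since `setup()` may also run inside spawned workers. A standalone sketch of the underlying `multiprocessing` behaviour this relies on (not RT1 code, names are illustrative):

```python
# only the parent process is named 'MainProcess'; pool workers get
# names like 'SpawnPoolWorker-1', so the guard skips side effects there
import multiprocessing as mp

def setup_like():
    if mp.current_process().name == 'MainProcess':
        print('parent process -> initialize folders / copy files')
    else:
        print('worker process -> skip filesystem side effects')

def worker(_):
    setup_like()   # in a pool worker this takes the "skip" branch
    return mp.current_process().name

if __name__ == '__main__':
    setup_like()   # takes the "initialize" branch
    with mp.Pool(2) as pool:
        print(pool.map(worker, range(2)))
```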
 
             # load the processing-class
             if 'processing_cfg_module' in specs:
@@ -207,10 +218,12 @@ def remove_folder():
             else:
                 proc_class_name = 'processing_cfg'
 
+            # load ALL modules to ensure that the importer finds them
             procmodule = self.cfg.get_all_modules(
                 load_copy=copy)[proc_module_name]
-            print(f'processing config class "{proc_class_name}" will be ' +
-                  f'imported from \n"{procmodule}"')
+
+            warnings.warn(f'processing config class "{proc_class_name}"' +
+                          f' will be imported from \n"{procmodule}"')
 
             self.proc_cls = getattr(procmodule, proc_class_name)(**specs)
@@ -239,16 +252,17 @@ def _copy_cfg_and_modules(self):
         # from the copied file
         copypath = self.dumppath / 'cfg' / self.cfg.configpath.name
         if (copypath).exists():
-            print(f'the file \n"{copypath / self.cfg.configpath.name}"\n' +
-                  'already exists... NO copying is performed and the ' +
-                  'existing one is used!\n')
+            warnings.warn(
+                f'the file \n"{copypath}"\n' +
+                'already exists... NO copying is performed and the ' +
+                'existing one is used!\n')
         else:
             if len(self.init_kwargs) == 0:
                 # if no init_kwargs have been provided, copy the
                 # original file
                 shutil.copy(self.cfg.configpath, copypath.parent)
-                print(f'"{self.cfg.configpath.name}" copied to\n' +
-                      f'"{copypath.parent}"')
+                warnings.warn(f'"{self.cfg.configpath.name}" copied to\n' +
+                              f'"{copypath.parent}"')
             else:
                 # if init_kwargs have been provided, write the updated
                 # config to the folder
@@ -256,9 +270,10 @@
                           self.cfg.configpath.name, 'w') as file:
                     self.cfg.config.write(file)
-                print(f'the config-file "{self.cfg.configpath}" has been',
-                      ' updated with the init_kwargs and saved to',
-                      f'"{copypath.parent / self.cfg.configpath.name}"')
+                warnings.warn(
+                    f'the config-file "{self.cfg.configpath}" has been ' +
+                    'updated with the init_kwargs and saved to ' +
+                    f'"{copypath.parent / self.cfg.configpath.name}"')
 
         # remove the config and re-read the config from the copied path
         del self.cfg
@@ -278,12 +293,61 @@ def _copy_cfg_and_modules(self):
                 copypath = self.dumppath / 'cfg' / location.name
                 if copypath.exists():
-                    print(f'the file \n"{copypath}" \nalready ' +
-                          'exists ... NO copying is performed ' +
-                          'and the existing one is used!\n')
+                    warnings.warn(f'the file \n"{copypath}" \nalready ' +
+                                  'exists ... NO copying is performed ' +
+                                  'and the existing one is used!\n')
                 else:
                     shutil.copy(location, copypath)
-                    print(f'"{location.name}" copied to \n"{copypath}"')
+                    warnings.warn(
+                        f'"{location.name}" copied to \n"{copypath}"')
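Since these status messages are now emitted via `warnings.warn()` instead of `print()`, downstream code can silence or capture them with the standard `warnings` machinery. A short standalone sketch (the warning text stands in for the RTprocess messages):

```python
import warnings

# silence all UserWarnings (the default category used by warnings.warn)
warnings.filterwarnings('ignore', category=UserWarning)

# ... or collect them for logging instead of printing to stderr
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter('always')
    warnings.warn('config copied ...')   # placeholder message
    for w in caught:
        print('captured:', w.message)
```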
" + \ + f"({p_max} [{p_totcnt.value - p_meancnt.value}] fits)", + progress2=p_totcnt.value - p_meancnt.value) + else: + p_totcnt, p_meancnt, p_max, p_time, p_ncpu = process_cnt + # only increase the total counter + p_totcnt.value += 1 + if p_meancnt.value == 0: + title=f"{'estimating time ...':<28}" + else: + # get the remaining time and update the progressbar + remain = timedelta( + seconds = (p_max - p_totcnt.value + ) / p_ncpu * p_time.value) + d,h,m,s = dt_to_hms(remain) + title=f"approx. {d} {h:02}:{m:02}:{s:02} remaining" + + update_progress( + p_totcnt.value, p_max, + title=title, + finalmsg="finished! " + \ + f"({p_max} [{p_totcnt.value - p_meancnt.value}] fits)", + progress2=p_totcnt.value - p_meancnt.value) def _evalfunc(self, reader_arg=None, process_cnt=None): @@ -306,7 +370,7 @@ def _evalfunc(self, reader_arg=None, process_cnt=None): start = default_timer() try: # if a reader (and no dataset) is provided, use the reader - read_data = self.proc_cls.reader(**reader_arg) + read_data = self.proc_cls.reader(reader_arg) # check for multiple return values and split them accordingly # (any value beyond the first is appended as aux_data) if isinstance(read_data, pd.DataFrame): @@ -332,6 +396,7 @@ def _evalfunc(self, reader_arg=None, process_cnt=None): # append reader_arg fit.reader_arg = reader_arg + self._increase_cnt(process_cnt, start, err=False) # if a post-processing function is provided, return its output, # else return the fit-object directly @@ -340,54 +405,18 @@ def _evalfunc(self, reader_arg=None, process_cnt=None): else: ret = fit - if process_cnt is not None: - p_totcnt, p_meancnt, p_max, p_time, p_ncpu = process_cnt - end = default_timer() - # increase the total counter - p_totcnt.value += 1 - - # update the estimate of the mean time needed to process a site - p_time.value = (p_meancnt.value * p_time.value - + (end - start)) / (p_meancnt.value + 1) - # increase the mean counter - p_meancnt.value += 1 - # get the remaining time and update the progressbar - remain = timedelta( - seconds = (p_max - p_totcnt.value) / p_ncpu * p_time.value) - d,h,m,s = dt_to_hms(remain) - update_progress( - p_totcnt.value, p_max, - title=f"approx. {d} {h:02}:{m:02}:{s:02} remaining", - finalmsg="finished! " + \ - f"({p_max} [{p_totcnt.value - p_meancnt.value}] fits)", - progress2=p_totcnt.value - p_meancnt.value) - return ret except Exception as ex: - if process_cnt is not None: - p_totcnt, p_meancnt, p_max, p_time, p_ncpu = process_cnt - # only increase the total counter - p_totcnt.value += 1 - if p_meancnt.value == 0: - title=f"{'estimating time ...':<28}" - else: - # get the remaining time and update the progressbar - remain = timedelta( - seconds = (p_max - p_totcnt.value - ) / p_ncpu * p_time.value) - d,h,m,s = dt_to_hms(remain) - title=f"approx. {d} {h:02}:{m:02}:{s:02} remaining" - - update_progress( - p_totcnt.value, p_max, - title=title, - finalmsg="finished! 
" + \ - f"({p_max} [{p_totcnt.value - p_meancnt.value}] fits)", - progress2=p_totcnt.value - p_meancnt.value) if callable(self.proc_cls.exceptfunc): - return self.proc_cls.exceptfunc(ex, reader_arg) + ex_ret = self.proc_cls.exceptfunc(ex, reader_arg) + if ex_ret is None or ex_ret is False: + self._increase_cnt(process_cnt, start, err=True) + else: + self._increase_cnt(process_cnt, start, err=False) + + return ex_ret else: raise ex @@ -522,6 +551,13 @@ def processfunc(self, ncpu=1, print_progress=True, res = res_async.get() else: print('start of single-core evaluation') + + # call the initializer if it has been provided + if 'initializer' in pool_kwargs: + if 'initargs' in pool_kwargs: + pool_kwargs['initializer'](*pool_kwargs['initargs']) + else: + pool_kwargs['initializer']() res = [] for reader_arg in reader_args: res.append(self._evalfunc(reader_arg=reader_arg, @@ -592,14 +628,23 @@ def run_processing(self, ncpu=1, copy=True, print_progress=True, if self.dumppath is not None: with open(self.dumppath / 'cfg' / 'model_definition.txt', 'w') as file: - print(self.parent_fit._model_definition, file=file) + + outtxt = '' + if hasattr(self.proc_cls, 'description'): + outtxt += dedent(self.proc_cls.description) + outtxt += '\n\n' + outtxt += '_'*77 + outtxt += '\n\n' + + outtxt += self.parent_fit._model_definition + + print(outtxt, file=file) _ = self.processfunc(ncpu=ncpu, print_progress=print_progress, reader_args=reader_args, pool_kwargs=pool_kwargs, preprocess_kwargs=preprocess_kwargs) - class RTresults(object): ''' A class to provide easy access to processed results. @@ -830,5 +875,3 @@ def NetCDF_variables(self): else: print(f'{key:<{space + 7}}', val.dimensions) - - diff --git a/tests/parallel_processing_config.py b/tests/parallel_processing_config.py index e65521b6..510da043 100644 --- a/tests/parallel_processing_config.py +++ b/tests/parallel_processing_config.py @@ -16,7 +16,7 @@ class processing_cfg(rt1_processing_config): def __init__(self, **kwargs): super().__init__(**kwargs) - def reader(self, **reader_arg): + def reader(self, reader_arg): self.check_dump_exists(reader_arg)