diff --git a/Ska/engarchive/derived/base.py b/Ska/engarchive/derived/base.py index 3f65c566..c523d0de 100644 --- a/Ska/engarchive/derived/base.py +++ b/Ska/engarchive/derived/base.py @@ -1,7 +1,6 @@ # Licensed under a 3-clause BSD style license - see LICENSE.rst from Chandra.Time import DateTime -from .. import fetch, fetch_eng import Ska.Numpy import numpy as np from .. import cache @@ -35,6 +34,8 @@ def calc(self, data): raise NotImplementedError def fetch(self, start, stop): + from .. import fetch + unit_system = fetch.get_units() # cache current units and restore after fetch fetch.set_units(self.unit_system) dataset = fetch.MSIDset(self.rootparams, start, stop) @@ -89,6 +90,7 @@ def fetch(self, start, stop): return dataset def __call__(self, start, stop): + from .. import fetch_eng dataset = fetch_eng.MSIDset(self.rootparams, start, stop, filter_bad=True) # Translate state codes "ON" and "OFF" to 1 and 0, respectively. diff --git a/Ska/engarchive/derived/comps.py b/Ska/engarchive/derived/comps.py new file mode 100644 index 00000000..0d2a6f2c --- /dev/null +++ b/Ska/engarchive/derived/comps.py @@ -0,0 +1,424 @@ +# Licensed under a 3-clause BSD style license - see LICENSE.rst +""" +Support computed MSIDs in the cheta archive. + +- Base class ComputedMsid for user-generated comps. +- Cleaned MUPS valve temperatures MSIDs: '(pm2thv1t|pm1thv2t)_clean'. +- Commanded states 'cmd_state__
<state_key>_<dt>
' for any kadi commanded state value. + +See: https://nbviewer.jupyter.org/urls/cxc.harvard.edu/mta/ASPECT/ipynb/misc/DAWG-mups-valve-xija-filtering.ipynb +""" # noqa + +import re + +import numpy as np +from Chandra.Time import DateTime + +from ..units import converters as unit_converter_funcs + +__all__ = ['ComputedMsid', 'Comp_MUPS_Valve_Temp_Clean', 'Comp_KadiCommandState'] + + +def calc_stats_vals(msid, rows, indexes, interval): + """ + Compute statistics values for ``msid`` over specified intervals. + This is a very slightly modified version of the same function from + update_archive.py. However, cannot directly import that because + it has side effects that break everything, probably related to + enabling caching. + + The mods here are basically to take out handling of state codes + and turn a warning about negative dts into an exception. + + :param msid: Msid object (filter_bad=True) + :param rows: Msid row indices corresponding to stat boundaries + :param indexes: Universal index values for stat (row times // dt) + :param interval: interval name (5min or daily) + + :returns: np.recarray of stats values + """ + import scipy.stats + + quantiles = (1, 5, 16, 50, 84, 95, 99) + n_out = len(rows) - 1 + + # Check if data type is "numeric". Boolean values count as numeric, + # partly for historical reasons, in that they support funcs like + # mean (with implicit conversion to float). + msid_dtype = msid.vals.dtype + msid_is_numeric = issubclass(msid_dtype.type, (np.number, np.bool_)) + + # If MSID data is unicode, then for stats purposes cast back to bytes + # by creating the output array as a like-sized S-type array. + if msid_dtype.kind == 'U': + msid_dtype = re.sub(r'U', 'S', msid.vals.dtype.str) + + # Predeclare numpy arrays of correct type and sufficient size for accumulating results. + out = {} + out['index'] = np.ndarray((n_out,), dtype=np.int32) + out['n'] = np.ndarray((n_out,), dtype=np.int32) + out['val'] = np.ndarray((n_out,), dtype=msid_dtype) + + if msid_is_numeric: + out['min'] = np.ndarray((n_out,), dtype=msid_dtype) + out['max'] = np.ndarray((n_out,), dtype=msid_dtype) + out['mean'] = np.ndarray((n_out,), dtype=np.float32) + + if interval == 'daily': + out['std'] = np.ndarray((n_out,), dtype=msid_dtype) + for quantile in quantiles: + out['p{:02d}'.format(quantile)] = np.ndarray((n_out,), dtype=msid_dtype) + + i = 0 + for row0, row1, index in zip(rows[:-1], rows[1:], indexes[:-1]): + vals = msid.vals[row0:row1] + times = msid.times[row0:row1] + + n_vals = len(vals) + if n_vals > 0: + out['index'][i] = index + out['n'][i] = n_vals + out['val'][i] = vals[n_vals // 2] + if msid_is_numeric: + if n_vals <= 2: + dts = np.ones(n_vals, dtype=np.float64) + else: + dts = np.empty(n_vals, dtype=np.float64) + dts[0] = times[1] - times[0] + dts[-1] = times[-1] - times[-2] + dts[1:-1] = ((times[1:-1] - times[:-2]) + + (times[2:] - times[1:-1])) / 2.0 + negs = dts < 0.0 + if np.any(negs): + times_dts = [(DateTime(t).date, dt) + for t, dt in zip(times[negs], dts[negs])] + raise ValueError('WARNING - negative dts in {} at {}' + .format(msid.MSID, times_dts)) + + # Clip to range 0.001 to 300.0. The low bound is just there + # for data with identical time stamps. This shouldn't happen + # but in practice might. The 300.0 represents 5 minutes and + # is the largest normal time interval. Data near large gaps + # will get a weight of 5 mins. 
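+                # For example, times = [0, 1, 3] gives dts = [1.0, 1.5, 2.0]
+                # from the midpoint rule above, so each sample is weighted by
+                # the time span it represents in the weighted mean below.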
+ dts.clip(0.001, 300.0, out=dts) + sum_dts = np.sum(dts) + + out['min'][i] = np.min(vals) + out['max'][i] = np.max(vals) + out['mean'][i] = np.sum(dts * vals) / sum_dts + if interval == 'daily': + # biased weighted estimator of variance (N should be big enough) + # http://en.wikipedia.org/wiki/Mean_square_weighted_deviation + sigma_sq = np.sum(dts * (vals - out['mean'][i]) ** 2) / sum_dts + out['std'][i] = np.sqrt(sigma_sq) + quant_vals = scipy.stats.mstats.mquantiles(vals, np.array(quantiles) / 100.0) + for quant_val, quantile in zip(quant_vals, quantiles): + out['p%02d' % quantile][i] = quant_val + + i += 1 + + return np.rec.fromarrays([x[:i] for x in out.values()], names=list(out.keys())) + + +class ComputedMsid: + """Base class for cheta computed MSID. + + Sub-classes must define at least the following: + + * ``msid_match`` class attribute as a regex to match for the MSID. + * ``get_msid_attrs()`` method to perform the computation and return + a dict with the result. + + Optionally: + + * ``units`` attribute to specify unit handling. + + See the fetch tutorial Computed MSIDs section for details. + """ + # Global dict of registered computed MSIDs + msid_classes = [] + + # Base units specification (None implies no unit handling) + units = None + + def __init__(self, unit_system='eng'): + self.unit_system = unit_system + + def __init_subclass__(cls, **kwargs): + """Validate and register ComputedMSID subclass.""" + super().__init_subclass__(**kwargs) + + if not hasattr(cls, 'msid_match'): + raise ValueError(f'comp {cls.__name__} must define msid_match') + + cls.msid_classes.append(cls) + + @classmethod + def get_matching_comp_cls(cls, msid): + """Get computed classes that match ``msid`` + + :param msid: str, input msid + :returns: first ComputedMsid subclass that matches ``msid`` or None + """ + for comp_cls in ComputedMsid.msid_classes: + match = re.match(comp_cls.msid_match + '$', msid, re.IGNORECASE) + if match: + return comp_cls + + return None + + # These four properties are provided as a convenience because the module + # itself cannot import fetch because this is circular. + @property + def fetch_eng(self): + """Fetch in TDB engineering units like DEGF""" + from .. import fetch_eng + return fetch_eng + + @property + def fetch_sci(self): + """Fetch in scientific units like DEGC""" + from .. import fetch_sci + return fetch_sci + + @property + def fetch_cxc(self): + """Fetch in CXC (FITS standard) units like K""" + from .. import fetch + return fetch + + @property + def fetch_sys(self): + """Fetch in the unit system specified for the class""" + fetch = getattr(self, f'fetch_{self.unit_system}') + return fetch + + def __call__(self, tstart, tstop, msid, interval=None): + """Emulate the fetch.MSID() API, but return a dict of MSID attributes. + + The returned dict turned into a proper MSID object by the upstream caller + `fetch.MSID._get_comp_data()`. 
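+
+        A minimal sketch of calling a comp instance directly (normally
+        ``fetch`` does this internally via ``_get_comp_data()``; the MSID
+        name here is just an example)::
+
+            comp = Comp_MUPS_Valve_Temp_Clean(unit_system='eng')
+            attrs = comp(tstart, tstop, 'pm2thv1t_clean')
+            attrs['vals'], attrs['times'], attrs['bads']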
+ + :param tstart: float, start time (CXC seconds) + :param tstop: float, stop time (CXC seconds) + :param msid: str, MSID name + :param interval: str or None, stats interval (None, '5min', 'daily') + + :returns: dict of MSID attributes including 'times', 'vals', 'bads' + """ + # Parse any arguments from the input `msid` + match = re.match(self.msid_match, msid, re.IGNORECASE) + if not match: + raise RuntimeError(f'unexpected mismatch of {msid} with {self.msid_match}') + match_args = [arg.lower() for arg in match.groups()] + + if interval is None: + # Call the actual user-supplied work method to compute the MSID values + msid_attrs = self.get_msid_attrs(tstart, tstop, msid.lower(), match_args) + + for attr in ('vals', 'bads', 'times', 'unit'): + if attr not in msid_attrs: + raise ValueError(f'computed MSID {self.__class__.__name__} failed ' + f'to set required attribute {attr}') + + # Presence of a non-None `units` class attribute means that the MSID has + # units that should be converted to `self.unit_system` if required, where + # unit_system is 'cxc', 'sci', or 'eng'. + if self.units is not None: + msid_attrs = self.convert_units(msid_attrs) + else: + msid_attrs = self.get_stats_attrs(tstart, tstop, msid.lower(), match_args, + interval) + + return msid_attrs + + def convert_units(self, msid_attrs): + """ + Convert required elements of ``msid_attrs`` to ``self.unit_system``. + + Unit_system can be one of 'cxc', 'sci', 'eng'. + + :param msid_attrs: dict, input MSID attributes + :param unit_system: str, unit system + + :returns: dict, converted MSID attributes + """ + unit_current = self.units[self.units['internal_system']] + unit_new = self.units[self.unit_system] + + out = msid_attrs.copy() + out['unit'] = unit_new + + if unit_current != unit_new: + for attr in self.units['convert_attrs']: + out[attr] = unit_converter_funcs[unit_current, unit_new](msid_attrs[attr]) + + return out + + def get_msid_attrs(self, tstart, tstop, msid, msid_args): + """Get the attributes required for this MSID. + + Get attributes for computed MSID, which must include at least + ``vals``, ``bads``, ``times``, and may include additional attributes. + + This MUST be supplied by sub-classes. + + :param tstart: start time (CXC secs) + :param tstop: stop time (CXC secs) + :param msid: full MSID name e.g. tephin_plus_5 + :param msid_args: tuple of regex match groups (msid_name,) + :returns: dict of MSID attributes + """ + raise NotImplementedError('sub-class must implement get_msid_attrs()') + + def get_stats_attrs(self, tstart, tstop, msid, match_args, interval): + """Get 5-min or daily stats attributes. + + This is normally not overridden by sub-classes. + + :param tstart: start time (CXC secs) + :param tstop: stop time (CXC secs) + :param msid: full MSID name e.g. tephin_plus_5 + :param msid_args: tuple of regex match groups (msid_name,) + :returns: dict of MSID attributes + """ + from ..fetch import _plural + + # Replicate a stripped-down version of processing in update_archive. + # This produces a recarray with columns that correspond to the raw + # stats HDF5 files. 
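+        # Note: the '5min' stat bins are actually 328 sec wide (ten 32.8 sec
+        # major frames), the historical cheta "5-minute" stats interval.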
+ dt = {'5min': 328, + 'daily': 86400}[interval] + index0 = int(np.floor(tstart / dt)) + index1 = int(np.ceil(tstop / dt)) + tstart = (index0 - 1) * dt + tstop = (index1 + 1) * dt + + msid_obj = self.fetch_sys.Msid(msid, tstart, tstop) + + indexes = np.arange(index0, index1 + 1) + times = indexes * dt # This is the *start* time of each bin + + if len(times) > 0: + rows = np.searchsorted(msid_obj.times, times) + vals_stats = calc_stats_vals(msid_obj, rows, indexes, interval) + else: + raise ValueError() + + # Replicate the name munging that fetch does going from the HDF5 columns + # to what is seen in a stats fetch query. + out = {} + for key in vals_stats.dtype.names: + out_key = _plural(key) if key != 'n' else 'samples' + out[out_key] = vals_stats[key] + out['times'] = (vals_stats['index'] + 0.5) * dt + out['bads'] = np.zeros(len(vals_stats), dtype=bool) + out['midvals'] = out['vals'] + out['vals'] = out['means'] + out['unit'] = msid_obj.unit + + return out + +############################################################################ +# Built-in computed MSIDs +############################################################################ + + +class Comp_MUPS_Valve_Temp_Clean(ComputedMsid): + """Computed MSID for cleaned MUPS valve temps PM2THV1T, PM1THV2T + + This uses the cleaning method demonstrated in the following notebook + to return a version of the MUPS valve temperature comprised of + telemetry values that are consistent with a thermal model. + + https://nbviewer.jupyter.org/urls/cxc.cfa.harvard.edu/mta/ASPECT/ipynb/misc/mups-valve-xija-filtering.ipynb + https://nbviewer.jupyter.org/urls/cxc.harvard.edu/mta/ASPECT/ipynb/misc/DAWG-mups-valve-xija-filtering.ipynb + + Allowed MSIDs are 'pm2thv1t_clean' and 'pm1thv2t_clean' (as always case is + not important). + """ + msid_match = r'(pm2thv1t|pm1thv2t)_clean' + + units = { + 'internal_system': 'eng', # Unit system for attrs from get_msid_attrs() + 'eng': 'DEGF', # Units for eng, sci, cxc systems + 'sci': 'DEGC', + 'cxc': 'K', + # Attributes that need conversion + 'convert_attrs': ['vals', 'vals_raw', 'vals_nan', 'vals_corr', 'vals_model'] + } + + def get_msid_attrs(self, tstart, tstop, msid, msid_args): + """Get attributes for computed MSID: ``vals``, ``bads``, ``times``, ``unit`` + + :param tstart: start time (CXC secs) + :param tstop: stop time (CXC secs) + :param msid: full MSID name e.g. pm2thv1t_clean + :param msid_args: tuple of regex match groups (msid_name,) + + :returns: dict of MSID attributes + """ + from .mups_valve import fetch_clean_msid + + # Get cleaned MUPS valve temperature data as an MSID object + dat = fetch_clean_msid(msid_args[0], tstart, tstop, + dt_thresh=5.0, median=7, model_spec=None) + + # Convert to dict as required by the get_msids_attrs API. `fetch_clean_msid` + # returns an MSID object with the following attrs. + attrs = ('vals', 'times', 'bads', 'vals_raw', 'vals_nan', + 'vals_corr', 'vals_model', 'source') + msid_attrs = {attr: getattr(dat, attr) for attr in attrs} + msid_attrs['unit'] = 'DEGF' + + return msid_attrs + + +class Comp_KadiCommandState(ComputedMsid): + """Computed MSID for kadi dynamic commanded states. + + The MSID here takes the form ``cmd_state__
<state_key>_<dt>
`` where: + + * ``state_key`` is a valid commanded state key such as ``pitch`` or + ``pcad_mode`` or ``acisfp_temp``. + * ``dt`` is the sampling time expressed as a multiple of 1.025 sec + frames. + + Example MSID names:: + + 'cmd_state_pcad_mode_1': sample ``pcad_mode`` every 1.025 secs + 'cmd_state_acisfp_temp_32': sample ``acisfp_temp`` every 32.8 secs + """ + msid_match = r'cmd_state_(\w+)_(\d+)' + + def get_msid_attrs(self, tstart, tstop, msid, msid_args): + """Get attributes for computed MSID: ``vals``, ``bads``, ``times`` + + :param tstart: start time (CXC secs) + :param tstop: stop time (CXC secs) + :param msid: full MSID name e.g. cmd_state_pitch_clean + :param msid_args: tuple of regex match groups: (state_key, dt) + + :returns: dict of MSID attributes + """ + from kadi.commands.states import get_states + from Chandra.Time import date2secs + + state_key = msid_args[0] + dt = 1.025 * int(msid_args[1]) + states = get_states(tstart, tstop, state_keys=[state_key]) + + tstart = date2secs(states['datestart'][0]) + tstops = date2secs(states['datestop']) + + times = np.arange(tstart, tstops[-1], dt) + vals = states[state_key].view(np.ndarray) + + indexes = np.searchsorted(tstops, times) + + out = {'vals': vals[indexes], + 'times': times, + 'bads': np.zeros(len(times), dtype=bool), + 'unit': None} + + return out diff --git a/Ska/engarchive/derived/mups_valve.py b/Ska/engarchive/derived/mups_valve.py new file mode 100644 index 00000000..d6f49008 --- /dev/null +++ b/Ska/engarchive/derived/mups_valve.py @@ -0,0 +1,253 @@ +# coding: utf-8 + +""" + Fetch clean MUPS valve temperature telemetry + + This makes use of the temperature correction code provided by Scott Blanchard (to correct + for a resistor dropout in the thermistor data) and xija thermal model developed by + Matt Dahmer. + + The basic cleaning algorithm is very simple: + + - Fetch raw telemetry from the cheta archive. + - Compute a xija thermal model prediction for the same timespan (actually starting a few + days in advance to burn out uncertainty in the pseudo-node value). + - Accept either raw telemetry or corrected data which are within a tolerance (5 degF) of + the model. + - In the gaps where the model diverges from both raw and corrected temperatures, + "repropagate" the model starting from the last accepted temperature value. + This effectively takes out much of the systematic model error, which can be up to + 15 degF after a transition to a new attitude. + - In some cases this allows recovery of additional data, while in others the data are + not recoverable: either due to partial disconnect of the parallel resistor or full + disconnects where the output voltage exceeds 5.12 V. + + The output of this function is a `fetch.Msid` object with some bonus attributes, + documented below. In particular the output cleaned data are labeled so one knows + exactly where each data point came from. + + The function is fast, and one can get 5 years of cleaned telemetry in a few seconds + on a modern laptop with SSD drive. + + This cleaning technique recovers on average about 90% of data for PM2THV1T. Since + 2015, about 60% of telemetry is good (no dropout) while 30% is in a recoverable + fully-dropped state (and 10% is not recoverable). + + ``` + def fetch_clean_msid(msid, start, stop=None, dt_thresh=5.0, median=7, model_spec=None): + + Fetch a cleaned version of telemetry for ``msid``. 
+ + If not supplied the model spec will be downloaded from github using this URL: + https://raw.githubusercontent.com/sot/chandra_models/master/chandra_models/xija/ + mups_valve/{msid}_spec.json + + This function returns a `fetch.Msid` object like a normal fetch but with extra attributes: + + - vals: cleaned telemetry (either original or corrected telemetry, or xija model prediction) + - source: label for each vals data point + - 0: unrecoverable, so use xija model value + - 1: original telemetry + - 2: corrected telemetry + - vals_raw: raw (uncleaned) telemetry + - vals_nan: cleaned telem but with np.nan at points where data are unrecoverable (this is + for plotting) + - vals_corr: telemetry with the MUPS correction applied + - vals_model: xija model prediction + + :param start: start time + :param stop: stop time (default=NOW) + :param dt_thresh: tolerance for matching model to data in degF (default=5 degF) + :param median: length of median filter (default=7, use 0 to disable) + :param model_spec: file name or URL containing relevant xija model spec + + :returns: fetch.Msid object + ``` +""" + +import os + +import numpy as np +import numba +from astropy.utils.data import download_file +from scipy.interpolate import interp1d +from scipy.ndimage import median_filter + +from cheta import fetch_eng +from xija import XijaModel + + +def c2f(degc): + degf = 32 + degc * 1.8 + return degf + + +# Define MUPS valve thermistor point pair calibration table (from TDB). +pp_counts = np.array([0, 27, 36, 44, 55, 70, 90, 118, 175, 195, 210, 219, 226, 231, 235, 255]) +pp_temps = np.array([369.5305, 263.32577, 239.03652, 222.30608, 203.6944, 183.2642, 161.0796, + 134.93818, 85.65725, 65.6537, 47.3176, 33.50622, 19.9373, 7.42435, -5.79635, + -111.77265]) + +count_to_degf = interp1d(pp_counts, pp_temps) +degf_to_counts = interp1d(pp_temps, pp_counts) + + +# Define MUPS valve thermistor voltage point pair calibration table. +def count_to_volts(counts): + return counts / 256 * 5.12 + + +def volts_to_counts(volts): + return volts / 5.12 * 256 + + +# Voltage and Temperature, with and without resistor (see flight note 447 for +# the source of these numbers). +volt_with_resistor = [4.153325779, 3.676396578, 3.175100371, 2.587948965, 2.435, 2.025223702, + 1.538506813, 1.148359251, 0.63128179, 0.354868907, 0.208375569] +volt_without_resistor = [28.223, 15, 9.1231, 5.5228, 4.87, 3.467, 2.249, + 1.5027, 0.7253, 0.38276, 0.21769] +volt_without_resistor_to_volt_with_resistor = interp1d(volt_without_resistor, + volt_with_resistor) + + +def get_corr_mups_temp(temp): + """ Calculate a MUPS valve thermistor corrected temperature. + Args: + temp (float, int): Temperature in Fahrenheit to which a correction will be applied. + + Returns: + (float): Corrected temperaure + + """ + # Convert observed temperature to voltage + count = degf_to_counts(temp) + volt = count_to_volts(count) + + # Convert voltage as read, assuming no resistor, to what it would be with the resistor + new_volt = volt_without_resistor_to_volt_with_resistor(volt) + + # Convert this new voltage to counts and, in turn, to a new temperature + new_count = volts_to_counts(new_volt) + new_temp = count_to_degf(new_count) + + return new_temp + + +@numba.autojit() +def select_using_model(data1, data2, model, dt_thresh, out, source): + """Select from either ``data1`` or ``data2`` using ``model`` to get clean data. + + This does a 2-step process: + + 1. Select either `data1` or `data2` where either is within `dt_thresh` of `model`. + 2. 
Fill in gaps where possible by propagating the model from previous good data, + thus reducing the impact of model errors. + + :param data1: ndarray of data (e.g. observed temperature telemetry) + :param data2: ndarray of data (e.g. corrected temperature telemetry) + :param model: ndarray with model prediction for data + :param dt_thresh: model error tolerance for selecting data1 or data2 + :param out: ndarray where output cleaned data is stored (must be same length as data) + :param source: ndarray where output source labeling is stored (same length as data) + """ + # Select all data where either data1 or data2 are within dt_thresh of model + for data, source_id in [(data1, 1), (data2, 2)]: + ok = np.abs(data - model) < dt_thresh + out[ok] = data[ok] + source[ok] = source_id + + # Fill in gaps where possible by propagating model from last good data value + for ii in range(len(data1)): + if source[ii] != 0: + # Already got a good value so move on. + continue + + if ii == 0: + model_prop_ii = model[0] + else: + # Propagate model using last point + delta-model + model_prop_ii = out[ii-1] + (model[ii] - model[ii-1]) # noqa + + if abs(data1[ii] - model_prop_ii) < dt_thresh: + out[ii] = data1[ii] + source[ii] = 1 + elif abs(data2[ii] - model_prop_ii) < dt_thresh: + out[ii] = data2[ii] + source[ii] = 2 + else: + # No good match, use propagated model value and mark as such + out[ii] = model_prop_ii + source[ii] = 0 + + +def fetch_clean_msid(msid, start, stop=None, dt_thresh=5.0, median=7, model_spec=None): + """Fetch a cleaned version of telemetry for ``msid``. + + If not supplied the model spec will be downloaded from github using this URL: + https://raw.githubusercontent.com/sot/chandra_models/master/chandra_models/xija/ + mups_valve/{msid}_spec.json + + This function returns a `fetch.Msid` object like a normal fetch but with extra attributes: + + - vals: cleaned telemetry (either original or corrected telemetry, or xija model prediction) + - source: label for each vals data point + - 0: unrecoverable, so use xija model value + - 1: original telemetry + - 2: corrected telemetry + - vals_raw: raw (uncleaned) telemetry + - vals_nan: cleaned telem but with np.nan at points where data are unrecoverable (this is + for plotting) + - vals_corr: telemetry with the MUPS correction applied + - vals_model: xija model prediction + + :param start: start time + :param stop: stop time (default=NOW) + :param dt_thresh: tolerance for matching model to data in degF (default=5 degF) + :param median: length of median filter (default=7, use 0 to disable) + :param model_spec: file name or URL containing relevant xija model spec + + :returns: fetch.Msid object + """ + if model_spec is None or not os.path.exists(model_spec): + url = (f'https://raw.githubusercontent.com/sot/chandra_models/master/chandra_models/xija/' + f'mups_valve/{msid}_spec.json') + model_spec = download_file(url, cache=True) + + # NOTE: all temperatures are in degF unless otherwise noted + + dat = fetch_eng.Msid(msid, start, stop) + + t_obs = dat.vals + if median: + t_obs = median_filter(t_obs, size=median) + t_corr = get_corr_mups_temp(t_obs) + + # Model is computed in degC + mdl = XijaModel(model_spec=model_spec, + start=dat.times[0] - 400000, + stop=dat.times[-1]) + mdl.comp['mups0'].set_data(75) # degC + mdl.make() + mdl.calc() + + # Interpolate model temperatures (converted to degF with c2f) onto telemetry times + t_model = np.interp(xp=mdl.times, fp=c2f(mdl.comp[msid].mvals), x=dat.times) + + out = np.zeros_like(dat.vals) + source = 
np.zeros(len(dat.vals), dtype=int) + select_using_model(t_obs, t_corr, t_model, dt_thresh=dt_thresh, out=out, source=source) + + dat.vals_raw = dat.vals + + dat.vals = out + dat.vals_nan = out.copy() + + dat.bads = (source == 0) + dat.vals_nan[dat.bads] = np.nan + + dat.source = source + dat.vals_corr = t_corr + dat.vals_model = t_model + + return dat diff --git a/Ska/engarchive/fetch.py b/Ska/engarchive/fetch.py index 6d5004ca..16ac817e 100644 --- a/Ska/engarchive/fetch.py +++ b/Ska/engarchive/fetch.py @@ -29,6 +29,7 @@ from . import cache from . import remote_access from .remote_access import ENG_ARCHIVE +from .derived.comps import ComputedMsid from . import __version__ # noqa from Chandra.Time import DateTime @@ -377,6 +378,11 @@ def msid_glob(msid): msids = collections.OrderedDict() MSIDS = collections.OrderedDict() + # First check if `msid` matches a computed class. This does not allow + # for globs, and here the output MSIDs is the single computed class. + if ComputedMsid.get_matching_comp_cls(msid) is not None: + return [msid], [msid.upper()] + sources = data_source.sources(include_test=False) for source in sources: ms, MS = _msid_glob(msid, source) @@ -401,7 +407,6 @@ def _msid_glob(msid, source): :param msid: input MSID glob :returns: tuple (msids, MSIDs) """ - source_msids = data_source.get_msids(source) MSID = msid.upper() @@ -578,6 +583,11 @@ def _get_data(self): logger.info('Getting data for %s between %s to %s', self.msid, self.datestart, self.datestop) + comp_cls = ComputedMsid.get_matching_comp_cls(self.msid) + if comp_cls: + self._get_comp_data(comp_cls) + return + # Avoid stomping on caller's filetype 'ft' values with _cache_ft() with _cache_ft(): ft['content'] = self.content @@ -620,6 +630,32 @@ def _get_data(self): # telemetry to existing CXC values. self._get_msid_data_from_maude(*args) + def _get_comp_data(self, comp_cls): + logger.info(f'Getting computed values for {self.msid}') + + # Do computation. This returns a dict of MSID attribute values. + attrs = comp_cls(self.units['system'])(self.tstart, self.tstop, self.msid, self.stat) + + # Allow upstream class to be a bit sloppy on times and include samples + # outside the time range. This can happen with classes that inherit + # from DerivedParameter. + ok = (attrs['times'] >= self.tstart) & (attrs['times'] <= self.tstop) + all_ok = np.all(ok) + + # List of "colnames", which is the ndarray attributes. There can be + # non-ndarray attributes that get returned, including typically 'unit'. + self.colnames = [attr for attr, val in attrs.items() + if (isinstance(val, np.ndarray) + and len(val) == len(attrs['times']))] + + # Apply attributes to self + for attr, val in attrs.items(): + if (not all_ok + and isinstance(val, np.ndarray) + and len(val) == len(attrs['times'])): + val = val[ok] + setattr(self, attr, val) + def _get_stat_data(self): """Do the actual work of getting stats values for an MSID from HDF5 files""" diff --git a/Ska/engarchive/tests/test_comps.py b/Ska/engarchive/tests/test_comps.py new file mode 100644 index 00000000..a6fda013 --- /dev/null +++ b/Ska/engarchive/tests/test_comps.py @@ -0,0 +1,199 @@ +# Licensed under a 3-clause BSD style license - see LICENSE.rst + +"""Test that computed MSIDs work as expected.""" + +import numpy as np +import pytest + +from .. 
import fetch_eng, fetch_sci, fetch as fetch_cxc +from ..derived.base import DerivedParameter +from ..derived.comps import ComputedMsid + +try: + import maude + date1 = '2016:001:00:00:00.1' + date2 = '2016:001:00:00:02.0' + maude.get_msids(msids='ccsdsid', start=date1, stop=date2) +except Exception: + HAS_MAUDE = False +else: + HAS_MAUDE = True + + +class Comp_Passthru(ComputedMsid): + """Pass MSID through unchanged (for checking that stats work)""" + msid_match = r'passthru_(\w+)' + + def get_msid_attrs(self, tstart, tstop, msid, msid_args): + dat = self.fetch_eng.MSID(msid_args[0], tstart, tstop) + + out = {'vals': dat.vals, + 'bads': dat.bads, + 'times': dat.times, + 'unit': dat.unit} + return out + + +class Comp_Val_Plus_Five(ComputedMsid): + """Silly base comp to add 5 to the value""" + msid_match = r'comp_(\w+)_plus_five' + + def get_msid_attrs(self, tstart, tstop, msid, msid_args): + dat = self.fetch_sys.MSID(msid_args[0], tstart, tstop) + + out = {'vals': dat.vals + 5, + 'bads': dat.bads, + 'times': dat.times, + 'unit': dat.unit} + return out + + +class Comp_CSS1_NPM_SUN(ComputedMsid, DerivedParameter): + """Coarse Sun Sensor Counts 1 filtered for NPM and SA Illuminated + + Defined as CSS-1 current converted back into counts + (AOCSSI1 * 4095 / 5.49549) when in NPM (AOPCADMD==1) and SA is illuminated + (AOSAILLM==1). Otherwise, "Bads" flag is set equal to one. + + """ + rootparams = ['aocssi1', 'aopcadmd', 'aosaillm'] + time_step = 1.025 + max_gap = 10.0 + msid_match = 'comp_css1_npm_sun' + + def get_msid_attrs(self, tstart, tstop, msid, msid_args): + # Get an interpolated MSIDset for rootparams + msids = self.fetch(tstart, tstop) + + # Do the computation and set bad values + npm_sun = ((msids['aopcadmd'].vals == 'NPNT') & + (msids['aosaillm'].vals == 'ILLM')) + msids.bads = msids.bads | ~npm_sun + css1_npm_sun = msids['aocssi1'].vals * 4095 / 5.49549 + + out = {'vals': css1_npm_sun, + 'times': msids.times, + 'bads': msids.bads, + 'unit': None} + return out + + +def test_comp_from_derived_parameter(): + """Test that on-the-fly comp gives same result as derived parameter from + same code. 
+ """ + dat1 = fetch_eng.Msid('comp_css1_npm_sun', '2020:001', '2020:010') + dat2 = fetch_eng.Msid('dp_css1_npm_sun', '2020:001', '2020:010') + + for attr in ('vals', 'times', 'bads', 'unit'): + assert np.all(getattr(dat1, attr) == getattr(dat2, attr)) + + +@pytest.mark.parametrize('fetch,unit', [(fetch_eng, 'DEGF'), + (fetch_sci, 'DEGC'), + (fetch_cxc, 'K')]) +def test_simple_comp(fetch, unit): + dat1 = fetch.Msid('tephin', '2020:001', '2020:010') + dat2 = fetch.Msid('comp_tephin_plus_five', '2020:001', '2020:010') + assert np.all(dat1.vals + 5 == dat2.vals) + assert np.all(dat1.times == dat2.times) + assert np.all(dat1.bads == dat2.bads) + assert dat1.unit == dat2.unit + assert dat2.unit == unit + + +@pytest.mark.skipif("not HAS_MAUDE") +@pytest.mark.parametrize('fetch,unit', [(fetch_eng, 'DEGF'), + (fetch_sci, 'DEGC'), + (fetch_cxc, 'K')]) +def test_simple_comp_with_maude(fetch, unit): + with fetch.data_source('maude'): + dat1 = fetch.Msid('tephin', '2020:001', '2020:003') + dat2 = fetch.Msid('comp_tephin_plus_five', '2020:001', '2020:003') + assert np.all(dat1.vals + 5 == dat2.vals) + assert np.all(dat1.times == dat2.times) + assert np.all(dat1.bads == dat2.bads) + assert dat1.unit == dat2.unit + assert dat2.unit == unit + + +def test_mups_valve(): + colnames = ['vals', 'times', 'bads', 'vals_raw', + 'vals_nan', 'vals_corr', 'vals_model', 'source'] + + dat = fetch_eng.MSID('PM2THV1T_clean', '2020:001', '2020:010') + assert dat.unit == 'DEGF' + assert len(dat.vals) == 36661 + ok = dat.source != 0 + # Temps are reasonable for degF + assert np.all((dat.vals[ok] > 55) & (dat.vals[ok] < 220)) + assert np.count_nonzero(ok) == 34499 + assert dat.colnames == colnames + for attr in colnames: + assert len(dat.vals) == len(getattr(dat, attr)) + + dat = fetch_sci.Msid('PM2THV1T_clean', '2020:001', '2020:010') + assert dat.unit == 'DEGC' + ok = dat.source != 0 + # Temps are reasonable for degC + assert np.all((dat.vals[ok] > 10) & (dat.vals[ok] < 110)) + assert len(dat.vals) == 34499 # Some bad values + assert dat.colnames == colnames + for attr in colnames: + if attr != 'bads': + assert len(dat.vals) == len(getattr(dat, attr)) + + dat = fetch_cxc.MSID('PM1THV2T_clean', '2020:001', '2020:010') + ok = dat.source != 0 + # Temps are reasonable for K + assert np.all((dat.vals[ok] > 280) & (dat.vals[ok] < 380)) + assert len(dat.vals) == 36661 # Same as PM2THV1T + assert dat.colnames == colnames + for attr in colnames: + assert len(dat.vals) == len(getattr(dat, attr)) + + dat = fetch_eng.Msid('pm1thv2t_clean', '2020:001', '2020:010') + assert len(dat.vals) == 36240 # Some bad values + assert len(dat.source) == 36240 # Filtering applies to sources + assert dat.colnames == colnames + for attr in colnames: + if attr != 'bads': + assert len(dat.vals) == len(getattr(dat, attr)) + + +def test_cmd_states(): + start, stop = '2020:002:08:00:00', '2020:002:10:00:00' + dat = fetch_eng.Msid('cmd_state_pitch_1000', start, stop) + exp_vals = np.array([55.99128956, 55.8747053, 55.8747053, 90.66266599, + 159.06945155, 173.11528258, 173.11528258, 173.11528258]) + assert np.allclose(dat.vals, exp_vals) + assert type(dat.vals) is np.ndarray + assert np.allclose(np.diff(dat.times), 1025.0) + assert not np.any(dat.bads) + assert dat.unit is None + + dat = fetch_eng.Msid('cmd_state_pcad_mode_1000', start, stop) + exp_vals = np.array(['NPNT', 'NPNT', 'NPNT', 'NMAN', 'NMAN', 'NPNT', 'NPNT', 'NPNT']) + assert np.all(dat.vals == exp_vals) + assert type(dat.vals) is np.ndarray + assert np.allclose(np.diff(dat.times), 1025.0) + + 
+@pytest.mark.parametrize('stat', ['5min', 'daily'])
+def test_stats(stat):
+    start, stop = '2020:001', '2020:010'
+
+    dat = fetch_eng.Msid('pitch', start, stop, stat=stat)
+    datc = fetch_eng.Msid('passthru_pitch', start, stop, stat=stat)
+
+    for attr in datc.colnames:
+        val = getattr(dat, attr)
+        valc = getattr(datc, attr)
+        if attr == 'bads':
+            assert val == valc
+            continue
+        assert val.dtype == valc.dtype
+        if val.dtype.kind == 'f':
+            assert np.allclose(val, valc)
+        else:
+            assert np.all(val == valc)
diff --git a/Ska/engarchive/units.py b/Ska/engarchive/units.py
index 221b5ad6..9ec8d157 100644
--- a/Ska/engarchive/units.py
+++ b/Ska/engarchive/units.py
@@ -93,6 +93,13 @@ def F_to_C(vals, delta_val=False):
         return (vals - 32.0) / 1.8
 
 
+def C_to_F(vals, delta_val=False):
+    if delta_val:
+        return vals * 1.8
+    else:
+        return vals * 1.8 + 32
+
+
 def C_to_K(vals, delta_val=False):
     if delta_val:
         return vals
@@ -171,6 +178,7 @@ def divide(scale_factor, decimals=None):
     ('mm', 'TSCSTEP'): mult(1.0 / 0.00251431530156, decimals=3),
     ('mm', 'FASTEP'): mm_to_FASTEP,
     ('PWM', 'PWMSTEP'): mult(16),
+    ('DEGC', 'DEGF'): C_to_F,
 
     # Eng units to CXC or Sci
     ('DEGC', 'K'): C_to_K,
diff --git a/doc/fetch_tutorial.rst b/doc/fetch_tutorial.rst
index b6615322..2084ce02 100644
--- a/doc/fetch_tutorial.rst
+++ b/doc/fetch_tutorial.rst
@@ -1547,7 +1547,7 @@
 local or remote. In order to use this option, the user must have a special
 key file ``ska_remote_access.json``placed at the root of the local Python
 installation folder. This is placed in the directory shown with
 ``import sys; print(sys.prefix)``. To get a copy of this file
-    contact Mark Baski or Tom Aldcroft.
+contact Mark Baski or Tom Aldcroft.
 
 Remote access is controlled as follows:
@@ -1559,3 +1559,11 @@
 is *enabled* unless the system finds a local engineering data archive.
 It looks for data in either ``$SKA/data/eng_archive`` or ``$ENG_ARCHIVE``,
 where those refer to user-defined environment variables.
+
+Local cheta archive
+===================
+
+Instructions for creating, using, and maintaining a local cheta archive (on your
+laptop for instance) are found at the `Tutorial for installing and maintaining a
+cheta telemetry archive
+`_.
diff --git a/doc/pseudo_msids.rst b/doc/pseudo_msids.rst
index 8483420c..503770f6 100644
--- a/doc/pseudo_msids.rst
+++ b/doc/pseudo_msids.rst
@@ -8,7 +8,8 @@
 stream are also available in the archive. These are:
 
 * ACIS DEA housekeeping: status from the DEA (including detector focal plane temperature)
 * Ephemeris: predictive and definitive orbital (Chandra), solar, and lunar ephemeris values
 * SIM telemetry: SIM position and moving status
-* Derived parameters: values computed from other MSIDs in the archive
+* Derived parameters: values computed from other MSIDs in the archive and stored in archive files
+* Computed parameters: values computed on the fly from other MSIDs
 
 ACIS DEA housekeeping
 --------------------------------------------------
@@ -656,3 +657,136 @@ Thermal
    :members:
    :undoc-members:
 
+Computed MSIDs
+--------------
+
+Cheta provides support for on-the-fly computed MSIDs with the following
+features (a quick usage sketch follows this list):
+
+* Designed for simplicity and ease of use by non-expert Ska3 users.
+* Following a simple recipe in user code, new MSIDs are automatically registered
+  to fetch.
+* Computed MSIDs can include embedded parameters to allow further customization
+  or application of a function to any other MSID.
+* Support for 5-minute and daily stats is also included.
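+
+As a quick illustration (a sketch, assuming a cheta archive is available),
+computed MSIDs are fetched exactly like ordinary MSIDs::
+
+    from cheta import fetch_eng
+
+    # Cleaned MUPS valve temperature, computed on the fly
+    dat = fetch_eng.Msid('pm2thv1t_clean', '2020:001', '2020:010')
+    dat.plot()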
+
+See the `DAWG computed MSIDs notebook
+`_
+for a longer introduction to the topic with a number of examples.
+
+What's the advantage?
+^^^^^^^^^^^^^^^^^^^^^
+
+If you have a function of MSID values, what is the advantage of going through
+this formalism to create a computed MSID instead of just using the function
+output directly?
+
+* It gives you the entire fetch API! You get for free all the
+  `fetch bling `_
+  like plotting, selecting intervals, kadi event integration, and interpolation.
+* If your computed MSID is useful to the community it is simple to add to the
+  released ``cheta`` package for other users.
+* It allows creation of arbitrary MSIDs that can be used in xija models.
+  Examples:
+
+  * ``pm2thv1t_clean`` as a ``Node`` component
+  * ``cmd_state_acisfp_temp_32`` as a telemetry input (``TelemData`` component).
+
+Example
+^^^^^^^
+
+This example is not terribly useful but illustrates the key concepts and
+requirements for defining a computed MSID::
+
+    from cheta.derived.comps import ComputedMsid  # Inherit from this class
+
+    # Class name is arbitrary, but by convention start with `Comp_`
+    class Comp_Val_Plus_Offset(ComputedMsid):
+        """
+        Computed MSID to add an integer offset to MSID value.
+
+        MSID format is "<msid>_plus_<offset>", where <msid> is an existing MSID
+        in the archive and <offset> is an integer offset.
+        """
+        msid_match = r'(\w+)_plus_(\d+)'
+
+        # `msid_match` is a class attribute that defines a regular expression to
+        # match for this computed MSID. This must be defined and it must be
+        # unambiguous (not matching an existing MSID or other computed MSID).
+        #
+        # The two groups in parentheses specify the arguments <msid> and <offset>.
+        # These are passed to `get_msid_attrs` as msid_args[0] and msid_args[1].
+        # The \w symbol means to match a-z, A-Z, 0-9 and underscore (_).
+        # The \d symbol means to match digits 0-9.
+
+        def get_msid_attrs(self, tstart, tstop, msid, msid_args):
+            """
+            Get attributes for computed MSID: ``vals``, ``bads``, ``times``,
+            ``unit``, ``vals_raw``, and ``offset``. The first four must always
+            be provided.
+
+            :param tstart: start time (CXC secs)
+            :param tstop: stop time (CXC secs)
+            :param msid: full MSID name e.g. tephin_plus_5
+            :param msid_args: tuple of regex match groups (msid_name, offset)
+            :returns: dict of MSID attributes
+            """
+            # Process the arguments parsed from the MSID
+            msid = msid_args[0]
+            offset = int(msid_args[1])
+
+            # Get the raw telemetry values in user-requested unit system
+            dat = self.fetch_sys.MSID(msid, tstart, tstop)
+
+            # Do the computation
+            vals = dat.vals + offset
+
+            # Return a dict with at least `vals`, `times`, `bads`, and `unit`.
+            # Additional attributes are allowed and will be set on the
+            # final MSID object.
+            out = {'vals': vals,
+                   'bads': dat.bads,
+                   'times': dat.times,
+                   'unit': dat.unit,
+                   'vals_raw': dat.vals,  # Provide original values without offset
+                   'offset': offset  # Provide the offset for reference
+                   }
+            return out
+
+Units
+^^^^^
+
+Computed MSIDs should support units where applicable. In the example above
+this was done by using ``self.fetch_sys`` in order to get the original data
+in the user-requested system. In other words, if the user did a call
+``dat = fetch_sci.Msid('msid_plus_8', '2010:001', '2010:002')``, then
+``self.fetch_sys`` would translate to ``self.fetch_sci``.
+
+In some cases the unit handling may require additional specification. This
+can happen if the computation needs to be done in a particular unit, as is
+the case for the built-in
+:class:`~Ska.engarchive.derived.comps.Comp_MUPS_Valve_Temp_Clean` class.
+Here the class must define an additional ``units`` attribute with the
+following structure::
+
+    units = {
+        # Unit system for attrs from get_msid_attrs(), one of 'eng', 'sci', 'cxc'
+        'internal_system': 'eng',
+
+        # Units for eng, sci, cxc systems
+        'eng': 'DEGF',
+        'sci': 'DEGC',
+        'cxc': 'K',
+
+        # Attributes that need conversion. At least `vals` but maybe others.
+        'convert_attrs': ['vals']
+    }
+
+The specified units must all be convertible using functions defined in the
+``converters`` dict in the ``Ska.engarchive.units`` module.
+
+Built-in computed MSIDs and API
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. automodule:: Ska.engarchive.derived.comps
+   :members:
+   :undoc-members:
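+
+For example, the built-in
+:class:`~Ska.engarchive.derived.comps.Comp_KadiCommandState` comp can be
+fetched like any other MSID (a sketch; this requires the ``kadi`` package)::
+
+    from cheta import fetch_eng
+
+    # Sample the kadi commanded state `pcad_mode` every 32 * 1.025 = 32.8 sec
+    dat = fetch_eng.Msid('cmd_state_pcad_mode_32', '2020:001', '2020:002')
+    print(dat.vals[:5])  # states such as 'NPNT', 'NMAN'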