diff --git a/.gitignore b/.gitignore
index 3a2644eb..0ea7b5b6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -30,6 +30,8 @@ htmlcov/
 .settings
 .project
 .vscode/
+.vipenv/
+justfile
 
 # scripts
 _pypi.sh
diff --git a/vip_hci/fits/fits.py b/vip_hci/fits/fits.py
index 9667c3ac..3d027ac7 100644
--- a/vip_hci/fits/fits.py
+++ b/vip_hci/fits/fits.py
@@ -1,26 +1,30 @@
 #! /usr/bin/env python
-
 """ Module with various fits handling functions. """
-__author__ = 'Carlos Alberto Gomez Gonzalez'
-__all__ = ['open_fits',
-           'info_fits',
-           'write_fits',
-           'verify_fits',
-           'byteswap_array']
+__author__ = "Carlos Alberto Gomez Gonzalez"
+__all__ = ["open_fits", "info_fits", "write_fits", "verify_fits", "byteswap_array"]
 
 import os
 import numpy as np
 from astropy.io import fits as ap_fits
-
-
-def open_fits(fitsfilename, n=0, header=False, ignore_missing_end=False,
-              precision=np.float32, return_memmap=False, verbose=True,
-              **kwargs):
+from astropy.io.fits import HDUList
+from ..var.paramenum import ALL_FITS
+
+
+def open_fits(
+    fitsfilename,
+    n=0,
+    get_header=False,
+    ignore_missing_end=False,
+    precision=np.float32,
+    return_memmap=False,
+    verbose=True,
+    **kwargs,
+):
     """
     Load a fits file into memory as numpy array.
 
@@ -29,7 +33,8 @@ def open_fits(fitsfilename, n=0, header=False, ignore_missing_end=False,
     fitsfilename : string or pathlib.Path
         Name of the fits file or ``pathlib.Path`` object
     n : int, optional
-        It chooses which HDU to open. Default is the first one.
-    header : bool, optional
+        It chooses which HDU to open. Default is the first one. If n is equal
+        to -2, opens and returns all extensions.
+    get_header : bool, optional
         Whether to return the header along with the data or not.
     precision : numpy dtype, optional
@@ -50,42 +55,99 @@ def open_fits(fitsfilename, n=0, header=False, ignore_missing_end=False,
 
     Returns
     -------
-    hdulist : hdulist
-        [memmap=True] FITS file ``n`` hdulist.
-    data : numpy ndarray
-        [memmap=False] Array containing the frames of the fits-cube.
-    header : dict
+    hdulist : HDU or HDUList
+        [memmap=True] FITS file ``n`` hdulist. If n equals -2, returns the whole
+        hdulist.
+    data : numpy ndarray or list of numpy ndarrays
+        [memmap=False] Array containing the frames of the fits-cube. If n
+        equals -2, returns a list of all arrays.
+    header : dict or list of dict
         [memmap=False, header=True] Dictionary containing the fits header.
+        If n equals -2, returns a list of all dictionaries.
 
     """
     fitsfilename = str(fitsfilename)
     if not os.path.isfile(fitsfilename):
-        fitsfilename += '.fits'
+        fitsfilename += ".fits"
+
+    hdulist = ap_fits.open(
+        fitsfilename, ignore_missing_end=ignore_missing_end, memmap=True, **kwargs
+    )
+
+    # Opening all extensions in a MEF
+    if n == ALL_FITS:
+        data_list = []
+        header_list = []
+        if return_memmap:
+            return hdulist
+
+        for index, element in enumerate(hdulist):
+            data, header = _return_data_fits(
+                hdulist=hdulist,
+                index=index,
+                precision=precision,
+                verbose=verbose,
+            )
+            data_list.append(data)
+            header_list.append(header)
 
-    hdulist = ap_fits.open(fitsfilename, ignore_missing_end=ignore_missing_end,
-                           memmap=True, **kwargs)
-
-    if return_memmap:
-        return hdulist[n]
-    else:
-        data = hdulist[n].data
-        data = np.array(data, dtype=precision)
         hdulist.close()
-        if header:
-            header = hdulist[n].header
+        if get_header:
             if verbose:
-                print("Fits HDU-{} data and header successfully loaded. "
-                      "Data shape: {}".format(n, data.shape))
-            return data, header
+                print("All FITS HDU data and headers successfully loaded.")
+            return data_list, header_list
         else:
             if verbose:
-                print("Fits HDU-{} data successfully loaded. "
-                      "Data shape: {}".format(n, data.shape))
+                print("All FITS HDU data successfully loaded.")
+            return data_list
+    # Opening only a specified extension
+    else:
+        if return_memmap:
+            return hdulist[n]
+
+        data, header = _return_data_fits(
+            hdulist=hdulist,
+            index=n,
+            precision=precision,
+            verbose=verbose,
+        )
+        hdulist.close()
+        if get_header:
+            return data, header
+        else:
             return data
 
 
+def _return_data_fits(
+    hdulist: HDUList,
+    index: int,
+    precision=np.float32,
+    verbose: bool = True,
+):
+    """
+    Subfunction used to return data (and header) from a given index.
+
+    Parameters
+    ----------
+    hdulist : HDUList
+        List of FITS cubes with their headers.
+    index : int
+        The wanted index to extract.
+    """
+    data = hdulist[index].data
+    data = np.array(data, dtype=precision)
+    header = hdulist[index].header
+
+    if verbose:
+        print(
+            f"Fits HDU-{index} data successfully loaded, header available. "
+            f"Data shape: {data.shape}"
+        )
+    return data, header
+
+
 def byteswap_array(array):
-    """ FITS files are stored in big-endian byte order. All modern CPUs are
+    """FITS files are stored in big-endian byte order. All modern CPUs are
     little-endian byte order, so at some point you have to byteswap the data.
     Some FITS readers (cfitsio, the fitsio python module) do the byteswap when
     reading the data from disk to memory, so we get numpy arrays in native
@@ -153,8 +215,14 @@ def verify_fits(fitsfilename):
         f.verify()
 
 
-def write_fits(fitsfilename, array, header=None, output_verify='exception',
-               precision=np.float32, verbose=True):
+def write_fits(
+    fitsfilename,
+    array,
+    header=None,
+    output_verify="exception",
+    precision=np.float32,
+    verbose=True,
+):
     """
     Write array and header into FITS file.
 
@@ -181,20 +249,20 @@ def write_fits(fitsfilename, array, header=None, output_verify='exception',
 
     """
-    if not fitsfilename.endswith('.fits'):
-        fitsfilename += '.fits'
+    if not fitsfilename.endswith(".fits"):
+        fitsfilename += ".fits"
 
     res = "saved"
     if os.path.exists(fitsfilename):
         os.remove(fitsfilename)
-        res = 'overwritten'
+        res = "overwritten"
 
     if isinstance(array, tuple):
         new_hdul = ap_fits.HDUList()
         if header is None:
-            header = [None]*len(array)
+            header = [None] * len(array)
         elif not isinstance(header, tuple):
-            header = [header]*len(array)
+            header = [header] * len(array)
         elif len(header) != len(array):
             msg = "If input header is a tuple, it should have the same length "
             msg += "as tuple of arrays."
diff --git a/vip_hci/objects/postproc.py b/vip_hci/objects/postproc.py
index a24f3424..17e8c855 100644
--- a/vip_hci/objects/postproc.py
+++ b/vip_hci/objects/postproc.py
@@ -31,7 +31,9 @@
 from ..config.utils_conf import algo_calculates_decorator as calculates
 from ..config.utils_conf import Saveable
 from ..var import frame_center
-from ..var.object_utils import print_algo_params
+from ..var.object_utils import print_algo_params, dict_to_fitsheader, fitsheader_to_dict
+from ..var.paramenum import ALL_FITS
+from ..fits import write_fits, open_fits
 
 PROBLEMATIC_ATTRIBUTE_NAMES = ["_repr_html_"]
 LAST_SESSION = -1
@@ -45,6 +47,7 @@
     "scale_list": "wavelengths",
     "psf": "psfn",
 }
+PREFIX = "postproc_"
 
 
 @dataclass
@@ -66,7 +69,7 @@ class Session:
 
 
 # TODO: find a proper format for results saving (pdf, images, dictionnaries...)
@dataclass -class PPResult(Saveable): +class PPResult: """ Container for results of post-processing algorithms. @@ -79,10 +82,24 @@ class PPResult(Saveable): sessions: List = field(default_factory=lambda: []) + def __init__(self, load_from_path: str = None): + """ + Create a PPResult object or load one from a FITS file. + + Parameters + ---------- + load_from_path : str, optional + Path of FITS file to optionally load a previously saved PPResult + object from. + """ + self.sessions = [] + if load_from_path is not None: + self.fits_to_results(filepath=load_from_path) + def register_session( self, frame: np.ndarray, - algo_name: Optional[dict] = None, + algo_name: Optional[str] = None, params: Optional[dict] = None, snr_map: Optional[np.ndarray] = None, ) -> None: @@ -102,9 +119,13 @@ def register_session( """ # If frame is already registered in a session, add the associated snr_map only for session in self.sessions: - if (frame == session.frame).all() and snr_map is not None: - session.snr_map = snr_map - return + if session.frame.shape == frame.shape: + if ( + np.allclose(np.abs(session.frame), np.abs(frame), atol=1e-3) + and snr_map is not None + ): + session.snr_map = snr_map + return # TODO: review filter_params to only target cube and angles, not all ndarrays # TODO: rename angles-type parameters in all procedural functions @@ -160,6 +181,91 @@ def show_session_results( " a session with the function `register_session`." ) + def results_to_fits(self, filepath: str) -> None: + """ + Save all configurations as a fits file. + + Parameters + ---------- + filepath: str + The path of the FITS file. + """ + if self.sessions: + images = [] + headers = [] + for _, session in enumerate(self.sessions): + cube = None + # Stacks both frame and detection map (if any), else only frame + if session.snr_map is not None: + cube = np.stack((session.frame, session.snr_map), axis=0) + else: + cube = session.frame + images.append(cube) + session.parameters["algo_name"] = session.algo_name + # Adding a specific prefix to identify the PostProc parameters when + # extracting the header + prefix_dict = { + PREFIX + key: value for key, value in session.parameters.items() + } + fits_header = dict_to_fitsheader(prefix_dict) + headers.append(fits_header) + + write_fits( + fitsfilename=filepath, array=tuple(images), header=tuple(headers) + ) + + print(f"Results saved successfully to {filepath} !") + else: + raise AttributeError( + "No session was registered yet. Please register" + " a session with the function `register_session`." + ) + + def fits_to_results(self, filepath: str, session_id: int = ALL_FITS) -> None: + """ + Load all configurations from a fits file. + + Parameters + ---------- + filepath: str + The path of the FITS file. 
+ """ + data, header = open_fits(fitsfilename=filepath, n=session_id, get_header=True) + self.sessions = [] + if session_id == ALL_FITS: + for index, element in enumerate(data): + frame = None + snr_map = None + parameters, algo_name = fitsheader_to_dict( + initial_header=header[index], sort_by_prefix=PREFIX + ) + # Both frame and detmap were saved + if element.ndim == 3: + frame = element[0] + snr_map = element[1] + # Frame only + else: + frame = element + self.register_session( + frame=frame, algo_name=algo_name, params=parameters, snr_map=snr_map + ) + else: + frame = None + snr_map = None + parameters, algo_name = fitsheader_to_dict( + initial_header=header, sort_by_prefix=PREFIX + ) + # Both frame and detmap were saved + if data.ndim == 3: + frame = data[0] + snr_map = data[1] + # Frame only + else: + frame = data + self.register_session( + frame=frame, algo_name=algo_name, params=parameters, snr_map=snr_map + ) + def _show_single_session( self, session_id: Optional[int], @@ -279,7 +385,10 @@ def _create_parameters_dict(self, parent_class: any) -> dict: def print_parameters(self) -> None: """Print out the parameters of the algorithm.""" for key, value in self.__dict__.items(): - print(f"{key} : {value}") + if not isinstance(value, np.ndarray): + print(f"{key} : {value}") + else: + print(f"{key} : numpy ndarray (not shown)") # TODO: write test def compute_significance(self, source_xy: Tuple[float] = None) -> None: diff --git a/vip_hci/objects/ppandromeda.py b/vip_hci/objects/ppandromeda.py index 34fccfa4..f3780706 100644 --- a/vip_hci/objects/ppandromeda.py +++ b/vip_hci/objects/ppandromeda.py @@ -60,6 +60,7 @@ def run( Print some parameter values for control. """ + self.snr_map = None self._update_dataset(dataset) if self.dataset.fwhm is None: diff --git a/vip_hci/objects/ppfmmf.py b/vip_hci/objects/ppfmmf.py index fbeb06c0..8b30a44f 100644 --- a/vip_hci/objects/ppfmmf.py +++ b/vip_hci/objects/ppfmmf.py @@ -48,6 +48,7 @@ def run( If True prints to stdout intermediate info. """ + self.snr_map = None self._update_dataset(dataset) if self.dataset.fwhm is None: diff --git a/vip_hci/objects/ppframediff.py b/vip_hci/objects/ppframediff.py index 78a4cc2e..1f88c2e2 100644 --- a/vip_hci/objects/ppframediff.py +++ b/vip_hci/objects/ppframediff.py @@ -58,6 +58,7 @@ def run( ``vip_hci.preproc.frame_rotate``). """ + self.snr_map = None self._update_dataset(dataset) if self.dataset.fwhm is None: diff --git a/vip_hci/objects/ppllsg.py b/vip_hci/objects/ppllsg.py index 613a6b5e..702ce826 100644 --- a/vip_hci/objects/ppllsg.py +++ b/vip_hci/objects/ppllsg.py @@ -64,6 +64,7 @@ def run( ``vip_hci.preproc.frame_rotate``) """ + self.snr_map = None self._update_dataset(dataset) self._explicit_dataset() diff --git a/vip_hci/objects/pploci.py b/vip_hci/objects/pploci.py index 0bca8b53..ded032f5 100644 --- a/vip_hci/objects/pploci.py +++ b/vip_hci/objects/pploci.py @@ -65,6 +65,7 @@ def run( ``vip_hci.preproc.frame_rotate``) """ + self.snr_map = None self._update_dataset(dataset) if self.dataset.fwhm is None: diff --git a/vip_hci/objects/ppmediansub.py b/vip_hci/objects/ppmediansub.py index 459907a9..ec10b39a 100644 --- a/vip_hci/objects/ppmediansub.py +++ b/vip_hci/objects/ppmediansub.py @@ -72,6 +72,7 @@ def run( ``vip_hci.preproc.frame_rotate``). 
""" + self.snr_map = None self._update_dataset(dataset) if self.mode == "annular" and self.dataset.fwhm is None: diff --git a/vip_hci/objects/ppnmf.py b/vip_hci/objects/ppnmf.py index cbc8e2e3..2a08a715 100644 --- a/vip_hci/objects/ppnmf.py +++ b/vip_hci/objects/ppnmf.py @@ -86,6 +86,7 @@ def run( ``vip_hci.preproc.frame_rotate``). """ + self.snr_map = None self._update_dataset(dataset) if self.dataset.fwhm is None: diff --git a/vip_hci/objects/pppca.py b/vip_hci/objects/pppca.py index d2180bbb..42dac65c 100644 --- a/vip_hci/objects/pppca.py +++ b/vip_hci/objects/pppca.py @@ -109,6 +109,7 @@ class PPPCA(PostProc, PCAParams, PCAAnnParams): pc_list: List = None opt_number_pc: int = None # Single annulus parameters + annulus_width: float = None r_guess: float = None # TODO: write test @@ -124,7 +125,6 @@ class PPPCA(PostProc, PCAParams, PCAAnnParams): "final_residuals_cube", "medians", "dataframe", - "pc_list", "opt_number_pc", ) def run( @@ -175,6 +175,7 @@ def run( ``vip_hci.preproc.frame_rotate``) """ + self.snr_map = None self._update_dataset(dataset) if self.dataset.fwhm is None: raise ValueError("`fwhm` has not been set") @@ -232,7 +233,6 @@ def run( ( self.cube_residuals, - self.pc_list, self.frame_final, self.dataframe, self.opt_number_pc, diff --git a/vip_hci/preproc/parangles.py b/vip_hci/preproc/parangles.py index 0b66b174..b03e31a1 100644 --- a/vip_hci/preproc/parangles.py +++ b/vip_hci/preproc/parangles.py @@ -1,5 +1,4 @@ #! /usr/bin/env python - """ Module with frame parallactica angles calculations and de-rotation routines for ADI. @@ -10,15 +9,17 @@ | *Book* | `https://ui.adsabs.harvard.edu/abs/1998aalg.book.....M/abstract `_ - + """ -__author__ = 'V. Christiaens @ UChile/ULg, Carlos Alberto Gomez Gonzalez' -__all__ = ['compute_paral_angles', - 'compute_derot_angles_pa', - 'compute_derot_angles_cd', - 'check_pa_vector'] +__author__ = "V. Christiaens @ UChile/ULg, Carlos Alberto Gomez Gonzalez" +__all__ = [ + "compute_paral_angles", + "compute_derot_angles_pa", + "compute_derot_angles_cd", + "check_pa_vector", +] import math import numpy as np @@ -30,8 +31,9 @@ from astropy.units import hourangle, degree -def compute_paral_angles(header, latitude, ra_key, dec_key, lst_key, - acqtime_key, date_key='DATE-OBS'): +def compute_paral_angles( + header, latitude, ra_key, dec_key, lst_key, acqtime_key, date_key="DATE-OBS" +): """Calculates the parallactic angle for a frame, taking coordinates and local sidereal time from fits-headers (frames taken in an alt-az telescope with the image rotator off). @@ -55,13 +57,14 @@ def compute_paral_angles(header, latitude, ra_key, dec_key, lst_key, pa.value : float Parallactic angle in degrees for current header (frame). 
""" - obs_epoch = Time(header[date_key], format='iso', scale='utc') + obs_epoch = Time(header[date_key], format="iso", scale="utc") # equatorial coordinates in J2000 ra = header[ra_key] dec = header[dec_key] - coor = sky_coordinate.SkyCoord(ra=ra, dec=dec, unit=(hourangle, degree), - frame=FK5, equinox='J2000.0') + coor = sky_coordinate.SkyCoord( + ra=ra, dec=dec, unit=(hourangle, degree), frame=FK5, equinox="J2000.0" + ) # recalculate for DATE-OBS (precession) coor_curr = coor.transform_to(FK5(equinox=obs_epoch)) @@ -69,11 +72,11 @@ def compute_paral_angles(header, latitude, ra_key, dec_key, lst_key, ra_curr = coor_curr.ra dec_curr = coor_curr.dec - lst_split = header[lst_key].split(':') - lst = float(lst_split[0])+float(lst_split[1])/60+float(lst_split[2])/3600 + lst_split = header[lst_key].split(":") + lst = float(lst_split[0]) + float(lst_split[1]) / 60 + float(lst_split[2]) / 3600 exp_delay = (header[acqtime_key] * 0.5) / 3600 # solar to sidereal time - exp_delay = exp_delay*1.0027 + exp_delay = exp_delay * 1.0027 # hour angle in degrees hour_angle = (lst + exp_delay) * 15 - ra_curr.deg @@ -81,20 +84,30 @@ def compute_paral_angles(header, latitude, ra_key, dec_key, lst_key, latitude = np.deg2rad(latitude) # PA formula from Astronomical Algorithms - pa = -np.rad2deg(np.arctan2(-np.sin(hour_angle), np.cos(dec_curr) * - np.tan(latitude) - np.sin(dec_curr) * np.cos(hour_angle))) + pa = -np.rad2deg( + np.arctan2( + -np.sin(hour_angle), + np.cos(dec_curr) * np.tan(latitude) - np.sin(dec_curr) * np.cos(hour_angle), + ) + ) - #if dec_curr.value > latitude: pa = (pa.value + 360) % 360 + # if dec_curr.value > latitude: pa = (pa.value + 360) % 360 return pa.value -def compute_derot_angles_pa(objname_tmp_A, digit_format=3, objname_tmp_B='', - inpath='./', writing=False, outpath='./', - list_obj=None, - PosAng_st_key='HIERARCH ESO ADA POSANG', - PosAng_nd_key='HIERARCH ESO ADA POSANG END', - verbose=False): +def compute_derot_angles_pa( + objname_tmp_A, + digit_format=3, + objname_tmp_B="", + inpath="./", + writing=False, + outpath="./", + list_obj=None, + PosAng_st_key="HIERARCH ESO ADA POSANG", + PosAng_nd_key="HIERARCH ESO ADA POSANG END", + verbose=False, +): """ Function that returns a numpy vector of angles to derotate datacubes so as to match North up, East left, based on the mean of the Position Angle at @@ -168,40 +181,41 @@ def compute_derot_angles_pa(objname_tmp_A, digit_format=3, objname_tmp_B='', posang_nd = [] def _fitsfile(ii): - return "{}{}{:0{}d}{}.fits".format(inpath, objname_tmp_A, ii, - digit_format, objname_tmp_B) + return "{}{}{:0{}d}{}.fits".format( + inpath, objname_tmp_A, ii, digit_format, objname_tmp_B + ) if list_obj is None: list_obj = [] for ii in range(10**digit_format): if os.path.exists(_fitsfile(ii)): list_obj.append(ii) - _, header = open_fits(_fitsfile(ii), verbose=False, header=True) + _, header = open_fits(_fitsfile(ii), verbose=False, get_header=True) posang_st.append(header[PosAng_st_key]) posang_nd.append(header[PosAng_nd_key]) else: for ii in list_obj: - _, header = open_fits(_fitsfile(ii), verbose=False, header=True) + _, header = open_fits(_fitsfile(ii), verbose=False, get_header=True) posang_st.append(header[PosAng_st_key]) posang_nd.append(header[PosAng_nd_key]) # Write the vector containing parallactic angles rot = np.zeros(len(list_obj)) for ii in range(len(list_obj)): - rot[ii] = -(posang_st[ii]+posang_nd[ii])/2 + rot[ii] = -(posang_st[ii] + posang_nd[ii]) / 2 # Check and correct to output at the right format - rot = check_pa_vector(rot, 'deg') + 
rot = check_pa_vector(rot, "deg") if verbose: print("This is the list of angles to be applied: ") for ii in range(len(list_obj)): - print(ii, ' -> ', rot[ii]) + print(ii, " -> ", rot[ii]) if writing: - if outpath == '' or outpath is None: + if outpath == "" or outpath is None: outpath = inpath - f = open(outpath+'Parallactic_angles.txt', 'w') + f = open(outpath + "Parallactic_angles.txt", "w") for ii in range(len(list_obj)): print(rot[ii], file=f) f.close() @@ -209,11 +223,21 @@ def _fitsfile(ii): return rot -def compute_derot_angles_cd(objname_tmp_A, digit_format=3, objname_tmp_B='', - inpath='./', skew=False, writing=False, - outpath='./', list_obj=None, cd11_key='CD1_1', - cd12_key='CD1_2', cd21_key='CD2_1', - cd22_key='CD2_2', verbose=False): +def compute_derot_angles_cd( + objname_tmp_A, + digit_format=3, + objname_tmp_B="", + inpath="./", + skew=False, + writing=False, + outpath="./", + list_obj=None, + cd11_key="CD1_1", + cd12_key="CD1_2", + cd21_key="CD2_1", + cd22_key="CD2_2", + verbose=False, +): """ Function that returns a numpy vector of angles to derotate datacubes so as to match North up, East left, based on the CD matrix information contained @@ -302,29 +326,30 @@ def compute_derot_angles_cd(objname_tmp_A, digit_format=3, objname_tmp_B='', cd2_2 = [] def _fitsfile(ii): - return "{}{}{:0{}d}{}.fits".format(inpath, objname_tmp_A, ii, - digit_format, objname_tmp_B) + return "{}{}{:0{}d}{}.fits".format( + inpath, objname_tmp_A, ii, digit_format, objname_tmp_B + ) if list_obj is None: list_obj = [] for ii in range(10**digit_format): if os.path.exists(_fitsfile(ii)): list_obj.append(ii) - _, header = open_fits(_fitsfile(ii), verbose=False, header=True) + _, header = open_fits(_fitsfile(ii), verbose=False, get_header=True) cd1_1.append(header[cd11_key]) cd1_2.append(header[cd12_key]) cd2_1.append(header[cd21_key]) cd2_2.append(header[cd22_key]) else: for ii in list_obj: - _, header = open_fits(_fitsfile(ii), verbose=False, header=True) + _, header = open_fits(_fitsfile(ii), verbose=False, get_header=True) cd1_1.append(header[cd11_key]) cd1_2.append(header[cd12_key]) cd2_1.append(header[cd21_key]) cd2_2.append(header[cd22_key]) # Determine if it's a right- or left-handed coord system from first cube - det = cd1_1[0]*cd2_2[0]-cd1_2[0]*cd2_1[0] + det = cd1_1[0] * cd2_2[0] - cd1_2[0] * cd2_1[0] if det < 0: sgn = -1 else: @@ -338,31 +363,31 @@ def _fitsfile(ii): rot[ii] = 0 rot2[ii] = 0 else: - rot[ii] = -np.arctan2(sgn*cd1_2[ii], sgn*cd1_1[ii]) + rot[ii] = -np.arctan2(sgn * cd1_2[ii], sgn * cd1_1[ii]) rot2[ii] = -np.arctan2(-cd2_1[ii], cd2_2[ii]) if rot2[ii] < 0: - rot2[ii] = 2*math.pi + rot2[ii] + rot2[ii] = 2 * math.pi + rot2[ii] if np.floor(rot[ii]) != np.floor(rot2[ii]): msg = "There is more than 1deg skewness between y and x! 
" msg2 = "Please re-run the function with argument skew=True" - raise ValueError(msg+msg2) + raise ValueError(msg + msg2) # Check and correct to output at the right format - rot = check_pa_vector(rot, 'rad') + rot = check_pa_vector(rot, "rad") if skew: - rot2 = check_pa_vector(rot2, 'rad') + rot2 = check_pa_vector(rot2, "rad") if verbose: print("This is the list of angles to be applied: ") for ii in range(len(cd1_1)): - print(ii, ' -> ', rot[ii]) + print(ii, " -> ", rot[ii]) if skew: - print('rot2: ', ii, ' -> ', rot2[ii]) + print("rot2: ", ii, " -> ", rot2[ii]) if writing: - if outpath == '' or outpath is None: + if outpath == "" or outpath is None: outpath = inpath - f = open(outpath+'Parallactic_angles.txt', 'w') + f = open(outpath + "Parallactic_angles.txt", "w") if skew: for ii in range(len(cd1_1)): print(rot[ii], rot2[ii], file=f) @@ -377,8 +402,8 @@ def _fitsfile(ii): return rot -def check_pa_vector(angle_list, unit='deg'): - """ Checks if the angle list has the right format to avoid any bug in the +def check_pa_vector(angle_list, unit="deg"): + """Checks if the angle list has the right format to avoid any bug in the pca-adi algorithm. The right format complies to 3 criteria: 1. angles are expressed in degree @@ -401,24 +426,24 @@ def check_pa_vector(angle_list, unit='deg'): the 3 criteria, if needed) """ angle_list = angle_list.copy() - if unit != 'rad' and unit != 'deg': + if unit != "rad" and unit != "deg": raise ValueError("The input unit should either be 'deg' or 'rad'") npa = angle_list.shape[0] for ii in range(npa): - if unit == 'rad': + if unit == "rad": angle_list[ii] = np.rad2deg(angle_list[ii]) if angle_list[ii] < 0: - angle_list[ii] = 360+angle_list[ii] + angle_list[ii] = 360 + angle_list[ii] correct = False - #sorted_rot = np.sort(angle_list) + # sorted_rot = np.sort(angle_list) # Check if there is a jump > 180deg within the angle list - for ii in range(npa-1): + for ii in range(npa - 1): # if abs(sorted_rot[ii+1]-sorted_rot[ii]) > 180: - if abs(angle_list[ii+1]-angle_list[ii]) > 180: + if abs(angle_list[ii + 1] - angle_list[ii]) > 180: correct = True break @@ -426,6 +451,6 @@ def check_pa_vector(angle_list, unit='deg'): if correct: for ii in range(npa): if angle_list[ii] < 180: - angle_list[ii] = 360+angle_list[ii] + angle_list[ii] = 360 + angle_list[ii] return angle_list diff --git a/vip_hci/psfsub/pca_fullfr.py b/vip_hci/psfsub/pca_fullfr.py index 3a167122..e5eccc2a 100644 --- a/vip_hci/psfsub/pca_fullfr.py +++ b/vip_hci/psfsub/pca_fullfr.py @@ -332,7 +332,6 @@ def pca(*all_args: List, **all_kwargs: dict): """ # Separating the parameters of the ParamsObject from the optionnal rot_options - print(f"kwargs : {all_kwargs}") class_params, rot_options = separate_kwargs_dict( initial_kwargs=all_kwargs, parent_class=PCAParams diff --git a/vip_hci/var/object_utils.py b/vip_hci/var/object_utils.py index d44779c3..e568c983 100644 --- a/vip_hci/var/object_utils.py +++ b/vip_hci/var/object_utils.py @@ -2,8 +2,10 @@ from collections import OrderedDict from inspect import signature from typing import Callable +from typing import Tuple import numpy as np +from astropy.io.fits import Header KWARGS_EXCEPTIONS = ["param"] @@ -156,3 +158,64 @@ def separate_kwargs_dict(initial_kwargs: dict, parent_class: any) -> None: more_params[key] = value return class_params, more_params + + +def dict_to_fitsheader(initial_dict: dict) -> Header: + """ + Convert a dictionnary into a fits Header object. 
+
+    Parameters
+    ----------
+    initial_dict: dict
+        Dictionary of parameters to convert to a Header object.
+
+    Returns
+    -------
+    fits_header: Header
+        Converted set of parameters.
+    """
+    fits_header = Header()
+    for key, value in initial_dict.items():
+        fits_header[key] = value
+
+    return fits_header
+
+
+def fitsheader_to_dict(
+    initial_header: Header, sort_by_prefix: str = ""
+) -> Tuple[dict, str]:
+    """
+    Extract a dictionary of parameters and a string from a FITS Header.
+
+    The string is expected to be the name of the algorithm that was used to
+    obtain the results that go with the Header.
+
+    Parameters
+    ----------
+    initial_header : Header
+        HDU Header that contains parameters used for the run of an algorithm
+        through PostProc, and some unwanted parameters as well.
+    sort_by_prefix : str
+        String that will help filter keys of the header that don't start with
+        that same string. By default, it doesn't filter out anything.
+
+    Returns
+    -------
+    parameters : dict
+        The set of parameters saved in PPResult that was used for a run of
+        an algorithm.
+    algo_name : str
+        The name of the algorithm that was saved alongside its parameters.
+    """
+    head_dict = dict(initial_header)
+    # FITS headers store keywords in uppercase, so normalize them to lowercase
+    lowercase_dict = {key.lower(): value for key, value in head_dict.items()}
+    # Keep only the keys that start with the expected prefix
+    parameters = {
+        key[len(sort_by_prefix) :]: value
+        for key, value in lowercase_dict.items()
+        if key.startswith(sort_by_prefix)
+    }
+    algo_name = parameters["algo_name"]
+    del parameters["algo_name"]
+    return parameters, algo_name
diff --git a/vip_hci/var/paramenum.py b/vip_hci/var/paramenum.py
index 206f0f56..26e74af5 100644
--- a/vip_hci/var/paramenum.py
+++ b/vip_hci/var/paramenum.py
@@ -1,8 +1,9 @@
-"""Module containing enums for parameters of HCI algorithms."""
+"""Module containing enums for parameters of HCI algorithms and literal constants."""
 from enum import auto
 from enum import Enum
 
 ALGO_KEY = "algo_params"
+ALL_FITS = -2
 
 
 class SvdMode(str, Enum):
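
Usage sketch for the MEF-based result handling added above (illustrative only: the frame, algorithm name, parameter values and file name are placeholders, and the snippet assumes the write/read round trip behaves as intended by this changeset):

    import numpy as np
    from vip_hci.fits import open_fits
    from vip_hci.objects.postproc import PPResult
    from vip_hci.var.paramenum import ALL_FITS

    # Register one (dummy) final frame together with the parameters that
    # produced it, then save every session to a single multi-extension FITS.
    res = PPResult()
    res.register_session(frame=np.zeros((101, 101)), algo_name="pca",
                         params={"ncomp": 10, "imlib": "vip-fft"})
    res.results_to_fits("postproc_results.fits")

    # Read it back, either by rebuilding the PPResult container or by calling
    # open_fits directly with n=ALL_FITS (-2) to get every extension at once.
    restored = PPResult(load_from_path="postproc_results.fits")
    data_list, header_list = open_fits("postproc_results.fits", n=ALL_FITS,
                                       get_header=True)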