From 834465cd23e2c89448ae1b2b688c0835a0deb248 Mon Sep 17 00:00:00 2001 From: Robert David Stein Date: Mon, 22 Apr 2024 15:59:34 -0700 Subject: [PATCH] Fix history/naming (#858) --- mirar/paths.py | 14 +- mirar/pipelines/winter/blocks.py | 134 +++++----- mirar/pipelines/winter/generator.py | 14 +- mirar/pipelines/winter/winter_pipeline.py | 6 +- mirar/pipelines/wirc/blocks.py | 21 +- mirar/processors/base_processor.py | 12 +- mirar/processors/csvlog.py | 6 +- .../processors/database/database_inserter.py | 2 +- .../processors/database/database_selector.py | 42 ++-- mirar/processors/sources/__init__.py | 6 + mirar/processors/sources/forced_photometry.py | 34 ++- mirar/processors/sources/namer.py | 50 +++- .../sources/sextractor_source_detector.py | 7 + mirar/processors/sources/source_detector.py | 5 + mirar/processors/sources/source_selector.py | 234 ++++++++++++++++++ mirar/processors/utils/__init__.py | 1 + mirar/processors/utils/image_selector.py | 35 +++ 17 files changed, 488 insertions(+), 135 deletions(-) create mode 100644 mirar/processors/sources/source_selector.py diff --git a/mirar/paths.py b/mirar/paths.py index 620db6afe..ca94a319c 100644 --- a/mirar/paths.py +++ b/mirar/paths.py @@ -20,11 +20,10 @@ doc_dir = base_code_dir.joinpath("docs/") # Load environment variables for .env file -variables_loaded = dotenv.load_dotenv() +VARIABLES_LOADED = dotenv.load_dotenv() -if variables_loaded: - info = "Environment variables were automatically loaded from .env file." - logger.info(info) +if VARIABLES_LOADED: + logger.info("Environment variables were automatically loaded from .env file.") _n_cpu = os.cpu_count() if _n_cpu is None: @@ -346,6 +345,13 @@ def get_astrometry_keys() -> list: EXPTIME_KEY, ] +core_source_fields = [ + CAND_RA_KEY, + CAND_DEC_KEY, + SOURCE_NAME_KEY, + SOURCE_HISTORY_KEY, +] + MONITOR_EMAIL_KEY = "WATCHDOG_EMAIL" MONITOR_PASSWORD_KEY = "WATCHDOG_EMAIL_PASSWORD" MONITOR_RECIPIENT_KEY = "WATCHDOG_EMAIL_RECIPIENTS" diff --git a/mirar/pipelines/winter/blocks.py b/mirar/pipelines/winter/blocks.py index ed4ccd8eb..0593dc15b 100644 --- a/mirar/pipelines/winter/blocks.py +++ b/mirar/pipelines/winter/blocks.py @@ -109,10 +109,7 @@ DatabaseImageInserter, DatabaseSourceInserter, ) -from mirar.processors.database.database_selector import ( - SelectSourcesWithMetadata, - SingleSpatialCrossmatchSource, -) +from mirar.processors.database.database_selector import SelectSourcesWithMetadata from mirar.processors.database.database_updater import ImageDatabaseMultiEntryUpdater from mirar.processors.flat import FlatCalibrator from mirar.processors.mask import ( # MaskAboveThreshold, @@ -128,6 +125,7 @@ CandidateNamer, CustomSourceTableModifier, ForcedPhotometryDetector, + SourceBatcher, SourceLoader, SourceWriter, ZOGYSourceDetector, @@ -140,6 +138,7 @@ ImageDebatcher, ImageLoader, ImagePlotter, + ImageRebatcher, ImageRejector, ImageSaver, ImageSelector, @@ -267,8 +266,7 @@ ("FIELDID", str(3944)), ("BOARD_ID", str(BOARD_ID)), ), - ImageDebatcher(), - ImageBatcher("STACKID"), + ImageRebatcher("STACKID"), ] # mask @@ -292,8 +290,7 @@ save_raw = [ ImageSaver(output_dir_name="raw_unpacked", write_mask=False), DatabaseImageInserter(db_table=Raw, duplicate_protocol="replace"), - ImageDebatcher(), - ImageBatcher(["BOARD_ID", "FILTER", "EXPTIME", TARGET_KEY, "SUBCOORD"]), + ImageRebatcher(["BOARD_ID", "FILTER", "EXPTIME", TARGET_KEY, "SUBCOORD"]), CustomImageBatchModifier(winter_stackid_annotator), ImageSaver(output_dir_name="raw_unpacked", write_mask=False), 
HeaderAnnotator(input_keys=LATEST_SAVE_KEY, output_key=RAW_IMG_KEY), @@ -307,8 +304,7 @@ load_unpacked = [ ImageLoader(input_sub_dir="raw_unpacked", input_img_dir=base_output_dir), - ImageDebatcher(), - ImageBatcher("UTCTIME"), + ImageRebatcher("UTCTIME"), CSVLog( export_keys=[ "UTCTIME", @@ -340,8 +336,7 @@ # Detrend blocks dark_calibrate = [ - ImageDebatcher(), - ImageBatcher( + ImageRebatcher( ["BOARD_ID", EXPTIME_KEY, "SUBCOORD", "GAINCOLT", "GAINCOLB", "GAINROW"] ), DarkCalibrator( @@ -349,15 +344,13 @@ cache_image_name_header_keys=[EXPTIME_KEY, "BOARD_ID"], ), ImageSelector((OBSCLASS_KEY, ["science", "flat"])), - ImageDebatcher(), - ImageBatcher(["BOARD_ID", "UTCTIME", "SUBCOORD"]), + ImageRebatcher(["BOARD_ID", "UTCTIME", "SUBCOORD"]), ImageSaver(output_dir_name="darkcal"), CustomImageBatchModifier(winter_dark_oversubtraction_rejector), ] flat_calibrate = [ - ImageDebatcher(), - ImageBatcher( + ImageRebatcher( [ "BOARD_ID", "FILTER", @@ -373,8 +366,7 @@ cache_image_name_header_keys=["FILTER", "BOARD_ID"], ), ImageSaver(output_dir_name="skyflatcal"), - ImageDebatcher(), - ImageBatcher(["BOARD_ID", "UTCTIME", "SUBCOORD"]), + ImageRebatcher(["BOARD_ID", "UTCTIME", "SUBCOORD"]), Sextractor( **sextractor_astrometry_config, write_regions_bool=True, @@ -395,8 +387,7 @@ ] astrometry = [ - ImageDebatcher(), - ImageBatcher(["UTCTIME", "BOARD_ID", "SUBCOORD"]), + ImageRebatcher(["UTCTIME", "BOARD_ID", "SUBCOORD"]), AstrometryNet( output_sub_dir="anet", scale_bounds=[1.0, 1.3], @@ -410,8 +401,7 @@ cache=True, ), ImageSaver(output_dir_name="post_anet"), - ImageDebatcher(), - ImageBatcher( + ImageRebatcher( [TARGET_KEY, "FILTER", EXPTIME_KEY, "BOARD_ID", "SUBCOORD", "DITHGRP"] ), Sextractor( @@ -431,8 +421,7 @@ ] validate_astrometry = [ - ImageDebatcher(), - ImageBatcher(["UTCTIME", "BOARD_ID", "SUBCOORD", "DITHGRP"]), + ImageRebatcher(["UTCTIME", "BOARD_ID", "SUBCOORD", "DITHGRP"]), Sextractor( **sextractor_astromstats_config, write_regions_bool=True, @@ -467,8 +456,7 @@ ] photcal_and_export = [ - ImageDebatcher(), - ImageBatcher([BASE_NAME_KEY]), + ImageRebatcher([BASE_NAME_KEY]), HeaderAnnotator(input_keys=LATEST_SAVE_KEY, output_key=RAW_IMG_KEY), CustomImageBatchModifier(masked_images_rejector), Sextractor( @@ -541,8 +529,7 @@ ] plot_stack = [ - ImageDebatcher(), - ImageBatcher([TARGET_KEY, "BOARD_ID"]), + ImageRebatcher([TARGET_KEY, "BOARD_ID"]), ImagePlotter( output_sub_dir="final_stacks_plots", annotate_fields=[ @@ -559,15 +546,13 @@ ] split_stack = [ - ImageDebatcher(), - ImageBatcher(["BOARD_ID", "FILTER", TARGET_KEY, "SUBCOORD", "STACKID"]), + ImageRebatcher(["BOARD_ID", "FILTER", TARGET_KEY, "SUBCOORD", "STACKID"]), SwarpImageSplitter(swarp_config_path=swarp_config_path, n_x=2, n_y=1), ImageSaver(output_dir_name="split_stacks"), ] imsub = [ - ImageDebatcher(), - ImageBatcher([BASE_NAME_KEY]), + ImageRebatcher([BASE_NAME_KEY]), HeaderAnnotator(input_keys=[SUB_ID_KEY], output_key="SUBDETID"), ProcessReference( ref_image_generator=winter_reference_generator, @@ -651,6 +636,10 @@ SourceWriter(output_dir_name="kowalski"), ] +load_post_kowalski = [ + SourceLoader(input_dir_name="kowalski"), +] + select_history = [ SelectSourcesWithMetadata( db_query_columns=["sourceid"], @@ -661,48 +650,42 @@ ), ] -name_candidates = ( - [ - # Check if the source is already in the source table - SingleSpatialCrossmatchSource( - db_table=Source, - db_output_columns=["sourceid", SOURCE_NAME_KEY], - crossmatch_radius_arcsec=2.0, - ra_field_name="average_ra", - dec_field_name="average_dec", - ), - # Assign names 
to the new sources - CandidateNamer( - db_table=Source, - base_name=SOURCE_PREFIX, - name_start=NAME_START, - db_name_field=SOURCE_NAME_KEY, - ), - # Add the new sources to the source table - CustomSourceTableModifier(modifier_function=winter_new_source_updater), - DatabaseSourceInserter( - db_table=Source, - duplicate_protocol="ignore", - ), - # Get all candidates associated with source - ] - + select_history - + [ - # Update average ra and dec for source - CustomSourceTableModifier(modifier_function=winter_source_entry_updater), - # Update sources in the source table - DatabaseSourceInserter( - db_table=Source, - duplicate_protocol="replace", - ), - # Add candidates in the candidate table - DatabaseSourceInserter( - db_table=Candidate, - duplicate_protocol="fail", - ), - SourceWriter(output_dir_name="preavro"), - ] -) +name_candidates = [ + SourceBatcher(BASE_NAME_KEY), + # Add the new sources to the source table + CustomSourceTableModifier(modifier_function=winter_new_source_updater), + # Assign names to the new sources + CandidateNamer( + db_table=Source, + db_output_columns=["sourceid", SOURCE_NAME_KEY], + base_name=SOURCE_PREFIX, + name_start=NAME_START, + db_name_field=SOURCE_NAME_KEY, + crossmatch_radius_arcsec=2.0, + ra_field_name="average_ra", + dec_field_name="average_dec", + ), + # Add candidates in the candidate table + DatabaseSourceInserter( + db_table=Candidate, + duplicate_protocol="fail", + ), + SelectSourcesWithMetadata( + db_query_columns=["sourceid"], + db_table=Candidate, + db_output_columns=prv_candidate_cols + [SOURCE_NAME_KEY], + base_output_column=SOURCE_HISTORY_KEY, + additional_query_constraints=winter_history_deprecated_constraint, + ), + # Update average ra and dec for source + CustomSourceTableModifier(modifier_function=winter_source_entry_updater), + # Update sources in the source table + DatabaseSourceInserter( + db_table=Source, + duplicate_protocol="replace", + ), + SourceWriter(output_dir_name="preavro"), +] avro_write = [ # Add in the skyportal fields and all save locally @@ -844,8 +827,7 @@ ) stack_forced_photometry = [ - ImageDebatcher(), - ImageBatcher([BASE_NAME_KEY]), + ImageRebatcher([BASE_NAME_KEY]), ForcedPhotometryDetector(ra_header_key="TARGRA", dec_header_key="TARGDEC"), AperturePhotometry( aper_diameters=[5, 8, 10, 15], diff --git a/mirar/pipelines/winter/generator.py b/mirar/pipelines/winter/generator.py index a109e1eb5..ff88430ef 100644 --- a/mirar/pipelines/winter/generator.py +++ b/mirar/pipelines/winter/generator.py @@ -550,12 +550,16 @@ def winter_source_entry_updater(source_table: SourceBatch) -> SourceBatch: pd.DataFrame(src_df[SOURCE_HISTORY_KEY].loc[x]) for x in range(len(src_df)) ] - src_df["ndet"] = [len(x) + 1 for x in hist_dfs] + src_df["ndet"] = [len(x) for x in hist_dfs] new_fields = [] + # FIXME remove same detection by candid + for i, hist_df in enumerate(hist_dfs): - if len(hist_df) == 0: + if len(hist_df) == 1: + + new_hist_df = pd.DataFrame(columns=hist_df.columns) new_fields.append( { @@ -566,6 +570,7 @@ def winter_source_entry_updater(source_table: SourceBatch) -> SourceBatch: "jdstarthist": source["jd"], "jdendhist": source["jd"], "ndethist": 0, + SOURCE_HISTORY_KEY: new_hist_df, } ) @@ -591,6 +596,8 @@ def winter_source_entry_updater(source_table: SourceBatch) -> SourceBatch: min_jd = min(hist_df["jd"].tolist() + [source["jd"]]) max_jd = max(hist_df["jd"].tolist() + [source["jd"]]) + new_hist_df = hist_df[hist_df["candid"] != src_df["candid"].iloc[i]] + new_fields.append( { "average_ra": av_ra, @@ -599,7 +606,8 @@ 
def winter_source_entry_updater(source_table: SourceBatch) -> SourceBatch: "latest_det_utc": Time(max_jd, format="jd").isot, "jdstarthist": min_jd, "jdendhist": max_jd, - "ndethist": len(hist_df), + "ndethist": len(new_hist_df), + SOURCE_HISTORY_KEY: new_hist_df, } ) diff --git a/mirar/pipelines/winter/winter_pipeline.py b/mirar/pipelines/winter/winter_pipeline.py index 9c1224a53..4b2a0cbbd 100644 --- a/mirar/pipelines/winter/winter_pipeline.py +++ b/mirar/pipelines/winter/winter_pipeline.py @@ -95,7 +95,11 @@ class WINTERPipeline(Pipeline): "reftest": reftest, "only_ref": only_ref, "realtime": realtime, - "detect_candidates": load_final_stack + imsub + detect_candidates, + "detect_candidates": load_final_stack + + imsub + + detect_candidates + + process_candidates + + avro_broadcast, "full_imsub": load_final_stack + imsub + detect_candidates diff --git a/mirar/pipelines/wirc/blocks.py b/mirar/pipelines/wirc/blocks.py index 38a4eeebe..1236c6832 100644 --- a/mirar/pipelines/wirc/blocks.py +++ b/mirar/pipelines/wirc/blocks.py @@ -53,10 +53,7 @@ from mirar.processors.csvlog import CSVLog from mirar.processors.dark import DarkCalibrator from mirar.processors.database.database_inserter import DatabaseSourceInserter -from mirar.processors.database.database_selector import ( - DatabaseHistorySelector, - SpatialCrossmatchSourceWithDatabase, -) +from mirar.processors.database.database_selector import DatabaseHistorySelector from mirar.processors.flat import SkyFlatCalibrator from mirar.processors.mask import ( MaskAboveThreshold, @@ -250,16 +247,15 @@ XMatch(catalog=TMASS(num_sources=3, search_radius_arcmin=0.5)), XMatch(catalog=PS1(num_sources=3, search_radius_arcmin=0.5)), SourceWriter(output_dir_name="kowalski"), - SpatialCrossmatchSourceWithDatabase( - db_table=Candidate, - db_output_columns=[SOURCE_NAME_KEY], - crossmatch_radius_arcsec=2.0, - max_num_results=1, - ), + HeaderAnnotator(input_keys=[LATEST_SAVE_KEY], output_key="diffimgname"), + HeaderAnnotator(input_keys=[SCI_IMG_KEY], output_key="sciimgname"), + HeaderAnnotator(input_keys=[REF_IMG_KEY], output_key="refimgname"), CandidateNamer( db_table=Candidate, base_name=CANDIDATE_PREFIX, name_start=NAME_START, + db_output_columns=[SOURCE_NAME_KEY], + crossmatch_radius_arcsec=2.0, ), DatabaseHistorySelector( crossmatch_radius_arcsec=2.0, @@ -267,10 +263,7 @@ db_table=Candidate, db_output_columns=[SOURCE_NAME_KEY] + prv_candidate_cols, ), - HeaderAnnotator(input_keys=[LATEST_SAVE_KEY], output_key="diffimgname"), - HeaderAnnotator(input_keys=[SCI_IMG_KEY], output_key="sciimgname"), - HeaderAnnotator(input_keys=[REF_IMG_KEY], output_key="refimgname"), - DatabaseSourceInserter(db_table=Candidate, duplicate_protocol="fail"), + DatabaseSourceInserter(db_table=Candidate, duplicate_protocol="replace"), SourceWriter(output_dir_name="candidates"), # EdgeCandidatesMask(edge_boundary_size=100) # FilterCandidates(), diff --git a/mirar/processors/base_processor.py b/mirar/processors/base_processor.py index 3d13bcd11..bd518fd6f 100644 --- a/mirar/processors/base_processor.py +++ b/mirar/processors/base_processor.py @@ -25,7 +25,7 @@ NoncriticalProcessingError, ProcessorError, ) -from mirar.io import open_fits, save_fits +from mirar.io import MissingCoreFieldError, open_fits, save_fits from mirar.paths import ( BASE_NAME_KEY, CAL_OUTPUT_SUB_DIR, @@ -33,6 +33,7 @@ PACKAGE_NAME, PROC_HISTORY_KEY, RAW_IMG_KEY, + core_source_fields, get_mask_path, get_output_path, max_n_cpu, @@ -536,6 +537,15 @@ def _apply(self, batch: ImageBatch) -> SourceBatch: msg = "No 
sources found in image batch" logger.warning(msg) + for batch in source_batch: + cols = batch.get_data().columns + for field in core_source_fields: + if field not in cols: + raise MissingCoreFieldError( + f"Field {field} not found in source table. " + f"Available fields are {cols}." + ) + return source_batch def _apply_to_images(self, batch: ImageBatch) -> SourceBatch: diff --git a/mirar/processors/csvlog.py b/mirar/processors/csvlog.py index 59d6ebf45..24cb1c0d2 100644 --- a/mirar/processors/csvlog.py +++ b/mirar/processors/csvlog.py @@ -32,6 +32,7 @@ def __init__( export_keys: Optional[list[str]] = None, output_sub_dir: str = "", output_base_dir: Optional[str] = None, + output_name: str = "log", ): super().__init__() if export_keys is None: @@ -39,6 +40,7 @@ def __init__( self.export_keys = export_keys self.output_sub_dir = output_sub_dir self.output_base_dir = output_base_dir + self.output_name = output_name self.all_rows = [] def __str__(self) -> str: @@ -51,9 +53,9 @@ def get_log_name(self) -> str: """ Returns the custom log name - :return: Lof file name + :return: Log file name """ - return f"{self.night}_log.csv" + return f"{self.night}_{self.output_name}.csv" def get_output_path(self) -> Path: """ diff --git a/mirar/processors/database/database_inserter.py b/mirar/processors/database/database_inserter.py index dce795c33..55d99a1c8 100644 --- a/mirar/processors/database/database_inserter.py +++ b/mirar/processors/database/database_inserter.py @@ -47,7 +47,7 @@ def __init__(self, *args, duplicate_protocol: str = "fail", **kwargs): def __str__(self): return ( f"Processor to save " - f"{['candidates', 'images'][isinstance(self, BaseImageProcessor)]} " + f"{['sources', 'images'][isinstance(self, BaseImageProcessor)]} " f"to the '{self.db_table.__name__}' table of " f"the '{self.db_name}' Postgres database." 
) diff --git a/mirar/processors/database/database_selector.py b/mirar/processors/database/database_selector.py index 9fccf8144..54596495e 100644 --- a/mirar/processors/database/database_selector.py +++ b/mirar/processors/database/database_selector.py @@ -180,24 +180,8 @@ def _apply_to_sources( candidate_table = source_table.get_data() results = [] for _, source in candidate_table.iterrows(): - super_dict = self.generate_super_dict(metadata, source) - query_constraints = self.get_constraints(super_dict) - logger.debug( - f"Query constraints: " f"{query_constraints.parse_constraints()}" - ) - if self.additional_query_constraints is not None: - query_constraints = ( - query_constraints + self.additional_query_constraints - ) - logger.debug( - f"Query constraints: " f"{query_constraints.parse_constraints()}" - ) - res = select_from_table( - sql_table=self.db_table.sql_model, - db_constraints=query_constraints, - output_columns=self.db_output_columns, - max_num_results=self.max_num_results, - ) + + res = self.query_for_source(source, metadata) results.append(res) @@ -205,6 +189,28 @@ def _apply_to_sources( source_table.set_data(new_table) return batch + def query_for_source(self, source: pd.Series, metadata: dict) -> pd.DataFrame: + """ + Query the database for a single source + + :param source: Source data + :param metadata: Source Batch metadata + :return: Results from the database + """ + super_dict = self.generate_super_dict(metadata, source) + query_constraints = self.get_constraints(super_dict) + logger.debug(f"Query constraints: " f"{query_constraints.parse_constraints()}") + if self.additional_query_constraints is not None: + query_constraints = query_constraints + self.additional_query_constraints + logger.debug(f"Query constraints: " f"{query_constraints.parse_constraints()}") + res = select_from_table( + sql_table=self.db_table.sql_model, + db_constraints=query_constraints, + output_columns=self.db_output_columns, + max_num_results=self.max_num_results, + ) + return res + class DatabaseSingleMatchSelector(BaseDatabaseSourceSelector, ABC): """ diff --git a/mirar/processors/sources/__init__.py b/mirar/processors/sources/__init__.py index 76a2c3fd6..535dc8e88 100644 --- a/mirar/processors/sources/__init__.py +++ b/mirar/processors/sources/__init__.py @@ -10,4 +10,10 @@ from mirar.processors.sources.source_exporter import SourceWriter from mirar.processors.sources.source_filter import BaseSourceFilter from mirar.processors.sources.source_loader import SourceLoader +from mirar.processors.sources.source_selector import ( + SourceBatcher, + SourceDebatcher, + SourceRebatcher, + SourceSelector, +) from mirar.processors.sources.source_table_modifier import CustomSourceTableModifier diff --git a/mirar/processors/sources/forced_photometry.py b/mirar/processors/sources/forced_photometry.py index 2b5dcfd5d..e00437bb8 100644 --- a/mirar/processors/sources/forced_photometry.py +++ b/mirar/processors/sources/forced_photometry.py @@ -2,14 +2,25 @@ Module to extract a candidates table from an image header """ +import logging + import pandas as pd from mirar.data import ImageBatch, SourceBatch, SourceTable from mirar.data.utils import get_xy_from_wcs from mirar.errors import ProcessorError -from mirar.paths import CAND_DEC_KEY, CAND_RA_KEY, XPOS_KEY, YPOS_KEY +from mirar.paths import ( + CAND_DEC_KEY, + CAND_RA_KEY, + SOURCE_HISTORY_KEY, + SOURCE_NAME_KEY, + XPOS_KEY, + YPOS_KEY, +) from mirar.processors.base_processor import BaseSourceGenerator +logger = logging.getLogger(__name__) + class 
TableFromHeaderError(ProcessorError): """Error relating to writing a source table from a header""" @@ -31,11 +42,13 @@ def __init__( calculate_image_coordinates: bool = True, ra_header_key: str = CAND_RA_KEY, dec_header_key: str = CAND_DEC_KEY, + name_header_key: str | None = None, ): super().__init__() self.calculate_image_coordinates = calculate_image_coordinates self.ra_header_key = ra_header_key self.dec_header_key = dec_header_key + self.name_header_key = name_header_key def _apply_to_images(self, batch: ImageBatch) -> SourceBatch: all_cands = SourceBatch() @@ -43,9 +56,26 @@ def _apply_to_images(self, batch: ImageBatch) -> SourceBatch: # Save the image here, to facilitate processing downstream header = image.get_header() + metadata = self.get_metadata(image) + + name = None + + if self.name_header_key is not None: + try: + name = header[self.name_header_key] + except KeyError as exc: + err = ( + f"Header key for name ({self.name_header_key}) not " + f"found in header. Available keys: {header.keys()}" + ) + logger.error(err) + raise HeaderKeyMissingError(err) from exc + new_dict = { CAND_RA_KEY: image[self.ra_header_key], CAND_DEC_KEY: image[self.dec_header_key], + SOURCE_HISTORY_KEY: pd.DataFrame(), + SOURCE_NAME_KEY: name, } if self.calculate_image_coordinates: @@ -59,8 +89,6 @@ def _apply_to_images(self, batch: ImageBatch) -> SourceBatch: src_table = pd.DataFrame([new_dict]) - metadata = self.get_metadata(image) - all_cands.append(SourceTable(src_table, metadata=metadata)) return all_cands diff --git a/mirar/processors/sources/namer.py b/mirar/processors/sources/namer.py index 18b58caa3..45f196a2e 100644 --- a/mirar/processors/sources/namer.py +++ b/mirar/processors/sources/namer.py @@ -11,12 +11,12 @@ from mirar.data import SourceBatch from mirar.database.transactions.select import run_select from mirar.paths import SOURCE_NAME_KEY, TIME_KEY -from mirar.processors.database.database_selector import BaseDatabaseSourceSelector +from mirar.processors.database.database_selector import SingleSpatialCrossmatchSource logger = logging.getLogger(__name__) -class CandidateNamer(BaseDatabaseSourceSelector): +class CandidateNamer(SingleSpatialCrossmatchSource): """Processor to sequentially assign names to sources, of the form a, aa, aba...""" base_key = "namer" @@ -29,12 +29,20 @@ def __init__( base_name: str, name_start: str = "aaaaa", db_name_field: str = SOURCE_NAME_KEY, + name_key: str = SOURCE_NAME_KEY, **kwargs, ): - super().__init__(db_output_columns=[db_name_field], **kwargs) + super().__init__(**kwargs) self.db_name_field = db_name_field + + # Ensure that the name field is in the output columns + self.db_output_columns = list( + set([self.db_name_field] + self.db_output_columns) + ) + self.base_name = base_name self.name_start = name_start + self.name_key = name_key self.lastname = None def __str__(self) -> str: @@ -143,27 +151,45 @@ def _apply_to_sources( for source_table in batch: sources = source_table.get_data() - names = [] + metadata = source_table.get_metadata() detection_time = Time(source_table[TIME_KEY]) + + matches = [] + for ind, source in sources.iterrows(): - source_name = None + match = self.query_for_source(source, metadata) - if SOURCE_NAME_KEY in source: - source_name = source[SOURCE_NAME_KEY] + if len(match) > 0: + source_name = match[self.name_key].iloc[0] + logger.debug(f"Source already has name: {source_name}") + matches.append(match) - if pd.isnull(source_name): + else: source_name = self.get_next_name( detection_time, last_name=self.lastname ) self.lastname = 
source_name logger.debug(f"Assigning name: {source_name} to source # {ind}.") - else: - logger.debug(f"Source # {ind} already has a name: {source_name}.") - names.append(source_name) - sources[self.db_name_field] = names + # Insert the name into the source table + source[self.name_key] = source_name + + metadata_dict = self.generate_super_dict(metadata, source) + + new = self.db_table(**metadata_dict) + match = new.insert_entry( + duplicate_protocol="fail", + returning_key_names=self.db_output_columns, + ) + matches.append(match) + + match_df = pd.concat(matches, ignore_index=True, axis=0) + + for column in self.db_output_columns: + sources[column] = match_df[column] + source_table.set_data(sources) return batch diff --git a/mirar/processors/sources/sextractor_source_detector.py b/mirar/processors/sources/sextractor_source_detector.py index 221d73566..34ca4826c 100644 --- a/mirar/processors/sources/sextractor_source_detector.py +++ b/mirar/processors/sources/sextractor_source_detector.py @@ -17,6 +17,8 @@ CAND_DEC_KEY, CAND_RA_KEY, SEXTRACTOR_HEADER_KEY, + SOURCE_HISTORY_KEY, + SOURCE_NAME_KEY, XPOS_KEY, YPOS_KEY, get_output_dir, @@ -35,7 +37,10 @@ def generate_candidates_table( ) -> pd.DataFrame: """ Generate a candidates table from a sextractor catalog + + :param image: Image object :param sextractor_catalog_path: Path to the sextractor catalog + :param target_only: Whether to return only the target source :return: Candidates table """ det_srcs = get_table_from_ldac(sextractor_catalog_path) @@ -68,6 +73,8 @@ def generate_candidates_table( det_srcs[CAND_DEC_KEY] = det_srcs["DELTAWIN_J2000"] det_srcs["fwhm"] = det_srcs["FWHM_IMAGE"] det_srcs["elong"] = det_srcs["ELONGATION"] + det_srcs[SOURCE_HISTORY_KEY] = pd.DataFrame() + det_srcs[SOURCE_NAME_KEY] = None return det_srcs diff --git a/mirar/processors/sources/source_detector.py b/mirar/processors/sources/source_detector.py index a6179af27..414a74583 100644 --- a/mirar/processors/sources/source_detector.py +++ b/mirar/processors/sources/source_detector.py @@ -23,6 +23,8 @@ REF_IMG_KEY, SCI_IMG_KEY, SCOR_IMG_KEY, + SOURCE_HISTORY_KEY, + SOURCE_NAME_KEY, XPOS_KEY, YPOS_KEY, get_output_dir, @@ -88,6 +90,9 @@ def generate_candidates_table( det_srcs = det_srcs.to_pandas() + det_srcs[SOURCE_HISTORY_KEY] = [pd.DataFrame() for _ in range(len(det_srcs))] + det_srcs[SOURCE_NAME_KEY] = None + logger.debug( f"Filtered to {len(det_srcs)} candidates in image with " f"scorr peak > 5." 
) diff --git a/mirar/processors/sources/source_selector.py b/mirar/processors/sources/source_selector.py new file mode 100644 index 000000000..9e6273673 --- /dev/null +++ b/mirar/processors/sources/source_selector.py @@ -0,0 +1,234 @@ +""" +Module containing processors and functions to select a subset of sources from a batch +""" + +# pylint: disable=duplicate-code + +import logging + +from mirar.data import Dataset, SourceBatch +from mirar.paths import TARGET_KEY +from mirar.processors.base_processor import BaseSourceProcessor, CleanupProcessor +from mirar.processors.utils.image_selector import ParsingError + +logger = logging.getLogger(__name__) + + +def select_from_sources( + batch: SourceBatch, + key: str = TARGET_KEY, + target_values: str | list[str] = "science", +) -> SourceBatch: + """ + Returns a subset of sources in a batch with have values of equal to + a value in + + :param batch: source batch to sort + :param key: header key to filter on + :param target_values: accepted value(s) for key + :return: source batch containing the subset of sources which pass + """ + + # Enforce string in list for later matching + if not isinstance(target_values, list): + target_values = [str(target_values)] + else: + target_values = [str(x) for x in target_values] + + new_batch = SourceBatch() + + for source_table in batch: + try: + if str(source_table[key]) in target_values: + new_batch.append(source_table) + except KeyError as exc: + logger.error(exc) + raise ParsingError(exc) from exc + + return new_batch + + +class SourceSelector(BaseSourceProcessor, CleanupProcessor): + """ + Processor to only select a subset of sources from a batch. Sources can + be selected using header keywords. For example, using: + SourceSelector(("OBSCLASS", "SCIENCE")) + selects Sources with header["OBSCLASS"]=="SCIENCE" + """ + + base_key = "select" + + def __init__(self, *args: tuple[str, str | list[str]]): + super().__init__() + self.targets = args + + def __str__(self): + reqs = [] + for target in self.targets: + if isinstance(target[1], list): + reqs.append(f"{target[0]} = {' or '.join(target[1])}") + else: + reqs.append(f"{target[0]} = {target[1]}") + + return f"Processor to select sources where {', and '.join(reqs)}" + + def _apply_to_sources( + self, + batch: SourceBatch, + ) -> SourceBatch: + for header_key, target_values in self.targets: + batch = select_from_sources( + batch, key=header_key, target_values=target_values + ) + + return batch + + +def split_sources_into_batches( + sources: SourceBatch, split_key: str | list[str] +) -> Dataset: + """ + Function to split a single :class:`~mirar.data.source_data.SourceBatch` object + into multiple :class:`~mirar.data.base_data.DataBatch` objects. + Each new batch will have the same value of . + Returns a dataset containing the new batches + + :param sources: Source batch to split + :param split_key: Key to split batch + :return: Dataset containing new source batches + """ + + if isinstance(split_key, str): + split_key = [split_key] + + groups = {} + + for source_table in sources: + uid = [] + + for key in split_key: + uid.append(str(source_table[key])) + + uid = "_".join(uid) + + if uid not in groups: + groups[uid] = [source_table] + else: + groups[uid] += [source_table] + logger.debug(groups) + res = Dataset([SourceBatch(x) for x in groups.values()]) + + return res + + +class SourceBatcher(BaseSourceProcessor): + """ + Module to split :class:`~mirar.data.source_data.SourceBatch` object + into multiple :class:`~mirar.data.base_data.DataBatch` objects. 
+ + Sources are batched using the `split_key` argument. For example, + you can batch by filter, like this: + SourceBatcher(split_key="filter") + which will return N batches for the N different filters present + in the directory you are reducing. + If you do not require batching at some point in your reductions, + you can split by BASE_NAME_KEY: + SourceBatcher(split_key=BASE_NAME_KEY) + which returns SourceBatches of length 1, one for each file in the + directory you're working with. + """ + + base_key = "batch" + + def __init__(self, split_key: str | list[str]): + super().__init__() + self.split_key = split_key + + def __str__(self) -> str: + if isinstance(self.split_key, list): + split = self.split_key + else: + split = [self.split_key] + + return ( + f"Groups sources into batches, with each batch having " + f"the same value of {' and '.join(split)}" + ) + + def _apply_to_sources( + self, + batch: SourceBatch, + ) -> SourceBatch: + return batch + + def update_dataset(self, dataset: Dataset) -> Dataset: + new_dataset = Dataset() + + for batch in dataset: + new = split_sources_into_batches(batch, split_key=self.split_key) + new_dataset += new + + return new_dataset + + +class SourceDebatcher(BaseSourceProcessor): + """ + Processor to group all incoming :class:`~mirar.data.source_data.SourceBatch` + objects into a single batch. + This is helpful if you've already batched at an earlier stage in your workflow, and + you want to start over and batch by a different split key. + """ + + base_key = "debatch" + + def _apply_to_sources( + self, + batch: SourceBatch, + ) -> SourceBatch: + return batch + + def __str__(self) -> str: + return "Processor to combine all sources into a single SourceBatch" + + def update_dataset(self, dataset: Dataset) -> Dataset: + combo_batch = SourceBatch() + + for batch in dataset: + combo_batch += batch + + return Dataset([combo_batch]) + + +class SourceRebatcher(SourceBatcher): + """ + Processor to regroup all incoming :class:`~mirar.data.source_data.SourceBatch` + objects into a single batch, and then split by new keys. + This is helpful if you've already batched at an earlier stage in your workflow, and + you want to start over and batch by a different split key. 
+ """ + + base_key = "rebatch" + + def _apply_to_sources( + self, + batch: SourceBatch, + ) -> SourceBatch: + return batch + + def __str__(self) -> str: + if isinstance(self.split_key, list): + split = self.split_key + else: + split = [self.split_key] + + return f"Processor to regroup sources into batches by {' and '.join(split)}" + + def update_dataset(self, dataset: Dataset) -> Dataset: + combo_batch = SourceBatch() + + for batch in dataset: + combo_batch += batch + + dataset = split_sources_into_batches(combo_batch, split_key=self.split_key) + + return dataset diff --git a/mirar/processors/utils/__init__.py b/mirar/processors/utils/__init__.py index 592d24f20..6117bac40 100644 --- a/mirar/processors/utils/__init__.py +++ b/mirar/processors/utils/__init__.py @@ -12,6 +12,7 @@ from mirar.processors.utils.image_selector import ( ImageBatcher, ImageDebatcher, + ImageRebatcher, ImageSelector, select_from_images, ) diff --git a/mirar/processors/utils/image_selector.py b/mirar/processors/utils/image_selector.py index 10772e684..fe14780f4 100644 --- a/mirar/processors/utils/image_selector.py +++ b/mirar/processors/utils/image_selector.py @@ -201,3 +201,38 @@ def update_dataset(self, dataset: Dataset) -> Dataset: combo_batch += batch return Dataset([combo_batch]) + + +class ImageRebatcher(ImageBatcher): + """ + Processor to regroup all incoming :class:`~mirar.data.image_data.ImageBatch` + objects into a single batch, and then split by new keys. + This is helpful if you've already batched at an earlier stage in your workflow, and + you want to start over and batch by a different split key. + """ + + base_key = "rebatch" + + def _apply_to_images( + self, + batch: ImageBatch, + ) -> ImageBatch: + return batch + + def __str__(self) -> str: + if isinstance(self.split_key, list): + split = self.split_key + else: + split = [self.split_key] + + return f"Processor to regroup images into batches by {' and '.join(split)}" + + def update_dataset(self, dataset: Dataset) -> Dataset: + combo_batch = ImageBatch() + + for batch in dataset: + combo_batch += batch + + dataset = split_images_into_batches(combo_batch, split_key=self.split_key) + + return dataset
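
Note: the new ImageRebatcher (and the SourceBatcher/SourceDebatcher/SourceRebatcher counterparts added in mirar/processors/sources/source_selector.py) replaces the recurring two-step "ImageDebatcher() then ImageBatcher(...)" idiom used throughout the WINTER blocks. A minimal usage sketch, assuming mirar with this patch applied is importable; the block names below are illustrative only:

    from mirar.processors.utils import ImageBatcher, ImageDebatcher, ImageRebatcher

    # Old idiom: flatten all existing batches, then regroup by a new key
    old_style_block = [
        ImageDebatcher(),
        ImageBatcher("STACKID"),
    ]

    # New idiom: a single processor that regroups directly by the new key(s)
    new_style_block = [
        ImageRebatcher("STACKID"),
    ]

Both forms accept either a single header key or a list of keys, e.g. ImageRebatcher(["BOARD_ID", "UTCTIME", "SUBCOORD"]), exactly as the blocks in this patch do.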
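
Source generators are now validated against the core_source_fields list added to mirar/paths.py: BaseSourceGenerator._apply raises MissingCoreFieldError if any of the four core columns is missing from the generated source table. A minimal sketch of a conforming source row, mirroring what ForcedPhotometryDetector builds in this patch; the coordinate values are hypothetical:

    import pandas as pd

    from mirar.paths import CAND_DEC_KEY, CAND_RA_KEY, SOURCE_HISTORY_KEY, SOURCE_NAME_KEY

    new_row = {
        CAND_RA_KEY: 210.910674,             # hypothetical example coordinates
        CAND_DEC_KEY: 54.311676,
        SOURCE_HISTORY_KEY: pd.DataFrame(),  # empty history for a brand-new source
        SOURCE_NAME_KEY: None,               # name is assigned later by CandidateNamer
    }

    src_table = pd.DataFrame([new_row])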
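
The reworked winter_source_entry_updater assumes the current detection is already present in the crossmatched history (the candidate row is inserted before SelectSourcesWithMetadata runs), so "ndet" is now taken directly from the history length, while the stored previous-detection history and "ndethist" are computed after dropping the current candid. A small pandas sketch of that filtering step, with made-up candid/jd values:

    import pandas as pd

    current_candid = 1003
    hist_df = pd.DataFrame(
        {"candid": [1001, 1002, 1003], "jd": [2460400.5, 2460401.5, 2460402.5]}
    )

    # Drop the current detection from the stored prv-candidate history
    new_hist_df = hist_df[hist_df["candid"] != current_candid]

    ndethist = len(new_hist_df)                # prior detections only
    jdstarthist = min(hist_df["jd"].tolist())  # earliest detection, including current
    jdendhist = max(hist_df["jd"].tolist())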