Skip to content

Commit

Permalink
Fix history/naming (#858)
Browse files Browse the repository at this point in the history
  • Loading branch information
robertdstein authored Apr 22, 2024
1 parent ac3fc7e commit 834465c
Show file tree
Hide file tree
Showing 17 changed files with 488 additions and 135 deletions.
14 changes: 10 additions & 4 deletions mirar/paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@
doc_dir = base_code_dir.joinpath("docs/")

# Load environment variables for .env file
variables_loaded = dotenv.load_dotenv()
VARIABLES_LOADED = dotenv.load_dotenv()

if variables_loaded:
info = "Environment variables were automatically loaded from .env file."
logger.info(info)
if VARIABLES_LOADED:
logger.info("Environment variables were automatically loaded from .env file.")

_n_cpu = os.cpu_count()
if _n_cpu is None:
Expand Down Expand Up @@ -346,6 +345,13 @@ def get_astrometry_keys() -> list:
EXPTIME_KEY,
]

core_source_fields = [
CAND_RA_KEY,
CAND_DEC_KEY,
SOURCE_NAME_KEY,
SOURCE_HISTORY_KEY,
]

MONITOR_EMAIL_KEY = "WATCHDOG_EMAIL"
MONITOR_PASSWORD_KEY = "WATCHDOG_EMAIL_PASSWORD"
MONITOR_RECIPIENT_KEY = "WATCHDOG_EMAIL_RECIPIENTS"
Expand Down
134 changes: 58 additions & 76 deletions mirar/pipelines/winter/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,7 @@
DatabaseImageInserter,
DatabaseSourceInserter,
)
from mirar.processors.database.database_selector import (
SelectSourcesWithMetadata,
SingleSpatialCrossmatchSource,
)
from mirar.processors.database.database_selector import SelectSourcesWithMetadata
from mirar.processors.database.database_updater import ImageDatabaseMultiEntryUpdater
from mirar.processors.flat import FlatCalibrator
from mirar.processors.mask import ( # MaskAboveThreshold,
Expand All @@ -128,6 +125,7 @@
CandidateNamer,
CustomSourceTableModifier,
ForcedPhotometryDetector,
SourceBatcher,
SourceLoader,
SourceWriter,
ZOGYSourceDetector,
Expand All @@ -140,6 +138,7 @@
ImageDebatcher,
ImageLoader,
ImagePlotter,
ImageRebatcher,
ImageRejector,
ImageSaver,
ImageSelector,
Expand Down Expand Up @@ -267,8 +266,7 @@
("FIELDID", str(3944)),
("BOARD_ID", str(BOARD_ID)),
),
ImageDebatcher(),
ImageBatcher("STACKID"),
ImageRebatcher("STACKID"),
]

# mask
Expand All @@ -292,8 +290,7 @@
save_raw = [
ImageSaver(output_dir_name="raw_unpacked", write_mask=False),
DatabaseImageInserter(db_table=Raw, duplicate_protocol="replace"),
ImageDebatcher(),
ImageBatcher(["BOARD_ID", "FILTER", "EXPTIME", TARGET_KEY, "SUBCOORD"]),
ImageRebatcher(["BOARD_ID", "FILTER", "EXPTIME", TARGET_KEY, "SUBCOORD"]),
CustomImageBatchModifier(winter_stackid_annotator),
ImageSaver(output_dir_name="raw_unpacked", write_mask=False),
HeaderAnnotator(input_keys=LATEST_SAVE_KEY, output_key=RAW_IMG_KEY),
Expand All @@ -307,8 +304,7 @@

load_unpacked = [
ImageLoader(input_sub_dir="raw_unpacked", input_img_dir=base_output_dir),
ImageDebatcher(),
ImageBatcher("UTCTIME"),
ImageRebatcher("UTCTIME"),
CSVLog(
export_keys=[
"UTCTIME",
Expand Down Expand Up @@ -340,24 +336,21 @@
# Detrend blocks

dark_calibrate = [
ImageDebatcher(),
ImageBatcher(
ImageRebatcher(
["BOARD_ID", EXPTIME_KEY, "SUBCOORD", "GAINCOLT", "GAINCOLB", "GAINROW"]
),
DarkCalibrator(
cache_sub_dir="calibration_darks",
cache_image_name_header_keys=[EXPTIME_KEY, "BOARD_ID"],
),
ImageSelector((OBSCLASS_KEY, ["science", "flat"])),
ImageDebatcher(),
ImageBatcher(["BOARD_ID", "UTCTIME", "SUBCOORD"]),
ImageRebatcher(["BOARD_ID", "UTCTIME", "SUBCOORD"]),
ImageSaver(output_dir_name="darkcal"),
CustomImageBatchModifier(winter_dark_oversubtraction_rejector),
]

flat_calibrate = [
ImageDebatcher(),
ImageBatcher(
ImageRebatcher(
[
"BOARD_ID",
"FILTER",
Expand All @@ -373,8 +366,7 @@
cache_image_name_header_keys=["FILTER", "BOARD_ID"],
),
ImageSaver(output_dir_name="skyflatcal"),
ImageDebatcher(),
ImageBatcher(["BOARD_ID", "UTCTIME", "SUBCOORD"]),
ImageRebatcher(["BOARD_ID", "UTCTIME", "SUBCOORD"]),
Sextractor(
**sextractor_astrometry_config,
write_regions_bool=True,
Expand All @@ -395,8 +387,7 @@
]

astrometry = [
ImageDebatcher(),
ImageBatcher(["UTCTIME", "BOARD_ID", "SUBCOORD"]),
ImageRebatcher(["UTCTIME", "BOARD_ID", "SUBCOORD"]),
AstrometryNet(
output_sub_dir="anet",
scale_bounds=[1.0, 1.3],
Expand All @@ -410,8 +401,7 @@
cache=True,
),
ImageSaver(output_dir_name="post_anet"),
ImageDebatcher(),
ImageBatcher(
ImageRebatcher(
[TARGET_KEY, "FILTER", EXPTIME_KEY, "BOARD_ID", "SUBCOORD", "DITHGRP"]
),
Sextractor(
Expand All @@ -431,8 +421,7 @@
]

validate_astrometry = [
ImageDebatcher(),
ImageBatcher(["UTCTIME", "BOARD_ID", "SUBCOORD", "DITHGRP"]),
ImageRebatcher(["UTCTIME", "BOARD_ID", "SUBCOORD", "DITHGRP"]),
Sextractor(
**sextractor_astromstats_config,
write_regions_bool=True,
Expand Down Expand Up @@ -467,8 +456,7 @@
]

photcal_and_export = [
ImageDebatcher(),
ImageBatcher([BASE_NAME_KEY]),
ImageRebatcher([BASE_NAME_KEY]),
HeaderAnnotator(input_keys=LATEST_SAVE_KEY, output_key=RAW_IMG_KEY),
CustomImageBatchModifier(masked_images_rejector),
Sextractor(
Expand Down Expand Up @@ -541,8 +529,7 @@
]

plot_stack = [
ImageDebatcher(),
ImageBatcher([TARGET_KEY, "BOARD_ID"]),
ImageRebatcher([TARGET_KEY, "BOARD_ID"]),
ImagePlotter(
output_sub_dir="final_stacks_plots",
annotate_fields=[
Expand All @@ -559,15 +546,13 @@
]

split_stack = [
ImageDebatcher(),
ImageBatcher(["BOARD_ID", "FILTER", TARGET_KEY, "SUBCOORD", "STACKID"]),
ImageRebatcher(["BOARD_ID", "FILTER", TARGET_KEY, "SUBCOORD", "STACKID"]),
SwarpImageSplitter(swarp_config_path=swarp_config_path, n_x=2, n_y=1),
ImageSaver(output_dir_name="split_stacks"),
]

imsub = [
ImageDebatcher(),
ImageBatcher([BASE_NAME_KEY]),
ImageRebatcher([BASE_NAME_KEY]),
HeaderAnnotator(input_keys=[SUB_ID_KEY], output_key="SUBDETID"),
ProcessReference(
ref_image_generator=winter_reference_generator,
Expand Down Expand Up @@ -651,6 +636,10 @@
SourceWriter(output_dir_name="kowalski"),
]

load_post_kowalski = [
SourceLoader(input_dir_name="kowalski"),
]

select_history = [
SelectSourcesWithMetadata(
db_query_columns=["sourceid"],
Expand All @@ -661,48 +650,42 @@
),
]

name_candidates = (
[
# Check if the source is already in the source table
SingleSpatialCrossmatchSource(
db_table=Source,
db_output_columns=["sourceid", SOURCE_NAME_KEY],
crossmatch_radius_arcsec=2.0,
ra_field_name="average_ra",
dec_field_name="average_dec",
),
# Assign names to the new sources
CandidateNamer(
db_table=Source,
base_name=SOURCE_PREFIX,
name_start=NAME_START,
db_name_field=SOURCE_NAME_KEY,
),
# Add the new sources to the source table
CustomSourceTableModifier(modifier_function=winter_new_source_updater),
DatabaseSourceInserter(
db_table=Source,
duplicate_protocol="ignore",
),
# Get all candidates associated with source
]
+ select_history
+ [
# Update average ra and dec for source
CustomSourceTableModifier(modifier_function=winter_source_entry_updater),
# Update sources in the source table
DatabaseSourceInserter(
db_table=Source,
duplicate_protocol="replace",
),
# Add candidates in the candidate table
DatabaseSourceInserter(
db_table=Candidate,
duplicate_protocol="fail",
),
SourceWriter(output_dir_name="preavro"),
]
)
name_candidates = [
SourceBatcher(BASE_NAME_KEY),
# Add the new sources to the source table
CustomSourceTableModifier(modifier_function=winter_new_source_updater),
# Assign names to the new sources
CandidateNamer(
db_table=Source,
db_output_columns=["sourceid", SOURCE_NAME_KEY],
base_name=SOURCE_PREFIX,
name_start=NAME_START,
db_name_field=SOURCE_NAME_KEY,
crossmatch_radius_arcsec=2.0,
ra_field_name="average_ra",
dec_field_name="average_dec",
),
# Add candidates in the candidate table
DatabaseSourceInserter(
db_table=Candidate,
duplicate_protocol="fail",
),
SelectSourcesWithMetadata(
db_query_columns=["sourceid"],
db_table=Candidate,
db_output_columns=prv_candidate_cols + [SOURCE_NAME_KEY],
base_output_column=SOURCE_HISTORY_KEY,
additional_query_constraints=winter_history_deprecated_constraint,
),
# Update average ra and dec for source
CustomSourceTableModifier(modifier_function=winter_source_entry_updater),
# Update sources in the source table
DatabaseSourceInserter(
db_table=Source,
duplicate_protocol="replace",
),
SourceWriter(output_dir_name="preavro"),
]

avro_write = [
# Add in the skyportal fields and all save locally
Expand Down Expand Up @@ -844,8 +827,7 @@
)

stack_forced_photometry = [
ImageDebatcher(),
ImageBatcher([BASE_NAME_KEY]),
ImageRebatcher([BASE_NAME_KEY]),
ForcedPhotometryDetector(ra_header_key="TARGRA", dec_header_key="TARGDEC"),
AperturePhotometry(
aper_diameters=[5, 8, 10, 15],
Expand Down
14 changes: 11 additions & 3 deletions mirar/pipelines/winter/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,12 +550,16 @@ def winter_source_entry_updater(source_table: SourceBatch) -> SourceBatch:
pd.DataFrame(src_df[SOURCE_HISTORY_KEY].loc[x]) for x in range(len(src_df))
]

src_df["ndet"] = [len(x) + 1 for x in hist_dfs]
src_df["ndet"] = [len(x) for x in hist_dfs]

new_fields = []

# FIXME remove same detection by candid

for i, hist_df in enumerate(hist_dfs):
if len(hist_df) == 0:
if len(hist_df) == 1:

new_hist_df = pd.DataFrame(columns=hist_df.columns)

new_fields.append(
{
Expand All @@ -566,6 +570,7 @@ def winter_source_entry_updater(source_table: SourceBatch) -> SourceBatch:
"jdstarthist": source["jd"],
"jdendhist": source["jd"],
"ndethist": 0,
SOURCE_HISTORY_KEY: new_hist_df,
}
)

Expand All @@ -591,6 +596,8 @@ def winter_source_entry_updater(source_table: SourceBatch) -> SourceBatch:
min_jd = min(hist_df["jd"].tolist() + [source["jd"]])
max_jd = max(hist_df["jd"].tolist() + [source["jd"]])

new_hist_df = hist_df[hist_df["candid"] != src_df["candid"].iloc[i]]

new_fields.append(
{
"average_ra": av_ra,
Expand All @@ -599,7 +606,8 @@ def winter_source_entry_updater(source_table: SourceBatch) -> SourceBatch:
"latest_det_utc": Time(max_jd, format="jd").isot,
"jdstarthist": min_jd,
"jdendhist": max_jd,
"ndethist": len(hist_df),
"ndethist": len(new_hist_df),
SOURCE_HISTORY_KEY: new_hist_df,
}
)

Expand Down
6 changes: 5 additions & 1 deletion mirar/pipelines/winter/winter_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,11 @@ class WINTERPipeline(Pipeline):
"reftest": reftest,
"only_ref": only_ref,
"realtime": realtime,
"detect_candidates": load_final_stack + imsub + detect_candidates,
"detect_candidates": load_final_stack
+ imsub
+ detect_candidates
+ process_candidates
+ avro_broadcast,
"full_imsub": load_final_stack
+ imsub
+ detect_candidates
Expand Down
Loading

0 comments on commit 834465c

Please sign in to comment.