Skip to content

Commit

Permalink
switch elp pipeline to use ModelLibrary
Browse files Browse the repository at this point in the history
  • Loading branch information
braingram committed Nov 15, 2024
1 parent c92c48c commit d6d0ed0
Showing 1 changed file with 63 additions and 144 deletions.
207 changes: 63 additions & 144 deletions romancal/pipeline/exposure_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
#!/usr/bin/env python
import logging
from os.path import basename

import numpy as np
from roman_datamodels import datamodels as rdm
from roman_datamodels.dqflags import group

import romancal.datamodels.filetype as filetype

# step imports
from romancal.assign_wcs import AssignWcsStep
from romancal.associations.asn_from_list import asn_from_list
from romancal.associations.load_asn import load_asn
from romancal.dark_current import DarkCurrentStep
from romancal.datamodels.library import ModelLibrary
from romancal.dq_init import dq_init_step
from romancal.flatfield import FlatFieldStep
from romancal.lib.basic_utils import is_fully_saturated
Expand Down Expand Up @@ -73,152 +67,77 @@ def process(self, input):
self.source_catalog.return_updated_model = True
# make sure we update source catalog coordinates afer running TweakRegStep
self.tweakreg.update_source_catalog_coordinates = True
# make output filenames based on input filenames
self.output_use_model = True

log.info("Starting Roman exposure calibration pipeline ...")
if isinstance(input, str):
input_filename = basename(input)
else:
input_filename = None

# determine the input type
file_type = filetype.check(input)
if file_type == "asn":
with open(input_filename) as f:
asn = load_asn(f)
elif file_type == "asdf":
try:
# set the product name based on the input filename
asn = asn_from_list([input], product_name=input_filename.split(".")[0])
file_type = "asn"
except TypeError:
log.debug("Error opening file:")
return

# Build a list of observations to process
expos_file = []
n_members = 0
# extract the members from the asn to run the files through the steps
results = []
tweakreg_input = []

# populate expos_file
if file_type == "asn":
for product in asn["products"]:
# extract the members from the asn to run the files through the steps
n_members = len(product["members"])
for member in product["members"]:
expos_file.append(member["expname"])
elif file_type == "DataModel":
expos_file.append(input)

for in_file in expos_file:
if isinstance(in_file, str):
input_filename = basename(in_file)
# Open the file
input = rdm.open(in_file)
elif isinstance(in_file, rdm.DataModel):
input_filename = in_file.meta.filename
else:
input_filename = None

log.info(f"Processing a WFI exposure {input_filename}")

self.dq_init.suffix = "dq_init"
result = self.dq_init(input)

if input_filename:
result.meta.filename = input_filename

result = self.saturation(result)

# Test for fully saturated data
if is_fully_saturated(result):
# Return fully saturated image file (stopping pipeline)
log.info("All pixels are saturated. Returning a zeroed-out image.")

# if is_fully_saturated(result):
# Set all subsequent steps to skipped
for step_str in [
"assign_wcs",
"flat_field",
"photom",
"source_detection",
"dark",
"refpix",
"linearity",
"ramp_fit",
"jump",
"tweakreg",
]:
result.meta.cal_step[step_str] = "SKIPPED"

results.append(result)
return results

result = self.refpix(result)
result = self.linearity(result)
result = self.dark_current(result)
result = self.rampfit(result)
result = self.assign_wcs(result)

if result.meta.exposure.type == "WFI_IMAGE":
result = self.flatfield(result)
result = self.photom(result)
result = self.source_catalog(result)
tweakreg_input.append(result)
log.info(
f"Number of models to tweakreg: {len(tweakreg_input), n_members}"
)
else:
log.info("Flat Field step is being SKIPPED")
log.info("Photom step is being SKIPPED")
log.info("Source Detection step is being SKIPPED")
log.info("Tweakreg step is being SKIPPED")
result.meta.cal_step.flat_field = "SKIPPED"
result.meta.cal_step.photom = "SKIPPED"
result.meta.cal_step.source_detection = "SKIPPED"
result.meta.cal_step.tweakreg = "SKIPPED"

self.output_use_model = True
results.append(result)
if file_type == "ModelLibrary":
lib = input
elif file_type == "asn":
lib = ModelLibrary(input)
else:
lib = ModelLibrary([input])

with lib:
for model_index, model in enumerate(lib):
self.dq_init.suffix = "dq_init"
result = self.dq_init(model)

del model

result = self.saturation(result)

# Test for fully saturated data
if is_fully_saturated(result):
# Return fully saturated image file (stopping pipeline)
log.info("All pixels are saturated. Returning a zeroed-out image.")

# if is_fully_saturated(result):
# Set all subsequent steps to skipped
for step_str in [
"assign_wcs",
"flat_field",
"photom",
"source_detection",
"dark",
"refpix",
"linearity",
"ramp_fit",
"jump",
"tweakreg",
]:
result.meta.cal_step[step_str] = "SKIPPED"
else:
result = self.refpix(result)
result = self.linearity(result)
result = self.dark_current(result)
result = self.rampfit(result)
result = self.assign_wcs(result)

if result.meta.exposure.type == "WFI_IMAGE":
result = self.flatfield(result)
result = self.photom(result)
result = self.source_catalog(result)
else:
log.info("Flat Field step is being SKIPPED")
log.info("Photom step is being SKIPPED")
log.info("Source Detection step is being SKIPPED")
log.info("Tweakreg step is being SKIPPED")
result.meta.cal_step.flat_field = "SKIPPED"
result.meta.cal_step.photom = "SKIPPED"
result.meta.cal_step.source_detection = "SKIPPED"
result.meta.cal_step.tweakreg = "SKIPPED"

lib.shelve(result, model_index)

# Now that all the exposures are collated, run tweakreg
# Note: this does not cover the case where the asn mixes imaging and spectral
# observations. This should not occur on-prem
result = self.tweakreg(results)
self.tweakreg(lib)

log.info("Roman exposure calibration pipeline ending...")

return results

def create_fully_saturated_zeroed_image(self, input_model):
"""
Create zeroed-out image file
"""
# The set order is: data, dq, var_poisson, var_rnoise, err
fully_saturated_model = ramp_fit_step.create_image_model(
input_model,
(
np.zeros(input_model.data.shape[1:], dtype=input_model.data.dtype),
input_model.pixeldq | input_model.groupdq[0] | group.SATURATED,
np.zeros(input_model.err.shape[1:], dtype=input_model.err.dtype),
np.zeros(input_model.err.shape[1:], dtype=input_model.err.dtype),
np.zeros(input_model.err.shape[1:], dtype=input_model.err.dtype),
),
)

# Set all subsequent steps to skipped
for step_str in [
"linearity",
"dark",
"ramp_fit",
"assign_wcs",
"flat_field",
"photom",
"source_detection",
"tweakreg",
]:
fully_saturated_model.meta.cal_step[step_str] = "SKIPPED"

# Return zeroed-out image file
return fully_saturated_model
return lib

0 comments on commit d6d0ed0

Please sign in to comment.