Skip to content

Commit

Permalink
feat: utc to pacific and improved error handling (#1493)
Browse files Browse the repository at this point in the history
Co-authored-by: Derek Roberts <derek.roberts@gmail.com>
Co-authored-by: Craig Yu <craig.yu93@gmail.com>
  • Loading branch information
3 people authored Aug 14, 2024
1 parent 301f6c1 commit 8fa2219
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 59 deletions.
20 changes: 10 additions & 10 deletions sync/config/SQL/SPAR/POSTGRES_SEEDLOT_EXTRACT.sql
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ SELECT
s.applicant_client_number,
s.applicant_email_address,
s.applicant_locn_code AS applicant_client_locn,
s.approved_timestamp,
s.approved_timestamp AT TIME ZONE 'UTC' AT TIME ZONE 'America/Los_Angeles' AS approved_timestamp,
replace(s.approved_userid, '\', '@') approved_userid -- Replacing \ with @ for Provider@User
,CASE WHEN s.bc_source_ind THEN 'Y' ELSE 'N' END as bc_source_ind
,S.bec_version_id
Expand All @@ -99,7 +99,7 @@ SELECT
,S.collection_elevation_max
,S.collection_elevation_min
,CASE WHEN s.seedlot_status_code = 'PND' THEN drft.collection_end_date
ELSE s.collection_end_date
ELSE s.collection_end_date::date
END as collection_end_date
,s.collection_latitude_code
,s.collection_latitude_deg as collection_lat_deg
Expand All @@ -110,7 +110,7 @@ SELECT
,s.collection_longitude_min as collection_long_min
,s.collection_longitude_sec as collection_long_sec
,CASE WHEN s.seedlot_status_code = 'PND' THEN drft.collection_start_date
ELSE s.collection_start_date
ELSE s.collection_start_date::date
END as collection_start_date
,CASE WHEN s.seedlot_status_code = 'PND' THEN drft.cone_collection_method_code
ELSE scm1.cone_collection_method_code
Expand All @@ -120,16 +120,16 @@ SELECT
END as cone_collection_method2_code
,s.contaminant_pollen_bv
,CASE WHEN s.controlled_cross_ind = True THEN 'Y' WHEN controlled_cross_ind = False THEN 'N' ELSE '' END as CONTROLLED_CROSS_IND
,s.declared_timestamp
,s.declared_timestamp AT TIME ZONE 'UTC' AT TIME ZONE 'America/Los_Angeles' AS declared_timestamp
,REPLACE(s.declared_userid,'\', '@') declared_userid -- Replacing \ with @ for Provider@User
,s.effective_pop_size
,s.elevation
,s.elevation_max
,s.elevation_min
,s.entry_timestamp
,s.entry_timestamp AT TIME ZONE 'UTC' AT TIME ZONE 'America/Los_Angeles' AS entry_timestamp
,REPLACE(s.entry_userid,'\', '@') entry_userid -- Replacing \ with @ for Provider@User
,S.extraction_end_date
,S.extraction_st_date
,S.extraction_end_date::date
,S.extraction_st_date::date
,CASE WHEN s.seedlot_status_code = 'PND' THEN drft.extrct_cli_number
ELSE s.extractory_client_number
END as extrct_cli_number
Expand All @@ -143,9 +143,9 @@ SELECT
END as interm_facility_code
,s.interm_strg_locn_code as interm_strg_client_locn
,S.interm_strg_client_number
,S.interm_strg_end_date
,S.interm_strg_end_date::date
,S.interm_strg_locn
,S.interm_strg_st_date
,S.interm_strg_st_date::date
,S.latitude_deg_max
,S.latitude_deg_min
,S.latitude_degrees
Expand Down Expand Up @@ -193,7 +193,7 @@ SELECT
,s.temporary_strg_start_date::date as temporary_storage_start_date
,CASE WHEN s.to_be_registrd_ind THEN 'Y' Else 'N' END as to_be_registrd_ind
,s.total_parent_trees
,s.update_timestamp
,s.update_timestamp AT TIME ZONE 'UTC' AT TIME ZONE 'America/Los_Angeles' AS update_timestamp
,REPLACE(s.update_userid,'\', '@') update_userid -- Replacing \ with @ for Provider@User
,s.variant
,s.vegetation_code
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ SELECT
, sgw.genetic_worth_code
, sgw.genetic_quality_value genetic_worth_rtng
, REPLACE(sgw.entry_userid,'\', '@') as entry_userid
, sgw.entry_timestamp
, sgw.entry_timestamp AT TIME ZONE 'UTC' AT TIME ZONE 'America/Los_Angeles' AS entry_timestamp
, REPLACE(sgw.update_userid,'\', '@') as update_userid
, sgw.update_timestamp
, sgw.update_timestamp AT TIME ZONE 'UTC' AT TIME ZONE 'America/Los_Angeles' AS update_timestamp
, sgw.revision_count
FROM spar.seedlot_genetic_worth sgw
JOIN spar.seedlot s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,7 @@ SELECT drft.seedlot_number
, json_array_elements(cast(all_step_data->'ownershipStep' as json)) ownerdata
WHERE drft.seedlot_number = %(p_seedlot_number)s
AND s.seedlot_status_code = 'PND'
AND ownerdata->'ownerAgency'->>'isInvalid' = 'false'
AND ownerdata->'ownerCode'->>'isInvalid' = 'false'
AND ownerdata->'ownerPortion'->>'isInvalid' = 'false'
AND ownerdata->'reservedPerc'->>'isInvalid' = 'false'
AND ownerdata->'surplusPerc'->>'isInvalid' = 'false'
AND ownerdata->'fundingSource'->>'isInvalid' = 'false'
AND NOT(drft.all_step_data @? '$.ownershipStep[*].*.isInvalid ? (@ == true)')
ORDER BY 1
, 2
, 3
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ SELECT
, slpz.seed_plan_zone_code
, CASE slpz.primary_ind WHEN TRUE THEN 'Y' ELSE 'N' END primary_ind
, REPLACE(slpz.entry_userid,'\', '@') as entry_userid
, slpz.entry_timestamp
, slpz.entry_timestamp AT TIME ZONE 'UTC' AT TIME ZONE 'America/Los_Angeles' AS entry_timestamp
, slpz.revision_count
FROM spar.seedlot_seed_plan_zone slpz
JOIN spar.seedlot s
Expand Down
84 changes: 45 additions & 39 deletions sync/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,51 +111,57 @@ def read_settings():
print(f"A fatal error has occurred when trying to load settings.yml ({type(err)}): {err}")


def main() -> None:
definition_of_yes = ["Y","YES","1","T","TRUE","t","true"]
def main() -> int:
try:
definition_of_yes = ["Y","YES","1","T","TRUE","t","true"]
job_return_code = 0

build_number = get_build_number()
print("<------------------ b.u.i.l.d n.u.m.b.e.r ----------------->")
print(f"Running Sync BUILD NUMBER: {build_number}")
print("<------------------ b.u.i.l.d n.u.m.b.e.r ----------------->")
build_number = get_build_number()
print("<------------------ b.u.i.l.d n.u.m.b.e.r ----------------->")
print(f"Running Sync BUILD NUMBER: {build_number}")
print("<------------------ b.u.i.l.d n.u.m.b.e.r ----------------->")

# print(os.environ.get("TEST_MODE"))
if os.environ.get("TEST_MODE") is None:
print("Error: test mode variable is None")
elif os.environ.get("EXECUTION_ID") is None:
print("Error: EXECUTION_ID is None, no execution defined to be executed in this run.")
else:
this_is_a_test = os.environ.get("TEST_MODE")
settings = read_settings()
print("<------------------ settings ----------------->")
print(settings)
print("<------------------ settings ----------------->")
if this_is_a_test in definition_of_yes:
print("Executing in Test mode")
required_variables_exists()
testPostgresConnection(settings["postgres"])
testOracleConnection(settings["oracle"])
# Vault disabled
# testVault()
else:
print("-------------------------------------")
print("Starting ETL main process ")
print("-------------------------------------")

dbOracle = generate_db_config("ORACLE","THE",settings["oracle"])
dbPostgres = generate_db_config("POSTGRES","spar",settings["postgres"])

execute_etl(dbPostgres, dbOracle)
# print(os.environ.get("TEST_MODE"))
if os.environ.get("TEST_MODE") is None:
print("Error: test mode variable is None")
elif os.environ.get("EXECUTION_ID") is None:
print("Error: EXECUTION_ID is None, no execution defined to be executed in this run.")
else:
this_is_a_test = os.environ.get("TEST_MODE")
settings = read_settings()
print("<------------------ settings ----------------->")
print(settings)
print("<------------------ settings ----------------->")
if this_is_a_test in definition_of_yes:
print("Executing in Test mode")
required_variables_exists()
testPostgresConnection(settings["postgres"])
testOracleConnection(settings["oracle"])
# Vault disabled
# testVault()
else:
print("-------------------------------------")
print("Starting ETL main process ")
print("-------------------------------------")
dbOracle = generate_db_config("ORACLE","THE",settings["oracle"])
dbPostgres = generate_db_config("POSTGRES","spar",settings["postgres"])
job_return_code = execute_etl(dbPostgres, dbOracle)

print("-------------------------------------")
print("ETL Main process finished ")
print("-------------------------------------")
print("-------------------------------------")
print("ETL Main process finished ")
print("-------------------------------------")
return job_return_code

except Exception as err:
return 1 #failure

# MAIN Execution
def execute_etl(dbPostgres, dbOracle) -> None:
def execute_etl(dbPostgres, dbOracle):
loggingBasicConfig(level=loggingDEBUG, stream=sys.stdout)
data_sync.execute_instance( oracle_config = dbOracle, postgres_config = dbPostgres ,track_config = dbPostgres)
return data_sync.execute_instance( oracle_config = dbOracle, postgres_config = dbPostgres ,track_config = dbPostgres)

if __name__ == '__main__':
main()
sys.exit(main())

12 changes: 11 additions & 1 deletion sync/src/module/data_synchronization.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def execute_instance(oracle_config, postgres_config, track_config):
current_cwd = path.join(path.abspath(path.dirname(__file__).split('src')[0]) , "config")
logger.info('Initializing Tracking Database Connection')
is_error = False
job_return_code = 1 #fail

with db_conn.database_connection(track_config) as track_db_conn:
temp_time = time.time()
Expand Down Expand Up @@ -66,7 +67,9 @@ def execute_instance(oracle_config, postgres_config, track_config):
#if not data_sync_ctl.validate_execution_map(execution_map):
# raise ETLConfigurationException ("ETL configuration validation failed")

process_seedlots(oracle_config, postgres_config, track_config, track_db_conn, schedule_times)
seedlot_metrics = process_seedlots(oracle_config, postgres_config, track_config, track_db_conn, schedule_times)
if "ERROR" in seedlot_metrics:
raise Exception(seedlot_metrics["ERROR"])

# Exception when validate_execution_map is false
except ETLConfigurationException:
Expand All @@ -83,10 +86,12 @@ def execute_instance(oracle_config, postgres_config, track_config):
logger.info('***** ETL Process finished with error *****')
logger.info(f'ETL Tool whole process took {timedelta(seconds=sync_elapsed_time)}')
run_status = "FAILURE"
job_return_code = 1
else:
stored_metrics["time_process"]=timedelta(seconds=sync_elapsed_time)
print_process_metrics(stored_metrics)
run_status = "SUCCESS"
job_return_code = 0

data_sync_ctl.update_execution_log(database_conn=track_db_conn,
database_schema=track_config['schema'],
Expand All @@ -95,6 +100,7 @@ def execute_instance(oracle_config, postgres_config, track_config):
run_status=run_status)

logger.info('***** Finish ETL Run *****')
return job_return_code

def identifyQueryParams(query, db_type, params) -> object:
if db_type == 'ORACLE':
Expand Down Expand Up @@ -352,6 +358,8 @@ def process_seedlots(oracle_config, postgres_config, track_config, track_db_conn
logger.critical("A fatal error has occurred", exc_info = True)
log_message =f"Error type: {type(err)}: {err}"
metrics['end_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
metrics.setdefault('ERROR', '')
metrics['ERROR'] += log_message + "\r\n"
data_sync_ctl.save_execution_log(track_db_conn,track_config['schema'],metrics)
seedlot_metrics['processes'] = processlst
seedlotlst.append(seedlot_metrics)
Expand All @@ -361,6 +369,8 @@ def process_seedlots(oracle_config, postgres_config, track_config, track_db_conn
logger.critical("A fatal error has occurred", exc_info = True)
log_message =f"Error type: {type(err)}: {err}"
metrics['end_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
metrics.setdefault('ERROR', '')
metrics['ERROR'] += log_message + "\r\n"
data_sync_ctl.save_execution_log(track_db_conn,track_config['schema'],metrics)

#data_sync_ctl.save_execution_log(track_db_conn,track_config['schema'],process["interface_id"],process["execution_id"],process_log)
Expand Down

0 comments on commit 8fa2219

Please sign in to comment.