Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: utc to pacific and improved error handling #1493

Merged
merged 7 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions sync/config/SQL/SPAR/POSTGRES_SEEDLOT_EXTRACT.sql
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ SELECT
s.applicant_client_number,
s.applicant_email_address,
s.applicant_locn_code AS applicant_client_locn,
s.approved_timestamp,
s.approved_timestamp AT TIME ZONE 'UTC' AT TIME ZONE 'America/Los_Angeles' AS approved_timestamp,
replace(s.approved_userid, '\', '@') approved_userid -- 'Replacing @ to \ for Provider@User
,CASE WHEN s.bc_source_ind THEN 'Y' ELSE 'N' END as bc_source_ind
,S.bec_version_id
Expand All @@ -99,7 +99,7 @@ SELECT
,S.collection_elevation_max
,S.collection_elevation_min
,CASE WHEN s.seedlot_status_code = 'PND' THEN drft.collection_end_date
ELSE s.collection_end_date
ELSE s.collection_end_date::date
END as collection_end_date
,s.collection_latitude_code
,s.collection_latitude_deg as collection_lat_deg
Expand All @@ -110,7 +110,7 @@ SELECT
,s.collection_longitude_min as collection_long_min
,s.collection_longitude_sec as collection_long_sec
,CASE WHEN s.seedlot_status_code = 'PND' THEN drft.collection_start_date
ELSE s.collection_start_date
ELSE s.collection_start_date::date
END as collection_start_date
,CASE WHEN s.seedlot_status_code = 'PND' THEN drft.cone_collection_method_code
ELSE scm1.cone_collection_method_code
Expand All @@ -120,16 +120,16 @@ SELECT
END as cone_collection_method2_code
,s.contaminant_pollen_bv
,CASE WHEN s.controlled_cross_ind = True THEN 'Y' WHEN controlled_cross_ind = False THEN 'N' ELSE '' END as CONTROLLED_CROSS_IND
,s.declared_timestamp
,s.declared_timestamp AT TIME ZONE 'UTC' AT TIME ZONE 'America/Los_Angeles' AS declared_timestamp
,REPLACE(s.declared_userid,'\', '@') declared_userid -- 'Replacing @ to \ for Provider@User
,s.effective_pop_size
,s.elevation
,s.elevation_max
,s.elevation_min
,s.entry_timestamp
,s.entry_timestamp AT TIME ZONE 'UTC' AT TIME ZONE 'America/Los_Angeles' AS entry_timestamp
,REPLACE(s.entry_userid,'\', '@') entry_userid -- 'Replacing @ to \ for Provider@User
,S.extraction_end_date
,S.extraction_st_date
,S.extraction_end_date::date
,S.extraction_st_date::date
,CASE WHEN s.seedlot_status_code = 'PND' THEN drft.extrct_cli_number
ELSE s.extractory_client_number
END as extrct_cli_number
Expand All @@ -143,9 +143,9 @@ SELECT
END as interm_facility_code
,s.interm_strg_locn_code as interm_strg_client_locn
,S.interm_strg_client_number
,S.interm_strg_end_date
,S.interm_strg_end_date::date
,S.interm_strg_locn
,S.interm_strg_st_date
,S.interm_strg_st_date::date
,S.latitude_deg_max
,S.latitude_deg_min
,S.latitude_degrees
Expand Down Expand Up @@ -193,7 +193,7 @@ SELECT
,s.temporary_strg_start_date::date as temporary_storage_start_date
,CASE WHEN s.to_be_registrd_ind THEN 'Y' Else 'N' END as to_be_registrd_ind
,s.total_parent_trees
,s.update_timestamp
,s.update_timestamp AT TIME ZONE 'UTC' AT TIME ZONE 'America/Los_Angeles' AS update_timestamp
,REPLACE(s.update_userid,'\', '@') update_userid -- 'Replacing @ to \ for Provider@User
,s.variant
,s.vegetation_code
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ SELECT
, sgw.genetic_worth_code
, sgw.genetic_quality_value genetic_worth_rtng
, REPLACE(sgw.entry_userid,'\', '@') as entry_userid
, sgw.entry_timestamp
, sgw.entry_timestamp AT TIME ZONE 'UTC' AT TIME ZONE 'America/Los_Angeles' AS entry_timestamp
, REPLACE(sgw.update_userid,'\', '@') as update_userid
, sgw.update_timestamp
, sgw.update_timestamp AT TIME ZONE 'UTC' AT TIME ZONE 'America/Los_Angeles' AS update_timestamp
, sgw.revision_count
FROM spar.seedlot_genetic_worth sgw
JOIN spar.seedlot s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,7 @@ SELECT drft.seedlot_number
, json_array_elements(cast(all_step_data->'ownershipStep' as json)) ownerdata
WHERE drft.seedlot_number = %(p_seedlot_number)s
AND s.seedlot_status_code = 'PND'
AND ownerdata->'ownerAgency'->>'isInvalid' = 'false'
AND ownerdata->'ownerCode'->>'isInvalid' = 'false'
AND ownerdata->'ownerPortion'->>'isInvalid' = 'false'
AND ownerdata->'reservedPerc'->>'isInvalid' = 'false'
AND ownerdata->'surplusPerc'->>'isInvalid' = 'false'
AND ownerdata->'fundingSource'->>'isInvalid' = 'false'
AND NOT(drft.all_step_data @? '$.ownershipStep[*].*.isInvalid ? (@ == true)')
ORDER BY 1
, 2
, 3
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ SELECT
, slpz.seed_plan_zone_code
, CASE slpz.primary_ind WHEN TRUE THEN 'Y' ELSE 'N' END primary_ind
, REPLACE(slpz.entry_userid,'\', '@') as entry_userid
, slpz.entry_timestamp
, slpz.entry_timestamp AT TIME ZONE 'UTC' AT TIME ZONE 'America/Los_Angeles' AS entry_timestamp
, slpz.revision_count
FROM spar.seedlot_seed_plan_zone slpz
JOIN spar.seedlot s
Expand Down
84 changes: 45 additions & 39 deletions sync/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,51 +111,57 @@ def read_settings():
print(f"A fatal error has occurred when trying to load settings.yml ({type(err)}): {err}")


def main() -> None:
definition_of_yes = ["Y","YES","1","T","TRUE","t","true"]
def main() -> int:
try:
definition_of_yes = ["Y","YES","1","T","TRUE","t","true"]
job_return_code = 0

build_number = get_build_number()
print("<------------------ b.u.i.l.d n.u.m.b.e.r ----------------->")
print(f"Running Sync BUILD NUMBER: {build_number}")
print("<------------------ b.u.i.l.d n.u.m.b.e.r ----------------->")
build_number = get_build_number()
print("<------------------ b.u.i.l.d n.u.m.b.e.r ----------------->")
print(f"Running Sync BUILD NUMBER: {build_number}")
print("<------------------ b.u.i.l.d n.u.m.b.e.r ----------------->")

# print(os.environ.get("TEST_MODE"))
if os.environ.get("TEST_MODE") is None:
print("Error: test mode variable is None")
elif os.environ.get("EXECUTION_ID") is None:
print("Error: EXECUTION_ID is None, no execution defined to be executed in this run.")
else:
this_is_a_test = os.environ.get("TEST_MODE")
settings = read_settings()
print("<------------------ settings ----------------->")
print(settings)
print("<------------------ settings ----------------->")
if this_is_a_test in definition_of_yes:
print("Executing in Test mode")
required_variables_exists()
testPostgresConnection(settings["postgres"])
testOracleConnection(settings["oracle"])
# Vault disabled
# testVault()
else:
print("-------------------------------------")
print("Starting ETL main process ")
print("-------------------------------------")

dbOracle = generate_db_config("ORACLE","THE",settings["oracle"])
dbPostgres = generate_db_config("POSTGRES","spar",settings["postgres"])

execute_etl(dbPostgres, dbOracle)
# print(os.environ.get("TEST_MODE"))
if os.environ.get("TEST_MODE") is None:
print("Error: test mode variable is None")
elif os.environ.get("EXECUTION_ID") is None:
print("Error: EXECUTION_ID is None, no execution defined to be executed in this run.")
else:
this_is_a_test = os.environ.get("TEST_MODE")
settings = read_settings()
print("<------------------ settings ----------------->")
print(settings)
print("<------------------ settings ----------------->")
if this_is_a_test in definition_of_yes:
print("Executing in Test mode")
required_variables_exists()
testPostgresConnection(settings["postgres"])
testOracleConnection(settings["oracle"])
# Vault disabled
# testVault()
else:
print("-------------------------------------")
print("Starting ETL main process ")
print("-------------------------------------")
dbOracle = generate_db_config("ORACLE","THE",settings["oracle"])
dbPostgres = generate_db_config("POSTGRES","spar",settings["postgres"])
job_return_code = execute_etl(dbPostgres, dbOracle)

print("-------------------------------------")
print("ETL Main process finished ")
print("-------------------------------------")
print("-------------------------------------")
print("ETL Main process finished ")
print("-------------------------------------")
return job_return_code

except Exception as err:
return 1 #failure

# MAIN Execution
def execute_etl(dbPostgres, dbOracle) -> None:
def execute_etl(dbPostgres, dbOracle):
loggingBasicConfig(level=loggingDEBUG, stream=sys.stdout)
data_sync.execute_instance( oracle_config = dbOracle, postgres_config = dbPostgres ,track_config = dbPostgres)
return data_sync.execute_instance( oracle_config = dbOracle, postgres_config = dbPostgres ,track_config = dbPostgres)

if __name__ == '__main__':
main()
sys.exit(main())

12 changes: 11 additions & 1 deletion sync/src/module/data_synchronization.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def execute_instance(oracle_config, postgres_config, track_config):
current_cwd = path.join(path.abspath(path.dirname(__file__).split('src')[0]) , "config")
logger.info('Initializing Tracking Database Connection')
is_error = False
job_return_code = 1 #fail

with db_conn.database_connection(track_config) as track_db_conn:
temp_time = time.time()
Expand Down Expand Up @@ -66,7 +67,9 @@ def execute_instance(oracle_config, postgres_config, track_config):
#if not data_sync_ctl.validate_execution_map(execution_map):
# raise ETLConfigurationException ("ETL configuration validation failed")

process_seedlots(oracle_config, postgres_config, track_config, track_db_conn, schedule_times)
seedlot_metrics = process_seedlots(oracle_config, postgres_config, track_config, track_db_conn, schedule_times)
if "ERROR" in seedlot_metrics:
raise Exception(seedlot_metrics["ERROR"])

# Exception when validate_execution_map is false
except ETLConfigurationException:
Expand All @@ -83,10 +86,12 @@ def execute_instance(oracle_config, postgres_config, track_config):
logger.info('***** ETL Process finished with error *****')
logger.info(f'ETL Tool whole process took {timedelta(seconds=sync_elapsed_time)}')
run_status = "FAILURE"
job_return_code = 1
else:
stored_metrics["time_process"]=timedelta(seconds=sync_elapsed_time)
print_process_metrics(stored_metrics)
run_status = "SUCCESS"
job_return_code = 0

data_sync_ctl.update_execution_log(database_conn=track_db_conn,
database_schema=track_config['schema'],
Expand All @@ -95,6 +100,7 @@ def execute_instance(oracle_config, postgres_config, track_config):
run_status=run_status)

logger.info('***** Finish ETL Run *****')
return job_return_code

def identifyQueryParams(query, db_type, params) -> object:
if db_type == 'ORACLE':
Expand Down Expand Up @@ -352,6 +358,8 @@ def process_seedlots(oracle_config, postgres_config, track_config, track_db_conn
logger.critical("A fatal error has occurred", exc_info = True)
log_message =f"Error type: {type(err)}: {err}"
metrics['end_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
metrics.setdefault('ERROR', '')
metrics['ERROR'] += log_message + "\r\n"
data_sync_ctl.save_execution_log(track_db_conn,track_config['schema'],metrics)
seedlot_metrics['processes'] = processlst
seedlotlst.append(seedlot_metrics)
Expand All @@ -361,6 +369,8 @@ def process_seedlots(oracle_config, postgres_config, track_config, track_db_conn
logger.critical("A fatal error has occurred", exc_info = True)
log_message =f"Error type: {type(err)}: {err}"
metrics['end_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
metrics.setdefault('ERROR', '')
metrics['ERROR'] += log_message + "\r\n"
data_sync_ctl.save_execution_log(track_db_conn,track_config['schema'],metrics)

#data_sync_ctl.save_execution_log(track_db_conn,track_config['schema'],process["interface_id"],process["execution_id"],process_log)
Expand Down
Loading