From 5f2a4879163d0eda53241dbc7f3c4e8f19a62c6b Mon Sep 17 00:00:00 2001 From: Jeanette Clark Date: Thu, 4 Apr 2024 14:26:22 -0700 Subject: [PATCH 01/21] replace the dbstore shutdown method with a java autoclose also use try with resources statements to make sure the connections get closed --- .../nceas/mdqengine/collections/Runs.java | 265 +++++---- .../edu/ucsb/nceas/mdqengine/model/Run.java | 76 ++- .../nceas/mdqengine/scheduler/MonitorJob.java | 168 +++--- .../nceas/mdqengine/scheduler/NodeList.java | 109 ++-- .../mdqengine/scheduler/RequestReportJob.java | 539 ++++++++++-------- .../mdqengine/scheduler/RequestScorerJob.java | 386 +++++++------ .../nceas/mdqengine/store/DatabaseStore.java | 4 +- .../nceas/mdqengine/store/InMemoryStore.java | 11 +- .../ucsb/nceas/mdqengine/store/MDQStore.java | 2 +- 9 files changed, 858 insertions(+), 702 deletions(-) diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/collections/Runs.java b/src/main/java/edu/ucsb/nceas/mdqengine/collections/Runs.java index feb5b95f..6eeb12bb 100644 --- a/src/main/java/edu/ucsb/nceas/mdqengine/collections/Runs.java +++ b/src/main/java/edu/ucsb/nceas/mdqengine/collections/Runs.java @@ -3,6 +3,7 @@ import edu.ucsb.nceas.mdqengine.exception.MetadigException; import edu.ucsb.nceas.mdqengine.model.Run; import edu.ucsb.nceas.mdqengine.model.SysmetaModel; +import edu.ucsb.nceas.mdqengine.store.DatabaseStore; import edu.ucsb.nceas.mdqengine.store.MDQStore; import edu.ucsb.nceas.mdqengine.store.StoreFactory; import org.apache.commons.logging.Log; @@ -19,16 +20,18 @@ public class Runs { private Boolean foundFirstPid = false; private String firstPidInSequence = null; - public Runs () { + public Runs() { } - /** - * Determine the correct sequenceId for this run by finding the sequenceId assigned to previous pids + * Determine the correct sequenceId for this run by finding the sequenceId + * assigned to previous pids * in this obsolescence chain. *

- * Evaluate the sequenceId of all runs in the DataONE obsolescence chain and determine the correct - * seriedId to use for the current pid. If one doesn't exist, then create a new one. + * Evaluate the sequenceId of all runs in the DataONE obsolescence chain and + * determine the correct + * seriedId to use for the current pid. If one doesn't exist, then create a new + * one. *

* * @throws Exception @@ -41,11 +44,15 @@ public String getSequenceId() { /** * Get the next run in a series, recursively *

- * Recursion ends when either the next run is not available (not in store) or the pointer to the + * Recursion ends when either the next run is not available (not in store) or + * the pointer to the * next pid in the chain isn't specified (we are at the end of the chain). *

- * @param metadataId The identifier of the metadata document associated with the report - * @param suiteId The identifier for the suite used to score the metadata + * + * @param metadataId The identifier of the metadata document associated + * with the report + * @param suiteId The identifier for the suite used to score the + * metadata * @param stopWhenSIfound * @param store * @param forward @@ -53,7 +60,8 @@ public String getSequenceId() { * @throws Exception */ - public void getNextRun(String metadataId, String suiteId, Boolean stopWhenSIfound, MDQStore store, Boolean forward, Integer level) { + public void getNextRun(String metadataId, String suiteId, Boolean stopWhenSIfound, MDQStore store, Boolean forward, + Integer level) { Run run = null; String obsoletedBy = null; @@ -70,10 +78,11 @@ public void getNextRun(String metadataId, String suiteId, Boolean stopWhenSIfoun run = runs.get(metadataId); // If not in the collection, see if it is in the store - if(run == null) { + if (run == null) { try { log.debug("Run for pid: " + metadataId + " not in collection, getting from store."); - // the 'store.getRun' returns null if the run isn't found in the store. This will typically happen + // the 'store.getRun' returns null if the run isn't found in the store. This + // will typically happen // if the next pid in the chain hasn't been cataloged. run = store.getRun(metadataId, suiteId); } catch (MetadigException me) { @@ -83,18 +92,21 @@ public void getNextRun(String metadataId, String suiteId, Boolean stopWhenSIfoun } } - // If a run was found for this pid in the chain, check if a sequence id was previously - // defined for it. We want to use the same sequence id for all pids in the chain, right? - if(run != null) { + // If a run was found for this pid in the chain, check if a sequence id was + // previously + // defined for it. We want to use the same sequence id for all pids in the + // chain, right? + if (run != null) { // get sequence id for this run sequenceId = run.getSequenceId(); // End recursion if the sequence id for this chain has been found. - if(sequenceId != null) { + if (sequenceId != null) { SIfound = true; // Has the sequence id for the collection been defined yet? - if(this.sequenceId != null) { - if(! 
this.sequenceId.equals(sequenceId)) { - log.error("Warning, new sequenceId found for chain: " + sequenceId + " found at pid: " + metadataId); + if (this.sequenceId != null) { + if (!this.sequenceId.equals(sequenceId)) { + log.error("Warning, new sequenceId found for chain: " + sequenceId + " found at pid: " + + metadataId); } } else { // We got the right sequence id for this chain @@ -103,15 +115,17 @@ public void getNextRun(String metadataId, String suiteId, Boolean stopWhenSIfoun } } - if(stopWhenSIfound) - if(SIfound) return; + if (stopWhenSIfound) + if (SIfound) + return; // The termination tests have passed, add this run to the collection this.addRun(metadataId, run); - // Get the sysmeta object within the run, to retrieve the 'obsoletes' or 'obsoletedBy' pid + // Get the sysmeta object within the run, to retrieve the 'obsoletes' or + // 'obsoletedBy' pid sysmetaModel = run.getSysmeta(); - if(sysmetaModel == null) { + if (sysmetaModel == null) { log.error("Missing sysmeta model for run with id: " + run.getObjectIdentifier()); return; } @@ -119,9 +133,9 @@ public void getNextRun(String metadataId, String suiteId, Boolean stopWhenSIfoun if (forward) { log.debug("Checking for next forward pid (obsoletedBy)"); obsoletedBy = sysmetaModel.getObsoletedBy(); - if(obsoletedBy != null) { + if (obsoletedBy != null) { // Check for an invalid obsoletedBy - pid links to itself - if(obsoletedBy.compareToIgnoreCase(metadataId) == 0) { + if (obsoletedBy.compareToIgnoreCase(metadataId) == 0) { log.debug("Stopping traversal at invalid obsoletedBy, pid " + metadataId + " obsoletes itself"); return; } @@ -134,17 +148,18 @@ public void getNextRun(String metadataId, String suiteId, Boolean stopWhenSIfoun // Moving in the backward direction, get the next pid in the chain log.debug("Checking for next backward pid (obsoletes)"); obsoletes = sysmetaModel.getObsoletes(); - if(obsoletes != null) { + if (obsoletes != null) { // Check for an invalid obsoletedBy - pid links to itself - if(obsoletes.compareToIgnoreCase(metadataId) == 0) { - log.debug("Stopping traversal at invalid obsoletes, pid " + metadataId + " is obsoleted by itself"); + if (obsoletes.compareToIgnoreCase(metadataId) == 0) { + log.debug("Stopping traversal at invalid obsoletes, pid " + metadataId + + " is obsoleted by itself"); return; } log.debug("traversing backward to obsoletes: " + obsoletes); getNextRun(obsoletes, suiteId, stopWhenSIfound, store, forward, level); } else { // Have we reached the first run in the sequence (not obsoleted by any pid)? - if(run.getObsoletes() == null) { + if (run.getObsoletes() == null) { this.foundFirstPid = true; log.debug("Found first pid in sequence: " + run.getObjectIdentifier()); firstPidInSequence = run.getObjectIdentifier(); @@ -154,7 +169,8 @@ public void getNextRun(String metadataId, String suiteId, Boolean stopWhenSIfoun } } else { // The run was null, recursion in the current direction ends. - log.debug("Run not found in store for pid: " + metadataId + ", suiteId: " + suiteId + ", terminating search in current direction."); + log.debug("Run not found in store for pid: " + metadataId + ", suiteId: " + suiteId + + ", terminating search in current direction."); } return; @@ -163,73 +179,90 @@ public void getNextRun(String metadataId, String suiteId, Boolean stopWhenSIfoun /** * Get runs for all pids in a DataONE obsolescence chain from the DataStore *

- * Successive versions of a metadata document are represented in DataONE as pids comprising an obsolecence chain (i.e. linked list) or sequence, + * Successive versions of a metadata document are represented in DataONE as pids + * comprising an obsolecence chain (i.e. linked list) or sequence, * where new versions 'obsolete' older, outdated ones. - * This method follows the obsolecence chain to retrieve all runs corresponding to pids in the obsolescence chain. - * The least number of pids to get will be for all versions that are within the current month of the starting pid. - * If stopWhenSIfound is specified, the search will continue until the sequenceId is found, or the entire chain is + * This method follows the obsolecence chain to retrieve all runs corresponding + * to pids in the obsolescence chain. + * The least number of pids to get will be for all versions that are within the + * current month of the starting pid. + * If stopWhenSIfound is specified, the search will continue until the + * sequenceId is found, or the entire chain is * fetched. * - * A new sequenceId can only be assinged to the first version in a chain. If pids are processed out of 'dateUploaded' order, - * i.e. due to multi-processing, there may be 'breaks' in the chain and pids may temporarily be without sequenceIds. When a - * new pid is added, a check will be made to re-link pids back to the chain by traversing backward until a sequenceId is - * found, then previously saved pids with no sequenceId will have the correct one assinged. + * A new sequenceId can only be assinged to the first version in a chain. If + * pids are processed out of 'dateUploaded' order, + * i.e. due to multi-processing, there may be 'breaks' in the chain and pids may + * temporarily be without sequenceIds. When a + * new pid is added, a check will be made to re-link pids back to the chain by + * traversing backward until a sequenceId is + * found, then previously saved pids with no sequenceId will have the correct + * one assinged. *

* - * @param run The starting run - get all pids in this runs obsolesence chain + * @param run The starting run - get all pids in this runs obsolesence chain * @param suiteId The metadig-engine suite id of the suite to match * @throws Exception */ public void getRunSequence(Run run, String suiteId, Boolean stopWhenSIfound) throws MetadigException { - boolean persist = true; - MDQStore store = StoreFactory.getStore(persist); - Boolean forward; - String metadataId = run.getObjectIdentifier(); - this.sequenceId = null; - - // Keep track of the current recursion level - Integer level = 0; - - // Convert input date string to JodaTime - DateTime targetDate = new DateTime(run.getDateUploaded()); - // get target month start - DateTime minDate = targetDate.withDayOfMonth(1); - // get target month end - DateTime maxDate = targetDate.plusMonths(1).withDayOfMonth(1).minusDays(1); - - // Start the traversal in the backward direction - log.debug("Getting all runs (backward) for suiteId: " + suiteId + ", metadataId: " + metadataId + ", minDate: " + minDate + ", " + maxDate); - forward = false; - getNextRun(metadataId, suiteId, stopWhenSIfound, store, forward, level); - - // If the sequenceId has not been obtained when searching in the backward direction, then there - // is no reason to search forward, as this means that the first pid in the series has not been found, - // it will not be found searching forward. - if(getSequenceId() == null && !foundFirstPid) { - log.debug("Unable to find sequenceId for this sequence, will not search in forward direction."); - return; + try (DatabaseStore store = new DatabaseStore()) { + + Boolean forward; + String metadataId = run.getObjectIdentifier(); + this.sequenceId = null; + + // Keep track of the current recursion level + Integer level = 0; + + // Convert input date string to JodaTime + DateTime targetDate = new DateTime(run.getDateUploaded()); + // get target month start + DateTime minDate = targetDate.withDayOfMonth(1); + // get target month end + DateTime maxDate = targetDate.plusMonths(1).withDayOfMonth(1).minusDays(1); + + // Start the traversal in the backward direction + log.debug("Getting all runs (backward) for suiteId: " + suiteId + ", metadataId: " + metadataId + + ", minDate: " + + minDate + ", " + maxDate); + forward = false; + getNextRun(metadataId, suiteId, stopWhenSIfound, store, forward, level); + + // If the sequenceId has not been obtained when searching in the backward + // direction, then there + // is no reason to search forward, as this means that the first pid in the + // series has not been found, + // it will not be found searching forward. 
+ if (getSequenceId() == null && !foundFirstPid) { + log.debug("Unable to find sequenceId for this sequence, will not search in forward direction."); + return; + } + // Continue traversal in the forward direction, if necessary + log.debug("Getting all runs (forward) for suiteId: " + suiteId + ", metadataId: " + metadataId); + forward = true; + getNextRun(metadataId, suiteId, stopWhenSIfound, store, forward, level); + + log.debug("Shutting down store"); + log.debug("Done getting all runs (in DataONE obsolescence chain) for : metadata PID: " + metadataId + + ", suite id: " + suiteId); } - // Continue traversal in the forward direction, if necessary - log.debug("Getting all runs (forward) for suiteId: " + suiteId + ", metadataId: " + metadataId); - forward = true; - getNextRun(metadataId, suiteId, stopWhenSIfound, store, forward, level); - - log.debug("Shutting down store"); - store.shutdown(); - log.debug("Done getting all runs (in DataONE obsolescence chain) for : metadata PID: " + metadataId + ", suite id: " + suiteId); } /** * Update each run in an obsolescence sequence with a new sequenceId. *

- * The runs in a sequence have already been fetched via getRunsInSeq, but some of them may not - * have been assigned a sequenceId, i.e. possibly due to broken chains that have now been joined. Check - * each pid in the sequence and assign it the sequenceId if it doesn't have it already. + * The runs in a sequence have already been fetched via getRunsInSeq, but + * some of them may not + * have been assigned a sequenceId, i.e. possibly due to broken chains that have + * now been joined. Check + * each pid in the sequence and assign it the sequenceId if it doesn't have it + * already. *

* - * @param sequenceId a quality engine maintained sequence identifier, similiar in function to the DataONE series id. + * @param sequenceId a quality engine maintained sequence identifier, similiar + * in function to the DataONE series id. */ public void updateSequenceId(String sequenceId) { @@ -247,11 +280,11 @@ public void updateSequenceId(String sequenceId) { pid = (String) entry.getKey(); run = (Run) entry.getValue(); thisSeqId = run.getSequenceId(); - if(thisSeqId != null && thisSeqId.equals(sequenceId)) { + if (thisSeqId != null && thisSeqId.equals(sequenceId)) { continue; } - if(thisSeqId != null && ! thisSeqId.equals(sequenceId)) { + if (thisSeqId != null && !thisSeqId.equals(sequenceId)) { log.error("Multiple sequence ids found in one sequence chain for pid: " + run.getObjectIdentifier()); } log.debug("Updating sequence id for pid: " + run.getObjectIdentifier() + " to id: " + sequenceId); @@ -265,7 +298,7 @@ public void updateSequenceId(String sequenceId) { */ public void addRun(String metadataPid, Run run) { - if(! this.runs.containsKey(metadataPid)) { + if (!this.runs.containsKey(metadataPid)) { log.trace("Adding run for pid: " + metadataPid); this.runs.put(metadataPid, run); } @@ -282,11 +315,14 @@ public String generateId() { } /** - *

Determine which run in this obsolecense sequence is the latest in the month, give a date.

+ *

+ * Determine which run in this obsolecense sequence is the latest in the month, + * give a date. + *

* * @param date the date to use for comparison */ - public void setLatestRunInMonth (Date date) { + public void setLatestRunInMonth(Date date) { Run run = null; @@ -312,23 +348,26 @@ public void setLatestRunInMonth (Date date) { run = entry.getValue(); dateUploaded = new DateTime(run.getDateUploaded()); latestSet = run.getIsLatest(); - log.debug("Checking run with pid: " + thisPid + ", dateUploaded: " + dateUploaded + ", isLatest: " + latestSet); + log.debug("Checking run with pid: " + thisPid + ", dateUploaded: " + dateUploaded + ", isLatest: " + + latestSet); // Don't consider this run if it is outside the target month - if(dateUploaded.isBefore(minDate) || dateUploaded.isAfter(maxDate)) { + if (dateUploaded.isBefore(minDate) || dateUploaded.isAfter(maxDate)) { log.debug("Skipping out of date range pid: " + run.getObjectIdentifier()); continue; } // Update the run for the latest pid. - if(thisPid.equalsIgnoreCase(latestPid)) { - log.info("Setting latest run in month to pid: " + run.getObjectIdentifier() + " with date: " + run.getDateUploaded()); + if (thisPid.equalsIgnoreCase(latestPid)) { + log.info("Setting latest run in month to pid: " + run.getObjectIdentifier() + " with date: " + + run.getDateUploaded()); run.setIsLatest(true); run.setModified(true); // Update the collection with this updated run this.runs.replace(thisPid, run); } else { // Not the latest pid, 'unmark' it if needed - if(latestSet) { - log.info("Unsetting latest run in month to pid: " + run.getObjectIdentifier() + " with date: " + run.getDateUploaded()); + if (latestSet) { + log.info("Unsetting latest run in month to pid: " + run.getObjectIdentifier() + " with date: " + + run.getDateUploaded()); run.setIsLatest(false); run.setModified(true); // Update the collection with this updated run @@ -340,11 +379,15 @@ public void setLatestRunInMonth (Date date) { } /** - *

Return the run from the collection that is the latest in the given month. The - * field 'dateUploaded' is used for comparision. Since all runs in the collection are - * in the same DateONE obsolecense chain, returning the 'latest' in a given month + *

+ * Return the run from the collection that is the latest in the given month. The + * field 'dateUploaded' is used for comparision. Since all runs in the + * collection are + * in the same DateONE obsolecense chain, returning the 'latest' in a given + * month * should correspond to the pid with the highest metadata quality score. - * Finding this run (pid) can be useful to aggregation and display routines to filter out + * Finding this run (pid) can be useful to aggregation and display routines to + * filter out * the 'best' runs in a month, to more accurately represent the progression of * quality scores over time. *

@@ -355,7 +398,7 @@ public void setLatestRunInMonth (Date date) { * * @return Run - the latest run in the month */ - public Run getLatestRun(DateTime targetDate, DateTime minDate, DateTime maxDate) { + public Run getLatestRun(DateTime targetDate, DateTime minDate, DateTime maxDate) { Run run = null; @@ -365,7 +408,7 @@ public Run getLatestRun(DateTime targetDate, DateTime minDate, DateTime maxDate DateTime thisDate = null; String thisPid = null; - //DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyy MM dd"); + // DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyy MM dd"); log.debug("Getting latest pid in date range " + minDate.toString() + " to " + maxDate.toString()); // Loop through each run, find latest in the month specified @@ -376,7 +419,7 @@ public Run getLatestRun(DateTime targetDate, DateTime minDate, DateTime maxDate Date entryDate = run.getDateUploaded(); // This run doesn't contain a sysmeta and so doesn't have a date. This probably // should never happen but just in case... - if(entryDate == null) { + if (entryDate == null) { log.debug("Run pid: " + thisPid + ", date is null"); continue; } else { @@ -386,11 +429,11 @@ public Run getLatestRun(DateTime targetDate, DateTime minDate, DateTime maxDate } // Is this current month - if(thisDate.isBefore(minDate)) { + if (thisDate.isBefore(minDate)) { log.debug("Skipping pid: " + thisPid + " with date: " + entryDate + " (after end of month)"); continue; } - if(thisDate.isAfter(maxDate)) { + if (thisDate.isAfter(maxDate)) { log.debug("Skipping pid: " + thisPid + " with date: " + entryDate + " before start of month"); continue; } @@ -399,7 +442,7 @@ public Run getLatestRun(DateTime targetDate, DateTime minDate, DateTime maxDate // a new 'leader'. This assumes that the newer pid in this sequence // is actually 'better' than the previous one. // Have to check for date equals in case this is the only pid in the sequence - if(thisDate.isAfter(targetDate) || thisDate.isEqual(targetDate)) { + if (thisDate.isAfter(targetDate) || thisDate.isEqual(targetDate)) { log.trace("Setting latest pid: " + thisPid + ", date: " + thisDate); latestPid = thisPid; latestDate = thisDate; @@ -411,7 +454,6 @@ public Run getLatestRun(DateTime targetDate, DateTime minDate, DateTime maxDate return this.runs.get(latestPid); } - /** * Set the sequence identifier for the run sequence. * @@ -419,7 +461,7 @@ public Run getLatestRun(DateTime targetDate, DateTime minDate, DateTime maxDate * */ - public void setSequenceId(String sequenceId) { + public void setSequenceId(String sequenceId) { this.sequenceId = sequenceId; } @@ -448,14 +490,13 @@ public ArrayList getModifiedRuns() { for (Map.Entry entry : this.runs.entrySet()) { thisPid = entry.getKey(); run = entry.getValue(); - if(run.getModified()) { + if (run.getModified()) { modRuns.add(run); } } return modRuns; } - /** * Update all modified runs in the collection to the datastore. 
* @@ -473,14 +514,15 @@ public void update() { thisPid = entry.getKey(); run = entry.getValue(); modified = run.getModified(); - if(modified) { + if (modified) { try { log.debug("Updating modified quality run for pid: " + run.getObjectIdentifier() + ", suite: " - + run.getSuiteId() + ", dateUploaded: " + run.getDateUploaded() + ", sequenceId: " - + run.getSequenceId() + ", isLatest: " + run.getIsLatest()); + + run.getSuiteId() + ", dateUploaded: " + run.getDateUploaded() + ", sequenceId: " + + run.getSequenceId() + ", isLatest: " + run.getIsLatest()); run.save(); - // Keep modified setting for modified runs in case we need to do other operations on the runs (e.g. indexing) - //this.runs.replace(thisPid, run); + // Keep modified setting for modified runs in case we need to do other + // operations on the runs (e.g. indexing) + // this.runs.replace(thisPid, run); } catch (Exception ex) { log.error("Unable to save the quality report to database:" + ex.getMessage()); } @@ -489,10 +531,13 @@ public void update() { } /** - * If pids are missing in the current obsolesence chain, then a traversal may not reach the starting pid, i.e. - * the very first one in the chain. This must be determined, as we only assign a sequenceId when the first pids is found. + * If pids are missing in the current obsolesence chain, then a traversal may + * not reach the starting pid, i.e. + * the very first one in the chain. This must be determined, as we only assign a + * sequenceId when the first pids is found. * - * @return foundFirstPid - was the starting pid in this obsolesense chain reached? + * @return foundFirstPid - was the starting pid in this obsolesense chain + * reached? */ public Boolean getFoundFirstPid() { return this.foundFirstPid; diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/model/Run.java b/src/main/java/edu/ucsb/nceas/mdqengine/model/Run.java index cd85f705..d45a33da 100644 --- a/src/main/java/edu/ucsb/nceas/mdqengine/model/Run.java +++ b/src/main/java/edu/ucsb/nceas/mdqengine/model/Run.java @@ -1,6 +1,7 @@ package edu.ucsb.nceas.mdqengine.model; import edu.ucsb.nceas.mdqengine.exception.MetadigException; +import edu.ucsb.nceas.mdqengine.store.DatabaseStore; import edu.ucsb.nceas.mdqengine.store.MDQStore; import edu.ucsb.nceas.mdqengine.store.StoreFactory; import org.apache.commons.configuration2.ex.ConfigurationException; @@ -266,32 +267,27 @@ public Integer getRunCount() { */ public void save() throws MetadigException { - boolean persist = true; - MDQStore store = StoreFactory.getStore(persist); + try (DatabaseStore store = new DatabaseStore()) { - log.debug("Saving to persistent storage: metadata PID: " + this.getObjectIdentifier() + ", suite id: " - + this.getSuiteId()); + log.debug("Saving to persistent storage: metadata PID: " + this.getObjectIdentifier() + ", suite id: " + + this.getSuiteId()); - try { - store.saveRun(this); - } catch (MetadigException me) { - log.debug("Error saving run: " + me.getCause()); - if (me.getCause() instanceof SQLException) { - log.debug("Retrying saveRun() due to error"); - store.renew(); + try { store.saveRun(this); - } else { - throw (me); + log.debug("Done saving to persistent storage: metadata PID: " + this.getObjectIdentifier() + + ", suite id: " + + this.getSuiteId()); + } catch (MetadigException me) { + log.debug("Error saving run: " + me.getCause()); + if (me.getCause() instanceof SQLException) { + log.debug("Retrying saveRun() due to error"); + store.renew(); + store.saveRun(this); + } else { + throw (me); + } } } - - // Note that when the 
connection pooler 'pgbouncer' is used, closing the - // connection actually just returns - // the connection to the pool that pgbouncer maintains. - log.debug("Shutting down store"); - store.shutdown(); - log.debug("Done saving to persistent storage: metadata PID: " + this.getObjectIdentifier() + ", suite id: " - + this.getSuiteId()); } /** @@ -306,27 +302,25 @@ public void save() throws MetadigException { */ public static Run getRun(String metadataId, String suiteId) throws MetadigException, IOException, ConfigurationException { - boolean persist = true; - MDQStore store = StoreFactory.getStore(persist); - - log.debug("Getting run for suiteId: " + suiteId + ", metadataId: " + metadataId); - - Run run = null; - try { - run = store.getRun(metadataId, suiteId); - } catch (MetadigException me) { - log.debug("Error getting run: " + me.getCause()); - if (me.getCause() instanceof SQLException) { - log.debug("Retrying getRun() due to error"); - store.renew(); - store.getRun(metadataId, suiteId); - } else { - throw (me); + + try (DatabaseStore store = new DatabaseStore()) { + + log.debug("Getting run for suiteId: " + suiteId + ", metadataId: " + metadataId); + + Run run = null; + try { + run = store.getRun(metadataId, suiteId); + } catch (MetadigException me) { + log.debug("Error getting run: " + me.getCause()); + if (me.getCause() instanceof SQLException) { + log.debug("Retrying getRun() due to error"); + store.renew(); + store.getRun(metadataId, suiteId); + } else { + throw (me); + } } + return run; } - log.debug("Shutting down store"); - store.shutdown(); - log.debug("Done getting from persistent storage: metadata PID: " + metadataId + ", suite id: " + suiteId); - return run; } } diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/MonitorJob.java b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/MonitorJob.java index 9ecda039..b9a0b851 100644 --- a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/MonitorJob.java +++ b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/MonitorJob.java @@ -67,109 +67,109 @@ public MonitorJob() { */ public void execute(JobExecutionContext context) throws JobExecutionException { - MDQStore store = null; List processing = new ArrayList(); // Get a connection to the database - try { - store = new DatabaseStore(); - } catch (MetadigStoreException e) { - e.printStackTrace(); - JobExecutionException jee = new JobExecutionException("Cannot create store, unable to schedule job", e); - throw jee; - } + try (DatabaseStore store = new DatabaseStore()) { - if (!store.isAvailable()) { + if (!store.isAvailable()) { + try { + store.renew(); + } catch (MetadigStoreException e) { + e.printStackTrace(); + JobExecutionException jee = new JobExecutionException("Cannot renew store, unable to schedule job", + e); + throw jee; + } + } + + // query database try { - store.renew(); + processing = store.listInProcessRuns(); } catch (MetadigStoreException e) { - e.printStackTrace(); - JobExecutionException jee = new JobExecutionException("Cannot renew store, unable to schedule job", e); + JobExecutionException jee = new JobExecutionException( + "Monitor: Error getting in process runs from store.", + e); throw jee; } - } - - // query database - try { - processing = store.listInProcessRuns(); - } catch (MetadigStoreException e) { - JobExecutionException jee = new JobExecutionException("Monitor: Error getting in process runs from store.", - e); - throw jee; - } - - if (processing.isEmpty()) { // if no stuck jobs are found go ahead and exit - log.info("Monitor: No stuck jobs found."); - 
return; - } - - // get a session - Session session = null; - try { - session = getSession(); - } catch (MetadigException me) { - JobExecutionException jee = new JobExecutionException("Could not connect to a DataONE session." + me); - jee.setRefireImmediately(true); - throw jee; - } - - // request job via rabbitMQ - for (Run run : processing) { - log.info("Requesting monitor job: " + run.getObjectIdentifier() + ", " + run.getNodeId()); - String suiteId = run.getSuiteId(); - String pidStr = run.getObjectIdentifier(); - String nodeId = run.getNodeId(); - InputStream metadata = null; - InputStream sysmeta = null; - - try { - metadata = getMetadata(run, session, store); - } catch (MetadigException me) { - JobExecutionException jee = new JobExecutionException(me); - jee.setRefireImmediately(true); - log.error("Problem getting metadata:" + me.getMessage()); - continue; // the run will be refired immediately, continue to next run - } catch (ConfigurationException ce) { - JobExecutionException jee = new JobExecutionException(ce); - jee.setRefireImmediately(false); - log.error("Configuration error:" + ce.getMessage()); - continue; // the run will NOT be refired immediately, continue to next run + if (processing.isEmpty()) { // if no stuck jobs are found go ahead and exit + log.info("Monitor: No stuck jobs found."); + return; } + // get a session + Session session = null; try { - sysmeta = getSystemMetadata(run, session, store); + session = getSession(); } catch (MetadigException me) { - JobExecutionException jee = new JobExecutionException(me); + JobExecutionException jee = new JobExecutionException("Could not connect to a DataONE session." + me); jee.setRefireImmediately(true); - log.error("Problem getting metadata:" + me.getMessage()); - continue; // the run will be refired immediately, continue to next run - } catch (ConfigurationException ce) { - JobExecutionException jee = new JobExecutionException(ce); - jee.setRefireImmediately(false); - log.error("Configuration error:" + ce.getMessage()); - continue; // the run will NOT be refired immediately, continue to next run + throw jee; } - if (metadata == null | sysmeta == null) { // any case where the metadata or sysmeta should be thrown above - log.error("Monitor: Aborting run - Metadata or system metadata not found for " + pidStr); - continue; - } + // request job via rabbitMQ + for (Run run : processing) { + log.info("Requesting monitor job: " + run.getObjectIdentifier() + ", " + run.getNodeId()); - String localFilePath = null; - DateTime requestDateTime = new DateTime(DateTimeZone.forOffsetHours(-7)); - try { - controller.processQualityRequest(nodeId, pidStr, metadata, suiteId, - localFilePath, requestDateTime, - sysmeta); - } catch (IOException io) { - JobExecutionException jee = new JobExecutionException("Monitor: Error processing quality request."); - jee.initCause(io); - throw jee; - } + String suiteId = run.getSuiteId(); + String pidStr = run.getObjectIdentifier(); + String nodeId = run.getNodeId(); + InputStream metadata = null; + InputStream sysmeta = null; + + try { + metadata = getMetadata(run, session, store); + } catch (MetadigException me) { + JobExecutionException jee = new JobExecutionException(me); + jee.setRefireImmediately(true); + log.error("Problem getting metadata:" + me.getMessage()); + continue; // the run will be refired immediately, continue to next run + } catch (ConfigurationException ce) { + JobExecutionException jee = new JobExecutionException(ce); + jee.setRefireImmediately(false); + log.error("Configuration error:" 
+ ce.getMessage()); + continue; // the run will NOT be refired immediately, continue to next run + } + + try { + sysmeta = getSystemMetadata(run, session, store); + } catch (MetadigException me) { + JobExecutionException jee = new JobExecutionException(me); + jee.setRefireImmediately(true); + log.error("Problem getting metadata:" + me.getMessage()); + continue; // the run will be refired immediately, continue to next run + } catch (ConfigurationException ce) { + JobExecutionException jee = new JobExecutionException(ce); + jee.setRefireImmediately(false); + log.error("Configuration error:" + ce.getMessage()); + continue; // the run will NOT be refired immediately, continue to next run + } + + if (metadata == null | sysmeta == null) { // any case where the metadata or sysmeta should be thrown + // above + log.error("Monitor: Aborting run - Metadata or system metadata not found for " + pidStr); + continue; + } + String localFilePath = null; + DateTime requestDateTime = new DateTime(DateTimeZone.forOffsetHours(-7)); + try { + controller.processQualityRequest(nodeId, pidStr, metadata, suiteId, + localFilePath, requestDateTime, + sysmeta); + } catch (IOException io) { + JobExecutionException jee = new JobExecutionException("Monitor: Error processing quality request."); + jee.initCause(io); + throw jee; + } + + } + } catch (MetadigStoreException e) { + e.printStackTrace(); + JobExecutionException jee = new JobExecutionException("Cannot create store, unable to schedule job", e); + throw jee; } - store.shutdown(); } /** diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/NodeList.java b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/NodeList.java index 5eecc2cd..710c3eb4 100644 --- a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/NodeList.java +++ b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/NodeList.java @@ -50,7 +50,8 @@ public class NodeList implements Job { * the Job. *

* - * @throws JobExecutionException if there is an exception while executing the job. + * @throws JobExecutionException if there is an exception while executing the + * job. */ public void execute(JobExecutionContext context) throws JobExecutionException { @@ -70,7 +71,7 @@ public void execute(JobExecutionContext context) try { MDQconfig cfg = new MDQconfig(); String nodeAbbr = nodeId.replace("urn:node:", ""); - // TODO: Cache the node values from the CN listNode service + // TODO: Cache the node values from the CN listNode service nodeServiceUrl = cfg.getString(nodeAbbr + ".serviceUrl"); } catch (ConfigurationException | IOException ce) { JobExecutionException jee = new JobExecutionException(taskName + ": error executing task."); @@ -102,67 +103,69 @@ public void execute(JobExecutionContext context) } // Get a connection to the database - MDQStore store = null; - try { - store = new DatabaseStore(); - } catch (Exception e) { - e.printStackTrace(); - throw new JobExecutionException(taskName + ": cannot create store, unable to schedule job", e); - } - - if (!store.isAvailable()) { - try { - store.renew(); - } catch (MetadigStoreException e) { - e.printStackTrace(); - throw new JobExecutionException(taskName + ": cannot renew store, unable to schedule job", e); + try (DatabaseStore store = new DatabaseStore()) { + // TODO: consider removing this if block? seems like it will never be hit + if (!store.isAvailable()) { + try { + store.renew(); + } catch (MetadigStoreException e) { + e.printStackTrace(); + throw new JobExecutionException(taskName + ": cannot renew store, unable to schedule job", e); + } } - } - Property property = null; - ArrayList plist = null; - for (Node node : nodeList.getNodeList()) { - log.debug("node: " + node.getName()); - log.debug("type: " + node.getType().toString()); - log.debug("id: " + node.getIdentifier().getValue()); - log.debug("state: " + node.getState().toString()); - log.debug("is synchonized: " + node.isSynchronize()); - - if (! 
node.isSynchronize()) { - log.debug(taskName + ": Skipping unsynchronized node " + node.getIdentifier().getValue()); - continue; - } else if (node.getType().toString().equalsIgnoreCase("MN")) { - log.debug(taskName + ": saving node " + node.getIdentifier().getValue()); - try { - store.saveNode(node); - } catch (MetadigStoreException mse) { - mse.printStackTrace(); - throw new JobExecutionException("Cannot save node " + node.getIdentifier().getValue() + " to store", mse); + Property property = null; + ArrayList plist = null; + for (Node node : nodeList.getNodeList()) { + log.debug("node: " + node.getName()); + log.debug("type: " + node.getType().toString()); + log.debug("id: " + node.getIdentifier().getValue()); + log.debug("state: " + node.getState().toString()); + log.debug("is synchonized: " + node.isSynchronize()); + + if (!node.isSynchronize()) { + log.debug(taskName + ": Skipping unsynchronized node " + node.getIdentifier().getValue()); + continue; + } else if (node.getType().toString().equalsIgnoreCase("MN")) { + log.debug(taskName + ": saving node " + node.getIdentifier().getValue()); + try { + store.saveNode(node); + } catch (MetadigStoreException mse) { + mse.printStackTrace(); + throw new JobExecutionException( + "Cannot save node " + node.getIdentifier().getValue() + " to store", + mse); + } + } else { + log.debug(taskName + ": skipping CN node: " + node.getIdentifier().getValue()); } - } else { - log.debug(taskName + ": skipping CN node: " + node.getIdentifier().getValue()); } - } - // For debugging purposes: retrieve and print out all node entries if trace logging is enabled. - if (log.isTraceEnabled()) { - log.trace("Retrieving and printing out all saved node harvest dates..."); + // For debugging purposes: retrieve and print out all node entries if trace + // logging is enabled. 
+ if (log.isTraceEnabled()) { + log.trace("Retrieving and printing out all saved node harvest dates..."); - ArrayList nodes = store.getNodes(); - for (Node node : nodes) { - log.trace("identifier: " + node.getIdentifier().getValue()); + ArrayList nodes = store.getNodes(); + for (Node node : nodes) { + log.trace("identifier: " + node.getIdentifier().getValue()); - DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"); - dateFormat.setTimeZone(TimeZone.getTimeZone("GMT")); - String lastHarvestDatetimeStr = dateFormat.format(node.getSynchronization().getLastHarvested()); + DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"); + dateFormat.setTimeZone(TimeZone.getTimeZone("GMT")); + String lastHarvestDatetimeStr = dateFormat.format(node.getSynchronization().getLastHarvested()); - log.trace("harvest: " + lastHarvestDatetimeStr); - log.trace("synchronize: " + node.isSynchronize()); - log.trace("state: " + node.getState().toString()); - log.trace("baseURL: " + node.getBaseURL()); + log.trace("harvest: " + lastHarvestDatetimeStr); + log.trace("synchronize: " + node.isSynchronize()); + log.trace("state: " + node.getState().toString()); + log.trace("baseURL: " + node.getBaseURL()); + } } + + } catch (Exception e) { + e.printStackTrace(); + throw new JobExecutionException(taskName + ": cannot create store, unable to schedule job", e); } } -} +} diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestReportJob.java b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestReportJob.java index 3a9c3639..9a99209c 100644 --- a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestReportJob.java +++ b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestReportJob.java @@ -55,19 +55,25 @@ public class RequestReportJob implements Job { private Log log = LogFactory.getLog(RequestReportJob.class); class ListResult { - // The total result count for all object types returned from DataONE. This is the count of all object types - // that were retrieved for a given request. The DataONE 'listObjects' service does provide - // parameters to filter by formatId wildcard, so we have to retrieve all pids for a time range + // The total result count for all object types returned from DataONE. This is + // the count of all object types + // that were retrieved for a given request. The DataONE 'listObjects' service + // does provide + // parameters to filter by formatId wildcard, so we have to retrieve all pids + // for a time range // and filter the result list. private Integer totalResultCount = 0; // The filtered result count returned from DataONE. // The DataONE listObjects service returns all new pids for all formatIds - // but we are typically only interested in a subset of those, i.e. EML metadata pids, - // so this is the count of pids from the result that we are actually interested in. + // but we are typically only interested in a subset of those, i.e. EML metadata + // pids, + // so this is the count of pids from the result that we are actually interested + // in. private Integer filteredResultCount = 0; private ArrayList result = new ArrayList<>(); - // The scheduler keeps track of the sysmeta 'dateSystemMetadataModified' of the last pid harvested, + // The scheduler keeps track of the sysmeta 'dateSystemMetadataModified' of the + // last pid harvested, // which will be used as the starting time of the next harvest. 
private DateTime lastDateModifiedDT = null; @@ -82,14 +88,22 @@ public ArrayList getResult() { void setTotalResultCount(Integer count) { this.totalResultCount = count; } - void setFilteredResultCount(Integer count) { this.filteredResultCount = count; } + + void setFilteredResultCount(Integer count) { + this.filteredResultCount = count; + } + void setLastDateModified(DateTime date) { this.lastDateModifiedDT = date; } - public Integer getTotalResultCount() { return this.totalResultCount; } + public Integer getTotalResultCount() { + return this.totalResultCount; + } - public Integer getFilteredResultCount() { return this.filteredResultCount; } + public Integer getFilteredResultCount() { + return this.filteredResultCount; + } public DateTime getLastDateModified() { return this.lastDateModifiedDT; @@ -119,14 +133,15 @@ public RequestReportJob() { * the Job. *

* - * @throws JobExecutionException if there is an exception while executing the job. + * @throws JobExecutionException if there is an exception while executing the + * job. */ public void execute(JobExecutionContext context) throws JobExecutionException { String qualityServiceUrl = null; - //Log log = LogFactory.getLog(RequestReportJob.class); + // Log log = LogFactory.getLog(RequestReportJob.class); JobKey key = context.getJobDetail().getKey(); JobDataMap dataMap = context.getJobDetail().getJobDataMap(); @@ -151,14 +166,14 @@ public void execute(JobExecutionContext context) qualityServiceUrl = cfg.getString("quality.serviceUrl"); dataOneAuthToken = System.getenv("DATAONE_AUTH_TOKEN"); if (dataOneAuthToken == null) { - dataOneAuthToken = cfg.getString("DataONE.authToken"); + dataOneAuthToken = cfg.getString("DataONE.authToken"); log.debug("Got token from properties file."); } else { log.debug("Got token from env."); } String nodeAbbr = nodeId.replace("urn:node:", ""); subjectId = cfg.getString(nodeAbbr + ".subjectId"); - // TODO: Cache the node values from the CN listNode service + // TODO: Cache the node values from the CN listNode service nodeServiceUrl = cfg.getString(nodeAbbr + ".serviceUrl"); } catch (ConfigurationException | IOException ce) { JobExecutionException jee = new JobExecutionException(taskName + ": error executing task."); @@ -182,233 +197,265 @@ public void execute(JobExecutionContext context) // Don't know node type yet from the id, so have to manually check if it's a CN Boolean isCN = DataONE.isCN(nodeServiceUrl); - if(isCN) { + if (isCN) { cnNode = new MultipartCNode(mrc, nodeServiceUrl, session); } else { mnNode = new MultipartMNode(mrc, nodeServiceUrl, session); } // Get a connection to the database - MDQStore store = null; - try { - store = new DatabaseStore(); - } catch (Exception e) { - e.printStackTrace(); - throw new JobExecutionException("Cannot create store, unable to schedule job", e); - } + try (DatabaseStore store = new DatabaseStore();) { - if(!store.isAvailable()) { - try { - store.renew(); - } catch (MetadigStoreException e) { - e.printStackTrace(); - throw new JobExecutionException("Cannot renew store, unable to schedule job", e); + if (!store.isAvailable()) { + try { + store.renew(); + } catch (MetadigStoreException e) { + e.printStackTrace(); + throw new JobExecutionException("Cannot renew store, unable to schedule job", e); + } } - } - ArrayList nodes = new ArrayList<>(); + ArrayList nodes = new ArrayList<>(); - /* If the CN is being harvested, then get all the nodes in the node db. The node db contains - info about all nodes registered with the CN. - */ - if (isCN) { - nodes = store.getNodes(); - } else { - Node node = store.getNode(nodeId); - if (node.getIdentifier().getValue() == null) { - String msg = ("Node entry not found for node: " + nodeId); - log.error(msg); - JobExecutionException jee = new JobExecutionException(msg); - jee.setRefireImmediately(false); - throw jee; + /* + * If the CN is being harvested, then get all the nodes in the node db. The node + * db contains + * info about all nodes registered with the CN. 
+ */ + if (isCN) { + nodes = store.getNodes(); } else { - log.trace("Got node " + node.getIdentifier().getValue()); - nodes.add(node); + Node node = store.getNode(nodeId); + if (node.getIdentifier().getValue() == null) { + String msg = ("Node entry not found for node: " + nodeId); + log.error(msg); + JobExecutionException jee = new JobExecutionException(msg); + jee.setRefireImmediately(false); + throw jee; + } else { + log.trace("Got node " + node.getIdentifier().getValue()); + nodes.add(node); + } } - } - /* Depending on the scheduled task, either process a single MN or if the task is for the CN, - process all nodes current registered with the CN. - */ - String harvestNodeId = null; - for (Node node : nodes) { + /* + * Depending on the scheduled task, either process a single MN or if the task is + * for the CN, + * process all nodes current registered with the CN. + */ + String harvestNodeId = null; + for (Node node : nodes) { + + harvestNodeId = node.getIdentifier().getValue(); + // If processing a CN, check each MN to see if it is being synchronized and if + // it + // is marked as up. + if (isCN) { + // The NodeList task doesn't save CN entries from the DataONE 'listNodes()' + // service, but check + // just in case. + if (node.getType().equals(NodeType.CN)) { + log.debug("Harvesting from CN, skipping CN entry from node list for " + + node.getIdentifier().getValue()); + continue; + } - harvestNodeId = node.getIdentifier().getValue(); - // If processing a CN, check each MN to see if it is being synchronized and if it - // is marked as up. - if (isCN) { - // The NodeList task doesn't save CN entries from the DataONE 'listNodes()' service, but check - // just in case. - if (node.getType().equals(NodeType.CN)) { - log.debug("Harvesting from CN, skipping CN entry from node list for " + node.getIdentifier().getValue()); - continue; - } + // Skip MN entries that have not been synchronized + if (!node.isSynchronize() || !node.getState().equals(NodeState.UP)) { + log.trace("Skipping disabled node: " + node.getIdentifier().getValue() + ", sync: " + + node.isSynchronize() + + ", status: " + node.getState().toString()); + continue; + } - // Skip MN entries that have not been synchronized - if (! node.isSynchronize() || ! node.getState().equals(NodeState.UP)) { - log.trace("Skipping disabled node: " + node.getIdentifier().getValue() + ", sync: " + node.isSynchronize() - + ", status: " + node.getState().toString()); - continue; - } + DateTime mnLastHarvestDT = new DateTime(node.getSynchronization().getLastHarvested(), + DateTimeZone.UTC); + DateTime oneMonthAgoDT = new DateTime(DateTimeZone.UTC).minusMonths(1); + + /* + * If an MN hasn't been harvested for a month, then skip it - we don't want to + * waste time contacting MNs that + * don't have new content. + */ + if (mnLastHarvestDT.isBefore(oneMonthAgoDT.toInstant())) { + DateTimeZone.setDefault(DateTimeZone.UTC); + DateTimeFormatter dtfOut = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"); + log.trace("Skipping node " + node.getIdentifier().getValue() + " that hasn't been sync'd since " + + dtfOut.print(mnLastHarvestDT)); + continue; + } - DateTime mnLastHarvestDT = new DateTime(node.getSynchronization().getLastHarvested(), DateTimeZone.UTC); - DateTime oneMonthAgoDT = new DateTime(DateTimeZone.UTC).minusMonths(1); - - /* If an MN hasn't been harvested for a month, then skip it - we don't want to waste time contacting MNs that - don't have new content. 
- */ - if (mnLastHarvestDT.isBefore(oneMonthAgoDT.toInstant())) { - DateTimeZone.setDefault(DateTimeZone.UTC); - DateTimeFormatter dtfOut = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"); - log.trace("Skipping node " + node.getIdentifier().getValue() + " that hasn't been sync'd since " + dtfOut.print(mnLastHarvestDT)); - continue; } - } - - log.trace("Harvesting node: " + node.getIdentifier().getValue()); - - // Set UTC as the default time zone for all DateTime operations. - // Get current datetime, which may be used for start time range. - DateTimeZone.setDefault(DateTimeZone.UTC); - DateTime currentDT = new DateTime(DateTimeZone.UTC); - DateTimeFormatter dtfOut = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"); - String lastHarvestDateStr = null; - - Task task; - task = store.getTask(taskName, taskType, harvestNodeId); - // If a 'task' entry has not been saved for this task name yet (i.e. this is an MN that has just been - // registerd with DataONE), then a 'lastHarvested' DataTime will not be available, in which case the - // 'startHarvestDataTime' from the config file will be used. - if (task.getLastHarvestDatetime(harvestNodeId) == null) { - task.setTaskName(taskName); - task.setTaskType(taskType); - lastHarvestDateStr = startHarvestDatetimeStr; - task.setLastHarvestDatetime(lastHarvestDateStr, harvestNodeId); - } else { - lastHarvestDateStr = task.getLastHarvestDatetime(harvestNodeId); - } - - DateTime lastHarvestDateDT = new DateTime(lastHarvestDateStr); - // Set the search start datetime to the last harvest datetime, unless it is in the - // future. (This can happen when the previous time range end was for the current day, - // as the end datetime range for the previous task run will have been stored as the - // new lastharvestDateTime. - DateTime startDT = null; - if (lastHarvestDateDT.isAfter(currentDT.toInstant())) { - startDT = currentDT; - } else { - startDT = new DateTime(lastHarvestDateDT); - } + log.trace("Harvesting node: " + node.getIdentifier().getValue()); + + // Set UTC as the default time zone for all DateTime operations. + // Get current datetime, which may be used for start time range. + DateTimeZone.setDefault(DateTimeZone.UTC); + DateTime currentDT = new DateTime(DateTimeZone.UTC); + DateTimeFormatter dtfOut = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"); + String lastHarvestDateStr = null; + + Task task; + task = store.getTask(taskName, taskType, harvestNodeId); + // If a 'task' entry has not been saved for this task name yet (i.e. this is an + // MN that has just been + // registerd with DataONE), then a 'lastHarvested' DataTime will not be + // available, in which case the + // 'startHarvestDataTime' from the config file will be used. + if (task.getLastHarvestDatetime(harvestNodeId) == null) { + task.setTaskName(taskName); + task.setTaskType(taskType); + lastHarvestDateStr = startHarvestDatetimeStr; + task.setLastHarvestDatetime(lastHarvestDateStr, harvestNodeId); + } else { + lastHarvestDateStr = task.getLastHarvestDatetime(harvestNodeId); + } - DateTime endDT = new DateTime(currentDT); + DateTime lastHarvestDateDT = new DateTime(lastHarvestDateStr); + // Set the search start datetime to the last harvest datetime, unless it is in + // the + // future. (This can happen when the previous time range end was for the current + // day, + // as the end datetime range for the previous task run will have been stored as + // the + // new lastharvestDateTime. 
+ DateTime startDT = null; + if (lastHarvestDateDT.isAfter(currentDT.toInstant())) { + startDT = currentDT; + } else { + startDT = new DateTime(lastHarvestDateDT); + } - // If the start and end harvest dates are the same (happens for a new node), then - // tweak the start so that DataONE listObjects doesn't complain. - if (startDT == endDT) { - startDT = startDT.minusMinutes(1); - } + DateTime endDT = new DateTime(currentDT); - // Track the sysmeta dateUploaded of the latest harvested pid. This will become the starting time of - // the next harvest. - DateTime lastDateModifiedDT = startDT; + // If the start and end harvest dates are the same (happens for a new node), + // then + // tweak the start so that DataONE listObjects doesn't complain. + if (startDT == endDT) { + startDT = startDT.minusMinutes(1); + } - String startDTstr = dtfOut.print(startDT); - String endDTstr = dtfOut.print(endDT); + // Track the sysmeta dateUploaded of the latest harvested pid. This will become + // the starting time of + // the next harvest. + DateTime lastDateModifiedDT = startDT; - log.trace("start time: " + startDTstr); + String startDTstr = dtfOut.print(startDT); + String endDTstr = dtfOut.print(endDT); - Integer startCount = new Integer(0); - ListResult result = null; - Integer totalResultCount = 0; - Integer filteredResultCount = 0; - Integer allPidsCnt = 0; + log.trace("start time: " + startDTstr); - log.trace("Getting pids for nodeId: " + harvestNodeId); - boolean morePids = true; - while (morePids) { - ArrayList pidsToProcess = null; - try { - result = getPidsToProcess(cnNode, mnNode, isCN, session, suiteId, pidFilter, startDTstr, endDTstr, startCount, countRequested, lastDateModifiedDT, harvestNodeId, taskName); - pidsToProcess = result.getResult(); - totalResultCount = result.getTotalResultCount(); - filteredResultCount = result.getFilteredResultCount(); - lastDateModifiedDT = result.getLastDateModified(); - } catch (Exception e) { - JobExecutionException jee = new JobExecutionException("Unable to get pids to process", e); - jee.setRefireImmediately(false); - throw jee; - } + Integer startCount = new Integer(0); + ListResult result = null; + Integer totalResultCount = 0; + Integer filteredResultCount = 0; + Integer allPidsCnt = 0; - allPidsCnt += pidsToProcess.size(); - for (String pidStr : pidsToProcess) { + log.trace("Getting pids for nodeId: " + harvestNodeId); + boolean morePids = true; + while (morePids) { + ArrayList pidsToProcess = null; try { - log.debug(taskName + ": submitting pid: " + pidStr); - submitReportRequest(cnNode, mnNode, isCN, session, qualityServiceUrl, pidStr, suiteId); - } catch (org.dataone.service.exceptions.NotFound nfe) { - log.error("Unable to process pid: " + pidStr + nfe.getMessage()); - continue; + result = getPidsToProcess(cnNode, mnNode, isCN, session, suiteId, pidFilter, startDTstr, + endDTstr, startCount, countRequested, lastDateModifiedDT, harvestNodeId, taskName); + pidsToProcess = result.getResult(); + totalResultCount = result.getTotalResultCount(); + filteredResultCount = result.getFilteredResultCount(); + lastDateModifiedDT = result.getLastDateModified(); } catch (Exception e) { - log.error("Unable to process pid: " + pidStr + " - " + e.getMessage()); - continue; + JobExecutionException jee = new JobExecutionException("Unable to get pids to process", e); + jee.setRefireImmediately(false); + throw jee; } - } - // Check if DataONE returned the max number of results. If so, we have to request more by paging through - // the results returned pidsToProcess (i.e. 
DataONE listObjects service). If the returned result is - // less than the requested result, then all pids have been retrieved. - if (totalResultCount >= countRequested) { - morePids = true; - startCount = startCount + totalResultCount; - log.trace("Paging through more results, current start is " + startCount); - } else { - morePids = false; + allPidsCnt += pidsToProcess.size(); + for (String pidStr : pidsToProcess) { + try { + log.debug(taskName + ": submitting pid: " + pidStr); + submitReportRequest(cnNode, mnNode, isCN, session, qualityServiceUrl, pidStr, suiteId); + } catch (org.dataone.service.exceptions.NotFound nfe) { + log.error("Unable to process pid: " + pidStr + nfe.getMessage()); + continue; + } catch (Exception e) { + log.error("Unable to process pid: " + pidStr + " - " + e.getMessage()); + continue; + } + } + + // Check if DataONE returned the max number of results. If so, we have to + // request more by paging through + // the results returned pidsToProcess (i.e. DataONE listObjects service). If the + // returned result is + // less than the requested result, then all pids have been retrieved. + if (totalResultCount >= countRequested) { + morePids = true; + startCount = startCount + totalResultCount; + log.trace("Paging through more results, current start is " + startCount); + } else { + morePids = false; + } } - } - // Don't update the lastHarvestDateDT if no pids were found. - if (allPidsCnt > 0) { - // Add a millisecond to the last modified datetime, as this date will be used for the next scheduled - // harvest, and we don't want to re-harvest this same pid again. Note that the DataONE object service - // (get) does harvest based on requested milliseonds. - task.setLastHarvestDatetime(dtfOut.print(lastDateModifiedDT.plusMillis(1)), harvestNodeId); - log.trace("Saving lastHarvestDate: " + dtfOut.print(lastDateModifiedDT.plusMillis(1)) + " for node: " + harvestNodeId); - try { - store.saveTask(task, harvestNodeId); - } catch (MetadigStoreException mse) { - log.error("Error saving task: " + task.getTaskName()); - JobExecutionException jee = new JobExecutionException("Unable to save new harvest date", mse); - jee.setRefireImmediately(false); - throw jee; + // Don't update the lastHarvestDateDT if no pids were found. + if (allPidsCnt > 0) { + // Add a millisecond to the last modified datetime, as this date will be used + // for the next scheduled + // harvest, and we don't want to re-harvest this same pid again. Note that the + // DataONE object service + // (get) does harvest based on requested milliseonds. 
+ task.setLastHarvestDatetime(dtfOut.print(lastDateModifiedDT.plusMillis(1)), harvestNodeId); + log.trace("Saving lastHarvestDate: " + dtfOut.print(lastDateModifiedDT.plusMillis(1)) + + " for node: " + harvestNodeId); + try { + store.saveTask(task, harvestNodeId); + } catch (MetadigStoreException mse) { + log.error("Error saving task: " + task.getTaskName()); + JobExecutionException jee = new JobExecutionException("Unable to save new harvest date", mse); + jee.setRefireImmediately(false); + throw jee; + } + log.info(taskName + ": found " + allPidsCnt + " pids for nodeId: " + harvestNodeId + ", start: " + + startDTstr + ", end: " + endDTstr + ", servierUrl: " + nodeServiceUrl); } - log.info(taskName + ": found " + allPidsCnt + " pids for nodeId: " + harvestNodeId + ", start: " + startDTstr + ", end: " + endDTstr + ", servierUrl: " + nodeServiceUrl); } + } catch (Exception e) { + e.printStackTrace(); + throw new JobExecutionException("Cannot create store, unable to schedule job", e); } - store.shutdown(); + } /** - * Query a DataONE CN or MN to obtain a list of persistent identifiers (pids) for metadata objects have been + * Query a DataONE CN or MN to obtain a list of persistent identifiers (pids) + * for metadata objects have been * added to the system during a specific time period. - * @param cnNode a DataONE CN connection client object - * @param mnNode a DataONE MN connection client object - * @param isCN a logical indicating whether a CN of MN object is being used - * @param session a DataONE authentication session - * @param suiteId the quality suite to check (if this pids has already been processed) - * @param pidFilter the DataONE format identifies to filter for + * + * @param cnNode a DataONE CN connection client object + * @param mnNode a DataONE MN connection client object + * @param isCN a logical indicating whether a CN of MN object + * is being used + * @param session a DataONE authentication session + * @param suiteId the quality suite to check (if this pids has + * already been processed) + * @param pidFilter the DataONE format identifies to filter for * @param startHarvestDatetimeStr the starting date to harvest pids from - * @param endHarvestDatetimeStr the ending data to harvest pids from - * @param startCount the start count for paging results from DataONE, for large results - * @param countRequested the number of items to get from DataONE on each request - * @param lastDateModifiedDT the sysmeta 'dateSystemMetadataModified' value of the last harvested pid - * @param nodeIdFilter filter results for this nodeId (applies only to CN) + * @param endHarvestDatetimeStr the ending data to harvest pids from + * @param startCount the start count for paging results from + * DataONE, for large results + * @param countRequested the number of items to get from DataONE on + * each request + * @param lastDateModifiedDT the sysmeta 'dateSystemMetadataModified' value + * of the last harvested pid + * @param nodeIdFilter filter results for this nodeId (applies only + * to CN) * @throws Exception if there is an exception while executing the job. 
* @return a ListResult object containing the matching pids */ public ListResult getPidsToProcess(MultipartCNode cnNode, MultipartMNode mnNode, Boolean isCN, Session session, - String suiteId, String pidFilter, String startHarvestDatetimeStr, - String endHarvestDatetimeStr, int startCount, - int countRequested, DateTime lastDateModifiedDT, String nodeIdFilter, String taskName) throws Exception { + String suiteId, String pidFilter, String startHarvestDatetimeStr, + String endHarvestDatetimeStr, int startCount, + int countRequested, DateTime lastDateModifiedDT, String nodeIdFilter, String taskName) throws Exception { ArrayList pids = new ArrayList(); InputStream qis = null; @@ -419,7 +466,8 @@ public ListResult getPidsToProcess(MultipartCNode cnNode, MultipartMNode mnNode, Identifier identifier = null; Boolean replicaStatus = false; - // Do some back-flips to convert the start and end date to the ancient Java 'Date' type that is + // Do some back-flips to convert the start and end date to the ancient Java + // 'Date' type that is // used by DataONE 'listObjects()'. ZonedDateTime zdt = ZonedDateTime.parse(startHarvestDatetimeStr); // start date milliseconds since the epoch date "midnight, January 1, 1970 UTC" @@ -431,18 +479,23 @@ public ListResult getPidsToProcess(MultipartCNode cnNode, MultipartMNode mnNode, Date endDate = new Date(msSinceEpoch); try { - // Even though MultipartMNode and MultipartCNode have the same parent class D1Node, the interface for D1Node doesn't - // include listObjects, as the parameters differ from CN to MN, so we have to use a different object for each. - if(isCN) { + // Even though MultipartMNode and MultipartCNode have the same parent class + // D1Node, the interface for D1Node doesn't + // include listObjects, as the parameters differ from CN to MN, so we have to + // use a different object for each. + if (isCN) { log.trace("Getting pids for cn, for nodeid: " + nodeIdFilter); nodeRef = new NodeReference(); nodeRef.setValue(nodeIdFilter); - objList = cnNode.listObjects(session, startDate, endDate, formatId, nodeRef, identifier, startCount, countRequested); + objList = cnNode.listObjects(session, startDate, endDate, formatId, nodeRef, identifier, startCount, + countRequested); } else { log.trace("Getting pids for mn"); - objList = mnNode.listObjects(session, startDate, endDate, formatId, identifier, replicaStatus, startCount, countRequested); + objList = mnNode.listObjects(session, startDate, endDate, formatId, identifier, replicaStatus, + startCount, countRequested); } - //log.info("Got " + objList.getCount() + " pids for format: " + formatId.getValue() + " pids."); + // log.info("Got " + objList.getCount() + " pids for format: " + + // formatId.getValue() + " pids."); } catch (Exception e) { log.error(taskName + ": error retrieving pids: " + e.getMessage()); throw e; @@ -454,48 +507,55 @@ public ListResult getPidsToProcess(MultipartCNode cnNode, MultipartMNode mnNode, DateTime thisDateModifiedDT; if (objList.getCount() > 0) { - for(ObjectInfo oi: objList.getObjectInfoList()) { + for (ObjectInfo oi : objList.getObjectInfoList()) { thisFormatId = oi.getFormatId().getValue(); thisPid = oi.getIdentifier().getValue(); log.trace("Checking pid: " + thisPid + ", format: " + thisFormatId); - // Check all pid filters to see if this pids's format was found in the list of desired formats. + // Check all pid filters to see if this pids's format was found in the list of + // desired formats. // There could be multiple wildcard filters, which are separated by ','. 
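As an illustration of the filter test that follows, here is a minimal sketch assuming a hypothetical pidFilter value; per the split on '|' below, each segment is a regular expression matched against the object's formatId:

    public class FormatFilterSketch {
        public static void main(String[] args) {
            // Hypothetical filter: regexes separated by '|', matched against the DataONE
            // formatId of each harvested object.
            String pidFilter = "^eml.*|^http.*eml.*";
            String thisFormatId = "eml://ecoinformatics.org/eml-2.1.1";
            boolean found = false;
            for (String thisFilter : pidFilter.split("\\|")) {
                if (thisFormatId.matches(thisFilter)) {
                    found = true;
                    break;
                }
            }
            System.out.println(found ? "harvest" : "skip"); // prints "harvest"
        }
    }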
- String [] filters = pidFilter.split("\\|"); + String[] filters = pidFilter.split("\\|"); Boolean found = false; - for(String thisFilter:filters) { - if(thisFormatId.matches(thisFilter)) { + for (String thisFilter : filters) { + if (thisFormatId.matches(thisFilter)) { found = true; continue; } } - // Always re-create a report, even if it exists for a pid, as the sysmeta could have - // been updated (i.e. obsoletedBy, access) and the quality report and index contain + // Always re-create a report, even if it exists for a pid, as the sysmeta could + // have + // been updated (i.e. obsoletedBy, access) and the quality report and index + // contain // sysmeta fields. - if(found) { - // if (!runExists(thisPid, suiteId, store)) { + if (found) { + // if (!runExists(thisPid, suiteId, store)) { pidCount = pidCount++; pids.add(thisPid); log.trace("adding pid " + thisPid + ", formatId: " + thisFormatId); - // If this pid's modified date is after the stored latest encountered modified date, then update + // If this pid's modified date is after the stored latest encountered modified + // date, then update // the lastModified date thisDateModifiedDT = new DateTime(oi.getDateSysMetadataModified()); - // Add a millisecond to lastDateModfiedDT so that this pid won't be harvested again (in the event + // Add a millisecond to lastDateModfiedDT so that this pid won't be harvested + // again (in the event // that this is the last pid to be harvested in this round. if (thisDateModifiedDT.isAfter(lastDateModifiedDT)) { lastDateModifiedDT = thisDateModifiedDT.plusMillis(1); log.debug("New value for lastDateMoidifed: " + lastDateModifiedDT.toString()); } - // } + // } } } } ListResult result = new ListResult(); - // Set the count for the number of desired pids filtered from the total result set + // Set the count for the number of desired pids filtered from the total result + // set result.setFilteredResultCount(pidCount); - // Set the count for the total number of pids returned from DataONE (all formatIds) for this query + // Set the count for the total number of pids returned from DataONE (all + // formatIds) for this query result.setTotalResultCount(objList.getCount()); result.setResult(pids); // Return the sysmeta 'dateSystemMetadataModified' of the last pid harvested. @@ -504,7 +564,6 @@ public ListResult getPidsToProcess(MultipartCNode cnNode, MultipartMNode mnNode, return result; } - /** * Check if the specified quality suite has already been run for a pid. *

@@ -515,18 +574,20 @@ public ListResult getPidsToProcess(MultipartCNode cnNode, MultipartMNode mnNode, * the system is stored in the run object, and this run object is * parsed when the run is inserted into the Solr index. *

- * @param pid the pid to check + * + * @param pid the pid to check * @param suiteId the suite identifier to check (e.g. "FAIR-suite-0.3.1") - * @param store the DataStore object to send the check request to. + * @param store the DataStore object to send the check request to. * @throws MetadigStoreException * */ - public boolean runExists(String pid, String suiteId, MDQStore store, Date dateSystemMetadataModified) throws MetadigStoreException { + public boolean runExists(String pid, String suiteId, MDQStore store, Date dateSystemMetadataModified) + throws MetadigStoreException { boolean found = false; Date runDateSystemMetadataModified = null; - if(!store.isAvailable()) { + if (!store.isAvailable()) { try { store.renew(); } catch (MetadigStoreException e) { @@ -536,7 +597,7 @@ public boolean runExists(String pid, String suiteId, MDQStore store, Date dateSy } Run run = store.getRun(pid, suiteId); - if(run != null) { + if (run != null) { found = true; } else { found = false; @@ -546,22 +607,24 @@ public boolean runExists(String pid, String suiteId, MDQStore store, Date dateSy } /** - * Submit a request to the metadig controller to run a quality suite for the specified pid. + * Submit a request to the metadig controller to run a quality suite for the + * specified pid. *

- * The system metadata for a pid is also obtained and sent with the request + * The system metadata for a pid is also obtained and sent with the request *

* - * @param cnNode a DataONE CN connection client object - * @param mnNode a DataONE MN connection client object - * @param isCN a logical indicating whether a CN of MN object - * @param session a DataONE authentication session + * @param cnNode a DataONE CN connection client object + * @param mnNode a DataONE MN connection client object + * @param isCN a logical indicating whether a CN of MN object + * @param session a DataONE authentication session * @param qualityServiceUrl the URL of the MetaDIG quality service - * @param pidStr the pid to submit the request for - * @param suiteId the suite identifier to submit the request for + * @param pidStr the pid to submit the request for + * @param suiteId the suite identifier to submit the request for * * @throws Exception */ - public void submitReportRequest(MultipartCNode cnNode, MultipartMNode mnNode, Boolean isCN, Session session, String qualityServiceUrl, String pidStr, String suiteId) throws Exception { + public void submitReportRequest(MultipartCNode cnNode, MultipartMNode mnNode, Boolean isCN, Session session, + String qualityServiceUrl, String pidStr, String suiteId) throws Exception { SystemMetadata sysmeta = null; InputStream objectIS = null; @@ -580,22 +643,24 @@ public void submitReportRequest(MultipartCNode cnNode, MultipartMNode mnNode, Bo log.error("Not authorized to read sysmeta for pid: " + pid.getValue() + ", continuing with next pid..."); return; } catch (Exception e) { - throw(e); + throw (e); } try { - if(isCN) { + if (isCN) { objectIS = cnNode.get(session, pid); - } else { + } else { objectIS = mnNode.get(session, pid); } log.trace("Retrieved metadata object for pid: " + pidStr); } catch (NotAuthorized na) { - log.error("Not authorized to read pid: " + pid + ", unable to retrieve metadata, continuing with next pid..."); + log.error("Not authorized to read pid: " + pid + + ", unable to retrieve metadata, continuing with next pid..."); return; } - // quality suite service url, i.e. "http://docke-ucsb-1.dataone.org:30433/quality/suites/knb.suite.1/run + // quality suite service url, i.e. + // "http://docke-ucsb-1.dataone.org:30433/quality/suites/knb.suite.1/run qualityServiceUrl = qualityServiceUrl + "/suites/" + suiteId + "/run"; HttpPost post = new HttpPost(qualityServiceUrl); diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestScorerJob.java b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestScorerJob.java index 7e7e933f..c3759a54 100644 --- a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestScorerJob.java +++ b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestScorerJob.java @@ -75,7 +75,9 @@ void setLastDateModified(DateTime date) { this.lastDateModifiedDT = date; } - public DateTime getLastDateModified() { return this.lastDateModifiedDT; } + public DateTime getLastDateModified() { + return this.lastDateModifiedDT; + } } // Since Quartz will re-instantiate a class every time it @@ -101,18 +103,23 @@ public RequestScorerJob() { * the Job. *

*

- * This method sends a request from the scheduler (metadig-scheduler) to the controller (metadig-controller) - * to execute a 'scorer' request (metadig-scorer). This request goes through the controller so that it can - * keep track and log all requests and their completion status. The current method to send requests to the - * controller is to send a REST request to the servlet running the controller, using the metadig-engine API. + * This method sends a request from the scheduler (metadig-scheduler) to the + * controller (metadig-controller) + * to execute a 'scorer' request (metadig-scorer). This request goes through the + * controller so that it can + * keep track and log all requests and their completion status. The current + * method to send requests to the + * controller is to send a REST request to the servlet running the controller, + * using the metadig-engine API. *

* - * @throws JobExecutionException if there is an exception while executing the job. + * @throws JobExecutionException if there is an exception while executing the + * job. */ public void execute(JobExecutionContext context) throws JobExecutionException { - String qualityServiceUrl = null; + String qualityServiceUrl = null; MDQconfig cfg = null; JobKey key = context.getJobDetail().getKey(); @@ -122,12 +129,15 @@ public void execute(JobExecutionContext context) String taskType = dataMap.getString("taskType"); String pidFilter = dataMap.getString("pidFilter"); String suiteId = dataMap.getString("suiteId"); - // The nodeId is used for filterine queries based on DataONE sysmeta 'datasource'. - // For example, if one wished to get scores for Arctic Data Center, the urn:node:ARCTIC would be specified. + // The nodeId is used for filterine queries based on DataONE sysmeta + // 'datasource'. + // For example, if one wished to get scores for Arctic Data Center, the + // urn:node:ARCTIC would be specified. String nodeId = dataMap.getString("nodeId"); String startHarvestDatetimeStr = dataMap.getString("startHarvestDatetime"); int harvestDatetimeInc = dataMap.getInt("harvestDatetimeInc"); - // Number of pids to get each query (this number of pids will be fetched each query until all pids are obtained) + // Number of pids to get each query (this number of pids will be fetched each + // query until all pids are obtained) int countRequested = dataMap.getInt("countRequested"); String requestType = null; String formatFamily = null; @@ -152,18 +162,19 @@ public void execute(JobExecutionContext context) dataOneAuthToken = System.getenv("DATAONE_AUTH_TOKEN"); if (dataOneAuthToken == null) { - dataOneAuthToken = cfg.getString("DataONE.authToken"); + dataOneAuthToken = cfg.getString("DataONE.authToken"); log.debug("Got token from properties file."); } else { log.debug("Got token from env."); } } catch (ConfigurationException | IOException ce) { - JobExecutionException jee = new JobExecutionException(taskName + ": Error executing task: " + ce.getMessage()); + JobExecutionException jee = new JobExecutionException( + taskName + ": Error executing task: " + ce.getMessage()); jee.initCause(ce); throw jee; } - if(nodeServiceUrl == null) { + if (nodeServiceUrl == null) { String msg = taskName + "Unable to read serviceUrl from config file for: " + nodeId; throw new JobExecutionException(msg); } @@ -175,181 +186,205 @@ public void execute(JobExecutionContext context) d1Node = DataONE.getMultipartD1Node(session, nodeServiceUrl); } catch (MetadigException mpe) { mpe.printStackTrace(); - throw new JobExecutionException(taskName + ": unable to create connection to service URL " + nodeServiceUrl , mpe); + throw new JobExecutionException(taskName + ": unable to create connection to service URL " + nodeServiceUrl, + mpe); } - MDQStore store = null; - // Get stored task info from the last task execution - try { - store = new DatabaseStore(); - } catch (Exception e) { - e.printStackTrace(); - throw new JobExecutionException("Cannot create store, unable to schedule job", e); - } - - if(!store.isAvailable()) { - try { - store.renew(); - } catch (MetadigStoreException e) { - e.printStackTrace(); - throw new JobExecutionException("Cannot renew store, unable to schedule job", e); + try (DatabaseStore store = new DatabaseStore()) { + // TODO: consider removing this block not sure if it's ever hit + if (!store.isAvailable()) { + try { + store.renew(); + } catch (MetadigStoreException e) { + e.printStackTrace(); + throw new 
JobExecutionException("Cannot renew store, unable to schedule job", e); + } } - } - // Set UTC as the default time zone for all DateTime operations. - // Get current datetime, which may be used for start time range. - DateTimeZone.setDefault(DateTimeZone.UTC); - DateTime currentDT = new DateTime(DateTimeZone.UTC); - DateTimeFormatter dtfOut = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"); - String lastHarvestDateStr = null; - - Task task; - task = store.getTask(taskName, taskType, nodeId); - - // If a 'task' entry has not been saved for this task name yet, then a 'lastHarvested' - // DataTime will not be available, in which case the 'startHarvestDataTime' from the - // config file will be used. - if(task.getLastHarvestDatetime(nodeId) == null) { - task = new Task(); - task.setTaskName(taskName); - task.setTaskType(taskType); - lastHarvestDateStr = startHarvestDatetimeStr; - task.setLastHarvestDatetime(lastHarvestDateStr, nodeId); - } else { - lastHarvestDateStr = task.getLastHarvestDatetime(nodeId); - } + // Set UTC as the default time zone for all DateTime operations. + // Get current datetime, which may be used for start time range. + DateTimeZone.setDefault(DateTimeZone.UTC); + DateTime currentDT = new DateTime(DateTimeZone.UTC); + DateTimeFormatter dtfOut = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"); + String lastHarvestDateStr = null; + + Task task; + task = store.getTask(taskName, taskType, nodeId); + + // If a 'task' entry has not been saved for this task name yet, then a + // 'lastHarvested' + // DataTime will not be available, in which case the 'startHarvestDataTime' from + // the + // config file will be used. + if (task.getLastHarvestDatetime(nodeId) == null) { + task = new Task(); + task.setTaskName(taskName); + task.setTaskType(taskType); + lastHarvestDateStr = startHarvestDatetimeStr; + task.setLastHarvestDatetime(lastHarvestDateStr, nodeId); + } else { + lastHarvestDateStr = task.getLastHarvestDatetime(nodeId); + } - DateTime lastHarvestDateDT = new DateTime(lastHarvestDateStr); - // Set the search start datetime to the last harvest datetime, unless it is in the - // future. (This can happen when the previous time range end was for the current day, - // as the end datetime range for the previous task run will have been stored as the - // new lastharvestDateTime. - DateTime startDT = null; - if(lastHarvestDateDT.isAfter(currentDT.toInstant())) { - startDT = currentDT; - } else { - startDT = new DateTime(lastHarvestDateDT); - } + DateTime lastHarvestDateDT = new DateTime(lastHarvestDateStr); + // Set the search start datetime to the last harvest datetime, unless it is in + // the + // future. (This can happen when the previous time range end was for the current + // day, + // as the end datetime range for the previous task run will have been stored as + // the + // new lastharvestDateTime. + DateTime startDT = null; + if (lastHarvestDateDT.isAfter(currentDT.toInstant())) { + startDT = currentDT; + } else { + startDT = new DateTime(lastHarvestDateDT); + } - DateTime endDT = new DateTime(currentDT); + DateTime endDT = new DateTime(currentDT); - // If the start and end harvest dates are the same (happends for a new node), then - // tweek the start so that DataONE listObjects doesn't complain. - if(startDT == endDT ) { - startDT = startDT.minusMinutes(1); - } + // If the start and end harvest dates are the same (happends for a new node), + // then + // tweek the start so that DataONE listObjects doesn't complain. 
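A minimal sketch of the new-node case the comment above describes: when the start and end of the window land on the same instant, back the start up a minute. The sketch compares the two instants by value with Joda-Time's isEqual(); using a value comparison here is an assumption about the intent of the check that follows, not a quote of it:

    import org.joda.time.DateTime;
    import org.joda.time.DateTimeZone;

    public class HarvestRangeSketch {
        public static void main(String[] args) {
            DateTimeZone.setDefault(DateTimeZone.UTC);
            DateTime currentDT = new DateTime(DateTimeZone.UTC);
            DateTime startDT = new DateTime(currentDT);
            DateTime endDT = new DateTime(currentDT);
            // Two distinct DateTime objects holding the same instant: '==' is false,
            // isEqual() is true, so a value comparison detects the new-node case.
            if (startDT.isEqual(endDT)) {
                startDT = startDT.minusMinutes(1);
            }
            System.out.println("start=" + startDT + " end=" + endDT);
        }
    }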
+ if (startDT == endDT) { + startDT = startDT.minusMinutes(1); + } - // Track the sysmeta dateUploaded of the latest harvested pid. This will become the starting time of - // the next harvest. - DateTime lastDateModifiedDT = startDT; + // Track the sysmeta dateUploaded of the latest harvested pid. This will become + // the starting time of + // the next harvest. + DateTime lastDateModifiedDT = startDT; - String startDTstr = dtfOut.print(startDT); - String endDTstr = dtfOut.print(endDT); + String startDTstr = dtfOut.print(startDT); + String endDTstr = dtfOut.print(endDT); - int startCount = 0; - RequestScorerJob.ListResult result = null; - Integer resultCount = 0; - - // Two types of score requests can be processed - a "node" request that will get score info for an - // entire repository (e.g. urn:node:ARCTIC) or a "portal" request that will get scores for a - // specific portal (from the Solr portal entry collectionQuery). - if(requestType != null && requestType.equalsIgnoreCase("node")) { - try { - // For a 'node' scores request, the 'collection' is the entire node, so specify - // the nodeId as the collectionid. It is not necessary to retrieve a collectionQuery for this - // 'node' portal, as there is no Solr entry for this type collection. All quality scores available - // in the quality Solr server will be directly retrieved, filtering on the 'nodeId' (datasource) - log.info("TaskName: " + taskName + ", taskType: " + taskType + " submitting node request for nodeId: " - + nodeId + ", suiteId: " + suiteId + "formatFamily: " + formatFamily); - submitScorerRequest(qualityServiceUrl, nodeId, suiteId, nodeId, formatFamily); - } catch (Exception e) { - JobExecutionException jee = new JobExecutionException("Unable to submit request to create new node (" - + nodeId + ")" + " score graph/data file ", e); - jee.setRefireImmediately(false); - throw jee; - } - } else { - Integer allIds = 0; - boolean morePids = true; - while (morePids) { - // Get a list of pids selected by a collection (portal) search filter (collectionQuery) and get - // the quality scores (from the quality Solr server) for that list of pids. - ArrayList pidsToProcess = null; - log.trace("Getting portal pids to process, startCount: " + startCount + ", countRequested: " + countRequested); + int startCount = 0; + RequestScorerJob.ListResult result = null; + Integer resultCount = 0; + // Two types of score requests can be processed - a "node" request that will get + // score info for an + // entire repository (e.g. urn:node:ARCTIC) or a "portal" request that will get + // scores for a + // specific portal (from the Solr portal entry collectionQuery). + if (requestType != null && requestType.equalsIgnoreCase("node")) { try { - result = getPidsToProcess(d1Node, session, pidFilter, startDTstr, endDTstr, startCount, countRequested, lastDateModifiedDT, taskName); - pidsToProcess = result.getResult(); - resultCount = result.getResultCount(); - lastDateModifiedDT = result.getLastDateModified(); + // For a 'node' scores request, the 'collection' is the entire node, so specify + // the nodeId as the collectionid. It is not necessary to retrieve a + // collectionQuery for this + // 'node' portal, as there is no Solr entry for this type collection. 
All + // quality scores available + // in the quality Solr server will be directly retrieved, filtering on the + // 'nodeId' (datasource) + log.info("TaskName: " + taskName + ", taskType: " + taskType + + " submitting node request for nodeId: " + + nodeId + ", suiteId: " + suiteId + "formatFamily: " + formatFamily); + submitScorerRequest(qualityServiceUrl, nodeId, suiteId, nodeId, formatFamily); } catch (Exception e) { - JobExecutionException jee = new JobExecutionException("Unable to get pids to process", e); + JobExecutionException jee = new JobExecutionException( + "Unable to submit request to create new node (" + + nodeId + ")" + " score graph/data file ", + e); jee.setRefireImmediately(false); throw jee; } + } else { + Integer allIds = 0; + boolean morePids = true; + while (morePids) { + // Get a list of pids selected by a collection (portal) search filter + // (collectionQuery) and get + // the quality scores (from the quality Solr server) for that list of pids. + ArrayList pidsToProcess = null; + log.trace("Getting portal pids to process, startCount: " + startCount + ", countRequested: " + + countRequested); - log.trace(taskName + ": found " + resultCount + " seriesIds" + " for date: " + startDTstr + " at servierUrl: " + nodeServiceUrl); - for (String pidStr : pidsToProcess) { try { - log.debug(taskName + ": submitting seriesId: " + pidStr); - submitScorerRequest(qualityServiceUrl, pidStr, suiteId, nodeId, formatFamily); + result = getPidsToProcess(d1Node, session, pidFilter, startDTstr, endDTstr, startCount, + countRequested, lastDateModifiedDT, taskName); + pidsToProcess = result.getResult(); + resultCount = result.getResultCount(); + lastDateModifiedDT = result.getLastDateModified(); } catch (Exception e) { - JobExecutionException jee = new JobExecutionException("Unable to submit request to create new score graph/data file", e); + JobExecutionException jee = new JobExecutionException("Unable to get pids to process", e); jee.setRefireImmediately(false); throw jee; } - } - // Check if DataONE returned the max number of results. If so, we have to request more by paging through - // the results. - allIds += pidsToProcess.size(); - if (resultCount >= countRequested) { - morePids = true; - startCount = startCount + resultCount; - log.trace("Paging through more results, current start is " + startCount); - } else { - morePids = false; + log.trace(taskName + ": found " + resultCount + " seriesIds" + " for date: " + startDTstr + + " at servierUrl: " + nodeServiceUrl); + for (String pidStr : pidsToProcess) { + try { + log.debug(taskName + ": submitting seriesId: " + pidStr); + submitScorerRequest(qualityServiceUrl, pidStr, suiteId, nodeId, formatFamily); + } catch (Exception e) { + JobExecutionException jee = new JobExecutionException( + "Unable to submit request to create new score graph/data file", e); + jee.setRefireImmediately(false); + throw jee; + } + } + + // Check if DataONE returned the max number of results. If so, we have to + // request more by paging through + // the results. 
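The paging rule in the comment above, reduced to a self-contained sketch; fetchPage and its total of 23 ids are hypothetical stand-ins for one page of results from the quality Solr server:

    public class PagingSketch {
        // Hypothetical stand-in: returns the number of ids in one page, at most
        // countRequested, drawn from a fixed total of 23.
        static int fetchPage(int startCount, int countRequested) {
            int totalAvailable = 23;
            return Math.max(0, Math.min(countRequested, totalAvailable - startCount));
        }

        public static void main(String[] args) {
            int countRequested = 10;
            int startCount = 0;
            int allIds = 0;
            boolean morePids = true;
            while (morePids) {
                int resultCount = fetchPage(startCount, countRequested);
                allIds += resultCount;
                // A full page means more results may remain; a short page ends the loop.
                if (resultCount >= countRequested) {
                    startCount = startCount + resultCount;
                } else {
                    morePids = false;
                }
            }
            System.out.println("Processed " + allIds + " ids"); // Processed 23 ids
        }
    }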
+ allIds += pidsToProcess.size(); + if (resultCount >= countRequested) { + morePids = true; + startCount = startCount + resultCount; + log.trace("Paging through more results, current start is " + startCount); + } else { + morePids = false; + } } - } - if (allIds > 0) { - // Record the new "last harvested" date - task.setLastHarvestDatetime(dtfOut.print(lastDateModifiedDT), nodeId); - log.debug("Saving lastHarvestDate: " + dtfOut.print(lastDateModifiedDT)); - try { - store.saveTask(task, nodeId); - } catch (MetadigStoreException mse) { - log.error("Error saving task: " + task.getTaskName()); - JobExecutionException jee = new JobExecutionException("Unable to save new harvest date", mse); - jee.setRefireImmediately(false); - throw jee; + if (allIds > 0) { + // Record the new "last harvested" date + task.setLastHarvestDatetime(dtfOut.print(lastDateModifiedDT), nodeId); + log.debug("Saving lastHarvestDate: " + dtfOut.print(lastDateModifiedDT)); + try { + store.saveTask(task, nodeId); + } catch (MetadigStoreException mse) { + log.error("Error saving task: " + task.getTaskName()); + JobExecutionException jee = new JobExecutionException("Unable to save new harvest date", mse); + jee.setRefireImmediately(false); + throw jee; + } + log.info(taskName + ": found " + allIds + " seriesIds" + " for start: " + startDTstr + ", end: " + + endDTstr + " at servierUrl: " + nodeServiceUrl); } - log.info(taskName + ": found " + allIds + " seriesIds" + " for start: " + startDTstr + ", end: " + endDTstr + " at servierUrl: " + nodeServiceUrl); } + } catch (Exception e) { + e.printStackTrace(); + throw new JobExecutionException("Cannot create store, unable to schedule job", e); } - store.shutdown(); + } /** - * Query a DataONE CN or MN object store for a list of object that match the time range and formatId filters provided. + * Query a DataONE CN or MN object store for a list of object that match the + * time range and formatId filters provided. * - * @param d1Node a DataONE CN or MN connection client object - * @param session a DataONE authentication session - * @param pidFilter the DataONE format identifies to filter for + * @param d1Node a DataONE CN or MN connection client object + * @param session a DataONE authentication session + * @param pidFilter the DataONE format identifies to filter for * @param startHarvestDatetimeStr the starting date to harvest pids from - * @param endHarvestDatetimeStr the ending data to harvest pids from - * @param startCount the start count for paging results from DataONE, for large results - * @param countRequested the number of items to get from DataONE on each request - * @param lastDateModifiedDT the sysmeta 'dateSystemMetadataModified' value of the last harvested pid + * @param endHarvestDatetimeStr the ending data to harvest pids from + * @param startCount the start count for paging results from + * DataONE, for large results + * @param countRequested the number of items to get from DataONE on + * each request + * @param lastDateModifiedDT the sysmeta 'dateSystemMetadataModified' value + * of the last harvested pid * @throws Exception if there is an exception while executing the job. 
* @return a ListResult object containing the matching pids * @throws Exception */ public ListResult getPidsToProcess(MultipartD1Node d1Node, Session session, - String pidFilter, String startHarvestDatetimeStr, String endHarvestDatetimeStr, - int startCount, int countRequested, DateTime lastDateModifiedDT, String taskName) throws Exception { + String pidFilter, String startHarvestDatetimeStr, String endHarvestDatetimeStr, + int startCount, int countRequested, DateTime lastDateModifiedDT, String taskName) throws Exception { MetadigProcessException metadigException = null; @@ -361,16 +396,21 @@ public ListResult getPidsToProcess(MultipartD1Node d1Node, Session session, ArrayList pids = new ArrayList(); Document xmldoc = null; - String queryStr = "?q=formatId:" + pidFilter + "+-obsoletedBy:*" + "+dateModified:[" + startHarvestDatetimeStr + "%20TO%20" + String queryStr = "?q=formatId:" + pidFilter + "+-obsoletedBy:*" + "+dateModified:[" + startHarvestDatetimeStr + + "%20TO%20" + endHarvestDatetimeStr + "]" + "&fl=seriesId,dateModified&q.op=AND"; log.trace("query: " + queryStr); - // Send the query to DataONE Solr to retrieve portal seriesIds for a given time frame + // Send the query to DataONE Solr to retrieve portal seriesIds for a given time + // frame - // One query can return many documents, so use the paging mechanism to make sure we retrieve them all. - // Keep paging through query results until all pids have been fetched. The last 'page' of query - // results is indicated by the number of items returned being less than the number requested. + // One query can return many documents, so use the paging mechanism to make sure + // we retrieve them all. + // Keep paging through query results until all pids have been fetched. The last + // 'page' of query + // results is indicated by the number of items returned being less than the + // number requested. int thisResultLength; // Now setup the xpath to retrieve the ids returned from the collection query. try { @@ -387,14 +427,15 @@ public ListResult getPidsToProcess(MultipartD1Node d1Node, Session session, throw metadigException; } - // Loop through the Solr result. As the result may be large, page through the results, accumulating + // Loop through the Solr result. As the result may be large, page through the + // results, accumulating // the pids returned into a ListResult object. 
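A minimal sketch of pulling seriesId values out of one page of the Solr response with XPath, roughly what the loop below does page by page; the XML snippet and the XPath expression are illustrative assumptions rather than the exact DataONE response format:

    import java.io.StringReader;
    import javax.xml.parsers.DocumentBuilderFactory;
    import javax.xml.xpath.XPath;
    import javax.xml.xpath.XPathConstants;
    import javax.xml.xpath.XPathFactory;
    import org.w3c.dom.Document;
    import org.w3c.dom.NodeList;
    import org.xml.sax.InputSource;

    public class SolrXpathSketch {
        public static void main(String[] args) throws Exception {
            // Hypothetical, trimmed-down Solr XML result with two matching documents.
            String xml = "<response><result>"
                    + "<doc><str name=\"seriesId\">urn:uuid:aaa</str></doc>"
                    + "<doc><str name=\"seriesId\">urn:uuid:bbb</str></doc>"
                    + "</result></response>";
            Document xmldoc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new InputSource(new StringReader(xml)));
            XPath xpath = XPathFactory.newInstance().newXPath();
            // Collect every seriesId text node from this page of results.
            NodeList xpathResult = (NodeList) xpath.evaluate(
                    "//result/doc/str[@name='seriesId']/text()", xmldoc, XPathConstants.NODESET);
            for (int index = 0; index < xpathResult.getLength(); index++) {
                System.out.println(xpathResult.item(index).getTextContent());
            }
        }
    }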
- log.trace("Getting portal seriesIds from Solr " ); + log.trace("Getting portal seriesIds from Solr "); int startPos = startCount; do { xmldoc = DataONE.querySolr(queryStr, startPos, countRequested, d1Node, session); - if(xmldoc == null) { + if (xmldoc == null) { log.info("no values returned from query"); break; } @@ -409,7 +450,8 @@ public ListResult getPidsToProcess(MultipartD1Node d1Node, Session session, String currentPid = null; thisResultLength = xpathResult.getLength(); log.trace("Got " + thisResultLength + " pids this query"); - if(thisResultLength == 0) break; + if (thisResultLength == 0) + break; for (int index = 0; index < xpathResult.getLength(); index++) { node = xpathResult.item(index); currentPid = node.getTextContent(); @@ -429,14 +471,15 @@ public ListResult getPidsToProcess(MultipartD1Node d1Node, Session session, DateTime thisDateModified; thisResultLength = xpathResult.getLength(); - if(thisResultLength == 0) break; + if (thisResultLength == 0) + break; for (int index = 0; index < xpathResult.getLength(); index++) { node = xpathResult.item(index); String dateStr = node.getTextContent(); log.debug("Checking date str: " + dateStr); thisDateModified = DateTime.parse(dateStr, DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); - if(thisDateModified.isAfter(lastDateModifiedDT)) { + if (thisDateModified.isAfter(lastDateModifiedDT)) { lastDateModifiedDT = thisDateModified.plusMillis(1); log.debug("Updated lastDateModified to " + lastDateModifiedDT); } @@ -455,32 +498,36 @@ public ListResult getPidsToProcess(MultipartD1Node d1Node, Session session, } /** - * Submit a requst to the metadig controller to get quality score info and create a graph for the specified collection. + * Submit a requst to the metadig controller to get quality score info and + * create a graph for the specified collection. * * @param qualityServiceUrl the URL of the MetaDIG quality service - * @param collectionId the DataONE collection (portal) seriesId - * @param suiteId the quality suite to run for the collection - * @param nodeId the DataONE node identifier that the collection is hosted on - * @param formatFamily the format identifier family (e.g. "eml" for all EML format identifier versions) + * @param collectionId the DataONE collection (portal) seriesId + * @param suiteId the quality suite to run for the collection + * @param nodeId the DataONE node identifier that the collection is + * hosted on + * @param formatFamily the format identifier family (e.g. "eml" for all EML + * format identifier versions) * * @throws Exception * */ - public void submitScorerRequest(String qualityServiceUrl, String collectionId, String suiteId, String nodeId, String formatFamily) throws Exception { + public void submitScorerRequest(String qualityServiceUrl, String collectionId, String suiteId, String nodeId, + String formatFamily) throws Exception { InputStream runResultIS = null; String scorerServiceUrl = qualityServiceUrl + "/scores" + "?suite=" + suiteId; - if(collectionId != null && ! collectionId.isEmpty()) { + if (collectionId != null && !collectionId.isEmpty()) { scorerServiceUrl += "&id=" + collectionId; } - if(nodeId != null && ! nodeId.isEmpty()) { + if (nodeId != null && !nodeId.isEmpty()) { scorerServiceUrl += "&node=" + nodeId; } - if(formatFamily != null && ! 
formatFamily.isEmpty()) { + if (formatFamily != null && !formatFamily.isEmpty()) { scorerServiceUrl += "&format=" + formatFamily; } @@ -501,8 +548,7 @@ public void submitScorerRequest(String qualityServiceUrl, String collectionId, S runResultIS = reponseEntity.getContent(); } } catch (Exception e) { - throw(e); + throw (e); } } } - diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/store/DatabaseStore.java b/src/main/java/edu/ucsb/nceas/mdqengine/store/DatabaseStore.java index f191545b..b55b1f03 100644 --- a/src/main/java/edu/ucsb/nceas/mdqengine/store/DatabaseStore.java +++ b/src/main/java/edu/ucsb/nceas/mdqengine/store/DatabaseStore.java @@ -36,7 +36,7 @@ * @author slaughter * */ -public class DatabaseStore implements MDQStore { +public class DatabaseStore implements MDQStore, AutoCloseable { protected Log log = LogFactory.getLog(this.getClass()); @@ -404,7 +404,7 @@ public void renew() throws MetadigStoreException { } } - public void shutdown() { + public void close() { try { conn.close(); diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/store/InMemoryStore.java b/src/main/java/edu/ucsb/nceas/mdqengine/store/InMemoryStore.java index f721e249..3bbca652 100644 --- a/src/main/java/edu/ucsb/nceas/mdqengine/store/InMemoryStore.java +++ b/src/main/java/edu/ucsb/nceas/mdqengine/store/InMemoryStore.java @@ -14,6 +14,7 @@ import org.springframework.core.io.support.PathMatchingResourcePatternResolver; import org.xml.sax.SAXException; +import javax.script.ScriptContext; import javax.xml.bind.JAXBException; import java.io.IOException; import java.net.URL; @@ -49,6 +50,12 @@ public InMemoryStore() throws MetadigStoreException { this.init(); } + @Override + public void close() { + throw new UnsupportedOperationException( + "close() is not implemented for the InMemoryStore"); + } + protected Log log = LogFactory.getLog(this.getClass()); private void init() throws MetadigStoreException { @@ -267,10 +274,6 @@ public Task getTask(String taskName, String taskType, String nodeId) { public void saveTask(Task task, String nodeId) throws MetadigStoreException { } - @Override - public void shutdown() { - } - @Override public Node getNode(String nodeId) { return new Node(); diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/store/MDQStore.java b/src/main/java/edu/ucsb/nceas/mdqengine/store/MDQStore.java index 904c4b32..d52870f8 100644 --- a/src/main/java/edu/ucsb/nceas/mdqengine/store/MDQStore.java +++ b/src/main/java/edu/ucsb/nceas/mdqengine/store/MDQStore.java @@ -29,7 +29,7 @@ public interface MDQStore { void createRun(Run run); void deleteRun(Run run); - void shutdown(); + void close(); boolean isAvailable(); void renew() throws MetadigStoreException; From 2f06d3b0c05cd9dc2c56089e101be3f546ca44e2 Mon Sep 17 00:00:00 2001 From: Jeanette Clark Date: Thu, 4 Apr 2024 14:26:30 -0700 Subject: [PATCH 02/21] start the patch release --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index cd942b78..c58c9a1f 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ 4.0.0 edu.ucsb.nceas metadig-engine - 3.0.0 + 3.0.1-SNAPSHOT jar metadig-engine MetaDIG library for running metadata quality tests From 0793f4db05233b2480ac846c723ac75e0a2e5489 Mon Sep 17 00:00:00 2001 From: Jeanette Clark Date: Mon, 8 Apr 2024 13:23:44 -0700 Subject: [PATCH 03/21] build bug branches --- .github/workflows/build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index a0543487..e31d36ef 100644 --- 
a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -75,7 +75,7 @@ jobs: docker-publish: name: Docker Build and Publish - if: github.ref_name == 'develop' || startsWith(github.ref, 'refs/tags/v') || startsWith(github.ref_name, 'feature') + if: github.ref_name == 'develop' || startsWith(github.ref, 'refs/tags/v') || startsWith(github.ref_name, 'bugfix') needs: maven-build runs-on: ubuntu-latest permissions: From d7a54d6d703026bccae22ef5e6e57d0ddb9aa61c Mon Sep 17 00:00:00 2001 From: Jeanette Clark Date: Thu, 11 Apr 2024 15:44:53 -0700 Subject: [PATCH 04/21] prevent null pointer in jep startup --- .../java/edu/ucsb/nceas/mdqengine/dispatch/Dispatcher.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/dispatch/Dispatcher.java b/src/main/java/edu/ucsb/nceas/mdqengine/dispatch/Dispatcher.java index 2a366c44..df386777 100644 --- a/src/main/java/edu/ucsb/nceas/mdqengine/dispatch/Dispatcher.java +++ b/src/main/java/edu/ucsb/nceas/mdqengine/dispatch/Dispatcher.java @@ -281,6 +281,11 @@ public static void setupJep() throws MetadigException { throw new RuntimeException("Error reading metadig configuration, IOException: " + io); } } + // if its still null, throw a runtime exception + // we don't want to start without Jep configured properly + if (pythonFolder == null){ + throw new RuntimeException("Could not find path to jep install. Check JEP_LIBRARY_PATH in metadig.proerties and ensure it is correct."); + } // define the jep library path String jepPath = pythonFolder + "/libjep.jnilib"; From 972a55240f25858b0fd44f6d81cf4b5732fa4d39 Mon Sep 17 00:00:00 2001 From: Jeanette Clark Date: Thu, 11 Apr 2024 15:47:09 -0700 Subject: [PATCH 05/21] update chart and app versions --- helm/metadig-controller/Chart.yaml | 4 ++-- helm/metadig-controller/values.yaml | 2 +- helm/metadig-scheduler/Chart.yaml | 4 ++-- helm/metadig-scheduler/values.yaml | 2 +- helm/metadig-scorer/Chart.yaml | 4 ++-- helm/metadig-scorer/values.yaml | 2 +- helm/metadig-worker/Chart.yaml | 4 ++-- 7 files changed, 11 insertions(+), 11 deletions(-) diff --git a/helm/metadig-controller/Chart.yaml b/helm/metadig-controller/Chart.yaml index a4eebb89..6bf53a1e 100644 --- a/helm/metadig-controller/Chart.yaml +++ b/helm/metadig-controller/Chart.yaml @@ -15,10 +15,10 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 1.0.4 +version: 1.0.5 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to # follow Semantic Versioning. They should reflect the version the application is using. # It is recommended to use it with quotes. 
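Looking back at the Dispatcher change in PATCH 04 above, a minimal sketch of the jep-path fail-fast check it adds; the helper name, the resolution order (environment variable first, properties file as fallback), and the install path used here are assumptions for illustration only:

    public class JepPathSketch {
        // Resolve the jep install folder and refuse to start if neither source
        // provides it, rather than failing later with a NullPointerException.
        static String resolveJepLibrary(String fromEnv, String fromProperties) {
            String pythonFolder = (fromEnv != null) ? fromEnv : fromProperties;
            if (pythonFolder == null) {
                throw new RuntimeException(
                        "Could not find path to jep install. Check JEP_LIBRARY_PATH in metadig.properties.");
            }
            return pythonFolder + "/libjep.jnilib";
        }

        public static void main(String[] args) {
            // Hypothetical install location.
            System.out.println(resolveJepLibrary("/usr/local/lib/python3.10/site-packages/jep", null));
        }
    }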
-appVersion: "3.0.0" +appVersion: "3.0.1-SNAPSHOT" diff --git a/helm/metadig-controller/values.yaml b/helm/metadig-controller/values.yaml index 0643279d..c96f2f75 100644 --- a/helm/metadig-controller/values.yaml +++ b/helm/metadig-controller/values.yaml @@ -7,7 +7,7 @@ replicaCount: 1 image: repository: ghcr.io/nceas/metadig-controller pullPolicy: Always - tag: "v.3.0.0" + tag: "bugfix-postgres-connections" imagePullSecrets: [] diff --git a/helm/metadig-scheduler/Chart.yaml b/helm/metadig-scheduler/Chart.yaml index 745af7e7..b7d2bb09 100644 --- a/helm/metadig-scheduler/Chart.yaml +++ b/helm/metadig-scheduler/Chart.yaml @@ -15,10 +15,10 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 1.0.4 +version: 1.0.5 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to # follow Semantic Versioning. They should reflect the version the application is using. # It is recommended to use it with quotes. -appVersion: "3.0.0" +appVersion: "3.0.1-SNAPSHOT" diff --git a/helm/metadig-scheduler/values.yaml b/helm/metadig-scheduler/values.yaml index 8d49cb2d..ef86a228 100644 --- a/helm/metadig-scheduler/values.yaml +++ b/helm/metadig-scheduler/values.yaml @@ -7,7 +7,7 @@ replicaCount: 1 image: repository: ghcr.io/nceas/metadig-scheduler pullPolicy: Always - tag: "v.3.0.0" + tag: "bugfix-postgres-connections" imagePullSecrets: [] nameOverride: "" diff --git a/helm/metadig-scorer/Chart.yaml b/helm/metadig-scorer/Chart.yaml index 0ff7f758..648bc136 100644 --- a/helm/metadig-scorer/Chart.yaml +++ b/helm/metadig-scorer/Chart.yaml @@ -15,10 +15,10 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 1.0.4 +version: 1.0.5 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to # follow Semantic Versioning. They should reflect the version the application is using. # It is recommended to use it with quotes. -appVersion: "3.0.0" +appVersion: "3.0.0-SNAPSHOT" diff --git a/helm/metadig-scorer/values.yaml b/helm/metadig-scorer/values.yaml index 3d6952df..96bf9753 100644 --- a/helm/metadig-scorer/values.yaml +++ b/helm/metadig-scorer/values.yaml @@ -7,7 +7,7 @@ replicaCount: 1 image: repository: ghcr.io/nceas/metadig-scorer pullPolicy: Always - tag: "v.3.0.0" + tag: "bugfix-postgres-connections" imagePullSecrets: [] diff --git a/helm/metadig-worker/Chart.yaml b/helm/metadig-worker/Chart.yaml index 18287cc9..8355692f 100644 --- a/helm/metadig-worker/Chart.yaml +++ b/helm/metadig-worker/Chart.yaml @@ -15,10 +15,10 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 1.0.4 +version: 1.0.5 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. 
Versions are not expected to # follow Semantic Versioning. They should reflect the version the application is using. # It is recommended to use it with quotes. -appVersion: "3.0.0" +appVersion: "3.0.1-SNAPSHOT" From 625b2e799bd781dedb4ccd694acb43871200f02b Mon Sep 17 00:00:00 2001 From: Jeanette Clark Date: Tue, 16 Apr 2024 16:05:30 -0700 Subject: [PATCH 06/21] only close the connection in the event of a rmq timeout --- src/main/java/edu/ucsb/nceas/mdqengine/Worker.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/Worker.java b/src/main/java/edu/ucsb/nceas/mdqengine/Worker.java index 471c8dd8..b0bd5876 100644 --- a/src/main/java/edu/ucsb/nceas/mdqengine/Worker.java +++ b/src/main/java/edu/ucsb/nceas/mdqengine/Worker.java @@ -451,7 +451,6 @@ private void returnReport(String metadataPid, String suiteId, QueueEntry qEntry, try { log.info("Resetting RabbitMQ queues and resending completed report..."); // destroy channel before setting up queues again - RabbitMQchannel.close(); RabbitMQconnection.close(); // setup queues this.setupQueues(); From 04fbd7dd1f2298ca036860b90552264c1cf74186 Mon Sep 17 00:00:00 2001 From: Jeanette Clark Date: Thu, 18 Apr 2024 09:18:22 -0700 Subject: [PATCH 07/21] add an app label --- helm/metadig-worker/templates/deployment.yaml | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/helm/metadig-worker/templates/deployment.yaml b/helm/metadig-worker/templates/deployment.yaml index a56a5ee8..2311658a 100644 --- a/helm/metadig-worker/templates/deployment.yaml +++ b/helm/metadig-worker/templates/deployment.yaml @@ -3,7 +3,7 @@ kind: Deployment metadata: name: {{ include "metadig-worker.fullname" . }} labels: - {{- include "metadig-worker.labels" . | nindent 4 }} + app: metadig-worker spec: {{- if not .Values.autoscaling.enabled }} replicas: {{ .Values.replicaCount }} @@ -27,6 +27,8 @@ spec: serviceAccountName: {{ include "metadig-worker.serviceAccountName" . }} securityContext: {{- toYaml .Values.podSecurityContext | nindent 8 }} + topologySpreadConstraints: + {{- toYaml .Values.topologySpreadConstraints | nindent 8}} containers: - name: {{ .Chart.Name }} securityContext: @@ -60,14 +62,6 @@ spec: defaultMode: 0644 restartPolicy: Always {{- with .Values.nodeSelector }} - nodeSelector: - {{- toYaml . | nindent 8 }} - {{- end }} - {{- with .Values.affinity }} - affinity: - {{- toYaml . | nindent 8 }} - {{- end }} - {{- with .Values.tolerations }} tolerations: {{- toYaml . 
| nindent 8 }} {{- end }} From 7c54fd01fd9890061fd8d1a4aceafe5509b5b3f3 Mon Sep 17 00:00:00 2001 From: Jeanette Clark Date: Thu, 18 Apr 2024 09:19:51 -0700 Subject: [PATCH 08/21] add topology spread to worker chart to distribute pods evenly --- helm/metadig-worker/values.yaml | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/helm/metadig-worker/values.yaml b/helm/metadig-worker/values.yaml index 5c2bc8bd..40d60b9e 100644 --- a/helm/metadig-worker/values.yaml +++ b/helm/metadig-worker/values.yaml @@ -7,7 +7,7 @@ replicaCount: 1 image: repository: ghcr.io/nceas/metadig-worker pullPolicy: Always - tag: "v.3.0.0" + tag: "bugfix-postgres-connections" imagePullSecrets: [] nameOverride: "" @@ -38,6 +38,14 @@ podAnnotations: {} podSecurityContext: {} # fsGroup: 2000 +topologySpreadConstraints: + - maxSkew: 10 + topologyKey: kubernetes.io/hostname + whenUnsatisfiable: ScheduleAnyway + labelSelector: + matchLabels: + app: metadig-worker + securityContext: {} # capabilities: # drop: From 27ec67c5aac457b586f7ba1a12a6ddb2c524bb6c Mon Sep 17 00:00:00 2001 From: Jeanette Clark Date: Thu, 18 Apr 2024 09:20:29 -0700 Subject: [PATCH 09/21] match the default pool size to max client connections also increase the number of possible connections everywhere --- helm/metadig-postgres/config/pgbouncer.ini | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/helm/metadig-postgres/config/pgbouncer.ini b/helm/metadig-postgres/config/pgbouncer.ini index 293f0701..907c3751 100644 --- a/helm/metadig-postgres/config/pgbouncer.ini +++ b/helm/metadig-postgres/config/pgbouncer.ini @@ -198,7 +198,7 @@ max_client_conn = 200 ; default pool size. 20 is good number when transaction pooling ; is in use, in session pooling it needs to be the number of ; max clients you want to handle at any moment -default_pool_size = 50 +default_pool_size = 200 ;; Minimum number of server connections to keep in pool. ;min_pool_size = 0 @@ -213,7 +213,7 @@ default_pool_size = 50 ;max_db_connections = 0 ;max_user_connections = 0 -max_db_connections = 90 +max_db_connections = 200 #max_user_connections = 100 max_user_connections = 200 From b316ed4b8d3fe235730ea5839ab7582916c9be77 Mon Sep 17 00:00:00 2001 From: Jeanette Clark Date: Thu, 18 Apr 2024 09:20:48 -0700 Subject: [PATCH 10/21] update postgres chart version --- helm/metadig-postgres/Chart.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/helm/metadig-postgres/Chart.yaml b/helm/metadig-postgres/Chart.yaml index 19d06044..4ac365ee 100644 --- a/helm/metadig-postgres/Chart.yaml +++ b/helm/metadig-postgres/Chart.yaml @@ -15,7 +15,7 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 1.0.0 +version: 1.0.1 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. 
Versions are not expected to From 9c499ac150b9d7c1d6c0e0c3f366b910476587b6 Mon Sep 17 00:00:00 2001 From: Jeanette Clark Date: Mon, 22 Apr 2024 15:24:49 -0700 Subject: [PATCH 11/21] see if we can install metadig-rake into the worker container --- Docker/metadig-worker/Dockerfile | 1 + 1 file changed, 1 insertion(+) diff --git a/Docker/metadig-worker/Dockerfile b/Docker/metadig-worker/Dockerfile index b1a24c53..fd4e3519 100644 --- a/Docker/metadig-worker/Dockerfile +++ b/Docker/metadig-worker/Dockerfile @@ -34,6 +34,7 @@ ENV PATH="$PATH:$JAVA_HOME/bin" RUN echo 'options(repos = c(CRAN = "http://cran.rstudio.com"))' >> /usr/lib/R/etc/Rprofile.site RUN Rscript -e "install.packages(c('remotes', 'stringr', 'jsonlite'))" RUN Rscript -e "remotes::install_github('NCEAS/metadig-r', ref = 'v.0.2.0')" +RUN Rscript -e "remotes::install_github('NCEAS/metadig-rake', ref = 'develop')" # DataONE indexer prints copious error msgs if these files don't exist RUN mkdir -p /etc/dataone/index && touch /etc/dataone/index/d1client.properties && touch /etc/dataone/node.properties && touch /etc/dataone/index/cn-index-processor.properties From dca11faeed5be20d7c6264db031dc71f8e1e3ce0 Mon Sep 17 00:00:00 2001 From: Jeanette Clark Date: Thu, 2 May 2024 14:55:59 -0700 Subject: [PATCH 12/21] fix formatting --- .../java/edu/ucsb/nceas/mdqengine/dispatch/Dispatcher.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/dispatch/Dispatcher.java b/src/main/java/edu/ucsb/nceas/mdqengine/dispatch/Dispatcher.java index df386777..cb571618 100644 --- a/src/main/java/edu/ucsb/nceas/mdqengine/dispatch/Dispatcher.java +++ b/src/main/java/edu/ucsb/nceas/mdqengine/dispatch/Dispatcher.java @@ -283,8 +283,9 @@ public static void setupJep() throws MetadigException { } // if its still null, throw a runtime exception // we don't want to start without Jep configured properly - if (pythonFolder == null){ - throw new RuntimeException("Could not find path to jep install. Check JEP_LIBRARY_PATH in metadig.proerties and ensure it is correct."); + if (pythonFolder == null) { + throw new RuntimeException( + "Could not find path to jep install. 
Check JEP_LIBRARY_PATH in metadig.proerties and ensure it is correct."); } // define the jep library path From 9ba6853e85927ebd28426ebc40c046388bba3cf7 Mon Sep 17 00:00:00 2001 From: Jeanette Clark Date: Thu, 2 May 2024 15:02:12 -0700 Subject: [PATCH 13/21] replace print statements with actual logging --- .../java/edu/ucsb/nceas/mdqengine/scheduler/NodeList.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/NodeList.java b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/NodeList.java index 710c3eb4..dee2fb95 100644 --- a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/NodeList.java +++ b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/NodeList.java @@ -98,7 +98,7 @@ public void execute(JobExecutionContext context) try { nodeList = cnNode.listNodes(); } catch (NotImplemented | ServiceFailure e) { - e.printStackTrace(); + log.error(taskName + ": cannot renew store, unable to schedule job", e); throw new JobExecutionException(taskName + ": cannot renew store, unable to schedule job", e); } @@ -110,7 +110,7 @@ public void execute(JobExecutionContext context) try { store.renew(); } catch (MetadigStoreException e) { - e.printStackTrace(); + log.error(taskName + ": cannot renew store, unable to schedule job" + e); throw new JobExecutionException(taskName + ": cannot renew store, unable to schedule job", e); } } @@ -132,7 +132,7 @@ public void execute(JobExecutionContext context) try { store.saveNode(node); } catch (MetadigStoreException mse) { - mse.printStackTrace(); + log.error("Cannot save node to store." + mse.getStackTrace()); throw new JobExecutionException( "Cannot save node " + node.getIdentifier().getValue() + " to store", mse); @@ -163,7 +163,7 @@ public void execute(JobExecutionContext context) } } catch (Exception e) { - e.printStackTrace(); + log.error(taskName + ": cannot create store, unable to schedule job", e); throw new JobExecutionException(taskName + ": cannot create store, unable to schedule job", e); } } From f880f6d498261112907975cc5a3f9f8e7e56aec0 Mon Sep 17 00:00:00 2001 From: Jeanette Clark Date: Thu, 2 May 2024 15:42:50 -0700 Subject: [PATCH 14/21] change more prints to logging statements --- .../ucsb/nceas/mdqengine/scheduler/RequestScorerJob.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestScorerJob.java b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestScorerJob.java index c3759a54..aa19c643 100644 --- a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestScorerJob.java +++ b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestScorerJob.java @@ -185,7 +185,7 @@ public void execute(JobExecutionContext context) try { d1Node = DataONE.getMultipartD1Node(session, nodeServiceUrl); } catch (MetadigException mpe) { - mpe.printStackTrace(); + log.error(taskName + ": unable to create connection to service URL " + nodeServiceUrl, mpe); throw new JobExecutionException(taskName + ": unable to create connection to service URL " + nodeServiceUrl, mpe); } @@ -197,7 +197,7 @@ public void execute(JobExecutionContext context) try { store.renew(); } catch (MetadigStoreException e) { - e.printStackTrace(); + log.error("Cannot renew store, unable to schedule job", e); throw new JobExecutionException("Cannot renew store, unable to schedule job", e); } } @@ -357,7 +357,7 @@ public void execute(JobExecutionContext context) } } } catch (Exception e) { - e.printStackTrace(); + log.error("Cannot create store, unable 
to schedule job", e); throw new JobExecutionException("Cannot create store, unable to schedule job", e); } From 1d36e9d9c7e2ee47838d915a53986a0b9b7c08c9 Mon Sep 17 00:00:00 2001 From: Jeanette Clark Date: Thu, 2 May 2024 15:43:52 -0700 Subject: [PATCH 15/21] more formatting --- .../edu/ucsb/nceas/mdqengine/scheduler/RequestScorerJob.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestScorerJob.java b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestScorerJob.java index aa19c643..c2a07eb9 100644 --- a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestScorerJob.java +++ b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestScorerJob.java @@ -397,9 +397,7 @@ public ListResult getPidsToProcess(MultipartD1Node d1Node, Session session, Document xmldoc = null; String queryStr = "?q=formatId:" + pidFilter + "+-obsoletedBy:*" + "+dateModified:[" + startHarvestDatetimeStr - + "%20TO%20" - + endHarvestDatetimeStr + "]" - + "&fl=seriesId,dateModified&q.op=AND"; + + "%20TO%20" + endHarvestDatetimeStr + "]" + "&fl=seriesId,dateModified&q.op=AND"; log.trace("query: " + queryStr); // Send the query to DataONE Solr to retrieve portal seriesIds for a given time From 845a8abaf21e1c0bffcc222ef5f047919396d309 Mon Sep 17 00:00:00 2001 From: Jeanette Clark Date: Thu, 2 May 2024 15:45:06 -0700 Subject: [PATCH 16/21] increase max db connections to account for su connection usage --- helm/metadig-postgres/config/pgbouncer.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/helm/metadig-postgres/config/pgbouncer.ini b/helm/metadig-postgres/config/pgbouncer.ini index 907c3751..eccccd78 100644 --- a/helm/metadig-postgres/config/pgbouncer.ini +++ b/helm/metadig-postgres/config/pgbouncer.ini @@ -213,7 +213,7 @@ default_pool_size = 200 ;max_db_connections = 0 ;max_user_connections = 0 -max_db_connections = 200 +max_db_connections = 220 #max_user_connections = 100 max_user_connections = 200 From 8736e90b6d303b4387646650a35a2c25e0fda239 Mon Sep 17 00:00:00 2001 From: Jeanette Clark Date: Thu, 2 May 2024 15:49:06 -0700 Subject: [PATCH 17/21] fomat a bunch of comments --- .../mdqengine/scheduler/RequestScorerJob.java | 47 +++++++------------ 1 file changed, 17 insertions(+), 30 deletions(-) diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestScorerJob.java b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestScorerJob.java index c2a07eb9..abc56117 100644 --- a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestScorerJob.java +++ b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestScorerJob.java @@ -130,9 +130,8 @@ public void execute(JobExecutionContext context) String pidFilter = dataMap.getString("pidFilter"); String suiteId = dataMap.getString("suiteId"); // The nodeId is used for filterine queries based on DataONE sysmeta - // 'datasource'. - // For example, if one wished to get scores for Arctic Data Center, the - // urn:node:ARCTIC would be specified. + // 'datasource'. For example, if one wished to get scores for Arctic Data + // Center, the urn:node:ARCTIC would be specified. 
String nodeId = dataMap.getString("nodeId"); String startHarvestDatetimeStr = dataMap.getString("startHarvestDatetime"); int harvestDatetimeInc = dataMap.getInt("harvestDatetimeInc"); @@ -213,10 +212,8 @@ public void execute(JobExecutionContext context) task = store.getTask(taskName, taskType, nodeId); // If a 'task' entry has not been saved for this task name yet, then a - // 'lastHarvested' - // DataTime will not be available, in which case the 'startHarvestDataTime' from - // the - // config file will be used. + // 'lastHarvested' DataTime will not be available, in which case the + // 'startHarvestDataTime' from the config file will be used. if (task.getLastHarvestDatetime(nodeId) == null) { task = new Task(); task.setTaskName(taskName); @@ -229,12 +226,9 @@ public void execute(JobExecutionContext context) DateTime lastHarvestDateDT = new DateTime(lastHarvestDateStr); // Set the search start datetime to the last harvest datetime, unless it is in - // the - // future. (This can happen when the previous time range end was for the current - // day, - // as the end datetime range for the previous task run will have been stored as - // the - // new lastharvestDateTime. + // the future. (This can happen when the previous time range end was for the + // current day, as the end datetime range for the previous task run will have + // been stored as the new lastharvestDateTime. DateTime startDT = null; if (lastHarvestDateDT.isAfter(currentDT.toInstant())) { startDT = currentDT; @@ -245,15 +239,13 @@ public void execute(JobExecutionContext context) DateTime endDT = new DateTime(currentDT); // If the start and end harvest dates are the same (happends for a new node), - // then - // tweek the start so that DataONE listObjects doesn't complain. + // then tweak the start so that DataONE listObjects doesn't complain. if (startDT == endDT) { startDT = startDT.minusMinutes(1); } // Track the sysmeta dateUploaded of the latest harvested pid. This will become - // the starting time of - // the next harvest. + // the starting time of the next harvest. DateTime lastDateModifiedDT = startDT; String startDTstr = dtfOut.print(startDT); @@ -264,10 +256,9 @@ public void execute(JobExecutionContext context) Integer resultCount = 0; // Two types of score requests can be processed - a "node" request that will get - // score info for an - // entire repository (e.g. urn:node:ARCTIC) or a "portal" request that will get - // scores for a - // specific portal (from the Solr portal entry collectionQuery). + // score info for an entire repository (e.g. urn:node:ARCTIC) or a "portal" + // request that will get scores for a specific portal (from the Solr portal + // entry collectionQuery). if (requestType != null && requestType.equalsIgnoreCase("node")) { try { // For a 'node' scores request, the 'collection' is the entire node, so specify @@ -327,8 +318,7 @@ public void execute(JobExecutionContext context) } // Check if DataONE returned the max number of results. If so, we have to - // request more by paging through - // the results. + // request more by paging through the results. allIds += pidsToProcess.size(); if (resultCount >= countRequested) { morePids = true; @@ -404,11 +394,9 @@ public ListResult getPidsToProcess(MultipartD1Node d1Node, Session session, // frame // One query can return many documents, so use the paging mechanism to make sure - // we retrieve them all. - // Keep paging through query results until all pids have been fetched. 
The last - // 'page' of query - // results is indicated by the number of items returned being less than the - // number requested. + // we retrieve them all. Keep paging through query results until all pids have + // been fetched. The last 'page' of query results is indicated by the number of + // items returned being less than the number requested. int thisResultLength; // Now setup the xpath to retrieve the ids returned from the collection query. try { @@ -426,8 +414,7 @@ public ListResult getPidsToProcess(MultipartD1Node d1Node, Session session, } // Loop through the Solr result. As the result may be large, page through the - // results, accumulating - // the pids returned into a ListResult object. + // results, accumulating the pids returned into a ListResult object. log.trace("Getting portal seriesIds from Solr "); int startPos = startCount; From d323856d22b1f4ff55fbc3a3da3333d6efef2719 Mon Sep 17 00:00:00 2001 From: Jeanette Clark Date: Thu, 2 May 2024 16:00:47 -0700 Subject: [PATCH 18/21] fix line wrapping --- .../mdqengine/scheduler/RequestReportJob.java | 31 +++++++------------ 1 file changed, 11 insertions(+), 20 deletions(-) diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestReportJob.java b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestReportJob.java index 9a99209c..d99d8c8c 100644 --- a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestReportJob.java +++ b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestReportJob.java @@ -249,12 +249,10 @@ public void execute(JobExecutionContext context) harvestNodeId = node.getIdentifier().getValue(); // If processing a CN, check each MN to see if it is being synchronized and if - // it - // is marked as up. + // it is marked as up. if (isCN) { // The NodeList task doesn't save CN entries from the DataONE 'listNodes()' - // service, but check - // just in case. + // service, but check just in case. if (node.getType().equals(NodeType.CN)) { log.debug("Harvesting from CN, skipping CN entry from node list for " + node.getIdentifier().getValue()); @@ -300,10 +298,9 @@ public void execute(JobExecutionContext context) Task task; task = store.getTask(taskName, taskType, harvestNodeId); // If a 'task' entry has not been saved for this task name yet (i.e. this is an - // MN that has just been - // registerd with DataONE), then a 'lastHarvested' DataTime will not be - // available, in which case the - // 'startHarvestDataTime' from the config file will be used. + // MN that has just been registerd with DataONE), then a 'lastHarvested' + // DataTime will not be available, in which case the 'startHarvestDataTime' from + // the config file will be used. if (task.getLastHarvestDatetime(harvestNodeId) == null) { task.setTaskName(taskName); task.setTaskType(taskType); @@ -315,12 +312,9 @@ public void execute(JobExecutionContext context) DateTime lastHarvestDateDT = new DateTime(lastHarvestDateStr); // Set the search start datetime to the last harvest datetime, unless it is in - // the - // future. (This can happen when the previous time range end was for the current - // day, - // as the end datetime range for the previous task run will have been stored as - // the - // new lastharvestDateTime. + // the future. (This can happen when the previous time range end was for the + // current day, as the end datetime range for the previous task run will have + // been stored as the new lastharvestDateTime. 
DateTime startDT = null; if (lastHarvestDateDT.isAfter(currentDT.toInstant())) { startDT = currentDT; @@ -331,15 +325,13 @@ public void execute(JobExecutionContext context) DateTime endDT = new DateTime(currentDT); // If the start and end harvest dates are the same (happens for a new node), - // then - // tweak the start so that DataONE listObjects doesn't complain. + // then tweak the start so that DataONE listObjects doesn't complain. if (startDT == endDT) { startDT = startDT.minusMinutes(1); } // Track the sysmeta dateUploaded of the latest harvested pid. This will become - // the starting time of - // the next harvest. + // the starting time of the next harvest. DateTime lastDateModifiedDT = startDT; String startDTstr = dtfOut.print(startDT); @@ -467,8 +459,7 @@ public ListResult getPidsToProcess(MultipartCNode cnNode, MultipartMNode mnNode, Boolean replicaStatus = false; // Do some back-flips to convert the start and end date to the ancient Java - // 'Date' type that is - // used by DataONE 'listObjects()'. + // 'Date' type that is used by DataONE 'listObjects()'. ZonedDateTime zdt = ZonedDateTime.parse(startHarvestDatetimeStr); // start date milliseconds since the epoch date "midnight, January 1, 1970 UTC" long msSinceEpoch = zdt.toInstant().toEpochMilli(); From dac499b4f713c17e61e780fa597ceb1fe8998253 Mon Sep 17 00:00:00 2001 From: Jeanette Clark Date: Thu, 2 May 2024 16:03:02 -0700 Subject: [PATCH 19/21] remove an unecessary try block and more line wrapping --- .../mdqengine/scheduler/RequestReportJob.java | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestReportJob.java b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestReportJob.java index d99d8c8c..bfd9866c 100644 --- a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestReportJob.java +++ b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestReportJob.java @@ -205,23 +205,13 @@ public void execute(JobExecutionContext context) // Get a connection to the database - try (DatabaseStore store = new DatabaseStore();) { - - if (!store.isAvailable()) { - try { - store.renew(); - } catch (MetadigStoreException e) { - e.printStackTrace(); - throw new JobExecutionException("Cannot renew store, unable to schedule job", e); - } - } + try (DatabaseStore store = new DatabaseStore()) { ArrayList nodes = new ArrayList<>(); /* * If the CN is being harvested, then get all the nodes in the node db. The node - * db contains - * info about all nodes registered with the CN. + * db contains info about all nodes registered with the CN. 
*/ if (isCN) { nodes = store.getNodes(); From da30cec981334b8782cbc3ecbd93160b3f38aedd Mon Sep 17 00:00:00 2001 From: Jeanette Clark Date: Thu, 2 May 2024 16:04:33 -0700 Subject: [PATCH 20/21] formatting --- src/main/java/edu/ucsb/nceas/mdqengine/collections/Runs.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/collections/Runs.java b/src/main/java/edu/ucsb/nceas/mdqengine/collections/Runs.java index 6eeb12bb..37cbd276 100644 --- a/src/main/java/edu/ucsb/nceas/mdqengine/collections/Runs.java +++ b/src/main/java/edu/ucsb/nceas/mdqengine/collections/Runs.java @@ -225,8 +225,7 @@ public void getRunSequence(Run run, String suiteId, Boolean stopWhenSIfound) thr // Start the traversal in the backward direction log.debug("Getting all runs (backward) for suiteId: " + suiteId + ", metadataId: " + metadataId - + ", minDate: " - + minDate + ", " + maxDate); + + ", minDate: " + minDate + ", " + maxDate); forward = false; getNextRun(metadataId, suiteId, stopWhenSIfound, store, forward, level); From a114df93b17d70586c0b314037bc2ccdb84fc98a Mon Sep 17 00:00:00 2001 From: Jeanette Clark Date: Fri, 3 May 2024 11:14:58 -0700 Subject: [PATCH 21/21] move labels to be configurable --- helm/metadig-worker/templates/deployment.yaml | 2 +- helm/metadig-worker/values.yaml | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/helm/metadig-worker/templates/deployment.yaml b/helm/metadig-worker/templates/deployment.yaml index 2311658a..1b1b8376 100644 --- a/helm/metadig-worker/templates/deployment.yaml +++ b/helm/metadig-worker/templates/deployment.yaml @@ -3,7 +3,7 @@ kind: Deployment metadata: name: {{ include "metadig-worker.fullname" . }} labels: - app: metadig-worker + {{- include "metadig-worker.labels" . | nindent 4 }} spec: {{- if not .Values.autoscaling.enabled }} replicas: {{ .Values.replicaCount }} diff --git a/helm/metadig-worker/values.yaml b/helm/metadig-worker/values.yaml index 40d60b9e..cede60de 100644 --- a/helm/metadig-worker/values.yaml +++ b/helm/metadig-worker/values.yaml @@ -4,6 +4,9 @@ replicaCount: 1 +labels: + app: metadig-worker + image: repository: ghcr.io/nceas/metadig-worker pullPolicy: Always
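Note on the idiom used in PATCH 19/21 above: the job opens the store in a try-with-resources header (try (DatabaseStore store = new DatabaseStore())), which works because a resource in that position must be a Java AutoCloseable; the runtime then calls close() and releases the underlying JDBC connection whether the block completes normally or throws, which is why the explicit renew-on-failure block could be dropped. The following is a minimal, self-contained sketch of that idiom only. SimpleDatabaseStore, the JDBC URL, and the nodes query are hypothetical stand-ins for illustration, not classes or tables from this repository.

    // Illustrative sketch only -- the class, URL, and table below are hypothetical,
    // not the actual edu.ucsb.nceas.mdqengine.store.DatabaseStore.
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;

    class SimpleDatabaseStore implements AutoCloseable {

        private final Connection conn;

        SimpleDatabaseStore(String jdbcUrl) throws SQLException {
            // One connection per store instance; the pooler in front of Postgres
            // (e.g. pgbouncer, as in PATCH 16/21) caps the total number of connections.
            this.conn = DriverManager.getConnection(jdbcUrl);
        }

        int countNodes() throws SQLException {
            // Nested try-with-resources: Statement and ResultSet are closed in
            // reverse order before this method returns or propagates an exception.
            try (Statement st = conn.createStatement();
                 ResultSet rs = st.executeQuery("SELECT count(*) FROM nodes")) {
                rs.next();
                return rs.getInt(1);
            }
        }

        @Override
        public void close() throws SQLException {
            // Invoked automatically when the try-with-resources block exits,
            // whether normally or via an exception.
            conn.close();
        }

        public static void main(String[] args) throws Exception {
            // The connection is closed even if countNodes() throws, which is the
            // guarantee the scheduler jobs rely on after this change.
            try (SimpleDatabaseStore store =
                    new SimpleDatabaseStore("jdbc:postgresql://localhost:5432/metadig")) {
                System.out.println("registered nodes: " + store.countNodes());
            } catch (SQLException e) {
                // Log-and-rethrow mirrors how the jobs wrap failures in JobExecutionException.
                System.err.println("store unavailable: " + e.getMessage());
                throw e;
            }
        }
    }

Because close() runs on every exit path, the surrounding catch blocks in the jobs only need to log and rethrow; they no longer have to renew or shut down the store themselves.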