diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java index 1a544a3dfdd..aff097bbdc4 100644 --- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java +++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java @@ -43,6 +43,12 @@ public class ServiceMetricNames { public static final String FLOW_TRIGGER_HANDLER_JOB_DOES_NOT_EXIST_COUNT = GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".jobDoesNotExistInScheduler"; public static final String FLOW_TRIGGER_HANDLER_FAILED_TO_SET_REMINDER_COUNT = GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".failedToSetReminderCount"; + // DagManager Related Metrics + public static final String DAG_MANAGER_PREFIX = GOBBLIN_SERVICE_PREFIX + ".dagManager"; + public static final String + DAG_MANAGER_FAILED_LAUNCH_EVENTS_ON_STARTUP_COUNT = DAG_MANAGER_PREFIX + ".failedLaunchEventsOnStartupCount"; + public static final String FLOW_FAILED_FORWARD_TO_DAG_MANAGER_COUNT = DAG_MANAGER_PREFIX + ".flowFailedForwardToDagManagerCount"; + //Job status poll timer public static final String JOB_STATUS_POLLED_TIMER = GOBBLIN_SERVICE_PREFIX + ".jobStatusPoll.time"; diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java index 3e11d7c7228..eb26acd161a 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java @@ -45,6 +45,15 @@ class DagAction { public FlowId getFlowId() { return new FlowId().setFlowGroup(this.flowGroup).setFlowName(this.flowName); } + + /** + * Replace flow execution id with agreed upon event time to easily track the flow + */ + public static DagActionStore.DagAction updateFlowExecutionId(DagActionStore.DagAction flowAction, + long eventTimeMillis) { + return new DagActionStore.DagAction(flowAction.getFlowGroup(), flowAction.getFlowName(), + String.valueOf(eventTimeMillis), flowAction.getFlowActionType()); + } } diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java index faacb099574..253db49ba92 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java @@ -79,28 +79,41 @@ abstract class LeaseAttemptStatus {} class NoLongerLeasingStatus extends LeaseAttemptStatus {} /* - The participant calling this method acquired the lease for the event in question. The class contains the - `eventTimestamp` associated with the lease as well as the time the caller obtained the lease or - `leaseAcquisitionTimestamp`. + The participant calling this method acquired the lease for the event in question. `Flow action`'s flow execution id + is the timestamp associated with the lease and the time the caller obtained the lease is stored within the + `leaseAcquisitionTimestamp` field. */ @Data class LeaseObtainedStatus extends LeaseAttemptStatus { private final DagActionStore.DagAction flowAction; - private final long eventTimestamp; private final long leaseAcquisitionTimestamp; + + /** + * @return event time in millis since epoch for the event of this lease acquisition + */ + public long getEventTimeMillis() { + return Long.parseLong(flowAction.getFlowExecutionId()); + } } /* This flow action event already has a valid lease owned by another participant. - `eventTimeMillis` is the timestamp the lease is associated with, which may be a different timestamp for the same flow - action corresponding to the same instance of the event or a distinct one. + `Flow action`'s flow execution id is the timestamp the lease is associated with, however the flow action event it + corresponds to may be a different and distinct occurrence of the same event. `minimumLingerDurationMillis` is the minimum amount of time to wait before this participant should return to check if the lease has completed or expired */ @Data class LeasedToAnotherStatus extends LeaseAttemptStatus { private final DagActionStore.DagAction flowAction; - private final long eventTimeMillis; private final long minimumLingerDurationMillis; + + /** + * Returns event time in millis since epoch for the event whose lease was obtained by another participant. + * @return + */ + public long getEventTimeMillis() { + return Long.parseLong(flowAction.getFlowExecutionId()); + } } } diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java index 4c2e8d2da2b..c6161d93695 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java @@ -41,6 +41,8 @@ import org.apache.gobblin.service.ServiceConfigKeys; import org.apache.gobblin.util.ConfigUtils; +import static org.apache.gobblin.runtime.api.DagActionStore.DagAction.*; + /** * MySQL based implementation of the {@link MultiActiveLeaseArbiter} which uses a MySQL store to resolve ownership of @@ -242,7 +244,6 @@ private void runRetentionOnArbitrationTable() { ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); Runnable retentionTask = () -> { try { - Thread.sleep(10000); withPreparedStatement(thisTableRetentionStatement, retentionStatement -> { retentionStatement.setLong(1, retentionPeriodMillis); @@ -253,7 +254,7 @@ private void runRetentionOnArbitrationTable() { } return numRowsDeleted; }, true); - } catch (InterruptedException | IOException e) { + } catch (IOException e) { log.error("Failing to run retention on lease arbiter table. Unbounded growth can lead to database slowness and " + "affect our system performance. Examine exception: ", e); } @@ -307,7 +308,7 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l } if (eventTimeMillis == dbEventTimestamp.getTime()) { // TODO: change this to a debug after fixing issue - log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - dbEventTimeMillis: {} - Reminder event time" + log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - dbEventTimeMillis: {} - Reminder event time " + "is the same as db event.", flowAction, isReminderEvent ? "reminder" : "original", eventTimeMillis, dbEventTimestamp); } @@ -320,16 +321,18 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l // Lease is valid if (leaseValidityStatus == 1) { if (isWithinEpsilon) { + DagActionStore.DagAction updatedFlowAction = updateFlowExecutionId(flowAction, dbEventTimestamp.getTime()); log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 2: Same event, lease is valid", - flowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime()); + updatedFlowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime()); // Utilize db timestamp for reminder - return new LeasedToAnotherStatus(flowAction, dbEventTimestamp.getTime(), + return new LeasedToAnotherStatus(updatedFlowAction, dbLeaseAcquisitionTimestamp.getTime() + dbLinger - dbCurrentTimestamp.getTime()); } + DagActionStore.DagAction updatedFlowAction = updateFlowExecutionId(flowAction, dbCurrentTimestamp.getTime()); log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 3: Distinct event, lease is valid", - flowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime()); + updatedFlowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime()); // Utilize db lease acquisition timestamp for wait time - return new LeasedToAnotherStatus(flowAction, dbCurrentTimestamp.getTime(), + return new LeasedToAnotherStatus(updatedFlowAction, dbLeaseAcquisitionTimestamp.getTime() + dbLinger - dbCurrentTimestamp.getTime()); } // Lease is invalid else if (leaseValidityStatus == 2) { @@ -515,16 +518,16 @@ protected LeaseAttemptStatus evaluateStatusAfterLeaseAttempt(int numRowsUpdated, if (!selectInfoResult.getLeaseAcquisitionTimeMillis().isPresent()) { return new NoLongerLeasingStatus(); } + DagActionStore.DagAction updatedFlowAction = updateFlowExecutionId(flowAction, selectInfoResult.eventTimeMillis); if (numRowsUpdated == 1) { - log.info("Obtained lease for [{}, is: {}, eventTimestamp: {}] successfully!", flowAction, + log.info("Obtained lease for [{}, is: {}, eventTimestamp: {}] successfully!", updatedFlowAction, isReminderEvent ? "reminder" : "original", selectInfoResult.eventTimeMillis); - return new LeaseObtainedStatus(flowAction, selectInfoResult.eventTimeMillis, - selectInfoResult.getLeaseAcquisitionTimeMillis().get()); + return new LeaseObtainedStatus(updatedFlowAction, selectInfoResult.getLeaseAcquisitionTimeMillis().get()); } log.info("Another participant acquired lease in between for [{}, is: {}, eventTimestamp: {}] - num rows updated: {}", - flowAction, isReminderEvent ? "reminder" : "original", selectInfoResult.eventTimeMillis, numRowsUpdated); + updatedFlowAction, isReminderEvent ? "reminder" : "original", selectInfoResult.eventTimeMillis, numRowsUpdated); // Another participant acquired lease in between - return new LeasedToAnotherStatus(flowAction, selectInfoResult.getEventTimeMillis(), + return new LeasedToAnotherStatus(updatedFlowAction, selectInfoResult.getLeaseAcquisitionTimeMillis().get() + selectInfoResult.getDbLinger() - (dbCurrentTimestamp.isPresent() ? dbCurrentTimestamp.get().getTime() : System.currentTimeMillis())); } @@ -599,22 +602,22 @@ public boolean recordLeaseSuccess(LeaseObtainedStatus status) updateStatement.setString(++i, flowGroup); updateStatement.setString(++i, flowName); updateStatement.setString(++i, flowActionType.toString()); - updateStatement.setTimestamp(++i, new Timestamp(status.getEventTimestamp()), UTC_CAL.get()); + updateStatement.setTimestamp(++i, new Timestamp(status.getEventTimeMillis()), UTC_CAL.get()); updateStatement.setTimestamp(++i, new Timestamp(status.getLeaseAcquisitionTimestamp()), UTC_CAL.get()); int numRowsUpdated = updateStatement.executeUpdate(); if (numRowsUpdated == 0) { log.info("Multi-active lease arbiter lease attempt: [{}, eventTimestamp: {}] - FAILED to complete because " + "lease expired or event cleaned up before host completed required actions", flowAction, - status.getEventTimestamp()); + status.getEventTimeMillis()); return false; } if( numRowsUpdated == 1) { log.info("Multi-active lease arbiter lease attempt: [{}, eventTimestamp: {}] - COMPLETED, no longer leasing" - + " this event after this.", flowAction, status.getEventTimestamp()); + + " this event after this.", flowAction, status.getEventTimeMillis()); return true; }; throw new IOException(String.format("Attempt to complete lease use: [%s, eventTimestamp: %s] - updated more " - + "rows than expected", flowAction, status.getEventTimestamp())); + + "rows than expected", flowAction, status.getEventTimeMillis())); }, true); } diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java index 33734cfcbad..8fc1258ab50 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java @@ -43,14 +43,17 @@ public class RuntimeMetrics { public static final String GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specStoreMonitor.message.processed"; public static final String GOBBLIN_SPEC_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specstoreMonitor.produce.to.consume.delay"; - public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.kills.invoked"; - public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.message.processed"; - public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.resumes.invoked"; - public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.flows.launched"; - public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.unexpected.errors"; - public static final String - GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.produce.to.consume.delay"; + public static final String DAG_ACTION_STORE_MONITOR_PREFIX = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor"; + public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED = DAG_ACTION_STORE_MONITOR_PREFIX + ".kills.invoked"; + public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED = DAG_ACTION_STORE_MONITOR_PREFIX + ".message.processed"; + public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGES_FILTERED_OUT = DAG_ACTION_STORE_MONITOR_PREFIX + ".messagesFilteredOut"; + public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED = DAG_ACTION_STORE_MONITOR_PREFIX + ".resumes.invoked"; + public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED = DAG_ACTION_STORE_MONITOR_PREFIX + ".flows.launched"; + public static final String GOBBLIN_DAG_ACTION_STORE_FAILED_FLOW_LAUNCHED_SUBMISSIONS = DAG_ACTION_STORE_MONITOR_PREFIX + ".failedFlowLaunchSubmissions"; + public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS = DAG_ACTION_STORE_MONITOR_PREFIX + ".unexpected.errors"; + public static final String + GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS = DAG_ACTION_STORE_MONITOR_PREFIX + ".produce.to.consume.delay"; public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.unexpected.errors"; public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_QUOTA_REQUESTS_EXCEEDED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.quotaRequests.exceeded"; public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_TIME_TO_CHECK_QUOTA = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.time.to.check.quota"; diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java index 15090e8f14b..7bafc78ff34 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java @@ -88,10 +88,11 @@ public void testAcquireLeaseSingleParticipant() throws Exception { Assert.assertTrue(firstLaunchStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus); MultiActiveLeaseArbiter.LeaseObtainedStatus firstObtainedStatus = (MultiActiveLeaseArbiter.LeaseObtainedStatus) firstLaunchStatus; - Assert.assertTrue(firstObtainedStatus.getEventTimestamp() <= + Assert.assertTrue(firstObtainedStatus.getEventTimeMillis() <= firstObtainedStatus.getLeaseAcquisitionTimestamp()); Assert.assertTrue(firstObtainedStatus.getFlowAction().equals( - new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, DagActionStore.FlowActionType.LAUNCH))); + new DagActionStore.DagAction(flowGroup, flowName, String.valueOf(firstObtainedStatus.getEventTimeMillis()), + DagActionStore.FlowActionType.LAUNCH))); // Verify that different DagAction types for the same flow can have leases at the same time DagActionStore.DagAction killDagAction = new @@ -102,7 +103,7 @@ public void testAcquireLeaseSingleParticipant() throws Exception { MultiActiveLeaseArbiter.LeaseObtainedStatus killObtainedStatus = (MultiActiveLeaseArbiter.LeaseObtainedStatus) killStatus; Assert.assertTrue( - killObtainedStatus.getLeaseAcquisitionTimestamp() >= killObtainedStatus.getEventTimestamp()); + killObtainedStatus.getLeaseAcquisitionTimestamp() >= killObtainedStatus.getEventTimeMillis()); // Tests CASE 2 of acquire lease for a flow action event that already has a valid lease for the same event in db // Very little time should have passed if this test directly follows the one above so this call will be considered @@ -112,7 +113,7 @@ public void testAcquireLeaseSingleParticipant() throws Exception { Assert.assertTrue(secondLaunchStatus instanceof MultiActiveLeaseArbiter.LeasedToAnotherStatus); MultiActiveLeaseArbiter.LeasedToAnotherStatus secondLeasedToAnotherStatus = (MultiActiveLeaseArbiter.LeasedToAnotherStatus) secondLaunchStatus; - Assert.assertEquals(firstObtainedStatus.getEventTimestamp(), secondLeasedToAnotherStatus.getEventTimeMillis()); + Assert.assertEquals(firstObtainedStatus.getEventTimeMillis(), secondLeasedToAnotherStatus.getEventTimeMillis()); Assert.assertTrue(secondLeasedToAnotherStatus.getMinimumLingerDurationMillis() > 0); // Tests CASE 3 of trying to acquire a lease for a distinct flow action event, while the previous event's lease is @@ -124,7 +125,7 @@ public void testAcquireLeaseSingleParticipant() throws Exception { Assert.assertTrue(thirdLaunchStatus instanceof MultiActiveLeaseArbiter.LeasedToAnotherStatus); MultiActiveLeaseArbiter.LeasedToAnotherStatus thirdLeasedToAnotherStatus = (MultiActiveLeaseArbiter.LeasedToAnotherStatus) thirdLaunchStatus; - Assert.assertTrue(thirdLeasedToAnotherStatus.getEventTimeMillis() > firstObtainedStatus.getEventTimestamp()); + Assert.assertTrue(thirdLeasedToAnotherStatus.getEventTimeMillis() > firstObtainedStatus.getEventTimeMillis()); Assert.assertTrue(thirdLeasedToAnotherStatus.getMinimumLingerDurationMillis() < LINGER); // Tests CASE 4 of lease out of date @@ -134,14 +135,14 @@ public void testAcquireLeaseSingleParticipant() throws Exception { Assert.assertTrue(fourthLaunchStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus); MultiActiveLeaseArbiter.LeaseObtainedStatus fourthObtainedStatus = (MultiActiveLeaseArbiter.LeaseObtainedStatus) fourthLaunchStatus; - Assert.assertTrue(fourthObtainedStatus.getEventTimestamp() > eventTimeMillis + LINGER); - Assert.assertTrue(fourthObtainedStatus.getEventTimestamp() + Assert.assertTrue(fourthObtainedStatus.getEventTimeMillis() > eventTimeMillis + LINGER); + Assert.assertTrue(fourthObtainedStatus.getEventTimeMillis() <= fourthObtainedStatus.getLeaseAcquisitionTimestamp()); // Tests CASE 5 of no longer leasing the same event in DB // done immediately after previous lease obtainment so should be marked as the same event Assert.assertTrue(mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(fourthObtainedStatus)); - Assert.assertTrue(System.currentTimeMillis() - fourthObtainedStatus.getEventTimestamp() < EPSILON); + Assert.assertTrue(System.currentTimeMillis() - fourthObtainedStatus.getEventTimeMillis() < EPSILON); MultiActiveLeaseArbiter.LeaseAttemptStatus fifthLaunchStatus = mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, eventTimeMillis, false); Assert.assertTrue(fifthLaunchStatus instanceof MultiActiveLeaseArbiter.NoLongerLeasingStatus); @@ -154,7 +155,7 @@ public void testAcquireLeaseSingleParticipant() throws Exception { Assert.assertTrue(sixthLaunchStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus); MultiActiveLeaseArbiter.LeaseObtainedStatus sixthObtainedStatus = (MultiActiveLeaseArbiter.LeaseObtainedStatus) sixthLaunchStatus; - Assert.assertTrue(sixthObtainedStatus.getEventTimestamp() + Assert.assertTrue(sixthObtainedStatus.getEventTimeMillis() <= sixthObtainedStatus.getLeaseAcquisitionTimestamp()); } @@ -216,8 +217,10 @@ public void testConditionallyAcquireLeaseIfFinishedLeasingStatement() // Mark the resume action lease from above as completed by fabricating a LeaseObtainedStatus MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult = mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction); + DagActionStore.DagAction updatedResumeDagAction = DagActionStore.DagAction.updateFlowExecutionId(resumeDagAction, + selectInfoResult.getEventTimeMillis()); boolean markedSuccess = mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseObtainedStatus( - resumeDagAction, selectInfoResult.getEventTimeMillis(), selectInfoResult.getLeaseAcquisitionTimeMillis().get())); + updatedResumeDagAction, selectInfoResult.getLeaseAcquisitionTimeMillis().get())); Assert.assertTrue(markedSuccess); // Ensure no NPE results from calling this after a lease has been completed and acquisition timestamp val is NULL mysqlMultiActiveLeaseArbiter.evaluateStatusAfterLeaseAttempt(1, resumeDagAction, @@ -281,7 +284,7 @@ public void testReminderEventAcquireLeaseOnInvalidLease() throws IOException, In mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction, selectInfoResult.getEventTimeMillis(), true); Assert.assertTrue(attemptStatus instanceof LeaseObtainedStatus); LeaseObtainedStatus obtainedStatus = (LeaseObtainedStatus) attemptStatus; - Assert.assertTrue(obtainedStatus.getEventTimestamp() > selectInfoResult.getEventTimeMillis()); + Assert.assertTrue(obtainedStatus.getEventTimeMillis() > selectInfoResult.getEventTimeMillis()); Assert.assertTrue(obtainedStatus.getLeaseAcquisitionTimestamp() > selectInfoResult.getLeaseAcquisitionTimeMillis().get().longValue()); } @@ -296,8 +299,10 @@ public void testReminderEventAcquireLeaseOnCompletedLease() throws IOException, // Mark the resume action lease from above as completed by fabricating a LeaseObtainedStatus MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult = mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction); + DagActionStore.DagAction updatedResumeDagAction = DagActionStore.DagAction.updateFlowExecutionId(resumeDagAction, + selectInfoResult.getEventTimeMillis()); boolean markedSuccess = mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseObtainedStatus( - resumeDagAction, selectInfoResult.getEventTimeMillis(), selectInfoResult.getLeaseAcquisitionTimeMillis().get())); + updatedResumeDagAction, selectInfoResult.getLeaseAcquisitionTimeMillis().get())); Assert.assertTrue(markedSuccess); // Sleep enough time for the event to have been considered distinct diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java index c8d6bf598c4..b4ec9c0ceef 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java @@ -306,6 +306,8 @@ protected void startUp() { * Note this should only be called from the {@link Orchestrator} or {@link org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor} */ public synchronized void addDag(Dag dag, boolean persist, boolean setStatus) throws IOException { + // TODO: Used to track missing dag issue, remove later as needed + log.info("Add dag (persist: {}, setStatus: {}): {}", persist, setStatus, dag); if (!isActive) { log.warn("Skipping add dag because this instance of DagManager is not active for dag: {}", dag); return; @@ -509,14 +511,19 @@ public void handleLaunchFlowEvent(DagActionStore.DagAction launchAction) { // Upon handling the action, delete it so on leadership change this is not duplicated this.dagActionStore.get().deleteDagAction(launchAction); } catch (URISyntaxException e) { - log.warn("Could not create URI object for flowId {} due to exception {}", flowId, e.getMessage()); + log.warn(String.format("Could not create URI object for flowId %s due to exception", flowId), e); + this.dagManagerMetrics.incrementFailedLaunchCount(); } catch (SpecNotFoundException e) { - log.warn("Spec not found for flowId {} due to exception {}", flowId, e.getMessage()); + log.warn(String.format("Spec not found for flowId %s due to exception", flowId), e); + this.dagManagerMetrics.incrementFailedLaunchCount(); } catch (IOException e) { - log.warn("Failed to add Job Execution Plan for flowId {} OR delete dag action from dagActionStore (check " - + "stacktrace) due to exception {}", flowId, e.getMessage()); + log.warn(String.format("Failed to add Job Execution Plan for flowId %s OR delete dag action from dagActionStore " + + "(check stacktrace) due to exception", flowId), e); + this.dagManagerMetrics.incrementFailedLaunchCount(); } catch (InterruptedException e) { - log.warn("SpecCompiler failed to reach healthy state before compilation of flowId {}. Exception: ", flowId, e); + log.warn(String.format("SpecCompiler failed to reach healthy state before compilation of flowId %s due to " + + "exception", flowId), e); + this.dagManagerMetrics.incrementFailedLaunchCount(); } } @@ -620,6 +627,8 @@ public void run() { } //Initialize dag. initialize(dag); + } else { + log.warn("Null dag despite non-empty queue; ignoring the dag"); } } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java index a5f34cff7f2..6d6c545b5b4 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java @@ -75,6 +75,9 @@ public class DagManagerMetrics { private final Map executorStartSlaExceededMeters = Maps.newConcurrentMap(); private final Map executorSlaExceededMeters = Maps.newConcurrentMap(); private final Map executorJobSentMeters = Maps.newConcurrentMap(); + + // Metrics for unexpected flow handling failures + private ContextAwareCounter failedLaunchEventsOnActivationCount; MetricContext metricContext; public DagManagerMetrics(MetricContext metricContext) { @@ -100,6 +103,9 @@ public void activate() { ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER)); allRunningMeter = metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.JOBS_SENT_TO_SPEC_EXECUTOR)); + failedLaunchEventsOnActivationCount = metricContext.contextAwareCounter( + MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, + ServiceMetricNames.DAG_MANAGER_FAILED_LAUNCH_EVENTS_ON_STARTUP_COUNT)); } } @@ -199,6 +205,13 @@ public void incrementCountsStartSlaExceeded(Dag.DagNode node) } } + // Increment the count for num of failed launches during leader activation + public void incrementFailedLaunchCount() { + if (this.metricContext != null) { + this.failedLaunchEventsOnActivationCount.inc(); + } + } + private List getRunningJobsCounterForUser(Dag.DagNode dagNode) { Config configs = dagNode.getValue().getJobSpec().getConfig(); String proxy = ConfigUtils.getString(configs, AzkabanProjectConfig.USER_TO_PROXY, null); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java index 6d9dcc9d6a9..8abaa209c25 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java @@ -117,13 +117,15 @@ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction flo this.leaseObtainedCount.inc(); if (persistFlowAction(leaseObtainedStatus)) { log.info("Successfully persisted lease: [{}, eventTimestamp: {}] ", leaseObtainedStatus.getFlowAction(), - leaseObtainedStatus.getEventTimestamp()); + leaseObtainedStatus.getEventTimeMillis()); return; } // If persisting the flow action failed, then we set another trigger for this event to occur immediately to // re-attempt handling the event + DagActionStore.DagAction updatedFlowAction = DagActionStore.DagAction.updateFlowExecutionId(flowAction, + leaseObtainedStatus.getEventTimeMillis()); scheduleReminderForEvent(jobProps, - new MultiActiveLeaseArbiter.LeasedToAnotherStatus(flowAction, leaseObtainedStatus.getEventTimestamp(), 0L), + new MultiActiveLeaseArbiter.LeasedToAnotherStatus(updatedFlowAction, 0L), eventTimeMillis); return; } else if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeasedToAnotherStatus) { diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java index 7afd8bba4c2..bcb1743200a 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java @@ -17,6 +17,7 @@ package org.apache.gobblin.service.modules.orchestration; +import com.codahale.metrics.Counter; import com.codahale.metrics.Meter; import com.codahale.metrics.Timer; import com.google.common.annotations.VisibleForTesting; @@ -93,6 +94,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { private Optional flowOrchestrationFailedMeter; @Getter private Optional flowOrchestrationTimer; + private Optional flowFailedForwardToDagManagerCounter; @Setter private FlowStatusGenerator flowStatusGenerator; @@ -137,12 +139,14 @@ public Orchestrator(Config config, Optional topologyCatalog, Op this.flowOrchestrationSuccessFulMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_SUCCESSFUL_METER)); this.flowOrchestrationFailedMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_FAILED_METER)); this.flowOrchestrationTimer = Optional.of(this.metricContext.timer(ServiceMetricNames.FLOW_ORCHESTRATION_TIMER)); + this.flowFailedForwardToDagManagerCounter = Optional.of(this.metricContext.counter(ServiceMetricNames.FLOW_FAILED_FORWARD_TO_DAG_MANAGER_COUNT)); this.eventSubmitter = Optional.of(new EventSubmitter.Builder(this.metricContext, "org.apache.gobblin.service").build()); } else { this.metricContext = null; this.flowOrchestrationSuccessFulMeter = Optional.absent(); this.flowOrchestrationFailedMeter = Optional.absent(); this.flowOrchestrationTimer = Optional.absent(); + this.flowFailedForwardToDagManagerCounter = Optional.absent(); this.eventSubmitter = Optional.absent(); } this.isFlowConcurrencyEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED, @@ -337,6 +341,7 @@ public void submitFlowToDagManager(FlowSpec flowSpec) throws IOException, Interr if (optionalJobExecutionPlanDag.isPresent()) { submitFlowToDagManager(flowSpec, optionalJobExecutionPlanDag.get()); } else { + _log.warn("Flow: {} submitted to dagManager failed to compile and produce a job execution plan dag", flowSpec); Instrumented.markMeter(this.flowOrchestrationFailedMeter); } } @@ -347,9 +352,13 @@ public void submitFlowToDagManager(FlowSpec flowSpec, Dag jobE //Send the dag to the DagManager. this.dagManager.get().addDag(jobExecutionPlanDag, true, true); } catch (Exception ex) { + String failureMessage = "Failed to add Job Execution Plan due to: " + ex.getMessage(); + _log.warn("Orchestrator call - " + failureMessage, ex); + if (this.flowFailedForwardToDagManagerCounter.isPresent()) { + this.flowFailedForwardToDagManagerCounter.get().inc(); + } if (this.eventSubmitter.isPresent()) { // pronounce failed before stack unwinds, to ensure flow not marooned in `COMPILED` state; (failure likely attributable to DB connection/failover) - String failureMessage = "Failed to add Job Execution Plan due to: " + ex.getMessage(); Map flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec); flowMetadata.put(TimingEvent.METADATA_MESSAGE, failureMessage); new TimingEvent(this.eventSubmitter.get(), TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java index 870b68f53dd..1435e076ae1 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java @@ -58,8 +58,10 @@ public class DagActionStoreChangeMonitor extends HighLevelConsumer { private ContextAwareMeter killsInvoked; private ContextAwareMeter resumesInvoked; private ContextAwareMeter flowsLaunched; + private ContextAwareMeter failedFlowLaunchSubmissions; private ContextAwareMeter unexpectedErrors; private ContextAwareMeter messageProcessedMeter; + private ContextAwareMeter messageFilteredOutMeter; private ContextAwareGauge produceToConsumeDelayMillis; // Reports delay from all partitions in one gauge private volatile Long produceToConsumeDelayValue = -1L; @@ -130,19 +132,23 @@ protected void processMessage(DecodeableKafkaRecord message) { String changeIdentifier = tid + key; if (!ChangeMonitorUtils.shouldProcessMessage(changeIdentifier, dagActionsSeenCache, operation, produceTimestamp.toString())) { + this.messageFilteredOutMeter.mark(); return; } + // Used to easily log information to identify the dag action + DagActionStore.DagAction dagAction = new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, + dagActionType); + // We only expect INSERT and DELETE operations done to this table. INSERTs correspond to any type of // {@link DagActionStore.FlowActionType} flow requests that have to be processed. DELETEs require no action. try { if (operation.equals("INSERT")) { + log.info("DagAction change ({}) received for flow: {}", dagActionType, dagAction); if (dagActionType.equals(DagActionStore.FlowActionType.RESUME)) { - log.info("Received insert dag action and about to send resume flow request"); dagManager.handleResumeFlowRequest(flowGroup, flowName,Long.parseLong(flowExecutionId)); this.resumesInvoked.mark(); } else if (dagActionType.equals(DagActionStore.FlowActionType.KILL)) { - log.info("Received insert dag action and about to send kill flow request"); dagManager.handleKillFlowRequest(flowGroup, flowName, Long.parseLong(flowExecutionId)); this.killsInvoked.mark(); } else if (dagActionType.equals(DagActionStore.FlowActionType.LAUNCH)) { @@ -150,10 +156,8 @@ protected void processMessage(DecodeableKafkaRecord message) { if (!this.isMultiActiveSchedulerEnabled) { this.unexpectedErrors.mark(); throw new RuntimeException(String.format("Received LAUNCH dagAction while not in multi-active scheduler " - + "mode for flowAction: %s", - new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, dagActionType))); + + "mode for flowAction: %s", dagAction)); } - log.info("Received insert dag action and about to forward launch request to DagManager"); submitFlowToDagManagerHelper(flowGroup, flowName); } else { log.warn("Received unsupported dagAction {}. Expected to be a KILL, RESUME, or LAUNCH", dagActionType); @@ -191,19 +195,19 @@ protected void submitFlowToDagManagerHelper(String flowGroup, String flowName) { this.orchestrator.submitFlowToDagManager(spec); } catch (URISyntaxException e) { log.warn("Could not create URI object for flowId {}. Exception {}", flowId, e.getMessage()); - this.unexpectedErrors.mark(); + this.failedFlowLaunchSubmissions.mark(); return; } catch (SpecNotFoundException e) { log.warn("Spec not found for flowId {} due to exception {}", flowId, e.getMessage()); - this.unexpectedErrors.mark(); + this.failedFlowLaunchSubmissions.mark(); return; } catch (IOException e) { log.warn("Failed to add Job Execution Plan for flowId {} due to exception {}", flowId, e.getMessage()); - this.unexpectedErrors.mark(); + this.failedFlowLaunchSubmissions.mark(); return; } catch (InterruptedException e) { log.warn("SpecCompiler failed to reach healthy state before compilation of flowId {}. Exception: ", flowId, e); - this.unexpectedErrors.mark(); + this.failedFlowLaunchSubmissions.mark(); return; } // Only mark this if the dag was successfully added @@ -216,8 +220,10 @@ protected void createMetrics() { this.killsInvoked = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED); this.resumesInvoked = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED); this.flowsLaunched = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED); + this.failedFlowLaunchSubmissions = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_FAILED_FLOW_LAUNCHED_SUBMISSIONS); this.unexpectedErrors = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS); this.messageProcessedMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED); + this.messageFilteredOutMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGES_FILTERED_OUT); this.produceToConsumeDelayMillis = this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS, () -> produceToConsumeDelayValue); this.getMetricContext().register(this.produceToConsumeDelayMillis); } diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandlerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandlerTest.java index d580704d2c1..67289267332 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandlerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandlerTest.java @@ -34,9 +34,9 @@ public class FlowTriggerHandlerTest { String cronExpressionSuffix = truncateFirstTwoFieldsOfCronExpression(cronExpression); int schedulerBackOffMillis = 10; DagActionStore.DagAction flowAction = new DagActionStore.DagAction("flowName", "flowGroup", - "999999", DagActionStore.FlowActionType.LAUNCH); + String.valueOf(eventToRevisit), DagActionStore.FlowActionType.LAUNCH); MultiActiveLeaseArbiter.LeasedToAnotherStatus leasedToAnotherStatus = - new MultiActiveLeaseArbiter.LeasedToAnotherStatus(flowAction, eventToRevisit, minimumLingerDurationMillis); + new MultiActiveLeaseArbiter.LeasedToAnotherStatus(flowAction, minimumLingerDurationMillis); /** * Remove first two fields from cron expression representing seconds and minutes to return truncated cron expression