From e2c0fd8149a760527a74dd984a31c58e018e2df7 Mon Sep 17 00:00:00 2001 From: Urmi Mustafi Date: Tue, 17 Oct 2023 17:37:20 -0700 Subject: [PATCH 1/4] Refactor dag action updating method & add clarifying comment --- .../apache/gobblin/runtime/api/DagActionStore.java | 7 +++---- .../runtime/api/MysqlMultiActiveLeaseArbiter.java | 6 +++--- .../api/MysqlMultiActiveLeaseArbiterTest.java | 4 ++-- .../modules/orchestration/FlowTriggerHandler.java | 12 +++++++----- 4 files changed, 15 insertions(+), 14 deletions(-) diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java index eb26acd161a..4f3442597fb 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java @@ -49,10 +49,9 @@ public FlowId getFlowId() { /** * Replace flow execution id with agreed upon event time to easily track the flow */ - public static DagActionStore.DagAction updateFlowExecutionId(DagActionStore.DagAction flowAction, - long eventTimeMillis) { - return new DagActionStore.DagAction(flowAction.getFlowGroup(), flowAction.getFlowName(), - String.valueOf(eventTimeMillis), flowAction.getFlowActionType()); + public DagAction updateFlowExecutionId(long eventTimeMillis) { + return new DagAction(this.getFlowGroup(), this.getFlowName(), + String.valueOf(eventTimeMillis), this.getFlowActionType()); } } diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java index c6161d93695..338e908a2e0 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java @@ -321,14 +321,14 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l // Lease is valid if (leaseValidityStatus == 1) { if (isWithinEpsilon) { - DagActionStore.DagAction updatedFlowAction = updateFlowExecutionId(flowAction, dbEventTimestamp.getTime()); + DagActionStore.DagAction updatedFlowAction = flowAction.updateFlowExecutionId(dbEventTimestamp.getTime()); log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 2: Same event, lease is valid", updatedFlowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime()); // Utilize db timestamp for reminder return new LeasedToAnotherStatus(updatedFlowAction, dbLeaseAcquisitionTimestamp.getTime() + dbLinger - dbCurrentTimestamp.getTime()); } - DagActionStore.DagAction updatedFlowAction = updateFlowExecutionId(flowAction, dbCurrentTimestamp.getTime()); + DagActionStore.DagAction updatedFlowAction = flowAction.updateFlowExecutionId(dbCurrentTimestamp.getTime()); log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 3: Distinct event, lease is valid", updatedFlowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime()); // Utilize db lease acquisition timestamp for wait time @@ -518,7 +518,7 @@ protected LeaseAttemptStatus evaluateStatusAfterLeaseAttempt(int numRowsUpdated, if (!selectInfoResult.getLeaseAcquisitionTimeMillis().isPresent()) { return new NoLongerLeasingStatus(); } - DagActionStore.DagAction updatedFlowAction = updateFlowExecutionId(flowAction, selectInfoResult.eventTimeMillis); + DagActionStore.DagAction updatedFlowAction = flowAction.updateFlowExecutionId(selectInfoResult.eventTimeMillis); if (numRowsUpdated == 1) { log.info("Obtained lease for [{}, is: {}, eventTimestamp: {}] successfully!", updatedFlowAction, isReminderEvent ? "reminder" : "original", selectInfoResult.eventTimeMillis); diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java index 7bafc78ff34..08630ab3661 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java @@ -217,7 +217,7 @@ public void testConditionallyAcquireLeaseIfFinishedLeasingStatement() // Mark the resume action lease from above as completed by fabricating a LeaseObtainedStatus MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult = mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction); - DagActionStore.DagAction updatedResumeDagAction = DagActionStore.DagAction.updateFlowExecutionId(resumeDagAction, + DagActionStore.DagAction updatedResumeDagAction = resumeDagAction.updateFlowExecutionId( selectInfoResult.getEventTimeMillis()); boolean markedSuccess = mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseObtainedStatus( updatedResumeDagAction, selectInfoResult.getLeaseAcquisitionTimeMillis().get())); @@ -299,7 +299,7 @@ public void testReminderEventAcquireLeaseOnCompletedLease() throws IOException, // Mark the resume action lease from above as completed by fabricating a LeaseObtainedStatus MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult = mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction); - DagActionStore.DagAction updatedResumeDagAction = DagActionStore.DagAction.updateFlowExecutionId(resumeDagAction, + DagActionStore.DagAction updatedResumeDagAction = resumeDagAction.updateFlowExecutionId( selectInfoResult.getEventTimeMillis()); boolean markedSuccess = mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseObtainedStatus( updatedResumeDagAction, selectInfoResult.getLeaseAcquisitionTimeMillis().get())); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java index 8abaa209c25..c5a5bb8e0ee 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java @@ -112,8 +112,12 @@ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction flo if (multiActiveLeaseArbiter.isPresent()) { MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = multiActiveLeaseArbiter.get().tryAcquireLease( flowAction, eventTimeMillis, isReminderEvent); + // The flow action contained in the`LeaseAttemptStatus` from the lease arbiter contains an updated flow execution + // id. From this point onwards, always use the newer version of the flow action to easily track the action through + // orchestration and execution. if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) { - MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus = (MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus; + MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus = (MultiActiveLeaseArbiter.LeaseObtainedStatus) + leaseAttemptStatus; this.leaseObtainedCount.inc(); if (persistFlowAction(leaseObtainedStatus)) { log.info("Successfully persisted lease: [{}, eventTimestamp: {}] ", leaseObtainedStatus.getFlowAction(), @@ -122,11 +126,9 @@ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction flo } // If persisting the flow action failed, then we set another trigger for this event to occur immediately to // re-attempt handling the event - DagActionStore.DagAction updatedFlowAction = DagActionStore.DagAction.updateFlowExecutionId(flowAction, - leaseObtainedStatus.getEventTimeMillis()); scheduleReminderForEvent(jobProps, - new MultiActiveLeaseArbiter.LeasedToAnotherStatus(updatedFlowAction, 0L), - eventTimeMillis); + new MultiActiveLeaseArbiter.LeasedToAnotherStatus(leaseObtainedStatus.getFlowAction(), + 0L), eventTimeMillis); return; } else if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeasedToAnotherStatus) { this.leasedToAnotherStatusCount.inc(); From 531bf09424ad9945d36ee082958662aab1db42da Mon Sep 17 00:00:00 2001 From: Urmi Mustafi Date: Thu, 19 Oct 2023 12:04:15 -0700 Subject: [PATCH 2/4] Log filtered out duplicate messages --- .../apache/gobblin/service/monitoring/ChangeMonitorUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java index 33934ef0644..a2d68fbc0d1 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java @@ -35,7 +35,7 @@ public static boolean shouldProcessMessage(String changeIdentifier, LoadingCache String operation, String timestamp) { // If we've already processed a message with this timestamp and key before then skip duplicate message if (cache.getIfPresent(changeIdentifier) != null) { - log.debug("Duplicate change event with identifier {}", changeIdentifier); + log.info("Duplicate change event with identifier {}", changeIdentifier); return false; } From d2a9b9b04391830a9f3979ce75956e0b1abc4d5c Mon Sep 17 00:00:00 2001 From: Urmi Mustafi Date: Fri, 20 Oct 2023 12:49:59 -0700 Subject: [PATCH 3/4] logs and metrics for missing messages from change monitor --- .../org/apache/gobblin/runtime/kafka/HighLevelConsumer.java | 2 ++ .../java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java index 5e8daaa26a3..d09accff295 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java @@ -329,6 +329,8 @@ public void run() { } } } catch (InterruptedException e) { + log.warn("Encountered exception while processing queue ", e); + // TODO: evaluate whether we should interrupt the thread or continue processing Thread.currentThread().interrupt(); } } diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java index 8fc1258ab50..32b6eb9ad7e 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java @@ -26,7 +26,7 @@ public class RuntimeMetrics { // Metric names - public static final String GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_MESSAGES_READ = + public static final String GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_MESSAGES_READ = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.kafka.highLevelConsumer.messagesRead"; public static final String GOBBLIN_JOB_MONITOR_KAFKA_TOTAL_SPECS = "gobblin.jobMonitor.kafka.totalSpecs"; public static final String GOBBLIN_JOB_MONITOR_KAFKA_NEW_SPECS = "gobblin.jobMonitor.kafka.newSpecs"; From ec674622f65da13f0d2547be2a8ad2245fe6d6ee Mon Sep 17 00:00:00 2001 From: Urmi Mustafi Date: Fri, 20 Oct 2023 12:56:12 -0700 Subject: [PATCH 4/4] Only add gobblin.service prefix for dagActionStoreChangeMonitor --- .../org/apache/gobblin/runtime/kafka/HighLevelConsumer.java | 2 +- .../org/apache/gobblin/runtime/metrics/RuntimeMetrics.java | 2 +- .../service/monitoring/DagActionStoreChangeMonitor.java | 3 ++- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java index d09accff295..7b494201eca 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java @@ -95,7 +95,7 @@ public abstract class HighLevelConsumer extends AbstractIdleService { */ @Getter private MetricContext metricContext; - private Counter messagesRead; + protected Counter messagesRead; @Getter private final GobblinKafkaConsumerClient gobblinKafkaConsumerClient; private final ScheduledExecutorService consumerExecutor; diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java index 32b6eb9ad7e..8fc1258ab50 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java @@ -26,7 +26,7 @@ public class RuntimeMetrics { // Metric names - public static final String GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_MESSAGES_READ = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + + public static final String GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_MESSAGES_READ = "gobblin.kafka.highLevelConsumer.messagesRead"; public static final String GOBBLIN_JOB_MONITOR_KAFKA_TOTAL_SPECS = "gobblin.jobMonitor.kafka.totalSpecs"; public static final String GOBBLIN_JOB_MONITOR_KAFKA_NEW_SPECS = "gobblin.jobMonitor.kafka.newSpecs"; diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java index 1435e076ae1..e5a2d090d3a 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java @@ -34,6 +34,7 @@ import org.apache.gobblin.kafka.client.DecodeableKafkaRecord; import org.apache.gobblin.metrics.ContextAwareGauge; import org.apache.gobblin.metrics.ContextAwareMeter; +import org.apache.gobblin.metrics.ServiceMetricNames; import org.apache.gobblin.runtime.api.DagActionStore; import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.gobblin.runtime.api.SpecNotFoundException; @@ -216,7 +217,7 @@ protected void submitFlowToDagManagerHelper(String flowGroup, String flowName) { @Override protected void createMetrics() { - super.createMetrics(); + super.messagesRead = this.getMetricContext().counter(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + RuntimeMetrics.GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_MESSAGES_READ); this.killsInvoked = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED); this.resumesInvoked = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED); this.flowsLaunched = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED);