From 4c4c2a14cfafc8af1feee278618de273a8f5cdb5 Mon Sep 17 00:00:00 2001 From: Urmi Mustafi Date: Tue, 10 Oct 2023 11:16:44 -0700 Subject: [PATCH 1/2] Fix Reminder Event Epsilon Comparison --- .../api/MysqlMultiActiveLeaseArbiter.java | 17 +++++++++++++---- .../api/MysqlMultiActiveLeaseArbiterTest.java | 13 +++++++++---- 2 files changed, 22 insertions(+), 8 deletions(-) diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java index 82950ef7fa8..0f69bbbd6a9 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java @@ -151,7 +151,7 @@ protected interface CheckedFunction { protected static final String GET_EVENT_INFO_STATEMENT_FOR_REMINDER = "SELECT " + "CONVERT_TZ(`event_timestamp`, @@session.time_zone, '+00:00') as utc_event_timestamp, " + "CONVERT_TZ(`lease_acquisition_timestamp`, @@session.time_zone, '+00:00') as utc_lease_acquisition_timestamp, " - + "TIMESTAMPDIFF(microsecond, event_timestamp, CURRENT_TIMESTAMP(3)) / 1000 <= epsilon as is_within_epsilon, CASE " + + "TIMESTAMPDIFF(microsecond, event_timestamp, CONVERT_TZ(?, '+00:00', @@session.time_zone)) / 1000 <= epsilon as is_within_epsilon, CASE " + "WHEN CURRENT_TIMESTAMP(3) < DATE_ADD(lease_acquisition_timestamp, INTERVAL linger*1000 MICROSECOND) then 1 " + "WHEN CURRENT_TIMESTAMP(3) >= DATE_ADD(lease_acquisition_timestamp, INTERVAL linger*1000 MICROSECOND) then 2 " + "ELSE 3 END as lease_validity_status, linger, " @@ -269,7 +269,7 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l log.info("Multi-active scheduler about to handle trigger event: [{}, is: {}, triggerEventTimestamp: {}]", flowAction, isReminderEvent ? 
"reminder" : "original", eventTimeMillis); // Query lease arbiter table about this flow action - Optional getResult = getExistingEventInfo(flowAction, isReminderEvent); + Optional getResult = getExistingEventInfo(flowAction, isReminderEvent, eventTimeMillis); // TODO: change all the `CASE N: ...` statements back to debug statements after uncovering issue try { @@ -304,6 +304,12 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l + "ensure monotonically increasing laundered event times.", flowAction, isReminderEvent ? "reminder" : "original", eventTimeMillis, dbEventTimestamp.getTime()); } + if (eventTimeMillis == dbEventTimestamp.getTime()) { + // TODO: change this to a debug after fixing issue + log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - dbEventTimeMillis: {} - Reminder event time" + + "is the same as db event.", flowAction, isReminderEvent ? "reminder" : "original", + eventTimeMillis, dbEventTimestamp); + } } log.info("Multi-active arbiter replacing local trigger event timestamp [{}, is: {}, triggerEventTimestamp: {}] " @@ -359,10 +365,13 @@ else if (leaseValidityStatus == 2) { * Checks leaseArbiterTable for an existing entry for this flow action and event time */ protected Optional getExistingEventInfo(DagActionStore.DagAction flowAction, - boolean isReminderEvent) throws IOException { + boolean isReminderEvent, long eventTimeMillis) throws IOException { return withPreparedStatement(isReminderEvent ? 
thisTableGetInfoStatementForReminder : thisTableGetInfoStatement, getInfoStatement -> { int i = 0; + if (isReminderEvent) { + getInfoStatement.setTimestamp(++i, new Timestamp(eventTimeMillis), UTC_CAL.get()); + } getInfoStatement.setString(++i, flowAction.getFlowGroup()); getInfoStatement.setString(++i, flowAction.getFlowName()); getInfoStatement.setString(++i, flowAction.getFlowActionType().toString()); @@ -511,7 +520,7 @@ protected LeaseAttemptStatus evaluateStatusAfterLeaseAttempt(int numRowsUpdated, return new LeaseObtainedStatus(flowAction, selectInfoResult.eventTimeMillis, selectInfoResult.getLeaseAcquisitionTimeMillis().get()); } - log.info("Another participant acquired lease in between for [{}, is: {}, eventTimestamp: {}] - num rows updated: ", + log.info("Another participant acquired lease in between for [{}, is: {}, eventTimestamp: {}] - num rows updated: {}", flowAction, isReminderEvent ? "reminder" : "original", selectInfoResult.eventTimeMillis, numRowsUpdated); // Another participant acquired lease in between return new LeasedToAnotherStatus(flowAction, selectInfoResult.getEventTimeMillis(), diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java index 8f1fdf30c40..15090e8f14b 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java @@ -36,6 +36,7 @@ @Slf4j public class MysqlMultiActiveLeaseArbiterTest { private static final int EPSILON = 10000; + private static final int MORE_THAN_EPSILON = (int) (EPSILON * 1.1); private static final int LINGER = 50000; private static final String USER = "testUser"; private static final String PASSWORD = "testPassword"; @@ -117,7 +118,7 @@ public void testAcquireLeaseSingleParticipant() throws Exception { 
// Tests CASE 3 of trying to acquire a lease for a distinct flow action event, while the previous event's lease is // valid // Allow enough time to pass for this trigger to be considered distinct, but not enough time so the lease expires - Thread.sleep(EPSILON * 3/2); + Thread.sleep(MORE_THAN_EPSILON); MultiActiveLeaseArbiter.LeaseAttemptStatus thirdLaunchStatus = mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, eventTimeMillis, false); Assert.assertTrue(thirdLaunchStatus instanceof MultiActiveLeaseArbiter.LeasedToAnotherStatus); @@ -147,7 +148,7 @@ public void testAcquireLeaseSingleParticipant() throws Exception { // Tests CASE 6 of no longer leasing a distinct event in DB // Wait so this event is considered distinct and a new lease will be acquired - Thread.sleep(EPSILON * 3/2); + Thread.sleep(MORE_THAN_EPSILON); MultiActiveLeaseArbiter.LeaseAttemptStatus sixthLaunchStatus = mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, eventTimeMillis, false); Assert.assertTrue(sixthLaunchStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus); @@ -286,10 +287,12 @@ public void testReminderEventAcquireLeaseOnInvalidLease() throws IOException, In /* Tests calling `tryAcquireLease` for a reminder event whose lease has completed in the database and should return - `NoLongerLeasing` status + `NoLongerLeasing` status. 
+ Note that we wait long enough for the event to have been considered distinct in the non-reminder case + to ensure that the comparison made for reminder events is against the preserved event time, not the current time in the db */ @Test (dependsOnMethods = "testReminderEventAcquireLeaseOnInvalidLease") - public void testReminderEventAcquireLeaseOnCompletedLease() throws IOException { + public void testReminderEventAcquireLeaseOnCompletedLease() throws IOException, InterruptedException { // Mark the resume action lease from above as completed by fabricating a LeaseObtainedStatus MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult = mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction); @@ -297,6 +300,8 @@ public void testReminderEventAcquireLeaseOnCompletedLease() throws IOException { resumeDagAction, selectInfoResult.getEventTimeMillis(), selectInfoResult.getLeaseAcquisitionTimeMillis().get())); Assert.assertTrue(markedSuccess); + // Sleep enough time for the event to have been considered distinct + Thread.sleep(MORE_THAN_EPSILON); // Now have a reminder event check-in on the completed lease LeaseAttemptStatus attemptStatus = mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction, selectInfoResult.getEventTimeMillis(), true); From af663be14d52774c602e39f97316673016b9e6bc Mon Sep 17 00:00:00 2001 From: Urmi Mustafi Date: Tue, 10 Oct 2023 13:05:59 -0700 Subject: [PATCH 2/2] Add TODO comment --- .../apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java | 1 + 1 file changed, 1 insertion(+) diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java index 0f69bbbd6a9..4c2e8d2da2b 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java @@ 
-299,6 +299,7 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l return new NoLongerLeasingStatus(); } if (eventTimeMillis > dbEventTimestamp.getTime()) { + // TODO: emit metric here to capture this unexpected behavior log.warn("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - dbEventTimeMillis: {} - Severe constraint " + "violation encountered: a reminder event newer than db event was found when db laundering should " + "ensure monotonically increasing laundered event times.", flowAction,