Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-1926] Fix Reminder Event Epsilon Comparison #3797

Merged
merged 2 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ protected interface CheckedFunction<T, R> {
protected static final String GET_EVENT_INFO_STATEMENT_FOR_REMINDER = "SELECT "
+ "CONVERT_TZ(`event_timestamp`, @@session.time_zone, '+00:00') as utc_event_timestamp, "
+ "CONVERT_TZ(`lease_acquisition_timestamp`, @@session.time_zone, '+00:00') as utc_lease_acquisition_timestamp, "
+ "TIMESTAMPDIFF(microsecond, event_timestamp, CURRENT_TIMESTAMP(3)) / 1000 <= epsilon as is_within_epsilon, CASE "
+ "TIMESTAMPDIFF(microsecond, event_timestamp, CONVERT_TZ(?, '+00:00', @@session.time_zone)) / 1000 <= epsilon as is_within_epsilon, CASE "
+ "WHEN CURRENT_TIMESTAMP(3) < DATE_ADD(lease_acquisition_timestamp, INTERVAL linger*1000 MICROSECOND) then 1 "
+ "WHEN CURRENT_TIMESTAMP(3) >= DATE_ADD(lease_acquisition_timestamp, INTERVAL linger*1000 MICROSECOND) then 2 "
+ "ELSE 3 END as lease_validity_status, linger, "
Expand Down Expand Up @@ -269,7 +269,7 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l
log.info("Multi-active scheduler about to handle trigger event: [{}, is: {}, triggerEventTimestamp: {}]",
flowAction, isReminderEvent ? "reminder" : "original", eventTimeMillis);
// Query lease arbiter table about this flow action
Optional<GetEventInfoResult> getResult = getExistingEventInfo(flowAction, isReminderEvent);
Optional<GetEventInfoResult> getResult = getExistingEventInfo(flowAction, isReminderEvent, eventTimeMillis);

// TODO: change all the `CASE N: ...` statements back to debug statements after uncovering issue
try {
Expand Down Expand Up @@ -299,11 +299,18 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l
return new NoLongerLeasingStatus();
}
if (eventTimeMillis > dbEventTimestamp.getTime()) {
// TODO: emit metric here to capture this unexpected behavior
log.warn("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - dbEventTimeMillis: {} - Severe constraint "
+ "violation encountered: a reminder event newer than db event was found when db laundering should "
+ "ensure monotonically increasing laundered event times.", flowAction,
isReminderEvent ? "reminder" : "original", eventTimeMillis, dbEventTimestamp.getTime());
}
if (eventTimeMillis == dbEventTimestamp.getTime()) {
// TODO: change this to a debug after fixing issue
log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - dbEventTimeMillis: {} - Reminder event time"
+ "is the same as db event.", flowAction, isReminderEvent ? "reminder" : "original",
eventTimeMillis, dbEventTimestamp);
}
}

log.info("Multi-active arbiter replacing local trigger event timestamp [{}, is: {}, triggerEventTimestamp: {}] "
Expand Down Expand Up @@ -359,10 +366,13 @@ else if (leaseValidityStatus == 2) {
* Checks leaseArbiterTable for an existing entry for this flow action and event time
*/
protected Optional<GetEventInfoResult> getExistingEventInfo(DagActionStore.DagAction flowAction,
boolean isReminderEvent) throws IOException {
boolean isReminderEvent, long eventTimeMillis) throws IOException {
return withPreparedStatement(isReminderEvent ? thisTableGetInfoStatementForReminder : thisTableGetInfoStatement,
getInfoStatement -> {
int i = 0;
if (isReminderEvent) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not get the timestamp regardless if it's a reminder event or not? Seems useful

Copy link
Contributor Author

@umustafi umustafi Oct 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is setting the timestamp in SELECT query for comparison to the db event timestamp only for reminder event. Otherwise for original events we do db laundering and use current timestamp in db.

getInfoStatement.setTimestamp(++i, new Timestamp(eventTimeMillis), UTC_CAL.get());
}
getInfoStatement.setString(++i, flowAction.getFlowGroup());
getInfoStatement.setString(++i, flowAction.getFlowName());
getInfoStatement.setString(++i, flowAction.getFlowActionType().toString());
Expand Down Expand Up @@ -511,7 +521,7 @@ protected LeaseAttemptStatus evaluateStatusAfterLeaseAttempt(int numRowsUpdated,
return new LeaseObtainedStatus(flowAction, selectInfoResult.eventTimeMillis,
selectInfoResult.getLeaseAcquisitionTimeMillis().get());
}
log.info("Another participant acquired lease in between for [{}, is: {}, eventTimestamp: {}] - num rows updated: ",
log.info("Another participant acquired lease in between for [{}, is: {}, eventTimestamp: {}] - num rows updated: {}",
flowAction, isReminderEvent ? "reminder" : "original", selectInfoResult.eventTimeMillis, numRowsUpdated);
// Another participant acquired lease in between
return new LeasedToAnotherStatus(flowAction, selectInfoResult.getEventTimeMillis(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
@Slf4j
public class MysqlMultiActiveLeaseArbiterTest {
private static final int EPSILON = 10000;
private static final int MORE_THAN_EPSILON = (int) (EPSILON * 1.1);
private static final int LINGER = 50000;
private static final String USER = "testUser";
private static final String PASSWORD = "testPassword";
Expand Down Expand Up @@ -117,7 +118,7 @@ public void testAcquireLeaseSingleParticipant() throws Exception {
// Tests CASE 3 of trying to acquire a lease for a distinct flow action event, while the previous event's lease is
// valid
// Allow enough time to pass for this trigger to be considered distinct, but not enough time so the lease expires
Thread.sleep(EPSILON * 3/2);
Thread.sleep(MORE_THAN_EPSILON);
MultiActiveLeaseArbiter.LeaseAttemptStatus thirdLaunchStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, eventTimeMillis, false);
Assert.assertTrue(thirdLaunchStatus instanceof MultiActiveLeaseArbiter.LeasedToAnotherStatus);
Expand Down Expand Up @@ -147,7 +148,7 @@ public void testAcquireLeaseSingleParticipant() throws Exception {

// Tests CASE 6 of no longer leasing a distinct event in DB
// Wait so this event is considered distinct and a new lease will be acquired
Thread.sleep(EPSILON * 3/2);
Thread.sleep(MORE_THAN_EPSILON);
MultiActiveLeaseArbiter.LeaseAttemptStatus sixthLaunchStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, eventTimeMillis, false);
Assert.assertTrue(sixthLaunchStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus);
Expand Down Expand Up @@ -286,17 +287,21 @@ public void testReminderEventAcquireLeaseOnInvalidLease() throws IOException, In

/*
Tests calling `tryAcquireLease` for a reminder event whose lease has completed in the database and should return
`NoLongerLeasing` status
`NoLongerLeasing` status.
Note: that we wait for enough time to pass that the event would have been considered distinct for a non-reminder case
to ensure that the comparison made for reminder events is against the preserved event time not current time in db
*/
@Test (dependsOnMethods = "testReminderEventAcquireLeaseOnInvalidLease")
public void testReminderEventAcquireLeaseOnCompletedLease() throws IOException {
public void testReminderEventAcquireLeaseOnCompletedLease() throws IOException, InterruptedException {
// Mark the resume action lease from above as completed by fabricating a LeaseObtainedStatus
MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
boolean markedSuccess = mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseObtainedStatus(
resumeDagAction, selectInfoResult.getEventTimeMillis(), selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
Assert.assertTrue(markedSuccess);

// Sleep enough time for the event to have been considered distinct
Thread.sleep(MORE_THAN_EPSILON);
// Now have a reminder event check-in on the completed lease
LeaseAttemptStatus attemptStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction, selectInfoResult.getEventTimeMillis(), true);
Expand Down
Loading