Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-1930] Improve Multi-active related logs and metrics #3800

Merged
merged 7 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ class DagAction {
public FlowId getFlowId() {
return new FlowId().setFlowGroup(this.flowGroup).setFlowName(this.flowName);
}

/**
* Replace flow execution id with agreed upon event time to easily track the flow
*/
public static DagActionStore.DagAction updateFlowExecutionId(DagActionStore.DagAction flowAction,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this would better belong as a method of the DagAction class defined on line 39

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is included as a method of the class, it's static because it returns a new dagAction obj

long eventTimeMillis) {
return new DagActionStore.DagAction(flowAction.getFlowGroup(), flowAction.getFlowName(),
String.valueOf(eventTimeMillis), flowAction.getFlowActionType());
}

}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,28 +79,42 @@ abstract class LeaseAttemptStatus {}
class NoLongerLeasingStatus extends LeaseAttemptStatus {}

/*
The participant calling this method acquired the lease for the event in question. The class contains the
`eventTimestamp` associated with the lease as well as the time the caller obtained the lease or
`leaseAcquisitionTimestamp`.
The participant calling this method acquired the lease for the event in question. `Flow action`'s flow execution id
is the timestamp associated with the lease and the time the caller obtained the lease is stored within the
`leaseAcquisitionTimestamp` field.
*/
@Data
class LeaseObtainedStatus extends LeaseAttemptStatus {
private final DagActionStore.DagAction flowAction;
private final long eventTimestamp;
private final long leaseAcquisitionTimestamp;

/**
* Returns event time in millis since epoch for the event of this lease acquisition.
* @return
umustafi marked this conversation as resolved.
Show resolved Hide resolved
*/
public long getEventTimeMillis() {
return Long.parseLong(flowAction.getFlowExecutionId());
}
}

/*
This flow action event already has a valid lease owned by another participant.
`eventTimeMillis` is the timestamp the lease is associated with, which may be a different timestamp for the same flow
action corresponding to the same instance of the event or a distinct one.
`Flow action`'s flow execution id is the timestamp the lease is associated with, which may be a different timestamp
for the same flow action corresponding to the same instance of the event or a distinct one.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

somewhat confusing, "which may be a different timestamp for the same..." - reword?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tried to reword lmk what u think

`minimumLingerDurationMillis` is the minimum amount of time to wait before this participant should return to check if
the lease has completed or expired
*/
@Data
class LeasedToAnotherStatus extends LeaseAttemptStatus {
private final DagActionStore.DagAction flowAction;
private final long eventTimeMillis;
private final long minimumLingerDurationMillis;

/**
* Returns event time in millis since epoch for the event whose lease was obtained by another participant.
* @return
*/
public long getEventTimeMillis() {
return Long.parseLong(flowAction.getFlowExecutionId());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.util.ConfigUtils;

import static org.apache.gobblin.runtime.api.DagActionStore.DagAction.*;


/**
* MySQL based implementation of the {@link MultiActiveLeaseArbiter} which uses a MySQL store to resolve ownership of
Expand Down Expand Up @@ -323,14 +325,14 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l
log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 2: Same event, lease is valid",
updatedFlowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
// Utilize db timestamp for reminder
return new LeasedToAnotherStatus(updatedFlowAction, dbEventTimestamp.getTime(),
return new LeasedToAnotherStatus(updatedFlowAction,
dbLeaseAcquisitionTimestamp.getTime() + dbLinger - dbCurrentTimestamp.getTime());
}
DagActionStore.DagAction updatedFlowAction = updateFlowExecutionId(flowAction, dbCurrentTimestamp.getTime());
log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 3: Distinct event, lease is valid",
updatedFlowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
// Utilize db lease acquisition timestamp for wait time
return new LeasedToAnotherStatus(updatedFlowAction, dbCurrentTimestamp.getTime(),
return new LeasedToAnotherStatus(updatedFlowAction,
dbLeaseAcquisitionTimestamp.getTime() + dbLinger - dbCurrentTimestamp.getTime());
} // Lease is invalid
else if (leaseValidityStatus == 2) {
Expand Down Expand Up @@ -520,26 +522,16 @@ protected LeaseAttemptStatus evaluateStatusAfterLeaseAttempt(int numRowsUpdated,
if (numRowsUpdated == 1) {
log.info("Obtained lease for [{}, is: {}, eventTimestamp: {}] successfully!", updatedFlowAction,
isReminderEvent ? "reminder" : "original", selectInfoResult.eventTimeMillis);
return new LeaseObtainedStatus(updatedFlowAction, selectInfoResult.eventTimeMillis,
selectInfoResult.getLeaseAcquisitionTimeMillis().get());
return new LeaseObtainedStatus(updatedFlowAction, selectInfoResult.getLeaseAcquisitionTimeMillis().get());
}
log.info("Another participant acquired lease in between for [{}, is: {}, eventTimestamp: {}] - num rows updated: {}",
updatedFlowAction, isReminderEvent ? "reminder" : "original", selectInfoResult.eventTimeMillis, numRowsUpdated);
// Another participant acquired lease in between
return new LeasedToAnotherStatus(updatedFlowAction, selectInfoResult.getEventTimeMillis(),
return new LeasedToAnotherStatus(updatedFlowAction,
selectInfoResult.getLeaseAcquisitionTimeMillis().get() + selectInfoResult.getDbLinger()
- (dbCurrentTimestamp.isPresent() ? dbCurrentTimestamp.get().getTime() : System.currentTimeMillis()));
}

/**
* Replace flow execution id with agreed upon event time to easily track the flow
*/
protected static DagActionStore.DagAction updateFlowExecutionId(DagActionStore.DagAction flowAction,
long eventTimeMillis) {
return new DagActionStore.DagAction(flowAction.getFlowGroup(), flowAction.getFlowName(),
String.valueOf(eventTimeMillis), flowAction.getFlowActionType());
}

/**
* Complete the INSERT statement for a new flow action lease where the flow action is not present in the table
* @param statement
Expand Down Expand Up @@ -610,22 +602,22 @@ public boolean recordLeaseSuccess(LeaseObtainedStatus status)
updateStatement.setString(++i, flowGroup);
updateStatement.setString(++i, flowName);
updateStatement.setString(++i, flowActionType.toString());
updateStatement.setTimestamp(++i, new Timestamp(status.getEventTimestamp()), UTC_CAL.get());
updateStatement.setTimestamp(++i, new Timestamp(status.getEventTimeMillis()), UTC_CAL.get());
updateStatement.setTimestamp(++i, new Timestamp(status.getLeaseAcquisitionTimestamp()), UTC_CAL.get());
int numRowsUpdated = updateStatement.executeUpdate();
if (numRowsUpdated == 0) {
log.info("Multi-active lease arbiter lease attempt: [{}, eventTimestamp: {}] - FAILED to complete because "
+ "lease expired or event cleaned up before host completed required actions", flowAction,
status.getEventTimestamp());
status.getEventTimeMillis());
return false;
}
if( numRowsUpdated == 1) {
log.info("Multi-active lease arbiter lease attempt: [{}, eventTimestamp: {}] - COMPLETED, no longer leasing"
+ " this event after this.", flowAction, status.getEventTimestamp());
+ " this event after this.", flowAction, status.getEventTimeMillis());
return true;
};
throw new IOException(String.format("Attempt to complete lease use: [%s, eventTimestamp: %s] - updated more "
+ "rows than expected", flowAction, status.getEventTimestamp()));
+ "rows than expected", flowAction, status.getEventTimeMillis()));
}, true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,11 @@ public void testAcquireLeaseSingleParticipant() throws Exception {
Assert.assertTrue(firstLaunchStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus);
MultiActiveLeaseArbiter.LeaseObtainedStatus firstObtainedStatus =
(MultiActiveLeaseArbiter.LeaseObtainedStatus) firstLaunchStatus;
Assert.assertTrue(firstObtainedStatus.getEventTimestamp() <=
Assert.assertTrue(firstObtainedStatus.getEventTimeMillis() <=
firstObtainedStatus.getLeaseAcquisitionTimestamp());
Assert.assertTrue(firstObtainedStatus.getFlowAction().equals(
new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, DagActionStore.FlowActionType.LAUNCH)));
new DagActionStore.DagAction(flowGroup, flowName, String.valueOf(firstObtainedStatus.getEventTimeMillis()),
DagActionStore.FlowActionType.LAUNCH)));

// Verify that different DagAction types for the same flow can have leases at the same time
DagActionStore.DagAction killDagAction = new
Expand All @@ -102,7 +103,7 @@ public void testAcquireLeaseSingleParticipant() throws Exception {
MultiActiveLeaseArbiter.LeaseObtainedStatus killObtainedStatus =
(MultiActiveLeaseArbiter.LeaseObtainedStatus) killStatus;
Assert.assertTrue(
killObtainedStatus.getLeaseAcquisitionTimestamp() >= killObtainedStatus.getEventTimestamp());
killObtainedStatus.getLeaseAcquisitionTimestamp() >= killObtainedStatus.getEventTimeMillis());

// Tests CASE 2 of acquire lease for a flow action event that already has a valid lease for the same event in db
// Very little time should have passed if this test directly follows the one above so this call will be considered
Expand All @@ -112,7 +113,7 @@ public void testAcquireLeaseSingleParticipant() throws Exception {
Assert.assertTrue(secondLaunchStatus instanceof MultiActiveLeaseArbiter.LeasedToAnotherStatus);
MultiActiveLeaseArbiter.LeasedToAnotherStatus secondLeasedToAnotherStatus =
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) secondLaunchStatus;
Assert.assertEquals(firstObtainedStatus.getEventTimestamp(), secondLeasedToAnotherStatus.getEventTimeMillis());
Assert.assertEquals(firstObtainedStatus.getEventTimeMillis(), secondLeasedToAnotherStatus.getEventTimeMillis());
Assert.assertTrue(secondLeasedToAnotherStatus.getMinimumLingerDurationMillis() > 0);

// Tests CASE 3 of trying to acquire a lease for a distinct flow action event, while the previous event's lease is
Expand All @@ -124,7 +125,7 @@ public void testAcquireLeaseSingleParticipant() throws Exception {
Assert.assertTrue(thirdLaunchStatus instanceof MultiActiveLeaseArbiter.LeasedToAnotherStatus);
MultiActiveLeaseArbiter.LeasedToAnotherStatus thirdLeasedToAnotherStatus =
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) thirdLaunchStatus;
Assert.assertTrue(thirdLeasedToAnotherStatus.getEventTimeMillis() > firstObtainedStatus.getEventTimestamp());
Assert.assertTrue(thirdLeasedToAnotherStatus.getEventTimeMillis() > firstObtainedStatus.getEventTimeMillis());
Assert.assertTrue(thirdLeasedToAnotherStatus.getMinimumLingerDurationMillis() < LINGER);

// Tests CASE 4 of lease out of date
Expand All @@ -134,14 +135,14 @@ public void testAcquireLeaseSingleParticipant() throws Exception {
Assert.assertTrue(fourthLaunchStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus);
MultiActiveLeaseArbiter.LeaseObtainedStatus fourthObtainedStatus =
(MultiActiveLeaseArbiter.LeaseObtainedStatus) fourthLaunchStatus;
Assert.assertTrue(fourthObtainedStatus.getEventTimestamp() > eventTimeMillis + LINGER);
Assert.assertTrue(fourthObtainedStatus.getEventTimestamp()
Assert.assertTrue(fourthObtainedStatus.getEventTimeMillis() > eventTimeMillis + LINGER);
Assert.assertTrue(fourthObtainedStatus.getEventTimeMillis()
<= fourthObtainedStatus.getLeaseAcquisitionTimestamp());

// Tests CASE 5 of no longer leasing the same event in DB
// done immediately after previous lease obtainment so should be marked as the same event
Assert.assertTrue(mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(fourthObtainedStatus));
Assert.assertTrue(System.currentTimeMillis() - fourthObtainedStatus.getEventTimestamp() < EPSILON);
Assert.assertTrue(System.currentTimeMillis() - fourthObtainedStatus.getEventTimeMillis() < EPSILON);
MultiActiveLeaseArbiter.LeaseAttemptStatus fifthLaunchStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, eventTimeMillis, false);
Assert.assertTrue(fifthLaunchStatus instanceof MultiActiveLeaseArbiter.NoLongerLeasingStatus);
Expand All @@ -154,7 +155,7 @@ public void testAcquireLeaseSingleParticipant() throws Exception {
Assert.assertTrue(sixthLaunchStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus);
MultiActiveLeaseArbiter.LeaseObtainedStatus sixthObtainedStatus =
(MultiActiveLeaseArbiter.LeaseObtainedStatus) sixthLaunchStatus;
Assert.assertTrue(sixthObtainedStatus.getEventTimestamp()
Assert.assertTrue(sixthObtainedStatus.getEventTimeMillis()
<= sixthObtainedStatus.getLeaseAcquisitionTimestamp());
}

Expand Down Expand Up @@ -216,8 +217,10 @@ public void testConditionallyAcquireLeaseIfFinishedLeasingStatement()
// Mark the resume action lease from above as completed by fabricating a LeaseObtainedStatus
MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
DagActionStore.DagAction updatedResumeDagAction = DagActionStore.DagAction.updateFlowExecutionId(resumeDagAction,
selectInfoResult.getEventTimeMillis());
boolean markedSuccess = mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseObtainedStatus(
resumeDagAction, selectInfoResult.getEventTimeMillis(), selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
updatedResumeDagAction, selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
Assert.assertTrue(markedSuccess);
// Ensure no NPE results from calling this after a lease has been completed and acquisition timestamp val is NULL
mysqlMultiActiveLeaseArbiter.evaluateStatusAfterLeaseAttempt(1, resumeDagAction,
Expand Down Expand Up @@ -281,7 +284,7 @@ public void testReminderEventAcquireLeaseOnInvalidLease() throws IOException, In
mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction, selectInfoResult.getEventTimeMillis(), true);
Assert.assertTrue(attemptStatus instanceof LeaseObtainedStatus);
LeaseObtainedStatus obtainedStatus = (LeaseObtainedStatus) attemptStatus;
Assert.assertTrue(obtainedStatus.getEventTimestamp() > selectInfoResult.getEventTimeMillis());
Assert.assertTrue(obtainedStatus.getEventTimeMillis() > selectInfoResult.getEventTimeMillis());
Assert.assertTrue(obtainedStatus.getLeaseAcquisitionTimestamp() > selectInfoResult.getLeaseAcquisitionTimeMillis().get().longValue());
}

Expand All @@ -296,8 +299,10 @@ public void testReminderEventAcquireLeaseOnCompletedLease() throws IOException,
// Mark the resume action lease from above as completed by fabricating a LeaseObtainedStatus
MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
DagActionStore.DagAction updatedResumeDagAction = DagActionStore.DagAction.updateFlowExecutionId(resumeDagAction,
selectInfoResult.getEventTimeMillis());
boolean markedSuccess = mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseObtainedStatus(
resumeDagAction, selectInfoResult.getEventTimeMillis(), selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
updatedResumeDagAction, selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
Assert.assertTrue(markedSuccess);

// Sleep enough time for the event to have been considered distinct
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,15 @@ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction flo
this.leaseObtainedCount.inc();
if (persistFlowAction(leaseObtainedStatus)) {
log.info("Successfully persisted lease: [{}, eventTimestamp: {}] ", leaseObtainedStatus.getFlowAction(),
leaseObtainedStatus.getEventTimestamp());
leaseObtainedStatus.getEventTimeMillis());
return;
}
// If persisting the flow action failed, then we set another trigger for this event to occur immediately to
// re-attempt handling the event
DagActionStore.DagAction updatedFlowAction = DagActionStore.DagAction.updateFlowExecutionId(flowAction,
leaseObtainedStatus.getEventTimeMillis());
scheduleReminderForEvent(jobProps,
new MultiActiveLeaseArbiter.LeasedToAnotherStatus(flowAction, leaseObtainedStatus.getEventTimestamp(), 0L),
new MultiActiveLeaseArbiter.LeasedToAnotherStatus(updatedFlowAction, 0L),
Comment on lines +125 to +128
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need this if we are using the event timetime millisecond, which should be synchronized across all hosts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes we do, the timestamp within leaseObtainedStatus is the agreed upon time that is synchronized across all hosts while the following param, eventTimeMillis (later called) triggerEventTimeMillis is local to the host and is only used for logging purposes to show us that we switch from local trigger to the synchronized trigger. Here we are just changing where it's being stored not the fact that we do update it.

eventTimeMillis);
return;
} else if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ public class FlowTriggerHandlerTest {
String cronExpressionSuffix = truncateFirstTwoFieldsOfCronExpression(cronExpression);
int schedulerBackOffMillis = 10;
DagActionStore.DagAction flowAction = new DagActionStore.DagAction("flowName", "flowGroup",
"999999", DagActionStore.FlowActionType.LAUNCH);
String.valueOf(eventToRevisit), DagActionStore.FlowActionType.LAUNCH);
MultiActiveLeaseArbiter.LeasedToAnotherStatus leasedToAnotherStatus =
new MultiActiveLeaseArbiter.LeasedToAnotherStatus(flowAction, eventToRevisit, minimumLingerDurationMillis);
new MultiActiveLeaseArbiter.LeasedToAnotherStatus(flowAction, minimumLingerDurationMillis);

/**
* Remove first two fields from cron expression representing seconds and minutes to return truncated cron expression
Expand Down
Loading