Skip to content

Commit

Permalink
[GOBBLIN-1930] Improve Multi-active related logs and metrics (#3800)
Browse files Browse the repository at this point in the history
* Improve Multi-active related logs and metrics

* Add more metrics and logs around forwarding dag action to DagManager

* Improve logs in response to review comments

* Replace flow execution id with trigger timestamp from multi-active

* Update flow action execution id within lease arbiter

* Fix test & make Lease Statuses more lean

* Update javadoc

---------

Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>
  • Loading branch information
umustafi and Urmi Mustafi authored Oct 17, 2023
1 parent 9b254b6 commit eeb5142
Show file tree
Hide file tree
Showing 12 changed files with 139 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ public class ServiceMetricNames {
public static final String FLOW_TRIGGER_HANDLER_JOB_DOES_NOT_EXIST_COUNT = GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".jobDoesNotExistInScheduler";
public static final String FLOW_TRIGGER_HANDLER_FAILED_TO_SET_REMINDER_COUNT = GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".failedToSetReminderCount";

// DagManager Related Metrics
public static final String DAG_MANAGER_PREFIX = GOBBLIN_SERVICE_PREFIX + ".dagManager";
public static final String
DAG_MANAGER_FAILED_LAUNCH_EVENTS_ON_STARTUP_COUNT = DAG_MANAGER_PREFIX + ".failedLaunchEventsOnStartupCount";
public static final String FLOW_FAILED_FORWARD_TO_DAG_MANAGER_COUNT = DAG_MANAGER_PREFIX + ".flowFailedForwardToDagManagerCount";

//Job status poll timer
public static final String JOB_STATUS_POLLED_TIMER = GOBBLIN_SERVICE_PREFIX + ".jobStatusPoll.time";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ class DagAction {
public FlowId getFlowId() {
return new FlowId().setFlowGroup(this.flowGroup).setFlowName(this.flowName);
}

/**
* Replace flow execution id with agreed upon event time to easily track the flow
*/
public static DagActionStore.DagAction updateFlowExecutionId(DagActionStore.DagAction flowAction,
long eventTimeMillis) {
return new DagActionStore.DagAction(flowAction.getFlowGroup(), flowAction.getFlowName(),
String.valueOf(eventTimeMillis), flowAction.getFlowActionType());
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,28 +79,41 @@ abstract class LeaseAttemptStatus {}
class NoLongerLeasingStatus extends LeaseAttemptStatus {}

/*
The participant calling this method acquired the lease for the event in question. The class contains the
`eventTimestamp` associated with the lease as well as the time the caller obtained the lease or
`leaseAcquisitionTimestamp`.
The participant calling this method acquired the lease for the event in question. `Flow action`'s flow execution id
is the timestamp associated with the lease and the time the caller obtained the lease is stored within the
`leaseAcquisitionTimestamp` field.
*/
@Data
class LeaseObtainedStatus extends LeaseAttemptStatus {
private final DagActionStore.DagAction flowAction;
private final long eventTimestamp;
private final long leaseAcquisitionTimestamp;

/**
* @return event time in millis since epoch for the event of this lease acquisition
*/
public long getEventTimeMillis() {
return Long.parseLong(flowAction.getFlowExecutionId());
}
}

/*
This flow action event already has a valid lease owned by another participant.
`eventTimeMillis` is the timestamp the lease is associated with, which may be a different timestamp for the same flow
action corresponding to the same instance of the event or a distinct one.
`Flow action`'s flow execution id is the timestamp the lease is associated with, however the flow action event it
corresponds to may be a different and distinct occurrence of the same event.
`minimumLingerDurationMillis` is the minimum amount of time to wait before this participant should return to check if
the lease has completed or expired
*/
@Data
class LeasedToAnotherStatus extends LeaseAttemptStatus {
private final DagActionStore.DagAction flowAction;
private final long eventTimeMillis;
private final long minimumLingerDurationMillis;

/**
* Returns event time in millis since epoch for the event whose lease was obtained by another participant.
* @return
*/
public long getEventTimeMillis() {
return Long.parseLong(flowAction.getFlowExecutionId());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.util.ConfigUtils;

import static org.apache.gobblin.runtime.api.DagActionStore.DagAction.*;


/**
* MySQL based implementation of the {@link MultiActiveLeaseArbiter} which uses a MySQL store to resolve ownership of
Expand Down Expand Up @@ -242,7 +244,6 @@ private void runRetentionOnArbitrationTable() {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
Runnable retentionTask = () -> {
try {
Thread.sleep(10000);
withPreparedStatement(thisTableRetentionStatement,
retentionStatement -> {
retentionStatement.setLong(1, retentionPeriodMillis);
Expand All @@ -253,7 +254,7 @@ private void runRetentionOnArbitrationTable() {
}
return numRowsDeleted;
}, true);
} catch (InterruptedException | IOException e) {
} catch (IOException e) {
log.error("Failing to run retention on lease arbiter table. Unbounded growth can lead to database slowness and "
+ "affect our system performance. Examine exception: ", e);
}
Expand Down Expand Up @@ -307,7 +308,7 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l
}
if (eventTimeMillis == dbEventTimestamp.getTime()) {
// TODO: change this to a debug after fixing issue
log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - dbEventTimeMillis: {} - Reminder event time"
log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - dbEventTimeMillis: {} - Reminder event time "
+ "is the same as db event.", flowAction, isReminderEvent ? "reminder" : "original",
eventTimeMillis, dbEventTimestamp);
}
Expand All @@ -320,16 +321,18 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l
// Lease is valid
if (leaseValidityStatus == 1) {
if (isWithinEpsilon) {
DagActionStore.DagAction updatedFlowAction = updateFlowExecutionId(flowAction, dbEventTimestamp.getTime());
log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 2: Same event, lease is valid",
flowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
updatedFlowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
// Utilize db timestamp for reminder
return new LeasedToAnotherStatus(flowAction, dbEventTimestamp.getTime(),
return new LeasedToAnotherStatus(updatedFlowAction,
dbLeaseAcquisitionTimestamp.getTime() + dbLinger - dbCurrentTimestamp.getTime());
}
DagActionStore.DagAction updatedFlowAction = updateFlowExecutionId(flowAction, dbCurrentTimestamp.getTime());
log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 3: Distinct event, lease is valid",
flowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
updatedFlowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
// Utilize db lease acquisition timestamp for wait time
return new LeasedToAnotherStatus(flowAction, dbCurrentTimestamp.getTime(),
return new LeasedToAnotherStatus(updatedFlowAction,
dbLeaseAcquisitionTimestamp.getTime() + dbLinger - dbCurrentTimestamp.getTime());
} // Lease is invalid
else if (leaseValidityStatus == 2) {
Expand Down Expand Up @@ -515,16 +518,16 @@ protected LeaseAttemptStatus evaluateStatusAfterLeaseAttempt(int numRowsUpdated,
if (!selectInfoResult.getLeaseAcquisitionTimeMillis().isPresent()) {
return new NoLongerLeasingStatus();
}
DagActionStore.DagAction updatedFlowAction = updateFlowExecutionId(flowAction, selectInfoResult.eventTimeMillis);
if (numRowsUpdated == 1) {
log.info("Obtained lease for [{}, is: {}, eventTimestamp: {}] successfully!", flowAction,
log.info("Obtained lease for [{}, is: {}, eventTimestamp: {}] successfully!", updatedFlowAction,
isReminderEvent ? "reminder" : "original", selectInfoResult.eventTimeMillis);
return new LeaseObtainedStatus(flowAction, selectInfoResult.eventTimeMillis,
selectInfoResult.getLeaseAcquisitionTimeMillis().get());
return new LeaseObtainedStatus(updatedFlowAction, selectInfoResult.getLeaseAcquisitionTimeMillis().get());
}
log.info("Another participant acquired lease in between for [{}, is: {}, eventTimestamp: {}] - num rows updated: {}",
flowAction, isReminderEvent ? "reminder" : "original", selectInfoResult.eventTimeMillis, numRowsUpdated);
updatedFlowAction, isReminderEvent ? "reminder" : "original", selectInfoResult.eventTimeMillis, numRowsUpdated);
// Another participant acquired lease in between
return new LeasedToAnotherStatus(flowAction, selectInfoResult.getEventTimeMillis(),
return new LeasedToAnotherStatus(updatedFlowAction,
selectInfoResult.getLeaseAcquisitionTimeMillis().get() + selectInfoResult.getDbLinger()
- (dbCurrentTimestamp.isPresent() ? dbCurrentTimestamp.get().getTime() : System.currentTimeMillis()));
}
Expand Down Expand Up @@ -599,22 +602,22 @@ public boolean recordLeaseSuccess(LeaseObtainedStatus status)
updateStatement.setString(++i, flowGroup);
updateStatement.setString(++i, flowName);
updateStatement.setString(++i, flowActionType.toString());
updateStatement.setTimestamp(++i, new Timestamp(status.getEventTimestamp()), UTC_CAL.get());
updateStatement.setTimestamp(++i, new Timestamp(status.getEventTimeMillis()), UTC_CAL.get());
updateStatement.setTimestamp(++i, new Timestamp(status.getLeaseAcquisitionTimestamp()), UTC_CAL.get());
int numRowsUpdated = updateStatement.executeUpdate();
if (numRowsUpdated == 0) {
log.info("Multi-active lease arbiter lease attempt: [{}, eventTimestamp: {}] - FAILED to complete because "
+ "lease expired or event cleaned up before host completed required actions", flowAction,
status.getEventTimestamp());
status.getEventTimeMillis());
return false;
}
if( numRowsUpdated == 1) {
log.info("Multi-active lease arbiter lease attempt: [{}, eventTimestamp: {}] - COMPLETED, no longer leasing"
+ " this event after this.", flowAction, status.getEventTimestamp());
+ " this event after this.", flowAction, status.getEventTimeMillis());
return true;
};
throw new IOException(String.format("Attempt to complete lease use: [%s, eventTimestamp: %s] - updated more "
+ "rows than expected", flowAction, status.getEventTimestamp()));
+ "rows than expected", flowAction, status.getEventTimeMillis()));
}, true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,17 @@ public class RuntimeMetrics {
public static final String GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specStoreMonitor.message.processed";
public static final String GOBBLIN_SPEC_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specstoreMonitor.produce.to.consume.delay";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.kills.invoked";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.message.processed";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.resumes.invoked";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.flows.launched";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.unexpected.errors";
public static final String
GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.produce.to.consume.delay";
public static final String DAG_ACTION_STORE_MONITOR_PREFIX = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED = DAG_ACTION_STORE_MONITOR_PREFIX + ".kills.invoked";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED = DAG_ACTION_STORE_MONITOR_PREFIX + ".message.processed";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGES_FILTERED_OUT = DAG_ACTION_STORE_MONITOR_PREFIX + ".messagesFilteredOut";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED = DAG_ACTION_STORE_MONITOR_PREFIX + ".resumes.invoked";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED = DAG_ACTION_STORE_MONITOR_PREFIX + ".flows.launched";

public static final String GOBBLIN_DAG_ACTION_STORE_FAILED_FLOW_LAUNCHED_SUBMISSIONS = DAG_ACTION_STORE_MONITOR_PREFIX + ".failedFlowLaunchSubmissions";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS = DAG_ACTION_STORE_MONITOR_PREFIX + ".unexpected.errors";
public static final String
GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS = DAG_ACTION_STORE_MONITOR_PREFIX + ".produce.to.consume.delay";
public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.unexpected.errors";
public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_QUOTA_REQUESTS_EXCEEDED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.quotaRequests.exceeded";
public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_TIME_TO_CHECK_QUOTA = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.time.to.check.quota";
Expand Down
Loading

0 comments on commit eeb5142

Please sign in to comment.