Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-1930] Improve Multi-active related logs and metrics #3800

Merged
merged 7 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ public class ServiceMetricNames {
public static final String FLOW_TRIGGER_HANDLER_JOB_DOES_NOT_EXIST_COUNT = GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".jobDoesNotExistInScheduler";
public static final String FLOW_TRIGGER_HANDLER_FAILED_TO_SET_REMINDER_COUNT = GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".failedToSetReminderCount";

// DagManager Related Metrics
public static final String DAG_MANAGER_PREFIX = GOBBLIN_SERVICE_PREFIX + ".dagManager";
public static final String
DAG_MANAGER_FAILED_LAUNCH_EVENTS_ON_STARTUP_COUNT = DAG_MANAGER_PREFIX + ".failedLaunchEventsOnStartupCount";
public static final String FLOW_FAILED_FORWARD_TO_DAG_MANAGER_COUNT = DAG_MANAGER_PREFIX + ".flowFailedForwardToDagManagerCount";

//Job status poll timer
public static final String JOB_STATUS_POLLED_TIMER = GOBBLIN_SERVICE_PREFIX + ".jobStatusPoll.time";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ class DagAction {
public FlowId getFlowId() {
return new FlowId().setFlowGroup(this.flowGroup).setFlowName(this.flowName);
}

/**
* Replace flow execution id with agreed upon event time to easily track the flow
*/
public static DagActionStore.DagAction updateFlowExecutionId(DagActionStore.DagAction flowAction,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this would better belong as a method of the DagAction class defined on line 39

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is included as a method of the class, it's static because it returns a new dagAction obj

long eventTimeMillis) {
return new DagActionStore.DagAction(flowAction.getFlowGroup(), flowAction.getFlowName(),
String.valueOf(eventTimeMillis), flowAction.getFlowActionType());
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,28 +79,41 @@ abstract class LeaseAttemptStatus {}
class NoLongerLeasingStatus extends LeaseAttemptStatus {}

/*
The participant calling this method acquired the lease for the event in question. The class contains the
`eventTimestamp` associated with the lease as well as the time the caller obtained the lease or
`leaseAcquisitionTimestamp`.
The participant calling this method acquired the lease for the event in question. `Flow action`'s flow execution id
is the timestamp associated with the lease and the time the caller obtained the lease is stored within the
`leaseAcquisitionTimestamp` field.
*/
@Data
class LeaseObtainedStatus extends LeaseAttemptStatus {
private final DagActionStore.DagAction flowAction;
private final long eventTimestamp;
private final long leaseAcquisitionTimestamp;

/**
* @return event time in millis since epoch for the event of this lease acquisition
*/
public long getEventTimeMillis() {
return Long.parseLong(flowAction.getFlowExecutionId());
}
}

/*
This flow action event already has a valid lease owned by another participant.
`eventTimeMillis` is the timestamp the lease is associated with, which may be a different timestamp for the same flow
action corresponding to the same instance of the event or a distinct one.
`Flow action`'s flow execution id is the timestamp the lease is associated with, however the flow action event it
corresponds to may be a different and distinct occurrence of the same event.
`minimumLingerDurationMillis` is the minimum amount of time to wait before this participant should return to check if
the lease has completed or expired
*/
@Data
class LeasedToAnotherStatus extends LeaseAttemptStatus {
private final DagActionStore.DagAction flowAction;
private final long eventTimeMillis;
private final long minimumLingerDurationMillis;

/**
* Returns event time in millis since epoch for the event whose lease was obtained by another participant.
* @return
*/
public long getEventTimeMillis() {
return Long.parseLong(flowAction.getFlowExecutionId());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.util.ConfigUtils;

import static org.apache.gobblin.runtime.api.DagActionStore.DagAction.*;


/**
* MySQL based implementation of the {@link MultiActiveLeaseArbiter} which uses a MySQL store to resolve ownership of
Expand Down Expand Up @@ -242,7 +244,6 @@ private void runRetentionOnArbitrationTable() {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
Runnable retentionTask = () -> {
try {
Thread.sleep(10000);
umustafi marked this conversation as resolved.
Show resolved Hide resolved
withPreparedStatement(thisTableRetentionStatement,
retentionStatement -> {
retentionStatement.setLong(1, retentionPeriodMillis);
Expand All @@ -253,7 +254,7 @@ private void runRetentionOnArbitrationTable() {
}
return numRowsDeleted;
}, true);
} catch (InterruptedException | IOException e) {
} catch (IOException e) {
log.error("Failing to run retention on lease arbiter table. Unbounded growth can lead to database slowness and "
+ "affect our system performance. Examine exception: ", e);
}
Expand Down Expand Up @@ -307,7 +308,7 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l
}
if (eventTimeMillis == dbEventTimestamp.getTime()) {
// TODO: change this to a debug after fixing issue
log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - dbEventTimeMillis: {} - Reminder event time"
log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - dbEventTimeMillis: {} - Reminder event time "
+ "is the same as db event.", flowAction, isReminderEvent ? "reminder" : "original",
eventTimeMillis, dbEventTimestamp);
}
Expand All @@ -320,16 +321,18 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l
// Lease is valid
if (leaseValidityStatus == 1) {
if (isWithinEpsilon) {
DagActionStore.DagAction updatedFlowAction = updateFlowExecutionId(flowAction, dbEventTimestamp.getTime());
log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 2: Same event, lease is valid",
flowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
updatedFlowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
// Utilize db timestamp for reminder
return new LeasedToAnotherStatus(flowAction, dbEventTimestamp.getTime(),
return new LeasedToAnotherStatus(updatedFlowAction,
dbLeaseAcquisitionTimestamp.getTime() + dbLinger - dbCurrentTimestamp.getTime());
}
DagActionStore.DagAction updatedFlowAction = updateFlowExecutionId(flowAction, dbCurrentTimestamp.getTime());
log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 3: Distinct event, lease is valid",
flowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
updatedFlowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
// Utilize db lease acquisition timestamp for wait time
return new LeasedToAnotherStatus(flowAction, dbCurrentTimestamp.getTime(),
return new LeasedToAnotherStatus(updatedFlowAction,
dbLeaseAcquisitionTimestamp.getTime() + dbLinger - dbCurrentTimestamp.getTime());
} // Lease is invalid
else if (leaseValidityStatus == 2) {
Expand Down Expand Up @@ -515,16 +518,16 @@ protected LeaseAttemptStatus evaluateStatusAfterLeaseAttempt(int numRowsUpdated,
if (!selectInfoResult.getLeaseAcquisitionTimeMillis().isPresent()) {
return new NoLongerLeasingStatus();
}
DagActionStore.DagAction updatedFlowAction = updateFlowExecutionId(flowAction, selectInfoResult.eventTimeMillis);
if (numRowsUpdated == 1) {
log.info("Obtained lease for [{}, is: {}, eventTimestamp: {}] successfully!", flowAction,
log.info("Obtained lease for [{}, is: {}, eventTimestamp: {}] successfully!", updatedFlowAction,
isReminderEvent ? "reminder" : "original", selectInfoResult.eventTimeMillis);
return new LeaseObtainedStatus(flowAction, selectInfoResult.eventTimeMillis,
selectInfoResult.getLeaseAcquisitionTimeMillis().get());
return new LeaseObtainedStatus(updatedFlowAction, selectInfoResult.getLeaseAcquisitionTimeMillis().get());
}
log.info("Another participant acquired lease in between for [{}, is: {}, eventTimestamp: {}] - num rows updated: {}",
flowAction, isReminderEvent ? "reminder" : "original", selectInfoResult.eventTimeMillis, numRowsUpdated);
updatedFlowAction, isReminderEvent ? "reminder" : "original", selectInfoResult.eventTimeMillis, numRowsUpdated);
// Another participant acquired lease in between
return new LeasedToAnotherStatus(flowAction, selectInfoResult.getEventTimeMillis(),
return new LeasedToAnotherStatus(updatedFlowAction,
selectInfoResult.getLeaseAcquisitionTimeMillis().get() + selectInfoResult.getDbLinger()
- (dbCurrentTimestamp.isPresent() ? dbCurrentTimestamp.get().getTime() : System.currentTimeMillis()));
}
Expand Down Expand Up @@ -599,22 +602,22 @@ public boolean recordLeaseSuccess(LeaseObtainedStatus status)
updateStatement.setString(++i, flowGroup);
updateStatement.setString(++i, flowName);
updateStatement.setString(++i, flowActionType.toString());
updateStatement.setTimestamp(++i, new Timestamp(status.getEventTimestamp()), UTC_CAL.get());
updateStatement.setTimestamp(++i, new Timestamp(status.getEventTimeMillis()), UTC_CAL.get());
updateStatement.setTimestamp(++i, new Timestamp(status.getLeaseAcquisitionTimestamp()), UTC_CAL.get());
int numRowsUpdated = updateStatement.executeUpdate();
if (numRowsUpdated == 0) {
log.info("Multi-active lease arbiter lease attempt: [{}, eventTimestamp: {}] - FAILED to complete because "
+ "lease expired or event cleaned up before host completed required actions", flowAction,
status.getEventTimestamp());
status.getEventTimeMillis());
return false;
}
if( numRowsUpdated == 1) {
log.info("Multi-active lease arbiter lease attempt: [{}, eventTimestamp: {}] - COMPLETED, no longer leasing"
+ " this event after this.", flowAction, status.getEventTimestamp());
+ " this event after this.", flowAction, status.getEventTimeMillis());
return true;
};
throw new IOException(String.format("Attempt to complete lease use: [%s, eventTimestamp: %s] - updated more "
+ "rows than expected", flowAction, status.getEventTimestamp()));
+ "rows than expected", flowAction, status.getEventTimeMillis()));
}, true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,17 @@ public class RuntimeMetrics {
public static final String GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specStoreMonitor.message.processed";
public static final String GOBBLIN_SPEC_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specstoreMonitor.produce.to.consume.delay";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.kills.invoked";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.message.processed";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.resumes.invoked";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.flows.launched";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.unexpected.errors";
public static final String
GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.produce.to.consume.delay";
public static final String DAG_ACTION_STORE_MONITOR_PREFIX = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED = DAG_ACTION_STORE_MONITOR_PREFIX + ".kills.invoked";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED = DAG_ACTION_STORE_MONITOR_PREFIX + ".message.processed";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGES_FILTERED_OUT = DAG_ACTION_STORE_MONITOR_PREFIX + ".messagesFilteredOut";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED = DAG_ACTION_STORE_MONITOR_PREFIX + ".resumes.invoked";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED = DAG_ACTION_STORE_MONITOR_PREFIX + ".flows.launched";

public static final String GOBBLIN_DAG_ACTION_STORE_FAILED_FLOW_LAUNCHED_SUBMISSIONS = DAG_ACTION_STORE_MONITOR_PREFIX + ".failedFlowLaunchSubmissions";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS = DAG_ACTION_STORE_MONITOR_PREFIX + ".unexpected.errors";
public static final String
GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS = DAG_ACTION_STORE_MONITOR_PREFIX + ".produce.to.consume.delay";
public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.unexpected.errors";
public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_QUOTA_REQUESTS_EXCEEDED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.quotaRequests.exceeded";
public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_TIME_TO_CHECK_QUOTA = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.time.to.check.quota";
Expand Down
Loading
Loading