Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emit metric to tune LeaseArbiter Linger metric #3824

Merged
merged 4 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,10 @@ public class ConfigurationKeys {
public static final String ORCHESTRATOR_TRIGGER_EVENT_TIME_NEVER_SET_VAL = "-1";
public static final String FLOW_IS_REMINDER_EVENT_KEY = "isReminderEvent";
public static final String SCHEDULER_EVENT_EPSILON_MILLIS_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".epsilonMillis";
public static final int DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS = 5000;
public static final int DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS = 2000;
// Note: linger should be on the order of seconds even though we measure in millis
public static final String SCHEDULER_EVENT_LINGER_MILLIS_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".lingerMillis";
public static final int DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS = 30000;
public static final int DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS = 90000;
public static final String SCHEDULER_MAX_BACKOFF_MILLIS_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".maxBackoffMillis";
public static final int DEFAULT_SCHEDULER_MAX_BACKOFF_MILLIS = 10000;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,16 @@ public class ServiceMetricNames {
public static final String FLOW_ORCHESTRATION_DELAY = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowOrchestration.delay";

// Flow Trigger Handler
public static final String FLOW_TRIGGER_HANDLER_PREFIX = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowTriggerHandler";
public static final String GOBBLIN_FLOW_TRIGGER_HANDLER_NUM_FLOWS_SUBMITTED = FLOW_TRIGGER_HANDLER_PREFIX + ".numFlowsSubmitted";
public static final String FLOW_TRIGGER_HANDLER_LEASE_OBTAINED_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + ".leaseObtained";
public static final String FLOW_TRIGGER_HANDLER_LEASED_TO_ANOTHER_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + ".leasedToAnother";
public static final String FLOW_TRIGGER_HANDLER_NO_LONGER_LEASING_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + ".noLongerLeasing";
public static final String FLOW_TRIGGER_HANDLER_JOB_DOES_NOT_EXIST_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + ".jobDoesNotExistInScheduler";
public static final String FLOW_TRIGGER_HANDLER_FAILED_TO_SET_REMINDER_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + ".failedToSetReminderCount";
public static final String FLOW_TRIGGER_HANDLER_LEASES_OBTAINED_DUE_TO_REMINDER_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + ".leasesObtainedDueToReminderCount";
public static final String FLOW_TRIGGER_HANDLER_PREFIX = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowTriggerHandler.";
public static final String GOBBLIN_FLOW_TRIGGER_HANDLER_NUM_FLOWS_SUBMITTED = FLOW_TRIGGER_HANDLER_PREFIX + "numFlowsSubmitted";
public static final String FLOW_TRIGGER_HANDLER_LEASE_OBTAINED_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + "leaseObtained";
public static final String FLOW_TRIGGER_HANDLER_LEASED_TO_ANOTHER_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + "leasedToAnother";
public static final String FLOW_TRIGGER_HANDLER_NO_LONGER_LEASING_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + "noLongerLeasing";
public static final String FLOW_TRIGGER_HANDLER_JOB_DOES_NOT_EXIST_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + "jobDoesNotExistInScheduler";
public static final String FLOW_TRIGGER_HANDLER_FAILED_TO_SET_REMINDER_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + "failedToSetReminderCount";
public static final String FLOW_TRIGGER_HANDLER_LEASES_OBTAINED_DUE_TO_REMINDER_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + "leasesObtainedDueToReminderCount";
public static final String FLOW_TRIGGER_HANDLER_FAILED_TO_RECORD_LEASE_SUCCESS_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + "failedtoRecordLeaseSuccessCount";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

above on line 46, looks like we capitalize 'To' within camel-case

public static final String FLOW_TRIGGER_HANDLER_COMPLETED_RECORD_LEASE_SUCCESS_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + "completedRecordLeaseSuccessCount";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: recordedLeaseSuccessCount?


// DagManager Related Metrics
public static final String DAG_MANAGER_PREFIX = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "dagManager";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,10 +243,9 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l
// Query lease arbiter table about this flow action
Optional<GetEventInfoResult> getResult = getExistingEventInfo(flowAction, isReminderEvent, eventTimeMillis);

// TODO: change all the `CASE N: ...` statements back to debug statements after uncovering issue
try {
if (!getResult.isPresent()) {
log.info("tryAcquireLease for [{}, is; {}, eventTimestamp: {}] - CASE 1: no existing row for this flow action,"
log.debug("tryAcquireLease for [{}, is; {}, eventTimestamp: {}] - CASE 1: no existing row for this flow action,"
+ " then go ahead and insert", flowAction, isReminderEvent ? "reminder" : "original", eventTimeMillis);
int numRowsUpdated = attemptLeaseIfNewRow(flowAction);
return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, Optional.empty(), isReminderEvent);
Expand All @@ -265,7 +264,7 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l
// because db laundering tells us that the currently worked on db event is newer and will have its own reminders
if (isReminderEvent) {
if (eventTimeMillis < dbEventTimestamp.getTime()) {
log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - dbEventTimeMillis: {} - A new event trigger "
log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - dbEventTimeMillis: {} - A new event trigger "
+ "is being worked on, so this older reminder will be dropped.", flowAction,
isReminderEvent ? "reminder" : "original", eventTimeMillis, dbEventTimestamp);
return new NoLongerLeasingStatus();
Expand All @@ -278,8 +277,7 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l
isReminderEvent ? "reminder" : "original", eventTimeMillis, dbEventTimestamp.getTime());
}
if (eventTimeMillis == dbEventTimestamp.getTime()) {
// TODO: change this to a debug after fixing issue
log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - dbEventTimeMillis: {} - Reminder event time "
log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - dbEventTimeMillis: {} - Reminder event time "
+ "is the same as db event.", flowAction, isReminderEvent ? "reminder" : "original",
eventTimeMillis, dbEventTimestamp);
}
Expand All @@ -293,21 +291,21 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l
if (leaseValidityStatus == 1) {
if (isWithinEpsilon) {
DagActionStore.DagAction updatedFlowAction = flowAction.updateFlowExecutionId(dbEventTimestamp.getTime());
log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 2: Same event, lease is valid",
log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 2: Same event, lease is valid",
updatedFlowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
// Utilize db timestamp for reminder
return new LeasedToAnotherStatus(updatedFlowAction,
dbLeaseAcquisitionTimestamp.getTime() + dbLinger - dbCurrentTimestamp.getTime());
}
DagActionStore.DagAction updatedFlowAction = flowAction.updateFlowExecutionId(dbCurrentTimestamp.getTime());
log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 3: Distinct event, lease is valid",
log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 3: Distinct event, lease is valid",
updatedFlowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
// Utilize db lease acquisition timestamp for wait time
return new LeasedToAnotherStatus(updatedFlowAction,
dbLeaseAcquisitionTimestamp.getTime() + dbLinger - dbCurrentTimestamp.getTime());
} // Lease is invalid
else if (leaseValidityStatus == 2) {
log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 4: Lease is out of date (regardless of "
log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 4: Lease is out of date (regardless of "
+ "whether same or distinct event)", flowAction, isReminderEvent ? "reminder" : "original",
dbCurrentTimestamp.getTime());
if (isWithinEpsilon && !isReminderEvent) {
Expand All @@ -321,11 +319,11 @@ else if (leaseValidityStatus == 2) {
return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, Optional.of(dbCurrentTimestamp), isReminderEvent);
} // No longer leasing this event
if (isWithinEpsilon) {
log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 5: Same event, no longer leasing event"
log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 5: Same event, no longer leasing event"
+ " in db", flowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
return new NoLongerLeasingStatus();
}
log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 6: Distinct event, no longer leasing "
log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 6: Distinct event, no longer leasing "
+ "event in db", flowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
// Use our event to acquire lease, check for previous db eventTimestamp and NULL leaseAcquisitionTimestamp
int numRowsUpdated = attemptLeaseIfExistingRow(thisTableAcquireLeaseIfFinishedStatement, flowAction,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public class FlowTriggerHandler {
private ContextAwareCounter jobDoesNotExistInSchedulerCount;
private ContextAwareCounter failedToSetEventReminderCount;
private ContextAwareMeter leasesObtainedDueToReminderCount;
private ContextAwareMeter failedToRecordLeaseSuccessCount;
private ContextAwareMeter completedRecordLeaseSuccessCount;

@Inject
public FlowTriggerHandler(Config config, Optional<MultiActiveLeaseArbiter> leaseDeterminationStore,
Expand All @@ -98,6 +100,8 @@ public FlowTriggerHandler(Config config, Optional<MultiActiveLeaseArbiter> lease
this.jobDoesNotExistInSchedulerCount = this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_JOB_DOES_NOT_EXIST_COUNT);
this.failedToSetEventReminderCount = this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_FAILED_TO_SET_REMINDER_COUNT);
this.leasesObtainedDueToReminderCount = this.metricContext.contextAwareMeter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_LEASES_OBTAINED_DUE_TO_REMINDER_COUNT);
this.failedToRecordLeaseSuccessCount = this.metricContext.contextAwareMeter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_FAILED_TO_RECORD_LEASE_SUCCESS_COUNT);
this.completedRecordLeaseSuccessCount = this.metricContext.contextAwareMeter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_COMPLETED_RECORD_LEASE_SUCCESS_COUNT);
}

/**
Expand Down Expand Up @@ -127,8 +131,10 @@ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction flo
if (persistFlowAction(leaseObtainedStatus)) {
log.info("Successfully persisted lease: [{}, eventTimestamp: {}] ", leaseObtainedStatus.getFlowAction(),
leaseObtainedStatus.getEventTimeMillis());
this.completedRecordLeaseSuccessCount.mark();
return;
}
this.failedToRecordLeaseSuccessCount.mark();
// If persisting the flow action failed, then we set another trigger for this event to occur immediately to
// re-attempt handling the event
scheduleReminderForEvent(jobProps,
Expand Down
Loading