Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-1930] Improve Multi-active related logs and metrics #3800

Merged
merged 7 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ public class ServiceMetricNames {
public static final String FLOW_TRIGGER_HANDLER_JOB_DOES_NOT_EXIST_COUNT = GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".jobDoesNotExistInScheduler";
public static final String FLOW_TRIGGER_HANDLER_FAILED_TO_SET_REMINDER_COUNT = GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".failedToSetReminderCount";

// DagManager Related Metrics
public static final String DAG_MANAGER_PREFIX = GOBBLIN_SERVICE_PREFIX + ".dagManager";
public static final String
DAG_MANAGER_FAILED_LAUNCH_EVENTS_ON_STARTUP_COUNT = DAG_MANAGER_PREFIX + ".failedLaunchEventsOnStartupCount";
public static final String FLOW_FAILED_FORWARD_TO_DAG_MANAGER_COUNT = DAG_MANAGER_PREFIX + ".flowFailedForwardToDagManagerCount";

//Job status poll timer
public static final String JOB_STATUS_POLLED_TIMER = GOBBLIN_SERVICE_PREFIX + ".jobStatusPoll.time";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,6 @@ private void runRetentionOnArbitrationTable() {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
Runnable retentionTask = () -> {
try {
Thread.sleep(10000);
umustafi marked this conversation as resolved.
Show resolved Hide resolved
withPreparedStatement(thisTableRetentionStatement,
retentionStatement -> {
retentionStatement.setLong(1, retentionPeriodMillis);
Expand All @@ -253,7 +252,7 @@ private void runRetentionOnArbitrationTable() {
}
return numRowsDeleted;
}, true);
} catch (InterruptedException | IOException e) {
} catch (IOException e) {
log.error("Failing to run retention on lease arbiter table. Unbounded growth can lead to database slowness and "
+ "affect our system performance. Examine exception: ", e);
}
Expand Down Expand Up @@ -307,7 +306,7 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l
}
if (eventTimeMillis == dbEventTimestamp.getTime()) {
// TODO: change this to a debug after fixing issue
log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - dbEventTimeMillis: {} - Reminder event time"
log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - dbEventTimeMillis: {} - Reminder event time "
+ "is the same as db event.", flowAction, isReminderEvent ? "reminder" : "original",
eventTimeMillis, dbEventTimestamp);
}
Expand All @@ -320,16 +319,18 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l
// Lease is valid
if (leaseValidityStatus == 1) {
if (isWithinEpsilon) {
DagActionStore.DagAction updatedFlowAction = updateFlowExecutionId(flowAction, dbEventTimestamp.getTime());
log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 2: Same event, lease is valid",
flowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
updatedFlowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
// Utilize db timestamp for reminder
return new LeasedToAnotherStatus(flowAction, dbEventTimestamp.getTime(),
return new LeasedToAnotherStatus(updatedFlowAction, dbEventTimestamp.getTime(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

each of the flowExecId updates uses the timestamp value that is also passed as the second arg to the LeasedToAnotherStatus or LeaseObtainedStatus ctor. seems potentially unnecessary for that object still to maintain both--do we want it to?

e.g. it could always implement:

long getEventTimestamp() {
  return this.dagAction.getFlowExecutionId();
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

dbLeaseAcquisitionTimestamp.getTime() + dbLinger - dbCurrentTimestamp.getTime());
}
DagActionStore.DagAction updatedFlowAction = updateFlowExecutionId(flowAction, dbCurrentTimestamp.getTime());
log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 3: Distinct event, lease is valid",
flowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
updatedFlowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
// Utilize db lease acquisition timestamp for wait time
return new LeasedToAnotherStatus(flowAction, dbCurrentTimestamp.getTime(),
return new LeasedToAnotherStatus(updatedFlowAction, dbCurrentTimestamp.getTime(),
dbLeaseAcquisitionTimestamp.getTime() + dbLinger - dbCurrentTimestamp.getTime());
} // Lease is invalid
else if (leaseValidityStatus == 2) {
Expand Down Expand Up @@ -515,20 +516,30 @@ protected LeaseAttemptStatus evaluateStatusAfterLeaseAttempt(int numRowsUpdated,
if (!selectInfoResult.getLeaseAcquisitionTimeMillis().isPresent()) {
return new NoLongerLeasingStatus();
}
DagActionStore.DagAction updatedFlowAction = updateFlowExecutionId(flowAction, selectInfoResult.eventTimeMillis);
if (numRowsUpdated == 1) {
log.info("Obtained lease for [{}, is: {}, eventTimestamp: {}] successfully!", flowAction,
log.info("Obtained lease for [{}, is: {}, eventTimestamp: {}] successfully!", updatedFlowAction,
isReminderEvent ? "reminder" : "original", selectInfoResult.eventTimeMillis);
return new LeaseObtainedStatus(flowAction, selectInfoResult.eventTimeMillis,
return new LeaseObtainedStatus(updatedFlowAction, selectInfoResult.eventTimeMillis,
selectInfoResult.getLeaseAcquisitionTimeMillis().get());
}
log.info("Another participant acquired lease in between for [{}, is: {}, eventTimestamp: {}] - num rows updated: {}",
flowAction, isReminderEvent ? "reminder" : "original", selectInfoResult.eventTimeMillis, numRowsUpdated);
updatedFlowAction, isReminderEvent ? "reminder" : "original", selectInfoResult.eventTimeMillis, numRowsUpdated);
// Another participant acquired lease in between
return new LeasedToAnotherStatus(flowAction, selectInfoResult.getEventTimeMillis(),
return new LeasedToAnotherStatus(updatedFlowAction, selectInfoResult.getEventTimeMillis(),
selectInfoResult.getLeaseAcquisitionTimeMillis().get() + selectInfoResult.getDbLinger()
- (dbCurrentTimestamp.isPresent() ? dbCurrentTimestamp.get().getTime() : System.currentTimeMillis()));
}

/**
* Replace flow execution id with agreed upon event time to easily track the flow
*/
protected static DagActionStore.DagAction updateFlowExecutionId(DagActionStore.DagAction flowAction,
umustafi marked this conversation as resolved.
Show resolved Hide resolved
long eventTimeMillis) {
return new DagActionStore.DagAction(flowAction.getFlowGroup(), flowAction.getFlowName(),
String.valueOf(eventTimeMillis), flowAction.getFlowActionType());
}

/**
* Complete the INSERT statement for a new flow action lease where the flow action is not present in the table
* @param statement
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,17 @@ public class RuntimeMetrics {
public static final String GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specStoreMonitor.message.processed";
public static final String GOBBLIN_SPEC_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specstoreMonitor.produce.to.consume.delay";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.kills.invoked";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.message.processed";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.resumes.invoked";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.flows.launched";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.unexpected.errors";
public static final String
GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.produce.to.consume.delay";
public static final String DAG_ACTION_STORE_MONITOR_PREFIX = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED = DAG_ACTION_STORE_MONITOR_PREFIX + ".kills.invoked";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED = DAG_ACTION_STORE_MONITOR_PREFIX + ".message.processed";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGES_FILTERED_OUT = DAG_ACTION_STORE_MONITOR_PREFIX + ".messagesFilteredOut";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED = DAG_ACTION_STORE_MONITOR_PREFIX + ".resumes.invoked";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED = DAG_ACTION_STORE_MONITOR_PREFIX + ".flows.launched";

public static final String GOBBLIN_DAG_ACTION_STORE_FAILED_FLOW_LAUNCHED_SUBMISSIONS = DAG_ACTION_STORE_MONITOR_PREFIX + ".failedFlowLaunchSubmissions";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS = DAG_ACTION_STORE_MONITOR_PREFIX + ".unexpected.errors";
public static final String
GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS = DAG_ACTION_STORE_MONITOR_PREFIX + ".produce.to.consume.delay";
public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.unexpected.errors";
public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_QUOTA_REQUESTS_EXCEEDED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.quotaRequests.exceeded";
public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_TIME_TO_CHECK_QUOTA = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.time.to.check.quota";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,8 @@ protected void startUp() {
* Note this should only be called from the {@link Orchestrator} or {@link org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor}
*/
public synchronized void addDag(Dag<JobExecutionPlan> dag, boolean persist, boolean setStatus) throws IOException {
// TODO: Used to track missing dag issue, remove later as needed
log.info("Add dag (persist: {}, setStatus: {}): {}", persist, setStatus, dag);
if (!isActive) {
log.warn("Skipping add dag because this instance of DagManager is not active for dag: {}", dag);
return;
Expand Down Expand Up @@ -509,14 +511,19 @@ public void handleLaunchFlowEvent(DagActionStore.DagAction launchAction) {
// Upon handling the action, delete it so on leadership change this is not duplicated
this.dagActionStore.get().deleteDagAction(launchAction);
} catch (URISyntaxException e) {
log.warn("Could not create URI object for flowId {} due to exception {}", flowId, e.getMessage());
log.warn(String.format("Could not create URI object for flowId %s due to exception", flowId), e);
this.dagManagerMetrics.incrementFailedLaunchCount();
} catch (SpecNotFoundException e) {
log.warn("Spec not found for flowId {} due to exception {}", flowId, e.getMessage());
log.warn(String.format("Spec not found for flowId %s due to exception", flowId), e);
this.dagManagerMetrics.incrementFailedLaunchCount();
} catch (IOException e) {
log.warn("Failed to add Job Execution Plan for flowId {} OR delete dag action from dagActionStore (check "
+ "stacktrace) due to exception {}", flowId, e.getMessage());
log.warn(String.format("Failed to add Job Execution Plan for flowId %s OR delete dag action from dagActionStore "
+ "(check stacktrace) due to exception", flowId), e);
this.dagManagerMetrics.incrementFailedLaunchCount();
} catch (InterruptedException e) {
log.warn("SpecCompiler failed to reach healthy state before compilation of flowId {}. Exception: ", flowId, e);
log.warn(String.format("SpecCompiler failed to reach healthy state before compilation of flowId %s due to "
+ "exception", flowId), e);
this.dagManagerMetrics.incrementFailedLaunchCount();
}
}

Expand Down Expand Up @@ -620,6 +627,8 @@ public void run() {
}
//Initialize dag.
initialize(dag);
} else {
log.warn("Null dag despite non-empty queue; ignoring the dag");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ public class DagManagerMetrics {
private final Map<String, ContextAwareMeter> executorStartSlaExceededMeters = Maps.newConcurrentMap();
private final Map<String, ContextAwareMeter> executorSlaExceededMeters = Maps.newConcurrentMap();
private final Map<String, ContextAwareMeter> executorJobSentMeters = Maps.newConcurrentMap();

// Metrics for unexpected flow handling failures
private ContextAwareCounter failedLaunchEventsOnActivationCount;
MetricContext metricContext;

public DagManagerMetrics(MetricContext metricContext) {
Expand All @@ -100,6 +103,9 @@ public void activate() {
ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER));
allRunningMeter = metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
ServiceMetricNames.JOBS_SENT_TO_SPEC_EXECUTOR));
failedLaunchEventsOnActivationCount = metricContext.contextAwareCounter(
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
ServiceMetricNames.DAG_MANAGER_FAILED_LAUNCH_EVENTS_ON_STARTUP_COUNT));
}
}

Expand Down Expand Up @@ -199,6 +205,13 @@ public void incrementCountsStartSlaExceeded(Dag.DagNode<JobExecutionPlan> node)
}
}

// Increment the count for num of failed launches during leader activation
public void incrementFailedLaunchCount() {
if (this.metricContext != null) {
this.failedLaunchEventsOnActivationCount.inc();
}
}

private List<ContextAwareCounter> getRunningJobsCounterForUser(Dag.DagNode<JobExecutionPlan> dagNode) {
Config configs = dagNode.getValue().getJobSpec().getConfig();
String proxy = ConfigUtils.getString(configs, AzkabanProjectConfig.USER_TO_PROXY, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.gobblin.service.modules.orchestration;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -93,6 +94,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
private Optional<Meter> flowOrchestrationFailedMeter;
@Getter
private Optional<Timer> flowOrchestrationTimer;
private Optional<Counter> flowFailedForwardToDagManagerCounter;
@Setter
private FlowStatusGenerator flowStatusGenerator;

Expand Down Expand Up @@ -137,12 +139,14 @@ public Orchestrator(Config config, Optional<TopologyCatalog> topologyCatalog, Op
this.flowOrchestrationSuccessFulMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_SUCCESSFUL_METER));
this.flowOrchestrationFailedMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_FAILED_METER));
this.flowOrchestrationTimer = Optional.of(this.metricContext.timer(ServiceMetricNames.FLOW_ORCHESTRATION_TIMER));
this.flowFailedForwardToDagManagerCounter = Optional.of(this.metricContext.counter(ServiceMetricNames.FLOW_FAILED_FORWARD_TO_DAG_MANAGER_COUNT));
this.eventSubmitter = Optional.of(new EventSubmitter.Builder(this.metricContext, "org.apache.gobblin.service").build());
} else {
this.metricContext = null;
this.flowOrchestrationSuccessFulMeter = Optional.absent();
this.flowOrchestrationFailedMeter = Optional.absent();
this.flowOrchestrationTimer = Optional.absent();
this.flowFailedForwardToDagManagerCounter = Optional.absent();
this.eventSubmitter = Optional.absent();
}
this.isFlowConcurrencyEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
Expand Down Expand Up @@ -337,6 +341,7 @@ public void submitFlowToDagManager(FlowSpec flowSpec) throws IOException, Interr
if (optionalJobExecutionPlanDag.isPresent()) {
submitFlowToDagManager(flowSpec, optionalJobExecutionPlanDag.get());
} else {
_log.warn("Flow: {} submitted to dagManager failed to compile and produce a job execution plan dag", flowSpec);
Instrumented.markMeter(this.flowOrchestrationFailedMeter);
}
}
Expand All @@ -347,9 +352,13 @@ public void submitFlowToDagManager(FlowSpec flowSpec, Dag<JobExecutionPlan> jobE
//Send the dag to the DagManager.
this.dagManager.get().addDag(jobExecutionPlanDag, true, true);
} catch (Exception ex) {
String failureMessage = "Failed to add Job Execution Plan due to: " + ex.getMessage();
_log.warn("Orchestrator call - " + failureMessage, ex);
if (this.flowFailedForwardToDagManagerCounter.isPresent()) {
this.flowFailedForwardToDagManagerCounter.get().inc();
}
if (this.eventSubmitter.isPresent()) {
// pronounce failed before stack unwinds, to ensure flow not marooned in `COMPILED` state; (failure likely attributable to DB connection/failover)
String failureMessage = "Failed to add Job Execution Plan due to: " + ex.getMessage();
Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec);
flowMetadata.put(TimingEvent.METADATA_MESSAGE, failureMessage);
new TimingEvent(this.eventSubmitter.get(), TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
Expand Down
Loading
Loading