Skip to content

Commit

Permalink
Added a Second Retry Queue - Useful while fetching Spark Metrics (#314)
Browse files Browse the repository at this point in the history
Added logic for making a new retry queue in where jobs are added if the data for these jobs is not fetched within 3 retries of one minute each. It basically gives a job more time to fetch data before skipping it.
  • Loading branch information
skakker authored and akshayrai committed Jan 8, 2018
1 parent 8b46933 commit a384fcc
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 12 deletions.
4 changes: 4 additions & 0 deletions app/com/linkedin/drelephant/ElephantRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,10 @@ public void run() {
if (_analyticJob != null && _analyticJob.retry()) {
logger.error("Add analytic job id [" + _analyticJob.getAppId() + "] into the retry list.");
_analyticJobGenerator.addIntoRetries(_analyticJob);
} else if (_analyticJob != null && _analyticJob.isSecondPhaseRetry()) {
//Putting the job into a second retry queue which fetches jobs after some interval. Some spark jobs may need more time than usual to process, hence the queue.
logger.error("Add analytic job id [" + _analyticJob.getAppId() + "] into the second retry list.");
_analyticJobGenerator.addIntoSecondRetryQueue(_analyticJob);
} else {
if (_analyticJob != null) {
MetricsController.markSkippedJob();
Expand Down
25 changes: 24 additions & 1 deletion app/com/linkedin/drelephant/analysis/AnalyticJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,24 @@ public class AnalyticJob {
private static final Logger logger = Logger.getLogger(AnalyticJob.class);

private static final String UNKNOWN_JOB_TYPE = "Unknown"; // The default job type when the data matches nothing.
private static final int _RETRY_LIMIT = 3; // Number of times a job needs to be tried before dropping
private static final int _RETRY_LIMIT = 3; // Number of times a job needs to be tried before going into second retry queue
private static final int _SECOND_RETRY_LIMIT = 5; // Number of times a job needs to be tried before dropping
private static final String EXCLUDE_JOBTYPE = "exclude_jobtypes_filter"; // excluded Job Types for heuristic


public boolean readyForSecondRetry() {
this._timeLeftToRetry = this._timeLeftToRetry - 1;
return (this._timeLeftToRetry <= 0);
}

public AnalyticJob setTimeToSecondRetry() {
this._timeLeftToRetry = (this._secondRetries) * 5;
return this;
}

private int _timeLeftToRetry;
private int _retries = 0;
private int _secondRetries = 0;
private ApplicationType _type;
private String _appId;
private String _name;
Expand Down Expand Up @@ -315,6 +329,15 @@ public AppResult getAnalysis() throws Exception {
return result;
}

/**
* Indicate this promise should be retried in the second phase.
*
* @return true if should retry, else false
*/
public boolean isSecondPhaseRetry(){
return (_secondRetries++) < _SECOND_RETRY_LIMIT;
}

/**
* Indicate this promise should retry itself again.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,12 @@ public List<AnalyticJob> fetchAnalyticJobs()
* @param job The job to add
*/
public void addIntoRetries(AnalyticJob job);

/**
* Add an AnalyticJob into the second retry list. This queue fetches jobs on greater intervals of time. Those jobs will be provided again via #fetchAnalyticJobs under
* the generator's decision.
*
* @param job The job to add
*/
public void addIntoSecondRetryQueue(AnalyticJob job);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,8 @@
import com.linkedin.drelephant.math.Statistics;
import controllers.MetricsController;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import models.AppResult;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -67,7 +62,9 @@ public class AnalyticJobGeneratorHadoop2 implements AnalyticJobGenerator {
private AuthenticatedURL _authenticatedURL;
private final ObjectMapper _objectMapper = new ObjectMapper();

private final Queue<AnalyticJob> _retryQueue = new ConcurrentLinkedQueue<AnalyticJob>();
private final Queue<AnalyticJob> _firstRetryQueue = new ConcurrentLinkedQueue<AnalyticJob>();

private final ArrayList<AnalyticJob> _secondRetryQueue = new ArrayList<AnalyticJob>();

public void updateResourceManagerAddresses() {
if (Boolean.valueOf(configuration.get(IS_RM_HA_ENABLED))) {
Expand Down Expand Up @@ -160,8 +157,17 @@ public List<AnalyticJob> fetchAnalyticJobs()
appList.addAll(failedApps);

// Append promises from the retry queue at the end of the list
while (!_retryQueue.isEmpty()) {
appList.add(_retryQueue.poll());
while (!_firstRetryQueue.isEmpty()) {
appList.add(_firstRetryQueue.poll());
}

Iterator iteratorSecondRetry = _secondRetryQueue.iterator();
while (iteratorSecondRetry.hasNext()) {
AnalyticJob job = (AnalyticJob) iteratorSecondRetry.next();
if(job.readyForSecondRetry()) {
appList.add(job);
iteratorSecondRetry.remove();
}
}

_lastTime = _currentTime;
Expand All @@ -170,12 +176,20 @@ public List<AnalyticJob> fetchAnalyticJobs()

@Override
public void addIntoRetries(AnalyticJob promise) {
_retryQueue.add(promise);
int retryQueueSize = _retryQueue.size();
_firstRetryQueue.add(promise);
int retryQueueSize = _firstRetryQueue.size();
MetricsController.setRetryQueueSize(retryQueueSize);
logger.info("Retry queue size is " + retryQueueSize);
}

@Override
public void addIntoSecondRetryQueue(AnalyticJob promise) {
_secondRetryQueue.add(promise.setTimeToSecondRetry());
int secondRetryQueueSize = _secondRetryQueue.size();
MetricsController.setSecondRetryQueueSize(secondRetryQueueSize);
logger.info("Second Retry queue size is " + secondRetryQueueSize);
}

/**
* Authenticate and update the token
*/
Expand Down
5 changes: 5 additions & 0 deletions app/controllers/MetricsController.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class MetricsController extends Controller {

private static int _queueSize = -1;
private static int _retryQueueSize = -1;
private static int _secondRetryQueueSize = -1;
private static Meter _skippedJobs;
private static Meter _processedJobs;
private static Histogram _jobProcessingTime;
Expand Down Expand Up @@ -239,4 +240,8 @@ public static Result healthcheck() {
return ok(Json.toJson(HEALTHCHECK_NOT_ENABLED));
}
}

public static void setSecondRetryQueueSize(int secondRetryQueueSize) {
_secondRetryQueueSize = secondRetryQueueSize;
}
}

0 comments on commit a384fcc

Please sign in to comment.