[ML] Rate limit established model memory updates (#31768)
There is at most one model size stats document per bucket, but
during lookback a job can churn through many buckets very quickly.
This can lead to many cluster state updates if established model
memory needs to be updated for a given model size stats document.

This change rate limits established model memory updates to one
per job per 5 seconds.  This is done by scheduling the updates 5
seconds in the future, but replacing the value to be written if
another model size stats document is received during the waiting
period.  Updating the values in arrears like this means that the
last value received will be the one associated with the job in the
long term, whereas alternative approaches such as not updating the
value if a new value was close to the old value would not.
droberts195 committed Jul 4, 2018
1 parent a1121e7 commit b7550ab
Showing 2 changed files with 186 additions and 36 deletions.
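To make the behaviour concrete before the diff, here is a minimal, hedged sketch of the trailing-edge rate limiting the commit message describes: never write immediately, schedule a single write a few seconds ahead, and let later values overwrite the pending one. It is illustrative only; the class name DebouncedUpdater, the plain ScheduledExecutorService and the printed "write" are stand-ins I have invented, whereas the real change schedules the update on the ML utility thread pool and performs a cluster state update. Only the pattern is taken from the commit.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration of the rate-limiting pattern; not part of the commit.
public class DebouncedUpdater {

    private static final long DELAY_SECONDS = 5;

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final AtomicLong pendingValue = new AtomicLong();
    private boolean updateScheduled; // only accessed in synchronized methods, like the real flag

    // Called for every new value (e.g. once per model size stats document during lookback).
    public synchronized void onNewValue(long value) {
        pendingValue.set(value);          // later arrivals simply replace the pending value
        if (updateScheduled == false) {   // at most one write is ever in flight per delay period
            updateScheduled = true;
            scheduler.schedule(this::runUpdate, DELAY_SECONDS, TimeUnit.SECONDS);
        }
    }

    private synchronized void runUpdate() {
        updateScheduled = false;
        // In the real change this is where the expensive cluster state update happens.
        System.out.println("writing established model memory: " + pendingValue.get());
    }

    public static void main(String[] args) throws InterruptedException {
        DebouncedUpdater updater = new DebouncedUpdater();
        for (long bucket = 1; bucket <= 100; bucket++) {  // simulate lookback churning through buckets
            updater.onNewValue(bucket * 1024);
        }
        Thread.sleep(6_000);  // a single write fires, carrying the last value received (102400)
        updater.scheduler.shutdown();
    }
}

Because the pending value is overwritten rather than skipped, the value that finally lands is always the most recent one received, which is the "update in arrears" behaviour the commit message argues for.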
@@ -14,6 +14,9 @@
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.xpack.core.ml.MachineLearningField;
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
@@ -30,6 +33,7 @@
import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
import org.elasticsearch.xpack.core.ml.job.results.Influencer;
import org.elasticsearch.xpack.core.ml.job.results.ModelPlot;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
@@ -43,6 +47,7 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -71,6 +76,13 @@ public class AutoDetectResultProcessor {

private static final Logger LOGGER = Loggers.getLogger(AutoDetectResultProcessor.class);

/**
* This is how far behind real-time we'll update the job with the latest established model memory.
* If more updates are received during the delay period then they'll take precedence.
* As a result there will be at most one update of established model memory per delay period.
*/
private static final TimeValue ESTABLISHED_MODEL_MEMORY_UPDATE_DELAY = TimeValue.timeValueSeconds(5);

private final Client client;
private final Auditor auditor;
private final String jobId;
@@ -90,8 +102,10 @@ public class AutoDetectResultProcessor {
* New model size stats are read as the process is running
*/
private volatile ModelSizeStats latestModelSizeStats;
private volatile Date latestDateForEstablishedModelMemoryCalc;
private volatile long latestEstablishedModelMemory;
private volatile boolean haveNewLatestModelSizeStats;
private Future<?> scheduledEstablishedModelMemoryUpdate; // only accessed in synchronized methods

public AutoDetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer, JobResultsPersister persister,
JobProvider jobProvider, ModelSizeStats latestModelSizeStats, boolean restoredSnapshot) {
@@ -148,6 +162,7 @@ public void process(AutodetectProcess process) {
}

LOGGER.info("[{}] {} buckets parsed from autodetect output", jobId, bucketCount);
runEstablishedModelMemoryUpdate(true);
} catch (Exception e) {
failed = true;

@@ -194,15 +209,15 @@ void processResult(Context context, AutodetectResult result) {
// persist after deleting interim results in case the new
// results are also interim
context.bulkResultsPersister.persistBucket(bucket).executeRequest();
latestDateForEstablishedModelMemoryCalc = bucket.getTimestamp();
++bucketCount;

// if we haven't previously set established model memory, consider trying again after
// a reasonable amount of time has elapsed since the last model size stats update
// a reasonable number of buckets have elapsed since the last model size stats update
long minEstablishedTimespanMs = JobProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE * bucket.getBucketSpan() * 1000L;
if (haveNewLatestModelSizeStats && latestEstablishedModelMemory == 0
&& bucket.getTimestamp().getTime() > latestModelSizeStats.getTimestamp().getTime() + minEstablishedTimespanMs) {
persister.commitResultWrites(context.jobId);
updateEstablishedModelMemoryOnJob(bucket.getTimestamp(), latestModelSizeStats);
if (haveNewLatestModelSizeStats && latestEstablishedModelMemory == 0 && latestDateForEstablishedModelMemoryCalc.getTime()
> latestModelSizeStats.getTimestamp().getTime() + minEstablishedTimespanMs) {
scheduleEstablishedModelMemoryUpdate(ESTABLISHED_MODEL_MEMORY_UPDATE_DELAY);
haveNewLatestModelSizeStats = false;
}
}
@@ -293,15 +308,14 @@ private void processModelSizeStats(Context context, ModelSizeStats modelSizeStat
persister.persistModelSizeStats(modelSizeStats);
notifyModelMemoryStatusChange(context, modelSizeStats);
latestModelSizeStats = modelSizeStats;
latestDateForEstablishedModelMemoryCalc = modelSizeStats.getTimestamp();
haveNewLatestModelSizeStats = true;

// This is a crude way to NOT refresh the index and NOT attempt to update established model memory during the first 20 buckets
// because this is when the model size stats are likely to be least stable and lots of updates will be coming through, and
// we'll NEVER consider memory usage to be established during this period
if (restoredSnapshot || bucketCount >= JobProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE) {
// We need to make all results written up to and including these stats available for the established memory calculation
persister.commitResultWrites(context.jobId);
updateEstablishedModelMemoryOnJob(modelSizeStats.getTimestamp(), modelSizeStats);
scheduleEstablishedModelMemoryUpdate(ESTABLISHED_MODEL_MEMORY_UPDATE_DELAY);
}
}

@@ -348,26 +362,91 @@ public void onFailure(Exception e) {
});
}

private void updateEstablishedModelMemoryOnJob(Date latestBucketTimestamp, ModelSizeStats modelSizeStats) {
jobProvider.getEstablishedMemoryUsage(jobId, latestBucketTimestamp, modelSizeStats, establishedModelMemory -> {
JobUpdate update = new JobUpdate.Builder(jobId)
.setEstablishedModelMemory(establishedModelMemory).build();
UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(jobId, update);
updateRequest.setWaitForAck(false);

executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest, new ActionListener<PutJobAction.Response>() {
@Override
public void onResponse(PutJobAction.Response response) {
latestEstablishedModelMemory = establishedModelMemory;
LOGGER.debug("[{}] Updated job with established model memory [{}]", jobId, establishedModelMemory);
}

@Override
public void onFailure(Exception e) {
LOGGER.error("[" + jobId + "] Failed to update job with new established model memory [" + establishedModelMemory + "]",
e);
}
});

/**
* The purpose of this method is to avoid saturating the cluster state update thread
* when a lookback job is churning through buckets very fast and the memory usage of
* the job is changing regularly. The idea is to only update the established model
* memory associated with the job a few seconds after the new value has been received.
* If more updates are received during the delay period then they simply replace the
* value that originally caused the update to be scheduled. This rate limits cluster
* state updates due to established model memory changing to one per job per delay period.
* (In reality updates will only occur this rapidly during lookback. During real-time
* operation the limit of one model size stats document per bucket will mean there is a
* maximum of one cluster state update per job per bucket, and usually the bucket span
* is 5 minutes or more.)
* @param delay The delay before updating established model memory.
*/
synchronized void scheduleEstablishedModelMemoryUpdate(TimeValue delay) {

if (scheduledEstablishedModelMemoryUpdate == null) {
try {
scheduledEstablishedModelMemoryUpdate = client.threadPool().schedule(delay, MachineLearning.UTILITY_THREAD_POOL_NAME,
() -> runEstablishedModelMemoryUpdate(false));
LOGGER.trace("[{}] Scheduled established model memory update to run in [{}]", jobId, delay);
} catch (EsRejectedExecutionException e) {
if (e.isExecutorShutdown()) {
LOGGER.debug("failed to schedule established model memory update; shutting down", e);
} else {
throw e;
}
}
}
}

/**
* This method is called from two places:
* - From the {@link Future} used for delayed updates
* - When shutting down this result processor
* When shutting down the result processor it's only necessary to do anything
* if an update has been scheduled, but we want to do the update immediately.
* Despite cancelling the scheduled update in this case, it's possible that
* it's already started running, in which case this method will get called
* twice in quick succession. But the second call will do nothing, as
* <code>scheduledEstablishedModelMemoryUpdate</code> will have been reset
* to <code>null</code> by the first call.
*/
private synchronized void runEstablishedModelMemoryUpdate(boolean cancelExisting) {

if (scheduledEstablishedModelMemoryUpdate != null) {
if (cancelExisting) {
LOGGER.debug("[{}] Bringing forward previously scheduled established model memory update", jobId);
FutureUtils.cancel(scheduledEstablishedModelMemoryUpdate);
}
scheduledEstablishedModelMemoryUpdate = null;
updateEstablishedModelMemoryOnJob();
}
}

private void updateEstablishedModelMemoryOnJob() {

// Copy these before committing writes, so the calculation is done based on committed documents
Date latestBucketTimestamp = latestDateForEstablishedModelMemoryCalc;
ModelSizeStats modelSizeStatsForCalc = latestModelSizeStats;

// We need to make all results written up to and including these stats available for the established memory calculation
persister.commitResultWrites(jobId);

jobProvider.getEstablishedMemoryUsage(jobId, latestBucketTimestamp, modelSizeStatsForCalc, establishedModelMemory -> {
if (latestEstablishedModelMemory != establishedModelMemory) {
JobUpdate update = new JobUpdate.Builder(jobId).setEstablishedModelMemory(establishedModelMemory).build();
UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(jobId, update);
updateRequest.setWaitForAck(false);

executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest,
new ActionListener<PutJobAction.Response>() {
@Override
public void onResponse(PutJobAction.Response response) {
latestEstablishedModelMemory = establishedModelMemory;
LOGGER.debug("[{}] Updated job with established model memory [{}]", jobId, establishedModelMemory);
}

@Override
public void onFailure(Exception e) {
LOGGER.error("[" + jobId + "] Failed to update job with new established model memory [" +
establishedModelMemory + "]", e);
}
});
}
}, e -> LOGGER.error("[" + jobId + "] Failed to calculate established model memory", e));
}

