[ML] Rate limit established model memory updates #31768

Merged
@@ -12,6 +12,7 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.env.Environment;
@@ -176,7 +177,7 @@ public void close(boolean restart, String reason) {
// In this case the original exception is spurious and highly misleading
throw ExceptionsHelper.conflictStatusException("Close job interrupted by kill request");
} else {
throw new ElasticsearchException(e);
throw FutureUtils.rethrowExecutionException(e);
}
} finally {
destroyCategorizationAnalyzer();
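
For context on this first change: wrapping the whole ExecutionException in a new ElasticsearchException hides the underlying failure, whereas rethrowing via a helper surfaces the original cause. A minimal sketch of that unwrapping idea, assuming only the general intent of FutureUtils.rethrowExecutionException rather than its exact behaviour (names below are hypothetical):

```java
import java.util.concurrent.ExecutionException;

// Hypothetical illustration only, not the Elasticsearch FutureUtils implementation:
// surface the cause of an ExecutionException instead of wrapping the wrapper.
final class RethrowSketch {
    static RuntimeException rethrowExecutionException(ExecutionException e) {
        Throwable cause = e.getCause();
        if (cause instanceof RuntimeException) {
            return (RuntimeException) cause;   // propagate the real failure unchanged
        }
        return new RuntimeException(cause);    // checked causes still need a wrapper
    }
}
```

A caller would write `throw rethrowExecutionException(e);`, so the compiler sees a throw on every path while the original exception (and any status it carries) is preserved.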
@@ -14,6 +14,9 @@
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.xpack.core.ml.MachineLearningField;
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
@@ -30,6 +33,7 @@
import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
import org.elasticsearch.xpack.core.ml.job.results.Influencer;
import org.elasticsearch.xpack.core.ml.job.results.ModelPlot;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
@@ -43,6 +47,7 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -71,6 +76,13 @@ public class AutoDetectResultProcessor {

private static final Logger LOGGER = Loggers.getLogger(AutoDetectResultProcessor.class);

/**
* This is how far behind real-time we'll update the job with the latest established model memory.
* If more updates are received during the delay period then they'll take precedence.
* As a result there will be at most one update of established model memory per delay period.
*/
private static final TimeValue ESTABLISHED_MODEL_MEMORY_UPDATE_DELAY = TimeValue.timeValueSeconds(5);

private final Client client;
private final Auditor auditor;
private final String jobId;
@@ -90,8 +102,10 @@ public class AutoDetectResultProcessor {
* New model size stats are read as the process is running
*/
private volatile ModelSizeStats latestModelSizeStats;
private volatile Date latestDateForEstablishedModelMemoryCalc;
private volatile long latestEstablishedModelMemory;
private volatile boolean haveNewLatestModelSizeStats;
private Future<?> scheduledEstablishedModelMemoryUpdate; // only accessed in synchronized methods

public AutoDetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer, JobResultsPersister persister,
JobProvider jobProvider, ModelSizeStats latestModelSizeStats, boolean restoredSnapshot) {
@@ -148,6 +162,7 @@ public void process(AutodetectProcess process) {
}

LOGGER.info("[{}] {} buckets parsed from autodetect output", jobId, bucketCount);
runEstablishedModelMemoryUpdate(true);
} catch (Exception e) {
failed = true;

@@ -194,15 +209,15 @@ void processResult(Context context, AutodetectResult result) {
// persist after deleting interim results in case the new
// results are also interim
context.bulkResultsPersister.persistBucket(bucket).executeRequest();
latestDateForEstablishedModelMemoryCalc = bucket.getTimestamp();
++bucketCount;

// if we haven't previously set established model memory, consider trying again after
// a reasonable amount of time has elapsed since the last model size stats update
// a reasonable number of buckets have elapsed since the last model size stats update
long minEstablishedTimespanMs = JobProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE * bucket.getBucketSpan() * 1000L;
if (haveNewLatestModelSizeStats && latestEstablishedModelMemory == 0
&& bucket.getTimestamp().getTime() > latestModelSizeStats.getTimestamp().getTime() + minEstablishedTimespanMs) {
persister.commitResultWrites(context.jobId);
updateEstablishedModelMemoryOnJob(bucket.getTimestamp(), latestModelSizeStats);
if (haveNewLatestModelSizeStats && latestEstablishedModelMemory == 0 && latestDateForEstablishedModelMemoryCalc.getTime()
> latestModelSizeStats.getTimestamp().getTime() + minEstablishedTimespanMs) {
scheduleEstablishedModelMemoryUpdate(ESTABLISHED_MODEL_MEMORY_UPDATE_DELAY);
haveNewLatestModelSizeStats = false;
}
}
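
As a rough illustration of the guard above, the first established-memory calculation is only attempted once results extend well past the latest model size stats. The arithmetic, assuming the 20-bucket requirement mentioned later in this diff and a 5-minute bucket span (both values are assumptions for the example):

```java
// Illustrative arithmetic only, not code from this PR.
public class EstablishedMemoryThresholdExample {
    public static void main(String[] args) {
        long bucketsForEstablishedMemory = 20L;  // assumed JobProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE
        long bucketSpanSeconds = 300L;           // assumed 5-minute bucket span
        long minEstablishedTimespanMs = bucketsForEstablishedMemory * bucketSpanSeconds * 1000L;
        // 20 * 300 * 1000 = 6,000,000 ms: the update is first scheduled once the latest bucket
        // timestamp is more than 100 minutes past the timestamp of the latest model size stats.
        System.out.println(minEstablishedTimespanMs + " ms");
    }
}
```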
@@ -293,15 +308,14 @@ private void processModelSizeStats(Context context, ModelSizeStats modelSizeStat
persister.persistModelSizeStats(modelSizeStats);
notifyModelMemoryStatusChange(context, modelSizeStats);
latestModelSizeStats = modelSizeStats;
latestDateForEstablishedModelMemoryCalc = modelSizeStats.getTimestamp();
haveNewLatestModelSizeStats = true;

// This is a crude way to NOT refresh the index and NOT attempt to update established model memory during the first 20 buckets
// because this is when the model size stats are likely to be least stable and lots of updates will be coming through, and
// we'll NEVER consider memory usage to be established during this period
if (restoredSnapshot || bucketCount >= JobProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE) {
// We need to make all results written up to and including these stats available for the established memory calculation
persister.commitResultWrites(context.jobId);
updateEstablishedModelMemoryOnJob(modelSizeStats.getTimestamp(), modelSizeStats);
scheduleEstablishedModelMemoryUpdate(ESTABLISHED_MODEL_MEMORY_UPDATE_DELAY);
}
}

@@ -351,26 +365,91 @@ public void onFailure(Exception e) {
});
}

private void updateEstablishedModelMemoryOnJob(Date latestBucketTimestamp, ModelSizeStats modelSizeStats) {
jobProvider.getEstablishedMemoryUsage(jobId, latestBucketTimestamp, modelSizeStats, establishedModelMemory -> {
JobUpdate update = new JobUpdate.Builder(jobId)
.setEstablishedModelMemory(establishedModelMemory).build();
UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(jobId, update);
updateRequest.setWaitForAck(false);

executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest, new ActionListener<PutJobAction.Response>() {
@Override
public void onResponse(PutJobAction.Response response) {
latestEstablishedModelMemory = establishedModelMemory;
LOGGER.debug("[{}] Updated job with established model memory [{}]", jobId, establishedModelMemory);
}
/**
* The purpose of this method is to avoid saturating the cluster state update thread
* when a lookback job is churning through buckets very fast and the memory usage of
* the job is changing regularly. The idea is to only update the established model
* memory associated with the job a few seconds after the new value has been received.
* If more updates are received during the delay period then they simply replace the
* value that originally caused the update to be scheduled. This rate limits cluster
* state updates due to established model memory changing to one per job per delay period.
* (In reality updates will only occur this rapidly during lookback. During real-time
* operation the limit of one model size stats document per bucket will mean there is a
* maximum of one cluster state update per job per bucket, and usually the bucket span
* is 5 minutes or more.)
* @param delay The delay before updating established model memory.
*/
synchronized void scheduleEstablishedModelMemoryUpdate(TimeValue delay) {

@Override
public void onFailure(Exception e) {
LOGGER.error("[" + jobId + "] Failed to update job with new established model memory [" + establishedModelMemory + "]",
e);
if (scheduledEstablishedModelMemoryUpdate == null) {
try {
scheduledEstablishedModelMemoryUpdate = client.threadPool().schedule(delay, MachineLearning.UTILITY_THREAD_POOL_NAME,
() -> runEstablishedModelMemoryUpdate(false));
LOGGER.trace("[{}] Scheduled established model memory update to run in [{}]", jobId, delay);
} catch (EsRejectedExecutionException e) {
if (e.isExecutorShutdown()) {
LOGGER.debug("failed to schedule established model memory update; shutting down", e);
} else {
throw e;
}
});
}
}
}

/**
* This method is called from two places:
* - From the {@link Future} used for delayed updates
* - When shutting down this result processor
* When shutting down the result processor it's only necessary to do anything
* if an update has been scheduled, but we want to do the update immediately.
* Despite cancelling the scheduled update in this case, it's possible that
* it's already started running, in which case this method will get called
* twice in quick succession. But the second call will do nothing, as
* <code>scheduledEstablishedModelMemoryUpdate</code> will have been reset
* to <code>null</code> by the first call.
*/
private synchronized void runEstablishedModelMemoryUpdate(boolean cancelExisting) {

if (scheduledEstablishedModelMemoryUpdate != null) {
if (cancelExisting) {
LOGGER.debug("[{}] Bringing forward previously scheduled established model memory update", jobId);
FutureUtils.cancel(scheduledEstablishedModelMemoryUpdate);
}
scheduledEstablishedModelMemoryUpdate = null;
updateEstablishedModelMemoryOnJob();
}
}

private void updateEstablishedModelMemoryOnJob() {

// Copy these before committing writes, so the calculation is done based on committed documents
Date latestBucketTimestamp = latestDateForEstablishedModelMemoryCalc;
ModelSizeStats modelSizeStatsForCalc = latestModelSizeStats;

// We need to make all results written up to and including these stats available for the established memory calculation
persister.commitResultWrites(jobId);

jobProvider.getEstablishedMemoryUsage(jobId, latestBucketTimestamp, modelSizeStatsForCalc, establishedModelMemory -> {
if (latestEstablishedModelMemory != establishedModelMemory) {
JobUpdate update = new JobUpdate.Builder(jobId).setEstablishedModelMemory(establishedModelMemory).build();
UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(jobId, update);
updateRequest.setWaitForAck(false);

executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest,
new ActionListener<PutJobAction.Response>() {
@Override
public void onResponse(PutJobAction.Response response) {
latestEstablishedModelMemory = establishedModelMemory;
LOGGER.debug("[{}] Updated job with established model memory [{}]", jobId, establishedModelMemory);
}

@Override
public void onFailure(Exception e) {
LOGGER.error("[" + jobId + "] Failed to update job with new established model memory [" +
establishedModelMemory + "]", e);
}
});
}
}, e -> LOGGER.error("[" + jobId + "] Failed to calculate established model memory", e));
}

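
Taken together, the new scheduleEstablishedModelMemoryUpdate / runEstablishedModelMemoryUpdate pair implement a simple debounce: at most one update is pending at a time, later values just overwrite the volatile state the pending task will read, and shutdown brings any pending update forward. A minimal standalone sketch of that pattern, using a plain ScheduledExecutorService and hypothetical names rather than the Elasticsearch thread pool:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified illustration of the rate-limiting pattern in this PR.
// Not the Elasticsearch implementation.
class DebouncedUpdater {

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private volatile long latestValue;            // newer values simply overwrite older ones
    private ScheduledFuture<?> scheduledUpdate;   // only accessed in synchronized methods

    /** Record a new value and make sure an update is scheduled, but never more than one. */
    synchronized void onNewValue(long value, long delayMillis) {
        latestValue = value;
        if (scheduledUpdate == null) {
            scheduledUpdate = scheduler.schedule(() -> runUpdate(false), delayMillis, TimeUnit.MILLISECONDS);
        }
    }

    /** On shutdown, bring any pending update forward and run it immediately. */
    synchronized void shutdown() {
        runUpdate(true);
        scheduler.shutdown();
    }

    private synchronized void runUpdate(boolean cancelExisting) {
        if (scheduledUpdate != null) {
            if (cancelExisting) {
                scheduledUpdate.cancel(false);
            }
            scheduledUpdate = null;
            publish(latestValue);                 // at most one update per delay period
        }
    }

    private void publish(long value) {
        System.out.println("updating with " + value);   // stand-in for the cluster state update
    }
}
```

The cancel-then-null-then-run sequence mirrors the comment in the PR: even if the scheduled task has already started, the later of the two invocations finds the field reset to null and does nothing.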