[ML] provide tmp storage for forecasting and possibly any ml native jobs #30399

This implementation lazily (on the first forecast request) checks for available
disk space and creates a subfolder for storing data outside of the Lucene
indexes, but as part of the ES data paths.

Details:
 - tmp storage is managed and does not allow allocation if disk space is
   below a threshold (5 GB at the moment)
 - tmp storage is supposed to be managed by the native component, but in
   case this fails, cleanup is provided:
    - on job close
    - on process crash
    - after a node crash, on restart
 - available space is re-checked for every forecast call (the native
   component has to check again before writing)

Note: The first path that has enough space is chosen on job open (closing and
reopening the job triggers a new search).
Hendrik Muhs committed May 18, 2018
1 parent 6bbd1b8 commit 5f16c92
Showing 10 changed files with 406 additions and 26 deletions.
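
Before the per-file diffs, here is a minimal sketch (not part of the commit; the class and method names are hypothetical) of the disk-space gate described in the commit message: a data path qualifies for temporary storage only if its usable space covers the requested size plus the configured minimum free space (xpack.ml.min_disk_space_off_heap, 5 GB by default).

import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;

// Illustrative sketch of the gating rule introduced by this commit; names are hypothetical.
final class TmpStorageGateSketch {

    // A data path is eligible only if it can hold the requested size
    // on top of the configured minimum free space.
    static boolean hasRoom(long usableBytes, ByteSizeValue requested, ByteSizeValue minFree) {
        return usableBytes >= requested.getBytes() + minFree.getBytes();
    }

    public static void main(String[] args) {
        ByteSizeValue forecastLimit = new ByteSizeValue(500, ByteSizeUnit.MB); // per-forecast cap used in the diff below
        ByteSizeValue minFree = new ByteSizeValue(5, ByteSizeUnit.GB);         // default threshold
        long usable = 20L << 30;                                               // pretend 20 GB are usable
        System.out.println(hasRoom(usable, forecastLimit, minFree));           // prints "true"
    }
}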
5 changes: 1 addition & 4 deletions x-pack/docs/en/ml/forecasting.asciidoc
@@ -59,10 +59,7 @@ For more information about any of these functions, see <<ml-functions>>.
* Forecasts run concurrently with real-time {ml} analysis. That is to say, {ml}
analysis does not stop while forecasts are generated. Forecasts can have an
impact on {ml} jobs, however, especially in terms of memory usage. For this
-reason, forecasts run only if the model memory status is acceptable and the
-snapshot models for the forecast do not require more than 20 MB. If these memory
-limits are reached, consider splitting the job into multiple smaller jobs and
-creating forecasts for these.
+reason, forecasts run only if the model memory status is acceptable.
* The job must be open when you create a forecast. Otherwise, an error occurs.
* If there is insufficient data to generate any meaningful predictions, an
error occurs. In general, forecasts that are created early in the learning phase
@@ -291,7 +291,8 @@ public List<Setting<?>> getSettings() {
DataCountsReporter.ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING,
DataCountsReporter.ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING,
AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE,
-AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE));
+AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE,
+AutodetectProcessManager.MIN_DISK_SPACE_OFF_HEAP));
}

public Settings additionalSettings() {
@@ -408,6 +409,9 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
// This object's constructor attaches to the license state, so there's no need to retain another reference to it
new InvalidLicenseEnforcer(settings, getLicenseState(), threadPool, datafeedManager, autodetectProcessManager);

// run node startup tasks
autodetectProcessManager.onNodeStartup();

return Arrays.asList(
mlLifeCycleService,
jobProvider,
@@ -15,6 +15,8 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@@ -28,6 +30,7 @@
import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;

import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.function.Consumer;

@@ -36,6 +39,8 @@
public class TransportForecastJobAction extends TransportJobTaskAction<ForecastJobAction.Request,
ForecastJobAction.Response> {

private static final ByteSizeValue FORECAST_LOCAL_STORAGE_LIMIT = new ByteSizeValue(500, ByteSizeUnit.MB);

private final JobProvider jobProvider;
@Inject
public TransportForecastJobAction(Settings settings, TransportService transportService, ThreadPool threadPool,
@@ -73,6 +78,13 @@ protected void taskOperation(ForecastJobAction.Request request, TransportOpenJob
paramsBuilder.expiresIn(request.getExpiresIn());
}

// tmp storage might be null; we do not log that here because it might not be
// required
Path tmpStorage = processManager.tryGetTmpStorage(task, FORECAST_LOCAL_STORAGE_LIMIT);
if (tmpStorage != null) {
paramsBuilder.tmpStorage(tmpStorage.toString());
}

ForecastParams params = paramsBuilder.build();
processManager.forecastJob(task, params, e -> {
if (e == null) {
@@ -0,0 +1,123 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.ml.job.process;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.Environment;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/**
* Provides storage for native components.
*/
public class NativeStorageProvider {

private static final Logger LOGGER = Loggers.getLogger(NativeStorageProvider.class);


private static final String LOCAL_STORAGE_SUBFOLDER = "ml-local-data";
private static final String LOCAL_STORAGE_TMP_FOLDER = "tmp";

private final Environment environment;

// do not allow any usage below this threshold
private final ByteSizeValue minLocalStorageAvailable;

public NativeStorageProvider(Environment environment, ByteSizeValue minDiskSpaceOffHeap) {
this.environment = environment;
this.minLocalStorageAvailable = minDiskSpaceOffHeap;
}

/**
* Removes any temporary storage leftovers.
*
* Removes all temporary files and folders that might be left behind by an
* unclean node shutdown or broken clients.
*
* Do not call while there are running jobs.
*
* @throws IOException if cleanup fails
*/
public void cleanupLocalTmpStorageInCaseOfUncleanShutdown() throws IOException {
for (Path p : environment.dataFiles()) {
IOUtils.rm(p.resolve(LOCAL_STORAGE_SUBFOLDER).resolve(LOCAL_STORAGE_TMP_FOLDER));
}
}

/**
* Tries to find local storage for storing temporary data.
*
* @param uniqueIdentifier An identifier to be used as the subfolder name
* @param requestedSize The maximum size required
* @return Path for temporary storage if available, null otherwise
*/
public Path tryGetLocalTmpStorage(String uniqueIdentifier, ByteSizeValue requestedSize) {
for (Path path : environment.dataFiles()) {
try {
if (getUsableSpace(path) >= requestedSize.getBytes() + minLocalStorageAvailable.getBytes()) {
Path tmpDirectory = path.resolve(LOCAL_STORAGE_SUBFOLDER).resolve(LOCAL_STORAGE_TMP_FOLDER).resolve(uniqueIdentifier);
Files.createDirectories(tmpDirectory);
return tmpDirectory;
}
} catch (IOException e) {
LOGGER.debug("Failed to obtain information about path [{}]: {}", path, e);
}

}
LOGGER.debug("Failed to find native storage for [{}], returning null", uniqueIdentifier);
return null;
}

public boolean localTmpStorageHasEnoughSpace(Path path, ByteSizeValue requestedSize) {
Path realPath = path.toAbsolutePath();
for (Path p : environment.dataFiles()) {
try {
if (realPath.startsWith(p.resolve(LOCAL_STORAGE_SUBFOLDER).resolve(LOCAL_STORAGE_TMP_FOLDER))) {
return getUsableSpace(p) >= requestedSize.getBytes() + minLocalStorageAvailable.getBytes();
}
} catch (IOException e) {
LOGGER.debug("Failed to obtain information about path [{}]: {}", p, e);
}
}

LOGGER.debug("Not enough space left for path [{}]", path);
return false;
}

/**
* Deletes temporary storage that was previously allocated.
*
* @param path Path to the temporary storage
* @throws IOException if the path cannot be cleaned up
*/
public void cleanupLocalTmpStorage(Path path) throws IOException {
// do not allow callers to break out of the provided tmp storage
Path realPath = path.toAbsolutePath();
for (Path p : environment.dataFiles()) {
if (realPath.startsWith(p.resolve(LOCAL_STORAGE_SUBFOLDER).resolve(LOCAL_STORAGE_TMP_FOLDER))) {
IOUtils.rm(path);
}
}
}

long getUsableSpace(Path path) throws IOException {
long freeSpaceInBytes = Environment.getFileStore(path).getUsableSpace();

/* See: https://bugs.openjdk.java.net/browse/JDK-8162520 */
if (freeSpaceInBytes < 0) {
freeSpaceInBytes = Long.MAX_VALUE;
}
return freeSpaceInBytes;
}
}
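
For orientation, a hypothetical caller of the class above might allocate, re-check, and release temporary storage roughly as follows (sketch only; in this commit the wiring actually goes through AutodetectProcessManager, shown in the next file, and the class and method names below are illustrative):

import java.io.IOException;
import java.nio.file.Path;

import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.xpack.ml.job.process.NativeStorageProvider;

// Hypothetical usage sketch; "environment" would be the node's injected Environment.
final class NativeStorageProviderUsageSketch {

    static Path allocateScratchSpace(Environment environment, String jobId) throws IOException {
        NativeStorageProvider provider =
                new NativeStorageProvider(environment, new ByteSizeValue(5, ByteSizeUnit.GB));

        // Ask for up to 500 MB of scratch space, keyed by the job id.
        ByteSizeValue requested = new ByteSizeValue(500, ByteSizeUnit.MB);
        Path tmp = provider.tryGetLocalTmpStorage(jobId, requested);
        if (tmp == null) {
            return null; // no data path has enough free space
        }

        // Before writing more data later on, re-check that the chosen path still has room;
        // if it does not, release the storage again.
        if (provider.localTmpStorageHasEnoughSpace(tmp, requested) == false) {
            provider.cleanupLocalTmpStorage(tmp);
            return null;
        }
        return tmp;
    }
}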
@@ -7,6 +7,7 @@

import org.elasticsearch.common.xcontent.XContentElasticsearchExtension;
import org.elasticsearch.core.internal.io.IOUtils;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
@@ -15,11 +16,12 @@
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.analysis.AnalysisRegistry;
@@ -47,6 +49,7 @@
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.NativeStorageProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
@@ -59,6 +62,7 @@

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.Date;
@@ -96,6 +100,10 @@ public class AutodetectProcessManager extends AbstractComponent {
public static final Setting<Integer> MAX_OPEN_JOBS_PER_NODE =
Setting.intSetting("xpack.ml.max_open_jobs", MAX_RUNNING_JOBS_PER_NODE, 1, Property.NodeScope);

// Undocumented setting for integration test purposes
public static final Setting<ByteSizeValue> MIN_DISK_SPACE_OFF_HEAP =
Setting.byteSizeSetting("xpack.ml.min_disk_space_off_heap", new ByteSizeValue(5, ByteSizeUnit.GB), Property.NodeScope);
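// Hypothetical example (not part of this commit): an integration test could lower
// this gate through node settings, for instance:
//   Settings nodeSettings = Settings.builder()
//       .put("xpack.ml.min_disk_space_off_heap", "10mb")
//       .build();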

private final Client client;
private final Environment environment;
private final ThreadPool threadPool;
@@ -107,8 +115,12 @@ public class AutodetectProcessManager extends AbstractComponent {
private final JobResultsPersister jobResultsPersister;
private final JobDataCountsPersister jobDataCountsPersister;

private NativeStorageProvider nativeStorageProvider;
private final ConcurrentMap<Long, ProcessContext> processByAllocation = new ConcurrentHashMap<>();

// a map that manages the allocation of temporary space to jobs
private final ConcurrentMap<String, Path> nativeTmpStorage = new ConcurrentHashMap<>();

private final int maxAllowedRunningJobs;

private final NamedXContentRegistry xContentRegistry;
@@ -133,6 +145,15 @@ public AutodetectProcessManager(Environment environment, Settings settings, Clie
this.jobResultsPersister = jobResultsPersister;
this.jobDataCountsPersister = jobDataCountsPersister;
this.auditor = auditor;
this.nativeStorageProvider = new NativeStorageProvider(environment, MIN_DISK_SPACE_OFF_HEAP.get(settings));
}

public void onNodeStartup() {
try {
nativeStorageProvider.cleanupLocalTmpStorageInCaseOfUncleanShutdown();
} catch (Exception e) {
logger.warn("Failed to cleanup native storage from previous invocation", e);
}
}

public synchronized void closeAllJobsOnThisNode(String reason) throws IOException {
@@ -251,17 +272,40 @@ public void flushJob(JobTask jobTask, FlushJobParams params, ActionListener<Flus
});
}

/**
* Request temporary storage to be used for the job
*
* @param jobTask The job task
* @param requestedSize requested size
* @return a Path to local storage or null if storage is not available
*/
public Path tryGetTmpStorage(JobTask jobTask, ByteSizeValue requestedSize) {
String jobId = jobTask.getJobId();
Path path = nativeTmpStorage.get(jobId);
if (path == null) {
path = nativeStorageProvider.tryGetLocalTmpStorage(jobId, requestedSize);
if (path != null) {
nativeTmpStorage.put(jobId, path);
}
} else if (!nativeStorageProvider.localTmpStorageHasEnoughSpace(path, requestedSize)) {
// the previous tmp location ran out of disk space; do not allow further usage
return null;
}
return path;
}

/**
* Do a forecast for the running job.
*
* @param jobTask The job task
* @param params Forecast parameters
*/
public void forecastJob(JobTask jobTask, ForecastParams params, Consumer<Exception> handler) {
-logger.debug("Forecasting job {}", jobTask.getJobId());
+String jobId = jobTask.getJobId();
+logger.debug("Forecasting job {}", jobId);
AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask);
if (communicator == null) {
-String message = String.format(Locale.ROOT, "Cannot forecast because job [%s] is not open", jobTask.getJobId());
+String message = String.format(Locale.ROOT, "Cannot forecast because job [%s] is not open", jobId);
logger.debug(message);
handler.accept(ExceptionsHelper.conflictStatusException(message));
return;
@@ -271,7 +315,7 @@ public void forecastJob(JobTask jobTask, ForecastParams params, Consumer<Excepti
if (e == null) {
handler.accept(null);
} else {
-String msg = String.format(Locale.ROOT, "[%s] exception while forecasting job", jobTask.getJobId());
+String msg = String.format(Locale.ROOT, "[%s] exception while forecasting job", jobId);
logger.error(msg, e);
handler.accept(ExceptionsHelper.serverError(msg, e));
}
@@ -477,6 +521,11 @@ private Runnable onProcessCrash(JobTask jobTask) {
}
}
setJobState(jobTask, JobState.FAILED);
try {
removeTmpStorage(jobTask.getJobId());
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Failed to delete temporary files", jobTask.getJobId()), e);
}
};
}

@@ -535,6 +584,12 @@ public void closeJob(JobTask jobTask, boolean restart, String reason) {
// thread that gets into this method blocks until the first thread has finished closing the job
processContext.unlock();
}
// delete any tmp storage
try {
removeTmpStorage(jobId);
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Failed to delete temporary files", jobId), e);
}
}

int numberOfOpenJobs() {
@@ -613,6 +668,13 @@ public Optional<Tuple<DataCounts, ModelSizeStats>> getStatistics(JobTask jobTask
return Optional.of(new Tuple<>(communicator.getDataCounts(), communicator.getModelSizeStats()));
}

private void removeTmpStorage(String jobId) throws IOException {
Path path = nativeTmpStorage.get(jobId);
if (path != null) {
nativeStorageProvider.cleanupLocalTmpStorage(path);
}
}

ExecutorService createAutodetectExecutorService(ExecutorService executorService) {
AutodetectWorkerExecutorService autoDetectWorkerExecutor = new AutodetectWorkerExecutorService(threadPool.getThreadContext());
executorService.submit(autoDetectWorkerExecutor::start);