Skip to content

Commit

Permalink
[ML] Improve the accuracy of model memory control (elastic#122)
Browse files Browse the repository at this point in the history
  • Loading branch information
tveasey committed Jun 14, 2018
1 parent 5e964e1 commit 52d1dcc
Show file tree
Hide file tree
Showing 35 changed files with 622 additions and 891 deletions.
1 change: 1 addition & 0 deletions docs/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Improve behavior when there are abrupt changes in the seasonal components presen
Explicit change point detection and modelling ({pull}92[#92])
Improve partition analysis memory usage ({pull}97[#97])
Reduce model memory by storing state for periodicity testing in a compressed format ({pull}100[#100])
Improve the accuracy of model memory control ({pull}122[#122])

Forecasting of Machine Learning job time series is now supported for large jobs by temporarily storing
model state on disk ({pull}89[#89])
Expand Down
6 changes: 2 additions & 4 deletions include/api/CAnomalyJob.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,8 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
public:
using TPersistCompleteFunc =
std::function<void(const CModelSnapshotJsonWriter::SModelSnapshotReport&)>;
using TAnomalyDetectorPtr = model::CAnomalyDetector::TAnomalyDetectorPtr;
using TAnomalyDetectorPtr = std::shared_ptr<model::CAnomalyDetector>;
using TAnomalyDetectorPtrVec = std::vector<TAnomalyDetectorPtr>;
using TAnomalyDetectorPtrVecItr = std::vector<TAnomalyDetectorPtr>::iterator;
using TAnomalyDetectorPtrVecCItr = std::vector<TAnomalyDetectorPtr>::const_iterator;
using TKeyVec = std::vector<model::CSearchKey>;
using TKeyAnomalyDetectorPtrUMap =
boost::unordered_map<model::CSearchKey::TStrKeyPr, TAnomalyDetectorPtr, model::CStrKeyPrHash, model::CStrKeyPrEqual>;
Expand Down Expand Up @@ -359,7 +357,7 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
//! Update configuration
void doForecast(const std::string& controlMessage);

model::CAnomalyDetector::TAnomalyDetectorPtr
TAnomalyDetectorPtr
makeDetector(int identifier,
const model::CAnomalyDetectorModelConfig& modelConfig,
model::CLimits& limits,
Expand Down
2 changes: 1 addition & 1 deletion include/api/CForecastRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class API_EXPORT CForecastRunner final : private core::CNonCopyable {
using TOStreamConcurrentWrapper = core::CConcurrentWrapper<std::ostream>;
using TOStreamConcurrentWrapperPtr = std::shared_ptr<TOStreamConcurrentWrapper>;

using TAnomalyDetectorPtr = model::CAnomalyDetector::TAnomalyDetectorPtr;
using TAnomalyDetectorPtr = std::shared_ptr<model::CAnomalyDetector>;
using TAnomalyDetectorPtrVec = std::vector<TAnomalyDetectorPtr>;

using TForecastModelWrapper = model::CForecastDataSink::SForecastModelWrapper;
Expand Down
12 changes: 3 additions & 9 deletions include/model/CAnomalyDetector.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,9 @@ class MODEL_EXPORT CAnomalyDetector : private core::CNonCopyable {
using TStrVec = std::vector<std::string>;
using TStrCPtrVec = std::vector<const std::string*>;
using TModelPlotDataVec = std::vector<CModelPlotData>;

using TDataGathererPtr = std::shared_ptr<CDataGatherer>;
using TModelFactoryCPtr = std::shared_ptr<const CModelFactory>;
using TModelPtr = std::unique_ptr<CAnomalyDetectorModel>;

//! A shared pointer to an instance of this class
using TAnomalyDetectorPtr = std::shared_ptr<CAnomalyDetector>;

using TOutputModelPlotDataFunc =
std::function<void(const std::string&, const std::string&, const std::string&, const std::string&, const CModelPlotData&)>;
using TStrSet = CAnomalyDetectorModelConfig::TStrSet;
Expand Down Expand Up @@ -334,14 +329,13 @@ class MODEL_EXPORT CAnomalyDetector : private core::CNonCopyable {
//! in the model ensemble class.
void legacyModelsAcceptPersistInserter(core::CStatePersistInserter& inserter) const;

protected:
//! Configurable limits
CLimits& m_Limits;

private:
//! An identifier for the search for which this is detecting anomalies.
int m_DetectorIndex;

//! Configurable limits
CLimits& m_Limits;

//! Configurable behaviour
const CAnomalyDetectorModelConfig& m_ModelConfig;

Expand Down
40 changes: 9 additions & 31 deletions include/model/CDataGatherer.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,36 +108,24 @@ class MODEL_EXPORT CDataGatherer {
using TSizeUInt64Pr = std::pair<std::size_t, uint64_t>;
using TSizeUInt64PrVec = std::vector<TSizeUInt64Pr>;
using TFeatureVec = model_t::TFeatureVec;
using TFeatureVecCItr = TFeatureVec::const_iterator;
using TSizeSizePr = std::pair<std::size_t, std::size_t>;
using TSizeSizePrUInt64Pr = std::pair<TSizeSizePr, uint64_t>;
using TSizeSizePrUInt64PrVec = std::vector<TSizeSizePrUInt64Pr>;
using TSizeSizePrUInt64UMap = boost::unordered_map<TSizeSizePr, uint64_t>;
using TSizeSizePrUInt64UMapItr = TSizeSizePrUInt64UMap::iterator;
using TSizeSizePrUInt64UMapCItr = TSizeSizePrUInt64UMap::const_iterator;
using TSizeSizePrUInt64UMapQueue = CBucketQueue<TSizeSizePrUInt64UMap>;
using TSizeSizePrUInt64UMapQueueItr = TSizeSizePrUInt64UMapQueue::iterator;
using TSizeSizePrUInt64UMapQueueCItr = TSizeSizePrUInt64UMapQueue::const_iterator;
using TSizeSizePrUInt64UMapQueueCRItr = TSizeSizePrUInt64UMapQueue::const_reverse_iterator;
using TSizeSizePrStoredStringPtrPrUInt64UMap = CBucketGatherer::TSizeSizePrStoredStringPtrPrUInt64UMap;
using TSizeSizePrStoredStringPtrPrUInt64UMapCItr =
TSizeSizePrStoredStringPtrPrUInt64UMap::const_iterator;
using TSizeSizePrStoredStringPtrPrUInt64UMapItr =
TSizeSizePrStoredStringPtrPrUInt64UMap::iterator;
using TSizeSizePrStoredStringPtrPrUInt64UMapVec =
std::vector<TSizeSizePrStoredStringPtrPrUInt64UMap>;
using TSizeSizePrStoredStringPtrPrUInt64UMapVecQueue =
CBucketQueue<TSizeSizePrStoredStringPtrPrUInt64UMapVec>;
using TSearchKeyCRef = boost::reference_wrapper<const CSearchKey>;
using TBucketGathererPVec = std::vector<CBucketGatherer*>;
using TBucketGathererPVecItr = TBucketGathererPVec::iterator;
using TBucketGathererPVecCItr = TBucketGathererPVec::const_iterator;
using TBucketGathererPtr = std::unique_ptr<CBucketGatherer>;
using TBucketGathererPtrVec = std::vector<TBucketGathererPtr>;
using TFeatureAnyPr = std::pair<model_t::EFeature, boost::any>;
using TFeatureAnyPrVec = std::vector<TFeatureAnyPr>;
using TMetricCategoryVec = std::vector<model_t::EMetricCategory>;
using TSampleCountsPtr = std::shared_ptr<CSampleCounts>;
using TSampleCountsPtr = std::unique_ptr<CSampleCounts>;
using TTimeVec = std::vector<core_t::TTime>;
using TTimeVecCItr = TTimeVec::const_iterator;

public:
//! The summary count indicating an explicit null record.
Expand All @@ -161,8 +149,6 @@ class MODEL_EXPORT CDataGatherer {
//! \param[in] modelParams The global configuration parameters.
//! \param[in] summaryCountFieldName If \p summaryMode is E_Manual
//! then this is the name of the field holding the summary count.
//! \param[in] partitionFieldName The name of the field which splits
//! the data.
//! \param[in] partitionFieldValue The value of the field which splits
//! the data.
//! \param[in] personFieldName The name of the field which identifies
Expand All @@ -173,8 +159,6 @@ class MODEL_EXPORT CDataGatherer {
//! the metric values.
//! \param[in] influenceFieldNames The field names for which we will
//! compute influences.
//! \param[in] useNull If true the gatherer will process missing
//! person and attribute field values (assuming they are empty).
//! \param[in] key The key of the search for which to gatherer data.
//! \param[in] features The features of the data to model.
//! \param[in] startTime The start of the time interval for which
Expand All @@ -187,13 +171,11 @@ class MODEL_EXPORT CDataGatherer {
model_t::ESummaryMode summaryMode,
const SModelParams& modelParams,
const std::string& summaryCountFieldName,
const std::string& partitionFieldName,
const std::string& partitionFieldValue,
const std::string& personFieldName,
const std::string& attributeFieldName,
const std::string& valueFieldName,
const TStrVec& influenceFieldNames,
bool useNull,
const CSearchKey& key,
const TFeatureVec& features,
core_t::TTime startTime,
Expand All @@ -204,13 +186,11 @@ class MODEL_EXPORT CDataGatherer {
model_t::ESummaryMode summaryMode,
const SModelParams& modelParams,
const std::string& summaryCountFieldName,
const std::string& partitionFieldName,
const std::string& partitionFieldValue,
const std::string& personFieldName,
const std::string& attributeFieldName,
const std::string& valueFieldName,
const TStrVec& influenceFieldNames,
bool useNull,
const CSearchKey& key,
core::CStateRestoreTraverser& traverser);

Expand All @@ -220,8 +200,9 @@ class MODEL_EXPORT CDataGatherer {
//! redundant except to create a signature that will not be mistaken for
//! a general purpose copy constructor.
CDataGatherer(bool isForPersistence, const CDataGatherer& other);

~CDataGatherer();
CDataGatherer(const CDataGatherer&) = delete;
CDataGatherer& operator=(const CDataGatherer&) = delete;
//@}

//! \name Persistence
Expand Down Expand Up @@ -546,7 +527,7 @@ class MODEL_EXPORT CDataGatherer {
void resetSampleCount(std::size_t id);

//! Get the sample counts.
TSampleCountsPtr sampleCounts() const;
const TSampleCountsPtr& sampleCounts() const;
//@}

//! \name Time
Expand Down Expand Up @@ -759,7 +740,7 @@ class MODEL_EXPORT CDataGatherer {

//! The collection of bucket gatherers which contain the bucket-specific
//! metrics and counts.
TBucketGathererPVec m_Gatherers;
TBucketGathererPtrVec m_Gatherers;

//! Indicates whether the data being gathered are already summarized
//! by an external aggregation process.
Expand All @@ -768,15 +749,12 @@ class MODEL_EXPORT CDataGatherer {
//! The global configuration parameters.
TModelParamsCRef m_Params;

//! The partition field name or an empty string if there isn't one.
std::string m_PartitionFieldName;
//! The key of the search for which data is being gathered.
TSearchKeyCRef m_SearchKey;

//! The value of the partition field for this detector.
core::CStoredStringPtr m_PartitionFieldValue;

//! The key of the search for which data is being gathered.
TSearchKeyCRef m_SearchKey;

//! A registry where person names are mapped to unique IDs.
CDynamicStringIdRegistry m_PeopleRegistry;

Expand Down
5 changes: 1 addition & 4 deletions include/model/CLimits.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,7 @@ class MODEL_EXPORT CLimits {

public:
//! Default constructor
CLimits();

//! Default destructor
~CLimits();
explicit CLimits(double byteLimitMargin = CResourceMonitor::DEFAULT_BYTE_LIMIT_MARGIN);

//! Initialise from a config file. This overwrites current settings
//! with any found in the config file. Settings that are not present
Expand Down
33 changes: 26 additions & 7 deletions include/model/CResourceMonitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
#include <model/ImportExport.h>
#include <model/ModelTypes.h>

#include <boost/unordered_map.hpp>

#include <functional>
#include <map>

class CResourceMonitorTest;
class CResourceLimitTest;
Expand Down Expand Up @@ -42,20 +43,21 @@ class MODEL_EXPORT CResourceMonitor {
};

public:
using TModelPtrSizePr = std::pair<CAnomalyDetectorModel*, std::size_t>;
using TModelPtrSizeMap = std::map<CAnomalyDetectorModel*, std::size_t>;
using TDetectorPtrSizePr = std::pair<CAnomalyDetector*, std::size_t>;
using TDetectorPtrSizeUMap = boost::unordered_map<CAnomalyDetector*, std::size_t>;
using TMemoryUsageReporterFunc = std::function<void(const CResourceMonitor::SResults&)>;
using TTimeSizeMap = std::map<core_t::TTime, std::size_t>;

//! The minimum time between prunes
static const core_t::TTime MINIMUM_PRUNE_FREQUENCY;

//! Default memory limit for resource monitor
static const std::size_t DEFAULT_MEMORY_LIMIT_MB;
//! The initial byte limit margin to use if none is supplied
static const double DEFAULT_BYTE_LIMIT_MARGIN;

public:
//! Default constructor
CResourceMonitor();
explicit CResourceMonitor(double byteLimitMargin = DEFAULT_BYTE_LIMIT_MARGIN);

//! Query the resource monitor to find out if the models are
//! taking up too much memory and further allocations should be banned
Expand Down Expand Up @@ -127,13 +129,21 @@ class MODEL_EXPORT CResourceMonitor {
//! Clears all extra memory
void clearExtraMemory();

//! Decrease the margin on the memory limit.
//!
//! We start off applying a margin to the memory limit because
//! it is difficult to accurately estimate the long term memory
//! usage at this point. This is gradually decreased over time
//! by calling this pnce per bucket processed.
void decreaseMargin(core_t::TTime elapsedTime);

private:
//! Updates the memory limit fields and the prune threshold
//! to the given value.
void updateMemoryLimitsAndPruneThreshold(std::size_t limitMBs);

//! Update the given model and recalculate the total usage
void memUsage(CAnomalyDetectorModel* model);
void memUsage(CAnomalyDetector* detector);

//! Determine if we need to send a usage report, based on
//! increased usage, or increased errors
Expand All @@ -143,16 +153,25 @@ class MODEL_EXPORT CResourceMonitor {
//! shoule be allowed or not
void updateAllowAllocations();

//! Get the high memory limit with margin applied.
std::size_t highLimit() const;

//! Get the low memory limit with margin applied.
std::size_t lowLimit() const;

//! Returns the sum of used memory plus any extra memory
std::size_t totalMemory() const;

private:
//! The registered collection of components
TModelPtrSizeMap m_Models;
TDetectorPtrSizeUMap m_Detectors;

//! Is there enough free memory to allow creating new components
bool m_AllowAllocations;

//! The relative margin to apply to the byte limits.
double m_ByteLimitMargin;

//! The upper limit for memory usage, checked on increasing values
std::size_t m_ByteLimitHigh;

Expand Down
5 changes: 3 additions & 2 deletions lib/api/CAnomalyJob.cc
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,7 @@ void CAnomalyJob::outputBucketResultsUntil(core_t::TTime time) {
lastBucketEndTime + bucketLength + latency <= time;
lastBucketEndTime += effectiveBucketLength) {
this->outputResults(lastBucketEndTime);
m_Limits.resourceMonitor().decreaseMargin(bucketLength);
m_Limits.resourceMonitor().sendMemoryUsageReportIfSignificantlyChanged(lastBucketEndTime);
m_LastFinalisedBucketEndTime = lastBucketEndTime + effectiveBucketLength;

Expand Down Expand Up @@ -1399,7 +1400,7 @@ CAnomalyJob::detectorForKey(bool isRestoring,
// Check if we need to and are allowed to create a new detector.
if (itr == m_Detectors.end() && resourceMonitor.areAllocationsAllowed()) {
// Create an placeholder for the anomaly detector.
model::CAnomalyDetector::TAnomalyDetectorPtr& detector =
TAnomalyDetectorPtr& detector =
m_Detectors
.emplace(model::CSearchKey::TStrKeyPr(partition, key), TAnomalyDetectorPtr())
.first->second;
Expand Down Expand Up @@ -1446,7 +1447,7 @@ void CAnomalyJob::pruneAllModels() {
}
}

model::CAnomalyDetector::TAnomalyDetectorPtr
CAnomalyJob::TAnomalyDetectorPtr
CAnomalyJob::makeDetector(int identifier,
const model::CAnomalyDetectorModelConfig& modelConfig,
model::CLimits& limits,
Expand Down
Loading

0 comments on commit 52d1dcc

Please sign in to comment.