Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Improve the accuracy of model memory control #122

Merged
merged 7 commits into from
Jun 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ Improve behavior when there are abrupt changes in the seasonal components presen
Explicit change point detection and modelling ({pull}92[#92])
Improve partition analysis memory usage ({pull}97[#97])
Reduce model memory by storing state for periodicity testing in a compressed format ({pull}100[#100])
Improve the accuracy of model memory control ({pull}122[#122])

Forecasting of Machine Learning job time series is now supported for large jobs by temporarily storing
model state on disk ({pull}89[#89])
Expand Down
6 changes: 2 additions & 4 deletions include/api/CAnomalyJob.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,8 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
public:
using TPersistCompleteFunc =
std::function<void(const CModelSnapshotJsonWriter::SModelSnapshotReport&)>;
using TAnomalyDetectorPtr = model::CAnomalyDetector::TAnomalyDetectorPtr;
using TAnomalyDetectorPtr = std::shared_ptr<model::CAnomalyDetector>;
using TAnomalyDetectorPtrVec = std::vector<TAnomalyDetectorPtr>;
using TAnomalyDetectorPtrVecItr = std::vector<TAnomalyDetectorPtr>::iterator;
using TAnomalyDetectorPtrVecCItr = std::vector<TAnomalyDetectorPtr>::const_iterator;
using TKeyVec = std::vector<model::CSearchKey>;
using TKeyAnomalyDetectorPtrUMap =
boost::unordered_map<model::CSearchKey::TStrKeyPr, TAnomalyDetectorPtr, model::CStrKeyPrHash, model::CStrKeyPrEqual>;
Expand Down Expand Up @@ -359,7 +357,7 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
//! Update configuration
void doForecast(const std::string& controlMessage);

model::CAnomalyDetector::TAnomalyDetectorPtr
TAnomalyDetectorPtr
makeDetector(int identifier,
const model::CAnomalyDetectorModelConfig& modelConfig,
model::CLimits& limits,
Expand Down
2 changes: 1 addition & 1 deletion include/api/CForecastRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class API_EXPORT CForecastRunner final : private core::CNonCopyable {
using TOStreamConcurrentWrapper = core::CConcurrentWrapper<std::ostream>;
using TOStreamConcurrentWrapperPtr = std::shared_ptr<TOStreamConcurrentWrapper>;

using TAnomalyDetectorPtr = model::CAnomalyDetector::TAnomalyDetectorPtr;
using TAnomalyDetectorPtr = std::shared_ptr<model::CAnomalyDetector>;
using TAnomalyDetectorPtrVec = std::vector<TAnomalyDetectorPtr>;

using TForecastModelWrapper = model::CForecastDataSink::SForecastModelWrapper;
Expand Down
12 changes: 3 additions & 9 deletions include/model/CAnomalyDetector.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,9 @@ class MODEL_EXPORT CAnomalyDetector : private core::CNonCopyable {
using TStrVec = std::vector<std::string>;
using TStrCPtrVec = std::vector<const std::string*>;
using TModelPlotDataVec = std::vector<CModelPlotData>;

using TDataGathererPtr = std::shared_ptr<CDataGatherer>;
using TModelFactoryCPtr = std::shared_ptr<const CModelFactory>;
using TModelPtr = std::unique_ptr<CAnomalyDetectorModel>;

//! A shared pointer to an instance of this class
using TAnomalyDetectorPtr = std::shared_ptr<CAnomalyDetector>;

using TOutputModelPlotDataFunc =
std::function<void(const std::string&, const std::string&, const std::string&, const std::string&, const CModelPlotData&)>;
using TStrSet = CAnomalyDetectorModelConfig::TStrSet;
Expand Down Expand Up @@ -334,14 +329,13 @@ class MODEL_EXPORT CAnomalyDetector : private core::CNonCopyable {
//! in the model ensemble class.
void legacyModelsAcceptPersistInserter(core::CStatePersistInserter& inserter) const;

protected:
//! Configurable limits
CLimits& m_Limits;

private:
//! An identifier for the search for which this is detecting anomalies.
int m_DetectorIndex;

//! Configurable limits
CLimits& m_Limits;

//! Configurable behaviour
const CAnomalyDetectorModelConfig& m_ModelConfig;

Expand Down
40 changes: 9 additions & 31 deletions include/model/CDataGatherer.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,36 +108,24 @@ class MODEL_EXPORT CDataGatherer {
using TSizeUInt64Pr = std::pair<std::size_t, uint64_t>;
using TSizeUInt64PrVec = std::vector<TSizeUInt64Pr>;
using TFeatureVec = model_t::TFeatureVec;
using TFeatureVecCItr = TFeatureVec::const_iterator;
using TSizeSizePr = std::pair<std::size_t, std::size_t>;
using TSizeSizePrUInt64Pr = std::pair<TSizeSizePr, uint64_t>;
using TSizeSizePrUInt64PrVec = std::vector<TSizeSizePrUInt64Pr>;
using TSizeSizePrUInt64UMap = boost::unordered_map<TSizeSizePr, uint64_t>;
using TSizeSizePrUInt64UMapItr = TSizeSizePrUInt64UMap::iterator;
using TSizeSizePrUInt64UMapCItr = TSizeSizePrUInt64UMap::const_iterator;
using TSizeSizePrUInt64UMapQueue = CBucketQueue<TSizeSizePrUInt64UMap>;
using TSizeSizePrUInt64UMapQueueItr = TSizeSizePrUInt64UMapQueue::iterator;
using TSizeSizePrUInt64UMapQueueCItr = TSizeSizePrUInt64UMapQueue::const_iterator;
using TSizeSizePrUInt64UMapQueueCRItr = TSizeSizePrUInt64UMapQueue::const_reverse_iterator;
using TSizeSizePrStoredStringPtrPrUInt64UMap = CBucketGatherer::TSizeSizePrStoredStringPtrPrUInt64UMap;
using TSizeSizePrStoredStringPtrPrUInt64UMapCItr =
TSizeSizePrStoredStringPtrPrUInt64UMap::const_iterator;
using TSizeSizePrStoredStringPtrPrUInt64UMapItr =
TSizeSizePrStoredStringPtrPrUInt64UMap::iterator;
using TSizeSizePrStoredStringPtrPrUInt64UMapVec =
std::vector<TSizeSizePrStoredStringPtrPrUInt64UMap>;
using TSizeSizePrStoredStringPtrPrUInt64UMapVecQueue =
CBucketQueue<TSizeSizePrStoredStringPtrPrUInt64UMapVec>;
using TSearchKeyCRef = boost::reference_wrapper<const CSearchKey>;
using TBucketGathererPVec = std::vector<CBucketGatherer*>;
using TBucketGathererPVecItr = TBucketGathererPVec::iterator;
using TBucketGathererPVecCItr = TBucketGathererPVec::const_iterator;
using TBucketGathererPtr = std::unique_ptr<CBucketGatherer>;
using TBucketGathererPtrVec = std::vector<TBucketGathererPtr>;
using TFeatureAnyPr = std::pair<model_t::EFeature, boost::any>;
using TFeatureAnyPrVec = std::vector<TFeatureAnyPr>;
using TMetricCategoryVec = std::vector<model_t::EMetricCategory>;
using TSampleCountsPtr = std::shared_ptr<CSampleCounts>;
using TSampleCountsPtr = std::unique_ptr<CSampleCounts>;
using TTimeVec = std::vector<core_t::TTime>;
using TTimeVecCItr = TTimeVec::const_iterator;

public:
//! The summary count indicating an explicit null record.
Expand All @@ -161,8 +149,6 @@ class MODEL_EXPORT CDataGatherer {
//! \param[in] modelParams The global configuration parameters.
//! \param[in] summaryCountFieldName If \p summaryMode is E_Manual
//! then this is the name of the field holding the summary count.
//! \param[in] partitionFieldName The name of the field which splits
//! the data.
//! \param[in] partitionFieldValue The value of the field which splits
//! the data.
//! \param[in] personFieldName The name of the field which identifies
Expand All @@ -173,8 +159,6 @@ class MODEL_EXPORT CDataGatherer {
//! the metric values.
//! \param[in] influenceFieldNames The field names for which we will
//! compute influences.
//! \param[in] useNull If true the gatherer will process missing
//! person and attribute field values (assuming they are empty).
//! \param[in] key The key of the search for which to gatherer data.
//! \param[in] features The features of the data to model.
//! \param[in] startTime The start of the time interval for which
Expand All @@ -187,13 +171,11 @@ class MODEL_EXPORT CDataGatherer {
model_t::ESummaryMode summaryMode,
const SModelParams& modelParams,
const std::string& summaryCountFieldName,
const std::string& partitionFieldName,
const std::string& partitionFieldValue,
const std::string& personFieldName,
const std::string& attributeFieldName,
const std::string& valueFieldName,
const TStrVec& influenceFieldNames,
bool useNull,
const CSearchKey& key,
const TFeatureVec& features,
core_t::TTime startTime,
Expand All @@ -204,13 +186,11 @@ class MODEL_EXPORT CDataGatherer {
model_t::ESummaryMode summaryMode,
const SModelParams& modelParams,
const std::string& summaryCountFieldName,
const std::string& partitionFieldName,
const std::string& partitionFieldValue,
const std::string& personFieldName,
const std::string& attributeFieldName,
const std::string& valueFieldName,
const TStrVec& influenceFieldNames,
bool useNull,
const CSearchKey& key,
core::CStateRestoreTraverser& traverser);

Expand All @@ -220,8 +200,9 @@ class MODEL_EXPORT CDataGatherer {
//! redundant except to create a signature that will not be mistaken for
//! a general purpose copy constructor.
CDataGatherer(bool isForPersistence, const CDataGatherer& other);

~CDataGatherer();
CDataGatherer(const CDataGatherer&) = delete;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a cool language feature!

CDataGatherer& operator=(const CDataGatherer&) = delete;
//@}

//! \name Persistence
Expand Down Expand Up @@ -546,7 +527,7 @@ class MODEL_EXPORT CDataGatherer {
void resetSampleCount(std::size_t id);

//! Get the sample counts.
TSampleCountsPtr sampleCounts() const;
const TSampleCountsPtr& sampleCounts() const;
//@}

//! \name Time
Expand Down Expand Up @@ -759,7 +740,7 @@ class MODEL_EXPORT CDataGatherer {

//! The collection of bucket gatherers which contain the bucket-specific
//! metrics and counts.
TBucketGathererPVec m_Gatherers;
TBucketGathererPtrVec m_Gatherers;

//! Indicates whether the data being gathered are already summarized
//! by an external aggregation process.
Expand All @@ -768,15 +749,12 @@ class MODEL_EXPORT CDataGatherer {
//! The global configuration parameters.
TModelParamsCRef m_Params;

//! The partition field name or an empty string if there isn't one.
std::string m_PartitionFieldName;
//! The key of the search for which data is being gathered.
TSearchKeyCRef m_SearchKey;

//! The value of the partition field for this detector.
core::CStoredStringPtr m_PartitionFieldValue;

//! The key of the search for which data is being gathered.
TSearchKeyCRef m_SearchKey;

//! A registry where person names are mapped to unique IDs.
CDynamicStringIdRegistry m_PeopleRegistry;

Expand Down
5 changes: 1 addition & 4 deletions include/model/CLimits.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,7 @@ class MODEL_EXPORT CLimits {

public:
//! Default constructor
CLimits();

//! Default destructor
~CLimits();
explicit CLimits(double byteLimitMargin = CResourceMonitor::DEFAULT_BYTE_LIMIT_MARGIN);

//! Initialise from a config file. This overwrites current settings
//! with any found in the config file. Settings that are not present
Expand Down
33 changes: 26 additions & 7 deletions include/model/CResourceMonitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
#include <model/ImportExport.h>
#include <model/ModelTypes.h>

#include <boost/unordered_map.hpp>

#include <functional>
#include <map>

class CResourceMonitorTest;
class CResourceLimitTest;
Expand Down Expand Up @@ -42,20 +43,21 @@ class MODEL_EXPORT CResourceMonitor {
};

public:
using TModelPtrSizePr = std::pair<CAnomalyDetectorModel*, std::size_t>;
using TModelPtrSizeMap = std::map<CAnomalyDetectorModel*, std::size_t>;
using TDetectorPtrSizePr = std::pair<CAnomalyDetector*, std::size_t>;
using TDetectorPtrSizeUMap = boost::unordered_map<CAnomalyDetector*, std::size_t>;
using TMemoryUsageReporterFunc = std::function<void(const CResourceMonitor::SResults&)>;
using TTimeSizeMap = std::map<core_t::TTime, std::size_t>;

//! The minimum time between prunes
static const core_t::TTime MINIMUM_PRUNE_FREQUENCY;

//! Default memory limit for resource monitor
static const std::size_t DEFAULT_MEMORY_LIMIT_MB;
//! The initial byte limit margin to use if none is supplied
static const double DEFAULT_BYTE_LIMIT_MARGIN;

public:
//! Default constructor
CResourceMonitor();
explicit CResourceMonitor(double byteLimitMargin = DEFAULT_BYTE_LIMIT_MARGIN);

//! Query the resource monitor to find out if the models are
//! taking up too much memory and further allocations should be banned
Expand Down Expand Up @@ -127,13 +129,21 @@ class MODEL_EXPORT CResourceMonitor {
//! Clears all extra memory
void clearExtraMemory();

//! Decrease the margin on the memory limit.
//!
//! We start off applying a margin to the memory limit because
//! it is difficult to accurately estimate the long term memory
//! usage at this point. This is gradually decreased over time
//! by calling this once per bucket processed.
void decreaseMargin(core_t::TTime elapsedTime);

private:
//! Updates the memory limit fields and the prune threshold
//! to the given value.
void updateMemoryLimitsAndPruneThreshold(std::size_t limitMBs);

//! Update the given model and recalculate the total usage
void memUsage(CAnomalyDetectorModel* model);
void memUsage(CAnomalyDetector* detector);

//! Determine if we need to send a usage report, based on
//! increased usage, or increased errors
Expand All @@ -143,16 +153,25 @@ class MODEL_EXPORT CResourceMonitor {
//! should be allowed or not
void updateAllowAllocations();

//! Get the high memory limit with margin applied.
std::size_t highLimit() const;

//! Get the low memory limit with margin applied.
std::size_t lowLimit() const;

//! Returns the sum of used memory plus any extra memory
std::size_t totalMemory() const;

private:
//! The registered collection of components
TModelPtrSizeMap m_Models;
TDetectorPtrSizeUMap m_Detectors;

//! Is there enough free memory to allow creating new components
bool m_AllowAllocations;

//! The relative margin to apply to the byte limits.
double m_ByteLimitMargin;

//! The upper limit for memory usage, checked on increasing values
std::size_t m_ByteLimitHigh;

Expand Down
5 changes: 3 additions & 2 deletions lib/api/CAnomalyJob.cc
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ void CAnomalyJob::outputBucketResultsUntil(core_t::TTime time) {
lastBucketEndTime + bucketLength + latency <= time;
lastBucketEndTime += effectiveBucketLength) {
this->outputResults(lastBucketEndTime);
m_Limits.resourceMonitor().decreaseMargin(bucketLength);
m_Limits.resourceMonitor().sendMemoryUsageReportIfSignificantlyChanged(lastBucketEndTime);
m_LastFinalisedBucketEndTime = lastBucketEndTime + effectiveBucketLength;

Expand Down Expand Up @@ -1403,7 +1404,7 @@ CAnomalyJob::detectorForKey(bool isRestoring,
// Check if we need to and are allowed to create a new detector.
if (itr == m_Detectors.end() && resourceMonitor.areAllocationsAllowed()) {
// Create a placeholder for the anomaly detector.
model::CAnomalyDetector::TAnomalyDetectorPtr& detector =
TAnomalyDetectorPtr& detector =
m_Detectors
.emplace(model::CSearchKey::TStrKeyPr(partition, key), TAnomalyDetectorPtr())
.first->second;
Expand Down Expand Up @@ -1450,7 +1451,7 @@ void CAnomalyJob::pruneAllModels() {
}
}

model::CAnomalyDetector::TAnomalyDetectorPtr
CAnomalyJob::TAnomalyDetectorPtr
CAnomalyJob::makeDetector(int identifier,
const model::CAnomalyDetectorModelConfig& modelConfig,
model::CLimits& limits,
Expand Down
Loading