Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Improve the accuracy of model memory control #122

Merged
merged 7 commits into from
Jun 13, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions include/api/CAnomalyJob.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,8 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
public:
using TPersistCompleteFunc =
std::function<void(const CModelSnapshotJsonWriter::SModelSnapshotReport&)>;
using TAnomalyDetectorPtr = model::CAnomalyDetector::TAnomalyDetectorPtr;
using TAnomalyDetectorPtr = std::shared_ptr<model::CAnomalyDetector>;
using TAnomalyDetectorPtrVec = std::vector<TAnomalyDetectorPtr>;
using TAnomalyDetectorPtrVecItr = std::vector<TAnomalyDetectorPtr>::iterator;
using TAnomalyDetectorPtrVecCItr = std::vector<TAnomalyDetectorPtr>::const_iterator;
using TKeyVec = std::vector<model::CSearchKey>;
using TKeyAnomalyDetectorPtrUMap =
boost::unordered_map<model::CSearchKey::TStrKeyPr, TAnomalyDetectorPtr, model::CStrKeyPrHash, model::CStrKeyPrEqual>;
Expand Down Expand Up @@ -359,7 +357,7 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
//! Update configuration
void doForecast(const std::string& controlMessage);

model::CAnomalyDetector::TAnomalyDetectorPtr
TAnomalyDetectorPtr
makeDetector(int identifier,
const model::CAnomalyDetectorModelConfig& modelConfig,
model::CLimits& limits,
Expand Down
2 changes: 1 addition & 1 deletion include/api/CForecastRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class API_EXPORT CForecastRunner final : private core::CNonCopyable {
using TOStreamConcurrentWrapper = core::CConcurrentWrapper<std::ostream>;
using TOStreamConcurrentWrapperPtr = std::shared_ptr<TOStreamConcurrentWrapper>;

using TAnomalyDetectorPtr = model::CAnomalyDetector::TAnomalyDetectorPtr;
using TAnomalyDetectorPtr = std::shared_ptr<model::CAnomalyDetector>;
using TAnomalyDetectorPtrVec = std::vector<TAnomalyDetectorPtr>;

using TForecastModelWrapper = model::CForecastDataSink::SForecastModelWrapper;
Expand Down
12 changes: 3 additions & 9 deletions include/model/CAnomalyDetector.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,9 @@ class MODEL_EXPORT CAnomalyDetector : private core::CNonCopyable {
using TStrVec = std::vector<std::string>;
using TStrCPtrVec = std::vector<const std::string*>;
using TModelPlotDataVec = std::vector<CModelPlotData>;

using TDataGathererPtr = std::shared_ptr<CDataGatherer>;
using TModelFactoryCPtr = std::shared_ptr<const CModelFactory>;
using TModelPtr = std::unique_ptr<CAnomalyDetectorModel>;

//! A shared pointer to an instance of this class
using TAnomalyDetectorPtr = std::shared_ptr<CAnomalyDetector>;

using TOutputModelPlotDataFunc =
std::function<void(const std::string&, const std::string&, const std::string&, const std::string&, const CModelPlotData&)>;
using TStrSet = CAnomalyDetectorModelConfig::TStrSet;
Expand Down Expand Up @@ -334,14 +329,13 @@ class MODEL_EXPORT CAnomalyDetector : private core::CNonCopyable {
//! in the model ensemble class.
void legacyModelsAcceptPersistInserter(core::CStatePersistInserter& inserter) const;

protected:
//! Configurable limits
CLimits& m_Limits;

private:
//! An identifier for the search for which this is detecting anomalies.
int m_DetectorIndex;

//! Configurable limits
CLimits& m_Limits;

//! Configurable behaviour
const CAnomalyDetectorModelConfig& m_ModelConfig;

Expand Down
40 changes: 9 additions & 31 deletions include/model/CDataGatherer.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,36 +108,24 @@ class MODEL_EXPORT CDataGatherer {
using TSizeUInt64Pr = std::pair<std::size_t, uint64_t>;
using TSizeUInt64PrVec = std::vector<TSizeUInt64Pr>;
using TFeatureVec = model_t::TFeatureVec;
using TFeatureVecCItr = TFeatureVec::const_iterator;
using TSizeSizePr = std::pair<std::size_t, std::size_t>;
using TSizeSizePrUInt64Pr = std::pair<TSizeSizePr, uint64_t>;
using TSizeSizePrUInt64PrVec = std::vector<TSizeSizePrUInt64Pr>;
using TSizeSizePrUInt64UMap = boost::unordered_map<TSizeSizePr, uint64_t>;
using TSizeSizePrUInt64UMapItr = TSizeSizePrUInt64UMap::iterator;
using TSizeSizePrUInt64UMapCItr = TSizeSizePrUInt64UMap::const_iterator;
using TSizeSizePrUInt64UMapQueue = CBucketQueue<TSizeSizePrUInt64UMap>;
using TSizeSizePrUInt64UMapQueueItr = TSizeSizePrUInt64UMapQueue::iterator;
using TSizeSizePrUInt64UMapQueueCItr = TSizeSizePrUInt64UMapQueue::const_iterator;
using TSizeSizePrUInt64UMapQueueCRItr = TSizeSizePrUInt64UMapQueue::const_reverse_iterator;
using TSizeSizePrStoredStringPtrPrUInt64UMap = CBucketGatherer::TSizeSizePrStoredStringPtrPrUInt64UMap;
using TSizeSizePrStoredStringPtrPrUInt64UMapCItr =
TSizeSizePrStoredStringPtrPrUInt64UMap::const_iterator;
using TSizeSizePrStoredStringPtrPrUInt64UMapItr =
TSizeSizePrStoredStringPtrPrUInt64UMap::iterator;
using TSizeSizePrStoredStringPtrPrUInt64UMapVec =
std::vector<TSizeSizePrStoredStringPtrPrUInt64UMap>;
using TSizeSizePrStoredStringPtrPrUInt64UMapVecQueue =
CBucketQueue<TSizeSizePrStoredStringPtrPrUInt64UMapVec>;
using TSearchKeyCRef = boost::reference_wrapper<const CSearchKey>;
using TBucketGathererPVec = std::vector<CBucketGatherer*>;
using TBucketGathererPVecItr = TBucketGathererPVec::iterator;
using TBucketGathererPVecCItr = TBucketGathererPVec::const_iterator;
using TBucketGathererPtr = std::unique_ptr<CBucketGatherer>;
using TBucketGathererPtrVec = std::vector<TBucketGathererPtr>;
using TFeatureAnyPr = std::pair<model_t::EFeature, boost::any>;
using TFeatureAnyPrVec = std::vector<TFeatureAnyPr>;
using TMetricCategoryVec = std::vector<model_t::EMetricCategory>;
using TSampleCountsPtr = std::shared_ptr<CSampleCounts>;
using TSampleCountsPtr = std::unique_ptr<CSampleCounts>;
using TTimeVec = std::vector<core_t::TTime>;
using TTimeVecCItr = TTimeVec::const_iterator;

public:
//! The summary count indicating an explicit null record.
Expand All @@ -161,8 +149,6 @@ class MODEL_EXPORT CDataGatherer {
//! \param[in] modelParams The global configuration parameters.
//! \param[in] summaryCountFieldName If \p summaryMode is E_Manual
//! then this is the name of the field holding the summary count.
//! \param[in] partitionFieldName The name of the field which splits
//! the data.
//! \param[in] partitionFieldValue The value of the field which splits
//! the data.
//! \param[in] personFieldName The name of the field which identifies
Expand All @@ -173,8 +159,6 @@ class MODEL_EXPORT CDataGatherer {
//! the metric values.
//! \param[in] influenceFieldNames The field names for which we will
//! compute influences.
//! \param[in] useNull If true the gatherer will process missing
//! person and attribute field values (assuming they are empty).
//! \param[in] key The key of the search for which to gather data.
//! \param[in] features The features of the data to model.
//! \param[in] startTime The start of the time interval for which
Expand All @@ -187,13 +171,11 @@ class MODEL_EXPORT CDataGatherer {
model_t::ESummaryMode summaryMode,
const SModelParams& modelParams,
const std::string& summaryCountFieldName,
const std::string& partitionFieldName,
const std::string& partitionFieldValue,
const std::string& personFieldName,
const std::string& attributeFieldName,
const std::string& valueFieldName,
const TStrVec& influenceFieldNames,
bool useNull,
const CSearchKey& key,
const TFeatureVec& features,
core_t::TTime startTime,
Expand All @@ -204,13 +186,11 @@ class MODEL_EXPORT CDataGatherer {
model_t::ESummaryMode summaryMode,
const SModelParams& modelParams,
const std::string& summaryCountFieldName,
const std::string& partitionFieldName,
const std::string& partitionFieldValue,
const std::string& personFieldName,
const std::string& attributeFieldName,
const std::string& valueFieldName,
const TStrVec& influenceFieldNames,
bool useNull,
const CSearchKey& key,
core::CStateRestoreTraverser& traverser);

Expand All @@ -220,8 +200,9 @@ class MODEL_EXPORT CDataGatherer {
//! redundant except to create a signature that will not be mistaken for
//! a general purpose copy constructor.
CDataGatherer(bool isForPersistence, const CDataGatherer& other);

~CDataGatherer();
CDataGatherer(const CDataGatherer&) = delete;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a cool language feature!

CDataGatherer& operator=(const CDataGatherer&) = delete;
//@}

//! \name Persistence
Expand Down Expand Up @@ -546,7 +527,7 @@ class MODEL_EXPORT CDataGatherer {
void resetSampleCount(std::size_t id);

//! Get the sample counts.
TSampleCountsPtr sampleCounts() const;
const TSampleCountsPtr& sampleCounts() const;
//@}

//! \name Time
Expand Down Expand Up @@ -759,7 +740,7 @@ class MODEL_EXPORT CDataGatherer {

//! The collection of bucket gatherers which contain the bucket-specific
//! metrics and counts.
TBucketGathererPVec m_Gatherers;
TBucketGathererPtrVec m_Gatherers;

//! Indicates whether the data being gathered are already summarized
//! by an external aggregation process.
Expand All @@ -768,15 +749,12 @@ class MODEL_EXPORT CDataGatherer {
//! The global configuration parameters.
TModelParamsCRef m_Params;

//! The partition field name or an empty string if there isn't one.
std::string m_PartitionFieldName;
//! The key of the search for which data is being gathered.
TSearchKeyCRef m_SearchKey;

//! The value of the partition field for this detector.
core::CStoredStringPtr m_PartitionFieldValue;

//! The key of the search for which data is being gathered.
TSearchKeyCRef m_SearchKey;

//! A registry where person names are mapped to unique IDs.
CDynamicStringIdRegistry m_PeopleRegistry;

Expand Down
28 changes: 23 additions & 5 deletions include/model/CResourceMonitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
#include <model/ImportExport.h>
#include <model/ModelTypes.h>

#include <boost/unordered_map.hpp>

#include <functional>
#include <map>

class CResourceMonitorTest;
class CResourceLimitTest;
Expand Down Expand Up @@ -42,8 +43,8 @@ class MODEL_EXPORT CResourceMonitor {
};

public:
using TModelPtrSizePr = std::pair<CAnomalyDetectorModel*, std::size_t>;
using TModelPtrSizeMap = std::map<CAnomalyDetectorModel*, std::size_t>;
using TDetectorPtrSizePr = std::pair<CAnomalyDetector*, std::size_t>;
using TDetectorPtrSizeUMap = boost::unordered_map<CAnomalyDetector*, std::size_t>;
using TMemoryUsageReporterFunc = std::function<void(const CResourceMonitor::SResults&)>;
using TTimeSizeMap = std::map<core_t::TTime, std::size_t>;

Expand Down Expand Up @@ -127,13 +128,21 @@ class MODEL_EXPORT CResourceMonitor {
//! Clears all extra memory
void clearExtraMemory();

//! Decrease the margin on the memory limit.
//!
//! We start off applying a margin to the memory limit because
//! it is difficult to accurately estimate the long term memory
//! usage at this point. This is gradually decreased over time
//! by calling this once per bucket processed.
void decreaseMargin();

private:
//! Updates the memory limit fields and the prune threshold
//! to the given value.
void updateMemoryLimitsAndPruneThreshold(std::size_t limitMBs);

//! Update the given model and recalculate the total usage
void memUsage(CAnomalyDetectorModel* model);
void memUsage(CAnomalyDetector* detector);

//! Determine if we need to send a usage report, based on
//! increased usage, or increased errors
Expand All @@ -143,16 +152,25 @@ class MODEL_EXPORT CResourceMonitor {
//! should be allowed or not
void updateAllowAllocations();

//! Get the high memory limit with margin applied.
std::size_t highLimit() const;

//! Get the low memory limit with margin applied.
std::size_t lowLimit() const;

//! Returns the sum of used memory plus any extra memory
std::size_t totalMemory() const;

private:
//! The registered collection of components
TModelPtrSizeMap m_Models;
TDetectorPtrSizeUMap m_Detectors;

//! Is there enough free memory to allow creating new components
bool m_AllowAllocations;

//! The relative margin to apply to the byte limits.
double m_ByteLimitMargin;

//! The upper limit for memory usage, checked on increasing values
std::size_t m_ByteLimitHigh;

Expand Down
5 changes: 3 additions & 2 deletions lib/api/CAnomalyJob.cc
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ void CAnomalyJob::outputBucketResultsUntil(core_t::TTime time) {
lastBucketEndTime + bucketLength + latency <= time;
lastBucketEndTime += effectiveBucketLength) {
this->outputResults(lastBucketEndTime);
m_Limits.resourceMonitor().decreaseMargin();
m_Limits.resourceMonitor().sendMemoryUsageReportIfSignificantlyChanged(lastBucketEndTime);
m_LastFinalisedBucketEndTime = lastBucketEndTime + effectiveBucketLength;

Expand Down Expand Up @@ -1403,7 +1404,7 @@ CAnomalyJob::detectorForKey(bool isRestoring,
// Check if we need to and are allowed to create a new detector.
if (itr == m_Detectors.end() && resourceMonitor.areAllocationsAllowed()) {
// Create a placeholder for the anomaly detector.
model::CAnomalyDetector::TAnomalyDetectorPtr& detector =
TAnomalyDetectorPtr& detector =
m_Detectors
.emplace(model::CSearchKey::TStrKeyPr(partition, key), TAnomalyDetectorPtr())
.first->second;
Expand Down Expand Up @@ -1450,7 +1451,7 @@ void CAnomalyJob::pruneAllModels() {
}
}

model::CAnomalyDetector::TAnomalyDetectorPtr
CAnomalyJob::TAnomalyDetectorPtr
CAnomalyJob::makeDetector(int identifier,
const model::CAnomalyDetectorModelConfig& modelConfig,
model::CLimits& limits,
Expand Down
6 changes: 3 additions & 3 deletions lib/api/unittest/CAnomalyJobLimitTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ void CAnomalyJobLimitTest::testModelledEntityCountForFixedMemoryLimit() {
LOG_DEBUG(<< "# partition = " << used.s_PartitionFields);
LOG_DEBUG(<< "Memory status = " << used.s_MemoryStatus);
LOG_DEBUG(<< "Memory usage = " << used.s_Usage);
CPPUNIT_ASSERT(used.s_ByFields > 820 && used.s_ByFields < 980);
CPPUNIT_ASSERT(used.s_ByFields > 700 && used.s_ByFields < 860);
CPPUNIT_ASSERT_EQUAL(std::size_t(2), used.s_PartitionFields);
}

Expand Down Expand Up @@ -433,7 +433,7 @@ void CAnomalyJobLimitTest::testModelledEntityCountForFixedMemoryLimit() {
LOG_DEBUG(<< "# partition = " << used.s_PartitionFields);
LOG_DEBUG(<< "Memory status = " << used.s_MemoryStatus);
LOG_DEBUG(<< "Memory usage = " << used.s_Usage);
CPPUNIT_ASSERT(used.s_PartitionFields > 390 && used.s_PartitionFields < 470);
CPPUNIT_ASSERT(used.s_PartitionFields > 430 && used.s_PartitionFields < 510);
CPPUNIT_ASSERT(static_cast<double>(used.s_ByFields) >
0.95 * static_cast<double>(used.s_PartitionFields));
}
Expand Down Expand Up @@ -475,6 +475,6 @@ void CAnomalyJobLimitTest::testModelledEntityCountForFixedMemoryLimit() {
LOG_DEBUG(<< "# over = " << used.s_OverFields);
LOG_DEBUG(<< "Memory status = " << used.s_MemoryStatus);
LOG_DEBUG(<< "Memory usage = " << used.s_Usage);
CPPUNIT_ASSERT(used.s_OverFields > 10000 && used.s_OverFields < 12000);
CPPUNIT_ASSERT(used.s_OverFields > 8500 && used.s_OverFields < 10500);
}
}
2 changes: 1 addition & 1 deletion lib/maths/CTimeSeriesDecompositionDetail.cc
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ std::size_t CTimeSeriesDecompositionDetail::CPeriodicityTest::extraMemoryOnIniti
if (result == 0) {
for (auto i : {E_Short, E_Long}) {
TExpandingWindowPtr window(this->newWindow(i, false));
result += core::CMemory::dynamicSize(window);
result += 0.3 * core::CMemory::dynamicSize(window);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we only compress on persist?

Copy link
Contributor Author

@tveasey tveasey Jun 12, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, no. As of #100, we compress the raw bytes of some of this object that we actually hold in memory.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I recall you saying we'd do that but I missed the fact it's already in. Cool.

}
}
return result;
Expand Down
11 changes: 5 additions & 6 deletions lib/model/CAnomalyDetector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <model/CForecastModelPersist.h>
#include <model/CModelDetailsView.h>
#include <model/CModelPlotData.h>
#include <model/CSampleCounts.h>
#include <model/CSearchKey.h>

#include <boost/bind.hpp>
Expand Down Expand Up @@ -99,7 +100,7 @@ CAnomalyDetector::CAnomalyDetector(int detectorIndex,
const std::string& partitionFieldValue,
core_t::TTime firstTime,
const TModelFactoryCPtr& modelFactory)
: m_Limits(limits), m_DetectorIndex(detectorIndex), m_ModelConfig(modelConfig),
: m_DetectorIndex(detectorIndex), m_Limits(limits), m_ModelConfig(modelConfig),
m_LastBucketEndTime(maths::CIntegerTools::ceil(firstTime, modelConfig.bucketLength())),
m_DataGatherer(makeDataGatherer(modelFactory, m_LastBucketEndTime, partitionFieldValue)),
m_ModelFactory(modelFactory),
Expand All @@ -120,7 +121,7 @@ CAnomalyDetector::CAnomalyDetector(int detectorIndex,
}

CAnomalyDetector::CAnomalyDetector(bool isForPersistence, const CAnomalyDetector& other)
: m_Limits(other.m_Limits), m_DetectorIndex(other.m_DetectorIndex),
: m_DetectorIndex(other.m_DetectorIndex), m_Limits(other.m_Limits),
m_ModelConfig(other.m_ModelConfig),
// Empty result function is fine in this case
// Empty result count function is fine in this case
Expand Down Expand Up @@ -612,14 +613,12 @@ void CAnomalyDetector::showMemoryUsage(std::ostream& stream) const {

void CAnomalyDetector::debugMemoryUsage(core::CMemoryUsage::TMemoryUsagePtr mem) const {
mem->setName("Anomaly Detector Memory Usage");
core::CMemoryDebug::dynamicSize("m_DataGatherer", m_DataGatherer, mem);
core::CMemoryDebug::dynamicSize("m_Model", m_Model, mem);
}

std::size_t CAnomalyDetector::memoryUsage() const {
// We only account for the model in CResourceMonitor,
// so we just include that here.
std::size_t mem = core::CMemory::dynamicSize(m_Model);
return mem;
return core::CMemory::dynamicSize(m_DataGatherer) + core::CMemory::dynamicSize(m_Model);
}

const core_t::TTime& CAnomalyDetector::lastBucketEndTime() const {
Expand Down
2 changes: 1 addition & 1 deletion lib/model/CAnomalyDetectorModel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ CAnomalyDetectorModel::CAnomalyDetectorModel(bool isForPersistence,
: // The copy of m_DataGatherer is a shallow copy. This would be unacceptable
// if we were going to persist the data gatherer from within this class.
// We don't, so that's OK, but the next issue is that another thread will be
// modifying the data gatherer m_DataGatherer points to whilst this object
// modifying the data gatherer m_DataGatherer points too whilst this object
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a typo

Copy link
Contributor Author

@tveasey tveasey Jun 12, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a typo before; the "to[o]" in this context means "as well", which is "too" rather than "to".

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's still kind of a weird sentence but fair enough!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait a sec, I'm sorry you're actually completely right. I'd somehow (repeatedly) misread!

// is being persisted. Therefore, persistence must only call methods on the
// data gatherer that are invariant.
m_Params(other.m_Params), m_DataGatherer(other.m_DataGatherer),
Expand Down
Loading