From 4e1a83b8ccbb5eda7769225a01c95ba1cab963d5 Mon Sep 17 00:00:00 2001 From: Tom Veasey Date: Fri, 25 May 2018 11:17:41 +0100 Subject: [PATCH] [ML] Store expanding window bucket values in a compressed format (#100) --- docs/CHANGELOG.asciidoc | 1 + include/core/CCompressUtils.h | 92 ------- include/core/CStringSimilarityTester.h | 4 +- include/core/CompressUtils.h | 218 +++++++++++++++++ include/maths/CExpandingWindow.h | 70 +++++- .../maths/CTimeSeriesDecompositionDetail.h | 4 +- lib/api/unittest/CAnomalyJobLimitTest.cc | 8 +- lib/config/CDataCountStatistics.cc | 2 +- lib/core/CCompressUtils.cc | 134 ----------- lib/core/CStringSimilarityTester.cc | 10 +- lib/core/CompressUtils.cc | 164 +++++++++++++ lib/core/Makefile | 2 +- lib/core/unittest/CCompressUtilsTest.cc | 111 +++++++-- lib/core/unittest/CCompressUtilsTest.h | 2 + lib/maths/CExpandingWindow.cc | 157 +++++++++--- lib/maths/CTimeSeriesDecompositionDetail.cc | 10 +- .../unittest/CDecayRateControllerTest.cc | 4 +- lib/maths/unittest/CExpandingWindowTest.cc | 227 +++++++++++++++--- lib/maths/unittest/CExpandingWindowTest.h | 2 + lib/maths/unittest/Main.cc | 2 +- lib/model/CEventRateBucketGatherer.cc | 8 +- .../CEventRatePopulationDataGathererTest.cc | 8 +- 22 files changed, 893 insertions(+), 347 deletions(-) delete mode 100644 include/core/CCompressUtils.h create mode 100644 include/core/CompressUtils.h delete mode 100644 lib/core/CCompressUtils.cc create mode 100644 lib/core/CompressUtils.cc diff --git a/docs/CHANGELOG.asciidoc b/docs/CHANGELOG.asciidoc index 375c1015f9..0b474d2fe2 100644 --- a/docs/CHANGELOG.asciidoc +++ b/docs/CHANGELOG.asciidoc @@ -32,6 +32,7 @@ Improve robustness w.r.t. outliers of detection and initialisation of seasonal c Improve behavior when there are abrupt changes in the seasonal components present in a time series ({pull}91[#91]) Explicit change point detection and modelling ({pull}92[#92]) Improve partition analysis memory usage ({pull}97[#97]) +Reduce model memory by storing state for periodicity testing in a compressed format ({pull}100[#100]) Forecasting of Machine Learning job time series is now supported for large jobs by temporarily storing model state on disk ({pull}89[#89]) diff --git a/include/core/CCompressUtils.h b/include/core/CCompressUtils.h deleted file mode 100644 index e64a9355d1..0000000000 --- a/include/core/CCompressUtils.h +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -#ifndef INCLUDED_ml_core_CCompressUtils_h -#define INCLUDED_ml_core_CCompressUtils_h - -#include -#include - -#include - -#include -#include - -namespace ml { -namespace core { - -//! \brief -//! Shrink wrap zlib calls. -//! -//! DESCRIPTION:\n -//! Shrink wrap zlib calls. -//! -//! IMPLEMENTATION DECISIONS:\n -//! Data can be added incrementally and this 'finished' to -//! complete compression. -//! -//! This object retains in memory the entire compressed state -//! so it not good for file read/write. -//! -//! A single Z stream is used for the lifetime of the object, -//! so each object can only work on one task at a time. In -//! a multi-threaded application it would be best to create -//! one object for each thread. -//! -class CORE_EXPORT CCompressUtils : private CNonCopyable { -public: - //! 
The output type - using TByteVec = std::vector; - -public: - explicit CCompressUtils(bool lengthOnly, int level = Z_DEFAULT_COMPRESSION); - ~CCompressUtils(); - - //! Add a string. Multiple calls to this function without finishing the - //! compression are equivalent to compressing the concatenation of the - //! strings passed in the order they are passed. - bool addString(const std::string& input); - - //! Get compressed representation. This will fail if the lengthOnly - //! constructor argument was set to true. - //! - //! \note The compressed representation is a byte array NOT a string, - //! and hence not printable. - //! - //! If finish==false then retrieve partial compressed state. - bool compressedData(bool finish, TByteVec& result); - - //! Get compressed string length. - //! - //! If finish==false then retrieve partial compressed length. - bool compressedLength(bool finish, size_t& length); - - //! Reset the compressor. This will happen automatically when adding a - //! new string after having finished the previous compression, but - //! sometimes, for example when recovering from an error, it may be - //! desirable to explicitly reset the compressor state. - void reset(); - -private: - bool doCompress(bool finish, const std::string& input); - -private: - enum EState { E_Unused, E_Compressing, E_Finished }; - - EState m_State; - - //! Is this object only fit for getting compressed lengths? - bool m_LengthOnly; - - //! The output buffer when the compressed result is being stored - TByteVec m_FullResult; - - //! The zlib data structure. - z_stream m_ZlibStrm; -}; -} -} - -#endif // INCLUDED_ml_core_CCompressUtils_h diff --git a/include/core/CStringSimilarityTester.h b/include/core/CStringSimilarityTester.h index b8c5404b94..851d61cc78 100644 --- a/include/core/CStringSimilarityTester.h +++ b/include/core/CStringSimilarityTester.h @@ -6,9 +6,9 @@ #ifndef INCLUDED_ml_core_CStringSimilarityTester_h #define INCLUDED_ml_core_CStringSimilarityTester_h -#include #include #include +#include #include #include @@ -440,7 +440,7 @@ class CORE_EXPORT CStringSimilarityTester : private CNonCopyable { static const int MINUS_INFINITE_INT; //! Used by the compression-based similarity measures - mutable CCompressUtils m_Compressor; + mutable CDeflator m_Compressor; // For unit testing friend class ::CStringSimilarityTesterTest; diff --git a/include/core/CompressUtils.h b/include/core/CompressUtils.h new file mode 100644 index 0000000000..358ca37f16 --- /dev/null +++ b/include/core/CompressUtils.h @@ -0,0 +1,218 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +#ifndef INCLUDED_ml_core_CCompressUtils_h +#define INCLUDED_ml_core_CCompressUtils_h + +#include +#include + +#include + +#include +#include + +namespace ml { +namespace core { + +//! \brief +//! Shrink wrap zlib calls. +//! +//! DESCRIPTION:\n +//! Shrink wrap zlib calls. +//! +//! IMPLEMENTATION DECISIONS:\n +//! Data can be added incrementally and then 'finished' to +//! complete deflation or inflation. +//! +//! This object retains in memory the entire compressed state +//! so it not good for file read/write. +//! +//! A single Z stream is used for the lifetime of the object, +//! so each object can only work on one task at a time. In +//! a multi-threaded application it would be best to create +//! one object for each thread. +//! 
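+//! Example round trip (illustrative sketch using only the classes and
+//! member functions declared below in this header):
+//! \code
+//! CDeflator deflator{false};
+//! deflator.addString("text to deflate");
+//! CCompressUtil::TByteVec compressed;
+//! deflator.finishAndTakeData(compressed);
+//!
+//! CInflator inflator{false};
+//! inflator.addVector(compressed);
+//! CCompressUtil::TByteVec restored;
+//! inflator.finishAndTakeData(restored);
+//! \endcode
+//!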
+class CORE_EXPORT CCompressUtil : private CNonCopyable { +public: + using TByteVec = std::vector; + +public: + CCompressUtil(bool lengthOnly); + virtual ~CCompressUtil() = default; + + //! Add a string. + //! + //! \note Multiple calls to this function without finishing + //! are equivalent to deflating or inflating the concatenation + //! of the strings passed in the order they are passed. + bool addString(const std::string& input); + + //! Add a vector of trivially copyable types. + //! + //! \note Multiple calls to this function without finishing + //! are equivalent to deflating or inflating the concatenation + //! of the vectors passed in the order they are passed. + template + bool addVector(const std::vector& input) { + static_assert(std::is_trivially_copyable::value, "Type must be trivially copyable"); + if (input.empty()) { + return true; + } + if (m_State == E_Finished) { + // If the last round of data processing has finished + // and we're adding a new vector then we need to reset + // the stream so that a new round starts from scratch. + this->reset(); + } + return this->processInput(false, input); + } + + //! Get transformed representation. + //! + //! \warning This will fail if the lengthOnly constructor argument + //! was set to true. + //! + //! \note The output representation is a byte array NOT a string, + //! and hence not printable. + //! + //! If finish==false then retrieve partial state. + bool data(bool finish, TByteVec& result); + + //! Get transformed representation. + //! + //! \note This is equivalent to calling data with finish==true, but + //! also takes the cached state (avoiding the copy). + bool finishAndTakeData(TByteVec& result); + + //! Get transformed data length. + //! + //! If finish==false then retrieve partial length. + bool length(bool finish, std::size_t& length); + + //! Reset the underlying stream. This will happen automatically + //! when adding a new string after having finished the previous + //! round, but sometimes, for example when recovering from an + //! error, it may be desirable to explicitly reset the state. + void reset(); + +protected: + //! Get the underlying stream. + z_stream& stream(); + +private: + enum EState { E_Unused, E_Active, E_Finished }; + +private: + static const std::size_t CHUNK_SIZE{4096}; + +private: + //! Get an unsigned character pointer to the address of the start + //! of the vector data. + template + static Bytef* bytes(const std::vector& input) { + return const_cast(reinterpret_cast(input.data())); + } + + //! Get an unsigned character pointer to the address of the start + //! of the string character array. + static Bytef* bytes(const std::string& input) { + return reinterpret_cast(const_cast(input.data())); + } + + //! Get the vector data size in bytes. + template + static uInt size(const std::vector& input) { + return static_cast(input.size() * sizeof(T)); + } + + //! Get the string size in bytes. + static uInt size(const std::string& input) { + return static_cast(input.size()); + } + + //! Process a chunk of state (optionally flushing). + bool processChunk(int flush); + + //! Process the input \p input in chunks. + template + bool processInput(bool finish, const T& input) { + if (input.empty() && m_State == E_Active && !finish) { + return true; + } + + m_State = E_Active; + + m_ZlibStrm.next_in = bytes(input); + m_ZlibStrm.avail_in = size(input); + + int flush{finish ? 
Z_FINISH : Z_NO_FLUSH}; + do { + if (this->processChunk(flush) == false) { + return false; + } + } while (m_ZlibStrm.avail_out == 0); + + m_State = finish ? E_Finished : E_Active; + + return true; + } + + //! Preparation before returning any data. + bool prepareToReturnData(bool finish); + + //! Process a chunk with the stream. + virtual int streamProcessChunk(int flush) = 0; + + //! Reset the underlying stream. + virtual int resetStream() = 0; + +private: + //! The current state of deflation or inflation. + EState m_State; + + //! Is this object only fit for getting output lengths? + bool m_LengthOnly; + + //! The buffer for a chunk of output from (de|in)flation. + Bytef m_Chunk[CHUNK_SIZE]; + + //! The output buffer when the compressed result is being + //! stored. + TByteVec m_FullResult; + + //! The zlib data structure. + z_stream m_ZlibStrm; +}; + +//! \brief Implementation of CompressUtil for deflating data. +class CORE_EXPORT CDeflator final : public CCompressUtil { +public: + CDeflator(bool lengthOnly, int level = Z_DEFAULT_COMPRESSION); + ~CDeflator(); + +private: + //! Process a chunk of state (optionally flushing). + virtual int streamProcessChunk(int flush); + //! Reset the underlying stream. + virtual int resetStream(); +}; + +//! \brief Implementation of CompressUtil for inflating data. +class CORE_EXPORT CInflator final : public CCompressUtil { +public: + CInflator(bool lengthOnly); + ~CInflator(); + +private: + //! Process a chunk of state (optionally flushing). + virtual int streamProcessChunk(int flush); + //! Reset the underlying stream. + virtual int resetStream(); +}; +} +} + +#endif // INCLUDED_ml_core_CCompressUtils_h diff --git a/include/maths/CExpandingWindow.h b/include/maths/CExpandingWindow.h index 18d75472e9..2d6c576655 100644 --- a/include/maths/CExpandingWindow.h +++ b/include/maths/CExpandingWindow.h @@ -36,6 +36,21 @@ namespace maths { //! constructor. At the point it overflows, i.e. time since the //! beginning of the window exceeds "size" x "maximum bucket length", //! it will re-initialize the bucketing and update the start time. +//! +//! IMPLEMENTATION:\n +//! It is expected that the full window of values only needs to be +//! accessed infrequently. For example, this class is currently used +//! by the test for seasonal components and as such the full window +//! of values is only accessed when doing a test at the point the +//! bucketing interval expands. +//! +//! Since the bucket values can constitute a significant amount of +//! memory, one can choose to store them in deflated format. Empirically, +//! this saves between 60% and 95% of the memory of this class depending +//! primarily on the number of populated buckets. +//! +//! The CPU cost of deflation is amortised by maintaining a small buffer +//! which is update with new values and only flushed when full. class MATHS_EXPORT CExpandingWindow { public: using TDoubleVec = std::vector; @@ -49,7 +64,8 @@ class MATHS_EXPORT CExpandingWindow { CExpandingWindow(core_t::TTime bucketLength, TTimeCRng bucketLengths, std::size_t size, - double decayRate = 0.0); + double decayRate = 0.0, + bool deflate = true); //! Initialize by reading state from \p traverser. bool acceptRestoreTraverser(core::CStateRestoreTraverser& traverser); @@ -66,8 +82,11 @@ class MATHS_EXPORT CExpandingWindow { //! Get the current bucket length. core_t::TTime bucketLength() const; + //! Get the number of bucket values. + std::size_t size() const; + //! Get the bucket values. 
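+    //! \note With deflated storage the bucket values are inflated on
+    //! demand, so this returns a copy rather than a reference.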
- const TFloatMeanAccumulatorVec& values() const; + TFloatMeanAccumulatorVec values() const; //! Get the bucket values minus the values from \p trend. TFloatMeanAccumulatorVec valuesMinusPrediction(const TPredictor& predictor) const; @@ -94,9 +113,47 @@ class MATHS_EXPORT CExpandingWindow { std::size_t memoryUsage() const; private: + using TByte = unsigned char; + using TByteVec = std::vector; + using TSizeFloatMeanAccumulatorPr = std::pair; + using TSizeFloatMeanAccumulatorPrVec = std::vector; + + //! \brief Inflates the bucket values for the lifetime of the object. + class MATHS_EXPORT CScopeInflate : private core::CNonCopyable { + public: + CScopeInflate(const CExpandingWindow& window, bool commit); + ~CScopeInflate(); + + private: + //! The window to inflate. + const CExpandingWindow& m_Window; + //! True if any buffered changes are to be committed. + bool m_Commit; + }; + +private: + //! Convert to a compressed representation. + void deflate(bool commit) const; + + //! Implements deflate. + void doDeflate(bool commit); + + //! Extract from the compressed representation. + void inflate(bool commit) const; + + //! Implements inflate. + void doInflate(bool commit); + +private: + //! True if the bucket values are stored in deflated format. + bool m_Deflate; + //! The rate at which the bucket values are aged. double m_DecayRate; + //! The number of buckets. + std::size_t m_Size; + //! The data bucketing length. core_t::TTime m_BucketLength; @@ -109,9 +166,18 @@ class MATHS_EXPORT CExpandingWindow { //! The time of the first data point. core_t::TTime m_StartTime; + //! A buffer used to amortize the cost of compression. + TSizeFloatMeanAccumulatorPrVec m_BufferedValues; + + //! Get the total time to propagate the values forward on decompression. + double m_BufferedTimeToPropagate; + //! The bucket values. TFloatMeanAccumulatorVec m_BucketValues; + //! The deflated bucket values. + TByteVec m_DeflatedBucketValues; + //! The mean value time modulo the data bucketing length. TFloatMeanAccumulator m_MeanOffset; }; diff --git a/include/maths/CTimeSeriesDecompositionDetail.h b/include/maths/CTimeSeriesDecompositionDetail.h index dc1f62514b..37545ecbd9 100644 --- a/include/maths/CTimeSeriesDecompositionDetail.h +++ b/include/maths/CTimeSeriesDecompositionDetail.h @@ -237,8 +237,8 @@ class MATHS_EXPORT CTimeSeriesDecompositionDetail { //! Check if we should run the periodicity test on \p window. bool shouldTest(ETest test, core_t::TTime time) const; - //! Get a new \p test. (Warning owned by the caller.) - CExpandingWindow* newWindow(ETest test) const; + //! Get a new \p test. (Warning: this is owned by the caller.) + CExpandingWindow* newWindow(ETest test, bool deflate = true) const; //! Account for memory that is not yet allocated //! 
during the initial state diff --git a/lib/api/unittest/CAnomalyJobLimitTest.cc b/lib/api/unittest/CAnomalyJobLimitTest.cc index 83d3222966..0119643589 100644 --- a/lib/api/unittest/CAnomalyJobLimitTest.cc +++ b/lib/api/unittest/CAnomalyJobLimitTest.cc @@ -373,7 +373,7 @@ void CAnomalyJobLimitTest::testModelledEntityCountForFixedMemoryLimit() { LOG_DEBUG(<< "Processed " << std::floor(100.0 * progress) << "%"); reportProgress += 0.1; } - for (std::size_t i = 0; i < 700; ++i) { + for (std::size_t i = 0; i < 1000; ++i) { rng.generateUniformSamples(0, generators.size(), 1, generator); TOptionalDouble value{generators[generator[0]](time)}; if (value) { @@ -390,7 +390,7 @@ void CAnomalyJobLimitTest::testModelledEntityCountForFixedMemoryLimit() { LOG_DEBUG(<< "# partition = " << used.s_PartitionFields); LOG_DEBUG(<< "Memory status = " << used.s_MemoryStatus); LOG_DEBUG(<< "Memory usage = " << used.s_Usage); - CPPUNIT_ASSERT(used.s_ByFields > 440 && used.s_ByFields < 600); + CPPUNIT_ASSERT(used.s_ByFields > 820 && used.s_ByFields < 980); CPPUNIT_ASSERT_EQUAL(std::size_t(2), used.s_PartitionFields); } @@ -416,7 +416,7 @@ void CAnomalyJobLimitTest::testModelledEntityCountForFixedMemoryLimit() { LOG_DEBUG(<< "Processed " << std::floor(100.0 * progress) << "%"); reportProgress += 0.1; } - for (std::size_t i = 0; i < 500; ++i) { + for (std::size_t i = 0; i < 600; ++i) { rng.generateUniformSamples(0, generators.size(), 1, generator); TOptionalDouble value{generators[generator[0]](time)}; if (value) { @@ -433,7 +433,7 @@ void CAnomalyJobLimitTest::testModelledEntityCountForFixedMemoryLimit() { LOG_DEBUG(<< "# partition = " << used.s_PartitionFields); LOG_DEBUG(<< "Memory status = " << used.s_MemoryStatus); LOG_DEBUG(<< "Memory usage = " << used.s_Usage); - CPPUNIT_ASSERT(used.s_PartitionFields > 280 && used.s_PartitionFields < 360); + CPPUNIT_ASSERT(used.s_PartitionFields > 390 && used.s_PartitionFields < 470); CPPUNIT_ASSERT(static_cast(used.s_ByFields) > 0.95 * static_cast(used.s_PartitionFields)); } diff --git a/lib/config/CDataCountStatistics.cc b/lib/config/CDataCountStatistics.cc index 092fc567e5..1d28ceebab 100644 --- a/lib/config/CDataCountStatistics.cc +++ b/lib/config/CDataCountStatistics.cc @@ -6,11 +6,11 @@ #include -#include #include #include #include #include +#include #include #include diff --git a/lib/core/CCompressUtils.cc b/lib/core/CCompressUtils.cc deleted file mode 100644 index 1853598004..0000000000 --- a/lib/core/CCompressUtils.cc +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. 
- */ -#include - -#include - -#include - -namespace ml { -namespace core { - -CCompressUtils::CCompressUtils(bool lengthOnly, int level) - : m_State(E_Unused), m_LengthOnly(lengthOnly) { - ::memset(&m_ZlibStrm, 0, sizeof(z_stream)); - - m_ZlibStrm.zalloc = Z_NULL; - m_ZlibStrm.zfree = Z_NULL; - - int ret(::deflateInit(&m_ZlibStrm, level)); - if (ret != Z_OK) { - LOG_ABORT(<< "Error initialising Z stream: " << ::zError(ret)); - } -} - -CCompressUtils::~CCompressUtils() { - int ret(::deflateEnd(&m_ZlibStrm)); - if (ret != Z_OK) { - LOG_ERROR(<< "Error ending Z stream: " << ::zError(ret)); - } -} - -bool CCompressUtils::addString(const std::string& str) { - if (m_State == E_Finished) { - // If the previous compression has finished and we're adding a new - // string then we need to reset the stream so that a new compression - // starts from scratch - this->reset(); - } - - return this->doCompress(false, str); -} - -bool CCompressUtils::compressedData(bool finish, TByteVec& result) { - if (m_LengthOnly) { - LOG_ERROR(<< "Cannot get compressed data from length-only compressor"); - return false; - } - - if (m_State == E_Unused) { - LOG_ERROR(<< "Cannot get compressed data - no strings added"); - return false; - } - - if (finish && m_State == E_Compressing) { - if (this->doCompress(finish, std::string()) == false) { - LOG_ERROR(<< "Cannot finish compression"); - return false; - } - } - - result = m_FullResult; - - return true; -} - -bool CCompressUtils::compressedLength(bool finish, size_t& length) { - if (m_State == E_Unused) { - LOG_ERROR(<< "Cannot get compressed data - no strings added"); - return false; - } - - if (finish && m_State == E_Compressing) { - if (this->doCompress(finish, std::string()) == false) { - LOG_ERROR(<< "Cannot finish compression"); - return false; - } - } - - length = m_ZlibStrm.total_out; - - return true; -} - -void CCompressUtils::reset() { - int ret(::deflateReset(&m_ZlibStrm)); - if (ret != Z_OK) { - // deflateReset() will only fail if one or more of the critical members - // of the current stream struct are NULL. If this happens then memory - // corruption must have occurred, because there's nowhere where we set - // these pointers to NULL after initialisation, so it's reasonable to - // abort. - LOG_ABORT(<< "Error reseting Z stream: " << ::zError(ret)); - } - m_State = E_Unused; -} - -bool CCompressUtils::doCompress(bool finish, const std::string& str) { - if (str.empty() && m_State == E_Compressing && !finish) { - return true; - } - - m_State = E_Compressing; - - m_ZlibStrm.next_in = reinterpret_cast(const_cast(str.data())); - m_ZlibStrm.avail_in = static_cast(str.size()); - - static const size_t CHUNK_SIZE = 4096; - Bytef out[CHUNK_SIZE]; - - int flush(finish ? Z_FINISH : Z_NO_FLUSH); - do { - m_ZlibStrm.next_out = out; - m_ZlibStrm.avail_out = CHUNK_SIZE; - int ret(::deflate(&m_ZlibStrm, flush)); - if (ret == Z_STREAM_ERROR) { - LOG_ERROR(<< "Error deflating: " << ::zError(ret)); - return false; - } - - size_t have(CHUNK_SIZE - m_ZlibStrm.avail_out); - if (!m_LengthOnly) { - m_FullResult.insert(m_FullResult.end(), &out[0], &out[have]); - } - } while (m_ZlibStrm.avail_out == 0); - - m_State = finish ? 
E_Finished : E_Compressing; - - return true; -} -} -} diff --git a/lib/core/CStringSimilarityTester.cc b/lib/core/CStringSimilarityTester.cc index c2599116cf..83b5d5152f 100644 --- a/lib/core/CStringSimilarityTester.cc +++ b/lib/core/CStringSimilarityTester.cc @@ -22,9 +22,9 @@ bool CStringSimilarityTester::similarity(const std::string& first, size_t secondCompLength(0); if (m_Compressor.addString(first) == false || - m_Compressor.compressedLength(true, firstCompLength) == false || + m_Compressor.length(true, firstCompLength) == false || m_Compressor.addString(second) == false || - m_Compressor.compressedLength(true, secondCompLength) == false) { + m_Compressor.length(true, secondCompLength) == false) { // The compressor will have logged the detailed reason LOG_ERROR(<< "Compression problem"); return false; @@ -49,9 +49,9 @@ bool CStringSimilarityTester::similarity(const std::string& first, size_t secondPlusFirstCompLength(0); if (m_Compressor.addString(first) == false || m_Compressor.addString(second) == false || - m_Compressor.compressedLength(true, firstPlusSecondCompLength) == false || + m_Compressor.length(true, firstPlusSecondCompLength) == false || m_Compressor.addString(second) == false || m_Compressor.addString(first) == false || - m_Compressor.compressedLength(true, secondPlusFirstCompLength) == false) { + m_Compressor.length(true, secondPlusFirstCompLength) == false) { // The compressor will have logged the detailed reason LOG_ERROR(<< "Compression problem"); return false; @@ -71,7 +71,7 @@ bool CStringSimilarityTester::similarity(const std::string& first, } bool CStringSimilarityTester::compressedLengthOf(const std::string& str, size_t& length) const { - return m_Compressor.addString(str) && m_Compressor.compressedLength(true, length); + return m_Compressor.addString(str) && m_Compressor.length(true, length); } int** CStringSimilarityTester::setupBerghelRoachMatrix(int maxDist, diff --git a/lib/core/CompressUtils.cc b/lib/core/CompressUtils.cc new file mode 100644 index 0000000000..0f73873e48 --- /dev/null +++ b/lib/core/CompressUtils.cc @@ -0,0 +1,164 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +#include + +#include + +#include + +namespace ml { +namespace core { + +CCompressUtil::CCompressUtil(bool lengthOnly) + : m_State{E_Unused}, m_LengthOnly{lengthOnly} { + ::memset(&m_ZlibStrm, 0, sizeof(z_stream)); + m_ZlibStrm.zalloc = Z_NULL; + m_ZlibStrm.zfree = Z_NULL; +} + +bool CCompressUtil::addString(const std::string& input) { + if (m_State == E_Finished) { + // If the last round of data processing has finished + // and we're adding a new vector then we need to reset + // the stream so that a new round starts from scratch. 
+ this->reset(); + } + return this->processInput(false, input); +} + +bool CCompressUtil::data(bool finish, TByteVec& result) { + if (this->prepareToReturnData(finish) == false) { + return false; + } + result = m_FullResult; + return true; +} + +bool CCompressUtil::finishAndTakeData(TByteVec& result) { + if (this->prepareToReturnData(true) == false) { + return false; + } + result = std::move(m_FullResult); + return true; +} + +bool CCompressUtil::length(bool finish, std::size_t& length) { + if (m_State == E_Unused) { + LOG_ERROR(<< "Cannot get length - nothing added"); + return false; + } + + if (finish && m_State == E_Active) { + if (this->processInput(finish, std::string()) == false) { + LOG_ERROR(<< "Cannot finish processing"); + return false; + } + } + + length = m_ZlibStrm.total_out; + + return true; +} + +void CCompressUtil::reset() { + int ret{this->resetStream()}; + if (ret != Z_OK) { + // resetStream() will only fail if one or more of the critical + // members of the current z_stream struct are NULL. If this + // happens then memory corruption must have occurred, because + // there's nowhere where we set these pointers to NULL after + // initialisation, so it's reasonable to abort. + LOG_ABORT(<< "Error reseting Z stream: " << ::zError(ret)); + } + m_State = E_Unused; +} + +z_stream& CCompressUtil::stream() { + return m_ZlibStrm; +} + +bool CCompressUtil::processChunk(int flush) { + m_ZlibStrm.next_out = m_Chunk; + m_ZlibStrm.avail_out = CHUNK_SIZE; + + int ret{this->streamProcessChunk(flush)}; + if (ret == Z_STREAM_ERROR) { + LOG_ERROR(<< "Error processing: " << ::zError(ret)); + return false; + } + + std::size_t have{CHUNK_SIZE - m_ZlibStrm.avail_out}; + if (!m_LengthOnly) { + m_FullResult.insert(m_FullResult.end(), &m_Chunk[0], &m_Chunk[have]); + } + return true; +} + +bool CCompressUtil::prepareToReturnData(bool finish) { + if (m_LengthOnly) { + LOG_ERROR(<< "Cannot get data if asked for length-only"); + return false; + } + + if (m_State == E_Unused) { + LOG_ERROR(<< "Cannot get data - nothing added"); + return false; + } + + if (finish && m_State == E_Active) { + if (this->processInput(finish, std::string()) == false) { + LOG_ERROR(<< "Failed to finish processing"); + return false; + } + } + return true; +} + +CDeflator::CDeflator(bool lengthOnly, int level) : CCompressUtil{lengthOnly} { + int ret{::deflateInit(&this->stream(), level)}; + if (ret != Z_OK) { + LOG_ABORT(<< "Error initialising Z stream: " << ::zError(ret)); + } +} + +CDeflator::~CDeflator() { + int ret{::deflateEnd(&this->stream())}; + if (ret != Z_OK) { + LOG_ERROR(<< "Error ending Z stream: " << ::zError(ret)); + } +} + +int CDeflator::streamProcessChunk(int flush) { + return ::deflate(&this->stream(), flush); +} + +int CDeflator::resetStream() { + return ::deflateReset(&this->stream()); +} + +CInflator::CInflator(bool lengthOnly) : CCompressUtil{lengthOnly} { + int ret{::inflateInit(&this->stream())}; + if (ret != Z_OK) { + LOG_ABORT(<< "Error initialising Z stream: " << ::zError(ret)); + } +} + +CInflator::~CInflator() { + int ret{::inflateEnd(&this->stream())}; + if (ret != Z_OK) { + LOG_ERROR(<< "Error ending Z stream: " << ::zError(ret)); + } +} + +int CInflator::streamProcessChunk(int flush) { + return ::inflate(&this->stream(), flush); +} + +int CInflator::resetStream() { + return ::inflateReset(&this->stream()); +} +} +} diff --git a/lib/core/Makefile b/lib/core/Makefile index 4da27e50f1..efcbc9dc61 100644 --- a/lib/core/Makefile +++ b/lib/core/Makefile @@ -71,7 +71,7 @@ CBase64Filter.cc \ 
CBufferFlushTimer.cc \ CCompressedDictionary.cc \ CCompressOStream.cc \ -CCompressUtils.cc \ +CompressUtils.cc \ CContainerPrinter.cc \ CDataAdder.cc \ CDataSearcher.cc \ diff --git a/lib/core/unittest/CCompressUtilsTest.cc b/lib/core/unittest/CCompressUtilsTest.cc index df07ae01cd..9bca31a92a 100644 --- a/lib/core/unittest/CCompressUtilsTest.cc +++ b/lib/core/unittest/CCompressUtilsTest.cc @@ -5,8 +5,8 @@ */ #include "CCompressUtilsTest.h" -#include #include +#include #include @@ -21,21 +21,26 @@ CppUnit::Test* CCompressUtilsTest::suite() { "CCompressUtilsTest::testManyAdds", &CCompressUtilsTest::testManyAdds)); suiteOfTests->addTest(new CppUnit::TestCaller( "CCompressUtilsTest::testLengthOnly", &CCompressUtilsTest::testLengthOnly)); + suiteOfTests->addTest(new CppUnit::TestCaller( + "CCompressUtilsTest::testInflate", &CCompressUtilsTest::testInflate)); + suiteOfTests->addTest(new CppUnit::TestCaller( + "CCompressUtilsTest::testTriviallyCopyableTypeVector", + &CCompressUtilsTest::testTriviallyCopyableTypeVector)); return suiteOfTests; } void CCompressUtilsTest::testEmptyAdd() { - ml::core::CCompressUtils compressor(false); + ml::core::CDeflator compressor(false); std::string str; CPPUNIT_ASSERT(compressor.addString(str)); - ml::core::CCompressUtils::TByteVec output; + ml::core::CCompressUtil::TByteVec output; size_t length(0); - CPPUNIT_ASSERT(compressor.compressedData(true, output)); - CPPUNIT_ASSERT(compressor.compressedLength(true, length)); + CPPUNIT_ASSERT(compressor.data(true, output)); + CPPUNIT_ASSERT(compressor.length(true, length)); LOG_INFO(<< "Length of nothing compressed is " << length); @@ -44,17 +49,17 @@ void CCompressUtilsTest::testEmptyAdd() { } void CCompressUtilsTest::testOneAdd() { - ml::core::CCompressUtils compressor(false); + ml::core::CDeflator compressor(false); std::string str("1234567890"); CPPUNIT_ASSERT(compressor.addString(str)); - ml::core::CCompressUtils::TByteVec output; + ml::core::CCompressUtil::TByteVec output; size_t length(0); - CPPUNIT_ASSERT(compressor.compressedData(true, output)); - CPPUNIT_ASSERT(compressor.compressedLength(true, length)); + CPPUNIT_ASSERT(compressor.data(true, output)); + CPPUNIT_ASSERT(compressor.length(true, length)); LOG_INFO(<< "Length of " << str << " compressed is " << length); @@ -63,7 +68,7 @@ void CCompressUtilsTest::testOneAdd() { } void CCompressUtilsTest::testManyAdds() { - ml::core::CCompressUtils compressorMulti(false); + ml::core::CDeflator compressorMulti(false); std::string str1("1234567890"); std::string str2("qwertyuiopa1234sdfghjklzxcvbnm"); @@ -73,26 +78,26 @@ void CCompressUtilsTest::testManyAdds() { CPPUNIT_ASSERT(compressorMulti.addString(str2)); CPPUNIT_ASSERT(compressorMulti.addString(str3)); - ml::core::CCompressUtils::TByteVec outputMulti; + ml::core::CCompressUtil::TByteVec outputMulti; size_t lengthMulti(0); - CPPUNIT_ASSERT(compressorMulti.compressedData(true, outputMulti)); - CPPUNIT_ASSERT(compressorMulti.compressedLength(true, lengthMulti)); + CPPUNIT_ASSERT(compressorMulti.data(true, outputMulti)); + CPPUNIT_ASSERT(compressorMulti.length(true, lengthMulti)); LOG_INFO(<< "Length of " << str1 << str2 << str3 << " compressed is " << lengthMulti); CPPUNIT_ASSERT(lengthMulti > 0); CPPUNIT_ASSERT_EQUAL(lengthMulti, outputMulti.size()); - ml::core::CCompressUtils compressorSingle(false); + ml::core::CDeflator compressorSingle(false); CPPUNIT_ASSERT(compressorSingle.addString(str1 + str2 + str3)); - ml::core::CCompressUtils::TByteVec outputSingle; + ml::core::CCompressUtil::TByteVec outputSingle; 
size_t lengthSingle(0); - CPPUNIT_ASSERT(compressorSingle.compressedData(true, outputSingle)); - CPPUNIT_ASSERT(compressorSingle.compressedLength(true, lengthSingle)); + CPPUNIT_ASSERT(compressorSingle.data(true, outputSingle)); + CPPUNIT_ASSERT(compressorSingle.length(true, lengthSingle)); CPPUNIT_ASSERT_EQUAL(lengthMulti, lengthSingle); CPPUNIT_ASSERT_EQUAL(lengthSingle, outputSingle.size()); @@ -100,7 +105,7 @@ void CCompressUtilsTest::testManyAdds() { } void CCompressUtilsTest::testLengthOnly() { - ml::core::CCompressUtils compressorFull(false); + ml::core::CDeflator compressorFull(false); std::string str("qwertyuiopa1234sdfghjklzxcvbnm"); @@ -108,30 +113,86 @@ void CCompressUtilsTest::testLengthOnly() { CPPUNIT_ASSERT(compressorFull.addString(str)); CPPUNIT_ASSERT(compressorFull.addString(str)); - ml::core::CCompressUtils::TByteVec outputFull; + ml::core::CCompressUtil::TByteVec outputFull; size_t lengthFull(0); - CPPUNIT_ASSERT(compressorFull.compressedData(true, outputFull)); - CPPUNIT_ASSERT(compressorFull.compressedLength(true, lengthFull)); + CPPUNIT_ASSERT(compressorFull.data(true, outputFull)); + CPPUNIT_ASSERT(compressorFull.length(true, lengthFull)); LOG_INFO(<< "Length of " << str << str << str << " compressed is " << lengthFull); CPPUNIT_ASSERT(lengthFull > 0); CPPUNIT_ASSERT_EQUAL(lengthFull, outputFull.size()); - ml::core::CCompressUtils compressorLengthOnly(true); + ml::core::CDeflator compressorLengthOnly(true); CPPUNIT_ASSERT(compressorLengthOnly.addString(str)); CPPUNIT_ASSERT(compressorLengthOnly.addString(str)); CPPUNIT_ASSERT(compressorLengthOnly.addString(str)); - ml::core::CCompressUtils::TByteVec outputLengthOnly; + ml::core::CCompressUtil::TByteVec outputLengthOnly; size_t lengthLengthOnly(0); // Should NOT be possible to get the full compressed data in this case - CPPUNIT_ASSERT(!compressorLengthOnly.compressedData(true, outputLengthOnly)); - CPPUNIT_ASSERT(compressorLengthOnly.compressedLength(true, lengthLengthOnly)); + CPPUNIT_ASSERT(!compressorLengthOnly.data(true, outputLengthOnly)); + CPPUNIT_ASSERT(compressorLengthOnly.length(true, lengthLengthOnly)); CPPUNIT_ASSERT_EQUAL(lengthFull, lengthLengthOnly); CPPUNIT_ASSERT_EQUAL(size_t(0), outputLengthOnly.size()); } + +void CCompressUtilsTest::testInflate() { + ml::core::CDeflator compressor(false); + + std::string repeat("qwertyuiopa1234sdfghjklzxcvbnm"); + std::string input(repeat + repeat + repeat); + LOG_DEBUG(<< "Input size = " << input.size()); + + CPPUNIT_ASSERT(compressor.addString(input)); + + ml::core::CCompressUtil::TByteVec compressed; + CPPUNIT_ASSERT(compressor.data(true, compressed)); + LOG_DEBUG(<< "Compressed size = " << compressed.size()); + + ml::core::CInflator decompressor(false); + + CPPUNIT_ASSERT(decompressor.addVector(compressed)); + + ml::core::CCompressUtil::TByteVec decompressed; + CPPUNIT_ASSERT(decompressor.data(true, decompressed)); + LOG_DEBUG(<< "Decompressed size = " << decompressed.size()); + + std::string output{decompressed.begin(), decompressed.end()}; + + CPPUNIT_ASSERT_EQUAL(input, output); +} + +void CCompressUtilsTest::testTriviallyCopyableTypeVector() { + std::vector empty; + + ml::core::CDeflator compressor(false); + + CPPUNIT_ASSERT(compressor.addVector(empty)); + + ml::core::CCompressUtil::TByteVec compressed; + CPPUNIT_ASSERT(!compressor.data(false, compressed)); + CPPUNIT_ASSERT(compressed.empty()); + + std::vector input{1.3, 272083891.1, 1.3902e-26, 1.3, 0.0}; + LOG_DEBUG("Input size = " << input.size() * sizeof(double)); + + 
CPPUNIT_ASSERT(compressor.addVector(input)); + + compressor.data(true, compressed); + LOG_DEBUG("Compressed size = " << compressed.size()); + CPPUNIT_ASSERT(compressed.size() < input.size() * sizeof(double)); + + ml::core::CInflator decompressor(false); + CPPUNIT_ASSERT(decompressor.addVector(compressed)); + + ml::core::CCompressUtil::TByteVec uncompressed; + CPPUNIT_ASSERT(decompressor.data(true, uncompressed)); + + CPPUNIT_ASSERT(std::equal(uncompressed.begin(), uncompressed.end(), + reinterpret_cast(input.data()))); +} diff --git a/lib/core/unittest/CCompressUtilsTest.h b/lib/core/unittest/CCompressUtilsTest.h index 85263ef7f3..8917d329a2 100644 --- a/lib/core/unittest/CCompressUtilsTest.h +++ b/lib/core/unittest/CCompressUtilsTest.h @@ -14,6 +14,8 @@ class CCompressUtilsTest : public CppUnit::TestFixture { void testOneAdd(); void testManyAdds(); void testLengthOnly(); + void testInflate(); + void testTriviallyCopyableTypeVector(); static CppUnit::Test* suite(); }; diff --git a/lib/maths/CExpandingWindow.cc b/lib/maths/CExpandingWindow.cc index df843e6214..5145c7e45e 100644 --- a/lib/maths/CExpandingWindow.cc +++ b/lib/maths/CExpandingWindow.cc @@ -9,6 +9,7 @@ #include #include #include +#include #include #include @@ -22,20 +23,24 @@ namespace ml { namespace maths { namespace { -const std::string BUCKET_LENGTH_INDEX_TAG("a"); -const std::string BUCKET_VALUES_TAG("b"); -const std::string START_TIME_TAG("c"); -const std::string MEAN_OFFSET_TAG("d"); +const std::string BUCKET_LENGTH_INDEX_TAG{"a"}; +const std::string BUCKET_VALUES_TAG{"b"}; +const std::string START_TIME_TAG{"c"}; +const std::string MEAN_OFFSET_TAG{"d"}; +const std::size_t MAX_BUFFER_SIZE{5}; } CExpandingWindow::CExpandingWindow(core_t::TTime bucketLength, TTimeCRng bucketLengths, std::size_t size, - double decayRate) - : m_DecayRate(decayRate), m_BucketLength(bucketLength), - m_BucketLengths(bucketLengths), m_BucketLengthIndex(0), - m_StartTime(boost::numeric::bounds::lowest()), - m_BucketValues(size % 2 == 0 ? size : size + 1) { + double decayRate, + bool deflate) + : m_Deflate{deflate}, m_DecayRate{decayRate}, m_Size{size}, m_BucketLength{bucketLength}, + m_BucketLengths{bucketLengths}, m_BucketLengthIndex{0}, + m_StartTime{boost::numeric::bounds::lowest()}, + m_BufferedTimeToPropagate(0.0), m_BucketValues(size % 2 == 0 ? 
size : size + 1) { + m_BufferedValues.reserve(MAX_BUFFER_SIZE); + this->deflate(true); } bool CExpandingWindow::acceptRestoreTraverser(core::CStateRestoreTraverser& traverser) { @@ -48,10 +53,12 @@ bool CExpandingWindow::acceptRestoreTraverser(core::CStateRestoreTraverser& trav core::CPersistUtils::restore(BUCKET_VALUES_TAG, m_BucketValues, traverser)); RESTORE(MEAN_OFFSET_TAG, m_MeanOffset.fromDelimited(traverser.value())) } while (traverser.next()); + this->deflate(true); return true; } void CExpandingWindow::acceptPersistInserter(core::CStatePersistInserter& inserter) const { + CScopeInflate inflate(*this, false); inserter.insertValue(BUCKET_LENGTH_INDEX_TAG, m_BucketLengthIndex); inserter.insertValue(START_TIME_TAG, m_StartTime); core::CPersistUtils::persist(BUCKET_VALUES_TAG, m_BucketValues, inserter); @@ -63,7 +70,7 @@ core_t::TTime CExpandingWindow::startTime() const { } core_t::TTime CExpandingWindow::endTime() const { - return m_StartTime + (static_cast(m_BucketValues.size()) * + return m_StartTime + (static_cast(m_Size) * m_BucketLengths[m_BucketLengthIndex]); } @@ -71,12 +78,19 @@ core_t::TTime CExpandingWindow::bucketLength() const { return m_BucketLengths[m_BucketLengthIndex]; } -const CExpandingWindow::TFloatMeanAccumulatorVec& CExpandingWindow::values() const { +std::size_t CExpandingWindow::size() const { + return m_Size; +} + +CExpandingWindow::TFloatMeanAccumulatorVec CExpandingWindow::values() const { + CScopeInflate inflate(*this, false); return m_BucketValues; } CExpandingWindow::TFloatMeanAccumulatorVec CExpandingWindow::valuesMinusPrediction(const TPredictor& predictor) const { + CScopeInflate inflate(*this, false); + core_t::TTime start{CIntegerTools::floor(this->startTime(), m_BucketLength)}; core_t::TTime end{CIntegerTools::ceil(this->endTime(), m_BucketLength)}; core_t::TTime size{static_cast(m_BucketValues.size())}; @@ -109,37 +123,54 @@ void CExpandingWindow::initialize(core_t::TTime time) { void CExpandingWindow::propagateForwardsByTime(double time) { if (!CMathsFuncs::isFinite(time) || time < 0.0) { LOG_ERROR(<< "Bad propagation time " << time); + return; } - double factor = std::exp(-m_DecayRate * time); - for (auto& value : m_BucketValues) { - value.age(factor); + double factor{std::exp(-m_DecayRate * time)}; + for (auto& value : m_BufferedValues) { + value.second.age(factor); } + m_BufferedTimeToPropagate += time; } void CExpandingWindow::add(core_t::TTime time, double value, double weight) { if (time >= m_StartTime) { - while (this->needToCompress(time)) { - m_BucketLengthIndex = (m_BucketLengthIndex + 1) % m_BucketLengths.size(); - auto end = m_BucketValues.begin(); - - if (m_BucketLengthIndex == 0) { - m_StartTime = CIntegerTools::floor(time, m_BucketLengths[0]); - } else { - std::size_t compression = m_BucketLengths[m_BucketLengthIndex] / - m_BucketLengths[m_BucketLengthIndex - 1]; - for (std::size_t i = 0u; i < m_BucketValues.size(); i += compression, ++end) { - std::swap(*end, m_BucketValues[i]); - for (std::size_t j = 1u; - j < compression && i + j < m_BucketValues.size(); ++j) { - *end += m_BucketValues[i + j]; + if (this->needToCompress(time)) { + CScopeInflate inflate(*this, true); + do { + m_BucketLengthIndex = (m_BucketLengthIndex + 1) % + m_BucketLengths.size(); + auto end = m_BucketValues.begin(); + + if (m_BucketLengthIndex == 0) { + m_StartTime = CIntegerTools::floor(time, m_BucketLengths[0]); + } else { + std::size_t compression(m_BucketLengths[m_BucketLengthIndex] / + m_BucketLengths[m_BucketLengthIndex - 1]); + for (std::size_t i = 0u; i 
< m_BucketValues.size(); + i += compression, ++end) { + std::swap(*end, m_BucketValues[i]); + for (std::size_t j = 1u; + j < compression && i + j < m_BucketValues.size(); ++j) { + *end += m_BucketValues[i + j]; + } } } - } - std::fill(end, m_BucketValues.end(), TFloatMeanAccumulator()); + std::fill(end, m_BucketValues.end(), TFloatMeanAccumulator()); + } while (this->needToCompress(time)); } - m_BucketValues[(time - m_StartTime) / m_BucketLengths[m_BucketLengthIndex]] - .add(value, weight); + std::size_t index((time - m_StartTime) / m_BucketLengths[m_BucketLengthIndex]); + if (m_Deflate == false) { + m_BucketValues[index].add(value, weight); + } else { + if (m_BufferedValues.empty() || index != m_BufferedValues.back().first) { + if (m_BufferedValues.size() == MAX_BUFFER_SIZE) { + CScopeInflate inflate(*this, true); + } + m_BufferedValues.push_back({index, TFloatMeanAccumulator{}}); + } + m_BufferedValues.back().second.add(value, weight); + } m_MeanOffset.add(static_cast(time % m_BucketLength)); } } @@ -149,6 +180,7 @@ bool CExpandingWindow::needToCompress(core_t::TTime time) const { } uint64_t CExpandingWindow::checksum(uint64_t seed) const { + CScopeInflate inflate(*this, false); seed = CChecksum::calculate(seed, m_BucketLengthIndex); seed = CChecksum::calculate(seed, m_StartTime); seed = CChecksum::calculate(seed, m_BucketValues); @@ -156,12 +188,69 @@ uint64_t CExpandingWindow::checksum(uint64_t seed) const { } void CExpandingWindow::debugMemoryUsage(core::CMemoryUsage::TMemoryUsagePtr mem) const { - mem->setName("CScanningPeriodicityTest"); + mem->setName("CExpandingWindow"); core::CMemoryDebug::dynamicSize("m_BucketValues", m_BucketValues, mem); + core::CMemoryDebug::dynamicSize("m_DeflatedBucketValues", m_DeflatedBucketValues, mem); } std::size_t CExpandingWindow::memoryUsage() const { - return core::CMemory::dynamicSize(m_BucketValues); + std::size_t mem{core::CMemory::dynamicSize(m_BucketValues)}; + mem += core::CMemory::dynamicSize(m_DeflatedBucketValues); + return mem; +} + +void CExpandingWindow::deflate(bool commit) const { + if (m_Deflate && m_BucketValues.size() > 0) { + const_cast(this)->doDeflate(commit); + } +} + +void CExpandingWindow::doDeflate(bool commit) { + if (commit) { + bool lengthOnly{false}; + core::CDeflator compressor(lengthOnly); + compressor.addVector(m_BucketValues); + compressor.finishAndTakeData(m_DeflatedBucketValues); + } + m_BucketValues.clear(); + m_BucketValues.shrink_to_fit(); +} + +void CExpandingWindow::inflate(bool commit) const { + if (m_Deflate && m_BucketValues.empty()) { + const_cast(this)->doInflate(commit); + } +} + +void CExpandingWindow::doInflate(bool commit) { + bool lengthOnly{false}; + core::CInflator decompressor(lengthOnly); + decompressor.addVector(m_DeflatedBucketValues); + TByteVec inflated; + decompressor.finishAndTakeData(inflated); + m_BucketValues.resize(inflated.size() / sizeof(TFloatMeanAccumulator)); + std::copy(inflated.begin(), inflated.end(), + reinterpret_cast(m_BucketValues.data())); + double factor{std::exp(-m_DecayRate * m_BufferedTimeToPropagate)}; + for (auto& value : m_BucketValues) { + value.age(factor); + } + for (auto& value : m_BufferedValues) { + m_BucketValues[value.first] += value.second; + } + if (commit) { + m_BufferedValues.clear(); + m_BufferedTimeToPropagate = 0.0; + } +} + +CExpandingWindow::CScopeInflate::CScopeInflate(const CExpandingWindow& window, bool commit) + : m_Window{window}, m_Commit{commit} { + m_Window.inflate(commit); +} + +CExpandingWindow::CScopeInflate::~CScopeInflate() { + 
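+    // Re-deflate the bucket values, persisting any changes only if a
+    // commit was requested.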
m_Window.deflate(m_Commit); } } } diff --git a/lib/maths/CTimeSeriesDecompositionDetail.cc b/lib/maths/CTimeSeriesDecompositionDetail.cc index 13e453c8c7..053bea1179 100644 --- a/lib/maths/CTimeSeriesDecompositionDetail.cc +++ b/lib/maths/CTimeSeriesDecompositionDetail.cc @@ -636,7 +636,7 @@ std::size_t CTimeSeriesDecompositionDetail::CPeriodicityTest::extraMemoryOnIniti static std::size_t result{0}; if (result == 0) { for (auto i : {E_Short, E_Long}) { - TExpandingWindowPtr window(this->newWindow(i)); + TExpandingWindowPtr window(this->newWindow(i, false)); result += core::CMemory::dynamicSize(window); } } @@ -719,18 +719,20 @@ bool CTimeSeriesDecompositionDetail::CPeriodicityTest::shouldTest(ETest test, return m_Windows[test] && (m_Windows[test]->needToCompress(time) || scheduledTest()); } -CExpandingWindow* CTimeSeriesDecompositionDetail::CPeriodicityTest::newWindow(ETest test) const { +CExpandingWindow* +CTimeSeriesDecompositionDetail::CPeriodicityTest::newWindow(ETest test, bool deflate) const { using TTimeCRng = CExpandingWindow::TTimeCRng; - auto newWindow = [this](const TTimeVec& bucketLengths) { + auto newWindow = [this, deflate](const TTimeVec& bucketLengths) { if (m_BucketLength <= bucketLengths.back()) { std::ptrdiff_t a{std::lower_bound(bucketLengths.begin(), bucketLengths.end(), m_BucketLength) - bucketLengths.begin()}; std::size_t b{bucketLengths.size()}; TTimeCRng bucketLengths_(bucketLengths, a, b); - return new CExpandingWindow(m_BucketLength, bucketLengths_, 336, m_DecayRate); + return new CExpandingWindow(m_BucketLength, bucketLengths_, 336, + m_DecayRate, deflate); } return static_cast(nullptr); }; diff --git a/lib/maths/unittest/CDecayRateControllerTest.cc b/lib/maths/unittest/CDecayRateControllerTest.cc index d6b2a5760e..316e27a18c 100644 --- a/lib/maths/unittest/CDecayRateControllerTest.cc +++ b/lib/maths/unittest/CDecayRateControllerTest.cc @@ -91,9 +91,9 @@ void CDecayRateControllerTest::testPersist() { inserter.toXml(origXml); } LOG_TRACE(<< "Controller XML = " << origXml); - LOG_TRACE(<< "Controller XML size = " << origXml.size()); + LOG_DEBUG(<< "Controller XML size = " << origXml.size()); - // Restore the XML into a new sketch. + // Restore the XML into a new controller. { core::CRapidXmlParser parser; CPPUNIT_ASSERT(parser.parseStringIgnoreCdata(origXml)); diff --git a/lib/maths/unittest/CExpandingWindowTest.cc b/lib/maths/unittest/CExpandingWindowTest.cc index 142dcb8ebd..850946f94d 100644 --- a/lib/maths/unittest/CExpandingWindowTest.cc +++ b/lib/maths/unittest/CExpandingWindowTest.cc @@ -34,55 +34,222 @@ using TFloatMeanAccumulatorVec = std::vector; TTimeVec BUCKET_LENGTHS{300, 600, 1800, 3600}; } -void CExpandingWindowTest::testPersistence() { - // Test persist and restore is idempotent. +void CExpandingWindowTest::testBasicUsage() { + // 1) Check compressed and uncompressed storage formats produce the + // same results. + // 2) Check multiple rounds of bucket compression work as expected. + // 3) Check extension beyond the end of the maximum window length + // works as expected. 
core_t::TTime bucketLength{300}; std::size_t size{336}; double decayRate{0.01}; test::CRandomNumbers rng; - - maths::CExpandingWindow origWindow{bucketLength, TTimeCRng{BUCKET_LENGTHS, 0, 4}, - size, decayRate}; - TDoubleVec values; rng.generateUniformSamples(0.0, 10.0, size, values); + + for (auto startTime : {0, 100000}) { + LOG_DEBUG(<< "Testing start time " << startTime); + + maths::CExpandingWindow compressed{ + bucketLength, TTimeCRng{BUCKET_LENGTHS, 0, 4}, size, decayRate, true}; + maths::CExpandingWindow uncompressed{ + bucketLength, TTimeCRng{BUCKET_LENGTHS, 0, 4}, size, decayRate, false}; + TFloatMeanAccumulatorVec expected300(size); + TFloatMeanAccumulatorVec expected1800(size); + + compressed.initialize(startTime); + uncompressed.initialize(startTime); + + for (core_t::TTime time = startTime; + time < static_cast(size) * bucketLength; time += bucketLength) { + double value{values[(time - startTime) / bucketLength]}; + compressed.add(time, value); + uncompressed.add(time, value); + expected300[(time - startTime) / 300].add(value); + expected1800[(time - startTime) / 1800].add(value); + if (((time - startTime) / bucketLength) % 3 == 0) { + CPPUNIT_ASSERT_EQUAL(uncompressed.checksum(), compressed.checksum()); + } + } + CPPUNIT_ASSERT_EQUAL(core::CContainerPrinter::print(expected300), + core::CContainerPrinter::print(uncompressed.values())); + CPPUNIT_ASSERT_EQUAL(core::CContainerPrinter::print(expected300), + core::CContainerPrinter::print(compressed.values())); + + core_t::TTime time{startTime + static_cast(600 * size + 1)}; + compressed.add(time, 5.0, 0.9); + uncompressed.add(time, 5.0, 0.9); + expected1800[(time - startTime) / 1800].add(5.0, 0.9); + + CPPUNIT_ASSERT_EQUAL(core::CContainerPrinter::print(expected1800), + core::CContainerPrinter::print(uncompressed.values())); + CPPUNIT_ASSERT_EQUAL(core::CContainerPrinter::print(expected1800), + core::CContainerPrinter::print(compressed.values())); + } + + LOG_DEBUG(<< "Testing multiple rounds of compression"); + + TDoubleVec times; + rng.generateUniformSamples(0.0, static_cast(3600 * size), 1000, times); + rng.generateUniformSamples(0.0, 10.0, 1000, values); + std::sort(times.begin(), times.end()); + + maths::CExpandingWindow window{bucketLength, TTimeCRng{BUCKET_LENGTHS, 0, 4}, + size, decayRate, true}; + TFloatMeanAccumulatorVec expected300(size); + TFloatMeanAccumulatorVec expected600(size); + TFloatMeanAccumulatorVec expected1800(size); + TFloatMeanAccumulatorVec expected3600(size); + + window.initialize(0); + + for (std::size_t i = 0; i < times.size(); ++i) { + core_t::TTime time{static_cast(times[i])}; + LOG_TRACE(<< "Adding " << values[i] << " at " << time); + + window.add(time, values[i]); + window.propagateForwardsByTime(1.0); + + expected300[(time / 300) % size].add(values[i]); + expected600[(time / 600) % size].add(values[i]); + expected1800[(time / 1800) % size].add(values[i]); + expected3600[(time / 3600) % size].add(values[i]); + double factor{std::exp(-decayRate)}; + for (std::size_t j = 0; j < size; ++j) { + expected300[j].age(factor); + expected600[j].age(factor); + expected1800[j].age(factor); + expected3600[j].age(factor); + } + + const TFloatMeanAccumulatorVec& expected{[&](core_t::TTime) -> const TFloatMeanAccumulatorVec& { + if (time < static_cast(size * 300)) { + return expected300; + } else if (time < static_cast(size * 600)) { + return expected600; + } else if (time < static_cast(size * 1800)) { + return expected1800; + } + return expected3600; + }(time)}; + TFloatMeanAccumulatorVec 
actual{window.values()}; + + for (std::size_t j = 0; j < size; ++j) { + CPPUNIT_ASSERT_DOUBLES_EQUAL(maths::CBasicStatistics::count(expected[j]), + maths::CBasicStatistics::count(actual[j]), 1e-5); + CPPUNIT_ASSERT_DOUBLES_EQUAL(maths::CBasicStatistics::mean(expected[j]), + maths::CBasicStatistics::mean(actual[j]), 1e-5); + } + } + + LOG_DEBUG(<< "Testing overflow"); + + window.add(static_cast(size * 3600 + 1), 0.1); + CPPUNIT_ASSERT_EQUAL(static_cast(size * 3600), window.startTime()); + CPPUNIT_ASSERT_EQUAL(static_cast(size * 3900), window.endTime()); + + TFloatMeanAccumulatorVec expected(size); + expected[0].add(0.1); + TFloatMeanAccumulatorVec actual{window.values()}; + + CPPUNIT_ASSERT_EQUAL(core::CContainerPrinter::print(expected), + core::CContainerPrinter::print(actual)); +} + +void CExpandingWindowTest::testValuesMinusPrediction() { + // Test we get back the values we expect. + + maths::CExpandingWindow::TPredictor trend = [](core_t::TTime time) { + return 10.0 * std::sin(boost::math::double_constants::two_pi * + static_cast(time) / + static_cast(core::constants::DAY)); + }; + + core_t::TTime bucketLength{300}; + std::size_t size{336}; + double decayRate{0.01}; + + test::CRandomNumbers rng; + TDoubleVec noise; + rng.generateUniformSamples(0.0, 2.0, size, noise); + + maths::CExpandingWindow window{bucketLength, TTimeCRng{BUCKET_LENGTHS, 0, 4}, + size, decayRate, true}; + + TFloatMeanAccumulatorVec expected(size); for (core_t::TTime time = 0; time < static_cast(size) * bucketLength; time += bucketLength) { - double value{values[time / bucketLength]}; - origWindow.add(time, value); + window.add(time, trend(time) + noise[time / bucketLength]); + expected[time / bucketLength].add(noise[time / bucketLength]); } + TFloatMeanAccumulatorVec actual{window.valuesMinusPrediction(trend)}; - std::string origXml; - { - core::CRapidXmlStatePersistInserter inserter("root"); - origWindow.acceptPersistInserter(inserter); - inserter.toXml(origXml); + for (std::size_t i = 0; i < size; ++i) { + CPPUNIT_ASSERT_EQUAL(maths::CBasicStatistics::count(expected[i]), + maths::CBasicStatistics::count(actual[i])); + CPPUNIT_ASSERT_DOUBLES_EQUAL(maths::CBasicStatistics::mean(expected[i]), + maths::CBasicStatistics::mean(actual[i]), 1e-5); } - LOG_TRACE(<< "Window XML = " << origXml); - LOG_DEBUG(<< "Window XML size = " << origXml.size()); - - // Restore the XML into a new window. - { - core::CRapidXmlParser parser; - CPPUNIT_ASSERT(parser.parseStringIgnoreCdata(origXml)); - core::CRapidXmlStateRestoreTraverser traverser(parser); - maths::CExpandingWindow restoredWindow{ - bucketLength, TTimeCRng{BUCKET_LENGTHS, 0, 4}, size, decayRate}; - CPPUNIT_ASSERT_EQUAL( - true, traverser.traverseSubLevel(boost::bind(&maths::CExpandingWindow::acceptRestoreTraverser, - &restoredWindow, _1))); - - LOG_DEBUG(<< "orig checksum = " << origWindow.checksum() - << ", new checksum = " << restoredWindow.checksum()); - CPPUNIT_ASSERT_EQUAL(origWindow.checksum(), restoredWindow.checksum()); +} + +void CExpandingWindowTest::testPersistence() { + // Test persist and restore is idempotent. 
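+    // Check both the deflated and the raw storage formats.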
+ + core_t::TTime bucketLength{300}; + std::size_t size{336}; + double decayRate{0.01}; + + test::CRandomNumbers rng; + + for (auto compressed : {true, false}) { + maths::CExpandingWindow origWindow{ + bucketLength, TTimeCRng{BUCKET_LENGTHS, 0, 4}, size, decayRate, compressed}; + + TDoubleVec values; + rng.generateUniformSamples(0.0, 10.0, size, values); + for (core_t::TTime time = 0; time < static_cast(size) * bucketLength; + time += bucketLength) { + double value{values[time / bucketLength]}; + origWindow.add(time, value); + } + + std::string origXml; + { + core::CRapidXmlStatePersistInserter inserter("root"); + origWindow.acceptPersistInserter(inserter); + inserter.toXml(origXml); + } + LOG_TRACE(<< "Window XML = " << origXml); + LOG_DEBUG(<< "Window XML size = " << origXml.size()); + + // Restore the XML into a new window. + { + core::CRapidXmlParser parser; + CPPUNIT_ASSERT(parser.parseStringIgnoreCdata(origXml)); + core::CRapidXmlStateRestoreTraverser traverser(parser); + maths::CExpandingWindow restoredWindow{ + bucketLength, TTimeCRng{BUCKET_LENGTHS, 0, 4}, size, decayRate, compressed}; + CPPUNIT_ASSERT_EQUAL(true, traverser.traverseSubLevel(boost::bind( + &maths::CExpandingWindow::acceptRestoreTraverser, + &restoredWindow, _1))); + + LOG_DEBUG(<< "orig checksum = " << origWindow.checksum() + << ", new checksum = " << restoredWindow.checksum()); + CPPUNIT_ASSERT_EQUAL(origWindow.checksum(), restoredWindow.checksum()); + } } } CppUnit::Test* CExpandingWindowTest::suite() { CppUnit::TestSuite* suiteOfTests = new CppUnit::TestSuite("CExpandingWindowTest"); + suiteOfTests->addTest(new CppUnit::TestCaller( + "CExpandingWindowTest::testBasicUsage", &CExpandingWindowTest::testBasicUsage)); + suiteOfTests->addTest(new CppUnit::TestCaller( + "CExpandingWindowTest::testValuesMinusPrediction", + &CExpandingWindowTest::testValuesMinusPrediction)); suiteOfTests->addTest(new CppUnit::TestCaller( "CExpandingWindowTest::testPersistence", &CExpandingWindowTest::testPersistence)); diff --git a/lib/maths/unittest/CExpandingWindowTest.h b/lib/maths/unittest/CExpandingWindowTest.h index b7053c5dea..d5d2da67ca 100644 --- a/lib/maths/unittest/CExpandingWindowTest.h +++ b/lib/maths/unittest/CExpandingWindowTest.h @@ -11,6 +11,8 @@ class CExpandingWindowTest : public CppUnit::TestFixture { public: + void testBasicUsage(); + void testValuesMinusPrediction(); void testPersistence(); static CppUnit::Test* suite(); diff --git a/lib/maths/unittest/Main.cc b/lib/maths/unittest/Main.cc index 4c39742277..0ae687eea6 100644 --- a/lib/maths/unittest/Main.cc +++ b/lib/maths/unittest/Main.cc @@ -85,7 +85,6 @@ int main(int argc, const char** argv) { ml::test::CTestRunner runner(argc, argv); - runner.addTest(CExpandingWindowTest::suite()); runner.addTest(CAgglomerativeClustererTest::suite()); runner.addTest(CAssignmentTest::suite()); runner.addTest(CBasicStatisticsTest::suite()); @@ -101,6 +100,7 @@ int main(int argc, const char** argv) { runner.addTest(CDecayRateControllerTest::suite()); runner.addTest(CEqualWithToleranceTest::suite()); runner.addTest(CEntropySketchTest::suite()); + runner.addTest(CExpandingWindowTest::suite()); runner.addTest(CForecastTest::suite()); runner.addTest(CGammaRateConjugateTest::suite()); runner.addTest(CGramSchmidtTest::suite()); diff --git a/lib/model/CEventRateBucketGatherer.cc b/lib/model/CEventRateBucketGatherer.cc index 80ecf44a92..e3a9285296 100644 --- a/lib/model/CEventRateBucketGatherer.cc +++ b/lib/model/CEventRateBucketGatherer.cc @@ -6,11 +6,11 @@ #include -#include #include 
#include #include #include +#include #include #include @@ -1758,7 +1758,7 @@ void CUniqueStringFeatureData::populateInfoContentFeatureData(SEventRateFeatureD using TStrCRefVec = std::vector; featureData.s_InfluenceValues.clear(); - core::CCompressUtils compressor(true); + core::CDeflator compressor(true); try { TStrCRefVec strings; @@ -1773,7 +1773,7 @@ void CUniqueStringFeatureData::populateInfoContentFeatureData(SEventRateFeatureD }); std::size_t length = 0u; - if (compressor.compressedLength(true, length) == false) { + if (compressor.length(true, length) == false) { LOG_ERROR(<< "Failed to get compressed length"); compressor.reset(); } @@ -1796,7 +1796,7 @@ void CUniqueStringFeatureData::populateInfoContentFeatureData(SEventRateFeatureD compressor.addString(string); }); length = 0u; - if (compressor.compressedLength(true, length) == false) { + if (compressor.length(true, length) == false) { LOG_ERROR(<< "Failed to get compressed length"); compressor.reset(); } diff --git a/lib/model/unittest/CEventRatePopulationDataGathererTest.cc b/lib/model/unittest/CEventRatePopulationDataGathererTest.cc index 93aa099815..98035f4c4a 100644 --- a/lib/model/unittest/CEventRatePopulationDataGathererTest.cc +++ b/lib/model/unittest/CEventRatePopulationDataGathererTest.cc @@ -6,12 +6,12 @@ #include "CEventRatePopulationDataGathererTest.h" -#include #include #include #include #include #include +#include #include #include @@ -520,14 +520,14 @@ void CEventRatePopulationDataGathererTest::testCompressedLength() { TSizeSizePr key(iter->first, 0); const TStrSet& uniqueValues = iter->second; - core::CCompressUtils compressor(false); + core::CDeflator compressor(false); CPPUNIT_ASSERT_EQUAL( uniqueValues.size(), static_cast(std::count_if( uniqueValues.begin(), uniqueValues.end(), - boost::bind(&core::CCompressUtils::addString, &compressor, _1)))); + boost::bind(&core::CCompressUtil::addString, &compressor, _1)))); size_t length(0); - CPPUNIT_ASSERT(compressor.compressedLength(true, length)); + CPPUNIT_ASSERT(compressor.length(true, length)); expectedBucketCompressedLengthPerPerson[key] = length; } LOG_DEBUG(<< "Time " << time << " bucketCompressedLengthPerPerson "