From 9b96b4da28f12d76e4812c436e974f542d19812d Mon Sep 17 00:00:00 2001 From: Ivano Pagano Date: Mon, 7 Sep 2020 16:34:23 +0200 Subject: [PATCH 1/7] Remove compilation warnings for test deprecation and trivial correction --- .../data/tezos/TezosDataOperationsTest.scala | 60 +++++++++---------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/conseil-api/src/test/scala/tech/cryptonomic/conseil/api/routes/platform/data/tezos/TezosDataOperationsTest.scala b/conseil-api/src/test/scala/tech/cryptonomic/conseil/api/routes/platform/data/tezos/TezosDataOperationsTest.scala index 323c99e87..53a410675 100644 --- a/conseil-api/src/test/scala/tech/cryptonomic/conseil/api/routes/platform/data/tezos/TezosDataOperationsTest.scala +++ b/conseil-api/src/test/scala/tech/cryptonomic/conseil/api/routes/platform/data/tezos/TezosDataOperationsTest.scala @@ -1690,7 +1690,7 @@ class TezosDataOperationsTest ) } - "should aggregate with COUNT function" in { + "aggregate with COUNT function" in { val feesTmp = List( FeesRow(0, 2, 4, new Timestamp(0), "kind", forkId = Fork.mainForkId), FeesRow(0, 4, 8, new Timestamp(1), "kind", forkId = Fork.mainForkId), @@ -1725,7 +1725,7 @@ class TezosDataOperationsTest ) } - "should aggregate with MAX function" in { + "aggregate with MAX function" in { val feesTmp = List( FeesRow(0, 2, 4, new Timestamp(0), "kind", forkId = Fork.mainForkId), FeesRow(0, 4, 8, new Timestamp(1), "kind", forkId = Fork.mainForkId), @@ -1760,7 +1760,7 @@ class TezosDataOperationsTest ) } - "should aggregate with MIN function" in { + "aggregate with MIN function" in { val feesTmp = List( FeesRow(0, 2, 4, new Timestamp(0), "kind", forkId = Fork.mainForkId), FeesRow(0, 4, 8, new Timestamp(1), "kind", forkId = Fork.mainForkId), @@ -1795,7 +1795,7 @@ class TezosDataOperationsTest ) } - "should aggregate with SUM function" in { + "aggregate with SUM function" in { val feesTmp = List( FeesRow(0, 2, 4, new Timestamp(0), "kind", forkId = Fork.mainForkId), FeesRow(0, 4, 8, new Timestamp(1), "kind", forkId = Fork.mainForkId), @@ -1830,7 +1830,7 @@ class TezosDataOperationsTest ) } - "should aggregate with SUM function and order by SUM()" in { + "aggregate with SUM function and order by SUM()" in { val feesTmp = List( FeesRow(0, 2, 4, new Timestamp(0), "kind", forkId = Fork.mainForkId), FeesRow(0, 4, 8, new Timestamp(1), "kind", forkId = Fork.mainForkId), @@ -1865,7 +1865,7 @@ class TezosDataOperationsTest ) } - "should order correctly by the field not existing in query)" in { + "order correctly by the field not existing in query)" in { val feesTmp = List( FeesRow(0, 2, 4, new Timestamp(0), "kind", forkId = Fork.mainForkId), FeesRow(0, 4, 8, new Timestamp(1), "kind", forkId = Fork.mainForkId), @@ -1897,7 +1897,7 @@ class TezosDataOperationsTest ) } - "should correctly check use between in the timestamps" in { + "correctly check use between in the timestamps" in { val feesTmp = List( FeesRow(0, 2, 4, new Timestamp(0), "kind", forkId = Fork.mainForkId), FeesRow(0, 4, 8, new Timestamp(2), "kind", forkId = Fork.mainForkId), @@ -1936,7 +1936,7 @@ class TezosDataOperationsTest ) } - "should correctly execute BETWEEN operation using numeric comparison instead of lexicographical" in { + "correctly execute BETWEEN operation using numeric comparison instead of lexicographical" in { val feesTmp = List( FeesRow(0, 0, 0, new Timestamp(0), "kind", forkId = Fork.mainForkId), FeesRow(0, 0, 10, new Timestamp(3), "kind", forkId = Fork.mainForkId), @@ -1971,7 +1971,7 @@ class TezosDataOperationsTest result shouldBe 
List(Map("high" -> Some(2))) } - "should return correct query when asked for SQL" in { + "return correct query when asked for SQL" in { val predicates = List( Predicate( field = "medium", @@ -2022,7 +2022,7 @@ class TezosDataOperationsTest result shouldBe List(Map("sql" -> Some(expectedQuery))) } - "should aggregate with multiple aggregations on the same field" in { + "aggregate with multiple aggregations on the same field" in { val feesTmp = List( FeesRow(0, 2, 4, new Timestamp(0), "kind", forkId = Fork.mainForkId), FeesRow(0, 4, 8, new Timestamp(1), "kind", forkId = Fork.mainForkId), @@ -2058,7 +2058,7 @@ class TezosDataOperationsTest ) } - "should aggregate with single aggegation when there is only one field" in { + "aggregate with single aggegation when there is only one field" in { val feesTmp = List( FeesRow(0, 2, 4, new Timestamp(0), "kind", forkId = Fork.mainForkId), FeesRow(0, 4, 8, new Timestamp(1), "kind", forkId = Fork.mainForkId), @@ -2107,7 +2107,7 @@ class TezosDataOperationsTest ) } - "should aggregate with correct predicate field when aggregation is using predicate" in { + "aggregate with correct predicate field when aggregation is using predicate" in { val feesTmp = List( FeesRow(0, 2, 4, new Timestamp(0), "kind1", forkId = Fork.mainForkId), FeesRow(0, 4, 8, new Timestamp(1), "kind2", forkId = Fork.mainForkId), @@ -2164,7 +2164,7 @@ class TezosDataOperationsTest ) } - "should aggregate correctly with multiple aggregation fields with predicate" in { + "aggregate correctly with multiple aggregation fields with predicate" in { val feesTmp = List( FeesRow(0, 2, 4, new Timestamp(0), "kind1", forkId = Fork.mainForkId), FeesRow(0, 4, 8, new Timestamp(1), "kind2", forkId = Fork.mainForkId), @@ -2239,14 +2239,14 @@ class TezosDataOperationsTest ) } - "should aggregate with datePart aggregation" in { + "aggregate with datePart aggregation" in { val feesTmp = List( - FeesRow(0, 2, 4, new Timestamp(100, 0, 1, 0, 0, 0, 0), "kind", forkId = Fork.mainForkId), - FeesRow(0, 4, 8, new Timestamp(100, 0, 2, 0, 0, 0, 0), "kind", forkId = Fork.mainForkId), - FeesRow(0, 3, 4, new Timestamp(100, 0, 2, 0, 0, 0, 0), "kind", forkId = Fork.mainForkId), - FeesRow(0, 3, 4, new Timestamp(100, 0, 2, 0, 0, 0, 0), "kind", forkId = Fork.mainForkId), - FeesRow(0, 3, 4, new Timestamp(100, 0, 3, 0, 0, 0, 0), "kind", forkId = Fork.mainForkId), - FeesRow(0, 3, 4, new Timestamp(100, 0, 3, 0, 0, 0, 0), "kind", forkId = Fork.mainForkId) + FeesRow(0, 2, 4, Timestamp.valueOf("2000-01-01 00:00:00"), "kind", forkId = Fork.mainForkId), + FeesRow(0, 4, 8, Timestamp.valueOf("2000-01-02 00:00:00"), "kind", forkId = Fork.mainForkId), + FeesRow(0, 3, 4, Timestamp.valueOf("2000-01-02 00:00:00"), "kind", forkId = Fork.mainForkId), + FeesRow(0, 3, 4, Timestamp.valueOf("2000-01-02 00:00:00"), "kind", forkId = Fork.mainForkId), + FeesRow(0, 3, 4, Timestamp.valueOf("2000-01-03 00:00:00"), "kind", forkId = Fork.mainForkId), + FeesRow(0, 3, 4, Timestamp.valueOf("2000-01-03 00:00:00"), "kind", forkId = Fork.mainForkId)) ) val aggregate = List( @@ -2279,14 +2279,14 @@ class TezosDataOperationsTest } - "should map date with datePart aggregation when it is only type of aggregation" in { + "map date with datePart aggregation when it is only type of aggregation" in { val feesTmp = List( - FeesRow(0, 1, 4, new Timestamp(100, 0, 1, 0, 0, 0, 0), "kind", forkId = Fork.mainForkId), - FeesRow(0, 2, 8, new Timestamp(100, 0, 2, 0, 0, 0, 0), "kind", forkId = Fork.mainForkId), - FeesRow(0, 3, 4, new Timestamp(100, 0, 2, 0, 0, 0, 0), "kind", forkId 
= Fork.mainForkId), - FeesRow(0, 4, 4, new Timestamp(100, 0, 2, 0, 0, 0, 0), "kind", forkId = Fork.mainForkId), - FeesRow(0, 5, 4, new Timestamp(100, 0, 3, 0, 0, 0, 0), "kind", forkId = Fork.mainForkId), - FeesRow(0, 6, 4, new Timestamp(100, 0, 3, 0, 0, 0, 0), "kind", forkId = Fork.mainForkId) + FeesRow(0, 1, 4, Timestamp.valueOf("2000-01-01 00:00:00"), "kind",forkId = Fork.mainForkId), + FeesRow(0, 2, 8, Timestamp.valueOf("2000-01-02 00:00:00"), "kind",forkId = Fork.mainForkId), + FeesRow(0, 3, 4, Timestamp.valueOf("2000-01-02 00:00:00"), "kind",forkId = Fork.mainForkId), + FeesRow(0, 4, 4, Timestamp.valueOf("2000-01-02 00:00:00"), "kind",forkId = Fork.mainForkId), + FeesRow(0, 5, 4, Timestamp.valueOf("2000-01-03 00:00:00"), "kind",forkId = Fork.mainForkId), + FeesRow(0, 6, 4, Timestamp.valueOf("2000-01-03 00:00:00"), "kind",forkId = Fork.mainForkId) ) val populateAndTest = for { @@ -2317,7 +2317,7 @@ class TezosDataOperationsTest ) } - "should correctly use query on temporal table" in { + "correctly use query on temporal table" in { val accountsHistoryRow = AccountsHistoryRow( accountId = "id", @@ -2357,7 +2357,7 @@ class TezosDataOperationsTest } - "should get the balance of an account at a specific timestamp where there are multiple entities for given account_id" in { + "get the balance of an account at a specific timestamp where there are multiple entities for given account_id" in { val accountsHistoryRows = List( AccountsHistoryRow( @@ -2414,7 +2414,7 @@ class TezosDataOperationsTest ) } - "should get the balance of an account at a specific timestamp" in { + "get the balance of an account at a specific timestamp" in { val accountsHistoryRows = List( AccountsHistoryRow( From 1b95a3128b82b08964b8d81e2b83948f68f59a73 Mon Sep 17 00:00:00 2001 From: Ivano Pagano Date: Mon, 7 Sep 2020 16:35:11 +0200 Subject: [PATCH 2/7] Add a lower bound on fees' timestamps in the computation query --- .../common/testkit/util/RandomTestKit.scala | 2 +- .../tezos/TezosDatabaseOperations.scala | 39 +++++++++++++++++-- .../tezos/TezosDatabaseOperationsTest.scala | 12 ++++-- 3 files changed, 46 insertions(+), 7 deletions(-) diff --git a/conseil-common-testkit/src/main/scala/tech/cryptonomic/conseil/common/testkit/util/RandomTestKit.scala b/conseil-common-testkit/src/main/scala/tech/cryptonomic/conseil/common/testkit/util/RandomTestKit.scala index 5c6b00b4b..d88215bf8 100644 --- a/conseil-common-testkit/src/main/scala/tech/cryptonomic/conseil/common/testkit/util/RandomTestKit.scala +++ b/conseil-common-testkit/src/main/scala/tech/cryptonomic/conseil/common/testkit/util/RandomTestKit.scala @@ -36,7 +36,7 @@ trait RandomGenerationKit { //a stable timestamp reference if needed lazy val testReferenceTimestamp = - new Timestamp(testReferenceDateTime.toEpochSecond) + new Timestamp(testReferenceDateTime.toEpochSecond * 1000L) //creates pseudo-random strings of given length, based on an existing [[Random]] generator val alphaNumericGenerator = (random: Random) => random.alphanumeric.take(_: Int).mkString diff --git a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosDatabaseOperations.scala b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosDatabaseOperations.scala index d4e43b8b8..bf4a65809 100644 --- a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosDatabaseOperations.scala +++ b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosDatabaseOperations.scala @@ -1,7 +1,7 @@ package tech.cryptonomic.conseil.indexer.tezos import 
java.sql.Timestamp -import java.time.Instant +import java.time.{Instant, ZoneOffset} import cats.effect.Async import cats.implicits._ @@ -722,11 +722,41 @@ object TezosDatabaseOperations extends LazyLogging { * Given the operation kind, return range of fees and timestamp for that operation. * @param kind Operation kind * @param numberOfFeesAveraged How many values to use for statistics computations + * @param asOf When the computation is to be considered, by default uses the time of invocation * @return The average fees for a given operation kind, if it exists */ - def calculateAverageFees(kind: String, numberOfFeesAveraged: Int)( + def calculateAverageFees( + kind: String, + numberOfFeesAveraged: Int, + asOf: Instant = Instant.now() + )( implicit ec: ExecutionContext ): DBIO[Option[AverageFees]] = { + /* We need to limit the past timestamps for this computation to a reasonable value. + * Otherwise the query optimizer won't be able to efficiently use the indexing and + * will do a full table scan. + * + * This is what we know now: + * - each cycle bakes 4096 blocks + * - a cycle takes around 2-3 days to run + * - each block stores a variable number of transactions, down-to the min of 1. + * + * We can make conservative computations to figure out how far in the past we need to go, + * to guarantee a [[numberOfFeesAveraged]] values. + * + * We can assume a single transaction per block (1 trans/block), hence we need numberOfFeesAveraged blocks. + * Assuming 3 days to get 4096 blocks we have 4096/3 blocks-a-day at worst, which is ~1365 blocks. + * Therefore we want to look back to numberOfFeesAveraged/1365 days in the past to guarantee the required fees counts. + */ + val blocksPerDay = 1365 + val daysToPastHorizon = 1 + (numberOfFeesAveraged / blocksPerDay) //round-up for integer division + val secsPerDay = 60L * 60L * 24L //secs * mins * hours + val secsToPastHorizon = daysToPastHorizon.toLong * secsPerDay + + logger.info( + s"Computing fees starting from $daysToPastHorizon days before $asOf, averaging over $numberOfFeesAveraged values" + ) + type Cycle = Int type Fee = BigDecimal type FeeDetails = (Option[Fee], Timestamp, Option[Cycle], BlockLevel) @@ -746,11 +776,14 @@ object TezosDatabaseOperations extends LazyLogging { AverageFees(max(m - s, 0), m, m + s, ts, kind, cycle, level) } + val timestampLowerBound = + Timestamp.from(asOf.atOffset(ZoneOffset.UTC).minusSeconds(secsToPastHorizon).toInstant()) + val opQuery = Tables.Operations .filter(op => op.kind === kind && op.invalidatedAsof.isEmpty) + .filter(_.timestamp >= timestampLowerBound) .map(o => (o.fee, o.timestamp, o.cycle, o.blockLevel)) - .distinct .sortBy { case (_, ts, _, _) => ts.desc } .take(numberOfFeesAveraged) .result diff --git a/conseil-lorre/src/test/scala/tech/cryptonomic/conseil/indexer/tezos/TezosDatabaseOperationsTest.scala b/conseil-lorre/src/test/scala/tech/cryptonomic/conseil/indexer/tezos/TezosDatabaseOperationsTest.scala index dfde812ac..bf0a2fc65 100644 --- a/conseil-lorre/src/test/scala/tech/cryptonomic/conseil/indexer/tezos/TezosDatabaseOperationsTest.scala +++ b/conseil-lorre/src/test/scala/tech/cryptonomic/conseil/indexer/tezos/TezosDatabaseOperationsTest.scala @@ -968,7 +968,9 @@ class TezosDatabaseOperationsTest ) //check - val feesCalculation = sut.calculateAverageFees(ops.head.kind, feesToConsider) + //we specify when the computation of fees needs be done, to have the test block reference time in range + val feesCalculation = + sut.calculateAverageFees(ops.head.kind, feesToConsider, asOf = 
testReferenceDateTime.toInstant()) dbHandler.run(feesCalculation).futureValue.value shouldEqual expected @@ -992,7 +994,9 @@ class TezosDatabaseOperationsTest dbHandler.run(populate).futureValue should have size (fees.size) //check - val feesCalculation = sut.calculateAverageFees("undefined", feesToConsider) + //we specify when the computation of fees needs be done, to have the test block reference time in range + val feesCalculation = + sut.calculateAverageFees("undefined", feesToConsider, asOf = testReferenceDateTime.toInstant()) dbHandler.run(feesCalculation).futureValue shouldBe None @@ -1038,7 +1042,9 @@ class TezosDatabaseOperationsTest level = 0 ) //check - val feesCalculation = sut.calculateAverageFees(selection.head.kind, feesToConsider) + //we specify when the computation of fees needs be done, to have the test block reference time in range + val feesCalculation = + sut.calculateAverageFees(selection.head.kind, feesToConsider, asOf = testReferenceDateTime.toInstant()) dbHandler.run(feesCalculation).futureValue.value shouldEqual expected From 66332f3ae1ba51678262d266bf82d81ad35b9f08 Mon Sep 17 00:00:00 2001 From: Ivano Pagano Date: Wed, 7 Oct 2020 17:25:00 +0200 Subject: [PATCH 3/7] Fix rebase error --- .../data/tezos/TezosDataOperationsTest.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/conseil-api/src/test/scala/tech/cryptonomic/conseil/api/routes/platform/data/tezos/TezosDataOperationsTest.scala b/conseil-api/src/test/scala/tech/cryptonomic/conseil/api/routes/platform/data/tezos/TezosDataOperationsTest.scala index 53a410675..b28deeda3 100644 --- a/conseil-api/src/test/scala/tech/cryptonomic/conseil/api/routes/platform/data/tezos/TezosDataOperationsTest.scala +++ b/conseil-api/src/test/scala/tech/cryptonomic/conseil/api/routes/platform/data/tezos/TezosDataOperationsTest.scala @@ -2246,7 +2246,7 @@ class TezosDataOperationsTest FeesRow(0, 3, 4, Timestamp.valueOf("2000-01-02 00:00:00"), "kind", forkId = Fork.mainForkId), FeesRow(0, 3, 4, Timestamp.valueOf("2000-01-02 00:00:00"), "kind", forkId = Fork.mainForkId), FeesRow(0, 3, 4, Timestamp.valueOf("2000-01-03 00:00:00"), "kind", forkId = Fork.mainForkId), - FeesRow(0, 3, 4, Timestamp.valueOf("2000-01-03 00:00:00"), "kind", forkId = Fork.mainForkId)) + FeesRow(0, 3, 4, Timestamp.valueOf("2000-01-03 00:00:00"), "kind", forkId = Fork.mainForkId) ) val aggregate = List( @@ -2281,12 +2281,12 @@ class TezosDataOperationsTest "map date with datePart aggregation when it is only type of aggregation" in { val feesTmp = List( - FeesRow(0, 1, 4, Timestamp.valueOf("2000-01-01 00:00:00"), "kind",forkId = Fork.mainForkId), - FeesRow(0, 2, 8, Timestamp.valueOf("2000-01-02 00:00:00"), "kind",forkId = Fork.mainForkId), - FeesRow(0, 3, 4, Timestamp.valueOf("2000-01-02 00:00:00"), "kind",forkId = Fork.mainForkId), - FeesRow(0, 4, 4, Timestamp.valueOf("2000-01-02 00:00:00"), "kind",forkId = Fork.mainForkId), - FeesRow(0, 5, 4, Timestamp.valueOf("2000-01-03 00:00:00"), "kind",forkId = Fork.mainForkId), - FeesRow(0, 6, 4, Timestamp.valueOf("2000-01-03 00:00:00"), "kind",forkId = Fork.mainForkId) + FeesRow(0, 1, 4, Timestamp.valueOf("2000-01-01 00:00:00"), "kind", forkId = Fork.mainForkId), + FeesRow(0, 2, 8, Timestamp.valueOf("2000-01-02 00:00:00"), "kind", forkId = Fork.mainForkId), + FeesRow(0, 3, 4, Timestamp.valueOf("2000-01-02 00:00:00"), "kind", forkId = Fork.mainForkId), + FeesRow(0, 4, 4, Timestamp.valueOf("2000-01-02 00:00:00"), "kind", forkId = Fork.mainForkId), + FeesRow(0, 5, 4, 
Timestamp.valueOf("2000-01-03 00:00:00"), "kind", forkId = Fork.mainForkId), + FeesRow(0, 6, 4, Timestamp.valueOf("2000-01-03 00:00:00"), "kind", forkId = Fork.mainForkId) ) val populateAndTest = for { From a84258eb4db741c1a1a0b207743e7f4137612d26 Mon Sep 17 00:00:00 2001 From: Ivano Pagano Date: Wed, 7 Oct 2020 19:08:53 +0200 Subject: [PATCH 4/7] Define fees averaging in term of configured time-window * update the tests * fix a flaky test --- .../src/main/resources/application.conf | 4 +-- .../indexer/config/LorreConfiguration.scala | 2 +- .../tezos/TezosDatabaseOperations.scala | 33 +++++-------------- .../indexer/tezos/TezosFeeOperations.scala | 18 +++++++--- .../conseil/indexer/tezos/TezosIndexer.scala | 2 +- .../tezos/TezosDatabaseOperationsTest.scala | 12 ++++--- .../TezosForkDatabaseOperationsTest.scala | 19 ++++++----- 7 files changed, 45 insertions(+), 45 deletions(-) diff --git a/conseil-lorre/src/main/resources/application.conf b/conseil-lorre/src/main/resources/application.conf index c6cc43867..cc3fc3a9c 100644 --- a/conseil-lorre/src/main/resources/application.conf +++ b/conseil-lorre/src/main/resources/application.conf @@ -4,8 +4,8 @@ lorre { bootup-connection-check-timeout: 10 s #Used to make sure Lorre records average fees every n iterations fee-update-interval: 20 - #Used to select how many fees should be averaged together - number-of-fees-averaged: 1000 + #Used to select how many fees should be averaged together, based on a time-window + fees-average-time-window: 1 day #Docs missing depth: newest depth: ${?CONSEIL_LORRE_DEPTH} diff --git a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/config/LorreConfiguration.scala b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/config/LorreConfiguration.scala index 22657ab9f..b093b051a 100644 --- a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/config/LorreConfiguration.scala +++ b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/config/LorreConfiguration.scala @@ -17,7 +17,7 @@ final case class LorreConfiguration( bootupRetryInterval: FiniteDuration, bootupConnectionCheckTimeout: FiniteDuration, feeUpdateInterval: Int, - numberOfFeesAveraged: Int, + feesAverageTimeWindow: FiniteDuration, depth: Depth, headHash: Option[String], chainEvents: List[ChainEvent], diff --git a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosDatabaseOperations.scala b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosDatabaseOperations.scala index bf4a65809..8708ad186 100644 --- a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosDatabaseOperations.scala +++ b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosDatabaseOperations.scala @@ -718,16 +718,16 @@ object TezosDatabaseOperations extends LazyLogging { Tables.Bakers.insertOrUpdateAll(bakers) } - /** - * Given the operation kind, return range of fees and timestamp for that operation. - * @param kind Operation kind - * @param numberOfFeesAveraged How many values to use for statistics computations - * @param asOf When the computation is to be considered, by default uses the time of invocation - * @return The average fees for a given operation kind, if it exists + /** Given the operation kind, return range of fees and timestamp for that operation. 
+ * + * @param kind operation kind + * @param daysPast how many values to use for statistics computations, as a time-window + * @param asOf when the computation is to be considered, by default uses the time of invocation + * @return the average fees for a given operation kind, if it exists */ def calculateAverageFees( kind: String, - numberOfFeesAveraged: Int, + daysPast: Int, asOf: Instant = Instant.now() )( implicit ec: ExecutionContext @@ -735,26 +735,12 @@ object TezosDatabaseOperations extends LazyLogging { /* We need to limit the past timestamps for this computation to a reasonable value. * Otherwise the query optimizer won't be able to efficiently use the indexing and * will do a full table scan. - * - * This is what we know now: - * - each cycle bakes 4096 blocks - * - a cycle takes around 2-3 days to run - * - each block stores a variable number of transactions, down-to the min of 1. - * - * We can make conservative computations to figure out how far in the past we need to go, - * to guarantee a [[numberOfFeesAveraged]] values. - * - * We can assume a single transaction per block (1 trans/block), hence we need numberOfFeesAveraged blocks. - * Assuming 3 days to get 4096 blocks we have 4096/3 blocks-a-day at worst, which is ~1365 blocks. - * Therefore we want to look back to numberOfFeesAveraged/1365 days in the past to guarantee the required fees counts. */ - val blocksPerDay = 1365 - val daysToPastHorizon = 1 + (numberOfFeesAveraged / blocksPerDay) //round-up for integer division val secsPerDay = 60L * 60L * 24L //secs * mins * hours - val secsToPastHorizon = daysToPastHorizon.toLong * secsPerDay + val secsToPastHorizon = daysPast * secsPerDay logger.info( - s"Computing fees starting from $daysToPastHorizon days before $asOf, averaging over $numberOfFeesAveraged values" + s"Computing fees starting from $daysPast days before $asOf, averaging over all values in the range" ) type Cycle = Int @@ -785,7 +771,6 @@ object TezosDatabaseOperations extends LazyLogging { .filter(_.timestamp >= timestampLowerBound) .map(o => (o.fee, o.timestamp, o.cycle, o.blockLevel)) .sortBy { case (_, ts, _, _) => ts.desc } - .take(numberOfFeesAveraged) .result opQuery.map { timestampedFees => diff --git a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosFeeOperations.scala b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosFeeOperations.scala index c900bfaa3..8ae43c5a6 100644 --- a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosFeeOperations.scala +++ b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosFeeOperations.scala @@ -6,6 +6,7 @@ import tech.cryptonomic.conseil.indexer.tezos.{TezosDatabaseOperations => TezosD import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} +import scala.concurrent.duration.FiniteDuration /** * Helper classes and functions used for average fee calculations. @@ -26,16 +27,23 @@ private[tezos] object TezosFeeOperations extends LazyLogging { "endorsement" ) - /** - * Calculates average fees for each operation kind and stores them into a fees table. + /** Calculates average fees for each operation kind and stores them into a fees table. + * The computation will use a limited number of fees, as the result of a selection window in days. + * Only blocks belonging within such window in the past, relative to the calculation moment, will be considered. 
+ * * @param numberOfFeesAveraged a limit on how many of the latest fee values will be used for averaging - * @param ex the needed ExecutionContext to combine multiple database operations + * @param selectionWindow the max number of days back from the current block timestamp to use when averaging + * @param ec the needed ExecutionContext to combine multiple database operations * @return a future result of the number of rows stored to db, if supported by the driver */ - def processTezosAverageFees(numberOfFeesAveraged: Int)(implicit ex: ExecutionContext): Future[Option[Int]] = { + def processTezosAverageFees(selectionWindow: FiniteDuration)(implicit ec: ExecutionContext): Future[Option[Int]] = { logger.info("Processing latest Tezos fee data...") + + //partially apply the fixed window to get a function that only uses the kind + val calculateForWindow = TezosDb.calculateAverageFees(_: String, daysPast = selectionWindow.toDays.toInt) + val computeAndStore = for { - fees <- DBIOAction.sequence(operationKinds.map(TezosDb.calculateAverageFees(_, numberOfFeesAveraged))) + fees <- DBIOAction.sequence(operationKinds.map(calculateForWindow)) dbWrites <- TezosDb.writeFees(fees.collect { case Some(fee) => fee }) } yield dbWrites diff --git a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosIndexer.scala b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosIndexer.scala index 37b855676..4e3570050 100644 --- a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosIndexer.scala +++ b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosIndexer.scala @@ -100,7 +100,7 @@ class TezosIndexer private ( ) _ <- processTezosBlocks(maxLevel) _ <- if (iteration % lorreConf.feeUpdateInterval == 0) - TezosFeeOperations.processTezosAverageFees(lorreConf.numberOfFeesAveraged) + TezosFeeOperations.processTezosAverageFees(lorreConf.feesAverageTimeWindow) else noOp _ <- rightsProcessor.updateRightsTimestamps() diff --git a/conseil-lorre/src/test/scala/tech/cryptonomic/conseil/indexer/tezos/TezosDatabaseOperationsTest.scala b/conseil-lorre/src/test/scala/tech/cryptonomic/conseil/indexer/tezos/TezosDatabaseOperationsTest.scala index bf0a2fc65..a67381279 100644 --- a/conseil-lorre/src/test/scala/tech/cryptonomic/conseil/indexer/tezos/TezosDatabaseOperationsTest.scala +++ b/conseil-lorre/src/test/scala/tech/cryptonomic/conseil/indexer/tezos/TezosDatabaseOperationsTest.scala @@ -44,7 +44,7 @@ class TezosDatabaseOperationsTest val sut = TezosDatabaseOperations val indexed = new TezosIndexedDataOperations(dbHandler) - val feesToConsider = 1000 + val feesSelectionWindowInDays = 100 "use the right collation" in { val ordered = @@ -970,7 +970,7 @@ class TezosDatabaseOperationsTest //check //we specify when the computation of fees needs be done, to have the test block reference time in range val feesCalculation = - sut.calculateAverageFees(ops.head.kind, feesToConsider, asOf = testReferenceDateTime.toInstant()) + sut.calculateAverageFees(ops.head.kind, feesSelectionWindowInDays, asOf = testReferenceDateTime.toInstant()) dbHandler.run(feesCalculation).futureValue.value shouldEqual expected @@ -996,7 +996,7 @@ class TezosDatabaseOperationsTest //check //we specify when the computation of fees needs be done, to have the test block reference time in range val feesCalculation = - sut.calculateAverageFees("undefined", feesToConsider, asOf = testReferenceDateTime.toInstant()) + sut.calculateAverageFees("undefined", feesSelectionWindowInDays, asOf = 
testReferenceDateTime.toInstant()) dbHandler.run(feesCalculation).futureValue shouldBe None @@ -1044,7 +1044,11 @@ class TezosDatabaseOperationsTest //check //we specify when the computation of fees needs be done, to have the test block reference time in range val feesCalculation = - sut.calculateAverageFees(selection.head.kind, feesToConsider, asOf = testReferenceDateTime.toInstant()) + sut.calculateAverageFees( + selection.head.kind, + feesSelectionWindowInDays, + asOf = testReferenceDateTime.toInstant() + ) dbHandler.run(feesCalculation).futureValue.value shouldEqual expected diff --git a/conseil-lorre/src/test/scala/tech/cryptonomic/conseil/indexer/tezos/TezosForkDatabaseOperationsTest.scala b/conseil-lorre/src/test/scala/tech/cryptonomic/conseil/indexer/tezos/TezosForkDatabaseOperationsTest.scala index 78935609a..1d558b3dc 100644 --- a/conseil-lorre/src/test/scala/tech/cryptonomic/conseil/indexer/tezos/TezosForkDatabaseOperationsTest.scala +++ b/conseil-lorre/src/test/scala/tech/cryptonomic/conseil/indexer/tezos/TezosForkDatabaseOperationsTest.scala @@ -527,7 +527,7 @@ class TezosForkDatabaseOperationsTest _ <- Tables.Blocks += invalidBlock _ <- Tables.OperationGroups += invalidGroup Some(stored) <- Tables.Operations ++= invalidRows - loaded <- sut.calculateAverageFees(kind = "transaction", numberOfFeesAveraged = invalidRows.size) + loaded <- sut.calculateAverageFees(kind = "transaction", daysPast = 10) } yield (stored, loaded) val (stored, loaded) = dbHandler.run(populateAndFetch).futureValue @@ -1400,8 +1400,7 @@ class TezosForkDatabaseOperationsTest val (validRows, forkLevel, invalidation, fork) = { val generator = for { //duplicate ids will fail to save on the db for violation of the PK uniqueness - rows <- Gen - .nonEmptyListOf(arbitrary[DBSafe[ProcessedChainEventsRow]]) + rows <- nonConflictingArbitrary[DBSafe[ProcessedChainEventsRow]] level <- Gen.posNum[Long] instant <- TezosDataGenerationKit.instantGenerator id <- arbitrary[ju.UUID] @@ -1429,11 +1428,8 @@ class TezosForkDatabaseOperationsTest /* we expect to have some invalidation and that the fork level will discriminate */ invalidCount shouldBe >(0) - /* we also want non-empty results to verify */ - loaded should not be empty - /* we expect no more events after the fork */ - loaded.exists(_.eventLevel >= forkLevel) shouldBe false + (loaded.isEmpty || loaded.forall(_.eventLevel < forkLevel)) shouldBe true info(s"resulting in ${loaded.size} remaining elements after the invalidation") } @@ -1464,7 +1460,12 @@ class TezosForkDatabaseOperationsTest * corresponds to the single method of HasKey, i.e. 
getKey */ implicit def forkValidWithKey[PK, E](implicit hasKey: HasKey[E, PK]): HasKey[ForkValid[E], PK] = { - case ForkValid(entity) => implicitly[HasKey[E, PK]].getKey(entity) + case ForkValid(entity) => hasKey.getKey(entity) + } + + /* will provide an implicit HasKey for an entity wrapped by DBSafe, like the one for ForkValid */ + implicit def dbSafeWithKey[PK, E](implicit hasKey: HasKey[E, PK]): HasKey[DBSafe[E], PK] = { + case DBSafe(entity) => hasKey.getKey(entity) } /* There we provide all the key extractors for known db entites, again using functional interfaces shortcut */ @@ -1480,6 +1481,8 @@ class TezosForkDatabaseOperationsTest implicit val tokenBalancesRowHasKey: HasKey[TokenBalancesRow, String] = _.address implicit val governanceRowHasKey: HasKey[GovernanceRow, (String, String, String)] = gov => (gov.blockHash, gov.proposalHash, gov.votingPeriodKind) + implicit val processedChainEventsRowHasKey: HasKey[ProcessedChainEventsRow, (Long, String)] = + ev => (ev.eventLevel, ev.eventType) } From e345069a20ef94229a39f25da77be934835d532d Mon Sep 17 00:00:00 2001 From: Ivano Pagano Date: Fri, 9 Oct 2020 18:56:42 +0200 Subject: [PATCH 5/7] Basic implementation of fee stats using postgres functions, half-working * convert the tests * the test for empty results fails, as the db-query is not giving correct results --- .../common/sql/CustomProfileExtension.scala | 16 +++- .../tezos/TezosDatabaseOperations.scala | 80 +++++++++++++++++-- .../tezos/TezosDatabaseOperationsTest.scala | 20 +++-- 3 files changed, 103 insertions(+), 13 deletions(-) diff --git a/conseil-common/src/main/scala/tech/cryptonomic/conseil/common/sql/CustomProfileExtension.scala b/conseil-common/src/main/scala/tech/cryptonomic/conseil/common/sql/CustomProfileExtension.scala index 1c61965e5..70aa9acbc 100644 --- a/conseil-common/src/main/scala/tech/cryptonomic/conseil/common/sql/CustomProfileExtension.scala +++ b/conseil-common/src/main/scala/tech/cryptonomic/conseil/common/sql/CustomProfileExtension.scala @@ -3,12 +3,26 @@ package tech.cryptonomic.conseil.common.sql import com.github.tminglei.slickpg.ExPostgresProfile import slick.basic.Capability import slick.jdbc.JdbcCapabilities +import com.github.tminglei.slickpg.agg.PgAggFuncSupport /** Custom postgres profile for enabling `insertOrUpdateAll` */ -trait CustomProfileExtension extends ExPostgresProfile { +trait CustomProfileExtension extends ExPostgresProfile with PgAggFuncSupport { // Add back `capabilities.insertOrUpdate` to enable native `upsert` support; for postgres 9.5+ override protected def computeCapabilities: Set[Capability] = super.computeCapabilities + JdbcCapabilities.insertOrUpdate + + /** provides access to postgres general aggregations */ + lazy val generalAggregations = new GeneralAggFunctions {} + + /** provides access to postgres statistics aggregations */ + lazy val statisticsAggregations = new StatisticsAggFunctions {} + + /** provides access to postgres ordered-set aggregations */ + lazy val orderedSetAggregations = new OrderedSetAggFunctions {} + + /** provides access to postgres hypothetical-set aggregations */ + lazy val hypotheticalSetAggregations = new HypotheticalSetAggFunctions {} + } object CustomProfileExtension extends CustomProfileExtension diff --git a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosDatabaseOperations.scala b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosDatabaseOperations.scala index 8708ad186..e6dc85094 100644 --- 
a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosDatabaseOperations.scala +++ b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosDatabaseOperations.scala @@ -26,7 +26,7 @@ import tech.cryptonomic.conseil.indexer.sql.DefaultDatabaseOperations._ import scala.collection.immutable.Queue import scala.concurrent.{ExecutionContext, Future} -import scala.math.{ceil, max} +import scala.math import scala.util.{Failure, Success} import tech.cryptonomic.conseil.indexer.tezos.TezosGovernanceOperations.GovernanceAggregate import java.{util => ju} @@ -756,10 +756,10 @@ object TezosDatabaseOperations extends LazyLogging { val values = fees.map { case (fee, _, _, _) => fee.map(_.toDouble).getOrElse(0.0) } - val m: Int = ceil(mean(values)).toInt - val s: Int = ceil(stdev(values)).toInt + val m: Int = math.ceil(mean(values)).toInt + val s: Int = math.ceil(stdev(values)).toInt - AverageFees(max(m - s, 0), m, m + s, ts, kind, cycle, level) + AverageFees(math.max(m - s, 0), m, m + s, ts, kind, cycle, level) } val timestampLowerBound = @@ -771,9 +771,10 @@ object TezosDatabaseOperations extends LazyLogging { .filter(_.timestamp >= timestampLowerBound) .map(o => (o.fee, o.timestamp, o.cycle, o.blockLevel)) .sortBy { case (_, ts, _, _) => ts.desc } - .result - opQuery.map { timestampedFees => + opQuery.result.statements.foreach(stmt => println(s"raising fees as $stmt")) + + opQuery.result.map { timestampedFees => timestampedFees.headOption.map { case (_, latest, cycle, level) => computeAverage(latest, cycle, level, timestampedFees) @@ -781,6 +782,73 @@ object TezosDatabaseOperations extends LazyLogging { } } + object FeesStatistics { + import CustomProfileExtension.api._ + import CustomProfileExtension.generalAggregations.{avg, max} + import CustomProfileExtension.statisticsAggregations.{stdDevPop} + private val zeroBD = BigDecimal.exact(0) + + def stats(kind: Rep[String], lowBound: Rep[Timestamp]) = { + val baseQuery = Tables.Operations + .filter(op => op.kind === kind && op.invalidatedAsof.isEmpty && op.timestamp >= lowBound) + + baseQuery + .map( + it => + ( + avg(it.fee.getOrElse(zeroBD).?), + stdDevPop(it.fee.getOrElse(zeroBD).?), + max(it.timestamp.?), + max(it.cycle).?, + max(it.blockLevel.?) + ) + ) + + } + val feesStatsQuery = Compiled(stats _) + + def calculateAverage(kind: String, daysPast: Int, asOf: Instant = Instant.now())( + implicit ec: ExecutionContext + ): DBIO[Option[AverageFees]] = { + + /* We need to limit the past timestamps for this computation to a reasonable value. + * Otherwise the query optimizer won't be able to efficiently use the indexing and + * will do a full table scan. + */ + + logger.info( + s"Computing fees starting from $daysPast days before $asOf, averaging over all values in the range" + ) + + val timestampLowerBound = + Timestamp.from( + asOf + .atOffset(ZoneOffset.UTC) + .minusDays(daysPast) + .toInstant() + ) + + stats(kind, timestampLowerBound).result.headOption.map { rows => + rows.map { + case (mean, stddev, ts, cycle, level) => + val mu = math.ceil(mean.toDouble).toInt + val sigma = math.ceil(stddev.toDouble).toInt + AverageFees( + low = math.max(mu - sigma, 0), + medium = mu, + high = mu + sigma, + timestamp = ts, + kind = kind, + cycle = cycle, + level = level + ) + } + } + + } + + } + /** Returns all levels that have seen a custom event processing, e.g. 
* - auto-refresh of all accounts after the babylon protocol amendment * diff --git a/conseil-lorre/src/test/scala/tech/cryptonomic/conseil/indexer/tezos/TezosDatabaseOperationsTest.scala b/conseil-lorre/src/test/scala/tech/cryptonomic/conseil/indexer/tezos/TezosDatabaseOperationsTest.scala index a67381279..04d4d280e 100644 --- a/conseil-lorre/src/test/scala/tech/cryptonomic/conseil/indexer/tezos/TezosDatabaseOperationsTest.scala +++ b/conseil-lorre/src/test/scala/tech/cryptonomic/conseil/indexer/tezos/TezosDatabaseOperationsTest.scala @@ -9,7 +9,7 @@ import org.scalatest.OptionValues import org.scalatest.wordspec.AnyWordSpec import org.scalatest.matchers.should.Matchers import org.scalacheck.Arbitrary.arbitrary -import slick.jdbc.PostgresProfile.api._ +import tech.cryptonomic.conseil.common.sql.CustomProfileExtension.api._ import tech.cryptonomic.conseil.common.testkit.InMemoryDatabase import tech.cryptonomic.conseil.common.testkit.util.RandomSeed import tech.cryptonomic.conseil.common.tezos.{Tables, TezosOptics} @@ -970,7 +970,11 @@ class TezosDatabaseOperationsTest //check //we specify when the computation of fees needs be done, to have the test block reference time in range val feesCalculation = - sut.calculateAverageFees(ops.head.kind, feesSelectionWindowInDays, asOf = testReferenceDateTime.toInstant()) + sut.FeesStatistics.calculateAverage( + ops.head.kind, + feesSelectionWindowInDays, + asOf = testReferenceDateTime.toInstant() + ) dbHandler.run(feesCalculation).futureValue.value shouldEqual expected @@ -996,9 +1000,13 @@ class TezosDatabaseOperationsTest //check //we specify when the computation of fees needs be done, to have the test block reference time in range val feesCalculation = - sut.calculateAverageFees("undefined", feesSelectionWindowInDays, asOf = testReferenceDateTime.toInstant()) + sut.FeesStatistics.calculateAverage( + "undefined", + feesSelectionWindowInDays, + asOf = testReferenceDateTime.toInstant() + ) - dbHandler.run(feesCalculation).futureValue shouldBe None + dbHandler.run(feesCalculation).futureValue shouldBe empty } @@ -1044,8 +1052,8 @@ class TezosDatabaseOperationsTest //check //we specify when the computation of fees needs be done, to have the test block reference time in range val feesCalculation = - sut.calculateAverageFees( - selection.head.kind, + sut.FeesStatistics.calculateAverage( + ops.head.kind, feesSelectionWindowInDays, asOf = testReferenceDateTime.toInstant() ) From 9f23d68a4bae3ac0b9d318e0276c433a41e3f7dd Mon Sep 17 00:00:00 2001 From: Ivano Pagano Date: Mon, 12 Oct 2020 18:50:03 +0200 Subject: [PATCH 6/7] Compute statistics for fees as part of the db query --- .../common/sql/CustomProfileExtension.scala | 76 ++++++++++- .../conseil/common/util/MathUtil.scala | 38 ------ .../conseil/common/util/MathUtilTest.scala | 22 ---- .../tezos/TezosDatabaseOperations.scala | 122 +++++------------- .../indexer/tezos/TezosFeeOperations.scala | 8 +- .../tezos/TezosDatabaseOperationsTest.scala | 36 +----- .../TezosForkDatabaseOperationsTest.scala | 2 +- 7 files changed, 109 insertions(+), 195 deletions(-) delete mode 100644 conseil-common/src/main/scala/tech/cryptonomic/conseil/common/util/MathUtil.scala delete mode 100644 conseil-common/src/test/scala/tech/cryptonomic/conseil/common/util/MathUtilTest.scala diff --git a/conseil-common/src/main/scala/tech/cryptonomic/conseil/common/sql/CustomProfileExtension.scala b/conseil-common/src/main/scala/tech/cryptonomic/conseil/common/sql/CustomProfileExtension.scala index 70aa9acbc..77bfbaf74 100644 --- 
a/conseil-common/src/main/scala/tech/cryptonomic/conseil/common/sql/CustomProfileExtension.scala +++ b/conseil-common/src/main/scala/tech/cryptonomic/conseil/common/sql/CustomProfileExtension.scala @@ -1,28 +1,94 @@ package tech.cryptonomic.conseil.common.sql import com.github.tminglei.slickpg.ExPostgresProfile +import slick.ast._ +import slick.lifted.{Query, Rep} import slick.basic.Capability import slick.jdbc.JdbcCapabilities import com.github.tminglei.slickpg.agg.PgAggFuncSupport /** Custom postgres profile for enabling `insertOrUpdateAll` */ trait CustomProfileExtension extends ExPostgresProfile with PgAggFuncSupport { + // Add back `capabilities.insertOrUpdate` to enable native `upsert` support; for postgres 9.5+ override protected def computeCapabilities: Set[Capability] = super.computeCapabilities + JdbcCapabilities.insertOrUpdate - /** provides access to postgres general aggregations */ + /* The following values can be used via imports to add all aggregations supported + * by pg-slick, which covers all postgres defined ones. + * e.g. + * {{{ + * import CustomProfileExtension.statisticsAggregations._ + * }}} + * + * There is a very strong limitation though, the current implementation + * doesn't support aggregations combined with grouping of other columns. + * + * see the related issue here: https://github.com/tminglei/slick-pg/issues/289 + * To overcome this limitation, we introduced an additional extension of + * slick-provided aggregations in the CustomApi object. + * The latter is a totally different work-around and can't be used + * with the pg-slick aggregations, as the two API designs are not compatible. + */ + /** provides access to postgres general aggregations via pg-slick */ lazy val generalAggregations = new GeneralAggFunctions {} - /** provides access to postgres statistics aggregations */ + /** provides access to postgres statistics aggregations via pg-slick */ lazy val statisticsAggregations = new StatisticsAggFunctions {} - /** provides access to postgres ordered-set aggregations */ + /** provides access to postgres ordered-set aggregations via pg-slick */ lazy val orderedSetAggregations = new OrderedSetAggFunctions {} - /** provides access to postgres hypothetical-set aggregations */ + /** provides access to postgres hypothetical-set aggregations via pg-slick */ lazy val hypotheticalSetAggregations = new HypotheticalSetAggFunctions {} + override val api: API = CustomApi + + /** Provide additional features from api import */ + object CustomApi extends API { + + /* The following conversions provides extra statistic operations from postgres + * which are not generally available for any db-type + * The use the same mechanisim used internally by slick, which provides aggregate functions + * only for queries that results in single-column values. + * The column can contain a value or an optional value (i.e. 
it works for nullable columns too) + */ + implicit def customSingleColumnQueryExtensions[E: BaseTypedType, C[_]](q: Query[Rep[E], _, C]) = + new CustomSingleColumnQueryExtensions[E, E, C](q) + implicit def customSingleOptionColumnQueryExtensions[E: BaseTypedType, C[_]](q: Query[Rep[Option[E]], _, C]) = + new CustomSingleColumnQueryExtensions[E, Option[E], C](q) + } + } -object CustomProfileExtension extends CustomProfileExtension +/** Allow using the extensions under an import in scope */ +object CustomProfileExtension extends CustomProfileExtension { + import slick.ast.Library._ + + /** Defines extra functions that we use for statistical computations */ + object Library { + /* slick-compatible definitions, based on the internal implementation of slick-provided + * extensions for sql functions + */ + val StdDevAggregate = new SqlAggregateFunction("stddev") + val StdDevPopAggregate = new SqlAggregateFunction("stddev_pop") + val StdDevSampAggregate = new SqlAggregateFunction("stddev_samp") + } +} + +/** The implementation of extra functions added via implicit conversions of the query type */ +final class CustomSingleColumnQueryExtensions[A, E, C[_]](val q: Query[Rep[E], _, C]) extends AnyVal { + import slick.lifted.FunctionSymbolExtensionMethods._ + + /** historical alias for [[stdDevSamp]] */ + def stdDev(implicit tt: TypedType[Option[A]]) = + CustomProfileExtension.Library.StdDevAggregate.column[Option[A]](q.toNode) + + /** population standard deviation of the input values */ + def stdDevPop(implicit tt: TypedType[Option[A]]) = + CustomProfileExtension.Library.StdDevPopAggregate.column[Option[A]](q.toNode) + + /** sample standard deviation of the input values */ + def stdDevSamp(implicit tt: TypedType[Option[A]]) = + CustomProfileExtension.Library.StdDevSampAggregate.column[Option[A]](q.toNode) +} diff --git a/conseil-common/src/main/scala/tech/cryptonomic/conseil/common/util/MathUtil.scala b/conseil-common/src/main/scala/tech/cryptonomic/conseil/common/util/MathUtil.scala deleted file mode 100644 index 113059b26..000000000 --- a/conseil-common/src/main/scala/tech/cryptonomic/conseil/common/util/MathUtil.scala +++ /dev/null @@ -1,38 +0,0 @@ -package tech.cryptonomic.conseil.common.util - -import scala.math.{pow, sqrt} - -object MathUtil { - - /** - * Average value of a sequence - * @param l Sequence of doubles. - * @return - */ - def mean(l: Seq[Double]): Double = - l.sum / l.length - - /** - * Standard deviation of a sequence - * @param l Sequence of doubles. - * @return - */ - def stdev(l: Seq[Double]): Double = { - val m = mean(l) - val len = l.length.toDouble - sqrt(l.map(x => pow(x - m, 2)).sum / len) - } - - /** - * Standard deviation of a sequence corrected for using samples - * see https://en.wikipedia.org/wiki/Standard_deviation for Bessel correction - * @param l Sequence of doubles. 
- * @return - */ - def sampledStdev(l: Seq[Double]): Double = { - val m = mean(l) - val samples = (l.length - 1).toDouble - sqrt(l.map(x => pow(x - m, 2)).sum / samples) - } - -} diff --git a/conseil-common/src/test/scala/tech/cryptonomic/conseil/common/util/MathUtilTest.scala b/conseil-common/src/test/scala/tech/cryptonomic/conseil/common/util/MathUtilTest.scala deleted file mode 100644 index 511e13cb6..000000000 --- a/conseil-common/src/test/scala/tech/cryptonomic/conseil/common/util/MathUtilTest.scala +++ /dev/null @@ -1,22 +0,0 @@ -package tech.cryptonomic.conseil.common.util - -import org.scalatest.flatspec.AnyFlatSpec -import org.scalatest.matchers.should.Matchers - -class MathUtilTest extends AnyFlatSpec with Matchers { - - "MathUtilTest" should "correctly calculate the mean of a sequence" in { - val dataset = List(1d, 2d, 3d, 4d, 5d, 6d, 7d, 8d, 9d, 10d) - MathUtil.mean(dataset) should be(5.5) - } - - "MathUtilTest" should "correctly calculate the population standard deviation of a sequence" in { - val dataset = List(1d, 2d, 3d, 4d, 5d, 6d, 7d, 8d, 9d, 10d) - MathUtil.stdev(dataset) should be(2.8722813232690143) - } - - "MathUtilTest" should "correctly calculate the population standard deviation of a sequence corrected for using samples" in { - val dataset = List(1d, 2d, 3d, 4d, 5d, 6d, 7d, 8d, 9d, 10d) - MathUtil.sampledStdev(dataset) should be(3.0276503540974917) - } -} diff --git a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosDatabaseOperations.scala b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosDatabaseOperations.scala index e6dc85094..88c0f3494 100644 --- a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosDatabaseOperations.scala +++ b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosDatabaseOperations.scala @@ -21,7 +21,6 @@ import tech.cryptonomic.conseil.common.tezos.Tables import tech.cryptonomic.conseil.common.util.ConfigUtil import tech.cryptonomic.conseil.common.util.CollectionOps._ import tech.cryptonomic.conseil.common.util.Conversion.Syntax._ -import tech.cryptonomic.conseil.common.util.MathUtil.{mean, stdev} import tech.cryptonomic.conseil.indexer.sql.DefaultDatabaseOperations._ import scala.collection.immutable.Queue @@ -718,98 +717,42 @@ object TezosDatabaseOperations extends LazyLogging { Tables.Bakers.insertOrUpdateAll(bakers) } - /** Given the operation kind, return range of fees and timestamp for that operation. - * - * @param kind operation kind - * @param daysPast how many values to use for statistics computations, as a time-window - * @param asOf when the computation is to be considered, by default uses the time of invocation - * @return the average fees for a given operation kind, if it exists - */ - def calculateAverageFees( - kind: String, - daysPast: Int, - asOf: Instant = Instant.now() - )( - implicit ec: ExecutionContext - ): DBIO[Option[AverageFees]] = { - /* We need to limit the past timestamps for this computation to a reasonable value. - * Otherwise the query optimizer won't be able to efficiently use the indexing and - * will do a full table scan. 
- */ - val secsPerDay = 60L * 60L * 24L //secs * mins * hours - val secsToPastHorizon = daysPast * secsPerDay - - logger.info( - s"Computing fees starting from $daysPast days before $asOf, averaging over all values in the range" - ) - - type Cycle = Int - type Fee = BigDecimal - type FeeDetails = (Option[Fee], Timestamp, Option[Cycle], BlockLevel) - - def computeAverage( - ts: Timestamp, - cycle: Option[Cycle], - level: BlockLevel, - fees: Seq[FeeDetails] - ): AverageFees = { - val values = fees.map { - case (fee, _, _, _) => fee.map(_.toDouble).getOrElse(0.0) - } - val m: Int = math.ceil(mean(values)).toInt - val s: Int = math.ceil(stdev(values)).toInt - - AverageFees(math.max(m - s, 0), m, m + s, ts, kind, cycle, level) - } - - val timestampLowerBound = - Timestamp.from(asOf.atOffset(ZoneOffset.UTC).minusSeconds(secsToPastHorizon).toInstant()) - - val opQuery = - Tables.Operations - .filter(op => op.kind === kind && op.invalidatedAsof.isEmpty) - .filter(_.timestamp >= timestampLowerBound) - .map(o => (o.fee, o.timestamp, o.cycle, o.blockLevel)) - .sortBy { case (_, ts, _, _) => ts.desc } - - opQuery.result.statements.foreach(stmt => println(s"raising fees as $stmt")) - - opQuery.result.map { timestampedFees => - timestampedFees.headOption.map { - case (_, latest, cycle, level) => - computeAverage(latest, cycle, level, timestampedFees) - } - } - } - + /** Stats computations for Fees */ object FeesStatistics { - import CustomProfileExtension.api._ - import CustomProfileExtension.generalAggregations.{avg, max} - import CustomProfileExtension.statisticsAggregations.{stdDevPop} + import CustomProfileExtension.CustomApi._ private val zeroBD = BigDecimal.exact(0) - def stats(kind: Rep[String], lowBound: Rep[Timestamp]) = { - val baseQuery = Tables.Operations - .filter(op => op.kind === kind && op.invalidatedAsof.isEmpty && op.timestamp >= lowBound) - - baseQuery - .map( - it => - ( - avg(it.fee.getOrElse(zeroBD).?), - stdDevPop(it.fee.getOrElse(zeroBD).?), - max(it.timestamp.?), - max(it.cycle).?, - max(it.blockLevel.?) - ) - ) - + /* prepares the query */ + private def stats(lowBound: Rep[Timestamp]) = { + val baseSelection = Tables.Operations + .filter(op => op.invalidatedAsof.isEmpty && op.timestamp >= lowBound) + + baseSelection.groupBy(_.kind).map { + case (kind, subQuery) => + ( + kind, + subQuery.map(_.fee.getOrElse(zeroBD)).avg, + subQuery.map(_.fee.getOrElse(zeroBD)).stdDevPop, + subQuery.map(_.timestamp).max, + subQuery.map(_.cycle).max, + subQuery.map(_.blockLevel).max + ) + } } - val feesStatsQuery = Compiled(stats _) - def calculateAverage(kind: String, daysPast: Int, asOf: Instant = Instant.now())( + /* slick compiled queries don't need to be converted to sql at each call, gaining in performance */ + private val feesStatsQuery = Compiled(stats _) + + /** Collects fees from any block operation and return statistic data from a time window. + * The results are grouped by the operation kind. + * + * @param daysPast how many values to use for statistics computations, as a time-window + * @param asOf when the computation is to be considered, by default uses the time of invocation + * @return the average fees for each given operation kind, if it exists + */ + def calculateAverage(daysPast: Long, asOf: Instant = Instant.now())( implicit ec: ExecutionContext - ): DBIO[Option[AverageFees]] = { + ): DBIO[Seq[AverageFees]] = { /* We need to limit the past timestamps for this computation to a reasonable value. 
* Otherwise the query optimizer won't be able to efficiently use the indexing and @@ -828,9 +771,10 @@ object TezosDatabaseOperations extends LazyLogging { .toInstant() ) - stats(kind, timestampLowerBound).result.headOption.map { rows => + //here we assume all the values are present, as we used defaults for any of them, or know they exists for certain + feesStatsQuery(timestampLowerBound).result.map { rows => rows.map { - case (mean, stddev, ts, cycle, level) => + case (kind, Some(mean), Some(stddev), Some(ts), cycle, Some(level)) => val mu = math.ceil(mean.toDouble).toInt val sigma = math.ceil(stddev.toDouble).toInt AverageFees( diff --git a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosFeeOperations.scala b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosFeeOperations.scala index 8ae43c5a6..9781929be 100644 --- a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosFeeOperations.scala +++ b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosFeeOperations.scala @@ -1,7 +1,6 @@ package tech.cryptonomic.conseil.indexer.tezos import com.typesafe.scalalogging.LazyLogging -import slick.dbio.DBIOAction import tech.cryptonomic.conseil.indexer.tezos.{TezosDatabaseOperations => TezosDb} import scala.concurrent.{ExecutionContext, Future} @@ -39,12 +38,9 @@ private[tezos] object TezosFeeOperations extends LazyLogging { def processTezosAverageFees(selectionWindow: FiniteDuration)(implicit ec: ExecutionContext): Future[Option[Int]] = { logger.info("Processing latest Tezos fee data...") - //partially apply the fixed window to get a function that only uses the kind - val calculateForWindow = TezosDb.calculateAverageFees(_: String, daysPast = selectionWindow.toDays.toInt) - val computeAndStore = for { - fees <- DBIOAction.sequence(operationKinds.map(calculateForWindow)) - dbWrites <- TezosDb.writeFees(fees.collect { case Some(fee) => fee }) + feeStats <- TezosDb.FeesStatistics.calculateAverage(selectionWindow.toDays) + dbWrites <- TezosDb.writeFees(feeStats.filter(fee => operationKinds.contains(fee.kind)).toList) } yield dbWrites db.run(computeAndStore).andThen { diff --git a/conseil-lorre/src/test/scala/tech/cryptonomic/conseil/indexer/tezos/TezosDatabaseOperationsTest.scala b/conseil-lorre/src/test/scala/tech/cryptonomic/conseil/indexer/tezos/TezosDatabaseOperationsTest.scala index 04d4d280e..6317e3448 100644 --- a/conseil-lorre/src/test/scala/tech/cryptonomic/conseil/indexer/tezos/TezosDatabaseOperationsTest.scala +++ b/conseil-lorre/src/test/scala/tech/cryptonomic/conseil/indexer/tezos/TezosDatabaseOperationsTest.scala @@ -971,42 +971,11 @@ class TezosDatabaseOperationsTest //we specify when the computation of fees needs be done, to have the test block reference time in range val feesCalculation = sut.FeesStatistics.calculateAverage( - ops.head.kind, feesSelectionWindowInDays, asOf = testReferenceDateTime.toInstant() ) - dbHandler.run(feesCalculation).futureValue.value shouldEqual expected - - } - - "return None when computing average fees for a kind with no data" in { - //generate data - implicit val randomSeed = RandomSeed(testReferenceTimestamp.getTime) - val block = generateBlockRows(1, testReferenceTimestamp).head - val group = generateOperationGroupRows(block).head - - val fees = Seq.fill(3)(Some(BigDecimal(1))) - val ops = wrapFeesWithOperations(fees, block, group) - - val populate = for { - _ <- Tables.Blocks += block - _ <- Tables.OperationGroups += group - ids <- Tables.Operations returning 
Tables.Operations.map(_.operationId) ++= ops - } yield ids - - dbHandler.run(populate).futureValue should have size (fees.size) - - //check - //we specify when the computation of fees needs be done, to have the test block reference time in range - val feesCalculation = - sut.FeesStatistics.calculateAverage( - "undefined", - feesSelectionWindowInDays, - asOf = testReferenceDateTime.toInstant() - ) - - dbHandler.run(feesCalculation).futureValue shouldBe empty + dbHandler.run(feesCalculation).futureValue.headOption.value shouldEqual expected } @@ -1053,12 +1022,11 @@ class TezosDatabaseOperationsTest //we specify when the computation of fees needs be done, to have the test block reference time in range val feesCalculation = sut.FeesStatistics.calculateAverage( - ops.head.kind, feesSelectionWindowInDays, asOf = testReferenceDateTime.toInstant() ) - dbHandler.run(feesCalculation).futureValue.value shouldEqual expected + dbHandler.run(feesCalculation).futureValue.find(_.kind == ops.head.kind).value shouldEqual expected } diff --git a/conseil-lorre/src/test/scala/tech/cryptonomic/conseil/indexer/tezos/TezosForkDatabaseOperationsTest.scala b/conseil-lorre/src/test/scala/tech/cryptonomic/conseil/indexer/tezos/TezosForkDatabaseOperationsTest.scala index 1d558b3dc..bd5f41802 100644 --- a/conseil-lorre/src/test/scala/tech/cryptonomic/conseil/indexer/tezos/TezosForkDatabaseOperationsTest.scala +++ b/conseil-lorre/src/test/scala/tech/cryptonomic/conseil/indexer/tezos/TezosForkDatabaseOperationsTest.scala @@ -527,7 +527,7 @@ class TezosForkDatabaseOperationsTest _ <- Tables.Blocks += invalidBlock _ <- Tables.OperationGroups += invalidGroup Some(stored) <- Tables.Operations ++= invalidRows - loaded <- sut.calculateAverageFees(kind = "transaction", daysPast = 10) + loaded <- sut.FeesStatistics.calculateAverage(daysPast = 10) } yield (stored, loaded) val (stored, loaded) = dbHandler.run(populateAndFetch).futureValue From 40e799ee3a3f5ae9f52efdc552fd25d2425de993 Mon Sep 17 00:00:00 2001 From: Ivano Pagano Date: Fri, 16 Oct 2020 16:19:52 +0200 Subject: [PATCH 7/7] Apply code reviews suggestions --- .../conseil/indexer/tezos/TezosFeeOperations.scala | 5 +++-- .../indexer/tezos/TezosForkDatabaseOperationsTest.scala | 1 - 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosFeeOperations.scala b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosFeeOperations.scala index 9781929be..97e987319 100644 --- a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosFeeOperations.scala +++ b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosFeeOperations.scala @@ -30,7 +30,8 @@ private[tezos] object TezosFeeOperations extends LazyLogging { * The computation will use a limited number of fees, as the result of a selection window in days. * Only blocks belonging within such window in the past, relative to the calculation moment, will be considered. * - * @param numberOfFeesAveraged a limit on how many of the latest fee values will be used for averaging + * The window is rounded to a granularity of whole days and can't be less than 1. 
+ * * @param selectionWindow the max number of days back from the current block timestamp to use when averaging * @param ec the needed ExecutionContext to combine multiple database operations * @return a future result of the number of rows stored to db, if supported by the driver @@ -39,7 +40,7 @@ private[tezos] object TezosFeeOperations extends LazyLogging { logger.info("Processing latest Tezos fee data...") val computeAndStore = for { - feeStats <- TezosDb.FeesStatistics.calculateAverage(selectionWindow.toDays) + feeStats <- TezosDb.FeesStatistics.calculateAverage(math.max(selectionWindow.toDays, 1)) dbWrites <- TezosDb.writeFees(feeStats.filter(fee => operationKinds.contains(fee.kind)).toList) } yield dbWrites diff --git a/conseil-lorre/src/test/scala/tech/cryptonomic/conseil/indexer/tezos/TezosForkDatabaseOperationsTest.scala b/conseil-lorre/src/test/scala/tech/cryptonomic/conseil/indexer/tezos/TezosForkDatabaseOperationsTest.scala index bd5f41802..32229631c 100644 --- a/conseil-lorre/src/test/scala/tech/cryptonomic/conseil/indexer/tezos/TezosForkDatabaseOperationsTest.scala +++ b/conseil-lorre/src/test/scala/tech/cryptonomic/conseil/indexer/tezos/TezosForkDatabaseOperationsTest.scala @@ -51,7 +51,6 @@ class TezosForkDatabaseOperationsTest import scala.concurrent.ExecutionContext.Implicits.global import TezosDataGenerationKit.DataModelGeneration._ - import TezosDataGenerationKit.DomainModelGeneration._ import LocalGenerationUtils._ val sut = TezosDatabaseOperations
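
Note on the statistics semantics involved above: stddev_pop, which the reworked FeesStatistics query uses through the custom Slick aggregate extension, is the population standard deviation, i.e. the same quantity the removed MathUtil.stdev computed in memory (as opposed to stddev_samp, the Bessel-corrected sample deviation that the removed sampledStdev implemented). A minimal standalone Scala sketch reproducing the values the deleted MathUtilTest asserted; the object name is chosen here purely for illustration and is not part of these patches:

object PopulationStdDevSketch extends App {
  // the same dataset the removed MathUtilTest used
  val dataset = List(1d, 2d, 3d, 4d, 5d, 6d, 7d, 8d, 9d, 10d)

  // arithmetic mean, as in the removed MathUtil.mean
  def mean(xs: Seq[Double]): Double = xs.sum / xs.length

  // population standard deviation: sqrt(sum((x - mean)^2) / n),
  // the quantity the postgres stddev_pop aggregate returns
  def stdevPop(xs: Seq[Double]): Double = {
    val m = mean(xs)
    math.sqrt(xs.map(x => math.pow(x - m, 2)).sum / xs.length)
  }

  println(mean(dataset))     // 5.5
  println(stdevPop(dataset)) // 2.8722813232690143
}

Both printed values match the expectations of the deleted MathUtilTest, which is why moving the fee computation from the in-memory helpers to the stddev_pop aggregate should not change the resulting AverageFees figures for the same set of input rows.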