Skip to content

Commit

Permalink
Define fees averaging in term of configured time-window
Browse files Browse the repository at this point in the history
 * update the tests
 * fix a flaky test
  • Loading branch information
ivanopagano committed Oct 7, 2020
1 parent ea9a4e6 commit 1b20818
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 45 deletions.
4 changes: 2 additions & 2 deletions conseil-lorre/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ lorre {
bootup-connection-check-timeout: 10 s
#Used to make sure Lorre records average fees every n iterations
fee-update-interval: 20
#Used to select how many fees should be averaged together
number-of-fees-averaged: 1000
#Used to select how many fees should be averaged together, based on a time-window
fees-average-time-window: 1 day
#Docs missing
depth: newest
depth: ${?CONSEIL_LORRE_DEPTH}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ final case class LorreConfiguration(
bootupRetryInterval: FiniteDuration,
bootupConnectionCheckTimeout: FiniteDuration,
feeUpdateInterval: Int,
numberOfFeesAveraged: Int,
feesAverageTimeWindow: FiniteDuration,
depth: Depth,
headHash: Option[String],
chainEvents: List[ChainEvent],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -701,43 +701,29 @@ object TezosDatabaseOperations extends LazyLogging {
Tables.Bakers.insertOrUpdateAll(bakers)
}

/**
* Given the operation kind, return range of fees and timestamp for that operation.
* @param kind Operation kind
* @param numberOfFeesAveraged How many values to use for statistics computations
* @param asOf When the computation is to be considered, by default uses the time of invocation
* @return The average fees for a given operation kind, if it exists
/** Given the operation kind, return range of fees and timestamp for that operation.
*
* @param kind operation kind
* @param daysPast how many values to use for statistics computations, as a time-window
* @param asOf when the computation is to be considered, by default uses the time of invocation
* @return the average fees for a given operation kind, if it exists
*/
def calculateAverageFees(
kind: String,
numberOfFeesAveraged: Int,
daysPast: Int,
asOf: Instant = Instant.now()
)(
implicit ec: ExecutionContext
): DBIO[Option[AverageFees]] = {
/* We need to limit the past timestamps for this computation to a reasonable value.
* Otherwise the query optimizer won't be able to efficiently use the indexing and
* will do a full table scan.
*
* This is what we know now:
* - each cycle bakes 4096 blocks
* - a cycle takes around 2-3 days to run
* - each block stores a variable number of transactions, down-to the min of 1.
*
* We can make conservative computations to figure out how far in the past we need to go,
* to guarantee a [[numberOfFeesAveraged]] values.
*
* We can assume a single transaction per block (1 trans/block), hence we need numberOfFeesAveraged blocks.
* Assuming 3 days to get 4096 blocks we have 4096/3 blocks-a-day at worst, which is ~1365 blocks.
* Therefore we want to look back to numberOfFeesAveraged/1365 days in the past to guarantee the required fees counts.
*/
val blocksPerDay = 1365
val daysToPastHorizon = 1 + (numberOfFeesAveraged / blocksPerDay) //round-up for integer division
val secsPerDay = 60L * 60L * 24L //secs * mins * hours
val secsToPastHorizon = daysToPastHorizon.toLong * secsPerDay
val secsToPastHorizon = daysPast * secsPerDay

logger.info(
s"Computing fees starting from $daysToPastHorizon days before $asOf, averaging over $numberOfFeesAveraged values"
s"Computing fees starting from $daysPast days before $asOf, averaging over all values in the range"
)

type Cycle = Int
Expand Down Expand Up @@ -768,7 +754,6 @@ object TezosDatabaseOperations extends LazyLogging {
.filter(_.timestamp >= timestampLowerBound)
.map(o => (o.fee, o.timestamp, o.cycle, o.blockLevel))
.sortBy { case (_, ts, _, _) => ts.desc }
.take(numberOfFeesAveraged)
.result

opQuery.map { timestampedFees =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import tech.cryptonomic.conseil.indexer.tezos.{TezosDatabaseOperations => TezosD

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import scala.concurrent.duration.FiniteDuration

/**
* Helper classes and functions used for average fee calculations.
Expand All @@ -26,16 +27,23 @@ private[tezos] object TezosFeeOperations extends LazyLogging {
"endorsement"
)

/**
* Calculates average fees for each operation kind and stores them into a fees table.
/** Calculates average fees for each operation kind and stores them into a fees table.
* The computation will use a limited number of fees, as the result of a selection window in days.
* Only blocks belonging within such window in the past, relative to the calculation moment, will be considered.
*
* @param numberOfFeesAveraged a limit on how many of the latest fee values will be used for averaging
* @param ex the needed ExecutionContext to combine multiple database operations
* @param selectionWindow the max number of days back from the current block timestamp to use when averaging
* @param ec the needed ExecutionContext to combine multiple database operations
* @return a future result of the number of rows stored to db, if supported by the driver
*/
def processTezosAverageFees(numberOfFeesAveraged: Int)(implicit ex: ExecutionContext): Future[Option[Int]] = {
def processTezosAverageFees(selectionWindow: FiniteDuration)(implicit ec: ExecutionContext): Future[Option[Int]] = {
logger.info("Processing latest Tezos fee data...")

//partially apply the fixed window to get a function that only uses the kind
val calculateForWindow = TezosDb.calculateAverageFees(_: String, daysPast = selectionWindow.toDays.toInt)

val computeAndStore = for {
fees <- DBIOAction.sequence(operationKinds.map(TezosDb.calculateAverageFees(_, numberOfFeesAveraged)))
fees <- DBIOAction.sequence(operationKinds.map(calculateForWindow))
dbWrites <- TezosDb.writeFees(fees.collect { case Some(fee) => fee })
} yield dbWrites

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ class TezosIndexer private (
)
_ <- processTezosBlocks(maxLevel)
_ <- if (iteration % lorreConf.feeUpdateInterval == 0)
TezosFeeOperations.processTezosAverageFees(lorreConf.numberOfFeesAveraged)
TezosFeeOperations.processTezosAverageFees(lorreConf.feesAverageTimeWindow)
else
noOp
_ <- rightsProcessor.updateRightsTimestamps()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class TezosDatabaseOperationsTest

val sut = TezosDatabaseOperations
val indexed = new TezosIndexedDataOperations(dbHandler)
val feesToConsider = 1000
val feesSelectionWindowInDays = 100

"use the right collation" in {
val ordered =
Expand Down Expand Up @@ -896,7 +896,7 @@ class TezosDatabaseOperationsTest
//check
//we specify when the computation of fees needs be done, to have the test block reference time in range
val feesCalculation =
sut.calculateAverageFees(ops.head.kind, feesToConsider, asOf = testReferenceDateTime.toInstant())
sut.calculateAverageFees(ops.head.kind, feesSelectionWindowInDays, asOf = testReferenceDateTime.toInstant())

dbHandler.run(feesCalculation).futureValue.value shouldEqual expected

Expand All @@ -922,7 +922,7 @@ class TezosDatabaseOperationsTest
//check
//we specify when the computation of fees needs be done, to have the test block reference time in range
val feesCalculation =
sut.calculateAverageFees("undefined", feesToConsider, asOf = testReferenceDateTime.toInstant())
sut.calculateAverageFees("undefined", feesSelectionWindowInDays, asOf = testReferenceDateTime.toInstant())

dbHandler.run(feesCalculation).futureValue shouldBe None

Expand Down Expand Up @@ -970,7 +970,11 @@ class TezosDatabaseOperationsTest
//check
//we specify when the computation of fees needs be done, to have the test block reference time in range
val feesCalculation =
sut.calculateAverageFees(selection.head.kind, feesToConsider, asOf = testReferenceDateTime.toInstant())
sut.calculateAverageFees(
selection.head.kind,
feesSelectionWindowInDays,
asOf = testReferenceDateTime.toInstant()
)

dbHandler.run(feesCalculation).futureValue.value shouldEqual expected

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ class TezosForkDatabaseOperationsTest
_ <- Tables.Blocks += invalidBlock
_ <- Tables.OperationGroups += invalidGroup
Some(stored) <- Tables.Operations ++= invalidRows
loaded <- sut.calculateAverageFees(kind = "transaction", numberOfFeesAveraged = invalidRows.size)
loaded <- sut.calculateAverageFees(kind = "transaction", daysPast = 10)
} yield (stored, loaded)

val (stored, loaded) = dbHandler.run(populateAndFetch).futureValue
Expand Down Expand Up @@ -1402,8 +1402,7 @@ class TezosForkDatabaseOperationsTest
val (validRows, forkLevel, invalidation, fork) = {
val generator = for {
//duplicate ids will fail to save on the db for violation of the PK uniqueness
rows <- Gen
.nonEmptyListOf(arbitrary[DBSafe[ProcessedChainEventsRow]])
rows <- nonConflictingArbitrary[DBSafe[ProcessedChainEventsRow]]
level <- Gen.posNum[Long]
instant <- TezosDataGenerationKit.instantGenerator
id <- arbitrary[ju.UUID]
Expand Down Expand Up @@ -1431,11 +1430,8 @@ class TezosForkDatabaseOperationsTest
/* we expect to have some invalidation and that the fork level will discriminate */
invalidCount shouldBe >(0)

/* we also want non-empty results to verify */
loaded should not be empty

/* we expect no more events after the fork */
loaded.exists(_.eventLevel >= forkLevel) shouldBe false
(loaded.isEmpty || loaded.forall(_.eventLevel < forkLevel)) shouldBe true

info(s"resulting in ${loaded.size} remaining elements after the invalidation")
}
Expand Down Expand Up @@ -1466,7 +1462,12 @@ class TezosForkDatabaseOperationsTest
* corresponds to the single method of HasKey, i.e. getKey
*/
implicit def forkValidWithKey[PK, E](implicit hasKey: HasKey[E, PK]): HasKey[ForkValid[E], PK] = {
case ForkValid(entity) => implicitly[HasKey[E, PK]].getKey(entity)
case ForkValid(entity) => hasKey.getKey(entity)
}

/* will provide an implicit HasKey for an entity wrapped by DBSafe, like the one for ForkValid */
implicit def dbSafeWithKey[PK, E](implicit hasKey: HasKey[E, PK]): HasKey[DBSafe[E], PK] = {
case DBSafe(entity) => hasKey.getKey(entity)
}

/* There we provide all the key extractors for known db entites, again using functional interfaces shortcut */
Expand All @@ -1482,6 +1483,8 @@ class TezosForkDatabaseOperationsTest
implicit val tokenBalancesRowHasKey: HasKey[TokenBalancesRow, String] = _.address
implicit val governanceRowHasKey: HasKey[GovernanceRow, (String, String, String)] =
gov => (gov.blockHash, gov.proposalHash, gov.votingPeriodKind)
implicit val processedChainEventsRowHasKey: HasKey[ProcessedChainEventsRow, (Long, String)] =
ev => (ev.eventLevel, ev.eventType)

}

Expand Down

0 comments on commit 1b20818

Please sign in to comment.