Skip to content

Commit

Permalink
ETCM-463: Merge the two vectors.
Browse files Browse the repository at this point in the history
  • Loading branch information
aakoshh committed Dec 9, 2020
1 parent bd83cb1 commit a891426
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 51 deletions.
84 changes: 44 additions & 40 deletions src/main/scala/io/iohk/ethereum/network/TimeSlotStats.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,42 +5,34 @@ import cats.implicits._
import scala.concurrent.duration.{Duration, FiniteDuration}

/** Track statistics over time a fixed size timewindow. */
case class TimeSlotStats[K, V: Monoid] private (
class TimeSlotStats[K, V: Monoid] private (
// Time resolution.
slotDuration: FiniteDuration,
slotCount: Int,
// The last written slot.
lastIdx: Int,
// Ring buffer of the timestamp of each slot.
timeSlots: IndexedSeq[TimeSlotStats.Timestamp],
// Ring buffer of statistics per slot.
statSlots: IndexedSeq[Map[K, V]]
val slotDuration: FiniteDuration,
// The last written position in the buffer.
val lastIdx: Int,
// Ring buffer of slots statistics.
val buffer: IndexedSeq[TimeSlotStats.Entry[K, V]]
) {
import TimeSlotStats._

/** Overall length of the timewindow. */
def duration = slotDuration * slotCount
def slotCount = buffer.size

/** Merge new stats for a given key in the current timestamp. */
def add(key: K, stat: V, timestamp: Timestamp = System.currentTimeMillis): TimeSlotStats[K, V] = {
val currSlot = slotId(timestamp)
val lastSlot = timeSlots(lastIdx)
val currSlotId = slotId(timestamp)
val lastEntry = buffer(lastIdx)

if (currSlot == lastSlot) {
if (currSlotId == lastEntry.slotId) {
// We're still in the same timeslot, so just append stats.
val newStats = statSlots(lastIdx) |+| Map(key -> stat)
copy(
statSlots = statSlots.updated(lastIdx, newStats)
)
} else if (currSlot > lastSlot) {
val newEntry = lastEntry.add(key, stat)
updated(lastIdx, newEntry)
} else if (currSlotId > lastEntry.slotId) {
// Go to the next slot.
val newStats = Map(key -> stat)
val newIdx = succ(lastIdx)
copy(
lastIdx = newIdx,
timeSlots = timeSlots.updated(newIdx, currSlot),
statSlots = statSlots.updated(newIdx, newStats)
)
val newEntry = Entry(currSlotId, Map(key -> stat))
updated(newIdx, newEntry)
} else {
// Going backwards in time, just ignore it.
this
Expand All @@ -49,7 +41,7 @@ case class TimeSlotStats[K, V: Monoid] private (

/** Forget all statistics about a given key. */
def remove(key: K): TimeSlotStats[K, V] =
copy(statSlots = statSlots.map(_ - key))
updated(lastIdx, buffer.map(_.remove(key)))

/** Aggregate stats for a key in all slots that are within the duration. */
def get(key: K, timestamp: Timestamp = System.currentTimeMillis): V =
Expand All @@ -67,11 +59,11 @@ case class TimeSlotStats[K, V: Monoid] private (
val (start, end) = slotRange(timestamp)

def loop(idx: Int, acc: A): A = {
val t = timeSlots(idx)
if (t < start || t > end)
val entry = buffer(idx)
if (entry.slotId < start || entry.slotId > end)
acc
else
loop(pred(idx), f(acc, statSlots(idx)))
loop(pred(idx), f(acc, entry.slotStats))
}

loop(lastIdx, init)
Expand All @@ -82,44 +74,56 @@ case class TimeSlotStats[K, V: Monoid] private (
timestamp - timestamp % slotDuration.toMillis
}

/** The range of time slots based on the current timestamp and the buffer duration. */
private def slotRange(timestamp: Timestamp): (Timestamp, Timestamp) = {
val endSlot = slotId(timestamp)
val startSlot = slotId(timestamp - slotDuration.toMillis * slotCount)
startSlot -> endSlot
val end = slotId(timestamp)
val start = slotId(timestamp - duration.toMillis)
start -> end
}

private def succ(idx: Int): Int = (idx + 1) % slotCount
private def pred(idx: Int): Int = (idx - 1) % slotCount

private def copy(statSlots: IndexedSeq[Map[K, V]]): TimeSlotStats[K, V] =
copy(lastIdx, timeSlots, statSlots)
private def updated(
lastIdx: Int,
entry: Entry[K, V]
): TimeSlotStats[K, V] =
updated(lastIdx, buffer.updated(lastIdx, entry))

private def copy(
private def updated(
lastIdx: Int,
timeSlots: IndexedSeq[Timestamp],
statSlots: IndexedSeq[Map[K, V]]
buffer: IndexedSeq[Entry[K, V]]
): TimeSlotStats[K, V] =
new TimeSlotStats[K, V](slotDuration, slotCount, lastIdx, timeSlots, statSlots)
new TimeSlotStats[K, V](slotDuration, lastIdx, buffer)
}

object TimeSlotStats {

// Milliseconds since epoch.
type Timestamp = Long

case class Entry[K, V: Monoid](
slotId: Timestamp,
slotStats: Map[K, V]
) {
def add(key: K, stat: V): Entry[K, V] =
copy(slotStats = slotStats |+| Map(key -> stat))

def remove(key: K): Entry[K, V] =
copy(slotStats = slotStats - key)
}

def apply[K, V: Monoid](
slotDuration: FiniteDuration,
slotCount: Int
): Option[TimeSlotStats[K, V]] =
if (slotDuration == Duration.Zero || slotCount <= 0) None
else
Some {
TimeSlotStats[K, V](
new TimeSlotStats[K, V](
slotDuration,
slotCount,
lastIdx = slotCount - 1, // So the first slot we fill will move to 0.
timeSlots = IndexedSeq.fill(slotCount)(-1L),
statSlots = IndexedSeq.fill(slotCount)(Map.empty[K, V])
buffer = IndexedSeq.fill(slotCount)(Entry(0L, Map.empty[K, V]))
)
}
}
22 changes: 11 additions & 11 deletions src/test/scala/io/iohk/ethereum/network/TimeSlotStatsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ class TimeSlotStatsSpec extends AnyFlatSpec with Matchers {

it should "add new keys to the last timeslot" in {
val stats = emptyStats.add("foo", 1)
stats.statSlots(0)("foo") shouldBe 1
stats.buffer(0).slotStats("foo") shouldBe 1
}

it should "merge keys in the last timeslot" in {
val stats = emptyStats.add("foo", 1).add("foo", 2).add("bar", 0, timestamp = System.currentTimeMillis + 10)
stats.statSlots(0)("foo") shouldBe 3
stats.statSlots(0)("bar") shouldBe 0
stats.buffer(0).slotStats("foo") shouldBe 3
stats.buffer(0).slotStats("bar") shouldBe 0
}

it should "ignore updates for earlier timeslots" in {
Expand All @@ -33,8 +33,8 @@ class TimeSlotStatsSpec extends AnyFlatSpec with Matchers {
.add("foo", 1)
.add("foo", 2, timestamp = System.currentTimeMillis + emptyStats.slotDuration.toMillis + 1)

stats.statSlots(0)("foo") shouldBe 1
stats.statSlots(1)("foo") shouldBe 2
stats.buffer(0).slotStats("foo") shouldBe 1
stats.buffer(1).slotStats("foo") shouldBe 2
}

it should "remove keys from all slots" in {
Expand All @@ -45,11 +45,11 @@ class TimeSlotStatsSpec extends AnyFlatSpec with Matchers {
.add("bar", 4, timestamp = System.currentTimeMillis + emptyStats.slotDuration.toMillis + 2)
.remove("foo")

Inspectors.forAll(stats.statSlots) {
_ should not contain key("foo")
Inspectors.forAll(stats.buffer) { entry =>
entry.slotStats should not contain key("foo")
}
Inspectors.forExactly(2, stats.statSlots) {
_ should contain key ("bar")
Inspectors.forExactly(2, stats.buffer) { entry =>
entry.slotStats should contain key ("bar")
}
}

Expand All @@ -61,8 +61,8 @@ class TimeSlotStatsSpec extends AnyFlatSpec with Matchers {
stats.add("foo", i, timestamp)
}

stats.statSlots(0)("foo") shouldBe emptyStats.slotCount
stats.statSlots(1)("foo") shouldBe 1
stats.buffer(0).slotStats("foo") shouldBe emptyStats.slotCount
stats.buffer(1).slotStats("foo") shouldBe 1
}

it should "aggregate the stats of a given key" in new AggregateFixture {
Expand Down

0 comments on commit a891426

Please sign in to comment.