Skip to content

Commit

Permalink
RelayCrowd new ongoing round selector, set relay.crowdAt
Browse files Browse the repository at this point in the history
  • Loading branch information
ornicar committed Nov 17, 2024
1 parent 2ccce23 commit d7248d4
Showing 1 changed file with 25 additions and 15 deletions.
40 changes: 25 additions & 15 deletions src/main/scala/RelayCrowd.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ final private class RelayCrowd(roomCrowd: RoomCrowd, mongo: Mongo)(using ex: Exe

import reactivemongo.api.bson.*

/* selects the last non-finished round of each active broadcast.
* or the last round that was finished less than 2 hours ago. */
def ongoingIds: Future[Set[RoomId]] = for
tourColl <- mongo.relayTourColl
roundColl <- mongo.relayRoundColl
now = nowInstant
result <- tourColl
.aggregateWith[BSONDocument](): framework =>
import framework.*
Expand All @@ -38,22 +41,27 @@ final private class RelayCrowd(roomCrowd: RoomCrowd, mongo: Mongo)(using ex: Exe
"pipeline" -> List(
BSONDocument(
"$match" -> BSONDocument(
"$expr" -> BSONDocument(
"$and" ->
BSONArray(
BSONDocument(
BSONDocument("$eq" -> BSONArray("$tourId", "$$tourId")),
"$or" -> BSONArray(
BSONDocument("$exists" -> BSONArray("$finishedAt", false)),
BSONDocument(
"$gt" -> BSONArray(
"$finishedAt",
BSONDateTime(nowMillis - 1000 * 60 * 60 * 2) // 2 hours
)
)
)
"$expr" -> BSONDocument("$eq" -> BSONArray("$tourId", "$$tourId"))
)
),
// the following matcher finds the round to monitor
BSONDocument(
"$match" -> BSONDocument(
"$or" -> BSONArray(
// either finished less than 2 hours ago
BSONDocument("finishedAt" -> BSONDocument("$gt" -> now.minusHours(2))),
// or unfinished, and
BSONDocument(
"finishedAt" -> BSONDocument("$exists" -> false),
BSONDocument(
"$or" -> BSONArray(
// either started less than 8 hours ago
BSONDocument("startedAt" -> BSONDocument("$gt" -> now.minusHours(8))),
// or will start in the next 1 hour
BSONDocument("startsAt" -> BSONDocument("$lt" -> now.plusHours(1)))
)
)
)
)
)
),
Expand All @@ -73,9 +81,11 @@ final private class RelayCrowd(roomCrowd: RoomCrowd, mongo: Mongo)(using ex: Exe

// couldn't make update.many work
def setMembers(all: Map[RoomId, Int]): Future[Unit] = mongo.relayRoundColl.flatMap: coll =>
val crowdAt = BSONDocument("crowdAt" -> nowInstant)
all.toSeq.traverse_ { (id, crowd) =>
val set = BSONDocument("crowd" -> crowd) ++ crowdAt
coll.update.one(
q = BSONDocument("_id" -> id),
u = BSONDocument("$set" -> BSONDocument("crowd" -> crowd))
u = BSONDocument("$set" -> set)
)
}

0 comments on commit d7248d4

Please sign in to comment.