Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
Bromel777 committed Jul 29, 2022
1 parent 14aceb1 commit 598d933
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import scala.concurrent.duration.FiniteDuration
final case class BacklogConfig(
orderLifetime: FiniteDuration,
orderExecutionTime: FiniteDuration,
suspendedProbability: Int
suspendedOrdersExecutionProbabilityPercent: Int
)

object BacklogConfig extends Context.Companion[BacklogConfig]
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ object Executor {
.handleWith[Throwable](e => warnCause"Order execution failed fatally" (e) as none[CFMMOrder])
.flatMap {
case Some(order) => backlog.suspend(order)
case None => ().pure[G]
case None => backlog.checkLater(order)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ object CFMMBacklog {
_ <- filterRevisit
random <- GenRandom.nextInt[F](100)
order <-
if (random > backlogConfig.suspendedProbability)
if (random > backlogConfig.suspendedOrdersExecutionProbabilityPercent)
getMaxOrderFromQueue(pendingQueueRef)
else
getMaxOrderFromQueue(suspendedQueueRef)
Expand Down Expand Up @@ -132,7 +132,7 @@ object CFMMBacklog {
def filterRevisit: F[Unit] = for {
curTime <- now.millis
possible2pending <-
revisitOrders.modify(_.span(wOrd => (curTime - wOrd.timestamp > backlogConfig.orderExecutionTime.toMillis)))
revisitOrders.modify(_.span(wOrd => (curTime - wOrd.timestamp < backlogConfig.orderExecutionTime.toMillis)))
_ <- possible2pending.traverse {
case ord if (curTime - ord.timestamp) > backlogConfig.orderLifetime.toMillis =>
drop(ord.orderId)
Expand All @@ -142,6 +142,21 @@ object CFMMBacklog {
} yield ()
}

private def recoverPendingQueue[F[_]: Monad: Clock: BacklogConfig.Has](
pendingQueue: Atom[F, mutable.PriorityQueue[WeightedOrder]],
cfmmOrders: CFMMOrders[F]
): F[Unit] = for {
cfg <- BacklogConfig.access
ordersInRedis <- cfmmOrders.getAll
curTime <- now.millis
orders2Pending = ordersInRedis
.collect {
case order if (curTime - order.timestamp) < cfg.orderLifetime.toMillis =>
WeightedOrder.fromOrder(order)
}
_ <- pendingQueue.update(_ ++= mutable.PriorityQueue(orders2Pending: _*))
} yield ()

final private class CFMMBacklogTracingMid[F[_]: FlatMap: Logging] extends CFMMBacklog[Mid[F, *]] {

/** Put an order to the backlog.
Expand Down Expand Up @@ -184,19 +199,4 @@ object CFMMBacklog {
_ <- trace"drop(id=$id) -> $r"
} yield r
}

private def recoverPendingQueue[F[_]: Monad: Clock: BacklogConfig.Has](
pendingQueue: Atom[F, mutable.PriorityQueue[WeightedOrder]],
cfmmOrders: CFMMOrders[F]
): F[Unit] = for {
cfg <- BacklogConfig.access
ordersInRedis <- cfmmOrders.getAll
curTime <- now.millis
orders2Pending = ordersInRedis
.collect {
case order if (curTime - order.timestamp) < cfg.orderLifetime.toMillis =>
WeightedOrder.fromOrder(order)
}
_ <- pendingQueue.update(_ ++= mutable.PriorityQueue(orders2Pending: _*))
} yield ()
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ package object configs {
val backlogCfgWithNoSuspended = BacklogConfig(
orderLifetime = 120.seconds,
orderExecutionTime = 10.seconds,
suspendedProbability = -1
suspendedOrdersExecutionProbabilityPercent = -1
)

val backlogCfgWithOnlySuspended = BacklogConfig(
orderLifetime = 120.seconds,
orderExecutionTime = 10.seconds,
suspendedProbability = 100
suspendedOrdersExecutionProbabilityPercent = 100
)

object has {
Expand Down

0 comments on commit 598d933

Please sign in to comment.