diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/BacklogConfig.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/BacklogConfig.scala index cf9ffea8..ffedb3ad 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/BacklogConfig.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/BacklogConfig.scala @@ -11,7 +11,7 @@ import scala.concurrent.duration.FiniteDuration final case class BacklogConfig( orderLifetime: FiniteDuration, orderExecutionTime: FiniteDuration, - suspendedProbability: Int + suspendedOrdersExecutionProbabilityPercent: Int ) object BacklogConfig extends Context.Companion[BacklogConfig] diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Executor.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Executor.scala index b67ef75c..9833dde7 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Executor.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Executor.scala @@ -94,7 +94,7 @@ object Executor { .handleWith[Throwable](e => warnCause"Order execution failed fatally" (e) as none[CFMMOrder]) .flatMap { case Some(order) => backlog.suspend(order) - case None => ().pure[G] + case None => backlog.checkLater(order) } } } diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/services/CFMMBacklog.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/services/CFMMBacklog.scala index 850e7aba..c2d060e6 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/services/CFMMBacklog.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/services/CFMMBacklog.scala @@ -99,7 +99,7 @@ object CFMMBacklog { _ <- filterRevisit random <- GenRandom.nextInt[F](100) order <- - if (random > backlogConfig.suspendedProbability) + if (random > backlogConfig.suspendedOrdersExecutionProbabilityPercent) getMaxOrderFromQueue(pendingQueueRef) else getMaxOrderFromQueue(suspendedQueueRef) @@ -132,7 +132,7 @@ object CFMMBacklog { def filterRevisit: F[Unit] = for { curTime <- now.millis possible2pending <- - revisitOrders.modify(_.span(wOrd => (curTime - wOrd.timestamp > backlogConfig.orderExecutionTime.toMillis))) + revisitOrders.modify(_.span(wOrd => (curTime - wOrd.timestamp < backlogConfig.orderExecutionTime.toMillis))) _ <- possible2pending.traverse { case ord if (curTime - ord.timestamp) > backlogConfig.orderLifetime.toMillis => drop(ord.orderId) @@ -142,6 +142,21 @@ object CFMMBacklog { } yield () } + private def recoverPendingQueue[F[_]: Monad: Clock: BacklogConfig.Has]( + pendingQueue: Atom[F, mutable.PriorityQueue[WeightedOrder]], + cfmmOrders: CFMMOrders[F] + ): F[Unit] = for { + cfg <- BacklogConfig.access + ordersInRedis <- cfmmOrders.getAll + curTime <- now.millis + orders2Pending = ordersInRedis + .collect { + case order if (curTime - order.timestamp) < cfg.orderLifetime.toMillis => + WeightedOrder.fromOrder(order) + } + _ <- pendingQueue.update(_ ++= mutable.PriorityQueue(orders2Pending: _*)) + } yield () + final private class CFMMBacklogTracingMid[F[_]: FlatMap: Logging] extends CFMMBacklog[Mid[F, *]] { /** Put an order to the backlog. @@ -184,19 +199,4 @@ object CFMMBacklog { _ <- trace"drop(id=$id) -> $r" } yield r } - - private def recoverPendingQueue[F[_]: Monad: Clock: BacklogConfig.Has]( - pendingQueue: Atom[F, mutable.PriorityQueue[WeightedOrder]], - cfmmOrders: CFMMOrders[F] - ): F[Unit] = for { - cfg <- BacklogConfig.access - ordersInRedis <- cfmmOrders.getAll - curTime <- now.millis - orders2Pending = ordersInRedis - .collect { - case order if (curTime - order.timestamp) < cfg.orderLifetime.toMillis => - WeightedOrder.fromOrder(order) - } - _ <- pendingQueue.update(_ ++= mutable.PriorityQueue(orders2Pending: _*)) - } yield () } diff --git a/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/configs/package.scala b/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/configs/package.scala index 04bf28fb..d2875912 100644 --- a/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/configs/package.scala +++ b/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/configs/package.scala @@ -12,13 +12,13 @@ package object configs { val backlogCfgWithNoSuspended = BacklogConfig( orderLifetime = 120.seconds, orderExecutionTime = 10.seconds, - suspendedProbability = -1 + suspendedOrdersExecutionProbabilityPercent = -1 ) val backlogCfgWithOnlySuspended = BacklogConfig( orderLifetime = 120.seconds, orderExecutionTime = 10.seconds, - suspendedProbability = 100 + suspendedOrdersExecutionProbabilityPercent = 100 ) object has {