From 14aceb1926bd8481bafb49f443c31fcc0b6f681e Mon Sep 17 00:00:00 2001 From: Bromel777 Date: Thu, 21 Jul 2022 23:38:09 +0300 Subject: [PATCH] backlog service --- build.sbt | 2 +- .../src/main/resources/application.conf | 4 +- .../ergoplatform/dex/executor/amm/App.scala | 24 ++- .../executor/amm/config/BacklogConfig.scala | 17 ++ .../executor/amm/config/ConfigBundle.scala | 5 +- .../dex/executor/amm/config/Consumers.scala | 1 + .../executor/amm/config/ExecutionConfig.scala | 2 +- .../executor/amm/modules/CFMMBacklog.scala | 18 -- .../dex/executor/amm/processes/Executor.scala | 71 +++--- .../amm/repositories/CFMMOrders.scala | 80 +++++++ .../executor/amm/services/CFMMBacklog.scala | 202 ++++++++++++++++++ .../dex/executor/amm/streaming.scala | 11 +- .../dex/executor/amm/configs/package.scala | 40 ++++ .../amm/generators/CFMMOrdersGenerator.scala | 31 +++ .../amm/generators/ErgoTreeGenerator.scala | 38 ++++ .../amm/generators/OrdersGenerator.scala | 77 +++++++ .../amm/services/CFMMBacklogTests.scala | 110 ++++++++++ .../dex/executor/amm/utils/Ordering.scala | 14 ++ .../dex/executor/amm/utils/package.scala | 31 +++ .../org/ergoplatform/common/cache/Cache.scala | 70 ++++-- .../ergoplatform/common/cache/errors.scala | 5 + .../dex/domain/amm/CFMMOrder.scala | 6 + .../ergoplatform/dex/domain/amm/package.scala | 19 ++ 23 files changed, 808 insertions(+), 70 deletions(-) create mode 100644 modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/BacklogConfig.scala delete mode 100644 modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/modules/CFMMBacklog.scala create mode 100644 modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/repositories/CFMMOrders.scala create mode 100644 modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/services/CFMMBacklog.scala create mode 100644 modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/configs/package.scala create mode 100644 modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/generators/CFMMOrdersGenerator.scala create mode 100644 modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/generators/ErgoTreeGenerator.scala create mode 100644 modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/generators/OrdersGenerator.scala create mode 100644 modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/services/CFMMBacklogTests.scala create mode 100644 modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/utils/Ordering.scala create mode 100644 modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/utils/package.scala diff --git a/build.sbt b/build.sbt index 1b574dc9..7f12328c 100644 --- a/build.sbt +++ b/build.sbt @@ -158,7 +158,7 @@ lazy val ammExecutor = utils ) .settings(nativePackagerSettings("amm-executor")) .enablePlugins(JavaAppPackaging, UniversalPlugin, DockerPlugin) - .dependsOn(Seq(core, http).map(_ % allConfigDependency): _*) + .dependsOn(Seq(core, http, cache).map(_ % allConfigDependency): _*) lazy val poolResolver = utils .mkModule("pool-resolver", "PoolResolver") diff --git a/modules/amm-executor/src/main/resources/application.conf b/modules/amm-executor/src/main/resources/application.conf index e4dbe859..f72ea6b7 100644 --- a/modules/amm-executor/src/main/resources/application.conf +++ b/modules/amm-executor/src/main/resources/application.conf @@ -2,7 +2,9 @@ rotation.retry-delay = 120s exchange.reward-address = "9gCigPc9cZNRhKgbgdmTkVxo1ZKgw79G8DvLjCcYWAvEF3XRUKy" -execution.order-lifetime = 300s +backlogConfig.order-lifetime = 300s +backlogConfig.order-execution-time = 180s +backlogConfig.suspended-probability = 10 monetary.miner-fee = 2000000 monetary.min-dex-fee = 1000000 diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/App.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/App.scala index ada1e081..b9cda408 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/App.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/App.scala @@ -6,16 +6,17 @@ import fs2.kafka.RecordDeserializer import fs2.kafka.serde._ import org.ergoplatform.ErgoAddressEncoder import org.ergoplatform.common.EnvApp +import org.ergoplatform.common.cache.{Cache, MakeRedisTransaction, Redis} import org.ergoplatform.common.streaming._ import org.ergoplatform.dex.configs.ConsumerConfig -import org.ergoplatform.dex.domain.amm.{CFMMOrder, OrderId} +import org.ergoplatform.dex.domain.amm.{CFMMOrder, EvaluatedCFMMOrder, OrderId} import org.ergoplatform.dex.executor.amm.config.ConfigBundle import org.ergoplatform.dex.executor.amm.context.AppContext import org.ergoplatform.dex.executor.amm.interpreters.{CFMMInterpreter, N2TCFMMInterpreter, T2TCFMMInterpreter} import org.ergoplatform.dex.executor.amm.processes.Executor -import org.ergoplatform.dex.executor.amm.repositories.CFMMPools -import org.ergoplatform.dex.executor.amm.services.Execution -import org.ergoplatform.dex.executor.amm.streaming.{CFMMCircuit, CFMMConsumerIn, CFMMConsumerRetries, CFMMProducerRetries} +import org.ergoplatform.dex.executor.amm.repositories.{CFMMOrders, CFMMPools} +import org.ergoplatform.dex.executor.amm.services.{CFMMBacklog, Execution} +import org.ergoplatform.dex.executor.amm.streaming.{CFMMCircuit, CFMMConsumerIn, CFMMConsumerRetries, CFMMHistConsumer, CFMMProducerRetries} import org.ergoplatform.dex.protocol.amm.AMMType.{CFMMType, N2T_CFMM, T2T_CFMM} import org.ergoplatform.ergo.modules.ErgoNetwork import org.ergoplatform.ergo.services.explorer.{ErgoExplorer, ErgoExplorerStreaming} @@ -27,6 +28,7 @@ import sttp.client3.asynchttpclient.cats.AsyncHttpClientCatsBackend import sttp.client3.asynchttpclient.fs2.AsyncHttpClientFs2Backend import tofu.WithRun import tofu.fs2Instances._ +import tofu.generate.GenRandom import tofu.lift.IsoK import tofu.syntax.unlift._ import zio.interop.catz._ @@ -40,10 +42,13 @@ object App extends EnvApp[AppContext] { appF.run(ctx) as ExitCode.success }.orDie + implicit val mtx: MakeRedisTransaction[RunF] = MakeRedisTransaction.make[RunF] + private def init(configPathOpt: Option[String]): Resource[InitF, (Executor[StreamF], AppContext)] = for { blocker <- Blocker[InitF] configs <- Resource.eval(ConfigBundle.load[InitF](configPathOpt, blocker)) + implicit0(genRand: GenRandom[RunF]) <- Resource.eval(GenRandom.instance[InitF, RunF]()) ctx = AppContext.init(configs) implicit0(isoKRun: IsoK[RunF, InitF]) = isoKRunByContext(ctx) implicit0(e: ErgoAddressEncoder) = ErgoAddressEncoder(configs.protocol.networkType.prefix) @@ -51,6 +56,8 @@ object App extends EnvApp[AppContext] { makeConsumer[OrderId, Confirmed[CFMMOrder]](configs.consumers.confirmedOrders) implicit0(unconfirmedOrders: CFMMConsumerIn[StreamF, RunF, Unconfirmed]) = makeConsumer[OrderId, Unconfirmed[CFMMOrder]](configs.consumers.unconfirmedOrders) + implicit0(ammHistCons: CFMMHistConsumer[StreamF, RunF]) = + makeConsumer[OrderId, Option[EvaluatedCFMMOrder.Any]](configs.consumers.cfmmHistory) implicit0(consumerRetries: CFMMConsumerRetries[StreamF, RunF]) = makeConsumer[OrderId, Delayed[CFMMOrder]](configs.consumers.ordersRetry) implicit0(orders: CFMMConsumerIn[StreamF, RunF, Id]) = @@ -66,8 +73,13 @@ object App extends EnvApp[AppContext] { implicit0(t2tInt: CFMMInterpreter[T2T_CFMM, RunF]) <- Resource.eval(T2TCFMMInterpreter.make[InitF, RunF]) implicit0(n2tInt: CFMMInterpreter[N2T_CFMM, RunF]) <- Resource.eval(N2TCFMMInterpreter.make[InitF, RunF]) implicit0(interpreter: CFMMInterpreter[CFMMType, RunF]) = CFMMInterpreter.make[RunF] - implicit0(execution: Execution[RunF]) <- Resource.eval(Execution.make[InitF, RunF]) - executor <- Resource.eval(Executor.make[InitF, StreamF, RunF]) + implicit0(execution: Execution[RunF]) <- Resource.eval(Execution.make[InitF, RunF]) + implicit0(redis: Redis.Plain[RunF]) <- Redis.make[InitF, RunF](configs.redis) + implicit0(cache: Cache[RunF]) <- Resource.eval(Cache.make[InitF, RunF]) + implicit0(cfmmOrders: CFMMOrders[RunF]) <- Resource.eval[InitF, CFMMOrders[RunF]](CFMMOrders.make[InitF, RunF]) + implicit0(cfmmBacklog: CFMMBacklog[RunF]) <- + Resource.eval[InitF, CFMMBacklog[RunF]](CFMMBacklog.make[InitF, RunF]) + executor <- Resource.eval(Executor.make[InitF, StreamF, RunF]) } yield executor -> ctx private def makeBackend( diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/BacklogConfig.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/BacklogConfig.scala new file mode 100644 index 00000000..cf9ffea8 --- /dev/null +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/BacklogConfig.scala @@ -0,0 +1,17 @@ +package org.ergoplatform.dex.executor.amm.config + +import derevo.derive +import derevo.pureconfig.pureconfigReader +import tofu.Context +import tofu.logging.derivation.loggable + +import scala.concurrent.duration.FiniteDuration + +@derive(pureconfigReader, loggable) +final case class BacklogConfig( + orderLifetime: FiniteDuration, + orderExecutionTime: FiniteDuration, + suspendedProbability: Int +) + +object BacklogConfig extends Context.Companion[BacklogConfig] diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/ConfigBundle.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/ConfigBundle.scala index 6a534573..6e98bb8c 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/ConfigBundle.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/ConfigBundle.scala @@ -2,11 +2,12 @@ package org.ergoplatform.dex.executor.amm.config import derevo.derive import derevo.pureconfig.pureconfigReader +import org.ergoplatform.common.cache.RedisConfig import org.ergoplatform.common.streaming.RotationConfig import org.ergoplatform.dex.configs._ import tofu.Context import tofu.logging.derivation.loggable -import tofu.optics.macros.{promote, ClassyOptics} +import tofu.optics.macros.{ClassyOptics, promote} @derive(pureconfigReader, loggable) @ClassyOptics @@ -16,7 +17,9 @@ final case class ConfigBundle( @promote execution: ExecutionConfig, @promote monetary: MonetaryConfig, @promote protocol: ProtocolConfig, + @promote backlogConfig: BacklogConfig, consumers: Consumers, + redis: RedisConfig, producers: Producers, @promote kafka: KafkaConfig, @promote network: NetworkConfig, diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/Consumers.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/Consumers.scala index a28ff859..89218ca5 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/Consumers.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/Consumers.scala @@ -8,6 +8,7 @@ import tofu.logging.derivation.loggable @derive(pureconfigReader, loggable) final case class Consumers( confirmedOrders: ConsumerConfig, + cfmmHistory: ConsumerConfig, unconfirmedOrders: ConsumerConfig, ordersRetry: ConsumerConfig ) diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/ExecutionConfig.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/ExecutionConfig.scala index 1ae0489b..3f8124be 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/ExecutionConfig.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/ExecutionConfig.scala @@ -8,6 +8,6 @@ import tofu.logging.derivation.loggable import scala.concurrent.duration.FiniteDuration @derive(pureconfigReader, loggable) -final case class ExecutionConfig(orderLifetime: FiniteDuration) +final case class ExecutionConfig(order: FiniteDuration) object ExecutionConfig extends Context.Companion[ExecutionConfig] \ No newline at end of file diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/modules/CFMMBacklog.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/modules/CFMMBacklog.scala deleted file mode 100644 index 41a413ce..00000000 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/modules/CFMMBacklog.scala +++ /dev/null @@ -1,18 +0,0 @@ -package org.ergoplatform.dex.executor.amm.modules - -import org.ergoplatform.dex.domain.amm.{CFMMOrder, OrderId} - -trait CFMMBacklog[F[_]] { - - /** Put an order to the backlog. - */ - def put(order: CFMMOrder): F[Unit] - - /** Get candidate order for execution. Blocks until an order is available. - */ - def get: F[CFMMOrder] - - /** Put an order from the backlog. - */ - def drop(id: OrderId): F[Unit] -} diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Executor.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Executor.scala index 747ee2cb..b67ef75c 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Executor.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Executor.scala @@ -1,21 +1,22 @@ package org.ergoplatform.dex.executor.amm.processes -import cats.effect.Clock +import cats.effect.{Clock, Timer} import cats.syntax.option._ -import cats.{Functor, Monad} +import cats.syntax.traverse._ +import cats.{Defer, Functor, Monad, SemigroupK} import derevo.derive import mouse.any._ import org.ergoplatform.common.TraceId import org.ergoplatform.common.streaming.syntax._ import org.ergoplatform.dex.domain.amm.CFMMOrder import org.ergoplatform.dex.executor.amm.config.ExecutionConfig -import org.ergoplatform.dex.executor.amm.services.Execution -import org.ergoplatform.dex.executor.amm.streaming.CFMMCircuit +import org.ergoplatform.dex.executor.amm.services.{CFMMBacklog, Execution} +import org.ergoplatform.dex.executor.amm.streaming.{CFMMCircuit, CFMMHistConsumer} import org.ergoplatform.ergo.services.explorer.TxSubmissionErrorParser import tofu.Catches import tofu.higherKind.derived.representableK import tofu.logging.{Logging, Logs} -import tofu.streams.Evals +import tofu.streams.{Evals, ParFlatten} import tofu.syntax.context._ import tofu.syntax.embed._ import tofu.syntax.handle._ @@ -34,10 +35,12 @@ object Executor { def make[ I[_]: Functor, - F[_]: Monad: Evals[*[_], G]: ExecutionConfig.Has, - G[_]: Monad: TraceId.Local: Clock: Catches + F[_]: Monad: Evals[*[_], G]: ExecutionConfig.Has: Defer: SemigroupK: ParFlatten, + G[_]: Monad: TraceId.Local: Clock: Catches: Timer ](implicit orders: CFMMCircuit[F, G], + executedOrders: CFMMHistConsumer[F, G], + cfmmBacklog: CFMMBacklog[G], service: Execution[G], logs: Logs[I, G] ): I[Executor[F]] = @@ -48,34 +51,50 @@ object Executor { } final private class Live[ - F[_]: Monad: Evals[*[_], G], - G[_]: Monad: Logging: TraceId.Local: Clock: Catches + F[_]: Monad: Evals[*[_], G]: Defer: SemigroupK: ParFlatten, + G[_]: Monad: Logging: Catches: Timer ](conf: ExecutionConfig)(implicit orders: CFMMCircuit[F, G], + executedOrders: CFMMHistConsumer[F, G], + backlog: CFMMBacklog[G], service: Execution[G], errParser: TxSubmissionErrorParser ) extends Executor[F] { def run: F[Unit] = + emits( + List( + addToBacklog, + executeOrders, + dropExecuted + ) + ).parFlattenUnbounded + + def addToBacklog: F[Unit] = orders.stream - .evalMap { rec => - service - .executeAttempt(rec.message) - .handleWith[Throwable](e => warnCause"Order execution failed fatally" (e) as none[CFMMOrder]) - .local(_ => TraceId.fromString(rec.message.id.value)) - .tupleLeft(rec) + .evalTap(orderRec => backlog.put(orderRec.message)) + .evalMap(_.commit) + + def dropExecuted: F[Unit] = + executedOrders.stream + .evalTap { rec => + rec.message.traverse(order => backlog.drop(order.order.id)) } - .flatTap { - case (_, None) => unit[F] - case (_, Some(order)) => - eval(now.millis) >>= { - case ts if ts - order.timestamp < conf.orderLifetime.toMillis => - eval(warn"Failed to execute $order. Going to retry.") >> - orders.retry((order.id -> order).pure[F]) - case _ => - eval(warn"Failed to execute $order. Order expired.") - } + .evalMap(_.commit) + + def executeOrders: F[Unit] = + eval(backlog.get).evalMap { + case Some(order) => executeOrder(order) + case None => trace"No orders to execute. Going to wait for" >> Timer[G].sleep(conf.order) + }.repeat + + private def executeOrder(order: CFMMOrder): G[Unit] = + service + .executeAttempt(order) + .handleWith[Throwable](e => warnCause"Order execution failed fatally" (e) as none[CFMMOrder]) + .flatMap { + case Some(order) => backlog.suspend(order) + case None => ().pure[G] } - .evalMap { case (rec, _) => rec.commit } } } diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/repositories/CFMMOrders.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/repositories/CFMMOrders.scala new file mode 100644 index 00000000..be7461c1 --- /dev/null +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/repositories/CFMMOrders.scala @@ -0,0 +1,80 @@ +package org.ergoplatform.dex.executor.amm.repositories + +import cats.{FlatMap, Functor} +import derevo.derive +import org.ergoplatform.common.cache.Cache +import org.ergoplatform.dex.domain.amm.{CFMMOrder, OrderId} +import org.ergoplatform.dex.executor.amm.repositories.CFMMPools.{CFMMPoolsTracing, Live} +import tofu.higherKind.Mid +import tofu.higherKind.derived.representableK +import tofu.logging.{Logging, Logs} +import tofu.syntax.logging._ +import tofu.syntax.monadic._ + +@derive(representableK) +trait CFMMOrders[F[_]] { + + def put(order: CFMMOrder): F[Unit] + + def exists(orderId: OrderId): F[Boolean] + + def drop(orderId: OrderId): F[Unit] + + def get(orderId: OrderId): F[Option[CFMMOrder]] + + def getAll: F[List[CFMMOrder]] +} + +object CFMMOrders { + + def make[I[_]: Functor, F[_]: FlatMap](implicit logs: Logs[I, F], cache: Cache[F]): I[CFMMOrders[F]] = + logs.forService[CFMMOrders[F]].map { implicit logging => + new CFMMOrdersTracingMid[F] attach new Live[F](cache) + } + + final private class Live[F[_]](cache: Cache[F]) extends CFMMOrders[F] { + + def put(order: CFMMOrder): F[Unit] = cache.set(order.id, order) + + def exists(orderId: OrderId): F[Boolean] = cache.exists(orderId) + + def drop(orderId: OrderId): F[Unit] = cache.del(orderId) + + def get(orderId: OrderId): F[Option[CFMMOrder]] = cache.get[OrderId, CFMMOrder](orderId) + + def getAll: F[List[CFMMOrder]] = cache.getAll + } + + final private class CFMMOrdersTracingMid[F[_]: FlatMap: Logging] extends CFMMOrders[Mid[F, *]] { + + def put(order: CFMMOrder): Mid[F, Unit] = for { + _ <- trace"put(order=$order)" + r <- _ + _ <- trace"put(order=$order) -> $r" + } yield r + + def exists(orderId: OrderId): Mid[F, Boolean] = for { + _ <- trace"exists(orderId=$orderId)" + r <- _ + _ <- trace"exists(orderId=$orderId) -> $r" + } yield r + + def drop(orderId: OrderId): Mid[F, Unit] = for { + _ <- trace"drop(orderId=$orderId)" + r <- _ + _ <- trace"drop(orderId=$orderId) -> $r" + } yield r + + def get(orderId: OrderId): Mid[F, Option[CFMMOrder]] = for { + _ <- trace"checkLater(order=$orderId)" + r <- _ + _ <- trace"checkLater(order=$orderId) -> $r" + } yield r + + def getAll: Mid[F, List[CFMMOrder]] = for { + _ <- trace"getAll()" + r <- _ + _ <- trace"getAll() -> length: ${r.length}" + } yield r + } +} diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/services/CFMMBacklog.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/services/CFMMBacklog.scala new file mode 100644 index 00000000..850e7aba --- /dev/null +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/services/CFMMBacklog.scala @@ -0,0 +1,202 @@ +package org.ergoplatform.dex.executor.amm.services + +import cats.effect.{Clock, Sync} +import cats.{FlatMap, Monad} +import derevo.derive +import org.ergoplatform.dex.domain.amm.{CFMMOrder, OrderId, WeightedOrder} +import org.ergoplatform.dex.executor.amm.repositories.CFMMOrders +import tofu.concurrent.{Atom, MakeAtom} +import cats.syntax.traverse._ +import cats.syntax.option._ +import org.ergoplatform.dex.executor.amm.config.BacklogConfig +import tofu.concurrent.MakeAtom._ +import tofu.generate.GenRandom +import tofu.higherKind.Mid +import tofu.higherKind.derived.representableK +import tofu.lift.IsoK +import tofu.logging.{Logging, Logs} +import tofu.syntax.logging._ +import tofu.syntax.monadic._ +import tofu.syntax.embed._ +import tofu.syntax.time.now + +import scala.collection.mutable + +@derive(representableK) +trait CFMMBacklog[F[_]] { + + /** Put an order to the backlog. + */ + def put(order: CFMMOrder): F[Unit] + + /** Put an order with priceTooHigh or priceTooLow execution err + */ + def suspend(order: CFMMOrder): F[Unit] + + /** Put possibly executed order to the backlog + */ + def checkLater(order: CFMMOrder): F[Unit] + + /** Get candidate order for execution. Blocks until an order is available. + */ + def get: F[Option[CFMMOrder]] + + /** Put an order from the backlog. + */ + def drop(id: OrderId): F[Unit] +} + +object CFMMBacklog { + + def make[I[_]: Sync, F[_]: Sync: BacklogConfig.Has: Clock: GenRandom](implicit + logs: Logs[I, F], + cfmmOrders: CFMMOrders[F], + isoKGI: IsoK[F, I] + ): I[CFMMBacklog[F]] = for { + implicit0(logging: Logging[F]) <- logs.forService[CFMMBacklog[F]] + pendingQueueRef <- MakeAtom[I, F].of(mutable.PriorityQueue.empty[WeightedOrder]) + suspendedQueueRef <- MakeAtom[I, F].of(mutable.PriorityQueue.empty[WeightedOrder]) + revisitOrders <- MakeAtom[I, F].of(List.empty[WeightedOrder]) + _ <- isoKGI.to(recoverPendingQueue[F](pendingQueueRef, cfmmOrders)) + } yield BacklogConfig.access + .map(cfg => + new CFMMBacklogTracingMid[F] attach new Live[F]( + pendingQueueRef, + suspendedQueueRef, + revisitOrders, + cfmmOrders, + cfg + ) + ) + .embed + + final private class Live[F[_]: Monad: Clock: GenRandom]( + pendingQueueRef: Atom[F, mutable.PriorityQueue[WeightedOrder]], + suspendedQueueRef: Atom[F, mutable.PriorityQueue[WeightedOrder]], + revisitOrders: Atom[F, List[WeightedOrder]], + cfmmOrders: CFMMOrders[F], + backlogConfig: BacklogConfig + ) extends CFMMBacklog[F] { + + /** Put an order to the backlog. + */ + def put(order: CFMMOrder): F[Unit] = + cfmmOrders.put(order) >> pendingQueueRef.update(_ += WeightedOrder.fromOrder(order)) + + /** Put an order with priceTooHigh or priceTooLow execution err + */ + def suspend(order: CFMMOrder): F[Unit] = + suspendedQueueRef.update(_ += WeightedOrder.fromOrder(order)) + + /** Put possibly executed order to the backlog + */ + def checkLater(order: CFMMOrder): F[Unit] = + revisitOrders.update(_ :+ WeightedOrder.fromOrder(order)) + + /** Get candidate order for execution. Blocks until an order is available. + */ + def get: F[Option[CFMMOrder]] = for { + _ <- filterRevisit + random <- GenRandom.nextInt[F](100) + order <- + if (random > backlogConfig.suspendedProbability) + getMaxOrderFromQueue(pendingQueueRef) + else + getMaxOrderFromQueue(suspendedQueueRef) + } yield order + + /** Put an order from the backlog. + */ + def drop(id: OrderId): F[Unit] = + cfmmOrders.drop(id) + + def getMaxOrderFromQueue(queue: Atom[F, mutable.PriorityQueue[WeightedOrder]]): F[Option[CFMMOrder]] = + for { + maxId <- queue.modify { queue => + val elem = if (queue.isEmpty) none else queue.dequeue().some + (queue, elem) + } + time <- now.millis + suspendedElem <- maxId match { + case Some(value) if (time - value.timestamp) < backlogConfig.orderLifetime.toMillis => + cfmmOrders.get(value.orderId) >>= { + case Some(value) => value.some.pure[F] + case None => getMaxOrderFromQueue(queue) + } + case Some(value) => + cfmmOrders.drop(value.orderId) >> getMaxOrderFromQueue(queue) + case None => none.pure[F] + } + } yield suspendedElem + + def filterRevisit: F[Unit] = for { + curTime <- now.millis + possible2pending <- + revisitOrders.modify(_.span(wOrd => (curTime - wOrd.timestamp > backlogConfig.orderExecutionTime.toMillis))) + _ <- possible2pending.traverse { + case ord if (curTime - ord.timestamp) > backlogConfig.orderLifetime.toMillis => + drop(ord.orderId) + case ord => + pendingQueueRef.update(_ += ord) + } + } yield () + } + + final private class CFMMBacklogTracingMid[F[_]: FlatMap: Logging] extends CFMMBacklog[Mid[F, *]] { + + /** Put an order to the backlog. + */ + override def put(order: CFMMOrder): Mid[F, Unit] = for { + _ <- trace"put(order=$order)" + r <- _ + _ <- trace"put(order=$order) -> $r" + } yield r + + /** Put an order with priceTooHigh or priceTooLow execution err + */ + override def suspend(order: CFMMOrder): Mid[F, Unit] = for { + _ <- trace"suspend(order=$order)" + r <- _ + _ <- trace"suspend(order=$order) -> $r" + } yield r + + /** Put possibly executed order to the backlog + */ + override def checkLater(order: CFMMOrder): Mid[F, Unit] = for { + _ <- trace"checkLater(order=$order)" + r <- _ + _ <- trace"checkLater(order=$order) -> $r" + } yield r + + /** Get candidate order for execution. Blocks until an order is available. + */ + override def get: Mid[F, Option[CFMMOrder]] = for { + _ <- trace"get()" + r <- _ + _ <- trace"get() -> $r" + } yield r + + /** Put an order from the backlog. + */ + override def drop(id: OrderId): Mid[F, Unit] = for { + _ <- trace"drop(id=$id)" + r <- _ + _ <- trace"drop(id=$id) -> $r" + } yield r + } + + private def recoverPendingQueue[F[_]: Monad: Clock: BacklogConfig.Has]( + pendingQueue: Atom[F, mutable.PriorityQueue[WeightedOrder]], + cfmmOrders: CFMMOrders[F] + ): F[Unit] = for { + cfg <- BacklogConfig.access + ordersInRedis <- cfmmOrders.getAll + curTime <- now.millis + orders2Pending = ordersInRedis + .collect { + case order if (curTime - order.timestamp) < cfg.orderLifetime.toMillis => + WeightedOrder.fromOrder(order) + } + _ <- pendingQueue.update(_ ++= mutable.PriorityQueue(orders2Pending: _*)) + } yield () +} diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/streaming.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/streaming.scala index 8ee7e122..b29fbe3f 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/streaming.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/streaming.scala @@ -2,13 +2,16 @@ package org.ergoplatform.dex.executor.amm import fs2.kafka.types.KafkaOffset import org.ergoplatform.common.streaming._ -import org.ergoplatform.dex.domain.amm.{CFMMOrder, OrderId} +import org.ergoplatform.dex.domain.amm.{CFMMOrder, EvaluatedCFMMOrder, OrderId} +import org.ergoplatform.ergo.BoxId object streaming { - type CFMMConsumerIn[F[_], G[_], Status[_]] = Consumer.Aux[OrderId, Status[CFMMOrder], KafkaOffset, F, G] - type CFMMConsumerRetries[F[_], G[_]] = Consumer.Aux[OrderId, Delayed[CFMMOrder], KafkaOffset, F, G] - type CFMMProducerRetries[F[_]] = Producer[OrderId, Delayed[CFMMOrder], F] + type CFMMConsumerIn[F[_], G[_], Status[_]] = Consumer.Aux[OrderId, Status[CFMMOrder], KafkaOffset, F, G] + type CFMMConsumerInId[F[_], G[_], Status[_]] = Consumer.Aux[OrderId, Status[BoxId], KafkaOffset, F, G] + type CFMMConsumerRetries[F[_], G[_]] = Consumer.Aux[OrderId, Delayed[CFMMOrder], KafkaOffset, F, G] + type CFMMHistConsumer[F[_], G[_]] = Consumer.Aux[OrderId, Option[EvaluatedCFMMOrder.Any], KafkaOffset, F, G] + type CFMMProducerRetries[F[_]] = Producer[OrderId, Delayed[CFMMOrder], F] type CFMMCircuit[F[_], G[_]] = StreamingCircuit[OrderId, CFMMOrder, F, G] } diff --git a/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/configs/package.scala b/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/configs/package.scala new file mode 100644 index 00000000..04bf28fb --- /dev/null +++ b/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/configs/package.scala @@ -0,0 +1,40 @@ +package org.ergoplatform.dex.executor.amm + +import cats.Functor +import cats.effect.IO +import org.ergoplatform.dex.executor.amm.config.BacklogConfig +import tofu.WithContext + +import scala.concurrent.duration._ + +package object configs { + + val backlogCfgWithNoSuspended = BacklogConfig( + orderLifetime = 120.seconds, + orderExecutionTime = 10.seconds, + suspendedProbability = -1 + ) + + val backlogCfgWithOnlySuspended = BacklogConfig( + orderLifetime = 120.seconds, + orderExecutionTime = 10.seconds, + suspendedProbability = 100 + ) + + object has { + + val cfgWithNoSuspended: BacklogConfig.Has[IO] = new WithContext[IO, BacklogConfig] { + + override def functor: Functor[IO] = Functor[IO] + + override def context: IO[BacklogConfig] = IO.pure(backlogCfgWithNoSuspended) + } + + val cfgWithOnlySuspended: BacklogConfig.Has[IO] = new WithContext[IO, BacklogConfig] { + + override def functor: Functor[IO] = Functor[IO] + + override def context: IO[BacklogConfig] = IO.pure(backlogCfgWithOnlySuspended) + } + } +} diff --git a/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/generators/CFMMOrdersGenerator.scala b/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/generators/CFMMOrdersGenerator.scala new file mode 100644 index 00000000..2aa3a6c7 --- /dev/null +++ b/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/generators/CFMMOrdersGenerator.scala @@ -0,0 +1,31 @@ +package org.ergoplatform.dex.executor.amm.generators + +import cats.effect.Sync +import org.ergoplatform.dex.domain.amm +import org.ergoplatform.dex.domain.amm.CFMMOrder +import org.ergoplatform.dex.executor.amm.repositories.CFMMOrders +import tofu.concurrent.MakeAtom +import tofu.syntax.monadic._ + +object CFMMOrdersGenerator { + + def genMapBased[I[_]: Sync, F[_]: Sync]: I[CFMMOrders[F]] = for { + map <- MakeAtom[I, F].of(Map.empty[amm.OrderId, CFMMOrder]) + } yield (new CFMMOrders[F] { + + override def put(order: CFMMOrder): F[Unit] = + map.update(_ + (order.id -> order)) + + override def exists(orderId: amm.OrderId): F[Boolean] = + map.get.map(_.exists(_._1 == orderId)) + + override def drop(orderId: amm.OrderId): F[Unit] = + map.update(_.filter(_._1 != orderId)) + + override def get(orderId: amm.OrderId): F[Option[CFMMOrder]] = + map.get.map(_.get(orderId)) + + override def getAll: F[List[CFMMOrder]] = + map.get.map(_.values.toList) + }) +} diff --git a/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/generators/ErgoTreeGenerator.scala b/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/generators/ErgoTreeGenerator.scala new file mode 100644 index 00000000..7f1f610f --- /dev/null +++ b/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/generators/ErgoTreeGenerator.scala @@ -0,0 +1,38 @@ +package org.ergoplatform.dex.executor.amm.generators + +import org.ergoplatform.ErgoAddressEncoder +import org.ergoplatform.dex.protocol.{sigmaUtils, ErgoTreeSerializer} +import org.ergoplatform.dex.sources.n2tContracts +import scorex.util.encode.Base16 +import sigmastate.Values.ErgoTree +import sigmastate.lang.Terms.ValueOps +import sigmastate.basics.DLogProtocol.DLogProverInput +import sigmastate.eval.{CompiletimeIRContext, IRContext} +import sigmastate.lang.SigmaCompiler + +object ErgoTreeGenerator { + + implicit private val IR: IRContext = new CompiletimeIRContext() + val sigma = SigmaCompiler(ErgoAddressEncoder.MainnetNetworkPrefix) + + val env = Map( + "Pk" -> DLogProverInput(BigInt(Long.MaxValue).bigInteger).publicImage, + "PoolNFT" -> Array.fill(32)(0: Byte), + "QuoteId" -> Array.fill(32)(1.toByte), + "DexFee" -> 999999L, + "SelfX" -> 888888L, + "MaxMinerFee" -> 777777L, + "MinerPropBytes" -> Base16 + .decode( + "1005040004000e36100204a00b08cd0279be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798ea02d192a39a8cc7a701730073011001020402d19683030193a38cc7b2a57300000193c2b2a57301007473027303830108cdeeac93b1a57304" + ) + .get + ) + + val source = n2tContracts.swapSell + + val tree = + sigmaUtils.updateVersionHeader(ErgoTree.fromProposition(sigma.compile(env, source).asSigmaProp)) + + val serializedSwapTree = ErgoTreeSerializer.default.serialize(tree) +} diff --git a/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/generators/OrdersGenerator.scala b/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/generators/OrdersGenerator.scala new file mode 100644 index 00000000..15cf7cbb --- /dev/null +++ b/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/generators/OrdersGenerator.scala @@ -0,0 +1,77 @@ +package org.ergoplatform.dex.executor.amm.generators + +import cats.effect.{Clock, Sync} +import org.ergoplatform.dex.domain.amm.{PoolId, Swap, SwapParams} +import cats.syntax.traverse._ +import org.ergoplatform.common.HexString +import org.ergoplatform.dex.domain.AssetAmount +import org.ergoplatform.dex.executor.amm.generators.ErgoTreeGenerator.serializedSwapTree +import org.ergoplatform.ergo.{BoxId, PubKey, TokenId, TxId} +import org.ergoplatform.ergo.domain.{BoxAsset, Output} +import tofu.generate.GenRandom +import tofu.syntax.time.now +import tofu.syntax.monadic._ + +import scala.util.Random + +object OrdersGenerator { + + def genSwapOrders[F[+_]: Sync: Clock: GenRandom](qty: Int): F[List[Swap]] = + (0 until qty).toList.traverse(_ => genSwapOrder[F]) + + def genSwapOrder[F[_]: Sync: GenRandom: Clock]: F[Swap] = for { + timestamp <- now.millis + maxMinerFee <- GenRandom.nextLong + tokenHex <- randomHexString + inputAsset <- genAssetAmount + params <- genDummySwapParams(inputAsset) + box <- genDummyOutput(inputAsset) + } yield Swap( + PoolId(TokenId(tokenHex)), + maxMinerFee, + timestamp, + params, + box + ) + + def genDummySwapParams[F[_]: GenRandom: Sync](input: AssetAmount): F[SwapParams] = for { + minOutput <- genAssetAmount + dexFeePerTokenNum <- GenRandom.nextLong + dexFeePerTokenDenom <- GenRandom.nextLong + pubKeyHex <- randomHexString + } yield SwapParams( + input, + minOutput, + dexFeePerTokenNum, + dexFeePerTokenDenom, + PubKey(pubKeyHex) + ) + + def genDummyOutput[F[_]: Sync: GenRandom](input: AssetAmount): F[Output] = for { + boxIdRaw <- randomString(32) + txIdRaw <- randomString(32) + value <- GenRandom.nextLong + index <- GenRandom.nextInt(32) + creationHeight <- GenRandom.nextInt(32) + } yield Output( + BoxId(boxIdRaw), + TxId(txIdRaw), + value, + index, + creationHeight, + serializedSwapTree, + List(BoxAsset(input.id, input.value)), + Map.empty + ) + + def genAssetAmount[F[_]: GenRandom: Sync]: F[AssetAmount] = for { + assetId <- randomHexString + value <- GenRandom.nextLong + } yield AssetAmount(TokenId(assetId), value) + + def randomHexString[F[_]: Sync] = + randomString(32).map(str => HexString.fromBytes(str.getBytes)) + + def randomString[F[_]: Sync](length: Int): F[String] = + Sync[F].delay(Random.alphanumeric.take(length).mkString) +} diff --git a/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/services/CFMMBacklogTests.scala b/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/services/CFMMBacklogTests.scala new file mode 100644 index 00000000..7c21f6b0 --- /dev/null +++ b/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/services/CFMMBacklogTests.scala @@ -0,0 +1,110 @@ +package org.ergoplatform.dex.executor.amm.services + +import cats.effect.IO +import cats.syntax.traverse._ +import org.ergoplatform.dex.domain.amm.{CFMMOrder, WeightedOrder} +import org.scalatest.flatspec.AnyFlatSpec +import org.ergoplatform.dex.executor.amm._ +import org.scalatest.matchers.should + +import org.ergoplatform.dex.executor.amm.generators._ +import org.ergoplatform.dex.executor.amm.utils.isoK._ +import org.ergoplatform.dex.executor.amm.repositories.CFMMOrders +import org.ergoplatform.dex.executor.amm.utils.Ordering.checkDescSort +import org.ergoplatform.dex.executor.amm.utils.genRandoms.genRandom +import tofu.logging.Logs + +import scala.concurrent.ExecutionContext + +class CFMMBacklogTests extends AnyFlatSpec with should.Matchers { + + implicit val timer = IO.timer(ExecutionContext.global) + + implicit val logs = Logs.empty[IO, IO] + + "CFMMBacklog" should "correctly retry non executed orders" in { + + val ordersQty = 100 + + implicit val cfgHas = configs.has.cfgWithOnlySuspended + + val ordersFromBacklog = for { + implicit0(cfmmOrders: CFMMOrders[IO]) <- CFMMOrdersGenerator.genMapBased[IO, IO] + orders <- OrdersGenerator.genSwapOrders[IO](ordersQty) + suspendedOrders = + orders.map(order => + order.copy(timestamp = order.timestamp - configs.backlogCfgWithOnlySuspended.orderExecutionTime.toMillis * 2) + ) + backlog <- CFMMBacklog.make[IO, IO] + _ <- suspendedOrders.traverse(cfmmOrders.put) + _ <- suspendedOrders.traverse(backlog.suspend) + fromBacklog <- suspendedOrders.traverse(_ => backlog.get) + } yield fromBacklog + + val ordersOpts = ordersFromBacklog.unsafeRunSync() + + val orders = ordersOpts.collect[CFMMOrder, List[CFMMOrder]] { case Some(order) => + order + } + + orders.length shouldBe ordersQty + + checkDescSort(orders.map(WeightedOrder.fromOrder)) shouldBe true + } + "CFMMBacklog" should "correctly drop outdated orders" in { + + val normalOrdersQty = 50 + + val outdatedOrdersQty = 50 + + implicit val cfgHas = configs.has.cfgWithNoSuspended + + val ordersFromBacklog = for { + implicit0(cfmmOrders: CFMMOrders[IO]) <- CFMMOrdersGenerator.genMapBased[IO, IO] + orders <- OrdersGenerator.genSwapOrders[IO](normalOrdersQty + outdatedOrdersQty) + outdatedOrders = + orders + .take(outdatedOrdersQty) + .map(order => + order.copy(timestamp = order.timestamp - configs.backlogCfgWithNoSuspended.orderLifetime.toMillis * 4) + ) + normalOrders = orders.drop(outdatedOrdersQty) + backlog <- CFMMBacklog.make[IO, IO] + _ <- (normalOrders ++ outdatedOrders).traverse(backlog.put) + fromBacklog <- normalOrders.traverse(_ => backlog.get) + } yield fromBacklog + + val ordersOpts = ordersFromBacklog.unsafeRunSync() + + val orders = ordersOpts.collect[CFMMOrder, List[CFMMOrder]] { case Some(order) => + order + } + + orders.length shouldBe normalOrdersQty + + checkDescSort(orders.map(WeightedOrder.fromOrder)) shouldBe true + } + "CFMMBacklog" should "correctly process orders by weight during pipeline" in { + val ordersQty = 100 + + implicit val cfgHas = configs.has.cfgWithNoSuspended + + val ordersFromBacklog = for { + implicit0(cfmmOrders: CFMMOrders[IO]) <- CFMMOrdersGenerator.genMapBased[IO, IO] + orders <- OrdersGenerator.genSwapOrders[IO](ordersQty) + backlog <- CFMMBacklog.make[IO, IO] + _ <- orders.traverse(backlog.put) + fromBacklog <- orders.traverse(_ => backlog.get) + } yield fromBacklog + + val ordersOpts = ordersFromBacklog.unsafeRunSync() + + val orders = ordersOpts.collect[CFMMOrder, List[CFMMOrder]] { case Some(order) => + order + } + + orders.length shouldBe ordersQty + + checkDescSort(orders.map(WeightedOrder.fromOrder)) shouldBe true + } +} diff --git a/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/utils/Ordering.scala b/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/utils/Ordering.scala new file mode 100644 index 00000000..68e9b7d8 --- /dev/null +++ b/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/utils/Ordering.scala @@ -0,0 +1,14 @@ +package org.ergoplatform.dex.executor.amm.utils + +import scala.math.Ordering.Implicits.infixOrderingOps + +object Ordering { + + def checkDescSort[A: Ordering](in: List[A]): Boolean = + (in.headOption, in.tail.headOption) match { + case (Some(head), Some(nextAfterHead)) if head >= nextAfterHead => + checkDescSort(in.tail) + case (Some(_), None) => true + case _ => false + } +} diff --git a/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/utils/package.scala b/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/utils/package.scala new file mode 100644 index 00000000..3589cc32 --- /dev/null +++ b/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/utils/package.scala @@ -0,0 +1,31 @@ +package org.ergoplatform.dex.executor.amm + +import cats.effect.IO +import tofu.generate.GenRandom +import tofu.lift.IsoK + +import scala.util.Random + +package object utils { + + object genRandoms { + + implicit val genRandom: GenRandom[IO] = new GenRandom[IO] { + + override def nextLong: IO[Long] = + IO.delay(Random.nextLong()) + + override def nextInt(n: Int): IO[Int] = + IO.delay(Random.nextInt(n)) + } + } + + object isoK { + + implicit val isokIO2IO: IsoK[IO, IO] = new IsoK[IO, IO] { + override def to[A](fa: IO[A]): IO[A] = fa + + override def from[A](ga: IO[A]): IO[A] = ga + } + } +} diff --git a/modules/cache/src/main/scala/org/ergoplatform/common/cache/Cache.scala b/modules/cache/src/main/scala/org/ergoplatform/common/cache/Cache.scala index 69ec5c9f..4fcc27aa 100644 --- a/modules/cache/src/main/scala/org/ergoplatform/common/cache/Cache.scala +++ b/modules/cache/src/main/scala/org/ergoplatform/common/cache/Cache.scala @@ -2,18 +2,21 @@ package org.ergoplatform.common.cache import cats.data.OptionT import cats.syntax.either._ +import cats.syntax.traverse._ import cats.syntax.show._ -import cats.{Functor, Monad, Show} +import cats.{Functor, Monad, MonadError, Show} import derevo.derive import derevo.tagless.applyK +import dev.profunktor.redis4cats.data.KeyScanCursor import dev.profunktor.redis4cats.hlist.{HList, Witness} -import org.ergoplatform.common.cache.errors.{BinaryDecodingFailed, BinaryEncodingFailed} +import org.ergoplatform.common.cache.errors.{BinaryDecodingFailed, BinaryEncodingFailed, ValueNotFound} import scodec.Codec import scodec.bits.BitVector import tofu.BracketThrow import tofu.higherKind.Mid import tofu.logging.{Loggable, Logging, Logs} import tofu.syntax.logging._ +import tofu.syntax.loggable._ import tofu.syntax.monadic._ import tofu.syntax.raise._ @@ -26,6 +29,10 @@ trait Cache[F[_]] { def del[K: Codec: Loggable](key: K): F[Unit] + def exists[K: Codec: Loggable](key: K): F[Boolean] + + def getAll[V: Codec: Loggable]: F[List[V]] + def flushAll: F[Unit] def transaction[T <: HList](commands: T)(implicit w: Witness[T]): F[Unit] @@ -33,6 +40,8 @@ trait Cache[F[_]] { object Cache { + implicit val loggable: Loggable[Array[Byte]] = Loggable.empty + def make[I[_]: Functor, F[_]: Monad: BracketThrow](implicit redis: Redis.Plain[F], makeTx: MakeRedisTransaction[F], @@ -43,7 +52,7 @@ object Cache { } final class Redis[ - F[_]: Monad: BinaryEncodingFailed.Raise: BinaryDecodingFailed.Raise: BracketThrow + F[_]: Monad: BinaryEncodingFailed.Raise: BinaryDecodingFailed.Raise: ValueNotFound.Raise: BracketThrow ](implicit redis: Redis.Plain[F], makeTx: MakeRedisTransaction[F]) extends Cache[F] { @@ -73,15 +82,7 @@ object Cache { .leftMap(err => BinaryEncodingFailed(key.show, err.messageWithContext)) .toRaise ) - raw <- OptionT(redis.get(k.toByteArray)) - value <- OptionT.liftF( - Codec[V] - .decode(BitVector(raw)) - .toEither - .map(_.value) - .leftMap(err => BinaryDecodingFailed(key.show, err.messageWithContext)) - .toRaise - ) + value <- getValue[V](k.toByteArray) } yield value).value def del[K: Codec: Loggable](key: K): F[Unit] = @@ -95,8 +96,47 @@ object Cache { def flushAll: F[Unit] = redis.flushAll + def exists[K: Codec: Loggable](key: K): F[Boolean] = + Codec[K] + .encode(key) + .toEither + .leftMap(err => BinaryEncodingFailed(key.show, err.messageWithContext)) + .toRaise + .flatMap(k => redis.exists(k.toByteArray)) + def transaction[T <: HList](commands: T)(implicit w: Witness[T]): F[Unit] = makeTx.make.use(_.exec(commands).void) + + def getAll[V: Codec: Loggable]: F[List[V]] = { + def iterate(acc: List[V], scanner: KeyScanCursor[Array[Byte]]): F[List[V]] = + for { + elems <- scanner.keys + .traverse(key => + getValue[V](key).value >>= { + case Some(elem) => elem.pure + case None => ValueNotFound(key.logShow).raise[F, V] + } + ) + newAcc = acc ++ elems + toReturn <- + if (scanner.isFinished) newAcc.pure + else redis.scan(scanner) >>= (iterate(newAcc, _)) + } yield toReturn + + redis.scan >>= (iterate(List.empty, _)) + } + + private def getValue[V: Codec: Loggable](key: Array[Byte]) = for { + raw <- OptionT(redis.get(key)) + value <- OptionT.liftF( + Codec[V] + .decode(BitVector(raw)) + .toEither + .map(_.value) + .leftMap(err => BinaryDecodingFailed(key.show, err.messageWithContext)) + .toRaise + ) + } yield value } final class CacheTracing[F[_]: Monad: Logging] extends Cache[Mid[F, *]] { @@ -115,5 +155,11 @@ object Cache { def transaction[T <: HList](commands: T)(implicit w: Witness[T]): Mid[F, Unit] = fa => trace"transaction begin" >> fa.flatTap(_ => trace"transaction end") + + def exists[K: Codec: Loggable](key: K): Mid[F, Boolean] = + _ >>= (r => trace"exists(key=$key) -> $r" as r) + + def getAll[V: Codec: Loggable]: Mid[F, List[V]] = + _ >>= (r => trace"getAll() -> length: ${r.length}" as r) } } diff --git a/modules/cache/src/main/scala/org/ergoplatform/common/cache/errors.scala b/modules/cache/src/main/scala/org/ergoplatform/common/cache/errors.scala index 3e265d4e..c8a25a74 100644 --- a/modules/cache/src/main/scala/org/ergoplatform/common/cache/errors.scala +++ b/modules/cache/src/main/scala/org/ergoplatform/common/cache/errors.scala @@ -13,4 +13,9 @@ object errors { extends Exception(s"Failed to decode value {$showValue}. $reason") object BinaryDecodingFailed extends Errors.Companion[BinaryDecodingFailed] + + final case class ValueNotFound(key: String) + extends Exception(s"Failed to get value by key {$key}.") + + object ValueNotFound extends Errors.Companion[ValueNotFound] } diff --git a/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/CFMMOrder.scala b/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/CFMMOrder.scala index d85ef88c..c5842115 100644 --- a/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/CFMMOrder.scala +++ b/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/CFMMOrder.scala @@ -3,6 +3,7 @@ package org.ergoplatform.dex.domain.amm import derevo.circe.{decoder, encoder} import derevo.derive import org.ergoplatform.ergo.domain.Output +import scodec.Codec import tofu.logging.derivation.loggable @derive(encoder, decoder, loggable) @@ -26,3 +27,8 @@ final case class Redeem(poolId: PoolId, maxMinerFee: Long, timestamp: Long, para @derive(encoder, decoder, loggable) final case class Swap(poolId: PoolId, maxMinerFee: Long, timestamp: Long, params: SwapParams, box: Output) extends CFMMOrder + +object CFMMOrder { + + implicit val codec: Codec[CFMMOrder] = implicitly +} diff --git a/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/package.scala b/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/package.scala index b6a6c9b0..447e0340 100644 --- a/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/package.scala +++ b/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/package.scala @@ -12,6 +12,7 @@ import org.ergoplatform.common.HexString import org.ergoplatform.dex.domain.amm.PoolId import org.ergoplatform.ergo.{BoxId, TokenId} import scodec.bits.ByteVector +import scodec.codecs.{uint16, utf8} import sttp.tapir.{Codec, Schema, Validator} import tofu.logging.derivation.loggable @@ -83,6 +84,24 @@ package object amm { implicit val get: Get[OrderId] = deriving implicit val put: Put[OrderId] = deriving + implicit def codec: scodec.Codec[OrderId] = + scodec.codecs.variableSizeBits(uint16, utf8).xmap(OrderId(_), _.value) + + implicit def recordSerializer[F[_]: Sync]: RecordSerializer[F, OrderId] = serializerViaCirceEncoder + implicit def recordDeserializer[F[_]: Sync]: RecordDeserializer[F, OrderId] = deserializerViaKafkaDecoder + } + + @derive(show, loggable, encoder, decoder) + case class WeightedOrder(weight: Long, orderId: OrderId, timestamp: Long) + + object WeightedOrder { + + def fromOrder(order: CFMMOrder): WeightedOrder = + WeightedOrder(order.maxMinerFee, order.id, order.timestamp) + + implicit val ord: Ordering[WeightedOrder] = + (x: WeightedOrder, y: WeightedOrder) => x.weight compare y.weight + implicit def recordSerializer[F[_]: Sync]: RecordSerializer[F, OrderId] = serializerViaCirceEncoder implicit def recordDeserializer[F[_]: Sync]: RecordDeserializer[F, OrderId] = deserializerViaKafkaDecoder }