Skip to content

Commit

Permalink
backlog service
Browse files Browse the repository at this point in the history
  • Loading branch information
Bromel777 committed Jul 26, 2022
1 parent 0923431 commit 14aceb1
Show file tree
Hide file tree
Showing 23 changed files with 808 additions and 70 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ lazy val ammExecutor = utils
)
.settings(nativePackagerSettings("amm-executor"))
.enablePlugins(JavaAppPackaging, UniversalPlugin, DockerPlugin)
.dependsOn(Seq(core, http).map(_ % allConfigDependency): _*)
.dependsOn(Seq(core, http, cache).map(_ % allConfigDependency): _*)

lazy val poolResolver = utils
.mkModule("pool-resolver", "PoolResolver")
Expand Down
4 changes: 3 additions & 1 deletion modules/amm-executor/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ rotation.retry-delay = 120s

exchange.reward-address = "9gCigPc9cZNRhKgbgdmTkVxo1ZKgw79G8DvLjCcYWAvEF3XRUKy"

execution.order-lifetime = 300s
backlogConfig.order-lifetime = 300s
backlogConfig.order-execution-time = 180s
backlogConfig.suspended-probability = 10

monetary.miner-fee = 2000000
monetary.min-dex-fee = 1000000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,17 @@ import fs2.kafka.RecordDeserializer
import fs2.kafka.serde._
import org.ergoplatform.ErgoAddressEncoder
import org.ergoplatform.common.EnvApp
import org.ergoplatform.common.cache.{Cache, MakeRedisTransaction, Redis}
import org.ergoplatform.common.streaming._
import org.ergoplatform.dex.configs.ConsumerConfig
import org.ergoplatform.dex.domain.amm.{CFMMOrder, OrderId}
import org.ergoplatform.dex.domain.amm.{CFMMOrder, EvaluatedCFMMOrder, OrderId}
import org.ergoplatform.dex.executor.amm.config.ConfigBundle
import org.ergoplatform.dex.executor.amm.context.AppContext
import org.ergoplatform.dex.executor.amm.interpreters.{CFMMInterpreter, N2TCFMMInterpreter, T2TCFMMInterpreter}
import org.ergoplatform.dex.executor.amm.processes.Executor
import org.ergoplatform.dex.executor.amm.repositories.CFMMPools
import org.ergoplatform.dex.executor.amm.services.Execution
import org.ergoplatform.dex.executor.amm.streaming.{CFMMCircuit, CFMMConsumerIn, CFMMConsumerRetries, CFMMProducerRetries}
import org.ergoplatform.dex.executor.amm.repositories.{CFMMOrders, CFMMPools}
import org.ergoplatform.dex.executor.amm.services.{CFMMBacklog, Execution}
import org.ergoplatform.dex.executor.amm.streaming.{CFMMCircuit, CFMMConsumerIn, CFMMConsumerRetries, CFMMHistConsumer, CFMMProducerRetries}
import org.ergoplatform.dex.protocol.amm.AMMType.{CFMMType, N2T_CFMM, T2T_CFMM}
import org.ergoplatform.ergo.modules.ErgoNetwork
import org.ergoplatform.ergo.services.explorer.{ErgoExplorer, ErgoExplorerStreaming}
Expand All @@ -27,6 +28,7 @@ import sttp.client3.asynchttpclient.cats.AsyncHttpClientCatsBackend
import sttp.client3.asynchttpclient.fs2.AsyncHttpClientFs2Backend
import tofu.WithRun
import tofu.fs2Instances._
import tofu.generate.GenRandom
import tofu.lift.IsoK
import tofu.syntax.unlift._
import zio.interop.catz._
Expand All @@ -40,17 +42,22 @@ object App extends EnvApp[AppContext] {
appF.run(ctx) as ExitCode.success
}.orDie

implicit val mtx: MakeRedisTransaction[RunF] = MakeRedisTransaction.make[RunF]

private def init(configPathOpt: Option[String]): Resource[InitF, (Executor[StreamF], AppContext)] =
for {
blocker <- Blocker[InitF]
configs <- Resource.eval(ConfigBundle.load[InitF](configPathOpt, blocker))
implicit0(genRand: GenRandom[RunF]) <- Resource.eval(GenRandom.instance[InitF, RunF]())
ctx = AppContext.init(configs)
implicit0(isoKRun: IsoK[RunF, InitF]) = isoKRunByContext(ctx)
implicit0(e: ErgoAddressEncoder) = ErgoAddressEncoder(configs.protocol.networkType.prefix)
implicit0(confirmedOrders: CFMMConsumerIn[StreamF, RunF, Confirmed]) =
makeConsumer[OrderId, Confirmed[CFMMOrder]](configs.consumers.confirmedOrders)
implicit0(unconfirmedOrders: CFMMConsumerIn[StreamF, RunF, Unconfirmed]) =
makeConsumer[OrderId, Unconfirmed[CFMMOrder]](configs.consumers.unconfirmedOrders)
implicit0(ammHistCons: CFMMHistConsumer[StreamF, RunF]) =
makeConsumer[OrderId, Option[EvaluatedCFMMOrder.Any]](configs.consumers.cfmmHistory)
implicit0(consumerRetries: CFMMConsumerRetries[StreamF, RunF]) =
makeConsumer[OrderId, Delayed[CFMMOrder]](configs.consumers.ordersRetry)
implicit0(orders: CFMMConsumerIn[StreamF, RunF, Id]) =
Expand All @@ -66,8 +73,13 @@ object App extends EnvApp[AppContext] {
implicit0(t2tInt: CFMMInterpreter[T2T_CFMM, RunF]) <- Resource.eval(T2TCFMMInterpreter.make[InitF, RunF])
implicit0(n2tInt: CFMMInterpreter[N2T_CFMM, RunF]) <- Resource.eval(N2TCFMMInterpreter.make[InitF, RunF])
implicit0(interpreter: CFMMInterpreter[CFMMType, RunF]) = CFMMInterpreter.make[RunF]
implicit0(execution: Execution[RunF]) <- Resource.eval(Execution.make[InitF, RunF])
executor <- Resource.eval(Executor.make[InitF, StreamF, RunF])
implicit0(execution: Execution[RunF]) <- Resource.eval(Execution.make[InitF, RunF])
implicit0(redis: Redis.Plain[RunF]) <- Redis.make[InitF, RunF](configs.redis)
implicit0(cache: Cache[RunF]) <- Resource.eval(Cache.make[InitF, RunF])
implicit0(cfmmOrders: CFMMOrders[RunF]) <- Resource.eval[InitF, CFMMOrders[RunF]](CFMMOrders.make[InitF, RunF])
implicit0(cfmmBacklog: CFMMBacklog[RunF]) <-
Resource.eval[InitF, CFMMBacklog[RunF]](CFMMBacklog.make[InitF, RunF])
executor <- Resource.eval(Executor.make[InitF, StreamF, RunF])
} yield executor -> ctx

private def makeBackend(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.ergoplatform.dex.executor.amm.config

import derevo.derive
import derevo.pureconfig.pureconfigReader
import tofu.Context
import tofu.logging.derivation.loggable

import scala.concurrent.duration.FiniteDuration

@derive(pureconfigReader, loggable)
final case class BacklogConfig(
orderLifetime: FiniteDuration,
orderExecutionTime: FiniteDuration,
suspendedProbability: Int
)

object BacklogConfig extends Context.Companion[BacklogConfig]
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package org.ergoplatform.dex.executor.amm.config

import derevo.derive
import derevo.pureconfig.pureconfigReader
import org.ergoplatform.common.cache.RedisConfig
import org.ergoplatform.common.streaming.RotationConfig
import org.ergoplatform.dex.configs._
import tofu.Context
import tofu.logging.derivation.loggable
import tofu.optics.macros.{promote, ClassyOptics}
import tofu.optics.macros.{ClassyOptics, promote}

@derive(pureconfigReader, loggable)
@ClassyOptics
Expand All @@ -16,7 +17,9 @@ final case class ConfigBundle(
@promote execution: ExecutionConfig,
@promote monetary: MonetaryConfig,
@promote protocol: ProtocolConfig,
@promote backlogConfig: BacklogConfig,
consumers: Consumers,
redis: RedisConfig,
producers: Producers,
@promote kafka: KafkaConfig,
@promote network: NetworkConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import tofu.logging.derivation.loggable
@derive(pureconfigReader, loggable)
final case class Consumers(
confirmedOrders: ConsumerConfig,
cfmmHistory: ConsumerConfig,
unconfirmedOrders: ConsumerConfig,
ordersRetry: ConsumerConfig
)
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ import tofu.logging.derivation.loggable
import scala.concurrent.duration.FiniteDuration

@derive(pureconfigReader, loggable)
final case class ExecutionConfig(orderLifetime: FiniteDuration)
final case class ExecutionConfig(order: FiniteDuration)

object ExecutionConfig extends Context.Companion[ExecutionConfig]

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
package org.ergoplatform.dex.executor.amm.processes

import cats.effect.Clock
import cats.effect.{Clock, Timer}
import cats.syntax.option._
import cats.{Functor, Monad}
import cats.syntax.traverse._
import cats.{Defer, Functor, Monad, SemigroupK}
import derevo.derive
import mouse.any._
import org.ergoplatform.common.TraceId
import org.ergoplatform.common.streaming.syntax._
import org.ergoplatform.dex.domain.amm.CFMMOrder
import org.ergoplatform.dex.executor.amm.config.ExecutionConfig
import org.ergoplatform.dex.executor.amm.services.Execution
import org.ergoplatform.dex.executor.amm.streaming.CFMMCircuit
import org.ergoplatform.dex.executor.amm.services.{CFMMBacklog, Execution}
import org.ergoplatform.dex.executor.amm.streaming.{CFMMCircuit, CFMMHistConsumer}
import org.ergoplatform.ergo.services.explorer.TxSubmissionErrorParser
import tofu.Catches
import tofu.higherKind.derived.representableK
import tofu.logging.{Logging, Logs}
import tofu.streams.Evals
import tofu.streams.{Evals, ParFlatten}
import tofu.syntax.context._
import tofu.syntax.embed._
import tofu.syntax.handle._
Expand All @@ -34,10 +35,12 @@ object Executor {

def make[
I[_]: Functor,
F[_]: Monad: Evals[*[_], G]: ExecutionConfig.Has,
G[_]: Monad: TraceId.Local: Clock: Catches
F[_]: Monad: Evals[*[_], G]: ExecutionConfig.Has: Defer: SemigroupK: ParFlatten,
G[_]: Monad: TraceId.Local: Clock: Catches: Timer
](implicit
orders: CFMMCircuit[F, G],
executedOrders: CFMMHistConsumer[F, G],
cfmmBacklog: CFMMBacklog[G],
service: Execution[G],
logs: Logs[I, G]
): I[Executor[F]] =
Expand All @@ -48,34 +51,50 @@ object Executor {
}

final private class Live[
F[_]: Monad: Evals[*[_], G],
G[_]: Monad: Logging: TraceId.Local: Clock: Catches
F[_]: Monad: Evals[*[_], G]: Defer: SemigroupK: ParFlatten,
G[_]: Monad: Logging: Catches: Timer
](conf: ExecutionConfig)(implicit
orders: CFMMCircuit[F, G],
executedOrders: CFMMHistConsumer[F, G],
backlog: CFMMBacklog[G],
service: Execution[G],
errParser: TxSubmissionErrorParser
) extends Executor[F] {

def run: F[Unit] =
emits(
List(
addToBacklog,
executeOrders,
dropExecuted
)
).parFlattenUnbounded

def addToBacklog: F[Unit] =
orders.stream
.evalMap { rec =>
service
.executeAttempt(rec.message)
.handleWith[Throwable](e => warnCause"Order execution failed fatally" (e) as none[CFMMOrder])
.local(_ => TraceId.fromString(rec.message.id.value))
.tupleLeft(rec)
.evalTap(orderRec => backlog.put(orderRec.message))
.evalMap(_.commit)

def dropExecuted: F[Unit] =
executedOrders.stream
.evalTap { rec =>
rec.message.traverse(order => backlog.drop(order.order.id))
}
.flatTap {
case (_, None) => unit[F]
case (_, Some(order)) =>
eval(now.millis) >>= {
case ts if ts - order.timestamp < conf.orderLifetime.toMillis =>
eval(warn"Failed to execute $order. Going to retry.") >>
orders.retry((order.id -> order).pure[F])
case _ =>
eval(warn"Failed to execute $order. Order expired.")
}
.evalMap(_.commit)

def executeOrders: F[Unit] =
eval(backlog.get).evalMap {
case Some(order) => executeOrder(order)
case None => trace"No orders to execute. Going to wait for" >> Timer[G].sleep(conf.order)
}.repeat

private def executeOrder(order: CFMMOrder): G[Unit] =
service
.executeAttempt(order)
.handleWith[Throwable](e => warnCause"Order execution failed fatally" (e) as none[CFMMOrder])
.flatMap {
case Some(order) => backlog.suspend(order)
case None => ().pure[G]
}
.evalMap { case (rec, _) => rec.commit }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package org.ergoplatform.dex.executor.amm.repositories

import cats.{FlatMap, Functor}
import derevo.derive
import org.ergoplatform.common.cache.Cache
import org.ergoplatform.dex.domain.amm.{CFMMOrder, OrderId}
import org.ergoplatform.dex.executor.amm.repositories.CFMMPools.{CFMMPoolsTracing, Live}
import tofu.higherKind.Mid
import tofu.higherKind.derived.representableK
import tofu.logging.{Logging, Logs}
import tofu.syntax.logging._
import tofu.syntax.monadic._

@derive(representableK)
trait CFMMOrders[F[_]] {

def put(order: CFMMOrder): F[Unit]

def exists(orderId: OrderId): F[Boolean]

def drop(orderId: OrderId): F[Unit]

def get(orderId: OrderId): F[Option[CFMMOrder]]

def getAll: F[List[CFMMOrder]]
}

object CFMMOrders {

def make[I[_]: Functor, F[_]: FlatMap](implicit logs: Logs[I, F], cache: Cache[F]): I[CFMMOrders[F]] =
logs.forService[CFMMOrders[F]].map { implicit logging =>
new CFMMOrdersTracingMid[F] attach new Live[F](cache)
}

final private class Live[F[_]](cache: Cache[F]) extends CFMMOrders[F] {

def put(order: CFMMOrder): F[Unit] = cache.set(order.id, order)

def exists(orderId: OrderId): F[Boolean] = cache.exists(orderId)

def drop(orderId: OrderId): F[Unit] = cache.del(orderId)

def get(orderId: OrderId): F[Option[CFMMOrder]] = cache.get[OrderId, CFMMOrder](orderId)

def getAll: F[List[CFMMOrder]] = cache.getAll
}

final private class CFMMOrdersTracingMid[F[_]: FlatMap: Logging] extends CFMMOrders[Mid[F, *]] {

def put(order: CFMMOrder): Mid[F, Unit] = for {
_ <- trace"put(order=$order)"
r <- _
_ <- trace"put(order=$order) -> $r"
} yield r

def exists(orderId: OrderId): Mid[F, Boolean] = for {
_ <- trace"exists(orderId=$orderId)"
r <- _
_ <- trace"exists(orderId=$orderId) -> $r"
} yield r

def drop(orderId: OrderId): Mid[F, Unit] = for {
_ <- trace"drop(orderId=$orderId)"
r <- _
_ <- trace"drop(orderId=$orderId) -> $r"
} yield r

def get(orderId: OrderId): Mid[F, Option[CFMMOrder]] = for {
_ <- trace"checkLater(order=$orderId)"
r <- _
_ <- trace"checkLater(order=$orderId) -> $r"
} yield r

def getAll: Mid[F, List[CFMMOrder]] = for {
_ <- trace"getAll()"
r <- _
_ <- trace"getAll() -> length: ${r.length}"
} yield r
}
}
Loading

0 comments on commit 14aceb1

Please sign in to comment.