Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backlog service #57

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .bsp/sbt.json

This file was deleted.

2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ project/metals.sbt

config.env

/.bsp/
.bsp/
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ lazy val ammExecutor = utils
)
.settings(nativePackagerSettings("amm-executor"))
.enablePlugins(JavaAppPackaging, UniversalPlugin, DockerPlugin)
.dependsOn(Seq(core, http).map(_ % allConfigDependency): _*)
.dependsOn(Seq(core, http, cache).map(_ % allConfigDependency): _*)

lazy val poolResolver = utils
.mkModule("pool-resolver", "PoolResolver")
Expand Down
4 changes: 3 additions & 1 deletion modules/amm-executor/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ rotation.retry-delay = 120s

exchange.reward-address = "9gCigPc9cZNRhKgbgdmTkVxo1ZKgw79G8DvLjCcYWAvEF3XRUKy"

execution.order-lifetime = 300s
backlogConfig.order-lifetime = 300s
backlogConfig.order-execution-time = 180s
backlogConfig.suspended-probability = 10

monetary.miner-fee = 2000000
monetary.min-dex-fee = 1000000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,17 @@ import fs2.kafka.RecordDeserializer
import fs2.kafka.serde._
import org.ergoplatform.ErgoAddressEncoder
import org.ergoplatform.common.EnvApp
import org.ergoplatform.common.cache.{Cache, CacheStreaming, MakeRedisTransaction, Redis}
import org.ergoplatform.common.streaming._
import org.ergoplatform.dex.configs.ConsumerConfig
import org.ergoplatform.dex.domain.amm.{CFMMOrder, OrderId}
import org.ergoplatform.dex.domain.amm.{CFMMOrder, EvaluatedCFMMOrder, OrderId}
import org.ergoplatform.dex.executor.amm.config.ConfigBundle
import org.ergoplatform.dex.executor.amm.context.AppContext
import org.ergoplatform.dex.executor.amm.interpreters.{CFMMInterpreter, N2TCFMMInterpreter, T2TCFMMInterpreter}
import org.ergoplatform.dex.executor.amm.processes.Executor
import org.ergoplatform.dex.executor.amm.repositories.CFMMPools
import org.ergoplatform.dex.executor.amm.services.Execution
import org.ergoplatform.dex.executor.amm.streaming.{CFMMCircuit, CFMMConsumerIn, CFMMConsumerRetries, CFMMProducerRetries}
import org.ergoplatform.dex.executor.amm.repositories.{CFMMOrders, CFMMPools}
import org.ergoplatform.dex.executor.amm.services.{CFMMBacklog, Execution}
import org.ergoplatform.dex.executor.amm.streaming.{CFMMCircuit, CFMMConsumerIn, CFMMConsumerRetries, CFMMHistConsumer, CFMMProducerRetries}
import org.ergoplatform.dex.protocol.amm.AMMType.{CFMMType, N2T_CFMM, T2T_CFMM}
import org.ergoplatform.ergo.modules.ErgoNetwork
import org.ergoplatform.ergo.services.explorer.{ErgoExplorer, ErgoExplorerStreaming}
Expand All @@ -27,6 +28,7 @@ import sttp.client3.asynchttpclient.cats.AsyncHttpClientCatsBackend
import sttp.client3.asynchttpclient.fs2.AsyncHttpClientFs2Backend
import tofu.WithRun
import tofu.fs2Instances._
import tofu.generate.GenRandom
import tofu.lift.IsoK
import tofu.syntax.unlift._
import zio.interop.catz._
Expand All @@ -40,17 +42,22 @@ object App extends EnvApp[AppContext] {
appF.run(ctx) as ExitCode.success
}.orDie

implicit val mtx: MakeRedisTransaction[RunF] = MakeRedisTransaction.make[RunF]

private def init(configPathOpt: Option[String]): Resource[InitF, (Executor[StreamF], AppContext)] =
for {
blocker <- Blocker[InitF]
configs <- Resource.eval(ConfigBundle.load[InitF](configPathOpt, blocker))
implicit0(genRand: GenRandom[RunF]) <- Resource.eval(GenRandom.instance[InitF, RunF]())
ctx = AppContext.init(configs)
implicit0(isoKRun: IsoK[RunF, InitF]) = isoKRunByContext(ctx)
implicit0(e: ErgoAddressEncoder) = ErgoAddressEncoder(configs.protocol.networkType.prefix)
implicit0(confirmedOrders: CFMMConsumerIn[StreamF, RunF, Confirmed]) =
makeConsumer[OrderId, Confirmed[CFMMOrder]](configs.consumers.confirmedOrders)
implicit0(unconfirmedOrders: CFMMConsumerIn[StreamF, RunF, Unconfirmed]) =
makeConsumer[OrderId, Unconfirmed[CFMMOrder]](configs.consumers.unconfirmedOrders)
implicit0(ammHistCons: CFMMHistConsumer[StreamF, RunF]) =
makeConsumer[OrderId, Option[EvaluatedCFMMOrder.Any]](configs.consumers.cfmmHistory)
implicit0(consumerRetries: CFMMConsumerRetries[StreamF, RunF]) =
makeConsumer[OrderId, Delayed[CFMMOrder]](configs.consumers.ordersRetry)
implicit0(orders: CFMMConsumerIn[StreamF, RunF, Id]) =
Expand All @@ -66,8 +73,14 @@ object App extends EnvApp[AppContext] {
implicit0(t2tInt: CFMMInterpreter[T2T_CFMM, RunF]) <- Resource.eval(T2TCFMMInterpreter.make[InitF, RunF])
implicit0(n2tInt: CFMMInterpreter[N2T_CFMM, RunF]) <- Resource.eval(N2TCFMMInterpreter.make[InitF, RunF])
implicit0(interpreter: CFMMInterpreter[CFMMType, RunF]) = CFMMInterpreter.make[RunF]
implicit0(execution: Execution[RunF]) <- Resource.eval(Execution.make[InitF, RunF])
executor <- Resource.eval(Executor.make[InitF, StreamF, RunF])
implicit0(execution: Execution[RunF]) <- Resource.eval(Execution.make[InitF, RunF])
implicit0(redis: Redis.Plain[RunF]) <- Redis.make[InitF, RunF](configs.redis)
implicit0(cache: Cache[RunF]) <- Resource.eval(Cache.make[InitF, RunF])
implicit0(cfmmOrders: CFMMOrders[RunF]) <- Resource.eval[InitF, CFMMOrders[RunF]](CFMMOrders.make[InitF, RunF])
implicit0(cacheStreaming: CacheStreaming[StreamF]) <- Resource.eval(CacheStreaming.make[InitF, StreamF, RunF])
implicit0(cfmmBacklog: CFMMBacklog[RunF]) <-
Resource.eval[InitF, CFMMBacklog[RunF]](CFMMBacklog.make[InitF, StreamF, RunF])
executor <- Resource.eval(Executor.make[InitF, StreamF, RunF])
} yield executor -> ctx

private def makeBackend(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.ergoplatform.dex.executor.amm.config

import derevo.derive
import derevo.pureconfig.pureconfigReader
import tofu.Context
import tofu.logging.derivation.loggable

import scala.concurrent.duration.FiniteDuration

@derive(pureconfigReader, loggable)
final case class BacklogConfig(
orderLifetime: FiniteDuration,
orderExecutionTime: FiniteDuration,
suspendedOrdersExecutionProbabilityPercent: Int
)

object BacklogConfig extends Context.Companion[BacklogConfig]
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package org.ergoplatform.dex.executor.amm.config

import derevo.derive
import derevo.pureconfig.pureconfigReader
import org.ergoplatform.common.cache.RedisConfig
import org.ergoplatform.common.streaming.RotationConfig
import org.ergoplatform.dex.configs._
import tofu.Context
import tofu.logging.derivation.loggable
import tofu.optics.macros.{promote, ClassyOptics}
import tofu.optics.macros.{ClassyOptics, promote}

@derive(pureconfigReader, loggable)
@ClassyOptics
Expand All @@ -16,7 +17,9 @@ final case class ConfigBundle(
@promote execution: ExecutionConfig,
@promote monetary: MonetaryConfig,
@promote protocol: ProtocolConfig,
@promote backlogConfig: BacklogConfig,
consumers: Consumers,
redis: RedisConfig,
producers: Producers,
@promote kafka: KafkaConfig,
@promote network: NetworkConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import tofu.logging.derivation.loggable
@derive(pureconfigReader, loggable)
final case class Consumers(
confirmedOrders: ConsumerConfig,
cfmmHistory: ConsumerConfig,
unconfirmedOrders: ConsumerConfig,
ordersRetry: ConsumerConfig
)
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ import tofu.logging.derivation.loggable
import scala.concurrent.duration.FiniteDuration

@derive(pureconfigReader, loggable)
final case class ExecutionConfig(orderLifetime: FiniteDuration)
final case class ExecutionConfig(order: FiniteDuration)

object ExecutionConfig extends Context.Companion[ExecutionConfig]

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
package org.ergoplatform.dex.executor.amm.processes

import cats.effect.Clock
import cats.effect.{Clock, Timer}
import cats.syntax.option._
import cats.{Functor, Monad}
import cats.syntax.traverse._
import cats.{Defer, Functor, Monad, SemigroupK}
import derevo.derive
import mouse.any._
import org.ergoplatform.common.TraceId
import org.ergoplatform.common.streaming.syntax._
import org.ergoplatform.dex.domain.amm.CFMMOrder
import org.ergoplatform.dex.executor.amm.config.ExecutionConfig
import org.ergoplatform.dex.executor.amm.services.Execution
import org.ergoplatform.dex.executor.amm.streaming.CFMMCircuit
import org.ergoplatform.dex.executor.amm.services.{CFMMBacklog, Execution}
import org.ergoplatform.dex.executor.amm.streaming.{CFMMCircuit, CFMMHistConsumer}
import org.ergoplatform.ergo.services.explorer.TxSubmissionErrorParser
import tofu.Catches
import tofu.higherKind.derived.representableK
import tofu.logging.{Logging, Logs}
import tofu.streams.Evals
import tofu.streams.{Evals, ParFlatten}
import tofu.syntax.context._
import tofu.syntax.embed._
import tofu.syntax.handle._
Expand All @@ -34,10 +35,12 @@ object Executor {

def make[
I[_]: Functor,
F[_]: Monad: Evals[*[_], G]: ExecutionConfig.Has,
G[_]: Monad: TraceId.Local: Clock: Catches
F[_]: Monad: Evals[*[_], G]: ExecutionConfig.Has: Defer: SemigroupK: ParFlatten,
G[_]: Monad: TraceId.Local: Clock: Catches: Timer
](implicit
orders: CFMMCircuit[F, G],
executedOrders: CFMMHistConsumer[F, G],
cfmmBacklog: CFMMBacklog[G],
service: Execution[G],
logs: Logs[I, G]
): I[Executor[F]] =
Expand All @@ -48,34 +51,56 @@ object Executor {
}

final private class Live[
F[_]: Monad: Evals[*[_], G],
G[_]: Monad: Logging: TraceId.Local: Clock: Catches
F[_]: Monad: Evals[*[_], G]: Defer: SemigroupK: ParFlatten,
G[_]: Monad: Logging: Catches: Timer
](conf: ExecutionConfig)(implicit
orders: CFMMCircuit[F, G],
executedOrders: CFMMHistConsumer[F, G],
backlog: CFMMBacklog[G],
service: Execution[G],
errParser: TxSubmissionErrorParser
) extends Executor[F] {

def run: F[Unit] =
emits(
List(
addToBacklog,
executeOrders,
dropExecuted
)
).parFlattenUnbounded

def addToBacklog: F[Unit] =
orders.stream
.evalMap { rec =>
service
.executeAttempt(rec.message)
.handleWith[Throwable](e => warnCause"Order execution failed fatally" (e) as none[CFMMOrder])
.local(_ => TraceId.fromString(rec.message.id.value))
.tupleLeft(rec)
.evalTap(orderRec =>
backlog.put(orderRec.message)
.handleWith[Throwable](e => warnCause"Attempt to add order to backlog failed." (e))
)
.evalMap(_.commit)
GusevTimofey marked this conversation as resolved.
Show resolved Hide resolved

def dropExecuted: F[Unit] =
executedOrders.stream
.evalTap { rec =>
rec.message.traverse(order =>
backlog.drop(order.order.id)
.handleWith[Throwable](e => warnCause"Attempt to drop order from backlog failed." (e))
)
}
.flatTap {
case (_, None) => unit[F]
case (_, Some(order)) =>
eval(now.millis) >>= {
case ts if ts - order.timestamp < conf.orderLifetime.toMillis =>
eval(warn"Failed to execute $order. Going to retry.") >>
orders.retry((order.id -> order).pure[F])
case _ =>
eval(warn"Failed to execute $order. Order expired.")
}
.evalMap(_.commit)
GusevTimofey marked this conversation as resolved.
Show resolved Hide resolved

def executeOrders: F[Unit] =
eval(backlog.get).evalMap {
case Some(order) => executeOrder(order)
case None => trace"No orders to execute. Going to wait for" >> Timer[G].sleep(conf.order)
}.repeat

private def executeOrder(order: CFMMOrder): G[Unit] =
service
.executeAttempt(order)
.handleWith[Throwable](e => warnCause"Order execution failed fatally" (e) as none[CFMMOrder])
.flatMap {
case Some(order) => backlog.suspend(order)
case None => backlog.checkLater(order)
}
.evalMap { case (rec, _) => rec.commit }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package org.ergoplatform.dex.executor.amm.repositories

import cats.{FlatMap, Functor}
import derevo.derive
import org.ergoplatform.common.cache.Cache
import org.ergoplatform.dex.domain.amm.{CFMMOrder, OrderId}
import org.ergoplatform.dex.executor.amm.repositories.CFMMPools.{CFMMPoolsTracing, Live}
import tofu.higherKind.Mid
import tofu.higherKind.derived.representableK
import tofu.logging.{Logging, Logs}
import tofu.syntax.logging._
import tofu.syntax.monadic._

@derive(representableK)
trait CFMMOrders[F[_]] {

def put(order: CFMMOrder): F[Unit]

def exists(orderId: OrderId): F[Boolean]

def drop(orderId: OrderId): F[Unit]

def get(orderId: OrderId): F[Option[CFMMOrder]]

def getAll: F[List[CFMMOrder]]
}

object CFMMOrders {

def make[I[_]: Functor, F[_]: FlatMap](implicit logs: Logs[I, F], cache: Cache[F]): I[CFMMOrders[F]] =
logs.forService[CFMMOrders[F]].map { implicit logging =>
new CFMMOrdersTracingMid[F] attach new Live[F](cache)
}

final private class Live[F[_]](cache: Cache[F]) extends CFMMOrders[F] {

def put(order: CFMMOrder): F[Unit] = cache.set(order.id, order)

def exists(orderId: OrderId): F[Boolean] = cache.exists(orderId)

def drop(orderId: OrderId): F[Unit] = cache.del(orderId)

def get(orderId: OrderId): F[Option[CFMMOrder]] = cache.get[OrderId, CFMMOrder](orderId)

def getAll: F[List[CFMMOrder]] = cache.getAll
}

final private class CFMMOrdersTracingMid[F[_]: FlatMap: Logging] extends CFMMOrders[Mid[F, *]] {

def put(order: CFMMOrder): Mid[F, Unit] = for {
_ <- trace"put(order=$order)"
r <- _
_ <- trace"put(order=$order) -> $r"
} yield r

def exists(orderId: OrderId): Mid[F, Boolean] = for {
_ <- trace"exists(orderId=$orderId)"
r <- _
_ <- trace"exists(orderId=$orderId) -> $r"
} yield r

def drop(orderId: OrderId): Mid[F, Unit] = for {
_ <- trace"drop(orderId=$orderId)"
r <- _
_ <- trace"drop(orderId=$orderId) -> $r"
} yield r

def get(orderId: OrderId): Mid[F, Option[CFMMOrder]] = for {
_ <- trace"checkLater(order=$orderId)"
r <- _
_ <- trace"checkLater(order=$orderId) -> $r"
} yield r

def getAll: Mid[F, List[CFMMOrder]] = for {
_ <- trace"getAll()"
r <- _
_ <- trace"getAll() -> length: ${r.length}"
} yield r
}
}
Loading