diff --git a/README.md b/README.md index cd4534bb15..a6a1257359 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,7 @@ interpreted as: * [Finatra](https://tapir.softwaremill.com/en/latest/server/finatra.html) `FinatraRoute` * [Play](https://tapir.softwaremill.com/en/latest/server/play.html) `Route` * [ZIO Http](https://tapir.softwaremill.com/en/latest/server/ziohttp.html) `Http` + * [Armeria](https://tapir.softwaremill.com/en/latest/server/armeria.html) `HttpServiceWithRoutes` * [aws](https://tapir.softwaremill.com/en/latest/server/aws.html) through Lambda/SAM/Terraform * a client, which is a function from input parameters to output parameters. Currently supported: diff --git a/build.sbt b/build.sbt index 84fd50e4ba..5bfd511d7f 100644 --- a/build.sbt +++ b/build.sbt @@ -140,6 +140,9 @@ lazy val allAggregates = core.projectRefs ++ redocBundle.projectRefs ++ serverTests.projectRefs ++ akkaHttpServer.projectRefs ++ + armeriaServer.projectRefs ++ + armeriaServerCats.projectRefs ++ + armeriaServerZio.projectRefs ++ http4sServer.projectRefs ++ sttpStubServer.projectRefs ++ sttpMockServer.projectRefs ++ @@ -816,6 +819,44 @@ lazy val akkaHttpServer: ProjectMatrix = (projectMatrix in file("server/akka-htt .jvmPlatform(scalaVersions = scala2Versions) .dependsOn(core, serverTests % Test) +lazy val armeriaServer: ProjectMatrix = (projectMatrix in file("server/armeria-server")) + .settings(commonJvmSettings) + .settings( + name := "tapir-armeria-server", + libraryDependencies ++= Seq( + "com.linecorp.armeria" % "armeria" % Versions.armeria, + "org.scala-lang.modules" %% "scala-java8-compat" % Versions.scalaJava8Compat, + "com.softwaremill.sttp.shared" %% "armeria" % Versions.sttpShared + ) + ) + .jvmPlatform(scalaVersions = scala2And3Versions) + .dependsOn(core, serverTests % Test) + +lazy val armeriaServerCats: ProjectMatrix = + (projectMatrix in file("server/armeria-server/cats")) + .settings(commonJvmSettings) + .settings( + name := "tapir-armeria-server-cats", + libraryDependencies ++= Seq( + "com.softwaremill.sttp.shared" %% "fs2" % Versions.sttpShared, + "co.fs2" %% "fs2-reactive-streams" % Versions.fs2 + ) + ) + .jvmPlatform(scalaVersions = scala2And3Versions) + .dependsOn(armeriaServer % "compile->compile;test->test", cats, serverTests % Test) + +lazy val armeriaServerZio: ProjectMatrix = + (projectMatrix in file("server/armeria-server/zio")) + .settings(commonJvmSettings) + .settings( + name := "tapir-armeria-server-zio", + libraryDependencies ++= Seq( + "dev.zio" %% "zio-interop-reactivestreams" % Versions.zioInteropReactiveStreams + ) + ) + .jvmPlatform(scalaVersions = scala2And3Versions) + .dependsOn(armeriaServer % "compile->compile;test->test", zio, serverTests % Test) + lazy val http4sServer: ProjectMatrix = (projectMatrix in file("server/http4s-server")) .settings(commonJvmSettings) .settings( @@ -1250,6 +1291,7 @@ lazy val examples: ProjectMatrix = (projectMatrix in file("examples")) .jvmPlatform(scalaVersions = examplesScalaVersions) .dependsOn( akkaHttpServer, + armeriaServer, http4sServer, http4sClient, sttpClient, @@ -1321,6 +1363,9 @@ lazy val documentation: ProjectMatrix = (projectMatrix in file("generated-doc")) .dependsOn( core % "compile->test", akkaHttpServer, + armeriaServer, + armeriaServerCats, + armeriaServerZio, circeJson, enumeratum, finatraServer, diff --git a/core/src/main/scala-2/sttp/tapir/macros/SchemaMacros.scala b/core/src/main/scala-2/sttp/tapir/macros/SchemaMacros.scala index bc084308cb..c8014fbb54 100644 --- a/core/src/main/scala-2/sttp/tapir/macros/SchemaMacros.scala +++ b/core/src/main/scala-2/sttp/tapir/macros/SchemaMacros.scala @@ -7,6 +7,10 @@ import sttp.tapir.generic.internal.{OneOfMacro, SchemaMagnoliaDerivation, Schema import sttp.tapir.internal.{ModifySchemaMacro, SchemaEnumerationMacro} trait SchemaMacros[T] { + + /** Modifies nested schemas for case classes and case class families (sealed traits / enums), accessible with `path`, using the given + * `modification` function. To traverse collections, use `.each`. + */ def modify[U](path: T => U)(modification: Schema[U] => Schema[U]): Schema[T] = macro ModifySchemaMacro.generateModify[T, U] } diff --git a/core/src/main/scala-3/sttp/tapir/macros/SchemaMacros.scala b/core/src/main/scala-3/sttp/tapir/macros/SchemaMacros.scala index f9ccc21430..d5c8706849 100644 --- a/core/src/main/scala-3/sttp/tapir/macros/SchemaMacros.scala +++ b/core/src/main/scala-3/sttp/tapir/macros/SchemaMacros.scala @@ -8,6 +8,10 @@ import magnolia1._ import scala.quoted.* trait SchemaMacros[T] { this: Schema[T] => + + /** Modifies nested schemas for case classes and case class families (sealed traits / enums), accessible with `path`, using the given + * `modification` function. To traverse collections, use `.each`. + */ inline def modify[U](inline path: T => U)(inline modification: Schema[U] => Schema[U]): Schema[T] = ${ SchemaMacros.modifyImpl[T, U]('this)('path)('modification) } diff --git a/doc/index.md b/doc/index.md index 47e6feb713..43fb51fd4c 100644 --- a/doc/index.md +++ b/doc/index.md @@ -11,6 +11,7 @@ input and output parameters. An endpoint specification can be interpreted as: * [Finatra](server/finatra.md) `http.Controller` * [Play](server/play.md) `Route` * [ZIO Http](server/ziohttp.md) `Http` + * [Armeria](server/armeria.md) `HttpServiceWithRoutes` * [aws](server/aws.md) through Lambda/SAM/Terraform * a client, which is a function from input parameters to output parameters. Currently supported: @@ -149,6 +150,7 @@ Development and maintenance of sttp tapir is sponsored by [SoftwareMill](https:/ server/play server/vertx server/ziohttp + server/armeria server/aws server/options server/interceptors diff --git a/doc/server/armeria.md b/doc/server/armeria.md new file mode 100644 index 0000000000..511825a745 --- /dev/null +++ b/doc/server/armeria.md @@ -0,0 +1,201 @@ +# Running as an Armeria server + +Endpoints can be mounted as `TapirService[S, F]` on top of [Armeria](https://armeria.dev)'s `HttpServiceWithRoutes`. + +Armeria interpreter can be used with different effect systems (cats-effect, ZIO) as well as Scala's standard `Future`. + +## Scala's standard `Future` + +Add the following dependency +```scala +"com.softwaremill.sttp.tapir" %% "tapir-armeria-server" % "@VERSION@" +``` + +and import the object: + +```scala mdoc:compile-only +import sttp.tapir.server.armeria.ArmeriaFutureServerInterpreter +``` +to use this interpreter with `Future`. + +The `toService` method require a single, or a list of `ServerEndpoint`s, which can be created by adding +[server logic](logic.md) to an endpoint. + +```scala mdoc:compile-only +import sttp.tapir._ +import sttp.tapir.server.armeria.ArmeriaFutureServerInterpreter +import scala.concurrent.Future +import com.linecorp.armeria.server.Server + +object Main { + // JVM entry point that starts the HTTP server + def main(args: Array[String]): Unit = { + val tapirEndpoint: PublicEndpoint[(String, Int), Unit, String, Any] = ??? // your definition here + def logic(s: String, i: Int): Future[Either[Unit, String]] = ??? // your logic here + val tapirService = ArmeriaFutureServerInterpreter().toService(tapirEndpoint.serverLogic((logic _).tupled)) + val server = Server + .builder() + .service(tapirService) // your endpoint is bound to the server + .build() + server.start().join() + } +} +``` + +This interpreter also supports streaming using Armeria Streams which is fully compatible with Reactive Streams: + +```scala mdoc:compile-only + +import sttp.capabilities.armeria.ArmeriaStreams +import sttp.tapir._ +import sttp.tapir.server.armeria.ArmeriaFutureServerInterpreter +import scala.concurrent.Future +import com.linecorp.armeria.common.HttpData +import com.linecorp.armeria.common.stream.StreamMessage +import org.reactivestreams.Publisher + +val streamingResponse: PublicEndpoint[Int, Unit, Publisher[HttpData], ArmeriaStreams] = + endpoint + .in("stream") + .in(query[Int]("key")) + .out(streamTextBody(ArmeriaStreams)(CodecFormat.TextPlain())) + +def streamLogic(foo: Int): Future[Publisher[HttpData]] = { + Future.successful(StreamMessage.of(HttpData.ofUtf8("hello"), HttpData.ofUtf8("world"))) +} + +val tapirService = ArmeriaFutureServerInterpreter().toService(streamingResponse.serverLogicSuccess(streamLogic)) +``` + +## Configuration + +Every endpoint can be configured by providing an instance of `ArmeriaFutureEndpointOptions`, see [server options](options.md) for details. +Note that Armeria automatically injects an `ExecutionContext` on top of Armeria's `EventLoop` to invoke the logic. + +## Cats Effect + +Add the following dependency +```scala +"com.softwaremill.sttp.tapir" %% "tapir-armeria-server-cats" % "@VERSION@" +``` +to use this interpreter with Cats Effect typeclasses. + +Then import the object: +```scala mdoc:compile-only +import sttp.tapir.server.armeria.cats.ArmeriaCatsServerInterpreter +``` + +This object contains the `toService(e: ServerEndpoint[Fs2Streams[F], F])` method which returns a `TapirService[Fs2Streams[F], F]`. +An HTTP server can then be started as in the following example: + +```scala mdoc:compile-only +import sttp.tapir._ +import sttp.tapir.server.armeria.cats.ArmeriaCatsServerInterpreter +import cats.effect._ +import cats.effect.std.Dispatcher +import com.linecorp.armeria.server.Server + +object Main extends IOApp { + override def run(args: List[String]): IO[ExitCode] = { + val tapirEndpoint: PublicEndpoint[String, Unit, String, Any] = ??? + def logic(req: String): IO[Either[Unit, String]] = ??? + + Dispatcher[IO] + .flatMap { dispatcher => + Resource + .make( + IO.async_[Server] { cb => + val tapirService = ArmeriaCatsServerInterpreter[IO](dispatcher).toService(tapirEndpoint.serverLogic(logic)) + + val server = Server + .builder() + .service(tapirService) + .build() + server.start().handle[Unit] { + case (_, null) => cb(Right(server)) + case (_, cause) => cb(Left(cause)) + } + } + )({ server => + IO.fromCompletableFuture(IO(server.closeAsync())).void + }) + } + .use(_ => IO.never) + } +} +``` + +This interpreter also supports streaming using FS2 streams: + +```scala mdoc:compile-only +import sttp.capabilities.fs2.Fs2Streams +import sttp.tapir._ +import sttp.tapir.server.armeria.cats.ArmeriaCatsServerInterpreter +import cats.effect._ +import cats.effect.std.Dispatcher +import fs2._ + +val streamingResponse: Endpoint[Unit, Int, Unit, Stream[IO, Byte], Fs2Streams[IO]] = + endpoint + .in("stream") + .in(query[Int]("times")) + .out(streamTextBody(Fs2Streams[IO])(CodecFormat.TextPlain())) + +def streamLogic(times: Int): IO[Stream[IO, Byte]] = { + IO.pure(Stream.chunk(Chunk.array("Hello world!".getBytes)).repeatN(times)) +} + +def dispatcher: Dispatcher[IO] = ??? + +val tapirService = ArmeriaCatsServerInterpreter(dispatcher).toService(streamingResponse.serverLogicSuccess(streamLogic)) +``` + +## ZIO + +Add the following dependency + +```scala +"com.softwaremill.sttp.tapir" %% "tapir-armeria-server-zio" % "@VERSION@" +``` + +to use this interpreter with ZIO. + +Then import the object: +```scala mdoc:compile-only +import sttp.tapir.server.armeria.zio.ArmeriaZioServerInterpreter +``` + +This object contains `toService(e: ServerEndpoint[ZioStreams, RIO[R, *]])` method which returns a `TapirService[ZioStreams, RIO[R, *]]`. +An HTTP server can then be started as in the following example: + +```scala mdoc:compile-only +import sttp.tapir._ +import sttp.tapir.server.armeria.zio.ArmeriaZioServerInterpreter +import sttp.tapir.ztapir._ +import zio._ +import com.linecorp.armeria.server.Server + +object Main extends zio.App { + override def run(args: List[String]): URIO[ZEnv, ExitCode] = { + implicit val runtime = Runtime.default + + val tapirEndpoint: PublicEndpoint[String, Unit, String, Any] = ??? + def logic(key: String): UIO[String] = ??? + + ZManaged + .make(ZIO.fromCompletableFuture { + val tapirService = ArmeriaZioServerInterpreter().toService(tapirEndpoint.zServerLogic(logic)) + + val server = Server + .builder() + .service(tapirService) + .build() + server.start().thenApply[Server](_ => server) + }) { server => + ZIO.fromCompletableFuture(server.closeAsync()).orDie + }.useForever.as(ExitCode.success).orDie + } +} +``` + +This interpreter supports streaming using ZStreams. diff --git a/examples/src/main/scala/sttp/tapir/examples/HelloWorldArmeriaServer.scala b/examples/src/main/scala/sttp/tapir/examples/HelloWorldArmeriaServer.scala new file mode 100644 index 0000000000..cffe16e43e --- /dev/null +++ b/examples/src/main/scala/sttp/tapir/examples/HelloWorldArmeriaServer.scala @@ -0,0 +1,36 @@ +package sttp.tapir.examples + +import com.linecorp.armeria.server.Server +import scala.concurrent.Future +import sttp.client3.{HttpURLConnectionBackend, Identity, SttpBackend, UriContext, asStringAlways, basicRequest} +import sttp.capabilities.armeria.ArmeriaStreams +import sttp.tapir.server.armeria.{ArmeriaFutureServerInterpreter, TapirService} +import sttp.tapir.{PublicEndpoint, endpoint, query, stringBody} + +object HelloWorldArmeriaServer extends App { + + // the endpoint: single fixed path input ("hello"), single query parameter + // corresponds to: GET /hello?name=... + val helloWorld: PublicEndpoint[String, Unit, String, Any] = + endpoint.get.in("hello").in(query[String]("name")).out(stringBody) + + // converting an endpoint to a TapirService (providing server-side logic); extension method comes from imported packages + val helloWorldService: TapirService[ArmeriaStreams, Future] = + ArmeriaFutureServerInterpreter().toService(helloWorld.serverLogicSuccess(name => Future.successful(s"Hello, $name!"))) + + // starting the server + val server: Server = Server + .builder() + .http(8080) + .service(helloWorldService) + .build() + + server.start().join() + // testing + val backend: SttpBackend[Identity, Any] = HttpURLConnectionBackend() + val result: String = basicRequest.response(asStringAlways).get(uri"http://localhost:8080/hello?name=Frodo").send(backend).body + println("Got result: " + result) + + assert(result == "Hello, Frodo!") + server.stop().join() +} diff --git a/project/Versions.scala b/project/Versions.scala index 4674ed9229..67ec9d61a3 100644 --- a/project/Versions.scala +++ b/project/Versions.scala @@ -25,6 +25,7 @@ object Versions { val zio1Json = "0.2.0-M3" val zio = "2.0.0-RC2" val zioInteropCats = "3.3.0-RC2" + val zioInteropReactiveStreams = "2.0.0-RC3" val zioJson = "0.3.0-RC3" val playClient = "2.1.7" val playServer = "2.8.13" @@ -35,4 +36,7 @@ object Versions { val derevo = "0.13.0" val newtype = "0.4.4" val awsLambdaInterface = "2.1.0" + val armeria = "1.14.0" + val scalaJava8Compat = "1.0.2" + val fs2 = "3.2.4" } diff --git a/server/armeria-server/cats/src/main/scala/sttp/tapir/server/armeria/cats/ArmeriaCatsServerInterpreter.scala b/server/armeria-server/cats/src/main/scala/sttp/tapir/server/armeria/cats/ArmeriaCatsServerInterpreter.scala new file mode 100644 index 0000000000..a1ee5524c1 --- /dev/null +++ b/server/armeria-server/cats/src/main/scala/sttp/tapir/server/armeria/cats/ArmeriaCatsServerInterpreter.scala @@ -0,0 +1,36 @@ +package sttp.tapir.server.armeria.cats + +import cats.effect.Async +import cats.effect.std.Dispatcher +import sttp.capabilities.fs2.Fs2Streams +import sttp.tapir.server.ServerEndpoint +import sttp.tapir.server.armeria.TapirService + +trait ArmeriaCatsServerInterpreter[F[_]] { + + implicit def fa: Async[F] + + def armeriaServerOptions: ArmeriaCatsServerOptions[F] + + def toService(serverEndpoint: ServerEndpoint[Fs2Streams[F], F]): TapirService[Fs2Streams[F], F] = + toService(List(serverEndpoint)) + + def toService(serverEndpoints: List[ServerEndpoint[Fs2Streams[F], F]]): TapirService[Fs2Streams[F], F] = + TapirCatsService(serverEndpoints, armeriaServerOptions) +} + +object ArmeriaCatsServerInterpreter { + def apply[F[_]](dispatcher: Dispatcher[F])(implicit _fa: Async[F]): ArmeriaCatsServerInterpreter[F] = { + new ArmeriaCatsServerInterpreter[F] { + override implicit def fa: Async[F] = _fa + override def armeriaServerOptions: ArmeriaCatsServerOptions[F] = ArmeriaCatsServerOptions.default[F](dispatcher)(fa) + } + } + + def apply[F[_]](serverOptions: ArmeriaCatsServerOptions[F])(implicit _fa: Async[F]): ArmeriaCatsServerInterpreter[F] = { + new ArmeriaCatsServerInterpreter[F] { + override implicit def fa: Async[F] = _fa + override def armeriaServerOptions: ArmeriaCatsServerOptions[F] = serverOptions + } + } +} diff --git a/server/armeria-server/cats/src/main/scala/sttp/tapir/server/armeria/cats/ArmeriaCatsServerOptions.scala b/server/armeria-server/cats/src/main/scala/sttp/tapir/server/armeria/cats/ArmeriaCatsServerOptions.scala new file mode 100644 index 0000000000..a41a42f7db --- /dev/null +++ b/server/armeria-server/cats/src/main/scala/sttp/tapir/server/armeria/cats/ArmeriaCatsServerOptions.scala @@ -0,0 +1,77 @@ +package sttp.tapir.server.armeria.cats + +import cats.effect.std.Dispatcher +import cats.effect.{Async, Sync} +import com.linecorp.armeria.common.CommonPools +import org.slf4j.{Logger, LoggerFactory} +import scala.util.control.NonFatal +import sttp.tapir.server.armeria.ArmeriaServerOptions +import sttp.tapir.server.interceptor.log.{DefaultServerLog, ServerLog} +import sttp.tapir.server.interceptor.{CustomInterceptors, Interceptor} +import sttp.tapir.{Defaults, TapirFile} + +final case class ArmeriaCatsServerOptions[F[_]]( + dispatcher: Dispatcher[F], + createFile: () => F[TapirFile], + deleteFile: TapirFile => F[Unit], + interceptors: List[Interceptor[F]] +) extends ArmeriaServerOptions[F] { + def prependInterceptor(i: Interceptor[F]): ArmeriaCatsServerOptions[F] = + copy(interceptors = i :: interceptors) + + def appendInterceptor(i: Interceptor[F]): ArmeriaCatsServerOptions[F] = + copy(interceptors = interceptors :+ i) +} + +object ArmeriaCatsServerOptions { + + /** Allows customising the interceptors used by the server interpreter. */ + def customInterceptors[F[_]](dispatcher: Dispatcher[F])(implicit F: Async[F]): CustomInterceptors[F, ArmeriaCatsServerOptions[F]] = { + CustomInterceptors( + createOptions = (ci: CustomInterceptors[F, ArmeriaCatsServerOptions[F]]) => { + ArmeriaCatsServerOptions[F]( + dispatcher, + () => defaultCreateFile()(F), + file => defaultDeleteFile(file)(F), + ci.interceptors + ) + } + ).serverLog(defaultServerLog) + } + + private val logger: Logger = LoggerFactory.getLogger(this.getClass.getPackage.getName) + + def defaultCreateFile[F[_]: Async](): F[TapirFile] = blocking(Defaults.createTempFile()) + + def defaultDeleteFile[F[_]: Async](file: TapirFile): F[Unit] = blocking(Defaults.deleteFile()(file)) + + def defaultServerLog[F[_]: Async]: ServerLog[F] = DefaultServerLog[F]( + doLogWhenHandled = debugLog[F], + doLogAllDecodeFailures = debugLog[F], + doLogExceptions = (msg: String, ex: Throwable) => Sync[F].delay(logger.warn(msg, ex)), + noLog = Async[F].pure(()) + ) + + def default[F[_]: Async](dispatcher: Dispatcher[F]): ArmeriaCatsServerOptions[F] = customInterceptors(dispatcher).options + + private def debugLog[F[_]: Async](msg: String, exOpt: Option[Throwable]): F[Unit] = + Sync[F].delay(exOpt match { + case None => logger.debug(msg) + case Some(ex) => logger.debug(msg, ex) + }) + + private def blocking[F[_], T](body: => T)(implicit F: Async[F]): F[T] = { + F.async_ { cb => + CommonPools + .blockingTaskExecutor() + .execute(() => { + try { + cb(Right(body)) + } catch { + case NonFatal(ex) => + cb(Left(ex)) + } + }) + } + } +} diff --git a/server/armeria-server/cats/src/main/scala/sttp/tapir/server/armeria/cats/CatsMonadAsyncError.scala b/server/armeria-server/cats/src/main/scala/sttp/tapir/server/armeria/cats/CatsMonadAsyncError.scala new file mode 100644 index 0000000000..f9d973b776 --- /dev/null +++ b/server/armeria-server/cats/src/main/scala/sttp/tapir/server/armeria/cats/CatsMonadAsyncError.scala @@ -0,0 +1,12 @@ +package sttp.tapir.server.armeria.cats + +import cats.effect.Async +import cats.syntax.functor._ +import sttp.monad.{Canceler, MonadAsyncError} +import sttp.tapir.integ.cats.CatsMonadError + +// Forked from sttp.client3.impl.cats.CatsMonadAsyncError +private class CatsMonadAsyncError[F[_]](implicit F: Async[F]) extends CatsMonadError[F] with MonadAsyncError[F] { + override def async[T](register: ((Either[Throwable, T]) => Unit) => Canceler): F[T] = + F.async(cb => F.delay(register(cb)).map(c => Some(F.delay(c.cancel())))) +} diff --git a/server/armeria-server/cats/src/main/scala/sttp/tapir/server/armeria/cats/TapirCatsService.scala b/server/armeria-server/cats/src/main/scala/sttp/tapir/server/armeria/cats/TapirCatsService.scala new file mode 100644 index 0000000000..efc59c13bb --- /dev/null +++ b/server/armeria-server/cats/src/main/scala/sttp/tapir/server/armeria/cats/TapirCatsService.scala @@ -0,0 +1,100 @@ +package sttp.tapir.server.armeria.cats + +import cats.effect.Async +import cats.effect.std.Dispatcher +import com.linecorp.armeria.common.{HttpData, HttpRequest, HttpResponse} +import com.linecorp.armeria.server.ServiceRequestContext +import fs2.interop.reactivestreams._ +import fs2.{Chunk, Stream} +import java.util.concurrent.CompletableFuture +import org.reactivestreams.Publisher +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success} +import sttp.capabilities.fs2.Fs2Streams +import sttp.monad.MonadAsyncError +import sttp.monad.syntax._ +import sttp.tapir.server.ServerEndpoint +import sttp.tapir.server.armeria._ +import sttp.tapir.server.interpreter.{BodyListener, ServerInterpreter} + +private[cats] final case class TapirCatsService[F[_]: Async]( + serverEndpoints: List[ServerEndpoint[Fs2Streams[F], F]], + armeriaServerOptions: ArmeriaCatsServerOptions[F] +) extends TapirService[Fs2Streams[F], F] { + + private[this] implicit val monad: MonadAsyncError[F] = new CatsMonadAsyncError() + private[this] implicit val bodyListener: BodyListener[F, ArmeriaResponseType] = new ArmeriaBodyListener + + private[this] val dispatcher: Dispatcher[F] = armeriaServerOptions.dispatcher + private[this] val fs2StreamCompatible: StreamCompatible[Fs2Streams[F]] = Fs2StreamCompatible(dispatcher) + private[this] val interpreter: ServerInterpreter[Fs2Streams[F], F, ArmeriaResponseType, Fs2Streams[F]] = + new ServerInterpreter( + serverEndpoints, + new ArmeriaToResponseBody(fs2StreamCompatible), + armeriaServerOptions.interceptors, + armeriaServerOptions.deleteFile + ) + + override def serve(ctx: ServiceRequestContext, req: HttpRequest): HttpResponse = { + implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(ctx.eventLoop()) + implicit val catsFutureConversion: CatsFutureConversion[F] = new CatsFutureConversion(dispatcher) + + val serverRequest = new ArmeriaServerRequest(ctx) + val requestBody = new ArmeriaRequestBody(ctx, armeriaServerOptions, fs2StreamCompatible) + val future = new CompletableFuture[HttpResponse]() + val result = interpreter(serverRequest, requestBody).map(ResultMapping.toArmeria) + + val (response, cancelRef) = dispatcher.unsafeToFutureCancelable(result) + response.onComplete { + case Failure(exception) => + future.completeExceptionally(exception) + case Success(value) => + future.complete(value) + } + + val httpResponse = HttpResponse.from(future) + httpResponse + .whenComplete() + .asInstanceOf[CompletableFuture[Unit]] + .exceptionally { case (_: Throwable) => + cancelRef() + () + } + httpResponse + } +} + +private object Fs2StreamCompatible { + def apply[F[_]: Async](dispatcher: Dispatcher[F]): StreamCompatible[Fs2Streams[F]] = { + new StreamCompatible[Fs2Streams[F]] { + override val streams: Fs2Streams[F] = Fs2Streams[F] + + override def asStreamMessage(stream: Stream[F, Byte]): Publisher[HttpData] = + StreamUnicastPublisher( + stream.chunks + .map { chunk => + val bytes = chunk.compact + HttpData.wrap(bytes.values, bytes.offset, bytes.length) + }, + dispatcher + ) + + override def fromArmeriaStream(publisher: Publisher[HttpData]): Stream[F, Byte] = + publisher.toStreamBuffered[F](4).flatMap(httpData => Stream.chunk(Chunk.array(httpData.array()))) + } + } +} + +private class CatsFutureConversion[F[_]: Async](dispatcher: Dispatcher[F])(implicit ec: ExecutionContext) extends FutureConversion[F] { + override def from[A](f: => Future[A]): F[A] = { + Async[F].async_ { cb => + f.onComplete { + case Failure(exception) => cb(Left(exception)) + case Success(value) => cb(Right(value)) + } + () + } + } + + override def to[A](f: => F[A]): Future[A] = dispatcher.unsafeToFuture(f) +} diff --git a/server/armeria-server/cats/src/test/scala/sttp/tapir/server/armeria/cats/ArmeriaCatsServerTest.scala b/server/armeria-server/cats/src/test/scala/sttp/tapir/server/armeria/cats/ArmeriaCatsServerTest.scala new file mode 100644 index 0000000000..fca6dba27b --- /dev/null +++ b/server/armeria-server/cats/src/test/scala/sttp/tapir/server/armeria/cats/ArmeriaCatsServerTest.scala @@ -0,0 +1,21 @@ +package sttp.tapir.server.armeria.cats + +import cats.effect.{IO, Resource} +import sttp.capabilities.fs2.Fs2Streams +import sttp.tapir.integ.cats.CatsMonadError +import sttp.tapir.server.tests._ +import sttp.tapir.tests.{Test, TestSuite} + +class ArmeriaCatsServerTest extends TestSuite { + + override def tests: Resource[IO, List[Test]] = backendResource.map { backend => + implicit val m: CatsMonadError[IO] = new CatsMonadError[IO] + + val interpreter = new ArmeriaCatsTestServerInterpreter(dispatcher) + val createServerTest = new DefaultCreateServerTest(backend, interpreter) + + new AllServerTests(createServerTest, interpreter, backend, basic = false).tests() ++ + new ServerBasicTests(createServerTest, interpreter, supportsUrlEncodedPathSegments = false).tests() ++ + new ServerStreamingTests(createServerTest, Fs2Streams[IO]).tests() + } +} diff --git a/server/armeria-server/cats/src/test/scala/sttp/tapir/server/armeria/cats/ArmeriaCatsTestServerInterpreter.scala b/server/armeria-server/cats/src/test/scala/sttp/tapir/server/armeria/cats/ArmeriaCatsTestServerInterpreter.scala new file mode 100644 index 0000000000..42f732a0e1 --- /dev/null +++ b/server/armeria-server/cats/src/test/scala/sttp/tapir/server/armeria/cats/ArmeriaCatsTestServerInterpreter.scala @@ -0,0 +1,31 @@ +package sttp.tapir.server.armeria.cats + +import cats.effect.IO +import cats.effect.std.Dispatcher +import sttp.capabilities.fs2.Fs2Streams +import sttp.tapir.server.ServerEndpoint +import sttp.tapir.server.armeria.{ArmeriaTestServerInterpreter, TapirService} +import sttp.tapir.server.interceptor.decodefailure.{DecodeFailureHandler, DefaultDecodeFailureHandler} +import sttp.tapir.server.interceptor.metrics.MetricsRequestInterceptor + +class ArmeriaCatsTestServerInterpreter(dispatcher: Dispatcher[IO]) extends ArmeriaTestServerInterpreter[Fs2Streams[IO], IO] { + + override def route( + e: ServerEndpoint[Fs2Streams[IO], IO], + decodeFailureHandler: Option[DecodeFailureHandler], + metricsInterceptor: Option[MetricsRequestInterceptor[IO]] = None + ): TapirService[Fs2Streams[IO], IO] = { + val options: ArmeriaCatsServerOptions[IO] = { + ArmeriaCatsServerOptions + .customInterceptors[IO](dispatcher) + .metricsInterceptor(metricsInterceptor) + .decodeFailureHandler(decodeFailureHandler.getOrElse(DefaultDecodeFailureHandler.default)) + .options + } + ArmeriaCatsServerInterpreter(options).toService(e) + } + + override def route(es: List[ServerEndpoint[Fs2Streams[IO], IO]]): TapirService[Fs2Streams[IO], IO] = { + ArmeriaCatsServerInterpreter(dispatcher).toService(es) + } +} diff --git a/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/ArmeriaBodyListener.scala b/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/ArmeriaBodyListener.scala new file mode 100644 index 0000000000..8fb7af28e4 --- /dev/null +++ b/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/ArmeriaBodyListener.scala @@ -0,0 +1,27 @@ +package sttp.tapir.server.armeria + +import scala.util.{Failure, Success, Try} +import sttp.monad.syntax._ +import sttp.monad.{Canceler, MonadAsyncError} +import sttp.tapir.server.interpreter.BodyListener + +private[armeria] final class ArmeriaBodyListener[F[_]](implicit F: MonadAsyncError[F]) extends BodyListener[F, ArmeriaResponseType] { + override def onComplete(body: ArmeriaResponseType)(cb: Try[Unit] => F[Unit]): F[ArmeriaResponseType] = { + body match { + case Left(stream) => + F.async[Try[Unit]] { cb0 => + stream + .whenComplete() + .handle[Unit] { + case (_, null) => + cb0(Right(Success(()))) + case (_, ex) => + cb0(Right(Failure(ex))) + } + Canceler(() => stream.abort()) + }.flatMap(cb(_).map(_ => body)) + case Right(_) => + cb(Success(())).map(_ => body) + } + } +} diff --git a/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/ArmeriaFutureServerInterpreter.scala b/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/ArmeriaFutureServerInterpreter.scala new file mode 100644 index 0000000000..724d6ba0d8 --- /dev/null +++ b/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/ArmeriaFutureServerInterpreter.scala @@ -0,0 +1,24 @@ +package sttp.tapir.server.armeria + +import scala.concurrent.Future +import sttp.capabilities.armeria.ArmeriaStreams +import sttp.tapir.server.ServerEndpoint + +trait ArmeriaFutureServerInterpreter { + + def armeriaServerOptions: ArmeriaFutureServerOptions = ArmeriaFutureServerOptions.default + + def toService(serverEndpoint: ServerEndpoint[ArmeriaStreams, Future]): TapirService[ArmeriaStreams, Future] = + toService(List(serverEndpoint)) + + def toService(serverEndpoints: List[ServerEndpoint[ArmeriaStreams, Future]]): TapirService[ArmeriaStreams, Future] = + TapirFutureService(serverEndpoints, armeriaServerOptions) +} + +object ArmeriaFutureServerInterpreter extends ArmeriaFutureServerInterpreter { + def apply(serverOptions: ArmeriaFutureServerOptions = ArmeriaFutureServerOptions.default): ArmeriaFutureServerInterpreter = { + new ArmeriaFutureServerInterpreter { + override def armeriaServerOptions: ArmeriaFutureServerOptions = serverOptions + } + } +} diff --git a/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/ArmeriaFutureServerOptions.scala b/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/ArmeriaFutureServerOptions.scala new file mode 100644 index 0000000000..d296fb2c41 --- /dev/null +++ b/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/ArmeriaFutureServerOptions.scala @@ -0,0 +1,68 @@ +package sttp.tapir.server.armeria + +import com.linecorp.armeria.common.CommonPools +import org.slf4j.{Logger, LoggerFactory} +import scala.concurrent.{Future, Promise} +import scala.util.control.NonFatal +import sttp.tapir.{Defaults, TapirFile} +import sttp.tapir.server.interceptor.log.{DefaultServerLog, ServerLog} +import sttp.tapir.server.interceptor.{CustomInterceptors, Interceptor} + +final case class ArmeriaFutureServerOptions( + createFile: () => Future[TapirFile], + deleteFile: TapirFile => Future[Unit], + interceptors: List[Interceptor[Future]] +) extends ArmeriaServerOptions[Future] { + def prependInterceptor(i: Interceptor[Future]): ArmeriaFutureServerOptions = copy(interceptors = i :: interceptors) + + def appendInterceptor(i: Interceptor[Future]): ArmeriaFutureServerOptions = copy(interceptors = interceptors :+ i) +} + +object ArmeriaFutureServerOptions { + + val defaultServerLog: ServerLog[Future] = DefaultServerLog[Future]( + doLogWhenHandled = debugLog, + doLogAllDecodeFailures = debugLog, + doLogExceptions = (msg: String, ex: Throwable) => Future.successful(logger.warn(msg, ex)), + noLog = Future.unit + ) + + /** Allows customising the interceptors used by the server interpreter. */ + def customInterceptors: CustomInterceptors[Future, ArmeriaFutureServerOptions] = + CustomInterceptors( + createOptions = (ci: CustomInterceptors[Future, ArmeriaFutureServerOptions]) => + ArmeriaFutureServerOptions( + defaultCreateFile, + defaultDeleteFile, + ci.interceptors + ) + ).serverLog(defaultServerLog) + + val default: ArmeriaFutureServerOptions = customInterceptors.options + + private val logger: Logger = LoggerFactory.getLogger(this.getClass.getPackage.getName) + + def defaultCreateFile(): Future[TapirFile] = blocking(Defaults.createTempFile()) + + def defaultDeleteFile(file: TapirFile): Future[Unit] = blocking(Defaults.deleteFile()(file)) + + private def debugLog(msg: String, exOpt: Option[Throwable]): Future[Unit] = + Future.successful(exOpt match { + case None => logger.debug(msg) + case Some(ex) => logger.debug(msg, ex) + }) + + private def blocking[T](body: => T): Future[T] = { + val promise = Promise[T]() + CommonPools + .blockingTaskExecutor() + .execute(() => { + try { + promise.success(body) + } catch { + case NonFatal(ex) => promise.failure(ex) + } + }) + promise.future + } +} diff --git a/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/ArmeriaRequestBody.scala b/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/ArmeriaRequestBody.scala new file mode 100644 index 0000000000..d25e174b9d --- /dev/null +++ b/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/ArmeriaRequestBody.scala @@ -0,0 +1,104 @@ +package sttp.tapir.server.armeria + +import com.linecorp.armeria.common.multipart.{AggregatedBodyPart, Multipart} +import com.linecorp.armeria.common.stream.{StreamMessage, StreamMessages} +import com.linecorp.armeria.common.{HttpData, HttpRequest} +import com.linecorp.armeria.server.ServiceRequestContext +import java.io.ByteArrayInputStream +import scala.collection.JavaConverters._ +import scala.compat.java8.FutureConverters._ +import scala.concurrent.{ExecutionContext, Future} +import sttp.capabilities.Streams +import sttp.model.Part +import sttp.tapir.server.interpreter.{RawValue, RequestBody} +import sttp.tapir.{FileRange, RawBodyType} + +private[armeria] final class ArmeriaRequestBody[F[_], S <: Streams[S]]( + ctx: ServiceRequestContext, + serverOptions: ArmeriaServerOptions[F], + streamCompatible: StreamCompatible[S] +)(implicit ec: ExecutionContext, futureConversion: FutureConversion[F]) + extends RequestBody[F, S] { + + private val request: HttpRequest = ctx.request() + + override val streams: Streams[S] = streamCompatible.streams + + override def toStream(): streams.BinaryStream = { + streamCompatible + .fromArmeriaStream(request.filter(x => x.isInstanceOf[HttpData]).asInstanceOf[StreamMessage[HttpData]]) + .asInstanceOf[streams.BinaryStream] + } + + override def toRaw[R](bodyType: RawBodyType[R]): F[RawValue[R]] = futureConversion.from(bodyType match { + case RawBodyType.StringBody(_) => + request.aggregate().thenApply[RawValue[R]](agg => RawValue(agg.contentUtf8())).toScala + case RawBodyType.ByteArrayBody => + request.aggregate().thenApply[RawValue[R]](agg => RawValue(agg.content().array())).toScala + case RawBodyType.ByteBufferBody => + request.aggregate().thenApply[RawValue[R]](agg => RawValue(agg.content().byteBuf().nioBuffer())).toScala + case RawBodyType.InputStreamBody => + request + .aggregate() + .thenApply[RawValue[R]](agg => RawValue(new ByteArrayInputStream(agg.content().array()))) + .toScala + case RawBodyType.FileBody => + val bodyStream = request.filter(x => x.isInstanceOf[HttpData]).asInstanceOf[StreamMessage[HttpData]] + for { + file <- futureConversion.to(serverOptions.createFile()) + _ <- StreamMessages.writeTo(bodyStream, file.toPath, ctx.eventLoop(), ctx.blockingTaskExecutor()).toScala + fileRange = FileRange(file) + } yield RawValue(fileRange, Seq(fileRange)) + case m: RawBodyType.MultipartBody => + Multipart + .from(request) + .aggregate() + .toScala + .flatMap(multipart => { + val rawParts = multipart + .bodyParts() + .asScala + .toList + .flatMap(part => m.partType(part.name()).map(toRawPart(part, _))) + + Future + .sequence(rawParts) + .map(RawValue.fromParts(_)) + }) + .asInstanceOf[Future[RawValue[R]]] + }) + + private def toRawFromHttpData[R](body: HttpData, bodyType: RawBodyType[R]): Future[RawValue[R]] = { + bodyType match { + case RawBodyType.StringBody(_) => Future.successful(RawValue(body.toStringUtf8)) + case RawBodyType.ByteArrayBody => Future.successful(RawValue(body.array())) + case RawBodyType.ByteBufferBody => Future.successful(RawValue(body.byteBuf().nioBuffer())) + case RawBodyType.InputStreamBody => Future.successful(RawValue(new ByteArrayInputStream(body.array()))) + case RawBodyType.FileBody => + for { + file <- futureConversion.to(serverOptions.createFile()) + _ <- StreamMessages.writeTo(StreamMessage.of(Array(body): _*), file.toPath, ctx.eventLoop(), ctx.blockingTaskExecutor()).toScala + fileRange = FileRange(file) + } yield RawValue(fileRange, Seq(fileRange)) + case RawBodyType.MultipartBody(_, _) => + throw new UnsupportedOperationException("Nested multipart data is not supported.") + } + } + + private def toRawPart[R](part: AggregatedBodyPart, bodyType: RawBodyType[R]): Future[Part[R]] = { + toRawFromHttpData(part.content(), bodyType) + .map((r: RawValue[R]) => + Part( + name = part.name, + body = r.value, + contentType = if (part.contentType() != null) { + Some(HeaderMapping.fromArmeria(part.contentType())) + } else { + None + }, + fileName = Option(part.filename()), + otherHeaders = HeaderMapping.fromArmeria(part.headers()) + ) + ) + } +} diff --git a/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/ArmeriaServerOptions.scala b/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/ArmeriaServerOptions.scala new file mode 100644 index 0000000000..a415f51422 --- /dev/null +++ b/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/ArmeriaServerOptions.scala @@ -0,0 +1,10 @@ +package sttp.tapir.server.armeria + +import sttp.tapir.TapirFile +import sttp.tapir.server.interceptor.Interceptor + +trait ArmeriaServerOptions[F[_]] { + def createFile: () => F[TapirFile] + def deleteFile: TapirFile => F[Unit] + def interceptors: List[Interceptor[F]] +} diff --git a/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/ArmeriaServerRequest.scala b/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/ArmeriaServerRequest.scala new file mode 100644 index 0000000000..39d4d71eb7 --- /dev/null +++ b/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/ArmeriaServerRequest.scala @@ -0,0 +1,60 @@ +package sttp.tapir.server.armeria + +import com.linecorp.armeria.common.HttpRequest +import com.linecorp.armeria.server.ServiceRequestContext +import java.net.InetSocketAddress +import scala.collection.JavaConverters._ +import sttp.model.{Header, Method, QueryParams, Uri} +import sttp.tapir.model.{ConnectionInfo, ServerRequest} +import scala.collection.immutable.Seq + +private[armeria] final class ArmeriaServerRequest(ctx: ServiceRequestContext) extends ServerRequest { + private lazy val request: HttpRequest = ctx.request + + lazy val connectionInfo: ConnectionInfo = { + val remotePort = ctx.remoteAddress[InetSocketAddress]().getPort + val clientAddress = InetSocketAddress.createUnresolved(ctx.clientAddress().getHostAddress, remotePort) + ConnectionInfo( + Some(ctx.localAddress[InetSocketAddress]()), + Some(clientAddress), + Some(ctx.sessionProtocol().isTls) + ) + } + + override lazy val method: Method = MethodMapping.fromArmeria(request.method()) + + override lazy val protocol: String = ctx.sessionProtocol().toString + + override lazy val uri: Uri = Uri(ctx.request().uri()) + + override lazy val headers: Seq[Header] = HeaderMapping.fromArmeria(request.headers()) + + override def header(name: String): Option[String] = Option(request.headers().get(name)) + + override def underlying: Any = ctx + + override val pathSegments: List[String] = { + // ctx.path() always starts with '/'. + if (ctx.path() == "/") { + Nil + } else { + ctx.path().substring(1).split("/").toList + } + } + + override val queryParameters: QueryParams = { + val params = ctx.queryParams() + + val builder = Seq.newBuilder[(String, Seq[String])] + builder.sizeHint(params.size()) + + params + .names() + .forEach(key => { + val list = params.getAll(key).asScala.toList + builder += ((key, list)) + }) + + QueryParams(builder.result()) + } +} diff --git a/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/ArmeriaToResponseBody.scala b/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/ArmeriaToResponseBody.scala new file mode 100644 index 0000000000..b4d479c013 --- /dev/null +++ b/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/ArmeriaToResponseBody.scala @@ -0,0 +1,101 @@ +package sttp.tapir.server.armeria + +import com.linecorp.armeria.common.multipart.{BodyPart, Multipart} +import com.linecorp.armeria.common.stream.StreamMessage +import com.linecorp.armeria.common.{ContentDisposition, HttpData, HttpHeaders} +import com.linecorp.armeria.internal.shaded.guava.io.ByteStreams +import io.netty.buffer.Unpooled +import java.io.InputStream +import java.nio.ByteBuffer +import java.nio.charset.Charset +import sttp.capabilities.Streams +import sttp.model.{HasHeaders, HeaderNames, Part} +import sttp.tapir.server.interpreter.ToResponseBody +import sttp.tapir.{CodecFormat, FileRange, RawBodyType, RawPart, WebSocketBodyOutput} + +private[armeria] final class ArmeriaToResponseBody[S <: Streams[S]](streamCompatible: StreamCompatible[S]) + extends ToResponseBody[ArmeriaResponseType, S] { + override val streams: S = streamCompatible.streams + + override def fromRawValue[R](v: R, headers: HasHeaders, format: CodecFormat, bodyType: RawBodyType[R]): ArmeriaResponseType = + rawValueToHttpData(bodyType, v) + + override def fromStreamValue( + v: streams.BinaryStream, + headers: HasHeaders, + format: CodecFormat, + charset: Option[Charset] + ): ArmeriaResponseType = + Left(StreamMessage.of(streamCompatible.asStreamMessage(v.asInstanceOf[streamCompatible.streams.BinaryStream]))) + + override def fromWebSocketPipe[REQ, RESP]( + pipe: streams.Pipe[REQ, RESP], + o: WebSocketBodyOutput[streams.Pipe[REQ, RESP], REQ, RESP, _, S] + ): ArmeriaResponseType = throw new UnsupportedOperationException() + + private def rawValueToHttpData[R](bodyType: RawBodyType[R], v: R): ArmeriaResponseType = { + bodyType match { + case RawBodyType.StringBody(charset) => + val str = v.asInstanceOf[String] + Right(HttpData.of(charset, str)) + + case RawBodyType.ByteArrayBody => + val bytes = v.asInstanceOf[Array[Byte]] + Right(HttpData.wrap(bytes)) + + case RawBodyType.ByteBufferBody => + val byteBuffer = v.asInstanceOf[ByteBuffer] + Right(HttpData.wrap(Unpooled.wrappedBuffer(byteBuffer))) + + case RawBodyType.InputStreamBody => + val is = v.asInstanceOf[InputStream] + // TODO(ikhoon): Add StreamMessage.of(InputStream) + Right(HttpData.wrap(ByteStreams.toByteArray(is))) + + case RawBodyType.FileBody => + val tapirFile = v.asInstanceOf[FileRange] + val streamMessage = tapirFile.range + .flatMap(r => + r.startAndEnd.map { case (start, end) => + PathStreamMessage(tapirFile.file.toPath, start, end) + } + ) + .getOrElse(StreamMessage.of(tapirFile.file)) + Left(streamMessage) + + case m: RawBodyType.MultipartBody => + val parts = (v: Seq[RawPart]).flatMap(rawPartToBodyPart(m, _)) + Left(Multipart.of(parts: _*).toStreamMessage) + } + } + + private def rawPartToBodyPart[T](m: RawBodyType.MultipartBody, part: Part[T]): Option[BodyPart] = { + m.partType(part.name).map { partType => + val headerBuilder = HttpHeaders.builder() + part.headers.foreach(header => headerBuilder.add(header.name, header.value)) + + if (part.header(HeaderNames.ContentDisposition).isEmpty) { + // Build Content-Disposition header if missing + val dispositionType = part.otherDispositionParams.getOrElse("type", "form-data") + val dispositionBuilder = + ContentDisposition + .builder(dispositionType) + .name(part.name) + part.fileName.foreach(dispositionBuilder.filename) + headerBuilder.contentDisposition(dispositionBuilder.build()) + } + + val bodyPartBuilder = + BodyPart + .builder() + .headers(headerBuilder.build()) + rawValueToHttpData(partType.asInstanceOf[RawBodyType[Any]], part.body) match { + case Left(stream) => + bodyPartBuilder.content(stream) + case Right(httpData) => + bodyPartBuilder.content(httpData) + } + bodyPartBuilder.build(); + } + } +} diff --git a/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/FutureConversion.scala b/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/FutureConversion.scala new file mode 100644 index 0000000000..b184bf5fb1 --- /dev/null +++ b/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/FutureConversion.scala @@ -0,0 +1,17 @@ +package sttp.tapir.server.armeria + +import scala.concurrent.Future + +private[armeria] trait FutureConversion[F[_]] { + def from[A](f: => Future[A]): F[A] + + def to[A](f: => F[A]): Future[A] +} + +private[armeria] object FutureConversion { + val identity: FutureConversion[Future] = new FutureConversion[Future] { + override def from[A](f: => Future[A]): Future[A] = f + + override def to[A](f: => Future[A]): Future[A] = f + } +} diff --git a/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/HeaderMapping.scala b/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/HeaderMapping.scala new file mode 100644 index 0000000000..67d985863c --- /dev/null +++ b/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/HeaderMapping.scala @@ -0,0 +1,44 @@ +package sttp.tapir.server.armeria + +import com.linecorp.armeria.common.{HttpHeaders, ResponseHeaders, MediaType => ArmeriaMediaType} +import scala.collection.JavaConverters._ +import scala.collection.immutable.Seq +import sttp.model.{Header, MediaType, StatusCode} + +private[armeria] object HeaderMapping { + def fromArmeria(headers: HttpHeaders): Seq[Header] = { + val builder = Seq.newBuilder[Header] + builder.sizeHint(headers.size()) + + headers.forEach((key, value) => builder += Header(key.toString, value)) + builder.result() + } + + def fromArmeria(mediaType: ArmeriaMediaType): MediaType = { + val parameters = mediaType + .parameters() + .asScala + .map { case (x, v) => + // The each parameter of MediaType of Armeria returns a list. + (x, v.get(0)) + } + .toMap + MediaType(mediaType.`type`(), mediaType.subtype(), Option(mediaType.charset()).map(_.toString), parameters) + } + + def toArmeria(mediaType: MediaType): ArmeriaMediaType = { + ArmeriaMediaType.parse(mediaType.toString()) + } + + def toArmeria(headers: Seq[Header]): HttpHeaders = { + val builder = HttpHeaders.builder() + headers.foreach(header => builder.add(header.name, header.value)) + builder.build() + } + + def toArmeria(headers: Seq[Header], status: StatusCode): ResponseHeaders = { + val builder = ResponseHeaders.builder(status.code) + headers.foreach(header => builder.add(header.name, header.value)) + builder.build() + } +} diff --git a/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/MethodMapping.scala b/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/MethodMapping.scala new file mode 100644 index 0000000000..aaf74c842e --- /dev/null +++ b/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/MethodMapping.scala @@ -0,0 +1,36 @@ +package sttp.tapir.server.armeria + +import com.linecorp.armeria.common.HttpMethod +import sttp.model.Method + +/** Utility object to convert HTTP methods between Armeria and Tapir. */ +private[armeria] object MethodMapping { + + private val sttpToArmeria: Map[Method, HttpMethod] = Map( + Method.CONNECT -> HttpMethod.CONNECT, + Method.DELETE -> HttpMethod.DELETE, + Method.GET -> HttpMethod.GET, + Method.HEAD -> HttpMethod.HEAD, + Method.OPTIONS -> HttpMethod.OPTIONS, + Method.PATCH -> HttpMethod.PATCH, + Method.POST -> HttpMethod.POST, + Method.PUT -> HttpMethod.PUT, + Method.TRACE -> HttpMethod.TRACE + ) + + private val armeriaToSttp: Map[HttpMethod, Method] = Map( + HttpMethod.CONNECT -> Method.CONNECT, + HttpMethod.DELETE -> Method.DELETE, + HttpMethod.GET -> Method.GET, + HttpMethod.HEAD -> Method.HEAD, + HttpMethod.OPTIONS -> Method.OPTIONS, + HttpMethod.PATCH -> Method.PATCH, + HttpMethod.POST -> Method.POST, + HttpMethod.PUT -> Method.PUT, + HttpMethod.TRACE -> Method.TRACE + ) + + def toArmeria(method: Method): HttpMethod = sttpToArmeria(method) + + def fromArmeria(method: HttpMethod): Method = armeriaToSttp(method) +} diff --git a/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/PathStreamMessage.scala b/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/PathStreamMessage.scala new file mode 100644 index 0000000000..2ffe729dc5 --- /dev/null +++ b/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/PathStreamMessage.scala @@ -0,0 +1,288 @@ +package sttp.tapir.server.armeria + +import com.linecorp.armeria.common.HttpData +import com.linecorp.armeria.common.stream._ +import com.linecorp.armeria.common.util.Exceptions +import com.linecorp.armeria.internal.common.stream.InternalStreamMessageUtil.{containsNotifyCancellation, containsWithPooledObjects} +import com.linecorp.armeria.internal.common.stream.NoopSubscription +import com.linecorp.armeria.internal.shaded.guava.math.LongMath +import com.linecorp.armeria.internal.shaded.guava.primitives.Ints +import io.netty.buffer.{ByteBuf, ByteBufAllocator, ByteBufUtil} +import io.netty.util.concurrent.EventExecutor +import java.io.IOException +import java.nio.channels.{AsynchronousFileChannel, CompletionHandler} +import java.nio.file.{Path, StandardOpenOption} +import java.util.concurrent.CompletableFuture +import java.util.concurrent.atomic.AtomicBoolean +import org.reactivestreams.{Subscriber, Subscription} +import org.slf4j.{Logger, LoggerFactory} +import sttp.tapir.server.armeria.PathStreamMessage.logger + +// Forked from https://github.com/line/armeria/blob/d1237acd5f1dc660b53305e17774bf29f76766c7/core/src/main/java/com/linecorp/armeria/common/stream/PathStreamMessage.java +// and modified to read a file with a specific range. +// TODO(ikhoon): Upstream's PathStreamMessage did not support partical read. +// Remove this class if https://github.com/line/armeria/pull/4058 is merged. +private[armeria] object PathStreamMessage { + private val logger: Logger = LoggerFactory.getLogger(classOf[PathStreamMessage]) + private val DEFAULT_FILE_BUFFER_SIZE: Int = 8192 + + def apply(path: Path, start: Long, end: Long): PathStreamMessage = { + val normalizedEnd: Long = if (end == -1) Long.MaxValue else end + val bufferSize = math.min(Ints.saturatedCast(normalizedEnd - start), DEFAULT_FILE_BUFFER_SIZE) + + new PathStreamMessage(path, ByteBufAllocator.DEFAULT, start, normalizedEnd, bufferSize) + } +} + +private final class PathStreamMessage( + val path: Path, + val alloc: ByteBufAllocator, + val start: Long, + val end: Long, + val bufferSize: Int +) extends StreamMessage[HttpData] { + private val completionFuture: CompletableFuture[Void] = new CompletableFuture[Void] + private val subscribed: AtomicBoolean = new AtomicBoolean() + + @volatile private var pathSubscription: PathSubscription = _ + + override def isOpen: Boolean = !completionFuture.isDone + + override def isEmpty: Boolean = { + if (isOpen) { + false + } else { + val pathSubscription = this.pathSubscription + pathSubscription == null || pathSubscription.position == 0 + } + } + + override def demand: Long = { + val pathSubscription = this.pathSubscription + if (pathSubscription != null) { + pathSubscription.requested + } else { + 0 + } + } + + override def whenComplete: CompletableFuture[Void] = completionFuture + + override def subscribe(subscriber: Subscriber[_ >: HttpData], executor: EventExecutor, options: SubscriptionOption*): Unit = { + if (!subscribed.compareAndSet(false, true)) { + subscriber.onSubscribe(NoopSubscription.get) + subscriber.onError(new IllegalStateException("Only single subscriber is allowed!")) + } else { + if (executor.inEventLoop) { + subscribe0(subscriber, executor, options: _*) + } else { + executor.execute(() => subscribe0(subscriber, executor, options: _*)) + } + } + } + + private def subscribe0(subscriber: Subscriber[_ >: HttpData], executor: EventExecutor, options: SubscriptionOption*): Unit = { + var fileChannel: AsynchronousFileChannel = null + var success = false + try { + fileChannel = AsynchronousFileChannel.open(path, StandardOpenOption.READ) + if (fileChannel.size == 0) { + subscriber.onSubscribe(NoopSubscription.get) + if (completionFuture.isCompletedExceptionally) + completionFuture.handle[Unit]((_: Void, cause: Throwable) => { + subscriber.onError(Exceptions.peel(cause)) + }) + else { + subscriber.onComplete() + completionFuture.complete(null) + } + return + } + success = true + } catch { + case e: IOException => + subscriber.onSubscribe(NoopSubscription.get) + subscriber.onError(e) + completionFuture.completeExceptionally(e) + return + } finally { + if (!success && fileChannel != null) { + try { + fileChannel.close() + } catch { + case e: IOException => + logger.warn("Unexpected exception while closing {}.", Array(fileChannel, e): _*) + } + } + } + + val pathSubscription: PathSubscription = + new PathSubscription( + fileChannel, + subscriber, + executor, + start, + end, + bufferSize, + containsNotifyCancellation(options: _*), + containsWithPooledObjects(options: _*) + ) + this.pathSubscription = pathSubscription + subscriber.onSubscribe(pathSubscription) + } + + override def abort(): Unit = { + abort(AbortedStreamException.get) + } + + override def abort(cause: Throwable): Unit = { + val pathSubscription: PathSubscription = this.pathSubscription + if (pathSubscription != null) { + pathSubscription.maybeCloseFileChannel() + pathSubscription.close(Some(cause)) + } + val _ = completionFuture.completeExceptionally(cause) + } + + private class PathSubscription( + val fileChannel: AsynchronousFileChannel, + var downstream: Subscriber[_ >: HttpData], + val executor: EventExecutor, + @volatile var position: Long, + val end: Long, + val bufferSize: Int, + val notifyCancellation: Boolean, + val withPooledObjects: Boolean + ) extends Subscription + with CompletionHandler[Integer, ByteBuf] { + + private var reading: Boolean = false + private var closed: Boolean = false + @volatile var requested: Long = 0L + + override def request(n: Long): Unit = { + if (n <= 0L) { + downstream.onError( + new IllegalArgumentException( + "Rule ยง3.9 violated: non-positive subscription " + + "requests " + "are forbidden." + ) + ) + cancel() + } else request0(n) + } + + private def request0(n: Long): Unit = { + val oldRequested = this.requested + if (oldRequested == Long.MaxValue) { + // no-op + } else { + this.requested = if (n == Long.MaxValue) Long.MaxValue else LongMath.saturatedAdd(oldRequested, n) + if (oldRequested > 0) { + // PathSubscription is reading a file. + // New requests will be handled by 'completed(Integer, ByteBuf)'. + } else { + read() + } + } + } + + private def read(): Unit = { + if (!reading && !closed && requested > 0) { + requested -= 1 + reading = true + val position: Long = this.position + val bufferSize: Int = Math.min(this.bufferSize, Ints.saturatedCast(end - position)) + val buffer: ByteBuf = alloc.buffer(bufferSize) + fileChannel.read(buffer.nioBuffer(0, bufferSize), position, buffer, this) + } + } + + override def cancel(): Unit = { + if (executor.inEventLoop) { + cancel0() + } else { + executor.execute(() => cancel0()) + } + } + + private def cancel0(): Unit = { + if (closed) return + closed = true + if (!reading) maybeCloseFileChannel() + val cause: CancelledSubscriptionException = CancelledSubscriptionException.get + if (notifyCancellation) downstream.onError(cause) + completionFuture.completeExceptionally(cause) + downstream = NoopSubscriber.get[HttpData]() + } + + override def completed(result: Integer, byteBuf: ByteBuf): Unit = { + executor.execute(() => { + if (closed) { + byteBuf.release + maybeCloseFileChannel() + } else if (result >= 0) { + position = position + result + var data: HttpData = null + if (withPooledObjects) { + byteBuf.writerIndex(result) + data = HttpData.wrap(byteBuf) + } else { + data = HttpData.wrap(ByteBufUtil.getBytes(byteBuf, 0, result)) + byteBuf.release + } + downstream.onNext(data) + if (position < end) { + reading = false + read() + } else { + maybeCloseFileChannel() + close0(None) + } + } else { + byteBuf.release + maybeCloseFileChannel() + close0(None) + } + }) + } + + override def failed(ex: Throwable, byteBuf: ByteBuf): Unit = { + executor.execute(() => { + byteBuf.release + maybeCloseFileChannel() + close0(Some(ex)) + }) + } + + def maybeCloseFileChannel(): Unit = { + if (fileChannel.isOpen) { + try { + fileChannel.close() + } catch { + case cause: IOException => + logger.warn("Unexpected exception while closing {}.", Array(fileChannel, cause): _*) + } + } + } + + def close(cause: Option[Throwable]): Unit = { + if (executor.inEventLoop) close0(cause) + else executor.execute(() => close0(cause)) + } + + private def close0(cause: Option[Throwable]): Unit = { + if (!closed) { + closed = true + cause match { + case Some(ex) => + downstream.onError(ex) + val _ = completionFuture.completeExceptionally(ex) + case None => + downstream.onComplete() + val _ = completionFuture.complete(null) + } + } + } + } +} diff --git a/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/ResultMapping.scala b/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/ResultMapping.scala new file mode 100644 index 0000000000..5d031fb54e --- /dev/null +++ b/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/ResultMapping.scala @@ -0,0 +1,27 @@ +package sttp.tapir.server.armeria + +import com.linecorp.armeria.common.{HttpResponse, HttpStatus} +import com.linecorp.armeria.common.multipart.Multipart +import sttp.tapir.server.interceptor.RequestResult + +private[armeria] object ResultMapping { + def toArmeria(result: RequestResult[ArmeriaResponseType]): HttpResponse = { + result match { + case RequestResult.Failure(_) => + HttpResponse.of(HttpStatus.NOT_FOUND) + case RequestResult.Response(response) => + val headers = HeaderMapping.toArmeria(response.headers, response.code) + response.body match { + case None => + HttpResponse.of(headers) + case Some(Right(httpData)) => + HttpResponse.of(headers, httpData) + case Some(Left(stream)) => + stream match { + case multipart: Multipart => multipart.toHttpResponse(headers) + case _ => HttpResponse.of(headers, stream) + } + } + } + } +} diff --git a/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/RouteMapping.scala b/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/RouteMapping.scala new file mode 100644 index 0000000000..eac0e763e4 --- /dev/null +++ b/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/RouteMapping.scala @@ -0,0 +1,81 @@ +package sttp.tapir.server.armeria + +import com.linecorp.armeria.server.Route +import sttp.tapir.EndpointIO.{Body, StreamBodyWrapper} +import sttp.tapir.EndpointInput.{FixedPath, PathCapture, PathsCapture} +import sttp.tapir.RawBodyType.FileBody +import sttp.tapir.internal.{RichEndpoint, RichEndpointOutput} +import sttp.tapir.{AnyEndpoint, EndpointInput, EndpointTransput, RawBodyType} + +private[armeria] object RouteMapping { + + def toRoute(e: AnyEndpoint): List[(Route, ExchangeType.Value)] = { + val inputs: Seq[EndpointInput.Basic[_]] = e.asVectorOfBasicInputs() + + val outputsList = e.output.asBasicOutputsList + val requestStreaming = inputs.exists(isStreaming) + val responseStreaming = outputsList.exists(_.exists(isStreaming)) + val exchangeType = (requestStreaming, responseStreaming) match { + case (false, false) => ExchangeType.Unary + case (true, false) => ExchangeType.RequestStreaming + case (false, true) => ExchangeType.ResponseStreaming + case (true, true) => ExchangeType.BidiStreaming + } + + toPathPatterns(inputs).map { path => + // Allows all HTTP method to handle invalid requests by RejectInterceptor + val routeBuilder = + Route + .builder() + .path(path) + + (routeBuilder.build(), exchangeType) + } + } + + private def isStreaming(output: EndpointTransput.Basic[_]): Boolean = output match { + case StreamBodyWrapper(_) => true + case body: Body[_, _] => + body.bodyType match { + case FileBody => true + case RawBodyType.MultipartBody(_, _) => true + case _ => false + } + case _ => false + } + + private def toPathPatterns(inputs: Seq[EndpointInput.Basic[_]]): List[String] = { + var idxUsed = 0 + var capturePaths = false + val fragments = inputs.collect { + case segment: FixedPath[_] => + segment.show + case PathCapture(Some(name), _, _) => + s"/:$name" + case PathCapture(_, _, _) => + idxUsed += 1 + s"/:param$idxUsed" + case PathsCapture(_, _) => + idxUsed += 1 + capturePaths = true + s"/:*param$idxUsed" + } + if (fragments.isEmpty) { + // No path should match anything + List("prefix:/") + } else { + val pathPattern = fragments.mkString + if (capturePaths) { + List(pathPattern) + } else { + // endpoint.in("api") should match both '/api', '/api/' + List(pathPattern, s"$pathPattern/") + } + } + } +} + +private[armeria] object ExchangeType extends Enumeration { + type ExchangeType = Value + val Unary, RequestStreaming, ResponseStreaming, BidiStreaming = Value +} diff --git a/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/StreamCompatible.scala b/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/StreamCompatible.scala new file mode 100644 index 0000000000..0e7c8dda82 --- /dev/null +++ b/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/StreamCompatible.scala @@ -0,0 +1,11 @@ +package sttp.tapir.server.armeria + +import com.linecorp.armeria.common.HttpData +import org.reactivestreams.Publisher +import sttp.capabilities.Streams + +private[armeria] trait StreamCompatible[S <: Streams[S]] { + val streams: S + def asStreamMessage(s: streams.BinaryStream): Publisher[HttpData] + def fromArmeriaStream(s: Publisher[HttpData]): streams.BinaryStream +} diff --git a/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/TapirFutureService.scala b/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/TapirFutureService.scala new file mode 100644 index 0000000000..1ba78f558e --- /dev/null +++ b/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/TapirFutureService.scala @@ -0,0 +1,55 @@ +package sttp.tapir.server.armeria + +import com.linecorp.armeria.common.{HttpData, HttpRequest, HttpResponse} +import com.linecorp.armeria.server.ServiceRequestContext +import java.util.concurrent.CompletableFuture +import org.reactivestreams.Publisher +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success} +import sttp.capabilities.armeria.ArmeriaStreams +import sttp.monad.FutureMonad +import sttp.tapir.server.ServerEndpoint +import sttp.tapir.server.interpreter.{BodyListener, ServerInterpreter} + +private[armeria] final case class TapirFutureService( + serverEndpoints: List[ServerEndpoint[ArmeriaStreams, Future]], + armeriaServerOptions: ArmeriaFutureServerOptions +) extends TapirService[ArmeriaStreams, Future] { + + private implicit val futureConversion: FutureConversion[Future] = FutureConversion.identity + + override def serve(ctx: ServiceRequestContext, req: HttpRequest): HttpResponse = { + implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(ctx.eventLoop()) + implicit val monad: FutureMonad = new FutureMonad() + implicit val bodyListener: BodyListener[Future, ArmeriaResponseType] = new ArmeriaBodyListener + + val serverRequest = new ArmeriaServerRequest(ctx) + val requestBody: ArmeriaRequestBody[Future, ArmeriaStreams] = + new ArmeriaRequestBody(ctx, armeriaServerOptions, ArmeriaStreamCompatible) + val future = new CompletableFuture[HttpResponse]() + val interpreter: ServerInterpreter[ArmeriaStreams, Future, ArmeriaResponseType, ArmeriaStreams] = new ServerInterpreter( + serverEndpoints, + new ArmeriaToResponseBody(ArmeriaStreamCompatible), + armeriaServerOptions.interceptors, + armeriaServerOptions.deleteFile + ) + + interpreter(serverRequest, requestBody) + .map(ResultMapping.toArmeria) + .onComplete { + case Failure(exception) => + future.completeExceptionally(exception) + case Success(value) => + future.complete(value) + } + HttpResponse.from(future) + } +} + +private object ArmeriaStreamCompatible extends StreamCompatible[ArmeriaStreams] { + override val streams: ArmeriaStreams = ArmeriaStreams + + override def fromArmeriaStream(s: Publisher[HttpData]): Publisher[HttpData] = s + + override def asStreamMessage(s: Publisher[HttpData]): Publisher[HttpData] = s +} diff --git a/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/TapirService.scala b/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/TapirService.scala new file mode 100644 index 0000000000..a889e620b1 --- /dev/null +++ b/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/TapirService.scala @@ -0,0 +1,19 @@ +package sttp.tapir.server.armeria + +import com.linecorp.armeria.server.{HttpServiceWithRoutes, Route} +import java.util.{Set => JSet} +import scala.collection.JavaConverters._ +import sttp.capabilities.Streams +import sttp.tapir.server.ServerEndpoint + +trait TapirService[S <: Streams[S], F[_]] extends HttpServiceWithRoutes { + + def serverEndpoints: List[ServerEndpoint[S, F]] + + def armeriaServerOptions: ArmeriaServerOptions[F] + + // TODO(ikhoon): Use upstream's ExchangeType to optimize performance for non-streaming requests + // if https://github.com/line/armeria/pull/3956 is merged. + private val routesMap: Map[Route, ExchangeType.Value] = serverEndpoints.flatMap(se => RouteMapping.toRoute(se.endpoint)).toMap + override final val routes: JSet[Route] = routesMap.keySet.asJava +} diff --git a/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/package.scala b/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/package.scala new file mode 100644 index 0000000000..c2ee278960 --- /dev/null +++ b/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/package.scala @@ -0,0 +1,8 @@ +package sttp.tapir.server + +import com.linecorp.armeria.common.HttpData +import com.linecorp.armeria.common.stream.StreamMessage + +package object armeria { + private[armeria] type ArmeriaResponseType = Either[StreamMessage[HttpData], HttpData] +} diff --git a/server/armeria-server/src/test/scala/sttp/tapir/server/armeria/ArmeriaFutureServerTest.scala b/server/armeria-server/src/test/scala/sttp/tapir/server/armeria/ArmeriaFutureServerTest.scala new file mode 100644 index 0000000000..f686f376f7 --- /dev/null +++ b/server/armeria-server/src/test/scala/sttp/tapir/server/armeria/ArmeriaFutureServerTest.scala @@ -0,0 +1,21 @@ +package sttp.tapir.server.armeria + +import cats.effect.{IO, Resource} +import sttp.capabilities.armeria.ArmeriaStreams +import sttp.monad.FutureMonad +import sttp.tapir.server.tests._ +import sttp.tapir.tests.{Test, TestSuite} + +class ArmeriaFutureServerTest extends TestSuite { + + override def tests: Resource[IO, List[Test]] = backendResource.map { backend => + implicit val m: FutureMonad = new FutureMonad() + + val interpreter = new ArmeriaTestFutureServerInterpreter() + val createServerTest = new DefaultCreateServerTest(backend, interpreter) + + new AllServerTests(createServerTest, interpreter, backend, basic = false).tests() ++ + new ServerBasicTests(createServerTest, interpreter, supportsUrlEncodedPathSegments = false).tests() ++ + new ServerStreamingTests(createServerTest, ArmeriaStreams).tests() + } +} diff --git a/server/armeria-server/src/test/scala/sttp/tapir/server/armeria/ArmeriaTestFutureServerInterpreter.scala b/server/armeria-server/src/test/scala/sttp/tapir/server/armeria/ArmeriaTestFutureServerInterpreter.scala new file mode 100644 index 0000000000..a0fbfc7d9c --- /dev/null +++ b/server/armeria-server/src/test/scala/sttp/tapir/server/armeria/ArmeriaTestFutureServerInterpreter.scala @@ -0,0 +1,25 @@ +package sttp.tapir.server.armeria + +import scala.concurrent.Future +import sttp.capabilities.armeria.ArmeriaStreams +import sttp.tapir.server.ServerEndpoint +import sttp.tapir.server.interceptor.decodefailure.{DecodeFailureHandler, DefaultDecodeFailureHandler} +import sttp.tapir.server.interceptor.metrics.MetricsRequestInterceptor + +class ArmeriaTestFutureServerInterpreter extends ArmeriaTestServerInterpreter[ArmeriaStreams, Future] { + + override def route( + e: ServerEndpoint[ArmeriaStreams, Future], + decodeFailureHandler: Option[DecodeFailureHandler] = None, + metricsInterceptor: Option[MetricsRequestInterceptor[Future]] = None + ): TapirService[ArmeriaStreams, Future] = { + val serverOptions: ArmeriaFutureServerOptions = ArmeriaFutureServerOptions.customInterceptors + .metricsInterceptor(metricsInterceptor) + .decodeFailureHandler(decodeFailureHandler.getOrElse(DefaultDecodeFailureHandler.default)) + .options + ArmeriaFutureServerInterpreter(serverOptions).toService(e) + } + + override def route(es: List[ServerEndpoint[ArmeriaStreams, Future]]): TapirService[ArmeriaStreams, Future] = + ArmeriaFutureServerInterpreter().toService(es) +} diff --git a/server/armeria-server/src/test/scala/sttp/tapir/server/armeria/ArmeriaTestServerInterpreter.scala b/server/armeria-server/src/test/scala/sttp/tapir/server/armeria/ArmeriaTestServerInterpreter.scala new file mode 100644 index 0000000000..aa81cecde2 --- /dev/null +++ b/server/armeria-server/src/test/scala/sttp/tapir/server/armeria/ArmeriaTestServerInterpreter.scala @@ -0,0 +1,36 @@ +package sttp.tapir.server.armeria + +import cats.data.NonEmptyList +import cats.effect.{IO, Resource} +import com.linecorp.armeria.server.Server +import sttp.capabilities.Streams +import sttp.tapir.server.tests.TestServerInterpreter +import sttp.tapir.tests.Port + +trait ArmeriaTestServerInterpreter[S <: Streams[S], F[_]] extends TestServerInterpreter[F, S, TapirService[S, F]] { + + override def server(routes: NonEmptyList[TapirService[S, F]]): Resource[IO, Port] = { + val bind = IO.fromCompletableFuture( + IO { + val serverBuilder = Server + .builder() + .maxRequestLength(0) + .connectionDrainDurationMicros(0) + routes.foldLeft(serverBuilder)((sb, route) => sb.service(route)) + val server = serverBuilder.build() + server.start().thenApply[Server](_ => server) + } + ) + Resource + .make(bind)(binding => + IO { + // Ignore returned future for fast test iterations. + // Armeria server wait for 2 seconds by default to let the boss group gracefully finish all remaining + // tasks in the queue. + val _ = binding.stop() + () + } + ) + .map(_.activeLocalPort()) + } +} diff --git a/server/armeria-server/src/test/scala/sttp/tapir/server/armeria/RouteMappingTest.scala b/server/armeria-server/src/test/scala/sttp/tapir/server/armeria/RouteMappingTest.scala new file mode 100644 index 0000000000..dc92ff63e1 --- /dev/null +++ b/server/armeria-server/src/test/scala/sttp/tapir/server/armeria/RouteMappingTest.scala @@ -0,0 +1,83 @@ +package sttp.tapir.server.armeria + +import io.circe.generic.auto._ +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers +import sttp.capabilities.armeria.ArmeriaStreams +import sttp.tapir.generic.auto._ +import sttp.tapir.json.circe._ +import sttp.tapir.tests.data.Person +import sttp.tapir.{CodecFormat, Schema, endpoint, fileBody, multipartBody, streamBody, stringBody, stringToPath} + +class RouteMappingTest extends AnyFunSuite with Matchers { + + test("unary - stringBody") { + val stringEndpoint = endpoint + .in("foo" / "bar") + .in(stringBody) + .out(stringBody) + val routesMap = RouteMapping.toRoute(stringEndpoint).toMap + val exchangeTypes = routesMap.values + exchangeTypes.forall(exchangeType => exchangeType == ExchangeType.Unary) shouldBe true + } + + test("unary - json") { + val stringEndpoint = endpoint + .in("foo" / "bar") + .in(jsonBody[Person]) + .out(jsonBody[Person]) + val routesMap = RouteMapping.toRoute(stringEndpoint).toMap + val exchangeTypes = routesMap.values + exchangeTypes.forall(exchangeType => exchangeType == ExchangeType.Unary) shouldBe true + } + + test("streaming - fileBody") { + val stringEndpoint = endpoint + .in("foo" / "bar") + .in(fileBody) + .out(fileBody) + val routesMap = RouteMapping.toRoute(stringEndpoint).toMap + val exchangeTypes = routesMap.values + exchangeTypes.forall(exchangeType => exchangeType == ExchangeType.BidiStreaming) shouldBe true + } + + test("streaming - multipart") { + val stringEndpoint = endpoint + .in("foo" / "bar") + .in(multipartBody) + .out(multipartBody) + val routesMap = RouteMapping.toRoute(stringEndpoint).toMap + val exchangeTypes = routesMap.values + exchangeTypes.forall(exchangeType => exchangeType == ExchangeType.BidiStreaming) shouldBe true + } + + test("streaming - publisher") { + val stringEndpoint = endpoint + .in("foo" / "bar") + .in(streamBody(ArmeriaStreams)(Schema.derived[List[Person]], CodecFormat.Json())) + .out(streamBody(ArmeriaStreams)(Schema.derived[List[Person]], CodecFormat.Json())) + val routesMap = RouteMapping.toRoute(stringEndpoint).toMap + val exchangeTypes = routesMap.values + exchangeTypes.forall(exchangeType => exchangeType == ExchangeType.BidiStreaming) shouldBe true + } + + test("mixed - string~file") { + val stringFileEndpoint = endpoint + .in("foo" / "bar") + .in(stringBody) + .out(fileBody) + val routesMap = RouteMapping.toRoute(stringFileEndpoint).toMap + val exchangeTypes = routesMap.values + exchangeTypes.forall(exchangeType => exchangeType == ExchangeType.ResponseStreaming) shouldBe true + } + + test("mixed - file~string") { + val fileStringEndpoint = endpoint + .in("foo" / "bar") + .in(fileBody) + .out(stringBody) + val routesMap = RouteMapping.toRoute(fileStringEndpoint).toMap + val exchangeTypes = routesMap.values + exchangeTypes.forall(exchangeType => exchangeType == ExchangeType.RequestStreaming) shouldBe true + } +} diff --git a/server/armeria-server/zio/src/main/scala/sttp/tapir/server/armeria/zio/ArmeriaZioServerInterpreter.scala b/server/armeria-server/zio/src/main/scala/sttp/tapir/server/armeria/zio/ArmeriaZioServerInterpreter.scala new file mode 100644 index 0000000000..32ef251599 --- /dev/null +++ b/server/armeria-server/zio/src/main/scala/sttp/tapir/server/armeria/zio/ArmeriaZioServerInterpreter.scala @@ -0,0 +1,27 @@ +package sttp.tapir.server.armeria.zio + +import _root_.zio._ +import sttp.capabilities.zio.ZioStreams +import sttp.tapir.server.armeria.TapirService +import sttp.tapir.ztapir.ZServerEndpoint + +trait ArmeriaZioServerInterpreter[R] { + + def armeriaServerOptions: ArmeriaZioServerOptions[RIO[R, *]] + + def toService(serverEndpoints: ZServerEndpoint[R, ZioStreams])(implicit runtime: Runtime[R]): TapirService[ZioStreams, RIO[R, *]] = + toService(List(serverEndpoints)) + + def toService(serverEndpoints: List[ZServerEndpoint[R, ZioStreams]])(implicit runtime: Runtime[R]): TapirService[ZioStreams, RIO[R, *]] = + TapirZioService(serverEndpoints, armeriaServerOptions) +} + +object ArmeriaZioServerInterpreter { + def apply[R]( + serverOptions: ArmeriaZioServerOptions[RIO[R, *]] = ArmeriaZioServerOptions.default[R] + ): ArmeriaZioServerInterpreter[R] = { + new ArmeriaZioServerInterpreter[R] { + override def armeriaServerOptions: ArmeriaZioServerOptions[RIO[R, *]] = serverOptions + } + } +} diff --git a/server/armeria-server/zio/src/main/scala/sttp/tapir/server/armeria/zio/ArmeriaZioServerOptions.scala b/server/armeria-server/zio/src/main/scala/sttp/tapir/server/armeria/zio/ArmeriaZioServerOptions.scala new file mode 100644 index 0000000000..0a287c404e --- /dev/null +++ b/server/armeria-server/zio/src/main/scala/sttp/tapir/server/armeria/zio/ArmeriaZioServerOptions.scala @@ -0,0 +1,73 @@ +package sttp.tapir.server.armeria.zio + +import _root_.zio.{RIO, Task, URIO} +import com.linecorp.armeria.common.CommonPools +import org.slf4j.{Logger, LoggerFactory} +import sttp.tapir.server.armeria.ArmeriaServerOptions +import sttp.tapir.server.interceptor.log.{DefaultServerLog, ServerLog} +import sttp.tapir.server.interceptor.{CustomInterceptors, Interceptor} +import sttp.tapir.{Defaults, TapirFile} + +import scala.util.control.NonFatal + +final case class ArmeriaZioServerOptions[F[_]]( + createFile: () => F[TapirFile], + deleteFile: TapirFile => F[Unit], + interceptors: List[Interceptor[F]] +) extends ArmeriaServerOptions[F] { + def prependInterceptor(i: Interceptor[F]): ArmeriaZioServerOptions[F] = + copy(interceptors = i :: interceptors) + def appendInterceptor(i: Interceptor[F]): ArmeriaZioServerOptions[F] = + copy(interceptors = interceptors :+ i) +} + +object ArmeriaZioServerOptions { + + /** Allows customising the interceptors used by the server interpreter. */ + def customInterceptors[R]: CustomInterceptors[RIO[R, *], ArmeriaZioServerOptions[RIO[R, *]]] = + CustomInterceptors( + createOptions = (ci: CustomInterceptors[RIO[R, *], ArmeriaZioServerOptions[RIO[R, *]]]) => { + ArmeriaZioServerOptions( + defaultCreateFile, + defaultDeleteFile, + ci.interceptors + ) + } + ).serverLog(defaultServerLog[R]) + + private val logger: Logger = LoggerFactory.getLogger(this.getClass.getPackage.getName) + + implicit def default[R]: ArmeriaZioServerOptions[RIO[R, *]] = customInterceptors.options + + def defaultCreateFile[R](): RIO[R, TapirFile] = blocking(Defaults.createTempFile()) + + def defaultDeleteFile[R](file: TapirFile): RIO[R, Unit] = blocking(Defaults.deleteFile()(file)) + + def defaultServerLog[R]: ServerLog[RIO[R, *]] = DefaultServerLog( + doLogWhenHandled = debugLog[R], + doLogAllDecodeFailures = debugLog[R], + doLogExceptions = (msg: String, ex: Throwable) => URIO.succeed { logger.warn(msg, ex) }, + noLog = URIO.unit + ) + + private def debugLog[R](msg: String, exOpt: Option[Throwable]): RIO[R, Unit] = + URIO.succeed(exOpt match { + case None => logger.debug(msg) + case Some(ex) => logger.debug(msg, ex) + }) + + private def blocking[R, T](body: => T): RIO[R, T] = { + Task.async { cb => + CommonPools + .blockingTaskExecutor() + .execute(() => { + try { + cb(Task.succeed(body)) + } catch { + case NonFatal(ex) => + cb(Task.fail(ex)) + } + }) + } + } +} diff --git a/server/armeria-server/zio/src/main/scala/sttp/tapir/server/armeria/zio/RIOMonadAsyncError.scala b/server/armeria-server/zio/src/main/scala/sttp/tapir/server/armeria/zio/RIOMonadAsyncError.scala new file mode 100644 index 0000000000..844160c5e0 --- /dev/null +++ b/server/armeria-server/zio/src/main/scala/sttp/tapir/server/armeria/zio/RIOMonadAsyncError.scala @@ -0,0 +1,37 @@ +package sttp.tapir.server.armeria.zio + +import sttp.monad.{Canceler, MonadAsyncError} +import zio._ + +// Forked from sttp.client3.impl.zio.RIOMonadAsyncError +private class RIOMonadAsyncError[R] extends MonadAsyncError[RIO[R, *]] { + override def unit[T](t: T): RIO[R, T] = RIO.succeed(t) + + override def map[T, T2](fa: RIO[R, T])(f: T => T2): RIO[R, T2] = fa.map(f) + + override def flatMap[T, T2](fa: RIO[R, T])(f: T => RIO[R, T2]): RIO[R, T2] = + fa.flatMap(f) + + override def async[T](register: (Either[Throwable, T] => Unit) => Canceler): RIO[R, T] = + RIO.effectAsyncInterrupt { cb => + val canceler = register { + case Left(t) => cb(RIO.fail(t)) + case Right(t) => cb(RIO.succeed(t)) + } + + Left(UIO(canceler.cancel())) + } + + override def error[T](t: Throwable): RIO[R, T] = RIO.fail(t) + + override protected def handleWrappedError[T](rt: RIO[R, T])(h: PartialFunction[Throwable, RIO[R, T]]): RIO[R, T] = + rt.catchSome(h) + + override def eval[T](t: => T): RIO[R, T] = RIO.effect(t) + + override def suspend[T](t: => RIO[R, T]): RIO[R, T] = RIO.effectSuspend(t) + + override def flatten[T](ffa: RIO[R, RIO[R, T]]): RIO[R, T] = ffa.flatten + + override def ensure[T](f: RIO[R, T], e: => RIO[R, Unit]): RIO[R, T] = f.ensuring(e.catchAll(_ => ZIO.unit)) +} diff --git a/server/armeria-server/zio/src/main/scala/sttp/tapir/server/armeria/zio/TapirZioService.scala b/server/armeria-server/zio/src/main/scala/sttp/tapir/server/armeria/zio/TapirZioService.scala new file mode 100644 index 0000000000..ddae756dcc --- /dev/null +++ b/server/armeria-server/zio/src/main/scala/sttp/tapir/server/armeria/zio/TapirZioService.scala @@ -0,0 +1,91 @@ +package sttp.tapir.server.armeria.zio + +import _root_.zio._ +import _root_.zio.interop.reactivestreams._ +import _root_.zio.stream.Stream +import com.linecorp.armeria.common.{HttpData, HttpRequest, HttpResponse} +import com.linecorp.armeria.server.ServiceRequestContext +import java.util.concurrent.CompletableFuture +import org.reactivestreams.Publisher +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success} +import sttp.capabilities.zio.ZioStreams +import sttp.monad.{Canceler, MonadAsyncError} +import sttp.tapir.server.ServerEndpoint +import sttp.tapir.server.armeria._ +import sttp.tapir.server.interpreter.ServerInterpreter +import sttp.tapir.ztapir.RIOMonadError + +private[zio] final case class TapirZioService[R]( + serverEndpoints: List[ServerEndpoint[ZioStreams, RIO[R, *]]], + armeriaServerOptions: ArmeriaZioServerOptions[RIO[R, *]] +)(implicit runtime: Runtime[R]) + extends TapirService[ZioStreams, RIO[R, *]] { + + private[this] implicit val monad: RIOMonadAsyncError[R] = new RIOMonadAsyncError() + private[this] implicit val bodyListener: ArmeriaBodyListener[RIO[R, *]] = new ArmeriaBodyListener + + private[this] val zioStreamCompatible: StreamCompatible[ZioStreams] = ZioStreamCompatible(runtime) + private[this] val interpreter: ServerInterpreter[ZioStreams, RIO[R, *], ArmeriaResponseType, ZioStreams] = + new ServerInterpreter[ZioStreams, RIO[R, *], ArmeriaResponseType, ZioStreams]( + serverEndpoints, + new ArmeriaToResponseBody(zioStreamCompatible), + armeriaServerOptions.interceptors, + armeriaServerOptions.deleteFile + ) + + override def serve(ctx: ServiceRequestContext, req: HttpRequest): HttpResponse = { + implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(ctx.eventLoop()) + implicit val rioFutureConversion: RioFutureConversion[R] = new RioFutureConversion[R] + + val serverRequest = new ArmeriaServerRequest(ctx) + val requestBody = new ArmeriaRequestBody(ctx, armeriaServerOptions, zioStreamCompatible) + val future = new CompletableFuture[HttpResponse]() + val result = interpreter(serverRequest, requestBody).map(ResultMapping.toArmeria) + + val cancellable = runtime.unsafeRunToFuture(result) + cancellable.future.onComplete { + case Failure(exception) => + future.completeExceptionally(exception) + case Success(value) => + future.complete(value) + } + + val httpResponse = HttpResponse.from(future) + httpResponse + .whenComplete() + .asInstanceOf[CompletableFuture[Unit]] + .exceptionally { case (_: Throwable) => + cancellable.cancel() + () + } + httpResponse + } +} + +private object ZioStreamCompatible { + def apply(runtime: Runtime[Any]): StreamCompatible[ZioStreams] = { + new StreamCompatible[ZioStreams] { + override val streams: ZioStreams = ZioStreams + + override def asStreamMessage(stream: Stream[Throwable, Byte]): Publisher[HttpData] = + runtime.unsafeRun(stream.mapChunks(c => Chunk.single(HttpData.wrap(c.toArray))).toPublisher) + + override def fromArmeriaStream(publisher: Publisher[HttpData]): Stream[Throwable, Byte] = + publisher.toStream().mapConcatChunk(httpData => Chunk.fromArray(httpData.array())) + } + } +} + +private class RioFutureConversion[R](implicit ec: ExecutionContext, runtime: Runtime[R]) extends FutureConversion[RIO[R, *]] { + def from[T](f: => Future[T]): RIO[R, T] = { + RIO.effectAsync { cb => + f.onComplete { + case Failure(exception) => cb(Task.fail(exception)) + case Success(value) => cb(Task.succeed(value)) + } + } + } + + override def to[A](f: => RIO[R, A]): Future[A] = runtime.unsafeRunToFuture(f) +} diff --git a/server/armeria-server/zio/src/test/scala/sttp/tapir/server/armeria/zio/ArmeriaZioServerTest.scala b/server/armeria-server/zio/src/test/scala/sttp/tapir/server/armeria/zio/ArmeriaZioServerTest.scala new file mode 100644 index 0000000000..937c26ab3d --- /dev/null +++ b/server/armeria-server/zio/src/test/scala/sttp/tapir/server/armeria/zio/ArmeriaZioServerTest.scala @@ -0,0 +1,24 @@ +package sttp.tapir.server.armeria.zio + +import cats.effect.{IO, Resource} +import sttp.capabilities.zio.ZioStreams +import sttp.monad.MonadError +import sttp.tapir.server.tests._ +import sttp.tapir.tests.{Test, TestSuite} +import sttp.tapir.ztapir.RIOMonadError +import zio.Task + +class ArmeriaZioServerTest extends TestSuite { + + override def tests: Resource[IO, List[Test]] = backendResource.map { backend => + + implicit val monadError: MonadError[Task] = new RIOMonadError + + val interpreter = new ArmeriaZioTestServerInterpreter() + val createServerTest = new DefaultCreateServerTest(backend, interpreter) + + new AllServerTests(createServerTest, interpreter, backend, basic = false).tests() ++ + new ServerBasicTests(createServerTest, interpreter, supportsUrlEncodedPathSegments = false).tests() ++ + new ServerStreamingTests(createServerTest, ZioStreams).tests() + } +} diff --git a/server/armeria-server/zio/src/test/scala/sttp/tapir/server/armeria/zio/ArmeriaZioTestServerInterpreter.scala b/server/armeria-server/zio/src/test/scala/sttp/tapir/server/armeria/zio/ArmeriaZioTestServerInterpreter.scala new file mode 100644 index 0000000000..1f94040ad8 --- /dev/null +++ b/server/armeria-server/zio/src/test/scala/sttp/tapir/server/armeria/zio/ArmeriaZioTestServerInterpreter.scala @@ -0,0 +1,34 @@ +package sttp.tapir.server.armeria.zio + +import _root_.zio.{Runtime, Task} +import sttp.capabilities.zio.ZioStreams +import sttp.tapir.server.ServerEndpoint +import sttp.tapir.server.armeria.{ArmeriaTestServerInterpreter, TapirService} +import sttp.tapir.server.interceptor.decodefailure.{DecodeFailureHandler, DefaultDecodeFailureHandler} +import sttp.tapir.server.interceptor.metrics.MetricsRequestInterceptor + +class ArmeriaZioTestServerInterpreter extends ArmeriaTestServerInterpreter[ZioStreams, Task] { + import ArmeriaZioTestServerInterpreter._ + + override def route( + e: ServerEndpoint[ZioStreams, Task], + decodeFailureHandler: Option[DecodeFailureHandler], + metricsInterceptor: Option[MetricsRequestInterceptor[Task]] = None + ): TapirService[ZioStreams, Task] = { + val options: ArmeriaZioServerOptions[Task] = { + ArmeriaZioServerOptions.customInterceptors + .metricsInterceptor(metricsInterceptor) + .decodeFailureHandler(decodeFailureHandler.getOrElse(DefaultDecodeFailureHandler.default)) + .options + } + ArmeriaZioServerInterpreter(options).toService(e) + } + + override def route(es: List[ServerEndpoint[ZioStreams, Task]]): TapirService[ZioStreams, Task] = { + ArmeriaZioServerInterpreter[Any]().toService(es) + } +} + +object ArmeriaZioTestServerInterpreter { + implicit val runtime: Runtime[zio.ZEnv] = Runtime.default +} diff --git a/server/tests/src/main/scala/sttp/tapir/server/tests/ServerMultipartTests.scala b/server/tests/src/main/scala/sttp/tapir/server/tests/ServerMultipartTests.scala index 8e5ea902d3..d7f23b1036 100644 --- a/server/tests/src/main/scala/sttp/tapir/server/tests/ServerMultipartTests.scala +++ b/server/tests/src/main/scala/sttp/tapir/server/tests/ServerMultipartTests.scala @@ -70,7 +70,7 @@ class ServerMultipartTests[F[_], ROUTE]( .send(backend) .map { r => r.code shouldBe StatusCode.Ok - if (partOtherHeaderSupport) r.body should include regex "X-Auth: Some\\(12Aa\\)" + if (partOtherHeaderSupport) r.body should include regex "((?i)X-Auth):[ ]?Some\\(12Aa\\)" r.body should include regex "name=\"data\"[\\s\\S]*oiram hcaep" } }, @@ -99,9 +99,9 @@ class ServerMultipartTests[F[_], ROUTE]( .map { r => r.code shouldBe StatusCode.Ok if (partOtherHeaderSupport) { - r.body should include("X-Auth: 12Aax") - r.body should include("X-Auth: 12Abx") - r.body should include("X-Auth: 12Acx") + r.body should include regex "((?i)X-Auth):[ ]?12Aax" + r.body should include regex "((?i)X-Auth):[ ]?12Abx" + r.body should include regex "((?i)X-Auth):[ ]?12Acx" } r.body should include("peach mario 1 result") r.body should include("peach mario 2 result") @@ -148,7 +148,7 @@ class ServerMultipartTests[F[_], ROUTE]( .send(backend) .map { r => r.code shouldBe StatusCode.Ok - r.body.toLowerCase() should include("content-type: text/html") + r.body.toLowerCase() should include regex "content-type:[ ]?text/html" } } ) diff --git a/tests/src/main/resources/logback.xml b/tests/src/main/resources/logback.xml index e6cee15ae7..e99b3520a8 100644 --- a/tests/src/main/resources/logback.xml +++ b/tests/src/main/resources/logback.xml @@ -9,4 +9,4 @@ - \ No newline at end of file +