Skip to content

Commit

Permalink
Jetty 10 and friends
Browse files Browse the repository at this point in the history
  • Loading branch information
rossabaker committed Jan 7, 2023
1 parent a4d40e4 commit cc5092a
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 39 deletions.
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ lazy val root = project
.enablePlugins(NoPublishPlugin)
.aggregate(jettyServer, jettyClient)

val jettyVersion = "9.4.50.v20221201"
val jettyVersion = "10.0.13"
val http4sVersion = "0.23.17"
val http4sServletVersion = "0.23.13"
val http4sServletVersion = "0.24.0-M2"
val munitCatsEffectVersion = "1.0.7"
val slf4jVersion = "1.7.25"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import fs2._
import org.eclipse.jetty.client.HttpClient
import org.eclipse.jetty.client.api.{Request => JettyRequest}
import org.eclipse.jetty.http.{HttpVersion => JHttpVersion}
import org.eclipse.jetty.util.ssl.SslContextFactory
import org.http4s.client.Client
import org.log4s.Logger
import org.log4s.getLogger
Expand All @@ -39,19 +38,19 @@ object JettyClient {

def resource[F[_]](
client: HttpClient = defaultHttpClient()
)(implicit F: Async[F]): Resource[F, Client[F]] = Dispatcher.parallel[F].flatMap { implicit D =>
)(implicit F: Async[F]): Resource[F, Client[F]] = Dispatcher.parallel[F].flatMap { dispatcher =>
val acquire = F
.pure(client)
.flatTap(client => F.delay(client.start()))
.map(client =>
Client[F] { req =>
Resource.suspend(F.async[Resource[F, Response[F]]] { cb =>
F.bracket(StreamRequestContentProvider()) { dcp =>
F.bracket(StreamRequestContent[F](dispatcher)) { dcp =>
(for {
jReq <- F.catchNonFatal(toJettyRequest(client, req, dcp))
rl <- ResponseListener(cb)
rl <- ResponseListener(cb, dispatcher)
_ <- F.delay(jReq.send(rl))
_ <- dcp.write(req)
_ <- dcp.write(req.body)
} yield Option.empty[F[Unit]]).recover { case e =>
cb(Left(e))
Option.empty[F[Unit]]
Expand All @@ -75,8 +74,7 @@ object JettyClient {
Stream.resource(resource(client))

def defaultHttpClient(): HttpClient = {
val sslCtxFactory = new SslContextFactory.Client();
val c = new HttpClient(sslCtxFactory)
val c = new HttpClient()
c.setFollowRedirects(false)
c.setDefaultRequestContentType(null)
c
Expand All @@ -85,7 +83,7 @@ object JettyClient {
private def toJettyRequest[F[_]](
client: HttpClient,
request: Request[F],
dcp: StreamRequestContentProvider[F],
dcp: StreamRequestContent[F],
): JettyRequest = {
val jReq = client
.newRequest(request.uri.toString)
Expand All @@ -98,9 +96,10 @@ object JettyClient {
case _ => JHttpVersion.HTTP_1_1
}
)

for (h <- request.headers.headers if h.isNameValid)
jReq.header(h.name.toString, h.value)
jReq.content(dcp)
jReq.headers { jettyHeaders =>
for (h <- request.headers.headers if h.isNameValid)
jettyHeaders.add(h.name.toString, h.value)
}
jReq.body(dcp)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ import java.nio.ByteBuffer
private[jetty] final case class ResponseListener[F[_]](
queue: Queue[F, Option[Item]],
cb: Callback[Resource[F, Response[F]]],
)(implicit F: Async[F], D: Dispatcher[F])
dispatcher: Dispatcher[F],
)(implicit F: Async[F])
extends JettyResponse.Listener.Adapter {
import ResponseListener.logger

Expand Down Expand Up @@ -69,7 +70,9 @@ private[jetty] final case class ResponseListener[F[_]](
}
.leftMap { t => abort(t, response); t }

D.unsafeRunAndForget(F.delay(cb(r)).attempt.flatMap(loggingAsyncCallback[F, Unit](logger)))
dispatcher.unsafeRunAndForget(
F.delay(cb(r)).attempt.flatMap(loggingAsyncCallback[F, Unit](logger))
)
}

private def getHttpVersion(version: JHttpVersion): HttpVersion =
Expand Down Expand Up @@ -100,7 +103,7 @@ private[jetty] final case class ResponseListener[F[_]](
override def onFailure(response: JettyResponse, failure: Throwable): Unit =
if (responseSent) enqueue(Item.Raise(failure))(_ => F.unit)
else
D.unsafeRunAndForget(
dispatcher.unsafeRunAndForget(
F.delay(cb(Left(failure))).attempt.flatMap(loggingAsyncCallback[F, Unit](logger))
)

Expand All @@ -122,7 +125,7 @@ private[jetty] final case class ResponseListener[F[_]](
enqueue(Item.Done)(loggingAsyncCallback[F, Unit](logger))

private def enqueue(item: Item)(cb: Either[Throwable, Unit] => F[Unit]): Unit =
D.unsafeRunAndForget(queue.offer(item.some).attempt.flatMap(cb))
dispatcher.unsafeRunAndForget(queue.offer(item.some).attempt.flatMap(cb))
}

private[jetty] object ResponseListener {
Expand All @@ -138,9 +141,10 @@ private[jetty] object ResponseListener {
private val logger = getLogger

def apply[F[_]](
cb: Callback[Resource[F, Response[F]]]
)(implicit F: Async[F], D: Dispatcher[F]): F[ResponseListener[F]] =
cb: Callback[Resource[F, Response[F]]],
dispatcher: Dispatcher[F],
)(implicit F: Async[F]): F[ResponseListener[F]] =
Queue
.synchronous[F, Option[Item]]
.map(q => ResponseListener(q, cb))
.map(q => ResponseListener(q, cb, dispatcher))
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,21 @@ import cats.effect._
import cats.effect.std._
import cats.syntax.all._
import fs2._
import org.eclipse.jetty.client.util.DeferredContentProvider
import org.eclipse.jetty.client.util.AsyncRequestContent
import org.eclipse.jetty.util.{Callback => JettyCallback}
import org.http4s.jetty.client.internal.loggingAsyncCallback
import org.log4s.getLogger

private[jetty] final case class StreamRequestContentProvider[F[_]](s: Semaphore[F])(implicit
F: Async[F],
D: Dispatcher[F],
) extends DeferredContentProvider {
import StreamRequestContentProvider.logger
private[jetty] class StreamRequestContent[F[_]] private (
s: Semaphore[F],
dispatcher: Dispatcher[F],
)(implicit
F: Async[F]
) extends AsyncRequestContent {
import StreamRequestContent.logger

def write(req: Request[F]): F[Unit] =
req.body.chunks
def write(body: Stream[F, Byte]): F[Unit] =
body.chunks
.through(pipe)
.compile
.drain
Expand All @@ -53,13 +55,15 @@ private[jetty] final case class StreamRequestContentProvider[F[_]](s: Semaphore[

private val callback: JettyCallback = new JettyCallback {
override def succeeded(): Unit =
D.unsafeRunAndForget(s.release.attempt.flatMap(loggingAsyncCallback[F, Unit](logger)))
dispatcher.unsafeRunAndForget(
s.release.attempt.flatMap(loggingAsyncCallback[F, Unit](logger))
)
}
}

private[jetty] object StreamRequestContentProvider {
private[jetty] object StreamRequestContent {
private val logger = getLogger

def apply[F[_]: Async: Dispatcher](): F[StreamRequestContentProvider[F]] =
Semaphore[F](1).map(StreamRequestContentProvider(_))
def apply[F[_]: Async](dispatcher: Dispatcher[F]): F[StreamRequestContent[F]] =
Semaphore[F](1).map(new StreamRequestContent(_, dispatcher))
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ import java.util
import javax.net.ssl.SSLContext
import javax.net.ssl.SSLParameters
import javax.servlet.DispatcherType
import javax.servlet.Filter
import javax.servlet.http.HttpFilter
import javax.servlet.http.HttpServlet
import scala.annotation.nowarn
import scala.collection.immutable
Expand Down Expand Up @@ -189,7 +189,7 @@ sealed class JettyBuilder[F[_]] private (
})

override def mountFilter(
filter: Filter,
filter: HttpFilter,
urlMapping: String,
name: Option[String],
dispatches: util.EnumSet[DispatcherType],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ private[jetty] object JettyLifeCycle {
*/
private[this] def stopLifeCycle[F[_]](lifeCycle: LifeCycle)(implicit F: Async[F]): F[Unit] =
F.async_[Unit] { cb =>
lifeCycle.addLifeCycleListener(
lifeCycle.addEventListener(
new LifeCycle.Listener {
override def lifeCycleStopped(a: LifeCycle): Unit =
cb(Right(()))
Expand Down Expand Up @@ -96,7 +96,7 @@ private[jetty] object JettyLifeCycle {
*/
private[this] def startLifeCycle[F[_]](lifeCycle: LifeCycle)(implicit F: Async[F]): F[Unit] =
F.async_[Unit] { cb =>
lifeCycle.addLifeCycleListener(
lifeCycle.addEventListener(
new LifeCycle.Listener {
override def lifeCycleStarted(a: LifeCycle): Unit =
cb(Right(()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.eclipse.jetty.client.api.Request
import org.eclipse.jetty.client.api.Response
import org.eclipse.jetty.client.api.Result
import org.eclipse.jetty.client.util.BufferingResponseListener
import org.eclipse.jetty.client.util.StringContentProvider
import org.eclipse.jetty.client.util.StringRequestContent
import org.http4s.dsl.io._
import org.http4s.server.Server

Expand Down Expand Up @@ -94,7 +94,7 @@ class JettyServerSuite extends CatsEffectSuite {
val req = client()
.newRequest(s"http://127.0.0.1:${server.address.getPort}$path")
.method("POST")
.content(new StringContentProvider(body))
.body(new StringRequestContent(body))
fetchBody(req)
}

Expand Down

0 comments on commit cc5092a

Please sign in to comment.