-
Notifications
You must be signed in to change notification settings - Fork 411
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor: ServerResponseHandler #1198
Changes from 4 commits
54040c0
fcecdc7
17f4372
bdc902b
4b5ea8e
c8a520a
c033742
2c99527
dd29f82
dcc7d63
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -4,23 +4,19 @@ import io.netty.channel.ChannelHandler.Sharable | |||||
import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler} | ||||||
import io.netty.handler.codec.http._ | ||||||
import zhttp.http._ | ||||||
import zhttp.service.server.content.handlers.ServerResponseHandler | ||||||
import zhttp.service.server.{ServerTime, WebSocketUpgrade} | ||||||
import zhttp.service.server.WebSocketUpgrade | ||||||
import zio.{UIO, ZIO} | ||||||
|
||||||
import java.net.{InetAddress, InetSocketAddress} | ||||||
|
||||||
@Sharable | ||||||
private[zhttp] final case class Handler[R]( | ||||||
app: HttpApp[R, Throwable], | ||||||
runtime: HttpRuntime[R], | ||||||
config: Server.Config[R, Throwable], | ||||||
serverTimeGenerator: ServerTime, | ||||||
serverResponseWriter: ServerResponseWriter[R], | ||||||
) extends SimpleChannelInboundHandler[HttpObject](false) | ||||||
with WebSocketUpgrade[R] | ||||||
with ServerResponseHandler[R] { self => | ||||||
with WebSocketUpgrade[R] { self => | ||||||
|
||||||
override def channelRead0(ctx: Ctx, msg: HttpObject): Unit = { | ||||||
override def channelRead0(ctx: serverResponseWriter.Ctx, msg: HttpObject): Unit = { | ||||||
|
||||||
implicit val iCtx: ChannelHandlerContext = ctx | ||||||
msg match { | ||||||
|
@@ -56,7 +52,7 @@ private[zhttp] final case class Handler[R]( | |||||
) | ||||||
catch { | ||||||
case throwable: Throwable => | ||||||
writeResponse( | ||||||
serverResponseWriter.write( | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
This is more concise. |
||||||
Response | ||||||
.fromHttpError(HttpError.InternalServerError(cause = Some(throwable))) | ||||||
.withConnection(HeaderValues.close), | ||||||
|
@@ -100,7 +96,7 @@ private[zhttp] final case class Handler[R]( | |||||
) | ||||||
catch { | ||||||
case throwable: Throwable => | ||||||
writeResponse( | ||||||
serverResponseWriter.write( | ||||||
Response | ||||||
.fromHttpError(HttpError.InternalServerError(cause = Some(throwable))) | ||||||
.withConnection(HeaderValues.close), | ||||||
|
@@ -130,7 +126,7 @@ private[zhttp] final case class Handler[R]( | |||||
jReq: HttpRequest, | ||||||
http: Http[R, Throwable, A, Response], | ||||||
a: A, | ||||||
)(implicit ctx: Ctx): Unit = { | ||||||
)(implicit ctx: serverResponseWriter.Ctx): Unit = { | ||||||
http.execute(a) match { | ||||||
case HExit.Effect(resM) => | ||||||
unsafeRunZIO { | ||||||
|
@@ -139,20 +135,20 @@ private[zhttp] final case class Handler[R]( | |||||
cause.failureOrCause match { | ||||||
case Left(Some(cause)) => | ||||||
UIO { | ||||||
writeResponse( | ||||||
serverResponseWriter.write( | ||||||
Response.fromHttpError(HttpError.InternalServerError(cause = Some(cause))), | ||||||
jReq, | ||||||
) | ||||||
} | ||||||
case Left(None) => | ||||||
UIO { | ||||||
writeResponse(Response.status(Status.NotFound), jReq) | ||||||
serverResponseWriter.write(Response.status(Status.NotFound), jReq) | ||||||
} | ||||||
case Right(other) => | ||||||
other.dieOption match { | ||||||
case Some(defect) => | ||||||
UIO { | ||||||
writeResponse( | ||||||
serverResponseWriter.write( | ||||||
Response.fromHttpError(HttpError.InternalServerError(cause = Some(defect))), | ||||||
jReq, | ||||||
) | ||||||
|
@@ -166,7 +162,7 @@ private[zhttp] final case class Handler[R]( | |||||
else { | ||||||
for { | ||||||
_ <- ZIO { | ||||||
writeResponse(res, jReq) | ||||||
serverResponseWriter.write(res, jReq) | ||||||
} | ||||||
} yield () | ||||||
}, | ||||||
|
@@ -177,34 +173,38 @@ private[zhttp] final case class Handler[R]( | |||||
if (self.isWebSocket(res)) { | ||||||
self.upgradeToWebSocket(jReq, res) | ||||||
} else { | ||||||
writeResponse(res, jReq): Unit | ||||||
serverResponseWriter.write(res, jReq): Unit | ||||||
} | ||||||
|
||||||
case HExit.Failure(e) => | ||||||
writeResponse(Response.fromHttpError(HttpError.InternalServerError(cause = Some(e))), jReq): Unit | ||||||
serverResponseWriter.write( | ||||||
Response.fromHttpError(HttpError.InternalServerError(cause = Some(e))), | ||||||
jReq, | ||||||
): Unit | ||||||
|
||||||
case HExit.Die(e) => | ||||||
writeResponse(Response.fromHttpError(HttpError.InternalServerError(cause = Some(e))), jReq): Unit | ||||||
serverResponseWriter.write( | ||||||
Response.fromHttpError(HttpError.InternalServerError(cause = Some(e))), | ||||||
jReq, | ||||||
): Unit | ||||||
|
||||||
case HExit.Empty => | ||||||
writeResponse(Response.fromHttpError(HttpError.NotFound(Path(jReq.uri()))), jReq): Unit | ||||||
serverResponseWriter.write(Response.fromHttpError(HttpError.NotFound(Path(jReq.uri()))), jReq): Unit | ||||||
|
||||||
} | ||||||
} | ||||||
|
||||||
/** | ||||||
* Executes program | ||||||
*/ | ||||||
private def unsafeRunZIO(program: ZIO[R, Throwable, Any])(implicit ctx: Ctx): Unit = | ||||||
rt.unsafeRun(ctx) { | ||||||
private def unsafeRunZIO(program: ZIO[R, Throwable, Any])(implicit ctx: serverResponseWriter.Ctx): Unit = | ||||||
serverResponseWriter.rt.unsafeRun(ctx) { | ||||||
program | ||||||
} | ||||||
|
||||||
override def serverTime: ServerTime = serverTimeGenerator | ||||||
override val runtime: HttpRuntime[R] = serverResponseWriter.rt | ||||||
|
||||||
override val rt: HttpRuntime[R] = runtime | ||||||
|
||||||
override def exceptionCaught(ctx: Ctx, cause: Throwable): Unit = { | ||||||
config.error.fold(super.exceptionCaught(ctx, cause))(f => runtime.unsafeRun(ctx)(f(cause))) | ||||||
override def exceptionCaught(ctx: serverResponseWriter.Ctx, cause: Throwable): Unit = { | ||||||
serverResponseWriter.config.error.fold(super.exceptionCaught(ctx, cause))(f => runtime.unsafeRun(ctx)(f(cause))) | ||||||
} | ||||||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -249,7 +249,8 @@ object Server { | |||||
channelFactory <- ZManaged.access[ServerChannelFactory](_.get) | ||||||
eventLoopGroup <- ZManaged.access[EventLoopGroup](_.get) | ||||||
zExec <- HttpRuntime.sticky[R](eventLoopGroup).toManaged_ | ||||||
reqHandler = settings.app.compile(zExec, settings, ServerTime.make) | ||||||
handler = ServerResponseWriter(zExec, settings, ServerTime.make) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
reqHandler = settings.app.compile(handler) | ||||||
init = ServerChannelInitializer(zExec, settings, reqHandler) | ||||||
serverBootstrap = new ServerBootstrap().channelFactory(channelFactory).group(eventLoopGroup) | ||||||
chf <- ZManaged.effect(serverBootstrap.childHandler(init).bind(settings.address)) | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,24 +1,23 @@ | ||
package zhttp.service.server.content.handlers | ||
package zhttp.service | ||
|
||
import io.netty.buffer.ByteBuf | ||
import io.netty.channel.{ChannelHandlerContext, DefaultFileRegion} | ||
import io.netty.handler.codec.http._ | ||
import zhttp.http.{HttpData, Response} | ||
import zhttp.service.server.ServerTime | ||
import zhttp.service.{ChannelFuture, HttpRuntime, Server} | ||
import zio.stream.ZStream | ||
import zio.{UIO, ZIO} | ||
|
||
import java.io.File | ||
|
||
private[zhttp] trait ServerResponseHandler[R] { | ||
private[zhttp] trait ServerResponseWriter[R] { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can convert it to a |
||
type Ctx = ChannelHandlerContext | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can move There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also keep it package private. |
||
val rt: HttpRuntime[R] | ||
val config: Server.Config[R, Throwable] | ||
|
||
def serverTime: ServerTime | ||
|
||
def writeResponse(msg: Response, jReq: HttpRequest)(implicit ctx: Ctx): Unit = { | ||
def write(msg: Response, jReq: HttpRequest)(implicit ctx: Ctx): Unit = { | ||
ctx.write(encodeResponse(msg)) | ||
writeData(msg.data.asInstanceOf[HttpData.Complete], jReq) | ||
() | ||
|
@@ -126,3 +125,12 @@ private[zhttp] trait ServerResponseHandler[R] { | |
} yield () | ||
} | ||
} | ||
object ServerResponseWriter { | ||
def apply[R](runtime: HttpRuntime[R], conf: Server.Config[R, Throwable], st: ServerTime): ServerResponseWriter[R] = | ||
new ServerResponseWriter[R]() { | ||
override val rt: HttpRuntime[R] = runtime | ||
override val config: Server.Config[R, Throwable] = conf | ||
|
||
override def serverTime: ServerTime = st | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.