Skip to content

Commit

Permalink
Generic ServerSentEvents (#2952)
Browse files Browse the repository at this point in the history
  • Loading branch information
987Nabil committed Jul 30, 2024
1 parent fdfde8e commit 05a0e0b
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 29 deletions.
6 changes: 3 additions & 3 deletions docs/reference/response/response.md
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ The `Response.fromServerSentEvents` method creates a response with a stream of s

```scala
object Response {
def fromServerSentEvents(stream: ZStream[Any, Nothing, ServerSentEvent]): Response = ???
def fromServerSentEvents(stream: ZStream[Any, Nothing, ServerSentEvent[String]]): Response = ???
}
```

Expand All @@ -239,7 +239,7 @@ import java.time.format.DateTimeFormatter.ISO_LOCAL_TIME

object ServerSentExample extends ZIOAppDefault {

val stream: ZStream[Any, Nothing, ServerSentEvent] =
val stream: ZStream[Any, Nothing, ServerSentEvent[String]] =
ZStream.repeatWithSchedule(
ServerSentEvent(ISO_LOCAL_TIME.format(LocalDateTime.now)),
Schedule.spaced(1.second),
Expand Down Expand Up @@ -353,4 +353,4 @@ Response.ok.addFlash(flash)

### Working with Headers

There are various methods to work with headers in `Response` which we have discussed in the [Headers](../headers/headers.md#headers-operations) page.
There are various methods to work with headers in `Response` which we have discussed in the [Headers](../headers/headers.md#headers-operations) page.
2 changes: 1 addition & 1 deletion zio-http-example/src/main/scala/example/SSEServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import zio.http._

object SSEServer extends ZIOAppDefault {

val stream: ZStream[Any, Nothing, ServerSentEvent] =
val stream: ZStream[Any, Nothing, ServerSentEvent[String]] =
ZStream.repeatWithSchedule(ServerSentEvent(ISO_LOCAL_TIME.format(LocalDateTime.now)), Schedule.spaced(1.second))

val app: Routes[Any, Response] =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package example

import java.time.{Instant, LocalDateTime}

import zio._

import zio.stream.ZStream

import zio.schema.{DeriveSchema, Schema}

import zio.http._
import zio.http.codec.HttpCodec
import zio.http.endpoint.Endpoint
import zio.http.endpoint.EndpointMiddleware.None

object ServerSentEventAsJsonEndpoint extends ZIOAppDefault {
import HttpCodec._

case class Payload(timeStamp: Instant, message: String)

object Payload {
implicit val schema: Schema[Payload] = DeriveSchema.gen[Payload]
}

val stream: ZStream[Any, Nothing, ServerSentEvent[Payload]] =
ZStream.repeatWithSchedule(ServerSentEvent(Payload(Instant.now(), "message")), Schedule.spaced(1.second))

val sseEndpoint: Endpoint[Unit, Unit, ZNothing, ZStream[Any, Nothing, ServerSentEvent[Payload]], None] =
Endpoint(Method.GET / "sse").outStream[ServerSentEvent[Payload]]

val sseRoute = sseEndpoint.implementHandler(Handler.succeed(stream))

val routes: Routes[Any, Response] = sseRoute.toRoutes

override def run: ZIO[Any with ZIOAppArgs with Scope, Any, Any] = {
Server.serve(routes).provide(Server.default).exitCode
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ import zio.http.endpoint.EndpointMiddleware.None
object ServerSentEventEndpoint extends ZIOAppDefault {
import HttpCodec._

val stream: ZStream[Any, Nothing, ServerSentEvent] =
val stream: ZStream[Any, Nothing, ServerSentEvent[String]] =
ZStream.repeatWithSchedule(ServerSentEvent(ISO_LOCAL_TIME.format(LocalDateTime.now)), Schedule.spaced(1.second))

val sseEndpoint: Endpoint[Unit, Unit, ZNothing, ZStream[Any, Nothing, ServerSentEvent], None] =
Endpoint(Method.GET / "sse").outStream[ServerSentEvent]
val sseEndpoint: Endpoint[Unit, Unit, ZNothing, ZStream[Any, Nothing, ServerSentEvent[String]], None] =
Endpoint(Method.GET / "sse").outStream[ServerSentEvent[String]]

val sseRoute = sseEndpoint.implementHandler(Handler.succeed(stream))

Expand Down
16 changes: 14 additions & 2 deletions zio-http/shared/src/main/scala/zio/http/Response.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ import zio._

import zio.stream.ZStream

import zio.schema.Schema
import zio.schema.codec.BinaryCodec

import zio.http.codec.internal.TextBinaryCodec
import zio.http.internal.HeaderOps
import zio.http.template.Html

Expand Down Expand Up @@ -180,8 +184,16 @@ object Response {
* @param data
* \- stream of data to be sent as Server Sent Events
*/
def fromServerSentEvents(data: ZStream[Any, Nothing, ServerSentEvent])(implicit trace: Trace): Response =
Response(Status.Ok, contentTypeEventStream, Body.fromCharSequenceStreamChunked(data.map(_.encode)))
def fromServerSentEvents[T: Schema](data: ZStream[Any, Nothing, ServerSentEvent[T]])(implicit
trace: Trace,
): Response = {
val codec = ServerSentEvent.defaultBinaryCodec[T]
Response(
Status.Ok,
contentTypeEventStream,
Body.fromCharSequenceStreamChunked(data.map(codec.encode).map(_.asString)),
)
}

/**
* Creates a new response for the provided socket app
Expand Down
73 changes: 54 additions & 19 deletions zio-http/shared/src/main/scala/zio/http/ServerSentEvent.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import zio._

import zio.stream.ZPipeline

import zio.schema.codec.{BinaryCodec, DecodeError}
import zio.schema.codec._
import zio.schema.{DeriveSchema, Schema}

import zio.http.codec.{BinaryCodecWithSchema, HttpContentCodec}
Expand All @@ -38,18 +38,16 @@ import zio.http.codec.{BinaryCodecWithSchema, HttpContentCodec}
* @param retry
* optional reconnection delay in milliseconds
*/
final case class ServerSentEvent(
data: String,
final case class ServerSentEvent[T](
data: T,
eventType: Option[String] = None,
id: Option[String] = None,
retry: Option[Int] = None,
) {

def encode: String = {
def encode(implicit binaryCodec: BinaryCodec[T]): String = {
val sb = new StringBuilder
data.linesIterator.foreach { line =>
sb.append("data: ").append(line).append('\n')
}
sb.append("data: ").append(binaryCodec.encode(data).asString)
eventType.foreach { et =>
sb.append("event: ").append(et.linesIterator.mkString(" ")).append('\n')
}
Expand All @@ -64,23 +62,60 @@ final case class ServerSentEvent(
}

object ServerSentEvent {
implicit lazy val schema: Schema[ServerSentEvent] = DeriveSchema.gen[ServerSentEvent]

implicit val contentCodec: HttpContentCodec[ServerSentEvent] = HttpContentCodec.from(
MediaType.text.`event-stream` -> BinaryCodecWithSchema.fromBinaryCodec(new BinaryCodec[ServerSentEvent] {
override def decode(whole: Chunk[Byte]): Either[DecodeError, ServerSentEvent] =
throw new UnsupportedOperationException("ServerSentEvent decoding is not yet supported.")
implicit def schema[T](implicit schema: Schema[T]): Schema[ServerSentEvent[T]] = DeriveSchema.gen[ServerSentEvent[T]]

implicit def defaultBinaryCodec[T](implicit schema: Schema[T]): BinaryCodec[ServerSentEvent[T]] =
defaultContentCodec(schema).defaultCodec

private def nextOption(lines: Iterator[String]): Option[String] =
if (lines.hasNext) Some(lines.next())
else None

implicit def binaryCodec[T](implicit binaryCodec: BinaryCodec[T]): BinaryCodec[ServerSentEvent[T]] =
new BinaryCodec[ServerSentEvent[T]] {
override def decode(whole: Chunk[Byte]): Either[DecodeError, ServerSentEvent[T]] = {
val lines = whole.asString.linesIterator
val data = lines.next().stripPrefix("data: ")
val eventType = nextOption(lines).map(_.stripPrefix("event: "))
val id = nextOption(lines).map(_.stripPrefix("id: "))
val retry = nextOption(lines).map(_.stripPrefix("retry: ").toInt)
val decoded = binaryCodec.decode(Chunk.fromArray(data.getBytes))
decoded.map(value => ServerSentEvent(value, eventType, id, retry))
}

override def streamDecoder: ZPipeline[Any, DecodeError, Byte, ServerSentEvent] =
throw new UnsupportedOperationException("ServerSentEvent decoding is not yet supported.")
override def streamDecoder: ZPipeline[Any, DecodeError, Byte, ServerSentEvent[T]] =
ZPipeline.chunks[Byte].map(_.asString) >>> ZPipeline
.splitOn("\n\n")
.mapZIO(s => {
val lines = s.linesIterator
val data = lines.next().stripPrefix("data: ")
val eventType = nextOption(lines).map(_.stripPrefix("event: "))
val id = nextOption(lines).map(_.stripPrefix("id: "))
val retry = nextOption(lines).map(_.stripPrefix("retry: ").toInt)
val decoded = binaryCodec.decode(Chunk.fromArray(data.getBytes))
ZIO.fromEither(decoded.map(value => ServerSentEvent(value, eventType, id, retry)))
})

override def encode(value: ServerSentEvent): Chunk[Byte] =
Chunk.fromArray(value.encode.getBytes)
override def encode(value: ServerSentEvent[T]): Chunk[Byte] = Chunk.fromArray(value.encode.getBytes)

override def streamEncoder: ZPipeline[Any, Nothing, ServerSentEvent, Byte] =
override def streamEncoder: ZPipeline[Any, Nothing, ServerSentEvent[T], Byte] =
ZPipeline.mapChunks(value => value.flatMap(c => c.encode.getBytes))
}),
}

implicit def contentCodec[T](implicit
tCodec: BinaryCodec[T],
schema: Schema[T],
): HttpContentCodec[ServerSentEvent[T]] = HttpContentCodec.from(
MediaType.text.`event-stream` -> BinaryCodecWithSchema.fromBinaryCodec(binaryCodec),
)

def heartbeat: ServerSentEvent = new ServerSentEvent("")
implicit def defaultContentCodec[T](implicit
schema: Schema[T],
): HttpContentCodec[ServerSentEvent[T]] = {
if (schema.isInstanceOf[Schema.Primitive[_]]) contentCodec(HttpContentCodec.text.only[T].defaultCodec, schema)
else contentCodec(JsonCodec.schemaBasedBinaryCodec, schema)
}

def heartbeat: ServerSentEvent[String] = new ServerSentEvent("")
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ object TextBinaryCodec {
)
}

def fromSchema[A](schema: Schema[A]): BinaryCodec[A] = {
implicit def fromSchema[A](implicit schema: Schema[A]): BinaryCodec[A] = {
schema match {
case enum0: Schema.Enum[_] => errorCodec(enum0)
case record: Schema.Record[_] => errorCodec(record)
Expand Down

0 comments on commit 05a0e0b

Please sign in to comment.