Skip to content

Commit

Permalink
Fixes, media type override
Browse files Browse the repository at this point in the history
  • Loading branch information
vigoo committed May 5, 2023
1 parent b5852ac commit 307d8f6
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ private[cli] object CliEndpoint {
r <- fromInput(right)
} yield l ++ r

case HttpCodec.Content(schema, _) => fromSchema(schema)
case HttpCodec.ContentStream(schema, _) => fromSchema(schema)
case HttpCodec.Content(schema, _, _) => fromSchema(schema)
case HttpCodec.ContentStream(schema, _, _) => fromSchema(schema)
case HttpCodec.Empty => Set.empty
case HttpCodec.Fallback(left, right) => fromInput(left) ++ fromInput(right)
case HttpCodec.Halt => Set.empty
Expand Down
14 changes: 11 additions & 3 deletions zio-http/src/main/scala/zio/http/codec/ContentCodecs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,18 @@ import zio.stream.ZStream

import zio.schema.Schema

import zio.http.MediaType

private[codec] trait ContentCodecs {
def content[A](implicit schema: Schema[A]): ContentCodec[A] =
HttpCodec.Content(schema)
HttpCodec.Content(schema, mediaType = None)

def content[A](mediaType: MediaType)(implicit schema: Schema[A]): ContentCodec[A] =
HttpCodec.Content(schema, mediaType = Some(mediaType))

def contentStream[A](implicit schema: Schema[A]): ContentCodec[ZStream[Any, Nothing, A]] =
HttpCodec.ContentStream(schema, mediaType = None)

def contentStream[A](implicit schema: Schema[A]): ContentCodec[ZStream[Any, Throwable, A]] =
HttpCodec.ContentStream(schema)
def contentStream[A](mediaType: MediaType)(implicit schema: Schema[A]): ContentCodec[ZStream[Any, Nothing, A]] =
HttpCodec.ContentStream(schema, mediaType = Some(mediaType))
}
13 changes: 7 additions & 6 deletions zio-http/src/main/scala/zio/http/codec/HttpCodec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ object HttpCodec
}

private[http] final case class Status[A](codec: SimpleCodec[zio.http.Status, A], index: Int = 0)
extends Atom[HttpCodecType.Status, A] {
extends Atom[HttpCodecType.Status, A] {
self =>
def erase: Status[Any] = self.asInstanceOf[Status[Any]]

Expand All @@ -503,27 +503,28 @@ object HttpCodec
def withIndex(index: Int): Status[A] = copy(index = index)
}
private[http] final case class Path[A](textCodec: TextCodec[A], name: Option[String], index: Int = 0)
extends Atom[HttpCodecType.Path, A] { self =>
extends Atom[HttpCodecType.Path, A] { self =>
def erase: Path[Any] = self.asInstanceOf[Path[Any]]

def tag: AtomTag = AtomTag.Path

def withIndex(index: Int): Path[A] = copy(index = index)
}
private[http] final case class Content[A](schema: Schema[A], index: Int = 0) extends Atom[HttpCodecType.Content, A] {
private[http] final case class Content[A](schema: Schema[A], mediaType: Option[MediaType], index: Int = 0)
extends Atom[HttpCodecType.Content, A] {
self =>
def tag: AtomTag = AtomTag.Content

def withIndex(index: Int): Content[A] = copy(index = index)
}
private[http] final case class ContentStream[A](schema: Schema[A], index: Int = 0)
extends Atom[HttpCodecType.Content, ZStream[Any, Throwable, A]] {
private[http] final case class ContentStream[A](schema: Schema[A], mediaType: Option[MediaType], index: Int = 0)
extends Atom[HttpCodecType.Content, ZStream[Any, Nothing, A]] {
def tag: AtomTag = AtomTag.Content

def withIndex(index: Int): ContentStream[A] = copy(index = index)
}
private[http] final case class Query[A](name: String, textCodec: TextCodec[A], index: Int = 0)
extends Atom[HttpCodecType.Query, A] {
extends Atom[HttpCodecType.Query, A] {
self =>
def erase: Query[Any] = self.asInstanceOf[Query[Any]]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,14 @@ private[http] final case class AtomizedCodecs(
status: Chunk[SimpleCodec[zio.http.Status, _]],
) { self =>
def append(atom: Atom[_, _]): AtomizedCodecs = atom match {
case path0: Path[_] => self.copy(path = path :+ path0.textCodec)
case method0: Method[_] => self.copy(method = method :+ method0.codec)
case query0: Query[_] => self.copy(query = query :+ query0)
case header0: Header[_] => self.copy(header = header :+ header0)
case content0: Content[_] => self.copy(content = content :+ BodyCodec.Single(content0.schema))
case status0: Status[_] => self.copy(status = status :+ status0.codec)
case stream0: ContentStream[_] => self.copy(content = content :+ BodyCodec.Multiple(stream0.schema))
case path0: Path[_] => self.copy(path = path :+ path0.textCodec)
case method0: Method[_] => self.copy(method = method :+ method0.codec)
case query0: Query[_] => self.copy(query = query :+ query0)
case header0: Header[_] => self.copy(header = header :+ header0)
case content0: Content[_] => self.copy(content = content :+ BodyCodec.Single(content0.schema, content0.mediaType))
case status0: Status[_] => self.copy(status = status :+ status0.codec)
case stream0: ContentStream[_] =>
self.copy(content = content :+ BodyCodec.Multiple(stream0.schema, stream0.mediaType))
}

def makeInputsBuilder(): Mechanic.InputsBuilder = {
Expand Down
17 changes: 14 additions & 3 deletions zio-http/src/main/scala/zio/http/codec/internal/BodyCodec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import zio.stream.ZStream
import zio.schema._
import zio.schema.codec.BinaryCodec

import zio.http.Body
import zio.http.{Body, MediaType}

/**
* A BodyCodec encapsulates the logic necessary to both encode and decode bodies
Expand Down Expand Up @@ -57,6 +57,14 @@ private[internal] sealed trait BodyCodec[A] { self =>
* The schema associated with the element type.
*/
def schema: Schema[Element]

/**
* Allows customizing the media type.
*
* The default is application/json for arbitrary types and
* application/octet-stream for byte streams
*/
def mediaType: Option[MediaType]
}
private[internal] object BodyCodec {
case object Empty extends BodyCodec[Unit] {
Expand All @@ -67,9 +75,11 @@ private[internal] object BodyCodec {
def encodeToBody(value: Unit, codec: BinaryCodec[Unit]): Body = Body.empty

def schema: Schema[Unit] = Schema[Unit]

def mediaType: Option[MediaType] = None
}

final case class Single[A](schema: Schema[A]) extends BodyCodec[A] {
final case class Single[A](schema: Schema[A], mediaType: Option[MediaType]) extends BodyCodec[A] {
def decodeFromBody(body: Body, codec: BinaryCodec[A]): IO[Throwable, A] = {
if (schema == Schema[Unit]) ZIO.unit.asInstanceOf[IO[Throwable, A]]
else
Expand All @@ -84,7 +94,8 @@ private[internal] object BodyCodec {
type Element = A
}

final case class Multiple[E](schema: Schema[E]) extends BodyCodec[ZStream[Any, Nothing, E]] {
final case class Multiple[E](schema: Schema[E], mediaType: Option[MediaType])
extends BodyCodec[ZStream[Any, Nothing, E]] {
def decodeFromBody(body: Body, codec: BinaryCodec[E]): IO[Throwable, ZStream[Any, Nothing, E]] =
ZIO.succeed((body.asStream >>> codec.streamDecoder).orDie)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ private[codec] object EncoderDecoder {

private def decodeBody(body: Body, inputs: Array[Any])(implicit trace: Trace): Task[Unit] = {
if (isByteStream(flattened.content)) {
ZIO.attempt(inputs(0) = body.asStream)
ZIO.attempt(inputs(0) = body.asStream.orDie)
} else if (jsonDecoders.length == 0) {
ZIO.unit
} else if (jsonDecoders.length == 1) {
Expand Down Expand Up @@ -334,22 +334,24 @@ private[codec] object EncoderDecoder {

private def encodeContentType(inputs: Array[Any]): Headers = {
if (isByteStream(flattened.content)) {
Headers(Header.ContentType(MediaType.application.`octet-stream`)) // TODO: customizable content type
val mediaType = flattened.content(0).mediaType.getOrElse(MediaType.application.`octet-stream`)
Headers(Header.ContentType(mediaType))
} else {
val _ = inputs // TODO: Support multiple content types
if (jsonEncoders.length == 0) Headers.empty
else if (jsonEncoders.length == 1) {
Headers(Header.ContentType(MediaType.application.json))
val mediaType = flattened.content(0).mediaType.getOrElse(MediaType.application.json)
Headers(Header.ContentType(mediaType))
} else throw new IllegalStateException("A request on a REST endpoint should have at most one body")
}
}

private def isByteStream(codecs: Chunk[BodyCodec[_]]): Boolean =
if (codecs.length == 1) {
codecs.headOption match {
case Some(BodyCodec.Multiple(schema)) =>
codecs(0) match {
case BodyCodec.Multiple(schema, _) =>
schema == Schema[Byte]
case _ => false
case _ => false
}
} else {
false
Expand Down
40 changes: 34 additions & 6 deletions zio-http/src/main/scala/zio/http/endpoint/Endpoint.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import zio.stream.ZStream

import zio.schema._

import zio.http.Status
import zio.http.codec._
import zio.http.endpoint.Endpoint.OutErrors
import zio.http.{MediaType, Status}

/**
* An [[zio.http.endpoint.Endpoint]] represents an API endpoint for the HTTP
Expand Down Expand Up @@ -142,7 +142,7 @@ final case class Endpoint[Input, Err, Output, Middleware <: EndpointMiddleware](
schema: Schema[Input2],
combiner: Combiner[Input, Input2],
): Endpoint[combiner.Out, Err, Output, Middleware] =
copy(input = input ++ HttpCodec.Content(schema))
copy(input = input ++ HttpCodec.content(schema))

/**
* Returns a new endpoint derived from this one, whose request must satisfy
Expand All @@ -158,7 +158,7 @@ final case class Endpoint[Input, Err, Output, Middleware <: EndpointMiddleware](
* of the specified typ
*/
def inStream[Input2: Schema](implicit
combiner: Combiner[Input, ZStream[Any, Throwable, Input2]],
combiner: Combiner[Input, ZStream[Any, Nothing, Input2]],
): Endpoint[combiner.Out, Err, Output, Middleware] =
Endpoint(
input = self.input ++ ContentCodec.contentStream[Input2],
Expand Down Expand Up @@ -196,7 +196,23 @@ final case class Endpoint[Input, Err, Output, Middleware <: EndpointMiddleware](
)(implicit alt: Alternator[Output, Output2]): Endpoint[Input, Err, alt.Out, Middleware] =
Endpoint(
input,
output = (self.output | HttpCodec.Content(implicitly[Schema[Output2]])) ++ StatusCodec.status(status),
output = (self.output | HttpCodec.content(implicitly[Schema[Output2]])) ++ StatusCodec.status(status),
error,
doc,
mw,
)

/**
* Returns a new endpoint derived from this one, whose output type is the
* specified type for the specified status code.
*/
def out[Output2: Schema](
status: Status,
mediaType: MediaType,
)(implicit alt: Alternator[Output, Output2]): Endpoint[Input, Err, alt.Out, Middleware] =
Endpoint(
input,
output = (self.output | HttpCodec.content(mediaType)(implicitly[Schema[Output2]])) ++ StatusCodec.status(status),
error,
doc,
mw,
Expand Down Expand Up @@ -230,7 +246,7 @@ final case class Endpoint[Input, Err, Output, Middleware <: EndpointMiddleware](
* of the specified type for the ok status code.
*/
def outStream[Output2: Schema](implicit
alt: Alternator[Output, ZStream[Any, Throwable, Output2]],
alt: Alternator[Output, ZStream[Any, Nothing, Output2]],
): Endpoint[Input, Err, alt.Out, Middleware] =
outStream[Output2](Status.Ok)

Expand All @@ -240,7 +256,7 @@ final case class Endpoint[Input, Err, Output, Middleware <: EndpointMiddleware](
*/
def outStream[Output2: Schema](
status: Status,
)(implicit alt: Alternator[Output, ZStream[Any, Throwable, Output2]]): Endpoint[Input, Err, alt.Out, Middleware] =
)(implicit alt: Alternator[Output, ZStream[Any, Nothing, Output2]]): Endpoint[Input, Err, alt.Out, Middleware] =
Endpoint(
input,
output = (self.output | ContentCodec.contentStream[Output2]) ++ StatusCodec.status(status),
Expand All @@ -249,6 +265,18 @@ final case class Endpoint[Input, Err, Output, Middleware <: EndpointMiddleware](
mw,
)

def outStream[Output2: Schema](
status: Status,
mediaType: MediaType,
)(implicit alt: Alternator[Output, ZStream[Any, Nothing, Output2]]): Endpoint[Input, Err, alt.Out, Middleware] =
Endpoint(
input,
output = (self.output | ContentCodec.contentStream[Output2](mediaType)) ++ StatusCodec.status(status),
error,
doc,
mw,
)

/**
* Returns a new endpoint with the specified path appended.
*/
Expand Down
28 changes: 27 additions & 1 deletion zio-http/src/test/scala/zio/http/endpoint/EndpointSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,32 @@ object EndpointSpec extends ZIOSpecDefault {
body == bytes,
)
},
test("responding with a byte stream, custom media type") {
for {
bytes <- Random.nextBytes(1024)
route =
Endpoint
.get(literal("test-byte-stream"))
.outStream[Byte](Status.Ok, MediaType.image.png)
.implement { _ =>
ZIO.succeed(ZStream.fromChunk(bytes).rechunk(16))
}
result <- route.toApp.runZIO(Request.get(URL.decode("/test-byte-stream").toOption.get)).exit
response <- result match {
case Exit.Success(value) => ZIO.succeed(value)
case Exit.Failure(cause) =>
cause.failureOrCause match {
case Left(Some(response)) => ZIO.succeed(response)
case Left(None) => ZIO.failCause(cause)
case Right(cause) => ZIO.failCause(cause)
}
}
body <- response.body.asChunk.orDie
} yield assertTrue(
response.header(ContentType) == Some(ContentType(MediaType.image.png)),
body == bytes,
)
},
test("request body as a byte stream") {
for {
bytes <- Random.nextBytes(1024)
Expand All @@ -465,7 +491,7 @@ object EndpointSpec extends ZIOSpecDefault {
.inStream[Byte]
.out[Long]
.implement { byteStream =>
byteStream.runCount.orDie
byteStream.runCount
}
result <- route.toApp
.runZIO(Request.post(Body.fromChunk(bytes), URL.decode("/test-byte-stream").toOption.get))
Expand Down

2 comments on commit 307d8f6

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚀 : Performance Benchmarks (SimpleEffectBenchmarkServer)

concurrency: 256
requests/sec: 562690

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚀 : Performance Benchmarks (PlainTextBenchmarkServer)

concurrency: 256
requests/sec: 877488

Please sign in to comment.