Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add methods which allow defining stream/websocket handling behavior taking into account the response metadata #784

Merged
merged 2 commits into from
Dec 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ private[akkahttp] class BodyFromAkka()(implicit ec: ExecutionContext, mat: Mater
val (flow, wsFuture) = webSocketAndFlow(meta)
wsFlow.success(flow)
wsFuture.flatMap { ws =>
val result = f.asInstanceOf[WebSocket[Future] => Future[T]](ws)
val result = f.asInstanceOf[(WebSocket[Future], ResponseMetadata) => Future[T]](ws, meta)
result.onComplete(_ => ws.close())
result
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ private[asynchttpclient] trait BodyFromAHC[F[_], S] {
responseAs: WebSocketResponseAs[T, _],
meta: ResponseMetadata,
ws: WebSocket[F]
): F[T] = bodyFromWs(responseAs, ws)
): F[T] = bodyFromWs(responseAs, ws, meta)

override protected def cleanupWhenNotAWebSocket(
response: Publisher[ByteBuffer],
Expand All @@ -100,9 +100,10 @@ private[asynchttpclient] trait BodyFromAHC[F[_], S] {
isSubscribed: () => Boolean
): F[TT] = bodyFromResponseAs(isSubscribed)(responseAs, responseMetadata, response)

private def bodyFromWs[TT](r: WebSocketResponseAs[TT, _], ws: WebSocket[F]): F[TT] =
private def bodyFromWs[TT](r: WebSocketResponseAs[TT, _], ws: WebSocket[F], meta: ResponseMetadata): F[TT] =
r match {
case ResponseAsWebSocket(f) => f.asInstanceOf[WebSocket[F] => F[TT]](ws).ensure(ws.close())
case ResponseAsWebSocket(f) =>
f.asInstanceOf[(WebSocket[F], ResponseMetadata) => F[TT]].apply(ws, meta).ensure(ws.close())
case ResponseAsWebSocketUnsafe() => ws.unit.asInstanceOf[F[TT]]
case ResponseAsWebSocketStream(_, p) =>
compileWebSocketPipe(ws, p.asInstanceOf[streams.Pipe[WebSocketFrame.Data[_], WebSocketFrame]])
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/sttp/client3/ResponseAs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ case object ResponseAsByteArray extends ResponseAs[Array[Byte], Any] {
// Path-dependent types are not supported in constructor arguments or the extends clause. Thus we cannot express the
// fact that `BinaryStream =:= s.BinaryStream`. We have to rely on correct construction via the companion object and
// perform typecasts when the request is deconstructed.
case class ResponseAsStream[F[_], T, Stream, S] private (s: Streams[S], f: Stream => F[T])
case class ResponseAsStream[F[_], T, Stream, S] private (s: Streams[S], f: (Stream, ResponseMetadata) => F[T])
extends ResponseAs[T, Effect[F] with S] {
override def show: String = "as stream"
}
object ResponseAsStream {
def apply[F[_], T, S](s: Streams[S])(f: s.BinaryStream => F[T]): ResponseAs[T, Effect[F] with S] =
def apply[F[_], T, S](s: Streams[S])(f: (s.BinaryStream, ResponseMetadata) => F[T]): ResponseAs[T, Effect[F] with S] =
new ResponseAsStream(s, f)
}

Expand All @@ -63,7 +63,7 @@ case class ResponseAsFile(output: SttpFile) extends ResponseAs[SttpFile, Any] {
}

sealed trait WebSocketResponseAs[T, -R] extends ResponseAs[T, R]
case class ResponseAsWebSocket[F[_], T](f: WebSocket[F] => F[T])
case class ResponseAsWebSocket[F[_], T](f: (WebSocket[F], ResponseMetadata) => F[T])
extends WebSocketResponseAs[T, Effect[F] with WebSockets] {
override def show: String = "as web socket"
}
Expand Down
21 changes: 20 additions & 1 deletion core/src/main/scala/sttp/client3/SttpApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,17 @@ trait SttpApi extends SttpExtensions with UriInterpolator {
def asStream[F[_], T, S](s: Streams[S])(f: s.BinaryStream => F[T]): ResponseAs[Either[String, T], Effect[F] with S] =
asEither(asStringAlways, asStreamAlways(s)(f))

def asStreamWithMetadata[F[_], T, S](s: Streams[S])(
f: (s.BinaryStream, ResponseMetadata) => F[T]
): ResponseAs[Either[String, T], Effect[F] with S] =
asEither(asStringAlways, asStreamAlwaysWithMetadata(s)(f))

def asStreamAlways[F[_], T, S](s: Streams[S])(f: s.BinaryStream => F[T]): ResponseAs[T, Effect[F] with S] =
ResponseAsStream(s)(f)
asStreamAlwaysWithMetadata(s)((s, _) => f(s))

def asStreamAlwaysWithMetadata[F[_], T, S](s: Streams[S])(
f: (s.BinaryStream, ResponseMetadata) => F[T]
): ResponseAs[T, Effect[F] with S] = ResponseAsStream(s)(f)

def asStreamUnsafe[S](s: Streams[S]): ResponseAs[Either[String, s.BinaryStream], S] =
asEither(asStringAlways, asStreamAlwaysUnsafe(s))
Expand All @@ -121,7 +130,17 @@ trait SttpApi extends SttpExtensions with UriInterpolator {
def asWebSocket[F[_], T](f: WebSocket[F] => F[T]): ResponseAs[Either[String, T], Effect[F] with WebSockets] =
asWebSocketEither(asStringAlways, asWebSocketAlways(f))

def asWebSocketWithMetadata[F[_], T](
f: (WebSocket[F], ResponseMetadata) => F[T]
): ResponseAs[Either[String, T], Effect[F] with WebSockets] =
asWebSocketEither(asStringAlways, asWebSocketAlwaysWithMetadata(f))

def asWebSocketAlways[F[_], T](f: WebSocket[F] => F[T]): ResponseAs[T, Effect[F] with WebSockets] =
asWebSocketAlwaysWithMetadata((w, _) => f(w))

def asWebSocketAlwaysWithMetadata[F[_], T](
f: (WebSocket[F], ResponseMetadata) => F[T]
): ResponseAs[T, Effect[F] with WebSockets] =
ResponseAsWebSocket(f)

def asWebSocketUnsafe[F[_]]: ResponseAs[Either[String, WebSocket[F]], Effect[F] with WebSockets] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ abstract class BodyFromResponseAs[F[_], RegularResponse, WSResponse, Stream](imp

case (ResponseAsStream(_, f), Left(regular)) =>
regularAsStream(regular).flatMap { case (stream, cancel) =>
f.asInstanceOf[Stream => F[T]](stream).map((_, nonReplayableBody)).ensure(cancel())
f.asInstanceOf[(Stream, ResponseMetadata) => F[T]](stream, meta).map((_, nonReplayableBody)).ensure(cancel())
}

case (ResponseAsStreamUnsafe(_), Left(regular)) =>
Expand Down
6 changes: 4 additions & 2 deletions core/src/main/scala/sttp/client3/monad/MapEffect.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,13 @@ object MapEffect {
case IgnoreResponse => IgnoreResponse
case ResponseAsByteArray => ResponseAsByteArray
case ResponseAsStream(s, f) =>
ResponseAsStream(s)(f.asInstanceOf[Any => F[Any]].andThen(fk.apply(_)))
ResponseAsStream(s)((s, m) => fk(f.asInstanceOf[(Any, ResponseMetadata) => F[Any]](s, m)))
case rasu: ResponseAsStreamUnsafe[_, _] => rasu
case ResponseAsFile(output) => ResponseAsFile(output)
case ResponseAsWebSocket(f) =>
ResponseAsWebSocket((wg: WebSocket[G]) => fk(f.asInstanceOf[WebSocket[F] => F[Any]](apply[G, F](wg, gk, fm))))
ResponseAsWebSocket((wg: WebSocket[G], m: ResponseMetadata) =>
fk(f.asInstanceOf[(WebSocket[F], ResponseMetadata) => F[Any]](apply[G, F](wg, gk, fm), m))
)
case ResponseAsWebSocketUnsafe() => ResponseAsWebSocketUnsafe()
case ResponseAsWebSocketStream(s, p) =>
ResponseAsWebSocketStream(s, p)
Expand Down
10 changes: 6 additions & 4 deletions core/src/main/scala/sttp/client3/testing/SttpBackendStub.scala
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ object SttpBackendStub {
}
case ResponseAsStream(_, f) =>
b match {
case RawStream(s) => Some(f.asInstanceOf[Any => F[T]](s))
case RawStream(s) => Some(f.asInstanceOf[(Any, ResponseMetadata) => F[T]](s, meta))
case _ => None
}
case ResponseAsStreamUnsafe(_) =>
Expand All @@ -226,9 +226,11 @@ object SttpBackendStub {
}
case ResponseAsWebSocket(f) =>
b match {
case wss: WebSocketStub[_] => Some(f.asInstanceOf[WebSocket[F] => F[T]](wss.build[F](monad)))
case ws: WebSocket[_] => Some(f.asInstanceOf[WebSocket[F] => F[T]](ws.asInstanceOf[WebSocket[F]]))
case _ => None
case wss: WebSocketStub[_] =>
Some(f.asInstanceOf[(WebSocket[F], ResponseMetadata) => F[T]](wss.build[F](monad), meta))
case ws: WebSocket[_] =>
Some(f.asInstanceOf[(WebSocket[F], ResponseMetadata) => F[T]](ws.asInstanceOf[WebSocket[F]], meta))
case _ => None
}
case ResponseAsWebSocketUnsafe() =>
b match {
Expand Down
8 changes: 8 additions & 0 deletions docs/responses/body.md
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,17 @@ import sttp.client3._
def asStream[F[_], T, S](s: Streams[S])(f: s.BinaryStream => F[T]):
ResponseAs[Either[String, T], Effect[F] with S] = ???

def asStreamWithMetadata[F[_], T, S](s: Streams[S])(
f: (s.BinaryStream, ResponseMetadata) => F[T]
): ResponseAs[Either[String, T], Effect[F] with S] = ???

def asStreamAlways[F[_], T, S](s: Streams[S])(f: s.BinaryStream => F[T]):
ResponseAs[T, Effect[F] with S] = ???

def asStreamAlwaysWithMetadata[F[_], T, S](s: Streams[S])(
f: (s.BinaryStream, ResponseMetadata) => F[T]
): ResponseAs[T, Effect[F] with S] = ???

def asStreamUnsafe[S](s: Streams[S]):
ResponseAs[Either[String, s.BinaryStream], S] = ???

Expand Down
8 changes: 8 additions & 0 deletions docs/websockets.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,17 @@ import sttp.ws.WebSocket
def asWebSocket[F[_], T](f: WebSocket[F] => F[T]):
ResponseAs[Either[String, T], Effect[F] with WebSockets] = ???

def asWebSocketWithMetadata[F[_], T](
f: (WebSocket[F], ResponseMetadata) => F[T]
): ResponseAs[Either[String, T], Effect[F] with WebSockets] = ???

def asWebSocketAlways[F[_], T](f: WebSocket[F] => F[T]):
ResponseAs[T, Effect[F] with WebSockets] = ???

def asWebSocketAlwaysWithMetadata[F[_], T](
f: (WebSocket[F], ResponseMetadata) => F[T]
): ResponseAs[T, Effect[F] with WebSockets] = ???

def asWebSocketUnsafe[F[_]]:
ResponseAs[Either[String, WebSocket[F]], Effect[F] with WebSockets] = ???

Expand Down
2 changes: 1 addition & 1 deletion generated-docs/out/backends/wrappers/opentracing.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ Add following dependency:
```
libraryDependencies += "io.opentracing.brave" % "brave-opentracing" % "0.37.5"
// and for integrationw with okHttp:
libraryDependencies += "io.zipkin.reporter2" % "zipkin-sender-okhttp3" % "2.16.1"
libraryDependencies += "io.zipkin.reporter2" % "zipkin-sender-okhttp3" % "2.16.2"
```

Create an instance of tracer:
Expand Down
8 changes: 8 additions & 0 deletions generated-docs/out/responses/body.md
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,17 @@ import sttp.client3._
def asStream[F[_], T, S](s: Streams[S])(f: s.BinaryStream => F[T]):
ResponseAs[Either[String, T], Effect[F] with S] = ???

def asStreamWithMetadata[F[_], T, S](s: Streams[S])(
f: (s.BinaryStream, ResponseMetadata) => F[T]
): ResponseAs[Either[String, T], Effect[F] with S] = ???

def asStreamAlways[F[_], T, S](s: Streams[S])(f: s.BinaryStream => F[T]):
ResponseAs[T, Effect[F] with S] = ???

def asStreamAlwaysWithMetadata[F[_], T, S](s: Streams[S])(
f: (s.BinaryStream, ResponseMetadata) => F[T]
): ResponseAs[T, Effect[F] with S] = ???

def asStreamUnsafe[S](s: Streams[S]):
ResponseAs[Either[String, s.BinaryStream], S] = ???

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ private[fs2] class Fs2BodyFromHttpClient[F[_]: ConcurrentEffect: ContextShift](b
responseAs: WebSocketResponseAs[T, _],
meta: ResponseMetadata,
ws: WebSocket[F]
): F[T] = bodyFromWs(responseAs, ws)
): F[T] = bodyFromWs(responseAs, ws, meta)

override protected def cleanupWhenNotAWebSocket(
response: Stream[F, Byte],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ trait MonixBodyFromHttpClient extends BodyFromHttpClient[Task, MonixStreams, Mon
responseAs: WebSocketResponseAs[T, _],
meta: ResponseMetadata,
ws: WebSocket[Task]
): Task[T] = bodyFromWs(responseAs, ws)
): Task[T] = bodyFromWs(responseAs, ws, meta)

override protected def cleanupWhenNotAWebSocket(
response: Observable[Array[Byte]],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ private[httpclient] trait BodyFromHttpClient[F[_], S, B] {

protected def bodyFromResponseAs: BodyFromResponseAs[F, B, WebSocket[F], streams.BinaryStream]

protected def bodyFromWs[T](r: WebSocketResponseAs[T, _], ws: WebSocket[F]): F[T] =
protected def bodyFromWs[T](r: WebSocketResponseAs[T, _], ws: WebSocket[F], meta: ResponseMetadata): F[T] =
r match {
case ResponseAsWebSocket(f) => f.asInstanceOf[WebSocket[F] => F[T]](ws).ensure(ws.close())
case ResponseAsWebSocket(f) =>
f.asInstanceOf[(WebSocket[F], ResponseMetadata) => F[T]](ws, meta).ensure(ws.close())
case ResponseAsWebSocketUnsafe() => ws.unit.asInstanceOf[F[T]]
case ResponseAsWebSocketStream(_, p) =>
compileWebSocketPipe(ws, p.asInstanceOf[streams.Pipe[WebSocketFrame.Data[_], WebSocketFrame]])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,9 @@ import sttp.client3.{
Identity,
Request,
Response,
ResponseAs,
ResponseMetadata,
SttpBackend,
SttpBackendOptions,
SttpClientException,
WebSocketResponseAs
SttpClientException
}
import sttp.monad.MonadError
import sttp.ws.{WebSocket, WebSocketFrame}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ trait InputStreamBodyFromHttpClient[F[_], S] extends BodyFromHttpClient[F, S, In
responseAs: WebSocketResponseAs[T, _],
meta: ResponseMetadata,
ws: WebSocket[F]
): F[T] = bodyFromWs(responseAs, ws)
): F[T] = bodyFromWs(responseAs, ws, meta)

override protected def cleanupWhenNotAWebSocket(response: InputStream, e: NotAWebSocketException): F[Unit] =
monad.eval(response.close())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ private[zio] class ZioBodyFromHttpClient extends BodyFromHttpClient[Task, ZioStr
responseAs: WebSocketResponseAs[T, _],
meta: ResponseMetadata,
ws: WebSocket[Task]
): Task[T] = bodyFromWs(responseAs, ws)
): Task[T] = bodyFromWs(responseAs, ws, meta)

override protected def cleanupWhenNotAWebSocket(
response: ZStream[Any, Throwable, Byte],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ private[okhttp] trait BodyFromOkHttp[F[_], S] {

def responseBodyToStream(inputStream: InputStream): streams.BinaryStream

private def fromWs[TT](r: WebSocketResponseAs[TT, _], ws: WebSocket[F]): F[TT] =
private def fromWs[TT](r: WebSocketResponseAs[TT, _], ws: WebSocket[F], meta: ResponseMetadata): F[TT] =
r match {
case ResponseAsWebSocket(f) => f.asInstanceOf[WebSocket[F] => F[TT]](ws).ensure(ws.close())
case ResponseAsWebSocket(f) =>
f.asInstanceOf[(WebSocket[F], ResponseMetadata) => F[TT]](ws, meta).ensure(ws.close())
case ResponseAsWebSocketUnsafe() => ws.unit.asInstanceOf[F[TT]]
case ResponseAsWebSocketStream(_, p) =>
compileWebSocketPipe(ws, p.asInstanceOf[streams.Pipe[WebSocketFrame.Data[_], WebSocketFrame]])
Expand Down Expand Up @@ -78,7 +79,7 @@ private[okhttp] trait BodyFromOkHttp[F[_], S] {
responseAs: WebSocketResponseAs[T, _],
meta: ResponseMetadata,
ws: WebSocket[F]
): F[T] = fromWs(responseAs, ws)
): F[T] = fromWs(responseAs, ws, meta)

override protected def cleanupWhenNotAWebSocket(response: InputStream, e: NotAWebSocketException): F[Unit] =
monad.eval(response.close())
Expand Down