diff --git a/akka-http-backend/src/main/scala/sttp/client3/akkahttp/BodyFromAkka.scala b/akka-http-backend/src/main/scala/sttp/client3/akkahttp/BodyFromAkka.scala index 89abb43684..c0e8f0666f 100644 --- a/akka-http-backend/src/main/scala/sttp/client3/akkahttp/BodyFromAkka.scala +++ b/akka-http-backend/src/main/scala/sttp/client3/akkahttp/BodyFromAkka.scala @@ -106,7 +106,7 @@ private[akkahttp] class BodyFromAkka()(implicit ec: ExecutionContext, mat: Mater val (flow, wsFuture) = webSocketAndFlow(meta) wsFlow.success(flow) wsFuture.flatMap { ws => - val result = f.asInstanceOf[WebSocket[Future] => Future[T]](ws) + val result = f.asInstanceOf[(WebSocket[Future], ResponseMetadata) => Future[T]](ws, meta) result.onComplete(_ => ws.close()) result } diff --git a/async-http-client-backend/src/main/scala/sttp/client3/asynchttpclient/BodyFromAHC.scala b/async-http-client-backend/src/main/scala/sttp/client3/asynchttpclient/BodyFromAHC.scala index 1a4f6dfd54..30a383c78b 100644 --- a/async-http-client-backend/src/main/scala/sttp/client3/asynchttpclient/BodyFromAHC.scala +++ b/async-http-client-backend/src/main/scala/sttp/client3/asynchttpclient/BodyFromAHC.scala @@ -82,7 +82,7 @@ private[asynchttpclient] trait BodyFromAHC[F[_], S] { responseAs: WebSocketResponseAs[T, _], meta: ResponseMetadata, ws: WebSocket[F] - ): F[T] = bodyFromWs(responseAs, ws) + ): F[T] = bodyFromWs(responseAs, ws, meta) override protected def cleanupWhenNotAWebSocket( response: Publisher[ByteBuffer], @@ -100,9 +100,10 @@ private[asynchttpclient] trait BodyFromAHC[F[_], S] { isSubscribed: () => Boolean ): F[TT] = bodyFromResponseAs(isSubscribed)(responseAs, responseMetadata, response) - private def bodyFromWs[TT](r: WebSocketResponseAs[TT, _], ws: WebSocket[F]): F[TT] = + private def bodyFromWs[TT](r: WebSocketResponseAs[TT, _], ws: WebSocket[F], meta: ResponseMetadata): F[TT] = r match { - case ResponseAsWebSocket(f) => f.asInstanceOf[WebSocket[F] => F[TT]](ws).ensure(ws.close()) + case ResponseAsWebSocket(f) => + f.asInstanceOf[(WebSocket[F], ResponseMetadata) => F[TT]].apply(ws, meta).ensure(ws.close()) case ResponseAsWebSocketUnsafe() => ws.unit.asInstanceOf[F[TT]] case ResponseAsWebSocketStream(_, p) => compileWebSocketPipe(ws, p.asInstanceOf[streams.Pipe[WebSocketFrame.Data[_], WebSocketFrame]]) diff --git a/core/src/main/scala/sttp/client3/ResponseAs.scala b/core/src/main/scala/sttp/client3/ResponseAs.scala index 4e148369d3..c6efd565db 100644 --- a/core/src/main/scala/sttp/client3/ResponseAs.scala +++ b/core/src/main/scala/sttp/client3/ResponseAs.scala @@ -42,12 +42,12 @@ case object ResponseAsByteArray extends ResponseAs[Array[Byte], Any] { // Path-dependent types are not supported in constructor arguments or the extends clause. Thus we cannot express the // fact that `BinaryStream =:= s.BinaryStream`. We have to rely on correct construction via the companion object and // perform typecasts when the request is deconstructed. -case class ResponseAsStream[F[_], T, Stream, S] private (s: Streams[S], f: Stream => F[T]) +case class ResponseAsStream[F[_], T, Stream, S] private (s: Streams[S], f: (Stream, ResponseMetadata) => F[T]) extends ResponseAs[T, Effect[F] with S] { override def show: String = "as stream" } object ResponseAsStream { - def apply[F[_], T, S](s: Streams[S])(f: s.BinaryStream => F[T]): ResponseAs[T, Effect[F] with S] = + def apply[F[_], T, S](s: Streams[S])(f: (s.BinaryStream, ResponseMetadata) => F[T]): ResponseAs[T, Effect[F] with S] = new ResponseAsStream(s, f) } @@ -63,7 +63,7 @@ case class ResponseAsFile(output: SttpFile) extends ResponseAs[SttpFile, Any] { } sealed trait WebSocketResponseAs[T, -R] extends ResponseAs[T, R] -case class ResponseAsWebSocket[F[_], T](f: WebSocket[F] => F[T]) +case class ResponseAsWebSocket[F[_], T](f: (WebSocket[F], ResponseMetadata) => F[T]) extends WebSocketResponseAs[T, Effect[F] with WebSockets] { override def show: String = "as web socket" } diff --git a/core/src/main/scala/sttp/client3/SttpApi.scala b/core/src/main/scala/sttp/client3/SttpApi.scala index 248d211d4f..493c8556eb 100644 --- a/core/src/main/scala/sttp/client3/SttpApi.scala +++ b/core/src/main/scala/sttp/client3/SttpApi.scala @@ -107,8 +107,17 @@ trait SttpApi extends SttpExtensions with UriInterpolator { def asStream[F[_], T, S](s: Streams[S])(f: s.BinaryStream => F[T]): ResponseAs[Either[String, T], Effect[F] with S] = asEither(asStringAlways, asStreamAlways(s)(f)) + def asStreamWithMetadata[F[_], T, S](s: Streams[S])( + f: (s.BinaryStream, ResponseMetadata) => F[T] + ): ResponseAs[Either[String, T], Effect[F] with S] = + asEither(asStringAlways, asStreamAlwaysWithMetadata(s)(f)) + def asStreamAlways[F[_], T, S](s: Streams[S])(f: s.BinaryStream => F[T]): ResponseAs[T, Effect[F] with S] = - ResponseAsStream(s)(f) + asStreamAlwaysWithMetadata(s)((s, _) => f(s)) + + def asStreamAlwaysWithMetadata[F[_], T, S](s: Streams[S])( + f: (s.BinaryStream, ResponseMetadata) => F[T] + ): ResponseAs[T, Effect[F] with S] = ResponseAsStream(s)(f) def asStreamUnsafe[S](s: Streams[S]): ResponseAs[Either[String, s.BinaryStream], S] = asEither(asStringAlways, asStreamAlwaysUnsafe(s)) @@ -121,7 +130,17 @@ trait SttpApi extends SttpExtensions with UriInterpolator { def asWebSocket[F[_], T](f: WebSocket[F] => F[T]): ResponseAs[Either[String, T], Effect[F] with WebSockets] = asWebSocketEither(asStringAlways, asWebSocketAlways(f)) + def asWebSocketWithMetadata[F[_], T]( + f: (WebSocket[F], ResponseMetadata) => F[T] + ): ResponseAs[Either[String, T], Effect[F] with WebSockets] = + asWebSocketEither(asStringAlways, asWebSocketAlwaysWithMetadata(f)) + def asWebSocketAlways[F[_], T](f: WebSocket[F] => F[T]): ResponseAs[T, Effect[F] with WebSockets] = + asWebSocketAlwaysWithMetadata((w, _) => f(w)) + + def asWebSocketAlwaysWithMetadata[F[_], T]( + f: (WebSocket[F], ResponseMetadata) => F[T] + ): ResponseAs[T, Effect[F] with WebSockets] = ResponseAsWebSocket(f) def asWebSocketUnsafe[F[_]]: ResponseAs[Either[String, WebSocket[F]], Effect[F] with WebSockets] = diff --git a/core/src/main/scala/sttp/client3/internal/BodyFromResponseAs.scala b/core/src/main/scala/sttp/client3/internal/BodyFromResponseAs.scala index 2581d71cf3..969470accb 100644 --- a/core/src/main/scala/sttp/client3/internal/BodyFromResponseAs.scala +++ b/core/src/main/scala/sttp/client3/internal/BodyFromResponseAs.scala @@ -46,7 +46,7 @@ abstract class BodyFromResponseAs[F[_], RegularResponse, WSResponse, Stream](imp case (ResponseAsStream(_, f), Left(regular)) => regularAsStream(regular).flatMap { case (stream, cancel) => - f.asInstanceOf[Stream => F[T]](stream).map((_, nonReplayableBody)).ensure(cancel()) + f.asInstanceOf[(Stream, ResponseMetadata) => F[T]](stream, meta).map((_, nonReplayableBody)).ensure(cancel()) } case (ResponseAsStreamUnsafe(_), Left(regular)) => diff --git a/core/src/main/scala/sttp/client3/monad/MapEffect.scala b/core/src/main/scala/sttp/client3/monad/MapEffect.scala index 687539b0ae..70dec367f9 100644 --- a/core/src/main/scala/sttp/client3/monad/MapEffect.scala +++ b/core/src/main/scala/sttp/client3/monad/MapEffect.scala @@ -72,11 +72,13 @@ object MapEffect { case IgnoreResponse => IgnoreResponse case ResponseAsByteArray => ResponseAsByteArray case ResponseAsStream(s, f) => - ResponseAsStream(s)(f.asInstanceOf[Any => F[Any]].andThen(fk.apply(_))) + ResponseAsStream(s)((s, m) => fk(f.asInstanceOf[(Any, ResponseMetadata) => F[Any]](s, m))) case rasu: ResponseAsStreamUnsafe[_, _] => rasu case ResponseAsFile(output) => ResponseAsFile(output) case ResponseAsWebSocket(f) => - ResponseAsWebSocket((wg: WebSocket[G]) => fk(f.asInstanceOf[WebSocket[F] => F[Any]](apply[G, F](wg, gk, fm)))) + ResponseAsWebSocket((wg: WebSocket[G], m: ResponseMetadata) => + fk(f.asInstanceOf[(WebSocket[F], ResponseMetadata) => F[Any]](apply[G, F](wg, gk, fm), m)) + ) case ResponseAsWebSocketUnsafe() => ResponseAsWebSocketUnsafe() case ResponseAsWebSocketStream(s, p) => ResponseAsWebSocketStream(s, p) diff --git a/core/src/main/scala/sttp/client3/testing/SttpBackendStub.scala b/core/src/main/scala/sttp/client3/testing/SttpBackendStub.scala index 4b92a37f01..0ff211d8d1 100644 --- a/core/src/main/scala/sttp/client3/testing/SttpBackendStub.scala +++ b/core/src/main/scala/sttp/client3/testing/SttpBackendStub.scala @@ -211,7 +211,7 @@ object SttpBackendStub { } case ResponseAsStream(_, f) => b match { - case RawStream(s) => Some(f.asInstanceOf[Any => F[T]](s)) + case RawStream(s) => Some(f.asInstanceOf[(Any, ResponseMetadata) => F[T]](s, meta)) case _ => None } case ResponseAsStreamUnsafe(_) => @@ -226,9 +226,11 @@ object SttpBackendStub { } case ResponseAsWebSocket(f) => b match { - case wss: WebSocketStub[_] => Some(f.asInstanceOf[WebSocket[F] => F[T]](wss.build[F](monad))) - case ws: WebSocket[_] => Some(f.asInstanceOf[WebSocket[F] => F[T]](ws.asInstanceOf[WebSocket[F]])) - case _ => None + case wss: WebSocketStub[_] => + Some(f.asInstanceOf[(WebSocket[F], ResponseMetadata) => F[T]](wss.build[F](monad), meta)) + case ws: WebSocket[_] => + Some(f.asInstanceOf[(WebSocket[F], ResponseMetadata) => F[T]](ws.asInstanceOf[WebSocket[F]], meta)) + case _ => None } case ResponseAsWebSocketUnsafe() => b match { diff --git a/docs/responses/body.md b/docs/responses/body.md index afb1501f3a..ef31efcf21 100644 --- a/docs/responses/body.md +++ b/docs/responses/body.md @@ -191,9 +191,17 @@ import sttp.client3._ def asStream[F[_], T, S](s: Streams[S])(f: s.BinaryStream => F[T]): ResponseAs[Either[String, T], Effect[F] with S] = ??? +def asStreamWithMetadata[F[_], T, S](s: Streams[S])( + f: (s.BinaryStream, ResponseMetadata) => F[T] + ): ResponseAs[Either[String, T], Effect[F] with S] = ??? + def asStreamAlways[F[_], T, S](s: Streams[S])(f: s.BinaryStream => F[T]): ResponseAs[T, Effect[F] with S] = ??? +def asStreamAlwaysWithMetadata[F[_], T, S](s: Streams[S])( + f: (s.BinaryStream, ResponseMetadata) => F[T] + ): ResponseAs[T, Effect[F] with S] = ??? + def asStreamUnsafe[S](s: Streams[S]): ResponseAs[Either[String, s.BinaryStream], S] = ??? diff --git a/docs/websockets.md b/docs/websockets.md index 01e464e392..2d99807543 100644 --- a/docs/websockets.md +++ b/docs/websockets.md @@ -23,9 +23,17 @@ import sttp.ws.WebSocket def asWebSocket[F[_], T](f: WebSocket[F] => F[T]): ResponseAs[Either[String, T], Effect[F] with WebSockets] = ??? +def asWebSocketWithMetadata[F[_], T]( + f: (WebSocket[F], ResponseMetadata) => F[T] + ): ResponseAs[Either[String, T], Effect[F] with WebSockets] = ??? + def asWebSocketAlways[F[_], T](f: WebSocket[F] => F[T]): ResponseAs[T, Effect[F] with WebSockets] = ??? +def asWebSocketAlwaysWithMetadata[F[_], T]( + f: (WebSocket[F], ResponseMetadata) => F[T] + ): ResponseAs[T, Effect[F] with WebSockets] = ??? + def asWebSocketUnsafe[F[_]]: ResponseAs[Either[String, WebSocket[F]], Effect[F] with WebSockets] = ??? diff --git a/generated-docs/out/backends/wrappers/opentracing.md b/generated-docs/out/backends/wrappers/opentracing.md index e0162bc9f9..cd525d9d70 100644 --- a/generated-docs/out/backends/wrappers/opentracing.md +++ b/generated-docs/out/backends/wrappers/opentracing.md @@ -78,7 +78,7 @@ Add following dependency: ``` libraryDependencies += "io.opentracing.brave" % "brave-opentracing" % "0.37.5" // and for integrationw with okHttp: -libraryDependencies += "io.zipkin.reporter2" % "zipkin-sender-okhttp3" % "2.16.1" +libraryDependencies += "io.zipkin.reporter2" % "zipkin-sender-okhttp3" % "2.16.2" ``` Create an instance of tracer: diff --git a/generated-docs/out/responses/body.md b/generated-docs/out/responses/body.md index 9450ab09f2..e0ce2a3ab7 100644 --- a/generated-docs/out/responses/body.md +++ b/generated-docs/out/responses/body.md @@ -191,9 +191,17 @@ import sttp.client3._ def asStream[F[_], T, S](s: Streams[S])(f: s.BinaryStream => F[T]): ResponseAs[Either[String, T], Effect[F] with S] = ??? +def asStreamWithMetadata[F[_], T, S](s: Streams[S])( + f: (s.BinaryStream, ResponseMetadata) => F[T] + ): ResponseAs[Either[String, T], Effect[F] with S] = ??? + def asStreamAlways[F[_], T, S](s: Streams[S])(f: s.BinaryStream => F[T]): ResponseAs[T, Effect[F] with S] = ??? +def asStreamAlwaysWithMetadata[F[_], T, S](s: Streams[S])( + f: (s.BinaryStream, ResponseMetadata) => F[T] + ): ResponseAs[T, Effect[F] with S] = ??? + def asStreamUnsafe[S](s: Streams[S]): ResponseAs[Either[String, s.BinaryStream], S] = ??? diff --git a/httpclient-backend/fs2/src/main/scala/sttp/client3/httpclient/fs2/Fs2BodyFromHttpClient.scala b/httpclient-backend/fs2/src/main/scala/sttp/client3/httpclient/fs2/Fs2BodyFromHttpClient.scala index 9ce3313112..6510d70cbc 100644 --- a/httpclient-backend/fs2/src/main/scala/sttp/client3/httpclient/fs2/Fs2BodyFromHttpClient.scala +++ b/httpclient-backend/fs2/src/main/scala/sttp/client3/httpclient/fs2/Fs2BodyFromHttpClient.scala @@ -59,7 +59,7 @@ private[fs2] class Fs2BodyFromHttpClient[F[_]: ConcurrentEffect: ContextShift](b responseAs: WebSocketResponseAs[T, _], meta: ResponseMetadata, ws: WebSocket[F] - ): F[T] = bodyFromWs(responseAs, ws) + ): F[T] = bodyFromWs(responseAs, ws, meta) override protected def cleanupWhenNotAWebSocket( response: Stream[F, Byte], diff --git a/httpclient-backend/monix/src/main/scala/sttp/client3/httpclient/monix/MonixBodyFromHttpClient.scala b/httpclient-backend/monix/src/main/scala/sttp/client3/httpclient/monix/MonixBodyFromHttpClient.scala index 51f8d90f66..b79091c867 100644 --- a/httpclient-backend/monix/src/main/scala/sttp/client3/httpclient/monix/MonixBodyFromHttpClient.scala +++ b/httpclient-backend/monix/src/main/scala/sttp/client3/httpclient/monix/MonixBodyFromHttpClient.scala @@ -57,7 +57,7 @@ trait MonixBodyFromHttpClient extends BodyFromHttpClient[Task, MonixStreams, Mon responseAs: WebSocketResponseAs[T, _], meta: ResponseMetadata, ws: WebSocket[Task] - ): Task[T] = bodyFromWs(responseAs, ws) + ): Task[T] = bodyFromWs(responseAs, ws, meta) override protected def cleanupWhenNotAWebSocket( response: Observable[Array[Byte]], diff --git a/httpclient-backend/src/main/scala/sttp/client3/httpclient/BodyFromHttpClient.scala b/httpclient-backend/src/main/scala/sttp/client3/httpclient/BodyFromHttpClient.scala index b5eb4d628e..4345391520 100644 --- a/httpclient-backend/src/main/scala/sttp/client3/httpclient/BodyFromHttpClient.scala +++ b/httpclient-backend/src/main/scala/sttp/client3/httpclient/BodyFromHttpClient.scala @@ -27,9 +27,10 @@ private[httpclient] trait BodyFromHttpClient[F[_], S, B] { protected def bodyFromResponseAs: BodyFromResponseAs[F, B, WebSocket[F], streams.BinaryStream] - protected def bodyFromWs[T](r: WebSocketResponseAs[T, _], ws: WebSocket[F]): F[T] = + protected def bodyFromWs[T](r: WebSocketResponseAs[T, _], ws: WebSocket[F], meta: ResponseMetadata): F[T] = r match { - case ResponseAsWebSocket(f) => f.asInstanceOf[WebSocket[F] => F[T]](ws).ensure(ws.close()) + case ResponseAsWebSocket(f) => + f.asInstanceOf[(WebSocket[F], ResponseMetadata) => F[T]](ws, meta).ensure(ws.close()) case ResponseAsWebSocketUnsafe() => ws.unit.asInstanceOf[F[T]] case ResponseAsWebSocketStream(_, p) => compileWebSocketPipe(ws, p.asInstanceOf[streams.Pipe[WebSocketFrame.Data[_], WebSocketFrame]]) diff --git a/httpclient-backend/src/main/scala/sttp/client3/httpclient/HttpClientSyncBackend.scala b/httpclient-backend/src/main/scala/sttp/client3/httpclient/HttpClientSyncBackend.scala index 19abe338e2..fd9d99b451 100644 --- a/httpclient-backend/src/main/scala/sttp/client3/httpclient/HttpClientSyncBackend.scala +++ b/httpclient-backend/src/main/scala/sttp/client3/httpclient/HttpClientSyncBackend.scala @@ -16,12 +16,9 @@ import sttp.client3.{ Identity, Request, Response, - ResponseAs, - ResponseMetadata, SttpBackend, SttpBackendOptions, - SttpClientException, - WebSocketResponseAs + SttpClientException } import sttp.monad.MonadError import sttp.ws.{WebSocket, WebSocketFrame} diff --git a/httpclient-backend/src/main/scala/sttp/client3/httpclient/InputStreamBodyFromHttpClient.scala b/httpclient-backend/src/main/scala/sttp/client3/httpclient/InputStreamBodyFromHttpClient.scala index cdbff158ff..f58d9bac97 100644 --- a/httpclient-backend/src/main/scala/sttp/client3/httpclient/InputStreamBodyFromHttpClient.scala +++ b/httpclient-backend/src/main/scala/sttp/client3/httpclient/InputStreamBodyFromHttpClient.scala @@ -45,7 +45,7 @@ trait InputStreamBodyFromHttpClient[F[_], S] extends BodyFromHttpClient[F, S, In responseAs: WebSocketResponseAs[T, _], meta: ResponseMetadata, ws: WebSocket[F] - ): F[T] = bodyFromWs(responseAs, ws) + ): F[T] = bodyFromWs(responseAs, ws, meta) override protected def cleanupWhenNotAWebSocket(response: InputStream, e: NotAWebSocketException): F[Unit] = monad.eval(response.close()) diff --git a/httpclient-backend/zio/src/main/scala/sttp/client3/httpclient/zio/ZioBodyFromHttpClient.scala b/httpclient-backend/zio/src/main/scala/sttp/client3/httpclient/zio/ZioBodyFromHttpClient.scala index af97c1f499..84030c4af0 100644 --- a/httpclient-backend/zio/src/main/scala/sttp/client3/httpclient/zio/ZioBodyFromHttpClient.scala +++ b/httpclient-backend/zio/src/main/scala/sttp/client3/httpclient/zio/ZioBodyFromHttpClient.scala @@ -86,7 +86,7 @@ private[zio] class ZioBodyFromHttpClient extends BodyFromHttpClient[Task, ZioStr responseAs: WebSocketResponseAs[T, _], meta: ResponseMetadata, ws: WebSocket[Task] - ): Task[T] = bodyFromWs(responseAs, ws) + ): Task[T] = bodyFromWs(responseAs, ws, meta) override protected def cleanupWhenNotAWebSocket( response: ZStream[Any, Throwable, Byte], diff --git a/okhttp-backend/src/main/scala/sttp/client3/okhttp/BodyFromOkHttp.scala b/okhttp-backend/src/main/scala/sttp/client3/okhttp/BodyFromOkHttp.scala index 302898a35a..914e90987f 100644 --- a/okhttp-backend/src/main/scala/sttp/client3/okhttp/BodyFromOkHttp.scala +++ b/okhttp-backend/src/main/scala/sttp/client3/okhttp/BodyFromOkHttp.scala @@ -25,9 +25,10 @@ private[okhttp] trait BodyFromOkHttp[F[_], S] { def responseBodyToStream(inputStream: InputStream): streams.BinaryStream - private def fromWs[TT](r: WebSocketResponseAs[TT, _], ws: WebSocket[F]): F[TT] = + private def fromWs[TT](r: WebSocketResponseAs[TT, _], ws: WebSocket[F], meta: ResponseMetadata): F[TT] = r match { - case ResponseAsWebSocket(f) => f.asInstanceOf[WebSocket[F] => F[TT]](ws).ensure(ws.close()) + case ResponseAsWebSocket(f) => + f.asInstanceOf[(WebSocket[F], ResponseMetadata) => F[TT]](ws, meta).ensure(ws.close()) case ResponseAsWebSocketUnsafe() => ws.unit.asInstanceOf[F[TT]] case ResponseAsWebSocketStream(_, p) => compileWebSocketPipe(ws, p.asInstanceOf[streams.Pipe[WebSocketFrame.Data[_], WebSocketFrame]]) @@ -78,7 +79,7 @@ private[okhttp] trait BodyFromOkHttp[F[_], S] { responseAs: WebSocketResponseAs[T, _], meta: ResponseMetadata, ws: WebSocket[F] - ): F[T] = fromWs(responseAs, ws) + ): F[T] = fromWs(responseAs, ws, meta) override protected def cleanupWhenNotAWebSocket(response: InputStream, e: NotAWebSocketException): F[Unit] = monad.eval(response.close())