Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade all ZIO backends to comply with the latest version of ZIO #1431

Merged
merged 2 commits into from
May 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package sttp.client3.armeria.zio

import _root_.zio.interop.reactivestreams.{
publisherToStream => publisherToZioStream,
streamToPublisher => zioStreamToPublisher
}
import com.linecorp.armeria.client.WebClient
import com.linecorp.armeria.common.HttpData
import com.linecorp.armeria.common.stream.StreamMessage
Expand All @@ -10,13 +14,8 @@ import sttp.client3.armeria.{AbstractArmeriaBackend, BodyFromStreamMessage}
import sttp.client3.impl.zio.RIOMonadAsyncError
import sttp.client3.{FollowRedirectsBackend, SttpBackend, SttpBackendOptions}
import sttp.monad.MonadAsyncError
import zio.{Chunk, Task}
import zio.stream.Stream
import _root_.zio._
import _root_.zio.interop.reactivestreams.{
publisherToStream => publisherToZioStream,
streamToPublisher => zioStreamToPublisher
}
import _root_.zio.{Chunk, Task, _}

private final class ArmeriaZioBackend(runtime: Runtime[Any], client: WebClient, closeFactory: Boolean)
extends AbstractArmeriaBackend[Task, ZioStreams](client, closeFactory, new RIOMonadAsyncError[Any]) {
Expand All @@ -31,7 +30,7 @@ private final class ArmeriaZioBackend(runtime: Runtime[Any], client: WebClient,
override implicit def monad: MonadAsyncError[Task] = new RIOMonadAsyncError[Any]

override def publisherToStream(streamMessage: StreamMessage[HttpData]): Stream[Throwable, Byte] =
streamMessage.toStream().mapConcatChunk(httpData => Chunk.fromArray(httpData.array()))
streamMessage.toZIOStream().mapConcatChunk(httpData => Chunk.fromArray(httpData.array()))
}

override protected def streamToPublisher(stream: Stream[Throwable, Byte]): Publisher[HttpData] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import sttp.client3.internal._
import sttp.client3.testing.ConvertToFuture
import sttp.client3.testing.streaming.StreamingTest
import sttp.model.sse.ServerSentEvent
import zio.stream.Stream
import zio.stream.{Stream, ZStream}
import zio.{Chunk, Task}

class ArmeriaZioStreamingTest extends StreamingTest[Task, ZioStreams] with ZioTestBase {
Expand All @@ -18,7 +18,7 @@ class ArmeriaZioStreamingTest extends StreamingTest[Task, ZioStreams] with ZioTe
override implicit val convertToFuture: ConvertToFuture[Task] = convertZioTaskToFuture

override def bodyProducer(arrays: Iterable[Array[Byte]]): Stream[Throwable, Byte] =
Stream.fromChunks(arrays.map(Chunk.fromArray).toSeq: _*)
ZStream.fromChunks(arrays.map(Chunk.fromArray).toSeq: _*)

override def bodyConsumer(stream: Stream[Throwable, Byte]): Task[String] =
stream.runCollect.map(bytes => new String(bytes.toArray, Utf8))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,23 +47,23 @@ class AsyncHttpClientZioBackend private (
override implicit val monad: MonadAsyncError[Task] = new RIOMonadAsyncError[Any]

override def publisherToStream(p: Publisher[ByteBuffer]): Stream[Throwable, Byte] =
p.toStream(bufferSize).mapConcatChunk(Chunk.fromByteBuffer(_))
p.toZIOStream(bufferSize).mapConcatChunk(Chunk.fromByteBuffer(_))

override def publisherToBytes(p: Publisher[ByteBuffer]): Task[Array[Byte]] =
p.toStream(bufferSize)
p.toZIOStream(bufferSize)
.runFold(immutable.Queue.empty[Array[Byte]])(enqueueBytes)
.map(concatBytes)

override def publisherToFile(p: Publisher[ByteBuffer], f: File): Task[Unit] = {
p.toStream(bufferSize)
p.toZIOStream(bufferSize)
.map(Chunk.fromByteBuffer)
.flattenChunks
.run(ZSink.fromOutputStream(new FileOutputStream(f)))
.unit
}

override def bytesToPublisher(b: Array[Byte]): Task[Publisher[ByteBuffer]] =
Stream.apply(ByteBuffer.wrap(b)).toPublisher
ZStream.apply(ByteBuffer.wrap(b)).toPublisher

override def fileToPublisher(f: File): Task[Publisher[ByteBuffer]] =
ZStream
Expand Down Expand Up @@ -114,7 +114,7 @@ object AsyncHttpClientZioBackend {
ZIO
.runtime[Any]
.flatMap(runtime =>
Task.attempt(
ZIO.attempt(
AsyncHttpClientZioBackend(
runtime,
AsyncHttpClientBackend.defaultClient(options),
Expand Down Expand Up @@ -147,7 +147,7 @@ object AsyncHttpClientZioBackend {
ZIO
.runtime[Any]
.flatMap(runtime =>
Task.attempt(
ZIO.attempt(
AsyncHttpClientZioBackend(
runtime,
new DefaultAsyncHttpClient(cfg),
Expand Down Expand Up @@ -184,7 +184,7 @@ object AsyncHttpClientZioBackend {
ZIO
.runtime[Any]
.flatMap(runtime =>
Task.attempt(
ZIO.attempt(
AsyncHttpClientZioBackend(
runtime,
AsyncHttpClientBackend.clientWithModifiedOptions(options, updateConfig),
Expand Down Expand Up @@ -234,7 +234,7 @@ object AsyncHttpClientZioBackend {
): Layer[Nothing, SttpClient] =
ZLayer.scoped(
ZIO.acquireRelease(
UIO.runtime.map((runtime: Runtime[Any]) =>
ZIO.runtime.map((runtime: Runtime[Any]) =>
usingClient(runtime, client, customizeRequest, webSocketBufferCapacity)
)
)(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import sttp.client3.internal._
import sttp.model.sse.ServerSentEvent
import sttp.client3.testing.ConvertToFuture
import sttp.client3.testing.streaming.StreamingTest
import zio.stream.Stream
import zio.stream.{Stream, ZStream}
import zio.{Chunk, Task}

class AsyncHttpClientZioStreamingTest extends StreamingTest[Task, ZioStreams] with ZioTestBase {
Expand All @@ -18,7 +18,7 @@ class AsyncHttpClientZioStreamingTest extends StreamingTest[Task, ZioStreams] wi
override implicit val convertToFuture: ConvertToFuture[Task] = convertZioTaskToFuture

override def bodyProducer(arrays: Iterable[Array[Byte]]): Stream[Throwable, Byte] =
Stream.fromChunks(arrays.map(Chunk.fromArray).toSeq: _*)
ZStream.fromChunks(arrays.map(Chunk.fromArray).toSeq: _*)

override def bodyConsumer(stream: Stream[Throwable, Byte]): Task[String] =
stream.runCollect.map(bytes => new String(bytes.toArray, Utf8))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,5 @@ class AsyncHttpClientZioWebSocketTest extends AsyncHttpClientWebSocketTest[Task,
): ZioStreams.Pipe[WebSocketFrame.Data[_], WebSocketFrame] =
to.andThen(rest => ZStream(item) ++ rest)

override def concurrently[T](fs: List[() => Task[T]]): Task[List[T]] = Task.collectAllPar(fs.map(_()))
override def concurrently[T](fs: List[() => Task[T]]): Task[List[T]] = ZIO.collectAllPar(fs.map(_()))
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@ package sttp.client3.asynchttpclient.zio
import org.scalatest.flatspec.AsyncFlatSpec
import org.scalatest.matchers.should.Matchers
import sttp.capabilities
import sttp.client3.impl.zio.{RIOMonadAsyncError, ZioTestBase}
import sttp.client3._
import sttp.client3.impl.zio.{RIOMonadAsyncError, ZioTestBase}
import sttp.model.{Header, StatusCode}
import sttp.monad.MonadError
import zio.Task
import zio.{Task, ZIO}

class ZioFollowRedirectsBackendTest extends AsyncFlatSpec with Matchers with ZioTestBase {
it should "properly handle invalid redirect URIs" in {
val stubBackend: SttpBackend[Task, Any] = new SttpBackend[Task, Any] {
override def send[T, R >: capabilities.Effect[Task]](request: Request[T, R]): Task[Response[T]] = {
Task.succeed(
ZIO.succeed(
if (request.uri.toString.contains("redirect"))
Response.ok("ok".asInstanceOf[T])
else
Expand All @@ -26,7 +26,7 @@ class ZioFollowRedirectsBackendTest extends AsyncFlatSpec with Matchers with Zio
)
}

override def close(): Task[Unit] = Task.succeed(())
override def close(): Task[Unit] = ZIO.unit
override def responseMonad: MonadError[Task] = new RIOMonadAsyncError[Any]
}

Expand Down
6 changes: 3 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,9 @@ val scalaTest = libraryDependencies ++= Seq("freespec", "funsuite", "flatspec",
)

val zio1Version = "1.0.14"
val zio2Version = "2.0.0-RC5"
val zio2Version = "2.0.0-RC6"
val zio1InteropRsVersion = "1.3.10"
val zio2InteropRsVersion = "2.0.0-RC6"
val zio2InteropRsVersion = "2.0.0-RC7"

val sttpModelVersion = "1.4.26"
val sttpSharedVersion = "1.3.5"
Expand Down Expand Up @@ -461,7 +461,7 @@ lazy val zio = (projectMatrix in file("effects/zio"))
"dev.zio" %% "zio" % zio2Version,
"com.softwaremill.sttp.shared" %% "zio" % sttpSharedVersion,
"dev.zio" %% "zio-interop-reactivestreams" % zio2InteropRsVersion,
"dev.zio" %% "zio-nio" % "2.0.0-RC6"
"dev.zio" %% "zio-nio" % "2.0.0-RC7"
)
)
.settings(testServerSettings)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,34 +1,34 @@
package sttp.client3.impl.zio

import sttp.monad.{Canceler, MonadAsyncError}
import zio.RIO
import zio.{RIO, ZIO}

class RIOMonadAsyncError[R] extends MonadAsyncError[RIO[R, *]] {
override def unit[T](t: T): RIO[R, T] = RIO.succeed(t)
override def unit[T](t: T): RIO[R, T] = ZIO.succeed(t)

override def map[T, T2](fa: RIO[R, T])(f: T => T2): RIO[R, T2] = fa.map(f)

override def flatMap[T, T2](fa: RIO[R, T])(f: T => RIO[R, T2]): RIO[R, T2] =
fa.flatMap(f)

override def async[T](register: (Either[Throwable, T] => Unit) => Canceler): RIO[R, T] =
RIO.asyncInterrupt { cb =>
ZIO.asyncInterrupt { cb =>
val canceler = register {
case Left(t) => cb(RIO.fail(t))
case Right(t) => cb(RIO.succeed(t))
case Left(t) => cb(ZIO.fail(t))
case Right(t) => cb(ZIO.succeed(t))
}

Left(RIO.succeed(canceler.cancel()))
Left(ZIO.succeed(canceler.cancel()))
}

override def error[T](t: Throwable): RIO[R, T] = RIO.fail(t)
override def error[T](t: Throwable): RIO[R, T] = ZIO.fail(t)

override protected def handleWrappedError[T](rt: RIO[R, T])(h: PartialFunction[Throwable, RIO[R, T]]): RIO[R, T] =
rt.catchSome(h)

override def eval[T](t: => T): RIO[R, T] = RIO.attempt(t)
override def eval[T](t: => T): RIO[R, T] = ZIO.attempt(t)

override def suspend[T](t: => RIO[R, T]): RIO[R, T] = RIO.suspend(t)
override def suspend[T](t: => RIO[R, T]): RIO[R, T] = ZIO.suspend(t)

override def flatten[T](ffa: RIO[R, RIO[R, T]]): RIO[R, T] = ffa.flatten

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import sttp.client3.testing.SttpBackendStub
import sttp.client3.{Request, Response, SttpBackend}
import sttp.model.StatusCode
import sttp.monad.MonadError
import zio.{RIO, Ref, Tag, UIO, URIO, ZEnvironment, ZLayer}
import zio.{RIO, Ref, Tag, UIO, URIO, ZEnvironment, ZIO, ZLayer}

trait SttpClientStubbingBase[R, P] {

Expand Down Expand Up @@ -64,7 +64,7 @@ trait SttpClientStubbingBase[R, P] {
private def whenRequest(
f: SttpBackendStub[RIO[R, *], P]#WhenRequest => SttpBackendStub[RIO[R, *], P]
): URIO[SttpClientStubbing, Unit] =
URIO.serviceWithZIO((_: SttpClientStubbing).update(stub => f(stub.whenRequestMatches(p))))
ZIO.serviceWithZIO((_: SttpClientStubbing).update(stub => f(stub.whenRequestMatches(p))))
}

val layer: ZLayer[Any, Nothing, SttpClientStubbing with SttpBackend[RIO[R, *], P]] = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package sttp.client3.impl.zio

import sttp.ws.{WebSocket, WebSocketClosed, WebSocketFrame}
import zio.stream.{Stream, ZStream}
import zio.stream.ZStream
import zio.{Ref, ZIO}

object ZioWebSockets {
Expand All @@ -10,16 +10,16 @@ object ZioWebSockets {
pipe: ZStream[R, Throwable, WebSocketFrame.Data[_]] => ZStream[R, Throwable, WebSocketFrame]
): ZIO[R, Throwable, Unit] =
Ref.make(true).flatMap { open =>
val onClose = Stream.fromZIO(open.set(false).map(_ => None: Option[WebSocketFrame.Data[_]]))
val onClose = ZStream.fromZIO(open.set(false).map(_ => None: Option[WebSocketFrame.Data[_]]))
pipe(
Stream
ZStream
.repeatZIO(ws.receive())
.flatMap {
case WebSocketFrame.Close(_, _) => onClose
case WebSocketFrame.Ping(payload) =>
Stream.fromZIO(ws.send(WebSocketFrame.Pong(payload))).flatMap(_ => Stream.empty)
case WebSocketFrame.Pong(_) => Stream.empty
case in: WebSocketFrame.Data[_] => Stream(Some(in))
ZStream.fromZIO(ws.send(WebSocketFrame.Pong(payload))).flatMap(_ => ZStream.empty)
case WebSocketFrame.Pong(_) => ZStream.empty
case in: WebSocketFrame.Data[_] => ZStream(Some(in))
}
.catchSome { case _: WebSocketClosed => onClose }
.collectWhileSome
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.net.http.{HttpClient, HttpRequest}
import java.nio.ByteBuffer
import java.util
import java.util.concurrent.Flow.Publisher
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

class HttpClientZioBackend private (
client: HttpClient,
Expand All @@ -47,7 +47,7 @@ class HttpClientZioBackend private (
override protected def emptyBody(): ZStream[Any, Throwable, Byte] = ZStream.empty

override protected def publisherToBody(p: Publisher[util.List[ByteBuffer]]): ZStream[Any, Throwable, Byte] =
FlowAdapters.toPublisher(p).toStream().mapConcatChunk { list =>
FlowAdapters.toPublisher(p).toZIOStream().mapConcatChunk { list =>
val a = list.asScala.toList.flatMap(_.safeRead()).toArray
ByteArray(a, 0, a.length)
}
Expand Down Expand Up @@ -107,7 +107,7 @@ object HttpClientZioBackend {
customizeRequest: HttpRequest => HttpRequest = identity,
customEncodingHandler: ZioEncodingHandler = PartialFunction.empty
): Task[SttpBackend[Task, ZioStreams with WebSockets]] =
Task.attempt(
ZIO.attempt(
HttpClientZioBackend(
HttpClientBackend.defaultClient(options),
closeClient = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import sttp.monad.MonadError
import sttp.ws.{WebSocket, WebSocketFrame}
import zio.nio.channels.AsynchronousFileChannel
import zio.nio.file.Path
import zio.stream.{Stream, ZSink, ZStream}
import zio.stream.{ZSink, ZStream}
import zio.{Task, ZIO}

import java.io.IOException
Expand All @@ -38,11 +38,11 @@ private[zio] class ZioBodyFromHttpClient extends BodyFromHttpClient[Task, ZioStr
replayableBody: Either[Array[Byte], SttpFile]
): Task[ZStream[Any, Throwable, Byte]] = {
replayableBody match {
case Left(byteArray) => ZIO.succeed(Stream.fromIterable(byteArray))
case Left(byteArray) => ZIO.succeed(ZStream.fromIterable(byteArray))
case Right(file) =>
ZIO.succeed(
for {
fileChannel <- Stream.scoped(
fileChannel <- ZStream.scoped(
AsynchronousFileChannel.open(Path.fromJava(file.toPath), StandardOpenOption.READ)
)
bytes <- readAllBytes(fileChannel)
Expand Down Expand Up @@ -80,7 +80,7 @@ private[zio] class ZioBodyFromHttpClient extends BodyFromHttpClient[Task, ZioStr
override protected def regularAsStream(
response: ZStream[Any, Throwable, Byte]
): Task[(ZStream[Any, Throwable, Byte], () => Task[Unit])] =
Task.succeed((response, () => response.runDrain.catchAll(_ => ZIO.unit)))
ZIO.succeed((response, () => response.runDrain.catchAll(_ => ZIO.unit)))

override protected def handleWS[T](
responseAs: WebSocketResponseAs[T, _],
Expand All @@ -101,7 +101,7 @@ private[zio] class ZioBodyFromHttpClient extends BodyFromHttpClient[Task, ZioStr

private def readAllBytes(fileChannel: AsynchronousFileChannel) = {
val bufferSize = 4096
Stream.paginateChunkZIO(0L)(position =>
ZStream.paginateChunkZIO(0L)(position =>
fileChannel
.readChunk(bufferSize, position)
.map {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class HttpClientZioStreamingTest extends StreamingTest[Task, ZioStreams] with Zi
override implicit val convertToFuture: ConvertToFuture[Task] = convertZioTaskToFuture

override def bodyProducer(chunks: Iterable[Array[Byte]]): ZStream[Any, Throwable, Byte] =
Stream.fromChunks(chunks.map(Chunk.fromArray).toSeq: _*)
ZStream.fromChunks(chunks.map(Chunk.fromArray).toSeq: _*)

override def bodyConsumer(stream: ZStream[Any, Throwable, Byte]): Task[String] =
stream.runCollect.map(bytes => new String(bytes.toArray, Utf8))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import sttp.client3.testing.ConvertToFuture
import sttp.client3.testing.websocket.{WebSocketConcurrentTest, WebSocketStreamingTest, WebSocketTest}
import sttp.monad.MonadError
import sttp.ws.WebSocketFrame
import zio.Task
import zio.stream._
import zio.{Task, ZIO}

class HttpClientZioWebSocketTest
extends WebSocketTest[Task]
Expand All @@ -37,5 +37,5 @@ class HttpClientZioWebSocketTest
): ZioStreams.Pipe[WebSocketFrame.Data[_], WebSocketFrame] =
to.andThen(rest => ZStream(item) ++ rest)

override def concurrently[T](fs: List[() => Task[T]]): Task[List[T]] = Task.collectAllPar(fs.map(_()))
override def concurrently[T](fs: List[() => Task[T]]): Task[List[T]] = ZIO.collectAllPar(fs.map(_()))
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ object StreamFs2 extends App {
basicRequest
.body("I want a stream!")
.post(uri"https://httpbin.org/post")
.response(asStreamAlways(Fs2Streams[IO])(_.chunks.through(text.utf8DecodeC).compile.foldMonoid))
.response(asStreamAlways(Fs2Streams[IO])(_.chunks.through(text.utf8.decodeC).compile.foldMonoid))
.send(backend)
.map { response => println(s"RECEIVED:\n${response.body}") }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import zio.stream._

object StreamZio extends ZIOAppDefault {
def streamRequestBody: RIO[Console with SttpClient, Unit] = {
val stream: Stream[Throwable, Byte] = Stream("Hello, world".getBytes: _*)
val stream: Stream[Throwable, Byte] = ZStream("Hello, world".getBytes: _*)

send(
basicRequest
Expand Down