Skip to content

Commit

Permalink
Support async command evaluation (#917)
Browse files Browse the repository at this point in the history
  • Loading branch information
mberndt123 authored Nov 17, 2023
1 parent 24ae4a9 commit bac8cc1
Show file tree
Hide file tree
Showing 22 changed files with 685 additions and 786 deletions.
25 changes: 25 additions & 0 deletions modules/redis/src/main/scala/zio/redis/GenRedis.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package zio.redis
import zio.{IO, UIO}

private[redis] trait GenRedis[G[+_]]
extends api.Connection[G]
with api.Geo[G]
with api.Hashes[G]
with api.HyperLogLog[G]
with api.Keys[G]
with api.Lists[G]
with api.Sets[G]
with api.Strings[G]
with api.SortedSets[G]
with api.Streams[G]
with api.Scripting[G]
with api.Cluster[G]
with api.Publishing[G]

private[redis] object GenRedis {
type Async[+A] = UIO[IO[RedisError, A]]
type Sync[+A] = IO[RedisError, A]

def async[A](io: UIO[IO[RedisError, A]]) = io
def sync[A](io: UIO[IO[RedisError, A]]) = io.flatten
}
39 changes: 16 additions & 23 deletions modules/redis/src/main/scala/zio/redis/Redis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,38 +19,31 @@ package zio.redis
import zio._
import zio.redis.internal._

trait Redis
extends api.Connection
with api.Geo
with api.Hashes
with api.HyperLogLog
with api.Keys
with api.Lists
with api.Sets
with api.Strings
with api.SortedSets
with api.Streams
with api.Scripting
with api.Cluster
with api.Publishing

object Redis {
lazy val cluster: ZLayer[CodecSupplier & RedisClusterConfig, RedisError, Redis] =
ClusterExecutor.layer >>> makeLayer
ZLayer.makeSome[CodecSupplier & RedisClusterConfig, Redis](ClusterExecutor.layer, makeLayer)

lazy val local: ZLayer[CodecSupplier, RedisError.IOError, Redis] =
lazy val local: ZLayer[CodecSupplier, RedisError.IOError, Redis & AsyncRedis] =
SingleNodeExecutor.local >>> makeLayer

lazy val singleNode: ZLayer[CodecSupplier & RedisConfig, RedisError.IOError, Redis] =
lazy val singleNode: ZLayer[CodecSupplier & RedisConfig, RedisError.IOError, Redis & AsyncRedis] =
SingleNodeExecutor.layer >>> makeLayer

private def makeLayer: URLayer[CodecSupplier & RedisExecutor, Redis] =
ZLayer {
private def makeLayer: URLayer[CodecSupplier & RedisExecutor, AsyncRedis & Redis] =
ZLayer.fromZIOEnvironment {
for {
codecSupplier <- ZIO.service[CodecSupplier]
executor <- ZIO.service[RedisExecutor]
} yield new Live(codecSupplier, executor)
} yield ZEnvironment[AsyncRedis, Redis](
new AsyncLive(codecSupplier, executor),
new SyncLive(codecSupplier, executor)
)
}

private final class Live(val codecSupplier: CodecSupplier, val executor: RedisExecutor) extends Redis
private final class SyncLive(val codecSupplier: CodecSupplier, val executor: RedisExecutor) extends Redis {
protected def lift[A](in: UIO[IO[RedisError, A]]): GenRedis.Sync[A] = GenRedis.sync(in)
}

private final class AsyncLive(val codecSupplier: CodecSupplier, val executor: RedisExecutor) extends AsyncRedis {
protected def lift[A](in: UIO[IO[RedisError, A]]): GenRedis.Async[A] = GenRedis.async(in)
}
}
16 changes: 8 additions & 8 deletions modules/redis/src/main/scala/zio/redis/ResultBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,20 @@ object ResultBuilder {
@annotation.implicitNotFound("Use `returning[A]` to specify method's return type")
final abstract class NeedsReturnType

trait ResultBuilder1[+F[_]] extends ResultBuilder {
def returning[R: Schema]: IO[RedisError, F[R]]
trait ResultBuilder1[+F[_], G[+_]] extends ResultBuilder {
def returning[R: Schema]: G[F[R]]
}

trait ResultBuilder2[+F[_, _]] extends ResultBuilder {
def returning[R1: Schema, R2: Schema]: IO[RedisError, F[R1, R2]]
trait ResultBuilder2[+F[_, _], G[+_]] extends ResultBuilder {
def returning[R1: Schema, R2: Schema]: G[F[R1, R2]]
}

trait ResultBuilder3[+F[_, _, _]] extends ResultBuilder {
def returning[R1: Schema, R2: Schema, R3: Schema]: IO[RedisError, F[R1, R2, R3]]
trait ResultBuilder3[+F[_, _, _], G[+_]] extends ResultBuilder {
def returning[R1: Schema, R2: Schema, R3: Schema]: G[F[R1, R2, R3]]
}

trait ResultOutputBuilder extends ResultBuilder {
def returning[R: Output]: IO[RedisError, R]
trait ResultOutputBuilder[G[+_]] extends ResultBuilder {
def returning[R: Output]: G[R]
}

trait ResultStreamBuilder1[+F[_]] extends ResultBuilder {
Expand Down
40 changes: 18 additions & 22 deletions modules/redis/src/main/scala/zio/redis/api/Cluster.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,15 @@

package zio.redis.api

import zio.Chunk
import zio.redis.Input._
import zio.redis.Output.{ChunkOutput, ClusterPartitionOutput, UnitOutput}
import zio.redis._
import zio.redis.api.Cluster.{AskingCommand, ClusterSetSlots, ClusterSlots}
import zio.redis.internal.{RedisCommand, RedisEnvironment, RedisExecutor}
import zio.redis.api.Cluster.{ClusterSetSlots, ClusterSlots, askingCommand}
import zio.redis.internal.{RedisCommand, RedisEnvironment}
import zio.redis.options.Cluster.SetSlotSubCommand._
import zio.redis.options.Cluster.{Partition, Slot}
import zio.{Chunk, IO}

trait Cluster extends RedisEnvironment {
trait Cluster[G[+_]] extends RedisEnvironment[G] {

/**
* When a cluster client receives an -ASK redirect, the ASKING command is sent to the target node followed by the
Expand All @@ -34,8 +33,8 @@ trait Cluster extends RedisEnvironment {
* @return
* the Unit value.
*/
final def asking: IO[RedisError, Unit] =
AskingCommand(executor).run(())
final def asking: G[Unit] =
askingCommand.run(())

/**
* Set a hash slot in importing state. Command should be executed on the node where hash slot will be migrated
Expand All @@ -48,12 +47,11 @@ trait Cluster extends RedisEnvironment {
* @return
* the Unit value.
*/
final def setSlotImporting(slot: Slot, nodeId: String): IO[RedisError, Unit] = {
final def setSlotImporting(slot: Slot, nodeId: String): G[Unit] = {
val command = RedisCommand(
ClusterSetSlots,
Tuple3(LongInput, ArbitraryValueInput[String](), ArbitraryValueInput[String]()),
UnitOutput,
executor
UnitOutput
)
command.run((slot.number, Importing.asString, nodeId))
}
Expand All @@ -69,12 +67,11 @@ trait Cluster extends RedisEnvironment {
* @return
* the Unit value.
*/
final def setSlotMigrating(slot: Slot, nodeId: String): IO[RedisError, Unit] = {
final def setSlotMigrating(slot: Slot, nodeId: String): G[Unit] = {
val command = RedisCommand(
ClusterSetSlots,
Tuple3(LongInput, ArbitraryValueInput[String](), ArbitraryValueInput[String]()),
UnitOutput,
executor
UnitOutput
)
command.run((slot.number, Migrating.asString, nodeId))
}
Expand All @@ -90,12 +87,11 @@ trait Cluster extends RedisEnvironment {
* @return
* the Unit value.
*/
final def setSlotNode(slot: Slot, nodeId: String): IO[RedisError, Unit] = {
final def setSlotNode(slot: Slot, nodeId: String): G[Unit] = {
val command = RedisCommand(
ClusterSetSlots,
Tuple3(LongInput, ArbitraryValueInput[String](), ArbitraryValueInput[String]()),
UnitOutput,
executor
UnitOutput
)
command.run((slot.number, Node.asString, nodeId))
}
Expand All @@ -108,9 +104,9 @@ trait Cluster extends RedisEnvironment {
* @return
* the Unit value.
*/
final def setSlotStable(slot: Slot): IO[RedisError, Unit] = {
final def setSlotStable(slot: Slot): G[Unit] = {
val command =
RedisCommand(ClusterSetSlots, Tuple2(LongInput, ArbitraryValueInput[String]()), UnitOutput, executor)
RedisCommand(ClusterSetSlots, Tuple2(LongInput, ArbitraryValueInput[String]()), UnitOutput)
command.run((slot.number, Stable.asString))
}

Expand All @@ -120,8 +116,8 @@ trait Cluster extends RedisEnvironment {
* @return
* details about which cluster
*/
final def slots: IO[RedisError, Chunk[Partition]] = {
val command = RedisCommand(ClusterSlots, NoInput, ChunkOutput(ClusterPartitionOutput), executor)
final def slots: G[Chunk[Partition]] = {
val command = RedisCommand(ClusterSlots, NoInput, ChunkOutput(ClusterPartitionOutput))
command.run(())
}
}
Expand All @@ -131,6 +127,6 @@ private[redis] object Cluster {
final val ClusterSetSlots = "CLUSTER SETSLOT"
final val ClusterSlots = "CLUSTER SLOTS"

final val AskingCommand: RedisExecutor => RedisCommand[Unit, Unit] =
(executor: RedisExecutor) => RedisCommand(Asking, NoInput, UnitOutput, executor)
final val askingCommand: RedisCommand[Unit, Unit] =
RedisCommand(Asking, NoInput, UnitOutput)
}
27 changes: 13 additions & 14 deletions modules/redis/src/main/scala/zio/redis/api/Connection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@

package zio.redis.api

import zio._
import zio.redis.Input._
import zio.redis.Output._
import zio.redis._
import zio.redis.internal.{RedisCommand, RedisEnvironment}

trait Connection extends RedisEnvironment {
trait Connection[G[+_]] extends RedisEnvironment[G] {
import Connection.{Auth => _, _}

/**
Expand All @@ -37,8 +36,8 @@ trait Connection extends RedisEnvironment {
* if the password provided via AUTH matches the password in the configuration file, the Unit value is returned and
* the server starts accepting commands. Otherwise, an error is returned and the client needs to try a new password.
*/
final def auth(password: String): IO[RedisError, Unit] = {
val command = RedisCommand(Connection.Auth, AuthInput, UnitOutput, executor)
final def auth(password: String): G[Unit] = {
val command = RedisCommand(Connection.Auth, AuthInput, UnitOutput)

command.run(Auth(None, password))
}
Expand All @@ -54,8 +53,8 @@ trait Connection extends RedisEnvironment {
* if the password provided via AUTH matches the password in the configuration file, the Unit value is returned and
* the server starts accepting commands. Otherwise, an error is returned and the client needs to try a new password.
*/
final def auth(username: String, password: String): IO[RedisError, Unit] = {
val command = RedisCommand(Connection.Auth, AuthInput, UnitOutput, executor)
final def auth(username: String, password: String): G[Unit] = {
val command = RedisCommand(Connection.Auth, AuthInput, UnitOutput)

command.run(Auth(Some(username), password))
}
Expand All @@ -66,8 +65,8 @@ trait Connection extends RedisEnvironment {
* @return
* the connection name, or None if a name wasn't set.
*/
final def clientGetName: IO[RedisError, Option[String]] = {
val command = RedisCommand(ClientGetName, NoInput, OptionalOutput(MultiStringOutput), executor)
final def clientGetName: G[Option[String]] = {
val command = RedisCommand(ClientGetName, NoInput, OptionalOutput(MultiStringOutput))

command.run(())
}
Expand All @@ -82,8 +81,8 @@ trait Connection extends RedisEnvironment {
* @return
* the ID of the current connection.
*/
final def clientId: IO[RedisError, Long] = {
val command = RedisCommand(ClientId, NoInput, LongOutput, executor)
final def clientId: G[Long] = {
val command = RedisCommand(ClientId, NoInput, LongOutput)

command.run(())
}
Expand All @@ -96,8 +95,8 @@ trait Connection extends RedisEnvironment {
* @return
* the Unit value.
*/
final def clientSetName(name: String): IO[RedisError, Unit] = {
val command = RedisCommand(ClientSetName, StringInput, UnitOutput, executor)
final def clientSetName(name: String): G[Unit] = {
val command = RedisCommand(ClientSetName, StringInput, UnitOutput)

command.run(name)
}
Expand All @@ -112,8 +111,8 @@ trait Connection extends RedisEnvironment {
* @return
* the Unit value.
*/
final def select(index: Long): IO[RedisError, Unit] = {
val command = RedisCommand(Select, LongInput, UnitOutput, executor)
final def select(index: Long): G[Unit] = {
val command = RedisCommand(Select, LongInput, UnitOutput)

command.run(index)
}
Expand Down
Loading

0 comments on commit bac8cc1

Please sign in to comment.