Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for the XINFO command #301

Merged
merged 2 commits into from
Mar 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions redis/src/main/scala/zio/redis/Input.scala
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,21 @@ object Input {
Chunk(encodeString("DELCONSUMER"), encodeString(data.key), encodeString(data.group), encodeString(data.consumer))
}

case object XInfoGroupInput extends Input[XInfoCommand.Group] {
def encode(data: XInfoCommand.Group): Chunk[RespValue.BulkString] =
Chunk(encodeString("GROUPS"), encodeString(data.key))
}

case object XInfoStreamInput extends Input[XInfoCommand.Stream] {
def encode(data: XInfoCommand.Stream): Chunk[RespValue.BulkString] =
Chunk(encodeString("STREAM"), encodeString(data.key))
}

case object XInfoConsumerInput extends Input[XInfoCommand.Consumer] {
def encode(data: XInfoCommand.Consumer): Chunk[RespValue.BulkString] =
Chunk(encodeString("CONSUMERS"), encodeString(data.key), encodeString(data.group))
}

case object BlockInput extends Input[Duration] {
def encode(data: Duration): Chunk[RespValue.BulkString] =
Chunk(encodeString("BLOCK"), encodeString(data.toMillis.toString))
Expand Down
155 changes: 146 additions & 9 deletions redis/src/main/scala/zio/redis/Output.scala
Original file line number Diff line number Diff line change
Expand Up @@ -292,21 +292,158 @@ object Output {
case object StreamOutput extends Output[Map[String, Map[String, String]]] {
protected def tryDecode(respValue: RespValue): Map[String, Map[String, String]] =
respValue match {
case RespValue.NullArray => Map.empty[String, Map[String, String]]
case RespValue.Array(entities) =>
val output = collection.mutable.Map.empty[String, Map[String, String]]
entities.foreach {
case RespValue.Array(Seq(id @ RespValue.BulkString(_), value)) =>
output += (id.asString -> KeyValueOutput.unsafeDecode(value))
case RespValue.NullArray => Map.empty[String, Map[String, String]]
case RespValue.Array(entities) => extractMapMap(entities)
case other => throw ProtocolError(s"$other isn't an array")
}
}

private def extractMapMap(entities: Chunk[RespValue]): Map[String, Map[String, String]] = {
val output = collection.mutable.Map.empty[String, Map[String, String]]
entities.foreach {
case RespValue.Array(Seq(id @ RespValue.BulkString(_), value)) =>
output += (id.asString -> KeyValueOutput.unsafeDecode(value))
case other =>
throw ProtocolError(s"$other isn't a valid array")
}
output.toMap
}

case object StreamGroupInfoOutput extends Output[Chunk[StreamGroupInfo]] {
override protected def tryDecode(respValue: RespValue): Chunk[StreamGroupInfo] =
respValue match {
case RespValue.NullArray => Chunk.empty
case RespValue.Array(messages) =>
messages.collect {
// Note that you should not rely on the fields exact position. see https://redis.io/commands/xinfo
case RespValue.Array(elements) if elements.length % 2 == 0 =>
var streamGroupInfo: StreamGroupInfo = StreamGroupInfo.empty
val len = elements.length
var pos = 0
while (pos < len) {
(elements(pos), elements(pos + 1)) match {
case (key @ RespValue.BulkString(_), value @ RespValue.BulkString(_)) =>
if (key.asString == XInfoFields.Name)
streamGroupInfo = streamGroupInfo.copy(name = Some(value.asString))
else if (key.asString == XInfoFields.LastDelivered)
streamGroupInfo = streamGroupInfo.copy(lastDeliveredId = Some(value.asString))
case (key @ RespValue.BulkString(_), value @ RespValue.Integer(_)) =>
if (key.asString == XInfoFields.Pending)
streamGroupInfo = streamGroupInfo.copy(pending = Some(value.value))
else if (key.asString == XInfoFields.Consumers)
streamGroupInfo = streamGroupInfo.copy(consumers = Some(value.value))
case _ =>
}
pos += 2
}
streamGroupInfo
case array @ RespValue.Array(_) =>
throw ProtocolError(s"$array doesn't have an even number of elements")
case other =>
throw ProtocolError(s"$other isn't a valid array")
throw ProtocolError(s"$other isn't an array")
}

output.toMap
case other => throw ProtocolError(s"$other isn't an array")
case other =>
throw ProtocolError(s"$other isn't an array")
}
}

case object StreamConsumerInfoOutput extends Output[Chunk[StreamConsumerInfo]] {
override protected def tryDecode(respValue: RespValue): Chunk[StreamConsumerInfo] =
respValue match {
case RespValue.NullArray => Chunk.empty
case RespValue.Array(messages) =>
messages.collect {
// Note that you should not rely on the fields exact position. see https://redis.io/commands/xinfo
case RespValue.Array(elements) if elements.length % 2 == 0 =>
var streamConsumerInfo: StreamConsumerInfo = StreamConsumerInfo.empty
val len = elements.length
var pos = 0
while (pos < len) {
(elements(pos), elements(pos + 1)) match {
case (key @ RespValue.BulkString(_), value @ RespValue.BulkString(_))
if key.asString == XInfoFields.Name =>
streamConsumerInfo = streamConsumerInfo.copy(name = Some(value.asString))
case (key @ RespValue.BulkString(_), value @ RespValue.Integer(_)) =>
if (key.asString == XInfoFields.Pending)
streamConsumerInfo = streamConsumerInfo.copy(pending = Some(value.value))
else if (key.asString == XInfoFields.Idle)
streamConsumerInfo = streamConsumerInfo.copy(idle = Some(value.value))
case _ =>
}
pos += 2
}
streamConsumerInfo
case array @ RespValue.Array(_) =>
throw ProtocolError(s"$array doesn't have an even number of elements")
case other =>
throw ProtocolError(s"$other isn't an array")
}

case other =>
throw ProtocolError(s"$other isn't an array")
}
}

case object StreamInfoOutput extends Output[StreamInfo] {
override protected def tryDecode(respValue: RespValue): StreamInfo = {
var streamInfo: StreamInfo = StreamInfo.empty
respValue match {
// Note that you should not rely on the fields exact position. see https://redis.io/commands/xinfo
case RespValue.Array(elements) if elements.length % 2 == 0 =>
val len = elements.length
var pos = 0
while (pos < len) {
(elements(pos), elements(pos + 1)) match {
case (key @ RespValue.BulkString(_), value @ RespValue.Integer(_)) =>
if (key.asString == XInfoFields.Length)
streamInfo = streamInfo.copy(length = Some(value.value))
else if (key.asString == XInfoFields.RadixTreeNodes)
streamInfo = streamInfo.copy(radixTreeNodes = Some(value.value))
else if (key.asString == XInfoFields.RadixTreeKeys)
streamInfo = streamInfo.copy(radixTreeKeys = Some(value.value))
else if (key.asString == XInfoFields.Groups)
streamInfo = streamInfo.copy(groups = Some(value.value))
case (key @ RespValue.BulkString(_), value @ RespValue.BulkString(_))
if key.asString == XInfoFields.LastGeneratedId =>
streamInfo = streamInfo.copy(lastGeneratedId = Some(value.asString))
case (key @ RespValue.BulkString(_), value @ RespValue.Array(_)) =>
if (key.asString == XInfoFields.FirstEntry)
streamInfo = streamInfo.copy(firstEntry = Some(extractStreamEntry(value)))
else if (key.asString == XInfoFields.LastEntry)
streamInfo = streamInfo.copy(lastEntry = Some(extractStreamEntry(value)))
case _ =>
}
pos += 2
}
streamInfo
case array @ RespValue.Array(_) =>
throw ProtocolError(s"$array doesn't have an even number of elements")

case other =>
throw ProtocolError(s"$other isn't an array")
}
}
}

private def extractStreamEntry(es: RespValue): StreamEntry = {
val entry = collection.mutable.Map.empty[String, String]
var entryId: String = ""
es match {
case RespValue.Array(entities) =>
entities.foreach {
case id @ RespValue.BulkString(_) => entryId = id.asString
case RespValue.ArrayValues(id @ RespValue.BulkString(_), value @ RespValue.BulkString(_)) =>
entry += (id.asString -> value.asString)
case other =>
throw ProtocolError(s"$other isn't a valid array")
}
case other =>
throw ProtocolError(s"$other isn't a valid array")
}
StreamEntry(id = entryId, entry.toMap)
}

case object XPendingOutput extends Output[PendingInfo] {
protected def tryDecode(respValue: RespValue): PendingInfo =
respValue match {
Expand Down
45 changes: 42 additions & 3 deletions redis/src/main/scala/zio/redis/api/Streams.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package zio.redis.api

import zio.duration._
import zio.redis.Input._
import zio.redis.Output._
import zio.redis.Input.{ XInfoConsumerInput, _ }
import zio.redis.Output.{ StreamGroupInfoOutput, _ }
import zio.redis._
import zio.{ Chunk, ZIO }

Expand Down Expand Up @@ -39,6 +39,38 @@ trait Streams {
): ZIO[RedisExecutor, RedisError, String] =
XAdd.run((key, None, id, (pair, pairs.toList)))

/**
* An introspection command used in order to retrieve different information about the group.
*
* @param key ID of the stream
* @return List of consumer groups associated with the stream stored at the specified key.
*/
final def xInfoGroup(
key: String
): ZIO[RedisExecutor, RedisError, Chunk[StreamGroupInfo]] = XInfoGroups.run(XInfoCommand.Group(key))

/**
* An introspection command used in order to retrieve different information about the consumers.
*
* @param key ID of the stream
* @param group ID of the consumer group
* @return List of every consumer in a specific consumer group.
*/
final def xInfoConsumers(
key: String,
group: String
): ZIO[RedisExecutor, RedisError, Chunk[StreamConsumerInfo]] = XInfoConsumers.run(XInfoCommand.Consumer(key, group))

/**
* An introspection command used in order to retrieve different information about the stream.
*
* @param key ID of the stream
* @return General information about the stream stored at the specified key.
*/
final def xInfoStream(
key: String
): ZIO[RedisExecutor, RedisError, StreamInfo] = XInfoStream.run(XInfoCommand.Stream(key))

/**
* Appends the specified stream entry to the stream at the specified key while limiting the size of the stream.
*
Expand Down Expand Up @@ -468,7 +500,14 @@ private object Streams {
final val XGroupDelConsumer: RedisCommand[XGroupCommand.DelConsumer, Long] =
RedisCommand("XGROUP", XGroupDelConsumerInput, LongOutput)

// TODO: implement XINFO command
final val XInfoGroups: RedisCommand[XInfoCommand.Group, Chunk[StreamGroupInfo]] =
RedisCommand("XINFO", XInfoGroupInput, StreamGroupInfoOutput)

final val XInfoStream: RedisCommand[XInfoCommand.Stream, StreamInfo] =
RedisCommand("XINFO", XInfoStreamInput, StreamInfoOutput)

final val XInfoConsumers: RedisCommand[XInfoCommand.Consumer, Chunk[StreamConsumerInfo]] =
RedisCommand("XINFO", XInfoConsumerInput, StreamConsumerInfoOutput)

final val XLen: RedisCommand[String, Long] = RedisCommand("XLEN", StringInput, LongOutput)

Expand Down
85 changes: 81 additions & 4 deletions redis/src/main/scala/zio/redis/options/Streams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package zio.redis.options
import zio.duration._

trait Streams {

case object WithForce {
private[redis] def stringify = "FORCE"
}
Expand All @@ -18,11 +19,28 @@ trait Streams {
sealed trait XGroupCommand

object XGroupCommand {

case class Create(key: String, group: String, id: String, mkStream: Boolean) extends XGroupCommand
case class SetId(key: String, group: String, id: String) extends XGroupCommand
case class Destroy(key: String, group: String) extends XGroupCommand
case class CreateConsumer(key: String, group: String, consumer: String) extends XGroupCommand
case class DelConsumer(key: String, group: String, consumer: String) extends XGroupCommand

case class SetId(key: String, group: String, id: String) extends XGroupCommand

case class Destroy(key: String, group: String) extends XGroupCommand

case class CreateConsumer(key: String, group: String, consumer: String) extends XGroupCommand

case class DelConsumer(key: String, group: String, consumer: String) extends XGroupCommand

}

sealed trait XInfoCommand

object XInfoCommand {

case class Group(key: String) extends XInfoCommand

case class Stream(key: String) extends XInfoCommand

case class Consumer(key: String, group: String) extends XInfoCommand
}

case object MkStream {
Expand Down Expand Up @@ -54,4 +72,63 @@ trait Streams {
type NoAck = NoAck.type

case class MaxLen(approximate: Boolean, count: Long)

case class StreamEntry(id: String, fields: Map[String, String])

case class StreamInfo(
length: Option[Long],
radixTreeKeys: Option[Long],
radixTreeNodes: Option[Long],
groups: Option[Long],
lastGeneratedId: Option[String],
firstEntry: Option[StreamEntry],
lastEntry: Option[StreamEntry]
)

object StreamInfo {
def empty: StreamInfo = StreamInfo(None, None, None, None, None, None, None)
}

case class StreamGroupInfo(
name: Option[String],
consumers: Option[Long],
pending: Option[Long],
lastDeliveredId: Option[String]
)

object StreamGroupInfo {
def empty: StreamGroupInfo = StreamGroupInfo(None, None, None, None)
}

case class StreamConsumerInfo(
name: Option[String],
idle: Option[Long],
pending: Option[Long]
)

object StreamConsumerInfo {
def empty: StreamConsumerInfo = StreamConsumerInfo(None, None, None)
}

private[redis] object XInfoFields {
val Name: String = "name"
val Idle: String = "idle"
val Pending: String = "pending"

val Consumers: String = "consumers"
val LastDelivered: String = "last-delivered-id"

val Length: String = "length"
val RadixTreeKeys: String = "radix-tree-keys"
val RadixTreeNodes: String = "radix-tree-nodes"
val Groups: String = "groups"
val LastGeneratedId: String = "last-generated-id"
val FirstEntry: String = "first-entry"
val LastEntry: String = "last-entry"

val Entries: String = "entries"
val PelCount: String = "pel-count"
val SeenTime: String = "seen-time"
}

}
14 changes: 14 additions & 0 deletions redis/src/test/scala/zio/redis/InputSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -926,6 +926,20 @@ object InputSpec extends BaseSpec {
.map(assert(_)(equalTo(respArgs("STREAMS", "a", "c", "b", "d"))))
}
),
suite("XInfo")(
testM("stream info") {
Task(XInfoStreamInput.encode(XInfoCommand.Stream("key")))
.map(assert(_)(equalTo(respArgs("STREAM", "key"))))
},
testM("group info") {
Task(XInfoGroupInput.encode(XInfoCommand.Group("key")))
.map(assert(_)(equalTo(respArgs("GROUPS", "key"))))
},
testM("consumer info") {
Task(XInfoConsumerInput.encode(XInfoCommand.Consumer("key", "group")))
.map(assert(_)(equalTo(respArgs("CONSUMERS", "key", "group"))))
}
),
suite("Group")(
testM("valid value") {
Task(GroupInput.encode(Group("group", "consumer")))
Expand Down
Loading