Output.scala 37.6 KB
Newer Older
D
Dejan Mijić 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright 2021 John A. De Goes and the ZIO contributors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

D
Dejan Mijić 已提交
17 18
package zio.redis

梦境迷离's avatar
梦境迷离 已提交
19
import zio._
20
import zio.redis.internal.RespValue
A
Anatoly Sergeev 已提交
21
import zio.redis.options.Cluster.{Node, Partition, SlotRange}
A
Anatoly Sergeev 已提交
22
import zio.schema.Schema
A
Anatoly Sergeev 已提交
23
import zio.schema.codec.BinaryCodec
D
Dejan Mijić 已提交
24

D
Dejan Mijić 已提交
25 26
import java.nio.charset.StandardCharsets

27
/**
 * Describes how a raw RESP reply is turned into a value of type `A`.
 *
 * Implementations provide `tryDecode`; code inside the `redis` package calls `unsafeDecode`,
 * which deals with a top-level error reply before delegating to `tryDecode`.
 */
sealed trait Output[+A] { self =>

  /** Decodes a reply that has already been checked not to be a top-level `RespValue.Error`. */
  protected def tryDecode(respValue: RespValue): A

  /** Builds an output that decodes exactly like this one and then applies `f` to the result. */
  final def map[B](f: A => B): Output[B] =
    new Output[B] {
      protected def tryDecode(respValue: RespValue): B = {
        val decoded = self.tryDecode(respValue)
        f(decoded)
      }
    }

  /**
   * Decodes `respValue`, signalling failure by throwing.
   *
   * A `RespValue.Error` reply is thrown as the value of its `asRedisError`; every other reply is
   * passed to `tryDecode`.
   */
  private[redis] final def unsafeDecode(respValue: RespValue): A =
    respValue match {
      case failure: RespValue.Error => throw failure.asRedisError
      case reply                    => tryDecode(reply)
    }
}
D
Dejan Mijić 已提交
41 42

object Output {
D
Dejan Mijić 已提交
43 44
  import RedisError._

45 46 47
  /** Returns the implicit `Output[A]` instance that is in scope at the call site. */
  def apply[A](implicit output: Output[A]): Output[A] = output

  /** Pass-through output: returns the reply unchanged, without inspecting it. */
  case object RespValueOutput extends Output[RespValue] {
    protected def tryDecode(respValue: RespValue): RespValue = {
      val untouched = respValue
      untouched
    }
  }

51
  /**
   * Decodes the integer replies `0` and `1` as `false` and `true`.
   *
   * Any other reply is rejected with a `ProtocolError`.
   */
  case object BoolOutput extends Output[Boolean] {
    protected def tryDecode(respValue: RespValue): Boolean =
      respValue match {
        case RespValue.Integer(0) => false
        case RespValue.Integer(1) => true
        case unexpected           => throw ProtocolError(s"$unexpected isn't a boolean")
      }
  }

A
Anatoly Sergeev 已提交
60
  /**
   * Decodes an array reply by applying `output` to every element.
   *
   * A null array decodes to an empty chunk; a reply that is not an array is rejected with a
   * `ProtocolError`.
   *
   * @param output decoder used for each array element
   */
  final case class ChunkOutput[+A](output: Output[A]) extends Output[Chunk[A]] {
    protected def tryDecode(respValue: RespValue): Chunk[A] =
      respValue match {
        case RespValue.NullArray    => Chunk.empty
        case RespValue.Array(items) => items.map(item => output.tryDecode(item))
        case unexpected             => throw ProtocolError(s"$unexpected isn't an array")
      }
  }

69
  /**
   * Like an element-wise array decoder, but additionally treats a null bulk string as "no
   * elements".
   *
   * Null bulk string and null array both decode to an empty chunk; an array is decoded element
   * by element with `output`; anything else is rejected with a `ProtocolError`.
   *
   * @param output decoder used for each array element
   */
  final case class ZRandMemberOutput[+A](output: Output[A]) extends Output[Chunk[A]] {
    protected def tryDecode(respValue: RespValue): Chunk[A] =
      respValue match {
        case RespValue.NullBulkString | RespValue.NullArray => Chunk.empty
        case RespValue.Array(items)                         => items.map(item => output.tryDecode(item))
        case unexpected                                     => throw ProtocolError(s"$unexpected isn't an array")
      }
  }

A
Anatoly Sergeev 已提交
79
  /**
   * Decodes a flat array reply `[a0, b0, a1, b1, ...]` into a chunk of pairs.
   *
   * A null array decodes to an empty chunk. An array with an odd number of elements, or a reply
   * that is not an array, is rejected with a `ProtocolError`.
   *
   * @param _1 decoder for the first element of each pair
   * @param _2 decoder for the second element of each pair
   */
  final case class ChunkTuple2Output[+A, +B](_1: Output[A], _2: Output[B]) extends Output[Chunk[(A, B)]] {
    protected def tryDecode(respValue: RespValue): Chunk[(A, B)] =
      respValue match {
        case RespValue.NullArray =>
          Chunk.empty
        case array @ RespValue.Array(values) =>
          if (values.length % 2 != 0)
            throw ProtocolError(s"$array doesn't have an even number of elements")

          // Consecutive elements are paired up lazily and materialised by `fromIterator`.
          val pairs = values.grouped(2).map { pair =>
            val first  = _1.tryDecode(pair(0))
            val second = _2.tryDecode(pair(1))
            first -> second
          }
          Chunk.fromIterator(pairs)
        case unexpected =>
          throw ProtocolError(s"$unexpected isn't an array")
      }
  }
A
Anatoly Sergeev 已提交
92

93
  /**
   * Decodes a flat array reply `[a0, b0, a1, b1, ...]` into a chunk of pairs, also accepting a
   * null bulk string as "no elements".
   *
   * Null bulk string and null array both decode to an empty chunk. An array with an odd number
   * of elements, or a reply of any other kind, is rejected with a `ProtocolError`.
   *
   * @param _1 decoder for the first element of each pair
   * @param _2 decoder for the second element of each pair
   */
  final case class ZRandMemberTuple2Output[+A, +B](_1: Output[A], _2: Output[B]) extends Output[Chunk[(A, B)]] {
    protected def tryDecode(respValue: RespValue): Chunk[(A, B)] =
      respValue match {
        case RespValue.NullBulkString | RespValue.NullArray =>
          Chunk.empty
        case array @ RespValue.Array(values) =>
          if (values.length % 2 != 0)
            throw ProtocolError(s"$array doesn't have an even number of elements")

          // Consecutive elements are paired up lazily and materialised by `fromIterator`.
          val pairs = values.grouped(2).map { pair =>
            val first  = _1.tryDecode(pair(0))
            val second = _2.tryDecode(pair(1))
            first -> second
          }
          Chunk.fromIterator(pairs)
        case unexpected =>
          throw ProtocolError(s"$unexpected isn't an array")
      }
  }

107
  /**
   * Decodes a bulk-string reply into a `Double` by passing its bytes to `decodeDouble`.
   *
   * Any other reply is rejected with a `ProtocolError`.
   */
  case object DoubleOutput extends Output[Double] {
    protected def tryDecode(respValue: RespValue): Double =
      respValue match {
        case RespValue.BulkString(raw) => decodeDouble(raw)
        case unexpected                => throw ProtocolError(s"$unexpected isn't a double.")
      }
  }

115
  /**
   * Decodes an integer reply as a raw duration amount.
   *
   * The values `-2` and `-1` are not returned: they are reported as `ProtocolError`s ("Key not
   * found." and "Key has no expire." respectively). A non-integer reply is rejected as well.
   */
  private object DurationOutput extends Output[Long] {
    protected def tryDecode(respValue: RespValue): Long =
      respValue match {
        case RespValue.Integer(amount) =>
          amount match {
            case -2L => throw ProtocolError("Key not found.")
            case -1L => throw ProtocolError("Key has no expire.")
            case _   => amount
          }
        case unexpected =>
          throw ProtocolError(s"$unexpected isn't a duration.")
      }
  }
D
Dejan Mijić 已提交
124

125
  /** Decodes like `DurationOutput` and interprets the resulting amount as milliseconds. */
  final val DurationMillisecondsOutput: Output[Duration] = DurationOutput.map(amount => amount.milliseconds)
D
Dejan Mijić 已提交
126

127
  /** Decodes like `DurationOutput` and interprets the resulting amount as seconds. */
  final val DurationSecondsOutput: Output[Duration] = DurationOutput.map(amount => amount.seconds)
128

129
  /** Returns the payload of an integer reply; any other reply is rejected with a `ProtocolError`. */
  case object LongOutput extends Output[Long] {
    protected def tryDecode(respValue: RespValue): Long =
      respValue match {
        case RespValue.Integer(number) => number
        case unexpected                => throw ProtocolError(s"$unexpected isn't an integer")
      }
  }

137
  /**
   * Wraps `output` so that "absent" replies decode to `None`.
   *
   * A null bulk string, a null array and an empty bulk string all yield `None`; every other
   * reply is decoded with `output` and wrapped in `Some`.
   *
   * @param output decoder used when a value is present
   */
  final case class OptionalOutput[+A](output: Output[A]) extends Output[Option[A]] {
    protected def tryDecode(respValue: RespValue): Option[A] =
      respValue match {
        case RespValue.NullBulkString                     => None
        case RespValue.NullArray                          => None
        case RespValue.BulkString(bytes) if bytes.isEmpty => None
        case present                                      => Some(output.tryDecode(present))
      }
  }
A
Aleksandar Skrbic 已提交
145

A
Anatoly Sergeev 已提交
146
  /**
   * Decodes a two-element array reply made of a bulk-string cursor followed by an array of items.
   *
   * The cursor is read with `asLong` and every item is decoded with `output`. A reply of any
   * other shape is rejected with a `ProtocolError`.
   *
   * @param output decoder used for each item
   */
  final case class ScanOutput[+A](output: Output[A]) extends Output[(Long, Chunk[A])] {
    protected def tryDecode(respValue: RespValue): (Long, Chunk[A]) =
      respValue match {
        case RespValue.ArrayValues(cursor @ RespValue.BulkString(_), RespValue.Array(items)) =>
          val nextCursor = cursor.asLong
          val decoded    = items.map(item => output.tryDecode(item))
          (nextCursor, decoded)
        case unexpected =>
          throw ProtocolError(s"$unexpected isn't scan output")
      }
  }
A
Aleksandar Skrbic 已提交
155

156
  /**
   * Decodes an optional pair of strings.
   *
   * A null array yields `None`; an array of exactly two bulk strings yields both as strings in
   * a `Some`; any other reply is rejected with a `ProtocolError`.
   */
  case object KeyElemOutput extends Output[Option[(String, String)]] {
    protected def tryDecode(respValue: RespValue): Option[(String, String)] =
      respValue match {
        case RespValue.NullArray =>
          None
        case RespValue.ArrayValues(key @ RespValue.BulkString(_), element @ RespValue.BulkString(_)) =>
          val pair = (key.asString, element.asString)
          Some(pair)
        case unexpected =>
          throw ProtocolError(s"$unexpected isn't blPop output")
      }
  }

167
  /** Returns the text of a simple-string reply; any other reply is rejected with a `ProtocolError`. */
  case object StringOutput extends Output[String] {
    protected def tryDecode(respValue: RespValue): String =
      respValue match {
        case RespValue.SimpleString(text) => text
        case unexpected                   => throw ProtocolError(s"$unexpected isn't a simple string")
      }
  }

175
  /** Returns a bulk-string reply as a `String` (via `asString`); other replies are rejected. */
  case object MultiStringOutput extends Output[String] {
    protected def tryDecode(respValue: RespValue): String =
      respValue match {
        case bulk @ RespValue.BulkString(_) => bulk.asString
        case unexpected                     => throw ProtocolError(s"$unexpected isn't a bulk string")
      }
  }

183
  /** Returns the raw bytes of a bulk-string reply; other replies are rejected with a `ProtocolError`. */
  case object BulkStringOutput extends Output[Chunk[Byte]] {
    protected def tryDecode(respValue: RespValue): Chunk[Byte] =
      respValue match {
        case RespValue.BulkString(bytes) => bytes
        case unexpected                  => throw ProtocolError(s"$unexpected isn't a bulk string")
      }
  }

191 192
  /**
   * Decodes the bytes of a bulk-string reply with the implicit `BinaryCodec[A]`.
   *
   * A codec failure is rethrown as a `CodecError` carrying the failure's message; a reply that
   * is not a bulk string is rejected with a `ProtocolError`.
   */
  final case class ArbitraryOutput[A]()(implicit codec: BinaryCodec[A]) extends Output[A] {
    protected def tryDecode(respValue: RespValue): A =
      respValue match {
        case RespValue.BulkString(bytes) =>
          codec.decode(bytes).fold(failure => throw CodecError(failure.message), decoded => decoded)
        case unexpected =>
          throw ProtocolError(s"$unexpected isn't a bulk string")
      }
  }

A
Anatoly Sergeev 已提交
199
  /**
   * Decodes an array reply of exactly two elements into a pair.
   *
   * @param _1 decoder for the first element
   * @param _2 decoder for the second element
   */
  final case class Tuple2Output[+A, +B](_1: Output[A], _2: Output[B]) extends Output[(A, B)] {
    protected def tryDecode(respValue: RespValue): (A, B) =
      respValue match {
        case RespValue.ArrayValues(first: RespValue, second: RespValue) =>
          val left  = _1.tryDecode(first)
          val right = _2.tryDecode(second)
          (left, right)
        case unexpected =>
          throw ProtocolError(s"$unexpected isn't a tuple2")
      }
  }

A
Anatoly Sergeev 已提交
207
  /**
   * Decodes an array reply of exactly three elements into a triple.
   *
   * @param _1 decoder for the first element
   * @param _2 decoder for the second element
   * @param _3 decoder for the third element
   */
  final case class Tuple3Output[+A, +B, +C](_1: Output[A], _2: Output[B], _3: Output[C]) extends Output[(A, B, C)] {
    protected def tryDecode(respValue: RespValue): (A, B, C) =
      respValue match {
        case RespValue.ArrayValues(first: RespValue, second: RespValue, third: RespValue) =>
          val a = _1.tryDecode(first)
          val b = _2.tryDecode(second)
          val c = _3.tryDecode(third)
          (a, b, c)
        case unexpected =>
          throw ProtocolError(s"$unexpected isn't a tuple3")
      }
  }

A
Anatoly Sergeev 已提交
216
  /**
   * Decodes either a simple-string or a bulk-string reply into a `String`.
   *
   * A simple string is returned as is; a bulk string is converted with `asString`. Any other
   * reply is rejected with a `ProtocolError` whose message names both accepted kinds.
   */
  case object SingleOrMultiStringOutput extends Output[String] {
    protected def tryDecode(respValue: RespValue): String =
      respValue match {
        case RespValue.SimpleString(s)   => s
        case s @ RespValue.BulkString(_) => s.asString
        // Both string kinds are accepted above, so the message must not claim only bulk strings are valid.
        case other => throw ProtocolError(s"$other isn't a simple or bulk string")
      }
  }

  /**
   * Decodes a reply that may be a single bulk string or an array of elements.
   *
   * A null bulk string yields an empty chunk, a bulk string yields a one-element chunk, and an
   * array is decoded element by element. Anything else is rejected with a `ProtocolError`.
   *
   * @param output decoder applied to the bulk string or to each array element
   */
  final case class MultiStringChunkOutput[+A](output: Output[A]) extends Output[Chunk[A]] {
    protected def tryDecode(respValue: RespValue): Chunk[A] =
      respValue match {
        case RespValue.NullBulkString =>
          Chunk.empty
        case single @ RespValue.BulkString(_) =>
          val only = output.tryDecode(single)
          Chunk.single(only)
        case RespValue.Array(items) =>
          items.map(item => output.tryDecode(item))
        case unexpected =>
          throw ProtocolError(s"$unexpected isn't a string nor an array")
      }
  }

235
  /**
   * Maps a simple-string reply naming a data type to the matching `RedisType`.
   *
   * Recognised names are `string`, `list`, `set`, `zset`, `hash` and `stream`; any other reply
   * is rejected with a `ProtocolError`.
   */
  case object TypeOutput extends Output[RedisType] {
    protected def tryDecode(respValue: RespValue): RedisType = {
      val recognised: Option[RedisType] =
        respValue match {
          case RespValue.SimpleString(name) =>
            name match {
              case "string" => Some(RedisType.String)
              case "list"   => Some(RedisType.List)
              case "set"    => Some(RedisType.Set)
              case "zset"   => Some(RedisType.SortedSet)
              case "hash"   => Some(RedisType.Hash)
              case "stream" => Some(RedisType.Stream)
              case _        => None
            }
          case _ =>
            None
        }

      recognised.getOrElse(throw ProtocolError(s"$respValue isn't redis type."))
    }
  }

248
  /** Accepts only the simple string `OK`; every other reply is rejected with a `ProtocolError`. */
  case object UnitOutput extends Output[Unit] {
    protected def tryDecode(respValue: RespValue): Unit =
      respValue match {
        case RespValue.SimpleString("OK") => ()
        case unexpected                   => throw ProtocolError(s"$unexpected isn't unit.")
      }
  }
D
Dejan Mijić 已提交
255

256
  /** Accepts only the simple string `RESET`; every other reply is rejected with a `ProtocolError`. */
  case object ResetOutput extends Output[Unit] {
    protected def tryDecode(respValue: RespValue): Unit =
      respValue match {
        case RespValue.SimpleString("RESET") => ()
        case unexpected                      => throw ProtocolError(s"$unexpected isn't unit.")
      }
  }

264
  /**
   * Decodes an array of optional longitude/latitude pairs.
   *
   * A null array yields an empty chunk. Inside the array, a null array element yields `None`
   * and a two-element array of bulk strings yields `Some(LongLat(...))`, both coordinates being
   * parsed with `decodeDouble`. Any other shape is rejected with a `ProtocolError`.
   */
  case object GeoOutput extends Output[Chunk[Option[LongLat]]] {
    protected def tryDecode(respValue: RespValue): Chunk[Option[LongLat]] =
      respValue match {
        case RespValue.NullArray       => Chunk.empty
        case RespValue.Array(elements) => elements.map(decodePosition)
        case unexpected                => throw ProtocolError(s"$unexpected isn't geo output")
      }

    /** Decodes a single element of the reply: either "no position" or a coordinate pair. */
    private def decodePosition(element: RespValue): Option[LongLat] =
      element match {
        case RespValue.NullArray =>
          None
        case RespValue.ArrayValues(RespValue.BulkString(longitude), RespValue.BulkString(latitude)) =>
          Some(LongLat(decodeDouble(longitude), decodeDouble(latitude)))
        case unexpected =>
          throw ProtocolError(s"$unexpected was not a longitude,latitude pair")
      }
  }

282
  /**
   * Decodes an array reply into a chunk of `GeoView`s.
   *
   * Each element is either a bare bulk string (the member name, with no extra information) or
   * an array whose first item is the member name followed by optional extras. The extras are
   * told apart by their shape rather than their position:
   *  - the first bulk string is parsed as the distance (via `decodeDouble`),
   *  - the first integer is taken as the hash,
   *  - the first two-element array of bulk strings is parsed as the longitude/latitude pair.
   *
   * An element of any other shape, or a reply that is not an array, is rejected with a
   * `ProtocolError`.
   */
  case object GeoRadiusOutput extends Output[Chunk[GeoView]] {
    protected def tryDecode(respValue: RespValue): Chunk[GeoView] =
      respValue match {
        case RespValue.Array(elements) =>
          elements.map {
            case s @ RespValue.BulkString(_) =>
              GeoView(s.asString, None, None, None)
            case RespValue.ArrayValues(name @ RespValue.BulkString(_), infos @ _*) =>
              val distance = infos.collectFirst { case RespValue.BulkString(bytes) => decodeDouble(bytes) }

              val hash = infos.collectFirst { case RespValue.Integer(i) => i }

              val position = infos.collectFirst {
                case RespValue.ArrayValues(RespValue.BulkString(long), RespValue.BulkString(lat)) =>
                  LongLat(decodeDouble(long), decodeDouble(lat))
              }

              GeoView(name.asString, distance, hash, position)

            // Message previously misspelled "radius" as "radious".
            case other => throw ProtocolError(s"$other is not a geo radius output")
          }

        case other => throw ProtocolError(s"$other is not an array")
      }
  }

A
Anatoly Sergeev 已提交
308
  /**
   * Decodes a flat array reply `[k0, v0, k1, v1, ...]` into a `Map`.
   *
   * A null array yields an empty map. Keys and values are decoded with `unsafeDecode`, so an
   * error reply nested inside the array is thrown as a Redis error. An array with an odd number
   * of elements, or a reply that is not an array, is rejected with a `ProtocolError`.
   *
   * @param outK decoder for the keys (even positions)
   * @param outV decoder for the values (odd positions)
   */
  final case class KeyValueOutput[K, V](outK: Output[K], outV: Output[V]) extends Output[Map[K, V]] {
    protected def tryDecode(respValue: RespValue): Map[K, V] =
      respValue match {
        case RespValue.NullArray =>
          Map.empty[K, V]
        case array @ RespValue.Array(elements) =>
          if (elements.length % 2 != 0)
            throw ProtocolError(s"$array doesn't have an even number of elements")

          // Entries are accumulated in a mutable map and frozen once the whole reply is read.
          val decoded = collection.mutable.Map.empty[K, V]
          for (keyIndex <- 0 until elements.length by 2) {
            val key   = outK.unsafeDecode(elements(keyIndex))
            val value = outV.unsafeDecode(elements(keyIndex + 1))
            decoded.update(key, value)
          }
          decoded.toMap
        case unexpected =>
          throw ProtocolError(s"$unexpected isn't an array")
      }
  }
A
Aleksandar Novaković 已提交
332

333
  /**
   * Decodes a two-element array reply into a `StreamEntry`.
   *
   * The first element must be a bulk string and is decoded as the entry id with
   * `ArbitraryOutput[I]`; the second is decoded as a key/value map with `KeyValueOutput` over
   * `ArbitraryOutput[K]` and `ArbitraryOutput[V]`. Any other shape is rejected with a
   * `ProtocolError`.
   */
  final case class StreamEntryOutput[I: BinaryCodec, K: BinaryCodec, V: BinaryCodec]()(implicit
    idSchema: Schema[I],
    keySchema: Schema[K],
    valueSchema: Schema[V]
  ) extends Output[StreamEntry[I, K, V]] {
    protected def tryDecode(respValue: RespValue): StreamEntry[I, K, V] =
      respValue match {
        case RespValue.Array(Seq(rawId @ RespValue.BulkString(_), rawFields)) =>
          val idOutput     = ArbitraryOutput[I]()
          val fieldsOutput = KeyValueOutput(ArbitraryOutput[K](), ArbitraryOutput[V]())
          val entryId      = idOutput.unsafeDecode(rawId)
          val fields       = fieldsOutput.unsafeDecode(rawFields)
          StreamEntry(entryId, fields)
        case unexpected =>
          throw ProtocolError(s"$unexpected isn't a valid array")
      }
  }

349
  /** Decodes an array reply into a chunk of stream entries by combining `ChunkOutput` and `StreamEntryOutput`. */
  final case class StreamEntriesOutput[I: BinaryCodec, K: BinaryCodec, V: BinaryCodec]()(implicit
    idSchema: Schema[I],
    keySchema: Schema[K],
    valueSchema: Schema[V]
  ) extends Output[Chunk[StreamEntry[I, K, V]]] {
    protected def tryDecode(respValue: RespValue): Chunk[StreamEntry[I, K, V]] = {
      val entriesOutput = ChunkOutput(StreamEntryOutput[I, K, V]())
      entriesOutput.unsafeDecode(respValue)
    }
  }

358
  /**
   * Decodes a two-element array reply into a `StreamChunk`.
   *
   * The first element is decoded as the name with `ArbitraryOutput[N]`, the second as the list
   * of entries with `StreamEntriesOutput`.
   */
  final case class StreamOutput[N: BinaryCodec, I: BinaryCodec, K: BinaryCodec, V: BinaryCodec]()(implicit
    nameSchema: Schema[N],
    idSchema: Schema[I],
    keySchema: Schema[K],
    valueSchema: Schema[V]
  ) extends Output[StreamChunk[N, I, K, V]] {
    protected def tryDecode(respValue: RespValue): StreamChunk[N, I, K, V] =
      Tuple2Output(ArbitraryOutput[N](), StreamEntriesOutput[I, K, V]()).unsafeDecode(respValue) match {
        case (name, entries) => StreamChunk(name, entries)
      }
  }

370
  /**
   * Decodes an array of consumer-group descriptions into `StreamGroupsInfo` values.
   *
   * A null array yields an empty chunk. Every element must itself be an array of alternating
   * field names and values; fields are looked up by name, never by position (see
   * https://redis.io/commands/xinfo), and unknown fields are ignored.
   */
  case object StreamGroupsInfoOutput extends Output[Chunk[StreamGroupsInfo]] {
    protected def tryDecode(respValue: RespValue): Chunk[StreamGroupsInfo] =
      respValue match {
        case RespValue.NullArray       => Chunk.empty
        case RespValue.Array(messages) => messages.map(decodeGroup)
        case unexpected                => throw ProtocolError(s"$unexpected isn't an array")
      }

    /** Folds the name/value pairs of one group description into a `StreamGroupsInfo`. */
    private def decodeGroup(message: RespValue): StreamGroupsInfo =
      message match {
        case array @ RespValue.Array(elements) =>
          if (elements.length % 2 != 0)
            throw ProtocolError(s"$array doesn't have an even number of elements")

          val blank: StreamGroupsInfo = StreamGroupsInfo.empty
          (0 until elements.length by 2).foldLeft(blank) { (info, pos) =>
            (elements(pos), elements(pos + 1)) match {
              case (key @ RespValue.BulkString(_), value @ RespValue.BulkString(_)) =>
                key.asString match {
                  case XInfoFields.Name            => info.copy(name = value.asString)
                  case XInfoFields.LastDeliveredId => info.copy(lastDeliveredId = value.asString)
                  case _                           => info
                }
              case (key @ RespValue.BulkString(_), value @ RespValue.Integer(_)) =>
                key.asString match {
                  case XInfoFields.Pending   => info.copy(pending = value.value)
                  case XInfoFields.Consumers => info.copy(consumers = value.value)
                  case _                     => info
                }
              case _ =>
                info
            }
          }
        case unexpected =>
          throw ProtocolError(s"$unexpected isn't an array")
      }
  }

409
  /**
   * Decodes an array of consumer descriptions into `StreamConsumersInfo` values.
   *
   * A null array yields an empty chunk. Every element must itself be an array of alternating
   * field names and values; fields are looked up by name, never by position (see
   * https://redis.io/commands/xinfo), and unknown fields are ignored.
   */
  case object StreamConsumersInfoOutput extends Output[Chunk[StreamConsumersInfo]] {
    protected def tryDecode(respValue: RespValue): Chunk[StreamConsumersInfo] =
      respValue match {
        case RespValue.NullArray       => Chunk.empty
        case RespValue.Array(messages) => messages.map(decodeConsumer)
        case unexpected                => throw ProtocolError(s"$unexpected isn't an array")
      }

    /** Folds the name/value pairs of one consumer description into a `StreamConsumersInfo`. */
    private def decodeConsumer(message: RespValue): StreamConsumersInfo =
      message match {
        case array @ RespValue.Array(elements) =>
          if (elements.length % 2 != 0)
            throw ProtocolError(s"$array doesn't have an even number of elements")

          val blank: StreamConsumersInfo = StreamConsumersInfo.empty
          (0 until elements.length by 2).foldLeft(blank) { (info, pos) =>
            (elements(pos), elements(pos + 1)) match {
              case (key @ RespValue.BulkString(_), value @ RespValue.BulkString(_))
                  if key.asString == XInfoFields.Name =>
                info.copy(name = value.asString)
              case (key @ RespValue.BulkString(_), value @ RespValue.Integer(_)) =>
                val field = key.asString
                if (field == XInfoFields.Pending) info.copy(pending = value.value)
                else if (field == XInfoFields.Idle) info.copy(idle = value.value.millis)
                else info
              case _ =>
                info
            }
          }
        case unexpected =>
          throw ProtocolError(s"$unexpected isn't an array")
      }
  }

446
  /**
   * Decodes an array of alternating field names and values into a
   * `StreamInfoWithFull.FullStreamInfo`.
   *
   * Fields are looked up by name, never by position (see https://redis.io/commands/xinfo), and
   * unknown fields are ignored. An array with an odd number of elements, or a reply that is not
   * an array, is rejected with a `ProtocolError`.
   */
  final case class StreamInfoFullOutput[I: Schema: BinaryCodec, K: Schema: BinaryCodec, V: Schema: BinaryCodec]()
      extends Output[StreamInfoWithFull.FullStreamInfo[I, K, V]] {
    protected def tryDecode(
      respValue: RespValue
    ): StreamInfoWithFull.FullStreamInfo[I, K, V] =
      respValue match {
        case array @ RespValue.Array(elements) =>
          if (elements.length % 2 != 0)
            throw ProtocolError(s"$array doesn't have an even number of elements")

          val blank: StreamInfoWithFull.FullStreamInfo[I, K, V] = StreamInfoWithFull.FullStreamInfo.empty
          (0 until elements.length by 2).foldLeft(blank) { (info, pos) =>
            (elements(pos), elements(pos + 1)) match {
              // Scalar properties of the stream itself.
              case (key @ RespValue.BulkString(_), value @ RespValue.Integer(_)) =>
                key.asString match {
                  case XInfoFields.Length         => info.copy(length = value.value)
                  case XInfoFields.RadixTreeNodes => info.copy(radixTreeNodes = value.value)
                  case XInfoFields.RadixTreeKeys  => info.copy(radixTreeKeys = value.value)
                  case _                          => info
                }
              case (key @ RespValue.BulkString(_), value @ RespValue.BulkString(_))
                  if key.asString == XInfoFields.LastGeneratedId =>
                info.copy(lastGeneratedId = value.asString)
              case (key @ RespValue.BulkString(_), value) if key.asString == XInfoFields.Entries =>
                info.copy(entries = StreamEntriesOutput[I, K, V]().unsafeDecode(value))
              // Consumer groups attached to the stream.
              case (key @ RespValue.BulkString(_), RespValue.Array(values)) if key.asString == XInfoFields.Groups =>
                info.copy(groups = values.map(extractXInfoFullGroup))
              case _ =>
                info
            }
          }
        case unexpected =>
          throw ProtocolError(s"$unexpected isn't an array")
      }
  }

  /**
   * Decodes one consumer-group description (an array of alternating field names and values)
   * into a `StreamInfoWithFull.ConsumerGroups`.
   *
   * Unknown fields are ignored. An array with an odd number of elements, a reply that is not an
   * array, or a malformed pending-list entry is rejected with a `ProtocolError`.
   */
  private def extractXInfoFullGroup(group: RespValue): StreamInfoWithFull.ConsumerGroups =
    group match {
      case array @ RespValue.Array(groupElements) =>
        if (groupElements.length % 2 != 0)
          throw ProtocolError(s"$array doesn't have an even number of elements")

        (0 until groupElements.length by 2).foldLeft(StreamInfoWithFull.ConsumerGroups.empty) { (acc, pos) =>
          (groupElements(pos), groupElements(pos + 1)) match {
            // Scalar properties of the group.
            case (key @ RespValue.BulkString(_), value @ RespValue.BulkString(_)) =>
              key.asString match {
                case XInfoFields.LastDeliveredId => acc.copy(lastDeliveredId = value.asString)
                case XInfoFields.Name            => acc.copy(name = value.asString)
                case _                           => acc
              }
            case (key @ RespValue.BulkString(_), value @ RespValue.Integer(_))
                if XInfoFields.PelCount == key.asString =>
              acc.copy(pelCount = value.value)
            case (key @ RespValue.BulkString(_), RespValue.Array(values)) =>
              key.asString match {
                // Consumers that belong to the group.
                case XInfoFields.Consumers =>
                  acc.copy(consumers = values.map(extractXInfoFullConsumer))
                // Pending-entries list of the group: id, consumer name, delivery time and count.
                case XInfoFields.Pending =>
                  val pendingEntries = values.map {
                    case RespValue.Array(
                          Seq(
                            entryId @ RespValue.BulkString(_),
                            consumerName @ RespValue.BulkString(_),
                            deliveryTime @ RespValue.Integer(_),
                            deliveryCount @ RespValue.Integer(_)
                          )
                        ) =>
                      StreamInfoWithFull.GroupPel(
                        entryId.asString,
                        consumerName.asString,
                        deliveryTime.value.millis,
                        deliveryCount.value
                      )
                    case unexpected => throw ProtocolError(s"$unexpected isn't a valid array")
                  }
                  acc.copy(pending = pendingEntries)
                case _ =>
                  acc
              }
            case _ =>
              acc
          }
        }
      case unexpected =>
        throw ProtocolError(s"$unexpected isn't an array")
    }

  /**
   * Decodes one consumer description (an array of alternating field names and values) into a
   * `StreamInfoWithFull.Consumers`.
   *
   * Unknown fields are ignored. An array with an odd number of elements, a reply that is not an
   * array, or a malformed pending-list entry is rejected with a `ProtocolError`.
   */
  private def extractXInfoFullConsumer(consumer: RespValue): StreamInfoWithFull.Consumers =
    consumer match {
      case array @ RespValue.Array(consumerElements) =>
        if (consumerElements.length % 2 != 0)
          throw ProtocolError(s"$array doesn't have an even number of elements")

        (0 until consumerElements.length by 2).foldLeft(StreamInfoWithFull.Consumers.empty) { (acc, pos) =>
          (consumerElements(pos), consumerElements(pos + 1)) match {
            // Scalar properties of the consumer.
            case (key @ RespValue.BulkString(_), value @ RespValue.BulkString(_)) if key.asString == XInfoFields.Name =>
              acc.copy(name = value.asString)
            case (key @ RespValue.BulkString(_), value @ RespValue.Integer(_)) =>
              key.asString match {
                case XInfoFields.PelCount => acc.copy(pelCount = value.value)
                case XInfoFields.SeenTime => acc.copy(seenTime = value.value.millis)
                case _                    => acc
              }
            // Pending-entries list of the consumer: id, delivery time and delivery count.
            case (key @ RespValue.BulkString(_), RespValue.Array(values)) if key.asString == XInfoFields.Pending =>
              val pendingEntries = values.map {
                case RespValue.Array(
                      Seq(
                        entryId @ RespValue.BulkString(_),
                        deliveryTime @ RespValue.Integer(_),
                        deliveryCount @ RespValue.Integer(_)
                      )
                    ) =>
                  StreamInfoWithFull.ConsumerPel(entryId.asString, deliveryTime.value.millis, deliveryCount.value)
                case unexpected => throw ProtocolError(s"$unexpected isn't a valid array")
              }
              acc.copy(pending = pendingEntries)
            case _ =>
              acc
          }
        }
      case unexpected =>
        throw ProtocolError(s"$unexpected isn't an array")
    }

585 586
  /**
   * Decodes an array of alternating field names and values into a `StreamInfo`.
   *
   * Fields are looked up by name, never by position (see https://redis.io/commands/xinfo), and
   * unknown fields are ignored. An array with an odd number of elements, or a reply that is not
   * an array, is rejected with a `ProtocolError`.
   */
  final case class StreamInfoOutput[I: Schema: BinaryCodec, K: Schema: BinaryCodec, V: Schema: BinaryCodec]()
      extends Output[StreamInfo[I, K, V]] {
    protected def tryDecode(respValue: RespValue): StreamInfo[I, K, V] =
      respValue match {
        case array @ RespValue.Array(elements) =>
          if (elements.length % 2 != 0)
            throw ProtocolError(s"$array doesn't have an even number of elements")

          val blank: StreamInfo[I, K, V] = StreamInfo.empty
          (0 until elements.length by 2).foldLeft(blank) { (info, pos) =>
            (elements(pos), elements(pos + 1)) match {
              // Integer-valued properties.
              case (key @ RespValue.BulkString(_), value @ RespValue.Integer(_)) =>
                val field = key.asString
                if (field == XInfoFields.Length) info.copy(length = value.value)
                else if (field == XInfoFields.RadixTreeNodes) info.copy(radixTreeNodes = value.value)
                else if (field == XInfoFields.RadixTreeKeys) info.copy(radixTreeKeys = value.value)
                else if (field == XInfoFields.Groups) info.copy(groups = value.value)
                else info
              case (key @ RespValue.BulkString(_), value @ RespValue.BulkString(_))
                  if key.asString == XInfoFields.LastGeneratedId =>
                info.copy(lastGeneratedId = value.asString)
              // First and last entries are nested arrays decoded as stream entries.
              case (key @ RespValue.BulkString(_), value @ RespValue.Array(_)) =>
                val field = key.asString
                if (field == XInfoFields.FirstEntry)
                  info.copy(firstEntry = Some(StreamEntryOutput[I, K, V]().unsafeDecode(value)))
                else if (field == XInfoFields.LastEntry)
                  info.copy(lastEntry = Some(StreamEntryOutput[I, K, V]().unsafeDecode(value)))
                else info
              case _ =>
                info
            }
          }
        case unexpected =>
          throw ProtocolError(s"$unexpected isn't an array")
      }
  }
626 627

  /**
   * Decodes a four-element array reply into a [[PendingInfo]].
   *
   * The elements are, in order: an integer total, two optional string values, and either a null array
   * or an array of two-element `[consumer, count]` arrays whose members are both bulk strings.
   */
  case object XPendingOutput extends Output[PendingInfo] {
    protected def tryDecode(respValue: RespValue): PendingInfo =
      respValue match {
        case RespValue.Array(Seq(RespValue.Integer(total), f, l, ps)) =>
          val optionalString = OptionalOutput(MultiStringOutput)
          val first          = optionalString.unsafeDecode(f)
          val last           = optionalString.unsafeDecode(l)

          // A null array stands for "no consumers" and is treated as an empty list.
          val entries: Chunk[RespValue] = ps match {
            case RespValue.NullArray   => Chunk.empty
            case RespValue.Array(list) => list
            case other                 => throw ProtocolError(s"$other isn't an array")
          }

          val perConsumer = entries.map {
            case RespValue.Array(Seq(name @ RespValue.BulkString(_), count @ RespValue.BulkString(_))) =>
              name.asString -> count.asLong
            case _ =>
              throw ProtocolError(s"Consumers doesn't have 2 elements")
          }.toMap

          PendingInfo(total, first, last, perConsumer)

        case array @ RespValue.Array(_) =>
          throw ProtocolError(s"$array doesn't have valid format")

        case other =>
          throw ProtocolError(s"$other isn't an array")
      }
  }

  /**
   * Decodes an array of four-element arrays into a chunk of [[PendingMessage]].
   *
   * Each inner array must hold two bulk strings followed by two integers; the first integer is
   * converted with `.millis`. Any other element shape fails with a `ProtocolError`.
   */
  case object PendingMessagesOutput extends Output[Chunk[PendingMessage]] {
    protected def tryDecode(respValue: RespValue): Chunk[PendingMessage] =
      respValue match {
        case RespValue.Array(messages) => messages.map(decodeMessage)
        case other                     => throw ProtocolError(s"$other isn't an array")
      }

    /** Decodes a single entry of the reply. */
    private def decodeMessage(message: RespValue): PendingMessage =
      message match {
        case RespValue.Array(
              Seq(
                id @ RespValue.BulkString(_),
                owner @ RespValue.BulkString(_),
                RespValue.Integer(lastDelivered),
                RespValue.Integer(counter)
              )
            ) =>
          PendingMessage(id.asString, owner.asString, lastDelivered.millis, counter)
        case other =>
          throw ProtocolError(s"$other isn't an array with four elements")
      }
  }

  /**
   * Decodes a reply into a success flag: any simple string yields `true`, a null bulk string yields
   * `false`, and everything else fails with a `ProtocolError`.
   */
  case object SetOutput extends Output[Boolean] {
    protected def tryDecode(respValue: RespValue): Boolean =
      respValue match {
        case RespValue.SimpleString(_) => true
        case RespValue.NullBulkString  => false
        case unexpected                => throw ProtocolError(s"$unexpected isn't a valid set response")
      }
  }
690

B
barthorre 已提交
691
  /**
   * Decodes an LCS reply into an [[LcsOutput]].
   *
   * Three reply shapes are accepted:
   *  - a bulk string, decoded as `LcsOutput.Lcs`;
   *  - an integer, decoded as `LcsOutput.Length`;
   *  - a four-element array `[bulk, matches, bulk, length]`, decoded as `LcsOutput.Matches`.
   *
   * Every malformed reply fails with a `ProtocolError`, including a match that carries fewer than two
   * index ranges.
   */
  case object StrAlgoLcsOutput extends Output[LcsOutput] {
    protected def tryDecode(respValue: RespValue): LcsOutput =
      respValue match {
        case result @ RespValue.BulkString(_) => LcsOutput.Lcs(result.asString)
        case RespValue.Integer(length)        => LcsOutput.Length(length)
        case RespValue.ArrayValues(
              RespValue.BulkString(_),
              RespValue.Array(items),
              RespValue.BulkString(_),
              RespValue.Integer(length)
            ) =>
          val matches = items.map {
            case RespValue.Array(array) =>
              // Nested arrays are index ranges; a bare integer, when present, is the match length.
              val matchIdxs = array.collect { case RespValue.Array(values) =>
                val idxs = values.map {
                  case RespValue.Integer(value) => value
                  case other                    => throw ProtocolError(s"$other isn't a valid response")
                }
                if (idxs.size == 2)
                  MatchIdx(idxs.head, idxs(1))
                else throw ProtocolError(s"Response contains illegal number of indices for a match: ${idxs.size}")
              }
              val matchLength = array.collectFirst { case RespValue.Integer(value) => value }
              // Guard the positional access below so a truncated reply surfaces as a ProtocolError
              // instead of an IndexOutOfBoundsException.
              if (matchIdxs.size < 2)
                throw ProtocolError(s"Response contains illegal number of ranges for a match: ${matchIdxs.size}")
              else
                Match(matchIdxs(0), matchIdxs(1), matchLength)
            case other => throw ProtocolError(s"$other isn't a valid response")
          }
          LcsOutput.Matches(matches.toList, length)
        case other => throw ProtocolError(s"$other isn't a valid response")
      }
  }

722
  /**
   * Decodes UTF-8 encoded bytes into a `Double`.
   *
   * Besides everything accepted by `String.toDouble`, the textual forms Redis uses for non-finite
   * values (`inf`, `+inf`, `-inf`, `nan`) are recognised, since `String.toDouble` rejects them.
   *
   * @param bytes the raw textual representation of the number
   * @return the decoded value
   * @throws ProtocolError if the text is not a valid double
   */
  private def decodeDouble(bytes: Chunk[Byte]): Double = {
    val text = new String(bytes.toArray, StandardCharsets.UTF_8)
    text match {
      case "inf" | "+inf" => Double.PositiveInfinity
      case "-inf"         => Double.NegativeInfinity
      case "nan"          => Double.NaN
      case _ =>
        try text.toDouble
        catch {
          case _: NumberFormatException => throw ProtocolError(s"'$text' isn't a double.")
        }
    }
  }
729

730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745
  /** Decodes a bulk string reply into a single [[ClientInfo]] via `ClientInfo.from`. */
  case object ClientInfoOutput extends Output[ClientInfo] {
    protected def tryDecode(respValue: RespValue): ClientInfo =
      respValue match {
        case RespValue.BulkString(bytes) =>
          val line = bytes.asString
          ClientInfo.from(line)
        case unexpected =>
          throw ProtocolError(s"$unexpected isn't a bulk string")
      }
  }

  /**
   * Decodes a bulk string reply holding one client description per line into a chunk of
   * [[ClientInfo]].
   *
   * Lines are split on LF with an optional preceding CR: Redis separates `CLIENT LIST` entries with a
   * bare LF, so splitting on CRLF only would leave all clients in a single line. Empty lines are
   * dropped.
   */
  case object ClientListOutput extends Output[Chunk[ClientInfo]] {
    protected def tryDecode(respValue: RespValue): Chunk[ClientInfo] =
      respValue match {
        case RespValue.BulkString(s) => ClientInfo.from(s.asString.split("\\r?\\n").filter(_.nonEmpty))
        case other                   => throw ProtocolError(s"$other isn't a bulk string")
      }
  }

746
  /**
   * Decodes a flat `field, value, field, value, ...` array reply into a [[ClientTrackingInfo]].
   *
   * The `flags`, `redirect` and `prefixes` fields are mandatory and are decoded in that order; a
   * missing field or a value of the wrong shape fails with a `ProtocolError`.
   */
  case object ClientTrackingInfoOutput extends Output[ClientTrackingInfo] {
    protected def tryDecode(respValue: RespValue): ClientTrackingInfo =
      respValue match {
        case RespValue.NullArray => throw ProtocolError("Array must not be empty")
        case RespValue.Array(values) if values.length % 2 == 0 =>
          val fields = values.toList
            .grouped(2)
            .map {
              case (name @ RespValue.BulkString(_)) :: value :: Nil => name.asString -> value
              case other                                            => throw ProtocolError(s"$other isn't a valid format")
            }
            .toMap

          val flags    = decodeFlags(fields.getOrElse("flags", throw ProtocolError("Missing flags field")))
          val redirect = decodeRedirect(fields.getOrElse("redirect", throw ProtocolError("Missing redirect field")))
          val prefixes = decodePrefixes(fields.getOrElse("prefixes", throw ProtocolError("Missing prefixes field")))

          ClientTrackingInfo(flags, redirect, prefixes)
        case array @ RespValue.Array(_) => throw ProtocolError(s"$array doesn't have an even number of elements")
        case other                      => throw ProtocolError(s"$other isn't an array")
      }

    /** Decodes every element as a bulk string and gathers the results into a set. */
    private def stringSet(values: Chunk[RespValue]): Set[String] =
      values.map {
        case bulk @ RespValue.BulkString(_) => bulk.asString
        case other                          => throw ProtocolError(s"$other isn't a string")
      }.toSet

    /** Decodes the value of the `flags` field. */
    private def decodeFlags(value: RespValue): ClientTrackingFlags =
      value match {
        case RespValue.Array(elements) =>
          val flags = stringSet(elements)

          // The first matching mode wins, checked in the order optin, optout, bcast.
          val mode =
            if (flags.contains("optin")) Some(ClientTrackingMode.OptIn)
            else if (flags.contains("optout")) Some(ClientTrackingMode.OptOut)
            else if (flags.contains("bcast")) Some(ClientTrackingMode.Broadcast)
            else None

          val caching =
            if (flags.contains("caching-yes")) Some(true)
            else if (flags.contains("caching-no")) Some(false)
            else None

          ClientTrackingFlags(
            flags.contains("on"),
            mode,
            flags.contains("noloop"),
            caching,
            flags.contains("broken_redirect")
          )
        case other => throw ProtocolError(s"$other isn't an array with elements")
      }

    /** Decodes the value of the `redirect` field. */
    private def decodeRedirect(value: RespValue): ClientTrackingRedirect =
      value match {
        case RespValue.Integer(-1L)           => ClientTrackingRedirect.NotEnabled
        case RespValue.Integer(0L)            => ClientTrackingRedirect.NotRedirected
        case RespValue.Integer(id) if id > 0L => ClientTrackingRedirect.RedirectedTo(id)
        case other                            => throw ProtocolError(s"$other isn't an integer >= -1")
      }

    /** Decodes the value of the `prefixes` field; a null array yields an empty set. */
    private def decodePrefixes(value: RespValue): Set[String] =
      value match {
        case RespValue.NullArray       => Set.empty[String]
        case RespValue.Array(elements) => stringSet(elements)
        case other                     => throw ProtocolError(s"$other isn't an array")
      }
  }

  /**
   * Decodes an integer reply into a [[ClientTrackingRedirect]]: `-1` is `NotEnabled`, `0` is
   * `NotRedirected` and any positive value is `RedirectedTo` that value. Anything else fails with a
   * `ProtocolError`.
   */
  case object ClientTrackingRedirectOutput extends Output[ClientTrackingRedirect] {
    protected def tryDecode(respValue: RespValue): ClientTrackingRedirect =
      respValue match {
        case RespValue.Integer(id) if id >= -1L =>
          if (id == -1L) ClientTrackingRedirect.NotEnabled
          else if (id == 0L) ClientTrackingRedirect.NotRedirected
          else ClientTrackingRedirect.RedirectedTo(id)
        case unexpected =>
          throw ProtocolError(s"$unexpected isn't an integer >= -1")
      }
  }
A
Anatoly Sergeev 已提交
819 820

  /**
   * Decodes an array reply into a cluster [[Partition]].
   *
   * The array must hold at least three elements: the start and end of the slot range, then the master
   * node. Every remaining element is decoded as an additional node. A null array, or an array with
   * fewer than three elements, fails with a `ProtocolError`.
   */
  case object ClusterPartitionOutput extends Output[Partition] {
    protected def tryDecode(respValue: RespValue): Partition =
      respValue match {
        case RespValue.NullArray => throw ProtocolError(s"Array must not be empty")
        // The length guard keeps the positional accesses below from throwing IndexOutOfBoundsException.
        case RespValue.Array(values) if values.length >= 3 =>
          val start  = LongOutput.unsafeDecode(values(0))
          val end    = LongOutput.unsafeDecode(values(1))
          val master = ClusterPartitionNodeOutput.unsafeDecode(values(2))
          val slaves = values.drop(3).map(ClusterPartitionNodeOutput.unsafeDecode)
          Partition(SlotRange(start, end), master, slaves)
        case array @ RespValue.Array(_) =>
          throw ProtocolError(s"$array doesn't have at least three elements")
        case other => throw ProtocolError(s"$other isn't an array")
      }
  }

  /**
   * Decodes an array reply into a cluster [[Node]].
   *
   * The first three elements are read as host, port and node id; any further elements are ignored.
   * A null array, or an array with fewer than three elements, fails with a `ProtocolError`.
   */
  case object ClusterPartitionNodeOutput extends Output[Node] {
    protected def tryDecode(respValue: RespValue): Node =
      respValue match {
        case RespValue.NullArray => throw ProtocolError(s"Array must not be empty")
        // The length guard keeps the positional accesses below from throwing IndexOutOfBoundsException.
        case RespValue.Array(values) if values.length >= 3 =>
          val host   = MultiStringOutput.unsafeDecode(values(0))
          val port   = LongOutput.unsafeDecode(values(1))
          val nodeId = MultiStringOutput.unsafeDecode(values(2))
          Node(nodeId, RedisUri(host, port.toInt))
        case array @ RespValue.Array(_) =>
          throw ProtocolError(s"$array doesn't have at least three elements")
        case other => throw ProtocolError(s"$other isn't an array")
      }
  }
D
Dejan Mijić 已提交
846
}