Skip to content

Commit

Permalink
Merge pull request #6 from chessman/handle-watch-errors
Browse files Browse the repository at this point in the history
parse error events in Watch API
  • Loading branch information
hagay3 authored Apr 22, 2021
2 parents 9cf1703 + 45dfdc7 commit 397fcf9
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 8 deletions.
2 changes: 2 additions & 0 deletions client/src/main/scala/skuber/api/client/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ package object client {
}
}

type WatchEventWrapper[T <: ObjectResource] = Either[Status, WatchEvent[T]]

// for use with the Watch command
case class WatchEvent[T <: ObjectResource](_type: EventType.Value, _object: T)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,18 @@ import scala.concurrent.ExecutionContext
*/
private[api] object BytesToWatchEventSource {
def apply[O <: ObjectResource](bytesSource: Source[ByteString, _], bufSize: Int)(implicit ec: ExecutionContext, format: Format[O]): Source[WatchEvent[O], _] = {
import skuber.json.format.apiobj.watchEventFormat
import skuber.json.format.apiobj.watchEventWrapperReads
bytesSource.via(
JsonFraming.objectScanner(bufSize)
).map { singleEventBytes =>
Json.parse(singleEventBytes.utf8String).validate(watchEventFormat[O]) match {
case JsSuccess(value, _) => value
case JsError(e) => throw new K8SException(Status(message = Some("Error parsing watched object"), details = Some(e.toString)))
Json.parse(singleEventBytes.utf8String).validate(watchEventWrapperReads[O]) match {
case JsSuccess(value, _) => value match {
case Left(status) => throw new K8SException(status)
case Right(watchEvent) => watchEvent
}
case JsError(e) =>
val details = s"error: $e event: ${singleEventBytes.utf8String}"
throw new K8SException(Status(message = Some("Error parsing watched object"), details = Some(details)))
}
}
}
Expand Down
12 changes: 8 additions & 4 deletions client/src/main/scala/skuber/json/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1159,10 +1159,14 @@ package object format {
(JsPath \ "code").readNullable[Int]
)(Status.apply _)

def watchEventFormat[T <: ObjectResource](implicit objfmt: Format[T]) : Format[WatchEvent[T]] = (
(JsPath \ "type").formatEnum(EventType) and
(JsPath \ "object").format[T]
)(WatchEvent.apply[T] _, unlift(WatchEvent.unapply[T]))
def watchEventWrapperReads[T <: ObjectResource](implicit objreads: Reads[T]) : Reads[WatchEventWrapper[T]] = (
(JsPath \ "type").formatEnum(EventType).flatMap { eventType =>
if (eventType == EventType.ERROR)
(JsPath \ "object").read[Status].map(status => Left(status))
else
(JsPath \ "object").read[T].map(obj => Right(WatchEvent[T](eventType, obj)))
}
)

}

Expand Down

0 comments on commit 397fcf9

Please sign in to comment.