Skip to content

Commit

Permalink
Add support for accessing pod logs (#146)
Browse files Browse the repository at this point in the history
  • Loading branch information
doriordan authored May 18, 2018
1 parent bfd29f8 commit de6179e
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 11 deletions.
8 changes: 4 additions & 4 deletions client/src/it/scala/skuber/PodSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import skuber.json.format._

import scala.concurrent.duration._
import scala.concurrent.Await
import scala.util.{Success,Failure}
import scala.util.{Failure, Success, Try}

import akka.event.Logging
import akka.stream.scaladsl._
import akka.util.ByteString

import akka.stream.scaladsl.Sink

import scala.concurrent.{Future}
import scala.concurrent.Future

class PodSpec extends K8SFixture with Eventually with Matchers {
val nginxPodName: String = java.util.UUID.randomUUID().toString
Expand Down
1 change: 1 addition & 0 deletions client/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
skuber {

akka {
# The ID of the dispatcher to use by Skuber. If undefined or empty the default Akka dispatcher is used.
dispatcher = ""
Expand Down
25 changes: 25 additions & 0 deletions client/src/main/scala/skuber/Pod.scala
Original file line number Diff line number Diff line change
Expand Up @@ -230,4 +230,29 @@ object Pod {
override val name: String = "PreferNoSchedule"
}
}

case class LogQueryParams(
containerName: Option[String] = None,
follow: Option[Boolean] = None,
limitBytes: Option[Int] = None,
pretty: Option[Boolean] = None,
previous: Option[Boolean] = None,
sinceSeconds: Option[Int] = None,
tailLines: Option[Int] = None,
timestamps: Option[Boolean] = None)
{
lazy val asOptionalsMap: Map[String, Option[String]] = Map(
"containerName" -> containerName,
"follow" -> follow.map(_.toString),
"limitBytes" -> limitBytes.map(_.toString),
"pretty" -> pretty.map(_.toString),
"previous" -> previous.map(_.toString),
"sinceSeconds" -> sinceSeconds.map(_.toString),
"tailLines" -> tailLines.map(_.toString),
"timestamps" -> timestamps.map(_.toString))

lazy val asMap: Map[String, String] = asOptionalsMap.collect {
case (key, Some(value)) => key -> value
}
}
}
19 changes: 18 additions & 1 deletion client/src/main/scala/skuber/api/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,23 @@ package object client {
} yield ()
}

def getPodLogSource(name: String, queryParams: Pod.LogQueryParams, namespace: Option[String] = None)(
implicit lc: LoggingContext=RequestLoggingContext()): Future[Source[ByteString, _]] =
{
val targetNamespace=namespace.getOrElse(this.namespaceName)
val queryMap=queryParams.asMap
val query: Option[Uri.Query] = if (queryMap.isEmpty) {
None
} else {
Some(Uri.Query(queryMap))
}
val nameComponent=s"${name}/log"
val rd = implicitly[ResourceDefinition[Pod]]
val request=buildRequest(HttpMethods.GET, rd, Some(nameComponent), query, false, targetNamespace)
invoke(request).map { response =>
response.entity.dataBytes
}
}

def watch[O <: ObjectResource](obj: O)(
implicit fmt: Format[O], rd: ResourceDefinition[O]): Future[Source[WatchEvent[O], _]] =
Expand Down Expand Up @@ -748,7 +765,7 @@ package object client {
new RequestContext(requestMaker, requestInvoker, k8sContext.cluster.server, k8sContext.authInfo, theNamespaceName, logConfig, closeHook)
}

private def defaultK8sConfig: Configuration = {
def defaultK8sConfig: Configuration = {
import java.nio.file.Paths

val skuberUrlOverride = sys.env.get("SKUBER_URL")
Expand Down
2 changes: 1 addition & 1 deletion examples/application.conf
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = "DEBUG"
loglevel = "OFF"
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
actor {
debug {
Expand Down
4 changes: 2 additions & 2 deletions examples/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
</appender>

<logger name="skuber.api" level="INFO"/>
<logger name="akka" level="DEBUG" />
<logger name="akka" level="ERROR" />

<root level="DEBUG">
<root level="ERROR">
<appender-ref ref="STDOUT" />
</root>
</configuration>
3 changes: 2 additions & 1 deletion examples/src/main/resources/application.conf
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = "INFO"
stdout-loglevel = "OFF"
loglevel = "OFF"
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
actor {
debug {
Expand Down
4 changes: 2 additions & 2 deletions examples/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
</appender>

<logger name="skuber.api" level="INFO"/>
<logger name="akka" level="INFO" />
<logger name="akka" level="ERROR"/>

<root level="INFO">
<root level="ERROR">
<appender-ref ref="STDOUT" />
</root>
</configuration>
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package skuber.examples.podlogs

import akka.NotUsed
import skuber._
import skuber.json.format._
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.util.ByteString
import skuber.api.client

import scala.concurrent.Await
import scala.concurrent.duration._

/**
* @author David O'Riordan
*
* Demonstrate streaming of pod logs, in this example the log is just a couple of short line printed to stdout
*/
object PodLogExample extends App {

val printLogFlow: Sink[ByteString, NotUsed] = Flow[ByteString]
.via(Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 256,
allowTruncation = true))
.map(_.utf8String)
.to(Sink.foreach(text => println(s"[hello-world logs] $text")))


implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val dispatcher = system.dispatcher
val k8s = client.init(
client.defaultK8sConfig.currentContext,
client.LoggingConfig(logRequestBasic = false, logResponseBasic = false) )

val helloWorldContainer=Container(name="hello-world", image="busybox", command=List("sh", "-c", "echo Hello World! && echo Goodbye World && sleep 60"))
val helloWorldPod=Pod("hello-world", Pod.Spec().addContainer(helloWorldContainer))

val podFut = k8s.create(helloWorldPod)
println("Waiting 30 seconds to allow pod initialisation to complete before getting logs...")
Thread.sleep(30000)
for {
pod <- podFut
logsSource <- k8s.getPodLogSource("hello-world", Pod.LogQueryParams())
donePrinting = logsSource.runWith(printLogFlow)
} yield donePrinting

// allow another 5 seconds for logs to be streamed from the pod to stdout before cleaning up
Thread.sleep(5000)
Await.result(k8s.delete[Pod]("hello-world"), 5 seconds)
k8s.close
system.terminate
System.exit(0)
}

0 comments on commit de6179e

Please sign in to comment.