Skip to content

Commit

Permalink
chore: Replace discovery reportError and healthCheck with spi
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Dec 16, 2024
1 parent 4365544 commit 0c68974
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 72 deletions.
72 changes: 3 additions & 69 deletions akka-javasdk/src/main/scala/akka/javasdk/impl/DiscoveryImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,9 @@ import kalix.protocol.action.Actions
import kalix.protocol.discovery._
import org.slf4j.LoggerFactory
import java.util
import java.util.Locale
import java.util.UUID

import scala.concurrent.Future
import scala.io.Source
import scala.jdk.CollectionConverters._

/**
Expand All @@ -36,8 +34,6 @@ class DiscoveryImpl(

private val log = LoggerFactory.getLogger(getClass)

private val userServiceLog = LoggerFactory.getLogger("akka.javasdk.ServiceLog")

private val applicationConfig = ApplicationConfig(system).getConfig

private val serviceIncarnationUuid = UUID.randomUUID().toString
Expand Down Expand Up @@ -133,73 +129,11 @@ class DiscoveryImpl(
}
}

/**
* Report an error back to the user function. This will only be invoked to tell the user function that it has done
* something wrong, eg, violated the protocol, tried to use an entity type that isn't supported, or attempted to
* forward to an entity that doesn't exist, etc. These messages should be logged clearly for debugging purposes.
*/
override def reportError(in: UserFunctionError): scala.concurrent.Future[com.google.protobuf.empty.Empty] = {
val sourceMsgs = in.sourceLocations.map { location =>
loadSource(location) match {
case None if location.startLine == 0 && location.startCol == 0 =>
s"At ${location.fileName}"
case None =>
s"At ${location.fileName}:${location.startLine + 1}:${location.startCol + 1}"
case Some(source) =>
s"At ${location.fileName}:${location.startLine + 1}:${location.startCol + 1}:${"\n"}$source"
}
}.toList
val severityString = in.severity.name.take(1) + in.severity.name.drop(1).toLowerCase(Locale.ROOT)
val message = s"$severityString reported from Akka runtime: ${in.code} ${in.message}"
val detail = if (in.detail.isEmpty) Nil else List(in.detail)
val seeDocs = DocLinks.forErrorCode(in.code).map(link => s"See documentation: $link").toList
val messages = message :: detail ::: seeDocs ::: sourceMsgs
val logMessage = messages.mkString("\n\n")

// ignoring waring for runtime version
// TODO: remove it once we remove this check in the runtime
if (in.code != "AK-00010") {
in.severity match {
case UserFunctionError.Severity.ERROR => userServiceLog.error(logMessage)
case UserFunctionError.Severity.WARNING => userServiceLog.warn(logMessage)
case UserFunctionError.Severity.INFO => userServiceLog.info(logMessage)
case UserFunctionError.Severity.UNSPECIFIED | UserFunctionError.Severity.Unrecognized(_) =>
userServiceLog.error(logMessage)
}
}

Future.successful(com.google.protobuf.empty.Empty.defaultInstance)
}
override def reportError(in: UserFunctionError): scala.concurrent.Future[com.google.protobuf.empty.Empty] =
throw new IllegalStateException("Unexpected call to Discovery.reportError, should use SpiComponents.reportError")

override def healthCheck(in: Empty): Future[HealthCheckResponse] =
Future.successful(HealthCheckResponse(serviceIncarnationUuid))

private def loadSource(location: UserFunctionError.SourceLocation): Option[String] =
if (location.endLine == 0 && location.endCol == 0) {
// It's been sent without line/col data
None
} else {
val resourceStream = getClass.getClassLoader.getResourceAsStream(location.fileName)
if (resourceStream != null) {
val lines = Source
.fromInputStream(resourceStream, "utf-8")
.getLines()
.slice(location.startLine, location.endLine + 1)
.take(6) // Don't render more than 6 lines, we don't want to fill the logs too much
.toList
if (lines.size > 1) {
Some(lines.mkString("\n"))
} else {
lines.headOption
.map { line =>
line + "\n" + line.take(location.startCol).map {
case '\t' => '\t'
case _ => ' '
} + "^"
}
}
} else None
}
throw new IllegalStateException("Unexpected call to Discovery.healthCheck, should use SpiComponents.healthCheck")

override def proxyTerminated(in: Empty): Future[Empty] =
Future.successful(Empty.defaultInstance)
Expand Down
31 changes: 28 additions & 3 deletions akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package akka.javasdk.impl
import java.lang.reflect.Constructor
import java.lang.reflect.InvocationTargetException
import java.lang.reflect.Method
import java.util.Locale
import java.util.concurrent.CompletionStage

import scala.annotation.nowarn
Expand Down Expand Up @@ -93,8 +94,19 @@ import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.Tracer
import io.opentelemetry.context.{ Context => OtelContext }
import kalix.protocol.discovery.Discovery
import org.slf4j.Logger
import org.slf4j.LoggerFactory

/**
* INTERNAL API
*/
@InternalApi
object SdkRunner {
val userServiceLog: Logger = LoggerFactory.getLogger("akka.javasdk.ServiceLog")

val FutureDone: Future[Done] = Future.successful(Done)
}

/**
* INTERNAL API
*/
Expand Down Expand Up @@ -646,11 +658,24 @@ private final class Sdk(
override def workflowDescriptors: Seq[WorkflowDescriptor] =
Sdk.this.workflowDescriptors

override def reportError(err: UserFunctionError): Future[Done] =
Future.successful(Done) // FIXME implemented in other PR
override def reportError(err: UserFunctionError): Future[Done] = {
val severityString = err.severity.name.take(1) + err.severity.name.drop(1).toLowerCase(Locale.ROOT)
val message = s"$severityString reported from Akka runtime: ${err.code} ${err.message}"
val detail = if (err.detail.isEmpty) Nil else List(err.detail)
val seeDocs = DocLinks.forErrorCode(err.code).map(link => s"See documentation: $link").toList
val messages = message :: detail ::: seeDocs
val logMessage = messages.mkString("\n\n")

// ignoring waring for runtime version
// TODO: remove it once we remove this check in the runtime
if (err.code != "AK-00010") {
SdkRunner.userServiceLog.atLevel(err.severity).log(logMessage)
}
SdkRunner.FutureDone
}

override def healthCheck(): Future[Done] =
Future.successful(Done) // FIXME implemented in other PR
SdkRunner.FutureDone
}
}

Expand Down

0 comments on commit 0c68974

Please sign in to comment.