diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/DiscoveryImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/DiscoveryImpl.scala index 3069e7cd2..8f42d6e7e 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/DiscoveryImpl.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/DiscoveryImpl.scala @@ -14,11 +14,9 @@ import kalix.protocol.action.Actions import kalix.protocol.discovery._ import org.slf4j.LoggerFactory import java.util -import java.util.Locale import java.util.UUID import scala.concurrent.Future -import scala.io.Source import scala.jdk.CollectionConverters._ /** @@ -36,8 +34,6 @@ class DiscoveryImpl( private val log = LoggerFactory.getLogger(getClass) - private val userServiceLog = LoggerFactory.getLogger("akka.javasdk.ServiceLog") - private val applicationConfig = ApplicationConfig(system).getConfig private val serviceIncarnationUuid = UUID.randomUUID().toString @@ -133,73 +129,11 @@ class DiscoveryImpl( } } - /** - * Report an error back to the user function. This will only be invoked to tell the user function that it has done - * something wrong, eg, violated the protocol, tried to use an entity type that isn't supported, or attempted to - * forward to an entity that doesn't exist, etc. These messages should be logged clearly for debugging purposes. - */ - override def reportError(in: UserFunctionError): scala.concurrent.Future[com.google.protobuf.empty.Empty] = { - val sourceMsgs = in.sourceLocations.map { location => - loadSource(location) match { - case None if location.startLine == 0 && location.startCol == 0 => - s"At ${location.fileName}" - case None => - s"At ${location.fileName}:${location.startLine + 1}:${location.startCol + 1}" - case Some(source) => - s"At ${location.fileName}:${location.startLine + 1}:${location.startCol + 1}:${"\n"}$source" - } - }.toList - val severityString = in.severity.name.take(1) + in.severity.name.drop(1).toLowerCase(Locale.ROOT) - val message = s"$severityString reported from Akka runtime: ${in.code} ${in.message}" - val detail = if (in.detail.isEmpty) Nil else List(in.detail) - val seeDocs = DocLinks.forErrorCode(in.code).map(link => s"See documentation: $link").toList - val messages = message :: detail ::: seeDocs ::: sourceMsgs - val logMessage = messages.mkString("\n\n") - - // ignoring waring for runtime version - // TODO: remove it once we remove this check in the runtime - if (in.code != "AK-00010") { - in.severity match { - case UserFunctionError.Severity.ERROR => userServiceLog.error(logMessage) - case UserFunctionError.Severity.WARNING => userServiceLog.warn(logMessage) - case UserFunctionError.Severity.INFO => userServiceLog.info(logMessage) - case UserFunctionError.Severity.UNSPECIFIED | UserFunctionError.Severity.Unrecognized(_) => - userServiceLog.error(logMessage) - } - } - - Future.successful(com.google.protobuf.empty.Empty.defaultInstance) - } + override def reportError(in: UserFunctionError): scala.concurrent.Future[com.google.protobuf.empty.Empty] = + throw new IllegalStateException("Unexpected call to Discovery.reportError, should use SpiComponents.reportError") override def healthCheck(in: Empty): Future[HealthCheckResponse] = - Future.successful(HealthCheckResponse(serviceIncarnationUuid)) - - private def loadSource(location: UserFunctionError.SourceLocation): Option[String] = - if (location.endLine == 0 && location.endCol == 0) { - // It's been sent without line/col data - None - } else { - val resourceStream = getClass.getClassLoader.getResourceAsStream(location.fileName) - if (resourceStream != null) { - val lines = Source - .fromInputStream(resourceStream, "utf-8") - .getLines() - .slice(location.startLine, location.endLine + 1) - .take(6) // Don't render more than 6 lines, we don't want to fill the logs too much - .toList - if (lines.size > 1) { - Some(lines.mkString("\n")) - } else { - lines.headOption - .map { line => - line + "\n" + line.take(location.startCol).map { - case '\t' => '\t' - case _ => ' ' - } + "^" - } - } - } else None - } + throw new IllegalStateException("Unexpected call to Discovery.healthCheck, should use SpiComponents.healthCheck") override def proxyTerminated(in: Empty): Future[Empty] = Future.successful(Empty.defaultInstance) diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala index 13235dc9d..1c6998f29 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala @@ -7,6 +7,7 @@ package akka.javasdk.impl import java.lang.reflect.Constructor import java.lang.reflect.InvocationTargetException import java.lang.reflect.Method +import java.util.Locale import java.util.concurrent.CompletionStage import scala.annotation.nowarn @@ -93,8 +94,19 @@ import io.opentelemetry.api.trace.Span import io.opentelemetry.api.trace.Tracer import io.opentelemetry.context.{ Context => OtelContext } import kalix.protocol.discovery.Discovery +import org.slf4j.Logger import org.slf4j.LoggerFactory +/** + * INTERNAL API + */ +@InternalApi +object SdkRunner { + val userServiceLog: Logger = LoggerFactory.getLogger("akka.javasdk.ServiceLog") + + val FutureDone: Future[Done] = Future.successful(Done) +} + /** * INTERNAL API */ @@ -646,11 +658,24 @@ private final class Sdk( override def workflowDescriptors: Seq[WorkflowDescriptor] = Sdk.this.workflowDescriptors - override def reportError(err: UserFunctionError): Future[Done] = - Future.successful(Done) // FIXME implemented in other PR + override def reportError(err: UserFunctionError): Future[Done] = { + val severityString = err.severity.name.take(1) + err.severity.name.drop(1).toLowerCase(Locale.ROOT) + val message = s"$severityString reported from Akka runtime: ${err.code} ${err.message}" + val detail = if (err.detail.isEmpty) Nil else List(err.detail) + val seeDocs = DocLinks.forErrorCode(err.code).map(link => s"See documentation: $link").toList + val messages = message :: detail ::: seeDocs + val logMessage = messages.mkString("\n\n") + + // ignoring waring for runtime version + // TODO: remove it once we remove this check in the runtime + if (err.code != "AK-00010") { + SdkRunner.userServiceLog.atLevel(err.severity).log(logMessage) + } + SdkRunner.FutureDone + } override def healthCheck(): Future[Done] = - Future.successful(Done) // FIXME implemented in other PR + SdkRunner.FutureDone } }