Execution intermediate errors #79

Merged · 7 commits · Jul 20, 2023
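In short, this PR threads the id of the `FhirMappingJobExecution` through the mapping pipeline (`SinkHandler`, `FhirMappingJobManager`, `MappingTaskExecutor`) and onto each `FhirMappingResult`, so intermediate errors — invalid inputs, mapping failures, timeouts, unexpected exceptions — can be traced back in the logs to the execution that produced them. The pattern in isolation, as a minimal sketch with stand-in types (not the toFHIR API):

```scala
// Stand-in types for illustration only; the real classes are toFHIR's.
case class Result(jobId: String, error: Option[String] = None, executionId: Option[String] = None)

// The new parameter defaults to None, so existing call sites keep compiling;
// callers that know the execution id pass it down, and it is stamped on every result.
def executeTask(jobId: String, rows: Seq[String], executionId: Option[String] = None): Seq[Result] =
  rows.map { row =>
    if (row.nonEmpty) Result(jobId, executionId = executionId)
    else Result(jobId, error = Some("invalid input"), executionId = executionId)
  }

// Failures are now attributable to a concrete execution.
executeTask("job-1", Seq("a", ""), executionId = Some("exec-42"))
  .filter(_.error.isDefined)
  .foreach(r => println(s"Failure in job '${r.jobId}', execution '${r.executionId.getOrElse("?")}'"))
```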
@@ -115,12 +115,14 @@ object SinkHandler {
// Log the mapping and invalid input errors
if (numOfNotMapped > 0 || numOfInvalids > 0) {
mappingErrors.union(invalidInputs).foreach(r =>
-        logger.warn(r.toLogstashMarker, r.toString)
+        logger.warn(r.copy(executionId = Some(mappingJobExecution.id)).toLogstashMarker,
+          r.copy(executionId = Some(mappingJobExecution.id)).toString)
)
}
if (numOfNotWritten > 0)
notWrittenResources.forEach(r =>
-        logger.warn(r.toLogstashMarker, r.toString)
+        logger.warn(r.copy(executionId = Some(mappingJobExecution.id)).toLogstashMarker,
+          r.copy(executionId = Some(mappingJobExecution.id)).toString)
)
}
}
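A side note on the hunk above: each warn call constructs the enriched copy twice. A sketch of an equivalent single-copy form (an editorial suggestion, not part of this PR):

```scala
mappingErrors.union(invalidInputs).foreach { r =>
  val tagged = r.copy(executionId = Some(mappingJobExecution.id))
  logger.warn(tagged.toLogstashMarker, tagged.toString)
}
```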
@@ -85,7 +85,7 @@ class FhirMappingJobManager(

val mappedResourcesDf =
mappingJobExecution.mappingTasks
-      .map(t => Await.result(readSourceAndExecuteTask(mappingJobExecution.jobId, t, sourceSettings, terminologyServiceSettings, identityServiceSettings), Duration.Inf))
+      .map(t => Await.result(readSourceAndExecuteTask(mappingJobExecution.jobId, t, sourceSettings, terminologyServiceSettings, identityServiceSettings, executionId = Some(mappingJobExecution.id)), Duration.Inf))
.reduce((ts1, ts2) => ts1.union(ts2))

SinkHandler.writeStream(spark, mappingJobExecution, mappedResourcesDf, fhirWriter)
@@ -207,7 +207,7 @@ class FhirMappingJobManager(
): Future[Unit] = {
val fhirWriter = FhirWriterFactory.apply(sinkSettings)

-    readSourceAndExecuteTask(mappingJobExecution.jobId, mappingJobExecution.mappingTasks.head, sourceSettings, terminologyServiceSettings, identityServiceSettings)
+    readSourceAndExecuteTask(mappingJobExecution.jobId, mappingJobExecution.mappingTasks.head, sourceSettings, terminologyServiceSettings, identityServiceSettings, executionId = Some(mappingJobExecution.id))
.map {
dataset =>
SinkHandler.writeBatch(spark, mappingJobExecution, Some(mappingJobExecution.mappingTasks.head.mappingRef), dataset, fhirWriter)
@@ -223,17 +223,19 @@
* @param terminologyServiceSettings Terminology service settings
* @param identityServiceSettings Identity service settings
* @param timeRange Time range for the source data to load
+ * @param executionId Id of FhirMappingJobExecution object
* @return
*/
private def readSourceAndExecuteTask(jobId: String,
task: FhirMappingTask,
sourceSettings: Map[String, DataSourceSettings],
terminologyServiceSettings: Option[TerminologyServiceSettings] = None,
identityServiceSettings: Option[IdentityServiceSettings] = None,
-                                       timeRange: Option[(LocalDateTime, LocalDateTime)] = None
+                                       timeRange: Option[(LocalDateTime, LocalDateTime)] = None,
+                                       executionId: Option[String] = None
): Future[Dataset[FhirMappingResult]] = {
val (fhirMapping, mds, df) = readJoinSourceData(task, sourceSettings, timeRange)
-    executeTask(jobId, fhirMapping, df, mds, terminologyServiceSettings, identityServiceSettings)
+    executeTask(jobId, fhirMapping, df, mds, terminologyServiceSettings, identityServiceSettings, executionId)
}

/**
@@ -264,11 +266,11 @@ class FhirMappingJobManager(
//If not specified, run it as a single batch
case None =>
logger.debug(s"Executing the mapping ${mappingTask.mappingRef} within job ${mappingJobExecution.jobId} ...")
-        readSourceAndExecuteTask(mappingJobExecution.jobId, mappingTask, sourceSettings, terminologyServiceSettings, identityServiceSettings, timeRange) // Retrieve the source data and execute the mapping
+        readSourceAndExecuteTask(mappingJobExecution.jobId, mappingTask, sourceSettings, terminologyServiceSettings, identityServiceSettings, timeRange, executionId = Some(mappingJobExecution.id)) // Retrieve the source data and execute the mapping
.map(dataset => SinkHandler.writeBatch(spark, mappingJobExecution, Some(mappingTask.mappingRef), dataset, fhirWriter)) // Write the created FHIR Resources to the FhirWriter
case Some(batchSize) if sizeOfDf < batchSize =>
logger.debug(s"Executing the mapping ${mappingTask.mappingRef} within job ${mappingJobExecution.jobId} ...")
-        readSourceAndExecuteTask(mappingJobExecution.jobId, mappingTask, sourceSettings, terminologyServiceSettings, identityServiceSettings, timeRange) // Retrieve the source data and execute the mapping
+        readSourceAndExecuteTask(mappingJobExecution.jobId, mappingTask, sourceSettings, terminologyServiceSettings, identityServiceSettings, timeRange, executionId = Some(mappingJobExecution.id)) // Retrieve the source data and execute the mapping
.map(dataset => SinkHandler.writeBatch(spark, mappingJobExecution, Some(mappingTask.mappingRef), dataset, fhirWriter)) // Write the created FHIR Resources to the FhirWriter
//Otherwise divide the data into batches
case Some(batchSize) =>
@@ -279,7 +281,7 @@
.zipWithIndex
.foldLeft(Future.apply(())) {
case (fj, (df, i)) => fj.flatMap(_ =>
-              executeTask(mappingJobExecution.jobId, fhirMapping, df, mds, terminologyServiceSettings, identityServiceSettings)
+              executeTask(mappingJobExecution.jobId, fhirMapping, df, mds, terminologyServiceSettings, identityServiceSettings, Some(mappingJobExecution.id))
.map(dataset => SinkHandler.writeBatch(spark, mappingJobExecution, Some(mappingTask.mappingRef), dataset, fhirWriter))
.map(_ => logger.debug(s"Batch ${i + 1} is completed for mapping ${mappingTask.mappingRef} within MappingJob: ${mappingJobExecution.jobId}..."))
)
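The batched branch above chains writes through a foldLeft over Future, so batch i+1 starts only after batch i finishes. The same pattern in isolation (processBatches and write are illustrative names, not toFHIR's):

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Sequentially process batches: flatMap chains each write onto the previous one,
// preserving write order and keeping at most one batch in flight at a time.
def processBatches[A](batches: Seq[A])(write: A => Future[Unit]): Future[Unit] =
  batches.zipWithIndex.foldLeft(Future.unit) {
    case (prev, (batch, i)) =>
      prev.flatMap(_ => write(batch).map(_ => println(s"Batch ${i + 1} is completed")))
  }
```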
@@ -348,6 +350,7 @@ class FhirMappingJobManager(
* @param mainSourceSettings Main source data settings
* @param terminologyServiceSettings Terminology service settings
* @param identityServiceSettings Identity service settings
+ * @param executionId Id of FhirMappingJobExecution object
* @return
*/
def executeTask(jobId: String,
@@ -356,6 +359,7 @@
mainSourceSettings: DataSourceSettings,
terminologyServiceSettings: Option[TerminologyServiceSettings] = None,
identityServiceSettings: Option[IdentityServiceSettings] = None,
+                  executionId: Option[String] = None
): Future[Dataset[FhirMappingResult]] = {
//Load the contextual data for the mapping
Future
@@ -369,7 +373,7 @@
val configurationContext = mainSourceSettings.toConfigurationContext
//Construct the mapping service
val fhirMappingService = new FhirMappingService(jobId, fhirMapping.url, fhirMapping.source.map(_.alias), (loadedContextMap :+ configurationContext).toMap, fhirMapping.mapping, fhirMapping.variable, terminologyServiceSettings, identityServiceSettings, functionLibraries)
-        MappingTaskExecutor.executeMapping(spark, df, fhirMappingService, mappingErrorHandlingType)
+        MappingTaskExecutor.executeMapping(spark, df, fhirMappingService, mappingErrorHandlingType, executionId)
})
}

@@ -433,7 +437,7 @@ class FhirMappingJobManager(
terminologyServiceSettings: Option[TerminologyServiceSettings] = None,
identityServiceSettings: Option[IdentityServiceSettings] = None,
): Future[Seq[FhirMappingResult]] = {
-    readSourceAndExecuteTask(mappingJobExecution.jobId, mappingJobExecution.mappingTasks.head, sourceSettings, terminologyServiceSettings, identityServiceSettings)
+    readSourceAndExecuteTask(mappingJobExecution.jobId, mappingJobExecution.mappingTasks.head, sourceSettings, terminologyServiceSettings, identityServiceSettings, executionId = Some(mappingJobExecution.id))
.map { dataFrame =>
dataFrame
.collect() // Collect into an Array[String]
@@ -40,13 +40,14 @@ object MappingTaskExecutor {
* @param spark Spark session
* @param df DataFrame to be mapped
* @param fhirMappingService Mapping service for a specific FhirMapping together with contextual data and mapping scripts
+ * @param executionId Id of FhirMappingJobExecution object
* @return
*/
- def executeMapping(spark: SparkSession, df: DataFrame, fhirMappingService: FhirMappingService, errorHandlingType: ErrorHandlingType): Dataset[FhirMappingResult] = {
+ def executeMapping(spark: SparkSession, df: DataFrame, fhirMappingService: FhirMappingService, errorHandlingType: ErrorHandlingType, executionId: Option[String] = None): Dataset[FhirMappingResult] = {
fhirMappingService.sources match {
-     case Seq(_) => executeMappingOnSingleSource(spark, df, fhirMappingService, errorHandlingType)
+     case Seq(_) => executeMappingOnSingleSource(spark, df, fhirMappingService, errorHandlingType, executionId)
//Executing on multiple sources
-     case oth => executeMappingOnMultipleSources(spark, df, fhirMappingService, oth, errorHandlingType)
+     case oth => executeMappingOnMultipleSources(spark, df, fhirMappingService, oth, errorHandlingType, executionId)
}
}
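Note how executeMapping dispatches: a `Seq(_)` pattern matches exactly one source, while the `oth` binder captures the multi-source case. Reduced to an illustrative sketch:

```scala
// Illustrative only: Seq(_) matches a sequence with exactly one element.
def route(sources: Seq[String]): String = sources match {
  case Seq(_)   => "single-source mapping"
  case multiple => s"multi-source mapping over ${multiple.size} sources"
}
```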

@@ -55,12 +56,14 @@ object MappingTaskExecutor {
* @param spark Spark session
* @param df DataFrame to be mapped
* @param fhirMappingService Mapping service for a specific FhirMapping together with contextual data and mapping scripts
* @param executionId Id of FhirMappingJobExecution object
* @return
*/
private def executeMappingOnSingleSource(spark: SparkSession,
df: DataFrame,
fhirMappingService: FhirMappingService,
-                                           errorHandlingType: ErrorHandlingType): Dataset[FhirMappingResult] = {
+                                           errorHandlingType: ErrorHandlingType,
+                                           executionId: Option[String] = None): Dataset[FhirMappingResult] = {
import spark.implicits._
val result =
df
@@ -72,7 +75,7 @@

Option(row.getAs[String](SourceHandler.INPUT_VALIDITY_ERROR)) match {
//If input is valid
-           case None => executeMappingOnInput(jo, Map.empty[String, JValue], fhirMappingService, errorHandlingType)
+           case None => executeMappingOnInput(jo, Map.empty[String, JValue], fhirMappingService, errorHandlingType, executionId)
//If the input is not valid, return the error
case Some(validationError) =>
Seq(FhirMappingResult(
@@ -84,7 +87,8 @@
error = Some(FhirMappingError(
code = FhirMappingErrorCodes.INVALID_INPUT,
description = validationError
-               ))
+               )),
+               executionId = executionId
))
}
})
@@ -95,7 +99,8 @@
df: DataFrame,
fhirMappingService: FhirMappingService,
sources: Seq[String],
-                                              errorHandlingType: ErrorHandlingType): Dataset[FhirMappingResult] = {
+                                              errorHandlingType: ErrorHandlingType,
+                                              executionId: Option[String] = None): Dataset[FhirMappingResult] = {
import spark.implicits._
val result =
df
@@ -132,7 +137,7 @@ object MappingTaskExecutor {

validationErrors match {
//If input is valid
-           case Nil => executeMappingOnInput(jo, otherObjectMap, fhirMappingService, errorHandlingType)
+           case Nil => executeMappingOnInput(jo, otherObjectMap, fhirMappingService, errorHandlingType, executionId)
//If the input is not valid, return the error
case _ =>
Seq(FhirMappingResult(
@@ -144,7 +149,8 @@
error = Some(FhirMappingError(
code = FhirMappingErrorCodes.INVALID_INPUT,
description = validationErrors.mkString("\n")
-               ))
+               )),
+               executionId = executionId
))
}
})
@@ -157,12 +163,14 @@
* @param jo Input object
* @param fhirMappingService Mapping service
* @param errorHandlingType Error handling type
+ * @param executionId Id of FhirMappingJobExecution object
* @return
*/
private def executeMappingOnInput(jo: JObject,
otherInputs: Map[String, JValue],
fhirMappingService: FhirMappingService,
-                                    errorHandlingType: ErrorHandlingType): Seq[FhirMappingResult] = {
+                                    errorHandlingType: ErrorHandlingType,
+                                    executionId: Option[String] = None): Seq[FhirMappingResult] = {

val results =
try {
@@ -177,7 +185,8 @@
timestamp = Timestamp.from(Instant.now()),
source = Some(jo.toJson),
mappedResource = Some(JArray(resources.toList).toJson),
-             fhirInteraction = fhirInteraction
+             fhirInteraction = fhirInteraction,
+             executionId = executionId
))
//Otherwise return each resource as a separate mapping result
case (mappingExpr, resources, fhirInteraction) =>
@@ -189,7 +198,8 @@
timestamp = Timestamp.from(Instant.now()),
source = Some(jo.toJson),
mappedResource = Some(r.toJson),
-               fhirInteraction = fhirInteraction
+               fhirInteraction = fhirInteraction,
+               executionId = executionId
)
)
}
@@ -206,7 +216,8 @@
code = FhirMappingErrorCodes.MAPPING_ERROR,
description = t.msg + t.t.map(_.getMessage).map(" " + _).getOrElse(""),
expression = t.expression
-           )))
+           )),
+           executionId = executionId)
if (errorHandlingType == ErrorHandlingType.CONTINUE) {
Seq(fmr)
} else {
@@ -223,7 +234,8 @@
error = Some(FhirMappingError(
code = FhirMappingErrorCodes.MAPPING_ERROR,
description = e.getMessage
-           )))
+           )),
+           executionId = executionId)
if (errorHandlingType == ErrorHandlingType.CONTINUE) {
Seq(fmr)
} else {
@@ -239,7 +251,8 @@
error = Some(FhirMappingError(
code = FhirMappingErrorCodes.MAPPING_TIMEOUT,
description = s"A single row could not be mapped to FHIR in ${ToFhirConfig.engineConfig.mappingTimeout.toString}!"
-           )))
+           )),
+           executionId = executionId)
if (errorHandlingType == ErrorHandlingType.CONTINUE) {
logger.debug("Mapping timeout, continuing the processing of mappings...")
Seq(fmr)
@@ -258,7 +271,8 @@
error = Some(FhirMappingError(
code = FhirMappingErrorCodes.UNEXPECTED_PROBLEM,
description = "Exception:" + oth.getMessage
-           )))
+           )),
+           executionId = executionId)
if (errorHandlingType == ErrorHandlingType.CONTINUE) {
logger.error("Unexpected problem while executing the mappings...", oth)
Seq(fmr)
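Throughout MappingTaskExecutor, every failure branch (invalid input, mapping error, timeout, unexpected exception) now sets `executionId = executionId` on the FhirMappingResult before the CONTINUE-or-throw decision. The shape of that decision, reduced to a sketch (stand-in types; Continue and Halt are illustrative names):

```scala
sealed trait ErrorPolicy
case object Continue extends ErrorPolicy
case object Halt extends ErrorPolicy

case class Failure(code: String, executionId: Option[String])

// Tag the failure with its execution id first, then either emit it as data
// (log-and-continue) or abort the whole mapping task.
def onError(code: String, executionId: Option[String], policy: ErrorPolicy): Seq[Failure] = {
  val failure = Failure(code, executionId)
  policy match {
    case Continue => Seq(failure)
    case Halt     => throw new RuntimeException(code)
  }
}
```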
@@ -32,8 +32,8 @@ case class FhirMappingJobResult(mappingJobExecution: FhirMappingJobExecution,
(if (result != "FAILURE")
s"\t# of Invalid Rows: \t$numOfInvalids\n" +
s"\t# of Not Mapped: \t$numOfNotMapped\n" +
s"\t# of FHIR resources:\t$numOfFhirResources\n" +
s"\t# of Failed writes:\t$numOfFailedWrites"
s"\t# of Failed writes:\t$numOfFailedWrites\n" +
s"\t# of Written FHIR resources:\t$numOfFhirResources"
else
""
)
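After this change a non-failure summary prints the failed-write count before the written-resource count; with illustrative numbers it reads:

```
# of Invalid Rows:            0
# of Not Mapped:              2
# of Failed writes:           0
# of Written FHIR resources:  128
```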
@@ -16,6 +16,7 @@ import net.logstash.logback.marker.Markers._
* @param source If there is a problem in the process, the JSON serialization of the source data
* @param error If there is a problem in the process, description of the problem
* @param fhirInteraction FHIR interaction details to persist the mapped result
+ * @param executionId Id of FhirMappingJobExecution object
*/
case class FhirMappingResult(
jobId:String,
@@ -25,11 +26,12 @@ case class FhirMappingResult(
mappedResource:Option[String] = None,
source:Option[String] = None,
error:Option[FhirMappingError] = None,
-                             fhirInteraction:Option[FhirInteraction] = None
+                             fhirInteraction:Option[FhirInteraction] = None,
+                             executionId: Option[String] = None
) {
final val eventId:String = "MAPPING_RESULT"
override def toString: String = {
s"Mapping failure (${error.get.code}) for job '$jobId' and mapping '$mappingUrl'${mappingExpr.map(e => s" within expression '$e'").getOrElse("")}!\n"+
s"Mapping failure (${error.get.code}) for job '$jobId' and mapping '$mappingUrl'${mappingExpr.map(e => s" within expression '$e'").getOrElse("")} execution '${executionId.getOrElse("")}'!\n"+
s"\tSource: ${source.get}\n"+
s"\tError: ${error.get.description}" +
error.get.expression.map(e => s"\n\tExpression: $e").getOrElse("")
@@ -42,13 +44,14 @@ case class FhirMappingResult(
def toLogstashMarker:LogstashMarker = {
val marker:LogstashMarker =
append("jobId", jobId)
.and(append("mappingUrl", mappingUrl)
.and(append("mappingExpr", mappingExpr.orElse(null))
.and(appendRaw("source", source.get)
.and(append("errorCode", error.get.code)
.and(append("errorDesc", error.get.description)
.and(append("errorExpr", error.get.expression.orElse(null))
.and(append("eventId", eventId))))))))
.and(append("executionId", executionId.getOrElse(""))
.and(append("mappingUrl", mappingUrl)
.and(append("mappingExpr", mappingExpr.orElse(null))
.and(appendRaw("source", source.get)
.and(append("errorCode", error.get.code)
.and(append("errorDesc", error.get.description)
.and(append("errorExpr", error.get.expression.orElse(null))
.and(append("eventId", eventId)))))))))

if(mappedResource.isDefined && error.get.code == FhirMappingErrorCodes.INVALID_RESOURCE)
marker.and(appendRaw("mappedResource", mappedResource.get))
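For illustration, a hedged sketch of constructing a tagged failure result (field names as they appear in this diff; all values, including the mapping URL, are hypothetical, and unlisted fields are assumed to have defaults):

```scala
import java.sql.Timestamp
import java.time.Instant

val fmr = FhirMappingResult(
  jobId = "job-1",
  mappingUrl = "https://example.org/mappings/patient", // hypothetical URL
  timestamp = Timestamp.from(Instant.now()),
  source = Some("""{"pid":"p-1"}"""),
  error = Some(FhirMappingError(code = FhirMappingErrorCodes.MAPPING_ERROR, description = "boom")),
  executionId = Some("exec-42")
)
// fmr.toString now ends with: execution 'exec-42'!
// fmr.toLogstashMarker now carries an "executionId" field next to jobId and mappingUrl.
```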