From f5ca547553a02bbc1289ec0c65b0f21ff61fac44 Mon Sep 17 00:00:00 2001 From: okanmercan Date: Mon, 17 Jul 2023 17:08:46 +0300 Subject: [PATCH 1/7] :sparkles: feat(FhirMappingResult): Append executionId to MappingResult logs. --- .../engine/data/write/SinkHandler.scala | 6 ++- .../mapping/FhirMappingJobManager.scala | 22 +++++---- .../engine/mapping/MappingTaskExecutor.scala | 46 ++++++++++++------- .../engine/model/FhirMappingResult.scala | 21 +++++---- 4 files changed, 59 insertions(+), 36 deletions(-) diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/data/write/SinkHandler.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/data/write/SinkHandler.scala index f45e5cb3..4e1934a4 100644 --- a/tofhir-engine/src/main/scala/io/tofhir/engine/data/write/SinkHandler.scala +++ b/tofhir-engine/src/main/scala/io/tofhir/engine/data/write/SinkHandler.scala @@ -115,12 +115,14 @@ object SinkHandler { // Log the mapping and invalid input errors if (numOfNotMapped > 0 || numOfInvalids > 0) { mappingErrors.union(invalidInputs).foreach(r => - logger.warn(r.toLogstashMarker, r.toString) + logger.warn(r.copy(executionId = Some(mappingJobExecution.id)).toLogstashMarker, + r.copy(executionId = Some(mappingJobExecution.id)).toString) ) } if (numOfNotWritten > 0) notWrittenResources.forEach(r => - logger.warn(r.toLogstashMarker, r.toString) + logger.warn(r.copy(executionId = Some(mappingJobExecution.id)).toLogstashMarker, + r.copy(executionId = Some(mappingJobExecution.id)).toString) ) } } diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/FhirMappingJobManager.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/FhirMappingJobManager.scala index db1e882d..275790ee 100644 --- a/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/FhirMappingJobManager.scala +++ b/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/FhirMappingJobManager.scala @@ -85,7 +85,7 @@ class FhirMappingJobManager( val mappedResourcesDf = mappingJobExecution.mappingTasks - .map(t => Await.result(readSourceAndExecuteTask(mappingJobExecution.jobId, t, sourceSettings, terminologyServiceSettings, identityServiceSettings), Duration.Inf)) + .map(t => Await.result(readSourceAndExecuteTask(mappingJobExecution.jobId, t, sourceSettings, terminologyServiceSettings, identityServiceSettings, executionId = Some(mappingJobExecution.id)), Duration.Inf)) .reduce((ts1, ts2) => ts1.union(ts2)) SinkHandler.writeStream(spark, mappingJobExecution, mappedResourcesDf, fhirWriter) @@ -207,7 +207,7 @@ class FhirMappingJobManager( ): Future[Unit] = { val fhirWriter = FhirWriterFactory.apply(sinkSettings) - readSourceAndExecuteTask(mappingJobExecution.jobId, mappingJobExecution.mappingTasks.head, sourceSettings, terminologyServiceSettings, identityServiceSettings) + readSourceAndExecuteTask(mappingJobExecution.jobId, mappingJobExecution.mappingTasks.head, sourceSettings, terminologyServiceSettings, identityServiceSettings, executionId = Some(mappingJobExecution.id)) .map { dataset => SinkHandler.writeBatch(spark, mappingJobExecution, Some(mappingJobExecution.mappingTasks.head.mappingRef), dataset, fhirWriter) @@ -223,6 +223,7 @@ class FhirMappingJobManager( * @param terminologyServiceSettings Terminology service settings * @param identityServiceSettings Identity service settings * @param timeRange Time range for the source data to load + * @param executionId Id of FhirMappingJobExecution object * @return */ private def readSourceAndExecuteTask(jobId: String, @@ -230,10 +231,11 @@ class FhirMappingJobManager( sourceSettings: 
Map[String, DataSourceSettings], terminologyServiceSettings: Option[TerminologyServiceSettings] = None, identityServiceSettings: Option[IdentityServiceSettings] = None, - timeRange: Option[(LocalDateTime, LocalDateTime)] = None + timeRange: Option[(LocalDateTime, LocalDateTime)] = None, + executionId: Option[String] = None ): Future[Dataset[FhirMappingResult]] = { val (fhirMapping, mds, df) = readJoinSourceData(task, sourceSettings, timeRange) - executeTask(jobId, fhirMapping, df, mds, terminologyServiceSettings, identityServiceSettings) + executeTask(jobId, fhirMapping, df, mds, terminologyServiceSettings, identityServiceSettings, executionId) } /** @@ -264,11 +266,11 @@ class FhirMappingJobManager( //If not specify run it as single batch case None => logger.debug(s"Executing the mapping ${mappingTask.mappingRef} within job ${mappingJobExecution.jobId} ...") - readSourceAndExecuteTask(mappingJobExecution.jobId, mappingTask, sourceSettings, terminologyServiceSettings, identityServiceSettings, timeRange) // Retrieve the source data and execute the mapping + readSourceAndExecuteTask(mappingJobExecution.jobId, mappingTask, sourceSettings, terminologyServiceSettings, identityServiceSettings, timeRange, executionId = Some(mappingJobExecution.id)) // Retrieve the source data and execute the mapping .map(dataset => SinkHandler.writeBatch(spark, mappingJobExecution, Some(mappingTask.mappingRef), dataset, fhirWriter)) // Write the created FHIR Resources to the FhirWriter case Some(batchSize) if sizeOfDf < batchSize => logger.debug(s"Executing the mapping ${mappingTask.mappingRef} within job ${mappingJobExecution.jobId} ...") - readSourceAndExecuteTask(mappingJobExecution.jobId, mappingTask, sourceSettings, terminologyServiceSettings, identityServiceSettings, timeRange) // Retrieve the source data and execute the mapping + readSourceAndExecuteTask(mappingJobExecution.jobId, mappingTask, sourceSettings, terminologyServiceSettings, identityServiceSettings, timeRange, executionId = Some(mappingJobExecution.id)) // Retrieve the source data and execute the mapping .map(dataset => SinkHandler.writeBatch(spark, mappingJobExecution, Some(mappingTask.mappingRef), dataset, fhirWriter)) // Write the created FHIR Resources to the FhirWriter //Otherwise divide the data into batches case Some(batchSize) => @@ -279,7 +281,7 @@ class FhirMappingJobManager( .zipWithIndex .foldLeft(Future.apply(())) { case (fj, (df, i)) => fj.flatMap(_ => - executeTask(mappingJobExecution.jobId, fhirMapping, df, mds, terminologyServiceSettings, identityServiceSettings) + executeTask(mappingJobExecution.jobId, fhirMapping, df, mds, terminologyServiceSettings, identityServiceSettings, Some(mappingJobExecution.id)) .map(dataset => SinkHandler.writeBatch(spark, mappingJobExecution, Some(mappingTask.mappingRef), dataset, fhirWriter)) .map(_ => logger.debug(s"Batch ${i + 1} is completed for mapping ${mappingTask.mappingRef} within MappingJob: ${mappingJobExecution.jobId}...")) ) @@ -348,6 +350,7 @@ class FhirMappingJobManager( * @param mainSourceSettings Main source data settings * @param terminologyServiceSettings Terminology service settings * @param identityServiceSettings Identity service settings + * @param executionId Id of FhirMappingJobExecution object * @return */ def executeTask(jobId: String, @@ -356,6 +359,7 @@ class FhirMappingJobManager( mainSourceSettings: DataSourceSettings, terminologyServiceSettings: Option[TerminologyServiceSettings] = None, identityServiceSettings: Option[IdentityServiceSettings] = None, + executionId: 
Option[String] = None ): Future[Dataset[FhirMappingResult]] = { //Load the contextual data for the mapping Future @@ -369,7 +373,7 @@ class FhirMappingJobManager( val configurationContext = mainSourceSettings.toConfigurationContext //Construct the mapping service val fhirMappingService = new FhirMappingService(jobId, fhirMapping.url, fhirMapping.source.map(_.alias), (loadedContextMap :+ configurationContext).toMap, fhirMapping.mapping, fhirMapping.variable, terminologyServiceSettings, identityServiceSettings, functionLibraries) - MappingTaskExecutor.executeMapping(spark, df, fhirMappingService, mappingErrorHandlingType) + MappingTaskExecutor.executeMapping(spark, df, fhirMappingService, mappingErrorHandlingType, executionId) }) } @@ -433,7 +437,7 @@ class FhirMappingJobManager( terminologyServiceSettings: Option[TerminologyServiceSettings] = None, identityServiceSettings: Option[IdentityServiceSettings] = None, ): Future[Seq[FhirMappingResult]] = { - readSourceAndExecuteTask(mappingJobExecution.jobId, mappingJobExecution.mappingTasks.head, sourceSettings, terminologyServiceSettings, identityServiceSettings) + readSourceAndExecuteTask(mappingJobExecution.jobId, mappingJobExecution.mappingTasks.head, sourceSettings, terminologyServiceSettings, identityServiceSettings, executionId = Some(mappingJobExecution.id)) .map { dataFrame => dataFrame .collect() // Collect into an Array[String] diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/MappingTaskExecutor.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/MappingTaskExecutor.scala index 5f8ad3cf..5a8e3232 100644 --- a/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/MappingTaskExecutor.scala +++ b/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/MappingTaskExecutor.scala @@ -40,13 +40,14 @@ object MappingTaskExecutor { * @param spark Spark session * @param df DataFrame to be mapped * @param fhirMappingService Mapping service for a specific FhirMapping together with contextual data and mapping scripts + * @param executionId Id of FhirMappingJobExecution object * @return */ - def executeMapping(spark: SparkSession, df: DataFrame, fhirMappingService: FhirMappingService, errorHandlingType: ErrorHandlingType): Dataset[FhirMappingResult] = { + def executeMapping(spark: SparkSession, df: DataFrame, fhirMappingService: FhirMappingService, errorHandlingType: ErrorHandlingType, executionId: Option[String] = None): Dataset[FhirMappingResult] = { fhirMappingService.sources match { - case Seq(_) => executeMappingOnSingleSource(spark, df, fhirMappingService, errorHandlingType) + case Seq(_) => executeMappingOnSingleSource(spark, df, fhirMappingService, errorHandlingType, executionId) //Executing on multiple sources - case oth => executeMappingOnMultipleSources(spark, df, fhirMappingService, oth, errorHandlingType) + case oth => executeMappingOnMultipleSources(spark, df, fhirMappingService, oth, errorHandlingType, executionId) } } @@ -55,12 +56,14 @@ object MappingTaskExecutor { * @param spark Spark session * @param df DataFrame to be mapped * @param fhirMappingService Mapping service for a specific FhirMapping together with contextual data and mapping scripts + * @param executionId Id of FhirMappingJobExecution object * @return */ private def executeMappingOnSingleSource(spark: SparkSession, df: DataFrame, fhirMappingService: FhirMappingService, - errorHandlingType: ErrorHandlingType): Dataset[FhirMappingResult] = { + errorHandlingType: ErrorHandlingType, + executionId: Option[String] = None): Dataset[FhirMappingResult] 
= { import spark.implicits._ val result = df @@ -72,7 +75,7 @@ object MappingTaskExecutor { Option(row.getAs[String](SourceHandler.INPUT_VALIDITY_ERROR)) match { //If input is valid - case None => executeMappingOnInput(jo, Map.empty[String, JValue], fhirMappingService, errorHandlingType) + case None => executeMappingOnInput(jo, Map.empty[String, JValue], fhirMappingService, errorHandlingType, executionId) //If the input is not valid, return the error case Some(validationError) => Seq(FhirMappingResult( @@ -84,7 +87,8 @@ object MappingTaskExecutor { error = Some(FhirMappingError( code = FhirMappingErrorCodes.INVALID_INPUT, description = validationError - )) + )), + executionId = executionId )) } }) @@ -95,7 +99,8 @@ object MappingTaskExecutor { df: DataFrame, fhirMappingService: FhirMappingService, sources: Seq[String], - errorHandlingType: ErrorHandlingType): Dataset[FhirMappingResult] = { + errorHandlingType: ErrorHandlingType, + executionId: Option[String] = None): Dataset[FhirMappingResult] = { import spark.implicits._ val result = df @@ -132,7 +137,7 @@ object MappingTaskExecutor { validationErrors match { //If input is valid - case Nil => executeMappingOnInput(jo, otherObjectMap, fhirMappingService, errorHandlingType) + case Nil => executeMappingOnInput(jo, otherObjectMap, fhirMappingService, errorHandlingType, executionId) //If the input is not valid, return the error case _ => Seq(FhirMappingResult( @@ -144,7 +149,8 @@ object MappingTaskExecutor { error = Some(FhirMappingError( code = FhirMappingErrorCodes.INVALID_INPUT, description = validationErrors.mkString("\n") - )) + )), + executionId = executionId )) } }) @@ -157,12 +163,14 @@ object MappingTaskExecutor { * @param jo Input object * @param fhirMappingService Mapping service * @param errorHandlingType Error handling type + * @param executionId Id of FhirMappingJobExecution object * @return */ private def executeMappingOnInput(jo: JObject, otherInputs: Map[String, JValue], fhirMappingService: FhirMappingService, - errorHandlingType: ErrorHandlingType): Seq[FhirMappingResult] = { + errorHandlingType: ErrorHandlingType, + executionId: Option[String] = None): Seq[FhirMappingResult] = { val results = try { @@ -177,7 +185,8 @@ object MappingTaskExecutor { timestamp = Timestamp.from(Instant.now()), source = Some(jo.toJson), mappedResource = Some(JArray(resources.toList).toJson), - fhirInteraction = fhirInteraction + fhirInteraction = fhirInteraction, + executionId = executionId )) //Otherwise return each resource as a separate mapping result case (mappingExpr, resources, fhirInteraction) => @@ -189,7 +198,8 @@ object MappingTaskExecutor { timestamp = Timestamp.from(Instant.now()), source = Some(jo.toJson), mappedResource = Some(r.toJson), - fhirInteraction = fhirInteraction + fhirInteraction = fhirInteraction, + executionId = executionId ) ) } @@ -206,7 +216,8 @@ object MappingTaskExecutor { code = FhirMappingErrorCodes.MAPPING_ERROR, description = t.msg + t.t.map(_.getMessage).map(" " + _).getOrElse(""), expression = t.expression - ))) + )), + executionId = executionId) if (errorHandlingType == ErrorHandlingType.CONTINUE) { Seq(fmr) } else { @@ -223,7 +234,8 @@ object MappingTaskExecutor { error = Some(FhirMappingError( code = FhirMappingErrorCodes.MAPPING_ERROR, description = e.getMessage - ))) + )), + executionId = executionId) if (errorHandlingType == ErrorHandlingType.CONTINUE) { Seq(fmr) } else { @@ -239,7 +251,8 @@ object MappingTaskExecutor { error = Some(FhirMappingError( code = FhirMappingErrorCodes.MAPPING_TIMEOUT, 
description = s"A single row could not be mapped to FHIR in ${ToFhirConfig.engineConfig.mappingTimeout.toString}!" - ))) + )), + executionId = executionId) if (errorHandlingType == ErrorHandlingType.CONTINUE) { logger.debug("Mapping timeout, continuing the processing of mappings...") Seq(fmr) @@ -258,7 +271,8 @@ object MappingTaskExecutor { error = Some(FhirMappingError( code = FhirMappingErrorCodes.UNEXPECTED_PROBLEM, description = "Exception:" + oth.getMessage - ))) + )), + executionId = executionId) if (errorHandlingType == ErrorHandlingType.CONTINUE) { logger.error("Unexpected problem while executing the mappings...", oth) Seq(fmr) diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingResult.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingResult.scala index a5807393..e6aae117 100644 --- a/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingResult.scala +++ b/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingResult.scala @@ -16,6 +16,7 @@ import net.logstash.logback.marker.Markers._ * @param source If there is a problem in the process, the JSON serialization of the source data * @param error If there is a problem in the process, description of the problem * @param fhirInteraction FHIR interaction details to persist the mapped result + * @param executionId Id of FhirMappingJobExecution object */ case class FhirMappingResult( jobId:String, @@ -25,11 +26,12 @@ case class FhirMappingResult( mappedResource:Option[String] = None, source:Option[String] = None, error:Option[FhirMappingError] = None, - fhirInteraction:Option[FhirInteraction] = None + fhirInteraction:Option[FhirInteraction] = None, + executionId: Option[String] = None ) { final val eventId:String = "MAPPING_RESULT" override def toString: String = { - s"Mapping failure (${error.get.code}) for job '$jobId' and mapping '$mappingUrl'${mappingExpr.map(e => s" within expression '$e'").getOrElse("")}!\n"+ + s"Mapping failure (${error.get.code}) for job '$jobId' and mapping '$mappingUrl'${mappingExpr.map(e => s" within expression '$e'").getOrElse("")} execution '${executionId.getOrElse("")}'!\n"+ s"\tSource: ${source.get}\n"+ s"\tError: ${error.get.description}" + error.get.expression.map(e => s"\n\tExpression: $e").getOrElse("") @@ -42,13 +44,14 @@ case class FhirMappingResult( def toLogstashMarker:LogstashMarker = { val marker:LogstashMarker = append("jobId", jobId) - .and(append("mappingUrl", mappingUrl) - .and(append("mappingExpr", mappingExpr.orElse(null)) - .and(appendRaw("source", source.get) - .and(append("errorCode", error.get.code) - .and(append("errorDesc", error.get.description) - .and(append("errorExpr", error.get.expression.orElse(null)) - .and(append("eventId", eventId)))))))) + .and(append("executionId", executionId.getOrElse("")) + .and(append("mappingUrl", mappingUrl) + .and(append("mappingExpr", mappingExpr.orElse(null)) + .and(appendRaw("source", source.get) + .and(append("errorCode", error.get.code) + .and(append("errorDesc", error.get.description) + .and(append("errorExpr", error.get.expression.orElse(null)) + .and(append("eventId", eventId))))))))) if(mappedResource.isDefined && error.get.code == FhirMappingErrorCodes.INVALID_RESOURCE) marker.and(appendRaw("mappedResource", mappedResource.get)) From 2304788c168ceb7f7bcd43bf60c4616632de3eeb Mon Sep 17 00:00:00 2001 From: okanmercan Date: Tue, 18 Jul 2023 15:45:10 +0300 Subject: [PATCH 2/7] :sparkles: feat(ExecutionService): Append intermediate execution error logs to job result logs. 
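
Run logs and their related error logs are joined on the mappingUrl field and the
errors are attached under an "error_logs" column. A minimal, self-contained sketch
of the enrichment idea on plain Scala collections (RunLog/ErrorLog and enrich are
illustrative names, not the engine's model; the actual change does this with
Spark's groupByKey/mapGroups over the log DataFrame):

    case class ErrorLog(mappingUrl: String, errorCode: String, errorDesc: String, message: String)
    case class RunLog(mappingUrl: String, result: String, errorLogs: Seq[ErrorLog] = Seq.empty)

    def enrich(runLogs: Seq[RunLog], errorLogs: Seq[ErrorLog]): Seq[RunLog] = {
      // Group the error logs by mapping url, then attach each group
      // to the run log that has the same mapping url
      val errorsByUrl = errorLogs.groupBy(_.mappingUrl)
      runLogs.map(r => r.copy(errorLogs = errorsByUrl.getOrElse(r.mappingUrl, Seq.empty)))
    }
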
--- .../server/service/ExecutionService.scala | 53 ++++++++++++++++--- 1 file changed, 47 insertions(+), 6 deletions(-) diff --git a/tofhir-server/src/main/scala/io/tofhir/server/service/ExecutionService.scala b/tofhir-server/src/main/scala/io/tofhir/server/service/ExecutionService.scala index 7f9f0d1c..81767ff6 100644 --- a/tofhir-server/src/main/scala/io/tofhir/server/service/ExecutionService.scala +++ b/tofhir-server/src/main/scala/io/tofhir/server/service/ExecutionService.scala @@ -1,5 +1,6 @@ package io.tofhir.server.service +import akka.shapeless.HList.ListCompat.:: import com.typesafe.scalalogging.LazyLogging import io.tofhir.engine.ToFhirEngine import io.tofhir.engine.config.{ErrorHandlingType, ToFhirConfig} @@ -14,8 +15,9 @@ import io.tofhir.server.service.mapping.IMappingRepository import io.tofhir.server.service.schema.ISchemaRepository import io.tofhir.server.util.DataFrameUtil import org.apache.spark.sql.catalyst.encoders.RowEncoder -import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} -import org.apache.spark.sql.{Encoders, Row} +import org.apache.spark.sql.functions.{col, lit} +import org.apache.spark.sql.types.{ArrayType, BinaryType, IntegerType, ObjectType, StringType, StructField, StructType} +import org.apache.spark.sql.{Encoder, Encoders, Row} import org.json4s.JsonAST.JValue import org.json4s.jackson.JsonMethods @@ -145,12 +147,51 @@ class ExecutionService(jobRepository: IJobRepository, mappingRepository: IMappin Seq.empty } else { - val jobRuns = dataFrame.filter(s"executionId = '$executionId'") - // handle the case where the job has not been run yet which makes the data frame empty - if (jobRuns.isEmpty) { + // Get job run logs for the given execution + val jobRunLogs = dataFrame.filter(s"executionId = '$executionId' and projectId is not null") + + // Handle the case where the job has not been run yet, which makes the data frame empty + if (jobRunLogs.isEmpty) { Seq.empty } else { - jobRuns.collect().map(row => { + // Get error logs for the given execution and select needed columns + val jobErrorLogs = dataFrame.filter(s"executionId = '$executionId' and projectId is null").select(List("errorCode", "errorDesc", "message", "mappingUrl").map(col):_*) + + // Group job error logs by mapping url + val jobErrorLogsGroupedByMappingUrl = jobErrorLogs.groupByKey(row => row.get(row.fieldIndex("mappingUrl")).toString)(Encoders.STRING) + + // Define a new schema for the resulting rows. We will add a "error_logs" column to job run logs that contains related error logs. 
+ val updatedSchema = jobRunLogs.schema.add("error_logs", ArrayType( + new StructType() + .add("errorCode", StringType) + .add("errorDesc", StringType) + .add("message", StringType) + .add("mappingUrl", StringType) + )) + // Define a row encoder for new schema + implicit val rowEncoder: Encoder[Row] = RowEncoder(updatedSchema) + + // Collect job run logs for matching with mappingUrl field of job error logs + var jobRunLogsData = jobRunLogs.collect() + + // Get updated job run logs + val updatedJobRunLogs = jobErrorLogsGroupedByMappingUrl.mapGroups((key, values) => { + // Find the related job run log to given mapping url + val jobRunLog = jobRunLogsData.filter(row => row.getAs[String]("mappingUrl") == key) + // Append error logs to the related job run log + Row.fromSeq(Row.unapplySeq(jobRunLog.head).get :+ values.toSeq) + })(rowEncoder) + + // Build a map for updated job run logs (mappingUrl -> updatedJobRunLogs) + val updatedJobRunLogsMap = updatedJobRunLogs.collect().map(updatedJobRunLog => + updatedJobRunLog.getAs[String]("mappingUrl") -> updatedJobRunLog).toMap + + // Replace job run logs if it is in the map + jobRunLogsData = jobRunLogsData.map(jobRunLog => + updatedJobRunLogsMap.getOrElse(jobRunLog.getAs[String]("mappingUrl"), jobRunLog)) + + // return json objects for job run logs + jobRunLogsData.map(row => { JsonMethods.parse(row.json) }) } From ff43d5bd05544545041ab7088ad2eab475928dcb Mon Sep 17 00:00:00 2001 From: okanmercan Date: Wed, 19 Jul 2023 17:17:08 +0300 Subject: [PATCH 3/7] :recycle: refactor(ExecutionService): Minor refactoring. --- .../server/service/ExecutionService.scala | 35 +++++++++---------- 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/tofhir-server/src/main/scala/io/tofhir/server/service/ExecutionService.scala b/tofhir-server/src/main/scala/io/tofhir/server/service/ExecutionService.scala index 81767ff6..347b24f9 100644 --- a/tofhir-server/src/main/scala/io/tofhir/server/service/ExecutionService.scala +++ b/tofhir-server/src/main/scala/io/tofhir/server/service/ExecutionService.scala @@ -147,43 +147,42 @@ class ExecutionService(jobRepository: IJobRepository, mappingRepository: IMappin Seq.empty } else { - // Get job run logs for the given execution + // Get job run logs for the given execution. ProjectId field is not null for selecting jobRunsLogs, filter out job error logs. val jobRunLogs = dataFrame.filter(s"executionId = '$executionId' and projectId is not null") // Handle the case where the job has not been run yet, which makes the data frame empty if (jobRunLogs.isEmpty) { Seq.empty } else { - // Get error logs for the given execution and select needed columns + // Get error logs for the given execution and select needed columns. ProjectId field is null for selecting job error logs, filter out jobRunsLogs. val jobErrorLogs = dataFrame.filter(s"executionId = '$executionId' and projectId is null").select(List("errorCode", "errorDesc", "message", "mappingUrl").map(col):_*) // Group job error logs by mapping url val jobErrorLogsGroupedByMappingUrl = jobErrorLogs.groupByKey(row => row.get(row.fieldIndex("mappingUrl")).toString)(Encoders.STRING) - // Define a new schema for the resulting rows. We will add a "error_logs" column to job run logs that contains related error logs. 
-        val updatedSchema = jobRunLogs.schema.add("error_logs", ArrayType(
-          new StructType()
-            .add("errorCode", StringType)
-            .add("errorDesc", StringType)
-            .add("message", StringType)
-            .add("mappingUrl", StringType)
-        ))
-        // Define a row encoder for new schema
-        implicit val rowEncoder: Encoder[Row] = RowEncoder(updatedSchema)
-
         // Collect job run logs for matching with mappingUrl field of job error logs
         var jobRunLogsData = jobRunLogs.collect()

-        // Get updated job run logs
-        val updatedJobRunLogs = jobErrorLogsGroupedByMappingUrl.mapGroups((key, values) => {
+        // Add error details to job run logs if any error occurred while executing the job.
+        val jobRunLogsWithErrorDetails = jobErrorLogsGroupedByMappingUrl.mapGroups((key, values) => {
           // Find the related job run log to given mapping url
           val jobRunLog = jobRunLogsData.filter(row => row.getAs[String]("mappingUrl") == key)
           // Append error logs to the related job run log
           Row.fromSeq(Row.unapplySeq(jobRunLog.head).get :+ values.toSeq)
-        })(rowEncoder)
+        })(
+          // Define a new schema for the resulting rows and create an encoder for it. We will add a "error_logs" column to job run logs that contains related error logs.
+          RowEncoder(jobRunLogs.schema.add("error_logs", ArrayType(
+            new StructType()
+              .add("errorCode", StringType)
+              .add("errorDesc", StringType)
+              .add("message", StringType)
+              .add("mappingUrl", StringType)
+            ))
+          )
+        )

-        // Build a map for updated job run logs (mappingUrl -> updatedJobRunLogs)
-        val updatedJobRunLogsMap = updatedJobRunLogs.collect().map(updatedJobRunLog =>
+        // Build a map for updated job run logs (mappingUrl -> jobRunLogsWithErrorDetails)
+        val updatedJobRunLogsMap = jobRunLogsWithErrorDetails.collect().map(updatedJobRunLog =>
           updatedJobRunLog.getAs[String]("mappingUrl") -> updatedJobRunLog).toMap

         // Replace job run logs if it is in the map

From f7e034e0ca5a157d8329a7121bd60aa1cf587f7c Mon Sep 17 00:00:00 2001
From: okanmercan
Date: Thu, 20 Jul 2023 09:16:18 +0300
Subject: [PATCH 4/7] :lipstick: style(FhirMappingJobResult): Change logging order of the results.

---
 .../scala/io/tofhir/engine/model/FhirMappingJobResult.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingJobResult.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingJobResult.scala
index 9d86f076..3ef1912a 100644
--- a/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingJobResult.scala
+++ b/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingJobResult.scala
@@ -32,8 +32,8 @@ case class FhirMappingJobResult(mappingJobExecution: FhirMappingJobExecution,
       (if (result != "FAILURE")
         s"\t# of Invalid Rows: \t$numOfInvalids\n" +
           s"\t# of Not Mapped: \t$numOfNotMapped\n" +
-          s"\t# of FHIR resources:\t$numOfFhirResources\n" +
-          s"\t# of Failed writes:\t$numOfFailedWrites"
+          s"\t# of Failed writes:\t$numOfFailedWrites\n" +
+          s"\t# of Written FHIR resources:\t$numOfFhirResources"
       else
         ""
         )

From a34831f6415b9401c54ea4019c30fefe19727f1a Mon Sep 17 00:00:00 2001
From: okanmercan
Date: Thu, 20 Jul 2023 09:28:22 +0300
Subject: [PATCH 5/7] :bug: fix(ExecutionService): Preserve the order of mappings while executing a job.
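
Filtering the job's mapping list with urls.contains returns the tasks in the order
they are declared in the job, not in the order they were requested; iterating over
the requested urls and finding each mapping keeps the requested order. A minimal
sketch with illustrative values (not taken from a real job definition):

    val mappings = Seq("url-a", "url-b", "url-c")     // declaration order in the job
    val urls     = Seq("url-c", "url-a")              // order requested by the client

    mappings.filter(m => urls.contains(m))            // Seq(url-a, url-c): declaration order
    urls.flatMap(url => mappings.find(_ == url))      // Seq(url-c, url-a): requested order
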
---
 .../main/scala/io/tofhir/server/service/ExecutionService.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tofhir-server/src/main/scala/io/tofhir/server/service/ExecutionService.scala b/tofhir-server/src/main/scala/io/tofhir/server/service/ExecutionService.scala
index 347b24f9..63796f2e 100644
--- a/tofhir-server/src/main/scala/io/tofhir/server/service/ExecutionService.scala
+++ b/tofhir-server/src/main/scala/io/tofhir/server/service/ExecutionService.scala
@@ -64,7 +64,7 @@ class ExecutionService(jobRepository: IJobRepository, mappingRepository: IMappin

     // get the list of mapping task to be executed
     val mappingTasks = mappingUrls match {
-      case Some(urls) => mappingJob.mappings.filter(m => urls.contains(m.mappingRef))
+      case Some(urls) => urls.flatMap(url => mappingJob.mappings.find(p => p.mappingRef.contentEquals(url)))
       case None => mappingJob.mappings
     }
     // create execution

From a77297b20a059c2b117da5cfd62005b8823ce960 Mon Sep 17 00:00:00 2001
From: okanmercan
Date: Thu, 20 Jul 2023 11:56:49 +0300
Subject: [PATCH 6/7] :bug: fix(SinkHandler): Subtract numOfFailedWrites from numOfFhirResources.

---
 .../main/scala/io/tofhir/engine/data/write/SinkHandler.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/data/write/SinkHandler.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/data/write/SinkHandler.scala
index 4e1934a4..552bdc83 100644
--- a/tofhir-engine/src/main/scala/io/tofhir/engine/data/write/SinkHandler.scala
+++ b/tofhir-engine/src/main/scala/io/tofhir/engine/data/write/SinkHandler.scala
@@ -107,9 +107,10 @@ object SinkHandler {
     val numOfInvalids = invalidInputs.count()
     val numOfNotMapped = mappingErrors.count()
     val numOfNotWritten = notWrittenResources.size()
+    val numOfWritten = numOfFhirResources - numOfNotWritten

     //Log the job result
-    val jobResult = FhirMappingJobResult(mappingJobExecution, mappingUrl, numOfInvalids, numOfNotMapped, numOfFhirResources, numOfNotWritten)
+    val jobResult = FhirMappingJobResult(mappingJobExecution, mappingUrl, numOfInvalids, numOfNotMapped, numOfWritten, numOfNotWritten)
     logger.info(jobResult.toLogstashMarker, jobResult.toString)

     // Log the mapping and invalid input errors

From 127cf5207a7065e4612eebde6ff377142ad5a9ba Mon Sep 17 00:00:00 2001
From: okanmercan
Date: Thu, 20 Jul 2023 14:41:34 +0300
Subject: [PATCH 7/7] :recycle: refactor(ExecutionService): Rename jobErrorLogs to mappingErrorLogs to prevent confusion.

---
 .../tofhir/server/service/ExecutionService.scala | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/tofhir-server/src/main/scala/io/tofhir/server/service/ExecutionService.scala b/tofhir-server/src/main/scala/io/tofhir/server/service/ExecutionService.scala
index 63796f2e..21ea9532 100644
--- a/tofhir-server/src/main/scala/io/tofhir/server/service/ExecutionService.scala
+++ b/tofhir-server/src/main/scala/io/tofhir/server/service/ExecutionService.scala
@@ -147,27 +147,27 @@ class ExecutionService(jobRepository: IJobRepository, mappingRepository: IMappin
           Seq.empty
         }
         else {
-          // Get job run logs for the given execution. ProjectId field is not null for selecting jobRunsLogs, filter out job error logs.
+          // Get job run logs for the given execution. ProjectId field is not null for selecting jobRunsLogs, filter out mapping error logs. 
val jobRunLogs = dataFrame.filter(s"executionId = '$executionId' and projectId is not null") // Handle the case where the job has not been run yet, which makes the data frame empty if (jobRunLogs.isEmpty) { Seq.empty } else { - // Get error logs for the given execution and select needed columns. ProjectId field is null for selecting job error logs, filter out jobRunsLogs. - val jobErrorLogs = dataFrame.filter(s"executionId = '$executionId' and projectId is null").select(List("errorCode", "errorDesc", "message", "mappingUrl").map(col):_*) + // Get error logs for the given execution and select needed columns. ProjectId field is null for selecting mapping error logs, filter out jobRunsLogs. + val mappingErrorLogs = dataFrame.filter(s"executionId = '$executionId' and projectId is null").select(List("errorCode", "errorDesc", "message", "mappingUrl").map(col):_*) - // Group job error logs by mapping url - val jobErrorLogsGroupedByMappingUrl = jobErrorLogs.groupByKey(row => row.get(row.fieldIndex("mappingUrl")).toString)(Encoders.STRING) + // Group mapping error logs by mapping url + val jobErrorLogsGroupedByMappingUrl = mappingErrorLogs.groupByKey(row => row.get(row.fieldIndex("mappingUrl")).toString)(Encoders.STRING) - // Collect job run logs for matching with mappingUrl field of job error logs + // Collect job run logs for matching with mappingUrl field of mapping error logs var jobRunLogsData = jobRunLogs.collect() - // Add error details to job run logs if any error occurred while executing the job. + // Add mapping error details to job run logs if any error occurred while executing the job. val jobRunLogsWithErrorDetails = jobErrorLogsGroupedByMappingUrl.mapGroups((key, values) => { // Find the related job run log to given mapping url val jobRunLog = jobRunLogsData.filter(row => row.getAs[String]("mappingUrl") == key) - // Append error logs to the related job run log + // Append mapping error logs to the related job run log Row.fromSeq(Row.unapplySeq(jobRunLog.head).get :+ values.toSeq) })( // Define a new schema for the resulting rows and create an encoder for it. We will add a "error_logs" column to job run logs that contains related error logs.