From 91704f201ae725d650504811cc83b3661ba702f6 Mon Sep 17 00:00:00 2001
From: okanmercan
Date: Tue, 18 Jul 2023 15:45:10 +0300
Subject: [PATCH] :sparkles: feat(ExecutionService): Append intermediate
 execution error logs to job result logs.

---
 .../server/service/ExecutionService.scala | 53 ++++++++++++++++---
 1 file changed, 47 insertions(+), 6 deletions(-)

diff --git a/tofhir-server/src/main/scala/io/tofhir/server/service/ExecutionService.scala b/tofhir-server/src/main/scala/io/tofhir/server/service/ExecutionService.scala
index 7f9f0d1c..81767ff6 100644
--- a/tofhir-server/src/main/scala/io/tofhir/server/service/ExecutionService.scala
+++ b/tofhir-server/src/main/scala/io/tofhir/server/service/ExecutionService.scala
@@ -1,5 +1,6 @@
 package io.tofhir.server.service
 
+import akka.shapeless.HList.ListCompat.::
 import com.typesafe.scalalogging.LazyLogging
 import io.tofhir.engine.ToFhirEngine
 import io.tofhir.engine.config.{ErrorHandlingType, ToFhirConfig}
@@ -14,8 +15,9 @@ import io.tofhir.server.service.mapping.IMappingRepository
 import io.tofhir.server.service.schema.ISchemaRepository
 import io.tofhir.server.util.DataFrameUtil
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
-import org.apache.spark.sql.{Encoders, Row}
+import org.apache.spark.sql.functions.{col, lit}
+import org.apache.spark.sql.types.{ArrayType, BinaryType, IntegerType, ObjectType, StringType, StructField, StructType}
+import org.apache.spark.sql.{Encoder, Encoders, Row}
 import org.json4s.JsonAST.JValue
 import org.json4s.jackson.JsonMethods
 
@@ -145,12 +147,51 @@ class ExecutionService(jobRepository: IJobRepository, mappingRepository: IMappin
       Seq.empty
     }
     else {
-      val jobRuns = dataFrame.filter(s"executionId = '$executionId'")
-      // handle the case where the job has not been run yet which makes the data frame empty
-      if (jobRuns.isEmpty) {
+      // Get the job run logs for the given execution
+      val jobRunLogs = dataFrame.filter(s"executionId = '$executionId' and projectId is not null")
+
+      // Handle the case where the job has not been run yet, which makes the data frame empty
+      if (jobRunLogs.isEmpty) {
         Seq.empty
       } else {
-        jobRuns.collect().map(row => {
+        // Get the error logs for the given execution and select the needed columns
+        val jobErrorLogs = dataFrame.filter(s"executionId = '$executionId' and projectId is null").select(List("errorCode", "errorDesc", "message", "mappingUrl").map(col):_*)
+
+        // Group the job error logs by mapping url
+        val jobErrorLogsGroupedByMappingUrl = jobErrorLogs.groupByKey(row => row.get(row.fieldIndex("mappingUrl")).toString)(Encoders.STRING)
+
+        // Define a new schema for the resulting rows. We will add an "error_logs" column to the job run logs that contains the related error logs.
+        val updatedSchema = jobRunLogs.schema.add("error_logs", ArrayType(
+          new StructType()
+            .add("errorCode", StringType)
+            .add("errorDesc", StringType)
+            .add("message", StringType)
+            .add("mappingUrl", StringType)
+        ))
+        // Define a row encoder for the new schema
+        implicit val rowEncoder: Encoder[Row] = RowEncoder(updatedSchema)
+
+        // Collect the job run logs so that they can be matched against the mappingUrl field of the job error logs
+        var jobRunLogsData = jobRunLogs.collect()
+
+        // Get the updated job run logs
+        val updatedJobRunLogs = jobErrorLogsGroupedByMappingUrl.mapGroups((key, values) => {
+          // Find the job run log related to the given mapping url
+          val jobRunLog = jobRunLogsData.filter(row => row.getAs[String]("mappingUrl") == key)
+          // Append the error logs to the related job run log
+          Row.fromSeq(Row.unapplySeq(jobRunLog.head).get :+ values.toSeq)
+        })(rowEncoder)
+
+        // Build a map of the updated job run logs (mappingUrl -> updated job run log)
+        val updatedJobRunLogsMap = updatedJobRunLogs.collect().map(updatedJobRunLog =>
+          updatedJobRunLog.getAs[String]("mappingUrl") -> updatedJobRunLog).toMap
+
+        // Replace each job run log with its updated version if it is in the map
+        jobRunLogsData = jobRunLogsData.map(jobRunLog =>
+          updatedJobRunLogsMap.getOrElse(jobRunLog.getAs[String]("mappingUrl"), jobRunLog))
+
+        // Return JSON objects for the job run logs
+        jobRunLogsData.map(row => {
           JsonMethods.parse(row.json)
         })
       }
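
For reference, below is a minimal, self-contained sketch of the enrichment pattern this patch applies: group the error rows of an execution by mappingUrl and append each group to the matching run-log row under a new "error_logs" array column. The object name, sample data, and column values are hypothetical stand-ins, not toFHIR code; the Spark calls (groupByKey, mapGroups, RowEncoder) are the same ones the patch relies on, assuming a Spark 3.x version where RowEncoder(schema) still yields an Encoder[Row].

    import org.apache.spark.sql.catalyst.encoders.RowEncoder
    import org.apache.spark.sql.types.{ArrayType, StringType, StructType}
    import org.apache.spark.sql.{Encoder, Encoders, Row, SparkSession}

    object ErrorLogEnrichmentSketch extends App {
      val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
      import spark.implicits._

      // Hypothetical stand-ins for one execution's run logs and error logs
      val runLogs = Seq(("exec-1", "mapping/patient", "FAILURE"))
        .toDF("executionId", "mappingUrl", "result")
      val errorLogs = Seq(
        ("invalid_input", "Missing field", "row 12", "mapping/patient"),
        ("fhir_api_error", "HTTP 422", "row 40", "mapping/patient")
      ).toDF("errorCode", "errorDesc", "message", "mappingUrl")

      // Extend the run-log schema with an array column that will hold the error rows
      val errorStruct = new StructType()
        .add("errorCode", StringType)
        .add("errorDesc", StringType)
        .add("message", StringType)
        .add("mappingUrl", StringType)
      val updatedSchema = runLogs.schema.add("error_logs", ArrayType(errorStruct))
      implicit val rowEncoder: Encoder[Row] = RowEncoder(updatedSchema)

      // Collect the run logs to the driver; as in the patch, this assumes only a
      // handful of run-log rows per execution (one per mapping)
      val runLogRows = runLogs.collect()

      // For each group of errors sharing a mappingUrl, find the matching run-log row
      // and append the whole group as the value of the new "error_logs" column
      val enriched = errorLogs
        .groupByKey(_.getAs[String]("mappingUrl"))(Encoders.STRING)
        .mapGroups { (mappingUrl, errors) =>
          // The sample data guarantees a match; real code should handle a miss
          val base = runLogRows.find(_.getAs[String]("mappingUrl") == mappingUrl).get
          Row.fromSeq(base.toSeq :+ errors.toSeq)
        }(rowEncoder)

      enriched.toJSON.collect().foreach(println)
      spark.stop()
    }

Two remarks on the approach. First, run logs that produced no errors never appear in the mapGroups output, which is why the patch keeps the collected run logs around and only replaces the entries found in updatedJobRunLogsMap. Second, the same enrichment could stay entirely inside Spark (no driver-side collect) by aggregating the error logs with groupBy("mappingUrl").agg(collect_list(struct(...))) and joining the result back onto the run logs; the collect-based variant is only reasonable because the per-execution log volume is small.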