✨ feat(ExecutionService): Append intermediate execution error logs to job result logs.
Okanmercan99 committed Jul 19, 2023
1 parent f5ca547 commit 2304788
Showing 1 changed file with 47 additions and 6 deletions.
@@ -1,5 +1,6 @@
package io.tofhir.server.service

import akka.shapeless.HList.ListCompat.::
import com.typesafe.scalalogging.LazyLogging
import io.tofhir.engine.ToFhirEngine
import io.tofhir.engine.config.{ErrorHandlingType, ToFhirConfig}
@@ -14,8 +15,9 @@ import io.tofhir.server.service.mapping.IMappingRepository
import io.tofhir.server.service.schema.ISchemaRepository
import io.tofhir.server.util.DataFrameUtil
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Encoders, Row}
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.{ArrayType, BinaryType, IntegerType, ObjectType, StringType, StructField, StructType}
import org.apache.spark.sql.{Encoder, Encoders, Row}
import org.json4s.JsonAST.JValue
import org.json4s.jackson.JsonMethods

@@ -145,12 +147,51 @@ class ExecutionService(jobRepository: IJobRepository, mappingRepository: IMappin
Seq.empty
}
else {
val jobRuns = dataFrame.filter(s"executionId = '$executionId'")
// handle the case where the job has not been run yet which makes the data frame empty
if (jobRuns.isEmpty) {
// Get job run logs for the given execution
val jobRunLogs = dataFrame.filter(s"executionId = '$executionId' and projectId is not null")

// Handle the case where the job has not been run yet, which makes the data frame empty
if (jobRunLogs.isEmpty) {
Seq.empty
} else {
jobRuns.collect().map(row => {
// Get error logs for the given execution and select needed columns
val jobErrorLogs = dataFrame.filter(s"executionId = '$executionId' and projectId is null").select(List("errorCode", "errorDesc", "message", "mappingUrl").map(col):_*)

// Group job error logs by mapping url
val jobErrorLogsGroupedByMappingUrl = jobErrorLogs.groupByKey(row => row.get(row.fieldIndex("mappingUrl")).toString)(Encoders.STRING)

// Define a new schema for the resulting rows. We will add an "error_logs" column to the job run logs that contains the related error logs.
val updatedSchema = jobRunLogs.schema.add("error_logs", ArrayType(
new StructType()
.add("errorCode", StringType)
.add("errorDesc", StringType)
.add("message", StringType)
.add("mappingUrl", StringType)
))
// Define a row encoder for the new schema
implicit val rowEncoder: Encoder[Row] = RowEncoder(updatedSchema)

// Collect job run logs so they can be matched with the mappingUrl field of the job error logs
var jobRunLogsData = jobRunLogs.collect()

// Get updated job run logs
val updatedJobRunLogs = jobErrorLogsGroupedByMappingUrl.mapGroups((key, values) => {
// Find the job run log related to the given mapping url
val jobRunLog = jobRunLogsData.filter(row => row.getAs[String]("mappingUrl") == key)
// Append error logs to the related job run log
Row.fromSeq(Row.unapplySeq(jobRunLog.head).get :+ values.toSeq)
})(rowEncoder)

// Build a map of updated job run logs keyed by mapping url (mappingUrl -> updatedJobRunLog)
val updatedJobRunLogsMap = updatedJobRunLogs.collect().map(updatedJobRunLog =>
updatedJobRunLog.getAs[String]("mappingUrl") -> updatedJobRunLog).toMap

// Replace job run logs with their updated versions if present in the map
jobRunLogsData = jobRunLogsData.map(jobRunLog =>
updatedJobRunLogsMap.getOrElse(jobRunLog.getAs[String]("mappingUrl"), jobRunLog))

// Return JSON objects for the job run logs
jobRunLogsData.map(row => {
JsonMethods.parse(row.json)
})
}