Skip to content

Commit

Permalink
♻️ refactor(ExecutionService): Rename jobErrorLogs as mappingErrorLog…
Browse files Browse the repository at this point in the history
…s to prevent confusion.
  • Loading branch information
Okanmercan99 committed Jul 20, 2023
1 parent a77297b commit 127cf52
Showing 1 changed file with 8 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,27 +147,27 @@ class ExecutionService(jobRepository: IJobRepository, mappingRepository: IMappin
Seq.empty
}
else {
// Get job run logs for the given execution. ProjectId field is not null for selecting jobRunsLogs, filter out job error logs.
// Get job run logs for the given execution. ProjectId field is not null for selecting jobRunsLogs, filter out mapping error logs.
val jobRunLogs = dataFrame.filter(s"executionId = '$executionId' and projectId is not null")

// Handle the case where the job has not been run yet, which makes the data frame empty
if (jobRunLogs.isEmpty) {
Seq.empty
} else {
// Get error logs for the given execution and select needed columns. ProjectId field is null for selecting job error logs, filter out jobRunsLogs.
val jobErrorLogs = dataFrame.filter(s"executionId = '$executionId' and projectId is null").select(List("errorCode", "errorDesc", "message", "mappingUrl").map(col):_*)
// Get error logs for the given execution and select needed columns. ProjectId field is null for selecting mapping error logs, filter out jobRunsLogs.
val mappingErrorLogs = dataFrame.filter(s"executionId = '$executionId' and projectId is null").select(List("errorCode", "errorDesc", "message", "mappingUrl").map(col):_*)

// Group job error logs by mapping url
val jobErrorLogsGroupedByMappingUrl = jobErrorLogs.groupByKey(row => row.get(row.fieldIndex("mappingUrl")).toString)(Encoders.STRING)
// Group mapping error logs by mapping url
val jobErrorLogsGroupedByMappingUrl = mappingErrorLogs.groupByKey(row => row.get(row.fieldIndex("mappingUrl")).toString)(Encoders.STRING)

// Collect job run logs for matching with mappingUrl field of job error logs
// Collect job run logs for matching with mappingUrl field of mapping error logs
var jobRunLogsData = jobRunLogs.collect()

// Add error details to job run logs if any error occurred while executing the job.
// Add mapping error details to job run logs if any error occurred while executing the job.
val jobRunLogsWithErrorDetails = jobErrorLogsGroupedByMappingUrl.mapGroups((key, values) => {
// Find the related job run log to given mapping url
val jobRunLog = jobRunLogsData.filter(row => row.getAs[String]("mappingUrl") == key)
// Append error logs to the related job run log
// Append mapping error logs to the related job run log
Row.fromSeq(Row.unapplySeq(jobRunLog.head).get :+ values.toSeq)
})(
// Define a new schema for the resulting rows and create an encoder for it. We will add a "error_logs" column to job run logs that contains related error logs.
Expand Down

0 comments on commit 127cf52

Please sign in to comment.