Multiple source log #239

Merged
merged 4 commits on Oct 11, 2024
pom.xml (2 changes: 1 addition & 1 deletion)
@@ -96,7 +96,7 @@

<!--Dependency versions-->
<scala-logging.version>3.9.5</scala-logging.version>
-<onfhir.version>3.4-SNAPSHOT</onfhir.version>
+<onfhir.version>3.3-SNAPSHOT</onfhir.version>
<onfhir-template-engine.version>1.1-SNAPSHOT</onfhir-template-engine.version>
<spark-on-fhir.version>1.0-SNAPSHOT</spark-on-fhir.version>
<json4s.version>3.7.0-M11</json4s.version>
ErroneousRecordWriter.scala
@@ -2,12 +2,12 @@ package io.tofhir.engine.execution.processing

import io.tofhir.engine.model.{FhirMappingErrorCodes, FhirMappingJobExecution, FhirMappingResult}
import io.tofhir.engine.util.FileUtils.FileExtensions
-import org.apache.hadoop.fs.FileUtil
import org.apache.spark.sql.functions.{col, from_json, schema_of_json}
import org.apache.spark.sql.{Dataset, SaveMode, SparkSession}

import java.io.File
import java.util
+import io.onfhir.util.JsonFormatter._

/**
* This class persists [[FhirMappingResult]]s that produced some error during the mapping process.
@@ -46,33 +46,72 @@ object ErroneousRecordWriter
}

/**
- * Writes the dataset to the errorOutputDirectory. Directory structure:
- * error-folder-path\<error-type>\job-<jobId>\execution-<executionId>\<mappingTaskName>\<random-generated-name-by-spark>.csv
+ * Writes the dataset to the error output directory based on the error type and job details.
+ * Each source (e.g., "mainSource") within the "source" field is extracted dynamically,
+ * and its data is written into a separate subdirectory under the output path. The directory structure is as follows:
  *
- * @param mappingJobExecution job execution to get output directory of data sources with errors
- * @param dataset             filtered dataset of data sources with errors to write to configured folder
- * @param mappingTaskName     to create directory for each mapping name within the job execution
- * @param errorType           one of invalid_input, mapping_error, invalid_resource
+ * error-folder-path\<error-type>\job-<jobId>\execution-<executionId>\<mappingTaskName>\<sourceName>\<random-generated-name-by-spark>.csv
+ *
+ * @param mappingJobExecution The job execution object, used to get the output directory of data sources with errors.
+ * @param dataset             The filtered dataset of data sources with errors to be written to the configured folder.
+ * @param mappingTaskName     The name of the mapping task, used to create a directory for each mapping within the job execution.
+ * @param errorType           The type of error (e.g., invalid_input, mapping_error, invalid_resource).
*/
private def writeErroneousDataset(mappingJobExecution: FhirMappingJobExecution,
dataset: Dataset[FhirMappingResult],
mappingTaskName: String,
errorType: String): Unit = {
val outputPath = mappingJobExecution.getErrorOutputDirectory(mappingTaskName, errorType)
-    val schema = schema_of_json(dataset.rdd.takeSample(withReplacement = false, num = 1).head.source.get)
+    val schema = schema_of_json(dataset.rdd.takeSample(withReplacement = false, num = 1).head.source)

+    // extract each source name (e.g., "mainSource", "secondarySource") from the dataset's "source" field
+    val jsonColumns: Array[String] = dataset
+      .select("source")
+      .rdd
+      .flatMap(row => row.getAs[String]("source").parseJson.values.keys) // parse JSON and get all keys from the "source" map
+      .distinct() // remove duplicate source names
+      .collect()

-    dataset
-      .withColumn("jsonData", from_json(col("source"), schema))
-      .select("jsonData.*")
-      .coalesce(1)
-      .write
-      .mode(SaveMode.Append)
-      .option("header", "true")
-      .csv(outputPath)
+    // for each source, create and write a separate CSV file
+    jsonColumns.foreach { sourceKey =>
+      // extract the source data for the given sourceKey and flatten the selected source field
+      val sourceDataset = dataset
+        .withColumn("jsonData", from_json(col("source"), schema))
+        .selectExpr(s"jsonData.$sourceKey.*")

-    // Remove all files except the CSV file (to remove .crc files)
-    val srcFiles = FileUtil.listFiles(new File(outputPath))
-      .filterNot(f => f.getPath.endsWith(FileExtensions.CSV.toString))
-    srcFiles.foreach(f => f.delete())
+      // write the extracted source data as a separate CSV file into the directory for that sourceKey
+      sourceDataset
+        .coalesce(1)
+        .write
+        .mode(SaveMode.Append)
+        .option("header", "true")
+        .csv(s"$outputPath/$sourceKey")
+    }

+    // remove all files except the CSV file (to remove .crc files)
+    deleteNonCsvFiles(new File(outputPath))
}

+  /**
+   * Recursively deletes non-CSV files (like .crc files) from the directory.
+   *
+   * @param dir The directory to clean up by removing non-CSV files.
+   */
+  private def deleteNonCsvFiles(dir: File): Unit = {
+    if (dir.exists() && dir.isDirectory) {
+      // get a list of files to delete, excluding CSV files
+      val filesToDelete = dir.listFiles()
+        .filterNot(f => f.getPath.endsWith(FileExtensions.CSV.toString))
+      // process each file in the current directory
+      filesToDelete.foreach(file => {
+        if (file.isFile) {
+          // delete the file if it's a regular file
+          file.delete()
+        } else if (file.isDirectory) {
+          // if it's a subdirectory, recursively clean it up
+          deleteNonCsvFiles(file)
+        }
+      })
+    }
+  }
}
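
For readers who want to try the new per-source split outside the engine, here is a minimal, self-contained sketch of the same idea. The sample rows, the local Spark session, and the /tmp output path are made up for illustration, and the sketch reads the source names from the inferred struct schema instead of parsing JSON keys with onFHIR's JsonFormatter as the PR does.

import org.apache.spark.sql.functions.{col, from_json, schema_of_json}
import org.apache.spark.sql.{SaveMode, SparkSession}

// Minimal sketch of the per-source split above; data and output path are illustrative.
object MultiSourceErrorSplitSketch extends App {
  val spark = SparkSession.builder().master("local[1]").appName("multi-source-split").getOrCreate()
  import spark.implicits._

  // each "source" value is one JSON object per named source, as in the new FhirMappingResult.source
  val erroneousRecords = Seq(
    """{"mainSource":{"pid":"p1","gender":"male"},"secondarySource":{"code":"c1"}}""",
    """{"mainSource":{"pid":"p2","gender":"female"},"secondarySource":{"code":"c2"}}"""
  ).toDF("source")

  // infer the struct schema from one sample row, as writeErroneousDataset does
  val schema = schema_of_json(erroneousRecords.head().getAs[String]("source"))
  val parsed = erroneousRecords.withColumn("jsonData", from_json(col("source"), schema))

  // the top-level struct fields are the source names ("mainSource", "secondarySource", ...)
  val sourceKeys = parsed.select("jsonData.*").columns

  // write each source's flattened rows into its own subdirectory, mirroring the PR's layout
  sourceKeys.foreach { sourceKey =>
    parsed
      .selectExpr(s"jsonData.$sourceKey.*")
      .coalesce(1)
      .write
      .mode(SaveMode.Append)
      .option("header", "true")
      .csv(s"/tmp/error-output/$sourceKey")
  }

  spark.stop()
}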
MappingTaskExecutor.scala
@@ -87,7 +87,7 @@ object MappingTaskExecutor
mappingTaskName = fhirMappingService.mappingTaskName,
mappingExpr = None,
timestamp = Timestamp.from(Instant.now()),
-source = Some(Serialization.write(jo)),
+source = Serialization.write(jo),
error = Some(FhirMappingError(
code = FhirMappingErrorCodes.INVALID_INPUT,
description = validationError
@@ -149,7 +149,7 @@
mappingTaskName = fhirMappingService.mappingTaskName,
mappingExpr = None,
timestamp = Timestamp.from(Instant.now()),
-source = Some(Serialization.write(jo)),
+source = Serialization.write(jo),
error = Some(FhirMappingError(
code = FhirMappingErrorCodes.INVALID_INPUT,
description = validationErrors.mkString("\n")
@@ -189,7 +189,7 @@
mappingTaskName = fhirMappingService.mappingTaskName,
mappedFhirResources = flattenedResources.map(r => MappedFhirResource(Some(r._1), Some(Serialization.write(r._2)), r._3)),
timestamp = Timestamp.from(Instant.now()),
-source = Some(Serialization.write(JObject("mainSource" -> jo) ~ otherInputs)),
+source = Serialization.write(JObject("mainSource" -> jo) ~ otherInputs),
executionId = executionId,
projectId = fhirMappingService.projectId,
))
@@ -214,7 +214,7 @@
mappingTaskName = fhirMappingService.mappingTaskName,
mappingExpr = Some(mappingExpr),
timestamp = Timestamp.from(Instant.now()),
-source = Some(Serialization.write(jo)),
+source = Serialization.write(JObject("mainSource" -> jo) ~ otherInputs),
mappedResource = Some(Serialization.write(JArray(resources.toList))),
fhirInteraction = fhirInteraction,
executionId = executionId,
@@ -228,7 +228,7 @@
mappingTaskName = fhirMappingService.mappingTaskName,
mappingExpr = Some(mappingExpr),
timestamp = Timestamp.from(Instant.now()),
-source = Some(Serialization.write(jo)),
+source = Serialization.write(JObject("mainSource" -> jo) ~ otherInputs),
mappedResource = Some(Serialization.write(r)),
fhirInteraction = fhirInteraction,
executionId = executionId,
@@ -261,7 +261,7 @@
mappingTaskName = fhirMappingService.mappingTaskName,
mappingExpr = Some(mappingExpr),
timestamp = Timestamp.from(Instant.now()),
-source = Some(Serialization.write(jo)),
+source = Serialization.write(JObject("mainSource" -> jo) ~ otherInputs),
error = Some(FhirMappingError(
code = FhirMappingErrorCodes.MAPPING_ERROR,
description = errorDescription,
@@ -276,7 +276,7 @@
mappingTaskName = fhirMappingService.mappingTaskName,
mappingExpr = None,
timestamp = Timestamp.from(Instant.now()),
-source = Some(Serialization.write(jo)),
+source = Serialization.write(JObject("mainSource" -> jo) ~ otherInputs),
error = Some(FhirMappingError(
code = FhirMappingErrorCodes.MAPPING_ERROR,
description = ExceptionUtil.extractExceptionMessages(e)
@@ -290,7 +290,7 @@
mappingTaskName = fhirMappingService.mappingTaskName,
mappingExpr = None,
timestamp = Timestamp.from(Instant.now()),
-source = Some(Serialization.write(jo)),
+source = Serialization.write(JObject("mainSource" -> jo) ~ otherInputs),
error = Some(FhirMappingError(
code = FhirMappingErrorCodes.MAPPING_TIMEOUT,
description = s"A single row could not be mapped to FHIR in ${ToFhirConfig.engineConfig.mappingTimeout.toString}!"
@@ -304,7 +304,7 @@
mappingTaskName = fhirMappingService.mappingTaskName,
mappingExpr = None,
timestamp = Timestamp.from(Instant.now()),
-source = Some(Serialization.write(jo)),
+source = Serialization.write(JObject("mainSource" -> jo) ~ otherInputs),
error = Some(FhirMappingError(
code = FhirMappingErrorCodes.UNEXPECTED_PROBLEM,
description = "Exception:" + oth.getMessage
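
The pattern repeated across this file: `source` is no longer wrapped in `Some(...)`, and every error path now serializes a JSON object keyed by source name, with the main row under "mainSource" and any joined inputs merged in via `~`. A small json4s sketch of the resulting payload follows; all field names and values in it are made up for illustration.

import org.json4s.JsonDSL._
import org.json4s.jackson.Serialization
import org.json4s.{DefaultFormats, JObject, JString}

object SourcePayloadSketch extends App {
  implicit val formats: DefaultFormats.type = DefaultFormats

  // row read from the main source (hypothetical values)
  val jo: JObject = JObject("pid" -> JString("p1"), "gender" -> JString("male"))
  // rows joined from a secondary source (hypothetical values)
  val otherInputs: JObject = JObject("secondarySource" -> JObject("code" -> JString("c1")))

  // same expression as the diff: nest the main row under "mainSource" and merge the joined inputs
  val source: String = Serialization.write(JObject("mainSource" -> jo) ~ otherInputs)
  println(source)
  // prints: {"mainSource":{"pid":"p1","gender":"male"},"secondarySource":{"code":"c1"}}
}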
FhirMappingResult.scala
@@ -28,7 +28,7 @@ case class FhirMappingResult(
mappingExpr:Option[String] = None,
timestamp:Timestamp,
mappedResource:Option[String] = None,
-source:Option[String] = None,
+source:String,
error:Option[FhirMappingError] = None,
fhirInteraction:Option[FhirInteraction] = None,
executionId: Option[String] = None,
@@ -39,7 +39,7 @@
final val eventId:String = "MAPPING_RESULT"
override def toString: String = {
s"Mapping failure (${error.get.code}) for job '$jobId' and mappingTask '$mappingTaskName'${mappingExpr.map(e => s" within expression '$e'").getOrElse("")} execution '${executionId.getOrElse("")}'!\n"+
s"\tSource: ${source.get}\n"+
s"\tSource: ${source}\n"+
s"\tError: ${error.get.description}" +
error.get.expression.map(e => s"\n\tExpression: $e").getOrElse("")
}
@@ -58,7 +58,7 @@
markerMap.put("executionId", executionId.getOrElse(""))
markerMap.put("mappingTaskName", mappingTaskName)
markerMap.put("mappingExpr", mappingExpr.orElse(null))
markerMap.put("source", source.get)
markerMap.put("source", source)
markerMap.put("errorCode", error.get.code)
markerMap.put("errorDesc", error.get.description)
markerMap.put("errorExpr", error.get.expression.getOrElse(""))
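
With `source` now a mandatory `String`, callers must always supply the serialized source row, and the `source.get` calls above disappear. A hypothetical construction is sketched below; the values are invented, and it assumes the constructor fields not shown in this diff keep their defaults.

import java.sql.Timestamp
import java.time.Instant

// hypothetical construction; jobId and mappingTaskName appear as required fields in toString above
val result = FhirMappingResult(
  jobId = "job-1",
  mappingTaskName = "patient-mapping",
  timestamp = Timestamp.from(Instant.now()),
  source = """{"mainSource":{"pid":"p1","gender":"male"}}"""
)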