Skip to content

Commit

Permalink
✨ Improve ErroneousRecordWriter to handle multiple sources
Browse files Browse the repository at this point in the history
  • Loading branch information
dogukan10 committed Oct 11, 2024
1 parent b81826e commit 650a096
Show file tree
Hide file tree
Showing 6 changed files with 224 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package io.tofhir.engine.execution.processing

import io.tofhir.engine.model.{FhirMappingErrorCodes, FhirMappingJobExecution, FhirMappingResult}
import io.tofhir.engine.util.FileUtils.FileExtensions
import org.apache.hadoop.fs.FileUtil
import org.apache.spark.sql.functions.{col, from_json, schema_of_json}
import org.apache.spark.sql.{Dataset, SaveMode, SparkSession}

import java.io.File
import java.util
import io.onfhir.util.JsonFormatter._

/**
* This class persists [[FhirMappingResult]]s that produced some error during the mapping process.
Expand Down Expand Up @@ -46,13 +46,16 @@ object ErroneousRecordWriter {
}

/**
* Writes the dataset to the errorOutputDirectory. Directory structure:
* error-folder-path\<error-type>\job-<jobId>\execution-<executionId>\<mappingTaskName>\<random-generated-name-by-spark>.csv
* Writes the dataset to the error output directory based on the error type and job details.
* Each source (e.g., "mainSource") within the "source" field is extracted dynamically,
* and its data is written into a separate subdirectory under the output path. The directory structure is as follows:
*
* @param mappingJobExecution job execution to get output directory of data sources with errors
* @param dataset filtered dataset of data sources with errors to write to configured folder
* @param mappingTaskName to create directory for each mapping name within the job execution
* @param errorType one of invalid_input, mapping_error, invalid_resource
* error-folder-path\<error-type>\job-<jobId>\execution-<executionId>\<mappingTaskName>\<sourceName>\<random-generated-name-by-spark>.csv
*
* @param mappingJobExecution The job execution object, used to get the output directory of data sources with errors.
* @param dataset The filtered dataset of data sources with errors to be written to the configured folder.
* @param mappingTaskName The name of the mapping task, used to create a directory for each mapping within the job execution.
* @param errorType The type of error (e.g., invalid_input, mapping_error, invalid_resource).
*/
private def writeErroneousDataset(mappingJobExecution: FhirMappingJobExecution,
dataset: Dataset[FhirMappingResult],
Expand All @@ -61,18 +64,54 @@ object ErroneousRecordWriter {
val outputPath = mappingJobExecution.getErrorOutputDirectory(mappingTaskName, errorType)
val schema = schema_of_json(dataset.rdd.takeSample(withReplacement = false, num = 1).head.source)

dataset
.withColumn("jsonData", from_json(col("source"), schema))
.select("jsonData.*")
.coalesce(1)
.write
.mode(SaveMode.Append)
.option("header", "true")
.csv(outputPath)
// extract each source name (e.g., "mainSource", "secondarySource") from the dataset's "source" field
val jsonColumns: Array[String] = dataset
.select("source")
.rdd
.flatMap(row => row.getAs[String]("source").parseJson.values.keys) // parse JSON and get all keys from the "source" map
.distinct() // remove duplicate source names
.collect()

// for each source, create and write a separate CSV file
jsonColumns.foreach { sourceKey =>
// extract the source data for the given sourceKey and flatten the selected source field
val sourceDataset = dataset
.withColumn("jsonData", from_json(col("source"), schema))
.selectExpr(s"jsonData.$sourceKey.*")

// Remove all files except the CSV file (to remove .crc files)
val srcFiles = FileUtil.listFiles(new File(outputPath))
.filterNot(f => f.getPath.endsWith(FileExtensions.CSV.toString))
srcFiles.foreach(f => f.delete())
// write the extracted source data as a separate CSV file into the directory for that sourceKey
sourceDataset
.coalesce(1)
.write
.mode(SaveMode.Append)
.option("header", "true")
.csv(s"$outputPath/$sourceKey")
}

// remove all files except the CSV file (to remove .crc files)
deleteNonCsvFiles(new File(outputPath))
}

/**
* Recursively deletes non-CSV files (like .crc files) from the directory.
*
* @param dir The directory to clean up by removing non-CSV files.
*/
private def deleteNonCsvFiles(dir: File): Unit = {
if (dir.exists() && dir.isDirectory) {
// get a list of files to delete, excluding CSV files
val filesToDelete = dir.listFiles()
.filterNot(f => f.getPath.endsWith(FileExtensions.CSV.toString))
// process each file in the current directory
filesToDelete.foreach(file => {
if (file.isFile) {
// delete the file if it's a regular file
file.delete()
} else if (file.isDirectory) {
// if it's a subdirectory, recursively clean it up
deleteNonCsvFiles(file)
}
})
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pid,gender
test-patient,male
2 changes: 2 additions & 0 deletions tofhir-server/src/test/resources/test-data/patient-simple.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pid
test-patient
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
{
"name": "patient-job-test",
"sourceSettings": {
"patientSource": {
"jsonClass": "FileSystemSourceSettings",
"name": "patient-test-data",
"sourceUri": "http://test-data",
"dataFolderPath": "./test-data",
"asStream": false
},
"genderSource": {
"jsonClass": "FileSystemSourceSettings",
"name": "patient-gender-test-data",
"sourceUri": "http://test-data",
"dataFolderPath": "./test-data",
"asStream": false
}
},
"sinkSettings": {
"jsonClass": "FhirRepositorySinkSettings",
"fhirRepoUrl": "http://localhost:8081/fhir",
"returnMinimal": true
},
"mappings": [
{
"name": "patient-mapping-with-two-sources",
"mappingRef": "http://patient-mapping-with-two-sources",
"sourceBinding": {
"patient": {
"jsonClass": "FileSystemSource",
"path": "patient-simple.csv",
"contentType": "csv",
"options": {},
"sourceRef": "patientSource"
},
"patientGender": {
"jsonClass": "FileSystemSource",
"path": "patient-gender-simple.csv",
"contentType": "csv",
"options": {},
"sourceRef": "genderSource"
}
}
}
],
"dataProcessingSettings": {
"saveErroneousRecords": true,
"archiveMode": "off"
},
"useFhirSinkAsIdentityService": false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
{
"id": "patient-mapping-two-sources",
"url": "http://patient-mapping-with-two-sources",
"name": "patient-mapping",
"title": "Patient Mapping",
"source": [
{
"alias": "patient",
"url": "http://patient-schema",
"joinOn": [
"pid"
]
},
{
"alias": "patientGender",
"url": "http://patient-gender",
"joinOn": [
"pid"
]
}
],
"context": {},
"variable": [],
"mapping": [
{
"expression": {
"name": "patient",
"language": "application/fhir-template+json",
"value": {
"gender": "{{%patientGender.gender + 1}}",
"id": "{{pid}}",
"meta": {
"profile": [
"http://hl7.org/fhir/StructureDefinition/Patient"
]
},
"resourceType": "Patient"
}
}
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,10 @@ class MappingExecutionEndpointTest extends BaseEndpointTest with OnFhirTestConta
val erroneousRecordsFolder = Paths.get(toFhirEngineConfig.erroneousRecordsFolder, FhirMappingErrorCodes.MAPPING_ERROR)
erroneousRecordsFolder.toFile.exists() && {
val jobFolder = Paths.get(erroneousRecordsFolder.toString, s"job-${batchJob.id}").toFile
val csvFile = jobFolder.listFiles().head.listFiles().head.listFiles().head
val csvFile = jobFolder.listFiles().head // execution folder
.listFiles().head // mapping task folder
.listFiles().head // source folder i.e. main source, secondary source etc.
.listFiles().head // csv file
// Spark initially writes data to files in the "_temporary" directory. After all tasks complete successfully,
// the files are moved from "_temporary" to the parent output directory, and "_temporary" is deleted. This
// intermediate step can be observed during testing, which is why we check if the file is a CSV.
Expand All @@ -211,6 +214,66 @@ class MappingExecutionEndpointTest extends BaseEndpointTest with OnFhirTestConta
}
}

"save erroneous records for job with multiple sources" in {
// Create a new mapping
createMappingAndVerify("test-mappings/patient-mapping-with-two-sources.json", 2)

// Update the job with the new mapping and new sink configuration
val patientMappingTask: FhirMappingTask = FhirMappingTask(
name = "patient-mapping-two-sources",
mappingRef = "http://patient-mapping-with-two-sources",
sourceBinding = Map("patient" -> FileSystemSource(path = "patient-simple.csv", contentType = SourceContentTypes.CSV),
"patientGender" -> FileSystemSource(path = "patient-gender-simple.csv", contentType = SourceContentTypes.CSV))
)
sinkSettings = FhirRepositorySinkSettings(fhirRepoUrl = onFhirClient.getBaseUrl())
val job = batchJob.copy(id = UUID.randomUUID().toString, mappings = Seq(patientMappingTask), sinkSettings = sinkSettings, name = Some("twoSourceJob"))

// Create the job
Post(s"/${webServerConfig.baseUri}/${ProjectEndpoint.SEGMENT_PROJECTS}/$projectId/${JobEndpoint.SEGMENT_JOB}", HttpEntity(ContentTypes.`application/json`, writePretty(job))) ~> route ~> check {
status shouldEqual StatusCodes.Created
}

// Run the job
Post(s"/${webServerConfig.baseUri}/${ProjectEndpoint.SEGMENT_PROJECTS}/$projectId/${JobEndpoint.SEGMENT_JOB}/${job.id}/${JobEndpoint.SEGMENT_RUN}", HttpEntity(ContentTypes.`application/json`, "")) ~> route ~> check {
status shouldEqual StatusCodes.OK

// test if erroneous records are written to error folder
val success = waitForCondition(120) {
val erroneousRecordsFolder = Paths.get(toFhirEngineConfig.erroneousRecordsFolder, FhirMappingErrorCodes.MAPPING_ERROR)
val jobFolder = Paths.get(erroneousRecordsFolder.toString, s"job-${job.id}").toFile
jobFolder.exists() && {
val sourceFolders = jobFolder.listFiles().head // execution folder
.listFiles().head // mapping task folder
.listFiles() // source folder i.e. main source, secondary source etc.
sourceFolders.length == 2 && {
val mainSource = sourceFolders.head
val csvFile = mainSource.listFiles().head
mainSource.getName.contentEquals("mainSource") &&
// Spark initially writes data to files in the "_temporary" directory. After all tasks complete successfully,
// the files are moved from "_temporary" to the parent output directory, and "_temporary" is deleted. This
// intermediate step can be observed during testing, which is why we check if the file is a CSV.
csvFile.exists() && csvFile.getName.endsWith(".csv") && {
val csvFileContent = sparkSession.read.option("header", "true").csv(csvFile.getPath)
csvFileContent.count() == 1
}
} && {
val secondarySource = sourceFolders.last
val csvFile = secondarySource.listFiles().head
secondarySource.getName.contentEquals("patientGender") &&
// Spark initially writes data to files in the "_temporary" directory. After all tasks complete successfully,
// the files are moved from "_temporary" to the parent output directory, and "_temporary" is deleted. This
// intermediate step can be observed during testing, which is why we check if the file is a CSV.
csvFile.exists() && csvFile.getName.endsWith(".csv") && {
val csvFileContent = sparkSession.read.option("header", "true").csv(csvFile.getPath)
csvFileContent.count() == 1
}
}
}
}
if (!success) fail("Failed to find expected number of erroneous records. Either the erroneous record file is not available or the number of records does not match")
}
}

sinkSettings = FileSystemSinkSettings(path = s"./$fsSinkFolderName/job2", SinkContentTypes.NDJSON)
val job2: FhirMappingJob = FhirMappingJob(name = Some("mappingJob2"), sourceSettings = mappingJobSourceSettings, sinkSettings = sinkSettings, mappings = Seq(patientMappingTask), dataProcessingSettings = DataProcessingSettings())

Expand All @@ -220,7 +283,7 @@ class MappingExecutionEndpointTest extends BaseEndpointTest with OnFhirTestConta
status shouldEqual StatusCodes.Created
// validate that job metadata file is updated
val projects: JArray = TestUtil.getProjectJsonFile(toFhirEngineConfig)
(projects.arr.find(p => (p \ "id").extract[String] == projectId).get \ "mappingJobs").asInstanceOf[JArray].arr.length shouldEqual 2
(projects.arr.find(p => (p \ "id").extract[String] == projectId).get \ "mappingJobs").asInstanceOf[JArray].arr.length shouldEqual 3
// check job folder is created
FileUtils.getPath(toFhirEngineConfig.jobRepositoryFolderPath, projectId, s"${job2.id}${FileExtensions.JSON}").toFile should exist
}
Expand Down Expand Up @@ -249,7 +312,7 @@ class MappingExecutionEndpointTest extends BaseEndpointTest with OnFhirTestConta

"execute a mapping within a job without passing the mapping in the mapping task" in {
// create the mapping that will be tested
createMappingAndVerify("test-mappings/patient-mapping2.json", 2)
createMappingAndVerify("test-mappings/patient-mapping2.json", 3)

initializeTestMappingQuery(job2.id, "https://aiccelerate.eu/fhir/mappings/pilot1/patient-mapping2", Map("source" -> FileSystemSource(path = "patients.csv", contentType = SourceContentTypes.CSV))) ~> check {
status shouldEqual StatusCodes.OK
Expand All @@ -274,7 +337,7 @@ class MappingExecutionEndpointTest extends BaseEndpointTest with OnFhirTestConta
*/
"execute a mapping with a context within a job" in {
createSchemaAndVerify("test-schemas/other-observation-schema.json", 2)
createMappingAndVerify("test-mappings/other-observation-mapping.json", 3)
createMappingAndVerify("test-mappings/other-observation-mapping.json", 4)

// test a mapping
initializeTestMappingQuery(job2.id, "https://aiccelerate.eu/fhir/mappings/other-observation-mapping", Map("source" -> FileSystemSource(path = "other-observations.csv", contentType = SourceContentTypes.CSV))) ~> check {
Expand All @@ -300,7 +363,7 @@ class MappingExecutionEndpointTest extends BaseEndpointTest with OnFhirTestConta
status shouldEqual StatusCodes.Created
// validate that job metadata file is updated
val projects: JArray = TestUtil.getProjectJsonFile(toFhirEngineConfig)
(projects.arr.find(p => (p \ "id").extract[String] == projectId).get \ "mappingJobs").asInstanceOf[JArray].arr.length shouldEqual 3
(projects.arr.find(p => (p \ "id").extract[String] == projectId).get \ "mappingJobs").asInstanceOf[JArray].arr.length shouldEqual 4
// check job folder is created
FileUtils.getPath(toFhirEngineConfig.jobRepositoryFolderPath, projectId, s"${streamingJob.id}${FileExtensions.JSON}").toFile should exist
}
Expand Down Expand Up @@ -353,7 +416,7 @@ class MappingExecutionEndpointTest extends BaseEndpointTest with OnFhirTestConta
*/
"run a batch mapping job which converts FHIR Resources into a flat schema and write them to a CSV file" in {
// create the mapping
createMappingAndVerify("test-mappings/patient-flat-mapping.json", 4)
createMappingAndVerify("test-mappings/patient-flat-mapping.json", 5)
// create the job
val jobId = UUID.randomUUID().toString
val job: FhirMappingJob = FhirMappingJob(
Expand Down

0 comments on commit 650a096

Please sign in to comment.