From 5ff35485615fcd7abed254ed4524458b747ba3ad Mon Sep 17 00:00:00 2001 From: okanmercan Date: Tue, 8 Oct 2024 14:19:34 +0300 Subject: [PATCH 1/4] :recycle: refactor: Print multiple source in the log. --- .../tofhir/engine/mapping/MappingTaskExecutor.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/MappingTaskExecutor.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/MappingTaskExecutor.scala index d1a30ed3..08924e02 100644 --- a/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/MappingTaskExecutor.scala +++ b/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/MappingTaskExecutor.scala @@ -214,7 +214,7 @@ object MappingTaskExecutor { mappingTaskName = fhirMappingService.mappingTaskName, mappingExpr = Some(mappingExpr), timestamp = Timestamp.from(Instant.now()), - source = Some(Serialization.write(jo)), + source = Some(Serialization.write(JObject("mainSource" -> jo) ~ otherInputs)), mappedResource = Some(Serialization.write(JArray(resources.toList))), fhirInteraction = fhirInteraction, executionId = executionId, @@ -228,7 +228,7 @@ object MappingTaskExecutor { mappingTaskName = fhirMappingService.mappingTaskName, mappingExpr = Some(mappingExpr), timestamp = Timestamp.from(Instant.now()), - source = Some(Serialization.write(jo)), + source = Some(Serialization.write(JObject("mainSource" -> jo) ~ otherInputs)), mappedResource = Some(Serialization.write(r)), fhirInteraction = fhirInteraction, executionId = executionId, @@ -261,7 +261,7 @@ object MappingTaskExecutor { mappingTaskName = fhirMappingService.mappingTaskName, mappingExpr = Some(mappingExpr), timestamp = Timestamp.from(Instant.now()), - source = Some(Serialization.write(jo)), + source = Some(Serialization.write(JObject("mainSource" -> jo) ~ otherInputs)), error = Some(FhirMappingError( code = FhirMappingErrorCodes.MAPPING_ERROR, description = errorDescription, @@ -276,7 +276,7 @@ object MappingTaskExecutor { mappingTaskName = fhirMappingService.mappingTaskName, mappingExpr = None, timestamp = Timestamp.from(Instant.now()), - source = Some(Serialization.write(jo)), + source = Some(Serialization.write(JObject("mainSource" -> jo) ~ otherInputs)), error = Some(FhirMappingError( code = FhirMappingErrorCodes.MAPPING_ERROR, description = ExceptionUtil.extractExceptionMessages(e) @@ -290,7 +290,7 @@ object MappingTaskExecutor { mappingTaskName = fhirMappingService.mappingTaskName, mappingExpr = None, timestamp = Timestamp.from(Instant.now()), - source = Some(Serialization.write(jo)), + source = Some(Serialization.write(JObject("mainSource" -> jo) ~ otherInputs)), error = Some(FhirMappingError( code = FhirMappingErrorCodes.MAPPING_TIMEOUT, description = s"A single row could not be mapped to FHIR in ${ToFhirConfig.engineConfig.mappingTimeout.toString}!" @@ -304,7 +304,7 @@ object MappingTaskExecutor { mappingTaskName = fhirMappingService.mappingTaskName, mappingExpr = None, timestamp = Timestamp.from(Instant.now()), - source = Some(Serialization.write(jo)), + source = Some(Serialization.write(JObject("mainSource" -> jo) ~ otherInputs)), error = Some(FhirMappingError( code = FhirMappingErrorCodes.UNEXPECTED_PROBLEM, description = "Exception:" + oth.getMessage From fe9539fac9ae01b25317e52a7e95cdc52be7f737 Mon Sep 17 00:00:00 2001 From: okanmercan Date: Wed, 9 Oct 2024 09:26:21 +0300 Subject: [PATCH 2/4] :recycle: refactor: Change optional source field. --- .../processing/ErroneousRecordWriter.scala | 2 +- .../engine/mapping/MappingTaskExecutor.scala | 18 +++++------ .../engine/model/FhirMappingResult.scala | 6 ++-- .../data/write/FileSystemWriterTest.scala | 30 +++++++++---------- 4 files changed, 28 insertions(+), 28 deletions(-) diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/execution/processing/ErroneousRecordWriter.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/execution/processing/ErroneousRecordWriter.scala index 084a5567..307ee931 100644 --- a/tofhir-engine/src/main/scala/io/tofhir/engine/execution/processing/ErroneousRecordWriter.scala +++ b/tofhir-engine/src/main/scala/io/tofhir/engine/execution/processing/ErroneousRecordWriter.scala @@ -59,7 +59,7 @@ object ErroneousRecordWriter { mappingTaskName: String, errorType: String): Unit = { val outputPath = mappingJobExecution.getErrorOutputDirectory(mappingTaskName, errorType) - val schema = schema_of_json(dataset.rdd.takeSample(withReplacement = false, num = 1).head.source.get) + val schema = schema_of_json(dataset.rdd.takeSample(withReplacement = false, num = 1).head.source) dataset .withColumn("jsonData", from_json(col("source"), schema)) diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/MappingTaskExecutor.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/MappingTaskExecutor.scala index 08924e02..f44c1626 100644 --- a/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/MappingTaskExecutor.scala +++ b/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/MappingTaskExecutor.scala @@ -87,7 +87,7 @@ object MappingTaskExecutor { mappingTaskName = fhirMappingService.mappingTaskName, mappingExpr = None, timestamp = Timestamp.from(Instant.now()), - source = Some(Serialization.write(jo)), + source = Serialization.write(jo), error = Some(FhirMappingError( code = FhirMappingErrorCodes.INVALID_INPUT, description = validationError @@ -149,7 +149,7 @@ object MappingTaskExecutor { mappingTaskName = fhirMappingService.mappingTaskName, mappingExpr = None, timestamp = Timestamp.from(Instant.now()), - source = Some(Serialization.write(jo)), + source = Serialization.write(jo), error = Some(FhirMappingError( code = FhirMappingErrorCodes.INVALID_INPUT, description = validationErrors.mkString("\n") @@ -189,7 +189,7 @@ object MappingTaskExecutor { mappingTaskName = fhirMappingService.mappingTaskName, mappedFhirResources = flattenedResources.map(r => MappedFhirResource(Some(r._1), Some(Serialization.write(r._2)), r._3)), timestamp = Timestamp.from(Instant.now()), - source = Some(Serialization.write(JObject("mainSource" -> jo) ~ otherInputs)), + source = Serialization.write(JObject("mainSource" -> jo) ~ otherInputs), executionId = executionId, projectId = fhirMappingService.projectId, )) @@ -214,7 +214,7 @@ object MappingTaskExecutor { mappingTaskName = fhirMappingService.mappingTaskName, mappingExpr = Some(mappingExpr), timestamp = Timestamp.from(Instant.now()), - source = Some(Serialization.write(JObject("mainSource" -> jo) ~ otherInputs)), + source = Serialization.write(JObject("mainSource" -> jo) ~ otherInputs), mappedResource = Some(Serialization.write(JArray(resources.toList))), fhirInteraction = fhirInteraction, executionId = executionId, @@ -228,7 +228,7 @@ object MappingTaskExecutor { mappingTaskName = fhirMappingService.mappingTaskName, mappingExpr = Some(mappingExpr), timestamp = Timestamp.from(Instant.now()), - source = Some(Serialization.write(JObject("mainSource" -> jo) ~ otherInputs)), + source = Serialization.write(JObject("mainSource" -> jo) ~ otherInputs), mappedResource = Some(Serialization.write(r)), fhirInteraction = fhirInteraction, executionId = executionId, @@ -261,7 +261,7 @@ object MappingTaskExecutor { mappingTaskName = fhirMappingService.mappingTaskName, mappingExpr = Some(mappingExpr), timestamp = Timestamp.from(Instant.now()), - source = Some(Serialization.write(JObject("mainSource" -> jo) ~ otherInputs)), + source = Serialization.write(JObject("mainSource" -> jo) ~ otherInputs), error = Some(FhirMappingError( code = FhirMappingErrorCodes.MAPPING_ERROR, description = errorDescription, @@ -276,7 +276,7 @@ object MappingTaskExecutor { mappingTaskName = fhirMappingService.mappingTaskName, mappingExpr = None, timestamp = Timestamp.from(Instant.now()), - source = Some(Serialization.write(JObject("mainSource" -> jo) ~ otherInputs)), + source = Serialization.write(JObject("mainSource" -> jo) ~ otherInputs), error = Some(FhirMappingError( code = FhirMappingErrorCodes.MAPPING_ERROR, description = ExceptionUtil.extractExceptionMessages(e) @@ -290,7 +290,7 @@ object MappingTaskExecutor { mappingTaskName = fhirMappingService.mappingTaskName, mappingExpr = None, timestamp = Timestamp.from(Instant.now()), - source = Some(Serialization.write(JObject("mainSource" -> jo) ~ otherInputs)), + source = Serialization.write(JObject("mainSource" -> jo) ~ otherInputs), error = Some(FhirMappingError( code = FhirMappingErrorCodes.MAPPING_TIMEOUT, description = s"A single row could not be mapped to FHIR in ${ToFhirConfig.engineConfig.mappingTimeout.toString}!" @@ -304,7 +304,7 @@ object MappingTaskExecutor { mappingTaskName = fhirMappingService.mappingTaskName, mappingExpr = None, timestamp = Timestamp.from(Instant.now()), - source = Some(Serialization.write(JObject("mainSource" -> jo) ~ otherInputs)), + source = Serialization.write(JObject("mainSource" -> jo) ~ otherInputs), error = Some(FhirMappingError( code = FhirMappingErrorCodes.UNEXPECTED_PROBLEM, description = "Exception:" + oth.getMessage diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingResult.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingResult.scala index 237f8860..81ca3a54 100644 --- a/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingResult.scala +++ b/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingResult.scala @@ -28,7 +28,7 @@ case class FhirMappingResult( mappingExpr:Option[String] = None, timestamp:Timestamp, mappedResource:Option[String] = None, - source:Option[String] = None, + source:String, error:Option[FhirMappingError] = None, fhirInteraction:Option[FhirInteraction] = None, executionId: Option[String] = None, @@ -39,7 +39,7 @@ case class FhirMappingResult( final val eventId:String = "MAPPING_RESULT" override def toString: String = { s"Mapping failure (${error.get.code}) for job '$jobId' and mappingTask '$mappingTaskName'${mappingExpr.map(e => s" within expression '$e'").getOrElse("")} execution '${executionId.getOrElse("")}'!\n"+ - s"\tSource: ${source.get}\n"+ + s"\tSource: ${source}\n"+ s"\tError: ${error.get.description}" + error.get.expression.map(e => s"\n\tExpression: $e").getOrElse("") } @@ -58,7 +58,7 @@ case class FhirMappingResult( markerMap.put("executionId", executionId.getOrElse("")) markerMap.put("mappingTaskName", mappingTaskName) markerMap.put("mappingExpr", mappingExpr.orElse(null)) - markerMap.put("source", source.get) + markerMap.put("source", source) markerMap.put("errorCode", error.get.code) markerMap.put("errorDesc", error.get.description) markerMap.put("errorExpr", error.get.expression.getOrElse("")) diff --git a/tofhir-engine/src/test/scala/io/tofhir/test/engine/data/write/FileSystemWriterTest.scala b/tofhir-engine/src/test/scala/io/tofhir/test/engine/data/write/FileSystemWriterTest.scala index d8b6c074..46620ff6 100644 --- a/tofhir-engine/src/test/scala/io/tofhir/test/engine/data/write/FileSystemWriterTest.scala +++ b/tofhir-engine/src/test/scala/io/tofhir/test/engine/data/write/FileSystemWriterTest.scala @@ -418,7 +418,7 @@ class FileSystemWriterTest extends AnyFlatSpec with BeforeAndAfterAll { mappingExpr = Some("expression1"), timestamp = new Timestamp(System.currentTimeMillis()), mappedResource = Some("{\"resourceType\":\"Patient\",\"id\":\"34dc88d5972fd5472a942fc80f69f35c\",\"meta\":{\"profile\":[\"https://aiccelerate.eu/fhir/StructureDefinition/AIC-Patient\"],\"source\":\"https://aiccelerate.eu/data-integration-suite/pilot1-data\"},\"active\":true,\"identifier\":[{\"use\":\"official\",\"system\":\"https://aiccelerate.eu/data-integration-suite/pilot1-data\",\"value\":\"p1\"}],\"gender\":\"male\",\"birthDate\":\"2000-05-10\"}"), - source = Some("Source"), + source = "Source", error = None, fhirInteraction = None, executionId = Some("exec"), @@ -431,7 +431,7 @@ class FileSystemWriterTest extends AnyFlatSpec with BeforeAndAfterAll { mappingExpr = Some("expression"), timestamp = new Timestamp(System.currentTimeMillis()), mappedResource = Some("{\"resourceType\":\"Patient\",\"id\":\"0b3a0b23a0c6e223b941e63787f15a6a\",\"meta\":{\"profile\":[\"https://aiccelerate.eu/fhir/StructureDefinition/AIC-Patient\"],\"source\":\"https://aiccelerate.eu/data-integration-suite/pilot1-data\"},\"active\":true,\"identifier\":[{\"use\":\"official\",\"system\":\"https://aiccelerate.eu/data-integration-suite/pilot1-data\",\"value\":\"p2\"}],\"gender\":\"male\",\"birthDate\":\"1985-05-08\",\"deceasedDateTime\":\"2017-03-10\",\"address\":[{\"use\":\"home\",\"type\":\"both\",\"postalCode\":\"G02547\"}]}"), - source = Some("Source"), + source = "Source", error = None, fhirInteraction = None, executionId = Some("exec"), @@ -444,7 +444,7 @@ class FileSystemWriterTest extends AnyFlatSpec with BeforeAndAfterAll { mappingExpr = Some("expression"), timestamp = new Timestamp(System.currentTimeMillis()), mappedResource = Some("{\"resourceType\":\"Patient\",\"id\":\"49d3c335681ab7fb2d4cdf19769655db\",\"meta\":{\"profile\":[\"https://aiccelerate.eu/fhir/StructureDefinition/AIC-Patient\"],\"source\":\"https://aiccelerate.eu/data-integration-suite/pilot1-data\"},\"active\":true,\"identifier\":[{\"use\":\"official\",\"system\":\"https://aiccelerate.eu/data-integration-suite/pilot1-data\",\"value\":\"p3\"}],\"gender\":\"male\",\"birthDate\":\"1997-02\"}"), - source = Some("Source"), + source = "Source", error = None, fhirInteraction = None, executionId = Some("exec"), @@ -457,7 +457,7 @@ class FileSystemWriterTest extends AnyFlatSpec with BeforeAndAfterAll { mappingExpr = Some("expression"), timestamp = new Timestamp(System.currentTimeMillis()), mappedResource = Some("{\"resourceType\":\"Patient\",\"id\":\"0bbad2343eb86d5cdc16a1b292537576\",\"meta\":{\"profile\":[\"https://aiccelerate.eu/fhir/StructureDefinition/AIC-Patient\"],\"source\":\"https://aiccelerate.eu/data-integration-suite/pilot1-data\"},\"active\":true,\"identifier\":[{\"use\":\"official\",\"system\":\"https://aiccelerate.eu/data-integration-suite/pilot1-data\",\"value\":\"p4\"}],\"gender\":\"male\",\"birthDate\":\"1999-06-05\",\"address\":[{\"use\":\"home\",\"type\":\"both\",\"postalCode\":\"H10564\"}]}"), - source = Some("Source"), + source = "Source", error = None, fhirInteraction = None, executionId = Some("exec"), @@ -470,7 +470,7 @@ class FileSystemWriterTest extends AnyFlatSpec with BeforeAndAfterAll { mappingExpr = Some("expression"), timestamp = new Timestamp(System.currentTimeMillis()), mappedResource = Some("{\"resourceType\":\"Patient\",\"id\":\"7b650be0176d6d29351f84314a5efbe3\",\"meta\":{\"profile\":[\"https://aiccelerate.eu/fhir/StructureDefinition/AIC-Patient\"],\"source\":\"https://aiccelerate.eu/data-integration-suite/pilot1-data\"},\"active\":true,\"identifier\":[{\"use\":\"official\",\"system\":\"https://aiccelerate.eu/data-integration-suite/pilot1-data\",\"value\":\"p5\"}],\"gender\":\"male\",\"birthDate\":\"1965-10-01\",\"deceasedDateTime\":\"2019-04-21\",\"address\":[{\"use\":\"home\",\"type\":\"both\",\"postalCode\":\"G02547\"}]}"), - source = Some("Source"), + source = "Source", error = None, fhirInteraction = None, executionId = Some("exec"), @@ -483,7 +483,7 @@ class FileSystemWriterTest extends AnyFlatSpec with BeforeAndAfterAll { mappingExpr = Some("expression"), timestamp = new Timestamp(System.currentTimeMillis()), mappedResource = Some("{\"resourceType\":\"Patient\",\"id\":\"17c7c9664ac82f384de0ad4625f2ae4c\",\"meta\":{\"profile\":[\"https://aiccelerate.eu/fhir/StructureDefinition/AIC-Patient\"],\"source\":\"https://aiccelerate.eu/data-integration-suite/pilot1-data\"},\"active\":true,\"identifier\":[{\"use\":\"official\",\"system\":\"https://aiccelerate.eu/data-integration-suite/pilot1-data\",\"value\":\"p6\"}],\"gender\":\"female\",\"birthDate\":\"1991-03\"}"), - source = Some("Source"), + source = "Source", error = None, fhirInteraction = None, executionId = Some("exec"), @@ -496,7 +496,7 @@ class FileSystemWriterTest extends AnyFlatSpec with BeforeAndAfterAll { mappingExpr = Some("expression"), timestamp = new Timestamp(System.currentTimeMillis()), mappedResource = Some("{\"resourceType\":\"Patient\",\"id\":\"e1ea114dcfcea572982f224e43deb7a6\",\"meta\":{\"profile\":[\"https://aiccelerate.eu/fhir/StructureDefinition/AIC-Patient\"],\"source\":\"https://aiccelerate.eu/data-integration-suite/pilot1-data\"},\"active\":true,\"identifier\":[{\"use\":\"official\",\"system\":\"https://aiccelerate.eu/data-integration-suite/pilot1-data\",\"value\":\"p7\"}],\"gender\":\"female\",\"birthDate\":\"1972-10-25\",\"address\":[{\"use\":\"home\",\"type\":\"both\",\"postalCode\":\"V13135\"}]}"), - source = Some("Source"), + source = "Source", error = None, fhirInteraction = None, executionId = Some("exec"), @@ -509,7 +509,7 @@ class FileSystemWriterTest extends AnyFlatSpec with BeforeAndAfterAll { mappingExpr = Some("expression"), timestamp = new Timestamp(System.currentTimeMillis()), mappedResource = Some("{\"resourceType\":\"Patient\",\"id\":\"f6bf84d63799f65dcdd4f5027021adf3\",\"meta\":{\"profile\":[\"https://aiccelerate.eu/fhir/StructureDefinition/AIC-Patient\"],\"source\":\"https://aiccelerate.eu/data-integration-suite/pilot1-data\"},\"active\":true,\"identifier\":[{\"use\":\"official\",\"system\":\"https://aiccelerate.eu/data-integration-suite/pilot1-data\",\"value\":\"p8\"}],\"gender\":\"female\",\"birthDate\":\"2010-01-10\",\"address\":[{\"use\":\"home\",\"type\":\"both\",\"postalCode\":\"Z54564\"}]}"), - source = Some("Source"), + source = "Source", error = None, fhirInteraction = None, executionId = Some("exec"), @@ -522,7 +522,7 @@ class FileSystemWriterTest extends AnyFlatSpec with BeforeAndAfterAll { mappingExpr = Some("expression"), timestamp = new Timestamp(System.currentTimeMillis()), mappedResource = Some("{\"resourceType\":\"Patient\",\"id\":\"a06f7d449f8a655d9163204f0de8237f\",\"meta\":{\"profile\":[\"https://aiccelerate.eu/fhir/StructureDefinition/AIC-Patient\"],\"source\":\"https://aiccelerate.eu/data-integration-suite/pilot1-data\"},\"active\":true,\"identifier\":[{\"use\":\"official\",\"system\":\"https://aiccelerate.eu/data-integration-suite/pilot1-data\",\"value\":\"p9\"}],\"gender\":\"female\",\"birthDate\":\"1999-05-12\"}"), - source = Some("Source"), + source = "Source", error = None, fhirInteraction = None, executionId = Some("exec"), @@ -535,7 +535,7 @@ class FileSystemWriterTest extends AnyFlatSpec with BeforeAndAfterAll { mappingExpr = Some("expression"), timestamp = new Timestamp(System.currentTimeMillis()), mappedResource = Some("{\"resourceType\":\"Patient\",\"id\":\"7bd4fad75b1efbdc50859a736b839e24\",\"meta\":{\"profile\":[\"https://aiccelerate.eu/fhir/StructureDefinition/AIC-Patient\"],\"source\":\"https://aiccelerate.eu/data-integration-suite/pilot1-data\"},\"active\":true,\"identifier\":[{\"use\":\"official\",\"system\":\"https://aiccelerate.eu/data-integration-suite/pilot1-data\",\"value\":\"p10\"}],\"gender\":\"female\",\"birthDate\":\"2003-11\"}"), - source = Some("Source"), + source = "Source", error = None, fhirInteraction = None, executionId = Some("exec"), @@ -549,7 +549,7 @@ class FileSystemWriterTest extends AnyFlatSpec with BeforeAndAfterAll { mappingExpr = Some("expression"), timestamp = new Timestamp(System.currentTimeMillis()), mappedResource = Some("{\"resourceType\":\"Condition\",\"id\":\"2faab6373e7c3bba4c1971d089fc6407\",\"meta\":{\"profile\":[\"https://aiccelerate.eu/fhir/StructureDefinition/AIC-Condition\"],\"source\":\"https://aiccelerate.eu/data-integration-suite/pilot1-data\"},\"clinicalStatus\":{\"coding\":[{\"system\":\"http://terminology.hl7.org/CodeSystem/condition-clinical\",\"code\":\"active\"}]},\"verificationStatus\":{\"coding\":[{\"system\":\"http://terminology.hl7.org/CodeSystem/condition-ver-status\",\"code\":\"confirmed\"}]},\"category\":[{\"coding\":[{\"system\":\"http://terminology.hl7.org/CodeSystem/condition-category\",\"code\":\"problem-list-item\"}]}],\"code\":{\"coding\":[{\"system\":\"http://hl7.org/fhir/sid/icd-10\",\"code\":\"J13\",\"display\":\"Pneumonia due to Streptococcus pneumoniae\"}]},\"subject\":{\"reference\":\"Patient/34dc88d5972fd5472a942fc80f69f35c\"},\"onsetDateTime\":\"2012-10-15\"}"), - source = Some("Source"), + source = "Source", error = None, fhirInteraction = None, executionId = Some("exec"), @@ -562,7 +562,7 @@ class FileSystemWriterTest extends AnyFlatSpec with BeforeAndAfterAll { mappingExpr = Some("expression"), timestamp = new Timestamp(System.currentTimeMillis()), mappedResource = Some("{\"resourceType\":\"Condition\",\"id\":\"63058b87a718e66d4198703675b0204a\",\"meta\":{\"profile\":[\"https://aiccelerate.eu/fhir/StructureDefinition/AIC-Condition\"],\"source\":\"https://aiccelerate.eu/data-integration-suite/pilot1-data\"},\"clinicalStatus\":{\"coding\":[{\"system\":\"http://terminology.hl7.org/CodeSystem/condition-clinical\",\"code\":\"inactive\"}]},\"category\":[{\"coding\":[{\"system\":\"http://terminology.hl7.org/CodeSystem/condition-category\",\"code\":\"encounter-diagnosis\"}]}],\"code\":{\"coding\":[{\"system\":\"http://hl7.org/fhir/sid/icd-10\",\"code\":\"G40\",\"display\":\"Parkinson\"}]},\"subject\":{\"reference\":\"Patient/0b3a0b23a0c6e223b941e63787f15a6a\"},\"encounter\":{\"reference\":\"Encounter/bb7134de6cdbf64352b074e9d2555adb\"},\"onsetDateTime\":\"2013-05-07\",\"abatementDateTime\":\"2013-05-22\"}"), - source = Some("Source"), + source = "Source", error = None, fhirInteraction = None, executionId = Some("exec"), @@ -575,7 +575,7 @@ class FileSystemWriterTest extends AnyFlatSpec with BeforeAndAfterAll { mappingExpr = Some("expression"), timestamp = new Timestamp(System.currentTimeMillis()), mappedResource = Some("{\"resourceType\":\"Condition\",\"id\":\"ec4aed2cb844c70104e467fad58f6a44\",\"meta\":{\"profile\":[\"https://aiccelerate.eu/fhir/StructureDefinition/AIC-Condition\"],\"source\":\"https://aiccelerate.eu/data-integration-suite/pilot1-data\"},\"clinicalStatus\":{\"coding\":[{\"system\":\"http://terminology.hl7.org/CodeSystem/condition-clinical\",\"code\":\"inactive\"}]},\"verificationStatus\":{\"coding\":[{\"system\":\"http://terminology.hl7.org/CodeSystem/condition-ver-status\",\"code\":\"unconfirmed\"}]},\"category\":[{\"coding\":[{\"system\":\"http://terminology.hl7.org/CodeSystem/condition-category\",\"code\":\"encounter-diagnosis\"}]}],\"code\":{\"coding\":[{\"system\":\"http://hl7.org/fhir/sid/icd-10\",\"code\":\"J85\",\"display\":\"Abscess of lung and mediastinum\"}]},\"subject\":{\"reference\":\"Patient/49d3c335681ab7fb2d4cdf19769655db\"},\"onsetDateTime\":\"2016-02-11\",\"asserter\":{\"reference\":\"Practitioner/09361569c5dee906d244968c680cf2b4\"}}"), - source = Some("Source"), + source = "Source", error = None, fhirInteraction = None, executionId = Some("exec"), @@ -588,7 +588,7 @@ class FileSystemWriterTest extends AnyFlatSpec with BeforeAndAfterAll { mappingExpr = Some("expression"), timestamp = new Timestamp(System.currentTimeMillis()), mappedResource = Some("{\"resourceType\":\"Condition\",\"id\":\"6e0337f749b68a5450efb3fe6f918362\",\"meta\":{\"profile\":[\"https://aiccelerate.eu/fhir/StructureDefinition/AIC-Condition\"],\"source\":\"https://aiccelerate.eu/data-integration-suite/pilot1-data\"},\"clinicalStatus\":{\"coding\":[{\"system\":\"http://terminology.hl7.org/CodeSystem/condition-clinical\",\"code\":\"inactive\"}]},\"category\":[{\"coding\":[{\"system\":\"http://terminology.hl7.org/CodeSystem/condition-category\",\"code\":\"encounter-diagnosis\"}]}],\"code\":{\"coding\":[{\"system\":\"http://hl7.org/fhir/sid/icd-10\",\"code\":\"M89.9\",\"display\":\"Disorder of bone, unspecified\"}]},\"subject\":{\"reference\":\"Patient/0bbad2343eb86d5cdc16a1b292537576\"},\"onsetDateTime\":\"2014-01-05T10:00:00Z\"}"), - source = Some("Source"), + source = "Source", error = None, fhirInteraction = None, executionId = Some("exec"), @@ -601,7 +601,7 @@ class FileSystemWriterTest extends AnyFlatSpec with BeforeAndAfterAll { mappingExpr = Some("expression"), timestamp = new Timestamp(System.currentTimeMillis()), mappedResource = Some("{\"resourceType\":\"Condition\",\"id\":\"14ce4f8a1b8161ad59e1a8d67ce8d06d\",\"meta\":{\"profile\":[\"https://aiccelerate.eu/fhir/StructureDefinition/AIC-Condition\"],\"source\":\"https://aiccelerate.eu/data-integration-suite/pilot1-data\"},\"clinicalStatus\":{\"coding\":[{\"system\":\"http://terminology.hl7.org/CodeSystem/condition-clinical\",\"code\":\"inactive\"}]},\"category\":[{\"coding\":[{\"system\":\"http://terminology.hl7.org/CodeSystem/condition-category\",\"code\":\"encounter-diagnosis\"}]}],\"code\":{\"coding\":[{\"system\":\"http://hl7.org/fhir/sid/icd-10\",\"code\":\"G40.419\",\"display\":\"Other generalized epilepsy and epileptic syndromes, intractable, without status epilepticus\"}]},\"subject\":{\"reference\":\"Patient/7b650be0176d6d29351f84314a5efbe3\"},\"onsetDateTime\":\"2009-04-07\",\"asserter\":{\"reference\":\"Practitioner/b2e43c8d7dae698f539b1924679a7814\"}}"), - source = Some("Source"), + source = "Source", error = None, fhirInteraction = None, executionId = Some("exec"), From ab2db4f53fd395780d1c8100a3df47a5ef260762 Mon Sep 17 00:00:00 2001 From: okanmercan Date: Wed, 9 Oct 2024 09:39:34 +0300 Subject: [PATCH 3/4] :bug: test: Fix SinkHandlerTest. --- .../io/tofhir/test/engine/data/write/SinkHandlerTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tofhir-engine/src/test/scala/io/tofhir/test/engine/data/write/SinkHandlerTest.scala b/tofhir-engine/src/test/scala/io/tofhir/test/engine/data/write/SinkHandlerTest.scala index 42cf3b79..6900be4e 100644 --- a/tofhir-engine/src/test/scala/io/tofhir/test/engine/data/write/SinkHandlerTest.scala +++ b/tofhir-engine/src/test/scala/io/tofhir/test/engine/data/write/SinkHandlerTest.scala @@ -33,7 +33,7 @@ class SinkHandlerTest extends AnyFlatSpec { .load() .coalesce(1) .map(_ => { // Map the generated rows to some dummy mapping result - FhirMappingResult("someId", "someUrl", None, Timestamp.from(Instant.now()), Some("")) + FhirMappingResult("someId", "someUrl", None, Timestamp.from(Instant.now()), Some(""), "") }) // Configure the mock writer such that it would throw an exception for the first chunk but not for the subsequent chunks From 98134b1344aaaa85ceaaa63deaf2a91fca4170c5 Mon Sep 17 00:00:00 2001 From: dogukan10 Date: Fri, 11 Oct 2024 11:21:44 +0300 Subject: [PATCH 4/4] :sparkles: Improve ErroneousRecordWriter to handle multiple sources --- pom.xml | 2 +- .../processing/ErroneousRecordWriter.scala | 77 ++++++++++++++----- .../test-data/patient-gender-simple.csv | 2 + .../resources/test-data/patient-simple.csv | 2 + .../patient-mapping-job-with-two-sources.json | 51 ++++++++++++ .../patient-mapping-with-two-sources.json | 42 ++++++++++ .../MappingExecutionEndpointTest.scala | 75 ++++++++++++++++-- 7 files changed, 225 insertions(+), 26 deletions(-) create mode 100644 tofhir-server/src/test/resources/test-data/patient-gender-simple.csv create mode 100644 tofhir-server/src/test/resources/test-data/patient-simple.csv create mode 100644 tofhir-server/src/test/resources/test-mapping-job/patient-mapping-job-with-two-sources.json create mode 100644 tofhir-server/src/test/resources/test-mappings/patient-mapping-with-two-sources.json diff --git a/pom.xml b/pom.xml index c9bf415d..dad04647 100644 --- a/pom.xml +++ b/pom.xml @@ -96,7 +96,7 @@ 3.9.5 - 3.4-SNAPSHOT + 3.3-SNAPSHOT 1.1-SNAPSHOT 1.0-SNAPSHOT 3.7.0-M11 diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/execution/processing/ErroneousRecordWriter.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/execution/processing/ErroneousRecordWriter.scala index 307ee931..f68dace7 100644 --- a/tofhir-engine/src/main/scala/io/tofhir/engine/execution/processing/ErroneousRecordWriter.scala +++ b/tofhir-engine/src/main/scala/io/tofhir/engine/execution/processing/ErroneousRecordWriter.scala @@ -2,12 +2,12 @@ package io.tofhir.engine.execution.processing import io.tofhir.engine.model.{FhirMappingErrorCodes, FhirMappingJobExecution, FhirMappingResult} import io.tofhir.engine.util.FileUtils.FileExtensions -import org.apache.hadoop.fs.FileUtil import org.apache.spark.sql.functions.{col, from_json, schema_of_json} import org.apache.spark.sql.{Dataset, SaveMode, SparkSession} import java.io.File import java.util +import io.onfhir.util.JsonFormatter._ /** * This class persists [[FhirMappingResult]]s that produced some error during the mapping process. @@ -46,13 +46,16 @@ object ErroneousRecordWriter { } /** - * Writes the dataset to the errorOutputDirectory. Directory structure: - * error-folder-path\\job-\execution-\\.csv + * Writes the dataset to the error output directory based on the error type and job details. + * Each source (e.g., "mainSource") within the "source" field is extracted dynamically, + * and its data is written into a separate subdirectory under the output path. The directory structure is as follows: * - * @param mappingJobExecution job execution to get output directory of data sources with errors - * @param dataset filtered dataset of data sources with errors to write to configured folder - * @param mappingTaskName to create directory for each mapping name within the job execution - * @param errorType one of invalid_input, mapping_error, invalid_resource + * error-folder-path\\job-\execution-\\\.csv + * + * @param mappingJobExecution The job execution object, used to get the output directory of data sources with errors. + * @param dataset The filtered dataset of data sources with errors to be written to the configured folder. + * @param mappingTaskName The name of the mapping task, used to create a directory for each mapping within the job execution. + * @param errorType The type of error (e.g., invalid_input, mapping_error, invalid_resource). */ private def writeErroneousDataset(mappingJobExecution: FhirMappingJobExecution, dataset: Dataset[FhirMappingResult], @@ -61,18 +64,54 @@ object ErroneousRecordWriter { val outputPath = mappingJobExecution.getErrorOutputDirectory(mappingTaskName, errorType) val schema = schema_of_json(dataset.rdd.takeSample(withReplacement = false, num = 1).head.source) - dataset - .withColumn("jsonData", from_json(col("source"), schema)) - .select("jsonData.*") - .coalesce(1) - .write - .mode(SaveMode.Append) - .option("header", "true") - .csv(outputPath) + // extract each source name (e.g., "mainSource", "secondarySource") from the dataset's "source" field + val jsonColumns: Array[String] = dataset + .select("source") + .rdd + .flatMap(row => row.getAs[String]("source").parseJson.values.keys) // parse JSON and get all keys from the "source" map + .distinct() // remove duplicate source names + .collect() + + // for each source, create and write a separate CSV file + jsonColumns.foreach { sourceKey => + // extract the source data for the given sourceKey and flatten the selected source field + val sourceDataset = dataset + .withColumn("jsonData", from_json(col("source"), schema)) + .selectExpr(s"jsonData.$sourceKey.*") - // Remove all files except the CSV file (to remove .crc files) - val srcFiles = FileUtil.listFiles(new File(outputPath)) - .filterNot(f => f.getPath.endsWith(FileExtensions.CSV.toString)) - srcFiles.foreach(f => f.delete()) + // write the extracted source data as a separate CSV file into the directory for that sourceKey + sourceDataset + .coalesce(1) + .write + .mode(SaveMode.Append) + .option("header", "true") + .csv(s"$outputPath/$sourceKey") + } + + // remove all files except the CSV file (to remove .crc files) + deleteNonCsvFiles(new File(outputPath)) + } + + /** + * Recursively deletes non-CSV files (like .crc files) from the directory. + * + * @param dir The directory to clean up by removing non-CSV files. + */ + private def deleteNonCsvFiles(dir: File): Unit = { + if (dir.exists() && dir.isDirectory) { + // get a list of files to delete, excluding CSV files + val filesToDelete = dir.listFiles() + .filterNot(f => f.getPath.endsWith(FileExtensions.CSV.toString)) + // process each file in the current directory + filesToDelete.foreach(file => { + if (file.isFile) { + // delete the file if it's a regular file + file.delete() + } else if (file.isDirectory) { + // if it's a subdirectory, recursively clean it up + deleteNonCsvFiles(file) + } + }) + } } } diff --git a/tofhir-server/src/test/resources/test-data/patient-gender-simple.csv b/tofhir-server/src/test/resources/test-data/patient-gender-simple.csv new file mode 100644 index 00000000..5e454c4d --- /dev/null +++ b/tofhir-server/src/test/resources/test-data/patient-gender-simple.csv @@ -0,0 +1,2 @@ +pid,gender +test-patient,male \ No newline at end of file diff --git a/tofhir-server/src/test/resources/test-data/patient-simple.csv b/tofhir-server/src/test/resources/test-data/patient-simple.csv new file mode 100644 index 00000000..3ea24eec --- /dev/null +++ b/tofhir-server/src/test/resources/test-data/patient-simple.csv @@ -0,0 +1,2 @@ +pid +test-patient \ No newline at end of file diff --git a/tofhir-server/src/test/resources/test-mapping-job/patient-mapping-job-with-two-sources.json b/tofhir-server/src/test/resources/test-mapping-job/patient-mapping-job-with-two-sources.json new file mode 100644 index 00000000..d9b62656 --- /dev/null +++ b/tofhir-server/src/test/resources/test-mapping-job/patient-mapping-job-with-two-sources.json @@ -0,0 +1,51 @@ +{ + "name": "patient-job-test", + "sourceSettings": { + "patientSource": { + "jsonClass": "FileSystemSourceSettings", + "name": "patient-test-data", + "sourceUri": "http://test-data", + "dataFolderPath": "./test-data", + "asStream": false + }, + "genderSource": { + "jsonClass": "FileSystemSourceSettings", + "name": "patient-gender-test-data", + "sourceUri": "http://test-data", + "dataFolderPath": "./test-data", + "asStream": false + } + }, + "sinkSettings": { + "jsonClass": "FhirRepositorySinkSettings", + "fhirRepoUrl": "http://localhost:8081/fhir", + "returnMinimal": true + }, + "mappings": [ + { + "name": "patient-mapping-with-two-sources", + "mappingRef": "http://patient-mapping-with-two-sources", + "sourceBinding": { + "patient": { + "jsonClass": "FileSystemSource", + "path": "patient-simple.csv", + "contentType": "csv", + "options": {}, + "sourceRef": "patientSource" + }, + "patientGender": { + "jsonClass": "FileSystemSource", + "path": "patient-gender-simple.csv", + "contentType": "csv", + "options": {}, + "sourceRef": "genderSource" + } + } + } + ], + "dataProcessingSettings": { + "saveErroneousRecords": true, + "archiveMode": "off" + }, + "useFhirSinkAsIdentityService": false +} \ No newline at end of file diff --git a/tofhir-server/src/test/resources/test-mappings/patient-mapping-with-two-sources.json b/tofhir-server/src/test/resources/test-mappings/patient-mapping-with-two-sources.json new file mode 100644 index 00000000..4c66a0fe --- /dev/null +++ b/tofhir-server/src/test/resources/test-mappings/patient-mapping-with-two-sources.json @@ -0,0 +1,42 @@ +{ + "id": "patient-mapping-two-sources", + "url": "http://patient-mapping-with-two-sources", + "name": "patient-mapping", + "title": "Patient Mapping", + "source": [ + { + "alias": "patient", + "url": "http://patient-schema", + "joinOn": [ + "pid" + ] + }, + { + "alias": "patientGender", + "url": "http://patient-gender", + "joinOn": [ + "pid" + ] + } + ], + "context": {}, + "variable": [], + "mapping": [ + { + "expression": { + "name": "patient", + "language": "application/fhir-template+json", + "value": { + "gender": "{{%patientGender.gender + 1}}", + "id": "{{pid}}", + "meta": { + "profile": [ + "http://hl7.org/fhir/StructureDefinition/Patient" + ] + }, + "resourceType": "Patient" + } + } + } + ] +} \ No newline at end of file diff --git a/tofhir-server/src/test/scala/io/tofhir/server/endpoint/MappingExecutionEndpointTest.scala b/tofhir-server/src/test/scala/io/tofhir/server/endpoint/MappingExecutionEndpointTest.scala index 12bb5ebb..e1ceee53 100644 --- a/tofhir-server/src/test/scala/io/tofhir/server/endpoint/MappingExecutionEndpointTest.scala +++ b/tofhir-server/src/test/scala/io/tofhir/server/endpoint/MappingExecutionEndpointTest.scala @@ -197,7 +197,10 @@ class MappingExecutionEndpointTest extends BaseEndpointTest with OnFhirTestConta val erroneousRecordsFolder = Paths.get(toFhirEngineConfig.erroneousRecordsFolder, FhirMappingErrorCodes.MAPPING_ERROR) erroneousRecordsFolder.toFile.exists() && { val jobFolder = Paths.get(erroneousRecordsFolder.toString, s"job-${batchJob.id}").toFile - val csvFile = jobFolder.listFiles().head.listFiles().head.listFiles().head + val csvFile = jobFolder.listFiles().head // execution folder + .listFiles().head // mapping task folder + .listFiles().head // source folder i.e. main source, secondary source etc. + .listFiles().head // csv file // Spark initially writes data to files in the "_temporary" directory. After all tasks complete successfully, // the files are moved from "_temporary" to the parent output directory, and "_temporary" is deleted. This // intermediate step can be observed during testing, which is why we check if the file is a CSV. @@ -211,6 +214,66 @@ class MappingExecutionEndpointTest extends BaseEndpointTest with OnFhirTestConta } } + "save erroneous records for job with multiple sources" in { + // Create a new mapping + createMappingAndVerify("test-mappings/patient-mapping-with-two-sources.json", 2) + + // Update the job with the new mapping and new sink configuration + val patientMappingTask: FhirMappingTask = FhirMappingTask( + name = "patient-mapping-two-sources", + mappingRef = "http://patient-mapping-with-two-sources", + sourceBinding = Map("patient" -> FileSystemSource(path = "patient-simple.csv", contentType = SourceContentTypes.CSV), + "patientGender" -> FileSystemSource(path = "patient-gender-simple.csv", contentType = SourceContentTypes.CSV)) + ) + sinkSettings = FhirRepositorySinkSettings(fhirRepoUrl = onFhirClient.getBaseUrl()) + val job = batchJob.copy(id = UUID.randomUUID().toString, mappings = Seq(patientMappingTask), sinkSettings = sinkSettings, name = Some("twoSourceJob")) + + // Create the job + Post(s"/${webServerConfig.baseUri}/${ProjectEndpoint.SEGMENT_PROJECTS}/$projectId/${JobEndpoint.SEGMENT_JOB}", HttpEntity(ContentTypes.`application/json`, writePretty(job))) ~> route ~> check { + status shouldEqual StatusCodes.Created + } + + // Run the job + Post(s"/${webServerConfig.baseUri}/${ProjectEndpoint.SEGMENT_PROJECTS}/$projectId/${JobEndpoint.SEGMENT_JOB}/${job.id}/${JobEndpoint.SEGMENT_RUN}", HttpEntity(ContentTypes.`application/json`, "")) ~> route ~> check { + status shouldEqual StatusCodes.OK + + // test if erroneous records are written to error folder + val success = waitForCondition(120) { + val erroneousRecordsFolder = Paths.get(toFhirEngineConfig.erroneousRecordsFolder, FhirMappingErrorCodes.MAPPING_ERROR) + val jobFolder = Paths.get(erroneousRecordsFolder.toString, s"job-${job.id}").toFile + jobFolder.exists() && { + val sourceFolders = jobFolder.listFiles().head // execution folder + .listFiles().head // mapping task folder + .listFiles() // source folder i.e. main source, secondary source etc. + sourceFolders.length == 2 && { + val mainSource = sourceFolders.head + val csvFile = mainSource.listFiles().head + mainSource.getName.contentEquals("mainSource") && + // Spark initially writes data to files in the "_temporary" directory. After all tasks complete successfully, + // the files are moved from "_temporary" to the parent output directory, and "_temporary" is deleted. This + // intermediate step can be observed during testing, which is why we check if the file is a CSV. + csvFile.exists() && csvFile.getName.endsWith(".csv") && { + val csvFileContent = sparkSession.read.option("header", "true").csv(csvFile.getPath) + csvFileContent.count() == 1 + } + } && { + val secondarySource = sourceFolders.last + val csvFile = secondarySource.listFiles().head + secondarySource.getName.contentEquals("patientGender") && + // Spark initially writes data to files in the "_temporary" directory. After all tasks complete successfully, + // the files are moved from "_temporary" to the parent output directory, and "_temporary" is deleted. This + // intermediate step can be observed during testing, which is why we check if the file is a CSV. + csvFile.exists() && csvFile.getName.endsWith(".csv") && { + val csvFileContent = sparkSession.read.option("header", "true").csv(csvFile.getPath) + csvFileContent.count() == 1 + } + } + } + } + if (!success) fail("Failed to find expected number of erroneous records. Either the erroneous record file is not available or the number of records does not match") + } + } + sinkSettings = FileSystemSinkSettings(path = s"./$fsSinkFolderName/job2", SinkContentTypes.NDJSON) val job2: FhirMappingJob = FhirMappingJob(name = Some("mappingJob2"), sourceSettings = mappingJobSourceSettings, sinkSettings = sinkSettings, mappings = Seq(patientMappingTask), dataProcessingSettings = DataProcessingSettings()) @@ -220,7 +283,7 @@ class MappingExecutionEndpointTest extends BaseEndpointTest with OnFhirTestConta status shouldEqual StatusCodes.Created // validate that job metadata file is updated val projects: JArray = TestUtil.getProjectJsonFile(toFhirEngineConfig) - (projects.arr.find(p => (p \ "id").extract[String] == projectId).get \ "mappingJobs").asInstanceOf[JArray].arr.length shouldEqual 2 + (projects.arr.find(p => (p \ "id").extract[String] == projectId).get \ "mappingJobs").asInstanceOf[JArray].arr.length shouldEqual 3 // check job folder is created FileUtils.getPath(toFhirEngineConfig.jobRepositoryFolderPath, projectId, s"${job2.id}${FileExtensions.JSON}").toFile should exist } @@ -249,7 +312,7 @@ class MappingExecutionEndpointTest extends BaseEndpointTest with OnFhirTestConta "execute a mapping within a job without passing the mapping in the mapping task" in { // create the mapping that will be tested - createMappingAndVerify("test-mappings/patient-mapping2.json", 2) + createMappingAndVerify("test-mappings/patient-mapping2.json", 3) initializeTestMappingQuery(job2.id, "https://aiccelerate.eu/fhir/mappings/pilot1/patient-mapping2", Map("source" -> FileSystemSource(path = "patients.csv", contentType = SourceContentTypes.CSV))) ~> check { status shouldEqual StatusCodes.OK @@ -274,7 +337,7 @@ class MappingExecutionEndpointTest extends BaseEndpointTest with OnFhirTestConta */ "execute a mapping with a context within a job" in { createSchemaAndVerify("test-schemas/other-observation-schema.json", 2) - createMappingAndVerify("test-mappings/other-observation-mapping.json", 3) + createMappingAndVerify("test-mappings/other-observation-mapping.json", 4) // test a mapping initializeTestMappingQuery(job2.id, "https://aiccelerate.eu/fhir/mappings/other-observation-mapping", Map("source" -> FileSystemSource(path = "other-observations.csv", contentType = SourceContentTypes.CSV))) ~> check { @@ -300,7 +363,7 @@ class MappingExecutionEndpointTest extends BaseEndpointTest with OnFhirTestConta status shouldEqual StatusCodes.Created // validate that job metadata file is updated val projects: JArray = TestUtil.getProjectJsonFile(toFhirEngineConfig) - (projects.arr.find(p => (p \ "id").extract[String] == projectId).get \ "mappingJobs").asInstanceOf[JArray].arr.length shouldEqual 3 + (projects.arr.find(p => (p \ "id").extract[String] == projectId).get \ "mappingJobs").asInstanceOf[JArray].arr.length shouldEqual 4 // check job folder is created FileUtils.getPath(toFhirEngineConfig.jobRepositoryFolderPath, projectId, s"${streamingJob.id}${FileExtensions.JSON}").toFile should exist } @@ -353,7 +416,7 @@ class MappingExecutionEndpointTest extends BaseEndpointTest with OnFhirTestConta */ "run a batch mapping job which converts FHIR Resources into a flat schema and write them to a CSV file" in { // create the mapping - createMappingAndVerify("test-mappings/patient-flat-mapping.json", 4) + createMappingAndVerify("test-mappings/patient-flat-mapping.json", 5) // create the job val jobId = UUID.randomUUID().toString val job: FhirMappingJob = FhirMappingJob(