diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/FhirMappingJobManager.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/FhirMappingJobManager.scala index 1487d1d8..064ebadf 100644 --- a/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/FhirMappingJobManager.scala +++ b/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/FhirMappingJobManager.scala @@ -25,6 +25,7 @@ import scala.io.Source * @param contextLoader Context loader * @param schemaLoader Schema (StructureDefinition) loader * @param spark Spark session + * @param mappingErrorHandlingType How to handle errors encountered while executing the mapping * @param ec Execution context */ class FhirMappingJobManager( diff --git a/tofhir-server/src/main/scala/io/tofhir/server/service/ExecutionService.scala b/tofhir-server/src/main/scala/io/tofhir/server/service/ExecutionService.scala index f551b96f..b1250ccc 100644 --- a/tofhir-server/src/main/scala/io/tofhir/server/service/ExecutionService.scala +++ b/tofhir-server/src/main/scala/io/tofhir/server/service/ExecutionService.scala @@ -2,7 +2,8 @@ package io.tofhir.server.service import com.typesafe.scalalogging.LazyLogging import io.tofhir.engine.ToFhirEngine -import io.tofhir.engine.config.{ErrorHandlingType, ToFhirConfig} +import io.tofhir.engine.config.ErrorHandlingType.ErrorHandlingType +import io.tofhir.engine.config.ToFhirConfig import io.tofhir.engine.mapping.{FhirMappingJobManager, MappingContextLoader} import io.tofhir.engine.model._ import io.tofhir.engine.util.FileUtils @@ -36,16 +37,6 @@ class ExecutionService(jobRepository: IJobRepository, mappingRepository: IMappin val toFhirEngine = new ToFhirEngine(Some(mappingRepository), Some(schemaRepository)) - val fhirMappingJobManager = - new FhirMappingJobManager( - toFhirEngine.mappingRepo, - toFhirEngine.contextLoader, - toFhirEngine.schemaLoader, - toFhirEngine.functionLibraries, - toFhirEngine.sparkSession, - ErrorHandlingType.CONTINUE - ) - /** * Run the job for the given execute job tasks * @@ -69,6 +60,7 @@ class ExecutionService(jobRepository: IJobRepository, mappingRepository: IMappin // create execution val mappingJobExecution = FhirMappingJobExecution(jobId = mappingJob.id, projectId = projectId, mappingTasks = mappingTasks, mappingErrorHandling = executeJobTask.flatMap(_.mappingErrorHandling).getOrElse(mappingJob.mappingErrorHandling)) + val fhirMappingJobManager = getFhirMappingJobManager(mappingJob.mappingErrorHandling) if (mappingJob.sourceSettings.exists(_._2.asStream)) { Future { // TODO we lose the ability to stop the streaming job val streamingQuery = @@ -122,6 +114,7 @@ class ExecutionService(jobRepository: IJobRepository, mappingRepository: IMappin testResourceCreationRequest.fhirMappingTask.copy(mapping = Some(mappingWithNormalizedContextUrls)) } + val fhirMappingJobManager = getFhirMappingJobManager(mappingJob.mappingErrorHandling) val (fhirMapping, dataSourceSettings, dataFrame) = fhirMappingJobManager.readJoinSourceData(mappingTask, mappingJob.sourceSettings, jobId = Some(jobId)) val selected = DataFrameUtil.applyResourceFilter(dataFrame, testResourceCreationRequest.resourceFilter) fhirMappingJobManager.executeTask(mappingJob.id, fhirMapping, selected, dataSourceSettings, mappingJob.terminologyServiceSettings, mappingJob.getIdentityServiceSettings()) @@ -279,4 +272,19 @@ class ExecutionService(jobRepository: IJobRepository, mappingRepository: IMappin case None => throw ResourceNotFound("Mapping job does not exists.", s"A mapping job with id $jobId does not exists") } } + + /** + * Returns FhirMappingJobManager instance for the given mapping error handling type. + * @param mappingErrorHandlingType How to handle errors encountered while executing the mapping + * @return FhirMappingJobManager + */ + private def getFhirMappingJobManager(mappingErrorHandlingType: ErrorHandlingType) = + new FhirMappingJobManager( + toFhirEngine.mappingRepo, + toFhirEngine.contextLoader, + toFhirEngine.schemaLoader, + toFhirEngine.functionLibraries, + toFhirEngine.sparkSession, + mappingErrorHandlingType + ) }