Skip to content

Commit

Permalink
🐛 fix(ErrorHandling): Create an instance of FhirMappingJobManager for…
Browse files Browse the repository at this point in the history
… the mapping job to be executed
  • Loading branch information
dogukan10 authored and suatgonul committed Sep 1, 2023
1 parent 31d29dd commit ca2b63b
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import scala.io.Source
* @param contextLoader Context loader
* @param schemaLoader Schema (StructureDefinition) loader
* @param spark Spark session
* @param mappingErrorHandlingType How to handle errors encountered while executing the mapping
* @param ec Execution context
*/
class FhirMappingJobManager(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package io.tofhir.server.service

import com.typesafe.scalalogging.LazyLogging
import io.tofhir.engine.ToFhirEngine
import io.tofhir.engine.config.{ErrorHandlingType, ToFhirConfig}
import io.tofhir.engine.config.ErrorHandlingType.ErrorHandlingType
import io.tofhir.engine.config.ToFhirConfig
import io.tofhir.engine.mapping.{FhirMappingJobManager, MappingContextLoader}
import io.tofhir.engine.model._
import io.tofhir.engine.util.FileUtils
Expand Down Expand Up @@ -36,16 +37,6 @@ class ExecutionService(jobRepository: IJobRepository, mappingRepository: IMappin

val toFhirEngine = new ToFhirEngine(Some(mappingRepository), Some(schemaRepository))

val fhirMappingJobManager =
new FhirMappingJobManager(
toFhirEngine.mappingRepo,
toFhirEngine.contextLoader,
toFhirEngine.schemaLoader,
toFhirEngine.functionLibraries,
toFhirEngine.sparkSession,
ErrorHandlingType.CONTINUE
)

/**
* Run the job for the given execute job tasks
*
Expand All @@ -69,6 +60,7 @@ class ExecutionService(jobRepository: IJobRepository, mappingRepository: IMappin
// create execution
val mappingJobExecution = FhirMappingJobExecution(jobId = mappingJob.id, projectId = projectId, mappingTasks = mappingTasks,
mappingErrorHandling = executeJobTask.flatMap(_.mappingErrorHandling).getOrElse(mappingJob.mappingErrorHandling))
val fhirMappingJobManager = getFhirMappingJobManager(mappingJob.mappingErrorHandling)
if (mappingJob.sourceSettings.exists(_._2.asStream)) {
Future { // TODO we lose the ability to stop the streaming job
val streamingQuery =
Expand Down Expand Up @@ -122,6 +114,7 @@ class ExecutionService(jobRepository: IJobRepository, mappingRepository: IMappin
testResourceCreationRequest.fhirMappingTask.copy(mapping = Some(mappingWithNormalizedContextUrls))
}

val fhirMappingJobManager = getFhirMappingJobManager(mappingJob.mappingErrorHandling)
val (fhirMapping, dataSourceSettings, dataFrame) = fhirMappingJobManager.readJoinSourceData(mappingTask, mappingJob.sourceSettings, jobId = Some(jobId))
val selected = DataFrameUtil.applyResourceFilter(dataFrame, testResourceCreationRequest.resourceFilter)
fhirMappingJobManager.executeTask(mappingJob.id, fhirMapping, selected, dataSourceSettings, mappingJob.terminologyServiceSettings, mappingJob.getIdentityServiceSettings())
Expand Down Expand Up @@ -279,4 +272,19 @@ class ExecutionService(jobRepository: IJobRepository, mappingRepository: IMappin
case None => throw ResourceNotFound("Mapping job does not exists.", s"A mapping job with id $jobId does not exists")
}
}

/**
* Returns FhirMappingJobManager instance for the given mapping error handling type.
* @param mappingErrorHandlingType How to handle errors encountered while executing the mapping
* @return FhirMappingJobManager
*/
private def getFhirMappingJobManager(mappingErrorHandlingType: ErrorHandlingType) =
new FhirMappingJobManager(
toFhirEngine.mappingRepo,
toFhirEngine.contextLoader,
toFhirEngine.schemaLoader,
toFhirEngine.functionLibraries,
toFhirEngine.sparkSession,
mappingErrorHandlingType
)
}

0 comments on commit ca2b63b

Please sign in to comment.