Skip to content

Commit

Permalink
Merge pull request #90 from srdc/fix-preprocessql-infer-schema
Browse files Browse the repository at this point in the history
✨ feat(InferTask): Add name field for using name in preproce…
  • Loading branch information
dogukan10 authored Aug 22, 2023
2 parents 4fce4ac + 4ebb410 commit 64448fa
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import io.tofhir.engine.model.{DataSourceSettings, FhirMappingSourceContext}
/**
* Infer task instance for getting source settings and source context settings.
* InferTask object is implemented only for SQL sources and file system sources.
*
* @param name alias for source context
* @param sourceSettings connection details for data source (e.g. JDBC connection details, file path)
* @param sourceContext mapping task source context (e.g. file name, table name, query)
*/
case class InferTask(sourceSettings: Map[String, DataSourceSettings], sourceContext: FhirMappingSourceContext)
case class InferTask(name: String, sourceSettings: Map[String, DataSourceSettings], sourceContext: FhirMappingSourceContext)
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class SchemaDefinitionService(schemaRepository: ISchemaRepository, mappingReposi
*/
def inferSchema(inferTask: InferTask): Future[Option[SchemaDefinition]] = {
// Execute SQL and get the dataFrame
val dataFrame = SourceHandler.readSource("unnamed", SparkConfig.sparkSession,
val dataFrame = SourceHandler.readSource(inferTask.name, SparkConfig.sparkSession,
inferTask.sourceContext, inferTask.sourceSettings.head._2, None, None, Some(1))
// Default name for undefined information
val defaultName: String = "unnamed"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ class SchemaEndpointTest extends BaseEndpointTest {
// set 5 second timeout for test because infer schema test can take longer than 1 second
implicit def default(implicit system: ActorSystem): RouteTestTimeout = RouteTestTimeout(5.seconds)
// inferTask object for infer schema test
val inferTask: InferTask = InferTask(sourceSettings = Map(
val inferTask: InferTask = InferTask(name="test", sourceSettings = Map(
"source" ->
SqlSourceSettings(name = "test-db-source", sourceUri = "https://aiccelerate.eu/data-integration-suite/test-data", databaseUrl = DATABASE_URL, username = "", password = "")
), sourceContext = SqlSource(query = Some("select * from death")))
), sourceContext = SqlSource(query = Some("select * from death"), preprocessSql = Some("select person_id, death_date, death_datetime, cause_source_value from test")))


// first schema schema to be created
Expand Down Expand Up @@ -232,14 +232,11 @@ class SchemaEndpointTest extends BaseEndpointTest {
// validate data types of schema
val schema: SchemaDefinition = JsonMethods.parse(responseAs[String]).extract[SchemaDefinition]
val fieldDefinitions = schema.fieldDefinitions.get
fieldDefinitions.size shouldEqual 7
fieldDefinitions.size shouldEqual 4
fieldDefinitions.head.dataTypes.get.head.dataType shouldEqual "integer"
fieldDefinitions(1).dataTypes.get.head.dataType shouldEqual "date"
fieldDefinitions(2).dataTypes.get.head.dataType shouldEqual "dateTime"
fieldDefinitions(3).dataTypes.get.head.dataType shouldEqual "integer"
fieldDefinitions(4).dataTypes.get.head.dataType shouldEqual "integer"
fieldDefinitions(5).dataTypes.get.head.dataType shouldEqual "string"
fieldDefinitions(6).dataTypes.get.head.dataType shouldEqual "integer"
fieldDefinitions(3).dataTypes.get.head.dataType shouldEqual "string"
}
}
}
Expand Down

0 comments on commit 64448fa

Please sign in to comment.