Merge pull request #30 in BACIBBD/spline from feature/SL-43-new-data-model to release/0.2

* commit '9c5cd9d43b8866d7be9646b8a8c66b8195107a7f':
  SL-62 + scala doc
  SL-62 Fix REST serialization by switching from Salat JSON to Json4s impl
  SL-61 Redo loading indicator
  SL-61 Redo nodes highlighting
  SL-61 Redo attribute selection
  SL-61 Redo full schema view
  SL-60 - Moving reference to salat-core from api to mongo module.
  SL-60 - Fixing binary incompatibility of json4s-native between hdfs and atlas persistence layers.
  SL-59 Updating comments of Dataset converter.
  SL-59 Fixing Atlas persistence layer
  SL-61 Angular/material: Replace 'MD' suffixes to 'MAT' according to the recent change in the library (see angular/components#7241)
  SL-61 Redo graph visualization + fix types + fix server side unit tests
  SL-59 Adding comment parameters
  SL-59 Refactoring Atlas persistence layer according to the latest Spline data model.
  SL-57 Refactoring harvester according to the latest Spline data model
  SL-61 Start updating Web UI layer according to the new data model; fix SL-58 compilation errors
  SL-59 Renaming AtlasDataLineagePersistor to AtlasDataLineageWriter
  SL-58 Changing property name according to the naming convention for ids (yyyID -> yyyId)
  SL-58 Updating Mongo persistence layer according to the new data model
  SL-43 Data Set Oriented Data Model
wajda committed Oct 12, 2017
2 parents 87f349c + 9c5cd9d commit 13b1de7
Showing 149 changed files with 9,439 additions and 100,401 deletions.
4 changes: 4 additions & 0 deletions commons/pom.xml
@@ -33,6 +33,10 @@
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
</dependency>
</dependencies>

</project>
@@ -0,0 +1,39 @@
/*
* Copyright 2017 Barclays Africa Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.spline.common

import scala.reflect.runtime.{universe => ru}

/**
* Reflection utils
*/
object ReflectionUtils {

private val mirror: ru.Mirror = ru.runtimeMirror(getClass.getClassLoader)

/**
* Lists all direct sub-classes of the given trait T
*
* @tparam T sealed trait type
* @return List of Class[_] instances
*/
def subClassesOf[T: ru.TypeTag]: List[Class[_]] = {
val clazz: ru.ClassSymbol = ru.typeOf[T].typeSymbol.asClass
require(clazz.isTrait && clazz.isSealed)
clazz.knownDirectSubclasses.toList map ((s: ru.Symbol) => mirror runtimeClass s.asClass)
}
}
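
For illustration only (not part of this changeset): a minimal sketch of how subClassesOf might be used, with a hypothetical sealed trait defined purely for the example.

// Hypothetical sealed trait hierarchy, used only for this illustration
sealed trait Shape
case class Circle(radius: Double) extends Shape
case class Square(side: Double) extends Shape

// Expected to yield the runtime classes of Circle and Square (order is not guaranteed)
val shapeClasses: List[Class[_]] = ReflectionUtils.subClassesOf[Shape]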
88 changes: 88 additions & 0 deletions core/src/main/scala/za/co/absa/spline/core/AttributeFactory.scala
@@ -0,0 +1,88 @@
/*
* Copyright 2017 Barclays Africa Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.spline.core

import java.util.UUID
import java.util.UUID.randomUUID

import org.apache.spark.sql
import za.co.absa.spline.model.Attribute

import scala.collection.mutable.{Map, ListBuffer}

/**
* The class is responsible for creating [[za.co.absa.spline.model.Attribute attributes]] and assigning them unique identifiers.
*/
class AttributeFactory extends DataTypeMapper{
private val mapById : Map[UUID, Attribute] = Map()
private val mapBySparkId : Map[Long, Attribute] = Map()
private val allAttributes : ListBuffer[Attribute] = ListBuffer[Attribute]()

/**
* The method fills the internal collections with an initial sequence of attributes.
* @param sparkIds A sequence of unique identifiers provided by Spark
* @param attributes A sequence of attributes
*/
def initialize(sparkIds: Seq[Long], attributes: Seq[Attribute]) : Unit =
mapById.synchronized {
mapById.clear()
mapBySparkId.clear()
allAttributes.clear()
sparkIds.zip(attributes).foreach {
case (k, a) =>
mapBySparkId.put(k, a)
mapById.put(a.id, a)
}
allAttributes ++= attributes
}

/**
* The method creates an attribute if it does not exist, and returns the identifier of the attribute matching the input criteria.
* @param sparkAttributeId A unique identifier of the attribute assigned by Spark
* @param name A name of the attribute
* @param sparkDataType A Spark data type related to the attribute
* @param nullable A flag expressing whether the attribute is nullable or not
* @return A unique identifier of the matching or newly created attribute
*/
def getOrCreate(sparkAttributeId : Long, name : String, sparkDataType: sql.types.DataType, nullable: Boolean) : UUID =
mapById.synchronized(
mapBySparkId.get(sparkAttributeId) match {
case Some(x) => x.id
case None => {
val a = Attribute(randomUUID, name, fromSparkDataType(sparkDataType, nullable))
mapBySparkId.put(sparkAttributeId, a)
mapById.put(a.id, a)
allAttributes += a
a.id
}
}
)

/**
* The method returns an attribute for a specific identifier if it has already been created by the factory; otherwise, returns None.
* @param id An identifier of the attribute
* @return An option holding the attribute if found, None otherwise
*/
def getById(id : UUID) : Option[Attribute] = mapById.synchronized(mapById.get(id))

/**
* The method returns all attributes created by the factory.
* @return A sequence of attributes
*/
def getAll(): Seq[Attribute] = mapById.synchronized(allAttributes.toList)

}
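
An illustrative sketch (not part of the diff) of how the factory is meant to be used; the Spark attribute id and name below are made up.

import org.apache.spark.sql.types.StringType

val attributeFactory = new AttributeFactory()
// The first call creates the attribute; repeated calls with the same Spark id return the same UUID
val attrId: UUID = attributeFactory.getOrCreate(1L, "customerName", StringType, nullable = true)
attributeFactory.getById(attrId) // Some(Attribute(attrId, "customerName", Simple("string", true)))
attributeFactory.getAll()        // a single-element sequence with that attribute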
@@ -19,17 +19,19 @@ package za.co.absa.spline.core
import java.util.UUID

import org.apache.hadoop.conf.Configuration
import za.co.absa.spline.model.{DataLineage, OperationNode}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand
import za.co.absa.spline.common.transformations.TransformationPipeline
import za.co.absa.spline.core.transformations.ProjectionMerger
import za.co.absa.spline.core.transformations.{ProjectionMerger, ReferenceConsolidator}
import za.co.absa.spline.model.DataLineage
import za.co.absa.spline.model.op.Operation

import scala.collection.mutable
import scala.language.postfixOps

/** The object is responsible for gathering lineage information from Spark internal structures (logical plan, physical plan, etc.) */
object DataLineageHarvester {
/** The class is responsible for gathering lineage information from Spark internal structures (logical plan, physical plan, etc.) */
class DataLineageHarvester(hadoopConfiguration: Configuration) {

val transformationPipeline = new TransformationPipeline(Seq(ProjectionMerger))

@@ -38,18 +40,29 @@
* @param queryExecution An instance holding Spark internal structures (logical plan, physical plan, etc.)
* @return A lineage representation
*/
def harvestLineage(queryExecution: QueryExecution, hadoopConfiguration: Configuration): DataLineage = {
val nodes = harvestOperationNodes(queryExecution.analyzed, hadoopConfiguration)
def harvestLineage(queryExecution: QueryExecution): DataLineage = {
val attributeFactory = new AttributeFactory()
val metaDatasetFactory = new MetaDatasetFactory(attributeFactory)
val operationNodeBuilderFactory = new OperationNodeBuilderFactory(hadoopConfiguration, metaDatasetFactory)
val nodes = harvestOperationNodes(queryExecution.analyzed, operationNodeBuilderFactory)
val transformedNodes = transformationPipeline.apply(nodes)

DataLineage(
val sparkContext = queryExecution.sparkSession.sparkContext

val lineage = DataLineage(
UUID.randomUUID,
queryExecution.sparkSession.sparkContext.appName,
transformedNodes
sparkContext.applicationId,
sparkContext.appName,
System.currentTimeMillis(),
transformedNodes,
metaDatasetFactory.getAll(),
attributeFactory.getAll()
)

ReferenceConsolidator(lineage)
}

private def harvestOperationNodes(logicalPlan: LogicalPlan, hadoopConfiguration: Configuration): Seq[OperationNode] = {
private def harvestOperationNodes(logicalPlan: LogicalPlan, operationNodeBuilderFactory: OperationNodeBuilderFactory): Seq[Operation] = {
val result = mutable.ArrayBuffer[OperationNodeBuilder[_]]()
val stack = mutable.Stack[(LogicalPlan, Int)]((logicalPlan, -1))
val visitedNodes = mutable.Map[LogicalPlan, Int]()
@@ -60,7 +73,7 @@
val currentNode: OperationNodeBuilder[_] = currentPosition match {
case Some(pos) => result(pos)
case None =>
val newNode = OperationNodeBuilderFactory.create(currentOperation, hadoopConfiguration)
val newNode = operationNodeBuilderFactory.create(currentOperation)
visitedNodes += (currentOperation -> result.size)
currentPosition = Some(result.size)
result += newNode
@@ -73,14 +86,9 @@

if (parentPosition >= 0) {
val parent = result(parentPosition)
parent.childRefs += currentPosition.get
currentNode.output foreach (parent.input +=)
currentNode.parentRefs += parentPosition

parent.inputMetaDatasets += currentNode.outputMetaDataset
}
}
result.map(i => i.build())
}


}
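
A hedged sketch (not part of the diff) contrasting the call sites before and after this change, assuming a QueryExecution qe and a Hadoop Configuration hadoopConf are already in scope:

// Before: object-based API
//   val lineage = DataLineageHarvester.harvestLineage(qe, hadoopConf)

// After: the harvester is instantiated with the Hadoop configuration
val harvester = new DataLineageHarvester(hadoopConf)
val lineage: DataLineage = harvester.harvestLineage(qe)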
@@ -16,24 +16,20 @@

package za.co.absa.spline.core

import java.util.UUID

import org.apache.hadoop.conf.Configuration
import za.co.absa.spline.model.Execution
import za.co.absa.spline.persistence.api.PersistenceFactory
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener
import za.co.absa.spline.common.FutureImplicits._
import scala.concurrent.Future
import za.co.absa.spline.persistence.api.PersistenceWriterFactory

/**
* The class represents a handler listening to events that Spark triggers when execution of any action is performed. It can be considered an entry point to the Spline library.
*
* @param dataStorageFactory A factory of persistence layers
* @param persistenceWriterFactory A factory of persistence writers
* @param hadoopConfiguration A hadoop configuration
*/
class DataLineageListener(dataStorageFactory: PersistenceFactory, hadoopConfiguration: Configuration) extends QueryExecutionListener {
private lazy val dataLineagePersistor = dataStorageFactory.createDataLineagePersistor()
private lazy val executionPersistor = dataStorageFactory.createExecutionPersistor()
class DataLineageListener(persistenceWriterFactory: PersistenceWriterFactory, hadoopConfiguration: Configuration) extends QueryExecutionListener {
private lazy val persistenceWriter = persistenceWriterFactory.createDataLineageWriter()
private lazy val harvester = new DataLineageHarvester(hadoopConfiguration)

/**
* The method is executed when an action execution is successful.
@@ -59,16 +55,8 @@

private def processQueryExecution(funcName: String, qe: QueryExecution): Unit = {
if (funcName == "save") {
val lineage = DataLineageHarvester.harvestLineage(qe, hadoopConfiguration)
dataLineagePersistor.exists(lineage).flatMap( lineageIdOption =>
lineageIdOption match
{
case None => dataLineagePersistor.store(lineage).map(_ => lineage.id)
case Some(x) => Future.successful(x)
}).flatMap(lineageId => {
val execution = Execution(UUID.randomUUID(), lineageId, qe.sparkSession.sparkContext.applicationId, System.currentTimeMillis())
executionPersistor.store(execution)
})
}
val lineage = harvester harvestLineage qe
persistenceWriter store lineage
}
}
}
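
For illustration only: how the listener might be registered, assuming a PersistenceWriterFactory implementation writerFactory, a Hadoop Configuration hadoopConf, and an active SparkSession sparkSession (all three names are placeholders).

val listener = new DataLineageListener(writerFactory, hadoopConf)
sparkSession.listenerManager.register(listener)
// From then on, each successful "save" action triggers lineage harvesting and persistence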
11 changes: 5 additions & 6 deletions core/src/main/scala/za/co/absa/spline/core/DataTypeMapper.scala
@@ -16,9 +16,8 @@

package za.co.absa.spline.core

import za.co.absa.spline.model.StructField
import za.co.absa.spline.model
import org.apache.spark.sql
import za.co.absa.spline.model.dt._

/**
* The trait represents a mapper translating Spark data types to data types specified by the Spline library.
@@ -32,9 +31,9 @@ trait DataTypeMapper {
* @param nullable A flag specifying whether result data type will be nullable or not
* @return A Spline data type
*/
def fromSparkDataType(sparkDataType: sql.types.DataType, nullable: Boolean): model.DataType = sparkDataType match {
case s: sql.types.StructType => model.StructType(s.fields.map(i => StructField(i.name, fromSparkDataType(i.dataType, i.nullable))), nullable)
case a: sql.types.ArrayType => model.ArrayType(fromSparkDataType(a.elementType, a.containsNull), nullable)
case x => model.SimpleType(x.typeName, nullable)
def fromSparkDataType(sparkDataType: sql.types.DataType, nullable: Boolean): DataType = sparkDataType match {
case s: sql.types.StructType => Struct(s.fields.map(i => StructField(i.name, fromSparkDataType(i.dataType, i.nullable))), nullable)
case a: sql.types.ArrayType => Array(fromSparkDataType(a.elementType, a.containsNull), nullable)
case x => Simple(x.typeName, nullable)
}
}
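
A small sketch (not part of the changeset) of the mapping in action, using an anonymous object that mixes in the trait:

import org.apache.spark.sql.types.{ArrayType, IntegerType}

val mapper = new DataTypeMapper {}
mapper.fromSparkDataType(IntegerType, nullable = false)
// expected: Simple("integer", false)
mapper.fromSparkDataType(ArrayType(IntegerType, containsNull = true), nullable = true)
// expected: Array(Simple("integer", true), true)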
18 changes: 11 additions & 7 deletions core/src/main/scala/za/co/absa/spline/core/ExpressionMapper.scala
@@ -16,25 +16,29 @@

package za.co.absa.spline.core

import za.co.absa.spline.model._
import org.apache.spark.sql.catalyst.expressions
import za.co.absa.spline.model._

import scala.language.implicitConversions

/**
* The trait represents a mapper translating Spark expressions to expressions specified by the Spline library.
*/
trait ExpressionMapper extends DataTypeMapper {

val attributeFactory : AttributeFactory

/**
* The method translates a Spark expression to an expression specified by Spline library.
*
* @param sparkExpr An input Spark expression
* @return A Spline expression
*/
implicit def fromSparkExpression(sparkExpr: org.apache.spark.sql.catalyst.expressions.Expression): Expression = sparkExpr match {
case a: expressions.Alias => AliasExpression(a.name,a.simpleString, fromSparkDataType(a.dataType, a.nullable), a.children map fromSparkExpression)
case a: expressions.AttributeReference => AttributeReference(a.exprId.id, a.name, fromSparkDataType(a.dataType, a.nullable))
case bo: expressions.BinaryOperator => BinaryOperator(bo.nodeName, bo.symbol, bo.simpleString, fromSparkDataType(bo.dataType, bo.nullable), bo.children map fromSparkExpression)
case u: expressions.ScalaUDF => UserDefinedFunction(u.udfName getOrElse u.function.getClass.getName, u.simpleString, fromSparkDataType(u.dataType, u.nullable), u.children map fromSparkExpression)
case x => GenericExpression(x.nodeName, x.simpleString, fromSparkDataType(x.dataType, x.nullable), x.children map fromSparkExpression)
implicit def fromSparkExpression(sparkExpr: org.apache.spark.sql.catalyst.expressions.Expression): expr.Expression = sparkExpr match {
case a: expressions.Alias => expr.Alias(a.name, a.simpleString, fromSparkDataType(a.dataType, a.nullable), a.children map fromSparkExpression)
case a: expressions.AttributeReference => expr.AttributeReference(attributeFactory.getOrCreate(a.exprId.id, a.name, a.dataType, a.nullable), a.name, fromSparkDataType(a.dataType, a.nullable))
case bo: expressions.BinaryOperator => expr.Binary(bo.nodeName, bo.symbol, bo.simpleString, fromSparkDataType(bo.dataType, bo.nullable), bo.children map fromSparkExpression)
case u: expressions.ScalaUDF => expr.UserDefinedFunction(u.udfName getOrElse u.function.getClass.getName, u.simpleString, fromSparkDataType(u.dataType, u.nullable), u.children map fromSparkExpression)
case x => expr.Generic(x.nodeName, x.simpleString, fromSparkDataType(x.dataType, x.nullable), x.children map fromSparkExpression)
}
}
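
An illustrative sketch (not in the diff) of the implicit conversion, assuming the expr package resolves to za.co.absa.spline.model.expr and using a component that mixes in the trait and supplies an AttributeFactory:

import org.apache.spark.sql.functions.lit
import za.co.absa.spline.model.expr

object SampleExpressionMapper extends ExpressionMapper {
  val attributeFactory: AttributeFactory = new AttributeFactory()
}

// A Catalyst literal falls through to the Generic case of the conversion
val splineExpr: expr.Expression = SampleExpressionMapper.fromSparkExpression(lit(42).expr)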
@@ -0,0 +1,60 @@
/*
* Copyright 2017 Barclays Africa Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.spline.core

import java.util.UUID
import java.util.UUID.randomUUID

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import za.co.absa.spline.model.{MetaDataset, Schema}

import scala.collection.mutable.{ListBuffer, Map}

/**
* The class is responsible for creating [[za.co.absa.spline.model.MetaDataset meta data sets]] and assigning them unique identifiers.
* @param attributeFactory An attribute factory
*/
class MetaDatasetFactory(val attributeFactory: AttributeFactory) {
private val datasets : Map[UUID,MetaDataset] = Map()
private val allDatasets : ListBuffer[MetaDataset] = ListBuffer[MetaDataset]()

/**
* The method creates a meta data set for a specific Spark operation and returns its identifier.
* @param operation A Spark operation
* @return An identifier of created meta data set
*/
def create(operation: LogicalPlan) : UUID = datasets.synchronized{
val attributeIds = operation.output.map(i => attributeFactory.getOrCreate(i.exprId.id, i.name, i.dataType, i.nullable))
val metaDataset = MetaDataset(randomUUID, Schema(attributeIds))
datasets.put(metaDataset.id, metaDataset)
allDatasets += metaDataset
metaDataset.id
}

/**
* The method returns a meta data set for a specific identifier if it has already been created by the factory; otherwise, returns None.
* @param id An identifier of the meta data set
* @return An option holding the meta data set if found, None otherwise
*/
def getById(id: UUID) : Option[MetaDataset] = datasets.synchronized(datasets.get(id))

/**
* The method returns all meta data sets created by the factory.
* @return A sequence of meta data sets
*/
def getAll() : Seq[MetaDataset] = datasets.synchronized(allDatasets.toList)
}
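
For illustration only, assuming an active SparkSession named spark: a sketch of deriving a meta data set from a trivial DataFrame's analyzed logical plan.

val metaDatasetFactory = new MetaDatasetFactory(new AttributeFactory())
val df = spark.range(5).toDF("id")
val datasetId: UUID = metaDatasetFactory.create(df.queryExecution.analyzed)
metaDatasetFactory.getById(datasetId) // Some(MetaDataset(...)) whose schema references the single "id" attribute
metaDatasetFactory.getAll()           // a single-element sequence with that meta data set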