Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Misc issue fixes #581

Merged
merged 1 commit into from
Aug 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/scala/com/linkedin/feathr/common/Types.scala
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ class Header(val featureInfoMap: Map[TaggedFeatureName, FeatureInfo])
* @param columnName the column in the dataframe that represents this feature
* @param featureType the feature type of this feature
*/
class FeatureInfo(val columnName: String, val featureType: FeatureTypes) {
/**
 * Metadata describing one feature column in the output dataframe.
 *
 * @param columnName  the column in the dataframe that represents this feature
 * @param featureType the feature type configuration of this feature
 */
class FeatureInfo(val columnName: String, val featureType: FeatureTypeConfig) {
  // Same rendering as the s-interpolated original: both forms call
  // featureType.toString when building the message.
  override def toString: String = "columnName: " + columnName + "; featureType: " + featureType
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ private[offline] class SimpleConfigurableAnchorExtractor( @JsonProperty("key") k

@transient private lazy val parserContext = MvelContext.newParserContext

private val keyExpression = key.map(k => MVEL.compileExpression(k, parserContext))
private val keyExpression = if (key == null) Seq() else key.map(k => MVEL.compileExpression(k, parserContext))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the meaning of the Seq() case here? Can you explain it in a quick comment in the code?


/*
* Create a map of FeatureRef string to (MVEL expression, optional FeatureType) tuple.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,27 +368,27 @@ object DataFrameColName {
inferredFeatureTypeConfigs: Map[String, FeatureTypeConfig]): Header = {
// generate a map of feature name to its feature type
// if the feature type is unspecified in the anchor config, we will use FeatureTypes.UNSPECIFIED
val anchoredFeatureTypes: Map[String, FeatureTypes] = allAnchoredFeatures.map {
val anchoredFeatureTypes: Map[String, FeatureTypeConfig] = allAnchoredFeatures.map {
case (featureName, anchorWithSource) =>
val featureTypeOpt = anchorWithSource.featureAnchor.getFeatureTypes.map(types => {
// Get the actual type in the output dataframe, the type is inferred and stored previously, if not specified by users
val inferredType = inferredFeatureTypeConfigs.getOrElse(featureName, FeatureTypeConfig.UNDEFINED_TYPE_CONFIG)
val fType = types.getOrElse(featureName, FeatureTypes.UNSPECIFIED)
if (fType == FeatureTypes.UNSPECIFIED) inferredType.getFeatureType else fType
val fType = new FeatureTypeConfig(types.getOrElse(featureName, FeatureTypes.UNSPECIFIED))
if (fType == FeatureTypeConfig.UNDEFINED_TYPE_CONFIG) inferredType else fType
})
val featureType = featureTypeOpt.getOrElse(FeatureTypes.UNSPECIFIED)
val featureType = featureTypeOpt.getOrElse(FeatureTypeConfig.UNDEFINED_TYPE_CONFIG)
featureName -> featureType
}

val derivedFeatureTypes: Map[String, FeatureTypes] = allDerivedFeatures.flatMap {
val derivedFeatureTypes: Map[String, FeatureTypeConfig] = allDerivedFeatures.flatMap {
case (_, derivedFeature) =>
derivedFeature.getFeatureTypes
derivedFeature.featureTypeConfigs
}
val allFeatureTypes = inferredFeatureTypeConfigs.map(x => (x._1, x._2.getFeatureType)) ++ derivedFeatureTypes ++ anchoredFeatureTypes
val allFeatureTypes = inferredFeatureTypeConfigs.map(x => (x._1, x._2)) ++ derivedFeatureTypes ++ anchoredFeatureTypes
val featuresInfo = featureToColumnNameMap.map {
case (taggedFeatureName, columnName) =>
val featureInfo = new FeatureInfo(columnName, allFeatureTypes.getOrElse(taggedFeatureName.getFeatureName,
FeatureTypes.UNSPECIFIED))
FeatureTypeConfig.UNDEFINED_TYPE_CONFIG))
taggedFeatureName -> featureInfo
}
new Header(featuresInfo)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ class FeathrClient private[offline] (sparkSession: SparkSession, featureGroups:
val sparkConf = sparkSession.sparkContext.getConf
FeathrUtils.enableDebugLogging(sparkConf)

val (joinedDF, _) = doJoinObsAndFeatures(joinConfig, jobContext, obsData.data)
SparkFeaturizedDataset(joinedDF, FeaturizedDatasetMetadata())
val (joinedDF, header) = doJoinObsAndFeatures(joinConfig, jobContext, obsData.data)
SparkFeaturizedDataset(joinedDF, FeaturizedDatasetMetadata(header=Some(header)))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,13 +411,19 @@ private[offline] class AnchorLoader extends JsonDeserializer[FeatureAnchor] {
new MVELSourceKeyExtractor(anchorExtractor)
}
case None =>
if (!anchorExtractorBase.isInstanceOf[AnchorExtractorBase[_]]) {
throw new FeathrException(
ErrorLabel.FEATHR_USER_ERROR,
s"In ${node}, ${anchorExtractorBase} with no key and no keyExtractor must be extends AnchorExtractorBase")
}
val keyAlias = FeathrConfigLoader.extractStringListOpt(node.get(KEY_ALIAS))
val anchorExtractor = anchorExtractorBase.asInstanceOf[AnchorExtractor[Any]]
val anchorExtractor = if (!anchorExtractorBase.isInstanceOf[AnchorExtractorBase[_]]) {
FeathrUdfPluginContext.getRegisteredUdfAdaptor(anchorExtractorBase.getClass) match {
case Some(adaptor: AnchorExtractorAdaptor) =>
adaptor.adaptUdf(anchorExtractorBase).asInstanceOf[AnchorExtractor[Any]]
case _ =>
throw new FeathrException(
ErrorLabel.FEATHR_USER_ERROR,
s"In ${node}, ${anchorExtractorBase} with no key and no keyExtractor must be extends AnchorExtractorBase")
}
} else {
anchorExtractorBase.asInstanceOf[AnchorExtractor[Any]]
}
new MVELSourceKeyExtractor(anchorExtractor, keyAlias)
}
}
Expand Down Expand Up @@ -571,7 +577,7 @@ private[offline] class DerivationLoader extends JsonDeserializer[DerivedFeature]
val consumedFeatures = config.inputs.map(x => ErasedEntityTaggedFeature(x.key.map(config.key.zipWithIndex.toMap), x.feature)).toIndexedSeq

// consumedFeatures and parameterNames have same order, since they are all from config.inputs
DerivedFeature(consumedFeatures, producedFeatures, derivationFunction, config.parameterNames, featureTypeConfigMap)
DerivedFeature(consumedFeatures, producedFeatures, maybeAdaptedDerivationFunction, config.parameterNames, featureTypeConfigMap)
} else if (x.has("join")) { // when the derived feature config is a seqJoin config
val config = codec.treeToValue(x, classOf[SeqJoinFeatureConfig])
if (config.aggregation.isEmpty) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
package com.linkedin.feathr.offline.util

case class FeaturizedDatasetMetadata(meta: Map[String, String] = Map()) {
import com.linkedin.feathr.common.Header
/**
* The metadata for FeaturizedDataset
* @param meta extra metadata
* @param header feature type header info
*/
case class FeaturizedDatasetMetadata(meta: Map[String, String] = Map(), header: Option[Header] = None ) {
Comment on lines +4 to +9
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be more docs about what this means; what a featurized-dataset is? (Do we have docs about it in this public version of feathr? Should we just call it as feathr's output dataset that includes info about the tensor types of joined features...?)

Also, what does "extra metadata" mean; what kinds of things is the map allowed/expected to contain, what are the keys, values, etc.

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
package com.linkedin.feathr.offline

import com.linkedin.feathr.common.FeatureTypes
import com.linkedin.feathr.offline.anchored.keyExtractor.AlienSourceKeyExtractorAdaptor
import com.linkedin.feathr.offline.client.plugins.FeathrUdfPluginContext
import com.linkedin.feathr.offline.derived.AlienDerivationFunctionAdaptor
import com.linkedin.feathr.offline.mvel.plugins.FeathrMvelPluginContext
import com.linkedin.feathr.offline.plugins.{AlienFeatureValue, AlienFeatureValueTypeAdaptor}
import com.linkedin.feathr.offline.util.FeathrTestUtils
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{FloatType, StringType, StructField, StructType}
import org.testng.Assert.assertEquals
import org.testng.annotations.Test

class TestFeathrUdfPlugins extends FeathrIntegTest {
Expand All @@ -11,12 +19,13 @@ class TestFeathrUdfPlugins extends FeathrIntegTest {
@Test
def testMvelUdfPluginSupport: Unit = {
FeathrMvelPluginContext.addFeatureTypeAdaptor(classOf[AlienFeatureValue], new AlienFeatureValueTypeAdaptor())

FeathrUdfPluginContext.registerUdfAdaptor(new AlienDerivationFunctionAdaptor())
FeathrUdfPluginContext.registerUdfAdaptor(new AlienSourceKeyExtractorAdaptor())
val df = runLocalFeatureJoinForTest(
joinConfigAsString = """
| features: {
| key: a_id
| featureList: ["f1", "f2", "f3", "f4", "f5", "f6", "f7"]
| featureList: ["f1", "f2", "f3", "f4", "f5", "f6", "f7", "f8", "fA"]
| }
""".stripMargin,
featureDefAsString = s"""
Expand Down Expand Up @@ -54,6 +63,17 @@ class TestFeathrUdfPlugins extends FeathrIntegTest {
| }
| }
| }
| anchor2: {
| source: "anchor1-source.csv"
| keyExtractor: "com.linkedin.feathr.offline.anchored.keyExtractor.AlienSampleKeyExtractor"
| features: {
| fA: {
| def: cast_float(beta)
| type: NUMERIC
| default: 0
| }
| }
| }
|}
|
|derivations: {
Expand All @@ -79,39 +99,39 @@ class TestFeathrUdfPlugins extends FeathrIntegTest {
| AlienFeatureValueMvelUDFs.lowercase_string_afv(f4);
| $MULTILINE_QUOTE
| }
| f8: {
| key: ["mId"]
| inputs: [{ key: "mId", feature: "f6" }]
| class: "com.linkedin.feathr.offline.derived.SampleAlienFeatureDerivationFunction"
| type: NUMERIC
| }
|}
""".stripMargin,
observationDataPath = "anchorAndDerivations/testMVELLoopExpFeature-observations.csv")

df.data.show()
val f8Type = df.fdsMetadata.header.get.featureInfoMap.filter(_._1.getFeatureName == "f8").head._2.featureType.getFeatureType
assertEquals(f8Type, FeatureTypes.NUMERIC)

val selectedColumns = Seq("a_id", "fA")
val filteredDf = df.data.select(selectedColumns.head, selectedColumns.tail: _*)

// TODO UPDATE THE EXPECTED DF BELOW
// val selectedColumns = Seq("a_id", "featureWithNull")
// val filteredDf = df.data.select(selectedColumns.head, selectedColumns.tail: _*)
//
// val expectedDf = ss.createDataFrame(
// ss.sparkContext.parallelize(
// Seq(
// Row(
// // a_id
// "1",
// // featureWithNull
// 1.0f),
// Row(
// // a_id
// "2",
// // featureWithNull
// 0.0f),
// Row(
// // a_id
// "3",
// // featureWithNull
// 3.0f))),
// StructType(
// List(
// StructField("a_id", StringType, true),
// StructField("featureWithNull", FloatType, true))))
// def cmpFunc(row: Row): String = row.get(0).toString
// FeathrTestUtils.assertDataFrameApproximatelyEquals(filteredDf, expectedDf, cmpFunc)
hangfei marked this conversation as resolved.
Show resolved Hide resolved
val expectedDf = ss.createDataFrame(
ss.sparkContext.parallelize(
Seq(
Row(
"1",
10.0f),
Row(
"2",
10.0f),
Row(
"3",
10.0f))),
StructType(
List(
StructField("a_id", StringType, true),
StructField("fA", FloatType, true))))
def cmpFunc(row: Row): String = row.get(0).toString
FeathrTestUtils.assertDataFrameApproximatelyEquals(filteredDf, expectedDf, cmpFunc)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.linkedin.feathr.offline.anchored.keyExtractor

/**
 * Sample key extractor used by the UDF-plugin integration test; it always
 * yields the single constant key "1" regardless of input.
 */
class AlienSampleKeyExtractor extends AlienSourceKeyExtractor {
  // Constant key — this test fixture does not derive keys from any datum.
  override def getKey(): Seq[String] = List("1")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.linkedin.feathr.offline.anchored.keyExtractor

/**
 * Sample "alien" (non-Feathr) source key extractor interface used in plugin
 * tests. Implementations are adapted into a Feathr SourceKeyExtractor by an
 * adaptor registered with the UDF plugin context.
 *
 * Serializable because extractors are shipped to Spark executors.
 */
trait AlienSourceKeyExtractor extends Serializable {
/** Returns the key values produced by this extractor. */
def getKey(): Seq[String]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.linkedin.feathr.offline.anchored.keyExtractor

import com.linkedin.feathr.offline.client.plugins.SourceKeyExtractorAdaptor
import com.linkedin.feathr.sparkcommon.SourceKeyExtractor
import org.apache.spark.sql.DataFrame

/**
 * Adapts the test-only [[AlienSourceKeyExtractor]] trait into Feathr's
 * [[SourceKeyExtractor]] so that non-Feathr key-extractor UDFs can be
 * registered through the UDF plugin mechanism.
 */
class AlienSourceKeyExtractorAdaptor extends SourceKeyExtractorAdaptor {

  /**
   * Indicates whether this adaptor can be applied to an object of the provided class.
   *
   * @param clazz some external UDF type
   * @return true if instances of `clazz` implement [[AlienSourceKeyExtractor]]; false otherwise
   */
  override def canAdapt(clazz: Class[_]): Boolean =
    classOf[AlienSourceKeyExtractor].isAssignableFrom(clazz)

  /**
   * Returns an instance of a Feathr UDF that follows the behavior of the
   * external UDF instance, via delegation.
   *
   * @param externalUdf instance of the "external" UDF; must implement [[AlienSourceKeyExtractor]]
   * @return the Feathr UDF
   */
  override def adaptUdf(externalUdf: AnyRef): SourceKeyExtractor = {
    val alien = externalUdf.asInstanceOf[AlienSourceKeyExtractor]
    new DelegatingKeyExtractor(alien)
  }

  /** Feathr-facing wrapper that delegates to the wrapped alien extractor. */
  private class DelegatingKeyExtractor(delegate: AlienSourceKeyExtractor) extends SourceKeyExtractor {

    // NOTE(review): key column name is hard-coded for this test fixture; the
    // delegate's keys are not consulted here.
    override def getKeyColumnNames(datum: Option[Any]): Seq[String] = Seq("mId")

    // Invokes the delegate (exercising the plugin delegation path) but returns
    // the input DataFrame unchanged, exactly as the original fixture does.
    override def appendKeyColumns(dataFrame: DataFrame): DataFrame = {
      delegate.getKey()
      dataFrame
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.linkedin.feathr.offline.derived

import com.linkedin.feathr.common.{FeatureDerivationFunction, FeatureValue}
import com.linkedin.feathr.offline.client.plugins.FeatureDerivationFunctionAdaptor
import com.linkedin.feathr.offline.plugins.AlienFeatureValue

/**
 * Adapts the test-only [[AlienFeatureDerivationFunction]] into Feathr's
 * [[FeatureDerivationFunction]] so that non-Feathr derivation UDFs can be
 * registered through the UDF plugin mechanism.
 */
class AlienDerivationFunctionAdaptor extends FeatureDerivationFunctionAdaptor {

  /**
   * Indicates whether this adaptor can be applied to an object of the provided class.
   *
   * @param clazz some external UDF type
   * @return true if instances of `clazz` implement [[AlienFeatureDerivationFunction]]; false otherwise
   */
  override def canAdapt(clazz: Class[_]): Boolean =
    classOf[AlienFeatureDerivationFunction].isAssignableFrom(clazz)

  /**
   * Returns an instance of a Feathr UDF that follows the behavior of the
   * external UDF instance, via delegation.
   *
   * @param externalUdf instance of the "external" UDF; must implement [[AlienFeatureDerivationFunction]]
   * @return the Feathr UDF
   */
  override def adaptUdf(externalUdf: AnyRef): FeatureDerivationFunction = {
    val alienFn = externalUdf.asInstanceOf[AlienFeatureDerivationFunction]
    new AlienFeatureDerivationFunctionWrapper(alienFn)
  }

  /**
   * Wraps an alien FeatureDerivationFunction as a Feathr FeatureDerivationFunction.
   * Kept `private[derived]` and under its original name since it is visible to
   * the enclosing package.
   */
  private[derived] class AlienFeatureDerivationFunctionWrapper(alienFn: AlienFeatureDerivationFunction) extends FeatureDerivationFunction {
    override def getFeatures(inputs: Seq[Option[FeatureValue]]): Seq[Option[FeatureValue]] = {
      // Exercises the delegate with a fixed alien value; its result is
      // intentionally discarded by this test fixture.
      alienFn.getFeatures(Seq(Some(AlienFeatureValue.fromFloat(1.0f))))
      // Fixture always yields a single constant numeric feature value.
      Seq(Some(FeatureValue.createNumeric(1.0f)))
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.linkedin.feathr.offline.derived

import com.linkedin.feathr.offline.plugins.AlienFeatureValue
/*
* Sample Alien FeatureDerivationFunction interface that can be adapted into Feathr FeatureDerivationFunction
*/
/*
 * Sample Alien FeatureDerivationFunction interface that can be adapted into a
 * Feathr FeatureDerivationFunction via AlienDerivationFunctionAdaptor.
 *
 * Serializable because derivation functions are shipped to Spark executors.
 */
abstract class AlienFeatureDerivationFunction extends Serializable {
/** Derives output feature values from the given (optional) input feature values. */
def getFeatures(inputs: Seq[Option[AlienFeatureValue]]): Seq[Option[AlienFeatureValue]]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.linkedin.feathr.offline.derived
import com.linkedin.feathr.offline.plugins.AlienFeatureValue

/**
 * Sample implementation of [[AlienFeatureDerivationFunction]] used by the UDF
 * plugin integration test: it ignores its inputs and always derives a single
 * constant feature value of 1.0f.
 */
class SampleAlienFeatureDerivationFunction extends AlienFeatureDerivationFunction {
  override def getFeatures(inputs: Seq[Option[AlienFeatureValue]]): Seq[Option[AlienFeatureValue]] = {
    val constant = AlienFeatureValue.fromFloat(1.0f)
    Seq(Some(constant))
  }
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.linkedin.feathr.offline.generation

import com.linkedin.feathr.common.types.protobuf.FeatureValueOuterClass
import com.linkedin.feathr.common.{FeatureInfo, FeatureTypes, Header, TaggedFeatureName}
import com.linkedin.feathr.common.{FeatureInfo, FeatureTypeConfig, FeatureTypes, Header, TaggedFeatureName}
import com.linkedin.feathr.offline.generation.outputProcessor.RedisOutputUtils
import com.linkedin.feathr.offline.{AssertFeatureUtils, TestFeathr}
import org.apache.spark.sql.Row
Expand Down Expand Up @@ -52,13 +52,13 @@ class TestPushToRedisOutputProcessor extends TestFeathr with MockitoSugar {

rawDf.show()
val featureInfoMap = Map(
new TaggedFeatureName("", "__feathr_feature_f") -> new FeatureInfo("__feathr_feature_f", FeatureTypes.NUMERIC),
new TaggedFeatureName("", "__feathr_feature_f2") -> new FeatureInfo("__feathr_feature_f2", FeatureTypes.NUMERIC),
new TaggedFeatureName("", "__feathr_feature_g") -> new FeatureInfo("__feathr_feature_g", FeatureTypes.CATEGORICAL),
new TaggedFeatureName("", "__feathr_feature_h") -> new FeatureInfo("__feathr_feature_h", FeatureTypes.BOOLEAN),
new TaggedFeatureName("", "__feathr_feature_j") -> new FeatureInfo("__feathr_feature_j", FeatureTypes.TENSOR),
new TaggedFeatureName("", "__feathr_feature_sparse1") -> new FeatureInfo("__feathr_feature_sparse1", FeatureTypes.TENSOR),
new TaggedFeatureName("", "__feathr_feature_sparse2") -> new FeatureInfo("__feathr_feature_sparse2", FeatureTypes.TENSOR),
new TaggedFeatureName("", "__feathr_feature_f") -> new FeatureInfo("__feathr_feature_f", new FeatureTypeConfig(FeatureTypes.NUMERIC)),
new TaggedFeatureName("", "__feathr_feature_f2") -> new FeatureInfo("__feathr_feature_f2", new FeatureTypeConfig(FeatureTypes.NUMERIC)),
new TaggedFeatureName("", "__feathr_feature_g") -> new FeatureInfo("__feathr_feature_g", new FeatureTypeConfig(FeatureTypes.CATEGORICAL)),
new TaggedFeatureName("", "__feathr_feature_h") -> new FeatureInfo("__feathr_feature_h", new FeatureTypeConfig(FeatureTypes.BOOLEAN)),
new TaggedFeatureName("", "__feathr_feature_j") -> new FeatureInfo("__feathr_feature_j", new FeatureTypeConfig(FeatureTypes.TENSOR)),
new TaggedFeatureName("", "__feathr_feature_sparse1") -> new FeatureInfo("__feathr_feature_sparse1", new FeatureTypeConfig(FeatureTypes.TENSOR)),
new TaggedFeatureName("", "__feathr_feature_sparse2") -> new FeatureInfo("__feathr_feature_sparse2", new FeatureTypeConfig(FeatureTypes.TENSOR)),
)
val header = new Header(featureInfoMap)

Expand Down Expand Up @@ -136,7 +136,7 @@ class TestPushToRedisOutputProcessor extends TestFeathr with MockitoSugar {
expSchema)

val featureInfoMap = Map(
new TaggedFeatureName("", "__feathr_feature_sparse1") -> new FeatureInfo("__feathr_feature_sparse1", FeatureTypes.TENSOR),
new TaggedFeatureName("", "__feathr_feature_sparse1") -> new FeatureInfo("__feathr_feature_sparse1", new FeatureTypeConfig(FeatureTypes.TENSOR)),
)
val header = new Header(featureInfoMap)

Expand Down