From 7f7ea2afcebf2f0b7be102fec6421ee5f3e3770a Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 5 May 2015 19:56:09 -0700 Subject: [PATCH 1/5] [SPARK-7388][SPARK-7383] wrapper for VectorAssembler in Python --- .../spark/ml/feature/VectorAssembler.scala | 2 +- .../org/apache/spark/ml/param/params.scala | 19 ++++++++- .../ml/param/shared/SharedParamsCodeGen.scala | 1 + .../spark/ml/param/shared/sharedParams.scala | 2 +- python/pyspark/ml/feature.py | 41 ++++++++++++++++++- python/pyspark/ml/param/shared.py | 29 +++++++++++++ python/pyspark/ml/wrapper.py | 11 ++--- 7 files changed, 96 insertions(+), 9 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala index 8f2e62a8e2081..b5a69cee6daf3 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.types._ /** * :: AlphaComponent :: - * A feature transformer than merge multiple columns into a vector column. + * A feature transformer that merges multiple columns into a vector column. */ @AlphaComponent class VectorAssembler extends Transformer with HasInputCols with HasOutputCol { diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala index 51ce19d29cd29..0579ba3623a64 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala @@ -22,6 +22,7 @@ import java.util.NoSuchElementException import scala.annotation.varargs import scala.collection.mutable +import scala.reflect.ClassTag import org.apache.spark.annotation.AlphaComponent import org.apache.spark.ml.util.Identifiable @@ -218,6 +219,18 @@ class BooleanParam(parent: Params, name: String, doc: String) // No need for isV override def w(value: Boolean): ParamPair[Boolean] = super.w(value) } +/** Specialized version of [[Param[Array[T]]]] for Java. */ +class ArrayParam[T : ClassTag](parent: Params, name: String, doc: String, isValid: Array[T] => Boolean) + extends Param[Array[T]](parent, name, doc, isValid) { + + def this(parent: Params, name: String, doc: String) = + this(parent, name, doc, ParamValidators.alwaysTrue) + + override def w(value: Array[T]): ParamPair[Array[T]] = super.w(value) + + private[param] def wCast(value: Seq[T]): ParamPair[Array[T]] = w(value.toArray) +} + /** * A param amd its value. */ @@ -311,7 +324,11 @@ trait Params extends Identifiable with Serializable { */ protected final def set[T](param: Param[T], value: T): this.type = { shouldOwn(param) - paramMap.put(param.asInstanceOf[Param[Any]], value) + if (param.isInstanceOf[ArrayParam[_]] && value.isInstanceOf[Seq[_]]) { + paramMap.put(param.asInstanceOf[ArrayParam[Any]].wCast(value.asInstanceOf[Seq[Any]])) + } else { + paramMap.put(param.w(value)) + } this } diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala index d379172e0bf53..ae0950653d8dc 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala @@ -83,6 +83,7 @@ private[shared] object SharedParamsCodeGen { case _ if c == classOf[Float] => "FloatParam" case _ if c == classOf[Double] => "DoubleParam" case _ if c == classOf[Boolean] => "BooleanParam" + case _ if c.isArray => s"ArrayParam[${getTypeString(c.getComponentType)}]" case _ => s"Param[${getTypeString(c)}]" } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala index fb1874ccfc8dc..736374114b82c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala @@ -178,7 +178,7 @@ private[ml] trait HasInputCols extends Params { * Param for input column names. * @group param */ - final val inputCols: Param[Array[String]] = new Param[Array[String]](this, "inputCols", "input column names") + final val inputCols: ArrayParam[String] = new ArrayParam[String](this, "inputCols", "input column names") /** @group getParam */ final def getInputCols: Array[String] = $(inputCols) diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py index 4e4614b859ac6..e2fcc18c4774d 100644 --- a/python/pyspark/ml/feature.py +++ b/python/pyspark/ml/feature.py @@ -16,7 +16,7 @@ # from pyspark.rdd import ignore_unicode_prefix -from pyspark.ml.param.shared import HasInputCol, HasOutputCol, HasNumFeatures +from pyspark.ml.param.shared import HasInputCol, HasInputCols, HasOutputCol, HasNumFeatures from pyspark.ml.util import keyword_only from pyspark.ml.wrapper import JavaTransformer from pyspark.mllib.common import inherit_doc @@ -112,6 +112,45 @@ def setParams(self, numFeatures=1 << 18, inputCol=None, outputCol=None): return self._set(**kwargs) +@inherit_doc +class VectorAssembler(JavaTransformer, HasInputCols, HasOutputCol): + """ + A feature transformer that merges multiple columns into a vector column. + + >>> from pyspark.sql import Row + >>> df = sc.parallelize([Row(a=1, b=0, c=3)]).toDF() + >>> vecAssembler = VectorAssembler(inputCols=["a", "b", "c"], outputCol="features") + >>> vecAssembler.transform(df).head().features + SparseVector(3, {0: 1.0, 2: 3.0}) + >>> vecAssembler.setParams(outputCol="freqs").transform(df).head().freqs + SparseVector(3, {0: 1.0, 2: 3.0}) + >>> params = {vecAssembler.inputCols: ["b", "a"], vecAssembler.outputCol: "vector"} + >>> vecAssembler.transform(df, params).head().vector + SparseVector(2, {1: 1.0}) + """ + + _java_class = "org.apache.spark.ml.feature.VectorAssembler" + + @keyword_only + def __init__(self, inputCols=None, outputCol=None): + """ + __init__(self, inputCols=None, outputCol=None) + """ + super(VectorAssembler, self).__init__() + self._setDefault() + kwargs = self.__init__._input_kwargs + self.setParams(**kwargs) + + @keyword_only + def setParams(self, inputCols=None, outputCol=None): + """ + setParams(self, inputCols=None, outputCol=None) + Sets params for this VectorAssembler. + """ + kwargs = self.setParams._input_kwargs + return self._set(**kwargs) + + if __name__ == "__main__": import doctest from pyspark.context import SparkContext diff --git a/python/pyspark/ml/param/shared.py b/python/pyspark/ml/param/shared.py index 4f243844f8caa..aaf80f00085bf 100644 --- a/python/pyspark/ml/param/shared.py +++ b/python/pyspark/ml/param/shared.py @@ -223,6 +223,35 @@ def getInputCol(self): return self.getOrDefault(self.inputCol) +class HasInputCols(Params): + """ + Mixin for param inputCols: input column names. + """ + + # a placeholder to make it appear in the generated doc + inputCols = Param(Params._dummy(), "inputCols", "input column names") + + def __init__(self): + super(HasInputCols, self).__init__() + #: param for input column names + self.inputCols = Param(self, "inputCols", "input column names") + if None is not None: + self._setDefault(inputCols=None) + + def setInputCols(self, value): + """ + Sets the value of :py:attr:`inputCols`. + """ + self.paramMap[self.inputCols] = value + return self + + def getInputCols(self): + """ + Gets the value of inputCols or its default value. + """ + return self.getOrDefault(self.inputCols) + + class HasOutputCol(Params): """ Mixin for param outputCol: output column name. diff --git a/python/pyspark/ml/wrapper.py b/python/pyspark/ml/wrapper.py index 73741c4b40dfb..cf31c6266e09c 100644 --- a/python/pyspark/ml/wrapper.py +++ b/python/pyspark/ml/wrapper.py @@ -67,7 +67,10 @@ def _transfer_params_to_java(self, params, java_obj): paramMap = self.extractParamMap(params) for param in self.params: if param in paramMap: - java_obj.set(param.name, paramMap[param]) + value = paramMap[param] + if isinstance(value, list): + value = _jvm().PythonUtils.toSeq(value) + java_obj.set(param.name, value) def _empty_java_param_map(self): """ @@ -126,10 +129,8 @@ class JavaTransformer(Transformer, JavaWrapper): def transform(self, dataset, params={}): java_obj = self._java_obj() - self._transfer_params_to_java({}, java_obj) - java_param_map = self._create_java_param_map(params, java_obj) - return DataFrame(java_obj.transform(dataset._jdf, java_param_map), - dataset.sql_ctx) + self._transfer_params_to_java(params, java_obj) + return DataFrame(java_obj.transform(dataset._jdf), dataset.sql_ctx) @inherit_doc From 39ecb0794f015410256bc5778f0e77343abd6c0f Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 5 May 2015 20:08:40 -0700 Subject: [PATCH 2/5] fix scalastyle --- mllib/src/main/scala/org/apache/spark/ml/param/params.scala | 6 +++++- python/pyspark/ml/feature.py | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala index 0579ba3623a64..5c02c82659f5b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala @@ -220,7 +220,11 @@ class BooleanParam(parent: Params, name: String, doc: String) // No need for isV } /** Specialized version of [[Param[Array[T]]]] for Java. */ -class ArrayParam[T : ClassTag](parent: Params, name: String, doc: String, isValid: Array[T] => Boolean) +class ArrayParam[T : ClassTag]( + parent: Params, + name: String, + doc: String, + isValid: Array[T] => Boolean) extends Param[Array[T]](parent, name, doc, isValid) { def this(parent: Params, name: String, doc: String) = diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py index e2fcc18c4774d..8a0fdddd2d9b5 100644 --- a/python/pyspark/ml/feature.py +++ b/python/pyspark/ml/feature.py @@ -21,7 +21,7 @@ from pyspark.ml.wrapper import JavaTransformer from pyspark.mllib.common import inherit_doc -__all__ = ['Tokenizer', 'HashingTF'] +__all__ = ['Tokenizer', 'HashingTF', 'VectorAssembler'] @inherit_doc From 99c2ebf044f71a8cc90495a0a8656e0212141bca Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 5 May 2015 20:16:50 -0700 Subject: [PATCH 3/5] add to python_shared_params --- python/pyspark/ml/param/_shared_params_code_gen.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/pyspark/ml/param/_shared_params_code_gen.py b/python/pyspark/ml/param/_shared_params_code_gen.py index c71c823db2c81..c1c8e921dda87 100644 --- a/python/pyspark/ml/param/_shared_params_code_gen.py +++ b/python/pyspark/ml/param/_shared_params_code_gen.py @@ -95,6 +95,7 @@ def get$Name(self): ("predictionCol", "prediction column name", "'prediction'"), ("rawPredictionCol", "raw prediction column name", "'rawPrediction'"), ("inputCol", "input column name", None), + ("inputCols", "input column names", None), ("outputCol", "output column name", None), ("numFeatures", "number of features", None)] code = [] From c81072dabdaaf9b9ce6fb08c764f302f639c273c Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Wed, 6 May 2015 10:05:11 -0700 Subject: [PATCH 4/5] addressed comments --- .../scala/org/apache/spark/ml/param/params.scala | 16 ++++++---------- .../ml/param/shared/SharedParamsCodeGen.scala | 2 +- .../spark/ml/param/shared/sharedParams.scala | 2 +- 3 files changed, 8 insertions(+), 12 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala index 5c02c82659f5b..6525a5a9aee52 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala @@ -220,19 +220,15 @@ class BooleanParam(parent: Params, name: String, doc: String) // No need for isV } /** Specialized version of [[Param[Array[T]]]] for Java. */ -class ArrayParam[T : ClassTag]( - parent: Params, - name: String, - doc: String, - isValid: Array[T] => Boolean) - extends Param[Array[T]](parent, name, doc, isValid) { +class StringArrayParam(parent: Params, name: String, doc: String, isValid: Array[String] => Boolean) + extends Param[Array[String]](parent, name, doc, isValid) { def this(parent: Params, name: String, doc: String) = this(parent, name, doc, ParamValidators.alwaysTrue) - override def w(value: Array[T]): ParamPair[Array[T]] = super.w(value) + override def w(value: Array[String]): ParamPair[Array[String]] = super.w(value) - private[param] def wCast(value: Seq[T]): ParamPair[Array[T]] = w(value.toArray) + private[param] def wCast(value: Seq[String]): ParamPair[Array[String]] = w(value.toArray) } /** @@ -328,8 +324,8 @@ trait Params extends Identifiable with Serializable { */ protected final def set[T](param: Param[T], value: T): this.type = { shouldOwn(param) - if (param.isInstanceOf[ArrayParam[_]] && value.isInstanceOf[Seq[_]]) { - paramMap.put(param.asInstanceOf[ArrayParam[Any]].wCast(value.asInstanceOf[Seq[Any]])) + if (param.isInstanceOf[StringArrayParam] && value.isInstanceOf[Seq[_]]) { + paramMap.put(param.asInstanceOf[StringArrayParam].wCast(value.asInstanceOf[Seq[String]])) } else { paramMap.put(param.w(value)) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala index ae0950653d8dc..aaa944a19782a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala @@ -83,7 +83,7 @@ private[shared] object SharedParamsCodeGen { case _ if c == classOf[Float] => "FloatParam" case _ if c == classOf[Double] => "DoubleParam" case _ if c == classOf[Boolean] => "BooleanParam" - case _ if c.isArray => s"ArrayParam[${getTypeString(c.getComponentType)}]" + case _ if c.isArray && c.getComponentType == classOf[String] => s"StringArrayParam" case _ => s"Param[${getTypeString(c)}]" } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala index 736374114b82c..054a0123dc5b3 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala @@ -178,7 +178,7 @@ private[ml] trait HasInputCols extends Params { * Param for input column names. * @group param */ - final val inputCols: ArrayParam[String] = new ArrayParam[String](this, "inputCols", "input column names") + final val inputCols: StringArrayParam = new StringArrayParam(this, "inputCols", "input column names") /** @group getParam */ final def getInputCols: Array[String] = $(inputCols) From c221db9a710c45e1ede0b0cdea623422639677d6 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Thu, 7 May 2015 01:07:49 -0700 Subject: [PATCH 5/5] overload StringArrayParam.w --- .../org/apache/spark/ml/param/params.scala | 22 +++++++++++-------- python/pyspark/ml/wrapper.py | 8 +++---- 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala index 6525a5a9aee52..dd1f4a1759568 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala @@ -22,7 +22,7 @@ import java.util.NoSuchElementException import scala.annotation.varargs import scala.collection.mutable -import scala.reflect.ClassTag +import scala.collection.JavaConverters._ import org.apache.spark.annotation.AlphaComponent import org.apache.spark.ml.util.Identifiable @@ -228,7 +228,8 @@ class StringArrayParam(parent: Params, name: String, doc: String, isValid: Array override def w(value: Array[String]): ParamPair[Array[String]] = super.w(value) - private[param] def wCast(value: Seq[String]): ParamPair[Array[String]] = w(value.toArray) + /** Creates a param pair with a [[java.util.List]] of values (for Java and Python). */ + def w(value: java.util.List[String]): ParamPair[Array[String]] = w(value.asScala.toArray) } /** @@ -323,13 +324,7 @@ trait Params extends Identifiable with Serializable { * Sets a parameter in the embedded param map. */ protected final def set[T](param: Param[T], value: T): this.type = { - shouldOwn(param) - if (param.isInstanceOf[StringArrayParam] && value.isInstanceOf[Seq[_]]) { - paramMap.put(param.asInstanceOf[StringArrayParam].wCast(value.asInstanceOf[Seq[String]])) - } else { - paramMap.put(param.w(value)) - } - this + set(param -> value) } /** @@ -339,6 +334,15 @@ trait Params extends Identifiable with Serializable { set(getParam(param), value) } + /** + * Sets a parameter in the embedded param map. + */ + protected final def set(paramPair: ParamPair[_]): this.type = { + shouldOwn(paramPair.param) + paramMap.put(paramPair) + this + } + /** * Optionally returns the user-supplied value of a param. */ diff --git a/python/pyspark/ml/wrapper.py b/python/pyspark/ml/wrapper.py index cf31c6266e09c..6f9cd9837befe 100644 --- a/python/pyspark/ml/wrapper.py +++ b/python/pyspark/ml/wrapper.py @@ -68,9 +68,8 @@ def _transfer_params_to_java(self, params, java_obj): for param in self.params: if param in paramMap: value = paramMap[param] - if isinstance(value, list): - value = _jvm().PythonUtils.toSeq(value) - java_obj.set(param.name, value) + java_param = java_obj.getParam(param.name) + java_obj.set(java_param.w(value)) def _empty_java_param_map(self): """ @@ -82,7 +81,8 @@ def _create_java_param_map(self, params, java_obj): paramMap = self._empty_java_param_map() for param, value in params.items(): if param.parent is self: - paramMap.put(java_obj.getParam(param.name), value) + java_param = java_obj.getParam(param.name) + paramMap.put(java_param.w(value)) return paramMap