diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 5fd3f73772767..61c065e711152 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -1125,6 +1125,7 @@ def __hash__(self):
         "pyspark.ml.tests.connect.test_parity_regression",
         "pyspark.ml.tests.connect.test_parity_clustering",
         "pyspark.ml.tests.connect.test_parity_evaluation",
+        "pyspark.ml.tests.connect.test_parity_feature",
     ],
     excluded_python_implementations=[
         "PyPy"  # Skip these tests under PyPy since they require numpy, pandas, and pyarrow and
diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
index 5a0ee9307ab8a..94548afb6c292 100644
--- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
+++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
@@ -240,6 +240,8 @@ sealed trait Vector extends Serializable {
 @Since("2.0.0")
 object Vectors {
 
+  private[ml] val empty: Vector = zeros(0)
+
   /**
    * Creates a dense vector from its values.
    */
diff --git a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator
index 4046cca07dc0f..23f70521214d0 100644
--- a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator
+++ b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator
@@ -43,3 +43,9 @@ org.apache.spark.ml.recommendation.ALS
 
 # fpm
 org.apache.spark.ml.fpm.FPGrowth
+
+
+# feature
+org.apache.spark.ml.feature.StandardScaler
+org.apache.spark.ml.feature.MaxAbsScaler
+org.apache.spark.ml.feature.MinMaxScaler
diff --git a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Transformer b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Transformer
index 7c10796f9a877..4b029ae610d76 100644
--- a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Transformer
+++ b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Transformer
@@ -41,3 +41,8 @@ org.apache.spark.ml.recommendation.ALSModel
 
 # fpm
 org.apache.spark.ml.fpm.FPGrowthModel
+
+# feature
+org.apache.spark.ml.feature.StandardScalerModel
+org.apache.spark.ml.feature.MaxAbsScalerModel
+org.apache.spark.ml.feature.MinMaxScalerModel
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala
index 1a378cd85f3e4..66dbabc6187e7 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala
@@ -107,6 +107,8 @@ class MaxAbsScalerModel private[ml] (
 
   import MaxAbsScalerModel._
 
+  private[ml] def this() = this(Identifiable.randomUID("maxAbsScal"), Vectors.empty)
+
   /** @group setParam */
   @Since("2.0.0")
   def setInputCol(value: String): this.type = set(inputCol, value)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala
index c311f4260424d..e3b0590524f3c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala
@@ -154,6 +154,8 @@ class MinMaxScalerModel private[ml] (
 
   import MinMaxScalerModel._
 
+  private[ml] def this() = this(Identifiable.randomUID("minMaxScal"), Vectors.empty, Vectors.empty)
+
   /** @group setParam */
   @Since("1.5.0")
   def setInputCol(value: String): this.type = set(inputCol, value)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
index f1e48b053d883..546463c158444 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
@@ -147,6 +147,8 @@ class StandardScalerModel private[ml] (
 
   import StandardScalerModel._
 
+  private[ml] def this() = this(Identifiable.randomUID("stdScal"), Vectors.empty, Vectors.empty)
+
   /** @group setParam */
   @Since("1.2.0")
   def setInputCol(value: String): this.type = set(inputCol, value)
diff --git a/python/pyspark/ml/tests/connect/test_parity_feature.py b/python/pyspark/ml/tests/connect/test_parity_feature.py
new file mode 100644
index 0000000000000..105ba07df43bf
--- /dev/null
+++ b/python/pyspark/ml/tests/connect/test_parity_feature.py
@@ -0,0 +1,95 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+
+from pyspark.ml.tests.test_feature import FeatureTestsMixin
+from pyspark.testing.connectutils import ReusedConnectTestCase
+
+
+class FeatureParityTests(FeatureTestsMixin, ReusedConnectTestCase):
+    @unittest.skip("Need to support.")
+    def test_binarizer(self):
+        super().test_binarizer()
+
+    @unittest.skip("Need to support.")
+    def test_idf(self):
+        super().test_idf()
+
+    @unittest.skip("Need to support.")
+    def test_ngram(self):
+        super().test_ngram()
+
+    @unittest.skip("Need to support.")
+    def test_stopwordsremover(self):
+        super().test_stopwordsremover()
+
+    @unittest.skip("Need to support.")
+    def test_count_vectorizer_with_binary(self):
+        super().test_count_vectorizer_with_binary()
+
+    @unittest.skip("Need to support.")
+    def test_count_vectorizer_with_maxDF(self):
+        super().test_count_vectorizer_with_maxDF()
+
+    @unittest.skip("Need to support.")
+    def test_count_vectorizer_from_vocab(self):
+        super().test_count_vectorizer_from_vocab()
+
+    @unittest.skip("Need to support.")
+    def test_rformula_force_index_label(self):
+        super().test_rformula_force_index_label()
+
+    @unittest.skip("Need to support.")
+    def test_rformula_string_indexer_order_type(self):
+        super().test_rformula_string_indexer_order_type()
+
+    @unittest.skip("Need to support.")
+    def test_string_indexer_handle_invalid(self):
+        super().test_string_indexer_handle_invalid()
+
+    @unittest.skip("Need to support.")
+    def test_string_indexer_from_labels(self):
+        super().test_string_indexer_from_labels()
+
+    @unittest.skip("Need to support.")
+    def test_target_encoder_binary(self):
+        super().test_target_encoder_binary()
+
+    @unittest.skip("Need to support.")
+    def test_target_encoder_continuous(self):
+        super().test_target_encoder_continuous()
+
+    @unittest.skip("Need to support.")
+    def test_vector_size_hint(self):
+        super().test_vector_size_hint()
+
+    @unittest.skip("Need to support.")
+    def test_apply_binary_term_freqs(self):
+        super().test_apply_binary_term_freqs()
+
+
+if __name__ == "__main__":
+    from pyspark.ml.tests.connect.test_parity_feature import *  # noqa: F401
+
+    try:
+        import xmlrunner  # type: ignore[import]
+
+        testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
+    except ImportError:
+        testRunner = None
+    unittest.main(testRunner=testRunner, verbosity=2)
diff --git a/python/pyspark/ml/tests/test_feature.py b/python/pyspark/ml/tests/test_feature.py
index 92919adecd069..a46fdd22e2bcb 100644
--- a/python/pyspark/ml/tests/test_feature.py
+++ b/python/pyspark/ml/tests/test_feature.py
@@ -16,8 +16,11 @@
 # limitations under the License.
 #
 
+import tempfile
 import unittest
 
+import numpy as np
+
 from pyspark.ml.feature import (
     Binarizer,
     CountVectorizer,
@@ -26,6 +29,12 @@
     IDF,
     NGram,
     RFormula,
+    StandardScaler,
+    StandardScalerModel,
+    MaxAbsScaler,
+    MaxAbsScalerModel,
+    MinMaxScaler,
+    MinMaxScalerModel,
     StopWordsRemover,
     StringIndexer,
     StringIndexerModel,
@@ -38,7 +47,122 @@
 from pyspark.testing.mlutils import check_params, SparkSessionTestCase
 
 
-class FeatureTests(SparkSessionTestCase):
+class FeatureTestsMixin:
+    def test_standard_scaler(self):
+        df = (
+            self.spark.createDataFrame(
+                [
+                    (1, 1.0, Vectors.dense([0.0])),
+                    (2, 2.0, Vectors.dense([2.0])),
+                    (3, 3.0, Vectors.sparse(1, [(0, 3.0)])),
+                ],
+                ["index", "weight", "features"],
+            )
+            .coalesce(1)
+            .sortWithinPartitions("weight")
+            .select("features")
+        )
+        scaler = StandardScaler(inputCol="features", outputCol="scaled")
+        self.assertEqual(scaler.getInputCol(), "features")
+        self.assertEqual(scaler.getOutputCol(), "scaled")
+
+        # Estimator save & load
+        with tempfile.TemporaryDirectory(prefix="standard_scaler") as d:
+            scaler.write().overwrite().save(d)
+            scaler2 = StandardScaler.load(d)
+            self.assertEqual(str(scaler), str(scaler2))
+
+        model = scaler.fit(df)
+        self.assertTrue(np.allclose(model.mean.toArray(), [1.66666667], atol=1e-4))
+        self.assertTrue(np.allclose(model.std.toArray(), [1.52752523], atol=1e-4))
+
+        output = model.transform(df)
+        self.assertEqual(output.columns, ["features", "scaled"])
+        self.assertEqual(output.count(), 3)
+
+        # Model save & load
+        with tempfile.TemporaryDirectory(prefix="standard_scaler_model") as d:
+            model.write().overwrite().save(d)
+            model2 = StandardScalerModel.load(d)
+            self.assertEqual(str(model), str(model2))
+
+    def test_maxabs_scaler(self):
+        df = (
+            self.spark.createDataFrame(
+                [
+                    (1, 1.0, Vectors.dense([0.0])),
+                    (2, 2.0, Vectors.dense([2.0])),
+                    (3, 3.0, Vectors.sparse(1, [(0, 3.0)])),
+                ],
+                ["index", "weight", "features"],
+            )
+            .coalesce(1)
+            .sortWithinPartitions("weight")
+            .select("features")
+        )
+
+        scaler = MaxAbsScaler(inputCol="features", outputCol="scaled")
+        self.assertEqual(scaler.getInputCol(), "features")
+        self.assertEqual(scaler.getOutputCol(), "scaled")
+
+        # Estimator save & load
+        with tempfile.TemporaryDirectory(prefix="maxabs_scaler") as d:
+            scaler.write().overwrite().save(d)
+            scaler2 = MaxAbsScaler.load(d)
+            self.assertEqual(str(scaler), str(scaler2))
+
+        model = scaler.fit(df)
+        self.assertTrue(np.allclose(model.maxAbs.toArray(), [3.0], atol=1e-4))
+
+        output = model.transform(df)
+        self.assertEqual(output.columns, ["features", "scaled"])
+        self.assertEqual(output.count(), 3)
+
+        # Model save & load
+        with tempfile.TemporaryDirectory(prefix="standard_scaler_model") as d:
+            model.write().overwrite().save(d)
+            model2 = MaxAbsScalerModel.load(d)
+            self.assertEqual(str(model), str(model2))
+
+    def test_minmax_scaler(self):
+        df = (
+            self.spark.createDataFrame(
+                [
+                    (1, 1.0, Vectors.dense([0.0])),
+                    (2, 2.0, Vectors.dense([2.0])),
+                    (3, 3.0, Vectors.sparse(1, [(0, 3.0)])),
+                ],
+                ["index", "weight", "features"],
+            )
+            .coalesce(1)
+            .sortWithinPartitions("weight")
+            .select("features")
+        )
+
+        scaler = MinMaxScaler(inputCol="features", outputCol="scaled")
+        self.assertEqual(scaler.getInputCol(), "features")
+        self.assertEqual(scaler.getOutputCol(), "scaled")
+
+        # Estimator save & load
+        with tempfile.TemporaryDirectory(prefix="maxabs_scaler") as d:
+            scaler.write().overwrite().save(d)
+            scaler2 = MinMaxScaler.load(d)
+            self.assertEqual(str(scaler), str(scaler2))
+
+        model = scaler.fit(df)
+        self.assertTrue(np.allclose(model.originalMax.toArray(), [3.0], atol=1e-4))
+        self.assertTrue(np.allclose(model.originalMin.toArray(), [0.0], atol=1e-4))
+
+        output = model.transform(df)
+        self.assertEqual(output.columns, ["features", "scaled"])
+        self.assertEqual(output.count(), 3)
+
+        # Model save & load
+        with tempfile.TemporaryDirectory(prefix="standard_scaler_model") as d:
+            model.write().overwrite().save(d)
+            model2 = MinMaxScalerModel.load(d)
+            self.assertEqual(str(model), str(model2))
+
     def test_binarizer(self):
         b0 = Binarizer()
         self.assertListEqual(
@@ -530,8 +654,6 @@ def test_vector_size_hint(self):
         expected = DenseVector([0.0, 10.0, 0.5])
         self.assertEqual(output, expected)
 
-
-class HashingTFTest(SparkSessionTestCase):
     def test_apply_binary_term_freqs(self):
         df = self.spark.createDataFrame([(0, ["a", "a", "b", "c", "c", "c"])], ["id", "words"])
         n = 10
@@ -554,6 +676,10 @@ def test_apply_binary_term_freqs(self):
         )
 
 
+class FeatureTests(FeatureTestsMixin, SparkSessionTestCase):
+    pass
+
+
 if __name__ == "__main__":
     from pyspark.ml.tests.test_feature import *  # noqa: F401
 
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala
index b85bc6771f8ec..04dbb60cb1edc 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala
@@ -425,6 +425,11 @@ private[ml] object MLUtils {
   // leave a security hole, we define an allowed attribute list that can be accessed.
   // The attributes could be retrieved from the corresponding python class
   private lazy val ALLOWED_ATTRIBUTES = HashSet(
+    "mean", // StandardScalerModel
+    "std", // StandardScalerModel
+    "maxAbs", // MaxAbsScalerModel
+    "originalMax", // MinMaxScalerModel
+    "originalMin", // MinMaxScalerModel
     "toString",
     "toDebugString",
     "numFeatures",