[SPARK-50879][ML][PYTHON][CONNECT] Support feature scalers on Connect
### What changes were proposed in this pull request?
Support the following feature scalers on Connect (see the usage sketch after this list):

- org.apache.spark.ml.feature.StandardScaler
- org.apache.spark.ml.feature.MaxAbsScaler
- org.apache.spark.ml.feature.MinMaxScaler
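A minimal usage sketch of what this enables from the Python client (illustrative only: it assumes `spark` is a Spark Connect session, e.g. created via `SparkSession.builder.remote(...)`, and the data and column names are made up):

```python
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors

# Toy data; any DataFrame with a Vector column would do.
df = spark.createDataFrame(
    [(Vectors.dense([0.0]),), (Vectors.dense([2.0]),), (Vectors.dense([3.0]),)],
    ["features"],
)

scaler = StandardScaler(inputCol="features", outputCol="scaled")
model = scaler.fit(df)        # fitting now runs through the Connect server
print(model.mean, model.std)  # summary attributes fetched from the server-side model
model.transform(df).show()
```

The same pattern applies to MaxAbsScaler and MinMaxScaler.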

### Why are the changes needed?
For feature parity between Spark Connect and classic Spark ML.

### Does this PR introduce _any_ user-facing change?
Yes, new algorithms are supported on Connect.

### How was this patch tested?
Added tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #49581 from zhengruifeng/ml_connect_scaler.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
zhengruifeng committed Jan 21, 2025
1 parent 3ba74bf commit 6d66f26
Showing 10 changed files with 249 additions and 3 deletions.
1 change: 1 addition & 0 deletions dev/sparktestsupport/modules.py
@@ -1125,6 +1125,7 @@ def __hash__(self):
"pyspark.ml.tests.connect.test_parity_regression",
"pyspark.ml.tests.connect.test_parity_clustering",
"pyspark.ml.tests.connect.test_parity_evaluation",
"pyspark.ml.tests.connect.test_parity_feature",
],
excluded_python_implementations=[
"PyPy" # Skip these tests under PyPy since they require numpy, pandas, and pyarrow and
@@ -240,6 +240,8 @@ sealed trait Vector extends Serializable {
@Since("2.0.0")
object Vectors {

  private[ml] val empty: Vector = zeros(0)

  /**
   * Creates a dense vector from its values.
   */
@@ -43,3 +43,9 @@ org.apache.spark.ml.recommendation.ALS

# fpm
org.apache.spark.ml.fpm.FPGrowth


# feature
org.apache.spark.ml.feature.StandardScaler
org.apache.spark.ml.feature.MaxAbsScaler
org.apache.spark.ml.feature.MinMaxScaler
@@ -41,3 +41,8 @@ org.apache.spark.ml.recommendation.ALSModel

# fpm
org.apache.spark.ml.fpm.FPGrowthModel

# feature
org.apache.spark.ml.feature.StandardScalerModel
org.apache.spark.ml.feature.MaxAbsScalerModel
org.apache.spark.ml.feature.MinMaxScalerModel
@@ -107,6 +107,8 @@ class MaxAbsScalerModel private[ml] (

  import MaxAbsScalerModel._

  private[ml] def this() = this(Identifiable.randomUID("maxAbsScal"), Vectors.empty)

  /** @group setParam */
  @Since("2.0.0")
  def setInputCol(value: String): this.type = set(inputCol, value)
@@ -154,6 +154,8 @@ class MinMaxScalerModel private[ml] (

  import MinMaxScalerModel._

  private[ml] def this() = this(Identifiable.randomUID("minMaxScal"), Vectors.empty, Vectors.empty)

  /** @group setParam */
  @Since("1.5.0")
  def setInputCol(value: String): this.type = set(inputCol, value)
@@ -147,6 +147,8 @@ class StandardScalerModel private[ml] (

  import StandardScalerModel._

  private[ml] def this() = this(Identifiable.randomUID("stdScal"), Vectors.empty, Vectors.empty)

  /** @group setParam */
  @Since("1.2.0")
  def setInputCol(value: String): this.type = set(inputCol, value)
95 changes: 95 additions & 0 deletions python/pyspark/ml/tests/connect/test_parity_feature.py
@@ -0,0 +1,95 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import unittest

from pyspark.ml.tests.test_feature import FeatureTestsMixin
from pyspark.testing.connectutils import ReusedConnectTestCase


class FeatureParityTests(FeatureTestsMixin, ReusedConnectTestCase):
@unittest.skip("Need to support.")
def test_binarizer(self):
super().test_binarizer()

@unittest.skip("Need to support.")
def test_idf(self):
super().test_idf()

@unittest.skip("Need to support.")
def test_ngram(self):
super().test_ngram()

@unittest.skip("Need to support.")
def test_stopwordsremover(self):
super().test_stopwordsremover()

@unittest.skip("Need to support.")
def test_count_vectorizer_with_binary(self):
super().test_count_vectorizer_with_binary()

@unittest.skip("Need to support.")
def test_count_vectorizer_with_maxDF(self):
super().test_count_vectorizer_with_maxDF()

@unittest.skip("Need to support.")
def test_count_vectorizer_from_vocab(self):
super().test_count_vectorizer_from_vocab()

@unittest.skip("Need to support.")
def test_rformula_force_index_label(self):
super().test_rformula_force_index_label()

@unittest.skip("Need to support.")
def test_rformula_string_indexer_order_type(self):
super().test_rformula_string_indexer_order_type()

@unittest.skip("Need to support.")
def test_string_indexer_handle_invalid(self):
super().test_string_indexer_handle_invalid()

@unittest.skip("Need to support.")
def test_string_indexer_from_labels(self):
super().test_string_indexer_from_labels()

@unittest.skip("Need to support.")
def test_target_encoder_binary(self):
super().test_target_encoder_binary()

@unittest.skip("Need to support.")
def test_target_encoder_continuous(self):
super().test_target_encoder_continuous()

@unittest.skip("Need to support.")
def test_vector_size_hint(self):
super().test_vector_size_hint()

@unittest.skip("Need to support.")
def test_apply_binary_term_freqs(self):
super().test_apply_binary_term_freqs()


if __name__ == "__main__":
    from pyspark.ml.tests.connect.test_parity_feature import *  # noqa: F401

    try:
        import xmlrunner  # type: ignore[import]

        testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
    except ImportError:
        testRunner = None
    unittest.main(testRunner=testRunner, verbosity=2)
132 changes: 129 additions & 3 deletions python/pyspark/ml/tests/test_feature.py
@@ -16,8 +16,11 @@
# limitations under the License.
#

import tempfile
import unittest

import numpy as np

from pyspark.ml.feature import (
    Binarizer,
    CountVectorizer,
@@ -26,6 +29,12 @@
    IDF,
    NGram,
    RFormula,
    StandardScaler,
    StandardScalerModel,
    MaxAbsScaler,
    MaxAbsScalerModel,
    MinMaxScaler,
    MinMaxScalerModel,
    StopWordsRemover,
    StringIndexer,
    StringIndexerModel,
@@ -38,7 +47,122 @@
from pyspark.testing.mlutils import check_params, SparkSessionTestCase


class FeatureTests(SparkSessionTestCase):
class FeatureTestsMixin:
    def test_standard_scaler(self):
        df = (
            self.spark.createDataFrame(
                [
                    (1, 1.0, Vectors.dense([0.0])),
                    (2, 2.0, Vectors.dense([2.0])),
                    (3, 3.0, Vectors.sparse(1, [(0, 3.0)])),
                ],
                ["index", "weight", "features"],
            )
            .coalesce(1)
            .sortWithinPartitions("weight")
            .select("features")
        )
        scaler = StandardScaler(inputCol="features", outputCol="scaled")
        self.assertEqual(scaler.getInputCol(), "features")
        self.assertEqual(scaler.getOutputCol(), "scaled")

        # Estimator save & load
        with tempfile.TemporaryDirectory(prefix="standard_scaler") as d:
            scaler.write().overwrite().save(d)
            scaler2 = StandardScaler.load(d)
            self.assertEqual(str(scaler), str(scaler2))

        model = scaler.fit(df)
        self.assertTrue(np.allclose(model.mean.toArray(), [1.66666667], atol=1e-4))
        self.assertTrue(np.allclose(model.std.toArray(), [1.52752523], atol=1e-4))

        output = model.transform(df)
        self.assertEqual(output.columns, ["features", "scaled"])
        self.assertEqual(output.count(), 3)

        # Model save & load
        with tempfile.TemporaryDirectory(prefix="standard_scaler_model") as d:
            model.write().overwrite().save(d)
            model2 = StandardScalerModel.load(d)
            self.assertEqual(str(model), str(model2))

    def test_maxabs_scaler(self):
        df = (
            self.spark.createDataFrame(
                [
                    (1, 1.0, Vectors.dense([0.0])),
                    (2, 2.0, Vectors.dense([2.0])),
                    (3, 3.0, Vectors.sparse(1, [(0, 3.0)])),
                ],
                ["index", "weight", "features"],
            )
            .coalesce(1)
            .sortWithinPartitions("weight")
            .select("features")
        )

        scaler = MaxAbsScaler(inputCol="features", outputCol="scaled")
        self.assertEqual(scaler.getInputCol(), "features")
        self.assertEqual(scaler.getOutputCol(), "scaled")

        # Estimator save & load
        with tempfile.TemporaryDirectory(prefix="maxabs_scaler") as d:
            scaler.write().overwrite().save(d)
            scaler2 = MaxAbsScaler.load(d)
            self.assertEqual(str(scaler), str(scaler2))

        model = scaler.fit(df)
        self.assertTrue(np.allclose(model.maxAbs.toArray(), [3.0], atol=1e-4))

        output = model.transform(df)
        self.assertEqual(output.columns, ["features", "scaled"])
        self.assertEqual(output.count(), 3)

        # Model save & load
        with tempfile.TemporaryDirectory(prefix="standard_scaler_model") as d:
            model.write().overwrite().save(d)
            model2 = MaxAbsScalerModel.load(d)
            self.assertEqual(str(model), str(model2))

    def test_minmax_scaler(self):
        df = (
            self.spark.createDataFrame(
                [
                    (1, 1.0, Vectors.dense([0.0])),
                    (2, 2.0, Vectors.dense([2.0])),
                    (3, 3.0, Vectors.sparse(1, [(0, 3.0)])),
                ],
                ["index", "weight", "features"],
            )
            .coalesce(1)
            .sortWithinPartitions("weight")
            .select("features")
        )

        scaler = MinMaxScaler(inputCol="features", outputCol="scaled")
        self.assertEqual(scaler.getInputCol(), "features")
        self.assertEqual(scaler.getOutputCol(), "scaled")

        # Estimator save & load
        with tempfile.TemporaryDirectory(prefix="maxabs_scaler") as d:
            scaler.write().overwrite().save(d)
            scaler2 = MinMaxScaler.load(d)
            self.assertEqual(str(scaler), str(scaler2))

        model = scaler.fit(df)
        self.assertTrue(np.allclose(model.originalMax.toArray(), [3.0], atol=1e-4))
        self.assertTrue(np.allclose(model.originalMin.toArray(), [0.0], atol=1e-4))

        output = model.transform(df)
        self.assertEqual(output.columns, ["features", "scaled"])
        self.assertEqual(output.count(), 3)

        # Model save & load
        with tempfile.TemporaryDirectory(prefix="standard_scaler_model") as d:
            model.write().overwrite().save(d)
            model2 = MinMaxScalerModel.load(d)
            self.assertEqual(str(model), str(model2))

    def test_binarizer(self):
        b0 = Binarizer()
        self.assertListEqual(
@@ -530,8 +654,6 @@ def test_vector_size_hint(self):
        expected = DenseVector([0.0, 10.0, 0.5])
        self.assertEqual(output, expected)


class HashingTFTest(SparkSessionTestCase):
    def test_apply_binary_term_freqs(self):
        df = self.spark.createDataFrame([(0, ["a", "a", "b", "c", "c", "c"])], ["id", "words"])
        n = 10
@@ -554,6 +676,10 @@ def test_apply_binary_term_freqs(self):
        )


class FeatureTests(FeatureTestsMixin, SparkSessionTestCase):
    pass


if __name__ == "__main__":
    from pyspark.ml.tests.test_feature import *  # noqa: F401

@@ -425,6 +425,11 @@ private[ml] object MLUtils {
  // leave a security hole, we define an allowed attribute list that can be accessed.
  // The attributes could be retrieved from the corresponding python class
  private lazy val ALLOWED_ATTRIBUTES = HashSet(
    "mean", // StandardScalerModel
    "std", // StandardScalerModel
    "maxAbs", // MaxAbsScalerModel
    "originalMax", // MinMaxScalerModel
    "originalMin", // MinMaxScalerModel
    "toString",
    "toDebugString",
    "numFeatures",
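These allowlisted attribute names are what let the Python Connect client read the new models' summary values from the server-side objects. A hedged sketch of the access this enables (again assuming `spark` is a Spark Connect session; the data is illustrative):

```python
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame(
    [(Vectors.dense([0.0]),), (Vectors.dense([2.0]),), (Vectors.dense([3.0]),)],
    ["features"],
)
model = MinMaxScaler(inputCol="features", outputCol="scaled").fit(df)

# Each property below corresponds to an entry added to ALLOWED_ATTRIBUTES above,
# so the client-side call is permitted to reach the remote model.
print(model.originalMin)  # expected to be [0.0] for this data
print(model.originalMax)  # expected to be [3.0] for this data
```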
