Skip to content

Commit

Permalink
[SPARK-45757][ML] Avoid re-computation of NNZ in Binarizer
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
1. Compress vectors with the given nnz in `Binarizer`;

2. Rename the internal function `def compressed(nnz: Int): Vector` to `compressedWithNNZ` to avoid an ambiguous-reference error (`vec.compressed.apply(nnz)`) when there is no type hint:
```
[error] /Users/ruifeng.zheng/Dev/spark/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala:132:61: ambiguous reference to overloaded definition,
[error] both method compressed in trait Vector of type (nnz: Int): org.apache.spark.ml.linalg.Vector
[error] and  method compressed in trait Vector of type org.apache.spark.ml.linalg.Vector
```

### Why are the changes needed?
`nnz` is already known at the call sites before compression, so passing it through avoids recomputing the number of non-zero entries inside `compressed`.

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
CI

### Was this patch authored or co-authored using generative AI tooling?
no

Closes apache#43619 from zhengruifeng/ml_binarizer_nnz.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
  • Loading branch information
zhengruifeng authored and dongjoon-hyun committed Nov 2, 2023
1 parent eba37d2 commit cdc66a7
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,9 @@ sealed trait Vector extends Serializable {
* Returns a vector in either dense or sparse format, whichever uses less storage.
*/
@Since("2.0.0")
def compressed: Vector = compressed(numNonzeros)
def compressed: Vector = compressedWithNNZ(numNonzeros)

private[ml] def compressed(nnz: Int): Vector = {
private[ml] def compressedWithNNZ(nnz: Int): Vector = {
// A dense vector needs 8 * size + 8 bytes, while a sparse vector needs 12 * nnz + 20 bytes.
if (1.5 * (nnz + 1.0) < size) {
toSparseWithSize(nnz)
Expand Down
26 changes: 16 additions & 10 deletions mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,11 @@ final class Binarizer @Since("1.4.0") (@Since("1.4.0") override val uid: String)
(Seq($(inputCol)), Seq($(outputCol)), Seq($(threshold)))
}

val mappedOutputCols = inputColNames.zip(tds).map { case (inputColName, td) =>
val binarizerUDF = dataset.schema(inputColName).dataType match {
val mappedOutputCols = inputColNames.zip(tds).map { case (colName, td) =>
dataset.schema(colName).dataType match {
case DoubleType =>
udf { in: Double => if (in > td) 1.0 else 0.0 }
when(!col(colName).isNaN && col(colName) > td, lit(1.0))
.otherwise(lit(0.0))

case _: VectorUDT if td >= 0 =>
udf { vector: Vector =>
Expand All @@ -124,27 +125,32 @@ final class Binarizer @Since("1.4.0") (@Since("1.4.0") override val uid: String)
vector.foreachNonZero { (index, value) =>
if (value > td) {
indices += index
values += 1.0
values += 1.0
}
}
Vectors.sparse(vector.size, indices.result(), values.result()).compressed
}

val idxArray = indices.result()
val valArray = values.result()
Vectors.sparse(vector.size, idxArray, valArray)
.compressedWithNNZ(idxArray.length)
}.apply(col(colName))

case _: VectorUDT if td < 0 =>
this.logWarning(s"Binarization operations on sparse dataset with negative threshold " +
s"$td will build a dense output, so take care when applying to sparse input.")
udf { vector: Vector =>
val values = Array.fill(vector.size)(1.0)
var nnz = vector.size
vector.foreachNonZero { (index, value) =>
if (value <= td) {
values(index) = 0.0
nnz -= 1
}
}
Vectors.dense(values).compressed
}
}

binarizerUDF(col(inputColName))
Vectors.dense(values).compressedWithNNZ(nnz)
}.apply(col(colName))
}
}

val outputMetadata = outputColNames.map(outputSchema(_).metadata)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,9 @@ object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
throw new SparkException(s"$o of type ${o.getClass.getName} is not supported.")
}

val (idxArray, valArray) = (indices.result(), values.result())
Vectors.sparse(featureIndex, idxArray, valArray).compressed(idxArray.length)
val idxArray = indices.result()
val valArray = values.result()
Vectors.sparse(featureIndex, idxArray, valArray)
.compressedWithNNZ(idxArray.length)
}
}

0 comments on commit cdc66a7

Please sign in to comment.