[SPARK-5726] [MLLIB] Hadamard Vector Product Transformer
ogeagla committed May 7, 2015
1 parent 068c315 commit 4922722
Showing 4 changed files with 263 additions and 0 deletions.
73 changes: 73 additions & 0 deletions docs/mllib-feature-extraction.md
@@ -477,3 +477,76 @@ sc.stop();
</div>
</div>

## HadamardProduct

`HadamardProduct` scales each input vector by a provided weighting vector, component-wise. That is, it computes the [Hadamard product](https://en.wikipedia.org/wiki/Hadamard_product_%28matrices%29) of the input vector `v` and the weighting vector `w` to yield a result vector.

`\[ \begin{pmatrix}
v_1 \\
\vdots \\
v_N
\end{pmatrix} \circ \begin{pmatrix}
w_1 \\
\vdots \\
w_N
\end{pmatrix}
= \begin{pmatrix}
v_1 w_1 \\
\vdots \\
v_N w_N
\end{pmatrix}
\]`
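
For example, applying the weighting vector `\( w = (0, 1, 2)^T \)` to the input vector `\( v = (1, 2, 3)^T \)` yields `\( v \circ w = (0, 2, 6)^T \)`.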

[`HadamardProduct`](api/scala/index.html#org.apache.spark.mllib.feature.HadamardProduct) has the following parameter in the constructor:

* `w`: `Vector`, the scaling vector.

`HadamardProduct` implements [`VectorTransformer`](api/scala/index.html#org.apache.spark.mllib.feature.VectorTransformer), which can apply the weighting to a `Vector` to produce a transformed `Vector`, or to an `RDD[Vector]` to produce a transformed `RDD[Vector]`.

### Example

The example below demonstrates how to load a simple file of vectors, parse each line into a `Vector`, and then weight those vectors with a weighting vector.


<div class="codetabs">
<div data-lang="scala">
{% highlight scala %}
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.feature.HadamardProduct
import org.apache.spark.mllib.linalg.Vectors

// Load and parse the data
val data = sc.textFile("data/mllib/kmeans_data.txt")
val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble)))

val weightingVector = Vectors.dense(0.0, 1.0, 2.0)
val scaler = new HadamardProduct(weightingVector)

// Transforming the whole RDD and transforming each vector give the same results:
val weightedData = scaler.transform(parsedData)
val weightedData2 = parsedData.map(x => scaler.transform(x))

{% endhighlight %}
</div>

<div data-lang="python">
{% highlight python %}
from numpy import array
from pyspark.mllib.feature import HadamardProduct
from pyspark.mllib.linalg import Vectors

# Load and parse the data
data = sc.textFile("data/mllib/kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))

weightingVector = Vectors.dense(0.0, 1.0, 2.0)
scaler = HadamardProduct(weightingVector)

# Transforming the whole RDD and transforming each vector give the same results:
weightedData = scaler.transform(parsedData)
weightedData2 = parsedData.map(lambda x: scaler.transform(x))

{% endhighlight %}
</div>
</div>


@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.ml.feature

import org.apache.spark.annotation.AlphaComponent
import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.param.{Param, ParamMap}
import org.apache.spark.mllib.feature.HadamardProduct
import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
import org.apache.spark.sql.types.DataType

/**
 * :: AlphaComponent ::
 * Maps a vector to the Hadamard product of itself and a reference vector.
 */
@AlphaComponent
class HadamardProductTF extends UnaryTransformer[Vector, Vector, HadamardProductTF] {

  /** The vector to multiply with input vectors. */
  val scalingVec: Param[Vector] = new Param(this, "scalingVector", "vector for Hadamard product")

  def setScalingVec(value: Vector): this.type = set(scalingVec, value)

  def getScalingVec: Vector = get(scalingVec)

  override protected def createTransformFunc(paramMap: ParamMap): Vector => Vector = {
    val hadScaler = new HadamardProduct(paramMap(scalingVec))
    hadScaler.transform
  }

  override protected def outputDataType: DataType = new VectorUDT()
}
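
For context, a minimal usage sketch under the ml Pipeline API of this era (not part of this commit; the DataFrame `df` and the column names are hypothetical, and `setInputCol`/`setOutputCol` are inherited from `UnaryTransformer`):

// Hedged sketch, not part of this commit.
import org.apache.spark.mllib.linalg.Vectors

val hadamardTF = new HadamardProductTF()
  .setScalingVec(Vectors.dense(0.0, 1.0, 2.0))
  .setInputCol("features")          // hypothetical input column holding Vectors
  .setOutputCol("weightedFeatures") // hypothetical output column

val weighted = hadamardTF.transform(df) // df: a DataFrame with a vector column "features"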
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.feature

import org.apache.spark.annotation.Experimental
import org.apache.spark.mllib.linalg._

/**
* :: Experimental ::
* Component-wise scaling of dense vectors by a provided vector's components.
*
* @param scalingVector The values used to scale the reference vector's individual components.
*/
@Experimental
class HadamardProduct(val scalingVector: Vector) extends VectorTransformer {

  /**
   * Applies the Hadamard product transformation.
   *
   * @param vector vector to be transformed.
   * @return transformed vector.
   */
  override def transform(vector: Vector): Vector = {
    require(vector.size == scalingVector.size,
      s"vector size ${vector.size} must match scaling vector size ${scalingVector.size}")
    vector match {
      case dv: DenseVector =>
        // Dense case: scale every component by the corresponding weight.
        val values: Array[Double] = dv.values.clone()
        val dim = scalingVector.size
        var i = 0
        while (i < dim) {
          values(i) *= scalingVector(i)
          i += 1
        }
        Vectors.dense(values)
      case SparseVector(size, indices, vs) =>
        // Sparse case: only stored entries need scaling; indices are unchanged.
        val values = vs.clone()
        val dim = values.length
        var i = 0
        while (i < dim) {
          values(i) *= scalingVector(indices(i))
          i += 1
        }
        Vectors.sparse(size, indices, values)
      case v => throw new IllegalArgumentException("Does not support vector type " + v.getClass)
    }
  }
}
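
A brief illustrative sketch (not part of the commit) of the contract this method implements: dense inputs stay dense, and sparse inputs stay sparse with only their stored entries scaled.

import org.apache.spark.mllib.linalg.Vectors

val scaler = new HadamardProduct(Vectors.dense(1.0, 0.0, 0.5))

scaler.transform(Vectors.dense(2.0, 4.0, 6.0))
// => [2.0, 0.0, 3.0]  (dense in, dense out)

scaler.transform(Vectors.sparse(3, Seq((0, -2.0), (2, 4.0))))
// => (3, [0, 2], [-2.0, 2.0])  (sparse in, sparse out)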
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.feature

import org.scalatest.FunSuite

import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vectors}
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.mllib.util.TestingUtils._

class HadamardProductSuite extends FunSuite with MLlibTestSparkContext {

  val denseData = Array(
    Vectors.dense(1.0, 1.0, 0.0, 0.0),
    Vectors.dense(1.0, 2.0, -3.0, 0.0),
    Vectors.dense(1.0, 3.0, 0.0, 0.0),
    Vectors.dense(1.0, 4.0, 1.9, -9.0),
    Vectors.dense(1.0, 5.0, 0.0, 0.0)
  )

  val sparseData = Array(
    Vectors.sparse(3, Seq((0, -2.0), (1, 2.3))),
    Vectors.sparse(3, Seq((1, -1.0), (2, -3.0))),
    Vectors.sparse(3, Seq((1, -5.1))),
    Vectors.sparse(3, Seq((0, 3.8), (2, 1.9))),
    Vectors.sparse(3, Seq((0, 1.7), (1, -0.6))),
    Vectors.sparse(3, Seq((1, 1.9)))
  )

  val scalingVector = Vectors.dense(2.0, 0.5, 0.0, 0.25)

test("hadamard product should properly apply vector to dense data set") {

val scaler = new HadamardProduct(scalingVector)
val scaledData = scaler.transform(sc.makeRDD(denseData))

val scaledVecs = scaledData.collect()

val fourthVec = scaledVecs.apply(3).toArray

assert(fourthVec.apply(0) === 2.0, "product by 2.0 should have been applied")
assert(fourthVec.apply(1) === 2.0, "product by 0.5 should have been applied")
assert(fourthVec.apply(2) === 0.0, "product by 0.0 should have been applied")
assert(fourthVec.apply(3) === -2.25, "product by 0.25 should have been applied")
}

test("hadamard product should properly apply vector to sparse data set") {

val dataRDD = sc.parallelize(sparseData, 3)

val scalingVec = Vectors.dense(1.0, 0.0, 0.5)

val hadScaler = new HadamardProduct(scalingVec)

val data2 = sparseData.map(hadScaler.transform)
val data2RDD = hadScaler.transform(dataRDD)

assert((sparseData, data2, data2RDD.collect()).zipped.forall {
case (v1: DenseVector, v2: DenseVector, v3: DenseVector) => true
case (v1: SparseVector, v2: SparseVector, v3: SparseVector) => true
case _ => false
}, "The vector type should be preserved after hadamard product")

assert((data2, data2RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))

assert(data2(0) ~== Vectors.sparse(3, Seq((0, -2.0), (1, 0.0))) absTol 1E-5)
assert(data2(1) ~== Vectors.sparse(3, Seq((1, 0.0), (2, -1.5))) absTol 1E-5)
}
}
