
[SPARK-4001][MLlib] adding parallel FP-Growth algorithm for frequent pattern mining in MLlib #2847

Closed · wants to merge 12 commits
mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala (208 additions, 0 deletions)
@@ -0,0 +1,208 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.fpm

import org.apache.spark.Logging
import org.apache.spark.SparkContext._
Contributor: This is no longer needed.

import org.apache.spark.broadcast._
Contributor: try to be explicit on imports

import org.apache.spark.rdd.RDD

import scala.collection.mutable.{ArrayBuffer, Map}
Contributor: order imports into groups: https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide#SparkCodeStyleGuide-Imports

For a mutable collection, we can import it directly if there is no immutable collection with the same name, e.g., ArrayBuffer. Otherwise, we only import scala.collection.mutable and use mutable.Map in the code to be explicit.
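A sketch of what the two import comments ask for, with explicit, grouped imports (the exact list is an assumption based on what this file uses):

    import scala.collection.mutable
    import scala.collection.mutable.ArrayBuffer

    import org.apache.spark.Logging
    import org.apache.spark.broadcast.Broadcast
    import org.apache.spark.rdd.RDD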


/**
* This class implements the Parallel FPGrowth algorithm to do frequent pattern mining on input data.
Contributor: FPGrowth -> FP-growth; use the same name as appears in the paper.

* Parallel FPGrowth (PFP) partitions computation in such a way that each machine executes an
* independent group of mining tasks. More detail of this algorithm can be found at
* http://infolab.stanford.edu/~echang/recsys08-69.pdf
Contributor: Let's use the permanent link: http://dx.doi.org/10.1145/1454008.1454027

It is nice to cite the FP-growth paper from Han: http://dx.doi.org/10.1145/335191.335372

*/
class FPGrowth private(private var minSupport: Double) extends Logging with Serializable {
Contributor: Doc minSupport.


/**
* Constructs a FPGrowth instance with default parameters:
* {minSupport: 0.5}
*/
def this() = this(0.5)
Contributor: Is it too high? How do you set the default value?


/**
* set the minimal support level, default is 0.5
* @param minSupport minimal support level
*/
def setMinSupport(minSupport: Double): this.type = {
this.minSupport = minSupport
this
}

/**
* Compute a FPGrowth Model that contains frequent pattern result.
* @param data input data set
* @return FPGrowth Model
*/
def run(data: RDD[Array[String]]): FPGrowthModel = {
val model = runAlgorithm(data)
model
}

/**
* Implementation of PFP.
*/
private def runAlgorithm(data: RDD[Array[String]]): FPGrowthModel = {
Contributor: It is not necessary to have runAlgorithm, which is the same as run.

val count = data.count()
val minCount = minSupport * count
val single = generateSingleItem(data, minCount)
val combinations = generateCombinations(data, minCount, single)
new FPGrowthModel(single ++ combinations)
}

/**
* Generate single item pattern by filtering the input data using minimal support level
Contributor: Should document at least the return value. It would be best to document the input arguments as well.

*/
private def generateSingleItem(
data: RDD[Array[String]],
minCount: Double): Array[(String, Int)] = {
data.flatMap(v => v)
Contributor: Are we assuming that the items inside the same basket are distinct? If not, this could be changed to v => v.toSet.

.map(v => (v, 1))
Contributor: 1 -> 1L, if we do need to handle billions of items?

.reduceByKey(_ + _)
.filter(_._2 >= minCount)
.collect()
.distinct
Contributor: Why do we need distinct?

.sortWith(_._2 > _._2)
Contributor: .sortBy(-_._2) may be easier to understand. It is hard to remember which of (_._2 > _._2) and (_._2 < _._2) results in descending order.

}
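Taken together, the comments above suggest a sketch like the following (per-basket deduplication via toSet, Long counts, no distinct, and sortBy for descending order; the Long return type is an assumption, since the rest of the patch uses Int):

    private def generateSingleItem(
        data: RDD[Array[String]],
        minCount: Double): Array[(String, Long)] = {
      data.flatMap(_.toSet)        // treat repeated items within a basket as one
        .map(item => (item, 1L))   // Long counts, in case there are billions of items
        .reduceByKey(_ + _)
        .filter(_._2 >= minCount)
        .collect()
        .sortBy(-_._2)             // descending by count; reduceByKey already made keys unique
    }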

/**
* Generate combination of items by computing on FPTree,
* the computation is done on each FPTree partitions.
Contributor: Should document at least the return value.

Contributor: For the return type, it would be nice to use Array[(Array[String], Int)] or RDD[(Array[String], Int)]. For frequent sets with single elements, we can also use this type to represent them. It is not necessary to build strings.

*/
private def generateCombinations(
data: RDD[Array[String]],
minCount: Double,
singleItem: Array[(String, Int)]): Array[(String, Int)] = {
val single = data.context.broadcast(singleItem)
data.flatMap(basket => createFPTree(basket, single))
.groupByKey()
Contributor: Should it be an aggregateByKey? groupByKey collects all the values for a key to a single reducer, which usually causes scalability issues.

.flatMap(partition => runFPTree(partition, minCount))
.collect()
}
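A hedged sketch of the aggregateByKey shape being suggested. Here the per-key state is still just a buffer of conditional transactions, so it only moves merging into map-side combiners; a real fix would fold each transaction into a partition-local FP-tree instead, and could then return RDD[(Array[String], Int)] as suggested above rather than building strings:

    data.flatMap(basket => createFPTree(basket, single))
      .aggregateByKey(ArrayBuffer.empty[Array[String]])(
        (buf, trans) => buf += trans,    // fold one conditional transaction into the per-key state
        (buf1, buf2) => buf1 ++= buf2)   // merge partial states from different partitions
      .flatMap { case (key, buf) => runFPTree((key, buf), minCount) }
      .collect()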

/**
* Create FP-Tree partition for the giving basket
Contributor: Same here: missing docs.

*/
private def createFPTree(
basket: Array[String],
singleItem: Broadcast[Array[(String, Int)]]): Array[(String, Array[String])] = {
Contributor: You can call single.value in generateCombinations and remove Broadcast[..] for this function. This produces better code separation.

var output = ArrayBuffer[(String, Array[String])]()
var combination = ArrayBuffer[String]()
val single = singleItem.value
var items = ArrayBuffer[(String, Int)]()

// Filter the basket by single item pattern
val iterator = basket.iterator
Contributor: We don't need an iterator. The code could be simplified to:

    val candidates = basket.filter(single.contains).map(item => (item, single(item)))

while (iterator.hasNext){
val item = iterator.next
val opt = single.find(_._1.equals(item))
Contributor: If we need to test membership, should we make single a map?

if (opt != None) {
items ++= opt
}
}

// Sort it and create the item combinations
val sortedItems = items.sortWith(_._1 > _._1).sortWith(_._2 > _._2).toArray
Contributor: Why sorting twice? The second will overwrite the first. Besides, using sortBy(-_._2) would be better.

val itemIterator = sortedItems.iterator
while (itemIterator.hasNext) {
combination.clear()
val item = itemIterator.next
val firstNItems = sortedItems.take(sortedItems.indexOf(item))
if (firstNItems.length > 0) {
val iterator = firstNItems.iterator
while (iterator.hasNext) {
val elem = iterator.next
combination += elem._1
}
output += ((item._1, combination.toArray))
Contributor: I don't see how this implements the Mapper in Algorithm 4 of the PFP paper. The code here doesn't filter the output. For example, if we have a b c d e f and c and f are hashed into the same partition, we only need to send a b c d e f but not a b c to the same partition.
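For reference, a hedged sketch of the group-aware emission in Algorithm 4 of the PFP paper that this comment refers to: walk the frequency-sorted basket from right to left and emit each group id at most once, paired with the longest prefix ending in that group. groupOf is a hypothetical item-to-group hash, not part of this patch:

    def pfpMapper(sorted: Array[String], groupOf: String => Int): Seq[(Int, Array[String])] = {
      val seen = scala.collection.mutable.Set.empty[Int]
      (sorted.length - 1 to 0 by -1).flatMap { j =>
        val gid = groupOf(sorted(j))                        // group of the suffix item
        if (seen.add(gid)) Some((gid, sorted.take(j + 1)))  // longest prefix for this group only
        else None
      }
    }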

}
}
output.toArray
}
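Folding the map-based membership test, the single descending sort, and the Broadcast unwrapping into one place gives a sketch like this (the scala.collection.Map parameter is an assumption; the caller would pass single.value):

    private def createFPTree(
        basket: Array[String],
        single: scala.collection.Map[String, Int]): Array[(String, Array[String])] = {
      val sortedItems = basket
        .filter(single.contains)                 // keep only frequent items
        .map(item => (item, single(item)))
        .sortBy(-_._2)                           // one sort, descending by count
      sortedItems.zipWithIndex.collect {
        case ((item, _), i) if i > 0 =>
          (item, sortedItems.take(i).map(_._1))  // prefix of higher-ranked items
      }
    }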

/**
* Generate frequent pattern by walking through the FPTree
Contributor: Missing doc on the input args and the return value.

*/
private def runFPTree(
Contributor: Can we make it a method inside object FPGrowth? It is worth having unit tests for FP-tree building.

partition: (String, Iterable[Array[String]]),
minCount: Double): Array[(String, Int)] = {
val key = partition._1
val value = partition._2
val output = ArrayBuffer[(String, Int)]()
val map = Map[String, Int]()

// Walk through the FPTree partition to generate all combinations that satisfy
// the minimal support level.
var k = 1
while (k > 0) {
map.clear()
val iterator = value.iterator
while (iterator.hasNext) {
val pattern = iterator.next
if (pattern.length >= k) {
val combination = pattern.toList.combinations(k).toList
val itemIterator = combination.iterator
while (itemIterator.hasNext){
val item = itemIterator.next
val list2key: List[String] = (item :+ key).sortWith(_ > _)
val newKey = list2key.mkString(" ")
if (map.get(newKey) == None) {
map(newKey) = 1
} else {
map(newKey) = map.apply(newKey) + 1
}
}
}
}
var eligible: Array[(String, Int)] = null
if (map.size != 0) {
val candidate = map.filter(_._2 >= minCount)
if (candidate.size != 0) {
eligible = candidate.toArray
output ++= eligible
}
}
if ((eligible == null) || (eligible.length == 0)) {
k = 0
} else {
k = k + 1
}
}
output.toArray
}
}

/**
* Top-level methods for calling FPGrowth.
*/
object FPGrowth{

/**
* Generate a FPGrowth Model using the given minimal support level.
*
* @param data input baskets stored as `RDD[Array[String]]`
* @param minSupport minimal support level, for example 0.5
*/
def train(data: RDD[Array[String]], minSupport: Double): FPGrowthModel = {
new FPGrowth().setMinSupport(minSupport).run(data)
}
}

mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowthModel.scala (24 additions, 0 deletions)
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.fpm

/**
* Model produced by FPGrowth; each element of frequentPattern is a frequent pattern with its count.
*/
class FPGrowthModel (val frequentPattern: Array[(String, Int)]) extends Serializable {
}
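For context, a small usage sketch of the API added in this patch (assumes an active SparkContext named sc, as in the test suite below):

    import org.apache.spark.mllib.fpm.FPGrowth

    val baskets = sc.parallelize(Seq("r z h k p", "z y x w v u t s"))
      .map(_.split(" "))
    val model = FPGrowth.train(baskets, minSupport = 0.5)
    model.frequentPattern.foreach { case (pattern, count) =>
      println(s"$pattern: $count")
    }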
mllib/src/test/scala/org/apache/spark/mllib/fpm/FPGrowthSuite.scala (72 additions, 0 deletions)
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.mllib.fpm

import org.scalatest.FunSuite
import org.apache.spark.mllib.util.MLlibTestSparkContext

class FPGrowthSuite extends FunSuite with MLlibTestSparkContext {

test("test FPGrowth algorithm")
{
val arr = FPGrowthSuite.createTestData()

assert(arr.length === 6)
val dataSet = sc.parallelize(arr)
assert(dataSet.count() == 6)
val rdd = dataSet.map(line => line.split(" "))
assert(rdd.count() == 6)

val algorithm = new FPGrowth()
algorithm.setMinSupport(0.9)
assert(algorithm.run(rdd).frequentPattern.length == 0)
algorithm.setMinSupport(0.8)
assert(algorithm.run(rdd).frequentPattern.length == 1)
algorithm.setMinSupport(0.7)
assert(algorithm.run(rdd).frequentPattern.length == 1)
algorithm.setMinSupport(0.6)
assert(algorithm.run(rdd).frequentPattern.length == 2)
algorithm.setMinSupport(0.5)
assert(algorithm.run(rdd).frequentPattern.length == 18)
algorithm.setMinSupport(0.4)
assert(algorithm.run(rdd).frequentPattern.length == 18)
algorithm.setMinSupport(0.3)
assert(algorithm.run(rdd).frequentPattern.length == 54)
algorithm.setMinSupport(0.2)
assert(algorithm.run(rdd).frequentPattern.length == 54)
algorithm.setMinSupport(0.1)
assert(algorithm.run(rdd).frequentPattern.length == 625)
}
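
  A sketch of the FP-tree unit test a reviewer asks for above. It assumes runFPTree is hoisted into object FPGrowth with private[fpm] visibility (hypothetical; in the current patch it is private to the class). The expected patterns follow the patch's own space-separated string keys:

    test("runFPTree generates conditional patterns")
    {
      // conditional transactions for item "t", as createFPTree would emit them
      val partition = ("t", Iterable(Array("z", "y"), Array("z", "y", "x")))
      // hypothetical accessor; requires moving runFPTree into object FPGrowth
      val patterns = FPGrowth.runFPTree(partition, minCount = 2.0)
      assert(patterns.toSet === Set(("z t", 2), ("y t", 2), ("z y t", 2)))
    }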
}

object FPGrowthSuite
{
/**
* Create test data set
*/
def createTestData():Array[String] =
{
val arr = Array[String](
"r z h k p",
"z y x w v u t s",
"s x o n r",
"x z y m t s q e",
"z",
"x z y r q t p")
arr
}
}