
Commit

modify per scalastyle check
jackylk committed Oct 19, 2014
1 parent da2cba7 · commit 889b33f
Showing 1 changed file with 17 additions and 15 deletions.
32 changes: 17 additions & 15 deletions mllib/src/main/scala/org/apache/spark/mllib/fim/Apriori.scala
@@ -22,20 +22,21 @@ import org.apache.spark.SparkContext._
 import org.apache.spark.{Logging, SparkContext}
 
 /**
- * This object implements Apriori algorithm using Spark to find frequent item set in the given data set.
+ * This object implements Apriori algorithm using Spark to find frequent item set in
+ * the given data set.
  */
 object Apriori extends Logging with Serializable {
 
   /**
-   * Generate the first round FIS(frequent item set) from input data set. Returns single distinct item that
-   * appear greater than minCount times.
-   *
+   * Generate the first round FIS(frequent item set) from input data set. Returns single
+   * distinct item that appear greater than minCount times.
+   *
    * @param dataSet input data set
    * @param minCount the minimum appearance time that computed from minimum degree of support
    * @return FIS
   */
   private def genFirstRoundFIS(dataSet: RDD[Set[String]],
-    minCount: Double): RDD[(Set[String], Int)] = {
+                               minCount: Double): RDD[(Set[String], Int)] = {
     dataSet.flatMap(line => line)
       .map(v => (v, 1))
       .reduceByKey(_ + _)
@@ -50,16 +51,16 @@ object Apriori extends Logging with Serializable {
    * @return FIS
    */
   private def scanAndFilter(dataSet: RDD[Set[String]],
-    candidate: RDD[Set[String]],
-    minCount: Double,
-    sc: SparkContext): RDD[(Set[String], Int)] = {
+                            candidate: RDD[Set[String]],
+                            minCount: Double,
+                            sc: SparkContext): RDD[(Set[String], Int)] = {
 
     dataSet.cartesian(candidate).map(x =>
       if (x._2.subsetOf(x._1)) {
         (x._2, 1)
       } else {
         (x._2, 0)
-      }).reduceByKey(_+_).filter(x => x._2 >= minCount)
+      }).reduceByKey(_ + _).filter(x => x._2 >= minCount)
   }
 
   /**
@@ -69,7 +70,7 @@
    * @return candidate FIS
    */
   private def generateCombination(FISk: RDD[Set[String]],
-    k: Int): RDD[Set[String]] = {
+                                  k: Int): RDD[Set[String]] = {
     FISk.cartesian(FISk)
       .map(x => x._1 ++ x._2)
       .filter(x => x.size == k)
@@ -85,15 +86,16 @@
    * @return frequent item sets in a array
    */
   def apriori(input: RDD[Array[String]],
-    minSupport: Double,
-    sc: SparkContext): Array[(Set[String], Int)] = {
+              minSupport: Double,
+              sc: SparkContext): Array[(Set[String], Int)] = {
 
     /*
      * This apriori implementation uses cartesian of two RDD, input data set and candidate
      * FIS (frequent item set).
      * The resulting FIS are computed in two steps:
      * The first step, find eligible distinct item in data set.
-     * The second step, loop in k round, in each round generate candidate FIS and filter out eligible FIS
+     * The second step, loop in k round, in each round generate candidate FIS and filter
+     * out eligible FIS
      */
 
     // calculate minimum appearance count for minimum degree of support
@@ -144,9 +146,9 @@
     FIS.collect().foreach(x => print("(" + x._1 + ", " + x._2 + ") "))
     println()
   }
 
   private def printCk(Ck: RDD[Set[String]], k: Int) {
-    print("C" + (k - 2) + " size "+ Ck.count() + " value: ")
+    print("C" + (k - 2) + " size " + Ck.count() + " value: ")
     Ck.collect().foreach(print)
     println()
   }
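
For orientation, below is a minimal usage sketch of the apriori entry point edited above. Only the signature — apriori(input: RDD[Array[String]], minSupport: Double, sc: SparkContext): Array[(Set[String], Int)] — is taken from the diff; the sample transactions, the 0.5 support threshold, and the local SparkContext setup are illustrative assumptions, not part of this commit.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.fim.Apriori

object AprioriExample {
  def main(args: Array[String]): Unit = {
    // Local two-thread context, purely for this sketch; any SparkContext works.
    val sc = new SparkContext(
      new SparkConf().setAppName("AprioriExample").setMaster("local[2]"))

    // One transaction per element, each an array of item identifiers.
    val transactions = sc.parallelize(Seq(
      Array("a", "b", "c"),
      Array("a", "b"),
      Array("b", "c"),
      Array("a", "c")))

    // With minSupport = 0.5, an item set must occur in at least half of the
    // four transactions (presumably minCount = 0.5 * 4 = 2) to be kept.
    val frequentItemSets: Array[(Set[String], Int)] =
      Apriori.apriori(transactions, 0.5, sc)

    frequentItemSets.foreach { case (itemSet, count) =>
      println(s"$itemSet appears in $count transactions")
    }

    sc.stop()
  }
}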
