diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fim/Apriori.scala b/mllib/src/main/scala/org/apache/spark/mllib/fim/Apriori.scala
index e956f6338ae63..c9d73b8c59604 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fim/Apriori.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fim/Apriori.scala
@@ -22,20 +22,21 @@ import org.apache.spark.SparkContext._
 import org.apache.spark.{Logging, SparkContext}
 
 /**
- * This object implements Apriori algorithm using Spark to find frequent item set in the given data set.
+ * This object implements Apriori algorithm using Spark to find frequent item set in
+ * the given data set.
  */
 object Apriori extends Logging with Serializable {
 
   /**
-   * Generate the first round FIS(frequent item set) from input data set. Returns single distinct item that
-   * appear greater than minCount times.
-   * 
+   * Generate the first round FIS(frequent item set) from input data set. Returns single
+   * distinct item that appear greater than minCount times.
+   *
    * @param dataSet input data set
    * @param minCount the minimum appearance time that computed from minimum degree of support
    * @return FIS
    */
   private def genFirstRoundFIS(dataSet: RDD[Set[String]],
-                               minCount: Double): RDD[(Set[String], Int)] = {
+      minCount: Double): RDD[(Set[String], Int)] = {
     dataSet.flatMap(line => line)
       .map(v => (v, 1))
       .reduceByKey(_ + _)
@@ -50,16 +51,16 @@ object Apriori extends Logging with Serializable {
    * @return FIS
    */
   private def scanAndFilter(dataSet: RDD[Set[String]],
-                            candidate: RDD[Set[String]],
-                            minCount: Double,
-                            sc: SparkContext): RDD[(Set[String], Int)] = {
+      candidate: RDD[Set[String]],
+      minCount: Double,
+      sc: SparkContext): RDD[(Set[String], Int)] = {
 
     dataSet.cartesian(candidate).map(x =>
       if (x._2.subsetOf(x._1)) {
        (x._2, 1)
       } else {
        (x._2, 0)
-      }).reduceByKey(_+_).filter(x => x._2 >= minCount)
+      }).reduceByKey(_ + _).filter(x => x._2 >= minCount)
   }
 
   /**
@@ -69,7 +70,7 @@ object Apriori extends Logging with Serializable {
    * @return candidate FIS
    */
   private def generateCombination(FISk: RDD[Set[String]],
-                                  k: Int): RDD[Set[String]] = {
+      k: Int): RDD[Set[String]] = {
     FISk.cartesian(FISk)
       .map(x => x._1 ++ x._2)
       .filter(x => x.size == k)
@@ -85,15 +86,16 @@ object Apriori extends Logging with Serializable {
    * @return frequent item sets in a array
    */
   def apriori(input: RDD[Array[String]],
-              minSupport: Double,
-              sc: SparkContext): Array[(Set[String], Int)] = {
+      minSupport: Double,
+      sc: SparkContext): Array[(Set[String], Int)] = {
 
     /*
      * This apriori implementation uses cartesian of two RDD, input data set and candidate
      * FIS (frequent item set).
      * The resulting FIS are computed in two steps:
      * The first step, find eligible distinct item in data set.
-     * The second step, loop in k round, in each round generate candidate FIS and filter out eligible FIS
+     * The second step, loop in k round, in each round generate candidate FIS and filter
+     * out eligible FIS
      */
 
     // calculate minimum appearance count for minimum degree of support
@@ -144,9 +146,9 @@ object Apriori extends Logging with Serializable {
     FIS.collect().foreach(x => print("(" + x._1 + ", " + x._2 + ") "))
     println()
   }
-  
+
   private def printCk(Ck: RDD[Set[String]], k: Int) {
-    print("C" + (k - 2) + " size "+ Ck.count() + " value: ")
+    print("C" + (k - 2) + " size " + Ck.count() + " value: ")
     Ck.collect().foreach(print)
     println()
   }
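For context, below is a minimal usage sketch of the `apriori` entry point whose signature is visible in the patch (`apriori(input: RDD[Array[String]], minSupport: Double, sc: SparkContext): Array[(Set[String], Int)]`). It is not part of the change itself: the driver object, application name, sample transactions, and support threshold are made up for illustration, and it assumes the patched `org.apache.spark.mllib.fim.Apriori` object is on the classpath.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.fim.Apriori

// Hypothetical driver showing how the Apriori object from the patch might be called.
object AprioriExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("AprioriExample").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // Each input record is one transaction, given as an array of item identifiers.
    val transactions = sc.parallelize(Seq(
      Array("a", "b", "c"),
      Array("a", "b"),
      Array("b", "c"),
      Array("a", "c", "d")))

    // Mine frequent item sets at a minimum support of 0.5; per the patch's comment,
    // a minimum appearance count is derived from this degree of support.
    val frequentItemSets: Array[(Set[String], Int)] = Apriori.apriori(transactions, 0.5, sc)
    frequentItemSets.foreach { case (items, count) => println(s"$items -> $count") }

    sc.stop()
  }
}
```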