Commit

fix scalastyle check
jackylk committed Nov 26, 2014
1 parent f68a0bd commit 7b77ad7
Showing 3 changed files with 32 additions and 46 deletions.
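
The changes in this commit are purely stylistic: inline comments gain a space after //, explicit return statements are dropped in favour of the method's last expression, and fully qualified collection.mutable types are replaced by imports. A minimal sketch of the before/after pattern (the countItems helper is a made-up example, not code from this commit):

import scala.collection.mutable.ArrayBuffer

object StyleExample {
  // Before: "//count items" and an explicit `return buf.toArray`.
  // After: a space follows "//" and the final expression is returned implicitly.
  def countItems(items: Seq[String]): Array[(String, Int)] = {
    // accumulate (item, count) pairs
    val buf = ArrayBuffer[(String, Int)]()
    for ((item, group) <- items.groupBy(identity)) {
      buf += ((item, group.size))
    }
    buf.toArray
  }
}
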
@@ -36,13 +36,13 @@ object AprioriByBroadcast extends Logging with Serializable {
* @return single item in data Set
*/
def createC1(dataSet: RDD[Array[String]]): Array[Array[String]] = {
//get all distinct item in the RDD
// get all distinct item in the RDD
val itemCollection = dataSet.flatMap(line => line).distinct().collect()

//define new array which item is an array form
// define new array which item is an array form
val itemArrCollection = collection.mutable.ArrayBuffer[Array[String]]()

//change the itemsCollection into itemArrCollection
// change the itemsCollection into itemArrCollection
for (item <- itemCollection) {
itemArrCollection += Array[String](item)
}
@@ -62,10 +62,8 @@ object AprioriByBroadcast extends Logging with Serializable {
Ck: Array[Array[String]],
minCount: Double,
sc: SparkContext): Array[(Array[String], Int)] = {
//broadcast Ck
// broadcast Ck
val broadcastCk = sc.broadcast(Ck)
//val broadcastCkList: Array[Array[String]] = broadcastCk.value

val Lk = dataSet.flatMap(line => containCk(line, broadcastCk))
.filter(_.length > 0)
.map(v => (v, 1))
@@ -122,7 +120,7 @@
val LkLen = Lk.length
val CkBuffer = collection.mutable.ArrayBuffer[Array[String]]()

//get Ck from Lk
// get Ck from Lk
for (i <- 0 to LkLen - 1)
for (j <- i + 1 to LkLen - 1) {
// get Lk:k-2 before k-2 item
@@ -183,16 +181,16 @@
minSupport: Double,
sc: SparkContext): Array[(Set[String], Int)] = {

//dataSet length
// dataSet length
val dataSetLen: Long = dataSet.count()
//the count line for minSupport
// the count line for minSupport
val minCount = minSupport * dataSetLen

//definite L collection that using save all of frequent item set
// definite L collection that using save all of frequent item set
val L = collection.mutable.ArrayBuffer[Array[(Array[String], Int)]]()
val FIS = collection.mutable.ArrayBuffer[(Set[String], Int)]()

//call aprioriStepOne method to get L1
// call aprioriStepOne method to get L1
val L1: Array[(Array[String], Int)] = aprioriStepOne(dataSet, minCount)
logDebug("L1 length:" + L1.length)
logDebug("L1:" + L1)
Expand All @@ -210,11 +208,11 @@ object AprioriByBroadcast extends Logging with Serializable {
// do the loop while the k > 0 and L length > 1
while ((k > 0) && L(k - 2).length > 1) {

//call createCk method to get Ck
// call createCk method to get Ck
val Ck: Array[Array[String]] = aprioriGen(L(k - 2), k)

if (Ck != null) {
//call createLk method to get Lk
// call createLk method to get Lk
val Lk: Array[(Array[String], Int)] =
scanD(
dataSet,
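
For context, the scanD method above follows a common broadcast-and-count pattern: the candidate itemsets Ck are broadcast, each transaction emits the candidates it contains, and counts below minCount are dropped. A minimal standalone sketch of that pattern, assuming a plain forall/contains containment test in place of the containCk helper, whose body is not part of this diff:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

object ScanSketch {
  // Count each broadcast candidate against the transactions and keep those
  // whose support count reaches minCount.
  def scan(
      dataSet: RDD[Array[String]],
      candidates: Array[Array[String]],
      minCount: Double,
      sc: SparkContext): Array[(Array[String], Int)] = {
    val broadcastCk = sc.broadcast(candidates)
    dataSet
      .flatMap(line => broadcastCk.value.filter(c => c.forall(item => line.contains(item))))
      .map(c => (c.toSet, 1))              // key by Set so equal itemsets merge
      .reduceByKey(_ + _)
      .filter(_._2 >= minCount)
      .map { case (itemset, count) => (itemset.toArray, count) }
      .collect()
  }
}
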
@@ -91,14 +91,14 @@ object AprioriByCartesian extends Logging with Serializable {
minSupport: Double,
sc: SparkContext): Array[(Set[String], Int)] = {

//dataSet length
// dataSet length
val dataSetLen: Long = input.count()
//the count line for minSupport
// the count line for minSupport
val minCount = minSupport * dataSetLen
// This algorithm finds frequent item set, so convert each element of RDD to set
val dataSet = input.map(_.toSet)

//definite L collection that using save all of frequent item set
// definite L collection that using save all of frequent item set
val L = collection.mutable.ArrayBuffer[RDD[(Set[String], Int)]]()

val L1: RDD[(Set[String], Int)] = aprioriStepOne(dataSet, minCount)
@@ -121,7 +121,7 @@ object AprioriByCartesian extends Logging with Serializable {
k = k + 1
L += Lk
}
//return all result in L
// return all result in L
val retArr = collection.mutable.ArrayBuffer[(Set[String], Int)]()
for (l <- L) {
retArr.appendAll(l.collect())
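
The aprioriStepOne call above is the first database scan of AprioriByCartesian: every single item is counted and only those reaching minCount survive as 1-item frequent sets. Its body is not shown in this diff, so the following is an assumed sketch matching the call site's signature:

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

object AprioriStepOneSketch {
  // Count every individual item and keep those whose support count reaches
  // minCount, wrapping each item in a Set so later k-item steps are uniform.
  def aprioriStepOne(
      dataSet: RDD[Set[String]],
      minCount: Double): RDD[(Set[String], Int)] = {
    dataSet
      .flatMap(transaction => transaction)
      .map(item => (Set(item), 1))
      .reduceByKey(_ + _)
      .filter(_._2 >= minCount)
  }
}
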
46 changes: 17 additions & 29 deletions mllib/src/main/scala/org/apache/spark/mllib/fim/FPGrowth.scala
@@ -17,10 +17,12 @@

package org.apache.spark.mllib.fim

import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.broadcast._
import org.apache.spark.rdd.RDD
import org.apache.spark.{Logging, SparkContext}

import scala.collection.mutable.{ArrayBuffer, Map}

/**
* calculate frequent item set using FPGrowth algorithm with dada set and minSupport
@@ -47,18 +49,15 @@ class FPGrowth extends Logging with Serializable {
val minCount = minSuport * count
logDebug("minSuppot count:" + minSuport)

//one times scan data db to get L1
// one times scan data db to get L1
val L1 = FPGStepOne(RDD, minCount)
logDebug("L1 length:" + L1.length)
logDebug("L1:" + L1)

//two times scan data db to get Ln
// two times scan data db to get Ln
val Ln = FPGStepTwo(sc, RDD, minCount, L1)
//add L1 and Ln to get fim
val fim = L1 ++ Ln

return fim

// add L1 and Ln to get fim, and return it
L1 ++ Ln
}

/**
@@ -90,10 +89,8 @@ class FPGrowth extends Logging with Serializable {
RDD: RDD[Array[String]],
minCount: Double,
L1: Array[(String, Int)]): Array[(String, Int)] = {
//broadcast L1
// broadcast L1
val bdL1 = sc.broadcast(L1)
//val bdL1List = bdL1.value

RDD.flatMap(line => L12LineMap(line, bdL1))
.groupByKey()
.flatMap(line => FPTree(line, minCount))
@@ -117,16 +114,13 @@
// broadcast value
val bdL1List = bdL1.value
// the result variable
var lineArrayBuffer = collection.mutable.ArrayBuffer[(String, Int)]()
var lineArrayBuffer = ArrayBuffer[(String, Int)]()

for (item <- line) {

val opt = bdL1List.find(_._1.equals(item))

if (opt != None) {
lineArrayBuffer ++= opt
}

}

// sort array
@@ -135,32 +129,28 @@
.sortWith(_._2 > _._2)
.toArray


var arrArrayBuffer = collection.mutable.ArrayBuffer[(String, Array[String])]()
var arrArrayBuffer = ArrayBuffer[(String, Array[String])]()

/**
* give (a,4) (b 3),(c,3),after
* b,((a,4)
* c,((a,4) (b 3))
*/
var arrBuffer = collection.mutable.ArrayBuffer[String]()
var arrBuffer = ArrayBuffer[String]()
for (item <- lineArray) {
val arr = lineArray.take(lineArray.indexOf(item))

arrBuffer.clear()

if (arr.length > 0) {
for (tempArr <- arr) {
//remain key
arrBuffer += tempArr._1
}
arrArrayBuffer += ((item._1, arrBuffer.toArray))
}

}

return arrArrayBuffer.toArray

arrArrayBuffer.toArray
}

/**
@@ -175,8 +165,8 @@
// the set of construction CPFTree
val value = line._2

val _lineBuffer = collection.mutable.ArrayBuffer[(String, Int)]()
val map = scala.collection.mutable.Map[String, Int]()
val resultBuffer = ArrayBuffer[(String, Int)]()
val map = Map[String, Int]()
// tree step
var k = 1
// loop the data set while k>0
@@ -215,7 +205,7 @@

if (lineTemp.size != 0) {
line = lineTemp.toArray.array
_lineBuffer ++= line
resultBuffer ++= line
}

}
@@ -229,9 +219,7 @@
}

}

return _lineBuffer.toArray

resultBuffer.toArray
}

}
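
Taken together, the FPGrowth driver above works in two passes: FPGStepOne scans the data once to find the frequent single items (L1), then FPGStepTwo broadcasts L1, rewrites each transaction into per-item conditional pattern bases via L12LineMap, and mines each group with FPTree. FPGStepOne's body does not appear in this diff; the following is an assumed sketch of what such a first pass typically looks like, ordered by descending frequency as FP-Growth expects:

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

object FPGStepOneSketch {
  // First scan: count every item, keep those reaching minCount, and return
  // them sorted by descending support. This is an assumed implementation,
  // not the one in FPGrowth.scala.
  def frequentSingletons(
      transactions: RDD[Array[String]],
      minCount: Double): Array[(String, Int)] = {
    transactions
      .flatMap(line => line)
      .map(item => (item, 1))
      .reduceByKey(_ + _)
      .filter(_._2 >= minCount)
      .collect()
      .sortWith(_._2 > _._2)
  }
}
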
