diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fim/AprioriByBroadcast.scala b/mllib/src/main/scala/org/apache/spark/mllib/fim/AprioriByBroadcast.scala
deleted file mode 100644
index ca67c6069d0f3..0000000000000
--- a/mllib/src/main/scala/org/apache/spark/mllib/fim/AprioriByBroadcast.scala
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.mllib.fim
-
-import org.apache.spark.SparkContext._
-import org.apache.spark.broadcast._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.{Logging, SparkContext}
-
-/**
- * Calculate frequent item set using Apriori algorithm with minSupport.
- * The apriori algorithm have two steps:
- * step one is scan the data set to get L1 by minSuppprt
- * step two is scan the data set multiple to get Lk
- */
-object AprioriByBroadcast extends Logging with Serializable {
-
-  /**
-   * Create C1 which contains all of single item in data Set.
-   *
-   * @param dataSet For mining frequent itemsets dataset
-   * @return single item in data Set
-   */
-  def createC1(dataSet: RDD[Array[String]]): Array[Array[String]] = {
-    // get all distinct item in the RDD
-    val itemCollection = dataSet.flatMap(line => line).distinct().collect()
-
-    // define new array which item is an array form
-    val itemArrCollection = collection.mutable.ArrayBuffer[Array[String]]()
-
-    // change the itemsCollection into itemArrCollection
-    for (item <- itemCollection) {
-      itemArrCollection += Array[String](item)
-    }
-
-    itemArrCollection.toArray
-  }
-
-
-  /**
-   * create Lk from Ck.Lk is generated by Ck when the frequent of Ck bigger than minCount
-   * @param dataSet For mining frequent itemsets dataset
-   * @param Ck Candidate set
-   * @param minCount The minimum degree of support
-   * @return Lk
-   */
-  def scanD(dataSet: RDD[Array[String]],
-            Ck: Array[Array[String]],
-            minCount: Double,
-            sc: SparkContext): Array[(Array[String], Int)] = {
-    // broadcast Ck
-    val broadcastCk = sc.broadcast(Ck)
-    val Lk = dataSet.flatMap(line => containCk(line, broadcastCk))
-      .filter(_.length > 0)
-      .map(v => (v, 1))
-      .reduceByKey(_ + _)
-      .filter(_._2 >= (minCount))
-      .map(v => (v._1.split(" "), v._2))
-      .collect()
-
-    Lk
-  }
-
-
-  /**
-   * containCk method.
-   * @param line dataset line
-   * @param broadcastCk L1
-   * @return get Ck array
-   */
-  def containCk(line: Array[String],
-                broadcastCk: Broadcast[Array[Array[String]]]): Array[String] = {
-
-    // Ck broadcast value
-    val broadcastCkList: Array[Array[String]] = broadcastCk.value
-    //
-    var dbLineArrayBuffer = collection.mutable.ArrayBuffer[String]()
-    // the count number
-    var k: Int = 0
-
-    for (broadcastCk <- broadcastCkList) {
-      val bdArray: Array[String] = broadcastCk.sortWith((s, t) => s.compareTo(t) < 0).array
-
-      if (bdArray.toSet subsetOf (line.toSet)) {
-        val bdString: String = bdArray.mkString(" ")
-        dbLineArrayBuffer ++= Array[(String)](bdString)
-        k = k + 1
-      }
-    }
-
-    if (k == 0) {
-      dbLineArrayBuffer ++= Array[(String)]("")
-    }
-    dbLineArrayBuffer.toArray.array
-  }
-
-  /**
-   * create Ck by Lk
-   * @param Lk
-   * @param k
-   * @return Ck
-   */
-  def aprioriGen(Lk: Array[(Array[String], Int)],
-                 k: Int): Array[Array[String]] = {
-
-    val LkLen = Lk.length
-    val CkBuffer = collection.mutable.ArrayBuffer[Array[String]]()
-
-    // get Ck from Lk
-    for (i <- 0 to LkLen - 1)
-      for (j <- i + 1 to LkLen - 1) {
-        // get Lk:k-2 before k-2 item
-        val L1: Array[String] =
-          Lk(i)._1.take(k - 2).sortWith((s, t) => s.compareTo(t) < 0)
-        val L2: Array[String] =
-          Lk(j)._1.take(k - 2).sortWith((s, t) => s.compareTo(t) < 0)
-
-        // merge set while the two set L1 and L2 equals
-        if (L1.mkString.equals(L2.mkString)) {
-          CkBuffer.append((Lk(i)._1.toSet ++ Lk(j)._1.toSet).toArray)
-        }
-      }
-
-    if (CkBuffer.length > 0) {
-      CkBuffer.toArray.array
-    } else {
-      null
-    }
-  }
-
-  /**
-   * create L1
-   * @param dataSet For mining frequent item sets dataset
-   * @param minCount The minimum degree of support
-   * @return L1
-   */
-  def aprioriStepOne(dataSet: RDD[Array[String]],
-                     minCount: Double): Array[(Array[String], Int)] = {
-    dataSet.flatMap(line => line)
-      .map(v => (v, 1))
-      .reduceByKey(_ + _)
-      .filter(_._2 >= minCount)
-      .map(v => line2Array(v))
-      .collect()
-  }
-
-  /**
-   * change line type
-   * @param line line type (String,Int)
-   * @return line tpye (Array[String],Int)
-   */
-  def line2Array(line: (String, Int)): (Array[String], Int) = {
-    (Array[String](line._1), line._2)
-  }
-
-  /**
-   * apriori algorithm.
-   * Solving frequent item sets based on the data set and the minimum degree of support.
-   * The first phase, the scan time data sets, computing frequent item sets L1.
-   * The second stage, multiple scan data sets, computing frequent item sets Lk.
-   * @param dataSet For mining frequent item sets dataset
-   * @param minSupport The minimum degree of support
-   * @param sc
-   * @return frequent item sets
-   */
-  def apriori(dataSet: RDD[Array[String]],
-              minSupport: Double,
-              sc: SparkContext): Array[(Set[String], Int)] = {
-
-    // dataSet length
-    val dataSetLen: Long = dataSet.count()
-    // the count line for minSupport
-    val minCount = minSupport * dataSetLen
-
-    // definite L collection that using save all of frequent item set
-    val L = collection.mutable.ArrayBuffer[Array[(Array[String], Int)]]()
-    val FIS = collection.mutable.ArrayBuffer[(Set[String], Int)]()
-
-    // call aprioriStepOne method to get L1
-    val L1: Array[(Array[String], Int)] = aprioriStepOne(dataSet, minCount)
-    logDebug("L1 length:" + L1.length)
-    logDebug("L1:" + L1)
-
-    // L1 assignment to L
-    if (L1.length > 0) {
-      L += L1
-
-      for (arr <- L1) {
-        FIS.append((arr._1.toSet, arr._2))
-      }
-
-      // step counter
-      var k: Int = 2
-      // do the loop while the k > 0 and L length > 1
-      while ((k > 0) && L(k - 2).length > 1) {
-
-        // call createCk method to get Ck
-        val Ck: Array[Array[String]] = aprioriGen(L(k - 2), k)
-
-        if (Ck != null) {
-          // call createLk method to get Lk
-          val Lk: Array[(Array[String], Int)] =
-            scanD(
-              dataSet,
-              Ck,
-              minCount,
-              sc)
-          // Lk assignment to L
-          L += Lk
-
-          for (arr <- Lk) {
-            FIS.append((arr._1.toSet, arr._2))
-          }
-
-          k = k + 1
-        }
-        else {
-          k = -1
-        }
-      }
-      FIS.toArray
-    }
-    else {
-      Array[(Set[String], Int)]()
-    }
-  }
-
-}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fim/AprioriByCartesian.scala b/mllib/src/main/scala/org/apache/spark/mllib/fim/AprioriByCartesian.scala
deleted file mode 100644
index 3e90f9f9dac2a..0000000000000
--- a/mllib/src/main/scala/org/apache/spark/mllib/fim/AprioriByCartesian.scala
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.fim
-
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.{Logging, SparkContext}
-
-/**
- * Calculate frequent item set using Apriori algorithm with minSupport.
- * The apriori algorithm have two steps:
- * step one is scan the data set to get L1 by minSuppprt
- * step two is scan the data set multiple to get Lk
- */
-object AprioriByCartesian extends Logging with Serializable {
-
-  /**
-   * create L1
-   * @param dataSet For mining frequent item sets dataset
-   * @param minCount The minimum degree of support
-   * @return L1
-   */
-  def aprioriStepOne(dataSet: RDD[Set[String]],
-                     minCount: Double): RDD[(Set[String], Int)] = {
-    dataSet.flatMap(line => line)
-      .map(v => (v, 1))
-      .reduceByKey(_ + _)
-      .filter(_._2 >= minCount)
-      .map(x => (Set(x._1), x._2))
-  }
-
-  /**
-   * create Lk from Ck.Lk is generated by Ck when the frequent of Ck bigger than minSupport
-   * @param Ck Candidate set
-   * @param minCount The minimum degree of support
-   * @return Lk
-   */
-  def scanD(dataSet: RDD[Set[String]],
-            Ck: RDD[Set[String]],
-            minCount: Double,
-            sc: SparkContext): RDD[(Set[String], Int)] = {
-
-    dataSet.cartesian(Ck).map(x =>
-      if (x._2.subsetOf(x._1)) {
-        (x._2, 1)
-      } else {
-        (x._2, 0)
-      }).reduceByKey(_ + _).filter(x => x._2 >= minCount)
-  }
-
-  /**
-   * create Ck by Lk
-   * @param Lk
-   * @param k
-   * @return Ck
-   */
-  def aprioriGen(Lk: RDD[Set[String]],
-                 k: Int): RDD[Set[String]] = {
-    Lk.cartesian(Lk)
-      .map(x => x._1 ++ x._2)
-      .filter(x => x.size == k)
-      .distinct()
-  }
-
-  /**
-   * apriori algorithm using cartesian of two RDD.
-   * Solving frequent item sets based on the data set and the minimum degree of support.
-   * The first phase, the scan time data sets, computing frequent item sets L1.
-   * The second stage, multiple scan data sets, computing frequent item sets Lk.
-   * @param input For mining frequent item sets dataset
-   * @param minSupport The minimum degree of support
-   * @param sc
-   * @return frequent item sets
-   */
-  def apriori(input: RDD[Array[String]],
-              minSupport: Double,
-              sc: SparkContext): Array[(Set[String], Int)] = {
-
-    // dataSet length
-    val dataSetLen: Long = input.count()
-    // the count line for minSupport
-    val minCount = minSupport * dataSetLen
-    // This algorithm finds frequent item set, so convert each element of RDD to set
-    val dataSet = input.map(_.toSet)
-
-    // definite L collection that using save all of frequent item set
-    val L = collection.mutable.ArrayBuffer[RDD[(Set[String], Int)]]()
-
-    val L1: RDD[(Set[String], Int)] = aprioriStepOne(dataSet, minCount)
-    if (L1.count() > 0) {
-      L += L1
-      var Lk = L1
-
-      // step counter
-      var k = 2
-
-      while (L(k - 2).count() > 1) {
-
-        // get candidate of frequent item set
-        val Ck: RDD[Set[String]] = aprioriGen(L(k - 2).map(x => x._1), k)
-
-        // scan input data set to calculate degree of support for each candidate,
-        // and filter out the one not ineligible
-        Lk = scanD(dataSet, Ck, minCount, sc)
-
-        k = k + 1
-        L += Lk
-      }
-      // return all result in L
-      val retArr = collection.mutable.ArrayBuffer[(Set[String], Int)]()
-      for (l <- L) {
-        retArr.appendAll(l.collect())
-      }
-      retArr.toArray
-    } else {
-      Array[(Set[String], Int)]()
-    }
-  }
-
-  def printLk(Lk: RDD[(Set[String], Int)], k: Int) {
-    print("L" + (k - 2) + " size " + Lk.count() + " value: ")
-    Lk.collect().foreach(x => print("(" + x._1 + ", " + x._2 + ") "))
-    println()
-  }
-
-  def printCk(Ck: RDD[Set[String]], k: Int) {
-    print("C" + (k - 2) + " size " + Ck.count() + " value: ")
-    Ck.collect().foreach(print)
-    println()
-  }
-}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fim/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/mllib/fim/FPGrowth.scala
deleted file mode 100644
index 025274009503f..0000000000000
--- a/mllib/src/main/scala/org/apache/spark/mllib/fim/FPGrowth.scala
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.fim
-
-import org.apache.spark.SparkContext._
-import org.apache.spark.broadcast._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.{Logging, SparkContext}
-
-import scala.collection.mutable.{ArrayBuffer, Map}
-
-/**
- * calculate frequent item set using FPGrowth algorithm with dada set and minSupport
- * the FPGrowth algorithm have two step task
- * step one is scaning data db to get L1 by minSuppprt
- * step two is scan data db once to get Lk
- */
-class FPGrowth extends Logging with Serializable {
-
-  /**
-   * FPGrowth algorithm:
-   * step 1:calculate L1 by minSupport
-   * step 2: calculate Ln by FP-Tree
-   * @param sc sparkContext
-   * @param RDD For mining frequent item sets dataset
-   * @param minSuport The minimum degree of support
-   * @return frequent item sets
-   */
-  def FPGrowth(RDD: RDD[Array[String]],
-               minSuport: Double,
-               sc: SparkContext): Array[(String, Int)] = {
-    val count = RDD.count()
-    logDebug("data set count:" + count)
-    val minCount = minSuport * count
-    logDebug("minSuppot count:" + minSuport)
-
-    // one times scan data db to get L1
-    val L1 = FPGStepOne(RDD, minCount)
-    logDebug("L1 length:" + L1.length)
-    logDebug("L1:" + L1)
-
-    // two times scan data db to get Ln
-    val Ln = FPGStepTwo(sc, RDD, minCount, L1)
-    // add L1 and Ln to get fim, and return it
-    L1 ++ Ln
-  }
-
-  /**
-   * Step 1: calculate L1 by min support
-   * @param RDD For mining frequent item sets dataset
-   * @param minCount The minimum degree of support
-   * @return L1
-   */
-  def FPGStepOne(RDD: RDD[Array[String]],
-                 minCount: Double): Array[(String, Int)] = {
-    RDD.flatMap(v => v)
-      .map(v => (v, 1))
-      .reduceByKey(_ + _)
-      .filter(_._2 >= minCount)
-      .collect()
-      .distinct
-      .sortWith(_._2 > _._2)
-  }
-
-  /**
-   * step 2: using PFP-Tree to calculate the fim
-   * @param sc sparkContext
-   * @param RDD For mining frequent item sets dataset
-   * @param minCount The minimum degree of support
-   * @param L1 frenauent item set as length 1
-   * @return Ln
-   */
-  def FPGStepTwo(sc: SparkContext,
-                 RDD: RDD[Array[String]],
-                 minCount: Double,
-                 L1: Array[(String, Int)]): Array[(String, Int)] = {
-    // broadcast L1
-    val bdL1 = sc.broadcast(L1)
-    RDD.flatMap(line => L12LineMap(line, bdL1))
-      .groupByKey()
-      .flatMap(line => FPTree(line, minCount))
-      .collect()
-
-  }
-
-  /**
-   * create CFP-Tree
-   * give L1,example:a 2,b 4,c 3 and give line,example: a,b,c,d
-   * after calculate,the result is:
-   * a,(b,c)
-   * c,(b)
-   * note,the result have not b,()
-   * @param line dataset line
-   * @param bdL1 L1
-   * @return CFP-Tree
-   */
-  def L12LineMap(line: Array[String],
-                 bdL1: Broadcast[Array[(String, Int)]]): Array[(String, Array[String])] = {
-    // broadcast value
-    val bdL1List = bdL1.value
-    // the result variable
-    var lineArrayBuffer = ArrayBuffer[(String, Int)]()
-
-    for (item <- line) {
-      val opt = bdL1List.find(_._1.equals(item))
-      if (opt != None) {
-        lineArrayBuffer ++= opt
-      }
-    }
-
-    // sort array
-    val lineArray = lineArrayBuffer
-      .sortWith(_._1 > _._1)
-      .sortWith(_._2 > _._2)
-      .toArray
-
-    var arrArrayBuffer = ArrayBuffer[(String, Array[String])]()
-
-    /**
-     * give (a,4) (b 3),(c,3),after
-     * b,((a,4)
-     * c,((a,4) (b 3))
-     */
-    var arrBuffer = ArrayBuffer[String]()
-    for (item <- lineArray) {
-      val arr = lineArray.take(lineArray.indexOf(item))
-
-      arrBuffer.clear()
-
-      if (arr.length > 0) {
-        for (tempArr <- arr) {
-          arrBuffer += tempArr._1
-        }
-        arrArrayBuffer += ((item._1, arrBuffer.toArray))
-      }
-
-    }
-    arrArrayBuffer.toArray
-  }
-
-  /**
-   * genarate fim set by FPTree,everyone node have a CPFTree that can combination frenquent item
-   * @param line dataset line
-   * @param minCount The minimum degree of support
-   * @return fim
-   */
-  def FPTree(line: (String, Iterable[Array[String]]), minCount: Double): Array[(String, Int)] = {
-    // frequently item
-    val key = line._1
-    // the set of construction CPFTree
-    val value = line._2
-
-    val resultBuffer = ArrayBuffer[(String, Int)]()
-    val map = Map[String, Int]()
-    // tree step
-    var k = 1
-    // loop the data set while k>0
-    while (k > 0) {
-      map.clear()
-
-      // loop data set
-      for (it <- value) {
-        if (it.length >= k) {
-          // from m get n combinations,using scala method
-          val lineCom = it.toList.combinations(k).toList
-
-          // add key to combination
-          for (item <- lineCom) {
-            // sort array
-            val list2key: List[String] = (item :+ key)
-              .sortWith(_ > _)
-
-            val s = list2key.mkString(" ")
-
-            if (map.get(s) == None) {
-              map(s) = 1
-            }
-            else {
-              map(s) = map.apply(s) + 1
-            }
-          }
-        }
-      }
-
-      var line: Array[(String, Int)] = null
-
-      if (map.size != 0) {
-        // get fim set
-        val lineTemp = map.filter(_._2 >= minCount)
-
-        if (lineTemp.size != 0) {
-          line = lineTemp.toArray.array
-          resultBuffer ++= line
-        }
-
-      }
-
-      // reset k value
-      if ((line == null) || (line.length == 0)) {
-        k = 0
-      }
-      else {
-        k = k + 1
-      }
-
-    }
-    resultBuffer.toArray
-  }
-
-}
-
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/fim/AprioriSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/fim/AprioriSuite.scala
deleted file mode 100644
index e08e6b6ffb41a..0000000000000
--- a/mllib/src/test/scala/org/apache/spark/mllib/fim/AprioriSuite.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.mllib.fim
-
-import org.apache.spark.SparkContext
-import org.apache.spark.mllib.util.LocalSparkContext
-import org.apache.spark.rdd.RDD
-import org.scalatest.FunSuite
-
-/**
- * scala test unit
- * using Practical Machine Learning Book data test the apriori algorithm result by minSupport from 0.9 to 0.1
- */
-class AprioriSuite extends FunSuite with LocalSparkContext {
-
-  test("test FIM with AprioriByBroadcast dataset 1")
-  {
-    val arr = AprioriSuite.createFIMDataSet1()
-    val target: (RDD[Array[String]], Double, SparkContext) => Array[(Set[String], Int)]= AprioriByBroadcast.apriori
-
-    val dataSet = sc.parallelize(arr)
-    val rdd = dataSet.map(line => line.split(" "))
-
-    for (i <- 1 to 9){
-      println(s"frequent item set with support ${i/10d}")
-      target(rdd, i/10d, sc).foreach(x => print("(" + x._1 + "), "))
-      println()
-    }
-  }
-
-  test("test FIM with AprioriByBroadcast dataset 2")
-  {
-    val arr = AprioriSuite.createFIMDataSet2()
-    val target: (RDD[Array[String]], Double, SparkContext) => Array[(Set[String], Int)]= AprioriByBroadcast.apriori
-
-    val dataSet = sc.parallelize(arr)
-    val rdd = dataSet.map(line => line.split(" "))
-
-    assert(target(rdd,0.9,sc).length == 0)
-
-    assert(target(rdd,0.8,sc).length == 1)
-
-    assert(target(rdd,0.7,sc).length == 1)
-
-    assert(target(rdd,0.6,sc).length == 2)
-
-    assert(target(rdd,0.5,sc).length == 18)
-
-    assert(target(rdd,0.4,sc).length == 18)
-
-    assert(target(rdd,0.3,sc).length == 54)
-
-    assert(target(rdd,0.2,sc).length == 54)
-
-    assert(target(rdd,0.1,sc).length == 625)
-
-  }
-
-  test("test FIM with AprioriByCartesian dataset 1")
-  {
-    val arr = AprioriSuite.createFIMDataSet1()
-    val target: (RDD[Array[String]], Double, SparkContext) => Array[(Set[String], Int)]= AprioriByCartesian.apriori
-
-    val dataSet = sc.parallelize(arr)
-    val rdd = dataSet.map(line => line.split(" "))
-
-    for (i <- 1 to 9){
-      println(s"frequent item set with support ${i/10d}")
-      target(rdd, i/10d, sc).foreach(x => print("(" + x._1 + "), "))
-      println()
-    }
-
-
-  }
-  test("test FIM with AprioriByCartesian dataset 2")
-  {
-    val arr = AprioriSuite.createFIMDataSet2()
-    val target: (RDD[Array[String]], Double, SparkContext) => Array[(Set[String], Int)]= AprioriByCartesian.apriori
-
-    val dataSet = sc.parallelize(arr)
-    val rdd = dataSet.map(line => line.split(" "))
-
-    assert(target(rdd,0.9,sc).length == 0)
-
-    assert(target(rdd,0.8,sc).length == 1)
-
-    assert(target(rdd,0.7,sc).length == 1)
-
-    assert(target(rdd,0.6,sc).length == 2)
-
-    assert(target(rdd,0.5,sc).length == 18)
-
-    assert(target(rdd,0.4,sc).length == 18)
-
-    assert(target(rdd,0.3,sc).length == 54)
-
-    assert(target(rdd,0.2,sc).length == 54)
-
-    assert(target(rdd,0.1,sc).length == 625)
-
-  }
-
-}
-
-/**
- * create dataset
- */
-object AprioriSuite
-{
-  /**
-   * create dataset using Practical Machine Learning Book data
-   * @return dataset
-   */
-  def createFIMDataSet1():Array[String] = {
-    val arr = Array[String](
-      "1 3 4",
-      "2 3 5",
-      "1 2 3 5",
-      "2 5")
-    arr
-  }
-
-  def createFIMDataSet2():Array[String] = {
-    val arr = Array[String](
-      "r z h j p",
-      "z y x w v u t s",
-      "z",
-      "r x n o s",
-      "y r x z q t p",
-      "y z x e q s t m")
-    arr
-  }
-}
-
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/fim/FPGrowthSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/fim/FPGrowthSuite.scala
deleted file mode 100644
index 5156babcd1975..0000000000000
--- a/mllib/src/test/scala/org/apache/spark/mllib/fim/FPGrowthSuite.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.mllib.fim
-
-import org.scalatest.FunSuite
-import org.apache.spark.mllib.util.LocalSparkContext
-
-/**
- * scala test unit
- * using Practical Machine Learning Book data test the FPGrowth algorithm result by minSupport from 0.9 to 0.1
- */
-class FPGrowthSuite extends FunSuite with LocalSparkContext {
-
-  test("test FIM with FPGrowth")
-  {
-    val arr = AprioriSuite.createFIMDataSet2()
-
-    assert(arr.length === 6)
-    val dataSet = sc.parallelize(arr)
-    assert(dataSet.count() == 6)
-    val rdd = dataSet.map(line => line.split(" "))
-    assert(rdd.count() == 6)
-
-    //check frenquent item set length
-    val fimWithFPGrowth = new FPGrowth()
-    println(fimWithFPGrowth.FPGrowth(rdd,0.1,sc).length)
-
-    assert(fimWithFPGrowth.FPGrowth(rdd,0.9,sc).length == 0)
-
-    assert(fimWithFPGrowth.FPGrowth(rdd,0.8,sc).length == 1)
-
-    assert(fimWithFPGrowth.FPGrowth(rdd,0.7,sc).length == 1)
-
-    assert(fimWithFPGrowth.FPGrowth(rdd,0.6,sc).length == 2)
-
-    assert(fimWithFPGrowth.FPGrowth(rdd,0.5,sc).length == 18)
-
-    assert(fimWithFPGrowth.FPGrowth(rdd,0.4,sc).length == 18)
-
-    assert(fimWithFPGrowth.FPGrowth(rdd,0.3,sc).length == 54)
-
-    assert(fimWithFPGrowth.FPGrowth(rdd,0.2,sc).length == 54)
-
-    assert(fimWithFPGrowth.FPGrowth(rdd,0.1,sc).length == 625)
-
-  }
-}
-
-/**
- * create dataset
- */
-object FPGrowthSuite
-{
-  /**
-   * create dataset using Practical Machine Learning Book data
-   * @return dataset
-   */
-  def createFIMDataSet():Array[String] =
-  {
-    val arr = Array[String]("r z h j p",
-      "z y x w v u t s",
-      "z",
-      "r x n o s",
-      "y r x z q t p",
-      "y z x e q s t m")
-    return arr
-  }
-}
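
For reference, the deleted test suites exercised all three removed entry points the same way: split each transaction line into an Array[String], then pass the RDD, a relative minSupport, and the active SparkContext. Below is a minimal Scala sketch of that usage, assuming a live SparkContext named sc (as provided by LocalSparkContext in the deleted tests); it is not part of the diff itself.

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.fim.{AprioriByBroadcast, AprioriByCartesian, FPGrowth}

// Sample transactions from the deleted suites ("Practical Machine Learning" data, dataset 2).
val transactions = Array(
  "r z h j p",
  "z y x w v u t s",
  "z",
  "r x n o s",
  "y r x z q t p",
  "y z x e q s t m")

// One transaction per line; items are space-separated.
val rdd: RDD[Array[String]] = sc.parallelize(transactions).map(_.split(" "))

// Frequent itemsets with at least 50% support, via either removed Apriori variant...
val fisBroadcast: Array[(Set[String], Int)] = AprioriByBroadcast.apriori(rdd, 0.5, sc)
val fisCartesian: Array[(Set[String], Int)] = AprioriByCartesian.apriori(rdd, 0.5, sc)

// ...or via the removed FP-Growth class (itemsets come back as space-joined strings here).
val fisFPGrowth: Array[(String, Int)] = new FPGrowth().FPGrowth(rdd, 0.5, sc)

On this data the deleted suites expected 18 frequent itemsets at 0.5 support from each implementation.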