diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fim/Apriori.scala b/mllib/src/main/scala/org/apache/spark/mllib/fim/Apriori.scala deleted file mode 100644 index c9d73b8c59604..0000000000000 --- a/mllib/src/main/scala/org/apache/spark/mllib/fim/Apriori.scala +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.mllib.fim - -import org.apache.spark.rdd.RDD -import org.apache.spark.SparkContext._ -import org.apache.spark.{Logging, SparkContext} - -/** - * This object implements Apriori algorithm using Spark to find frequent item set in - * the given data set. - */ -object Apriori extends Logging with Serializable { - - /** - * Generate the first round FIS(frequent item set) from input data set. Returns single - * distinct item that appear greater than minCount times. - * - * @param dataSet input data set - * @param minCount the minimum appearance time that computed from minimum degree of support - * @return FIS - */ - private def genFirstRoundFIS(dataSet: RDD[Set[String]], - minCount: Double): RDD[(Set[String], Int)] = { - dataSet.flatMap(line => line) - .map(v => (v, 1)) - .reduceByKey(_ + _) - .filter(_._2 >= minCount) - .map(x => (Set(x._1), x._2)) - } - - /** - * Scan the input data set and filter out the eligible FIS - * @param candidate candidate FIS - * @param minCount the minimum appearance time that computed from minimum degree of support - * @return FIS - */ - private def scanAndFilter(dataSet: RDD[Set[String]], - candidate: RDD[Set[String]], - minCount: Double, - sc: SparkContext): RDD[(Set[String], Int)] = { - - dataSet.cartesian(candidate).map(x => - if (x._2.subsetOf(x._1)) { - (x._2, 1) - } else { - (x._2, 0) - }).reduceByKey(_ + _).filter(x => x._2 >= minCount) - } - - /** - * Generate the next round of FIS candidate using this round FIS - * @param FISk - * @param k - * @return candidate FIS - */ - private def generateCombination(FISk: RDD[Set[String]], - k: Int): RDD[Set[String]] = { - FISk.cartesian(FISk) - .map(x => x._1 ++ x._2) - .filter(x => x.size == k) - .distinct() - } - - /** - * Function of apriori algorithm implementation. - * - * @param input Input data set to find frequent item set - * @param minSupport The minimum degree of support - * @param sc SparkContext to use - * @return frequent item sets in a array - */ - def apriori(input: RDD[Array[String]], - minSupport: Double, - sc: SparkContext): Array[(Set[String], Int)] = { - - /* - * This apriori implementation uses cartesian of two RDD, input data set and candidate - * FIS (frequent item set). - * The resulting FIS are computed in two steps: - * The first step, find eligible distinct item in data set. 
- * The second step, loop in k round, in each round generate candidate FIS and filter - * out eligible FIS - */ - - // calculate minimum appearance count for minimum degree of support - val dataSetLen: Long = input.count() - val minCount = minSupport * dataSetLen - - // This algorithm finds frequent item set, so convert each element of RDD to set to - // eliminate duplicated item if any - val dataSet = input.map(_.toSet) - - // FIS is the result to return - val FIS = collection.mutable.ArrayBuffer[RDD[(Set[String], Int)]]() - val FIS1: RDD[(Set[String], Int)] = genFirstRoundFIS(dataSet, minCount) - if (FIS1.count() < 0) { - return Array[(Set[String], Int)]() - } - - FIS += FIS1 - - // FIS for round k - var FISk = FIS1 - // round counter - var k = 2 - - while (FIS(k - 2).count() > 1) { - - // generate candidate FIS - val candidate: RDD[Set[String]] = generateCombination(FIS(k - 2).map(x => x._1), k) - - // filter out eligible FIS - FISk = scanAndFilter(dataSet, candidate, minCount, sc) - - // add it to the result and go to next round - FIS += FISk - k = k + 1 - } - - // convert all FIS to array before returning them - val retArr = collection.mutable.ArrayBuffer[(Set[String], Int)]() - for (l <- FIS) { - retArr.appendAll(l.collect()) - } - retArr.toArray - } - - private def printFISk(FIS: RDD[(Set[String], Int)], k: Int) { - print("FIS" + (k - 2) + " size " + FIS.count() + " value: ") - FIS.collect().foreach(x => print("(" + x._1 + ", " + x._2 + ") ")) - println() - } - - private def printCk(Ck: RDD[Set[String]], k: Int) { - print("C" + (k - 2) + " size " + Ck.count() + " value: ") - Ck.collect().foreach(print) - println() - } -} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fim/AprioriByBroadcast.scala b/mllib/src/main/scala/org/apache/spark/mllib/fim/AprioriByBroadcast.scala new file mode 100644 index 0000000000000..7dc7bea26d166 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/fim/AprioriByBroadcast.scala @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.mllib.fim + +import org.apache.spark.SparkContext._ +import org.apache.spark.broadcast._ +import org.apache.spark.rdd.RDD +import org.apache.spark.{Logging, SparkContext} + +/** + * Calculate frequent item set using Apriori algorithm with minSupport. + * The apriori algorithm have two steps: + * step one is scan the data set to get L1 by minSuppprt + * step two is scan the data set multiple to get Lk + */ +object AprioriByBroadcast extends Logging with Serializable { + + /** + * Create C1 which contains all of single item in data Set. 
+ *
+ * @param dataSet the data set to mine for frequent item sets
+ * @return all distinct single items in the data set, each wrapped as a one-element array
+ */
+ def createC1(dataSet: RDD[Array[String]]): Array[Array[String]] = {
+   // get all distinct items in the RDD
+   val itemCollection = dataSet.flatMap(line => line).distinct().collect()
+
+   // buffer holding each item wrapped as a one-element array
+   val itemArrCollection = collection.mutable.ArrayBuffer[Array[String]]()
+
+   // copy itemCollection into itemArrCollection
+   for (item <- itemCollection) {
+     itemArrCollection += Array[String](item)
+   }
+
+   itemArrCollection.toArray
+ }
+
+
+ /**
+  * Create Lk from Ck. A candidate in Ck is kept in Lk when its occurrence count is at least minCount.
+  * @param dataSet the data set to mine for frequent item sets
+  * @param Ck candidate item sets
+  * @param minCount the minimum occurrence count derived from the minimum degree of support
+  * @param sc SparkContext to use
+  * @return Lk
+  */
+ def scanD(dataSet: RDD[Array[String]],
+   Ck: Array[Array[String]],
+   minCount: Double,
+   sc: SparkContext): Array[(Array[String], Int)] = {
+   // broadcast Ck
+   val broadcastCk = sc.broadcast(Ck)
+   //val broadcastCkList: Array[Array[String]] = broadcastCk.value
+
+   val Lk = dataSet.flatMap(line => containCk(line, broadcastCk))
+     .filter(_.length > 0)
+     .map(v => (v, 1))
+     .reduceByKey(_ + _)
+     .filter(_._2 >= (minCount))
+     .map(v => (v._1.split(" "), v._2))
+     .collect()
+
+   Lk
+ }
+
+
+ /**
+  * Find the candidates of Ck that are contained in a transaction line.
+  * @param line dataset line
+  * @param broadcastCk broadcast candidate set Ck
+  * @return the contained candidates, each encoded as a space-separated string
+  */
+ def containCk(line: Array[String],
+   broadcastCk: Broadcast[Array[Array[String]]]): Array[String] = {
+
+   // broadcast value of Ck
+   val broadcastCkList: Array[Array[String]] = broadcastCk.value
+   // buffer for the matched candidates
+   var dbLineArrayBuffer = collection.mutable.ArrayBuffer[String]()
+   // number of matched candidates
+   var k: Int = 0
+
+   for (broadcastCk <- broadcastCkList) {
+     val bdArray: Array[String] = broadcastCk.sortWith((s, t) => s.compareTo(t) < 0).array
+
+     if (bdArray.toSet subsetOf (line.toSet)) {
+       val bdString: String = bdArray.mkString(" ")
+       dbLineArrayBuffer ++= Array[(String)](bdString)
+       k = k + 1
+     }
+   }
+
+   if (k == 0) {
+     dbLineArrayBuffer ++= Array[(String)]("")
+   }
+   dbLineArrayBuffer.toArray.array
+ }
+
+ /**
+  * Create Ck from Lk.
+  * @param Lk frequent item sets of the previous round
+  * @param k size of the candidate item sets to generate
+  * @return Ck
+  */
+ def aprioriGen(Lk: Array[(Array[String], Int)],
+   k: Int): Array[Array[String]] = {
+
+   val LkLen = Lk.length
+   val CkBuffer = collection.mutable.ArrayBuffer[Array[String]]()
+
+   // get Ck from Lk
+   for (i <- 0 to LkLen - 1)
+     for (j <- i + 1 to LkLen - 1) {
+       // take the first k-2 items of each set, sorted
+       val L1: Array[String] =
+         Lk(i)._1.take(k - 2).sortWith((s, t) => s.compareTo(t) < 0)
+       val L2: Array[String] =
+         Lk(j)._1.take(k - 2).sortWith((s, t) => s.compareTo(t) < 0)
+
+       // merge the two sets when their first k-2 items are equal
+       if (L1.mkString.equals(L2.mkString)) {
+         CkBuffer.append((Lk(i)._1.toSet ++ Lk(j)._1.toSet).toArray)
+       }
+     }
+
+   if (CkBuffer.length > 0) {
+     CkBuffer.toArray.array
+   } else {
+     null
+   }
+ }
+
+ /**
+  * Create L1.
+  * @param dataSet the data set to mine for frequent item sets
+  * @param minCount the minimum occurrence count derived from the minimum degree of support
+  * @return L1
+  */
+ def aprioriStepOne(dataSet: RDD[Array[String]],
+   minCount: Double): Array[(Array[String], Int)] = {
+   dataSet.flatMap(line => line)
+     .map(v => (v, 1))
+     .reduceByKey(_ + _)
+     .filter(_._2 >= minCount)
+     .map(v => line2Array(v))
+     .collect()
+ }
+
+ /**
+  * Convert a (String, Int) pair into an (Array[String], Int) pair.
+  * @param line pair of (item, count)
+  * @return pair of (Array(item), count)
+  */
+ def line2Array(line: (String, Int)): (Array[String], Int) = {
+   (Array[String](line._1), line._2)
+ }
+
+ /**
+  * Apriori algorithm.
+  * Computes frequent item sets from the data set, given a minimum degree of support.
+  * In the first phase the data set is scanned once to compute the frequent item sets L1.
+  * In the second phase the data set is scanned repeatedly to compute the frequent item sets Lk.
+  * @param dataSet the data set to mine for frequent item sets
+  * @param minSupport the minimum degree of support
+  * @param sc SparkContext to use
+  * @return frequent item sets
+  */
+ def apriori(dataSet: RDD[Array[String]],
+   minSupport: Double,
+   sc: SparkContext): Array[(Set[String], Int)] = {
+
+   // number of transactions in the data set
+   val dataSetLen: Long = dataSet.count()
+   // minimum occurrence count derived from minSupport
+   val minCount = minSupport * dataSetLen
+
+   // L stores the frequent item sets of every round
+   val L = collection.mutable.ArrayBuffer[Array[(Array[String], Int)]]()
+   val FIS = collection.mutable.ArrayBuffer[(Set[String], Int)]()
+
+   // call aprioriStepOne to get L1
+   val L1: Array[(Array[String], Int)] = aprioriStepOne(dataSet, minCount)
+   logDebug("L1 length:" + L1.length)
+   logDebug("L1:" + L1)
+
+   // add L1 to L
+   if (L1.length > 0) {
+     L += L1
+
+     for (arr <- L1) {
+       FIS.append((arr._1.toSet, arr._2))
+     }
+
+     // round counter
+     var k: Int = 2
+     // loop while k > 0 and the previous round produced more than one item set
+     while ((k > 0) && L(k - 2).length > 1) {
+
+       // call aprioriGen to get the candidate set Ck
+       val Ck: Array[Array[String]] = aprioriGen(L(k - 2), k)
+
+       if (Ck != null) {
+         // call scanD to get Lk
+         val Lk: Array[(Array[String], Int)] =
+           scanD(
+             dataSet,
+             Ck,
+             minCount,
+             sc)
+         // add Lk to L
+         L += Lk
+
+         for (arr <- Lk) {
+           FIS.append((arr._1.toSet, arr._2))
+         }
+
+         k = k + 1
+       }
+       else {
+         k = -1
+       }
+     }
+     FIS.toArray
+   }
+   else {
+     Array[(Set[String], Int)]()
+   }
+ }
+
+}
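For readers of the patch, a minimal usage sketch of AprioriByBroadcast.apriori follows; it is not part of the patch. It uses the signature defined above and the four-transaction data set from AprioriSuite, and it assumes an already-constructed SparkContext named sc; the variable names are illustrative only.

// mine frequent item sets with a 50% minimum support (illustrative values)
val transactions = sc.parallelize(Array("1 3 4", "2 3 5", "1 2 3 5", "2 5"))
  .map(line => line.split(" "))
val frequentItemSets: Array[(Set[String], Int)] =
  AprioriByBroadcast.apriori(transactions, 0.5, sc)
// each result pairs an item set with its occurrence count
frequentItemSets.foreach { case (items, count) =>
  println(items.mkString("{", ", ", "}") + " -> " + count)
}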
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fim/AprioriByCartesian.scala b/mllib/src/main/scala/org/apache/spark/mllib/fim/AprioriByCartesian.scala new file mode 100644 index 0000000000000..bd256bac10c0f --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/fim/AprioriByCartesian.scala @@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.fim
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{Logging, SparkContext}
+
+/**
+ * Calculate frequent item sets with the Apriori algorithm, given a minimum degree of support.
+ * The algorithm has two steps:
+ * step one scans the data set once to get L1 using minSupport;
+ * step two scans the data set repeatedly to get Lk.
+ */
+object AprioriByCartesian extends Logging with Serializable {
+
+ /**
+  * Create L1.
+  * @param dataSet the data set to mine for frequent item sets
+  * @param minCount the minimum occurrence count derived from the minimum degree of support
+  * @return L1
+  */
+ def aprioriStepOne(dataSet: RDD[Set[String]],
+   minCount: Double): RDD[(Set[String], Int)] = {
+   dataSet.flatMap(line => line)
+     .map(v => (v, 1))
+     .reduceByKey(_ + _)
+     .filter(_._2 >= minCount)
+     .map(x => (Set(x._1), x._2))
+ }
+
+ /**
+  * Create Lk from Ck. A candidate in Ck is kept in Lk when its occurrence count is at least minCount.
+  * @param dataSet the data set to mine for frequent item sets
+  * @param Ck candidate item sets
+  * @param minCount the minimum occurrence count derived from the minimum degree of support
+  * @param sc SparkContext to use
+  * @return Lk
+  */
+ def scanD(dataSet: RDD[Set[String]],
+   Ck: RDD[Set[String]],
+   minCount: Double,
+   sc: SparkContext): RDD[(Set[String], Int)] = {
+
+   dataSet.cartesian(Ck).map(x =>
+     if (x._2.subsetOf(x._1)) {
+       (x._2, 1)
+     } else {
+       (x._2, 0)
+     }).reduceByKey(_ + _).filter(x => x._2 >= minCount)
+ }
+
+ /**
+  * Create Ck from Lk.
+  * @param Lk frequent item sets of the previous round
+  * @param k size of the candidate item sets to generate
+  * @return Ck
+  */
+ def aprioriGen(Lk: RDD[Set[String]],
+   k: Int): RDD[Set[String]] = {
+   Lk.cartesian(Lk)
+     .map(x => x._1 ++ x._2)
+     .filter(x => x.size == k)
+     .distinct()
+ }
+
+ /**
+  * Apriori algorithm using the cartesian product of two RDDs.
+  * Computes frequent item sets from the data set, given a minimum degree of support.
+  * In the first phase the data set is scanned once to compute the frequent item sets L1.
+  * In the second phase the data set is scanned repeatedly to compute the frequent item sets Lk.
+  * @param input the data set to mine for frequent item sets
+  * @param minSupport the minimum degree of support
+  * @param sc SparkContext to use
+  * @return frequent item sets
+  */
+ def apriori(input: RDD[Array[String]],
+   minSupport: Double,
+   sc: SparkContext): Array[(Set[String], Int)] = {
+
+   // number of transactions in the data set
+   val dataSetLen: Long = input.count()
+   // minimum occurrence count derived from minSupport
+   val minCount = minSupport * dataSetLen
+   // convert each transaction to a set to eliminate duplicate items
+   val dataSet = input.map(_.toSet)
+
+   // L stores the frequent item sets of every round
+   val L = collection.mutable.ArrayBuffer[RDD[(Set[String], Int)]]()
+
+   val L1: RDD[(Set[String], Int)] = aprioriStepOne(dataSet, minCount)
+   if (L1.count() > 0) {
+     L += L1
+     var Lk = L1
+
+     // round counter
+     var k = 2
+
+     while (L(k - 2).count() > 1) {
+
+       // generate the candidate item sets Ck
+       val Ck: RDD[Set[String]] = aprioriGen(L(k - 2).map(x => x._1), k)
+
+       // scan the input data set to compute the occurrence count of each candidate,
+       // and filter out the ineligible ones
+       Lk = scanD(dataSet, Ck, minCount, sc)
+
+       k = k + 1
+       L += Lk
+     }
+     // collect all results in L into an array
+     val retArr = collection.mutable.ArrayBuffer[(Set[String], Int)]()
+     for (l <- L) {
+       retArr.appendAll(l.collect())
+     }
+     retArr.toArray
+   } else {
+     Array[(Set[String], Int)]()
+   }
+ }
+
+ def printLk(Lk: RDD[(Set[String], Int)], k: Int) {
+   print("L" + (k - 2) + " size " + Lk.count() + " value: ")
+   Lk.collect().foreach(x => print("(" + x._1 + ", " + x._2 + ") "))
+   println()
+ }
+
+ def printCk(Ck: RDD[Set[String]], k: Int) {
+   print("C" + (k - 2) + " size " + Ck.count() + " value: ")
+   Ck.collect().foreach(print)
+   println()
+ }
+}
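AprioriByCartesian.apriori has the same signature and return type as AprioriByBroadcast.apriori, so the two implementations can be swapped freely. The sketch below reuses the transactions RDD and SparkContext sc assumed in the previous example and is likewise not part of the patch.

// cartesian-based variant of the same call (illustrative values)
val frequentByCartesian: Array[(Set[String], Int)] =
  AprioriByCartesian.apriori(transactions, 0.5, sc)
// for the same input and minSupport the two variants are expected to return
// the same item sets, though possibly in a different order
println("found " + frequentByCartesian.length + " frequent item sets")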
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fim/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/mllib/fim/FPGrowth.scala new file mode 100644 index 0000000000000..fc2db8e32073e --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/fim/FPGrowth.scala @@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.fim
+
+import org.apache.spark.{Logging, SparkContext}
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.broadcast._
+
+/**
+ * Calculate frequent item sets with the FP-Growth algorithm, given a data set and a minimum degree of support.
+ * The algorithm has two steps:
+ * step one scans the data set to get L1 using minSupport;
+ * step two scans the data set once more to get Lk.
+ */
+class FPGrowth extends Logging with Serializable {
+
+ /**
+  * FP-Growth algorithm:
+  * step 1: calculate L1 using minSupport
+  * step 2: calculate Ln using the FP-Tree
+  * @param RDD the data set to mine for frequent item sets
+  * @param minSupport the minimum degree of support
+  * @param sc SparkContext to use
+  * @return frequent item sets
+  */
+ def FPGrowth(RDD: RDD[Array[String]],
+   minSupport: Double,
+   sc: SparkContext): Array[(String, Int)] = {
+   val count = RDD.count()
+   logDebug("data set count:" + count)
+   val minCount = minSupport * count
+   logDebug("minSupport:" + minSupport)
+
+   // first pass over the data set to get L1
+   val L1 = FPGStepOne(RDD, minCount)
+   logDebug("L1 length:" + L1.length)
+   logDebug("L1:" + L1)
+
+   // second pass over the data set to get Ln
+   val Ln = FPGStepTwo(sc, RDD, minCount, L1)
+   // combine L1 and Ln to get all frequent item sets
+   val fim = L1 ++ Ln
+
+   return fim
+
+ }
+
+ /**
+  * Step 1: calculate L1 using the minimum occurrence count.
+  * @param RDD the data set to mine for frequent item sets
+  * @param minCount the minimum occurrence count derived from the minimum degree of support
+  * @return L1
+  */
+ def FPGStepOne(RDD: RDD[Array[String]],
+   minCount: Double): Array[(String, Int)] = {
+   RDD.flatMap(v => v)
+     .map(v => (v, 1))
+     .reduceByKey(_ + _)
+     .filter(_._2 >= minCount)
+     .collect()
+     .distinct
+     .sortWith(_._2 > _._2)
+ }
+
+ /**
+  * Step 2: use the parallel FP-Tree to calculate the frequent item sets.
+  * @param sc SparkContext to use
+  * @param RDD the data set to mine for frequent item sets
+  * @param minCount the minimum occurrence count derived from the minimum degree of support
+  * @param L1 frequent item sets of length 1
+  * @return Ln
+  */
+ def FPGStepTwo(sc: SparkContext,
+   RDD: RDD[Array[String]],
+   minCount: Double,
+   L1: Array[(String, Int)]): Array[(String, Int)] = {
+   // broadcast L1
+   val bdL1 = sc.broadcast(L1)
+   //val bdL1List = bdL1.value
+
+   RDD.flatMap(line => L12LineMap(line, bdL1))
+     .groupByKey()
+     .flatMap(line => FPTree(line, minCount))
+     .collect()
+
+ }
+
+ /**
+  * Build the conditional pattern input for each frequent item.
+  * Given L1, for example (a,2), (b,4), (c,3), and a transaction line, for example a,b,c,d,
+  * the result is:
+  * a,(b,c)
+  * c,(b)
+  * Note that b,() is not emitted.
+  * @param line dataset line
+  * @param bdL1 broadcast L1
+  * @return (item, prefix items) pairs used to build the conditional FP-Tree
+  */
+ def L12LineMap(line: Array[String],
+   bdL1: Broadcast[Array[(String, Int)]]): 
Array[(String, Array[String])] = {
+   // broadcast value of L1
+   val bdL1List = bdL1.value
+   // items of the line that appear in L1, with their counts
+   var lineArrayBuffer = collection.mutable.ArrayBuffer[(String, Int)]()
+
+   for (item <- line) {
+
+     val opt = bdL1List.find(_._1.equals(item))
+
+     if (opt != None) {
+       lineArrayBuffer ++= opt
+     }
+
+   }
+
+   // sort by item and then by count, descending
+   val lineArray = lineArrayBuffer
+     .sortWith(_._1 > _._1)
+     .sortWith(_._2 > _._2)
+     .toArray
+
+
+   var arrArrayBuffer = collection.mutable.ArrayBuffer[(String, Array[String])]()
+
+   /**
+    * Given (a,4), (b,3), (c,3), emit:
+    * b,((a,4))
+    * c,((a,4), (b,3))
+    */
+   var arrBuffer = collection.mutable.ArrayBuffer[String]()
+   for (item <- lineArray) {
+     val arr = lineArray.take(lineArray.indexOf(item))
+
+     arrBuffer.clear()
+
+     if (arr.length > 0) {
+       for (tempArr <- arr) {
+         // keep only the item name
+         arrBuffer += tempArr._1
+       }
+       arrArrayBuffer += ((item._1, arrBuffer.toArray))
+     }
+
+   }
+
+   return arrArrayBuffer.toArray
+
+ }
+
+ /**
+  * Generate frequent item sets from the conditional pattern input of one item:
+  * combinations of the prefix items, extended with the item itself, that meet minCount.
+  * @param line (item, prefix item sets) pair produced by L12LineMap
+  * @param minCount the minimum occurrence count derived from the minimum degree of support
+  * @return frequent item sets
+  */
+ def FPTree(line: (String, Iterable[Array[String]]), minCount: Double): Array[(String, Int)] = {
+   // the frequent item this conditional pattern input belongs to
+   val key = line._1
+   // the prefix item sets used to build the conditional FP-Tree
+   val value = line._2
+
+   val _lineBuffer = collection.mutable.ArrayBuffer[(String, Int)]()
+   val map = scala.collection.mutable.Map[String, Int]()
+   // combination size
+   var k = 1
+   // loop while k > 0
+   while (k > 0) {
+     map.clear()
+
+     // loop over the prefix item sets
+     for (it <- value) {
+       if (it.length >= k) {
+         // take all combinations of size k, using Scala's combinations method
+         val lineCom = it.toList.combinations(k).toList
+
+         // append the key to each combination
+         for (item <- lineCom) {
+           // sort the items
+           val list2key: List[String] = (item :+ key)
+             .sortWith(_ > _)
+
+           val s = list2key.mkString(" ")
+
+           if (map.get(s) == None) {
+             map(s) = 1
+           }
+           else {
+             map(s) = map.apply(s) + 1
+           }
+         }
+       }
+     }
+
+     var line: Array[(String, Int)] = null
+
+     if (map.size != 0) {
+       // keep the combinations that meet minCount
+       val lineTemp = map.filter(_._2 >= minCount)
+
+       if (lineTemp.size != 0) {
+         line = lineTemp.toArray.array
+         _lineBuffer ++= line
+       }
+
+     }
+
+     // stop when this round produced no frequent item set, otherwise grow k
+     if ((line == null) || (line.length == 0)) {
+       k = 0
+     }
+     else {
+       k = k + 1
+     }
+
+   }
+
+   return _lineBuffer.toArray
+
+ }
+
+}
+
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/fim/AprioriSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/fim/AprioriSuite.scala index 1398d904f4501..e08e6b6ffb41a 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/fim/AprioriSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/fim/AprioriSuite.scala @@ -1,20 +1,19 @@ /* - * Licensed until the Apache Software Foundation (ASF) under one or more - * contribuuntilr license agreements. See the NOTICE file distributed with + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. - * The ASF licenses this file until You under the Apache License, Version 2.0 + * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. 
You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed until in writing, software + * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.spark.mllib.fim import org.apache.spark.SparkContext @@ -24,94 +23,75 @@ import org.scalatest.FunSuite /** * scala test unit - * using Practical Machine Learning Book data set to test apriori algorithm + * using Practical Machine Learning Book data test the apriori algorithm result by minSupport from 0.9 to 0.1 */ class AprioriSuite extends FunSuite with LocalSparkContext { - test("test FIM with Apriori dataset 1") + test("test FIM with AprioriByBroadcast dataset 1") { + val arr = AprioriSuite.createFIMDataSet1() + val target: (RDD[Array[String]], Double, SparkContext) => Array[(Set[String], Int)]= AprioriByBroadcast.apriori - // input data set - val input = Array[String]( - "1 3 4", - "2 3 5", - "1 2 3 5", - "2 5") + val dataSet = sc.parallelize(arr) + val rdd = dataSet.map(line => line.split(" ")) - // correct FIS answers - val answer1 = Array((Set("4")), (Set("5")), (Set("2")), (Set("3")), (Set("1")), (Set("4", "1")), (Set("5", "2")), (Set("3", "1")), (Set("5", "3")), (Set("2", "3")), (Set("2", "1")), (Set("5", "1")), (Set("4", "3")), (Set("5", "2", "3")), (Set("3", "1", "5")), (Set("3", "1", "2")), (Set("4", "1", "3")), (Set("5", "2", "1")), (Set("5", "2", "3", "1"))) - val answer2 = Array((Set("4")), (Set("5")), (Set("2")), (Set("3")), (Set("1")), (Set("4", "1")), (Set("5", "2")), (Set("3", "1")), (Set("5", "3")), (Set("2", "3")), (Set("2", "1")), (Set("5", "1")), (Set("4", "3")), (Set("5", "2", "3")), (Set("3", "1", "5")), (Set("3", "1", "2")), (Set("4", "1", "3")), (Set("5", "2", "1")), (Set("5", "2", "3", "1"))) - val answer3 = Array((Set("5")), (Set("2")), (Set("3")), (Set("1")), (Set("5", "2")), (Set("3", "1")), (Set("5", "3")), (Set("2", "3")), (Set("5", "2", "3"))) - val answer4 = Array((Set("5")), (Set("2")), (Set("3")), (Set("1")), (Set("5", "2")), (Set("3", "1")), (Set("5", "3")), (Set("2", "3")), (Set("5", "2", "3"))) - val answer5 = Array((Set("5")), (Set("2")), (Set("3")), (Set("1")), (Set("5", "2")), (Set("3", "1")), (Set("5", "3")), (Set("2", "3")), (Set("5", "2", "3"))) - val answer6 = Array((Set("5")), (Set("2")), (Set("3")), (Set("5", "2"))) - val answer7 = Array((Set("5")), (Set("2")), (Set("3")), (Set("5", "2"))) - val answer8 = Array() - val answer9 = Array() + for (i <- 1 to 9){ + println(s"frequent item set with support ${i/10d}") + target(rdd, i/10d, sc).foreach(x => print("(" + x._1 + "), ")) + println() + } + } - val target: (RDD[Array[String]], Double, SparkContext) => Array[(Set[String], Int)]= Apriori.apriori + test("test FIM with AprioriByBroadcast dataset 2") + { + val arr = AprioriSuite.createFIMDataSet2() + val target: (RDD[Array[String]], Double, SparkContext) => Array[(Set[String], Int)]= AprioriByBroadcast.apriori - val dataSet = sc.parallelize(input) + val dataSet = sc.parallelize(arr) val rdd = dataSet.map(line => line.split(" ")) - val result9 = target(rdd, 0.9, sc) - assert(result9.length == answer9.length) + assert(target(rdd,0.9,sc).length == 0) - val result8 = target(rdd, 0.8, sc) - assert(result8.length == answer8.length) + 
assert(target(rdd,0.8,sc).length == 1) - val result7 = target(rdd, 0.7, sc) - assert(result7.length == answer7.length) - for (i <- 0 until result7.length){ - assert(answer7(i).equals(result7(i)._1)) - } + assert(target(rdd,0.7,sc).length == 1) + + assert(target(rdd,0.6,sc).length == 2) + + assert(target(rdd,0.5,sc).length == 18) + + assert(target(rdd,0.4,sc).length == 18) + + assert(target(rdd,0.3,sc).length == 54) + + assert(target(rdd,0.2,sc).length == 54) + + assert(target(rdd,0.1,sc).length == 625) - val result6 = target(rdd, 0.6, sc) - assert(result6.length == answer6.length) - for (i <- 0 until result6.length) - assert(answer6(i).equals(result6(i)._1)) - - val result5 = target(rdd, 0.5, sc) - assert(result5.length == answer5.length) - for (i <- 0 until result5.length) - assert(answer5(i).equals(result5(i)._1)) - - val result4 = target(rdd, 0.4, sc) - assert(result4.length == answer4.length) - for (i <- 0 until result4.length) - assert(answer4(i).equals(result4(i)._1)) - - val result3 = target(rdd, 0.3, sc) - assert(result3.length == answer3.length) - for (i <- 0 until result3.length) - assert(answer3(i).equals(result3(i)._1)) - - val result2 = target(rdd, 0.2, sc) - assert(result2.length == answer2.length) - for (i <- 0 until result2.length) - assert(answer2(i).equals(result2(i)._1)) - - val result1 = target(rdd, 0.1, sc) - assert(result1.length == answer1.length) - for (i <- 0 until result1.length) - assert(answer1(i).equals(result1(i)._1)) } - test("test FIM with Apriori dataset 2") + test("test FIM with AprioriByCartesian dataset 1") { + val arr = AprioriSuite.createFIMDataSet1() + val target: (RDD[Array[String]], Double, SparkContext) => Array[(Set[String], Int)]= AprioriByCartesian.apriori - // input data set - val input = Array[String]( - "r z h j p", - "z y x w v u t s", - "z", - "r x n o s", - "y r x z q t p", - "y z x e q s t m") + val dataSet = sc.parallelize(arr) + val rdd = dataSet.map(line => line.split(" ")) + + for (i <- 1 to 9){ + println(s"frequent item set with support ${i/10d}") + target(rdd, i/10d, sc).foreach(x => print("(" + x._1 + "), ")) + println() + } - val target: (RDD[Array[String]], Double, SparkContext) => Array[(Set[String], Int)]= Apriori.apriori + + } + test("test FIM with AprioriByCartesian dataset 2") + { + val arr = AprioriSuite.createFIMDataSet2() + val target: (RDD[Array[String]], Double, SparkContext) => Array[(Set[String], Int)]= AprioriByCartesian.apriori - val dataSet = sc.parallelize(input) + val dataSet = sc.parallelize(arr) val rdd = dataSet.map(line => line.split(" ")) assert(target(rdd,0.9,sc).length == 0) @@ -135,3 +115,34 @@ class AprioriSuite extends FunSuite with LocalSparkContext { } } + +/** + * create dataset + */ +object AprioriSuite +{ + /** + * create dataset using Practical Machine Learning Book data + * @return dataset + */ + def createFIMDataSet1():Array[String] = { + val arr = Array[String]( + "1 3 4", + "2 3 5", + "1 2 3 5", + "2 5") + arr + } + + def createFIMDataSet2():Array[String] = { + val arr = Array[String]( + "r z h j p", + "z y x w v u t s", + "z", + "r x n o s", + "y r x z q t p", + "y z x e q s t m") + arr + } +} + diff --git a/mllib/src/test/scala/org/apache/spark/mllib/fim/FPGrowthSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/fim/FPGrowthSuite.scala new file mode 100644 index 0000000000000..5156babcd1975 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/fim/FPGrowthSuite.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.mllib.fim
+
+import org.scalatest.FunSuite
+import org.apache.spark.mllib.util.LocalSparkContext
+
+/**
+ * Scala test suite.
+ * Tests the FPGrowth algorithm on the Practical Machine Learning Book data set,
+ * with minSupport ranging from 0.9 to 0.1.
+ */
+class FPGrowthSuite extends FunSuite with LocalSparkContext {
+
+  test("test FIM with FPGrowth")
+  {
+    val arr = AprioriSuite.createFIMDataSet2()
+
+    assert(arr.length === 6)
+    val dataSet = sc.parallelize(arr)
+    assert(dataSet.count() == 6)
+    val rdd = dataSet.map(line => line.split(" "))
+    assert(rdd.count() == 6)
+
+    // check the number of frequent item sets
+    val fimWithFPGrowth = new FPGrowth()
+    println(fimWithFPGrowth.FPGrowth(rdd,0.1,sc).length)
+
+    assert(fimWithFPGrowth.FPGrowth(rdd,0.9,sc).length == 0)
+
+    assert(fimWithFPGrowth.FPGrowth(rdd,0.8,sc).length == 1)
+
+    assert(fimWithFPGrowth.FPGrowth(rdd,0.7,sc).length == 1)
+
+    assert(fimWithFPGrowth.FPGrowth(rdd,0.6,sc).length == 2)
+
+    assert(fimWithFPGrowth.FPGrowth(rdd,0.5,sc).length == 18)
+
+    assert(fimWithFPGrowth.FPGrowth(rdd,0.4,sc).length == 18)
+
+    assert(fimWithFPGrowth.FPGrowth(rdd,0.3,sc).length == 54)
+
+    assert(fimWithFPGrowth.FPGrowth(rdd,0.2,sc).length == 54)
+
+    assert(fimWithFPGrowth.FPGrowth(rdd,0.1,sc).length == 625)
+
+  }
+}
+
+/**
+ * Create data sets for the tests.
+ */
+object FPGrowthSuite
+{
+  /**
+   * Create a data set using the Practical Machine Learning Book data.
+   * @return dataset
+   */
+  def createFIMDataSet():Array[String] =
+  {
+    val arr = Array[String]("r z h j p",
+      "z y x w v u t s",
+      "z",
+      "r x n o s",
+      "y r x z q t p",
+      "y z x e q s t m")
+    return arr
+  }
+}
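To round out the test suites above, here is a minimal usage sketch of the FP-Growth entry point exercised by FPGrowthSuite; it is not part of the patch and assumes an existing SparkContext named sc, with illustrative variable names. Unlike the Apriori objects, which return Set[String] keys, FPGrowth.FPGrowth returns each frequent item set encoded as a space-separated String together with its occurrence count.

// build the same six-transaction data set used by the suites (illustrative)
val transactions = sc.parallelize(Array(
  "r z h j p", "z y x w v u t s", "z",
  "r x n o s", "y r x z q t p", "y z x e q s t m"))
  .map(line => line.split(" "))
val model = new FPGrowth()
// each result is (space-separated item set, occurrence count)
val frequent: Array[(String, Int)] = model.FPGrowth(transactions, 0.5, sc)
frequent.foreach { case (itemSet, count) => println(itemSet + " -> " + count) }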