
[SPARK-4001][MLlib] adding parallel FP-Growth algorithm for frequent pattern mining in MLlib #2847

Closed
wants to merge 12 commits

Conversation

jackylk
Contributor

@jackylk jackylk commented Oct 19, 2014

Apriori is the classic algorithm for frequent itemset mining in a transactional data set. It would be useful to add the Apriori algorithm to MLlib in Spark, and this PR adds an implementation of it.
There is one point I am not sure is most efficient: in order to filter out the eligible frequent itemsets, I am currently using a cartesian operation on two RDDs to calculate the degree of support of each itemset. I am not sure whether it would be better to use a broadcast variable to achieve the same.

I will add an example of using this algorithm if required.
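The broadcast alternative asked about above can be sketched with plain Scala collections standing in for the RDDs (the transactions and candidate itemsets below are made-up toy data, not from the PR):

```scala
// Toy stand-in: instead of a cartesian product of (candidates x transactions),
// ship the small candidate list to every partition (sc.broadcast in Spark)
// and count support locally.
val transactions = Seq(Set("a", "b", "c"), Set("a", "b"), Set("b", "c"))
val candidates = Seq(Set("a", "b"), Set("b", "c")) // hypothetical candidate itemsets

// With Spark this body would run inside mapPartitions over the broadcast value.
val support = candidates.map { c =>
  (c, transactions.count(t => c.subsetOf(t)))
}
// each candidate here is contained in 2 of the 3 transactions
```

This avoids materializing every (candidate, transaction) pair, at the cost of requiring the candidate set to fit in memory on each worker.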

@AmplabJenkins

Can one of the admins verify this patch?


@mengxr
Contributor

mengxr commented Nov 7, 2014

Had an offline discussion with @jackylk . We plan to implement a more scalable version of Apriori, as described in PFP: Parallel FP-Growth for Query Recommendation (http://dl.acm.org/citation.cfm?id=1454027)

@varadharajan
Contributor

As mentioned in one of the comments on SPARK-2432, I was wondering how the PFP version compares with YAFIM (http://pasa-bigdata.nju.edu.cn/people/ronggu/pub/YAFIM_ParLearning.pdf). Probably I will do a bit more reading on this.

@jackylk jackylk changed the title [SPARK-4001][MLlib] adding apriori algorithm for frequent item set mining in Spark [SPARK-4001][MLlib] adding apriori algorithm for frequent item set mining in Spark (WIP) Nov 26, 2014
@jackylk jackylk changed the title [SPARK-4001][MLlib] adding apriori algorithm for frequent item set mining in Spark (WIP) [SPARK-4001][MLlib] adding apriori and fp-growth algorithm for frequent itemset mining in Spark (WIP) Nov 26, 2014
@denmoroz

denmoroz commented Dec 8, 2014

Maybe it is better to use RDD[BitSet] as the transactions RDD? Then you could add a preprocessor trait and apply any transformation of the source RDD into an RDD of BitSets, for example a transformation of RDD[Array[String]] into RDD[BitSet].
It seems to me that BitSet is a much better representation of transactions than Array[String], Array[Int], or anything else.

An even better idea would be to make a Transaction entity, which would contain its BitSet representation and all the necessary convenience methods. Then anyone could make a preprocessor from RDD[...Any Type...] to RDD[Transaction].
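The suggested preprocessing can be sketched as follows (plain Scala Seq in place of an RDD; the item-to-bit dictionary is an assumption of this sketch, since a BitSet needs integer indices):

```scala
import scala.collection.immutable.BitSet

// Map each distinct item to a bit index, then encode every transaction
// as a BitSet; with Spark the same map would run over RDD[Array[String]].
val transactions = Seq(Array("a", "b"), Array("b", "c"))
val itemIndex: Map[String, Int] =
  transactions.flatten.distinct.zipWithIndex.toMap // a -> 0, b -> 1, c -> 2
val asBitSets: Seq[BitSet] =
  transactions.map(t => BitSet(t.map(itemIndex): _*))
// support counting (set intersection) now becomes a cheap bitwise AND
```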

@erikerlandson
Contributor

As long as itemset mining is under consideration, has anybody tried a Spark implementation of "Logical Itemset Mining":
http://cvit.iiit.ac.in/papers/Chandrashekar2012Logical.pdf

@denmoroz

denmoroz commented Dec 8, 2014

Do you use the SON algorithm for the parallel Apriori implementation?
(http://importantfish.com/limited-pass-algorithms/)

@mengxr
Contributor

mengxr commented Jan 15, 2015

Had an offline discussion with @jackylk and here is the summary:

  1. Keep only the parallel FP-Growth implementation, because it is generally more efficient than Apriori, especially on medium/large datasets. @jackylk can share some performance testing results.
  2. Rename the package "fim" (frequent itemset mining) to "fpm" (frequent pattern mining). There is no standard acronym for this family of mining algorithms, and frequent pattern mining is a broader term than frequent itemset mining. This package name is also used in Mahout.
  3. Include links to the original FP-Growth paper and the PFP paper in the doc.
  4. Have FPGrowth take minSupport as a parameter and implement run(RDD...): FPGrowthModel, where FPGrowthModel holds an RDD of frequent itemsets and counts.
  5. Hide methods used internally.
  6. Update code style: a) remove extra empty lines; b) fix indentation; c) change variable names; d) line width; etc.
  7. Check whether we can use generic type for items (for Java API).
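Item 4 can be mocked up as follows. This is a plain-Scala sketch with Seq standing in for RDD; the names and the single-item-only "mining" are placeholder assumptions of this sketch, not the final API:

```scala
// Hypothetical mock of the proposed API shape: minSupport as a parameter,
// run(...) returning a model that holds the frequent itemsets and counts.
class FPGrowthModel(val freqItemsets: Seq[(Array[String], Long)])

class FPGrowth(val minSupport: Double) {
  def run(data: Seq[Array[String]]): FPGrowthModel = {
    val minCount = math.ceil(minSupport * data.size).toLong
    // Placeholder mining: single-item frequent sets only, to show the shape.
    val freq = data.flatten.groupBy(identity).collect {
      case (item, occurrences) if occurrences.size >= minCount =>
        (Array(item), occurrences.size.toLong)
    }.toSeq
    new FPGrowthModel(freq)
  }
}

val model = new FPGrowth(0.5).run(Seq(Array("a", "b"), Array("a"), Array("b"), Array("a")))
// "a" appears 3 times and "b" twice; both meet minCount = 2
```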

@mengxr
Contributor

mengxr commented Jan 15, 2015

add to whitelist

@SparkQA

SparkQA commented Jan 15, 2015

Test build #25596 has started for PR 2847 at commit 7b77ad7.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Jan 15, 2015

Test build #25596 has finished for PR 2847 at commit 7b77ad7.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25596/
Test FAILed.

@SparkQA

SparkQA commented Jan 19, 2015

Test build #25742 has started for PR 2847 at commit eb3e4ca.

  • This patch merges cleanly.

@jackylk
Contributor Author

jackylk commented Jan 19, 2015

Yes, I have tested the parallel FP-Growth algorithm using an open data set from http://fimi.ua.ac.be/data/; the performance test results can be found at https://issues.apache.org/jira/browse/SPARK-4001

All modifications are done except for the 7th (generic types); please review the code for now.
I am still considering whether it is worthwhile to implement generic types, since they add more complexity to the code.

@SparkQA

SparkQA commented Jan 19, 2015

Test build #25742 has finished for PR 2847 at commit eb3e4ca.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25742/
Test FAILed.

@jackylk jackylk changed the title [SPARK-4001][MLlib] adding apriori and fp-growth algorithm for frequent itemset mining in Spark (WIP) [SPARK-4001][MLlib] adding parallel FP-Growth algorithm for frequent pattern mining in MLlib Jan 19, 2015
@jackylk
Contributor Author

jackylk commented Jan 19, 2015

Please test again

@SparkQA

SparkQA commented Jan 19, 2015

Test build #25752 has started for PR 2847 at commit d110ab2.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Jan 19, 2015

Test build #25752 has finished for PR 2847 at commit d110ab2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}

// Sort it and create the item combinations
val sortedItems = items.sortWith(_._1 > _._1).sortWith(_._2 > _._2).toArray
Contributor

Why sorting twice? The second will overwrite the first. Besides, using sortBy(-_._2) would be better.
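The suggested fix can be illustrated in isolation (toy data; sortBy is stable, and a single descending sort on the count replaces the two chained sortWith calls):

```scala
val items = Seq(("b", 5), ("c", 3), ("a", 6))
// The second of two chained sortWith calls overwrites the first ordering;
// a single sortBy on the negated count expresses the intent directly.
val sortedItems = items.sortBy(-_._2).toArray
// descending by count: ("a", 6), ("b", 5), ("c", 3)
```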

@mengxr
Contributor

mengxr commented Jan 21, 2015

@jackylk I made a brief scan of the implementation. Besides inline comments, I have some high-level suggestions:

  1. It would be good if we could make the code easy to follow for someone with the paper in hand. I don't think the notation in the PFP paper is good, but when we create a variable, we should put a comment mentioning the corresponding variable name in the paper.
  2. Maybe because of 1), I cannot find an exact match between the implementation and the algorithm described in the paper. It seems that you implemented Figure 1 of the paper, but that is not the PFP algorithm. Could you double-check?
  3. The tree-building code needs some unit tests, so it is easy to convince reviewers that the implementation is correct. And if using trees can compress the data, we should use aggregateByKey instead of groupByKey.

@zhangyouhua2014

@mengxr I am working with Jacky to develop and test this algorithm, and I can answer this question:
We follow the PFP paper, but we cut out the tree-building process; the time saved can be used for other work. The specific steps are as follows:
1. The transaction database DB is distributed across multiple worker nodes; after two scans of the database we obtain the conditional pattern sequences.
   1.1. The first scan of DB produces the frequent 1-itemsets L1. For example: (a, 6), (b, 5), (c, 3).
   1.2. Using the L1 obtained in 1.1, DB is scanned again to filter out the non-frequent items, yielding the conditional pattern sequences (conditionSEQ). For example: (c, (a, b)), (b, (a)).
   After these two scans of DB, conditionSEQ carries much less information than DB.
2. A reduce operation using the groupByKey operator brings the conditionSEQ entries with the same key from every worker onto the same machine and merges them. The subsequent frequent itemset mining is based on these merged conditionSEQ sets.
3. On each worker, frequent itemsets are mined from the conditionSEQ sets using the Apriori principle.
4. Finally, a collect operator aggregates the results.
The algorithm spreads DB across multiple worker nodes and only needs to scan it twice to obtain the small conditional pattern sequences, and the frequent itemset mining over conditionSEQ needs only one reduce, so network interaction is small and the algorithm is fast.
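The two scans described in steps 1.1 and 1.2 can be sketched with plain Scala collections (a toy DB and a made-up minSupport; in the PR these would be RDD operations, and the frequency-descending reordering of items within a transaction is omitted here):

```scala
// Toy transaction database and support threshold (made-up values).
val db = Seq(Seq("a", "b", "c"), Seq("a", "b"), Seq("a", "d"))
val minSupport = 2

// Scan 1: frequent single items L1 with their counts.
val l1: Map[String, Int] =
  db.flatten.groupBy(identity).map { case (i, occ) => (i, occ.size) }
    .filter { case (_, n) => n >= minSupport }
// l1: a -> 3, b -> 2 ("c" and "d" are infrequent)

// Scan 2: drop infrequent items, then emit one conditional pattern
// sequence per suffix item, e.g. the transaction (a, b) yields (b, (a)).
val conditionSeq = db
  .map(_.filter(l1.contains))
  .flatMap(t => t.indices.drop(1).map(i => (t(i), t.take(i))))
```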

@mengxr
Contributor

mengxr commented Jan 26, 2015

@zhangyouhua2014

We follow the PFP paper, but we cut out the tree-building process; the time saved can be used for other work.

By "reduce", did you mean skipping the process of growing trees? The FP-Growth algorithm reduces the memory requirement using the tree representation of candidate sets. If we skip this step, it is hard to call it FP-Growth. Did you do any performance comparison between your version and the PFP implementation?

2. A reduce operation using the groupByKey operator brings the conditionSEQ entries with the same key from every worker onto the same machine and merges them. The subsequent frequent itemset mining is based on these merged conditionSEQ sets.

It is important to grow the tree on the mapper side to save communication cost. groupByKey doesn't do that. I was suggesting using aggregateByKey. For each key, we start with an empty tree, with seqOp growing the tree and combOp merging two trees. Besides, the partition key is the hash value of the last item of the sequence. We should be able to reduce communication cost (see my inline comments at L135).
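The seqOp/combOp shape described above can be sketched with a plain Map standing in for the per-group FP-tree (hypothetical toy data; in Spark this is rdd.aggregateByKey(zeroValue)(seqOp, combOp)):

```scala
// A Map[List[String], Long] stands in for the per-group FP-tree; seqOp adds
// one transaction on the mapper side, combOp merges two partial "trees".
type Tree = Map[List[String], Long]
val zero: Tree = Map.empty

def seqOp(tree: Tree, transaction: List[String]): Tree =
  tree.updated(transaction, tree.getOrElse(transaction, 0L) + 1L)

def combOp(a: Tree, b: Tree): Tree =
  b.foldLeft(a) { case (acc, (path, n)) =>
    acc.updated(path, acc.getOrElse(path, 0L) + n)
  }

// Two "partitions" holding transactions for the same key.
val partition1 = Seq(List("a", "b"), List("a"))
val partition2 = Seq(List("a", "b"))
val merged = combOp(partition1.foldLeft(zero)(seqOp),
                    partition2.foldLeft(zero)(seqOp))
// merged: List(a, b) -> 2, List(a) -> 1
```

Because each partition pre-aggregates with seqOp before shuffling, only the compact partial trees cross the network, which is the communication saving over groupByKey.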

@zhangyouhua2014

@mengxr
1. I mean that step 1 (which is equivalent to creating the FP-tree and the conditional FP-trees) already reduces the data size and creates conditional FP-trees that contain only frequent items, not the transaction data, so mining frequent itemsets from them involves a small candidate set.
2. I have tested it and compared it with Mahout's PFP; the performance is good, about 10 times faster.
3. After using groupByKey, frequent itemsets are mined on each node holding the specified key, so there is no network communication overhead.
4. Is there an aggregateByKey operator in the new Spark version?

@mengxr
Contributor

mengxr commented Jan 26, 2015

1. I mean that step 1 (which is equivalent to creating the FP-tree and the conditional FP-trees) already reduces the data size and creates conditional FP-trees that contain only frequent items, not the transaction data, so mining frequent itemsets from them involves a small candidate set.

The advantage of FP-Growth over Apriori is the tree structure used to represent the candidate sets. Both algorithms take advantage of the fact that the candidate set is small. I'm asking whether the current implementation uses the tree structure to save communication.

2. I have tested it and compared it with Mahout's PFP; the performance is good, about 10 times faster.

I'm not surprised by the 10x speed-up, but that by itself does not mean the current implementation is correct and high-performance. I believe we can be much faster.

3. After using groupByKey, frequent itemsets are mined on each node holding the specified key, so there is no network communication overhead.

groupByKey collects everything to the reducers; aggregateByKey does part of the aggregation on the mappers. There is definitely room for improvement.

4. Is there an aggregateByKey operator in the new Spark version?

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions

@mengxr
Contributor

mengxr commented Jan 28, 2015

Had an offline discussion with @jackylk and @zhangyouhua2014 . We plan to add a utility class named FPTree with the following (the exact names are TBD):

def add(transaction: Array[String]): this.type

def merge(tree: FPTree): this.type

def extract(threshold: Int, validateSuffix: String => Boolean): Iterator[Array[String]]

and then use aggregateByKey to grow the trees in parallel.
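A minimal plain-Scala sketch of such a utility follows. The names match the comment above, but all signatures are simplified assumptions (the exact names were TBD), and extract here only enumerates frequent prefix paths rather than performing the full conditional-tree mining of FP-Growth:

```scala
import scala.collection.mutable

// A tree node shared by all FPTree instances so that merge can
// traverse another tree's nodes.
class Node(var count: Long = 0L,
           val children: mutable.Map[String, Node] = mutable.Map.empty)

class FPTree extends Serializable {
  val root = new Node()

  /** Add one transaction (with a repeat count) along a root-to-leaf path. */
  def add(transaction: Seq[String], count: Long = 1L): this.type = {
    var curr = root
    transaction.foreach { item =>
      val child = curr.children.getOrElseUpdate(item, new Node())
      child.count += count
      curr = child
    }
    this
  }

  /** Merge another tree into this one node by node. */
  def merge(other: FPTree): this.type = {
    def mergeInto(dst: Node, src: Node): Unit =
      src.children.foreach { case (item, srcChild) =>
        val dstChild = dst.children.getOrElseUpdate(item, new Node())
        dstChild.count += srcChild.count
        mergeInto(dstChild, srcChild)
      }
    mergeInto(root, other.root)
    this
  }

  /** Enumerate prefix paths whose count meets the threshold
    * (a simplification of the real FP-Growth extraction). */
  def extract(threshold: Long): Iterator[(List[String], Long)] = {
    def walk(node: Node, prefix: List[String]): Iterator[(List[String], Long)] =
      node.children.iterator.flatMap { case (item, child) =>
        if (child.count >= threshold)
          Iterator((prefix :+ item, child.count)) ++ walk(child, prefix :+ item)
        else Iterator.empty
      }
    walk(root, Nil)
  }
}

val tree = new FPTree().add(Seq("a", "b")).add(Seq("a", "c")).add(Seq("a", "b"))
tree.merge(new FPTree().add(Seq("a", "b")))
// path counts after the merge: a -> 4, a/b -> 3, a/c -> 1
```

With this shape, seqOp for aggregateByKey is `(tree, transaction) => tree.add(transaction)` and combOp is `(a, b) => a.merge(b)`.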

@SparkQA

SparkQA commented Jan 30, 2015

Test build #26406 has started for PR 2847 at commit 93f3280.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Jan 30, 2015

Test build #26406 has finished for PR 2847 at commit 93f3280.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class FPGrowthModel (val frequentPattern: Array[(Array[String], Long)]) extends Serializable
    • class FPTree extends Serializable
    • class FPTreeNode(val item: String, var count: Int) extends Serializable

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26406/
Test FAILed.

@jackylk
Contributor Author

jackylk commented Jan 30, 2015

@mengxr
I have made the modifications according to the comments; please review.

@SparkQA

SparkQA commented Jan 30, 2015

Test build #26407 has started for PR 2847 at commit ec21f7d.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Jan 30, 2015

Test build #26407 has finished for PR 2847 at commit ec21f7d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class FPGrowthModel (val frequentPattern: Array[(Array[String], Long)]) extends Serializable
    • class FPTree extends Serializable
    • class FPTreeNode(val item: String, var count: Int) extends Serializable

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26407/
Test PASSed.

@mengxr
Contributor

mengxr commented Jan 30, 2015

@jackylk Thanks for the update! Did you see any performance improvement on your dataset with aggregateByKey? I'm quite interested in how much shuffle we can save (hopefully) on a real dataset.

@jackylk
Contributor Author

jackylk commented Jan 31, 2015

I have not tested performance yet. I will test it over the weekend.

@SparkQA

SparkQA commented Feb 2, 2015

Test build #26486 has started for PR 2847 at commit bee3093.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Feb 2, 2015

Test build #26486 has finished for PR 2847 at commit bee3093.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class FPGrowthModel(val freqItemsets: RDD[(Array[String], Long)]) extends Serializable
    • class Node[T](val parent: Node[T]) extends Serializable

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26486/
Test FAILed.

@mengxr
Contributor

mengxr commented Feb 2, 2015

LGTM. Merged into master. Thanks!! (The failed test is a known flaky test. All relevant tests passed.)

@asfgit asfgit closed this in 859f724 Feb 2, 2015