SPARK-2978. Transformation with MR shuffle semantics #2274

Closed
wants to merge 9 commits into from

sryza
Contributor

@sryza sryza commented Sep 4, 2014

I didn't add this to the transformations list in the docs because it's kind of obscure, but would be happy to do so if others think it would be helpful.

@SparkQA

SparkQA commented Sep 4, 2014

QA tests have started for PR 2274 at commit a75f277.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 4, 2014

QA tests have finished for PR 2274 at commit a75f277.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 4, 2014

QA tests have started for PR 2274 at commit a1ef807.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 4, 2014

QA tests have finished for PR 2274 at commit a1ef807.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class BlockManagerMaster(
    • class AttributeMap[A](baseMap: Map[ExprId, (Attribute, A)])

@@ -514,6 +514,30 @@ def __add__(self, other):
            raise TypeError
        return self.union(other)

    def repartitionAndSortWithinPartition(self, ascending=True, numPartitions=None,
                                          partitionFunc=portable_hash, keyfunc=lambda x: x):
Contributor

How about rearranging the parameters to follow the order in the function name? For example:

repartitionAndSortWithinPartition(self, numPartitions=None, partitionFunc=portable_hash, 
                                          ascending=True, keyfunc=lambda x: x)

@sryza
Contributor Author

sryza commented Sep 4, 2014

Updated patch removes Python version, adds Java version, and adds some additional doc.

@mateiz
Contributor

mateiz commented Sep 5, 2014

Just a nit: it should probably be called repartitionAndSortWithinPartitions, with an 's' at the end.

Also, this name is pretty long. Another one I'd consider is repartitionWithSort, but I'm not sure what other people think.

Finally I think it should be a policy to add all these APIs to Python, and implement them there too. Basically there are two options -- if you're doing this to support a slightly easier transition from MR jobs, but you don't want to do it in Python, you could just have it as a document, or an example, or maybe even a third-party package that takes a Hadoop JobConf and runs it on Spark. But if you want it in Spark, we need to put it in each language. The reason is to allow people to easily read code in one supported language and run it in others -- it's always disappointing when some operators turn out to be missing in yours.

@rxin
Contributor

rxin commented Sep 5, 2014

@mateiz

The reason to add this is that it is a smaller API that we can support (both source and binary compatibility) in the long run before finalizing ShuffledRDD, since that one has been in flux and has changed in multiple past releases. Perhaps we can mark this new API as DeveloperApi but commit to maintaining it. What do you think?

The naming is long, but I'm worried repartitionWithSort in a way implies the data are sorted globally.
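To make the naming concern concrete, here is a minimal pure-Python sketch of the intended semantics. It is not Spark code: the function name `repartition_and_sort_within_partitions`, its parameters, and the sample records are all hypothetical stand-ins. Each partition comes out sorted by key, but reading the partitions one after another does not give a globally sorted sequence.

```python
from collections import defaultdict


def repartition_and_sort_within_partitions(records, num_partitions, partition_func=hash):
    """Hypothetical model: route each (key, value) pair to a partition,
    then sort every partition by key independently."""
    partitions = defaultdict(list)
    for key, value in records:
        partitions[partition_func(key) % num_partitions].append((key, value))
    return [sorted(partitions[i], key=lambda kv: kv[0]) for i in range(num_partitions)]


records = [(5, "e"), (2, "b"), (4, "d"), (1, "a"), (3, "c"), (0, "z")]
# An identity partition function keeps the example deterministic (assumption for illustration).
parts = repartition_and_sort_within_partitions(records, 2, partition_func=lambda k: k)

# Concatenating the partitions shows the keys are ordered per partition, not overall.
flattened = [kv for part in parts for kv in part]
```

With two partitions, the even keys land in `parts[0]` and the odd keys in `parts[1]`, each in ascending order, while `flattened` jumps back from the largest even key to the smallest odd key.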

   * because it can push the sorting down into the shuffle machinery.
   */
  def repartitionAndSortWithinPartition(partitioner: Partitioner)
    : RDD[(K, V)] = {
Contributor

You can put this on the previous line.
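For readers wondering what "push the sorting down into the shuffle machinery" in the doc comment of that hunk could mean, here is a rough conceptual model in plain Python. It is an assumption-laden sketch, not Spark's shuffle implementation: `map_side`, `reduce_side`, and the sample inputs are hypothetical. The idea shown is that each map task emits per-partition runs already sorted by key, so the reduce side only has to merge sorted runs rather than sort the whole partition again.

```python
import heapq
from operator import itemgetter


def map_side(records, num_partitions, partition_func):
    """Hypothetical map task: bucket records by target partition,
    then sort each bucket by key before handing it to the shuffle."""
    buckets = [[] for _ in range(num_partitions)]
    for key, value in records:
        buckets[partition_func(key) % num_partitions].append((key, value))
    return [sorted(bucket, key=itemgetter(0)) for bucket in buckets]


def reduce_side(sorted_runs):
    """Hypothetical reduce task: merge already-sorted runs by key."""
    return list(heapq.merge(*sorted_runs, key=itemgetter(0)))


# Two map tasks, two output partitions, identity partition function (illustrative only).
map_inputs = [[(3, "c"), (0, "z"), (4, "d")], [(1, "a"), (2, "b"), (5, "e")]]
map_outputs = [map_side(task, 2, lambda k: k) for task in map_inputs]
shuffled = [reduce_side([out[p] for out in map_outputs]) for p in range(2)]
```

Each entry of `shuffled` is one output partition, sorted by key, which matches what repartitioning followed by a per-partition sort would produce.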

@mateiz
Contributor

mateiz commented Sep 5, 2014

Ah, I see. Then we can add it, but in that case I'd also add it in Python.

@sryza
Contributor Author

sryza commented Sep 5, 2014

Updated patch adds Python back in and adds the 's' at the end.

@rxin
Contributor

rxin commented Sep 5, 2014

Thanks, Sandy. Can you add a unit test in Java to make sure the thing is callable from Java?


self.assertRaises(ValueError, lambda: rdd.countApproxDistinct(0.00000001))
self.assertRaises(ValueError, lambda: rdd.countApproxDistinct(0.5))

Contributor

Were these removed by accident during merging?

Contributor Author

Yup, my bad

@SparkQA

SparkQA commented Sep 8, 2014

QA tests have started for PR 2274 at commit c04b447.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 8, 2014

QA tests have finished for PR 2274 at commit c04b447.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 8, 2014

QA tests have started for PR 2274 at commit 4a5332a.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 8, 2014

QA tests have finished for PR 2274 at commit 4a5332a.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davies
Contributor

davies commented Sep 8, 2014

LGTM, thanks.

@mateiz
Contributor

mateiz commented Sep 8, 2014

Thanks Sandy! I've merged this.

@asfgit asfgit closed this in 16a73c2 Sep 8, 2014
5 participants