
[SPARK-22357][CORE] SparkContext.binaryFiles ignore minPartitions parameter #21638

Closed · wants to merge 4 commits

Conversation

@bomeng (Contributor) commented Jun 25, 2018

What changes were proposed in this pull request?

Fix the issue that minPartitions was not used in the method. This is a simple fix, and I am not trying to make it complicated. The purpose is to let users control parallelism through the minPartitions value, in addition to the sc.defaultParallelism setting.

How was this patch tested?

I have not provided an additional test since the fix is very straightforward.

@SparkQA commented Jun 26, 2018

Test build #92309 has finished for PR 21638 at commit 0fc35d4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -45,7 +45,8 @@ private[spark] abstract class StreamFileInputFormat[T]
  * which is set through setMaxSplitSize
  */
 def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) {
-  val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES)
+  val defaultMaxSplitBytes = Math.max(
+    sc.getConf.get(config.FILES_MAX_PARTITION_BYTES), minPartitions)
   val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES)
   val defaultParallelism = sc.defaultParallelism
Member:

hmm, shouldn't minPartitions be used like this?

val defaultParallelism = Math.max(sc.defaultParallelism, if (minPartitions == 0) 1 else minPartitions)
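The suggested adjustment can be sketched as a standalone function (a hedged sketch with assumed names, not the actual Spark source):

```scala
// Sketch of the reviewer's suggestion: fold minPartitions into the
// parallelism used for split sizing, treating 0 as "no preference".
// effectiveParallelism is an assumed name for illustration only.
object ParallelismHint {
  def effectiveParallelism(defaultParallelism: Int, minPartitions: Int): Int =
    math.max(defaultParallelism, if (minPartitions == 0) 1 else minPartitions)
}
```

The `if (minPartitions == 0) 1 else minPartitions` guard keeps a zero hint from ever collapsing parallelism below one.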

Member:

Could you describe the use case in which minPartitions needs to be taken into account? By default, FILES_MAX_PARTITION_BYTES is 128MB. Let's say it is even set to 1000, and minPartitions equals 10,000. What is the reason to set the max split size in bytes to the min number of partitions? Why should a bigger number of partitions require a bigger split size? Could you add more details to the PR description, please?
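The objection can be made concrete against the first commit's formula (a hedged illustration with the reviewer's numbers; FirstCommitIllustration is an assumed name, not Spark code):

```scala
// The first-commit approach, Math.max(FILES_MAX_PARTITION_BYTES, minPartitions),
// mixes a byte size with a partition count.
object FirstCommitIllustration {
  def firstCommitMaxSplit(maxPartitionBytes: Long, minPartitions: Int): Long =
    math.max(maxPartitionBytes, minPartitions.toLong)
}
```

With maxPartitionBytes = 1000 and minPartitions = 10,000, this yields a split size of 10,000 bytes: asking for more partitions enlarges the splits, which would reduce, not increase, the partition count.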

@MaxGekk (Member) commented Jun 26, 2018

It seems there is similar code there:

    val defaultMaxSplitBytes =
      fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
    val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
    val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
    val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
    val bytesPerCore = totalBytes / defaultParallelism
    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
    logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
      s"open cost is considered as scanning $openCostInBytes bytes.")

Should it be changed in the same way?

@HyukjinKwon (Member):

Not sure yet but let's leave that out of this PR.

 val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES)
-val defaultParallelism = sc.defaultParallelism
+val defaultParallelism = Math.max(sc.defaultParallelism, minPartitions)
Member:

Now it makes much more sense.

@SparkQA commented Jun 26, 2018

Test build #92350 has finished for PR 21638 at commit c24fbe5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@bomeng (Contributor, Author) commented Jun 27, 2018

@HyukjinKwon please review. thanks.

@HyukjinKwon (Member):

retest this please

@SparkQA commented Jul 16, 2018

Test build #93067 has finished for PR 21638 at commit c24fbe5.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -47,7 +47,7 @@ private[spark] abstract class StreamFileInputFormat[T]
 def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) {
   val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES)
   val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES)
-  val defaultParallelism = sc.defaultParallelism
+  val defaultParallelism = Math.max(sc.defaultParallelism, minPartitions)
Contributor:

If sc.defaultParallelism < 2 and minPartitions is not set in BinaryFileRDD, then previously defaultParallelism would be the same as sc.defaultParallelism, but after this change it will be 2. Have you considered this case, and do you feel this behavior change is the right one to make?

Contributor Author:

You need to pass in minPartitions to use this method; what do you mean by minPartitions is not set?

Contributor:

I mentioned BinaryFileRDD, not this method; you can check the code to see how it handles the default value.

Contributor Author:

BinaryFileRDD will set minPartitions, which will either be defaultMinPartitions or the value you pass via the binaryFiles(path, minPartitions) method. Eventually, this minPartitions value is passed to the setMinPartitions() method.
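As a hedged sketch of the defaulting being described (simplified stand-in code, not the actual Spark classes): in SparkContext, defaultMinPartitions is math.min(defaultParallelism, 2), and binaryFiles falls back to it when the caller passes no value.

```scala
// Simplified stand-ins for the flow: SparkContext.binaryFiles ->
// BinaryFileRDD -> setMinPartitions. resolveMinPartitions is an
// illustrative helper, not a real Spark method.
object MinPartitionsFlow {
  // Mirrors SparkContext.defaultMinPartitions.
  def defaultMinPartitions(defaultParallelism: Int): Int =
    math.min(defaultParallelism, 2)

  // binaryFiles(path, minPartitions) uses the default when no value is given.
  def resolveMinPartitions(defaultParallelism: Int, requested: Option[Int]): Int =
    requested.getOrElse(defaultMinPartitions(defaultParallelism))
}
```

So setMinPartitions always receives a concrete value: either an explicit one or the capped default.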

@srowen (Member) commented Jul 18, 2018

Because this method is internal to Spark, why not just take out the parameter? Yes, it's superfluous now, but it's been this way for a while, and it seems better to avoid a behavior change. In fact, you can then pull the minPartitions parameter out of several private methods. You can't remove the parameter from binaryFiles, sure, but it can be documented as doing nothing.

@bomeng (Contributor, Author) commented Jul 18, 2018

Either way works for me, but since this is not a private method, people may use it in their own code. The minimal change would be best.

@srowen (Member) commented Jul 18, 2018

Except for binaryFiles, everything else that needs to change is private to Spark. I know it's public in the bytecode, but only Java callers could accidentally exploit that. Still I don't personally care too much either way, as long as all the unused args are documented, I guess, for completeness.

@HyukjinKwon (Member):

Yea, it's internal to Spark. Might be good to keep it but that concern should be secondary IMHO.

@SparkQA commented Aug 26, 2018

Test build #4290 has finished for PR 21638 at commit c24fbe5.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 27, 2018

Test build #95295 has finished for PR 21638 at commit 5e46efb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen (Member) commented Aug 29, 2018

Merged to master

@asfgit asfgit closed this in bbbf814 Aug 29, 2018
@@ -47,7 +47,7 @@ private[spark] abstract class StreamFileInputFormat[T]
 def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) {
   val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES)
   val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES)
-  val defaultParallelism = sc.defaultParallelism
+  val defaultParallelism = Math.max(sc.defaultParallelism, minPartitions)
Member:

We should have a test case; otherwise, we could hit the same issue again.

Member:

BTW, it is easy to add such a test case. We can even test the behavior at the boundary cases. cc @srowen @HyukjinKwon @MaxGekk @jiangxb1987

Member:

I think it's hard to test, technically, because setMinPartitions is only a hint. In the case of binaryFiles we know it will put a hard limit on the number of partitions, but it isn't true of other implementations. We can still make a simple test for all of these, it just may be asserting behavior that could change in the future in Hadoop, though I strongly doubt it would.

Contributor Author:

I agree it is hard to test. I would appreciate it if anyone could give me some hints on how to do this (how to verify, and where to put my test cases).

Member:

Would you mind following up with a test that just asserts that asking for, say, 20 partitions results in 20 partitions? This is technically too specific as a test, but is probably fine for now.

Contributor Author:

From the code, you can see the calculation is just an intermediate result, and this method does not return any value. Checking the split size does not make sense for this test case because it depends on multiple variables, and this is just one of them.

Member:

      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")
        .set(config.FILES_OPEN_COST_IN_BYTES.key, "0")
        .set("spark.default.parallelism", "1"))

      println(sc.binaryFiles(dirpath1, minPartitions = 50).getNumPartitions)
      println(sc.binaryFiles(dirpath1, minPartitions = 1).getNumPartitions)

It is not hard to verify whether the minPartitions parameter takes effect. Currently, the description of this parameter is not clear. We need to document clearly which factors impact the actual number of partitions; otherwise, users will not understand how to use it.

@gatorsmile (Member):

@bomeng Could you submit a follow-up PR to add a test case?

@HyukjinKwon (Member):

Yea, let's add a regression test.

@bomeng (Contributor, Author) commented Sep 5, 2018

Here is the test code; I am not sure whether it is right:

  test("Number of partitions") {
    sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")
      .set("spark.files.maxPartitionBytes", "10")
      .set("spark.files.openCostInBytes", "0")
      .set("spark.default.parallelism", "1"))

    val dir1 = Utils.createTempDir()
    val dirpath1 = dir1.getAbsolutePath

    val file1 = new File(dir1, "part-00000")
    val file2 = new File(dir1, "part-00001")

    Files.write("someline1 in file1\nsomeline2 in file1\nsomeline3 in file1", file1,
      StandardCharsets.UTF_8)
    Files.write("someline1 in file2\nsomeline2 in file2\nsomeline3 in file2", file2,
      StandardCharsets.UTF_8)

    assert(sc.binaryFiles(dirpath1, minPartitions = 1).getNumPartitions == 2)
    assert(sc.binaryFiles(dirpath1, minPartitions = 2).getNumPartitions == 2)
    assert(sc.binaryFiles(dirpath1, minPartitions = 50).getNumPartitions == 2)
  }

@srowen (Member) commented Sep 5, 2018

Ideally the last test should yield 50 partitions? Is it because we really need the test data to be at least 50 bytes? Ideally a multiple of 50, I guess.
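One way to see why minPartitions = 50 still yields 2 partitions in the test above: the patched formula only shrinks the split size, and binaryFiles reads each file as a single whole record, so the partition count bottoms out at one split per file. A hedged back-of-the-envelope sketch of the arithmetic (constants taken from the test; SplitSizeSketch is an illustrative name, not Spark code):

```scala
// Sketch of the patched setMinPartitions arithmetic; in the real code the
// computed value is handed to CombineFileInputFormat.setMaxSplitSize.
object SplitSizeSketch {
  def maxSplitBytes(maxPartitionBytes: Long, openCostInBytes: Long,
                    totalBytes: Long, defaultParallelism: Int,
                    minPartitions: Int): Long = {
    val parallelism = math.max(defaultParallelism, minPartitions)
    val bytesPerCore = totalBytes / parallelism
    math.min(maxPartitionBytes, math.max(openCostInBytes, bytesPerCore))
  }
}
```

With spark.files.maxPartitionBytes = 10, open cost 0, two files of roughly 56 bytes each (totalBytes ≈ 112), and defaultParallelism = 1: minPartitions = 50 gives a split size of 112 / 50 = 2 bytes, while minPartitions = 1 gives min(10, 112) = 10 bytes. Either way the split size is smaller than a single file, so each whole file becomes its own split, hence 2 partitions in all three assertions.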

asfgit pushed a commit that referenced this pull request Sep 7, 2018
…itions parameter

## What changes were proposed in this pull request?

This adds a test following #21638

## How was this patch tested?

Existing tests and new test.

Closes #22356 from srowen/SPARK-22357.2.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
(cherry picked from commit 4e3365b)
Signed-off-by: Sean Owen <sean.owen@databricks.com>