
[SPARK-6304][Streaming] Fix checkpointing doesn't retain driver port issue. #5060

Closed

Conversation

jerryshao
Contributor

No description provided.

@SparkQA

SparkQA commented Mar 17, 2015

Test build #28698 has finished for PR 5060 at commit 2398ae1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jerryshao
Contributor Author

Hi @tdas, I think this is actually a bug. Would you please help review this one?

@jerryshao
Contributor Author

Pinging, since this has been pending for a long time.

@SparkQA

SparkQA commented Apr 24, 2015

Test build #30926 has finished for PR 5060 at commit 5713c20.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.

@@ -94,6 +94,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// contains a map from hostname to a list of input format splits on the host.
private[spark] var preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()

// This is used by Spark Streaming to check whether the driver host and port were set by the user;
// if these two configurations were set by the user, the recovery mechanism should not remove them.
private[spark] val isDriverHostSetByUser = config.contains("spark.driver.host")
Member

It doesn't seem worth tacking on yet more little fields in SparkContext just for a niche use case in a submodule. Use the config object in Checkpoint.

Contributor

I am not sure it is a good idea to clutter SparkContext further with such functions, especially when Spark core itself does not use it. Would be good to find a different solution.

Contributor Author

But I think there has to be a place in Spark Core, either in SparkConf or somewhere else, to judge whether this configuration was set by the user or by Spark itself before SparkContext is initialized. It cannot be determined from Spark Streaming, where the SparkContext has already been initialized.

Contributor Author

I'm not sure how to track this in Checkpoint: SparkEnv will reset this configuration if the user has not set it, so how can Checkpoint differentiate whether it was set by the user or by SparkEnv?

@tdas
Contributor

tdas commented Apr 24, 2015

I thought more about this particular issue. These two lines in SparkContext actually set the two parameters explicitly, with host = local hostname and port = 0, ONLY IF they are not set in the user-provided conf. Here is my observation on each of them.

  1. spark.driver.host = This should never be recovered through checkpoints. If the driver is restarted on a different machine and the old value of spark.driver.host (that is, the old hostname) is recovered from the checkpoint, it will be set incorrectly, because the recreated SparkContext will not set spark.driver.host to the new hostname. So we should never recover the hostname, and should always let the new SparkContext set a new one.
  2. spark.driver.port = SparkContext sets the value to 0 if it is not specified. So if we always save the value of sparkContext.conf (that is, after 0 is set during SparkContext initialization) in the checkpoint, that serves the purpose: if the port was set explicitly by the user, that port is used; if not, 0 stays, in which case the port will be random even after restart.

I think this simplifies the whole idea: never save the host, always save the port. Doesn't it?
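The proposal above can be sketched roughly as follows. This is a hypothetical illustration using plain Maps, not the actual SparkContext/Checkpoint code; the object and method names are made up for this sketch.

```scala
// Hypothetical sketch of "never recover the host, always keep the checkpointed port".
// A checkpointed port of "0" means "pick a random port" after restart.
object DriverConfRecoverySketch {
  def recover(checkpointed: Map[String, String], newHostname: String): Map[String, String] =
    // Drop the old hostname and let the restarted driver supply its own;
    // the port is carried over as-is from the checkpointed conf.
    (checkpointed - "spark.driver.host") + ("spark.driver.host" -> newHostname)
}
```

As the follow-up comment points out, the port half of this is not quite this simple in practice, because SparkEnv overwrites the 0 with an actual port before the conf is checkpointed.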

@jerryshao
Contributor Author

Hi @tdas, I agree with you that the port alone is enough; there is no need to track the host. But we can only save the port when the user has explicitly set it. Port number 0 will not remain after SparkEnv is initialized, as you can see here, so we shouldn't save that port number (which is set by SparkEnv).

@SparkQA

SparkQA commented Apr 27, 2015

Test build #31052 has started for PR 5060 at commit 5713c20.

@tdas
Contributor

tdas commented Apr 30, 2015

I see, that's the problem. It's not clear from the streaming code what value the user had set before SparkEnv set the port value. Without that, this problem cannot be solved.

@andrewor14
Contributor

bump. What is the status on this PR @jerryshao @tdas?

@andrewor14
Contributor

retest this please

@jerryshao
Contributor Author

Hi @andrewor14, I think this is indeed a bug in checkpointing in Spark Streaming. My implementation just adds two fields to SparkContext to save a snapshot of these two configurations, but as TD suggested, this is not so elegant. Currently I cannot figure out any better place to hold this port number, so any suggestion is greatly appreciated.

@SparkQA

SparkQA commented Jun 19, 2015

Test build #35191 has finished for PR 5060 at commit 5713c20.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Contributor

tdas commented Jun 19, 2015

The tricky part of this PR is figuring out when the port was specified by the user (recover that conf from the checkpoint in that case), and when it was NOT specified (don't recover that conf).


@tdas
Contributor

tdas commented Jul 14, 2015

@jerryshao let's finalize this PR. Here is what I think we should do.

Currently, we remove spark.driver.* from the conf used to create the recovered streaming context, ignoring the fact that the user may be explicitly setting those confs in spark-defaults.conf. That is wrong. The general policy should be to never recover spark.driver.* from the checkpointed conf. Then, if those properties are set in the defaults, they will be present in the final conf for the restarted context; otherwise they won't be.

This solves the original problem in the JIRA. If someone wants to set the port explicitly, they can set it in spark-defaults.conf. With the above change, it will not be explicitly deleted when recovering, and it will be automatically used in the recovered context.

Sounds good? If so, please update the PR.
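This final policy can be sketched as follows. This is a hypothetical illustration with plain Maps, not the actual Checkpoint code; `recoveredConf` and `defaults` are made-up names standing in for Checkpoint's conf reconstruction and the freshly loaded defaults (e.g. spark-defaults.conf / system properties).

```scala
// Hypothetical sketch of the agreed policy: never recover spark.driver.* from the
// checkpointed conf; take those keys from the current defaults only when set there.
object PropertiesToReloadSketch {
  val propertiesToReload = List("spark.driver.host", "spark.driver.port")

  def recoveredConf(checkpointed: Map[String, String],
                    defaults: Map[String, String]): Map[String, String] =
    (checkpointed -- propertiesToReload) ++                        // drop checkpointed values
      propertiesToReload.flatMap(k => defaults.get(k).map(k -> _)) // reload user-set values
}
```

If the user sets the port in the defaults, it survives recovery; if not, the key is absent and the new SparkContext picks a fresh value, which is exactly the behavior described above.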

@jerryshao
Contributor Author

Yeah, that sounds good :), let me update the code.

@@ -50,7 +50,9 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
val propertiesToReload = List(
"spark.master",
Contributor

Could you keep these alphabetically sorted? Looks cleaner.

@SparkQA

SparkQA commented Jul 15, 2015

Test build #37341 has finished for PR 5060 at commit 7cc146d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val newCpConf = newCp.createSparkConf()
assert(newCpConf.contains("spark.driver.host"))
assert(newCpConf.contains("spark.driver.port"))
assert(newCpConf.get("spark.driver.host") === "localhost")
Contributor

This just tests whether it is correctly set in the new conf when it is set as a system property. You should also test the other case: the new conf should not have these properties when they are not set as system properties, even though they were present in the original conf.

Contributor Author

OK, I will update the test
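The two cases the reviewer asks for amount to a pair of assertions. Here is a hypothetical, self-contained sketch, with a plain-Map stand-in for Checkpoint.createSparkConf and for the current system properties (the names are illustrative, not Spark's actual API):

```scala
// Hypothetical sketch of the two test cases for reloadable spark.driver.* keys.
object ReloadTestSketch {
  val propertiesToReload = List("spark.driver.host", "spark.driver.port")

  // Stand-in for Checkpoint.createSparkConf: drop reloadable keys from the
  // checkpointed conf, then re-add them only if set in the current system properties.
  def createSparkConf(checkpointed: Map[String, String],
                      sysProps: Map[String, String]): Map[String, String] =
    (checkpointed -- propertiesToReload) ++
      propertiesToReload.flatMap(k => sysProps.get(k).map(k -> _))

  def main(args: Array[String]): Unit = {
    val checkpointed = Map("spark.driver.host" -> "localhost",
                           "spark.driver.port" -> "9999")

    // Case 1: set as a system property -> present in the new conf.
    val withProps = createSparkConf(checkpointed, Map("spark.driver.host" -> "localhost"))
    assert(withProps.get("spark.driver.host").contains("localhost"))

    // Case 2: not set as system properties -> absent from the new conf,
    // even though they were present in the checkpointed conf.
    val withoutProps = createSparkConf(checkpointed, Map.empty)
    assert(!withoutProps.contains("spark.driver.host"))
    assert(!withoutProps.contains("spark.driver.port"))
  }
}
```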

@SparkQA

SparkQA commented Jul 15, 2015

Test build #37352 has finished for PR 5060 at commit 275d252.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class LDAModel(JavaModelWrapper):
    • class LDA(object):
    • case class Round(child: Expression, scale: Expression)

@SparkQA

SparkQA commented Jul 16, 2015

Test build #37446 has finished for PR 5060 at commit 89b01f5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class LDAModel(JavaModelWrapper):
    • class LDA(object):
    • trait ImplicitCastInputTypes extends ExpectsInputTypes
    • abstract class BinaryOperator extends BinaryExpression with ExpectsInputTypes
    • case class UnaryMinus(child: Expression) extends UnaryExpression with ExpectsInputTypes
    • case class UnaryPositive(child: Expression) extends UnaryExpression with ExpectsInputTypes
    • case class Abs(child: Expression) extends UnaryExpression with ExpectsInputTypes
    • case class BitwiseNot(child: Expression) extends UnaryExpression with ExpectsInputTypes
    • case class Factorial(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
    • case class Hex(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
    • case class Unhex(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
    • case class Round(child: Expression, scale: Expression)
    • case class Md5(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
    • case class Sha1(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
    • case class Crc32(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
    • case class Not(child: Expression)
    • case class And(left: Expression, right: Expression) extends BinaryOperator with Predicate
    • case class Or(left: Expression, right: Expression) extends BinaryOperator with Predicate
    • trait StringRegexExpression extends ImplicitCastInputTypes
    • trait String2StringExpression extends ImplicitCastInputTypes
    • trait StringComparison extends ImplicitCastInputTypes
    • case class StringSpace(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
    • case class StringLength(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
    • case class Ascii(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
    • case class Base64(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
    • case class UnBase64(child: Expression) extends UnaryExpression with ImplicitCastInputTypes

@tdas
Contributor

tdas commented Jul 16, 2015

All right merging this.

@asfgit asfgit closed this in 031d7d4 Jul 16, 2015
@mariussoutier

👍 Thanks guys! (I was the original JIRA author.)
