
Tighten up field/method visibility in Executor and made some code more clear to read. #4850

Closed — wants to merge 2 commits

Conversation

@rxin (Contributor) commented Mar 2, 2015

I was reading Executor just now and found that some recent changes introduced a weird code path with too much monadic chaining and unnecessary fields. I cleaned it up a bit and also tightened the visibility of various fields and methods. I also added some inline documentation to make this code easier to understand.


SparkQA commented Mar 2, 2015

Test build #28166 has started for PR 4850 at commit a5e8ffd.

  • This patch merges cleanly.


SparkQA commented Mar 2, 2015

Test build #28166 has finished for PR 4850 at commit a5e8ffd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class TaskCommitDenied(jobID: Int, partitionID: Int, attemptID: Int) extends TaskFailedReason
    • class ExecutorSource(threadPool: ThreadPoolExecutor, executorId: String) extends Source

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/28166/

    def startDriverHeartbeater() {
      val interval = conf.getInt("spark.executor.heartbeatInterval", 10000)

    /** Reports heartbeat and metrics for active tasks to the driver. */
    private def reportHeartBeat(): Unit = {
      val timeout = AkkaUtils.lookupTimeout(conf)
      val retryAttempts = AkkaUtils.numRetries(conf)
      val retryIntervalMs = AkkaUtils.retryWaitMs(conf)
      val heartbeatReceiverRef = AkkaUtils.makeDriverRef("HeartbeatReceiver", conf, env.actorSystem)
Contributor

Is making it so that now we read these properties and remake the Akka ref on each heartbeat on purpose? I.e. is actor resolution guaranteed to be cheap after the first time?

Contributor Author

Nah, they are better taken out of this function. I'm going to put them in the class itself as fields. Once we have our own RPC interface, we can delete this.
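The refactoring described here can be sketched roughly as follows (names follow the snippet above; the exact field layout in the merged commit may differ). The point is that the Akka lookup settings and the driver ref are resolved once per Executor instead of on every heartbeat:

```scala
// Sketch only: the three retry/timeout settings and the HeartbeatReceiver ref
// become fields of Executor, initialized once, rather than locals re-created
// inside every reportHeartBeat() call.
private val heartbeatTimeout = AkkaUtils.lookupTimeout(conf)
private val heartbeatRetryAttempts = AkkaUtils.numRetries(conf)
private val heartbeatRetryIntervalMs = AkkaUtils.retryWaitMs(conf)
private val heartbeatReceiverRef =
  AkkaUtils.makeDriverRef("HeartbeatReceiver", conf, env.actorSystem)

/** Reports heartbeat and metrics for active tasks to the driver. */
private def reportHeartBeat(): Unit = {
  // ... build the heartbeat message, then send it via the cached ref,
  // e.g. with AkkaUtils.askWithReply using the fields above ...
}
```

This sidesteps the question raised above about whether actor resolution is cheap after the first call, since resolution now happens exactly once.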


SparkQA commented Mar 2, 2015

Test build #28195 has started for PR 4850 at commit dcba289.

  • This patch merges cleanly.


SparkQA commented Mar 3, 2015

Test build #28195 has finished for PR 4850 at commit dcba289.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class TaskCommitDenied(jobID: Int, partitionID: Int, attemptID: Int) extends TaskFailedReason
    • class ExecutorSource(threadPool: ThreadPoolExecutor, executorId: String) extends Source
    • class MatrixFactorizationModel(JavaModelWrapper, Saveable, JavaLoader):
    • class Saveable(object):
    • class Loader(object):
    • class JavaLoader(Loader):
    • java_class = ".".join([java_package, cls.__name__])

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/28195/

    metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime)

    if (isLocal) {
      // JobProgressListener will hold an reference of it during
Contributor

Could be worth fixing the grammar in this comment.

@sryza (Contributor) commented Mar 5, 2015

I had a few minor nits. Otherwise, this LGTM.


SparkQA commented Mar 19, 2015

Test build #28892 has started for PR 4850 at commit 866fc60.

  • This patch merges cleanly.

    @@ -22,14 +22,12 @@ import org.apache.spark.{TaskCommitDenied, TaskEndReason}
     /**
      * Exception thrown when a task attempts to commit output to HDFS but is denied by the driver.
      */
    -class CommitDeniedException(
    +private[spark] class CommitDeniedException(
Contributor

Since this was inadvertently public before, and thus was public in Spark 1.3, I think that this change will cause a MiMa failure once we bump the version to 1.4.0-SNAPSHOT. Therefore, this PR sort of implicitly conflicts with #5056, so we'll have to make sure to re-test whichever PR we merge second.
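If MiMa does flag the narrowed visibility after the version bump, the usual fix in Spark is an explicit exclusion in project/MimaExcludes.scala. A hypothetical entry (the exact problem type should be taken from the actual MiMa failure message, and is assumed here) might look like:

```scala
// Hypothetical MiMa exclusion for making CommitDeniedException private[spark];
// the problem type (MissingClassProblem) and fully qualified name are assumptions
// to be confirmed against the real MiMa report.
ProblemFilters.exclude[MissingClassProblem](
  "org.apache.spark.executor.CommitDeniedException")
```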

@JoshRosen (Contributor)

LGTM.


SparkQA commented Mar 19, 2015

Test build #28892 has finished for PR 4850 at commit 866fc60.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class TaskCommitDenied(jobID: Int, partitionID: Int, attemptID: Int) extends TaskFailedReason
    • class ExecutorSource(threadPool: ThreadPoolExecutor, executorId: String) extends Source

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/28892/

@asfgit asfgit closed this in 0745a30 Mar 20, 2015
@rxin rxin deleted the executor branch March 20, 2015 02:23
5 participants