
[SPARK-7139][Streaming] Allow received block metadata to be saved to WAL and recovered on driver failure #5732

Closed
wants to merge 8 commits

Conversation

tdas
Contributor

@tdas tdas commented Apr 28, 2015

  • Enabled ReceivedBlockTracker WAL by default
  • Stored block metadata in the WAL
  • Optimized WALBackedBlockRDD by skipping block fetch when the block is known to not exist in Spark
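The second bullet is the core of the recovery story: block metadata is appended to a write-ahead log as it arrives, so a restarted driver can replay the log and rebuild its view of received blocks. A minimal in-memory sketch of that idea (`BlockMeta` and `MetadataWAL` are illustrative names, not Spark's actual `ReceivedBlockTracker` API):

```scala
// Hypothetical sketch: metadata for each received block is appended to a
// write-ahead log; on driver recovery the log is replayed to rebuild state.
// BlockMeta and MetadataWAL are illustrative names, not Spark internals.
case class BlockMeta(streamId: Int, blockId: String, numRecords: Long)

class MetadataWAL {
  private val log = scala.collection.mutable.ArrayBuffer.empty[BlockMeta]

  // In Spark this append is durable (e.g. on HDFS); here it is in-memory only.
  def write(meta: BlockMeta): Unit = log += meta

  // Replayed once on recovery to rebuild the tracker's per-stream state.
  def replay(): Seq[BlockMeta] = log.toSeq
}

val wal = new MetadataWAL
wal.write(BlockMeta(0, "input-0-0", 10))
wal.write(BlockMeta(0, "input-0-1", 20))
val recoveredByStream = wal.replay().groupBy(_.streamId)
```

Because the metadata (not the data itself) is what the driver loses on failure, replaying this small log is enough to reconnect the tracker to blocks whose payloads already sit in the data WAL.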

@tdas
Contributor Author

tdas commented Apr 28, 2015

@harishreedharan @jerryshao Take a look.
@pwendell Take a look at the BlockRDD as it touches core.

@SparkQA

SparkQA commented Apr 28, 2015

Test build #31106 has finished for PR 5732 at commit d06fa21.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.

@SparkQA

SparkQA commented Apr 28, 2015

Test build #31135 has finished for PR 5732 at commit 1bc5bc3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.

@SparkQA

SparkQA commented Apr 29, 2015

Test build #31266 has finished for PR 5732 at commit 5f67a59.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch adds the following new dependencies:
    • spark-unsafe_2.10-1.4.0-SNAPSHOT.jar

Conflicts:
	streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
	streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
	streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
	streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
	streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
	streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
@tdas
Contributor Author

tdas commented Apr 30, 2015

@jerryshao @harishreedharan Please take a look at this PR.

def getBlockFromBlockManager(): Option[Iterator[T]] = {
  ...
}
if (dataRead == null) {
Contributor Author


This whole thing just puts the old code within inner functions; it's not really a big change. The real change is that these functions are called selectively in lines 157..160.
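The selective dispatch described here might look like the following sketch (simplified, hypothetical signatures — the function names echo the PR but the bodies are stand-ins, not Spark's actual implementation):

```scala
// Simplified sketch: the old read paths become inner functions, and compute()
// chooses between them based on whether the block can still exist in Spark.
def compute(isBlockIdValid: Boolean): Iterator[Int] = {
  // Old BlockManager read path, now an inner function. Returns None on a miss.
  def getBlockFromBlockManager(): Option[Iterator[Int]] = None

  // Old WAL read path, now an inner function.
  def getBlockFromWriteAheadLog(): Iterator[Int] = Iterator(1, 2, 3)

  if (isBlockIdValid) {
    // Block may exist in Spark: try the BlockManager first, fall back to WAL.
    getBlockFromBlockManager().getOrElse(getBlockFromWriteAheadLog())
  } else {
    // Block is known absent: skip the fetch and read the WAL directly.
    getBlockFromWriteAheadLog()
  }
}
```

Skipping the BlockManager lookup when the block is known absent is the optimization from the PR description: it avoids a pointless (and potentially remote) fetch before falling back to the WAL.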

@SparkQA

SparkQA commented Apr 30, 2015

Test build #31360 has finished for PR 5732 at commit 466212c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.

logWarning("Multiple result types in block information, WAL information will be ignored.")
}
// Is WAL segment info present with all the blocks
val isWALSegmentInfoPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }
Contributor


Would it be better to rename this to isWALRecordHandleInfoPresent? Looking through this part of the code, some variables say "segment" but are actually handles; it might be better to keep them consistent. :)

Contributor Author


Done.

@SparkQA

SparkQA commented Apr 30, 2015

Test build #31446 has finished for PR 5732 at commit 637bc9c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.

@tdas
Contributor Author

tdas commented May 1, 2015

@harishreedharan Any thoughts?

@@ -40,21 +40,30 @@ private[streaming]
class WriteAheadLogBackedBlockRDDPartition(
    val index: Int,
    val blockId: BlockId,
    val isBlockIdValid: Boolean,
    val walRecordHandle: WriteAheadLogRecordHandle)
  extends Partition
Contributor


Can you add this param to the scaladoc?

Contributor Author


Good catch.
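The fix presumably documents the new parameter along these lines (a sketch; the wording and the `SketchPartition` name are mine, not the merged text):

```scala
/**
 * Partition class for WriteAheadLogBackedBlockRDD (simplified sketch).
 *
 * @param index           index of the partition
 * @param blockId         id of the block stored in the BlockManager
 * @param isBlockIdValid  whether the block may still exist in the BlockManager;
 *                        if false, the block fetch is skipped and the data is
 *                        read directly from the WAL record
 * @param walRecordHandle handle of the WAL record containing the block's data
 */
class SketchPartition(
    val index: Int,
    val blockId: String,
    val isBlockIdValid: Boolean,
    val walRecordHandle: String)

val p = new SketchPartition(0, "input-0-0", false, "wal-handle")
```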

@harishreedharan
Contributor

LGTM.

@@ -66,7 +67,9 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds
blockIds.foreach { blockId =>
  sc.env.blockManager.master.removeBlock(blockId)
}
if (_setInvalid) {
  _isValid = false
}
Contributor


So rather than having this, maybe the subclass could override isValid. It's a bit confusing as it stands.

Contributor Author


Will do. That is a more intuitive design.
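The override-based design being suggested could look like this sketch (the class names are hypothetical stand-ins for BlockRDD and WriteAheadLogBackedBlockRDD, not the merged code):

```scala
// Base RDD becomes invalid once its blocks are removed from the BlockManager.
class SimpleBlockRDD {
  private var valid = true
  def isValid: Boolean = valid
  def removeBlocks(): Unit = { valid = false }
}

// The WAL-backed subclass overrides isValid instead of relying on a
// _setInvalid flag: its data is always recoverable from the WAL, so removing
// the in-memory blocks does not make the RDD invalid.
class SimpleWALBackedBlockRDD extends SimpleBlockRDD {
  override def isValid: Boolean = true
}
```

Pushing the behavior into an override keeps the base class free of a configuration flag whose meaning only the subclass understands, which is what makes the flag version confusing.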

Contributor Author


Also, I will add unit tests to test this behavior for BlockRDD and WALBackedBlockRDD.

tdas added 3 commits May 2, 2015 02:16
Conflicts:
	streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
	streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
	streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
@tdas
Contributor Author

tdas commented May 5, 2015

I addressed @pwendell comments as well as added more unit tests to WriteAheadLogBackedBlockRDDSuite to get 100% code coverage over WriteAheadLogBackedBlockRDD. Also got 100% coverage of ReceivedBlockTracker.

@tdas
Contributor Author

tdas commented May 5, 2015

I will merge this when tests pass.

@SparkQA

SparkQA commented May 5, 2015

Test build #31817 has finished for PR 5732 at commit 575476e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Contributor Author

tdas commented May 5, 2015

retest this please.

@SparkQA

SparkQA commented May 5, 2015

Test build #31836 has finished for PR 5732 at commit 575476e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

asfgit pushed a commit that referenced this pull request May 5, 2015
… WAL and recovered on driver failure

- Enabled ReceivedBlockTracker WAL by default
- Stored block metadata in the WAL
- Optimized WALBackedBlockRDD by skipping block fetch when the block is known to not exist in Spark

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #5732 from tdas/SPARK-7139 and squashes the following commits:

575476e [Tathagata Das] Added more tests to get 100% coverage of the WALBackedBlockRDD
19668ba [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7139
685fab3 [Tathagata Das] Addressed comments in PR
637bc9c [Tathagata Das] Changed segment to handle
466212c [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7139
5f67a59 [Tathagata Das] Fixed HdfsUtils to handle append in local file system
1bc5bc3 [Tathagata Das] Fixed bug on unexpected recovery
d06fa21 [Tathagata Das] Enabled ReceivedBlockTracker by default, stored block metadata and optimized block fetching in WALBackedBlockRDD

(cherry picked from commit 1854ac3)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
@asfgit asfgit closed this in 1854ac3 May 5, 2015
jeanlyn pushed a commit to jeanlyn/spark that referenced this pull request May 28, 2015
jeanlyn pushed a commit to jeanlyn/spark that referenced this pull request Jun 12, 2015
nemccarthy pushed a commit to nemccarthy/spark that referenced this pull request Jun 19, 2015