[SPARK-7056][Streaming] Make the Write Ahead Log pluggable #5645
Conversation
Conflicts: streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
Test build #30802 has finished for PR 5645 at commit
Test build #30803 has finished for PR 5645 at commit
Test build #30814 has finished for PR 5645 at commit
Test build #30820 has finished for PR 5645 at commit
@jerryshao @harishreedharan Can you please take a look?
Test build #30865 has finished for PR 5645 at commit
…guration into it.
Test build #30885 has finished for PR 5645 at commit
Test build #30886 has finished for PR 5645 at commit
Test build #30909 has finished for PR 5645 at commit
Test build #30913 has finished for PR 5645 at commit
Test build #30914 has finished for PR 5645 at commit
var dataRead: ByteBuffer = null
var writeAheadLog: WriteAheadLog = null
try {
  val dummyDirectory = FileUtils.getTempDirectoryPath()
Why do we need to use dummyDirectory here? The WAL may not be file-based, so I'm not sure why this is needed.
The default WAL is file-based, so a log directory is needed for it to work. However, the log directory is not really needed for reading a particular record. But to read a single record you have to create a FileBasedWriteAheadLog object, which needs a log directory. Hence I am providing a dummy directory for this.
I know that this is a little awkward. This is the cost of defining a single interface for both writing and reading single records. Earlier there were two independent classes (WALWriter and WALRandomReader) used for these two purposes, which had different requirements. But since I am trying to make a single interface that can be used for all reading and writing, the log directory must be provided in the constructor of the default file-based WAL. This results in the awkwardness.
I don't quite like it myself, but it may practically be okay as long as we ensure that the FileBasedWAL does not create unnecessary directories/files when only reading a single record. I can add a test to ensure that.
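The awkwardness being discussed can be sketched with simplified stand-ins. The class and method names below (`WAL`, `FileBasedWAL`, `read`) are illustrative only, not Spark's actual API; the point is that a single read/write interface forces even read-only callers to supply a log directory, so a throwaway path is passed:

```java
import java.nio.ByteBuffer;

// Simplified stand-in for the pluggable WAL interface discussed above.
interface WAL {
    ByteBuffer read(long offset);  // read a single record
}

// A file-based WAL whose constructor requires a log directory,
// even when the caller only ever reads single records.
class FileBasedWAL implements WAL {
    private final String logDirectory;

    FileBasedWAL(String logDirectory) {
        // Important: do NOT create the directory eagerly, so that
        // passing a dummy directory for read-only use is harmless.
        this.logDirectory = logDirectory;
    }

    @Override
    public ByteBuffer read(long offset) {
        // A real implementation would open the file segment at `offset`.
        return ByteBuffer.wrap(new byte[] {42});
    }
}

public class DummyDirExample {
    public static void main(String[] args) {
        // Reading a single record still requires a log directory,
        // so a throwaway temp path is supplied.
        String dummyDirectory = System.getProperty("java.io.tmpdir");
        WAL wal = new FileBasedWAL(dummyDirectory);
        System.out.println(wal.read(0L).get());  // prints 42
    }
}
```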
@pwendell Please take a look.
I am taking a look at this. So far this looks good; I will post comments, if any, tomorrow.
 * to plug in your own custom implementation of a write ahead log.
 */
@org.apache.spark.annotation.DeveloperApi
public interface WriteAheadLog {
Is the idea behind keeping this a Java interface that it would be useful for Java implementations?
Yes. It's meant for users to create arbitrary implementations, and we want to stay backward compatible (Scala traits have pretty nasty corner cases).
 * a WriteAheadLog to read a written record.
 */
@org.apache.spark.annotation.DeveloperApi
public interface WriteAheadLogSegment extends java.io.Serializable {
I wonder if we should more explicitly build the serialization of these segment identifiers into this interface. One extreme option is to have the segment identifiers actually be byte buffers and ask the user to deal with serializing them on their own.
The main concerns I have are the following:
- Individual implementations of this must be Java-serializable, but it's not possible to reflect that in the interface.
- If those implementations want to evolve over different versions, for instance by adding a new field to the segment identifier, it will be tricky for them to do so in a way that's backwards compatible (they'll have to write custom externalization logic, which isn't really meant for backwards compatibility).
Also, could we call this a WALSegmentHandle or something? This isn't the segment itself; it's just an identifier.
- Advanced users who want to implement their own WAL will have to ensure that the segment info is serializable, no matter whether we expose an interface or a ByteBuffer. In fact, exposing an interface saves them from writing the code to serialize and return a ByteBuffer in the usual case, which is easier to use. Also, this interface is expected to be called no faster than 100s of times per second, so it does not require super-high serialization efficiency. Even if they want that, they can always make the implementation extend Externalizable.
- That is a good point. There are easy workarounds even if we don't make this a ByteBuffer. They can put a ByteBuffer within their implementation: class MyWALSegment(byteBuffer: ByteBuffer) extends WALSegment. Now, for people who don't care about backward compatibility, making it a ByteBuffer makes it harder for them to implement. For those who do care about backward compatibility, they will have to write custom externalization logic either way, whether returning a ByteBuffer or returning MyWALSegment.
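The wrapper workaround described here can be sketched in Java. `WALSegment` and `MyWALSegment` are the hypothetical names from this discussion, not Spark's final API; the sketch shows that Java serialization just carries the wrapper, while the implementor fully controls the wire format of the opaque payload:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;

// Hypothetical marker interface, mirroring the discussed WALSegment.
interface WALSegment extends Serializable {}

// The segment keeps all real data in an opaque byte array, so the
// implementor controls the payload encoding (could be protobuf, Kryo, etc.)
// while plain Java serialization carries only the wrapper.
class MyWALSegment implements WALSegment {
    private static final long serialVersionUID = 1L;
    final byte[] payload;
    MyWALSegment(byte[] payload) { this.payload = payload; }
}

public class SegmentRoundTrip {
    public static void main(String[] args) throws Exception {
        MyWALSegment original = new MyWALSegment(new byte[] {1, 2, 3});

        // Java-serialize the wrapper...
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(original);
        }
        // ...and read it back; the opaque payload survives untouched.
        ObjectInputStream ois =
            new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
        MyWALSegment copy = (MyWALSegment) ois.readObject();

        System.out.println(Arrays.equals(original.payload, copy.payload));  // prints true
    }
}
```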
But in the current approach, they can't for instance use kryo or protobuf to serialize, unless they do something really crazy like use an externalizable hook to then call into Kryo. I guess I'm just thinking ahead to how this will evolve. However, if we want to have this in the future we can always create an alternative version that is additive, so I don't feel strongly at all
Actually, this is an interface, so I am not sure we can add an alternate method without breaking binary compatibility.
Well, they could leave the serialization of MyWALSegment to Java, with the class being just a wrapper for a ByteBuffer/byte array which contains all the real data. If that sounds too complicated, then maybe we should do bytes. And probably we should simply use a byte array instead of a ByteBuffer, as we probably don't need to deal with direct byte buffers here.
I added some comments on the public interface. The main one is about whether we use opaque buffers to make the serialization of the segment identifier more explicit.
I don't have a strong opinion either way; I just don't see much benefit in a ByteBuffer. Rather, it makes it slightly harder for naive users to implement their own WAL, while making it no more or less difficult for advanced users who care about backward compatibility.
Offline conversation with @pwendell: he thinks it's fine either way, as long as there is a way to add more methods in the future. To enable that, I am making the interfaces into abstract classes, so that we can add more methods with minimal effect on binary compatibility.
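The binary-compatibility argument can be illustrated with a toy example (the class names below are illustrative, not Spark's): a method added to a Java interface breaks every existing implementor, while a method added with a body on an abstract class is simply inherited, so subclasses compiled against the older version keep working:

```java
// An abstract class can gain new methods with default bodies; existing
// subclasses compiled against the old version keep working, because the
// call resolves to the inherited implementation on the superclass.
abstract class WriteAheadLogBase {
    abstract byte[] read(long handle);

    // Imagine this was added in a later version: subclasses need not
    // override it, so they neither break at compile nor at link time.
    public void clean(long threshTime) {
        // default: no-op
    }
}

class MyLog extends WriteAheadLogBase {
    @Override
    byte[] read(long handle) { return new byte[0]; }
}

public class AbstractClassEvolution {
    public static void main(String[] args) {
        WriteAheadLogBase log = new MyLog();
        log.clean(0L);  // resolves to the inherited default; no recompile needed
        System.out.println("ok");
    }
}
```

(Note this PR predates Java 8 default methods, which later offered a similar escape hatch for interfaces.)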
Test build #31187 has finished for PR 5645 at commit
@@ -96,7 +96,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
 */
private[streaming] case class WriteAheadLogBasedStoreResult(
    blockId: StreamBlockId,
-    segment: WriteAheadLogFileSegment
+    segment: WriteAheadLogRecordHandle
This is a leftover from the name change, segment --> handle. If there is no other change needed, I can take care of it in the next PR #5732
On second thought, I am fixing this. This is a problem in too many places.
Test build #31210 has finished for PR 5645 at commit
Test build #31212 has finished for PR 5645 at commit
    s"Could not read data from write ahead log record ${partition.walRecordHandle}", e)
} finally {
  if (writeAheadLog != null) {
    writeAheadLog.close()
Maybe reset writeAheadLog to null after close to avoid unexpected behavior :)
writeAheadLog = null
Done.
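The close-and-null pattern agreed on above, in a self-contained sketch (the `SimpleLog` class here is a minimal stand-in for the real WriteAheadLog, not Spark code):

```java
// Minimal stand-in for the WAL being closed in the finally block.
class SimpleLog {
    private boolean closed = false;
    void close() { closed = true; }
    boolean isClosed() { return closed; }
}

public class CloseAndNull {
    public static void main(String[] args) {
        SimpleLog writeAheadLog = null;
        try {
            writeAheadLog = new SimpleLog();
            // ... read a record here ...
        } finally {
            if (writeAheadLog != null) {
                writeAheadLog.close();
                // Nulling the reference makes any accidental later use fail
                // fast with a NullPointerException instead of silently
                // touching a closed log.
                writeAheadLog = null;
            }
        }
        System.out.println(writeAheadLog == null);  // prints true
    }
}
```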
Test build #31230 has finished for PR 5645 at commit
}
/** Instantiate the class, either using single arg constructor or zero arg constructor */
private def instantiateClass(cls: Class[WriteAheadLog], conf: SparkConf): WriteAheadLog = {
I think Class[WriteAheadLog] should be changed to Class[_ <: WriteAheadLog].
It does not help much, as it just makes the thing look more complicated. Changed nonetheless for correctness.
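A Java rendering of the helper being discussed may make the bounded wildcard concrete. This is a simplified sketch, not Spark's actual Scala helper (which passes a SparkConf); `Log`, `ConfiguredLog`, and `PlainLog` are hypothetical names:

```java
import java.lang.reflect.Constructor;

interface Log {}

// Implementation with a single-arg (config) constructor.
class ConfiguredLog implements Log {
    final String conf;
    ConfiguredLog(String conf) { this.conf = conf; }
}

// Implementation with only a zero-arg constructor.
class PlainLog implements Log {
    PlainLog() {}
}

public class Instantiate {
    // The bounded wildcard mirrors Class[_ <: WriteAheadLog] in Scala:
    // callers may pass the Class token of any subtype of Log.
    static Log instantiateClass(Class<? extends Log> cls, String conf) throws Exception {
        try {
            // Prefer the single-arg constructor that accepts the config...
            Constructor<? extends Log> c = cls.getDeclaredConstructor(String.class);
            return c.newInstance(conf);
        } catch (NoSuchMethodException e) {
            // ...and fall back to the zero-arg constructor.
            return cls.getDeclaredConstructor().newInstance();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(instantiateClass(ConfiguredLog.class, "c").getClass().getSimpleName());
        System.out.println(instantiateClass(PlainLog.class, "c").getClass().getSimpleName());
    }
}
```

With the plain `Class<Log>` signature, passing `ConfiguredLog.class` would not even compile; the wildcard is what makes the call sites legal.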
retest this please
Test build #31272 has finished for PR 5645 at commit
blockLocations.getOrElse(
  HdfsUtils.getFileSegmentLocations(
    partition.segment.path, partition.segment.offset, partition.segment.length, hadoopConfig))
blockLocations.getOrElse {
It might make sense to add location info to the WALRecordHandle interface itself. This way, systems that are not HDFS, but still benefit from preferred locations can use it.
That's a good point. I wasn't super sure whether it is a good idea to have it in the interface in this version. We can add it later and maintain binary compatibility, as the RecordHandle is an abstract class. Also, it is still a developer API. For now, I am going to merge this in to unblock #5732.
Generally looks good. Posted one comment, which is not a blocker, but something we can consider adding later too.
@harishreedharan @jerryshao @pwendell Thanks for the feedback, y'all. Merging this.
Users may want the WAL data to be written to non-HDFS data storage systems. To allow that, we have to make the WAL pluggable. The following design doc outlines the plan. https://docs.google.com/a/databricks.com/document/d/1A2XaOLRFzvIZSi18i_luNw5Rmm9j2j4AigktXxIYxmY/edit?usp=sharing
Things to add.
* Unit tests for WriteAheadLogUtils
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes apache#5645 from tdas/wal-pluggable and squashes the following commits:
2c431fd [Tathagata Das] Minor fixes.
c2bc738 [Tathagata Das] More changes based on PR comments.
569a416 [Tathagata Das] fixed long line
bde26b1 [Tathagata Das] Renamed segment to record handle everywhere
b65e155 [Tathagata Das] More changes based on PR comments.
d7cd15b [Tathagata Das] Fixed test
1a32a4b [Tathagata Das] Fixed test
e0d19fb [Tathagata Das] Fixed defaults
9310cbf [Tathagata Das] style fix.
86abcb1 [Tathagata Das] Refactored WriteAheadLogUtils, and consolidated all WAL related configuration into it.
84ce469 [Tathagata Das] Added unit test and fixed compilation error.
bce5e75 [Tathagata Das] Fixed long lines.
837c4f5 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into wal-pluggable
754fbf8 [Tathagata Das] Added license and docs.
09bc6fe [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into wal-pluggable
7dd2d4b [Tathagata Das] Added pluggable WriteAheadLog interface, and refactored all code along with it