[SPARK-19517][SS] KafkaSource fails to initialize partition offsets
## What changes were proposed in this pull request?

This patch fixes a bug in `KafkaSource` with the (de)serialization of the length of the JSON string that contains the initial partition offsets.
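
For context, the root cause is that the length prefix was written with `OutputStream.write(Int)`, which emits only the low-order byte of its argument, so any offsets JSON longer than 255 bytes was stored with a wrong length and could not be read back on recovery. A minimal sketch of the old failure mode, using in-memory streams instead of the real `HDFSMetadataLog` (the object name is illustrative):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.charset.StandardCharsets

object LengthByteBugSketch extends App {
  // Stand-in for an offsets JSON that is longer than 255 bytes.
  val json = "x" * 300
  val bytes = json.getBytes(StandardCharsets.UTF_8)

  // Pre-fix serialization: write(Int) keeps only the low-order byte,
  // so the stored length is 300 & 0xFF == 44.
  val out = new ByteArrayOutputStream()
  out.write(bytes.length)
  out.write(bytes)

  // Pre-fix deserialization: reads back 44, not 300, and drops the rest.
  val in = new ByteArrayInputStream(out.toByteArray)
  val length = in.read()
  val truncated = new Array[Byte](length)
  in.read(truncated)

  assert(length == 44)
  assert(new String(truncated, StandardCharsets.UTF_8) != json)
}
```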

## How was this patch tested?

I ran the test suite for spark-sql-kafka-0-10.

Author: Roberto Agostino Vitillo <ra.vitillo@gmail.com>

Closes #16857 from vitillo/kafka_source_fix.
vitillo authored and zsxwing committed Feb 17, 2017
1 parent 4cc06f4 commit 1a3f5f8
Showing 4 changed files with 131 additions and 7 deletions.
1 change: 1 addition & 0 deletions dev/.rat-excludes
@@ -105,3 +105,4 @@ org.apache.spark.scheduler.ExternalClusterManager
org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
spark-warehouse
structured-streaming/*
+kafka-source-initial-offset-version-2.1.0.bin
KafkaSource.scala
@@ -21,6 +21,7 @@ import java.{util => ju}
import java.io._
import java.nio.charset.StandardCharsets

+import org.apache.commons.io.IOUtils
import org.apache.kafka.common.TopicPartition

import org.apache.spark.SparkContext
@@ -97,16 +98,31 @@ private[kafka010] class KafkaSource(
    val metadataLog =
      new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath) {
        override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
-          val bytes = metadata.json.getBytes(StandardCharsets.UTF_8)
-          out.write(bytes.length)
-          out.write(bytes)
+          out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
+          val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+          writer.write(VERSION)
+          writer.write(metadata.json)
+          writer.flush
        }

        override def deserialize(in: InputStream): KafkaSourceOffset = {
-          val length = in.read()
-          val bytes = new Array[Byte](length)
-          in.read(bytes)
-          KafkaSourceOffset(SerializedOffset(new String(bytes, StandardCharsets.UTF_8)))
+          in.read() // A zero byte is read to support Spark 2.1.0 (SPARK-19517)
+          val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
+          // HDFSMetadataLog guarantees that it never creates a partial file.
+          assert(content.length != 0)
+          if (content(0) == 'v') {
+            if (content.startsWith(VERSION)) {
+              KafkaSourceOffset(SerializedOffset(content.substring(VERSION.length)))
+            } else {
+              val versionInFile = content.substring(0, content.indexOf("\n"))
+              throw new IllegalStateException(
+                s"Unsupported format. Expected version is ${VERSION.stripLineEnd} " +
+                s"but was $versionInFile. Please upgrade your Spark.")
+            }
+          } else {
+            // The log was generated by Spark 2.1.0
+            KafkaSourceOffset(SerializedOffset(content))
+          }
        }
      }

@@ -335,6 +351,8 @@ private[kafka010] object KafkaSource {
      | source option "failOnDataLoss" to "false".
    """.stripMargin

+  private val VERSION = "v1\n"

  def getSortedExecutorList(sc: SparkContext): Array[String] = {
    val bm = sc.env.blockManager
    bm.master.getPeers(bm.blockManagerId).toArray
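
Taken together, the serializer above produces a single zero byte followed by the UTF-8 text `v1\n` and the offsets JSON. The zero byte also covers downgrades: a Spark 2.1.0 reader interprets it as a zero-length payload and fails with an `IllegalArgumentException` instead of silently loading a corrupt offset, which is what the first new test below simulates. A self-contained sketch of the round trip, assuming commons-io on the classpath as in this module (object name and sample JSON are illustrative):

```scala
import java.io._
import java.nio.charset.StandardCharsets

import org.apache.commons.io.IOUtils

object VersionedOffsetFormatSketch extends App {
  val VERSION = "v1\n"
  val json = """{"topic-a":{"0":10,"1":20}}""" // stand-in for metadata.json

  // Serialize: zero byte (Spark 2.1.0 guard), version marker, then the JSON.
  val out = new ByteArrayOutputStream()
  out.write(0)
  val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
  writer.write(VERSION)
  writer.write(json)
  writer.flush()

  // Deserialize: skip the zero byte, then strip the version prefix.
  val in = new ByteArrayInputStream(out.toByteArray)
  in.read()
  val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
  assert(content.startsWith(VERSION))
  assert(content.substring(VERSION.length) == json)
}
```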
kafka-source-initial-offset-version-2.1.0.bin
@@ -0,0 +1 @@
2{"kafka-initial-offset-2-1-0":{"2":0,"1":0,"0":0}}
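
Note that the leading `2` in this fixture is not part of the JSON: it is the single length byte written by the Spark 2.1.0 serializer, and since the JSON happens to be exactly 50 characters long, that byte is 50, the ASCII code of the character `2`. A quick check of the arithmetic:

```scala
val json = """{"kafka-initial-offset-2-1-0":{"2":0,"1":0,"0":0}}"""
assert(json.length == 50) // the length byte Spark 2.1.0 would write
assert(50.toChar == '2')  // ...which renders as the printable character '2'
```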
KafkaSourceSuite.scala
@@ -17,7 +17,9 @@

package org.apache.spark.sql.kafka010

import java.io._
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Paths}
import java.util.Properties
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger
@@ -141,6 +143,108 @@ class KafkaSourceSuite extends KafkaSourceTest {

  private val topicId = new AtomicInteger(0)

  testWithUninterruptibleThread(
    "deserialization of initial offset with Spark 2.1.0") {
    withTempDir { metadataPath =>
      val topic = newTopic
      testUtils.createTopic(topic, partitions = 3)

      val provider = new KafkaSourceProvider
      val parameters = Map(
        "kafka.bootstrap.servers" -> testUtils.brokerAddress,
        "subscribe" -> topic
      )
      val source = provider.createSource(spark.sqlContext, metadataPath.getAbsolutePath, None,
        "", parameters)
      source.getOffset.get // Write initial offset

      // Make sure Spark 2.1.0 will throw an exception when reading the new log
      intercept[java.lang.IllegalArgumentException] {
        // Simulate how Spark 2.1.0 reads the log
        val in = new FileInputStream(metadataPath.getAbsolutePath + "/0")
        val length = in.read()
        val bytes = new Array[Byte](length)
        in.read(bytes)
        KafkaSourceOffset(SerializedOffset(new String(bytes, UTF_8)))
      }
    }
  }

  testWithUninterruptibleThread("deserialization of initial offset written by Spark 2.1.0") {
    withTempDir { metadataPath =>
      val topic = "kafka-initial-offset-2-1-0"
      testUtils.createTopic(topic, partitions = 3)

      val provider = new KafkaSourceProvider
      val parameters = Map(
        "kafka.bootstrap.servers" -> testUtils.brokerAddress,
        "subscribe" -> topic
      )

      val from = Paths.get(
        getClass.getResource("/kafka-source-initial-offset-version-2.1.0.bin").getPath)
      val to = Paths.get(s"${metadataPath.getAbsolutePath}/0")
      Files.copy(from, to)

      val source = provider.createSource(spark.sqlContext, metadataPath.getAbsolutePath, None,
        "", parameters)
      val deserializedOffset = source.getOffset.get
      val referenceOffset = KafkaSourceOffset((topic, 0, 0L), (topic, 1, 0L), (topic, 2, 0L))
      assert(referenceOffset == deserializedOffset)
    }
  }

  testWithUninterruptibleThread("deserialization of initial offset written by future version") {
    withTempDir { metadataPath =>
      val futureMetadataLog =
        new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession,
          metadataPath.getAbsolutePath) {
          override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
            out.write(0)
            val writer = new BufferedWriter(new OutputStreamWriter(out, UTF_8))
            writer.write(s"v0\n${metadata.json}")
            writer.flush
          }
        }

      val topic = newTopic
      testUtils.createTopic(topic, partitions = 3)
      val offset = KafkaSourceOffset((topic, 0, 0L), (topic, 1, 0L), (topic, 2, 0L))
      futureMetadataLog.add(0, offset)

      val provider = new KafkaSourceProvider
      val parameters = Map(
        "kafka.bootstrap.servers" -> testUtils.brokerAddress,
        "subscribe" -> topic
      )
      val source = provider.createSource(spark.sqlContext, metadataPath.getAbsolutePath, None,
        "", parameters)

      val e = intercept[java.lang.IllegalStateException] {
        source.getOffset.get // Read initial offset
      }

      assert(e.getMessage.contains("Please upgrade your Spark"))
    }
  }

  test("(de)serialization of initial offsets") {
    val topic = newTopic()
    testUtils.createTopic(topic, partitions = 64)

    val reader = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
      .option("subscribe", topic)

    testStream(reader.load)(
      makeSureGetOffsetCalled,
      StopStream,
      StartStream(),
      StopStream)
  }

  test("maxOffsetsPerTrigger") {
    val topic = newTopic()
    testUtils.createTopic(topic, partitions = 3)
