Apply data source v2 changes #576

Closed · wants to merge 71 commits

Changes from all commits (71 commits)
ae0e2ca
[SPARK-26865][SQL] DataSourceV2Strategy should push normalized filters
dongjoon-hyun Feb 14, 2019
dc26348
[SPARK-26666][SQL] Support DSv2 overwrite and dynamic partition overw…
rdblue Feb 18, 2019
af5d187
[SPARK-26785][SQL] data source v2 API refactor: streaming write
cloud-fan Feb 19, 2019
167ffec
[SPARK-24252][SQL] Add v2 catalog plugin system
rdblue Mar 8, 2019
3da923b
[SPARK-26946][SQL] Identifiers for multi-catalog
jzhuge Mar 22, 2019
85d0f08
[SPARK-27250][TEST-MAVEN][BUILD] Scala 2.11 maven compile should targ…
jzhuge Mar 24, 2019
4db1e19
[SPARK-26673][FOLLOWUP][SQL] File Source V2: check existence of outpu…
gengliangwang Feb 26, 2019
2733301
[SPARK-26952][SQL] Row count statics should respect the data reported…
ConeyLiu Feb 26, 2019
4094211
[SPARK-26871][SQL] File Source V2: avoid creating unnecessary FileInd…
gengliangwang Feb 15, 2019
caa5fab
[SPARK-26744][SQL] Support schema validation in FileDataSourceV2 fram…
gengliangwang Feb 16, 2019
d2f0dd5
[SPARK-26956][SS] remove streaming output mode from data source v2 APIs
cloud-fan Mar 4, 2019
49dd067
[SPARK-26389][SS] Add force delete temp checkpoint configuration
gaborgsomogyi Feb 8, 2019
1f5d3d4
[SPARK-26824][SS] Fix the checkpoint location and _spark_metadata whe…
zsxwing Feb 20, 2019
982df04
[SPARK-27111][SS] Fix a race that a continuous query may fail with In…
zsxwing Mar 9, 2019
38556e7
[SPARK-24063][SS] Add maximum epoch queue threshold for ContinuousExe…
gaborgsomogyi Feb 27, 2019
3fecdd9
[SPARK-27064][SS] create StreamingWrite at the beginning of streaming…
cloud-fan Mar 13, 2019
1609b3f
[SPARK-27106][SQL] merge CaseInsensitiveStringMap and DataSourceOptions
cloud-fan Mar 13, 2019
8f9c5ac
[SPARK-26811][SQL] Add capabilities to v2.Table
rdblue Mar 18, 2019
c46db75
[SPARK-27209][SQL] Split parsing of SELECT and INSERT into two top-le…
dilipbiswal Mar 26, 2019
bc6ece7
Revert "[SPARK-27209][SQL] Split parsing of SELECT and INSERT into tw…
mccheah May 15, 2019
b9a2061
[SPARK-27209][SQL] Split parsing of SELECT and INSERT into two top-le…
dilipbiswal Mar 26, 2019
e68a36c
Revert "[SPARK-27209][SQL] Split parsing of SELECT and INSERT into tw…
mccheah May 15, 2019
942ac18
[SPARK-26215][SQL] Define reserved/non-reserved keywords based on the…
maropu Feb 22, 2019
4a7c007
[SPARK-26215][SQL][FOLLOW-UP][MINOR] Fix the warning from ANTR4
dilipbiswal Mar 1, 2019
6cb9234
[SPARK-26982][SQL] Enhance describe framework to describe the output …
dilipbiswal Mar 2, 2019
f0d9915
[SPARK-27108][SQL] Add parsed SQL plans for create, CTAS.
rdblue Mar 22, 2019
0f9ac2a
[SPARK-27181][SQL] Add public transform API
rdblue Apr 10, 2019
d70253e
[SPARK-24252][SQL] Add TableCatalog API
rdblue May 8, 2019
d2b526c
[SPARK-24923][SQL] Implement v2 CreateTableAsSelect
rdblue May 15, 2019
e133b92
Fix scala 2.11 compilation
mccheah May 15, 2019
6dbc1d3
Fix style
mccheah May 15, 2019
d49a179
[SPARK-27162][SQL] Add new method asCaseSensitiveMap in CaseInsensiti…
gengliangwang Mar 19, 2019
affb14b
Fix compilation
mccheah May 15, 2019
4661671
More Scala 2.11 stuff
mccheah May 15, 2019
ee834f7
[SPARK-26744][SPARK-26744][SQL][HOTFOX] Disable schema validation tes…
HyukjinKwon Feb 18, 2019
294eaef
Merge remote-tracking branch 'palantir/apply-dsv2-changes' into dsv2-…
mccheah Jun 6, 2019
5e7eb12
[SPARK-26811][SQL][FOLLOWUP] fix some documentation
cloud-fan Apr 4, 2019
57153b4
[MINOR][TEST][DOC] Execute action miss name message
uncleGen Apr 27, 2019
0c2d6aa
[SPARK-27576][SQL] table capability to skip the output column resolution
cloud-fan May 16, 2019
d3e9b94
[SPARK-26356][SQL] remove SaveMode from data source v2
cloud-fan May 24, 2019
c0ffa90
Fix compilation issues
mccheah May 24, 2019
c968388
Fix scalastyle
mccheah May 24, 2019
d7e3943
[SPARK-27521][SQL] Move data source v2 to catalyst module
cloud-fan Jun 5, 2019
e0edb6c
Fix merge conflicts
mccheah Jun 6, 2019
c7c5d84
[SPARK-27732][SQL] Add v2 CreateTable implementation.
rdblue May 24, 2019
6244b77
[SPARK-26946][SQL][FOLLOWUP] Require lookup function
jzhuge May 30, 2019
0db2aa0
[SPARK-27813][SQL] DataSourceV2: Add DropTable logical operation
jzhuge May 30, 2019
d9e0cca
[SPARK-27103][SQL][MINOR] List SparkSql reserved keywords in alphabet…
SongYadong Mar 8, 2019
e1365ba
[SPARK-27857][SQL] Move ALTER TABLE parsing into Catalyst
rdblue Jun 5, 2019
1a45142
Fix merge conflicts
mccheah Jun 6, 2019
8bcc74d
Revert "Fix merge conflicts"
mccheah Jun 6, 2019
a3debfd
Revert "[SPARK-27857][SQL] Move ALTER TABLE parsing into Catalyst"
mccheah Jun 6, 2019
7c1eb92
Revert "[SPARK-27103][SQL][MINOR] List SparkSql reserved keywords in …
mccheah Jun 6, 2019
cfa37b0
[SPARK-27675][SQL] do not use MutableColumnarRow in ColumnarBatch
cloud-fan May 12, 2019
d8f503e
[MINOR] Move java file to java directory
ConeyLiu Mar 28, 2019
b28de53
[SPARK-27190][SQL] add table capability for streaming
cloud-fan Apr 26, 2019
a022526
Fix merge conflicts
mccheah Jun 6, 2019
4e5087f
[SPARK-23014][SS] Fully remove V1 memory sink.
gaborgsomogyi Apr 29, 2019
287e9d7
Fix merge conflicts
mccheah Jun 6, 2019
d97de74
[SPARK-27579][SQL] remove BaseStreamingSource and BaseStreamingSink
cloud-fan May 6, 2019
12746c1
[SPARK-27642][SS] make v1 offset extends v2 offset
cloud-fan May 8, 2019
c99b896
Fix imports
mccheah Jun 6, 2019
5d2096e
[SPARK-27693][SQL] Add default catalog property
rdblue May 20, 2019
f7e63d6
Fix merge conflicts
mccheah Jun 6, 2019
876d1a0
Revert "Fix merge conflicts"
mccheah Jun 6, 2019
b714508
FIx merge conflicts again
mccheah Jun 6, 2019
c018fba
Fix style
mccheah Jun 6, 2019
2cd8078
Fix test build.
mccheah Jun 6, 2019
5346dcf
[SPARK-27411][SQL] DataSourceV2Strategy should not eliminate subquery
francis0407 Apr 9, 2019
17bb20c
Fix merge conflict
mccheah Jun 7, 2019
5a8ea0b
Fix build
mccheah Jun 7, 2019
14 changes: 14 additions & 0 deletions core/src/main/scala/org/apache/spark/TestUtils.scala
Original file line number Diff line number Diff line change
@@ -192,6 +192,20 @@ private[spark] object TestUtils {
assert(listener.numSpilledStages == 0, s"expected $identifier to not spill, but did")
}

/**
* Asserts that the exception message contains the given message. Note that this checks
* all exceptions in the cause chain.
*/
def assertExceptionMsg(exception: Throwable, msg: String): Unit = {
var e = exception
var contains = e.getMessage.contains(msg)
while (e.getCause != null && !contains) {
e = e.getCause
contains = e.getMessage.contains(msg)
}
assert(contains, s"Exception tree doesn't contain the expected message: $msg")
}

/**
* Test if a command is available.
*/
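
A minimal usage sketch of the new helper (the exception values are hypothetical; `TestUtils` is `private[spark]`, so this only compiles inside Spark's own test sources):

```scala
import org.apache.spark.TestUtils

// Hypothetical nested failure: the interesting message sits one cause deep.
val inner = new IllegalStateException("partition 3 has no committed offset")
val outer = new RuntimeException("stream execution failed", inner)

// Passes: assertExceptionMsg walks getCause until it finds the message or runs out of causes.
TestUtils.assertExceptionMsg(outer, "partition 3 has no committed offset")
```
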
2 changes: 2 additions & 0 deletions docs/_data/menu-sql.yaml
Original file line number Diff line number Diff line change
@@ -70,6 +70,8 @@
url: sql-migration-guide-upgrade.html
- text: Compatibility with Apache Hive
url: sql-migration-guide-hive-compatibility.html
- text: SQL Reserved/Non-Reserved Keywords
url: sql-reserved-and-non-reserved-keywords.html
- text: Reference
url: sql-reference.html
subitems:
575 changes: 575 additions & 0 deletions docs/sql-reserved-and-non-reserved-keywords.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/structured-streaming-programming-guide.md
Original file line number Diff line number Diff line change
@@ -2980,7 +2980,7 @@ the effect of the change is not well-defined. For all of them:

- Changes to the user-defined foreach sink (that is, the `ForeachWriter` code) are allowed, but the semantics of the change depends on the code.

- *Changes in projection / filter / map-like operations**: Some cases are allowed. For example:
- *Changes in projection / filter / map-like operations*: Some cases are allowed. For example:

- Addition / deletion of filters is allowed: `sdf.selectExpr("a")` to `sdf.where(...).selectExpr("a").filter(...)`.

Original file line number Diff line number Diff line change
@@ -37,8 +37,7 @@ import org.apache.spark.sql.sources.v2.reader.streaming._
* @param offsetReader a reader used to get kafka offsets. Note that the actual data will be
* read by per-task consumers generated later.
* @param kafkaParams String params for per-task Kafka consumers.
* @param sourceOptions The [[org.apache.spark.sql.sources.v2.DataSourceOptions]] params which
* are not Kafka consumer params.
* @param sourceOptions Params which are not Kafka consumer params.
* @param metadataPath Path to a directory this reader can use for writing metadata.
* @param initialOffsets The Kafka offsets to start reading data at.
* @param failOnDataLoss Flag indicating whether reading should fail in data loss
@@ -77,7 +76,7 @@ class KafkaContinuousStream(
}

override def planInputPartitions(start: Offset): Array[InputPartition] = {
val oldStartPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(start)
val oldStartPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets

val currentPartitionSet = offsetReader.fetchEarliestOffsets().keySet
val newPartitions = currentPartitionSet.diff(oldStartPartitionOffsets.keySet)
Original file line number Diff line number Diff line change
@@ -33,9 +33,9 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
import org.apache.spark.sql.execution.streaming.sources.RateControlMicroBatchStream
import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.UninterruptibleThread

/**
@@ -57,7 +57,7 @@ import org.apache.spark.util.UninterruptibleThread
private[kafka010] class KafkaMicroBatchStream(
kafkaOffsetReader: KafkaOffsetReader,
executorKafkaParams: ju.Map[String, Object],
options: DataSourceOptions,
options: CaseInsensitiveStringMap,
metadataPath: String,
startingOffsets: KafkaOffsetRangeLimit,
failOnDataLoss: Boolean) extends RateControlMicroBatchStream with Logging {
@@ -66,8 +66,7 @@ private[kafka010] class KafkaMicroBatchStream(
"kafkaConsumer.pollTimeoutMs",
SparkEnv.get.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L)

private val maxOffsetsPerTrigger =
Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
private val maxOffsetsPerTrigger = Option(options.get("maxOffsetsPerTrigger")).map(_.toLong)

private val rangeCalculator = KafkaOffsetRangeCalculator(options)

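
The simplification above works because `CaseInsensitiveStringMap` is a plain `java.util.Map[String, String]` whose `get` returns `null` for missing keys, whereas the old `DataSourceOptions.get` returned an `Optional`. A small sketch of the new lookup pattern (the option values are hypothetical):

```scala
import scala.collection.JavaConverters._

import org.apache.spark.sql.util.CaseInsensitiveStringMap

// CaseInsensitiveStringMap implements java.util.Map[String, String] and lower-cases keys,
// so Option(map.get(key)) handles both present and missing keys.
val options = new CaseInsensitiveStringMap(Map("maxOffsetsPerTrigger" -> "1000").asJava)

val maxOffsets: Option[Long] = Option(options.get("maxOffsetsPerTrigger")).map(_.toLong) // Some(1000L)
val minPartitions: Option[Int] = Option(options.get("minPartitions")).map(_.toInt)       // None
```
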
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@ package org.apache.spark.sql.kafka010

import org.apache.kafka.common.TopicPartition

import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.util.CaseInsensitiveStringMap


/**
@@ -91,8 +91,8 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int

private[kafka010] object KafkaOffsetRangeCalculator {

def apply(options: DataSourceOptions): KafkaOffsetRangeCalculator = {
val optionalValue = Option(options.get("minPartitions").orElse(null)).map(_.toInt)
def apply(options: CaseInsensitiveStringMap): KafkaOffsetRangeCalculator = {
val optionalValue = Option(options.get("minPartitions")).map(_.toInt)
new KafkaOffsetRangeCalculator(optionalValue)
}
}
Original file line number Diff line number Diff line change
@@ -20,14 +20,14 @@ package org.apache.spark.sql.kafka010
import org.apache.kafka.common.TopicPartition

import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2, PartitionOffset}
import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset

/**
* An [[Offset]] for the [[KafkaSource]]. This one tracks all partitions of subscribed topics and
* their offsets.
*/
private[kafka010]
case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends OffsetV2 {
case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends Offset {

override val json = JsonUtils.partitionOffsets(partitionToOffsets)
}
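
The alias to `OffsetV2` is no longer needed because SPARK-27642 (in the commit list above) makes the internal v1 `Offset` extend the public v2 `Offset`. A minimal sketch of a custom offset under that assumption (the class and JSON shape are made up):

```scala
import org.apache.spark.sql.execution.streaming.Offset

// With the v1 Offset extending the v2 Offset, a source-specific offset only
// needs to provide its JSON representation once for both code paths.
case class CounterOffset(count: Long) extends Offset {
  override val json: String = s"""{"count":$count}"""
}
```
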
Original file line number Diff line number Diff line change
@@ -31,11 +31,14 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.TableCapability._
import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
import org.apache.spark.sql.sources.v2.writer.WriteBuilder
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
* The provider class for all Kafka readers and writers. It is designed such that it throws
@@ -47,7 +50,6 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
with StreamSinkProvider
with RelationProvider
with CreatableRelationProvider
with StreamingWriteSupportProvider
with TableProvider
with Logging {
import KafkaSourceProvider._
@@ -102,8 +104,8 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
failOnDataLoss(caseInsensitiveParams))
}

override def getTable(options: DataSourceOptions): KafkaTable = {
new KafkaTable(strategy(options.asMap().asScala.toMap))
override def getTable(options: CaseInsensitiveStringMap): KafkaTable = {
new KafkaTable(strategy(options.asScala.toMap))
}

/**
@@ -180,20 +182,6 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
}
}

override def createStreamingWriteSupport(
queryId: String,
schema: StructType,
mode: OutputMode,
options: DataSourceOptions): StreamingWriteSupport = {
import scala.collection.JavaConverters._

val topic = Option(options.get(TOPIC_OPTION_KEY).orElse(null)).map(_.trim)
// We convert the options argument from V2 -> Java map -> scala mutable -> scala immutable.
val producerParams = kafkaParamsForProducer(options.asMap.asScala.toMap)

new KafkaStreamingWriteSupport(topic, producerParams, schema)
}

private def strategy(caseInsensitiveParams: Map[String, String]) =
caseInsensitiveParams.find(x => STRATEGY_OPTION_KEYS.contains(x._1)).get match {
case ("assign", value) =>
@@ -365,23 +353,47 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
}

class KafkaTable(strategy: => ConsumerStrategy) extends Table
with SupportsMicroBatchRead with SupportsContinuousRead {
with SupportsRead with SupportsWrite {

override def name(): String = s"Kafka $strategy"

override def schema(): StructType = KafkaOffsetReader.kafkaSchema

override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new ScanBuilder {
override def capabilities(): ju.Set[TableCapability] = {
Set(MICRO_BATCH_READ, CONTINUOUS_READ, STREAMING_WRITE).asJava
}

override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new ScanBuilder {
override def build(): Scan = new KafkaScan(options)
}

override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
new WriteBuilder {
private var inputSchema: StructType = _

override def withInputDataSchema(schema: StructType): WriteBuilder = {
this.inputSchema = schema
this
}

override def buildForStreaming(): StreamingWrite = {
import scala.collection.JavaConverters._

assert(inputSchema != null)
val topic = Option(options.get(TOPIC_OPTION_KEY)).map(_.trim)
val producerParams = kafkaParamsForProducer(options.asScala.toMap)
new KafkaStreamingWrite(topic, producerParams, inputSchema)
}
}
}
}

class KafkaScan(options: DataSourceOptions) extends Scan {
class KafkaScan(options: CaseInsensitiveStringMap) extends Scan {

override def readSchema(): StructType = KafkaOffsetReader.kafkaSchema

override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
val parameters = options.asMap().asScala.toMap
val parameters = options.asScala.toMap
validateStreamOptions(parameters)
// Each running query should use its own group id. Otherwise, the query may be only assigned
// partial data since Kafka will assign partitions to multiple consumers having the same group
@@ -410,7 +422,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
}

override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
val parameters = options.asMap().asScala.toMap
val parameters = options.asScala.toMap
validateStreamOptions(parameters)
// Each running query should use its own group id. Otherwise, the query may be only assigned
// partial data since Kafka will assign partitions to multiple consumers having the same group
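
The `KafkaTable` changes above follow the general v2 pattern: a `Table` advertises its capabilities and hands out builders for reads and writes. A minimal read-only sketch against the interfaces as they appear in this PR (the class, name, and capability choice are illustrative, and it assumes the streaming conversions on `Scan` keep their default implementations):

```scala
import java.util.{Set => JSet}

import scala.collection.JavaConverters._

import org.apache.spark.sql.sources.v2.{SupportsRead, Table, TableCapability}
import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// A toy read-only table: capabilities() advertises what the source supports,
// and newScanBuilder() receives the per-scan options as a CaseInsensitiveStringMap.
class EchoTable(tableSchema: StructType) extends Table with SupportsRead {

  override def name(): String = "echo"

  override def schema(): StructType = tableSchema

  override def capabilities(): JSet[TableCapability] =
    Set(TableCapability.BATCH_READ).asJava

  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
    new ScanBuilder {
      override def build(): Scan = new Scan {
        override def readSchema(): StructType = tableSchema
        // A real source would also implement the physical read path here (e.g. toBatch).
      }
    }
}
```
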
Original file line number Diff line number Diff line change
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport}
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
import org.apache.spark.sql.types.StructType

/**
@@ -33,18 +33,18 @@ import org.apache.spark.sql.types.StructType
case object KafkaWriterCommitMessage extends WriterCommitMessage

/**
* A [[StreamingWriteSupport]] for Kafka writing. Responsible for generating the writer factory.
* A [[StreamingWrite]] for Kafka writing. Responsible for generating the writer factory.
*
* @param topic The topic this writer is responsible for. If None, topic will be inferred from
* a `topic` field in the incoming data.
* @param producerParams Parameters for Kafka producers in each task.
* @param schema The schema of the input data.
*/
class KafkaStreamingWriteSupport(
class KafkaStreamingWrite(
topic: Option[String],
producerParams: ju.Map[String, Object],
schema: StructType)
extends StreamingWriteSupport {
extends StreamingWrite {

validateQuery(schema.toAttributes, producerParams, topic)

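
Combined with the `newWriteBuilder` override shown earlier, the engine-side call sequence for obtaining a `StreamingWrite` such as `KafkaStreamingWrite` is roughly the following (a hedged sketch; the helper name and parameters are hypothetical):

```scala
import org.apache.spark.sql.sources.v2.SupportsWrite
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// How a streaming query obtains a StreamingWrite from a table
// that advertises STREAMING_WRITE.
def streamingWriteFor(
    table: SupportsWrite,
    options: CaseInsensitiveStringMap,
    inputSchema: StructType): StreamingWrite = {
  table.newWriteBuilder(options)
    .withInputDataSchema(inputSchema) // schema of the data the query will produce
    .buildForStreaming()
}
```
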