Support Spark 2.4.0 #426

Closed
wants to merge 4 commits into from
18 changes: 0 additions & 18 deletions .travis.yml
@@ -25,24 +25,6 @@ matrix:
- jdk: openjdk7
scala: 2.11.7
env: HADOOP_VERSION="2.2.0" SPARK_VERSION="2.0.0" SPARK_AVRO_VERSION="3.0.0" AWS_JAVA_SDK_VERSION="1.7.4"
env:
global:
# AWS_REDSHIFT_JDBC_URL
- secure: "RNkxdKcaKEYuJqxli8naazp42qO5/pgueIzs+J5rHwl39jcBvJMgW3DX8kT7duzdoBb/qrolj/ttbQ3l/30P45+djn0BEwcJMX7G/FGpZYD23yd03qeq7sOKPQl2Ni/OBttYHJMah5rI6aPmAysBZMQO7Wijdenb/RUiU2YcZp0="
# AWS_REDSHIFT_PASSWORD
- secure: "g5li3gLejD+/2BIqIm+qHiqBUvCc5l0qnftVaVlLtL7SffErp/twDiFP4gW8eqnFqi2GEC1c9Shf7Z9cOIUunNSBQZdYIVG0f38UfBeDP14nOoIuwZ974O5yggbgZhX0cKvJzINcENGoRNk0FzRwgOdCCiF05IMnRqQxI3C24fE="
# AWS_REDSHIFT_USER
- secure: "LIkY/ZpBXK3vSFsdpBSRXEsgfD2wDF52X8OZOlyBJOiZpS4y1/obj8b3VQABDPyPH95bGX/LOpM0vVM137rYgF0pskgVEzLMyZOPpwYqNGPf/d4BtQhBRc8f7+jmr6D4Hrox4jCl0cCKaeiTazun2+Y9E+zgCUDvQ8y9qGctR2k="
# TEST_AWS_ACCESS_KEY_ID
- secure: "bsB6YwkscUxtzcZOKja4Y69IR3JqvCP3W/4vFftW/v33/hOC3EBz7TVNKS+ZIomBUQYJnzsMfM59bj7YEc3KZe8WxIcUdLI40hg0X5O1RhJDNPW+0oGbWshmzyua+hY1y7nRja+8/17tYTbAi1+MhscRu+O/2aWaXolA9BicuX0="
# TEST_AWS_SECRET_ACCESS_KEY
- secure: "cGxnZh4be9XiPBOMxe9wHYwEfrWNw4zSjmvGFEC9UUV11ydHLo5wrXtcTVFmY7qxUxYeb0NB2N+CQXE0GcyUKoTviKG9sOS3cxR1q30FsdOVcWDKAzpBUmzDTMwDLAUMysziyOtMorDlNVydqYdYLMpiUN0O+eDKA+iOHlJp7fo="
# STS_ROLE_ARN
- secure: "cuyemI1bqPkWBD5B1FqIKDJb5g/SX5x8lrzkO0J/jkyGY0VLbHxrl5j/9PrKFuvraBK3HC56HEP1Zg+IMvh+uv0D+p5y14C97fAzE33uNgR2aVkamOo92zHvxvXe7zBtqc8rztWsJb1pgkrY7SdgSXgQc88ohey+XecDh4TahTY="
# AWS_S3_SCRATCH_SPACE
- secure: "LvndQIW6dHs6nyaMHtblGI/oL+s460lOezFs2BoD0Isenb/O/IM+nY5K9HepTXjJIcq8qvUYnojZX1FCrxxOXX2/+/Iihiq7GzJYdmdMC6hLg9bJYeAFk0dWYT88/AwadrJCBOa3ockRLhiO3dkai7Ki5+M1erfaFiAHHMpJxYQ="
# AWS_S3_CROSS_REGION_SCRATCH_SPACE
- secure: "esYmBqt256Dc77HT68zoaE/vtsFGk2N+Kt+52RlR0cjHPY1q5801vxLbeOlpYb2On3x8YckE++HadjL40gwSBsca0ffoogq6zTlfbJYDSQkQG1evxXWJZLcafB0igfBs/UbEUo7EaxoAJQcLgiWWwUdO0a0iU1ciSVyogZPagL0="

script:
- ./dev/run-tests-travis.sh
3 changes: 3 additions & 0 deletions README.md
@@ -3,6 +3,9 @@
[![Build Status](https://travis-ci.org/databricks/spark-redshift.svg?branch=master)](https://travis-ci.org/databricks/spark-redshift)
[![codecov.io](http://codecov.io/github/databricks/spark-redshift/coverage.svg?branch=master)](http://codecov.io/github/databricks/spark-redshift?branch=master)

## Disclaimer
This is a fork of Databricks's spark-redshift repository. Our custom changes have only been tested with Spark **2.4.0**; they may not work with older versions of Spark.

## Note

To ensure the best experience for our customers, we have decided to inline this connector directly in Databricks Runtime. The latest version of Databricks Runtime (3.0+) includes an advanced version of the RedShift connector for Spark that features both performance improvements (full query pushdown) as well as security improvements (automatic encryption). For more information, refer to the <a href="https://docs.databricks.com/spark/latest/data-sources/aws/amazon-redshift.html">Databricks documentation</a>. As a result, we will no longer be making releases separately from Databricks Runtime.
8 changes: 4 additions & 4 deletions project/SparkRedshiftBuild.scala
@@ -47,7 +47,7 @@ object SparkRedshiftBuild extends Build {
organization := "com.databricks",
scalaVersion := "2.11.7",
crossScalaVersions := Seq("2.10.5", "2.11.7"),
sparkVersion := "2.0.0",
sparkVersion := "2.4.0",
testSparkVersion := sys.props.get("spark.testVersion").getOrElse(sparkVersion.value),
testSparkAvroVersion := sys.props.get("sparkAvro.testVersion").getOrElse("3.0.0"),
testHadoopVersion := sys.props.get("hadoop.testVersion").getOrElse("2.2.0"),
@@ -64,7 +64,7 @@
"com.eclipsesource.minimal-json" % "minimal-json" % "0.9.4",
// We require spark-avro, but avro-mapred must be provided to match Hadoop version.
// In most cases, avro-mapred will be provided as part of the Spark assembly JAR.
"com.databricks" %% "spark-avro" % "3.0.0",
"org.apache.spark" %% "spark-avro" % sparkVersion.value,
if (testHadoopVersion.value.startsWith("1")) {
"org.apache.avro" % "avro-mapred" % "1.7.7" % "provided" classifier "hadoop1" exclude("org.mortbay.jetty", "servlet-api")
} else {
@@ -75,7 +75,7 @@
// A Redshift-compatible JDBC driver must be present on the classpath for spark-redshift to work.
// For testing, we use an Amazon driver, which is available from
// http://docs.aws.amazon.com/redshift/latest/mgmt/configure-jdbc-connection.html
"com.amazon.redshift" % "jdbc4" % "1.1.7.1007" % "test" from "https://s3.amazonaws.com/redshift-downloads/drivers/RedshiftJDBC4-1.1.7.1007.jar",
"com.amazon.redshift" % "jdbc41" % "1.2.12.1017" % "test" from "https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/1.2.12.1017/RedshiftJDBC41-1.2.12.1017.jar",
// Although support for the postgres driver is lower priority than support for Amazon's
// official Redshift driver, we still run basic tests with it.
"postgresql" % "postgresql" % "8.3-606.jdbc4" % "test",
@@ -118,7 +118,7 @@
"org.apache.spark" %% "spark-core" % testSparkVersion.value % "test" exclude("org.apache.hadoop", "hadoop-client") force(),
"org.apache.spark" %% "spark-sql" % testSparkVersion.value % "test" exclude("org.apache.hadoop", "hadoop-client") force(),
"org.apache.spark" %% "spark-hive" % testSparkVersion.value % "test" exclude("org.apache.hadoop", "hadoop-client") force(),
"com.databricks" %% "spark-avro" % testSparkAvroVersion.value % "test" exclude("org.apache.avro", "avro-mapred") force()
"org.apache.spark" %% "spark-avro" % testSparkVersion.value % "test" exclude("org.apache.avro", "avro-mapred") force()
),
// Although spark-avro declares its avro-mapred dependency as `provided`, its version of the
// dependency can still end up on the classpath during tests, which breaks the tests for
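For context: starting with Spark 2.4.0 the Avro data source is published under the `org.apache.spark` group and tracks Spark's own version, which is why the build now pins `spark-avro` to `sparkVersion.value`. A minimal sbt sketch of the same idea for a plain `build.sbt` (versions shown for illustration only):

```scala
// Minimal sbt sketch (illustrative only): from Spark 2.4.0 the Avro data source is
// published under org.apache.spark and versioned in lockstep with Spark itself.
val sparkVersion = "2.4.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"  % sparkVersion % "provided",
  "org.apache.spark" %% "spark-avro" % sparkVersion  // was "com.databricks" %% "spark-avro" % "3.0.0"
)
```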
@@ -44,7 +44,6 @@ class AWSCredentialsInUriIntegrationSuite extends IntegrationSuiteBase {
// Override this method so that we do not set the credentials in sc.hadoopConf.
override def beforeAll(): Unit = {
assert(tempDir.contains("AKIA"), "tempdir did not contain AWS credentials")
assert(!AWS_SECRET_ACCESS_KEY.contains("/"), "AWS secret key should not contain slash")
sc = new SparkContext("local", getClass.getSimpleName)
conn = DefaultJDBCWrapper.getConnector(None, jdbcUrl, None)
}
@@ -54,16 +54,19 @@ trait IntegrationSuiteBase
protected val AWS_REDSHIFT_JDBC_URL: String = loadConfigFromEnv("AWS_REDSHIFT_JDBC_URL")
protected val AWS_REDSHIFT_USER: String = loadConfigFromEnv("AWS_REDSHIFT_USER")
protected val AWS_REDSHIFT_PASSWORD: String = loadConfigFromEnv("AWS_REDSHIFT_PASSWORD")
protected val AWS_ACCESS_KEY_ID: String = loadConfigFromEnv("TEST_AWS_ACCESS_KEY_ID")
protected val AWS_SECRET_ACCESS_KEY: String = loadConfigFromEnv("TEST_AWS_SECRET_ACCESS_KEY")
protected val AWS_ACCESS_KEY_ID: String = loadConfigFromEnv("AWS_ACCESS_KEY_ID")
protected val AWS_SECRET_ACCESS_KEY: String = loadConfigFromEnv("AWS_SECRET_ACCESS_KEY")
// Path to a directory in S3 (e.g. 's3n://bucket-name/path/to/scratch/space').
protected val AWS_S3_SCRATCH_SPACE: String = loadConfigFromEnv("AWS_S3_SCRATCH_SPACE")
require(AWS_S3_SCRATCH_SPACE.contains("s3n"), "must use s3n:// URL")

protected def jdbcUrl: String = {
s"$AWS_REDSHIFT_JDBC_URL?user=$AWS_REDSHIFT_USER&password=$AWS_REDSHIFT_PASSWORD"
s"$AWS_REDSHIFT_JDBC_URL?user=$AWS_REDSHIFT_USER&password=$AWS_REDSHIFT_PASSWORD&ssl=true"
}

protected def jdbcUrlNoUserPassword: String = {
s"$AWS_REDSHIFT_JDBC_URL?ssl=true"
}
/**
* Random suffix appended to table and directory names in order to avoid collisions
* between separate Travis builds.
@@ -31,14 +31,14 @@ class RedshiftCredentialsInConfIntegrationSuite extends IntegrationSuiteBase {
val tableName = s"roundtrip_save_and_load_$randomSuffix"
try {
write(df)
.option("url", AWS_REDSHIFT_JDBC_URL)
.option("url", jdbcUrlNoUserPassword)
.option("user", AWS_REDSHIFT_USER)
.option("password", AWS_REDSHIFT_PASSWORD)
.option("dbtable", tableName)
.save()
assert(DefaultJDBCWrapper.tableExists(conn, tableName))
val loadedDf = read
.option("url", AWS_REDSHIFT_JDBC_URL)
.option("url", jdbcUrlNoUserPassword)
.option("user", AWS_REDSHIFT_USER)
.option("password", AWS_REDSHIFT_PASSWORD)
.option("dbtable", tableName)
18 changes: 16 additions & 2 deletions src/it/scala/com/databricks/spark/redshift/RedshiftReadSuite.scala
@@ -197,16 +197,30 @@ class RedshiftReadSuite extends IntegrationSuiteBase {
s"INSERT INTO $tableName VALUES ('NaN'), ('Infinity'), ('-Infinity')")
conn.commit()
assert(DefaultJDBCWrapper.tableExists(conn, tableName))
// Due to #98, we use Double here instead of float:
checkAnswer(
read.option("dbtable", tableName).load(),
Seq(Double.NaN, Double.PositiveInfinity, Double.NegativeInfinity).map(x => Row.apply(x)))
Seq(Float.NaN, Float.PositiveInfinity, Float.NegativeInfinity).map(x => Row.apply(x)))
} finally {
conn.prepareStatement(s"drop table if exists $tableName").executeUpdate()
conn.commit()
}
}

test("test empty string and null") {
withTempRedshiftTable("records_with_empty_and_null_characters") { tableName =>
conn.createStatement().executeUpdate(
s"CREATE TABLE $tableName (x varchar(256))")
conn.createStatement().executeUpdate(
s"INSERT INTO $tableName VALUES ('null'), (''), (null)")
conn.commit()
assert(DefaultJDBCWrapper.tableExists(conn, tableName))
checkAnswer(
read.option("dbtable", tableName).load(),
Seq("null", "", null).map(x => Row.apply(x)))
}
}


test("read special double values (regression test for #261)") {
val tableName = s"roundtrip_special_double_values_$randomSuffix"
try {
13 changes: 11 additions & 2 deletions src/main/scala/com/databricks/spark/redshift/Conversions.scala
@@ -78,7 +78,7 @@ private[redshift] object Conversions {
*
* Note that instances of this function are NOT thread-safe.
*/
def createRowConverter(schema: StructType): Array[String] => InternalRow = {
def createRowConverter(schema: StructType, nullString: String): Array[String] => InternalRow = {
val dateFormat = createRedshiftDateFormat()
val decimalFormat = createRedshiftDecimalFormat()
val conversionFunctions: Array[String => Any] = schema.fields.map { field =>
@@ -116,7 +116,16 @@
var i = 0
while (i < schema.length) {
val data = inputRow(i)
converted(i) = if (data == null || data.isEmpty) null else conversionFunctions(i)(data)
converted(i) = if ((data == null || data == nullString) ||
(data.isEmpty && schema.fields(i).dataType != StringType)) {
null
}
else if (data.isEmpty) {
""
}
else {
conversionFunctions(i)(data)
}
i += 1
}
encoder.toRow(externalRow)
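The converter now treats only the configured null placeholder (and empty tokens in non-string columns) as SQL NULL, so empty strings in VARCHAR columns survive a round trip instead of collapsing to null. A standalone sketch of that decision logic, using a hypothetical helper rather than the project's actual API:

```scala
// Illustrative sketch (hypothetical helper): how one unloaded field is interpreted
// once a null placeholder such as "@NULL@" is in play.
def interpretField(raw: String, nullString: String, isStringColumn: Boolean): Option[String] =
  if (raw == null || raw == nullString) None       // explicit null marker -> SQL NULL
  else if (raw.isEmpty && !isStringColumn) None    // empty token in a non-string column -> NULL
  else if (raw.isEmpty) Some("")                   // empty strings are preserved for string columns
  else Some(raw)                                   // anything else goes through the usual conversion

// With the default placeholder "@NULL@":
//   interpretField("@NULL@", "@NULL@", isStringColumn = true)  == None
//   interpretField("",       "@NULL@", isStringColumn = true)  == Some("")
```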
@@ -28,7 +28,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{DataType, StructType}

/**
* Internal data source used for reading Redshift UNLOAD files.
@@ -95,8 +95,11 @@ private[redshift] class RedshiftFileFormat extends FileFormat {
// be closed once it is completely iterated, but this is necessary to guard against
// resource leaks in case the task fails or is interrupted.
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
val converter = Conversions.createRowConverter(requiredSchema)
val converter = Conversions.createRowConverter(requiredSchema,
options.getOrElse("nullString", Parameters.DEFAULT_PARAMETERS("csvnullstring")))
iter.map(converter)
}
}

override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = true
}
@@ -300,46 +300,39 @@ private[redshift] class JDBCWrapper {
// TODO: cleanup types which are irrelevant for Redshift.
val answer = sqlType match {
// scalastyle:off
case java.sql.Types.ARRAY => null
case java.sql.Types.BIGINT => if (signed) { LongType } else { DecimalType(20,0) }
case java.sql.Types.BINARY => BinaryType
case java.sql.Types.BIT => BooleanType // @see JdbcDialect for quirks
case java.sql.Types.BLOB => BinaryType
case java.sql.Types.BOOLEAN => BooleanType
// Null Type
case java.sql.Types.NULL => null

// Character Types
case java.sql.Types.CHAR => StringType
case java.sql.Types.CLOB => StringType
case java.sql.Types.DATALINK => null
case java.sql.Types.NCHAR => StringType
case java.sql.Types.NVARCHAR => StringType
case java.sql.Types.VARCHAR => StringType

// Datetime Types
case java.sql.Types.DATE => DateType
case java.sql.Types.TIME => TimestampType
case java.sql.Types.TIMESTAMP => TimestampType

// Boolean Type
case java.sql.Types.BIT => BooleanType // @see JdbcDialect for quirks
case java.sql.Types.BOOLEAN => BooleanType

// Numeric Types
case java.sql.Types.BIGINT => if (signed) { LongType } else { DecimalType(20,0) }
case java.sql.Types.DECIMAL
if precision != 0 || scale != 0 => DecimalType(precision, scale)
case java.sql.Types.DECIMAL => DecimalType(38, 18) // Spark 1.5.0 default
case java.sql.Types.DISTINCT => null
case java.sql.Types.DOUBLE => DoubleType
case java.sql.Types.FLOAT => FloatType
case java.sql.Types.INTEGER => if (signed) { IntegerType } else { LongType }
case java.sql.Types.JAVA_OBJECT => null
case java.sql.Types.LONGNVARCHAR => StringType
case java.sql.Types.LONGVARBINARY => BinaryType
case java.sql.Types.LONGVARCHAR => StringType
case java.sql.Types.NCHAR => StringType
case java.sql.Types.NCLOB => StringType
case java.sql.Types.NULL => null
case java.sql.Types.NUMERIC
if precision != 0 || scale != 0 => DecimalType(precision, scale)
case java.sql.Types.NUMERIC => DecimalType(38, 18) // Spark 1.5.0 default
case java.sql.Types.NVARCHAR => StringType
case java.sql.Types.OTHER => null
case java.sql.Types.REAL => DoubleType
case java.sql.Types.REF => StringType
case java.sql.Types.ROWID => LongType
// Redshift REAL is represented as a 4-byte IEEE float. https://docs.aws.amazon.com/redshift/latest/dg/r_Numeric_types201.html
case java.sql.Types.REAL => FloatType
case java.sql.Types.SMALLINT => IntegerType
case java.sql.Types.SQLXML => StringType
case java.sql.Types.STRUCT => StringType
case java.sql.Types.TIME => TimestampType
case java.sql.Types.TIMESTAMP => TimestampType
case java.sql.Types.TINYINT => IntegerType
case java.sql.Types.VARBINARY => BinaryType
case java.sql.Types.VARCHAR => StringType
case _ => null
// scalastyle:on
}
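The reorganization groups the JDBC-to-Catalyst cases by kind; the one behavioral change is that `java.sql.Types.REAL` now maps to `FloatType`, since Redshift's `REAL` is a 4-byte IEEE float, while `DOUBLE PRECISION` remains `DoubleType`. A condensed sketch of just those cases (not the full mapping method):

```scala
import java.sql.Types
import org.apache.spark.sql.types._

// Condensed sketch of the floating-point cases only; the real method maps every
// java.sql.Types constant it cares about.
def floatingPointCatalystType(sqlType: Int): Option[DataType] = sqlType match {
  case Types.REAL   => Some(FloatType)   // Redshift REAL: 4-byte IEEE float (was DoubleType)
  case Types.FLOAT  => Some(FloatType)
  case Types.DOUBLE => Some(DoubleType)  // Redshift DOUBLE PRECISION
  case _            => None
}
```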
@@ -131,7 +131,6 @@ private[redshift] case class RedshiftRelation(
// Unload data from Redshift into a temporary directory in S3:
val tempDir = params.createPerQueryTempDir()
val unloadSql = buildUnloadStmt(requiredColumns, filters, tempDir, creds)
log.info(unloadSql)
val conn = jdbcWrapper.getConnector(params.jdbcDriver, params.jdbcUrl, params.credentials)
try {
jdbcWrapper.executeInterruptibly(conn.prepareStatement(unloadSql))
@@ -165,6 +164,7 @@
sqlContext.read
.format(classOf[RedshiftFileFormat].getName)
.schema(prunedSchema)
.option("nullString", params.nullString)
.load(filesToRead: _*)
.queryExecution.executedPlan.execute().asInstanceOf[RDD[Row]]
}
@@ -189,11 +189,13 @@ private[redshift] case class RedshiftRelation(
val escapedTableNameOrSubqury = tableNameOrSubquery.replace("\\", "\\\\").replace("'", "\\'")
s"SELECT $columnList FROM $escapedTableNameOrSubqury $whereClause"
}
log.info(query)
// We need to remove S3 credentials from the unload path URI because they will conflict with
// the credentials passed via `credsString`.
val fixedUrl = Utils.fixS3Url(Utils.removeCredentialsFromURI(new URI(tempDir)).toString)

s"UNLOAD ('$query') TO '$fixedUrl' WITH CREDENTIALS '$credsString' ESCAPE MANIFEST"
s"UNLOAD ('$query') TO '$fixedUrl' WITH CREDENTIALS '$credsString'" +
s" ESCAPE MANIFEST NULL AS '${params.nullString}'"
}

private def pruneSchema(schema: StructType, columns: Array[String]): StructType = {
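With the `NULL AS` clause added, the text Redshift receives marks NULLs explicitly, which is what lets the reader distinguish them from empty strings. A hypothetical rendering of the generated statement, assuming the default `csvnullstring` placeholder `@NULL@` and made-up table, credential, and scratch-path values:

```scala
// Hypothetical example of the UNLOAD statement this relation now builds; the table,
// credentials and S3 scratch path are placeholders, "@NULL@" is the default csvnullstring.
val exampleUnload: String =
  """UNLOAD ('SELECT "id", "name" FROM my_table ') TO 's3://my-bucket/scratch/1234/'
    | WITH CREDENTIALS 'aws_access_key_id=<id>;aws_secret_access_key=<key>'
    | ESCAPE MANIFEST NULL AS '@NULL@'""".stripMargin
```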
@@ -282,7 +282,7 @@ private[redshift] class RedshiftWriter(
val writer = sqlContext.createDataFrame(convertedRows, convertedSchema).write
(tempFormat match {
case "AVRO" =>
writer.format("com.databricks.spark.avro")
writer.format("avro")
case "CSV" =>
writer.format("csv")
.option("escape", "\"")
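On Spark 2.4 the `org.apache.spark` spark-avro module registers the short name `avro`, so the intermediate write no longer needs the fully qualified `com.databricks.spark.avro` class name. A small usage sketch with hypothetical paths, assuming spark-avro is on the classpath:

```scala
import org.apache.spark.sql.SparkSession

// Sketch with a hypothetical output location: the "avro" short name resolves to the
// org.apache.spark spark-avro data source once it is on the classpath (Spark 2.4+).
val spark = SparkSession.builder().master("local[*]").appName("avro-sketch").getOrCreate()
val df = spark.range(3).toDF("id")
df.write.format("avro").save("/tmp/avro-sketch-output")  // was format("com.databricks.spark.avro")
```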
@@ -31,7 +31,8 @@ import org.apache.spark.sql.types._
class ConversionsSuite extends FunSuite {

private def createRowConverter(schema: StructType) = {
Conversions.createRowConverter(schema).andThen(RowEncoder(schema).resolveAndBind().fromRow)
Conversions.createRowConverter(schema, Parameters.DEFAULT_PARAMETERS("csvnullstring"))
.andThen(RowEncoder(schema).resolveAndBind().fromRow)
}

test("Data should be correctly converted") {
@@ -150,7 +150,7 @@ class RedshiftSourceSuite
|1|f|2015-07-02|0|0.0|42|1239012341823719|-13|asdf|2015-07-02 00:00:00.0
|0||2015-07-03|0.0|-1.0|4141214|1239012341823719||f|2015-07-03 00:00:00
|0|f||-1234152.12312498|100000.0||1239012341823719|24|___\|_123|
||||||||||
|||||||||@NULL@|
""".stripMargin.trim
// scalastyle:on
val expectedQuery = (
2 changes: 1 addition & 1 deletion version.sbt
@@ -1 +1 @@
version in ThisBuild := "3.0.0-SNAPSHOT"
version in ThisBuild := "3.0.0"