[SC-5621][REDSHIFT] Fix autocommit behaviour of the tests' JDBC connection.
Changing the autocommit behaviour of the JDBC connection used by the tests introduced a number of issues.
To fix that, this change goes through all the tests and cleans them up to use autocommit=true everywhere.

Author: Juliusz Sompolski <julek@databricks.com>

Closes apache#245 from juliuszsompolski/SC-5621-fixup.
juliuszsompolski committed Feb 22, 2017
1 parent 45652f6 commit 4449f24
Showing 15 changed files with 56 additions and 147 deletions.
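
For context, the pattern the updated tests converge on pairs an autocommit-enabled JDBC connection (every statement commits immediately, with no explicit conn.commit() anywhere) with the withTempRedshiftTable helper from IntegrationSuiteBase, which hands each test a uniquely suffixed table name and drops the table in a finally block. The following is a minimal standalone sketch of that pattern, not the suite's actual code: the object name, the JDBC URL, and the randomSuffix implementation are placeholders, and the real suite obtains its connection via DefaultJDBCWrapper.getConnector.

import java.sql.{Connection, DriverManager}
import scala.util.Random

// Standalone sketch of the pattern adopted in this commit. The JDBC URL is a
// placeholder; the real suite gets its connection from DefaultJDBCWrapper.getConnector.
object TempRedshiftTableSketch {

  val conn: Connection = DriverManager.getConnection("jdbc:postgresql://localhost:5432/test")
  conn.setAutoCommit(true) // autocommit=true everywhere: each statement commits on its own

  // Placeholder for the suite's randomSuffix; it only needs to make table names unique.
  private def randomSuffix: String = Math.abs(Random.nextLong()).toString

  // Run one DDL/DML statement; with autocommit on, no explicit conn.commit() is needed.
  def jdbcUpdate(query: String): Unit = {
    conn.createStatement().executeUpdate(query)
  }

  // Hand the body a uniquely named table and drop it afterwards, even if the body throws.
  def withTempRedshiftTable[T](namePrefix: String)(body: String => T): T = {
    val tableName = s"$namePrefix$randomSuffix"
    try {
      body(tableName)
    } finally {
      jdbcUpdate(s"drop table if exists $tableName")
    }
  }

  def main(args: Array[String]): Unit = {
    withTempRedshiftTable("example_table_") { tableName =>
      jdbcUpdate(s"CREATE TABLE $tableName (x int)")
      jdbcUpdate(s"INSERT INTO $tableName VALUES (1)")
    }
  }
}

Compare this with the per-test try/finally blocks removed below: each test used to build the suffixed table name itself and call conn.prepareStatement(...).executeUpdate() followed by conn.commit(), which is exactly the boilerplate the diff deletes.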
@@ -53,6 +53,6 @@ class AWSCredentialsInUriIntegrationSuite extends IntegrationSuiteBase {
test("roundtrip save and load") {
val df = sqlContext.createDataFrame(sc.parallelize(Seq(Row(1)), 1),
StructType(StructField("foo", IntegerType) :: Nil))
testRoundtripSaveAndLoad(s"roundtrip_save_and_load_$randomSuffix", df)
testRoundtripSaveAndLoad(s"roundtrip_save_and_load", df)
}
}
@@ -22,8 +22,7 @@ import org.apache.spark.tags.ExtendedRedshiftTest
class ColumnMetadataSuite extends IntegrationSuiteBase {

test("configuring maxlength on string columns") {
val tableName = s"configuring_maxlength_on_string_column_$randomSuffix"
try {
withTempRedshiftTable("configuring_maxlength_on_string_column") { tableName =>
val metadata = new MetadataBuilder().putLong("maxlength", 512).build()
val schema = StructType(
StructField("x", StringType, metadata = metadata) :: Nil)
@@ -40,15 +39,11 @@ class ColumnMetadataSuite extends IntegrationSuiteBase {
.mode(SaveMode.Append)
.save()
}
} finally {
conn.prepareStatement(s"drop table if exists $tableName").executeUpdate()
conn.commit()
}
}

test("configuring compression on columns") {
val tableName = s"configuring_compression_on_columns_$randomSuffix"
try {
withTempRedshiftTable("configuring_compression_on_columns") { tableName =>
val metadata = new MetadataBuilder().putString("encoding", "LZO").build()
val schema = StructType(
StructField("x", StringType, metadata = metadata) :: Nil)
@@ -65,15 +60,11 @@ class ColumnMetadataSuite extends IntegrationSuiteBase {
s"""(SELECT "column", lower(encoding) FROM pg_table_def WHERE tablename='$tableName')""")
.load()
checkAnswer(encodingDF, Seq(Row("x", "lzo")))
} finally {
conn.prepareStatement(s"drop table if exists $tableName").executeUpdate()
conn.commit()
}
}

test("configuring comments on columns") {
val tableName = s"configuring_comments_on_columns_$randomSuffix"
try {
withTempRedshiftTable("configuring_comments_on_columns") { tableName =>
val metadata = new MetadataBuilder().putString("description", "Hello Column").build()
val schema = StructType(
StructField("x", StringType, metadata = metadata) :: Nil)
@@ -106,9 +97,6 @@ class ColumnMetadataSuite extends IntegrationSuiteBase {
.option("dbtable", commentQuery)
.load()
checkAnswer(columnDF, Seq(Row("x", "Hello Column")))
} finally {
conn.prepareStatement(s"drop table if exists $tableName").executeUpdate()
conn.commit()
}
}
}
@@ -34,8 +34,7 @@ class CrossRegionIntegrationSuite extends IntegrationSuiteBase {
new AmazonS3Client(new BasicAWSCredentials(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY))).get
val df = sqlContext.createDataFrame(sc.parallelize(Seq(Row(1)), 1),
StructType(StructField("foo", IntegerType) :: Nil))
val tableName = s"roundtrip_save_and_load_$randomSuffix"
try {
withTempRedshiftTable("roundtrip_save_and_load") { tableName =>
write(df)
.option("dbtable", tableName)
.option("extracopyoptions", s"region '$bucketRegion'")
@@ -48,9 +47,6 @@ class CrossRegionIntegrationSuite extends IntegrationSuiteBase {
Thread.sleep(1000)
assert(DefaultJDBCWrapper.tableExists(conn, tableName))
}
} finally {
conn.prepareStatement(s"drop table if exists $tableName").executeUpdate()
conn.commit()
}
}
}
@@ -22,28 +22,22 @@ class DecimalIntegrationSuite extends IntegrationSuiteBase {

private def testReadingDecimals(precision: Int, scale: Int, decimalStrings: Seq[String]): Unit = {
test(s"reading DECIMAL($precision, $scale)") {
val tableName = s"reading_decimal_${precision}_${scale}_$randomSuffix"
val expectedRows = decimalStrings.map { d =>
if (d == null) {
Row(null)
} else {
Row(Conversions.createRedshiftDecimalFormat().parse(d).asInstanceOf[java.math.BigDecimal])
}
}
try {
conn.createStatement().executeUpdate(
s"CREATE TABLE $tableName (x DECIMAL($precision, $scale))")
withTempRedshiftTable(s"reading_decimal_${precision}_${scale}") { tableName =>
jdbcUpdate(s"CREATE TABLE $tableName (x DECIMAL($precision, $scale))")
for (x <- decimalStrings) {
conn.createStatement().executeUpdate(s"INSERT INTO $tableName VALUES ($x)")
jdbcUpdate(s"INSERT INTO $tableName VALUES ($x)")
}
conn.commit()
assert(DefaultJDBCWrapper.tableExists(conn, tableName))
val loadedDf = read.option("dbtable", tableName).load()
checkAnswer(loadedDf, expectedRows)
checkAnswer(loadedDf.selectExpr("x + 0"), expectedRows)
} finally {
conn.prepareStatement(s"drop table if exists $tableName").executeUpdate()
conn.commit()
}
}
}
@@ -76,18 +70,15 @@ class DecimalIntegrationSuite extends IntegrationSuiteBase {

test("Decimal precision is preserved when reading from query (regression test for issue #203)") {
withTempRedshiftTable("issue203") { tableName =>
try {
conn.createStatement().executeUpdate(s"CREATE TABLE $tableName (foo BIGINT)")
conn.createStatement().executeUpdate(s"INSERT INTO $tableName VALUES (91593373)")
conn.commit()
assert(DefaultJDBCWrapper.tableExists(conn, tableName))
val df = read
.option("query", s"select foo / 1000000.0 from $tableName limit 1")
.load()
val res: Double = df.collect().toSeq.head.getDecimal(0).doubleValue()
assert(res === (91593373L / 1000000.0) +- 0.01)
assert(df.schema.fields.head.dataType === DecimalType(28, 8))
}
jdbcUpdate(s"CREATE TABLE $tableName (foo BIGINT)")
jdbcUpdate(s"INSERT INTO $tableName VALUES (91593373)")
assert(DefaultJDBCWrapper.tableExists(conn, tableName))
val df = read
.option("query", s"select foo / 1000000.0 from $tableName limit 1")
.load()
val res: Double = df.collect().toSeq.head.getDecimal(0).doubleValue()
assert(res === (91593373L / 1000000.0) +- 0.01)
assert(df.schema.fields.head.dataType === DecimalType(28, 8))
}
}
}
@@ -24,10 +24,9 @@ class IAMIntegrationSuite extends IntegrationSuiteBase {
private val IAM_ROLE_ARN: String = loadConfigFromEnv("STS_ROLE_ARN")

test("roundtrip save and load") {
val tableName = s"iam_roundtrip_save_and_load$randomSuffix"
val df = sqlContext.createDataFrame(sc.parallelize(Seq(Row(1))),
StructType(StructField("a", IntegerType) :: Nil))
try {
withTempRedshiftTable("iam_roundtrip_save_and_load") { tableName =>
write(df)
.option("dbtable", tableName)
.option("forward_spark_s3_credentials", "false")
@@ -44,15 +43,11 @@ class IAMIntegrationSuite extends IntegrationSuiteBase {
assert(loadedDf.schema.length === 1)
assert(loadedDf.columns === Seq("a"))
checkAnswer(loadedDf, Seq(Row(1)))
} finally {
conn.prepareStatement(s"drop table if exists $tableName").executeUpdate()
conn.commit()
}
}

test("load fails if IAM role cannot be assumed") {
val tableName = s"iam_load_fails_if_role_cannot_be_assumed$randomSuffix"
try {
withTempRedshiftTable("iam_load_fails_if_role_cannot_be_assumed") { tableName =>
val df = sqlContext.createDataFrame(sc.parallelize(Seq(Row(1))),
StructType(StructField("a", IntegerType) :: Nil))
val err = intercept[SQLException] {
@@ -64,9 +59,6 @@ class IAMIntegrationSuite extends IntegrationSuiteBase {
.save()
}
assert(err.getCause.getMessage.contains("is not authorized to assume IAM Role"))
} finally {
conn.prepareStatement(s"drop table if exists $tableName").executeUpdate()
conn.commit()
}
}
}
@@ -78,14 +78,9 @@ trait IntegrationSuiteBase
protected var conn: Connection = _
protected var sparkSession: SparkSession = _

def runSql(query: String): Unit = {
log.debug("RUNNING: " + Utils.sanitizeQueryText(query))
sqlContext.sql(query).collect()
}

def jdbcUpdate(query: String): Unit = {
log.debug("RUNNING: " + Utils.sanitizeQueryText(query))
conn.createStatement.executeUpdate(query)
log.debug("JDBC RUNNING: " + Utils.sanitizeQueryText(query))
conn.createStatement().executeUpdate(query)
}

override def beforeAll(): Unit = {
@@ -97,19 +92,14 @@ trait IntegrationSuiteBase
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", AWS_ACCESS_KEY_ID)
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", AWS_SECRET_ACCESS_KEY)
conn = DefaultJDBCWrapper.getConnector(None, jdbcUrl, None)
// Disable autocommit due to conflicts with PG JDBC driver:
// PG JDBC driver seems to forbid calling explicit commit when autocommit is on,
// so lets just make autocommit off everywhere for consistency.
conn.setAutoCommit(false)
conn.createStatement().executeUpdate(s"create schema if not exists $schemaName")
conn.createStatement().execute(s"set search_path to $schemaName, '$$user', public")
conn.commit()
conn.setAutoCommit(true)
jdbcUpdate(s"create schema if not exists $schemaName")
jdbcUpdate(s"set search_path to $schemaName, '$$user', public")
}

override def afterAll(): Unit = {
try {
conn.createStatement().executeUpdate(s"drop schema if exists $schemaName cascade")
conn.commit()
jdbcUpdate(s"drop schema if exists $schemaName cascade")
val conf = new Configuration(false)
conf.set("fs.s3n.awsAccessKeyId", AWS_ACCESS_KEY_ID)
conf.set("fs.s3n.awsSecretAccessKey", AWS_SECRET_ACCESS_KEY)
@@ -166,7 +156,8 @@ trait IntegrationSuiteBase
}

protected def createTestDataInRedshift(tableName: String): Unit = {
conn.createStatement().executeUpdate(
jdbcUpdate(s"drop table if exists $tableName")
jdbcUpdate(
s"""
|create table $tableName (
|testbyte int2,
@@ -183,7 +174,7 @@ trait IntegrationSuiteBase
""".stripMargin
)
// scalastyle:off
conn.createStatement().executeUpdate(
jdbcUpdate(
s"""
|insert into $tableName values
|(null, null, null, null, null, null, null, null, null, null),
@@ -195,16 +186,14 @@ trait IntegrationSuiteBase
""".stripMargin
)
// scalastyle:on
conn.commit()
}

protected def withTempRedshiftTable[T](namePrefix: String)(body: String => T): T = {
val tableName = s"$namePrefix$randomSuffix"
try {
body(tableName)
} finally {
conn.prepareStatement(s"drop table if exists $tableName").executeUpdate()
conn.commit()
jdbcUpdate(s"drop table if exists $tableName")
}
}

@@ -314,7 +303,7 @@ trait IntegrationSuiteBase
df: DataFrame,
expectedSchemaAfterLoad: Option[StructType] = None,
saveMode: SaveMode = SaveMode.ErrorIfExists): Unit = {
try {
withTempRedshiftTable(tableName) { tableName =>
write(df)
.option("dbtable", tableName)
.mode(saveMode)
Expand All @@ -330,9 +319,6 @@ trait IntegrationSuiteBase
val loadedDf = read.option("dbtable", tableName).load()
assert(loadedDf.schema === expectedSchemaAfterLoad.getOrElse(df.schema))
checkAnswer(loadedDf, df.collect())
} finally {
conn.prepareStatement(s"drop table if exists $tableName").executeUpdate()
conn.commit()
}
}
}
@@ -39,6 +39,6 @@ class PostgresDriverIntegrationSuite extends IntegrationSuiteBase {
test("roundtrip save and load") {
val df = sqlContext.createDataFrame(sc.parallelize(Seq(Row(1)), 1),
StructType(StructField("foo", IntegerType) :: Nil))
testRoundtripSaveAndLoad(s"save_with_one_empty_partition_$randomSuffix", df)
testRoundtripSaveAndLoad(s"save_with_one_empty_partition", df)
}
}
@@ -23,8 +23,7 @@ class RedshiftCredentialsInConfIntegrationSuite extends IntegrationSuiteBase {
test("roundtrip save and load") {
val df = sqlContext.createDataFrame(sc.parallelize(Seq(Row(1)), 1),
StructType(StructField("foo", IntegerType) :: Nil))
val tableName = s"roundtrip_save_and_load_$randomSuffix"
try {
withTempRedshiftTable("roundtrip_save_and_load") { tableName =>
write(df)
.option("url", AWS_REDSHIFT_JDBC_URL)
.option("user", AWS_REDSHIFT_USER)
@@ -40,9 +39,6 @@ class RedshiftCredentialsInConfIntegrationSuite extends IntegrationSuiteBase {
.load()
assert(loadedDf.schema === df.schema)
checkAnswer(loadedDf, df.collect())
} finally {
conn.prepareStatement(s"drop table if exists $tableName").executeUpdate()
conn.commit()
}
}

@@ -23,15 +23,12 @@ class RedshiftReadSuite extends IntegrationSuiteBase {

override def beforeAll(): Unit = {
super.beforeAll()
conn.prepareStatement(s"drop table if exists $test_table").executeUpdate()
conn.commit()
createTestDataInRedshift(test_table)
}

override def afterAll(): Unit = {
try {
conn.prepareStatement(s"drop table if exists $test_table").executeUpdate()
conn.commit()
jdbcUpdate(s"drop table if exists $test_table")
} finally {
super.afterAll()
}
@@ -185,49 +182,32 @@ class RedshiftReadSuite extends IntegrationSuiteBase {
}

test("read special float values (regression test for #261)") {
val tableName = s"roundtrip_special_float_values_$randomSuffix"
try {
conn.createStatement().executeUpdate(
s"CREATE TABLE $tableName (x real)")
conn.createStatement().executeUpdate(
s"INSERT INTO $tableName VALUES ('NaN'), ('Infinity'), ('-Infinity')")
conn.commit()
withTempRedshiftTable("roundtrip_special_float_values") { tableName =>
jdbcUpdate(s"CREATE TABLE $tableName (x real)")
jdbcUpdate(s"INSERT INTO $tableName VALUES ('NaN'), ('Infinity'), ('-Infinity')")
assert(DefaultJDBCWrapper.tableExists(conn, tableName))
// Due to #98, we use Double here instead of float:
checkAnswer(
read.option("dbtable", tableName).load(),
Seq(Double.NaN, Double.PositiveInfinity, Double.NegativeInfinity).map(x => Row.apply(x)))
} finally {
conn.prepareStatement(s"drop table if exists $tableName").executeUpdate()
conn.commit()
}
}

test("read special double values (regression test for #261)") {
val tableName = s"roundtrip_special_double_values_$randomSuffix"
try {
conn.createStatement().executeUpdate(
s"CREATE TABLE $tableName (x double precision)")
conn.createStatement().executeUpdate(
s"INSERT INTO $tableName VALUES ('NaN'), ('Infinity'), ('-Infinity')")
conn.commit()
withTempRedshiftTable("roundtrip_special_double_values") { tableName =>
jdbcUpdate(s"CREATE TABLE $tableName (x double precision)")
jdbcUpdate(s"INSERT INTO $tableName VALUES ('NaN'), ('Infinity'), ('-Infinity')")
assert(DefaultJDBCWrapper.tableExists(conn, tableName))
checkAnswer(
read.option("dbtable", tableName).load(),
Seq(Double.NaN, Double.PositiveInfinity, Double.NegativeInfinity).map(x => Row.apply(x)))
} finally {
conn.prepareStatement(s"drop table if exists $tableName").executeUpdate()
conn.commit()
}
}

test("read records containing escaped characters") {
withTempRedshiftTable("records_with_escaped_characters") { tableName =>
conn.createStatement().executeUpdate(
s"CREATE TABLE $tableName (x text)")
conn.createStatement().executeUpdate(
s"""INSERT INTO $tableName VALUES ('a\\nb'), ('\\\\'), ('"')""")
conn.commit()
jdbcUpdate(s"CREATE TABLE $tableName (x text)")
jdbcUpdate(s"""INSERT INTO $tableName VALUES ('a\\nb'), ('\\\\'), ('"')""")
assert(DefaultJDBCWrapper.tableExists(conn, tableName))
checkAnswer(
read.option("dbtable", tableName).load(),