From 4449f2481631ea0fe86e5ae6cc0289c5e8d5a609 Mon Sep 17 00:00:00 2001 From: Juliusz Sompolski Date: Wed, 22 Feb 2017 14:41:25 +0100 Subject: [PATCH] [SC-5621][REDSHIFT] Fix autocommit behaviour of the tests' JDBC connection. Changing the autocommit behaviour of the tests' JDBC connection introduced a number of issues. To fix them, I went through all the tests and cleaned them up to use autocommit=true everywhere. Author: Juliusz Sompolski Closes #245 from juliuszsompolski/SC-5621-fixup. --- .../AWSCredentialsInUriIntegrationSuite.scala | 2 +- .../spark/redshift/ColumnMetadataSuite.scala | 18 ++------- .../CrossRegionIntegrationSuite.scala | 6 +-- .../redshift/DecimalIntegrationSuite.scala | 33 ++++++---------- .../spark/redshift/IAMIntegrationSuite.scala | 12 +----- .../spark/redshift/IntegrationSuiteBase.scala | 36 ++++++------------ .../PostgresDriverIntegrationSuite.scala | 2 +- ...iftCredentialsInConfIntegrationSuite.scala | 6 +-- .../spark/redshift/RedshiftReadSuite.scala | 38 +++++-------------- .../spark/redshift/RedshiftWriteSuite.scala | 34 ++++++----------- .../spark/redshift/STSIntegrationSuite.scala | 6 +-- .../redshift/SaveModeIntegrationSuite.scala | 2 +- .../redshift/SearchPathIntegrationSuite.scala | 6 +-- .../AdvancedPushdownIntegrationSuite.scala | 1 - .../FilterPushdownIntegrationSuite.scala | 1 - 15 files changed, 56 insertions(+), 147 deletions(-) diff --git a/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/AWSCredentialsInUriIntegrationSuite.scala b/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/AWSCredentialsInUriIntegrationSuite.scala index b4d42be0ed3d7..db14e55ea4482 100755 --- a/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/AWSCredentialsInUriIntegrationSuite.scala +++ b/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/AWSCredentialsInUriIntegrationSuite.scala @@ -53,6 +53,6 @@ class AWSCredentialsInUriIntegrationSuite extends IntegrationSuiteBase { test("roundtrip save and load") { val df = sqlContext.createDataFrame(sc.parallelize(Seq(Row(1)), 1), StructType(StructField("foo", IntegerType) :: Nil)) - testRoundtripSaveAndLoad(s"roundtrip_save_and_load_$randomSuffix", df) + testRoundtripSaveAndLoad(s"roundtrip_save_and_load", df) } } diff --git a/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/ColumnMetadataSuite.scala b/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/ColumnMetadataSuite.scala index 95e471991be03..c9df3412711e0 100755 --- a/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/ColumnMetadataSuite.scala +++ b/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/ColumnMetadataSuite.scala @@ -22,8 +22,7 @@ import org.apache.spark.tags.ExtendedRedshiftTest class ColumnMetadataSuite extends IntegrationSuiteBase { test("configuring maxlength on string columns") { - val tableName = s"configuring_maxlength_on_string_column_$randomSuffix" - try { + withTempRedshiftTable("configuring_maxlength_on_string_column") { tableName => val metadata = new MetadataBuilder().putLong("maxlength", 512).build() val schema = StructType( StructField("x", StringType, metadata = metadata) :: Nil) @@ -40,15 +39,11 @@ class ColumnMetadataSuite extends IntegrationSuiteBase { .mode(SaveMode.Append) .save() } - } finally { - conn.prepareStatement(s"drop table if exists $tableName").executeUpdate() - conn.commit() } } test("configuring 
compression on columns") { - val tableName = s"configuring_compression_on_columns_$randomSuffix" - try { + withTempRedshiftTable("configuring_compression_on_columns") { tableName => val metadata = new MetadataBuilder().putString("encoding", "LZO").build() val schema = StructType( StructField("x", StringType, metadata = metadata) :: Nil) @@ -65,15 +60,11 @@ class ColumnMetadataSuite extends IntegrationSuiteBase { s"""(SELECT "column", lower(encoding) FROM pg_table_def WHERE tablename='$tableName')""") .load() checkAnswer(encodingDF, Seq(Row("x", "lzo"))) - } finally { - conn.prepareStatement(s"drop table if exists $tableName").executeUpdate() - conn.commit() } } test("configuring comments on columns") { - val tableName = s"configuring_comments_on_columns_$randomSuffix" - try { + withTempRedshiftTable("configuring_comments_on_columns") { tableName => val metadata = new MetadataBuilder().putString("description", "Hello Column").build() val schema = StructType( StructField("x", StringType, metadata = metadata) :: Nil) @@ -106,9 +97,6 @@ class ColumnMetadataSuite extends IntegrationSuiteBase { .option("dbtable", commentQuery) .load() checkAnswer(columnDF, Seq(Row("x", "Hello Column"))) - } finally { - conn.prepareStatement(s"drop table if exists $tableName").executeUpdate() - conn.commit() } } } diff --git a/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/CrossRegionIntegrationSuite.scala b/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/CrossRegionIntegrationSuite.scala index b4fb34af86c62..7b6d4c0b3c59b 100755 --- a/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/CrossRegionIntegrationSuite.scala +++ b/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/CrossRegionIntegrationSuite.scala @@ -34,8 +34,7 @@ class CrossRegionIntegrationSuite extends IntegrationSuiteBase { new AmazonS3Client(new BasicAWSCredentials(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY))).get val df = sqlContext.createDataFrame(sc.parallelize(Seq(Row(1)), 1), StructType(StructField("foo", IntegerType) :: Nil)) - val tableName = s"roundtrip_save_and_load_$randomSuffix" - try { + withTempRedshiftTable("roundtrip_save_and_load") { tableName => write(df) .option("dbtable", tableName) .option("extracopyoptions", s"region '$bucketRegion'") @@ -48,9 +47,6 @@ class CrossRegionIntegrationSuite extends IntegrationSuiteBase { Thread.sleep(1000) assert(DefaultJDBCWrapper.tableExists(conn, tableName)) } - } finally { - conn.prepareStatement(s"drop table if exists $tableName").executeUpdate() - conn.commit() } } } diff --git a/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/DecimalIntegrationSuite.scala b/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/DecimalIntegrationSuite.scala index e3078b9457eaf..be79f062b1e01 100755 --- a/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/DecimalIntegrationSuite.scala +++ b/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/DecimalIntegrationSuite.scala @@ -22,7 +22,6 @@ class DecimalIntegrationSuite extends IntegrationSuiteBase { private def testReadingDecimals(precision: Int, scale: Int, decimalStrings: Seq[String]): Unit = { test(s"reading DECIMAL($precision, $scale)") { - val tableName = s"reading_decimal_${precision}_${scale}_$randomSuffix" val expectedRows = decimalStrings.map { d => if (d == null) { Row(null) @@ -30,20 +29,15 @@ class 
DecimalIntegrationSuite extends IntegrationSuiteBase { Row(Conversions.createRedshiftDecimalFormat().parse(d).asInstanceOf[java.math.BigDecimal]) } } - try { - conn.createStatement().executeUpdate( - s"CREATE TABLE $tableName (x DECIMAL($precision, $scale))") + withTempRedshiftTable(s"reading_decimal_${precision}_${scale}") { tableName => + jdbcUpdate(s"CREATE TABLE $tableName (x DECIMAL($precision, $scale))") for (x <- decimalStrings) { - conn.createStatement().executeUpdate(s"INSERT INTO $tableName VALUES ($x)") + jdbcUpdate(s"INSERT INTO $tableName VALUES ($x)") } - conn.commit() assert(DefaultJDBCWrapper.tableExists(conn, tableName)) val loadedDf = read.option("dbtable", tableName).load() checkAnswer(loadedDf, expectedRows) checkAnswer(loadedDf.selectExpr("x + 0"), expectedRows) - } finally { - conn.prepareStatement(s"drop table if exists $tableName").executeUpdate() - conn.commit() } } } @@ -76,18 +70,15 @@ class DecimalIntegrationSuite extends IntegrationSuiteBase { test("Decimal precision is preserved when reading from query (regression test for issue #203)") { withTempRedshiftTable("issue203") { tableName => - try { - conn.createStatement().executeUpdate(s"CREATE TABLE $tableName (foo BIGINT)") - conn.createStatement().executeUpdate(s"INSERT INTO $tableName VALUES (91593373)") - conn.commit() - assert(DefaultJDBCWrapper.tableExists(conn, tableName)) - val df = read - .option("query", s"select foo / 1000000.0 from $tableName limit 1") - .load() - val res: Double = df.collect().toSeq.head.getDecimal(0).doubleValue() - assert(res === (91593373L / 1000000.0) +- 0.01) - assert(df.schema.fields.head.dataType === DecimalType(28, 8)) - } + jdbcUpdate(s"CREATE TABLE $tableName (foo BIGINT)") + jdbcUpdate(s"INSERT INTO $tableName VALUES (91593373)") + assert(DefaultJDBCWrapper.tableExists(conn, tableName)) + val df = read + .option("query", s"select foo / 1000000.0 from $tableName limit 1") + .load() + val res: Double = df.collect().toSeq.head.getDecimal(0).doubleValue() + assert(res === (91593373L / 1000000.0) +- 0.01) + assert(df.schema.fields.head.dataType === DecimalType(28, 8)) } } } diff --git a/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/IAMIntegrationSuite.scala b/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/IAMIntegrationSuite.scala index c8c298166f5ef..85135b605fb19 100755 --- a/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/IAMIntegrationSuite.scala +++ b/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/IAMIntegrationSuite.scala @@ -24,10 +24,9 @@ class IAMIntegrationSuite extends IntegrationSuiteBase { private val IAM_ROLE_ARN: String = loadConfigFromEnv("STS_ROLE_ARN") test("roundtrip save and load") { - val tableName = s"iam_roundtrip_save_and_load$randomSuffix" val df = sqlContext.createDataFrame(sc.parallelize(Seq(Row(1))), StructType(StructField("a", IntegerType) :: Nil)) - try { + withTempRedshiftTable("iam_roundtrip_save_and_load") { tableName => write(df) .option("dbtable", tableName) .option("forward_spark_s3_credentials", "false") @@ -44,15 +43,11 @@ class IAMIntegrationSuite extends IntegrationSuiteBase { assert(loadedDf.schema.length === 1) assert(loadedDf.columns === Seq("a")) checkAnswer(loadedDf, Seq(Row(1))) - } finally { - conn.prepareStatement(s"drop table if exists $tableName").executeUpdate() - conn.commit() } } test("load fails if IAM role cannot be assumed") { - val tableName = 
s"iam_load_fails_if_role_cannot_be_assumed$randomSuffix" - try { + withTempRedshiftTable("iam_load_fails_if_role_cannot_be_assumed") { tableName => val df = sqlContext.createDataFrame(sc.parallelize(Seq(Row(1))), StructType(StructField("a", IntegerType) :: Nil)) val err = intercept[SQLException] { @@ -64,9 +59,6 @@ class IAMIntegrationSuite extends IntegrationSuiteBase { .save() } assert(err.getCause.getMessage.contains("is not authorized to assume IAM Role")) - } finally { - conn.prepareStatement(s"drop table if exists $tableName").executeUpdate() - conn.commit() } } } diff --git a/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/IntegrationSuiteBase.scala b/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/IntegrationSuiteBase.scala index 86017bf8a00bc..09462cedcc640 100644 --- a/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/IntegrationSuiteBase.scala +++ b/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/IntegrationSuiteBase.scala @@ -78,14 +78,9 @@ trait IntegrationSuiteBase protected var conn: Connection = _ protected var sparkSession: SparkSession = _ - def runSql(query: String): Unit = { - log.debug("RUNNING: " + Utils.sanitizeQueryText(query)) - sqlContext.sql(query).collect() - } - def jdbcUpdate(query: String): Unit = { - log.debug("RUNNING: " + Utils.sanitizeQueryText(query)) - conn.createStatement.executeUpdate(query) + log.debug("JDBC RUNNING: " + Utils.sanitizeQueryText(query)) + conn.createStatement().executeUpdate(query) } override def beforeAll(): Unit = { @@ -97,19 +92,14 @@ trait IntegrationSuiteBase sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", AWS_ACCESS_KEY_ID) sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", AWS_SECRET_ACCESS_KEY) conn = DefaultJDBCWrapper.getConnector(None, jdbcUrl, None) - // Disable autocommit due to conflicts with PG JDBC driver: - // PG JDBC driver seems to forbid calling explicit commit when autocommit is on, - // so lets just make autocommit off everywhere for consistency. 
- conn.setAutoCommit(false) - conn.createStatement().executeUpdate(s"create schema if not exists $schemaName") - conn.createStatement().execute(s"set search_path to $schemaName, '$$user', public") - conn.commit() + conn.setAutoCommit(true) + jdbcUpdate(s"create schema if not exists $schemaName") + jdbcUpdate(s"set search_path to $schemaName, '$$user', public") } override def afterAll(): Unit = { try { - conn.createStatement().executeUpdate(s"drop schema if exists $schemaName cascade") - conn.commit() + jdbcUpdate(s"drop schema if exists $schemaName cascade") val conf = new Configuration(false) conf.set("fs.s3n.awsAccessKeyId", AWS_ACCESS_KEY_ID) conf.set("fs.s3n.awsSecretAccessKey", AWS_SECRET_ACCESS_KEY) @@ -166,7 +156,8 @@ trait IntegrationSuiteBase } protected def createTestDataInRedshift(tableName: String): Unit = { - conn.createStatement().executeUpdate( + jdbcUpdate(s"drop table if exists $tableName") + jdbcUpdate( s""" |create table $tableName ( |testbyte int2, @@ -183,7 +174,7 @@ trait IntegrationSuiteBase """.stripMargin ) // scalastyle:off - conn.createStatement().executeUpdate( + jdbcUpdate( s""" |insert into $tableName values |(null, null, null, null, null, null, null, null, null, null), @@ -195,7 +186,6 @@ trait IntegrationSuiteBase """.stripMargin ) // scalastyle:on - conn.commit() } protected def withTempRedshiftTable[T](namePrefix: String)(body: String => T): T = { @@ -203,8 +193,7 @@ trait IntegrationSuiteBase try { body(tableName) } finally { - conn.prepareStatement(s"drop table if exists $tableName").executeUpdate() - conn.commit() + jdbcUpdate(s"drop table if exists $tableName") } } @@ -314,7 +303,7 @@ trait IntegrationSuiteBase df: DataFrame, expectedSchemaAfterLoad: Option[StructType] = None, saveMode: SaveMode = SaveMode.ErrorIfExists): Unit = { - try { + withTempRedshiftTable(tableName) { tableName => write(df) .option("dbtable", tableName) .mode(saveMode) @@ -330,9 +319,6 @@ trait IntegrationSuiteBase val loadedDf = read.option("dbtable", tableName).load() assert(loadedDf.schema === expectedSchemaAfterLoad.getOrElse(df.schema)) checkAnswer(loadedDf, df.collect()) - } finally { - conn.prepareStatement(s"drop table if exists $tableName").executeUpdate() - conn.commit() } } } diff --git a/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/PostgresDriverIntegrationSuite.scala b/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/PostgresDriverIntegrationSuite.scala index 3da3c7ff12ae0..5a4cd4f89c056 100755 --- a/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/PostgresDriverIntegrationSuite.scala +++ b/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/PostgresDriverIntegrationSuite.scala @@ -39,6 +39,6 @@ class PostgresDriverIntegrationSuite extends IntegrationSuiteBase { test("roundtrip save and load") { val df = sqlContext.createDataFrame(sc.parallelize(Seq(Row(1)), 1), StructType(StructField("foo", IntegerType) :: Nil)) - testRoundtripSaveAndLoad(s"save_with_one_empty_partition_$randomSuffix", df) + testRoundtripSaveAndLoad(s"save_with_one_empty_partition", df) } } diff --git a/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/RedshiftCredentialsInConfIntegrationSuite.scala b/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/RedshiftCredentialsInConfIntegrationSuite.scala index cde68da81d1a7..81290ef806522 100755 --- 
a/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/RedshiftCredentialsInConfIntegrationSuite.scala +++ b/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/RedshiftCredentialsInConfIntegrationSuite.scala @@ -23,8 +23,7 @@ class RedshiftCredentialsInConfIntegrationSuite extends IntegrationSuiteBase { test("roundtrip save and load") { val df = sqlContext.createDataFrame(sc.parallelize(Seq(Row(1)), 1), StructType(StructField("foo", IntegerType) :: Nil)) - val tableName = s"roundtrip_save_and_load_$randomSuffix" - try { + withTempRedshiftTable("roundtrip_save_and_load") { tableName => write(df) .option("url", AWS_REDSHIFT_JDBC_URL) .option("user", AWS_REDSHIFT_USER) @@ -40,9 +39,6 @@ class RedshiftCredentialsInConfIntegrationSuite extends IntegrationSuiteBase { .load() assert(loadedDf.schema === df.schema) checkAnswer(loadedDf, df.collect()) - } finally { - conn.prepareStatement(s"drop table if exists $tableName").executeUpdate() - conn.commit() } } diff --git a/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/RedshiftReadSuite.scala b/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/RedshiftReadSuite.scala index f3692e61fbc55..8f9af46357ed4 100755 --- a/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/RedshiftReadSuite.scala +++ b/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/RedshiftReadSuite.scala @@ -23,15 +23,12 @@ class RedshiftReadSuite extends IntegrationSuiteBase { override def beforeAll(): Unit = { super.beforeAll() - conn.prepareStatement(s"drop table if exists $test_table").executeUpdate() - conn.commit() createTestDataInRedshift(test_table) } override def afterAll(): Unit = { try { - conn.prepareStatement(s"drop table if exists $test_table").executeUpdate() - conn.commit() + jdbcUpdate(s"drop table if exists $test_table") } finally { super.afterAll() } @@ -185,49 +182,32 @@ class RedshiftReadSuite extends IntegrationSuiteBase { } test("read special float values (regression test for #261)") { - val tableName = s"roundtrip_special_float_values_$randomSuffix" - try { - conn.createStatement().executeUpdate( - s"CREATE TABLE $tableName (x real)") - conn.createStatement().executeUpdate( - s"INSERT INTO $tableName VALUES ('NaN'), ('Infinity'), ('-Infinity')") - conn.commit() + withTempRedshiftTable("roundtrip_special_float_values") { tableName => + jdbcUpdate(s"CREATE TABLE $tableName (x real)") + jdbcUpdate(s"INSERT INTO $tableName VALUES ('NaN'), ('Infinity'), ('-Infinity')") assert(DefaultJDBCWrapper.tableExists(conn, tableName)) // Due to #98, we use Double here instead of float: checkAnswer( read.option("dbtable", tableName).load(), Seq(Double.NaN, Double.PositiveInfinity, Double.NegativeInfinity).map(x => Row.apply(x))) - } finally { - conn.prepareStatement(s"drop table if exists $tableName").executeUpdate() - conn.commit() } } test("read special double values (regression test for #261)") { - val tableName = s"roundtrip_special_double_values_$randomSuffix" - try { - conn.createStatement().executeUpdate( - s"CREATE TABLE $tableName (x double precision)") - conn.createStatement().executeUpdate( - s"INSERT INTO $tableName VALUES ('NaN'), ('Infinity'), ('-Infinity')") - conn.commit() + withTempRedshiftTable("roundtrip_special_double_values") { tableName => + jdbcUpdate(s"CREATE TABLE $tableName (x double precision)") + jdbcUpdate(s"INSERT INTO $tableName VALUES ('NaN'), ('Infinity'), 
('-Infinity')") assert(DefaultJDBCWrapper.tableExists(conn, tableName)) checkAnswer( read.option("dbtable", tableName).load(), Seq(Double.NaN, Double.PositiveInfinity, Double.NegativeInfinity).map(x => Row.apply(x))) - } finally { - conn.prepareStatement(s"drop table if exists $tableName").executeUpdate() - conn.commit() } } test("read records containing escaped characters") { withTempRedshiftTable("records_with_escaped_characters") { tableName => - conn.createStatement().executeUpdate( - s"CREATE TABLE $tableName (x text)") - conn.createStatement().executeUpdate( - s"""INSERT INTO $tableName VALUES ('a\\nb'), ('\\\\'), ('"')""") - conn.commit() + jdbcUpdate(s"CREATE TABLE $tableName (x text)") + jdbcUpdate(s"""INSERT INTO $tableName VALUES ('a\\nb'), ('\\\\'), ('"')""") assert(DefaultJDBCWrapper.tableExists(conn, tableName)) checkAnswer( read.option("dbtable", tableName).load(), diff --git a/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/RedshiftWriteSuite.scala b/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/RedshiftWriteSuite.scala index a00fcf4d13823..cbfeb79ddb41c 100755 --- a/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/RedshiftWriteSuite.scala +++ b/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/RedshiftWriteSuite.scala @@ -27,8 +27,7 @@ abstract class BaseRedshiftWriteSuite extends IntegrationSuiteBase { test("roundtrip save and load") { // This test can be simplified once #98 is fixed. - val tableName = s"roundtrip_save_and_load_$randomSuffix" - try { + withTempRedshiftTable("roundtrip_save_and_load") { tableName => write( sqlContext.createDataFrame(sc.parallelize(TestUtils.expectedData), TestUtils.testSchema)) .option("dbtable", tableName) @@ -37,15 +36,12 @@ abstract class BaseRedshiftWriteSuite extends IntegrationSuiteBase { assert(DefaultJDBCWrapper.tableExists(conn, tableName)) checkAnswer(read.option("dbtable", tableName).load(), TestUtils.expectedData) - } finally { - conn.prepareStatement(s"drop table if exists $tableName").executeUpdate() - conn.commit() } } test("roundtrip save and load with uppercase column names") { testRoundtripSaveAndLoad( - s"roundtrip_write_and_read_with_uppercase_column_names_$randomSuffix", + s"roundtrip_write_and_read_with_uppercase_column_names", sqlContext.createDataFrame(sc.parallelize(Seq(Row(1))), StructType(StructField("A", IntegerType) :: Nil)), expectedSchemaAfterLoad = Some(StructType(StructField("a", IntegerType) :: Nil))) @@ -53,7 +49,7 @@ abstract class BaseRedshiftWriteSuite extends IntegrationSuiteBase { test("save with column names that are reserved words") { testRoundtripSaveAndLoad( - s"save_with_column_names_that_are_reserved_words_$randomSuffix", + s"save_with_column_names_that_are_reserved_words", sqlContext.createDataFrame(sc.parallelize(Seq(Row(1))), StructType(StructField("table", IntegerType) :: Nil))) } @@ -62,24 +58,23 @@ abstract class BaseRedshiftWriteSuite extends IntegrationSuiteBase { val df = sqlContext.createDataFrame(sc.parallelize(Seq(Row(1)), 2), StructType(StructField("foo", IntegerType) :: Nil)) assert(df.rdd.glom.collect() === Array(Array.empty[Row], Array(Row(1)))) - testRoundtripSaveAndLoad(s"save_with_one_empty_partition_$randomSuffix", df) + testRoundtripSaveAndLoad(s"save_with_one_empty_partition", df) } test("save with all empty partitions (regression test for #96)") { val df = sqlContext.createDataFrame(sc.parallelize(Seq.empty[Row], 2), 
StructType(StructField("foo", IntegerType) :: Nil)) assert(df.rdd.glom.collect() === Array(Array.empty[Row], Array.empty[Row])) - testRoundtripSaveAndLoad(s"save_with_all_empty_partitions_$randomSuffix", df) + testRoundtripSaveAndLoad(s"save_with_all_empty_partitions", df) // Now try overwriting that table. Although the new table is empty, it should still overwrite // the existing table. val df2 = df.withColumnRenamed("foo", "bar") testRoundtripSaveAndLoad( - s"save_with_all_empty_partitions_$randomSuffix", df2, saveMode = SaveMode.Overwrite) + s"save_with_all_empty_partitions", df2, saveMode = SaveMode.Overwrite) } test("informative error message when saving a table with string that is longer than max length") { - val tableName = s"error_message_when_string_too_long_$randomSuffix" - try { + withTempRedshiftTable("error_message_when_string_too_long") { tableName => val df = sqlContext.createDataFrame(sc.parallelize(Seq(Row("a" * 512))), StructType(StructField("A", StringType) :: Nil)) val e = intercept[SQLException] { @@ -89,9 +84,6 @@ abstract class BaseRedshiftWriteSuite extends IntegrationSuiteBase { .save() } assert(e.getMessage.contains("while loading data into Redshift")) - } finally { - conn.prepareStatement(s"drop table if exists $tableName").executeUpdate() - conn.commit() } } @@ -102,7 +94,7 @@ abstract class BaseRedshiftWriteSuite extends IntegrationSuiteBase { TestUtils.toTimestamp(1970, 0, 1, 0, 0, 0, millis = 100), TestUtils.toTimestamp(1970, 0, 1, 0, 0, 0, millis = 1000)) testRoundtripSaveAndLoad( - s"full_timestamp_precision_is_preserved$randomSuffix", + s"full_timestamp_precision_is_preserved", sqlContext.createDataFrame(sc.parallelize(timestamps.map(Row(_))), StructType(StructField("ts", TimestampType) :: Nil)) ) @@ -116,7 +108,7 @@ class AvroRedshiftWriteSuite extends BaseRedshiftWriteSuite { test("informative error message when saving with column names that contain spaces (#84)") { intercept[IllegalArgumentException] { testRoundtripSaveAndLoad( - s"error_when_saving_column_name_with_spaces_$randomSuffix", + s"error_when_saving_column_name_with_spaces", sqlContext.createDataFrame(sc.parallelize(Seq(Row(1))), StructType(StructField("column name with spaces", IntegerType) :: Nil))) } @@ -129,7 +121,7 @@ class CSVRedshiftWriteSuite extends BaseRedshiftWriteSuite { test("save with column names that contain spaces (#84)") { testRoundtripSaveAndLoad( - s"save_with_column_names_that_contain_spaces_$randomSuffix", + s"save_with_column_names_that_contain_spaces", sqlContext.createDataFrame(sc.parallelize(Seq(Row(1))), StructType(StructField("column name with spaces", IntegerType) :: Nil))) } @@ -146,8 +138,7 @@ class CSVGZIPRedshiftWriteSuite extends IntegrationSuiteBase { test("roundtrip save and load") { // This test can be simplified once #98 is fixed. 
- val tableName = s"roundtrip_save_and_load_$randomSuffix" - try { + withTempRedshiftTable("roundtrip_save_and_load") { tableName => write( sqlContext.createDataFrame(sc.parallelize(TestUtils.expectedData), TestUtils.testSchema)) .option("dbtable", tableName) @@ -156,9 +147,6 @@ class CSVGZIPRedshiftWriteSuite extends IntegrationSuiteBase { assert(DefaultJDBCWrapper.tableExists(conn, tableName)) checkAnswer(read.option("dbtable", tableName).load(), TestUtils.expectedData) - } finally { - conn.prepareStatement(s"drop table if exists $tableName").executeUpdate() - conn.commit() } } } diff --git a/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/STSIntegrationSuite.scala b/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/STSIntegrationSuite.scala index c86081799e7ca..7c2355205e6f6 100755 --- a/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/STSIntegrationSuite.scala +++ b/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/STSIntegrationSuite.scala @@ -43,10 +43,9 @@ class STSIntegrationSuite extends IntegrationSuiteBase { } test("roundtrip save and load") { - val tableName = s"roundtrip_save_and_load$randomSuffix" val df = sqlContext.createDataFrame(sc.parallelize(Seq(Row(1))), StructType(StructField("a", IntegerType) :: Nil)) - try { + withTempRedshiftTable("roundtrip_save_and_load") { tableName => write(df) .option("dbtable", tableName) .option("forward_spark_s3_credentials", "false") @@ -67,9 +66,6 @@ class STSIntegrationSuite extends IntegrationSuiteBase { assert(loadedDf.schema.length === 1) assert(loadedDf.columns === Seq("a")) checkAnswer(loadedDf, Seq(Row(1))) - } finally { - conn.prepareStatement(s"drop table if exists $tableName").executeUpdate() - conn.commit() } } } diff --git a/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/SaveModeIntegrationSuite.scala b/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/SaveModeIntegrationSuite.scala index a276e65ce8f5f..32e939c5fa225 100755 --- a/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/SaveModeIntegrationSuite.scala +++ b/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/SaveModeIntegrationSuite.scala @@ -38,7 +38,7 @@ class SaveModeIntegrationSuite extends IntegrationSuiteBase { test("SaveMode.Overwrite with non-existent table") { testRoundtripSaveAndLoad( - s"overwrite_non_existent_table$randomSuffix", + s"overwrite_non_existent_table", sqlContext.createDataFrame(sc.parallelize(Seq(Row(1))), StructType(StructField("a", IntegerType) :: Nil)), saveMode = SaveMode.Overwrite) diff --git a/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/SearchPathIntegrationSuite.scala b/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/SearchPathIntegrationSuite.scala index 021100fddf58f..6700ea72938f0 100644 --- a/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/SearchPathIntegrationSuite.scala +++ b/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/SearchPathIntegrationSuite.scala @@ -20,15 +20,13 @@ class SearchPathIntegrationSuite extends IntegrationSuiteBase { override def beforeAll(): Unit = { super.beforeAll() - conn.prepareStatement(s"drop table if exists $testTable").executeUpdate() - conn.commit() + jdbcUpdate(s"drop table if exists $testTable") 
createTestDataInRedshift(testTable) } override def afterAll(): Unit = { try { - conn.prepareStatement(s"drop table if exists $testTable").executeUpdate() - conn.commit() + jdbcUpdate(s"drop table if exists $testTable") } finally { super.afterAll() } diff --git a/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/pushdown/AdvancedPushdownIntegrationSuite.scala b/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/pushdown/AdvancedPushdownIntegrationSuite.scala index 0910643333780..e43aaa47c1af0 100644 --- a/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/pushdown/AdvancedPushdownIntegrationSuite.scala +++ b/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/pushdown/AdvancedPushdownIntegrationSuite.scala @@ -70,7 +70,6 @@ class AdvancedPushdownIntegrationSuite extends IntegrationSuiteBase { |(null, 0), |(4, 3) """.stripMargin) - conn.commit() val df1 = read .option("dbtable", test_table1) diff --git a/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/pushdown/FilterPushdownIntegrationSuite.scala b/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/pushdown/FilterPushdownIntegrationSuite.scala index be2c3623b6d1c..2de457a5b423e 100644 --- a/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/pushdown/FilterPushdownIntegrationSuite.scala +++ b/external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/pushdown/FilterPushdownIntegrationSuite.scala @@ -34,7 +34,6 @@ class FilterPushdownIntegrationSuite extends IntegrationSuiteBase { jdbcUpdate(s"create table $test_table(i int, s varchar(256))") jdbcUpdate(s"""insert into $test_table |values(null, 'Hello'), (2, 'Redshift'), (3, 'Spark'), (4, null)""".stripMargin) - conn.commit() } test("Test Simple Comparisons") {
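For reference, the pattern the tests converge on after this patch is roughly the following. This is a minimal standalone sketch, not code from the patch: the JDBC URL, credentials, suffix generation and table-name construction are assumed placeholders (the diff only shows that withTempRedshiftTable takes a namePrefix and drops the table in a finally block), while the jdbcUpdate body and the autocommit=true setup mirror IntegrationSuiteBase as changed here.

import java.sql.{Connection, DriverManager}
import scala.util.Random

// Standalone sketch of the helper pattern used throughout the suites after this patch.
object AutoCommitTestPatternSketch {
  // In the suites, `conn` comes from DefaultJDBCWrapper.getConnector; URL and credentials
  // here are placeholders for illustration only.
  val conn: Connection =
    DriverManager.getConnection("jdbc:postgresql://example-host:5439/dev", "user", "password")
  conn.setAutoCommit(true) // each statement commits on its own; no explicit conn.commit() anywhere

  // Assumed suffix generation; the suites' randomSuffix is not shown in this diff.
  val randomSuffix: String = Math.abs(Random.nextLong()).toString

  // Mirrors IntegrationSuiteBase.jdbcUpdate after this patch (minus the debug logging).
  def jdbcUpdate(query: String): Unit = {
    conn.createStatement().executeUpdate(query)
  }

  // Mirrors withTempRedshiftTable: hand the test body a table name, always drop it afterwards.
  def withTempRedshiftTable[T](namePrefix: String)(body: String => T): T = {
    val tableName = s"${namePrefix}_$randomSuffix" // assumed naming scheme
    try {
      body(tableName)
    } finally {
      jdbcUpdate(s"drop table if exists $tableName") // cleanup needs no commit with autocommit on
    }
  }

  def main(args: Array[String]): Unit = {
    withTempRedshiftTable("example_feature") { tableName =>
      jdbcUpdate(s"CREATE TABLE $tableName (x int)")
      jdbcUpdate(s"INSERT INTO $tableName VALUES (42)")
    }
  }
}

With autocommit enabled, every DDL/DML statement commits on its own, which is why the suites no longer call conn.commit() (the PG JDBC driver rejects explicit commit while autocommit is on) and why temporary-table cleanup can go through jdbcUpdate.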