From 52d96f7d71f3c6088e4a96f238cbe89082c436f6 Mon Sep 17 00:00:00 2001
From: Helge Brügner <11437312+moredatapls@users.noreply.github.com>
Date: Fri, 17 Feb 2023 22:53:30 +0100
Subject: [PATCH] Upgrade to Spark 3.3.0 (#197)
* add JdbcUtils class, upgrade to Spark 3.3
* replace usages of createConnectionFactory with the new createConnection
* added integration test for the JDBC connection
* refactored test dir
* added read/write integration test
* bump package version
* add version to the README
* add version to the other table in the README
* fix typo in README
* add newline
* removed integration tests and README changes, update profile name
---
.github/workflows/scala.yml | 2 +-
README.md | 25 ++++++++++---------
pom.xml | 6 ++---
.../sqlserver/jdbc/spark/DefaultSource.scala | 11 +++-----
.../ReliableSingleInstanceStrategy.scala | 12 ++++-----
.../jdbc/spark/utils/BulkCopyUtils.scala | 16 ++++++------
.../jdbc/spark/utils/DataPoolUtils.scala | 10 ++++----
.../jdbc/spark/utils/JdbcUtils.scala | 19 ++++++++++++++
.../{ => unit}/bulkwrite/DataSourceTest.scala | 8 +++---
test/scala_test/pom.xml | 4 +--
10 files changed, 66 insertions(+), 47 deletions(-)
create mode 100644 src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/JdbcUtils.scala
rename src/test/scala/com/microsoft/sqlserver/jdbc/spark/{ => unit}/bulkwrite/DataSourceTest.scala (97%)
diff --git a/.github/workflows/scala.yml b/.github/workflows/scala.yml
index 3fc4d7ca..ebeca400 100644
--- a/.github/workflows/scala.yml
+++ b/.github/workflows/scala.yml
@@ -13,7 +13,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest, macOS-latest]
- profile: ['spark31']
+ profile: ['spark33']
timeout-minutes: 15
steps:
diff --git a/README.md b/README.md
index f4f8d83a..d5496f34 100644
--- a/README.md
+++ b/README.md
@@ -11,11 +11,12 @@ This library contains the source code for the Apache Spark Connector for SQL Ser
- There are three version sets of the connector available through Maven, a 2.4.x, a 3.0.x and a 3.1.x compatible version. All versions can be found [here](https://search.maven.org/search?q=spark-mssql-connector) and can be imported using the coordinates below:
+ There are four version sets of the connector available through Maven: a 2.4.x, a 3.0.x, a 3.1.x, and a 3.3.x compatible version. All versions can be found [here](https://search.maven.org/search?q=spark-mssql-connector) and can be imported using the coordinates below:
-| Connector | Maven Coordinate | Scala Version |
-| --------- | ---------------- | ------------- |
-| Spark 2.4.x compatible connnector | `com.microsoft.azure:spark-mssql-connector:1.0.2` | 2.11 |
-| Spark 3.0.x compatible connnector | `com.microsoft.azure:spark-mssql-connector_2.12:1.1.0` | 2.12 |
-| Spark 3.1.x compatible connnector | `com.microsoft.azure:spark-mssql-connector_2.12:1.2.0` | 2.12 |
+| Connector | Maven Coordinate | Scala Version |
+|----------------------------------|--------------------------------------------------------|---------------|
+| Spark 2.4.x compatible connector | `com.microsoft.azure:spark-mssql-connector:1.0.2` | 2.11 |
+| Spark 3.0.x compatible connector | `com.microsoft.azure:spark-mssql-connector_2.12:1.1.0` | 2.12 |
+| Spark 3.1.x compatible connector | `com.microsoft.azure:spark-mssql-connector_2.12:1.2.0` | 2.12 |
+| Spark 3.3.x compatible connector | `com.microsoft.azure:spark-mssql-connector_2.12:1.3.0` | 2.12 |
## Current Releases
@@ -35,13 +36,13 @@ For main changes from previous releases and known issues please refer to [CHANGE
* Reliable connector support for Sql Server Single Instance
-| Component | Versions Supported |
-| --------- | ------------------ |
-| Apache Spark | 2.4.x, 3.0.x, 3.1.x |
-| Scala | 2.11, 2.12 |
-| Microsoft JDBC Driver for SQL Server | 8.4.1 |
-| Microsoft SQL Server | SQL Server 2008 or later |
-| Azure SQL Databases | Supported |
+| Component | Versions Supported |
+|--------------------------------------|----------------------------|
+| Apache Spark | 2.4.x, 3.0.x, 3.1.x, 3.3.x |
+| Scala | 2.11, 2.12 |
+| Microsoft JDBC Driver for SQL Server | 8.4.1 |
+| Microsoft SQL Server | SQL Server 2008 or later |
+| Azure SQL Databases | Supported |
*Note: Azure Synapse (Azure SQL DW) use is not tested with this connector. While it may work, there may be unintended consequences.*
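For readers picking up the new 1.3.0 coordinate from the table above, here is a minimal write sketch using the connector's documented data source name. The JDBC URL, table, and credentials are placeholders, and the 1.3.0 artifact is assumed to be on the classpath:

```scala
import org.apache.spark.sql.SparkSession

// Minimal write example against SQL Server via the connector's data source name.
val spark = SparkSession.builder().appName("spark-mssql-write").getOrCreate()
val df = spark.range(0, 10).toDF("value")

df.write
  .format("com.microsoft.sqlserver.jdbc.spark")
  .mode("append")
  .option("url", "jdbc:sqlserver://<server>:1433;databaseName=<db>")
  .option("dbtable", "dbo.example_table")
  .option("user", "<user>")
  .option("password", "<password>")
  .save()
```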
diff --git a/pom.xml b/pom.xml
index 4611f55d..cfc55dee 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
<groupId>com.microsoft.azure</groupId>
<artifactId>spark-mssql-connector</artifactId>
<packaging>jar</packaging>
- <version>1.2.0</version>
+ <version>1.3.0</version>
<name>${project.groupId}:${project.artifactId}</name>
<description>The Apache Spark Connector for SQL Server and Azure SQL is a high-performance connector that enables you to use transactional data in big data analytics and persists results for ad-hoc queries or reporting.</description>
<url>https://github.com/microsoft/sql-spark-connector</url>
@@ -201,14 +201,14 @@
- spark31
+ spark33
true
2.12
2.12.11
- 3.1.2
+ 3.3.0
diff --git a/src/main/scala/com/microsoft/sqlserver/jdbc/spark/DefaultSource.scala b/src/main/scala/com/microsoft/sqlserver/jdbc/spark/DefaultSource.scala
index ed29d1a8..ff3236ea 100644
--- a/src/main/scala/com/microsoft/sqlserver/jdbc/spark/DefaultSource.scala
+++ b/src/main/scala/com/microsoft/sqlserver/jdbc/spark/DefaultSource.scala
@@ -13,15 +13,12 @@
*/
package com.microsoft.sqlserver.jdbc.spark
-import java.sql.{Connection, ResultSet, SQLException}
-
+import com.microsoft.sqlserver.jdbc.spark.BulkCopyUtils._
+import com.microsoft.sqlserver.jdbc.spark.utils.JdbcUtils.createConnection
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{SQLContext, DataFrame, SaveMode}
-import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.createConnectionFactory
import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
import org.apache.spark.sql.sources.BaseRelation
-
-import com.microsoft.sqlserver.jdbc.spark.BulkCopyUtils._
+import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
/**
* DefaultSource extends JDBCRelationProvider to provide an implementation for the MSSQLSpark connector.
@@ -56,7 +53,7 @@ class DefaultSource extends JdbcRelationProvider with Logging {
// if no user input app name provided, will use SparkMSSQLConnector:NotSpecified
val applicationName = s"SparkMSSQLConnector:${parameters.getOrElse("applicationname", "NotSpecified")}"
val options = new SQLServerBulkJdbcOptions(parameters + ("applicationname" -> applicationName))
- val conn = createConnectionFactory(options)()
+ val conn = createConnection(options)
val df = repartitionDataFrame(rawDf, options)
logInfo(s"JDBC Driver major/mior version " +
diff --git a/src/main/scala/com/microsoft/sqlserver/jdbc/spark/connectors/ReliableSingleInstanceStrategy.scala b/src/main/scala/com/microsoft/sqlserver/jdbc/spark/connectors/ReliableSingleInstanceStrategy.scala
index b4cb4e5b..fd20c5a6 100644
--- a/src/main/scala/com/microsoft/sqlserver/jdbc/spark/connectors/ReliableSingleInstanceStrategy.scala
+++ b/src/main/scala/com/microsoft/sqlserver/jdbc/spark/connectors/ReliableSingleInstanceStrategy.scala
@@ -13,14 +13,14 @@
*/
package com.microsoft.sqlserver.jdbc.spark
-import java.sql.{Connection, ResultSetMetaData, SQLException}
-
import com.microsoft.sqlserver.jdbc.spark.BulkCopyUtils.{executeUpdate, savePartition}
+import com.microsoft.sqlserver.jdbc.spark.utils.JdbcUtils.createConnection
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
-import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.createConnectionFactory
import org.apache.spark.sql.{DataFrame, Row}
+import java.sql.{Connection, SQLException}
+
/**
* Implements the Reliable write strategy for Single Instances that's resilient to executor restart.
* Write is implemented as a 2 phase commit and executor restarts do not result in duplicate inserts.
@@ -49,7 +49,7 @@ object ReliableSingleInstanceStrategy extends DataIOStrategy with Logging {
appId: String): Unit = {
logInfo("write : reliable write to single instance called")
// Initialize - create connection and cleanup existing tables if any
- val conn = createConnectionFactory(options)()
+ val conn = createConnection(options)
val stagingTableList = getStagingTableNames(appId, options.dbtable, df.rdd.getNumPartitions)
cleanupStagingTables(conn, stagingTableList, options)
createStagingTables(conn, stagingTableList,options)
@@ -125,7 +125,7 @@ object ReliableSingleInstanceStrategy extends DataIOStrategy with Logging {
dfColMetaData: Array[ColumnMetadata],
options: SQLServerBulkJdbcOptions): Unit = {
logDebug(s"idempotentInsertToTable : Started")
- val conn = createConnectionFactory(options)()
+ val conn = createConnection(options)
try {
BulkCopyUtils.mssqlTruncateTable(conn, tableName)
} catch {
@@ -151,7 +151,7 @@ object ReliableSingleInstanceStrategy extends DataIOStrategy with Logging {
options: SQLServerBulkJdbcOptions): Unit = {
logInfo("unionStagingTables: insert to final table")
val insertStmt = stmtInsertWithUnion(stagingTableList, dfColMetadata, options)
- val conn = createConnectionFactory(options)()
+ val conn = createConnection(options)
executeUpdate(conn,insertStmt)
}
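As the class comment notes, the reliable strategy is a two-phase write: each task fills its own staging table (truncating first, so retries stay idempotent), and the driver then folds the staging tables into the destination. A simplified sketch of that second phase; table names and SQL are illustrative, not the connector's exact statements:

```scala
import java.sql.Connection

// Phase 2 of the reliable write: union all staging tables into the final table.
// Phase 1 (not shown) has each task truncate-then-fill its own staging table,
// so a restarted executor cannot leave duplicate rows behind.
def unionStagingIntoFinal(conn: Connection, finalTable: String, stagingTables: Seq[String]): Unit = {
  val union = stagingTables.map(t => s"SELECT * FROM $t").mkString(" UNION ALL ")
  val stmt = conn.createStatement()
  try stmt.executeUpdate(s"INSERT INTO $finalTable $union")
  finally stmt.close()
}
```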
diff --git a/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala b/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala
index 8b06e8e6..26a444ad 100644
--- a/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala
+++ b/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala
@@ -13,15 +13,15 @@
*/
package com.microsoft.sqlserver.jdbc.spark
-import java.sql.{Connection, ResultSet, ResultSetMetaData, SQLException}
-
+import com.microsoft.sqlserver.jdbc.spark.utils.JdbcUtils.createConnection
+import com.microsoft.sqlserver.jdbc.{SQLServerBulkCopy, SQLServerBulkCopyOptions}
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
-import org.apache.spark.sql.types.{ByteType, DataType, ShortType, StructType}
+import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.{getSchema, schemaString}
import org.apache.spark.sql.jdbc.JdbcDialects
-import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.{createConnectionFactory, getSchema, schemaString}
-import com.microsoft.sqlserver.jdbc.{SQLServerBulkCopy, SQLServerBulkCopyOptions}
+import org.apache.spark.sql.types.{ByteType, ShortType}
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import java.sql.{Connection, ResultSet, ResultSetMetaData, SQLException}
import scala.collection.mutable.ListBuffer
/**
@@ -35,7 +35,7 @@ object BulkCopyUtils extends Logging {
* a connection, sets connection properties and does a BulkWrite. Called when writing data to
* master instance and data pools both. URL in options is used to create the relevant connection.
*
- * @param itertor - iterator for row of the partition.
+ * @param iterator - iterator for row of the partition.
* @param dfColMetadata - array of ColumnMetadata type
* @param options - SQLServerBulkJdbcOptions with url for the connection
*/
@@ -47,7 +47,7 @@ object BulkCopyUtils extends Logging {
options: SQLServerBulkJdbcOptions ): Unit = {
logDebug("savePartition:Entered")
- val conn = createConnectionFactory(options)()
+ val conn = createConnection(options)
conn.setAutoCommit(false)
conn.setTransactionIsolation(options.isolationLevel)
var committed = false
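For orientation, the connection obtained above is used in a per-partition transaction. A hedged skeleton of that pattern follows, with the row-feeding step elided (the real savePartition adapts the partition's row iterator into a bulk record before calling writeToServer):

```scala
import java.sql.Connection
import com.microsoft.sqlserver.jdbc.SQLServerBulkCopy

// Skeleton of a transactional bulk write for one partition.
def bulkWritePartition(conn: Connection, destinationTable: String, isolationLevel: Int): Unit = {
  conn.setAutoCommit(false)                     // one transaction per partition
  conn.setTransactionIsolation(isolationLevel)
  var committed = false
  try {
    val bulkCopy = new SQLServerBulkCopy(conn)
    bulkCopy.setDestinationTableName(destinationTable)
    // bulkCopy.writeToServer(...)              // feed the partition's rows here
    conn.commit()
    committed = true
  } finally {
    if (!committed) conn.rollback()
    conn.close()
  }
}
```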
diff --git a/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/DataPoolUtils.scala b/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/DataPoolUtils.scala
index e67a3988..91f8df3f 100644
--- a/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/DataPoolUtils.scala
+++ b/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/DataPoolUtils.scala
@@ -13,19 +13,19 @@
*/
package com.microsoft.sqlserver.jdbc.spark
+import com.microsoft.sqlserver.jdbc.spark.utils.JdbcUtils.createConnection
+
import java.net.{InetAddress, UnknownHostException}
import java.nio.file.{Files, Paths}
-
import org.apache.spark.deploy.history.LogInfo
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.createConnectionFactory
import org.json4s._
import org.json4s.jackson.JsonMethods._
import scala.annotation.tailrec
import scala.io.Source
-/**
+/**
* DataPoolUtils
*
*/
@@ -38,8 +38,8 @@ object DataPoolUtils extends Logging {
def getDataPoolNodeList(options:SQLServerBulkJdbcOptions): List[String] = {
logInfo(s"Searching DMV for data pools \n")
import scala.collection.mutable.ListBuffer
- val stmt = createConnectionFactory(options)().createStatement()
- var nodeList = new ListBuffer[String]()
+ val stmt = createConnection(options).createStatement()
+ val nodeList = new ListBuffer[String]()
val query = s"select address from sys.dm_db_data_pool_nodes"
try {
val rs = stmt.executeQuery(query)
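The remainder of getDataPoolNodeList (outside this hunk) drains the result set into the node list; a sketch of that pattern with an illustrative method name:

```scala
import java.sql.Statement
import scala.collection.mutable.ListBuffer

// Illustrative: collect one address per data pool node from the DMV query.
def collectNodeAddresses(stmt: Statement, query: String): List[String] = {
  val nodes = new ListBuffer[String]()
  val rs = stmt.executeQuery(query)
  try {
    while (rs.next()) {
      nodes += rs.getString("address")
    }
  } finally {
    rs.close()
  }
  nodes.toList
}
```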
diff --git a/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/JdbcUtils.scala b/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/JdbcUtils.scala
new file mode 100644
index 00000000..474ec873
--- /dev/null
+++ b/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/JdbcUtils.scala
@@ -0,0 +1,19 @@
+package com.microsoft.sqlserver.jdbc.spark.utils
+
+import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
+import org.apache.spark.sql.jdbc.JdbcDialects
+
+import java.sql.Connection
+
+object JdbcUtils {
+ /**
+ * Creates a JDBC connection using the input JDBC options.
+ * @param options The options which are used to create the connection.
+ * @return A JDBC connection.
+ */
+ def createConnection(options: JDBCOptions): Connection = {
+ val dialect = JdbcDialects.get(options.url)
+ val conn = dialect.createConnectionFactory(options)(-1)
+ conn
+ }
+}
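As a usage note, every call site in this patch swaps `createConnectionFactory(options)()` for `createConnection(options)`, and callers remain responsible for closing the connection. A loan-style wrapper like the hypothetical one below (not part of the patch) is one natural way to consume it:

```scala
import com.microsoft.sqlserver.jdbc.spark.utils.JdbcUtils.createConnection
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import java.sql.Connection

// Hypothetical helper: open a connection from JDBC options, run a block, always close.
def withConnection[A](options: JDBCOptions)(body: Connection => A): A = {
  val conn = createConnection(options)
  try body(conn) finally conn.close()
}
```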
diff --git a/src/test/scala/com/microsoft/sqlserver/jdbc/spark/bulkwrite/DataSourceTest.scala b/src/test/scala/com/microsoft/sqlserver/jdbc/spark/unit/bulkwrite/DataSourceTest.scala
similarity index 97%
rename from src/test/scala/com/microsoft/sqlserver/jdbc/spark/bulkwrite/DataSourceTest.scala
rename to src/test/scala/com/microsoft/sqlserver/jdbc/spark/unit/bulkwrite/DataSourceTest.scala
index 9445ffa5..b5e033d4 100644
--- a/src/test/scala/com/microsoft/sqlserver/jdbc/spark/bulkwrite/DataSourceTest.scala
+++ b/src/test/scala/com/microsoft/sqlserver/jdbc/spark/unit/bulkwrite/DataSourceTest.scala
@@ -11,12 +11,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package com.microsoft.sqlserver.jdbc.spark
-import java.sql.Connection
+package com.microsoft.sqlserver.jdbc.spark.unit.bulkwrite
-import org.scalatest.matchers.should.Matchers
+import com.microsoft.sqlserver.jdbc.spark.{BulkCopyUtils, DataPoolUtils, SQLServerBulkJdbcOptions}
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.test.SharedSparkSession
+import org.scalatest.matchers.should.Matchers
+
+import java.sql.Connection
class DataSourceTest extends SparkFunSuite with Matchers with SharedSparkSession {
diff --git a/test/scala_test/pom.xml b/test/scala_test/pom.xml
index 0e0ddcd4..53743ccd 100644
--- a/test/scala_test/pom.xml
+++ b/test/scala_test/pom.xml
@@ -123,14 +123,14 @@
- spark31
+ spark33
true
2.12
2.12.11
- 3.1.2
+ 3.3.0