
Upgrade to Spark 3.3.0 (#197)
* add jdbcutils class, upgrade to spark 3.3

* replace usages of createconnectionfactory with new createconnection

* added integration test for the jdbc connection

* refactored test dir

* added read write integration test

* bump package version

* add version to the readme

* add version to the other table in the readme

* fix typo in readme

* add newline

* removed integration tests and readme changes, update profile name
moredatapls authored Feb 17, 2023
1 parent 9a07172 commit 52d96f7
Showing 10 changed files with 66 additions and 47 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/scala.yml
@@ -13,7 +13,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest, macOS-latest]
profile: ['spark31']
profile: ['spark33']
timeout-minutes: 15

steps:
25 changes: 13 additions & 12 deletions README.md
@@ -11,11 +11,12 @@ This library contains the source code for the Apache Spark Connector for SQL Ser

There are three version sets of the connector available through Maven, a 2.4.x, a 3.0.x and a 3.1.x compatible version. All versions can be found [here](https://search.maven.org/search?q=spark-mssql-connector) and can be imported using the coordinates below:

| Connector | Maven Coordinate | Scala Version |
| --------- | ---------------- | ------------- |
| Spark 2.4.x compatible connnector | `com.microsoft.azure:spark-mssql-connector:1.0.2` | 2.11 |
| Spark 3.0.x compatible connnector | `com.microsoft.azure:spark-mssql-connector_2.12:1.1.0` | 2.12 |
| Spark 3.1.x compatible connnector | `com.microsoft.azure:spark-mssql-connector_2.12:1.2.0` | 2.12 |
| Connector | Maven Coordinate | Scala Version |
|----------------------------------|--------------------------------------------------------|---------------|
| Spark 2.4.x compatible connector | `com.microsoft.azure:spark-mssql-connector:1.0.2` | 2.11 |
| Spark 3.0.x compatible connector | `com.microsoft.azure:spark-mssql-connector_2.12:1.1.0` | 2.12 |
| Spark 3.1.x compatible connector | `com.microsoft.azure:spark-mssql-connector_2.12:1.2.0` | 2.12 |
| Spark 3.3.x compatible connector | `com.microsoft.azure:spark-mssql-connector_2.12:1.3.0` | 2.12 |
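
If you consume the new 3.3-compatible coordinate from the last row above through sbt rather than Maven, a rough equivalent is the following sketch (the artifact id already carries the `_2.12` suffix, so the plain `%` operator is used instead of `%%`):

```scala
// build.sbt (sketch) — pulls in the Spark 3.3.x-compatible connector listed above.
// The Scala version matches the 2.12.x line the connector is built against.
scalaVersion := "2.12.11"
libraryDependencies += "com.microsoft.azure" % "spark-mssql-connector_2.12" % "1.3.0"
```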

## Current Releases

@@ -35,13 +36,13 @@ For main changes from previous releases and known issues please refer to [CHANGE
* Reliable connector support for Sql Server Single Instance


| Component | Versions Supported |
| --------- | ------------------ |
| Apache Spark | 2.4.x, 3.0.x, 3.1.x |
| Scala | 2.11, 2.12 |
| Microsoft JDBC Driver for SQL Server | 8.4.1 |
| Microsoft SQL Server | SQL Server 2008 or later |
| Azure SQL Databases | Supported |
| Component | Versions Supported |
|--------------------------------------|----------------------------|
| Apache Spark | 2.4.x, 3.0.x, 3.1.x, 3.3.x |
| Scala | 2.11, 2.12 |
| Microsoft JDBC Driver for SQL Server | 8.4.1 |
| Microsoft SQL Server | SQL Server 2008 or later |
| Azure SQL Databases | Supported |

*Note: Azure Synapse (Azure SQL DW) use is not tested with this connector. While it may work, there may be unintended consequences.*

6 changes: 3 additions & 3 deletions pom.xml
@@ -4,7 +4,7 @@
<groupId>com.microsoft.azure</groupId>
<artifactId>spark-mssql-connector</artifactId>
<packaging>jar</packaging>
<version>1.2.0</version>
<version>1.3.0</version>
<name>${project.groupId}:${project.artifactId}</name>
<description>The Apache Spark Connector for SQL Server and Azure SQL is a high-performance connector that enables you to use transactional data in big data analytics and persists results for ad-hoc queries or reporting.</description>
<url>https://github.com/microsoft/sql-spark-connector</url>
@@ -201,14 +201,14 @@
</build>
<profiles>
<profile>
<id>spark31</id>
<id>spark33</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<scala.binary.version>2.12</scala.binary.version>
<scala.version>2.12.11</scala.version>
<spark.version>3.1.2</spark.version>
<spark.version>3.3.0</spark.version>
</properties>
<dependencies>
<dependency>
@@ -13,15 +13,12 @@
*/
package com.microsoft.sqlserver.jdbc.spark

import java.sql.{Connection, ResultSet, SQLException}

import com.microsoft.sqlserver.jdbc.spark.BulkCopyUtils._
import com.microsoft.sqlserver.jdbc.spark.utils.JdbcUtils.createConnection
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SQLContext, DataFrame, SaveMode}
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.createConnectionFactory
import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
import org.apache.spark.sql.sources.BaseRelation

import com.microsoft.sqlserver.jdbc.spark.BulkCopyUtils._
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}

/**
 * DefaultSource extends JDBCRelationProvider to provide an implementation for the MSSQLSpark connector.
@@ -56,7 +53,7 @@ class DefaultSource extends JdbcRelationProvider with Logging {
// if no user input app name provided, will use SparkMSSQLConnector:NotSpecified
val applicationName = s"SparkMSSQLConnector:${parameters.getOrElse("applicationname", "NotSpecified")}"
val options = new SQLServerBulkJdbcOptions(parameters + ("applicationname" -> applicationName))
val conn = createConnectionFactory(options)()
val conn = createConnection(options)
val df = repartitionDataFrame(rawDf, options)

logInfo(s"JDBC Driver major/mior version " +
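
The `applicationname` handling touched in the hunk above reads a write-time option and prefixes it with `SparkMSSQLConnector:`; when the option is omitted it falls back to `NotSpecified`. A minimal, hypothetical usage sketch (the URL, table, credentials, and job name below are placeholders, not taken from this commit):

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: exercises the "applicationname" option that DefaultSource reads above.
// All connection details are placeholders.
val spark = SparkSession.builder().appName("connector-smoke-test").getOrCreate()
val df = spark.range(10).toDF("id")

df.write
  .format("com.microsoft.sqlserver.jdbc.spark")
  .mode("overwrite")
  .option("url", "jdbc:sqlserver://localhost;databaseName=testdb")
  .option("dbtable", "dbo.smoke_test")
  .option("user", "sqluser")
  .option("password", "<password>")
  .option("applicationname", "nightly-etl") // surfaces as SparkMSSQLConnector:nightly-etl
  .save()
```
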
@@ -13,14 +13,14 @@
*/
package com.microsoft.sqlserver.jdbc.spark

import java.sql.{Connection, ResultSetMetaData, SQLException}

import com.microsoft.sqlserver.jdbc.spark.BulkCopyUtils.{executeUpdate, savePartition}
import com.microsoft.sqlserver.jdbc.spark.utils.JdbcUtils.createConnection
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.createConnectionFactory
import org.apache.spark.sql.{DataFrame, Row}

import java.sql.{Connection, SQLException}

/**
 * Implements the Reliable write strategy for Single Instances that is resilient to executor restart.
* Write is implemented as a 2 phase commit and executor restarts do not result in duplicate inserts.
@@ -49,7 +49,7 @@ object ReliableSingleInstanceStrategy extends DataIOStrategy with Logging {
appId: String): Unit = {
logInfo("write : reliable write to single instance called")
// Initialize - create connection and cleanup existing tables if any
val conn = createConnectionFactory(options)()
val conn = createConnection(options)
val stagingTableList = getStagingTableNames(appId, options.dbtable, df.rdd.getNumPartitions)
cleanupStagingTables(conn, stagingTableList, options)
createStagingTables(conn, stagingTableList,options)
@@ -125,7 +125,7 @@ object ReliableSingleInstanceStrategy extends DataIOStrategy with Logging {
dfColMetaData: Array[ColumnMetadata],
options: SQLServerBulkJdbcOptions): Unit = {
logDebug(s"idempotentInsertToTable : Started")
val conn = createConnectionFactory(options)()
val conn = createConnection(options)
try {
BulkCopyUtils.mssqlTruncateTable(conn, tableName)
} catch {
@@ -151,7 +151,7 @@
options: SQLServerBulkJdbcOptions): Unit = {
logInfo("unionStagingTables: insert to final table")
val insertStmt = stmtInsertWithUnion(stagingTableList, dfColMetadata, options)
val conn = createConnectionFactory(options)()
val conn = createConnection(options)
executeUpdate(conn,insertStmt)
}
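
For orientation, the class comment above describes a two-phase, staging-table write. The sketch below illustrates that pattern with plain JDBC; it is not the connector's code, and the connection URL, credentials, table names, and schema are hypothetical (the real implementation derives staging table names from the Spark application id and partition count, and bulk-copies each partition). It also assumes the mssql-jdbc driver is on the classpath.

```scala
import java.sql.DriverManager

// Standalone illustration of the two-phase, idempotent write strategy described above.
// Everything here is hypothetical and simplified.
object TwoPhaseWriteSketch {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection(
      "jdbc:sqlserver://localhost;databaseName=testdb;user=sa;password=<password>")
    val stmt = conn.createStatement()
    try {
      stmt.executeUpdate(
        "IF OBJECT_ID('dbo.events') IS NULL CREATE TABLE dbo.events (id INT, payload NVARCHAR(100))")
      val staging = Seq("dbo.events_stage_0", "dbo.events_stage_1")
      // Phase 1: each task owns one staging table; truncating before inserting makes a
      // retried task idempotent, so an executor restart cannot produce duplicate rows.
      staging.foreach { t =>
        stmt.executeUpdate(s"IF OBJECT_ID('$t') IS NULL CREATE TABLE $t (id INT, payload NVARCHAR(100))")
        stmt.executeUpdate(s"TRUNCATE TABLE $t")
        // ... bulk copy this task's rows into t ...
      }
      // Phase 2: a single INSERT ... SELECT over a UNION ALL of the staging tables moves the
      // data into the final table exactly once; the staging tables are then dropped.
      val union = staging.map(t => s"SELECT * FROM $t").mkString(" UNION ALL ")
      stmt.executeUpdate(s"INSERT INTO dbo.events SELECT * FROM ($union) AS staged")
      staging.foreach(t => stmt.executeUpdate(s"DROP TABLE $t"))
    } finally {
      stmt.close()
      conn.close()
    }
  }
}
```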

@@ -13,15 +13,15 @@
*/
package com.microsoft.sqlserver.jdbc.spark

import java.sql.{Connection, ResultSet, ResultSetMetaData, SQLException}

import com.microsoft.sqlserver.jdbc.spark.utils.JdbcUtils.createConnection
import com.microsoft.sqlserver.jdbc.{SQLServerBulkCopy, SQLServerBulkCopyOptions}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.{ByteType, DataType, ShortType, StructType}
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.{getSchema, schemaString}
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.{createConnectionFactory, getSchema, schemaString}
import com.microsoft.sqlserver.jdbc.{SQLServerBulkCopy, SQLServerBulkCopyOptions}
import org.apache.spark.sql.types.{ByteType, ShortType}
import org.apache.spark.sql.{DataFrame, Row, SQLContext}

import java.sql.{Connection, ResultSet, ResultSetMetaData, SQLException}
import scala.collection.mutable.ListBuffer

/**
@@ -35,7 +35,7 @@ object BulkCopyUtils extends Logging {
* a connection, sets connection properties and does a BulkWrite. Called when writing data to
* master instance and data pools both. URL in options is used to create the relevant connection.
*
* @param itertor - iterator for row of the partition.
* @param iterator - iterator for row of the partition.
* @param dfColMetadata - array of ColumnMetadata type
* @param options - SQLServerBulkJdbcOptions with url for the connection
*/
@@ -47,7 +47,7 @@
options: SQLServerBulkJdbcOptions ): Unit = {

logDebug("savePartition:Entered")
val conn = createConnectionFactory(options)()
val conn = createConnection(options)
conn.setAutoCommit(false)
conn.setTransactionIsolation(options.isolationLevel)
var committed = false
@@ -13,19 +13,19 @@
*/
package com.microsoft.sqlserver.jdbc.spark

import com.microsoft.sqlserver.jdbc.spark.utils.JdbcUtils.createConnection

import java.net.{InetAddress, UnknownHostException}
import java.nio.file.{Files, Paths}

import org.apache.spark.deploy.history.LogInfo
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.createConnectionFactory
import org.json4s._
import org.json4s.jackson.JsonMethods._

import scala.annotation.tailrec
import scala.io.Source

/**
/**
* DataPoolUtils
*
*/
@@ -38,8 +38,8 @@
def getDataPoolNodeList(options:SQLServerBulkJdbcOptions): List[String] = {
logInfo(s"Searching DMV for data pools \n")
import scala.collection.mutable.ListBuffer
val stmt = createConnectionFactory(options)().createStatement()
var nodeList = new ListBuffer[String]()
val stmt = createConnection(options).createStatement()
val nodeList = new ListBuffer[String]()
val query = s"select address from sys.dm_db_data_pool_nodes"
try {
val rs = stmt.executeQuery(query)
@@ -0,0 +1,19 @@
package com.microsoft.sqlserver.jdbc.spark.utils

import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.jdbc.JdbcDialects

import java.sql.Connection

object JdbcUtils {
/**
* Creates a JDBC connection using the input JDBC options.
* @param options The options which are used to create the connection.
* @return A JDBC connection.
*/
def createConnection(options: JDBCOptions): Connection = {
val dialect = JdbcDialects.get(options.url)
val conn = dialect.createConnectionFactory(options)(-1)
conn
}
}
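
For context on the `-1` above: Spark 3.3 moved connection-factory creation onto `JdbcDialect` and gave the factory a partition-id parameter, which is why the connector's old `createConnectionFactory(options)()` call sites were swapped for this helper; `-1` serves here as a placeholder partition id for driver-side connections. A hypothetical call-site sketch follows (option values are placeholders; `SQLServerBulkJdbcOptions` extends Spark's `JDBCOptions`, so the connector passes its own options object in the same way):

```scala
import com.microsoft.sqlserver.jdbc.spark.utils.JdbcUtils.createConnection
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions

// Sketch: obtain a Connection the way the connector's call sites now do.
// The URL, table name, and credentials are placeholders.
object CreateConnectionSketch {
  def main(args: Array[String]): Unit = {
    val opts = new JDBCOptions(Map(
      JDBCOptions.JDBC_URL -> "jdbc:sqlserver://localhost;databaseName=testdb",
      JDBCOptions.JDBC_TABLE_NAME -> "dbo.events",
      "user" -> "sqluser",
      "password" -> "<password>"))

    val conn = createConnection(opts) // replaces createConnectionFactory(opts)() from the Spark 3.1 build
    try {
      println(conn.getMetaData.getDatabaseProductVersion) // use the connection
    } finally {
      conn.close()
    }
  }
}
```
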
@@ -11,12 +11,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.sqlserver.jdbc.spark
import java.sql.Connection
package com.microsoft.sqlserver.jdbc.spark.unit.bulkwrite

import org.scalatest.matchers.should.Matchers
import com.microsoft.sqlserver.jdbc.spark.{BulkCopyUtils, DataPoolUtils, SQLServerBulkJdbcOptions}
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.test.SharedSparkSession
import org.scalatest.matchers.should.Matchers

import java.sql.Connection

class DataSourceTest extends SparkFunSuite with Matchers with SharedSparkSession {

4 changes: 2 additions & 2 deletions test/scala_test/pom.xml
@@ -123,14 +123,14 @@
</build>
<profiles>
<profile>
<id>spark31</id>
<id>spark33</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<scala.binary.version>2.12</scala.binary.version>
<scala.version>2.12.11</scala.version>
<spark.version>3.1.2</spark.version>
<spark.version>3.3.0</spark.version>
</properties>
</profile>
</profiles>
