Merge branch 'master' into graph-tables
luxu1-ms authored Oct 13, 2021
2 parents cb15588 + e328e20 commit 0015f40
Showing 9 changed files with 103 additions and 76 deletions.
13 changes: 9 additions & 4 deletions .github/workflows/scala.yml
@@ -8,14 +8,19 @@ on:

jobs:
build:

runs-on: ubuntu-latest
name: ${{ matrix.os }}
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest, macOS-latest]
profile: ['spark31']
timeout-minutes: 15

steps:
- uses: actions/checkout@v2
- name: Set up JDK 1.8
uses: actions/setup-java@v1
with:
java-version: 1.8
- name: Run tests
run: sbt test
- name: Build and run tests
run: mvn clean package -P ${{ matrix.profile }}
6 changes: 6 additions & 0 deletions .gitignore
@@ -353,3 +353,9 @@ project/project/
project/target/
target/
.idea

# VS Code
.vscode
.settings
.classpath
.project
20 changes: 12 additions & 8 deletions README.md
@@ -9,17 +9,21 @@ This library contains the source code for the Apache Spark Connector for SQL Ser

[Apache Spark](https://spark.apache.org/) is a unified analytics engine for large-scale data processing.

There are two versions of the connector available through Maven, a 2.4.5 compatible version and a 3.0.0 compatible version. Both versions can be found [here](https://search.maven.org/search?q=spark-mssql-connector) and can be imported using the coordinates below:
There are three versions of the connector available through Maven: a 2.4.x compatible version, a 3.0.x compatible version, and a 3.1.x compatible version. All of them can be found [here](https://search.maven.org/search?q=spark-mssql-connector) and can be imported using the coordinates below:

| Connector | Maven Coordinate |
| --------- | ------------------ |
| Spark 2.4.5 compatible connector | `com.microsoft.azure:spark-mssql-connector:1.0.1` |
| Spark 3.0.0 compatible connector | `com.microsoft.azure:spark-mssql-connector_2.12_3.0:1.0.0-alpha` |
| Connector | Maven Coordinate | Scala Version |
| --------- | ---------------- | ------------- |
| Spark 2.4.x compatible connector | `com.microsoft.azure:spark-mssql-connector:1.0.2` | 2.11 |
| Spark 3.0.x compatible connector | `com.microsoft.azure:spark-mssql-connector_2.12:1.1.0` | 2.12 |
| Spark 3.1.x compatible connector | `com.microsoft.azure:spark-mssql-connector_2.12:1.2.0` | 2.12 |
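
For example, a minimal sbt sketch that pulls in the Spark 3.1.x coordinate from the table above (swap in the 2.4.x or 3.0.x coordinate as needed; the artifactId already carries the Scala suffix, so `%` is used rather than `%%`):

```scala
// build.sbt
libraryDependencies += "com.microsoft.azure" % "spark-mssql-connector_2.12" % "1.2.0"
```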

## Current Releases

The Spark 2.4.5 compatible connector is on v1.0.1.
The Spark 3.0.0 compatible connector is on v1.0.0-alpha.
The latest Spark 2.4.x compatible connector is on v1.0.2.

The latest Spark 3.0.x compatible connector is on v1.1.0.

The latest Spark 3.1.x compatible connector is on v1.2.0.
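
As a minimal usage sketch (the server, database, table, and credentials below are placeholders, and a running `SparkSession` named `spark` is assumed), writing a DataFrame through the connector's data source name looks like this:

```scala
val df = spark.range(10).toDF("value")

df.write
  .format("com.microsoft.sqlserver.jdbc.spark")                       // the connector's data source name
  .mode("overwrite")
  .option("url", "jdbc:sqlserver://<server>;databaseName=<database>") // placeholder connection URL
  .option("dbtable", "dbo.sample_table")                              // placeholder destination table
  .option("user", "<user>")
  .option("password", "<password>")
  .save()
```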

For main changes from previous releases and known issues please refer to [CHANGELIST](docs/CHANGELIST.md)

@@ -33,7 +37,7 @@ For main changes from previous releases and known issues please refer to [CHANGE

| Component | Versions Supported |
| --------- | ------------------ |
| Apache Spark | 2.4.5, 3.0.0 |
| Apache Spark | 2.4.x, 3.0.x, 3.1.x |
| Scala | 2.11, 2.12 |
| Microsoft JDBC Driver for SQL Server | 8.4.1 |
| Microsoft SQL Server | SQL Server 2008 or later |
68 changes: 25 additions & 43 deletions pom.xml
@@ -4,7 +4,7 @@
<groupId>com.microsoft.azure</groupId>
<artifactId>spark-mssql-connector</artifactId>
<packaging>jar</packaging>
<version>1.0.0</version>
<version>1.2.0</version>
<name>${project.groupId}:${project.artifactId}</name>
<description>The Apache Spark Connector for SQL Server and Azure SQL is a high-performance connector that enables you to use transactional data in big data analytics and persists results for ad-hoc queries or reporting.</description>
<url>https://github.com/microsoft/sql-spark-connector</url>
@@ -200,47 +200,29 @@
</plugins>
</build>
<profiles>
<profile>
<id>spark24</id>
<properties>
<scala.binary.version>2.11</scala.binary.version>
<scala.version>2.11.12</scala.version>
<spark.version>2.4.6</spark.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
<version>3.0.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
<version>8.4.1.jre8</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>spark30</id>
<properties>
<scala.binary.version>2.12</scala.binary.version>
<scala.version>2.12.11</scala.version>
<spark.version>3.0.0</spark.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
<version>3.0.8</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
<version>8.4.1.jre8</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>spark31</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<scala.binary.version>2.12</scala.binary.version>
<scala.version>2.12.11</scala.version>
<spark.version>3.1.2</spark.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
<version>3.2.6</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
<version>8.4.1.jre8</version>
</dependency>
</dependencies>
</profile>
</profiles>
</project>
@@ -36,7 +36,7 @@ object ReliableSingleInstanceStrategy extends DataIOStrategy with Logging {
* Phase 2 Driver combines all staging tables to transactionally write to user specified table.
* Driver does a cleanup of staging tables as a good practice. Staging tables are temporary tables
* and should be cleanup automatically on job completion. Staging table names are prefixed
* with appId to allow for identification.
* with appId and destination fully qualified table name to allow for identification.
* @param df dataframe to write
* @param dfColMetaData for the table
* @param options user specified options
@@ -50,7 +50,7 @@ object ReliableSingleInstanceStrategy extends DataIOStrategy with Logging {
logInfo("write : reliable write to single instance called")
// Initialize - create connection and cleanup existing tables if any
val conn = createConnectionFactory(options)()
val stagingTableList = getStagingTableNames(appId, df.rdd.getNumPartitions)
val stagingTableList = getStagingTableNames(appId, options.dbtable, df.rdd.getNumPartitions)
cleanupStagingTables(conn, stagingTableList, options)
createStagingTables(conn, stagingTableList,options)
// Phase1 - Executors write partitions to staging tables.
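
To make the two-phase flow described in the comment above concrete, here is a conceptual sketch rather than the connector's actual code; `writePartition` and `combine` are hypothetical stand-ins for the per-partition bulk-copy insert and the driver-side transactional merge:

```scala
import org.apache.spark.sql.{DataFrame, Row}

// Conceptual sketch of the reliable single-instance write described above.
def twoPhaseWrite(
    df: DataFrame,
    appId: String,
    dbtable: String,
    writePartition: (String, Iterator[Row]) => Unit, // hypothetical: bulk-copy one partition into a staging table
    combine: Seq[String] => Unit): Unit = {          // hypothetical: merge staging tables into dbtable in one transaction
  // Staging names carry appId, destination table and partition index,
  // so concurrent writes to different tables do not collide.
  val staging = (0 until df.rdd.getNumPartitions)
    .map(i => s"[##${appId}_${dbtable}_${i}]")
  // Phase 1: each executor writes its own partition to its own global temp table.
  df.rdd.mapPartitionsWithIndex { (i, rows) =>
    writePartition(staging(i), rows)
    Iterator(staging(i)) // emit something so the job can be forced below
  }.count()
  // Phase 2: the driver combines all staging tables into the destination table.
  combine(staging)
}
```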
@@ -93,7 +93,7 @@ object ReliableSingleInstanceStrategy extends DataIOStrategy with Logging {
try {
df.rdd.mapPartitionsWithIndex(
(index, iterator) => {
val table_name = getStagingTableName(appId,index)
val table_name = getStagingTableName(appId,options.dbtable,index)
logDebug(s"writeToStagingTables: Writing partition index $index to Table $table_name")
val newOptions = new SQLServerBulkJdbcOptions(options.parameters + ("tableLock" -> "true"))
idempotentInsertToTable(iterator, table_name, dfColMetadata, newOptions)
@@ -157,28 +157,32 @@

/**
* utility function to get all global temp table names as a list.
* @param appId appId used as prefix of tablename
* @param nrOfPartitions number of paritions in dataframe used as suffix
* @param appId appId used as prefix of staging table name
* @param dbtable destination fully qualified table name used as part of temp staging table name
* @param nrOfPartitions number of partitions in dataframe used as suffix
*/
private def getStagingTableNames(
appId: String,
dbtable: String,
nrOfPartitions: Int): IndexedSeq[String] = {
val stagingTableList = for (index <- 0 until nrOfPartitions) yield {
getStagingTableName(appId, index)
getStagingTableName(appId, dbtable, index)
}
stagingTableList
}

/**
* utility function to create a staging table name
* @param appId appId used as prefix of tablename
* @param appId appId used as prefix of table name
* @param dbtable destination fully qualified table name used as part of temp staging table name
* @param index used as suffix
*/
private def getStagingTableName(
appId: String,
dbtable: String,
index:Int) : String = {
// Global table names in SQLServer are prefixed with ##
s"[##${appId}_${index}]"
s"[##${appId}_${dbtable}_${index}]"
}
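
For illustration (the appId and table names below are hypothetical), including the destination table in the name is what keeps concurrent writes from the same application apart:

```scala
val appId = "app-20211013120000-0001"
// Old scheme: two parallel writes from one application collide on partition 0.
val oldName = s"[##${appId}_0]"
// New scheme: the destination table name keeps the staging tables distinct.
val ordersStaging    = s"[##${appId}_dbo.orders_0]"
val customersStaging = s"[##${appId}_dbo.customers_0]"
```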

/**
@@ -507,7 +507,7 @@ object BulkCopyUtils extends Logging {
df: DataFrame,
options: SQLServerBulkJdbcOptions): Unit = {
logDebug("Creating table")
val strSchema = schemaString(df, options.url, options.createTableColumnTypes)
val strSchema = schemaString(df.schema, true, options.url, options.createTableColumnTypes)
val createTableStr = s"CREATE TABLE ${options.dbtable} (${strSchema}) ${options.createTableOptions}"
executeUpdate(conn,createTableStr)
logDebug("Creating table succeeded")
@@ -525,7 +525,7 @@
df: DataFrame,
options: SQLServerBulkJdbcOptions): Unit = {
logDebug(s"Creating external table ${options.dbtable}")
val strSchema = schemaString(df, "jdbc:sqlserver")
val strSchema = schemaString(df.schema, true, "jdbc:sqlserver")
val createExTableStr = s"CREATE EXTERNAL TABLE ${options.dbtable} (${strSchema}) " +
s"WITH (DATA_SOURCE=${options.dataPoolDataSource}, DISTRIBUTION=${options.dataPoolDistPolicy});"
executeUpdate(conn,createExTableStr)
@@ -14,7 +14,7 @@
package com.microsoft.sqlserver.jdbc.spark
import java.sql.Connection

import org.scalatest.Matchers
import org.scalatest.matchers.should.Matchers
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.test.SharedSparkSession

15 changes: 5 additions & 10 deletions test/scala_test/pom.xml
@@ -123,19 +123,14 @@
</build>
<profiles>
<profile>
<id>spark24</id>
<properties>
<scala.binary.version>2.11</scala.binary.version>
<scala.version>2.11.12</scala.version>
<spark.version>2.4.6</spark.version>
</properties>
</profile>
<profile>
<id>spark30</id>
<id>spark31</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<scala.binary.version>2.12</scala.binary.version>
<scala.version>2.12.11</scala.version>
<spark.version>3.0.0</spark.version>
<spark.version>3.1.2</spark.version>
</properties>
</profile>
</profiles>
31 changes: 31 additions & 0 deletions test/scala_test/src/main/scala/MasterInstanceTest.scala
@@ -17,6 +17,10 @@ import org.apache.spark.sql.{SparkSession, SaveMode, Row, DataFrame}
import org.apache.spark.sql.functions.asc
import org.apache.spark.sql.types._

import java.util.concurrent.Executors
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

/*
* MasterInstanceTest
* test cases for master instance. Most tests can be used for Spark JDBC
@@ -536,4 +540,31 @@ class MasterInstanceTest(testUtils:Connector_TestUtils) {
log.info("test_gci_reordered_columns : Reordered Write overwrite without truncate")
testUtils.drop_test_table(table_name)
}

// Test basic functionalities of writing to different databases in parallel
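// Runs two writes from the same SparkSession at the same time, so any per-application
// state shared between them (such as staging table names) must not collide.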
def test_gci_write_parallel() {
//Allowing a maximum of 2 threads to run
val executorService = Executors.newFixedThreadPool(2)
implicit val executionContext = ExecutionContext.fromExecutorService(executorService)

val table_name1 = s"test_write_parallel_1_${testType }"
val table_name2 = s"test_write_parallel_2_${testType }"
val df = testUtils.create_toy_df()
val futureA = Future {
testUtils.df_write(df, SaveMode.Overwrite, table_name1)
}
val futureB = Future {
testUtils.df_write(df, SaveMode.Overwrite, table_name2)
}
Await.result(futureA, Duration.Inf)
Await.result(futureB, Duration.Inf)

var result1 = testUtils.df_read(table_name1)
assert(df.schema == result1.schema)
var result2 = testUtils.df_read(table_name2)
assert(df.schema == result2.schema)
log.info("test_write_parallel : Exit")
testUtils.drop_test_table(table_name1)
testUtils.drop_test_table(table_name2)
}
}
