leave out auto col and use user provided col mapping
luxu1-ms committed Dec 9, 2021
1 parent 6d9e49f commit 2a1506e
Showing 1 changed file with 16 additions and 49 deletions.
@@ -22,8 +22,6 @@ import org.apache.spark.sql.jdbc.JdbcDialects
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.{createConnectionFactory, getSchema, schemaString}
 import com.microsoft.sqlserver.jdbc.{SQLServerBulkCopy, SQLServerBulkCopyOptions}
 
-import scala.collection.mutable.ListBuffer
-
 /**
  * BulkCopyUtils Object implements common utility function used by both datapool and
  */
@@ -35,7 +33,7 @@ object BulkCopyUtils extends Logging {
      * a connection, sets connection properties and does a BulkWrite. Called when writing data to
      * master instance and data pools both. URL in options is used to create the relevant connection.
      *
-     * @param itertor - iterator for row of the partition.
+     * @param iterator - iterator for row of the partition.
      * @param dfColMetadata - array of ColumnMetadata type
      * @param options - SQLServerBulkJdbcOptions with url for the connection
      */
@@ -179,32 +177,6 @@ object BulkCopyUtils extends Logging {
         conn.createStatement.executeQuery(queryStr)
     }
 
-    /**
-     * getAutoCols
-     * utility function to get auto generated columns.
-     * Use auto generated column names to exclude them when matching schema.
-     */
-    private[spark] def getAutoCols(
-            conn: Connection,
-            table: String): List[String] = {
-        // auto cols union computed cols, generated always cols, and node / edge table auto cols
-        val queryStr = s"""SELECT name
-                FROM sys.columns
-                WHERE object_id = OBJECT_ID('${table}')
-                AND (is_computed = 1 -- computed column
-                OR generated_always_type > 0 -- generated always / temporal table
-                OR (is_hidden = 0 AND graph_type = 2)) -- graph table
-                """
-
-        val autoColRs = conn.createStatement.executeQuery(queryStr)
-        val autoCols = ListBuffer[String]()
-        while (autoColRs.next()) {
-            val colName = autoColRs.getString("name")
-            autoCols.append(colName)
-        }
-        autoCols.toList
-    }
-
     /**
      * getColMetadataMap
      * Utility function convert result set meta data to array.
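
Because the connector no longer runs this detection itself, a caller who wants a columnsToWrite list that skips server-generated columns can issue the same sys.columns query directly. Below is a minimal standalone sketch over plain JDBC (requires the mssql-jdbc driver on the classpath); the object and helper names and the connection string are illustrative, while the query text is copied verbatim from the removed getAutoCols.

    import java.sql.{Connection, DriverManager}
    import scala.collection.mutable.ListBuffer

    object AutoColCheck {
        // Mirrors the removed getAutoCols: computed columns, generated-always
        // (temporal) columns, and graph node/edge pseudo-columns.
        def findAutoGeneratedCols(conn: Connection, table: String): List[String] = {
            val queryStr = s"""SELECT name
                    FROM sys.columns
                    WHERE object_id = OBJECT_ID('${table}')
                    AND (is_computed = 1 -- computed column
                    OR generated_always_type > 0 -- generated always / temporal table
                    OR (is_hidden = 0 AND graph_type = 2)) -- graph table
                    """
            val rs = conn.createStatement.executeQuery(queryStr)
            val autoCols = ListBuffer[String]()
            while (rs.next()) {
                autoCols.append(rs.getString("name"))
            }
            autoCols.toList
        }

        def main(args: Array[String]): Unit = {
            // Placeholder connection string; substitute a real server and credentials.
            val conn = DriverManager.getConnection(
                "jdbc:sqlserver://localhost;databaseName=testdb;integratedSecurity=true")
            println(findAutoGeneratedCols(conn, "dbo.MyTable").mkString(", "))
        }
    }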
@@ -290,35 +262,30 @@ object BulkCopyUtils extends Logging {
         val dfCols = df.schema
 
         val tableCols = getSchema(rs, JdbcDialects.get(url))
-        val autoCols = getAutoCols(conn, dbtable)
 
-        val columnsToWriteSet = columnsToWrite.split(",").toSet
-        logDebug(s"columnsToWrite: $columnsToWriteSet")
-
         val prefix = "Spark Dataframe and SQL Server table have differing"
 
-        // auto columns should not exist in df
-        assertIfCheckEnabled(dfCols.length + autoCols.length == tableCols.length, strictSchemaCheck,
-            s"${prefix} numbers of columns")
-
         // if columnsToWrite provided by user, use it for metadata mapping. If not, use sql table.
-        if (columnsToWrite == "") {
-            val result = new Array[ColumnMetadata](columnsToWriteSet.size)
+        var metadataLen = tableCols.length
+        var columnsToWriteSet: Set[String] = Set()
+        if (columnsToWrite.isEmpty) {
+            assertIfCheckEnabled(dfCols.length == tableCols.length, strictSchemaCheck,
+                s"${prefix} numbers of columns")
         } else {
-            val result = new Array[ColumnMetadata](tableCols.length - autoCols.length)
+            columnsToWriteSet = columnsToWrite.split(",").map(_.trim).toSet
+            logDebug(s"columnsToWrite: $columnsToWriteSet")
+            metadataLen = columnsToWriteSet.size
         }
 
-        var nonAutoColIndex = 0
+        var colMappingIndex = 0
+        val result = new Array[ColumnMetadata](metadataLen)
 
         for (i <- 0 to tableCols.length-1) {
            val tableColName = tableCols(i).name
            var dfFieldIndex = -1
-           if (!columnsToWriteSet.isEmpty && !columnsToWriteSet.contains(tableColName)) {
-               // if columnsToWrite provided, and column name not in it, skip column mapping and ColumnMetadata
+           // if columnsToWrite provided, and column name not in it, skip column mapping and ColumnMetadata
+           if (!columnsToWrite.isEmpty && !columnsToWriteSet.contains(tableColName)) {
                logDebug(s"skipping col index $i col name $tableColName, user not provided in columnsToWrite list")
-           } else if (autoCols.contains(tableColName)) {
-               // if auto columns, skip column mapping and ColumnMetadata
-               logDebug(s"skipping auto generated col index $i col name $tableColName dfFieldIndex $dfFieldIndex")
           }else{
               var dfColName:String = ""
               if (isCaseSensitive) {
@@ -361,15 +328,15 @@ object BulkCopyUtils extends Logging {
                       s" DF col ${dfColName} nullable config is ${dfCols(dfFieldIndex).nullable} " +
                       s" Table col ${tableColName} nullable config is ${tableCols(i).nullable}")
 
-               // Schema check passed for element, Create ColMetaData only for non auto generated column
-               result(nonAutoColIndex) = new ColumnMetadata(
+               // Schema check passed for element, Create ColMetaData for columns
+               result(colMappingIndex) = new ColumnMetadata(
                    rs.getMetaData().getColumnName(i+1),
                    rs.getMetaData().getColumnType(i+1),
                    rs.getMetaData().getPrecision(i+1),
                    rs.getMetaData().getScale(i+1),
                    dfFieldIndex
                    )
-               nonAutoColIndex += 1
+               colMappingIndex += 1
           }
         }
         result
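With this change the user-supplied columnsToWrite option drives the column mapping: the comma-separated names are trimmed and turned into a set, every table column outside that set is skipped, and when no list is given the DataFrame must match the table column-for-column (asserted via assertIfCheckEnabled under strictSchemaCheck). A hedged usage sketch follows; com.microsoft.sqlserver.jdbc.spark is the connector's documented format name, while the server, table, and credentials below are placeholders.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("bulk-write-example").getOrCreate()
    import spark.implicits._

    // DataFrame deliberately omits the table's server-generated columns.
    val df = Seq((1, "alice"), (2, "bob")).toDF("id", "name")

    df.write
        .format("com.microsoft.sqlserver.jdbc.spark")
        .mode("append")
        .option("url", "jdbc:sqlserver://localhost;databaseName=testdb") // placeholder
        .option("dbtable", "dbo.MyTable")                                // placeholder
        .option("user", "username")                                      // placeholder
        .option("password", "password")                                  // placeholder
        // Only these table columns get a ColumnMetadata mapping; others are skipped.
        .option("columnsToWrite", "id, name")
        .save()

Since auto-generated columns are no longer filtered out automatically, a table with computed or generated-always columns needs those names left out of columnsToWrite (or matched exactly by the DataFrame when no list is given) for the mapping to line up.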
