From 2a1506ec910fb2bc0f8af84707a1f36b3fc0bbc1 Mon Sep 17 00:00:00 2001
From: luxu1
Date: Thu, 9 Dec 2021 12:21:03 -0800
Subject: [PATCH] leave out auto col and use user provided col mapping

---
 .../jdbc/spark/utils/BulkCopyUtils.scala | 65 +++++--------------
 1 file changed, 16 insertions(+), 49 deletions(-)

diff --git a/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala b/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala
index 45ed3cf..ffdcb0d 100644
--- a/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala
+++ b/src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala
@@ -22,8 +22,6 @@ import org.apache.spark.sql.jdbc.JdbcDialects
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.{createConnectionFactory, getSchema, schemaString}
 import com.microsoft.sqlserver.jdbc.{SQLServerBulkCopy, SQLServerBulkCopyOptions}
 
-import scala.collection.mutable.ListBuffer
-
 /**
  * BulkCopyUtils Object implements common utility function used by both datapool and
  */
@@ -35,7 +33,7 @@ object BulkCopyUtils extends Logging {
      * a connection, sets connection properties and does a BulkWrite. Called when writing data to
      * master instance and data pools both. URL in options is used to create the relevant connection.
      *
-     * @param itertor - iterator for row of the partition.
+     * @param iterator - iterator over rows of the partition.
      * @param dfColMetadata - array of ColumnMetadata type
     * @param options - SQLServerBulkJdbcOptions with url for the connection
     */
@@ -179,32 +177,6 @@ object BulkCopyUtils extends Logging {
         conn.createStatement.executeQuery(queryStr)
     }
 
-    /**
-     * getAutoCols
-     * utility function to get auto generated columns.
-     * Use auto generated column names to exclude them when matching schema.
-     */
-    private[spark] def getAutoCols(
-            conn: Connection,
-            table: String): List[String] = {
-        // auto cols union computed cols, generated always cols, and node / edge table auto cols
-        val queryStr = s"""SELECT name
-            FROM sys.columns
-            WHERE object_id = OBJECT_ID('${table}')
-            AND (is_computed = 1 -- computed column
-            OR generated_always_type > 0 -- generated always / temporal table
-            OR (is_hidden = 0 AND graph_type = 2)) -- graph table
-            """
-
-        val autoColRs = conn.createStatement.executeQuery(queryStr)
-        val autoCols = ListBuffer[String]()
-        while (autoColRs.next()) {
-            val colName = autoColRs.getString("name")
-            autoCols.append(colName)
-        }
-        autoCols.toList
-    }
-
     /**
      * getColMetadataMap
      * Utility function convert result set meta data to array.
@@ -290,35 +262,30 @@ object BulkCopyUtils extends Logging {
         val dfCols = df.schema
 
         val tableCols = getSchema(rs, JdbcDialects.get(url))
-        val autoCols = getAutoCols(conn, dbtable)
-
-        val columnsToWriteSet = columnsToWrite.split(",").toSet
-        logDebug(s"columnsToWrite: $columnsToWriteSet")
 
         val prefix = "Spark Dataframe and SQL Server table have differing"
 
-        // auto columns should not exist in df
-        assertIfCheckEnabled(dfCols.length + autoCols.length == tableCols.length, strictSchemaCheck,
-            s"${prefix} numbers of columns")
-
         // if columnsToWrite provided by user, use it for metadata mapping. If not, use sql table.
-        if (columnsToWrite == "") {
-            val result = new Array[ColumnMetadata](columnsToWriteSet.size)
+        var metadataLen = tableCols.length
+        var columnsToWriteSet: Set[String] = Set()
+        if (columnsToWrite.isEmpty) {
+            assertIfCheckEnabled(dfCols.length == tableCols.length, strictSchemaCheck,
+                s"${prefix} numbers of columns")
         } else {
-            val result = new Array[ColumnMetadata](tableCols.length - autoCols.length)
+            columnsToWriteSet = columnsToWrite.split(",").map(_.trim).toSet
+            logDebug(s"columnsToWrite: $columnsToWriteSet")
+            metadataLen = columnsToWriteSet.size
         }
 
-        var nonAutoColIndex = 0
+        var colMappingIndex = 0
+        val result = new Array[ColumnMetadata](metadataLen)
 
         for (i <- 0 to tableCols.length-1) {
             val tableColName = tableCols(i).name
             var dfFieldIndex = -1
-            if (!columnsToWriteSet.isEmpty && !columnsToWriteSet.contains(tableColName)) {
-                // if columnsToWrite provided, and column name not in it, skip column mapping and ColumnMetadata
+            // if columnsToWrite is provided and the column name is not in it, skip column mapping and ColumnMetadata
+            if (!columnsToWrite.isEmpty && !columnsToWriteSet.contains(tableColName)) {
                 logDebug(s"skipping col index $i col name $tableColName, user not provided in columnsToWrite list")
-            } else if (autoCols.contains(tableColName)) {
-                // if auto columns, skip column mapping and ColumnMetadata
-                logDebug(s"skipping auto generated col index $i col name $tableColName dfFieldIndex $dfFieldIndex")
             }else{
                 var dfColName:String = ""
                 if (isCaseSensitive) {
@@ -361,15 +328,15 @@ object BulkCopyUtils extends Logging {
                     s" DF col ${dfColName} nullable config is ${dfCols(dfFieldIndex).nullable} " +
                     s" Table col ${tableColName} nullable config is ${tableCols(i).nullable}")
 
-                // Schema check passed for element, Create ColMetaData only for non auto generated column
-                result(nonAutoColIndex) = new ColumnMetadata(
+                // Schema check passed for this element; create ColumnMetadata for the mapped column
+                result(colMappingIndex) = new ColumnMetadata(
                     rs.getMetaData().getColumnName(i+1),
                     rs.getMetaData().getColumnType(i+1),
                     rs.getMetaData().getPrecision(i+1),
                     rs.getMetaData().getScale(i+1),
                     dfFieldIndex
                     )
-                nonAutoColIndex += 1
+                colMappingIndex += 1
             }
         }
         result
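
Reviewer note: the core of the change, distilled. When columnsToWrite is empty, the
strict column-count check runs and every table column is mapped; otherwise the
metadata array is sized to the user's set and non-listed table columns are skipped.
A simplified, self-contained sketch of that filtering logic follows (standalone
names, not the connector's API; matching is exact on trimmed names, as in the patch):

    // Returns the table columns that should receive a ColumnMetadata entry,
    // in table order, mirroring the patched loop above.
    def selectColumnsToMap(tableCols: Seq[String], columnsToWrite: String): Seq[String] = {
      if (columnsToWrite.isEmpty) {
        tableCols // no user mapping: every table column participates
      } else {
        // names are trimmed before matching, as in the patched split/map/trim
        val wanted = columnsToWrite.split(",").map(_.trim).toSet
        tableCols.filter(wanted.contains)
      }
    }

    // Example: selectColumnsToMap(Seq("Id", "Name", "Amount"), "Id, Amount")
    // returns Seq("Id", "Amount"); "Name" is skipped, as in the logDebug branch.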
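Reviewer note: a minimal usage sketch of the write path this patch targets. This is a
hypothetical example, not part of the change; it assumes the connector's
"columnsToWrite" write option (the option the patched code reads), and the URL, table
name, and credentials below are placeholders.

    // Write only a user-chosen subset of DataFrame columns; columns not listed
    // (e.g. computed or generated-always columns) are left for SQL Server to fill.
    df.write
      .format("com.microsoft.sqlserver.jdbc.spark")
      .mode("append")
      .option("url", "jdbc:sqlserver://myserver:1433;databaseName=mydb") // placeholder
      .option("dbtable", "dbo.MyTable")                                  // placeholder
      .option("user", "username")                                        // placeholder
      .option("password", "password")                                    // placeholder
      .option("columnsToWrite", "Id, Name, Amount") // comma-separated; names are trimmed
      .save()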