Skip to content

Commit

Permalink
[SPARK-44262][SQL] Add dropTable and getInsertStatement to JdbcDi…
Browse files Browse the repository at this point in the history
…alect

### What changes were proposed in this pull request?
1. This PR add `dropTable` function to `JdbcDialect`. So user can override dropTable SQL by other JdbcDialect like Neo4J
Neo4J Drop case
```sql
MATCH (m:Person {name: 'Mark'})
DELETE m
```
2. Also add `getInsertStatement` for same reason.
Neo4J Insert case
```sql
MATCH (p:Person {name: 'Jennifer'})
SET p.birthdate = date('1980-01-01')
RETURN p
```
Neo4J SQL(in fact named `CQL`) not like normal SQL, but it have JDBC driver.

### Why are the changes needed?
Make JdbcDialect more useful

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
exist test

Closes #41855 from Hisoka-X/SPARK-44262_JDBCUtils_improve.

Authored-by: Jia Fan <fanjiaeminem@qq.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
  • Loading branch information
Hisoka-X authored and MaxGekk committed Oct 16, 2023
1 parent 9bdad31 commit 6994bad
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ object JdbcUtils extends Logging with SQLConfHelper {
* Drops a table from the JDBC database.
*/
def dropTable(conn: Connection, table: String, options: JDBCOptions): Unit = {
executeStatement(conn, options, s"DROP TABLE $table")
val dialect = JdbcDialects.get(options.url)
executeStatement(conn, options, dialect.dropTable(table))
}

/**
Expand Down Expand Up @@ -114,22 +115,19 @@ object JdbcUtils extends Logging with SQLConfHelper {
isCaseSensitive: Boolean,
dialect: JdbcDialect): String = {
val columns = if (tableSchema.isEmpty) {
rddSchema.fields.map(x => dialect.quoteIdentifier(x.name)).mkString(",")
rddSchema.fields
} else {
// The generated insert statement needs to follow rddSchema's column sequence and
// tableSchema's column names. When appending data into some case-sensitive DBMSs like
// PostgreSQL/Oracle, we need to respect the existing case-sensitive column names instead of
// RDD column names for user convenience.
val tableColumnNames = tableSchema.get.fieldNames
rddSchema.fields.map { col =>
val normalizedName = tableColumnNames.find(f => conf.resolver(f, col.name)).getOrElse {
tableSchema.get.find(f => conf.resolver(f.name, col.name)).getOrElse {
throw QueryCompilationErrors.columnNotFoundInSchemaError(col, tableSchema)
}
dialect.quoteIdentifier(normalizedName)
}.mkString(",")
}
}
val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
s"INSERT INTO $table ($columns) VALUES ($placeholders)"
dialect.insertIntoTable(table, columns)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,24 @@ abstract class JdbcDialect extends Serializable with Logging {
statement.executeUpdate(s"CREATE TABLE $tableName ($strSchema) $createTableOptions")
}

/**
* Returns an Insert SQL statement template for inserting a row into the target table via JDBC
* conn. Use "?" as placeholder for each value to be inserted.
* E.g. `INSERT INTO t ("name", "age", "gender") VALUES (?, ?, ?)`
*
* @param table The name of the table.
* @param fields The fields of the row that will be inserted.
* @return The SQL query to use for insert data into table.
*/
@Since("4.0.0")
def insertIntoTable(
table: String,
fields: Array[StructField]): String = {
val placeholders = fields.map(_ => "?").mkString(",")
val columns = fields.map(x => quoteIdentifier(x.name)).mkString(",")
s"INSERT INTO $table ($columns) VALUES ($placeholders)"
}

/**
* Get the SQL query that should be used to find if the given table exists. Dialects can
* override this method to return a query that works best in a particular database.
Expand Down Expand Up @@ -542,6 +560,17 @@ abstract class JdbcDialect extends Serializable with Logging {
}
}

/**
* Build a SQL statement to drop the given table.
*
* @param table the table name
* @return The SQL statement to use for drop the table.
*/
@Since("4.0.0")
def dropTable(table: String): String = {
s"DROP TABLE $table"
}

/**
* Build a create index SQL statement.
*
Expand Down

0 comments on commit 6994bad

Please sign in to comment.