Skip to content

Commit

Permalink
[SPARK-9078] [SQL] Allow jdbc dialects to override the query used to …
Browse files Browse the repository at this point in the history
…check the table.

Current implementation uses query with a LIMIT clause to find if table already exists. This syntax works only in some database systems. This patch changes the default query to the one that is likely to work on most databases, and adds a new method to the  JdbcDialect abstract class to allow  dialects to override the default query.

I looked at using the JDBC meta data calls, it turns out there is no common way to find the current schema, catalog..etc.  There is a new method Connection.getSchema() , but that is available only starting jdk1.7 , and existing jdbc drivers may not have implemented it.  Other option was to use jdbc escape syntax clause for LIMIT, not sure on how well this supported in all the databases also. After looking at all the jdbc metadata options my conclusion was most common way is to use the simple select query with 'where 1 =0' , and allow dialects to customize as needed

Author: sureshthalamati <suresh.thalamati@gmail.com>

Closes #8676 from sureshthalamati/table_exists_spark-9078.
  • Loading branch information
sureshthalamati authored and yhuai committed Sep 16, 2015
1 parent 35a19f3 commit 64c29af
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
val conn = JdbcUtils.createConnection(url, props)

try {
var tableExists = JdbcUtils.tableExists(conn, table)
var tableExists = JdbcUtils.tableExists(conn, url, table)

if (mode == SaveMode.Ignore && tableExists) {
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,13 @@ object JdbcUtils extends Logging {
/**
* Returns true if the table already exists in the JDBC database.
*/
def tableExists(conn: Connection, table: String): Boolean = {
def tableExists(conn: Connection, url: String, table: String): Boolean = {
val dialect = JdbcDialects.get(url)

// Somewhat hacky, but there isn't a good way to identify whether a table exists for all
// SQL database systems, considering "table" could also include the database name.
Try(conn.prepareStatement(s"SELECT 1 FROM $table LIMIT 1").executeQuery().next()).isSuccess
// SQL database systems using JDBC meta data calls, considering "table" could also include
// the database name. Query used to find table exists can be overriden by the dialects.
Try(conn.prepareStatement(dialect.getTableExistsQuery(table)).executeQuery()).isSuccess
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,17 @@ abstract class JdbcDialect {
def quoteIdentifier(colName: String): String = {
s""""$colName""""
}

/**
* Get the SQL query that should be used to find if the given table exists. Dialects can
* override this method to return a query that works best in a particular database.
* @param table The name of the table.
* @return The SQL query to use for checking the table.
*/
def getTableExistsQuery(table: String): String = {
s"SELECT * FROM $table WHERE 1=0"
}

}

/**
Expand Down Expand Up @@ -198,6 +209,11 @@ case object PostgresDialect extends JdbcDialect {
case BooleanType => Some(JdbcType("BOOLEAN", java.sql.Types.BOOLEAN))
case _ => None
}

override def getTableExistsQuery(table: String): String = {
s"SELECT 1 FROM $table LIMIT 1"
}

}

/**
Expand All @@ -222,6 +238,10 @@ case object MySQLDialect extends JdbcDialect {
override def quoteIdentifier(colName: String): String = {
s"`$colName`"
}

override def getTableExistsQuery(table: String): String = {
s"SELECT 1 FROM $table LIMIT 1"
}
}

/**
Expand Down
14 changes: 14 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -450,4 +450,18 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext
assert(db2Dialect.getJDBCType(StringType).map(_.databaseTypeDefinition).get == "CLOB")
assert(db2Dialect.getJDBCType(BooleanType).map(_.databaseTypeDefinition).get == "CHAR(1)")
}

test("table exists query by jdbc dialect") {
val MySQL = JdbcDialects.get("jdbc:mysql://127.0.0.1/db")
val Postgres = JdbcDialects.get("jdbc:postgresql://127.0.0.1/db")
val db2 = JdbcDialects.get("jdbc:db2://127.0.0.1/db")
val h2 = JdbcDialects.get(url)
val table = "weblogs"
val defaultQuery = s"SELECT * FROM $table WHERE 1=0"
val limitQuery = s"SELECT 1 FROM $table LIMIT 1"
assert(MySQL.getTableExistsQuery(table) == limitQuery)
assert(Postgres.getTableExistsQuery(table) == limitQuery)
assert(db2.getTableExistsQuery(table) == defaultQuery)
assert(h2.getTableExistsQuery(table) == defaultQuery)
}
}

0 comments on commit 64c29af

Please sign in to comment.