Skip to content

Commit

Permalink
importccl: Added tests for IMPORT INTO with user specified target cols.
Browse files Browse the repository at this point in the history
This change adds unit tests for importing into an existing table while
specifying a subset of all visible columns as targets.

Release note: None
  • Loading branch information
adityamaru27 committed Jul 29, 2019
1 parent 796a57e commit 91aa7c4
Showing 1 changed file with 222 additions and 10 deletions.
232 changes: 222 additions & 10 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1209,7 +1209,6 @@ func TestImportCSVStmt(t *testing.T) {
// -> FK and constraint violation
// -> CSV containing keys which will shadow existing data
// -> Rollback of a failed IMPORT INTO
// -> IMPORT INTO specific columns of an existing table
func TestImportIntoCSV(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down Expand Up @@ -1543,15 +1542,6 @@ func TestImportIntoCSV(t *testing.T) {
}
}

// TODO(adityamaru): IMPORT INTO does not respect the number of columns
// specified after the table name as of now. It compares the number of
// fields in the row being imported, to the number of columns in the created
// table. Thus, statements such as:
// IMPORT INTO t (a, b, c) ...
// IMPORT INTO t (a) ...
// succeed while they should probably alert the user. Add failing test once
// implemented.

// Specify wrong table name.
sqlDB.ExpectErr(
t, `pq: relation "bad" does not exist`,
Expand All @@ -1561,6 +1551,228 @@ func TestImportIntoCSV(t *testing.T) {
// Expect it to succeed with correct columns.
sqlDB.Exec(t, fmt.Sprintf(`IMPORT INTO t (a, b) CSV DATA (%s)`, testFiles.files[0]))
})

// Tests for user specified target columns in IMPORT INTO statements.
//
// Tests IMPORT INTO with various target column sets, and an implicit PK
// provided by the hidden column row_id. Some statements are run with
// experimental_direct_ingestion.
t.Run("target-cols-with-default-pk", func(t *testing.T) {
var data string
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
_, _ = w.Write([]byte(data))
}
}))
defer srv.Close()

sqlDB.Exec(t, `CREATE DATABASE targetcols`)
sqlDB.Exec(t, `SET DATABASE = targetcols`)
createQuery := `CREATE TABLE target%d (a INT8,
b INT8,
c STRING,
d INT8,
e INT8,
f STRING)`
var testNum int

data = "1,5,e,7,12,teststr"
t.Run(data, func(t *testing.T) {
sqlDB.Exec(t, fmt.Sprintf(createQuery, testNum))
query := fmt.Sprintf(`IMPORT INTO target%d (a) CSV DATA ($1)`, testNum)
sqlDB.Exec(t, query, srv.URL)
sqlDB.CheckQueryResults(t,
fmt.Sprintf(`SELECT * FROM target%d`, testNum),
sqlDB.QueryStr(t, `SELECT 1, NULL, NULL, NULL, NULL, 'NULL'`),
)
testNum++
})
t.Run(data, func(t *testing.T) {
sqlDB.Exec(t, fmt.Sprintf(createQuery, testNum))
query := fmt.Sprintf(`IMPORT INTO target%d (a, f) CSV DATA ($1) WITH experimental_direct_ingestion`, testNum)
sqlDB.Exec(t, query, srv.URL)
sqlDB.CheckQueryResults(t,
fmt.Sprintf(`SELECT * FROM target%d`, testNum),
sqlDB.QueryStr(t, `SELECT 1, NULL, NULL, NULL, NULL, 'teststr'`),
)
testNum++
})
t.Run(data, func(t *testing.T) {
sqlDB.Exec(t, fmt.Sprintf(createQuery, testNum))
query := fmt.Sprintf(`IMPORT INTO target%d (d, e, f) CSV DATA ($1)`, testNum)
sqlDB.Exec(t, query, srv.URL)
sqlDB.CheckQueryResults(t,
fmt.Sprintf(`SELECT * FROM target%d`, testNum),
sqlDB.QueryStr(t, `SELECT NULL, NULL, NULL, 7, 12, 'teststr'`),
)
testNum++
})

sqlDB.Exec(t, "DROP DATABASE targetcols")
})

// Tests IMPORT INTO with a target column set, and an explicit PK.
t.Run("target-cols-with-explicit-pk", func(t *testing.T) {
sqlDB.Exec(t, "CREATE DATABASE targetcols; USE targetcols")
sqlDB.Exec(t, `CREATE TABLE t (a INT PRIMARY KEY, b STRING)`)

// Insert the test data
insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"}

if tx, err := db.Begin(); err != nil {
t.Fatal(err)
} else {
for i, v := range insert {
sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a, b) VALUES (%d, %s)", i+1000, v))
}

if err := tx.Commit(); err != nil {
t.Fatal(err)
}
}

sqlDB.Exec(t, fmt.Sprintf("IMPORT INTO t (a) CSV DATA (%s)", testFiles.files[0]))

var result int
numExistingRows := len(insert)
// Verify that the target column has been populated.
sqlDB.QueryRow(t, `SELECT count(*) FROM t WHERE a IS NOT NULL`).Scan(&result)
if expect := numExistingRows + rowsPerFile; result != expect {
t.Fatalf("expected %d rows, got %d", expect, result)
}

// Verify that the non-target columns have NULLs.
sqlDB.QueryRow(t, `SELECT count(*) FROM t WHERE b IS NULL`).Scan(&result)
expectedNulls := rowsPerFile
if result != expectedNulls {
t.Fatalf("expected %d rows, got %d", expectedNulls, result)
}

sqlDB.Exec(t, "DROP DATABASE targetcols")
})

// Tests IMPORT INTO with a target column set which does not include all PKs.
// As a result the non-target column is non-nullable, which is not allowed
// until we support DEFAULT expressions.
t.Run("target-cols-excluding-explicit-pk", func(t *testing.T) {
sqlDB.Exec(t, "CREATE DATABASE targetcols; USE targetcols")
sqlDB.Exec(t, `CREATE TABLE t (a INT PRIMARY KEY, b STRING)`)

// Insert the test data
insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"}

if tx, err := db.Begin(); err != nil {
t.Fatal(err)
} else {
for i, v := range insert {
sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a, b) VALUES (%d, %s)", i, v))
}

if err := tx.Commit(); err != nil {
t.Fatal(err)
}
}

// Expect an error if attempting to IMPORT INTO a target list which does
// not include all the PKs of the table.
sqlDB.ExpectErr(
t, `pq: all non-target columns in IMPORT INTO must be nullable`,
fmt.Sprintf(`IMPORT INTO t (b) CSV DATA (%s)`, testFiles.files[0]),
)

sqlDB.Exec(t, "DROP DATABASE targetcols")
})

// Tests behavior when the existing table being imported into has more columns
// in its schema then the source CSV file.
t.Run("more-table-cols-than-csv", func(t *testing.T) {
sqlDB.Exec(t, "CREATE DATABASE targetcols; USE targetcols")
sqlDB.Exec(t, `CREATE TABLE t (a INT, b STRING, c INT)`)

// Insert the test data
insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"}

if tx, err := db.Begin(); err != nil {
t.Fatal(err)
} else {
for i, v := range insert {
sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a, b, c) VALUES (%d, %s, %d)", i, v, i))
}

if err := tx.Commit(); err != nil {
t.Fatal(err)
}
}

stripFilenameQuotes := testFiles.files[0][1 : len(testFiles.files[0])-1]
sqlDB.ExpectErr(
t, fmt.Sprintf("pq: %s: row 1: expected 3 fields, got 2", stripFilenameQuotes),
fmt.Sprintf(`IMPORT INTO t (a, b, c) CSV DATA (%s)`, testFiles.files[0]),
)

sqlDB.Exec(t, "DROP DATABASE targetcols")
})

// Tests behvior when the existing table being imported into has fewer columns
// in its schema then the source CSV file.
t.Run("fewer-table-cols-than-csv", func(t *testing.T) {
sqlDB.Exec(t, "CREATE DATABASE targetcols; USE targetcols")
sqlDB.Exec(t, `CREATE TABLE t (a INT)`)

// Insert the test data
insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"}

if tx, err := db.Begin(); err != nil {
t.Fatal(err)
} else {
for i := range insert {
sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a) VALUES (%d)", i))
}

if err := tx.Commit(); err != nil {
t.Fatal(err)
}
}

stripFilenameQuotes := testFiles.files[0][1 : len(testFiles.files[0])-1]
sqlDB.ExpectErr(
t, fmt.Sprintf("pq: %s: row 1: expected 1 fields, got 2", stripFilenameQuotes),
fmt.Sprintf(`IMPORT INTO t (a) CSV DATA (%s)`, testFiles.files[0]),
)

sqlDB.Exec(t, "DROP DATABASE targetcols")
})

// IMPORT INTO does not support DEFAULT expressions for either target or
// non-target columns.
t.Run("import-into-check-no-default-cols", func(t *testing.T) {
sqlDB.Exec(t, "CREATE DATABASE targetcols; USE targetcols")
sqlDB.Exec(t, `CREATE TABLE t (a INT DEFAULT 1, b STRING)`)

// Insert the test data
insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"}

if tx, err := db.Begin(); err != nil {
t.Fatal(err)
} else {
for i := range insert {
sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a, b) VALUES (%d, %s)", i, insert[i]))
}

if err := tx.Commit(); err != nil {
t.Fatal(err)
}
}

sqlDB.ExpectErr(
t, fmt.Sprintf("pq: cannot IMPORT INTO a table with a DEFAULT expression for any of its columns"),
fmt.Sprintf(`IMPORT INTO t (a) CSV DATA (%s)`, testFiles.files[0]),
)

sqlDB.Exec(t, "DROP DATABASE targetcols")
})
// TODO(adityamaru): Add test for IMPORT INTO without target columns specified
// once grammar has been added.
}

func BenchmarkImport(b *testing.B) {
Expand Down

0 comments on commit 91aa7c4

Please sign in to comment.