diff --git a/pkg/ccl/importccl/import_stmt_test.go b/pkg/ccl/importccl/import_stmt_test.go index a02dd091fbbf..3a7ef55106cc 100644 --- a/pkg/ccl/importccl/import_stmt_test.go +++ b/pkg/ccl/importccl/import_stmt_test.go @@ -1209,7 +1209,6 @@ func TestImportCSVStmt(t *testing.T) { // -> FK and constraint violation // -> CSV containing keys which will shadow existing data // -> Rollback of a failed IMPORT INTO -// -> IMPORT INTO specific columns of an existing table func TestImportIntoCSV(t *testing.T) { defer leaktest.AfterTest(t)() @@ -1543,15 +1542,6 @@ func TestImportIntoCSV(t *testing.T) { } } - // TODO(adityamaru): IMPORT INTO does not respect the number of columns - // specified after the table name as of now. It compares the number of - // fields in the row being imported, to the number of columns in the created - // table. Thus, statements such as: - // IMPORT INTO t (a, b, c) ... - // IMPORT INTO t (a) ... - // succeed while they should probably alert the user. Add failing test once - // implemented. - // Specify wrong table name. sqlDB.ExpectErr( t, `pq: relation "bad" does not exist`, @@ -1561,6 +1551,228 @@ func TestImportIntoCSV(t *testing.T) { // Expect it to succeed with correct columns. sqlDB.Exec(t, fmt.Sprintf(`IMPORT INTO t (a, b) CSV DATA (%s)`, testFiles.files[0])) }) + + // Tests for user specified target columns in IMPORT INTO statements. + // + // Tests IMPORT INTO with various target column sets, and an implicit PK + // provided by the hidden column row_id. Some statements are run with + // experimental_direct_ingestion. + t.Run("target-cols-with-default-pk", func(t *testing.T) { + var data string + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method == "GET" { + _, _ = w.Write([]byte(data)) + } + })) + defer srv.Close() + + sqlDB.Exec(t, `CREATE DATABASE targetcols`) + sqlDB.Exec(t, `SET DATABASE = targetcols`) + createQuery := `CREATE TABLE target%d (a INT8, + b INT8, + c STRING, + d INT8, + e INT8, + f STRING)` + var testNum int + + data = "1,5,e,7,12,teststr" + t.Run(data, func(t *testing.T) { + sqlDB.Exec(t, fmt.Sprintf(createQuery, testNum)) + query := fmt.Sprintf(`IMPORT INTO target%d (a) CSV DATA ($1)`, testNum) + sqlDB.Exec(t, query, srv.URL) + sqlDB.CheckQueryResults(t, + fmt.Sprintf(`SELECT * FROM target%d`, testNum), + sqlDB.QueryStr(t, `SELECT 1, NULL, NULL, NULL, NULL, 'NULL'`), + ) + testNum++ + }) + t.Run(data, func(t *testing.T) { + sqlDB.Exec(t, fmt.Sprintf(createQuery, testNum)) + query := fmt.Sprintf(`IMPORT INTO target%d (a, f) CSV DATA ($1) WITH experimental_direct_ingestion`, testNum) + sqlDB.Exec(t, query, srv.URL) + sqlDB.CheckQueryResults(t, + fmt.Sprintf(`SELECT * FROM target%d`, testNum), + sqlDB.QueryStr(t, `SELECT 1, NULL, NULL, NULL, NULL, 'teststr'`), + ) + testNum++ + }) + t.Run(data, func(t *testing.T) { + sqlDB.Exec(t, fmt.Sprintf(createQuery, testNum)) + query := fmt.Sprintf(`IMPORT INTO target%d (d, e, f) CSV DATA ($1)`, testNum) + sqlDB.Exec(t, query, srv.URL) + sqlDB.CheckQueryResults(t, + fmt.Sprintf(`SELECT * FROM target%d`, testNum), + sqlDB.QueryStr(t, `SELECT NULL, NULL, NULL, 7, 12, 'teststr'`), + ) + testNum++ + }) + + sqlDB.Exec(t, "DROP DATABASE targetcols") + }) + + // Tests IMPORT INTO with a target column set, and an explicit PK. + t.Run("target-cols-with-explicit-pk", func(t *testing.T) { + sqlDB.Exec(t, "CREATE DATABASE targetcols; USE targetcols") + sqlDB.Exec(t, `CREATE TABLE t (a INT PRIMARY KEY, b STRING)`) + + // Insert the test data + insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"} + + if tx, err := db.Begin(); err != nil { + t.Fatal(err) + } else { + for i, v := range insert { + sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a, b) VALUES (%d, %s)", i+1000, v)) + } + + if err := tx.Commit(); err != nil { + t.Fatal(err) + } + } + + sqlDB.Exec(t, fmt.Sprintf("IMPORT INTO t (a) CSV DATA (%s)", testFiles.files[0])) + + var result int + numExistingRows := len(insert) + // Verify that the target column has been populated. + sqlDB.QueryRow(t, `SELECT count(*) FROM t WHERE a IS NOT NULL`).Scan(&result) + if expect := numExistingRows + rowsPerFile; result != expect { + t.Fatalf("expected %d rows, got %d", expect, result) + } + + // Verify that the non-target columns have NULLs. + sqlDB.QueryRow(t, `SELECT count(*) FROM t WHERE b IS NULL`).Scan(&result) + expectedNulls := rowsPerFile + if result != expectedNulls { + t.Fatalf("expected %d rows, got %d", expectedNulls, result) + } + + sqlDB.Exec(t, "DROP DATABASE targetcols") + }) + + // Tests IMPORT INTO with a target column set which does not include all PKs. + // As a result the non-target column is non-nullable, which is not allowed + // until we support DEFAULT expressions. + t.Run("target-cols-excluding-explicit-pk", func(t *testing.T) { + sqlDB.Exec(t, "CREATE DATABASE targetcols; USE targetcols") + sqlDB.Exec(t, `CREATE TABLE t (a INT PRIMARY KEY, b STRING)`) + + // Insert the test data + insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"} + + if tx, err := db.Begin(); err != nil { + t.Fatal(err) + } else { + for i, v := range insert { + sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a, b) VALUES (%d, %s)", i, v)) + } + + if err := tx.Commit(); err != nil { + t.Fatal(err) + } + } + + // Expect an error if attempting to IMPORT INTO a target list which does + // not include all the PKs of the table. + sqlDB.ExpectErr( + t, `pq: all non-target columns in IMPORT INTO must be nullable`, + fmt.Sprintf(`IMPORT INTO t (b) CSV DATA (%s)`, testFiles.files[0]), + ) + + sqlDB.Exec(t, "DROP DATABASE targetcols") + }) + + // Tests behavior when the existing table being imported into has more columns + // in its schema then the source CSV file. + t.Run("more-table-cols-than-csv", func(t *testing.T) { + sqlDB.Exec(t, "CREATE DATABASE targetcols; USE targetcols") + sqlDB.Exec(t, `CREATE TABLE t (a INT, b STRING, c INT)`) + + // Insert the test data + insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"} + + if tx, err := db.Begin(); err != nil { + t.Fatal(err) + } else { + for i, v := range insert { + sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a, b, c) VALUES (%d, %s, %d)", i, v, i)) + } + + if err := tx.Commit(); err != nil { + t.Fatal(err) + } + } + + stripFilenameQuotes := testFiles.files[0][1 : len(testFiles.files[0])-1] + sqlDB.ExpectErr( + t, fmt.Sprintf("pq: %s: row 1: expected 3 fields, got 2", stripFilenameQuotes), + fmt.Sprintf(`IMPORT INTO t (a, b, c) CSV DATA (%s)`, testFiles.files[0]), + ) + + sqlDB.Exec(t, "DROP DATABASE targetcols") + }) + + // Tests behvior when the existing table being imported into has fewer columns + // in its schema then the source CSV file. + t.Run("fewer-table-cols-than-csv", func(t *testing.T) { + sqlDB.Exec(t, "CREATE DATABASE targetcols; USE targetcols") + sqlDB.Exec(t, `CREATE TABLE t (a INT)`) + + // Insert the test data + insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"} + + if tx, err := db.Begin(); err != nil { + t.Fatal(err) + } else { + for i := range insert { + sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a) VALUES (%d)", i)) + } + + if err := tx.Commit(); err != nil { + t.Fatal(err) + } + } + + stripFilenameQuotes := testFiles.files[0][1 : len(testFiles.files[0])-1] + sqlDB.ExpectErr( + t, fmt.Sprintf("pq: %s: row 1: expected 1 fields, got 2", stripFilenameQuotes), + fmt.Sprintf(`IMPORT INTO t (a) CSV DATA (%s)`, testFiles.files[0]), + ) + + sqlDB.Exec(t, "DROP DATABASE targetcols") + }) + + // IMPORT INTO does not support DEFAULT expressions for either target or + // non-target columns. + t.Run("import-into-check-no-default-cols", func(t *testing.T) { + sqlDB.Exec(t, "CREATE DATABASE targetcols; USE targetcols") + sqlDB.Exec(t, `CREATE TABLE t (a INT DEFAULT 1, b STRING)`) + + // Insert the test data + insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"} + + if tx, err := db.Begin(); err != nil { + t.Fatal(err) + } else { + for i := range insert { + sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a, b) VALUES (%d, %s)", i, insert[i])) + } + + if err := tx.Commit(); err != nil { + t.Fatal(err) + } + } + + sqlDB.ExpectErr( + t, fmt.Sprintf("pq: cannot IMPORT INTO a table with a DEFAULT expression for any of its columns"), + fmt.Sprintf(`IMPORT INTO t (a) CSV DATA (%s)`, testFiles.files[0]), + ) + + sqlDB.Exec(t, "DROP DATABASE targetcols") + }) + // TODO(adityamaru): Add test for IMPORT INTO without target columns specified + // once grammar has been added. } func BenchmarkImport(b *testing.B) {