Skip to content

Commit

Permalink
importccl: Added tests for IMPORT INTO with user specified target cols.
Browse files Browse the repository at this point in the history
This change adds unit tests for importing into an existing table while
specifying a subset of all visible columns as targets.

Release note: None
  • Loading branch information
adityamaru27 committed Jul 22, 2019
1 parent c0d807d commit 8a49379
Showing 1 changed file with 190 additions and 10 deletions.
200 changes: 190 additions & 10 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1209,7 +1209,6 @@ func TestImportCSVStmt(t *testing.T) {
// -> FK and constraint violation
// -> CSV containing keys which will shadow existing data
// -> Rollback of a failed IMPORT INTO
// -> IMPORT INTO specific columns of an existing table
func TestImportIntoCSV(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down Expand Up @@ -1543,15 +1542,6 @@ func TestImportIntoCSV(t *testing.T) {
}
}

// TODO(adityamaru): IMPORT INTO does not respect the number of columns
// specified after the table name as of now. It compares the number of
// fields in the row being imported, to the number of columns in the created
// table. Thus, statements such as:
// IMPORT INTO t (a, b, c) ...
// IMPORT INTO t (a) ...
// succeed while they should probably alert the user. Add failing test once
// implemented.

// Specify wrong table name.
sqlDB.ExpectErr(
t, `pq: relation "bad" does not exist`,
Expand All @@ -1561,6 +1551,196 @@ func TestImportIntoCSV(t *testing.T) {
// Expect it to succeed with correct columns.
sqlDB.Exec(t, fmt.Sprintf(`IMPORT INTO t (a, b) CSV DATA (%s)`, testFiles.files[0]))
})

// Tests for user specified target columns in IMPORT INTO statements.
//
// Tests IMPORT INTO with various target column sets, and an implicit PK
// provided by the hidden column row_id. Some statements are run with
// experimental_direct_ingestion.
t.Run("target-cols-with-default-pk", func(t *testing.T) {
var data string
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
_, _ = w.Write([]byte(data))
}
}))
defer srv.Close()

sqlDB.Exec(t, `CREATE DATABASE targetcols`)
sqlDB.Exec(t, `SET DATABASE = targetcols`)
createQuery := `CREATE TABLE target%d (a INT8,
b INT8,
c STRING,
d INT8,
e INT8,
f STRING)`
var testNum int

data = "1,5,e,7,12,teststr"
t.Run(data, func(t *testing.T) {
sqlDB.Exec(t, fmt.Sprintf(createQuery, testNum))
query := fmt.Sprintf(`IMPORT INTO target%d (a) CSV DATA ($1)`, testNum)
sqlDB.Exec(t, query, srv.URL)
sqlDB.CheckQueryResults(t,
fmt.Sprintf(`SELECT * FROM target%d`, testNum),
sqlDB.QueryStr(t, `SELECT 1, NULL, NULL, NULL, NULL, 'NULL'`),
)
testNum++
})
t.Run(data, func(t *testing.T) {
sqlDB.Exec(t, fmt.Sprintf(createQuery, testNum))
query := fmt.Sprintf(`IMPORT INTO target%d (a, f) CSV DATA ($1) WITH experimental_direct_ingestion`, testNum)
sqlDB.Exec(t, query, srv.URL)
sqlDB.CheckQueryResults(t,
fmt.Sprintf(`SELECT * FROM target%d`, testNum),
sqlDB.QueryStr(t, `SELECT 1, NULL, NULL, NULL, NULL, 'teststr'`),
)
testNum++
})
t.Run(data, func(t *testing.T) {
sqlDB.Exec(t, fmt.Sprintf(createQuery, testNum))
query := fmt.Sprintf(`IMPORT INTO target%d (d, e, f) CSV DATA ($1)`, testNum)
sqlDB.Exec(t, query, srv.URL)
sqlDB.CheckQueryResults(t,
fmt.Sprintf(`SELECT * FROM target%d`, testNum),
sqlDB.QueryStr(t, `SELECT NULL, NULL, NULL, 7, 12, 'teststr'`),
)
testNum++
})

sqlDB.Exec(t, "DROP DATABASE targetcols")
})

// Tests IMPORT INTO with a target column set, and an explicit PK.
t.Run("target-cols-with-explicit-pk", func(t *testing.T) {
sqlDB.Exec(t, "CREATE DATABASE targetcols; USE targetcols")
sqlDB.Exec(t, `CREATE TABLE t (a INT PRIMARY KEY, b STRING)`)

// Insert the test data
insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"}

if tx, err := db.Begin(); err != nil {
t.Fatal(err)
} else {
for i, v := range insert {
sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a, b) VALUES (%d, %s)", i+1000, v))
}

if err := tx.Commit(); err != nil {
t.Fatal(err)
}
}

sqlDB.Exec(t, fmt.Sprintf("IMPORT INTO t (a) CSV DATA (%s)", testFiles.files[0]))

var result int
numExistingRows := len(insert)
// Verify that the target column has been populated.
sqlDB.QueryRow(t, `SELECT count(*) FROM t WHERE a IS NOT NULL`).Scan(&result)
if expect := numExistingRows + rowsPerFile; result != expect {
t.Fatalf("expected %d rows, got %d", expect, result)
}

// Verify that the non-target columns have NULLs.
sqlDB.QueryRow(t, `SELECT count(*) FROM t WHERE b IS NULL`).Scan(&result)
expectedNulls := rowsPerFile
if result != expectedNulls {
t.Fatalf("expected %d rows, got %d", expectedNulls, result)
}

sqlDB.Exec(t, "DROP DATABASE targetcols")
})

// Tests IMPORT INTO with a target column set which does not include all PKs.
t.Run("target-cols-excluding-explicit-pk", func(t *testing.T) {
sqlDB.Exec(t, "CREATE DATABASE targetcols; USE targetcols")
sqlDB.Exec(t, `CREATE TABLE t (a INT PRIMARY KEY, b STRING)`)

// Insert the test data
insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"}

if tx, err := db.Begin(); err != nil {
t.Fatal(err)
} else {
for i, v := range insert {
sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a, b) VALUES (%d, %s)", i, v))
}

if err := tx.Commit(); err != nil {
t.Fatal(err)
}
}

// Expect an error if attempting to IMPORT INTO a target list which does
// not include all the PKs of the table.
sqlDB.ExpectErr(
t, `pq: make row inserter: missing "a" primary key column`,
fmt.Sprintf(`IMPORT INTO t (b) CSV DATA (%s)`, testFiles.files[0]),
)

sqlDB.Exec(t, "DROP DATABASE targetcols")
})

// Tests behvior when the existing table being imported into has more columns
// in its schema then the source CSV file.
t.Run("more-table-cols-than-csv", func(t *testing.T) {
sqlDB.Exec(t, "CREATE DATABASE targetcols; USE targetcols")
sqlDB.Exec(t, `CREATE TABLE t (a INT, b STRING, c INT)`)

// Insert the test data
insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"}

if tx, err := db.Begin(); err != nil {
t.Fatal(err)
} else {
for i, v := range insert {
sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a, b, c) VALUES (%d, %s, %d)", i, v, i))
}

if err := tx.Commit(); err != nil {
t.Fatal(err)
}
}

sqlDB.ExpectErr(
t, `pq: nodelocal:///data-0: row 1: expected 3 fields, got 2`,
fmt.Sprintf(`IMPORT INTO t (a, b, c) CSV DATA (%s)`, testFiles.files[0]),
)

sqlDB.Exec(t, "DROP DATABASE targetcols")
})

// Tests behvior when the existing table being imported into has fewer columns
// in its schema then the source CSV file.
t.Run("fewer-table-cols-than-csv", func(t *testing.T) {
sqlDB.Exec(t, "CREATE DATABASE targetcols; USE targetcols")
sqlDB.Exec(t, `CREATE TABLE t (a INT)`)

// Insert the test data
insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"}

if tx, err := db.Begin(); err != nil {
t.Fatal(err)
} else {
for i := range insert {
sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a) VALUES (%d)", i))
}

if err := tx.Commit(); err != nil {
t.Fatal(err)
}
}

sqlDB.ExpectErr(
t, `pq: nodelocal:///data-0: row 1: expected 1 fields, got 2`,
fmt.Sprintf(`IMPORT INTO t (a) CSV DATA (%s)`, testFiles.files[0]),
)

sqlDB.Exec(t, "DROP DATABASE targetcols")
})

// TODO(adityamaru): Add test for IMPORT INTO without target columns specified
// once grammar has been added.
}

func BenchmarkImport(b *testing.B) {
Expand Down

0 comments on commit 8a49379

Please sign in to comment.