pkg: Add support for user specified target cols in IMPORT INTO #39023

Merged · 3 commits · Jul 29, 2019
48 changes: 37 additions & 11 deletions pkg/ccl/importccl/import_stmt.go
@@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlpb"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
@@ -399,8 +400,34 @@ func importPlanHook(
if err != nil {
return err
}
// TODO(dt): configure target cols from ImportStmt.IntoCols
tableDetails = []jobspb.ImportDetails_Table{{Desc: &found.TableDescriptor, IsNew: false}}

// Validate target columns.
var intoCols []string
var isTargetCol = make(map[string]bool, len(importStmt.IntoCols))
for _, name := range importStmt.IntoCols {
if _, err := found.FindActiveColumnByName(name.String()); err != nil {
return errors.Wrap(err, "verifying target columns")
}

isTargetCol[name.String()] = true
intoCols = append(intoCols, name.String())
}

// IMPORT INTO does not support columns with DEFAULT expressions. Ensure
// that all non-target columns are nullable until we support DEFAULT
// expressions.
for _, col := range found.VisibleColumns() {
if col.HasDefault() {
return errors.Errorf("cannot IMPORT INTO a table with a DEFAULT expression for any of its columns")
}

if !isTargetCol[col.Name] && !col.IsNullable() {
return errors.Errorf("all non-target columns in IMPORT INTO must be nullable")
}
}

tableDetails = []jobspb.ImportDetails_Table{{Desc: &found.TableDescriptor, IsNew: false, TargetCols: intoCols}}
} else {
var tableDescs []*sqlbase.TableDescriptor
seqVals := make(map[sqlbase.ID]int64)
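
The validation added above boils down to a small self-contained rule: every named target column must exist on the table, no column on the table may carry a DEFAULT expression, and every non-target column must be nullable. A minimal standalone sketch of that rule, using a hypothetical Column struct in place of the real sqlbase descriptor API:

package main

import "fmt"

// Column is a hypothetical stand-in for sqlbase.ColumnDescriptor.
type Column struct {
	Name       string
	Nullable   bool
	HasDefault bool
}

// validateTargetCols mirrors the checks added to importPlanHook: every
// target column must exist, no column may have a DEFAULT expression,
// and every non-target column must be nullable.
func validateTargetCols(cols []Column, targets []string) error {
	byName := make(map[string]Column, len(cols))
	for _, c := range cols {
		byName[c.Name] = c
	}
	isTarget := make(map[string]bool, len(targets))
	for _, t := range targets {
		if _, ok := byName[t]; !ok {
			return fmt.Errorf("verifying target columns: column %q does not exist", t)
		}
		isTarget[t] = true
	}
	for _, c := range cols {
		if c.HasDefault {
			return fmt.Errorf("cannot IMPORT INTO a table with a DEFAULT expression for any of its columns")
		}
		if !isTarget[c.Name] && !c.Nullable {
			return fmt.Errorf("all non-target columns in IMPORT INTO must be nullable")
		}
	}
	return nil
}

func main() {
	cols := []Column{{Name: "a"}, {Name: "b", Nullable: true}} // a is the PK
	fmt.Println(validateTargetCols(cols, []string{"a"})) // <nil>
	fmt.Println(validateTargetCols(cols, []string{"b"})) // error: non-target "a" is not nullable
}
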
@@ -523,7 +550,7 @@ func doDistributedCSVTransform(
files []string,
p sql.PlanHookState,
parentID sqlbase.ID,
tables map[string]*sqlbase.TableDescriptor,
tables map[string]*distsqlpb.ReadImportDataSpec_ImportTable,
format roachpb.IOFileFormat,
walltime int64,
sstSize int64,
@@ -804,8 +831,7 @@ func (r *importResumer) Resume(
sstSize = storageccl.MaxImportBatchSize(r.settings) * 5
}

tables := make(map[string]*sqlbase.TableDescriptor, len(details.Tables))

tables := make(map[string]*distsqlpb.ReadImportDataSpec_ImportTable, len(details.Tables))
if details.Tables != nil {
// Skip prepare stage on job resumption, if it has already been completed.
if !details.PrepareComplete {
@@ -817,11 +843,11 @@
details = r.job.Details().(jobspb.ImportDetails)
}

for _, tbl := range details.Tables {
if tbl.Name != "" {
tables[tbl.Name] = tbl.Desc
} else if tbl.Desc != nil {
tables[tbl.Desc.Name] = tbl.Desc
for _, i := range details.Tables {
if i.Name != "" {
tables[i.Name] = &distsqlpb.ReadImportDataSpec_ImportTable{Desc: i.Desc, TargetCols: i.TargetCols}
} else if i.Desc != nil {
tables[i.Desc.Name] = &distsqlpb.ReadImportDataSpec_ImportTable{Desc: i.Desc, TargetCols: i.TargetCols}
} else {
return errors.Errorf("invalid table specification")
}
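
The Resume loop above now re-keys each job table entry into a per-table spec that carries the target columns through to the distributed flow. A simplified sketch of that re-keying, with hypothetical stand-ins for the jobspb and distsqlpb types:

package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for sqlbase.TableDescriptor,
// jobspb.ImportDetails_Table, and distsqlpb.ReadImportDataSpec_ImportTable.
type TableDesc struct{ Name string }

type DetailsTable struct {
	Name       string
	Desc       *TableDesc
	TargetCols []string
}

type ImportTable struct {
	Desc       *TableDesc
	TargetCols []string
}

// buildTableMap mirrors the loop in (*importResumer).Resume: entries are
// keyed by the explicit name when present, else by the descriptor name.
func buildTableMap(details []DetailsTable) (map[string]*ImportTable, error) {
	tables := make(map[string]*ImportTable, len(details))
	for _, t := range details {
		switch {
		case t.Name != "":
			tables[t.Name] = &ImportTable{Desc: t.Desc, TargetCols: t.TargetCols}
		case t.Desc != nil:
			tables[t.Desc.Name] = &ImportTable{Desc: t.Desc, TargetCols: t.TargetCols}
		default:
			return nil, errors.New("invalid table specification")
		}
	}
	return tables, nil
}

func main() {
	m, err := buildTableMap([]DetailsTable{
		{Desc: &TableDesc{Name: "t"}, TargetCols: []string{"a", "b"}},
	})
	fmt.Println(m["t"].TargetCols, err) // [a b] <nil>
}
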
@@ -854,7 +880,7 @@ func (r *importResumer) Resume(
if !stickyBitEnabled {
tableIDs := make([]uint32, 0, len(tables))
for _, t := range tables {
tableIDs = append(tableIDs, uint32(t.ID))
tableIDs = append(tableIDs, uint32(t.Desc.ID))
}
disableCtx, cancel := context.WithCancel(ctx)
defer cancel()
232 changes: 222 additions & 10 deletions pkg/ccl/importccl/import_stmt_test.go
@@ -1209,7 +1209,6 @@ func TestImportCSVStmt(t *testing.T) {
// -> FK and constraint violation
// -> CSV containing keys which will shadow existing data
// -> Rollback of a failed IMPORT INTO
// -> IMPORT INTO specific columns of an existing table
func TestImportIntoCSV(t *testing.T) {
defer leaktest.AfterTest(t)()

@@ -1543,15 +1542,6 @@
}
}

// TODO(adityamaru): IMPORT INTO does not respect the number of columns
// specified after the table name as of now. It compares the number of
// fields in the row being imported, to the number of columns in the created
// table. Thus, statements such as:
// IMPORT INTO t (a, b, c) ...
// IMPORT INTO t (a) ...
// succeed while they should probably alert the user. Add failing test once
// implemented.

// Specify wrong table name.
sqlDB.ExpectErr(
t, `pq: relation "bad" does not exist`,
@@ -1561,6 +1551,228 @@
// Expect it to succeed with correct columns.
sqlDB.Exec(t, fmt.Sprintf(`IMPORT INTO t (a, b) CSV DATA (%s)`, testFiles.files[0]))
})

// Tests for user-specified target columns in IMPORT INTO statements.
//
// Tests IMPORT INTO with various target column sets, and an implicit PK
// provided by the hidden column row_id. Some statements are run with
// experimental_direct_ingestion.
t.Run("target-cols-with-default-pk", func(t *testing.T) {
var data string
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
_, _ = w.Write([]byte(data))
}
}))
defer srv.Close()

sqlDB.Exec(t, `CREATE DATABASE targetcols`)
sqlDB.Exec(t, `SET DATABASE = targetcols`)
createQuery := `CREATE TABLE target%d (a INT8,
b INT8,
c STRING,
d INT8,
e INT8,
f STRING)`
var testNum int

data = "1,5,e,7,12,teststr"
t.Run(data, func(t *testing.T) {
sqlDB.Exec(t, fmt.Sprintf(createQuery, testNum))
query := fmt.Sprintf(`IMPORT INTO target%d (a) CSV DATA ($1)`, testNum)
sqlDB.Exec(t, query, srv.URL)
sqlDB.CheckQueryResults(t,
fmt.Sprintf(`SELECT * FROM target%d`, testNum),
sqlDB.QueryStr(t, `SELECT 1, NULL, NULL, NULL, NULL, 'NULL'`),
)
testNum++
})
t.Run(data, func(t *testing.T) {
sqlDB.Exec(t, fmt.Sprintf(createQuery, testNum))
query := fmt.Sprintf(`IMPORT INTO target%d (a, f) CSV DATA ($1) WITH experimental_direct_ingestion`, testNum)
sqlDB.Exec(t, query, srv.URL)
sqlDB.CheckQueryResults(t,
fmt.Sprintf(`SELECT * FROM target%d`, testNum),
sqlDB.QueryStr(t, `SELECT 1, NULL, NULL, NULL, NULL, 'teststr'`),
)
testNum++
})
t.Run(data, func(t *testing.T) {
sqlDB.Exec(t, fmt.Sprintf(createQuery, testNum))
query := fmt.Sprintf(`IMPORT INTO target%d (d, e, f) CSV DATA ($1)`, testNum)
sqlDB.Exec(t, query, srv.URL)
sqlDB.CheckQueryResults(t,
fmt.Sprintf(`SELECT * FROM target%d`, testNum),
sqlDB.QueryStr(t, `SELECT NULL, NULL, NULL, 7, 12, 'teststr'`),
)
testNum++
})

sqlDB.Exec(t, "DROP DATABASE targetcols")
})

// Tests IMPORT INTO with a target column set, and an explicit PK.
t.Run("target-cols-with-explicit-pk", func(t *testing.T) {
sqlDB.Exec(t, "CREATE DATABASE targetcols; USE targetcols")
sqlDB.Exec(t, `CREATE TABLE t (a INT PRIMARY KEY, b STRING)`)

// Insert the test data
insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"}

if tx, err := db.Begin(); err != nil {
t.Fatal(err)
} else {
for i, v := range insert {
sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a, b) VALUES (%d, %s)", i+1000, v))
}

if err := tx.Commit(); err != nil {
t.Fatal(err)
}
}

sqlDB.Exec(t, fmt.Sprintf("IMPORT INTO t (a) CSV DATA (%s)", testFiles.files[0]))

var result int
numExistingRows := len(insert)
// Verify that the target column has been populated.
sqlDB.QueryRow(t, `SELECT count(*) FROM t WHERE a IS NOT NULL`).Scan(&result)
if expect := numExistingRows + rowsPerFile; result != expect {
t.Fatalf("expected %d rows, got %d", expect, result)
}

// Verify that the non-target columns have NULLs.
sqlDB.QueryRow(t, `SELECT count(*) FROM t WHERE b IS NULL`).Scan(&result)
expectedNulls := rowsPerFile
if result != expectedNulls {
t.Fatalf("expected %d rows, got %d", expectedNulls, result)
}

sqlDB.Exec(t, "DROP DATABASE targetcols")
})

// Tests IMPORT INTO with a target column set which does not include all PKs.
// As a result the non-target column is non-nullable, which is not allowed
// until we support DEFAULT expressions.
t.Run("target-cols-excluding-explicit-pk", func(t *testing.T) {
sqlDB.Exec(t, "CREATE DATABASE targetcols; USE targetcols")
sqlDB.Exec(t, `CREATE TABLE t (a INT PRIMARY KEY, b STRING)`)

// Insert the test data
insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"}

if tx, err := db.Begin(); err != nil {
t.Fatal(err)
} else {
for i, v := range insert {
sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a, b) VALUES (%d, %s)", i, v))
}

if err := tx.Commit(); err != nil {
t.Fatal(err)
}
}

// Expect an error if attempting to IMPORT INTO a target list which does
// not include all the PKs of the table.
sqlDB.ExpectErr(
t, `pq: all non-target columns in IMPORT INTO must be nullable`,
fmt.Sprintf(`IMPORT INTO t (b) CSV DATA (%s)`, testFiles.files[0]),
)

sqlDB.Exec(t, "DROP DATABASE targetcols")
})

// Tests behavior when the existing table being imported into has more columns
// in its schema than the source CSV file.
t.Run("more-table-cols-than-csv", func(t *testing.T) {
sqlDB.Exec(t, "CREATE DATABASE targetcols; USE targetcols")
sqlDB.Exec(t, `CREATE TABLE t (a INT, b STRING, c INT)`)

// Insert the test data
insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"}

if tx, err := db.Begin(); err != nil {
t.Fatal(err)
} else {
for i, v := range insert {
sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a, b, c) VALUES (%d, %s, %d)", i, v, i))
}

if err := tx.Commit(); err != nil {
t.Fatal(err)
}
}

stripFilenameQuotes := testFiles.files[0][1 : len(testFiles.files[0])-1]
sqlDB.ExpectErr(
t, fmt.Sprintf("pq: %s: row 1: expected 3 fields, got 2", stripFilenameQuotes),
fmt.Sprintf(`IMPORT INTO t (a, b, c) CSV DATA (%s)`, testFiles.files[0]),
)

sqlDB.Exec(t, "DROP DATABASE targetcols")
})

// Tests behavior when the existing table being imported into has fewer columns
// in its schema than the source CSV file.
t.Run("fewer-table-cols-than-csv", func(t *testing.T) {
sqlDB.Exec(t, "CREATE DATABASE targetcols; USE targetcols")
sqlDB.Exec(t, `CREATE TABLE t (a INT)`)

// Insert the test data
insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"}

if tx, err := db.Begin(); err != nil {
t.Fatal(err)
} else {
for i := range insert {
sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a) VALUES (%d)", i))
}

if err := tx.Commit(); err != nil {
t.Fatal(err)
}
}

stripFilenameQuotes := testFiles.files[0][1 : len(testFiles.files[0])-1]
sqlDB.ExpectErr(
t, fmt.Sprintf("pq: %s: row 1: expected 1 fields, got 2", stripFilenameQuotes),
fmt.Sprintf(`IMPORT INTO t (a) CSV DATA (%s)`, testFiles.files[0]),
)

sqlDB.Exec(t, "DROP DATABASE targetcols")
})

// IMPORT INTO does not support DEFAULT expressions for either target or
// non-target columns.
t.Run("import-into-check-no-default-cols", func(t *testing.T) {
sqlDB.Exec(t, "CREATE DATABASE targetcols; USE targetcols")
sqlDB.Exec(t, `CREATE TABLE t (a INT DEFAULT 1, b STRING)`)

// Insert the test data
insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"}

if tx, err := db.Begin(); err != nil {
t.Fatal(err)
} else {
for i := range insert {
sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a, b) VALUES (%d, %s)", i, insert[i]))
}

if err := tx.Commit(); err != nil {
t.Fatal(err)
}
}

sqlDB.ExpectErr(
t, fmt.Sprintf("pq: cannot IMPORT INTO a table with a DEFAULT expression for any of its columns"),
fmt.Sprintf(`IMPORT INTO t (a) CSV DATA (%s)`, testFiles.files[0]),
)

sqlDB.Exec(t, "DROP DATABASE targetcols")
})
// TODO(adityamaru): Add test for IMPORT INTO without target columns specified
// once grammar has been added.
}

func BenchmarkImport(b *testing.B) {
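
Taken together, the tests above pin down the user-visible contract for target columns. A sketch of that contract as a table of statements and expected outcomes, assuming a table t (a INT PRIMARY KEY, b STRING) and hypothetical nodelocal CSV files:

package main

import "fmt"

func main() {
	// Each entry pairs an IMPORT INTO statement with its expected outcome;
	// "ok" means the import is accepted, anything else is the expected pq error.
	contract := []struct{ stmt, want string }{
		{`IMPORT INTO t (a) CSV DATA ('nodelocal:///one_col.csv')`, "ok"},
		{`IMPORT INTO t (a, b) CSV DATA ('nodelocal:///two_col.csv')`, "ok"},
		// The non-target column a is the PK and hence not nullable.
		{`IMPORT INTO t (b) CSV DATA ('nodelocal:///one_col.csv')`,
			"all non-target columns in IMPORT INTO must be nullable"},
		// Tables with any DEFAULT expression are rejected outright.
		{`IMPORT INTO t_with_default (a) CSV DATA ('nodelocal:///one_col.csv')`,
			"cannot IMPORT INTO a table with a DEFAULT expression for any of its columns"},
		// Column-count mismatches between the CSV and the target list fail.
		{`IMPORT INTO t (a) CSV DATA ('nodelocal:///two_col.csv')`,
			"row 1: expected 1 fields, got 2"},
	}
	for _, c := range contract {
		fmt.Printf("%-60s => %s\n", c.stmt, c.want)
	}
}
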