39023: pkg: Add support for user specified target cols in IMPORT INTO r=adityamaru27 a=adityamaru27

This change adds the ability for users to import data from a CSV source into a specified subset of the columns of an existing table.
(Part of the larger roadmap #26834)
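
As an illustration (not part of the commit), here is a minimal Go sketch of driving the new statement through `database/sql`; the table, columns, file URL, and connection string are all hypothetical, while the `IMPORT INTO t (a, b) CSV DATA (...)` shape matches the tests added in this change:

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // CockroachDB speaks the Postgres wire protocol
)

func main() {
	// Hypothetical connection string for a local insecure cluster.
	db, err := sql.Open("postgres",
		"postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Import CSV rows into only columns a and b of the existing table t.
	// Non-target columns must be nullable and are filled with NULL.
	if _, err := db.Exec(
		`IMPORT INTO t (a, b) CSV DATA ('nodelocal:///data.csv')`,
	); err != nil {
		log.Fatal(err)
	}
}
```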

39043: distsqlrun: lookup join efficiency improvements r=jordanlewis a=jordanlewis

- distsqlrun: don't save lookup rows twice
- distsqlrun: stream lookup join output
- distsql: remove unused field from lookup join spec

Previously, lookup join buffered its rendered output rows before
emitting any of them. This used to be a requirement when lookup join
was also responsible for doing a second layer of lookups against an
index, but it is no longer necessary.

Now, after the result of a lookup batch is accumulated into memory, rows
are rendered and emitted in a row-at-a-time streaming fashion.

Also, remove the unused extra index filter expression field from the lookup join spec.
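
A minimal sketch of the buffering-vs-streaming distinction described above; `Row` and the render/emit callbacks are hypothetical stand-ins, not the actual distsqlrun types:

```go
package main

import "fmt"

// Row is a hypothetical stand-in for a distsqlrun row.
type Row []int

// emitBuffered mirrors the old behavior: rendered rows for a lookup batch
// were accumulated into a second buffer before any of them were emitted.
func emitBuffered(batch []Row, render func(Row) Row, emit func(Row)) {
	rendered := make([]Row, 0, len(batch)) // rows effectively saved twice
	for _, r := range batch {
		rendered = append(rendered, render(r))
	}
	for _, r := range rendered {
		emit(r)
	}
}

// emitStreaming mirrors the new behavior: once the lookup batch is in
// memory, each row is rendered and emitted immediately, one at a time.
func emitStreaming(batch []Row, render func(Row) Row, emit func(Row)) {
	for _, r := range batch {
		emit(render(r))
	}
}

func main() {
	batch := []Row{{1}, {2}, {3}}
	double := func(r Row) Row { return Row{r[0] * 2} }
	printRow := func(r Row) { fmt.Println(r) }
	emitBuffered(batch, double, printRow)
	emitStreaming(batch, double, printRow)
}
```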

39118: opt: add IndexOrdinal alias r=RaduBerinde a=RaduBerinde

Adding a `cat.IndexOrdinal` alias to make it more explicit when a
value is an index ordinal. It is only an alias because a separate type
would make things like loops more annoying.

Release note: None
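
As a sketch of why an alias was chosen over a defined type (the alias declaration below matches the description; `Table` and `Index` are hypothetical stand-ins for the real `cat` interfaces):

```go
package cat

// IndexOrdinal makes it explicit when a value is an index ordinal. Because
// it is an alias (note the '='), it is interchangeable with int.
type IndexOrdinal = int

// Table is a hypothetical stand-in for the real cat.Table interface.
type Table interface {
	IndexCount() int
	Index(i IndexOrdinal) Index
}

// Index is a hypothetical stand-in for the real cat.Index interface.
type Index interface{}

func visitIndexes(tab Table) {
	// The plain int loop variable can be passed to Index directly. A
	// separate defined type ("type IndexOrdinal int") would force a
	// conversion like tab.Index(IndexOrdinal(i)) in every such loop.
	for i := 0; i < tab.IndexCount(); i++ {
		_ = tab.Index(i)
	}
}
```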

39146: sql/tree: store From as value in SelectClause AST node r=nvanbenschoten a=nvanbenschoten

Drive-by cleanup. The value is assumed to be non-nil in a number of
places, so we should avoid the unnecessary indirection.

Release note: None
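
A sketch of the pointer-to-value change (abbreviated; the real `tree.SelectClause` and `tree.From` carry more fields):

```go
package tree

// From is an abbreviated stand-in for the real tree.From node.
type From struct {
	Tables []string // hypothetical; the real field is a TableExprs slice
}

// SelectClause previously stored From as a pointer:
//
//	From *From // assumed non-nil everywhere, yet still dereferenced
//
// Storing it by value removes the unnecessary indirection and the
// implicit non-nil invariant.
type SelectClause struct {
	From From
}
```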

Co-authored-by: Aditya Maru <adityamaru@cockroachlabs.com>
Co-authored-by: Jordan Lewis <jordanthelewis@gmail.com>
Co-authored-by: Radu Berinde <radu@cockroachlabs.com>
Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
5 people committed Jul 29, 2019
5 parents 18a7112 + 91aa7c4 + ba6d1a6 + df01561 + 67923ac commit b427807
Showing 46 changed files with 1,352 additions and 845 deletions.
48 changes: 37 additions & 11 deletions pkg/ccl/importccl/import_stmt.go
@@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlpb"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
@@ -399,8 +400,34 @@ func importPlanHook(
if err != nil {
return err
}
// TODO(dt): configure target cols from ImportStmt.IntoCols
tableDetails = []jobspb.ImportDetails_Table{{Desc: &found.TableDescriptor, IsNew: false}}

// Validate target columns.
var intoCols []string
var isTargetCol = make(map[string]bool, len(importStmt.IntoCols))
for _, name := range importStmt.IntoCols {
var err error
if _, err = found.FindActiveColumnByName(name.String()); err != nil {
return errors.Wrap(err, "verifying target columns")
}

isTargetCol[name.String()] = true
intoCols = append(intoCols, name.String())
}

// IMPORT INTO does not support columns with DEFAULT expressions. Ensure
// that all non-target columns are nullable until we support DEFAULT
// expressions.
for _, col := range found.VisibleColumns() {
if col.HasDefault() {
return errors.Errorf("cannot IMPORT INTO a table with a DEFAULT expression for any of its columns")
}

if !isTargetCol[col.Name] && !col.IsNullable() {
return errors.Errorf("all non-target columns in IMPORT INTO must be nullable")
}
}

tableDetails = []jobspb.ImportDetails_Table{{Desc: &found.TableDescriptor, IsNew: false, TargetCols: intoCols}}
} else {
var tableDescs []*sqlbase.TableDescriptor
seqVals := make(map[sqlbase.ID]int64)
@@ -523,7 +550,7 @@ func doDistributedCSVTransform(
files []string,
p sql.PlanHookState,
parentID sqlbase.ID,
tables map[string]*sqlbase.TableDescriptor,
tables map[string]*distsqlpb.ReadImportDataSpec_ImportTable,
format roachpb.IOFileFormat,
walltime int64,
sstSize int64,
@@ -804,8 +831,7 @@ func (r *importResumer) Resume(
sstSize = storageccl.MaxImportBatchSize(r.settings) * 5
}

tables := make(map[string]*sqlbase.TableDescriptor, len(details.Tables))

tables := make(map[string]*distsqlpb.ReadImportDataSpec_ImportTable, len(details.Tables))
if details.Tables != nil {
// Skip prepare stage on job resumption, if it has already been completed.
if !details.PrepareComplete {
@@ -817,11 +843,11 @@
details = r.job.Details().(jobspb.ImportDetails)
}

for _, tbl := range details.Tables {
if tbl.Name != "" {
tables[tbl.Name] = tbl.Desc
} else if tbl.Desc != nil {
tables[tbl.Desc.Name] = tbl.Desc
for _, i := range details.Tables {
if i.Name != "" {
tables[i.Name] = &distsqlpb.ReadImportDataSpec_ImportTable{Desc: i.Desc, TargetCols: i.TargetCols}
} else if i.Desc != nil {
tables[i.Desc.Name] = &distsqlpb.ReadImportDataSpec_ImportTable{Desc: i.Desc, TargetCols: i.TargetCols}
} else {
return errors.Errorf("invalid table specification")
}
@@ -854,7 +880,7 @@
if !stickyBitEnabled {
tableIDs := make([]uint32, 0, len(tables))
for _, t := range tables {
tableIDs = append(tableIDs, uint32(t.ID))
tableIDs = append(tableIDs, uint32(t.Desc.ID))
}
disableCtx, cancel := context.WithCancel(ctx)
defer cancel()
232 changes: 222 additions & 10 deletions pkg/ccl/importccl/import_stmt_test.go
@@ -1209,7 +1209,6 @@ func TestImportCSVStmt(t *testing.T) {
// -> FK and constraint violation
// -> CSV containing keys which will shadow existing data
// -> Rollback of a failed IMPORT INTO
// -> IMPORT INTO specific columns of an existing table
func TestImportIntoCSV(t *testing.T) {
defer leaktest.AfterTest(t)()

@@ -1543,15 +1542,6 @@
}
}

// TODO(adityamaru): IMPORT INTO does not respect the number of columns
// specified after the table name as of now. It compares the number of
// fields in the row being imported, to the number of columns in the created
// table. Thus, statements such as:
// IMPORT INTO t (a, b, c) ...
// IMPORT INTO t (a) ...
// succeed while they should probably alert the user. Add failing test once
// implemented.

// Specify wrong table name.
sqlDB.ExpectErr(
t, `pq: relation "bad" does not exist`,
@@ -1561,6 +1551,228 @@
// Expect it to succeed with correct columns.
sqlDB.Exec(t, fmt.Sprintf(`IMPORT INTO t (a, b) CSV DATA (%s)`, testFiles.files[0]))
})

// Tests for user-specified target columns in IMPORT INTO statements.
//
// Tests IMPORT INTO with various target column sets, and an implicit PK
// provided by the hidden column row_id. Some statements are run with
// experimental_direct_ingestion.
t.Run("target-cols-with-default-pk", func(t *testing.T) {
var data string
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
_, _ = w.Write([]byte(data))
}
}))
defer srv.Close()

sqlDB.Exec(t, `CREATE DATABASE targetcols`)
sqlDB.Exec(t, `SET DATABASE = targetcols`)
createQuery := `CREATE TABLE target%d (a INT8,
b INT8,
c STRING,
d INT8,
e INT8,
f STRING)`
var testNum int

data = "1,5,e,7,12,teststr"
t.Run(data, func(t *testing.T) {
sqlDB.Exec(t, fmt.Sprintf(createQuery, testNum))
query := fmt.Sprintf(`IMPORT INTO target%d (a) CSV DATA ($1)`, testNum)
sqlDB.Exec(t, query, srv.URL)
sqlDB.CheckQueryResults(t,
fmt.Sprintf(`SELECT * FROM target%d`, testNum),
sqlDB.QueryStr(t, `SELECT 1, NULL, NULL, NULL, NULL, NULL`),
)
testNum++
})
t.Run(data, func(t *testing.T) {
sqlDB.Exec(t, fmt.Sprintf(createQuery, testNum))
query := fmt.Sprintf(`IMPORT INTO target%d (a, f) CSV DATA ($1) WITH experimental_direct_ingestion`, testNum)
sqlDB.Exec(t, query, srv.URL)
sqlDB.CheckQueryResults(t,
fmt.Sprintf(`SELECT * FROM target%d`, testNum),
sqlDB.QueryStr(t, `SELECT 1, NULL, NULL, NULL, NULL, 'teststr'`),
)
testNum++
})
t.Run(data, func(t *testing.T) {
sqlDB.Exec(t, fmt.Sprintf(createQuery, testNum))
query := fmt.Sprintf(`IMPORT INTO target%d (d, e, f) CSV DATA ($1)`, testNum)
sqlDB.Exec(t, query, srv.URL)
sqlDB.CheckQueryResults(t,
fmt.Sprintf(`SELECT * FROM target%d`, testNum),
sqlDB.QueryStr(t, `SELECT NULL, NULL, NULL, 7, 12, 'teststr'`),
)
testNum++
})

sqlDB.Exec(t, "DROP DATABASE targetcols")
})

// Tests IMPORT INTO with a target column set, and an explicit PK.
t.Run("target-cols-with-explicit-pk", func(t *testing.T) {
sqlDB.Exec(t, "CREATE DATABASE targetcols; USE targetcols")
sqlDB.Exec(t, `CREATE TABLE t (a INT PRIMARY KEY, b STRING)`)

// Insert the test data
insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"}

if tx, err := db.Begin(); err != nil {
t.Fatal(err)
} else {
for i, v := range insert {
sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a, b) VALUES (%d, %s)", i+1000, v))
}

if err := tx.Commit(); err != nil {
t.Fatal(err)
}
}

sqlDB.Exec(t, fmt.Sprintf("IMPORT INTO t (a) CSV DATA (%s)", testFiles.files[0]))

var result int
numExistingRows := len(insert)
// Verify that the target column has been populated.
sqlDB.QueryRow(t, `SELECT count(*) FROM t WHERE a IS NOT NULL`).Scan(&result)
if expect := numExistingRows + rowsPerFile; result != expect {
t.Fatalf("expected %d rows, got %d", expect, result)
}

// Verify that the non-target columns have NULLs.
sqlDB.QueryRow(t, `SELECT count(*) FROM t WHERE b IS NULL`).Scan(&result)
expectedNulls := rowsPerFile
if result != expectedNulls {
t.Fatalf("expected %d rows, got %d", expectedNulls, result)
}

sqlDB.Exec(t, "DROP DATABASE targetcols")
})

// Tests IMPORT INTO with a target column set which does not include all PKs.
// As a result the non-target column is non-nullable, which is not allowed
// until we support DEFAULT expressions.
t.Run("target-cols-excluding-explicit-pk", func(t *testing.T) {
sqlDB.Exec(t, "CREATE DATABASE targetcols; USE targetcols")
sqlDB.Exec(t, `CREATE TABLE t (a INT PRIMARY KEY, b STRING)`)

// Insert the test data
insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"}

if tx, err := db.Begin(); err != nil {
t.Fatal(err)
} else {
for i, v := range insert {
sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a, b) VALUES (%d, %s)", i, v))
}

if err := tx.Commit(); err != nil {
t.Fatal(err)
}
}

// Expect an error if attempting to IMPORT INTO a target list which does
// not include all the PKs of the table.
sqlDB.ExpectErr(
t, `pq: all non-target columns in IMPORT INTO must be nullable`,
fmt.Sprintf(`IMPORT INTO t (b) CSV DATA (%s)`, testFiles.files[0]),
)

sqlDB.Exec(t, "DROP DATABASE targetcols")
})

// Tests behavior when the existing table being imported into has more columns
// in its schema than the source CSV file.
t.Run("more-table-cols-than-csv", func(t *testing.T) {
sqlDB.Exec(t, "CREATE DATABASE targetcols; USE targetcols")
sqlDB.Exec(t, `CREATE TABLE t (a INT, b STRING, c INT)`)

// Insert the test data
insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"}

if tx, err := db.Begin(); err != nil {
t.Fatal(err)
} else {
for i, v := range insert {
sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a, b, c) VALUES (%d, %s, %d)", i, v, i))
}

if err := tx.Commit(); err != nil {
t.Fatal(err)
}
}

stripFilenameQuotes := testFiles.files[0][1 : len(testFiles.files[0])-1]
sqlDB.ExpectErr(
t, fmt.Sprintf("pq: %s: row 1: expected 3 fields, got 2", stripFilenameQuotes),
fmt.Sprintf(`IMPORT INTO t (a, b, c) CSV DATA (%s)`, testFiles.files[0]),
)

sqlDB.Exec(t, "DROP DATABASE targetcols")
})

// Tests behavior when the existing table being imported into has fewer columns
// in its schema than the source CSV file.
t.Run("fewer-table-cols-than-csv", func(t *testing.T) {
sqlDB.Exec(t, "CREATE DATABASE targetcols; USE targetcols")
sqlDB.Exec(t, `CREATE TABLE t (a INT)`)

// Insert the test data
insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"}

if tx, err := db.Begin(); err != nil {
t.Fatal(err)
} else {
for i := range insert {
sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a) VALUES (%d)", i))
}

if err := tx.Commit(); err != nil {
t.Fatal(err)
}
}

stripFilenameQuotes := testFiles.files[0][1 : len(testFiles.files[0])-1]
sqlDB.ExpectErr(
t, fmt.Sprintf("pq: %s: row 1: expected 1 fields, got 2", stripFilenameQuotes),
fmt.Sprintf(`IMPORT INTO t (a) CSV DATA (%s)`, testFiles.files[0]),
)

sqlDB.Exec(t, "DROP DATABASE targetcols")
})

// IMPORT INTO does not support DEFAULT expressions for either target or
// non-target columns.
t.Run("import-into-check-no-default-cols", func(t *testing.T) {
sqlDB.Exec(t, "CREATE DATABASE targetcols; USE targetcols")
sqlDB.Exec(t, `CREATE TABLE t (a INT DEFAULT 1, b STRING)`)

// Insert the test data
insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"}

if tx, err := db.Begin(); err != nil {
t.Fatal(err)
} else {
for i := range insert {
sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a, b) VALUES (%d, %s)", i, insert[i]))
}

if err := tx.Commit(); err != nil {
t.Fatal(err)
}
}

sqlDB.ExpectErr(
t, fmt.Sprintf("pq: cannot IMPORT INTO a table with a DEFAULT expression for any of its columns"),
fmt.Sprintf(`IMPORT INTO t (a) CSV DATA (%s)`, testFiles.files[0]),
)

sqlDB.Exec(t, "DROP DATABASE targetcols")
})
// TODO(adityamaru): Add test for IMPORT INTO without target columns specified
// once grammar has been added.
}

func BenchmarkImport(b *testing.B) {