Skip to content

Commit

Permalink
Properly support ignore_nulls in CreateLookupVindex (#13913)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord authored Sep 5, 2023
1 parent 6d26dad commit dd561ee
Show file tree
Hide file tree
Showing 5 changed files with 253 additions and 34 deletions.
71 changes: 49 additions & 22 deletions go/test/endtoend/vreplication/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,32 +74,59 @@ create table loadtest (id int, name varchar(256), primary key(id), key(name));
initialProductVSchema = `
{
"tables": {
"product": {},
"merchant": {},
"orders": {},
"product": {},
"merchant": {},
"orders": {},
"loadtest": {},
"customer": {},
"customer_seq": {
"type": "sequence"
},
"customer2": {},
"customer_seq2": {
"type": "sequence"
},
"order_seq": {
"type": "sequence"
},
"Lead": {},
"Lead-1": {},
"db_order_test": {},
"vdiff_order": {},
"datze": {},
"reftable": {
"type": "reference"
}
"customer": {},
"customer_seq": {
"type": "sequence"
},
"customer2": {},
"customer_seq2": {
"type": "sequence"
},
"order_seq": {
"type": "sequence"
},
"Lead": {},
"Lead-1": {},
"db_order_test": {},
"vdiff_order": {},
"datze": {},
"reftable": {
"type": "reference"
}
}
}
`

createLookupVindexVSchema = `
{
"sharded": true,
"vindexes": {
"customer_name_keyspace_id": {
"type": "consistent_lookup",
"params": {
"table": "product.customer_name_keyspace_id",
"from": "name,cid",
"to": "keyspace_id",
"ignore_nulls": "true"
},
"owner": "customer"
}
},
"tables": {
"customer": {
"column_vindexes": [{
"columns": ["name", "cid"],
"name": "customer_name_keyspace_id"
}]
}
}
}
`

customerSchema = ""
customerVSchema = `
{
Expand Down
22 changes: 22 additions & 0 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,28 @@ func testVreplicationWorkflows(t *testing.T, limited bool, binlogRowImage string
verifyCopyStateIsOptimized(t, tablet)
}
})

t.Run("Test CreateLookupVindex", func(t *testing.T) {
// CreateLookupVindex does not support noblob images.
if strings.ToLower(binlogRowImage) == "noblob" {
return
}
_, err = vtgateConn.ExecuteFetch("use customer", 1, false)
require.NoError(t, err, "error using customer keyspace: %v", err)
res, err := vtgateConn.ExecuteFetch("select count(*) from customer where name is not null", 1, false)
require.NoError(t, err, "error getting current row count in customer: %v", err)
require.Equal(t, 1, len(res.Rows), "expected 1 row in count(*) query, got %d", len(res.Rows))
rows, _ := res.Rows[0][0].ToInt32()
// Insert a couple of rows with a NULL name to confirm that they
// are ignored.
insert := "insert into customer (cid, name, typ, sport, meta) values (100, NULL, 'soho', 'football','{}'), (101, NULL, 'enterprise','baseball','{}')"
_, err = vtgateConn.ExecuteFetch(insert, -1, false)
require.NoError(t, err, "error executing %q: %v", insert, err)
err = vc.VtctlClient.ExecuteCommand("CreateLookupVindex", "--", "--tablet_types=PRIMARY", "customer", createLookupVindexVSchema)
require.NoError(t, err, "error executing CreateLookupVindex: %v", err)
waitForWorkflowState(t, vc, "product.customer_name_keyspace_id_vdx", binlogdatapb.VReplicationWorkflowState_Stopped.String())
waitForRowCount(t, vtgateConn, "product", "customer_name_keyspace_id", int(rows))
})
}

func TestV2WorkflowsAcrossDBVersions(t *testing.T) {
Expand Down
25 changes: 25 additions & 0 deletions go/vt/vttablet/tabletserver/vstreamer/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ const (
GreaterThanEqual
// NotEqual is used to filter a comparable column if != specific value
NotEqual
// IsNotNull is used to filter a column if it is NULL
IsNotNull
)

// Filter contains opcodes for filtering.
Expand Down Expand Up @@ -224,6 +226,10 @@ func (plan *Plan) filter(values, result []sqltypes.Value, charsets []collations.
if !key.KeyRangeContains(filter.KeyRange, ksid) {
return false, nil
}
case IsNotNull:
if values[filter.ColNum].IsNull() {
return false, nil
}
default:
match, err := compare(filter.Opcode, values[filter.ColNum], filter.Value, charsets[filter.ColNum])
if err != nil {
Expand Down Expand Up @@ -550,6 +556,25 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er
if err := plan.analyzeInKeyRange(vschema, expr.Exprs); err != nil {
return err
}
case *sqlparser.IsExpr: // Needed for CreateLookupVindex with ignore_nulls
if expr.Right != sqlparser.IsNotNullOp {
return fmt.Errorf("unsupported constraint: %v", sqlparser.String(expr))
}
qualifiedName, ok := expr.Left.(*sqlparser.ColName)
if !ok {
return fmt.Errorf("unexpected: %v", sqlparser.String(expr))
}
if !qualifiedName.Qualifier.IsEmpty() {
return fmt.Errorf("unsupported qualifier for column: %v", sqlparser.String(qualifiedName))
}
colnum, err := findColumn(plan.Table, qualifiedName.Name)
if err != nil {
return err
}
plan.Filters = append(plan.Filters, Filter{
Opcode: IsNotNull,
ColNum: colnum,
})
default:
return fmt.Errorf("unsupported constraint: %v", sqlparser.String(expr))
}
Expand Down
48 changes: 36 additions & 12 deletions go/vt/wrangler/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

type materializer struct {
Expand Down Expand Up @@ -504,12 +505,13 @@ func (wr *Wrangler) prepareCreateLookup(ctx context.Context, keyspace string, sp
// Important variables are pulled out here.
var (
// lookup vindex info
vindexName string
vindex *vschemapb.Vindex
targetKeyspace string
targetTableName string
vindexFromCols []string
vindexToCol string
vindexName string
vindex *vschemapb.Vindex
targetKeyspace string
targetTableName string
vindexFromCols []string
vindexToCol string
vindexIgnoreNulls bool

// source table info
sourceTableName string
Expand Down Expand Up @@ -560,6 +562,18 @@ func (wr *Wrangler) prepareCreateLookup(ctx context.Context, keyspace string, sp
if _, err := vindexes.CreateVindex(vindex.Type, vindexName, vindex.Params); err != nil {
return nil, nil, nil, err
}
if ignoreNullsStr, ok := vindex.Params["ignore_nulls"]; ok {
// This mirrors the behavior of vindexes.boolFromMap().
switch ignoreNullsStr {
case "true":
vindexIgnoreNulls = true
case "false":
vindexIgnoreNulls = false
default:
return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "ignore_nulls value must be 'true' or 'false': '%s'",
ignoreNullsStr)
}
}

// Validate input table
if len(specs.Tables) != 1 {
Expand Down Expand Up @@ -696,21 +710,31 @@ func (wr *Wrangler) prepareCreateLookup(ctx context.Context, keyspace string, sp
buf = sqlparser.NewTrackedBuffer(nil)
buf.Myprintf("select ")
for i := range vindexFromCols {
buf.Myprintf("%v as %v, ", sqlparser.NewIdentifierCI(sourceVindexColumns[i]), sqlparser.NewIdentifierCI(vindexFromCols[i]))
buf.Myprintf("%s as %s, ", sqlparser.String(sqlparser.NewIdentifierCI(sourceVindexColumns[i])), sqlparser.String(sqlparser.NewIdentifierCI(vindexFromCols[i])))
}
if strings.EqualFold(vindexToCol, "keyspace_id") || strings.EqualFold(vindex.Type, "consistent_lookup_unique") || strings.EqualFold(vindex.Type, "consistent_lookup") {
buf.Myprintf("keyspace_id() as %v ", sqlparser.NewIdentifierCI(vindexToCol))
buf.Myprintf("keyspace_id() as %s ", sqlparser.String(sqlparser.NewIdentifierCI(vindexToCol)))
} else {
buf.Myprintf("%v as %v ", sqlparser.NewIdentifierCI(vindexToCol), sqlparser.NewIdentifierCI(vindexToCol))
buf.Myprintf("%s as %s ", sqlparser.String(sqlparser.NewIdentifierCI(vindexToCol)), sqlparser.String(sqlparser.NewIdentifierCI(vindexToCol)))
}
buf.Myprintf("from %s", sqlparser.String(sqlparser.NewIdentifierCS(sourceTableName)))
if vindexIgnoreNulls {
buf.Myprintf(" where ")
lastValIdx := len(vindexFromCols) - 1
for i := range vindexFromCols {
buf.Myprintf("%s is not null", sqlparser.String(sqlparser.NewIdentifierCI(vindexFromCols[i])))
if i != lastValIdx {
buf.Myprintf(" and ")
}
}
}
buf.Myprintf("from %v", sqlparser.NewIdentifierCS(sourceTableName))
if vindex.Owner != "" {
// Only backfill
buf.Myprintf(" group by ")
for i := range vindexFromCols {
buf.Myprintf("%v, ", sqlparser.NewIdentifierCI(vindexFromCols[i]))
buf.Myprintf("%s, ", sqlparser.String(sqlparser.NewIdentifierCI(vindexFromCols[i])))
}
buf.Myprintf("%v", sqlparser.NewIdentifierCI(vindexToCol))
buf.Myprintf("%s", sqlparser.String(sqlparser.NewIdentifierCI(vindexToCol)))
}
materializeQuery = buf.String()

Expand Down
121 changes: 121 additions & 0 deletions go/vt/wrangler/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1334,6 +1334,127 @@ func TestCreateCustomizedVindex(t *testing.T) {
}
}

func TestCreateLookupVindexIgnoreNulls(t *testing.T) {
ms := &vtctldatapb.MaterializeSettings{
SourceKeyspace: "ks",
TargetKeyspace: "ks",
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, []string{"0"})
defer env.close()

specs := &vschemapb.Keyspace{
Vindexes: map[string]*vschemapb.Vindex{
"v": {
Type: "consistent_lookup",
Params: map[string]string{
"table": "ks.lkp",
"from": "col2,col1",
"to": "keyspace_id",
"ignore_nulls": "true",
},
Owner: "t1",
},
},
Tables: map[string]*vschemapb.Table{
"t1": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Name: "v",
Columns: []string{"col2", "col1"},
}},
},
},
}
// Dummy sourceSchema
sourceSchema := "CREATE TABLE `t1` (\n" +
" `col1` int(11) NOT NULL AUTO_INCREMENT,\n" +
" `col2` int(11) DEFAULT NULL,\n" +
" PRIMARY KEY (`id`)\n" +
") ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=latin1"

vschema := &vschemapb.Keyspace{
Sharded: true,
Vindexes: map[string]*vschemapb.Vindex{
"hash": {
Type: "hash",
},
},
Tables: map[string]*vschemapb.Table{
"t1": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Name: "hash",
Column: "col1",
}},
},
},
}

wantKs := &vschemapb.Keyspace{
Sharded: true,
Vindexes: map[string]*vschemapb.Vindex{
"hash": {
Type: "hash",
},
"v": {
Type: "consistent_lookup",
Params: map[string]string{
"table": "ks.lkp",
"from": "col2,col1",
"to": "keyspace_id",
"write_only": "true",
"ignore_nulls": "true",
},
Owner: "t1",
},
},
Tables: map[string]*vschemapb.Table{
"t1": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Name: "hash",
Column: "col1",
}, {
Name: "v",
Columns: []string{"col2", "col1"},
}},
},
"lkp": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Column: "col2",
Name: "hash",
}},
},
},
}
wantQuery := "select col2 as col2, col1 as col1, keyspace_id() as keyspace_id from t1 where col2 is not null and col1 is not null group by col2, col1, keyspace_id"

env.tmc.schema[ms.SourceKeyspace+".t1"] = &tabletmanagerdatapb.SchemaDefinition{
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{
Fields: []*querypb.Field{{
Name: "col1",
Type: querypb.Type_INT64,
}, {
Name: "col2",
Type: querypb.Type_INT64,
}},
Schema: sourceSchema,
}},
}
if err := env.topoServ.SaveVSchema(context.Background(), ms.SourceKeyspace, vschema); err != nil {
t.Fatal(err)
}

ms, ks, _, err := env.wr.prepareCreateLookup(context.Background(), ms.SourceKeyspace, specs, false)
require.NoError(t, err)
if !proto.Equal(wantKs, ks) {
t.Errorf("unexpected keyspace value: got:\n%v, want\n%v", ks, wantKs)
}
require.NotNil(t, ms)
require.GreaterOrEqual(t, len(ms.TableSettings), 1)
require.Equal(t, wantQuery, ms.TableSettings[0].SourceExpression, "unexpected query")
}

func TestStopAfterCopyFlag(t *testing.T) {
ms := &vtctldatapb.MaterializeSettings{
SourceKeyspace: "ks",
Expand Down

0 comments on commit dd561ee

Please sign in to comment.