From dd561ee2ee4f59c6f01acbc03fef7093b147cdb4 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 5 Sep 2023 17:15:25 -0400 Subject: [PATCH] Properly support ignore_nulls in CreateLookupVindex (#13913) Signed-off-by: Matt Lord --- go/test/endtoend/vreplication/config_test.go | 71 ++++++---- .../vreplication/vreplication_test.go | 22 ++++ .../tabletserver/vstreamer/planbuilder.go | 25 ++++ go/vt/wrangler/materializer.go | 48 +++++-- go/vt/wrangler/materializer_test.go | 121 ++++++++++++++++++ 5 files changed, 253 insertions(+), 34 deletions(-) diff --git a/go/test/endtoend/vreplication/config_test.go b/go/test/endtoend/vreplication/config_test.go index dcae0f6a5bf..1ee1b48f91d 100644 --- a/go/test/endtoend/vreplication/config_test.go +++ b/go/test/endtoend/vreplication/config_test.go @@ -74,32 +74,59 @@ create table loadtest (id int, name varchar(256), primary key(id), key(name)); initialProductVSchema = ` { "tables": { - "product": {}, - "merchant": {}, - "orders": {}, + "product": {}, + "merchant": {}, + "orders": {}, "loadtest": {}, - "customer": {}, - "customer_seq": { - "type": "sequence" - }, - "customer2": {}, - "customer_seq2": { - "type": "sequence" - }, - "order_seq": { - "type": "sequence" - }, - "Lead": {}, - "Lead-1": {}, - "db_order_test": {}, - "vdiff_order": {}, - "datze": {}, - "reftable": { - "type": "reference" - } + "customer": {}, + "customer_seq": { + "type": "sequence" + }, + "customer2": {}, + "customer_seq2": { + "type": "sequence" + }, + "order_seq": { + "type": "sequence" + }, + "Lead": {}, + "Lead-1": {}, + "db_order_test": {}, + "vdiff_order": {}, + "datze": {}, + "reftable": { + "type": "reference" + } } } ` + + createLookupVindexVSchema = ` +{ + "sharded": true, + "vindexes": { + "customer_name_keyspace_id": { + "type": "consistent_lookup", + "params": { + "table": "product.customer_name_keyspace_id", + "from": "name,cid", + "to": "keyspace_id", + "ignore_nulls": "true" + }, + "owner": "customer" + } + }, + "tables": { + "customer": { + "column_vindexes": [{ + "columns": ["name", "cid"], + "name": "customer_name_keyspace_id" + }] + } + } +} +` + customerSchema = "" customerVSchema = ` { diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 74843c862eb..20d84ae1e14 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -367,6 +367,28 @@ func testVreplicationWorkflows(t *testing.T, limited bool, binlogRowImage string verifyCopyStateIsOptimized(t, tablet) } }) + + t.Run("Test CreateLookupVindex", func(t *testing.T) { + // CreateLookupVindex does not support noblob images. + if strings.ToLower(binlogRowImage) == "noblob" { + return + } + _, err = vtgateConn.ExecuteFetch("use customer", 1, false) + require.NoError(t, err, "error using customer keyspace: %v", err) + res, err := vtgateConn.ExecuteFetch("select count(*) from customer where name is not null", 1, false) + require.NoError(t, err, "error getting current row count in customer: %v", err) + require.Equal(t, 1, len(res.Rows), "expected 1 row in count(*) query, got %d", len(res.Rows)) + rows, _ := res.Rows[0][0].ToInt32() + // Insert a couple of rows with a NULL name to confirm that they + // are ignored. + insert := "insert into customer (cid, name, typ, sport, meta) values (100, NULL, 'soho', 'football','{}'), (101, NULL, 'enterprise','baseball','{}')" + _, err = vtgateConn.ExecuteFetch(insert, -1, false) + require.NoError(t, err, "error executing %q: %v", insert, err) + err = vc.VtctlClient.ExecuteCommand("CreateLookupVindex", "--", "--tablet_types=PRIMARY", "customer", createLookupVindexVSchema) + require.NoError(t, err, "error executing CreateLookupVindex: %v", err) + waitForWorkflowState(t, vc, "product.customer_name_keyspace_id_vdx", binlogdatapb.VReplicationWorkflowState_Stopped.String()) + waitForRowCount(t, vtgateConn, "product", "customer_name_keyspace_id", int(rows)) + }) } func TestV2WorkflowsAcrossDBVersions(t *testing.T) { diff --git a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go index 07f0a981dd0..b8a3a0bbc6a 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go +++ b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go @@ -79,6 +79,8 @@ const ( GreaterThanEqual // NotEqual is used to filter a comparable column if != specific value NotEqual + // IsNotNull is used to filter a column if it is NULL + IsNotNull ) // Filter contains opcodes for filtering. @@ -224,6 +226,10 @@ func (plan *Plan) filter(values, result []sqltypes.Value, charsets []collations. if !key.KeyRangeContains(filter.KeyRange, ksid) { return false, nil } + case IsNotNull: + if values[filter.ColNum].IsNull() { + return false, nil + } default: match, err := compare(filter.Opcode, values[filter.ColNum], filter.Value, charsets[filter.ColNum]) if err != nil { @@ -550,6 +556,25 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er if err := plan.analyzeInKeyRange(vschema, expr.Exprs); err != nil { return err } + case *sqlparser.IsExpr: // Needed for CreateLookupVindex with ignore_nulls + if expr.Right != sqlparser.IsNotNullOp { + return fmt.Errorf("unsupported constraint: %v", sqlparser.String(expr)) + } + qualifiedName, ok := expr.Left.(*sqlparser.ColName) + if !ok { + return fmt.Errorf("unexpected: %v", sqlparser.String(expr)) + } + if !qualifiedName.Qualifier.IsEmpty() { + return fmt.Errorf("unsupported qualifier for column: %v", sqlparser.String(qualifiedName)) + } + colnum, err := findColumn(plan.Table, qualifiedName.Name) + if err != nil { + return err + } + plan.Filters = append(plan.Filters, Filter{ + Opcode: IsNotNull, + ColNum: colnum, + }) default: return fmt.Errorf("unsupported constraint: %v", sqlparser.String(expr)) } diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index 5e24f63a7f7..ddfd8c90d38 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -54,6 +54,7 @@ import ( tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" vschemapb "vitess.io/vitess/go/vt/proto/vschema" vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) type materializer struct { @@ -504,12 +505,13 @@ func (wr *Wrangler) prepareCreateLookup(ctx context.Context, keyspace string, sp // Important variables are pulled out here. var ( // lookup vindex info - vindexName string - vindex *vschemapb.Vindex - targetKeyspace string - targetTableName string - vindexFromCols []string - vindexToCol string + vindexName string + vindex *vschemapb.Vindex + targetKeyspace string + targetTableName string + vindexFromCols []string + vindexToCol string + vindexIgnoreNulls bool // source table info sourceTableName string @@ -560,6 +562,18 @@ func (wr *Wrangler) prepareCreateLookup(ctx context.Context, keyspace string, sp if _, err := vindexes.CreateVindex(vindex.Type, vindexName, vindex.Params); err != nil { return nil, nil, nil, err } + if ignoreNullsStr, ok := vindex.Params["ignore_nulls"]; ok { + // This mirrors the behavior of vindexes.boolFromMap(). + switch ignoreNullsStr { + case "true": + vindexIgnoreNulls = true + case "false": + vindexIgnoreNulls = false + default: + return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "ignore_nulls value must be 'true' or 'false': '%s'", + ignoreNullsStr) + } + } // Validate input table if len(specs.Tables) != 1 { @@ -696,21 +710,31 @@ func (wr *Wrangler) prepareCreateLookup(ctx context.Context, keyspace string, sp buf = sqlparser.NewTrackedBuffer(nil) buf.Myprintf("select ") for i := range vindexFromCols { - buf.Myprintf("%v as %v, ", sqlparser.NewIdentifierCI(sourceVindexColumns[i]), sqlparser.NewIdentifierCI(vindexFromCols[i])) + buf.Myprintf("%s as %s, ", sqlparser.String(sqlparser.NewIdentifierCI(sourceVindexColumns[i])), sqlparser.String(sqlparser.NewIdentifierCI(vindexFromCols[i]))) } if strings.EqualFold(vindexToCol, "keyspace_id") || strings.EqualFold(vindex.Type, "consistent_lookup_unique") || strings.EqualFold(vindex.Type, "consistent_lookup") { - buf.Myprintf("keyspace_id() as %v ", sqlparser.NewIdentifierCI(vindexToCol)) + buf.Myprintf("keyspace_id() as %s ", sqlparser.String(sqlparser.NewIdentifierCI(vindexToCol))) } else { - buf.Myprintf("%v as %v ", sqlparser.NewIdentifierCI(vindexToCol), sqlparser.NewIdentifierCI(vindexToCol)) + buf.Myprintf("%s as %s ", sqlparser.String(sqlparser.NewIdentifierCI(vindexToCol)), sqlparser.String(sqlparser.NewIdentifierCI(vindexToCol))) + } + buf.Myprintf("from %s", sqlparser.String(sqlparser.NewIdentifierCS(sourceTableName))) + if vindexIgnoreNulls { + buf.Myprintf(" where ") + lastValIdx := len(vindexFromCols) - 1 + for i := range vindexFromCols { + buf.Myprintf("%s is not null", sqlparser.String(sqlparser.NewIdentifierCI(vindexFromCols[i]))) + if i != lastValIdx { + buf.Myprintf(" and ") + } + } } - buf.Myprintf("from %v", sqlparser.NewIdentifierCS(sourceTableName)) if vindex.Owner != "" { // Only backfill buf.Myprintf(" group by ") for i := range vindexFromCols { - buf.Myprintf("%v, ", sqlparser.NewIdentifierCI(vindexFromCols[i])) + buf.Myprintf("%s, ", sqlparser.String(sqlparser.NewIdentifierCI(vindexFromCols[i]))) } - buf.Myprintf("%v", sqlparser.NewIdentifierCI(vindexToCol)) + buf.Myprintf("%s", sqlparser.String(sqlparser.NewIdentifierCI(vindexToCol))) } materializeQuery = buf.String() diff --git a/go/vt/wrangler/materializer_test.go b/go/vt/wrangler/materializer_test.go index 323769ae908..9296c6f70d4 100644 --- a/go/vt/wrangler/materializer_test.go +++ b/go/vt/wrangler/materializer_test.go @@ -1334,6 +1334,127 @@ func TestCreateCustomizedVindex(t *testing.T) { } } +func TestCreateLookupVindexIgnoreNulls(t *testing.T) { + ms := &vtctldatapb.MaterializeSettings{ + SourceKeyspace: "ks", + TargetKeyspace: "ks", + } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, []string{"0"}) + defer env.close() + + specs := &vschemapb.Keyspace{ + Vindexes: map[string]*vschemapb.Vindex{ + "v": { + Type: "consistent_lookup", + Params: map[string]string{ + "table": "ks.lkp", + "from": "col2,col1", + "to": "keyspace_id", + "ignore_nulls": "true", + }, + Owner: "t1", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "v", + Columns: []string{"col2", "col1"}, + }}, + }, + }, + } + // Dummy sourceSchema + sourceSchema := "CREATE TABLE `t1` (\n" + + " `col1` int(11) NOT NULL AUTO_INCREMENT,\n" + + " `col2` int(11) DEFAULT NULL,\n" + + " PRIMARY KEY (`id`)\n" + + ") ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=latin1" + + vschema := &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "hash", + Column: "col1", + }}, + }, + }, + } + + wantKs := &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + "v": { + Type: "consistent_lookup", + Params: map[string]string{ + "table": "ks.lkp", + "from": "col2,col1", + "to": "keyspace_id", + "write_only": "true", + "ignore_nulls": "true", + }, + Owner: "t1", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "hash", + Column: "col1", + }, { + Name: "v", + Columns: []string{"col2", "col1"}, + }}, + }, + "lkp": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "col2", + Name: "hash", + }}, + }, + }, + } + wantQuery := "select col2 as col2, col1 as col1, keyspace_id() as keyspace_id from t1 where col2 is not null and col1 is not null group by col2, col1, keyspace_id" + + env.tmc.schema[ms.SourceKeyspace+".t1"] = &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ + Fields: []*querypb.Field{{ + Name: "col1", + Type: querypb.Type_INT64, + }, { + Name: "col2", + Type: querypb.Type_INT64, + }}, + Schema: sourceSchema, + }}, + } + if err := env.topoServ.SaveVSchema(context.Background(), ms.SourceKeyspace, vschema); err != nil { + t.Fatal(err) + } + + ms, ks, _, err := env.wr.prepareCreateLookup(context.Background(), ms.SourceKeyspace, specs, false) + require.NoError(t, err) + if !proto.Equal(wantKs, ks) { + t.Errorf("unexpected keyspace value: got:\n%v, want\n%v", ks, wantKs) + } + require.NotNil(t, ms) + require.GreaterOrEqual(t, len(ms.TableSettings), 1) + require.Equal(t, wantQuery, ms.TableSettings[0].SourceExpression, "unexpected query") +} + func TestStopAfterCopyFlag(t *testing.T) { ms := &vtctldatapb.MaterializeSettings{ SourceKeyspace: "ks",