From 06fddffa78eb21fb8844589a94b8b2ba912016bf Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sun, 3 Sep 2023 17:26:14 -0400 Subject: [PATCH 1/6] Properly support ignore_nulls in CreateLookupVindex Signed-off-by: Matt Lord --- .../tabletserver/vstreamer/planbuilder.go | 25 ++++ go/vt/wrangler/materializer.go | 39 ++++-- go/vt/wrangler/materializer_test.go | 121 ++++++++++++++++++ 3 files changed, 173 insertions(+), 12 deletions(-) diff --git a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go index 07f0a981dd0..b8a3a0bbc6a 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go +++ b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go @@ -79,6 +79,8 @@ const ( GreaterThanEqual // NotEqual is used to filter a comparable column if != specific value NotEqual + // IsNotNull is used to filter out rows where the column is NULL + IsNotNull ) // Filter contains opcodes for filtering. @@ -224,6 +226,10 @@ func (plan *Plan) filter(values, result []sqltypes.Value, charsets []collations. 
if !key.KeyRangeContains(filter.KeyRange, ksid) { return false, nil } + case IsNotNull: + if values[filter.ColNum].IsNull() { + return false, nil + } default: match, err := compare(filter.Opcode, values[filter.ColNum], filter.Value, charsets[filter.ColNum]) if err != nil { @@ -550,6 +556,25 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er if err := plan.analyzeInKeyRange(vschema, expr.Exprs); err != nil { return err } + case *sqlparser.IsExpr: // Needed for CreateLookupVindex with ignore_nulls + if expr.Right != sqlparser.IsNotNullOp { + return fmt.Errorf("unsupported constraint: %v", sqlparser.String(expr)) + } + qualifiedName, ok := expr.Left.(*sqlparser.ColName) + if !ok { + return fmt.Errorf("unexpected: %v", sqlparser.String(expr)) + } + if !qualifiedName.Qualifier.IsEmpty() { + return fmt.Errorf("unsupported qualifier for column: %v", sqlparser.String(qualifiedName)) + } + colnum, err := findColumn(plan.Table, qualifiedName.Name) + if err != nil { + return err + } + plan.Filters = append(plan.Filters, Filter{ + Opcode: IsNotNull, + ColNum: colnum, + }) default: return fmt.Errorf("unsupported constraint: %v", sqlparser.String(expr)) } diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index 5e24f63a7f7..5ad03e6f932 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -22,6 +22,7 @@ import ( "hash/fnv" "math" "sort" + "strconv" "strings" "sync" "text/template" @@ -504,12 +505,13 @@ func (wr *Wrangler) prepareCreateLookup(ctx context.Context, keyspace string, sp // Important variables are pulled out here. 
var ( // lookup vindex info - vindexName string - vindex *vschemapb.Vindex - targetKeyspace string - targetTableName string - vindexFromCols []string - vindexToCol string + vindexName string + vindex *vschemapb.Vindex + targetKeyspace string + targetTableName string + vindexFromCols []string + vindexToCol string + vindexIgnoreNulls bool // source table info sourceTableName string @@ -560,6 +562,9 @@ func (wr *Wrangler) prepareCreateLookup(ctx context.Context, keyspace string, sp if _, err := vindexes.CreateVindex(vindex.Type, vindexName, vindex.Params); err != nil { return nil, nil, nil, err } + if ignoreNullsStr, ok := vindex.Params["ignore_nulls"]; ok { + vindexIgnoreNulls, _ = strconv.ParseBool(ignoreNullsStr) + } // Validate input table if len(specs.Tables) != 1 { @@ -696,21 +701,31 @@ func (wr *Wrangler) prepareCreateLookup(ctx context.Context, keyspace string, sp buf = sqlparser.NewTrackedBuffer(nil) buf.Myprintf("select ") for i := range vindexFromCols { - buf.Myprintf("%v as %v, ", sqlparser.NewIdentifierCI(sourceVindexColumns[i]), sqlparser.NewIdentifierCI(vindexFromCols[i])) + buf.Myprintf("%s as %s, ", sqlparser.String(sqlparser.NewIdentifierCI(sourceVindexColumns[i])), sqlparser.String(sqlparser.NewIdentifierCI(vindexFromCols[i]))) } if strings.EqualFold(vindexToCol, "keyspace_id") || strings.EqualFold(vindex.Type, "consistent_lookup_unique") || strings.EqualFold(vindex.Type, "consistent_lookup") { - buf.Myprintf("keyspace_id() as %v ", sqlparser.NewIdentifierCI(vindexToCol)) + buf.Myprintf("keyspace_id() as %s ", sqlparser.String(sqlparser.NewIdentifierCI(vindexToCol))) } else { - buf.Myprintf("%v as %v ", sqlparser.NewIdentifierCI(vindexToCol), sqlparser.NewIdentifierCI(vindexToCol)) + buf.Myprintf("%s as %s ", sqlparser.String(sqlparser.NewIdentifierCI(vindexToCol)), sqlparser.String(sqlparser.NewIdentifierCI(vindexToCol))) + } + buf.Myprintf("from %s", sqlparser.String(sqlparser.NewIdentifierCS(sourceTableName))) + if vindexIgnoreNulls { + 
buf.Myprintf(" where ") + lastValIdx := len(vindexFromCols) - 1 + for i := range vindexFromCols { + buf.Myprintf("%s is not null", sqlparser.String(sqlparser.NewIdentifierCI(vindexFromCols[i]))) + if i != lastValIdx { + buf.Myprintf(" and ") + } + } } - buf.Myprintf("from %v", sqlparser.NewIdentifierCS(sourceTableName)) if vindex.Owner != "" { // Only backfill buf.Myprintf(" group by ") for i := range vindexFromCols { - buf.Myprintf("%v, ", sqlparser.NewIdentifierCI(vindexFromCols[i])) + buf.Myprintf("%s, ", sqlparser.String(sqlparser.NewIdentifierCI(vindexFromCols[i]))) } - buf.Myprintf("%v", sqlparser.NewIdentifierCI(vindexToCol)) + buf.Myprintf("%s", sqlparser.String(sqlparser.NewIdentifierCI(vindexToCol))) } materializeQuery = buf.String() diff --git a/go/vt/wrangler/materializer_test.go b/go/vt/wrangler/materializer_test.go index 323769ae908..a3ce84bb495 100644 --- a/go/vt/wrangler/materializer_test.go +++ b/go/vt/wrangler/materializer_test.go @@ -1334,6 +1334,127 @@ func TestCreateCustomizedVindex(t *testing.T) { } } +func TestCreateLookupVindexIgnoreNulls(t *testing.T) { + ms := &vtctldatapb.MaterializeSettings{ + SourceKeyspace: "ks", + TargetKeyspace: "ks", + } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, []string{"0"}) + defer env.close() + + specs := &vschemapb.Keyspace{ + Vindexes: map[string]*vschemapb.Vindex{ + "v": { + Type: "lookup_unique", + Params: map[string]string{ + "table": "ks.lkp", + "from": "c1", + "to": "col2", + "ignore_nulls": "true", + }, + Owner: "t1", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "v", + Column: "col2", + }}, + }, + }, + } + // Dummy sourceSchema + sourceSchema := "CREATE TABLE `t1` (\n" + + " `col1` int(11) NOT NULL AUTO_INCREMENT,\n" + + " `col2` int(11) DEFAULT NULL,\n" + + " PRIMARY KEY (`id`)\n" + + ") ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=latin1" + 
+ vschema := &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "hash", + Column: "col1", + }}, + }, + }, + } + + wantKs := &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + "v": { + Type: "lookup_unique", + Params: map[string]string{ + "table": "ks.lkp", + "from": "c1", + "to": "col2", + "write_only": "true", + "ignore_nulls": "true", + }, + Owner: "t1", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "hash", + Column: "col1", + }, { + Name: "v", + Column: "col2", + }}, + }, + "lkp": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "c1", + Name: "hash", + }}, + }, + }, + } + wantQuery := "select col2 as c1, col2 as col2 from t1 where c1 is not null group by c1, col2" + + env.tmc.schema[ms.SourceKeyspace+".t1"] = &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ + Fields: []*querypb.Field{{ + Name: "col1", + Type: querypb.Type_INT64, + }, { + Name: "col2", + Type: querypb.Type_INT64, + }}, + Schema: sourceSchema, + }}, + } + if err := env.topoServ.SaveVSchema(context.Background(), ms.SourceKeyspace, vschema); err != nil { + t.Fatal(err) + } + + ms, ks, _, err := env.wr.prepareCreateLookup(context.Background(), ms.SourceKeyspace, specs, false) + require.NoError(t, err) + if !proto.Equal(ks, wantKs) { + t.Errorf("unexpected keyspace value: got:\n%v, want\n%v", ks, wantKs) + } + require.NotNil(t, ms) + require.GreaterOrEqual(t, len(ms.TableSettings), 1) + require.Equal(t, ms.TableSettings[0].SourceExpression, wantQuery, "unexpected query") +} + func TestStopAfterCopyFlag(t *testing.T) { ms := &vtctldatapb.MaterializeSettings{ SourceKeyspace: "ks", From 1fac09246a1769a6ae5bee6b8eedebf9ea93686f Mon Sep 17 00:00:00 
2001 From: Matt Lord Date: Mon, 4 Sep 2023 16:14:22 -0400 Subject: [PATCH 2/6] Improve unit test and add e2e test Signed-off-by: Matt Lord --- go/test/endtoend/vreplication/config_test.go | 72 +++++++++++++------ .../vreplication/vreplication_test.go | 22 ++++++ go/vt/wrangler/materializer_test.go | 28 ++++---- 3 files changed, 86 insertions(+), 36 deletions(-) diff --git a/go/test/endtoend/vreplication/config_test.go b/go/test/endtoend/vreplication/config_test.go index dcae0f6a5bf..619abe45c59 100644 --- a/go/test/endtoend/vreplication/config_test.go +++ b/go/test/endtoend/vreplication/config_test.go @@ -74,32 +74,60 @@ create table loadtest (id int, name varchar(256), primary key(id), key(name)); initialProductVSchema = ` { "tables": { - "product": {}, - "merchant": {}, - "orders": {}, + "product": {}, + "merchant": {}, + "orders": {}, "loadtest": {}, - "customer": {}, - "customer_seq": { - "type": "sequence" - }, - "customer2": {}, - "customer_seq2": { - "type": "sequence" - }, - "order_seq": { - "type": "sequence" - }, - "Lead": {}, - "Lead-1": {}, - "db_order_test": {}, - "vdiff_order": {}, - "datze": {}, - "reftable": { - "type": "reference" - } + "customer": {}, + "customer_seq": { + "type": "sequence" + }, + "customer2": {}, + "customer_seq2": { + "type": "sequence" + }, + "order_seq": { + "type": "sequence" + }, + "Lead": {}, + "Lead-1": {}, + "db_order_test": {}, + "vdiff_order": {}, + "datze": {}, + "reftable": { + "type": "reference" + }, + "customer_cid_keyspace_id": {} + } +} +` + + createLookupVindexVSchema = ` +{ + "sharded": true, + "vindexes": { + "customer_name_keyspace_id": { + "type": "consistent_lookup", + "params": { + "table": "product.customer_name_keyspace_id", + "from": "name,cid", + "to": "keyspace_id", + "ignore_nulls": "true" + }, + "owner": "customer" + } + }, + "tables": { + "customer": { + "column_vindexes": [{ + "columns": ["name", "cid"], + "name": "customer_name_keyspace_id" + }] + } } } ` + customerSchema = "" customerVSchema = 
` { diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 74843c862eb..20d84ae1e14 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -367,6 +367,28 @@ func testVreplicationWorkflows(t *testing.T, limited bool, binlogRowImage string verifyCopyStateIsOptimized(t, tablet) } }) + + t.Run("Test CreateLookupVindex", func(t *testing.T) { + // CreateLookupVindex does not support noblob images. + if strings.ToLower(binlogRowImage) == "noblob" { + return + } + _, err = vtgateConn.ExecuteFetch("use customer", 1, false) + require.NoError(t, err, "error using customer keyspace: %v", err) + res, err := vtgateConn.ExecuteFetch("select count(*) from customer where name is not null", 1, false) + require.NoError(t, err, "error getting current row count in customer: %v", err) + require.Equal(t, 1, len(res.Rows), "expected 1 row in count(*) query, got %d", len(res.Rows)) + rows, _ := res.Rows[0][0].ToInt32() + // Insert a couple of rows with a NULL name to confirm that they + // are ignored. 
+ insert := "insert into customer (cid, name, typ, sport, meta) values (100, NULL, 'soho', 'football','{}'), (101, NULL, 'enterprise','baseball','{}')" + _, err = vtgateConn.ExecuteFetch(insert, -1, false) + require.NoError(t, err, "error executing %q: %v", insert, err) + err = vc.VtctlClient.ExecuteCommand("CreateLookupVindex", "--", "--tablet_types=PRIMARY", "customer", createLookupVindexVSchema) + require.NoError(t, err, "error executing CreateLookupVindex: %v", err) + waitForWorkflowState(t, vc, "product.customer_name_keyspace_id_vdx", binlogdatapb.VReplicationWorkflowState_Stopped.String()) + waitForRowCount(t, vtgateConn, "product", "customer_name_keyspace_id", int(rows)) + }) } func TestV2WorkflowsAcrossDBVersions(t *testing.T) { diff --git a/go/vt/wrangler/materializer_test.go b/go/vt/wrangler/materializer_test.go index a3ce84bb495..9296c6f70d4 100644 --- a/go/vt/wrangler/materializer_test.go +++ b/go/vt/wrangler/materializer_test.go @@ -1348,11 +1348,11 @@ func TestCreateLookupVindexIgnoreNulls(t *testing.T) { specs := &vschemapb.Keyspace{ Vindexes: map[string]*vschemapb.Vindex{ "v": { - Type: "lookup_unique", + Type: "consistent_lookup", Params: map[string]string{ "table": "ks.lkp", - "from": "c1", - "to": "col2", + "from": "col2,col1", + "to": "keyspace_id", "ignore_nulls": "true", }, Owner: "t1", @@ -1361,8 +1361,8 @@ func TestCreateLookupVindexIgnoreNulls(t *testing.T) { Tables: map[string]*vschemapb.Table{ "t1": { ColumnVindexes: []*vschemapb.ColumnVindex{{ - Name: "v", - Column: "col2", + Name: "v", + Columns: []string{"col2", "col1"}, }}, }, }, @@ -1398,11 +1398,11 @@ func TestCreateLookupVindexIgnoreNulls(t *testing.T) { Type: "hash", }, "v": { - Type: "lookup_unique", + Type: "consistent_lookup", Params: map[string]string{ "table": "ks.lkp", - "from": "c1", - "to": "col2", + "from": "col2,col1", + "to": "keyspace_id", "write_only": "true", "ignore_nulls": "true", }, @@ -1415,19 +1415,19 @@ func TestCreateLookupVindexIgnoreNulls(t *testing.T) { 
Name: "hash", Column: "col1", }, { - Name: "v", - Column: "col2", + Name: "v", + Columns: []string{"col2", "col1"}, }}, }, "lkp": { ColumnVindexes: []*vschemapb.ColumnVindex{{ - Column: "c1", + Column: "col2", Name: "hash", }}, }, }, } - wantQuery := "select col2 as c1, col2 as col2 from t1 where c1 is not null group by c1, col2" + wantQuery := "select col2 as col2, col1 as col1, keyspace_id() as keyspace_id from t1 where col2 is not null and col1 is not null group by col2, col1, keyspace_id" env.tmc.schema[ms.SourceKeyspace+".t1"] = &tabletmanagerdatapb.SchemaDefinition{ TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ @@ -1447,12 +1447,12 @@ func TestCreateLookupVindexIgnoreNulls(t *testing.T) { ms, ks, _, err := env.wr.prepareCreateLookup(context.Background(), ms.SourceKeyspace, specs, false) require.NoError(t, err) - if !proto.Equal(ks, wantKs) { + if !proto.Equal(wantKs, ks) { t.Errorf("unexpected keyspace value: got:\n%v, want\n%v", ks, wantKs) } require.NotNil(t, ms) require.GreaterOrEqual(t, len(ms.TableSettings), 1) - require.Equal(t, ms.TableSettings[0].SourceExpression, wantQuery, "unexpected query") + require.Equal(t, wantQuery, ms.TableSettings[0].SourceExpression, "unexpected query") } func TestStopAfterCopyFlag(t *testing.T) { From 01a3107af3ff60dd5d1414c17e3091d939512f7e Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 4 Sep 2023 17:33:08 -0400 Subject: [PATCH 3/6] This is no longer needed Signed-off-by: Matt Lord --- go/test/endtoend/vreplication/config_test.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/go/test/endtoend/vreplication/config_test.go b/go/test/endtoend/vreplication/config_test.go index 619abe45c59..4b9bd8c7c01 100644 --- a/go/test/endtoend/vreplication/config_test.go +++ b/go/test/endtoend/vreplication/config_test.go @@ -96,8 +96,7 @@ create table loadtest (id int, name varchar(256), primary key(id), key(name)); "datze": {}, "reftable": { "type": "reference" - }, - "customer_cid_keyspace_id": 
{} + } } } ` @@ -120,8 +119,8 @@ create table loadtest (id int, name varchar(256), primary key(id), key(name)); "tables": { "customer": { "column_vindexes": [{ - "columns": ["name", "cid"], - "name": "customer_name_keyspace_id" + "columns": ["name", "cid"], + "name": "customer_name_keyspace_id" }] } } From 70b66eec8fe128cb3c63621be62356eea66018fa Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 4 Sep 2023 19:52:18 -0400 Subject: [PATCH 4/6] Kick CI Signed-off-by: Matt Lord From 69694eb0c362c214733aa9bbaee5ad96b1fbcbd6 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 5 Sep 2023 09:08:51 -0400 Subject: [PATCH 5/6] Align ignore_nulls value handling Signed-off-by: Matt Lord --- go/vt/wrangler/materializer.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index 5ad03e6f932..ddfd8c90d38 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -22,7 +22,6 @@ import ( "hash/fnv" "math" "sort" - "strconv" "strings" "sync" "text/template" @@ -55,6 +54,7 @@ import ( tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" vschemapb "vitess.io/vitess/go/vt/proto/vschema" vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) type materializer struct { @@ -563,7 +563,16 @@ func (wr *Wrangler) prepareCreateLookup(ctx context.Context, keyspace string, sp return nil, nil, nil, err } if ignoreNullsStr, ok := vindex.Params["ignore_nulls"]; ok { - vindexIgnoreNulls, _ = strconv.ParseBool(ignoreNullsStr) + // This mirrors the behavior of vindexes.boolFromMap(). 
+ switch ignoreNullsStr { + case "true": + vindexIgnoreNulls = true + case "false": + vindexIgnoreNulls = false + default: + return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "ignore_nulls value must be 'true' or 'false': '%s'", + ignoreNullsStr) + } } // Validate input table From 39e269fbf28ca0f1f41a75ffcc95166e94823b37 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 5 Sep 2023 14:55:25 -0400 Subject: [PATCH 6/6] Fix one more indentation spot in test vschema definition Signed-off-by: Matt Lord --- go/test/endtoend/vreplication/config_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/test/endtoend/vreplication/config_test.go b/go/test/endtoend/vreplication/config_test.go index 4b9bd8c7c01..1ee1b48f91d 100644 --- a/go/test/endtoend/vreplication/config_test.go +++ b/go/test/endtoend/vreplication/config_test.go @@ -79,7 +79,7 @@ create table loadtest (id int, name varchar(256), primary key(id), key(name)); "orders": {}, "loadtest": {}, "customer": {}, - "customer_seq": { + "customer_seq": { "type": "sequence" }, "customer2": {},