Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-16.0] Rewrite USING to ON condition for joins (#13931) #13940

Merged
merged 5 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions go/test/endtoend/cluster/cluster_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,13 +327,12 @@ func (cluster *LocalProcessCluster) startKeyspace(keyspace Keyspace, shardNames
if !cluster.ReusingVTDATAROOT {
_ = cluster.VtctlProcess.CreateKeyspace(keyspace.Name)
}
var mysqlctlProcessList []*exec.Cmd
for _, shardName := range shardNames {
shard := &Shard{
Name: shardName,
}
log.Infof("Starting shard: %v", shardName)
mysqlctlProcessList = []*exec.Cmd{}
var mysqlctlProcessList []*exec.Cmd
for i := 0; i < totalTabletsRequired; i++ {
// instantiate vttablet object with reserved ports
tabletUID := cluster.GetAndReserveTabletUID()
Expand Down Expand Up @@ -1191,8 +1190,16 @@ func (cluster *LocalProcessCluster) VtprocessInstanceFromVttablet(tablet *Vttabl
}

// StartVttablet starts a new tablet
func (cluster *LocalProcessCluster) StartVttablet(tablet *Vttablet, servingStatus string,
supportBackup bool, cell string, keyspaceName string, hostname string, shardName string) error {
func (cluster *LocalProcessCluster) StartVttablet(
tablet *Vttablet,
explicitServingStatus bool,
servingStatus string,
supportBackup bool,
cell string,
keyspaceName string,
hostname string,
shardName string,
) error {
tablet.VttabletProcess = VttabletProcessInstance(
tablet.HTTPPort,
tablet.GrpcPort,
Expand All @@ -1210,6 +1217,7 @@ func (cluster *LocalProcessCluster) StartVttablet(tablet *Vttablet, servingStatu

tablet.VttabletProcess.SupportsBackup = supportBackup
tablet.VttabletProcess.ServingStatus = servingStatus
tablet.VttabletProcess.ExplicitServingStatus = explicitServingStatus
return tablet.VttabletProcess.Setup()
}

Expand Down
15 changes: 13 additions & 2 deletions go/test/endtoend/cluster/vttablet_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ import (
"vitess.io/vitess/go/vt/log"
)

const vttabletStateTimeout = 30 * time.Second

// VttabletProcess is a generic handle for a running vttablet .
// It can be spawned manually
type VttabletProcess struct {
Expand Down Expand Up @@ -67,6 +69,7 @@ type VttabletProcess struct {
QueryzURL string
StatusDetailsURL string
SupportsBackup bool
ExplicitServingStatus bool
ServingStatus string
DbPassword string
DbPort int
Expand All @@ -75,7 +78,7 @@ type VttabletProcess struct {
Charset string
ConsolidationsURL string

//Extra Args to be set before starting the vttablet process
// Extra Args to be set before starting the vttablet process
ExtraArgs []string

proc *exec.Cmd
Expand Down Expand Up @@ -146,7 +149,15 @@ func (vttablet *VttabletProcess) Setup() (err error) {
}()

if vttablet.ServingStatus != "" {
if err = vttablet.WaitForTabletStatus(vttablet.ServingStatus); err != nil {
// If the tablet has an explicit serving status we use the serving status
// otherwise we wait for any serving status to show up in the healthcheck.
var servingStatus []string
if vttablet.ExplicitServingStatus {
servingStatus = append(servingStatus, vttablet.ServingStatus)
} else {
servingStatus = append(servingStatus, "SERVING", "NOT_SERVING")
}
if err = vttablet.WaitForTabletStatuses(servingStatus); err != nil {
errFileContent, _ := os.ReadFile(fname)
if errFileContent != nil {
log.Infof("vttablet error:\n%s\n", string(errFileContent))
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/tabletmanager/custom_rule_topo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestTopoCustomRule(t *testing.T) {
require.Nil(t, err, "error should be Nil")

// Start Vttablet
err = clusterInstance.StartVttablet(rTablet, "SERVING", false, cell, keyspaceName, hostname, shardName)
err = clusterInstance.StartVttablet(rTablet, false, "SERVING", false, cell, keyspaceName, hostname, shardName)
require.Nil(t, err, "error should be Nil")

err = clusterInstance.VtctlclientProcess.ExecuteCommand("Validate")
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/tabletmanager/primary/tablet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func TestPrimaryRestartSetsTERTimestamp(t *testing.T) {
require.NoError(t, err)

// Start Vttablet
err = clusterInstance.StartVttablet(&replicaTablet, "SERVING", false, cell, keyspaceName, hostname, shardName)
err = clusterInstance.StartVttablet(&replicaTablet, false, "SERVING", false, cell, keyspaceName, hostname, shardName)
require.NoError(t, err)

// Make sure that the TER did not change
Expand Down
6 changes: 3 additions & 3 deletions go/test/endtoend/tabletmanager/tablet_health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestTabletReshuffle(t *testing.T) {

// SupportsBackup=False prevents vttablet from trying to restore
// Start vttablet process
err = clusterInstance.StartVttablet(rTablet, "SERVING", false, cell, keyspaceName, hostname, shardName)
err = clusterInstance.StartVttablet(rTablet, false, "SERVING", false, cell, keyspaceName, hostname, shardName)
require.NoError(t, err)

sql := "select value from t1"
Expand Down Expand Up @@ -107,7 +107,7 @@ func TestHealthCheck(t *testing.T) {
utils.Exec(t, replicaConn, fmt.Sprintf("create database vt_%s", keyspaceName))

// start vttablet process, should be in SERVING state as we already have a primary
err = clusterInstance.StartVttablet(rTablet, "SERVING", false, cell, keyspaceName, hostname, shardName)
err = clusterInstance.StartVttablet(rTablet, true, "SERVING", false, cell, keyspaceName, hostname, shardName)
require.NoError(t, err)

conn, err := mysql.Connect(ctx, &primaryTabletParams)
Expand Down Expand Up @@ -248,7 +248,7 @@ func TestHealthCheckDrainedStateDoesNotShutdownQueryService(t *testing.T) {
// - the second tablet will be set to 'drained' and we expect that
// - the query service won't be shutdown

//Wait if tablet is not in service state
// Wait if tablet is not in service state
defer cluster.PanicHandler(t)
err := rdonlyTablet.VttabletProcess.WaitForTabletStatus("SERVING")
require.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestFallbackSecurityPolicy(t *testing.T) {

// Requesting an unregistered security_policy should fallback to deny-all.
clusterInstance.VtTabletExtraArgs = []string{"--security_policy", "bogus"}
err = clusterInstance.StartVttablet(mTablet, "SERVING", false, cell, keyspaceName, hostname, shardName)
err = clusterInstance.StartVttablet(mTablet, false, "SERVING", false, cell, keyspaceName, hostname, shardName)
require.NoError(t, err)

// It should deny ADMIN role.
Expand Down Expand Up @@ -94,7 +94,7 @@ func TestDenyAllSecurityPolicy(t *testing.T) {

// Requesting a deny-all security_policy.
clusterInstance.VtTabletExtraArgs = []string{"--security_policy", "deny-all"}
err = clusterInstance.StartVttablet(mTablet, "SERVING", false, cell, keyspaceName, hostname, shardName)
err = clusterInstance.StartVttablet(mTablet, false, "SERVING", false, cell, keyspaceName, hostname, shardName)
require.NoError(t, err)

// It should deny ADMIN role.
Expand Down Expand Up @@ -126,7 +126,7 @@ func TestReadOnlySecurityPolicy(t *testing.T) {

// Requesting a read-only security_policy.
clusterInstance.VtTabletExtraArgs = []string{"--security_policy", "read-only"}
err = clusterInstance.StartVttablet(mTablet, "SERVING", false, cell, keyspaceName, hostname, shardName)
err = clusterInstance.StartVttablet(mTablet, false, "SERVING", false, cell, keyspaceName, hostname, shardName)
require.NoError(t, err)

// It should deny ADMIN role.
Expand Down
4 changes: 2 additions & 2 deletions go/test/endtoend/tabletmanager/tablet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestEnsureDB(t *testing.T) {

log.Info(fmt.Sprintf("Started vttablet %v", tablet))
// Start vttablet process as replica. It won't be able to serve because there's no db.
err = clusterInstance.StartVttablet(tablet, "NOT_SERVING", false, cell, "dbtest", hostname, "0")
err = clusterInstance.StartVttablet(tablet, false, "NOT_SERVING", false, cell, "dbtest", hostname, "0")
require.NoError(t, err)

// Make it the primary.
Expand Down Expand Up @@ -73,7 +73,7 @@ func TestResetReplicationParameters(t *testing.T) {

log.Info(fmt.Sprintf("Started vttablet %v", tablet))
// Start vttablet process as replica. It won't be able to serve because there's no db.
err = clusterInstance.StartVttablet(tablet, "NOT_SERVING", false, cell, "dbtest", hostname, "0")
err = clusterInstance.StartVttablet(tablet, false, "NOT_SERVING", false, cell, "dbtest", hostname, "0")
require.NoError(t, err)

// Set a replication source on the tablet and start replication
Expand Down
8 changes: 8 additions & 0 deletions go/test/endtoend/vtgate/queries/misc/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,3 +185,11 @@ func TestBuggyOuterJoin(t *testing.T) {
mcmp.Exec("insert into t1(id1, id2) values (1,2), (42,5), (5, 42)")
mcmp.Exec("select t1.id1, t2.id1 from t1 left join t1 as t2 on t2.id1 = t2.id2")
}

func TestLeftJoinUsingUnsharded(t *testing.T) {
mcmp, closer := start(t)
defer closer()

utils.Exec(t, mcmp.VtConn, "insert /*vt+ QUERY_TIMEOUT_MS=1000 */ into uks.unsharded(id1) values (1),(2),(3),(4),(5)")
utils.Exec(t, mcmp.VtConn, "select /*vt+ QUERY_TIMEOUT_MS=1000 */ * from uks.unsharded as A left join uks.unsharded as B using(id1)")
}
2 changes: 2 additions & 0 deletions go/vt/vterrors/code.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ var (
VT09009 = errorWithoutState("VT09009", vtrpcpb.Code_FAILED_PRECONDITION, "stream is supported only for primary tablet type, current type: %v", "Stream is only supported for primary tablets, please use a stream on those tablets.")
VT09010 = errorWithoutState("VT09010", vtrpcpb.Code_FAILED_PRECONDITION, "SHOW VITESS_THROTTLER STATUS works only on primary tablet", "SHOW VITESS_THROTTLER STATUS works only on primary tablet.")
VT09013 = errorWithoutState("VT09013", vtrpcpb.Code_FAILED_PRECONDITION, "semi-sync plugins are not loaded", "Durability policy wants Vitess to use semi-sync, but the MySQL instances don't have the semi-sync plugin loaded.")
VT09015 = errorWithoutState("VT09015", vtrpcpb.Code_FAILED_PRECONDITION, "schema tracking required", "This query cannot be planned without more information on the SQL schema. Please turn on schema tracking or add authoritative columns information to your VSchema.")

VT10001 = errorWithoutState("VT10001", vtrpcpb.Code_ABORTED, "foreign key constraints are not allowed", "Foreign key constraints are not allowed, see https://vitess.io/blog/2021-06-15-online-ddl-why-no-fk/.")

Expand Down Expand Up @@ -125,6 +126,7 @@ var (
VT09009,
VT09010,
VT09013,
VT09015,
VT10001,
VT12001,
VT13001,
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vterrors/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func TestStackFormat(t *testing.T) {
// but the change in errors#27 made them incomparable. Assert that
// various kinds of errors have a functional equality operator, even
// if the result of that equality is always false.
func TestErrorEquality(t *testing.T) {
func TestErrorEquality(_ *testing.T) {
vals := []error{
nil,
io.EOF,
Expand Down
22 changes: 22 additions & 0 deletions go/vt/vtgate/planbuilder/testdata/from_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -6500,6 +6500,28 @@
]
}
},
{
"comment": "left join with using has to be transformed into inner join with on condition",
"query": "SELECT * FROM unsharded_authoritative as A LEFT JOIN unsharded_authoritative as B USING(col1)",
"plan": {
"QueryType": "SELECT",
"Original": "SELECT * FROM unsharded_authoritative as A LEFT JOIN unsharded_authoritative as B USING(col1)",
"Instructions": {
"OperatorType": "Route",
"Variant": "Unsharded",
"Keyspace": {
"Name": "main",
"Sharded": false
},
"FieldQuery": "select A.col1 as col1, A.col2 as col2, B.col2 as col2 from unsharded_authoritative as A left join unsharded_authoritative as B on A.col1 = B.col1 where 1 != 1",
"Query": "select A.col1 as col1, A.col2 as col2, B.col2 as col2 from unsharded_authoritative as A left join unsharded_authoritative as B on A.col1 = B.col1",
"Table": "unsharded_authoritative"
},
"TablesUsed": [
"main.unsharded_authoritative"
]
}
},
{
"comment": "join query using table with muticolumn vindex",
"query": "select 1 from multicol_tbl m1 join multicol_tbl m2 on m1.cola = m2.cola",
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/planbuilder/testdata/unsupported_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@
"comment": "join with USING construct",
"query": "select * from user join user_extra using(id)",
"v3-plan": "VT12001: unsupported: JOIN with USING(column_list) clause for complex queries",
"gen4-plan": "can't handle JOIN USING without authoritative tables"
"gen4-plan": "VT09015: schema tracking required"
},
{
"comment": "join with USING construct with 3 tables",
"query": "select user.id from user join user_extra using(id) join music using(id2)",
"v3-plan": "VT12001: unsupported: JOIN with USING(column_list) clause for complex queries",
"gen4-plan": "can't handle JOIN USING without authoritative tables"
"gen4-plan": "VT09015: schema tracking required"
},
{
"comment": "natural left join",
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vtgate/semantics/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ func (a *analyzer) analyzeUp(cursor *sqlparser.Cursor) bool {
return false
}

if err := a.rewriter.up(cursor); err != nil {
a.setError(err)
return true
}

a.leaveProjection(cursor)
return a.shouldContinue()
}
Expand Down
7 changes: 0 additions & 7 deletions go/vt/vtgate/semantics/binder.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,6 @@ func (b *binder) up(cursor *sqlparser.Cursor) error {
}
currScope.joinUsing[ident.Lowered()] = deps.direct
}
if len(node.Using) > 0 {
err := rewriteJoinUsing(currScope, node.Using, b.org)
if err != nil {
return err
}
node.Using = nil
}
case *sqlparser.ColName:
currentScope := b.scoper.currentScope()
deps, err := b.resolveColumn(node, currentScope, false)
Expand Down
Loading
Loading