Skip to content

Commit

Permalink
[release-17.0] fix: GetField to use existing session for query (#13219)…
Browse files Browse the repository at this point in the history
… (#13245)

* fix: getfield to use existing session for query

Signed-off-by: Harshit Gangal <harshit@planetscale.com>

* added test comments

Signed-off-by: Harshit Gangal <harshit@planetscale.com>

---------

Signed-off-by: Harshit Gangal <harshit@planetscale.com>
Co-authored-by: Harshit Gangal <harshit@planetscale.com>
  • Loading branch information
vitess-bot[bot] and harshit-gangal authored Jun 6, 2023
1 parent 511481b commit ea6675d
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 22 deletions.
61 changes: 47 additions & 14 deletions go/test/endtoend/vtgate/transaction/single/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,16 @@ import (
"context"
_ "embed"
"flag"
"fmt"
"os"
"testing"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/test/endtoend/utils"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/utils"
"vitess.io/vitess/go/vt/vtgate/planbuilder"
)

var (
Expand Down Expand Up @@ -70,6 +71,7 @@ func TestMain(m *testing.M) {
}

// Start vtgate
clusterInstance.VtGatePlannerVersion = planbuilder.Gen4
clusterInstance.VtGateExtraArgs = []string{"--transaction_mode", "SINGLE"}
err = clusterInstance.StartVtgate()
if err != nil {
Expand Down Expand Up @@ -168,12 +170,8 @@ func TestLookupDangleRowLaterMultiDB(t *testing.T) {
}

func TestLookupDangleRowRecordInSameShard(t *testing.T) {
conn, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(t, err)
defer conn.Close()
defer func() {
utils.Exec(t, conn, `delete from txn_unique_constraints where txn_id = 'txn1'`)
}()
conn, cleanup := setup(t)
defer cleanup()

// insert a dangling row in lookup table
utils.Exec(t, conn, `INSERT INTO uniqueConstraint_vdx(unique_constraint, keyspace_id) VALUES ('foo', 'J\xda\xf0p\x0e\xcc(\x8fਁ\xa7P\x86\xa5=')`)
Expand All @@ -190,12 +188,8 @@ func TestLookupDangleRowRecordInSameShard(t *testing.T) {
}

func TestMultiDbSecondRecordLookupDangle(t *testing.T) {
conn, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(t, err)
defer conn.Close()
defer func() {
utils.Exec(t, conn, `delete from uniqueConstraint_vdx where unique_constraint = 'bar'`)
}()
conn, cleanup := setup(t)
defer cleanup()

// insert a dangling row in lookup table
utils.Exec(t, conn, `INSERT INTO uniqueConstraint_vdx(unique_constraint, keyspace_id) VALUES ('bar', '\x86\xc8\xc5\x1ac\xfb\x8c+6\xe4\x1f\x03\xd8ϝB')`)
Expand All @@ -220,3 +214,42 @@ func TestMultiDbSecondRecordLookupDangle(t *testing.T) {
// no row should exist.
utils.AssertMatches(t, conn, `select txn_id from txn_unique_constraints`, `[]`)
}

// TestNoRecordInTableNotFail test that vindex lookup query creates a transaction on one shard say x.
// To fetch the fields for the actual table, the Select Impossible query should also be reouted to x.
// If it routes to other shard then the test will fail with multi-shard transaction attempted error.
// The fix ensures it does not happen.
func TestNoRecordInTableNotFail(t *testing.T) {
conn, cleanup := setup(t)
defer cleanup()

utils.AssertMatches(t, conn, `select @@transaction_mode`, `[[VARCHAR("SINGLE")]]`)
// Need to run this test multiple times as shards are picked randomly for Impossible query.
// After the fix it is not random if a shard session already exists then it reuses that same shard session.
for i := 0; i < 100; i++ {
utils.Exec(t, conn, `begin`)
utils.Exec(t, conn, `INSERT INTO t1(id, txn_id) VALUES (1, "t1")`)
utils.Exec(t, conn, `SELECT * FROM t2 WHERE id = 1`)
utils.Exec(t, conn, `rollback`)
}
}

func setup(t *testing.T) (*mysql.Conn, func()) {
t.Helper()
conn, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(t, err)

tables := []string{
"txn_unique_constraints", "uniqueConstraint_vdx",
"t1", "t1_id_vdx", "t2", "t2_id_vdx",
}
cleanup := func() {
utils.Exec(t, conn, "set transaction_mode=multi")
for _, table := range tables {
utils.Exec(t, conn, fmt.Sprintf("delete from %s /* cleanup */", table))
}
utils.Exec(t, conn, "set transaction_mode=single")
}
cleanup()
return conn, cleanup
}
26 changes: 25 additions & 1 deletion go/test/endtoend/vtgate/transaction/single/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,28 @@ CREATE TABLE uniqueConstraint_vdx(
`unique_constraint` VARCHAR(50) NOT NULL,
`keyspace_id` VARBINARY(50) NOT NULL,
PRIMARY KEY(unique_constraint)
) ENGINE=InnoDB;
) ENGINE=InnoDB;

CREATE TABLE `t1` (
`id` bigint(20) NOT NULL,
`txn_id` varchar(50) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

CREATE TABLE `t1_id_vdx` (
`id` bigint(20) NOT NULL,
`keyspace_id` varbinary(50) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

CREATE TABLE `t2` (
`id` bigint(20) NOT NULL,
`txn_id` varchar(50) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

CREATE TABLE `t2_id_vdx` (
`id` bigint(20) NOT NULL,
`keyspace_id` varbinary(50) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
63 changes: 63 additions & 0 deletions go/test/endtoend/vtgate/transaction/single/vschema.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,29 @@
"autocommit": "true"
},
"owner": "txn_unique_constraints"
},
"hash_vdx": {
"type": "hash"
},
"t1_id_vdx": {
"type": "consistent_lookup_unique",
"params": {
"autocommit": "true",
"from": "id",
"table": "t1_id_vdx",
"to": "keyspace_id"
},
"owner": "t1"
},
"t2_id_vdx": {
"type": "consistent_lookup_unique",
"params": {
"autocommit": "true",
"from": "id",
"table": "t2_id_vdx",
"to": "keyspace_id"
},
"owner": "t2"
}
},
"tables": {
Expand All @@ -35,6 +58,46 @@
"name": "unicode_loose_md5_vdx"
}
]
},
"t1": {
"columnVindexes": [
{
"column": "txn_id",
"name": "unicode_loose_md5_vdx"
},
{
"column": "id",
"name": "t1_id_vdx"
}
]
},
"t2": {
"columnVindexes": [
{
"column": "txn_id",
"name": "unicode_loose_md5_vdx"
},
{
"column": "id",
"name": "t2_id_vdx"
}
]
},
"t1_id_vdx": {
"columnVindexes": [
{
"column": "id",
"name": "hash_vdx"
}
]
},
"t2_id_vdx": {
"columnVindexes": [
{
"column": "id",
"name": "hash_vdx"
}
]
}
}
}
29 changes: 22 additions & 7 deletions go/vt/vtgate/engine/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,15 +389,30 @@ func (route *Route) mergeSort(

// GetFields fetches the field info.
func (route *Route) GetFields(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
rss, _, err := vcursor.ResolveDestinations(ctx, route.Keyspace.Name, nil, []key.Destination{key.DestinationAnyShard{}})
if err != nil {
return nil, err
var rs *srvtopo.ResolvedShard

// Use an existing shard session
sss := vcursor.Session().ShardSession()
for _, ss := range sss {
if ss.Target.Keyspace == route.Keyspace.Name {
rs = ss
break
}
}
if len(rss) != 1 {
// This code is unreachable. It's just a sanity check.
return nil, fmt.Errorf("no shards for keyspace: %s", route.Keyspace.Name)

// If not find, then pick any shard.
if rs == nil {
rss, _, err := vcursor.ResolveDestinations(ctx, route.Keyspace.Name, nil, []key.Destination{key.DestinationAnyShard{}})
if err != nil {
return nil, err
}
if len(rss) != 1 {
// This code is unreachable. It's just a sanity check.
return nil, fmt.Errorf("no shards for keyspace: %s", route.Keyspace.Name)
}
rs = rss[0]
}
qr, err := execShard(ctx, route, vcursor, route.FieldQuery, bindVars, rss[0], false /* rollbackOnError */, false /* canAutocommit */)
qr, err := execShard(ctx, route, vcursor, route.FieldQuery, bindVars, rs, false /* rollbackOnError */, false /* canAutocommit */)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit ea6675d

Please sign in to comment.