Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VTGate StreamExecute rpc to return session as response #13131

Merged
merged 8 commits into from
May 30, 2023
13 changes: 13 additions & 0 deletions changelog/17.0/17.0.0/summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
- [Deprecated Flags](#vttablet-deprecated-flags)
- **[VReplication](#VReplication)**
- [Support for the `noblob` binlog row image mode](#noblob)
- **[VTGate](#vtgate)
- [StreamExecute GRPC API](#stream-execute)

## <a id="major-changes"/> Major Changes

Expand Down Expand Up @@ -381,3 +383,14 @@ would not work. Given the criticality of VReplication workflows within Vitess, t

We have addressed this issue in [PR #12950](https://github.com/vitessio/vitess/pull/12950) by adding support for processing the compressed transaction events in VReplication,
without any (known) limitations.

### <a id="vtgate"/> VTGate

#### <a id="stream-execute"/> Modified StreamExecute GRPC API
Earlier VTGate grpc api for `StreamExecute` did not return the session in the response.
Even though the underlying implementation supported transactions and other features that requires session persistence.
With [PR #13131](https://github.com/vitessio/vitess/pull/13131) VTGate will return the session to the client
so that it can be persisted with the client and sent back to VTGate on the next api call.

This does not impact anyone using the mysql client library to connect to VTGate.
This could be a breaking change for grpc api users based on how they have implemented their grpc clients.
4 changes: 2 additions & 2 deletions go/cmd/vtgateclienttest/services/callerid.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ func (c *callerIDClient) ExecuteBatch(ctx context.Context, session *vtgatepb.Ses
return c.fallbackClient.ExecuteBatch(ctx, session, sqlList, bindVariablesList)
}

func (c *callerIDClient) StreamExecute(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) error {
func (c *callerIDClient) StreamExecute(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) (*vtgatepb.Session, error) {
if ok, err := c.checkCallerID(ctx, sql); ok {
return err
return session, err
}
return c.fallbackClient.StreamExecute(ctx, session, sql, bindVariables, callback)
}
4 changes: 2 additions & 2 deletions go/cmd/vtgateclienttest/services/echo.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,15 @@ func (c *echoClient) Execute(ctx context.Context, session *vtgatepb.Session, sql
return c.fallbackClient.Execute(ctx, session, sql, bindVariables)
}

func (c *echoClient) StreamExecute(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) error {
func (c *echoClient) StreamExecute(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) (*vtgatepb.Session, error) {
if strings.HasPrefix(sql, EchoPrefix) {
callback(echoQueryResult(map[string]any{
"callerId": callerid.EffectiveCallerIDFromContext(ctx),
"query": sql,
"bindVars": bindVariables,
"session": session,
}))
return nil
return session, nil
}
return c.fallbackClient.StreamExecute(ctx, session, sql, bindVariables, callback)
}
Expand Down
4 changes: 2 additions & 2 deletions go/cmd/vtgateclienttest/services/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,9 @@ func (c *errorClient) ExecuteBatch(ctx context.Context, session *vtgatepb.Sessio
return c.fallbackClient.ExecuteBatch(ctx, session, sqlList, bindVariablesList)
}

func (c *errorClient) StreamExecute(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) error {
func (c *errorClient) StreamExecute(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) (*vtgatepb.Session, error) {
if err := requestToError(sql); err != nil {
return err
return session, err
}
return c.fallbackClient.StreamExecute(ctx, session, sql, bindVariables, callback)
}
Expand Down
2 changes: 1 addition & 1 deletion go/cmd/vtgateclienttest/services/fallback.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (c fallbackClient) ExecuteBatch(ctx context.Context, session *vtgatepb.Sess
return c.fallback.ExecuteBatch(ctx, session, sqlList, bindVariablesList)
}

func (c fallbackClient) StreamExecute(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) error {
func (c fallbackClient) StreamExecute(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) (*vtgatepb.Session, error) {
return c.fallback.StreamExecute(ctx, session, sql, bindVariables, callback)
}

Expand Down
4 changes: 2 additions & 2 deletions go/cmd/vtgateclienttest/services/terminal.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ func (c *terminalClient) ExecuteBatch(ctx context.Context, session *vtgatepb.Ses
return session, nil, errTerminal
}

func (c *terminalClient) StreamExecute(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) error {
return errTerminal
func (c *terminalClient) StreamExecute(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) (*vtgatepb.Session, error) {
return session, errTerminal
}

func (c *terminalClient) Prepare(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable) (*vtgatepb.Session, []*querypb.Field, error) {
Expand Down
15 changes: 15 additions & 0 deletions go/test/endtoend/cluster/cluster_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ import (
"testing"
"time"

"google.golang.org/grpc"

"vitess.io/vitess/go/vt/grpcclient"
"vitess.io/vitess/go/vt/vtgate/grpcvtgateconn"

"github.com/buger/jsonparser"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -446,3 +451,13 @@ func WaitForHealthyShard(vtctldclient *VtctldClientProcess, keyspace, shard stri
time.Sleep(defaultRetryDelay)
}
}

// DialVTGate returns a VTGate grpc connection.
func DialVTGate(ctx context.Context, name, addr, username, password string) (*vtgateconn.VTGateConn, error) {
clientCreds := &grpcclient.StaticAuthClientCreds{Username: username, Password: password}
creds := grpc.WithPerRPCCredentials(clientCreds)
dialerFunc := grpcvtgateconn.Dial(creds)
dialerName := name
vtgateconn.RegisterDialer(dialerName, dialerFunc)
return vtgateconn.DialProtocol(ctx, dialerName, addr)
}
110 changes: 110 additions & 0 deletions go/test/endtoend/vtgate/grpc_api/acl_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
Copyright 2023 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package grpc_api

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/callerid"
)

// TestEffectiveCallerIDWithAccess verifies that an authenticated gRPC static user with an effectiveCallerID that has ACL access can execute queries
func TestEffectiveCallerIDWithAccess(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

vtgateConn, err := cluster.DialVTGate(ctx, t.Name(), vtgateGrpcAddress, "some_other_user", "test_password")
require.NoError(t, err)
defer vtgateConn.Close()

session := vtgateConn.Session(keyspaceName+"@primary", nil)
query := "SELECT id FROM test_table"
ctx = callerid.NewContext(ctx, callerid.NewEffectiveCallerID("user_with_access", "", ""), nil)
_, err = session.Execute(ctx, query, nil)
assert.NoError(t, err)
}

// TestEffectiveCallerIDWithNoAccess verifies that an authenticated gRPC static user without an effectiveCallerID that has ACL access cannot execute queries
func TestEffectiveCallerIDWithNoAccess(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

vtgateConn, err := cluster.DialVTGate(ctx, t.Name(), vtgateGrpcAddress, "another_unrelated_user", "test_password")
require.NoError(t, err)
defer vtgateConn.Close()

session := vtgateConn.Session(keyspaceName+"@primary", nil)
query := "SELECT id FROM test_table"
ctx = callerid.NewContext(ctx, callerid.NewEffectiveCallerID("user_no_access", "", ""), nil)
_, err = session.Execute(ctx, query, nil)
require.Error(t, err)
assert.Contains(t, err.Error(), "Select command denied to user")
assert.Contains(t, err.Error(), "for table 'test_table' (ACL check error)")
}

// TestAuthenticatedUserWithAccess verifies that an authenticated gRPC static user with ACL access can execute queries
func TestAuthenticatedUserWithAccess(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

vtgateConn, err := cluster.DialVTGate(ctx, t.Name(), vtgateGrpcAddress, "user_with_access", "test_password")
require.NoError(t, err)
defer vtgateConn.Close()

session := vtgateConn.Session(keyspaceName+"@primary", nil)
query := "SELECT id FROM test_table"
_, err = session.Execute(ctx, query, nil)
assert.NoError(t, err)
}

// TestAuthenticatedUserNoAccess verifies that an authenticated gRPC static user with no ACL access cannot execute queries
func TestAuthenticatedUserNoAccess(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

vtgateConn, err := cluster.DialVTGate(ctx, t.Name(), vtgateGrpcAddress, "user_no_access", "test_password")
require.NoError(t, err)
defer vtgateConn.Close()

session := vtgateConn.Session(keyspaceName+"@primary", nil)
query := "SELECT id FROM test_table"
_, err = session.Execute(ctx, query, nil)
require.Error(t, err)
assert.Contains(t, err.Error(), "Select command denied to user")
assert.Contains(t, err.Error(), "for table 'test_table' (ACL check error)")
}

// TestUnauthenticatedUser verifies that an unauthenticated gRPC user cannot execute queries
func TestUnauthenticatedUser(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

vtgateConn, err := cluster.DialVTGate(ctx, t.Name(), vtgateGrpcAddress, "", "")
require.NoError(t, err)
defer vtgateConn.Close()

session := vtgateConn.Session(keyspaceName+"@primary", nil)
query := "SELECT id FROM test_table"
_, err = session.Execute(ctx, query, nil)
require.Error(t, err)
assert.Contains(t, err.Error(), "invalid credentials")
}
114 changes: 114 additions & 0 deletions go/test/endtoend/vtgate/grpc_api/execute_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package grpc_api
harshit-gangal marked this conversation as resolved.
Show resolved Hide resolved

import (
"context"
"fmt"
"io"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/endtoend/cluster"
querypb "vitess.io/vitess/go/vt/proto/query"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
"vitess.io/vitess/go/vt/vtgate/vtgateconn"
)

func TestTransctionsWithGRPCAPI(t *testing.T) {
harshit-gangal marked this conversation as resolved.
Show resolved Hide resolved
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

vtgateConn, err := cluster.DialVTGate(ctx, t.Name(), vtgateGrpcAddress, "user_with_access", "test_password")
require.NoError(t, err)
defer vtgateConn.Close()

vtSession := vtgateConn.Session(keyspaceName, nil)
workload := []string{"OLTP", "OLAP"}
for i := 0; i < 4; i++ { // running all switch combinations.
index := i % len(workload)
_, session, err := exec(ctx, vtSession, fmt.Sprintf("set workload = %s", workload[index]), nil)
require.NoError(t, err)

require.Equal(t, workload[index], session.Options.Workload.String())
execTest(ctx, t, workload[index], vtSession)
}

}

func execTest(ctx context.Context, t *testing.T, workload string, vtSession *vtgateconn.VTGateSession) {
tcases := []struct {
query string

expRowCount int
expRowAffected int
expInTransaction bool
}{{
query: "select id, val from test_table",
}, {
query: "begin",
expInTransaction: true,
}, {
query: "insert into test_table(id, val) values (1, 'A')",
expRowAffected: 1,
expInTransaction: true,
}, {
query: "select id, val from test_table",
expRowCount: 1,
expInTransaction: true,
}, {
query: "commit",
}, {
query: "select id, val from test_table",
expRowCount: 1,
}, {
query: "delete from test_table",
expRowAffected: 1,
}}

for _, tc := range tcases {
t.Run(workload+":"+tc.query, func(t *testing.T) {
qr, session, err := exec(ctx, vtSession, tc.query, nil)
require.NoError(t, err)

assert.Len(t, qr.Rows, tc.expRowCount)
assert.EqualValues(t, tc.expRowAffected, qr.RowsAffected)
assert.EqualValues(t, tc.expInTransaction, session.InTransaction)
})
}
}

func exec(ctx context.Context, conn *vtgateconn.VTGateSession, sql string, bv map[string]*querypb.BindVariable) (*sqltypes.Result, *vtgatepb.Session, error) {
options := conn.SessionPb().GetOptions()
if options != nil && options.Workload == querypb.ExecuteOptions_OLAP {
return streamExec(ctx, conn, sql, bv)
}
res, err := conn.Execute(ctx, sql, bv)
return res, conn.SessionPb(), err
}

func streamExec(ctx context.Context, conn *vtgateconn.VTGateSession, sql string, bv map[string]*querypb.BindVariable) (*sqltypes.Result, *vtgatepb.Session, error) {
stream, err := conn.StreamExecute(ctx, sql, bv)
if err != nil {
return nil, conn.SessionPb(), err
}
result := &sqltypes.Result{}
for {
res, err := stream.Recv()
if err != nil {
if err == io.EOF {
return result, conn.SessionPb(), nil
}
return nil, conn.SessionPb(), err
}
result.Rows = append(result.Rows, res.Rows...)
result.RowsAffected += res.RowsAffected
if res.InsertID != 0 {
result.InsertID = res.InsertID
}
if res.Fields != nil {
result.Fields = res.Fields
}
}
}
Loading