diff --git a/changelog/18.0/18.0.0/summary.md b/changelog/18.0/18.0.0/summary.md
index 61cf6be1cf8..cc1384db96c 100644
--- a/changelog/18.0/18.0.0/summary.md
+++ b/changelog/18.0/18.0.0/summary.md
@@ -9,6 +9,7 @@
- [VTOrc flag `--allow-emergency-reparent`](#new-flag-toggle-ers)
- [VTOrc flag `--change-tablets-with-errant-gtid-to-drained`](#new-flag-errant-gtid-convert)
- [ERS sub flag `--wait-for-all-tablets`](#new-ers-subflag)
+ - [VTGate flag `--grpc-send-session-in-streaming`](#new-vtgate-streaming-sesion)
- **[VTAdmin](#vtadmin)**
- [Updated to node v18.16.0](#update-node)
- **[Deprecations and Deletions](#deprecations-and-deletions)**
@@ -63,6 +64,14 @@ for a response from all the tablets. Originally `EmergencyReparentShard` was mea
We have realized now that there are cases when the replication is broken but all the tablets are reachable. In these cases, it is advisable to
call `EmergencyReparentShard` with `--wait-for-all-tablets` so that it doesn't ignore one of the tablets.
+#### VTGate GRPC stream execute session flag `--grpc-send-session-in-streaming`
+
+This flag enables transaction support on `StreamExecute` api.
+One enabled, VTGate `StreamExecute` grpc api will send session as the last packet in the response.
+The client should enable it only when they have made the required changes to expect such a packet.
+
+It is disabled by default.
+
### VTAdmin
#### vtadmin-web updated to node v18.16.0 (LTS)
diff --git a/go/flags/endtoend/vtgate.txt b/go/flags/endtoend/vtgate.txt
index 89f6544ca8f..a496b2765d2 100644
--- a/go/flags/endtoend/vtgate.txt
+++ b/go/flags/endtoend/vtgate.txt
@@ -63,6 +63,7 @@ Flags:
--foreign_key_mode string This is to provide how to handle foreign key constraint in create/alter table. Valid values are: allow, disallow (default "allow")
--gate_query_cache_memory int gate server query cache size in bytes, maximum amount of memory to be cached. vtgate analyzes every incoming query and generate a query plan, these plans are being cached in a lru cache. This config controls the capacity of the lru cache. (default 33554432)
--gateway_initial_tablet_timeout duration At startup, the tabletGateway will wait up to this duration to get at least one tablet per keyspace/shard/tablet type (default 30s)
+ --grpc-send-session-in-streaming If set, will send the session as last packet in streaming api to support transactions in streaming
--grpc-use-effective-groups If set, and SSL is not used, will set the immediate caller's security groups from the effective caller id's groups.
--grpc-use-static-authentication-callerid If set, will set the immediate caller id to the username authenticated by the static auth plugin.
--grpc_auth_mode string Which auth plugin implementation to use (eg: static)
diff --git a/go/test/endtoend/vtgate/grpc_api/main_test.go b/go/test/endtoend/vtgate/grpc_api/main_test.go
index a51c6d9e6f2..3c8605f79a0 100644
--- a/go/test/endtoend/vtgate/grpc_api/main_test.go
+++ b/go/test/endtoend/vtgate/grpc_api/main_test.go
@@ -111,6 +111,7 @@ func TestMain(m *testing.M) {
"--grpc_auth_static_password_file", grpcServerAuthStaticPath,
"--grpc_use_effective_callerid",
"--grpc-use-static-authentication-callerid",
+ "--grpc-send-session-in-streaming",
}
// Configure vttablet to use table ACL
diff --git a/go/vt/vtgate/grpcvtgateservice/server.go b/go/vt/vtgate/grpcvtgateservice/server.go
index 7baff6cefe8..bf00db4ea1c 100644
--- a/go/vt/vtgate/grpcvtgateservice/server.go
+++ b/go/vt/vtgate/grpcvtgateservice/server.go
@@ -48,12 +48,15 @@ var (
useEffective bool
useEffectiveGroups bool
useStaticAuthenticationIdentity bool
+
+ sendSessionInStreaming bool
)
func registerFlags(fs *pflag.FlagSet) {
fs.BoolVar(&useEffective, "grpc_use_effective_callerid", false, "If set, and SSL is not used, will set the immediate caller id from the effective caller id's principal.")
fs.BoolVar(&useEffectiveGroups, "grpc-use-effective-groups", false, "If set, and SSL is not used, will set the immediate caller's security groups from the effective caller id's groups.")
fs.BoolVar(&useStaticAuthenticationIdentity, "grpc-use-static-authentication-callerid", false, "If set, will set the immediate caller id to the username authenticated by the static auth plugin.")
+ fs.BoolVar(&sendSessionInStreaming, "grpc-send-session-in-streaming", false, "If set, will send the session as last packet in streaming api to support transactions in streaming")
}
func init() {
@@ -192,19 +195,22 @@ func (vtg *VTGate) StreamExecute(request *vtgatepb.StreamExecuteRequest, stream
})
})
- // even if there is an error, session could have been modified.
- // So, this needs to be sent back to the client. Session is sent in the last stream response.
- lastErr := stream.Send(&vtgatepb.StreamExecuteResponse{
- Session: session,
- })
-
var errs []error
if vtgErr != nil {
errs = append(errs, vtgErr)
}
- if lastErr != nil {
- errs = append(errs, lastErr)
+
+ if sendSessionInStreaming {
+ // even if there is an error, session could have been modified.
+ // So, this needs to be sent back to the client. Session is sent in the last stream response.
+ lastErr := stream.Send(&vtgatepb.StreamExecuteResponse{
+ Session: session,
+ })
+ if lastErr != nil {
+ errs = append(errs, lastErr)
+ }
}
+
return vterrors.ToGRPC(vterrors.Aggregate(errs))
}
diff --git a/go/vt/vttablet/tabletserver/exclude_race_test.go b/go/vt/vttablet/tabletserver/exclude_race_test.go
new file mode 100644
index 00000000000..6e55671ac96
--- /dev/null
+++ b/go/vt/vttablet/tabletserver/exclude_race_test.go
@@ -0,0 +1,62 @@
+//go:build !race
+
+package tabletserver
+
+import (
+ "context"
+ "strings"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ "vitess.io/vitess/go/sqltypes"
+ querypb "vitess.io/vitess/go/vt/proto/query"
+ "vitess.io/vitess/go/vt/sqlparser"
+ "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
+)
+
+// TestHandlePanicAndSendLogStatsMessageTruncation tests that when an error truncation
+// length is set and a panic occurs, the code in handlePanicAndSendLogStats will
+// truncate the error text in logs, but will not truncate the error text in the
+// error value.
+func TestHandlePanicAndSendLogStatsMessageTruncation(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ tl := newTestLogger()
+ defer tl.Close()
+ logStats := tabletenv.NewLogStats(ctx, "TestHandlePanicAndSendLogStatsMessageTruncation")
+ db, tsv := setupTabletServerTest(t, ctx, "")
+ defer tsv.StopService()
+ defer db.Close()
+
+ longSql := "select * from test_table_loooooooooooooooooooooooooooooooooooong"
+ longBv := map[string]*querypb.BindVariable{
+ "bv1": sqltypes.Int64BindVariable(1111111111),
+ "bv2": sqltypes.Int64BindVariable(2222222222),
+ "bv3": sqltypes.Int64BindVariable(3333333333),
+ "bv4": sqltypes.Int64BindVariable(4444444444),
+ }
+ origTruncateErrLen := sqlparser.GetTruncateErrLen()
+ sqlparser.SetTruncateErrLen(32)
+ defer sqlparser.SetTruncateErrLen(origTruncateErrLen)
+
+ defer func() {
+ err := logStats.Error
+ want := "Uncaught panic for Sql: \"select * from test_table_loooooooooooooooooooooooooooooooooooong\", BindVars: {bv1: \"type:INT64 value:\\\"1111111111\\\"\"bv2: \"type:INT64 value:\\\"2222222222\\\"\"bv3: \"type:INT64 value:\\\"3333333333\\\"\"bv4: \"type:INT64 value:\\\"4444444444\\\"\"}"
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), want)
+ want = "Uncaught panic for Sql: \"select * from test_t [TRUNCATED]\", BindVars: {bv1: \"typ [TRUNCATED]"
+ gotWhatWeWant := false
+ for _, log := range tl.getLogs() {
+ if strings.HasPrefix(log, want) {
+ gotWhatWeWant = true
+ break
+ }
+ }
+ assert.True(t, gotWhatWeWant)
+ }()
+
+ defer tsv.handlePanicAndSendLogStats(longSql, longBv, logStats)
+ panic("panic from TestHandlePanicAndSendLogStatsMessageTruncation")
+}
diff --git a/go/vt/vttablet/tabletserver/tabletserver_test.go b/go/vt/vttablet/tabletserver/tabletserver_test.go
index 2f4988cc2b6..d2fb10e5a77 100644
--- a/go/vt/vttablet/tabletserver/tabletserver_test.go
+++ b/go/vt/vttablet/tabletserver/tabletserver_test.go
@@ -1492,51 +1492,6 @@ func TestHandleExecUnknownError(t *testing.T) {
panic("unknown exec error")
}
-// TestHandlePanicAndSendLogStatsMessageTruncation tests that when an error truncation
-// length is set and a panic occurs, the code in handlePanicAndSendLogStats will
-// truncate the error text in logs, but will not truncate the error text in the
-// error value.
-func TestHandlePanicAndSendLogStatsMessageTruncation(t *testing.T) {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- tl := newTestLogger()
- defer tl.Close()
- logStats := tabletenv.NewLogStats(ctx, "TestHandlePanicAndSendLogStatsMessageTruncation")
- db, tsv := setupTabletServerTest(t, ctx, "")
- defer tsv.StopService()
- defer db.Close()
-
- longSql := "select * from test_table_loooooooooooooooooooooooooooooooooooong"
- longBv := map[string]*querypb.BindVariable{
- "bv1": sqltypes.Int64BindVariable(1111111111),
- "bv2": sqltypes.Int64BindVariable(2222222222),
- "bv3": sqltypes.Int64BindVariable(3333333333),
- "bv4": sqltypes.Int64BindVariable(4444444444),
- }
- origTruncateErrLen := sqlparser.GetTruncateErrLen()
- sqlparser.SetTruncateErrLen(32)
- defer sqlparser.SetTruncateErrLen(origTruncateErrLen)
-
- defer func() {
- err := logStats.Error
- want := "Uncaught panic for Sql: \"select * from test_table_loooooooooooooooooooooooooooooooooooong\", BindVars: {bv1: \"type:INT64 value:\\\"1111111111\\\"\"bv2: \"type:INT64 value:\\\"2222222222\\\"\"bv3: \"type:INT64 value:\\\"3333333333\\\"\"bv4: \"type:INT64 value:\\\"4444444444\\\"\"}"
- require.Error(t, err)
- assert.Contains(t, err.Error(), want)
- want = "Uncaught panic for Sql: \"select * from test_t [TRUNCATED]\", BindVars: {bv1: \"typ [TRUNCATED]"
- gotWhatWeWant := false
- for _, log := range tl.getLogs() {
- if strings.HasPrefix(log, want) {
- gotWhatWeWant = true
- break
- }
- }
- assert.True(t, gotWhatWeWant)
- }()
-
- defer tsv.handlePanicAndSendLogStats(longSql, longBv, logStats)
- panic("panic from TestHandlePanicAndSendLogStatsMessageTruncation")
-}
-
func TestQueryAsString(t *testing.T) {
longSql := "select * from test_table_loooooooooooooooooooooooooooooooooooong"
longBv := map[string]*querypb.BindVariable{