Skip to content

Commit

Permalink
*: Support idle transaction timeout (#48714)
Browse files Browse the repository at this point in the history
close #48712
  • Loading branch information
crazycs520 authored Dec 5, 2023
1 parent 0fd779c commit 6492800
Show file tree
Hide file tree
Showing 6 changed files with 268 additions and 1 deletion.
24 changes: 24 additions & 0 deletions pkg/executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -902,6 +902,30 @@ func TestSetVar(t *testing.T) {
tk.MustQuery("select @@global.tidb_schema_version_cache_limit").Check(testkit.Rows("2"))
tk.MustExec("set @@global.tidb_schema_version_cache_limit=64;")
tk.MustQuery("select @@global.tidb_schema_version_cache_limit").Check(testkit.Rows("64"))

// test tidb_idle_transaction_timeout
tk.MustQuery("select @@session.tidb_idle_transaction_timeout").Check(testkit.Rows("0"))
tk.MustExec("SET SESSION tidb_idle_transaction_timeout = 2")
tk.MustQuery("select @@session.tidb_idle_transaction_timeout").Check(testkit.Rows("2"))
tk.MustGetErrMsg("SET SESSION tidb_idle_transaction_timeout='x';", "[variable:1232]Incorrect argument type to variable 'tidb_idle_transaction_timeout'")
tk.MustExec("SET SESSION tidb_idle_transaction_timeout=31536001;")
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1292 Truncated incorrect tidb_idle_transaction_timeout value: '31536001'"))
tk.MustQuery("select @@session.tidb_idle_transaction_timeout").Check(testkit.Rows("31536000"))
tk.MustExec("SET SESSION tidb_idle_transaction_timeout = 0")
tk.MustQuery("select @@session.tidb_idle_transaction_timeout").Check(testkit.Rows("0"))
tk.MustExec("SET SESSION tidb_idle_transaction_timeout=31536000;")
tk.MustQuery("select @@session.tidb_idle_transaction_timeout").Check(testkit.Rows("31536000"))
tk.MustQuery("select @@global.tidb_idle_transaction_timeout").Check(testkit.Rows("0"))
tk.MustExec("SET GLOBAL tidb_idle_transaction_timeout = 1")
tk.MustQuery("select @@global.tidb_idle_transaction_timeout").Check(testkit.Rows("1"))
tk.MustGetErrMsg("SET GLOBAL tidb_idle_transaction_timeout='x';", "[variable:1232]Incorrect argument type to variable 'tidb_idle_transaction_timeout'")
tk.MustExec("SET GLOBAL tidb_idle_transaction_timeout=31536001;")
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1292 Truncated incorrect tidb_idle_transaction_timeout value: '31536001'"))
tk.MustQuery("select @@global.tidb_idle_transaction_timeout").Check(testkit.Rows("31536000"))
tk.MustExec("SET GLOBAL tidb_idle_transaction_timeout = 0")
tk.MustQuery("select @@global.tidb_idle_transaction_timeout").Check(testkit.Rows("0"))
tk.MustExec("SET GLOBAL tidb_idle_transaction_timeout=31536000;")
tk.MustQuery("select @@global.tidb_idle_transaction_timeout").Check(testkit.Rows("31536000"))
}

func TestSetCollationAndCharset(t *testing.T) {
Expand Down
10 changes: 9 additions & 1 deletion pkg/server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,14 @@ func (cc *clientConn) writePacket(data []byte) error {
return cc.pkt.WritePacket(data)
}

func (cc *clientConn) getWaitTimeout(ctx context.Context) uint64 {
sessVars := cc.ctx.GetSessionVars()
if sessVars.InTxn() && sessVars.IdleTransactionTimeout > 0 {
return uint64(sessVars.IdleTransactionTimeout)
}
return cc.getSessionVarsWaitTimeout(ctx)
}

// getSessionVarsWaitTimeout get session variable wait_timeout
func (cc *clientConn) getSessionVarsWaitTimeout(ctx context.Context) uint64 {
valStr, exists := cc.ctx.GetSessionVars().GetSystemVar(variable.WaitTimeout)
Expand Down Expand Up @@ -1033,7 +1041,7 @@ func (cc *clientConn) Run(ctx context.Context) {
cc.alloc.Reset()
// close connection when idle time is more than wait_timeout
// default 28800(8h), FIXME: should not block at here when we kill the connection.
waitTimeout := cc.getSessionVarsWaitTimeout(ctx)
waitTimeout := cc.getWaitTimeout(ctx)
cc.pkt.SetReadTimeout(time.Duration(waitTimeout) * time.Second)
start := time.Now()
data, err := cc.readPacket()
Expand Down
222 changes: 222 additions & 0 deletions pkg/server/tests/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2907,6 +2907,228 @@ func TestChunkReuseCorruptSysVarString(t *testing.T) {
require.Equal(t, "Asia/Shanghai", rows[0])
}

func TestTiDBIdleTransactionTimeout(t *testing.T) {
ts := createTidbTestTopSQLSuite(t)
cases := []func(dbt *testkit.DBTestKit){}
// Test simple txn.
cases = append(cases, func(dbt *testkit.DBTestKit) {
dbt.MustExec("use test;")
dbt.MustExec("create table t1 (id int key);")
dbt.MustExec("set @@tidb_idle_transaction_timeout = 1")
tx, err := dbt.GetDB().Begin()
require.NoError(t, err)
rows, err := tx.Query("select * from t1;")
require.NoError(t, err)
ts.CheckRows(t, rows, "")
time.Sleep(1500 * time.Millisecond)
_, err = tx.Query("select * from t1;")
require.Error(t, err)
require.Equal(t, "invalid connection", err.Error())
})
// Test raw conn.
cases = append(cases, func(dbt *testkit.DBTestKit) {
ctx := context.Background()
dbt.MustExec("use test;")
dbt.MustExec("create table t2 (id int key);")
dbt.MustExec("set @@tidb_idle_transaction_timeout = 1")
conn, err := dbt.GetDB().Conn(ctx)
require.NoError(t, err)
_, err = conn.ExecContext(ctx, "begin")
require.NoError(t, err)
rows, err := conn.QueryContext(ctx, "select * from t2;")
require.NoError(t, err)
ts.CheckRows(t, rows, "")
time.Sleep(1500 * time.Millisecond)
_, err = conn.QueryContext(ctx, "select * from t2;")
require.Error(t, err)
require.Equal(t, "invalid connection", err.Error())
})
// Test txn write.
cases = append(cases, func(dbt *testkit.DBTestKit) {
dbt.MustExec("use test;")
dbt.MustExec("create table t3 (id int key);")
dbt.MustExec("set @@tidb_idle_transaction_timeout = 1")
tx, err := dbt.GetDB().Begin()
require.NoError(t, err)
_, err = tx.Exec("insert into t3 values (1)")
require.NoError(t, err)
time.Sleep(1500 * time.Millisecond)
_, err = tx.Exec("commit")
require.Error(t, err)
require.Equal(t, "invalid connection", err.Error())
rows := dbt.MustQuery("select * from t3;")
ts.CheckRows(t, rows, "")
})
// Test autocommit=0.
cases = append(cases, func(dbt *testkit.DBTestKit) {
dbt.MustExec("use test;")
dbt.MustExec("create table t4 (id int key);")
dbt.MustExec("set @@tidb_idle_transaction_timeout = 1")
ctx := context.Background()
conn, err := dbt.GetDB().Conn(ctx)
require.NoError(t, err)
_, err = conn.ExecContext(ctx, "set @@autocommit=0")
require.NoError(t, err)
_, err = conn.ExecContext(ctx, "insert into t4 values (1)")
require.NoError(t, err)
time.Sleep(1500 * time.Millisecond)
_, err = conn.ExecContext(ctx, "commit")
require.Error(t, err)
require.Equal(t, "invalid connection", err.Error())
rows := dbt.MustQuery("select * from t4;")
ts.CheckRows(t, rows, "")
})
// Test autocommit=1.
cases = append(cases, func(dbt *testkit.DBTestKit) {
dbt.MustExec("use test;")
dbt.MustExec("create table t5 (id int key);")
dbt.MustExec("set @@tidb_idle_transaction_timeout = 1")
ctx := context.Background()
conn, err := dbt.GetDB().Conn(ctx)
require.NoError(t, err)
_, err = conn.ExecContext(ctx, "set @@autocommit=1")
require.NoError(t, err)
_, err = conn.ExecContext(ctx, "insert into t5 values (1)")
require.NoError(t, err)
time.Sleep(1500 * time.Millisecond)
_, err = conn.ExecContext(ctx, "insert into t5 values (2)")
require.NoError(t, err)
rows := dbt.MustQuery("select * from t5;")
ts.CheckRows(t, rows, "1\n2")
})
// Test sleep stmt in txn.
cases = append(cases, func(dbt *testkit.DBTestKit) {
dbt.MustExec("use test;")
dbt.MustExec("create table t6 (id int key);")
dbt.MustExec("set @@tidb_idle_transaction_timeout = 1")
tx, err := dbt.GetDB().Begin()
require.NoError(t, err)
_, err = tx.Exec("insert into t6 values (1)")
require.NoError(t, err)
rows, err := tx.Query("select sleep(1.5)")
require.NoError(t, err)
ts.CheckRows(t, rows, "0")
_, err = tx.Exec("commit")
require.NoError(t, err)
rows = dbt.MustQuery("select * from t6;")
ts.CheckRows(t, rows, "1")
})
// Test sleep stmt in raw conn.
cases = append(cases, func(dbt *testkit.DBTestKit) {
dbt.MustExec("use test;")
dbt.MustExec("create table t7 (id int key);")
dbt.MustExec("set @@tidb_idle_transaction_timeout = 1")
ctx := context.Background()
conn, err := dbt.GetDB().Conn(ctx)
require.NoError(t, err)
_, err = conn.ExecContext(ctx, "begin")
require.NoError(t, err)
_, err = conn.ExecContext(ctx, "insert into t7 values (1)")
require.NoError(t, err)
rows, err := conn.QueryContext(ctx, "select sleep(1.5)")
require.NoError(t, err)
ts.CheckRows(t, rows, "0")
_, err = conn.ExecContext(ctx, "commit")
require.NoError(t, err)
rows = dbt.MustQuery("select * from t7;")
ts.CheckRows(t, rows, "1")
})
// Test many sleep stmts in txn.
cases = append(cases, func(dbt *testkit.DBTestKit) {
dbt.MustExec("use test;")
dbt.MustExec("create table t8 (id int key);")
dbt.MustExec("set @@tidb_idle_transaction_timeout = 1")
tx, err := dbt.GetDB().Begin()
require.NoError(t, err)
_, err = tx.Exec("insert into t8 values (1)")
require.NoError(t, err)
rows, err := tx.Query("select sleep(0.5)")
require.NoError(t, err)
ts.CheckRows(t, rows, "0")
_, err = tx.Exec("insert into t8 values (2)")
require.NoError(t, err)
rows, err = tx.Query("select sleep(0.5)")
require.NoError(t, err)
ts.CheckRows(t, rows, "0")
_, err = tx.Exec("insert into t8 values (3)")
require.NoError(t, err)
rows, err = tx.Query("select sleep(0.5)")
require.NoError(t, err)
ts.CheckRows(t, rows, "0")
_, err = tx.Exec("commit")
require.NoError(t, err)
rows = dbt.MustQuery("select * from t8;")
ts.CheckRows(t, rows, "1\n2\n3")
})

var wg sync.WaitGroup
for _, ca := range cases {
wg.Add(1)
go func(fn func(dbt *testkit.DBTestKit)) {
defer wg.Done()
ts.RunTests(t, nil, fn)
}(ca)
}
// Test release lock.
wg.Add(2)
go func() {
defer wg.Done()
db1, err := sql.Open("mysql", ts.GetDSN(nil))
require.NoError(t, err)
db2, err := sql.Open("mysql", ts.GetDSN(nil))
require.NoError(t, err)
defer func() {
err := db1.Close()
require.NoError(t, err)
err = db2.Close()
require.NoError(t, err)
}()
ctx := context.Background()
conn1, err := db1.Conn(ctx)
require.NoError(t, err)
_, err = conn1.ExecContext(ctx, "create table t (id int key);")
require.NoError(t, err)
_, err = conn1.ExecContext(ctx, "set @@tidb_idle_transaction_timeout = 1")
require.NoError(t, err)
_, err = conn1.ExecContext(ctx, "insert into t values (1)")
require.NoError(t, err)
_, err = conn1.ExecContext(ctx, "begin")
require.NoError(t, err)
_, err = conn1.ExecContext(ctx, "select * from t for update")
require.NoError(t, err)
_, err = conn1.ExecContext(ctx, "insert into t values (2)")
require.NoError(t, err)
go func() {
defer wg.Done()
conn2, err := db2.Conn(ctx)
require.NoError(t, err)
_, err = conn2.ExecContext(ctx, "set @@tidb_idle_transaction_timeout = 1")
require.NoError(t, err)
_, err = conn2.ExecContext(ctx, "begin")
require.NoError(t, err)
_, err = conn2.ExecContext(ctx, "select * from t for update")
require.NoError(t, err)
_, err = conn2.ExecContext(ctx, "insert into t values (3)")
require.NoError(t, err)
_, err = conn2.ExecContext(ctx, "commit")
require.NoError(t, err)
rows, err := db2.QueryContext(ctx, "select * from t")
require.NoError(t, err)
ts.CheckRows(t, rows, "1\n3")
}()
time.Sleep(1500 * time.Millisecond)
_, err = conn1.ExecContext(ctx, "commit")
require.Error(t, err)
require.Equal(t, "invalid connection", err.Error())
rows, err := db1.QueryContext(ctx, "select * from t where id=2")
require.NoError(t, err)
ts.CheckRows(t, rows, "")
}()

// wait all test case finished.
wg.Wait()
}

type mockProxyProtocolProxy struct {
frontend string
backend string
Expand Down
3 changes: 3 additions & 0 deletions pkg/sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2779,6 +2779,9 @@ type Concurrency struct {

// SourceAddr is the source address of request. Available in coprocessor ONLY.
SourceAddr net.TCPAddr

// IdleTransactionTimeout indicates the maximum time duration a transaction could be idle, unit is second.
IdleTransactionTimeout int
}

// SetIndexLookupConcurrency set the number of concurrent index lookup worker.
Expand Down
5 changes: 5 additions & 0 deletions pkg/sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -2954,6 +2954,11 @@ var defaultSysVars = []*SysVar{
SchemaVersionCacheLimit.Store(TidbOptInt64(val, DefTiDBSchemaVersionCacheLimit))
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBIdleTransactionTimeout, Value: strconv.Itoa(DefTiDBIdleTransactionTimeout), Type: TypeUnsigned, MinValue: 0, MaxValue: secondsPerYear,
SetSession: func(s *SessionVars, val string) error {
s.IdleTransactionTimeout = tidbOptPositiveInt32(val, DefTiDBIdleTransactionTimeout)
return nil
}},
}

func setTiFlashComputeDispatchPolicy(s *SessionVars, val string) error {
Expand Down
5 changes: 5 additions & 0 deletions pkg/sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -1117,6 +1117,10 @@ const (
// TiDBEnableTiFlashPipelineMode means if we should use pipeline model to execute query or not in tiflash.
// It's deprecated and setting it will not have any effect.
TiDBEnableTiFlashPipelineMode = "tidb_enable_tiflash_pipeline_model"
// TiDBIdleTransactionTimeout indicates the maximum time duration a transaction could be idle, unit is second.
// Any idle transaction will be killed after being idle for `tidb_idle_transaction_timeout` seconds.
// This is similar to https://docs.percona.com/percona-server/5.7/management/innodb_kill_idle_trx.html and https://mariadb.com/kb/en/transaction-timeouts/
TiDBIdleTransactionTimeout = "tidb_idle_transaction_timeout"
)

// TiDB intentional limits
Expand Down Expand Up @@ -1432,6 +1436,7 @@ const (
DefTiDBOptEnableHashJoin = true
DefTiDBOptObjective = OptObjectiveModerate
DefTiDBSchemaVersionCacheLimit = 16
DefTiDBIdleTransactionTimeout = 0
)

// Process global variables.
Expand Down

0 comments on commit 6492800

Please sign in to comment.