Skip to content

Commit

Permalink
Merge pull request #79 from github/r15.0/add-shutdown-state-in-vtgate
Browse files Browse the repository at this point in the history
Gracefully shutdown VTGate instances
  • Loading branch information
arthurschreiber authored Oct 10, 2023
2 parents 6e3ab03 + ebd0acf commit 1b7e877
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 4 deletions.
29 changes: 29 additions & 0 deletions go/mysql/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,12 @@ type Conn struct {
// enableQueryInfo controls whether we parse the INFO field in QUERY_OK packets
// See: ConnParams.EnableQueryInfo
enableQueryInfo bool

// mu protects the fields below
mu sync.Mutex
// this is used to mark the connection to be closed so that the command phase for the connection can be stopped and
// the connection gets closed.
closing bool
}

// splitStatementFunciton is the function that is used to split the statement in case of a multi-statement query.
Expand Down Expand Up @@ -895,6 +901,11 @@ func (c *Conn) handleNextCommand(handler Handler) bool {
return false
}

// before continue to process the packet, check if the connection should be closed or not.
if c.IsMarkedForClose() {
return false
}

switch data[0] {
case ComQuit:
c.recycleReadPacket()
Expand Down Expand Up @@ -1581,3 +1592,21 @@ func (c *Conn) IsUnixSocket() bool {
func (c *Conn) GetRawConn() net.Conn {
return c.conn
}

// MarkForClose marks the connection for close.
func (c *Conn) MarkForClose() {
c.mu.Lock()
defer c.mu.Unlock()
c.closing = true
}

// IsMarkedForClose return true if the connection should be closed.
func (c *Conn) IsMarkedForClose() bool {
c.mu.Lock()
defer c.mu.Unlock()
return c.closing
}

func (c *Conn) IsShuttingDown() bool {
return c.listener.isShutdown()
}
3 changes: 2 additions & 1 deletion go/mysql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,8 @@ func (l *Listener) handle(conn net.Conn, connectionID uint32, acceptTime time.Ti

for {
kontinue := c.handleNextCommand(l.handler)
if !kontinue {
// before going for next command check if the connection should be closed or not.
if !kontinue || c.IsMarkedForClose() {
return
}
}
Expand Down
11 changes: 8 additions & 3 deletions go/vt/vtgate/plugin_mysql_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,12 @@ func startSpan(ctx context.Context, query, label string) (trace.Span, context.Co
}

func (vh *vtgateHandler) ComQuery(c *mysql.Conn, query string, callback func(*sqltypes.Result) error) error {
session := vh.session(c)
if c.IsShuttingDown() && !session.InTransaction {
c.MarkForClose()
return mysql.NewSQLError(mysql.ERServerShutdown, mysql.SSNetError, "Server shutdown in progress")
}

ctx := context.Background()
var cancel context.CancelFunc
if *mysqlQueryTimeout != 0 {
Expand Down Expand Up @@ -209,7 +215,6 @@ func (vh *vtgateHandler) ComQuery(c *mysql.Conn, query string, callback func(*sq
"VTGate MySQL Connector" /* subcomponent: part of the client */)
ctx = callerid.NewContext(ctx, ef, im)

session := vh.session(c)
if !session.InTransaction {
atomic.AddInt32(&busyConnections, 1)
}
Expand Down Expand Up @@ -536,11 +541,11 @@ func newMysqlUnixSocket(address string, authServer mysql.AuthServer, handler mys

func shutdownMysqlProtocolAndDrain() {
if mysqlListener != nil {
mysqlListener.Close()
mysqlListener.Shutdown()
mysqlListener = nil
}
if mysqlUnixListener != nil {
mysqlUnixListener.Close()
mysqlUnixListener.Shutdown()
mysqlUnixListener = nil
}
if sigChan != nil {
Expand Down

0 comments on commit 1b7e877

Please sign in to comment.