Skip to content

Commit

Permalink
make an option to not reuse conns
Browse files Browse the repository at this point in the history
This required some refactoring so that the interface between the proxy and the
mysql server uses a struct that wraps the vtgateconn as well as the session,
all so that we can call Close on the conn when the mysql server thread
completes.
  • Loading branch information
demmer committed May 7, 2024
1 parent 10d4525 commit f1f71e0
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 48 deletions.
Binary file added go/cmd/vtgate/vtgate
Binary file not shown.
1 change: 1 addition & 0 deletions go/vt/vtgateproxy/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ func (b *JSONGateResolverBuilder) parse() (bool, error) {
target := targetHost{fmt.Sprintf("%s:%s", address, port), poolType.(string), affinity.(string)}
targets[target.PoolType] = append(targets[target.PoolType], target)
}
b.targets = targets

for poolType := range targets {
if b.affinityField != "" {
Expand Down
62 changes: 31 additions & 31 deletions go/vt/vtgateproxy/mysql_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,14 @@ func (ph *proxyHandler) NewConnection(c *mysql.Conn) {

func (ph *proxyHandler) ComResetConnection(c *mysql.Conn) {
ctx := context.Background()
session, err := ph.getSession(ctx, c)
conn, err := ph.getProxyConn(ctx, c)
if err != nil {
return
}
if session.SessionPb().InTransaction {
if conn.session.SessionPb().InTransaction {
defer atomic.AddInt32(&busyConnections, -1)
}
err = ph.proxy.CloseSession(ctx, session)
err = ph.proxy.Close(ctx, conn)
if err != nil {
log.Errorf("Error happened in transaction rollback: %v", err)
}
Expand All @@ -127,14 +127,14 @@ func (ph *proxyHandler) ConnectionClosed(c *mysql.Conn) {
} else {
ctx = context.Background()
}
session, err := ph.getSession(ctx, c)
conn, err := ph.getProxyConn(ctx, c)
if err != nil {
return
}
if session.SessionPb().InTransaction {
if conn.session.SessionPb().InTransaction {
defer atomic.AddInt32(&busyConnections, -1)
}
_ = ph.proxy.CloseSession(ctx, session)
_ = ph.proxy.Close(ctx, conn)
}

// Regexp to extract parent span id over the sql query
Expand Down Expand Up @@ -199,30 +199,30 @@ func (ph *proxyHandler) ComQuery(c *mysql.Conn, query string, callback func(*sql
"VTGate MySQL Connector" /* subcomponent: part of the client */)
ctx = callerid.NewContext(ctx, ef, im)

session, err := ph.getSession(ctx, c)
conn, err := ph.getProxyConn(ctx, c)
if err != nil {
return err
}
if !session.SessionPb().InTransaction {
if !conn.session.SessionPb().InTransaction {
atomic.AddInt32(&busyConnections, 1)
}
defer func() {
if session == nil || !session.SessionPb().InTransaction {
if conn == nil || !conn.session.SessionPb().InTransaction {
atomic.AddInt32(&busyConnections, -1)
}
}()

if session.SessionPb().Options.Workload == querypb.ExecuteOptions_OLAP {
err := ph.proxy.StreamExecute(ctx, session, query, make(map[string]*querypb.BindVariable), callback)
if conn.session.SessionPb().Options.Workload == querypb.ExecuteOptions_OLAP {
err := ph.proxy.StreamExecute(ctx, conn, query, make(map[string]*querypb.BindVariable), callback)
return mysql.NewSQLErrorFromError(err)
}

result, err := ph.proxy.Execute(ctx, session, query, make(map[string]*querypb.BindVariable))
result, err := ph.proxy.Execute(ctx, conn, query, make(map[string]*querypb.BindVariable))

if err := mysql.NewSQLErrorFromError(err); err != nil {
return err
}
fillInTxStatusFlags(c, session)
fillInTxStatusFlags(c, conn.session)
return callback(result)
}

Expand Down Expand Up @@ -264,20 +264,20 @@ func (ph *proxyHandler) ComPrepare(c *mysql.Conn, query string, bindVars map[str
"VTGateProxy MySQL Connector" /* subcomponent: part of the client */)
ctx = callerid.NewContext(ctx, ef, im)

session, err := ph.getSession(ctx, c)
conn, err := ph.getProxyConn(ctx, c)
if err != nil {
return nil, err
}
if !session.SessionPb().InTransaction {
if !conn.session.SessionPb().InTransaction {
atomic.AddInt32(&busyConnections, 1)
}
defer func() {
if !session.SessionPb().InTransaction {
if !conn.session.SessionPb().InTransaction {
atomic.AddInt32(&busyConnections, -1)
}
}()

session, fld, err := ph.proxy.Prepare(ctx, session, query, bindVars)
fld, err := ph.proxy.Prepare(ctx, conn, query, bindVars)
err = mysql.NewSQLErrorFromError(err)
if err != nil {
return nil, err
Expand Down Expand Up @@ -309,29 +309,29 @@ func (ph *proxyHandler) ComStmtExecute(c *mysql.Conn, prepare *mysql.PrepareData
"VTGateProxy MySQL Connector" /* subcomponent: part of the client */)
ctx = callerid.NewContext(ctx, ef, im)

session, err := ph.getSession(ctx, c)
conn, err := ph.getProxyConn(ctx, c)
if err != nil {
return err
}
if !session.SessionPb().InTransaction {
if !conn.session.SessionPb().InTransaction {
atomic.AddInt32(&busyConnections, 1)
}
defer func() {
if !session.SessionPb().InTransaction {
if !conn.session.SessionPb().InTransaction {
atomic.AddInt32(&busyConnections, -1)
}
}()

if session.SessionPb().Options.Workload == querypb.ExecuteOptions_OLAP {
err := ph.proxy.StreamExecute(ctx, session, prepare.PrepareStmt, prepare.BindVars, callback)
if conn.session.SessionPb().Options.Workload == querypb.ExecuteOptions_OLAP {
err := ph.proxy.StreamExecute(ctx, conn, prepare.PrepareStmt, prepare.BindVars, callback)
return mysql.NewSQLErrorFromError(err)
}

qr, err := ph.proxy.Execute(ctx, session, prepare.PrepareStmt, prepare.BindVars)
qr, err := ph.proxy.Execute(ctx, conn, prepare.PrepareStmt, prepare.BindVars)
if err != nil {
return mysql.NewSQLErrorFromError(err)
}
fillInTxStatusFlags(c, session)
fillInTxStatusFlags(c, conn.session)

return callback(qr)
}
Expand All @@ -350,9 +350,9 @@ func (ph *proxyHandler) ComBinlogDumpGTID(c *mysql.Conn, gtidSet mysql.GTIDSet)
return vterrors.New(vtrpcpb.Code_UNIMPLEMENTED, "ComBinlogDumpGTID")
}

func (ph *proxyHandler) getSession(ctx context.Context, c *mysql.Conn) (*vtgateconn.VTGateSession, error) {
session, _ := c.ClientData.(*vtgateconn.VTGateSession)
if session == nil {
func (ph *proxyHandler) getProxyConn(ctx context.Context, c *mysql.Conn) (*proxyConn, error) {
conn, _ := c.ClientData.(*proxyConn)
if conn == nil {
options := &querypb.ExecuteOptions{
IncludedFields: querypb.ExecuteOptions_ALL,
Workload: querypb.ExecuteOptions_Workload(mysqlDefaultWorkload),
Expand All @@ -363,18 +363,18 @@ func (ph *proxyHandler) getSession(ctx context.Context, c *mysql.Conn) (*vtgatec
}

var err error
session, err = ph.proxy.NewSession(ctx, options, c.Attributes)
conn, err = ph.proxy.Connect(ctx, options, c.Attributes)
if err != nil {
log.Errorf("error creating new session for %s: %v", c.GetRawConn().RemoteAddr().String(), err)
return nil, err
}

if session != nil {
c.ClientData = session
if conn != nil {
c.ClientData = conn
}
}

return session, nil
return conn, nil
}

var mysqlListener *mysql.Listener
Expand Down
50 changes: 33 additions & 17 deletions go/vt/vtgateproxy/vtgateproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ var (
affinityValue = flag.String("affinity_value", "", "Value to match for routing affinity , e.g. 'use-az1'")
addressField = flag.String("address_field", "address", "field name in the json file containing the address")
portField = flag.String("port_field", "port", "field name in the json file containing the port")
reuseConns = flag.Bool("reuse_conns", true, "Reuse the grpc connection across multiple requests (default true)")

timings = stats.NewTimings("Timings", "proxy timings by operation", "operation")

Expand All @@ -69,9 +70,19 @@ type VTGateProxy struct {
mu sync.RWMutex
}

type proxyConn struct {
conn *vtgateconn.VTGateConn
session *vtgateconn.VTGateSession
}

func (proxy *VTGateProxy) getConnection(ctx context.Context, target string) (*vtgateconn.VTGateConn, error) {
log.V(100).Infof("Getting connection for %v\n", target)

// If we're not reusing conns, then just return a new one each time with no need to do any locking
if !*reuseConns {
return vtgateconn.DialProtocol(ctx, "grpc", target)
}

// If the connection exists, return it
proxy.mu.RLock()
existingConn := proxy.targetConns[target]
Expand Down Expand Up @@ -109,8 +120,7 @@ func (proxy *VTGateProxy) getConnection(ctx context.Context, target string) (*vt
return conn, nil
}

func (proxy *VTGateProxy) NewSession(ctx context.Context, options *querypb.ExecuteOptions, connectionAttributes map[string]string) (*vtgateconn.VTGateSession, error) {

func (proxy *VTGateProxy) Connect(ctx context.Context, options *querypb.ExecuteOptions, connectionAttributes map[string]string) (*proxyConn, error) {
targetUrl := url.URL{
Scheme: "vtgate",
Host: "pool",
Expand Down Expand Up @@ -141,47 +151,50 @@ func (proxy *VTGateProxy) NewSession(ctx context.Context, options *querypb.Execu
return nil, err
}

return conn.Session("", options), nil
session := conn.Session("", options)

return &proxyConn{conn, session}, nil
}

// CloseSession closes the session, rolling back any implicit transactions. This has the
// same effect as if a "rollback" statement was executed, but does not affect the query
// statistics.
func (proxy *VTGateProxy) CloseSession(ctx context.Context, session *vtgateconn.VTGateSession) error {
return session.CloseSession(ctx)
}
func (proxy *VTGateProxy) Close(ctx context.Context, conn *proxyConn) error {
err := conn.session.CloseSession(ctx)

// ResolveTransaction resolves the specified 2PC transaction.
func (proxy *VTGateProxy) ResolveTransaction(ctx context.Context, dtid string) error {
return vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "not implemented")
if !*reuseConns {
conn.conn.Close()
}

return err
}

// Prepare supports non-streaming prepare statement query with multi shards
func (proxy *VTGateProxy) Prepare(ctx context.Context, session *vtgateconn.VTGateSession, sql string, bindVariables map[string]*querypb.BindVariable) (newsession *vtgateconn.VTGateSession, fld []*querypb.Field, err error) {
return nil, nil, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "not implemented")
func (proxy *VTGateProxy) Prepare(ctx context.Context, conn *proxyConn, sql string, bindVariables map[string]*querypb.BindVariable) (fld []*querypb.Field, err error) {
return nil, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "not implemented")
}

func (proxy *VTGateProxy) Execute(ctx context.Context, session *vtgateconn.VTGateSession, sql string, bindVariables map[string]*querypb.BindVariable) (qr *sqltypes.Result, err error) {
func (proxy *VTGateProxy) Execute(ctx context.Context, conn *proxyConn, sql string, bindVariables map[string]*querypb.BindVariable) (qr *sqltypes.Result, err error) {

// Intercept "use" statements since they just have to update the local session
if strings.HasPrefix(sql, "use ") {
targetString := sqlescape.UnescapeID(sql[4:])
session.SessionPb().TargetString = targetString
conn.session.SessionPb().TargetString = targetString
return &sqltypes.Result{}, nil
}

startTime := time.Now()
defer timings.Record(executeTimingKey, startTime)

return session.Execute(ctx, sql, bindVariables)
return conn.session.Execute(ctx, sql, bindVariables)

}

func (proxy *VTGateProxy) StreamExecute(ctx context.Context, session *vtgateconn.VTGateSession, sql string, bindVariables map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) error {
func (proxy *VTGateProxy) StreamExecute(ctx context.Context, conn *proxyConn, sql string, bindVariables map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) error {
startTime := time.Now()
defer timings.Record(streamExecuteTimingKey, startTime)

stream, err := session.StreamExecute(ctx, sql, bindVariables)
stream, err := conn.session.StreamExecute(ctx, sql, bindVariables)
if err != nil {
return err
}
Expand All @@ -203,7 +216,10 @@ func (proxy *VTGateProxy) StreamExecute(ctx context.Context, session *vtgateconn
func Init() {
log.V(100).Infof("Registering GRPC dial options")
grpcclient.RegisterGRPCDialOptions(func(opts []grpc.DialOption) ([]grpc.DialOption, error) {
return append(opts, grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`)), nil
if *reuseConns {
opts = append(opts, grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`))
}
return opts, nil
})

RegisterJSONGateResolver(
Expand Down
Binary file added vtgateproxy
Binary file not shown.

0 comments on commit f1f71e0

Please sign in to comment.