Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make an option to not reuse conns #334

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added go/cmd/vtgate/vtgate
Binary file not shown.
1 change: 1 addition & 0 deletions go/vt/vtgateproxy/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ func (b *JSONGateResolverBuilder) parse() (bool, error) {
target := targetHost{fmt.Sprintf("%s:%s", address, port), poolType.(string), affinity.(string)}
targets[target.PoolType] = append(targets[target.PoolType], target)
}
b.targets = targets

for poolType := range targets {
if b.affinityField != "" {
Expand Down
62 changes: 31 additions & 31 deletions go/vt/vtgateproxy/mysql_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,14 @@ func (ph *proxyHandler) NewConnection(c *mysql.Conn) {

func (ph *proxyHandler) ComResetConnection(c *mysql.Conn) {
ctx := context.Background()
session, err := ph.getSession(ctx, c)
conn, err := ph.getProxyConn(ctx, c)
if err != nil {
return
}
if session.SessionPb().InTransaction {
if conn.session.SessionPb().InTransaction {
defer atomic.AddInt32(&busyConnections, -1)
}
err = ph.proxy.CloseSession(ctx, session)
err = ph.proxy.Close(ctx, conn)
if err != nil {
log.Errorf("Error happened in transaction rollback: %v", err)
}
Expand All @@ -127,14 +127,14 @@ func (ph *proxyHandler) ConnectionClosed(c *mysql.Conn) {
} else {
ctx = context.Background()
}
session, err := ph.getSession(ctx, c)
conn, err := ph.getProxyConn(ctx, c)
if err != nil {
return
}
if session.SessionPb().InTransaction {
if conn.session.SessionPb().InTransaction {
defer atomic.AddInt32(&busyConnections, -1)
}
_ = ph.proxy.CloseSession(ctx, session)
_ = ph.proxy.Close(ctx, conn)
}

// Regexp to extract parent span id over the sql query
Expand Down Expand Up @@ -199,30 +199,30 @@ func (ph *proxyHandler) ComQuery(c *mysql.Conn, query string, callback func(*sql
"VTGate MySQL Connector" /* subcomponent: part of the client */)
ctx = callerid.NewContext(ctx, ef, im)

session, err := ph.getSession(ctx, c)
conn, err := ph.getProxyConn(ctx, c)
if err != nil {
return err
}
if !session.SessionPb().InTransaction {
if !conn.session.SessionPb().InTransaction {
atomic.AddInt32(&busyConnections, 1)
}
defer func() {
if session == nil || !session.SessionPb().InTransaction {
if conn == nil || !conn.session.SessionPb().InTransaction {
atomic.AddInt32(&busyConnections, -1)
}
}()

if session.SessionPb().Options.Workload == querypb.ExecuteOptions_OLAP {
err := ph.proxy.StreamExecute(ctx, session, query, make(map[string]*querypb.BindVariable), callback)
if conn.session.SessionPb().Options.Workload == querypb.ExecuteOptions_OLAP {
err := ph.proxy.StreamExecute(ctx, conn, query, make(map[string]*querypb.BindVariable), callback)
return mysql.NewSQLErrorFromError(err)
}

result, err := ph.proxy.Execute(ctx, session, query, make(map[string]*querypb.BindVariable))
result, err := ph.proxy.Execute(ctx, conn, query, make(map[string]*querypb.BindVariable))

if err := mysql.NewSQLErrorFromError(err); err != nil {
return err
}
fillInTxStatusFlags(c, session)
fillInTxStatusFlags(c, conn.session)
return callback(result)
}

Expand Down Expand Up @@ -264,20 +264,20 @@ func (ph *proxyHandler) ComPrepare(c *mysql.Conn, query string, bindVars map[str
"VTGateProxy MySQL Connector" /* subcomponent: part of the client */)
ctx = callerid.NewContext(ctx, ef, im)

session, err := ph.getSession(ctx, c)
conn, err := ph.getProxyConn(ctx, c)
if err != nil {
return nil, err
}
if !session.SessionPb().InTransaction {
if !conn.session.SessionPb().InTransaction {
atomic.AddInt32(&busyConnections, 1)
}
defer func() {
if !session.SessionPb().InTransaction {
if !conn.session.SessionPb().InTransaction {
atomic.AddInt32(&busyConnections, -1)
}
}()

session, fld, err := ph.proxy.Prepare(ctx, session, query, bindVars)
fld, err := ph.proxy.Prepare(ctx, conn, query, bindVars)
err = mysql.NewSQLErrorFromError(err)
if err != nil {
return nil, err
Expand Down Expand Up @@ -309,29 +309,29 @@ func (ph *proxyHandler) ComStmtExecute(c *mysql.Conn, prepare *mysql.PrepareData
"VTGateProxy MySQL Connector" /* subcomponent: part of the client */)
ctx = callerid.NewContext(ctx, ef, im)

session, err := ph.getSession(ctx, c)
conn, err := ph.getProxyConn(ctx, c)
if err != nil {
return err
}
if !session.SessionPb().InTransaction {
if !conn.session.SessionPb().InTransaction {
atomic.AddInt32(&busyConnections, 1)
}
defer func() {
if !session.SessionPb().InTransaction {
if !conn.session.SessionPb().InTransaction {
atomic.AddInt32(&busyConnections, -1)
}
}()

if session.SessionPb().Options.Workload == querypb.ExecuteOptions_OLAP {
err := ph.proxy.StreamExecute(ctx, session, prepare.PrepareStmt, prepare.BindVars, callback)
if conn.session.SessionPb().Options.Workload == querypb.ExecuteOptions_OLAP {
err := ph.proxy.StreamExecute(ctx, conn, prepare.PrepareStmt, prepare.BindVars, callback)
return mysql.NewSQLErrorFromError(err)
}

qr, err := ph.proxy.Execute(ctx, session, prepare.PrepareStmt, prepare.BindVars)
qr, err := ph.proxy.Execute(ctx, conn, prepare.PrepareStmt, prepare.BindVars)
if err != nil {
return mysql.NewSQLErrorFromError(err)
}
fillInTxStatusFlags(c, session)
fillInTxStatusFlags(c, conn.session)

return callback(qr)
}
Expand All @@ -350,9 +350,9 @@ func (ph *proxyHandler) ComBinlogDumpGTID(c *mysql.Conn, gtidSet mysql.GTIDSet)
return vterrors.New(vtrpcpb.Code_UNIMPLEMENTED, "ComBinlogDumpGTID")
}

func (ph *proxyHandler) getSession(ctx context.Context, c *mysql.Conn) (*vtgateconn.VTGateSession, error) {
session, _ := c.ClientData.(*vtgateconn.VTGateSession)
if session == nil {
func (ph *proxyHandler) getProxyConn(ctx context.Context, c *mysql.Conn) (*proxyConn, error) {
conn, _ := c.ClientData.(*proxyConn)
if conn == nil {
options := &querypb.ExecuteOptions{
IncludedFields: querypb.ExecuteOptions_ALL,
Workload: querypb.ExecuteOptions_Workload(mysqlDefaultWorkload),
Expand All @@ -363,18 +363,18 @@ func (ph *proxyHandler) getSession(ctx context.Context, c *mysql.Conn) (*vtgatec
}

var err error
session, err = ph.proxy.NewSession(ctx, options, c.Attributes)
conn, err = ph.proxy.Connect(ctx, options, c.Attributes)
if err != nil {
log.Errorf("error creating new session for %s: %v", c.GetRawConn().RemoteAddr().String(), err)
return nil, err
}

if session != nil {
c.ClientData = session
if conn != nil {
c.ClientData = conn
}
}

return session, nil
return conn, nil
}

var mysqlListener *mysql.Listener
Expand Down
50 changes: 33 additions & 17 deletions go/vt/vtgateproxy/vtgateproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ var (
affinityValue = flag.String("affinity_value", "", "Value to match for routing affinity , e.g. 'use-az1'")
addressField = flag.String("address_field", "address", "field name in the json file containing the address")
portField = flag.String("port_field", "port", "field name in the json file containing the port")
reuseConns = flag.Bool("reuse_conns", true, "Reuse the grpc connection across multiple requests (default true)")

timings = stats.NewTimings("Timings", "proxy timings by operation", "operation")

Expand All @@ -69,9 +70,19 @@ type VTGateProxy struct {
mu sync.RWMutex
}

type proxyConn struct {
conn *vtgateconn.VTGateConn
session *vtgateconn.VTGateSession
}

func (proxy *VTGateProxy) getConnection(ctx context.Context, target string) (*vtgateconn.VTGateConn, error) {
log.V(100).Infof("Getting connection for %v\n", target)

// If we're not reusing conns, then just return a new one each time with no need to do any locking
if !*reuseConns {
return vtgateconn.DialProtocol(ctx, "grpc", target)
}

// If the connection exists, return it
proxy.mu.RLock()
existingConn := proxy.targetConns[target]
Expand Down Expand Up @@ -109,8 +120,7 @@ func (proxy *VTGateProxy) getConnection(ctx context.Context, target string) (*vt
return conn, nil
}

func (proxy *VTGateProxy) NewSession(ctx context.Context, options *querypb.ExecuteOptions, connectionAttributes map[string]string) (*vtgateconn.VTGateSession, error) {

func (proxy *VTGateProxy) Connect(ctx context.Context, options *querypb.ExecuteOptions, connectionAttributes map[string]string) (*proxyConn, error) {
targetUrl := url.URL{
Scheme: "vtgate",
Host: "pool",
Expand Down Expand Up @@ -141,47 +151,50 @@ func (proxy *VTGateProxy) NewSession(ctx context.Context, options *querypb.Execu
return nil, err
}

return conn.Session("", options), nil
session := conn.Session("", options)

return &proxyConn{conn, session}, nil
}

// CloseSession closes the session, rolling back any implicit transactions. This has the
// same effect as if a "rollback" statement was executed, but does not affect the query
// statistics.
func (proxy *VTGateProxy) CloseSession(ctx context.Context, session *vtgateconn.VTGateSession) error {
return session.CloseSession(ctx)
}
func (proxy *VTGateProxy) Close(ctx context.Context, conn *proxyConn) error {
err := conn.session.CloseSession(ctx)

// ResolveTransaction resolves the specified 2PC transaction.
func (proxy *VTGateProxy) ResolveTransaction(ctx context.Context, dtid string) error {
return vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "not implemented")
if !*reuseConns {
conn.conn.Close()
}

return err
}

// Prepare supports non-streaming prepare statement query with multi shards
func (proxy *VTGateProxy) Prepare(ctx context.Context, session *vtgateconn.VTGateSession, sql string, bindVariables map[string]*querypb.BindVariable) (newsession *vtgateconn.VTGateSession, fld []*querypb.Field, err error) {
return nil, nil, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "not implemented")
func (proxy *VTGateProxy) Prepare(ctx context.Context, conn *proxyConn, sql string, bindVariables map[string]*querypb.BindVariable) (fld []*querypb.Field, err error) {
return nil, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "not implemented")
}

func (proxy *VTGateProxy) Execute(ctx context.Context, session *vtgateconn.VTGateSession, sql string, bindVariables map[string]*querypb.BindVariable) (qr *sqltypes.Result, err error) {
func (proxy *VTGateProxy) Execute(ctx context.Context, conn *proxyConn, sql string, bindVariables map[string]*querypb.BindVariable) (qr *sqltypes.Result, err error) {

// Intercept "use" statements since they just have to update the local session
if strings.HasPrefix(sql, "use ") {
targetString := sqlescape.UnescapeID(sql[4:])
session.SessionPb().TargetString = targetString
conn.session.SessionPb().TargetString = targetString
return &sqltypes.Result{}, nil
}

startTime := time.Now()
defer timings.Record(executeTimingKey, startTime)

return session.Execute(ctx, sql, bindVariables)
return conn.session.Execute(ctx, sql, bindVariables)

}

func (proxy *VTGateProxy) StreamExecute(ctx context.Context, session *vtgateconn.VTGateSession, sql string, bindVariables map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) error {
func (proxy *VTGateProxy) StreamExecute(ctx context.Context, conn *proxyConn, sql string, bindVariables map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) error {
startTime := time.Now()
defer timings.Record(streamExecuteTimingKey, startTime)

stream, err := session.StreamExecute(ctx, sql, bindVariables)
stream, err := conn.session.StreamExecute(ctx, sql, bindVariables)
if err != nil {
return err
}
Expand All @@ -203,7 +216,10 @@ func (proxy *VTGateProxy) StreamExecute(ctx context.Context, session *vtgateconn
func Init() {
log.V(100).Infof("Registering GRPC dial options")
grpcclient.RegisterGRPCDialOptions(func(opts []grpc.DialOption) ([]grpc.DialOption, error) {
return append(opts, grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`)), nil
if *reuseConns {
opts = append(opts, grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`))
}
return opts, nil
})

RegisterJSONGateResolver(
Expand Down
Binary file added vtgateproxy
Binary file not shown.
Loading