[release-19.0] connpool: Allow time out during shutdown (#15979) #16003

Merged (1 commit) on May 23, 2024
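Before the file-by-file diff, here is a minimal caller-side sketch of the shutdown flow this change enables. It assumes an already-open *smartconnpool.ConnPool value named pool; the error logging is illustrative and not part of the patch:

	// Bound the shutdown wait instead of blocking until every connection
	// has been returned; PoolCloseTimeout is exported by this patch.
	ctx, cancel := context.WithTimeout(context.Background(), smartconnpool.PoolCloseTimeout)
	defer cancel()

	// If the deadline passes while clients still hold connections, the pool
	// finishes shutting down anyway and the error reports the timed-out wait.
	if err := pool.CloseWithContext(ctx); err != nil {
		log.Errorf("pool close timed out: %v", err)
	}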
101 changes: 79 additions & 22 deletions go/pools/smartconnpool/pool.go
@@ -32,10 +32,16 @@

var (
// ErrTimeout is returned if a connection get times out.
ErrTimeout = vterrors.New(vtrpcpb.Code_RESOURCE_EXHAUSTED, "resource pool timed out")
ErrTimeout = vterrors.New(vtrpcpb.Code_RESOURCE_EXHAUSTED, "connection pool timed out")

// ErrCtxTimeout is returned if a ctx is already expired by the time the connection pool is used
ErrCtxTimeout = vterrors.New(vtrpcpb.Code_DEADLINE_EXCEEDED, "resource pool context already expired")
ErrCtxTimeout = vterrors.New(vtrpcpb.Code_DEADLINE_EXCEEDED, "connection pool context already expired")

// ErrConnPoolClosed is returned when trying to get a connection from a closed conn pool
ErrConnPoolClosed = vterrors.New(vtrpcpb.Code_INTERNAL, "connection pool is closed")

// PoolCloseTimeout is how long to wait for all connections to be returned to the pool during close
PoolCloseTimeout = 10 * time.Second
)

type Metrics struct {
@@ -119,8 +125,9 @@
capacity atomic.Int64

// workers is a waitgroup for all the currently running worker goroutines
workers sync.WaitGroup
close chan struct{}
workers sync.WaitGroup
close chan struct{}
capacityMu sync.Mutex

config struct {
// connect is the callback to create a new connection for the pool
@@ -142,6 +149,7 @@
}

Metrics Metrics
Name string
}

// NewPool creates a new connection pool with the given Config.
@@ -236,29 +244,60 @@

// Close shuts down the pool. No connections will be returned from ConnPool.Get after calling this,
// but calling ConnPool.Put is still allowed. This function will not return until all of the pool's
// connections have been returned.
// connections have been returned or the default PoolCloseTimeout has elapsed
func (pool *ConnPool[C]) Close() {
if pool.close == nil {
ctx, cancel := context.WithTimeout(context.Background(), PoolCloseTimeout)
defer cancel()

if err := pool.CloseWithContext(ctx); err != nil {
log.Errorf("failed to close pool %q: %v", pool.Name, err)
}
}

// CloseWithContext behaves like Close but allows passing in a Context to time out the
// pool closing operation
func (pool *ConnPool[C]) CloseWithContext(ctx context.Context) error {
pool.capacityMu.Lock()
defer pool.capacityMu.Unlock()

if pool.close == nil || pool.capacity.Load() == 0 {
// already closed
return
return nil
}

pool.SetCapacity(0)
// close all the connections in the pool; if we time out while waiting for
// users to return our connections, we still want to finish the shutdown
// for the pool
err := pool.setCapacity(ctx, 0)

close(pool.close)
pool.workers.Wait()
pool.close = nil
return err
}

func (pool *ConnPool[C]) reopen() {
pool.capacityMu.Lock()
defer pool.capacityMu.Unlock()

capacity := pool.capacity.Load()
if capacity == 0 {
return
}

pool.Close()
pool.open()
pool.SetCapacity(capacity)
ctx, cancel := context.WithTimeout(context.Background(), PoolCloseTimeout)
defer cancel()

// to re-open the connection pool, first set the capacity to 0 so we close
// all the existing connections, as they're now connected to a stale MySQL
// instance.
if err := pool.setCapacity(ctx, 0); err != nil {
log.Errorf("failed to reopen pool %q: %v", pool.Name, err)

}

// the second call to setCapacity cannot fail because it's only increasing the number
// of connections and doesn't need to shut down any
_ = pool.setCapacity(ctx, capacity)
}

// IsOpen returns whether the pool is open
@@ -322,7 +361,7 @@
return nil, ErrCtxTimeout
}
if pool.capacity.Load() == 0 {
return nil, ErrTimeout
return nil, ErrConnPoolClosed

}
if setting == nil {
return pool.get(ctx)
@@ -572,39 +611,55 @@
// If the capacity is smaller than the number of connections that are
// currently open, we'll close enough connections before returning, even if
// that means waiting for clients to return connections to the pool.
func (pool *ConnPool[C]) SetCapacity(newcap int64) {
// If the given context times out before we've managed to close enough connections,
// an error will be returned.
func (pool *ConnPool[C]) SetCapacity(ctx context.Context, newcap int64) error {
pool.capacityMu.Lock()
defer pool.capacityMu.Unlock()
return pool.setCapacity(ctx, newcap)
}

// setCapacity is the internal implementation for SetCapacity; it must be called
// with pool.capacityMu being held
func (pool *ConnPool[C]) setCapacity(ctx context.Context, newcap int64) error {
if newcap < 0 {
panic("negative capacity")
}

oldcap := pool.capacity.Swap(newcap)
if oldcap == newcap {
return
return nil

}

backoff := 1 * time.Millisecond
const delay = 10 * time.Millisecond

// close connections until we're under capacity
for pool.active.Load() > newcap {
if err := ctx.Err(); err != nil {
return vterrors.Errorf(vtrpcpb.Code_ABORTED,
"timed out while waiting for connections to be returned to the pool (capacity=%d, active=%d, borrowed=%d)",
pool.capacity.Load(), pool.active.Load(), pool.borrowed.Load())
}
// if we're closing down the pool, make sure there are no clients waiting
// for connections because they won't be returned in the future
if newcap == 0 {
pool.wait.expire(true)
}

// try closing from connections which are currently idle in the stacks
conn := pool.getFromSettingsStack(nil)
if conn == nil {
conn, _ = pool.clean.Pop()
}
if conn == nil {
time.Sleep(backoff)
backoff += 1 * time.Millisecond
time.Sleep(delay)
continue
}
conn.Close()
pool.closedConn()
}

// if we're closing down the pool, wake up any blocked waiters because no connections
// are going to be returned in the future
if newcap == 0 {
pool.wait.expire(true)
}
return nil
}

func (pool *ConnPool[C]) closeIdleResources(now time.Time) {
@@ -660,6 +715,8 @@
return
}

pool.Name = name

stats.NewGaugeFunc(name+"Capacity", "Tablet server conn pool capacity", func() int64 {
return pool.Capacity()
})
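Before the test changes, a short sketch of how a caller might use the context-aware SetCapacity above to shrink a pool; the pool value, the five-second timeout, and the target capacity are illustrative assumptions rather than part of the patch:

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Shrinking may have to wait for borrowed connections to be returned.
	// If the context expires first, SetCapacity returns an ABORTED error,
	// but the reduced capacity target stays in effect.
	if err := pool.SetCapacity(ctx, 2); err != nil {
		log.Errorf("could not shrink the pool in time: %v", err)
	}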
72 changes: 62 additions & 10 deletions go/pools/smartconnpool/pool_test.go
@@ -208,13 +208,15 @@ func TestOpen(t *testing.T) {
assert.EqualValues(t, 6, state.lastID.Load())

// SetCapacity
p.SetCapacity(3)
err = p.SetCapacity(ctx, 3)
require.NoError(t, err)
assert.EqualValues(t, 3, state.open.Load())
assert.EqualValues(t, 6, state.lastID.Load())
assert.EqualValues(t, 3, p.Capacity())
assert.EqualValues(t, 3, p.Available())

p.SetCapacity(6)
err = p.SetCapacity(ctx, 6)
require.NoError(t, err)
assert.EqualValues(t, 6, p.Capacity())
assert.EqualValues(t, 6, p.Available())

@@ -265,7 +267,9 @@ func TestShrinking(t *testing.T) {
}
done := make(chan bool)
go func() {
p.SetCapacity(3)
err := p.SetCapacity(ctx, 3)
require.NoError(t, err)

done <- true
}()
expected := map[string]any{
@@ -335,7 +339,8 @@

// This will also wait
go func() {
p.SetCapacity(2)
err := p.SetCapacity(ctx, 2)
require.NoError(t, err)
done <- true
}()
time.Sleep(10 * time.Millisecond)
@@ -353,7 +358,8 @@ func TestShrinking(t *testing.T) {
assert.EqualValues(t, 2, state.open.Load())

// Test race condition of SetCapacity with itself
p.SetCapacity(3)
err = p.SetCapacity(ctx, 3)
require.NoError(t, err)
for i := 0; i < 3; i++ {
var r *Pooled[*TestConn]
var err error
@@ -375,9 +381,15 @@ func TestShrinking(t *testing.T) {
time.Sleep(10 * time.Millisecond)

// This will wait till we Put
go p.SetCapacity(2)
go func() {
err := p.SetCapacity(ctx, 2)
require.NoError(t, err)
}()
time.Sleep(10 * time.Millisecond)
go p.SetCapacity(4)
go func() {
err := p.SetCapacity(ctx, 4)
require.NoError(t, err)
}()
time.Sleep(10 * time.Millisecond)

// This should not hang
@@ -387,7 +399,7 @@ func TestShrinking(t *testing.T) {
<-done

assert.Panics(t, func() {
p.SetCapacity(-1)
_ = p.SetCapacity(ctx, -1)
})

assert.EqualValues(t, 4, p.Capacity())
@@ -530,6 +542,46 @@ func TestReopen(t *testing.T) {
assert.EqualValues(t, 0, state.open.Load())
}

func TestUserClosing(t *testing.T) {
var state TestState

ctx := context.Background()
p := NewPool(&Config[*TestConn]{
Capacity: 5,
IdleTimeout: time.Second,
LogWait: state.LogWait,
}).Open(newConnector(&state), nil)

var resources [5]*Pooled[*TestConn]
for i := 0; i < 5; i++ {
var err error
resources[i], err = p.Get(ctx, nil)
require.NoError(t, err)
}

for _, r := range resources[:4] {
r.Recycle()
}

ch := make(chan error)
go func() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

err := p.CloseWithContext(ctx)
ch <- err
close(ch)
}()

select {
case <-time.After(5 * time.Second):
t.Fatalf("Pool did not shutdown after 5s")
case err := <-ch:
require.Error(t, err)
t.Logf("Shutdown error: %v", err)
}
}

func TestIdleTimeout(t *testing.T) {
testTimeout := func(t *testing.T, setting *Setting) {
var state TestState
@@ -818,7 +870,7 @@ func TestTimeout(t *testing.T) {
newctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
_, err = p.Get(newctx, setting)
cancel()
assert.EqualError(t, err, "resource pool timed out")
assert.EqualError(t, err, "connection pool timed out")

}

@@ -842,7 +894,7 @@ func TestExpired(t *testing.T) {
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-1*time.Second))
_, err := p.Get(ctx, setting)
cancel()
require.EqualError(t, err, "resource pool context already expired")
require.EqualError(t, err, "connection pool context already expired")
}
}

6 changes: 4 additions & 2 deletions go/vt/vttablet/endtoend/misc_test.go
@@ -261,8 +261,10 @@ func TestSidecarTables(t *testing.T) {
}

func TestConsolidation(t *testing.T) {
defer framework.Server.SetPoolSize(framework.Server.PoolSize())
framework.Server.SetPoolSize(1)
defer framework.Server.SetPoolSize(context.Background(), framework.Server.PoolSize())

err := framework.Server.SetPoolSize(context.Background(), 1)
require.NoError(t, err)

const tag = "Waits/Histograms/Consolidations/Count"

7 changes: 5 additions & 2 deletions go/vt/vttablet/endtoend/stream_test.go
@@ -17,6 +17,7 @@ limitations under the License.
package endtoend

import (
"context"
"errors"
"fmt"
"reflect"
@@ -98,11 +99,13 @@ func TestStreamConsolidation(t *testing.T) {

defaultPoolSize := framework.Server.StreamPoolSize()

framework.Server.SetStreamPoolSize(4)
err = framework.Server.SetStreamPoolSize(context.Background(), 4)
require.NoError(t, err)

framework.Server.SetStreamConsolidationBlocking(true)

defer func() {
framework.Server.SetStreamPoolSize(defaultPoolSize)
_ = framework.Server.SetStreamPoolSize(context.Background(), defaultPoolSize)
framework.Server.SetStreamConsolidationBlocking(false)
}()

6 changes: 0 additions & 6 deletions go/vt/vttablet/tabletserver/connpool/pool.go
@@ -31,15 +31,9 @@ import (
"vitess.io/vitess/go/vt/dbconnpool"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"

vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// ErrConnPoolClosed is returned when the connection pool is closed.
var ErrConnPoolClosed = vterrors.New(vtrpcpb.Code_INTERNAL, "internal error: unexpected: conn pool is closed")

const (
getWithoutS = "GetWithoutSettings"
getWithS = "GetWithSettings"