Skip to content

Commit

Permalink
Make reader and writer buffers configurable (#2014)
Browse files Browse the repository at this point in the history
* Make reader and writer buffers configurable

Replace global sync pools with per-DB pools and make reader and writer buffers configurable.
Default buffer sizes: 1Mb for reader buffer, 64Kb for writer buffer.

Context:
DB connections are pooled, usually there are not much of them available,
so this is a resource used by goroutines, which have to wait for their
turn in order to get a connection and use it before returning it back to
the pool.

Before getting into the waiting line every goroutine allocates a read
and a write buffer from the sync.Pool of buffers.
Currently hardcoded reader buffer size is 1Mb, so when 1000 goroutines
wait in the queue, you get 1000Mb of buffers pre-allocated. So when an
application unexpectedly gets a spike of traffic and all the database
connections are being used, we don't get request timeouts as one could
be expecting, the application is being OOM-killed instead.

The patch addresses this issue by trading some allocations and
(probably, though my benchmarks don't really show it) latency for the
ability to serve more simultaneous connections.

* Fix tests by using properly initialized Conn
  • Loading branch information
tony2001 authored Dec 14, 2024
1 parent a7fe379 commit e55fd63
Show file tree
Hide file tree
Showing 10 changed files with 143 additions and 77 deletions.
16 changes: 8 additions & 8 deletions base.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,8 @@ func (db *baseDB) ExecContext(c context.Context, query interface{}, params ...in
}

func (db *baseDB) exec(ctx context.Context, query interface{}, params ...interface{}) (Result, error) {
wb := pool.GetWriteBuffer()
defer pool.PutWriteBuffer(wb)
wb := db.pool.GetWriteBuffer()
defer db.pool.PutWriteBuffer(wb)

if err := writeQueryMsg(wb, db.fmter, query, params...); err != nil {
return nil, err
Expand Down Expand Up @@ -297,8 +297,8 @@ func (db *baseDB) QueryContext(c context.Context, model, query interface{}, para
}

func (db *baseDB) query(ctx context.Context, model, query interface{}, params ...interface{}) (Result, error) {
wb := pool.GetWriteBuffer()
defer pool.PutWriteBuffer(wb)
wb := db.pool.GetWriteBuffer()
defer db.pool.PutWriteBuffer(wb)

if err := writeQueryMsg(wb, db.fmter, query, params...); err != nil {
return nil, err
Expand Down Expand Up @@ -374,8 +374,8 @@ func (db *baseDB) copyFrom(
) (res Result, err error) {
var evt *QueryEvent

wb := pool.GetWriteBuffer()
defer pool.PutWriteBuffer(wb)
wb := db.pool.GetWriteBuffer()
defer db.pool.PutWriteBuffer(wb)

if err := writeQueryMsg(wb, db.fmter, query, params...); err != nil {
return nil, err
Expand Down Expand Up @@ -456,8 +456,8 @@ func (db *baseDB) copyTo(
) (res Result, err error) {
var evt *QueryEvent

wb := pool.GetWriteBuffer()
defer pool.PutWriteBuffer(wb)
wb := db.pool.GetWriteBuffer()
defer db.pool.PutWriteBuffer(wb)

if err := writeQueryMsg(wb, db.fmter, query, params...); err != nil {
return nil, err
Expand Down
21 changes: 18 additions & 3 deletions base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

/*
The test is for testing the case that sending a cancel request when the timeout from connection comes earlier than ctx.Done().
The test is for testing the case that sending a cancel request when the timeout from connection comes earlier than ctx.Done().
*/
func Test_baseDB_withConn(t *testing.T) {
b := mockBaseDB{}
Expand Down Expand Up @@ -44,9 +44,10 @@ type mockPooler struct {
}

func (m *mockPooler) NewConn(ctx context.Context) (*pool.Conn, error) {
m.conn = &pool.Conn{ProcessID: 123, SecretKey: 234, Inited: true}
m.mockConn = mockConn{}
m.conn.SetNetConn(&m.mockConn)
m.conn = pool.NewConn(&m.mockConn, pool.NewConnPool(&pool.Options{}))
m.conn.ProcessID = 123
m.conn.SecretKey = 234
return m.conn, nil
}

Expand Down Expand Up @@ -83,6 +84,20 @@ func (m *mockPooler) Close() error {
return nil
}

func (m *mockPooler) GetWriteBuffer() *pool.WriteBuffer {
return pool.NewWriteBuffer(1024)
}

func (m *mockPooler) PutWriteBuffer(_ *pool.WriteBuffer) {
}

func (m *mockPooler) GetReaderContext() *pool.ReaderContext {
return pool.NewReaderContext(1024)
}

func (m *mockPooler) PutReaderContext(_ *pool.ReaderContext) {
}

type mockPGError struct {
M map[byte]string
}
Expand Down
14 changes: 8 additions & 6 deletions internal/pool/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ var noDeadline = time.Time{}
type Conn struct {
netConn net.Conn
rd *ReaderContext
pool *ConnPool

ProcessID int32
SecretKey int32
Expand All @@ -24,9 +25,10 @@ type Conn struct {
Inited bool
}

func NewConn(netConn net.Conn) *Conn {
func NewConn(netConn net.Conn, pool *ConnPool) *Conn {
cn := &Conn{
createdAt: time.Now(),
pool: pool,
}
cn.SetNetConn(netConn)
cn.SetUsedAt(time.Now())
Expand Down Expand Up @@ -57,7 +59,7 @@ func (cn *Conn) LockReader() {
if cn.rd != nil {
panic("not reached")
}
cn.rd = NewReaderContext()
cn.rd = NewReaderContext(cn.pool.opt.ReadBufferInitialSize)
cn.rd.Reset(cn.netConn)
}

Expand All @@ -79,8 +81,8 @@ func (cn *Conn) WithReader(

rd := cn.rd
if rd == nil {
rd = GetReaderContext()
defer PutReaderContext(rd)
rd = cn.pool.GetReaderContext()
defer cn.pool.PutReaderContext(rd)

rd.Reset(cn.netConn)
}
Expand All @@ -97,8 +99,8 @@ func (cn *Conn) WithReader(
func (cn *Conn) WithWriter(
ctx context.Context, timeout time.Duration, fn func(wb *WriteBuffer) error,
) error {
wb := GetWriteBuffer()
defer PutWriteBuffer(wb)
wb := cn.pool.GetWriteBuffer()
defer cn.pool.PutWriteBuffer(wb)

if err := fn(wb); err != nil {
return err
Expand Down
53 changes: 46 additions & 7 deletions internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ type Pooler interface {
Get(context.Context) (*Conn, error)
Put(context.Context, *Conn)
Remove(context.Context, *Conn, error)
GetWriteBuffer() *WriteBuffer
PutWriteBuffer(*WriteBuffer)
GetReaderContext() *ReaderContext
PutReaderContext(*ReaderContext)

Len() int
IdleLen() int
Expand All @@ -54,12 +58,14 @@ type Options struct {
Dialer func(context.Context) (net.Conn, error)
OnClose func(*Conn) error

PoolSize int
MinIdleConns int
MaxConnAge time.Duration
PoolTimeout time.Duration
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
PoolSize int
MinIdleConns int
ReadBufferInitialSize int
WriteBufferInitialSize int
MaxConnAge time.Duration
PoolTimeout time.Duration
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
}

type ConnPool struct {
Expand All @@ -82,6 +88,9 @@ type ConnPool struct {

poolSize int
idleConnsLen int

wbPool sync.Pool
rbPool sync.Pool
}

var _ Pooler = (*ConnPool)(nil)
Expand All @@ -93,6 +102,16 @@ func NewConnPool(opt *Options) *ConnPool {
queue: make(chan struct{}, opt.PoolSize),
conns: make([]*Conn, 0, opt.PoolSize),
idleConns: make([]*Conn, 0, opt.PoolSize),
wbPool: sync.Pool{
New: func() interface{} {
return NewWriteBuffer(opt.WriteBufferInitialSize)
},
},
rbPool: sync.Pool{
New: func() interface{} {
return NewReaderContext(opt.ReadBufferInitialSize)
},
},
}

p.connsMu.Lock()
Expand Down Expand Up @@ -182,7 +201,7 @@ func (p *ConnPool) dialConn(c context.Context, pooled bool) (*Conn, error) {
return nil, err
}

cn := NewConn(netConn)
cn := NewConn(netConn, p)
cn.pooled = pooled
return cn, nil
}
Expand Down Expand Up @@ -504,3 +523,23 @@ func (p *ConnPool) isStaleConn(cn *Conn) bool {

return false
}

func (p *ConnPool) GetWriteBuffer() *WriteBuffer {
wb := p.wbPool.Get().(*WriteBuffer)
return wb
}

func (p *ConnPool) PutWriteBuffer(wb *WriteBuffer) {
wb.Reset()
p.wbPool.Put(wb)
}

func (p *ConnPool) GetReaderContext() *ReaderContext {
rd := p.rbPool.Get().(*ReaderContext)
return rd
}

func (p *ConnPool) PutReaderContext(rd *ReaderContext) {
rd.ColumnAlloc.Reset()
p.rbPool.Put(rd)
}
16 changes: 16 additions & 0 deletions internal/pool/pool_single.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,19 @@ func (p *SingleConnPool) IdleLen() int {
func (p *SingleConnPool) Stats() *Stats {
return &Stats{}
}

func (p *SingleConnPool) GetWriteBuffer() *WriteBuffer {
return p.pool.GetWriteBuffer()
}

func (p *SingleConnPool) PutWriteBuffer(wb *WriteBuffer) {
p.pool.PutWriteBuffer(wb)
}

func (p *SingleConnPool) GetReaderContext() *ReaderContext {
return p.pool.GetReaderContext()
}

func (p *SingleConnPool) PutReaderContext(rd *ReaderContext) {
p.pool.PutReaderContext(rd)
}
16 changes: 16 additions & 0 deletions internal/pool/pool_sticky.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,19 @@ func (p *StickyConnPool) IdleLen() int {
func (p *StickyConnPool) Stats() *Stats {
return &Stats{}
}

func (p *StickyConnPool) GetWriteBuffer() *WriteBuffer {
return p.pool.GetWriteBuffer()
}

func (p *StickyConnPool) PutWriteBuffer(wb *WriteBuffer) {
p.pool.PutWriteBuffer(wb)
}

func (p *StickyConnPool) GetReaderContext() *ReaderContext {
return p.pool.GetReaderContext()
}

func (p *StickyConnPool) PutReaderContext(rd *ReaderContext) {
p.pool.PutReaderContext(rd)
}
23 changes: 1 addition & 22 deletions internal/pool/reader.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package pool

import (
"sync"
)

type Reader interface {
Buffered() int

Expand Down Expand Up @@ -55,26 +51,9 @@ type ReaderContext struct {
ColumnAlloc *ColumnAlloc
}

func NewReaderContext() *ReaderContext {
const bufSize = 1 << 20 // 1mb
func NewReaderContext(bufSize int) *ReaderContext {
return &ReaderContext{
BufReader: NewBufReader(bufSize),
ColumnAlloc: NewColumnAlloc(),
}
}

var readerPool = sync.Pool{
New: func() interface{} {
return NewReaderContext()
},
}

func GetReaderContext() *ReaderContext {
rd := readerPool.Get().(*ReaderContext)
return rd
}

func PutReaderContext(rd *ReaderContext) {
rd.ColumnAlloc.Reset()
readerPool.Put(rd)
}
23 changes: 2 additions & 21 deletions internal/pool/write_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,18 @@ package pool
import (
"encoding/binary"
"io"
"sync"
)

const defaultBufSize = 65 << 10 // 65kb

var wbPool = sync.Pool{
New: func() interface{} {
return NewWriteBuffer()
},
}

func GetWriteBuffer() *WriteBuffer {
wb := wbPool.Get().(*WriteBuffer)
return wb
}

func PutWriteBuffer(wb *WriteBuffer) {
wb.Reset()
wbPool.Put(wb)
}

type WriteBuffer struct {
Bytes []byte

msgStart int
paramStart int
}

func NewWriteBuffer() *WriteBuffer {
func NewWriteBuffer(bufSize int) *WriteBuffer {
return &WriteBuffer{
Bytes: make([]byte, 0, defaultBufSize),
Bytes: make([]byte, 0, bufSize),
}
}

Expand Down
30 changes: 24 additions & 6 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@ type Options struct {
// but idle connections are still discarded by the client
// if IdleTimeout is set.
IdleCheckFrequency time.Duration
// Connections read buffers stored in a sync.Pool to reduce allocations.
// Using this option you can adjust the initial size of the buffer.
// Default is 1 Mb.
ReadBufferInitialSize int
// Connections write buffers stored in a sync.Pool to reduce allocations.
// Using this option you can adjust the initial size of the buffer.
// Default is 64 Kb.
WriteBufferInitialSize int
}

func (opt *Options) init() {
Expand Down Expand Up @@ -164,6 +172,14 @@ func (opt *Options) init() {
case 0:
opt.MaxRetryBackoff = 4 * time.Second
}

if opt.ReadBufferInitialSize == 0 {
opt.ReadBufferInitialSize = 1048576 // 1Mb
}

if opt.WriteBufferInitialSize == 0 {
opt.WriteBufferInitialSize = 65536 // 64Kb
}
}

func env(key, defValue string) string {
Expand Down Expand Up @@ -318,11 +334,13 @@ func newConnPool(opt *Options) *pool.ConnPool {
Dialer: opt.getDialer(),
OnClose: terminateConn,

PoolSize: opt.PoolSize,
MinIdleConns: opt.MinIdleConns,
MaxConnAge: opt.MaxConnAge,
PoolTimeout: opt.PoolTimeout,
IdleTimeout: opt.IdleTimeout,
IdleCheckFrequency: opt.IdleCheckFrequency,
PoolSize: opt.PoolSize,
MinIdleConns: opt.MinIdleConns,
MaxConnAge: opt.MaxConnAge,
PoolTimeout: opt.PoolTimeout,
IdleTimeout: opt.IdleTimeout,
IdleCheckFrequency: opt.IdleCheckFrequency,
ReadBufferInitialSize: opt.ReadBufferInitialSize,
WriteBufferInitialSize: opt.WriteBufferInitialSize,
})
}
Loading

0 comments on commit e55fd63

Please sign in to comment.