sql: add option to enable/disable txn id cache #76523

Merged · 2 commits · Feb 16, 2022
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
@@ -77,7 +77,7 @@ server.web_session.purge.max_deletions_per_cycle integer 10 the maximum number o
 server.web_session.purge.period duration 1h0m0s the time until old sessions are deleted
 server.web_session.purge.ttl duration 1h0m0s if nonzero, entries in system.web_sessions older than this duration are periodically purged
 server.web_session_timeout duration 168h0m0s the duration that a newly created web session will be valid
-sql.contention.txn_id_cache.max_size byte size 64 MiB the maximum byte size TxnID cache will use
+sql.contention.txn_id_cache.max_size byte size 64 MiB the maximum byte size TxnID cache will use (set to 0 to disable)
 sql.cross_db_fks.enabled boolean false if true, creating foreign key references across databases is allowed
 sql.cross_db_sequence_owners.enabled boolean false if true, creating sequences owned by tables from other databases is allowed
 sql.cross_db_sequence_references.enabled boolean false if true, sequences referenced by tables from other databases are allowed
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
@@ -88,7 +88,7 @@
 <tr><td><code>server.web_session.purge.period</code></td><td>duration</td><td><code>1h0m0s</code></td><td>the time until old sessions are deleted</td></tr>
 <tr><td><code>server.web_session.purge.ttl</code></td><td>duration</td><td><code>1h0m0s</code></td><td>if nonzero, entries in system.web_sessions older than this duration are periodically purged</td></tr>
 <tr><td><code>server.web_session_timeout</code></td><td>duration</td><td><code>168h0m0s</code></td><td>the duration that a newly created web session will be valid</td></tr>
-<tr><td><code>sql.contention.txn_id_cache.max_size</code></td><td>byte size</td><td><code>64 MiB</code></td><td>the maximum byte size TxnID cache will use</td></tr>
+<tr><td><code>sql.contention.txn_id_cache.max_size</code></td><td>byte size</td><td><code>64 MiB</code></td><td>the maximum byte size TxnID cache will use (set to 0 to disable)</td></tr>
 <tr><td><code>sql.cross_db_fks.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if true, creating foreign key references across databases is allowed</td></tr>
 <tr><td><code>sql.cross_db_sequence_owners.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if true, creating sequences owned by tables from other databases is allowed</td></tr>
 <tr><td><code>sql.cross_db_sequence_references.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if true, sequences referenced by tables from other databases are allowed</td></tr>
2 changes: 1 addition & 1 deletion pkg/sql/contention/txnidcache/cluster_settings.go
@@ -16,6 +16,6 @@ import "github.com/cockroachdb/cockroach/pkg/settings"
 var MaxSize = settings.RegisterByteSizeSetting(
     settings.TenantWritable,
     `sql.contention.txn_id_cache.max_size`,
-    "the maximum byte size TxnID cache will use",
+    "the maximum byte size TxnID cache will use (set to 0 to disable)",
     64*1024*1024, // 64 MB
 ).WithPublic()
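Because the setting is registered with `WithPublic()` and `TenantWritable`, the new zero-to-disable switch is operator-facing and can be flipped at runtime over any SQL connection. A minimal sketch of toggling it from Go (the connection string and the lib/pq driver are assumptions for illustration, not part of this PR):

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // any Postgres-wire driver works against CockroachDB
)

func main() {
	// Assumed local cluster; adjust the connection string for your deployment.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// A max size of 0 disables the TxnID cache entirely.
	if _, err := db.Exec(`SET CLUSTER SETTING sql.contention.txn_id_cache.max_size = '0'`); err != nil {
		log.Fatal(err)
	}

	// Restoring the 64 MiB default re-enables it.
	if _, err := db.Exec(`SET CLUSTER SETTING sql.contention.txn_id_cache.max_size = '64MiB'`); err != nil {
		log.Fatal(err)
	}
}
```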
15 changes: 9 additions & 6 deletions pkg/sql/contention/txnidcache/concurrent_write_buffer.go
@@ -22,6 +22,12 @@ const blockSize = 168
 
 type block [blockSize]ResolvedTxnID
 
+var blockPool = &sync.Pool{
+    New: func() interface{} {
+        return &block{}
+    },
+}
+
 // concurrentWriteBuffer is a data structure that optimizes for concurrent
 // writes and also implements the Writer interface.
 type concurrentWriteBuffer struct {
@@ -33,8 +39,6 @@ type concurrentWriteBuffer struct {
         block *block
     }
 
-    blockPool *sync.Pool
-
     // sink is the flush target that ConcurrentWriteBuffer flushes to once
     // block is full.
     sink blockSink
@@ -43,10 +47,9 @@
 var _ Writer = &concurrentWriteBuffer{}
 
 // newConcurrentWriteBuffer returns a new instance of concurrentWriteBuffer.
-func newConcurrentWriteBuffer(sink blockSink, blockPool *sync.Pool) *concurrentWriteBuffer {
+func newConcurrentWriteBuffer(sink blockSink) *concurrentWriteBuffer {
     writeBuffer := &concurrentWriteBuffer{
-        sink:      sink,
-        blockPool: blockPool,
+        sink: sink,
     }
 
     writeBuffer.guard.block = blockPool.Get().(*block)
@@ -58,7 +61,7 @@ func newConcurrentWriteBuffer(sink blockSink, blockPool *sync.Pool) *concurrentW
         writeBuffer.sink.push(writeBuffer.guard.block)
 
         // Resets the block.
-        writeBuffer.guard.block = writeBuffer.blockPool.Get().(*block)
+        writeBuffer.guard.block = blockPool.Get().(*block)
     } /* onBufferFull */)
 
     return writeBuffer
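The main refactor in this file replaces a `*sync.Pool` threaded through every constructor with a single package-level pool, so writers and the cache share one free list without extra plumbing; dropping the parameter is what shrinks the signatures in the files below. A self-contained sketch of the pattern (the `entry` type and the 4-entry block are illustrative stand-ins; the real code holds 168 `ResolvedTxnID`s per block):

```go
package main

import (
	"fmt"
	"sync"
)

type entry struct{ id uint64 }

const blockSize = 4 // illustrative; the PR's blocks hold 168 entries

type block [blockSize]entry

// One package-level pool: callers Get and Put blocks directly instead of
// receiving a *sync.Pool through their constructors.
var blockPool = &sync.Pool{
	New: func() interface{} { return &block{} },
}

func main() {
	b := blockPool.Get().(*block)
	b[0] = entry{id: 42}

	// Zero the block before returning it, so a future Get never observes
	// stale entries (mirroring what fifo_cache.add does in this PR).
	*b = block{}
	blockPool.Put(b)

	fmt.Println(blockPool.Get().(*block)[0].id) // prints 0
}
```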
10 changes: 4 additions & 6 deletions pkg/sql/contention/txnidcache/fifo_cache.go
@@ -26,8 +26,7 @@ var nodePool = &sync.Pool{
 }
 
 type fifoCache struct {
-    blockPool *sync.Pool
-    capacity  contentionutils.CapacityLimiter
+    capacity contentionutils.CapacityLimiter
 
     mu struct {
         syncutil.RWMutex
@@ -54,10 +53,9 @@ type blockList struct {
     tailIdx int
 }
 
-func newFIFOCache(pool *sync.Pool, capacity contentionutils.CapacityLimiter) *fifoCache {
+func newFIFOCache(capacity contentionutils.CapacityLimiter) *fifoCache {
     c := &fifoCache{
-        blockPool: pool,
-        capacity:  capacity,
+        capacity: capacity,
     }
 
     c.mu.data = make(map[uuid.UUID]roachpb.TransactionFingerprintID)
@@ -85,7 +83,7 @@ func (c *fifoCache) add(b *block) {
 
     // Zeros out the block and put it back into the blockPool.
     *b = block{}
-    c.blockPool.Put(b)
+    blockPool.Put(b)
 
     c.maybeEvictLocked()
 }
8 changes: 1 addition & 7 deletions pkg/sql/contention/txnidcache/fifo_cache_test.go
@@ -13,7 +13,6 @@ package txnidcache
 import (
     "fmt"
     "math/rand"
-    "sync"
     "testing"
 
     "github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -22,12 +21,7 @@ import (
 )
 
 func TestFIFOCache(t *testing.T) {
-    pool := &sync.Pool{
-        New: func() interface{} {
-            return &block{}
-        },
-    }
-    cache := newFIFOCache(pool, func() int64 { return 2 * blockSize } /* capacity */)
+    cache := newFIFOCache(func() int64 { return 2 * blockSize } /* capacity */)
 
     // Fill the first eviction block in cache to 1/4 capacity.
     input1, expected1 := generateInputBlock(blockSize * 1 / 4 /* size */)
14 changes: 3 additions & 11 deletions pkg/sql/contention/txnidcache/txn_id_cache.go
@@ -12,7 +12,6 @@ package txnidcache
 
 import (
     "context"
-    "sync"
 
     "github.com/cockroachdb/cockroach/pkg/roachpb"
     "github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -105,9 +104,7 @@ type Cache struct {
     blockCh chan *block
     closeCh chan struct{}
 
-    store     *fifoCache
-    blockPool *sync.Pool
-
+    store  *fifoCache
     writer Writer
 
     metrics *Metrics
@@ -142,18 +139,13 @@ func NewTxnIDCache(st *cluster.Settings, metrics *Metrics) *Cache {
         metrics: metrics,
         blockCh: make(chan *block, channelSize),
         closeCh: make(chan struct{}),
-        blockPool: &sync.Pool{
-            New: func() interface{} {
-                return &block{}
-            },
-        },
     }
 
-    t.store = newFIFOCache(t.blockPool, func() int64 {
+    t.store = newFIFOCache(func() int64 {
         return MaxSize.Get(&st.SV) / entrySize
     } /* capacity */)
 
-    t.writer = newWriter(t, t.blockPool)
+    t.writer = newWriter(st, t)
     return t
 }
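Note that the cache's capacity stays a closure over the live setting: `MaxSize.Get(&st.SV) / entrySize` is re-read on every eviction pass, so lowering the setting to 0 shrinks the entry budget to 0 without a restart. A minimal sketch of that design (names and types here are illustrative, not the contentionutils API):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// capacityLimiter plays the role of contentionutils.CapacityLimiter:
// a function the cache re-evaluates instead of a frozen number.
type capacityLimiter func() int64

type fifo struct {
	capacity capacityLimiter
	entries  int64
}

func (c *fifo) maybeEvict() {
	for c.entries > c.capacity() {
		c.entries-- // stand-in for freeing the oldest block
	}
}

func main() {
	var maxBytes atomic.Int64
	maxBytes.Store(1024)
	const entrySize = 32 // bytes per cached entry, illustrative

	c := &fifo{capacity: func() int64 { return maxBytes.Load() / entrySize }}
	c.entries = 20
	c.maybeEvict()
	fmt.Println(c.entries) // 20: within the 32-entry budget

	maxBytes.Store(0) // the "disable" case: budget drops to 0
	c.maybeEvict()
	fmt.Println(c.entries) // 0: everything evicted
}
```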
19 changes: 11 additions & 8 deletions pkg/sql/contention/txnidcache/writer.go
@@ -11,8 +11,7 @@
 package txnidcache
 
 import (
-    "sync"
-
+    "github.com/cockroachdb/cockroach/pkg/settings/cluster"
     "github.com/cockroachdb/cockroach/pkg/util/encoding"
     "github.com/cockroachdb/cockroach/pkg/util/uuid"
 )
@@ -22,29 +21,33 @@
 const shardCount = 16
 
 type writer struct {
+    st *cluster.Settings
+
     shards [shardCount]*concurrentWriteBuffer
 
-    sink      blockSink
-    blockPool *sync.Pool
+    sink blockSink
 }
 
 var _ Writer = &writer{}
 
-func newWriter(sink blockSink, blockPool *sync.Pool) *writer {
+func newWriter(st *cluster.Settings, sink blockSink) *writer {
     w := &writer{
-        sink:      sink,
-        blockPool: blockPool,
+        st:   st,
+        sink: sink,
     }
 
     for shardIdx := 0; shardIdx < shardCount; shardIdx++ {
-        w.shards[shardIdx] = newConcurrentWriteBuffer(sink, blockPool)
+        w.shards[shardIdx] = newConcurrentWriteBuffer(sink)
     }
 
     return w
 }
 
 // Record implements the Writer interface.
 func (w *writer) Record(resolvedTxnID ResolvedTxnID) {
+    if MaxSize.Get(&w.st.SV) == 0 {
+        return
+    }
     shardIdx := hashTxnID(resolvedTxnID.TxnID)
     buffer := w.shards[shardIdx]
     buffer.Record(resolvedTxnID)
84 changes: 62 additions & 22 deletions pkg/sql/contention/txnidcache/writer_test.go
@@ -24,29 +24,27 @@ import (
     "github.com/cockroachdb/cockroach/pkg/util/log"
     "github.com/cockroachdb/cockroach/pkg/util/stop"
     "github.com/cockroachdb/cockroach/pkg/util/uuid"
+    "github.com/stretchr/testify/require"
 )
 
 type blackHoleSink struct {
-    pool *sync.Pool
-
     // Simulate a real sink.
     ch chan *block
 }
 
 var _ blockSink = &blackHoleSink{}
 
-func newBlackHoleSink(pool *sync.Pool, chanSize int) *blackHoleSink {
+func newBlackHoleSink(chanSize int) *blackHoleSink {
     return &blackHoleSink{
-        pool: pool,
-        ch:   make(chan *block, chanSize),
+        ch: make(chan *block, chanSize),
     }
 }
 
 func (b *blackHoleSink) start() {
     go func() {
         for incomingBlock := range b.ch {
             *incomingBlock = block{}
-            b.pool.Put(incomingBlock)
+            blockPool.Put(incomingBlock)
         }
     }()
 }
@@ -74,11 +72,12 @@ func BenchmarkWriter(b *testing.B) {
     defer log.Scope(b).Close(b)
 
     ctx := context.Background()
+    st := cluster.MakeTestingClusterSettings()
 
-    run := func(b *testing.B, sink blockSink, blockPool *sync.Pool, numOfConcurrentWriter int) {
+    run := func(b *testing.B, sink blockSink, numOfConcurrentWriter int) {
         starter := make(chan struct{})
 
-        w := newWriter(sink, blockPool)
+        w := newWriter(st, sink)
 
         b.ResetTimer()
         b.SetBytes(blockSize * entrySize)
@@ -109,36 +108,30 @@
 
     type testSinkType struct {
         name string
-        new  func() (_ blockSink, _ *sync.Pool, cleanup func())
+        new  func() (_ blockSink, cleanup func())
     }
 
     sinkTypes := []testSinkType{
         {
             name: "blackHole",
-            new: func() (_ blockSink, _ *sync.Pool, cleanup func()) {
-                blockPool := &sync.Pool{
-                    New: func() interface{} {
-                        return &block{}
-                    },
-                }
-
-                blackHole := newBlackHoleSink(blockPool, channelSize)
+            new: func() (_ blockSink, cleanup func()) {
+                blackHole := newBlackHoleSink(channelSize)
                 blackHole.start()
 
-                return blackHole, blockPool, blackHole.stop
+                return blackHole, blackHole.stop
             },
         },
         {
             name: "real",
-            new: func() (_ blockSink, _ *sync.Pool, cleanup func()) {
+            new: func() (_ blockSink, cleanup func()) {
                 st := cluster.MakeTestingClusterSettings()
                 metrics := NewMetrics()
                 realSink := NewTxnIDCache(st, &metrics)
 
                 stopper := stop.NewStopper()
                 realSink.Start(ctx, stopper)
 
-                return realSink, realSink.blockPool, func() {
+                return realSink, func() {
                     stopper.Stop(ctx)
                 }
             },
@@ -149,12 +142,59 @@
         b.Run(fmt.Sprintf("sinkType=%s", sinkType.name), func(b *testing.B) {
             for _, numOfConcurrentWriter := range []int{1, 24, 48, 64, 92, 128} {
                 b.Run(fmt.Sprintf("concurrentWriter=%d", numOfConcurrentWriter), func(b *testing.B) {
-                    sink, blockPool, cleanup := sinkType.new()
+                    sink, cleanup := sinkType.new()
                     defer cleanup()
 
-                    run(b, sink, blockPool, numOfConcurrentWriter)
+                    run(b, sink, numOfConcurrentWriter)
                 })
             }
         })
     }
 }
+
+type counterSink struct {
+    numOfRecord int
+}
+
+var _ blockSink = &counterSink{}
+
+func (c *counterSink) push(block *block) {
+    for i := 0; i < blockSize; i++ {
+        if !block[i].valid() {
+            break
+        }
+        c.numOfRecord++
+    }
+}
+
+func TestTxnIDCacheCanBeDisabledViaClusterSetting(t *testing.T) {
+    st := cluster.MakeTestingClusterSettings()
+    ctx := context.Background()
+
+    sink := &counterSink{}
+    w := newWriter(st, sink)
+    w.Record(ResolvedTxnID{
+        TxnID: uuid.FastMakeV4(),
+    })
+
+    w.Flush()
+    require.Equal(t, 1, sink.numOfRecord)
+
+    // This should disable txn id cache.
+    MaxSize.Override(ctx, &st.SV, 0)
+
+    w.Record(ResolvedTxnID{
+        TxnID: uuid.FastMakeV4(),
+    })
+    w.Flush()
+    require.Equal(t, 1, sink.numOfRecord)
+
+    // This should re-enable txn id cache.
+    MaxSize.Override(ctx, &st.SV, MaxSize.Default())
+
+    w.Record(ResolvedTxnID{
+        TxnID: uuid.FastMakeV4(),
+    })
+    w.Flush()
+    require.Equal(t, 2, sink.numOfRecord)
+}