Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup panics in txthrottler, reorder for readability #12901

Merged
merged 8 commits into from
May 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 62 additions & 69 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,14 @@ limitations under the License.
package txthrottler

import (
"context"
"fmt"
"strings"
"sync"
"time"

"google.golang.org/protobuf/proto"

"google.golang.org/protobuf/encoding/prototext"

"context"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/discovery"
Expand All @@ -40,6 +38,61 @@ import (
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// These vars store the functions used to create the topo server, healthcheck,
// topology watchers and go/vt/throttler. These are provided here so that they can be overridden
// in tests to generate mocks.
type healthCheckFactoryFunc func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck
type topologyWatcherFactoryFunc func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface
type throttlerFactoryFunc func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error)

var (
healthCheckFactory healthCheckFactoryFunc
topologyWatcherFactory topologyWatcherFactoryFunc
throttlerFactory throttlerFactoryFunc
)

func init() {
resetTxThrottlerFactories()
}

func resetTxThrottlerFactories() {
healthCheckFactory = func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck {
return discovery.NewHealthCheck(context.Background(), discovery.DefaultHealthCheckRetryDelay, discovery.DefaultHealthCheckTimeout, topoServer, cell, strings.Join(cellsToWatch, ","))
}
topologyWatcherFactory = func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface {
return discovery.NewCellTabletsWatcher(context.Background(), topoServer, hc, discovery.NewFilterByKeyspace([]string{keyspace}), cell, refreshInterval, true, topoReadConcurrency)
}
throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) {
return throttler.NewThrottlerFromConfig(name, unit, threadCount, maxRate, maxReplicationLagConfig, time.Now)
}
}

// ThrottlerInterface defines the public interface that is implemented by go/vt/throttler.Throttler
// It is only used here to allow mocking out a throttler object.
type ThrottlerInterface interface {
Throttle(threadID int) time.Duration
ThreadFinished(threadID int)
Close()
MaxRate() int64
SetMaxRate(rate int64)
RecordReplicationLag(time time.Time, th *discovery.TabletHealth)
GetConfiguration() *throttlerdatapb.Configuration
UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error
ResetConfiguration()
}

// TopologyWatcherInterface defines the public interface that is implemented by
// discovery.LegacyTopologyWatcher. It is only used here to allow mocking out
// go/vt/discovery.LegacyTopologyWatcher.
type TopologyWatcherInterface interface {
Start()
Stop()
}

// TxThrottlerName is the name the wrapped go/vt/throttler object will be registered with
// go/vt/throttler.GlobalManager.
const TxThrottlerName = "TransactionThrottler"

timvaillancourt marked this conversation as resolved.
Show resolved Hide resolved
// TxThrottler throttles transactions based on replication lag.
// It's a thin wrapper around the throttler found in vitess/go/vt/throttler.
// It uses a discovery.HealthCheck to send replication-lag updates to the wrapped throttler.
Expand Down Expand Up @@ -85,10 +138,6 @@ type TxThrottler struct {
requestsThrottled *stats.Counter
}

// TxThrottlerName is the name the wrapped go/vt/throttler object will be registered with
// go/vt/throttler.GlobalManager.
const TxThrottlerName = "TransactionThrottler"

// NewTxThrottler tries to construct a TxThrottler from the
// relevant fields in the tabletenv.Config object. It returns a disabled TxThrottler if
// any error occurs.
Expand All @@ -99,10 +148,8 @@ func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) *TxThrottler {
if err != nil {
log.Errorf("Error creating transaction throttler. Transaction throttling will"+
" be disabled. Error: %v", err)
txThrottler, err = newTxThrottler(env, &txThrottlerConfig{enabled: false})
if err != nil {
panic("BUG: Can't create a disabled transaction throttler")
}
// newTxThrottler with disabled config never returns an error
txThrottler, _ = newTxThrottler(env, &txThrottlerConfig{enabled: false})
} else {
log.Infof("Initialized transaction throttler with config: %+v", txThrottler.config)
}
Expand Down Expand Up @@ -152,28 +199,6 @@ type txThrottlerConfig struct {
healthCheckCells []string
}

// ThrottlerInterface defines the public interface that is implemented by go/vt/throttler.Throttler
// It is only used here to allow mocking out a throttler object.
type ThrottlerInterface interface {
Throttle(threadID int) time.Duration
ThreadFinished(threadID int)
Close()
MaxRate() int64
SetMaxRate(rate int64)
RecordReplicationLag(time time.Time, th *discovery.TabletHealth)
GetConfiguration() *throttlerdatapb.Configuration
UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error
ResetConfiguration()
}

// TopologyWatcherInterface defines the public interface that is implemented by
// discovery.LegacyTopologyWatcher. It is only used here to allow mocking out
// go/vt/discovery.LegacyTopologyWatcher.
type TopologyWatcherInterface interface {
Start()
Stop()
}

// txThrottlerState holds the state of an open TxThrottler object.
type txThrottlerState struct {
// throttleMu serializes calls to throttler.Throttler.Throttle(threadId).
Expand All @@ -186,35 +211,6 @@ type txThrottlerState struct {
topologyWatchers []TopologyWatcherInterface
}

// These vars store the functions used to create the topo server, healthcheck,
// topology watchers and go/vt/throttler. These are provided here so that they can be overridden
// in tests to generate mocks.
type healthCheckFactoryFunc func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck
type topologyWatcherFactoryFunc func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface
type throttlerFactoryFunc func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error)

var (
healthCheckFactory healthCheckFactoryFunc
topologyWatcherFactory topologyWatcherFactoryFunc
throttlerFactory throttlerFactoryFunc
)

func init() {
resetTxThrottlerFactories()
}

func resetTxThrottlerFactories() {
healthCheckFactory = func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck {
return discovery.NewHealthCheck(context.Background(), discovery.DefaultHealthCheckRetryDelay, discovery.DefaultHealthCheckTimeout, topoServer, cell, strings.Join(cellsToWatch, ","))
}
topologyWatcherFactory = func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface {
return discovery.NewCellTabletsWatcher(context.Background(), topoServer, hc, discovery.NewFilterByKeyspace([]string{keyspace}), cell, refreshInterval, true, topoReadConcurrency)
}
throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) {
return throttler.NewThrottlerFromConfig(name, unit, threadCount, maxRate, maxReplicationLagConfig, time.Now)
}
}

func newTxThrottler(env tabletenv.Env, config *txThrottlerConfig) (*TxThrottler, error) {
if config.enabled {
// Verify config.
Expand All @@ -235,7 +231,7 @@ func newTxThrottler(env tabletenv.Env, config *txThrottlerConfig) (*TxThrottler,
}

// Open opens the transaction throttler. It must be called prior to 'Throttle'.
func (t *TxThrottler) Open() error {
func (t *TxThrottler) Open() (err error) {
if !t.config.enabled {
return nil
}
Expand All @@ -244,7 +240,6 @@ func (t *TxThrottler) Open() error {
}
log.Info("TxThrottler: opening")
t.throttlerRunning.Set(1)
var err error
t.state, err = newTxThrottlerState(t.config, t.target.Keyspace, t.target.Shard, t.target.Cell)
return err
}
Expand Down Expand Up @@ -273,9 +268,6 @@ func (t *TxThrottler) Throttle() (result bool) {
if !t.config.enabled {
return false
}
if t.state == nil {
panic("BUG: Throttle() called on a closed TxThrottler")
}
result = t.state.throttle()
t.requestsTotal.Add(1)
if result {
Expand Down Expand Up @@ -342,7 +334,8 @@ func createTxThrottlerHealthCheck(config *txThrottlerConfig, result *txThrottler

func (ts *txThrottlerState) throttle() bool {
if ts.throttler == nil {
panic("BUG: throttle called after deallocateResources was called.")
log.Error("throttle called after deallocateResources was called")
timvaillancourt marked this conversation as resolved.
Show resolved Hide resolved
return false
}
// Serialize calls to ts.throttle.Throttle()
ts.throttleMu.Lock()
Expand Down
32 changes: 32 additions & 0 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"

querypb "vitess.io/vitess/go/vt/proto/query"
throttlerdatapb "vitess.io/vitess/go/vt/proto/throttlerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

Expand Down Expand Up @@ -135,3 +136,34 @@ func TestEnabledThrottler(t *testing.T) {
throttler.Close()
assert.Zero(t, throttler.throttlerRunning.Get())
}

func TestNewTxThrottler(t *testing.T) {
config := tabletenv.NewDefaultConfig()
env := tabletenv.NewEnv(config, t.Name())

{
// disabled config
throttler, err := newTxThrottler(env, &txThrottlerConfig{enabled: false})
assert.Nil(t, err)
assert.NotNil(t, throttler)
}
{
// enabled with invalid throttler config
throttler, err := newTxThrottler(env, &txThrottlerConfig{
enabled: true,
throttlerConfig: &throttlerdatapb.Configuration{},
})
assert.NotNil(t, err)
assert.Nil(t, throttler)
}
{
// enabled
throttler, err := newTxThrottler(env, &txThrottlerConfig{
enabled: true,
healthCheckCells: []string{"cell1"},
throttlerConfig: throttler.DefaultMaxReplicationLagModuleConfig().Configuration,
})
assert.Nil(t, err)
assert.NotNil(t, throttler)
}
}