Skip to content

Commit

Permalink
Cleanup panics in txthrottler, reorder for readability (vitessio#12901
Browse files Browse the repository at this point in the history
)

* Cleanup tx_throttler.go

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

* Cleanup tx_throttler.go #2

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

* Fix throttlerFactoryFunc

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

* Undo if-cond consolidation

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

* Undo struct shuffling

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

* prove that disabled config returns nil error

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

* Improve test

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

---------

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>
  • Loading branch information
timvaillancourt committed Apr 16, 2024
1 parent 9f9b0ba commit 02af9bc
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 69 deletions.
131 changes: 62 additions & 69 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,14 @@ limitations under the License.
package txthrottler

import (
"context"
"fmt"
"strings"
"sync"
"time"

"google.golang.org/protobuf/proto"

"google.golang.org/protobuf/encoding/prototext"

"context"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/discovery"
Expand All @@ -40,6 +38,61 @@ import (
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// These vars store the functions used to create the topo server, healthcheck,
// topology watchers and go/vt/throttler. These are provided here so that they can be overridden
// in tests to generate mocks.
type healthCheckFactoryFunc func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck
type topologyWatcherFactoryFunc func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface
type throttlerFactoryFunc func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error)

var (
healthCheckFactory healthCheckFactoryFunc
topologyWatcherFactory topologyWatcherFactoryFunc
throttlerFactory throttlerFactoryFunc
)

func init() {
resetTxThrottlerFactories()
}

func resetTxThrottlerFactories() {
healthCheckFactory = func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck {
return discovery.NewHealthCheck(context.Background(), discovery.DefaultHealthCheckRetryDelay, discovery.DefaultHealthCheckTimeout, topoServer, cell, strings.Join(cellsToWatch, ","))
}
topologyWatcherFactory = func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface {
return discovery.NewCellTabletsWatcher(context.Background(), topoServer, hc, discovery.NewFilterByKeyspace([]string{keyspace}), cell, refreshInterval, true, topoReadConcurrency)
}
throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) {
return throttler.NewThrottlerFromConfig(name, unit, threadCount, maxRate, maxReplicationLagConfig, time.Now)
}
}

// ThrottlerInterface defines the public interface that is implemented by go/vt/throttler.Throttler
// It is only used here to allow mocking out a throttler object.
type ThrottlerInterface interface {
Throttle(threadID int) time.Duration
ThreadFinished(threadID int)
Close()
MaxRate() int64
SetMaxRate(rate int64)
RecordReplicationLag(time time.Time, th *discovery.TabletHealth)
GetConfiguration() *throttlerdatapb.Configuration
UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error
ResetConfiguration()
}

// TopologyWatcherInterface defines the public interface that is implemented by
// discovery.LegacyTopologyWatcher. It is only used here to allow mocking out
// go/vt/discovery.LegacyTopologyWatcher.
type TopologyWatcherInterface interface {
Start()
Stop()
}

// TxThrottlerName is the name the wrapped go/vt/throttler object will be registered with
// go/vt/throttler.GlobalManager.
const TxThrottlerName = "TransactionThrottler"

// TxThrottler throttles transactions based on replication lag.
// It's a thin wrapper around the throttler found in vitess/go/vt/throttler.
// It uses a discovery.HealthCheck to send replication-lag updates to the wrapped throttler.
Expand Down Expand Up @@ -85,10 +138,6 @@ type TxThrottler struct {
requestsThrottled *stats.Counter
}

// TxThrottlerName is the name the wrapped go/vt/throttler object will be registered with
// go/vt/throttler.GlobalManager.
const TxThrottlerName = "TransactionThrottler"

// NewTxThrottler tries to construct a TxThrottler from the
// relevant fields in the tabletenv.Config object. It returns a disabled TxThrottler if
// any error occurs.
Expand All @@ -99,10 +148,8 @@ func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) *TxThrottler {
if err != nil {
log.Errorf("Error creating transaction throttler. Transaction throttling will"+
" be disabled. Error: %v", err)
txThrottler, err = newTxThrottler(env, &txThrottlerConfig{enabled: false})
if err != nil {
panic("BUG: Can't create a disabled transaction throttler")
}
// newTxThrottler with disabled config never returns an error
txThrottler, _ = newTxThrottler(env, &txThrottlerConfig{enabled: false})
} else {
log.Infof("Initialized transaction throttler with config: %+v", txThrottler.config)
}
Expand Down Expand Up @@ -152,28 +199,6 @@ type txThrottlerConfig struct {
healthCheckCells []string
}

// ThrottlerInterface defines the public interface that is implemented by go/vt/throttler.Throttler
// It is only used here to allow mocking out a throttler object.
type ThrottlerInterface interface {
Throttle(threadID int) time.Duration
ThreadFinished(threadID int)
Close()
MaxRate() int64
SetMaxRate(rate int64)
RecordReplicationLag(time time.Time, th *discovery.TabletHealth)
GetConfiguration() *throttlerdatapb.Configuration
UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error
ResetConfiguration()
}

// TopologyWatcherInterface defines the public interface that is implemented by
// discovery.LegacyTopologyWatcher. It is only used here to allow mocking out
// go/vt/discovery.LegacyTopologyWatcher.
type TopologyWatcherInterface interface {
Start()
Stop()
}

// txThrottlerState holds the state of an open TxThrottler object.
type txThrottlerState struct {
// throttleMu serializes calls to throttler.Throttler.Throttle(threadId).
Expand All @@ -186,35 +211,6 @@ type txThrottlerState struct {
topologyWatchers []TopologyWatcherInterface
}

// These vars store the functions used to create the topo server, healthcheck,
// topology watchers and go/vt/throttler. These are provided here so that they can be overridden
// in tests to generate mocks.
type healthCheckFactoryFunc func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck
type topologyWatcherFactoryFunc func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface
type throttlerFactoryFunc func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error)

var (
healthCheckFactory healthCheckFactoryFunc
topologyWatcherFactory topologyWatcherFactoryFunc
throttlerFactory throttlerFactoryFunc
)

func init() {
resetTxThrottlerFactories()
}

func resetTxThrottlerFactories() {
healthCheckFactory = func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck {
return discovery.NewHealthCheck(context.Background(), discovery.DefaultHealthCheckRetryDelay, discovery.DefaultHealthCheckTimeout, topoServer, cell, strings.Join(cellsToWatch, ","))
}
topologyWatcherFactory = func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface {
return discovery.NewCellTabletsWatcher(context.Background(), topoServer, hc, discovery.NewFilterByKeyspace([]string{keyspace}), cell, refreshInterval, true, topoReadConcurrency)
}
throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) {
return throttler.NewThrottlerFromConfig(name, unit, threadCount, maxRate, maxReplicationLagConfig, time.Now)
}
}

func newTxThrottler(env tabletenv.Env, config *txThrottlerConfig) (*TxThrottler, error) {
if config.enabled {
// Verify config.
Expand All @@ -235,7 +231,7 @@ func newTxThrottler(env tabletenv.Env, config *txThrottlerConfig) (*TxThrottler,
}

// Open opens the transaction throttler. It must be called prior to 'Throttle'.
func (t *TxThrottler) Open() error {
func (t *TxThrottler) Open() (err error) {
if !t.config.enabled {
return nil
}
Expand All @@ -244,7 +240,6 @@ func (t *TxThrottler) Open() error {
}
log.Info("TxThrottler: opening")
t.throttlerRunning.Set(1)
var err error
t.state, err = newTxThrottlerState(t.config, t.target.Keyspace, t.target.Shard, t.target.Cell)
return err
}
Expand Down Expand Up @@ -273,9 +268,6 @@ func (t *TxThrottler) Throttle() (result bool) {
if !t.config.enabled {
return false
}
if t.state == nil {
panic("BUG: Throttle() called on a closed TxThrottler")
}
result = t.state.throttle()
t.requestsTotal.Add(1)
if result {
Expand Down Expand Up @@ -342,7 +334,8 @@ func createTxThrottlerHealthCheck(config *txThrottlerConfig, result *txThrottler

func (ts *txThrottlerState) throttle() bool {
if ts.throttler == nil {
panic("BUG: throttle called after deallocateResources was called.")
log.Error("throttle called after deallocateResources was called")
return false
}
// Serialize calls to ts.throttle.Throttle()
ts.throttleMu.Lock()
Expand Down
32 changes: 32 additions & 0 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"

querypb "vitess.io/vitess/go/vt/proto/query"
throttlerdatapb "vitess.io/vitess/go/vt/proto/throttlerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

Expand Down Expand Up @@ -135,3 +136,34 @@ func TestEnabledThrottler(t *testing.T) {
throttler.Close()
assert.Zero(t, throttler.throttlerRunning.Get())
}

func TestNewTxThrottler(t *testing.T) {
config := tabletenv.NewDefaultConfig()
env := tabletenv.NewEnv(config, t.Name())

{
// disabled config
throttler, err := newTxThrottler(env, &txThrottlerConfig{enabled: false})
assert.Nil(t, err)
assert.NotNil(t, throttler)
}
{
// enabled with invalid throttler config
throttler, err := newTxThrottler(env, &txThrottlerConfig{
enabled: true,
throttlerConfig: &throttlerdatapb.Configuration{},
})
assert.NotNil(t, err)
assert.Nil(t, throttler)
}
{
// enabled
throttler, err := newTxThrottler(env, &txThrottlerConfig{
enabled: true,
healthCheckCells: []string{"cell1"},
throttlerConfig: throttler.DefaultMaxReplicationLagModuleConfig().Configuration,
})
assert.Nil(t, err)
assert.NotNil(t, throttler)
}
}

0 comments on commit 02af9bc

Please sign in to comment.