Skip to content

Commit

Permalink
VStreamer: add throttled logs when row/result/vstreamers get throttle…
Browse files Browse the repository at this point in the history
…d. (#14936)

Signed-off-by: Rohit Nayak <rohit@planetscale.com>
  • Loading branch information
rohit-nayak-ps committed Mar 15, 2024
1 parent dc72e61 commit 2ee5946
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 9 deletions.
6 changes: 6 additions & 0 deletions go/vt/vttablet/tabletserver/vstreamer/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ type Engine struct {
throttlerClient *throttle.Client
}

const throttledLoggerInterval = 5 * time.Minute

// NewEngine creates a new Engine.
// Initialization sequence is: NewEngine->InitDBConfig->Open.
// Open and Close can be called multiple times and are idempotent.
Expand Down Expand Up @@ -149,6 +151,10 @@ func NewEngine(env tabletenv.Env, ts srvtopo.Server, se *schema.Engine, lagThrot
return vse
}

func (vse *Engine) GetTabletInfo() string {
return fmt.Sprintf("%s/%s/%s", vse.cell, vse.keyspace, vse.shard)
}

// InitDBConfig initializes the target parameters for the Engine.
func (vse *Engine) InitDBConfig(keyspace, shard string) {
vse.keyspace = keyspace
Expand Down
7 changes: 6 additions & 1 deletion go/vt/vttablet/tabletserver/vstreamer/resultstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ import (

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/dbconfigs"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

// resultStreamer streams the results of the requested query
Expand Down Expand Up @@ -97,6 +99,8 @@ func (rs *resultStreamer) Stream() error {

response := &binlogdatapb.VStreamResultsResponse{}
byteCount := 0
loggerName := fmt.Sprintf("%s (%v)", rs.vse.GetTabletInfo(), rs.tableName)
logger := logutil.NewThrottledLogger(loggerName, throttledLoggerInterval)
for {
select {
case <-rs.ctx.Done():
Expand All @@ -106,6 +110,7 @@ func (rs *resultStreamer) Stream() error {

// check throttler.
if !rs.vse.throttlerClient.ThrottleCheckOKOrWaitAppName(rs.ctx, throttlerapp.ResultStreamerName) {
logger.Infof("throttled.")
continue
}

Expand Down
10 changes: 7 additions & 3 deletions go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,16 @@ import (
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet"
"vitess.io/vitess/go/vt/vttablet/tabletserver/schema"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

var (
Expand Down Expand Up @@ -394,6 +396,7 @@ func (rs *rowStreamer) streamQuery(send func(*binlogdatapb.VStreamRowsResponse)
filtered := make([]sqltypes.Value, len(rs.plan.ColExprs))
lastpk := make([]sqltypes.Value, len(rs.pkColumns))
byteCount := 0
logger := logutil.NewThrottledLogger(rs.vse.GetTabletInfo(), throttledLoggerInterval)
for {
if rs.ctx.Err() != nil {
log.Infof("Stream ended because of ctx.Done")
Expand All @@ -405,6 +408,7 @@ func (rs *rowStreamer) streamQuery(send func(*binlogdatapb.VStreamRowsResponse)
throttleResponseRateLimiter.Do(func() error {
return safeSend(&binlogdatapb.VStreamRowsResponse{Throttled: true})
})
logger.Infof("throttled.")
continue
}

Expand Down
14 changes: 9 additions & 5 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,25 @@ import (

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/mysql"
mysqlbinlog "vitess.io/vitess/go/mysql/binlog"
"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/vt/binlog"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
vtschema "vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet"
"vitess.io/vitess/go/vt/vttablet/tabletserver/schema"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

mysqlbinlog "vitess.io/vitess/go/mysql/binlog"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
vtschema "vitess.io/vitess/go/vt/schema"
)

const (
Expand Down Expand Up @@ -299,6 +301,7 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
}
}

logger := logutil.NewThrottledLogger(vs.vse.GetTabletInfo(), throttledLoggerInterval)
throttleEvents := func(throttledEvents chan mysql.BinlogEvent) {
throttledHeartbeatsRateLimiter := timer.NewRateLimiter(HeartbeatTime)
defer throttledHeartbeatsRateLimiter.Stop()
Expand All @@ -316,6 +319,7 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
return injectHeartbeat(true)
})
// we won't process events, until we're no longer throttling
logger.Infof("throttled.")
continue
}
select {
Expand Down

0 comments on commit 2ee5946

Please sign in to comment.