diff --git a/go/vt/vttablet/tabletserver/vstreamer/engine.go b/go/vt/vttablet/tabletserver/vstreamer/engine.go index 398f7b4e27e..7a0d1aaaf3d 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/engine.go +++ b/go/vt/vttablet/tabletserver/vstreamer/engine.go @@ -105,6 +105,8 @@ type Engine struct { throttlerClient *throttle.Client } +const throttledLoggerInterval = 5 * time.Minute + // NewEngine creates a new Engine. // Initialization sequence is: NewEngine->InitDBConfig->Open. // Open and Close can be called multiple times and are idempotent. @@ -149,6 +151,10 @@ func NewEngine(env tabletenv.Env, ts srvtopo.Server, se *schema.Engine, lagThrot return vse } +func (vse *Engine) GetTabletInfo() string { + return fmt.Sprintf("%s/%s/%s", vse.cell, vse.keyspace, vse.shard) +} + // InitDBConfig initializes the target parameters for the Engine. func (vse *Engine) InitDBConfig(keyspace, shard string) { vse.keyspace = keyspace diff --git a/go/vt/vttablet/tabletserver/vstreamer/resultstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/resultstreamer.go index 834b8b88378..4632bea672b 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/resultstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/resultstreamer.go @@ -23,9 +23,11 @@ import ( "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/dbconfigs" - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) // resultStreamer streams the results of the requested query @@ -97,6 +99,8 @@ func (rs *resultStreamer) Stream() error { response := &binlogdatapb.VStreamResultsResponse{} byteCount := 0 + loggerName := fmt.Sprintf("%s (%v)", rs.vse.GetTabletInfo(), rs.tableName) + logger := logutil.NewThrottledLogger(loggerName, throttledLoggerInterval) for { select { case <-rs.ctx.Done(): @@ -106,6 +110,7 @@ func (rs *resultStreamer) Stream() error { // check throttler. if !rs.vse.throttlerClient.ThrottleCheckOKOrWaitAppName(rs.ctx, throttlerapp.ResultStreamerName) { + logger.Infof("throttled.") continue } diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go index 44f25067dfe..ac2b2285e27 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go @@ -31,14 +31,16 @@ import ( "vitess.io/vitess/go/timer" "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/log" - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" - querypb "vitess.io/vitess/go/vt/proto/query" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) var ( @@ -394,6 +396,7 @@ func (rs *rowStreamer) streamQuery(send func(*binlogdatapb.VStreamRowsResponse) filtered := make([]sqltypes.Value, len(rs.plan.ColExprs)) lastpk := make([]sqltypes.Value, len(rs.pkColumns)) byteCount := 0 + logger := logutil.NewThrottledLogger(rs.vse.GetTabletInfo(), throttledLoggerInterval) for { if rs.ctx.Err() != nil { log.Infof("Stream ended because of ctx.Done") @@ -405,6 +408,7 @@ func (rs *rowStreamer) streamQuery(send func(*binlogdatapb.VStreamRowsResponse) throttleResponseRateLimiter.Do(func() error { return safeSend(&binlogdatapb.VStreamRowsResponse{Throttled: true}) }) + logger.Infof("throttled.") continue } diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index a1a69332a5e..3aede20f650 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -27,7 +27,6 @@ import ( "vitess.io/vitess/go/constants/sidecar" "vitess.io/vitess/go/mysql" - mysqlbinlog "vitess.io/vitess/go/mysql/binlog" "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/sqltypes" @@ -35,15 +34,18 @@ import ( "vitess.io/vitess/go/vt/binlog" "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/log" - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" - querypb "vitess.io/vitess/go/vt/proto/query" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" - vtschema "vitess.io/vitess/go/vt/schema" + "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" + + mysqlbinlog "vitess.io/vitess/go/mysql/binlog" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + vtschema "vitess.io/vitess/go/vt/schema" ) const ( @@ -299,6 +301,7 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog } } + logger := logutil.NewThrottledLogger(vs.vse.GetTabletInfo(), throttledLoggerInterval) throttleEvents := func(throttledEvents chan mysql.BinlogEvent) { throttledHeartbeatsRateLimiter := timer.NewRateLimiter(HeartbeatTime) defer throttledHeartbeatsRateLimiter.Stop() @@ -316,6 +319,7 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog return injectHeartbeat(true) }) // we won't process events, until we're no longer throttling + logger.Infof("throttled.") continue } select {