Skip to content

Commit

Permalink
Source max ingest rate (#519)
Browse files Browse the repository at this point in the history
Introduce ingest rate limiter on source and remove the one on load client
  • Loading branch information
purplefox authored Aug 16, 2022
1 parent 92b882a commit b77ddf4
Show file tree
Hide file tree
Showing 14 changed files with 135 additions and 138 deletions.
60 changes: 36 additions & 24 deletions command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package command

import (
"fmt"
log "github.com/sirupsen/logrus"
"github.com/squareup/pranadb/conf"
"github.com/squareup/pranadb/protos/squareup/cash/pranadb/v1/clustermsgs"
"github.com/squareup/pranadb/remoting"
Expand Down Expand Up @@ -109,28 +110,32 @@ func (e *Executor) ExecuteSQLStatement(execCtx *execctx.ExecutionContext, sql st
}
return exec.Empty, nil
case ast.Create != nil && ast.Create.MaterializedView != nil:
sequences, err := e.generateTableIDSequences(3)
if err != nil {
return nil, errors.WithStack(err)
}
command, err := NewOriginatingCreateMVCommand(e, execCtx.Planner(), execCtx.Schema, sql, sequences, ast.Create.MaterializedView)
if err != nil {
return nil, errors.WithStack(err)
}
if err := e.executeCommandWithRetry(command); err != nil {
if err := e.executeCommandWithRetry(func() (DDLCommand, error) {
sequences, err := e.generateTableIDSequences(3)
if err != nil {
return nil, errors.WithStack(err)
}
command, err := NewOriginatingCreateMVCommand(e, execCtx.Planner(), execCtx.Schema, sql, sequences, ast.Create.MaterializedView)
if err != nil {
return nil, errors.WithStack(err)
}
return command, nil
}); err != nil {
return nil, errors.WithStack(err)
}
return exec.Empty, nil
case ast.Create != nil && ast.Create.Index != nil:
sequences, err := e.generateTableIDSequences(1)
if err != nil {
return nil, errors.WithStack(err)
}
command, err := NewOriginatingCreateIndexCommand(e, execCtx.Planner(), execCtx.Schema, sql, sequences, ast.Create.Index)
if err != nil {
return nil, errors.WithStack(err)
}
if err := e.executeCommandWithRetry(command); err != nil {
if err := e.executeCommandWithRetry(func() (DDLCommand, error) {
sequences, err := e.generateTableIDSequences(1)
if err != nil {
return nil, errors.WithStack(err)
}
command, err := NewOriginatingCreateIndexCommand(e, execCtx.Planner(), execCtx.Schema, sql, sequences, ast.Create.Index)
if err != nil {
return nil, errors.WithStack(err)
}
return command, nil
}); err != nil {
return nil, errors.WithStack(err)
}
return exec.Empty, nil
Expand Down Expand Up @@ -184,8 +189,8 @@ func (e *Executor) ExecuteSQLStatement(execCtx *execctx.ExecutionContext, sql st
return nil, errors.WithStack(err)
}
return exec.Empty, nil
case ast.ConsumerRate != nil:
if err := e.execConsumerRate(execCtx, ast.ConsumerRate.SourceName, ast.ConsumerRate.Rate); err != nil {
case ast.SourceSetMaxRate != nil:
if err := e.execSetMaxSourceIngestRate(execCtx, ast.SourceSetMaxRate.SourceName, ast.SourceSetMaxRate.Rate); err != nil {
return nil, errors.WithStack(err)
}
return exec.Empty, nil
Expand All @@ -208,13 +213,20 @@ func (e *Executor) GetPullEngine() *pull.Engine {
return e.pullEngine
}

func (e *Executor) executeCommandWithRetry(command DDLCommand) error {
func (e *Executor) executeCommandWithRetry(commandFactory func() (DDLCommand, error)) error {
start := time.Now()
for {
err := e.ddlRunner.RunCommand(command)
command, err := commandFactory()
if err != nil {
return err
}
log.Debugf("executing command %s with potential retry", command.SQL())
err = e.ddlRunner.RunCommand(command)
if err != nil {
log.Errorf("failed to run command %s %v", command.SQL(), err)
var perr errors.PranaError
if errors.As(err, &perr) && perr.Code == errors.DdlRetry {
log.Debugf("It is a ddl retry - will retry it after a short delay %s", command.SQL())
// Some DDL commands like create MV or index can return DdlRetry if they fail because Raft
// leadership changed - in this case we retry rather than returning an error as this can be transient
// e.g. cluster is starting up or node is being rolled
Expand Down Expand Up @@ -362,8 +374,8 @@ func (e *Executor) execDescribe(execCtx *execctx.ExecutionContext, tableName str
return describeRows(tableInfo)
}

func (e *Executor) execConsumerRate(execCtx *execctx.ExecutionContext, sourceName string, rate int64) error {
return e.ddlClient.Broadcast(&clustermsgs.ConsumerSetRate{
func (e *Executor) execSetMaxSourceIngestRate(execCtx *execctx.ExecutionContext, sourceName string, rate int64) error {
return e.ddlClient.Broadcast(&clustermsgs.SourceSetMaxIngestRate{
SchemaName: execCtx.Schema.Name,
SourceName: sourceName,
Rate: rate,
Expand Down
18 changes: 9 additions & 9 deletions command/parser/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,19 +188,19 @@ type Show struct {
TableName string `("ON" @Ident)?`
}

type ConsumerRate struct {
type SourceSetMaxRate struct {
SourceName string `@Ident`
Rate int64 `@Number`
}

// AST root.
type AST struct {
Select string // Unaltered SELECT statement, if any.
Use string `( "USE" @Ident`
Drop *Drop ` | "DROP" @@ `
Create *Create ` | "CREATE" @@ `
Show *Show ` | "SHOW" @@ `
Describe string ` | "DESCRIBE" @Ident `
ConsumerRate *ConsumerRate ` | "CONSUMER" "RATE" @@ `
ResetDdl string ` | "RESET" "DDL" @Ident ) ';'?`
Select string // Unaltered SELECT statement, if any.
Use string `( "USE" @Ident`
Drop *Drop ` | "DROP" @@ `
Create *Create ` | "CREATE" @@ `
Show *Show ` | "SHOW" @@ `
Describe string ` | "DESCRIBE" @Ident `
SourceSetMaxRate *SourceSetMaxRate ` | "SOURCE" "SET" "MAX" "RATE" @@ `
ResetDdl string ` | "RESET" "DDL" @Ident ) ';'?`
}
3 changes: 0 additions & 3 deletions kafka/cflt_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,3 @@ func (cmp *ConfluentMessageProvider) Start() error {
cmp.consumer = consumer
return nil
}

func (cmp *ConfluentMessageProvider) SetMaxRate(rate int) {
}
1 change: 0 additions & 1 deletion kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ type MessageProvider interface {
Start() error
Close() error
SetRebalanceCallback(callback RebalanceCallback)
SetMaxRate(rate int)
}

type Message struct {
Expand Down
38 changes: 0 additions & 38 deletions kafka/load/load_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/squareup/pranadb/errors"
"github.com/squareup/pranadb/kafka"
"github.com/squareup/pranadb/msggen"
"go.uber.org/ratelimit"
"math"
"math/rand"
"strings"
Expand All @@ -24,7 +23,6 @@ type LoadClientMessageProviderFactory struct {
partitionsStart int
nextPartition int
properties map[string]string
maxRate int
maxMessagesPerConsumer int64
uniqueIDsPerPartition int64
messageGeneratorName string
Expand All @@ -34,7 +32,6 @@ type LoadClientMessageProviderFactory struct {

const (
produceTimeout = 100 * time.Millisecond
maxRatePropName = "prana.loadclient.maxrateperconsumer"
partitionsPerConsumerPropName = "prana.loadclient.partitionsperconsumer"
uniqueIDsPerPartitionPropName = "prana.loadclient.uniqueidsperpartition"
maxMessagesPerConsumerPropName = "prana.loadclient.maxmessagesperconsumer"
Expand All @@ -51,10 +48,6 @@ func NewMessageProviderFactory(bufferSize int, numConsumersPerSource int, nodeID
}
partitionsPerNode := numConsumersPerSource * partitionsPerConsumer
partitionsStart := nodeID * partitionsPerNode
maxRate, err := common.GetOrDefaultIntProperty(maxRatePropName, properties, -1)
if err != nil {
return nil, err
}
uniqueIDsPerPartition, err := common.GetOrDefaultIntProperty(uniqueIDsPerPartitionPropName, properties, math.MaxInt64)
if err != nil {
return nil, err
Expand All @@ -74,7 +67,6 @@ func NewMessageProviderFactory(bufferSize int, numConsumersPerSource int, nodeID
partitionsStart: partitionsStart,
nextPartition: partitionsStart,
properties: properties,
maxRate: maxRate,
uniqueIDsPerPartition: int64(uniqueIDsPerPartition),
maxMessagesPerConsumer: int64(maxMessagesPerConsumer),
messageGeneratorName: msgGeneratorName,
Expand Down Expand Up @@ -104,10 +96,6 @@ func (l *LoadClientMessageProviderFactory) NewMessageProvider() (kafka.MessagePr
for i, partitionID := range partitions {
offsets[i] = l.committedOffsets[partitionID] + 1
}
var rl ratelimit.Limiter
if l.maxRate > 0 {
rl = ratelimit.New(l.maxRate)
}
rnd := rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
msgGen, err := l.getMessageGenerator(l.messageGeneratorName)
if err != nil {
Expand All @@ -119,7 +107,6 @@ func (l *LoadClientMessageProviderFactory) NewMessageProvider() (kafka.MessagePr
partitions: partitions,
numPartitions: len(partitions),
offsets: offsets,
rateLimiter: rl,
uniqueIDsPerPartition: l.uniqueIDsPerPartition,
maxMessages: l.maxMessagesPerConsumer,
rnd: rnd,
Expand Down Expand Up @@ -153,12 +140,10 @@ type LoadClientMessageProvider struct {
partitions []int32
offsets []int64
sequence int64
rateLimiter ratelimit.Limiter
uniqueIDsPerPartition int64
maxMessages int64
msgGenerator msggen.MessageGenerator
rnd *rand.Rand
limiterLock sync.Mutex
msgLock sync.Mutex
committedOffsets map[int32]int64
}
Expand Down Expand Up @@ -204,35 +189,12 @@ func (l *LoadClientMessageProvider) Close() error {
func (l *LoadClientMessageProvider) SetRebalanceCallback(callback kafka.RebalanceCallback) {
}

func (l *LoadClientMessageProvider) SetMaxRate(rate int) {
l.limiterLock.Lock()
defer l.limiterLock.Unlock()
if rate == 0 {
return
} else if rate == -1 {
l.rateLimiter = nil
} else {
l.rateLimiter = ratelimit.New(rate)
}
}

func (l *LoadClientMessageProvider) getLimiter() ratelimit.Limiter {
// This lock should almost always be uncontended so perf should be ok
l.limiterLock.Lock()
defer l.limiterLock.Unlock()
return l.rateLimiter
}

func (l *LoadClientMessageProvider) genLoop() {
var msgCount int64
var msg *kafka.Message
for l.running.Get() && msgCount < l.maxMessages {
if msg == nil {
var err error
limiter := l.getLimiter()
if limiter != nil {
limiter.Take()
}
msg, err = l.genMessage()
if err != nil {
log.Errorf("failed to generate message %+v", err)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@


6squareup/cash/pranadb/clustermsgs/v1/clustermsgs.proto$squareup.cash.pranadb.clustermsgs.v1"�
DDLStatementInfo.
Expand Down Expand Up @@ -32,8 +32,8 @@ schemaName"
shard_id (RshardId!
request_body ( R requestBody":
ClusterReadResponse#
response_body ( R responseBody"g
ConsumerSetRate
response_body ( R responseBody"n
SourceSetMaxIngestRate
schema_name ( R
schemaName
source_name ( R
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ message ClusterReadResponse {
bytes response_body = 1;
}

message ConsumerSetRate {
message SourceSetMaxIngestRate {
string schema_name = 1;
string source_name = 2;
int64 rate = 3;
Expand Down
Loading

0 comments on commit b77ddf4

Please sign in to comment.