-
Notifications
You must be signed in to change notification settings - Fork 2.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
nsqd: channel sampling #223
Changes from 3 commits
ba160ed
6a18428
30e9eca
d79afe8
6f4dfa9
271e077
a75d82e
9d0066d
9fe4b6d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,8 @@ import ( | |
"github.com/bitly/nsq/util/pqueue" | ||
"log" | ||
"math" | ||
"math/rand" | ||
"strconv" | ||
"strings" | ||
"sync" | ||
"sync/atomic" | ||
|
@@ -45,6 +47,7 @@ type Channel struct { | |
options *nsqdOptions | ||
|
||
backend BackendQueue | ||
params channelParams | ||
|
||
incomingMsgChan chan *nsq.Message | ||
memoryMsgChan chan *nsq.Message | ||
|
@@ -54,11 +57,10 @@ type Channel struct { | |
exitFlag int32 | ||
|
||
// state tracking | ||
clients []Consumer | ||
paused int32 | ||
ephemeralChannel bool | ||
deleteCallback func(*Channel) | ||
deleter sync.Once | ||
clients []Consumer | ||
paused int32 | ||
deleteCallback func(*Channel) | ||
deleter sync.Once | ||
|
||
// TODO: these can be DRYd up | ||
deferredMessages map[nsq.MessageID]*pqueue.Item | ||
|
@@ -81,6 +83,12 @@ type inFlightMessage struct { | |
ts time.Time | ||
} | ||
|
||
// Parameter struct | ||
type channelParams struct { | ||
ephemeralChannel bool | ||
sampleRate int32 | ||
} | ||
|
||
// NewChannel creates a new instance of the Channel type and returns a pointer | ||
func NewChannel(topicName string, channelName string, options *nsqdOptions, | ||
notifier Notifier, deleteCallback func(*Channel)) *Channel { | ||
|
@@ -101,8 +109,36 @@ func NewChannel(topicName string, channelName string, options *nsqdOptions, | |
|
||
c.initPQ() | ||
|
||
if strings.HasSuffix(channelName, "#ephemeral") { | ||
c.ephemeralChannel = true | ||
// Split the channel name into parameters | ||
cParams := strings.SplitN(channelName, "#", 2) | ||
if len(cParams) > 1 { | ||
channelParamsString := strings.Split(cParams[1], ";") | ||
for _, param := range channelParamsString { | ||
var paramField = "" | ||
var paramValue = "" | ||
if strings.Contains(param, "=") { | ||
p := strings.Split(param, "=") | ||
paramField = p[0] | ||
paramValue = p[1] | ||
} else { | ||
paramField = param | ||
paramValue = "true" | ||
} | ||
switch strings.ToLower(paramField) { | ||
case "ephemeral": | ||
if paramValue == "true" { | ||
c.params.ephemeralChannel = true | ||
} else { | ||
c.params.ephemeralChannel = false | ||
} | ||
case "samplerate": | ||
c.params.sampleRate = c.validateSampleRate(paramValue) | ||
} | ||
} | ||
} | ||
|
||
// Create the channels | ||
if c.params.ephemeralChannel == true { | ||
c.backend = NewDummyBackendQueue() | ||
} else { | ||
c.backend = NewDiskQueue(backendName, options.dataPath, options.maxBytesPerFile, | ||
|
@@ -120,6 +156,30 @@ func NewChannel(topicName string, channelName string, options *nsqdOptions, | |
return c | ||
} | ||
|
||
// Need to validate the sampleRate passed in on channel instantiation | ||
func (c *Channel) validateSampleRate(dirtySampleRate string) int32 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. two things
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is making the assumption that everyone will want to sample at a resolution of 1%. What about the possibility of someone wanting to sample at 33.33%? This makes send when people are doing billions of messages, as that extra decimal can make the difference of hundreds of thousands of messages. I would lean toward only allowing floats and not allowing the 0-100. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As it was implemented before it was the same resolution (it converted to int on the way out). I personally don't care that much... a percentage is a percentage is a percentage... losing fractions of a percent granularity is meh IMO. |
||
sampleRateFloat, err := strconv.ParseFloat(dirtySampleRate, 64) | ||
// If we get an error when trying to ParseFloat, we don't want the channel | ||
// creation to fail, so we create a normal, unsampled channel | ||
if err != nil { | ||
log.Printf("Float conversion error on %s: Setting sample rate to 0", dirtySampleRate) | ||
return int32(0) | ||
} | ||
|
||
// If it's between 0.0 and 1.0, consider it a percentage and multiply by 100 | ||
if (float64(sampleRateFloat) > float64(0)) && (float64(sampleRateFloat) <= float64(1)) { | ||
return int32(sampleRateFloat * float64(100)) | ||
// If 1<rate<100, consider it a number and use that (100 means no sampling) | ||
} else if (float64(sampleRateFloat) > float64(1)) && (float64(sampleRateFloat) < float64(100)) { | ||
return int32(sampleRateFloat) | ||
// Fallback is not to sample so we set it to 0 | ||
} else { | ||
return int32(0) | ||
} | ||
|
||
return int32(0) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. instead of adding this extra |
||
} | ||
|
||
func (c *Channel) initPQ() { | ||
pqSize := int(math.Max(1, float64(c.options.memQueueSize)/10)) | ||
|
||
|
@@ -392,7 +452,7 @@ func (c *Channel) RemoveClient(client Consumer) { | |
c.clients = finalClients | ||
} | ||
|
||
if len(c.clients) == 0 && c.ephemeralChannel == true { | ||
if len(c.clients) == 0 && c.params.ephemeralChannel == true { | ||
go c.deleter.Do(func() { c.deleteCallback(c) }) | ||
} | ||
} | ||
|
@@ -572,6 +632,13 @@ func (c *Channel) messagePump() { | |
goto exit | ||
} | ||
|
||
// If we are sampling, discard the sampled messages here | ||
if c.params.sampleRate > 0 { | ||
if rand.Int31n(100) > c.params.sampleRate { | ||
continue | ||
} | ||
} | ||
|
||
msg.Attempts++ | ||
|
||
atomic.StoreInt32(&c.bufferedCount, 1) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this section probably deserves to be its own function