Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nsqd: channel sampling #223

Closed
wants to merge 9 commits into from
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,6 @@ _testmain.go
*.exe

profile

# Vim files
*.sw[op]
2 changes: 1 addition & 1 deletion nsq/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const (
const DefaultClientTimeout = 60 * time.Second

var validTopicNameRegex = regexp.MustCompile(`^[\.a-zA-Z0-9_-]+$`)
var validChannelNameRegex = regexp.MustCompile(`^[\.a-zA-Z0-9_-]+(#ephemeral)?$`)
var validChannelNameRegex = regexp.MustCompile(`^[\.a-zA-Z0-9_-]+(#[\.;a-zA-Z0-9_\-=]+)?$`)

// IsValidTopicName checks a topic name for correctness
func IsValidTopicName(name string) bool {
Expand Down
85 changes: 76 additions & 9 deletions nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/bitly/nsq/util/pqueue"
"log"
"math"
"math/rand"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -44,7 +46,8 @@ type Channel struct {
notifier Notifier
options *nsqdOptions

backend BackendQueue
backend BackendQueue
properties channelProperties

incomingMsgChan chan *nsq.Message
memoryMsgChan chan *nsq.Message
Expand All @@ -54,11 +57,10 @@ type Channel struct {
exitFlag int32

// state tracking
clients []Consumer
paused int32
ephemeralChannel bool
deleteCallback func(*Channel)
deleter sync.Once
clients []Consumer
paused int32
deleteCallback func(*Channel)
deleter sync.Once

// TODO: these can be DRYd up
deferredMessages map[nsq.MessageID]*pqueue.Item
Expand All @@ -81,6 +83,12 @@ type inFlightMessage struct {
ts time.Time
}

// Channel property struct
type channelProperties struct {
ephemeralChannel bool
sampleRate int32
}

// NewChannel creates a new instance of the Channel type and returns a pointer
func NewChannel(topicName string, channelName string, options *nsqdOptions,
notifier Notifier, deleteCallback func(*Channel)) *Channel {
Expand All @@ -101,8 +109,14 @@ func NewChannel(topicName string, channelName string, options *nsqdOptions,

c.initPQ()

if strings.HasSuffix(channelName, "#ephemeral") {
c.ephemeralChannel = true
// Split the channel name into properties if any are passed
channelProperties := strings.SplitN(channelName, "#", 2)
if len(channelProperties) > 1 {
c.setChannelProperties(channelProperties[1])
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this section probably deserves to be its own function


// Create the channels
if c.properties.ephemeralChannel == true {
c.backend = NewDummyBackendQueue()
} else {
c.backend = NewDiskQueue(backendName, options.dataPath, options.maxBytesPerFile,
Expand All @@ -120,6 +134,52 @@ func NewChannel(topicName string, channelName string, options *nsqdOptions,
return c
}

// Set the channel properties for the channel by parsing the channel name string properties
func (c *Channel) setChannelProperties(properties string) {
channelProperties := strings.Split(properties, ";")
// Iterate over all properties and set the valid ones on the channel
for _, property := range channelProperties {
var propertyField = ""
var propertyValue = ""
if strings.Contains(property, "=") {
p := strings.Split(property, "=")
propertyField = p[0]
propertyValue = p[1]
} else {
propertyField = property
propertyValue = "true"
}
switch strings.ToLower(propertyField) {
case "ephemeral":
if propertyValue == "true" {
c.properties.ephemeralChannel = true
} else {
c.properties.ephemeralChannel = false
}
case "samplerate":
c.properties.sampleRate = c.validateSampleRate(propertyValue)
}
}
}

// Need to validate the sampleRate passed in on channel instantiation
func (c *Channel) validateSampleRate(dirtySampleRate string) int32 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

two things

  1. I think sample rate should be an int where 1 <= sampleRate <= 99, so we dont have to deal with floats on the server side
  2. this should return an error too so that we can propagate back up the chain if things weren't correct (and fail fatally). unfortunately, this will need a bit of refactoring because we currently assume that NewChannel cannot fail (and it would need to propagate the error as well)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is making the assumption that everyone will want to sample at a resolution of 1%. What about the possibility of someone wanting to sample at 33.33%? This makes send when people are doing billions of messages, as that extra decimal can make the difference of hundreds of thousands of messages. I would lean toward only allowing floats and not allowing the 0-100.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As it was implemented before it was the same resolution (it converted to int on the way out).

I personally don't care that much... a percentage is a percentage is a percentage... losing fractions of a percent granularity is meh IMO.

sampleRate, err := strconv.ParseInt(dirtySampleRate, 10, 0)
// If we get an error when trying to ParseInt, fail on channel creation
if err != nil {
log.Printf("Float conversion error on %s: Setting sample rate to 0", dirtySampleRate)
}

// If 1<rate<100, consider it a number and use that (100 means no sampling)
if (int32(sampleRate) >= int32(1)) && (int32(sampleRate) <= int32(100)) {
return int32(sampleRate)
} else {
// Here we fail on channel creation because the sampleRate makes no sense
}

return int32(0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of adding this extra return you can just drop the last else case... cleaner

}

func (c *Channel) initPQ() {
pqSize := int(math.Max(1, float64(c.options.memQueueSize)/10))

Expand Down Expand Up @@ -392,7 +452,7 @@ func (c *Channel) RemoveClient(client Consumer) {
c.clients = finalClients
}

if len(c.clients) == 0 && c.ephemeralChannel == true {
if len(c.clients) == 0 && c.properties.ephemeralChannel == true {
go c.deleter.Do(func() { c.deleteCallback(c) })
}
}
Expand Down Expand Up @@ -572,6 +632,13 @@ func (c *Channel) messagePump() {
goto exit
}

// If we are sampling, discard the sampled messages here
if c.properties.sampleRate > 0 {
if rand.Int31n(100) > c.properties.sampleRate {
continue
}
}

msg.Attempts++

atomic.StoreInt32(&c.bufferedCount, 1)
Expand Down
45 changes: 45 additions & 0 deletions nsqd/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,3 +155,48 @@ func TestChannelEmptyConsumer(t *testing.T) {
}

}

// ensure that we can push $n messages through a topic and get the sampled
// percentage of those messages out the channel
func TestPutMessageOnSampledChannel(t *testing.T) {
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stdout)

nsqd = NewNSQd(1, NewNsqdOptions())
defer nsqd.Exit()

topicName := "test_put_message" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetTopic(topicName)
channel := topic.GetChannel("sampled_channel#sampleRate=25")
assert.Equal(t, channel.properties.sampleRate, int32(25))
log.Println("Channel Sample Rate: ", channel.properties.sampleRate)

var inFlightMessageCount int32

for i := 0; i < 100; i++ {
var id nsq.MessageID
msg := nsq.NewMessage(id, []byte("test"))
topic.PutMessage(msg)
}

// Churn messages until the queue is empty and see how many messages are inFlight
for channel.Depth() > int64(0) {
select {
case outputMsg := <-channel.clientMsgChan:
// Just to use outputMsg
assert.Equal(t, outputMsg.Id, outputMsg.Id)
inFlightMessageCount++
case <-time.After(10 * time.Millisecond):
}
}

// Check to see if inFlightMessageCount is roughly between 20-30
// Statisical anomalies of randomness can allow for slight deviations
if (inFlightMessageCount > 20) && (inFlightMessageCount < 30) {
assert.Equal(t, inFlightMessageCount, inFlightMessageCount)
} else {
assert.NotEqual(t, inFlightMessageCount, inFlightMessageCount)
}

assert.NotEqual(t, inFlightMessageCount, int32(100))
}
4 changes: 4 additions & 0 deletions nsqd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"hash/crc32"
"io"
"log"
"math/rand"
"net"
"os"
"os/signal"
Expand Down Expand Up @@ -136,6 +137,9 @@ func main() {
nsqd.httpAddr = httpAddr
nsqd.lookupdTCPAddrs = lookupdTCPAddrs

// Set the random seed for the future
rand.Seed(time.Now().UTC().UnixNano())

nsqd.LoadMetadata()
err = nsqd.PersistMetadata()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (n *NSQd) PersistMetadata() error {
topic.Lock()
for _, channel := range topic.channelMap {
channel.Lock()
if !channel.ephemeralChannel {
if !channel.properties.ephemeralChannel {
channelData := make(map[string]interface{})
channelData["name"] = channel.name
channelData["paused"] = channel.IsPaused()
Expand Down