diff --git a/nsqd/channel_test.go b/nsqd/channel_test.go index 11983c651..bc77cef2d 100644 --- a/nsqd/channel_test.go +++ b/nsqd/channel_test.go @@ -87,7 +87,7 @@ func TestInFlightWorker(t *testing.T) { channel := topic.GetChannel("channel") for i := 0; i < count; i++ { - msg := NewMessage(<-nsqd.idChan, []byte("test")) + msg := NewMessage(topic.GenerateID(), []byte("test")) channel.StartInFlightTimeout(msg, 0, opts.MsgTimeout) } @@ -129,7 +129,7 @@ func TestChannelEmpty(t *testing.T) { msgs := make([]*Message, 0, 25) for i := 0; i < 25; i++ { - msg := NewMessage(<-nsqd.idChan, []byte("test")) + msg := NewMessage(topic.GenerateID(), []byte("test")) channel.StartInFlightTimeout(msg, 0, opts.MsgTimeout) msgs = append(msgs, msg) } @@ -167,7 +167,7 @@ func TestChannelEmptyConsumer(t *testing.T) { channel.AddClient(client.ID, client) for i := 0; i < 25; i++ { - msg := NewMessage(<-nsqd.idChan, []byte("test")) + msg := NewMessage(topic.GenerateID(), []byte("test")) channel.StartInFlightTimeout(msg, 0, opts.MsgTimeout) client.SendingMessage() } @@ -206,15 +206,15 @@ func TestChannelHealth(t *testing.T) { channel.backend = &errorBackendQueue{} - msg := NewMessage(<-nsqd.idChan, make([]byte, 100)) + msg := NewMessage(topic.GenerateID(), make([]byte, 100)) err := channel.PutMessage(msg) equal(t, err, nil) - msg = NewMessage(<-nsqd.idChan, make([]byte, 100)) + msg = NewMessage(topic.GenerateID(), make([]byte, 100)) err = channel.PutMessage(msg) equal(t, err, nil) - msg = NewMessage(<-nsqd.idChan, make([]byte, 100)) + msg = NewMessage(topic.GenerateID(), make([]byte, 100)) err = channel.PutMessage(msg) nequal(t, err, nil) @@ -228,7 +228,7 @@ func TestChannelHealth(t *testing.T) { channel.backend = &errorRecoveredBackendQueue{} - msg = NewMessage(<-nsqd.idChan, make([]byte, 100)) + msg = NewMessage(topic.GenerateID(), make([]byte, 100)) err = channel.PutMessage(msg) equal(t, err, nil) diff --git a/nsqd/guid.go b/nsqd/guid.go index 0fa5823a1..4e5ba021e 100644 --- a/nsqd/guid.go +++ b/nsqd/guid.go @@ -1,73 +1,18 @@ package nsqd -// the core algorithm here was borrowed from: -// Blake Mizerany's `noeqd` https://github.com/bmizerany/noeqd -// and indirectly: -// Twitter's `snowflake` https://github.com/twitter/snowflake - -// only minor cleanup and changes to introduce a type, combine the concept -// of workerID + datacenterId into a single identifier, and modify the -// behavior when sequences rollover for our specific implementation needs - import ( "encoding/hex" - "errors" - "time" + "sync/atomic" ) -const ( - workerIDBits = uint64(10) - sequenceBits = uint64(12) - workerIDShift = sequenceBits - timestampShift = sequenceBits + workerIDBits - sequenceMask = int64(-1) ^ (int64(-1) << sequenceBits) - - // ( 2012-10-28 16:23:42 UTC ).UnixNano() >> 20 - twepoch = int64(1288834974288) -) - -var ErrTimeBackwards = errors.New("time has gone backwards") -var ErrSequenceExpired = errors.New("sequence expired") -var ErrIDBackwards = errors.New("ID went backward") - type guid int64 type guidFactory struct { - sequence int64 - lastTimestamp int64 - lastID guid + sequence int64 } -func (f *guidFactory) NewGUID(workerID int64) (guid, error) { - // divide by 1048576, giving pseudo-milliseconds - ts := time.Now().UnixNano() >> 20 - - if ts < f.lastTimestamp { - return 0, ErrTimeBackwards - } - - if f.lastTimestamp == ts { - f.sequence = (f.sequence + 1) & sequenceMask - if f.sequence == 0 { - return 0, ErrSequenceExpired - } - } else { - f.sequence = 0 - } - - f.lastTimestamp = ts - - id := guid(((ts - twepoch) << timestampShift) | - (workerID << workerIDShift) | - f.sequence) - - if id <= f.lastID { - return 0, ErrIDBackwards - } - - f.lastID = id - - return id, nil +func (f *guidFactory) NewGUID() guid { + return guid(atomic.AddInt64(&f.sequence, 1)) } func (g guid) Hex() MessageID { diff --git a/nsqd/guid_test.go b/nsqd/guid_test.go index 946b742c7..0056c5460 100644 --- a/nsqd/guid_test.go +++ b/nsqd/guid_test.go @@ -25,10 +25,6 @@ func BenchmarkGUIDUnsafe(b *testing.B) { func BenchmarkGUID(b *testing.B) { factory := &guidFactory{} for i := 0; i < b.N; i++ { - guid, err := factory.NewGUID(0) - if err != nil { - continue - } - guid.Hex() + factory.NewGUID().Hex() } } diff --git a/nsqd/http.go b/nsqd/http.go index fe64bd102..2f4b26892 100644 --- a/nsqd/http.go +++ b/nsqd/http.go @@ -236,7 +236,7 @@ func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprout } } - msg := NewMessage(<-s.ctx.nsqd.idChan, body) + msg := NewMessage(topic.GenerateID(), body) msg.deferred = deferred err = topic.PutMessage(msg) if err != nil { @@ -265,8 +265,7 @@ func (s *httpServer) doMPUB(w http.ResponseWriter, req *http.Request, ps httprou _, ok := reqParams["binary"] if ok { tmp := make([]byte, 4) - msgs, err = readMPUB(req.Body, tmp, s.ctx.nsqd.idChan, - s.ctx.nsqd.getOpts().MaxMsgSize) + msgs, err = readMPUB(req.Body, tmp, topic, s.ctx.nsqd.getOpts().MaxMsgSize) if err != nil { return nil, http_api.Err{413, err.(*protocol.FatalClientErr).Code[2:]} } @@ -304,7 +303,7 @@ func (s *httpServer) doMPUB(w http.ResponseWriter, req *http.Request, ps httprou return nil, http_api.Err{413, "MSG_TOO_BIG"} } - msg := NewMessage(<-s.ctx.nsqd.idChan, block) + msg := NewMessage(topic.GenerateID(), block) msgs = append(msgs, msg) } } diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go index 0a161b88d..93b8f0e21 100644 --- a/nsqd/nsqd.go +++ b/nsqd/nsqd.go @@ -11,7 +11,6 @@ import ( "net" "os" "path" - "runtime" "strings" "sync" "sync/atomic" @@ -61,7 +60,6 @@ type NSQD struct { poolSize int - idChan chan MessageID notifyChan chan interface{} optsNotificationChan chan struct{} exitChan chan int @@ -80,7 +78,6 @@ func New(opts *Options) *NSQD { n := &NSQD{ startTime: time.Now(), topicMap: make(map[string]*Topic), - idChan: make(chan MessageID, 4096), exitChan: make(chan int), notifyChan: make(chan interface{}), optsNotificationChan: make(chan struct{}, 1), @@ -254,7 +251,6 @@ func (n *NSQD) Main() { }) n.waitGroup.Wrap(func() { n.queueScanLoop() }) - n.waitGroup.Wrap(func() { n.idPump() }) n.waitGroup.Wrap(func() { n.lookupLoop() }) if n.getOpts().StatsdAddress != "" { n.waitGroup.Wrap(func() { n.statsdLoop() }) @@ -419,8 +415,6 @@ func (n *NSQD) Exit() { } n.Unlock() - // we want to do this last as it closes the idPump (if closed first it - // could potentially starve items in process and deadlock) close(n.exitChan) n.waitGroup.Wait() @@ -524,33 +518,6 @@ func (n *NSQD) DeleteExistingTopic(topicName string) error { return nil } -func (n *NSQD) idPump() { - factory := &guidFactory{} - lastError := time.Unix(0, 0) - workerID := n.getOpts().ID - for { - id, err := factory.NewGUID(workerID) - if err != nil { - now := time.Now() - if now.Sub(lastError) > time.Second { - // only print the error once/second - n.logf("ERROR: %s", err) - lastError = now - } - runtime.Gosched() - continue - } - select { - case n.idChan <- id.Hex(): - case <-n.exitChan: - goto exit - } - } - -exit: - n.logf("ID: closing") -} - func (n *NSQD) Notify(v interface{}) { // since the in-memory metadata is incomplete, // should not persist metadata while loading it. diff --git a/nsqd/nsqd_test.go b/nsqd/nsqd_test.go index 1828165fc..f32c7fe51 100644 --- a/nsqd/nsqd_test.go +++ b/nsqd/nsqd_test.go @@ -124,7 +124,7 @@ func TestStartup(t *testing.T) { body := make([]byte, 256) topic := nsqd.GetTopic(topicName) for i := 0; i < iterations; i++ { - msg := NewMessage(<-nsqd.idChan, body) + msg := NewMessage(topic.GenerateID(), body) topic.PutMessage(msg) } @@ -226,7 +226,7 @@ func TestEphemeralTopicsAndChannels(t *testing.T) { client := newClientV2(0, nil, &context{nsqd}) ephemeralChannel.AddClient(client.ID, client) - msg := NewMessage(<-nsqd.idChan, body) + msg := NewMessage(topic.GenerateID(), body) topic.PutMessage(msg) msg = <-ephemeralChannel.clientMsgChan equal(t, msg.Body, body) diff --git a/nsqd/protocol_v2.go b/nsqd/protocol_v2.go index 809d24fce..44dab3fd6 100644 --- a/nsqd/protocol_v2.go +++ b/nsqd/protocol_v2.go @@ -761,7 +761,7 @@ func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) { } topic := p.ctx.nsqd.GetTopic(topicName) - msg := NewMessage(<-p.ctx.nsqd.idChan, messageBody) + msg := NewMessage(topic.GenerateID(), messageBody) err = topic.PutMessage(msg) if err != nil { return nil, protocol.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error()) @@ -783,6 +783,12 @@ func (p *protocolV2) MPUB(client *clientV2, params [][]byte) ([]byte, error) { fmt.Sprintf("E_BAD_TOPIC MPUB topic name %q is not valid", topicName)) } + if err := p.CheckAuth(client, "MPUB", topicName, ""); err != nil { + return nil, err + } + + topic := p.ctx.nsqd.GetTopic(topicName) + bodyLen, err := readLen(client.Reader, client.lenSlice) if err != nil { return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "MPUB failed to read body size") @@ -798,18 +804,12 @@ func (p *protocolV2) MPUB(client *clientV2, params [][]byte) ([]byte, error) { fmt.Sprintf("MPUB body too big %d > %d", bodyLen, p.ctx.nsqd.getOpts().MaxBodySize)) } - messages, err := readMPUB(client.Reader, client.lenSlice, p.ctx.nsqd.idChan, + messages, err := readMPUB(client.Reader, client.lenSlice, topic, p.ctx.nsqd.getOpts().MaxMsgSize) if err != nil { return nil, err } - if err := p.CheckAuth(client, "MPUB", topicName, ""); err != nil { - return nil, err - } - - topic := p.ctx.nsqd.GetTopic(topicName) - // if we've made it this far we've validated all the input, // the only possible error is that the topic is exiting during // this next call (and no messages will be queued in that case) @@ -873,7 +873,7 @@ func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error) { } topic := p.ctx.nsqd.GetTopic(topicName) - msg := NewMessage(<-p.ctx.nsqd.idChan, messageBody) + msg := NewMessage(topic.GenerateID(), messageBody) msg.deferred = timeoutDuration err = topic.PutMessage(msg) if err != nil { @@ -910,7 +910,7 @@ func (p *protocolV2) TOUCH(client *clientV2, params [][]byte) ([]byte, error) { return nil, nil } -func readMPUB(r io.Reader, tmp []byte, idChan chan MessageID, maxMessageSize int64) ([]*Message, error) { +func readMPUB(r io.Reader, tmp []byte, topic *Topic, maxMessageSize int64) ([]*Message, error) { numMessages, err := readLen(r, tmp) if err != nil { return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "MPUB failed to read message count") @@ -945,7 +945,7 @@ func readMPUB(r io.Reader, tmp []byte, idChan chan MessageID, maxMessageSize int return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "MPUB failed to read message body") } - messages = append(messages, NewMessage(<-idChan, msgBody)) + messages = append(messages, NewMessage(topic.GenerateID(), msgBody)) } return messages, nil diff --git a/nsqd/protocol_v2_test.go b/nsqd/protocol_v2_test.go index aa5f2f8a7..570c72456 100644 --- a/nsqd/protocol_v2_test.go +++ b/nsqd/protocol_v2_test.go @@ -130,7 +130,7 @@ func TestBasicV2(t *testing.T) { topicName := "test_v2" + strconv.Itoa(int(time.Now().Unix())) topic := nsqd.GetTopic(topicName) - msg := NewMessage(<-nsqd.idChan, []byte("test body")) + msg := NewMessage(topic.GenerateID(), []byte("test body")) topic.PutMessage(msg) conn, err := mustConnectNSQD(tcpAddr) @@ -165,7 +165,7 @@ func TestMultipleConsumerV2(t *testing.T) { topicName := "test_multiple_v2" + strconv.Itoa(int(time.Now().Unix())) topic := nsqd.GetTopic(topicName) - msg := NewMessage(<-nsqd.idChan, []byte("test body")) + msg := NewMessage(topic.GenerateID(), []byte("test body")) topic.GetChannel("ch1") topic.GetChannel("ch2") topic.PutMessage(msg) @@ -372,7 +372,7 @@ func TestPausing(t *testing.T) { equal(t, err, nil) topic := nsqd.GetTopic(topicName) - msg := NewMessage(<-nsqd.idChan, []byte("test body")) + msg := NewMessage(topic.GenerateID(), []byte("test body")) channel := topic.GetChannel("ch") topic.PutMessage(msg) @@ -397,7 +397,7 @@ func TestPausing(t *testing.T) { // sleep to allow the paused state to take effect time.Sleep(50 * time.Millisecond) - msg = NewMessage(<-nsqd.idChan, []byte("test body2")) + msg = NewMessage(topic.GenerateID(), []byte("test body2")) topic.PutMessage(msg) // allow the client to possibly get a message, the test would hang indefinitely @@ -409,7 +409,7 @@ func TestPausing(t *testing.T) { // unpause the channel... the client should now be pushed a message channel.UnPause() - msg = NewMessage(<-nsqd.idChan, []byte("test body3")) + msg = NewMessage(topic.GenerateID(), []byte("test body3")) topic.PutMessage(msg) resp, _ = nsq.ReadResponse(conn) @@ -614,7 +614,7 @@ func TestTouch(t *testing.T) { topic := nsqd.GetTopic(topicName) channel := topic.GetChannel("ch") - msg := NewMessage(<-nsqd.idChan, []byte("test body")) + msg := NewMessage(topic.GenerateID(), []byte("test body")) topic.PutMessage(msg) _, err = nsq.Ready(1).WriteTo(conn) @@ -656,7 +656,7 @@ func TestMaxRdyCount(t *testing.T) { defer conn.Close() topic := nsqd.GetTopic(topicName) - msg := NewMessage(<-nsqd.idChan, []byte("test body")) + msg := NewMessage(topic.GenerateID(), []byte("test body")) topic.PutMessage(msg) data := identify(t, conn, nil, frameTypeResponse) @@ -732,7 +732,7 @@ func TestOutputBuffering(t *testing.T) { outputBufferTimeout := 500 topic := nsqd.GetTopic(topicName) - msg := NewMessage(<-nsqd.idChan, make([]byte, outputBufferSize-1024)) + msg := NewMessage(topic.GenerateID(), make([]byte, outputBufferSize-1024)) topic.PutMessage(msg) start := time.Now() @@ -1128,7 +1128,7 @@ func TestSnappy(t *testing.T) { equal(t, err, nil) topic := nsqd.GetTopic(topicName) - msg := NewMessage(<-nsqd.idChan, msgBody) + msg := NewMessage(topic.GenerateID(), msgBody) topic.PutMessage(msg) resp, _ = nsq.ReadResponse(compressConn) @@ -1222,7 +1222,7 @@ func TestSampling(t *testing.T) { topicName := "test_sampling" + strconv.Itoa(int(time.Now().Unix())) topic := nsqd.GetTopic(topicName) for i := 0; i < num; i++ { - msg := NewMessage(<-nsqd.idChan, []byte("test body")) + msg := NewMessage(topic.GenerateID(), []byte("test body")) topic.PutMessage(msg) } channel := topic.GetChannel("ch") @@ -1325,13 +1325,13 @@ func TestClientMsgTimeout(t *testing.T) { topicName := "test_cmsg_timeout" + strconv.Itoa(int(time.Now().Unix())) topic := nsqd.GetTopic(topicName) - msg := NewMessage(<-nsqd.idChan, make([]byte, 100)) + msg := NewMessage(topic.GenerateID(), make([]byte, 100)) topic.PutMessage(msg) // without this the race detector thinks there's a write // to msg.Attempts that races with the read in the protocol's messagePump... // it does not reflect a realistically possible condition - topic.PutMessage(NewMessage(<-nsqd.idChan, make([]byte, 100))) + topic.PutMessage(NewMessage(topic.GenerateID(), make([]byte, 100))) conn, err := mustConnectNSQD(tcpAddr) equal(t, err, nil) @@ -1655,7 +1655,7 @@ func benchmarkProtocolV2Sub(b *testing.B, size int) { topicName := "bench_v2_sub" + strconv.Itoa(b.N) + strconv.Itoa(int(time.Now().Unix())) topic := nsqd.GetTopic(topicName) for i := 0; i < b.N; i++ { - msg := NewMessage(<-nsqd.idChan, msg) + msg := NewMessage(topic.GenerateID(), msg) topic.PutMessage(msg) } topic.GetChannel("ch") @@ -1755,7 +1755,7 @@ func benchmarkProtocolV2MultiSub(b *testing.B, num int) { topicName := "bench_v2" + strconv.Itoa(b.N) + "_" + strconv.Itoa(i) + "_" + strconv.Itoa(int(time.Now().Unix())) topic := nsqd.GetTopic(topicName) for i := 0; i < b.N; i++ { - msg := NewMessage(<-nsqd.idChan, msg) + msg := NewMessage(topic.GenerateID(), msg) topic.PutMessage(msg) } topic.GetChannel("ch") diff --git a/nsqd/stats_test.go b/nsqd/stats_test.go index 231a78f1e..1be6b7218 100644 --- a/nsqd/stats_test.go +++ b/nsqd/stats_test.go @@ -20,7 +20,7 @@ func TestStats(t *testing.T) { topicName := "test_stats" + strconv.Itoa(int(time.Now().Unix())) topic := nsqd.GetTopic(topicName) - msg := NewMessage(<-nsqd.idChan, []byte("test body")) + msg := NewMessage(topic.GenerateID(), []byte("test body")) topic.PutMessage(msg) conn, err := mustConnectNSQD(tcpAddr) diff --git a/nsqd/topic.go b/nsqd/topic.go index 62f604e28..941e4536e 100644 --- a/nsqd/topic.go +++ b/nsqd/topic.go @@ -25,6 +25,7 @@ type Topic struct { channelUpdateChan chan int waitGroup util.WaitGroupWrapper exitFlag int32 + idFactory *guidFactory ephemeral bool deleteCallback func(*Topic) @@ -47,6 +48,7 @@ func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topi ctx: ctx, pauseChan: make(chan bool), deleteCallback: deleteCallback, + idFactory: &guidFactory{}, } if strings.HasSuffix(topicName, "#ephemeral") { @@ -435,3 +437,7 @@ func (t *Topic) doPause(pause bool) error { func (t *Topic) IsPaused() bool { return atomic.LoadInt32(&t.paused) == 1 } + +func (t *Topic) GenerateID() MessageID { + return t.idFactory.NewGUID().Hex() +} diff --git a/nsqd/topic_test.go b/nsqd/topic_test.go index 3a0f183f2..8d28cd94f 100644 --- a/nsqd/topic_test.go +++ b/nsqd/topic_test.go @@ -74,19 +74,19 @@ func TestHealth(t *testing.T) { topic := nsqd.GetTopic("test") topic.backend = &errorBackendQueue{} - msg := NewMessage(<-nsqd.idChan, make([]byte, 100)) + msg := NewMessage(topic.GenerateID(), make([]byte, 100)) err := topic.PutMessage(msg) equal(t, err, nil) - msg = NewMessage(<-nsqd.idChan, make([]byte, 100)) + msg = NewMessage(topic.GenerateID(), make([]byte, 100)) err = topic.PutMessages([]*Message{msg}) equal(t, err, nil) - msg = NewMessage(<-nsqd.idChan, make([]byte, 100)) + msg = NewMessage(topic.GenerateID(), make([]byte, 100)) err = topic.PutMessage(msg) nequal(t, err, nil) - msg = NewMessage(<-nsqd.idChan, make([]byte, 100)) + msg = NewMessage(topic.GenerateID(), make([]byte, 100)) err = topic.PutMessages([]*Message{msg}) nequal(t, err, nil) @@ -100,7 +100,7 @@ func TestHealth(t *testing.T) { topic.backend = &errorRecoveredBackendQueue{} - msg = NewMessage(<-nsqd.idChan, make([]byte, 100)) + msg = NewMessage(topic.GenerateID(), make([]byte, 100)) err = topic.PutMessages([]*Message{msg}) equal(t, err, nil) @@ -153,7 +153,7 @@ func TestDeleteLast(t *testing.T) { equal(t, nil, err) equal(t, 0, len(topic.channelMap)) - msg := NewMessage(<-nsqd.idChan, []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa")) + msg := NewMessage(topic.GenerateID(), []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa")) err = topic.PutMessage(msg) time.Sleep(100 * time.Millisecond) equal(t, nil, err) @@ -175,7 +175,7 @@ func TestPause(t *testing.T) { channel := topic.GetChannel("ch1") nequal(t, channel, nil) - msg := NewMessage(<-nsqd.idChan, []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa")) + msg := NewMessage(topic.GenerateID(), []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa")) err = topic.PutMessage(msg) equal(t, err, nil) @@ -219,7 +219,7 @@ func BenchmarkTopicPut(b *testing.B) { for i := 0; i <= b.N; i++ { topic := nsqd.GetTopic(topicName) - msg := NewMessage(<-nsqd.idChan, []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa")) + msg := NewMessage(topic.GenerateID(), []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa")) topic.PutMessage(msg) } } @@ -239,7 +239,7 @@ func BenchmarkTopicToChannelPut(b *testing.B) { for i := 0; i <= b.N; i++ { topic := nsqd.GetTopic(topicName) - msg := NewMessage(<-nsqd.idChan, []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa")) + msg := NewMessage(topic.GenerateID(), []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa")) topic.PutMessage(msg) }