Skip to content

Commit

Permalink
Merge pull request nsqio#121 from mreiferson/buffered_producer_chan_121
Browse files Browse the repository at this point in the history
producer: buffer transaction chan
  • Loading branch information
jehiah committed Feb 22, 2015
2 parents a3e0f55 + e3169fc commit a03f590
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 31 deletions.
5 changes: 0 additions & 5 deletions command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,11 @@ package nsq

import (
"bytes"
"io/ioutil"
"log"
"os"
"testing"
)

func BenchmarkCommand(b *testing.B) {
b.StopTimer()
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stdout)
data := make([]byte, 2048)
cmd := Publish("test", data)
var buf bytes.Buffer
Expand Down
4 changes: 0 additions & 4 deletions mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"io/ioutil"
"log"
"net"
"os"
"strconv"
"testing"
"time"
Expand Down Expand Up @@ -182,9 +181,6 @@ func (h *testHandler) HandleMessage(message *Message) error {
}

func TestConsumerBackoff(t *testing.T) {
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stdout)

logger := log.New(ioutil.Discard, "", log.LstdFlags)

var mgood bytes.Buffer
Expand Down
10 changes: 9 additions & 1 deletion producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ import (
"time"
)

type producerConn interface {
String() string
SetLogger(logger, LogLevel, string)
Connect() (*IdentifyResponse, error)
Close() error
WriteCommand(*Command) error
}

// Producer is a high-level type to publish to NSQ.
//
// A Producer instance is 1:1 with a destination `nsqd`
Expand All @@ -17,7 +25,7 @@ import (
type Producer struct {
id int64
addr string
conn *Conn
conn producerConn
config Config

logger logger
Expand Down
107 changes: 86 additions & 21 deletions producer_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package nsq

import (
"bytes"
"errors"
"io/ioutil"
"log"
"os"
"runtime"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"
)
Expand Down Expand Up @@ -35,9 +36,6 @@ func (h *ConsumerHandler) HandleMessage(message *Message) error {
}

func TestProducerConnection(t *testing.T) {
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stdout)

config := NewConfig()
w, _ := NewProducer("127.0.0.1:4150", config)
w.SetLogger(nullLogger, LogLevelInfo)
Expand All @@ -56,9 +54,6 @@ func TestProducerConnection(t *testing.T) {
}

func TestProducerPublish(t *testing.T) {
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stdout)

topicName := "publish" + strconv.Itoa(int(time.Now().Unix()))
msgCount := 10

Expand All @@ -83,9 +78,6 @@ func TestProducerPublish(t *testing.T) {
}

func TestProducerMultiPublish(t *testing.T) {
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stdout)

topicName := "multi_publish" + strconv.Itoa(int(time.Now().Unix()))
msgCount := 10

Expand Down Expand Up @@ -113,9 +105,6 @@ func TestProducerMultiPublish(t *testing.T) {
}

func TestProducerPublishAsync(t *testing.T) {
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stdout)

topicName := "async_publish" + strconv.Itoa(int(time.Now().Unix()))
msgCount := 10

Expand Down Expand Up @@ -151,9 +140,6 @@ func TestProducerPublishAsync(t *testing.T) {
}

func TestProducerMultiPublishAsync(t *testing.T) {
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stdout)

topicName := "multi_publish" + strconv.Itoa(int(time.Now().Unix()))
msgCount := 10

Expand Down Expand Up @@ -193,9 +179,6 @@ func TestProducerMultiPublishAsync(t *testing.T) {
}

func TestProducerHeartbeat(t *testing.T) {
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stdout)

topicName := "heartbeat" + strconv.Itoa(int(time.Now().Unix()))

config := NewConfig()
Expand Down Expand Up @@ -269,3 +252,85 @@ func readMessages(topicName string, t *testing.T, msgCount int) {
t.Fatal("failed message not done")
}
}

type mockProducerConn struct {
delegate ConnDelegate
closeCh chan struct{}
pubCh chan struct{}
}

func newMockProducerConn(delegate ConnDelegate) producerConn {
m := &mockProducerConn{
delegate: delegate,
closeCh: make(chan struct{}),
pubCh: make(chan struct{}, 4),
}
go m.router()
return m
}

func (m *mockProducerConn) String() string {
return "127.0.0.1:0"
}

func (m *mockProducerConn) SetLogger(logger logger, level LogLevel, prefix string) {}

func (m *mockProducerConn) Connect() (*IdentifyResponse, error) {
return &IdentifyResponse{}, nil
}

func (m *mockProducerConn) Close() error {
close(m.closeCh)
return nil
}

func (m *mockProducerConn) WriteCommand(cmd *Command) error {
if bytes.Equal(cmd.Name, []byte("PUB")) {
m.pubCh <- struct{}{}
}
return nil
}

func (m *mockProducerConn) router() {
for {
select {
case <-m.closeCh:
goto exit
case <-m.pubCh:
m.delegate.OnResponse(nil, framedResponse(FrameTypeResponse, []byte("OK")))
}
}
exit:
}

func BenchmarkProducer(b *testing.B) {
b.StopTimer()
body := make([]byte, 512)

config := NewConfig()
p, _ := NewProducer("127.0.0.1:0", config)

p.conn = newMockProducerConn(&producerConnDelegate{p})
atomic.StoreInt32(&p.state, StateConnected)
p.closeChan = make(chan int)
go p.router()

startCh := make(chan struct{})
var wg sync.WaitGroup
parallel := runtime.GOMAXPROCS(0)

for j := 0; j < parallel; j++ {
wg.Add(1)
go func() {
<-startCh
for i := 0; i < b.N/parallel; i++ {
p.Publish("test", body)
}
wg.Done()
}()
}

b.StartTimer()
close(startCh)
wg.Wait()
}

0 comments on commit a03f590

Please sign in to comment.