Skip to content

Commit

Permalink
Merge pull request #229 from mreiferson/compression_229
Browse files Browse the repository at this point in the history
nsqd: snappy/deflate compression
  • Loading branch information
jehiah committed Sep 10, 2013
2 parents 085b0a4 + a237e04 commit 57fea7e
Show file tree
Hide file tree
Showing 8 changed files with 353 additions and 43 deletions.
5 changes: 3 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ env:
- GOARCH=amd64
- GOARCH=386
install:
- go get github.com/bmizerany/assert
- go get github.com/bitly/go-nsq
- go get github.com/bitly/go-hostpool
- go get github.com/bitly/go-simplejson
- go get github.com/mreiferson/go-snappystream
- go get github.com/bitly/go-hostpool
- go get github.com/bmizerany/assert
script:
- pushd $TRAVIS_BUILD_DIR
- ./test.sh
Expand Down
3 changes: 2 additions & 1 deletion dist.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ git archive HEAD | tar -x -C $TMPGOPATH/src/github.com/bitly/nsq
export GOPATH="$TMPGOPATH:$GOROOT"

echo "... getting dependencies"
go get -v github.com/bitly/go-nsq
go get -v github.com/bitly/go-simplejson
go get -v github.com/mreiferson/go-snappystream
go get -v github.com/bitly/go-hostpool
go get -v github.com/bitly/go-nsq
go get -v github.com/bmizerany/assert

pushd $TMPGOPATH/src/github.com/bitly/nsq
Expand Down
76 changes: 66 additions & 10 deletions nsqd/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,21 @@ package main

import (
"bufio"
"compress/flate"
"crypto/tls"
"errors"
"fmt"
"github.com/bitly/go-nsq"
"github.com/mreiferson/go-snappystream"
"log"
"net"
"sync"
"sync/atomic"
"time"
)

const DefaultBufferSize = 16 * 1024

type IdentifyDataV2 struct {
ShortId string `json:"short_id"`
LongId string `json:"long_id"`
Expand All @@ -21,6 +25,9 @@ type IdentifyDataV2 struct {
OutputBufferTimeout int `json:"output_buffer_timeout"`
FeatureNegotiation bool `json:"feature_negotiation"`
TLSv1 bool `json:"tls_v1"`
Deflate bool `json:"deflate"`
DeflateLevel int `json:"deflate_level"`
Snappy bool `json:"snappy"`
}

type ClientV2 struct {
Expand All @@ -32,16 +39,23 @@ type ClientV2 struct {
FinishCount uint64
RequeueCount uint64

net.Conn
sync.Mutex

ID int64
context *Context
tlsConn net.Conn

// buffered IO
Reader *bufio.Reader
Writer *bufio.Writer
// original connection
net.Conn

// connections based on negotiated features
tlsConn *tls.Conn
flateWriter *flate.Writer

// reading/writing interfaces
Reader *bufio.Reader
Writer *bufio.Writer

// output buffering
OutputBufferSize int
OutputBufferTimeout *time.Ticker
OutputBufferTimeoutUpdateChan chan time.Duration
Expand Down Expand Up @@ -73,12 +87,14 @@ func NewClientV2(id int64, conn net.Conn, context *Context) *ClientV2 {

c := &ClientV2{
ID: id,
Conn: conn,
context: context,

Reader: bufio.NewReaderSize(conn, 16*1024),
Writer: bufio.NewWriterSize(conn, 16*1024),
OutputBufferSize: 16 * 1024,
Conn: conn,

Reader: bufio.NewReaderSize(conn, DefaultBufferSize),
Writer: bufio.NewWriterSize(conn, DefaultBufferSize),

OutputBufferSize: DefaultBufferSize,
OutputBufferTimeout: time.NewTicker(250 * time.Millisecond),
OutputBufferTimeoutUpdateChan: make(chan time.Duration, 1),

Expand Down Expand Up @@ -309,16 +325,56 @@ func (c *ClientV2) UpgradeTLS() error {
}
c.tlsConn = tlsConn

c.Reader = bufio.NewReaderSize(c.tlsConn, 16*1024)
c.Reader = bufio.NewReaderSize(c.tlsConn, DefaultBufferSize)
c.Writer = bufio.NewWriterSize(c.tlsConn, c.OutputBufferSize)

return nil
}

func (c *ClientV2) UpgradeDeflate(level int) error {
c.Lock()
defer c.Unlock()

conn := c.Conn
if c.tlsConn != nil {
conn = c.tlsConn
}

c.Reader = bufio.NewReaderSize(flate.NewReader(conn), DefaultBufferSize)

fw, _ := flate.NewWriter(conn, level)
c.flateWriter = fw
c.Writer = bufio.NewWriterSize(fw, c.OutputBufferSize)

return nil
}

func (c *ClientV2) UpgradeSnappy() error {
c.Lock()
defer c.Unlock()

conn := c.Conn
if c.tlsConn != nil {
conn = c.tlsConn
}

c.Reader = bufio.NewReaderSize(snappystream.NewReader(conn, snappystream.SkipVerifyChecksum), DefaultBufferSize)
c.Writer = bufio.NewWriterSize(snappystream.NewWriter(conn), c.OutputBufferSize)

return nil
}

func (c *ClientV2) Flush() error {
c.SetWriteDeadline(time.Now().Add(time.Second))

err := c.Writer.Flush()
if err != nil {
return err
}

if c.flateWriter != nil {
return c.flateWriter.Flush()
}

return nil
}
12 changes: 12 additions & 0 deletions nsqd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ var (
// TLS config
tlsCert = flag.String("tls-cert", "", "path to certificate file")
tlsKey = flag.String("tls-key", "", "path to private key file")

// compression
deflateEnabled = flag.Bool("deflate", true, "enable deflate feature negotiation (client compression)")
maxDeflateLevel = flag.Int("max-deflate-level", 6, "max deflate compression level a client can negotiate (> values == > nsqd CPU usage)")
snappyEnabled = flag.Bool("snappy", true, "enable snappy feature negotiation (client compression)")
)

func init() {
Expand Down Expand Up @@ -107,6 +112,10 @@ func main() {
// flagToDuration will fatally error if it is invalid
msgTimeoutDuration := flagToDuration(*msgTimeout, time.Millisecond, "--msg-timeout")

if *maxDeflateLevel < 1 || *maxDeflateLevel > 9 {
log.Fatalf("--max-deflate-level must be [1,9]")
}

options := NewNsqdOptions()
options.maxRdyCount = *maxRdyCount
options.maxMessageSize = *maxMessageSize
Expand All @@ -124,6 +133,9 @@ func main() {
options.maxOutputBufferTimeout = *maxOutputBufferTimeout
options.tlsCert = *tlsCert
options.tlsKey = *tlsKey
options.deflateEnabled = *deflateEnabled
options.maxDeflateLevel = *maxDeflateLevel
options.snappyEnabled = *snappyEnabled

if *statsdAddress != "" {
// flagToDuration will fatally error if it is invalid
Expand Down
9 changes: 9 additions & 0 deletions nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ type nsqdOptions struct {
// TLS config
tlsCert string
tlsKey string

// compression
deflateEnabled bool
maxDeflateLevel int
snappyEnabled bool
}

func NewNsqdOptions() *nsqdOptions {
Expand Down Expand Up @@ -106,6 +111,10 @@ func NewNsqdOptions() *nsqdOptions {

tlsCert: "",
tlsKey: "",

deflateEnabled: true,
maxDeflateLevel: -1,
snappyEnabled: true,
}
}

Expand Down
88 changes: 67 additions & 21 deletions nsqd/protocol_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/bitly/nsq/util"
"io"
"log"
"math"
"net"
"sync/atomic"
"time"
Expand Down Expand Up @@ -120,27 +121,21 @@ func (p *ProtocolV2) SendMessage(client *ClientV2, msg *nsq.Message, buf *bytes.

func (p *ProtocolV2) Send(client *ClientV2, frameType int32, data []byte) error {
client.Lock()
defer client.Unlock()

client.SetWriteDeadline(time.Now().Add(time.Second))
_, err := util.SendFramedResponse(client.Writer, frameType, data)
if err != nil {
client.Unlock()
return err
}

if frameType != nsq.FrameTypeMessage {
err = client.Flush()
}

return err
}

func (p *ProtocolV2) Flush(client *ClientV2) error {
client.Lock()
defer client.Unlock()
client.Unlock()

client.SetWriteDeadline(time.Now().Add(time.Second))
return client.Flush()
return err
}

func (p *ProtocolV2) Exec(client *ClientV2, params [][]byte) ([]byte, error) {
Expand Down Expand Up @@ -196,7 +191,9 @@ func (p *ProtocolV2) messagePump(client *ClientV2) {
clientMsgChan = nil
flusherChan = nil
// force flush
err = p.Flush(client)
client.Lock()
err = client.Flush()
client.Unlock()
if err != nil {
goto exit
}
Expand All @@ -218,7 +215,9 @@ func (p *ProtocolV2) messagePump(client *ClientV2) {
// if this case wins, we're either starved
// or we won the race between other channels...
// in either case, force flush
err = p.Flush(client)
client.Lock()
err = client.Flush()
client.Unlock()
if err != nil {
goto exit
}
Expand Down Expand Up @@ -313,19 +312,40 @@ func (p *ProtocolV2) IDENTIFY(client *ClientV2, params [][]byte) ([]byte, error)
}

tlsv1 := p.context.nsqd.tlsConfig != nil && identifyData.TLSv1
deflate := p.context.nsqd.options.deflateEnabled && identifyData.Deflate
deflateLevel := 0
if deflate {
if identifyData.DeflateLevel <= 0 {
deflateLevel = 6
}
deflateLevel = int(math.Min(float64(deflateLevel), float64(p.context.nsqd.options.maxDeflateLevel)))
}
snappy := p.context.nsqd.options.snappyEnabled && identifyData.Snappy

if deflate && snappy {
return nil, util.NewFatalClientErr(nil, "E_IDENTIFY_FAILED", "cannot enable both deflate and snappy compression")
}

resp, err := json.Marshal(struct {
MaxRdyCount int64 `json:"max_rdy_count"`
Version string `json:"version"`
MaxMsgTimeout int64 `json:"max_msg_timeout"`
MsgTimeout int64 `json:"msg_timeout"`
TLSv1 bool `json:"tls_v1"`
MaxRdyCount int64 `json:"max_rdy_count"`
Version string `json:"version"`
MaxMsgTimeout int64 `json:"max_msg_timeout"`
MsgTimeout int64 `json:"msg_timeout"`
TLSv1 bool `json:"tls_v1"`
Deflate bool `json:"deflate"`
DeflateLevel int `json:"deflate_level"`
MaxDeflateLevel int `json:"max_deflate_level"`
Snappy bool `json:"snappy"`
}{
MaxRdyCount: p.context.nsqd.options.maxRdyCount,
Version: util.BINARY_VERSION,
MaxMsgTimeout: int64(p.context.nsqd.options.maxMsgTimeout / time.Millisecond),
MsgTimeout: int64(p.context.nsqd.options.msgTimeout / time.Millisecond),
TLSv1: tlsv1,
MaxRdyCount: p.context.nsqd.options.maxRdyCount,
Version: util.BINARY_VERSION,
MaxMsgTimeout: int64(p.context.nsqd.options.maxMsgTimeout / time.Millisecond),
MsgTimeout: int64(p.context.nsqd.options.msgTimeout / time.Millisecond),
TLSv1: tlsv1,
Deflate: deflate,
DeflateLevel: deflateLevel,
MaxDeflateLevel: p.context.nsqd.options.maxDeflateLevel,
Snappy: snappy,
})
if err != nil {
panic("should never happen")
Expand All @@ -349,6 +369,32 @@ func (p *ProtocolV2) IDENTIFY(client *ClientV2, params [][]byte) ([]byte, error)
}
}

if snappy {
log.Printf("PROTOCOL(V2): [%s] upgrading connection to snappy", client)
err = client.UpgradeSnappy()
if err != nil {
return nil, util.NewFatalClientErr(err, "E_IDENTIFY_FAILED", "IDENTIFY failed "+err.Error())
}

err = p.Send(client, nsq.FrameTypeResponse, okBytes)
if err != nil {
return nil, util.NewFatalClientErr(err, "E_IDENTIFY_FAILED", "IDENTIFY failed "+err.Error())
}
}

if deflate {
log.Printf("PROTOCOL(V2): [%s] upgrading connection to deflate", client)
err = client.UpgradeDeflate(deflateLevel)
if err != nil {
return nil, util.NewFatalClientErr(err, "E_IDENTIFY_FAILED", "IDENTIFY failed "+err.Error())
}

err = p.Send(client, nsq.FrameTypeResponse, okBytes)
if err != nil {
return nil, util.NewFatalClientErr(err, "E_IDENTIFY_FAILED", "IDENTIFY failed "+err.Error())
}
}

return nil, nil
}

Expand Down
Loading

0 comments on commit 57fea7e

Please sign in to comment.