Skip to content

Commit

Permalink
Merge pull request #367 from mreiferson/deprecation_purge_367
Browse files Browse the repository at this point in the history
the great deprecation purge
  • Loading branch information
mreiferson authored Dec 29, 2016
2 parents 866aa08 + 91e0e18 commit c7623b5
Show file tree
Hide file tree
Showing 31 changed files with 176 additions and 796 deletions.
1 change: 0 additions & 1 deletion Godeps
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
github.com/BurntSushi/toml 2dff11163ee667d51dcc066660925a92ce138deb
github.com/bitly/go-hostpool 58b95b10d6ca26723a7f46017b348653b825a8d6
github.com/nsqio/go-nsq 642a3f9935f12cb3b747294318d730f56f4c34b4 # v1.0.6-alpha
github.com/bitly/go-simplejson 18db6e68d8fd9cbf2e8ebe4c81a78b96fd9bf05a
github.com/bmizerany/perks/quantile 6cb9d9d729303ee2628580d9aec5db968da3a607
github.com/mreiferson/go-options 2e28e74c79f31a5a0e807baad7f4429ea2703dcb
github.com/golang/snappy d9eb7a3d35ec988b8585d4a0068e462c27d28380
Expand Down
5 changes: 0 additions & 5 deletions apps/nsq_stat/nsq_stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ var (
showVersion = flag.Bool("version", false, "print version")
topic = flag.String("topic", "", "NSQ topic")
channel = flag.String("channel", "", "NSQ channel")
statusEvery = flag.Duration("status-every", -1, "(deprecated) duration of time between polling/printing output")
interval = flag.Duration("interval", 2*time.Second, "duration of time between polling/printing output")
httpConnectTimeout = flag.Duration("http-client-connect-timeout", 2*time.Second, "timeout for HTTP connect")
httpRequestTimeout = flag.Duration("http-client-request-timeout", 5*time.Second, "timeout for HTTP request")
Expand Down Expand Up @@ -143,10 +142,6 @@ func main() {
}

intvl := *interval
if *statusEvery != -1 {
log.Printf("--status-every is deprecated, use --interval")
intvl = *statusEvery
}
if int64(intvl) <= 0 {
log.Fatal("--interval should be positive")
}
Expand Down
4 changes: 1 addition & 3 deletions apps/nsq_tail/nsq_tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,8 @@ func (th *TailHandler) HandleMessage(m *nsq.Message) error {

func main() {
cfg := nsq.NewConfig()
// TODO: remove, deprecated
flag.Var(&nsq.ConfigFlag{cfg}, "reader-opt", "(deprecated) use --consumer-opt")
flag.Var(&nsq.ConfigFlag{cfg}, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, http://godoc.org/github.com/nsqio/go-nsq#Config)")

flag.Var(&nsq.ConfigFlag{cfg}, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, http://godoc.org/github.com/nsqio/go-nsq#Config)")
flag.Parse()

if *showVersion {
Expand Down
26 changes: 2 additions & 24 deletions apps/nsq_to_file/nsq_to_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ var (
nsqdTCPAddrs = app.StringArray{}
lookupdHTTPAddrs = app.StringArray{}
topics = app.StringArray{}

// TODO: remove, deprecated
gzipCompression = flag.Int("gzip-compression", 3, "(deprecated) use --gzip-level, gzip compression level (1 = BestSpeed, 2 = BestCompression, 3 = DefaultCompression)")
)

func init() {
Expand Down Expand Up @@ -313,13 +310,12 @@ func (f *FileLogger) updateFile() {
}

func NewFileLogger(gzipEnabled bool, compressionLevel int, filenameFormat, topic string) (*FileLogger, error) {
// TODO: remove, deprecated, for compat <GZIPREV>
filenameFormat = strings.Replace(filenameFormat, "<GZIPREV>", "<REV>", -1)
if gzipEnabled || *rotateSize > 0 || *rotateInterval > 0 {
if strings.Index(filenameFormat, "<REV>") == -1 {
return nil, errors.New("missing <REV> in --filename-format when gzip or rotation enabled")
}
} else { // remove <REV> as we don't need it
} else {
// remove <REV> as we don't need it
filenameFormat = strings.Replace(filenameFormat, "<REV>", "", -1)
}

Expand Down Expand Up @@ -467,10 +463,7 @@ func (t *TopicDiscoverer) watch(addrs []string, sync bool, pattern string,
func main() {
cfg := nsq.NewConfig()

// TODO: remove, deprecated
flag.Var(&nsq.ConfigFlag{cfg}, "reader-opt", "(deprecated) use --consumer-opt")
flag.Var(&nsq.ConfigFlag{cfg}, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, http://godoc.org/github.com/nsqio/go-nsq#Config)")

flag.Parse()

if *showVersion {
Expand Down Expand Up @@ -505,21 +498,6 @@ func main() {
log.Fatalf("invalid --gzip-level value (%d), should be 1-9", *gzipLevel)
}

// TODO: remove, deprecated
if hasArg("gzip-compression") {
log.Printf("WARNING: --gzip-compression is deprecated in favor of --gzip-level")
switch *gzipCompression {
case 1:
*gzipLevel = gzip.BestSpeed
case 2:
*gzipLevel = gzip.BestCompression
case 3:
*gzipLevel = gzip.DefaultCompression
default:
log.Fatalf("invalid --gzip-compression value (%d), should be 1,2,3", *gzipCompression)
}
}

cfg.UserAgent = fmt.Sprintf("nsq_to_file/%s go-nsq/%s", version.Binary, nsq.VERSION)
cfg.MaxInFlight = *maxInFlight

Expand Down
56 changes: 6 additions & 50 deletions apps/nsq_to_http/nsq_to_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,9 @@ var (
channel = flag.String("channel", "nsq_to_http", "nsq channel")
maxInFlight = flag.Int("max-in-flight", 200, "max number of messages to allow in flight")

numPublishers = flag.Int("n", 100, "number of concurrent publishers")
mode = flag.String("mode", "hostpool", "the upstream request mode options: multicast, round-robin, hostpool (default), epsilon-greedy")
sample = flag.Float64("sample", 1.0, "% of messages to publish (float b/w 0 -> 1)")
// TODO: remove; deprecated in favor of http-client-connect-timeout, http-client-request-timeout
httpTimeout = flag.Duration("http-timeout", 20*time.Second, "timeout for HTTP connect/read/write (each)")
numPublishers = flag.Int("n", 100, "number of concurrent publishers")
mode = flag.String("mode", "hostpool", "the upstream request mode options: round-robin, hostpool (default), epsilon-greedy")
sample = flag.Float64("sample", 1.0, "% of messages to publish (float b/w 0 -> 1)")
httpConnectTimeout = flag.Duration("http-client-connect-timeout", 2*time.Second, "timeout for HTTP connect")
httpRequestTimeout = flag.Duration("http-client-request-timeout", 20*time.Second, "timeout for HTTP request")
statusEvery = flag.Int("status-every", 250, "the # of requests between logging status (per handler), 0 disables")
Expand All @@ -53,12 +51,6 @@ var (
postAddrs = app.StringArray{}
nsqdTCPAddrs = app.StringArray{}
lookupdHTTPAddrs = app.StringArray{}

// TODO: remove, deprecated
roundRobin = flag.Bool("round-robin", false, "(deprecated) use --mode=round-robin, enable round robin mode")
maxBackoffDuration = flag.Duration("max-backoff-duration", 120*time.Second, "(deprecated) use --consumer-opt=max_backoff_duration,X")
throttleFraction = flag.Float64("throttle-fraction", 1.0, "(deprecated) use --sample=X, publish only a fraction of messages")
httpTimeoutMs = flag.Int("http-timeout-ms", 20000, "(deprecated) use --http-timeout=X, timeout for HTTP connect/read/write (each)")
)

func init() {
Expand Down Expand Up @@ -170,15 +162,13 @@ func hasArg(s string) bool {
}

func main() {
cfg := nsq.NewConfig()
// TODO: remove, deprecated
flag.Var(&nsq.ConfigFlag{cfg}, "reader-opt", "(deprecated) use --consumer-opt")
flag.Var(&nsq.ConfigFlag{cfg}, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, http://godoc.org/github.com/nsqio/go-nsq#Config)")

var publisher Publisher
var addresses app.StringArray
var selectedMode int

cfg := nsq.NewConfig()

flag.Var(&nsq.ConfigFlag{cfg}, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, http://godoc.org/github.com/nsqio/go-nsq#Config)")
flag.Parse()

if *showVersion {
Expand Down Expand Up @@ -221,44 +211,16 @@ func main() {
}

switch *mode {
case "multicast":
log.Printf("WARNING: multicast mode is deprecated in favor of using separate nsq_to_http on different channels (and will be dropped in a future release)")
selectedMode = ModeAll
case "round-robin":
selectedMode = ModeRoundRobin
case "hostpool", "epsilon-greedy":
selectedMode = ModeHostPool
}

// TODO: remove, deprecated
if hasArg("--round-robin") {
log.Printf("WARNING: --round-robin is deprecated in favor of --mode=round-robin")
selectedMode = ModeRoundRobin
}

// TODO: remove, deprecated
if hasArg("throttle-fraction") {
log.Printf("WARNING: --throttle-fraction is deprecatedin favor of --sample=X")
*sample = *throttleFraction
}

if *sample > 1.0 || *sample < 0.0 {
log.Fatal("ERROR: --sample must be between 0.0 and 1.0")
}

// TODO: remove, deprecated
if hasArg("http-timeout-ms") {
log.Printf("WARNING: --http-timeout-ms is deprecated in favor of --http-timeout=X")
*httpTimeout = time.Duration(*httpTimeoutMs) * time.Millisecond
}

// TODO: remove, deprecated
if hasArg("http-timeout") {
log.Printf("WARNING: --http-timeout is deprecated in favor of --http-client-connect-timeout=X and --http-client-request-timeout=Y")
*httpConnectTimeout = *httpTimeout
*httpRequestTimeout = *httpTimeout
}

termChan := make(chan os.Signal, 1)
signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)

Expand All @@ -273,12 +235,6 @@ func main() {
cfg.UserAgent = fmt.Sprintf("nsq_to_http/%s go-nsq/%s", version.Binary, nsq.VERSION)
cfg.MaxInFlight = *maxInFlight

// TODO: remove, deprecated
if hasArg("max-backoff-duration") {
log.Printf("WARNING: --max-backoff-duration is deprecated in favor of --consumer-opt=max_backoff_duration,X")
cfg.MaxBackoffDuration = *maxBackoffDuration
}

consumer, err := nsq.NewConsumer(*topic, *channel, cfg)
if err != nil {
log.Fatal(err)
Expand Down
43 changes: 13 additions & 30 deletions apps/nsq_to_nsq/nsq_to_nsq.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"time"

"github.com/bitly/go-hostpool"
"github.com/bitly/go-simplejson"
"github.com/bitly/timer_metrics"
"github.com/nsqio/go-nsq"
"github.com/nsqio/nsq/internal/app"
Expand Down Expand Up @@ -48,9 +47,6 @@ var (

requireJSONField = flag.String("require-json-field", "", "for JSON messages: only pass messages that contain this field")
requireJSONValue = flag.String("require-json-value", "", "for JSON messages: only pass messages in which the required field has this value")

// TODO: remove, deprecated
maxBackoffDuration = flag.Duration("max-backoff-duration", 120*time.Second, "(deprecated) use --consumer-opt=max_backoff_duration,X")
)

func init() {
Expand Down Expand Up @@ -120,7 +116,7 @@ func (ph *PublishHandler) responder() {
}
}

func (ph *PublishHandler) shouldPassMessage(jsonMsg *simplejson.Json) (bool, bool) {
func (ph *PublishHandler) shouldPassMessage(js map[string]interface{}) (bool, bool) {
pass := true
backoff := false

Expand All @@ -136,7 +132,7 @@ func (ph *PublishHandler) shouldPassMessage(jsonMsg *simplejson.Json) (bool, boo
ph.requireJSONValueParsed = true
}

jsonVal, ok := jsonMsg.CheckGet(*requireJSONField)
v, ok := js[*requireJSONField]
if !ok {
pass = false
if *requireJSONValue != "" {
Expand All @@ -146,13 +142,13 @@ func (ph *PublishHandler) shouldPassMessage(jsonMsg *simplejson.Json) (bool, boo
} else if *requireJSONValue != "" {
// if command-line argument can't convert to float, then it can't match a number
// if it can, also integers (up to 2^53 or so) can be compared as float64
if strVal, err := jsonVal.String(); err == nil {
if strVal != *requireJSONValue {
if s, ok := v.(string); ok {
if s != *requireJSONValue {
pass = false
}
} else if ph.requireJSONValueIsNumber {
floatVal, err := jsonVal.Float64()
if err != nil || ph.requireJSONNumber != floatVal {
f, ok := v.(float64)
if !ok || f != ph.requireJSONNumber {
pass = false
}
} else {
Expand All @@ -165,21 +161,16 @@ func (ph *PublishHandler) shouldPassMessage(jsonMsg *simplejson.Json) (bool, boo
return pass, backoff
}

func filterMessage(jsonMsg *simplejson.Json, rawMsg []byte) ([]byte, error) {
func filterMessage(js map[string]interface{}, rawMsg []byte) ([]byte, error) {
if len(whitelistJSONFields) == 0 {
// no change
return rawMsg, nil
}

msg, err := jsonMsg.Map()
if err != nil {
return nil, errors.New("json is not an object")
}

newMsg := make(map[string]interface{}, len(whitelistJSONFields))

for _, key := range whitelistJSONFields {
value, ok := msg[key]
value, ok := js[key]
if ok {
// avoid printing int as float (go 1.0)
switch tvalue := value.(type) {
Expand Down Expand Up @@ -208,21 +199,21 @@ func (ph *PublishHandler) HandleMessage(m *nsq.Message) error {
msgBody := m.Body

if *requireJSONField != "" || len(whitelistJSONFields) > 0 {
var jsonMsg *simplejson.Json
jsonMsg, err = simplejson.NewJson(m.Body)
var js map[string]interface{}
err = json.Unmarshal(msgBody, &js)
if err != nil {
log.Printf("ERROR: Unable to decode json: %s", m.Body)
log.Printf("ERROR: Unable to decode json: %s", msgBody)
return nil
}

if pass, backoff := ph.shouldPassMessage(jsonMsg); !pass {
if pass, backoff := ph.shouldPassMessage(js); !pass {
if backoff {
return errors.New("backoff")
}
return nil
}

msgBody, err = filterMessage(jsonMsg, m.Body)
msgBody, err = filterMessage(js, msgBody)
if err != nil {
log.Printf("ERROR: filterMessage() failed: %s", err)
return err
Expand Down Expand Up @@ -270,8 +261,6 @@ func main() {
cCfg := nsq.NewConfig()
pCfg := nsq.NewConfig()

// TODO: remove, deprecated
flag.Var(&nsq.ConfigFlag{cCfg}, "reader-opt", "(deprecated) use --consumer-opt")
flag.Var(&nsq.ConfigFlag{cCfg}, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, see http://godoc.org/github.com/nsqio/go-nsq#Config)")
flag.Var(&nsq.ConfigFlag{pCfg}, "producer-opt", "option to passthrough to nsq.Producer (may be given multiple times, see http://godoc.org/github.com/nsqio/go-nsq#Config)")

Expand Down Expand Up @@ -328,12 +317,6 @@ func main() {
cCfg.UserAgent = defaultUA
cCfg.MaxInFlight = *maxInFlight

// TODO: remove, deprecated
if hasArg("max-backoff-duration") {
log.Printf("WARNING: --max-backoff-duration is deprecated in favor of --consumer-opt=max_backoff_duration,X")
cCfg.MaxBackoffDuration = *maxBackoffDuration
}

consumer, err := nsq.NewConsumer(*topic, *channel, cCfg)
if err != nil {
log.Fatal(err)
Expand Down
6 changes: 0 additions & 6 deletions apps/nsqadmin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,10 @@ var (
showVersion = flagSet.Bool("version", false, "print version string")

httpAddress = flagSet.String("http-address", "0.0.0.0:4171", "<addr>:<port> to listen on for HTTP clients")
templateDir = flagSet.String("template-dir", "", "path to templates directory")

graphiteURL = flagSet.String("graphite-url", "", "graphite HTTP address")
proxyGraphite = flagSet.Bool("proxy-graphite", false, "proxy HTTP requests to graphite")

useStatsdPrefixes = flagSet.Bool("use-statsd-prefixes", true, "(Deprecated - Use --statsd-counter-format and --statsd-gauge-format) Expect statsd prefixed keys in graphite (ie: 'stats.counters.' and 'stats.gauges.')")
statsdCounterFormat = flagSet.String("statsd-counter-format", "stats.counters.%s.count", "The counter stats key formatting applied by the implementation of statsd. If no formatting is desired, set this to an empty string.")
statsdGaugeFormat = flagSet.String("statsd-gauge-format", "stats.gauges.%s", "The gauge stats key formatting applied by the implementation of statsd. If no formatting is desired, set this to an empty string.")
statsdPrefix = flagSet.String("statsd-prefix", "nsq.%s", "prefix used for keys sent to statsd (%s for host replacement, must match nsqd)")
Expand Down Expand Up @@ -61,10 +59,6 @@ func main() {
return
}

if *templateDir != "" {
log.Printf("WARNING: --template-dir is deprecated and will be removed in the next release (templates are now compiled into the binary)")
}

exitChan := make(chan int)
signalChan := make(chan os.Signal, 1)
go func() {
Expand Down
2 changes: 0 additions & 2 deletions apps/nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,6 @@ func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet {
flagSet.Duration("max-msg-timeout", opts.MaxMsgTimeout, "maximum duration before a message will timeout")
flagSet.Int64("max-msg-size", opts.MaxMsgSize, "maximum size of a single message in bytes")
flagSet.Duration("max-req-timeout", opts.MaxReqTimeout, "maximum requeuing timeout for a message")
// remove, deprecated
flagSet.Int64("max-message-size", opts.MaxMsgSize, "(deprecated use --max-msg-size) maximum size of a single message in bytes")
flagSet.Int64("max-body-size", opts.MaxBodySize, "maximum size of a single command body")

// client overridable configuration options
Expand Down
3 changes: 1 addition & 2 deletions bench/bench_channels/bench_channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ func subWorker(n int, tcpAddr string,
conn.Write(nsq.MagicV2)
rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
ci := make(map[string]interface{})
ci["short_id"] = "test"
ci["long_id"] = "test"
ci["client_id"] = "test"
cmd, _ := nsq.Identify(ci)
cmd.WriteTo(rw)
nsq.Subscribe(topic, channel).WriteTo(rw)
Expand Down
3 changes: 1 addition & 2 deletions bench/bench_reader/bench_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,7 @@ func subWorker(td time.Duration, workers int, tcpAddr string, topic string, chan
conn.Write(nsq.MagicV2)
rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
ci := make(map[string]interface{})
ci["short_id"] = "test"
ci["long_id"] = "test"
ci["client_id"] = "test"
cmd, _ := nsq.Identify(ci)
cmd.WriteTo(rw)
nsq.Subscribe(topic, channel).WriteTo(rw)
Expand Down
Loading

0 comments on commit c7623b5

Please sign in to comment.