Skip to content

Commit

Permalink
implement prefetch_count on AMQP subscribers;
Browse files Browse the repository at this point in the history
+ add PositiveInteger prompt validator
  • Loading branch information
lukasjarosch committed Jul 19, 2019
1 parent cc9c60f commit caee155
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 8 deletions.
4 changes: 2 additions & 2 deletions a_main-packr.go

Large diffs are not rendered by default.

8 changes: 3 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,23 @@ require (
github.com/Masterminds/sprig v2.18.0+incompatible
github.com/VividCortex/gohistogram v1.0.0 // indirect
github.com/fatih/structtag v1.0.0 // indirect
github.com/go-godin/rabbitmq v0.0.0-20190718163615-1e522570d2b0
github.com/go-godin/log v0.0.0-20190716173926-b62a2fca0801
github.com/go-godin/rabbitmq v0.0.0-20190719103636-6b979908c7b1
github.com/go-kit/kit v0.9.0
github.com/go-logfmt/logfmt v0.4.0 // indirect
github.com/gobuffalo/genny v0.0.0-20190403191548-3ca520ef0d9e // indirect
github.com/gobuffalo/packr v1.26.0
github.com/gobuffalo/packr/v2 v2.4.0
github.com/google/uuid v1.1.1
github.com/huandu/xstrings v1.2.0 // indirect
github.com/iancoleman/strcase v0.0.0-20190422225806-e506e3ef7365
github.com/imdario/mergo v0.3.7 // indirect
github.com/manifoldco/promptui v0.3.2
github.com/mitchellh/mapstructure v1.1.2
github.com/pelletier/go-toml v1.3.0 // indirect
github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v1.0.0
github.com/sirupsen/logrus v1.4.2
github.com/spf13/cobra v0.0.5
github.com/spf13/viper v1.3.2
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94
github.com/vetcher/go-astra v1.2.0
gopkg.in/alecthomas/kingpin.v3-unstable v3.0.0-20180810215634-df19058c872c // indirect
gopkg.in/errgo.v2 v2.1.0
)
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,11 @@ github.com/fatih/structtag v1.0.0 h1:pTHj65+u3RKWYPSGaU290FpI/dXxTaHdVwVwbcPKmEc
github.com/fatih/structtag v1.0.0/go.mod h1:IKitwq45uXL/yqi5mYghiD3w9H6eTOvI9vnk8tXMphA=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/go-godin/grpc-metadata v0.0.0-20190712125646-04ceb265ac8a/go.mod h1:8l5js6oXPPupjhdJrmYVaJuidrUyr+Q3V75iZJAZ974=
github.com/go-godin/grpc-metadata v0.0.0-20190717081641-be8cff64989a h1:M7HE/lFQQ8zczy6/U6Rl+f+8Do0EdHBsmywWiapF4pk=
github.com/go-godin/grpc-metadata v0.0.0-20190717081641-be8cff64989a/go.mod h1:8l5js6oXPPupjhdJrmYVaJuidrUyr+Q3V75iZJAZ974=
github.com/go-godin/log v0.0.0-20190716173926-b62a2fca0801 h1:pL0q4o9YI3KgpazoHN9uYUp3bsE2GBemMMOkREy/zow=
github.com/go-godin/log v0.0.0-20190716173926-b62a2fca0801/go.mod h1:foeoqZOKhUxkgzb1i/Mn7DPj+3/NKFuLCBN1DxFhv8Y=
github.com/go-godin/rabbitmq v0.0.0-20190716163246-e909e622b8c7 h1:dYvuqVELcsEcrUD1S+ztnmV5uCis6uh92oFxVcDUQPM=
github.com/go-godin/rabbitmq v0.0.0-20190716163246-e909e622b8c7/go.mod h1:RC4VtaodR+xZa0/+CDeM1cXCSmkxnOYKvtDntFA4lIw=
github.com/go-godin/rabbitmq v0.0.0-20190717074815-2a37ca6d6428 h1:i0jRrHiKVPq9mTEXdhR5DY0B4enu1Ma3ZBxEfDEwQt8=
Expand All @@ -57,6 +60,8 @@ github.com/go-godin/rabbitmq v0.0.0-20190718123308-7135d018437c h1:WfUK35XMX0AwA
github.com/go-godin/rabbitmq v0.0.0-20190718123308-7135d018437c/go.mod h1:fO91iywDopq8wdK/EDvQkIXf1pmZBgEgI7Mkg+tiFVc=
github.com/go-godin/rabbitmq v0.0.0-20190718163615-1e522570d2b0 h1:bTpzUq6qrARFMshpB9nguQNRvi18GS20/U2iIxeZ4pA=
github.com/go-godin/rabbitmq v0.0.0-20190718163615-1e522570d2b0/go.mod h1:MJrnf5cgsXgUIRmkmcA57XW7pRWlXSXRqZlS0NcdeAY=
github.com/go-godin/rabbitmq v0.0.0-20190719103636-6b979908c7b1 h1:NlMN7OBauR/Pcail4s2253nrSaOqlKlMNrXFlPqDtiI=
github.com/go-godin/rabbitmq v0.0.0-20190719103636-6b979908c7b1/go.mod h1:MJrnf5cgsXgUIRmkmcA57XW7pRWlXSXRqZlS0NcdeAY=
github.com/go-kit/kit v0.8.0 h1:Wz+5lgoB0kkuqLEc6NVmwRknTKP6dTGbSqvhZtBI/j0=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/kit v0.9.0 h1:wDJmvq38kDhkVxi50ni9ykkdUr1PKgqKOoi01fa0Mdk=
Expand Down Expand Up @@ -131,7 +136,9 @@ github.com/karrick/godirwalk v1.8.0/go.mod h1:H5KPZjojv4lE+QYImBI8xVtrBRgYrIVsaR
github.com/karrick/godirwalk v1.10.3/go.mod h1:RoGL9dQei4vP9ilrpETWE8CLOZ1kiN0LhBygSwrAsHA=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2 h1:DB17ag19krx9CFsz4o3enTrPXyIXCl+2iCXH/aMAp9s=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515 h1:T+h1c/A9Gawja4Y9mFVWj2vyii2bbUNDw3kt9VxK2EY=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
Expand Down Expand Up @@ -160,6 +167,7 @@ github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nicksnyder/go-i18n v2.0.2+incompatible h1:Xt6dluut3s2zBUha8/3sj6atWMQbFioi9OMqUGH9khg=
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pelletier/go-toml v1.3.0 h1:e5+lF2E4Y2WCIxBefVowBuB0iHrUH4HZ8q+6mGF7fJc=
Expand Down Expand Up @@ -226,6 +234,7 @@ golang.org/x/crypto v0.0.0-20190422162423-af44ce270edf h1:pZmAoqqjjxp5rjcjFhHcvD
golang.org/x/crypto v0.0.0-20190422162423-af44ce270edf/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3 h1:x/bBzNauLQAlE3fLku/xy92Y8QwKX5HZymrMz2IiKFc=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3 h1:XQyxROzUlZH+WIQwySDgnISgOivlhjIEwaQaJEJrrN0=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
Expand Down Expand Up @@ -256,6 +265,7 @@ golang.org/x/tools v0.0.0-20190404132500-923d25813098/go.mod h1:LCzVGOaR6xXOjkQ3
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20190613204242-ed0dc450797f h1:+zypR5600WBcnJgA2nzZAsBlM8cArEGa8dhhiNE4u3w=
golang.org/x/tools v0.0.0-20190613204242-ed0dc450797f/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
golang.org/x/tools v0.0.0-20190711191110-9a621aea19f8 h1:VZick+NwcqlXXVsD1iFr4Wo6F1FgBbnM4AOMzhwKQ7w=
golang.org/x/tools v0.0.0-20190711191110-9a621aea19f8/go.mod h1:jcCCGcm9btYwXyDqrUWc6MKQKKGJCWEQ3AfLSRIbEuI=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
Expand Down
14 changes: 14 additions & 0 deletions internal/bundle/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bundle

import (
"fmt"
"strconv"
"strings"

"github.com/iancoleman/strcase"
Expand Down Expand Up @@ -112,6 +113,19 @@ func promptSubscriberValues(cfg *SubscriberConfiguration) (err error) {
}
cfg.RabbitMQ.Queue.Name = queue

// Prefetch count
p = prompt.NewPrompt(
"Prefetch count",
"10",
prompt.Validate(prompt.PositiveInteger()),
)
prefetchCount, err := p.Run()
if err != nil {
return err
}
count, _ := strconv.Atoi(prefetchCount)
cfg.RabbitMQ.PrefetchCount = count

// Protobuf module
p = prompt.NewPrompt(
"Import of the target protobuf",
Expand Down
16 changes: 16 additions & 0 deletions internal/prompt/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package prompt

import (
"fmt"
"strconv"
"strings"
"github.com/manifoldco/promptui"
"regexp"
Expand All @@ -18,6 +19,21 @@ func Validate(validators ...promptui.ValidateFunc) promptui.ValidateFunc {
}
}

func PositiveInteger() promptui.ValidateFunc {
return func(s string) error {
val, err := strconv.Atoi(s);
if err != nil {
return fmt.Errorf("only positive integers")
}

if val < 0 {
return fmt.Errorf("only positive integers")
}

return nil
}
}

func MinLengthThree() promptui.ValidateFunc {
return func(s string) error {
if len(s) < 3 {
Expand Down
2 changes: 2 additions & 0 deletions templates/amqp_subscriber_init.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func Subscriptions(conn *rabbitmq.RabbitMQ) SubscriberSet {
Exchange: "{{ .Subscription.Exchange }}",
Topic: "{{ .Subscription.Topic }}",
AutoAck: {{ .Subscription.AutoAck }},
PrefetchCount: {{ .Subscription.PrefetchCount }},
Queue: rabbitmq.SubscriptionQueue{
AutoDelete: {{ .Subscription.Queue.AutoDelete }},
Durable: {{ .Subscription.Queue.Durable }},
Expand Down Expand Up @@ -65,6 +66,7 @@ func (ss SubscriberSet) {{ .Handler }}(logger log.Logger, usecase service.{{ $se
"topic", ss.{{ untitle .Handler }}.Subscription.Topic,
"queue", ss.{{ untitle .Handler }}.Subscription.Queue.Name,
"exchange", ss.{{ untitle .Handler }}.Subscription.Exchange,
"prefetch_count", ss.{{ untitle .Handler }}.Subscription.PrefetchCount,
"transport", "AMQP",
)

Expand Down
2 changes: 1 addition & 1 deletion templates/partials/amqp/encode_decode/subscribe_decode.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
// you added the subscriber, you need to fix the imports by adding:
// {{ untitle .Handler }}Proto "{{ .Protobuf.Import }}"
func {{ .Handler }}Decoder(delivery *rabbitmq.Delivery) (decoded interface{}, err error) {
var event *{{ untitle .Handler }}Proto.{{ .Protobuf.Message }}
event := &{{ untitle .Handler }}Proto.{{ .Protobuf.Message }}{}

if err := proto.Unmarshal(delivery.Body, event); err != nil {
return nil, fmt.Errorf("failed to unmarshal Delivery into *{{ untitle .Handler }}Proto.{{ .Protobuf.Message }}: %s", err)
Expand Down

0 comments on commit caee155

Please sign in to comment.