Skip to content

Commit

Permalink
subscriber bundle now provides it's config; add subscription decoder …
Browse files Browse the repository at this point in the history
…type
  • Loading branch information
lukasjarosch committed Jul 18, 2019
1 parent f4ff161 commit bc318ed
Show file tree
Hide file tree
Showing 13 changed files with 135 additions and 47 deletions.
7 changes: 4 additions & 3 deletions a_main-packr.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/Masterminds/sprig v2.18.0+incompatible
github.com/VividCortex/gohistogram v1.0.0 // indirect
github.com/fatih/structtag v1.0.0 // indirect
github.com/go-godin/rabbitmq v0.0.0-20190718123308-7135d018437c
github.com/go-godin/rabbitmq v0.0.0-20190718163615-1e522570d2b0
github.com/go-kit/kit v0.9.0
github.com/go-logfmt/logfmt v0.4.0 // indirect
github.com/gobuffalo/genny v0.0.0-20190403191548-3ca520ef0d9e // indirect
Expand Down
19 changes: 19 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Masterminds/goutils v1.1.0 h1:zukEsf/1JZwCMgHiK3GZftabmxiCw4apj3a28RPBiVg=
Expand Down Expand Up @@ -36,6 +37,8 @@ github.com/fatih/structtag v1.0.0 h1:pTHj65+u3RKWYPSGaU290FpI/dXxTaHdVwVwbcPKmEc
github.com/fatih/structtag v1.0.0/go.mod h1:IKitwq45uXL/yqi5mYghiD3w9H6eTOvI9vnk8tXMphA=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/go-godin/grpc-metadata v0.0.0-20190717081641-be8cff64989a h1:M7HE/lFQQ8zczy6/U6Rl+f+8Do0EdHBsmywWiapF4pk=
github.com/go-godin/grpc-metadata v0.0.0-20190717081641-be8cff64989a/go.mod h1:8l5js6oXPPupjhdJrmYVaJuidrUyr+Q3V75iZJAZ974=
github.com/go-godin/rabbitmq v0.0.0-20190716163246-e909e622b8c7 h1:dYvuqVELcsEcrUD1S+ztnmV5uCis6uh92oFxVcDUQPM=
github.com/go-godin/rabbitmq v0.0.0-20190716163246-e909e622b8c7/go.mod h1:RC4VtaodR+xZa0/+CDeM1cXCSmkxnOYKvtDntFA4lIw=
github.com/go-godin/rabbitmq v0.0.0-20190717074815-2a37ca6d6428 h1:i0jRrHiKVPq9mTEXdhR5DY0B4enu1Ma3ZBxEfDEwQt8=
Expand All @@ -52,6 +55,8 @@ github.com/go-godin/rabbitmq v0.0.0-20190718123150-841efeb3fbb8 h1:bP8g+EOCwfzEo
github.com/go-godin/rabbitmq v0.0.0-20190718123150-841efeb3fbb8/go.mod h1:fO91iywDopq8wdK/EDvQkIXf1pmZBgEgI7Mkg+tiFVc=
github.com/go-godin/rabbitmq v0.0.0-20190718123308-7135d018437c h1:WfUK35XMX0AwA+Q8yxvCRkHLvu0XAy6YyWypJL3Qsqg=
github.com/go-godin/rabbitmq v0.0.0-20190718123308-7135d018437c/go.mod h1:fO91iywDopq8wdK/EDvQkIXf1pmZBgEgI7Mkg+tiFVc=
github.com/go-godin/rabbitmq v0.0.0-20190718163615-1e522570d2b0 h1:bTpzUq6qrARFMshpB9nguQNRvi18GS20/U2iIxeZ4pA=
github.com/go-godin/rabbitmq v0.0.0-20190718163615-1e522570d2b0/go.mod h1:MJrnf5cgsXgUIRmkmcA57XW7pRWlXSXRqZlS0NcdeAY=
github.com/go-kit/kit v0.8.0 h1:Wz+5lgoB0kkuqLEc6NVmwRknTKP6dTGbSqvhZtBI/j0=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/kit v0.9.0 h1:wDJmvq38kDhkVxi50ni9ykkdUr1PKgqKOoi01fa0Mdk=
Expand Down Expand Up @@ -91,11 +96,14 @@ github.com/gobuffalo/packr/v2 v2.4.0/go.mod h1:ra341gygw9/61nSjAbfwcwh8IrYL4WmR4
github.com/gobuffalo/syncx v0.0.0-20190224160051-33c29581e754 h1:tpom+2CJmpzAWj5/VEHync2rJGi+epHNIeRSWjzGA+4=
github.com/gobuffalo/syncx v0.0.0-20190224160051-33c29581e754/go.mod h1:HhnNqWY95UYwwW3uSASeV7vtgYkT2t16hJgV3AEPUpw=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/lint v0.0.0-20181026193005-c67002cb31c3 h1:I4BOK3PBMjhWfQM2zPJKK7lOBGsrsvOB7kBELP33hiE=
github.com/golang/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:tluoj9z5200jBnyusfRPU2LqT6J+DAorxEvtC7LHB+E=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/shlex v0.0.0-20181106134648-c34317bd91bf h1:7+FW5aGwISbqUtkfmIpZJGRgNFg2ioYPvFaUxdqpDsg=
github.com/google/shlex v0.0.0-20181106134648-c34317bd91bf/go.mod h1:RpwtwJQFrIEPstU94h88MWPXP2ektJZ8cZ0YntAmXiE=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
Expand Down Expand Up @@ -218,8 +226,11 @@ golang.org/x/crypto v0.0.0-20190422162423-af44ce270edf h1:pZmAoqqjjxp5rjcjFhHcvD
golang.org/x/crypto v0.0.0-20190422162423-af44ce270edf/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3 h1:x/bBzNauLQAlE3fLku/xy92Y8QwKX5HZymrMz2IiKFc=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand All @@ -238,11 +249,18 @@ golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20181122213734-04b5d21e00f1/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190329151228-23e29df326fe/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190404132500-923d25813098 h1:MtqjsZmyGRgMmLUgxnmMJ6RYdvd2ib8ipiayHhqSxs4=
golang.org/x/tools v0.0.0-20190404132500-923d25813098/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20190613204242-ed0dc450797f h1:+zypR5600WBcnJgA2nzZAsBlM8cArEGa8dhhiNE4u3w=
golang.org/x/tools v0.0.0-20190613204242-ed0dc450797f/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
golang.org/x/tools v0.0.0-20190711191110-9a621aea19f8/go.mod h1:jcCCGcm9btYwXyDqrUWc6MKQKKGJCWEQ3AfLSRIbEuI=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/grpc v1.22.0 h1:J0UbZOIrCAl+fpTOf8YLs4dJo8L/owV4LYVtAXQoPkw=
google.golang.org/grpc v1.22.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/alecthomas/kingpin.v3-unstable v3.0.0-20180810215634-df19058c872c h1:vTxShRUnK60yd8DZU+f95p1zSLj814+5CuEh7NjF2/Y=
gopkg.in/alecthomas/kingpin.v3-unstable v3.0.0-20180810215634-df19058c872c/go.mod h1:3HH7i1SgMqlzxCcBmUHW657sD4Kvv9sC3HpL3YukzwA=
Expand All @@ -254,3 +272,4 @@ gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
1 change: 1 addition & 0 deletions internal/bundle/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type publisher struct {
Publishing rabbitmq.Publishing
}


func InitializePublisher() (*publisher, error) {
pub := rabbitmq.Publishing{}
protoMessage, err := promptPublisherValues(&pub)
Expand Down
67 changes: 49 additions & 18 deletions internal/bundle/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,39 +14,46 @@ import (

const SubscriberKey = "transport.amqp.subscriber"

type SubscriberConfiguration struct {
RabbitMQ rabbitmq.Subscription
HandlerName string `json:"handler_name" mapstructure:"handler_name"`
Protobuf struct {
GoModule string `json:"go_module" mapstructure:"go_module"`
MessageName string `json:"message_name" mapstructure:"message_name"`
}
}

type subscriber struct {
Subscription rabbitmq.Subscription
HandlerName string `json:"handler_name"`
Configuration SubscriberConfiguration
}

func InitializeSubscriber() (*subscriber, error) {
sub := rabbitmq.Subscription{}
err := promptSubscriberValues(&sub)
cfg := SubscriberConfiguration{}
err := promptSubscriberValues(&cfg)
if err != nil {
return nil, err
}

subscriberKey := strings.Replace(sub.Topic, ".", "_", -1)
subscriberKey := strings.Replace(cfg.RabbitMQ.Topic, ".", "_", -1)
subscriberKey = strings.ToLower(subscriberKey)

// defaults
sub.AutoAck = false
sub.Queue.Durable = true
sub.Queue.NoWait = false
sub.Queue.Exclusive = false
sub.Queue.AutoDelete = false
cfg.HandlerName = strcase.ToCamel(subscriberKey)
cfg.RabbitMQ.AutoAck = false
cfg.RabbitMQ.Queue.Durable = true
cfg.RabbitMQ.Queue.NoWait = false
cfg.RabbitMQ.Queue.Exclusive = false
cfg.RabbitMQ.Queue.AutoDelete = false

confSub := config.GetStringMap(SubscriberKey)
if _, ok := confSub[subscriberKey]; ok == true {
return nil, fmt.Errorf("subscriber '%s' is already registered", subscriberKey)
}
confSub[subscriberKey] = sub
confSub[subscriberKey] = cfg
config.Set(SubscriberKey, confSub)
godin.SaveConfiguration()

return &subscriber{
Subscription: sub,
HandlerName: subscriberKey,
Configuration: cfg,
}, nil
}

Expand All @@ -68,7 +75,7 @@ func SubscriberFileName(topic string) string {
return fmt.Sprintf("%s.go", name)
}

func promptSubscriberValues(sub *rabbitmq.Subscription) (err error) {
func promptSubscriberValues(cfg *SubscriberConfiguration) (err error) {
// Topic
p := prompt.NewPrompt(
"AMQP subscription Topic",
Expand All @@ -79,7 +86,7 @@ func promptSubscriberValues(sub *rabbitmq.Subscription) (err error) {
if err != nil {
return err
}
sub.Topic = topic
cfg.RabbitMQ.Topic = topic

// Exchange
p = prompt.NewPrompt(
Expand All @@ -91,7 +98,7 @@ func promptSubscriberValues(sub *rabbitmq.Subscription) (err error) {
if err != nil {
return err
}
sub.Exchange = exchange
cfg.RabbitMQ.Exchange = exchange

// Queue
p = prompt.NewPrompt(
Expand All @@ -103,7 +110,31 @@ func promptSubscriberValues(sub *rabbitmq.Subscription) (err error) {
if err != nil {
return err
}
sub.Queue.Name = queue
cfg.RabbitMQ.Queue.Name = queue

// Protobuf module
p = prompt.NewPrompt(
"Import of the target protobuf",
"github.com/user/some-other-service-protobuf",
prompt.Validate(),
)
protoModule, err := p.Run()
if err != nil {
return err
}
cfg.Protobuf.GoModule = protoModule

// Protobuf message
p = prompt.NewPrompt(
"Name of the protobuf message of the subscribed event",
"AnotherServiceMessage",
prompt.Validate(),
)
protoMessage, err := p.Run()
if err != nil {
return err
}
cfg.Protobuf.MessageName = protoMessage

return nil
}
8 changes: 7 additions & 1 deletion internal/generate/amqp_encode_decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,17 @@ func (s *AmqpEncodeDecode) Update() error {

func (s *AmqpEncodeDecode) GenerateMissing() error {
var publishers []string
var subscribers []string

for _, pub := range s.opts.Context.Service.Publisher {
publishers = append(publishers, pub.Name)
}

implementation := parse.NewAmqpEncodeDecodeParser(s.opts.TargetFile, publishers)
for _, sub := range s.opts.Context.Service.Subscriber {
subscribers = append(subscribers, sub.Handler)
}

implementation := parse.NewAmqpEncodeDecodeParser(s.opts.TargetFile, publishers, subscribers)
if err := implementation.Parse(); err != nil {
return errors.Wrap(err, "Parse")
}
Expand Down
6 changes: 4 additions & 2 deletions internal/parse/amqp_encode_decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,19 @@ import (
type amqpEncodeDecodeParser struct {
baseParser
publisherNames []string
subscriberNames []string
ImplementedFunctions []string
MissingFunctions []string
formatStrings []string
}

func NewAmqpEncodeDecodeParser(path string, publisherNames []string) *amqpEncodeDecodeParser {
func NewAmqpEncodeDecodeParser(path string, publisherNames []string, subscriberNames []string) *amqpEncodeDecodeParser {
return &amqpEncodeDecodeParser{
baseParser: baseParser{
Path: path,
},
publisherNames: publisherNames,
publisherNames: publisherNames,
subscriberNames: subscriberNames,
}
}

Expand Down
20 changes: 14 additions & 6 deletions internal/template/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,16 @@ func NewContextFromConfig() Context {
sub := config.GetStringMap(bundle.SubscriberKey)
if len(sub) > 0 {
for x := range sub {
s := &rabbitmq.Subscription{}
mapstructure.Decode(sub[x], s)
subscribers = append(subscribers, Subscriber{
Subscription: *s,
Handler: bundle.SubscriberHandlerName(s.Topic),
})
cfg := &bundle.SubscriberConfiguration{}
mapstructure.Decode(sub[x], cfg)

sub := Subscriber{}
sub.Subscription = cfg.RabbitMQ
sub.Handler = cfg.HandlerName
sub.Protobuf.Import = cfg.Protobuf.GoModule
sub.Protobuf.Message = cfg.Protobuf.MessageName

subscribers = append(subscribers, sub)
}
}

Expand Down Expand Up @@ -174,6 +178,10 @@ type Docker struct {
type Subscriber struct {
Handler string
Subscription rabbitmq.Subscription
Protobuf struct {
Import string
Message string
}
}

type Publisher struct {
Expand Down
29 changes: 15 additions & 14 deletions internal/template/partial.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,21 @@ import (
)

var PartialTemplates = map[string]string{
"service_method": "partials/service_method.tpl",
"logging_method": "partials/logging_method.tpl",
"request": "partials/request.tpl",
"endpoint": "partials/endpoint.tpl",
"response": "partials/response.tpl",
"grpc_encode_request": "partials/grpc/request_response/encode_request.tpl",
"grpc_encode_response": "partials/grpc/request_response/encode_response.tpl",
"grpc_decode_request": "partials/grpc/request_response/decode_request.tpl",
"grpc_decode_response": "partials/grpc/request_response/decode_response.tpl",
"grpc_request_decoder": "partials/grpc/encode_decode/request_decoder.tpl",
"grpc_request_encoder": "partials/grpc/encode_decode/request_encoder.tpl",
"grpc_response_decoder": "partials/grpc/encode_decode/response_decoder.tpl",
"grpc_response_encoder": "partials/grpc/encode_decode/response_encoder.tpl",
"amqp_publish_encode": "partials/amqp/encode_decode/publish_encode.tpl",
"service_method": "partials/service_method.tpl",
"logging_method": "partials/logging_method.tpl",
"request": "partials/request.tpl",
"endpoint": "partials/endpoint.tpl",
"response": "partials/response.tpl",
"grpc_encode_request": "partials/grpc/request_response/encode_request.tpl",
"grpc_encode_response": "partials/grpc/request_response/encode_response.tpl",
"grpc_decode_request": "partials/grpc/request_response/decode_request.tpl",
"grpc_decode_response": "partials/grpc/request_response/decode_response.tpl",
"grpc_request_decoder": "partials/grpc/encode_decode/request_decoder.tpl",
"grpc_request_encoder": "partials/grpc/encode_decode/request_encoder.tpl",
"grpc_response_decoder": "partials/grpc/encode_decode/response_decoder.tpl",
"grpc_response_encoder": "partials/grpc/encode_decode/response_encoder.tpl",
"amqp_publish_encode": "partials/amqp/encode_decode/publish_encode.tpl",
"amqp_subscribe_decoder_interfaces": "partials/amqp/encode_decode/subscribe_decoder_interfaces.tpl",
}

type Partial struct {
Expand Down
5 changes: 5 additions & 0 deletions templates/amqp_encode_decode.tpl
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package amqp

import (
"github.com/go-godin/rabbitmq"
pb "{{ .Protobuf.Package }}"
)

{{- if gt (len .Service.Subscriber) 0 }}
type SubcriptionDecoder func(delivery *rabbitmq.Delivery) (decoded interface{}, err error)
{{- end }}

{{- if gt (len .Service.Publisher) 0 }}
{{- range .Service.Publisher }}
{{- template "amqp_publish_encode" . }}
{{- end }}
{{- end }}

14 changes: 13 additions & 1 deletion templates/amqp_subscriber_handler.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
grpc_metadata "github.com/go-godin/grpc-metadata"

"{{ .Service.Module }}/internal/service"
{{- range .Service.Subscriber }}
{{ untitle .Handler }}Proto "{{ .Protobuf.Import }}"
{{- end }}
)

{{- $serviceName := title .Service.Name -}}
Expand All @@ -17,11 +20,20 @@ subscriber only, so we can safely assume that there is only one element in the s
*/}}
{{ range .Service.Subscriber }}
// {{ .Handler }} is responsible of handling all incoming AMQP messages with routing key '{{ .Subscription.Topic }}'
func {{ .Handler }}(logger log.Logger, usecase service.{{ title $serviceName }}) rabbitmq.SubscriptionHandler {
func {{ .Handler }}Subscriber(logger log.Logger, usecase service.{{ title $serviceName }}, decoder rabbitmq.SubscriberDecoder) rabbitmq.SubscriptionHandler {
return func(ctx context.Context, delivery *rabbitmq.Delivery) {
// the requestId is injected into the context and should be attached on every log
logger = logger.With(string(grpc_metadata.RequestID), ctx.Value(string(grpc_metadata.RequestID)))
event, err := decoder(delivery)
event = event.({{ untitle .Handler }}Proto.{{ .Protobuf.Message }})
if err != nil {
logger.Error("failed to decode '{{ .Subscription.Topic }}' event", "err", err)
delivery.NackDelivery(false, false, "{{ .Subscription.Topic }}")
delivery.IncrementTransportErrorCounter("{{ .Subscription.Topic }}")
return
}

// TODO: Handle {{ .Subscription.Topic }} subscription
/*
If you want to NACK the delivery, use `delivery.NackDelivery()` instead of Nack().
Expand Down
2 changes: 1 addition & 1 deletion templates/amqp_subscriber_init.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func Subscriptions(conn *rabbitmq.RabbitMQ) SubscriberSet {
// The RequestID middlware will extract the requestId from the delivery header or generate a new one. The requestId is
// then mad available through the context.
func (ss SubscriberSet) {{ .Handler }}(logger log.Logger, usecase service.{{ $serviceName }}) error {
handler := subscriber.{{ .Handler }}(logger, usecase)
handler := subscriber.{{ .Handler }}Subscriber(logger, usecase)
handler = middleware.Logging(logger, "{{ .Subscription.Topic }}", handler)
handler = middleware.PrometheusInstrumentation("{{ .Subscription.Topic }}", handler)
handler = middleware.RequestID(handler)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{{- define "amqp_subscribe_decoder_interface" }}
{{- end }}

0 comments on commit bc318ed

Please sign in to comment.