Skip to content

Commit

Permalink
implement subscription decoders, improve generated handler
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasjarosch committed Jul 19, 2019
1 parent bc318ed commit 65257f7
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 24 deletions.
7 changes: 4 additions & 3 deletions a_main-packr.go

Large diffs are not rendered by default.

27 changes: 26 additions & 1 deletion internal/generate/amqp_encode_decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,29 @@ func (s *AmqpEncodeDecode) GenerateMissing() error {
// encoders are used by publishers, decoders are used by subscribers
// e.g. UserCreatedEncoder => UserCreated; it's the required config key and thus the template context to use
if strings.Contains(missingFunction, "Decoder") {
// TODO: subscriber
var ctx template.Subscriber

name := strings.Replace(missingFunction, "Decoder", "", 1)
for _, registeredSubscriber := range s.opts.Context.Service.Subscriber {
if registeredSubscriber.Handler == name {
ctx = registeredSubscriber
break
}
}

tpl := template.NewPartial(templateName, true)
data, err := tpl.Render(s.box, ctx)
if err != nil {
return errors.Wrap(err, "failed to render partial")
}

writer := template.NewFileAppendWriter(s.opts.TargetFile, data)
if err := writer.Write(); err != nil {
return errors.Wrap(err, fmt.Sprintf("failed to append-write to %s", s.TargetPath()))
}

logrus.Infof("added missing subscribe decoder to %s: %s", s.opts.TargetFile, missingFunction)

} else if strings.Contains(missingFunction, "Encoder") {
var ctx template.Publisher

Expand Down Expand Up @@ -114,6 +136,9 @@ func (r *AmqpEncodeDecode) templateFromFunction(name string) (templateName strin
if strings.Contains(name, "Encoder") {
return "amqp_publish_encode", nil
}
if strings.Contains(name, "Decoder") {
return "amqp_subscribe_decode", nil
}

return "", fmt.Errorf("function %s does not have a template associated", name)
}
5 changes: 5 additions & 0 deletions internal/parse/amqp_encode_decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,10 @@ func (p *amqpEncodeDecodeParser) RequiredFunctions() []string {
requiredFunctions = append(requiredFunctions, fmt.Sprintf("%sEncoder", pub))
}

// subscribers
for _, sub := range p.subscriberNames {
requiredFunctions = append(requiredFunctions, fmt.Sprintf("%sDecoder", sub))
}

return requiredFunctions
}
1 change: 1 addition & 0 deletions internal/template/partial.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var PartialTemplates = map[string]string{
"grpc_response_decoder": "partials/grpc/encode_decode/response_decoder.tpl",
"grpc_response_encoder": "partials/grpc/encode_decode/response_encoder.tpl",
"amqp_publish_encode": "partials/amqp/encode_decode/publish_encode.tpl",
"amqp_subscribe_decode": "partials/amqp/encode_decode/subscribe_decode.tpl",
"amqp_subscribe_decoder_interfaces": "partials/amqp/encode_decode/subscribe_decoder_interfaces.tpl",
}

Expand Down
15 changes: 10 additions & 5 deletions templates/amqp_encode_decode.tpl
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
package amqp

import (
"fmt"

"github.com/go-godin/rabbitmq"

pb "{{ .Protobuf.Package }}"
)

{{- if gt (len .Service.Subscriber) 0 }}
type SubcriptionDecoder func(delivery *rabbitmq.Delivery) (decoded interface{}, err error)
{{- end }}

{{- if gt (len .Service.Publisher) 0 }}
{{- range .Service.Publisher }}
{{- template "amqp_publish_encode" . }}
{{- template "amqp_publish_encode" . -}}
{{- end }}
{{- end }}

{{- if gt (len .Service.Subscriber) 0 }}
{{- range .Service.Subscriber }}
{{- template "amqp_subscribe_decode" . -}}
{{- end }}
{{- end }}

39 changes: 25 additions & 14 deletions templates/amqp_subscriber_handler.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import (
"context"
"github.com/go-godin/log"
"github.com/go-godin/rabbitmq"
grpc_metadata "github.com/go-godin/grpc-metadata"
grpcMetadata "github.com/go-godin/grpc-metadata"
"github.com/pkg/errors"

"{{ .Service.Module }}/internal/service"
"{{ .Service.Module }}/internal/service"
{{- range .Service.Subscriber }}
{{ untitle .Handler }}Proto "{{ .Protobuf.Import }}"
{{- end }}
Expand All @@ -22,30 +23,40 @@ subscriber only, so we can safely assume that there is only one element in the s
// {{ .Handler }} is responsible of handling all incoming AMQP messages with routing key '{{ .Subscription.Topic }}'
func {{ .Handler }}Subscriber(logger log.Logger, usecase service.{{ title $serviceName }}, decoder rabbitmq.SubscriberDecoder) rabbitmq.SubscriptionHandler {
return func(ctx context.Context, delivery *rabbitmq.Delivery) {
// the requestId is injected into the context and should be attached on every log
logger = logger.With(string(grpc_metadata.RequestID), ctx.Value(string(grpc_metadata.RequestID)))
logger = logger.With(string(grpcMetadata.RequestID), ctx.Value(string(grpcMetadata.RequestID)))
event, err := decoder(delivery)
event = event.({{ untitle .Handler }}Proto.{{ .Protobuf.Message }})
event, err := decode{{ .Handler }}(delivery, decoder, logger)
if err != nil {
logger.Error("failed to decode '{{ .Subscription.Topic }}' event", "err", err)
delivery.NackDelivery(false, false, "{{ .Subscription.Topic }}")
delivery.IncrementTransportErrorCounter("{{ .Subscription.Topic }}")
return
}

// TODO: Handle {{ .Subscription.Topic }} subscription
_ = event // remove this line, it just keeps the compiler calm until you start using the event :)

// TODO: Handle {{ .Subscription.Topic }} subscription here
/*
If you want to NACK the delivery, use `delivery.NackDelivery()` instead of Nack().
This will ensure that the prometheus amqp_nack_counter is increased.

Godins delivery wrapper also provides a `delivery.IncrementTransportErrorCounter()` method to grant
you access to the amqp_transport_error metric. Call it if the message is incomplete or cannot
be unmarshalled for any reason.
*/

_ = delivery.Ack(false)
}
}

// decode{{ .Handler }} cleans up the actual handler by providing a cleaner interface for decoding incoming {{ .Protobuf.Message }} deliveries.
// It will also take care of logging errors and handling metrics.
func decode{{ .Handler }}(delivery *rabbitmq.Delivery, decoder rabbitmq.SubscriberDecoder, logger log.Logger) (*{{ untitle .Handler }}Proto.{{ .Protobuf.Message }}, error) {
event, err := decoder(delivery)
if err != nil {
if err2 := delivery.NackDelivery(false, false, "{{ .Subscription.Topic }}"); err2 != nil {
err = errors.Wrap(err, err2.Error())
}
delivery.IncrementTransportErrorCounter("{{ .Subscription.Topic }}")
logger.Error("failed to decode {{ .Protobuf.Message }}", "err", err)
return nil, err
}
logger.Debug("decoded {{ .Protobuf.Message }}", "event", event)

return event.(*{{ untitle .Handler }}Proto.{{ .Protobuf.Message }}), nil
}
{{ end }}

2 changes: 1 addition & 1 deletion templates/amqp_subscriber_init.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func Subscriptions(conn *rabbitmq.RabbitMQ) SubscriberSet {
// The RequestID middlware will extract the requestId from the delivery header or generate a new one. The requestId is
// then mad available through the context.
func (ss SubscriberSet) {{ .Handler }}(logger log.Logger, usecase service.{{ $serviceName }}) error {
handler := subscriber.{{ .Handler }}Subscriber(logger, usecase)
handler := subscriber.{{ .Handler }}Subscriber(logger, usecase, {{ .Handler }}Decoder)
handler = middleware.Logging(logger, "{{ .Subscription.Topic }}", handler)
handler = middleware.PrometheusInstrumentation("{{ .Subscription.Topic }}", handler)
handler = middleware.RequestID(handler)
Expand Down
6 changes: 6 additions & 0 deletions templates/partials/amqp/encode_decode/subscribe_decode.tpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{{- define "amqp_subscribe_decode" }}
// {{ .Handler }}Decoder
func {{ .Handler }}Decoder(delivery *rabbitmq.Delivery) (decoded interface{}, err error) {
return nil, fmt.Errorf("{{ .Handler }}Decoder is not implemented")
}
{{- end }}

0 comments on commit 65257f7

Please sign in to comment.