Skip to content

Commit

Permalink
started working on the amqp transport and subscriber bundle
Browse files Browse the repository at this point in the history
Signed-off-by: Lukas <lukas.jarosch@mail.com>
  • Loading branch information
lukasjarosch committed Jun 25, 2019
1 parent aeb4a56 commit 464766a
Show file tree
Hide file tree
Showing 12 changed files with 336 additions and 1 deletion.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
.idea
bin/
dist
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ Examples are now hosted in a separate repository: [godin-examples](https://githu
|------|------|
| **init** | initialize a new godin project in the current directory |
| **update** | update the project in the current directory and regenerate necessary stuff |
| **add** | add bundles to an existing project |
| **version** | print godin version |
| **help** | Help about any command |

Expand Down
98 changes: 98 additions & 0 deletions cmd/add.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package cmd

import (
"fmt"
"os"

"strings"

"github.com/lukasjarosch/godin/internal/bundle"
"github.com/lukasjarosch/godin/internal/godin"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

func init() {
rootCmd.AddCommand(addCommand)
}

// rootCmd represents the base command when called without any subcommands
var addCommand = &cobra.Command{
Use: "add",
Short: "Add bundles to an initalized godin project",
Long: `Add different kind of bundles to the current project.
godin add <bundleType>
The available bundles are:
middleware: to add a new service-layer middleware
datastore: to add a new datastore bundle (mysql, mongodb, redis)
subscriber: add an AMQP topic subscription
transport: to add a new transport layer`,
Run: addCmd,
Args: cobra.MinimumNArgs(1),
}

var validBundleTypes = []string{"middleware", "datastore", "transport", "subscriber"}

func addCmd(cmd *cobra.Command, args []string) {
logrus.SetLevel(logrus.DebugLevel)
// ensure a valid bundle type is passed
bundleType, err := validateBundleType(args[0])
if err != nil {
logrus.Error(err)
os.Exit(1)
}

// project must be initialized
if _, err := os.Stat(godin.ConfigFilename()); err != nil {
logrus.Fatal("project not initialized")
}

if err := godin.LoadConfiguration(); err != nil {
logrus.Fatalf("failed to load configuration: %s", err.Error())
}

switch bundleType {
case "middleware",
"datastore",
"transport":
logrus.Info("sorry, this bundle type is not yet implemented :(")
case "subscriber":

_, err := bundle.InitializeAMQPTransport()
if err != nil {
logrus.Errorf("failed to initialize AMQP transport: %s", err)
}

/*
sub, err := bundle.InitializeSubscriber()
if err != nil {
logrus.Errorf("failed to initialize subscriber: %s", err)
os.Exit(1)
}
// add to config
subscriptions := make(map[string]interface{})
if config.IsSet("transport.amqp.subscriptions") {
subscriptions = config.GetStringMap("transport.amqp.subscriptions")
}
subscriptions[sub.HandlerName] = sub.Subscription
config.Set("transport.amqp.subscriptions", subscriptions)
godin.SaveConfiguration()
*/
}
}

func validateBundleType(givenType string) (bundleType string, err error) {
for _, validType := range validBundleTypes {
if givenType == validType {
bundleType = validType
break
}
}
if bundleType == "" {
return "", fmt.Errorf("invalid bundle type, valid types are: %s", strings.Join(validBundleTypes, ", "))
}
return bundleType, nil
}
1 change: 0 additions & 1 deletion cmd/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ var updateCommand = &cobra.Command{
}

func updateCmd(cmd *cobra.Command, args []string) {

logrus.SetLevel(logrus.DebugLevel)

// project must be initialized
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
github.com/sirupsen/logrus v1.4.2
github.com/spf13/cobra v0.0.5
github.com/spf13/viper v1.3.2
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94
github.com/vetcher/go-astra v1.2.0
gopkg.in/alecthomas/kingpin.v3-unstable v3.0.0-20180810215634-df19058c872c // indirect
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/viper v1.3.2 h1:VUFqw5KcqRf7i70GOzW7N+Q7+gxVBkSSqiXB12+JQ4M=
github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94 h1:0ngsPmuP6XIjiFRNFYlvKwSr5zff2v+uPHaffZ6/M4k=
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
Expand Down
9 changes: 9 additions & 0 deletions internal/bundle/datastore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package bundle

type datastore struct {
}

type MySQLDatastore struct {
datastore
}

1 change: 1 addition & 0 deletions internal/bundle/middleware.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package bundle
74 changes: 74 additions & 0 deletions internal/bundle/subscribe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package bundle

import (
"github.com/lukasjarosch/godin/internal/prompt"
"github.com/lukasjarosch/godin/pkg/amqp"
)

type subscriber struct {
Subscription amqp.Subscription
HandlerName string `json:"handler_name"`
}

func InitializeSubscriber() (*subscriber, error) {
sub := amqp.Subscription{}
handlerName, err := promptValues(sub)
if err != nil {
return nil, err
}

return &subscriber{
Subscription: sub,
HandlerName: handlerName,
}, nil
}

func promptValues(sub amqp.Subscription) (handlerName string, err error) {
// Topic
p := prompt.NewPrompt(
"AMQP subscription Topic",
"user.created",
prompt.Validate(),
)
topic, err := p.Run()
if err != nil {
return "", err
}
sub.Topic = topic

// Exchange
p = prompt.NewPrompt(
"AMQP Exchange name",
"user-Exchange",
prompt.Validate(),
)
exchange, err := p.Run()
if err != nil {
return "", err
}
sub.Exchange = exchange

// Queue
p = prompt.NewPrompt(
"AMQP Queue name which is bound to the Exchange",
"user-created-Queue",
prompt.Validate(),
)
queue, err := p.Run()
if err != nil {
return "", err
}
sub.Queue = queue

// HandlerName
p = prompt.NewPrompt(
"Name of the handler for this subscription",
"UserCreatedHandler",
prompt.Validate(),
)
handler, err := p.Run()
if err != nil {
return "", err
}
return handler, nil
}
64 changes: 64 additions & 0 deletions internal/bundle/transport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package bundle

import (
"fmt"

"github.com/lukasjarosch/godin/internal/prompt"
"github.com/sirupsen/logrus"
config "github.com/spf13/viper"
"github.com/lukasjarosch/godin/internal/godin"
)

const AmqpTransportKey = "transport.amqp"
const AmqpExchangeKey = "transport.amqp.exchange"
const AmqpExchangeTypeKey = "transport.amqp.exchange_type"
const AmqpDefaultAddress = "transport.amqp.default_address"

type amqpTransport struct {
DefaultAddress string `json:"default_address"`
Exchange string `json:"exchange"`
ExchangeType string `json:"exchange_type"`
}

func InitializeAMQPTransport() (transport *amqpTransport, err error) {
if config.IsSet(AmqpTransportKey) {
exchange := config.GetString(AmqpExchangeKey)
exchangeType := config.GetString(AmqpExchangeTypeKey)
defaultAddress := config.GetString(AmqpDefaultAddress)

logrus.Debug("AMQP transport is already configured, loading from config")

return &amqpTransport{
DefaultAddress: defaultAddress,
Exchange: exchange,
ExchangeType: exchangeType,
}, nil
}

transport = &amqpTransport{}
if err := promptAmqpTransportValues(transport); err != nil {
return nil, fmt.Errorf("error while prompting AMQP transport values: %s", err)
}

config.Set(AmqpExchangeKey, transport.Exchange)
config.Set(AmqpExchangeTypeKey, transport.ExchangeType)
config.Set(AmqpDefaultAddress, transport.DefaultAddress)
godin.SaveConfiguration()

return transport, nil
}

func promptAmqpTransportValues(transport *amqpTransport) (err error) {
// default AMQP server address
p := prompt.NewPrompt(
"AMQP default server address to use if AMQP_URL is not provided",
"amqp://username:password@host:port/vhost",
prompt.Validate(),
)
transport.DefaultAddress, err = p.Run()
if err != nil {
return err
}

return nil
}
1 change: 1 addition & 0 deletions pkg/amqp/publish.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package amqp
84 changes: 84 additions & 0 deletions pkg/amqp/subscribe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package amqp

import (
"github.com/streadway/amqp"
)

type SubscriberHandler func(delivery amqp.Delivery)

type Subscription struct {
Topic string `json:"topic"`
Exchange string `json:"exchange"`
Queue string `json:"queue"`
}

type handler struct {
implementation SubscriberHandler
done chan error
ctag string
routingKey string
}

type Subscriber struct {
channel *amqp.Channel
exchange string
handlers []handler
}

// NewSubscriber returns an AMQP publisher
func NewSubscriber(channel *amqp.Channel, exchange string) Subscriber {
return Subscriber{
channel: channel,
exchange: exchange,
}
}

func (c *Subscriber) Subscribe(queueName, routingKey string, ctag string, handler SubscriberHandler) error {
queue, err := c.channel.QueueDeclare(queueName, true, false, false, false, nil)
if err != nil {
return err
}

if err = c.channel.QueueBind(queue.Name, routingKey, c.exchange, false, nil); err != nil {
return err
}

deliveries, err := c.channel.Consume(queue.Name, ctag, false, false, false, false, nil)
if err != nil {
return err
}

h := c.addHandler(routingKey, ctag, handler)
go c.Handler(deliveries, h)

return nil
}

func (c *Subscriber) addHandler(routingKey string, ctag string, handlerImpl SubscriberHandler) handler {
h := handler{
routingKey: routingKey,
ctag: ctag,
implementation: handlerImpl,
done: make(chan error),
}
c.handlers = append(c.handlers, h)

return h
}

func (c *Subscriber) Handler(deliveries <-chan amqp.Delivery, h handler) {
for d := range deliveries {
h.implementation(d)
}
h.done <- nil
}

func (c *Subscriber) Shutdown() error {
for _, h := range c.handlers {
if err := c.channel.Cancel(h.ctag, true); err != nil {
return err
}
<-h.done
}
return nil
}

0 comments on commit 464766a

Please sign in to comment.