diff --git a/.gitignore b/.gitignore index 16af516..8880daa 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ .idea bin/ +dist diff --git a/README.md b/README.md index 864ee43..aadb2ac 100644 --- a/README.md +++ b/README.md @@ -52,6 +52,7 @@ Examples are now hosted in a separate repository: [godin-examples](https://githu |------|------| | **init** | initialize a new godin project in the current directory | | **update** | update the project in the current directory and regenerate necessary stuff | +| **add** | add bundles to an existing project | | **version** | print godin version | | **help** | Help about any command | diff --git a/cmd/add.go b/cmd/add.go new file mode 100644 index 0000000..30d0b87 --- /dev/null +++ b/cmd/add.go @@ -0,0 +1,98 @@ +package cmd + +import ( + "fmt" + "os" + + "strings" + + "github.com/lukasjarosch/godin/internal/bundle" + "github.com/lukasjarosch/godin/internal/godin" + "github.com/sirupsen/logrus" + "github.com/spf13/cobra" +) + +func init() { + rootCmd.AddCommand(addCommand) +} + +// rootCmd represents the base command when called without any subcommands +var addCommand = &cobra.Command{ + Use: "add", + Short: "Add bundles to an initalized godin project", + Long: `Add different kind of bundles to the current project. + + godin add + +The available bundles are: + middleware: to add a new service-layer middleware + datastore: to add a new datastore bundle (mysql, mongodb, redis) + subscriber: add an AMQP topic subscription + transport: to add a new transport layer`, + Run: addCmd, + Args: cobra.MinimumNArgs(1), +} + +var validBundleTypes = []string{"middleware", "datastore", "transport", "subscriber"} + +func addCmd(cmd *cobra.Command, args []string) { + logrus.SetLevel(logrus.DebugLevel) + // ensure a valid bundle type is passed + bundleType, err := validateBundleType(args[0]) + if err != nil { + logrus.Error(err) + os.Exit(1) + } + + // project must be initialized + if _, err := os.Stat(godin.ConfigFilename()); err != nil { + logrus.Fatal("project not initialized") + } + + if err := godin.LoadConfiguration(); err != nil { + logrus.Fatalf("failed to load configuration: %s", err.Error()) + } + + switch bundleType { + case "middleware", + "datastore", + "transport": + logrus.Info("sorry, this bundle type is not yet implemented :(") + case "subscriber": + + _, err := bundle.InitializeAMQPTransport() + if err != nil { + logrus.Errorf("failed to initialize AMQP transport: %s", err) + } + + /* + sub, err := bundle.InitializeSubscriber() + if err != nil { + logrus.Errorf("failed to initialize subscriber: %s", err) + os.Exit(1) + } + + // add to config + subscriptions := make(map[string]interface{}) + if config.IsSet("transport.amqp.subscriptions") { + subscriptions = config.GetStringMap("transport.amqp.subscriptions") + } + subscriptions[sub.HandlerName] = sub.Subscription + config.Set("transport.amqp.subscriptions", subscriptions) + godin.SaveConfiguration() + */ + } +} + +func validateBundleType(givenType string) (bundleType string, err error) { + for _, validType := range validBundleTypes { + if givenType == validType { + bundleType = validType + break + } + } + if bundleType == "" { + return "", fmt.Errorf("invalid bundle type, valid types are: %s", strings.Join(validBundleTypes, ", ")) + } + return bundleType, nil +} diff --git a/cmd/update.go b/cmd/update.go index 3104b50..68425bb 100644 --- a/cmd/update.go +++ b/cmd/update.go @@ -37,7 +37,6 @@ var updateCommand = &cobra.Command{ } func updateCmd(cmd *cobra.Command, args []string) { - logrus.SetLevel(logrus.DebugLevel) // project must be initialized diff --git a/go.mod b/go.mod index 7916bb4..b216f51 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,7 @@ require ( github.com/sirupsen/logrus v1.4.2 github.com/spf13/cobra v0.0.5 github.com/spf13/viper v1.3.2 + github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94 github.com/vetcher/go-astra v1.2.0 gopkg.in/alecthomas/kingpin.v3-unstable v3.0.0-20180810215634-df19058c872c // indirect ) diff --git a/go.sum b/go.sum index d9713d2..750618e 100644 --- a/go.sum +++ b/go.sum @@ -147,6 +147,8 @@ github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/viper v1.3.2 h1:VUFqw5KcqRf7i70GOzW7N+Q7+gxVBkSSqiXB12+JQ4M= github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= +github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94 h1:0ngsPmuP6XIjiFRNFYlvKwSr5zff2v+uPHaffZ6/M4k= +github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= diff --git a/internal/bundle/datastore.go b/internal/bundle/datastore.go new file mode 100644 index 0000000..460cb58 --- /dev/null +++ b/internal/bundle/datastore.go @@ -0,0 +1,9 @@ +package bundle + +type datastore struct { +} + +type MySQLDatastore struct { + datastore +} + diff --git a/internal/bundle/middleware.go b/internal/bundle/middleware.go new file mode 100644 index 0000000..fd27aed --- /dev/null +++ b/internal/bundle/middleware.go @@ -0,0 +1 @@ +package bundle diff --git a/internal/bundle/subscribe.go b/internal/bundle/subscribe.go new file mode 100644 index 0000000..a39be14 --- /dev/null +++ b/internal/bundle/subscribe.go @@ -0,0 +1,74 @@ +package bundle + +import ( + "github.com/lukasjarosch/godin/internal/prompt" + "github.com/lukasjarosch/godin/pkg/amqp" +) + +type subscriber struct { + Subscription amqp.Subscription + HandlerName string `json:"handler_name"` +} + +func InitializeSubscriber() (*subscriber, error) { + sub := amqp.Subscription{} + handlerName, err := promptValues(sub) + if err != nil { + return nil, err + } + + return &subscriber{ + Subscription: sub, + HandlerName: handlerName, + }, nil +} + +func promptValues(sub amqp.Subscription) (handlerName string, err error) { + // Topic + p := prompt.NewPrompt( + "AMQP subscription Topic", + "user.created", + prompt.Validate(), + ) + topic, err := p.Run() + if err != nil { + return "", err + } + sub.Topic = topic + + // Exchange + p = prompt.NewPrompt( + "AMQP Exchange name", + "user-Exchange", + prompt.Validate(), + ) + exchange, err := p.Run() + if err != nil { + return "", err + } + sub.Exchange = exchange + + // Queue + p = prompt.NewPrompt( + "AMQP Queue name which is bound to the Exchange", + "user-created-Queue", + prompt.Validate(), + ) + queue, err := p.Run() + if err != nil { + return "", err + } + sub.Queue = queue + + // HandlerName + p = prompt.NewPrompt( + "Name of the handler for this subscription", + "UserCreatedHandler", + prompt.Validate(), + ) + handler, err := p.Run() + if err != nil { + return "", err + } + return handler, nil +} diff --git a/internal/bundle/transport.go b/internal/bundle/transport.go new file mode 100644 index 0000000..83d8bb4 --- /dev/null +++ b/internal/bundle/transport.go @@ -0,0 +1,64 @@ +package bundle + +import ( + "fmt" + + "github.com/lukasjarosch/godin/internal/prompt" + "github.com/sirupsen/logrus" + config "github.com/spf13/viper" + "github.com/lukasjarosch/godin/internal/godin" +) + +const AmqpTransportKey = "transport.amqp" +const AmqpExchangeKey = "transport.amqp.exchange" +const AmqpExchangeTypeKey = "transport.amqp.exchange_type" +const AmqpDefaultAddress = "transport.amqp.default_address" + +type amqpTransport struct { + DefaultAddress string `json:"default_address"` + Exchange string `json:"exchange"` + ExchangeType string `json:"exchange_type"` +} + +func InitializeAMQPTransport() (transport *amqpTransport, err error) { + if config.IsSet(AmqpTransportKey) { + exchange := config.GetString(AmqpExchangeKey) + exchangeType := config.GetString(AmqpExchangeTypeKey) + defaultAddress := config.GetString(AmqpDefaultAddress) + + logrus.Debug("AMQP transport is already configured, loading from config") + + return &amqpTransport{ + DefaultAddress: defaultAddress, + Exchange: exchange, + ExchangeType: exchangeType, + }, nil + } + + transport = &amqpTransport{} + if err := promptAmqpTransportValues(transport); err != nil { + return nil, fmt.Errorf("error while prompting AMQP transport values: %s", err) + } + + config.Set(AmqpExchangeKey, transport.Exchange) + config.Set(AmqpExchangeTypeKey, transport.ExchangeType) + config.Set(AmqpDefaultAddress, transport.DefaultAddress) + godin.SaveConfiguration() + + return transport, nil +} + +func promptAmqpTransportValues(transport *amqpTransport) (err error) { + // default AMQP server address + p := prompt.NewPrompt( + "AMQP default server address to use if AMQP_URL is not provided", + "amqp://username:password@host:port/vhost", + prompt.Validate(), + ) + transport.DefaultAddress, err = p.Run() + if err != nil { + return err + } + + return nil +} diff --git a/pkg/amqp/publish.go b/pkg/amqp/publish.go new file mode 100644 index 0000000..0e8d02a --- /dev/null +++ b/pkg/amqp/publish.go @@ -0,0 +1 @@ +package amqp diff --git a/pkg/amqp/subscribe.go b/pkg/amqp/subscribe.go new file mode 100644 index 0000000..9e04c1c --- /dev/null +++ b/pkg/amqp/subscribe.go @@ -0,0 +1,84 @@ +package amqp + +import ( + "github.com/streadway/amqp" +) + +type SubscriberHandler func(delivery amqp.Delivery) + +type Subscription struct { + Topic string `json:"topic"` + Exchange string `json:"exchange"` + Queue string `json:"queue"` +} + +type handler struct { + implementation SubscriberHandler + done chan error + ctag string + routingKey string +} + +type Subscriber struct { + channel *amqp.Channel + exchange string + handlers []handler +} + +// NewSubscriber returns an AMQP publisher +func NewSubscriber(channel *amqp.Channel, exchange string) Subscriber { + return Subscriber{ + channel: channel, + exchange: exchange, + } +} + +func (c *Subscriber) Subscribe(queueName, routingKey string, ctag string, handler SubscriberHandler) error { + queue, err := c.channel.QueueDeclare(queueName, true, false, false, false, nil) + if err != nil { + return err + } + + if err = c.channel.QueueBind(queue.Name, routingKey, c.exchange, false, nil); err != nil { + return err + } + + deliveries, err := c.channel.Consume(queue.Name, ctag, false, false, false, false, nil) + if err != nil { + return err + } + + h := c.addHandler(routingKey, ctag, handler) + go c.Handler(deliveries, h) + + return nil +} + +func (c *Subscriber) addHandler(routingKey string, ctag string, handlerImpl SubscriberHandler) handler { + h := handler{ + routingKey: routingKey, + ctag: ctag, + implementation: handlerImpl, + done: make(chan error), + } + c.handlers = append(c.handlers, h) + + return h +} + +func (c *Subscriber) Handler(deliveries <-chan amqp.Delivery, h handler) { + for d := range deliveries { + h.implementation(d) + } + h.done <- nil +} + +func (c *Subscriber) Shutdown() error { + for _, h := range c.handlers { + if err := c.channel.Cancel(h.ctag, true); err != nil { + return err + } + <-h.done + } + return nil +}