Skip to content

Commit

Permalink
Request-Queue Subscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
Pranav Kamath KV authored and Pranav Kamath KV committed Mar 31, 2021
1 parent e88e95d commit 1f08218
Show file tree
Hide file tree
Showing 9 changed files with 280 additions and 6 deletions.
27 changes: 23 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,17 @@ DOCKERFILE_QPUB := $(CURDIR)/docker/qpub/.
DOCKERFILE_QSUB := $(CURDIR)/docker/qsub/.
DOCKERFILE_REQ := $(CURDIR)/docker/req/.
DOCKERFILE_REP := $(CURDIR)/docker/rep/.

DOCKERFILE_QREQ := $(CURDIR)/docker/qreq/.
DOCKERFILE_QREQ_SUB := $(CURDIR)/docker/qreq_sub/.
# configuration for the build path for each of the app
PUBSUB_PUB := $(BUILD_PATH)/pub-sub/pub
PUBSUB_SUB := $(BUILD_PATH)/pub-sub/sub
QGROUP_PUB := $(BUILD_PATH)/queue-group/producer
QGROUP_SUB := $(BUILD_PATH)/queue-group/consumer
REQREPLY_PUB := $(BUILD_PATH)/request-reply/pub
REQREPLY_SUB := $(BUILD_PATH)/request-reply/sub

REQREPLY_QPUB := $(BUILD_PATH)/req-qsub/pub
REQREPLY_QSUB := $(BUILD_PATH)/req-qsub/sub
# configuration for building the docker images
SRCROOT_ON_HOST := $(shell dirname $(abspath $(lastword $(MAKEFILE_LIST))))
SRCROOT_ON_CONTAINER := /go/src/$(PROJECT_ROOT)
Expand Down Expand Up @@ -56,14 +58,24 @@ IMAGE_FULL_REQ := $(IMAGE_REGISTRY)/$(IMAGE_NAME_REQ):$(IMAGE_VERSION)
IMAGE_NAME_REP := dockerrep
IMAGE_FULL_REP := $(IMAGE_REGISTRY)/$(IMAGE_NAME_REP):$(IMAGE_VERSION)

# queue-request image
IMAGE_NAME_QREQ := dockerqreq
IMAGE_FULL_QREQ := $(IMAGE_REGISTRY)/$(IMAGE_NAME_QREQ):$(IMAGE_VERSION)


# queue request-subscribe image
IMAGE_NAME_QREQ_SUB := dockerqreq-sub
IMAGE_FULL_QREQ_SUB := $(IMAGE_REGISTRY)/$(IMAGE_NAME_QREQ_SUB):$(IMAGE_VERSION)

# go app paths
SERVER_PUB := $(PROJECT_ROOT)/cmd/pub-sub/pub/
SERVER_SUB := $(PROJECT_ROOT)/cmd/pub-sub/sub/
SERVER_QPUB := $(PROJECT_ROOT)/cmd/queue-group/producer/
SERVER_QSUB := $(PROJECT_ROOT)/cmd/queue-group/consumer/
SERVER_REQ := $(PROJECT_ROOT)/cmd/request-reply/pub/
SERVER_REP := $(PROJECT_ROOT)/cmd/request-reply/sub/

SERVER_QREQ := $(PROJECT_ROOT)/cmd/req-qsub/pub/
SERVER_QREQ_SUB := $(PROJECT_ROOT)/cmd/req-qsub/sub/
# Targets to build go app binary

.PHONY: build-pub
Expand Down Expand Up @@ -102,7 +114,6 @@ build-rep:
@echo $(GO_BUILD)
@$(GO_BUILD) -o $(REQREPLY_SUB) $(SERVER_REP)


# Targets to run docker build for each go app

.PHONY: docker-pub
Expand Down Expand Up @@ -132,6 +143,14 @@ docker-rep:
@docker build -f $(DOCKERFILE_REP) -t $(IMAGE_FULL_REP) .


.PHONY: docker-qreq
docker-qreq:
@docker build -f $(DOCKERFILE_QREQ) -t $(IMAGE_FULL_QREQ) .

.PHONY: docker-qrep
docker-qrep:
@docker build -f $(DOCKERFILE_QREQ_SUB) -t $(IMAGE_FULL_QREQ_SUB) .

# Targets to push docker image of the go app

.PHONY: push-pub
Expand Down
49 changes: 49 additions & 0 deletions cmd/req-qsub/pub/producer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package main

import (
"os"
"runtime"
"strings"

"github.com/prnvkv/my-nats/cmd/config"
"github.com/prnvkv/my-nats/pkg/request-reply/pub"
log "github.com/sirupsen/logrus"
"github.com/spf13/pflag"
"github.com/spf13/viper"
)

func init() {
config.LoadConfig()
viper.BindPFlags(pflag.CommandLine)
viper.AutomaticEnv()
viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
// viper.SetConfigName("config")
// viper.SetConfigType("yaml")
// viper.AddConfigPath("../../../deploy/")
viper.AddConfigPath(viper.GetString("config.source"))

if len(viper.GetString("config.file")) != 0 {
err := viper.ReadInConfig()
if err != nil {
log.Errorf("Error cannot read config file : %s\n", err.Error())
os.Exit(1)
}

}

}

func main() {
msg := "Hello world"
log.Infof("MAIN: message: '%s'", msg)
subjectName := viper.GetString("nats.subject.dns")

response, err := pub.Publish(subjectName, msg)
if err != nil {
log.Errorf("ERROR: %s", err.Error())
return
}

log.Infoln("Received the message: ", string(response))
runtime.Goexit()
}
71 changes: 71 additions & 0 deletions cmd/req-qsub/sub/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package main

import (
"fmt"
"os"
"runtime"
"strings"

"github.com/prnvkv/my-nats/cmd/config"
"github.com/prnvkv/my-nats/pkg/queue-group/sub"
log "github.com/sirupsen/logrus"
"github.com/spf13/pflag"
"github.com/spf13/viper"
)

// func init() {
// pflag.Parse()
// viper.BindPFlags(pflag.CommandLine)
// viper.SetConfigName("config")
// viper.SetConfigType("yaml")
// viper.AddConfigPath("../../../deploy/")
// viper.AddConfigPath(viper.GetString("config.source"))
// viper.AutomaticEnv()
// viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))

// err := viper.ReadInConfig()
// if err != nil {
// log.Errorf("Error cannot read config file : %s\n", err.Error())
// os.Exit(1)
// }

// }

func init() {
config.LoadConfig()
viper.BindPFlags(pflag.CommandLine)
viper.AutomaticEnv()
viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
// viper.SetConfigName("config")
// viper.SetConfigType("yaml")
// viper.AddConfigPath("../../../deploy/")
viper.AddConfigPath(viper.GetString("config.source"))

if len(viper.GetString("config.file")) != 0 {
err := viper.ReadInConfig()
if err != nil {
log.Errorf("Error cannot read config file : %s\n", err.Error())
os.Exit(1)
}

}

}

func main() {

serverAddr := viper.GetString("nats.server.addr")
serverPort := viper.GetString("nats.server.port")
subjectName := viper.GetString("nats.subject.dns")
queueGroupName := viper.GetString("nats.queue_group.name")

fmt.Println("Server port and subject...", serverAddr, serverPort, subjectName, queueGroupName)
msg, err := sub.Subscribe(subjectName, queueGroupName)
if err != nil {
log.Errorf("Error: %s", err)
return
}
log.Infof("Recieved the message: %s", msg)

runtime.Goexit()
}
14 changes: 14 additions & 0 deletions docker/qreq/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
FROM golang:1.13.1 AS builder
LABEL stage=server-intermediate

WORKDIR /go/src/my-nats
COPY . .
RUN go build -o bin/nats-qreq ./cmd/req-qsub/pub/

FROM alpine:latest AS runner
RUN mkdir -p /lib64 && \
ln -s /lib/libc.musl-x86_64.so.1 /lib64/ld-linux-x86-64.so.2
WORKDIR /usr/local/bin
COPY --from=builder /go/src/my-nats/bin/nats-qreq .
# CMD ["nats-pub"]
ENTRYPOINT ["nats-qreq"]
14 changes: 14 additions & 0 deletions docker/qreq_sub/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
FROM golang:1.13.1 AS builder
LABEL stage=server-intermediate

WORKDIR /go/src/my-nats
COPY . .
RUN go build -o bin/nats-qreqsub ./cmd/req-qsub/sub/

FROM alpine:latest AS runner
RUN mkdir -p /lib64 && \
ln -s /lib/libc.musl-x86_64.so.1 /lib64/ld-linux-x86-64.so.2
WORKDIR /usr/local/bin
COPY --from=builder /go/src/my-nats/bin/nats-qreqsub .
# CMD ["nats-pub"]
ENTRYPOINT ["nats-qreqsub"]
3 changes: 2 additions & 1 deletion pkg/queue-group/pub/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ func Publish(subject string, message interface{}) error {
if err != nil {
return err
}
defer nc.Close()

// defer nc.Close()
// nc.QueueSubscribe("greeting", "workers", func(m *nats.Msg) {
// log.Printf("[Received] %s", string(m.Data))
// })
Expand Down
14 changes: 13 additions & 1 deletion pkg/queue-group/sub/consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,27 @@ func Subscribe(subject string, queueGroupName string) ([]byte, error) {
return nil, err
}

defer nc.Close()
// defer nc.Close()

log.Infof("Consuming the message from the topic: %s\n", subject)
var receivedMsg []byte
nc.QueueSubscribe(subject, queueGroupName, func(m *nats.Msg) {
receivedMsg = m.Data
// err = m.Respond([]byte("success"))
err = nc.Publish(m.Reply, []byte("success"))
// err = m.Ack()
if err != nil {
log.Errorf("Error while ack : %s \n", err.Error())
return
}
log.Printf("[Received] %s\n", receivedMsg)
})

if err != nil {
log.Errorf("Error man!! \n ")
return nil, err
}

if len(receivedMsg) == 0 || reflect.ValueOf(receivedMsg).Kind() == reflect.Ptr && reflect.ValueOf(receivedMsg).IsNil() {
runtime.Goexit()
}
Expand Down
42 changes: 42 additions & 0 deletions pkg/req-qsub/pub/producer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package pub

import (
"bytes"
"encoding/gob"
"time"

log "github.com/sirupsen/logrus"

"github.com/nats-io/nats.go"
"github.com/spf13/viper"
)

func Publish(subject string, message interface{}) ([]byte, error) {
serverAddr := viper.GetString("nats.server.addr")
serverPort := viper.GetString("nats.server.port")
var natsConnection = "nats://" + serverAddr + ":" + serverPort

log.Infof("Connecting the nats server: %s with subject %s\n", natsConnection, subject)
nc, err := nats.Connect(natsConnection)
if err != nil {
return nil, err
}

defer nc.Close()

var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
err = enc.Encode(message)
if err != nil {
return nil, err
}

log.Infof("Publishing the message to the subject: '%s'", subject)

response, err := nc.Request(subject, buf.Bytes(), 1*time.Second)
if err != nil {
return nil, err
}

return response.Data, nil
}
52 changes: 52 additions & 0 deletions pkg/req-qsub/sub/consumers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package sub

import (
"reflect"
"runtime"

"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
)

func Subscribe(subject string, queueGroupName string) ([]byte, error) {
serverAddr := viper.GetString("nats.server.addr")
serverPort := viper.GetString("nats.server.port")
// subjectName := viper.GetString("nats.subject.dns")

natsConnection := "nats://" + serverAddr + ":" + serverPort

log.Infof("Subscriber connecting to nats messaging server: %s with subject %s Queue Group: %s\n", natsConnection, subject, queueGroupName)
nc, err := nats.Connect(natsConnection)
if err != nil {
return nil, err
}

// defer nc.Close()

log.Infof("Consuming the message from the topic: %s\n", subject)
var receivedMsg []byte
nc.QueueSubscribe(subject, queueGroupName, func(m *nats.Msg) {
receivedMsg = m.Data
// err = m.Respond([]byte("success"))
err = nc.Publish(m.Reply, []byte("success"))
// err = m.Ack()
if err != nil {
log.Errorf("Error while ack : %s \n", err.Error())
return
}
log.Printf("[Received] %s\n", receivedMsg)
})

if err != nil {
log.Errorf("Error man!! \n ")
return nil, err
}

if len(receivedMsg) == 0 || reflect.ValueOf(receivedMsg).Kind() == reflect.Ptr && reflect.ValueOf(receivedMsg).IsNil() {
runtime.Goexit()
}

return receivedMsg, nil

}

0 comments on commit 1f08218

Please sign in to comment.