Skip to content

Commit

Permalink
Merge pull request cockroachdb#32 from crowdflux/develop
Browse files Browse the repository at this point in the history
Persistent channel using rabbitmq & better logging
  • Loading branch information
himanshu144141 authored Oct 17, 2016
2 parents ef2fda6 + 0395606 commit b2f597f
Show file tree
Hide file tree
Showing 64 changed files with 1,281 additions and 495 deletions.
65 changes: 65 additions & 0 deletions app/DAL/clients/rabbitmq/internal/receive.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
//package rabbitmq

package internal

import (
"log"

"encoding/json"
"github.com/crowdflux/angel/app/models"
"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}

func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

q, err := ch.QueueDeclare(
"test", // name
true, // durable
false, // delete when usused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")

msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")

forever := make(chan bool)

go func() {
for d := range msgs {
flu := models.FeedLineUnit{}
json.Unmarshal(d.Body, &flu)
log.Printf("Received a message: %s", flu.ID)
//err := d.Ack(false)
//if err != nil {
// panic(err)
//}
}
}()

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
57 changes: 57 additions & 0 deletions app/DAL/clients/rabbitmq/internal/send.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package internal

//
//package main
//
//import (
// "log"
//
// "encoding/json"
// "github.com/crowdflux/angel/app/models"
// "github.com/crowdflux/angel/app/models/uuid"
// "github.com/streadway/amqp"
//)
//
//func failOnError(err error, msg string) {
// if err != nil {
// log.Fatalf("%s: %s", msg, err)
// }
//}
//
//func main() {
// conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
// failOnError(err, "Failed to connect to RabbitMQ")
// defer conn.Close()
//
// ch, err := conn.Channel()
// failOnError(err, "Failed to open a channel")
// defer ch.Close()
//
// q, err := ch.QueueDeclare(
// "hello", // name
// false, // durable
// false, // delete when unused
// false, // exclusive
// false, // no-wait
// nil, // arguments
// )
// failOnError(err, "Failed to declare a queue")
//
// flu := models.FeedLineUnit{
// ID: uuid.NewV4(),
// }
// fluBty, _ := json.Marshal(flu)
//
// err = ch.Publish(
// "", // exchange
// q.Name, // routing key
// false, // mandatory
// false, // immediate
// amqp.Publishing{
// ContentType: "application/json",
// Body: fluBty,
// })
// log.Printf(" [x] Sent %s", flu)
// failOnError(err, "Failed to publish a message")
//
//}
34 changes: 34 additions & 0 deletions app/DAL/clients/rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package rabbitmq

import (
"github.com/crowdflux/angel/app/config"
"github.com/streadway/amqp"
)

var rabbitmqConn *amqp.Connection

func init() {
rabbitmqConn = initRabbitMqClient()
}

func initRabbitMqClient() *amqp.Connection {

username := config.RABBITMQ_USERNAME.Get()
password := config.RABBITMQ_PASSWORD.Get()
host := config.RABBITMQ_HOST.Get()

conn, err := amqp.Dial("amqp://" + username + ":" + password + "@" + host + ":5672/")
if err != nil {
panic(err)
}

return conn
}

func GetNewChannel() *amqp.Channel {
ch, err := rabbitmqConn.Channel()
if err != nil {
panic(err)
}
return ch
}
File renamed without changes.
126 changes: 126 additions & 0 deletions app/DAL/feed_line/feed_line.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package feed_line

import (
"encoding/json"
"errors"
"github.com/crowdflux/angel/app/DAL/clients/rabbitmq"
"github.com/crowdflux/angel/app/models"
"github.com/crowdflux/angel/app/plog"
"github.com/streadway/amqp"
"sync"
)

// ShortHand for channel of FLUs i.e. FeedLine
type Fl struct {
amqpChan *amqp.Channel
queueName string
once sync.Once
}

func New(name string) Fl {

ch := rabbitmq.GetNewChannel()

q, err := ch.QueueDeclare(
name, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)

if err != nil {
plog.Error("Feedline", err, "error declaring queue, name: ", name)
panic(err)
}

return Fl{
amqpChan: ch,
queueName: q.Name,
}
}

func (fl *Fl) Push(flu FLU) {

// Send only the models.Feedline part of the flu in bytes
bty, _ := json.Marshal(flu.FeedLineUnit)

// This is async
// TODO Think about a way to guarantee this operation also
err := fl.amqpChan.Publish(
"", // exchange
fl.queueName, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: bty,
})
if err != nil {
plog.Error("Feedline", err, "error publishing to channel", "flu_id: "+flu.ID.String())
panic(err)
}

// Just for safety: if someone forgets
// to ConfirmReceive the flu received from a queue
// then reconfirm it here as it will most
// probably be a bug
if flu.delivery.Acknowledger != nil {
flu.ConfirmReceive()
}

plog.Info("feedline", "complete push from: ", fl.queueName, "id: ", flu.ID.String())
}

func (fl *Fl) Receiver() <-chan FLU {

println("Feedline, subscribe request: ", fl.queueName)

var fluChan chan FLU
var flag bool = false

fl.once.Do(func() {

fluChan = make(chan FLU)

deliveryChan, err := fl.amqpChan.Consume(
fl.queueName, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
plog.Error("Feedline", err, "error consuming queue, name:", fl.queueName)
panic(err)
}

go func() {

for msg := range deliveryChan {

flu := models.FeedLineUnit{}
json.Unmarshal(msg.Body, &flu)

fluChan <- FLU{
FeedLineUnit: flu,
delivery: msg,
once: &sync.Once{},
}
plog.Info("feedline", "sent to FLU chan, name: ", fl.queueName, "id: ", flu.ID.String())
}
}()

flag = true
})

if flag {
return (<-chan FLU)(fluChan)
} else {
panic(errors.New("Feedline already subscribed, name: " + fl.queueName))
}

}
33 changes: 33 additions & 0 deletions app/DAL/feed_line/feed_line_unit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package feed_line

import (
"github.com/crowdflux/angel/app/models"
"github.com/crowdflux/angel/app/plog"
"github.com/streadway/amqp"
"sync"
)

//--------------------------------------------------------------------------------//

type FLU struct {
models.FeedLineUnit

delivery amqp.Delivery

once *sync.Once
}

func (flu *FLU) ConfirmReceive() {

flu.once.Do(func() {
err := flu.delivery.Ack(false)
if err != nil {
plog.Error("FLU", err, "error while ack", "fluId: "+flu.FeedLineUnit.ID.String())
panic(err)
}
})
}

func (flu *FLU) Redelivered() bool {
return flu.delivery.Redelivered
}
63 changes: 63 additions & 0 deletions app/DAL/feed_line/feed_line_unit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package feed_line

import (
"testing"

"github.com/crowdflux/angel/app/models"
"github.com/crowdflux/angel/app/models/uuid"
"github.com/stretchr/testify/assert"
"time"
)

func TestNew(t *testing.T) {

fl := New("test1")

fluId := uuid.NewV4()

fl.Push(FLU{
FeedLineUnit: models.FeedLineUnit{
ID: fluId,
},
})

flu := <-fl.Receiver()

flu.ConfirmReceive()

fl.amqpChan.QueueDelete("test1", false, false, false)

assert.EqualValues(t, fluId, flu.ID)

}

func TestFeedline_Load(t *testing.T) {
fl := New("test12")

flus := fl.Receiver()

go func() {

for {
fl.Push(FLU{
FeedLineUnit: models.FeedLineUnit{
ID: uuid.NewV4(),
},
})

}
}()

go func() {

for {
<-flus

}
}()

time.Sleep(time.Duration(1) * time.Second)

fl.amqpChan.QueueDelete("test12", false, false, false)

}
Loading

0 comments on commit b2f597f

Please sign in to comment.