Skip to content

Commit

Permalink
Refactor publisher like monitor/auditor
Browse files Browse the repository at this point in the history
  • Loading branch information
Jose Luis Lucas authored and iknite committed Dec 18, 2018
1 parent e6a343f commit 2a74d9f
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 110 deletions.
10 changes: 7 additions & 3 deletions cmd/agent_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/bbva/qed/log"
"github.com/bbva/qed/util"
"github.com/spf13/cobra"
"github.com/valyala/fasthttp"
)

func newAgentPublisherCommand(ctx *agentContext) *cobra.Command {
Expand All @@ -39,9 +38,14 @@ func newAgentPublisherCommand(ctx *agentContext) *cobra.Command {

agentConfig := ctx.config
agentConfig.Role = member.Publisher
publisherConfig := publisher.NewConfig(&fasthttp.Client{}, endpoints)
publisherConfig := publisher.NewConfig(endpoints)

agent, err := gossip.NewAgent(agentConfig, []gossip.Processor{publisher.NewPublisher(publisherConfig)})
publisher, err := publisher.NewPublisher(publisherConfig)
if err != nil {
log.Fatalf("Failed to start the QED publisher: %v", err)
}

agent, err := gossip.NewAgent(agentConfig, []gossip.Processor{publisher})
if err != nil {
log.Fatalf("Failed to start the QED publisher: %v", err)
}
Expand Down
97 changes: 82 additions & 15 deletions gossip/publisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,48 +22,115 @@ import (
"io"
"io/ioutil"
"net/http"
"time"

"github.com/bbva/qed/gossip"
"github.com/bbva/qed/log"
"github.com/bbva/qed/protocol"
"github.com/valyala/fasthttp"
)

type Config struct {
Client *fasthttp.Client
SendTo []string
PubUrls []string
TaskExecutionInterval time.Duration
MaxInFlightTasks int
}

func DefaultConfig() *Config {
return &Config{}
return &Config{
TaskExecutionInterval: 200 * time.Millisecond,
MaxInFlightTasks: 10,
}
}

func NewConfig(c *fasthttp.Client, to []string) *Config {
func NewConfig(PubUrls []string) *Config {
cfg := DefaultConfig()
cfg.Client = c
cfg.SendTo = to
cfg.PubUrls = PubUrls
return cfg
}

type Publisher struct {
Agent *gossip.Agent
Config *Config
quit chan bool
client *fasthttp.Client
conf *Config

taskCh chan PublishTask
quitCh chan bool
executionTicker *time.Ticker
}

func NewPublisher(conf *Config) *Publisher {
return &Publisher{
Config: conf,
func NewPublisher(conf *Config) (*Publisher, error) {

publisher := &Publisher{
client: &fasthttp.Client{},
conf: conf,
taskCh: make(chan PublishTask, 100),
quitCh: make(chan bool),
}

go publisher.runTaskDispatcher()

return publisher, nil
}

type PublishTask struct {
Batch *protocol.BatchSnapshots
}

func (p *Publisher) Process(b *protocol.BatchSnapshots) {
buf, err := b.Encode()
task := &PublishTask{
Batch: b,
}
p.taskCh <- *task
}

func (p *Publisher) runTaskDispatcher() {
p.executionTicker = time.NewTicker(p.conf.TaskExecutionInterval)
for {
select {
case <-p.executionTicker.C:
log.Debug("Dispatching tasks...")
p.dispatchTasks()
case <-p.quitCh:
return
}
}
}

func (p *Publisher) Shutdown() {
p.executionTicker.Stop()
p.quitCh <- true
close(p.quitCh)
close(p.taskCh)
}

func (p *Publisher) dispatchTasks() {
count := 0
var task PublishTask
defer log.Debugf("%d tasks dispatched", count)
for {
select {
case task = <-p.taskCh:
go p.executeTask(&task)
count++
default:
return
}
if count >= p.conf.MaxInFlightTasks {
return
}
}
}

func (p *Publisher) executeTask(task *PublishTask) {
log.Debug("Executing task: %+v", task)
fmt.Printf("Executing task: %+v\n", task)

buf, err := task.Batch.Encode()
if err != nil {
log.Debug("\nPublisher: Error marshalling: %s", err.Error())
return
}
resp, err := http.Post(fmt.Sprintf("%s/batch", p.Config.SendTo[0]), "application/json", bytes.NewBuffer(buf))
resp, err := http.Post(fmt.Sprintf("%s/batch", p.conf.PubUrls[0]),
"application/json", bytes.NewBuffer(buf))
if err != nil {
log.Infof("Error saving batch in snapStore: %v", err)
}
Expand Down
87 changes: 0 additions & 87 deletions gossip/publisher/publisher_test.go

This file was deleted.

16 changes: 11 additions & 5 deletions tests/e2e/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ import (
"github.com/bbva/qed/gossip/publisher"
"github.com/bbva/qed/server"
"github.com/bbva/qed/testutils/scope"

"github.com/valyala/fasthttp"
)

var apiKey, storageType, keyFile string
Expand Down Expand Up @@ -109,19 +107,27 @@ func setupMonitor(t *testing.T) (scope.TestF, scope.TestF) {

func setupPublisher(t *testing.T) (scope.TestF, scope.TestF) {
var pu *publisher.Publisher
var err error

before := func(t *testing.T) {
conf := publisher.DefaultConfig()
conf.Client = &fasthttp.Client{}
conf.SendTo = []string{PubUrl}
conf.PubUrls = []string{PubUrl}

go (func() {
pu = publisher.NewPublisher(conf)
pu, err = publisher.NewPublisher(conf)
if err != nil {
t.Fatalf("Unable to create a new publisher: %v", err)
}
})()
time.Sleep(2 * time.Second)
}

after := func(t *testing.T) {
if pu != nil {
pu.Shutdown()
} else {
t.Fatalf("Unable to shutdown the publisher!")
}
}
return before, after
}
Expand Down

0 comments on commit 2a74d9f

Please sign in to comment.