diff --git a/gossip/auditor/auditor.go b/gossip/auditor/auditor.go index 48e289715..3e2163839 100644 --- a/gossip/auditor/auditor.go +++ b/gossip/auditor/auditor.go @@ -24,7 +24,6 @@ import ( "io" "io/ioutil" "net/http" - "strings" "time" "github.com/bbva/qed/api/metricshttp" @@ -41,6 +40,7 @@ import ( type Config struct { QEDUrls []string PubUrls []string + AlertsUrls []string APIKey string TaskExecutionInterval time.Duration MaxInFlightTasks int @@ -66,6 +66,10 @@ type Auditor struct { executionTicker *time.Ticker } +type Task interface { + Do() +} + func NewAuditor(conf Config) (*Auditor, error) { metrics.QedAuditorInstancesCount.Inc() @@ -96,16 +100,15 @@ func NewAuditor(conf Config) (*Auditor, error) { auditor.prometheusRegistry = r metricsMux := metricshttp.NewMetricsHTTP(r) - addr := strings.Split(conf.MetricsAddr, ":") auditor.metricsServer = &http.Server{ - Addr: ":1" + addr[1], + Addr: conf.MetricsAddr, Handler: metricsMux, } go func() { - log.Debugf(" * Starting metrics HTTP server in addr: %s", conf.MetricsAddr) + log.Debugf(" * Auditor starting metrics HTTP server in addr: %s", conf.MetricsAddr) if err := auditor.metricsServer.ListenAndServe(); err != http.ErrServerClosed { - log.Errorf("Can't start metrics HTTP server: %s", err) + log.Errorf("Auditor can't start metrics HTTP server: %s", err) } }() @@ -145,49 +148,46 @@ func (a Auditor) dispatchTasks() { } } -func (a Auditor) Process(b protocol.BatchSnapshots) { +func (a Auditor) Process(b *protocol.BatchSnapshots) { // Metrics metrics.QedAuditorBatchesReceivedTotal.Inc() timer := prometheus.NewTimer(metrics.QedAuditorBatchesProcessSeconds) defer timer.ObserveDuration() task := &MembershipTask{ - qed: a.qed, - pubUrl: a.conf.PubUrls[0], - taskCh: a.taskCh, - retries: 2, - s: *b.Snapshots[0], + qed: a.qed, + pubUrl: a.conf.PubUrls[0], + alertsUrl: a.conf.AlertsUrls[0], + taskCh: a.taskCh, + retries: 2, + s: *b.Snapshots[0], } a.taskCh <- task } func (a *Auditor) Shutdown() { - // Metrics metrics.QedAuditorInstancesCount.Dec() - log.Debugf("Metrics enabled: stopping server...") - if err := a.metricsServer.Shutdown(context.Background()); err != nil { // TODO include timeout instead nil - log.Error(err) + ctx, _ := context.WithTimeout(context.Background(), 5*time.Second) + if err := a.metricsServer.Shutdown(ctx); err != nil { + log.Infof("Auditor metrics http server shutdown process with error: %v", err) } - log.Debugf("Done.\n") a.executionTicker.Stop() a.quitCh <- true close(a.quitCh) close(a.taskCh) -} - -type Task interface { - Do() + log.Debugf("Auditor stopped.") } type MembershipTask struct { - qed *client.HTTPClient - pubUrl string - taskCh chan Task - retries int - s protocol.SignedSnapshot + qed *client.HTTPClient + pubUrl string + alertsUrl string + taskCh chan Task + retries int + s protocol.SignedSnapshot } func (t *MembershipTask) Do() { @@ -256,7 +256,7 @@ func (t MembershipTask) getSnapshot(version uint64) (*protocol.SignedSnapshot, e } func (t MembershipTask) sendAlert(msg string) { - resp, err := http.Post(fmt.Sprintf("%s/alert", t.pubUrl), "application/json", + resp, err := http.Post(t.alertsUrl, "application/json", bytes.NewBufferString(msg)) if err != nil { log.Infof("Error saving batch in alertStore: %v", err) diff --git a/gossip/monitor/monitor.go b/gossip/monitor/monitor.go index 8f09efd51..8b0268ed6 100644 --- a/gossip/monitor/monitor.go +++ b/gossip/monitor/monitor.go @@ -24,7 +24,6 @@ import ( "io" "io/ioutil" "net/http" - "strings" "time" "github.com/bbva/qed/api/metricshttp" @@ -40,7 +39,7 @@ import ( type Config struct { QEDUrls []string - PubUrls []string + AlertsUrls []string APIKey string TaskExecutionInterval time.Duration MaxInFlightTasks int @@ -56,7 +55,7 @@ func DefaultConfig() *Config { type Monitor struct { client *client.HTTPClient - conf Config + conf *Config metricsServer *http.Server prometheusRegistry *prometheus.Registry @@ -66,7 +65,11 @@ type Monitor struct { executionTicker *time.Ticker } -func NewMonitor(conf Config) (*Monitor, error) { +type Task interface { + Do() +} + +func NewMonitor(conf *Config) (*Monitor, error) { // Metrics metrics.QedMonitorInstancesCount.Inc() @@ -96,16 +99,15 @@ func NewMonitor(conf Config) (*Monitor, error) { monitor.prometheusRegistry = r metricsMux := metricshttp.NewMetricsHTTP(r) - addr := strings.Split(conf.MetricsAddr, ":") monitor.metricsServer = &http.Server{ - Addr: ":1" + addr[1], + Addr: conf.MetricsAddr, Handler: metricsMux, } go func() { - log.Debugf(" * Starting metrics HTTP server in addr: %s", conf.MetricsAddr) + log.Debugf(" * Monitor starting metrics HTTP server in addr: %s", conf.MetricsAddr) if err := monitor.metricsServer.ListenAndServe(); err != http.ErrServerClosed { - log.Errorf("Can't start metrics HTTP server: %s", err) + log.Errorf("Monitor can't start metrics HTTP server: %s", err) } }() @@ -115,12 +117,7 @@ func NewMonitor(conf Config) (*Monitor, error) { return &monitor, nil } -type Task interface { - Do() -} - -func (m Monitor) Process(b protocol.BatchSnapshots) { - // Metrics +func (m Monitor) Process(b *protocol.BatchSnapshots) { metrics.QedMonitorBatchesReceivedTotal.Inc() timer := prometheus.NewTimer(metrics.QedMonitorBatchesProcessSeconds) defer timer.ObserveDuration() @@ -128,11 +125,11 @@ func (m Monitor) Process(b protocol.BatchSnapshots) { first := b.Snapshots[0].Snapshot last := b.Snapshots[len(b.Snapshots)-1].Snapshot - log.Debugf("Processing batch from versions %d to %d", first.Version, last.Version) + log.Debugf("Monitor processing batch from versions %d to %d", first.Version, last.Version) task := QueryTask{ client: m.client, - pubUrl: m.conf.PubUrls[0], + alertsUrl: m.conf.AlertsUrls[0], Start: first.Version, End: last.Version, StartSnapshot: *first, @@ -155,20 +152,18 @@ func (m Monitor) runTaskDispatcher() { } func (m *Monitor) Shutdown() { - // Metrics metrics.QedMonitorInstancesCount.Dec() - log.Debugf("Metrics enabled: stopping server...") - // TODO include timeout instead nil - if err := m.metricsServer.Shutdown(context.Background()); err != nil { - log.Error(err) + ctx, _ := context.WithTimeout(context.Background(), 5*time.Second) + if err := m.metricsServer.Shutdown(ctx); err != nil { + log.Infof("Monitor metrics http server shutdown process with error: %v", err) } - log.Debugf("Done.\n") m.executionTicker.Stop() m.quitCh <- true close(m.quitCh) close(m.taskCh) + log.Debugf("Monitor stopped.") } func (m Monitor) dispatchTasks() { @@ -195,23 +190,23 @@ func (m Monitor) dispatchTasks() { type QueryTask struct { client *client.HTTPClient - pubUrl string + alertsUrl string taskCh chan Task Start, End uint64 StartSnapshot, EndSnapshot protocol.Snapshot } func (q QueryTask) sendAlert(msg string) { - resp, err := http.Post(fmt.Sprintf("%s/alert", q.pubUrl), "application/json", bytes.NewBufferString(msg)) + resp, err := http.Post(q.alertsUrl+"/alert", "application/json", bytes.NewBufferString(msg)) if err != nil { - log.Infof("Error saving batch in alertStore (task re-enqueued): %v", err) + log.Infof("Monitor had an error saving batch in alertStore (task re-enqueued): %v", err) q.taskCh <- q return } defer resp.Body.Close() _, err = io.Copy(ioutil.Discard, resp.Body) if err != nil { - log.Infof("Error getting response from alertStore saving a batch: %v", err) + log.Infof("MOnitor had an error from alertStore saving a batch: %v", err) } } @@ -220,13 +215,13 @@ func (q QueryTask) Do() { resp, err := q.client.Incremental(q.Start, q.End) if err != nil { metrics.QedMonitorGetIncrementalProofErrTotal.Inc() - log.Infof("Unable to get incremental proof from QED server: %s", err.Error()) + log.Infof("Monitor is unable to get incremental proof from QED server: %s", err.Error()) return } ok := q.client.VerifyIncremental(resp, &q.StartSnapshot, &q.EndSnapshot, hashing.NewSha256Hasher()) if !ok { - q.sendAlert(fmt.Sprintf("Unable to verify incremental proof from %d to %d", q.StartSnapshot.Version, q.EndSnapshot.Version)) - log.Infof("Unable to verify incremental proof from %d to %d", q.StartSnapshot.Version, q.EndSnapshot.Version) + q.sendAlert(fmt.Sprintf("Monitor is unable to verify incremental proof from %d to %d", q.StartSnapshot.Version, q.EndSnapshot.Version)) + log.Infof("Monitor is unable to verify incremental proof from %d to %d", q.StartSnapshot.Version, q.EndSnapshot.Version) } - log.Debugf("Consistency between versions %d and %d: %v\n", q.Start, q.End, ok) + log.Debugf("Monitor verified a consistency proof between versions %d and %d: %v\n", q.Start, q.End, ok) } diff --git a/gossip/publisher/publisher.go b/gossip/publisher/publisher.go index 9f8d826d3..42d708218 100644 --- a/gossip/publisher/publisher.go +++ b/gossip/publisher/publisher.go @@ -22,13 +22,13 @@ import ( "io" "io/ioutil" "net/http" - "strings" "time" "github.com/bbva/qed/api/metricshttp" "github.com/bbva/qed/gossip/metrics" "github.com/bbva/qed/log" "github.com/bbva/qed/protocol" + "github.com/coocood/freecache" "github.com/prometheus/client_golang/prometheus" ) @@ -47,9 +47,9 @@ func DefaultConfig() *Config { } } -func NewConfig(PubUrls []string) *Config { +func NewConfig(urls []string) *Config { cfg := DefaultConfig() - cfg.PubUrls = PubUrls + cfg.PubUrls = urls return cfg } @@ -57,6 +57,7 @@ type Publisher struct { store *http.Client conf Config + processed *freecache.Cache metricsServer *http.Server prometheusRegistry *prometheus.Registry @@ -65,13 +66,18 @@ type Publisher struct { executionTicker *time.Ticker } +type Task interface { + Do() +} + func NewPublisher(conf Config) (*Publisher, error) { metrics.QedPublisherInstancesCount.Inc() publisher := Publisher{ - store: &http.Client{}, - conf: conf, - taskCh: make(chan Task, 100), - quitCh: make(chan bool), + store: &http.Client{}, + conf: conf, + processed: freecache.NewCache(1 << 20), + taskCh: make(chan Task, 100), + quitCh: make(chan bool), } r := prometheus.NewRegistry() @@ -79,9 +85,8 @@ func NewPublisher(conf Config) (*Publisher, error) { publisher.prometheusRegistry = r metricsMux := metricshttp.NewMetricsHTTP(r) - addr := strings.Split(conf.MetricsAddr, ":") publisher.metricsServer = &http.Server{ - Addr: ":1" + addr[1], + Addr: conf.MetricsAddr, Handler: metricsMux, } @@ -98,17 +103,32 @@ func NewPublisher(conf Config) (*Publisher, error) { return &publisher, nil } -func (p *Publisher) Process(b protocol.BatchSnapshots) { - // Metrics +func (p *Publisher) Process(b *protocol.BatchSnapshots) { metrics.QedPublisherBatchesReceivedTotal.Inc() timer := prometheus.NewTimer(metrics.QedPublisherBatchesProcessSeconds) defer timer.ObserveDuration() + var batch protocol.BatchSnapshots + + for _, signedSnap := range b.Snapshots { + _, err := p.processed.Get(signedSnap.Signature) + if err != nil { + p.processed.Set(signedSnap.Signature, []byte{0x0}, 0) + batch.Snapshots = append(batch.Snapshots, signedSnap) + } + } + + if len(batch.Snapshots) < 1 { + return + } + + batch.From = b.From + batch.TTL = b.TTL task := &PublishTask{ store: p.store, pubUrl: p.conf.PubUrls[0], taskCh: p.taskCh, - batch: b, + batch: &batch, } p.taskCh <- task } @@ -126,19 +146,18 @@ func (p Publisher) runTaskDispatcher() { } func (p *Publisher) Shutdown() { - // Metrics metrics.QedPublisherInstancesCount.Dec() - log.Debugf("Metrics enabled: stopping server...") - if err := p.metricsServer.Shutdown(context.Background()); err != nil { // TODO include timeout instead nil - log.Error(err) + ctx, _ := context.WithTimeout(context.Background(), 5*time.Second) + if err := p.metricsServer.Shutdown(ctx); err != nil { + log.Infof("Publisher metrics http server shutdown process with error: %v", err) } - log.Debugf("Done.\n") p.executionTicker.Stop() p.quitCh <- true close(p.quitCh) close(p.taskCh) + log.Debugf("Publisher stopped.") } func (p Publisher) dispatchTasks() { @@ -159,33 +178,29 @@ func (p Publisher) dispatchTasks() { } } -type Task interface { - Do() -} - type PublishTask struct { store *http.Client pubUrl string - batch protocol.BatchSnapshots + batch *protocol.BatchSnapshots taskCh chan Task } func (t PublishTask) Do() { - log.Debugf("Executing task: %+v", t) + log.Debugf("Publisher is going to execute task: %+v", t) buf, err := t.batch.Encode() if err != nil { log.Debug("Publisher: Error marshalling: %s\n", err.Error()) return } - resp, err := t.store.Post(t.pubUrl+"/batch", "application/json", bytes.NewBuffer(buf)) + resp, err := t.store.Post(t.pubUrl, "application/json", bytes.NewBuffer(buf)) if err != nil { - log.Infof("Error saving batch in snapStore: %v\n", err) + log.Infof("Publised had an error saving batch in snapStore: %v\n", err) t.taskCh <- t return } defer resp.Body.Close() _, err = io.Copy(ioutil.Discard, resp.Body) if err != nil { - log.Infof("Error getting response from snapStore saving a batch: %v", err) + log.Infof("Publisher had an error getting response from snapStore saving a batch: %v", err) } }