From 892f3463b6defef38bb5ccead3634a67c0832bf0 Mon Sep 17 00:00:00 2001 From: Jose Luis Lucas Date: Fri, 15 Feb 2019 14:35:31 +0100 Subject: [PATCH] Fine tune agents. WIP: api-key in cmd>client.go>doReq Sync api-key among start-agents and start-server files. Add new metrics. Improve QED dashboard. --- balloon/balloon.go | 5 + client/client.go | 4 +- cmd/start.go | 2 +- .../provisioning/dashboards/QED.json | 21 ++-- gossip/auditor/auditor.go | 104 +++++++++--------- gossip/monitor/monitor.go | 2 - gossip/publisher/publisher.go | 2 +- metrics/metrics.go | 4 +- tests/start_agents | 6 +- 9 files changed, 81 insertions(+), 69 deletions(-) diff --git a/balloon/balloon.go b/balloon/balloon.go index 8402516c4..11527849d 100644 --- a/balloon/balloon.go +++ b/balloon/balloon.go @@ -238,6 +238,11 @@ func (b *Balloon) Add(event []byte) (*Snapshot, []*storage.Mutation, error) { } func (b Balloon) QueryDigestMembership(keyDigest hashing.Digest, version uint64) (*MembershipProof, error) { + // Metrics + metrics.Qed_balloon_digest_membership_total.Inc() + timer := prometheus.NewTimer(metrics.Qed_balloon_digest_membership_duration_seconds) + defer timer.ObserveDuration() + stats := metrics.Balloon stats.AddFloat("QueryMembership", 1) var proof MembershipProof diff --git a/client/client.go b/client/client.go index 43841cce6..4a4af2924 100644 --- a/client/client.go +++ b/client/client.go @@ -100,7 +100,9 @@ func (c HTTPClient) doReq(method, path string, data []byte) ([]byte, error) { } req.Header.Set("Content-Type", "application/json") - req.Header.Set("Api-Key", c.conf.APIKey) + // TOFIX: c.conf.APIKey arrives empty. It causes failures when doing + // memberships and incrementals by auditors and monitors. + req.Header.Set("Api-Key", "AAAAAAA") // c.conf.APIKey) // resp, err := c.exponentialBackoff(req) if err != nil { diff --git a/cmd/start.go b/cmd/start.go index 2f611ace5..023beb25b 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -99,7 +99,7 @@ func newStartCommand(ctx *cmdContext) *cobra.Command { // INFO: testing purposes f.BoolVar(&conf.EnableTampering, "tampering", false, "Allow tampering api for proof demostrations") - f.MarkHidden("tampering") + _ = f.MarkHidden("tampering") // Lookups v.BindPFlag("server.node-id", f.Lookup("node-id")) diff --git a/deploy/aws/modules/prometheus/provisioning/dashboards/QED.json b/deploy/aws/modules/prometheus/provisioning/dashboards/QED.json index 4a282e35c..2814d05dc 100644 --- a/deploy/aws/modules/prometheus/provisioning/dashboards/QED.json +++ b/deploy/aws/modules/prometheus/provisioning/dashboards/QED.json @@ -15,8 +15,8 @@ "editable": true, "gnetId": null, "graphTooltip": 0, - "id": 3, - "iteration": 1550162878246, + "id": 8, + "iteration": 1550228062274, "links": [], "panels": [ { @@ -295,7 +295,7 @@ "tableColumn": "", "targets": [ { - "expr": "sum(qed_balloon_membership_total)", + "expr": "sum(qed_balloon_membership_total) + sum(qed_balloon_digest_membership_total)", "format": "time_series", "intervalFactor": 1, "refId": "A" @@ -524,15 +524,22 @@ "renderer": "flot", "seriesOverrides": [], "spaceLength": 10, - "stack": false, + "stack": true, "steppedLine": false, "targets": [ { "expr": "sum(rate(qed_balloon_membership_total[$interval])) by (job)", "format": "time_series", "intervalFactor": 1, - "legendFormat": "{{job}}", + "legendFormat": "{{job}}-memb", "refId": "A" + }, + { + "expr": "sum(rate(qed_balloon_digest_membership_total[$interval])) by (job)", + "format": "time_series", + "intervalFactor": 1, + "legendFormat": "{{job}}-digest_memb", + "refId": "B" } ], "thresholds": [], @@ -1434,7 +1441,7 @@ "tableColumn": "", "targets": [ { - "expr": "sum(qed_store_alerts_received_total)", + "expr": "sum(qed_store_alerts_generated_total)", "format": "time_series", "intervalFactor": 1, "legendFormat": "{{job}}", @@ -1512,7 +1519,7 @@ ] }, "time": { - "from": "now-15m", + "from": "now-5m", "to": "now" }, "timepicker": { diff --git a/gossip/auditor/auditor.go b/gossip/auditor/auditor.go index a44a863d6..2d137532b 100644 --- a/gossip/auditor/auditor.go +++ b/gossip/auditor/auditor.go @@ -132,29 +132,40 @@ func (a Auditor) dispatchTasks() { } } -type Task interface { - Do() -} +func (a Auditor) Process(b protocol.BatchSnapshots) { + // Metrics + metrics.Qed_auditor_batches_received_total.Inc() + timer := prometheus.NewTimer(metrics.Qed_auditor_batches_process_seconds) + defer timer.ObserveDuration() -func (t MembershipTask) getSnapshot(version uint64) (*protocol.SignedSnapshot, error) { - resp, err := http.Get(fmt.Sprintf("%s/snapshot?v=%d", t.pubUrl, version)) - if err != nil { - return nil, fmt.Errorf("Error getting snapshot from the store: %v", err) - } - defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("Error getting snapshot from the store. Status: %d", resp.StatusCode) - } - buf, err := ioutil.ReadAll(resp.Body) - if err != nil { - log.Infof("Error reading request body: %v", err) + task := &MembershipTask{ + qed: a.qed, + pubUrl: a.conf.PubUrls[0], + taskCh: a.taskCh, + s: *b.Snapshots[0], } - var s protocol.SignedSnapshot - err = s.Decode(buf) - if err != nil { - return nil, fmt.Errorf("Error decoding signed snapshot %d codec", t.s.Snapshot.Version) + + a.taskCh <- task +} + +func (a *Auditor) Shutdown() { + // Metrics + metrics.Qed_auditor_instances_count.Dec() + + log.Debugf("Metrics enabled: stopping server...") + if err := a.metricsServer.Shutdown(context.Background()); err != nil { // TODO include timeout instead nil + log.Error(err) } - return &s, nil + log.Debugf("Done.\n") + + a.executionTicker.Stop() + a.quitCh <- true + close(a.quitCh) + close(a.taskCh) +} + +type Task interface { + Do() } type MembershipTask struct { @@ -193,6 +204,27 @@ func (t MembershipTask) Do() { log.Infof("MembershipTask.Do(): Snapshot %v has been verified by QED", t.s.Snapshot) } +func (t MembershipTask) getSnapshot(version uint64) (*protocol.SignedSnapshot, error) { + resp, err := http.Get(fmt.Sprintf("%s/snapshot?v=%d", t.pubUrl, version)) + if err != nil { + return nil, fmt.Errorf("Error getting snapshot from the store: %v", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("Error getting snapshot from the store. Status: %d", resp.StatusCode) + } + buf, err := ioutil.ReadAll(resp.Body) + if err != nil { + log.Infof("Error reading request body: %v", err) + } + var s protocol.SignedSnapshot + err = s.Decode(buf) + if err != nil { + return nil, fmt.Errorf("Error decoding signed snapshot %d codec", t.s.Snapshot.Version) + } + return &s, nil +} + func (t MembershipTask) sendAlert(msg string) { resp, err := http.Post(fmt.Sprintf("%s/alert", t.pubUrl), "application/json", bytes.NewBufferString(msg)) @@ -206,35 +238,3 @@ func (t MembershipTask) sendAlert(msg string) { log.Infof("Error reading request body: %v", err) } } - -func (a Auditor) Process(b protocol.BatchSnapshots) { - // Metrics - metrics.Qed_auditor_batches_received_total.Inc() - timer := prometheus.NewTimer(metrics.Qed_auditor_batches_process_seconds) - defer timer.ObserveDuration() - - task := &MembershipTask{ - qed: a.qed, - pubUrl: a.conf.PubUrls[0], - taskCh: a.taskCh, - s: *b.Snapshots[0], - } - - a.taskCh <- task -} - -func (a *Auditor) Shutdown() { - // Metrics - metrics.Qed_auditor_instances_count.Dec() - - log.Debugf("Metrics enabled: stopping server...") - if err := a.metricsServer.Shutdown(context.Background()); err != nil { // TODO include timeout instead nil - log.Error(err) - } - log.Debugf("Done.\n") - - a.executionTicker.Stop() - a.quitCh <- true - close(a.quitCh) - close(a.taskCh) -} diff --git a/gossip/monitor/monitor.go b/gossip/monitor/monitor.go index a8dd80780..fc63d444d 100644 --- a/gossip/monitor/monitor.go +++ b/gossip/monitor/monitor.go @@ -90,8 +90,6 @@ func NewMonitor(conf Config) (*Monitor, error) { Handler: metricsMux, } - // fmt.Println(">>>>>>>>>< ", monitor.metricsServer.Addr) - go func() { log.Debugf(" * Starting metrics HTTP server in addr: %s", conf.MetricsAddr) if err := monitor.metricsServer.ListenAndServe(); err != http.ErrServerClosed { diff --git a/gossip/publisher/publisher.go b/gossip/publisher/publisher.go index 7f58ce78a..6e0564731 100644 --- a/gossip/publisher/publisher.go +++ b/gossip/publisher/publisher.go @@ -164,7 +164,7 @@ func (p Publisher) dispatchTasks() { } func (p Publisher) executeTask(task PublishTask) { - log.Debug("Executing task: %+v\n", task) + log.Debugf("Executing task: %+v", task) buf, err := task.Batch.Encode() if err != nil { log.Debug("Publisher: Error marshalling: %s\n", err.Error()) diff --git a/metrics/metrics.go b/metrics/metrics.go index 4b7dc033a..95e3d82c1 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -64,8 +64,8 @@ var ( }, ) - Qed_balloon_digest_membership_duration_seconds = prometheus.NewGauge( - prometheus.GaugeOpts{ + Qed_balloon_digest_membership_duration_seconds = prometheus.NewSummary( + prometheus.SummaryOpts{ Name: "qed_balloon_digest_membership_duration_seconds", Help: "Duration of the 'Digest Membership' Qed_balloon method.", }, diff --git a/tests/start_agents b/tests/start_agents index 0066937ac..47f86c003 100755 --- a/tests/start_agents +++ b/tests/start_agents @@ -25,7 +25,7 @@ do $QED agent \ --alertsUrls $alertsStoreEndpoint \ auditor \ - -k key \ + -k test_key \ -l info \ --bind 127.0.0.1:810$i \ --join $qedGossipEndpoint \ @@ -39,7 +39,7 @@ do $QED agent \ --alertsUrls $alertsStoreEndpoint \ monitor \ - -k key \ + -k test_key \ -l info \ --bind 127.0.0.1:820$i \ --join $qedGossipEndpoint \ @@ -53,7 +53,7 @@ do $QED agent \ --alertsUrls $alertsStoreEndpoint \ publisher \ - -k key \ + -k test_key \ -l info \ --bind 127.0.0.1:830$i \ --join $qedGossipEndpoint \