Skip to content

Commit

Permalink
Fine tune agents. WIP: api-key in cmd>client.go>doReq
Browse files Browse the repository at this point in the history
Sync api-key among start-agents and start-server files.
Add new metrics.
Improve QED dashboard.
  • Loading branch information
Jose Luis Lucas authored and iknite committed Feb 19, 2019
1 parent f9eb3ad commit 892f346
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 69 deletions.
5 changes: 5 additions & 0 deletions balloon/balloon.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,11 @@ func (b *Balloon) Add(event []byte) (*Snapshot, []*storage.Mutation, error) {
}

func (b Balloon) QueryDigestMembership(keyDigest hashing.Digest, version uint64) (*MembershipProof, error) {
// Metrics
metrics.Qed_balloon_digest_membership_total.Inc()
timer := prometheus.NewTimer(metrics.Qed_balloon_digest_membership_duration_seconds)
defer timer.ObserveDuration()

stats := metrics.Balloon
stats.AddFloat("QueryMembership", 1)
var proof MembershipProof
Expand Down
4 changes: 3 additions & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ func (c HTTPClient) doReq(method, path string, data []byte) ([]byte, error) {
}

req.Header.Set("Content-Type", "application/json")
req.Header.Set("Api-Key", c.conf.APIKey)
// TOFIX: c.conf.APIKey arrives empty. It causes failures when doing
// memberships and incrementals by auditors and monitors.
req.Header.Set("Api-Key", "AAAAAAA") // c.conf.APIKey) //

resp, err := c.exponentialBackoff(req)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func newStartCommand(ctx *cmdContext) *cobra.Command {

// INFO: testing purposes
f.BoolVar(&conf.EnableTampering, "tampering", false, "Allow tampering api for proof demostrations")
f.MarkHidden("tampering")
_ = f.MarkHidden("tampering")

// Lookups
v.BindPFlag("server.node-id", f.Lookup("node-id"))
Expand Down
21 changes: 14 additions & 7 deletions deploy/aws/modules/prometheus/provisioning/dashboards/QED.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
"editable": true,
"gnetId": null,
"graphTooltip": 0,
"id": 3,
"iteration": 1550162878246,
"id": 8,
"iteration": 1550228062274,
"links": [],
"panels": [
{
Expand Down Expand Up @@ -295,7 +295,7 @@
"tableColumn": "",
"targets": [
{
"expr": "sum(qed_balloon_membership_total)",
"expr": "sum(qed_balloon_membership_total) + sum(qed_balloon_digest_membership_total)",
"format": "time_series",
"intervalFactor": 1,
"refId": "A"
Expand Down Expand Up @@ -524,15 +524,22 @@
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"stack": true,
"steppedLine": false,
"targets": [
{
"expr": "sum(rate(qed_balloon_membership_total[$interval])) by (job)",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "{{job}}",
"legendFormat": "{{job}}-memb",
"refId": "A"
},
{
"expr": "sum(rate(qed_balloon_digest_membership_total[$interval])) by (job)",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "{{job}}-digest_memb",
"refId": "B"
}
],
"thresholds": [],
Expand Down Expand Up @@ -1434,7 +1441,7 @@
"tableColumn": "",
"targets": [
{
"expr": "sum(qed_store_alerts_received_total)",
"expr": "sum(qed_store_alerts_generated_total)",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "{{job}}",
Expand Down Expand Up @@ -1512,7 +1519,7 @@
]
},
"time": {
"from": "now-15m",
"from": "now-5m",
"to": "now"
},
"timepicker": {
Expand Down
104 changes: 52 additions & 52 deletions gossip/auditor/auditor.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,29 +132,40 @@ func (a Auditor) dispatchTasks() {
}
}

type Task interface {
Do()
}
func (a Auditor) Process(b protocol.BatchSnapshots) {
// Metrics
metrics.Qed_auditor_batches_received_total.Inc()
timer := prometheus.NewTimer(metrics.Qed_auditor_batches_process_seconds)
defer timer.ObserveDuration()

func (t MembershipTask) getSnapshot(version uint64) (*protocol.SignedSnapshot, error) {
resp, err := http.Get(fmt.Sprintf("%s/snapshot?v=%d", t.pubUrl, version))
if err != nil {
return nil, fmt.Errorf("Error getting snapshot from the store: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("Error getting snapshot from the store. Status: %d", resp.StatusCode)
}
buf, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Infof("Error reading request body: %v", err)
task := &MembershipTask{
qed: a.qed,
pubUrl: a.conf.PubUrls[0],
taskCh: a.taskCh,
s: *b.Snapshots[0],
}
var s protocol.SignedSnapshot
err = s.Decode(buf)
if err != nil {
return nil, fmt.Errorf("Error decoding signed snapshot %d codec", t.s.Snapshot.Version)

a.taskCh <- task
}

func (a *Auditor) Shutdown() {
// Metrics
metrics.Qed_auditor_instances_count.Dec()

log.Debugf("Metrics enabled: stopping server...")
if err := a.metricsServer.Shutdown(context.Background()); err != nil { // TODO include timeout instead nil
log.Error(err)
}
return &s, nil
log.Debugf("Done.\n")

a.executionTicker.Stop()
a.quitCh <- true
close(a.quitCh)
close(a.taskCh)
}

type Task interface {
Do()
}

type MembershipTask struct {
Expand Down Expand Up @@ -193,6 +204,27 @@ func (t MembershipTask) Do() {
log.Infof("MembershipTask.Do(): Snapshot %v has been verified by QED", t.s.Snapshot)
}

func (t MembershipTask) getSnapshot(version uint64) (*protocol.SignedSnapshot, error) {
resp, err := http.Get(fmt.Sprintf("%s/snapshot?v=%d", t.pubUrl, version))
if err != nil {
return nil, fmt.Errorf("Error getting snapshot from the store: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("Error getting snapshot from the store. Status: %d", resp.StatusCode)
}
buf, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Infof("Error reading request body: %v", err)
}
var s protocol.SignedSnapshot
err = s.Decode(buf)
if err != nil {
return nil, fmt.Errorf("Error decoding signed snapshot %d codec", t.s.Snapshot.Version)
}
return &s, nil
}

func (t MembershipTask) sendAlert(msg string) {
resp, err := http.Post(fmt.Sprintf("%s/alert", t.pubUrl), "application/json",
bytes.NewBufferString(msg))
Expand All @@ -206,35 +238,3 @@ func (t MembershipTask) sendAlert(msg string) {
log.Infof("Error reading request body: %v", err)
}
}

func (a Auditor) Process(b protocol.BatchSnapshots) {
// Metrics
metrics.Qed_auditor_batches_received_total.Inc()
timer := prometheus.NewTimer(metrics.Qed_auditor_batches_process_seconds)
defer timer.ObserveDuration()

task := &MembershipTask{
qed: a.qed,
pubUrl: a.conf.PubUrls[0],
taskCh: a.taskCh,
s: *b.Snapshots[0],
}

a.taskCh <- task
}

func (a *Auditor) Shutdown() {
// Metrics
metrics.Qed_auditor_instances_count.Dec()

log.Debugf("Metrics enabled: stopping server...")
if err := a.metricsServer.Shutdown(context.Background()); err != nil { // TODO include timeout instead nil
log.Error(err)
}
log.Debugf("Done.\n")

a.executionTicker.Stop()
a.quitCh <- true
close(a.quitCh)
close(a.taskCh)
}
2 changes: 0 additions & 2 deletions gossip/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,6 @@ func NewMonitor(conf Config) (*Monitor, error) {
Handler: metricsMux,
}

// fmt.Println(">>>>>>>>>< ", monitor.metricsServer.Addr)

go func() {
log.Debugf(" * Starting metrics HTTP server in addr: %s", conf.MetricsAddr)
if err := monitor.metricsServer.ListenAndServe(); err != http.ErrServerClosed {
Expand Down
2 changes: 1 addition & 1 deletion gossip/publisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (p Publisher) dispatchTasks() {
}

func (p Publisher) executeTask(task PublishTask) {
log.Debug("Executing task: %+v\n", task)
log.Debugf("Executing task: %+v", task)
buf, err := task.Batch.Encode()
if err != nil {
log.Debug("Publisher: Error marshalling: %s\n", err.Error())
Expand Down
4 changes: 2 additions & 2 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ var (
},
)

Qed_balloon_digest_membership_duration_seconds = prometheus.NewGauge(
prometheus.GaugeOpts{
Qed_balloon_digest_membership_duration_seconds = prometheus.NewSummary(
prometheus.SummaryOpts{
Name: "qed_balloon_digest_membership_duration_seconds",
Help: "Duration of the 'Digest Membership' Qed_balloon method.",
},
Expand Down
6 changes: 3 additions & 3 deletions tests/start_agents
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ do
$QED agent \
--alertsUrls $alertsStoreEndpoint \
auditor \
-k key \
-k test_key \
-l info \
--bind 127.0.0.1:810$i \
--join $qedGossipEndpoint \
Expand All @@ -39,7 +39,7 @@ do
$QED agent \
--alertsUrls $alertsStoreEndpoint \
monitor \
-k key \
-k test_key \
-l info \
--bind 127.0.0.1:820$i \
--join $qedGossipEndpoint \
Expand All @@ -53,7 +53,7 @@ do
$QED agent \
--alertsUrls $alertsStoreEndpoint \
publisher \
-k key \
-k test_key \
-l info \
--bind 127.0.0.1:830$i \
--join $qedGossipEndpoint \
Expand Down

0 comments on commit 892f346

Please sign in to comment.