Skip to content

Commit

Permalink
move newline responsibility
Browse files Browse the repository at this point in the history
moves newline creation from NDJson to the http handler, json stream only encodes and sends now

ignore snapshot restore if broker is disabled

enable dev mode to access event steam without acl

use mapping instead of switch

use pointers for config sizes, remove unused ttl, simplify closed conn logic

fix tests, 123 was not random enough
  • Loading branch information
drewbailey committed Oct 14, 2020
1 parent f5db891 commit 8c88f29
Show file tree
Hide file tree
Showing 27 changed files with 435 additions and 314 deletions.
2 changes: 1 addition & 1 deletion api/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/hashicorp/nomad/api
go 1.12

require (
github.com/davecgh/go-spew v1.1.1
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/docker/go-units v0.3.3
github.com/gorilla/websocket v1.4.1
github.com/hashicorp/cronexpr v1.1.0
Expand Down
8 changes: 4 additions & 4 deletions command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,11 +246,11 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) {
if agentConfig.Server.EnableEventBroker != nil {
conf.EnableEventBroker = *agentConfig.Server.EnableEventBroker
}
if agentConfig.Server.EventBufferSize > 0 {
conf.EventBufferSize = int64(agentConfig.Server.EventBufferSize)
if agentConfig.Server.EventBufferSize != nil {
conf.EventBufferSize = int64(*agentConfig.Server.EventBufferSize)
}
if agentConfig.Server.DurableEventCount > 0 {
conf.DurableEventCount = int64(agentConfig.Server.DurableEventCount)
if agentConfig.Server.DurableEventCount != nil {
conf.DurableEventCount = int64(*agentConfig.Server.DurableEventCount)
}
if agentConfig.Autopilot != nil {
if agentConfig.Autopilot.CleanupDeadServers != nil {
Expand Down
12 changes: 6 additions & 6 deletions command/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,11 +491,11 @@ type ServerConfig struct {
// EventBufferSize configure the amount of events to be held in memory.
// If EnableEventBroker is set to true, the minimum allowable value
// for the EventBufferSize is 1.
EventBufferSize int `hcl:"event_buffer_size"`
EventBufferSize *int `hcl:"event_buffer_size"`

// DurableEventCount specifies the amount of events to persist during snapshot generation.
// A count of 0 signals that no events should be persisted.
DurableEventCount int `hcl:"durable_event_count"`
DurableEventCount *int `hcl:"durable_event_count"`

// ExtraKeysHCL is used by hcl to surface unexpected keys
ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"`
Expand Down Expand Up @@ -889,8 +889,8 @@ func DefaultConfig() *Config {
Server: &ServerConfig{
Enabled: false,
EnableEventBroker: helper.BoolToPtr(true),
EventBufferSize: 100,
DurableEventCount: 100,
EventBufferSize: helper.IntToPtr(100),
DurableEventCount: helper.IntToPtr(100),
StartJoin: []string{},
ServerJoin: &ServerJoin{
RetryJoin: []string{},
Expand Down Expand Up @@ -1419,11 +1419,11 @@ func (a *ServerConfig) Merge(b *ServerConfig) *ServerConfig {
result.EnableEventBroker = b.EnableEventBroker
}

if b.EventBufferSize != 0 {
if b.EventBufferSize != nil {
result.EventBufferSize = b.EventBufferSize
}

if b.DurableEventCount != 0 {
if b.DurableEventCount != nil {
result.DurableEventCount = b.DurableEventCount
}

Expand Down
26 changes: 13 additions & 13 deletions command/agent/config_parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,19 +112,19 @@ var basicConfig = &Config{
MinHeartbeatTTL: 33 * time.Second,
MinHeartbeatTTLHCL: "33s",
MaxHeartbeatsPerSecond: 11.0,
RetryJoin: []string{"1.1.1.1", "2.2.2.2"},
StartJoin: []string{"1.1.1.1", "2.2.2.2"},
RetryInterval: 15 * time.Second,
RetryIntervalHCL: "15s",
RejoinAfterLeave: true,
RetryMaxAttempts: 3,
NonVotingServer: true,
RedundancyZone: "foo",
UpgradeVersion: "0.8.0",
EncryptKey: "abc",
EnableEventBroker: helper.BoolToPtr(false),
EventBufferSize: 200,
DurableEventCount: 100,
RetryJoin: []string{"1.1.1.1", "2.2.2.2"},
StartJoin: []string{"1.1.1.1", "2.2.2.2"},
RetryInterval: 15 * time.Second,
RetryIntervalHCL: "15s",
RejoinAfterLeave: true,
RetryMaxAttempts: 3,
NonVotingServer: true,
RedundancyZone: "foo",
UpgradeVersion: "0.8.0",
EncryptKey: "abc",
EnableEventBroker: helper.BoolToPtr(false),
EventBufferSize: helper.IntToPtr(200),
DurableEventCount: helper.IntToPtr(0),
ServerJoin: &ServerJoin{
RetryJoin: []string{"1.1.1.1", "2.2.2.2"},
RetryInterval: time.Duration(15) * time.Second,
Expand Down
24 changes: 22 additions & 2 deletions command/agent/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ func TestConfig_Merge(t *testing.T) {
RedundancyZone: "foo",
UpgradeVersion: "foo",
EnableEventBroker: helper.BoolToPtr(false),
DurableEventCount: 0,
EventBufferSize: helper.IntToPtr(0),
DurableEventCount: helper.IntToPtr(0),
},
ACL: &ACLConfig{
Enabled: true,
Expand Down Expand Up @@ -331,6 +332,8 @@ func TestConfig_Merge(t *testing.T) {
RedundancyZone: "bar",
UpgradeVersion: "bar",
EnableEventBroker: helper.BoolToPtr(true),
DurableEventCount: helper.IntToPtr(100),
EventBufferSize: helper.IntToPtr(100),
},
ACL: &ACLConfig{
Enabled: true,
Expand Down Expand Up @@ -1170,37 +1173,54 @@ func TestTelemetry_Parse(t *testing.T) {
func TestEventBroker_Parse(t *testing.T) {

require := require.New(t)

{
a := &ServerConfig{
EnableEventBroker: helper.BoolToPtr(false),
EventBufferSize: helper.IntToPtr(0),
DurableEventCount: helper.IntToPtr(0),
}
b := DefaultConfig().Server
b.EnableEventBroker = nil
b.EventBufferSize = nil
b.DurableEventCount = nil

result := a.Merge(b)
require.Equal(false, *result.EnableEventBroker)
require.Equal(0, *result.EventBufferSize)
require.Equal(0, *result.DurableEventCount)
}

{
a := &ServerConfig{
EnableEventBroker: helper.BoolToPtr(true),
EventBufferSize: helper.IntToPtr(5000),
DurableEventCount: helper.IntToPtr(200),
}
b := DefaultConfig().Server
b.EnableEventBroker = nil
b.EventBufferSize = nil
b.DurableEventCount = nil

result := a.Merge(b)
require.Equal(true, *result.EnableEventBroker)
require.Equal(5000, *result.EventBufferSize)
require.Equal(200, *result.DurableEventCount)
}

{
a := &ServerConfig{
EnableEventBroker: helper.BoolToPtr(false),
EventBufferSize: helper.IntToPtr(0),
DurableEventCount: helper.IntToPtr(0),
}
b := DefaultConfig().Server
b.EnableEventBroker = helper.BoolToPtr(true)
b.EventBufferSize = helper.IntToPtr(20000)
b.DurableEventCount = helper.IntToPtr(1000)

result := a.Merge(b)
require.Equal(true, *result.EnableEventBroker)
require.Equal(20000, *result.EventBufferSize)
require.Equal(1000, *result.DurableEventCount)
}
}
3 changes: 3 additions & 0 deletions command/agent/event_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ func (s *HTTPServer) EventStream(resp http.ResponseWriter, req *http.Request) (i
if _, err := io.Copy(output, bytes.NewReader(res.Event.Data)); err != nil {
return CodedError(500, err.Error())
}
// Each entry is its own new line according to ndjson.org
// append new line to each entry
fmt.Fprint(output, "\n")
}
})

Expand Down
10 changes: 6 additions & 4 deletions command/agent/event_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"
"time"

"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"

"github.com/hashicorp/nomad/testutil"
Expand Down Expand Up @@ -71,6 +72,8 @@ func TestEventStream_NamespaceQuery(t *testing.T) {

httpTest(t, nil, func(s *TestAgent) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

req, err := http.NewRequestWithContext(ctx, "GET", "/v1/event/stream?namespace=foo", nil)
require.Nil(t, err)
resp := httptest.NewRecorder()
Expand All @@ -85,14 +88,14 @@ func TestEventStream_NamespaceQuery(t *testing.T) {
pub, err := s.Agent.server.State().EventBroker()
require.NoError(t, err)

pub.Publish(&structs.Events{Index: 100, Events: []structs.Event{{Namespace: "bar", Payload: testEvent{ID: "123"}}}})
badID := uuid.Generate()
pub.Publish(&structs.Events{Index: 100, Events: []structs.Event{{Namespace: "bar", Payload: testEvent{ID: badID}}}})
pub.Publish(&structs.Events{Index: 101, Events: []structs.Event{{Namespace: "foo", Payload: testEvent{ID: "456"}}}})

testutil.WaitForResult(func() (bool, error) {
got := resp.Body.String()
want := `"Namespace":"foo"`
bad := `123`
if strings.Contains(got, bad) {
if strings.Contains(got, badID) {
return false, fmt.Errorf("expected non matching namespace to be filtered, got:%v", got)
}
if strings.Contains(got, want) {
Expand All @@ -101,7 +104,6 @@ func TestEventStream_NamespaceQuery(t *testing.T) {

return false, fmt.Errorf("missing expected json, got: %v, want: %v", got, want)
}, func(err error) {
cancel()
require.Fail(t, err.Error())
})

Expand Down
2 changes: 2 additions & 0 deletions nomad/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,8 @@ func DefaultConfig() *Config {
SentinelGCInterval: 30 * time.Second,
LicenseConfig: &LicenseConfig{},
EnableEventBroker: true,
EventBufferSize: 100,
DurableEventCount: 100,
AutopilotConfig: &structs.AutopilotConfig{
CleanupDeadServers: true,
LastContactThreshold: 200 * time.Millisecond,
Expand Down
102 changes: 64 additions & 38 deletions nomad/event_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package nomad

import (
"context"
"fmt"
"io"
"io/ioutil"
"time"

"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
)
Expand Down Expand Up @@ -41,34 +43,28 @@ func (e *Event) stream(conn io.ReadWriteCloser) {
return
}

// ACL check
// TODO(drew) ACL checks need to be per topic
// All Events Management
// System Events Management
// Node Events NamespaceCapabilityReadEvents
// Job/Alloc Events NamespaceCapabilityReadEvents
if aclObj, err := e.srv.ResolveToken(args.AuthToken); err != nil {
aclObj, err := e.srv.ResolveToken(args.AuthToken)
if err != nil {
handleJsonResultError(err, nil, encoder)
return
} else if aclObj != nil && !aclObj.IsManagement() {
handleJsonResultError(structs.ErrPermissionDenied, helper.Int64ToPtr(403), encoder)
return
}

// authToken is passed to the subscribe request so the event stream
// can handle closing a subscription if the authToken expires.
// If ACLs are disabled, a random token is generated and it will
// never be closed due to expiry.
authToken := args.AuthToken
if authToken == "" {
authToken = uuid.Generate()
}
subReq := &stream.SubscribeRequest{
Token: authToken,
Token: args.AuthToken,
Topics: args.Topics,
Index: uint64(args.Index),
Namespace: args.Namespace,
}

// Check required ACL permissions for requested Topics
if aclObj != nil {
if err := aclCheckForEvents(subReq, aclObj); err != nil {
handleJsonResultError(structs.ErrPermissionDenied, helper.Int64ToPtr(403), encoder)
return
}
}

// Get the servers broker and subscribe
publisher, err := e.srv.State().EventBroker()
if err != nil {
handleJsonResultError(err, helper.Int64ToPtr(500), encoder)
Expand All @@ -86,28 +82,14 @@ func (e *Event) stream(conn io.ReadWriteCloser) {
}
defer subscription.Unsubscribe()

ndJsonCh := make(chan *structs.NDJson)
errCh := make(chan error)

jsonStream := stream.NewNDJsonStream(ndJsonCh, 30*time.Second)
jsonStream.Run(ctx)
jsonStream := stream.NewJsonStream(ctx, 30*time.Second)

// goroutine to detect remote side closing
go func() {
for {
if _, err := conn.Read(nil); err != nil {
if err == io.EOF || err == io.ErrClosedPipe {
// One end of the pipe was explicitly closed, exit cleanly
cancel()
return
}
select {
case errCh <- err:
case <-ctx.Done():
return
}
}
}
io.Copy(ioutil.Discard, conn)
cancel()
}()

go func() {
Expand Down Expand Up @@ -145,7 +127,7 @@ OUTER:
break OUTER
case <-ctx.Done():
break OUTER
case eventJSON, ok := <-ndJsonCh:
case eventJSON, ok := <-jsonStream.OutCh():
// check if ndjson may have been closed when an error occurred,
// check once more for an error.
if !ok {
Expand Down Expand Up @@ -214,3 +196,47 @@ func handleJsonResultError(err error, code *int64, encoder *codec.Encoder) {
Error: structs.NewRpcError(err, code),
})
}

func aclCheckForEvents(subReq *stream.SubscribeRequest, aclObj *acl.ACL) error {
if len(subReq.Topics) == 0 {
return fmt.Errorf("invalid topic request")
}

reqPolicies := make(map[string]struct{})
var required = struct{}{}

for topic := range subReq.Topics {
switch topic {
case structs.TopicDeployment, structs.TopicEval,
structs.TopicAlloc, structs.TopicJob:
if _, ok := reqPolicies[acl.NamespaceCapabilityReadJob]; !ok {
reqPolicies[acl.NamespaceCapabilityReadJob] = required
}
case structs.TopicNode:
reqPolicies["node-read"] = required
case structs.TopicAll:
reqPolicies["management"] = required
default:
return fmt.Errorf("unknown topic %s", topic)
}
}

for checks := range reqPolicies {
switch checks {
case acl.NamespaceCapabilityReadJob:
if ok := aclObj.AllowNsOp(subReq.Namespace, acl.NamespaceCapabilityReadJob); !ok {
return structs.ErrPermissionDenied
}
case "node-read":
if ok := aclObj.AllowNodeRead(); !ok {
return structs.ErrPermissionDenied
}
case "management":
if ok := aclObj.IsManagement(); !ok {
return structs.ErrPermissionDenied
}
}
}

return nil
}
Loading

0 comments on commit 8c88f29

Please sign in to comment.