Skip to content

Commit

Permalink
Always add api_key to Elasticsearch output
Browse files Browse the repository at this point in the history
This fixes a few issues:

- When the hash on the permission and the hash on the agent were the
same the key were not added to the output. This was causing the agent to
not be able to connect to Elasticsearch. If you changes an integration
policy field, like the host,  and the data stream are the same the permission hash will
stay the same.

- Fix a possible issue when the new key was not saved on the Agent model, the
key was only saved when we were targetting a default agent policy.
Currently we only support a single ES output so this should always be saved.

- Reorganize the test to ensure that all the scenario from the case
statement are tested.

Fix: elastic/elastic-agent#285
  • Loading branch information
ph committed Apr 4, 2022
1 parent fb20c54 commit 372e598
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 77 deletions.
73 changes: 42 additions & 31 deletions internal/pkg/policy/policy_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,19 @@ func (p *PolicyOutput) Prepare(ctx context.Context, zlog zerolog.Logger, bulker
// Note: This will need to be updated when doing multi-cluster elasticsearch support
// Currently, we only have access to the token for the elasticsearch instance fleet-server
// is monitors. When updating for multiple ES instances we need to tie the token to the output.
needKey := true
needNewKey := true
switch {
case agent.DefaultApiKey == "":
zlog.Debug().Msg("must generate api key as default API key is not present")
case p.Role.Sha2 != agent.PolicyOutputPermissionsHash:
fmt.Println("!= hash?")
zlog.Debug().Msg("must generate api key as policy output permissions changed")
default:
needKey = false
needNewKey = false
zlog.Debug().Msg("policy output permissions are the same")
}

if needKey {
if needNewKey {
zlog.Debug().
RawJSON("roles", p.Role.Raw).
Str("oldHash", agent.PolicyOutputPermissionsHash).
Expand All @@ -79,42 +80,52 @@ func (p *PolicyOutput) Prepare(ctx context.Context, zlog zerolog.Logger, bulker
return err
}

if ok := setMapObj(outputMap, outputAPIKey.Agent(), p.Name, "api_key"); !ok {
return ErrFailInjectAPIKey
}
agent.DefaultApiKey = outputAPIKey.Agent()

if isDefault {
zlog.Info().
Str("hash.sha256", p.Role.Sha2).
Str(logger.DefaultOutputApiKeyId, outputAPIKey.Id).
Msg("Updating agent record to pick up default output key.")
// When a new keys is generated we need to update the Agent record,
// this will need to be updated when multiples Elasticsearch output
// are used.
zlog.Info().
Str("hash.sha256", p.Role.Sha2).
Str(logger.DefaultOutputApiKeyId, outputAPIKey.Id).
Msg("Updating agent record to pick up default output key.")

fields := map[string]interface{}{
dl.FieldDefaultApiKey: outputAPIKey.Agent(),
dl.FieldDefaultApiKeyId: outputAPIKey.Id,
dl.FieldPolicyOutputPermissionsHash: p.Role.Sha2,
}
if agent.DefaultApiKeyId != "" {
fields[dl.FieldDefaultApiKeyHistory] = model.DefaultApiKeyHistoryItems{
Id: agent.DefaultApiKeyId,
RetiredAt: time.Now().UTC().Format(time.RFC3339),
}
fields := map[string]interface{}{
dl.FieldDefaultApiKey: outputAPIKey.Agent(),
dl.FieldDefaultApiKeyId: outputAPIKey.Id,
dl.FieldPolicyOutputPermissionsHash: p.Role.Sha2,
}
if agent.DefaultApiKeyId != "" {
fields[dl.FieldDefaultApiKeyHistory] = model.DefaultApiKeyHistoryItems{
Id: agent.DefaultApiKeyId,
RetiredAt: time.Now().UTC().Format(time.RFC3339),
}
}

// Using painless script to append the old keys to the history
body, err := renderUpdatePainlessScript(fields)
// Using painless script to append the old keys to the history
body, err := renderUpdatePainlessScript(fields)

if err != nil {
return err
}
if err != nil {
return err
}

if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body); err != nil {
zlog.Error().Err(err).Msg("fail update agent record")
return err
}
agent.DefaultApiKey = outputAPIKey.Agent()
if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body); err != nil {
zlog.Error().Err(err).Msg("fail update agent record")
return err
}
}

// Always insert the `api_key` as part of the output block, this is required
// because only fleet server know the api key for the specific agent, if we don't
// add it the agent will not receive the `api_key` and will not be able to connect
// to Elasticsearch.
//
// TODO(ph) Investigate allocation with the new LS output, we had optimization
// in place to reduce number of agent policy allocation when distribution the updated
// keys to multiples agents.
if ok := setMapObj(outputMap, agent.DefaultApiKey, p.Name, "api_key"); !ok {
return ErrFailInjectAPIKey
}
case OutputTypeLogstash:
zlog.Debug().Msg("preparing logstash output")
zlog.Info().Msg("no actions required for logstash output preparation")
Expand Down
136 changes: 90 additions & 46 deletions internal/pkg/policy/policy_output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package policy

import (
"context"
"fmt"
"testing"

"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
Expand Down Expand Up @@ -85,57 +86,100 @@ func TestPolicyESOutputPrepareNoRole(t *testing.T) {
}

func TestPolicyOutputESPrepare(t *testing.T) {
bulker := ftesting.NewMockBulk(&bulk.ApiKey{
Id: "test id",
Key: "test key",
t.Run("Permission hash == Agent Permission Hash no need to regenerate the key", func(t *testing.T) {
bulker := ftesting.NewMockBulk(&bulk.ApiKey{})
hashPerm := "abc123"
po := PolicyOutput{
Type: OutputTypeElasticsearch,
Name: "test output",
Role: &RoleT{
Sha2: hashPerm,
Raw: TestPayload,
},
}

policyMap := smap.Map{
"test output": map[string]interface{}{},
}

testAgent := &model.Agent{
DefaultApiKey: "test_id:EXISTING-KEY",
PolicyOutputPermissionsHash: hashPerm,
}

err := po.Prepare(context.Background(), zerolog.Logger{}, bulker, testAgent, policyMap, false)
require.NoError(t, err, "expected prepare to pass")

key, ok := policyMap.GetMap("test output")["api_key"].(string)

fmt.Println(policyMap)
require.True(t, ok, "unable to case api key")
require.Equal(t, testAgent.DefaultApiKey, key)
require.Equal(t, len(bulker.ArgumentData.Update), 0, "update should not be called")
})
po := PolicyOutput{
Type: OutputTypeElasticsearch,
Name: "test output",
Role: &RoleT{
Sha2: "fake sha",
Raw: TestPayload,
},
}
policyMap := smap.Map{
"test output": map[string]interface{}{
"api_key": "",
},
}

err := po.Prepare(context.Background(), zerolog.Logger{}, bulker, &model.Agent{}, policyMap, false)
require.Nil(t, err, "expected prepare to pass")
t.Run("Permission hash != Agent Permission Hash need to regenerate the key", func(t *testing.T) {
bulker := ftesting.NewMockBulk(&bulk.ApiKey{
Id: "abc",
Key: "new-key",
})

po := PolicyOutput{
Type: OutputTypeElasticsearch,
Name: "test output",
Role: &RoleT{
Sha2: "new-hash",
Raw: TestPayload,
},
}

policyMap := smap.Map{
"test output": map[string]interface{}{},
}

testAgent := &model.Agent{
DefaultApiKey: "test_id:EXISTING-KEY",
PolicyOutputPermissionsHash: "old-HASH",
}

err := po.Prepare(context.Background(), zerolog.Logger{}, bulker, testAgent, policyMap, false)
require.NoError(t, err, "expected prepare to pass")

key, ok := policyMap.GetMap("test output")["api_key"].(string)

require.True(t, ok, "unable to case api key")
require.Equal(t, "abc:new-key", key)
require.Equal(t, len(bulker.ArgumentData.Update), 1, "update should be called")
})

updatedKey, ok := policyMap.GetMap("test output")["api_key"].(string)
t.Run("Generate API Key on new Agent", func(t *testing.T) {
bulker := ftesting.NewMockBulk(&bulk.ApiKey{
Id: "abc",
Key: "new-key",
})

require.True(t, ok, "unable to case api key")
require.Equal(t, updatedKey, bulker.MockedAPIKey.Agent())
require.Equal(t, len(bulker.ArgumentData.Update), 0, "update should not be called")
}
po := PolicyOutput{
Type: OutputTypeElasticsearch,
Name: "test output",
Role: &RoleT{
Sha2: "new-hash",
Raw: TestPayload,
},
}

func TestPolicyOutputDefaultESPrepare(t *testing.T) {
bulker := ftesting.NewMockBulk(&bulk.ApiKey{
Id: "test id",
Key: "test key",
})
po := PolicyOutput{
Type: OutputTypeElasticsearch,
Name: "test output",
Role: &RoleT{
Sha2: "fake sha",
Raw: TestPayload,
},
}
policyMap := smap.Map{
"test output": map[string]interface{}{},
}
testAgent := &model.Agent{}
err := po.Prepare(context.Background(), zerolog.Logger{}, bulker, testAgent, policyMap, true)
require.Nil(t, err, "expected prepare to pass")
policyMap := smap.Map{
"test output": map[string]interface{}{},
}

testAgent := &model.Agent{}

updatedKey, ok := policyMap.GetMap("test output")["api_key"].(string)
err := po.Prepare(context.Background(), zerolog.Logger{}, bulker, testAgent, policyMap, false)
require.NoError(t, err, "expected prepare to pass")

require.True(t, ok, "unable to case api key")
require.Equal(t, updatedKey, bulker.MockedAPIKey.Agent())
require.Greater(t, len(bulker.ArgumentData.Update), 0, "update should be called")
key, ok := policyMap.GetMap("test output")["api_key"].(string)

require.True(t, ok, "unable to case api key")
require.Equal(t, "abc:new-key", key)
require.Equal(t, len(bulker.ArgumentData.Update), 1, "update should be called")
})
}

0 comments on commit 372e598

Please sign in to comment.