Skip to content

Commit

Permalink
Cherry-pick #26275 to 7.13: Enable agent to send custom headers to ki…
Browse files Browse the repository at this point in the history
…bana/ES (#26362)

* update

* Update CHANGELOG.next.asciidoc
  • Loading branch information
michalpristas authored Jun 18, 2021
1 parent f28af6f commit c9f0fc8
Show file tree
Hide file tree
Showing 16 changed files with 361 additions and 117 deletions.
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,4 @@
- Add --fleet-server-service-token and FLEET_SERVER_SERVICE_TOKEN options {pull}25083[25083]
- Keep http and logging config during enroll {pull}25132[25132]
- Log output of container to $LOGS_PATH/elastic-agent-start.log when LOGS_PATH set {pull}25150[25150]
- Enable agent to send custom headers to kibana/ES {pull}26275[26275]
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const maxRetriesloadAgentInfo = 5

type persistentAgentInfo struct {
ID string `json:"id" yaml:"id" config:"id"`
Headers map[string]string `json:"headers" yaml:"headers" config:"headers"`
LogLevel string `json:"logging.level,omitempty" yaml:"logging.level,omitempty" config:"logging.level,omitempty"`
MonitoringHTTP *monitoringConfig.MonitoringHTTPConfig `json:"monitoring.http,omitempty" yaml:"monitoring.http,omitempty" config:"monitoring.http,omitempty"`
}
Expand Down
7 changes: 7 additions & 0 deletions x-pack/elastic-agent/pkg/agent/application/info/agent_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
type AgentInfo struct {
agentID string
logLevel string
headers map[string]string
}

// NewAgentInfoWithLog creates a new agent information.
Expand All @@ -30,6 +31,7 @@ func NewAgentInfoWithLog(level string, createAgentID bool) (*AgentInfo, error) {
return &AgentInfo{
agentID: agentInfo.ID,
logLevel: agentInfo.LogLevel,
headers: agentInfo.Headers,
}, nil
}

Expand Down Expand Up @@ -84,3 +86,8 @@ func (*AgentInfo) Version() string {
func (*AgentInfo) Snapshot() bool {
return release.Snapshot()
}

// Headers returns custom headers used to communicate with elasticsearch.
func (i *AgentInfo) Headers() map[string]string {
return i.headers
}
142 changes: 31 additions & 111 deletions x-pack/elastic-agent/pkg/agent/cmd/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func runContainerCmd(streams *cli.IOStreams, cmd *cobra.Command, cfg setupConfig
}

if cfg.Kibana.Fleet.Setup {
client, err = kibanaClient(cfg.Kibana)
client, err = kibanaClient(cfg.Kibana, cfg.Kibana.Headers)
if err != nil {
return err
}
Expand All @@ -286,7 +286,7 @@ func runContainerCmd(streams *cli.IOStreams, cmd *cobra.Command, cfg setupConfig
token := cfg.Fleet.EnrollmentToken
if token == "" && !cfg.FleetServer.Enable {
if client == nil {
client, err = kibanaClient(cfg.Kibana)
client, err = kibanaClient(cfg.Kibana, cfg.Kibana.Headers)
if err != nil {
return err
}
Expand Down Expand Up @@ -363,6 +363,11 @@ func buildEnrollArgs(cfg setupConfig, token string, policyID string) ([]string,
if cfg.FleetServer.CertKey != "" {
args = append(args, "--fleet-server-cert-key", cfg.FleetServer.CertKey)
}

for k, v := range cfg.FleetServer.Headers {
args = append(args, "--header", k+"="+v)
}

if cfg.Fleet.URL != "" {
args = append(args, "--url", cfg.Fleet.URL)
}
Expand Down Expand Up @@ -444,19 +449,21 @@ func kibanaFetchToken(cfg setupConfig, client *kibana.Client, policy *kibanaPoli
return keyDetail.Item.APIKey, nil
}

func kibanaClient(cfg kibanaConfig) (*kibana.Client, error) {
func kibanaClient(cfg kibanaConfig, headers map[string]string) (*kibana.Client, error) {
var tls *tlscommon.Config
if cfg.Fleet.CA != "" {
tls = &tlscommon.Config{
CAs: []string{cfg.Fleet.CA},
}
}

return kibana.NewClientWithConfig(&kibana.ClientConfig{
Host: cfg.Fleet.Host,
Username: cfg.Fleet.Username,
Password: cfg.Fleet.Password,
IgnoreVersion: true,
TLS: tls,
Headers: headers,
})
}

Expand Down Expand Up @@ -518,6 +525,27 @@ func envBool(keys ...string) bool {
return false
}

func envMap(key string) map[string]string {
m := make(map[string]string)
prefix := key + "="
for _, env := range os.Environ() {
if !strings.HasPrefix(env, prefix) {
continue
}

envVal := strings.TrimPrefix(env, prefix)

keyValue := strings.SplitN(envVal, "=", 2)
if len(keyValue) != 2 {
continue
}

m[keyValue[0]] = keyValue[1]
}

return m
}

func isTrue(val string) bool {
trueVals := []string{"1", "true", "yes", "y"}
val = strings.ToLower(val)
Expand Down Expand Up @@ -815,114 +843,6 @@ type kibanaAPIKeyDetail struct {
Item kibanaAPIKey `json:"item"`
}

// setup configuration

type setupConfig struct {
Fleet fleetConfig `config:"fleet"`
FleetServer fleetServerConfig `config:"fleet_server"`
Kibana kibanaConfig `config:"kibana"`
}

type elasticsearchConfig struct {
CA string `config:"ca"`
Host string `config:"host"`
Username string `config:"username"`
Password string `config:"password"`
ServiceToken string `config:"service_token"`
}

type fleetConfig struct {
CA string `config:"ca"`
Enroll bool `config:"enroll"`
EnrollmentToken string `config:"enrollment_token"`
Force bool `config:"force"`
Insecure bool `config:"insecure"`
TokenName string `config:"token_name"`
TokenPolicyName string `config:"token_policy_name"`
URL string `config:"url"`
}

type fleetServerConfig struct {
Cert string `config:"cert"`
CertKey string `config:"cert_key"`
Elasticsearch elasticsearchConfig `config:"elasticsearch"`
Enable bool `config:"enable"`
Host string `config:"host"`
InsecureHTTP bool `config:"insecure_http"`
PolicyID string `config:"policy_id"`
Port string `config:"port"`
}

type kibanaConfig struct {
Fleet kibanaFleetConfig `config:"fleet"`
RetrySleepDuration time.Duration `config:"retry_sleep_duration"`
RetryMaxCount int `config:"retry_max_count"`
}

type kibanaFleetConfig struct {
CA string `config:"ca"`
Host string `config:"host"`
Password string `config:"password"`
Setup bool `config:"setup"`
Username string `config:"username"`
}

func defaultAccessConfig() (setupConfig, error) {
retrySleepDuration, err := envDurationWithDefault(defaultRequestRetrySleep, requestRetrySleepEnv)
if err != nil {
return setupConfig{}, err
}

retryMaxCount, err := envIntWithDefault(defaultMaxRequestRetries, maxRequestRetriesEnv)
if err != nil {
return setupConfig{}, err
}

cfg := setupConfig{
Fleet: fleetConfig{
CA: envWithDefault("", "FLEET_CA", "KIBANA_CA", "ELASTICSEARCH_CA"),
Enroll: envBool("FLEET_ENROLL", "FLEET_SERVER_ENABLE"),
EnrollmentToken: envWithDefault("", "FLEET_ENROLLMENT_TOKEN"),
Force: envBool("FLEET_FORCE"),
Insecure: envBool("FLEET_INSECURE"),
TokenName: envWithDefault("Default", "FLEET_TOKEN_NAME"),
TokenPolicyName: envWithDefault("", "FLEET_TOKEN_POLICY_NAME"),
URL: envWithDefault("", "FLEET_URL"),
},
FleetServer: fleetServerConfig{
Cert: envWithDefault("", "FLEET_SERVER_CERT"),
CertKey: envWithDefault("", "FLEET_SERVER_CERT_KEY"),
Elasticsearch: elasticsearchConfig{
Host: envWithDefault("http://elasticsearch:9200", "FLEET_SERVER_ELASTICSEARCH_HOST", "ELASTICSEARCH_HOST"),
Username: envWithDefault("elastic", "FLEET_SERVER_ELASTICSEARCH_USERNAME", "ELASTICSEARCH_USERNAME"),
Password: envWithDefault("changeme", "FLEET_SERVER_ELASTICSEARCH_PASSWORD", "ELASTICSEARCH_PASSWORD"),
ServiceToken: envWithDefault("", "FLEET_SERVER_SERVICE_TOKEN"),
CA: envWithDefault("", "FLEET_SERVER_ELASTICSEARCH_CA", "ELASTICSEARCH_CA"),
},
Enable: envBool("FLEET_SERVER_ENABLE"),
Host: envWithDefault("", "FLEET_SERVER_HOST"),
InsecureHTTP: envBool("FLEET_SERVER_INSECURE_HTTP"),
PolicyID: envWithDefault("", "FLEET_SERVER_POLICY_ID", "FLEET_SERVER_POLICY"),
Port: envWithDefault("", "FLEET_SERVER_PORT"),
},
Kibana: kibanaConfig{
Fleet: kibanaFleetConfig{
// Remove FLEET_SETUP in 8.x
// The FLEET_SETUP environment variable boolean is a fallback to the old name. The name was updated to
// reflect that its setting up Fleet in Kibana versus setting up Fleet Server.
Setup: envBool("KIBANA_FLEET_SETUP", "FLEET_SETUP"),
Host: envWithDefault("http://kibana:5601", "KIBANA_FLEET_HOST", "KIBANA_HOST"),
Username: envWithDefault("elastic", "KIBANA_FLEET_USERNAME", "KIBANA_USERNAME", "ELASTICSEARCH_USERNAME"),
Password: envWithDefault("changeme", "KIBANA_FLEET_PASSWORD", "KIBANA_PASSWORD", "ELASTICSEARCH_PASSWORD"),
CA: envWithDefault("", "KIBANA_FLEET_CA", "KIBANA_CA", "ELASTICSEARCH_CA"),
},
RetrySleepDuration: retrySleepDuration,
RetryMaxCount: retryMaxCount,
},
}
return cfg, nil
}

func envDurationWithDefault(defVal string, keys ...string) (time.Duration, error) {
valStr := defVal
for _, key := range keys {
Expand Down
24 changes: 24 additions & 0 deletions x-pack/elastic-agent/pkg/agent/cmd/enroll.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os"
"os/signal"
"strconv"
"strings"
"syscall"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths"
Expand Down Expand Up @@ -59,6 +60,7 @@ func addEnrollFlags(cmd *cobra.Command) {
cmd.Flags().Uint16P("fleet-server-port", "", 0, "Fleet Server HTTP binding port (overrides the policy)")
cmd.Flags().StringP("fleet-server-cert", "", "", "Certificate to use for exposed Fleet Server HTTPS endpoint")
cmd.Flags().StringP("fleet-server-cert-key", "", "", "Private key to use for exposed Fleet Server HTTPS endpoint")
cmd.Flags().StringSliceP("header", "", []string{}, "Headers used in communication with elasticsearch")
cmd.Flags().BoolP("fleet-server-insecure-http", "", false, "Expose Fleet Server over HTTP (not recommended; insecure)")
cmd.Flags().StringP("certificate-authorities", "a", "", "Comma separated list of root certificate for server verifications")
cmd.Flags().StringP("ca-sha256", "p", "", "Comma separated list of certificate authorities hash pins used for certificate verifications")
Expand All @@ -81,6 +83,7 @@ func buildEnrollmentFlags(cmd *cobra.Command, url string, token string) []string
fPort, _ := cmd.Flags().GetUint16("fleet-server-port")
fCert, _ := cmd.Flags().GetString("fleet-server-cert")
fCertKey, _ := cmd.Flags().GetString("fleet-server-cert-key")
fHeaders, _ := cmd.Flags().GetStringSlice("header")
fInsecure, _ := cmd.Flags().GetBool("fleet-server-insecure-http")
ca, _ := cmd.Flags().GetString("certificate-authorities")
sha256, _ := cmd.Flags().GetString("ca-sha256")
Expand Down Expand Up @@ -128,6 +131,12 @@ func buildEnrollmentFlags(cmd *cobra.Command, url string, token string) []string
args = append(args, "--fleet-server-cert-key")
args = append(args, fCertKey)
}

for k, v := range mapFromEnvList(fHeaders) {
args = append(args, "--header")
args = append(args, k+"="+v)
}

if fInsecure {
args = append(args, "--fleet-server-insecure-http")
}
Expand Down Expand Up @@ -211,6 +220,7 @@ func enroll(streams *cli.IOStreams, cmd *cobra.Command, args []string) error {
enrollmentToken, _ := cmd.Flags().GetString("enrollment-token")
fServer, _ := cmd.Flags().GetString("fleet-server-es")
fElasticSearchCA, _ := cmd.Flags().GetString("fleet-server-es-ca")
fHeaders, _ := cmd.Flags().GetStringSlice("header")
fServiceToken, _ := cmd.Flags().GetString("fleet-server-service-token")
fPolicy, _ := cmd.Flags().GetString("fleet-server-policy")
fHost, _ := cmd.Flags().GetString("fleet-server-host")
Expand Down Expand Up @@ -246,6 +256,7 @@ func enroll(streams *cli.IOStreams, cmd *cobra.Command, args []string) error {
CertKey: fCertKey,
Insecure: fInsecure,
SpawnAgent: !fromInstall,
Headers: mapFromEnvList(fHeaders),
},
}

Expand Down Expand Up @@ -285,3 +296,16 @@ func handleSignal(ctx context.Context) context.Context {

return ctx
}

func mapFromEnvList(envList []string) map[string]string {
m := make(map[string]string)
for _, kv := range envList {
keyValue := strings.SplitN(kv, "=", 2)
if len(keyValue) != 2 {
continue
}

m[keyValue[0]] = keyValue[1]
}
return m
}
31 changes: 26 additions & 5 deletions x-pack/elastic-agent/pkg/agent/cmd/enroll_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ type enrollCmdFleetServerOption struct {
CertKey string
Insecure bool
SpawnAgent bool
Headers map[string]string
}

// enrollCmdOption define all the supported enrollment option.
Expand Down Expand Up @@ -232,7 +233,8 @@ func (c *enrollCmd) fleetServerBootstrap(ctx context.Context) (string, error) {
c.options.FleetServer.ConnStr, c.options.FleetServer.ServiceToken,
c.options.FleetServer.PolicyID,
c.options.FleetServer.Host, c.options.FleetServer.Port,
c.options.FleetServer.Cert, c.options.FleetServer.CertKey, c.options.FleetServer.ElasticsearchCA)
c.options.FleetServer.Cert, c.options.FleetServer.CertKey, c.options.FleetServer.ElasticsearchCA,
c.options.FleetServer.Headers)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -412,7 +414,7 @@ func (c *enrollCmd) enroll(ctx context.Context, persistentConfig map[string]inte
return err
}

agentConfig, err := c.createAgentConfig(resp.Item.ID, persistentConfig)
agentConfig, err := c.createAgentConfig(resp.Item.ID, persistentConfig, c.options.FleetServer.Headers)
if err != nil {
return err
}
Expand All @@ -422,7 +424,8 @@ func (c *enrollCmd) enroll(ctx context.Context, persistentConfig map[string]inte
c.options.FleetServer.ConnStr, c.options.FleetServer.ServiceToken,
c.options.FleetServer.PolicyID,
c.options.FleetServer.Host, c.options.FleetServer.Port,
c.options.FleetServer.Cert, c.options.FleetServer.CertKey, c.options.FleetServer.ElasticsearchCA)
c.options.FleetServer.Cert, c.options.FleetServer.CertKey, c.options.FleetServer.ElasticsearchCA,
c.options.FleetServer.Headers)
if err != nil {
return err
}
Expand Down Expand Up @@ -717,7 +720,12 @@ func storeAgentInfo(s saver, reader io.Reader) error {
return nil
}

func createFleetServerBootstrapConfig(connStr string, serviceToken string, policyID string, host string, port uint16, cert string, key string, esCA string) (*configuration.FleetAgentConfig, error) {
func createFleetServerBootstrapConfig(
connStr, serviceToken, policyID, host string,
port uint16,
cert, key, esCA string,
headers map[string]string,
) (*configuration.FleetAgentConfig, error) {
es, err := configuration.ElasticsearchFromConnStr(connStr, serviceToken)
if err != nil {
return nil, err
Expand All @@ -733,6 +741,15 @@ func createFleetServerBootstrapConfig(connStr string, serviceToken string, polic
if port == 0 {
port = defaultFleetServerPort
}
if len(headers) > 0 {
if es.Headers == nil {
es.Headers = make(map[string]string)
}
// overwrites previously set headers
for k, v := range headers {
es.Headers[k] = v
}
}
cfg := configuration.DefaultFleetAgentConfig()
cfg.Enabled = true
cfg.Server = &configuration.FleetServerConfig{
Expand Down Expand Up @@ -773,11 +790,15 @@ func createFleetConfigFromEnroll(accessAPIKey string, cli remote.Config) (*confi
return cfg, nil
}

func (c *enrollCmd) createAgentConfig(agentID string, pc map[string]interface{}) (map[string]interface{}, error) {
func (c *enrollCmd) createAgentConfig(agentID string, pc map[string]interface{}, headers map[string]string) (map[string]interface{}, error) {
agentConfig := map[string]interface{}{
"id": agentID,
}

if len(headers) > 0 {
agentConfig["headers"] = headers
}

if c.options.Staging != "" {
staging := fmt.Sprintf("https://staging.elastic.co/%s-%s/downloads/", release.Version(), c.options.Staging[:8])
agentConfig["download"] = map[string]interface{}{
Expand Down
Loading

0 comments on commit c9f0fc8

Please sign in to comment.