Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #26275 to 7.13: Enable agent to send custom headers to kibana/ES #26362

Merged
merged 2 commits into from
Jun 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,4 @@
- Add --fleet-server-service-token and FLEET_SERVER_SERVICE_TOKEN options {pull}25083[25083]
- Keep http and logging config during enroll {pull}25132[25132]
- Log output of container to $LOGS_PATH/elastic-agent-start.log when LOGS_PATH set {pull}25150[25150]
- Enable agent to send custom headers to kibana/ES {pull}26275[26275]
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const maxRetriesloadAgentInfo = 5

type persistentAgentInfo struct {
ID string `json:"id" yaml:"id" config:"id"`
Headers map[string]string `json:"headers" yaml:"headers" config:"headers"`
LogLevel string `json:"logging.level,omitempty" yaml:"logging.level,omitempty" config:"logging.level,omitempty"`
MonitoringHTTP *monitoringConfig.MonitoringHTTPConfig `json:"monitoring.http,omitempty" yaml:"monitoring.http,omitempty" config:"monitoring.http,omitempty"`
}
Expand Down
7 changes: 7 additions & 0 deletions x-pack/elastic-agent/pkg/agent/application/info/agent_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
type AgentInfo struct {
agentID string
logLevel string
headers map[string]string
}

// NewAgentInfoWithLog creates a new agent information.
Expand All @@ -30,6 +31,7 @@ func NewAgentInfoWithLog(level string, createAgentID bool) (*AgentInfo, error) {
return &AgentInfo{
agentID: agentInfo.ID,
logLevel: agentInfo.LogLevel,
headers: agentInfo.Headers,
}, nil
}

Expand Down Expand Up @@ -84,3 +86,8 @@ func (*AgentInfo) Version() string {
func (*AgentInfo) Snapshot() bool {
return release.Snapshot()
}

// Headers returns custom headers used to communicate with elasticsearch.
func (i *AgentInfo) Headers() map[string]string {
return i.headers
}
142 changes: 31 additions & 111 deletions x-pack/elastic-agent/pkg/agent/cmd/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func runContainerCmd(streams *cli.IOStreams, cmd *cobra.Command, cfg setupConfig
}

if cfg.Kibana.Fleet.Setup {
client, err = kibanaClient(cfg.Kibana)
client, err = kibanaClient(cfg.Kibana, cfg.Kibana.Headers)
if err != nil {
return err
}
Expand All @@ -286,7 +286,7 @@ func runContainerCmd(streams *cli.IOStreams, cmd *cobra.Command, cfg setupConfig
token := cfg.Fleet.EnrollmentToken
if token == "" && !cfg.FleetServer.Enable {
if client == nil {
client, err = kibanaClient(cfg.Kibana)
client, err = kibanaClient(cfg.Kibana, cfg.Kibana.Headers)
if err != nil {
return err
}
Expand Down Expand Up @@ -363,6 +363,11 @@ func buildEnrollArgs(cfg setupConfig, token string, policyID string) ([]string,
if cfg.FleetServer.CertKey != "" {
args = append(args, "--fleet-server-cert-key", cfg.FleetServer.CertKey)
}

for k, v := range cfg.FleetServer.Headers {
args = append(args, "--header", k+"="+v)
}

if cfg.Fleet.URL != "" {
args = append(args, "--url", cfg.Fleet.URL)
}
Expand Down Expand Up @@ -444,19 +449,21 @@ func kibanaFetchToken(cfg setupConfig, client *kibana.Client, policy *kibanaPoli
return keyDetail.Item.APIKey, nil
}

func kibanaClient(cfg kibanaConfig) (*kibana.Client, error) {
func kibanaClient(cfg kibanaConfig, headers map[string]string) (*kibana.Client, error) {
var tls *tlscommon.Config
if cfg.Fleet.CA != "" {
tls = &tlscommon.Config{
CAs: []string{cfg.Fleet.CA},
}
}

return kibana.NewClientWithConfig(&kibana.ClientConfig{
Host: cfg.Fleet.Host,
Username: cfg.Fleet.Username,
Password: cfg.Fleet.Password,
IgnoreVersion: true,
TLS: tls,
Headers: headers,
})
}

Expand Down Expand Up @@ -518,6 +525,27 @@ func envBool(keys ...string) bool {
return false
}

func envMap(key string) map[string]string {
m := make(map[string]string)
prefix := key + "="
for _, env := range os.Environ() {
if !strings.HasPrefix(env, prefix) {
continue
}

envVal := strings.TrimPrefix(env, prefix)

keyValue := strings.SplitN(envVal, "=", 2)
if len(keyValue) != 2 {
continue
}

m[keyValue[0]] = keyValue[1]
}

return m
}

func isTrue(val string) bool {
trueVals := []string{"1", "true", "yes", "y"}
val = strings.ToLower(val)
Expand Down Expand Up @@ -815,114 +843,6 @@ type kibanaAPIKeyDetail struct {
Item kibanaAPIKey `json:"item"`
}

// setup configuration

type setupConfig struct {
Fleet fleetConfig `config:"fleet"`
FleetServer fleetServerConfig `config:"fleet_server"`
Kibana kibanaConfig `config:"kibana"`
}

type elasticsearchConfig struct {
CA string `config:"ca"`
Host string `config:"host"`
Username string `config:"username"`
Password string `config:"password"`
ServiceToken string `config:"service_token"`
}

type fleetConfig struct {
CA string `config:"ca"`
Enroll bool `config:"enroll"`
EnrollmentToken string `config:"enrollment_token"`
Force bool `config:"force"`
Insecure bool `config:"insecure"`
TokenName string `config:"token_name"`
TokenPolicyName string `config:"token_policy_name"`
URL string `config:"url"`
}

type fleetServerConfig struct {
Cert string `config:"cert"`
CertKey string `config:"cert_key"`
Elasticsearch elasticsearchConfig `config:"elasticsearch"`
Enable bool `config:"enable"`
Host string `config:"host"`
InsecureHTTP bool `config:"insecure_http"`
PolicyID string `config:"policy_id"`
Port string `config:"port"`
}

type kibanaConfig struct {
Fleet kibanaFleetConfig `config:"fleet"`
RetrySleepDuration time.Duration `config:"retry_sleep_duration"`
RetryMaxCount int `config:"retry_max_count"`
}

type kibanaFleetConfig struct {
CA string `config:"ca"`
Host string `config:"host"`
Password string `config:"password"`
Setup bool `config:"setup"`
Username string `config:"username"`
}

func defaultAccessConfig() (setupConfig, error) {
retrySleepDuration, err := envDurationWithDefault(defaultRequestRetrySleep, requestRetrySleepEnv)
if err != nil {
return setupConfig{}, err
}

retryMaxCount, err := envIntWithDefault(defaultMaxRequestRetries, maxRequestRetriesEnv)
if err != nil {
return setupConfig{}, err
}

cfg := setupConfig{
Fleet: fleetConfig{
CA: envWithDefault("", "FLEET_CA", "KIBANA_CA", "ELASTICSEARCH_CA"),
Enroll: envBool("FLEET_ENROLL", "FLEET_SERVER_ENABLE"),
EnrollmentToken: envWithDefault("", "FLEET_ENROLLMENT_TOKEN"),
Force: envBool("FLEET_FORCE"),
Insecure: envBool("FLEET_INSECURE"),
TokenName: envWithDefault("Default", "FLEET_TOKEN_NAME"),
TokenPolicyName: envWithDefault("", "FLEET_TOKEN_POLICY_NAME"),
URL: envWithDefault("", "FLEET_URL"),
},
FleetServer: fleetServerConfig{
Cert: envWithDefault("", "FLEET_SERVER_CERT"),
CertKey: envWithDefault("", "FLEET_SERVER_CERT_KEY"),
Elasticsearch: elasticsearchConfig{
Host: envWithDefault("http://elasticsearch:9200", "FLEET_SERVER_ELASTICSEARCH_HOST", "ELASTICSEARCH_HOST"),
Username: envWithDefault("elastic", "FLEET_SERVER_ELASTICSEARCH_USERNAME", "ELASTICSEARCH_USERNAME"),
Password: envWithDefault("changeme", "FLEET_SERVER_ELASTICSEARCH_PASSWORD", "ELASTICSEARCH_PASSWORD"),
ServiceToken: envWithDefault("", "FLEET_SERVER_SERVICE_TOKEN"),
CA: envWithDefault("", "FLEET_SERVER_ELASTICSEARCH_CA", "ELASTICSEARCH_CA"),
},
Enable: envBool("FLEET_SERVER_ENABLE"),
Host: envWithDefault("", "FLEET_SERVER_HOST"),
InsecureHTTP: envBool("FLEET_SERVER_INSECURE_HTTP"),
PolicyID: envWithDefault("", "FLEET_SERVER_POLICY_ID", "FLEET_SERVER_POLICY"),
Port: envWithDefault("", "FLEET_SERVER_PORT"),
},
Kibana: kibanaConfig{
Fleet: kibanaFleetConfig{
// Remove FLEET_SETUP in 8.x
// The FLEET_SETUP environment variable boolean is a fallback to the old name. The name was updated to
// reflect that its setting up Fleet in Kibana versus setting up Fleet Server.
Setup: envBool("KIBANA_FLEET_SETUP", "FLEET_SETUP"),
Host: envWithDefault("http://kibana:5601", "KIBANA_FLEET_HOST", "KIBANA_HOST"),
Username: envWithDefault("elastic", "KIBANA_FLEET_USERNAME", "KIBANA_USERNAME", "ELASTICSEARCH_USERNAME"),
Password: envWithDefault("changeme", "KIBANA_FLEET_PASSWORD", "KIBANA_PASSWORD", "ELASTICSEARCH_PASSWORD"),
CA: envWithDefault("", "KIBANA_FLEET_CA", "KIBANA_CA", "ELASTICSEARCH_CA"),
},
RetrySleepDuration: retrySleepDuration,
RetryMaxCount: retryMaxCount,
},
}
return cfg, nil
}

func envDurationWithDefault(defVal string, keys ...string) (time.Duration, error) {
valStr := defVal
for _, key := range keys {
Expand Down
24 changes: 24 additions & 0 deletions x-pack/elastic-agent/pkg/agent/cmd/enroll.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os"
"os/signal"
"strconv"
"strings"
"syscall"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths"
Expand Down Expand Up @@ -59,6 +60,7 @@ func addEnrollFlags(cmd *cobra.Command) {
cmd.Flags().Uint16P("fleet-server-port", "", 0, "Fleet Server HTTP binding port (overrides the policy)")
cmd.Flags().StringP("fleet-server-cert", "", "", "Certificate to use for exposed Fleet Server HTTPS endpoint")
cmd.Flags().StringP("fleet-server-cert-key", "", "", "Private key to use for exposed Fleet Server HTTPS endpoint")
cmd.Flags().StringSliceP("header", "", []string{}, "Headers used in communication with elasticsearch")
cmd.Flags().BoolP("fleet-server-insecure-http", "", false, "Expose Fleet Server over HTTP (not recommended; insecure)")
cmd.Flags().StringP("certificate-authorities", "a", "", "Comma separated list of root certificate for server verifications")
cmd.Flags().StringP("ca-sha256", "p", "", "Comma separated list of certificate authorities hash pins used for certificate verifications")
Expand All @@ -81,6 +83,7 @@ func buildEnrollmentFlags(cmd *cobra.Command, url string, token string) []string
fPort, _ := cmd.Flags().GetUint16("fleet-server-port")
fCert, _ := cmd.Flags().GetString("fleet-server-cert")
fCertKey, _ := cmd.Flags().GetString("fleet-server-cert-key")
fHeaders, _ := cmd.Flags().GetStringSlice("header")
fInsecure, _ := cmd.Flags().GetBool("fleet-server-insecure-http")
ca, _ := cmd.Flags().GetString("certificate-authorities")
sha256, _ := cmd.Flags().GetString("ca-sha256")
Expand Down Expand Up @@ -128,6 +131,12 @@ func buildEnrollmentFlags(cmd *cobra.Command, url string, token string) []string
args = append(args, "--fleet-server-cert-key")
args = append(args, fCertKey)
}

for k, v := range mapFromEnvList(fHeaders) {
args = append(args, "--header")
args = append(args, k+"="+v)
}

if fInsecure {
args = append(args, "--fleet-server-insecure-http")
}
Expand Down Expand Up @@ -211,6 +220,7 @@ func enroll(streams *cli.IOStreams, cmd *cobra.Command, args []string) error {
enrollmentToken, _ := cmd.Flags().GetString("enrollment-token")
fServer, _ := cmd.Flags().GetString("fleet-server-es")
fElasticSearchCA, _ := cmd.Flags().GetString("fleet-server-es-ca")
fHeaders, _ := cmd.Flags().GetStringSlice("header")
fServiceToken, _ := cmd.Flags().GetString("fleet-server-service-token")
fPolicy, _ := cmd.Flags().GetString("fleet-server-policy")
fHost, _ := cmd.Flags().GetString("fleet-server-host")
Expand Down Expand Up @@ -246,6 +256,7 @@ func enroll(streams *cli.IOStreams, cmd *cobra.Command, args []string) error {
CertKey: fCertKey,
Insecure: fInsecure,
SpawnAgent: !fromInstall,
Headers: mapFromEnvList(fHeaders),
},
}

Expand Down Expand Up @@ -285,3 +296,16 @@ func handleSignal(ctx context.Context) context.Context {

return ctx
}

func mapFromEnvList(envList []string) map[string]string {
m := make(map[string]string)
for _, kv := range envList {
keyValue := strings.SplitN(kv, "=", 2)
if len(keyValue) != 2 {
continue
}

m[keyValue[0]] = keyValue[1]
}
return m
}
31 changes: 26 additions & 5 deletions x-pack/elastic-agent/pkg/agent/cmd/enroll_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ type enrollCmdFleetServerOption struct {
CertKey string
Insecure bool
SpawnAgent bool
Headers map[string]string
}

// enrollCmdOption define all the supported enrollment option.
Expand Down Expand Up @@ -232,7 +233,8 @@ func (c *enrollCmd) fleetServerBootstrap(ctx context.Context) (string, error) {
c.options.FleetServer.ConnStr, c.options.FleetServer.ServiceToken,
c.options.FleetServer.PolicyID,
c.options.FleetServer.Host, c.options.FleetServer.Port,
c.options.FleetServer.Cert, c.options.FleetServer.CertKey, c.options.FleetServer.ElasticsearchCA)
c.options.FleetServer.Cert, c.options.FleetServer.CertKey, c.options.FleetServer.ElasticsearchCA,
c.options.FleetServer.Headers)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -412,7 +414,7 @@ func (c *enrollCmd) enroll(ctx context.Context, persistentConfig map[string]inte
return err
}

agentConfig, err := c.createAgentConfig(resp.Item.ID, persistentConfig)
agentConfig, err := c.createAgentConfig(resp.Item.ID, persistentConfig, c.options.FleetServer.Headers)
if err != nil {
return err
}
Expand All @@ -422,7 +424,8 @@ func (c *enrollCmd) enroll(ctx context.Context, persistentConfig map[string]inte
c.options.FleetServer.ConnStr, c.options.FleetServer.ServiceToken,
c.options.FleetServer.PolicyID,
c.options.FleetServer.Host, c.options.FleetServer.Port,
c.options.FleetServer.Cert, c.options.FleetServer.CertKey, c.options.FleetServer.ElasticsearchCA)
c.options.FleetServer.Cert, c.options.FleetServer.CertKey, c.options.FleetServer.ElasticsearchCA,
c.options.FleetServer.Headers)
if err != nil {
return err
}
Expand Down Expand Up @@ -717,7 +720,12 @@ func storeAgentInfo(s saver, reader io.Reader) error {
return nil
}

func createFleetServerBootstrapConfig(connStr string, serviceToken string, policyID string, host string, port uint16, cert string, key string, esCA string) (*configuration.FleetAgentConfig, error) {
func createFleetServerBootstrapConfig(
connStr, serviceToken, policyID, host string,
port uint16,
cert, key, esCA string,
headers map[string]string,
) (*configuration.FleetAgentConfig, error) {
es, err := configuration.ElasticsearchFromConnStr(connStr, serviceToken)
if err != nil {
return nil, err
Expand All @@ -733,6 +741,15 @@ func createFleetServerBootstrapConfig(connStr string, serviceToken string, polic
if port == 0 {
port = defaultFleetServerPort
}
if len(headers) > 0 {
if es.Headers == nil {
es.Headers = make(map[string]string)
}
// overwrites previously set headers
for k, v := range headers {
es.Headers[k] = v
}
}
cfg := configuration.DefaultFleetAgentConfig()
cfg.Enabled = true
cfg.Server = &configuration.FleetServerConfig{
Expand Down Expand Up @@ -773,11 +790,15 @@ func createFleetConfigFromEnroll(accessAPIKey string, cli remote.Config) (*confi
return cfg, nil
}

func (c *enrollCmd) createAgentConfig(agentID string, pc map[string]interface{}) (map[string]interface{}, error) {
func (c *enrollCmd) createAgentConfig(agentID string, pc map[string]interface{}, headers map[string]string) (map[string]interface{}, error) {
agentConfig := map[string]interface{}{
"id": agentID,
}

if len(headers) > 0 {
agentConfig["headers"] = headers
}

if c.options.Staging != "" {
staging := fmt.Sprintf("https://staging.elastic.co/%s-%s/downloads/", release.Version(), c.options.Staging[:8])
agentConfig["download"] = map[string]interface{}{
Expand Down
Loading