From ed693e9363231ce318da5e23bffe68256065f298 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Wed, 26 Feb 2020 10:55:27 -0500 Subject: [PATCH] Add cloudfoundry input for filebeat (#16586) * Add cloudfoundry input. * Fix issue with http transport. * Add input documentation. --- filebeat/docs/filebeat-options.asciidoc | 3 + .../docs/inputs/input-cloudfoundry.asciidoc | 93 ++++++++++++++ x-pack/filebeat/include/list.go | 1 + x-pack/filebeat/input/cloudfoundry/input.go | 120 ++++++++++++++++++ x-pack/libbeat/common/cloudfoundry/hub.go | 13 +- 5 files changed, 229 insertions(+), 1 deletion(-) create mode 100644 x-pack/filebeat/docs/inputs/input-cloudfoundry.asciidoc create mode 100644 x-pack/filebeat/input/cloudfoundry/input.go diff --git a/filebeat/docs/filebeat-options.asciidoc b/filebeat/docs/filebeat-options.asciidoc index ca54a8a4ff1..ac011960a6a 100644 --- a/filebeat/docs/filebeat-options.asciidoc +++ b/filebeat/docs/filebeat-options.asciidoc @@ -56,6 +56,7 @@ You can configure {beatname_uc} to use the following inputs: * <<{beatname_lc}-input-netflow>> * <<{beatname_lc}-input-google-pubsub>> * <<{beatname_lc}-input-azure-eventhub>> +* <<{beatname_lc}-input-cloudfoundry>> include::inputs/input-log.asciidoc[] @@ -85,3 +86,5 @@ include::../../x-pack/filebeat/docs/inputs/input-netflow.asciidoc[] include::../../x-pack/filebeat/docs/inputs/input-google-pubsub.asciidoc[] include::../../x-pack/filebeat/docs/inputs/input-azure-eventhub.asciidoc[] + +include::../../x-pack/filebeat/docs/inputs/input-cloudfoundry.asciidoc[] diff --git a/x-pack/filebeat/docs/inputs/input-cloudfoundry.asciidoc b/x-pack/filebeat/docs/inputs/input-cloudfoundry.asciidoc new file mode 100644 index 00000000000..6261f5b2d6a --- /dev/null +++ b/x-pack/filebeat/docs/inputs/input-cloudfoundry.asciidoc @@ -0,0 +1,93 @@ +[role="xpack"] + +:type: cloudfoundry + +[id="{beatname_lc}-input-{type}"] +=== Cloud Foundry input + +++++ +Cloud Foundry +++++ + +experimental[] + +Use the `cloudfoundry` input to get http access logs, container logs and error logs from Cloud Foundry. Connects to +the Cloud Foundry loggregator to receive events. + +Example configurations: + +["source","yaml",subs="attributes"] +---- +{beatname_lc}.inputs: +- type: cloudfoundry + api_address: https://api.dev.cfdev.sh + client_id: uaa-filebeat + client_secret: verysecret + ssl: + verification_mode: none +---- + +["source","yaml",subs="attributes"] +---- +{beatname_lc}.inputs: +- type: cloudfoundry + api_address: https://api.dev.cfdev.sh + client_id: uaa-filebeat + client_secret: verysecret + ssl.certificate_authorities: ["/etc/pki/cf/ca.pem"] + ssl.certificate: "/etc/pki/cf/cert.pem" + ssl.key: "/etc/pki/cf/cert.key" + +---- + + +==== Configuration options + +The `cloudfoundry` input supports the following configuration options plus the +<<{beatname_lc}-input-{type}-common-options>> described later. + +[float] +==== `api_address` + +The URL of the Cloud Foundry API. Optional. Default: "http://api.bosh-lite.com". + +[float] +==== `doppler_address` + +The URL of the Cloud Foundry Doppler Websocket. Optional. Default: "(value from ${api_address}/v2/info)". + +[float] +==== `uaa_address` + +The URL of the Cloud Foundry UAA API. Optional. Default: "(value from ${api_address}/v2/info)". + +[float] +==== `rlp_address` + +The URL of the Cloud Foundry RLP Gateway. Optional. Default: "(value from ${api_address}/v2/info)". + +[float] +==== `client_id` + +Client ID to authenticate with Cloud Foundry. Default: "". + +[float] +==== `client_secret` + +Client Secret to authenticate with Cloud Foundry. Default: "". + +[float] +==== `shard_id` + +Shard ID for connection to the RLP Gateway. Use the same ID across multiple {beatname_lc} to shard the load of events +from the RLP Gateway. Default: "(generated UUID)". + +[float] +==== `ssl` + +This specifies SSL/TLS common config. Default: not used. + +[id="{beatname_lc}-input-{type}-common-options"] +include::../../../../filebeat/docs/inputs/input-common-options.asciidoc[] + +:type!: diff --git a/x-pack/filebeat/include/list.go b/x-pack/filebeat/include/list.go index e92b3ceb7d5..457d2fce773 100644 --- a/x-pack/filebeat/include/list.go +++ b/x-pack/filebeat/include/list.go @@ -9,6 +9,7 @@ package include import ( // Import packages that need to register themselves. _ "github.com/elastic/beats/x-pack/filebeat/input/azureeventhub" + _ "github.com/elastic/beats/x-pack/filebeat/input/cloudfoundry" _ "github.com/elastic/beats/x-pack/filebeat/input/googlepubsub" _ "github.com/elastic/beats/x-pack/filebeat/input/httpjson" _ "github.com/elastic/beats/x-pack/filebeat/input/netflow" diff --git a/x-pack/filebeat/input/cloudfoundry/input.go b/x-pack/filebeat/input/cloudfoundry/input.go new file mode 100644 index 00000000000..fb047a92503 --- /dev/null +++ b/x-pack/filebeat/input/cloudfoundry/input.go @@ -0,0 +1,120 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package cloudfoundry + +import ( + "context" + "sync" + + "github.com/elastic/beats/x-pack/libbeat/common/cloudfoundry" + + "github.com/elastic/beats/filebeat/channel" + "github.com/elastic/beats/filebeat/harvester" + "github.com/elastic/beats/filebeat/input" + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" +) + +func init() { + err := input.Register("cloudfoundry", NewInput) + if err != nil { + panic(err) + } +} + +// Input defines a udp input to receive event on a specific host:port. +type Input struct { + sync.Mutex + listener *cloudfoundry.RlpListener + started bool + log *logp.Logger + outlet channel.Outleter +} + +// NewInput creates a new udp input +func NewInput( + cfg *common.Config, + outlet channel.Connector, + context input.Context, +) (input.Input, error) { + log := logp.NewLogger("cloudfoundry") + + out, err := outlet.ConnectWith(cfg, beat.ClientConfig{ + Processing: beat.ProcessingConfig{ + DynamicFields: context.DynamicFields, + }, + }) + if err != nil { + return nil, err + } + + var conf cloudfoundry.Config + if err = cfg.Unpack(&conf); err != nil { + return nil, err + } + + hub := cloudfoundry.NewHub(&conf, "filebeat", log) + forwarder := harvester.NewForwarder(out) + callbacks := cloudfoundry.RlpListenerCallbacks{ + HttpAccess: func(evt *cloudfoundry.EventHttpAccess) { + forwarder.Send(beat.Event{ + Timestamp: evt.Timestamp(), + Fields: evt.ToFields(), + }) + }, + Log: func(evt *cloudfoundry.EventLog) { + forwarder.Send(beat.Event{ + Timestamp: evt.Timestamp(), + Fields: evt.ToFields(), + }) + }, + Error: func(evt *cloudfoundry.EventError) { + forwarder.Send(beat.Event{ + Timestamp: evt.Timestamp(), + Fields: evt.ToFields(), + }) + }, + } + + listener, err := hub.RlpListener(callbacks) + if err != nil { + return nil, err + } + return &Input{ + outlet: out, + listener: listener, + started: false, + log: log, + }, nil +} + +// Run starts and start the UDP server and read events from the socket +func (p *Input) Run() { + p.Lock() + defer p.Unlock() + + if !p.started { + p.log.Info("starting cloudfoundry input") + p.listener.Start(context.TODO()) + p.started = true + } +} + +// Stop stops the UDP input +func (p *Input) Stop() { + defer p.outlet.Close() + p.Lock() + defer p.Unlock() + + p.log.Info("stopping cloudfoundry input") + p.listener.Stop() + p.started = false +} + +// Wait suspends the UDP input +func (p *Input) Wait() { + p.Stop() +} diff --git a/x-pack/libbeat/common/cloudfoundry/hub.go b/x-pack/libbeat/common/cloudfoundry/hub.go index ed7740c464f..467cbe07ce1 100644 --- a/x-pack/libbeat/common/cloudfoundry/hub.go +++ b/x-pack/libbeat/common/cloudfoundry/hub.go @@ -127,7 +127,18 @@ func (h *Hub) httpClient() (*http.Client, bool, error) { return nil, true, err } httpClient := cfclient.DefaultConfig().HttpClient - tp := httpClient.Transport.(*http.Transport) + tp := defaultTransport() tp.TLSClientConfig = tls + httpClient.Transport = tp return httpClient, tls.InsecureSkipVerify, nil } + +// defaultTransport returns a new http.Transport for http.Client +func defaultTransport() *http.Transport { + defaultTransport := http.DefaultTransport.(*http.Transport) + return &http.Transport{ + Proxy: defaultTransport.Proxy, + TLSHandshakeTimeout: defaultTransport.TLSHandshakeTimeout, + ExpectContinueTimeout: defaultTransport.ExpectContinueTimeout, + } +}