Skip to content

Commit

Permalink
Improve http output handling
Browse files Browse the repository at this point in the history
* Add MaxConcurrentRequests configuration per output in order to limit
  the number of requests/connections.
* Refactor HTTP auth headers handling, eliminate mutex on that code
  path.
* Extract common configuration for http, refactor NewClient to avoid
  adding one more parameter.
* Refactor default configuration definition in order to avoid typos with
  repetitive Output name prefix and avoid repetitive use of defaults

Signed-off-by: Aleksandr Maus <aleksandr.maus@elastic.co>
  • Loading branch information
aleksmaus committed Aug 15, 2024
1 parent a36af89 commit 261eb9d
Show file tree
Hide file tree
Showing 28 changed files with 712 additions and 707 deletions.
762 changes: 397 additions & 365 deletions config.go

Large diffs are not rendered by default.

68 changes: 34 additions & 34 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func init() {

if config.Slack.WebhookURL != "" {
var err error
slackClient, err = outputs.NewClient("Slack", config.Slack.WebhookURL, config.Slack.MutualTLS, config.Slack.CheckCert, *initClientArgs)
slackClient, err = outputs.NewClient("Slack", config.Slack.WebhookURL, config.Slack.CommonConfig, *initClientArgs)
if err != nil {
config.Slack.WebhookURL = ""
} else {
Expand All @@ -157,7 +157,7 @@ func init() {

if config.Cliq.WebhookURL != "" {
var err error
cliqClient, err = outputs.NewClient("Cliq", config.Cliq.WebhookURL, config.Cliq.MutualTLS, config.Cliq.CheckCert, *initClientArgs)
cliqClient, err = outputs.NewClient("Cliq", config.Cliq.WebhookURL, config.Cliq.CommonConfig, *initClientArgs)
if err != nil {
config.Cliq.WebhookURL = ""
} else {
Expand All @@ -167,7 +167,7 @@ func init() {

if config.Rocketchat.WebhookURL != "" {
var err error
rocketchatClient, err = outputs.NewClient("Rocketchat", config.Rocketchat.WebhookURL, config.Rocketchat.MutualTLS, config.Rocketchat.CheckCert, *initClientArgs)
rocketchatClient, err = outputs.NewClient("Rocketchat", config.Rocketchat.WebhookURL, config.Rocketchat.CommonConfig, *initClientArgs)
if err != nil {
config.Rocketchat.WebhookURL = ""
} else {
Expand All @@ -177,7 +177,7 @@ func init() {

if config.Mattermost.WebhookURL != "" {
var err error
mattermostClient, err = outputs.NewClient("Mattermost", config.Mattermost.WebhookURL, config.Mattermost.MutualTLS, config.Mattermost.CheckCert, *initClientArgs)
mattermostClient, err = outputs.NewClient("Mattermost", config.Mattermost.WebhookURL, config.Mattermost.CommonConfig, *initClientArgs)
if err != nil {
config.Mattermost.WebhookURL = ""
} else {
Expand All @@ -187,7 +187,7 @@ func init() {

if config.Teams.WebhookURL != "" {
var err error
teamsClient, err = outputs.NewClient("Teams", config.Teams.WebhookURL, config.Teams.MutualTLS, config.Teams.CheckCert, *initClientArgs)
teamsClient, err = outputs.NewClient("Teams", config.Teams.WebhookURL, config.Teams.CommonConfig, *initClientArgs)
if err != nil {
config.Teams.WebhookURL = ""
} else {
Expand All @@ -198,7 +198,7 @@ func init() {
if config.Datadog.APIKey != "" {
var err error
endpointUrl := fmt.Sprintf("%s?api_key=%s", config.Datadog.Host+outputs.DatadogPath, config.Datadog.APIKey)
datadogClient, err = outputs.NewClient("Datadog", endpointUrl, config.Datadog.MutualTLS, config.Datadog.CheckCert, *initClientArgs)
datadogClient, err = outputs.NewClient("Datadog", endpointUrl, config.Datadog.CommonConfig, *initClientArgs)
if err != nil {
config.Datadog.APIKey = ""
} else {
Expand All @@ -208,7 +208,7 @@ func init() {

if config.Discord.WebhookURL != "" {
var err error
discordClient, err = outputs.NewClient("Discord", config.Discord.WebhookURL, config.Discord.MutualTLS, config.Discord.CheckCert, *initClientArgs)
discordClient, err = outputs.NewClient("Discord", config.Discord.WebhookURL, config.Discord.CommonConfig, *initClientArgs)
if err != nil {
config.Discord.WebhookURL = ""
} else {
Expand All @@ -219,7 +219,7 @@ func init() {
if config.Alertmanager.HostPort != "" {
var err error
endpointUrl := fmt.Sprintf("%s%s", config.Alertmanager.HostPort, config.Alertmanager.Endpoint)
alertmanagerClient, err = outputs.NewClient("AlertManager", endpointUrl, config.Alertmanager.MutualTLS, config.Alertmanager.CheckCert, *initClientArgs)
alertmanagerClient, err = outputs.NewClient("AlertManager", endpointUrl, config.Alertmanager.CommonConfig, *initClientArgs)
if err != nil {
config.Alertmanager.HostPort = ""
} else {
Expand All @@ -230,7 +230,7 @@ func init() {
if config.Elasticsearch.HostPort != "" {
var err error
endpointUrl := fmt.Sprintf("%s/%s/%s", config.Elasticsearch.HostPort, config.Elasticsearch.Index, config.Elasticsearch.Type)
elasticsearchClient, err = outputs.NewClient("Elasticsearch", endpointUrl, config.Elasticsearch.MutualTLS, config.Elasticsearch.CheckCert, *initClientArgs)
elasticsearchClient, err = outputs.NewClient("Elasticsearch", endpointUrl, config.Elasticsearch.CommonConfig, *initClientArgs)
if err != nil {
config.Elasticsearch.HostPort = ""
} else {
Expand All @@ -251,7 +251,7 @@ func init() {
var err error

endpointUrl := fmt.Sprintf("%s/%s/%s/ingest", config.Quickwit.HostPort, config.Quickwit.ApiEndpoint, config.Quickwit.Index)
quickwitClient, err = outputs.NewClient("Quickwit", endpointUrl, config.Quickwit.MutualTLS, config.Quickwit.CheckCert, *initClientArgs)
quickwitClient, err = outputs.NewClient("Quickwit", endpointUrl, config.Quickwit.CommonConfig, *initClientArgs)
if err == nil && config.Quickwit.AutoCreateIndex {
err = quickwitClient.AutoCreateQuickwitIndex(*initClientArgs)
}
Expand All @@ -265,7 +265,7 @@ func init() {

if config.Loki.HostPort != "" {
var err error
lokiClient, err = outputs.NewClient("Loki", config.Loki.HostPort+config.Loki.Endpoint, config.Loki.MutualTLS, config.Loki.CheckCert, *initClientArgs)
lokiClient, err = outputs.NewClient("Loki", config.Loki.HostPort+config.Loki.Endpoint, config.Loki.CommonConfig, *initClientArgs)
if err != nil {
config.Loki.HostPort = ""
} else {
Expand All @@ -275,7 +275,7 @@ func init() {

if config.SumoLogic.ReceiverURL != "" {
var err error
sumologicClient, err = outputs.NewClient("SumoLogic", config.SumoLogic.ReceiverURL, false, config.SumoLogic.CheckCert, *initClientArgs)
sumologicClient, err = outputs.NewClient("SumoLogic", config.SumoLogic.ReceiverURL, config.SumoLogic.CommonConfig, *initClientArgs)
if err != nil {
config.SumoLogic.ReceiverURL = ""
} else {
Expand All @@ -285,7 +285,7 @@ func init() {

if config.Nats.HostPort != "" {
var err error
natsClient, err = outputs.NewClient("NATS", config.Nats.HostPort, config.Nats.MutualTLS, config.Nats.CheckCert, *initClientArgs)
natsClient, err = outputs.NewClient("NATS", config.Nats.HostPort, config.Nats.CommonConfig, *initClientArgs)
if err != nil {
config.Nats.HostPort = ""
} else {
Expand All @@ -295,7 +295,7 @@ func init() {

if config.Stan.HostPort != "" && config.Stan.ClusterID != "" && config.Stan.ClientID != "" {
var err error
stanClient, err = outputs.NewClient("STAN", config.Stan.HostPort, config.Stan.MutualTLS, config.Stan.CheckCert, *initClientArgs)
stanClient, err = outputs.NewClient("STAN", config.Stan.HostPort, config.Stan.CommonConfig, *initClientArgs)
if err != nil {
config.Stan.HostPort = ""
config.Stan.ClusterID = ""
Expand All @@ -320,7 +320,7 @@ func init() {
}

var err error
influxdbClient, err = outputs.NewClient("Influxdb", url, config.Influxdb.MutualTLS, config.Influxdb.CheckCert, *initClientArgs)
influxdbClient, err = outputs.NewClient("Influxdb", url, config.Influxdb.CommonConfig, *initClientArgs)
if err != nil {
config.Influxdb.HostPort = ""
} else {
Expand Down Expand Up @@ -402,7 +402,7 @@ func init() {
if strings.ToLower(config.Opsgenie.Region) == "eu" {
url = "https://api.eu.opsgenie.com/v2/alerts"
}
opsgenieClient, err = outputs.NewClient("Opsgenie", url, config.Opsgenie.MutualTLS, config.Opsgenie.CheckCert, *initClientArgs)
opsgenieClient, err = outputs.NewClient("Opsgenie", url, config.Opsgenie.CommonConfig, *initClientArgs)
if err != nil {
config.Opsgenie.APIKey = ""
} else {
Expand All @@ -412,7 +412,7 @@ func init() {

if config.Webhook.Address != "" {
var err error
webhookClient, err = outputs.NewClient("Webhook", config.Webhook.Address, config.Webhook.MutualTLS, config.Webhook.CheckCert, *initClientArgs)
webhookClient, err = outputs.NewClient("Webhook", config.Webhook.Address, config.Webhook.CommonConfig, *initClientArgs)
if err != nil {
config.Webhook.Address = ""
} else {
Expand All @@ -422,7 +422,7 @@ func init() {

if config.NodeRed.Address != "" {
var err error
noderedClient, err = outputs.NewClient("NodeRed", config.NodeRed.Address, false, config.NodeRed.CheckCert, *initClientArgs)
noderedClient, err = outputs.NewClient("NodeRed", config.NodeRed.Address, config.NodeRed.CommonConfig, *initClientArgs)
if err != nil {
config.NodeRed.Address = ""
} else {
Expand All @@ -432,7 +432,7 @@ func init() {

if config.CloudEvents.Address != "" {
var err error
cloudeventsClient, err = outputs.NewClient("CloudEvents", config.CloudEvents.Address, config.CloudEvents.MutualTLS, config.CloudEvents.CheckCert, *initClientArgs)
cloudeventsClient, err = outputs.NewClient("CloudEvents", config.CloudEvents.Address, config.CloudEvents.CommonConfig, *initClientArgs)
if err != nil {
config.CloudEvents.Address = ""
} else {
Expand Down Expand Up @@ -478,7 +478,7 @@ func init() {
var err error
var outputName = "GCPCloudRun"

gcpCloudRunClient, err = outputs.NewClient(outputName, config.GCP.CloudRun.Endpoint, false, false, *initClientArgs)
gcpCloudRunClient, err = outputs.NewClient(outputName, config.GCP.CloudRun.Endpoint, types.CommonConfig{}, *initClientArgs)

if err != nil {
config.GCP.CloudRun.Endpoint = ""
Expand All @@ -489,7 +489,7 @@ func init() {

if config.Googlechat.WebhookURL != "" {
var err error
googleChatClient, err = outputs.NewClient("Googlechat", config.Googlechat.WebhookURL, config.Googlechat.MutualTLS, config.Googlechat.CheckCert, *initClientArgs)
googleChatClient, err = outputs.NewClient("Googlechat", config.Googlechat.WebhookURL, config.Googlechat.CommonConfig, *initClientArgs)
if err != nil {
config.Googlechat.WebhookURL = ""
} else {
Expand All @@ -509,7 +509,7 @@ func init() {

if config.KafkaRest.Address != "" {
var err error
kafkaRestClient, err = outputs.NewClient("KafkaRest", config.KafkaRest.Address, config.KafkaRest.MutualTLS, config.KafkaRest.CheckCert, *initClientArgs)
kafkaRestClient, err = outputs.NewClient("KafkaRest", config.KafkaRest.Address, config.KafkaRest.CommonConfig, *initClientArgs)
if err != nil {
config.KafkaRest.Address = ""
} else {
Expand All @@ -522,7 +522,7 @@ func init() {
var url = "https://events.pagerduty.com/v2/enqueue"
var outputName = "Pagerduty"

pagerdutyClient, err = outputs.NewClient(outputName, url, config.Pagerduty.MutualTLS, config.Pagerduty.CheckCert, *initClientArgs)
pagerdutyClient, err = outputs.NewClient(outputName, url, config.Pagerduty.CommonConfig, *initClientArgs)

if err != nil {
config.Pagerduty.RoutingKey = ""
Expand All @@ -545,7 +545,7 @@ func init() {

if config.WebUI.URL != "" {
var err error
webUIClient, err = outputs.NewClient("WebUI", config.WebUI.URL, config.WebUI.MutualTLS, config.WebUI.CheckCert, *initClientArgs)
webUIClient, err = outputs.NewClient("WebUI", config.WebUI.URL, config.WebUI.CommonConfig, *initClientArgs)
if err != nil {
config.WebUI.URL = ""
} else {
Expand Down Expand Up @@ -575,7 +575,7 @@ func init() {

if config.Tekton.EventListener != "" {
var err error
tektonClient, err = outputs.NewClient("Tekton", config.Tekton.EventListener, config.Tekton.MutualTLS, config.Tekton.CheckCert, *initClientArgs)
tektonClient, err = outputs.NewClient("Tekton", config.Tekton.EventListener, config.Tekton.CommonConfig, *initClientArgs)
if err != nil {
log.Printf("[ERROR] : Tekton - %v\n", err)
} else {
Expand Down Expand Up @@ -618,7 +618,7 @@ func init() {
var err error
var outputName = "Grafana"
endpointUrl := fmt.Sprintf("%s/api/annotations", config.Grafana.HostPort)
grafanaClient, err = outputs.NewClient(outputName, endpointUrl, config.Grafana.MutualTLS, config.Grafana.CheckCert, *initClientArgs)
grafanaClient, err = outputs.NewClient(outputName, endpointUrl, config.Grafana.CommonConfig, *initClientArgs)
if err != nil {
config.Grafana.HostPort = ""
config.Grafana.APIKey = ""
Expand All @@ -630,7 +630,7 @@ func init() {
if config.GrafanaOnCall.WebhookURL != "" {
var err error
var outputName = "GrafanaOnCall"
grafanaOnCallClient, err = outputs.NewClient(outputName, config.GrafanaOnCall.WebhookURL, config.GrafanaOnCall.MutualTLS, config.GrafanaOnCall.CheckCert, *initClientArgs)
grafanaOnCallClient, err = outputs.NewClient(outputName, config.GrafanaOnCall.WebhookURL, config.GrafanaOnCall.CommonConfig, *initClientArgs)
if err != nil {
config.GrafanaOnCall.WebhookURL = ""
} else {
Expand Down Expand Up @@ -689,7 +689,7 @@ func init() {
if config.Zincsearch.HostPort != "" {
var err error
endpointUrl := fmt.Sprintf("%s/api/%s/_doc", config.Zincsearch.HostPort, config.Zincsearch.Index)
zincsearchClient, err = outputs.NewClient("Zincsearch", endpointUrl, false, config.Zincsearch.CheckCert, *initClientArgs)
zincsearchClient, err = outputs.NewClient("Zincsearch", endpointUrl, types.CommonConfig{CheckCert: config.Zincsearch.CheckCert}, *initClientArgs)
if err != nil {
config.Zincsearch.HostPort = ""
} else {
Expand All @@ -700,7 +700,7 @@ func init() {
if config.Gotify.HostPort != "" {
var err error
endpointUrl := fmt.Sprintf("%s/message", config.Gotify.HostPort)
gotifyClient, err = outputs.NewClient("Gotify", endpointUrl, false, config.Gotify.CheckCert, *initClientArgs)
gotifyClient, err = outputs.NewClient("Gotify", endpointUrl, types.CommonConfig{CheckCert: config.Gotify.CheckCert}, *initClientArgs)
if err != nil {
config.Gotify.HostPort = ""
} else {
Expand Down Expand Up @@ -744,7 +744,7 @@ func init() {
var err error
var urlFormat = "https://api.telegram.org/bot%s/sendMessage"

telegramClient, err = outputs.NewClient("Telegram", fmt.Sprintf(urlFormat, config.Telegram.Token), false, config.Telegram.CheckCert, *initClientArgs)
telegramClient, err = outputs.NewClient("Telegram", fmt.Sprintf(urlFormat, config.Telegram.Token), types.CommonConfig{CheckCert: config.Telegram.CheckCert}, *initClientArgs)

if err != nil {
config.Telegram.ChatID = ""
Expand All @@ -758,7 +758,7 @@ func init() {

if config.N8N.Address != "" {
var err error
n8nClient, err = outputs.NewClient("n8n", config.N8N.Address, false, config.N8N.CheckCert, *initClientArgs)
n8nClient, err = outputs.NewClient("n8n", config.N8N.Address, types.CommonConfig{CheckCert: config.N8N.CheckCert}, *initClientArgs)
if err != nil {
config.N8N.Address = ""
} else {
Expand All @@ -769,7 +769,7 @@ func init() {
if config.OpenObserve.HostPort != "" {
var err error
endpointUrl := fmt.Sprintf("%s/api/%s/%s/_multi", config.OpenObserve.HostPort, config.OpenObserve.OrganizationName, config.OpenObserve.StreamName)
openObserveClient, err = outputs.NewClient("OpenObserve", endpointUrl, config.OpenObserve.MutualTLS, config.OpenObserve.CheckCert, *initClientArgs)
openObserveClient, err = outputs.NewClient("OpenObserve", endpointUrl, config.OpenObserve.CommonConfig, *initClientArgs)
if err != nil {
config.OpenObserve.HostPort = ""
} else {
Expand All @@ -780,7 +780,7 @@ func init() {
if config.Dynatrace.APIToken != "" && config.Dynatrace.APIUrl != "" {
var err error
dynatraceApiUrl := strings.TrimRight(config.Dynatrace.APIUrl, "/") + "/v2/logs/ingest"
dynatraceClient, err = outputs.NewClient("Dynatrace", dynatraceApiUrl, false, config.Dynatrace.CheckCert, *initClientArgs)
dynatraceClient, err = outputs.NewClient("Dynatrace", dynatraceApiUrl, types.CommonConfig{CheckCert: config.Dynatrace.CheckCert}, *initClientArgs)
if err != nil {
config.Dynatrace.APIToken = ""
config.Dynatrace.APIUrl = ""
Expand All @@ -802,7 +802,7 @@ func init() {

if config.Talon.Address != "" {
var err error
talonClient, err = outputs.NewClient("Talon", config.Talon.Address, false, config.Talon.CheckCert, *initClientArgs)
talonClient, err = outputs.NewClient("Talon", config.Talon.Address, types.CommonConfig{CheckCert: config.Talon.CheckCert}, *initClientArgs)
if err != nil {
config.Talon.Address = ""
} else {
Expand Down
12 changes: 6 additions & 6 deletions outputs/alertmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package outputs
import (
"encoding/json"
"log"
"net/http"
"regexp"
"sort"
"strconv"
Expand Down Expand Up @@ -130,13 +131,12 @@ func newAlertmanagerPayload(falcopayload types.FalcoPayload, config *types.Confi
// AlertmanagerPost posts event to AlertManager
func (c *Client) AlertmanagerPost(falcopayload types.FalcoPayload) {
c.Stats.Alertmanager.Add(Total, 1)
c.httpClientLock.Lock()
defer c.httpClientLock.Unlock()
for i, j := range c.Config.Alertmanager.CustomHeaders {
c.AddHeader(i, j)
}

err := c.Post(newAlertmanagerPayload(falcopayload, c.Config))
err := c.Post(newAlertmanagerPayload(falcopayload, c.Config), func(req *http.Request) {
for i, j := range c.Config.Alertmanager.CustomHeaders {
req.Header.Set(i, j)
}
})
if err != nil {
go c.CountMetric(Outputs, 1, []string{"output:alertmanager", "status:error"})
c.Stats.Alertmanager.Add(Error, 1)
Expand Down
Loading

0 comments on commit 261eb9d

Please sign in to comment.