Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve http output handling #966

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
762 changes: 397 additions & 365 deletions config.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ require (
go.opentelemetry.io/otel/sdk v1.28.0
go.opentelemetry.io/otel/trace v1.28.0
golang.org/x/oauth2 v0.21.0
golang.org/x/sync v0.7.0
golang.org/x/text v0.16.0
google.golang.org/api v0.188.0
google.golang.org/genproto v0.0.0-20240708141625-4ad9e859172b
Expand Down Expand Up @@ -136,7 +137,6 @@ require (
golang.org/x/crypto v0.25.0 // indirect
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.22.0 // indirect
golang.org/x/term v0.22.0 // indirect
golang.org/x/time v0.5.0 // indirect
Expand Down
68 changes: 34 additions & 34 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func init() {

if config.Slack.WebhookURL != "" {
var err error
slackClient, err = outputs.NewClient("Slack", config.Slack.WebhookURL, config.Slack.MutualTLS, config.Slack.CheckCert, *initClientArgs)
slackClient, err = outputs.NewClient("Slack", config.Slack.WebhookURL, config.Slack.CommonConfig, *initClientArgs)
if err != nil {
config.Slack.WebhookURL = ""
} else {
Expand All @@ -157,7 +157,7 @@ func init() {

if config.Cliq.WebhookURL != "" {
var err error
cliqClient, err = outputs.NewClient("Cliq", config.Cliq.WebhookURL, config.Cliq.MutualTLS, config.Cliq.CheckCert, *initClientArgs)
cliqClient, err = outputs.NewClient("Cliq", config.Cliq.WebhookURL, config.Cliq.CommonConfig, *initClientArgs)
if err != nil {
config.Cliq.WebhookURL = ""
} else {
Expand All @@ -167,7 +167,7 @@ func init() {

if config.Rocketchat.WebhookURL != "" {
var err error
rocketchatClient, err = outputs.NewClient("Rocketchat", config.Rocketchat.WebhookURL, config.Rocketchat.MutualTLS, config.Rocketchat.CheckCert, *initClientArgs)
rocketchatClient, err = outputs.NewClient("Rocketchat", config.Rocketchat.WebhookURL, config.Rocketchat.CommonConfig, *initClientArgs)
if err != nil {
config.Rocketchat.WebhookURL = ""
} else {
Expand All @@ -177,7 +177,7 @@ func init() {

if config.Mattermost.WebhookURL != "" {
var err error
mattermostClient, err = outputs.NewClient("Mattermost", config.Mattermost.WebhookURL, config.Mattermost.MutualTLS, config.Mattermost.CheckCert, *initClientArgs)
mattermostClient, err = outputs.NewClient("Mattermost", config.Mattermost.WebhookURL, config.Mattermost.CommonConfig, *initClientArgs)
if err != nil {
config.Mattermost.WebhookURL = ""
} else {
Expand All @@ -187,7 +187,7 @@ func init() {

if config.Teams.WebhookURL != "" {
var err error
teamsClient, err = outputs.NewClient("Teams", config.Teams.WebhookURL, config.Teams.MutualTLS, config.Teams.CheckCert, *initClientArgs)
teamsClient, err = outputs.NewClient("Teams", config.Teams.WebhookURL, config.Teams.CommonConfig, *initClientArgs)
if err != nil {
config.Teams.WebhookURL = ""
} else {
Expand All @@ -198,7 +198,7 @@ func init() {
if config.Datadog.APIKey != "" {
var err error
endpointUrl := fmt.Sprintf("%s?api_key=%s", config.Datadog.Host+outputs.DatadogPath, config.Datadog.APIKey)
datadogClient, err = outputs.NewClient("Datadog", endpointUrl, config.Datadog.MutualTLS, config.Datadog.CheckCert, *initClientArgs)
datadogClient, err = outputs.NewClient("Datadog", endpointUrl, config.Datadog.CommonConfig, *initClientArgs)
if err != nil {
config.Datadog.APIKey = ""
} else {
Expand All @@ -208,7 +208,7 @@ func init() {

if config.Discord.WebhookURL != "" {
var err error
discordClient, err = outputs.NewClient("Discord", config.Discord.WebhookURL, config.Discord.MutualTLS, config.Discord.CheckCert, *initClientArgs)
discordClient, err = outputs.NewClient("Discord", config.Discord.WebhookURL, config.Discord.CommonConfig, *initClientArgs)
if err != nil {
config.Discord.WebhookURL = ""
} else {
Expand All @@ -219,7 +219,7 @@ func init() {
if config.Alertmanager.HostPort != "" {
var err error
endpointUrl := fmt.Sprintf("%s%s", config.Alertmanager.HostPort, config.Alertmanager.Endpoint)
alertmanagerClient, err = outputs.NewClient("AlertManager", endpointUrl, config.Alertmanager.MutualTLS, config.Alertmanager.CheckCert, *initClientArgs)
alertmanagerClient, err = outputs.NewClient("AlertManager", endpointUrl, config.Alertmanager.CommonConfig, *initClientArgs)
if err != nil {
config.Alertmanager.HostPort = ""
} else {
Expand All @@ -230,7 +230,7 @@ func init() {
if config.Elasticsearch.HostPort != "" {
var err error
endpointUrl := fmt.Sprintf("%s/%s/%s", config.Elasticsearch.HostPort, config.Elasticsearch.Index, config.Elasticsearch.Type)
elasticsearchClient, err = outputs.NewClient("Elasticsearch", endpointUrl, config.Elasticsearch.MutualTLS, config.Elasticsearch.CheckCert, *initClientArgs)
elasticsearchClient, err = outputs.NewClient("Elasticsearch", endpointUrl, config.Elasticsearch.CommonConfig, *initClientArgs)
if err != nil {
config.Elasticsearch.HostPort = ""
} else {
Expand All @@ -251,7 +251,7 @@ func init() {
var err error

endpointUrl := fmt.Sprintf("%s/%s/%s/ingest", config.Quickwit.HostPort, config.Quickwit.ApiEndpoint, config.Quickwit.Index)
quickwitClient, err = outputs.NewClient("Quickwit", endpointUrl, config.Quickwit.MutualTLS, config.Quickwit.CheckCert, *initClientArgs)
quickwitClient, err = outputs.NewClient("Quickwit", endpointUrl, config.Quickwit.CommonConfig, *initClientArgs)
if err == nil && config.Quickwit.AutoCreateIndex {
err = quickwitClient.AutoCreateQuickwitIndex(*initClientArgs)
}
Expand All @@ -265,7 +265,7 @@ func init() {

if config.Loki.HostPort != "" {
var err error
lokiClient, err = outputs.NewClient("Loki", config.Loki.HostPort+config.Loki.Endpoint, config.Loki.MutualTLS, config.Loki.CheckCert, *initClientArgs)
lokiClient, err = outputs.NewClient("Loki", config.Loki.HostPort+config.Loki.Endpoint, config.Loki.CommonConfig, *initClientArgs)
if err != nil {
config.Loki.HostPort = ""
} else {
Expand All @@ -275,7 +275,7 @@ func init() {

if config.SumoLogic.ReceiverURL != "" {
var err error
sumologicClient, err = outputs.NewClient("SumoLogic", config.SumoLogic.ReceiverURL, false, config.SumoLogic.CheckCert, *initClientArgs)
sumologicClient, err = outputs.NewClient("SumoLogic", config.SumoLogic.ReceiverURL, config.SumoLogic.CommonConfig, *initClientArgs)
if err != nil {
config.SumoLogic.ReceiverURL = ""
} else {
Expand All @@ -285,7 +285,7 @@ func init() {

if config.Nats.HostPort != "" {
var err error
natsClient, err = outputs.NewClient("NATS", config.Nats.HostPort, config.Nats.MutualTLS, config.Nats.CheckCert, *initClientArgs)
natsClient, err = outputs.NewClient("NATS", config.Nats.HostPort, config.Nats.CommonConfig, *initClientArgs)
if err != nil {
config.Nats.HostPort = ""
} else {
Expand All @@ -295,7 +295,7 @@ func init() {

if config.Stan.HostPort != "" && config.Stan.ClusterID != "" && config.Stan.ClientID != "" {
var err error
stanClient, err = outputs.NewClient("STAN", config.Stan.HostPort, config.Stan.MutualTLS, config.Stan.CheckCert, *initClientArgs)
stanClient, err = outputs.NewClient("STAN", config.Stan.HostPort, config.Stan.CommonConfig, *initClientArgs)
if err != nil {
config.Stan.HostPort = ""
config.Stan.ClusterID = ""
Expand All @@ -320,7 +320,7 @@ func init() {
}

var err error
influxdbClient, err = outputs.NewClient("Influxdb", url, config.Influxdb.MutualTLS, config.Influxdb.CheckCert, *initClientArgs)
influxdbClient, err = outputs.NewClient("Influxdb", url, config.Influxdb.CommonConfig, *initClientArgs)
if err != nil {
config.Influxdb.HostPort = ""
} else {
Expand Down Expand Up @@ -402,7 +402,7 @@ func init() {
if strings.ToLower(config.Opsgenie.Region) == "eu" {
url = "https://api.eu.opsgenie.com/v2/alerts"
}
opsgenieClient, err = outputs.NewClient("Opsgenie", url, config.Opsgenie.MutualTLS, config.Opsgenie.CheckCert, *initClientArgs)
opsgenieClient, err = outputs.NewClient("Opsgenie", url, config.Opsgenie.CommonConfig, *initClientArgs)
if err != nil {
config.Opsgenie.APIKey = ""
} else {
Expand All @@ -412,7 +412,7 @@ func init() {

if config.Webhook.Address != "" {
var err error
webhookClient, err = outputs.NewClient("Webhook", config.Webhook.Address, config.Webhook.MutualTLS, config.Webhook.CheckCert, *initClientArgs)
webhookClient, err = outputs.NewClient("Webhook", config.Webhook.Address, config.Webhook.CommonConfig, *initClientArgs)
if err != nil {
config.Webhook.Address = ""
} else {
Expand All @@ -422,7 +422,7 @@ func init() {

if config.NodeRed.Address != "" {
var err error
noderedClient, err = outputs.NewClient("NodeRed", config.NodeRed.Address, false, config.NodeRed.CheckCert, *initClientArgs)
noderedClient, err = outputs.NewClient("NodeRed", config.NodeRed.Address, config.NodeRed.CommonConfig, *initClientArgs)
if err != nil {
config.NodeRed.Address = ""
} else {
Expand All @@ -432,7 +432,7 @@ func init() {

if config.CloudEvents.Address != "" {
var err error
cloudeventsClient, err = outputs.NewClient("CloudEvents", config.CloudEvents.Address, config.CloudEvents.MutualTLS, config.CloudEvents.CheckCert, *initClientArgs)
cloudeventsClient, err = outputs.NewClient("CloudEvents", config.CloudEvents.Address, config.CloudEvents.CommonConfig, *initClientArgs)
if err != nil {
config.CloudEvents.Address = ""
} else {
Expand Down Expand Up @@ -478,7 +478,7 @@ func init() {
var err error
var outputName = "GCPCloudRun"

gcpCloudRunClient, err = outputs.NewClient(outputName, config.GCP.CloudRun.Endpoint, false, false, *initClientArgs)
gcpCloudRunClient, err = outputs.NewClient(outputName, config.GCP.CloudRun.Endpoint, types.CommonConfig{}, *initClientArgs)

if err != nil {
config.GCP.CloudRun.Endpoint = ""
Expand All @@ -489,7 +489,7 @@ func init() {

if config.Googlechat.WebhookURL != "" {
var err error
googleChatClient, err = outputs.NewClient("Googlechat", config.Googlechat.WebhookURL, config.Googlechat.MutualTLS, config.Googlechat.CheckCert, *initClientArgs)
googleChatClient, err = outputs.NewClient("Googlechat", config.Googlechat.WebhookURL, config.Googlechat.CommonConfig, *initClientArgs)
if err != nil {
config.Googlechat.WebhookURL = ""
} else {
Expand All @@ -509,7 +509,7 @@ func init() {

if config.KafkaRest.Address != "" {
var err error
kafkaRestClient, err = outputs.NewClient("KafkaRest", config.KafkaRest.Address, config.KafkaRest.MutualTLS, config.KafkaRest.CheckCert, *initClientArgs)
kafkaRestClient, err = outputs.NewClient("KafkaRest", config.KafkaRest.Address, config.KafkaRest.CommonConfig, *initClientArgs)
if err != nil {
config.KafkaRest.Address = ""
} else {
Expand All @@ -522,7 +522,7 @@ func init() {
var url = "https://events.pagerduty.com/v2/enqueue"
var outputName = "Pagerduty"

pagerdutyClient, err = outputs.NewClient(outputName, url, config.Pagerduty.MutualTLS, config.Pagerduty.CheckCert, *initClientArgs)
pagerdutyClient, err = outputs.NewClient(outputName, url, config.Pagerduty.CommonConfig, *initClientArgs)

if err != nil {
config.Pagerduty.RoutingKey = ""
Expand All @@ -545,7 +545,7 @@ func init() {

if config.WebUI.URL != "" {
var err error
webUIClient, err = outputs.NewClient("WebUI", config.WebUI.URL, config.WebUI.MutualTLS, config.WebUI.CheckCert, *initClientArgs)
webUIClient, err = outputs.NewClient("WebUI", config.WebUI.URL, config.WebUI.CommonConfig, *initClientArgs)
if err != nil {
config.WebUI.URL = ""
} else {
Expand Down Expand Up @@ -575,7 +575,7 @@ func init() {

if config.Tekton.EventListener != "" {
var err error
tektonClient, err = outputs.NewClient("Tekton", config.Tekton.EventListener, config.Tekton.MutualTLS, config.Tekton.CheckCert, *initClientArgs)
tektonClient, err = outputs.NewClient("Tekton", config.Tekton.EventListener, config.Tekton.CommonConfig, *initClientArgs)
if err != nil {
log.Printf("[ERROR] : Tekton - %v\n", err)
} else {
Expand Down Expand Up @@ -618,7 +618,7 @@ func init() {
var err error
var outputName = "Grafana"
endpointUrl := fmt.Sprintf("%s/api/annotations", config.Grafana.HostPort)
grafanaClient, err = outputs.NewClient(outputName, endpointUrl, config.Grafana.MutualTLS, config.Grafana.CheckCert, *initClientArgs)
grafanaClient, err = outputs.NewClient(outputName, endpointUrl, config.Grafana.CommonConfig, *initClientArgs)
if err != nil {
config.Grafana.HostPort = ""
config.Grafana.APIKey = ""
Expand All @@ -630,7 +630,7 @@ func init() {
if config.GrafanaOnCall.WebhookURL != "" {
var err error
var outputName = "GrafanaOnCall"
grafanaOnCallClient, err = outputs.NewClient(outputName, config.GrafanaOnCall.WebhookURL, config.GrafanaOnCall.MutualTLS, config.GrafanaOnCall.CheckCert, *initClientArgs)
grafanaOnCallClient, err = outputs.NewClient(outputName, config.GrafanaOnCall.WebhookURL, config.GrafanaOnCall.CommonConfig, *initClientArgs)
if err != nil {
config.GrafanaOnCall.WebhookURL = ""
} else {
Expand Down Expand Up @@ -689,7 +689,7 @@ func init() {
if config.Zincsearch.HostPort != "" {
var err error
endpointUrl := fmt.Sprintf("%s/api/%s/_doc", config.Zincsearch.HostPort, config.Zincsearch.Index)
zincsearchClient, err = outputs.NewClient("Zincsearch", endpointUrl, false, config.Zincsearch.CheckCert, *initClientArgs)
zincsearchClient, err = outputs.NewClient("Zincsearch", endpointUrl, types.CommonConfig{CheckCert: config.Zincsearch.CheckCert}, *initClientArgs)
if err != nil {
config.Zincsearch.HostPort = ""
} else {
Expand All @@ -700,7 +700,7 @@ func init() {
if config.Gotify.HostPort != "" {
var err error
endpointUrl := fmt.Sprintf("%s/message", config.Gotify.HostPort)
gotifyClient, err = outputs.NewClient("Gotify", endpointUrl, false, config.Gotify.CheckCert, *initClientArgs)
gotifyClient, err = outputs.NewClient("Gotify", endpointUrl, types.CommonConfig{CheckCert: config.Gotify.CheckCert}, *initClientArgs)
if err != nil {
config.Gotify.HostPort = ""
} else {
Expand Down Expand Up @@ -744,7 +744,7 @@ func init() {
var err error
var urlFormat = "https://api.telegram.org/bot%s/sendMessage"

telegramClient, err = outputs.NewClient("Telegram", fmt.Sprintf(urlFormat, config.Telegram.Token), false, config.Telegram.CheckCert, *initClientArgs)
telegramClient, err = outputs.NewClient("Telegram", fmt.Sprintf(urlFormat, config.Telegram.Token), types.CommonConfig{CheckCert: config.Telegram.CheckCert}, *initClientArgs)

if err != nil {
config.Telegram.ChatID = ""
Expand All @@ -758,7 +758,7 @@ func init() {

if config.N8N.Address != "" {
var err error
n8nClient, err = outputs.NewClient("n8n", config.N8N.Address, false, config.N8N.CheckCert, *initClientArgs)
n8nClient, err = outputs.NewClient("n8n", config.N8N.Address, types.CommonConfig{CheckCert: config.N8N.CheckCert}, *initClientArgs)
if err != nil {
config.N8N.Address = ""
} else {
Expand All @@ -769,7 +769,7 @@ func init() {
if config.OpenObserve.HostPort != "" {
var err error
endpointUrl := fmt.Sprintf("%s/api/%s/%s/_multi", config.OpenObserve.HostPort, config.OpenObserve.OrganizationName, config.OpenObserve.StreamName)
openObserveClient, err = outputs.NewClient("OpenObserve", endpointUrl, config.OpenObserve.MutualTLS, config.OpenObserve.CheckCert, *initClientArgs)
openObserveClient, err = outputs.NewClient("OpenObserve", endpointUrl, config.OpenObserve.CommonConfig, *initClientArgs)
if err != nil {
config.OpenObserve.HostPort = ""
} else {
Expand All @@ -780,7 +780,7 @@ func init() {
if config.Dynatrace.APIToken != "" && config.Dynatrace.APIUrl != "" {
var err error
dynatraceApiUrl := strings.TrimRight(config.Dynatrace.APIUrl, "/") + "/v2/logs/ingest"
dynatraceClient, err = outputs.NewClient("Dynatrace", dynatraceApiUrl, false, config.Dynatrace.CheckCert, *initClientArgs)
dynatraceClient, err = outputs.NewClient("Dynatrace", dynatraceApiUrl, types.CommonConfig{CheckCert: config.Dynatrace.CheckCert}, *initClientArgs)
if err != nil {
config.Dynatrace.APIToken = ""
config.Dynatrace.APIUrl = ""
Expand All @@ -802,7 +802,7 @@ func init() {

if config.Talon.Address != "" {
var err error
talonClient, err = outputs.NewClient("Talon", config.Talon.Address, false, config.Talon.CheckCert, *initClientArgs)
talonClient, err = outputs.NewClient("Talon", config.Talon.Address, types.CommonConfig{CheckCert: config.Talon.CheckCert}, *initClientArgs)
if err != nil {
config.Talon.Address = ""
} else {
Expand Down
12 changes: 6 additions & 6 deletions outputs/alertmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package outputs
import (
"encoding/json"
"log"
"net/http"
"regexp"
"sort"
"strconv"
Expand Down Expand Up @@ -130,13 +131,12 @@ func newAlertmanagerPayload(falcopayload types.FalcoPayload, config *types.Confi
// AlertmanagerPost posts event to AlertManager
func (c *Client) AlertmanagerPost(falcopayload types.FalcoPayload) {
c.Stats.Alertmanager.Add(Total, 1)
c.httpClientLock.Lock()
defer c.httpClientLock.Unlock()
for i, j := range c.Config.Alertmanager.CustomHeaders {
c.AddHeader(i, j)
}

err := c.Post(newAlertmanagerPayload(falcopayload, c.Config))
err := c.Post(newAlertmanagerPayload(falcopayload, c.Config), func(req *http.Request) {
for i, j := range c.Config.Alertmanager.CustomHeaders {
req.Header.Set(i, j)
}
})
if err != nil {
go c.CountMetric(Outputs, 1, []string{"output:alertmanager", "status:error"})
c.Stats.Alertmanager.Add(Error, 1)
Expand Down
Loading