Skip to content

Commit

Permalink
🚀 add external labels for datasource
Browse files Browse the repository at this point in the history
  • Loading branch information
Cairry committed Nov 27, 2024
1 parent 310e3fe commit 6daf11b
Show file tree
Hide file tree
Showing 16 changed files with 146 additions and 46 deletions.
50 changes: 45 additions & 5 deletions alert/eval/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ import (
// Metrics 包含 Prometheus、VictoriaMetrics 数据源
func metrics(ctx *ctx.Context, datasourceId, datasourceType string, rule models.AlertRule) (curFiringKeys, curPendingKeys []string) {
pools := ctx.Redis.ProviderPools()
var resQuery []provider.Metrics
var (
resQuery []provider.Metrics
externalLabels map[string]interface{}
)

switch datasourceType {
case provider.PrometheusDsProvider:
cli, err := pools.GetClient(datasourceId)
Expand All @@ -33,6 +37,8 @@ func metrics(ctx *ctx.Context, datasourceId, datasourceType string, rule models.
logc.Error(ctx.Ctx, err.Error())
return
}

externalLabels = cli.(provider.PrometheusProvider).GetExternalLabels()
case provider.VictoriaMetricsDsProvider:
cli, err := pools.GetClient(datasourceId)
if err != nil {
Expand All @@ -45,6 +51,8 @@ func metrics(ctx *ctx.Context, datasourceId, datasourceType string, rule models.
logc.Error(ctx.Ctx, err.Error())
return
}

externalLabels = cli.(provider.VictoriaMetricsProvider).GetExternalLabels()
default:
logc.Errorf(ctx.Ctx, fmt.Sprintf("Unsupported metrics type, type: %s", datasourceType))
return
Expand All @@ -66,6 +74,9 @@ func metrics(ctx *ctx.Context, datasourceId, datasourceType string, rule models.
event.Fingerprint = v.GetFingerprint()
event.Metric = v.GetMetric()
event.Metric["severity"] = ruleExpr.Severity
for ek, ev := range externalLabels {
event.Metric[ek] = ev
}
event.Severity = ruleExpr.Severity
event.Annotations = tools.ParserVariables(rule.PrometheusConfig.Annotations, event.Metric)

Expand Down Expand Up @@ -96,9 +107,10 @@ func metrics(ctx *ctx.Context, datasourceId, datasourceType string, rule models.
// Logs 包含 AliSLS、Loki、ElasticSearch 数据源
func logs(ctx *ctx.Context, datasourceId, datasourceType string, rule models.AlertRule) (curFiringKeys []string) {
var (
queryRes []provider.Logs
count int
evalOptions models.EvalCondition
queryRes []provider.Logs
count int
evalOptions models.EvalCondition
externalLabels map[string]interface{}
)

pools := ctx.Redis.ProviderPools()
Expand All @@ -125,6 +137,8 @@ func logs(ctx *ctx.Context, datasourceId, datasourceType string, rule models.Ale
return
}

externalLabels = cli.(provider.LokiProvider).GetExternalLabels()

evalOptions = models.EvalCondition{
Operator: rule.LokiConfig.EvalCondition.Operator,
QueryValue: float64(count),
Expand Down Expand Up @@ -154,6 +168,8 @@ func logs(ctx *ctx.Context, datasourceId, datasourceType string, rule models.Ale
return
}

externalLabels = cli.(provider.AliCloudSlsDsProvider).GetExternalLabels()

evalOptions = models.EvalCondition{
Operator: rule.AliCloudSLSConfig.EvalCondition.Operator,
QueryValue: float64(count),
Expand Down Expand Up @@ -182,6 +198,8 @@ func logs(ctx *ctx.Context, datasourceId, datasourceType string, rule models.Ale
return
}

externalLabels = cli.(provider.ElasticSearchDsProvider).GetExternalLabels()

evalOptions = models.EvalCondition{
Operator: ">",
QueryValue: float64(count),
Expand All @@ -199,6 +217,9 @@ func logs(ctx *ctx.Context, datasourceId, datasourceType string, rule models.Ale
event.DatasourceId = datasourceId
event.Fingerprint = v.GetFingerprint()
event.Metric = v.GetMetric()
for ek, ev := range externalLabels {
event.Metric[ek] = ev
}
event.Annotations = fmt.Sprintf("统计日志条数: %d 条\n%s", count, tools.FormatJson(v.GetAnnotations()[0].(string)))

key := event.GetPendingAlertCacheKey()
Expand All @@ -219,7 +240,8 @@ func logs(ctx *ctx.Context, datasourceId, datasourceType string, rule models.Ale
// Traces 包含 Jaeger 数据源
func traces(ctx *ctx.Context, datasourceId, datasourceType string, rule models.AlertRule) (curFiringKeys []string) {
var (
queryRes []provider.Traces
queryRes []provider.Traces
externalLabels map[string]interface{}
)

pools := ctx.Redis.ProviderPools()
Expand All @@ -245,13 +267,18 @@ func traces(ctx *ctx.Context, datasourceId, datasourceType string, rule models.A
logc.Error(ctx.Ctx, err.Error())
return
}

externalLabels = cli.(provider.JaegerDsProvider).GetExternalLabels()
}

for _, v := range queryRes {
event := process.BuildEvent(rule)
event.DatasourceId = datasourceId
event.Fingerprint = v.GetFingerprint()
event.Metric = v.GetMetric()
for ek, ev := range externalLabels {
event.Metric[ek] = ev
}
event.Annotations = fmt.Sprintf("服务: %s 链路中存在异常状态码接口, TraceId: %s", rule.JaegerConfig.Service, v.TraceId)

key := event.GetFiringAlertCacheKey()
Expand All @@ -264,12 +291,16 @@ func traces(ctx *ctx.Context, datasourceId, datasourceType string, rule models.A
}

func cloudWatch(ctx *ctx.Context, datasourceId string, rule models.AlertRule) (curFiringKeys []string) {
var externalLabels map[string]interface{}
pools := ctx.Redis.ProviderPools()
cfg, err := pools.GetClient(datasourceId)
if err != nil {
logc.Errorf(ctx.Ctx, err.Error())
return
}

externalLabels = cfg.(provider.AwsConfig).GetExternalLabels()

cli := cfg.(provider.AwsConfig).CloudWatchCli()
curAt := time.Now().UTC()
startsAt := tools.ParserDuration(curAt, rule.CloudWatchConfig.Period, "m")
Expand All @@ -295,6 +326,9 @@ func cloudWatch(ctx *ctx.Context, datasourceId string, rule models.AlertRule) (c
event.DatasourceId = datasourceId
event.Fingerprint = query.GetFingerprint()
event.Metric = query.GetMetrics()
for ek, ev := range externalLabels {
event.Metric[ek] = ev
}
event.Annotations = fmt.Sprintf("%s %s %s %s %d", query.Namespace, query.MetricName, query.Statistic, rule.CloudWatchConfig.Expr, rule.CloudWatchConfig.Threshold)

return event
Expand All @@ -315,6 +349,7 @@ func cloudWatch(ctx *ctx.Context, datasourceId string, rule models.AlertRule) (c
}

func kubernetesEvent(ctx *ctx.Context, datasourceId string, rule models.AlertRule) (curFiringKeys []string) {
var externalLabels map[string]interface{}
datasourceObj, err := ctx.DB.Datasource().GetInstance(datasourceId)
if err != nil {
logc.Error(ctx.Ctx, err.Error())
Expand All @@ -334,6 +369,8 @@ func kubernetesEvent(ctx *ctx.Context, datasourceId string, rule models.AlertRul
return
}

externalLabels = cli.(provider.KubernetesClient).GetExternalLabels()

if len(event.Items) < rule.KubernetesConfig.Value {
return []string{}
}
Expand All @@ -347,6 +384,9 @@ func kubernetesEvent(ctx *ctx.Context, datasourceId string, rule models.AlertRul
alertEvent.DatasourceId = datasourceId
alertEvent.Fingerprint = k8sItem.GetFingerprint()
alertEvent.Metric = k8sItem.GetMetrics()
for ek, ev := range externalLabels {
alertEvent.Metric[ek] = ev
}
alertEvent.Annotations = fmt.Sprintf("- 环境: %s\n- 命名空间: %s\n- 资源类型: %s\n- 资源名称: %s\n- 事件类型: %s\n- 事件详情: %s\n",
datasourceObj.Name, item.Namespace, item.InvolvedObject.Kind,
item.InvolvedObject.Name, item.Reason, eventMapping[item.InvolvedObject.Name],
Expand Down
27 changes: 14 additions & 13 deletions internal/models/datasource.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
package models

type AlertDataSource struct {
TenantId string `json:"tenantId"`
Id string `json:"id"`
Name string `json:"name"`
Type string `json:"type"`
HTTP HTTP `json:"http" gorm:"http;serializer:json"`
AliCloudEndpoint string `json:"alicloudEndpoint"`
AliCloudAk string `json:"alicloudAk"`
AliCloudSk string `json:"alicloudSk"`
AWSCloudWatch AWSCloudWatch `json:"awsCloudwatch" gorm:"awsCloudwatch;serializer:json"`
Description string `json:"description"`
KubeConfig string `json:"kubeConfig"`
ElasticSearch ElasticSearch `json:"elasticSearch" gorm:"elasticSearch;serializer:json"`
Enabled *bool `json:"enabled" `
TenantId string `json:"tenantId"`
Id string `json:"id"`
Name string `json:"name"`
Labels map[string]interface{} `json:"labels" gorm:"labels;serializer:json"` // 额外标签,会添加到事件Metric中,可用于区分数据来源;
Type string `json:"type"`
HTTP HTTP `json:"http" gorm:"http;serializer:json"`
AliCloudEndpoint string `json:"alicloudEndpoint"`
AliCloudAk string `json:"alicloudAk"`
AliCloudSk string `json:"alicloudSk"`
AWSCloudWatch AWSCloudWatch `json:"awsCloudwatch" gorm:"awsCloudwatch;serializer:json"`
Description string `json:"description"`
KubeConfig string `json:"kubeConfig"`
ElasticSearch ElasticSearch `json:"elasticSearch" gorm:"elasticSearch;serializer:json"`
Enabled *bool `json:"enabled" `
}

type ElasticSearch struct {
Expand Down
6 changes: 3 additions & 3 deletions internal/services/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (ds datasourceService) Create(req interface{}) (interface{}, interface{}) {

func (ds datasourceService) Update(req interface{}) (interface{}, interface{}) {
dataSource := req.(*models.AlertDataSource)

err := ds.ctx.DB.Datasource().Update(*dataSource)
if err != nil {
return nil, err
Expand Down Expand Up @@ -129,9 +129,9 @@ func (ds datasourceService) WithAddClientToProviderPools(datasource models.Alert
case provider.JaegerDsProviderName:
cli, err = provider.NewJaegerClient(datasource)
case "Kubernetes":
cli, err = provider.NewKubernetesClient(ds.ctx.Ctx, datasource.KubeConfig)
cli, err = provider.NewKubernetesClient(ds.ctx.Ctx, datasource.KubeConfig, datasource.Labels)
case "CloudWatch":
cli, err = provider.NewAWSCredentialCfg(datasource.AWSCloudWatch.Region, datasource.AWSCloudWatch.AccessKey, datasource.AWSCloudWatch.SecretKey)
cli, err = provider.NewAWSCredentialCfg(datasource.AWSCloudWatch.Region, datasource.AWSCloudWatch.AccessKey, datasource.AWSCloudWatch.SecretKey, datasource.Labels)
}

if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/community/aws/cloudwatch/service/rds.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (a awsRdsService) GetDBInstanceIdentifier(req interface{}) (interface{}, in
return nil, err
}

cfg, err := provider.NewAWSCredentialCfg(datasourceObj.AWSCloudWatch.Region, datasourceObj.AWSCloudWatch.AccessKey, datasourceObj.AWSCloudWatch.SecretKey)
cfg, err := provider.NewAWSCredentialCfg(datasourceObj.AWSCloudWatch.Region, datasourceObj.AWSCloudWatch.AccessKey, datasourceObj.AWSCloudWatch.SecretKey, datasourceObj.Labels)
if err != nil {
return nil, err
}
Expand All @@ -59,7 +59,7 @@ func (a awsRdsService) GetDBClusterIdentifier(req interface{}) (interface{}, int
return nil, err
}

cfg, err := provider.NewAWSCredentialCfg(datasourceObj.AWSCloudWatch.Region, datasourceObj.AWSCloudWatch.AccessKey, datasourceObj.AWSCloudWatch.SecretKey)
cfg, err := provider.NewAWSCredentialCfg(datasourceObj.AWSCloudWatch.Region, datasourceObj.AWSCloudWatch.AccessKey, datasourceObj.AWSCloudWatch.SecretKey, datasourceObj.Labels)
if err != nil {
return nil, err
}
Expand Down
12 changes: 9 additions & 3 deletions pkg/provider/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ import (
)

type AwsConfig struct {
cfg aws.Config
ExternalLabels map[string]interface{}
cfg aws.Config
}

func NewAWSCredentialCfg(region, ak, sk string) (AwsConfig, error) {
func NewAWSCredentialCfg(region, ak, sk string, labels map[string]interface{}) (AwsConfig, error) {
cfg, err := config.LoadDefaultConfig(context.Background(),
func(options *config.LoadOptions) error {
options.Region = region
Expand All @@ -31,7 +32,8 @@ func NewAWSCredentialCfg(region, ak, sk string) (AwsConfig, error) {
}

return AwsConfig{
cfg: cfg,
ExternalLabels: labels,
cfg: cfg,
}, nil
}

Expand All @@ -42,3 +44,7 @@ func (a AwsConfig) CloudWatchCli() *cloudwatch.Client {
func (a AwsConfig) RdsCli() *rds.Client {
return rds.NewFromConfig(a.cfg)
}

func (a AwsConfig) GetExternalLabels() map[string]interface{} {
return a.ExternalLabels
}
2 changes: 1 addition & 1 deletion pkg/provider/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func CheckDatasourceHealth(datasource models.AlertDataSource) bool {
check, err = vmClient.Check()
}
case "Kubernetes":
cli, err := NewKubernetesClient(context.Background(), datasource.KubeConfig)
cli, err := NewKubernetesClient(context.Background(), datasource.KubeConfig, datasource.Labels)
if err == nil {
_, err = cli.GetWarningEvent("", 1)
check = (err == nil)
Expand Down
16 changes: 11 additions & 5 deletions pkg/provider/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ import (
)

type KubernetesClient struct {
Cli *kubernetes.Clientset
Ctx context.Context
ExternalLabels map[string]interface{}
Cli *kubernetes.Clientset
Ctx context.Context
}

func NewKubernetesClient(ctx context.Context, kubeConfigContent string) (KubernetesClient, error) {
func NewKubernetesClient(ctx context.Context, kubeConfigContent string, labels map[string]interface{}) (KubernetesClient, error) {
// 如果配置内容为空,则去默认目录下取配置文件的内容
if kubeConfigContent == "" {
kubeConfigContent = os.Getenv("HOME") + "/.kube/config"
Expand Down Expand Up @@ -47,8 +48,9 @@ func NewKubernetesClient(ctx context.Context, kubeConfigContent string) (Kuberne
}

return KubernetesClient{
Cli: cs,
Ctx: ctx,
Cli: cs,
Ctx: ctx,
ExternalLabels: labels,
}, nil
}

Expand Down Expand Up @@ -85,3 +87,7 @@ func (a KubernetesClient) GetWarningEvent(reason string, scope int) (*corev1.Eve

return &warningEvents, nil
}

func (a KubernetesClient) GetExternalLabels() map[string]interface{} {
return a.ExternalLabels
}
1 change: 1 addition & 0 deletions pkg/provider/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const (
type LogsFactoryProvider interface {
Query(options LogQueryOptions) ([]Logs, int, error)
Check() (bool, error)
GetExternalLabels() map[string]interface{}
}

type LogQueryOptions struct {
Expand Down
12 changes: 10 additions & 2 deletions pkg/provider/logs_alicloud_sls.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import (
)

type AliCloudSlsDsProvider struct {
client *sls20201230.Client
client *sls20201230.Client
ExternalLabels map[string]interface{}
}

func NewAliCloudSlsClient(source models.AlertDataSource) (LogsFactoryProvider, error) {
Expand All @@ -24,7 +25,10 @@ func NewAliCloudSlsClient(source models.AlertDataSource) (LogsFactoryProvider, e
return AliCloudSlsDsProvider{}, err
}

return AliCloudSlsDsProvider{client: result}, nil
return AliCloudSlsDsProvider{
client: result,
ExternalLabels: source.Labels,
}, nil
}

func (a AliCloudSlsDsProvider) Query(query LogQueryOptions) ([]Logs, int, error) {
Expand Down Expand Up @@ -73,3 +77,7 @@ func (a AliCloudSlsDsProvider) Check() (bool, error) {

return true, nil
}

func (a AliCloudSlsDsProvider) GetExternalLabels() map[string]interface{} {
return a.ExternalLabels
}
14 changes: 10 additions & 4 deletions pkg/provider/logs_elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ import (
)

type ElasticSearchDsProvider struct {
cli *elastic.Client
url string
cli *elastic.Client
url string
ExternalLabels map[string]interface{}
}

func NewElasticSearchClient(ctx context.Context, ds models.AlertDataSource) (LogsFactoryProvider, error) {
Expand All @@ -24,8 +25,9 @@ func NewElasticSearchClient(ctx context.Context, ds models.AlertDataSource) (Log
}

return ElasticSearchDsProvider{
client,
ds.ElasticSearch.Url,
cli: client,
url: ds.ElasticSearch.Url,
ExternalLabels: ds.Labels,
}, nil
}

Expand Down Expand Up @@ -95,3 +97,7 @@ func (e ElasticSearchDsProvider) Check() (bool, error) {
}
return true, nil
}

func (e ElasticSearchDsProvider) GetExternalLabels() map[string]interface{} {
return e.ExternalLabels
}
Loading

0 comments on commit 6daf11b

Please sign in to comment.