Skip to content

Commit

Permalink
refact: alerts query (#3216)
Browse files Browse the repository at this point in the history
* refact alerts: log messages

* refact: AlertPredicatesFromFilter
  • Loading branch information
mmetc authored Sep 18, 2024
1 parent b93b240 commit 5196932
Show file tree
Hide file tree
Showing 2 changed files with 170 additions and 144 deletions.
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ linters-settings:
disabled: true
- name: cyclomatic
# lower this after refactoring
arguments: [41]
arguments: [39]
- name: defer
disabled: true
- name: empty-block
Expand Down
312 changes: 169 additions & 143 deletions pkg/database/alerts.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,14 +456,14 @@ func (c *Client) createAlertChunk(machineID string, owner *ent.Machine, alerts [

startAtTime, err := time.Parse(time.RFC3339, *alertItem.StartAt)
if err != nil {
c.Log.Errorf("CreateAlertBulk: Failed to parse startAtTime '%s', defaulting to now: %s", *alertItem.StartAt, err)
c.Log.Errorf("creating alert: Failed to parse startAtTime '%s', defaulting to now: %s", *alertItem.StartAt, err)

startAtTime = time.Now().UTC()
}

stopAtTime, err := time.Parse(time.RFC3339, *alertItem.StopAt)
if err != nil {
c.Log.Errorf("CreateAlertBulk: Failed to parse stopAtTime '%s', defaulting to now: %s", *alertItem.StopAt, err)
c.Log.Errorf("creating alert: Failed to parse stopAtTime '%s', defaulting to now: %s", *alertItem.StopAt, err)

stopAtTime = time.Now().UTC()
}
Expand All @@ -483,7 +483,7 @@ func (c *Client) createAlertChunk(machineID string, owner *ent.Machine, alerts [
for i, eventItem := range alertItem.Events {
ts, err := time.Parse(time.RFC3339, *eventItem.Timestamp)
if err != nil {
c.Log.Errorf("CreateAlertBulk: Failed to parse event timestamp '%s', defaulting to now: %s", *eventItem.Timestamp, err)
c.Log.Errorf("creating alert: Failed to parse event timestamp '%s', defaulting to now: %s", *eventItem.Timestamp, err)

ts = time.Now().UTC()
}
Expand Down Expand Up @@ -694,7 +694,7 @@ func (c *Client) CreateAlert(machineID string, alertList []*models.Alert) ([]str
return nil, fmt.Errorf("machine '%s': %w", machineID, err)
}

c.Log.Debugf("CreateAlertBulk: Machine Id %s doesn't exist", machineID)
c.Log.Debugf("creating alert: machine %s doesn't exist", machineID)

owner = nil
}
Expand Down Expand Up @@ -724,6 +724,160 @@ func (c *Client) CreateAlert(machineID string, alertList []*models.Alert) ([]str
return alertIDs, nil
}

func handleSimulatedFilter(filter map[string][]string, predicates *[]predicate.Alert) {
/* the simulated filter is a bit different : if it's not present *or* set to false, specifically exclude records with simulated to true */
if v, ok := filter["simulated"]; ok && v[0] == "false" {
*predicates = append(*predicates, alert.SimulatedEQ(false))
}
}

func handleOriginFilter(filter map[string][]string, predicates *[]predicate.Alert) {
if _, ok := filter["origin"]; ok {
filter["include_capi"] = []string{"true"}
}
}

func handleScopeFilter(scope string, predicates *[]predicate.Alert) {
if strings.ToLower(scope) == "ip" {
scope = types.Ip
} else if strings.ToLower(scope) == "range" {
scope = types.Range
}

*predicates = append(*predicates, alert.SourceScopeEQ(scope))
}

func handleTimeFilters(param, value string, predicates *[]predicate.Alert) error {
duration, err := ParseDuration(value)
if err != nil {
return fmt.Errorf("while parsing duration: %w", err)
}

timePoint := time.Now().UTC().Add(-duration)
if timePoint.IsZero() {
return fmt.Errorf("empty time now() - %s", timePoint.String())
}

switch param {
case "since":
*predicates = append(*predicates, alert.StartedAtGTE(timePoint))
case "created_before":
*predicates = append(*predicates, alert.CreatedAtLTE(timePoint))
case "until":
*predicates = append(*predicates, alert.StartedAtLTE(timePoint))
}

return nil
}

func handleIPv4Predicates(ip_sz int, contains bool, start_ip, start_sfx, end_ip, end_sfx int64, predicates *[]predicate.Alert) {
if contains { // decision contains {start_ip,end_ip}
*predicates = append(*predicates, alert.And(
alert.HasDecisionsWith(decision.StartIPLTE(start_ip)),
alert.HasDecisionsWith(decision.EndIPGTE(end_ip)),
alert.HasDecisionsWith(decision.IPSizeEQ(int64(ip_sz))),
))
} else { // decision is contained within {start_ip,end_ip}
*predicates = append(*predicates, alert.And(
alert.HasDecisionsWith(decision.StartIPGTE(start_ip)),
alert.HasDecisionsWith(decision.EndIPLTE(end_ip)),
alert.HasDecisionsWith(decision.IPSizeEQ(int64(ip_sz))),
))
}
}

func handleIPv6Predicates(ip_sz int, contains bool, start_ip, start_sfx, end_ip, end_sfx int64, predicates *[]predicate.Alert) {
if contains { // decision contains {start_ip,end_ip}
*predicates = append(*predicates, alert.And(
// matching addr size
alert.HasDecisionsWith(decision.IPSizeEQ(int64(ip_sz))),
alert.Or(
// decision.start_ip < query.start_ip
alert.HasDecisionsWith(decision.StartIPLT(start_ip)),
alert.And(
// decision.start_ip == query.start_ip
alert.HasDecisionsWith(decision.StartIPEQ(start_ip)),
// decision.start_suffix <= query.start_suffix
alert.HasDecisionsWith(decision.StartSuffixLTE(start_sfx)),
),
),
alert.Or(
// decision.end_ip > query.end_ip
alert.HasDecisionsWith(decision.EndIPGT(end_ip)),
alert.And(
// decision.end_ip == query.end_ip
alert.HasDecisionsWith(decision.EndIPEQ(end_ip)),
// decision.end_suffix >= query.end_suffix
alert.HasDecisionsWith(decision.EndSuffixGTE(end_sfx)),
),
),
))
} else { // decision is contained within {start_ip,end_ip}
*predicates = append(*predicates, alert.And(
// matching addr size
alert.HasDecisionsWith(decision.IPSizeEQ(int64(ip_sz))),
alert.Or(
// decision.start_ip > query.start_ip
alert.HasDecisionsWith(decision.StartIPGT(start_ip)),
alert.And(
// decision.start_ip == query.start_ip
alert.HasDecisionsWith(decision.StartIPEQ(start_ip)),
// decision.start_suffix >= query.start_suffix
alert.HasDecisionsWith(decision.StartSuffixGTE(start_sfx)),
),
),
alert.Or(
// decision.end_ip < query.end_ip
alert.HasDecisionsWith(decision.EndIPLT(end_ip)),
alert.And(
// decision.end_ip == query.end_ip
alert.HasDecisionsWith(decision.EndIPEQ(end_ip)),
// decision.end_suffix <= query.end_suffix
alert.HasDecisionsWith(decision.EndSuffixLTE(end_sfx)),
),
),
))
}
}

func handleIPPredicates(ip_sz int, contains bool, start_ip, start_sfx, end_ip, end_sfx int64, predicates *[]predicate.Alert) error {
if ip_sz == 4 {
handleIPv4Predicates(ip_sz, contains, start_ip, start_sfx, end_ip, end_sfx, predicates)
} else if ip_sz == 16 {
handleIPv6Predicates(ip_sz, contains, start_ip, start_sfx, end_ip, end_sfx, predicates)
} else if ip_sz != 0 {
return errors.Wrapf(InvalidFilter, "Unknown ip size %d", ip_sz)
}

return nil
}

func handleIncludeCapiFilter(value string, predicates *[]predicate.Alert) error {
if value == "false" {
*predicates = append(*predicates, alert.And(
// do not show alerts with active decisions having origin CAPI or lists
alert.And(
alert.Not(alert.HasDecisionsWith(decision.OriginEQ(types.CAPIOrigin))),
alert.Not(alert.HasDecisionsWith(decision.OriginEQ(types.ListOrigin))),
),
alert.Not(
alert.And(
// do not show neither alerts with no decisions if the Source Scope is lists: or CAPI
alert.Not(alert.HasDecisions()),
alert.Or(
alert.SourceScopeHasPrefix(types.ListOrigin+":"),
alert.SourceScopeEQ(types.CommunityBlocklistPullSourceScope),
),
),
),
))
} else if value != "true" {
log.Errorf("invalid bool '%s' for include_capi", value)
}

return nil
}

func AlertPredicatesFromFilter(filter map[string][]string) ([]predicate.Alert, error) {
predicates := make([]predicate.Alert, 0)

Expand All @@ -739,16 +893,8 @@ func AlertPredicatesFromFilter(filter map[string][]string) ([]predicate.Alert, e
/*if contains is true, return bans that *contains* the given value (value is the inner)
else, return bans that are *contained* by the given value (value is the outer)*/

/*the simulated filter is a bit different : if it's not present *or* set to false, specifically exclude records with simulated to true */
if v, ok := filter["simulated"]; ok {
if v[0] == "false" {
predicates = append(predicates, alert.SimulatedEQ(false))
}
}

if _, ok := filter["origin"]; ok {
filter["include_capi"] = []string{"true"}
}
handleSimulatedFilter(filter, &predicates)
handleOriginFilter(filter, &predicates)

for param, value := range filter {
switch param {
Expand All @@ -758,14 +904,7 @@ func AlertPredicatesFromFilter(filter map[string][]string) ([]predicate.Alert, e
return nil, errors.Wrapf(InvalidFilter, "invalid contains value : %s", err)
}
case "scope":
scope := value[0]
if strings.ToLower(scope) == "ip" {
scope = types.Ip
} else if strings.ToLower(scope) == "range" {
scope = types.Range
}

predicates = append(predicates, alert.SourceScopeEQ(scope))
handleScopeFilter(value[0], &predicates)
case "value":
predicates = append(predicates, alert.SourceValueEQ(value[0]))
case "scenario":
Expand All @@ -775,68 +914,18 @@ func AlertPredicatesFromFilter(filter map[string][]string) ([]predicate.Alert, e
if err != nil {
return nil, errors.Wrapf(InvalidIPOrRange, "unable to convert '%s' to int: %s", value[0], err)
}
case "since":
duration, err := ParseDuration(value[0])
if err != nil {
return nil, fmt.Errorf("while parsing duration: %w", err)
}

since := time.Now().UTC().Add(-duration)
if since.IsZero() {
return nil, fmt.Errorf("empty time now() - %s", since.String())
}

predicates = append(predicates, alert.StartedAtGTE(since))
case "created_before":
duration, err := ParseDuration(value[0])
if err != nil {
return nil, fmt.Errorf("while parsing duration: %w", err)
}
case "since", "created_before", "until":
if err := handleTimeFilters(param, value[0], &predicates); err != nil {
return nil, err

since := time.Now().UTC().Add(-duration)
if since.IsZero() {
return nil, fmt.Errorf("empty time now() - %s", since.String())
}

predicates = append(predicates, alert.CreatedAtLTE(since))
case "until":
duration, err := ParseDuration(value[0])
if err != nil {
return nil, fmt.Errorf("while parsing duration: %w", err)
}

until := time.Now().UTC().Add(-duration)
if until.IsZero() {
return nil, fmt.Errorf("empty time now() - %s", until.String())
}

predicates = append(predicates, alert.StartedAtLTE(until))
case "decision_type":
predicates = append(predicates, alert.HasDecisionsWith(decision.TypeEQ(value[0])))
case "origin":
predicates = append(predicates, alert.HasDecisionsWith(decision.OriginEQ(value[0])))
case "include_capi": // allows to exclude one or more specific origins
if value[0] == "false" {
predicates = append(predicates, alert.And(
// do not show alerts with active decisions having origin CAPI or lists
alert.And(
alert.Not(alert.HasDecisionsWith(decision.OriginEQ(types.CAPIOrigin))),
alert.Not(alert.HasDecisionsWith(decision.OriginEQ(types.ListOrigin))),
),
alert.Not(
alert.And(
// do not show neither alerts with no decisions if the Source Scope is lists: or CAPI
alert.Not(alert.HasDecisions()),
alert.Or(
alert.SourceScopeHasPrefix(types.ListOrigin+":"),
alert.SourceScopeEQ(types.CommunityBlocklistPullSourceScope),
),
),
),
),
)
} else if value[0] != "true" {
log.Errorf("Invalid bool '%s' for include_capi", value[0])
if err = handleIncludeCapiFilter(value[0], &predicates); err != nil {
return nil, err
}
case "has_active_decision":
if hasActiveDecision, err = strconv.ParseBool(value[0]); err != nil {
Expand All @@ -861,72 +950,9 @@ func AlertPredicatesFromFilter(filter map[string][]string) ([]predicate.Alert, e
}
}

if ip_sz == 4 {
if contains { /*decision contains {start_ip,end_ip}*/
predicates = append(predicates, alert.And(
alert.HasDecisionsWith(decision.StartIPLTE(start_ip)),
alert.HasDecisionsWith(decision.EndIPGTE(end_ip)),
alert.HasDecisionsWith(decision.IPSizeEQ(int64(ip_sz))),
))
} else { /*decision is contained within {start_ip,end_ip}*/
predicates = append(predicates, alert.And(
alert.HasDecisionsWith(decision.StartIPGTE(start_ip)),
alert.HasDecisionsWith(decision.EndIPLTE(end_ip)),
alert.HasDecisionsWith(decision.IPSizeEQ(int64(ip_sz))),
))
}
} else if ip_sz == 16 {
if contains { /*decision contains {start_ip,end_ip}*/
predicates = append(predicates, alert.And(
// matching addr size
alert.HasDecisionsWith(decision.IPSizeEQ(int64(ip_sz))),
alert.Or(
// decision.start_ip < query.start_ip
alert.HasDecisionsWith(decision.StartIPLT(start_ip)),
alert.And(
// decision.start_ip == query.start_ip
alert.HasDecisionsWith(decision.StartIPEQ(start_ip)),
// decision.start_suffix <= query.start_suffix
alert.HasDecisionsWith(decision.StartSuffixLTE(start_sfx)),
)),
alert.Or(
// decision.end_ip > query.end_ip
alert.HasDecisionsWith(decision.EndIPGT(end_ip)),
alert.And(
// decision.end_ip == query.end_ip
alert.HasDecisionsWith(decision.EndIPEQ(end_ip)),
// decision.end_suffix >= query.end_suffix
alert.HasDecisionsWith(decision.EndSuffixGTE(end_sfx)),
),
),
))
} else { /*decision is contained within {start_ip,end_ip}*/
predicates = append(predicates, alert.And(
// matching addr size
alert.HasDecisionsWith(decision.IPSizeEQ(int64(ip_sz))),
alert.Or(
// decision.start_ip > query.start_ip
alert.HasDecisionsWith(decision.StartIPGT(start_ip)),
alert.And(
// decision.start_ip == query.start_ip
alert.HasDecisionsWith(decision.StartIPEQ(start_ip)),
// decision.start_suffix >= query.start_suffix
alert.HasDecisionsWith(decision.StartSuffixGTE(start_sfx)),
)),
alert.Or(
// decision.end_ip < query.end_ip
alert.HasDecisionsWith(decision.EndIPLT(end_ip)),
alert.And(
// decision.end_ip == query.end_ip
alert.HasDecisionsWith(decision.EndIPEQ(end_ip)),
// decision.end_suffix <= query.end_suffix
alert.HasDecisionsWith(decision.EndSuffixLTE(end_sfx)),
),
),
))
}
} else if ip_sz != 0 {
return nil, errors.Wrapf(InvalidFilter, "Unknown ip size %d", ip_sz)
if err := handleIPPredicates(ip_sz, contains, start_ip, start_sfx, end_ip, end_sfx, &predicates); err != nil {
return nil, err

}

return predicates, nil
Expand Down

0 comments on commit 5196932

Please sign in to comment.