Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(general): Used jsonpath to update vertex attributes #6852

Merged
merged 16 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions checkov/cloudformation/graph_builder/graph_components/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,15 @@ def _should_add_previous_breadcrumbs(change_origin_id: Optional[int],
@staticmethod
def _should_set_changed_attributes(change_origin_id: Optional[int], attribute_at_dest: Optional[str]) -> bool:
return change_origin_id is not None and attribute_at_dest is not None

def _handle_unique_key_characters(self, key: str) -> str:
# `::` is not a valid jsonpath character, but cloudformation have multiple functions like `Fn::If` which use it,
# so we solve it with escaping using parenthesis
key_parts = key.split(".")
updated_key = ""
for part in key_parts:
if part.startswith("Fn::"):
updated_key += f'"{part}"'
else:
updated_key += part
return updated_key
Original file line number Diff line number Diff line change
Expand Up @@ -384,13 +384,17 @@ def _evaluate_if_connection(
# The operand is a simple string
evaluated_value = operand_to_eval
evaluated_value_hierarchy = str(operand_index)
elif isinstance(operand_to_eval, dict) and ConditionFunctions.IF in operand_to_eval:
# The operand is {'Fn::If': new value to evaluate}
condition_to_eval = operand_to_eval[ConditionFunctions.IF]
if isinstance(condition_to_eval, list) and isinstance(condition_to_eval[0], str):
condition_vertex_attributes = self._fetch_vertex_attributes(condition_to_eval[0], BlockType.CONDITIONS)
evaluated_value, evaluated_value_operand_index = self._evaluate_if_connection(condition_to_eval, condition_vertex_attributes)
evaluated_value_hierarchy = f'{operand_index}.{ConditionFunctions.IF}.{evaluated_value_operand_index}'
elif isinstance(operand_to_eval, dict):
if ConditionFunctions.IF in operand_to_eval:
# The operand is {'Fn::If': new value to evaluate}
condition_to_eval = operand_to_eval[ConditionFunctions.IF]
if isinstance(condition_to_eval, list) and isinstance(condition_to_eval[0], str):
condition_vertex_attributes = self._fetch_vertex_attributes(condition_to_eval[0], BlockType.CONDITIONS)
evaluated_value, evaluated_value_operand_index = self._evaluate_if_connection(condition_to_eval, condition_vertex_attributes)
evaluated_value_hierarchy = f'{operand_index}.{ConditionFunctions.IF}.{evaluated_value_operand_index}'
elif not any([op for op in self.CONDITIONS_EVALUATED_FUNCTIONS if op in operand_to_eval]):
# The operand is a dict without any further actions to perform
evaluated_value = operand_to_eval

return evaluated_value, evaluated_value_hierarchy

Expand Down
51 changes: 50 additions & 1 deletion checkov/common/graph/graph_builder/graph_components/blocks.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

import logging
import typing
from collections.abc import Collection
from typing import Union, Dict, Any, List, cast

Expand All @@ -8,6 +10,11 @@
from checkov.common.graph.graph_builder.variable_rendering.breadcrumb_metadata import BreadcrumbMetadata
from checkov.common.util.data_structures_utils import pickle_deepcopy

from bc_jsonpath_ng.ext import parse

if typing.TYPE_CHECKING:
from bc_jsonpath_ng import JSONPath


class Block:
__slots__ = (
Expand All @@ -25,6 +32,8 @@ class Block:
"foreach_attrs"
)

jsonpath_parsed_statement_cache: "dict[str, JSONPath]" = {} # noqa: CCE003 # global cache

def __init__(
self,
name: str,
Expand Down Expand Up @@ -158,7 +167,16 @@ def update_attribute(
key = join_trimmed_strings(char_to_join=".", str_lst=attribute_key_parts, num_to_trim=i)
if key.find(".") > -1:
additional_changed_attributes = self.extract_additional_changed_attributes(key)
self.attributes[key] = attribute_value
if key in self.attributes and isinstance(self.attributes[key], dict) and key != attribute_key:
try:
self._update_attribute_based_on_jsonpath_key(attribute_value, key)
except Exception as e:
logging.warning(f"Failed updating attribute for key: {key} and value {attribute_value} for"
f"vertex attributes {self.attributes}. Falling back to explicitly setting it."
f"Exception - {e}")
self.attributes[key] = attribute_value
else:
self.attributes[key] = attribute_value
end_key_part = attribute_key_parts[len(attribute_key_parts) - 1 - i]
if transform_step and end_key_part in ("1", "2"):
# if condition logic during the transform step breaks the values
Expand All @@ -170,6 +188,37 @@ def update_attribute(
for changed_attribute in additional_changed_attributes:
self.changed_attributes[changed_attribute] = previous_breadcrumbs

def _update_attribute_based_on_jsonpath_key(self, attribute_value: Any, key: str) -> None:
"""
When updating all the attributes we might try to update a specific attribute inside a complex object,
so we use jsonpath to refer to the specific location only.
"""
if key not in Block.jsonpath_parsed_statement_cache:
jsonpath_key = self._get_jsonpath_key(key)
expr = parse(jsonpath_key)
Block.jsonpath_parsed_statement_cache[key] = expr
else:
expr = Block.jsonpath_parsed_statement_cache[key]
match = expr.find(self.attributes)
if match:
match[0].value = attribute_value
return None

def _get_jsonpath_key(self, key: str) -> str:
key = self._handle_unique_key_characters(key)
# Replace .0 with [0] to match jsonpath style
jsonpath_key = "$."
key_parts = key.split(".")
for part in key_parts:
if part.isnumeric():
jsonpath_key += f"[{part}]"
else:
jsonpath_key += part
return jsonpath_key

def _handle_unique_key_characters(self, key: str) -> str:
return key

def update_inner_attribute(
self, attribute_key: str, nested_attributes: list[Any] | dict[str, Any], value_to_update: Any
) -> None:
Expand Down
2 changes: 1 addition & 1 deletion performance_tests/test_checkov_performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
'repo_name': 'terraform-aws-components',
'threshold': {
"Darwin": 19.0,
"Linux": 10.0,
"Linux": 11.0,
"Windows": 15.0,
}
},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
metadata:
id: "complex_jsonpath_if_condition"
name: "cfn-if"
severity: "high"
guidelines: "MGUIFDE"
category: "general"
scope:
provider: "aws"
definition:
and:
- cond_type: "attribute"
resource_types:
- "AWS::ECS::TaskDefinition"
attribute: "ContainerDefinitions[?(!@.LogConfiguration)]"
operator: "jsonpath_not_exists"
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
AWSTemplateFormatVersion: "2010-09-09"
Description: This template deploys microservice to a ECS cluster with Fargate.
Parameters:
AppName:
Description: Tech stack name to use when constructing resource names
Type: String

ApplicationIAMRole:
Description: "IAM Role for the application"
Type: String

CloudformationBucket:
Type: String

DockerImage:
Description: Placeholder image for the primeval task definition
Type: String
Default: moodysanalytics.jfrog.io/

env:
Type: String
Default: dev

version:
Type: String

AWSSSMAgentDockerImage:
Description: Image for AWS Amazon SSM Agent
Type: String
Default: "amazon-ssm-agent"

ApplicationPort:
Type: String
Default: "80"

TaskDefMemory:
Type: String
Default: 2048

TaskDefCPU:
Type: String
Default: 1024

Conditions:
IsTrueCondition: !And
- !Equals [ !Ref AWSSSMAgentDockerImage, "amazon-ssm-agent" ]
- !Equals [ !Ref ApplicationPort, "80" ]
IsFalseCondition: !And
- !Equals [ !Ref AWSSSMAgentDockerImage, "amazon-ssm-agent" ]
- !Equals [ !Ref ApplicationPort, "81" ]

Resources:
PassingExample:
Type: AWS::ECS::TaskDefinition
Properties:
Family: !Ref AppName
ContainerDefinitions:
- !If
- IsTrueCondition
- Name: amazon-ssm-agent
Image: !Ref AWSSSMAgentDockerImage
Essential: true
Cpu: 0
EntryPoint: [ ]
Command: [
"/bin/bash",
"-c",
"echo hello"
]
Environment:
- Name: MANAGED_INSTANCE_ROLE_NAME
Value: lobadmin-managed-fis-ssm-instance
LogConfiguration:
LogDriver: awslogs
Options:
awslogs-group: !Ref SSMLogGroup
awslogs-region: !Ref AWS::Region
awslogs-stream-prefix: !Ref AppName
- !Ref "AWS::NoValue"
Cpu: !Ref TaskDefCPU
Memory: !Ref TaskDefMemory
ExecutionRoleArn: !Sub arn:aws:iam::${AWS::AccountId}:role/${ApplicationIAMRole}
TaskRoleArn: !Sub arn:aws:iam::${AWS::AccountId}:role/lobadmin-managed-ecs-instance-default
NetworkMode: awsvpc
RequiresCompatibilities:
- FARGATE
Tags:
- Key: application_name
Value: !Ref AppName

FailingExample:
Type: AWS::ECS::TaskDefinition
Properties:
Family: !Ref AppName
ContainerDefinitions:
- !If
- IsFalseCondition
- Name: amazon-ssm-agent
Image: !Ref AWSSSMAgentDockerImage
Essential: true
Cpu: 0
EntryPoint: [ ]
Command: [
"/bin/bash",
"-c",
"echo hello"
]
Environment:
- Name: MANAGED_INSTANCE_ROLE_NAME
Value: lobadmin-managed-fis-ssm-instance
LogConfiguration:
LogDriver: awslogs
Options:
awslogs-group: !Ref SSMLogGroup
awslogs-region: !Ref AWS::Region
awslogs-stream-prefix: !Ref AppName
- !Ref "AWS::NoValue"
Cpu: !Ref TaskDefCPU
Memory: !Ref TaskDefMemory
ExecutionRoleArn: !Sub arn:aws:iam::${AWS::AccountId}:role/${ApplicationIAMRole}
TaskRoleArn: !Sub arn:aws:iam::${AWS::AccountId}:role/lobadmin-managed-ecs-instance-default
NetworkMode: awsvpc
RequiresCompatibilities:
- FARGATE
Tags:
- Key: application_name
Value: !Ref AppName
Original file line number Diff line number Diff line change
Expand Up @@ -87,5 +87,45 @@ def test_jsonpath_policy(self):
self.assertEqual(failing_resources, failed_check_resources)


def test_complex_jsonpath_if_condition(self):
test_dir_path = Path(__file__).parent / "resources" / "complex_jsonpath_if_condition"
check_dir = Path(__file__).parent / "external_graph_checks"

test_check_registry = Registry(
checks_dir=f'{check_dir}',
parser=GraphCheckParser()
)

# when
report = Runner(
external_registries=[test_check_registry]).\
run(root_folder=str(test_dir_path),
runner_filter=RunnerFilter(checks=["complex_jsonpath_if_condition"])
)

# then
summary = report.get_summary()

passing_resources = {
"AWS::ECS::TaskDefinition.PassingExample",
}

failing_resources = {
"AWS::ECS::TaskDefinition.FailingExample"
}

passed_check_resources = {c.resource for c in report.passed_checks}
failed_check_resources = {c.resource for c in report.failed_checks}


self.assertEqual(summary["passed"], 1)
self.assertEqual(summary["failed"], 1)
self.assertEqual(summary["skipped"], 0)
self.assertEqual(summary["parsing_errors"], 0)

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)


if __name__ == '__main__':
unittest.main()
Loading