Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cloudformation): Support Fn::Sub in cases of using a pseudo parameter #6835

Merged
merged 2 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
from __future__ import annotations

import json
import logging
from typing import TYPE_CHECKING, Tuple, List, Any, Dict, Optional, Callable, TypedDict

from checkov.cloudformation.graph_builder.graph_components.block_types import BlockType
from checkov.cloudformation.graph_builder.utils import get_referenced_vertices_in_value, find_all_interpolations
from checkov.cloudformation.graph_builder.variable_rendering.vertex_reference import VertexReference
from checkov.cloudformation.parser.cfn_keywords import IntrinsicFunctions, ConditionFunctions
from checkov.cloudformation.parser.cfn_keywords import IntrinsicFunctions, ConditionFunctions, PseudoParameters
from checkov.common.graph.graph_builder import Edge, CustomAttributes
from checkov.common.graph.graph_builder.graph_components.blocks import Block
from checkov.common.graph.graph_builder.variable_rendering.renderer import VariableRenderer
from checkov.common.parsers.node import StrNode
from checkov.common.util.data_structures_utils import pickle_deepcopy

if TYPE_CHECKING:
Expand Down Expand Up @@ -510,5 +512,32 @@ def _evaluate_cfn_function(
return evaluated_value, changed_origin_id, attribute_at_dest

def evaluate_non_rendered_values(self) -> None:
# not used
pass

for vertex in self.local_graph.vertices:
vertex_attributes = pickle_deepcopy(vertex.attributes)
for attr_key, attr_value in vertex_attributes.items():
self._handle_sub_with_pseudo_param(attr_key, attr_value, vertex)

@staticmethod
def _handle_sub_with_pseudo_param(attr_key: str, attr_value: Any, vertex: CloudformationBlock) -> None:
"""
Pseudo Parameter in CFN is a parameter which is dynamically available (see reference).
As we do not render it on buildtime, we want to handle this case by keeping the reference itself without the
value, so we can at least build a semi-full resource.
https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/pseudo-parameter-reference.html
"""
if isinstance(attr_value, dict) and IntrinsicFunctions.SUB in attr_value:
inner_value = attr_value[IntrinsicFunctions.SUB]
is_pseudo_param_in_value = any([p.value for p in PseudoParameters if p.value in inner_value])
if isinstance(inner_value, (str, StrNode)):
try:
inner_value = json.loads(inner_value)
except Exception as e:
logging.warning(f"[Cloudformation_evaluate_non_rendered_values]- "
f"Inner_value - {inner_value} is not a valid json. "
f"Full exception - {str(e)}")
if is_pseudo_param_in_value:
vertex.update_attribute(
attribute_key=attr_key, attribute_value=inner_value, change_origin_id=None,
previous_breadcrumbs=[], attribute_at_dest=None
)
11 changes: 11 additions & 0 deletions checkov/cloudformation/parser/cfn_keywords.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,14 @@ class TemplateSections(str, Enum):
TRANSFORM = "Transform"
OUTPUTS = "Outputs"
GLOBALS = "Globals"


class PseudoParameters(Enum):
ACCOUNT_ID = "AWS::AccountId"
NOTIFICATION_ARNS = "AWS::NotificationARNs"
NO_VALUE = "AWS::NoValue"
PARTITION = "AWS::Partition"
REGION = "AWS::Region"
STACK_ID = "AWS::StackId"
STACK_NAME = "AWS::StackName"
URL_SUFFIX = "AWS::URLSuffix"
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"Type": "AWS::RDS::DBInstance",
"Properties": {
"DBName": {
"Fn::Sub": "rds-${AWS::AccountId}-${CompanyName}-${Environment}"
"Fn::Sub": "rds-${CompanyName}-${Environment}"
},
"DBInstanceClass": "db.m4.large"
}
Expand Down Expand Up @@ -52,7 +52,7 @@
"DefaultDBName": {
"Description": "DefaultDB Name",
"Value": {
"Fn::Sub": "rds-${AWS::AccountId}-${CompanyName}-${Environment}"
"Fn::Sub": "rds-${CompanyName}-${Environment}"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ Resources:
DefaultDB:
Type: AWS::RDS::DBInstance
Properties:
DBName: !Sub "rds-${AWS::AccountId}-${CompanyName}-${Environment}"
DBName: !Sub "rds-${CompanyName}-${Environment}"
DBInstanceClass: "db.m4.large"
Outputs:
DBEndpoint:
Expand All @@ -31,5 +31,5 @@ Outputs:
Value: !Sub ${WebVPC.CidrBlockAssociations}
DefaultDBName:
Description: DefaultDB Name
Value: !Sub "rds-${AWS::AccountId}-${CompanyName}-${Environment}"
Value: !Sub "rds-${CompanyName}-${Environment}"

4 changes: 2 additions & 2 deletions tests/cloudformation/graph/graph_builder/test_render.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,12 @@ def validate_render_sub(self, test_dir: str, file_ext: str):
environment_expected_attributes = {'Default': None}
# Resources
web_vpc_expected_attributes = {'CidrBlock': web_vpc_expected_cidr_block}
default_db_expected_attributes = {'DBName': {'Fn::Sub': 'rds-${AWS::AccountId}-${CompanyName}-${Environment}'}}
default_db_expected_attributes = {'DBName': {'Fn::Sub': 'rds-${CompanyName}-${Environment}'}}
# Outputs
db_endpoint_sg_expected_attributes = {'Value.Fn::Sub': "${DefaultDB.Endpoint.Address}:${DefaultDB.Endpoint.Port}"}
web_vpc_cidr_block_expected_attributes = {'Value': web_vpc_expected_cidr_block}
cidr_block_associations_expected_attributes = {'Value.Fn::Sub': "${WebVPC.CidrBlockAssociations}"}
default_db_name_expected_attributes = {'Value': {'Fn::Sub': 'rds-${AWS::AccountId}-${CompanyName}-${Environment}'}}
default_db_name_expected_attributes = {'Value': {'Fn::Sub': 'rds-${CompanyName}-${Environment}'}}

self.compare_vertex_attributes(local_graph, company_name_expected_attributes, BlockType.PARAMETERS, 'CompanyName')
self.compare_vertex_attributes(local_graph, environment_expected_attributes, BlockType.PARAMETERS, 'Environment')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
metadata:
id: "CKV2_CFN_JSONPATH_POLICY"
name: "Jsonpath policy for cloudformation"
severity: "high"
guidelines: "Mediastore container and objects must not be accessible anonymously"
category: "general"
scope:
provider: "aws"
definition:
and:
- cond_type: "attribute"
resource_types:
- "AWS::MediaStore::Container"
attribute: "Policy.Statement[?(@.Effect == 'Allow' & @.Principal == '*')]"
operator: "jsonpath_not_exists"
- cond_type: "attribute"
resource_types:
- "AWS::MediaStore::Container"
attribute: "Policy.Statement[?(@.Effect == 'Allow')].Principal.AWS[*]"
operator: "jsonpath_not_equals"
value: "*"
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{
"AWSTemplateFormatVersion": "2010-09-09",
"Resources": {
"fail-dict": {
"Type": "AWS::MediaStore::Container",
"Properties": {
"ContainerName": "fail-dict",
"Policy": {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "MediaStoredenyAccess",
"Effect": "Allow",
"Action": [
"mediastore:GetObject",
"mediastore:DeleteObject",
"mediastore:DescribeObject",
"mediastore:ListItems"
],
"Principal": "*",
"Resource": "arn:aws:mediastore:${AWS::Region}:${AWS::AccountId}:container/compmediastorecontainer/*",
"Condition": {
"Bool": {
"aws:SecureTransport": "false"
}
}
}
]
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"AWSTemplateFormatVersion": "2010-09-09",
"Resources": {
"fail-str": {
"Type": "AWS::MediaStore::Container",
"Properties": {
"ContainerName": "fail-str",
"Policy": {
"Fn::Sub": "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Sid\":\"MediaStoredenyAccess\",\"Effect\":\"Allow\",\"Action\":[\"mediastore:GetObject\",\"mediastore:DeleteObject\",\"mediastore:DescribeObject\",\"mediastore:ListItems\"],\"Principal\":\"*\",\"Resource\":\"arn:aws:mediastore:${AWS::Region}:${AWS::AccountId}:container/compmediastorecontainer/*\",\"Condition\":{\"Bool\":{\"aws:SecureTransport\":\"false\"}}}]}"
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"AWSTemplateFormatVersion": "2010-09-09",
"Resources": {
"pass-str": {
"Type": "AWS::MediaStore::Container",
"Properties": {
"ContainerName": "pass-str",
"Policy": {
"Fn::Sub": "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Sid\":\"MediaStoredenyAccess\",\"Effect\":\"Deny\",\"Action\":[\"mediastore:GetObject\",\"mediastore:DeleteObject\",\"mediastore:DescribeObject\",\"mediastore:ListItems\"],\"Principal\":\"*\",\"Resource\":\"arn:aws:mediastore:${AWS::Region}:${AWS::AccountId}:container/ncmediastorecontainer/*\",\"Condition\":{\"Bool\":{\"aws:SecureTransport\":\"false\"}}}]}"
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import pytest

from checkov.cloudformation.runner import Runner
from checkov.common.checks_infra.checks_parser import GraphCheckParser
from checkov.common.checks_infra.registry import Registry
from checkov.runner_filter import RunnerFilter


Expand Down Expand Up @@ -47,6 +49,43 @@ def test_runner_sam(self):

self.assertEqual(passing_resources, passed_check_resources)

def test_jsonpath_policy(self):
test_dir_path = Path(__file__).parent / "resources" / "jsonpath_policy"
check_dir = Path(__file__).parent / "external_graph_checks"

test_check_registry = Registry(
checks_dir=f'{check_dir}',
parser=GraphCheckParser()
)

# when
report = Runner(external_registries=[test_check_registry]).run(root_folder=str(test_dir_path),
runner_filter=RunnerFilter(checks=["CKV2_CFN_JSONPATH_POLICY"]))

# then
summary = report.get_summary()

passing_resources = {
"AWS::MediaStore::Container.pass-str",
}

failing_resources = {
"AWS::MediaStore::Container.fail-str",
"AWS::MediaStore::Container.fail-dict",
}

passed_check_resources = {c.resource for c in report.passed_checks}
failed_check_resources = {c.resource for c in report.failed_checks}


self.assertEqual(summary["passed"], 1)
self.assertEqual(summary["failed"], 2)
self.assertEqual(summary["skipped"], 0)
self.assertEqual(summary["parsing_errors"], 0)

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)


if __name__ == '__main__':
unittest.main()
Loading