feat(neptune): add new fixer neptune_cluster_public_snapshot_fixer #5749

Merged
@@ -0,0 +1,43 @@
from prowler.lib.logger import logger
from prowler.providers.aws.services.neptune.neptune_client import neptune_client


def fixer(resource_id: str, region: str) -> bool:
"""
Modify the attributes of a Neptune DB cluster snapshot to remove public access.
Specifically, this fixer removes the 'all' value from the 'restore' attribute to
prevent the snapshot from being publicly accessible.

Requires the rds:ModifyDBClusterSnapshotAttribute permissions.
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "rds:ModifyDBClusterSnapshotAttribute",
"Resource": "*"
}
]
}

Args:
resource_id (str): The DB cluster snapshot identifier.
region (str): AWS region where the snapshot exists.

Returns:
bool: True if the operation is successful (public access is removed), False otherwise.
"""
try:
regional_client = neptune_client.regional_clients[region]
regional_client.modify_db_cluster_snapshot_attribute(
DBClusterSnapshotIdentifier=resource_id,
AttributeName="restore",
ValuesToRemove=["all"],
)
except Exception as error:
logger.error(
f"{region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False
else:
return True
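Review bot: The `else: return True` branch reports the snapshot as remediated whenever `modify_db_cluster_snapshot_attribute` does not raise, without looking at the `DBClusterSnapshotAttributesResult` the call returns, so a response in which the `restore` attribute still lists `all` would be logged as a success. Since the test mock already returns that result shape, the fixer could keep the response and derive its return value from it, as sketched below. Separately, the IAM snippet in the docstring grants `"Resource": "*"`; a maintainer may want the docstring to mention that the statement can be scoped to the cluster-snapshot ARN(s) the fixer is run against, since only one snapshot identifier is modified per call.
```python
response = regional_client.modify_db_cluster_snapshot_attribute(
    DBClusterSnapshotIdentifier=resource_id,
    AttributeName="restore",
    ValuesToRemove=["all"],
)
# … unchanged: except branch that logs the error and returns False …
else:
    # Succeed only if the API confirms "all" is gone from the restore attribute
    attributes = response.get("DBClusterSnapshotAttributesResult", {}).get(
        "DBClusterSnapshotAttributes", []
    )
    return not any(
        attribute.get("AttributeName") == "restore"
        and "all" in attribute.get("AttributeValues", [])
        for attribute in attributes
    )
```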
@@ -0,0 +1,89 @@
from unittest import mock

import botocore
import botocore.client
from moto import mock_aws

from tests.providers.aws.utils import AWS_REGION_EU_WEST_1, set_mocked_aws_provider

mock_make_api_call = botocore.client.BaseClient._make_api_call


def mock_make_api_call_public_snapshot(self, operation_name, kwarg):
if operation_name == "ModifyDBClusterSnapshotAttribute":
return {
"DBClusterSnapshotAttributesResult": {
"DBClusterSnapshotAttributes": [
{
"AttributeName": "restore",
"DBClusterSnapshotIdentifier": "test-snapshot",
"AttributeValues": [],
}
]
}
}
return mock_make_api_call(self, operation_name, kwarg)


def mock_make_api_call_public_snapshot_error(self, operation_name, kwarg):
if operation_name == "ModifyDBClusterSnapshotAttribute":
raise botocore.exceptions.ClientError(
{
"Error": {
"Code": "DBClusterSnapshotNotFoundFault",
"Message": "DBClusterSnapshotNotFoundFault",
}
},
operation_name,
)
return mock_make_api_call(self, operation_name, kwarg)


class Test_neptune_cluster_public_snapshot_fixer:
@mock_aws
def test_neptune_cluster_public_snapshot_fixer(self):
with mock.patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_public_snapshot,
):
from prowler.providers.aws.services.neptune.neptune_service import Neptune

aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])

with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.neptune.neptune_cluster_public_snapshot.neptune_cluster_public_snapshot_fixer.neptune_client",
new=Neptune(aws_provider),
):
from prowler.providers.aws.services.neptune.neptune_cluster_public_snapshot.neptune_cluster_public_snapshot_fixer import (
fixer,
)

assert fixer(resource_id="test-snapshot", region=AWS_REGION_EU_WEST_1)

@mock_aws
def test_neptune_cluster_public_snapshot_fixer_error(self):
with mock.patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_public_snapshot_error,
):
from prowler.providers.aws.services.neptune.neptune_service import Neptune

aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])

with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.neptune.neptune_cluster_public_snapshot.neptune_cluster_public_snapshot_fixer.neptune_client",
new=Neptune(aws_provider),
):
from prowler.providers.aws.services.neptune.neptune_cluster_public_snapshot.neptune_cluster_public_snapshot_fixer import (
fixer,
)

assert not fixer(
resource_id="test-snapshot", region=AWS_REGION_EU_WEST_1
)
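Review bot: `mock_make_api_call_public_snapshot` answers any `ModifyDBClusterSnapshotAttribute` call with a fixed result and ignores `kwarg`, so the success test in `test_neptune_cluster_public_snapshot_fixer` would still pass if the fixer sent the wrong attribute name, the wrong identifier, or added `all` instead of removing it. Asserting the request inside the mock, before returning the canned response, pins the behaviour the fixer actually exists for: `assert kwarg["DBClusterSnapshotIdentifier"] == "test-snapshot"`, `assert kwarg["AttributeName"] == "restore"`, `assert kwarg["ValuesToRemove"] == ["all"]`. The error test only checks the `False` return; if the fixer's log line is part of its contract, a `caplog`-style assertion on the logged `DBClusterSnapshotNotFoundFault` message would cover it, but that is optional.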