Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dms): add new check dms_endpoint_redis_tls_enabled #5583

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"Provider": "aws",
"CheckID": "dms_endpoint_redis_tls_enabled",
MrCloudSec marked this conversation as resolved.
Show resolved Hide resolved
"CheckTitle": "Check if DMS endpoints for Redis OSS have TLS enabled.",
MrCloudSec marked this conversation as resolved.
Show resolved Hide resolved
"CheckType": [
"Software and Configuration Checks/AWS Security Best Practices"
],
"ServiceName": "dms",
"SubServiceName": "",
"ResourceIdTemplate": "arn:aws:dms:region:account-id:endpoint/endpoint-id",
"Severity": "medium",
"ResourceType": "AwsDmsEndpoint",
"Description": "This control checks whether an AWS DMS endpoint for Redis OSS is configured with a TLS connection. The control fails if the endpoint doesn't have TLS enabled.",
"Risk": "Without TLS, data transmitted between databases may be vulnerable to interception or eavesdropping, increasing the risk of data breaches and other security incidents.",
"RelatedUrl": "https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Source.Redis.html",
"Remediation": {
"Code": {
"CLI": "aws dms modify-endpoint --endpoint-arn <endpoint-arn> --redis-settings '{'SslSecurityProtocol': 'ssl-encryption'}'",
"NativeIaC": "",
"Other": "https://docs.aws.amazon.com/securityhub/latest/userguide/dms-controls.html#dms-12",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable TLS for DMS endpoints for Redis OSS to ensure encrypted communication during data migration.",
"Url": "https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Target.Redis.html#CHAP_Target.Redis.EndpointSettings"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from typing import List

from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.dms.dms_client import dms_client


class dms_endpoint_redis_tls_enabled(Check):
"""
Check if AWS DMS Endpoints for Redis OSS have TLS enabled.

This class verifies whether each AWS DMS Endpoint configured for Redis OSS has TLS enabled
by checking the `TlsEnabled` property in the endpoint's configuration. The check ensures that
TLS is enabled to secure data in transit, preventing unauthorized access and ensuring data integrity.
"""

def execute(self) -> List[Check_Report_AWS]:
"""
Execute the DMS Redis TLS enabled check.

Iterates over all DMS Endpoints and generates a report indicating whether
each Redis OSS endpoint has TLS enabled.

Returns:
List[Check_Report_AWS]: A list of report objects with the results of the check.
"""
findings = []
for endpoint_arn, endpoint in dms_client.endpoints.items():
if endpoint.engine_name == "redis":
report = Check_Report_AWS(self.metadata())
report.resource_id = endpoint.id
report.resource_arn = endpoint_arn
report.region = endpoint.region
report.resource_tags = endpoint.tags
report.status = "FAIL"
report.status_extended = f"DMS Endpoint '{endpoint.id}' for Redis OSS does not have TLS enabled."
if endpoint.redis_tls_enabled == "ssl-encryption":
report.status = "PASS"
report.status_extended = (
f"DMS Endpoint '{endpoint.id}' for Redis OSS has TLS enabled."
)

findings.append(report)

return findings
4 changes: 4 additions & 0 deletions prowler/providers/aws/services/dms/dms_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ def _describe_endpoints(self, regional_client):
id=endpoint["EndpointIdentifier"],
region=regional_client.region,
ssl_mode=endpoint.get("SslMode", False),
redis_tls_enabled=endpoint.get("RedisSettings", {}).get(
"SslSecurityProtocol", "plaintext"
),
mongodb_auth_type=endpoint.get("MongoDbSettings", {}).get(
"AuthType", "no"
),
Expand Down Expand Up @@ -101,6 +104,7 @@ class Endpoint(BaseModel):
region: str
ssl_mode: str
tags: Optional[list]
redis_tls_enabled: Optional[str]
MrCloudSec marked this conversation as resolved.
Show resolved Hide resolved
mongodb_auth_type: str
neptune_iam_auth_enabled: bool = False
engine_name: str
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
from unittest import mock

import botocore
from boto3 import client
from moto import mock_aws

from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)

make_api_call = botocore.client.BaseClient._make_api_call

DMS_ENDPOINT_NAME = "dms-endpoint"
DMS_ENDPOINT_ARN = f"arn:aws:dms:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:endpoint:{DMS_ENDPOINT_NAME}"
DMS_INSTANCE_NAME = "rep-instance"
DMS_INSTANCE_ARN = (
f"arn:aws:dms:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:rep:{DMS_INSTANCE_NAME}"
)


def mock_make_api_call_enabled_not_redis(self, operation_name, kwarg):
if operation_name == "DescribeEndpoints":
return {
"Endpoints": [
{
"EndpointIdentifier": DMS_ENDPOINT_NAME,
"EndpointArn": DMS_ENDPOINT_ARN,
"SslMode": "require",
"RedisSettings": {
"SslSecurityProtocol": "ssl-encryption",
},
"EngineName": "oracle",
}
]
}
elif operation_name == "ListTagsForResource":
if kwarg["ResourceArn"] == DMS_INSTANCE_ARN:
return {
"TagList": [
{"Key": "Name", "Value": "rep-instance"},
{"Key": "Owner", "Value": "admin"},
]
}
elif kwarg["ResourceArn"] == DMS_ENDPOINT_ARN:
return {
"TagList": [
{"Key": "Name", "Value": "dms-endpoint"},
{"Key": "Owner", "Value": "admin"},
]
}
return make_api_call(self, operation_name, kwarg)


def mock_make_api_call_enabled(self, operation_name, kwarg):
if operation_name == "DescribeEndpoints":
return {
"Endpoints": [
{
"EndpointIdentifier": DMS_ENDPOINT_NAME,
"EndpointArn": DMS_ENDPOINT_ARN,
"SslMode": "require",
"RedisSettings": {
"SslSecurityProtocol": "ssl-encryption",
},
"EngineName": "redis",
}
]
}
elif operation_name == "ListTagsForResource":
if kwarg["ResourceArn"] == DMS_INSTANCE_ARN:
return {
"TagList": [
{"Key": "Name", "Value": "rep-instance"},
{"Key": "Owner", "Value": "admin"},
]
}
elif kwarg["ResourceArn"] == DMS_ENDPOINT_ARN:
return {
"TagList": [
{"Key": "Name", "Value": "dms-endpoint"},
{"Key": "Owner", "Value": "admin"},
]
}
return make_api_call(self, operation_name, kwarg)


def mock_make_api_call_not_enabled(self, operation_name, kwarg):
if operation_name == "DescribeEndpoints":
return {
"Endpoints": [
{
"EndpointIdentifier": DMS_ENDPOINT_NAME,
"EndpointArn": DMS_ENDPOINT_ARN,
"SslMode": "require",
"RedisSettings": {
"SslSecurityProtocol": "plaintext",
},
"EngineName": "redis",
}
]
}
elif operation_name == "ListTagsForResource":
if kwarg["ResourceArn"] == DMS_INSTANCE_ARN:
return {
"TagList": [
{"Key": "Name", "Value": "rep-instance"},
{"Key": "Owner", "Value": "admin"},
]
}
elif kwarg["ResourceArn"] == DMS_ENDPOINT_ARN:
return {
"TagList": [
{"Key": "Name", "Value": "dms-endpoint"},
{"Key": "Owner", "Value": "admin"},
]
}
return make_api_call(self, operation_name, kwarg)


class Test_dms_endpoint_redis_tls_enabled:
@mock_aws
def test_no_dms_endpoints(self):
dms_client = client("dms", region_name=AWS_REGION_US_EAST_1)
dms_client.endpoints = {}

from prowler.providers.aws.services.dms.dms_service import DMS

aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])

with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.dms.dms_endpoint_redis_tls_enabled.dms_endpoint_redis_tls_enabled.dms_client",
new=DMS(aws_provider),
):
# Test Check
from prowler.providers.aws.services.dms.dms_endpoint_redis_tls_enabled.dms_endpoint_redis_tls_enabled import (
dms_endpoint_redis_tls_enabled,
)

check = dms_endpoint_redis_tls_enabled()
result = check.execute()

assert len(result) == 0

@mock_aws
def test_dms_not_mongodb_auth_mecanism_enabled(self):
with mock.patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_enabled_not_redis,
):

from prowler.providers.aws.services.dms.dms_service import DMS

aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])

with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.dms.dms_endpoint_redis_tls_enabled.dms_endpoint_redis_tls_enabled.dms_client",
new=DMS(aws_provider),
):
# Test Check
from prowler.providers.aws.services.dms.dms_endpoint_redis_tls_enabled.dms_endpoint_redis_tls_enabled import (
dms_endpoint_redis_tls_enabled,
)

check = dms_endpoint_redis_tls_enabled()
result = check.execute()

assert len(result) == 0

@mock_aws
def test_dms_mongodb_auth_mecanism_not_enabled(self):
with mock.patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_not_enabled,
):

from prowler.providers.aws.services.dms.dms_service import DMS

aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])

with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.dms.dms_endpoint_redis_tls_enabled.dms_endpoint_redis_tls_enabled.dms_client",
new=DMS(aws_provider),
):
# Test Check
from prowler.providers.aws.services.dms.dms_endpoint_redis_tls_enabled.dms_endpoint_redis_tls_enabled import (
dms_endpoint_redis_tls_enabled,
)

check = dms_endpoint_redis_tls_enabled()
result = check.execute()

assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].status_extended == (
"DMS Endpoint 'dms-endpoint' for Redis OSS does not have TLS enabled."
)
assert result[0].resource_id == "dms-endpoint"
assert (
result[0].resource_arn
== "arn:aws:dms:us-east-1:123456789012:endpoint:dms-endpoint"
)
assert result[0].resource_tags == [
{
"Key": "Name",
"Value": "dms-endpoint",
},
{
"Key": "Owner",
"Value": "admin",
},
]
assert result[0].region == "us-east-1"

@mock_aws
def test_dms_mongodb_auth_mecanism_enabled(self):
with mock.patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_enabled,
):

from prowler.providers.aws.services.dms.dms_service import DMS

aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])

with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.dms.dms_endpoint_redis_tls_enabled.dms_endpoint_redis_tls_enabled.dms_client",
new=DMS(aws_provider),
):
# Test Check
from prowler.providers.aws.services.dms.dms_endpoint_redis_tls_enabled.dms_endpoint_redis_tls_enabled import (
dms_endpoint_redis_tls_enabled,
)

check = dms_endpoint_redis_tls_enabled()
result = check.execute()

assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].status_extended == (
"DMS Endpoint 'dms-endpoint' for Redis OSS has TLS enabled."
)
assert result[0].resource_id == "dms-endpoint"
assert (
result[0].resource_arn
== "arn:aws:dms:us-east-1:123456789012:endpoint:dms-endpoint"
)
assert result[0].resource_tags == [
{
"Key": "Name",
"Value": "dms-endpoint",
},
{
"Key": "Owner",
"Value": "admin",
},
]
assert result[0].region == "us-east-1"
4 changes: 4 additions & 0 deletions tests/providers/aws/services/dms/dms_service_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ def mock_make_api_call(self, operation_name, kwargs):
"EndpointIdentifier": DMS_ENDPOINT_NAME,
"EndpointArn": DMS_ENDPOINT_ARN,
"SslMode": "require",
"RedisSettings": {
"SslSecurityProtocol": "ssl-encryption",
},
"MongoDbSettings": {
"AuthType": "password",
},
Expand Down Expand Up @@ -128,6 +131,7 @@ def test_describe_endpoints(self):
assert len(dms.endpoints) == 1
assert dms.endpoints[DMS_ENDPOINT_ARN].id == DMS_ENDPOINT_NAME
assert dms.endpoints[DMS_ENDPOINT_ARN].ssl_mode == "require"
assert dms.endpoints[DMS_ENDPOINT_ARN].redis_tls_enabled == "ssl-encryption"
assert dms.endpoints[DMS_ENDPOINT_ARN].mongodb_auth_type == "password"
assert dms.endpoints[DMS_ENDPOINT_ARN].neptune_iam_auth_enabled
assert dms.endpoints[DMS_ENDPOINT_ARN].engine_name == "neptune"
Expand Down
Loading