Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kinesis): add new check kinesis_stream_data_retention_period #5547

5 changes: 5 additions & 0 deletions prowler/config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,11 @@ aws:
# Maximum number of days a secret should be rotated
max_days_secret_unrotated: 90

# AWS Kinesis Configuration
# Minimum retention period in hours for Kinesis streams
min_kinesis_stream_retention_hours: 168 # 7 days


# Azure Configuration
azure:
# Azure Network Configuration
Expand Down
8 changes: 5 additions & 3 deletions prowler/providers/aws/services/kinesis/kinesis_service.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from enum import Enum
from typing import Optional
from typing import Dict, List, Optional

from pydantic import BaseModel
from pydantic import BaseModel, Field

from prowler.lib.logger import logger
from prowler.lib.scan_filters.scan_filters import is_resource_filtered
Expand Down Expand Up @@ -49,6 +49,7 @@ def _describe_stream(self, stream):
stream.encrypted_at_rest = EncryptionType(
stream_description.get("EncryptionType", "NONE")
)
stream.retention_period = stream_description.get("RetentionPeriodHours", 24)
except Exception as error:
logger.error(
f"{stream.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
Expand Down Expand Up @@ -91,5 +92,6 @@ class Stream(BaseModel):
region: str
name: str
status: StreamStatus
tags: Optional[list]
tags: Optional[List[Dict[str, str]]] = Field(default_factory=list)
encrypted_at_rest: EncryptionType = EncryptionType.NONE
retention_period: int = 24 # 1 day
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"Provider": "aws",
"CheckID": "kinesis_stream_data_retention_period",
"CheckTitle": "Kinesis streams should have an adequate data retention period.",
"CheckType": [
"Software and Configuration Checks/Industry and Regulatory Standards/AWS Foundational Security Best Practices"
],
"ServiceName": "kinesis",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:kinesis::account-id:stream/stream-name",
"Severity": "medium",
"ResourceType": "AwsKinesisStream",
"Description": "Ensure Kinesis streams have an adequate data retention period.",
"Risk": "An inadequate data retention period may result in data records being deleted before they can be processed or backed up, increasing the risk of data loss. This is especially critical for applications that rely on historical data availability for analysis, monitoring, and recovery in case of failures.",
"RelatedUrl": "https://docs.aws.amazon.com/config/latest/developerguide/kinesis-stream-backup-retention-check.html",
"Remediation": {
"Code": {
"CLI": "aws kinesis increase-stream-retention-period --stream-name <stream-name> --retention-period-hours <hours>",
"NativeIaC": "",
"Other": "https://docs.aws.amazon.com/securityhub/latest/userguide/kinesis-controls.html#kinesis-3",
"Terraform": ""
},
"Recommendation": {
"Text": "Configure an adequate data retention period for Kinesis streams to ensure data is available for the required timeframe. Set the retention period based on your application’s data retention requirements, and consider at least 168 hours (or customize as necessary).",
"Url": "https://docs.aws.amazon.com/streams/latest/dev/kinesis-extended-retention.html"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.kinesis.kinesis_client import kinesis_client


class kinesis_stream_data_retention_period(Check):
"""Ensure Kinesis Stream has an adequate data retention period

The retention period for Kinesis Streams should be set to a value that meets the organization's data retention policy.
"""

def execute(self):
"""Execute Check Kinesis Stream data retention period

Iterate over all Kinesis Streams and check if the retention period is adequate.

Returns:
findings (list): List of findings
"""
findings = []
for stream in kinesis_client.streams.values():
report = Check_Report_AWS(self.metadata())
report.region = stream.region
report.resource_id = stream.name
report.resource_arn = stream.arn
report.resource_tags = stream.tags
report.status = "FAIL"
report.status_extended = f"Kinesis Stream {stream.name} does not have an adequate data retention period ({stream.retention_period}hrs)."

if stream.retention_period >= kinesis_client.audit_config.get(
"min_kinesis_stream_retention_hours", 168
):
report.status = "PASS"
report.status_extended = f"Kinesis Stream {stream.name} does have an adequate data retention period ({stream.retention_period}hrs)."

findings.append(report)

return findings
22 changes: 21 additions & 1 deletion tests/providers/aws/services/kinesis/kinesis_service_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
import botocore
from moto import mock_aws

from prowler.providers.aws.services.kinesis.kinesis_service import Kinesis, StreamStatus
from prowler.providers.aws.services.kinesis.kinesis_service import (
EncryptionType,
Kinesis,
StreamStatus,
)
from tests.providers.aws.utils import AWS_REGION_US_EAST_1, set_mocked_aws_provider

make_api_call = botocore.client.BaseClient._make_api_call
Expand All @@ -28,6 +32,8 @@ def mock_make_api_call(self, operation_name, kwarg):
"StreamARN": "arn:aws:kinesis:us-east-1:123456789012:stream/test-stream",
"StreamStatus": "ACTIVE",
"Tags": [{"Key": "test_tag", "Value": "test_value"}],
"EncryptionType": "KMS",
"RetentionPeriodHours": 24,
}
}
if operation_name == "ListTagsForStream":
Expand Down Expand Up @@ -72,3 +78,17 @@ def test_list_streamscomplete(self):
assert kinesis.streams[arn].tags == [{"Key": "test_tag", "Value": "test_value"}]
assert kinesis.streams[arn].region == AWS_REGION_US_EAST_1
assert kinesis.streams[arn].arn == arn

@mock_aws
def test_describe_stream(self):
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
kinesis = Kinesis(aws_provider)

arn = "arn:aws:kinesis:us-east-1:123456789012:stream/test-stream"
assert kinesis.streams[arn].name == "test-stream"
assert kinesis.streams[arn].status == StreamStatus.ACTIVE
assert kinesis.streams[arn].tags == [{"Key": "test_tag", "Value": "test_value"}]
assert kinesis.streams[arn].region == AWS_REGION_US_EAST_1
assert kinesis.streams[arn].arn == arn
assert kinesis.streams[arn].encrypted_at_rest == EncryptionType.KMS
assert kinesis.streams[arn].retention_period == 24
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
from unittest import mock

from boto3 import client
from moto import mock_aws

from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)


class Test_kinesis_encrypted_at_rest:
@mock_aws
def test_no_streams(self):
from prowler.providers.aws.services.kinesis.kinesis_service import Kinesis

aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])

with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.kinesis.kinesis_stream_data_retention_period.kinesis_stream_data_retention_period.kinesis_client",
new=Kinesis(aws_provider),
):
# Test Check
from prowler.providers.aws.services.kinesis.kinesis_stream_data_retention_period.kinesis_stream_data_retention_period import (
kinesis_stream_data_retention_period,
)

check = kinesis_stream_data_retention_period()
result = check.execute()

assert len(result) == 0

@mock_aws
def test_adequate_retention_period(self):
kinesis_client = client("kinesis", region_name=AWS_REGION_US_EAST_1)
stream_name = "stream_test_us"
kinesis_client.create_stream(
StreamName=stream_name,
ShardCount=1,
StreamModeDetails={"StreamMode": "PROVISIONED"},
)
retention_period = 400
kinesis_client.increase_stream_retention_period(
StreamName=stream_name,
RetentionPeriodHours=retention_period,
StreamARN=f"arn:aws:kinesis:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:stream/{stream_name}",
)

kinesis_client.audit_config = mock.MagicMock()
kinesis_client.audit_config = {"min_kinesis_stream_retention_hours": 350}

from prowler.providers.aws.services.kinesis.kinesis_service import Kinesis

aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])

with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.kinesis.kinesis_stream_data_retention_period.kinesis_stream_data_retention_period.kinesis_client",
new=Kinesis(aws_provider),
):
with mock.patch(
"prowler.providers.aws.services.kinesis.kinesis_stream_data_retention_period.kinesis_stream_data_retention_period.kinesis_client.audit_config",
new=kinesis_client.audit_config,
):
# Test Check
from prowler.providers.aws.services.kinesis.kinesis_stream_data_retention_period.kinesis_stream_data_retention_period import (
kinesis_stream_data_retention_period,
)

check = kinesis_stream_data_retention_period()
result = check.execute()

assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Kinesis Stream {stream_name} does have an adequate data retention period ({retention_period}hrs)."
)
assert result[0].resource_id == stream_name
assert (
result[0].resource_arn
== f"arn:aws:kinesis:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:stream/{stream_name}"
)
assert result[0].resource_tags == []
assert result[0].region == AWS_REGION_US_EAST_1

@mock_aws
def test_unadequate_retention_period(self):
kinesis_client = client("kinesis", region_name=AWS_REGION_US_EAST_1)
stream_name = "stream_test_us"
kinesis_client.create_stream(
StreamName=stream_name,
ShardCount=1,
StreamModeDetails={"StreamMode": "PROVISIONED"},
)
retention_period = 200
kinesis_client.increase_stream_retention_period(
StreamName=stream_name,
RetentionPeriodHours=retention_period,
StreamARN=f"arn:aws:kinesis:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:stream/{stream_name}",
)

kinesis_client.audit_config = mock.MagicMock()
kinesis_client.audit_config = {"min_kinesis_stream_retention_hours": 250}

from prowler.providers.aws.services.kinesis.kinesis_service import Kinesis

aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])

with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.kinesis.kinesis_stream_data_retention_period.kinesis_stream_data_retention_period.kinesis_client",
new=Kinesis(aws_provider),
):
with mock.patch(
"prowler.providers.aws.services.kinesis.kinesis_stream_data_retention_period.kinesis_stream_data_retention_period.kinesis_client.audit_config",
new=kinesis_client.audit_config,
):
# Test Check
from prowler.providers.aws.services.kinesis.kinesis_stream_data_retention_period.kinesis_stream_data_retention_period import (
kinesis_stream_data_retention_period,
)

check = kinesis_stream_data_retention_period()
result = check.execute()

assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Kinesis Stream {stream_name} does not have an adequate data retention period ({retention_period}hrs)."
)
assert result[0].resource_id == stream_name
assert (
result[0].resource_arn
== f"arn:aws:kinesis:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:stream/{stream_name}"
)
assert result[0].resource_tags == []
assert result[0].region == AWS_REGION_US_EAST_1