Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(servicecatalog): Add new service servicecatalog #5618

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from prowler.providers.aws.services.servicecatalog.servicecatalog_service import (
ServiceCatalog,
)
from prowler.providers.common.provider import Provider

servicecatalog_client = ServiceCatalog(Provider.get_global_provider())
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
from typing import Optional

from pydantic import BaseModel

from prowler.lib.logger import logger
from prowler.lib.scan_filters.scan_filters import is_resource_filtered
from prowler.providers.aws.lib.service.service import AWSService

PORTFOLIO_SHARE_TYPES = [
"ACCOUNT",
"ORGANIZATION",
"ORGANIZATION_UNIT",
"ORGANIZATION_MEMBER_ACCOUNT",
]


class ServiceCatalog(AWSService):
def __init__(self, provider):
# Call AWSService's __init__
super().__init__(__class__.__name__, provider)
self.portfolios = {}
self.__threading_call__(self._list_portfolios)
self.__threading_call__(
self._describe_portfolio_shares, self.portfolios.values()
)
self.__threading_call__(self._describe_portfolio, self.portfolios.values())

def _list_portfolios(self, regional_client):
logger.info("ServiceCatalog - listing portfolios...")
try:
response = regional_client.list_portfolios()
for portfolio in response["PortfolioDetails"]:
portfolio_arn = portfolio["ARN"]
if not self.audit_resources or (
is_resource_filtered(portfolio_arn, self.audit_resources)
):
self.portfolios[portfolio_arn] = Portfolio(
arn=portfolio_arn,
id=portfolio["Id"],
name=portfolio["DisplayName"],
region=regional_client.region,
)
except Exception as error:
logger.error(

Check warning on line 44 in prowler/providers/aws/services/servicecatalog/servicecatalog_service.py

View check run for this annotation

Codecov / codecov/patch

prowler/providers/aws/services/servicecatalog/servicecatalog_service.py#L43-L44

Added lines #L43 - L44 were not covered by tests
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)

def _describe_portfolio_shares(self, portfolio):
try:
logger.info("ServiceCatalog - describing portfolios shares...")
try:
regional_client = self.regional_clients[portfolio.region]
for portfolio_type in PORTFOLIO_SHARE_TYPES:
for share in regional_client.describe_portfolio_shares(
PortfolioId=portfolio.id,
Type=portfolio_type,
).get("PortfolioShareDetails", []):
portfolio_share = PortfolioShare(
type=portfolio_type,
accepted=share["Accepted"],
)
portfolio.shares.append(portfolio_share)
except Exception as error:
logger.error(

Check warning on line 64 in prowler/providers/aws/services/servicecatalog/servicecatalog_service.py

View check run for this annotation

Codecov / codecov/patch

prowler/providers/aws/services/servicecatalog/servicecatalog_service.py#L63-L64

Added lines #L63 - L64 were not covered by tests
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
logger.error(

Check warning on line 68 in prowler/providers/aws/services/servicecatalog/servicecatalog_service.py

View check run for this annotation

Codecov / codecov/patch

prowler/providers/aws/services/servicecatalog/servicecatalog_service.py#L67-L68

Added lines #L67 - L68 were not covered by tests
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)

def _describe_portfolio(self, portfolio):
try:
logger.info("ServiceCatalog - describing portfolios...")
try:
regional_client = self.regional_clients[portfolio.region]
portfolio.tags = regional_client.describe_portfolio(
PortfolioId=portfolio.id,
)["Tags"]
except Exception as error:
logger.error(

Check warning on line 81 in prowler/providers/aws/services/servicecatalog/servicecatalog_service.py

View check run for this annotation

Codecov / codecov/patch

prowler/providers/aws/services/servicecatalog/servicecatalog_service.py#L80-L81

Added lines #L80 - L81 were not covered by tests
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
logger.error(

Check warning on line 85 in prowler/providers/aws/services/servicecatalog/servicecatalog_service.py

View check run for this annotation

Codecov / codecov/patch

prowler/providers/aws/services/servicecatalog/servicecatalog_service.py#L84-L85

Added lines #L84 - L85 were not covered by tests
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)


class PortfolioShare(BaseModel):
type: str
accepted: bool


class Portfolio(BaseModel):
id: str
name: str
arn: str
region: str
shares: Optional[list[PortfolioShare]] = []
tags: Optional[list] = []
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
from unittest.mock import patch

import botocore
from moto import mock_aws

from prowler.providers.aws.services.servicecatalog.servicecatalog_service import (
ServiceCatalog,
)
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_EU_WEST_1,
set_mocked_aws_provider,
)

make_api_call = botocore.client.BaseClient._make_api_call


def mock_make_api_call(self, operation_name, kwarg):
if operation_name == "ListPortfolios":
return {
"PortfolioDetails": [
{
"Id": "portfolio-id-test",
"ARN": "arn:aws:servicecatalog:eu-west-1:123456789012:portfolio/portfolio-id-test",
"DisplayName": "portfolio-name",
}
],
}
elif operation_name == "DescribePortfolioShares":
return {
"PortfolioShareDetails": [
{
"Type": "ACCOUNT",
"Accepted": True,
}
],
}
elif operation_name == "DescribePortfolio":
return {
"Tags": {"tag1": "value1", "tag2": "value2"},
}
return make_api_call(self, operation_name, kwarg)


def mock_generate_regional_clients(provider, service):
regional_client = provider._session.current_session.client(
service, region_name=AWS_REGION_EU_WEST_1
)
regional_client.region = AWS_REGION_EU_WEST_1
return {AWS_REGION_EU_WEST_1: regional_client}


@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
@patch(
"prowler.providers.aws.aws_provider.AwsProvider.generate_regional_clients",
new=mock_generate_regional_clients,
)
class Test_ServiceCatalog_Service:
# Test ServiceCatalog Service
def test_service(self):
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
service_catalog = ServiceCatalog(aws_provider)
assert service_catalog.service == "servicecatalog"

# Test ServiceCatalog client
def test_client(self):
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
service_catalog = ServiceCatalog(aws_provider)
for reg_client in service_catalog.regional_clients.values():
assert reg_client.__class__.__name__ == "ServiceCatalog"

# Test ServiceCatalog session
def test__get_session__(self):
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
ses = ServiceCatalog(aws_provider)
assert ses.session.__class__.__name__ == "Session"

@mock_aws
# Test ServiceCatalog list portfolios
def test_list_portfolios(self):
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
service_catalog = ServiceCatalog(aws_provider)
arn = f"arn:aws:servicecatalog:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:portfolio/portfolio-id-test"
assert service_catalog.portfolios[arn].name == "portfolio-name"
assert service_catalog.portfolios[arn].id == "portfolio-id-test"
assert service_catalog.portfolios[arn].arn == arn
assert service_catalog.portfolios[arn].region == AWS_REGION_EU_WEST_1

@mock_aws
# Test ServiceCatalog describe portfolio shares
def test_describe_portfolio_shares(self):
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
service_catalog = ServiceCatalog(aws_provider)
arn = f"arn:aws:servicecatalog:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:portfolio/portfolio-id-test"
assert len(service_catalog.portfolios[arn].shares) == 4
assert service_catalog.portfolios[arn].shares[0].accepted
assert service_catalog.portfolios[arn].shares[0].type == "ACCOUNT"

@mock_aws
# Test ServiceCatalog describe portfolio
def test_describe_portfolio(self):
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
service_catalog = ServiceCatalog(aws_provider)
arn = f"arn:aws:servicecatalog:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:portfolio/portfolio-id-test"
assert service_catalog.portfolios[arn].tags == {
"tag1": "value1",
"tag2": "value2",
}
Loading