Skip to content

Commit

Permalink
Being on PSL should skip subdomain enumeration, but not scanning. (#1324
Browse files Browse the repository at this point in the history
)
  • Loading branch information
kazet authored Oct 14, 2024
1 parent f371b9a commit ef72062
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 25 deletions.
6 changes: 3 additions & 3 deletions artemis/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,11 @@ class Locking:
] = get_config("SCAN_DESTINATION_LOCK_MAX_TRIES", default=2, cast=int)

class PublicSuffixes:
ALLOW_SCANNING_PUBLIC_SUFFIXES: Annotated[
ALLOW_SUBDOMAIN_ENUMERATION_IN_PUBLIC_SUFFIXES: Annotated[
bool,
"Whether we will scan a public suffix (e.g. .pl) if it appears on the target list. This may cause very large "
"Whether we will enumerate subdomains for a public suffix (e.g. .pl) if it appears on the target list. This may cause very large "
"number of domains to be scanned.",
] = get_config("ALLOW_SCANNING_PUBLIC_SUFFIXES", default=False, cast=bool)
] = get_config("ALLOW_SUBDOMAIN_ENUMERATION_IN_PUBLIC_SUFFIXES", default=False, cast=bool)

ADDITIONAL_PUBLIC_SUFFIXES: Annotated[
List[str],
Expand Down
22 changes: 2 additions & 20 deletions artemis/modules/classifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,13 @@
from typing import List, Optional

from karton.core import Task
from publicsuffixlist import PublicSuffixList

from artemis import load_risk_class
from artemis.binds import Service, TaskStatus, TaskType
from artemis.config import Config
from artemis.domains import is_domain
from artemis.module_base import ArtemisBase
from artemis.utils import check_output_log_on_error, is_ip_address, throttle_request

PUBLIC_SUFFIX_LIST = PublicSuffixList()


@load_risk_class.load_risk_class(load_risk_class.LoadRiskClass.LOW)
class Classifier(ArtemisBase):
Expand Down Expand Up @@ -137,6 +133,8 @@ def _to_ip_range(data: str) -> Optional[List[str]]:
def run(self, current_task: Task) -> None:
data = current_task.get_payload("data")

data = data.lower()

if not Classifier.is_supported(data):
self.db.save_task_result(
task=current_task, status=TaskStatus.ERROR, status_reason="Unsupported data: " + data
Expand Down Expand Up @@ -215,22 +213,6 @@ def run(self, current_task: Task) -> None:
else:
data = Classifier._clean_ipv6_brackets(data)

if task_type == TaskType.DOMAIN:
if (
PUBLIC_SUFFIX_LIST.publicsuffix(sanitized) == sanitized
or sanitized in Config.PublicSuffixes.ADDITIONAL_PUBLIC_SUFFIXES
):
if not Config.PublicSuffixes.ALLOW_SCANNING_PUBLIC_SUFFIXES:
message = (
f"{sanitized} is a public suffix - adding it to the list of "
"scanned targets may result in scanning too much. Quitting."
)
self.log.warning(message)
self.db.save_task_result(
task=current_task, status=TaskStatus.ERROR, status_reason=message, data=task_type
)
return

new_task = Task(
{"type": task_type},
payload={
Expand Down
19 changes: 18 additions & 1 deletion artemis/modules/subdomain_enumeration.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from karton.core import Consumer, Task
from karton.core.config import Config as KartonConfig
from publicsuffixlist import PublicSuffixList

from artemis import load_risk_class
from artemis.binds import TaskStatus, TaskType
Expand All @@ -14,6 +15,8 @@
from artemis.module_base import ArtemisBase
from artemis.utils import check_output_log_on_error

PUBLIC_SUFFIX_LIST = PublicSuffixList()


class UnableToObtainSubdomainsException(Exception):
pass
Expand Down Expand Up @@ -127,7 +130,21 @@ def get_subdomains_from_gau(self, domain: str) -> Optional[Set[str]]:
)

def run(self, current_task: Task) -> None:
domain = current_task.get_payload("domain")
domain = current_task.get_payload("domain").lower()

if (
PUBLIC_SUFFIX_LIST.publicsuffix(domain) == domain
or domain in Config.PublicSuffixes.ADDITIONAL_PUBLIC_SUFFIXES
):
if not Config.PublicSuffixes.ALLOW_SUBDOMAIN_ENUMERATION_IN_PUBLIC_SUFFIXES:
message = (
f"{domain} is a public suffix - adding subdomains to the list of "
"scanned targets may result in scanning too much. Quitting."
)
self.log.warning(message)
self.db.save_task_result(task=current_task, status=TaskStatus.ERROR, status_reason=message)
return

encoded_domain = domain.encode("idna").decode("utf-8")

if self.redis.get(f"subdomain-enumeration-done-{encoded_domain}-{current_task.root_uid}"):
Expand Down
2 changes: 1 addition & 1 deletion test/modules/test_classifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,4 +191,4 @@ def test_invalid_data(self) -> None:

self.run_task(task)
(call,) = self.mock_db.save_task_result.call_args_list
self.assertEqual(call.kwargs["status_reason"], "Unsupported data: INVALID_DATA")
self.assertEqual(call.kwargs["status_reason"], "Unsupported data: invalid_data")

0 comments on commit ef72062

Please sign in to comment.