-
Notifications
You must be signed in to change notification settings - Fork 45
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
a578fb4
commit 52098dc
Showing
3 changed files
with
70 additions
and
59 deletions.
There are no files selected for viewing
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
#!/usr/bin/env python3 | ||
from typing import Set | ||
from karton.core import Task | ||
from artemis.resolvers import lookup, ResolutionException | ||
|
||
from artemis.binds import TaskStatus, TaskType | ||
from artemis.module_base import ArtemisBase | ||
|
||
class DomainScanner(ArtemisBase): | ||
""" | ||
DNS checker to verify if domains exist in the DNS system. | ||
""" | ||
|
||
identity = "domain_scanner" | ||
filters = [ | ||
{"type": TaskType.DOMAIN.value}, | ||
] | ||
|
||
def run(self, current_task: Task) -> None: | ||
domain = current_task.get_payload(TaskType.DOMAIN) | ||
domain_exists = self.check_domain_exists(domain) | ||
|
||
if domain_exists: | ||
self.db.save_task_result( | ||
task=current_task, | ||
status=TaskStatus.OK, | ||
data={ | ||
"domain": domain, | ||
"exists": True | ||
} | ||
) | ||
# Create a new task for the next module | ||
new_task = Task( | ||
{"type": TaskType.DOMAIN}, | ||
payload={TaskType.DOMAIN: domain}, | ||
payload_persistent={"domain_exists": True} | ||
) | ||
self.add_task(current_task, new_task) | ||
else: | ||
self.db.save_task_result( | ||
task=current_task, | ||
status=TaskStatus.ERROR, | ||
status_reason=f"Domain does not exist: {domain}", | ||
data={"domain": domain, "exists": False} | ||
) | ||
|
||
def check_domain_exists(self, domain: str) -> bool: | ||
try: | ||
# Check for NS records | ||
ns_records = lookup(domain, "NS") | ||
if ns_records: | ||
return True | ||
|
||
# If no NS records, check for A records | ||
a_records = lookup(domain, "A") | ||
return len(a_records) > 0 | ||
except ResolutionException: | ||
return False | ||
|
||
if __name__ == "__main__": | ||
DomainScanner().loop() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters