From f7daca7653df2522bfdfc783b6cdb9a932d36b75 Mon Sep 17 00:00:00 2001 From: Yogesh Ojha Date: Fri, 9 Aug 2024 09:51:28 +0530 Subject: [PATCH 01/11] fix swal to catch errors --- web/static/custom/custom.js | 171 +++++++++++++++++++++--------------- 1 file changed, 102 insertions(+), 69 deletions(-) diff --git a/web/static/custom/custom.js b/web/static/custom/custom.js index 1755b0508..fc863d35e 100644 --- a/web/static/custom/custom.js +++ b/web/static/custom/custom.js @@ -1456,77 +1456,110 @@ function fetch_whois(domain_name, force_reload_whois=false) { } function get_target_whois(domain_name) { - var url = `/api/tools/whois/?format=json&ip_domain=${domain_name}` - Swal.fire({ - title: `Fetching WHOIS details for ${domain_name}...` - }); - swal.showLoading(); - fetch(url, { - method: 'GET', - credentials: "same-origin", - headers: { - "X-CSRFToken": getCookie("csrftoken"), - 'Content-Type': 'application/json' - }, - }).then(response => response.json()).then(function(response) { - console.log(response); - if (response.status) { - swal.close(); - display_whois_on_modal(response); - } else { - fetch(`/api/tools/whois/?format=json&ip_domain=${domain_name}`, { - method: 'GET', - credentials: "same-origin", - headers: { - "X-CSRFToken": getCookie("csrftoken"), - 'Content-Type': 'application/json' - }, - }).then(response => response.json()).then(function(response) { - console.log(response); - if (response.status) { - swal.close(); - display_whois_on_modal(response); - } else { - Swal.fire({ - title: 'Oops!', - text: `reNgine could not fetch WHOIS records for ${domain_name}!`, - icon: 'error' - }); - } - }); - } - }); + const url = `/api/tools/whois/?format=json&ip_domain=${domain_name}`; + + Swal.fire({ + title: `Fetching WHOIS details for ${domain_name}...`, + allowOutsideClick: false, + showConfirmButton: false, + willOpen: () => { + Swal.showLoading(); + } + }); + + fetch(url, { + method: 'GET', + credentials: "same-origin", + headers: { + "X-CSRFToken": getCookie("csrftoken"), + 'Content-Type': 'application/json' + }, + }) + .then(response => { + if (!response.ok) { + throw new Error(`HTTP error! status: ${response.status}`); + } + return response.json(); + }) + .then(data => { + console.log(data); + if (data.status) { + Swal.close(); + display_whois_on_modal(data); + } else { + throw new Error(data.result || 'Failed to fetch WHOIS data'); + } + }) + .catch(error => { + console.error('Error:', error); + let errorMessage = error.message; + if (errorMessage.includes('Netlas limit exceeded')) { + errorMessage = 'Rate limit exceeded. Please try again later.'; + } else if (errorMessage.includes('Invalid domain')) { + errorMessage = 'Invalid domain or no WHOIS data available.'; + } + Swal.fire({ + title: 'Error!', + text: `Failed to fetch WHOIS records for ${domain_name}: ${errorMessage}`, + icon: 'error' + }); + }); +} + +function get_domain_whois(domain_name, show_add_target_btn = false) { + const url = `/api/tools/whois/?format=json&ip_domain=${domain_name}`; + + Swal.fire({ + title: `Fetching WHOIS details for ${domain_name}...`, + allowOutsideClick: false, + showConfirmButton: false, + willOpen: () => { + Swal.showLoading(); + } + }); + + $('.modal').modal('hide'); + + fetch(url, { + method: 'GET', + credentials: "same-origin", + headers: { + "X-CSRFToken": getCookie("csrftoken"), + 'Content-Type': 'application/json' + }, + }) + .then(response => { + if (!response.ok) { + throw new Error(`HTTP error! status: ${response.status}`); + } + return response.json(); + }) + .then(data => { + console.log(data); + Swal.close(); + if (data.status) { + display_whois_on_modal(data, show_add_target_btn); + } else { + throw new Error(data.result || 'Failed to fetch WHOIS data'); + } + }) + .catch(error => { + console.error('Error:', error); + let errorMessage = error.message; + if (errorMessage.includes('Netlas limit exceeded')) { + errorMessage = 'Rate limit exceeded. Please try again later.'; + } else if (errorMessage.includes('Invalid domain')) { + errorMessage = 'Invalid domain or no WHOIS data available.'; + } + Swal.fire({ + title: 'Error!', + text: `Failed to fetch WHOIS records for ${domain_name}: ${errorMessage}`, + icon: 'error' + }); + }); } -function get_domain_whois(domain_name, show_add_target_btn=false) { - // this function will get whois for domains that are not targets, this will - // not store whois into db nor create target - var url = `/api/tools/whois/?format=json&ip_domain=${domain_name}` - Swal.fire({ - title: `Fetching WHOIS details for ${domain_name}...` - }); - $('.modal').modal('hide'); - swal.showLoading(); - fetch(url, { - method: 'GET', - credentials: "same-origin", - headers: { - "X-CSRFToken": getCookie("csrftoken"), - 'Content-Type': 'application/json' - }, - }).then(response => response.json()).then(function(response) { - swal.close(); - if (response.status) { - display_whois_on_modal(response, show_add_target_btn=show_add_target_btn); - } else { - Swal.fire({ - title: 'Oops!', - text: `reNgine could not fetch WHOIS records for ${domain_name}! ${response['message']}`, - icon: 'error' - }); - } - }); -} + function display_whois_on_modal(response, show_add_target_btn=false) { // this function will display whois data on modal, should be followed after get_domain_whois() From 25e75630135c24f681c7d6f5f95a7abd3ad8db71 Mon Sep 17 00:00:00 2001 From: Yogesh Ojha Date: Fri, 9 Aug 2024 10:30:49 +0530 Subject: [PATCH 02/11] fix: refactor whois fetching --- web/api/views.py | 10 +- web/reNgine/common_func.py | 328 ++++++++++++++++++++++- web/reNgine/definitions.py | 2 +- web/reNgine/tasks.py | 500 +++++++----------------------------- web/static/custom/custom.js | 10 +- 5 files changed, 428 insertions(+), 422 deletions(-) diff --git a/web/api/views.py b/web/api/views.py index e2c7805a0..f5df4bbbf 100644 --- a/web/api/views.py +++ b/web/api/views.py @@ -1126,13 +1126,15 @@ def get(self, request): class Whois(APIView): def get(self, request): req = self.request - ip_domain = req.query_params.get('ip_domain') - if not (validators.domain(ip_domain) or validators.ipv4(ip_domain) or validators.ipv6(ip_domain)): - print(f'Ip address or domain "{ip_domain}" did not pass validator.') + target = req.query_params.get('target') + if not target: + return Response({'status': False, 'message': 'Target IP/Domain required!'}) + if not (validators.domain(target) or validators.ipv4(target) or validators.ipv6(target)): + print(f'Ip address or domain "{target}" did not pass validator.') return Response({'status': False, 'message': 'Invalid domain or IP'}) is_force_update = req.query_params.get('is_reload') is_force_update = True if is_force_update and 'true' == is_force_update.lower() else False - task = query_whois.apply_async(args=(ip_domain,is_force_update)) + task = query_whois.apply_async(args=(target,is_force_update)) response = task.wait() return Response(response) diff --git a/web/reNgine/common_func.py b/web/reNgine/common_func.py index c9c50fe5d..bb977fdde 100644 --- a/web/reNgine/common_func.py +++ b/web/reNgine/common_func.py @@ -19,6 +19,7 @@ from celery.utils.log import get_task_logger from discord_webhook import DiscordEmbed, DiscordWebhook from django.db.models import Q +from dotted_dict import DottedDict from reNgine.common_serializers import * from reNgine.definitions import * @@ -1158,4 +1159,329 @@ def update_or_create_port(port_number, service_name=None, description=None): ) created = True finally: - return port, created \ No newline at end of file + return port, created + + +def get_domain_info_from_db(target): + """ + Retrieves the Domain object from the database using the target domain name. + + Args: + target (str): The domain name to search for. + + Returns: + Domain: The Domain object if found, otherwise None. + """ + try: + domain = Domain.objects.get(name=target) + if not domain.insert_date: + domain.insert_date = timezone.now() + domain.save() + return extract_domain_info(domain.domain_info) + except Domain.DoesNotExist: + return None + +def extract_domain_info(domain_info_db): + """Extract domain info from the domain_info_db.""" + if not domain_info_db: + return DottedDict() + + try: + domain_info = DottedDict({ + 'dnssec': domain_info_db.dnssec, + 'created': domain_info_db.created, + 'updated': domain_info_db.updated, + 'expires': domain_info_db.expires, + 'geolocation_iso': domain_info_db.geolocation_iso, + 'status': [status.name for status in domain_info_db.status.all()], + 'whois_server': domain_info_db.whois_server, + 'ns_records': [ns.name for ns in domain_info_db.name_servers.all()], + }) + + # Extract registrar info + registrar = domain_info_db.registrar + if registrar: + domain_info.update({ + 'registrar_name': registrar.name, + 'registrar_phone': registrar.phone, + 'registrar_email': registrar.email, + 'registrar_url': registrar.url, + }) + + # Extract registration info (registrant, admin, tech) + for role in ['registrant', 'admin', 'tech']: + registration = getattr(domain_info_db, role) + if registration: + domain_info.update({ + f'{role}_{key}': getattr(registration, key) + for key in ['name', 'id_str', 'organization', 'city', 'state', 'zip_code', + 'country', 'phone', 'fax', 'email', 'address'] + }) + + # Extract DNS records + dns_records = domain_info_db.dns_records.all() + for record_type in ['a', 'txt', 'mx']: + domain_info[f'{record_type}_records'] = [ + record.name for record in dns_records if record.type == record_type + ] + + # Extract related domains and TLDs + domain_info.update({ + 'related_tlds': [domain.name for domain in domain_info_db.related_tlds.all()], + 'related_domains': [domain.name for domain in domain_info_db.related_domains.all()], + }) + + # Extract historical IPs + domain_info['historical_ips'] = [ + { + 'ip': ip.ip, + 'owner': ip.owner, + 'location': ip.location, + 'last_seen': ip.last_seen + } + for ip in domain_info_db.historical_ips.all() + ] + except Exception as e: + logger.error(f'Error while extracting domain info: {e}') + domain_info = DottedDict() + + return domain_info + + +def format_whois_response(domain_info): + """ + Format the domain info for the whois response. + Args: + domain_info (DottedDict): The domain info object. + Returns: + dict: The formatted whois response. + """ + return { + 'status': True, + 'target': domain_info.get('target'), + 'dnssec': domain_info.get('dnssec'), + 'created': domain_info.get('created'), + 'updated': domain_info.get('updated'), + 'expires': domain_info.get('expires'), + 'geolocation_iso': domain_info.get('registrant_country'), + 'domain_statuses': domain_info.get('status'), + 'whois_server': domain_info.get('whois_server'), + 'dns': { + 'a': domain_info.get('a_records'), + 'mx': domain_info.get('mx_records'), + 'txt': domain_info.get('txt_records'), + }, + 'registrar': { + 'name': domain_info.get('registrar_name'), + 'phone': domain_info.get('registrar_phone'), + 'email': domain_info.get('registrar_email'), + 'url': domain_info.get('registrar_url'), + }, + 'registrant': { + 'name': domain_info.get('registrant_name'), + 'id': domain_info.get('registrant_id'), + 'organization': domain_info.get('registrant_organization'), + 'address': domain_info.get('registrant_address'), + 'city': domain_info.get('registrant_city'), + 'state': domain_info.get('registrant_state'), + 'zipcode': domain_info.get('registrant_zip_code'), + 'country': domain_info.get('registrant_country'), + 'phone': domain_info.get('registrant_phone'), + 'fax': domain_info.get('registrant_fax'), + 'email': domain_info.get('registrant_email'), + }, + 'admin': { + 'name': domain_info.get('admin_name'), + 'id': domain_info.get('admin_id'), + 'organization': domain_info.get('admin_organization'), + 'address':domain_info.get('admin_address'), + 'city': domain_info.get('admin_city'), + 'state': domain_info.get('admin_state'), + 'zipcode': domain_info.get('admin_zip_code'), + 'country': domain_info.get('admin_country'), + 'phone': domain_info.get('admin_phone'), + 'fax': domain_info.get('admin_fax'), + 'email': domain_info.get('admin_email'), + }, + 'technical_contact': { + 'name': domain_info.get('tech_name'), + 'id': domain_info.get('tech_id'), + 'organization': domain_info.get('tech_organization'), + 'address': domain_info.get('tech_address'), + 'city': domain_info.get('tech_city'), + 'state': domain_info.get('tech_state'), + 'zipcode': domain_info.get('tech_zip_code'), + 'country': domain_info.get('tech_country'), + 'phone': domain_info.get('tech_phone'), + 'fax': domain_info.get('tech_fax'), + 'email': domain_info.get('tech_email'), + }, + 'nameservers': domain_info.get('ns_records'), + 'related_domains': domain_info.get('related_domains'), + 'related_tlds': domain_info.get('related_tlds'), + 'historical_ips': domain_info.get('historical_ips'), + } + + +def parse_whois_data(domain_info, whois_data): + """Parse WHOIS data and update domain_info.""" + whois = whois_data.get('whois', {}) + dns = whois_data.get('dns', {}) + + # Parse basic domain information + domain_info.update({ + 'created': whois.get('created_date'), + 'expires': whois.get('expiration_date'), + 'updated': whois.get('updated_date'), + 'whois_server': whois.get('whois_server'), + 'dnssec': bool(whois.get('dnssec')), + 'status': whois.get('status', []), + }) + + # Parse registrar information + parse_registrar_info(domain_info, whois.get('registrar', {})) + + # Parse registration information + for role in ['registrant', 'administrative', 'technical']: + parse_registration_info(domain_info, whois.get(role, {}), role) + + # Parse DNS records + parse_dns_records(domain_info, dns) + + # Parse name servers + domain_info.ns_records = dns.get('ns', []) + + +def parse_registrar_info(domain_info, registrar): + """Parse registrar information.""" + domain_info.update({ + 'registrar_name': registrar.get('name'), + 'registrar_email': registrar.get('email'), + 'registrar_phone': registrar.get('phone'), + 'registrar_url': registrar.get('url'), + }) + +def parse_registration_info(domain_info, registration, role): + """Parse registration information for registrant, admin, and tech contacts.""" + role_prefix = role if role != 'administrative' else 'admin' + domain_info.update({ + f'{role_prefix}_{key}': value + for key, value in registration.items() + if key in ['name', 'id', 'organization', 'street', 'city', 'province', 'postal_code', 'country', 'phone', 'fax'] + }) + + # Handle email separately to apply regex + email = registration.get('email') + if email: + email_match = EMAIL_REGEX.search(str(email)) + domain_info[f'{role_prefix}_email'] = email_match.group(0) if email_match else None + +def parse_dns_records(domain_info, dns): + """Parse DNS records.""" + domain_info.update({ + 'mx_records': dns.get('mx', []), + 'txt_records': dns.get('txt', []), + 'a_records': dns.get('a', []), + 'ns_records': dns.get('ns', []), + }) + + +def save_domain_info_to_db(ip_domain, domain_info): + """Save domain info to the database.""" + domain, _ = Domain.objects.get_or_create(name=ip_domain) + + # Create or update DomainInfo + domain_info_obj, created = DomainInfo.objects.get_or_create(domain=domain) + + # Update basic domain information + domain_info_obj.dnssec = domain_info.get('dnssec', False) + domain_info_obj.created = domain_info.get('created') + domain_info_obj.updated = domain_info.get('updated') + domain_info_obj.expires = domain_info.get('expires') + domain_info_obj.whois_server = domain_info.get('whois_server') + domain_info_obj.geolocation_iso = domain_info.get('registrant_country') + + # Save or update Registrar + registrar, _ = Registrar.objects.get_or_create( + name=domain_info.get('registrar_name', ''), + defaults={ + 'email': domain_info.get('registrar_email'), + 'phone': domain_info.get('registrar_phone'), + 'url': domain_info.get('registrar_url'), + } + ) + domain_info_obj.registrar = registrar + + # Save or update Registrations (registrant, admin, tech) + for role in ['registrant', 'admin', 'tech']: + registration, _ = DomainRegistration.objects.get_or_create( + name=domain_info.get(f'{role}_name', ''), + defaults={ + 'organization': domain_info.get(f'{role}_organization'), + 'address': domain_info.get(f'{role}_address'), + 'city': domain_info.get(f'{role}_city'), + 'state': domain_info.get(f'{role}_state'), + 'zip_code': domain_info.get(f'{role}_zip_code'), + 'country': domain_info.get(f'{role}_country'), + 'email': domain_info.get(f'{role}_email'), + 'phone': domain_info.get(f'{role}_phone'), + 'fax': domain_info.get(f'{role}_fax'), + 'id_str': domain_info.get(f'{role}_id'), + } + ) + setattr(domain_info_obj, role, registration) + + # Save domain statuses + domain_info_obj.status.clear() + for status in domain_info.get('status', []): + status_obj, _ = WhoisStatus.objects.get_or_create(name=status) + domain_info_obj.status.add(status_obj) + + # Save name servers + domain_info_obj.name_servers.clear() + for ns in domain_info.get('ns_records', []): + ns_obj, _ = NameServer.objects.get_or_create(name=ns) + domain_info_obj.name_servers.add(ns_obj) + + # Save DNS records + domain_info_obj.dns_records.clear() + for record_type in ['a', 'mx', 'txt']: + for record in domain_info.get(f'{record_type}_records', []): + dns_record, _ = DNSRecord.objects.get_or_create( + name=record, + type=record_type + ) + domain_info_obj.dns_records.add(dns_record) + + # Save related domains and TLDs + domain_info_obj.related_domains.clear() + for related_domain in domain_info.get('related_domains', []): + related_domain_obj, _ = RelatedDomain.objects.get_or_create(name=related_domain) + domain_info_obj.related_domains.add(related_domain_obj) + + domain_info_obj.related_tlds.clear() + for related_tld in domain_info.get('related_tlds', []): + related_tld_obj, _ = RelatedDomain.objects.get_or_create(name=related_tld) + domain_info_obj.related_tlds.add(related_tld_obj) + + # Save historical IPs + domain_info_obj.historical_ips.clear() + for ip_info in domain_info.get('historical_ips', []): + historical_ip, _ = HistoricalIP.objects.get_or_create( + ip=ip_info['ip'], + defaults={ + 'owner': ip_info.get('owner'), + 'location': ip_info.get('location'), + 'last_seen': ip_info.get('last_seen'), + } + ) + domain_info_obj.historical_ips.add(historical_ip) + + # Save the DomainInfo object + domain_info_obj.save() + + # Update the Domain object with the new DomainInfo + domain.domain_info = domain_info_obj + domain.save() + + return domain_info_obj diff --git a/web/reNgine/definitions.py b/web/reNgine/definitions.py index e5b80d577..5a9f6f0bf 100644 --- a/web/reNgine/definitions.py +++ b/web/reNgine/definitions.py @@ -11,7 +11,7 @@ # TOOLS DEFINITIONS ############################################################################### -EMAIL_REGEX = re.compile(r'[a-z0-9\.\-+_]+@[a-z0-9\.\-+_]+\.[a-z]+') +EMAIL_REGEX = re.compile(r'[\w\.-]+@[\w\.-]+') ############################################################################### # YAML CONFIG DEFINITIONS diff --git a/web/reNgine/tasks.py b/web/reNgine/tasks.py index 7ab9ed963..6cd89ee50 100644 --- a/web/reNgine/tasks.py +++ b/web/reNgine/tasks.py @@ -3684,435 +3684,113 @@ def geo_localize(host, ip_id=None): @app.task(name='query_whois', bind=False, queue='query_whois_queue') -def query_whois(ip_domain, force_reload_whois=False): +def query_whois(target, force_reload_whois=False): """Query WHOIS information for an IP or a domain name. Args: - ip_domain (str): IP address or domain name. + target (str): IP address or domain name. save_domain (bool): Whether to save domain or not, default False Returns: dict: WHOIS information. """ - if not force_reload_whois and Domain.objects.filter(name=ip_domain).exists() and Domain.objects.get(name=ip_domain).domain_info: - domain = Domain.objects.get(name=ip_domain) - if not domain.insert_date: - domain.insert_date = timezone.now() - domain.save() - domain_info_db = domain.domain_info - domain_info = DottedDict( - dnssec=domain_info_db.dnssec, - created=domain_info_db.created, - updated=domain_info_db.updated, - expires=domain_info_db.expires, - geolocation_iso=domain_info_db.geolocation_iso, - status=[status['name'] for status in DomainWhoisStatusSerializer(domain_info_db.status, many=True).data], - whois_server=domain_info_db.whois_server, - ns_records=[ns['name'] for ns in NameServersSerializer(domain_info_db.name_servers, many=True).data], - registrar_name=domain_info_db.registrar.name, - registrar_phone=domain_info_db.registrar.phone, - registrar_email=domain_info_db.registrar.email, - registrar_url=domain_info_db.registrar.url, - registrant_name=domain_info_db.registrant.name, - registrant_id=domain_info_db.registrant.id_str, - registrant_organization=domain_info_db.registrant.organization, - registrant_city=domain_info_db.registrant.city, - registrant_state=domain_info_db.registrant.state, - registrant_zip_code=domain_info_db.registrant.zip_code, - registrant_country=domain_info_db.registrant.country, - registrant_phone=domain_info_db.registrant.phone, - registrant_fax=domain_info_db.registrant.fax, - registrant_email=domain_info_db.registrant.email, - registrant_address=domain_info_db.registrant.address, - admin_name=domain_info_db.admin.name, - admin_id=domain_info_db.admin.id_str, - admin_organization=domain_info_db.admin.organization, - admin_city=domain_info_db.admin.city, - admin_state=domain_info_db.admin.state, - admin_zip_code=domain_info_db.admin.zip_code, - admin_country=domain_info_db.admin.country, - admin_phone=domain_info_db.admin.phone, - admin_fax=domain_info_db.admin.fax, - admin_email=domain_info_db.admin.email, - admin_address=domain_info_db.admin.address, - tech_name=domain_info_db.tech.name, - tech_id=domain_info_db.tech.id_str, - tech_organization=domain_info_db.tech.organization, - tech_city=domain_info_db.tech.city, - tech_state=domain_info_db.tech.state, - tech_zip_code=domain_info_db.tech.zip_code, - tech_country=domain_info_db.tech.country, - tech_phone=domain_info_db.tech.phone, - tech_fax=domain_info_db.tech.fax, - tech_email=domain_info_db.tech.email, - tech_address=domain_info_db.tech.address, - related_tlds=[domain['name'] for domain in RelatedDomainSerializer(domain_info_db.related_tlds, many=True).data], - related_domains=[domain['name'] for domain in RelatedDomainSerializer(domain_info_db.related_domains, many=True).data], - historical_ips=[ip for ip in HistoricalIPSerializer(domain_info_db.historical_ips, many=True).data], - ) - if domain_info_db.dns_records: - a_records = [] - txt_records = [] - mx_records = [] - dns_records = [{'name': dns['name'], 'type': dns['type']} for dns in DomainDNSRecordSerializer(domain_info_db.dns_records, many=True).data] - for dns in dns_records: - if dns['type'] == 'a': - a_records.append(dns['name']) - elif dns['type'] == 'txt': - txt_records.append(dns['name']) - elif dns['type'] == 'mx': - mx_records.append(dns['name']) - domain_info.a_records = a_records - domain_info.txt_records = txt_records - domain_info.mx_records = mx_records - else: - logger.info(f'Domain info for "{ip_domain}" not found in DB, querying whois') - domain_info = DottedDict() - # find domain historical ip - try: - historical_ips = get_domain_historical_ip_address(ip_domain) - domain_info.historical_ips = historical_ips - except Exception as e: - logger.error(f'HistoricalIP for {ip_domain} not found!\nError: {str(e)}') - historical_ips = [] - # find associated domains using ip_domain - try: - related_domains = reverse_whois(ip_domain.split('.')[0]) - except Exception as e: - logger.error(f'Associated domain not found for {ip_domain}\nError: {str(e)}') - similar_domains = [] - # find related tlds using TLSx - try: - related_tlds = [] - output_path = '/tmp/ip_domain_tlsx.txt' - tlsx_command = f'tlsx -san -cn -silent -ro -host {ip_domain} -o {output_path}' - run_command( - tlsx_command, - shell=True, - ) - tlsx_output = [] - with open(output_path) as f: - tlsx_output = f.readlines() - - tldextract_target = tldextract.extract(ip_domain) - for doms in tlsx_output: - doms = doms.strip() - tldextract_res = tldextract.extract(doms) - if ip_domain != doms and tldextract_res.domain == tldextract_target.domain and tldextract_res.subdomain == '': - related_tlds.append(doms) - - related_tlds = list(set(related_tlds)) - domain_info.related_tlds = related_tlds - except Exception as e: - logger.error(f'Associated domain not found for {ip_domain}\nError: {str(e)}') - similar_domains = [] - - related_domains_list = [] - if Domain.objects.filter(name=ip_domain).exists(): - domain = Domain.objects.get(name=ip_domain) - db_domain_info = domain.domain_info if domain.domain_info else DomainInfo() - db_domain_info.save() - for _domain in related_domains: - domain_related = RelatedDomain.objects.get_or_create( - name=_domain['name'], - )[0] - db_domain_info.related_domains.add(domain_related) - related_domains_list.append(_domain['name']) - - for _domain in related_tlds: - domain_related = RelatedDomain.objects.get_or_create( - name=_domain, - )[0] - db_domain_info.related_tlds.add(domain_related) - - for _ip in historical_ips: - historical_ip = HistoricalIP.objects.get_or_create( - ip=_ip['ip'], - owner=_ip['owner'], - location=_ip['location'], - last_seen=_ip['last_seen'], - )[0] - db_domain_info.historical_ips.add(historical_ip) - domain.domain_info = db_domain_info - domain.save() - - command = f'netlas host {ip_domain} -f json' - # check if netlas key is provided - netlas_key = get_netlas_key() - command += f' -a {netlas_key}' if netlas_key else '' + if not force_reload_whois: + logger.info(f'Querying WHOIS information for {target} from db...') + domain_info = get_domain_info_from_db(target) + if domain_info: + return format_whois_response(domain_info) + + # Query WHOIS information as not found in db + logger.info(f'Whois info not found in db') + logger.info(f'Querying WHOIS information for {target} from WHOIS server...') + + domain_info = DottedDict() + + # try: + # domain_info.historical_ips = get_domain_historical_ip_address(target) + # except Exception as e: + # logger.error(f'HistoricalIP for {target} not found! Error: {str(e)}') + + # domain_info.related_domains = fetch_related_domains(target) + # domain_info.related_tlds = fetch_related_tlds(target) + + whois_data = fetch_whois_data_using_netlas(target) + if not whois_data['status']: + return { + 'status': False, + 'target': target, + 'result': whois_data['message'] + } + + whois_data = whois_data['data'] + + parse_whois_data(domain_info, whois_data) + # saved_domain_info = save_domain_info_to_db(target, domain_info) + return format_whois_response(domain_info) + +def fetch_whois_data_using_netlas(target): + """ + Fetch WHOIS data using netlas. + Args: + target (str): IP address or domain name. + Returns: + dict: WHOIS information. + """ + command = f'netlas host {target} -f json' + netlas_key = get_netlas_key() + if netlas_key: + command += f' -a {netlas_key}' + try: _, result = run_command(command, remove_ansi_sequence=True) + + # catch errors if 'Failed to parse response data' in result: - # do fallback return { - 'status': False, - 'ip_domain': ip_domain, - 'result': "Netlas limit exceeded.", + 'status': False, 'message': 'Netlas limit exceeded.' } - try: - result = json.loads(result) - logger.info(result) - whois = result.get('whois') if result.get('whois') else {} - - domain_info.created = whois.get('created_date') - domain_info.expires = whois.get('expiration_date') - domain_info.updated = whois.get('updated_date') - domain_info.whois_server = whois.get('whois_server') - - - if 'registrant' in whois: - registrant = whois.get('registrant') - domain_info.registrant_name = registrant.get('name') - domain_info.registrant_country = registrant.get('country') - domain_info.registrant_id = registrant.get('id') - domain_info.registrant_state = registrant.get('province') - domain_info.registrant_city = registrant.get('city') - domain_info.registrant_phone = registrant.get('phone') - domain_info.registrant_address = registrant.get('street') - domain_info.registrant_organization = registrant.get('organization') - domain_info.registrant_fax = registrant.get('fax') - domain_info.registrant_zip_code = registrant.get('postal_code') - email_search = EMAIL_REGEX.search(str(registrant.get('email'))) - field_content = email_search.group(0) if email_search else None - domain_info.registrant_email = field_content - - if 'administrative' in whois: - administrative = whois.get('administrative') - domain_info.admin_name = administrative.get('name') - domain_info.admin_country = administrative.get('country') - domain_info.admin_id = administrative.get('id') - domain_info.admin_state = administrative.get('province') - domain_info.admin_city = administrative.get('city') - domain_info.admin_phone = administrative.get('phone') - domain_info.admin_address = administrative.get('street') - domain_info.admin_organization = administrative.get('organization') - domain_info.admin_fax = administrative.get('fax') - domain_info.admin_zip_code = administrative.get('postal_code') - mail_search = EMAIL_REGEX.search(str(administrative.get('email'))) - field_content = email_search.group(0) if email_search else None - domain_info.admin_email = field_content - - if 'technical' in whois: - technical = whois.get('technical') - domain_info.tech_name = technical.get('name') - domain_info.tech_country = technical.get('country') - domain_info.tech_state = technical.get('province') - domain_info.tech_id = technical.get('id') - domain_info.tech_city = technical.get('city') - domain_info.tech_phone = technical.get('phone') - domain_info.tech_address = technical.get('street') - domain_info.tech_organization = technical.get('organization') - domain_info.tech_fax = technical.get('fax') - domain_info.tech_zip_code = technical.get('postal_code') - mail_search = EMAIL_REGEX.search(str(technical.get('email'))) - field_content = email_search.group(0) if email_search else None - domain_info.tech_email = field_content - - if 'dns' in result: - dns = result.get('dns') - domain_info.mx_records = dns.get('mx') - domain_info.txt_records = dns.get('txt') - domain_info.a_records = dns.get('a') - - domain_info.ns_records = whois.get('name_servers') - domain_info.dnssec = True if whois.get('dnssec') else False - domain_info.status = whois.get('status') - - if 'registrar' in whois: - registrar = whois.get('registrar') - domain_info.registrar_name = registrar.get('name') - domain_info.registrar_email = registrar.get('email') - domain_info.registrar_phone = registrar.get('phone') - domain_info.registrar_url = registrar.get('url') - - # find associated domains if registrant email is found - related_domains = reverse_whois(domain_info.get('registrant_email')) if domain_info.get('registrant_email') else [] - for _domain in related_domains: - related_domains_list.append(_domain['name']) - - # remove duplicate domains from related domains list - related_domains_list = list(set(related_domains_list)) - domain_info.related_domains = related_domains_list - - # save to db if domain exists - if Domain.objects.filter(name=ip_domain).exists(): - domain = Domain.objects.get(name=ip_domain) - db_domain_info = domain.domain_info if domain.domain_info else DomainInfo() - db_domain_info.save() - for _domain in related_domains: - domain_rel = RelatedDomain.objects.get_or_create( - name=_domain['name'], - )[0] - db_domain_info.related_domains.add(domain_rel) - - db_domain_info.dnssec = domain_info.get('dnssec') - #dates - db_domain_info.created = domain_info.get('created') - db_domain_info.updated = domain_info.get('updated') - db_domain_info.expires = domain_info.get('expires') - #registrar - db_domain_info.registrar = Registrar.objects.get_or_create( - name=domain_info.get('registrar_name'), - email=domain_info.get('registrar_email'), - phone=domain_info.get('registrar_phone'), - url=domain_info.get('registrar_url'), - )[0] - db_domain_info.registrant = DomainRegistration.objects.get_or_create( - name=domain_info.get('registrant_name'), - organization=domain_info.get('registrant_organization'), - address=domain_info.get('registrant_address'), - city=domain_info.get('registrant_city'), - state=domain_info.get('registrant_state'), - zip_code=domain_info.get('registrant_zip_code'), - country=domain_info.get('registrant_country'), - email=domain_info.get('registrant_email'), - phone=domain_info.get('registrant_phone'), - fax=domain_info.get('registrant_fax'), - id_str=domain_info.get('registrant_id'), - )[0] - db_domain_info.admin = DomainRegistration.objects.get_or_create( - name=domain_info.get('admin_name'), - organization=domain_info.get('admin_organization'), - address=domain_info.get('admin_address'), - city=domain_info.get('admin_city'), - state=domain_info.get('admin_state'), - zip_code=domain_info.get('admin_zip_code'), - country=domain_info.get('admin_country'), - email=domain_info.get('admin_email'), - phone=domain_info.get('admin_phone'), - fax=domain_info.get('admin_fax'), - id_str=domain_info.get('admin_id'), - )[0] - db_domain_info.tech = DomainRegistration.objects.get_or_create( - name=domain_info.get('tech_name'), - organization=domain_info.get('tech_organization'), - address=domain_info.get('tech_address'), - city=domain_info.get('tech_city'), - state=domain_info.get('tech_state'), - zip_code=domain_info.get('tech_zip_code'), - country=domain_info.get('tech_country'), - email=domain_info.get('tech_email'), - phone=domain_info.get('tech_phone'), - fax=domain_info.get('tech_fax'), - id_str=domain_info.get('tech_id'), - )[0] - for status in domain_info.get('status') or []: - _status = WhoisStatus.objects.get_or_create( - name=status - )[0] - _status.save() - db_domain_info.status.add(_status) - - for ns in domain_info.get('ns_records') or []: - _ns = NameServer.objects.get_or_create( - name=ns - )[0] - _ns.save() - db_domain_info.name_servers.add(_ns) - - for a in domain_info.get('a_records') or []: - _a = DNSRecord.objects.get_or_create( - name=a, - type='a' - )[0] - _a.save() - db_domain_info.dns_records.add(_a) - for mx in domain_info.get('mx_records') or []: - _mx = DNSRecord.objects.get_or_create( - name=mx, - type='mx' - )[0] - _mx.save() - db_domain_info.dns_records.add(_mx) - for txt in domain_info.get('txt_records') or []: - _txt = DNSRecord.objects.get_or_create( - name=txt, - type='txt' - )[0] - _txt.save() - db_domain_info.dns_records.add(_txt) - - db_domain_info.geolocation_iso = domain_info.get('registrant_country') - db_domain_info.whois_server = domain_info.get('whois_server') - db_domain_info.save() - domain.domain_info = db_domain_info - domain.save() + + if 'api key doesn\'t exist' in result: + return { + 'status': False, + 'message': 'Invalid Netlas API Key!' + } + + if 'Request limit' in result: + return { + 'status': False, + 'message': 'Netlas request limit exceeded.' + } + + data = json.loads(result) - except Exception as e: + if not data: return { - 'status': False, - 'ip_domain': ip_domain, - 'result': "unable to fetch records from WHOIS database.", - 'message': str(e) + 'status': False, + 'message': 'No data available for the given domain or IP.' } + # if 'whois' not in data: + # return { + # 'status': False, + # 'message': 'Invalid domain or no WHOIS data available.' + # } - return { - 'status': True, - 'ip_domain': ip_domain, - 'dnssec': domain_info.get('dnssec'), - 'created': domain_info.get('created'), - 'updated': domain_info.get('updated'), - 'expires': domain_info.get('expires'), - 'geolocation_iso': domain_info.get('registrant_country'), - 'domain_statuses': domain_info.get('status'), - 'whois_server': domain_info.get('whois_server'), - 'dns': { - 'a': domain_info.get('a_records'), - 'mx': domain_info.get('mx_records'), - 'txt': domain_info.get('txt_records'), - }, - 'registrar': { - 'name': domain_info.get('registrar_name'), - 'phone': domain_info.get('registrar_phone'), - 'email': domain_info.get('registrar_email'), - 'url': domain_info.get('registrar_url'), - }, - 'registrant': { - 'name': domain_info.get('registrant_name'), - 'id': domain_info.get('registrant_id'), - 'organization': domain_info.get('registrant_organization'), - 'address': domain_info.get('registrant_address'), - 'city': domain_info.get('registrant_city'), - 'state': domain_info.get('registrant_state'), - 'zipcode': domain_info.get('registrant_zip_code'), - 'country': domain_info.get('registrant_country'), - 'phone': domain_info.get('registrant_phone'), - 'fax': domain_info.get('registrant_fax'), - 'email': domain_info.get('registrant_email'), - }, - 'admin': { - 'name': domain_info.get('admin_name'), - 'id': domain_info.get('admin_id'), - 'organization': domain_info.get('admin_organization'), - 'address':domain_info.get('admin_address'), - 'city': domain_info.get('admin_city'), - 'state': domain_info.get('admin_state'), - 'zipcode': domain_info.get('admin_zip_code'), - 'country': domain_info.get('admin_country'), - 'phone': domain_info.get('admin_phone'), - 'fax': domain_info.get('admin_fax'), - 'email': domain_info.get('admin_email'), - }, - 'technical_contact': { - 'name': domain_info.get('tech_name'), - 'id': domain_info.get('tech_id'), - 'organization': domain_info.get('tech_organization'), - 'address': domain_info.get('tech_address'), - 'city': domain_info.get('tech_city'), - 'state': domain_info.get('tech_state'), - 'zipcode': domain_info.get('tech_zip_code'), - 'country': domain_info.get('tech_country'), - 'phone': domain_info.get('tech_phone'), - 'fax': domain_info.get('tech_fax'), - 'email': domain_info.get('tech_email'), - }, - 'nameservers': domain_info.get('ns_records'), - # 'similar_domains': domain_info.get('similar_domains'), - 'related_domains': domain_info.get('related_domains'), - 'related_tlds': domain_info.get('related_tlds'), - 'historical_ips': domain_info.get('historical_ips'), - } + return { + 'status': True, + 'data': data + } + except json.JSONDecodeError: + return { + 'status': False, + 'message': 'Failed to parse JSON response from Netlas.' + } + except Exception as e: + return { + 'status': False, + 'message': f'An error occurred while fetching WHOIS data: {str(e)}' + } + @app.task(name='remove_duplicate_endpoints', bind=False, queue='remove_duplicate_endpoints_queue') def remove_duplicate_endpoints( diff --git a/web/static/custom/custom.js b/web/static/custom/custom.js index fc863d35e..b34095f93 100644 --- a/web/static/custom/custom.js +++ b/web/static/custom/custom.js @@ -1412,7 +1412,7 @@ function get_and_render_subscan_history(subdomain_id, subdomain_name) { function fetch_whois(domain_name, force_reload_whois=false) { // this function will fetch WHOIS record for any subdomain and also display // snackbar once whois is fetched - var url = `/api/tools/whois/?format=json&ip_domain=${domain_name}`; + var url = `/api/tools/whois/?format=json&target=${domain_name}`; if (force_reload_whois) { url+='&is_reload=true' } @@ -1456,7 +1456,7 @@ function fetch_whois(domain_name, force_reload_whois=false) { } function get_target_whois(domain_name) { - const url = `/api/tools/whois/?format=json&ip_domain=${domain_name}`; + const url = `/api/tools/whois/?format=json&target=${domain_name}`; Swal.fire({ title: `Fetching WHOIS details for ${domain_name}...`, @@ -1507,7 +1507,7 @@ function get_target_whois(domain_name) { } function get_domain_whois(domain_name, show_add_target_btn = false) { - const url = `/api/tools/whois/?format=json&ip_domain=${domain_name}`; + const url = `/api/tools/whois/?format=json&target=${domain_name}`; Swal.fire({ title: `Fetching WHOIS details for ${domain_name}...`, @@ -1586,7 +1586,7 @@ function display_whois_on_modal(response, show_add_target_btn=false) {
Domain -
${response.ip_domain}
+
${response.target}
Dnssec @@ -1877,7 +1877,7 @@ function display_whois_on_modal(response, show_add_target_btn=false) { if (show_add_target_btn) { content += `
- +
` } From 6fcf6299a8fdea35e942a7d951fd0e4b1558b1aa Mon Sep 17 00:00:00 2001 From: Yogesh Ojha Date: Fri, 9 Aug 2024 10:49:34 +0530 Subject: [PATCH 03/11] fix: use tlsx to fetch related tld and domains both --- web/reNgine/common_func.py | 16 ++++++----- web/reNgine/tasks.py | 55 +++++++++++++++++++++++++++++++++----- 2 files changed, 59 insertions(+), 12 deletions(-) diff --git a/web/reNgine/common_func.py b/web/reNgine/common_func.py index bb977fdde..7178569a9 100644 --- a/web/reNgine/common_func.py +++ b/web/reNgine/common_func.py @@ -20,6 +20,7 @@ from discord_webhook import DiscordEmbed, DiscordWebhook from django.db.models import Q from dotted_dict import DottedDict +from tempfile import NamedTemporaryFile from reNgine.common_serializers import * from reNgine.definitions import * @@ -968,12 +969,15 @@ def reverse_whois(lookup_keyword): response = requests.get(url, headers=headers) soup = BeautifulSoup(response.content, 'lxml') table = soup.find("table", {"border" : "1"}) - for row in table or []: - dom = row.findAll('td')[0].getText() - created_on = row.findAll('td')[1].getText() - if dom == 'Domain Name': - continue - domains.append({'name': dom, 'created_on': created_on}) + try: + for row in table or []: + dom = row.findAll('td')[0].getText() + created_on = row.findAll('td')[1].getText() + if dom == 'Domain Name': + continue + domains.append({'name': dom, 'created_on': created_on}) + except Exception as e: + logger.error(f'Error while fetching reverse whois info: {e}') return domains diff --git a/web/reNgine/tasks.py b/web/reNgine/tasks.py index 6cd89ee50..66d03e41e 100644 --- a/web/reNgine/tasks.py +++ b/web/reNgine/tasks.py @@ -3705,13 +3705,12 @@ def query_whois(target, force_reload_whois=False): domain_info = DottedDict() - # try: - # domain_info.historical_ips = get_domain_historical_ip_address(target) - # except Exception as e: - # logger.error(f'HistoricalIP for {target} not found! Error: {str(e)}') + domain_info.historical_ips = get_domain_historical_ip_address(target) + domain_info.related_tlds = fetch_related_tlds_and_domains(target) + related_domains = reverse_whois(target) + # add to related domains from related_tlds - # domain_info.related_domains = fetch_related_domains(target) - # domain_info.related_tlds = fetch_related_tlds(target) + domain_info.relaed_domains = related_domains whois_data = fetch_whois_data_using_netlas(target) if not whois_data['status']: @@ -3727,6 +3726,50 @@ def query_whois(target, force_reload_whois=False): # saved_domain_info = save_domain_info_to_db(target, domain_info) return format_whois_response(domain_info) + +def fetch_related_tlds_and_domains(domain): + """ + Fetch related TLDs and domains using TLSx. + related domains are those that are not part of related TLDs. + + Args: + domain (str): The domain to find related TLDs and domains for. + + Returns: + tuple: A tuple containing two lists (related_tlds, related_domains). + """ + related_tlds = set() + related_domains = set() + + # Extract the base domain + extracted = tldextract.extract(domain) + base_domain = f"{extracted.domain}.{extracted.suffix}" + + cmd = f'tlsx -san -cn -silent -ro -host {domain}' + _, result = run_command(cmd, shell=True) + + for line in result.splitlines(): + try: + line = line.strip() + if line == "": + continue + extracted_result = tldextract.extract(line) + full_domain = f"{extracted_result.domain}.{extracted_result.suffix}" + + if extracted_result.domain == extracted.domain: + if full_domain != base_domain: + related_tlds.add(full_domain) + elif extracted_result.domain != extracted.domain or extracted_result.subdomain: + related_domains.add(line) + except Exception as e: + logger.error(f"An error occurred while fetching related TLDs and domains for {domain}: {str(e)}") + continue + + logger.info(f"Found {len(related_tlds)} related TLDs and {len(related_domains)} related domains for {domain}") + return list(related_tlds), list(related_domains) + + + def fetch_whois_data_using_netlas(target): """ Fetch WHOIS data using netlas. From 2ee1346f49c15aac7e7c31ab919e7f9f67e31540 Mon Sep 17 00:00:00 2001 From: Yogesh Ojha Date: Fri, 9 Aug 2024 10:53:41 +0530 Subject: [PATCH 04/11] fix related domains --- web/reNgine/tasks.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/web/reNgine/tasks.py b/web/reNgine/tasks.py index 66d03e41e..94afdb43d 100644 --- a/web/reNgine/tasks.py +++ b/web/reNgine/tasks.py @@ -3706,11 +3706,15 @@ def query_whois(target, force_reload_whois=False): domain_info = DottedDict() domain_info.historical_ips = get_domain_historical_ip_address(target) - domain_info.related_tlds = fetch_related_tlds_and_domains(target) - related_domains = reverse_whois(target) - # add to related domains from related_tlds + domain_info.related_tlds, tlsx_related_domain = fetch_related_tlds_and_domains(target) - domain_info.relaed_domains = related_domains + related_domains = reverse_whois(target) + if tlsx_related_domain: + related_domains += tlsx_related_domain + + # remove duplicate ones + related_domains = list(set(related_domains)) + domain_info.related_domains = related_domains whois_data = fetch_whois_data_using_netlas(target) if not whois_data['status']: From a12ca45199abbfd4787d27693bfe03af0c45cec9 Mon Sep 17 00:00:00 2001 From: Yogesh Ojha Date: Fri, 9 Aug 2024 10:57:23 +0530 Subject: [PATCH 05/11] fix: save domain info to database in query_whois function --- web/reNgine/common_func.py | 183 +++++++++++++++++++------------------ web/reNgine/tasks.py | 2 +- 2 files changed, 93 insertions(+), 92 deletions(-) diff --git a/web/reNgine/common_func.py b/web/reNgine/common_func.py index 7178569a9..2a53fb03f 100644 --- a/web/reNgine/common_func.py +++ b/web/reNgine/common_func.py @@ -1390,102 +1390,103 @@ def parse_dns_records(domain_info, dns): }) -def save_domain_info_to_db(ip_domain, domain_info): +def save_domain_info_to_db(target, domain_info): """Save domain info to the database.""" - domain, _ = Domain.objects.get_or_create(name=ip_domain) - - # Create or update DomainInfo - domain_info_obj, created = DomainInfo.objects.get_or_create(domain=domain) - - # Update basic domain information - domain_info_obj.dnssec = domain_info.get('dnssec', False) - domain_info_obj.created = domain_info.get('created') - domain_info_obj.updated = domain_info.get('updated') - domain_info_obj.expires = domain_info.get('expires') - domain_info_obj.whois_server = domain_info.get('whois_server') - domain_info_obj.geolocation_iso = domain_info.get('registrant_country') - - # Save or update Registrar - registrar, _ = Registrar.objects.get_or_create( - name=domain_info.get('registrar_name', ''), - defaults={ - 'email': domain_info.get('registrar_email'), - 'phone': domain_info.get('registrar_phone'), - 'url': domain_info.get('registrar_url'), - } - ) - domain_info_obj.registrar = registrar - - # Save or update Registrations (registrant, admin, tech) - for role in ['registrant', 'admin', 'tech']: - registration, _ = DomainRegistration.objects.get_or_create( - name=domain_info.get(f'{role}_name', ''), + if Domain.objects.filter(name=target).exists(): + domain, _ = Domain.objects.get_or_create(name=target) + + # Create or update DomainInfo + domain_info_obj, created = DomainInfo.objects.get_or_create(domain=domain) + + # Update basic domain information + domain_info_obj.dnssec = domain_info.get('dnssec', False) + domain_info_obj.created = domain_info.get('created') + domain_info_obj.updated = domain_info.get('updated') + domain_info_obj.expires = domain_info.get('expires') + domain_info_obj.whois_server = domain_info.get('whois_server') + domain_info_obj.geolocation_iso = domain_info.get('registrant_country') + + # Save or update Registrar + registrar, _ = Registrar.objects.get_or_create( + name=domain_info.get('registrar_name', ''), defaults={ - 'organization': domain_info.get(f'{role}_organization'), - 'address': domain_info.get(f'{role}_address'), - 'city': domain_info.get(f'{role}_city'), - 'state': domain_info.get(f'{role}_state'), - 'zip_code': domain_info.get(f'{role}_zip_code'), - 'country': domain_info.get(f'{role}_country'), - 'email': domain_info.get(f'{role}_email'), - 'phone': domain_info.get(f'{role}_phone'), - 'fax': domain_info.get(f'{role}_fax'), - 'id_str': domain_info.get(f'{role}_id'), + 'email': domain_info.get('registrar_email'), + 'phone': domain_info.get('registrar_phone'), + 'url': domain_info.get('registrar_url'), } ) - setattr(domain_info_obj, role, registration) - - # Save domain statuses - domain_info_obj.status.clear() - for status in domain_info.get('status', []): - status_obj, _ = WhoisStatus.objects.get_or_create(name=status) - domain_info_obj.status.add(status_obj) - - # Save name servers - domain_info_obj.name_servers.clear() - for ns in domain_info.get('ns_records', []): - ns_obj, _ = NameServer.objects.get_or_create(name=ns) - domain_info_obj.name_servers.add(ns_obj) - - # Save DNS records - domain_info_obj.dns_records.clear() - for record_type in ['a', 'mx', 'txt']: - for record in domain_info.get(f'{record_type}_records', []): - dns_record, _ = DNSRecord.objects.get_or_create( - name=record, - type=record_type + domain_info_obj.registrar = registrar + + # Save or update Registrations (registrant, admin, tech) + for role in ['registrant', 'admin', 'tech']: + registration, _ = DomainRegistration.objects.get_or_create( + name=domain_info.get(f'{role}_name', ''), + defaults={ + 'organization': domain_info.get(f'{role}_organization'), + 'address': domain_info.get(f'{role}_address'), + 'city': domain_info.get(f'{role}_city'), + 'state': domain_info.get(f'{role}_state'), + 'zip_code': domain_info.get(f'{role}_zip_code'), + 'country': domain_info.get(f'{role}_country'), + 'email': domain_info.get(f'{role}_email'), + 'phone': domain_info.get(f'{role}_phone'), + 'fax': domain_info.get(f'{role}_fax'), + 'id_str': domain_info.get(f'{role}_id'), + } ) - domain_info_obj.dns_records.add(dns_record) - - # Save related domains and TLDs - domain_info_obj.related_domains.clear() - for related_domain in domain_info.get('related_domains', []): - related_domain_obj, _ = RelatedDomain.objects.get_or_create(name=related_domain) - domain_info_obj.related_domains.add(related_domain_obj) - - domain_info_obj.related_tlds.clear() - for related_tld in domain_info.get('related_tlds', []): - related_tld_obj, _ = RelatedDomain.objects.get_or_create(name=related_tld) - domain_info_obj.related_tlds.add(related_tld_obj) - - # Save historical IPs - domain_info_obj.historical_ips.clear() - for ip_info in domain_info.get('historical_ips', []): - historical_ip, _ = HistoricalIP.objects.get_or_create( - ip=ip_info['ip'], - defaults={ - 'owner': ip_info.get('owner'), - 'location': ip_info.get('location'), - 'last_seen': ip_info.get('last_seen'), - } - ) - domain_info_obj.historical_ips.add(historical_ip) + setattr(domain_info_obj, role, registration) + + # Save domain statuses + domain_info_obj.status.clear() + for status in domain_info.get('status', []): + status_obj, _ = WhoisStatus.objects.get_or_create(name=status) + domain_info_obj.status.add(status_obj) + + # Save name servers + domain_info_obj.name_servers.clear() + for ns in domain_info.get('ns_records', []): + ns_obj, _ = NameServer.objects.get_or_create(name=ns) + domain_info_obj.name_servers.add(ns_obj) + + # Save DNS records + domain_info_obj.dns_records.clear() + for record_type in ['a', 'mx', 'txt']: + for record in domain_info.get(f'{record_type}_records', []): + dns_record, _ = DNSRecord.objects.get_or_create( + name=record, + type=record_type + ) + domain_info_obj.dns_records.add(dns_record) + + # Save related domains and TLDs + domain_info_obj.related_domains.clear() + for related_domain in domain_info.get('related_domains', []): + related_domain_obj, _ = RelatedDomain.objects.get_or_create(name=related_domain) + domain_info_obj.related_domains.add(related_domain_obj) + + domain_info_obj.related_tlds.clear() + for related_tld in domain_info.get('related_tlds', []): + related_tld_obj, _ = RelatedDomain.objects.get_or_create(name=related_tld) + domain_info_obj.related_tlds.add(related_tld_obj) + + # Save historical IPs + domain_info_obj.historical_ips.clear() + for ip_info in domain_info.get('historical_ips', []): + historical_ip, _ = HistoricalIP.objects.get_or_create( + ip=ip_info['ip'], + defaults={ + 'owner': ip_info.get('owner'), + 'location': ip_info.get('location'), + 'last_seen': ip_info.get('last_seen'), + } + ) + domain_info_obj.historical_ips.add(historical_ip) - # Save the DomainInfo object - domain_info_obj.save() + # Save the DomainInfo object + domain_info_obj.save() - # Update the Domain object with the new DomainInfo - domain.domain_info = domain_info_obj - domain.save() + # Update the Domain object with the new DomainInfo + domain.domain_info = domain_info_obj + domain.save() - return domain_info_obj + return domain_info_obj diff --git a/web/reNgine/tasks.py b/web/reNgine/tasks.py index 94afdb43d..54c09fa8d 100644 --- a/web/reNgine/tasks.py +++ b/web/reNgine/tasks.py @@ -3727,7 +3727,7 @@ def query_whois(target, force_reload_whois=False): whois_data = whois_data['data'] parse_whois_data(domain_info, whois_data) - # saved_domain_info = save_domain_info_to_db(target, domain_info) + saved_domain_info = save_domain_info_to_db(target, domain_info) return format_whois_response(domain_info) From 36c99b40d44b8ada136f69c555ddd69a5d9ceb0b Mon Sep 17 00:00:00 2001 From: Yogesh Ojha Date: Mon, 12 Aug 2024 20:10:44 +0530 Subject: [PATCH 06/11] fix domain info target as key --- web/reNgine/common_func.py | 21 +++++++++++++++++---- web/static/custom/custom.js | 1 + web/targetApp/models.py | 2 +- 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/web/reNgine/common_func.py b/web/reNgine/common_func.py index 2a53fb03f..f52c29637 100644 --- a/web/reNgine/common_func.py +++ b/web/reNgine/common_func.py @@ -1181,15 +1181,25 @@ def get_domain_info_from_db(target): if not domain.insert_date: domain.insert_date = timezone.now() domain.save() - return extract_domain_info(domain.domain_info) + return extract_domain_info(domain) except Domain.DoesNotExist: return None -def extract_domain_info(domain_info_db): - """Extract domain info from the domain_info_db.""" - if not domain_info_db: +def extract_domain_info(domain): + """ + Extract domain info from the domain_info_db. + Args: + domain: Domain object + + Returns: + DottedDict: The domain info object. + """ + if not domain: return DottedDict() + domain_name = domain.name + domain_info_db = domain.domain_info + try: domain_info = DottedDict({ 'dnssec': domain_info_db.dnssec, @@ -1245,6 +1255,8 @@ def extract_domain_info(domain_info_db): } for ip in domain_info_db.historical_ips.all() ] + + domain_info['target'] = domain_name except Exception as e: logger.error(f'Error while extracting domain info: {e}') domain_info = DottedDict() @@ -1260,6 +1272,7 @@ def format_whois_response(domain_info): Returns: dict: The formatted whois response. """ + print(domain_info) return { 'status': True, 'target': domain_info.get('target'), diff --git a/web/static/custom/custom.js b/web/static/custom/custom.js index b34095f93..490155324 100644 --- a/web/static/custom/custom.js +++ b/web/static/custom/custom.js @@ -1562,6 +1562,7 @@ function get_domain_whois(domain_name, show_add_target_btn = false) { function display_whois_on_modal(response, show_add_target_btn=false) { + console.log(response); // this function will display whois data on modal, should be followed after get_domain_whois() $('#modal_dialog').modal('show'); $('#modal-content').empty(); diff --git a/web/targetApp/models.py b/web/targetApp/models.py index 1ec43e95b..9358ede4a 100644 --- a/web/targetApp/models.py +++ b/web/targetApp/models.py @@ -11,7 +11,7 @@ class HistoricalIP(models.Model): last_seen = models.CharField(max_length=500) def __str__(self): - return self.name + return self.ip class RelatedDomain(models.Model): From eada1d2a7682098e22cf61104842441e2411a255 Mon Sep 17 00:00:00 2001 From: Yogesh Ojha Date: Mon, 12 Aug 2024 20:54:01 +0530 Subject: [PATCH 07/11] fix whois dict error --- web/reNgine/common_func.py | 7 +++---- web/reNgine/tasks.py | 3 +++ 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/web/reNgine/common_func.py b/web/reNgine/common_func.py index f52c29637..e988f883e 100644 --- a/web/reNgine/common_func.py +++ b/web/reNgine/common_func.py @@ -569,7 +569,7 @@ def get_cms_details(url): try: shutil.rmtree(cms_dir_path) except Exception as e: - print(e) + logger.error(e) return response @@ -972,10 +972,10 @@ def reverse_whois(lookup_keyword): try: for row in table or []: dom = row.findAll('td')[0].getText() - created_on = row.findAll('td')[1].getText() + # created_on = row.findAll('td')[1].getText() TODO: add this in 3.0 if dom == 'Domain Name': continue - domains.append({'name': dom, 'created_on': created_on}) + domains.append(dom) except Exception as e: logger.error(f'Error while fetching reverse whois info: {e}') return domains @@ -1272,7 +1272,6 @@ def format_whois_response(domain_info): Returns: dict: The formatted whois response. """ - print(domain_info) return { 'status': True, 'target': domain_info.get('target'), diff --git a/web/reNgine/tasks.py b/web/reNgine/tasks.py index 54c09fa8d..bc226c76e 100644 --- a/web/reNgine/tasks.py +++ b/web/reNgine/tasks.py @@ -3704,10 +3704,13 @@ def query_whois(target, force_reload_whois=False): logger.info(f'Querying WHOIS information for {target} from WHOIS server...') domain_info = DottedDict() + domain_info.target = target domain_info.historical_ips = get_domain_historical_ip_address(target) domain_info.related_tlds, tlsx_related_domain = fetch_related_tlds_and_domains(target) + logger.info('Identified historical ips, related TLDs and domains') + related_domains = reverse_whois(target) if tlsx_related_domain: related_domains += tlsx_related_domain From e507be071c4106b86c7bc4e3655cb92f1935aa0b Mon Sep 17 00:00:00 2001 From: Yogesh Ojha Date: Mon, 12 Aug 2024 21:34:23 +0530 Subject: [PATCH 08/11] fix: improve exception handling, improve lookup by introducing futures, get whois data from netlas as well --- web/reNgine/common_func.py | 2 + web/reNgine/tasks.py | 102 ++++++++++++++++++++++++++----------- 2 files changed, 74 insertions(+), 30 deletions(-) diff --git a/web/reNgine/common_func.py b/web/reNgine/common_func.py index e988f883e..d334a4aa0 100644 --- a/web/reNgine/common_func.py +++ b/web/reNgine/common_func.py @@ -950,6 +950,7 @@ def reverse_whois(lookup_keyword): Input: lookup keyword like email or registrar name Returns a list of domains as string. ''' + logger.info(f'Querying reverse whois for {lookup_keyword}') url = f"https://viewdns.info:443/reversewhois/?q={lookup_keyword}" headers = { "Sec-Ch-Ua": "\" Not A;Brand\";v=\"99\", \"Chromium\";v=\"104\"", @@ -987,6 +988,7 @@ def get_domain_historical_ip_address(domain): This function will use viewdns to fetch historical IP address for a domain ''' + logger.info(f'Fetching historical IP address for domain {domain}') url = f"https://viewdns.info/iphistory/?domain={domain}" headers = { "Sec-Ch-Ua": "\" Not A;Brand\";v=\"99\", \"Chromium\";v=\"104\"", diff --git a/web/reNgine/tasks.py b/web/reNgine/tasks.py index bc226c76e..12ff0329d 100644 --- a/web/reNgine/tasks.py +++ b/web/reNgine/tasks.py @@ -3693,45 +3693,85 @@ def query_whois(target, force_reload_whois=False): Returns: dict: WHOIS information. """ - if not force_reload_whois: - logger.info(f'Querying WHOIS information for {target} from db...') - domain_info = get_domain_info_from_db(target) - if domain_info: - return format_whois_response(domain_info) - - # Query WHOIS information as not found in db - logger.info(f'Whois info not found in db') - logger.info(f'Querying WHOIS information for {target} from WHOIS server...') + try: + if not force_reload_whois: + logger.info(f'Querying WHOIS information for {target} from db...') + domain_info = get_domain_info_from_db(target) + if domain_info: + return format_whois_response(domain_info) + + # Query WHOIS information as not found in db + logger.info(f'Whois info not found in db') + logger.info(f'Querying WHOIS information for {target} from WHOIS server...') + + domain_info = DottedDict() + domain_info.target = target + + whois_data = None + related_domains = [] + + with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: + futures_func = { + executor.submit(get_domain_historical_ip_address, target): 'historical_ips', + executor.submit(fetch_related_tlds_and_domains, target): 'related_tlds_and_domains', + executor.submit(reverse_whois, target): 'reverse_whois', + executor.submit(fetch_whois_data_using_netlas, target): 'whois_data', + } - domain_info = DottedDict() - domain_info.target = target + for future in concurrent.futures.as_completed(futures_func): + func_name = futures_func[future] + try: + result = future.result() + if func_name == 'historical_ips': + domain_info.historical_ips = result + elif func_name == 'related_tlds_and_domains': + domain_info.related_tlds, tlsx_related_domain = result + elif func_name == 'reverse_whois': + related_domains = result + elif func_name == 'whois_data': + whois_data = result + + logger.debug('*'*100) + logger.info(f'Task {func_name} finished for target {target}') + logger.debug(result) + logger.debug('*'*100) - domain_info.historical_ips = get_domain_historical_ip_address(target) - domain_info.related_tlds, tlsx_related_domain = fetch_related_tlds_and_domains(target) + except Exception as e: + logger.error(f'An error occurred while fetching {func_name} for {target}: {str(e)}') + continue - logger.info('Identified historical ips, related TLDs and domains') + if 'tlsx_related_domain' in locals(): + related_domains += tlsx_related_domain - related_domains = reverse_whois(target) - if tlsx_related_domain: - related_domains += tlsx_related_domain - - # remove duplicate ones - related_domains = list(set(related_domains)) - domain_info.related_domains = related_domains + # whois_data = fetch_whois_data_using_netlas(target) + if whois_data is None or not whois_data['status']: + return { + 'status': False, + 'target': target, + 'result': whois_data['message'] + } + + + whois_data = whois_data.get('data', {}) + # related domains can also be fetched from whois_data + whois_related_domains = whois_data.get('related_domains', []) + related_domains += whois_related_domains + + # remove duplicate ones + related_domains = list(set(related_domains)) + domain_info.related_domains = related_domains - whois_data = fetch_whois_data_using_netlas(target) - if not whois_data['status']: + + parse_whois_data(domain_info, whois_data) + saved_domain_info = save_domain_info_to_db(target, domain_info) + return format_whois_response(domain_info) + except Exception as e: + logger.error(f'An error occurred while querying WHOIS information for {target}: {str(e)}') return { 'status': False, 'target': target, - 'result': whois_data['message'] + 'result': f'An error occurred while querying WHOIS information for {target}: {str(e)}' } - - whois_data = whois_data['data'] - - parse_whois_data(domain_info, whois_data) - saved_domain_info = save_domain_info_to_db(target, domain_info) - return format_whois_response(domain_info) def fetch_related_tlds_and_domains(domain): @@ -3745,6 +3785,7 @@ def fetch_related_tlds_and_domains(domain): Returns: tuple: A tuple containing two lists (related_tlds, related_domains). """ + logger.info(f"Fetching related TLDs and domains for {domain}") related_tlds = set() related_domains = set() @@ -3785,6 +3826,7 @@ def fetch_whois_data_using_netlas(target): Returns: dict: WHOIS information. """ + logger.info(f'Fetching WHOIS data for {target} using Netlas...') command = f'netlas host {target} -f json' netlas_key = get_netlas_key() if netlas_key: From 424493c5c7e5bd4672d552ca56e8bb5d8270fa87 Mon Sep 17 00:00:00 2001 From: Yogesh Ojha Date: Mon, 12 Aug 2024 22:00:04 +0530 Subject: [PATCH 09/11] add safe get if whois data is not valid --- web/reNgine/common_func.py | 18 +++++++++--------- web/reNgine/tasks.py | 14 ++++---------- 2 files changed, 13 insertions(+), 19 deletions(-) diff --git a/web/reNgine/common_func.py b/web/reNgine/common_func.py index d334a4aa0..404a7469a 100644 --- a/web/reNgine/common_func.py +++ b/web/reNgine/common_func.py @@ -1348,11 +1348,11 @@ def parse_whois_data(domain_info, whois_data): # Parse basic domain information domain_info.update({ - 'created': whois.get('created_date'), - 'expires': whois.get('expiration_date'), - 'updated': whois.get('updated_date'), - 'whois_server': whois.get('whois_server'), - 'dnssec': bool(whois.get('dnssec')), + 'created': whois.get('created_date', None), + 'expires': whois.get('expiration_date', None), + 'updated': whois.get('updated_date', None), + 'whois_server': whois.get('whois_server', None), + 'dnssec': bool(whois.get('dnssec', False)), 'status': whois.get('status', []), }) @@ -1373,10 +1373,10 @@ def parse_whois_data(domain_info, whois_data): def parse_registrar_info(domain_info, registrar): """Parse registrar information.""" domain_info.update({ - 'registrar_name': registrar.get('name'), - 'registrar_email': registrar.get('email'), - 'registrar_phone': registrar.get('phone'), - 'registrar_url': registrar.get('url'), + 'registrar_name': registrar.get('name', None), + 'registrar_email': registrar.get('email', None), + 'registrar_phone': registrar.get('phone', None), + 'registrar_url': registrar.get('url', None), }) def parse_registration_info(domain_info, registration, role): diff --git a/web/reNgine/tasks.py b/web/reNgine/tasks.py index 12ff0329d..5c118b23b 100644 --- a/web/reNgine/tasks.py +++ b/web/reNgine/tasks.py @@ -3740,23 +3740,17 @@ def query_whois(target, force_reload_whois=False): logger.error(f'An error occurred while fetching {func_name} for {target}: {str(e)}') continue + logger.info(f'All concurrent whosi lookup tasks finished for target {target}') + if 'tlsx_related_domain' in locals(): related_domains += tlsx_related_domain - - # whois_data = fetch_whois_data_using_netlas(target) - if whois_data is None or not whois_data['status']: - return { - 'status': False, - 'target': target, - 'result': whois_data['message'] - } - whois_data = whois_data.get('data', {}) + # related domains can also be fetched from whois_data whois_related_domains = whois_data.get('related_domains', []) related_domains += whois_related_domains - + # remove duplicate ones related_domains = list(set(related_domains)) domain_info.related_domains = related_domains From 662698863f7e8fc2ad3d802cbbe8b56f0d42243c Mon Sep 17 00:00:00 2001 From: Yogesh Ojha Date: Tue, 13 Aug 2024 11:27:36 +0530 Subject: [PATCH 10/11] update whois comments --- web/reNgine/tasks.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/web/reNgine/tasks.py b/web/reNgine/tasks.py index 5c118b23b..66a45498d 100644 --- a/web/reNgine/tasks.py +++ b/web/reNgine/tasks.py @@ -3694,6 +3694,8 @@ def query_whois(target, force_reload_whois=False): dict: WHOIS information. """ try: + # TODO: Implement cache whois only for 48 hours otherwise get from whois server + # TODO: in 3.0 if not force_reload_whois: logger.info(f'Querying WHOIS information for {target} from db...') domain_info = get_domain_info_from_db(target) From bfe5a8ae1ae1e314f285031c18e690156fa9f775 Mon Sep 17 00:00:00 2001 From: Yogesh Ojha Date: Tue, 13 Aug 2024 19:33:35 +0530 Subject: [PATCH 11/11] remove unused import --- web/reNgine/common_func.py | 1 - 1 file changed, 1 deletion(-) diff --git a/web/reNgine/common_func.py b/web/reNgine/common_func.py index 404a7469a..bab9a9646 100644 --- a/web/reNgine/common_func.py +++ b/web/reNgine/common_func.py @@ -20,7 +20,6 @@ from discord_webhook import DiscordEmbed, DiscordWebhook from django.db.models import Q from dotted_dict import DottedDict -from tempfile import NamedTemporaryFile from reNgine.common_serializers import * from reNgine.definitions import *