diff --git a/web/api/views.py b/web/api/views.py index e2c7805a0..f5df4bbbf 100644 --- a/web/api/views.py +++ b/web/api/views.py @@ -1126,13 +1126,15 @@ def get(self, request): class Whois(APIView): def get(self, request): req = self.request - ip_domain = req.query_params.get('ip_domain') - if not (validators.domain(ip_domain) or validators.ipv4(ip_domain) or validators.ipv6(ip_domain)): - print(f'Ip address or domain "{ip_domain}" did not pass validator.') + target = req.query_params.get('target') + if not target: + return Response({'status': False, 'message': 'Target IP/Domain required!'}) + if not (validators.domain(target) or validators.ipv4(target) or validators.ipv6(target)): + print(f'Ip address or domain "{target}" did not pass validator.') return Response({'status': False, 'message': 'Invalid domain or IP'}) is_force_update = req.query_params.get('is_reload') is_force_update = True if is_force_update and 'true' == is_force_update.lower() else False - task = query_whois.apply_async(args=(ip_domain,is_force_update)) + task = query_whois.apply_async(args=(target,is_force_update)) response = task.wait() return Response(response) diff --git a/web/reNgine/common_func.py b/web/reNgine/common_func.py index c9c50fe5d..bab9a9646 100644 --- a/web/reNgine/common_func.py +++ b/web/reNgine/common_func.py @@ -19,6 +19,7 @@ from celery.utils.log import get_task_logger from discord_webhook import DiscordEmbed, DiscordWebhook from django.db.models import Q +from dotted_dict import DottedDict from reNgine.common_serializers import * from reNgine.definitions import * @@ -567,7 +568,7 @@ def get_cms_details(url): try: shutil.rmtree(cms_dir_path) except Exception as e: - print(e) + logger.error(e) return response @@ -948,6 +949,7 @@ def reverse_whois(lookup_keyword): Input: lookup keyword like email or registrar name Returns a list of domains as string. ''' + logger.info(f'Querying reverse whois for {lookup_keyword}') url = f"https://viewdns.info:443/reversewhois/?q={lookup_keyword}" headers = { "Sec-Ch-Ua": "\" Not A;Brand\";v=\"99\", \"Chromium\";v=\"104\"", @@ -967,12 +969,15 @@ def reverse_whois(lookup_keyword): response = requests.get(url, headers=headers) soup = BeautifulSoup(response.content, 'lxml') table = soup.find("table", {"border" : "1"}) - for row in table or []: - dom = row.findAll('td')[0].getText() - created_on = row.findAll('td')[1].getText() - if dom == 'Domain Name': - continue - domains.append({'name': dom, 'created_on': created_on}) + try: + for row in table or []: + dom = row.findAll('td')[0].getText() + # created_on = row.findAll('td')[1].getText() TODO: add this in 3.0 + if dom == 'Domain Name': + continue + domains.append(dom) + except Exception as e: + logger.error(f'Error while fetching reverse whois info: {e}') return domains @@ -982,6 +987,7 @@ def get_domain_historical_ip_address(domain): This function will use viewdns to fetch historical IP address for a domain ''' + logger.info(f'Fetching historical IP address for domain {domain}') url = f"https://viewdns.info/iphistory/?domain={domain}" headers = { "Sec-Ch-Ua": "\" Not A;Brand\";v=\"99\", \"Chromium\";v=\"104\"", @@ -1158,4 +1164,342 @@ def update_or_create_port(port_number, service_name=None, description=None): ) created = True finally: - return port, created \ No newline at end of file + return port, created + + +def get_domain_info_from_db(target): + """ + Retrieves the Domain object from the database using the target domain name. + + Args: + target (str): The domain name to search for. + + Returns: + Domain: The Domain object if found, otherwise None. + """ + try: + domain = Domain.objects.get(name=target) + if not domain.insert_date: + domain.insert_date = timezone.now() + domain.save() + return extract_domain_info(domain) + except Domain.DoesNotExist: + return None + +def extract_domain_info(domain): + """ + Extract domain info from the domain_info_db. + Args: + domain: Domain object + + Returns: + DottedDict: The domain info object. + """ + if not domain: + return DottedDict() + + domain_name = domain.name + domain_info_db = domain.domain_info + + try: + domain_info = DottedDict({ + 'dnssec': domain_info_db.dnssec, + 'created': domain_info_db.created, + 'updated': domain_info_db.updated, + 'expires': domain_info_db.expires, + 'geolocation_iso': domain_info_db.geolocation_iso, + 'status': [status.name for status in domain_info_db.status.all()], + 'whois_server': domain_info_db.whois_server, + 'ns_records': [ns.name for ns in domain_info_db.name_servers.all()], + }) + + # Extract registrar info + registrar = domain_info_db.registrar + if registrar: + domain_info.update({ + 'registrar_name': registrar.name, + 'registrar_phone': registrar.phone, + 'registrar_email': registrar.email, + 'registrar_url': registrar.url, + }) + + # Extract registration info (registrant, admin, tech) + for role in ['registrant', 'admin', 'tech']: + registration = getattr(domain_info_db, role) + if registration: + domain_info.update({ + f'{role}_{key}': getattr(registration, key) + for key in ['name', 'id_str', 'organization', 'city', 'state', 'zip_code', + 'country', 'phone', 'fax', 'email', 'address'] + }) + + # Extract DNS records + dns_records = domain_info_db.dns_records.all() + for record_type in ['a', 'txt', 'mx']: + domain_info[f'{record_type}_records'] = [ + record.name for record in dns_records if record.type == record_type + ] + + # Extract related domains and TLDs + domain_info.update({ + 'related_tlds': [domain.name for domain in domain_info_db.related_tlds.all()], + 'related_domains': [domain.name for domain in domain_info_db.related_domains.all()], + }) + + # Extract historical IPs + domain_info['historical_ips'] = [ + { + 'ip': ip.ip, + 'owner': ip.owner, + 'location': ip.location, + 'last_seen': ip.last_seen + } + for ip in domain_info_db.historical_ips.all() + ] + + domain_info['target'] = domain_name + except Exception as e: + logger.error(f'Error while extracting domain info: {e}') + domain_info = DottedDict() + + return domain_info + + +def format_whois_response(domain_info): + """ + Format the domain info for the whois response. + Args: + domain_info (DottedDict): The domain info object. + Returns: + dict: The formatted whois response. + """ + return { + 'status': True, + 'target': domain_info.get('target'), + 'dnssec': domain_info.get('dnssec'), + 'created': domain_info.get('created'), + 'updated': domain_info.get('updated'), + 'expires': domain_info.get('expires'), + 'geolocation_iso': domain_info.get('registrant_country'), + 'domain_statuses': domain_info.get('status'), + 'whois_server': domain_info.get('whois_server'), + 'dns': { + 'a': domain_info.get('a_records'), + 'mx': domain_info.get('mx_records'), + 'txt': domain_info.get('txt_records'), + }, + 'registrar': { + 'name': domain_info.get('registrar_name'), + 'phone': domain_info.get('registrar_phone'), + 'email': domain_info.get('registrar_email'), + 'url': domain_info.get('registrar_url'), + }, + 'registrant': { + 'name': domain_info.get('registrant_name'), + 'id': domain_info.get('registrant_id'), + 'organization': domain_info.get('registrant_organization'), + 'address': domain_info.get('registrant_address'), + 'city': domain_info.get('registrant_city'), + 'state': domain_info.get('registrant_state'), + 'zipcode': domain_info.get('registrant_zip_code'), + 'country': domain_info.get('registrant_country'), + 'phone': domain_info.get('registrant_phone'), + 'fax': domain_info.get('registrant_fax'), + 'email': domain_info.get('registrant_email'), + }, + 'admin': { + 'name': domain_info.get('admin_name'), + 'id': domain_info.get('admin_id'), + 'organization': domain_info.get('admin_organization'), + 'address':domain_info.get('admin_address'), + 'city': domain_info.get('admin_city'), + 'state': domain_info.get('admin_state'), + 'zipcode': domain_info.get('admin_zip_code'), + 'country': domain_info.get('admin_country'), + 'phone': domain_info.get('admin_phone'), + 'fax': domain_info.get('admin_fax'), + 'email': domain_info.get('admin_email'), + }, + 'technical_contact': { + 'name': domain_info.get('tech_name'), + 'id': domain_info.get('tech_id'), + 'organization': domain_info.get('tech_organization'), + 'address': domain_info.get('tech_address'), + 'city': domain_info.get('tech_city'), + 'state': domain_info.get('tech_state'), + 'zipcode': domain_info.get('tech_zip_code'), + 'country': domain_info.get('tech_country'), + 'phone': domain_info.get('tech_phone'), + 'fax': domain_info.get('tech_fax'), + 'email': domain_info.get('tech_email'), + }, + 'nameservers': domain_info.get('ns_records'), + 'related_domains': domain_info.get('related_domains'), + 'related_tlds': domain_info.get('related_tlds'), + 'historical_ips': domain_info.get('historical_ips'), + } + + +def parse_whois_data(domain_info, whois_data): + """Parse WHOIS data and update domain_info.""" + whois = whois_data.get('whois', {}) + dns = whois_data.get('dns', {}) + + # Parse basic domain information + domain_info.update({ + 'created': whois.get('created_date', None), + 'expires': whois.get('expiration_date', None), + 'updated': whois.get('updated_date', None), + 'whois_server': whois.get('whois_server', None), + 'dnssec': bool(whois.get('dnssec', False)), + 'status': whois.get('status', []), + }) + + # Parse registrar information + parse_registrar_info(domain_info, whois.get('registrar', {})) + + # Parse registration information + for role in ['registrant', 'administrative', 'technical']: + parse_registration_info(domain_info, whois.get(role, {}), role) + + # Parse DNS records + parse_dns_records(domain_info, dns) + + # Parse name servers + domain_info.ns_records = dns.get('ns', []) + + +def parse_registrar_info(domain_info, registrar): + """Parse registrar information.""" + domain_info.update({ + 'registrar_name': registrar.get('name', None), + 'registrar_email': registrar.get('email', None), + 'registrar_phone': registrar.get('phone', None), + 'registrar_url': registrar.get('url', None), + }) + +def parse_registration_info(domain_info, registration, role): + """Parse registration information for registrant, admin, and tech contacts.""" + role_prefix = role if role != 'administrative' else 'admin' + domain_info.update({ + f'{role_prefix}_{key}': value + for key, value in registration.items() + if key in ['name', 'id', 'organization', 'street', 'city', 'province', 'postal_code', 'country', 'phone', 'fax'] + }) + + # Handle email separately to apply regex + email = registration.get('email') + if email: + email_match = EMAIL_REGEX.search(str(email)) + domain_info[f'{role_prefix}_email'] = email_match.group(0) if email_match else None + +def parse_dns_records(domain_info, dns): + """Parse DNS records.""" + domain_info.update({ + 'mx_records': dns.get('mx', []), + 'txt_records': dns.get('txt', []), + 'a_records': dns.get('a', []), + 'ns_records': dns.get('ns', []), + }) + + +def save_domain_info_to_db(target, domain_info): + """Save domain info to the database.""" + if Domain.objects.filter(name=target).exists(): + domain, _ = Domain.objects.get_or_create(name=target) + + # Create or update DomainInfo + domain_info_obj, created = DomainInfo.objects.get_or_create(domain=domain) + + # Update basic domain information + domain_info_obj.dnssec = domain_info.get('dnssec', False) + domain_info_obj.created = domain_info.get('created') + domain_info_obj.updated = domain_info.get('updated') + domain_info_obj.expires = domain_info.get('expires') + domain_info_obj.whois_server = domain_info.get('whois_server') + domain_info_obj.geolocation_iso = domain_info.get('registrant_country') + + # Save or update Registrar + registrar, _ = Registrar.objects.get_or_create( + name=domain_info.get('registrar_name', ''), + defaults={ + 'email': domain_info.get('registrar_email'), + 'phone': domain_info.get('registrar_phone'), + 'url': domain_info.get('registrar_url'), + } + ) + domain_info_obj.registrar = registrar + + # Save or update Registrations (registrant, admin, tech) + for role in ['registrant', 'admin', 'tech']: + registration, _ = DomainRegistration.objects.get_or_create( + name=domain_info.get(f'{role}_name', ''), + defaults={ + 'organization': domain_info.get(f'{role}_organization'), + 'address': domain_info.get(f'{role}_address'), + 'city': domain_info.get(f'{role}_city'), + 'state': domain_info.get(f'{role}_state'), + 'zip_code': domain_info.get(f'{role}_zip_code'), + 'country': domain_info.get(f'{role}_country'), + 'email': domain_info.get(f'{role}_email'), + 'phone': domain_info.get(f'{role}_phone'), + 'fax': domain_info.get(f'{role}_fax'), + 'id_str': domain_info.get(f'{role}_id'), + } + ) + setattr(domain_info_obj, role, registration) + + # Save domain statuses + domain_info_obj.status.clear() + for status in domain_info.get('status', []): + status_obj, _ = WhoisStatus.objects.get_or_create(name=status) + domain_info_obj.status.add(status_obj) + + # Save name servers + domain_info_obj.name_servers.clear() + for ns in domain_info.get('ns_records', []): + ns_obj, _ = NameServer.objects.get_or_create(name=ns) + domain_info_obj.name_servers.add(ns_obj) + + # Save DNS records + domain_info_obj.dns_records.clear() + for record_type in ['a', 'mx', 'txt']: + for record in domain_info.get(f'{record_type}_records', []): + dns_record, _ = DNSRecord.objects.get_or_create( + name=record, + type=record_type + ) + domain_info_obj.dns_records.add(dns_record) + + # Save related domains and TLDs + domain_info_obj.related_domains.clear() + for related_domain in domain_info.get('related_domains', []): + related_domain_obj, _ = RelatedDomain.objects.get_or_create(name=related_domain) + domain_info_obj.related_domains.add(related_domain_obj) + + domain_info_obj.related_tlds.clear() + for related_tld in domain_info.get('related_tlds', []): + related_tld_obj, _ = RelatedDomain.objects.get_or_create(name=related_tld) + domain_info_obj.related_tlds.add(related_tld_obj) + + # Save historical IPs + domain_info_obj.historical_ips.clear() + for ip_info in domain_info.get('historical_ips', []): + historical_ip, _ = HistoricalIP.objects.get_or_create( + ip=ip_info['ip'], + defaults={ + 'owner': ip_info.get('owner'), + 'location': ip_info.get('location'), + 'last_seen': ip_info.get('last_seen'), + } + ) + domain_info_obj.historical_ips.add(historical_ip) + + # Save the DomainInfo object + domain_info_obj.save() + + # Update the Domain object with the new DomainInfo + domain.domain_info = domain_info_obj + domain.save() + + return domain_info_obj diff --git a/web/reNgine/definitions.py b/web/reNgine/definitions.py index e5b80d577..5a9f6f0bf 100644 --- a/web/reNgine/definitions.py +++ b/web/reNgine/definitions.py @@ -11,7 +11,7 @@ # TOOLS DEFINITIONS ############################################################################### -EMAIL_REGEX = re.compile(r'[a-z0-9\.\-+_]+@[a-z0-9\.\-+_]+\.[a-z]+') +EMAIL_REGEX = re.compile(r'[\w\.-]+@[\w\.-]+') ############################################################################### # YAML CONFIG DEFINITIONS diff --git a/web/reNgine/tasks.py b/web/reNgine/tasks.py index 7ab9ed963..66a45498d 100644 --- a/web/reNgine/tasks.py +++ b/web/reNgine/tasks.py @@ -3684,435 +3684,201 @@ def geo_localize(host, ip_id=None): @app.task(name='query_whois', bind=False, queue='query_whois_queue') -def query_whois(ip_domain, force_reload_whois=False): +def query_whois(target, force_reload_whois=False): """Query WHOIS information for an IP or a domain name. Args: - ip_domain (str): IP address or domain name. + target (str): IP address or domain name. save_domain (bool): Whether to save domain or not, default False Returns: dict: WHOIS information. """ - if not force_reload_whois and Domain.objects.filter(name=ip_domain).exists() and Domain.objects.get(name=ip_domain).domain_info: - domain = Domain.objects.get(name=ip_domain) - if not domain.insert_date: - domain.insert_date = timezone.now() - domain.save() - domain_info_db = domain.domain_info - domain_info = DottedDict( - dnssec=domain_info_db.dnssec, - created=domain_info_db.created, - updated=domain_info_db.updated, - expires=domain_info_db.expires, - geolocation_iso=domain_info_db.geolocation_iso, - status=[status['name'] for status in DomainWhoisStatusSerializer(domain_info_db.status, many=True).data], - whois_server=domain_info_db.whois_server, - ns_records=[ns['name'] for ns in NameServersSerializer(domain_info_db.name_servers, many=True).data], - registrar_name=domain_info_db.registrar.name, - registrar_phone=domain_info_db.registrar.phone, - registrar_email=domain_info_db.registrar.email, - registrar_url=domain_info_db.registrar.url, - registrant_name=domain_info_db.registrant.name, - registrant_id=domain_info_db.registrant.id_str, - registrant_organization=domain_info_db.registrant.organization, - registrant_city=domain_info_db.registrant.city, - registrant_state=domain_info_db.registrant.state, - registrant_zip_code=domain_info_db.registrant.zip_code, - registrant_country=domain_info_db.registrant.country, - registrant_phone=domain_info_db.registrant.phone, - registrant_fax=domain_info_db.registrant.fax, - registrant_email=domain_info_db.registrant.email, - registrant_address=domain_info_db.registrant.address, - admin_name=domain_info_db.admin.name, - admin_id=domain_info_db.admin.id_str, - admin_organization=domain_info_db.admin.organization, - admin_city=domain_info_db.admin.city, - admin_state=domain_info_db.admin.state, - admin_zip_code=domain_info_db.admin.zip_code, - admin_country=domain_info_db.admin.country, - admin_phone=domain_info_db.admin.phone, - admin_fax=domain_info_db.admin.fax, - admin_email=domain_info_db.admin.email, - admin_address=domain_info_db.admin.address, - tech_name=domain_info_db.tech.name, - tech_id=domain_info_db.tech.id_str, - tech_organization=domain_info_db.tech.organization, - tech_city=domain_info_db.tech.city, - tech_state=domain_info_db.tech.state, - tech_zip_code=domain_info_db.tech.zip_code, - tech_country=domain_info_db.tech.country, - tech_phone=domain_info_db.tech.phone, - tech_fax=domain_info_db.tech.fax, - tech_email=domain_info_db.tech.email, - tech_address=domain_info_db.tech.address, - related_tlds=[domain['name'] for domain in RelatedDomainSerializer(domain_info_db.related_tlds, many=True).data], - related_domains=[domain['name'] for domain in RelatedDomainSerializer(domain_info_db.related_domains, many=True).data], - historical_ips=[ip for ip in HistoricalIPSerializer(domain_info_db.historical_ips, many=True).data], - ) - if domain_info_db.dns_records: - a_records = [] - txt_records = [] - mx_records = [] - dns_records = [{'name': dns['name'], 'type': dns['type']} for dns in DomainDNSRecordSerializer(domain_info_db.dns_records, many=True).data] - for dns in dns_records: - if dns['type'] == 'a': - a_records.append(dns['name']) - elif dns['type'] == 'txt': - txt_records.append(dns['name']) - elif dns['type'] == 'mx': - mx_records.append(dns['name']) - domain_info.a_records = a_records - domain_info.txt_records = txt_records - domain_info.mx_records = mx_records - else: - logger.info(f'Domain info for "{ip_domain}" not found in DB, querying whois') + try: + # TODO: Implement cache whois only for 48 hours otherwise get from whois server + # TODO: in 3.0 + if not force_reload_whois: + logger.info(f'Querying WHOIS information for {target} from db...') + domain_info = get_domain_info_from_db(target) + if domain_info: + return format_whois_response(domain_info) + + # Query WHOIS information as not found in db + logger.info(f'Whois info not found in db') + logger.info(f'Querying WHOIS information for {target} from WHOIS server...') + domain_info = DottedDict() - # find domain historical ip - try: - historical_ips = get_domain_historical_ip_address(ip_domain) - domain_info.historical_ips = historical_ips - except Exception as e: - logger.error(f'HistoricalIP for {ip_domain} not found!\nError: {str(e)}') - historical_ips = [] - # find associated domains using ip_domain - try: - related_domains = reverse_whois(ip_domain.split('.')[0]) - except Exception as e: - logger.error(f'Associated domain not found for {ip_domain}\nError: {str(e)}') - similar_domains = [] - # find related tlds using TLSx + domain_info.target = target + + whois_data = None + related_domains = [] + + with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: + futures_func = { + executor.submit(get_domain_historical_ip_address, target): 'historical_ips', + executor.submit(fetch_related_tlds_and_domains, target): 'related_tlds_and_domains', + executor.submit(reverse_whois, target): 'reverse_whois', + executor.submit(fetch_whois_data_using_netlas, target): 'whois_data', + } + + for future in concurrent.futures.as_completed(futures_func): + func_name = futures_func[future] + try: + result = future.result() + if func_name == 'historical_ips': + domain_info.historical_ips = result + elif func_name == 'related_tlds_and_domains': + domain_info.related_tlds, tlsx_related_domain = result + elif func_name == 'reverse_whois': + related_domains = result + elif func_name == 'whois_data': + whois_data = result + + logger.debug('*'*100) + logger.info(f'Task {func_name} finished for target {target}') + logger.debug(result) + logger.debug('*'*100) + + except Exception as e: + logger.error(f'An error occurred while fetching {func_name} for {target}: {str(e)}') + continue + + logger.info(f'All concurrent whosi lookup tasks finished for target {target}') + + if 'tlsx_related_domain' in locals(): + related_domains += tlsx_related_domain + + whois_data = whois_data.get('data', {}) + + # related domains can also be fetched from whois_data + whois_related_domains = whois_data.get('related_domains', []) + related_domains += whois_related_domains + + # remove duplicate ones + related_domains = list(set(related_domains)) + domain_info.related_domains = related_domains + + + parse_whois_data(domain_info, whois_data) + saved_domain_info = save_domain_info_to_db(target, domain_info) + return format_whois_response(domain_info) + except Exception as e: + logger.error(f'An error occurred while querying WHOIS information for {target}: {str(e)}') + return { + 'status': False, + 'target': target, + 'result': f'An error occurred while querying WHOIS information for {target}: {str(e)}' + } + + +def fetch_related_tlds_and_domains(domain): + """ + Fetch related TLDs and domains using TLSx. + related domains are those that are not part of related TLDs. + + Args: + domain (str): The domain to find related TLDs and domains for. + + Returns: + tuple: A tuple containing two lists (related_tlds, related_domains). + """ + logger.info(f"Fetching related TLDs and domains for {domain}") + related_tlds = set() + related_domains = set() + + # Extract the base domain + extracted = tldextract.extract(domain) + base_domain = f"{extracted.domain}.{extracted.suffix}" + + cmd = f'tlsx -san -cn -silent -ro -host {domain}' + _, result = run_command(cmd, shell=True) + + for line in result.splitlines(): try: - related_tlds = [] - output_path = '/tmp/ip_domain_tlsx.txt' - tlsx_command = f'tlsx -san -cn -silent -ro -host {ip_domain} -o {output_path}' - run_command( - tlsx_command, - shell=True, - ) - tlsx_output = [] - with open(output_path) as f: - tlsx_output = f.readlines() - - tldextract_target = tldextract.extract(ip_domain) - for doms in tlsx_output: - doms = doms.strip() - tldextract_res = tldextract.extract(doms) - if ip_domain != doms and tldextract_res.domain == tldextract_target.domain and tldextract_res.subdomain == '': - related_tlds.append(doms) - - related_tlds = list(set(related_tlds)) - domain_info.related_tlds = related_tlds + line = line.strip() + if line == "": + continue + extracted_result = tldextract.extract(line) + full_domain = f"{extracted_result.domain}.{extracted_result.suffix}" + + if extracted_result.domain == extracted.domain: + if full_domain != base_domain: + related_tlds.add(full_domain) + elif extracted_result.domain != extracted.domain or extracted_result.subdomain: + related_domains.add(line) except Exception as e: - logger.error(f'Associated domain not found for {ip_domain}\nError: {str(e)}') - similar_domains = [] - - related_domains_list = [] - if Domain.objects.filter(name=ip_domain).exists(): - domain = Domain.objects.get(name=ip_domain) - db_domain_info = domain.domain_info if domain.domain_info else DomainInfo() - db_domain_info.save() - for _domain in related_domains: - domain_related = RelatedDomain.objects.get_or_create( - name=_domain['name'], - )[0] - db_domain_info.related_domains.add(domain_related) - related_domains_list.append(_domain['name']) - - for _domain in related_tlds: - domain_related = RelatedDomain.objects.get_or_create( - name=_domain, - )[0] - db_domain_info.related_tlds.add(domain_related) - - for _ip in historical_ips: - historical_ip = HistoricalIP.objects.get_or_create( - ip=_ip['ip'], - owner=_ip['owner'], - location=_ip['location'], - last_seen=_ip['last_seen'], - )[0] - db_domain_info.historical_ips.add(historical_ip) - domain.domain_info = db_domain_info - domain.save() - - command = f'netlas host {ip_domain} -f json' - # check if netlas key is provided - netlas_key = get_netlas_key() - command += f' -a {netlas_key}' if netlas_key else '' + logger.error(f"An error occurred while fetching related TLDs and domains for {domain}: {str(e)}") + continue + + logger.info(f"Found {len(related_tlds)} related TLDs and {len(related_domains)} related domains for {domain}") + return list(related_tlds), list(related_domains) + + + +def fetch_whois_data_using_netlas(target): + """ + Fetch WHOIS data using netlas. + Args: + target (str): IP address or domain name. + Returns: + dict: WHOIS information. + """ + logger.info(f'Fetching WHOIS data for {target} using Netlas...') + command = f'netlas host {target} -f json' + netlas_key = get_netlas_key() + if netlas_key: + command += f' -a {netlas_key}' + try: _, result = run_command(command, remove_ansi_sequence=True) + + # catch errors if 'Failed to parse response data' in result: - # do fallback return { - 'status': False, - 'ip_domain': ip_domain, - 'result': "Netlas limit exceeded.", + 'status': False, 'message': 'Netlas limit exceeded.' } - try: - result = json.loads(result) - logger.info(result) - whois = result.get('whois') if result.get('whois') else {} - - domain_info.created = whois.get('created_date') - domain_info.expires = whois.get('expiration_date') - domain_info.updated = whois.get('updated_date') - domain_info.whois_server = whois.get('whois_server') - - - if 'registrant' in whois: - registrant = whois.get('registrant') - domain_info.registrant_name = registrant.get('name') - domain_info.registrant_country = registrant.get('country') - domain_info.registrant_id = registrant.get('id') - domain_info.registrant_state = registrant.get('province') - domain_info.registrant_city = registrant.get('city') - domain_info.registrant_phone = registrant.get('phone') - domain_info.registrant_address = registrant.get('street') - domain_info.registrant_organization = registrant.get('organization') - domain_info.registrant_fax = registrant.get('fax') - domain_info.registrant_zip_code = registrant.get('postal_code') - email_search = EMAIL_REGEX.search(str(registrant.get('email'))) - field_content = email_search.group(0) if email_search else None - domain_info.registrant_email = field_content - - if 'administrative' in whois: - administrative = whois.get('administrative') - domain_info.admin_name = administrative.get('name') - domain_info.admin_country = administrative.get('country') - domain_info.admin_id = administrative.get('id') - domain_info.admin_state = administrative.get('province') - domain_info.admin_city = administrative.get('city') - domain_info.admin_phone = administrative.get('phone') - domain_info.admin_address = administrative.get('street') - domain_info.admin_organization = administrative.get('organization') - domain_info.admin_fax = administrative.get('fax') - domain_info.admin_zip_code = administrative.get('postal_code') - mail_search = EMAIL_REGEX.search(str(administrative.get('email'))) - field_content = email_search.group(0) if email_search else None - domain_info.admin_email = field_content - - if 'technical' in whois: - technical = whois.get('technical') - domain_info.tech_name = technical.get('name') - domain_info.tech_country = technical.get('country') - domain_info.tech_state = technical.get('province') - domain_info.tech_id = technical.get('id') - domain_info.tech_city = technical.get('city') - domain_info.tech_phone = technical.get('phone') - domain_info.tech_address = technical.get('street') - domain_info.tech_organization = technical.get('organization') - domain_info.tech_fax = technical.get('fax') - domain_info.tech_zip_code = technical.get('postal_code') - mail_search = EMAIL_REGEX.search(str(technical.get('email'))) - field_content = email_search.group(0) if email_search else None - domain_info.tech_email = field_content - - if 'dns' in result: - dns = result.get('dns') - domain_info.mx_records = dns.get('mx') - domain_info.txt_records = dns.get('txt') - domain_info.a_records = dns.get('a') - - domain_info.ns_records = whois.get('name_servers') - domain_info.dnssec = True if whois.get('dnssec') else False - domain_info.status = whois.get('status') - - if 'registrar' in whois: - registrar = whois.get('registrar') - domain_info.registrar_name = registrar.get('name') - domain_info.registrar_email = registrar.get('email') - domain_info.registrar_phone = registrar.get('phone') - domain_info.registrar_url = registrar.get('url') - - # find associated domains if registrant email is found - related_domains = reverse_whois(domain_info.get('registrant_email')) if domain_info.get('registrant_email') else [] - for _domain in related_domains: - related_domains_list.append(_domain['name']) - - # remove duplicate domains from related domains list - related_domains_list = list(set(related_domains_list)) - domain_info.related_domains = related_domains_list - - # save to db if domain exists - if Domain.objects.filter(name=ip_domain).exists(): - domain = Domain.objects.get(name=ip_domain) - db_domain_info = domain.domain_info if domain.domain_info else DomainInfo() - db_domain_info.save() - for _domain in related_domains: - domain_rel = RelatedDomain.objects.get_or_create( - name=_domain['name'], - )[0] - db_domain_info.related_domains.add(domain_rel) - - db_domain_info.dnssec = domain_info.get('dnssec') - #dates - db_domain_info.created = domain_info.get('created') - db_domain_info.updated = domain_info.get('updated') - db_domain_info.expires = domain_info.get('expires') - #registrar - db_domain_info.registrar = Registrar.objects.get_or_create( - name=domain_info.get('registrar_name'), - email=domain_info.get('registrar_email'), - phone=domain_info.get('registrar_phone'), - url=domain_info.get('registrar_url'), - )[0] - db_domain_info.registrant = DomainRegistration.objects.get_or_create( - name=domain_info.get('registrant_name'), - organization=domain_info.get('registrant_organization'), - address=domain_info.get('registrant_address'), - city=domain_info.get('registrant_city'), - state=domain_info.get('registrant_state'), - zip_code=domain_info.get('registrant_zip_code'), - country=domain_info.get('registrant_country'), - email=domain_info.get('registrant_email'), - phone=domain_info.get('registrant_phone'), - fax=domain_info.get('registrant_fax'), - id_str=domain_info.get('registrant_id'), - )[0] - db_domain_info.admin = DomainRegistration.objects.get_or_create( - name=domain_info.get('admin_name'), - organization=domain_info.get('admin_organization'), - address=domain_info.get('admin_address'), - city=domain_info.get('admin_city'), - state=domain_info.get('admin_state'), - zip_code=domain_info.get('admin_zip_code'), - country=domain_info.get('admin_country'), - email=domain_info.get('admin_email'), - phone=domain_info.get('admin_phone'), - fax=domain_info.get('admin_fax'), - id_str=domain_info.get('admin_id'), - )[0] - db_domain_info.tech = DomainRegistration.objects.get_or_create( - name=domain_info.get('tech_name'), - organization=domain_info.get('tech_organization'), - address=domain_info.get('tech_address'), - city=domain_info.get('tech_city'), - state=domain_info.get('tech_state'), - zip_code=domain_info.get('tech_zip_code'), - country=domain_info.get('tech_country'), - email=domain_info.get('tech_email'), - phone=domain_info.get('tech_phone'), - fax=domain_info.get('tech_fax'), - id_str=domain_info.get('tech_id'), - )[0] - for status in domain_info.get('status') or []: - _status = WhoisStatus.objects.get_or_create( - name=status - )[0] - _status.save() - db_domain_info.status.add(_status) - - for ns in domain_info.get('ns_records') or []: - _ns = NameServer.objects.get_or_create( - name=ns - )[0] - _ns.save() - db_domain_info.name_servers.add(_ns) - - for a in domain_info.get('a_records') or []: - _a = DNSRecord.objects.get_or_create( - name=a, - type='a' - )[0] - _a.save() - db_domain_info.dns_records.add(_a) - for mx in domain_info.get('mx_records') or []: - _mx = DNSRecord.objects.get_or_create( - name=mx, - type='mx' - )[0] - _mx.save() - db_domain_info.dns_records.add(_mx) - for txt in domain_info.get('txt_records') or []: - _txt = DNSRecord.objects.get_or_create( - name=txt, - type='txt' - )[0] - _txt.save() - db_domain_info.dns_records.add(_txt) - - db_domain_info.geolocation_iso = domain_info.get('registrant_country') - db_domain_info.whois_server = domain_info.get('whois_server') - db_domain_info.save() - domain.domain_info = db_domain_info - domain.save() + + if 'api key doesn\'t exist' in result: + return { + 'status': False, + 'message': 'Invalid Netlas API Key!' + } + + if 'Request limit' in result: + return { + 'status': False, + 'message': 'Netlas request limit exceeded.' + } + + data = json.loads(result) - except Exception as e: + if not data: return { - 'status': False, - 'ip_domain': ip_domain, - 'result': "unable to fetch records from WHOIS database.", - 'message': str(e) + 'status': False, + 'message': 'No data available for the given domain or IP.' } + # if 'whois' not in data: + # return { + # 'status': False, + # 'message': 'Invalid domain or no WHOIS data available.' + # } - return { - 'status': True, - 'ip_domain': ip_domain, - 'dnssec': domain_info.get('dnssec'), - 'created': domain_info.get('created'), - 'updated': domain_info.get('updated'), - 'expires': domain_info.get('expires'), - 'geolocation_iso': domain_info.get('registrant_country'), - 'domain_statuses': domain_info.get('status'), - 'whois_server': domain_info.get('whois_server'), - 'dns': { - 'a': domain_info.get('a_records'), - 'mx': domain_info.get('mx_records'), - 'txt': domain_info.get('txt_records'), - }, - 'registrar': { - 'name': domain_info.get('registrar_name'), - 'phone': domain_info.get('registrar_phone'), - 'email': domain_info.get('registrar_email'), - 'url': domain_info.get('registrar_url'), - }, - 'registrant': { - 'name': domain_info.get('registrant_name'), - 'id': domain_info.get('registrant_id'), - 'organization': domain_info.get('registrant_organization'), - 'address': domain_info.get('registrant_address'), - 'city': domain_info.get('registrant_city'), - 'state': domain_info.get('registrant_state'), - 'zipcode': domain_info.get('registrant_zip_code'), - 'country': domain_info.get('registrant_country'), - 'phone': domain_info.get('registrant_phone'), - 'fax': domain_info.get('registrant_fax'), - 'email': domain_info.get('registrant_email'), - }, - 'admin': { - 'name': domain_info.get('admin_name'), - 'id': domain_info.get('admin_id'), - 'organization': domain_info.get('admin_organization'), - 'address':domain_info.get('admin_address'), - 'city': domain_info.get('admin_city'), - 'state': domain_info.get('admin_state'), - 'zipcode': domain_info.get('admin_zip_code'), - 'country': domain_info.get('admin_country'), - 'phone': domain_info.get('admin_phone'), - 'fax': domain_info.get('admin_fax'), - 'email': domain_info.get('admin_email'), - }, - 'technical_contact': { - 'name': domain_info.get('tech_name'), - 'id': domain_info.get('tech_id'), - 'organization': domain_info.get('tech_organization'), - 'address': domain_info.get('tech_address'), - 'city': domain_info.get('tech_city'), - 'state': domain_info.get('tech_state'), - 'zipcode': domain_info.get('tech_zip_code'), - 'country': domain_info.get('tech_country'), - 'phone': domain_info.get('tech_phone'), - 'fax': domain_info.get('tech_fax'), - 'email': domain_info.get('tech_email'), - }, - 'nameservers': domain_info.get('ns_records'), - # 'similar_domains': domain_info.get('similar_domains'), - 'related_domains': domain_info.get('related_domains'), - 'related_tlds': domain_info.get('related_tlds'), - 'historical_ips': domain_info.get('historical_ips'), - } + return { + 'status': True, + 'data': data + } + except json.JSONDecodeError: + return { + 'status': False, + 'message': 'Failed to parse JSON response from Netlas.' + } + except Exception as e: + return { + 'status': False, + 'message': f'An error occurred while fetching WHOIS data: {str(e)}' + } + @app.task(name='remove_duplicate_endpoints', bind=False, queue='remove_duplicate_endpoints_queue') def remove_duplicate_endpoints( diff --git a/web/static/custom/custom.js b/web/static/custom/custom.js index 1755b0508..490155324 100644 --- a/web/static/custom/custom.js +++ b/web/static/custom/custom.js @@ -1412,7 +1412,7 @@ function get_and_render_subscan_history(subdomain_id, subdomain_name) { function fetch_whois(domain_name, force_reload_whois=false) { // this function will fetch WHOIS record for any subdomain and also display // snackbar once whois is fetched - var url = `/api/tools/whois/?format=json&ip_domain=${domain_name}`; + var url = `/api/tools/whois/?format=json&target=${domain_name}`; if (force_reload_whois) { url+='&is_reload=true' } @@ -1456,79 +1456,113 @@ function fetch_whois(domain_name, force_reload_whois=false) { } function get_target_whois(domain_name) { - var url = `/api/tools/whois/?format=json&ip_domain=${domain_name}` - Swal.fire({ - title: `Fetching WHOIS details for ${domain_name}...` - }); - swal.showLoading(); - fetch(url, { - method: 'GET', - credentials: "same-origin", - headers: { - "X-CSRFToken": getCookie("csrftoken"), - 'Content-Type': 'application/json' - }, - }).then(response => response.json()).then(function(response) { - console.log(response); - if (response.status) { - swal.close(); - display_whois_on_modal(response); - } else { - fetch(`/api/tools/whois/?format=json&ip_domain=${domain_name}`, { - method: 'GET', - credentials: "same-origin", - headers: { - "X-CSRFToken": getCookie("csrftoken"), - 'Content-Type': 'application/json' - }, - }).then(response => response.json()).then(function(response) { - console.log(response); - if (response.status) { - swal.close(); - display_whois_on_modal(response); - } else { - Swal.fire({ - title: 'Oops!', - text: `reNgine could not fetch WHOIS records for ${domain_name}!`, - icon: 'error' - }); - } - }); - } - }); + const url = `/api/tools/whois/?format=json&target=${domain_name}`; + + Swal.fire({ + title: `Fetching WHOIS details for ${domain_name}...`, + allowOutsideClick: false, + showConfirmButton: false, + willOpen: () => { + Swal.showLoading(); + } + }); + + fetch(url, { + method: 'GET', + credentials: "same-origin", + headers: { + "X-CSRFToken": getCookie("csrftoken"), + 'Content-Type': 'application/json' + }, + }) + .then(response => { + if (!response.ok) { + throw new Error(`HTTP error! status: ${response.status}`); + } + return response.json(); + }) + .then(data => { + console.log(data); + if (data.status) { + Swal.close(); + display_whois_on_modal(data); + } else { + throw new Error(data.result || 'Failed to fetch WHOIS data'); + } + }) + .catch(error => { + console.error('Error:', error); + let errorMessage = error.message; + if (errorMessage.includes('Netlas limit exceeded')) { + errorMessage = 'Rate limit exceeded. Please try again later.'; + } else if (errorMessage.includes('Invalid domain')) { + errorMessage = 'Invalid domain or no WHOIS data available.'; + } + Swal.fire({ + title: 'Error!', + text: `Failed to fetch WHOIS records for ${domain_name}: ${errorMessage}`, + icon: 'error' + }); + }); +} + +function get_domain_whois(domain_name, show_add_target_btn = false) { + const url = `/api/tools/whois/?format=json&target=${domain_name}`; + + Swal.fire({ + title: `Fetching WHOIS details for ${domain_name}...`, + allowOutsideClick: false, + showConfirmButton: false, + willOpen: () => { + Swal.showLoading(); + } + }); + + $('.modal').modal('hide'); + + fetch(url, { + method: 'GET', + credentials: "same-origin", + headers: { + "X-CSRFToken": getCookie("csrftoken"), + 'Content-Type': 'application/json' + }, + }) + .then(response => { + if (!response.ok) { + throw new Error(`HTTP error! status: ${response.status}`); + } + return response.json(); + }) + .then(data => { + console.log(data); + Swal.close(); + if (data.status) { + display_whois_on_modal(data, show_add_target_btn); + } else { + throw new Error(data.result || 'Failed to fetch WHOIS data'); + } + }) + .catch(error => { + console.error('Error:', error); + let errorMessage = error.message; + if (errorMessage.includes('Netlas limit exceeded')) { + errorMessage = 'Rate limit exceeded. Please try again later.'; + } else if (errorMessage.includes('Invalid domain')) { + errorMessage = 'Invalid domain or no WHOIS data available.'; + } + Swal.fire({ + title: 'Error!', + text: `Failed to fetch WHOIS records for ${domain_name}: ${errorMessage}`, + icon: 'error' + }); + }); } -function get_domain_whois(domain_name, show_add_target_btn=false) { - // this function will get whois for domains that are not targets, this will - // not store whois into db nor create target - var url = `/api/tools/whois/?format=json&ip_domain=${domain_name}` - Swal.fire({ - title: `Fetching WHOIS details for ${domain_name}...` - }); - $('.modal').modal('hide'); - swal.showLoading(); - fetch(url, { - method: 'GET', - credentials: "same-origin", - headers: { - "X-CSRFToken": getCookie("csrftoken"), - 'Content-Type': 'application/json' - }, - }).then(response => response.json()).then(function(response) { - swal.close(); - if (response.status) { - display_whois_on_modal(response, show_add_target_btn=show_add_target_btn); - } else { - Swal.fire({ - title: 'Oops!', - text: `reNgine could not fetch WHOIS records for ${domain_name}! ${response['message']}`, - icon: 'error' - }); - } - }); -} + function display_whois_on_modal(response, show_add_target_btn=false) { + console.log(response); // this function will display whois data on modal, should be followed after get_domain_whois() $('#modal_dialog').modal('show'); $('#modal-content').empty(); @@ -1553,7 +1587,7 @@ function display_whois_on_modal(response, show_add_target_btn=false) {