Skip to content

Commit

Permalink
better registrar_abusemail
Browse files Browse the repository at this point in the history
  • Loading branch information
e3rd committed Jan 21, 2025
1 parent 004b2c0 commit 8532e0a
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 25 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# CHANGELOG

## 1.4.5 (2025-01-13)
## 1.4.7 (2025-01-21)
* drop Python3.9 support
* fix: Setting field type by `--type` suppresses the auto-detection.
* fix: split by CIDR
Expand Down
2 changes: 1 addition & 1 deletion convey/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .decorators import PickMethod, PickInput

__all__ = ["PickMethod", "PickInput"]
__version__ = "1.4.6"
__version__ = "1.4.7"
19 changes: 10 additions & 9 deletions convey/args_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
from . import __version__

otrs_flags = [("otrs_id", "Ticket id"), ("otrs_num", "Ticket num"), ("otrs_cookie", "OTRSAgentInterface cookie"),
("otrs_token", "OTRS challenge token")]
("otrs_token", "OTRS challenge token")]

new_fields: List[Tuple[bool, Any]] = []
"User has requested to compute these. Defined by tuples: add (whether to include the column in the result), field definition"


class BlankTrue(argparse.Action):
""" When left blank, this flag produces True. (Normal behaviour is to produce None which I use for not being set.)
Return boolean for 0/false/off/1/true/on.
Expand Down Expand Up @@ -44,6 +45,7 @@ class BlankTrueString(BlankTrue):
def __call__(self, *args, **kwargs):
super().__call__(*args, **kwargs, allow_string=True)


class FieldExcludedAppend(argparse.Action):
def __call__(self, _, namespace, values, option_string=None):
new_fields.append((False, values))
Expand All @@ -63,7 +65,6 @@ def _split_lines(self, text, width):
return argparse.HelpFormatter._split_lines(self, text, width)



class ArgsController:
def parse_args(self):
epilog = "To launch a web service see README.md."
Expand Down Expand Up @@ -136,13 +137,13 @@ def parse_args(self):

parser.add_argument('--server', help=f"Launches simple web server", action="store_true")
parser.add_argument('--daemon', help=f"R|Run a UNIX socket daemon to speed up single query requests."
"\n * 1/true/on – allow using the daemon"
"\n * 0/false/off – do not use the daemon"
"\n * start – start the daemon and exit"
"\n * stop – stop the daemon and exit"
"\n * status – print out the status of the daemon"
"\n * restart – restart the daemon and continue"
"\n * server – run the server in current process (I.E. for debugging)",
"\n * 1/true/on – allow using the daemon"
"\n * 0/false/off – do not use the daemon"
"\n * start – start the daemon and exit"
"\n * stop – stop the daemon and exit"
"\n * status – print out the status of the daemon"
"\n * restart – restart the daemon and continue"
"\n * server – run the server in current process (I.E. for debugging)",
action=BlankTrue, nargs="?", metavar="start/restart/stop/status/server")

group = parser.add_argument_group("CSV dialect")
Expand Down
2 changes: 1 addition & 1 deletion convey/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ def _get_methods():
(t.whois, t.prefix): lambda x: str(x.get[0]),
(t.whois, t.asn): lambda x: x.get[3],
(t.whois, t.abusemail): lambda x: x.get[6],
(t.whoisdomain, t.registrar_abusemail): lambda x: x.get[8],
(t.whoisdomain, t.registrar_abusemail): lambda x: x.get[6],
(t.whois, t.country): lambda x: x.get[5],
(t.whois, t.netname): lambda x: x.get[4],
(t.whois, t.csirt_contact):
Expand Down
34 changes: 21 additions & 13 deletions convey/whois.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from datetime import datetime, timedelta
from subprocess import PIPE, Popen
from time import time, sleep
from typing import Literal

from netaddr import IPRange, IPNetwork
from tldextract import tldextract
Expand All @@ -16,6 +17,16 @@

whole_space = IPRange('0.0.0.0', '255.255.255.255')

Ttl = int
Asn = NetName = Country = str
Email = str
CountryMail = Email
AbuseMail = Email
IncidentContact = AbuseMail | CountryMail
Location = Literal["local", "abroad"]
Prefix = str
AnalysisResult = tuple[Prefix, Location, IncidentContact, Asn, NetName, Country, AbuseMail, Ttl]


class Quota:
def __init__(self):
Expand Down Expand Up @@ -77,9 +88,7 @@ def init(cls, stats, ranges, ip_seen, csvstats, slow_mode=False, unknown_mode=Fa
# Whois.servers[name] = val

def __init__(self, ip, hostname=None):
"""
self.get stores tuple: prefix, location, mail, asn, netname, country, ttl
"""
""" Access self.get for AnalyzisResult. """
self.ip = ip
self.hostname = hostname
self.whois_response = []
Expand Down Expand Up @@ -110,7 +119,7 @@ def __init__(self, ip, hostname=None):
if self.see:
print("waiting 7 seconds... ", end="", flush=True)
sleep(7)
get = self.analyze() # prefix, location, mail, asn, netname, country...
get: AnalysisResult = self.analyze() # AnalyzisResult: prefix, location, mail, asn, netname, country...
if self.see:
print(get[2] or "no incident contact.")
prefix = get[0]
Expand Down Expand Up @@ -246,10 +255,7 @@ def _match_response(self, patterns, last_word=False):
# return line
# return "" # no grep result found

def analyze(self):
"""
:return: prefix, "local"|"abroad", incident-contact ( = abuse-mail|country), asn, netname, country, abuse-mail, TTL
"""
def analyze(self) -> AnalysisResult:
prefix = country = ""

for server in list(self.servers):
Expand Down Expand Up @@ -359,7 +365,6 @@ def analyze(self):
asn = self._match_response(r'\norigin(.*)\d+', last_word=True)
netname = self._match_response([r'netname:\s*([^\s]*)', r'network:network-name:\s*([^\s]*)'])

registrar_ab = self.get_registrar_abusemail()
ab = self.get_abusemail()
if Whois.unknown_mode and not ab:
ab = self.resolve_unknown_mail()
Expand All @@ -372,7 +377,7 @@ def analyze(self):
else:
get1 = "local"
get2 = ab
return prefix, get1, get2, asn, netname, country, ab, int(time()), registrar_ab
return prefix, get1, get2, asn, netname, country, ab, int(time())

def _load_country_from_addresses(self):
# let's try to find country in the non-standardised address field
Expand All @@ -388,16 +393,19 @@ def _load_country_from_addresses(self):
reAbuse = re.compile(email_regex)

def get_abusemail(self):
""" Loads abusemail from last whois response OR from whois json api. """
""" Loads abusemail from last whois response """
match = self.reAbuse.search(self._match_response(['% abuse contact for.*',
'orgabuseemail.*',
'abuse-mailbox.*',
"e-mail:.*" # whois 179.50.80.0/21
"e-mail:.*", # whois 179.50.80.0/21,
"email:.*" # ex: 'Registrar Abuse Contact Email: domainabuse@tucows.com',
]))
return match.group(0) if match else ""

def get_registrar_abusemail(self):
"""Loads registrar's abusemail from last whois response OR from whois json api."""
"""Loads registrar's abusemail from last whois response.
Grep a line where both 'abuse', 'registrar' and e-mail are present.
"""

abuse_line_pattern = r"^(?=.*\babuse\b)(?=.*\bregistrar\b).*"
matches = re.findall(abuse_line_pattern, self.whois_response[0], flags=re.MULTILINE)
Expand Down

0 comments on commit 8532e0a

Please sign in to comment.