-
-
Notifications
You must be signed in to change notification settings - Fork 406
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
irc: ISupport class and basic parsing
- Loading branch information
Showing
2 changed files
with
603 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,195 @@ | ||
# coding=utf-8 | ||
"""IRC Tools for ISUPPORT management. | ||
When a server wants to advertise its features and settings, it can use the | ||
``RPL_ISUPPORT`` command with a list of arguments. | ||
""" | ||
# Copyright 2019, Florian Strzelecki <florian.strzelecki@gmail.com> | ||
# | ||
# Licensed under the Eiffel Forum License 2. | ||
from __future__ import unicode_literals, absolute_import, print_function, division | ||
|
||
import functools | ||
import re | ||
|
||
|
||
def _optional(parser, default=None): | ||
# set a parser as optional: will always return the default value provided if value is None | ||
@functools.wraps(parser) | ||
def wrapped(value): | ||
if not value: | ||
return default | ||
return parser(value) | ||
return wrapped | ||
|
||
|
||
def _no_value(value): | ||
# always ignore the value | ||
return None | ||
|
||
|
||
def _single_character(value): | ||
if len(value) > 1: | ||
raise ValueError('Too many characters: %r.' % value) | ||
|
||
return value | ||
|
||
|
||
def _map_items(parser=str, map_separator=',', item_separator=':'): | ||
@functools.wraps(parser) | ||
def wrapped(value): | ||
items = sorted( | ||
item.split(item_separator) | ||
for item in value.split(map_separator)) | ||
|
||
return tuple( | ||
(k, parser(v) if v else None) | ||
for k, v in items | ||
) | ||
return wrapped | ||
|
||
|
||
def _parse_chanmodes(value): | ||
items = value.split(',') | ||
|
||
if len(items) < 4: | ||
raise ValueError('Not enough channel types to unpack from %r.' % value) | ||
|
||
# add extra channel type's modes to their own tuple | ||
# result in (A, B, C, D, (E, F, G, H, ..., Z)) | ||
# where A, B, C, D = result[:4] | ||
# and extras = result[4] | ||
return tuple(items[:4]) + (tuple(items[4:]),) | ||
|
||
|
||
def _parse_elist(value): | ||
# letters are case-insensitives | ||
return tuple(sorted(set(letter.upper() for letter in value))) | ||
|
||
|
||
def _parse_extban(value): | ||
args = value.split(',') | ||
|
||
if len(args) < 2: | ||
raise ValueError('Invalid value for EXTBAN: %r.' % value) | ||
|
||
prefix = args[0] or None | ||
items = tuple(sorted(set(args[1]))) | ||
|
||
return (prefix, items) | ||
|
||
|
||
def _parse_prefix(value): | ||
result = re.match(r'\((?P<modes>\S+)\)(?P<prefixes>\S+)', value) | ||
|
||
if not result: | ||
raise ValueError('Invalid value for PREFIX: %r' % value) | ||
|
||
modes = result.group('modes') | ||
prefixes = result.group('prefixes') | ||
|
||
if len(modes) != len(prefixes): | ||
raise ValueError('Mode list does not match for PREFIX: %r' % value) | ||
|
||
return tuple(sorted(zip(modes, prefixes))) | ||
|
||
|
||
ISUPPORT_PARSER = { | ||
'AWAYLEN': int, | ||
'CASEMAPPING': str, | ||
'CHANLIMIT': _map_items(int), | ||
'CHANMODES': _parse_chanmodes, | ||
'CHANNELLEN': int, | ||
'CHANTYPES': _optional(tuple), | ||
'ELIST': _parse_elist, | ||
'EXCEPTS': _optional(_single_character, default='e'), | ||
'EXTBAN': _parse_extban, | ||
'HOSTLEN': int, | ||
'INVEX': _optional(_single_character, default='I'), | ||
'KICKLEN': int, | ||
'MAXLIST': _map_items(int), | ||
'MAXTARGETS': _optional(int), | ||
'MODES': _optional(int), | ||
'NETWORK': str, | ||
'NICKLEN': int, | ||
'PREFIX': _optional(_parse_prefix), | ||
'SAFELIST': _no_value, | ||
'SILENCE': _optional(int), | ||
'STATUSMSG': _optional(tuple), | ||
'TARGMAX': _optional(_map_items(int)), | ||
'TOPICLEN': int, | ||
'USERLEN': int, | ||
} | ||
|
||
|
||
def parse_parameter(arg): | ||
items = arg.split('=', 1) | ||
if len(items) == 2: | ||
key, value = items | ||
else: | ||
key, value = items[0], None | ||
|
||
if key.startswith('-'): | ||
# ignore value for removed parameters | ||
return (key, None) | ||
|
||
parser = ISUPPORT_PARSER.get(key, str) | ||
return (key, parser(value)) | ||
|
||
|
||
class ISupport(object): | ||
"""Storage class for IRC's ``ISUPPORT`` feature.""" | ||
def __init__(self, **kwargs): | ||
self.__isupport = dict( | ||
(key.upper(), value) | ||
for key, value in kwargs.items() | ||
if not key.startswith('-')) | ||
|
||
def __getitem__(self, key): | ||
key_ci = key.upper() | ||
if key_ci not in self.__isupport: | ||
raise KeyError(key_ci) | ||
return self.__isupport[key_ci] | ||
|
||
def __contains__(self, key): | ||
return key.upper() in self.__isupport | ||
|
||
@property | ||
def CHANLIMIT(self): | ||
if 'CHANLIMIT' not in self: | ||
raise AttributeError('CHANLIMIT') | ||
|
||
return dict(self['CHANLIMIT']) | ||
|
||
@property | ||
def CHANMODES(self): | ||
if 'CHANMODES' not in self: | ||
raise AttributeError('CHANMODES') | ||
|
||
return dict(zip('ABCD', self['CHANMODES'][:4])) | ||
|
||
@property | ||
def MAXLIST(self): | ||
if 'MAXLIST' not in self: | ||
raise AttributeError('MAXLIST') | ||
|
||
return dict(self['MAXLIST']) | ||
|
||
@property | ||
def PREFIX(self): | ||
if 'PREFIX' not in self: | ||
raise AttributeError('PREFIX') | ||
|
||
return dict(self['PREFIX']) | ||
|
||
@property | ||
def TARGMAX(self): | ||
if 'TARGMAX' not in self: | ||
raise AttributeError('TARGMAX') | ||
|
||
targmax = self['TARGMAX'] | ||
|
||
if targmax is None: | ||
return {} | ||
|
||
return dict(self['TARGMAX'] or []) |
Oops, something went wrong.