forked from mk-fg/fgtk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ip-ext
executable file
·169 lines (138 loc) · 6.59 KB
/
ip-ext
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
#!/usr/bin/env python3
import os, sys, socket, logging, contextlib, ipaddress, re, subprocess
class LogMessage:
    """Log message rendered on demand with ``str.format`` placeholders.

    Instances are handed to the logging machinery in place of a message
    string; the template is only rendered when ``str()`` is called on them.
    """

    def __init__(self, fmt, a, k):
        # fmt: message template (any object, coerced with str() on render)
        # a: positional format arguments, k: keyword format arguments
        self.fmt, self.a, self.k = fmt, a, k

    def __str__(self):
        # __str__ must return a real str, so non-string messages (e.g. an
        # exception instance passed as the message) are coerced first.
        fmt = str(self.fmt)
        # Without any arguments the text is returned verbatim, so literal
        # braces in a plain message do not have to be escaped.
        if not (self.a or self.k):
            return fmt
        return fmt.format(*self.a, **self.k)
class LogStyleAdapter(logging.LoggerAdapter):
    """Logger adapter that formats messages with ``str.format`` placeholders.

    The message and the positional/keyword arguments of a logging call are
    wrapped in a LogMessage, which renders the ``{}``-style template when the
    record text is requested.
    """

    # Keyword arguments that belong to logging itself rather than to the
    # message template; they are forwarded to Logger._log() as-is.
    _log_kws = 'exc_info', 'stack_info', 'stacklevel', 'extra'

    def __init__(self, logger, extra=None):
        super().__init__(logger, extra or {})

    def log(self, level, msg, *args, **kws):
        """Log msg at level, using args/kws as ``str.format`` arguments."""
        if not self.isEnabledFor(level):
            return
        msg, kws = self.process(msg, kws)
        # process() adds an "extra" key to kws. It must not end up among the
        # format arguments, otherwise even argument-less messages would be
        # passed through str.format() and fail on any literal brace.
        log_kws = {key: kws.pop(key) for key in self._log_kws if key in kws}
        self.logger._log(level, LogMessage(msg, args, kws), (), **log_kws)
def get_logger(name):
    """Return a LogStyleAdapter wrapping the standard logger called name."""
    return LogStyleAdapter(logging.getLogger(name))
def ipv6_dns(opts):
    """Print opts.ipv6_address converted to a DNS record name form.

    The address is expanded to its full zero-padded form and stripped of
    colons. With opts.djbdns set, the resulting hex digits are printed as-is;
    otherwise they are printed in reverse order, dot-separated and followed
    by the ".ip6.arpa" suffix.
    """
    exploded = ipaddress.IPv6Address(opts.ipv6_address).exploded
    log.debug('Exploded address: {}', exploded)
    nibbles = exploded.replace(':', '')
    if opts.djbdns:
        print(nibbles)
        return
    print('.'.join(nibbles[::-1]) + '.ip6.arpa')
def ipv6_lladdr(opts):
    """Generate and optionally assign a link-local IPv6 address.

    NOT FUNCTIONAL YET: the first statement reports a usage error through
    opts.parser.error(), which terminates the program when opts.parser is
    the argparse parser attached by main(). Everything after it is the legacy
    python2 implementation, kept as a reference for porting. It still depends
    on the third-party "netifaces" and "netaddr" modules, which this file
    does not import.

    Options read: opts.parser, opts.iface, opts.lladdr, opts.assign.
    Raises RuntimeError (legacy code) if the "ip addr add" command fails.
    """
    opts.parser.error('This option is not ported from py2 legacy code yet')

    # --- legacy code, unreachable until the guard above is removed ---
    if opts.assign and not opts.iface:
        opts.parser.error('--assign requires --iface option to be specified.')

    if not opts.lladdr and opts.iface:
        # Use the interface's own link-layer (MAC) address
        lladdr, = netifaces.ifaddresses(opts.iface)[netifaces.AF_PACKET]
        opts.lladdr = lladdr['addr']
        log.debug('Got lladdr from interface ({}): {}', opts.iface, opts.lladdr)

    addr_ll = netaddr.EUI(opts.lladdr).ipv6_link_local() # XXX: no EUI in ipaddress
    if not opts.assign:
        print(addr_ll)
        return

    for addr in netifaces.ifaddresses(opts.iface).get(netifaces.AF_INET6, []):
        addr = addr.get('addr', '')
        # Only addresses carrying a "%scope" suffix are compared here
        if '%' not in addr:
            continue
        if netaddr.IPAddress(addr.split('%', 1)[0]) == addr_ll:
            log.debug(
                'ipv6_lladdr ({}) already assigned to interface, skipping: {}',
                addr_ll, opts.iface)
            return

    cmd = [
        'ip', 'addr', 'add', '{}/64'.format(addr_ll),
        'dev', opts.iface, 'scope', 'link']
    if subprocess.call(cmd):
        raise RuntimeError(
            'Failed to assign link-local'
            ' address ({}) to interface: {}'.format(addr_ll, opts.iface))
    log.debug('Assigned ipv6_lladdr ({}) to interface: {}', addr_ll, opts.iface)
def ip_check(opts):
    """Check whether an address is configured, optionally assigning it.

    NOT FUNCTIONAL YET: the first statement reports a usage error through
    opts.parser.error(), which terminates the program when opts.parser is
    the argparse parser attached by main(). Everything after it is the legacy
    python2 implementation, kept as a reference for porting. It still depends
    on the third-party "netifaces" and "netaddr" modules, which this file
    does not import.

    Options read: opts.parser, opts.addr, opts.iface, opts.assign.
    Legacy code returns 0 if the address is found, 1 if it is missing and
    no --assign was given, None after a successful assignment.
    Raises RuntimeError (legacy code) if the "ip addr add" command fails.
    """
    opts.parser.error('This option is not ported from py2 legacy code yet')

    # --- legacy code, unreachable until the guard above is removed ---
    if opts.assign and not opts.iface:
        opts.parser.error('--assign requires --iface option to be specified.')

    addr_seek = netaddr.IPAddress(opts.addr)
    af = {4: netifaces.AF_INET, 6: netifaces.AF_INET6}[addr_seek.version]
    ifaces = [opts.iface] if opts.iface else netifaces.interfaces()

    found = None
    for iface in ifaces:
        for addr in netifaces.ifaddresses(iface).get(af, []):
            if 'addr' not in addr:
                continue
            # Drop the "%scope" suffix, if any, before comparing
            if netaddr.IPAddress(addr['addr'].split('%', 1)[0]) == addr_seek:
                found = iface
                break
        if found:
            break

    if found:
        log.debug('Address ({}) found on interface: {}', addr_seek, found)
        return 0
    if not opts.assign:
        log.debug(
            'Address ({}, family: {}) not found on interfaces: {}',
            addr_seek, netifaces.address_families[af], ', '.join(ifaces))
        return 1

    # --assign value: optional leading "/cidr", then extra "ip addr add" args
    assign_args = opts.assign.strip().split()
    if assign_args and assign_args[0].startswith('/'):
        addr = '{}{}'.format(addr_seek, assign_args[0])
        assign_args = assign_args[1:]
    else:
        addr = addr_seek.format()

    cmd = ['ip', 'addr', 'add', addr, 'dev', opts.iface] + assign_args
    log.debug('Assign command: {}', ' '.join(cmd))
    if subprocess.call(cmd):
        # Report the address that was being assigned
        raise RuntimeError(
            'Failed to assign address'
            ' ({}) to interface: {}'.format(addr, opts.iface))
    log.debug('Assigned address ({}) to interface: {}', addr, opts.iface)
def _iptables_flush_ruleset(dump):
    """Return iptables-restore input that empties the ruleset in dump.

    Keeps only table headers ("*table"), "COMMIT" lines and chain
    declarations with the policy set to ACCEPT; all rule lines are dropped.
    """
    lines = []
    for line in dump.splitlines():
        if line.startswith('*') or line == 'COMMIT':
            lines.append(line)
        elif re.search(r'^:[A-Z]+', line):
            fields = line.split()
            # User-defined chains are saved with "-" in place of a policy and
            # cannot be given one, so they are left out entirely instead of
            # making the restore command fail.
            if len(fields) > 1 and fields[1] == '-':
                continue
            lines.append(f'{fields[0]} ACCEPT')
    return ''.join(f'{line}\n' for line in lines)


def iptables_flush(opts):
    """Flush iptables and ip6tables rules via the save/restore CLI tools.

    For each tool, the output of "<tool>-save" is reduced to table headers,
    chain declarations (policy ACCEPT) and COMMIT lines, and that is fed to
    "<tool>-restore". opts is not used.

    Raises subprocess.CalledProcessError if a "-save" command fails and
    RuntimeError if a "-restore" command exits with non-zero status.
    """
    for ipt in 'iptables', 'ip6tables':
        dump = subprocess.check_output([f'{ipt}-save']).decode()
        restore = subprocess.run(
            [f'{ipt}-restore'], input=_iptables_flush_ruleset(dump).encode())
        if restore.returncode != 0:
            raise RuntimeError(f'{ipt}-restore exited with error status')
def main(args=None):
    """Parse command-line arguments and run the selected action.

    Arguments:
        args: list of arguments to parse; sys.argv[1:] is used when None.

    Returns the value returned by the action function, which the script
    entry point passes to sys.exit(). Usage errors (no action, unknown
    action) are reported through parser.error(), which exits the program.

    Side effects: configures the root logger via logging.basicConfig() and
    sets the module-global "log" used by the action functions.
    """
    import argparse
    parser = argparse.ArgumentParser(
        description='Tool to manipulate IP addresses, network interfaces and misc settings.')
    parser.add_argument('-d', '--debug', action='store_true', help='Verbose operation mode.')

    cmds = parser.add_subparsers(title='Supported actions', dest='call')

    cmd = cmds.add_parser('ip-check',
        help='Check if os/interface has specified address configured.')
    cmd.add_argument('addr',
        help='IP address to look for. Exits with non-zero status if not found.')
    cmd.add_argument('-i', '--iface', metavar='dev', help='Interface to limit check to.')
    cmd.add_argument('-x', '--assign', metavar='[/cidr ][ip-add extras]',
        help='Assign address to interface, if missing. Requires --iface option to be used.'
            ' Option argument must specify CIDR netmask and can also have'
            ' additional command-line parameters to pass to "ip add" line (except for "dev"),'
            ' examples: "/24", "/24 primary label main", "peer 1.2.3.4 scope global".')

    cmd = cmds.add_parser('ipv6-lladdr',
        help='Generate and/or assign link-local IPv6 address to the interface.')
    cmd.add_argument('-i', '--iface', metavar='dev',
        help='Interface name to generate address for.')
    cmd.add_argument('-l', '--lladdr', metavar='mac',
        help='MAC address to convert to link-local address.')
    cmd.add_argument('-x', '--assign', action='store_true',
        help='Assign address to interface,'
            ' unless already there. Requires --iface option to be used.')

    cmd = cmds.add_parser('ipv6-dns',
        help='Convert IPv6 address to "*.ip6.arpa" (or other) domain name format.')
    cmd.add_argument('ipv6_address', help='Address to convert.')
    cmd.add_argument('-d', '--djbdns',
        action='store_true', help='Convert to canonical djbdns record format instead.')

    cmd = cmds.add_parser('iptables-flush',
        help='Flush all iptables rules (all chains/tables) using CLI save/restore tools.')

    opts = parser.parse_args(sys.argv[1:] if args is None else args)
    # Subcommands are optional for argparse, in which case opts.call is None;
    # report that as a usage error instead of failing on it further below.
    if not opts.call:
        parser.error('No action specified.')
    # Action functions report their own usage errors through opts.parser
    opts.parser = parser

    global log
    logging.basicConfig(level=logging.DEBUG if opts.debug else logging.WARNING)
    log = get_logger('main')

    # Action names map to functions with dashes replaced by underscores
    actions = dict(
        ipv6_dns=ipv6_dns, iptables_flush=iptables_flush,
        ip_check=ip_check, ipv6_lladdr=ipv6_lladdr)
    func = actions.get(opts.call.replace('-', '_'))
    if func is None:
        parser.error('Action {!r} is not implemented.'.format(opts.call))
    return func(opts)
# Script entry point: the value returned by main() becomes the exit status.
if __name__ == '__main__':
    sys.exit(main())