-
Notifications
You must be signed in to change notification settings - Fork 0
/
socks_patch.py
130 lines (114 loc) · 4.41 KB
/
socks_patch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#!/usr/bin/env python
# monkey patch socks5 support into sockets
import os
import socket
import struct
def _split_proxy(uri, port):
    """Parse a ``socks_proxy`` setting and match it against a destination port.

    The expected format is::

        <DESTPORT1,DESTPORT2,...,DESTPORTN:>[username[:password]@]<PROXYHOST:><PROXYPORT>

    Args:
        uri: The proxy specification string; may be empty.
        port: Destination port of the connection being made.

    Returns:
        ``[port, username, password, proxy_host, proxy_port]`` when ``uri`` is
        well formed and ``port`` is one of its destination ports.  Username
        and password are ``""`` when absent and ``proxy_port`` is an ``int``.
        ``None`` in every other case, including a setting whose port fields
        are not numbers or whose proxy port is missing, so that a malformed
        setting is reported the same way as any other unusable one instead
        of raising ``ValueError``/``IndexError``.
    """
    if uri == "":
        return None
    split_auth = uri.split("@")
    try:
        if len(split_auth) == 2:
            # "<ports>:<user>[:<password>]" @ "<host>:<port>"
            fields = split_auth[0].split(":")
            proxy = split_auth[1].split(":")
            if len(fields) == 2:
                # username given without a password
                fields.append("")
            fields += [proxy[0], int(proxy[1])]
        else:
            # no credentials: "<ports>:<host>:<port>"
            bare = split_auth[0].split(":")
            if len(bare) != 3:
                return None
            fields = [bare[0], "", "", bare[1], int(bare[2])]
        if len(fields) != 5:
            return None
        dest_ports = [int(p) for p in fields[0].split(",")]
    except (ValueError, IndexError):
        # non-numeric port or missing proxy port: treat as "no usable proxy"
        return None
    if port not in dest_ports:
        return None
    # we only care about one port
    fields[0] = port
    return fields
def _test_split_proxy():
    """Self-check for ``_split_proxy``.

    Covers every combination of:
      * one or several destination ports, asking for each listed port
      * user and password, user only, or no credentials
    and then a handful of inputs that must be rejected (requested port not
    listed, or the proxy host missing).
    """
    # positive tests
    print("positive")
    port_cases = [
        ["80:", [80]],
        ["80,90:", [80, 90]],
        ["80,90,100:", [80, 90, 100]],
    ]
    auth_cases = [
        ["user:pass@", ["user", "pass"]],
        ["user@", ["user", ""]],
        ["", ["", ""]],
    ]
    host_cases = [
        ["host:100", ["host", 100]],
    ]
    for port_text, listed_ports in port_cases:
        for expected_port in listed_ports:
            for auth_text, auth_fields in auth_cases:
                for host_text, host_fields in host_cases:
                    teststring = str(port_text) + auth_text + host_text
                    testlist = [expected_port] + auth_fields + host_fields
                    actual = _split_proxy(teststring, expected_port)
                    print(teststring, testlist, actual)
                    assert testlist == actual

    # negative tests: each of these must be rejected
    rejected = [
        ("80,90:host:100", 100),
        ("80:host:100", 100),
        ("80,90:100", 80),
        ("80:100", 80),
    ]
    for bad_uri, wanted_port in rejected:
        assert _split_proxy(bad_uri, wanted_port) is None
# CAVEATS:
# only supports ipv4
# only supports socks5
# user/pass auth has not been tested
# if socks_proxy env variable is set, all socket connections on that port will use it
class Socks5Socket(socket.socket):
    """Socket whose ``connect`` can tunnel through a SOCKS5 proxy.

    The proxy is taken from the ``socks_proxy`` environment variable::

        socks_proxy=<DESTPORT1,DESTPORT2,...,DESTPORTN:>[username[:password]@]<PROXYHOST:><PROXYPORT>

    Connections whose destination port is listed are made through the proxy;
    all others connect directly.  Only IPv4 dotted-quad destinations are
    encoded in the CONNECT request.
    """

    @staticmethod
    def _socks5_greeting(auth_methods):
        """Return the client greeting advertising ``auth_methods`` (ints)."""
        count = len(auth_methods)
        return struct.pack("!BB%dB" % count, 0x05, count, *auth_methods)

    @staticmethod
    def _socks5_auth_request(username, password):
        """Return the username/password sub-negotiation request.

        Each field is sent as a one-byte length followed by the full value;
        text values are encoded as UTF-8.

        Raises:
            ValueError: if either encoded value is longer than 255 bytes,
                the most a one-byte length prefix can describe.
        """
        encoded = []
        for value in (username, password):
            if not isinstance(value, bytes):
                value = value.encode("utf-8")
            if len(value) > 255:
                raise ValueError("socks username/password must be at most 255 bytes")
            encoded.append(value)
        user, secret = encoded
        return (struct.pack("!BB", 0x01, len(user)) + user
                + struct.pack("!B", len(secret)) + secret)

    @staticmethod
    def _socks5_connect_request(address):
        """Return the CONNECT request for an IPv4 ``(host, port)`` address.

        Raises:
            ValueError, IndexError: if ``address[0]`` is not a dotted quad.
        """
        ipb = [int(b) for b in address[0].split(".")]
        # version 5, command CONNECT, reserved, address type IPv4
        return struct.pack("!BBBB4BH", 0x05, 0x01, 0x00, 0x01,
                           ipb[0], ipb[1], ipb[2], ipb[3], address[1])

    def _socks5_recv_exact(self, count):
        """Read exactly ``count`` bytes, looping over short reads.

        Closes the socket and raises ``Exception`` if the peer closes the
        connection before ``count`` bytes have arrived.
        """
        chunks = []
        remaining = count
        while remaining:
            chunk = self.recv(remaining)
            if not chunk:
                self.close()
                raise Exception("socks proxy closed the connection unexpectedly")
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def connect(self, address):
        """Connect to ``address``, via the SOCKS5 proxy when one applies.

        Args:
            address: ``(host, port)`` destination.  When a proxy applies,
                ``host`` must be an IPv4 dotted quad.

        Raises:
            Exception: if the proxy rejects the credentials, offers no
                acceptable authentication method, refuses the connection,
                or closes the connection mid-handshake.  The socket is
                closed before the exception is raised.
            ValueError, IndexError: if a proxy applies and the destination
                host is not a dotted quad or the credentials are too long;
                raised before any network traffic.
        """
        socks_proxy = _split_proxy(os.getenv("socks_proxy", ""), address[1])
        if not socks_proxy:
            true_socket.connect(self, address)
            return

        username, password = socks_proxy[1], socks_proxy[2]
        # Build every outgoing message first so bad input fails before the
        # socket is connected to the proxy.
        connect_request = self._socks5_connect_request(address)
        auth_request = self._socks5_auth_request(username, password)
        auth_methods = [0x00]
        if username:
            auth_methods.append(0x02)

        true_socket.connect(self, (socks_proxy[3], socks_proxy[4]))

        # greet the socks server
        self.sendall(self._socks5_greeting(auth_methods))
        (version, auth_method) = struct.unpack("!BB", self._socks5_recv_exact(2))

        # authorize to the socks server
        if auth_method == 0x02:
            # NOTE: username/password auth has not been tested against a real server
            self.sendall(auth_request)
            (version, status) = struct.unpack("!BB", self._socks5_recv_exact(2))
            if status != 0:
                self.close()
                raise Exception("socks authorization failed")
        elif auth_method != 0x00:
            self.close()
            raise Exception("no acceptable socks authorization available")

        # set connection to tcp/ip stream, ipv4
        self.sendall(connect_request)
        (version, status, _reserved, addr_type) = struct.unpack(
            "!BBBB", self._socks5_recv_exact(4))
        if status != 0:
            self.close()
            raise Exception("socks connection failed, error: " + str(status))

        # Drain the bound address and port so no reply bytes are left in the
        # stream; the address length depends on the reply's address type.
        if addr_type == 0x01:
            addr_len = 4
        elif addr_type == 0x04:
            addr_len = 16
        elif addr_type == 0x03:
            (addr_len,) = struct.unpack("!B", self._socks5_recv_exact(1))
        else:
            self.close()
            raise Exception("socks connection failed, unknown address type: " + str(addr_type))
        self._socks5_recv_exact(addr_len + 2)
# Keep a reference to the original socket class before patching it:
# Socks5Socket.connect calls true_socket.connect to make the real connection.
# This must run before the assignment below, or the reference would point at
# Socks5Socket itself.
true_socket = socket.socket
# Monkey patch: from here on, anything that looks up socket.socket gets
# Socks5Socket instead.
socket.socket = Socks5Socket