-
Notifications
You must be signed in to change notification settings - Fork 1
/
packet.py
208 lines (206 loc) · 8.72 KB
/
packet.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
import random
import socket
import struct
import zlib
# RSA public modulus; TibiaPacket.rsa_encrypt raises the plaintext to the
# public exponent 65537 modulo this value.
# NOTE(review): presumably the well-known OpenTibia server key, going by the name - confirm.
OT_RSA = 109120132967399429278860960508995541528237502902798129123468757937266291492576446330739696001110603907230888610072655818825358503429057592827629436413108566029093628212635953836686562675849720620786279431090218017681061521755056710823876476444260558147179707119674283982419152118103759076030616683978566631413
# Size in bytes of the packet header: a U16 size field followed by a U32
# Adler-32 checksum (struct format '=HI').
headerSize = 6
class TibiaPacket(object):
    """Byte buffer used to build outgoing packets and parse incoming ones.

    A packet is stored as two pieces:

    * ``header`` - 6 bytes: a U16 size followed by a U32 Adler-32 checksum.
    * ``packet`` - the payload that follows the header.

    ``position`` is the read cursor used by the ``get*`` methods and
    ``encryptionPos`` marks where the RSA-encrypted section of an outgoing
    payload starts.  All integers use the ``'='`` struct prefix (native byte
    order, standard sizes).
    """

    def __init__(self, packetBytes=None):
        """Split ``packetBytes`` into header and payload.

        With no argument an empty, writable ``bytearray`` payload is created.
        """
        if packetBytes is None:
            # A fresh bytearray per instance; rsa_encrypt needs a mutable payload.
            packetBytes = bytearray()
        self.header = packetBytes[:headerSize]
        self.packet = packetBytes[headerSize:]
        self.position = 0
        self.encryptionPos = 0

    # --- header ---------------------------------------------------------

    def writeHeader(self):
        """Compute the 6-byte header for the current payload.

        The size field counts the 4 checksum bytes plus the payload.  Only the
        header is stored here; ``getWholePacket`` appends the payload, so the
        payload must not be included a second time.
        """
        self.header = struct.pack('=HI', len(self.packet) + 4, zlib.adler32(self.packet))

    def readHeader(self):
        """Decode ``self.header`` and remember its fields.

        Sets ``self.packetSize`` and ``self.adler32`` and returns both in a
        dict.  Raises ``struct.error`` if the header is not exactly 6 bytes.
        """
        packetSize, adler32Checksum = struct.unpack('=HI', self.header)
        self.packetSize = packetSize
        self.adler32 = adler32Checksum
        return {'packetSize': packetSize, 'adler32Checksum': adler32Checksum}

    # --- XTEA -----------------------------------------------------------

    def xtea_decrypt_block(self, block):
        """Decrypt one 8-byte block with the module-level ``xtea_key``.

        ``xtea_key`` is read as four 32-bit words; 32 rounds are applied and
        every intermediate result is masked to 32 bits.
        """
        v0, v1 = struct.unpack('=2I', block)
        k = struct.unpack('=4I', xtea_key)
        delta, mask, rounds = 0x9E3779B9, 0xFFFFFFFF, 32
        total = (delta * rounds) & mask
        for _ in range(rounds):
            v1 = (v1 - ((((v0 << 4) ^ (v0 >> 5)) + v0) ^ (total + k[(total >> 11) & 3]))) & mask
            total = (total - delta) & mask
            v0 = (v0 - ((((v1 << 4) ^ (v1 >> 5)) + v1) ^ (total + k[total & 3]))) & mask
        return struct.pack('=2I', v0, v1)

    def xtea_decrypt(self):
        """Decrypt the whole payload in place, 8 bytes at a time.

        Raises ``struct.error`` if the payload length is not a multiple of 8.
        """
        self.packet = b''.join(
            self.xtea_decrypt_block(self.packet[i:i + 8])
            for i in range(0, len(self.packet), 8)
        )

    def trim_size(self):
        """Drop the first 2 payload bytes and cap the rest at ``packetSize``.

        Requires ``readHeader`` to have been called first.
        NOTE(review): the 2 skipped bytes are presumably an inner length
        field, but the slice is bounded by the header's size, not by that
        field - confirm this is intended.
        """
        self.packet = self.packet[2:2 + self.packetSize]

    # --- RSA ------------------------------------------------------------

    def setEncryptionPos(self):
        """Mark the current end of the payload as the start of the RSA section."""
        self.encryptionPos = len(self.packet)

    def rsa_encrypt(self):
        """Replace the section after ``encryptionPos`` with its RSA ciphertext.

        The section is read as a big-endian integer, raised to 65537 modulo
        ``OT_RSA`` and written back as 128 big-endian bytes.  The payload must
        be a ``bytearray`` because it is modified by slice assignment.
        """
        m = int.from_bytes(self.packet[self.encryptionPos:], 'big')
        c = pow(m, 65537, OT_RSA)
        self.packet[self.encryptionPos:] = bytearray(c.to_bytes(128, 'big'))
        self.encryptionPos = 0

    def fillBytes(self):
        """Pad the section after ``encryptionPos`` with random bytes up to 128 bytes."""
        used = len(self.packet) - self.encryptionPos
        self.packet += bytearray(random.randint(0, 255) for _ in range(used, 128))

    # --- writers --------------------------------------------------------

    def writeU8(self, n):
        """Append ``n`` as an unsigned 8-bit integer."""
        self.packet += struct.pack('=B', n)

    def writeU16(self, n):
        """Append ``n`` as an unsigned 16-bit integer."""
        self.packet += struct.pack('=H', n)

    def writeU32(self, n):
        """Append ``n`` as an unsigned 32-bit integer."""
        self.packet += struct.pack('=I', n)

    def writeString(self, s):
        """Append a U16 length followed by the bytes of ``s``.

        ``s`` may be ``bytes`` or ``str``.  A ``str`` is encoded first so the
        length prefix counts encoded bytes.
        NOTE(review): UTF-8 is used for ``str`` input; confirm the encoding
        the server expects for non-ASCII text.
        """
        if isinstance(s, str):
            s = s.encode('utf-8')
        stringLength = len(s)
        self.writeU16(stringLength)
        self.packet += struct.pack('%is' % (stringLength), s)

    def writeBytes(self, b):
        """Append raw bytes without a length prefix."""
        self.packet += b

    # --- readers --------------------------------------------------------

    def getU8(self):
        """Read an unsigned 8-bit integer at the cursor and advance by 1."""
        n = self.packet[self.position]
        self.position += 1
        return n

    def getU16(self):
        """Read an unsigned 16-bit integer at the cursor and advance by 2."""
        n = struct.unpack('=H', self.packet[self.position:self.position + 2])[0]
        self.position += 2
        return n

    def getU32(self):
        """Read an unsigned 32-bit integer at the cursor and advance by 4."""
        n = struct.unpack('=I', self.packet[self.position:self.position + 4])[0]
        self.position += 4
        return n

    def getString(self):
        """Read a U16 length and that many bytes; returns ``bytes``."""
        stringLength = self.getU16()
        string = struct.unpack(
            '=%is' % (stringLength),
            self.packet[self.position:self.position + stringLength],
        )[0]
        self.position += stringLength
        return string

    def getDouble(self, parameter_list):
        """Not implemented."""
        raise NotImplementedError

    def getPacket(self):
        """Return the payload (without the header)."""
        return self.packet

    def getWholePacket(self):
        """Return header and payload concatenated, ready to send."""
        return self.header + self.packet
def makeLoginPacket(xtea_key, acc_name, acc_password):
    """Assemble the framed login (character-list request) message.

    A run of fixed fields is written first.  Everything written after
    ``setEncryptionPos`` - a zero byte, the raw XTEA key and the two
    credentials - is padded by ``fillBytes`` and replaced with its RSA
    ciphertext by ``rsa_encrypt``.  The header is computed last.

    Returns the value of ``getWholePacket()``.
    """
    login = TibiaPacket()  # get charlist packet (login)

    # Fixed leading fields, written before the RSA marker.
    login.writeU8(1)
    for short_field in (2, 1100):
        login.writeU16(short_field)
    for long_field in (1100, 0x4E12DAFF, 0x4E12DB27, 0x4E119CBF):
        login.writeU32(long_field)
    login.writeU8(0)

    # RSA-protected section.
    login.setEncryptionPos()
    login.writeU8(0)  # the first RSA byte must be 0
    login.writeBytes(xtea_key)  # the key is a plain run of bytes, so no length prefix
    for credential in (acc_name, acc_password):
        login.writeString(credential)
    login.fillBytes()
    login.rsa_encrypt()

    login.writeHeader()
    return login.getWholePacket()
def makeEnterGamePacket(sessionkey, charname, timestamp, randomNumber):
    """Assemble the framed enter-game message for one character.

    The arguments are printed first.  A run of fixed fields is then written,
    followed by the RSA-protected section: a zero byte, the module-level
    ``xtea_key``, another zero byte, the session key, the character name and
    the ``timestamp`` / ``randomNumber`` pair.  That section is padded,
    RSA-encrypted, and the header is computed last.

    Returns the value of ``getWholePacket()``.
    """
    print(sessionkey, charname, timestamp, randomNumber)
    enter_game = TibiaPacket()

    # Fixed leading fields, written before the RSA marker.
    enter_game.writeU8(10)
    for short_field in (2, 1098):
        enter_game.writeU16(short_field)
    enter_game.writeU32(1098)
    enter_game.writeU16(65)
    enter_game.writeU8(0)

    # RSA-protected section.
    enter_game.setEncryptionPos()
    enter_game.writeU8(0)
    enter_game.writeBytes(xtea_key)
    enter_game.writeU8(0)
    for text in (sessionkey, charname):
        enter_game.writeString(text)
    enter_game.writeU32(timestamp)
    enter_game.writeU8(randomNumber)
    enter_game.fillBytes()
    enter_game.rsa_encrypt()

    enter_game.writeHeader()
    return enter_game.getWholePacket()
def loginPacketHandler(s):
    """Read one login-server message from socket ``s`` and yield its records.

    The message is read in full, XTEA-decrypted and trimmed, then parsed one
    record at a time.  Each record is yielded as a ``(kind, data)`` pair:

    * ``'servererror'``, ``'loginerror'``, ``'loginservermotd'`` - a string
    * ``'sessionkey'`` - the session key (also stored in the global
      ``sessionkey``)
    * ``'characters'`` - the global ``characters`` dict after it is filled
    * ``'unknown packet'`` - a description of an unrecognised record code

    Raises ``ConnectionError`` if the peer closes the connection before a
    whole message has arrived.
    """
    global sessionkey, characters

    def recv_exact(size):
        # socket.recv may return fewer bytes than requested, so keep reading
        # until exactly ``size`` bytes have been collected.
        received = bytearray()
        while len(received) < size:
            chunk = s.recv(size - len(received))
            if not chunk:
                raise ConnectionError('connection closed while reading a packet')
            received += chunk
        return bytes(received)

    packetBytes = recv_exact(headerSize)
    packetSize, _ = struct.unpack('=HI', packetBytes)
    # packetSize counts the 4 checksum bytes (already read as part of the
    # header) plus the payload, so only packetSize - 4 bytes remain.
    packetBytes += recv_exact(packetSize - 4)

    received_packet = TibiaPacket(packetBytes)
    print(received_packet.readHeader())
    received_packet.xtea_decrypt()
    received_packet.trim_size()
    print(received_packet.getPacket())

    index = 0  # never advanced; kept so the 'unknown packet' text is unchanged
    while received_packet.position < len(received_packet.getPacket()):
        packetCode = received_packet.getU8()
        if packetCode == 10:  # servererror
            yield 'servererror', received_packet.getString()
        elif packetCode == 11:  # loginerror
            yield 'loginerror', received_packet.getString()
        elif packetCode == 20:  # loginservermotd
            yield 'loginservermotd', received_packet.getString()
        elif packetCode == 40:  # session key
            sessionkey = received_packet.getString()
            yield 'sessionkey', sessionkey
        elif packetCode == 100:  # charlist
            worlds = {}
            worldsCount = received_packet.getU8()
            for _ in range(worldsCount):
                worldId = received_packet.getU8()
                # Key by the id sent by the server: characters refer to their
                # world by this id, not by its position in the list.
                worlds[worldId] = {
                    'name': received_packet.getString(),
                    'ip': received_packet.getString(),
                    'port': received_packet.getU16(),
                    'previewState': received_packet.getU8(),
                }
            charactersCount = received_packet.getU8()
            for character in range(charactersCount):
                world = worlds[received_packet.getU8()]
                characters[character] = {
                    'name': received_packet.getString(),
                    'worldName': world['name'],
                    'worldIp': world['ip'],
                    'worldPort': world['port'],
                    'previewState': world['previewState'],
                }
            # NOTE(review): the source lost its indentation; these two U32
            # reads are taken to follow the character list once - confirm.
            print('premdays: ', received_packet.getU32() + received_packet.getU32())
            yield 'characters', characters
        else:
            yield 'unknown packet', '%i %d (0x%x)' % (index, packetCode, packetCode)
def handleGamePackets(c):
    """Read one game-server message from socket ``c`` and answer the challenge.

    The message is read in full and trimmed (no decryption is applied), then
    scanned byte by byte.  When record code 31 (server challenge) is found,
    its timestamp and random number are read, the enter-game packet for
    ``characters[0]`` is sent on ``c``, and
    ``('serverchallenge', {...})`` is yielded.  Other codes are skipped.
    A final ``(None, None)`` is yielded after the scan.

    Raises ``ConnectionError`` if the peer closes the connection before a
    whole message has arrived.
    """

    def recv_exact(size):
        # socket.recv may return fewer bytes than requested, so keep reading
        # until exactly ``size`` bytes have been collected.
        received = bytearray()
        while len(received) < size:
            chunk = c.recv(size - len(received))
            if not chunk:
                raise ConnectionError('connection closed while reading a packet')
            received += chunk
        return bytes(received)

    packetBytes = recv_exact(headerSize)
    packetSize, _ = struct.unpack('=HI', packetBytes)
    # packetSize counts the 4 checksum bytes (already read as part of the
    # header) plus the payload, so only packetSize - 4 bytes remain.
    packetBytes += recv_exact(packetSize - 4)

    received_packet = TibiaPacket(packetBytes)
    received_packet.readHeader()
    print(received_packet.getPacket())
    received_packet.trim_size()
    print(received_packet.getPacket())
    print('pos', received_packet.position)

    while received_packet.position < len(received_packet.getPacket()):
        packetCode = received_packet.getU8()
        if packetCode == 31:  # server challenge
            timestamp = received_packet.getU32()
            randomNumber = received_packet.getU8()
            c.sendall(makeEnterGamePacket(sessionkey, characters[0]['name'], timestamp, randomNumber))
            yield 'serverchallenge', {'timestamp': timestamp, 'randomNumber': randomNumber}
    # NOTE(review): the source lost its indentation; this sentinel is taken
    # to follow the loop rather than sit inside it - confirm.
    yield None, None
# Account credentials sent in the login packet.
acc_name = b'bot1xd'
acc_password = b'dupa123'

# Filled in by loginPacketHandler and read by handleGamePackets.
characters = {}
sessionkey = None

# 16-byte XTEA key shared with the server inside the RSA-encrypted section.
# It is key material, so draw it from the OS random source instead of the
# predictable default generator.
xtea_key = bytes(random.SystemRandom().randint(0, 255) for _ in range(16))

# Step 1: ask the login server for the session key and character list.
with socket.socket() as c:
    c.connect(('144.217.149.144', 7171))
    c.sendall(makeLoginPacket(xtea_key, acc_name, acc_password))
    for packet, data in loginPacketHandler(c):
        print(packet, data)

# Step 2: we have everything, let's log in to the first character's world.
print(characters, sessionkey)
if not characters:
    # Without a character list there is no world address to connect to.
    raise SystemExit('login failed: the server returned no characters')
with socket.socket() as c:  # client
    c.connect((characters[0]['worldIp'], characters[0]['worldPort']))
    for packet, data in handleGamePackets(c):
        print(packet, data)