-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathchecker.py
56 lines (48 loc) · 1.65 KB
/
checker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import zmq
import sys
import binascii
import struct
def handle(data):
    """Print a short summary of one three-part notification message.

    Args:
        data: A three-item sequence ``(topic, body, seq)`` of byte strings.
            ``seq`` is decoded as a little-endian unsigned 32-bit counter
            when it is exactly four bytes long; otherwise the counter is
            reported as ``Unknown``.

    Raises:
        ValueError: If ``data`` does not unpack into exactly three items.
        IndexError: If a ``sequence`` message body is shorter than 33 bytes.

    Messages whose topic is not one of ``hashblock``, ``hashtx``,
    ``rawblock``, ``rawtx`` or ``sequence`` produce no output.
    """
    topic, payload, counter = data

    if len(counter) == 4:
        (number,) = struct.unpack('<I', counter)
        seq_label = str(number)
    else:
        seq_label = 'Unknown'

    if topic == b'hashblock':
        print('- HASH BLOCK (' + seq_label + ') -')
        print(binascii.hexlify(payload))
        return

    if topic == b'hashtx':
        print('- HASH TX (' + seq_label + ') -')
        print(binascii.hexlify(payload))
        return

    if topic == b'rawblock':
        # Only the first 80 bytes of the body are shown (the header, per the
        # banner text).
        print('- RAW BLOCK HEADER (' + seq_label + ') -')
        print(binascii.hexlify(payload[:80]))
        return

    if topic == b'rawtx':
        # Truncate the hex dump to its first 32 characters.
        print('- RAW TX (' + seq_label + ') -')
        print(binascii.hexlify(payload)[:32])
        return

    if topic == b'sequence':
        # Body layout as read here: 32 bytes of hash, one label byte, then an
        # optional 8-byte little-endian counter. Everything is decoded before
        # anything is printed, so a short body fails without partial output.
        digest = binascii.hexlify(payload[:32])
        label = chr(payload[32])
        if len(payload) == 32 + 1 + 8:
            (mempool_sequence,) = struct.unpack('<Q', payload[32 + 1:])
        else:
            mempool_sequence = None
        print('- SEQUENCE (' + seq_label + ') -')
        print(digest, label, mempool_sequence)
def main():
    """Subscribe to a bitcoind ZMQ publisher and print every notification.

    Expects exactly one command-line argument, the ZMQ endpoint URL to
    connect to. With any other argument count a usage line is printed and
    the process exits with status 1.

    The receive loop runs until an exception (for example
    ``KeyboardInterrupt``) propagates out of it; the socket and context are
    released before that exception leaves this function.
    """
    if len(sys.argv) != 2:
        print(f'Usage: {sys.argv[0]} <bitcoind_zmq_url>')
        sys.exit(1)
    bitcoind_zmq_url = sys.argv[1]

    context = zmq.Context()
    try:
        socket = context.socket(zmq.SUB)
        # A receive high-water mark of 0 means "no limit" per the libzmq
        # socket option reference.
        socket.setsockopt(zmq.RCVHWM, 0)
        for topic in ('hashblock', 'hashtx', 'rawblock', 'rawtx', 'sequence'):
            socket.setsockopt_string(zmq.SUBSCRIBE, topic)
        socket.connect(bitcoind_zmq_url)

        while True:
            data = socket.recv_multipart()
            print('data:', len(data))
            handle(data)
    finally:
        # Close any socket created on this context and terminate it.
        # linger=0 is intended to keep shutdown from blocking on unsent data.
        context.destroy(linger=0)
# Start the listener only when this file is executed as a script, so that
# importing the module has no side effects.
if __name__ == '__main__':
    main()