Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests: add utility script to convert pcap into test sequence #62

Merged
merged 8 commits into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion internal/reliabletransport/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package reliabletransport
import (
"bytes"
"fmt"
"log"
"sort"

"github.com/ooni/minivpn/internal/model"
Expand Down
119 changes: 119 additions & 0 deletions scripts/get_trace_from_pcap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
#!/usr/bin/env python3
#
ainghazal marked this conversation as resolved.
Show resolved Hide resolved
# Parse an OpenVPN handshake pcap file, extract relevant fields from the json,
# and return a compact representation of the packets in the handshake that can be
# used for testing minivpn's implementation.
#
# This script depends on tshark.
ainghazal marked this conversation as resolved.
Show resolved Hide resolved
#
# Usage:
#
# There are two subcommands: json | sequence
# - json dumps the json representation of a subset of the handshake
# - sequence outputs a test sequence that can be used to write unit tests.
#
# Examples:
#
# python3 get_trace_from_pcap.py good-handshake.pcapng json | jq
# python3 get_trace_from_pcap.py good-handshake.pcapng sequence | nl
#

import json
import subprocess
import sys

opcodes = {
'0x01': 'CONTROL_HARD_RESET_CLIENT_V1',
'0x02': 'CONTROL_HARD_RESET_SERVER_V1',
'0x03': 'CONTROL_SOFT_RESET_V1',
'0x04': 'CONTROL_V1',
'0x05': 'ACK_V1',
'0x06': 'DATA_V1',
'0x07': 'CONTROL_HARD_RESET_CLIENT_V2',
'0x08': 'CONTROL_HARD_RESET_SERVER_V2',
'0x09': 'DATA_V2'
}

def process_tshark_output(data):
packets = []
ips = {}

for packet in data:
ip = packet['_source']['layers']['ip']
udp = packet['_source']['layers']['udp']

# TODO(ainghazal): do sanity check here and verify all of them belong to the same UDP stream.

openvpn = packet['_source']['layers']['openvpn']

time_relative = udp['Timestamps']['udp.time_relative']
time_delta = udp['Timestamps']['udp.time_delta']

ip_src = ip['ip.src']
if len(ips) == 0:
ips[ip_src] = 'client'

ip_dst = ip['ip.dst']
if len(ips) == 1:
ips[ip_dst] = 'server'

packets.append({
'time_relative': time_relative,
'time_delta': time_delta,
'from': ips[ip_src],
'to': ips[ip_dst],
'openvpn': openvpn,
})

return packets


def sequence_from_packets(packets):
for i, packet in enumerate(packets):
if packet['from'] == 'client':
dir = '>'
else:
dir = '<'

packet_id = packet['openvpn'].get('openvpn.mpid', 0)
opcode = opcodes[packet['openvpn']['openvpn.type_tree']['openvpn.opcode']]

acks = []
ack_len = packet['openvpn'].get('openvpn.mpidarraylength')
if ack_len is not None and int(ack_len) != 0:
acks = packet['openvpn']['Packet-ID Array']['openvpn.mpidarrayelement']

if len(acks) > 0:
ack_str = ','.join([str(ack) for ack in acks])
else:
ack_str = ''

try:
# get the inter-arrival time until the next packet in the
# handshake arrives. in the unit tests, we specify this as IAT
# for a TestPacket, since we want the packet writer to sleep
# for this amount of time.
next_packet_ts = float(packets[i+1].get('time_delta')) * 1000
except IndexError:
next_packet_ts = 0

print(f"{dir} [{packet_id}] {opcode} (acks:{ack_str}) +{next_packet_ts:.8f}ms")


if __name__ == "__main__":
pcap = sys.argv[1]
subcmd = sys.argv[2]

command = f"tshark -r {pcap} -T json --no-duplicate-keys"
out = subprocess.check_output(command, shell=True)
output_str = out.decode('utf-8')
data = json.loads(output_str)
packets = process_tshark_output(data)

if subcmd == "json":
print(json.dumps(packets))
sys.exit(0)

if subcmd == "sequence":
sequence_from_packets(packets)
ainghazal marked this conversation as resolved.
Show resolved Hide resolved
sys.exit(0)
Loading