# feat: implement reliable transport (#57)
# Checklist

* [x] I have read the contribution guidelines
* [x] If you changed code related to services, or inter-service
communication, make sure you update the diagrams in `ARCHITECTURE.md`.
* [x] Reference issue for this pull request: #47

# Description

In this PR I implement the reliability algorithm for the OpenVPN
protocol:

- Each reliable packet we emit is subject to retransmission if it's not
ACKed in time. Retransmission follows an exponential backoff, beginning
at 2 seconds and capped at 60 seconds (see the sketch after this list).
- Incoming packets are subject to reordering before being passed up to TLS.
- Incoming packets are passed to reliable.moveDownWorker to send ACKs if
there's a packet in the outgoing queue.
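
As a rough illustration of the retransmission policy described above, here is a minimal, self-contained Go sketch of the backoff computation, assuming a simple doubling policy between the 2-second initial timeout and the 60-second cap; the function and constant names are illustrative, not the identifiers used in this PR:

```go
package main

import (
    "fmt"
    "time"
)

const (
    initialTimeout = 2 * time.Second  // cf. INITIAL_TLS_TIMEOUT_SECONDS
    maxBackoff     = 60 * time.Second // cf. MAX_BACKOFF_SECONDS
)

// retransmitAfter returns how long to wait before retransmitting a reliable
// packet that has already been retransmitted `retries` times, doubling the
// timeout on every attempt and capping it at maxBackoff.
func retransmitAfter(retries int) time.Duration {
    d := initialTimeout
    for i := 0; i < retries && d < maxBackoff; i++ {
        d *= 2
    }
    if d > maxBackoff {
        d = maxBackoff
    }
    return d
}

func main() {
    // prints 2s, 4s, 8s, 16s, 32s, 60s, 60s
    for i := 0; i < 7; i++ {
        fmt.Printf("retry %d -> %s\n", i, retransmitAfter(i))
    }
}
```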

---------

Co-authored-by: Simone Basso <bassosimone@gmail.com>
ainghazal and bassosimone authored Feb 2, 2024
1 parent f359d87 commit 86b9861
Showing 17 changed files with 2,016 additions and 255 deletions.
30 changes: 30 additions & 0 deletions internal/model/packet.go
@@ -306,3 +306,33 @@ func (p *Packet) IsControl() bool {
func (p *Packet) IsData() bool {
    return p.Opcode.IsData()
}

const (
    DirectionIncoming = iota
    DirectionOutgoing
)

// Log writes an entry in the passed logger with a representation of this packet.
func (p *Packet) Log(logger Logger, direction int) {
    var dir string
    switch direction {
    case DirectionIncoming:
        dir = "<"
    case DirectionOutgoing:
        dir = ">"
    default:
        logger.Warnf("wrong direction: %d", direction)
        return
    }

    logger.Debugf(
        "%s %s {id=%d, acks=%v} localID=%x remoteID=%x [%d bytes]",
        dir,
        p.Opcode,
        p.ID,
        p.ACKs,
        p.LocalSessionID,
        p.RemoteSessionID,
        len(p.Payload),
    )
}
89 changes: 50 additions & 39 deletions internal/packetmuxer/service.go
@@ -2,7 +2,9 @@
package packetmuxer

import (
"errors"
"fmt"
"time"

"github.com/ooni/minivpn/internal/model"
"github.com/ooni/minivpn/internal/session"
@@ -13,6 +15,11 @@ var (
serviceName = "packetmuxer"
)

const (
// A sufficiently long wakeup period to initialize a ticker with.
longWakeup = time.Hour * 24 * 30
)

// Service is the packetmuxer service. Make sure you initialize
// the channels before invoking [Service.StartWorkers].
type Service struct {
@@ -48,12 +55,14 @@ func (s *Service) StartWorkers(
sessionManager *session.Manager,
) {
ws := &workersState{
logger: logger,
hardReset: s.HardReset,
logger: logger,
hardReset: s.HardReset,
// initialize to a sufficiently long time from now
hardResetTicker: time.NewTicker(longWakeup),
notifyTLS: *s.NotifyTLS,
dataOrControlToMuxer: s.DataOrControlToMuxer,
muxerToReliable: *s.MuxerToReliable,
muxerToData: *s.MuxerToData,
dataOrControlToMuxer: s.DataOrControlToMuxer,
muxerToNetwork: *s.MuxerToNetwork,
networkToMuxer: s.NetworkToMuxer,
sessionManager: sessionManager,
@@ -71,6 +80,12 @@ type workersState struct {
// hardReset is the channel posted to force a hard reset.
hardReset <-chan any

// how many times have we sent the initial hardReset packet
hardResetCount int

// hardResetTicker is a channel to retry the initial send of hard reset packet.
hardResetTicker *time.Ticker

// notifyTLS is used to send notifications to the TLS service.
notifyTLS chan<- *model.Notification

@@ -116,6 +131,13 @@ func (ws *workersState) moveUpWorker() {
return
}

case <-ws.hardResetTicker.C:
// retry the hard reset, it probably was lost
if err := ws.startHardReset(); err != nil {
// error already logged
return
}

case <-ws.hardReset:
if err := ws.startHardReset(); err != nil {
// error already logged
@@ -168,17 +190,18 @@ func (ws *workersState) moveDownWorker() {

// startHardReset is invoked when we need to perform a HARD RESET.
func (ws *workersState) startHardReset() error {
ws.hardResetCount += 1

// emit a CONTROL_HARD_RESET_CLIENT_V2 pkt
packet, err := ws.sessionManager.NewPacket(model.P_CONTROL_HARD_RESET_CLIENT_V2, nil)
if err != nil {
ws.logger.Warnf("packetmuxer: NewPacket: %s", err.Error())
return err
}
packet := ws.sessionManager.NewHardResetPacket()
if err := ws.serializeAndEmit(packet); err != nil {
return err
}

// reset the state to become initial again
// resend if we have not received the server's reply within 2 seconds.
ws.hardResetTicker.Reset(time.Second * 2)

// reset the state to become initial again.
ws.sessionManager.SetNegotiationState(session.S_PRE_START)

// TODO: any other change to apply in this case?
@@ -198,11 +221,11 @@ func (ws *workersState) handleRawPacket(rawPacket []byte) error {
// handle the case where we're performing a HARD_RESET
if ws.sessionManager.NegotiationState() == session.S_PRE_START &&
packet.Opcode == model.P_CONTROL_HARD_RESET_SERVER_V2 {
packet.Log(ws.logger, model.DirectionIncoming)
ws.hardResetTicker.Stop()
return ws.finishThreeWayHandshake(packet)
}

// TODO: introduce other sanity checks here

// multiplex the incoming packet POSSIBLY BLOCKING on delivering it
if packet.IsControl() || packet.Opcode == model.P_ACK_V1 {
select {
@@ -211,6 +234,14 @@ func (ws *workersState) handleRawPacket(rawPacket []byte) error {
return workers.ErrShutdown
}
} else {
if ws.sessionManager.NegotiationState() < session.S_GENERATED_KEYS {
// A well-behaved server should not send us data packets
// before we have a working session. Under normal operations, the
// connection on the client side should pick a different port,
// so that data sent from previous sessions will not be delivered.
// However, it does no harm to be defensive here.
return errors.New("not ready to handle data")
}
select {
case ws.muxerToData <- packet:
case <-ws.workersManager.ShouldShutdown():
@@ -226,29 +257,16 @@ func (ws *workersState) finishThreeWayHandshake(packet *model.Packet) error {
// register the server's session (note: the PoV is the server's one)
ws.sessionManager.SetRemoteSessionID(packet.LocalSessionID)

// we need to manually ACK because the reliable layer is above us
ws.logger.Debugf(
"< %s localID=%x remoteID=%x [%d bytes]",
packet.Opcode,
packet.LocalSessionID,
packet.RemoteSessionID,
len(packet.Payload),
)

// create the ACK packet
ACK, err := ws.sessionManager.NewACKForPacket(packet)
if err != nil {
return err
}

// emit the packet
if err := ws.serializeAndEmit(ACK); err != nil {
return err
}

// advance the state
ws.sessionManager.SetNegotiationState(session.S_START)

// pass the packet up so that we can ack it properly
select {
case ws.muxerToReliable <- packet:
case <-ws.workersManager.ShouldShutdown():
return workers.ErrShutdown
}

// attempt to tell TLS we want to handshake.
// This WILL BLOCK if the notifyTLS channel
// is Full, but we make sure we control that we don't pass spurious soft-reset packets while we're
@@ -280,13 +298,6 @@ func (ws *workersState) serializeAndEmit(packet *model.Packet) error {
return workers.ErrShutdown
}

ws.logger.Debugf(
"> %s localID=%x remoteID=%x [%d bytes]",
packet.Opcode,
packet.LocalSessionID,
packet.RemoteSessionID,
len(packet.Payload),
)

packet.Log(ws.logger, model.DirectionOutgoing)
return nil
}
26 changes: 26 additions & 0 deletions internal/reliabletransport/constants.go
@@ -0,0 +1,26 @@
package reliabletransport

const (
    // Capacity for the array of packets that we're tracking at any given moment (outgoing).
    // This is defined by OpenVPN in ssl_pkt.h
    RELIABLE_SEND_BUFFER_SIZE = 6

    // Capacity for the array of packets that we're tracking at any given moment (incoming).
    // This is defined by OpenVPN in ssl_pkt.h
    RELIABLE_RECV_BUFFER_SIZE = 12

    // The maximum number of ACKs that we put in an array for an outgoing packet.
    MAX_ACKS_PER_OUTGOING_PACKET = 4

    // How many IDs pending to be ACKed we can store.
    ACK_SET_CAPACITY = 8

    // Initial timeout for TLS retransmission, in seconds.
    INITIAL_TLS_TIMEOUT_SECONDS = 2

    // Maximum backoff interval, in seconds.
    MAX_BACKOFF_SECONDS = 60

    // Default sender ticker period, in milliseconds.
    SENDER_TICKER_MS = 1000 * 60
)
6 changes: 6 additions & 0 deletions internal/reliabletransport/doc.go
@@ -0,0 +1,6 @@
// Package reliabletransport implements the reliable transport module for OpenVPN.
// See [the official documentation](https://community.openvpn.net/openvpn/wiki/SecurityOverview) for a detailed explanation
// of why this is needed, and how it relates to the requirements of the control channel.
// It is worth mentioning that, even though the original need is to have a reliable control channel
// on top of UDP, this layer is also used when tunneling over TCP.
package reliabletransport
54 changes: 54 additions & 0 deletions internal/reliabletransport/model.go
@@ -0,0 +1,54 @@
package reliabletransport

import (
    "github.com/ooni/minivpn/internal/model"
)

// sequentialPacket is a packet that can return a [model.PacketID].
type sequentialPacket interface {
    ID() model.PacketID
    ExtractACKs() []model.PacketID
    Packet() *model.Packet
}

// retransmissionPacket is a packet that can be scheduled for retransmission.
type retransmissionPacket interface {
    ScheduleForRetransmission()
}

type outgoingPacketWriter interface {
    // TryInsertOutgoingPacket attempts to insert a packet into the
    // inflight queue. If return value is false, insertion was not successful (e.g., too many
    // packets in flight).
    TryInsertOutgoingPacket(*model.Packet) bool
}

type seenPacketHandler interface {
    // OnIncomingPacketSeen processes a notification received in the shared lateral channel where receiver
    // notifies sender of incoming packets. There are two side-effects expected from this call:
    // 1. The ID in incomingPacketSeen needs to be appended to the array of packets pending to be acked, if not already
    //    there. This insertion needs to be reflected by NextPacketIDsToACK()
    // 2. Any ACK values in the incomingPacketSeen need to:
    //    a) evict the matching packet, if existing in the in flight queue, and
    //    b) increment the counter of acks-with-higher-pid for each packet with a lesser
    //       packet id (used for fast retransmission)
    OnIncomingPacketSeen(incomingPacketSeen)
}

type outgoingPacketHandler interface {
    // NextPacketIDsToACK returns an array of pending IDs to ACK to
    // our remote. The length of this array MUST NOT be larger than CONTROL_SEND_ACK_MAX.
    // This is used to append it to the ACK array of the outgoing packet.
    NextPacketIDsToACK() []model.PacketID
}

// incomingPacketHandler knows how to deal with incoming packets (going up).
type incomingPacketHandler interface {
    // MaybeInsertIncoming will insert a given packet in the reliable
    // incoming queue if it passes a series of sanity checks.
    MaybeInsertIncoming(*model.Packet) bool

    // NextIncomingSequence gets the largest sequence of packets ready to be passed along
    // to the control channel above us.
    NextIncomingSequence() incomingSequence
}
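
To make the ACK-tracking contract described in the interface comments above a bit more concrete, here is a hypothetical, self-contained sketch of a type following the spirit of `seenPacketHandler` and `NextPacketIDsToACK`. None of the names below are from this PR: `PacketID` stands in for `model.PacketID`, and `incomingPacketSeen` stands in for the real notification type (defined in a file not shown here), which also carries the ACK values observed in the incoming packet.

```go
package main

import "fmt"

// PacketID stands in for model.PacketID so this sketch is self-contained.
type PacketID uint32

// incomingPacketSeen stands in for the real lateral-channel notification;
// here it only carries the ID of the packet we just saw.
type incomingPacketSeen struct {
    id PacketID
}

const (
    ackSetCapacity           = 8 // cf. ACK_SET_CAPACITY
    maxACKsPerOutgoingPacket = 4 // cf. MAX_ACKS_PER_OUTGOING_PACKET
)

// ackTracker records the IDs we still owe an ACK for.
type ackTracker struct {
    pending []PacketID
}

// OnIncomingPacketSeen appends the seen ID to the pending set if not already
// there (a real implementation would also evict ACKed packets from the
// in-flight queue and bump the fast-retransmission counters).
func (t *ackTracker) OnIncomingPacketSeen(seen incomingPacketSeen) {
    for _, id := range t.pending {
        if id == seen.id {
            return // already pending
        }
    }
    if len(t.pending) < ackSetCapacity {
        t.pending = append(t.pending, seen.id)
    }
}

// NextPacketIDsToACK pops at most maxACKsPerOutgoingPacket pending IDs so
// they can be appended to the ACK array of the next outgoing packet.
func (t *ackTracker) NextPacketIDsToACK() []PacketID {
    n := len(t.pending)
    if n > maxACKsPerOutgoingPacket {
        n = maxACKsPerOutgoingPacket
    }
    out := t.pending[:n]
    t.pending = t.pending[n:]
    return out
}

func main() {
    tracker := &ackTracker{}
    for id := PacketID(1); id <= 6; id++ {
        tracker.OnIncomingPacketSeen(incomingPacketSeen{id: id})
    }
    fmt.Println(tracker.NextPacketIDsToACK()) // [1 2 3 4]
    fmt.Println(tracker.NextPacketIDsToACK()) // [5 6]
}
```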
