Skip to content

Commit

Permalink
refactor: split out UDP association handling from serving (#221)
Browse files Browse the repository at this point in the history
* refactor: split UDP serving from handling of packets

This will allow us to re-use the handling of packets in the Caddy
server where serving is handled separately.

* Use a read channel.

* Rename `wrappedPacketConn` to just `packetConn`.

* Update `shadowsocks_handler`.

* Change the `HandlePacket` API to not require the `pkt`.

* Remove the NAT map from the `Handle()` function.

Instead, make the connection an association by wrapping it with the
client address and pass that to the `Handle()` function. Each connection
will call `Handle()` only once.

* Rework the metrics now that the NAT is no longer done by the handler.

* Move the metrics `AddClosed()` call to after the `timedCopy` returns.

* Remove the explicit `targetConn.Close()` and let it expire on its own.

* Add the NAT metrics back in at the server level.

* Use buffer pool on `packetHandler` instead of global.

* Revert wrapping the `clientConn` with shadowsocks encryption/decryption.

* Rename `packet` to `association`.

* Fix tests.

* Update the docstring for `PacketServe`.

* Fix comment to refer to "associations".

* Fix metrics test.

* Use `slicepool`.

* Let the assocation handler provide the buffer.

* Remove the `packetConnWrapper` and move the logic into `natconn` instead.

* Fix close while reading of `natconn`.

* Simplify the natmap a little.

* Refactor `PacketServe` to use events (close and read).

* Use correct logger.

* Keep `readCh` and `closeCh` unbuffered.

* Catch panics in the `ReadFrom` go routine.

* Close the `readCh` instead of sending the error on the `readCh`.

* Wrap a logger with the association's client address so we can simplify the `timedCopy()` signature.

* Reference GitHub issue for supporting multiple IPs.

* Simplify packet handling with a new `association` struct.

* Add some comments to the `Association` interface.

* Consolidate debug logging.

* Rename some vars.

* Update doc.

* Format.

* Do not unpack first packets twice.

* Move handling into the association.

* Add some comments to the timeout value.

* Don't set the stream dialer in the old config flow.

* Rename `AddAuthentication` and `AddClose`.

* Update comment to reflect it handles packets from both directions.

* Separate the interfaces for `Handle()` and `HandlePacket()`.

* Fix typo in `UDPAssocationMetrics`.

* Don't pass `conn` to `Handle()`.

* Update comment.

* Add comments.

* Remove ConnAssociation in favor of a `HandleAssociation(Conn, PacketAssociation)`.

* Split `Service` interface into outline-ss-server and Caddy interfaces.

* Remove unused const.

* Don't require `conn` in `HandleAssociation()`.

* Exit the loop if the connection is closed.

* Re-use global buffer pool.

* Remove app-specific interfaces.

Instead move the `HandleAssociation` logic into the Caddy Shadowsocks handler.

* Decouple the association and the packet handling.

* Move timedCopy handling to the packet handler.

* Remove unused property.

* Decouple shadowsocks from association.

* Move authentication into its own function.

* Remove the `packetMetrics` struct.

We'll need to decouple metrics from handling in some future change.

* Update tests.

* Remove the `Metrics()` method.

* Move variables into the anonymous functions.

* Rename stream and packet handlers `Handle()` methods.

* More `handle` naming clarification.

* Refactor to make packet handler an association handler.

* Only handle the association if it was new.

* Fix the metric race condition in tests.

* Move `AddNATEntry()` call to new entry only.

* Format.

* Address review comments.

* Make `clientConn` an `io.Writer`.

* Handle the `EOF` case and stop reading from the connection.

* Address review comments.

* Remove debug log.
  • Loading branch information
sbruens authored Jan 24, 2025
1 parent 98db5b4 commit 6a15c34
Show file tree
Hide file tree
Showing 13 changed files with 870 additions and 657 deletions.
28 changes: 16 additions & 12 deletions caddy/shadowsocks_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,21 @@ import (
"fmt"
"log/slog"
"net"
"time"

"github.com/Jigsaw-Code/outline-sdk/transport"
"github.com/Jigsaw-Code/outline-sdk/transport/shadowsocks"
outline "github.com/Jigsaw-Code/outline-ss-server/service"
"github.com/caddyserver/caddy/v2"
"github.com/mholt/caddy-l4/layer4"

outline "github.com/Jigsaw-Code/outline-ss-server/service"
)

const ssModuleName = "layer4.handlers.shadowsocks"

// A UDP NAT timeout of at least 5 minutes is recommended in RFC 4787 Section 4.3.
const defaultNatTimeout time.Duration = 5 * time.Minute

func init() {
caddy.RegisterModule(ModuleRegistration{
ID: ssModuleName,
Expand All @@ -45,8 +50,10 @@ type KeyConfig struct {
type ShadowsocksHandler struct {
Keys []KeyConfig `json:"keys,omitempty"`

service outline.Service
logger *slog.Logger
streamHandler outline.StreamHandler
associationHandler outline.AssociationHandler
metrics outline.ServiceMetrics
logger *slog.Logger
}

var (
Expand All @@ -70,6 +77,7 @@ func (h *ShadowsocksHandler) Provision(ctx caddy.Context) error {
if !ok {
return fmt.Errorf("module `%s` is of type `%T`, expected `OutlineApp`", outlineModuleName, app)
}
h.metrics = app.Metrics

if len(h.Keys) == 0 {
h.logger.Warn("no keys configured")
Expand Down Expand Up @@ -97,26 +105,22 @@ func (h *ShadowsocksHandler) Provision(ctx caddy.Context) error {
ciphers := outline.NewCipherList()
ciphers.Update(cipherList)

service, err := outline.NewShadowsocksService(
h.streamHandler, h.associationHandler = outline.NewShadowsocksHandlers(
outline.WithLogger(h.logger),
outline.WithCiphers(ciphers),
outline.WithMetrics(app.Metrics),
outline.WithMetrics(h.metrics),
outline.WithReplayCache(&app.ReplayCache),
)
if err != nil {
return err
}
h.service = service
return nil
}

// Handle implements layer4.NextHandler.
func (h *ShadowsocksHandler) Handle(cx *layer4.Connection, _ layer4.Handler) error {
switch conn := cx.Conn.(type) {
case transport.StreamConn:
h.service.HandleStream(cx.Context, conn)
case net.PacketConn:
h.service.HandlePacket(conn)
h.streamHandler.HandleStream(cx.Context, conn, h.metrics.AddOpenTCPConnection(conn))
case net.Conn:
h.associationHandler.HandleAssociation(cx.Context, conn, h.metrics.AddOpenUDPAssociation(conn))
default:
return fmt.Errorf("failed to handle unknown connection type: %t", conn)
}
Expand Down
27 changes: 18 additions & 9 deletions cmd/outline-ss-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package main

import (
"container/list"
"context"
"flag"
"fmt"
"log/slog"
Expand All @@ -27,6 +28,7 @@ import (
"syscall"
"time"

"github.com/Jigsaw-Code/outline-sdk/transport"
"github.com/Jigsaw-Code/outline-sdk/transport/shadowsocks"
"github.com/lmittmann/tint"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -223,40 +225,43 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) {
ciphers := service.NewCipherList()
ciphers.Update(cipherList)

ssService, err := service.NewShadowsocksService(
streamHandler, associationHandler := service.NewShadowsocksHandlers(
service.WithCiphers(ciphers),
service.WithNatTimeout(s.natTimeout),
service.WithMetrics(s.serviceMetrics),
service.WithReplayCache(&s.replayCache),
service.WithPacketListener(service.MakeTargetUDPListener(s.natTimeout, 0)),
service.WithLogger(slog.Default()),
)
ln, err := lnSet.ListenStream(addr)
if err != nil {
return err
}
slog.Info("TCP service started.", "address", ln.Addr().String())
go service.StreamServe(ln.AcceptStream, ssService.HandleStream)
go service.StreamServe(ln.AcceptStream, func(ctx context.Context, conn transport.StreamConn) {
streamHandler.HandleStream(ctx, conn, s.serviceMetrics.AddOpenTCPConnection(conn))
})

pc, err := lnSet.ListenPacket(addr)
if err != nil {
return err
}
slog.Info("UDP service started.", "address", pc.LocalAddr().String())
go ssService.HandlePacket(pc)
go service.PacketServe(pc, func(ctx context.Context, conn net.Conn) {
associationHandler.HandleAssociation(ctx, conn, s.serviceMetrics.AddOpenUDPAssociation(conn))
}, s.serverMetrics)
}

for _, serviceConfig := range config.Services {
ciphers, err := newCipherListFromConfig(serviceConfig)
if err != nil {
return fmt.Errorf("failed to create cipher list from config: %v", err)
}
ssService, err := service.NewShadowsocksService(
streamHandler, associationHandler := service.NewShadowsocksHandlers(
service.WithCiphers(ciphers),
service.WithNatTimeout(s.natTimeout),
service.WithMetrics(s.serviceMetrics),
service.WithReplayCache(&s.replayCache),
service.WithStreamDialer(service.MakeValidatingTCPStreamDialer(onet.RequirePublicIP, serviceConfig.Dialer.Fwmark)),
service.WithPacketListener(service.MakeTargetUDPListener(serviceConfig.Dialer.Fwmark)),
service.WithPacketListener(service.MakeTargetUDPListener(s.natTimeout, serviceConfig.Dialer.Fwmark)),
service.WithLogger(slog.Default()),
)
if err != nil {
Expand All @@ -275,7 +280,9 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) {
}
return serviceConfig.Dialer.Fwmark
}())
go service.StreamServe(ln.AcceptStream, ssService.HandleStream)
go service.StreamServe(ln.AcceptStream, func(ctx context.Context, conn transport.StreamConn) {
streamHandler.HandleStream(ctx, conn, s.serviceMetrics.AddOpenTCPConnection(conn))
})
case listenerTypeUDP:
pc, err := lnSet.ListenPacket(lnConfig.Address)
if err != nil {
Expand All @@ -287,7 +294,9 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) {
}
return serviceConfig.Dialer.Fwmark
}())
go ssService.HandlePacket(pc)
go service.PacketServe(pc, func(ctx context.Context, conn net.Conn) {
associationHandler.HandleAssociation(ctx, conn, s.serviceMetrics.AddOpenUDPAssociation(conn))
}, s.serverMetrics)
}
}
totalCipherCount += len(serviceConfig.Keys)
Expand Down
32 changes: 29 additions & 3 deletions cmd/outline-ss-server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package main
import (
"time"

"github.com/Jigsaw-Code/outline-ss-server/service"
"github.com/prometheus/client_golang/prometheus"
)

Expand All @@ -25,12 +26,15 @@ var now = time.Now

type serverMetrics struct {
// NOTE: New metrics need to be added to `newPrometheusServerMetrics()`, `Describe()` and `Collect()`.
buildInfo *prometheus.GaugeVec
accessKeys prometheus.Gauge
ports prometheus.Gauge
buildInfo *prometheus.GaugeVec
accessKeys prometheus.Gauge
ports prometheus.Gauge
addedNatEntries prometheus.Counter
removedNatEntries prometheus.Counter
}

var _ prometheus.Collector = (*serverMetrics)(nil)
var _ service.NATMetrics = (*serverMetrics)(nil)

// newPrometheusServerMetrics constructs a Prometheus metrics collector for server
// related metrics.
Expand All @@ -48,19 +52,33 @@ func newPrometheusServerMetrics() *serverMetrics {
Name: "ports",
Help: "Count of open ports",
}),
addedNatEntries: prometheus.NewCounter(prometheus.CounterOpts{
Subsystem: "udp",
Name: "nat_entries_added",
Help: "Entries added to the UDP NAT table",
}),
removedNatEntries: prometheus.NewCounter(prometheus.CounterOpts{
Subsystem: "udp",
Name: "nat_entries_removed",
Help: "Entries removed from the UDP NAT table",
}),
}
}

func (m *serverMetrics) Describe(ch chan<- *prometheus.Desc) {
m.buildInfo.Describe(ch)
m.accessKeys.Describe(ch)
m.ports.Describe(ch)
m.addedNatEntries.Describe(ch)
m.removedNatEntries.Describe(ch)
}

func (m *serverMetrics) Collect(ch chan<- prometheus.Metric) {
m.buildInfo.Collect(ch)
m.accessKeys.Collect(ch)
m.ports.Collect(ch)
m.addedNatEntries.Collect(ch)
m.removedNatEntries.Collect(ch)
}

func (m *serverMetrics) SetVersion(version string) {
Expand All @@ -71,3 +89,11 @@ func (m *serverMetrics) SetNumAccessKeys(numKeys int, ports int) {
m.accessKeys.Set(float64(numKeys))
m.ports.Set(float64(ports))
}

func (m *serverMetrics) AddNATEntry() {
m.addedNatEntries.Inc()
}

func (m *serverMetrics) RemoveNATEntry() {
m.removedNatEntries.Inc()
}
Loading

0 comments on commit 6a15c34

Please sign in to comment.