Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add /dump/streams as a traffic stats API #1247

Merged
merged 4 commits into from
Nov 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 73 additions & 1 deletion core/internal/integration_tests/mocks/mock_TrafficLogger.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions core/internal/integration_tests/trafficlogger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func TestClientServerTrafficLoggerTCP(t *testing.T) {
return nil
})
serverOb.EXPECT().TCP(addr).Return(sobConn, nil).Once()
trafficLogger.EXPECT().TraceStream(mock.Anything, mock.Anything).Return().Once()

conn, err := c.TCP(addr)
assert.NoError(t, err)
Expand All @@ -84,6 +85,7 @@ func TestClientServerTrafficLoggerTCP(t *testing.T) {
time.Sleep(1 * time.Second) // Need some time for the server to receive the data

// Client reads from server again but blocked
trafficLogger.EXPECT().UntraceStream(mock.Anything).Return().Once()
trafficLogger.EXPECT().LogTraffic("nobody", uint64(0), uint64(4)).Return(false).Once()
trafficLogger.EXPECT().LogOnlineState("nobody", false).Return().Once()
sobConnCh <- []byte("nope")
Expand Down
30 changes: 30 additions & 0 deletions core/internal/utils/atomic.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,33 @@ func (t *AtomicTime) Set(new time.Time) {
func (t *AtomicTime) Get() time.Time {
return t.v.Load().(time.Time)
}

type Atomic[T any] struct {
v atomic.Value
}

func (a *Atomic[T]) Load() T {
value := a.v.Load()
if value == nil {
var zero T
return zero
}
return value.(T)
}

func (a *Atomic[T]) Store(value T) {
a.v.Store(value)
}

func (a *Atomic[T]) Swap(new T) T {
old := a.v.Swap(new)
if old == nil {
var zero T
return zero
}
return old.(T)
}

func (a *Atomic[T]) CompareAndSwap(old, new T) bool {
return a.v.CompareAndSwap(old, new)
}
64 changes: 64 additions & 0 deletions core/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"crypto/tls"
"net"
"net/http"
"sync/atomic"
"time"

"github.com/apernet/hysteria/core/v2/errors"
"github.com/apernet/hysteria/core/v2/internal/pmtud"
"github.com/apernet/hysteria/core/v2/internal/utils"
"github.com/apernet/quic-go"
)

Expand Down Expand Up @@ -212,4 +214,66 @@ type EventLogger interface {
type TrafficLogger interface {
LogTraffic(id string, tx, rx uint64) (ok bool)
LogOnlineState(id string, online bool)
TraceStream(stream quic.Stream, stats *StreamStats)
UntraceStream(stream quic.Stream)
}

type StreamState int

const (
// StreamStateInitial indicates the initial state of a stream.
// Client has opened the stream, but we have not received the proxy request yet.
StreamStateInitial StreamState = iota

// StreamStateHooking indicates that the hook (usually sniff) is processing.
// Client has sent the proxy request, but sniff requires more data to complete.
StreamStateHooking

// StreamStateConnecting indicates that we are connecting to the proxy target.
StreamStateConnecting

// StreamStateEstablished indicates the proxy is established.
StreamStateEstablished

// StreamStateClosed indicates the stream is closed.
StreamStateClosed
)

func (s StreamState) String() string {
switch s {
case StreamStateInitial:
return "init"
case StreamStateHooking:
return "hook"
case StreamStateConnecting:
return "connect"
case StreamStateEstablished:
return "estab"
case StreamStateClosed:
return "closed"
default:
return "unknown"
}
}

type StreamStats struct {
State utils.Atomic[StreamState]

AuthID string
ConnID uint32
InitialTime time.Time

ReqAddr utils.Atomic[string]
HookedReqAddr utils.Atomic[string]

Tx atomic.Uint64
Rx atomic.Uint64

LastActiveTime utils.Atomic[time.Time]
}

func (s *StreamStats) setHookedReqAddr(addr string) {
if addr != s.ReqAddr.Load() {
s.HookedReqAddr.Store(addr)
}
}
9 changes: 7 additions & 2 deletions core/server/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package server
import (
"errors"
"io"
"time"
)

var errDisconnect = errors.New("traffic logger requested disconnect")
Expand Down Expand Up @@ -31,23 +32,27 @@ func copyBufferLog(dst io.Writer, src io.Reader, log func(n uint64) bool) error
}
}

func copyTwoWayWithLogger(id string, serverRw, remoteRw io.ReadWriter, l TrafficLogger) error {
func copyTwoWayEx(id string, serverRw, remoteRw io.ReadWriter, l TrafficLogger, stats *StreamStats) error {
errChan := make(chan error, 2)
go func() {
errChan <- copyBufferLog(serverRw, remoteRw, func(n uint64) bool {
stats.LastActiveTime.Store(time.Now())
stats.Rx.Add(n)
return l.LogTraffic(id, 0, n)
})
}()
go func() {
errChan <- copyBufferLog(remoteRw, serverRw, func(n uint64) bool {
stats.LastActiveTime.Store(time.Now())
stats.Tx.Add(n)
return l.LogTraffic(id, n, 0)
})
}()
// Block until one of the two goroutines returns
return <-errChan
}

// copyTwoWay is the "fast-path" version of copyTwoWayWithLogger that does not log traffic.
// copyTwoWay is the "fast-path" version of copyTwoWayEx that does not log traffic or update stream stats.
// It uses the built-in io.Copy instead of our own copyBufferLog.
func copyTwoWay(serverRw, remoteRw io.ReadWriter) error {
errChan := make(chan error, 2)
Expand Down
32 changes: 29 additions & 3 deletions core/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package server
import (
"context"
"crypto/tls"
"math/rand"
"net/http"
"sync"
"time"

"github.com/apernet/quic-go"
"github.com/apernet/quic-go/http3"
Expand Down Expand Up @@ -100,6 +102,7 @@ type h3sHandler struct {
authenticated bool
authMutex sync.Mutex
authID string
connID uint32 // a random id for dump streams

udpSM *udpSessionManager // Only set after authentication
}
Expand All @@ -108,6 +111,7 @@ func newH3sHandler(config *Config, conn quic.Connection) *h3sHandler {
return &h3sHandler{
config: config,
conn: conn,
connID: rand.Uint32(),
}
}

Expand Down Expand Up @@ -205,12 +209,29 @@ func (h *h3sHandler) ProxyStreamHijacker(ft http3.FrameType, id quic.ConnectionT
}

func (h *h3sHandler) handleTCPRequest(stream quic.Stream) {
trafficLogger := h.config.TrafficLogger
streamStats := &StreamStats{
AuthID: h.authID,
ConnID: h.connID,
InitialTime: time.Now(),
}
streamStats.State.Store(StreamStateInitial)
streamStats.LastActiveTime.Store(time.Now())
defer func() {
streamStats.State.Store(StreamStateClosed)
}()
if trafficLogger != nil {
trafficLogger.TraceStream(stream, streamStats)
defer trafficLogger.UntraceStream(stream)
}

// Read request
reqAddr, err := protocol.ReadTCPRequest(stream)
if err != nil {
_ = stream.Close()
return
}
streamStats.ReqAddr.Store(reqAddr)
// Call the hook if set
var putback []byte
var hooked bool
Expand All @@ -220,19 +241,22 @@ func (h *h3sHandler) handleTCPRequest(stream quic.Stream) {
// so that the client will send whatever request the hook wants to see.
// This is essentially a server-side fast-open.
if hooked {
streamStats.State.Store(StreamStateHooking)
_ = protocol.WriteTCPResponse(stream, true, "RequestHook enabled")
putback, err = h.config.RequestHook.TCP(stream, &reqAddr)
if err != nil {
_ = stream.Close()
return
}
streamStats.setHookedReqAddr(reqAddr)
}
}
// Log the event
if h.config.EventLogger != nil {
h.config.EventLogger.TCPRequest(h.conn.RemoteAddr(), h.authID, reqAddr)
}
// Dial target
streamStats.State.Store(StreamStateConnecting)
tConn, err := h.config.Outbound.TCP(reqAddr)
if err != nil {
if !hooked {
Expand All @@ -248,13 +272,15 @@ func (h *h3sHandler) handleTCPRequest(stream quic.Stream) {
if !hooked {
_ = protocol.WriteTCPResponse(stream, true, "Connected")
}
streamStats.State.Store(StreamStateEstablished)
// Put back the data if the hook requested
if len(putback) > 0 {
_, _ = tConn.Write(putback)
n, _ := tConn.Write(putback)
streamStats.Tx.Add(uint64(n))
}
// Start proxying
if h.config.TrafficLogger != nil {
err = copyTwoWayWithLogger(h.authID, stream, tConn, h.config.TrafficLogger)
if trafficLogger != nil {
err = copyTwoWayEx(h.authID, stream, tConn, trafficLogger, streamStats)
} else {
// Use the fast path if no traffic logger is set
err = copyTwoWay(stream, tConn)
Expand Down
Loading