diff --git a/cmd/outline-ss-server/main.go b/cmd/outline-ss-server/main.go index 816a196b..2bd9c8b7 100644 --- a/cmd/outline-ss-server/main.go +++ b/cmd/outline-ss-server/main.go @@ -16,6 +16,7 @@ package main import ( "container/list" + "context" "flag" "fmt" "log/slog" @@ -28,6 +29,7 @@ import ( "syscall" "time" + "github.com/Jigsaw-Code/outline-sdk/transport" "github.com/Jigsaw-Code/outline-sdk/transport/shadowsocks" "github.com/Jigsaw-Code/outline-ss-server/ipinfo" "github.com/Jigsaw-Code/outline-ss-server/service" @@ -61,7 +63,7 @@ type SSServer struct { stopConfig func() error lnManager service.ListenerManager natTimeout time.Duration - m *outlineMetricsCollector + m *outlineMetrics replayCache service.ReplayCache } @@ -85,13 +87,13 @@ func (s *SSServer) loadConfig(filename string) error { } func (s *SSServer) NewShadowsocksStreamHandler(ciphers service.CipherList) service.StreamHandler { - authFunc := service.NewShadowsocksStreamAuthenticator(ciphers, &s.replayCache, s.m) + authFunc := service.NewShadowsocksStreamAuthenticator(ciphers, &s.replayCache, s.m.tcpServiceMetrics) // TODO: Register initial data metrics at zero. - return service.NewStreamHandler(authFunc, s.m, tcpReadTimeout) + return service.NewStreamHandler(authFunc, tcpReadTimeout) } func (s *SSServer) NewShadowsocksPacketHandler(ciphers service.CipherList) service.PacketHandler { - return service.NewPacketHandler(s.natTimeout, ciphers, s.m) + return service.NewPacketHandler(s.natTimeout, ciphers, s.m, s.m.udpServiceMetrics) } type listenerSet struct { @@ -196,7 +198,10 @@ func (s *SSServer) runConfig(config Config) (func() error, error) { return err } slog.Info("Shadowsocks TCP service started.", "address", ln.Addr().String()) - go service.StreamServe(ln.AcceptStream, sh.Handle) + go service.StreamServe(ln.AcceptStream, func(ctx context.Context, conn transport.StreamConn) { + connMetrics := s.m.AddOpenTCPConnection(conn) + sh.Handle(ctx, conn, connMetrics) + }) pc, err := lnSet.ListenPacket(addr) if err != nil { @@ -243,7 +248,7 @@ func (s *SSServer) Stop() error { } // RunSSServer starts a shadowsocks server running, and returns the server or an error. -func RunSSServer(filename string, natTimeout time.Duration, sm *outlineMetricsCollector, replayHistory int) (*SSServer, error) { +func RunSSServer(filename string, natTimeout time.Duration, sm *outlineMetrics, replayHistory int) (*SSServer, error) { server := &SSServer{ lnManager: service.NewListenerManager(), natTimeout: natTimeout, @@ -348,7 +353,10 @@ func main() { } defer ip2info.Close() - metrics := newPrometheusOutlineMetrics(ip2info) + metrics, err := newPrometheusOutlineMetrics(ip2info) + if err != nil { + slog.Error("Failed to create Outline Prometheus metrics. Aborting.", "err", err) + } metrics.SetBuildInfo(version) r := prometheus.WrapRegistererWithPrefix("shadowsocks_", prometheus.DefaultRegisterer) r.MustRegister(metrics) diff --git a/cmd/outline-ss-server/metrics.go b/cmd/outline-ss-server/metrics.go index 740bd489..c5641411 100644 --- a/cmd/outline-ss-server/metrics.go +++ b/cmd/outline-ss-server/metrics.go @@ -31,20 +31,140 @@ import ( // `now` is stubbable for testing. var now = time.Now -type tcpCollector struct { - // NOTE: New metrics need to be added to `newTCPCollector()`, `Describe()` and - // `Collect()`. +func NewTimeToCipherVec(proto string) (prometheus.ObserverVec, error) { + vec := prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "time_to_cipher_ms", + Help: "Time needed to find the cipher", + Buckets: []float64{0.1, 1, 10, 100, 1000}, + }, []string{"proto", "found_key"}) + return vec.CurryWith(map[string]string{"proto": proto}) +} + +type proxyCollector struct { + // NOTE: New metrics need to be added to `newProxyCollector()`, `Describe()` and `Collect()`. + dataBytesPerKey *prometheus.CounterVec + dataBytesPerLocation *prometheus.CounterVec +} + +func newProxyCollector(proto string) (*proxyCollector, error) { + dataBytesPerKey, err := prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "data_bytes", + Help: "Bytes transferred by the proxy, per access key", + }, []string{"proto", "dir", "access_key"}).CurryWith(map[string]string{"proto": proto}) + if err != nil { + return nil, err + } + dataBytesPerLocation, err := prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "data_bytes_per_location", + Help: "Bytes transferred by the proxy, per location", + }, []string{"proto", "dir", "location", "asn"}).CurryWith(map[string]string{"proto": proto}) + if err != nil { + return nil, err + } + return &proxyCollector{ + dataBytesPerKey: dataBytesPerKey, + dataBytesPerLocation: dataBytesPerLocation, + }, nil +} + +func (c *proxyCollector) Describe(ch chan<- *prometheus.Desc) { + c.dataBytesPerKey.Describe(ch) + c.dataBytesPerLocation.Describe(ch) +} + +func (c *proxyCollector) Collect(ch chan<- prometheus.Metric) { + c.dataBytesPerKey.Collect(ch) + c.dataBytesPerLocation.Collect(ch) +} + +func (c *proxyCollector) addClientTarget(clientProxyBytes, proxyTargetBytes int64, accessKey string, clientInfo ipinfo.IPInfo) { + addIfNonZero(clientProxyBytes, c.dataBytesPerKey, "c>p", accessKey) + addIfNonZero(clientProxyBytes, c.dataBytesPerLocation, "c>p", clientInfo.CountryCode.String(), asnLabel(clientInfo.ASN)) + addIfNonZero(proxyTargetBytes, c.dataBytesPerKey, "p>t", accessKey) + addIfNonZero(proxyTargetBytes, c.dataBytesPerLocation, "p>t", clientInfo.CountryCode.String(), asnLabel(clientInfo.ASN)) +} + +func (c *proxyCollector) addTargetClient(targetProxyBytes, proxyClientBytes int64, accessKey string, clientInfo ipinfo.IPInfo) { + addIfNonZero(targetProxyBytes, c.dataBytesPerKey, "pp", "tcp", accessKey) - addIfNonZero(data.ClientProxy, m.dataBytesPerLocation, "c>p", "tcp", clientInfo.CountryCode.String(), asnLabel(clientInfo.ASN)) - addIfNonZero(data.ProxyTarget, m.dataBytes, "p>t", "tcp", accessKey) - addIfNonZero(data.ProxyTarget, m.dataBytesPerLocation, "p>t", "tcp", clientInfo.CountryCode.String(), asnLabel(clientInfo.ASN)) - addIfNonZero(data.TargetProxy, m.dataBytes, "pp", "udp", accessKey) - addIfNonZero(int64(clientProxyBytes), m.dataBytesPerLocation, "c>p", "udp", clientInfo.CountryCode.String(), asnLabel(clientInfo.ASN)) - addIfNonZero(int64(proxyTargetBytes), m.dataBytes, "p>t", "udp", accessKey) - addIfNonZero(int64(proxyTargetBytes), m.dataBytesPerLocation, "p>t", "udp", clientInfo.CountryCode.String(), asnLabel(clientInfo.ASN)) -} - -func (m *outlineMetricsCollector) AddUDPPacketFromTarget(clientInfo ipinfo.IPInfo, accessKey, status string, targetProxyBytes, proxyClientBytes int) { - addIfNonZero(int64(targetProxyBytes), m.dataBytes, "p %d)", report.clientProxyBytes, report.proxyTargetBytes) assert.Equal(t, "id-0", report.accessKey, "Unexpected access key name: %s", report.accessKey) assert.Equal(t, "OK", report.status, "Wrong status: %s", report.status) @@ -215,7 +217,7 @@ func setupNAT() (*fakePacketConn, *fakePacketConn, *natconn) { nat := newNATmap(timeout, &natTestMetrics{}, &sync.WaitGroup{}) clientConn := makePacketConn() targetConn := makePacketConn() - nat.Add(&clientAddr, clientConn, natCryptoKey, targetConn, ipinfo.IPInfo{CountryCode: "ZZ"}, "key id") + nat.Add(&clientAddr, clientConn, natCryptoKey, targetConn, "key id") entry := nat.Get(clientAddr.String()) return clientConn, targetConn, entry } @@ -480,7 +482,7 @@ func TestUDPEarlyClose(t *testing.T) { } testMetrics := &natTestMetrics{} const testTimeout = 200 * time.Millisecond - s := NewPacketHandler(testTimeout, cipherList, testMetrics) + s := NewPacketHandler(testTimeout, cipherList, testMetrics, &fakeShadowsocksMetrics{}) clientConn, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 0}) if err != nil {