Skip to content

Commit

Permalink
feat: add source http metric (#3468)
Browse files Browse the repository at this point in the history
Signed-off-by: Jim Ma <majinjing3@gmail.com>
  • Loading branch information
jim3ma authored Aug 28, 2024
1 parent b2babf8 commit 58b612f
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 9 deletions.
83 changes: 83 additions & 0 deletions pkg/source/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright 2024 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package source

import (
"net/http"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"

"d7y.io/dragonfly/v2/pkg/types"
)

var (
transportLatency = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.DfdaemonMetricsName,
Name: "transport_latency",
Help: "The latency of transport request",
// promhttp.InstrumentRoundTripperTrace unit is second, the buckets starts from 1 millisecond to 32.768 seconds
Buckets: prometheus.ExponentialBuckets(0.001, 2, 16),
}, []string{"stage"})
)

func withTraceRoundTripper(next http.RoundTripper) http.RoundTripper {
trace := promhttp.InstrumentTrace{
GotConn: func(f float64) {
transportLatency.WithLabelValues("GotConn").Observe(f)
},
PutIdleConn: func(f float64) {
transportLatency.WithLabelValues("PutIdleConn").Observe(f)
},
GotFirstResponseByte: func(f float64) {
transportLatency.WithLabelValues("GotFirstResponseByte").Observe(f)
},
Got100Continue: func(f float64) {
transportLatency.WithLabelValues("Got100Continue").Observe(f)
},
DNSStart: func(f float64) {
transportLatency.WithLabelValues("DNSStart").Observe(f)
},
DNSDone: func(f float64) {
transportLatency.WithLabelValues("DNSDone").Observe(f)
},
ConnectStart: func(f float64) {
transportLatency.WithLabelValues("ConnectStart").Observe(f)
},
ConnectDone: func(f float64) {
transportLatency.WithLabelValues("ConnectDone").Observe(f)
},
TLSHandshakeStart: func(f float64) {
transportLatency.WithLabelValues("TLSHandshakeStart").Observe(f)
},
TLSHandshakeDone: func(f float64) {
transportLatency.WithLabelValues("TLSHandshakeDone").Observe(f)
},
WroteHeaders: func(f float64) {
transportLatency.WithLabelValues("WroteHeaders").Observe(f)
},
Wait100Continue: func(f float64) {
transportLatency.WithLabelValues("Wait100Continue").Observe(f)
},
WroteRequest: func(f float64) {
transportLatency.WithLabelValues("WroteRequest").Observe(f)
},
}
return promhttp.InstrumentRoundTripperTrace(&trace, next)
}
46 changes: 37 additions & 9 deletions pkg/source/transport_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ package source

import (
"crypto/tls"
"fmt"
"net"
"net/http"
"net/url"
"os"
"time"

"gopkg.in/yaml.v3"

logger "d7y.io/dragonfly/v2/internal/dflog"
)

var ProxyEnv = "D7Y_SOURCE_PROXY"
Expand All @@ -35,55 +36,83 @@ type transportOption struct {
DialTimeout time.Duration `yaml:"dialTimeout"`
KeepAlive time.Duration `yaml:"keepAlive"`
MaxIdleConns int `yaml:"maxIdleConns"`
MaxIdleConnsPerHost int `yaml:"maxIdleConnsPerHost"`
MaxConnsPerHost int `yaml:"maxConnsPerHost"`
IdleConnTimeout time.Duration `yaml:"idleConnTimeout"`
ResponseHeaderTimeout time.Duration `yaml:"responseHeaderTimeout"`
TLSHandshakeTimeout time.Duration `yaml:"tlsHandshakeTimeout"`
ExpectContinueTimeout time.Duration `yaml:"expectContinueTimeout"`
InsecureSkipVerify bool `yaml:"insecureSkipVerify"`
EnableTrace bool `yaml:"enableTrace"`
}

func UpdateTransportOption(transport *http.Transport, optionYaml []byte) error {
func CreateTransportWithOption(optionYaml []byte) (http.RoundTripper, error) {
opt := &transportOption{}
err := yaml.Unmarshal(optionYaml, opt)
if err != nil {
return err
return nil, err
}

var roundTripper http.RoundTripper

transport := DefaultTransport()
roundTripper = transport

if len(opt.Proxy) > 0 {
proxy, err := url.Parse(opt.Proxy)
if err != nil {
fmt.Printf("proxy parse error: %s\n", err)
return err
logger.Errorf("proxy parse error: %s\n", err)
return nil, err
}
logger.Debugf("update transport upstream proxy: %s", opt.Proxy)
transport.Proxy = http.ProxyURL(proxy)
}

if opt.IdleConnTimeout > 0 {
transport.IdleConnTimeout = opt.IdleConnTimeout
logger.Debugf("update transport idle conn timeout: %s", opt.IdleConnTimeout)
}
if opt.DialTimeout > 0 && opt.KeepAlive > 0 {
transport.DialContext = (&net.Dialer{
Timeout: opt.DialTimeout,
KeepAlive: opt.KeepAlive,
DualStack: true,
}).DialContext
logger.Debugf("update transport dial timeout: %s, keep alive: %s", opt.DialTimeout, opt.KeepAlive)
}
if opt.MaxIdleConns > 0 {
transport.MaxIdleConns = opt.MaxIdleConns
logger.Debugf("update transport max idle conns: %d", opt.MaxIdleConns)
}
if opt.MaxIdleConnsPerHost > 0 {
transport.MaxIdleConnsPerHost = opt.MaxIdleConnsPerHost
logger.Debugf("update transport max idle conns per host: %d", opt.MaxIdleConnsPerHost)
}
if opt.MaxConnsPerHost > 0 {
transport.MaxConnsPerHost = opt.MaxConnsPerHost
logger.Debugf("update transport max conns per host: %d", opt.MaxConnsPerHost)
}
if opt.ExpectContinueTimeout > 0 {
transport.ExpectContinueTimeout = opt.ExpectContinueTimeout
logger.Debugf("update transport expect continue timeout: %s", opt.ExpectContinueTimeout)
}
if opt.ResponseHeaderTimeout > 0 {
transport.ResponseHeaderTimeout = opt.ResponseHeaderTimeout
logger.Debugf("update transport response header timeout: %s", opt.ResponseHeaderTimeout)
}
if opt.TLSHandshakeTimeout > 0 {
transport.TLSHandshakeTimeout = opt.TLSHandshakeTimeout
logger.Debugf("update transport tls handshake timeout: %s", opt.TLSHandshakeTimeout)
}
if opt.InsecureSkipVerify {
transport.TLSClientConfig.InsecureSkipVerify = opt.InsecureSkipVerify
logger.Debugf("update transport skip insecure verify")
}
return nil
if opt.EnableTrace {
roundTripper = withTraceRoundTripper(transport)
logger.Debugf("update transport with trace")
}
return roundTripper, nil
}

func DefaultTransport() *http.Transport {
Expand All @@ -94,7 +123,7 @@ func DefaultTransport() *http.Transport {
if proxyEnv := os.Getenv(ProxyEnv); len(proxyEnv) > 0 {
proxy, err = url.Parse(proxyEnv)
if err != nil {
fmt.Printf("proxy parse error: %s\n", err)
logger.Errorf("proxy parse error: %s\n", err)
}
}

Expand All @@ -119,8 +148,7 @@ func DefaultTransport() *http.Transport {
}

func ParseToHTTPClient(optionYaml []byte) (*http.Client, error) {
transport := DefaultTransport()
err := UpdateTransportOption(transport, optionYaml)
transport, err := CreateTransportWithOption(optionYaml)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 58b612f

Please sign in to comment.