Skip to content

Commit

Permalink
add defaultDialer
Browse files Browse the repository at this point in the history
  • Loading branch information
bachue committed Jan 18, 2024
1 parent bb8ce50 commit 711681f
Show file tree
Hide file tree
Showing 18 changed files with 279 additions and 46 deletions.
16 changes: 7 additions & 9 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,14 @@ import (
"github.com/qiniu/go-sdk/v7/reqid"
)

var UserAgent = getUserAgentWithAppName("default")
var DefaultClient = Client{
&http.Client{
Transport: http.DefaultTransport,
},
}
var (
UserAgent = getUserAgentWithAppName("default")
DefaultClient = Client{&http.Client{Transport: DefaultTransport}}

// 用来打印调试信息
var DebugMode = false
var DeepDebugInfo = false
// 用来打印调试信息
DebugMode = false
DeepDebugInfo = false
)

// --------------------------------------------------------------------

Expand Down
18 changes: 18 additions & 0 deletions client/client_1.12.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
//go:build !1.13
// +build !1.13

package client

import (
"net/http"
"time"
)

var DefaultTransport http.RoundTripper = &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: defaultDialFunc,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
19 changes: 19 additions & 0 deletions client/client_1.13.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
//go:build 1.13
// +build 1.13

package client

import (
"net/http"
"time"
)

var DefaultTransport http.RoundTripper = &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: defaultDialFunc,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
58 changes: 58 additions & 0 deletions client/dialer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package client

import (
"context"
"net"
"time"
)

type (
resolverContextKey struct{}
dialTimeoutContextKey struct{}
keepAliveIntervalContextKey struct{}
resolverContextValue struct {
domain string
ips []net.IP
}
)

func defaultDialFunc(ctx context.Context, network string, address string) (net.Conn, error) {
host, port, err := net.SplitHostPort(address)
if err != nil {
host = address
}

dialTimeout, ok := ctx.Value(dialTimeoutContextKey{}).(time.Duration)
if !ok {
dialTimeout = 30 * time.Second
}
keepAliveInterval, ok := ctx.Value(keepAliveIntervalContextKey{}).(time.Duration)
if !ok {
keepAliveInterval = 15 * time.Second
}
if resolved, ok := ctx.Value(resolverContextKey{}).(resolverContextValue); ok && len(resolved.ips) > 0 {
dialer := net.Dialer{Timeout: dialTimeout / time.Duration(len(resolved.ips)), KeepAlive: keepAliveInterval}
for _, ip := range resolved.ips {
newAddr := ip.String()
if port != "" {
newAddr = net.JoinHostPort(newAddr, port)
}
if conn, err := dialer.DialContext(ctx, network, newAddr); err == nil {
return conn, nil
}
}
}
return (&net.Dialer{Timeout: dialTimeout, KeepAlive: keepAliveInterval}).DialContext(ctx, network, address)
}

func WithResolvedIPs(ctx context.Context, domain string, ips []net.IP) context.Context {
return context.WithValue(ctx, resolverContextKey{}, resolverContextValue{domain: domain, ips: ips})
}

func WithDialTimeout(ctx context.Context, timeout time.Duration) context.Context {
return context.WithValue(ctx, dialTimeoutContextKey{}, timeout)
}

func WithKeepAliveInterval(ctx context.Context, interval time.Duration) context.Context {
return context.WithValue(ctx, keepAliveIntervalContextKey{}, interval)
}
37 changes: 37 additions & 0 deletions client/dialer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
//go:build unit
// +build unit

package client

import (
"context"
"fmt"
"net"
"net/http"
"net/http/httptest"
"testing"
)

func TestDefaultDialer(t *testing.T) {
var responseBody struct {
Status string `json:"status"`
}
mux := http.NewServeMux()
mux.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Content-Type", "application/json")
w.Write([]byte(`{"status":"ok"}`))
}))
server := httptest.NewServer(mux)
defer server.Close()

port := server.Listener.Addr().(*net.TCPAddr).Port

ctx := WithResolvedIPs(context.Background(), "www.qiniu.com", []net.IP{net.IPv4(127, 0, 0, 1)})
err := DefaultClient.Call(ctx, &responseBody, http.MethodGet, fmt.Sprintf("http://www.qiniu.com:%d/", port), nil)
if err != nil {
t.Fatal(err)
}
if responseBody.Status != "ok" {
t.Fatal("unexpected response")
}
}
17 changes: 1 addition & 16 deletions internal/clientv2/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,7 @@ type client struct {

func NewClient(cli Client, interceptors ...Interceptor) Client {
if cli == nil {
if clientV1.DefaultClient.Client != nil {
cli = NewClientWithClientV1(&clientV1.DefaultClient)
} else if http.DefaultClient != nil {
cli = http.DefaultClient
} else {
cli = &http.Client{}
}
cli = NewClientWithClientV1(&clientV1.DefaultClient)
}

var is interceptorList = interceptors
Expand Down Expand Up @@ -130,15 +124,6 @@ func NewClientWithClientV1(c *clientV1.Client) Client {
if c == nil {
c = &clientV1.DefaultClient
}

if c.Client == nil {
if clientV1.DefaultClient.Client != nil {
c.Client = clientV1.DefaultClient.Client
} else {
c.Client = &http.Client{}
}
}

return &clientV1Wrapper{
c: c,
}
Expand Down
10 changes: 10 additions & 0 deletions internal/clientv2/interceptor_retry_hosts.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ import (
"strings"
"time"

clientV1 "github.com/qiniu/go-sdk/v7/client"
"github.com/qiniu/go-sdk/v7/internal/hostprovider"
internal_io "github.com/qiniu/go-sdk/v7/internal/io"
"github.com/qiniu/go-sdk/v7/storagev2/resolver"
)

type HostsRetryConfig struct {
Resolver resolver.Resolver // 主备域名解析器
RetryConfig RetryConfig // 主备域名重试参数
HostFreezeDuration time.Duration // 主备域名冻结时间(默认:600s),当一个域名请求失败被冻结的时间,最小 time.Millisecond
HostProvider hostprovider.HostProvider // 备用域名获取方法
Expand Down Expand Up @@ -69,6 +72,13 @@ func (interceptor *hostsRetryInterceptor) Intercept(req *http.Request, handler H
for i := 0; ; i++ {
// Clone 防止后面 Handler 处理对 req 有污染
reqBefore := cloneReq(req)

if resolver := interceptor.options.Resolver; resolver != nil {
if ips, err := resolver.Resolve(req.Context(), req.URL.Hostname()); err == nil && len(ips) > 0 {
req = req.WithContext(clientV1.WithResolvedIPs(req.Context(), req.URL.Hostname(), ips))
}
}

resp, err = handler(req)

if !interceptor.options.RetryConfig.ShouldRetry(reqBefore, resp, err) {
Expand Down
3 changes: 2 additions & 1 deletion storage/backward_compatible.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ package storage

import (
"fmt"
"runtime"

"github.com/qiniu/go-sdk/v7/client"
"github.com/qiniu/go-sdk/v7/conf"
"runtime"
)

var DefaultClient = client.DefaultClient
Expand Down
38 changes: 34 additions & 4 deletions storage/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/qiniu/go-sdk/v7/storagev2/apis"
"github.com/qiniu/go-sdk/v7/storagev2/apis/batch_ops"
"github.com/qiniu/go-sdk/v7/storagev2/http_client"
"github.com/qiniu/go-sdk/v7/storagev2/resolver"

"github.com/qiniu/go-sdk/v7/auth"
clientv1 "github.com/qiniu/go-sdk/v7/client"
Expand Down Expand Up @@ -287,6 +288,7 @@ type BucketManagerOptions struct {
RetryMax int // 单域名重试次数,当前只有 uc 相关的服务有多域名
// 主备域名冻结时间(默认:600s),当一个域名请求失败(单个域名会被重试 TryTimes 次),会被冻结一段时间,使用备用域名进行重试,在冻结时间内,域名不能被使用,当一个操作中所有域名竣备冻结操作不在进行重试,返回最后一次操作的错误。
HostFreezeDuration time.Duration
Resolver resolver.Resolver
}

// BucketManager 提供了对资源进行管理的操作
Expand Down Expand Up @@ -852,7 +854,11 @@ type DomainInfo struct {
// ListBucketDomains 返回绑定在存储空间中的域名信息
func (m *BucketManager) ListBucketDomains(bucket string) (info []DomainInfo, err error) {
reqURL := fmt.Sprintf("%s/v3/domains?tbl=%s", getUcHost(m.Cfg.UseHTTPS), bucket)
err = clientv2.DoAndDecodeJsonResponse(m.getUCClient(), clientv2.RequestParams{
c, err := m.getUCClient()
if err != nil {
return
}
err = clientv2.DoAndDecodeJsonResponse(c, clientv2.RequestParams{
Context: context.Background(),
Method: clientv2.RequestMethodGet,
Url: reqURL,
Expand All @@ -877,7 +883,11 @@ func (m *BucketManager) Prefetch(bucket, key string) error {
// SetImage 用来设置空间镜像源
func (m *BucketManager) SetImage(siteURL, bucket string) (err error) {
reqURL := fmt.Sprintf("%s%s", getUcHost(m.Cfg.UseHTTPS), uriSetImage(siteURL, bucket))
return clientv2.DoAndDecodeJsonResponse(m.getUCClient(), clientv2.RequestParams{
c, err := m.getUCClient()
if err != nil {
return
}
return clientv2.DoAndDecodeJsonResponse(c, clientv2.RequestParams{
Context: context.Background(),
Method: clientv2.RequestMethodPost,
Url: reqURL,
Expand All @@ -890,7 +900,11 @@ func (m *BucketManager) SetImage(siteURL, bucket string) (err error) {
func (m *BucketManager) SetImageWithHost(siteURL, bucket, host string) (err error) {
reqURL := fmt.Sprintf("%s%s", getUcHost(m.Cfg.UseHTTPS),
uriSetImageWithHost(siteURL, bucket, host))
return clientv2.DoAndDecodeJsonResponse(m.getUCClient(), clientv2.RequestParams{
c, err := m.getUCClient()
if err != nil {
return
}
return clientv2.DoAndDecodeJsonResponse(c, clientv2.RequestParams{
Context: context.Background(),
Method: clientv2.RequestMethodPost,
Url: reqURL,
Expand All @@ -902,7 +916,11 @@ func (m *BucketManager) SetImageWithHost(siteURL, bucket, host string) (err erro
// UnsetImage 用来取消空间镜像源设置
func (m *BucketManager) UnsetImage(bucket string) (err error) {
reqURL := fmt.Sprintf("%s%s", getUcHost(m.Cfg.UseHTTPS), uriUnsetImage(bucket))
return clientv2.DoAndDecodeJsonResponse(m.getUCClient(), clientv2.RequestParams{
c, err := m.getUCClient()
if err != nil {
return
}
return clientv2.DoAndDecodeJsonResponse(c, clientv2.RequestParams{
Context: context.Background(),
Method: clientv2.RequestMethodPost,
Url: reqURL,
Expand Down Expand Up @@ -1026,6 +1044,18 @@ func (m *BucketManager) makeRequestOptions() *apis.Options {
return &apis.Options{OverwrittenBucketHosts: getUcEndpoint(m.Cfg.UseHTTPS)}
}

func (m *BucketManager) resolver() (resolver.Resolver, error) {
if m.options.Resolver != nil {
return m.options.Resolver, nil
}
if resolver, err := resolver.NewCacheResolver(nil, nil); err != nil {
return nil, err
} else {
m.options.Resolver = resolver
return resolver, nil
}
}

// 构建op的方法,导出的方法支持在Batch操作中使用

// URIStat 构建 stat 接口的请求命令
Expand Down
5 changes: 5 additions & 0 deletions storage/bucket_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,16 @@ func (m *BucketManager) Get(bucket, key string, options *GetObjectInput) (*GetOb
// 使用用户配置域名
domain = options.DownloadDomains[0]
} else {
resolver, e := m.resolver()
if e != nil {
return nil, e
}
// 查源站域名
if rg, e := getRegionByV4(m.Mac.AccessKey, bucket, UCApiOptions{
UseHttps: m.Cfg.UseHTTPS,
RetryMax: m.options.RetryMax,
HostFreezeDuration: m.options.HostFreezeDuration,
Resolver: resolver,
}); e != nil {
return nil, e
} else if len(rg.regions) == 0 {
Expand Down
3 changes: 2 additions & 1 deletion storage/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ func init() {
}
clt = client.Client{
Client: &http.Client{
Timeout: time.Minute * 10,
Timeout: time.Minute * 10,
Transport: client.DefaultTransport,
},
}
mac = auth.New(testAK, testSK)
Expand Down
4 changes: 4 additions & 0 deletions storage/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/qiniu/go-sdk/v7/storagev2/apis"
"github.com/qiniu/go-sdk/v7/storagev2/http_client"
region_v2 "github.com/qiniu/go-sdk/v7/storagev2/region"
"github.com/qiniu/go-sdk/v7/storagev2/resolver"
)

// 存储所在的地区,例如华东,华南,华北
Expand Down Expand Up @@ -380,6 +381,8 @@ type ucClientConfig struct {
// 主备域名冻结时间(默认:600s),当一个域名请求失败(单个域名会被重试 TryTimes 次),会被冻结一段时间,使用备用域名进行重试,在冻结时间内,域名不能被使用,当一个操作中所有域名竣备冻结操作不在进行重试,返回最后一次操作的错误。
HostFreezeDuration time.Duration

Resolver resolver.Resolver

Client *client.Client
}

Expand Down Expand Up @@ -407,6 +410,7 @@ func getUCClient(config ucClientConfig, mac *auth.Credentials) clientv2.Client {
ShouldFreezeHost: nil,
HostFreezeDuration: config.HostFreezeDuration,
HostProvider: hostprovider.NewWithHosts(hosts),
Resolver: config.Resolver,
}),
clientv2.NewSimpleRetryInterceptor(clientv2.RetryConfig{
RetryMax: config.RetryMax,
Expand Down
Loading

0 comments on commit 711681f

Please sign in to comment.