Skip to content

Commit

Permalink
add resolver
Browse files Browse the repository at this point in the history
  • Loading branch information
bachue committed Jan 16, 2024
1 parent 0c1b84a commit 742e041
Show file tree
Hide file tree
Showing 4 changed files with 290 additions and 4 deletions.
14 changes: 12 additions & 2 deletions internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ func (cache *Cache) Get(key string, fallback func() (CacheValue, error)) (CacheV
value, ok := cache.cacheMap[key]
cache.cacheMapMutex.Unlock()

defer func() {
go cache.flush()
}()

if ok && value.Value.IsValid() {
return value.Value, GetResultFromCache
}
Expand All @@ -129,7 +133,7 @@ func (cache *Cache) Get(key string, fallback func() (CacheValue, error)) (CacheV
return nil, NoResultGot
}
}
cache.Set(key, newValue)
cache.set(key, newValue, false)
return newValue, GetResultFromFallback
}

Expand All @@ -142,6 +146,10 @@ func (cache *Cache) doFallback(key string, fallback func() (CacheValue, error))
}

func (cache *Cache) Set(key string, value CacheValue) {
cache.set(key, value, true)
}

func (cache *Cache) set(key string, value CacheValue, willFlushAsync bool) {
if value.IsValid() {
cache.checkType(value)

Expand All @@ -150,7 +158,9 @@ func (cache *Cache) Set(key string, value CacheValue) {
cache.cacheMap[key] = cacheValue{Value: value, CreatedAt: now}
cache.cacheMapMutex.Unlock()

go cache.flush()
if willFlushAsync {
go cache.flush()
}
}
}

Expand Down
203 changes: 203 additions & 0 deletions storagev2/http_client/resolver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
package http_client

import (
"context"
"fmt"
"hash/crc64"
"net"
"os"
"path/filepath"
"reflect"
"strconv"
"sync"
"time"

"github.com/qiniu/go-sdk/v7/internal/cache"
"github.com/qiniu/go-sdk/v7/internal/log"
)

// Resolver 域名解析器的接口
type Resolver interface {
// Resolve 解析域名的 IP 地址
Resolve(context.Context, string) ([]net.IP, error)
}

type DisableFlags int

const (
DisableIpv4Flag DisableFlags = 1 << iota
DisableIpv6Flag
)

// DefaultResolver 默认的域名解析器
type DefaultResolver struct {
disables DisableFlags
}

func NewDefaultResolver(disables DisableFlags) Resolver {
if disables < 0 || disables >= 3 {
panic("invalid disables, could only accept 0, 1, 2")
}
return &DefaultResolver{disables: disables}
}

func (resolver *DefaultResolver) Resolve(ctx context.Context, host string) ([]net.IP, error) {
network := "ip"
if resolver.disables&DisableIpv4Flag > 0 {
network = "ip6"
} else if resolver.disables&DisableIpv6Flag > 0 {
network = "ip4"
}
return net.DefaultResolver.LookupIP(ctx, network, host)
}

type (
cacheResolver struct {
resolver Resolver
cache *cache.Cache
crc64 string
}

// CacheResolverOptions 缓存域名解析器选项
CacheResolverOptions struct {
// 压缩周期(默认:60s)
CompactInterval time.Duration

// 持久化路径(默认:$TMPDIR/qiniu-golang-sdk/resolver_01.cache.json)
PersistentFilePath string

// 持久化周期(默认:60s)
PersistentDuration time.Duration

// 主备域名冻结时间(默认:600s),当一个域名请求失败(单个域名会被重试 RetryMax 次),会被冻结一段时间,使用备用域名进行重试,在冻结时间内,域名不能被使用,当一个操作中所有域名竣备冻结操作不在进行重试,返回最后一次操作的错误。
HostFreezeDuration time.Duration
}

resolverCacheValue struct {
IPs []net.IP `json:"ips"`
ExpiredAt time.Time `json:"expired_at"`
}
)

const cacheFileName = "resolver_01.cache.json"

var (
resolverCaches map[uint64]*cacheResolver
resolverCachesLock sync.Mutex
defaultResolver Resolver = &DefaultResolver{}
)

// NewCacheResolver 创建带缓存功能的域名解析器
func NewCacheResolver(resolver Resolver, opts *CacheResolverOptions) (Resolver, error) {
if opts == nil {
opts = &CacheResolverOptions{}
}
if opts.CompactInterval == time.Duration(0) {
opts.CompactInterval = time.Minute
}
if opts.PersistentFilePath == "" {
opts.PersistentFilePath = filepath.Join(os.TempDir(), "qiniu-golang-sdk", cacheFileName)
}
if opts.PersistentDuration == time.Duration(0) {
opts.PersistentDuration = time.Minute
}
if resolver == nil {
resolver = defaultResolver
}

crc64Value := calcCacheResolverCrc64(resolver, opts)
resolverCachesLock.Lock()
defer resolverCachesLock.Unlock()

if resolverCaches == nil {
resolverCaches = make(map[uint64]*cacheResolver)
}

if cresolver, ok := resolverCaches[crc64Value]; ok {
return cresolver, nil
} else {
persistentCache, err := cache.NewPersistentCache(reflect.TypeOf(&resolverCacheValue{}), opts.PersistentFilePath, opts.CompactInterval, opts.PersistentDuration, func(err error) {
log.Warn(fmt.Sprintf("CacheResolver persist error: %s", err))
})
if err != nil {
return nil, err
}
cresolver = &cacheResolver{
cache: persistentCache,
resolver: resolver,
crc64: strconv.FormatUint(crc64Value, 36),
}
resolverCaches[crc64Value] = cresolver
return cresolver, nil
}
}

func (resolver *cacheResolver) Resolve(ctx context.Context, host string) ([]net.IP, error) {
lip, err := resolver.localIp(host)
if err != nil {
return nil, err
}
cacheValue, status := resolver.cache.Get(resolver.crc64+":"+lip+":"+host, func() (cache.CacheValue, error) {
var ips []net.IP
if ips, err = resolver.resolver.Resolve(ctx, host); err != nil {
return nil, err
} else {
return &resolverCacheValue{IPs: ips, ExpiredAt: time.Now().Add(5 * time.Minute)}, nil
}
})
if status == cache.NoResultGot {
return nil, err
}
return cacheValue.(*resolverCacheValue).IPs, nil
}

func (left *resolverCacheValue) IsEqual(rightValue cache.CacheValue) bool {
if right, ok := rightValue.(*resolverCacheValue); ok {
if len(left.IPs) != len(right.IPs) {
return false
}
for idx := range left.IPs {
if !left.IPs[idx].Equal(right.IPs[idx]) {
return false
}
}
return true
}
return false
}

func (left *resolverCacheValue) IsValid() bool {
return time.Now().Before(left.ExpiredAt)
}

func (*cacheResolver) localIp(host string) (string, error) {
conn, err := net.Dial("udp", host+":80")
if err != nil {
return "", err
}
defer conn.Close()

return conn.LocalAddr().(*net.UDPAddr).IP.String(), nil
}

func (opts *CacheResolverOptions) toBytes(resolver Resolver) []byte {
bytes := make([]byte, 0, 1024)
if resolver != nil {
p := reflect.ValueOf(resolver).Pointer()
bytes = strconv.AppendUint(bytes, uint64(p), 36)
} else {
bytes = strconv.AppendUint(bytes, 0, 36)
}
bytes = strconv.AppendInt(bytes, int64(opts.CompactInterval), 36)
bytes = strconv.AppendInt(bytes, int64(opts.HostFreezeDuration), 36)
bytes = strconv.AppendInt(bytes, int64(opts.PersistentDuration), 36)
bytes = append(bytes, []byte(opts.PersistentFilePath)...)
bytes = append(bytes, byte(0))
return bytes
}

func calcCacheResolverCrc64(resolver Resolver, opts *CacheResolverOptions) uint64 {
hasher := crc64.New(crc64.MakeTable(crc64.ISO))
hasher.Write(opts.toBytes(resolver))
return hasher.Sum64()
}
73 changes: 73 additions & 0 deletions storagev2/http_client/resolver_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
//go:build unit
// +build unit

package http_client

import (
"context"
"net"
"testing"
)

func TestDefaultResolver(t *testing.T) {
ips, err := new(DefaultResolver).Resolve(context.Background(), "upload.qiniup.com")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
} else if len(ips) == 0 {
t.Fatal("Unexpected empty ips")
}

ips, err = NewDefaultResolver(DisableIpv4Flag).Resolve(context.Background(), "upload.qiniup.com")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
} else if len(ips) == 0 {
t.Fatal("Unexpected empty ips")
}
for _, ip := range ips {
if len([]byte(ip)) != 16 {
t.Fatal("Unexpected ipv4 address")
}
}

ips, err = NewDefaultResolver(DisableIpv6Flag).Resolve(context.Background(), "upload.qiniup.com")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
} else if len(ips) == 0 {
t.Fatal("Unexpected empty ips")
}
for _, ip := range ips {
if len([]byte(ip)) != 4 {
t.Fatal("Unexpected ipv6 address")
}
}
}

type mockResolver struct {
m map[string][]net.IP
c map[string]int
}

func (mr *mockResolver) Resolve(ctx context.Context, host string) ([]net.IP, error) {
mr.c[host]++
return mr.m[host], nil
}

func TestCacheResolver(t *testing.T) {
mr := &mockResolver{m: map[string][]net.IP{"upload.qiniup.com": {net.IPv4(1, 1, 1, 1)}}, c: make(map[string]int)}
resolver, err := NewCacheResolver(mr, nil)
if err != nil {
t.Fatal(err)
}
for i := 0; i < 10; i++ {
ips, err := resolver.Resolve(context.Background(), "upload.qiniup.com")
if err != nil {
t.Fatal(err)
}
if len(ips) != 1 || !ips[0].Equal(net.IPv4(1, 1, 1, 1)) {
t.Fatal("Unexpected ips")
}
}
if mr.c["upload.qiniup.com"] != 1 {
t.Fatal("Unexpected cache")
}
}
4 changes: 2 additions & 2 deletions storagev2/region/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,9 @@ func (opts *BucketRegionsQueryOptions) toBytes() []byte {
bytes = strconv.AppendInt(bytes, int64(opts.RetryMax), 36)
bytes = strconv.AppendInt(bytes, int64(opts.HostFreezeDuration), 36)
if opts.Client != nil {
bytes = strconv.AppendUint(bytes, uint64(uintptr(unsafe.Pointer(&opts.Client))), 10)
bytes = strconv.AppendUint(bytes, uint64(uintptr(unsafe.Pointer(&opts.Client))), 36)
} else {
bytes = strconv.AppendUint(bytes, 0, 10)
bytes = strconv.AppendUint(bytes, 0, 36)
}
return bytes
}
Expand Down

0 comments on commit 742e041

Please sign in to comment.