Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[patch] Test/internal/tcp #501

Merged
merged 53 commits into from
Jul 17, 2020
Merged
Show file tree
Hide file tree
Changes from 46 commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
1572b71
refactor
kevindiu Jun 22, 2020
dfd2ef8
bug fix
kevindiu Jun 22, 2020
46a3852
add test code
kevindiu Jun 22, 2020
eacb1d9
add test case
kevindiu Jun 23, 2020
5e1db67
improve performance
kevindiu Jun 23, 2020
824ca0f
fix test
kevindiu Jun 24, 2020
d45c690
fix template and rename file
kevindiu Jun 24, 2020
25a9a4c
add option test and missing file
kevindiu Jun 24, 2020
4061b1e
refactor
kevindiu Jun 25, 2020
ada6bdb
fix
kevindiu Jun 25, 2020
00383cf
use thread safe
kevindiu Jun 25, 2020
2b82375
fix
kevindiu Jun 25, 2020
17f358d
Apply suggestions from code review
kevindiu Jun 26, 2020
95ae82f
fix
Jun 26, 2020
2175dec
fix failed test case
kevindiu Jun 26, 2020
f324eef
fix
kevindiu Jun 26, 2020
6611d2e
fix
kevindiu Jul 1, 2020
b9a6e35
add test case
kevindiu Jul 1, 2020
43e932e
fix
kevindiu Jul 1, 2020
5f6026b
fix
kevindiu Jul 2, 2020
b492247
fix
kevindiu Jul 2, 2020
3d685d0
fix
kevindiu Jul 2, 2020
bb390a1
fix
kevindiu Jul 3, 2020
5dd306f
fix
kevindiu Jul 3, 2020
b657639
fix
kevindiu Jul 3, 2020
1e2405b
fix
kevindiu Jul 3, 2020
49eb18d
fix
kevindiu Jul 3, 2020
6f92a38
fix
kevindiu Jul 6, 2020
b123584
revert change of cache bug fix
kevindiu Jul 8, 2020
8d1a37a
Merge remote-tracking branch 'origin/master' into test/internal/tcp
kevindiu Jul 8, 2020
082ad0c
fix lint
kevindiu Jul 8, 2020
1f2f3d6
fix
kevindiu Jul 9, 2020
e727048
:robot: Update license headers / Format go codes and yaml files
vdaas-ci Jul 9, 2020
ee488ac
fix comment
kevindiu Jul 9, 2020
2408929
Apply suggestions from code review
kevindiu Jul 9, 2020
753a342
add connection check testing
kevindiu Jul 10, 2020
6c16247
fix
kevindiu Jul 10, 2020
37267e5
Merge branch 'master' into test/internal/tcp
kevindiu Jul 10, 2020
d2ae7fe
Merge branch 'master' into test/internal/tcp
kevindiu Jul 13, 2020
7b82188
Merge branch 'master' into test/internal/tcp
Jul 13, 2020
1590cdf
fix
kevindiu Jul 13, 2020
94e3929
Merge branch 'master' into test/internal/tcp
kevindiu Jul 14, 2020
cee2f5e
add comment to option.go
kevindiu Jul 14, 2020
9168656
Update internal/net/tcp/option.go
kevindiu Jul 14, 2020
0bac1e7
Merge branch 'master' into test/internal/tcp
Jul 16, 2020
82ae5e3
fix deepsource
kevindiu Jul 17, 2020
79b9b36
fix
kevindiu Jul 17, 2020
0b443b6
Apply suggestions from code review
kevindiu Jul 17, 2020
b353a7c
remove failed and unused test
kevindiu Jul 17, 2020
393c7f7
Update internal/net/tcp/dialer.go
kevindiu Jul 17, 2020
550685e
Update internal/net/tcp/dialer.go
kevindiu Jul 17, 2020
d0a06d8
rename function name
kevindiu Jul 17, 2020
c5b390d
fix comment
kevindiu Jul 17, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions internal/errors/net.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
//
// Copyright (C) 2019-2020 Vdaas.org Vald team ( kpango, rinx, kmrmt )
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
vankichi marked this conversation as resolved.
Show resolved Hide resolved

// Package errors provides error types and function
package errors

import "time"

var (
// tcp

ErrFailedInitDialer = New("failed to init dialer")
kevindiu marked this conversation as resolved.
Show resolved Hide resolved
ErrInvalidDNSConfig = func(dnsRefreshDur, dnsCacheExp time.Duration) error {
kevindiu marked this conversation as resolved.
Show resolved Hide resolved
return Errorf("dnsRefreshDuration > dnsCacheExp, %s, %s", dnsRefreshDur, dnsCacheExp)
}
)
161 changes: 88 additions & 73 deletions internal/net/tcp/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,18 @@ package tcp
import (
"context"
"crypto/tls"
"strings"
"fmt"
"sync/atomic"
"time"

"github.com/vdaas/vald/internal/cache"
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/net"
"github.com/vdaas/vald/internal/safety"
)

// Dialer is an interface to get the dialer instance to connect to an address.
type Dialer interface {
GetDialer() func(ctx context.Context, network, addr string) (net.Conn, error)
StartDialerCache(ctx context.Context)
Expand All @@ -50,6 +53,27 @@ type dialer struct {
dialer func(ctx context.Context, network, addr string) (net.Conn, error)
}

type dialerCache struct {
ips []string
cnt uint32
}

// GetIP returns the next cached IP address in round robin order.
// It starts getting the index 1 cache instead of index 0 cache.
func (d *dialerCache) GetIP() string {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you please change function naming to IP()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed :)

if d.Len() == 1 {
kevindiu marked this conversation as resolved.
Show resolved Hide resolved
return d.ips[0]
}

return d.ips[atomic.AddUint32(&d.cnt, 1)%d.Len()]
}

// Len returns the length of cached IP addresses
func (d *dialerCache) Len() uint32 {
return uint32(len(d.ips))
}

// NewDialer initialize and return the dialer instance
func NewDialer(opts ...DialerOption) (der Dialer, err error) {
d := new(dialer)
for _, opt := range append(defaultDialerOptions, opts...) {
Expand All @@ -63,52 +87,25 @@ func NewDialer(opts ...DialerOption) (der Dialer, err error) {
Control: Control,
}

if !d.dnsCache {
if d.tlsConfig != nil {
d.dialer = func(ctx context.Context, network,
addr string) (conn net.Conn, err error) {
conn, err = d.der.DialContext(ctx, network, addr)
if err != nil {
return nil, err
}
return tls.Client(conn, d.tlsConfig), nil
}
} else {
d.dialer = d.der.DialContext
}
d.der.Resolver = &net.Resolver{
PreferGo: false,
Dial: d.dialer,
}
return d, nil
}
d.dialer = d.dial

if d.dnsRefreshDuration > d.dnsCacheExpiration {
d.dnsRefreshDuration, d.dnsCacheExpiration =
d.dnsCacheExpiration, d.dnsRefreshDuration
d.dnsRefreshDurationStr, d.dnsCacheExpirationStr =
d.dnsCacheExpirationStr, d.dnsRefreshDurationStr
}
if d.dnsCache {
if d.dnsRefreshDuration > d.dnsCacheExpiration {
return nil, errors.ErrInvalidDNSConfig(d.dnsRefreshDuration, d.dnsCacheExpiration)
}

if d.cache == nil {
d.cache, err = cache.New(
cache.WithExpireDuration(d.dnsCacheExpirationStr),
cache.WithExpireCheckDuration(d.dnsRefreshDurationStr),
cache.WithExpiredHook(func(ctx context.Context, addr string) {
if err := safety.RecoverFunc(func() (err error) {
_, err = d.lookup(ctx, addr)
return err
}); err != nil {
log.Error(err)
}
}),
)
if err != nil {
return nil, err
if d.cache == nil {
if d.cache, err = cache.New(
cache.WithExpireDuration(d.dnsCacheExpirationStr),
cache.WithExpireCheckDuration(d.dnsRefreshDurationStr),
cache.WithExpiredHook(d.cacheExpireHook),
); err != nil {
return nil, err
}
}
}

d.dialer = d.cachedDialer
d.dialer = d.cachedDialer
}

d.der.Resolver = &net.Resolver{
PreferGo: false,
Expand All @@ -118,70 +115,88 @@ func NewDialer(opts ...DialerOption) (der Dialer, err error) {
return d, nil
}

func (d *dialer) GetDialer() func(ctx context.Context,
network, addr string) (net.Conn, error) {
// GetDialer returns a function to return the connection
func (d *dialer) GetDialer() func(ctx context.Context, network, addr string) (net.Conn, error) {
return d.dialer
}

func (d *dialer) lookup(ctx context.Context,
addr string) (ips map[int]string, err error) {
cache, ok := d.cache.Get(addr)
func (d *dialer) lookup(ctx context.Context, host string) (*dialerCache, error) {
cache, ok := d.cache.Get(host)
if ok {
return cache.(map[int]string), nil
return cache.(*dialerCache), nil
}

r, err := d.der.Resolver.LookupIPAddr(ctx, addr)
r, err := d.der.Resolver.LookupIPAddr(ctx, host)
if err != nil {
return nil, err
}

ips = make(map[int]string, len(r))
for i, ip := range r {
kevindiu marked this conversation as resolved.
Show resolved Hide resolved
ips[i] = ip.String()
dc := &dialerCache{
ips: make([]string, 0, len(r)),
}
for _, ip := range r {
kevindiu marked this conversation as resolved.
Show resolved Hide resolved
dc.ips = append(dc.ips, ip.String())
}

d.cache.Set(addr, ips)
return ips, nil
d.cache.Set(host, dc)
return dc, nil
}

// StartDialerCache starts the dialer cache to expire the cache automatically
func (d *dialer) StartDialerCache(ctx context.Context) {
if d.dnsCache && d.cache != nil {
d.cache.Start(ctx)
}
}

// DialContext returns the connection or error base on the input.
// If the DNS cache is enabled, it will lookup the DNS cache in round robin order and return a connection of it.
// Also if TLS is enabled, it will create a TLS connection for it.
func (d *dialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
return d.GetDialer()(ctx, network, address)
}

func (d *dialer) cachedDialer(dctx context.Context, network, addr string) (
conn net.Conn, err error) {
sep := strings.LastIndex(addr, ":")

if sep < 0 {
sep = len(addr)
func (d *dialer) cachedDialer(dctx context.Context, network, addr string) (conn net.Conn, err error) {
host, port, isIP, err := net.Parse(addr)
if err != nil {
return nil, err
}

ips, err := d.lookup(dctx, addr[:sep])
if err == nil {
for _, ip := range ips {
conn, err = d.der.DialContext(dctx, network, ip+addr[sep:])
if err == nil {
if d.tlsConfig != nil {
return tls.Client(conn, d.tlsConfig), nil
if d.dnsCache && !isIP {
if dc, err := d.lookup(dctx, host); err == nil {
for i := uint32(0); i < dc.Len(); i++ {
if conn, err := d.dial(dctx, network, fmt.Sprintf("%s:%d", dc.GetIP(), port)); err == nil {
kevindiu marked this conversation as resolved.
Show resolved Hide resolved
vankichi marked this conversation as resolved.
Show resolved Hide resolved
return conn, nil
}
return conn, nil
}
kevindiu marked this conversation as resolved.
Show resolved Hide resolved
d.cache.Delete(host)
}
}
return d.dial(dctx, network, addr)
kevindiu marked this conversation as resolved.
Show resolved Hide resolved
}

func (d *dialer) dial(ctx context.Context, network, addr string) (net.Conn, error) {
conn, err := d.der.DialContext(ctx, network, addr)
if err != nil {
defer func(conn net.Conn) {
if conn != nil {
conn.Close()
}
kevindiu marked this conversation as resolved.
Show resolved Hide resolved
}
d.cache.Delete(addr[:sep])
}(conn)
return nil, err
}

conn, err = d.der.DialContext(dctx, network, addr)
if d.tlsConfig != nil {
return tls.Client(conn, d.tlsConfig), nil
}
return
return conn, nil
}

func (d *dialer) cacheExpireHook(ctx context.Context, addr string) {
if err := safety.RecoverFunc(func() error {
_, err1 := d.lookup(ctx, addr)
kevindiu marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please do not use numbering variable

Suggested change
_, err1 := d.lookup(ctx, addr)
_, err1 := d.lookup(ctx, addr)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

return err1
kevindiu marked this conversation as resolved.
Show resolved Hide resolved
})(); err != nil {
log.Errorf("DNS cacheExpireHook error occurred: %v", err)
}
}
Loading