Skip to content

Commit

Permalink
Add QUIC support
Browse files Browse the repository at this point in the history
Signed-off-by: kpango <kpango@vdaas.org>
  • Loading branch information
kpango committed Oct 24, 2024
1 parent 5be4d7e commit 6a6a2a5
Show file tree
Hide file tree
Showing 10 changed files with 306 additions and 37 deletions.
33 changes: 21 additions & 12 deletions charts/vald/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -823,16 +823,19 @@ defaults:
interceptors: []
# @schema {"name": "defaults.grpc.client.dial_option.net", "type": "object", "anchor": "net"}
net:
# @schema {"name": "defaults.grpc.client.dial_option.net.network", "type": "string", "enum": ["tcp", "udp", "unix"]}
# defaults.grpc.client.dial_option.net.network -- gRPC client dialer network type
network: tcp
# @schema {"name": "defaults.grpc.client.dial_option.net.dns", "type": "object"}
dns:
# @schema {"name": "defaults.grpc.client.dial_option.net.dns.cache_enabled", "type": "boolean"}
# defaults.grpc.client.dial_option.net.dns.cache_enabled -- gRPC client TCP DNS cache enabled
# defaults.grpc.client.dial_option.net.dns.cache_enabled -- gRPC client DNS cache enabled
cache_enabled: true
# @schema {"name": "defaults.grpc.client.dial_option.net.dns.refresh_duration", "type": "string"}
# defaults.grpc.client.dial_option.net.dns.refresh_duration -- gRPC client TCP DNS cache refresh duration
# defaults.grpc.client.dial_option.net.dns.refresh_duration -- gRPC client DNS cache refresh duration
refresh_duration: 30m
# @schema {"name": "defaults.grpc.client.dial_option.net.dns.cache_expiration", "type": "string"}
# defaults.grpc.client.dial_option.net.dns.cache_expiration -- gRPC client TCP DNS cache expiration
# defaults.grpc.client.dial_option.net.dns.cache_expiration -- gRPC client DNS cache expiration
cache_expiration: 1h
# @schema {"name": "defaults.grpc.client.dial_option.net.dialer", "type": "object"}
dialer:
Expand Down Expand Up @@ -1815,12 +1818,14 @@ gateway:
gateway_config:
# @schema {"name": "gateway.mirror.gateway_config.net", "alias": "net"}
net:
# gateway.mirror.gateway_config.net.network.cache_enabled -- gRPC client dialer network type
network: tcp
dns:
# gateway.mirror.gateway_config.net.dns.cache_enabled -- TCP DNS cache enabled
# gateway.mirror.gateway_config.net.dns.cache_enabled -- DNS cache enabled
cache_enabled: true
# gateway.mirror.gateway_config.net.dns.refresh_duration -- TCP DNS cache refresh duration
# gateway.mirror.gateway_config.net.dns.refresh_duration -- DNS cache refresh duration
refresh_duration: 5m
# gateway.mirror.gateway_config.net.dns.cache_expiration -- TCP DNS cache expiration
# gateway.mirror.gateway_config.net.dns.cache_expiration -- DNS cache expiration
cache_expiration: 24h
dialer:
# gateway.mirror.gateway_config.net.dialer.timeout -- TCP dialer timeout
Expand Down Expand Up @@ -2630,12 +2635,14 @@ agent:
client:
# @schema {"name": "agent.sidecar.config.client.net", "alias": "net"}
net:
# agent.sidecar.config.client.net.network -- gRPC client dialer network type
network: tcp
dns:
# agent.sidecar.config.client.net.dns.cache_enabled -- HTTP client TCP DNS cache enabled
# agent.sidecar.config.client.net.dns.cache_enabled -- HTTP client DNS cache enabled
cache_enabled: true
# agent.sidecar.config.client.net.dns.refresh_duration -- HTTP client TCP DNS cache refresh duration
# agent.sidecar.config.client.net.dns.refresh_duration -- HTTP client DNS cache refresh duration
refresh_duration: 1h
# agent.sidecar.config.client.net.dns.refresh_duration -- HTTP client TCP DNS cache expiration
# agent.sidecar.config.client.net.dns.refresh_duration -- HTTP client DNS cache expiration
cache_expiration: 24h
dialer:
# agent.sidecar.config.client.net.dialer.timeout -- HTTP client TCP dialer connect timeout
Expand Down Expand Up @@ -3005,12 +3012,14 @@ discoverer:
fields: {}
# @schema {"name": "discoverer.discoverer.net", "alias": "net"}
net:
# discoverer.discoverer.net.network -- gRPC client dialer network type
network: tcp
dns:
# discoverer.discoverer.net.dns.cache_enabled -- TCP DNS cache enabled
# discoverer.discoverer.net.dns.cache_enabled -- DNS cache enabled
cache_enabled: true
# discoverer.discoverer.net.dns.refresh_duration -- TCP DNS cache refresh duration
# discoverer.discoverer.net.dns.refresh_duration -- DNS cache refresh duration
refresh_duration: 5m
# discoverer.discoverer.net.dns.cache_expiration -- TCP DNS cache expiration
# discoverer.discoverer.net.dns.cache_expiration -- DNS cache expiration
cache_expiration: 24h
dialer:
# discoverer.discoverer.net.dialer.timeout -- TCP dialer timeout
Expand Down
6 changes: 5 additions & 1 deletion internal/config/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,12 @@ func (g *GRPCClient) Opts() ([]grpc.Option, error) {
if err != nil {
return nil, err
}
network := g.DialOption.Net.Network
if network == "" {
network = net.TCP.String()
}
opts = append(opts,
grpc.WithDialer(der),
grpc.WithDialer(network, der),
)
}

Expand Down
1 change: 1 addition & 0 deletions internal/config/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

// Net represents the network configuration tcp, udp, unix domain socket.
type Net struct {
Network string `json:"network,omitempty" yaml:"network"`
DNS *DNS `json:"dns,omitempty" yaml:"dns"`
Dialer *Dialer `json:"dialer,omitempty" yaml:"dialer"`
SocketOption *SocketOption `json:"socket_option,omitempty" yaml:"socket_option"`
Expand Down
18 changes: 15 additions & 3 deletions internal/net/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"syscall"
"time"

"github.com/cloudwego/netpoll"

Check failure on line 28 in internal/net/dialer.go

View workflow job for this annotation

GitHub Actions / grpc-sequential

cannot find module providing package github.com/cloudwego/netpoll: import lookup disabled by -mod=readonly

Check failure on line 28 in internal/net/dialer.go

View workflow job for this annotation

GitHub Actions / grpc-stream

cannot find module providing package github.com/cloudwego/netpoll: import lookup disabled by -mod=readonly
"github.com/vdaas/vald/internal/cache"
"github.com/vdaas/vald/internal/cache/cacher"
"github.com/vdaas/vald/internal/errors"
Expand Down Expand Up @@ -61,6 +62,7 @@ type dialer struct {
dialerDualStack bool
addrs sync.Map[string, *addrInfo]
der *net.Dialer
npDialer netpoll.Dialer
dialer func(ctx context.Context, network, addr string) (Conn, error)
}

Expand Down Expand Up @@ -114,6 +116,9 @@ func NewDialer(opts ...DialerOption) (der Dialer, err error) {
},
}

netpoll.SetLoadBalance(netpoll.RoundRobin)
d.npDialer = netpoll.NewDialer()

d.dialer = d.dial
if d.enableDNSCache {
if d.dnsRefreshDuration > d.dnsCacheExpiration {
Expand Down Expand Up @@ -318,8 +323,15 @@ func (d *dialer) dial(ctx context.Context, network, addr string) (conn Conn, err
}
}()
log.Debugf("%s connection dialing to addr %s", network, addr)
err = safety.RecoverWithoutPanicFunc(func() error {
conn, err = d.der.DialContext(ctx, network, addr)
err = safety.RecoverWithoutPanicFunc(func() (err error) {
if IsUDP(network) {
conn, err = quic.DialContext(ctx, addr, d.tlsConfig)
} else {
conn, err = d.npDialer.DialConnection(network, addr, d.der.Timeout)
if err != nil {
conn, err = d.der.DialContext(ctx, network, addr)
}
}
return err
})()
if err != nil {
Expand All @@ -336,7 +348,7 @@ func (d *dialer) dial(ctx context.Context, network, addr string) (conn Conn, err
}

d.tmu.RLock()
if d.tlsConfig != nil {
if !IsUDP(network) && d.tlsConfig != nil {
d.tmu.RUnlock()
return d.tlsHandshake(ctx, conn, network, addr)
}
Expand Down
16 changes: 12 additions & 4 deletions internal/net/grpc/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,18 +485,26 @@ func WithKeepaliveParams(t, to string, permitWithoutStream bool) Option {
}
}

func WithDialer(der net.Dialer) Option {
func WithDialer(network string, der net.Dialer) Option {
return func(g *gRPCClient) {
if der != nil {
g.dialer = der
if g.dopts == nil && cap(g.dopts) == 0 {
g.dopts = make([]grpc.DialOption, 0, defaultDialOptionLength)
}
nt := net.NetworkTypeFromString(network)
switch nt {
case net.UDP, net.UDP4, net.UDP6:
nt = net.UDP
case net.UNIX, net.UNIXGRAM, net.UNIXPACKET:
nt = net.UNIX
default:
nt = net.TCP
}
g.dopts = append(g.dopts,
grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
// TODO we need change network type dynamically
log.Debugf("gRPC context Dialer addr is %s", addr)
return der.GetDialer()(ctx, net.TCP.String(), addr)
log.Debugf("gRPC context Dialer for network %s, addr is %s", nt.String(), addr)
return g.dialer.GetDialer()(ctx, nt.String(), addr)
}),
)
}
Expand Down
28 changes: 28 additions & 0 deletions internal/net/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ import (
)

type (
// Addr is an alias of net.Addr.
Addr = net.Addr

// Conn is an alias of net.Conn.
Conn = net.Conn

Expand All @@ -46,6 +49,15 @@ type (
// Resolver is an alias of net.Resolver.
Resolver = net.Resolver

// UDPConn is an alias of net.UDPConn.
UDPConn = net.UDPConn

// TCPListener is an alias of net.TCPListener.
TCPListener = net.TCPListener

// UnixListener is an alias of net.UnixListener.
UnixListener = net.UnixListener

// NetworkType represents a network type such as TCP, TCP6, etc.
NetworkType uint
)
Expand Down Expand Up @@ -151,6 +163,22 @@ func IsLocal(host string) bool {
host == localIPv6
}

// IsUDP returns if the network type is the udp or udp4 or udp6.
func IsUDP(network string) bool {
rip := NetworkTypeFromString(network)
return rip == UDP ||
rip == UDP4 ||
rip == UDP6
}

// IsTCP returns if the network type is the tcp or tcp4 or tcp6.
func IsTCP(network string) bool {
rip := NetworkTypeFromString(network)
return rip == TCP ||
rip == TCP4 ||
rip == TCP6
}

// Parse parses the hostname, IPv4 or IPv6 address and return the hostname/IP, port number,
// whether the address is local IP and IPv4 or IPv6, and any parsing error occurred.
// The address should contains the port number, otherwise an error will return.
Expand Down
82 changes: 82 additions & 0 deletions internal/net/quic/conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright (C) 2019-2022 vdaas.org vald team <vald@vdaas.org>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package quic

import (
"context"

quic "github.com/quic-go/quic-go"
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/net"
"github.com/vdaas/vald/internal/sync"
"github.com/vdaas/vald/internal/tls"
)

type Conn struct {
quic.Connection
quic.Stream
}

type qconn struct {
connectionCache sync.Map
}

var defaultQconn = new(qconn)

func NewConn(ctx context.Context, conn quic.Connection) (net.Conn, error) {
stream, err := conn.OpenStreamSync(ctx)
if err != nil {
return nil, err
}
return &Conn{
Connection: conn,
Stream: stream,
}, nil
}

func (c *Conn) Close() (err error) {
return c.Stream.Close()
}

func DialContext(ctx context.Context, addr string, tcfg *tls.Config) (net.Conn, error) {
return defaultQconn.dialQuicContext(ctx, addr, tcfg)
}

func (q *qconn) dialQuicContext(ctx context.Context, addr string, tcfg *tls.Config) (net.Conn, error) {
si, ok := q.connectionCache.Load(addr)
if ok {
if conn, ok := si.(quic.Connection); ok {
return NewConn(ctx, conn)
}
}
conn, err := quic.DialAddrContext(ctx, addr, tcfg, nil)
if err != nil {
return nil, err
}
q.connectionCache.Store(addr, conn)
return NewConn(ctx, conn)
}

func (q *qconn) Close() (err error) {
q.connectionCache.Range(func(addr, si interface{}) bool {
if conn, ok := si.(quic.Connection); ok {
e := conn.CloseWithError(0, addr.(string))
if e != nil {
err = errors.Wrap(err, e.Error())
}
}
return true
})
return nil
}
76 changes: 76 additions & 0 deletions internal/net/quic/listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright (C) 2019-2022 vdaas.org vald team <vald@vdaas.org>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package quic

import (
"context"

quic "github.com/quic-go/quic-go"
"github.com/vdaas/vald/internal/net"
"github.com/vdaas/vald/internal/tls"
)

type Listener struct {
quic.Listener

ctx context.Context
}

func Listen(ctx context.Context, addr string, tcfg *tls.Config) (net.Listener, error) {
ql, err := quic.ListenAddr(addr, tcfg, &quic.Config{
// Versions: nil,
// ConnectionIDLength: 0,
// HandshakeIdleTimeout: 0,
// MaxIdleTimeout: 0,
// AcceptToken: func(clientAddr net.Addr, token *quic.Token) bool {
// return true
// },
// TokenStore: quic.NewLRUTokenStore(clientAddr),
// InitialStreamReceiveWindow: 0,
// InitialConnectionReceiveWindow: 0,
// MaxStreamReceiveWindow: 0,
// MaxConnectionReceiveWindow: 0,
// MaxIncomingStreams: 0,
// MaxIncomingUniStreams: 0,
// StatelessResetKey: nil,
// KeepAlive: true,
// DisablePathMTUDiscovery: false,
EnableDatagrams: true,
// Tracer: logging.NewMultiplexedTracer(),
})
if err != nil {
return nil, err
}
return &Listener{
Listener: ql,
ctx: ctx,
}, nil
}

func (l *Listener) Accept() (net.Conn, error) {
sess, err := l.Listener.Accept(l.ctx)
if err != nil {
return nil, err
}

stream, err := sess.AcceptStream(l.ctx)
if err != nil {
return nil, err
}

return &Conn{
Session: sess,
Stream: stream,
}, nil
}
Loading

0 comments on commit 6a6a2a5

Please sign in to comment.