Skip to content

Commit

Permalink
TUN-7776: Remove warp-routing flag from cloudflared
Browse files Browse the repository at this point in the history
  • Loading branch information
joliveirinha committed Sep 18, 2023
1 parent 3495860 commit fc0ecf4
Show file tree
Hide file tree
Showing 13 changed files with 23 additions and 126 deletions.
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 2023.9.0
### Notices
- The `warp-routing` `enabled: boolean` flag is no longer supported in the configuration file. Warp Routing traffic (eg TCP, UDP, ICMP) traffic is proxied to cloudflared if routes to the target tunnel are configured. This change does not affect remotely managed tunnels, but for locally managed tunnels, users that might be relying on this feature flag to block traffic should instead guarantee that tunnel has no Private Routes configured for the tunnel.
## 2023.7.0
### New Features
- You can now enable additional diagnostics over the management.argotunnel.com service for your active cloudflared connectors via a new runtime flag `--management-diagnostics` (or env `TUNNEL_MANAGEMENT_DIAGNOSTICS`). This feature is provided as opt-in and requires the flag to enable. Endpoints such as /metrics provides your prometheus metrics endpoint another mechanism to be reached. Additionally /debug/pprof/(goroutine|heap) are also introduced to allow for remotely retrieving active pprof information from a running cloudflared connector.
Expand Down
1 change: 0 additions & 1 deletion config/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,6 @@ type Configuration struct {
}

type WarpRoutingConfig struct {
Enabled bool `yaml:"enabled" json:"enabled"`
ConnectTimeout *CustomDuration `yaml:"connectTimeout" json:"connectTimeout,omitempty"`
TCPKeepAlive *CustomDuration `yaml:"tcpKeepAlive" json:"tcpKeepAlive,omitempty"`
}
Expand Down
1 change: 0 additions & 1 deletion config/configuration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ func TestConfigFileSettings(t *testing.T) {
Service: "https://localhost:8001",
}
warpRouting = WarpRoutingConfig{
Enabled: true,
ConnectTimeout: &CustomDuration{Duration: 2 * time.Second},
TCPKeepAlive: &CustomDuration{Duration: 10 * time.Second},
}
Expand Down
1 change: 0 additions & 1 deletion connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ type Orchestrator interface {
UpdateConfig(version int32, config []byte) *pogs.UpdateConfigurationResponse
GetConfigJSON() ([]byte, error)
GetOriginProxy() (OriginProxy, error)
WarpRoutingEnabled() (enabled bool)
}

type NamedTunnelProperties struct {
Expand Down
2 changes: 1 addition & 1 deletion connection/quic.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func NewQUICConnection(
sessionDemuxChan := make(chan *packet.Session, demuxChanCapacity)
datagramMuxer := quicpogs.NewDatagramMuxerV2(session, logger, sessionDemuxChan)
sessionManager := datagramsession.NewManager(logger, datagramMuxer.SendToSession, sessionDemuxChan)
packetRouter := ingress.NewPacketRouter(packetRouterConfig, datagramMuxer, logger, orchestrator.WarpRoutingEnabled)
packetRouter := ingress.NewPacketRouter(packetRouterConfig, datagramMuxer, logger)

return &QUICConnection{
session: session,
Expand Down
6 changes: 1 addition & 5 deletions ingress/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,12 @@ const (
)

type WarpRoutingConfig struct {
Enabled bool `yaml:"enabled" json:"enabled"`
ConnectTimeout config.CustomDuration `yaml:"connectTimeout" json:"connectTimeout,omitempty"`
TCPKeepAlive config.CustomDuration `yaml:"tcpKeepAlive" json:"tcpKeepAlive,omitempty"`
}

func NewWarpRoutingConfig(raw *config.WarpRoutingConfig) WarpRoutingConfig {
cfg := WarpRoutingConfig{
Enabled: raw.Enabled,
ConnectTimeout: defaultWarpRoutingConnectTimeout,
TCPKeepAlive: defaultTCPKeepAlive,
}
Expand All @@ -65,9 +63,7 @@ func NewWarpRoutingConfig(raw *config.WarpRoutingConfig) WarpRoutingConfig {
}

func (c *WarpRoutingConfig) RawConfig() config.WarpRoutingConfig {
raw := config.WarpRoutingConfig{
Enabled: c.Enabled,
}
raw := config.WarpRoutingConfig{}
if c.ConnectTimeout.Duration != defaultWarpRoutingConnectTimeout.Duration {
raw.ConnectTimeout = &c.ConnectTimeout
}
Expand Down
28 changes: 11 additions & 17 deletions ingress/packet_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@ type muxer interface {

// PacketRouter routes packets between Upstream and ICMPRouter. Currently it rejects all other type of ICMP packets
type PacketRouter struct {
globalConfig *GlobalRouterConfig
muxer muxer
logger *zerolog.Logger
checkRouterEnabledFunc func() bool
icmpDecoder *packet.ICMPDecoder
encoder *packet.Encoder
globalConfig *GlobalRouterConfig
muxer muxer
logger *zerolog.Logger
icmpDecoder *packet.ICMPDecoder
encoder *packet.Encoder
}

// GlobalRouterConfig is the configuration shared by all instance of Router.
Expand All @@ -40,14 +39,13 @@ type GlobalRouterConfig struct {
}

// NewPacketRouter creates a PacketRouter that handles ICMP packets. Packets are read from muxer but dropped if globalConfig is nil.
func NewPacketRouter(globalConfig *GlobalRouterConfig, muxer muxer, logger *zerolog.Logger, checkRouterEnabledFunc func() bool) *PacketRouter {
func NewPacketRouter(globalConfig *GlobalRouterConfig, muxer muxer, logger *zerolog.Logger) *PacketRouter {
return &PacketRouter{
globalConfig: globalConfig,
muxer: muxer,
logger: logger,
checkRouterEnabledFunc: checkRouterEnabledFunc,
icmpDecoder: packet.NewICMPDecoder(),
encoder: packet.NewEncoder(),
globalConfig: globalConfig,
muxer: muxer,
logger: logger,
icmpDecoder: packet.NewICMPDecoder(),
encoder: packet.NewEncoder(),
}
}

Expand Down Expand Up @@ -92,10 +90,6 @@ func (r *PacketRouter) handlePacket(ctx context.Context, rawPacket packet.RawPac
return
}

if enabled := r.checkRouterEnabledFunc(); !enabled {
return
}

icmpPacket, err := r.icmpDecoder.Decode(rawPacket)
if err != nil {
r.logger.Err(err).Msg("Failed to decode ICMP packet from quic datagram")
Expand Down
64 changes: 1 addition & 63 deletions ingress/packet_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"net/netip"
"sync/atomic"
"testing"
"time"

"github.com/google/gopacket/layers"
"github.com/stretchr/testify/require"
Expand All @@ -29,9 +28,7 @@ var (

func TestRouterReturnTTLExceed(t *testing.T) {
muxer := newMockMuxer(0)
routerEnabled := &routerEnabledChecker{}
routerEnabled.set(true)
router := NewPacketRouter(packetConfig, muxer, &noopLogger, routerEnabled.isEnabled)
router := NewPacketRouter(packetConfig, muxer, &noopLogger)
ctx, cancel := context.WithCancel(context.Background())
routerStopped := make(chan struct{})
go func() {
Expand Down Expand Up @@ -80,65 +77,6 @@ func TestRouterReturnTTLExceed(t *testing.T) {
<-routerStopped
}

func TestRouterCheckEnabled(t *testing.T) {
muxer := newMockMuxer(0)
routerEnabled := &routerEnabledChecker{}
router := NewPacketRouter(packetConfig, muxer, &noopLogger, routerEnabled.isEnabled)
ctx, cancel := context.WithCancel(context.Background())
routerStopped := make(chan struct{})
go func() {
router.Serve(ctx)
close(routerStopped)
}()

pk := packet.ICMP{
IP: &packet.IP{
Src: netip.MustParseAddr("192.168.1.1"),
Dst: netip.MustParseAddr("10.0.0.1"),
Protocol: layers.IPProtocolICMPv4,
TTL: 1,
},
Message: &icmp.Message{
Type: ipv4.ICMPTypeEcho,
Code: 0,
Body: &icmp.Echo{
ID: 12481,
Seq: 8036,
Data: []byte(t.Name()),
},
},
}

// router is disabled
encoder := packet.NewEncoder()
encodedPacket, err := encoder.Encode(&pk)
require.NoError(t, err)
sendPacket := quicpogs.RawPacket(encodedPacket)

muxer.edgeToCfd <- sendPacket
select {
case <-time.After(time.Millisecond * 10):
case <-muxer.cfdToEdge:
t.Error("Unexpected reply when router is disabled")
}
routerEnabled.set(true)
// router is enabled, expects reply
muxer.edgeToCfd <- sendPacket
<-muxer.cfdToEdge

routerEnabled.set(false)
// router is disabled
muxer.edgeToCfd <- sendPacket
select {
case <-time.After(time.Millisecond * 10):
case <-muxer.cfdToEdge:
t.Error("Unexpected reply when router is disabled")
}

cancel()
<-routerStopped
}

func assertTTLExceed(t *testing.T, originalPacket *packet.ICMP, expectedSrc netip.Addr, muxer *mockMuxer) {
encoder := packet.NewEncoder()
rawPacket, err := encoder.Encode(originalPacket)
Expand Down
2 changes: 0 additions & 2 deletions orchestration/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ func TestNewLocalConfig_MarshalJSON(t *testing.T) {
}
],
"warp-routing": {
"enabled": true,
"connectTimeout": 1
}
}
Expand All @@ -83,7 +82,6 @@ func TestNewLocalConfig_MarshalJSON(t *testing.T) {
require.NoError(t, err)

require.Equal(t, remoteConfig.WarpRouting, ingress.WarpRoutingConfig{
Enabled: true,
ConnectTimeout: config.CustomDuration{
Duration: time.Second,
},
Expand Down
18 changes: 4 additions & 14 deletions orchestration/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,10 @@ type Orchestrator struct {
// Underlying value is proxy.Proxy, can be read without the lock, but still needs the lock to update
proxy atomic.Value
// Set of internal ingress rules defined at cloudflared startup (separate from user-defined ingress rules)
internalRules []ingress.Rule
warpRoutingEnabled atomic.Bool
config *Config
tags []tunnelpogs.Tag
log *zerolog.Logger
internalRules []ingress.Rule
config *Config
tags []tunnelpogs.Tag
log *zerolog.Logger

// orchestrator must not handle any more updates after shutdownC is closed
shutdownC <-chan struct{}
Expand Down Expand Up @@ -136,11 +135,6 @@ func (o *Orchestrator) updateIngress(ingressRules ingress.Ingress, warpRouting i
o.proxy.Store(proxy)
o.config.Ingress = &ingressRules
o.config.WarpRouting = warpRouting
if warpRouting.Enabled {
o.warpRoutingEnabled.Store(true)
} else {
o.warpRoutingEnabled.Store(false)
}

// If proxyShutdownC is nil, there is no previous running proxy
if o.proxyShutdownC != nil {
Expand Down Expand Up @@ -209,10 +203,6 @@ func (o *Orchestrator) GetOriginProxy() (connection.OriginProxy, error) {
return proxy, nil
}

func (o *Orchestrator) WarpRoutingEnabled() bool {
return o.warpRoutingEnabled.Load()
}

func (o *Orchestrator) waitToCloseLastProxy() {
<-o.shutdownC
o.lock.Lock()
Expand Down
16 changes: 0 additions & 16 deletions orchestration/orchestrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ func TestUpdateConfiguration(t *testing.T) {
initOriginProxy, err := orchestrator.GetOriginProxy()
require.NoError(t, err)
require.Implements(t, (*connection.OriginProxy)(nil), initOriginProxy)
require.False(t, orchestrator.WarpRoutingEnabled())

configJSONV2 := []byte(`
{
Expand Down Expand Up @@ -87,7 +86,6 @@ func TestUpdateConfiguration(t *testing.T) {
}
],
"warp-routing": {
"enabled": true,
"connectTimeout": 10
}
}
Expand Down Expand Up @@ -126,8 +124,6 @@ func TestUpdateConfiguration(t *testing.T) {
require.Equal(t, config.CustomDuration{Duration: time.Second * 90}, configV2.Ingress.Rules[2].Config.ConnectTimeout)
require.Equal(t, false, configV2.Ingress.Rules[2].Config.NoTLSVerify)
require.Equal(t, true, configV2.Ingress.Rules[2].Config.NoHappyEyeballs)
require.True(t, configV2.WarpRouting.Enabled)
require.Equal(t, configV2.WarpRouting.Enabled, orchestrator.WarpRoutingEnabled())
require.Equal(t, configV2.WarpRouting.ConnectTimeout.Duration, 10*time.Second)

originProxyV2, err := orchestrator.GetOriginProxy()
Expand Down Expand Up @@ -162,7 +158,6 @@ func TestUpdateConfiguration(t *testing.T) {
}
],
"warp-routing": {
"enabled": false
}
}
`)
Expand All @@ -171,8 +166,6 @@ func TestUpdateConfiguration(t *testing.T) {
require.Len(t, configV10.Ingress.Rules, 1)
require.True(t, configV10.Ingress.Rules[0].Matches("blogs.tunnel.io", "/2022/02/10"))
require.Equal(t, ingress.HelloWorldService, configV10.Ingress.Rules[0].Service.String())
require.False(t, configV10.WarpRouting.Enabled)
require.Equal(t, configV10.WarpRouting.Enabled, orchestrator.WarpRoutingEnabled())

originProxyV10, err := orchestrator.GetOriginProxy()
require.NoError(t, err)
Expand All @@ -191,7 +184,6 @@ func TestUpdateConfiguration_FromMigration(t *testing.T) {
initOriginProxy, err := orchestrator.GetOriginProxy()
require.NoError(t, err)
require.Implements(t, (*connection.OriginProxy)(nil), initOriginProxy)
require.False(t, orchestrator.WarpRoutingEnabled())

configJSONV2 := []byte(`
{
Expand All @@ -201,7 +193,6 @@ func TestUpdateConfiguration_FromMigration(t *testing.T) {
}
],
"warp-routing": {
"enabled": true
}
}
`)
Expand Down Expand Up @@ -271,7 +262,6 @@ func TestConcurrentUpdateAndRead(t *testing.T) {
}
],
"warp-routing": {
"enabled": true
}
}
`, hostname, httpOrigin.URL, expectedHost))
Expand All @@ -283,7 +273,6 @@ func TestConcurrentUpdateAndRead(t *testing.T) {
}
],
"warp-routing": {
"enabled": false
}
}
`)
Expand All @@ -296,7 +285,6 @@ func TestConcurrentUpdateAndRead(t *testing.T) {
}
],
"warp-routing": {
"enabled": true
}
}
`)
Expand Down Expand Up @@ -516,7 +504,6 @@ func TestClosePreviousProxies(t *testing.T) {
}
],
"warp-routing": {
"enabled": true
}
}
`, hostname))
Expand All @@ -529,7 +516,6 @@ func TestClosePreviousProxies(t *testing.T) {
}
],
"warp-routing": {
"enabled": true
}
}
`)
Expand Down Expand Up @@ -612,7 +598,6 @@ func TestPersistentConnection(t *testing.T) {
}
],
"warp-routing": {
"enabled": true
}
}
`, wsOrigin.URL))
Expand Down Expand Up @@ -679,7 +664,6 @@ func TestPersistentConnection(t *testing.T) {
}
],
"warp-routing": {
"enabled": false
}
}
`)
Expand Down
6 changes: 2 additions & 4 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,8 @@ func NewOriginProxy(
tags: tags,
log: log,
}
if warpRouting.Enabled {
proxy.warpRouting = ingress.NewWarpRoutingService(warpRouting)
log.Info().Msgf("Warp-routing is enabled")
}

proxy.warpRouting = ingress.NewWarpRoutingService(warpRouting)

return proxy
}
Expand Down
1 change: 0 additions & 1 deletion proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ var (
testTags = []tunnelpogs.Tag{{Name: "Name", Value: "value"}}
noWarpRouting = ingress.WarpRoutingConfig{}
testWarpRouting = ingress.WarpRoutingConfig{
Enabled: true,
ConnectTimeout: config.CustomDuration{Duration: time.Second},
}
)
Expand Down

0 comments on commit fc0ecf4

Please sign in to comment.