From 232964517e2a4477d8be8e48ccab9f53ec8da1c9 Mon Sep 17 00:00:00 2001 From: Joao Morais Date: Sat, 31 Jul 2021 15:13:18 -0300 Subject: [PATCH] add close-sessions-duration config key Active sessions of a stopping instance (haproxy -sf -x) are terminated just after hard-stop-after (our timeout-stop) times out and just before the old haproxy instance terminates. All the sessions at the same time. This option adds the ability to start the shutdown session at a configured time before the instance times out. This is done on top of two new implementations: now sockets can be configured to leave the connection open, so we have a way to reach the stopping instance since its admin socket isn't being listened anymore (standalone/embedded only, we could reach an old instance using master/ worker mode). The other one is a new struct and associated funcs called connections, that holds the master socket (if used), admin sockets, a list of stopping instances, and also controls concurrent access. The instances list can be used by functionalities that need to read or change their state. close-sessions-duration is built on top of them. A new goroutine starts when `timeout - duration` time has passed after the reload, and fairly distributes shutdowns along the remaining time of the instance. --- .../en/docs/configuration/command-line.md | 19 +++ docs/content/en/docs/configuration/keys.md | 38 +++++ pkg/common/ingress/controller/controller.go | 1 + pkg/common/ingress/controller/launch.go | 6 + pkg/controller/controller.go | 7 +- pkg/converters/ingress/annotations/global.go | 46 ++++++ .../ingress/annotations/global_test.go | 83 ++++++++++ pkg/converters/ingress/annotations/updater.go | 4 +- pkg/converters/ingress/types/global.go | 1 + pkg/converters/types/options.go | 2 + pkg/haproxy/connections.go | 150 ++++++++++++++++++ pkg/haproxy/dynupdate.go | 4 +- pkg/haproxy/dynupdate_test.go | 3 +- pkg/haproxy/instance.go | 61 +++---- pkg/haproxy/types/types.go | 3 + rootfs/etc/templates/haproxy/haproxy.tmpl | 3 + 16 files changed, 390 insertions(+), 41 deletions(-) create mode 100644 pkg/haproxy/connections.go diff --git a/docs/content/en/docs/configuration/command-line.md b/docs/content/en/docs/configuration/command-line.md index ecc19d764..f94b23bea 100644 --- a/docs/content/en/docs/configuration/command-line.md +++ b/docs/content/en/docs/configuration/command-line.md @@ -50,6 +50,7 @@ The following command-line options are supported: | [`--stats-collect-processing-period`](#stats) | time | `500ms` | v0.10 | | [`--sync-period`](#sync-period) | time | `10m` | | | [`--tcp-services-configmap`](#tcp-services-configmap) | namespace/configmapname | no tcp svc | | +| [`--track-old-instances`](#track-old-instances) | [true\|false] | `false` | v0.14 | | [`--update-status`](#update-status) | [true\|false] | `true` | | | [`--update-status-on-shutdown`](#update-status-on-shutdown) | [true\|false] | `true` | | | [`--v`](#v) | log level as integer | `1` | | @@ -486,6 +487,24 @@ See also: --- +## --track-old-instances + +Since v0.14 + +Creates an internal list of connections to old HAProxy instances. These connections are used to +read or send data to stopping instances, which is usually serving long lived connections like +TCP services or websockets. + +Enabling this option will make old HAProxy instances to not stop before `timeout-stop` timeout, +even if all the remaining sessions finish, so only enable it if using a feature that requests +it. + +See also: + +* [`close-sessions-duration`]({{% relref "keys#close-sessions-duration" %}}) configuration key + +--- + ## --update-status Indicates whether the ingress controller should update the `status` attribute of all the Ingress diff --git a/docs/content/en/docs/configuration/keys.md b/docs/content/en/docs/configuration/keys.md index 43d57ff18..222bd80c5 100644 --- a/docs/content/en/docs/configuration/keys.md +++ b/docs/content/en/docs/configuration/keys.md @@ -337,6 +337,7 @@ The table below describes all supported configuration keys. | [`blue-green-header`](#blue-green) | `HeaderName:LabelName` pair | Backend | | | [`blue-green-mode`](#blue-green) | [pod\|deploy] | Backend | | | [`cert-signer`](#acme) | "acme" | Host | | +| [`close-sessions-duration`](#close-sessions-duration) | time with suffix or percentage | Global | leave sessions open | | [`config-backend`](#configuration-snippet) | multiline backend config | Backend | | | [`config-defaults`](#configuration-snippet) | multiline config for the defaults section | Global | | | [`config-frontend`](#configuration-snippet) | multiline HTTP and HTTPS frontend config | Global | | @@ -1135,6 +1136,43 @@ See also: --- +## Close sessions duration + +| Configuration key | Scope | Default | Since | +|---------------------------|----------|----------|-------| +| `close-sessions-duration` | `Global` | | v0.14 | + +Defines the amount of time used to close active sessions before a stopping instance times out +and terminates. A stopping instance is an haproxy that doesn't listen sockets anymore, has an +old configuration, and it's just waiting remaining connections to terminate. + +Long lived sessions, like websockets or TCP connections, are usually closed only when the +`timeout-stop` of the old instance expires. Depending on how the clients are configured, +all the disconnected clients will reconnect almost at the same time. `close-sessions-duration` +configures the amount of time used to fairly distribute the sessions shutdown, so distributing +client reconnections to the new HAProxy instance. + +The default behavior is to not anticipate the disconnections, so all the active sessions will +be closed at the same time when `timeout-stop` expires. `close-sessions-duration` will only +take effect if `timeout-stop` configuration key and `--track-old-instances` command-line option +are also configured. + +The duration needs a suffix, which can be a time suffix like `s` (seconds), `m` (minutes) or +`h` (hours), or a `%` that represents a percentage of the `timeout-stop` configuration: + +* `10m` means that the last 10 minutes of the `timeout-stop` will be used to distribute sessions shutdown +* `10%` and a `timeout-stop` of `1h`, means that the last 6 minutes of the `timeout-stop` will be used to distribute sessions shutdown + +If the suffix is a time unit, the resulting value should be lower than the `timeout-stop` +configuration. If the suffix is a percentage, the value should be between `2%` and `98%`. + +See also: + +* [`track-old-instances`]({{% relref "command-line#track-old-instances" %}}) command-line option +* [`timeout-stop`](#timeout) configuration key + +--- + ## Configuration snippet | Configuration key | Scope | Default | Since | diff --git a/pkg/common/ingress/controller/controller.go b/pkg/common/ingress/controller/controller.go index 3cec966ca..78e659089 100644 --- a/pkg/common/ingress/controller/controller.go +++ b/pkg/common/ingress/controller/controller.go @@ -99,6 +99,7 @@ type Configuration struct { DefaultHealthzURL string StatsCollectProcPeriod time.Duration PublishService string + TrackOldInstances bool Backend ingress.Controller UpdateStatus bool diff --git a/pkg/common/ingress/controller/launch.go b/pkg/common/ingress/controller/launch.go index e57f9a800..cf94e6f1d 100644 --- a/pkg/common/ingress/controller/launch.go +++ b/pkg/common/ingress/controller/launch.go @@ -236,6 +236,11 @@ k8s endpoint order (default); 'name' - server/endpoint name; 'ip' - server/endpoint IP and port; 'random' - shuffle endpoints on every haproxy reload`) + trackOldInstances = flags.Bool("track-old-instances", false, + `Creates an internal list of connections to old HAProxy instances. These +connections are used to read or send data to stopping instances, which is +usually serving long lived connections like TCP services or websockets.`) + useNodeInternalIP = flags.Bool("report-node-internal-ip-address", false, `Defines if the nodes IP address to be returned in the ingress status should be the internal instead of the external IP address`) @@ -481,6 +486,7 @@ tracked.`) DisablePodList: *disablePodList, DisableExternalName: *disableExternalName, DisableConfigKeywords: *disableConfigKeywords, + TrackOldInstances: *trackOldInstances, UpdateStatusOnShutdown: *updateStatusOnShutdown, BackendShards: *backendShards, SortEndpointsBy: sortEndpoints, diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 506d453b5..a8e02a035 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -122,6 +122,8 @@ func (hc *HAProxyController) configController() { instanceOptions := haproxy.InstanceOptions{ HAProxyCfgDir: "/etc/haproxy", HAProxyMapsDir: ingress.DefaultMapsDirectory, + MasterSocket: hc.cfg.MasterSocket, + AdminSocket: "/var/run/haproxy/admin.sock", BackendShards: hc.cfg.BackendShards, AcmeSigner: acmeSigner, AcmeQueue: hc.acmeQueue, @@ -132,6 +134,7 @@ func (hc *HAProxyController) configController() { MaxOldConfigFiles: hc.cfg.MaxOldConfigFiles, SortEndpointsBy: hc.cfg.SortEndpointsBy, StopCh: hc.stopCh, + TrackInstances: hc.cfg.TrackOldInstances, ValidateConfig: hc.cfg.ValidateConfig, } hc.instance = haproxy.CreateInstance(hc.logger, instanceOptions) @@ -143,7 +146,8 @@ func (hc *HAProxyController) configController() { Cache: hc.cache, Tracker: hc.tracker, DynamicConfig: hc.dynamicConfig, - MasterSocket: hc.cfg.MasterSocket, + MasterSocket: instanceOptions.MasterSocket, + AdminSocket: instanceOptions.AdminSocket, AnnotationPrefix: hc.cfg.AnnPrefix, DefaultBackend: hc.cfg.DefaultService, DefaultCrtSecret: hc.cfg.DefaultSSLCertificate, @@ -151,6 +155,7 @@ func (hc *HAProxyController) configController() { FakeCAFile: hc.createFakeCAFile(), DisableKeywords: strings.Split(hc.cfg.DisableConfigKeywords, ","), AcmeTrackTLSAnn: hc.cfg.AcmeTrackTLSAnn, + TrackInstances: hc.cfg.TrackOldInstances, HasGateway: hc.cache.hasGateway(), } } diff --git a/pkg/converters/ingress/annotations/global.go b/pkg/converters/ingress/annotations/global.go index 5669f1c6e..3bbf3e890 100644 --- a/pkg/converters/ingress/annotations/global.go +++ b/pkg/converters/ingress/annotations/global.go @@ -91,6 +91,49 @@ func (c *updater) buildGlobalBind(d *globalData) { } } +func (c *updater) buildGlobalCloseSessions(d *globalData) { + durationCfg := d.mapper.Get(ingtypes.GlobalCloseSessionsDuration).Value + if durationCfg == "" { + return + } + if !c.options.TrackInstances { + c.logger.Warn("ignoring close-sessions-duration config: tracking old instances is disabled") + return + } + timeoutCfg := d.mapper.Get(ingtypes.GlobalTimeoutStop).Value + if timeoutCfg == "" { + c.logger.Warn("ignoring close-sessions-duration config: timeout-stop need to be configured") + return + } + timeout, err := time.ParseDuration(timeoutCfg) + if err != nil { + c.logger.Warn("ignoring close-sessions-duration due to invalid timeout-stop config: %v", err) + return + } + var duration time.Duration + if strings.HasSuffix(durationCfg, "%") { + pct, _ := strconv.Atoi(durationCfg[:len(durationCfg)-1]) + if pct < 2 || pct > 98 { + c.logger.Warn("ignoring '%s' for close-sessions-duration value: value should be between 5%% and 95%%", durationCfg) + return + } + duration = timeout * time.Duration(pct) / 100 + } else { + duration, err = time.ParseDuration(durationCfg) + if err == nil { + if duration >= timeout { + err = fmt.Errorf("close-sessions-duration should be lower than timeout-stop") + } + } + if err != nil { + c.logger.Warn("ignoring invalid close-sessions-duration config: %v", err) + return + } + } + d.global.CloseSessionsDuration = duration + d.global.Timeout.Stats = timeoutCfg +} + func (c *updater) buildGlobalPathTypeOrder(d *globalData) { matchTypes := make(map[hatypes.MatchType]struct{}, len(hatypes.DefaultMatchOrder)) for _, match := range hatypes.DefaultMatchOrder { @@ -225,6 +268,9 @@ func (c *updater) buildGlobalTimeout(d *globalData) { d.global.Timeout.ServerFin = c.validateTime(d.mapper.Get(ingtypes.BackTimeoutServerFin)) d.global.Timeout.Stop = c.validateTime(d.mapper.Get(ingtypes.GlobalTimeoutStop)) d.global.Timeout.Tunnel = c.validateTime(d.mapper.Get(ingtypes.BackTimeoutTunnel)) + if timeoutStop, err := time.ParseDuration(d.global.Timeout.Stop); err == nil { + d.global.TimeoutStopDuration = timeoutStop + } } func (c *updater) buildSecurity(d *globalData) { diff --git a/pkg/converters/ingress/annotations/global_test.go b/pkg/converters/ingress/annotations/global_test.go index 64f83ada7..b42f2401c 100644 --- a/pkg/converters/ingress/annotations/global_test.go +++ b/pkg/converters/ingress/annotations/global_test.go @@ -19,6 +19,7 @@ package annotations import ( "reflect" "testing" + "time" ingtypes "github.com/jcmoraisjr/haproxy-ingress/pkg/converters/ingress/types" convtypes "github.com/jcmoraisjr/haproxy-ingress/pkg/converters/types" @@ -149,6 +150,88 @@ func TestBind(t *testing.T) { } } +func TestCloseSessions(t *testing.T) { + testCases := []struct { + annDuration string + annStop string + expDuration time.Duration + untrack bool + logging string + }{ + // 0 + {}, + // 1 + { + annDuration: "5m", + annStop: "10m", + untrack: true, + logging: `WARN ignoring close-sessions-duration config: tracking old instances is disabled`, + }, + // 2 + { + annDuration: "10m", + logging: `WARN ignoring close-sessions-duration config: timeout-stop need to be configured`, + }, + // 3 + { + annDuration: "10m", + annStop: "10%", + logging: `WARN ignoring close-sessions-duration due to invalid timeout-stop config: time: unknown unit "%" in duration "10%"`, + }, + // 4 + { + annDuration: "1%", + annStop: "10m", + logging: `WARN ignoring '1%' for close-sessions-duration value: value should be between 5% and 95%`, + }, + // 5 + { + annDuration: "99%", + annStop: "10m", + logging: `WARN ignoring '99%' for close-sessions-duration value: value should be between 5% and 95%`, + }, + // 6 + { + annDuration: "10x", + annStop: "10m", + logging: `WARN ignoring invalid close-sessions-duration config: time: unknown unit "x" in duration "10x"`, + }, + // 7 + { + annDuration: "10m", + annStop: "10m", + logging: `WARN ignoring invalid close-sessions-duration config: close-sessions-duration should be lower than timeout-stop`, + }, + // 8 + { + annDuration: "5m", + annStop: "10m", + expDuration: 5 * time.Minute, + }, + // 9 + { + annDuration: "5%", + annStop: "10m", + expDuration: 30 * time.Second, + }, + } + for i, test := range testCases { + c := setup(t) + d := c.createGlobalData(map[string]string{ + ingtypes.GlobalCloseSessionsDuration: test.annDuration, + ingtypes.GlobalTimeoutStop: test.annStop, + }) + u := c.createUpdater() + if !test.untrack { + u.options.TrackInstances = true + } + u.buildGlobalCloseSessions(d) + c.compareObjects("close sessions duration", i, d.global.CloseSessionsDuration, test.expDuration) + c.logger.CompareLogging(test.logging) + c.teardown() + } +} + func TestCustomConfigProxy(t *testing.T) { testCases := []struct { config string diff --git a/pkg/converters/ingress/annotations/updater.go b/pkg/converters/ingress/annotations/updater.go index 92e7b3506..69f38fea5 100644 --- a/pkg/converters/ingress/annotations/updater.go +++ b/pkg/converters/ingress/annotations/updater.go @@ -142,8 +142,7 @@ func (c *updater) UpdateGlobalConfig(haproxyConfig haproxy.Config, mapper *Mappe global: haproxyConfig.Global(), mapper: mapper, } - // TODO Move all magic strings to a single place - d.global.AdminSocket = "/var/run/haproxy/admin.sock" + d.global.AdminSocket = c.options.AdminSocket d.global.MaxConn = mapper.Get(ingtypes.GlobalMaxConnections).Int() d.global.DefaultBackendRedir = mapper.Get(ingtypes.GlobalDefaultBackendRedirect).String() d.global.DefaultBackendRedirCode = mapper.Get(ingtypes.GlobalDefaultBackendRedirectCode).Int() @@ -164,6 +163,7 @@ func (c *updater) UpdateGlobalConfig(haproxyConfig haproxy.Config, mapper *Mappe c.buildGlobalAcme(d) c.buildGlobalAuthProxy(d) c.buildGlobalBind(d) + c.buildGlobalCloseSessions(d) c.buildGlobalCustomConfig(d) c.buildGlobalDNS(d) c.buildGlobalDynamic(d) diff --git a/pkg/converters/ingress/types/global.go b/pkg/converters/ingress/types/global.go index c35255950..78fb159da 100644 --- a/pkg/converters/ingress/types/global.go +++ b/pkg/converters/ingress/types/global.go @@ -33,6 +33,7 @@ const ( GlobalBindIPAddrPrometheus = "bind-ip-addr-prometheus" GlobalBindIPAddrStats = "bind-ip-addr-stats" GlobalBindIPAddrTCP = "bind-ip-addr-tcp" + GlobalCloseSessionsDuration = "close-sessions-duration" GlobalConfigDefaults = "config-defaults" GlobalConfigFrontend = "config-frontend" GlobalConfigGlobal = "config-global" diff --git a/pkg/converters/types/options.go b/pkg/converters/types/options.go index b8c680a67..11cffe6ca 100644 --- a/pkg/converters/types/options.go +++ b/pkg/converters/types/options.go @@ -27,6 +27,7 @@ type ConverterOptions struct { Tracker Tracker DynamicConfig *DynamicConfig MasterSocket string + AdminSocket string DefaultConfig func() map[string]string DefaultBackend string DefaultCrtSecret string @@ -35,6 +36,7 @@ type ConverterOptions struct { AnnotationPrefix []string DisableKeywords []string AcmeTrackTLSAnn bool + TrackInstances bool HasGateway bool } diff --git a/pkg/haproxy/connections.go b/pkg/haproxy/connections.go new file mode 100644 index 000000000..bda1379c3 --- /dev/null +++ b/pkg/haproxy/connections.go @@ -0,0 +1,150 @@ +/* +Copyright 2021 The HAProxy Ingress Controller Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package haproxy + +import ( + "strings" + "sync" + "time" + + "github.com/jcmoraisjr/haproxy-ingress/pkg/haproxy/socket" +) + +func newConnections(masterSock, adminSock string) *connections { + return &connections{ + mutex: sync.Mutex{}, + masterSock: masterSock, + adminSock: adminSock, + } +} + +type connections struct { + mutex sync.Mutex + masterSock string + adminSock string + oldInstances []socket.HAProxySocket + master socket.HAProxySocket + dynUpdate socket.HAProxySocket + idleChk socket.HAProxySocket +} + +func (c *connections) TrackCurrentInstance(timeoutStopDur, closeSessDur time.Duration) { + c.mutex.Lock() + defer c.mutex.Unlock() + c.shrinkConns() + sock := socket.NewSocketConcurrent(c.adminSock, true) + sock.Unlistening() + c.oldInstances = append(c.oldInstances, sock) + + if closeSessDur > 0 && closeSessDur < timeoutStopDur { + // schedule shutdown sessions + time.AfterFunc(timeoutStopDur-closeSessDur, func() { + // All the shuwdowns run synchronously and exits after all the + // remaining sessions have been shutdown, or in the case of an error. + // When it finishes we can safely close the connection + shutdownSessionsSync(sock, closeSessDur) + sock.Close() + }) + } else { + // This connection can be used by other jobs, and this schedule is + // responsible for closing it if closeSessDur wasn't configured. + time.AfterFunc(timeoutStopDur, func() { sock.Close() }) + } +} + +func (c *connections) ReleaseLastInstance() { + c.mutex.Lock() + defer c.mutex.Unlock() + l := len(c.oldInstances) + if l > 0 { + c.oldInstances[l-1].Close() + c.oldInstances = c.oldInstances[:l-1] + } +} + +func (c *connections) OldInstancesCount() int { + return len(c.oldInstances) +} + +func (c *connections) shrinkConns() { + i := 0 + for j, old := range c.oldInstances { + if i < j { + c.oldInstances[i] = old + } + if old.HasConn() { + i++ + } + } + c.oldInstances = c.oldInstances[:i] +} + +func shutdownSessionsSync(sock socket.HAProxySocket, duration time.Duration) { + sess, err := sock.Send(nil, "show sess") + if err != nil { + return + } + // sess output: + // + // 0x7f9440810000: proto=unix_stream src=... + // 0x7f943f87f200: proto=unix_stream src=... + // 0x7f9442808200: proto=unix_stream src=... + // ... + var sessionList []string + for _, s := range strings.Split(sess[0], "\n") { + i := strings.Index(s, ":") + if i > 0 { + sessionList = append(sessionList, s[:i]) + } + } + interval := duration / time.Duration((len(sessionList) + 1)) + for _, s := range sessionList { + _, err := sock.Send(nil, "shutdown session "+s) + if err != nil { + // maybe the connection or the instance is gone, + // haproxy takes care of the remaining sessions if any + return + } + // we can enqueue shutdowns and sleeps, this is a dedicated go routine + // TODO interval should be the duration between two shutdown starts, + // but it's currently between the end of the former and the start of the next. + time.Sleep(interval) + } +} + +func (c *connections) Master() socket.HAProxySocket { + if c.master == nil { + c.master = socket.NewSocket(c.masterSock, false) + } + return c.master +} + +func (c *connections) DynUpdate() socket.HAProxySocket { + if c.dynUpdate == nil { + // using a non persistent connection (keep alive false) + // to ensure that the current instance will be used + c.dynUpdate = socket.NewSocket(c.adminSock, false) + } + return c.dynUpdate +} + +func (c *connections) IdleChk() socket.HAProxySocket { + if c.idleChk == nil { + c.idleChk = socket.NewSocket(c.adminSock, false) + } + return c.idleChk +} diff --git a/pkg/haproxy/dynupdate.go b/pkg/haproxy/dynupdate.go index 1cb097559..837d47c0d 100644 --- a/pkg/haproxy/dynupdate.go +++ b/pkg/haproxy/dynupdate.go @@ -53,11 +53,11 @@ type epPair struct { cur *hatypes.Endpoint } -func (i *instance) newDynUpdater(socket socket.HAProxySocket) *dynUpdater { +func (i *instance) newDynUpdater() *dynUpdater { return &dynUpdater{ logger: i.logger, config: i.config.(*config), - socket: socket, + socket: i.conns.DynUpdate(), metrics: i.metrics, } } diff --git a/pkg/haproxy/dynupdate_test.go b/pkg/haproxy/dynupdate_test.go index 668093ca2..4e42d6960 100644 --- a/pkg/haproxy/dynupdate_test.go +++ b/pkg/haproxy/dynupdate_test.go @@ -1024,7 +1024,8 @@ INFO-V(2) need to reload due to config changes: [hosts] clientMock := &clientMock{ cmdOutput: test.cmdOutput, } - dynUpdater := c.instance.newDynUpdater(clientMock) + dynUpdater := c.instance.newDynUpdater() + dynUpdater.socket = clientMock dynamic := dynUpdater.update() var actual []string for _, ep := range c.config.Backends().AcquireBackend("default", "app", "8080").Endpoints { diff --git a/pkg/haproxy/instance.go b/pkg/haproxy/instance.go index e0f28422c..d428adb54 100644 --- a/pkg/haproxy/instance.go +++ b/pkg/haproxy/instance.go @@ -42,12 +42,15 @@ type InstanceOptions struct { HAProxyCfgDir string HAProxyMapsDir string LeaderElector types.LeaderElector + MasterSocket string + AdminSocket string MaxOldConfigFiles int Metrics types.Metrics ReloadQueue utils.Queue ReloadStrategy string SortEndpointsBy string StopCh chan struct{} + TrackInstances bool ValidateConfig bool // TODO Fake is used to skip real haproxy calls. Use a mock instead. fake bool @@ -71,6 +74,7 @@ func CreateInstance(logger types.Logger, options InstanceOptions) Instance { haproxyTmpl: template.CreateConfig(), mapsTmpl: template.CreateConfig(), modsecTmpl: template.CreateConfig(), + conns: newConnections(options.MasterSocket, options.AdminSocket), metrics: options.Metrics, } } @@ -84,9 +88,7 @@ type instance struct { mapsTmpl *template.Config modsecTmpl *template.Config config Config - adminSock socket.HAProxySocket - masterSock socket.HAProxySocket - idleChkSock socket.HAProxySocket + conns *connections metrics types.Metrics } @@ -191,10 +193,7 @@ func (i *instance) CalcIdleMetric() { if !i.up { return } - if i.idleChkSock == nil { - i.idleChkSock = socket.NewSocket(i.config.Global().AdminSocket, false) - } - msg, err := i.idleChkSock.Send(i.metrics.HAProxyShowInfoResponseTime, "show info") + msg, err := i.conns.IdleChk().Send(i.metrics.HAProxyShowInfoResponseTime, "show info") if err != nil { i.logger.Error("error reading admin socket: %v", err) return @@ -272,10 +271,7 @@ func (i *instance) haproxyUpdate(timer *utils.Timer) { // TODO update tests and remove `if !fake` above i.logChanged() } - if i.adminSock == nil { - i.adminSock = socket.NewSocket(i.config.Global().AdminSocket, false) - } - updater := i.newDynUpdater(i.adminSock) + updater := i.newDynUpdater() updated := updater.update() if i.options.SortEndpointsBy != "random" { i.config.Backends().SortChangedEndpoints(i.options.SortEndpointsBy) @@ -331,36 +327,33 @@ func (i *instance) haproxyUpdate(timer *utils.Timer) { func (i *instance) Reload(timer *utils.Timer) { i.metrics.IncUpdateFull() + if i.options.TrackInstances { + timeoutStopDur := i.config.Global().TimeoutStopDuration + closeSessDur := i.config.Global().CloseSessionsDuration + i.conns.TrackCurrentInstance(timeoutStopDur, closeSessDur) + } err := i.reloadHAProxy() timer.Tick("reload_haproxy") if err != nil { i.logger.Error("error reloading server:\n%v", err) i.updateSuccessful(false) + if i.options.TrackInstances { + i.conns.ReleaseLastInstance() + } return } i.up = true - i.releaseSockets() i.updateSuccessful(true) + message := "haproxy successfully reloaded" if i.config.Global().External.IsExternal() { - i.logger.Info("haproxy successfully reloaded (external)") + message += " (external)" } else { - i.logger.Info("haproxy successfully reloaded (embedded)") + message += " (embedded)" } -} - -func (i *instance) releaseSockets() { - if i.idleChkSock != nil { - i.idleChkSock.Close() - i.idleChkSock = nil - } - if i.adminSock != nil { - i.adminSock.Close() - i.adminSock = nil - } - if i.masterSock != nil { - i.masterSock.Close() - i.masterSock = nil + if i.options.TrackInstances { + message += "; tracked instance(s): " + strconv.Itoa(i.conns.OldInstancesCount()) } + i.logger.Info(message) } func (i *instance) logChanged() { @@ -529,9 +522,7 @@ func (i *instance) reloadEmbedded() error { } func (i *instance) reloadExternal() error { - if i.masterSock == nil { - i.masterSock = socket.NewSocket(i.config.Global().External.MasterSocket, false) - } + masterSock := i.conns.Master() if !i.up { // first run, wait until the external haproxy is running // and successfully listening to the master socket. @@ -539,12 +530,12 @@ func (i *instance) reloadExternal() error { i.logger.Info("waiting for the external haproxy...") for { var err error - if _, err = i.masterSock.Send(nil, "show proc"); err == nil { + if _, err = masterSock.Send(nil, "show proc"); err == nil { break } j++ if j%10 == 0 { - i.logger.Info("cannot connect to the master socket '%s': %v", i.masterSock.Address(), err) + i.logger.Info("cannot connect to the master socket '%s': %v", masterSock.Address(), err) } select { case <-i.options.StopCh: @@ -553,10 +544,10 @@ func (i *instance) reloadExternal() error { } } } - if _, err := i.masterSock.Send(nil, "reload"); err != nil { + if _, err := masterSock.Send(nil, "reload"); err != nil { return fmt.Errorf("error sending reload to master socket: %w", err) } - out, err := socket.HAProxyProcs(i.masterSock) + out, err := socket.HAProxyProcs(masterSock) if err != nil { return fmt.Errorf("error reading procs from master socket: %w", err) } diff --git a/pkg/haproxy/types/types.go b/pkg/haproxy/types/types.go index 12b4b084f..dc373beea 100644 --- a/pkg/haproxy/types/types.go +++ b/pkg/haproxy/types/types.go @@ -72,6 +72,8 @@ type Global struct { Prometheus PromConfig Security SecurityConfig Stats StatsConfig + CloseSessionsDuration time.Duration + TimeoutStopDuration time.Duration StrictHost bool UseHTX bool DefaultBackendRedir string @@ -124,6 +126,7 @@ type TimeoutConfig struct { BackendTimeoutConfig Client string ClientFin string + Stats string Stop string } diff --git a/rootfs/etc/templates/haproxy/haproxy.tmpl b/rootfs/etc/templates/haproxy/haproxy.tmpl index cf094619e..613ab7c25 100644 --- a/rootfs/etc/templates/haproxy/haproxy.tmpl +++ b/rootfs/etc/templates/haproxy/haproxy.tmpl @@ -86,6 +86,9 @@ global {{- end }} stats socket {{ default "--" $global.AdminSocket }} level admin expose-fd listeners mode 600 {{- if gt $global.Procs.Nbproc 1 }} process 1{{ end }} +{{- if $global.Timeout.Stats }} + stats timeout {{ $global.Timeout.Stats }} +{{- end }} {{- if $global.LoadServerState }} server-state-file state-global server-state-base /var/lib/haproxy/