From 766e516ad9bf383d4fbcec3a9d71a7498e70722b Mon Sep 17 00:00:00 2001 From: Patrick Taibel Date: Tue, 12 Sep 2023 10:00:48 +0200 Subject: [PATCH 1/7] Fix possible race condition during transaction with errors --- client.go | 28 ++++++++++++---------------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/client.go b/client.go index 186722d..9700069 100644 --- a/client.go +++ b/client.go @@ -182,26 +182,25 @@ func (this *Client) FlushRedisAndRespond() (err error) { } defer func() { - if this.transactionMode == transactionModeNone { - // We are not in a transaction, so we can simply recycle it - connectionPool.RecycleRemoteConnection(redisConn) - - if this.reservedRedisConn != nil { - this.reservedRedisConn = nil + if err != nil { + // Force upstream disconnect on any error + redisConn.Disconnect() + } - } + if this.transactionMode == transactionModeNone { if this.transactionDoneChannel != nil { + // If there was a transaction going on, we need to stop it, which will recycle the connection close(this.transactionDoneChannel) - this.transactionDoneChannel = nil + } else { + // We are not in a transaction, so we can simply recycle it + connectionPool.RecycleRemoteConnection(redisConn) } } else { // We are currently in a transaction if err != nil { // Reset client and server connection as we can not recover from any error states this.ReadChannel <- readItem{nil, err} - redisConn.Disconnect() close(this.transactionDoneChannel) - connectionPool.RecycleRemoteConnection(redisConn) } else if this.reservedRedisConn == nil { this.reservedRedisConn = redisConn this.transactionDoneChannel = make(chan interface{}, 1) @@ -211,8 +210,10 @@ func (this *Client) FlushRedisAndRespond() (err error) { case <-time.After(this.TransactionTimeout): this.ReadChannel <- readItem{nil, ERR_TRANSACTION_TIMEOUT} redisConn.Disconnect() - connectionPool.RecycleRemoteConnection(redisConn) } + this.transactionDoneChannel = nil + this.reservedRedisConn = nil + connectionPool.RecycleRemoteConnection(redisConn) }() } } @@ -220,8 +221,6 @@ func (this *Client) FlushRedisAndRespond() (err error) { if redisConn.DatabaseId != this.DatabaseId { if err = redisConn.SelectDatabase(this.DatabaseId); err != nil { - // Disconnect the current connection if selecting failed, will auto-reconnect this connection holder when queried later - redisConn.Disconnect() return } } @@ -235,7 +234,6 @@ func (this *Client) FlushRedisAndRespond() (err error) { _, err = redisConn.Writer.Write(command.GetBuffer()) if err != nil { log.Error("Error when writing to server: %s. Disconnecting the connection.", err) - redisConn.Disconnect() return } } @@ -244,7 +242,6 @@ func (this *Client) FlushRedisAndRespond() (err error) { err = redisConn.Writer.Flush() if err != nil { log.Error("Error when flushing to server: %s. Disconnecting the connection.", err) - redisConn.Disconnect() return } } @@ -253,7 +250,6 @@ func (this *Client) FlushRedisAndRespond() (err error) { if err = protocol.CopyServerResponses(redisConn.Reader, this.Writer, numCommands); err != nil { log.Error("Error when copying redis responses to client: %s. Disconnecting the connection.", err) - redisConn.Disconnect() this.ReadChannel <- readItem{nil, err} return } From a29d7f416ce20731bb0abc915e86c636cfcb33ef Mon Sep 17 00:00:00 2001 From: Patrick Taibel Date: Tue, 12 Sep 2023 10:02:17 +0200 Subject: [PATCH 2/7] Deny connection reuse if unexpected data is returned --- connection/connection.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/connection/connection.go b/connection/connection.go index 95cfa2c..3660525 100644 --- a/connection/connection.go +++ b/connection/connection.go @@ -239,7 +239,9 @@ func (c *Connection) IsConnected() bool { } if n != 0 { + // If we get stuff back here, the connection is most likely unusable at this point log.Warn("Got %d bytes back when we expected 0.", n) + return false } return true From ac6bea54f008ac9f596d35528d29e9dd6455879f Mon Sep 17 00:00:00 2001 From: Patrick Taibel Date: Tue, 12 Sep 2023 10:03:14 +0200 Subject: [PATCH 3/7] Fix actual errors hidden behind io.EOF error in `CopyServerResponses` --- protocol/protocol.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/protocol/protocol.go b/protocol/protocol.go index 8abd88e..9eb19b0 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -427,7 +427,7 @@ func WriteLine(line []byte, destination *writer.FlexibleWriter, flush bool) (err // Copies a server response from the remoteBuffer into your localBuffer // If a protocol or buffer error is encountered, it is bubbled up -func CopyServerResponses(reader *bufio.Reader, localBuffer *writer.FlexibleWriter, numResponses int) (err error) { +func CopyServerResponses(reader *bufio.Reader, localBuffer *writer.FlexibleWriter, numResponses int) error { //start := time.Now() //defer func() { // graphite.Timing("copy_server_responses", time.Now().Sub(start)) @@ -443,16 +443,12 @@ func CopyServerResponses(reader *bufio.Reader, localBuffer *writer.FlexibleWrite numRead++ } - if numRead < numResponses { - return io.EOF - } - if sErr := scanner.Err(); sErr != nil { return sErr } - if err != nil { - return err + if numRead < numResponses { + return io.EOF } return nil From e32f648d9c2f011adfb5af43682859cb85ba86e0 Mon Sep 17 00:00:00 2001 From: Patrick Taibel Date: Tue, 12 Sep 2023 10:05:23 +0200 Subject: [PATCH 4/7] Add logging of transaction timeouts --- client.go | 1 + 1 file changed, 1 insertion(+) diff --git a/client.go b/client.go index 9700069..44d8fe8 100644 --- a/client.go +++ b/client.go @@ -208,6 +208,7 @@ func (this *Client) FlushRedisAndRespond() (err error) { select { case <-this.transactionDoneChannel: case <-time.After(this.TransactionTimeout): + log.Error("Transaction timed out. Disconnecting the connection.") this.ReadChannel <- readItem{nil, ERR_TRANSACTION_TIMEOUT} redisConn.Disconnect() } From f3f612fbdfe99179079375f581f211320d3e2592 Mon Sep 17 00:00:00 2001 From: Patrick Taibel Date: Tue, 12 Sep 2023 16:58:43 +0200 Subject: [PATCH 5/7] Add automatic connection reconnect with configurable interval --- connection/connection.go | 26 ++++++++++++++++---------- connection/connection_pool.go | 8 +++++++- connection/connection_pool_test.go | 4 ++-- connection/connection_test.go | 16 ++++++++-------- doc/config.md | 4 +++- main/config.go | 1 + main/main.go | 16 ++++++++++++---- server.go | 7 +++++-- 8 files changed, 54 insertions(+), 28 deletions(-) diff --git a/connection/connection.go b/connection/connection.go index 3660525..84fc0e3 100644 --- a/connection/connection.go +++ b/connection/connection.go @@ -49,19 +49,21 @@ type Connection struct { // The writer to the redis server Writer *writer.FlexibleWriter - protocol string - endpoint string - authUser string - authPassword string - connectTimeout time.Duration - readTimeout time.Duration - writeTimeout time.Duration + protocol string + endpoint string + authUser string + authPassword string + connectTimeout time.Duration + readTimeout time.Duration + writeTimeout time.Duration + reconnectInterval time.Duration + nextReconnect time.Time } // Initializes a new connection, of the given protocol and endpoint, with the given connection timeout // ex: "unix", "/tmp/myAwesomeSocket", 50*time.Millisecond func NewConnection(Protocol, Endpoint string, ConnectTimeout, ReadTimeout, WriteTimeout time.Duration, - authUser string, authPassword string) *Connection { + reconnectInterval time.Duration, authUser string, authPassword string) *Connection { c := &Connection{} c.protocol = Protocol c.endpoint = Endpoint @@ -70,13 +72,14 @@ func NewConnection(Protocol, Endpoint string, ConnectTimeout, ReadTimeout, Write c.connectTimeout = ConnectTimeout c.readTimeout = ReadTimeout c.writeTimeout = WriteTimeout + c.reconnectInterval = reconnectInterval return c } func (c *Connection) Disconnect() { if c.connection != nil { c.connection.Close() - log.Info("Disconnected a connection") + log.Debug("Disconnected a connection") graphite.Increment("disconnect") } c.connection = nil @@ -86,7 +89,7 @@ func (c *Connection) Disconnect() { } func (c *Connection) ReconnectIfNecessary() (err error) { - if c.IsConnected() { + if c.IsConnected() && time.Now().Before(c.nextReconnect) { return nil } @@ -109,6 +112,9 @@ func (c *Connection) ReconnectIfNecessary() (err error) { return err } + c.nextReconnect = time.Now().Add(c.reconnectInterval) + log.Debug("Connected a connection") + return nil } diff --git a/connection/connection_pool.go b/connection/connection_pool.go index 536b9d0..4083f1d 100644 --- a/connection/connection_pool.go +++ b/connection/connection_pool.go @@ -41,6 +41,8 @@ const ( EXTERN_READ_TIMEOUT = time.Millisecond * 500 //Default write timeout, for connection pools. Can be adjusted on individual pools after initialization EXTERN_WRITE_TIMEOUT = time.Millisecond * 500 + // Default reconnect interval, for connection pools. Can be adjusted on individual pools after initialization + EXTERN_RECONNECT_INTERVAL = time.Hour * 24 ) // A pool of connections to a single outbound redis server @@ -59,6 +61,8 @@ type ConnectionPool struct { ReadTimeout time.Duration //An overridable write timeout. Defaults to EXTERN_WRITE_TIMEOUT WriteTimeout time.Duration + //An overridable reconnection interval. Defaults to EXTERN_RECONNECT_INTERVAL + ReconnectInterval time.Duration //channel of recycled connections, for re-use connectionPool chan *Connection // The connection used for diagnostics (like checking that the pool is up) @@ -74,7 +78,7 @@ type ConnectionPool struct { // Initialize a new connection pool, for the given protocol/endpoint, with a given pool capacity // ex: "unix", "/tmp/myAwesomeSocket", 5 func NewConnectionPool(Protocol, Endpoint string, poolCapacity int, connectTimeout time.Duration, - readTimeout time.Duration, writeTimeout time.Duration, authUser string, + readTimeout time.Duration, writeTimeout time.Duration, reconnectInterval time.Duration, authUser string, authPassword string) (newConnectionPool *ConnectionPool) { newConnectionPool = &ConnectionPool{} newConnectionPool.Protocol = Protocol @@ -85,6 +89,7 @@ func NewConnectionPool(Protocol, Endpoint string, poolCapacity int, connectTimeo newConnectionPool.ConnectTimeout = connectTimeout newConnectionPool.ReadTimeout = readTimeout newConnectionPool.WriteTimeout = writeTimeout + newConnectionPool.ReconnectInterval = reconnectInterval newConnectionPool.Count = 0 // Fill the pool with as many handlers as it asks for @@ -124,6 +129,7 @@ func (cp *ConnectionPool) CreateConnection() *Connection { cp.ConnectTimeout, cp.ReadTimeout, cp.WriteTimeout, + cp.ReconnectInterval, cp.AuthUser, cp.AuthPassword, ) diff --git a/connection/connection_pool_test.go b/connection/connection_pool_test.go index 61a3abd..7fa03ca 100644 --- a/connection/connection_pool_test.go +++ b/connection/connection_pool_test.go @@ -46,7 +46,7 @@ func TestRecycleConnection(test *testing.T) { //Setting the channel at size 2 makes this more interesting timeout := 500 * time.Millisecond - connectionPool := NewConnectionPool("unix", testSocket, 2, timeout, timeout, timeout, "", "") + connectionPool := NewConnectionPool("unix", testSocket, 2, timeout, timeout, timeout, time.Hour, "", "") connection, err := connectionPool.GetConnection() if err != nil { @@ -111,7 +111,7 @@ func TestCheckConnectionState(test *testing.T) { // Create the pool, have a size of zero so that no connections are made except for diagnostics timeout := 10 * time.Millisecond - connectionPool := NewConnectionPool("unix", testSocket, 0, timeout, timeout, timeout, "", "") + connectionPool := NewConnectionPool("unix", testSocket, 0, timeout, timeout, timeout, time.Hour, "", "") // get and release which will actually create the connection connectionPool.getDiagnosticConnection() diff --git a/connection/connection_test.go b/connection/connection_test.go index f25853d..a52ad09 100644 --- a/connection/connection_test.go +++ b/connection/connection_test.go @@ -42,7 +42,7 @@ func verifySelectDatabaseSuccess(test *testing.T, database int) { test.Fatal("Failed to listen on test socket ", testSocket) } defer listenSock.Close() - testConnection := NewConnection("unix", testSocket, 10*time.Millisecond, 10*time.Millisecond, 10*time.Millisecond, "", "") + testConnection := NewConnection("unix", testSocket, 10*time.Millisecond, 10*time.Millisecond, 10*time.Millisecond, time.Hour, "", "") testConnection.ReconnectIfNecessary() //read buffer does't matter @@ -77,7 +77,7 @@ func verifySelectDatabaseError(test *testing.T, database int) { defer func() { listenSock.Close() }() - testConnection := NewConnection("unix", testSocket, 10*time.Millisecond, 10*time.Millisecond, 10*time.Millisecond, "", "") + testConnection := NewConnection("unix", testSocket, 10*time.Millisecond, 10*time.Millisecond, 10*time.Millisecond, time.Hour, "", "") testConnection.ReconnectIfNecessary() //read buffer does't matter readBuf := bufio.NewReader(bytes.NewBufferString("+NOPE\r\n")) @@ -106,7 +106,7 @@ func verifySelectDatabaseTimeout(test *testing.T, database int) { } defer listenSock.Close() - testConnection := NewConnection("unix", testSocket, 10*time.Millisecond, 10*time.Millisecond, 10*time.Millisecond, "", "") + testConnection := NewConnection("unix", testSocket, 10*time.Millisecond, 10*time.Millisecond, 10*time.Millisecond, time.Hour, "", "") if err := testConnection.ReconnectIfNecessary(); err != nil { test.Fatalf("Could not connect to testSocket %s: %s", testSocket, err) } @@ -149,13 +149,13 @@ func TestNewUnixConnection(test *testing.T) { } defer listenSock.Close() - connection := NewConnection("unix", testSocket, 10*time.Millisecond, 10*time.Millisecond, 10*time.Millisecond, "", "") + connection := NewConnection("unix", testSocket, 10*time.Millisecond, 10*time.Millisecond, 10*time.Millisecond, time.Hour, "", "") connection.ReconnectIfNecessary() if connection == nil || connection.connection == nil { test.Fatal("Connection initialization returned nil, binding to unix endpoint failed") } - connection = NewConnection("unix", "/tmp/thisdoesnotexist", 10*time.Millisecond, 10*time.Millisecond, 10*time.Millisecond, "", "") + connection = NewConnection("unix", "/tmp/thisdoesnotexist", 10*time.Millisecond, 10*time.Millisecond, 10*time.Millisecond, time.Hour, "", "") connection.ReconnectIfNecessary() if connection != nil && connection.connection != nil { test.Fatal("Connection initialization success, binding to fake unix endpoint succeeded????") @@ -170,14 +170,14 @@ func TestNewTcpConnection(test *testing.T) { } defer listenSock.Close() - connection := NewConnection("tcp", testEndpoint, 10*time.Millisecond, 10*time.Millisecond, 10*time.Millisecond, "", "") + connection := NewConnection("tcp", testEndpoint, 10*time.Millisecond, 10*time.Millisecond, 10*time.Millisecond, time.Hour, "", "") connection.ReconnectIfNecessary() if connection == nil || connection.connection == nil { test.Fatal("Connection initialization returned nil, binding to tcp endpoint failed") } //reserved sock should have nothing on it - connection = NewConnection("tcp", "localhost:49151", 10*time.Millisecond, 10*time.Millisecond, 10*time.Millisecond, "", "") + connection = NewConnection("tcp", "localhost:49151", 10*time.Millisecond, 10*time.Millisecond, 10*time.Millisecond, time.Hour, "", "") connection.ReconnectIfNecessary() if connection != nil && connection.connection != nil { test.Fatal("Connection initialization success, binding to fake tcp endpoint succeeded????") @@ -194,7 +194,7 @@ func TestCheckConnection(test *testing.T) { listenSock.Close() }() - connection := NewConnection("unix", testSocket, 100*time.Millisecond, 100*time.Millisecond, 100*time.Millisecond, "", "") + connection := NewConnection("unix", testSocket, 100*time.Millisecond, 100*time.Millisecond, 100*time.Millisecond, time.Hour, "", "") connection.ReconnectIfNecessary() if connection == nil { test.Fatal("Connection initialization returned nil, binding to unix endpoint failed") diff --git a/doc/config.md b/doc/config.md index eb2ecaa..4596177 100644 --- a/doc/config.md +++ b/doc/config.md @@ -16,6 +16,7 @@ Configuration can be handled either via command-line arguments or via config fil -remoteReadTimeout=0: Timeout to set for remote redises (read) -remoteTimeout=0: Timeout to set for remote redises (connect+read+write) -remoteWriteTimeout=0: Timeout to set for remote redises (write) + -remoteReconnectInterval=0: Interval in which connected redis connections will be forced to reconnect in minutes -socket="": The socket to listen for incoming connections on. If this is provided, host and port are ignored -tcpConnections="localhost:6380 localhost:6381": TCP connections (destination redis servers) to multiplex over -unixConnections="": Unix connections (destination redis servers) to multiplex over @@ -46,7 +47,8 @@ for the configuration json is as follows: "remoteTimeout": int, "remoteReadTimeout": int, "remoteWriteTimeout": int, - "remoteConnectTimeout": int + "remoteConnectTimeout": int, + "remoteReconnectInterval": int }, ... ] diff --git a/main/config.go b/main/config.go index 3f3ba01..cca65f3 100644 --- a/main/config.go +++ b/main/config.go @@ -47,6 +47,7 @@ type PoolConfig struct { RemoteTimeout int64 `json:"remoteTimeout"` RemoteReadTimeout int64 `json:"remoteReadTimeout"` RemoteWriteTimeout int64 `json:"remoteWriteTimeout"` + RemoteReconnectInterval int64 `json:"remoteReconnectInterval"` RemoteConnectTimeout int64 `json:"remoteConnectTimeout"` Failover bool `json:"failover"` } diff --git a/main/main.go b/main/main.go index cb8c3e4..6c56297 100644 --- a/main/main.go +++ b/main/main.go @@ -60,6 +60,7 @@ var remoteTimeout = flag.Int64("remoteTimeout", 0, "Timeout to set for remote re var remoteReadTimeout = flag.Int64("remoteReadTimeout", 0, "Timeout to set for remote redises (read)") var remoteWriteTimeout = flag.Int64("remoteWriteTimeout", 0, "Timeout to set for remote redises (write)") var remoteConnectTimeout = flag.Int64("remoteConnectTimeout", 0, "Timeout to set for remote redises (connect)") +var remoteReconnectInterval = flag.Int64("remoteReconnectInterval", 0, "Interval in which connected redis connections will be forced to reconnect in minutes") var cpuProfile = flag.String("cpuProfile", "", "Direct CPU Profile to target file") var configFile = flag.String("config", "", "Configuration file (JSON)") var doDebug = flag.Bool("debug", false, "Debug mode") @@ -148,10 +149,11 @@ func configureFromArgs() ([]PoolConfig, error) { LocalWriteTimeout: *localWriteTimeout, LocalTransactionTimeout: *localTransactionTimeout, - RemoteTimeout: *remoteTimeout, - RemoteReadTimeout: *remoteReadTimeout, - RemoteWriteTimeout: *remoteWriteTimeout, - RemoteConnectTimeout: *remoteConnectTimeout, + RemoteTimeout: *remoteTimeout, + RemoteReadTimeout: *remoteReadTimeout, + RemoteWriteTimeout: *remoteWriteTimeout, + RemoteConnectTimeout: *remoteConnectTimeout, + RemoteReconnectInterval: *remoteReconnectInterval, }} return config, nil @@ -257,6 +259,12 @@ func createInstances(configs []PoolConfig) (rmuxInstances []*rmux.RedisMultiplex log.Info("Setting remote redis write timeout to: %s", duration) } + if config.RemoteReconnectInterval != 0 { + interval := time.Duration(config.RemoteReconnectInterval) * time.Minute + rmuxInstance.EndpointReconnectInterval = interval + log.Info("Setting remote reconnect interval to: %s", interval) + } + rmuxInstance.AuthUser = config.AuthUser rmuxInstance.AuthPassword = config.AuthPassword diff --git a/server.go b/server.go index c055a05..9900b95 100644 --- a/server.go +++ b/server.go @@ -75,6 +75,8 @@ type RedisMultiplexer struct { EndpointReadTimeout time.Duration //An overridable write timeout. Defaults to EXTERN_WRITE_TIMEOUT EndpointWriteTimeout time.Duration + //An overridable reconnection interval. Defaults to EXTERN_RECONNECT_INTERVAL + EndpointReconnectInterval time.Duration //An overridable read timeout. Defaults to EXTERN_READ_TIMEOUT ClientReadTimeout time.Duration //An overridable write timeout. Defaults to EXTERN_WRITE_TIMEOUT @@ -131,6 +133,7 @@ func NewRedisMultiplexer(listenProtocol, listenEndpoint string, poolSize int) (n newRedisMultiplexer.EndpointConnectTimeout = connection.EXTERN_CONNECT_TIMEOUT newRedisMultiplexer.EndpointReadTimeout = connection.EXTERN_READ_TIMEOUT newRedisMultiplexer.EndpointWriteTimeout = connection.EXTERN_WRITE_TIMEOUT + newRedisMultiplexer.EndpointReconnectInterval = connection.EXTERN_RECONNECT_INTERVAL newRedisMultiplexer.ClientReadTimeout = connection.EXTERN_READ_TIMEOUT newRedisMultiplexer.ClientWriteTimeout = connection.EXTERN_WRITE_TIMEOUT newRedisMultiplexer.ClientTransactionTimeout = EXTERN_TRANSACTION_TIMEOUT @@ -142,8 +145,8 @@ func NewRedisMultiplexer(listenProtocol, listenEndpoint string, poolSize int) (n // Adds a connection to the redis multiplexer, for the given protocol and endpoint func (this *RedisMultiplexer) AddConnection(remoteProtocol, remoteEndpoint string) { connectionCluster := connection.NewConnectionPool(remoteProtocol, remoteEndpoint, this.PoolSize, - this.EndpointConnectTimeout, this.EndpointReadTimeout, this.EndpointWriteTimeout, this.AuthUser, - this.AuthPassword) + this.EndpointConnectTimeout, this.EndpointReadTimeout, this.EndpointWriteTimeout, this.EndpointReconnectInterval, + this.AuthUser, this.AuthPassword) this.ConnectionCluster = append(this.ConnectionCluster, connectionCluster) if len(this.ConnectionCluster) == 1 { this.PrimaryConnectionPool = connectionCluster From 11d48d9fd0a2dd1d23757124fc01038417fce9e9 Mon Sep 17 00:00:00 2001 From: Patrick Taibel Date: Tue, 12 Sep 2023 17:01:28 +0200 Subject: [PATCH 6/7] Improve log messages caused by diagnostic connection events --- connection/connection.go | 11 +++++++---- connection/connection_pool.go | 9 ++------- server.go | 7 ++++++- 3 files changed, 15 insertions(+), 12 deletions(-) diff --git a/connection/connection.go b/connection/connection.go index 84fc0e3..16e469c 100644 --- a/connection/connection.go +++ b/connection/connection.go @@ -187,6 +187,7 @@ func (this *Connection) authenticate() (err error) { // Checks if the current connection is up or not // If we do not get a response, or if we do not get a PONG reply, or if there is any error, returns false +// This is only used for diagnostic connections! func (myConnection *Connection) CheckConnection() bool { if myConnection.connection == nil { return false @@ -200,7 +201,8 @@ func (myConnection *Connection) CheckConnection() bool { startWrite := time.Now() err := protocol.WriteLine(protocol.SHORT_PING_COMMAND, myConnection.Writer, true) if err != nil { - log.Error("CheckConnection: Could not write PING Err:%s Timing:%s", err, time.Now().Sub(startWrite)) + log.Error("CheckConnection: Could not write PING on diagnostics connection. Err:%s Timing:%s", + err, time.Now().Sub(startWrite)) myConnection.Disconnect() return false } @@ -212,11 +214,12 @@ func (myConnection *Connection) CheckConnection() bool { return true } else { if err != nil { - log.Error("CheckConnection: Could not read PING. Error: %s Timing:%s", err, time.Now().Sub(startRead)) + log.Error("CheckConnection: Could not read PING on diagnostics connection. Error: %s Timing:%s", + err, time.Now().Sub(startRead)) } else if isPrefix { - log.Error("CheckConnection: ReadLine returned prefix: %q", line) + log.Error("CheckConnection: ReadLine returned prefix on diagnostics connection: %q", line) } else { - log.Error("CheckConnection: Expected PONG response. Got: %q", line) + log.Error("CheckConnection: Expected PONG response on diagnostics connection. Got: %q", line) } myConnection.Disconnect() return false diff --git a/connection/connection_pool.go b/connection/connection_pool.go index 4083f1d..b151489 100644 --- a/connection/connection_pool.go +++ b/connection/connection_pool.go @@ -139,7 +139,7 @@ func (cp *ConnectionPool) getDiagnosticConnection() (connection *Connection, err cp.diagnosticConnectionLock.Lock() if err := cp.diagnosticConnection.ReconnectIfNecessary(); err != nil { - log.Error("The diangnostic connection is down for %s:%s : %s", cp.Protocol, cp.Endpoint, err) + log.Error("The diagnostic connection is down for %s:%s : %s", cp.Protocol, cp.Endpoint, err) cp.diagnosticConnectionLock.Unlock() return nil, err } @@ -172,6 +172,7 @@ func (cp *ConnectionPool) IsConnected() bool { // Checks the state of connections in this connection pool // If a remote server has severe lag, mysteriously goes away, or stops responding all-together, returns false +// This is only used for diagnostic connections! func (cp *ConnectionPool) CheckConnectionState() (isUp bool) { isUp = true defer func() { @@ -185,12 +186,6 @@ func (cp *ConnectionPool) CheckConnectionState() (isUp bool) { } defer cp.releaseDiagnosticConnection() - //If we failed to bind, or if our PING fails, the pool is down - if connection == nil || connection.connection == nil { - isUp = false - return - } - if !connection.CheckConnection() { connection.Disconnect() isUp = false diff --git a/server.go b/server.go index 9900b95..42402b9 100644 --- a/server.go +++ b/server.go @@ -155,7 +155,7 @@ func (this *RedisMultiplexer) AddConnection(remoteProtocol, remoteEndpoint strin } } -// Counts the number of active endpoints on the server +// Counts the number of active endpoints (connection pools) on the server func (this *RedisMultiplexer) countActiveConnections() (activeConnections int) { activeConnections = 0 for _, connectionPool := range this.ConnectionCluster { @@ -163,10 +163,15 @@ func (this *RedisMultiplexer) countActiveConnections() (activeConnections int) { activeConnections++ } } + + if this.activeConnectionCount < activeConnections { + log.Info("Connected diagnostics connection.") + } return } // Checks the status of all connections, and calculates how many of them are currently up +// This only counts connection pools / diagnostic connections not real redis sessions func (this *RedisMultiplexer) maintainConnectionStates() { var m runtime.MemStats for this.active { From 5e4fd39ce7569d19e69d3d3ecad25b2ae321b5e7 Mon Sep 17 00:00:00 2001 From: Patrick Taibel Date: Tue, 12 Sep 2023 17:05:15 +0200 Subject: [PATCH 7/7] Increase default diagnostic connection check interval and make it configurable --- doc/config.md | 4 +++- main/config.go | 39 ++++++++++++++++++++------------------- main/main.go | 18 +++++++++++++----- server.go | 7 ++++++- 4 files changed, 42 insertions(+), 26 deletions(-) diff --git a/doc/config.md b/doc/config.md index 4596177..ccdd075 100644 --- a/doc/config.md +++ b/doc/config.md @@ -17,6 +17,7 @@ Configuration can be handled either via command-line arguments or via config fil -remoteTimeout=0: Timeout to set for remote redises (connect+read+write) -remoteWriteTimeout=0: Timeout to set for remote redises (write) -remoteReconnectInterval=0: Interval in which connected redis connections will be forced to reconnect in minutes + -remoteDiagnosticCheckInterval=0: Interval to check the diagnostic connection in seconds -socket="": The socket to listen for incoming connections on. If this is provided, host and port are ignored -tcpConnections="localhost:6380 localhost:6381": TCP connections (destination redis servers) to multiplex over -unixConnections="": Unix connections (destination redis servers) to multiplex over @@ -48,7 +49,8 @@ for the configuration json is as follows: "remoteReadTimeout": int, "remoteWriteTimeout": int, "remoteConnectTimeout": int, - "remoteReconnectInterval": int + "remoteReconnectInterval": int, + "remoteDiagnosticCheckInterval": int }, ... ] diff --git a/main/config.go b/main/config.go index cca65f3..85cf283 100644 --- a/main/config.go +++ b/main/config.go @@ -31,25 +31,26 @@ import ( ) type PoolConfig struct { - Host string `json:"host"` - Port int `json:"port"` - Socket string `json:"socket"` - MaxProcesses int `json:"maxProcesses"` - PoolSize int `json:"poolSize"` - TcpConnections []string `json:"tcpConnections"` - UnixConnections []string `json:"unixConnections"` - AuthUser string `json:"authUser"` - AuthPassword string `json:"authPassword"` - LocalTimeout int64 `json:"localTimeout"` - LocalReadTimeout int64 `json:"localReadTimeout"` - LocalWriteTimeout int64 `json:"localWriteTimeout"` - LocalTransactionTimeout int64 `json:"localTransactionTimeout"` - RemoteTimeout int64 `json:"remoteTimeout"` - RemoteReadTimeout int64 `json:"remoteReadTimeout"` - RemoteWriteTimeout int64 `json:"remoteWriteTimeout"` - RemoteReconnectInterval int64 `json:"remoteReconnectInterval"` - RemoteConnectTimeout int64 `json:"remoteConnectTimeout"` - Failover bool `json:"failover"` + Host string `json:"host"` + Port int `json:"port"` + Socket string `json:"socket"` + MaxProcesses int `json:"maxProcesses"` + PoolSize int `json:"poolSize"` + TcpConnections []string `json:"tcpConnections"` + UnixConnections []string `json:"unixConnections"` + AuthUser string `json:"authUser"` + AuthPassword string `json:"authPassword"` + LocalTimeout int64 `json:"localTimeout"` + LocalReadTimeout int64 `json:"localReadTimeout"` + LocalWriteTimeout int64 `json:"localWriteTimeout"` + LocalTransactionTimeout int64 `json:"localTransactionTimeout"` + RemoteTimeout int64 `json:"remoteTimeout"` + RemoteReadTimeout int64 `json:"remoteReadTimeout"` + RemoteWriteTimeout int64 `json:"remoteWriteTimeout"` + RemoteReconnectInterval int64 `json:"remoteReconnectInterval"` + RemoteDiagnosticCheckInterval int64 `json:"remoteDiagnosticCheckInterval"` + RemoteConnectTimeout int64 `json:"remoteConnectTimeout"` + Failover bool `json:"failover"` } func ReadConfigFromFile(configFile string) ([]PoolConfig, error) { diff --git a/main/main.go b/main/main.go index 6c56297..95d5493 100644 --- a/main/main.go +++ b/main/main.go @@ -61,6 +61,7 @@ var remoteReadTimeout = flag.Int64("remoteReadTimeout", 0, "Timeout to set for r var remoteWriteTimeout = flag.Int64("remoteWriteTimeout", 0, "Timeout to set for remote redises (write)") var remoteConnectTimeout = flag.Int64("remoteConnectTimeout", 0, "Timeout to set for remote redises (connect)") var remoteReconnectInterval = flag.Int64("remoteReconnectInterval", 0, "Interval in which connected redis connections will be forced to reconnect in minutes") +var remoteDiagnosticCheckInterval = flag.Int64("remoteDiagnosticCheckInterval", 0, "Interval to check the diagnostic connection in seconds") var cpuProfile = flag.String("cpuProfile", "", "Direct CPU Profile to target file") var configFile = flag.String("config", "", "Configuration file (JSON)") var doDebug = flag.Bool("debug", false, "Debug mode") @@ -149,11 +150,12 @@ func configureFromArgs() ([]PoolConfig, error) { LocalWriteTimeout: *localWriteTimeout, LocalTransactionTimeout: *localTransactionTimeout, - RemoteTimeout: *remoteTimeout, - RemoteReadTimeout: *remoteReadTimeout, - RemoteWriteTimeout: *remoteWriteTimeout, - RemoteConnectTimeout: *remoteConnectTimeout, - RemoteReconnectInterval: *remoteReconnectInterval, + RemoteTimeout: *remoteTimeout, + RemoteReadTimeout: *remoteReadTimeout, + RemoteWriteTimeout: *remoteWriteTimeout, + RemoteConnectTimeout: *remoteConnectTimeout, + RemoteReconnectInterval: *remoteReconnectInterval, + RemoteDiagnosticCheckInterval: *remoteDiagnosticCheckInterval, }} return config, nil @@ -265,6 +267,12 @@ func createInstances(configs []PoolConfig) (rmuxInstances []*rmux.RedisMultiplex log.Info("Setting remote reconnect interval to: %s", interval) } + if config.RemoteDiagnosticCheckInterval != 0 { + interval := time.Duration(config.RemoteDiagnosticCheckInterval) * time.Second + rmuxInstance.EndpointDiagnosticCheckInterval = interval + log.Info("Setting remote diagnostic check interval to: %s", interval) + } + rmuxInstance.AuthUser = config.AuthUser rmuxInstance.AuthPassword = config.AuthPassword diff --git a/server.go b/server.go index 42402b9..55bc2b9 100644 --- a/server.go +++ b/server.go @@ -48,6 +48,8 @@ var ( MULTIPLEX_OPERATION_UNSUPPORTED_RESPONSE = []byte("This command is not supported for multiplexing servers") //Response code for when a client can't connect to any target servers CONNECTION_DOWN_RESPONSE = []byte("Connection down") + //Default diagnostic check interval + EXTERN_DIAGNOSTIC_CHECK_INTERVAL = 1 * time.Second ) var version string = "dev" @@ -77,6 +79,8 @@ type RedisMultiplexer struct { EndpointWriteTimeout time.Duration //An overridable reconnection interval. Defaults to EXTERN_RECONNECT_INTERVAL EndpointReconnectInterval time.Duration + //An overridable diagnostic check interval. Defaults to EXTERN_DIAGNOSTIC_CHECK_INTERVAL + EndpointDiagnosticCheckInterval time.Duration //An overridable read timeout. Defaults to EXTERN_READ_TIMEOUT ClientReadTimeout time.Duration //An overridable write timeout. Defaults to EXTERN_WRITE_TIMEOUT @@ -134,6 +138,7 @@ func NewRedisMultiplexer(listenProtocol, listenEndpoint string, poolSize int) (n newRedisMultiplexer.EndpointReadTimeout = connection.EXTERN_READ_TIMEOUT newRedisMultiplexer.EndpointWriteTimeout = connection.EXTERN_WRITE_TIMEOUT newRedisMultiplexer.EndpointReconnectInterval = connection.EXTERN_RECONNECT_INTERVAL + newRedisMultiplexer.EndpointDiagnosticCheckInterval = EXTERN_DIAGNOSTIC_CHECK_INTERVAL newRedisMultiplexer.ClientReadTimeout = connection.EXTERN_READ_TIMEOUT newRedisMultiplexer.ClientWriteTimeout = connection.EXTERN_WRITE_TIMEOUT newRedisMultiplexer.ClientTransactionTimeout = EXTERN_TRANSACTION_TIMEOUT @@ -180,7 +185,7 @@ func (this *RedisMultiplexer) maintainConnectionStates() { runtime.ReadMemStats(&m) // // Debug("Memory profile: InUse(%d) Idle (%d) Released(%d)", m.HeapInuse, m.HeapIdle, m.HeapReleased) this.generateMultiplexInfo() - time.Sleep(100 * time.Millisecond) + time.Sleep(this.EndpointDiagnosticCheckInterval) } }