From 7d3a323c1e38b9bc7568a3551fdba97ecc41f444 Mon Sep 17 00:00:00 2001 From: Brian Malinconico Date: Fri, 1 May 2020 11:08:05 -0400 Subject: [PATCH 1/5] Adding method to remove connections --- lib/connection_pool.rb | 6 ++++++ lib/connection_pool/timed_stack.rb | 6 ++++++ test/test_connection_pool.rb | 16 ++++++++++++++++ 3 files changed, 28 insertions(+) diff --git a/lib/connection_pool.rb b/lib/connection_pool.rb index 037c076..bc97fd0 100644 --- a/lib/connection_pool.rb +++ b/lib/connection_pool.rb @@ -92,6 +92,12 @@ def checkin nil end + # Removes a connection from the pool and makes the space available again + # connection may not currently be checked out of the queue. + def remove_connection(conn) + @available.remove_connection(conn) + end + def shutdown(&block) @available.shutdown(&block) end diff --git a/lib/connection_pool/timed_stack.rb b/lib/connection_pool/timed_stack.rb index 9d3dc1b..7b437ce 100644 --- a/lib/connection_pool/timed_stack.rb +++ b/lib/connection_pool/timed_stack.rb @@ -117,6 +117,12 @@ def length @max - @created + @que.length end + def remove_connection(conn) + @mutex.synchronize do + @created -= 1 if @que.delete(conn) + end + end + private def current_time diff --git a/test/test_connection_pool.rb b/test/test_connection_pool.rb index 4aa0c0a..71763f4 100644 --- a/test/test_connection_pool.rb +++ b/test/test_connection_pool.rb @@ -554,4 +554,20 @@ def test_stats_with_string_size assert_equal(1, pool.available) end end + + def test_removing_and_replacing_connections + created = 0 + pool = ConnectionPool.new(timeout:0, size: 1) do + created += 1 + NetworkConnection.new + end + + conn = pool.checkout + pool.checkin + + pool.remove_connection(conn) + pool.with{ } + + assert_equal created, 2 + end end From 82f5c8f3f5e306a8856831ae33b4637952de4039 Mon Sep 17 00:00:00 2001 From: Brian Malinconico Date: Fri, 1 May 2020 11:27:34 -0400 Subject: [PATCH 2/5] Adding basic reaper --- lib/connection_pool.rb | 30 ++++++++++++++++------- lib/connection_pool/reaper.rb | 45 +++++++++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+), 8 deletions(-) create mode 100644 lib/connection_pool/reaper.rb diff --git a/lib/connection_pool.rb b/lib/connection_pool.rb index bc97fd0..bbe7b58 100644 --- a/lib/connection_pool.rb +++ b/lib/connection_pool.rb @@ -1,5 +1,6 @@ require_relative 'connection_pool/version' require_relative 'connection_pool/timed_stack' +require_relative 'connection_pool/reaper' # Generic connection pool class for sharing a limited number of objects or network connections @@ -32,7 +33,7 @@ # - :timeout - amount of time to wait for a connection if none currently available, defaults to 5 seconds # class ConnectionPool - DEFAULTS = {size: 5, timeout: 5} + DEFAULTS = {size: 5, timeout: 5, reaping_frequency: nil, reap_after: 60} class Error < RuntimeError end @@ -52,6 +53,14 @@ def initialize(options = {}, &block) @available = TimedStack.new(@size, &block) @key = :"pool-#{@available.object_id}" @key_count = :"pool-#{@available.object_id}-count" + + if options[:reaping_frequency] && options[:reap_after] + @reaper = ConnectionPoolReaper.new( + connection_pool: self, + reaping_frequency: options[:reaping_frequency], + reap_after: options[:reap_after] + ) + end end def with(options = {}) @@ -68,13 +77,17 @@ def with(options = {}) end def checkout(options = {}) - if ::Thread.current[@key] - ::Thread.current[@key_count] += 1 - ::Thread.current[@key] - else - ::Thread.current[@key_count] = 1 - ::Thread.current[@key] = @available.pop(options[:timeout] || @timeout) - end + connection = if ::Thread.current[@key] + ::Thread.current[@key_count] += 1 + ::Thread.current[@key] + else + ::Thread.current[@key_count] = 1 + ::Thread.current[@key] = @available.pop(options[:timeout] || @timeout) + end + + @reaper.mark_connection_as_used(connection) if defined?(@reaper) + + connection end def checkin @@ -99,6 +112,7 @@ def remove_connection(conn) end def shutdown(&block) + @reaper.shutdown if defined?(@reaper) @available.shutdown(&block) end diff --git a/lib/connection_pool/reaper.rb b/lib/connection_pool/reaper.rb new file mode 100644 index 0000000..b8e4672 --- /dev/null +++ b/lib/connection_pool/reaper.rb @@ -0,0 +1,45 @@ +class ConnectionPoolReaper + + def initialize(connection_pool:, reaping_frequency:, reap_after:) + @reaping_frequency = reaping_frequency + @reap_after = reap_after + @reaping_thread = start_reaping_thread + @connection_pool = connection_pool + @access_log = {} + @mutex = Mutex.new + + start_reaping_thread! + end + + def mark_connection_as_used(connection, used_at: Time.now) + @mutex.synchronize { @access_log[connection] = used_at } + end + + def reap_connections!(connection) + @mutex.synchronize do + required_last_access = Time.now - @reap_after + + to_remove = @access_log.delete_if do |_, last_access| + last_access < required_last_access + end + + to_remove.each_key do |connection| + @connection_pool.remove_connection(connection) + connection.close if connection.respond_to(:close) + end + end + end + + def shutdown + @reaping_thread.exit if @reaping_thread + end + + def start_reaping_thread + Thread.new do + loop do + sleep @reaping_frequency + reap_connections! + end + end + end +end From 294383df275b210e15d9f1b56138fd35adf52ed4 Mon Sep 17 00:00:00 2001 From: Brian Malinconico Date: Fri, 1 May 2020 12:06:31 -0400 Subject: [PATCH 3/5] tests --- lib/connection_pool/reaper.rb | 18 +++++++++--------- test/test_connection_pool.rb | 32 +++++++++++++++++++++++++++++++- 2 files changed, 40 insertions(+), 10 deletions(-) diff --git a/lib/connection_pool/reaper.rb b/lib/connection_pool/reaper.rb index b8e4672..c0bec3b 100644 --- a/lib/connection_pool/reaper.rb +++ b/lib/connection_pool/reaper.rb @@ -7,25 +7,25 @@ def initialize(connection_pool:, reaping_frequency:, reap_after:) @connection_pool = connection_pool @access_log = {} @mutex = Mutex.new - - start_reaping_thread! end - def mark_connection_as_used(connection, used_at: Time.now) - @mutex.synchronize { @access_log[connection] = used_at } + def mark_connection_as_used(connection) + @mutex.synchronize { @access_log[connection] = Time.now } end - def reap_connections!(connection) + def reap_connections! @mutex.synchronize do required_last_access = Time.now - @reap_after - to_remove = @access_log.delete_if do |_, last_access| - last_access < required_last_access + to_remove = [] + + @access_log.delete_if do |c, last_access| + last_access < required_last_access && to_remove << c end - to_remove.each_key do |connection| + to_remove.each do |connection| @connection_pool.remove_connection(connection) - connection.close if connection.respond_to(:close) + connection.close if connection.respond_to?(:close) end end end diff --git a/test/test_connection_pool.rb b/test/test_connection_pool.rb index 71763f4..437cc07 100644 --- a/test/test_connection_pool.rb +++ b/test/test_connection_pool.rb @@ -568,6 +568,36 @@ def test_removing_and_replacing_connections pool.remove_connection(conn) pool.with{ } - assert_equal created, 2 + assert_equal 2, created end + + def test_reaping + created = 0 + pool = ConnectionPool.new(timeout:0, size: 1, reap_after: 1, reaping_frequency: 1) do + created += 1 + NetworkConnection.new + end + + pool.with{} + sleep 2 + pool.with{} + + assert_equal 2, created + + # Don't forget to cleanup + pool.shutdown { } + end + + def test_reaping_tread_leakage + live_threads = Thread.list.count(:alive?) + + pool = ConnectionPool.new(timeout:0, size: 1, reap_after: 1, reaping_frequency: 1) do + NetworkConnection.new + end + + pool.shutdown { } + + assert_equal live_threads, Thread.list.count(:alive?) + end + end From dc9b5a1284aa7a8e062a0d9a48140e1ab3bbfd49 Mon Sep 17 00:00:00 2001 From: Brian Malinconico Date: Fri, 1 May 2020 12:48:19 -0400 Subject: [PATCH 4/5] simplifying this --- lib/connection_pool/reaper.rb | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/lib/connection_pool/reaper.rb b/lib/connection_pool/reaper.rb index c0bec3b..51789d8 100644 --- a/lib/connection_pool/reaper.rb +++ b/lib/connection_pool/reaper.rb @@ -17,15 +17,13 @@ def reap_connections! @mutex.synchronize do required_last_access = Time.now - @reap_after - to_remove = [] + @access_log.delete_if do |connection, last_access| + if last_access < required_last_access + @connection_pool.remove_connection(connection) + connection.close if connection.respond_to?(:close) - @access_log.delete_if do |c, last_access| - last_access < required_last_access && to_remove << c - end - - to_remove.each do |connection| - @connection_pool.remove_connection(connection) - connection.close if connection.respond_to?(:close) + true + end end end end From 087614c8a9854236ac8999b2b0b794368d4ee979 Mon Sep 17 00:00:00 2001 From: Brian Malinconico Date: Fri, 1 May 2020 13:02:36 -0400 Subject: [PATCH 5/5] Rebuild