Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add reaper for unused connections #127

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 28 additions & 8 deletions lib/connection_pool.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
require_relative 'connection_pool/version'
require_relative 'connection_pool/timed_stack'
require_relative 'connection_pool/reaper'


# Generic connection pool class for sharing a limited number of objects or network connections
Expand Down Expand Up @@ -32,7 +33,7 @@
# - :timeout - amount of time to wait for a connection if none currently available, defaults to 5 seconds
#
class ConnectionPool
DEFAULTS = {size: 5, timeout: 5}
DEFAULTS = {size: 5, timeout: 5, reaping_frequency: nil, reap_after: 60}
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the choice to make reaping_frequency the trigger to turn on reaping is to make it behave like ActiveRecord.

60s is a short reap_after in my opinion though.


class Error < RuntimeError
end
Expand All @@ -52,6 +53,14 @@ def initialize(options = {}, &block)
@available = TimedStack.new(@size, &block)
@key = :"pool-#{@available.object_id}"
@key_count = :"pool-#{@available.object_id}-count"

if options[:reaping_frequency] && options[:reap_after]
@reaper = ConnectionPoolReaper.new(
connection_pool: self,
reaping_frequency: options[:reaping_frequency],
reap_after: options[:reap_after]
)
end
end

def with(options = {})
Expand All @@ -68,13 +77,17 @@ def with(options = {})
end

def checkout(options = {})
if ::Thread.current[@key]
::Thread.current[@key_count] += 1
::Thread.current[@key]
else
::Thread.current[@key_count] = 1
::Thread.current[@key] = @available.pop(options[:timeout] || @timeout)
end
connection = if ::Thread.current[@key]
::Thread.current[@key_count] += 1
::Thread.current[@key]
else
::Thread.current[@key_count] = 1
::Thread.current[@key] = @available.pop(options[:timeout] || @timeout)
end

@reaper.mark_connection_as_used(connection) if defined?(@reaper)

connection
end

def checkin
Expand All @@ -92,7 +105,14 @@ def checkin
nil
end

# Removes a connection from the pool and makes the space available again
# connection may not currently be checked out of the queue.
def remove_connection(conn)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is called during a with block, or before checkin is called the connection will end up back in the queue.

This will not close the connection

@available.remove_connection(conn)
end

def shutdown(&block)
@reaper.shutdown if defined?(@reaper)
@available.shutdown(&block)
end

Expand Down
43 changes: 43 additions & 0 deletions lib/connection_pool/reaper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
class ConnectionPoolReaper

def initialize(connection_pool:, reaping_frequency:, reap_after:)
@reaping_frequency = reaping_frequency
@reap_after = reap_after
@reaping_thread = start_reaping_thread
@connection_pool = connection_pool
@access_log = {}
@mutex = Mutex.new
end

def mark_connection_as_used(connection)
@mutex.synchronize { @access_log[connection] = Time.now }
end

def reap_connections!
@mutex.synchronize do
required_last_access = Time.now - @reap_after

@access_log.delete_if do |connection, last_access|
if last_access < required_last_access
@connection_pool.remove_connection(connection)
connection.close if connection.respond_to?(:close)

true
end
end
end
end

def shutdown
@reaping_thread.exit if @reaping_thread
end

def start_reaping_thread
Thread.new do
loop do
sleep @reaping_frequency
reap_connections!
end
end
end
end
6 changes: 6 additions & 0 deletions lib/connection_pool/timed_stack.rb
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,12 @@ def length
@max - @created + @que.length
end

def remove_connection(conn)
@mutex.synchronize do
@created -= 1 if @que.delete(conn)
end
end

private

def current_time
Expand Down
46 changes: 46 additions & 0 deletions test/test_connection_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -554,4 +554,50 @@ def test_stats_with_string_size
assert_equal(1, pool.available)
end
end

def test_removing_and_replacing_connections
created = 0
pool = ConnectionPool.new(timeout:0, size: 1) do
created += 1
NetworkConnection.new
end

conn = pool.checkout
pool.checkin

pool.remove_connection(conn)
pool.with{ }

assert_equal 2, created
end

def test_reaping
created = 0
pool = ConnectionPool.new(timeout:0, size: 1, reap_after: 1, reaping_frequency: 1) do
created += 1
NetworkConnection.new
end

pool.with{}
sleep 2
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't ideal, but I didn't want to introduce some mocking framework like timecop without feedback

pool.with{}

assert_equal 2, created

# Don't forget to cleanup
pool.shutdown { }
end

def test_reaping_tread_leakage
live_threads = Thread.list.count(:alive?)

pool = ConnectionPool.new(timeout:0, size: 1, reap_after: 1, reaping_frequency: 1) do
NetworkConnection.new
end

pool.shutdown { }

assert_equal live_threads, Thread.list.count(:alive?)
end

end