Skip to content

Commit

Permalink
fix db pool exaust issues with docker listener (#13)
Browse files Browse the repository at this point in the history
Co-authored-by: Vasco Santos <7835679+valexsantos@users.noreply.github.com>
  • Loading branch information
valexsantos and Vasco Santos authored Apr 24, 2024
1 parent 6a31b9e commit d59a034
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 15 deletions.
63 changes: 49 additions & 14 deletions lib/mkit/docker_listener.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,32 @@
# https://docs.docker.com/engine/reference/commandline/events
require 'mkit/app/helpers/docker_helper'
module MKIt
class StopThread < RuntimeError; end

class DockerListener
include MKIt::DockerHelper

def initialize
@consumers = []
@queue = Queue.new
end

def enqueue(msg)
@queue << msg
end

def register_consumer(consumer:)
def start
@consumer.run if register_consumer
@listener.run if register_listener
end

def stop
@listener.exit if @listener
@consumer.raise StopThread.new
MKItLogger.info("docker listener stopped")
end

private

def parse_message(msg)
action = msg['Action'].to_sym
type = msg['Type'].to_sym
Expand Down Expand Up @@ -45,15 +61,17 @@ def parse_message(msg)
MKItLogger.debug(" #{type} #{action} <<TODO>>")
end
else
MKItLogger.warn("docker <<#{type}>> <#{action}> received: #{msg}. But I don't know anything about pod #{pod_id}")
MKItLogger.warn("docker <<#{type}>> <#{action}> received: #{msg}. But I don't know anything about pod #{pod_id}/#{pod_name}")
end
when :network
pod_id = msg.Actor.Attributes.container
pod = Pod.find_by(pod_id: pod_id)
inspect = inspect_instance(pod_id).to_o
pod_name = inspect.Name[1..]
pod = Pod.find_by(name: pod_name)
unless pod.nil?
case action
when :connect
MKItLogger.info("docker network #{action} received: #{msg}")
MKItLogger.info("docker network #{action} received: #{msg} for pod #{pod_name}")
pod.update_ip
pod.save
when :disconnect
Expand All @@ -62,20 +80,41 @@ def parse_message(msg)
MKItLogger.debug(" #{type} #{action} <<TODO>>")
end
else
MKItLogger.warn("docker <<#{type}>> <#{action}> received: #{msg}. But I don't know anything about pod #{pod_id}")
MKItLogger.warn("docker <<#{type}>> <#{action}> received: #{msg}. But I don't know anything about pod #{pod_id}/#{pod_name}")
end
else
MKItLogger.info("\t#{type} #{action} <<unknown>>")
end
end

def start
@thread ||= Thread.new {
def register_consumer
return false unless @consumer.nil?

@consumer = Thread.new do
running = true
while running
begin
parse_message(@queue.pop)
rescue StopThread
running = false
MKItLogger.info("docker consumer ended")
rescue => e
MKItLogger.error("error while consuming docker notification: #{e}", e.message, e.backtrace.join("\n"))
end
end
end
true
end

def register_listener
return false unless @listener.nil?

@listener = Thread.new {
cmd = "docker events --format '{{json .}}'"
begin
PTY.spawn( cmd ) do |stdout, stdin, pid|
begin
stdout.each { |line| parse_message JSON.parse(line).to_o }
stdout.each { |line| enqueue JSON.parse(line).to_o }
rescue Errno::EIO
MKItLogger.warn("Errno:EIO error, but this probably just means " +
"that the process has finished giving output")
Expand All @@ -85,12 +124,8 @@ def start
MKItLogger.warn("docker event listener process exited!")
end
}
@thread.run
MKItLogger.info("docker listener started")
end
def stop
@thread.exit if @thread
MKItLogger.info("docker listener stopped")
true
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/mkit/version.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module MKIt
VERSION = "0.6.2"
VERSION = "0.6.3"
end

0 comments on commit d59a034

Please sign in to comment.