Skip to content

Commit

Permalink
Add prometheus capture context
Browse files Browse the repository at this point in the history
  • Loading branch information
yaacov committed Jul 25, 2017
1 parent 5ad91c9 commit d447a05
Show file tree
Hide file tree
Showing 12 changed files with 3,686 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ def log_severity
end

require_nested :HawkularCaptureContext
require_nested :PrometheusCaptureContext

INTERVAL = 20.seconds

Expand Down Expand Up @@ -57,7 +58,11 @@ def perf_collect_metrics(interval_name, start_time = nil, end_time = nil)
"[#{start_time}] [#{end_time}]")

begin
context = HawkularCaptureContext.new(target, start_time, end_time, INTERVAL)
context = if ems && ems.connection_configurations.prometheus.try(:endpoint)
PrometheusCaptureContext.new(target, start_time, end_time, INTERVAL)
else
HawkularCaptureContext.new(target, start_time, end_time, INTERVAL)
end
rescue TargetValidationError, TargetValidationWarning => e
_log.send(e.log_severity, "[#{target_name}] #{e.message}")
ems.try(:update_attributes,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
class ManageIQ::Providers::Kubernetes::ContainerManager::MetricsCapture
  # Capture context that collects node, container and pod-group metrics from a
  # Prometheus endpoint using PromQL range queries.  It mirrors the interface
  # of HawkularCaptureContext (the caller picks one of the two based on the
  # presence of a "prometheus" endpoint on the provider).
  #
  # Relies on state supplied by the included mixins:
  #   - CaptureContextMixin provides @target, @starts, @ends, @interval and the
  #     process_*_rate / process_*_data / compute_summation helpers
  #     (NOTE(review): inferred from usage below — confirm against the mixin).
  #   - PrometheusClientMixin provides prometheus_client (a Faraday connection).
  class PrometheusCaptureContext
    include ManageIQ::Providers::Kubernetes::ContainerManager::MetricsCapture::PrometheusClientMixin
    include CaptureContextMixin

    # Collect CPU, memory and network metrics for a container node.
    # Assumes @target.name matches the Prometheus "instance" label for the
    # node's cadvisor series — TODO confirm for all deployment topologies.
    def collect_node_metrics
      # prometheus field is in sec, multiply by 1e9, sec to ns
      cpu_resid = "sum(container_cpu_usage_seconds_total{container_name=\"\",id=\"/\",instance=\"#{@target.name}\",job=\"kubernetes-nodes\"}) * 1e9"
      process_cpu_counters_rate(fetch_counters_rate(cpu_resid))

      # prometheus field is in bytes
      mem_resid = "sum(container_memory_usage_bytes{container_name=\"\",id=\"/\",instance=\"#{@target.name}\",job=\"kubernetes-nodes\"})"
      process_mem_gauges_data(fetch_counters_data(mem_resid))

      # prometheus field is in bytes; only physical "eth*" interfaces are
      # summed, virtual interfaces (veth*, lo, ...) are excluded by the regex
      net_resid_rx = "sum(container_network_receive_bytes_total{container_name=\"\",id=\"/\",instance=\"#{@target.name}\",job=\"kubernetes-nodes\",interface=~\"eth.*\"})"
      net_resid_tx = "sum(container_network_transmit_bytes_total{container_name=\"\",id=\"/\",instance=\"#{@target.name}\",job=\"kubernetes-nodes\",interface=~\"eth.*\"})"

      net_counters = [fetch_counters_rate(net_resid_tx),
                      fetch_counters_rate(net_resid_rx)]

      # rx and tx are combined into a single net usage rate
      process_net_counters_rate(compute_summation(net_counters))
    end

    # Collect CPU and memory metrics for a single container.
    # No network metrics: per-container network data is only reported on the
    # pod's infrastructure ("POD") container, handled in collect_group_metrics.
    def collect_container_metrics
      # FIXME: container_name => @target.name is a unique id ?
      cpu_resid = "sum(container_cpu_usage_seconds_total{container_name=\"#{@target.name}\",job=\"kubernetes-nodes\"}) * 1e9"
      process_cpu_counters_rate(fetch_counters_rate(cpu_resid))

      mem_resid = "sum(container_memory_usage_bytes{container_name=\"#{@target.name}\",job=\"kubernetes-nodes\"})"
      process_mem_gauges_data(fetch_counters_data(mem_resid))
    end

    # Collect metrics for a pod (container group): CPU and memory are summed
    # across the pod's containers, network comes from the pod's "POD"
    # infrastructure container.
    def collect_group_metrics
      cpu_counters = @target.containers.collect do |c|
        cpu_resid = "sum(container_cpu_usage_seconds_total{container_name=\"#{c.name}\",job=\"kubernetes-nodes\"}) * 1e9"
        fetch_counters_rate(cpu_resid)
      end
      process_cpu_counters_rate(compute_summation(cpu_counters))

      mem_gauges = @target.containers.collect do |c|
        mem_resid = "sum(container_memory_usage_bytes{container_name=\"#{c.name}\",job=\"kubernetes-nodes\"})"
        fetch_counters_data(mem_resid)
      end
      process_mem_gauges_data(compute_summation(mem_gauges))

      # the "POD" container holds the pod's network namespace, so its
      # counters represent the whole pod's traffic
      net_resid_rx = "sum(container_network_receive_bytes_total{container_name=\"POD\",pod_name=\"#{@target.name}\",job=\"kubernetes-nodes\",interface=~\"eth.*\"})"
      net_resid_tx = "sum(container_network_transmit_bytes_total{container_name=\"POD\",pod_name=\"#{@target.name}\",job=\"kubernetes-nodes\",interface=~\"eth.*\"})"

      net_counters = [fetch_counters_rate(net_resid_tx),
                      fetch_counters_rate(net_resid_rx)]
      process_net_counters_rate(compute_summation(net_counters))
    end

    # Run a PromQL range query for +resource+ and return the normalized
    # samples (see sort_and_normalize).  @starts/@ends are in milliseconds
    # (divided by 1_000 to get Prometheus' epoch seconds); the window is
    # widened by one @interval at the start so rate computation has a
    # preceding sample.  When @ends is nil the query runs up to "now".
    # Raises CollectionFailure on any transport or parsing error.
    def fetch_counters_data(resource)
      start_sec = (@starts / 1_000) - @interval
      end_sec = @ends ? (@ends / 1_000).to_i : Time.now.utc.to_i

      sort_and_normalize(
        prometheus_client.get(
          "query_range",
          :query => resource,
          :start => start_sec.to_i,
          :end   => end_sec,
          :step  => "#{@interval}s"
        )
      )
    rescue StandardError => e
      raise CollectionFailure, "#{e.class.name}: #{e.message}"
    end

    # Parse a Prometheus query_range HTTP response and convert the first
    # result series into an array of {"start" => ms, "avg" => float} hashes.
    # Raises CollectionFailure when Prometheus reports an error or when the
    # response carries no data.
    def sort_and_normalize(response)
      response = JSON.parse(response.body)

      if response["status"] == "error"
        raise CollectionFailure, "[#{@target} #{@target.name}] " + response["error"]
      end

      # only the first series is used; the sum(...) queries above are
      # expected to return exactly one series
      unless response["data"] && response["data"]["result"] && response["data"]["result"][0]
        raise CollectionFailure, "[#{@target} #{@target.name}] No data in response"
      end

      response["data"]["result"][0]["values"].map do |x|
        # prometheus gives the time of last reading:
        # divide and multiply to convert time to start of interval window
        start_sec = (x[0] / @interval).to_i * @interval

        {
          "start" => start_sec.to_i.in_milliseconds,
          "avg"   => x[1].to_f
        }
      end
    end
  end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Thin standalone wrapper exposing the Prometheus connection helpers from
# PrometheusClientMixin for a given provider.  The mixin reads
# @ext_management_system (endpoint, credentials, SSL options); @tenant is
# stored for parity with the Hawkular client but is not read by the mixin
# shown here.
class ManageIQ::Providers::Kubernetes::ContainerManager::MetricsCapture::PrometheusClient
  include ManageIQ::Providers::Kubernetes::ContainerManager::MetricsCapture::PrometheusClientMixin

  # ext_management_system - the provider whose Prometheus endpoint is queried
  # tenant                - metrics tenant name, defaults to '_system'
  def initialize(ext_management_system, tenant = '_system')
    @ext_management_system, @tenant = ext_management_system, tenant
  end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Connection helpers for talking to an OpenShift/Kubernetes Prometheus
# endpoint over HTTPS with a bearer token.  Including classes must provide
# @ext_management_system (responds to #endpoints, #authentication_token,
# #verify_ssl_mode, #ssl_cert_store).
module ManageIQ::Providers::Kubernetes::ContainerManager::MetricsCapture::PrometheusClientMixin
  require 'faraday'

  # Build a Faraday connection for the provider's Prometheus endpoint.
  # The uri/credentials/options lookups are memoized, but a fresh Faraday
  # connection object is returned on each call.
  def prometheus_client
    @prometheus_uri ||= prometheus_uri
    @prometheus_credentials ||= prometheus_credentials
    @prometheus_options ||= prometheus_options

    prometheus_client_new(@prometheus_uri, @prometheus_credentials, @prometheus_options)
  end

  # Construct the Faraday connection itself.
  # credentials - {:token => bearer token string}
  # options     - {:http_proxy_uri, :verify_ssl, :ssl_cert_store}
  def prometheus_client_new(uri, credentials, options)
    Faraday.new(
      :url   => uri.to_s,
      # http_proxy_uri is a String (see prometheus_options); blank means no proxy
      :proxy => options[:http_proxy_uri].empty? ? nil : options[:http_proxy_uri],
      :ssl   => {
        :verify     => options[:verify_ssl] != OpenSSL::SSL::VERIFY_NONE,
        :cert_store => options[:ssl_cert_store]
      },
      :request => {
        :open_timeout => 2, # opening a connection
        :timeout      => 5  # waiting for response
      },
      :headers => {
        :Authorization => "Bearer " + credentials[:token]
      }
    )
  end

  # The provider endpoint with role "prometheus"; nil if not configured.
  def prometheus_endpoint
    @ext_management_system.endpoints.find_by(:role => "prometheus")
  end

  # Base URI for the Prometheus HTTP API, e.g. https://host:443/api/v1/
  def prometheus_uri
    URI::HTTPS.build(
      :host => prometheus_endpoint.hostname,
      :port => prometheus_endpoint.port,
      :path => "/api/v1/"
    )
  end

  def prometheus_credentials
    {:token => @ext_management_system.authentication_token("prometheus")}
  end

  def prometheus_options
    {
      :http_proxy_uri => VMDB::Util.http_proxy_uri.to_s,
      :verify_ssl     => @ext_management_system.verify_ssl_mode(prometheus_endpoint),
      :ssl_cert_store => @ext_management_system.ssl_cert_store(prometheus_endpoint),
    }
  end

  # Probe the endpoint with a trivial instant query and report reachability.
  # Fixes two defects in the original probe:
  #   - Faraday#get returns a Faraday::Response, never a Hash, so the old
  #     `.kind_of?(Hash)` check was always false.
  #   - /api/v1/query without a `query` parameter is rejected by Prometheus,
  #     so a `query` expression is supplied.
  def prometheus_try_connect
    prometheus_client.get("query", :query => "ALL").success?
  end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
describe ManageIQ::Providers::Kubernetes::ContainerManager::MetricsCapture::PrometheusCaptureContext do
  # NOTE(review): this ivar is set on the example *group*, while the before
  # hook below reads/writes @node on each example *instance* — so the
  # `@node.nil?` guard is true for every example and the refresh cassette is
  # replayed each time.  Works, but the guard is effectively a no-op; confirm
  # intent before relying on it as a cache.
  @node = nil

  before(:each) do
    allow(MiqServer).to receive(:my_zone).and_return("default")
    hostname = 'capture.context.com'
    token = 'theToken'

    # OpenShift provider with both a default API endpoint and a prometheus
    # metrics endpoint, so PrometheusCaptureContext is selectable.
    @ems = FactoryGirl.create(
      :ems_openshift,
      :name => 'OpenShiftProvider',
      :connection_configurations => [{:endpoint       => {:role       => :default,
                                                          :hostname   => hostname,
                                                          :port       => "8443",
                                                          :verify_ssl => false},
                                      :authentication => {:role     => :bearer,
                                                          :auth_key => token,
                                                          :userid   => "_"}},
                                     {:endpoint       => {:role       => :prometheus,
                                                          :hostname   => hostname,
                                                          :port       => "443",
                                                          :verify_ssl => false},
                                      :authentication => {:role     => :prometheus,
                                                          :auth_key => token,
                                                          :userid   => "_"}}]
    )

    if @node.nil?
      # Replay a recorded inventory refresh so container_nodes / groups /
      # containers exist as capture targets.
      VCR.use_cassette("#{described_class.name.underscore}_refresh",
                       :match_requests_on => [:path,]) do # , :record => :new_episodes) do
        EmsRefresh.refresh(@ems)
        @node = @ems.container_nodes.first
        pod = @ems.container_groups.first
        container = @ems.containers.first

        @targets = [['node', @node], ['pod', pod], ['container', container]]
      end
    end
  end

  it "will read prometheus metrics" do
    # 3h 5m window recorded in the cassettes
    start_time = Time.parse("2017-07-12 06:40:42 UTC").utc
    end_time = Time.parse("2017-07-12 09:45:42 UTC").utc
    interval = 20

    @targets.each do |target_name, target|
      VCR.use_cassette("#{described_class.name.underscore}_#{target_name}_metrics") do # , :record => :new_episodes) do
        context = ManageIQ::Providers::Kubernetes::ContainerManager::MetricsCapture::PrometheusCaptureContext.new(
          target, start_time, end_time, interval
        )

        data = context.collect_metrics

        expect(data).to be_a_kind_of(Array)
      end
    end
  end

  it "will read only specific timespan prometheus metrics" do
    # 5m window at 20s steps => at most ~16 samples; asserting < 18 checks
    # that collection honours the requested timespan
    start_time = Time.parse("2017-07-12 06:40:42 UTC").utc
    end_time = Time.parse("2017-07-12 06:45:42 UTC").utc
    interval = 20

    @targets.each do |target_name, target|
      VCR.use_cassette("#{described_class.name.underscore}_#{target_name}_timespan") do # , :record => :new_episodes) do
        context = ManageIQ::Providers::Kubernetes::ContainerManager::MetricsCapture::PrometheusCaptureContext.new(
          target, start_time, end_time, interval
        )

        data = context.collect_metrics

        expect(data.count).to be < 18
      end
    end
  end
end

Large diffs are not rendered by default.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Large diffs are not rendered by default.

Loading

0 comments on commit d447a05

Please sign in to comment.