diff --git a/app/models/manageiq/providers/kubernetes/monitoring_manager.rb b/app/models/manageiq/providers/kubernetes/monitoring_manager.rb deleted file mode 100644 index 13e93db054..0000000000 --- a/app/models/manageiq/providers/kubernetes/monitoring_manager.rb +++ /dev/null @@ -1,102 +0,0 @@ -module ManageIQ::Providers - class Kubernetes::MonitoringManager < ManageIQ::Providers::MonitoringManager - require_nested :EventCatcher - - ENDPOINT_ROLE = :prometheus_alerts - - - belongs_to :parent_manager, - :foreign_key => :parent_ems_id, - :class_name => "ManageIQ::Providers::Kubernetes::ContainerManager", - :inverse_of => :monitoring_manager - - delegate :authentication_check, - :authentication_for_summary, - :authentication_status, - :authentication_status_ok, - :authentication_token, - :authentications, - :endpoints, - :zone, - :to => :parent_manager, - :allow_nil => true - - def self.hostname_required? - false - end - - def self.ems_type - @ems_type ||= "kubernetes_monitor".freeze - end - - def self.description - @description ||= "Kubernetes Monitor".freeze - end - - def self.event_monitor_class - ManageIQ::Providers::Kubernetes::MonitoringManager::EventCatcher - end - - def self.verify_credentials(options) - raw_connect(options)&.get&.key?('generationID') - rescue OpenSSL::X509::CertificateError => err - raise MiqException::MiqInvalidCredentialsError, "SSL Error: #{err.message}" - rescue Faraday::ParsingError - raise MiqException::MiqUnreachableError, 'Unexpected Response' - rescue Faraday::ClientError => err - raise MiqException::MiqUnreachableError, err.message - rescue => err - raise MiqException::MiqUnreachableError, err.message, err.backtrace - end - - def self.raw_connect(options) - require 'prometheus/alert_buffer_client' - Prometheus::AlertBufferClient::Client.new(options) - end - - def prometheus_alerts_endpoint - connection_configurations.prometheus_alerts.try(:endpoint) - end - - def verify_credentials(_auth_type = nil, _options = {}) - with_provider_connection do |conn| - conn.get.key?('generationID') - end - rescue OpenSSL::X509::CertificateError => err - raise MiqException::MiqInvalidCredentialsError, "SSL Error: #{err.message}" - rescue Faraday::ParsingError - raise MiqException::MiqUnreachableError, 'Unexpected Response' - rescue Faraday::ClientError => err - raise MiqException::MiqUnreachableError, err.message - rescue StandardError => err - raise MiqException::MiqUnreachableError, err.message, err.backtrace - end - - def connect(_options = {}) - settings = ::Settings.ems.ems_kubernetes.ems_monitoring.alerts_collection - self.class.raw_connect( - :url => "https://#{prometheus_alerts_endpoint.hostname}:#{prometheus_alerts_endpoint.port}", - :path => "/topics/alerts", - :credentials => {:token => authentication_token}, - :ssl => {:verify => verify_ssl, - :cert_store => ssl_cert_store}, - :request => {:open_timeout => settings.open_timeout.to_f_with_method, - :timeout => settings.timeout.to_f_with_method}, - :proxy => parent_manager.options ? parent_manager.options.fetch_path(:proxy_settings, :http_proxy) : nil, - ) - end - - def default_authentication_type - ENDPOINT_ROLE - end - - def ssl_cert_store - # nil === use system CA bundle - prometheus_alerts_endpoint.try(:ssl_cert_store) - end - - def verify_ssl - parent_manager.verify_ssl_mode(prometheus_alerts_endpoint) == OpenSSL::SSL::VERIFY_PEER - end - end -end diff --git a/app/models/manageiq/providers/kubernetes/monitoring_manager/event_catcher.rb b/app/models/manageiq/providers/kubernetes/monitoring_manager/event_catcher.rb deleted file mode 100644 index 7f8f4bad69..0000000000 --- a/app/models/manageiq/providers/kubernetes/monitoring_manager/event_catcher.rb +++ /dev/null @@ -1,9 +0,0 @@ -class ManageIQ::Providers::Kubernetes::MonitoringManager::EventCatcher < ManageIQ::Providers::BaseManager::EventCatcher - require_nested :Runner - require_nested :RunnerMixin - require_nested :Stream - - def self.settings_name - :event_catcher_prometheus - end -end diff --git a/app/models/manageiq/providers/kubernetes/monitoring_manager/event_catcher/runner.rb b/app/models/manageiq/providers/kubernetes/monitoring_manager/event_catcher/runner.rb deleted file mode 100644 index 3dae6ad9bd..0000000000 --- a/app/models/manageiq/providers/kubernetes/monitoring_manager/event_catcher/runner.rb +++ /dev/null @@ -1,3 +0,0 @@ -class ManageIQ::Providers::Kubernetes::MonitoringManager::EventCatcher::Runner < ManageIQ::Providers::BaseManager::EventCatcher::Runner - include ManageIQ::Providers::Kubernetes::MonitoringManager::EventCatcher::RunnerMixin -end diff --git a/app/models/manageiq/providers/kubernetes/monitoring_manager/event_catcher/runner_mixin.rb b/app/models/manageiq/providers/kubernetes/monitoring_manager/event_catcher/runner_mixin.rb deleted file mode 100644 index fef55df18c..0000000000 --- a/app/models/manageiq/providers/kubernetes/monitoring_manager/event_catcher/runner_mixin.rb +++ /dev/null @@ -1,138 +0,0 @@ -module ManageIQ::Providers::Kubernetes::MonitoringManager::EventCatcher::RunnerMixin - extend ActiveSupport::Concern - include Vmdb::Logging - - # This module is shared between: - # - Kubernetes::MonitoringManager::EventCatcher - # - Openshift::MonitoringManager::EventCatcher - - def event_monitor_handle - @event_monitor_handle ||= ManageIQ::Providers::Kubernetes::MonitoringManager::EventCatcher::Stream.new(@ems) - end - - def reset_event_monitor_handle - @event_monitor_handle = nil - end - - def stop_event_monitor - @event_monitor_handle.stop unless @event_monitor_handle.nil? - rescue => err - _log.error("Event Monitor error [#{err.message}]") - _log.error("Error details: [#{err.details}]") - _log.log_backtrace(err) - ensure - reset_event_monitor_handle - end - - def monitor_events - _log.info("[#{self.class.name}] Event Monitor started") - @target_ems_id = @ems.parent_manager.id - event_monitor_handle.start - event_monitor_running - event_monitor_handle.each_batch do |events| - @queue.enq(events) unless events.blank? - sleep_poll_normal - end - ensure - reset_event_monitor_handle - end - - def queue_event(event) - event_hash = extract_event_data(event) - if event_hash - _log.info("Queuing event [#{event_hash}]") - EmsEvent.add_queue("add", @target_ems_id, event_hash) - end - end - - def extract_event_data(event) - # EXAMPLE: - # - # { - # "annotations": { - # "description": "Node ocp-compute01.10.35.48.236.nip.io is down", - # "source": "ManageIQ", - # "url": "https://www.youtube.com/watch?v=dQw4w9WgXcQ" - # }, - # "endsAt": "0001-01-01T00:00:00Z", - # "generatorURL": "http://prometheus-4018548653-w3str:9090/graph?g0.expr=container_fs_usage_bytes%7Bcontainer_name%3D%22%22%2Cdevice%3D%22%2Fdev%2Fmapper%2Fvg0-lv_root%22%7D+%3E+4e%2B07&g0.tab=0", - # "labels": { - # "severity": "error", - # "alertname": "Node down", - # "beta_kubernetes_io_arch": "amd64", - # "beta_kubernetes_io_os": "linux", - # "device": "/dev/mapper/vg0-lv_root", - # "id": "/", - # "instance": "ocp-compute01.10.35.48.236.nip.io", - # "job": "kubernetes-nodes", - # "kubernetes_io_hostname": "ocp-compute01.10.35.48.236.nip.io", - # "region": "primary", - # "zone": "default" - # }, - # "startsAt": "2017-07-17T12:18:00.457154718Z", - # "status": "firing", - # "generationID" : "323e0863-f501-4896-b7dc-353cf863597d", # Added in stream - # "index": 1, # Added in stream - # }, - event = event.dup - - annotations = event["annotations"] - labels = event["labels"] - - event[:url] = annotations["url"] - event[:severity] = parse_severity(labels["severity"]) - # TODO(mtayer): remove after https://github.com/ManageIQ/manageiq/pull/16339 - event[:ems_ref] = incident_identifier(labels, annotations, event["startsAt"]) - event[:resolved] = event["status"] == "resolved" - { - :source => "DATAWAREHOUSE", - :timestamp => Time.zone.now, - :event_type => "datawarehouse_alert", - :message => annotations["description"], - :ems_ref => incident_identifier(labels, annotations, event["startsAt"]), - :full_data => event.to_h - }.merge( - find_target( - annotations, - labels - ) - ) - end - - def find_target(annotations, labels) - unless annotations.fetch_path("miqTarget") == "ExtManagementSystem" - # TODO: we must do the db query here unless we get Prometheus to emit ems_ref - node = ContainerNode.find_by(:ems_id => @target_ems_id, :name => labels["instance"]) - if node - return { - :container_node_name => labels["instance"], - :container_node_id => node.try(:id), - :target_type => node.try(:class).try(:name), - :target_id => node.try(:id), - } - else - _log.warn("Could not find node from labels: [#{labels}] defaulting to ems") - end - end - { - :target_type => 'ExtManagementSystem', - :target_id => @target_ems_id, - } - end - - def parse_severity(severity) - MiqAlertStatus::SEVERITY_LEVELS.find { |x| x == severity.to_s.downcase } || "error" - end - - def incident_identifier(labels, annotations, started) - # When event b resolves event a, they both have the same startAt. - # Labels are added to avoid having two incidents starting at the same time. - Digest::SHA256.hexdigest( - ( - annotations.sort.flatten + - labels.sort.flatten + - ["startsAt", started] - ).join('|') - ) - end -end diff --git a/app/models/manageiq/providers/kubernetes/monitoring_manager/event_catcher/stream.rb b/app/models/manageiq/providers/kubernetes/monitoring_manager/event_catcher/stream.rb deleted file mode 100644 index bc1572412b..0000000000 --- a/app/models/manageiq/providers/kubernetes/monitoring_manager/event_catcher/stream.rb +++ /dev/null @@ -1,82 +0,0 @@ -class ManageIQ::Providers::Kubernetes::MonitoringManager::EventCatcher::Stream - include Vmdb::Logging - - def initialize(ems) - @ems = ems - end - - def start - @collecting_events = true - end - - def stop - @collecting_events = false - end - - def each_batch - while @collecting_events - yield(fetch) - end - rescue EOFError => err - _log.info("Monitoring connection closed #{err}") - end - - def fetch - unless @current_generation - @current_generation, @current_index = last_position - end - _log.info("Fetching alerts. Generation: [#{@current_generation}/#{@current_index}]") - - # { - # "generationID":"323e0863-f501-4896-b7dc-353cf863597d", - # "messages":[ - # "index": 1, - # "timestamp": "2017-10-17T08:30:00.466775417Z", - # "data": { - # "alerts": [ - # ... - # ] - # } - # ... - # ] - # } - alert_list = @ems.connect.get(:generation_id => @current_generation, :from_index => @current_index) - alerts = [] - @current_generation = alert_list["generationID"] - return alerts if alert_list['messages'].blank? - alert_list["messages"].each do |message| - @current_index = message['index'] - message["data"]["alerts"].each do |alert| - if alert_for_miq?(alert) - alerts << process_alert!(alert, @current_generation, @current_index) - else - _log.info("Skipping alert due to missing annotation or unexpected target") - end - end - @current_index += 1 - end - _log.info("[#{alerts.size}] new alerts. New generation: [#{@current_generation}/#{@current_index}]") - _log.debug(alerts) - alerts - end - - def process_alert!(alert, generation, group_index) - alert['generationID'] = generation - alert['index'] = group_index - alert - end - - def alert_for_miq?(alert) - alert.fetch_path("annotations", "miqIgnore").to_s.downcase != "true" - end - - def last_position - last_event = @ems.parent_manager.ems_events.where(:source => "DATAWAREHOUSE").last - last_event ||= OpenStruct.new(:full_data => {}) - last_index = last_event.full_data['index'] - [ - last_event.full_data['generationID'].to_s, - last_index ? last_index + 1 : 0 - ] - end -end