Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

METRON-1581: kill the profiler topology immediately before restarting the topology if it is in 'KILLED' state for longer time #1035

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,10 @@ def set_hbase_acls(self):
Logger.info("Setting HBase ACLs for profiler")
if self.__params.security_enabled:
metron_security.kinit(self.__params.kinit_path_local,
self.__params.hbase_keytab_path,
self.__params.hbase_principal_name,
execute_user=self.__params.hbase_user)
self.__params.hbase_keytab_path,
self.__params.hbase_principal_name,
execute_user=self.__params.hbase_user)

cmd = "echo \"grant '{0}', 'RW', '{1}'\" | hbase shell -n"
add_table_acl_cmd = cmd.format(self.__params.metron_user, self.__params.profiler_hbase_table)
Execute(add_table_acl_cmd,
Expand Down Expand Up @@ -150,6 +150,24 @@ def stop_profiler_topology(self, env):

Logger.info('Done stopping profiler topologies')

def stop_profiler_topology_immediately(self, env):
Logger.info('Stopping ' + self.__profiler_topology)

if self.is_topology_active(env) or self.is_topology_in_killed_state_for_longtime(env):
if self.__params.security_enabled:
metron_security.kinit(self.__params.kinit_path_local,
self.__params.metron_keytab_path,
self.__params.metron_principal_name,
execute_user=self.__params.metron_user)
stop_cmd = 'storm kill ' + self.__profiler_topology +' -w 0'
Execute(stop_cmd, user=self.__params.metron_user, tries=3, try_sleep=5, logoutput=True)

else:
Logger.info("Profiler topology already stopped")

Logger.info('Done stopping profiler topologies')


def restart_profiler_topology(self, env):
Logger.info('Restarting the profiler topologies')
self.stop_profiler_topology(env)
Expand All @@ -165,6 +183,9 @@ def restart_profiler_topology(self, env):
if not topology_active:
Logger.info('Waiting for storm kill to complete')
time.sleep(30)
if self.is_topology_in_killed_state_for_longtime(env):
Logger.info('kill topology immediately before restarting the profiler topologies')
self.stop_profiler_topology_immediately(env)
self.start_profiler_topology(env)
Logger.info('Done restarting the profiler topologies')
else:
Expand All @@ -180,6 +201,18 @@ def is_topology_active(self, env):
active &= is_running
return active

def is_topology_in_killed_state_for_longtime(self, env):
env.set_params(self.__params)
in_killed_for_long = True
topologies = metron_service.get_running_topologies(self.__params)
is_killed = False
if self.__profiler_topology in topologies:
is_killed = topologies[self.__profiler_topology] == 'KILLED'
in_killed_for_long &= is_killed
if in_killed_for_long:
Logger.info('Profiler topology is in KILLED state for long time.')
return in_killed_for_long

def service_check(self, env):
"""
Performs a service check for the Profiler.
Expand All @@ -206,3 +239,4 @@ def service_check(self, env):
raise Fail("Profiler topology not running")

Logger.info("Profiler service check completed successfully")