diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/profiler_commands.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/profiler_commands.py index b026a300de..95ed0135d4 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/profiler_commands.py +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/profiler_commands.py @@ -95,10 +95,10 @@ def set_hbase_acls(self): Logger.info("Setting HBase ACLs for profiler") if self.__params.security_enabled: metron_security.kinit(self.__params.kinit_path_local, - self.__params.hbase_keytab_path, - self.__params.hbase_principal_name, - execute_user=self.__params.hbase_user) - + self.__params.hbase_keytab_path, + self.__params.hbase_principal_name, + execute_user=self.__params.hbase_user) + cmd = "echo \"grant '{0}', 'RW', '{1}'\" | hbase shell -n" add_table_acl_cmd = cmd.format(self.__params.metron_user, self.__params.profiler_hbase_table) Execute(add_table_acl_cmd, @@ -150,6 +150,24 @@ def stop_profiler_topology(self, env): Logger.info('Done stopping profiler topologies') + def stop_profiler_topology_immediately(self, env): + Logger.info('Stopping ' + self.__profiler_topology) + + if self.is_topology_active(env) or self.is_topology_in_killed_state_for_longtime(env): + if self.__params.security_enabled: + metron_security.kinit(self.__params.kinit_path_local, + self.__params.metron_keytab_path, + self.__params.metron_principal_name, + execute_user=self.__params.metron_user) + stop_cmd = 'storm kill ' + self.__profiler_topology +' -w 0' + Execute(stop_cmd, user=self.__params.metron_user, tries=3, try_sleep=5, logoutput=True) + + else: + Logger.info("Profiler topology already stopped") + + Logger.info('Done stopping profiler topologies') + + def restart_profiler_topology(self, env): Logger.info('Restarting the profiler topologies') self.stop_profiler_topology(env) @@ -165,6 +183,9 @@ def restart_profiler_topology(self, env): if not topology_active: Logger.info('Waiting for storm kill to complete') time.sleep(30) + if self.is_topology_in_killed_state_for_longtime(env): + Logger.info('kill topology immediately before restarting the profiler topologies') + self.stop_profiler_topology_immediately(env) self.start_profiler_topology(env) Logger.info('Done restarting the profiler topologies') else: @@ -180,6 +201,18 @@ def is_topology_active(self, env): active &= is_running return active + def is_topology_in_killed_state_for_longtime(self, env): + env.set_params(self.__params) + in_killed_for_long = True + topologies = metron_service.get_running_topologies(self.__params) + is_killed = False + if self.__profiler_topology in topologies: + is_killed = topologies[self.__profiler_topology] == 'KILLED' + in_killed_for_long &= is_killed + if in_killed_for_long: + Logger.info('Profiler topology is in KILLED state for long time.') + return in_killed_for_long + def service_check(self, env): """ Performs a service check for the Profiler. @@ -206,3 +239,4 @@ def service_check(self, env): raise Fail("Profiler topology not running") Logger.info("Profiler service check completed successfully") +