Skip to content

Commit

Permalink
Fix security_rolling_upgrade_test failures
Browse files Browse the repository at this point in the history
  • Loading branch information
rondagostino committed Feb 26, 2021
1 parent 6e6b32b commit e58e5c9
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 4 deletions.
8 changes: 8 additions & 0 deletions tests/kafkatest/services/kafka/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,14 @@ def security_config(self):
serves_intercontroller_sasl_mechanism=serves_intercontroller_sasl_mechanism,
uses_controller_sasl_mechanism=uses_controller_sasl_mechanism,
raft_tls=raft_tls)
# Ensure we have the right inter-broker security protocol because it may have been mutated
# since we cached our security config (ignore if this is a remote raft controller quorum case; the
# inter-broker security protocol is not used there).
if (self.quorum_info.using_zk or self.quorum_info.has_brokers) and \
self._security_config.interbroker_security_protocol != self.interbroker_security_protocol:
self._security_config.interbroker_security_protocol = self.interbroker_security_protocol
self._security_config.calc_has_sasl()
self._security_config.calc_has_ssl()
for port in self.port_mappings.values():
if port.open:
self._security_config.enable_security_protocol(port.security_protocol)
Expand Down
20 changes: 16 additions & 4 deletions tests/kafkatest/services/security/security_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,6 @@ def __init__(self, context, security_protocol=None, interbroker_security_protoco
uses_raft_sasl += [uses_controller_sasl_mechanism]
self.uses_raft_sasl = set(uses_raft_sasl)

self.has_sasl = self.is_sasl(security_protocol) or self.is_sasl(interbroker_security_protocol) or zk_sasl \
or self.serves_raft_sasl or self.uses_raft_sasl
self.has_ssl = self.is_ssl(security_protocol) or self.is_ssl(interbroker_security_protocol) or zk_tls \
or raft_tls
self.zk_sasl = zk_sasl
self.zk_tls = zk_tls
self.static_jaas_conf = static_jaas_conf
Expand All @@ -210,13 +206,29 @@ def __init__(self, context, security_protocol=None, interbroker_security_protoco
'sasl.mechanism.inter.broker.protocol' : interbroker_sasl_mechanism,
'sasl.kerberos.service.name' : 'kafka'
}
self.raft_tls = raft_tls

if tls_version is not None:
self.properties.update({'tls.version' : tls_version})

self.properties.update(self.listener_security_config.client_listener_overrides)
self.jaas_override_variables = jaas_override_variables or {}

self.calc_has_sasl()
self.calc_has_ssl()

def calc_has_sasl(self):
self.has_sasl = self.is_sasl(self.properties['security.protocol']) \
or self.is_sasl(self.interbroker_security_protocol) \
or self.zk_sasl \
or self.serves_raft_sasl or self.uses_raft_sasl

def calc_has_ssl(self):
self.has_ssl = self.is_ssl(self.properties['security.protocol']) \
or self.is_ssl(self.interbroker_security_protocol) \
or self.zk_tls \
or self.raft_tls

def client_config(self, template_props="", node=None, jaas_override_variables=None,
use_inter_broker_mechanism_for_client = False):
# If node is not specified, use static jaas config which will be created later.
Expand Down

0 comments on commit e58e5c9

Please sign in to comment.