Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3006.x] Ensure channels are closed on connection errors #65957

Merged
merged 2 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions salt/channel/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -594,14 +594,22 @@ def _verify_master_signature(self, payload):
def _decode_payload(self, payload):
# we need to decrypt it
log.trace("Decoding payload: %s", payload)
reauth = False
if payload["enc"] == "aes":
self._verify_master_signature(payload)
try:
payload["load"] = self.auth.crypticle.loads(payload["load"])
except salt.crypt.AuthenticationError:
yield self.auth.authenticate()
payload["load"] = self.auth.crypticle.loads(payload["load"])

reauth = True
if reauth:
try:
yield self.auth.authenticate()
payload["load"] = self.auth.crypticle.loads(payload["load"])
except salt.crypt.AuthenticationError:
log.error(
"Payload decryption failed even after re-authenticating with master %s",
self.opts["master_ip"],
s0undt3ch marked this conversation as resolved.
Show resolved Hide resolved
)
raise salt.ext.tornado.gen.Return(payload)

def __enter__(self):
Expand Down
16 changes: 7 additions & 9 deletions salt/minion.py
Original file line number Diff line number Diff line change
Expand Up @@ -1152,17 +1152,20 @@ def _connect_minion(self, minion):
self.minions.append(minion)
break
except SaltClientError as exc:
minion.destroy()
failed = True
log.error(
"Error while bringing up minion for multi-master. Is "
"master at %s responding?",
"master at %s responding? The error message was %s",
minion.opts["master"],
exc,
)
last = time.time()
if auth_wait < self.max_auth_wait:
auth_wait += self.auth_wait
yield salt.ext.tornado.gen.sleep(auth_wait) # TODO: log?
except SaltMasterUnresolvableError:
minion.destroy()
err = (
"Master address: '{}' could not be resolved. Invalid or"
" unresolveable address. Set 'master' value in minion config.".format(
Expand All @@ -1172,6 +1175,7 @@ def _connect_minion(self, minion):
log.error(err)
break
except Exception as e: # pylint: disable=broad-except
minion.destroy()
failed = True
log.critical(
"Unexpected error while connecting to %s",
Expand Down Expand Up @@ -3281,20 +3285,14 @@ def destroy(self):
"""
Tear down the minion
"""
if self._running is False:
return
s0undt3ch marked this conversation as resolved.
Show resolved Hide resolved

self._running = False
if hasattr(self, "schedule"):
del self.schedule
if hasattr(self, "pub_channel") and self.pub_channel is not None:
self.pub_channel.on_recv(None)
if hasattr(self.pub_channel, "close"):
self.pub_channel.close()
del self.pub_channel
if hasattr(self, "req_channel") and self.req_channel:
self.pub_channel.close()
s0undt3ch marked this conversation as resolved.
Show resolved Hide resolved
if hasattr(self, "req_channel") and self.req_channel is not None:
self.req_channel.close()
self.req_channel = None
s0undt3ch marked this conversation as resolved.
Show resolved Hide resolved
if hasattr(self, "periodic_callbacks"):
for cb in self.periodic_callbacks.values():
cb.stop()
Expand Down
7 changes: 6 additions & 1 deletion salt/transport/zeromq.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,12 @@ def on_recv(self, callback):
:param func callback: A function which should be called when data is received
"""
return self.stream.on_recv(callback)
try:
return self.stream.on_recv(callback)
except OSError as exc:
if callback is None and str(exc) == "Stream is closed":
return
raise

@salt.ext.tornado.gen.coroutine
def send(self, msg):
Expand Down
77 changes: 77 additions & 0 deletions tests/pytests/unit/test_minion.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,31 @@
log = logging.getLogger(__name__)


@pytest.fixture
def connect_master_mock():
class ConnectMasterMock:
"""
Mock connect master call.

The first call will raise an exception stored on the exc attribute.
Subsequent calls will return True.
"""

def __init__(self):
self.calls = 0
self.exc = Exception

@salt.ext.tornado.gen.coroutine
def __call__(self, *args, **kwargs):
self.calls += 1
if self.calls == 1:
raise self.exc()
else:
return True

return ConnectMasterMock()


def test_minion_load_grains_false(minion_opts):
"""
Minion does not generate grains when load_grains is False
Expand Down Expand Up @@ -1119,3 +1144,55 @@ def test_load_args_and_kwargs(minion_opts):
_args = [{"max_sleep": 40, "__kwarg__": True}]
with pytest.raises(salt.exceptions.SaltInvocationError):
ret = salt.minion.load_args_and_kwargs(test_mod.rand_sleep, _args)


async def test_connect_master_salt_client_error(minion_opts, connect_master_mock):
"""
Ensure minion's destory method is called on an salt client error while connecting to master.
"""
minion_opts["acceptance_wait_time"] = 0
mm = salt.minion.MinionManager(minion_opts)
minion = salt.minion.Minion(minion_opts)

connect_master_mock.exc = SaltClientError
minion.connect_master = connect_master_mock
minion.destroy = MagicMock()
await mm._connect_minion(minion)
minion.destroy.assert_called_once()

# The first call raised an error which caused minion.destroy to get called,
# the second call is a success.
assert minion.connect_master.calls == 2


async def test_connect_master_unresolveable_error(minion_opts, connect_master_mock):
"""
Ensure minion's destory method is called on an unresolvable while connecting to master.
"""
mm = salt.minion.MinionManager(minion_opts)
minion = salt.minion.Minion(minion_opts)
connect_master_mock.exc = SaltMasterUnresolvableError
minion.connect_master = connect_master_mock
minion.destroy = MagicMock()
mm._connect_minion(minion)
minion.destroy.assert_called_once()

# Unresolvable errors break out of the loop.
assert minion.connect_master.calls == 1


async def test_connect_master_general_exception_error(minion_opts, connect_master_mock):
"""
Ensure minion's destory method is called on an un-handled exception while connecting to master.
"""
mm = salt.minion.MinionManager(minion_opts)
minion = salt.minion.Minion(minion_opts)
connect_master_mock.exc = Exception
minion.connect_master = connect_master_mock
minion.destroy = MagicMock()
mm._connect_minion(minion)
minion.destroy.assert_called_once()

# The first call raised an error which caused minion.destroy to get called,
# the second call is a success.
assert minion.connect_master.calls == 2
Loading