Skip to content

Commit

Permalink
Refactor the AES-GCM support a bunch
Browse files Browse the repository at this point in the history
Some of this is also just long overdue cleanup.

Includes renaming `IV_*` to `iv_*` for consistency (again, this is old).
  • Loading branch information
bitprophet committed Sep 15, 2024
1 parent f2ebac5 commit 977913f
Showing 1 changed file with 75 additions and 113 deletions.
188 changes: 75 additions & 113 deletions paramiko/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,52 +244,44 @@ class Transport(threading.Thread, ClosingContextManager):
"class": algorithms.AES,
"mode": modes.CTR,
"block-size": 16,
"iv-size": 16,
"key-size": 16,
},
"aes192-ctr": {
"class": algorithms.AES,
"mode": modes.CTR,
"block-size": 16,
"iv-size": 16,
"key-size": 24,
},
"aes256-ctr": {
"class": algorithms.AES,
"mode": modes.CTR,
"block-size": 16,
"iv-size": 16,
"key-size": 32,
},
"aes128-cbc": {
"class": algorithms.AES,
"mode": modes.CBC,
"block-size": 16,
"iv-size": 16,
"key-size": 16,
},
"aes192-cbc": {
"class": algorithms.AES,
"mode": modes.CBC,
"block-size": 16,
"iv-size": 16,
"key-size": 24,
},
"aes256-cbc": {
"class": algorithms.AES,
"mode": modes.CBC,
"block-size": 16,
"iv-size": 16,
"key-size": 32,
},
"3des-cbc": {
"class": TripleDES,
"mode": modes.CBC,
"block-size": 8,
"iv-size": 8,
"key-size": 24,
},
# aead cipher
"aes128-gcm@openssh.com": {
"class": aead.AESGCM,
"block-size": 16,
Expand Down Expand Up @@ -2054,23 +2046,26 @@ def _compute_key(self, id, nbytes):
sofar += digest
return out[:nbytes]

def _get_cipher(self, name, key, iv, operation):
def _get_engine(self, name, key, iv=None, operation=None, aead=False):
if name not in self._cipher_info:
raise SSHException("Unknown client cipher " + name)
raise SSHException("Unknown cipher " + name)
info = self._cipher_info[name]
algorithm = info["class"](key)
# AEAD types (eg GCM) use their algorithm class /as/ the encryption
# engine (they expose the same encrypt/decrypt API as a CipherContext)
if aead:
return algorithm
# All others go through the Cipher class.
cipher = Cipher(
algorithm=algorithm,
# TODO: why is this getting tickled in aesgcm mode???
mode=info["mode"](iv),
backend=default_backend(),
)
if operation is self._ENCRYPT:
return cipher.encryptor()
else:
cipher = Cipher(
self._cipher_info[name]["class"](key),
self._cipher_info[name]["mode"](iv),
backend=default_backend(),
)
if operation is self._ENCRYPT:
return cipher.encryptor()
else:
return cipher.decryptor()

def _get_aead_cipher(self, name, key):
aead_cipher = self._cipher_info[name]["class"](key)
return aead_cipher
return cipher.decryptor()

def _set_forward_agent_handler(self, handler):
if handler is None:
Expand Down Expand Up @@ -2738,35 +2733,27 @@ def _parse_kex_init(self, m):
def _activate_inbound(self):
"""switch on newly negotiated encryption parameters for
inbound traffic"""
block_size = self._cipher_info[self.remote_cipher]["block-size"]
info = self._cipher_info[self.remote_cipher]
aead = info.get("is_aead", False)
block_size = info["block-size"]
key_size = info["key-size"]
# Non-AEAD/GCM type ciphers' IV size is their block size.
iv_size = info.get("iv-size", block_size)
if self.server_mode:
IV_in = self._compute_key(
"A", self._cipher_info[self.remote_cipher]["iv-size"]
)
key_in = self._compute_key(
"C", self._cipher_info[self.remote_cipher]["key-size"]
)
iv_in = self._compute_key("A", iv_size)
key_in = self._compute_key("C", key_size)
else:
IV_in = self._compute_key(
"B", self._cipher_info[self.remote_cipher]["iv-size"]
)
key_in = self._compute_key(
"D", self._cipher_info[self.remote_cipher]["key-size"]
)

is_aead = (
True
if self._cipher_info[self.remote_cipher].get("is_aead")
else False
iv_in = self._compute_key("B", iv_size)
key_in = self._compute_key("D", key_size)

engine = self._get_engine(
name=self.remote_cipher,
key=key_in,
iv=iv_in,
operation=self._DECRYPT,
aead=aead,
)

if is_aead:
engine = self._get_aead_cipher(self.remote_cipher, key_in)
else:
engine = self._get_cipher(
self.remote_cipher, key_in, IV_in, self._DECRYPT
)
etm = "etm@openssh.com" in self.remote_mac
etm = (not aead) and "etm@openssh.com" in self.remote_mac
mac_size = self._mac_info[self.remote_mac]["size"]
mac_engine = self._mac_info[self.remote_mac]["class"]
# initial mac keys are done in the hash's natural size (not the
Expand All @@ -2776,22 +2763,16 @@ def _activate_inbound(self):
else:
mac_key = self._compute_key("F", mac_engine().digest_size)

if is_aead:
self._log(DEBUG, "use aead-cipher, so set mac to None")
self.packetizer.set_inbound_cipher(
engine,
block_size,
None,
16,
bytes(),
etm=False,
aead=is_aead,
iv_in=IV_in,
)
else:
self.packetizer.set_inbound_cipher(
engine, block_size, mac_engine, mac_size, mac_key, etm=etm
)
self.packetizer.set_inbound_cipher(
block_engine=engine,
block_size=block_size,
mac_engine=None if aead else mac_engine,
mac_size=16 if aead else mac_size,
mac_key=None if aead else mac_key,
etm=etm,
aead=aead,
iv_in=iv_in if aead else None,
)

compress_in = self._compression_info[self.remote_compression][1]
if compress_in is not None and (
Expand Down Expand Up @@ -2820,35 +2801,27 @@ def _activate_outbound(self):
"Resetting outbound seqno after NEWKEYS due to strict mode",
)
self.packetizer.reset_seqno_out()
block_size = self._cipher_info[self.local_cipher]["block-size"]
info = self._cipher_info[self.local_cipher]
aead = info.get("is_aead", False)
block_size = info["block-size"]
key_size = info["key-size"]
# Non-AEAD/GCM type ciphers' IV size is their block size.
iv_size = info.get("iv-size", block_size)
if self.server_mode:
IV_out = self._compute_key(
"B", self._cipher_info[self.local_cipher]["iv-size"]
)
key_out = self._compute_key(
"D", self._cipher_info[self.local_cipher]["key-size"]
)
iv_out = self._compute_key("B", iv_size)
key_out = self._compute_key("D", key_size)
else:
IV_out = self._compute_key(
"A", self._cipher_info[self.local_cipher]["iv-size"]
)
key_out = self._compute_key(
"C", self._cipher_info[self.local_cipher]["key-size"]
)

is_aead = (
True
if self._cipher_info[self.local_cipher].get("is_aead")
else False
iv_out = self._compute_key("A", iv_size)
key_out = self._compute_key("C", key_size)

engine = self._get_engine(
name=self.local_cipher,
key=key_out,
iv=iv_out,
operation=self._ENCRYPT,
aead=aead,
)

if is_aead:
engine = self._get_aead_cipher(self.local_cipher, key_out)
else:
engine = self._get_cipher(
self.local_cipher, key_out, IV_out, self._ENCRYPT
)
etm = "etm@openssh.com" in self.local_mac
etm = (not aead) and "etm@openssh.com" in self.local_mac
mac_size = self._mac_info[self.local_mac]["size"]
mac_engine = self._mac_info[self.local_mac]["class"]
# initial mac keys are done in the hash's natural size (not the
Expand All @@ -2859,28 +2832,17 @@ def _activate_outbound(self):
mac_key = self._compute_key("E", mac_engine().digest_size)
sdctr = self.local_cipher.endswith("-ctr")

if is_aead:
self.packetizer.set_outbound_cipher(
engine,
block_size,
None,
16,
bytes(),
sdctr,
etm=False,
aead=is_aead,
iv_out=IV_out,
)
else:
self.packetizer.set_outbound_cipher(
engine,
block_size,
mac_engine,
mac_size,
mac_key,
sdctr,
etm=etm,
)
self.packetizer.set_outbound_cipher(
block_engine=engine,
block_size=block_size,
mac_engine=None if aead else mac_engine,
mac_size=16 if aead else mac_size,
mac_key=None if aead else mac_key,
sdctr=sdctr,
etm=etm,
aead=aead,
iv_out=iv_out if aead else None,
)

compress_out = self._compression_info[self.local_compression][0]
if compress_out is not None and (
Expand Down

0 comments on commit 977913f

Please sign in to comment.