Skip to content

Commit

Permalink
acl class
Browse files Browse the repository at this point in the history
  • Loading branch information
cunla committed Oct 9, 2024
1 parent 37f0d1f commit b8fc586
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 14 deletions.
55 changes: 43 additions & 12 deletions fakeredis/_acl.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import hashlib
from typing import Dict, Set, List, Tuple

from fakeredis._command_info import get_commands_by_category


class UserAccessControlList:
def __init__(self):
self._passwords: Set[str] = set()
self._passwords: Set[bytes] = set()
self._enabled: bool = True
self._nopass: bool = False
self._key_patterns: Set[bytes] = set()
Expand All @@ -30,28 +32,33 @@ def set_nopass(self) -> None:
self._passwords.clear()

def check_password(self, password: bytes) -> bool:
password_hex = hashlib.sha256(password).hexdigest()
password_hex = hashlib.sha256(password).hexdigest().encode()

Check failure

Code scanning / CodeQL

Use of a broken or weak cryptographic hashing algorithm on sensitive data High

Sensitive data (password)
is used in a hashing algorithm (SHA256) that is insecure for password hashing, since it is not a computationally expensive hash function.
return password_hex in self._passwords and self._enabled

def add_password_hex(self, password_hex: str) -> None:
def add_password_hex(self, password_hex: bytes) -> None:
self._nopass = True
self._passwords.add(password_hex)

def add_password(self, password: bytes) -> None:
password_hex = hashlib.sha256(password).hexdigest()
password_hex = hashlib.sha256(password).hexdigest().encode()

Check failure

Code scanning / CodeQL

Use of a broken or weak cryptographic hashing algorithm on sensitive data High

Sensitive data (password)
is used in a hashing algorithm (SHA256) that is insecure for password hashing, since it is not a computationally expensive hash function.
self.add_password_hex(password_hex)

def remove_password_hex(self, password_hex: str) -> None:
def remove_password_hex(self, password_hex: bytes) -> None:
self._passwords.discard(password_hex)

def remove_password(self, password: bytes) -> None:
password_hex = hashlib.sha256(password).hexdigest()
password_hex = hashlib.sha256(password).hexdigest().encode()

Check failure

Code scanning / CodeQL

Use of a broken or weak cryptographic hashing algorithm on sensitive data High

Sensitive data (password)
is used in a hashing algorithm (SHA256) that is insecure for password hashing, since it is not a computationally expensive hash function.
self.remove_password_hex(password_hex)

def add_command_or_category(self, selector: bytes) -> None:
enabled, command = selector[0] == ord("+"), selector[1:]
if command[0] == ord("@"):
self._categories[command[1:]] = enabled
category = command[1:]
self._categories[category] = enabled
category_commands = get_commands_by_category(category)
for command in category_commands:
if command in self._commands:
del self._commands[command]
else:
self._commands[command] = enabled

Expand All @@ -67,12 +74,31 @@ def reset_channels_patterns(self):
def add_channel_pattern(self, channel_pattern: bytes) -> None:
self._channel_patterns.add(channel_pattern)

def add_selector(self, selector: bytes) -> None:
command, data = selector.split(b" ", 1)
allowed, command = command[0] == ord("+"), command[1:]
data = data.split(b" ")
keys = []
channels = []
for item in data:
if item.startswith(b"&"):
channels.append(item)
continue
if item.startswith(b"%RW"):
item = item[3:]
key = item
if key.startswith(b"%"):
key = key[2:]
if key.startswith(b"~"):
keys.append(item)
self._selectors[command] = (allowed, b" ".join(keys), b" ".join(channels))

def as_rule(self) -> bytes:
results = []
results.append(b"on" if self._enabled else b"off")
if self._nopass:
results.append(b"nopass")
results.extend(b"#" + password.encode() for password in self._passwords)
results.extend(b"#" + password for password in self._passwords)
results.extend(b"~" + key_pattern for key_pattern in self._key_patterns)
if len(self._channel_patterns) == 0:
results.append(b"resetchannels")
Expand All @@ -87,8 +113,10 @@ def as_rule(self) -> bytes:
def _get_selectors(self) -> List[List[bytes]]:
results = []
for command, data in self._selectors.items():
selector = (b"+" if data[0] else b"-") + command
results.append([selector, b"keys", b"~" + data[1], b"channels", b"&" + data[2]])
selector = b"-@all " + (b"+" if data[0] else b"-") + command
keys = data[1]
channel = data[2]
results.append([b"commands", selector, b"keys", keys, b"channels", channel])
return results

def _get_commands(self) -> List[bytes]:
Expand All @@ -104,6 +132,9 @@ def _get_commands(self) -> List[bytes]:
def _get_key_patterns(self) -> List[bytes]:
return [b"~" + key_pattern for key_pattern in self._key_patterns]

def _get_channel_patterns(self):
return [b"&" + channel_pattern for channel_pattern in self._channel_patterns]

def as_array(self) -> List[bytes]:
flags = list()
flags.append(b"on" if self._enabled else b"off")
Expand All @@ -115,10 +146,10 @@ def as_array(self) -> List[bytes]:
flags.append(b"allchannels")
results = list()
results.extend([b"flags", flags])
results.extend([b"passwords", [b"#" + password.encode() for password in self._passwords]])
results.extend([b"passwords", list(self._passwords)])
results.extend([b"commands", b" ".join(self._get_commands())])
results.extend([b"keys", b" ".join(self._get_key_patterns())])
results.extend([b"channels", self._channel_patterns or [b""]])
results.extend([b"channels", b" ".join(self._get_channel_patterns())])
results.extend([b"selectors", self._get_selectors()])
return results

Expand Down
4 changes: 3 additions & 1 deletion fakeredis/commands_mixins/acl_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ def acl_setuser(self, username: bytes, *args: bytes) -> bytes:
if casematch(arg, b"resetchannels"):
user_acl.reset_channels_patterns()
continue

elif casematch(arg, b"resetkeys"):
user_acl.reset_key_patterns()
continue
Expand All @@ -96,6 +95,9 @@ def acl_setuser(self, username: bytes, *args: bytes) -> bytes:
arg = b"~*"
elif casematch(arg, b"allchannels"):
arg = b"&*"
elif arg[0] == ord("(") and arg[-1] == ord(")"):
user_acl.add_selector(arg[1:-1])
continue

prefix = arg[0]
if prefix == ord(">"):
Expand Down
18 changes: 17 additions & 1 deletion test/test_mixins/test_acl_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,4 +174,20 @@ def test_acl_getuser_setuser(r: redis.Redis):
r.acl_deluser(username)
assert acl["selectors"] == [["commands", "-@all +set", "keys", "%W~app*", "channels", ""]]

# [{"commands": "-@all +set", "keys": "%W~app*", "channels": ""}],
assert r.acl_setuser(
username,
enabled=True,
reset=True,
passwords=["+pass1", "+pass2"],
categories=["+set", "+@hash", "-geo"],
commands=["+get", "+mget", "-hset"],
keys=["cache:*", "objects:*"],
channels=["message:*"],
selectors=[("+set", "%W~app*"), ("+get", "%RW~app* &x"), ("-hset", "%W~app*")],
)
acl = r.acl_getuser(username)
assert acl["selectors"] == [
["commands", "-@all +set", "keys", "%W~app*", "channels", ""],
["commands", "-@all +get", "keys", "~app*", "channels", "&x"],
["commands", "-@all -hset", "keys", "%W~app*", "channels", ""],
]

0 comments on commit b8fc586

Please sign in to comment.