Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added "ACL LOG" command new in Redis 6 RC2 #1307

Closed
wants to merge 12 commits into from
61 changes: 61 additions & 0 deletions redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,43 @@ def parse_acl_getuser(response, **options):
return data


def parse_acl_log(response, **options):
if response is None:
return None
if isinstance(response, list):
data = []
for log in response:
log_data = pairs_to_dict(log, True, True)
client_info = log_data.get('client-info', '')
log_data["client-info"] = parse_client_info(client_info)

# float() is lossy comparing to the "double" in C
log_data["age-seconds"] = float(log_data["age-seconds"])
data.append(log_data)
else:
data = bool_ok(response)
return data


def parse_client_info(value):
"""
Parsing client-info in ACL Log in following format.
"key1=value1 key2=value2 key3=value3"
"""
client_info = {}
infos = value.split(" ")
for info in infos:
key, value = info.split("=")
client_info[key] = value

# Those fields are definded as int in networking.c
for int_key in {"id", "age", "idle", "db", "sub", "psub",
"multi", "qbuf", "qbuf-free", "obl",
"oll", "omem"}:
client_info[int_key] = int(client_info[int_key])
return client_info


def parse_module_result(response):
if isinstance(response, ModuleError):
raise response
Expand Down Expand Up @@ -585,6 +622,7 @@ class Redis(object):
'ACL GETUSER': parse_acl_getuser,
'ACL LIST': lambda r: list(map(nativestr, r)),
'ACL LOAD': bool_ok,
'ACL LOG': parse_acl_log,
'ACL SAVE': bool_ok,
'ACL SETUSER': bool_ok,
'ACL USERS': lambda r: list(map(nativestr, r)),
Expand Down Expand Up @@ -966,6 +1004,29 @@ def acl_list(self):
"Return a list of all ACLs on the server"
return self.execute_command('ACL LIST')

def acl_log(self, count=None):
"""
Get ACL logs as a list.
:param int count: Get logs[0:count].
:rtype: List.
"""
args = []
if count is not None:
if not isinstance(count, int):
raise DataError('ACL LOG count must be an '
'integer')
args.append(count)

return self.execute_command('ACL LOG', *args)

def acl_log_reset(self):
"""
Reset ACL logs.
:rtype: Boolean.
"""
args = [b'RESET']
return self.execute_command('ACL LOG', *args)

def acl_load(self):
"""
Load ACL rules from the configured ``aclfile``.
Expand Down
35 changes: 35 additions & 0 deletions tests/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,41 @@ def teardown():
users = r.acl_list()
assert 'user %s off -@all' % username in users

@skip_if_server_version_lt(REDIS_6_VERSION)
def test_acl_log(self, r, request):
username = 'redis-py-user'

def teardown():
r.acl_deluser(username)

request.addfinalizer(teardown)
r.acl_setuser(username, enabled=True, reset=True,
commands=['+get', '+set', '+select'],
keys=['cache:*'], nopass=True)
r.acl_log_reset()

r_test = redis.Redis(host='master', port=6379, db=9,
username=username)

# Valid operation and key
r_test.set('cache:0', 1)
r_test.get('cache:0')

# Invalid key
with pytest.raises(exceptions.NoPermissionError):
r_test.get('violated_cache:0')

# Invalid operation
with pytest.raises(exceptions.NoPermissionError):
r_test.hset('cache:0', 'hkey', 'hval')

assert isinstance(r.acl_log(), list)
assert len(r.acl_log()) == 2
assert len(r.acl_log(count=1)) == 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to assert that acl_log() returns a dict and that a common set of keys exist. Additionally we should check that the 'click-info' value is a dict as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added few return type assertion now.

assert isinstance(r.acl_log()[0], dict)
assert 'client-info' in r.acl_log(count=1)[0]
assert r.acl_log_reset()

@skip_if_server_version_lt(REDIS_6_VERSION)
def test_acl_setuser_categories_without_prefix_fails(self, r, request):
username = 'redis-py-user'
Expand Down