Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added "ACL LOG" command new in Redis 6 RC2 #1307

Closed
wants to merge 12 commits into from
61 changes: 61 additions & 0 deletions redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,43 @@ def parse_acl_getuser(response, **options):
return data


def parse_acl_log(response, **options):
if response is None:
return None
if isinstance(response, list):
data = []
for log in response:
log_data = pairs_to_dict(log, True, True)
client_info = log_data.get('client-info', '')
log_data["client-info"] = parse_client_info(client_info)

# float() is lossy comparing to the "double" in C
log_data["age-seconds"] = float(log_data["age-seconds"])
data.append(log_data)
else:
data = bool_ok(response)
return data


def parse_client_info(value):
"""
Parsing client-info in ACL Log in following format.
"key1=value1 key2=value2 key3=value3"
"""
client_info = {}
infos = value.split(" ")
for info in infos:
key, value = info.split("=")
client_info[key] = value

# Those fields are definded as int in networking.c
for int_key in {"id", "age", "idle", "db", "sub", "psub",
"multi", "qbuf", "qbuf-free", "obl",
"oll", "omem"}:
client_info[int_key] = int(client_info[int_key])
return client_info


class Redis(object):
"""
Implementation of the Redis protocol.
Expand Down Expand Up @@ -578,6 +615,7 @@ class Redis(object):
'ACL GETUSER': parse_acl_getuser,
'ACL LIST': lambda r: list(map(nativestr, r)),
'ACL LOAD': bool_ok,
'ACL LOG': parse_acl_log,
'ACL SAVE': bool_ok,
'ACL SETUSER': bool_ok,
'ACL USERS': lambda r: list(map(nativestr, r)),
Expand Down Expand Up @@ -955,6 +993,29 @@ def acl_list(self):
"Return a list of all ACLs on the server"
return self.execute_command('ACL LIST')

def acl_log(self, count=None):
"""
Get ACL logs as a list.
:param int count: Get logs[0:count].
:rtype: List.
"""
args = []
if count is not None:
if not isinstance(count, int):
raise DataError('ACL LOG count must be an '
'integer')
args.append(count)

return self.execute_command('ACL LOG', *args)

def acl_log_reset(self):
"""
Reset ACL logs.
:rtype: Boolean.
"""
args = [b'RESET']
return self.execute_command('ACL LOG', *args)

def acl_load(self):
"""
Load ACL rules from the configured ``aclfile``.
Expand Down
33 changes: 33 additions & 0 deletions tests/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,39 @@ def teardown():
users = r.acl_list()
assert 'user %s off -@all' % username in users

@skip_if_server_version_lt(REDIS_6_VERSION)
def test_acl_log(self, r, request):
username = 'redis-py-user'

def teardown():
r.acl_deluser(username)

request.addfinalizer(teardown)
r.acl_setuser(username, enabled=True, reset=True,
commands=['+get', '+set', '+select'],
keys=['cache:*'], nopass=True)
r.acl_log_reset()

r_test = redis.Redis(host='localhost', port=6379, db=9,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't want to hardcode connection info in a test. Instead, use the _get_client function from conftest.py. Something like:

r_test = _get_client(redis.Redis, request, username=username)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep please check the comment below

username=username)
# Valid operation and key
r_test.set("cache:0", 1)
r_test.get("cache:0")
# Invalid key
try:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be:

with pytest.raises(exceptions.NoPermissionError):
    r_test.get('violated_cache:0')

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Thanks.

r_test.get("violated_cache:0")
except exceptions.NoPermissionError:
pass
# Invalid operation
try:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above:

with pytest.raises(exceptions.NoPermissionError):
    r_test.hset('cache:0', 'hkey', 'hval')

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Thanks.

r_test.hset("cache:0", "hkey", "hval")
except exceptions.NoPermissionError:
pass

assert len(r.acl_log()) == 2
assert len(r.acl_log(count=1)) == 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to assert that acl_log() returns a dict and that a common set of keys exist. Additionally we should check that the 'click-info' value is a dict as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added few return type assertion now.

assert r.acl_log_reset()

@skip_if_server_version_lt(REDIS_6_VERSION)
def test_acl_setuser_categories_without_prefix_fails(self, r, request):
username = 'redis-py-user'
Expand Down