Skip to content

Commit

Permalink
use a set literal when testing for membership
Browse files Browse the repository at this point in the history
  • Loading branch information
giampaolo committed Dec 19, 2024
1 parent 8162905 commit 1a63407
Show file tree
Hide file tree
Showing 18 changed files with 61 additions and 62 deletions.
2 changes: 1 addition & 1 deletion psutil/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ def __str__(self):
except AccessDenied:
pass

if self._exitcode not in (_SENTINEL, None):
if self._exitcode not in {_SENTINEL, None}:
info["exitcode"] = self._exitcode
if self._create_time is not None:
info['started'] = _pprint_secs(self._create_time)
Expand Down
8 changes: 4 additions & 4 deletions psutil/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ def isfile_strict(path):
try:
st = os.stat(path)
except OSError as err:
if err.errno in (errno.EPERM, errno.EACCES):
if err.errno in {errno.EPERM, errno.EACCES}:
raise
return False
else:
Expand All @@ -562,7 +562,7 @@ def path_exists_strict(path):
try:
os.stat(path)
except OSError as err:
if err.errno in (errno.EPERM, errno.EACCES):
if err.errno in {errno.EPERM, errno.EACCES}:
raise
return False
else:
Expand Down Expand Up @@ -639,12 +639,12 @@ def socktype_to_enum(num):

def conn_to_ntuple(fd, fam, type_, laddr, raddr, status, status_map, pid=None):
"""Convert a raw connection tuple to a proper ntuple."""
if fam in (socket.AF_INET, AF_INET6):
if fam in {socket.AF_INET, AF_INET6}:
if laddr:
laddr = addr(*laddr)
if raddr:
raddr = addr(*raddr)
if type_ == socket.SOCK_STREAM and fam in (AF_INET, AF_INET6):
if type_ == socket.SOCK_STREAM and fam in {AF_INET, AF_INET6}:
status = status_map.get(status, CONN_NONE)
else:
status = CONN_NONE # ignore whatever C returned to us
Expand Down
2 changes: 1 addition & 1 deletion psutil/_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ def ProcessLookupError(inst):

@_instance_checking_exception(EnvironmentError)
def PermissionError(inst):
return getattr(inst, 'errno', _SENTINEL) in (errno.EACCES, errno.EPERM)
return getattr(inst, 'errno', _SENTINEL) in {errno.EACCES, errno.EPERM}

@_instance_checking_exception(EnvironmentError)
def InterruptedError(inst):
Expand Down
2 changes: 1 addition & 1 deletion psutil/_psbsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -958,7 +958,7 @@ def cpu_affinity_set(self, cpus):
# <<the call would leave a thread without a valid CPU to run
# on because the set does not overlap with the thread's
# anonymous mask>>
if err.errno in (errno.EINVAL, errno.EDEADLK):
if err.errno in {errno.EINVAL, errno.EDEADLK}:
for cpu in cpus:
if cpu not in allcpus:
raise ValueError(
Expand Down
8 changes: 4 additions & 4 deletions psutil/_pslinux.py
Original file line number Diff line number Diff line change
Expand Up @@ -1040,7 +1040,7 @@ def retrieve(self, kind, pid=None):
ret = set()
for proto_name, family, type_ in self.tmap[kind]:
path = "%s/net/%s" % (self._procfs_path, proto_name)
if family in (socket.AF_INET, socket.AF_INET6):
if family in {socket.AF_INET, socket.AF_INET6}:
ls = self.process_inet(
path, family, type_, inodes, filter_pid=pid
)
Expand Down Expand Up @@ -1358,7 +1358,7 @@ def disk_partitions(all=False):
device, mountpoint, fstype, opts = partition
if device == 'none':
device = ''
if device in ("/dev/root", "rootfs"):
if device in {"/dev/root", "rootfs"}:
device = RootFsDeviceFinder().find() or device
if not all:
if not device or fstype not in fstypes:
Expand Down Expand Up @@ -1589,7 +1589,7 @@ def multi_bcat(*paths):
status = cat(root + "/status", fallback="").strip().lower()
if status == "discharging":
power_plugged = False
elif status in ("charging", "full"):
elif status in {"charging", "full"}:
power_plugged = True

# Seconds left.
Expand Down Expand Up @@ -2256,7 +2256,7 @@ def ionice_get(self):
def ionice_set(self, ioclass, value):
if value is None:
value = 0
if value and ioclass in (IOPRIO_CLASS_IDLE, IOPRIO_CLASS_NONE):
if value and ioclass in {IOPRIO_CLASS_IDLE, IOPRIO_CLASS_NONE}:
raise ValueError("%r ioclass accepts no value" % ioclass)
if value < 0 or value > 7:
msg = "value not in 0-7 range"
Expand Down
6 changes: 3 additions & 3 deletions psutil/_pssunos.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ def net_connections(kind, _pid=-1):
if type_ not in types:
continue
# TODO: refactor and use _common.conn_to_ntuple.
if fam in (AF_INET, AF_INET6):
if fam in {AF_INET, AF_INET6}:
if laddr:
laddr = _common.addr(*laddr)
if raddr:
Expand Down Expand Up @@ -476,7 +476,7 @@ def nice_get(self):

@wrap_exceptions
def nice_set(self, value):
if self.pid in (2, 3):
if self.pid in {2, 3}:
# Special case PIDs: internally setpriority(3) return ESRCH
# (no such process), no matter what.
# The process actually exists though, as it has a name,
Expand Down Expand Up @@ -678,7 +678,7 @@ def net_connections(self, kind='inet'):
os.stat('%s/%s' % (self._procfs_path, self.pid))

# UNIX sockets
if kind in ('all', 'unix'):
if kind in {'all', 'unix'}:
ret.extend([
_common.pconn(*conn)
for conn in self._get_unix_sockets(self.pid)
Expand Down
24 changes: 12 additions & 12 deletions psutil/_pswindows.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,10 +575,10 @@ def _wrap_exceptions(self):
% self._name
)
raise AccessDenied(pid=None, name=self._name, msg=msg)
elif err.winerror in (
elif err.winerror in {
cext.ERROR_INVALID_NAME,
cext.ERROR_SERVICE_DOES_NOT_EXIST,
):
}:
msg = "service %r does not exist" % self._name
raise NoSuchProcess(pid=None, name=self._name, msg=msg)
else:
Expand Down Expand Up @@ -697,15 +697,15 @@ def as_dict(self):
def is_permission_err(exc):
"""Return True if this is a permission error."""
assert isinstance(exc, OSError), exc
if exc.errno in (errno.EPERM, errno.EACCES):
if exc.errno in {errno.EPERM, errno.EACCES}:
return True
# On Python 2 OSError doesn't always have 'winerror'. Sometimes
# it does, in which case the original exception was WindowsError
# (which is a subclass of OSError).
return getattr(exc, "winerror", -1) in (
return getattr(exc, "winerror", -1) in {
cext.ERROR_ACCESS_DENIED,
cext.ERROR_PRIVILEGE_NOT_HELD,
)
}


def convert_oserror(exc, pid=None, name=None):
Expand Down Expand Up @@ -919,10 +919,10 @@ def send_signal(self, sig):
if sig == signal.SIGTERM:
cext.proc_kill(self.pid)
# py >= 2.7
elif sig in (
elif sig in {
getattr(signal, "CTRL_C_EVENT", object()),
getattr(signal, "CTRL_BREAK_EVENT", object()),
):
}:
os.kill(self.pid, sig)
else:
msg = (
Expand Down Expand Up @@ -976,7 +976,7 @@ def wait(self, timeout=None):

@wrap_exceptions
def username(self):
if self.pid in (0, 4):
if self.pid in {0, 4}:
return 'NT AUTHORITY\\SYSTEM'
domain, user = cext.proc_username(self.pid)
return py2_strencode(domain) + '\\' + py2_strencode(user)
Expand Down Expand Up @@ -1034,7 +1034,7 @@ def resume(self):
@wrap_exceptions
@retry_error_partial_copy
def cwd(self):
if self.pid in (0, 4):
if self.pid in {0, 4}:
raise AccessDenied(self.pid, self._name)
# return a normalized pathname since the native C function appends
# "\\" at the and of the path
Expand All @@ -1043,7 +1043,7 @@ def cwd(self):

@wrap_exceptions
def open_files(self):
if self.pid in (0, 4):
if self.pid in {0, 4}:
return []
ret = set()
# Filenames come in in native format like:
Expand Down Expand Up @@ -1087,12 +1087,12 @@ def ionice_set(self, ioclass, value):
if value:
msg = "value argument not accepted on Windows"
raise TypeError(msg)
if ioclass not in (
if ioclass not in {
IOPRIO_VERYLOW,
IOPRIO_LOW,
IOPRIO_NORMAL,
IOPRIO_HIGH,
):
}:
raise ValueError("%s is not a valid priority" % ioclass)
cext.proc_io_priority_set(self.pid, ioclass)

Expand Down
18 changes: 9 additions & 9 deletions psutil/tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ def macos_version():
INVALID_UNICODE_SUFFIX = b"f\xc0\x80".decode('utf8', 'surrogateescape')
else:
INVALID_UNICODE_SUFFIX = "f\xc0\x80"
ASCII_FS = sys.getfilesystemencoding().lower() in ('ascii', 'us-ascii')
ASCII_FS = sys.getfilesystemencoding().lower() in {'ascii', 'us-ascii'}

# --- paths

Expand Down Expand Up @@ -1739,11 +1739,11 @@ def get_free_port(host='127.0.0.1'):

def bind_socket(family=AF_INET, type=SOCK_STREAM, addr=None):
"""Binds a generic socket."""
if addr is None and family in (AF_INET, AF_INET6):
if addr is None and family in {AF_INET, AF_INET6}:
addr = ("", 0)
sock = socket.socket(family, type)
try:
if os.name not in ('nt', 'cygwin'):
if os.name not in {'nt', 'cygwin'}:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(addr)
if type == socket.SOCK_STREAM:
Expand Down Expand Up @@ -1874,7 +1874,7 @@ def check_connection_ntuple(conn):

def check_ntuple(conn):
has_pid = len(conn) == 7
assert len(conn) in (6, 7), len(conn)
assert len(conn) in {6, 7}, len(conn)
assert conn[0] == conn.fd, conn.fd
assert conn[1] == conn.family, conn.family
assert conn[2] == conn.type, conn.type
Expand All @@ -1885,7 +1885,7 @@ def check_ntuple(conn):
assert conn[6] == conn.pid, conn.pid

def check_family(conn):
assert conn.family in (AF_INET, AF_INET6, AF_UNIX), conn.family
assert conn.family in {AF_INET, AF_INET6, AF_UNIX}, conn.family
if enum is not None:
assert isinstance(conn.family, enum.IntEnum), conn
else:
Expand All @@ -1908,11 +1908,11 @@ def check_family(conn):
def check_type(conn):
# SOCK_SEQPACKET may happen in case of AF_UNIX socks
SOCK_SEQPACKET = getattr(socket, "SOCK_SEQPACKET", object())
assert conn.type in (
assert conn.type in {
socket.SOCK_STREAM,
socket.SOCK_DGRAM,
SOCK_SEQPACKET,
), conn.type
}, conn.type
if enum is not None:
assert isinstance(conn.type, enum.IntEnum), conn
else:
Expand All @@ -1923,7 +1923,7 @@ def check_type(conn):
def check_addrs(conn):
# check IP address and port sanity
for addr in (conn.laddr, conn.raddr):
if conn.family in (AF_INET, AF_INET6):
if conn.family in {AF_INET, AF_INET6}:
assert isinstance(addr, tuple), type(addr)
if not addr:
continue
Expand All @@ -1939,7 +1939,7 @@ def check_status(conn):
getattr(psutil, x) for x in dir(psutil) if x.startswith('CONN_')
]
assert conn.status in valids, conn.status
if conn.family in (AF_INET, AF_INET6) and conn.type == SOCK_STREAM:
if conn.family in {AF_INET, AF_INET6} and conn.type == SOCK_STREAM:
assert conn.status != psutil.CONN_NONE, conn.status
else:
assert conn.status == psutil.CONN_NONE, conn.status
Expand Down
14 changes: 7 additions & 7 deletions psutil/tests/test_connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@

def this_proc_net_connections(kind):
cons = psutil.Process().net_connections(kind=kind)
if kind in ("all", "unix"):
if kind in {"all", "unix"}:
return filter_proc_net_connections(cons)
return cons

Expand Down Expand Up @@ -430,7 +430,7 @@ def test_count(self):
cons = this_proc_net_connections(kind='tcp')
assert len(cons) == (2 if supports_ipv6() else 1)
for conn in cons:
assert conn.family in (AF_INET, AF_INET6)
assert conn.family in {AF_INET, AF_INET6}
assert conn.type == SOCK_STREAM
# tcp4
cons = this_proc_net_connections(kind='tcp4')
Expand All @@ -447,7 +447,7 @@ def test_count(self):
cons = this_proc_net_connections(kind='udp')
assert len(cons) == (2 if supports_ipv6() else 1)
for conn in cons:
assert conn.family in (AF_INET, AF_INET6)
assert conn.family in {AF_INET, AF_INET6}
assert conn.type == SOCK_DGRAM
# udp4
cons = this_proc_net_connections(kind='udp4')
Expand All @@ -464,23 +464,23 @@ def test_count(self):
cons = this_proc_net_connections(kind='inet')
assert len(cons) == (4 if supports_ipv6() else 2)
for conn in cons:
assert conn.family in (AF_INET, AF_INET6)
assert conn.type in (SOCK_STREAM, SOCK_DGRAM)
assert conn.family in {AF_INET, AF_INET6}
assert conn.type in {SOCK_STREAM, SOCK_DGRAM}
# inet6
if supports_ipv6():
cons = this_proc_net_connections(kind='inet6')
assert len(cons) == 2
for conn in cons:
assert conn.family == AF_INET6
assert conn.type in (SOCK_STREAM, SOCK_DGRAM)
assert conn.type in {SOCK_STREAM, SOCK_DGRAM}
# Skipped on BSD becayse by default the Python process
# creates a UNIX socket to '/var/run/log'.
if HAS_NET_CONNECTIONS_UNIX and not (FREEBSD or NETBSD):
cons = this_proc_net_connections(kind='unix')
assert len(cons) == 3
for conn in cons:
assert conn.family == AF_UNIX
assert conn.type in (SOCK_STREAM, SOCK_DGRAM)
assert conn.type in {SOCK_STREAM, SOCK_DGRAM}


@pytest.mark.skipif(SKIP_SYSCONS, reason="requires root")
Expand Down
2 changes: 1 addition & 1 deletion psutil/tests/test_linux.py
Original file line number Diff line number Diff line change
Expand Up @@ -1556,7 +1556,7 @@ def test_users(self):
# Make sure the C extension converts ':0' and ':0.0' to
# 'localhost'.
for user in psutil.users():
assert user.host not in (":0", ":0.0")
assert user.host not in {":0", ":0.0"}

def test_procfs_path(self):
tdir = self.get_testfn()
Expand Down
4 changes: 2 additions & 2 deletions psutil/tests/test_misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,14 +215,14 @@ class TestMisc(PsutilTestCase):
def test__all__(self):
dir_psutil = dir(psutil)
for name in dir_psutil:
if name in (
if name in {
'debug',
'long',
'tests',
'test',
'PermissionError',
'ProcessLookupError',
):
}:
continue
if not name.startswith('_'):
try:
Expand Down
2 changes: 1 addition & 1 deletion psutil/tests/test_posix.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ def test_create_time(self):
round_time_psutil_tstamp = datetime.datetime.fromtimestamp(
round_time_psutil
).strftime("%H:%M:%S")
assert time_ps in [time_psutil_tstamp, round_time_psutil_tstamp]
assert time_ps in {time_psutil_tstamp, round_time_psutil_tstamp}

def test_exe(self):
ps_pathname = ps_name(self.pid)
Expand Down
Loading

0 comments on commit 1a63407

Please sign in to comment.