diff --git a/build_helpers/check-smb.py b/build_helpers/check-smb.py index 6a7dd055..9d1db9d5 100755 --- a/build_helpers/check-smb.py +++ b/build_helpers/check-smb.py @@ -28,7 +28,7 @@ def test_connection(server, port): print("Connection attempt %d failed: %s" % (attempt, str(e))) attempt += 1 if attempt == total_attempts: - raise Exception("Timeout while waiting for SMB server to come " "online") + raise Exception("Timeout while waiting for SMB server to come online") print("Sleeping for 5 seconds before next attempt") time.sleep(5) diff --git a/src/smbclient/_io.py b/src/smbclient/_io.py index 5fa66ca1..b2032ae9 100644 --- a/src/smbclient/_io.py +++ b/src/smbclient/_io.py @@ -232,7 +232,7 @@ def _receive_resp(request): transaction += (set_req, _receive_resp) -class SMBFileTransaction(object): +class SMBFileTransaction: def __init__(self, raw): """ Stores compound requests in 1 class that can be committed when required. Either uses the opened raw object or @@ -392,7 +392,7 @@ def __init__( "file": CreateOptions.FILE_NON_DIRECTORY_FILE, }.get(self.FILE_TYPE, 0) - super(SMBRawIO, self).__init__() + super().__init__() def __enter__(self): self.open() diff --git a/src/smbclient/_os.py b/src/smbclient/_os.py index 51076b04..63d17741 100644 --- a/src/smbclient/_os.py +++ b/src/smbclient/_os.py @@ -1172,7 +1172,7 @@ def _set_basic_information( set_info(transaction, basic_info) -class SMBDirEntry(object): +class SMBDirEntry: def __init__(self, raw, dir_info, connection_cache=None): self._smb_raw = raw self._dir_info = dir_info diff --git a/src/smbclient/_pool.py b/src/smbclient/_pool.py index be603cb4..14e80395 100644 --- a/src/smbclient/_pool.py +++ b/src/smbclient/_pool.py @@ -38,7 +38,7 @@ class _ConfigSingleton(type): def __call__(cls, *args, **kwargs): if cls not in cls.__instances: - config = super(_ConfigSingleton, cls).__call__(*args, **kwargs) + config = super().__call__(*args, **kwargs) cls.__instances[cls] = config # Needs to be done after the config instance is in the singleton dict due to setting this value will kick diff --git a/src/smbprotocol/__init__.py b/src/smbprotocol/__init__.py index 579477e4..237787e0 100644 --- a/src/smbprotocol/__init__.py +++ b/src/smbprotocol/__init__.py @@ -3,15 +3,7 @@ # MIT License (see LICENSE or https://opensource.org/licenses/MIT) import logging - -try: - from logging import NullHandler -except ImportError: # pragma: no cover - - class NullHandler(logging.Handler): - def emit(self, record): - pass - +from logging import NullHandler logger = logging.getLogger(__name__) logger.addHandler(NullHandler()) @@ -20,7 +12,7 @@ def emit(self, record): MAX_PAYLOAD_SIZE = 65536 -class Dialects(object): +class Dialects: """ [MS-SMB2] v53.0 2017-09-15 diff --git a/src/smbprotocol/change_notify.py b/src/smbprotocol/change_notify.py index 5dd0731f..c7ecb31a 100644 --- a/src/smbprotocol/change_notify.py +++ b/src/smbprotocol/change_notify.py @@ -20,7 +20,7 @@ log = logging.getLogger(__name__) -class ChangeNotifyFlags(object): +class ChangeNotifyFlags: """ [MS-SMB2] 2.2.35 SMB2 CHANGE_NOTIFY Request - Flags https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-smb2/598f395a-e7a2-4cc8-afb3-ccb30dd2df7c @@ -30,7 +30,7 @@ class ChangeNotifyFlags(object): SMB2_WATCH_TREE = 0x0001 -class CompletionFilter(object): +class CompletionFilter: """ [MS-SMB2] 2.2.35 SMB2 CHANGE_NOTIFY Request - CompletionFilter https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-smb2/598f395a-e7a2-4cc8-afb3-ccb30dd2df7c @@ -50,7 +50,7 @@ class CompletionFilter(object): FILE_NOTIFY_CHANGE_STREAM_WRITE = 0x00000800 -class FileAction(object): +class FileAction: """ [MS-FSCC] 2.7.1 FILE_NOTIFY_INFORMATION - Action https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-fscc/634043d7-7b39-47e9-9e26-bda64685e4c9 @@ -102,7 +102,7 @@ def __init__(self): ), ] ) - super(FileNotifyInformation, self).__init__() + super().__init__() class SMB2ChangeNotifyRequest(Structure): @@ -144,7 +144,7 @@ def __init__(self): ("reserved", IntField(size=4)), ] ) - super(SMB2ChangeNotifyRequest, self).__init__() + super().__init__() class SMB2ChangeNotifyResponse(Structure): @@ -189,10 +189,10 @@ def __init__(self): ), ] ) - super(SMB2ChangeNotifyResponse, self).__init__() + super().__init__() -class FileSystemWatcher(object): +class FileSystemWatcher: def __init__(self, open): """ A class that encapsulates a FileSystemWatcher over SMB. It is designed to make it easy to run the watcher in @@ -254,7 +254,7 @@ def result(self): @property def cancelled(self): - """ States whether the change notify request was cancelled or not. """ "" + """States whether the change notify request was cancelled or not.""" return self._request is not None and self._request.cancelled is True def start(self, completion_filter, flags=0, output_buffer_length=65536, send=True): diff --git a/src/smbprotocol/connection.py b/src/smbprotocol/connection.py index ed9212a9..e105de6c 100644 --- a/src/smbprotocol/connection.py +++ b/src/smbprotocol/connection.py @@ -52,7 +52,7 @@ log = logging.getLogger(__name__) -class SecurityMode(object): +class SecurityMode: """ [MS-SMB2] v53.0 2017-09-15 @@ -64,7 +64,7 @@ class SecurityMode(object): SMB2_NEGOTIATE_SIGNING_REQUIRED = 0x0002 -class Capabilities(object): +class Capabilities: """ [MS-SMB2] v53.0 2017-09-15 @@ -81,7 +81,7 @@ class Capabilities(object): SMB2_GLOBAL_CAP_ENCRYPTION = 0x00000040 -class NegotiateContextType(object): +class NegotiateContextType: """ [MS-SMB2] v53.0 2017-09-15 @@ -98,7 +98,7 @@ class NegotiateContextType(object): SMB2_SIGNING_CAPABILITIES = 0x0008 -class HashAlgorithms(object): +class HashAlgorithms: """ [MS-SMB2] v53.0 2017-09-15 @@ -109,7 +109,7 @@ class HashAlgorithms(object): SHA_512 = 0x0001 -class Ciphers(object): +class Ciphers: """ [MS-SMB2] v53.0 2017-09-15 @@ -189,7 +189,7 @@ def __init__(self): ] ) - super(SMB2NegotiateRequest, self).__init__() + super().__init__() class SMB3NegotiateRequest(Structure): @@ -276,7 +276,7 @@ def __init__(self): ), ] ) - super(SMB3NegotiateRequest, self).__init__() + super().__init__() def _negotiate_context_offset_value(self, structure): # The offset from the beginning of the SMB2 header to the first, 8-byte @@ -359,7 +359,7 @@ def __init__(self): ), ] ) - super(SMB2NegotiateContextRequest, self).__init__() + super().__init__() def _data_structure_type(self, structure): con_type = structure["context_type"].get_value() @@ -421,7 +421,7 @@ def __init__(self): ), ] ) - super(SMB2PreauthIntegrityCapabilities, self).__init__() + super().__init__() class SMB2EncryptionCapabilities(Structure): @@ -454,7 +454,7 @@ def __init__(self): ), ] ) - super(SMB2EncryptionCapabilities, self).__init__() + super().__init__() class SMB2NetnameNegotiateContextId(Structure): @@ -601,7 +601,7 @@ def __init__(self): ), ] ) - super(SMB2NegotiateResponse, self).__init__() + super().__init__() def _negotiate_context_count_value(self, structure): # If the dialect_revision is SMBv3.1.1, this field specifies the @@ -667,7 +667,7 @@ class SMB2Echo(Structure): def __init__(self): self.fields = OrderedDict([("structure_size", IntField(size=2, default=4)), ("reserved", IntField(size=2))]) - super(SMB2Echo, self).__init__() + super().__init__() class SMB2CancelRequest(Structure): @@ -694,7 +694,7 @@ def __init__(self): ("reserved", IntField(size=2)), ] ) - super(SMB2CancelRequest, self).__init__() + super().__init__() class SMB2TransformHeader(Structure): @@ -719,10 +719,10 @@ def __init__(self): ("data", BytesField()), # not in spec ] ) - super(SMB2TransformHeader, self).__init__() + super().__init__() -class Connection(object): +class Connection: def __init__(self, guid, server_name, port=445, require_signing=True): """ [MS-SMB2] v53.0 2017-09-15 @@ -1112,7 +1112,7 @@ def echo(self, sid=0, timeout=60, credit_request=1): :param credit_request: The number of credits to request :return: the credits that were granted by the server """ - log.info("Sending Echo request with a timeout of %d and credit " "request of %d" % (timeout, credit_request)) + log.info("Sending Echo request with a timeout of %d and credit request of %d" % (timeout, credit_request)) echo_msg = SMB2Echo() log.debug(echo_msg) @@ -1566,7 +1566,7 @@ def _send_smb2_negotiate(self, dialect, timeout, encryption_algorithms, signing_ int_cap["data"]["hash_algorithms"] = [HashAlgorithms.SHA_512] int_cap["data"]["salt"] = self.salt log.debug( - "Adding preauth integrity capabilities of hash SHA512 " "and salt %s to negotiate request" % self.salt + "Adding preauth integrity capabilities of hash SHA512 and salt %s to negotiate request" % self.salt ) enc_cap = SMB2NegotiateContextRequest() @@ -1574,7 +1574,7 @@ def _send_smb2_negotiate(self, dialect, timeout, encryption_algorithms, signing_ enc_cap["data"] = SMB2EncryptionCapabilities() supported_ciphers = encryption_algorithms enc_cap["data"]["ciphers"] = supported_ciphers - log.debug("Adding encryption capabilities of AES128|256 GCM and " "AES128|256 CCM to negotiate request") + log.debug("Adding encryption capabilities of AES128|256 GCM and AES128|256 CCM to negotiate request") netname_id = SMB2NegotiateContextRequest() netname_id["context_type"] = NegotiateContextType.SMB2_NETNAME_NEGOTIATE_CONTEXT_ID @@ -1656,7 +1656,7 @@ def _calculate_credit_charge(self, message): return credit_charge -class Request(object): +class Request: def __init__(self, message, message_type, connection, session_id=None): """ [MS-SMB2] v53.0 2017-09-15 diff --git a/src/smbprotocol/create_contexts.py b/src/smbprotocol/create_contexts.py index 3fab0586..6b0bb1a9 100644 --- a/src/smbprotocol/create_contexts.py +++ b/src/smbprotocol/create_contexts.py @@ -17,7 +17,7 @@ ) -class CreateContextName(object): +class CreateContextName: """ [MS-SMB2] v53.0 2017-09-15 @@ -73,7 +73,7 @@ def get_response_structure(name, size=None): }.get(name, None) -class EAFlags(object): +class EAFlags: """ [MS-FSCC] @@ -85,7 +85,7 @@ class EAFlags(object): FILE_NEED_EA = 0x00000080 -class LeaseState(object): +class LeaseState: """ [MS-SMB2] @@ -100,7 +100,7 @@ class LeaseState(object): SMB2_LEASE_WRITE_CACHING = 0x04 -class LeaseRequestFlags(object): +class LeaseRequestFlags: """ [MS-SMB2] @@ -111,7 +111,7 @@ class LeaseRequestFlags(object): SMB2_LEASE_FLAG_PARENT_LEASE_KEY_SET = 0x00000004 -class LeaseResponseFlags(object): +class LeaseResponseFlags: """ [MS-SMB2] @@ -122,7 +122,7 @@ class LeaseResponseFlags(object): SMB2_LEASE_FLAG_PARENT_LEASE_KEY_SET = 0x00000004 # V2 Response -class DurableHandleFlags(object): +class DurableHandleFlags: """ [MS-SMB2] @@ -133,7 +133,7 @@ class DurableHandleFlags(object): SMB2_DHANDLE_FLAG_PERSISTENT = 0x00000002 -class SVHDXOriginatorFlags(object): +class SVHDXOriginatorFlags: """ [MS-RSVD] 2.2.4.12 SVHDX_OPEN_DEVICE_CONTEXT OriginatorFlags Used to indicate which component has originated or issued the operations. @@ -177,7 +177,7 @@ def __init__(self): ), ] ) - super(SMB2CreateContextRequest, self).__init__() + super().__init__() def _buffer_data_offset(self, structure): if structure["data_length"].get_value() == 0: @@ -283,7 +283,7 @@ def __init__(self): ), ] ) - super(SMB2CreateEABuffer, self).__init__() + super().__init__() def _padding_size(self, structure): if structure["next_entry_offset"].get_value() == 0: @@ -331,7 +331,7 @@ class SMB2CreateDurableHandleRequest(Structure): def __init__(self): self.fields = OrderedDict([("durable_request", BytesField(size=16, default=b"\x00" * 16))]) - super(SMB2CreateDurableHandleRequest, self).__init__() + super().__init__() class SMB2CreateDurableHandleResponse(Structure): @@ -343,7 +343,7 @@ class SMB2CreateDurableHandleResponse(Structure): def __init__(self): self.fields = OrderedDict([("reserved", IntField(size=8))]) - super(SMB2CreateDurableHandleResponse, self).__init__() + super().__init__() class SMB2CreateDurableHandleReconnect(Structure): @@ -357,7 +357,7 @@ class SMB2CreateDurableHandleReconnect(Structure): def __init__(self): self.fields = OrderedDict([("data", BytesField(size=16))]) - super(SMB2CreateDurableHandleReconnect, self).__init__() + super().__init__() class SMB2CreateQueryMaximalAccessRequest(Structure): @@ -372,7 +372,7 @@ class SMB2CreateQueryMaximalAccessRequest(Structure): def __init__(self): self.fields = OrderedDict([("timestamp", DateTimeField())]) - super(SMB2CreateQueryMaximalAccessRequest, self).__init__() + super().__init__() class SMB2CreateQueryMaximalAccessResponse(Structure): @@ -391,7 +391,7 @@ def __init__(self): ("maximal_access", IntField(size=4)), ] ) - super(SMB2CreateQueryMaximalAccessResponse, self).__init__() + super().__init__() class SMB2CreateAllocationSize(Structure): @@ -406,7 +406,7 @@ class SMB2CreateAllocationSize(Structure): def __init__(self): self.fields = OrderedDict([("allocation_size", IntField(size=8))]) - super(SMB2CreateAllocationSize, self).__init__() + super().__init__() class SMB2CreateTimewarpToken(Structure): @@ -421,7 +421,7 @@ class SMB2CreateTimewarpToken(Structure): def __init__(self): self.fields = OrderedDict([("timestamp", DateTimeField())]) - super(SMB2CreateTimewarpToken, self).__init__() + super().__init__() class SMB2CreateRequestLease(Structure): @@ -442,7 +442,7 @@ def __init__(self): ("lease_duration", IntField(size=8)), ] ) - super(SMB2CreateRequestLease, self).__init__() + super().__init__() class SMB2CreateResponseLease(Structure): @@ -461,7 +461,7 @@ def __init__(self): ("lease_duration", IntField(size=8)), ] ) - super(SMB2CreateResponseLease, self).__init__() + super().__init__() class SMB2CreateQueryOnDiskIDResponse(Structure): @@ -481,7 +481,7 @@ def __init__(self): ("reserved", BytesField(size=16, default=b"\x00" * 16)), ] ) - super(SMB2CreateQueryOnDiskIDResponse, self).__init__() + super().__init__() class SMB2CreateRequestLeaseV2(Structure): @@ -507,7 +507,7 @@ def __init__(self): ("reserved", IntField(size=2)), ] ) - super(SMB2CreateRequestLeaseV2, self).__init__() + super().__init__() class SMB2CreateResponseLeaseV2(Structure): @@ -529,7 +529,7 @@ def __init__(self): ("reserved", IntField(size=2)), ] ) - super(SMB2CreateResponseLeaseV2, self).__init__() + super().__init__() class SMB2CreateDurableHandleRequestV2(Structure): @@ -553,7 +553,7 @@ def __init__(self): ("create_guid", UuidField(size=16)), ] ) - super(SMB2CreateDurableHandleRequestV2, self).__init__() + super().__init__() class SMB2CreateDurableHandleReconnectV2(Structure): @@ -574,7 +574,7 @@ def __init__(self): ("flags", FlagField(size=4, flag_type=DurableHandleFlags)), ] ) - super(SMB2CreateDurableHandleReconnectV2, self).__init__() + super().__init__() class SMB2CreateDurableHandleResponseV2(Structure): @@ -589,7 +589,7 @@ def __init__(self): self.fields = OrderedDict( [("timeout", IntField(size=4)), ("flags", FlagField(size=4, flag_type=DurableHandleFlags))] ) - super(SMB2CreateDurableHandleResponseV2, self).__init__() + super().__init__() class SMB2CreateAppInstanceId(Structure): @@ -611,7 +611,7 @@ def __init__(self): ("app_instance_id", BytesField(size=16)), ] ) - super(SMB2CreateAppInstanceId, self).__init__() + super().__init__() class SMB2SVHDXOpenDeviceContextRequest(Structure): @@ -638,7 +638,7 @@ def __init__(self): ("initiator_host_name", BytesField(size=lambda s: s["initiator_host_name_length"].get_value())), ] ) - super(SMB2SVHDXOpenDeviceContextRequest, self).__init__() + super().__init__() class SMB2SVHDXOpenDeviceContextResponse(Structure): @@ -665,7 +665,7 @@ def __init__(self): ("initiator_host_name", BytesField(size=lambda s: s["initiator_host_name_length"].get_value())), ] ) - super(SMB2SVHDXOpenDeviceContextResponse, self).__init__() + super().__init__() class SMB2SVHDXOpenDeviceContextV2Request(Structure): @@ -697,7 +697,7 @@ def __init__(self): ("virtual_size", IntField(size=8)), ] ) - super(SMB2SVHDXOpenDeviceContextV2Request, self).__init__() + super().__init__() class SMB2SVHDXOpenDeviceContextV2Response(Structure): @@ -729,7 +729,7 @@ def __init__(self): ("virtual_size", IntField(size=8)), ] ) - super(SMB2SVHDXOpenDeviceContextV2Response, self).__init__() + super().__init__() class SMB2CreateAppInstanceVersion(Structure): @@ -753,4 +753,4 @@ def __init__(self): ("app_instance_version_low", IntField(size=8)), ] ) - super(SMB2CreateAppInstanceVersion, self).__init__() + super().__init__() diff --git a/src/smbprotocol/dfs.py b/src/smbprotocol/dfs.py index 8b0b495e..6e46f1d1 100644 --- a/src/smbprotocol/dfs.py +++ b/src/smbprotocol/dfs.py @@ -221,7 +221,7 @@ def __init__(self): ("request_file_name", TextField(null_terminated=True)), ] ) - super(DFSReferralRequest, self).__init__() + super().__init__() class DFSReferralRequestEx(Structure): @@ -275,7 +275,7 @@ def __init__(self): ), ] ) - super(DFSReferralRequestEx, self).__init__() + super().__init__() class DFSReferralResponse(Structure): @@ -301,7 +301,7 @@ def __init__(self): ("string_buffer", BytesField()), ] ) - super(DFSReferralResponse, self).__init__() + super().__init__() def _create_dfs_referral_entry(self, data): results = [] @@ -346,7 +346,7 @@ def __init__(self): ("share_name", TextField(null_terminated=True)), ] ) - super(DFSReferralEntryV1, self).__init__() + super().__init__() @property def network_address(self): @@ -381,7 +381,7 @@ def __init__(self): self.dfs_path = None self.dfs_alternate_path = None self.network_address = None - super(DFSReferralEntryV2, self).__init__() + super().__init__() def process_string_buffer(self, buffer, entry_offset): buffer_fields = ["dfs_path", "dfs_alternate_path", "network_address"] @@ -424,7 +424,7 @@ def __init__(self): self.dfs_path = None self.dfs_alternate_path = None self.network_address = None - super(DFSReferralEntryV3, self).__init__() + super().__init__() def process_string_buffer(self, buffer, entry_offset): is_name_list = self["referral_entry_flags"].has_flag(DFSReferralEntryFlags.NAME_LIST_REFERRAL) diff --git a/src/smbprotocol/exceptions.py b/src/smbprotocol/exceptions.py index 8932d4e6..a9ec482f 100644 --- a/src/smbprotocol/exceptions.py +++ b/src/smbprotocol/exceptions.py @@ -63,7 +63,7 @@ def __init__(self, ntstatus, filename, filename2=None): NtStatus.STATUS_PRIVILEGE_NOT_HELD: (errno.EACCES, "Required privilege not held"), NtStatus.STATUS_SHARING_VIOLATION: ( errno.EPERM, - "The process cannot access the file because it is being " "used by another process", + "The process cannot access the file because it is being used by another process", ), NtStatus.STATUS_NOT_A_REPARSE_POINT: (errno.EINVAL, "The file or directory is not a reparse point"), NtStatus.STATUS_FILE_IS_A_DIRECTORY: errno.EISDIR, @@ -75,7 +75,7 @@ def __init__(self, ntstatus, filename, filename2=None): if not isinstance(error_details, tuple): error_details = (error_details, os.strerror(error_details)) - super(SMBOSError, self).__init__(error_details[0], error_details[1], str(filename)) + super().__init__(error_details[0], error_details[1], str(filename)) def __str__(self): msg = "[Error {0}] [NtStatus 0x{1}] {2}: '{3}'".format( @@ -141,7 +141,7 @@ def message(self): required_dialect = self._get_dialect_name(self.required_dialect) negotiated_dialect = self._get_dialect_name(self.negotiated_dialect) - msg = "%s is not available on the negotiated dialect %s, " "requires dialect %s%s" % ( + msg = "%s is not available on the negotiated dialect %s, requires dialect %s%s" % ( self.feature_name, negotiated_dialect, required_dialect, @@ -162,7 +162,7 @@ class _SMBErrorRegistry(type): __registry = {} def __init__(cls, name, bases, attributes): - super(_SMBErrorRegistry, cls).__init__(name, bases, attributes) + super().__init__(name, bases, attributes) # Special case for the base SMBResponseException doesn't need to have _STATUS_CODE if cls.__module__ == _SMBErrorRegistry.__module__ and cls.__name__ == "SMBResponseException": @@ -176,6 +176,7 @@ def __init__(cls, name, bases, attributes): cls.__registry[cls._STATUS_CODE] = cls def __call__(cls, header=None): + # intercept construction of new SMBResponseException (and derived) instances if header: new_cls = cls.__registry.get(header["status"].get_value(), cls) @@ -185,7 +186,8 @@ def __call__(cls, header=None): header["data"] = SMB2ErrorResponse() new_cls = cls - return super(_SMBErrorRegistry, new_cls).__call__(header) + # now use type.__call__ directly to actually construct instance + return type.__call__(new_cls, header) class SMBResponseException(SMBException, metaclass=_SMBErrorRegistry): @@ -363,7 +365,7 @@ class InvalidDeviceRequest(SMBResponseException): class MoreProcessingRequired(SMBResponseException): _BASE_MESSAGE = ( - "The specified I/O request packet (IRP) cannot be disposed of because the I/O operation is not " "complete." + "The specified I/O request packet (IRP) cannot be disposed of because the I/O operation is not complete." ) _STATUS_CODE = NtStatus.STATUS_MORE_PROCESSING_REQUIRED @@ -455,7 +457,7 @@ class WrongPassword(SMBResponseException): class LogonFailure(SMBResponseException): _BASE_MESSAGE = ( - "The attempted logon is invalid. This is either due to a bad username or authentication " "information." + "The attempted logon is invalid. This is either due to a bad username or authentication information." ) _STATUS_CODE = NtStatus.STATUS_LOGON_FAILURE @@ -574,7 +576,7 @@ class PipeBroken(SMBResponseException): class FSDriverRequired(SMBResponseException): _BASE_MESSAGE = ( - "A volume has been accessed for which a file system driver is required that " "has not yet been loaded." + "A volume has been accessed for which a file system driver is required that has not yet been loaded." ) _STATUS_CODE = NtStatus.STATUS_FS_DRIVER_REQUIRED @@ -614,7 +616,7 @@ class ServerUnavailable(SMBResponseException): _STATUS_CODE = NtStatus.STATUS_SERVER_UNAVAILABLE -class ErrorContextId(object): +class ErrorContextId: """ [MS-SMB2] v53.0 2017-09-15 @@ -627,7 +629,7 @@ class ErrorContextId(object): SMB2_ERROR_ID_SHARE_REDIRECT = 0x53526472 -class SymbolicLinkErrorFlags(object): +class SymbolicLinkErrorFlags: """ [MS-SMB2] v53.0 2017-09-15 @@ -640,7 +642,7 @@ class SymbolicLinkErrorFlags(object): SYMLINK_FLAG_RELATIVE = 0x00000001 -class IpAddrType(object): +class IpAddrType: """ [MS-SM2] v53.0 2017-09-15 @@ -698,7 +700,7 @@ def __init__(self): ), ] ) - super(SMB2ErrorResponse, self).__init__() + super().__init__() def _error_data_value(self, structure, data): context_responses = [] @@ -756,7 +758,7 @@ def __init__(self): ), ] ) - super(SMB2ErrorContextResponse, self).__init__() + super().__init__() class SMB2SymbolicLinkErrorResponse(Structure): @@ -801,7 +803,7 @@ def __init__(self): ("path_buffer", BytesField(size=lambda s: self._get_name_length(s, True))), ] ) - super(SMB2SymbolicLinkErrorResponse, self).__init__() + super().__init__() def _get_name_length(self, structure, first): print_name_len = structure["print_name_length"].get_value() @@ -911,7 +913,7 @@ def __init__(self): ("resource_name", BytesField(size=lambda s: s["resource_name_length"].get_value())), ] ) - super(SMB2ShareRedirectErrorContext, self).__init__() + super().__init__() def _resource_name_offset(self, structure): min_structure_size = 24 @@ -941,7 +943,7 @@ def __init__(self): ), ] ) - super(SMB2MoveDstIpAddrStructure, self).__init__() + super().__init__() def _ip_address_size(self, structure): if structure["type"].get_value() == IpAddrType.MOVE_DST_IPADDR_V4: @@ -972,5 +974,5 @@ def set_ipaddress(self, address): else: addr = address.replace(":", "") if len(addr) != 32: - raise ValueError("When setting an IPv6 address, it must be in " "the full form without concatenation") + raise ValueError("When setting an IPv6 address, it must be in the full form without concatenation") self["ip_address"].set_value(binascii.unhexlify(addr)) diff --git a/src/smbprotocol/file_info.py b/src/smbprotocol/file_info.py index 63771d65..0fc4df71 100644 --- a/src/smbprotocol/file_info.py +++ b/src/smbprotocol/file_info.py @@ -19,7 +19,7 @@ ) -class AlignmentRequirement(object): +class AlignmentRequirement: """ [MS-FSCC] 2.4.3 FileAlignmentInformation - AlignmentRequirement https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-fscc/9b0b9971-85aa-4651-8438-f1c4298bcb0d @@ -37,7 +37,7 @@ class AlignmentRequirement(object): FILE_512_BYTE_ALIGNMENT = 0x000001FF -class InfoType(object): +class InfoType: """ [MS-SMB2] 2.2.39 SMB2 SET_INFO Request - InfoType https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-smb2/ee9614c4-be54-4a3c-98f1-769a7032a0e4 @@ -51,7 +51,7 @@ class InfoType(object): SMB2_0_INFO_QUOTA = 0x04 -class ModeInformation(object): +class ModeInformation: """ [MS-FSCC] 2.4.24 FileModeInformation - Mode https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-fscc/52df7798-8330-474b-ac31-9afe8075640c @@ -65,7 +65,7 @@ class ModeInformation(object): FILE_DELETE_ON_CLOSE = 0x00001000 -class FileAttributes(object): +class FileAttributes: """ [MS-FSCC] @@ -94,7 +94,7 @@ class FileAttributes(object): FILE_ATTRIBUTE_RECALL_ON_DATA_ACCESS = 0x00400000 -class FileInformationClass(object): +class FileInformationClass: """ [MS-SMB2] v53.0 2017-09-15 @@ -141,7 +141,7 @@ class FileInformationClass(object): FILE_NORMALIZED_NAME_INFORMATION = 48 -class FileSystemInformationClass(object): +class FileSystemInformationClass: """ [MS-SMB2] v53.0 2017-09-15 @@ -167,7 +167,7 @@ class FileSystemInformationClass(object): FILE_FS_SECTOR_SIZE_INFORMATION = 11 -class QueryInfoFlags(object): +class QueryInfoFlags: """ [MS-SMB2] v53.0 2017-09-15 @@ -206,7 +206,7 @@ def __init__(self): ), ] ) - super(FileNameInformation, self).__init__() + super().__init__() class FileAccessInformation(Structure): @@ -225,7 +225,7 @@ def __init__(self): ("access_flags", IntField(size=4)), ] ) - super(FileAccessInformation, self).__init__() + super().__init__() class FileAllInformation(Structure): @@ -305,7 +305,7 @@ def __init__(self): ), ] ) - super(FileAllInformation, self).__init__() + super().__init__() class FileAlignmentInformation(Structure): @@ -329,7 +329,7 @@ def __init__(self): ), ] ) - super(FileAlignmentInformation, self).__init__() + super().__init__() class FileAttributeTagInformation(Structure): @@ -362,7 +362,7 @@ def __init__(self): ), ] ) - super(FileAttributeTagInformation, self).__init__() + super().__init__() class FileBasicInformation(Structure): @@ -392,7 +392,7 @@ def __init__(self): ("reserved", IntField(size=4)), ] ) - super(FileBasicInformation, self).__init__() + super().__init__() class FileBothDirectoryInformation(Structure): @@ -438,7 +438,7 @@ def __init__(self): ("file_name", BytesField(size=lambda s: s["file_name_length"].get_value())), ] ) - super(FileBothDirectoryInformation, self).__init__() + super().__init__() class FileDirectoryInformation(Structure): @@ -473,7 +473,7 @@ def __init__(self): ("file_name", BytesField(size=lambda s: s["file_name_length"].get_value())), ] ) - super(FileDirectoryInformation, self).__init__() + super().__init__() class FileDispositionInformation(Structure): @@ -491,7 +491,7 @@ def __init__(self): ("delete_pending", BoolField(size=1)), ] ) - super(FileDispositionInformation, self).__init__() + super().__init__() class FileEaInformation(Structure): @@ -509,7 +509,7 @@ def __init__(self): ("ea_size", IntField(size=4)), ] ) - super(FileEaInformation, self).__init__() + super().__init__() class FileEndOfFileInformation(Structure): @@ -527,7 +527,7 @@ def __init__(self): ("end_of_file", IntField(size=8)), ] ) - super(FileEndOfFileInformation, self).__init__() + super().__init__() class FileFullDirectoryInformation(Structure): @@ -563,7 +563,7 @@ def __init__(self): ("file_name", BytesField(size=lambda s: s["file_name_length"].get_value())), ] ) - super(FileFullDirectoryInformation, self).__init__() + super().__init__() class FileFullEaInformation(Structure): @@ -610,7 +610,7 @@ def __init__(self): ), ] ) - super(FileFullEaInformation, self).__init__() + super().__init__() class FileGetEaInformation(Structure): @@ -639,7 +639,7 @@ def __init__(self): ("padding", IntField(size=1)), ] ) - super(FileGetEaInformation, self).__init__() + super().__init__() class FileIdBothDirectoryInformation(Structure): @@ -687,7 +687,7 @@ def __init__(self): ("file_name", BytesField(size=lambda s: s["file_name_length"].get_value())), ] ) - super(FileIdBothDirectoryInformation, self).__init__() + super().__init__() class FileIdFullDirectoryInformation(Structure): @@ -725,7 +725,7 @@ def __init__(self): ("file_name", BytesField(size=lambda s: s["file_name_length"].get_value())), ] ) - super(FileIdFullDirectoryInformation, self).__init__() + super().__init__() class FileInternalInformation(Structure): @@ -743,7 +743,7 @@ def __init__(self): ("index_number", IntField(size=8)), ] ) - super(FileInternalInformation, self).__init__() + super().__init__() class FileLinkInformation(Structure): @@ -783,7 +783,7 @@ def __init__(self): ), ] ) - super(FileLinkInformation, self).__init__() + super().__init__() class FileModeInformation(Structure): @@ -807,7 +807,7 @@ def __init__(self): ), ] ) - super(FileModeInformation, self).__init__() + super().__init__() class FileNamesInformation(Structure): @@ -828,7 +828,7 @@ def __init__(self): ("file_name", BytesField(size=lambda s: s["file_name_length"].get_value())), ] ) - super(FileNamesInformation, self).__init__() + super().__init__() class FilePositionInformation(Structure): @@ -852,7 +852,7 @@ def __init__(self): ), ] ) - super(FilePositionInformation, self).__init__() + super().__init__() class FileRenameInformation(Structure): @@ -891,7 +891,7 @@ def __init__(self): ), ] ) - super(FileRenameInformation, self).__init__() + super().__init__() class FileStandardInformation(Structure): @@ -926,7 +926,7 @@ def __init__(self): ("reserved", IntField(size=2)), ] ) - super(FileStandardInformation, self).__init__() + super().__init__() class FileStreamInformation(Structure): @@ -972,7 +972,7 @@ def __init__(self): ), ] ) - super(FileStreamInformation, self).__init__() + super().__init__() class FileFsFullSizeInformation(Structure): @@ -994,7 +994,7 @@ def __init__(self): ("bytes_per_sector", IntField(size=4)), ] ) - super(FileFsFullSizeInformation, self).__init__() + super().__init__() class FileFsObjectIdInformation(Structure): @@ -1019,7 +1019,7 @@ def __init__(self): ), ] ) - super(FileFsObjectIdInformation, self).__init__() + super().__init__() class FileFsVolumeInformation(Structure): @@ -1054,4 +1054,4 @@ def __init__(self): ), ] ) - super(FileFsVolumeInformation, self).__init__() + super().__init__() diff --git a/src/smbprotocol/header.py b/src/smbprotocol/header.py index f8872941..8142d3d8 100644 --- a/src/smbprotocol/header.py +++ b/src/smbprotocol/header.py @@ -7,7 +7,7 @@ from smbprotocol.structure import BytesField, EnumField, FlagField, IntField, Structure -class Commands(object): +class Commands: """ [MS-SMB2] v53.0 2017-09-15 @@ -36,7 +36,7 @@ class Commands(object): SMB2_OPLOCK_BREAK = 0x0012 -class NtStatus(object): +class NtStatus: """ [MS-ERREF] https://msdn.microsoft.com/en-au/library/cc704588.aspx @@ -111,7 +111,7 @@ class NtStatus(object): STATUS_SERVER_UNAVAILABLE = 0xC0000466 -class Smb2Flags(object): +class Smb2Flags: """ [MS-SMB2] v53.0 2017-09-15 @@ -185,7 +185,7 @@ def __init__(self): ("data", BytesField()), ] ) - super(SMB2HeaderAsync, self).__init__() + super().__init__() class SMB2HeaderRequest(Structure): @@ -241,7 +241,7 @@ def __init__(self): ("data", BytesField()), ] ) - super(SMB2HeaderRequest, self).__init__() + super().__init__() class SMB2HeaderResponse(Structure): @@ -303,4 +303,4 @@ def __init__(self): ("data", BytesField()), ] ) - super(SMB2HeaderResponse, self).__init__() + super().__init__() diff --git a/src/smbprotocol/ioctl.py b/src/smbprotocol/ioctl.py index 8a15dec7..662f5bf2 100644 --- a/src/smbprotocol/ioctl.py +++ b/src/smbprotocol/ioctl.py @@ -21,7 +21,7 @@ ) -class CtlCode(object): +class CtlCode: """ [MS-SMB2] v53.0 2017-09-15 @@ -52,7 +52,7 @@ class CtlCode(object): FSCTL_VALIDATE_NEGOTIATE_INFO = 0x00140204 -class IOCTLFlags(object): +class IOCTLFlags: """ [MS-SMB2] v53.0 2017-09-15 @@ -64,7 +64,7 @@ class IOCTLFlags(object): SMB2_0_IOCTL_IS_FSCTL = 0x00000001 -class HashVersion(object): +class HashVersion: """ [MS-SMB2] v53.0 2017-09-15 @@ -76,7 +76,7 @@ class HashVersion(object): SRV_HASH_VER_2 = 0x00000002 -class HashRetrievalType(object): +class HashRetrievalType: """ [MS-SMB2] v53.0 2017-09-15 @@ -89,7 +89,7 @@ class HashRetrievalType(object): SRV_HASH_RETRIEVE_FILE_BASED = 0x00000002 -class IfCapability(object): +class IfCapability: """ [MS-SMB2] v53.0 2017-09-15 @@ -101,7 +101,7 @@ class IfCapability(object): RDMA_CAPABLE = 0x00000002 -class SockAddrFamily(object): +class SockAddrFamily: """ [MS-SMB2] v53.0 2017-09-15 @@ -160,7 +160,7 @@ def __init__(self): ("buffer", BytesField(size=lambda s: s["input_count"].get_value())), ] ) - super(SMB2IOCTLRequest, self).__init__() + super().__init__() def _buffer_offset_value(self, structure): # The offset from the beginning of the SMB2 header to the value of the @@ -198,7 +198,7 @@ def __init__(self): ), ] ) - super(SMB2SrvCopyChunkCopy, self).__init__() + super().__init__() class SMB2SrvCopyChunk(Structure): @@ -219,7 +219,7 @@ def __init__(self): ("reserved", IntField(size=4)), ] ) - super(SMB2SrvCopyChunk, self).__init__() + super().__init__() class SMB2SrvReadHashRequest(Structure): @@ -242,7 +242,7 @@ def __init__(self): ("offset", IntField(size=8)), ] ) - super(SMB2SrvReadHashRequest, self).__init__() + super().__init__() class SMB2SrvNetworkResiliencyRequest(Structure): @@ -262,7 +262,7 @@ def __init__(self): ("reserved", IntField(size=4)), ] ) - super(SMB2SrvNetworkResiliencyRequest, self).__init__() + super().__init__() class SMB2ValidateNegotiateInfoRequest(Structure): @@ -304,7 +304,7 @@ def __init__(self): ), ] ) - super(SMB2ValidateNegotiateInfoRequest, self).__init__() + super().__init__() class SMB2IOCTLResponse(Structure): @@ -344,7 +344,7 @@ def __init__(self): ), ] ) - super(SMB2IOCTLResponse, self).__init__() + super().__init__() class SMB2SrvCopyChunkResponse(Structure): @@ -362,7 +362,7 @@ def __init__(self): ("total_bytes_written", IntField(size=4)), ] ) - super(SMB2SrvCopyChunkResponse, self).__init__() + super().__init__() class SMB2SrvSnapshotArray(Structure): @@ -384,7 +384,7 @@ def __init__(self): ("snapshots", BytesField()), ] ) - super(SMB2SrvSnapshotArray, self).__init__() + super().__init__() class SMB2SrvRequestResumeKey(Structure): @@ -415,7 +415,7 @@ def __init__(self): ), ] ) - super(SMB2SrvRequestResumeKey, self).__init__() + super().__init__() class SMB2NetworkInterfaceInfo(Structure): @@ -442,7 +442,7 @@ def __init__(self): ("sock_addr_storage", StructureField(size=128, structure_type=SockAddrStorage)), ] ) - super(SMB2NetworkInterfaceInfo, self).__init__() + super().__init__() @staticmethod def pack_multiple(messages): @@ -511,7 +511,7 @@ def __init__(self): ), ] ) - super(SockAddrStorage, self).__init__() + super().__init__() def _get_buffer_size(self, structure): if structure["family"].get_value() == SockAddrFamily.INTER_NETWORK: @@ -544,7 +544,7 @@ def __init__(self): self.fields = OrderedDict( [("port", IntField(size=2)), ("ipv4_address", BytesField(size=4)), ("reserved", IntField(size=8))] ) - super(SockAddrIn, self).__init__() + super().__init__() def get_ipaddress(self): addr_bytes = self["ipv4_address"].get_value() @@ -575,7 +575,7 @@ def __init__(self): ("scope_id", IntField(size=4)), ] ) - super(SockAddrIn6, self).__init__() + super().__init__() def get_ipaddress(self): # get's the full IPv6 Address, note this is the full address and has @@ -590,7 +590,7 @@ def set_ipaddress(self, address): # e.g. fe80:0000:0000:0000:0000:0000:0000:0000 and not any short form address = address.replace(":", "") if len(address) != 32: - raise ValueError("When setting an IPv6 address, it must be in the " "full form without concatenation") + raise ValueError("When setting an IPv6 address, it must be in the full form without concatenation") self["ipv6_address"].set_value(binascii.unhexlify(address)) @@ -618,4 +618,4 @@ def __init__(self): ("dialect", EnumField(size=2, enum_type=Dialects)), ] ) - super(SMB2ValidateNegotiateInfoResponse, self).__init__() + super().__init__() diff --git a/src/smbprotocol/open.py b/src/smbprotocol/open.py index 84ec14a1..6885b489 100644 --- a/src/smbprotocol/open.py +++ b/src/smbprotocol/open.py @@ -36,7 +36,7 @@ log = logging.getLogger(__name__) -class RequestedOplockLevel(object): +class RequestedOplockLevel: """ [MS-SMB2] v53.0 2017-09-15 @@ -51,7 +51,7 @@ class RequestedOplockLevel(object): SMB2_OPLOCK_LEVEL_LEASE = 0xFF -class ImpersonationLevel(object): +class ImpersonationLevel: """ [MS-SMB2] v53.0 2017-09-15 @@ -65,7 +65,7 @@ class ImpersonationLevel(object): Delegate = 0x3 -class ShareAccess(object): +class ShareAccess: """ [MS-SMB2] v53.0 2017-09-15 @@ -78,7 +78,7 @@ class ShareAccess(object): FILE_SHARE_DELETE = 0x4 -class CreateDisposition(object): +class CreateDisposition: """ [MS-SMB2] v53.0 2017-09-15 @@ -95,7 +95,7 @@ class CreateDisposition(object): FILE_OVERWRITE_IF = 0x5 -class CreateOptions(object): +class CreateOptions: """ [MS-SMB2] v53.0 2017-09-15 @@ -126,7 +126,7 @@ class CreateOptions(object): FILE_OPEN_FOR_FREE_SPACE_QUERY = 0x00800000 -class FilePipePrinterAccessMask(object): +class FilePipePrinterAccessMask: """ [MS-SMB2] v53.0 2017-09-15 @@ -156,7 +156,7 @@ class FilePipePrinterAccessMask(object): GENERIC_READ = 0x80000000 -class DirectoryAccessMask(object): +class DirectoryAccessMask: """ [MS-SMB2] v53.0 2017-09-15 @@ -186,7 +186,7 @@ class DirectoryAccessMask(object): GENERIC_READ = 0x80000000 -class FileFlags(object): +class FileFlags: """ [MS-SMB2] v53.0 2017-09-15 @@ -197,7 +197,7 @@ class FileFlags(object): SMB2_CREATE_FLAG_REPARSEPOINT = 0x1 -class CreateAction(object): +class CreateAction: """ [MS-SMB2] v53.0 2017-09-15 @@ -211,7 +211,7 @@ class CreateAction(object): FILE_OVERWRITTEN = 0x3 -class CloseFlags(object): +class CloseFlags: """ [MS-SMB2] v53.0 2017-09-15 @@ -222,7 +222,7 @@ class CloseFlags(object): SMB2_CLOSE_FLAG_POSTQUERY_ATTRIB = 0x01 -class ReadFlags(object): +class ReadFlags: """ [MS-SMB2] v53.0 2017-09-15 @@ -233,7 +233,7 @@ class ReadFlags(object): SMB2_READFLAG_READ_UNBUFFERED = 0x01 -class ReadWriteChannel(object): +class ReadWriteChannel: """ [MS-SMB2] v53.0 2017-09-15 @@ -246,7 +246,7 @@ class ReadWriteChannel(object): SMB2_CHANNEL_RDMA_V1_INVALIDATE = 0x2 -class WriteFlags(object): +class WriteFlags: """ [MS-SMB2] v53.0 2017-09-15 @@ -258,7 +258,7 @@ class WriteFlags(object): SMB2_WRITEFLAG_WRITE_UNBUFFERED = 0x00000002 -class QueryDirectoryFlags(object): +class QueryDirectoryFlags: """ [MS-SMB2] v53.0 2017-09-15 @@ -272,7 +272,7 @@ class QueryDirectoryFlags(object): SMB2_REOPEN = 0x10 -class QueryInfoFlags(object): +class QueryInfoFlags: """ [MS-SMB2] 2.2.37 SMB2 QUERY_INFO Request - Flags https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-smb2/d623b2f7-a5cd-4639-8cc9-71fa7d9f9ba9 @@ -285,7 +285,7 @@ class QueryInfoFlags(object): SL_INDEX_SPECIFIED = 0x00000004 -class InfoAdditionalInformation(object): +class InfoAdditionalInformation: """ [MS-SMB2] 2.2.39 SMB2 SET_INFO Request - AdditionalInformation https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-smb2/ee9614c4-be54-4a3c-98f1-769a7032a0e4 @@ -362,7 +362,7 @@ def __init__(self): ), ] ) - super(SMB2CreateRequest, self).__init__() + super().__init__() def _name_length(self, structure): buffer_path = structure["buffer_path"].get_value() @@ -445,7 +445,7 @@ def __init__(self): ), ] ) - super(SMB2CreateResponse, self).__init__() + super().__init__() def _create_contexts_offset(self, structure): if len(structure["buffer"]) == 0: @@ -486,7 +486,7 @@ def __init__(self): ("file_id", BytesField(size=16)), ] ) - super(SMB2CloseRequest, self).__init__() + super().__init__() class SMB2CloseResponse(Structure): @@ -521,7 +521,7 @@ def __init__(self): ), ] ) - super(SMB2CloseResponse, self).__init__() + super().__init__() class SMB2FlushRequest(Structure): @@ -544,7 +544,7 @@ def __init__(self): ("file_id", BytesField(size=16)), ] ) - super(SMB2FlushRequest, self).__init__() + super().__init__() class SMB2FlushResponse(Structure): @@ -559,7 +559,7 @@ class SMB2FlushResponse(Structure): def __init__(self): self.fields = OrderedDict([("structure_size", IntField(size=2, default=4)), ("reserved", IntField(size=2))]) - super(SMB2FlushResponse, self).__init__() + super().__init__() class SMB2ReadRequest(Structure): @@ -589,7 +589,7 @@ def __init__(self): ("buffer", BytesField(size=lambda s: self._get_buffer_length(s), default=b"\x00")), ] ) - super(SMB2ReadRequest, self).__init__() + super().__init__() def _get_read_channel_info_offset(self, structure): if structure["channel"].get_value() == 0: @@ -633,7 +633,7 @@ def __init__(self): ("buffer", BytesField(size=lambda s: s["data_length"].get_value())), ] ) - super(SMB2ReadResponse, self).__init__() + super().__init__() class SMB2WriteRequest(Structure): @@ -669,7 +669,7 @@ def __init__(self): ("buffer_channel_info", BytesField(size=lambda s: s["write_channel_info_length"].get_value())), ] ) - super(SMB2WriteRequest, self).__init__() + super().__init__() def _get_write_channel_info_offset(self, structure): if len(structure["buffer_channel_info"]) == 0: @@ -702,7 +702,7 @@ def __init__(self): ("write_channel_info_length", IntField(size=2)), ] ) - super(SMB2WriteResponse, self).__init__() + super().__init__() class SMB2QueryDirectoryRequest(Structure): @@ -730,7 +730,7 @@ def __init__(self): ("buffer", BytesField(size=lambda s: s["file_name_length"].get_value())), ] ) - super(SMB2QueryDirectoryRequest, self).__init__() + super().__init__() @staticmethod def unpack_response(file_information_class, buffer): @@ -787,7 +787,7 @@ def __init__(self): ("buffer", BytesField(size=lambda s: s["output_buffer_length"].get_value())), ] ) - super(SMB2QueryDirectoryResponse, self).__init__() + super().__init__() class SMB2QueryInfoRequest(Structure): @@ -860,7 +860,7 @@ def __init__(self): ), ] ) - super(SMB2QueryInfoRequest, self).__init__() + super().__init__() class SMB2QueryInfoResponse(Structure): @@ -905,7 +905,7 @@ def __init__(self): ), ] ) - super(SMB2QueryInfoResponse, self).__init__() + super().__init__() def parse_buffer(self, file_info_type): buffer = self["buffer"].get_value() @@ -985,7 +985,7 @@ def __init__(self): ("buffer", BytesField(size=lambda s: s["buffer_length"].get_value())), ] ) - super(SMB2SetInfoRequest, self).__init__() + super().__init__() class SMB2SetInfoResponse(Structure): @@ -1005,10 +1005,10 @@ def __init__(self): ("structure_size", IntField(size=2, default=2)), ] ) - super(SMB2SetInfoResponse, self).__init__() + super().__init__() -class Open(object): +class Open: def __init__(self, tree, name): """ [MS-SMB2] v53.0 2017-09-15 diff --git a/src/smbprotocol/reparse_point.py b/src/smbprotocol/reparse_point.py index dc36173d..d8ed4126 100644 --- a/src/smbprotocol/reparse_point.py +++ b/src/smbprotocol/reparse_point.py @@ -9,7 +9,7 @@ from smbprotocol.structure import BytesField, EnumField, IntField, Structure -class ReparseTags(object): +class ReparseTags: """ [MS-FSCC] 2.1.2.1 Reparse Tags https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-fscc/c8e77b37-3909-4fe6-a4ea-2b9d423b1ee4 @@ -97,7 +97,7 @@ def is_reparse_tag_directory(tag): return tag & 0x10000000 == 0x10000000 -class SymbolicLinkFlags(object): +class SymbolicLinkFlags: """ [MS-FSCC] 2.1.2.4 Symbolic Link Reparse Data Buffer - Flags https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-fscc/b41f1cbf-10df-4a47-98d4-1c52a833d913 @@ -135,7 +135,7 @@ def __init__(self): ("data_buffer", BytesField(size=lambda s: s["reparse_data_length"].get_value())), ] ) - super(ReparseDataBuffer, self).__init__() + super().__init__() class SymbolicLinkReparseDataBuffer(Structure): @@ -168,7 +168,7 @@ def __init__(self): ), ] ) - super(SymbolicLinkReparseDataBuffer, self).__init__() + super().__init__() def get_substitute_name(self): return self._get_name("substitute") diff --git a/src/smbprotocol/security_descriptor.py b/src/smbprotocol/security_descriptor.py index a507a7a6..0d7636db 100644 --- a/src/smbprotocol/security_descriptor.py +++ b/src/smbprotocol/security_descriptor.py @@ -16,7 +16,7 @@ ) -class AccessMask(object): +class AccessMask: """ [MS-DTYP] @@ -40,7 +40,7 @@ class AccessMask(object): DELETE = 0x00010000 -class AceType(object): +class AceType: """ [MS-DTYP] @@ -73,7 +73,7 @@ class AceType(object): SYSTEM_SCOPED_POLICY_ID_ACE_TYPE = 0x13 -class AceFlags(object): +class AceFlags: """ [MS-DTYP] @@ -90,7 +90,7 @@ class AceFlags(object): SUCCESSFUL_ACCESS_ACE_FLAG = 0x40 -class AclRevision(object): +class AclRevision: """ [MS-DTYP] @@ -103,7 +103,7 @@ class AclRevision(object): ACL_REVISION_DS = 0x04 # not natively supported yet -class SDControl(object): +class SDControl: """ [MS-DTYP] @@ -152,7 +152,7 @@ def __init__(self): ), ] ) - super(SIDPacket, self).__init__() + super().__init__() def __str__(self): revision = self["revision"].get_value() @@ -173,7 +173,7 @@ def from_string(self, sid_string): sid_entries = sid_string.split("-") if len(sid_entries) < 3: raise ValueError( - "A SID string must start with S and contain a " "revision and identifier authority, e.g. S-1-0" + "A SID string must start with S and contain a revision and identifier authority, e.g. S-1-0" ) revision = int(sid_entries[1]) @@ -202,7 +202,7 @@ def __init__(self): ("sid", StructureField(structure_type=SIDPacket)), ] ) - super(AccessAllowedAce, self).__init__() + super().__init__() class AccessDeniedAce(Structure): @@ -222,7 +222,7 @@ def __init__(self): ("sid", StructureField(structure_type=SIDPacket)), ] ) - super(AccessDeniedAce, self).__init__() + super().__init__() class SystemAuditAce(Structure): @@ -243,7 +243,7 @@ def __init__(self): ("sid", StructureField(structure_type=SIDPacket)), ] ) - super(SystemAuditAce, self).__init__() + super().__init__() class AclPacket(Structure): @@ -276,7 +276,7 @@ def __init__(self): ), ] ) - super(AclPacket, self).__init__() + super().__init__() def _unpack_aces(self, structure, data): aces = [] @@ -326,7 +326,7 @@ def __init__(self): ) # used to store the buffer values so it can easily be rebuilt self._buffer = OrderedDict() - super(SMB2CreateSDBuffer, self).__init__() + super().__init__() def get_owner(self): return self._get_sid_from_buffer("offset_owner") diff --git a/src/smbprotocol/session.py b/src/smbprotocol/session.py index b8794d34..59354a59 100644 --- a/src/smbprotocol/session.py +++ b/src/smbprotocol/session.py @@ -25,7 +25,7 @@ log = logging.getLogger(__name__) -class SessionFlags(object): +class SessionFlags: """ [MS-SMB2] v53.0 2017-09-15 @@ -98,7 +98,7 @@ def __init__(self): ), ] ) - super(SMB2SessionSetupRequest, self).__init__() + super().__init__() class SMB2SessionSetupResponse(Structure): @@ -151,7 +151,7 @@ def __init__(self): ), ] ) - super(SMB2SessionSetupResponse, self).__init__() + super().__init__() class SMB2Logoff(Structure): @@ -167,10 +167,10 @@ class SMB2Logoff(Structure): def __init__(self): self.fields = OrderedDict([("structure_size", IntField(size=2, default=4)), ("reserved", IntField(size=2))]) - super(SMB2Logoff, self).__init__() + super().__init__() -class Session(object): +class Session: def __init__(self, connection, username=None, password=None, require_encryption=True, auth_protocol="negotiate"): """ [MS-SMB2] v53.0 2017-09-15 diff --git a/src/smbprotocol/structure.py b/src/smbprotocol/structure.py index e19f10a4..d9726551 100644 --- a/src/smbprotocol/structure.py +++ b/src/smbprotocol/structure.py @@ -50,7 +50,7 @@ def predicate(line): return "".join(lines) -class Structure(object): +class Structure: def __init__(self): # Now that self.fields is set, loop through it again and set the # metadata around the fields and set the value based on default. @@ -150,9 +150,7 @@ def __init__(self, little_endian=True, default=None, size=None): self.little_endian = little_endian if not (size is None or isinstance(size, int) or isinstance(size, types.LambdaType)): - raise InvalidFieldDefinition( - "%s size for field must be an int or " "None for a variable length" % field_type - ) + raise InvalidFieldDefinition("%s size for field must be an int or None for a variable length" % field_type) self.size = size self.default = default self.value = None @@ -331,9 +329,9 @@ def __init__(self, size, unsigned=True, **kwargs): :param kwargs: Any other kwarg to be sent to Field() """ if size not in [1, 2, 4, 8]: - raise InvalidFieldDefinition("IntField size must have a value of " "1, 2, 4, or 8 not %s" % str(size)) + raise InvalidFieldDefinition("IntField size must have a value of 1, 2, 4, or 8 not %s" % str(size)) self.unsigned = unsigned - super(IntField, self).__init__(size=size, **kwargs) + super().__init__(size=size, **kwargs) def _pack_value(self, value): format = self._get_struct_format(self.size, self.unsigned) @@ -353,9 +351,7 @@ def _parse_value(self, value): elif isinstance(value, int): int_value = value else: - raise TypeError( - "Cannot parse value for field %s of type %s to " "an int" % (self.name, type(value).__name__) - ) + raise TypeError("Cannot parse value for field %s of type %s to an int" % (self.name, type(value).__name__)) return int_value def _get_packed_size(self): @@ -390,7 +386,7 @@ def _parse_value(self, value): bytes_value = value else: raise TypeError( - "Cannot parse value for field %s of type %s to a " "byte string" % (self.name, type(value).__name__) + "Cannot parse value for field %s of type %s to a byte string" % (self.name, type(value).__name__) ) return bytes_value @@ -432,16 +428,16 @@ def __init__(self, list_count=None, list_type=BytesField(), unpack_func=None, ** """ if list_count is not None and not (isinstance(list_count, int) or isinstance(list_count, types.LambdaType)): raise InvalidFieldDefinition( - "ListField list_count must be an " "int, lambda, or None for a variable " "list length" + "ListField list_count must be an int, lambda, or None for a variable list length" ) self.list_count = list_count if not isinstance(list_type, Field): - raise InvalidFieldDefinition("ListField list_type must be a " "Field definition") + raise InvalidFieldDefinition("ListField list_type must be a Field definition") self.list_type = list_type if unpack_func is not None and not isinstance(unpack_func, types.LambdaType): - raise InvalidFieldDefinition("ListField unpack_func must be a " "lambda function or None") + raise InvalidFieldDefinition("ListField unpack_func must be a lambda function or None") elif unpack_func is None and (list_count is None or list_type.size is None): raise InvalidFieldDefinition( "ListField must either define " @@ -451,7 +447,7 @@ def __init__(self, list_count=None, list_type=BytesField(), unpack_func=None, ** ) self.unpack_func = unpack_func - super(ListField, self).__init__(**kwargs) + super().__init__(**kwargs) def __getitem__(self, item): # TODO: Make this more efficient @@ -491,9 +487,7 @@ def _parse_value(self, value): # manually parse each list entry to the field type specified list_value = value else: - raise TypeError( - "Cannot parse value for field %s of type %s to a " "list" % (self.name, type(value).__name__) - ) + raise TypeError("Cannot parse value for field %s of type %s to a list" % (self.name, type(value).__name__)) list_value = [self._parse_sub_value(v) for v in list_value] return list_value @@ -557,7 +551,7 @@ def __init__(self, structure_type, **kwargs): :param kwargs: Any other kwarg to be sent to Field() """ self.structure_type = structure_type - super(StructureField, self).__init__(**kwargs) + super().__init__(**kwargs) def __setitem__(self, key, value): field = self._get_field(key) @@ -590,7 +584,7 @@ def _parse_value(self, value): structure_value = value else: raise TypeError( - "Cannot parse value for field %s of type %s to a " "structure" % (self.name, type(value).__name__) + "Cannot parse value for field %s of type %s to a structure" % (self.name, type(value).__name__) ) if isinstance(structure_value, bytes) and self.structure_type and structure_value != b"": @@ -614,7 +608,7 @@ def _to_string(self): def _get_field(self, key): structure_value = self._get_calculated_value(self.value) if isinstance(structure_value, bytes): - raise ValueError("Cannot get field %s when structure is defined " "as a byte string" % key) + raise ValueError("Cannot get field %s when structure is defined as a byte string" % key) field = structure_value._get_field(key) return field @@ -640,8 +634,8 @@ def __init__(self, size=None, **kwargs): :param kwargs: Any other kwarg to be sent to Field() """ if not (size is None or size == 8): - raise InvalidFieldDefinition("DateTimeField type must have a size " "of 8 not %d" % size) - super(DateTimeField, self).__init__(size=8, **kwargs) + raise InvalidFieldDefinition("DateTimeField type must have a size of 8 not %d" % size) + super().__init__(size=8, **kwargs) def _pack_value(self, value): utc_tz = datetime.timezone.utc @@ -678,7 +672,7 @@ def _parse_value(self, value): datetime_value = value else: raise TypeError( - "Cannot parse value for field %s of type %s to a " "datetime" % (self.name, type(value).__name__) + "Cannot parse value for field %s of type %s to a datetime" % (self.name, type(value).__name__) ) return datetime_value @@ -700,8 +694,8 @@ def __init__(self, size=None, **kwargs): :param kwargs: Any other kwarg to be sent to Field() """ if not (size is None or size == 16): - raise InvalidFieldDefinition("UuidField type must have a size of " "16 not %d" % size) - super(UuidField, self).__init__(size=16, **kwargs) + raise InvalidFieldDefinition("UuidField type must have a size of 16 not %d" % size) + super().__init__(size=16, **kwargs) def _pack_value(self, value): if self.little_endian: @@ -723,9 +717,7 @@ def _parse_value(self, value): elif isinstance(value, types.LambdaType): uuid_value = value else: - raise TypeError( - "Cannot parse value for field %s of type %s to a " "uuid" % (self.name, type(value).__name__) - ) + raise TypeError("Cannot parse value for field %s of type %s to a uuid" % (self.name, type(value).__name__)) return uuid_value def _get_packed_size(self): @@ -740,10 +732,10 @@ class EnumField(IntField): def __init__(self, enum_type, enum_strict=True, **kwargs): self.enum_type = enum_type self.enum_strict = enum_strict - super(EnumField, self).__init__(**kwargs) + super().__init__(**kwargs) def _parse_value(self, value): - int_value = super(EnumField, self)._parse_value(value) + int_value = super()._parse_value(value) valid = False for flag_value in vars(self.enum_type).values(): if int_value == flag_value: @@ -771,7 +763,7 @@ class FlagField(IntField): def __init__(self, flag_type, flag_strict=True, **kwargs): self.flag_type = flag_type self.flag_strict = flag_strict - super(FlagField, self).__init__(**kwargs) + super().__init__(**kwargs) def set_flag(self, flag): valid = False @@ -788,7 +780,7 @@ def has_flag(self, flag): return self.value & flag == flag def _parse_value(self, value): - int_value = super(FlagField, self)._parse_value(value) + int_value = super()._parse_value(value) current_val = int_value for value in vars(self.flag_type).values(): if isinstance(value, int): @@ -819,8 +811,8 @@ def __init__(self, size=1, **kwargs): :param kwargs: Any other kwargs to be sent to Field() """ if size != 1: - raise InvalidFieldDefinition("BoolField size must have a value of " "1, not %d" % size) - super(BoolField, self).__init__(size=size, **kwargs) + raise InvalidFieldDefinition("BoolField size must have a value of 1, not %d" % size) + super().__init__(size=size, **kwargs) def _pack_value(self, value): return b"\x01" if value else b"\x00" @@ -835,9 +827,7 @@ def _parse_value(self, value): elif isinstance(value, types.LambdaType): bool_value = value else: - raise TypeError( - "Cannot parse value for field %s of type %s to a " "bool" % (self.name, type(value).__name__) - ) + raise TypeError("Cannot parse value for field %s of type %s to a bool" % (self.name, type(value).__name__)) return bool_value def _get_packed_size(self): @@ -851,7 +841,7 @@ class TextField(BytesField): def __init__(self, encoding="utf-16-le", null_terminated=False, **kwargs): self.encoding = encoding self.null_terminated = null_terminated - super(TextField, self).__init__(**kwargs) + super().__init__(**kwargs) def _pack_value(self, value): if self.null_terminated: @@ -869,7 +859,7 @@ def _parse_value(self, value): text_value = value else: raise TypeError( - "Cannot parse value for field %s of type %s to a " "text string" % (self.name, type(value).__name__) + "Cannot parse value for field %s of type %s to a text string" % (self.name, type(value).__name__) ) return text_value diff --git a/src/smbprotocol/transport.py b/src/smbprotocol/transport.py index 2a97655f..3ba05b35 100644 --- a/src/smbprotocol/transport.py +++ b/src/smbprotocol/transport.py @@ -44,10 +44,10 @@ def __init__(self): ), ] ) - super(DirectTCPPacket, self).__init__() + super().__init__() -class Tcp(object): +class Tcp: MAX_SIZE = 16777215 def __init__(self, server, port, timeout=None): diff --git a/src/smbprotocol/tree.py b/src/smbprotocol/tree.py index 5eeb0535..94f3e37d 100644 --- a/src/smbprotocol/tree.py +++ b/src/smbprotocol/tree.py @@ -26,7 +26,7 @@ log = logging.getLogger(__name__) -class TreeFlags(object): +class TreeFlags: """ [MS-SMB2] v53.0 2017-09-15 @@ -39,7 +39,7 @@ class TreeFlags(object): SMB2_TREE_CONNECT_FLAG_EXTENSION_PRESENT = 0x0001 -class ShareType(object): +class ShareType: """ [MS-SMB2] v53.0 2017-09-15 @@ -52,7 +52,7 @@ class ShareType(object): SMB2_SHARE_TYPE_PRINT = 0x03 -class ShareFlags(object): +class ShareFlags: """ [MS-SMB2] v53.0 2017-09-15 @@ -77,7 +77,7 @@ class ShareFlags(object): SMB2_SHAREFLAG_IDENTITY_REMOTING = 0x00040000 -class ShareCapabilities(object): +class ShareCapabilities: """ [MS-SMB2] v53.0 2017-09-15 @@ -131,7 +131,7 @@ def __init__(self): ("buffer", BytesField(size=lambda s: s["path_length"].get_value())), ] ) - super(SMB2TreeConnectRequest, self).__init__() + super().__init__() class SMB2TreeConnectResponse(Structure): @@ -174,7 +174,7 @@ def __init__(self): ("maximal_access", IntField(size=4)), ] ) - super(SMB2TreeConnectResponse, self).__init__() + super().__init__() class SMB2TreeDisconnect(Structure): @@ -201,10 +201,10 @@ def __init__(self): ("reserved", IntField(size=2)), ] ) - super(SMB2TreeDisconnect, self).__init__() + super().__init__() -class TreeConnect(object): +class TreeConnect: def __init__(self, session, share_name): """ [MS-SMB2] v53.0 2017-09-15 diff --git a/tests/test_change_notify.py b/tests/test_change_notify.py index 73203320..422a5ed7 100644 --- a/tests/test_change_notify.py +++ b/tests/test_change_notify.py @@ -31,7 +31,7 @@ from smbprotocol.tree import TreeConnect -class TestFileNotifyInformation(object): +class TestFileNotifyInformation: DATA = b"\x00\x00\x00\x00" b"\x01\x00\x00\x00" b"\x08\x00\x00\x00" b"\x63\x00\x61\x00\x66\x00\xe9\x00" def test_create_message(self): @@ -53,7 +53,7 @@ def test_parse_message(self): assert actual["file_name"].get_value() == "café" -class TestSMB2ChangeNotifyRequest(object): +class TestSMB2ChangeNotifyRequest: DATA = ( b"\x20\x00" b"\x00\x00" @@ -85,7 +85,7 @@ def test_parse_message(self): assert actual["reserved"].get_value() == 0 -class TestSMB2ChangeNotifyResponse(object): +class TestSMB2ChangeNotifyResponse: DATA = b"\x09\x00" b"\x48\x00" b"\x04\x00\x00\x00" b"\x01\x02\x03\x04" def test_create_message(self): @@ -105,7 +105,7 @@ def test_parse_message(self): assert actual["buffer"].get_value() == b"\x01\x02\x03\x04" -class TestChangeNotify(object): +class TestChangeNotify: def _remove_file(self, tree, name): file_open = Open(tree, name) file_open.create( diff --git a/tests/test_connection.py b/tests/test_connection.py index 48d35195..d2b85318 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -35,7 +35,7 @@ from smbprotocol.session import Session -class TestSMB2NegotiateRequest(object): +class TestSMB2NegotiateRequest: def test_create_message(self): message = SMB2NegotiateRequest() message["security_mode"] = SecurityMode.SMB2_NEGOTIATE_SIGNING_ENABLED @@ -93,7 +93,7 @@ def test_parse_message(self): ] -class TestSMB3NegotiateRequest(object): +class TestSMB3NegotiateRequest: def test_create_message(self): message = SMB3NegotiateRequest() message["security_mode"] = SecurityMode.SMB2_NEGOTIATE_SIGNING_ENABLED @@ -412,7 +412,7 @@ def test_parse_message_with_context_no_padding(self): ] -class TestSMB2NegotiateContextRequest(object): +class TestSMB2NegotiateContextRequest: def test_create_message(self): message = SMB2NegotiateContextRequest() message["context_type"] = NegotiateContextType.SMB2_ENCRYPTION_CAPABILITIES @@ -449,7 +449,7 @@ def test_parse_message_invalid_context_type(self): ) -class TestSMB2PreauthIntegrityCapabilities(object): +class TestSMB2PreauthIntegrityCapabilities: def test_create_message(self): message = SMB2PreauthIntegrityCapabilities() message["hash_algorithms"] = [HashAlgorithms.SHA_512] @@ -474,7 +474,7 @@ def test_parse_message(self): assert actual["salt"].get_value() == b"\x01" * 16 -class TestSMB2EncryptionCapabilities(object): +class TestSMB2EncryptionCapabilities: def test_create_message(self): message = SMB2EncryptionCapabilities() message["ciphers"] = [Ciphers.AES_128_CCM, Ciphers.AES_128_GCM] @@ -534,7 +534,7 @@ def test_parse_message(self): ] -class TestSMB2NegotiateResponse(object): +class TestSMB2NegotiateResponse: def test_create_message(self): message = SMB2NegotiateResponse() message["security_mode"] = SecurityMode.SMB2_NEGOTIATE_SIGNING_ENABLED @@ -772,7 +772,7 @@ def test_parse_message_3_1_1(self): assert preauth_cap["salt"].get_value() == b"\x22" * 32 -class TestSMB2Echo(object): +class TestSMB2Echo: def test_create_message(self): message = SMB2Echo() expected = b"\x04\x00" b"\x00\x00" @@ -790,7 +790,7 @@ def test_parse_message(self): assert actual["reserved"].get_value() == 0 -class TestSMB2CancelRequest(object): +class TestSMB2CancelRequest: DATA = b"\x04\x00" b"\x00\x00" def test_create_message(self): @@ -807,7 +807,7 @@ def test_parse_message(self): assert actual["reserved"].get_value() == 0 -class TestSMB2TransformHeader(object): +class TestSMB2TransformHeader: def test_create_message(self): message = SMB2TransformHeader() message["nonce"] = b"\xff" * 16 @@ -856,7 +856,7 @@ def test_parse_message(self): assert actual["data"].get_value() == b"\x01\x02\x03\x04" -class TestConnection(object): +class TestConnection: def test_dialect_2_0_2(self, smb_real): connection = Connection(uuid.uuid4(), smb_real[2], smb_real[3]) connection.connect(Dialects.SMB_2_0_2) @@ -1103,7 +1103,7 @@ def test_verify_fail_no_session(self, smb_real): header["flags"].set_flag(Smb2Flags.SMB2_FLAGS_SIGNED) with pytest.raises(SMBException) as exc: connection.verify_signature(header, 100) - assert str(exc.value) == "Failed to find session 100 for " "message verification" + assert str(exc.value) == "Failed to find session 100 for message verification" finally: connection.disconnect() @@ -1140,7 +1140,7 @@ def test_decrypt_invalid_flag(self, smb_real): enc_header["flags"] = 5 with pytest.raises(SMBException) as exc: connection._decrypt(enc_header) - assert str(exc.value) == "Expecting flag of 0x0001 but got 5 in " "the SMB Transform Header Response" + assert str(exc.value) == "Expecting flag of 0x0001 but got 5 in the SMB Transform Header Response" finally: connection.disconnect(True) @@ -1159,7 +1159,7 @@ def test_decrypt_invalid_session_id(self, smb_real): enc_header["session_id"] = 100 with pytest.raises(SMBException) as exc: connection._decrypt(enc_header) - assert str(exc.value) == "Failed to find valid session 100 for " "message decryption" + assert str(exc.value) == "Failed to find valid session 100 for message decryption" finally: connection.disconnect(True) @@ -1171,7 +1171,7 @@ def test_requested_credits_greater_than_available(self, smb_real): msg["max_output_response"] = 65538 # results in 2 credits required with pytest.raises(SMBException) as exc: connection.send(msg, None, None, 0) - assert str(exc.value) == "Request requires 2 credits but only 1 " "credits are available" + assert str(exc.value) == "Request requires 2 credits but only 1 credits are available" finally: connection.disconnect() @@ -1185,7 +1185,7 @@ def test_send_invalid_tree_id(self, smb_real): msg["file_id"] = b"\xff" * 16 with pytest.raises(SMBException) as exc: connection.send(msg, session.session_id, 10) - assert str(exc.value) == "Cannot find Tree with the ID 10 in " "the session tree table" + assert str(exc.value) == "Cannot find Tree with the ID 10 in the session tree table" finally: connection.disconnect() diff --git a/tests/test_create_contexts.py b/tests/test_create_contexts.py index c5afeaf2..70e0745f 100644 --- a/tests/test_create_contexts.py +++ b/tests/test_create_contexts.py @@ -42,7 +42,7 @@ from smbprotocol.header import NtStatus -class TestCreateContextName(object): +class TestCreateContextName: def test_get_response_known(self): name = CreateContextName.SMB2_CREATE_QUERY_ON_DISK_ID actual = CreateContextName.get_response_structure(name) @@ -55,7 +55,7 @@ def test_get_response_unknown(self): assert actual == expected -class TestSMB2CreateContextName(object): +class TestSMB2CreateContextName: def test_create_message(self): ea_buffer1 = SMB2CreateEABuffer() ea_buffer1["ea_name"] = "Authors\x00".encode("ascii") @@ -315,7 +315,7 @@ def test_get_context_data_unknown(self): assert actual == b"\x20\x00\x00\x00" -class TestSMB2CreateEABuffer(object): +class TestSMB2CreateEABuffer: def test_create_message(self): msg1 = SMB2CreateEABuffer() msg1["ea_name"] = "Authors\x00".encode("ascii") @@ -396,7 +396,7 @@ def test_parse_message(self): assert actual2["padding"].get_value() == b"" -class TestSMB2CreateDurableHandleRequest(object): +class TestSMB2CreateDurableHandleRequest: def test_create_message(self): message = SMB2CreateDurableHandleRequest() expected = b"\x00" * 16 @@ -413,7 +413,7 @@ def test_parse_message(self): assert actual["durable_request"].get_value() == b"\x00" * 16 -class TestSMB2CreateDurableHandleResponse(object): +class TestSMB2CreateDurableHandleResponse: def test_create_message(self): message = SMB2CreateDurableHandleResponse() expected = b"\x00" * 8 @@ -430,7 +430,7 @@ def test_parse_message(self): assert actual["reserved"].get_value() == 0 -class TestSMB2CreateDurableHandleReconnect(object): +class TestSMB2CreateDurableHandleReconnect: def test_create_message(self): message = SMB2CreateDurableHandleReconnect() message["data"] = b"\xff" * 16 @@ -448,7 +448,7 @@ def test_parse_message(self): assert actual["data"].pack() == b"\xff" * 16 -class TestSMB2CreateQueryMaximalAccessRequest(object): +class TestSMB2CreateQueryMaximalAccessRequest: def test_create_message(self): message = SMB2CreateQueryMaximalAccessRequest() message["timestamp"] = datetime.utcfromtimestamp(0) @@ -466,7 +466,7 @@ def test_parse_message(self): assert actual["timestamp"].get_value() == datetime.utcfromtimestamp(0) -class TestSMB2CreateQueryMaximalAccessResponse(object): +class TestSMB2CreateQueryMaximalAccessResponse: def test_create_message(self): message = SMB2CreateQueryMaximalAccessResponse() message["maximal_access"] = 2032127 @@ -485,7 +485,7 @@ def test_parse_message(self): assert actual["maximal_access"].get_value() == 2032127 -class TestSMB2CreateAllocationSize(object): +class TestSMB2CreateAllocationSize: def test_create_message(self): message = SMB2CreateAllocationSize() message["allocation_size"] = 1024 @@ -503,7 +503,7 @@ def test_parse_message(self): assert actual["allocation_size"].get_value() == 1024 -class TestSMB2CreateTimewarpToken(object): +class TestSMB2CreateTimewarpToken: def test_create_message(self): message = SMB2CreateTimewarpToken() message["timestamp"] = datetime.utcfromtimestamp(0) @@ -521,7 +521,7 @@ def test_parse_message(self): assert actual["timestamp"].get_value() == datetime.utcfromtimestamp(0) -class TestSMB2CreateRequestLease(object): +class TestSMB2CreateRequestLease: def test_create_message(self): message = SMB2CreateRequestLease() message["lease_key"] = b"\xff" * 16 @@ -556,7 +556,7 @@ def test_parse_message(self): assert actual["lease_duration"].get_value() == 10 -class TestSMB2CreateResponseLease(object): +class TestSMB2CreateResponseLease: def test_create_message(self): message = SMB2CreateResponseLease() message["lease_key"] = b"\xff" * 16 @@ -592,7 +592,7 @@ def test_parse_message(self): assert actual["lease_duration"].get_value() == 12 -class TestSMB2CreateQueryOnDiskIDResponse(object): +class TestSMB2CreateQueryOnDiskIDResponse: def test_create_message(self): message = SMB2CreateQueryOnDiskIDResponse() message["disk_file_id"] = 43065671436753645 @@ -623,7 +623,7 @@ def test_parse_message(self): assert actual["reserved"].get_value() == b"\x00" * 16 -class TestSMB2CreateRequestLeaseV2(object): +class TestSMB2CreateRequestLeaseV2: def test_create_message(self): message = SMB2CreateRequestLeaseV2() message["lease_key"] = b"\xff" * 16 @@ -672,7 +672,7 @@ def test_parse_message(self): assert actual["reserved"].get_value() == 0 -class TestSMB2CreateResponseLeaseV2(object): +class TestSMB2CreateResponseLeaseV2: def test_create_message(self): message = SMB2CreateResponseLeaseV2() message["lease_key"] = b"\xff" * 16 @@ -721,7 +721,7 @@ def test_parse_message(self): assert actual["reserved"].get_value() == 0 -class TestSMB2CreateDurableHandleRequestV2(object): +class TestSMB2CreateDurableHandleRequestV2: def test_create_message(self): message = SMB2CreateDurableHandleRequestV2() message["timeout"] = 100 @@ -756,7 +756,7 @@ def test_parse_message(self): assert actual["create_guid"].get_value() == uuid.UUID(bytes=b"\xff" * 16) -class TestSMB2CreateDurableHandleReconnectV2(object): +class TestSMB2CreateDurableHandleReconnectV2: def test_create_message(self): message = SMB2CreateDurableHandleReconnectV2() message["file_id"] = b"\xff" * 16 @@ -790,7 +790,7 @@ def test_parse_message(self): assert actual["flags"].get_value() == DurableHandleFlags.SMB2_DHANDLE_FLAG_PERSISTENT -class TestSMB2CreateDurableHandleResponseV2(object): +class TestSMB2CreateDurableHandleResponseV2: def test_create_message(self): message = SMB2CreateDurableHandleResponseV2() message["timeout"] = 10 @@ -809,7 +809,7 @@ def test_parse_message(self): assert actual["flags"].get_value() == 0 -class TestSMB2CreateAppInstanceId(object): +class TestSMB2CreateAppInstanceId: def test_create_message(self): message = SMB2CreateAppInstanceId() message["app_instance_id"] = b"\xff" * 16 @@ -829,7 +829,7 @@ def test_parse_message(self): assert actual["app_instance_id"].get_value() == b"\xff" * 16 -class TestSMB2SVHDXOpenDeviceContextRequest(object): +class TestSMB2SVHDXOpenDeviceContextRequest: def test_create_message(self): message = SMB2SVHDXOpenDeviceContextRequest() message["initiator_id"] = b"\xff" * 16 @@ -879,7 +879,7 @@ def test_parse_message(self): assert actual["initiator_host_name"].get_value() == "hostname".encode("utf-16-le") -class TestSMB2SVHDXOpenDeviceContextResponse(object): +class TestSMB2SVHDXOpenDeviceContextResponse: def test_create_message(self): message = SMB2SVHDXOpenDeviceContextResponse() message["initiator_id"] = b"\xff" * 16 @@ -931,7 +931,7 @@ def test_parse_message(self): assert actual["initiator_host_name"].get_value() == "hostname".encode("utf-16-le") -class TestSMB2SVHDXOpenDeviceContextV2Request(object): +class TestSMB2SVHDXOpenDeviceContextV2Request: def test_create_message(self): message = SMB2SVHDXOpenDeviceContextV2Request() message["initiator_id"] = b"\xff" * 16 @@ -996,7 +996,7 @@ def test_parse_message(self): assert actual["virtual_size"].get_value() == 0 -class TestSMB2SVHDXOpenDeviceContextV2Response(object): +class TestSMB2SVHDXOpenDeviceContextV2Response: def test_create_message(self): message = SMB2SVHDXOpenDeviceContextV2Response() message["initiator_id"] = b"\xff" * 16 @@ -1063,7 +1063,7 @@ def test_parse_message(self): assert actual["virtual_size"].get_value() == 0 -class TestSMB2CreateAppInstanceVersion(object): +class TestSMB2CreateAppInstanceVersion: def test_create_message(self): message = SMB2CreateAppInstanceVersion() message["app_instance_version_high"] = 10 diff --git a/tests/test_dfs.py b/tests/test_dfs.py index a4bf2987..9479a03f 100644 --- a/tests/test_dfs.py +++ b/tests/test_dfs.py @@ -26,7 +26,7 @@ UNICODE_TEXT = "ÜseӜ" + to_text(b"\xF0\x9D\x84\x9E") -class TestDomainEntry(object): +class TestDomainEntry: def test_domain_entry(self): domain_entry = DomainEntry(DOMAIN_REFERRAL["referral_entries"].get_value()[0]) assert domain_entry.domain_list == [] @@ -61,7 +61,7 @@ def test_process_dc_referral_invalid_hint(self): domain_entry.dc_hint = "invalid" -class TestReferralEntry(object): +class TestReferralEntry: def test_root_referral_entry(self): referral_entry = ReferralEntry(ROOT_REFERRAL) assert referral_entry.dfs_path == "\\domain.test\\dfs" @@ -114,7 +114,7 @@ def test_target_referral_invalid_hint(self): referral_entry.target_hint = DFSTarget("fake", False) -class TestDFSReferralRequest(object): +class TestDFSReferralRequest: def test_create_message(self): share = "\\\\server\\shares\\%s" % UNICODE_TEXT @@ -159,7 +159,7 @@ def test_parse_message(self, leftover): assert actual["request_file_name"].get_value() == share -class TestDFSReferralRequestEx(object): +class TestDFSReferralRequestEx: def test_create_message(self): share = "\\\\server\\shares\\%s" % UNICODE_TEXT @@ -225,7 +225,7 @@ def test_parse_message(self, leftover): assert actual["site_name"].get_value() == UNICODE_TEXT -class TestDFSReferralResponse(object): +class TestDFSReferralResponse: def test_parse_message_v1(self): actual = DFSReferralResponse() data = ( diff --git a/tests/test_exceptions.py b/tests/test_exceptions.py index 1f3a20c6..1f4551fb 100644 --- a/tests/test_exceptions.py +++ b/tests/test_exceptions.py @@ -29,14 +29,14 @@ from smbprotocol.header import NtStatus -class TestSMBException(object): +class TestSMBException: def test_exception(self): with pytest.raises(SMBException) as exc: raise SMBException("smb error") assert str(exc.value) == "smb error" -class TestSMBAuthenticationError(object): +class TestSMBAuthenticationError: def test_exception(self): with pytest.raises(SMBAuthenticationError) as exc: raise SMBAuthenticationError("auth error") @@ -48,7 +48,7 @@ def test_caught_with_smbexception(self): assert str(exc.value) == "auth error" -class TestSMBOSError(object): +class TestSMBOSError: def test_error(self): with pytest.raises(SMBOSError) as err: raise SMBOSError(NtStatus.STATUS_OBJECT_NAME_NOT_FOUND, "filéname") @@ -83,7 +83,7 @@ def test_error_with_override(self): assert str(err.value) == "[Error 13] [NtStatus 0xc0000061] Required privilege not held: 'filéname'" -class TestSMBUnsupportedFeature(object): +class TestSMBUnsupportedFeature: def test_exception_needs_newer(self): with pytest.raises(SMBUnsupportedFeature) as exc: raise SMBUnsupportedFeature(Dialects.SMB_3_0_0, Dialects.SMB_3_1_1, "feature", True) @@ -112,7 +112,7 @@ def test_exception_no_suffix(self): ) -class TestSMBResponseException(object): +class TestSMBResponseException: def test_throw_default_exception(self): error_resp = SMB2ErrorResponse() header = self._get_header(error_resp) @@ -264,7 +264,7 @@ def _get_header(self, data, status=NtStatus.STATUS_INVALID_PARAMETER): return header -class TestSMB2ErrorResponse(object): +class TestSMB2ErrorResponse: def test_create_message_plain(self): # This is a plain error response without the error context response # data appended @@ -318,7 +318,7 @@ def test_parse_message_with_context(self): assert error_data[0]["error_context_data"].get_value() == b"\x01\x02\x03\x04" -class TestSMB2ErrorContextResponse(object): +class TestSMB2ErrorContextResponse: def test_create_message(self): message = SMB2ErrorContextResponse() message["error_id"] = ErrorContextId.SMB2_ERROR_ID_SHARE_REDIRECT @@ -339,7 +339,7 @@ def test_parse_message(self): assert actual["error_context_data"].get_value() == b"\x01\x02\x03\x04" -class TestSMB2SymbolicLinkErrorResponse(object): +class TestSMB2SymbolicLinkErrorResponse: def test_create_message(self): message = SMB2SymbolicLinkErrorResponse() message.set_name(r"C:\temp\folder", r"\??\C:\temp\folder") @@ -547,7 +547,7 @@ def test_resolve_path_different_host(self): resp.resolve_path(link_path) -class TestSMB2ShareRedirectErrorContext(object): +class TestSMB2ShareRedirectErrorContext: def test_create_message(self): message = SMB2ShareRedirectErrorContext() ip1 = SMB2MoveDstIpAddrStructure() @@ -628,7 +628,7 @@ def test_parse_message(self): assert actual["resource_name"].get_value() == b"\x01\x02\x03\x04" -class TestSMB2MoveDstIpAddrStructure(object): +class TestSMB2MoveDstIpAddrStructure: def test_create_message_v4(self): message = SMB2MoveDstIpAddrStructure() message["type"] = IpAddrType.MOVE_DST_IPADDR_V4 @@ -663,7 +663,7 @@ def test_fail_invalid_ipv6_address(self): message["type"] = IpAddrType.MOVE_DST_IPADDR_V6 with pytest.raises(ValueError) as exc: message.set_ipaddress("abc") - assert str(exc.value) == "When setting an IPv6 address, it must be " "in the full form without concatenation" + assert str(exc.value) == "When setting an IPv6 address, it must be in the full form without concatenation" def test_parse_message_v4(self): actual = SMB2MoveDstIpAddrStructure() diff --git a/tests/test_file_info.py b/tests/test_file_info.py index 671688fe..3cb3d3c5 100644 --- a/tests/test_file_info.py +++ b/tests/test_file_info.py @@ -28,7 +28,7 @@ from smbprotocol.structure import DateTimeField -class TestFileNameInformation(object): +class TestFileNameInformation: DATA = b"\x08\x00\x00\x00" b"\x63\x00\x61\x00\x66\x00\xe9\x00" def test_create_message(self): @@ -49,7 +49,7 @@ def test_parse_message(self): assert actual["file_name"].get_value() == "café" -class TestFileAllInformation(object): +class TestFileAllInformation: DATA = ( b"\x00\x80\x3e\xd5\xde\xb1\x9d\x01" b"\x00\x80\x3e\xd5\xde\xb1\x9d\x01" @@ -133,7 +133,7 @@ def test_parse_message(self): assert name["file_name"].get_value() == "" -class TestFileBothDirectoryInformation(object): +class TestFileBothDirectoryInformation: def test_create_message(self): message = FileBothDirectoryInformation() message["creation_time"] = datetime.utcfromtimestamp(1024) @@ -214,7 +214,7 @@ def test_parse_message(self): assert actual["file_name"].get_value() == "file1.txt".encode("utf-16-le") -class TestFileDirectoryInformation(object): +class TestFileDirectoryInformation: def test_create_message(self): message = FileDirectoryInformation() message["creation_time"] = datetime.utcfromtimestamp(1024) @@ -278,7 +278,7 @@ def test_parse_message(self): assert actual["file_name"].get_value() == "file1.txt".encode("utf-16-le") -class TestFileDispositionInformation(object): +class TestFileDispositionInformation: DATA = b"\x01" def test_create_message(self): @@ -299,7 +299,7 @@ def test_parse_message(self): assert actual["delete_pending"].get_value() is True -class TestFileEndOfFileInformation(object): +class TestFileEndOfFileInformation: def test_create_message(self): message = FileEndOfFileInformation() message["end_of_file"] = 1049459 @@ -319,7 +319,7 @@ def test_parse_message(self): assert actual["end_of_file"].get_value() == 1049459 -class TestFileFullDirectoryInformation(object): +class TestFileFullDirectoryInformation: def test_create_message(self): message = FileFullDirectoryInformation() message["creation_time"] = datetime.utcfromtimestamp(1024) @@ -386,7 +386,7 @@ def test_parse_message(self): assert actual["file_name"].get_value() == "file1.txt".encode("utf-16-le") -class TestFileFullEaInformation(object): +class TestFileFullEaInformation: DATA = b"\x14\x00\x00\x00" b"\x00" b"\x04" b"\x04\x00" b"\x43\x41\x46\xe9\x00" b"\x63\x61\x66\xe9" def test_create_message(self): @@ -415,7 +415,7 @@ def test_parse_message(self): assert actual["ea_value"].get_value() == b"\x63\x61\x66\xe9" -class TestFileGetEaInformation(object): +class TestFileGetEaInformation: DATA = b"\x14\x00\x00\x00" b"\x04" b"\x43\x41\x46\xe9\x00" def test_create_message(self): @@ -440,7 +440,7 @@ def test_parse_message(self): assert actual["padding"].get_value() == 0 -class TestFileIdBothDirectoryInformation(object): +class TestFileIdBothDirectoryInformation: def test_create_message(self): message = FileIdBothDirectoryInformation() message["creation_time"] = datetime.utcfromtimestamp(1024) @@ -528,7 +528,7 @@ def test_parse_message(self): assert actual["file_name"].get_value() == "file1.txt".encode("utf-16-le") -class TestFileIdFullDirectoryInformation(object): +class TestFileIdFullDirectoryInformation: def test_create_message(self): message = FileIdFullDirectoryInformation() message["creation_time"] = datetime.utcfromtimestamp(1024) @@ -602,7 +602,7 @@ def test_parse_message(self): assert actual["file_name"].get_value() == "file1.txt".encode("utf-16-le") -class TestFileLinkInformation(object): +class TestFileLinkInformation: DATA = ( b"\x01" b"\x00\x00\x00\x00\x00\x00\x00" @@ -631,7 +631,7 @@ def test_parse_message(self): assert actual["file_name"].get_value() == "café" -class TestFileNamesInformation(object): +class TestFileNamesInformation: def test_create_message(self): message = FileNamesInformation() message["file_name"] = "file1.txt".encode("utf-16-le") @@ -666,7 +666,7 @@ def test_parse_message(self): assert actual["file_name"].get_value() == "file1.txt".encode("utf-16-le") -class TestFileRenameInformation(object): +class TestFileRenameInformation: DATA = ( b"\x01" b"\x00\x00\x00\x00\x00\x00\x00" @@ -695,7 +695,7 @@ def test_parse_message(self): assert actual["file_name"].get_value() == "café" -class TestFileStandardInformation(object): +class TestFileStandardInformation: def test_create_message(self): message = FileStandardInformation() message["allocation_size"] = 123456 @@ -738,7 +738,7 @@ def test_parse_message(self): assert actual["directory"].get_value() is False -class TestFileFsObjectInformation(object): +class TestFileFsObjectInformation: DATA = ( b"\x01\x02\x03\x04\x05\x06\x07\x08" b"\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10" @@ -771,7 +771,7 @@ def test_parse_message(self): assert actual["extended_info"].get_value() == b"\x00" * 48 -class TestFileFsVolumeInformation(object): +class TestFileFsVolumeInformation: DATA = ( b"\x00\x80\x3e\xd5\xde\xb1\x9d\x01" b"\x0a\x00\x00\x00" @@ -805,7 +805,7 @@ def test_parse_message(self): assert actual["volume_label"].get_value() == "café" -class TestFileFsFullSizeInformation(object): +class TestFileFsFullSizeInformation: DATA = ( b"\xa4\x6f\xd6\x01\x00\x00\x00\x00" b"\x9c\x41\x12\x01\x00\x00\x00\x00" diff --git a/tests/test_header.py b/tests/test_header.py index ca7f9a57..51cb4864 100644 --- a/tests/test_header.py +++ b/tests/test_header.py @@ -10,7 +10,7 @@ ) -class TestSMB2HeaderAsync(object): +class TestSMB2HeaderAsync: DATA = ( b"\xfe\x53\x4d\x42" b"\x40\x00" @@ -59,7 +59,7 @@ def test_parse_message(self): assert actual["data"].get_value() == b"\x01\x02\x03\x04" -class TestSMB2HeaderRequest(object): +class TestSMB2HeaderRequest: def test_create_message(self): header = SMB2HeaderRequest() header["command"] = Commands.SMB2_SESSION_SETUP @@ -126,7 +126,7 @@ def test_parse_message(self): assert actual["data"].get_value() == b"\x01\x02\x03\x04" -class TestSMB2HeaderResponse(object): +class TestSMB2HeaderResponse: def test_create_message(self): header = SMB2HeaderResponse() header["command"] = Commands.SMB2_SESSION_SETUP diff --git a/tests/test_ioctl.py b/tests/test_ioctl.py index 39fff2e2..8e8fca33 100644 --- a/tests/test_ioctl.py +++ b/tests/test_ioctl.py @@ -32,7 +32,7 @@ ) -class TestSMB2IOCTLRequest(object): +class TestSMB2IOCTLRequest: def test_create_message(self): message = SMB2IOCTLRequest() message["ctl_code"] = CtlCode.FSCTL_VALIDATE_NEGOTIATE_INFO @@ -120,7 +120,7 @@ def test_parse_message(self): assert actual["buffer"].get_value() == b"\x12\x13\x14\x15" -class TestSMB2SrvCopyChunkCopy(object): +class TestSMB2SrvCopyChunkCopy: def test_create_message(self): chunk1 = SMB2SrvCopyChunk() chunk1["source_offset"] = 0 @@ -190,7 +190,7 @@ def test_parse_message(self): assert chunk2["reserved"].get_value() == 0 -class TestSMB2SrvCopyChunk(object): +class TestSMB2SrvCopyChunk: def test_create_message(self): message = SMB2SrvCopyChunk() message["source_offset"] = 1234 @@ -222,7 +222,7 @@ def test_parse_message(self): assert actual["reserved"].get_value() == 0 -class TestSMB2SrcReadHashRequest(object): +class TestSMB2SrcReadHashRequest: def test_create_message(self): message = SMB2SrvReadHashRequest() message["hash_version"] = HashVersion.SRV_HASH_VER_2 @@ -258,7 +258,7 @@ def test_parse_message(self): assert actual["offset"].get_value() == 10 -class TestSMB2SrvNetworkResiliencyRequest(object): +class TestSMB2SrvNetworkResiliencyRequest: def test_create_message(self): message = SMB2SrvNetworkResiliencyRequest() message["timeout"] = 100 @@ -276,7 +276,7 @@ def test_parse_message(self): assert actual["reserved"].get_value() == 0 -class TestSMB2ValidateNegotiateInfoRequest(object): +class TestSMB2ValidateNegotiateInfoRequest: def test_create_message(self): message = SMB2ValidateNegotiateInfoRequest() message["capabilities"] = 8 @@ -317,7 +317,7 @@ def test_parse_message(self): assert len(actual["dialects"].get_value()) == 2 -class TestSMB2IOCTLResponse(object): +class TestSMB2IOCTLResponse: def test_create_message(self): message = SMB2IOCTLResponse() message["ctl_code"] = CtlCode.FSCTL_VALIDATE_NEGOTIATE_INFO @@ -375,7 +375,7 @@ def test_parse_message(self): assert actual["buffer"].get_value() == b"\x20\x21\x22\x23" -class TestSMB2SrvCopyChunkResponse(object): +class TestSMB2SrvCopyChunkResponse: def test_create_message(self): message = SMB2SrvCopyChunkResponse() message["chunks_written"] = 2 @@ -396,7 +396,7 @@ def test_parse_message(self): assert actual["total_bytes_written"].get_value() == 10 -class TestSMB2SrvSnapshotArray(object): +class TestSMB2SrvSnapshotArray: def test_create_message(self): message = SMB2SrvSnapshotArray() message["snapshot_array_size"] = 2 @@ -417,7 +417,7 @@ def test_parse_message(self): assert actual["snapshots"].get_value() == b"\x00\x00\x00\x00" -class TestSMB2SrvRequestResumeKey(object): +class TestSMB2SrvRequestResumeKey: def test_create_message(self): message = SMB2SrvRequestResumeKey() message["resume_key"] = b"\xff" * 24 @@ -445,7 +445,7 @@ def test_parse_message(self): assert actual["context_length"].get_value() == 0 -class TestSMB2NetworkInterfaceInfo(object): +class TestSMB2NetworkInterfaceInfo: def test_create_message(self): addr1 = SockAddrIn() addr1.set_ipaddress("10.0.2.15") @@ -548,7 +548,7 @@ def test_parse_message(self): assert actual_sock2["family"].get_value() == SockAddrFamily.INTER_NETWORK_V6 -class TestSockAddrStorage(object): +class TestSockAddrStorage: def test_create_message_ipv4(self): message = SockAddrStorage() message["family"] = SockAddrFamily.INTER_NETWORK @@ -610,7 +610,7 @@ def test_parse_message_ipv6(self): assert sock_addr.get_ipaddress() == "fe80:0000:0000:0000:894a:2dbc:1d9c:2da1" -class TestSockAddrIn(object): +class TestSockAddrIn: def test_create_message(self): message = SockAddrIn() message.set_ipaddress("10.0.2.15") @@ -638,7 +638,7 @@ def test_parse_message(self): assert actual.get_ipaddress() == "10.0.2.15" -class TestSockAddrIn6(object): +class TestSockAddrIn6: def test_create_message(self): message = SockAddrIn6() message.set_ipaddress("fe80:0000:0000:0000:894a:2dbc:1d9c:2da1") @@ -657,7 +657,7 @@ def test_set_ipaddress_invalid_format(self): message = SockAddrIn6() with pytest.raises(ValueError) as exc: message.set_ipaddress("fe80::894a:2dbc:1d9c:2da1") - assert str(exc.value) == "When setting an IPv6 address, it must be " "in the full form without concatenation" + assert str(exc.value) == "When setting an IPv6 address, it must be in the full form without concatenation" def test_parse_message(self): actual = SockAddrIn6() @@ -680,7 +680,7 @@ def test_parse_message(self): assert actual.get_ipaddress() == "fe80:0000:0000:0000:894a:2dbc:1d9c:2da1" -class TestSMB2ValidateNegotiateInfoResponse(object): +class TestSMB2ValidateNegotiateInfoResponse: def test_create_message(self): message = SMB2ValidateNegotiateInfoResponse() message["capabilities"] = 8 diff --git a/tests/test_open.py b/tests/test_open.py index b3d46e67..d9f596cf 100644 --- a/tests/test_open.py +++ b/tests/test_open.py @@ -68,7 +68,7 @@ from smbprotocol.tree import TreeConnect -class TestSMB2CreateRequest(object): +class TestSMB2CreateRequest: def test_create_message(self): timewarp_token = SMB2CreateTimewarpToken() timewarp_token["timestamp"] = datetime.utcfromtimestamp(0) @@ -268,7 +268,7 @@ def test_parse_message_no_contexts(self): assert actual["buffer_contexts"].get_value() == [] -class TestSMB2CreateResponse(object): +class TestSMB2CreateResponse: def test_create_message(self): message = SMB2CreateResponse() message["flag"] = FileFlags.SMB2_CREATE_FLAG_REPARSEPOINT @@ -456,7 +456,7 @@ def test_parse_message_no_contexts(self): assert actual["buffer"].get_value() == [] -class TestSMB2CloseRequest(object): +class TestSMB2CloseRequest: def test_create_message(self): message = SMB2CloseRequest() message["flags"].set_flag(CloseFlags.SMB2_CLOSE_FLAG_POSTQUERY_ATTRIB) @@ -489,7 +489,7 @@ def test_parse_message(self): assert actual["file_id"].get_value() == b"\xff" * 16 -class TestSMB2CloseResponse(object): +class TestSMB2CloseResponse: def test_create_message(self): message = SMB2CloseResponse() message["creation_time"] = datetime.utcfromtimestamp(0) @@ -540,7 +540,7 @@ def test_parse_message(self): assert actual["file_attributes"].get_value() == 0 -class TestSMB2FlushRequest(object): +class TestSMB2FlushRequest: def test_create_message(self): message = SMB2FlushRequest() message["file_id"] = b"\xff" * 16 @@ -572,7 +572,7 @@ def test_parse_message(self): assert actual["file_id"].pack() == b"\xff" * 16 -class TestSMB2FlushResponse(object): +class TestSMB2FlushResponse: def test_create_message(self): message = SMB2FlushResponse() expected = b"\x04\x00" b"\x00\x00" @@ -589,7 +589,7 @@ def test_parse_message(self): assert actual["reserved"].get_value() == 0 -class TestSMB2ReadRequest(object): +class TestSMB2ReadRequest: def test_create_message(self): message = SMB2ReadRequest() message["padding"] = b"\x50" @@ -713,7 +713,7 @@ def test_parse_message_channel_info(self): assert actual["buffer"].get_value() == b"\x00" * 16 -class TestSMB2ReadResponse(object): +class TestSMB2ReadResponse: def test_create_message(self): message = SMB2ReadResponse() message["data_offset"] = 80 @@ -742,7 +742,7 @@ def test_parse_message(self): assert actual["buffer"].get_value() == b"\x01\x02\x03\x04" -class TestSMB2WriteRequest(object): +class TestSMB2WriteRequest: def test_create_message(self): message = SMB2WriteRequest() message["offset"] = 131072 @@ -863,7 +863,7 @@ def test_parse_message_channel_info(self): assert actual["buffer_channel_info"].get_value() == b"\x00" * 16 -class TestSMB2WriteResponse(object): +class TestSMB2WriteResponse: def test_create_message(self): message = SMB2WriteResponse() message["count"] = 58040 @@ -885,7 +885,7 @@ def test_parse_message(self): assert actual["write_channel_info_length"].get_value() == 0 -class TestSMB2QueryDirectoryRequest(object): +class TestSMB2QueryDirectoryRequest: def test_create_message(self): message = SMB2QueryDirectoryRequest() message["file_information_class"] = FileInformationClass.FILE_NAMES_INFORMATION @@ -936,7 +936,7 @@ def test_parse_message(self): assert actual["buffer"].get_value().decode("utf-16-le") == "*" -class TestSMB2QueryDirectoryResponse(object): +class TestSMB2QueryDirectoryResponse: def test_create_message(self): message = SMB2QueryDirectoryResponse() message["buffer"] = b"\x10\x00\x00\x00\x00\x00\x00\x00" b"\x02\x00\x00\x00\x2E\x00\x00\x00" @@ -969,7 +969,7 @@ def test_parse_message(self): assert actual["buffer"].get_value() == b"\x10\x00\x00\x00\x00\x00\x00\x00" b"\x02\x00\x00\x00\x2E\x00\x00\x00" -class TestSMB2QueryInfoRequest(object): +class TestSMB2QueryInfoRequest: DATA = ( b"\x29\x00" b"\x01" @@ -1016,7 +1016,7 @@ def test_parse_message(self): assert actual["buffer"].get_value() == b"\x01\x02\x03\x04" -class TestSMB2QueryInfoResponse(object): +class TestSMB2QueryInfoResponse: def test_create_message(self): message = SMB2QueryInfoResponse() message["buffer"] = b"\x01\x02\x03\x04" @@ -1090,7 +1090,7 @@ def test_unpack_multiple_ea_response(self): assert actual[2]["ea_value"].get_value() == b"\x00\x01\x02\x03" -class TestSMB2SetInfoRequest(object): +class TestSMB2SetInfoRequest: def test_create_message(self): message = SMB2SetInfoRequest() message["info_type"] = 1 @@ -1143,7 +1143,7 @@ def test_parse_message(self): assert actual["buffer"].get_value() == b"\x01\x02\x03\x04" -class TestSMB2SetInfoResponse(object): +class TestSMB2SetInfoResponse: def test_create_message(self): message = SMB2SetInfoResponse() expected = b"\x02\x00" @@ -1162,7 +1162,7 @@ def test_parse_message(self): assert actual["structure_size"].get_value() == 2 -class TestOpen(object): +class TestOpen: # basic file open tests for each dialect def test_dialect_2_0_2(self, smb_real): connection = Connection(uuid.uuid4(), smb_real[2], smb_real[3]) diff --git a/tests/test_reparse_point.py b/tests/test_reparse_point.py index 1a007db8..3b9255e3 100644 --- a/tests/test_reparse_point.py +++ b/tests/test_reparse_point.py @@ -11,7 +11,7 @@ ) -class TestReparseTags(object): +class TestReparseTags: def test_tag_is_microsoft(self): assert ReparseTags.is_reparse_tag_microsoft(ReparseTags.IO_REPARSE_TAG_SYMLINK) assert not ReparseTags.is_reparse_tag_microsoft(1) @@ -25,7 +25,7 @@ def test_tag_is_directory(self): assert not ReparseTags.is_reparse_tag_directory(ReparseTags.IO_REPARSE_TAG_SYMLINK) -class TestReparseDataBuffer(object): +class TestReparseDataBuffer: DATA = b"\x0c\x00\x00\xa0" b"\x04\x00" b"\x00\x00" b"\x01\x02\x03\x04" def test_create_message(self): @@ -48,7 +48,7 @@ def test_parse_message(self): assert actual["data_buffer"].get_value() == b"\x01\x02\x03\x04" -class TestSymbolicLinkReparseDataBuffer(object): +class TestSymbolicLinkReparseDataBuffer: # Purposefully but the print name before sub name to test that the get_*_name() functions can handle any order. DATA = ( b"\x08\x00" diff --git a/tests/test_security_descriptor.py b/tests/test_security_descriptor.py index 49789ec3..385918bb 100644 --- a/tests/test_security_descriptor.py +++ b/tests/test_security_descriptor.py @@ -17,7 +17,7 @@ ) -class TestSIDPacket(object): +class TestSIDPacket: def test_create_message(self): sid = "S-1-1-0" message = SIDPacket() @@ -109,7 +109,7 @@ def test_parse_message_domain_sid(self): assert sub_auth[4] == 1104 -class TestAccessAllowedAce(object): +class TestAccessAllowedAce: def test_create_message(self): sid = SIDPacket() sid.from_string("S-1-1-0") @@ -155,7 +155,7 @@ def test_parse_message(self): assert str(actual["sid"].get_value()) == "S-1-1-0" -class TestAccessDeniedAce(object): +class TestAccessDeniedAce: def test_create_message(self): sid = SIDPacket() sid.from_string("S-1-1-0") @@ -201,7 +201,7 @@ def test_parse_message(self): assert str(actual["sid"].get_value()) == "S-1-1-0" -class TestSystemAuditAce(object): +class TestSystemAuditAce: def test_create_message(self): sid = SIDPacket() sid.from_string("S-1-1-0") @@ -247,7 +247,7 @@ def test_parse_message(self): assert str(actual["sid"].get_value()) == "S-1-1-0" -class TestAclPacket(object): +class TestAclPacket: def test_create_message(self): sid1 = SIDPacket() sid1.from_string("S-1-1-0") @@ -377,7 +377,7 @@ def test_parse_message(self): assert aces[2] == b"\x05\x00\x14\x00\x00\x00\x00\x00" b"\x01\x01\x00\x00\x00\x00\x00\x01" b"\x00\x00\x00\x00" -class TestSMB2SDBuffer(object): +class TestSMB2SDBuffer: def test_create_message(self): sid1 = SIDPacket() sid1.from_string("S-1-1-0") diff --git a/tests/test_session.py b/tests/test_session.py index eded2fb0..3ecb177d 100644 --- a/tests/test_session.py +++ b/tests/test_session.py @@ -18,7 +18,7 @@ from smbprotocol.session import spnego as pyspnego -class TestSMB2SessionSetupRequest(object): +class TestSMB2SessionSetupRequest: def test_create_message(self): message = SMB2SessionSetupRequest() message["security_mode"] = SecurityMode.SMB2_NEGOTIATE_SIGNING_ENABLED @@ -63,7 +63,7 @@ def test_parse_message(self): assert actual["buffer"].get_value() == b"\x01\x02\x03\x04" -class TestSMB2SessionSetupResponse(object): +class TestSMB2SessionSetupResponse: def test_create_message(self): message = SMB2SessionSetupResponse() message["session_flags"] = 1 @@ -85,7 +85,7 @@ def test_parse_message(self): assert actual["buffer"].get_value() == b"\x04\x03\x02\x01" -class TestSMB2Logoff(object): +class TestSMB2Logoff: def test_create_message(self): message = SMB2Logoff() expected = b"\x04\x00" b"\x00\x00" @@ -102,7 +102,7 @@ def test_parse_message(self): assert actual["reserved"].get_value() == 0 -class TestSession(object): +class TestSession: def test_dialect_2_0_2(self, smb_real): connection = Connection(uuid.uuid4(), smb_real[2], smb_real[3]) connection.connect(Dialects.SMB_2_0_2) @@ -248,7 +248,7 @@ def test_require_encryption_not_supported(self, smb_real): session = Session(connection, smb_real[0], smb_real[1]) with pytest.raises(SMBException) as exc: session.connect() - assert str(exc.value) == "SMB encryption is required but the " "connection does not support it" + assert str(exc.value) == "SMB encryption is required but the connection does not support it" finally: connection.disconnect(True) diff --git a/tests/test_smbclient_os.py b/tests/test_smbclient_os.py index 5bc9cac1..37a3493c 100644 --- a/tests/test_smbclient_os.py +++ b/tests/test_smbclient_os.py @@ -723,7 +723,7 @@ def test_open_file_with_read_share_access(smb_share): with smbclient.open_file(file_path): expected = ( - "[NtStatus 0xc0000043] The process cannot access the file because it is being used by " "another process" + "[NtStatus 0xc0000043] The process cannot access the file because it is being used by another process" ) with pytest.raises(OSError, match=re.escape(expected)): smbclient.open_file(file_path) @@ -742,7 +742,7 @@ def test_open_file_with_write_share_access(smb_share): with smbclient.open_file(file_path, mode="w") as fd: expected = ( - "[NtStatus 0xc0000043] The process cannot access the file because it is being used by " "another process: " + "[NtStatus 0xc0000043] The process cannot access the file because it is being used by another process: " ) with pytest.raises(OSError, match=re.escape(expected)): smbclient.open_file(file_path, mode="a") diff --git a/tests/test_structure.py b/tests/test_structure.py index adb62871..47f54637 100644 --- a/tests/test_structure.py +++ b/tests/test_structure.py @@ -77,7 +77,7 @@ def __init__(self): ), ] ) - super(Structure2, self).__init__() + super().__init__() class Structure1(Structure): @@ -125,10 +125,10 @@ def __init__(self): ] ) - super(Structure1, self).__init__() + super().__init__() -class TestStructure(object): +class TestStructure: def test_structure_defaults(self): actual = Structure1() assert len(actual.fields) == 7 @@ -275,7 +275,7 @@ def __init__(self): ("end", BytesField()), ] ) - super(Structure3, self).__init__() + super().__init__() structure = Structure3() structure["end"] = b"\x01\x02\x03\x04" @@ -290,11 +290,11 @@ def __init__(self): assert len(structure["end"]) == 6 -class TestIntField(object): +class TestIntField: class StructureTest(Structure): def __init__(self): self.fields = OrderedDict([("field", IntField(size=4, default=1234))]) - super(TestIntField.StructureTest, self).__init__() + super().__init__() def test_get_size(self): field = self.StructureTest()["field"] @@ -324,7 +324,7 @@ def test_pack_signed(self): class UnsignedStructure(Structure): def __init__(self): self.fields = OrderedDict([("field", IntField(size=2, unsigned=False, default=-1))]) - super(UnsignedStructure, self).__init__() + super().__init__() field = UnsignedStructure()["field"] expected = b"\xff\xff" @@ -352,12 +352,12 @@ def test_unpack(self): def test_invalid_size_none(self): with pytest.raises(InvalidFieldDefinition) as exc: IntField(size=None) - assert str(exc.value) == "IntField size must have a value of 1, 2, " "4, or 8 not None" + assert str(exc.value) == "IntField size must have a value of 1, 2, 4, or 8 not None" def test_invalid_size_bad_int(self): with pytest.raises(InvalidFieldDefinition) as exc: IntField(size=3) - assert str(exc.value) == "IntField size must have a value of 1, 2, " "4, or 8 not 3" + assert str(exc.value) == "IntField size must have a value of 1, 2, 4, or 8 not 3" def test_set_none(self): field = self.StructureTest()["field"] @@ -400,13 +400,13 @@ def test_set_invalid(self): field.name = "field" with pytest.raises(TypeError) as exc: field.set_value([]) - assert str(exc.value) == "Cannot parse value for field field of " "type list to an int" + assert str(exc.value) == "Cannot parse value for field field of type list to an int" def test_byte_order(self): class ByteOrderStructure(Structure): def __init__(self): self.fields = OrderedDict([("field", IntField(size=2, little_endian=False, default=10))]) - super(ByteOrderStructure, self).__init__() + super().__init__() field = ByteOrderStructure()["field"] expected = b"\x00\x0a" @@ -414,11 +414,11 @@ def __init__(self): assert actual == expected -class TestBytesField(object): +class TestBytesField: class StructureTest(Structure): def __init__(self): self.fields = OrderedDict([("field", BytesField(size=4, default=b"\x10\x11\x12\x13"))]) - super(TestBytesField.StructureTest, self).__init__() + super().__init__() def test_get_size(self): field = self.StructureTest()["field"] @@ -502,7 +502,7 @@ def test_set_invalid(self): field.name = "field" with pytest.raises(TypeError) as exc: field.set_value([]) - assert str(exc.value) == "Cannot parse value for field field of " "type list to a byte string" + assert str(exc.value) == "Cannot parse value for field field of type list to a byte string" def test_pack_invalid_size(self): field = self.StructureTest()["field"] @@ -511,13 +511,13 @@ def test_pack_invalid_size(self): assert len(field) == 2 with pytest.raises(ValueError) as exc: field.pack() - assert str(exc.value) == "Invalid packed data length for field " "field of 2 does not fit field size of 4" + assert str(exc.value) == "Invalid packed data length for field field of 2 does not fit field size of 4" def test_set_int_invalid_size(self): class InvalidSizeStructure(Structure): def __init__(self): self.fields = OrderedDict([("field", BytesField(size=3))]) - super(InvalidSizeStructure, self).__init__() + super().__init__() with pytest.raises(InvalidFieldDefinition) as exc: field = InvalidSizeStructure()["field"] @@ -528,14 +528,14 @@ def test_set_invalid_size(self): class InvalidSizeStructure(Structure): def __init__(self): self.fields = OrderedDict([("field", BytesField(size="a"))]) - super(InvalidSizeStructure, self).__init__() + super().__init__() with pytest.raises(InvalidFieldDefinition) as exc: InvalidSizeStructure() - assert str(exc.value) == "BytesField size for field must be an int " "or None for a variable length" + assert str(exc.value) == "BytesField size for field must be an int or None for a variable length" -class TestListField(object): +class TestListField: # unpack variable length list class StructureTest(Structure): @@ -550,7 +550,7 @@ def __init__(self): ) ] ) - super(TestListField.StructureTest, self).__init__() + super().__init__() def test_get_size(self): field = self.StructureTest()["field"] @@ -601,7 +601,7 @@ def __init__(self): self.fields = OrderedDict( [("field", ListField(size=7, unpack_func=lambda s, d: [b"\x01\x02", b"\x03\x04\x05\x06", b"\07"]))] ) - super(UnpackListStructure, self).__init__() + super().__init__() field = UnpackListStructure()["field"] field.unpack(b"\x00") @@ -667,43 +667,43 @@ def test_set_invalid(self): field.name = "field" with pytest.raises(TypeError) as exc: field.set_value(0) - assert str(exc.value) == "Cannot parse value for field field of " "type int to a list" + assert str(exc.value) == "Cannot parse value for field field of type int to a list" def test_list_count_not_int_or_lambda(self): class InvalidListField(Structure): def __init__(self): self.fields = OrderedDict([("field", ListField(list_count="a"))]) - super(InvalidListField, self).__init__() + super().__init__() with pytest.raises(InvalidFieldDefinition) as exc: InvalidListField() - assert str(exc.value) == "ListField list_count must be an int, " "lambda, or None for a variable list length" + assert str(exc.value) == "ListField list_count must be an int, lambda, or None for a variable list length" def test_unpack_func_not_lambda(self): class InvalidListField(Structure): def __init__(self): self.fields = OrderedDict([("field", ListField(unpack_func="a"))]) - super(InvalidListField, self).__init__() + super().__init__() with pytest.raises(InvalidFieldDefinition) as exc: InvalidListField() - assert str(exc.value) == "ListField unpack_func must be a lambda " "function or None" + assert str(exc.value) == "ListField unpack_func must be a lambda function or None" def test_list_field_not_field(self): class InvalidListField(Structure): def __init__(self): self.fields = OrderedDict([("field", ListField(list_type="a"))]) - super(InvalidListField, self).__init__() + super().__init__() with pytest.raises(InvalidFieldDefinition) as exc: InvalidListField() - assert str(exc.value) == "ListField list_type must be a Field " "definition" + assert str(exc.value) == "ListField list_type must be a Field definition" def test_list_unpack_list_type_size_not_defined(self): class InvalidListField(Structure): def __init__(self): self.fields = OrderedDict([("field", ListField(list_count=1))]) - super(InvalidListField, self).__init__() + super().__init__() with pytest.raises(InvalidFieldDefinition) as exc: InvalidListField() @@ -717,7 +717,7 @@ def test_list_unpack_list_count_not_defined(self): class InvalidListField(Structure): def __init__(self): self.fields = OrderedDict([("field", ListField(list_type=BytesField(size=1)))]) - super(InvalidListField, self).__init__() + super().__init__() with pytest.raises(InvalidFieldDefinition) as exc: InvalidListField() @@ -728,7 +728,7 @@ def __init__(self): ) -class TestStructureField(object): +class TestStructureField: class StructureTest(Structure): def __init__(self): self.fields = OrderedDict( @@ -739,7 +739,7 @@ def __init__(self): ) ] ) - super(TestStructureField.StructureTest, self).__init__() + super().__init__() def test_get_size(self): field = self.StructureTest()["field"] @@ -904,7 +904,7 @@ def test_fail_get_structure_bytes_value(self): field.set_value(b"\x7d\x00\x00\x00\x14\x15\x16\x17") with pytest.raises(ValueError) as exc: field["field"] - assert str(exc.value) == "Cannot get field field when structure is " "defined as a byte string" + assert str(exc.value) == "Cannot get field field when structure is defined as a byte string" def test_set_structure_field(self): field = self.StructureTest()["field"] @@ -920,14 +920,14 @@ def test_set_invalid(self): field.name = "field" with pytest.raises(TypeError) as exc: field.set_value([]) - assert str(exc.value) == "Cannot parse value for field field of " "type list to a structure" + assert str(exc.value) == "Cannot parse value for field field of type list to a structure" -class TestUuidField(object): +class TestUuidField: class StructureTest(Structure): def __init__(self): self.fields = OrderedDict([("field", UuidField())]) - super(TestUuidField.StructureTest, self).__init__() + super().__init__() def test_get_size(self): field = self.StructureTest()["field"] @@ -1001,7 +1001,7 @@ def test_set_invalid(self): field.name = "field" with pytest.raises(TypeError) as exc: field.set_value([]) - assert str(exc.value) == "Cannot parse value for field field of " "type list to a uuid" + assert str(exc.value) == "Cannot parse value for field field of type list to a uuid" def test_invalid_size_none(self): with pytest.raises(InvalidFieldDefinition) as exc: @@ -1025,7 +1025,7 @@ def test_unpack_uuid_field_big_endian(self): assert actual == expected -class TestDateTimeField(object): +class TestDateTimeField: DATE = datetime(year=1993, month=6, day=11, hour=7, minute=52, second=34, microsecond=34) class StructureTest(Structure): @@ -1040,7 +1040,7 @@ def __init__(self): ) ] ) - super(TestDateTimeField.StructureTest, self).__init__() + super().__init__() @pytest.mark.parametrize( "raw, expected_dt, expected_bytes", @@ -1147,15 +1147,15 @@ def test_set_invalid(self): field.name = "field" with pytest.raises(TypeError) as exc: field.set_value([]) - assert str(exc.value) == "Cannot parse value for field field of " "type list to a datetime" + assert str(exc.value) == "Cannot parse value for field field of type list to a datetime" def test_invalid_size_none(self): with pytest.raises(InvalidFieldDefinition) as exc: DateTimeField(size=4) - assert str(exc.value) == "DateTimeField type must have a size of 8 " "not 4" + assert str(exc.value) == "DateTimeField type must have a size of 8 not 4" -class TestEnumField(object): +class TestEnumField: class StructureTest(Structure): def __init__(self): self.fields = OrderedDict( @@ -1170,7 +1170,7 @@ def __init__(self): ), ] ) - super(TestEnumField.StructureTest, self).__init__() + super().__init__() def test_get_size(self): field = self.StructureTest()["field"] @@ -1198,7 +1198,7 @@ def __init__(self): ) ] ) - super(StructureTestDefaultZero, self).__init__() + super().__init__() field = StructureTestDefaultZero()["field"] expected = "(0) UNKNOWN_ENUM" @@ -1256,16 +1256,16 @@ def test_set_invalid(self): field.name = "field" with pytest.raises(TypeError) as exc: field.set_value([]) - assert str(exc.value) == "Cannot parse value for field field of " "type list to an int" + assert str(exc.value) == "Cannot parse value for field field of type list to an int" def test_set_invalid_value(self): field = self.StructureTest()["field"] with pytest.raises(ValueError) as exc: field.set_value(0x13) - assert str(exc.value) == "Enum value 19 does not exist in enum type " "" + assert str(exc.value) == "Enum value 19 does not exist in enum type " -class TestFlagField(object): +class TestFlagField: class StructureTest(Structure): def __init__(self): self.fields = OrderedDict( @@ -1280,7 +1280,7 @@ def __init__(self): ), ] ) - super(TestFlagField.StructureTest, self).__init__() + super().__init__() def test_get_size(self): field = self.StructureTest()["field"] @@ -1345,7 +1345,7 @@ def test_set_invalid(self): field.name = "field" with pytest.raises(TypeError) as exc: field.set_value([]) - assert str(exc.value) == "Cannot parse value for field field of " "type list to an int" + assert str(exc.value) == "Cannot parse value for field field of type list to an int" def test_check_flag_set(self): field = self.StructureTest()["field"] @@ -1362,9 +1362,7 @@ def test_set_invalid_flag(self): field = self.StructureTest()["field"] with pytest.raises(ValueError) as ex: field.set_flag(10) - assert ( - str(ex.value) == "Flag value does not exist in flag type " "" - ) + assert str(ex.value) == "Flag value does not exist in flag type " def test_set_invalid_value(self): field = self.StructureTest()["field"] @@ -1373,11 +1371,11 @@ def test_set_invalid_value(self): assert str(exc.value) == "Invalid flag for field field value set 128" -class TestBoolField(object): +class TestBoolField: class StructureTest(Structure): def __init__(self): self.fields = OrderedDict([("field", BoolField(size=1))]) - super(TestBoolField.StructureTest, self).__init__() + super().__init__() def test_get_size(self): field = self.StructureTest()["field"] @@ -1484,16 +1482,16 @@ def test_set_invalid(self): field.name = "field" with pytest.raises(TypeError) as exc: field.set_value([]) - assert str(exc.value) == "Cannot parse value for field field of " "type list to a bool" + assert str(exc.value) == "Cannot parse value for field field of type list to a bool" -class TestTextField(object): +class TestTextField: STRING_VALUE = "Hello World - café" class StructureTest(Structure): def __init__(self): self.fields = OrderedDict([("field", TextField(encoding="utf-8", default=TestTextField.STRING_VALUE))]) - super(TestTextField.StructureTest, self).__init__() + super().__init__() def test_get_size(self): field = self.StructureTest()["field"] @@ -1567,7 +1565,7 @@ def test_set_invalid(self): field.name = "field" with pytest.raises(TypeError) as exc: field.set_value([]) - assert str(exc.value) == "Cannot parse value for field field of " "type list to a text string" + assert str(exc.value) == "Cannot parse value for field field of type list to a text string" def test_set_with_different_encoding(self): structure = self.StructureTest() @@ -1605,4 +1603,4 @@ def __init__(self): ) ] ) - super(TestTextFieldNullTerminated.StructureTest, self).__init__() + super().__init__() diff --git a/tests/test_transport.py b/tests/test_transport.py index 86447676..5fa89000 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -36,7 +36,7 @@ def server_tcp(request): sock.close() -class TestDirectTcpPacket(object): +class TestDirectTcpPacket: def test_create_message(self): message = DirectTCPPacket() message["smb2_message"] = b"\xfe\x53\x4d\x42" @@ -60,7 +60,7 @@ def test_parse_message(self): assert actual_header.get_value() == b"\xfe\x53\x4d\x42" -class TestTcp(object): +class TestTcp: def test_normal_fail_message_too_big(self): tcp = Tcp("0.0.0.0", 0) tcp.connected = True diff --git a/tests/test_tree.py b/tests/test_tree.py index 9fc6db4b..f2a53ec4 100644 --- a/tests/test_tree.py +++ b/tests/test_tree.py @@ -18,7 +18,7 @@ ) -class TestSMB2TreeConnectRequest(object): +class TestSMB2TreeConnectRequest: def test_create_message(self): message = SMB2TreeConnectRequest() message["flags"] = 2 @@ -58,7 +58,7 @@ def test_parse_message(self): assert actual["buffer"].get_value() == "\\\\127.0.0.1\\c$".encode("utf-16-le") -class TestSMB2TreeConnectResponse(object): +class TestSMB2TreeConnectResponse: def test_create_message(self): message = SMB2TreeConnectResponse() message["share_type"] = 1 @@ -83,7 +83,7 @@ def test_parse_message(self): assert actual["maximal_access"].get_value() == 10 -class TestSMB2TreeDisconnect(object): +class TestSMB2TreeDisconnect: def test_create_message(self): message = SMB2TreeDisconnect() expected = b"\x04\x00" b"\x00\x00" @@ -100,7 +100,7 @@ def test_parse_message(self): assert actual["reserved"].get_value() == 0 -class TestTreeConnect(object): +class TestTreeConnect: def test_dialect_2_0_2(self, smb_real): connection = Connection(uuid.uuid4(), smb_real[2], smb_real[3]) connection.connect(Dialects.SMB_2_0_2) @@ -219,7 +219,7 @@ def test_secure_negotiation_verification_failed(self, smb_real): session.connect() with pytest.raises(SMBException) as exc: tree.connect() - assert "Secure negotiate failed to verify server dialect, " "Actual: 770, Expected: 768" in str(exc.value) + assert "Secure negotiate failed to verify server dialect, Actual: 770, Expected: 768" in str(exc.value) finally: connection.disconnect(True)