Skip to content

Commit

Permalink
fix formatting as per pep8 in minio.select (#877)
Browse files Browse the repository at this point in the history
  • Loading branch information
balamurugana authored Apr 11, 2020
1 parent 94b4c12 commit 45aa187
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 56 deletions.
2 changes: 2 additions & 0 deletions minio/select/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
"""


class SelectMessageError(Exception):
'''
Raised in case of message type 'error'
'''


class SelectCRCValidationError(Exception):
'''
Raised in case of CRC mismatch
Expand Down
13 changes: 6 additions & 7 deletions minio/select/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,27 +32,26 @@
EVENT_RECORDS = 'Records' # Event Type is Records
EVENT_PROGRESS = 'Progress' # Event Type Progress
EVENT_STATS = 'Stats' # Event Type Stats
EVENT_CONT = 'Cont' # Event Type continue
EVENT_CONT = 'Cont' # Event Type continue
EVENT_END = 'End' # Event Type is End
EVENT_CONTENT_TYPE = "text/xml" # Event content xml type
EVENT_CONTENT_TYPE = "text/xml" # Event content xml type
EVENT = 'event' # Message Type is event
ERROR = 'error' # Message Type is error


def calculate_crc(value):
'''
Returns the CRC using crc32
'''
return crc32(value) & 0xffffffff


def validate_crc(current_value, expected_value):
'''
Validate through CRC check
'''
crc_current = calculate_crc(current_value)
crc_expected = byte_int(expected_value)
if crc_current == crc_expected:
return True
return False
return calculate_crc(current_value) == byte_int(expected_value)


def byte_int(data_bytes):
'''
Expand Down
9 changes: 9 additions & 0 deletions minio/select/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@

from .helpers import (SQL)


class CSVInput:
"""
CSVInput: Input Format as CSV.
"""

def __init__(self, FileHeaderInfo=None, RecordDelimiter="\n",
FieldDelimiter=",", QuoteCharacter='"',
QuoteEscapeCharacter='"', Comments="#",
Expand All @@ -43,10 +45,12 @@ def __init__(self, FileHeaderInfo=None, RecordDelimiter="\n",
self.Comments = Comments
self.AllowQuotedRecordDelimiter = AllowQuotedRecordDelimiter


class JSONInput:
"""
JSONInput: Input format as JSON.
"""

def __init__(self, Type=None):
self.Type = Type

Expand All @@ -61,6 +65,7 @@ class InputSerialization:
"""
InputSerialization: nput Format.
"""

def __init__(self, compression_type="NONE", csv=None, json=None, par=None):
self.compression_type = compression_type
self.csv_input = csv
Expand All @@ -73,6 +78,7 @@ class CSVOutput:
CSVOutput: Output as CSV.
"""

def __init__(self, QuoteFields="ASNEEDED", RecordDelimiter="\n",
FieldDelimiter=",", QuoteCharacter='"',
QuoteEscapeCharacter='"'):
Expand All @@ -87,6 +93,7 @@ class JsonOutput:
"""
JsonOutput- Output as JSON.
"""

def __init__(self, RecordDelimiter="\n"):
self.RecordDelimiter = RecordDelimiter

Expand All @@ -95,6 +102,7 @@ class OutputSerialization:
"""
OutputSerialization: Output Format.
"""

def __init__(self, csv=None, json=None):
self.csv_output = csv
self.json_output = json
Expand All @@ -104,6 +112,7 @@ class RequestProgress:
"""
RequestProgress: Sends progress message.
"""

def __init__(self, enabled=False):
self.enabled = enabled

Expand Down
95 changes: 46 additions & 49 deletions minio/select/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,15 @@
import io
import sys

from binascii import crc32
from xml.etree import ElementTree
from xml.etree.ElementTree import ParseError

from .helpers import (EVENT_RECORDS, EVENT_PROGRESS,
EVENT_STATS, EVENT_CONT,
EVENT, EVENT_CONTENT_TYPE,
EVENT_END, ERROR)
from .helpers import (EVENT_RECORDS, EVENT_STATS,
EVENT, EVENT_CONTENT_TYPE, ERROR)

from .helpers import (validate_crc, calculate_crc, byte_int)
from .errors import (SelectMessageError, SelectCRCValidationError)


def _extract_header(header_bytes):
"""
populates the header map after reading the header in bytes
Expand All @@ -51,26 +48,28 @@ def _extract_header(header_bytes):
# While loop ends when all the headers present are read
# header contains multipe headers
while header_byte_parsed < len(header_bytes):
header_name_byte_length = byte_int(header_bytes[header_byte_parsed:header_byte_parsed+1])
header_name_byte_length = byte_int(
header_bytes[header_byte_parsed:header_byte_parsed+1])
header_byte_parsed += 1
header_name = \
header_bytes[header_byte_parsed:
header_byte_parsed+header_name_byte_length]
header_name = header_bytes[
header_byte_parsed:header_byte_parsed+header_name_byte_length
]
header_byte_parsed += header_name_byte_length
# Header Value Type is of 1 bytes and is skipped
header_byte_parsed += 1
value_string_byte_length = \
byte_int(header_bytes[header_byte_parsed:
header_byte_parsed+2])
value_string_byte_length = byte_int(
header_bytes[header_byte_parsed:header_byte_parsed+2]
)
header_byte_parsed += 2
header_value = \
header_bytes[header_byte_parsed:
header_byte_parsed+value_string_byte_length]
header_value = header_bytes[
header_byte_parsed:header_byte_parsed+value_string_byte_length
]
header_byte_parsed += value_string_byte_length
header_map[header_name.decode("utf-8").lstrip(":")] = \
header_value.decode("utf-8").lstrip(":")
header_map[header_name.decode(
"utf-8").lstrip(":")] = header_value.decode("utf-8").lstrip(":")
return header_map


def _parse_stats(stats):
"""
Parses stats XML and populates the stat dict.
Expand All @@ -86,12 +85,14 @@ def _parse_stats(stats):

return stat


class SelectObjectReader(object):
"""
SelectObjectReader returns a Reader that upon read
returns queried data, but stops when the response ends.
LimitedRandomReader is compatible with BufferedIOBase.
"""

def __init__(self, response):
self.response = response
self.remaining_bytes = bytes()
Expand Down Expand Up @@ -121,12 +122,12 @@ def __extract_message(self):

crc_bytes = io.BytesIO()
total_bytes_len = self.response.read(4)
if len(total_bytes_len) == 0:
if not total_bytes_len:
return {}

total_length = byte_int(total_bytes_len)
header_bytes_len = self.response.read(4)
if len(header_bytes_len) == 0:
if not header_bytes_len:
return {}

header_len = byte_int(header_bytes_len)
Expand All @@ -145,9 +146,9 @@ def __extract_message(self):
crc_bytes.write(prelude_bytes_crc)

header_bytes = self.response.read(header_len)
if len(header_bytes) == 0:
if not header_bytes:
raise SelectMessageError(
"Premature truncation of select message header"+
"Premature truncation of select message header" +
", server is sending corrupt message?")

crc_bytes.write(header_bytes)
Expand All @@ -156,39 +157,36 @@ def __extract_message(self):
payload_length = total_length - header_len - int(16)
payload_bytes = b''
event_type = header_map["event-type"]

if header_map["message-type"] == ERROR:
raise SelectMessageError(
header_map["error-code"] + ":\"" + \
header_map["error-code"] + ":\"" +
header_map["error-message"] + "\"")
elif header_map["message-type"] == EVENT:
if event_type == EVENT_END:
pass
elif event_type == EVENT_CONT:
pass
elif event_type == EVENT_STATS:
content_type = header_map["content-type"]
if content_type != EVENT_CONTENT_TYPE:
raise SelectMessageError(
"Unrecognized content-type {0}".format(content_type))
else:
payload_bytes = self.response.read(payload_length)
self.stat = _parse_stats(payload_bytes)

elif event_type == EVENT_RECORDS:
payload_bytes = self.response.read(payload_length)
else:

if header_map["message-type"] != EVENT:
raise SelectMessageError(
"Unrecognized message-type {0}".format(header_map["message-type"])
"Unrecognized message-type {0}".format(
header_map["message-type"])
)

if event_type == EVENT_STATS:
content_type = header_map["content-type"]
if content_type != EVENT_CONTENT_TYPE:
raise SelectMessageError(
"Unrecognized content-type {0}".format(content_type))

payload_bytes = self.response.read(payload_length)
self.stat = _parse_stats(payload_bytes)
elif event_type == EVENT_RECORDS:
payload_bytes = self.response.read(payload_length)

crc_bytes.write(payload_bytes)

message_crc = self.response.read(4)
if len(message_crc) == 0:
if not message_crc:
return {}

if not validate_crc(crc_bytes.getvalue(),
message_crc):
if not validate_crc(crc_bytes.getvalue(), message_crc):
raise SelectCRCValidationError(
{"Checksum Mismatch, MessageCRC of " +
str(calculate_crc(crc_bytes.getvalue())) +
Expand All @@ -207,14 +205,13 @@ def stream(self, num_bytes=32*1024):
caller should call self.close() to close the stream.
"""
while not self.response.isclosed():
if len(self.remaining_bytes) == 0:
if not self.remaining_bytes:
message = self.__extract_message()
if EVENT_RECORDS in message:
self.remaining_bytes = message.get(EVENT_RECORDS, b'')
else:
# For all other events continue
if EVENT_RECORDS not in message:
continue

self.remaining_bytes = message.get(EVENT_RECORDS, b'')

result = self.remaining_bytes
if num_bytes < len(self.remaining_bytes):
result = self.remaining_bytes[:num_bytes]
Expand Down

0 comments on commit 45aa187

Please sign in to comment.