Skip to content

Commit

Permalink
Async: remove support for binary responses.
Browse files Browse the repository at this point in the history
They're not used and simplifying the protocol may help towards
restructuring the IO model underlying CMS.
  • Loading branch information
giomasce committed Jun 1, 2013
1 parent e0927d8 commit a382a2d
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 98 deletions.
91 changes: 23 additions & 68 deletions cms/async/AsyncLibrary.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-

# Programming contest management system
# Copyright © 2010-2012 Giovanni Mascellani <mascellani@poisson.phc.unipi.it>
# Copyright © 2010-2013 Giovanni Mascellani <mascellani@poisson.phc.unipi.it>
# Copyright © 2010-2012 Stefano Maggiolo <s.maggiolo@gmail.com>
# Copyright © 2010-2012 Matteo Boscariol <boscarim@hotmail.com>
#
Expand Down Expand Up @@ -38,8 +38,7 @@

from cms.async import ServiceCoord, Address, get_service_address
from cms.async.Utils import random_string, Logger, \
encode_binary, encode_length, encode_json, \
decode_binary, decode_length, decode_json
encode_length, encode_json, decode_length, decode_json


# Our logger object - can be a standard one (provided in Utils), or a
Expand Down Expand Up @@ -79,15 +78,6 @@ def rpc_method(func):
return func


def rpc_binary_response(func):
"""Decorator for a RPC method that wants its response to be
treated as a binary string.
"""
func.binary_response = True
return func


def rpc_threaded(func):
"""Decorator for a RPC method that we want to execute in a
separate thread.
Expand Down Expand Up @@ -387,7 +377,6 @@ def method_info(self, method_name):

res = {}
res["callable"] = hasattr(method, "rpc_callable")
res["binary_response"] = hasattr(method, "binary_response")
res["threaded"] = hasattr(method, "threaded")

return res
Expand Down Expand Up @@ -429,8 +418,7 @@ class ThreadedRPC(threading.Thread):
it can.
"""
def __init__(self, remote_service, message,
response, binary_response):
def __init__(self, remote_service, message, response):
"""Initialize the thread.
remote_service (RemoteService): the service that called our
Expand All @@ -440,15 +428,12 @@ def __init__(self, remote_service, message,
right parameters.
response (dict): the partial reply (to be integrated with the
actual value returned by the rpc method).
binary_response (bool): if the method is supposed to return a
binary string.
"""
threading.Thread.__init__(self)
self.remote_service = remote_service
self.service = self.remote_service.service
self.response = response
self.binary_response = binary_response
self.message = message

def run(self):
Expand All @@ -465,16 +450,14 @@ def run(self):
self.response["__error"] = "%s: %s\n%s" % \
(exception.__class__.__name__, exception,
traceback.format_exc())
self.binary_response = False
method_response = None

# And we put the response in the bucket, waiting for the main
# thread to deliver it.
self.service._threaded_responses_lock.acquire()
self.service._threaded_responses.append((self.remote_service,
(self.response,
method_response,
self.binary_response)))
method_response)))
self.service._threaded_responses_lock.release()


Expand Down Expand Up @@ -544,14 +527,11 @@ def found_terminator(self):
# We decode the arriving data
try:
json_length = decode_length(data[:4])
if len(data) != json_length + 4:
logger.warning("Incoming message with binary data aren't "
"supported anymore, discarding.")
return
message = decode_json(data[4:json_length + 4])
if len(data) > json_length + 4:
if message["__data"] is None:
message["__data"] = \
decode_binary(data[json_length + 4:])
else:
message["__data"]["binary_data"] = \
decode_binary(data[json_length + 4:])
except:
logger.warning("Cannot understand incoming message, discarding.")
return
Expand All @@ -568,20 +548,18 @@ def found_terminator(self):
# We find the properties of the called rpc method.
try:
method_info = self.service.method_info(message["__method"])
binary_response = method_info["binary_response"]
threaded = method_info["threaded"]
except KeyError as exception:
response["__error"] = "%s: %s\n%s" % \
(exception.__class__.__name__, exception,
traceback.format_exc())
binary_response = False
method_response = None
threaded = False

# If the rpc method is threaded, then we start the thread
# and return immediately.
if threaded:
thread = ThreadedRPC(self, message, response, binary_response)
thread = ThreadedRPC(self, message, response)
thread.start()
return

Expand All @@ -593,9 +571,8 @@ def found_terminator(self):
response["__error"] = "%s: %s\n%s" % \
(exception.__class__.__name__, exception,
traceback.format_exc())
binary_response = False
method_response = None
self.send_reply(response, method_response, binary_response)
self.send_reply(response, method_response)

# Otherwise, is a response to our rpc call.
else:
Expand All @@ -609,28 +586,22 @@ def found_terminator(self):
else:
logger.warning("No pending request with id %s found." % ident)

def send_reply(self, response, method_response, binary_response):
def send_reply(self, response, method_response):
"""Send back a reply to an rpc call.
response (dict): the metadata of the reply.
method_response (object): the actual returned value.
binary_response (bool): True if method_response is a binary string.
"""
try:
if binary_response:
response["__data"] = None
binary_message = encode_binary(method_response)
else:
response["__data"] = method_response
binary_message = ""
response["__data"] = method_response
json_message = encode_json(response)
json_length = encode_length(len(json_message))
except ValueError as error:
logger.warning("Cannot send response because of " +
"encoding error. %s" % repr(error))
return
self._push_right(json_length + json_message + binary_message)
self._push_right(json_length + json_message)

def execute_rpc(self, method, data, callback=None, plus=None,
timeout=None):
Expand All @@ -645,9 +616,7 @@ def execute_rpc(self, method, data, callback=None, plus=None,
}
The __id field is put by the pre_execute method of
RPCRequest. Also, if in the arguments we have a field
named "binary_data", we send it separatedly as a binary
attachment after the JSON encoded message.
RPCRequest.
method (string): the name of the method to call.
data (object): the object to pass to the remote method.
Expand Down Expand Up @@ -707,29 +676,15 @@ def callback(self, data, error=None):
message = request.pre_execute()

# We encode the request and send it
if "binary_data" not in data:
try:
json_message = encode_json(message)
json_length = encode_length(len(json_message))
binary_message = ""
except ValueError:
msg = "Cannot send request of method %s because of " \
"encoding error." % method
request.complete({"__error": msg})
return
else:
try:
binary_data = data["binary_data"]
del data["binary_data"]
json_message = encode_json(message)
json_length = encode_length(len(json_message))
binary_message = encode_binary(binary_data)
except ValueError:
msg = "Cannot send request of method %s because of " \
"encoding error." % method
request.complete({"__error": msg})
return
ret = self._push_right(json_length + json_message + binary_message)
try:
json_message = encode_json(message)
json_length = encode_length(len(json_message))
except ValueError:
msg = "Cannot send request of method %s because of " \
"encoding error." % method
request.complete({"__error": msg})
return
ret = self._push_right(json_length + json_message)
if not ret:
msg = "Transfer interrupted"
request.complete({"__error": msg})
Expand Down
31 changes: 1 addition & 30 deletions cms/async/Utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-

# Programming contest management system
# Copyright © 2010-2012 Giovanni Mascellani <mascellani@poisson.phc.unipi.it>
# Copyright © 2010-2013 Giovanni Mascellani <mascellani@poisson.phc.unipi.it>
# Copyright © 2010-2012 Stefano Maggiolo <s.maggiolo@gmail.com>
# Copyright © 2010-2012 Matteo Boscariol <boscarim@hotmail.com>
#
Expand Down Expand Up @@ -99,35 +99,6 @@ def decode_json(string):
raise ValueError


def encode_binary(string):
"""Encode a string for binary transmission - escape character is
'\\' and we escape '\r' as '\\r', so we can use again '\r\n' as
terminator string.
string (string): the binary string to encode
returns (string): the escaped string
"""
try:
return string.replace('\n', '\\\n')
except:
print >> sys.stderr, "Can't encode binary."
raise ValueError


def decode_binary(string):
"""Decode an escaped string to a usual string.
string (string): the escaped string to decode
return (object): the decoded string
"""
try:
return string.replace('\\\n', '\n')
except:
print >> sys.stderr, "Can't decode binary."
raise ValueError


class Logger:
"""Utility class for simple logging.
Expand Down

0 comments on commit a382a2d

Please sign in to comment.