Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

One exception and some nice-to-have changes #121

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion aioamqp/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,7 @@ def basic_deliver(self, frame):
@asyncio.coroutine
def server_basic_cancel(self, frame):
"""From the server, means the server won't send anymore messages to this consumer."""
consumer_tag = frame.arguments['consumer_tag']
consumer_tag = frame.arguments.get('consumer_tag',None)
self.cancelled_consumers.add(consumer_tag)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what to do if we get None here ? (it misses a space between , and None)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. Probably cancel all consumers …

Copy link
Contributor

@mwfrojdman mwfrojdman Nov 17, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do you not get a consumer tag with a basic.cancel frame? It's always there

EDIT: Actually it's a parameter, not an argument, so frame.payload_decoder.read_xyz()should be used to read it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK … can you prepare a patch that actually works?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part is now fixed in commit 893f196.

logger.info("consume cancelled received")

Expand Down
13 changes: 12 additions & 1 deletion aioamqp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,10 @@ def connection_made(self, transport):
self._stream_writer = _StreamWriter(transport, self, self._stream_reader, self._loop)

def connection_lost(self, exc):
logger.warning("Connection lost exc=%r", exc)
if exc is None:
logger.debug("Connection lost")
else:
logger.warning("Connection lost", exc_info=exc)
self.connection_closed.set()
self.is_open = False
self._close_channels(exception=exc)
Expand Down Expand Up @@ -199,6 +202,14 @@ def start_connection(self, host, port, login, password, virtualhost, ssl=False,

# for now, we read server's responses asynchronously
self.worker = ensure_future(self.run(), loop=self._loop)
def done_work(w):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's interesting !

try:
w.result()
except asyncio.CancelledError:
pass
except Exception as exc:
logger.exception("Worker died", exc_info=exc)
self.worker.add_done_callback(done_work)

def stop(self):
self.is_open = False
Expand Down
8 changes: 6 additions & 2 deletions aioamqp/tests/test_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ class AmqpConnectionTestCase(testcase.RabbitTestCase, unittest.TestCase):

@testing.coroutine
def test_connect(self):
_transport, proto = yield from connect(virtualhost=self.vhost, loop=self.loop)
_transport, proto = yield from connect(virtualhost=self.vhost, loop=self.loop,
host=self.host,port=self.port,login=self.login,password=self.password)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't forget the spaces :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bah. Will try to remember next time :-P

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just created a branch with a pylint test ;)

any suggestion ? @RemiCardona @smurfix #122

self.assertTrue(proto.is_open)
self.assertIsNotNone(proto.server_properties)
yield from proto.close()
Expand All @@ -29,6 +30,8 @@ def test_connect_tuning(self):
channel_max=channel_max,
frame_max=frame_max,
heartbeat=heartbeat,
host=self.host, port=self.port,
login=self.login, password=self.password,
)
self.assertTrue(proto.is_open)
self.assertIsNotNone(proto.server_properties)
Expand All @@ -47,7 +50,8 @@ def test_connect_tuning(self):

@testing.coroutine
def test_socket_nodelay(self):
transport, proto = yield from connect(virtualhost=self.vhost, loop=self.loop)
transport, proto = yield from connect(virtualhost=self.vhost, loop=self.loop,
host=self.host,port=self.port, login=self.login,password=self.password)
sock = transport.get_extra_info('socket')
opt_val = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY)
self.assertEqual(opt_val, 1)
Expand Down
11 changes: 8 additions & 3 deletions aioamqp/tests/test_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ class ProtocolTestCase(testcase.RabbitTestCase, unittest.TestCase):

@testing.coroutine
def test_connect(self):
_transport, protocol = yield from amqp_connect(virtualhost=self.vhost, loop=self.loop)
_transport, protocol = yield from amqp_connect(virtualhost=self.vhost, loop=self.loop,
host=self.host,port=self.port, login=self.login,password=self.password)
self.assertTrue(protocol.is_open)
yield from protocol.close()

Expand All @@ -33,6 +34,8 @@ def test_connect_products_info(self):
virtualhost=self.vhost,
client_properties=client_properties,
loop=self.loop,
host=self.host,port=self.port,
login=self.login,password=self.password
)

self.assertEqual(protocol.client_properties, client_properties)
Expand All @@ -41,11 +44,13 @@ def test_connect_products_info(self):
@testing.coroutine
def test_connection_unexistant_vhost(self):
with self.assertRaises(exceptions.AmqpClosedConnection):
yield from amqp_connect(virtualhost='/unexistant', loop=self.loop)
yield from amqp_connect(virtualhost='/unexistant', loop=self.loop,
host=self.host,port=self.port, login=self.login,password=self.password)

def test_connection_wrong_login_password(self):
with self.assertRaises(exceptions.AmqpClosedConnection):
self.loop.run_until_complete(amqp_connect(login='wrong', password='wrong', loop=self.loop))
self.loop.run_until_complete(amqp_connect(login='wrong', password='wrong', loop=self.loop,
host=self.host,port=self.port))

@testing.coroutine
def test_connection_from_url(self):
Expand Down
9 changes: 6 additions & 3 deletions aioamqp/tests/testcase.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,11 @@ def setUp(self):
super().setUp()
self.host = os.environ.get('AMQP_HOST', 'localhost')
self.port = os.environ.get('AMQP_PORT', 5672)
self.login = os.environ.get('AMQP_USER', 'guest')
self.password = os.environ.get('AMQP_PASS', 'guest')
self.mgr_port = os.environ.get('AMQP_MGR_PORT', 15672)
self.vhost = os.environ.get('AMQP_VHOST', self.VHOST + str(uuid.uuid4()))
self.http_client = pyrabbit.api.Client('localhost:15672/api/', 'guest', 'guest')
self.http_client = pyrabbit.api.Client('%s:%d/api/' % (self.host,self.mgr_port), self.login,self.password)

self.amqps = []
self.channels = []
Expand All @@ -101,7 +104,7 @@ def reset_vhost(self):

self.http_client.create_vhost(self.vhost)
self.http_client.set_vhost_permissions(
vname=self.vhost, username='guest', config='.*', rd='.*', wr='.*',
vname=self.vhost, username=self.login, config='.*', rd='.*', wr='.*',
)

@asyncio.coroutine
Expand Down Expand Up @@ -285,7 +288,7 @@ def protocol_factory(*args, **kw):
return ProxyAmqpProtocol(self, *args, **kw)
vhost = vhost or self.vhost
transport, protocol = yield from aioamqp_connect(host=self.host, port=self.port, virtualhost=vhost,
protocol_factory=protocol_factory, loop=self.loop)
protocol_factory=protocol_factory, loop=self.loop, login=self.login, password=self.password)
self.amqps.append(protocol)
self.transports.append(transport)
return transport, protocol