From b043fbd3303272580f54d5aa89932384ec5fb973 Mon Sep 17 00:00:00 2001 From: Floris van Nee Date: Mon, 4 Nov 2019 13:28:42 +0100 Subject: [PATCH] Support PgBouncer by sending only a single SYNC message per query This is a simple implementation of PgBouncer support for asyncpg. It doesn't have any detection features, but at least changes asyncpg behavior in such a way that using PgBouncer is possible. This commit gets rid of the explicit SYNC after a parse/describe sequence and changes is to a FLUSH. This should work regardless of the setting of statement_cache_size and whether or not it's pgbouncer or a direct postgres connection. With this, PgBouncer is supported when setting statement_cache_size explicitly to 0. --- asyncpg/exceptions/_base.py | 3 ++- asyncpg/protocol/coreproto.pyx | 18 ++++++++++++------ asyncpg/protocol/protocol.pyx | 7 +++++++ 3 files changed, 21 insertions(+), 7 deletions(-) diff --git a/asyncpg/exceptions/_base.py b/asyncpg/exceptions/_base.py index 3e6ef812..6b068f2f 100644 --- a/asyncpg/exceptions/_base.py +++ b/asyncpg/exceptions/_base.py @@ -142,7 +142,8 @@ def _make_constructor(cls, fields, query=None): purpose; * if you have no option of avoiding the use of pgbouncer, - then you must switch pgbouncer's pool_mode to "session". + then you can set statement_cache_size to 0 when creating + the asyncpg connection object. """) dct['hint'] = hint diff --git a/asyncpg/protocol/coreproto.pyx b/asyncpg/protocol/coreproto.pyx index a44bc5ad..fdc26ec6 100644 --- a/asyncpg/protocol/coreproto.pyx +++ b/asyncpg/protocol/coreproto.pyx @@ -118,6 +118,7 @@ cdef class CoreProtocol: self.result = apg_exc.InternalClientError( 'unknown error in protocol implementation') + self._parse_msg_ready_for_query() self._push_result() else: @@ -187,19 +188,23 @@ cdef class CoreProtocol: elif mtype == b'T': # Row description self.result_row_desc = self.buffer.consume_message() + self._push_result() elif mtype == b'E': # ErrorResponse self._parse_msg_error_response(True) - - elif mtype == b'Z': - # ReadyForQuery - self._parse_msg_ready_for_query() - self._push_result() + # we don't send a sync during the parse/describe sequence + # but send a FLUSH instead. If an error happens we need to + # send a SYNC explicitly in order to mark the end of the transaction. + # this effectively clears the error and we then wait until we get a + # ready for new query message + self._write(SYNC_MESSAGE) + self.state = PROTOCOL_ERROR_CONSUME elif mtype == b'n': # NoData self.buffer.discard_message() + self._push_result() cdef _process__bind_execute(self, char mtype): if mtype == b'D': @@ -853,7 +858,7 @@ cdef class CoreProtocol: buf.end_message() packet.write_buffer(buf) - packet.write_bytes(SYNC_MESSAGE) + packet.write_bytes(FLUSH_MESSAGE) self._write(packet) @@ -1028,3 +1033,4 @@ cdef class CoreProtocol: cdef bytes SYNC_MESSAGE = bytes(WriteBuffer.new_message(b'S').end_message()) +cdef bytes FLUSH_MESSAGE = bytes(WriteBuffer.new_message(b'H').end_message()) diff --git a/asyncpg/protocol/protocol.pyx b/asyncpg/protocol/protocol.pyx index ac653bd0..8ae000ce 100644 --- a/asyncpg/protocol/protocol.pyx +++ b/asyncpg/protocol/protocol.pyx @@ -588,6 +588,13 @@ cdef class BaseProtocol(CoreProtocol): }) self.abort() + if self.state == PROTOCOL_PREPARE: + # we need to send a SYNC to server if we cancel during the PREPARE phase + # because the PREPARE sequence does not send a SYNC itself. + # we cannot send this extra SYNC if we are not in PREPARE phase, + # because then we would issue two SYNCs and we would get two ReadyForQuery + # replies, which our current state machine implementation cannot handle + self._write(SYNC_MESSAGE) self._set_state(PROTOCOL_CANCELLED) def _on_timeout(self, fut):