From bffbf5b234d12fd248257665cb9962ebe7e17518 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Wed, 24 May 2023 13:51:14 -0700 Subject: [PATCH] Converted usages of Py_BEGIN_ALLOW_THREADS --- src/consumer.cc | 23 +++++++++-------------- src/producer.cc | 6 +++--- src/reader.cc | 18 ++++++++---------- 3 files changed, 20 insertions(+), 27 deletions(-) diff --git a/src/consumer.cc b/src/consumer.cc index 67d2daa..7b5174c 100644 --- a/src/consumer.cc +++ b/src/consumer.cc @@ -34,19 +34,17 @@ Message Consumer_receive(Consumer& consumer) { Message Consumer_receive_timeout(Consumer& consumer, int timeoutMs) { Message msg; - Result res; - Py_BEGIN_ALLOW_THREADS res = consumer.receive(msg, timeoutMs); - Py_END_ALLOW_THREADS - CHECK_RESULT(res); + py::gil_scoped_release release; + CHECK_RESULT(consumer.receive(msg, timeoutMs)); return msg; } Messages Consumer_batch_receive(Consumer& consumer) { Messages msgs; - Result res; - Py_BEGIN_ALLOW_THREADS res = consumer.batchReceive(msgs); - Py_END_ALLOW_THREADS CHECK_RESULT(res); + + py::gil_scoped_release release; + CHECK_RESULT(consumer.batchReceive(msgs)); return msgs; } @@ -59,8 +57,8 @@ void Consumer_acknowledge_message_id(Consumer& consumer, const MessageId& msgId) } void Consumer_negative_acknowledge(Consumer& consumer, const Message& msg) { - Py_BEGIN_ALLOW_THREADS consumer.negativeAcknowledge(msg); - Py_END_ALLOW_THREADS + py::gil_scoped_release release; + consumer.negativeAcknowledge(msg); } void Consumer_negative_acknowledge_message_id(Consumer& consumer, const MessageId& msgId) { @@ -97,11 +95,8 @@ bool Consumer_is_connected(Consumer& consumer) { return consumer.isConnected(); MessageId Consumer_get_last_message_id(Consumer& consumer) { MessageId msgId; - Result res; - Py_BEGIN_ALLOW_THREADS res = consumer.getLastMessageId(msgId); - Py_END_ALLOW_THREADS - - CHECK_RESULT(res); + py::gil_scoped_release release; + CHECK_RESULT(consumer.getLastMessageId(msgId)); return msgId; } diff --git a/src/producer.cc b/src/producer.cc index 7027185..f092a0f 100644 --- a/src/producer.cc +++ b/src/producer.cc @@ -30,10 +30,10 @@ MessageId Producer_send(Producer& producer, const Message& message) { } void Producer_sendAsync(Producer& producer, const Message& msg, SendCallback callback) { - Py_BEGIN_ALLOW_THREADS producer.sendAsync(msg, callback); - Py_END_ALLOW_THREADS + py::gil_scoped_release release; + producer.sendAsync(msg, callback); - if (PyErr_CheckSignals() == -1) { + if (PyErr_CheckSignals() == -1) { PyErr_SetInterrupt(); } } diff --git a/src/reader.cc b/src/reader.cc index 0126f3f..da15322 100644 --- a/src/reader.cc +++ b/src/reader.cc @@ -28,13 +28,13 @@ Message Reader_readNext(Reader& reader) { // TODO: There is currently no readNextAsync() version for the Reader. // Once that's available, we should also convert these ad-hoc loops. while (true) { - Py_BEGIN_ALLOW_THREADS - // Use 100ms timeout to periodically check whether the - // interpreter was interrupted - res = reader.readNext(msg, 100); - Py_END_ALLOW_THREADS + py::gil_scoped_release release; - if (res != ResultTimeout) { + // Use 100ms timeout to periodically check whether the + // interpreter was interrupted + res = reader.readNext(msg, 100); + + if (res != ResultTimeout) { // In case of timeout we keep calling receive() to simulate a // blocking call until a message is available, while breaking // every once in a while to check the Python signal status @@ -53,11 +53,9 @@ Message Reader_readNext(Reader& reader) { Message Reader_readNextTimeout(Reader& reader, int timeoutMs) { Message msg; - Result res; - Py_BEGIN_ALLOW_THREADS res = reader.readNext(msg, timeoutMs); - Py_END_ALLOW_THREADS + py::gil_scoped_release release; + CHECK_RESULT(reader.readNext(msg, timeoutMs)); - CHECK_RESULT(res); return msg; }