Skip to content

Commit

Permalink
Merge pull request #3029 from sigiesec/fix-poller-context-shutdown
Browse files Browse the repository at this point in the history
Fix assertion failure when calling zmq_poller_destroy after zmq_ctx_term
  • Loading branch information
bluca authored Mar 28, 2018
2 parents ef6162a + f571c22 commit 2aa54d6
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 7 deletions.
5 changes: 5 additions & 0 deletions src/socket_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ bool zmq::socket_base_t::check_tag ()
return tag == 0xbaddecaf;
}

bool zmq::socket_base_t::is_thread_safe () const
{
return thread_safe;
}

zmq::socket_base_t *zmq::socket_base_t::create (int type_,
class ctx_t *parent_,
uint32_t tid_,
Expand Down
3 changes: 3 additions & 0 deletions src/socket_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ class socket_base_t : public own_t,
// Returns false if object is not a socket.
bool check_tag ();

// Returns whether the socket is thread-safe.
bool is_thread_safe () const;

// Create a socket of a specified type.
static socket_base_t *
create (int type_, zmq::ctx_t *parent_, uint32_t tid_, int sid_);
Expand Down
9 changes: 2 additions & 7 deletions src/socket_poller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,8 @@

static bool is_thread_safe (zmq::socket_base_t &socket)
{
int thread_safe;
size_t thread_safe_size = sizeof (int);

int rc =
socket.getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size);
zmq_assert (rc == 0);
return thread_safe;
// do not use getsockopt here, since that would fail during context termination
return socket.is_thread_safe ();
}

zmq::socket_poller_t::socket_poller_t () :
Expand Down
87 changes: 87 additions & 0 deletions tests/test_ctx_destroy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,87 @@ void test_zmq_ctx_shutdown_null_fails ()
TEST_ASSERT_EQUAL_INT (EFAULT, errno);
}

#ifdef ZMQ_HAVE_POLLER
struct poller_test_data_t
{
int socket_type;
void *ctx;
void *counter;
};

void run_poller (void *data_)
{
struct poller_test_data_t *poller_test_data =
(struct poller_test_data_t *) data_;

void *socket =
zmq_socket (poller_test_data->ctx, poller_test_data->socket_type);
TEST_ASSERT_NOT_NULL (socket);

void *poller = zmq_poller_new ();
TEST_ASSERT_NOT_NULL (poller);

TEST_ASSERT_SUCCESS_ERRNO (
zmq_poller_add (poller, socket, NULL, ZMQ_POLLIN));

zmq_atomic_counter_set (poller_test_data->counter, 1);

zmq_poller_event_t event;
TEST_ASSERT_FAILURE_ERRNO (ETERM, zmq_poller_wait (poller, &event, -1));

TEST_ASSERT_SUCCESS_ERRNO (zmq_poller_destroy (&poller));

// Close the socket
TEST_ASSERT_SUCCESS_ERRNO (zmq_close (socket));
}
#endif

void test_poller_exists_with_socket_on_zmq_ctx_term (const int socket_type)
{
#ifdef ZMQ_HAVE_POLLER
struct poller_test_data_t poller_test_data;

poller_test_data.socket_type = socket_type;

// Set up our context and sockets
poller_test_data.ctx = zmq_ctx_new ();
TEST_ASSERT_NOT_NULL (poller_test_data.ctx);

poller_test_data.counter = zmq_atomic_counter_new ();
TEST_ASSERT_NOT_NULL (poller_test_data.counter);

void *thread = zmq_threadstart (run_poller, &poller_test_data);
TEST_ASSERT_NOT_NULL (thread);

while (zmq_atomic_counter_value (poller_test_data.counter) == 0) {
msleep (10);
}

// Destroy the context
TEST_ASSERT_SUCCESS_ERRNO (zmq_ctx_destroy (poller_test_data.ctx));

zmq_threadclose (thread);

zmq_atomic_counter_destroy (&poller_test_data.counter);
#else
TEST_IGNORE_MESSAGE ("libzmq without zmq_poller_* support, ignoring test");
#endif
}

void test_poller_exists_with_socket_on_zmq_ctx_term_thread_safe_socket ()
{
#ifdef ZMQ_BUILD_DRAFT_API
test_poller_exists_with_socket_on_zmq_ctx_term (ZMQ_CLIENT);
#else
TEST_IGNORE_MESSAGE ("libzmq without DRAFT support, ignoring test");
#endif
}

void test_poller_exists_with_socket_on_zmq_ctx_term_non_thread_safe_socket ()
{
test_poller_exists_with_socket_on_zmq_ctx_term (ZMQ_DEALER);
}

int main (void)
{
setup_test_environment ();
Expand All @@ -122,5 +203,11 @@ int main (void)
RUN_TEST (test_zmq_ctx_term_null_fails);
RUN_TEST (test_zmq_term_null_fails);
RUN_TEST (test_zmq_ctx_shutdown_null_fails);

RUN_TEST (
test_poller_exists_with_socket_on_zmq_ctx_term_non_thread_safe_socket);
RUN_TEST (
test_poller_exists_with_socket_on_zmq_ctx_term_thread_safe_socket);

return UNITY_END ();
}

0 comments on commit 2aa54d6

Please sign in to comment.