Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix socket bind collisions in manager_tests, race condition bind/listen/subscribe and memory leaks #18

Merged
merged 10 commits into from
Nov 2, 2016
Merged
11 changes: 9 additions & 2 deletions src/common/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ $(BUILD)/io_tests: test/io_tests.c $(BUILD)/libcommon.a
$(CC) -o $@ $^ $(CFLAGS)

$(BUILD)/task_tests: test/task_tests.c $(BUILD)/libcommon.a
$(CC) -o $@ $^ $(CFLAGS)
$(CC) -o $@ $^ thirdparty/hiredis/libhiredis.a $(CFLAGS)

$(BUILD)/redis_tests: hiredis test/redis_tests.c $(BUILD)/libcommon.a logging.h
$(CC) -o $@ test/redis_tests.c logging.c $(BUILD)/libcommon.a thirdparty/hiredis/libhiredis.a $(CFLAGS)
Expand All @@ -40,7 +40,14 @@ hiredis:

test: hiredis redis $(BUILD)/common_tests $(BUILD)/task_log_tests $(BUILD)/object_table_tests $(BUILD)/db_tests $(BUILD)/io_tests $(BUILD)/task_tests $(BUILD)/redis_tests FORCE
./thirdparty/redis-3.2.3/src/redis-server &
sleep 1s ; ./build/common_tests ; ./build/db_tests ; ./build/task_log_tests ; ./build/object_table_tests ; ./build/io_tests ; ./build/task_tests ; ./build/redis_tests
sleep 1s
./build/common_tests
./build/db_tests
./build/io_tests
./build/task_tests
./build/redis_tests
./build/task_log_tests
./build/object_table_tests

valgrind: test
valgrind --leak-check=full --error-exitcode=1 ./build/common_tests
Expand Down
6 changes: 0 additions & 6 deletions src/common/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,6 @@

#define UNIQUE_ID_SIZE 20

/* Cleanup method for running tests with the greatest library.
* Runs the test, then clears the Redis database. */
#define RUN_REDIS_TEST(context, test) \
RUN_TEST(test); \
freeReplyObject(redisCommand(context, "FLUSHALL"));

typedef struct { unsigned char id[UNIQUE_ID_SIZE]; } unique_id;

extern const unique_id NIL_ID;
Expand Down
10 changes: 6 additions & 4 deletions src/common/io.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@
* write_message and read_message methods.
*
* @param port The port to bind to.
* @param shall_listen Are we also starting to listen on the socket?
* @return A non-blocking file descriptor for the socket, or -1 if an error
* occurs.
*/
int bind_inet_sock(const int port) {
int bind_inet_sock(const int port, bool shall_listen) {
struct sockaddr_in name;
int socket_fd = socket(PF_INET, SOCK_STREAM, 0);
if (socket_fd < 0) {
Expand All @@ -55,7 +56,7 @@ int bind_inet_sock(const int port) {
close(socket_fd);
return -1;
}
if (listen(socket_fd, 5) == -1) {
if (shall_listen && listen(socket_fd, 5) == -1) {
LOG_ERR("Could not listen to socket %d", port);
close(socket_fd);
return -1;
Expand All @@ -68,10 +69,11 @@ int bind_inet_sock(const int port) {
* pathname. Removes any existing file at the pathname.
*
* @param socket_pathname The pathname for the socket.
* @param shall_listen Are we also starting to listen on the socket?
* @return A blocking file descriptor for the socket, or -1 if an error
* occurs.
*/
int bind_ipc_sock(const char *socket_pathname) {
int bind_ipc_sock(const char *socket_pathname, bool shall_listen) {
struct sockaddr_un socket_address;
int socket_fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (socket_fd < 0) {
Expand Down Expand Up @@ -104,7 +106,7 @@ int bind_ipc_sock(const char *socket_pathname) {
close(socket_fd);
return -1;
}
if (listen(socket_fd, 5) == -1) {
if (shall_listen && listen(socket_fd, 5) == -1) {
LOG_ERR("Could not listen to socket %s", socket_pathname);
close(socket_fd);
return -1;
Expand Down
7 changes: 4 additions & 3 deletions src/common/io.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef IO_H
#define IO_H

#include <stdbool.h>
#include <stdint.h>

enum common_message_type {
Expand All @@ -14,8 +15,8 @@ enum common_message_type {

/* Helper functions for socket communication. */

int bind_inet_sock(const int port);
int bind_ipc_sock(const char *socket_pathname);
int bind_inet_sock(const int port, bool shall_listen);
int bind_ipc_sock(const char *socket_pathname, bool shall_listen);
int connect_ipc_sock(const char *socket_pathname);

int accept_client(int socket_fd);
Expand All @@ -29,4 +30,4 @@ void write_log_message(int fd, char *message);
void write_formatted_log_message(int fd, const char *format, ...);
char *read_log_message(int fd);

#endif
#endif /* IO_H */
13 changes: 5 additions & 8 deletions src/common/test/db_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#include <sys/wait.h>

#include "event_loop.h"
#include "test/example_task.h"
#include "test_common.h"
#include "state/db.h"
#include "state/object_table.h"
#include "state/task_log.h"
Expand Down Expand Up @@ -197,13 +197,10 @@ TEST unique_client_id_test(void) {
}

SUITE(db_tests) {
redisContext *context = redisConnect("127.0.0.1", 6379);
freeReplyObject(redisCommand(context, "FLUSHALL"));
RUN_REDIS_TEST(context, object_table_lookup_test);
RUN_REDIS_TEST(context, task_log_test);
RUN_REDIS_TEST(context, task_log_all_test);
RUN_REDIS_TEST(context, unique_client_id_test);
redisFree(context);
RUN_REDIS_TEST(object_table_lookup_test);
RUN_REDIS_TEST(task_log_test);
RUN_REDIS_TEST(task_log_all_test);
RUN_REDIS_TEST(unique_client_id_test);
}

GREATEST_MAIN_DEFS();
Expand Down
4 changes: 2 additions & 2 deletions src/common/test/io_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ SUITE(io_tests);

TEST ipc_socket_test(void) {
const char *socket_pathname = "test-socket";
int socket_fd = bind_ipc_sock(socket_pathname);
int socket_fd = bind_ipc_sock(socket_pathname, true);
ASSERT(socket_fd >= 0);

char *test_string = "hello world";
Expand Down Expand Up @@ -50,7 +50,7 @@ TEST ipc_socket_test(void) {

TEST long_ipc_socket_test(void) {
const char *socket_pathname = "long-test-socket";
int socket_fd = bind_ipc_sock(socket_pathname);
int socket_fd = bind_ipc_sock(socket_pathname, true);
ASSERT(socket_fd >= 0);

UT_string *test_string;
Expand Down
2 changes: 1 addition & 1 deletion src/common/test/object_table_tests.c
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#include "greatest.h"

#include "event_loop.h"
#include "example_task.h"
#include "test_common.h"
#include "common.h"
#include "state/object_table.h"
#include "state/redis.h"
Expand Down
16 changes: 7 additions & 9 deletions src/common/test/redis_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "state/redis.h"
#include "io.h"
#include "logging.h"
#include "test_common.h"

SUITE(redis_tests);

Expand Down Expand Up @@ -40,7 +41,7 @@ TEST redis_socket_test(void) {
const char *socket_pathname = "redis-test-socket";
redisContext *context = redisConnect("127.0.0.1", 6379);
ASSERT(context != NULL);
int socket_fd = bind_ipc_sock(socket_pathname);
int socket_fd = bind_ipc_sock(socket_pathname, true);
ASSERT(socket_fd >= 0);

int client_fd = connect_ipc_sock(socket_pathname);
Expand Down Expand Up @@ -97,7 +98,7 @@ TEST async_redis_socket_test(void) {

/* Start IPC channel. */
const char *socket_pathname = "async-redis-test-socket";
int socket_fd = bind_ipc_sock(socket_pathname);
int socket_fd = bind_ipc_sock(socket_pathname, true);
ASSERT(socket_fd >= 0);
utarray_push_back(connections, &socket_fd);

Expand Down Expand Up @@ -171,7 +172,7 @@ TEST logging_test(void) {

/* Start IPC channel. */
const char *socket_pathname = "logging-test-socket";
int socket_fd = bind_ipc_sock(socket_pathname);
int socket_fd = bind_ipc_sock(socket_pathname, true);
ASSERT(socket_fd >= 0);
utarray_push_back(connections, &socket_fd);

Expand Down Expand Up @@ -208,12 +209,9 @@ TEST logging_test(void) {
}

SUITE(redis_tests) {
redisContext *context = redisConnect("127.0.0.1", 6379);
freeReplyObject(redisCommand(context, "FLUSHALL"));
RUN_REDIS_TEST(context, redis_socket_test);
RUN_REDIS_TEST(context, async_redis_socket_test);
RUN_REDIS_TEST(context, logging_test);
redisFree(context);
RUN_REDIS_TEST(redis_socket_test);
RUN_REDIS_TEST(async_redis_socket_test);
RUN_REDIS_TEST(logging_test);
}

GREATEST_MAIN_DEFS();
Expand Down
2 changes: 1 addition & 1 deletion src/common/test/task_log_tests.c
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#include "greatest.h"

#include "event_loop.h"
#include "example_task.h"
#include "test_common.h"
#include "common.h"
#include "state/object_table.h"
#include "state/redis.h"
Expand Down
2 changes: 1 addition & 1 deletion src/common/test/task_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#include <sys/socket.h>

#include "common.h"
#include "test/example_task.h"
#include "test_common.h"
#include "task.h"
#include "io.h"

Expand Down
22 changes: 19 additions & 3 deletions src/common/test/example_task.h → src/common/test/test_common.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#ifndef EXAMPLE_TASK_H
#define EXAMPLE_TASK_H
#ifndef TEST_COMMON_H
#define TEST_COMMON_H

#include "hiredis/hiredis.h"

#include "task.h"

Expand All @@ -20,4 +22,18 @@ task_instance *example_task_instance(void) {
return instance;
}

#endif
/* Flush redis. */
void flushall_redis() {
redisContext *context = redisConnect("127.0.0.1", 6379);
freeReplyObject(redisCommand(context, "FLUSHALL"));
redisFree(context);
}

/* Cleanup method for running tests with the greatest library.
* Runs the test, then clears the Redis database. */
#define RUN_REDIS_TEST(test) \
flushall_redis(); \
RUN_TEST(test); \
flushall_redis();

#endif /* TEST_COMMON */
2 changes: 1 addition & 1 deletion src/photon/photon_scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ void start_server(const char *socket_name,
const char *redis_addr,
int redis_port,
const char *plasma_socket_name) {
int fd = bind_ipc_sock(socket_name);
int fd = bind_ipc_sock(socket_name, true);
event_loop *loop = event_loop_create();
g_state =
init_local_scheduler(loop, redis_addr, redis_port, plasma_socket_name);
Expand Down
19 changes: 13 additions & 6 deletions src/plasma/plasma_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -903,17 +903,24 @@ void start_server(const char *store_socket_name,
int port,
const char *db_addr,
int db_port) {
g_manager_state = init_plasma_manager_state(store_socket_name, master_addr,
port, db_addr, db_port);
CHECK(g_manager_state);

int remote_sock = bind_inet_sock(port);
/* Bind the sockets before we try to connect to the plasma store.
* In case the bind does not succeed, we want to be able to exit
* without breaking the pipe to the store. */
int remote_sock = bind_inet_sock(port, false);
if (remote_sock < 0) {
exit(EXIT_COULD_NOT_BIND_PORT);
}
int local_sock = bind_ipc_sock(manager_socket_name);

int local_sock = bind_ipc_sock(manager_socket_name, false);
CHECKM(local_sock >= 0, "Unable to bind local manager socket");

g_manager_state = init_plasma_manager_state(store_socket_name, master_addr,
port, db_addr, db_port);
CHECK(g_manager_state);

CHECK(listen(remote_sock, 5) != -1);
CHECK(listen(local_sock, 5) != -1);

LOG_DEBUG("Started server connected to store %s, listening on port %d",
store_socket_name, port);
event_loop_add_file(g_manager_state->loop, local_sock, EVENT_LOOP_READ,
Expand Down
2 changes: 1 addition & 1 deletion src/plasma/plasma_store.c
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ void signal_handler(int signal) {
void start_server(char *socket_name) {
event_loop *loop = event_loop_create();
plasma_store_state *state = init_plasma_store(loop);
int socket = bind_ipc_sock(socket_name);
int socket = bind_ipc_sock(socket_name, true);
CHECK(socket >= 0);
event_loop_add_file(loop, socket, EVENT_LOOP_READ, new_client_connection,
state);
Expand Down
Loading