Skip to content

Commit

Permalink
Merge pull request #2488 from eisenhauer/Upstream
Browse files Browse the repository at this point in the history
EVPath Upstream listen socket leak fix
  • Loading branch information
eisenhauer authored Oct 16, 2020
2 parents 98cd2eb + 532fa31 commit 7263faa
Showing 1 changed file with 119 additions and 82 deletions.
201 changes: 119 additions & 82 deletions thirdparty/EVPath/EVPath/cmsockets.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,9 @@ typedef struct func_list_item {
typedef struct socket_client_data {
CManager cm;
char *hostname;
int listen_port;
int conn_sock;
int listen_count;
int *listen_fds;
int *listen_ports;
attr_list characteristics;
CMtrans_services svc;
} *socket_client_data_ptr;
Expand Down Expand Up @@ -482,7 +483,10 @@ attr_list conn_attr_list;
#endif

{
int local_listen_port = htons(sd->listen_port);
int local_listen_port = 0;
if (sd->listen_count) {
local_listen_port = htons(sd->listen_ports[0]);
}
if (write(sock, &local_listen_port, 4) != 4) {
svc->trace_out(cm, "Write failed\n");
return -1;
Expand Down Expand Up @@ -601,8 +605,14 @@ attr_list attrs;
svc->trace_out(cm, "CMself check - Host IP addrs don't match, %lx, %lx", IP, host_addr);
return 0;
}
if (int_port_num != sd->listen_port) {
svc->trace_out(cm, "CMself check - Ports don't match, %d, %d", int_port_num, sd->listen_port);
int port_match = 0;
for (int i = 0; i < sd->listen_count; i++) {
if (int_port_num == sd->listen_ports[i]) {
port_match = sd->listen_ports[i];
}
}
if (!port_match) {
svc->trace_out(cm, "CMself check - Ports don't match, %d, %d", int_port_num, port_match);
return 0;
}
svc->trace_out(cm, "CMself check returning TRUE");
Expand Down Expand Up @@ -668,19 +678,14 @@ attr_list listen_info;
unsigned int length;
struct sockaddr_in sock_addr;
int sock_opt_val = 1;
int conn_sock;
int conn_sock = 0;
int attr_port_num = 0;
u_short port_num = 0;
int port_range_low, port_range_high;
int use_hostname = 0;
int IP;
char host_name[256];

conn_sock = socket(AF_INET, SOCK_STREAM, 0);
if (conn_sock == SOCKET_ERROR) {
fprintf(stderr, "Cannot open INET socket\n");
return NULL;
}
if (sd->cm) {
/* assert CM is locked */
assert(CM_LOCKED(svc, sd->cm));
Expand All @@ -707,87 +712,114 @@ attr_list listen_info;
sock_addr.sin_family = AF_INET;
sock_addr.sin_addr.s_addr = INADDR_ANY;
sock_addr.sin_port = htons(port_num);
if (sock_addr.sin_port != 0) {
/* specific port requested. set REUSEADDR, REUSEPORT because previous server might have died badly */
if (setsockopt(conn_sock, SOL_SOCKET, SO_REUSEADDR, (char *) &sock_opt_val,
sizeof(sock_opt_val)) != 0) {
fprintf(stderr, "Failed to set REUSEADDR on INET socket before bind\n");
perror("setsockopt(SO_REUSEADDR) failed");
return NULL;

for (int i = 0; i < sd->listen_count; i++) {
if ((sd->listen_ports[i] == port_num) ||
((sd->listen_ports[i] >= port_range_low) &&
(sd->listen_ports[i] <= port_range_high))) {
conn_sock = sd->listen_fds[i];
}
#ifdef SO_REUSEPORT
sock_opt_val = 1;
if (setsockopt(conn_sock, SOL_SOCKET, SO_REUSEPORT, (const char*)&sock_opt_val, sizeof(sock_opt_val)) != 0) {
fprintf(stderr, "Failed to set REUSEADDR on INET socket before bind\n");
perror("setsockopt(SO_REUSEPORT) failed");
}
if (!conn_sock) {
conn_sock = socket(AF_INET, SOCK_STREAM, 0);
if (conn_sock == SOCKET_ERROR) {
fprintf(stderr, "Cannot open INET socket\n");
return NULL;
}
if (sock_addr.sin_port != 0) {
/* specific port requested. set REUSEADDR, REUSEPORT because previous server might have died badly */
if (setsockopt(conn_sock, SOL_SOCKET, SO_REUSEADDR, (char *) &sock_opt_val,
sizeof(sock_opt_val)) != 0) {
fprintf(stderr, "Failed to set REUSEADDR on INET socket before bind\n");
perror("setsockopt(SO_REUSEADDR) failed");
return NULL;
}
#ifdef SO_REUSEPORT
sock_opt_val = 1;
if (setsockopt(conn_sock, SOL_SOCKET, SO_REUSEPORT, (const char*)&sock_opt_val, sizeof(sock_opt_val)) != 0) {
fprintf(stderr, "Failed to set REUSEADDR on INET socket before bind\n");
perror("setsockopt(SO_REUSEPORT) failed");
return NULL;
}
#endif
svc->trace_out(cm, "CMSocket trying to bind selected port %d", port_num);
if (bind(conn_sock, (struct sockaddr *) &sock_addr,
sizeof sock_addr) == SOCKET_ERROR) {
fprintf(stderr, "Cannot bind INET socket\n");
svc->trace_out(cm, "CMSocket trying to bind selected port %d", port_num);
if (bind(conn_sock, (struct sockaddr *) &sock_addr,
sizeof sock_addr) == SOCKET_ERROR) {
fprintf(stderr, "Cannot bind INET socket\n");
return NULL;
}
} else if (port_range_high == -1) {
/* bind to any port, range unconstrained */
sock_addr.sin_port = 0;
svc->trace_out(cm, "CMSocket trying to bind to any available port");
if (bind(conn_sock, (struct sockaddr *) &sock_addr,
sizeof sock_addr) == SOCKET_ERROR) {
fprintf(stderr, "Cannot bind INET socket\n");
return NULL;
}
} else {
long seedval = time(NULL) + getpid();
/* port num is free. Constrain to range to standards */
int size = port_range_high - port_range_low;
int tries = 30;
int result = SOCKET_ERROR;
srand48(seedval);
while (tries > 0) {
int target = port_range_low + size * drand48();
sock_addr.sin_port = htons(target);
svc->trace_out(cm, "CMSocket trying to bind port %d", target);
result = bind(conn_sock, (struct sockaddr *) &sock_addr,
sizeof sock_addr);
tries--;
if (result != SOCKET_ERROR) tries = 0;
if (tries%5 == 4) {
/* try reseeding in case we're in sync with another process */
srand48(time(NULL) + getpid());
}
if (tries == 20) {
/* damn, tried a lot, increase the range (This might violate specified range) */
size *= 10;
}
if (tries == 10) {
/* damn, tried a lot more, increase the range (This might violate specified range) */
size *= 10;
}
}
if (result == SOCKET_ERROR) {
fprintf(stderr, "Cannot bind INET socket\n");
return NULL;
}
}
/* begin listening for conns and set the backlog */
if (listen(conn_sock, FD_SETSIZE)) {
fprintf(stderr, "listen failed\n");
return NULL;
}
} else if (port_range_high == -1) {
/* bind to any port, range unconstrained */
sock_addr.sin_port = 0;
svc->trace_out(cm, "CMSocket trying to bind to any available port");
if (bind(conn_sock, (struct sockaddr *) &sock_addr,
sizeof sock_addr) == SOCKET_ERROR) {
fprintf(stderr, "Cannot bind INET socket\n");
svc->trace_out(cm, "CMSockets Adding socket_accept_conn as action on fd %d", conn_sock);
svc->fd_add_select(cm, conn_sock, socket_accept_conn,
(void *) trans, (void *) (long)conn_sock);

length = sizeof(sock_addr);
if (getsockname(conn_sock, (struct sockaddr *) &sock_addr, &length) < 0) {
fprintf(stderr, "Cannot get socket name\n");
return NULL;
}
sd->listen_fds = realloc(sd->listen_fds,
sizeof(int)*(sd->listen_count+1));
sd->listen_ports = realloc(sd->listen_ports,
sizeof(int)*(sd->listen_count+1));
sd->listen_fds[sd->listen_count] = conn_sock;
sd->listen_ports[sd->listen_count] = ntohs(sock_addr.sin_port);
sd->listen_count++;
} else {
long seedval = time(NULL) + getpid();
/* port num is free. Constrain to range to standards */
int size = port_range_high - port_range_low;
int tries = 30;
int result = SOCKET_ERROR;
srand48(seedval);
while (tries > 0) {
int target = port_range_low + size * drand48();
sock_addr.sin_port = htons(target);
svc->trace_out(cm, "CMSocket trying to bind port %d", target);
result = bind(conn_sock, (struct sockaddr *) &sock_addr,
sizeof sock_addr);
tries--;
if (result != SOCKET_ERROR) tries = 0;
if (tries%5 == 4) {
/* try reseeding in case we're in sync with another process */
srand48(time(NULL) + getpid());
}
if (tries == 20) {
/* damn, tried a lot, increase the range (This might violate specified range) */
size *= 10;
}
if (tries == 10) {
/* damn, tried a lot more, increase the range (This might violate specified range) */
size *= 10;
}
}
if (result == SOCKET_ERROR) {
fprintf(stderr, "Cannot bind INET socket\n");
length = sizeof(sock_addr);
if (getsockname(conn_sock, (struct sockaddr *) &sock_addr, &length) < 0) {
fprintf(stderr, "Cannot get socket name\n");
return NULL;
}
}
length = sizeof sock_addr;
if (getsockname(conn_sock, (struct sockaddr *) &sock_addr, &length) < 0) {
fprintf(stderr, "Cannot get socket name\n");
return NULL;
}
/* begin listening for conns and set the backlog */
if (listen(conn_sock, FD_SETSIZE)) {
fprintf(stderr, "listen failed\n");
return NULL;
svc->trace_out(cm, "CMSockets reusing prior listen, fd %d, port %d\n", conn_sock, ntohs(sock_addr.sin_port));
}
/* set the port num as one we can be contacted at */

svc->trace_out(cm, "CMSockets Adding socket_accept_conn as action on fd %d", conn_sock);
svc->fd_add_select(cm, conn_sock, socket_accept_conn,
(void *) trans, (void *) (long)conn_sock);

sd->conn_sock = conn_sock;
{
int int_port_num = ntohs(sock_addr.sin_port);
attr_list ret_list;
Expand All @@ -799,7 +831,6 @@ attr_list listen_info;
if (sd->hostname != NULL)
svc->free_func(sd->hostname);
sd->hostname = strdup(host_name);
sd->listen_port = int_port_num;
if ((IP != 0) && (!use_hostname)) {
add_attr(ret_list, CM_IP_ADDR, Attr_Int4,
(attr_value) (long)IP);
Expand Down Expand Up @@ -1141,7 +1172,11 @@ free_socket_data(CManager cm, void *sdv)
if (sd->hostname != NULL)
svc->free_func(sd->hostname);
free_attr_list(sd->characteristics);
close(sd->conn_sock);
for(int i = 0 ; i < sd->listen_count; i++) {
close(sd->listen_fds[i]);
}
svc->free_func(sd->listen_fds);
svc->free_func(sd->listen_ports);
svc->free_func(sd);
}

Expand Down Expand Up @@ -1189,9 +1224,11 @@ libcmsockets_LTX_initialize(CManager cm, CMtrans_services svc, transport_entry t
socket_data = svc->malloc_func(sizeof(struct socket_client_data));
socket_data->cm = cm;
socket_data->hostname = NULL;
socket_data->listen_port = -1;
socket_data->svc = svc;
socket_data->characteristics = create_attr_list();
socket_data->listen_count = 0;
socket_data->listen_fds = malloc(sizeof(int));
socket_data->listen_ports = malloc(sizeof(int));
add_int_attr(socket_data->characteristics, CM_TRANSPORT_RELIABLE, 1);
svc->add_shutdown_task(cm, free_socket_data, (void *) socket_data, FREE_TASK);
return (void *) socket_data;
Expand Down

0 comments on commit 7263faa

Please sign in to comment.