Skip to content

Commit

Permalink
Reliable receiving (-r) data as sent by sender
Browse files Browse the repository at this point in the history
Two changes implemented:

1. On client, Following TEST_END signal, it sends the total number of bytes sent for server over control-socket.

2. On server,
    a. following TEST_END it receives the total number of bytes and sets to test->settings->bytes
    b. (if reliability requested) it defers the test termination and cleanup until that exact number of bytes are received (timer-based solution)
  • Loading branch information
hanvari committed Jul 3, 2021
1 parent e3da02c commit 703bf42
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 17 deletions.
1 change: 1 addition & 0 deletions src/iperf.h
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ struct iperf_test
int omit; /* duration of omit period (-O flag) */
int duration; /* total duration of test (-t flag) */
char *diskfile_name; /* -F option */
int reliable_receive_in_full; /* -r option. For reciever only. */
int affinity, server_affinity; /* -A option */
#if defined(HAVE_CPUSET_SETAFFINITY)
cpuset_t cpumask;
Expand Down
7 changes: 6 additions & 1 deletion src/iperf_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -970,6 +970,7 @@ iperf_parse_arguments(struct iperf_test *test, int argc, char **argv)
{"zerocopy", no_argument, NULL, 'Z'},
{"omit", required_argument, NULL, 'O'},
{"file", required_argument, NULL, 'F'},
{"reliable", required_argument, NULL, 'r'},
{"repeating-payload", no_argument, NULL, OPT_REPEATING_PAYLOAD},
{"timestamps", optional_argument, NULL, OPT_TIMESTAMPS},
#if defined(HAVE_CPU_AFFINITY)
Expand Down Expand Up @@ -1029,7 +1030,7 @@ iperf_parse_arguments(struct iperf_test *test, int argc, char **argv)
char *client_username = NULL, *client_rsa_public_key = NULL, *server_rsa_private_key = NULL;
#endif /* HAVE_SSL */

while ((flag = getopt_long(argc, argv, "p:f:i:D1VJvsc:ub:t:n:k:l:P:Rw:B:M:N46S:L:ZO:F:A:T:C:dI:hX:", longopts, NULL)) != -1) {
while ((flag = getopt_long(argc, argv, "p:f:i:D1VJvsc:ub:t:n:k:l:P:Rw:B:M:N46S:L:ZO:F:rA:T:C:dI:hX:", longopts, NULL)) != -1) {
switch (flag) {
case 'p':
portno = atoi(optarg);
Expand Down Expand Up @@ -1328,6 +1329,9 @@ iperf_parse_arguments(struct iperf_test *test, int argc, char **argv)
case 'F':
test->diskfile_name = optarg;
break;
case 'r':
test->reliable_receive_in_full = 1;
break;
case OPT_IDLE_TIMEOUT:
test->settings->idle_timeout = atoi(optarg);
if (test->settings->idle_timeout < 1 || test->settings->idle_timeout > MAX_TIME) {
Expand Down Expand Up @@ -2618,6 +2622,7 @@ iperf_defaults(struct iperf_test *testp)
testp->omit = OMIT;
testp->duration = DURATION;
testp->diskfile_name = (char*) 0;
testp->reliable_receive_in_full = 0;
testp->affinity = -1;
testp->server_affinity = -1;
TAILQ_INIT(&testp->xbind_addrs);
Expand Down
7 changes: 7 additions & 0 deletions src/iperf_client_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,13 @@ iperf_run_client(struct iperf_test * test)
test->stats_callback(test);
if (iperf_set_send_state(test, TEST_END) != 0)
goto cleanup_and_fail;
// sending total bytes count for server,
// to be used if needed for reliable data saving in file on server (-F)
int32_t netBytesSent = htonl(test->bytes_sent);
if (Nwrite(test->ctrl_sck, (char*) &netBytesSent, sizeof(netBytesSent), Ptcp) < 0) {
i_errno = IECTRLWRITE;
return -1;
}
}
}
// If we're in reverse mode, continue draining the data
Expand Down
108 changes: 92 additions & 16 deletions src/iperf_server_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,47 @@ iperf_accept(struct iperf_test *test)
return 0;
}

// function to be used with timer for reliably receiving data
// this makes sure we wait until receiving the exact number of bytes as sent by client
Timer *server_receive_timer_defer_termination;
static void
server_defer_test_done_until_receive_all(TimerClientData client_data, struct iperf_time *nowP){
struct iperf_test *test = client_data.p;

if (test->debug)
printf("Deferred TEST_END. test state: %d, bytes received %d out of %d\n",test->state, test->bytes_received, test->settings->bytes);

struct iperf_stream *sp;

if (test->settings->bytes != 0 &&
test->bytes_received >= test->settings->bytes) {
// first, cancel the timer !
tmr_cancel(server_receive_timer_defer_termination);
// Then, the rest
if (test->debug)
printf("It seems we received sufficient data! %d out of %d bytes",test->bytes_received,test->settings->bytes);
// Test done :)
// Taken from TEST_END case in message handler function
test->state = TEST_END; // manual, since we revert it in the switch/case
test->done = 1;
cpu_util(test->cpu_util);
test->stats_callback(test);
SLIST_FOREACH(sp, &test->streams, streams) {
FD_CLR(sp->socket, &test->read_set);
FD_CLR(sp->socket, &test->write_set);
close(sp->socket);
}
test->reporter_callback(test);
if (iperf_set_send_state(test, EXCHANGE_RESULTS) != 0)
return -1;
if (iperf_exchange_results(test) < 0)
return -1;
if (iperf_set_send_state(test, DISPLAY_RESULTS) != 0)
return -1;
if (test->on_test_finish)
test->on_test_finish(test);
}
}

/**************************************************************************/
int
Expand All @@ -191,24 +232,59 @@ iperf_handle_message_server(struct iperf_test *test)
case TEST_START:
break;
case TEST_END:
test->done = 1;
cpu_util(test->cpu_util);
test->stats_callback(test);
SLIST_FOREACH(sp, &test->streams, streams) {
FD_CLR(sp->socket, &test->read_set);
FD_CLR(sp->socket, &test->write_set);
close(sp->socket);
}
test->reporter_callback(test);
if (iperf_set_send_state(test, EXCHANGE_RESULTS) != 0)
return -1;
if (iperf_exchange_results(test) < 0)
{
// receive total number of bytes sent by clinet
int32_t netBytesSent;
if (Nread(test->ctrl_sck, (char*) &netBytesSent, sizeof(netBytesSent), Ptcp) < 0) {
i_errno = IESENDMESSAGE;
return -1;
if (iperf_set_send_state(test, DISPLAY_RESULTS) != 0)
return -1;
if (test->on_test_finish)
test->on_test_finish(test);
}
// decide on immediate termination, or waiting for reliable data receive
if(test->reliable_receive_in_full==0){
// no reliable data receive requested, so terminate immediately
test->done = 1;
cpu_util(test->cpu_util);
test->stats_callback(test);
SLIST_FOREACH(sp, &test->streams, streams) {
FD_CLR(sp->socket, &test->read_set);
FD_CLR(sp->socket, &test->write_set);
close(sp->socket);
}
test->reporter_callback(test);
if (iperf_set_send_state(test, EXCHANGE_RESULTS) != 0)
return -1;
if (iperf_exchange_results(test) < 0)
return -1;
if (iperf_set_send_state(test, DISPLAY_RESULTS) != 0)
return -1;
if (test->on_test_finish)
test->on_test_finish(test);
}
else {
// reliable data receive requested, so wait until all the data received
struct iperf_time now;
TimerClientData cd;
if (test->debug)
printf("In TEST_END. Registering a timer...\n");
// getting size of data to be received (bytes_count)
if (test->debug){
printf("Receiving total bytes count...\n");
printf("Current test->settings->bytes is %d\n", test->settings->bytes);
}
test->settings->bytes = ntohl(netBytesSent);
if (test->debug)
printf("New test->settings->bytes is %d\n", test->settings->bytes);
// registering the timer to check if all data received to terminate
if (iperf_time_now(&now) < 0) {
i_errno = IEINITTEST;
return -1;
}
cd.p = test;
server_receive_timer_defer_termination = tmr_create(&now, server_defer_test_done_until_receive_all, cd, 0.5 * SEC_TO_US, 1);
test->state = TEST_RUNNING; // to avoid any side-effects
}
break;
}
case IPERF_DONE:
break;
case CLIENT_TERMINATE:
Expand Down

0 comments on commit 703bf42

Please sign in to comment.