diff --git a/src/tools/ddsperf/CMakeLists.txt b/src/tools/ddsperf/CMakeLists.txt index b183f2f4d2..1700a8c064 100644 --- a/src/tools/ddsperf/CMakeLists.txt +++ b/src/tools/ddsperf/CMakeLists.txt @@ -18,6 +18,8 @@ if (BUILD_DDSPERF) ddsperf.c cputime.c cputime.h netload.c netload.h + jparser.c jparser.h + parameters.c parameters.h async_listener.c async_listener.h) target_link_libraries(ddsperf ddsperf_types ddsc compat) diff --git a/src/tools/ddsperf/ddsperf.c b/src/tools/ddsperf/ddsperf.c index 8df98517c4..5ab513333e 100644 --- a/src/tools/ddsperf/ddsperf.c +++ b/src/tools/ddsperf/ddsperf.c @@ -38,6 +38,7 @@ #include "dds/ddsrt/atomics.h" #include "dds/ddsrt/heap.h" +#include "parameters.h" #include "cputime.h" #include "netload.h" @@ -50,27 +51,6 @@ #define PINGPONG_RAWSIZE 20000 -enum topicsel { - KS, /* KeyedSeq type: seq#, key, sequence-of-octet */ - K32, /* Keyed32 type: seq#, key, array-of-24-octet (sizeof = 32) */ - K256, /* Keyed256 type: seq#, key, array-of-248-octet (sizeof = 256) */ - OU, /* OneULong type: seq# */ - UK16, /* Unkeyed16, type: seq#, array-of-12-octet (sizeof = 16) */ - UK1k, /* Unkeyed1k, type: seq#, array-of-1020-octet (sizeof = 1024) */ - UK64k, /* Unkeyed64k, type: seq#, array-of-65532-octet (sizeof = 65536) */ - S16, /* Keyed, 16 octets, int64 junk, seq#, key */ - S256, /* Keyed, 16 * S16, int64 junk, seq#, key */ - S4k, /* Keyed, 16 * S256, int64 junk, seq#, key */ - S32k /* Keyed, 4 * S4k, int64 junk, seq#, key */ -}; - -enum submode { - SM_NONE, /* no subscriber at all */ - SM_WAITSET, /* subscriber using a waitset */ - SM_POLLING, /* ... using polling, sleeping for 1ms if no data */ - SM_LISTENER /* ... using a DATA_AVAILABLE listener */ -}; - static const char *argv0; static ddsrt_atomic_uint32_t termflag = DDSRT_ATOMIC_UINT32_INIT (0); @@ -85,43 +65,9 @@ static dds_entity_t rd_participants, rd_subscriptions, rd_publications; /* Topics, readers, writers (except for pong writers: there are many of those) */ -static dds_entity_t tp_data, tp_ping, tp_pong, tp_stat; -static char tpname_data[32], tpname_ping[32], tpname_pong[32]; -static dds_entity_t sub, pub, wr_data, wr_ping, wr_stat, rd_data, rd_ping, rd_pong, rd_stat; - -/* Number of different key values to use (must be 1 for OU type) */ -static unsigned nkeyvals = 1; - -/* Topic type to use */ -static enum topicsel topicsel = KS; - -/* Data and ping/pong subscriber triggering modes */ -static enum submode submode = SM_LISTENER; -static enum submode pingpongmode = SM_LISTENER; - -/* Whether to show "sub" stats every second even when nothing happens */ -static bool substat_every_second = false; - -/* Whether to show extended statistics (currently just rexmit info) */ -static bool extended_stats = false; - -/* Size of the sequence in KeyedSeq type in bytes */ -static uint32_t baggagesize = 0; - -/* Whether or not to register instances prior to writing */ -static bool register_instances = true; - -/* Maximum run time in seconds */ -static double dur = HUGE_VAL; - -/* Minimum number of peers (if not met, exit status is 1) */ -static uint32_t minmatch = 0; - -/* Wait this long for MINMATCH peers before starting */ -static double initmaxwait = 0; - -/* Maximum time it may take to discover all MINMATCH peers */ -static double maxwait = HUGE_VAL; +static dds_entity_t tp_stat; +static char tpname_pong[32]; +static dds_entity_t sub, pub, wr_stat, rd_stat; /* Number of participants for which all expected endpoints have been matched (this includes the local participant @@ -134,63 +80,16 @@ static uint32_t matchcount = 0; time (5s, currently) [protected by disc_lock] */ static uint32_t matchtimeout = 0; -/* Data is published in bursts of this many samples */ -static uint32_t burstsize = 1; - -/* Whether to use reliable or best-effort readers/writers */ -static bool reliable = true; - -/* History depth for throughput data reader and writer; 0 is - KEEP_ALL, otherwise it is KEEP_LAST histdepth. Ping/pong - always uses KEEP_LAST 1. */ -static int32_t histdepth = 0; - -/* Publishing rate in Hz, HUGE_VAL means as fast as possible, - 0 means no throughput data is published at all */ -static double pub_rate; - -/* Fraction of throughput data samples that double as a ping - message */ -static uint32_t ping_frac = 0; - /* Setting for "ignore local" reader/writer QoS: whether or not to ignore readers and writers in the same particiapnt that would otherwise match */ static dds_ignorelocal_kind_t ignorelocal = DDS_IGNORELOCAL_PARTICIPANT; - -/* Pinging interval for roundtrip testing, 0 means as fast as - possible, DDS_INFINITY means never */ -static dds_duration_t ping_intv; +static bool batch_mode = false; /* Number of times a new ping was sent before all expected pongs had been received */ static uint32_t ping_timeouts = 0; -/* Maximum allowed increase in RSS between initial RSS sample and - final RSS sample: final one must be <= - init * (1 + rss_factor/100) + rss_term */ -static bool rss_check = false; -static double rss_factor = 1; -static double rss_term = 0; - -/* Maximum allowed increase in live memory between initial sample - and final sample: final one must be <= - init * (1 + livemem_factor/100) + livemem_term */ -static bool livemem_check = false; -static double livemem_factor = 1; -static double livemem_term = 0; - -/* Minimum number of samples, minimum number of roundtrips to - declare the run a success */ -static uint64_t min_received = 0; -static uint64_t min_roundtrips = 0; - -/* Whether to gather/show latency information in "sub" mode */ -static bool sublatency = false; - -/* Use writer loans (only for memcpy-able types) */ -static bool use_writer_loan = false; - /* Event queue for processing discovery events (data available on DCPSParticipant, subscription & publication matched) asynchronously to avoid deadlocking on creating a reader from @@ -224,6 +123,7 @@ struct latencystat { /* Subscriber statistics for tracking number of samples received and lost per source */ struct eseq_stat { + bool keepall; /* totals */ uint64_t nrecv; uint64_t nlost; @@ -253,8 +153,6 @@ struct eseq_admin { uint32_t **eseq; }; -static struct eseq_admin eseq_admin; - /* Entry for mapping ping/data publication handle to pong writer */ struct subthread_arg_pongwr { dds_instance_handle_t pubhandle; @@ -299,15 +197,6 @@ static ddsrt_mutex_t pongwr_lock; static uint32_t npongwr; static struct subthread_arg_pongwr *pongwr; -/* Each subscriber thread gets its own not-quite-pre-allocated - set of samples (it does use a loan, but that loan gets reused) */ -struct subthread_arg { - dds_entity_t rd; - uint32_t max_samples; - dds_sample_info_t *iseq; - void **mseq; -}; - /* Type used for converting GUIDs to strings, used for generating the per-participant partition names */ struct guidstr { @@ -387,6 +276,41 @@ union data { Struct32k s32k; }; +struct pubdesc { + const struct dataflow *flow; + dds_entity_t tp; + dds_entity_t pub; + dds_entity_t wr; + ddsrt_thread_t tid; + struct dds_statistics *pubstat; + const struct dds_stat_keyvalue *rexmit_bytes; + const struct dds_stat_keyvalue *time_throttle; + const struct dds_stat_keyvalue *time_rexmit; + const struct dds_stat_keyvalue *throttle_count; + struct pubdesc *next; +}; + +/* Each subscriber thread gets its own not-quite-pre-allocated + set of samples (it does use a loan, but that loan gets reused) */ +struct subdesc{ + const struct dataflow *flow; + dds_entity_t tp; + dds_entity_t sub; + dds_entity_t rd; + uint32_t max_samples; + dds_sample_info_t *iseq; + void **mseq; + struct dds_statistics *substat; + const struct dds_stat_keyvalue *discarded_bytes; + struct eseq_admin eseq_admin; + struct subdesc *next; +}; + +static struct global globals; +static struct subdesc ping_reader; +static struct subdesc pong_reader; +static struct pubdesc ping_writer; + static void verrorx (int exitcode, const char *fmt, va_list ap) { vprintf (fmt, ap); @@ -645,7 +569,7 @@ static void *make_baggage (dds_sequence_octet *b, uint32_t cnt) return b->_buffer; } -static size_t getseqoff (void) +static size_t getseqoff (enum topicsel topicsel) { switch (topicsel) { @@ -664,7 +588,7 @@ static size_t getseqoff (void) return 0; } -static size_t getkeyvaloff (void) +static size_t getkeyvaloff (enum topicsel topicsel) { switch (topicsel) { @@ -683,48 +607,49 @@ static size_t getkeyvaloff (void) return 0; } -static void *init_sample (union data *data, uint32_t seq) +static void *init_sample (union data *data, uint32_t seq, enum topicsel topicsel, uint32_t baggagesize) { void *baggage = NULL; memset (data, 0xee, sizeof (*data)); if (topicsel == KS) baggage = make_baggage (&data->ks.baggage, baggagesize); - *((uint32_t *) ((char *) data + getseqoff ())) = seq; - if (getkeyvaloff () != SIZE_MAX) - *((uint32_t *) ((char *) data + getkeyvaloff ())) = 0; + *((uint32_t *) ((char *) data + getseqoff (topicsel))) = seq; + if (getkeyvaloff (topicsel) != SIZE_MAX) + *((uint32_t *) ((char *) data + getkeyvaloff (topicsel))) = 0; return baggage; } static uint32_t pubthread (void *varg) { int result; + struct pubdesc *tdesc = varg; + const struct dataflow * const flow = tdesc->flow; dds_instance_handle_t *ihs; dds_time_t ntot = 0, tfirst; union data data; void *baggage = NULL; - (void) varg; memset (&data, 0, sizeof (data)); - assert (nkeyvals > 0); - assert (topicsel != OU || nkeyvals == 1); + assert (flow->nkeyvals > 0); + assert (flow->topicsel != OU || flow->nkeyvals == 1); - baggage = init_sample (&data, 0); - size_t seqoff = getseqoff (); - size_t keyvaloff = getkeyvaloff (); - ihs = malloc (nkeyvals * sizeof (dds_instance_handle_t)); + baggage = init_sample (&data, 0, flow->topicsel, flow->baggagesize); + size_t seqoff = getseqoff (flow->topicsel); + size_t keyvaloff = getkeyvaloff (flow->topicsel); + ihs = malloc (flow->nkeyvals * sizeof (dds_instance_handle_t)); assert(ihs); - if (!register_instances) + if (!flow->register_instances) { - for (unsigned k = 0; k < nkeyvals; k++) + for (unsigned k = 0; k < flow->nkeyvals; k++) ihs[k] = 0; } else { - for (unsigned k = 0; k < nkeyvals; k++) + for (unsigned k = 0; k < flow->nkeyvals; k++) { if (keyvaloff != SIZE_MAX) *((uint32_t *) ((char *) &data + keyvaloff)) = 0; - if ((result = dds_register_instance (wr_data, &ihs[k], &data)) != DDS_RETCODE_OK) + if ((result = dds_register_instance (tdesc->wr, &ihs[k], &data)) != DDS_RETCODE_OK) { printf ("dds_register_instance failed: %d\n", result); fflush (stdout); @@ -743,11 +668,11 @@ static uint32_t pubthread (void *varg) while (!ddsrt_atomic_ld32 (&termflag)) { /* lsb of timestamp is abused to signal whether the sample is a ping requiring a response or not */ - bool reqresp = (ping_frac == 0) ? 0 : (ping_frac == UINT32_MAX) ? 1 : (ddsrt_random () <= ping_frac); + bool reqresp = (flow->ping_frac == 0) ? 0 : (flow->ping_frac == UINT32_MAX) ? 1 : (ddsrt_random () <= flow->ping_frac); void *dataptr; - if (!use_writer_loan) + if (!flow->use_writer_loan) dataptr = &data; - else if ((result = dds_request_loan (wr_data, &dataptr)) < 0) + else if ((result = dds_request_loan (tdesc->wr, &dataptr)) < 0) { printf ("request loan error: %d\n", result); fflush (stdout); @@ -760,7 +685,7 @@ static uint32_t pubthread (void *varg) *((uint32_t *) ((char *) dataptr + keyvaloff)) = *((uint32_t *) ((char *) &data + keyvaloff)); } - if ((result = dds_write_ts (wr_data, dataptr, (t_write & ~1) | reqresp)) != DDS_RETCODE_OK) + if ((result = dds_write_ts (tdesc->wr, dataptr, (t_write & ~1) | reqresp)) != DDS_RETCODE_OK) { printf ("write error: %d\n", result); fflush (stdout); @@ -774,7 +699,7 @@ static uint32_t pubthread (void *varg) } if (reqresp) { - dds_write_flush (wr_data); + dds_write_flush (tdesc->wr); } const dds_time_t t_post_write = (time_counter == 1) ? dds_time () : t_write; @@ -803,19 +728,19 @@ static uint32_t pubthread (void *varg) (*((uint32_t *) ((char *) &data + seqoff)))++; if (keyvaloff != SIZE_MAX) { uint32_t * const keyvalptr = (uint32_t *) ((char *) &data + keyvaloff); - *keyvalptr = (*keyvalptr + 1) % nkeyvals; + *keyvalptr = (*keyvalptr + 1) % flow->nkeyvals; } t_write = t_post_write; - if (pub_rate < HUGE_VAL) + if (flow->pub_rate < HUGE_VAL) { - if (++batch_counter == burstsize) + if (++batch_counter == flow->burstsize) { /* FIXME: should average rate over a short-ish period, rather than over the entire run */ - while (((double) (ntot / burstsize) / ((double) (t_write - tfirst) / 1e9 + 5e-3)) > pub_rate && !ddsrt_atomic_ld32 (&termflag)) + while (((double) (ntot / flow->burstsize) / ((double) (t_write - tfirst) / 1e9 + 5e-3)) > flow->pub_rate && !ddsrt_atomic_ld32 (&termflag)) { /* FIXME: flushing manually because batching is not yet implemented properly */ - dds_write_flush (wr_data); + dds_write_flush (tdesc->wr); dds_sleepfor (DDS_MSECS (1)); t_write = dds_time (); time_counter = time_interval = 1; @@ -933,7 +858,7 @@ static void fini_eseq_admin (struct eseq_admin *ea) { free (ea->ph); free (ea->pph); - if (sublatency) + if (globals.sublatency) { for (unsigned i = 0; i < ea->nph; i++) latencystat_fini (&ea->stats[i].info); @@ -970,7 +895,7 @@ static dds_instance_handle_t get_pphandle_for_pubhandle (dds_instance_handle_t p } DDSRT_WARNING_MSVC_OFF(6308) -static int check_eseq (struct eseq_admin *ea, uint32_t seq, uint32_t keyval, uint32_t size, const dds_instance_handle_t pubhandle, int64_t tdelta) +static int check_eseq (struct eseq_admin *ea, uint32_t seq, uint32_t keyval, uint32_t size, const dds_instance_handle_t pubhandle, int64_t tdelta, bool keepall) { uint32_t *eseq; if (keyval >= ea->nkeys) @@ -988,7 +913,8 @@ static int check_eseq (struct eseq_admin *ea, uint32_t seq, uint32_t keyval, uin ea->stats[i].nrecv_bytes += size; ea->stats[i].nlost += seq - e; ea->stats[i].last_size = size; - if (sublatency) + ea->stats[i].keepall = keepall; + if (globals.sublatency) latencystat_update (&ea->stats[i].info, tdelta); ddsrt_mutex_unlock (&ea->lock); return seq == e; @@ -1014,7 +940,7 @@ static int check_eseq (struct eseq_admin *ea, uint32_t seq, uint32_t keyval, uin ea->stats[ea->nph].nrecv = 1; ea->stats[ea->nph].nrecv_bytes = size; ea->stats[ea->nph].last_size = size; - if (sublatency) + if (globals.sublatency) { latencystat_init (&ea->stats[ea->nph].info); latencystat_update (&ea->stats[ea->nph].info, tdelta); @@ -1025,6 +951,7 @@ static int check_eseq (struct eseq_admin *ea, uint32_t seq, uint32_t keyval, uin } DDSRT_WARNING_MSVC_ON(6308) + static bool update_roundtrip (dds_instance_handle_t pubhandle, int64_t tdelta, bool isping, uint32_t seq) { bool allseen; @@ -1112,11 +1039,12 @@ static dds_entity_t get_pong_writer (dds_instance_handle_t pubhandle) return wr_pong; } -static bool process_data (dds_entity_t rd, struct subthread_arg *arg) +static bool process_data (dds_entity_t rd, struct subdesc *sdesc) { - uint32_t max_samples = arg->max_samples; - dds_sample_info_t *iseq = arg->iseq; - void **mseq = arg->mseq; + uint32_t max_samples = sdesc->max_samples; + dds_sample_info_t *iseq = sdesc->iseq; + void **mseq = sdesc->mseq; + enum topicsel topicsel = sdesc->flow->topicsel; int32_t nread_data; if ((nread_data = dds_take (rd, mseq, iseq, max_samples, max_samples)) < 0) error2 ("dds_take (rd_data): %d\n", (int) nread_data); @@ -1126,7 +1054,7 @@ static bool process_data (dds_entity_t rd, struct subthread_arg *arg) { // time stamp is only used when tracking latency, and reading the clock a couple of // times every microsecond is rather costly - const int64_t tdelta = sublatency ? dds_time () - iseq[i].source_timestamp : 0; + const int64_t tdelta = globals.sublatency ? dds_time () - iseq[i].source_timestamp : 0; uint32_t seq = 0, keyval = 0, size = 0; switch (topicsel) { @@ -1145,7 +1073,7 @@ static bool process_data (dds_entity_t rd, struct subthread_arg *arg) case S4k: { Struct4k *d = mseq[i]; keyval = d->keyval; seq = d->seq; size = topic_payload_size (topicsel, 0); } break; case S32k: { Struct32k *d = mseq[i]; keyval = d->keyval; seq = d->seq; size = topic_payload_size (topicsel, 0); } break; } - (void) check_eseq (&eseq_admin, seq, keyval, size, iseq[i].publication_handle, tdelta); + (void) check_eseq (&sdesc->eseq_admin, seq, keyval, size, iseq[i].publication_handle, tdelta, (sdesc->flow->reliable && sdesc->flow->histdepth == 0)); if (iseq[i].source_timestamp & 1) { dds_entity_t wr_pong; @@ -1161,7 +1089,7 @@ static bool process_data (dds_entity_t rd, struct subthread_arg *arg) return (nread_data > 0); } -static bool process_ping (dds_entity_t rd, struct subthread_arg *arg) +static bool process_ping (dds_entity_t rd, struct subdesc *arg) { /* Ping sends back Pongs with the lsb 1; Data sends back Pongs with the lsb 0. This way, the Pong handler can figure out whether to Ping again or not by looking at the lsb. If it is 1, another Ping is required */ @@ -1187,7 +1115,7 @@ static bool process_ping (dds_entity_t rd, struct subthread_arg *arg) return (nread_ping > 0); } -static bool process_pong (dds_entity_t rd, struct subthread_arg *arg) +static bool process_pong (dds_entity_t rd, struct subdesc *arg) { uint32_t max_samples = arg->max_samples; dds_sample_info_t *iseq = arg->iseq; @@ -1204,7 +1132,7 @@ static bool process_pong (dds_entity_t rd, struct subthread_arg *arg) uint32_t * const seq = mseq[i]; const bool isping = (iseq[i].source_timestamp & 1) != 0; const bool all = update_roundtrip (iseq[i].publication_handle, (tnow - iseq[i].source_timestamp) / 2, isping, *seq); - if (isping && all && ping_intv == 0) + if (isping && all && globals.ping_intv == 0) { /* If it is a pong sent in response to a ping, and all known nodes have responded, send out a new ping */ dds_return_t rc; @@ -1213,7 +1141,7 @@ static bool process_pong (dds_entity_t rd, struct subthread_arg *arg) cur_ping_time = dds_time (); cur_ping_seq = ++(*seq); ddsrt_mutex_unlock (&pongwr_lock); - if ((rc = dds_write_ts (wr_ping, mseq[i], dds_time () | 1)) < 0 && rc != DDS_RETCODE_TIMEOUT) + if ((rc = dds_write_ts (ping_writer.wr, mseq[i], dds_time () | 1)) < 0 && rc != DDS_RETCODE_TIMEOUT) error2 ("dds_write (wr_ping, mseq[i]): %d\n", (int) rc); } } @@ -1226,11 +1154,11 @@ static void maybe_send_new_ping (dds_time_t tnow, dds_time_t *tnextping) void *baggage; union data data; int32_t rc; - assert (ping_intv != DDS_INFINITY); + assert (globals.ping_intv != DDS_INFINITY); ddsrt_mutex_lock (&pongwr_lock); - if (tnow < cur_ping_time + (ping_intv == 0 ? DDS_SECS (1) : ping_intv)) + if (tnow < cur_ping_time + (globals.ping_intv == 0 ? DDS_SECS (1) : globals.ping_intv)) { - if (ping_intv == 0) + if (globals.ping_intv == 0) *tnextping = cur_ping_time + DDS_SECS (1); ddsrt_mutex_unlock (&pongwr_lock); } @@ -1247,7 +1175,7 @@ static void maybe_send_new_ping (dds_time_t tnow, dds_time_t *tnextping) } } n_pong_seen = 0; - if (ping_intv == 0) + if (globals.ping_intv == 0) { *tnextping = tnow + DDS_SECS (1); cur_ping_time = tnow; @@ -1257,95 +1185,123 @@ static void maybe_send_new_ping (dds_time_t tnow, dds_time_t *tnextping) /* tnow should be ~ cur_ping_time + ping_intv, but it won't be if the wakeup was delayed significantly, the machine was suspended in the meantime, so slow down if we can't keep up */ - cur_ping_time += ping_intv; - if (cur_ping_time < tnow - ping_intv / 2) + cur_ping_time += globals.ping_intv; + if (cur_ping_time < tnow - globals.ping_intv / 2) cur_ping_time = tnow; - *tnextping = cur_ping_time + ping_intv; + *tnextping = cur_ping_time + globals.ping_intv; } cur_ping_seq++; - baggage = init_sample (&data, cur_ping_seq); + baggage = init_sample (&data, cur_ping_seq, ping_writer.flow->topicsel, ping_writer.flow->baggagesize); ddsrt_mutex_unlock (&pongwr_lock); - if ((rc = dds_write_ts (wr_ping, &data, dds_time () | 1)) < 0 && rc != DDS_RETCODE_TIMEOUT) + if ((rc = dds_write_ts (ping_writer.wr, &data, dds_time () | 1)) < 0 && rc != DDS_RETCODE_TIMEOUT) error2 ("send_new_ping: dds_write (wr_ping, &data): %d\n", (int) rc); if (baggage) free (baggage); } } -static dds_entity_t make_reader_waitset (dds_entity_t rd) +static dds_entity_t make_reader_waitset (void) { dds_entity_t ws; int32_t rc; ws = dds_create_waitset (dp); if ((rc = dds_waitset_attach (ws, termcond, 0)) < 0) error2 ("dds_waitset_attach (termcond, 0): %d\n", (int) rc); - if ((rc = dds_set_status_mask (rd, DDS_DATA_AVAILABLE_STATUS | DDS_SUBSCRIPTION_MATCHED_STATUS)) < 0) - error2 ("dds_set_status_mask (rd, DDS_DATA_AVAILABLE_STATUS | DDS_SUBSCRIPTION_MATCHED_STATUS): %d\n", (int) rc); - if ((rc = dds_waitset_attach (ws, rd, 1)) < 0) - error2 ("dds_waitset_attach (ws, rd, 1): %d\n", (int) rc); return ws; } +static void attach_reader_waitset(dds_entity_t ws, struct subdesc *desc) +{ + int32_t rc; + if ((rc = dds_set_status_mask (desc->rd, DDS_DATA_AVAILABLE_STATUS | DDS_SUBSCRIPTION_MATCHED_STATUS)) < 0) + error2 ("dds_set_status_mask (rd, DDS_DATA_AVAILABLE_STATUS | DDS_SUBSCRIPTION_MATCHED_STATUS): %d\n", (int) rc); + if ((rc = dds_waitset_attach (ws, desc->rd, (dds_attach_t) desc)) < 0) + error2 ("dds_waitset_attach (ws, rd, desc): %d\n", (int) rc); +} + static uint32_t subthread_waitset (void *varg) { - struct subthread_arg * const arg = varg; - dds_entity_t ws = make_reader_waitset (rd_data); + struct subdesc * desc = varg; + struct subdesc * cur; + dds_entity_t ws = make_reader_waitset (); + struct subdesc **xs; + size_t len = 1; + + cur = desc; + while (cur) + { + attach_reader_waitset (ws, cur); + cur = cur->next; + len++; + } + + xs = ddsrt_malloc (len * sizeof (struct subdesc *)); while (!ddsrt_atomic_ld32 (&termflag)) { - if (!process_data (rd_data, arg)) + int32_t nxs, i; + if ((nxs = dds_waitset_wait (ws, (dds_attach_t *) xs, len, DDS_INFINITY)) < 0) + error2 ("dds_waitset_wait: %d\n", (int) nxs); + + for (i = 0; i < nxs; i++) { - /* when we use DATA_AVAILABLE, we must read until nothing remains, or we would deadlock - if more than max_samples were available and nothing further is received */ - int32_t nxs; - if ((nxs = dds_waitset_wait (ws, NULL, 0, DDS_INFINITY)) < 0) - error2 ("dds_waitset_wait: %d\n", (int) nxs); + if (xs[i] != NULL) + while (process_data (xs[i]->rd, xs[i])); } } + ddsrt_free (xs); return 0; } static uint32_t subpingthread_waitset (void *varg) { - struct subthread_arg * const arg = varg; - dds_entity_t ws = make_reader_waitset (rd_ping); + struct subdesc * const desc = varg; + dds_entity_t ws = make_reader_waitset (); + attach_reader_waitset (ws, desc); while (!ddsrt_atomic_ld32 (&termflag)) { int32_t nxs; if ((nxs = dds_waitset_wait (ws, NULL, 0, DDS_INFINITY)) < 0) error2 ("dds_waitset_wait: %d\n", (int) nxs); - process_ping (rd_ping, arg); + process_ping (desc->rd, desc); } return 0; } static uint32_t subpongthread_waitset (void *varg) { - struct subthread_arg * const arg = varg; - dds_entity_t ws = make_reader_waitset (rd_pong); + struct subdesc * const desc = varg; + dds_entity_t ws = make_reader_waitset (); + attach_reader_waitset (ws, desc); while (!ddsrt_atomic_ld32 (&termflag)) { int32_t nxs; if ((nxs = dds_waitset_wait (ws, NULL, 0, DDS_INFINITY)) < 0) error2 ("dds_waitset_wait: %d\n", (int) nxs); - process_pong (rd_pong, arg); + process_pong (desc->rd, desc); } return 0; } static uint32_t subthread_polling (void *varg) { - struct subthread_arg * const arg = varg; + struct subdesc * const desc = varg; while (!ddsrt_atomic_ld32 (&termflag)) { - if (!process_data (rd_data, arg)) - dds_sleepfor (DDS_MSECS (1)); + struct subdesc * cur = desc; + + while (cur) + { + while (process_data (cur->rd, cur)); + cur = cur->next; + } + dds_sleepfor (DDS_MSECS (1)); } return 0; } static void data_available_listener (dds_entity_t rd, void *arg) { - process_data (rd, arg); + process_data (rd, arg); } static void ping_available_listener (dds_entity_t rd, void *arg) @@ -1379,7 +1335,7 @@ static dds_entity_t create_pong_writer (dds_instance_handle_t pphandle, const st qos = dds_create_qos (); dds_qset_history (qos, DDS_HISTORY_KEEP_LAST, 1); dds_qset_ignorelocal (qos, ignorelocal); - if ((wr_pong = dds_create_writer (pongpub, tp_pong, qos, listener)) < 0) + if ((wr_pong = dds_create_writer (pongpub, pong_reader.tp, qos, listener)) < 0) error2 ("dds_create_writer(%s) failed: %d\n", tpname_pong, (int) wr_pong); dds_delete_qos (qos); dds_delete_listener (listener); @@ -1492,14 +1448,14 @@ static void async_participant_data_listener (dds_entity_t rd, void *arg) pp->pid = (uint32_t) pid; pp->tdisc = dds_time (); pp->tdeadline = pp->tdisc + DDS_SECS (5); - if (pp->handle != dp_handle || ignorelocal == DDS_IGNORELOCAL_NONE) - pp->unmatched = MM_ALL & ~(has_reader ? 0 : MM_RD_DATA) & ~(rd_data ? 0 : MM_WR_DATA); + if ((pp->handle != dp_handle || ignorelocal == DDS_IGNORELOCAL_NONE) && !batch_mode) + pp->unmatched = MM_ALL & ~(has_reader ? 0 : MM_RD_DATA) & ~(globals.submode != SM_NONE ? 0 : MM_WR_DATA); else pp->unmatched = 0; ddsrt_fibheap_insert (&ppants_to_match_fhd, &ppants_to_match, pp); ddsrt_avl_insert_ipath (&ppants_td, &ppants, pp, &ipath); - make_pongwr = (pp->handle != dp_handle || ignorelocal == DDS_IGNORELOCAL_NONE); + make_pongwr = (!batch_mode && (pp->handle != dp_handle || ignorelocal == DDS_IGNORELOCAL_NONE)); } ddsrt_mutex_unlock (&disc_lock); @@ -1653,6 +1609,316 @@ static void set_data_available_listener (dds_entity_t rd, const char *rd_name, d dds_delete_listener (listener); } + + +static const struct dds_stat_keyvalue dummy_u64 = { .name = "", .kind = DDS_STAT_KIND_UINT64, .u.u64 = 0 }; +static const struct dds_stat_keyvalue dummy_u32 = { .name = "", .kind = DDS_STAT_KIND_UINT32, .u.u32 = 0 }; + +static const char * get_topic_suffix(enum topicsel topicsel) +{ + const char *tp_suf = ""; + switch (topicsel) + { + case KS: tp_suf = "KS"; break; + case K32: tp_suf = "K32"; break; + case K256: tp_suf = "K256"; break; + case OU: tp_suf = "OU"; break; + case UK16: tp_suf = "UK16"; break; + case UK1k : tp_suf = "UK1k"; break; + case UK64k: tp_suf = "UK64k"; break; + case S16: tp_suf = "S16"; break; + case S256: tp_suf = "S256"; break; + case S4k: tp_suf = "S4k"; break; + case S32k: tp_suf = "S32k"; break; + } + return tp_suf; +} + +static void pubdesc_init(struct pubdesc *desc, const struct dataflow *flow) +{ + dds_qos_t *qos = NULL; + dds_listener_t *listener; + dds_entity_t lpub; + + memset(desc, 0, sizeof (*desc)); + + desc->flow = flow; + + qos = dds_create_qos (); + dds_qset_reliability (qos, flow->reliable ? DDS_RELIABILITY_RELIABLE : DDS_RELIABILITY_BEST_EFFORT, DDS_SECS (10)); + if ((desc->tp = dds_create_topic (dp, flow->tp_desc, flow->topicname, qos, NULL)) < 0) + error2 ("dds_create_topic(%s) failed: %d\n", flow->topicname, (int) desc->tp); + dds_delete_qos (qos); + + if (flow->partition != NULL) { + qos = dds_create_qos (); + dds_qset_partition1 (qos, flow->partition); + if ((lpub = dds_create_publisher (dp, qos, NULL)) < 0) + error2 ("dds_create_publisher failed: %d\n", (int) lpub); + desc->pub = lpub; + dds_delete_qos (qos); + } else { + desc->pub = 0; + lpub = pub; + } + + qos = dds_create_qos (); + if (flow->histdepth == 0) + dds_qset_history (qos, DDS_HISTORY_KEEP_ALL, 1); + else + dds_qset_history (qos, DDS_HISTORY_KEEP_LAST, flow->histdepth); + dds_qset_resource_limits (qos, 10000, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED); + listener = dds_create_listener ((void *) (uintptr_t) MM_RD_DATA); + dds_lset_publication_matched (listener, publication_matched_listener); + dds_qset_writer_batching (qos, true); + if ((desc->wr = dds_create_writer (lpub, desc->tp, qos, listener)) < 0) + error2 ("dds_create_writer(%s) failed: %d\n", flow->topicname, (int) desc->wr); + dds_qset_writer_batching (qos, false); + dds_delete_listener (listener); + dds_delete_qos (qos); + + desc->pubstat = dds_create_statistics (desc->wr); + if (desc->rexmit_bytes == NULL) + desc->rexmit_bytes = &dummy_u64; + if (desc->time_rexmit == NULL) + desc->time_rexmit = &dummy_u64; + if (desc->time_throttle == NULL) + desc->time_throttle = &dummy_u64; + if (desc->throttle_count == NULL) + desc->throttle_count = &dummy_u32; + if (desc->rexmit_bytes->kind != DDS_STAT_KIND_UINT64 || + desc->time_rexmit->kind != DDS_STAT_KIND_UINT64 || + desc->time_throttle->kind != DDS_STAT_KIND_UINT64 || + desc->throttle_count->kind != DDS_STAT_KIND_UINT32) + { + abort (); + } +} + +static void pubdesc_fini(struct pubdesc *desc) +{ + dds_set_listener (desc->wr, NULL); + dds_delete_statistics (desc->pubstat); + dds_delete (desc->wr); +} + +static struct pubdesc * pubdesc_new(const struct dataflow *flow) +{ + struct pubdesc *desc = ddsrt_malloc (sizeof(*desc)); + + pubdesc_init (desc, flow); + return desc; +} + +static void pubdesc_free(struct pubdesc *desc) +{ + if (desc) + { + pubdesc_fini (desc); + ddsrt_free (desc); + } +} + +static void reader_loan_init (struct subdesc *desc, uint32_t max_samples) +{ + desc->max_samples = max_samples; + desc->mseq = malloc (desc->max_samples * sizeof (desc->mseq[0])); + assert(desc->mseq); + desc->iseq = malloc (desc->max_samples * sizeof (desc->iseq[0])); + assert(desc->iseq); + DDSRT_WARNING_MSVC_OFF(6386) + for (uint32_t i = 0; i < desc->max_samples; i++) + desc->mseq[i] = NULL; + DDSRT_WARNING_MSVC_ON(6386) +} + +static void reader_loan_fini (struct subdesc *desc) +{ + dds_return_loan(desc->rd, desc->mseq, (int32_t) desc->max_samples); + free (desc->mseq); + free (desc->iseq); +} + +static void subdesc_init(struct subdesc *desc, const struct dataflow *flow) +{ + dds_qos_t *qos = NULL; + dds_listener_t *listener; + dds_entity_t lsub; + + memset(desc, 0, sizeof (*desc)); + + desc->flow = flow; + + qos = dds_create_qos (); + dds_qset_reliability (qos, flow->reliable ? DDS_RELIABILITY_RELIABLE : DDS_RELIABILITY_BEST_EFFORT, DDS_SECS (10)); + if ((desc->tp = dds_create_topic (dp, flow->tp_desc, flow->topicname, qos, NULL)) < 0) + error2 ("dds_create_topic(%s) failed: %d\n", flow->topicname, (int) desc->tp); + dds_delete_qos (qos); + + if (flow->partition != NULL) { + qos = dds_create_qos (); + dds_qset_partition1 (qos, flow->partition); + if ((lsub = dds_create_subscriber (dp, qos, NULL)) < 0) + error2 ("dds_create_subscriber failed: %d\n", (int) lsub); + desc->sub = lsub; + dds_delete_qos (qos); + } else { + desc->sub = 0; + lsub = sub; + } + + qos = dds_create_qos (); + if (flow->histdepth == 0) + dds_qset_history (qos, DDS_HISTORY_KEEP_ALL, 1); + else + dds_qset_history (qos, DDS_HISTORY_KEEP_LAST, flow->histdepth); + dds_qset_resource_limits (qos, 10000, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED); + dds_qset_ignorelocal (qos, ignorelocal); + listener = dds_create_listener ((void *) (uintptr_t) MM_WR_DATA); + dds_lset_subscription_matched (listener, subscription_matched_listener); + if (globals.submode != SM_NONE && (desc->rd = dds_create_reader (lsub, desc->tp, qos, listener)) < 0) + error2 ("dds_create_reader(%s) failed: %d\n", flow->topicname, (int) desc->rd); + dds_delete_listener (listener); + dds_delete_qos (qos); + + reader_loan_init (desc, 1000); + init_eseq_admin (&desc->eseq_admin, flow->nkeyvals); + + desc->substat = dds_create_statistics (desc->rd); + desc->discarded_bytes = dds_lookup_statistic (desc->substat, "discarded_bytes"); + if (desc->discarded_bytes->kind != DDS_STAT_KIND_UINT64) + { + abort(); + } +} + +static void subdesc_fini(struct subdesc *desc) +{ + dds_set_listener (desc->rd, NULL); + reader_loan_fini (desc); + dds_delete_statistics (desc->substat); + dds_delete (desc->rd); + fini_eseq_admin (&desc->eseq_admin); +} + +static struct subdesc * subdesc_new(const struct dataflow *flow) +{ + struct subdesc *desc = ddsrt_malloc (sizeof(*desc)); + + subdesc_init (desc, flow); + + return desc; +} + +static void subdesc_free(struct subdesc *desc) +{ + if (desc) + { + subdesc_fini (desc); + ddsrt_free (desc); + } +} + +static void init_ping_reader(const struct dataflow *flow) +{ + dds_qos_t *qos = NULL; + dds_listener_t *listener; + char tpname[32]; + const char *tp_suf = get_topic_suffix(flow->topicsel); + + snprintf (tpname, sizeof (tpname), "DDSPerf%cPing%s", flow->reliable ? 'R' : 'U', tp_suf); + + ping_reader.flow = flow; + + qos = dds_create_qos (); + dds_qset_reliability (qos, flow->reliable ? DDS_RELIABILITY_RELIABLE : DDS_RELIABILITY_BEST_EFFORT, DDS_SECS (10)); + if ((ping_reader.tp = dds_create_topic (dp, flow->tp_desc, tpname, qos, NULL)) < 0) + error2 ("dds_create_topic(%s) failed: %d\n", tpname, (int) ping_reader.tp); + dds_delete_qos (qos); + + qos = dds_create_qos (); + dds_qset_history (qos, DDS_HISTORY_KEEP_LAST, 1); + dds_qset_ignorelocal (qos, ignorelocal); + listener = dds_create_listener ((void *) (uintptr_t) MM_WR_PING); + dds_lset_subscription_matched (listener, subscription_matched_listener); + if ((ping_reader.rd = dds_create_reader (sub, ping_reader.tp, qos, listener)) < 0) + error2 ("dds_create_reader(%s) failed: %d\n", tpname, (int) ping_reader.rd); + dds_delete_listener (listener); + dds_delete_qos (qos); + + reader_loan_init (&ping_reader, 100); +} + +static void init_pong_reader(const struct dataflow *flow) +{ + dds_qos_t *qos = NULL; + dds_listener_t *listener; + dds_return_t rc; + const char *tp_suf = get_topic_suffix(flow->topicsel); + + snprintf (tpname_pong, sizeof (tpname_pong), "DDSPerf%cPong%s", flow->reliable ? 'R' : 'U', tp_suf); + + pong_reader.flow = flow; + + qos = dds_create_qos (); + dds_qset_reliability (qos, flow->reliable ? DDS_RELIABILITY_RELIABLE : DDS_RELIABILITY_BEST_EFFORT, DDS_SECS (10)); + if ((pong_reader.tp = dds_create_topic (dp, flow->tp_desc, tpname_pong, qos, NULL)) < 0) + error2 ("dds_create_topic(%s) failed: %d\n", tpname_pong, (int) pong_reader.tp); + dds_delete_qos (qos); + + dds_guid_t ppguid; + if ((rc = dds_get_guid (dp, &ppguid)) < 0) + error2 ("dds_get_guid(participant) failed: %d\n", (int) rc); + struct guidstr guidstr; + make_guidstr (&guidstr, &ppguid); + dds_qos_t *subqos = dds_create_qos (); + dds_qset_partition1 (subqos, guidstr.str); + if ((pong_reader.sub = dds_create_subscriber (dp, subqos, NULL)) < 0) + error2 ("dds_create_subscriber(pong) failed: %d\n", (int) pong_reader.sub); + dds_delete_qos (subqos); + + qos = dds_create_qos (); + dds_qset_history (qos, DDS_HISTORY_KEEP_LAST, 1); + dds_qset_ignorelocal (qos, ignorelocal); + listener = dds_create_listener ((void *) (uintptr_t) MM_WR_PONG); + dds_lset_subscription_matched (listener, subscription_matched_listener); + if ((pong_reader.rd = dds_create_reader (pong_reader.sub, pong_reader.tp, qos, listener)) < 0) + error2 ("dds_create_reader(%s) failed: %d\n", tpname_pong, (int) pong_reader.rd); + dds_delete_listener (listener); + dds_delete_qos (qos); + + reader_loan_init (&pong_reader, 100); +} + +static void init_ping_writer(const struct dataflow *flow) +{ + dds_qos_t *qos = NULL; + dds_listener_t *listener; + char tpname[32]; + const char *tp_suf = get_topic_suffix(flow->topicsel); + + snprintf (tpname, sizeof (tpname), "DDSPerf%cPing%s", flow->reliable ? 'R' : 'U', tp_suf); + + ping_writer.flow = flow; + + qos = dds_create_qos (); + dds_qset_reliability (qos, flow->reliable ? DDS_RELIABILITY_RELIABLE : DDS_RELIABILITY_BEST_EFFORT, DDS_SECS (10)); + if ((ping_writer.tp = dds_create_topic (dp, flow->tp_desc, tpname, qos, NULL)) < 0) + error2 ("dds_create_topic(%s) failed: %d\n", tpname, (int) ping_writer.tp); + dds_delete_qos (qos); + + qos = dds_create_qos (); + dds_qset_history (qos, DDS_HISTORY_KEEP_LAST, 1); + dds_qset_ignorelocal (qos, ignorelocal); + listener = dds_create_listener ((void *) (uintptr_t) MM_RD_PING); + dds_lset_publication_matched (listener, publication_matched_listener); + if ((ping_writer.wr = dds_create_writer (pub, ping_writer.tp, qos, listener)) < 0) + error2 ("dds_create_writer(%s) failed: %d\n", tpname, (int) ping_writer.wr); + dds_delete_listener (listener); + dds_delete_qos (qos); +} + + struct dds_stats { struct dds_statistics *pubstat; const struct dds_stat_keyvalue *rexmit_bytes; @@ -1663,14 +1929,15 @@ struct dds_stats { const struct dds_stat_keyvalue *discarded_bytes; }; -static bool print_stats (dds_time_t tref, dds_time_t tnow, dds_time_t tprev, struct record_cputime_state *cputime_state, struct record_netload_state *netload_state, struct dds_stats *stats) +static bool print_stats (dds_time_t tref, dds_time_t tnow, dds_time_t tprev, struct record_cputime_state *cputime_state, struct record_netload_state *netload_state, struct pubdesc *publist, struct subdesc *sublist) { char prefix[128]; const double ts = (double) (tnow - tref) / 1e9; + struct subdesc *sdesc; bool output = false; snprintf (prefix, sizeof (prefix), "[%"PRIdPID"] %.3f ", ddsrt_getpid (), ts); - if (pub_rate > 0) + if (globals.is_publishing) { ddsrt_mutex_lock (&pubstat_lock); hist_print (prefix, pubstat_hist, tnow - tprev, 1); @@ -1680,62 +1947,75 @@ static bool print_stats (dds_time_t tref, dds_time_t tnow, dds_time_t tprev, str int64_t *newraw = malloc (PINGPONG_RAWSIZE * sizeof (*newraw)); assert(newraw); - if (submode != SM_NONE) + if (globals.submode != SM_NONE) { - struct eseq_admin * const ea = &eseq_admin; uint64_t tot_nrecv = 0, tot_nlost = 0, nlost = 0; uint64_t nrecv = 0, nrecv_bytes = 0; uint64_t nrecv10s = 0, nrecv10s_bytes = 0; - uint32_t last_size = 0; - ddsrt_mutex_lock (&ea->lock); - for (uint32_t i = 0; i < ea->nph; i++) + uint32_t tot_size = 0; + sdesc = sublist; + while (sdesc) { - struct eseq_stat * const x = &ea->stats[i]; - unsigned refidx1s = (x->refidx == 0) ? (unsigned) (sizeof (x->ref) / sizeof (x->ref[0]) - 1) : (x->refidx - 1); - unsigned refidx10s = x->refidx; - tot_nrecv += x->nrecv; - tot_nlost += x->nlost; - nrecv += x->nrecv - x->ref[refidx1s].nrecv; - nlost += x->nlost - x->ref[refidx1s].nlost; - nrecv_bytes += x->nrecv_bytes - x->ref[refidx1s].nrecv_bytes; - nrecv10s += x->nrecv - x->ref[refidx10s].nrecv; - nrecv10s_bytes += x->nrecv_bytes - x->ref[refidx10s].nrecv_bytes; - last_size = x->last_size; - x->ref[x->refidx].nrecv = x->nrecv; - x->ref[x->refidx].nlost = x->nlost; - x->ref[x->refidx].nrecv_bytes = x->nrecv_bytes; - if (++x->refidx == (unsigned) (sizeof (x->ref) / sizeof (x->ref[0]))) - x->refidx = 0; + struct eseq_admin * const ea = &sdesc->eseq_admin; + uint32_t last_size = 0; + ddsrt_mutex_lock (&ea->lock); + for (uint32_t i = 0; i < ea->nph; i++) + { + struct eseq_stat * const x = &ea->stats[i]; + unsigned refidx1s = (x->refidx == 0) ? (unsigned) (sizeof (x->ref) / sizeof (x->ref[0]) - 1) : (x->refidx - 1); + unsigned refidx10s = x->refidx; + tot_nrecv += x->nrecv; + tot_nlost += x->nlost; + nrecv += x->nrecv - x->ref[refidx1s].nrecv; + nlost += x->nlost - x->ref[refidx1s].nlost; + nrecv_bytes += x->nrecv_bytes - x->ref[refidx1s].nrecv_bytes; + nrecv10s += x->nrecv - x->ref[refidx10s].nrecv; + nrecv10s_bytes += x->nrecv_bytes - x->ref[refidx10s].nrecv_bytes; + last_size = x->last_size; + x->ref[x->refidx].nrecv = x->nrecv; + x->ref[x->refidx].nlost = x->nlost; + x->ref[x->refidx].nrecv_bytes = x->nrecv_bytes; + if (++x->refidx == (unsigned) (sizeof (x->ref) / sizeof (x->ref[0]))) + x->refidx = 0; + } + ddsrt_mutex_unlock (&ea->lock); + sdesc = sdesc->next; + tot_size += last_size; } - ddsrt_mutex_unlock (&ea->lock); - if (nrecv > 0 || substat_every_second) + if (nrecv > 0 || globals.substat_every_second) { const double dt = (double) (tnow - tprev); printf ("%s size %"PRIu32" total %"PRIu64" lost %"PRIu64" delta %"PRIu64" lost %"PRIu64" rate %.2f kS/s %.2f Mb/s (%.2f kS/s %.2f Mb/s)\n", - prefix, last_size, tot_nrecv, tot_nlost, nrecv, nlost, + prefix, tot_size, tot_nrecv, tot_nlost, nrecv, nlost, (double) nrecv * 1e6 / dt, (double) nrecv_bytes * 8 * 1e3 / dt, (double) nrecv10s * 1e6 / (10 * dt), (double) nrecv10s_bytes * 8 * 1e3 / (10 * dt)); output = true; } - if (sublatency) + if (globals.sublatency) { - ddsrt_mutex_lock (&ea->lock); - for (uint32_t i = 0; i < ea->nph; i++) + sdesc = sublist; + while (sdesc) { - struct eseq_stat * const x = &ea->stats[i]; - struct latencystat y = x->info; - latencystat_reset (&x->info, newraw); - /* pongwr entries get added at the end, npongwr only grows: so can safely + struct eseq_admin * const ea = &sdesc->eseq_admin; + ddsrt_mutex_lock (&ea->lock); + for (uint32_t i = 0; i < ea->nph; i++) + { + struct eseq_stat * const x = &ea->stats[i]; + struct latencystat y = x->info; + latencystat_reset (&x->info, newraw); + /* pongwr entries get added at the end, npongwr only grows: so can safely unlock the stats in between nodes for calculating percentiles */ + ddsrt_mutex_unlock (&ea->lock); + if (y.cnt > 0) + output = true; + newraw = latencystat_print (&y, prefix, " sublat", ea->ph[i], ea->pph[i], x->last_size); + ddsrt_mutex_lock (&ea->lock); + } ddsrt_mutex_unlock (&ea->lock); - if (y.cnt > 0) - output = true; - newraw = latencystat_print (&y, prefix, " sublat", ea->ph[i], ea->pph[i], x->last_size); - ddsrt_mutex_lock (&ea->lock); + sdesc = sdesc->next; } - ddsrt_mutex_unlock (&ea->lock); } } @@ -1750,7 +2030,7 @@ static bool print_stats (dds_time_t tref, dds_time_t tnow, dds_time_t tprev, str ddsrt_mutex_unlock (&pongstat_lock); if (y.info.cnt > 0) output = true; - newraw = latencystat_print (&y.info, prefix, "", y.pubhandle, y.pphandle, topic_payload_size (topicsel, baggagesize)); + newraw = latencystat_print (&y.info, prefix, "", y.pubhandle, y.pphandle, topic_payload_size (pong_reader.flow->topicsel, pong_reader.flow->baggagesize)); ddsrt_mutex_lock (&pongstat_lock); } ddsrt_mutex_unlock (&pongstat_lock); @@ -1792,38 +2072,41 @@ static bool print_stats (dds_time_t tref, dds_time_t tnow, dds_time_t tprev, str if (output) record_netload (netload_state, prefix, tnow); - if (extended_stats && output && stats) + if (globals.extended_stats && output && publist && sublist) { - (void) dds_refresh_statistics (stats->substat); - (void) dds_refresh_statistics (stats->pubstat); - printf ("%s discarded %"PRIu64" rexmit %"PRIu64" Trexmit %"PRIu64" Tthrottle %"PRIu64" Nthrottle %"PRIu32"\n", prefix, stats->discarded_bytes->u.u64, stats->rexmit_bytes->u.u64, stats->time_rexmit->u.u64, stats->time_throttle->u.u64, stats->throttle_count->u.u32); + struct pubdesc *pdesc; + uint64_t discarded_bytes = 0; + uint64_t rexmit_bytes = 0; + uint64_t time_rexmit = 0; + uint64_t time_throttle = 0; + uint32_t throttle_count = 0; + + pdesc = publist; + while( pdesc) + { + (void) dds_refresh_statistics (pdesc->pubstat); + rexmit_bytes += pdesc->rexmit_bytes->u.u64; + time_rexmit += pdesc->time_rexmit->u.u64; + time_throttle += pdesc->time_throttle->u.u64; + throttle_count += pdesc->throttle_count->u.u32; + pdesc = pdesc->next; + } + + sdesc = sublist; + while( sdesc) + { + (void) dds_refresh_statistics (sdesc->substat); + discarded_bytes += sdesc->discarded_bytes->u.u64; + sdesc = sdesc->next; + } + + printf ("%s discarded %"PRIu64" rexmit %"PRIu64" Trexmit %"PRIu64" Tthrottle %"PRIu64" Nthrottle %"PRIu32"\n", prefix, discarded_bytes, rexmit_bytes, time_rexmit, time_throttle, throttle_count); } fflush (stdout); return output; } -static void subthread_arg_init (struct subthread_arg *arg, dds_entity_t rd, uint32_t max_samples) -{ - arg->rd = rd; - arg->max_samples = max_samples; - arg->mseq = malloc (arg->max_samples * sizeof (arg->mseq[0])); - assert(arg->mseq); - arg->iseq = malloc (arg->max_samples * sizeof (arg->iseq[0])); - assert(arg->iseq); - DDSRT_WARNING_MSVC_OFF(6386) - for (uint32_t i = 0; i < arg->max_samples; i++) - arg->mseq[i] = NULL; - DDSRT_WARNING_MSVC_ON(6386) -} - -static void subthread_arg_fini (struct subthread_arg *arg) -{ - dds_return_loan(arg->rd, arg->mseq, (int32_t) arg->max_samples); - free (arg->mseq); - free (arg->iseq); -} - #if !DDSRT_WITH_FREERTOS && !__ZEPHYR__ static void signal_handler (int sig) { @@ -1856,7 +2139,7 @@ static void sigxfsz_handler (int sig __attribute__ ((unused))) if (write (2, msg, sizeof (msg) - 1) < 0) { /* may not ignore return value according to Linux/gcc */ } - print_stats (0, tnow, tnow - DDS_SECS (1), NULL, NULL, NULL); + print_stats (0, tnow, tnow - DDS_SECS (1), NULL, NULL, NULL, NULL); kill (getpid (), 9); } } @@ -1946,6 +2229,12 @@ MODE... is zero or more of:\n\ ping, for this, specify a percentage either as \"ping X%%\" (the\n\ \"ping\" keyword is optional, the %% sign is not). \"loan\" uses\n\ loans on the writer.\n\ +OR\n\ + batch scenario.json node\n\ + Allows to run several publishers or subscribers. The scenario file\n\ + specifies the parameters of the data flows used in the scenario.\n\ + The node parameter selects which node and associated data flows are\n\ + applicable to this instance.\n\ \n\ Payload size (including fixed part of topic) may be set as part of a\n\ \"ping\" or \"pub\" specification for topic KS (there is only size,\n\ @@ -2026,89 +2315,46 @@ static int string_int_map_lookup (const struct string_int_map_elem *elems, const return (match == SIZE_MAX) ? -1 : elems[match].value; } -struct multiplier { - const char *suffix; - int mult; -}; - -static const struct multiplier frequency_units[] = { - { "Hz", 1 }, - { "kHz", 1000 }, - { NULL, 0 } -}; - -static const struct multiplier size_units[] = { - { "B", 1 }, - { "k", 1024 }, - { "M", 1048576 }, - { "kB", 1024 }, - { "KiB", 1024 }, - { "MB", 1048576 }, - { "MiB", 1048576 }, - { NULL, 0 } -}; - -static int lookup_multiplier (const struct multiplier *units, const char *suffix) -{ - while (*suffix == ' ') - suffix++; - if (*suffix == 0) - return 1; - else if (units == NULL) - return 0; - else - { - for (size_t i = 0; units[i].suffix; i++) - if (strcmp (units[i].suffix, suffix) == 0) - return units[i].mult; - return 0; - } -} - -static bool set_simple_uint32 (int *xoptind, int xargc, char * const xargv[], const char *token, const struct multiplier *units, uint32_t *val) +static bool set_simple_uint32 (int *xoptind, int xargc, char * const xargv[], const char *token, bool (*func)(const char *, uint32_t *), uint32_t *val) { if (strcmp (xargv[*xoptind], token) != 0) return false; else { - unsigned x; - int pos, mult; if (++(*xoptind) == xargc) error3 ("argument missing in %s specification\n", token); - if (sscanf (xargv[*xoptind], "%u%n", &x, &pos) == 1 && (mult = lookup_multiplier (units, xargv[*xoptind] + pos)) > 0) - *val = x * (unsigned) mult; - else + if (!func(xargv[*xoptind], val)) error3 ("%s: invalid %s specification\n", xargv[*xoptind], token); return true; } } -static void set_mode_ping (int *xoptind, int xargc, char * const xargv[]) +static void set_mode_ping (struct dataflow *flow, int *xoptind, int xargc, char * const xargv[]) { - ping_intv = 0; - pingpongmode = SM_LISTENER; + globals.ping_intv = 0; + globals.pingpongmode = SM_LISTENER; while (*xoptind < xargc && exact_string_int_map_lookup (modestrings, "mode string", xargv[*xoptind], false) == -1) { - int pos = 0, mult = 1; double ping_rate; - if (strncmp (xargv[*xoptind], "inf", 3) == 0 && lookup_multiplier (frequency_units, xargv[*xoptind] + 3) > 0) - { - ping_intv = 0; - } - else if (sscanf (xargv[*xoptind], "%lf%n", &ping_rate, &pos) == 1 && (mult = lookup_multiplier (frequency_units, xargv[*xoptind] + pos)) > 0) + + if (string_to_frequency(xargv[*xoptind], &ping_rate)) { - ping_rate *= mult; - if (ping_rate == 0) ping_intv = DDS_INFINITY; - else if (ping_rate > 0) ping_intv = (dds_duration_t) (1e9 / ping_rate + 0.5); - else error3 ("%s: invalid ping rate\n", xargv[*xoptind]); + if (ping_rate < HUGE_VAL) + { + if (ping_rate == 0) globals.ping_intv = DDS_INFINITY; + else if (ping_rate > 0) globals.ping_intv = (dds_duration_t) (1e9 / ping_rate + 0.5); + else error3 ("%s: invalid ping rate\n", xargv[*xoptind]); + } + else + globals.ping_intv = 0; } - else if (set_simple_uint32 (xoptind, xargc, xargv, "size", size_units, &baggagesize)) + else if (set_simple_uint32 (xoptind, xargc, xargv, "size", string_to_size, &flow->baggagesize)) { /* no further work needed */ } else { - pingpongmode = (enum submode) string_int_map_lookup (pingpongmodes, "ping mode", xargv[*xoptind], true); + globals.pingpongmode = (enum submode) string_int_map_lookup (pingpongmodes, "ping mode", xargv[*xoptind], true); } (*xoptind)++; } @@ -2116,10 +2362,10 @@ static void set_mode_ping (int *xoptind, int xargc, char * const xargv[]) static void set_mode_pong (int *xoptind, int xargc, char * const xargv[]) { - pingpongmode = SM_LISTENER; + globals.pingpongmode = SM_LISTENER; while (*xoptind < xargc && exact_string_int_map_lookup (modestrings, "mode string", xargv[*xoptind], false) == -1) { - pingpongmode = (enum submode) string_int_map_lookup (pingpongmodes, "pong mode", xargv[*xoptind], true); + globals.pingpongmode = (enum submode) string_int_map_lookup (pingpongmodes, "pong mode", xargv[*xoptind], true); (*xoptind)++; } } @@ -2132,54 +2378,52 @@ static void set_mode_sub (int *xoptind, int xargc, char * const xargv[]) { "listener", SM_LISTENER }, { NULL, 0 } }; - submode = SM_LISTENER; + globals.submode = SM_LISTENER; while (*xoptind < xargc && exact_string_int_map_lookup (modestrings, "mode string", xargv[*xoptind], false) == -1) { - submode = (enum submode) string_int_map_lookup (submodes, "subscription mode", xargv[*xoptind], true); + globals.submode = (enum submode) string_int_map_lookup (submodes, "subscription mode", xargv[*xoptind], true); (*xoptind)++; } } -static void set_mode_pub (int *xoptind, int xargc, char * const xargv[]) +static void set_mode_pub (struct dataflow *flow, int *xoptind, int xargc, char * const xargv[]) { - pub_rate = HUGE_VAL; - burstsize = 1; - ping_frac = 0; + flow->name = NULL; + flow->pub_rate = HUGE_VAL; + flow->burstsize = 1; + flow->ping_frac = 0; while (*xoptind < xargc && exact_string_int_map_lookup (modestrings, "mode string", xargv[*xoptind], false) == -1) { - int pos = 0, mult = 1; + int pos = 0; double r; - if (strncmp (xargv[*xoptind], "inf", 3) == 0 && lookup_multiplier (frequency_units, xargv[*xoptind] + 3) > 0) - { - pub_rate = HUGE_VAL; - } - else if (sscanf (xargv[*xoptind], "%lf%n", &r, &pos) == 1 && (mult = lookup_multiplier (frequency_units, xargv[*xoptind] + pos)) > 0) + + if (string_to_frequency(xargv[*xoptind], &r)) { if (r < 0) error3 ("%s: invalid publish rate\n", xargv[*xoptind]); - pub_rate = r * mult; + flow->pub_rate = r; } - else if (set_simple_uint32 (xoptind, xargc, xargv, "burst", NULL, &burstsize)) + else if (set_simple_uint32 (xoptind, xargc, xargv, "burst", string_to_number, &flow->burstsize)) { /* no further work needed */ } - else if (set_simple_uint32 (xoptind, xargc, xargv, "size", size_units, &baggagesize)) + else if (set_simple_uint32 (xoptind, xargc, xargv, "size", string_to_size, &flow->baggagesize)) { /* no further work needed */ } else if (sscanf (xargv[*xoptind], "%lf%n", &r, &pos) == 1 && strcmp (xargv[*xoptind] + pos, "%") == 0) { if (r < 0 || r > 100) error3 ("%s: ping fraction out of range\n", xargv[*xoptind]); - ping_frac = (uint32_t) (UINT32_MAX * (r / 100.0) + 0.5); + flow->ping_frac = (uint32_t) (UINT32_MAX * (r / 100.0) + 0.5); } - else if (strcmp (xargv[*xoptind], "ping") == 0 && *xoptind + 1 < xargc && sscanf (xargv[*xoptind + 1], "%lf%%%n", &pub_rate, &pos) == 1 && xargv[*xoptind + 1][pos] == 0) + else if (strcmp (xargv[*xoptind], "ping") == 0 && *xoptind + 1 < xargc && sscanf (xargv[*xoptind + 1], "%lf%%%n", &flow->pub_rate, &pos) == 1 && xargv[*xoptind + 1][pos] == 0) { ++(*xoptind); if (r < 0 || r > 100) error3 ("%s: ping fraction out of range\n", xargv[*xoptind]); - ping_frac = (uint32_t) (UINT32_MAX * (r / 100.0) + 0.5); + flow->ping_frac = (uint32_t) (UINT32_MAX * (r / 100.0) + 0.5); } else if (strcmp (xargv[*xoptind], "loan") == 0) { - use_writer_loan = true; + flow->use_writer_loan = true; } else { @@ -2189,23 +2433,23 @@ static void set_mode_pub (int *xoptind, int xargc, char * const xargv[]) } } -static void set_mode (int xoptind, int xargc, char * const xargv[]) +static void set_mode (struct dataflow *flow, int xoptind, int xargc, char * const xargv[]) { int code; - pub_rate = 0.0; - submode = SM_NONE; - pingpongmode = SM_LISTENER; - ping_intv = DDS_INFINITY; - ping_frac = 0; + flow->pub_rate = 0.0; + globals.submode = SM_NONE; + globals.pingpongmode = SM_LISTENER; + globals.ping_intv = DDS_INFINITY; + flow->ping_frac = 0; while (xoptind < xargc && (code = exact_string_int_map_lookup (modestrings, "mode string", xargv[xoptind], true)) != -1) { xoptind++; switch (code) { - case 1: set_mode_ping (&xoptind, xargc, xargv); break; + case 1: set_mode_ping (flow, &xoptind, xargc, xargv); break; case 2: set_mode_pong (&xoptind, xargc, xargv); break; case 3: set_mode_sub (&xoptind, xargc, xargv); break; - case 4: set_mode_pub (&xoptind, xargc, xargv); break; + case 4: set_mode_pub (flow, &xoptind, xargc, xargv); break; } } if (xoptind != xargc) @@ -2217,170 +2461,199 @@ static void set_mode (int xoptind, int xargc, char * const xargv[]) static bool wait_for_initial_matches (void) { dds_time_t tnow = dds_time (); - const dds_time_t tendwait = tnow + (dds_duration_t) (initmaxwait * 1e9); + const dds_time_t tendwait = tnow + (dds_duration_t) (globals.initmaxwait * 1e9); ddsrt_mutex_lock (&disc_lock); - while (matchcount < minmatch && tnow < tendwait) + while (matchcount < globals.minmatch && tnow < tendwait) { ddsrt_mutex_unlock (&disc_lock); dds_sleepfor (DDS_MSECS (100)); ddsrt_mutex_lock (&disc_lock); tnow = dds_time (); } - const bool ok = (matchcount >= minmatch); + const bool ok = (matchcount >= globals.minmatch); if (!ok) { /* set minmatch to an impossible value to avoid a match occurring between now and - the determining of the exit status from causing a successful return */ - minmatch = UINT32_MAX; + the determining of the exit status from causing a successful return */ + globals.minmatch = UINT32_MAX; } ddsrt_mutex_unlock (&disc_lock); if (!ok) - return false; + return false; dds_sleepfor (DDS_MSECS (100)); return true; } -int main (int argc, char *argv[]) +static void parse_arguments(int argc, char *argv[], struct dataflow **flows) { - dds_entity_t ws; - dds_return_t rc; - dds_qos_t *qos; - dds_listener_t *listener; int opt; - bool collect_stats = false; - dds_time_t tref = DDS_INFINITY; - ddsrt_threadattr_t attr; - ddsrt_thread_t pubtid, subtid, subpingtid, subpongtid; -#if !_WIN32 && !DDSRT_WITH_FREERTOS && !__ZEPHYR__ - sigset_t sigset, osigset; - ddsrt_thread_t sigtid; -#endif - char netload_if[256] = {0}; - double netload_bw = -1; - double rss_init = 0.0, rss_final = 0.0; - double livemem_init = 0.0, livemem_final = 0.0; - ddsrt_threadattr_init (&attr); - argv0 = argv[0]; + char tpname[32]; + struct dataflow *flow = dataflow_new(); while ((opt = getopt (argc, argv, "1cd:D:i:n:k:ulLK:T:Q:R:Xh")) != EOF) { int pos; switch (opt) { - case '1': substat_every_second = true; break; - case 'c': collect_stats = true; break; - case 'd': { - char *col; - (void) ddsrt_strlcpy (netload_if, optarg, sizeof (netload_if)); - if ((col = strrchr (netload_if, ':')) == NULL) - netload_bw = 0; - else - { - if (col == netload_if || (sscanf (col+1, "%lf%n", &netload_bw, &pos) != 1 || (col+1)[pos] != 0)) - error3 ("-d %s: expected DEVICE:BANDWIDTH\n", optarg); - *col = 0; - } - break; - } - case 'D': dur = atof (optarg); if (dur <= 0) dur = HUGE_VAL; break; - case 'i': did = (dds_domainid_t) atoi (optarg); break; - case 'n': nkeyvals = (unsigned) atoi (optarg); break; - case 'u': reliable = false; break; - case 'k': histdepth = atoi (optarg); if (histdepth < 0) histdepth = 0; break; - case 'l': sublatency = true; break; - case 'L': ignorelocal = DDS_IGNORELOCAL_NONE; break; - case 'T': - if (strcmp (optarg, "KS") == 0) topicsel = KS; - else if (strcmp (optarg, "K32") == 0) topicsel = K32; - else if (strcmp (optarg, "K256") == 0) topicsel = K256; - else if (strcmp (optarg, "OU") == 0) topicsel = OU; - else if (strcmp (optarg, "UK16") == 0) topicsel = UK16; - else if (strcmp (optarg, "UK1k") == 0) topicsel = UK1k; - else if (strcmp (optarg, "UK1024") == 0) topicsel = UK1k; // backwards compat - else if (strcmp (optarg, "UK64k") == 0) topicsel = UK64k; - else if (strcmp (optarg, "S16") == 0) topicsel = S16; - else if (strcmp (optarg, "S256") == 0) topicsel = S256; - else if (strcmp (optarg, "S4k") == 0) topicsel = S4k; - else if (strcmp (optarg, "S32k") == 0) topicsel = S32k; - else error3 ("-T %s: unknown topic\n", optarg); - break; - case 'Q': { - double d; - unsigned long n; - if (sscanf (optarg, "rss:%lf%n", &d, &pos) == 1 && (optarg[pos] == 0 || optarg[pos] == '%')) { - if (optarg[pos] == 0) rss_term = d * 1048576.0; else rss_factor = 1.0 + d / 100.0; - rss_check = true; - } else if (sscanf (optarg, "livemem:%lf%n", &d, &pos) == 1 && (optarg[pos] == 0 || optarg[pos] == '%')) { - if (optarg[pos] == 0) livemem_term = d * 1048576.0; else livemem_factor = 1.0 + d / 100.0; - livemem_check = true; - } else if (sscanf (optarg, "samples:%lu%n", &n, &pos) == 1 && optarg[pos] == 0) { - min_received = (uint64_t) n; - } else if (sscanf (optarg, "roundtrips:%lu%n", &n, &pos) == 1 && optarg[pos] == 0) { - min_roundtrips = (uint64_t) n; - } else if (sscanf (optarg, "maxwait:%lf%n", &maxwait, &pos) == 1 && optarg[pos] == 0) { - maxwait = (maxwait <= 0) ? HUGE_VAL : maxwait; - } else if (sscanf (optarg, "initwait:%lf%n", &initmaxwait, &pos) == 1 && optarg[pos] == 0) { - initmaxwait = (initmaxwait <= 0) ? 0 : initmaxwait; - } else if (sscanf (optarg, "minmatch:%lu%n", &n, &pos) == 1 && optarg[pos] == 0) { - minmatch = (uint32_t) n; - } else { - error3 ("-Q %s: invalid success criterium\n", optarg); - } - break; + case '1': globals.substat_every_second = true; break; + case 'c': globals.collect_stats = true; break; + case 'd': { + char *col; + (void) ddsrt_strlcpy (globals.netload_if, optarg, sizeof (globals.netload_if)); + if ((col = strrchr (globals.netload_if, ':')) == NULL) + globals.netload_bw = 0; + else + { + if (col == globals.netload_if || (sscanf (col+1, "%lf%n", &globals.netload_bw, &pos) != 1 || (col+1)[pos] != 0)) + error3 ("-d %s: expected DEVICE:BANDWIDTH\n", optarg); + *col = 0; } - case 'X': extended_stats = true; break; - case 'R': { - tref = 0; - if (sscanf (optarg, "%"SCNd64"%n", &tref, &pos) != 1 || optarg[pos] != 0) - error3 ("-R %s: invalid reference time\n", optarg); - break; + break; + } + case 'D': globals.dur = atof (optarg); if (globals.dur <= 0) globals.dur = HUGE_VAL; break; + case 'i': did = (dds_domainid_t) atoi (optarg); break; + case 'n': flow->nkeyvals = (unsigned) atoi (optarg); break; + case 'u': flow->reliable = false; break; + case 'k': flow->histdepth = atoi (optarg); if (flow->histdepth < 0) flow->histdepth = 0; break; + case 'l': globals.sublatency = true; break; + case 'L': ignorelocal = DDS_IGNORELOCAL_NONE; break; + case 'T': + if (!get_topicsel_from_string(optarg, &flow->topicsel)) + error3 ("-T %s: unknown topic\n", optarg); + break; + case 'Q': { + double d; + unsigned long n; + if (sscanf (optarg, "rss:%lf%n", &d, &pos) == 1 && (optarg[pos] == 0 || optarg[pos] == '%')) { + if (optarg[pos] == 0) globals.rss_term = d * 1048576.0; else globals.rss_factor = 1.0 + d / 100.0; + globals.rss_check = true; + } else if (sscanf (optarg, "livemem:%lf%n", &d, &pos) == 1 && (optarg[pos] == 0 || optarg[pos] == '%')) { + if (optarg[pos] == 0) globals.livemem_term = d * 1048576.0; else globals.livemem_factor = 1.0 + d / 100.0; + globals.livemem_check = true; + } else if (sscanf (optarg, "samples:%lu%n", &n, &pos) == 1 && optarg[pos] == 0) { + globals.min_received = (uint64_t) n; + } else if (sscanf (optarg, "roundtrips:%lu%n", &n, &pos) == 1 && optarg[pos] == 0) { + globals.min_roundtrips = (uint64_t) n; + } else if (sscanf (optarg, "maxwait:%lf%n", &globals.maxwait, &pos) == 1 && optarg[pos] == 0) { + globals.maxwait = (globals.maxwait <= 0) ? HUGE_VAL : globals.maxwait; + } else if (sscanf (optarg, "initwait:%lf%n", &globals.initmaxwait, &pos) == 1 && optarg[pos] == 0) { + globals.initmaxwait = (globals.initmaxwait <= 0) ? 0 : globals.initmaxwait; + } else if (sscanf (optarg, "minmatch:%lu%n", &n, &pos) == 1 && optarg[pos] == 0) { + globals.minmatch = (uint32_t) n; + } else { + error3 ("-Q %s: invalid success criterium\n", optarg); } - case 'h': default: usage (); break; + break; + } + case 'X': globals.extended_stats = true; break; + case 'R': { + globals.tref = 0; + if (sscanf (optarg, "%"SCNd64"%n", &globals.tref, &pos) != 1 || optarg[pos] != 0) + error3 ("-R %s: invalid reference time\n", optarg); + break; + } + case 'h': default: usage (); break; } } if (optind == argc || (optind + 1 == argc && strcmp (argv[optind], "help") == 0)) usage (); + else if (optind + 3 == argc && strcmp (argv[optind], "batch") == 0) + { + if (!read_parameters(argv[optind+1], argv[optind+2], flows, &globals)) + exit(3); + dataflow_free(flow); + batch_mode = true; + } else if (optind + 1 == argc && strcmp (argv[optind], "sanity") == 0) { char * const sanity[] = { "ping", "1Hz" }; - set_mode (0, 2, sanity); + set_mode (flow, 0, 2, sanity); + *flows = flow; } else { - set_mode (optind, argc, argv); + set_mode (flow, optind, argc, argv); + *flows = flow; } - if (nkeyvals == 0) - nkeyvals = 1; - if (topicsel == OU && nkeyvals != 1) - error3 ("-n %u invalid: topic OU has no key\n", nkeyvals); - if (topicsel != KS && baggagesize != 0) - error3 ("size %"PRIu32" invalid: only topic KS has a sequence\n", baggagesize); - if (topicsel == KS && use_writer_loan) - error3 ("topic KS is not supported with writer loans because it contains a sequence\n"); - if (baggagesize != 0 && baggagesize < 12) - error3 ("size %"PRIu32" invalid: too small to allow for overhead\n", baggagesize); - else if (baggagesize > 0) - baggagesize -= 12; - - if (livemem_check) + if (!batch_mode) + { + if (flow->nkeyvals == 0) + flow->nkeyvals = 1; + if (flow->topicsel == OU && flow->nkeyvals != 1) + error3 ("-n %u invalid: topic OU has no key\n", flow->nkeyvals); + if (flow->topicsel != KS && flow->baggagesize != 0) + error3 ("size %"PRIu32" invalid: only topic KS has a sequence\n", flow->baggagesize); + if (flow->topicsel == KS && flow->use_writer_loan) + error3 ("topic KS is not supported with writer loans because it contains a sequence\n"); + if (flow->baggagesize != 0 && flow->baggagesize < 12) + error3 ("size %"PRIu32" invalid: too small to allow for overhead\n", flow->baggagesize); + else if (flow->baggagesize > 0) + flow->baggagesize -= 12; + + { + const char *tp_suf = get_topic_suffix(flow->topicsel); + const dds_topic_descriptor_t *tp_desc = get_topic_descriptor(flow->topicsel); + + snprintf (tpname, sizeof (tpname), "DDSPerf%cData%s", flow->reliable ? 'R' : 'U', tp_suf); + flow->topicname = string_copy (tpname); + flow->partition = NULL; + flow->tp_desc = tp_desc; + } + } +} + + +int main (int argc, char *argv[]) +{ + dds_entity_t ws; + dds_return_t rc; + dds_qos_t *qos; + dds_listener_t *listener; + ddsrt_threadattr_t attr; + ddsrt_thread_t subtid, subpingtid, subpongtid; +#if !_WIN32 && !DDSRT_WITH_FREERTOS && !__ZEPHYR__ + sigset_t sigset, osigset; + ddsrt_thread_t sigtid; +#endif + double rss_init = 0.0, rss_final = 0.0; + double livemem_init = 0.0, livemem_final = 0.0; + struct record_cputime_state *cputime_state = NULL; + ddsrt_threadattr_init (&attr); + struct dataflow *flows = NULL; + struct dataflow *flow; + struct pubdesc *publishers = NULL; + struct subdesc *subscribers = NULL; + struct pubdesc *pdesc; + struct subdesc *sdesc; + + argv0 = argv[0]; + + init_globals(&globals); + + parse_arguments(argc, argv, &flows); + + if (!flows) + error3 ("No flow specified\n"); + + if (globals.livemem_check) { // better make sure no calls to ddsrt_malloc, etc., take place before this ddsrt_set_allocator ((ddsrt_allocation_ops_t){ .malloc = ddsperf_malloc, - .calloc = ddsperf_calloc, - .realloc = ddsperf_realloc, - .free = ddsperf_free + .calloc = ddsperf_calloc, + .realloc = ddsperf_realloc, + .free = ddsperf_free }); } struct record_netload_state *netload_state; - if (netload_bw < 0) + if (globals.netload_bw < 0) netload_state = NULL; - else if ((netload_state = record_netload_new (netload_if, netload_bw)) == NULL) - error3 ("can't get network utilization information for device %s\n", netload_if); + else if ((netload_state = record_netload_new (globals.netload_if, globals.netload_bw)) == NULL) + error3 ("can't get network utilization information for device %s\n", globals.netload_if); ddsrt_avl_init (&ppants_td, &ppants); ddsrt_fibheap_init (&ppants_to_match_fhd, &ppants_to_match); @@ -2392,7 +2665,7 @@ int main (int argc, char *argv[]) pubstat_hist = hist_new (30, 1000, 0); - if ((async_listener = async_listener_new ()) == NULL || !async_listener_start (async_listener)) + if ((async_listener = async_listener_new ()) == NULL) error2 ("failed to start asynchronous listener thread\n"); qos = dds_create_qos (); @@ -2401,7 +2674,7 @@ int main (int argc, char *argv[]) { int cnt; char udata[256]; - cnt = snprintf (udata, sizeof (udata), UDATA_MAGIC"%d:%"PRIdPID":", submode != SM_NONE, ddsrt_getpid ()); + cnt = snprintf (udata, sizeof (udata), UDATA_MAGIC"%d:%"PRIdPID":", globals.submode != SM_NONE, ddsrt_getpid ()); assert (cnt >= 0 && (size_t)cnt < sizeof (udata)); if (ddsrt_gethostname (udata + cnt, sizeof (udata) - (size_t)cnt) != DDS_RETCODE_OK) ddsrt_strlcpy (udata + cnt, "?", sizeof(udata) - (size_t)cnt); @@ -2427,37 +2700,6 @@ int main (int argc, char *argv[]) error2 ("dds_create_topic(%s) failed: %d\n", "DDSPerfCPUStats", (int) tp_stat); dds_delete_qos (qos); - { - const char *tp_suf = "KS"; - const dds_topic_descriptor_t *tp_desc = NULL; - switch (topicsel) - { - case KS: tp_desc = &KeyedSeq_desc; break; - case K32: tp_suf = "K32"; tp_desc = &Keyed32_desc; break; - case K256: tp_suf = "K256"; tp_desc = &Keyed256_desc; break; - case OU: tp_suf = "OU"; tp_desc = &OneULong_desc; break; - case UK16: tp_suf = "UK16"; tp_desc = &Unkeyed16_desc; break; - case UK1k: tp_suf = "UK1k"; tp_desc = &Unkeyed1k_desc; break; - case UK64k: tp_suf = "UK64k"; tp_desc = &Unkeyed64k_desc; break; - case S16: tp_suf = "S16"; tp_desc = &Struct16_desc; break; - case S256: tp_suf = "S256"; tp_desc = &Struct256_desc; break; - case S4k: tp_suf = "S4k"; tp_desc = &Struct4k_desc; break; - case S32k: tp_suf = "S32k"; tp_desc = &Struct32k_desc; break; - } - snprintf (tpname_data, sizeof (tpname_data), "DDSPerf%cData%s", reliable ? 'R' : 'U', tp_suf); - snprintf (tpname_ping, sizeof (tpname_ping), "DDSPerf%cPing%s", reliable ? 'R' : 'U', tp_suf); - snprintf (tpname_pong, sizeof (tpname_pong), "DDSPerf%cPong%s", reliable ? 'R' : 'U', tp_suf); - qos = dds_create_qos (); - dds_qset_reliability (qos, reliable ? DDS_RELIABILITY_RELIABLE : DDS_RELIABILITY_BEST_EFFORT, DDS_SECS (10)); - if ((tp_data = dds_create_topic (dp, tp_desc, tpname_data, qos, NULL)) < 0) - error2 ("dds_create_topic(%s) failed: %d\n", tpname_data, (int) tp_data); - if ((tp_ping = dds_create_topic (dp, tp_desc, tpname_ping, qos, NULL)) < 0) - error2 ("dds_create_topic(%s) failed: %d\n", tpname_ping, (int) tp_ping); - if ((tp_pong = dds_create_topic (dp, tp_desc, tpname_pong, qos, NULL)) < 0) - error2 ("dds_create_topic(%s) failed: %d\n", tpname_pong, (int) tp_pong); - dds_delete_qos (qos); - } - /* participants reader must exist before the "publication matched" or "subscription matched" listener is invoked, or it won't be able to get the details (FIXME: even the DDS spec has convenience functions for that ...) */ @@ -2468,94 +2710,61 @@ int main (int argc, char *argv[]) if ((rd_publications = dds_create_reader (dp, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, NULL, NULL)) < 0) error2 ("dds_create_reader(publications) failed: %d\n", (int) rd_publications); - /* Set DATA_AVAILABLE listener on participant later: it has the nasty habit of potentially - triggering before the reader is accessible to the application via its handle. Furthermore, - upon matching a participant, a new writer is created that gets a publication_matched - listener, which in turn depends on rd_subscriptions. */ - listener = dds_create_listener (NULL); - dds_lset_data_available (listener, participant_data_listener); - dds_set_listener (rd_participants, listener); - dds_delete_listener (listener); - /* then there is the matter of data arriving prior to setting the listener ... this state - of affairs is undoubtedly a bug; call the "asynchronous" one synchronously (this is an - application thread anyway) */ - async_participant_data_listener (rd_participants, NULL); - /* stats writer always exists, reader only when we were requested to collect & print stats */ qos = dds_create_qos (); dds_qset_history (qos, DDS_HISTORY_KEEP_LAST, 1); dds_qset_ignorelocal (qos, DDS_IGNORELOCAL_PARTICIPANT); if ((wr_stat = dds_create_writer (pub, tp_stat, qos, NULL)) < 0) error2 ("dds_create_writer(statistics) failed: %d\n", (int) wr_stat); - if (collect_stats) + if (globals.collect_stats) { if ((rd_stat = dds_create_reader (sub, tp_stat, qos, NULL)) < 0) error2 ("dds_create_reader(statistics) failed: %d\n", (int) rd_stat); } dds_delete_qos (qos); - /* ping reader/writer uses keep-last-1 history; not checking matching on these (yet) */ - qos = dds_create_qos (); - dds_qset_history (qos, DDS_HISTORY_KEEP_LAST, 1); - dds_qset_ignorelocal (qos, ignorelocal); - listener = dds_create_listener ((void *) (uintptr_t) MM_WR_PING); - dds_lset_subscription_matched (listener, subscription_matched_listener); - if ((rd_ping = dds_create_reader (sub, tp_ping, qos, listener)) < 0) - error2 ("dds_create_reader(%s) failed: %d\n", tpname_ping, (int) rd_ping); - dds_delete_listener (listener); - listener = dds_create_listener ((void *) (uintptr_t) MM_RD_PING); - dds_lset_publication_matched (listener, publication_matched_listener); - if ((wr_ping = dds_create_writer (pub, tp_ping, qos, listener)) < 0) - error2 ("dds_create_writer(%s) failed: %d\n", tpname_ping, (int) wr_ping); - dds_delete_listener (listener); - dds_delete_qos (qos); + flow = flows; + while (flow) + { + if (flow->mode == FLOW_PUB || (flow->mode == FLOW_DEFAULT && flow->pub_rate > 0)) + { + pdesc = pubdesc_new(flow); + pdesc->next = publishers; + publishers = pdesc; + globals.is_publishing = true; + } + if (flow->mode == FLOW_SUB || (flow->mode == FLOW_DEFAULT && globals.submode != SM_NONE)) + { + sdesc = subdesc_new(flow); + sdesc->next = subscribers; + subscribers = sdesc; + } + flow = flow->next; + } - /* data reader/writer use a keep-all history with generous resource limits. */ - qos = dds_create_qos (); - if (histdepth == 0) - dds_qset_history (qos, DDS_HISTORY_KEEP_ALL, 1); - else - dds_qset_history (qos, DDS_HISTORY_KEEP_LAST, histdepth); - dds_qset_resource_limits (qos, 10000, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED); - dds_qset_ignorelocal (qos, ignorelocal); - listener = dds_create_listener ((void *) (uintptr_t) MM_WR_DATA); - dds_lset_subscription_matched (listener, subscription_matched_listener); - if (submode != SM_NONE && (rd_data = dds_create_reader (sub, tp_data, qos, listener)) < 0) - error2 ("dds_create_reader(%s) failed: %d\n", tpname_data, (int) rd_data); - dds_delete_listener (listener); - listener = dds_create_listener ((void *) (uintptr_t) MM_RD_DATA); - dds_lset_publication_matched (listener, publication_matched_listener); - dds_qset_writer_batching (qos, true); - if ((wr_data = dds_create_writer (pub, tp_data, qos, listener)) < 0) - error2 ("dds_create_writer(%s) failed: %d\n", tpname_data, (int) wr_data); - dds_qset_writer_batching (qos, false); + if (!batch_mode) + { + init_ping_reader(flows); + init_ping_writer(flows); + init_pong_reader(flows); + } + + if (!async_listener_start (async_listener)) + error2 ("failed to start asynchronous listener thread\n"); + + /* Set DATA_AVAILABLE listener on participant later: it has the nasty habit of potentially + triggering before the reader is accessible to the application via its handle. Furthermore, + upon matching a participant, a new writer is created that gets a publication_matched + listener, which in turn depends on rd_subscriptions. */ + listener = dds_create_listener (NULL); + dds_lset_data_available (listener, participant_data_listener); + dds_set_listener (rd_participants, listener); dds_delete_listener (listener); - /* We only need a pong reader when sending data with a non-zero probability - of it being a "ping", or when sending "real" pings. I.e., if - rate > 0 && ping_frac > 0) || ping_intv != DDS_NEVER - but it doesn't really hurt to have the reader either, and always creating - it and futhermore eagerly creating the pong writers means we can do more - checking. */ - { - dds_guid_t ppguid; - if ((rc = dds_get_guid (dp, &ppguid)) < 0) - error2 ("dds_get_guid(participant) failed: %d\n", (int) rc); - struct guidstr guidstr; - make_guidstr (&guidstr, &ppguid); - dds_entity_t sub_pong; - dds_qos_t *subqos = dds_create_qos (); - dds_qset_partition1 (subqos, guidstr.str); - if ((sub_pong = dds_create_subscriber (dp, subqos, NULL)) < 0) - error2 ("dds_create_subscriber(pong) failed: %d\n", (int) sub_pong); - dds_delete_qos (subqos); - listener = dds_create_listener ((void *) (uintptr_t) MM_WR_PONG); - dds_lset_subscription_matched (listener, subscription_matched_listener); - if ((rd_pong = dds_create_reader (sub_pong, tp_pong, qos, listener)) < 0) - error2 ("dds_create_reader(%s) failed: %d\n", tpname_pong, (int) rd_pong); - dds_delete_listener (listener); - } - dds_delete_qos (qos); + /* then there is the matter of data arriving prior to setting the listener ... this state + of affairs is undoubtedly a bug; call the "asynchronous" one synchronously (this is an + application thread anyway) */ + async_participant_data_listener (rd_participants, NULL); if ((termcond = dds_create_guardcondition (dp)) < 0) error2 ("dds_create_guardcondition(termcond) failed: %d\n", (int) termcond); @@ -2564,38 +2773,47 @@ int main (int argc, char *argv[]) if ((rc = dds_waitset_attach (ws, termcond, 0)) < 0) error2 ("dds_waitset_attach(main, termcond) failed: %d\n", (int) rc); - /* Make publisher & subscriber thread arguments and start the threads we - need (so what if we allocate memory for reading data even if we don't - have a reader or will never really be receiving data) */ - struct subthread_arg subarg_data, subarg_ping, subarg_pong; - init_eseq_admin (&eseq_admin, nkeyvals); - subthread_arg_init (&subarg_data, rd_data, 1000); - subthread_arg_init (&subarg_ping, rd_ping, 100); - subthread_arg_init (&subarg_pong, rd_pong, 100); - uint32_t (*subthread_func) (void *arg) = NULL; - switch (submode) + uint32_t (*subthread_func) (void *arg) = 0; + if (subscribers != NULL) { + switch (globals.submode) + { case SM_NONE: break; case SM_WAITSET: subthread_func = subthread_waitset; break; case SM_POLLING: subthread_func = subthread_polling; break; case SM_LISTENER: break; + } } - memset (&pubtid, 0, sizeof (pubtid)); memset (&subtid, 0, sizeof (subtid)); memset (&subpingtid, 0, sizeof (subpingtid)); memset (&subpongtid, 0, sizeof (subpongtid)); /* Just before starting the threads but after setting everything up, wait for the required number of peers, if requested to do so */ - if (initmaxwait > 0 && !wait_for_initial_matches()) + if (globals.initmaxwait > 0 && !wait_for_initial_matches()) goto err_minmatch_wait; - if (pub_rate > 0) - ddsrt_thread_create (&pubtid, "pub", &attr, pubthread, NULL); - if (subthread_func != NULL) - ddsrt_thread_create (&subtid, "sub", &attr, subthread_func, &subarg_data); - else if (submode == SM_LISTENER) - set_data_available_listener (rd_data, "rd_data", data_available_listener, &subarg_data); + pdesc = publishers; + while (pdesc) + { + const char *pthrname = (pdesc->flow->name != NULL) ? pdesc->flow->name : "pub"; + ddsrt_thread_create (&pdesc->tid, pthrname, &attr, pubthread, pdesc); + pdesc = pdesc->next; + } + + if (subthread_func != 0) + ddsrt_thread_create (&subtid, "sub", &attr, subthread_func, subscribers); + else if (globals.submode == SM_LISTENER && subscribers) + { + sdesc = subscribers; + while (sdesc) + { + const char *sthrname = (sdesc->flow->name != NULL) ? sdesc->flow->name : "rd_data"; + set_data_available_listener (sdesc->rd, sthrname, data_available_listener, sdesc); + sdesc = sdesc->next; + } + } + /* Need to handle incoming "pong"s only if we can be sending "ping"s (whether that be pings from the "ping" mode (i.e. ping_intv != DDS_NEVER), or pings embedded in the published data stream (i.e. rate > 0 && ping_frac > 0). The trouble with @@ -2603,50 +2821,23 @@ int main (int argc, char *argv[]) would result in a "thread awake nesting" overflow (and otherwise in a stack overflow) because each sample triggers the next. So in that particular case we had better create a waitset. */ - const bool pingpong_waitset = (ping_intv != DDS_NEVER && ignorelocal == DDS_IGNORELOCAL_NONE) || pingpongmode == SM_WAITSET; - if (pingpong_waitset) + const bool pingpong_waitset = (flows->mode == FLOW_DEFAULT && (globals.ping_intv != DDS_NEVER && ignorelocal == DDS_IGNORELOCAL_NONE)) || globals.pingpongmode == SM_WAITSET; + if (!batch_mode) { - ddsrt_thread_create (&subpingtid, "ping", &attr, subpingthread_waitset, &subarg_ping); - ddsrt_thread_create (&subpongtid, "pong", &attr, subpongthread_waitset, &subarg_pong); - } - else - { - set_data_available_listener (rd_ping, "rd_ping", ping_available_listener, &subarg_ping); - set_data_available_listener (rd_pong, "rd_pong", pong_available_listener, &subarg_pong); + if (pingpong_waitset) + { + ddsrt_thread_create (&subpingtid, "ping", &attr, subpingthread_waitset, &ping_reader); + ddsrt_thread_create (&subpongtid, "pong", &attr, subpongthread_waitset, &pong_reader); + } + else + { + set_data_available_listener (ping_reader.rd, "rd_ping", ping_available_listener, &ping_reader); + set_data_available_listener (pong_reader.rd, "rd_pong", pong_available_listener, &pong_reader); + } } - /* Have to do this after all threads have been created because it caches the list */ - struct record_cputime_state *cputime_state; cputime_state = record_cputime_new (wr_stat); - struct dds_stats stats; - const struct dds_stat_keyvalue dummy_u64 = { .name = "", .kind = DDS_STAT_KIND_UINT64, .u.u64 = 0 }; - const struct dds_stat_keyvalue dummy_u32 = { .name = "", .kind = DDS_STAT_KIND_UINT32, .u.u32 = 0 }; - stats.substat = dds_create_statistics (rd_data); - stats.discarded_bytes = dds_lookup_statistic (stats.substat, "discarded_bytes"); - stats.pubstat = dds_create_statistics (wr_data); - stats.rexmit_bytes = dds_lookup_statistic (stats.pubstat, "rexmit_bytes"); - stats.time_rexmit = dds_lookup_statistic (stats.pubstat, "time_rexmit"); - stats.time_throttle = dds_lookup_statistic (stats.pubstat, "time_throttle"); - stats.throttle_count = dds_lookup_statistic (stats.pubstat, "throttle_count"); - if (stats.discarded_bytes == NULL) - stats.discarded_bytes = &dummy_u64; - if (stats.rexmit_bytes == NULL) - stats.rexmit_bytes = &dummy_u64; - if (stats.time_rexmit == NULL) - stats.time_rexmit = &dummy_u64; - if (stats.time_throttle == NULL) - stats.time_throttle = &dummy_u64; - if (stats.throttle_count == NULL) - stats.throttle_count = &dummy_u32; - if (stats.discarded_bytes->kind != DDS_STAT_KIND_UINT64 || - stats.rexmit_bytes->kind != DDS_STAT_KIND_UINT64 || - stats.time_rexmit->kind != DDS_STAT_KIND_UINT64 || - stats.time_throttle->kind != DDS_STAT_KIND_UINT64 || - stats.throttle_count->kind != DDS_STAT_KIND_UINT32) - { - abort (); - } /* I hate Unix signals in multi-threaded processes ... */ #ifdef _WIN32 @@ -2673,20 +2864,21 @@ int main (int argc, char *argv[]) ignore the possibility of overflow around the year 2260.) */ dds_time_t tnow = dds_time (); const dds_time_t tstart = tnow; - if (tref == DDS_INFINITY) - tref = tstart; + if (globals.tref == DDS_INFINITY) + globals.tref = tstart; dds_time_t tminmatch = DDS_NEVER; - dds_time_t tmatch = (maxwait == HUGE_VAL) ? DDS_NEVER : tstart + (int64_t) (maxwait * 1e9 + 0.5); - const dds_time_t tstop = (dur == HUGE_VAL) ? DDS_NEVER : tstart + (int64_t) (dur * 1e9 + 0.5); + dds_time_t tmatch = (globals.maxwait == HUGE_VAL) ? DDS_NEVER : tstart + (int64_t) (globals.maxwait * 1e9 + 0.5); + const dds_time_t tstop = (globals.dur == HUGE_VAL) ? DDS_NEVER : tstart + (int64_t) (globals.dur * 1e9 + 0.5); + dds_time_t tnext = tstart + DDS_SECS (1); dds_time_t tlast = tstart; - dds_time_t tnextping = (ping_intv == DDS_INFINITY) ? DDS_NEVER : (ping_intv == 0) ? tstart + DDS_SECS (1) : tstart + ping_intv; + dds_time_t tnextping = (globals.ping_intv == DDS_INFINITY) ? DDS_NEVER : (globals.ping_intv == 0) ? tstart + DDS_SECS (1) : tstart + globals.ping_intv; while (!ddsrt_atomic_ld32 (&termflag) && tnow < tstop) { dds_time_t twakeup = DDS_NEVER; int32_t nxs; - if (tnow < tminmatch && matchcount >= minmatch) + if (tnow < tminmatch && matchcount >= globals.minmatch) tminmatch = tnow; /* bail out if too few readers discovered within the deadline */ @@ -2694,7 +2886,7 @@ int main (int argc, char *argv[]) { bool status_ok; ddsrt_mutex_lock (&disc_lock); - status_ok = (matchcount >= minmatch); + status_ok = (matchcount >= globals.minmatch); ddsrt_mutex_unlock (&disc_lock); if (status_ok) tmatch = DDS_NEVER; @@ -2702,7 +2894,7 @@ int main (int argc, char *argv[]) { /* set minmatch to an impossible value to avoid a match occurring between now and the determining of the exit status from causing a successful return */ - minmatch = UINT32_MAX; + globals.minmatch = UINT32_MAX; break; } } @@ -2750,7 +2942,7 @@ int main (int argc, char *argv[]) if (tnext <= tnow) { bool output; - output = print_stats (tref, tnow, tlast, cputime_state, netload_state, &stats); + output = print_stats (globals.tref, tnow, tlast, cputime_state, netload_state, publishers, subscribers); tlast = tnow; if (tnow > tnext + DDS_MSECS (500)) tnext = tnow + DDS_SECS (1); @@ -2773,17 +2965,12 @@ int main (int argc, char *argv[]) or stopping a process, as a result of packet loss if best-effort reliability is selected, or as a result of overwhelming the ping/pong from the data publishing thread (as the QoS is a simple keep-last-1) */ - if (tnextping <= tnow) + if (flows->mode == FLOW_DEFAULT && tnextping <= tnow) { maybe_send_new_ping (tnow, &tnextping); } } - dds_delete_statistics (stats.pubstat); - dds_delete_statistics (stats.substat); - record_netload_free (netload_state); - record_cputime_free (cputime_state); - #if _WIN32 signal_handler (SIGINT); #elif !DDSRT_WITH_FREERTOS && !__ZEPHYR__ @@ -2801,9 +2988,13 @@ int main (int argc, char *argv[]) } #endif - if (pub_rate > 0) - ddsrt_thread_join (pubtid, NULL); - if (subthread_func != NULL) + pdesc = publishers; + while (pdesc) + { + ddsrt_thread_join (pdesc->tid, NULL); + pdesc = pdesc->next; + } + if (subthread_func != 0) ddsrt_thread_join (subtid, NULL); if (pingpong_waitset) { @@ -2812,15 +3003,21 @@ int main (int argc, char *argv[]) } err_minmatch_wait: + + while (publishers) { + pdesc = publishers; + publishers = pdesc->next; + pubdesc_free (pdesc); + } + pubdesc_fini(&ping_writer); + + record_netload_free (netload_state); + record_cputime_free (cputime_state); + /* stop the listeners before deleting the readers: otherwise they may still try to access a reader that has already become inaccessible (not quite good, but ...) */ - dds_set_listener (rd_ping, NULL); - dds_set_listener (rd_pong, NULL); - dds_set_listener (rd_data, NULL); - dds_set_listener (rd_participants, NULL); - dds_set_listener (rd_subscriptions, NULL); - dds_set_listener (rd_publications, NULL); + /* Delete rd_data early to workaround a deadlock deleting a reader or writer while the receive thread (or a delivery thread) got @@ -2834,20 +3031,36 @@ int main (int argc, char *argv[]) The fix is to eliminate the waiting and retrying, and instead flip the reader's state to out-of-sync and rely on retransmits to let it make progress once room is available again. */ - dds_delete (rd_data); uint64_t nlost = 0; bool received_ok = true; - for (uint32_t i = 0; i < eseq_admin.nph; i++) + + while (subscribers) { - nlost += eseq_admin.stats[i].nlost; - if (eseq_admin.stats[i].nrecv < (uint64_t) min_received) - received_ok = false; + sdesc = subscribers; + subscribers = sdesc->next; + for (uint32_t i = 0; i < sdesc->eseq_admin.nph; i++) + { + if (!sdesc->eseq_admin.stats[i].keepall) + continue; + nlost += sdesc->eseq_admin.stats[i].nlost; + if (sdesc->eseq_admin.stats[i].nrecv < (uint64_t) globals.min_received) + received_ok = false; + } + subdesc_free(sdesc); } - fini_eseq_admin (&eseq_admin); - subthread_arg_fini (&subarg_data); - subthread_arg_fini (&subarg_ping); - subthread_arg_fini (&subarg_pong); + subdesc_fini(&ping_reader); + subdesc_fini(&pong_reader); + + dds_set_listener (rd_participants, NULL); + dds_set_listener (rd_subscriptions, NULL); + dds_set_listener (rd_publications, NULL); + + dds_set_listener (rd_participants, NULL); + dds_set_listener (rd_subscriptions, NULL); + dds_set_listener (rd_publications, NULL); + + dds_delete (dp); // only shutdown async listener once the participant is gone: that's @@ -2865,12 +3078,20 @@ int main (int argc, char *argv[]) bool roundtrips_ok = true; for (uint32_t i = 0; i < npongstat; i++) { - if (pongstat[i].info.totcnt < min_roundtrips) + if (pongstat[i].info.totcnt < globals.min_roundtrips) roundtrips_ok = false; latencystat_fini (&pongstat[i].info); } free (pongstat); + flow = flows; + while (flow) + { + struct dataflow *nxt = flow->next; + dataflow_free (flow); + flow = nxt; + } + bool ok = true; { @@ -2887,12 +3108,12 @@ int main (int argc, char *argv[]) ddsrt_avl_free (&ppants_td, &ppants, free_ppant); - if (matchcount < minmatch) + if (matchcount < globals.minmatch) { printf ("[%"PRIdPID"] error: too few matching participants (%"PRIu32")\n", ddsrt_getpid (), matchcount); ok = false; } - if (nlost > 0 && (reliable && histdepth == 0)) + if (nlost > 0) { printf ("[%"PRIdPID"] error: %"PRIu64" samples lost\n", ddsrt_getpid (), nlost); ok = false; @@ -2907,18 +3128,18 @@ int main (int argc, char *argv[]) printf ("[%"PRIdPID"] error: too few samples received from some peers\n", ddsrt_getpid ()); ok = false; } - if (livemem_check && livemem_final >= livemem_init * livemem_factor + livemem_term) + if (globals.livemem_check && livemem_final >= livemem_init * globals.livemem_factor + globals.livemem_term) { printf ("[%"PRIdPID"] error: live memory grew too much (%.1fMB -> %.1fMB)\n", ddsrt_getpid (), livemem_init / 1048576.0, livemem_final / 1048576.0); ok = false; } - if (rss_check && rss_final >= rss_init * rss_factor + rss_term) + if (globals.rss_check && rss_final >= rss_init * globals.rss_factor + globals.rss_term) { printf ("[%"PRIdPID"] error: RSS grew too much (%.1fMB -> %.1fMB)\n", ddsrt_getpid (), rss_init / 1048576.0, rss_final / 1048576.0); ok = false; } - if (livemem_check) + if (globals.livemem_check) { printf ("[%"PRIdPID"] note: livemem init %.1fMB peak %.1fMB final %.1fMB\n", ddsrt_getpid (), livemem_init / 1048576.0, (double) ddsrt_atomic_ld32 (&ddsperf_malloc_peak) / 1048576.0, livemem_final / 1048576.0); printf ("[%"PRIdPID"] note: RSS init %.1fMB final %.1fMB\n", ddsrt_getpid (), rss_init / 1048576.0, rss_final / 1048576.0); diff --git a/src/tools/ddsperf/jparser.c b/src/tools/ddsperf/jparser.c new file mode 100644 index 0000000000..d8aeb9e51c --- /dev/null +++ b/src/tools/ddsperf/jparser.c @@ -0,0 +1,281 @@ +// Copyright(c) 2019 ZettaScale Technology and others +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License +// v. 1.0 which is available at +// http://www.eclipse.org/org/documents/edl-v10.php. +// +// SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + +#include +#include +#include +#include +#include + +#include "dds/ddsrt/heap.h" +#include "dds/ddsrt/misc.h" +#include "dds/ddsrt/avl.h" +#include "dds/ddsrt/string.h" +#include "dds/ddsrt/io.h" + +#include "jparser.h" + +struct jparser { + FILE *f; + char c; + int line; + int pos; + void *args; +}; + +void jparser_error (struct jparser *parser, char *fmt, ...) +{ + char *msg; + va_list ap; + + va_start (ap, fmt); + (void)ddsrt_vasprintf (&msg, fmt, ap); + va_end (ap); + fprintf (stderr, "parser error at char '%c' line %u at %u: %s\n", parser->c, parser->line, parser->pos, msg); + ddsrt_free (msg); +} + + +static char readchar (struct jparser *parser) +{ + parser->c = (char)fgetc (parser->f); + parser->pos++; + if (parser->c == '\n') + { + parser->line++; + parser->pos = 0; + } + return parser->c; +} + +static char skip_whitespace (struct jparser *parser) +{ + while (parser->c != EOF && strchr (" \t\n\r", parser->c)) + (void)readchar (parser); + return parser->c; +} + +static char * read_string (struct jparser *parser) +{ + char buffer[256]; + char c; + uint32_t i = 0; + + while ((c = readchar (parser)) != EOF && c != '"' && c != '\r' && c != '\n' && i < sizeof (buffer)) + { + buffer[i++] = (char)c; + } + if (c != '"') + { + jparser_error (parser, "expected closing quote"); + return NULL; + } + buffer[i] = '\0'; + (void)readchar (parser); + + return ddsrt_strdup (buffer); +} + +#define TOKEN_DELIMERS ",}] \r\n" + +static char * read_token (struct jparser *parser) +{ + char buffer[256]; + char c = parser->c; + uint32_t i = 0; + + do { + buffer[i++] = c; + } while ((c = readchar (parser)) != EOF && strchr (TOKEN_DELIMERS, c) == NULL && i < sizeof (buffer)); + buffer[i] = '\0'; + + return ddsrt_strdup (buffer); +} + +static bool parse_object (struct jparser *parser, const struct jparser_rule * const rules); +static bool parse_value (struct jparser *parser, const struct jparser_rule * const rules, const char *name); + +static bool parse_array (struct jparser *parser, const struct jparser_rule * const rules, const char *name) +{ + char c; + + do { + (void)readchar (parser); + + if (rules->open_callback && !rules->open_callback (parser, JPARSER_ACTION_ARRAY_OPEN, name, rules->label, parser->args)) + { + return false; + } + else if (!parse_value (parser, rules, name)) + { + return false; + } + else if (rules->close_callback && !rules->close_callback (parser, JPARSER_ACTION_ARRAY_CLOSE, name, rules->label, parser->args)) + { + return false; + } + c = skip_whitespace (parser); + } while (c == ','); + + if ((c != ']' && c != EOF)) + { + jparser_error (parser, "expected ]"); + return false; + } + (void)readchar (parser); + + return true; +} + +static bool parse_value (struct jparser *parser, const struct jparser_rule * const rules, const char *name) +{ + bool retval; + char c; + char *str; + + c = skip_whitespace (parser); + if (c == '"') + { + if ((str = read_string (parser)) == NULL) + { + return false; + } + retval = !rules->value_callback || rules->value_callback (parser, JPARSER_ACTION_VALUE, str, rules->label, parser->args); + ddsrt_free (str); + } + else if (c == '{') + { + return parse_object (parser, rules->children ? rules->children : rules); + } + else if (c == '[') + { + retval = parse_array(parser, rules, name); + } + else + { + if ((str = read_token (parser)) == NULL) + return false; + retval = !rules->value_callback || rules->value_callback (parser, JPARSER_ACTION_VALUE, str, rules->label, parser->args); + ddsrt_free (str); + } + + return retval; +} + +static bool parse_element (struct jparser *parser, const struct jparser_rule * const rules) +{ + char c; + char *name = NULL; + int idx; + + c = skip_whitespace (parser); + if (c != '"') + { + jparser_error (parser, "expected opening quote"); + return false; + } + + if ((name = read_string (parser)) == NULL) + { + jparser_error (parser, "expect element name"); + return false; + } + else if (strlen (name) == 0) + { + jparser_error (parser, "expect element name"); + goto fail; + } + + for (idx = 0; ((rules[idx].label != JPARSER_END_RULE) && (rules[idx].name != NULL) && (strcmp (rules[idx].name, name) != 0)); idx++) + { + if (rules[idx].label == JPARSER_END_RULE) + { + jparser_error (parser, "expect element name '%s' not found", name); + goto fail; + } + } + + if (rules[idx].open_callback && !rules[idx].open_callback (parser, JPARSER_ACTION_OBJECT_OPEN, name, rules[idx].label, parser->args)) + goto fail; + + c = skip_whitespace (parser); + if (c != ':') + { + jparser_error (parser, "expected colon"); + goto fail; + } + + (void)readchar (parser); + if (!parse_value (parser, &rules[idx], name)) + goto fail; + + if (rules[idx].close_callback && !rules[idx].close_callback (parser, JPARSER_ACTION_OBJECT_CLOSE, name, rules[idx].label, parser->args)) + goto fail; + + ddsrt_free (name); + return true; + +fail: + ddsrt_free (name); + return false; +} + +static bool parse_object (struct jparser *parser, const struct jparser_rule * const rules) +{ + char c; + const struct jparser_rule *r = rules; + + do { + (void)readchar (parser); + if (!parse_element (parser, r)) + { + return false; + } + c = skip_whitespace (parser); + } while (c == ','); + + if ((c != '}' && c != EOF)) + { + jparser_error (parser, "expected }"); + return false; + } + (void)readchar (parser); + + return true; +} + +bool jparser_parse (const char *fname, const struct jparser_rule * const rules, void *args) +{ + bool result = false; + struct jparser parser; + char c; + + parser.f = fopen (fname, "r"); + if (parser.f == NULL) { + fprintf (stderr, "Failed to open script '%s'\n", fname); + return false; + } + + parser.line = 1; + parser.pos = 0; + parser.args = args; + parser.c = 0; + + c = skip_whitespace (&parser); + if (c == '{') + result = parse_object (&parser, rules); + else if (c == '[') + result = parse_array (&parser, rules, NULL); + else + jparser_error(&parser, "expected '{' or '['"); + + fclose(parser.f); + return result; +} diff --git a/src/tools/ddsperf/jparser.h b/src/tools/ddsperf/jparser.h new file mode 100644 index 0000000000..76c1d7ca04 --- /dev/null +++ b/src/tools/ddsperf/jparser.h @@ -0,0 +1,43 @@ +// Copyright(c) 2019 ZettaScale Technology and others +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License +// v. 1.0 which is available at +// http://www.eclipse.org/org/documents/edl-v10.php. +// +// SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + +#ifndef JPARSER_H +#define JPARSER_H + +#include + + +#define JPARSER_END_RULE -1 + +enum jparser_action_kind { + JPARSER_ACTION_OBJECT_OPEN, + JPARSER_ACTION_OBJECT_CLOSE, + JPARSER_ACTION_ARRAY_OPEN, + JPARSER_ACTION_ARRAY_CLOSE, + JPARSER_ACTION_VALUE +}; + +struct jparser; + +typedef bool (*jparser_action) (struct jparser *parser, enum jparser_action_kind kind, const char *value, int label, void *arg); + +struct jparser_rule { + int label; + const char *name; + jparser_action open_callback; + jparser_action close_callback; + jparser_action value_callback; + const struct jparser_rule * const children; +}; + +void jparser_error (struct jparser *parser, char *fmt, ...); +bool jparser_parse (const char *fname, const struct jparser_rule * const rules, void *args); + +#endif /* JPARSER_H */ diff --git a/src/tools/ddsperf/parameters.c b/src/tools/ddsperf/parameters.c new file mode 100644 index 0000000000..c09643347b --- /dev/null +++ b/src/tools/ddsperf/parameters.c @@ -0,0 +1,875 @@ +// Copyright(c) 2019 ZettaScale Technology and others +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License +// v. 1.0 which is available at +// http://www.eclipse.org/org/documents/edl-v10.php. +// +// SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + +#include +#include +#include +#include +#include + +#include "dds/ddsrt/heap.h" +#include "dds/ddsrt/misc.h" +#include "dds/ddsrt/avl.h" +#include "dds/ddsrt/string.h" +#include "dds/ddsrt/io.h" + +#include "jparser.h" +#include "parameters.h" + +enum rule_id { + RULE_ID_END, + RULE_ID_COMMENT, + RULE_ID_GLOBALS, + RULE_ID_TEMPLATES, + RULE_ID_NODES, + RULE_ID_NODE, + RULE_ID_FLOWS, + RULE_ID_FLOW, + RULE_ID_USES, + RULE_ID_MODE, + RULE_ID_TOPIC_NAME, + RULE_ID_TOPIC_TYPE, + RULE_ID_PARTITION, + RULE_ID_NUM_KEYS, + RULE_ID_SAMPLE_SIZE, + RULE_ID_BURST_SIZE, + RULE_ID_REGISTER, + RULE_ID_RELIABLE, + RULE_ID_HISTORY, + RULE_ID_TRANSPORT_PRIO, + RULE_ID_PUB_RATE, + RULE_ID_USE_WRITER_LOAN, + RULE_ID_SUBMODE, + RULE_ID_SUBLATENCY, + RULE_ID_COLLECT_STATS, + RULE_ID_EXTENDED_STATS, + RULE_ID_INTERFACE, + RULE_ID_BANDWIDTH +}; + +struct node { + ddsrt_avl_node_t node; + char *name; + char netload_if[256]; + double netload_bw; + struct dataflow *head; + struct dataflow *tail; +}; + +struct flow_template { + ddsrt_avl_node_t node; + char *name; + struct dataflow *parameters; +}; + +struct context { + ddsrt_avl_tree_t nodes; + ddsrt_avl_tree_t templates; + bool process_templates; + struct node *curnode; + struct flow_template *curtemplate; + struct dataflow *curflow; + struct global *globals; +}; + +static bool handle_global_parameter(struct jparser *parser, enum jparser_action_kind kind, const char *value, int label, void *args); +static bool handle_template (struct jparser *parser, enum jparser_action_kind kind, const char *value, int label, void *args); +static bool handle_node (struct jparser *parser, enum jparser_action_kind kind, const char *value, int label, void *args); +static bool handle_flow (struct jparser *parser, enum jparser_action_kind kind, const char *value, int label, void *args); +static bool handle_node_name (struct jparser *parser, enum jparser_action_kind kind, const char *name, int label, void *args); +static bool handle_node_parameter(struct jparser *parser, enum jparser_action_kind kind, const char *value, int label, void *args); +static bool handle_template_name(struct jparser *parser, enum jparser_action_kind kind, const char *name, int label, void *args); +static bool handle_flow_name (struct jparser *parser, enum jparser_action_kind kind, const char *name, int label, void *args); +static bool handle_flow_uses(struct jparser *parser, enum jparser_action_kind kind, const char *name, int label, void *args); +static bool handle_flow_parameter(struct jparser *parser, enum jparser_action_kind kind, const char *value, int label, void *args); + +static const struct jparser_rule flow_rules[] = { + { RULE_ID_FLOW, "flow", 0, 0, handle_flow_name, NULL }, + { RULE_ID_USES, "uses", 0, 0, handle_flow_uses, NULL }, + { RULE_ID_MODE, "mode", 0, 0, handle_flow_parameter, NULL }, + { RULE_ID_TOPIC_NAME, "topic_name", 0, 0, handle_flow_parameter, NULL }, + { RULE_ID_TOPIC_TYPE, "topic_type", 0, 0, handle_flow_parameter, NULL }, + { RULE_ID_PARTITION, "partition", 0, 0, handle_flow_parameter, NULL }, + { RULE_ID_NUM_KEYS, "nkeys", 0, 0, handle_flow_parameter, NULL }, + { RULE_ID_SAMPLE_SIZE, "size", 0, 0, handle_flow_parameter, NULL }, + { RULE_ID_BURST_SIZE, "burst", 0, 0, handle_flow_parameter, NULL }, + { RULE_ID_REGISTER, "register", 0, 0, handle_flow_parameter, NULL }, + { RULE_ID_RELIABLE, "reliable", 0, 0, handle_flow_parameter, NULL }, + { RULE_ID_HISTORY, "history", 0, 0, handle_flow_parameter, NULL }, + { RULE_ID_TRANSPORT_PRIO, "transport_prio", 0, 0, handle_flow_parameter, NULL }, + { RULE_ID_PUB_RATE, "pub_rate", 0, 0, handle_flow_parameter, NULL }, + { RULE_ID_USE_WRITER_LOAN, "use_writer_loan", 0, 0, handle_flow_parameter, NULL }, + { RULE_ID_COMMENT, "comment", 0, 0, 0, NULL }, + { RULE_ID_END, NULL, 0, 0, 0, NULL } +}; + +static const struct jparser_rule template_rules[] = { + { RULE_ID_FLOW, "name", 0, 0, handle_template_name, NULL }, + { RULE_ID_MODE, "mode", 0, 0, handle_flow_parameter, NULL }, + { RULE_ID_TOPIC_NAME, "topic_name", 0, 0, handle_flow_parameter, NULL }, + { RULE_ID_TOPIC_TYPE, "topic_type", 0, 0, handle_flow_parameter, NULL }, + { RULE_ID_PARTITION, "partition", 0, 0, handle_flow_parameter, NULL }, + { RULE_ID_NUM_KEYS, "nkeys", 0, 0, handle_flow_parameter, NULL }, + { RULE_ID_SAMPLE_SIZE, "size", 0, 0, handle_flow_parameter, NULL }, + { RULE_ID_BURST_SIZE, "burst", 0, 0, handle_flow_parameter, NULL }, + { RULE_ID_REGISTER, "register", 0, 0, handle_flow_parameter, NULL }, + { RULE_ID_RELIABLE, "reliable", 0, 0, handle_flow_parameter, NULL }, + { RULE_ID_HISTORY, "history", 0, 0, handle_flow_parameter, NULL }, + { RULE_ID_TRANSPORT_PRIO, "transport_prio", 0, 0, handle_flow_parameter, NULL }, + { RULE_ID_PUB_RATE, "pub_rate", 0, 0, handle_flow_parameter, NULL }, + { RULE_ID_USE_WRITER_LOAN, "use_writer_loan", 0, 0, handle_flow_parameter, NULL }, + { RULE_ID_COMMENT, "comment", 0, 0, 0, NULL }, + { RULE_ID_END, NULL, 0, 0, 0, NULL } +}; + +static const struct jparser_rule node_rules[] = { + { RULE_ID_NODE, "node", 0, 0, handle_node_name, NULL}, + { RULE_ID_FLOWS, "flows", handle_flow, handle_flow, 0, flow_rules}, + { RULE_ID_INTERFACE, "interface", 0, 0, handle_node_parameter, NULL}, + { RULE_ID_BANDWIDTH, "bandwidth", 0, 0, handle_node_parameter, NULL}, + { RULE_ID_COMMENT, "comment", 0, 0, 0, NULL }, + { RULE_ID_END, NULL, 0, 0, 0, NULL } +}; + +static const struct jparser_rule global_rules[] = { + { RULE_ID_SUBMODE, "submode", 0, 0, handle_global_parameter, NULL}, + { RULE_ID_SUBLATENCY, "sublatency", 0, 0, handle_global_parameter, NULL}, + { RULE_ID_COLLECT_STATS, "collect_stats", 0, 0, handle_global_parameter, NULL}, + { RULE_ID_EXTENDED_STATS, "extended_stats", 0, 0, handle_global_parameter, NULL}, + { RULE_ID_COMMENT, "comment", 0, 0, 0, NULL }, + { RULE_ID_END, NULL, 0, 0, 0, NULL } +}; + +static const struct jparser_rule rules[] = { + { RULE_ID_GLOBALS, "globals", 0, 0, 0, global_rules}, + { RULE_ID_TEMPLATES, "templates", handle_template, handle_template, 0, template_rules}, + { RULE_ID_NODES, "nodes", handle_node, handle_node, 0, node_rules}, + { RULE_ID_COMMENT, "comment", 0, 0, 0, NULL }, + { RULE_ID_END, NULL, 0, 0, 0, NULL } +}; + + +static int compare_name (const void *a, const void *b); + +static const ddsrt_avl_treedef_t node_tree_def = DDSRT_AVL_TREEDEF_INITIALIZER_INDKEY (offsetof (struct node, node), offsetof (struct node, name), compare_name, 0); +static const ddsrt_avl_treedef_t flow_tree_def = DDSRT_AVL_TREEDEF_INITIALIZER_INDKEY (offsetof (struct flow_template, node), offsetof (struct flow_template, name), compare_name, 0); + + +static int compare_name (const void *a, const void *b) +{ + return strcmp (a, b); +} + +static const struct multiplier frequency_units[] = { + { "Hz", 1 }, + { "kHz", 1000 }, + { NULL, 0 } +}; + +static const struct multiplier size_units[] = { + { "B", 1 }, + { "k", 1024 }, + { "M", 1048576 }, + { "kB", 1024 }, + { "KiB", 1024 }, + { "MB", 1048576 }, + { "MiB", 1048576 }, + { NULL, 0 } +}; + +static int lookup_multiplier (const struct multiplier *units, const char *suffix) +{ + while (*suffix == ' ') + suffix++; + if (*suffix == 0) + return 1; + else if (units == NULL) + return 0; + else + { + for (size_t i = 0; units[i].suffix; i++) + if (strcmp (units[i].suffix, suffix) == 0) + return units[i].mult; + return 0; + } +} + +bool string_to_size (const char *str, uint32_t *val) +{ + unsigned x; + int pos, mult; + + if (sscanf (str, "%u%n", &x, &pos) == 1 && (mult = lookup_multiplier (size_units, str + pos)) > 0) + *val = x * (unsigned) mult; + else + return false; + return true; +} + +bool string_to_number (const char *str, uint32_t *val) +{ + char *endptr; + int orig_errno = errno; + + errno = 0; + *val = (uint32_t) strtoul (str, &endptr, 10); + errno = orig_errno; + if (*endptr != '\0') + return false; + return true; +} + +bool string_to_frequency (const char *str, double *val) +{ + int pos, mult = 1; + double r; + + if (strncmp (str, "inf", 3) == 0 && lookup_multiplier (frequency_units, str + 3) > 0) + *val = HUGE_VAL; + else if (sscanf (str, "%lf%n", &r, &pos) == 1 && (mult = lookup_multiplier (frequency_units, str + pos)) > 0) + *val = r * mult; + else + return false; + return true; +} + +bool string_to_bool (const char *value, bool *result) +{ + if (strcmp (value, "true") == 0 || strcmp (value, "1") == 0) + *result = true; + else if (strcmp (value, "false") == 0 || strcmp (value, "0") == 0) + *result = false; + else + return false; + return true; +} + +static bool flow_exists (struct node *node, const char *name) +{ + struct dataflow *flow; + + flow = node->head; + while (flow != NULL && strcmp (flow->name, name) != 0) + flow = flow->next; + + return (flow != NULL); +} + +char * string_copy(const char *s) +{ + char *str; + + if ((str = malloc(strlen(s) + 1)) != NULL) + strcpy(str, s); + return str; +} + +static bool handle_global_parameter(struct jparser *parser, enum jparser_action_kind kind, const char *value, int label, void *args) +{ + struct context *ctx = args; + + DDSRT_UNUSED_ARG(kind); + + switch (label) + { + case RULE_ID_SUBMODE: + if (strcmp ("waitset", value) == 0) + ctx->globals->submode = SM_WAITSET; + else if (strcmp ("polling", value) == 0) + ctx->globals->submode = SM_POLLING; + else if (strcmp ("listener", value) == 0) + ctx->globals->submode = SM_LISTENER; + else + { + jparser_error (parser, "submode '%s' is invalid", value); + return false; + } + return true;; + case RULE_ID_SUBLATENCY: + if (!string_to_bool (value, &ctx->globals->sublatency)) + { + jparser_error (parser, "sublatency '%s' is not a valid boolean", value); + return false; + } + return true; + case RULE_ID_COLLECT_STATS: + if (!string_to_bool (value, &ctx->globals->collect_stats)) + { + jparser_error (parser, "collect_stats '%s' is not a valid boolean", value); + return false; + } + return true; + case RULE_ID_EXTENDED_STATS: + if (!string_to_bool (value, &ctx->globals->extended_stats)) + { + jparser_error (parser, "extended_stats '%s' is not a valid boolean", value); + return false; + } + return true; + default: + assert(0); + break; + } + return false; +} + +static bool handle_node (struct jparser *parser, enum jparser_action_kind kind, const char *value, int label, void *args) +{ + struct context *ctx = args; + + DDSRT_UNUSED_ARG(value); + DDSRT_UNUSED_ARG(label); + + if (kind == JPARSER_ACTION_ARRAY_OPEN) + { + ctx->curnode = malloc (sizeof (struct node)); + ctx->curnode->name = NULL; + ctx->curnode->netload_if[0] = '\0'; + ctx->curnode->netload_bw = -1; + ctx->curnode->head = NULL; + ctx->curnode->tail = NULL; + } + else if (kind == JPARSER_ACTION_ARRAY_CLOSE) + { + if (ctx->curnode == NULL || ctx->curnode->name == NULL) + { + jparser_error (parser, "node name not specified"); + return false; + } + ddsrt_avl_insert(&node_tree_def, &ctx->nodes, ctx->curnode); + ctx->curnode = NULL; + } + + return true; +} + +static bool handle_template (struct jparser *parser, enum jparser_action_kind kind, const char *value, int label, void *args) +{ + struct context *ctx = args; + + DDSRT_UNUSED_ARG(value); + DDSRT_UNUSED_ARG(label); + + if (kind == JPARSER_ACTION_OBJECT_OPEN) + { + ctx->process_templates = true; + } + else if (kind == JPARSER_ACTION_OBJECT_CLOSE) + { + ctx->process_templates = false; + } + else if (kind == JPARSER_ACTION_ARRAY_OPEN) + { + ctx->curtemplate = malloc (sizeof (struct flow_template)); + ctx->curtemplate->name = NULL; + ctx->curtemplate->parameters = dataflow_new(); + } + else if (kind == JPARSER_ACTION_ARRAY_CLOSE) + { + if (ctx->curtemplate == NULL || ctx->curtemplate->name == NULL) + { + jparser_error (parser, "template name not specified"); + return false; + } + ddsrt_avl_insert(&flow_tree_def, &ctx->templates, ctx->curtemplate); + ctx->curtemplate = NULL; + } + + return true; +} + +static bool handle_flow (struct jparser *parser, enum jparser_action_kind kind, const char *value, int label, void *args) +{ + struct context *ctx = args; + + DDSRT_UNUSED_ARG(value); + DDSRT_UNUSED_ARG(label); + + if (kind == JPARSER_ACTION_ARRAY_OPEN) + { + ctx->curflow = dataflow_new (); + } + else if (kind == JPARSER_ACTION_ARRAY_CLOSE) + { + if (ctx->curflow == NULL || ctx->curflow->name == NULL) + { + jparser_error (parser, "flow name not specified"); + return false; + } + + if (ctx->curflow->nkeyvals == 0) + ctx->curflow->nkeyvals = 1; + + if (ctx->curflow->topicsel == OU && ctx->curflow->nkeyvals != 1) + { + jparser_error (parser, "-n %u invalid: topic OU has no key\n", ctx->curflow->nkeyvals); + return false; + } + + if (ctx->curflow->topicsel != KS && ctx->curflow->baggagesize != 0) + { + jparser_error (parser, "size %"PRIu32" invalid: only topic KS has a sequence", ctx->curflow->baggagesize); + return false; + } + if (ctx->curflow->topicsel == KS && ctx->curflow->use_writer_loan) + { + jparser_error (parser,"topic KS is not supported with writer loans because it contains a sequence\n"); + return false; + } + + if (ctx->curflow->baggagesize != 0 && ctx->curflow->baggagesize < 12) + { + jparser_error (parser, "size %"PRIu32" invalid: too small to allow for overhead\n", ctx->curflow->baggagesize); + return false; + } + + if (ctx->curflow->baggagesize > 0) + ctx->curflow->baggagesize -= 12; + + ctx->curflow->tp_desc = get_topic_descriptor (ctx->curflow->topicsel); + if (ctx->curnode->head == NULL) + { + ctx->curnode->head = ctx->curflow; + } + else + { + ctx->curnode->tail->next = ctx->curflow; + } + ctx->curnode->tail = ctx->curflow; + ctx->curflow = NULL; + } + + return true; +} + +static bool handle_node_name(struct jparser *parser, enum jparser_action_kind kind, const char *name, int label, void *args) +{ + struct context *ctx = args; + + DDSRT_UNUSED_ARG(kind); + DDSRT_UNUSED_ARG(label); + + assert(ctx->curnode); + + if (ctx->curnode->name != NULL) + { + jparser_error (parser, "node '%s' already specified", name); + return false; + } + else if (ddsrt_avl_lookup (&node_tree_def, &ctx->nodes, name) != NULL) + { + jparser_error (parser, "node '%s' already exists", name); + return false; + } + ctx->curnode->name = string_copy(name); + return true; +} + +static bool handle_template_name(struct jparser *parser, enum jparser_action_kind kind, const char *name, int label, void *args) +{ + struct context *ctx = args; + + DDSRT_UNUSED_ARG(kind); + DDSRT_UNUSED_ARG(label); + + assert(ctx->curtemplate); + + if (ctx->curtemplate->name != NULL) + { + jparser_error (parser, "template '%s' already specified", name); + return false; + } + else if (ddsrt_avl_lookup (&flow_tree_def, &ctx->templates, name) != NULL) + { + jparser_error (parser, "template '%s' already exists", name); + return false; + } + ctx->curtemplate->name = string_copy(name); + return true; +} + +static bool handle_flow_name(struct jparser *parser, enum jparser_action_kind kind, const char *name, int label, void *args) +{ + struct context *ctx = args; + + DDSRT_UNUSED_ARG(kind); + DDSRT_UNUSED_ARG(label); + + assert(ctx->curflow); + + if (ctx->curflow->name != NULL) + { + jparser_error (parser, "flow '%s' already specified", name); + return false; + } + else if (flow_exists (ctx->curnode, name)) + { + jparser_error (parser, "FAIL: flow '%s' already exists\n", name); + return false; + } + ctx->curflow->name = string_copy(name); + return true; +} + +static bool handle_flow_uses(struct jparser *parser, enum jparser_action_kind kind, const char *name, int label, void *args) +{ + struct context *ctx = args; + struct flow_template *template = NULL; + + DDSRT_UNUSED_ARG(kind); + DDSRT_UNUSED_ARG(label); + + assert(ctx->curflow); + + template = ddsrt_avl_lookup (&flow_tree_def, &ctx->templates, name); + if (template == NULL) + { + jparser_error (parser, "flow template '%s' not found", name); + return false; + } + + ctx->curflow->mode = template->parameters->mode; + if (template->parameters->topicname) + { + free(ctx->curflow->topicname); + ctx->curflow->topicname = string_copy(template->parameters->topicname); + } + if (template->parameters->partition) + { + free(ctx->curflow->partition); + ctx->curflow->partition = string_copy(template->parameters->partition); + } + ctx->curflow->topicsel = template->parameters->topicsel; + ctx->curflow->tp_desc = template->parameters->tp_desc; + ctx->curflow->nkeyvals = template->parameters->nkeyvals; + ctx->curflow->baggagesize = template->parameters->baggagesize; + ctx->curflow->burstsize = template->parameters->burstsize; + ctx->curflow->register_instances = template->parameters->register_instances; + ctx->curflow->pub_rate = template->parameters->pub_rate; + ctx->curflow->reliable = template->parameters->reliable; + ctx->curflow->histdepth = template->parameters->histdepth; + ctx->curflow->transport_prio = template->parameters->transport_prio; + ctx->curflow->ping_frac = template->parameters->ping_frac; + ctx->curflow->use_writer_loan = template->parameters->use_writer_loan; + + return true; +} + +static bool handle_node_parameter(struct jparser *parser, enum jparser_action_kind kind, const char *value, int label, void *args) +{ + struct context *ctx = args; + bool result = true; + int pos; + + DDSRT_UNUSED_ARG(kind); + + switch (label) + { + case RULE_ID_INTERFACE: + ddsrt_strlcpy (ctx->curnode->netload_if, value, sizeof (ctx->globals->netload_if)); + break; + case RULE_ID_BANDWIDTH: + if (sscanf (value, "%lf%n", &ctx->curnode->netload_bw, &pos) != 1) + { + jparser_error (parser, "bandwidth '%s' is valid", value); + result = false; + } + break; + } + return result; +} + +static bool handle_flow_parameter(struct jparser *parser, enum jparser_action_kind kind, const char *value, int label, void *args) +{ + struct context *ctx = args; + struct dataflow *flow; + bool result = true; + uint32_t uval; + + DDSRT_UNUSED_ARG(kind); + + if (ctx->process_templates) + { + flow = ctx->curtemplate->parameters; + } + else + { + flow = ctx->curflow; + } + + switch (label) + { + case RULE_ID_MODE: + if (strcmp (value, "pub") == 0) + flow->mode = FLOW_PUB; + else if (strcmp (value, "sub") == 0) + flow->mode = FLOW_SUB; + else + { + jparser_error (parser, "mode '%s' not expected only 'pub' or 'sub' allowed", value); + result = false; + } + break; + case RULE_ID_TOPIC_NAME: + free(flow->topicname); + flow->topicname = string_copy (value); + break; + case RULE_ID_TOPIC_TYPE: + result = get_topicsel_from_string (value, &flow->topicsel); + break; + case RULE_ID_PARTITION: + free(flow->partition); + flow->partition = string_copy (value); + break; + case RULE_ID_NUM_KEYS: + if (!string_to_number (value, &flow->nkeyvals)) + { + jparser_error (parser, "nkeys '%s' is not a valid number", value); + result = false; + } + break; + case RULE_ID_SAMPLE_SIZE: + if (!string_to_size (value, &flow->baggagesize)) + { + jparser_error (parser, "sample_size '%s' is not a valid number", value); + result = false; + } + break; + case RULE_ID_BURST_SIZE: + if (!string_to_size (value, &flow->burstsize)) + { + jparser_error (parser, "burst_size '%s' is not a valid number", value); + result = false; + } + break; + case RULE_ID_REGISTER: + if (!string_to_bool (value, &flow->register_instances)) + { + jparser_error (parser, "register '%s' is not a valid booleam", value); + result = false; + } + break; + case RULE_ID_RELIABLE: + if (!string_to_bool (value, &flow->reliable)) + { + jparser_error (parser, "reliable '%s' is not a valid boolean", value); + result = false; + } + break; + case RULE_ID_HISTORY: + if (!string_to_number (value, &uval)) + { + jparser_error (parser, "history '%s' is not a valid number", value); + result = false; + } + flow->histdepth = (int32_t) uval; + break; + case RULE_ID_TRANSPORT_PRIO: + if (!string_to_number (value, &flow->transport_prio)) + { + jparser_error (parser, "transport_prio '%s' is not a valid number", value); + result = false; + } + break; + case RULE_ID_PUB_RATE: + if (!string_to_frequency (value, &flow->pub_rate)) + { + jparser_error (parser, "pub_rate '%s' is not a valid number", value); + result = false; + } + break; + case RULE_ID_USE_WRITER_LOAN: + if (!string_to_bool (value, &flow->use_writer_loan)) + { + jparser_error (parser, "reliable '%s' is not a valid boolean", value); + result = false; + } + break; + default: + result = false; + break; + } + + return result; +} + + +static void node_free (struct node *node) +{ + if (node) + { + free (node->name); + while (node->head) + { + struct dataflow *flow = node->head; + node->head = flow->next; + dataflow_free (flow); + } + free (node); + } +} + +static void node_free_wrapper (void *a) +{ + node_free (a); +} + +static void flow_template_free (struct flow_template *template) +{ + if (template) + { + free (template->name); + dataflow_free(template->parameters); + free (template); + } +} + +static void flow_template_free_wrapper (void *a) +{ + flow_template_free (a); +} + +bool read_parameters (const char *fname, const char *node_name, struct dataflow **flows, struct global *globals) +{ + bool result = false; + struct context ctx; + + ctx.curnode = NULL; + ctx.curtemplate = NULL; + ctx.curflow = NULL; + ctx.globals = globals; + ctx.process_templates = false; + ddsrt_avl_init (&node_tree_def, &ctx.nodes); + ddsrt_avl_init (&flow_tree_def, &ctx.templates); + + if ((result = jparser_parse(fname, rules, &ctx))) + { + struct node *node = ddsrt_avl_lookup (&node_tree_def, &ctx.nodes, node_name); + if (node == NULL) + { + fprintf (stderr, "Node '%s' not found\n", node_name); + result = false; + } + else + { + *flows = node->head; + if (node->netload_bw > 0 && node->netload_if[0] != '\0') + { + memcpy(globals->netload_if, node->netload_if, sizeof(globals->netload_if)); + globals->netload_bw = node->netload_bw; + } + node->head = NULL; + node->tail = NULL; + } + } + + dataflow_free (ctx.curflow); + node_free (ctx.curnode); + ddsrt_avl_free (&node_tree_def, &ctx.nodes, node_free_wrapper); + ddsrt_avl_free (&flow_tree_def, &ctx.templates, flow_template_free_wrapper); + + return result; +} + +void init_globals (struct global *args) +{ + args->dur = HUGE_VAL; + args->is_publishing = false; + args->tref = DDS_INFINITY; + args->netload_if[0] = '\0'; + args->netload_bw = -1; + args->submode = SM_LISTENER; + args->pingpongmode = SM_LISTENER; + args->ping_intv = DDS_INFINITY; + args->substat_every_second = false; + args->extended_stats = false; + args->minmatch = 0; + args->initmaxwait = 0; + args->maxwait = HUGE_VAL; + args->rss_check = false; + args->rss_factor = 1; + args->rss_term = 0; + args->min_received = 0; + args->min_roundtrips = 0; + args->livemem_check = false; + args->livemem_factor = 1; + args->livemem_term = 0; + args->sublatency = false; +} + +struct dataflow * dataflow_new (void) +{ + struct dataflow *flow = malloc (sizeof (*flow)); + flow->name = NULL; + flow->mode = FLOW_DEFAULT; + flow->topicname = NULL; + flow->partition = NULL; + flow->topicsel = KS; + flow->tp_desc = NULL; + flow->nkeyvals = 1; + flow->baggagesize = 0; + flow->burstsize = 1; + flow->register_instances = true; + flow->pub_rate = 0.0; + flow->reliable = true; + flow->histdepth = 0; + flow->ping_frac = 0; + flow->use_writer_loan = false; + flow->next = NULL; + return flow; +} + +void dataflow_free (struct dataflow *flow) +{ + if (flow) + { + free (flow->name); + free (flow->topicname); + free (flow->partition); + free (flow); + } +} + +bool get_topicsel_from_string (const char *name, enum topicsel *topicsel) +{ + if (strcmp (name, "KS") == 0) *topicsel = KS; + else if (strcmp (name, "K32") == 0) *topicsel = K32; + else if (strcmp (name, "K256") == 0) *topicsel = K256; + else if (strcmp (name, "OU") == 0) *topicsel = OU; + else if (strcmp (name, "UK16") == 0) *topicsel = UK16; + else if (strcmp (name, "UK1") == 0) *topicsel = UK1k; + else if (strcmp (name, "UK64") == 0) *topicsel = UK64k; + else if (strcmp (name, "S16") == 0) *topicsel = S16; + else if (strcmp (name, "S256") == 0) *topicsel = S256; + else if (strcmp (name, "S4k") == 0) *topicsel = S4k; + else if (strcmp (name, "S32k") == 0) *topicsel = S32k; + else return false; + return true; +} + +const dds_topic_descriptor_t * get_topic_descriptor (enum topicsel topicsel) +{ + const dds_topic_descriptor_t *tp_desc = NULL; + switch (topicsel) + { + case KS: tp_desc = &KeyedSeq_desc; break; + case K32: tp_desc = &Keyed32_desc; break; + case K256: tp_desc = &Keyed256_desc; break; + case OU: tp_desc = &OneULong_desc; break; + case UK16: tp_desc = &Unkeyed16_desc; break; + case UK1k: tp_desc = &Unkeyed1k_desc; break; + case UK64k: tp_desc = &Unkeyed64k_desc; break; + case S16: tp_desc = &Struct16_desc; break; + case S256: tp_desc = &Struct256_desc; break; + case S4k: tp_desc = &Struct4k_desc; break; + case S32k: tp_desc = &Struct32k_desc; break; + } + return tp_desc; +} diff --git a/src/tools/ddsperf/parameters.h b/src/tools/ddsperf/parameters.h new file mode 100644 index 0000000000..295db428f3 --- /dev/null +++ b/src/tools/ddsperf/parameters.h @@ -0,0 +1,111 @@ +// Copyright(c) 2019 ZettaScale Technology and others +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License +// v. 1.0 which is available at +// http://www.eclipse.org/org/documents/edl-v10.php. +// +// SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + +#ifndef PARAMETERS_H +#define PARAMETERS_H + +#include + +#include + +#include "ddsperf_types.h" + + +enum topicsel { + KS, /* KeyedSeq type: seq#, key, sequence-of-octet */ + K32, /* Keyed32 type: seq#, key, array-of-24-octet (sizeof = 32) */ + K256, /* Keyed256 type: seq#, key, array-of-248-octet (sizeof = 256) */ + OU, /* OneULong type: seq# */ + UK16, /* Unkeyed16, type: seq#, array-of-12-octet (sizeof = 16) */ + UK1k, /* Unkeyed1k, type: seq#, array-of-1020-octet (sizeof = 1024) */ + UK64k, /* Unkeyed64k, type: seq#, array-of-65532-octet (sizeof = 65536) */ + S16, /* Keyed, 16 octets, int64 junk, seq#, key */ + S256, /* Keyed, 16 * S16, int64 junk, seq#, key */ + S4k, /* Keyed, 16 * S256, int64 junk, seq#, key */ + S32k /* Keyed, 4 * S4k, int64 junk, seq#, key */ +}; + +enum submode { + SM_NONE, /* no subscriber at all */ + SM_WAITSET, /* subscriber using a waitset */ + SM_POLLING, /* ... using polling, sleeping for 1ms if no data */ + SM_LISTENER /* ... using a DATA_AVAILABLE listener */ +}; + +enum flowmode { + FLOW_DEFAULT, + FLOW_PUB, + FLOW_SUB, +}; + +struct global { + double dur; /* Maximum run time in seconds */ + dds_time_t tref; + char netload_if[256]; + double netload_bw; + bool is_publishing; + enum submode submode; /* Data and ping/pong subscriber triggering modes */ + enum submode pingpongmode; + dds_duration_t ping_intv; /* Pinging interval for roundtrip testing, 0 means as fast as possible, DDS_INFINITY means never */ + bool substat_every_second; /* Whether to show "sub" stats every second even when nothing happens */ + bool collect_stats; /* collect statistics */ + bool extended_stats; /* Whether to show extended statistics (currently just rexmit info) */ + uint32_t minmatch; /* Minimum number of peers (if not met, exit status is 1) */ + double initmaxwait; /* Wait this long for MINMATCH peers before starting */ + double maxwait; /* Maximum time it may take to discover all MINMATCH peers */ + bool rss_check; /* Maximum allowed increase in RSS between 2nd RSS sample and final RSS sample: final one must be <= init * (1 + rss_factor/100) + rss_term */ + double rss_factor; + double rss_term; + uint64_t min_received; /* Minimum number of samples, minimum number of roundtrips to declare the run a success */ + uint64_t min_roundtrips; + bool livemem_check; /* Maximum allowed increase in live memory between initial sample, and final sample: final one must be <= init * (1 + livemem_factor/100) + livemem_term */ + double livemem_factor; + double livemem_term; + bool sublatency; /* Whether to gather/show latency information in "sub" mode */ +}; + +struct dataflow { + char *name; + enum flowmode mode; + char *topicname; + char *partition; + enum topicsel topicsel; /* Topic type to use */ + const dds_topic_descriptor_t *tp_desc; + unsigned nkeyvals; /* Number of different key values to use (must be 1 for OU type) */ + uint32_t baggagesize; /* Size of the sequence in KeyedSeq type in bytes */ + uint32_t burstsize; /* Data is published in bursts of this many samples */ + bool register_instances; /* Whether or not to register instances prior to writing */ + double pub_rate; /* Publishing rate in Hz, HUGE_VAL means as fast as possible, 0 means no throughput data is published at all */ + bool reliable; /* Whether to use reliable or best-effort readers/writers */ + int32_t histdepth; /* History depth for throughput data reader and writer; 0 is KEEP_ALL, otherwise it is KEEP_LAST histdepth. Ping/pong always uses KEEP_LAST 1. */ + uint32_t transport_prio; + uint32_t ping_frac; /* Fraction of throughput data samples that double as a ping message */ + bool use_writer_loan; /* Use writer loans (only for memcpy-able types) */ + struct dataflow *next; +}; + +struct multiplier { + const char *suffix; + int mult; +}; + +void init_globals (struct global *args); +struct dataflow * dataflow_new (void); +void dataflow_free (struct dataflow *flow); +const dds_topic_descriptor_t * get_topic_descriptor (enum topicsel topicsel); +bool get_topicsel_from_string (const char *name, enum topicsel *topicsel); +bool string_to_size (const char *str, uint32_t *val); +bool string_to_number (const char *str, uint32_t *val); +bool string_to_frequency (const char *str, double *val); +bool string_to_bool (const char *value, bool *result); +char * string_copy(const char *s); +bool read_parameters (const char *fname, const char *node_name, struct dataflow **flows, struct global *globals); + +#endif /* PARAMETERS_H */