Skip to content

Commit

Permalink
prov/net: Expand saved rx support to include storing msg data
Browse files Browse the repository at this point in the history
Currently, we only save an inbound message and continue if the
message contains no application data.  Expand this to allow storing a
small amount of application data (up to the inject size).

This will be expanded by a subsequent patch to allow saving
multiple messages, which is needed to handle MPI_Bsend call
semantics.

Signed-off-by: Sean Hefty <sean.hefty@intel.com>
  • Loading branch information
shefty committed Dec 16, 2022
1 parent 23ea687 commit d2676e7
Show file tree
Hide file tree
Showing 6 changed files with 264 additions and 126 deletions.
17 changes: 8 additions & 9 deletions prov/net/src/xnet.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ extern size_t xnet_zerocopy_size;
extern int xnet_trace_msg;
extern int xnet_disable_autoprog;
extern int xnet_io_uring;
extern int xnet_max_saved;

struct xnet_xfer_entry;
struct xnet_ep;
Expand Down Expand Up @@ -192,7 +193,6 @@ struct xnet_ep {
struct xnet_active_tx cur_tx;
OFI_DBG_VAR(uint8_t, tx_id)
OFI_DBG_VAR(uint8_t, rx_id)
struct xnet_active_rx saved_rx;

struct dlist_entry unexp_entry;
struct dlist_entry saved_entry;
Expand All @@ -202,6 +202,8 @@ struct xnet_ep {
struct slist need_ack_queue;
struct slist async_queue;
struct slist rma_read_queue;
struct slist saved_queue;
int saved_cnt;
int rx_avail;
struct xnet_srx *srx;

Expand Down Expand Up @@ -364,6 +366,8 @@ static inline void xnet_signal_progress(struct xnet_progress *progress)
#define XNET_ASYNC BIT(5)
#define XNET_INJECT_OP BIT(6)
#define XNET_FREE_BUF BIT(7)
/* When set, xnet_report_success() returns early for the transfer, and
 * xnet_cq_report_error() only logs a warning and returns.
 */
#define XNET_SAVED_XFER BIT(8)
/* When set, xnet_report_success() clears this flag, calls
 * xnet_complete_saved() on the transfer, and returns.
 */
#define XNET_COPY_RECV BIT(9)
#define XNET_MULTI_RECV FI_MULTI_RECV /* BIT(16) */

struct xnet_xfer_entry {
Expand Down Expand Up @@ -614,14 +618,9 @@ static inline bool xnet_has_unexp(struct xnet_ep *ep)
return ep->cur_rx.handler && !ep->cur_rx.entry;
}

/*
 * Report whether the endpoint currently has a saved receive, i.e. whether
 * saved_rx.hdr_done is non-zero.
 *
 * Asserts xnet_progress_locked() on the endpoint's progress instance.
 */
static inline bool xnet_has_saved_rx(struct xnet_ep *ep)
{
	bool has_saved;

	assert(xnet_progress_locked(xnet_ep2_progress(ep)));
	has_saved = (ep->saved_rx.hdr_done != 0);
	return has_saved;
}

void xnet_complete_saved(struct xnet_ep *ep, struct xnet_xfer_entry *rx_entry);
void xnet_clear_saved_rx(struct xnet_ep *ep);
void xnet_recv_saved(struct xnet_xfer_entry *saved_entry,
struct xnet_xfer_entry *rx_entry);
void xnet_complete_saved(struct xnet_xfer_entry *saved_entry);

#define XNET_WARN_ERR(subsystem, log_str, err) \
FI_WARN(&xnet_prov, subsystem, log_str "%s (%d)\n", \
Expand Down
19 changes: 14 additions & 5 deletions prov/net/src/xnet_cq.c
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,15 @@ void xnet_report_success(struct xnet_ep *ep, struct util_cq *cq,
size_t len;

if (!(xfer_entry->cq_flags & FI_COMPLETION) ||
(xfer_entry->ctrl_flags & XNET_INTERNAL_XFER))
(xfer_entry->ctrl_flags & (XNET_INTERNAL_XFER | XNET_SAVED_XFER)))
return;

if (xfer_entry->ctrl_flags & XNET_COPY_RECV) {
xfer_entry->ctrl_flags &= ~XNET_COPY_RECV;
xnet_complete_saved(xfer_entry);
return;
}

flags = xfer_entry->cq_flags & ~FI_COMPLETION;
if (flags & FI_RECV) {
len = xfer_entry->hdr.base_hdr.size -
Expand Down Expand Up @@ -169,13 +175,16 @@ void xnet_cq_report_error(struct util_cq *cq,
{
struct fi_cq_err_entry err_entry;

if (xfer_entry->ctrl_flags & (XNET_INTERNAL_XFER | XNET_INJECT_OP)) {
if (xfer_entry->ctrl_flags & XNET_INTERNAL_XFER)
FI_WARN(&xnet_prov, FI_LOG_CQ, "internal transfer "
if (xfer_entry->ctrl_flags &
(XNET_INTERNAL_XFER | XNET_SAVED_XFER | XNET_INJECT_OP)) {
if (xfer_entry->ctrl_flags &
(XNET_INTERNAL_XFER | XNET_SAVED_XFER)) {
FI_WARN(&xnet_prov, FI_LOG_CQ, "internal/saved transfer "
"failed (%s)\n", fi_strerror(err));
else
} else {
FI_WARN(&xnet_prov, FI_LOG_CQ, "inject transfer "
"failed (%s)\n", fi_strerror(err));
}
return;
}

Expand Down
8 changes: 6 additions & 2 deletions prov/net/src/xnet_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,8 @@ static void xnet_ep_flush_all_queues(struct xnet_ep *ep)
xnet_ep_flush_queue(ep, &ep->rma_read_queue, cq);
xnet_ep_flush_queue(ep, &ep->need_ack_queue, cq);
xnet_ep_flush_queue(ep, &ep->async_queue, cq);
xnet_ep_flush_queue(ep, &ep->saved_queue, cq);
ep->saved_cnt = 0;

cq = container_of(ep->util_ep.rx_cq, struct xnet_cq, util_cq);
if (ep->cur_rx.entry) {
Expand Down Expand Up @@ -367,9 +369,8 @@ void xnet_ep_disable(struct xnet_ep *ep, int cm_err, void* err_data,
return;
};

if (xnet_has_saved_rx(ep))
xnet_clear_saved_rx(ep);
dlist_remove_init(&ep->unexp_entry);
dlist_remove_init(&ep->saved_entry);
xnet_halt_sock(xnet_ep2_progress(ep), ep->bsock.sock);

ret = ofi_shutdown(ep->bsock.sock, SHUT_RDWR);
Expand Down Expand Up @@ -524,6 +525,7 @@ static int xnet_ep_close(struct fid *fid)
progress = xnet_ep2_progress(ep);
ofi_genlock_lock(&progress->lock);
dlist_remove_init(&ep->unexp_entry);
dlist_remove_init(&ep->saved_entry);
xnet_halt_sock(progress, ep->bsock.sock);
xnet_ep_flush_all_queues(ep);
ofi_genlock_unlock(&progress->lock);
Expand Down Expand Up @@ -736,12 +738,14 @@ int xnet_endpoint(struct fid_domain *domain, struct fi_info *info,
}

dlist_init(&ep->unexp_entry);
dlist_init(&ep->saved_entry);
slist_init(&ep->rx_queue);
slist_init(&ep->tx_queue);
slist_init(&ep->priority_queue);
slist_init(&ep->rma_read_queue);
slist_init(&ep->need_ack_queue);
slist_init(&ep->async_queue);
slist_init(&ep->saved_queue);

if (info->ep_attr->rx_ctx_cnt != FI_SHARED_CONTEXT)
ep->rx_avail = (int) info->rx_attr->size;
Expand Down
13 changes: 12 additions & 1 deletion prov/net/src/xnet_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ size_t xnet_zerocopy_size = SIZE_MAX;
int xnet_trace_msg;
int xnet_disable_autoprog;
int xnet_io_uring;
/* Initial value doubles as the default shown in the "max_saved" parameter
 * help text; xnet_init_env() passes this variable to fi_param_get_int().
 */
int xnet_max_saved = 4;


static void xnet_init_env(void)
Expand Down Expand Up @@ -115,8 +116,18 @@ static void xnet_init_env(void)
if (!fi_param_get_size_t(&xnet_prov, "rx_size", &rx_size))
xnet_default_rx_size = rx_size;

fi_param_define(&xnet_prov, "max_saved", FI_PARAM_INT,
"maximum number of received messages that do not "
"have a posted application buffer that will be "
"queued by the provider. A larger value increases "
"memory and processing overhead, negatively "
"impacting performance, but may be required by some "
"applications to prevent hangs. (default: %d)",
xnet_max_saved);
fi_param_get_int(&xnet_prov, "max_saved", &xnet_max_saved);
fi_param_define(&xnet_prov, "nodelay", FI_PARAM_BOOL,
"overrides default TCP_NODELAY socket setting");
"overrides default TCP_NODELAY socket setting "
"(default %d)", xnet_nodelay);
fi_param_get_bool(&xnet_prov, "nodelay", &xnet_nodelay);

fi_param_define(&xnet_prov, "staging_sbuf_size", FI_PARAM_INT,
Expand Down
Loading

0 comments on commit d2676e7

Please sign in to comment.