issue: 3623390 Implementing DOCA Flow steering creation
1. Isolated mode DOCA Flow steering for multi-process
2. Start DOCA RXQ and attach it to DOCA Flow
3. Enable fork() with DOCA for Nginx.

Signed-off-by: Alexander Grissik <agrissik@nvidia.com>
AlexanderGrissik committed May 29, 2024
1 parent ee5254c commit 75069a6
Showing 19 changed files with 442 additions and 34 deletions.
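
For orientation, the changes below wire up the standard DOCA RX lifecycle: cq_mgr_rx creates a progress engine (PE), hw_queue_rx creates an ETH RXQ context, sets its receive-task callbacks, connects the context to the PE and starts it, and the RX polling path drives completions with doca_pe_progress(). A minimal sketch of that flow, with error handling elided (dev, owner, max_burst_size, max_packet_size, max_tasks, on_rx_done, on_rx_error and running are placeholders, not names from this commit):

doca_pe *pe = nullptr;
doca_pe_create(&pe); // one PE per RX CQ manager

doca_eth_rxq *rxq = nullptr;
doca_eth_rxq_create(dev, max_burst_size, max_packet_size, &rxq);
doca_eth_rxq_set_type(rxq, DOCA_ETH_RXQ_TYPE_REGULAR);
doca_eth_rxq_task_recv_set_conf(rxq, on_rx_done, on_rx_error, max_tasks);

doca_ctx *ctx = doca_eth_rxq_as_doca_ctx(rxq);
doca_ctx_set_user_data(ctx, {.ptr = owner});
doca_pe_connect_ctx(pe, ctx);
doca_ctx_start(ctx);

uint16_t flow_queue_id = 0;
doca_eth_rxq_get_flow_queue_id(rxq, &flow_queue_id); // target for DOCA Flow steering entries

while (running) {
    (void)doca_pe_progress(pe); // completed tasks invoke on_rx_done / on_rx_error
}

doca_ctx_stop(ctx); // may return DOCA_ERROR_IN_PROGRESS; progress the PE until idle
doca_pe_destroy(pe);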
74 changes: 74 additions & 0 deletions src/core/dev/cq_mgr_rx.cpp
@@ -136,6 +136,14 @@ void cq_mgr_rx::configure(int cq_size)

cq_logdbg("Created CQ as Rx with fd[%d] and of size %d elements (ibv_cq_hndl=%p)",
get_channel_fd(), cq_size, m_p_ibv_cq);

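// Each RX CQ manager owns a DOCA progress engine (PE); RXQ contexts created by hw_queue_rx
// are connected to this PE and driven from the RX polling path.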
doca_error_t rc = doca_pe_create(&m_doca_pe);
if (DOCA_IS_ERROR(rc)) {
PRINT_DOCA_ERR(cq_logerr, rc, "doca_pe_create");
throw_xlio_exception("doca_pe_create failed");
}

cq_logdbg("Created DOCA PE %p", m_doca_pe);
}

cq_mgr_rx::~cq_mgr_rx()
@@ -170,6 +178,13 @@ cq_mgr_rx::~cq_mgr_rx()
xlio_stats_instance_remove_cq_block(m_p_cq_stat);

cq_logdbg("Destroying Rx CQ done");

if (m_doca_pe) {
doca_error_t rc = doca_pe_destroy(m_doca_pe);
if (DOCA_IS_ERROR(rc)) {
PRINT_DOCA_ERR(cq_logerr, rc, "doca_pe_destroy PE:%p", m_doca_pe);
}
}
}

void cq_mgr_rx::statistics_print()
@@ -240,6 +255,65 @@ void cq_mgr_rx::add_hqrx(hw_queue_rx *hqrx_ptr)
hqrx_ptr->get_rx_max_wr_num() - hqrx_wr_num, hqrx_ptr->get_rx_max_wr_num());

m_debt = 0;

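// Temporary bring-up scaffolding: register a small heap region with a DOCA mmap, carve it
// into 32 fixed-size chunks through a buf inventory, and pre-post them as receive tasks so
// the RXQ can complete packets end to end.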
doca_error_t rc = doca_mmap_create(&temp_doca_mmap);
if (DOCA_IS_ERROR(rc)) {
PRINT_DOCA_ERR(cq_logerr, rc, "doca_mmap_create");
}

rc = doca_mmap_set_max_num_devices(temp_doca_mmap, 1U);
if (DOCA_IS_ERROR(rc)) {
PRINT_DOCA_ERR(cq_logerr, rc, "doca_mmap_set_max_num_devices");
}

rc = doca_mmap_add_dev(temp_doca_mmap, m_p_ib_ctx_handler->get_doca_device());
if (DOCA_IS_ERROR(rc)) {
PRINT_DOCA_ERR(cq_logerr, rc, "doca_mmap_add_dev");
}

size_t alloc_size = 32 * 16384;
xlio_allocator_heap temp_heap(false);
void *memptr = temp_heap.alloc(alloc_size);

rc = doca_mmap_set_memrange(temp_doca_mmap, memptr, alloc_size);
if (DOCA_IS_ERROR(rc)) {
PRINT_DOCA_ERR(cq_logerr, rc, "doca_mmap_set_memrange");
}

rc = doca_mmap_start(temp_doca_mmap);
if (DOCA_IS_ERROR(rc)) {
PRINT_DOCA_ERR(cq_logerr, rc, "doca_mmap_start");
}

rc = doca_buf_inventory_create(32U, &temp_doca_inventory);
if (DOCA_IS_ERROR(rc)) {
PRINT_DOCA_ERR(cq_logerr, rc, "doca_buf_inventory_create");
}

rc = doca_buf_inventory_start(temp_doca_inventory);
if (DOCA_IS_ERROR(rc)) {
PRINT_DOCA_ERR(cq_logerr, rc, "doca_buf_inventory_start");
}

for (int i = 0; i < 32; ++i) {
rc = doca_buf_inventory_buf_get_by_addr(temp_doca_inventory, temp_doca_mmap,
(uint8_t *)memptr + (i * 16384), 16384,
temp_doca_bufs + i);
if (DOCA_IS_ERROR(rc)) {
PRINT_DOCA_ERR(cq_logerr, rc, "doca_buf_inventory_buf_get_by_addr");
}

rc = doca_eth_rxq_task_recv_allocate_init(m_hqrx_ptr->m_doca_rxq.get(), {.ptr = nullptr},
temp_doca_bufs[i], temp_doca_tasks + i);
if (DOCA_IS_ERROR(rc)) {
PRINT_DOCA_ERR(cq_logerr, rc, "doca_eth_rxq_task_recv_allocate_init");
}

rc = doca_task_submit(doca_eth_rxq_task_recv_as_doca_task(temp_doca_tasks[i]));
if (DOCA_IS_ERROR(rc)) {
PRINT_DOCA_ERR(cq_logerr, rc, "doca_task_submit");
}
}
}

void cq_mgr_rx::del_hqrx(hw_queue_rx *hqrx_ptr)
13 changes: 12 additions & 1 deletion src/core/dev/cq_mgr_rx.h
@@ -41,6 +41,11 @@
#include "proto/mem_buf_desc.h"
#include "proto/xlio_lwip.h"
#include "xlio_extra.h"
#include <doca_pe.h>
#include <doca_eth_rxq_cpu_data_path.h>
#include <doca_buf_inventory.h>
#include <doca_mmap.h>
#include <doca_buf.h>

#if VLIST_DEBUG
#define VLIST_DEBUG_CQ_MGR_PRINT_ERROR_IS_MEMBER \
@@ -71,6 +76,11 @@ class cq_mgr_rx {
friend class rfs_uc_tcp_gro; // need for stats

public:
doca_buf_inventory *temp_doca_inventory = nullptr;
doca_mmap *temp_doca_mmap = nullptr;
doca_buf *temp_doca_bufs[32];
doca_eth_rxq_task_recv *temp_doca_tasks[32];

enum buff_status_e {
BS_OK,
BS_CQE_RESP_WR_IMM_NOT_SUPPORTED,
@@ -87,7 +97,7 @@

ibv_cq *get_ibv_cq_hndl() { return m_p_ibv_cq; }
int get_channel_fd() { return m_comp_event_channel ? m_comp_event_channel->fd : 0; }

doca_pe *get_doca_pe() const { return m_doca_pe; }
/**
* Arm the managed CQ's notification channel
* Calling this more then once without get_event() will return without
@@ -176,6 +186,7 @@

virtual void statistics_print();

doca_pe *m_doca_pe = nullptr;
xlio_ib_mlx5_cq_t m_mlx5_cq;
hw_queue_rx *m_hqrx_ptr = nullptr;
mem_buf_desc_t *m_rx_hot_buffer = nullptr;
6 changes: 6 additions & 0 deletions src/core/dev/cq_mgr_rx_regrq.cpp
@@ -45,6 +45,7 @@

#define cq_logfunc __log_info_func
#define cq_logdbg __log_info_dbg
#define cq_loginfo __log_info_info
#define cq_logwarn __log_info_warn
#define cq_logerr __log_info_err
#define cq_logpanic __log_info_panic
@@ -344,6 +345,11 @@ int cq_mgr_rx_regrq::poll_and_process_element_rx(uint64_t *p_cq_poll_sn, void *p
/* Assume locked!!! */
cq_logfuncall("");

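// Drive the DOCA progress engine once per RX poll; completed receive tasks are delivered
// through the callbacks configured in hw_queue_rx::prepare_rq().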
uint8_t rc = doca_pe_progress(m_doca_pe);
if (rc) {
cq_loginfo("doca_pe_progress processed an event");
}

uint32_t ret_rx_processed = process_recv_queue(pv_fd_ready_array);
if (unlikely(ret_rx_processed >= m_n_sysvar_cq_poll_batch_max)) {
m_p_ring->m_gro_mgr.flush_all(pv_fd_ready_array);
140 changes: 130 additions & 10 deletions src/core/dev/hw_queue_rx.cpp
@@ -39,6 +39,8 @@
#include "dev/rfs_rule.h"
#include "dev/cq_mgr_rx_regrq.h"
#include "dev/cq_mgr_rx_strq.h"
#include <doca_ctx.h>
#include <doca_eth_rxq_cpu_data_path.h>

#undef MODULE_NAME
#define MODULE_NAME "hw_queue_rx"
@@ -141,7 +143,7 @@ bool hw_queue_rx::configure_rq(ibv_comp_channel *rx_comp_event_channel)
}

// Create the QP
if (!prepare_rq(mlx5_cq.cq_num)) {
if (!prepare_rq(mlx5_cq.cq_num, m_p_cq_mgr_rx->get_doca_pe())) {
return false;
}

@@ -158,6 +160,7 @@ void hw_queue_rx::up()
release_rx_buffers(); // We might have old flushed cqe's in our CQ still from previous HA event

modify_queue_to_ready_state();
start_doca_rxq();

m_p_cq_mgr_rx->add_hqrx(this);
}
@@ -168,6 +171,8 @@

modify_queue_to_error_state();

stop_doca_rxq();

// let the QP drain all wqe's to flushed cqe's now that we moved
// it to error state and post_sent final trigger for completion
usleep(1000);
@@ -231,6 +236,62 @@ void hw_queue_rx::post_recv_buffers(descq_t *p_buffers, size_t count)
}
}

/*void hw_queue_rx::callback_rxq_state_changed(
const union doca_data user_data, struct doca_ctx *ctx,
enum doca_ctx_states prev_state, enum doca_ctx_states next_state)
{
hw_queue_rx *hw_rxq = reinterpret_cast<hw_queue_rx *>(user_data.ptr);
if (DOCA_CTX_STATE_IDLE == next_state) {
// TODO: notify hw_rxq that the RXQ context reached the idle state.
}
}*/

void hw_queue_rx::start_doca_rxq()
{
hwqrx_logdbg("Starting DOCA RXQ: %p", m_doca_rxq.get());

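// The DOCA flow port must exist before the context is started: the flow queue id queried
// below is what DOCA Flow steering entries use to target this RXQ.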
if (!m_p_ib_ctx_handler->get_doca_flow_port()) {
hwqrx_logerr("start_doca_rxq unable to get DOCA flow port, RXQ: %p", this);
}

doca_error_t err = doca_ctx_start(m_doca_ctx_rxq);
if (DOCA_IS_ERROR(err)) {
PRINT_DOCA_ERR(hwqrx_logerr, err, "doca_ctx_start(RXQ). RXQ:%p", m_doca_rxq.get());
}

hwqrx_loginfo("DOCA RXQ started, ctx: %p", m_doca_ctx_rxq);

err = doca_eth_rxq_get_flow_queue_id(m_doca_rxq.get(), &m_doca_rx_queue_id);
if (DOCA_IS_ERROR(err)) {
PRINT_DOCA_ERR(hwqrx_logerr, err, "doca_eth_rxq_get_flow_queue_id. RXQ:%p",
m_doca_rxq.get());
}
}

void hw_queue_rx::stop_doca_rxq()
{
hwqrx_logdbg("Stopping DOCA RXQ: %p", m_doca_rxq.get());

doca_error_t err = doca_ctx_stop(m_doca_ctx_rxq);
if (DOCA_ERROR_IN_PROGRESS == err) {
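// Stopping is asynchronous: keep driving the progress engine so in-flight receive tasks
// are flushed, until the context reports idle.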
doca_ctx_states ctx_state = DOCA_CTX_STATE_STOPPING; // Just to enter the while loop.
doca_pe *pe = m_p_cq_mgr_rx->get_doca_pe();
while (DOCA_CTX_STATE_IDLE != ctx_state) {
if (!doca_pe_progress(pe)) {
err = doca_ctx_get_state(m_doca_ctx_rxq, &ctx_state);
if (err != DOCA_SUCCESS) {
PRINT_DOCA_ERR(hwqrx_logerr, err,
"Error flushing DOCA RXQ (doca_ctx_get_state). RXQ:%p",
m_doca_rxq.get());
break;
}
}
}
} else if (DOCA_IS_ERROR(err)) {
PRINT_DOCA_ERR(hwqrx_logerr, err, "doca_ctx_stop(RXQ). RXQ:%p", m_doca_rxq.get());
}
}

void hw_queue_rx::modify_queue_to_ready_state()
{
hwqrx_logdbg("");
@@ -260,18 +321,20 @@ void hw_queue_rx::modify_queue_to_error_state()
}
}

rfs_rule *hw_queue_rx::create_rfs_rule(dpcp::match_params &match_value,
rfs_rule *hw_queue_rx::create_rfs_rule(doca_flow_match &match_val, doca_flow_match &match_msk,
dpcp::match_params &match_value,
dpcp::match_params &match_mask, uint16_t priority,
uint32_t flow_tag, xlio_tir *tir_ext)
{
if (m_p_ib_ctx_handler && m_p_ib_ctx_handler->get_dpcp_adapter()) {
// TLS RX uses tir_ext.
dpcp::tir *dpcp_tir = (tir_ext ? xlio_tir_to_dpcp_tir(tir_ext) : m_tir.get());
uint16_t rxq_id = m_doca_rx_queue_id; // TODO: Add Support for TLS-RX.
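// In isolated mode the steering entry is created via DOCA Flow, so the rule takes the DOCA
// match value/mask and this RXQ's flow queue id alongside the legacy dpcp parameters.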

std::unique_ptr<rfs_rule> new_rule(new rfs_rule());
if (dpcp_tir &&
new_rule->create(match_value, match_mask, *dpcp_tir, priority, flow_tag,
*m_p_ib_ctx_handler)) {
if (dpcp_tir && m_doca_rx_queue_id &&
new_rule->create(match_val, match_msk, rxq_id, match_value, match_mask, *dpcp_tir,
priority, flow_tag, *m_p_ib_ctx_handler)) {
return new_rule.release();
}
}
@@ -509,7 +572,40 @@ void hw_queue_rx::destory_doca_rxq(doca_eth_rxq *rxq)
}
}

bool hw_queue_rx::prepare_rq(uint32_t cqn)
void task_completion_cb(doca_eth_rxq_task_recv *task_recv, doca_data task_user_data,
doca_data ctx_user_data)
{
NOT_IN_USE(task_user_data);
NOT_IN_USE(ctx_user_data);

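// Bring-up placeholder: the received data is not consumed yet; free the task and log the
// arrival.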
doca_task_free(doca_eth_rxq_task_recv_as_doca_task(task_recv));

vlog_printf(VLOG_INFO, "PACKET RECEIVED pid: %d\n", (int)getpid());
}

void hw_queue_rx::rx_task_error_cb(doca_eth_rxq_task_recv *task_recv, doca_data task_user_data,
doca_data ctx_user_data)
{
NOT_IN_USE(task_user_data);
NOT_IN_USE(ctx_user_data);

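// Receive tasks complete with an error when the context is flushed during stop; report
// only failures that happen outside the stopping/idle transition.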
hw_queue_rx *hw_rx = reinterpret_cast<hw_queue_rx *>(ctx_user_data.ptr);
doca_ctx_states ctx_state = DOCA_CTX_STATE_STOPPING;
doca_error_t rc_state = doca_ctx_get_state(hw_rx->m_doca_ctx_rxq, &ctx_state);
ctx_state = ((ctx_state != DOCA_CTX_STATE_IDLE) ? ctx_state : DOCA_CTX_STATE_STOPPING);
if (rc_state != DOCA_SUCCESS || ctx_state != DOCA_CTX_STATE_STOPPING) {
PRINT_DOCA_ERR(__log_err,
doca_task_get_status(doca_eth_rxq_task_recv_as_doca_task(task_recv)),
"RX Task Error");
}

doca_task_free(doca_eth_rxq_task_recv_as_doca_task(task_recv));

__log_func("rx_task_error_cb, task_recv: %p, rc_state: %d, ctx_state: %d", task_recv, rc_state,
ctx_state);
}

bool hw_queue_rx::prepare_rq(uint32_t cqn, doca_pe *pe)
{
hwqrx_logdbg("");

@@ -531,14 +627,16 @@
return false;
}

hwqrx_loginfo("RXQ caps MaxBurstSize %u, MaxPacketSize %u, Dev:%s", max_burst_size,
max_packet_size, m_p_ib_ctx_handler->get_ibname().c_str());

if (m_rx_num_wr > max_burst_size) {
hwqrx_logwarn("Decreasing MaxBurstSize %u to capability %u.", m_rx_num_wr, max_burst_size);
hwqrx_logwarn("Decreasing BurstSize %u to capability %u.", m_rx_num_wr, max_burst_size);
m_rx_num_wr = max_burst_size;
}

hwqrx_logdbg(
"Creating DOCA RXQ MaxBurstSize: %u, CapMaxBurstSize: %u, MaxPacketSize: %u, Dev:%s",
m_rx_num_wr, max_burst_size, max_packet_size, m_p_ib_ctx_handler->get_ibname().c_str());
hwqrx_loginfo("Creating DOCA RXQ MaxBurstSize: %u, MaxPacketSize: %u, Dev:%s", m_rx_num_wr,
max_packet_size, m_p_ib_ctx_handler->get_ibname().c_str());

doca_eth_rxq *rxq = nullptr;
doca_error_t err = doca_eth_rxq_create(dev, m_rx_num_wr, max_packet_size, &rxq);
Expand All @@ -548,13 +646,35 @@ bool hw_queue_rx::prepare_rq(uint32_t cqn)
}

m_doca_rxq.reset(rxq);
m_doca_ctx_rxq = doca_eth_rxq_as_doca_ctx(rxq);

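// Store this object as the context user data so DOCA callbacks can recover the owning
// hw_queue_rx.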
err = doca_ctx_set_user_data(m_doca_ctx_rxq, {.ptr = this});
if (DOCA_IS_ERROR(err)) {
PRINT_DOCA_ERR(hwqrx_logerr, err, "doca_ctx_set_user_data ctx/hw_queue_rx: %p,%p",
m_doca_ctx_rxq, this);
return false;
}

err = doca_eth_rxq_set_type(m_doca_rxq.get(), DOCA_ETH_RXQ_TYPE_REGULAR);
if (DOCA_IS_ERROR(err)) {
PRINT_DOCA_ERR(hwqrx_logerr, err, "doca_eth_rxq_set_type");
return false;
}

err = doca_eth_rxq_task_recv_set_conf(m_doca_rxq.get(), task_completion_cb, rx_task_error_cb,
safe_mce_sys().cq_poll_batch_max);
if (DOCA_IS_ERROR(err)) {
PRINT_DOCA_ERR(hwqrx_logerr, err, "doca_eth_rxq_task_recv_set_conf rxq: %p max-tasks: %u",
m_doca_rxq.get(), safe_mce_sys().cq_poll_batch_max);
return false;
}

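// Connect the RXQ context to the CQ manager's progress engine; doca_pe_progress() in the
// RX poll path then drives this queue's completions.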
err = doca_pe_connect_ctx(pe, m_doca_ctx_rxq);
if (DOCA_IS_ERROR(err)) {
PRINT_DOCA_ERR(hwqrx_logerr, err, "doca_pe_connect_ctx pe/ctx: %p,%p", pe, m_doca_ctx_rxq);
return false;
}

dpcp::adapter *dpcp_adapter = m_p_ib_ctx_handler->get_dpcp_adapter();
if (!dpcp_adapter) {
hwqrx_logerr("Failed to get dpcp::adapter for prepare_rq");
