Skip to content

Commit

Permalink
Merge pull request #1463 from EdwardSro/pr-pyverbs-tests
Browse files Browse the repository at this point in the history
tests: Extend Pyverbs and add more tests
  • Loading branch information
rleon authored May 30, 2024
2 parents 2b0f69b + c20aa53 commit 15b0142
Show file tree
Hide file tree
Showing 14 changed files with 176 additions and 42 deletions.
1 change: 1 addition & 0 deletions pyverbs/cq.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ cdef class CQEX(PyverbsCM):
cdef add_ref(self, obj)
cdef object qps
cdef object srqs
cdef object wqs

cdef class WC(PyverbsObject):
cdef v.ibv_wc wc
Expand Down
5 changes: 4 additions & 1 deletion pyverbs/cq.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ cdef class CQEX(PyverbsCM):
super().__init__()
self.qps = weakref.WeakSet()
self.srqs = weakref.WeakSet()
self.wqs = weakref.WeakSet()
if self.cq != NULL:
# Leave CQ initialization to the provider
return
Expand All @@ -336,6 +337,8 @@ cdef class CQEX(PyverbsCM):
self.qps.add(obj)
elif isinstance(obj, SRQ):
self.srqs.add(obj)
elif isinstance(obj, WQ):
self.wqs.add(obj)
else:
raise PyverbsError('Unrecognized object type')

Expand All @@ -346,7 +349,7 @@ cdef class CQEX(PyverbsCM):
if self.cq != NULL:
if self.logger:
self.logger.debug('Closing CQEx')
close_weakrefs([self.srqs, self.qps])
close_weakrefs([self.srqs, self.qps, self.wqs])
rc = v.ibv_destroy_cq(<v.ibv_cq*>self.cq)
if rc != 0:
raise PyverbsRDMAError('Failed to destroy CQEX', rc)
Expand Down
6 changes: 6 additions & 0 deletions pyverbs/device.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,12 @@ cdef class Context(PyverbsCM):
raise PyverbsRDMAError(f'Failed to query pkey {index} of port {port_num}')
return pkey

def get_pkey_index(self, unsigned int port_num, int pkey):
idx = v.ibv_get_pkey_index(self.context, port_num, pkey)
if idx == -1:
raise PyverbsRDMAError(f'Failed to get pkey index of pkey = {pkey} of port {port_num}')
return idx

def query_gid(self, unsigned int port_num, int index):
gid = GID()
rc = v.ibv_query_gid(self.context, port_num, index, &gid.gid)
Expand Down
1 change: 1 addition & 0 deletions pyverbs/libibverbs.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,7 @@ cdef extern from 'infiniband/verbs.h':
int index, ibv_gid *gid)
int ibv_query_pkey(ibv_context *context, unsigned int port_num,
int index, uint16_t *pkey)
int ibv_get_pkey_index(ibv_context *context, unsigned int port_num, uint16_t pkey)
ibv_pd *ibv_alloc_pd(ibv_context *context)
int ibv_dealloc_pd(ibv_pd *pd)
ibv_mr *ibv_reg_mr(ibv_pd *pd, void *addr, size_t length, int access)
Expand Down
2 changes: 2 additions & 0 deletions pyverbs/providers/mlx5/mlx5dv.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ cdef class Mlx5UAR(PyverbsObject):

cdef class Mlx5DmOpAddr(PyverbsCM):
cdef void *addr
@staticmethod
cdef void _cpy(void *dst, void *src, int length)

cdef class WqeSeg(PyverbsCM):
cdef void *segment
Expand Down
40 changes: 31 additions & 9 deletions pyverbs/providers/mlx5/mlx5dv.pyx
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# SPDX-License-Identifier: (GPL-2.0 OR Linux-OpenIB)
# Copyright (c) 2019 Mellanox Technologies, Inc. All rights reserved. See COPYING file

from libc.stdint cimport uintptr_t, uint8_t, uint16_t, uint32_t
from libc.stdint cimport uintptr_t, uint8_t, uint16_t, uint32_t, uint64_t
from libc.string cimport memcpy, memset
from libc.stdlib cimport calloc, free
from posix.mman cimport munmap
Expand Down Expand Up @@ -1427,24 +1427,46 @@ cdef class Mlx5DmOpAddr(PyverbsCM):
def unmap(self, length):
munmap(self.addr, length)

@staticmethod
cdef void _cpy(void *dst, void *src, int length):
"""
Copy data (bytes) from src to dst. To ensure atomicity, copy in a single
write operation.
:param dst: The address to copy from.
:param src: The address to copy to.
:param length: Length in bytes. (supports: power of two. up to 8 bytes)
"""
if length == 1:
(<uint8_t *> dst)[0] = (<uint8_t *> src)[0]
elif length == 2:
(<uint16_t *> dst)[0] = (<uint16_t *> src)[0]
elif length == 4:
(<uint32_t *> dst)[0] = (<uint32_t *> src)[0]
elif length == 8:
(<uint64_t *> dst)[0] = (<uint64_t *> src)[0]
elif length == 16:
raise PyverbsUserError('Currently PyVerbs does not support 16 bytes Memic Atomic operations')
else:
raise PyverbsUserError(f'Memic Atomic operations do not support with length: {length}')

def write(self, data):
"""
Writes data (bytes) to the DM operation address using memcpy.
Writes data (bytes) to the DM operation address.
:param data: Bytes of data
"""
memcpy(<char *>self.addr, <char *>data, len(data))
length = len(data)
Mlx5DmOpAddr._cpy(<void *> self.addr, <void *><char *> data, length)


def read(self, length):
"""
Reads 'length' bytes from the DM operation address using memcpy.
Reads 'length' bytes from the DM operation address.
:param length: Data length to read (in bytes)
:return: Read data in bytes
"""
cdef char *data = <char*> calloc(length, sizeof(char))
if data == NULL:
raise PyverbsError('Failed to allocate memory')
memcpy(<char *>data, <char *>self.addr, length)
res = data[:length]
cdef void *data = calloc(length, sizeof(char))
Mlx5DmOpAddr._cpy(data, <void *>self.addr, length)
res = (<char *> data)[:length]
free(data)
return res

Expand Down
4 changes: 2 additions & 2 deletions pyverbs/wq.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ from pyverbs.pd cimport PD
cdef class WQInitAttr(PyverbsObject):
cdef v.ibv_wq_init_attr attr
cdef PD pd
cdef CQ cq
cdef object cq

cdef class WQAttr(PyverbsObject):
cdef v.ibv_wq_attr attr
Expand All @@ -22,7 +22,7 @@ cdef class WQ(PyverbsCM):
cdef v.ibv_wq *wq
cdef Context context
cdef PD pd
cdef CQ cq
cdef object cq
cdef object rwq_ind_tables
cpdef add_ref(self, obj)

Expand Down
29 changes: 20 additions & 9 deletions pyverbs/wq.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,19 @@ from pyverbs.base cimport close_weakrefs
cimport pyverbs.libibverbs_enums as e
from pyverbs.device cimport Context
from pyverbs.wr cimport RecvWR
from pyverbs.cq cimport CQ
from pyverbs.cq cimport CQ, CQEX
from pyverbs.pd cimport PD
from pyverbs.qp cimport QP


cdef class WQInitAttr(PyverbsObject):
def __init__(self, wq_context=None, PD wq_pd=None, CQ wq_cq=None, wq_type=e.IBV_WQT_RQ,
def __init__(self, wq_context=None, PD wq_pd=None, wq_cq=None, wq_type=e.IBV_WQT_RQ,
max_wr=100, max_sge=1, comp_mask=0, create_flags=0):
"""
Initializes a WqInitAttr object representing ibv_wq_init_attr struct.
:param wq_context: Associated WQ context
:param wq_pd: PD to be associated with the WQ
:param wq_cq: CQ to be associated with the WQ
:param wq_cq: CQ or CQEX to be associated with the WQ
:param wp_type: The desired WQ type
:param max_wr: Requested max number of outstanding WRs in the WQ
:param max_sge: Requested max number of scatter/gather (s/g) elements per WR in the WQ
Expand All @@ -40,7 +40,13 @@ cdef class WQInitAttr(PyverbsObject):
self.pd = wq_pd
self.attr.pd = wq_pd.pd if wq_pd else NULL
self.cq = wq_cq
self.attr.cq = wq_cq.cq if wq_cq else NULL
if wq_cq:
if isinstance(wq_cq, CQ):
self.attr.cq = (<CQ>wq_cq).cq
else:
self.attr.cq = (<CQEX>wq_cq).ibv_cq
else:
self.attr.cq = NULL
self.attr.comp_mask = comp_mask
self.attr.create_flags = create_flags

Expand All @@ -63,9 +69,12 @@ cdef class WQInitAttr(PyverbsObject):
def cq(self):
return self.cq
@cq.setter
def cq(self, CQ val):
def cq(self, val):
self.cq = val
self.attr.cq = <v.ibv_cq*>val.cq
if isinstance(val, CQ):
self.attr.cq = (<CQ>val).cq
else:
self.attr.cq = (<CQEX>val).ibv_cq


cdef class WQAttr(PyverbsObject):
Expand Down Expand Up @@ -140,9 +149,11 @@ cdef class WQ(PyverbsCM):
pd = <PD>attr.pd
pd.add_ref(self)
self.pd = pd
cq = <CQ>attr.cq
cq.add_ref(self)
self.cq = cq
if isinstance(attr.cq, CQ):
(<CQ>attr.cq).add_ref(self)
elif isinstance(attr.cq, CQEX):
(<CQEX>attr.cq).add_ref(self)
self.cq = attr.cq
self.rwq_ind_tables = weakref.WeakSet()

cpdef add_ref(self, obj):
Expand Down
12 changes: 12 additions & 0 deletions tests/test_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,18 @@ def test_query_pkey(self):
if dev.node_type == e.IBV_NODE_CA:
ctx.query_pkey(port_num=self.ib_port, index=0)

def test_get_pkey_index(self):
"""
Test ibv_get_pkey_index()
"""
source_pkey_index = 0
with d.Context(name=self.dev_name) as ctx:
pkey = u.get_pkey_from_kernel(device=self.dev_name, port=self.ib_port,
index=source_pkey_index)
queried_pkey_idx = ctx.get_pkey_index(port_num=self.ib_port, pkey=pkey)
self.assertEqual(queried_pkey_idx, source_pkey_index,
f'Got index={queried_pkey_idx}\nExpected index={source_pkey_index}')

def test_query_gid(self):
"""
Test ibv_query_gid()
Expand Down
10 changes: 5 additions & 5 deletions tests/test_mlx5_dm_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,12 @@ def test_dm_atomic_ops(self):
raise ex
inc_addr.write(b'\x01')
inc_addr.write(b'\x01')
# Now we should read 0x02 and the memory set to ffs
# Now we should read 0x02 and the memory set to 0x1
val = int.from_bytes(test_and_set_addr.read(1), 'big')
self.assertEqual(val, 2)
# Verify that TEST_AND_SET set the memory to ffs
# Verify that TEST_AND_SET set the memory to 0x1
val = int.from_bytes(test_and_set_addr.read(1), 'big')
self.assertEqual(val, 255)
self.assertEqual(val, 1)
inc_addr.unmap(self.dm_size)
test_and_set_addr.unmap(self.dm_size)

Expand Down Expand Up @@ -146,5 +146,5 @@ def test_parallel_dm_atomic_ops(self):
raise self.skip_queue.get()

val = int.from_bytes(self._read_from_op_addr(), 'big')
self.assertEqual(val, num_threads - 1,
f'Read value is ({val}) is different than expected ({num_threads-1})' )
self.assertEqual(val, num_threads + 1,
f'Read value is ({val}) is different than expected ({num_threads+1})' )
4 changes: 1 addition & 3 deletions tests/test_mlx5_mkey.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,7 @@ def reg_mr_sig_t10dif(self):
player.qp.wr_set_mkey_access_flags(e.IBV_ACCESS_LOCAL_WRITE)
player.qp.wr_set_mkey_layout_list([sge])

t10dif_flags = (dve.MLX5DV_SIG_T10DIF_FLAG_REF_REMAP |
dve.MLX5DV_SIG_T10DIF_FLAG_APP_ESCAPE |
dve.MLX5DV_SIG_T10DIF_FLAG_APP_REF_ESCAPE)
t10dif_flags = dve.MLX5DV_SIG_T10DIF_FLAG_REF_REMAP
sig_t10dif = Mlx5SigT10Dif(bg_type=dve.MLX5DV_SIG_T10DIF_CRC,
bg=0xFFFF, app_tag=0xABCD,
ref_tag=0x01234567, flags=t10dif_flags)
Expand Down
27 changes: 23 additions & 4 deletions tests/test_mr.py
Original file line number Diff line number Diff line change
Expand Up @@ -629,25 +629,27 @@ def test_dmabuf_rdma_traffic(self):


class DeviceMemoryRes(RCResources):
def __init__(self, dev_name, ib_port, gid_index, remote_access=False):
def __init__(self, dev_name, ib_port, gid_index, remote_access=False, msg_size=1024):
"""
Initialize DM resources based on RC resources that include RC
QP.
:param dev_name: Device name to be used.
:param ib_port: IB port of the device to use.
:param gid_index: Which GID index to use.
:param remote_access: If True, enable remote access.
:param msg_size: Message size (default: 1024).
"""
self.remote_access = remote_access
super().__init__(dev_name=dev_name, ib_port=ib_port,
gid_index=gid_index)
gid_index=gid_index, msg_size=msg_size)

def create_mr(self):
try:
self.dm = d.DM(self.ctx, d.AllocDmAttr(length=self.msg_size))
access = e.IBV_ACCESS_ZERO_BASED | e.IBV_ACCESS_LOCAL_WRITE
if self.remote_access:
access |= e.IBV_ACCESS_REMOTE_WRITE
access |= e.IBV_ACCESS_REMOTE_WRITE | e.IBV_ACCESS_REMOTE_READ | \
e.IBV_ACCESS_REMOTE_ATOMIC
self.mr = DMMR(self.pd, self.msg_size, access, self.dm, 0)
except PyverbsRDMAError as ex:
if ex.error_code == errno.EOPNOTSUPP:
Expand All @@ -658,7 +660,8 @@ def create_qp_attr(self):
qp_attr = QPAttr(port_num=self.ib_port)
qp_attr.qp_access_flags = e.IBV_ACCESS_LOCAL_WRITE
if self.remote_access:
qp_attr.qp_access_flags |= e.IBV_ACCESS_REMOTE_WRITE
qp_attr.qp_access_flags |= e.IBV_ACCESS_REMOTE_WRITE | e.IBV_ACCESS_REMOTE_READ | \
e.IBV_ACCESS_REMOTE_ATOMIC
return qp_attr


Expand Down Expand Up @@ -687,3 +690,19 @@ def test_dm_traffic(self):
def test_dm_remote_traffic(self):
self.create_players(DeviceMemoryRes, remote_access=True)
u.rdma_traffic(**self.traffic_args, send_op=e.IBV_WR_RDMA_WRITE)

def test_dm_remote_write_traffic_imm(self):
self.create_players(DeviceMemoryRes, remote_access=True)
u.traffic(**self.traffic_args, send_op=e.IBV_WR_RDMA_WRITE_WITH_IMM)

def test_dm_remote_read_traffic(self):
self.create_players(DeviceMemoryRes, remote_access=True)
u.rdma_traffic(**self.traffic_args, send_op=e.IBV_WR_RDMA_READ)

def test_dm_atomic_fetch_add(self):
self.create_players(DeviceMemoryRes, remote_access=True, msg_size=8)
u.atomic_traffic(**self.traffic_args, send_op=e.IBV_WR_ATOMIC_FETCH_AND_ADD)

def test_dm_atomic_cmp_swp(self):
self.create_players(DeviceMemoryRes, remote_access=True, msg_size=8)
u.atomic_traffic(**self.traffic_args, send_op=e.IBV_WR_ATOMIC_CMP_AND_SWP)
20 changes: 18 additions & 2 deletions tests/test_odp.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from pyverbs.mem_alloc import mmap, munmap, madvise, MAP_ANONYMOUS_, MAP_PRIVATE_, \
MAP_HUGETLB_
from tests.base import RCResources, UDResources, XRCResources
from pyverbs.qp import QPCap, QPAttr, QPInitAttr
from pyverbs.wr import SGE, SendWR, RecvWR
from pyverbs.qp import QPAttr, QPInitAttr
from tests.base import RDMATestCase
from pyverbs.mr import MR
import pyverbs.enums as e
Expand Down Expand Up @@ -32,7 +32,8 @@ class OdpRC(RCResources):
def __init__(self, dev_name, ib_port, gid_index, is_huge=False,
request_user_addr=False, use_mr_prefetch=None, is_implicit=False,
prefetch_advice=e._IBV_ADVISE_MR_ADVICE_PREFETCH_WRITE,
msg_size=1024, odp_caps=e.IBV_ODP_SUPPORT_SEND | e.IBV_ODP_SUPPORT_RECV):
msg_size=1024, odp_caps=e.IBV_ODP_SUPPORT_SEND | e.IBV_ODP_SUPPORT_RECV,
use_mixed_mr=False):
"""
Initialize an OdpRC object.
:param dev_name: Device name to be used
Expand All @@ -47,6 +48,7 @@ def __init__(self, dev_name, ib_port, gid_index, is_huge=False,
:param is_implicit: If True, register implicit MR.
:param prefetch_advice: The advice of the prefetch request (ignored
if use_mr_prefetch is None).
:param use_mixed_mr: If True, create a non-ODP MR in addition to the ODP MR.
"""
self.is_huge = is_huge
self.request_user_addr = request_user_addr
Expand All @@ -56,6 +58,8 @@ def __init__(self, dev_name, ib_port, gid_index, is_huge=False,
e.IBV_ACCESS_REMOTE_ATOMIC | e.IBV_ACCESS_REMOTE_READ | \
e.IBV_ACCESS_REMOTE_WRITE
self.user_addr = None
self.use_mixed_mr = use_mixed_mr
self.non_odp_mr = None
super(OdpRC, self).__init__(dev_name=dev_name, ib_port=ib_port,
gid_index=gid_index)
self.use_mr_prefetch = use_mr_prefetch
Expand All @@ -77,6 +81,8 @@ def create_mr(self):
access |= e.IBV_ACCESS_HUGETLB
self.mr = MR(self.pd, self.msg_size, access, address=self.user_addr,
implicit=self.is_implicit)
if self.use_mixed_mr:
self.non_odp_mr = MR(self.pd, self.msg_size, e.IBV_ACCESS_LOCAL_WRITE)

def create_qp_init_attr(self):
return QPInitAttr(qp_type=e.IBV_QPT_RC, scq=self.cq, sq_sig_all=0,
Expand All @@ -87,6 +93,11 @@ def create_qp_attr(self):
qp_attr.qp_access_flags = self.access
return qp_attr

def create_qp_cap(self):
if self.use_mixed_mr:
return QPCap(max_recv_wr=self.num_msgs, max_send_sge=2, max_recv_sge=2)
return super().create_qp_cap()


class OdpXRC(XRCResources):
def __init__(self, request_user_addr=False, **kwargs):
Expand Down Expand Up @@ -134,6 +145,11 @@ def test_odp_rc_traffic(self):
self.create_players(OdpRC, request_user_addr=self.force_page_faults)
u.traffic(**self.traffic_args)

def test_odp_rc_mixed_mr(self):
self.create_players(OdpRC, request_user_addr=self.force_page_faults,
use_mixed_mr=True)
u.traffic(**self.traffic_args)

def test_odp_rc_atomic_cmp_and_swp(self):
self.force_page_faults = False
self.create_players(OdpRC, request_user_addr=self.force_page_faults,
Expand Down
Loading

0 comments on commit 15b0142

Please sign in to comment.