Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

P2p debug #1438

Draft
wants to merge 11 commits into
base: develop
Choose a base branch
from
6 changes: 2 additions & 4 deletions src/device/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@
__trace_hwreg()\
if (ncclShmem.work.header.type == ncclWorkTypeP2p) { \
struct ncclWorkElemP2p *p2pElems = ncclShmem.work.p2pElems; \
collTrace->p2p[0].connIndex = 0; \
collTrace->p2p[0].connIndex = p2pElems[0].connIndex; \
collTrace->p2pOpCount[0] = p2pElems[0].opCount; \
collTrace->p2p[0].ngroups = p2pElems[0].ngroups; \
collTrace->p2p[0].nWarps = p2pElems[0].nWarps; \
collTrace->p2p[0].warpStart = p2pElems[0].warpStart; \
collTrace->p2p[0].peer = p2pElems[0].p2pType == ncclWorkP2pTypeRecv ? (uint16_t)(p2pElems[0].peer) : -1; \
collTrace->p2p[1].connIndex = 0; \
collTrace->p2p[1].connIndex = p2pElems[1].connIndex; \
collTrace->p2pOpCount[1] = p2pElems[1].opCount; \
collTrace->p2p[1].ngroups = p2pElems[1].ngroups; \
collTrace->p2p[1].nWarps = p2pElems[1].nWarps; \
Expand All @@ -73,8 +73,6 @@
struct ncclWorkElem *elems = ncclShmem.work.elems; \
collTrace->opCount = elems[0].opCount; \
collTrace->coll.nWarps = elems[0].nWarps; \
collTrace->coll.bid = elems[0].bid; \
collTrace->coll.nChannels = elems[0].nChannels; \
collTrace->type = (launch_type) | ncclCollTraceCollElemType; \
} \
}
Expand Down
3 changes: 1 addition & 2 deletions src/device/op128.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,7 @@ template<> __device__ __forceinline__ void st_global<0>(uintptr_t addr, BytePack
} \
template<> \
__device__ __forceinline__ void st_##space<bytes>(addr_cxx_ty addr, BytePack<bytes> value) { \
data_cxx_ty tmp = value.native; \
*((data_cxx_ty *)addr) = tmp; \
__builtin_nontemporal_store(value.native, (data_cxx_ty *)addr); \
}

// #if __CUDA_ARCH__ >= 700
Expand Down
7 changes: 6 additions & 1 deletion src/device/prims_simple.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,16 @@ class Primitives<
if (((flags & (Recv*RoleWaitRecv)) && !noRecvWait) ||
((flags & (Send*RoleWaitSend)) && !noSendWait)) {
int spins = 0;
static int repeat = 100;
while (connStepCache + (isSendNotRecv ? NCCL_STEPS : 0) < step + StepPerSlice) {
__builtin_amdgcn_s_sleep(1);
connStepCache = loadStepValue(connStepPtr);
if (checkAbort(spins)) break;
//if (spins == 0) printf("r=%d b=%d t=%d SPUN OUT got=%d want=%d\n", ncclShmem.comm.rank, blockIdx.x, threadIdx.x, int(connStepCache + (isSendNotRecv ? NCCL_STEPS : 0)), int(step+StepPerSlice));
if (spins == 0) traceData(__LINE__, threadIdx.x, int(connStepCache + (isSendNotRecv ? NCCL_STEPS : 0)), int(step+StepPerSlice));
if (spins == 0 && repeat > 0) {
repeat --;
traceData(__LINE__, threadIdx.x, int(connStepCache + (isSendNotRecv ? NCCL_STEPS : 0)), int(step+StepPerSlice));
}
}
__asm__ __volatile__("s_wakeup");
}
Expand Down Expand Up @@ -178,6 +182,7 @@ class Primitives<
if (flags & (Recv*RolePostRecv | Send*RolePostSend)) {
step += StepPerSlice;
STORE(connStepPtr, step);
traceData(__LINE__, threadIdx.x, (uint64_t)connStepPtr, step);
}
}

Expand Down
6 changes: 6 additions & 0 deletions src/device/sendrecv.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,9 @@ struct RunWork<ncclFuncSendRecv, T, RedOp, NCCL_ALGO_RING, NCCL_PROTO_SIMPLE> {
runRecv<ProtoSimple<1,1,8>>(tid, nthreads, group, args);
#elif defined(__gfx908__) || defined(__gfx940__) || defined(__gfx941__) || defined(__gfx942__)
runRecv<ProtoSimple<1,1,4>>(tid, nthreads, group, args);
if (tid%WARP_SIZE == 0) traceData(__LINE__, threadIdx.x,
reinterpret_cast<uint64_t>(uint64_t(args->buffHi32)<<32 | args->buffLo32),
reinterpret_cast<uint64_t>(uint64_t(args->countHi32)<<32 | args->countLo32));
#else
runRecv<ProtoSimple<1,1>>(tid, nthreads, group, args);
#endif
Expand All @@ -222,6 +225,9 @@ struct RunWork<ncclFuncSendRecv, T, RedOp, NCCL_ALGO_RING, NCCL_PROTO_SIMPLE> {
runSend<ProtoSimple<1,1,8>>(tid, nthreads, group, args);
#elif defined(__gfx908__) || defined(__gfx940__) || defined(__gfx941__) || defined(__gfx942__)
runSend<ProtoSimple<1,1,4>>(tid, nthreads, group, args);
if (tid%WARP_SIZE == 0) traceData(__LINE__, threadIdx.x,
reinterpret_cast<uint64_t>(uint64_t(args->buffHi32)<<32 | args->buffLo32),
reinterpret_cast<uint64_t>(uint64_t(args->countHi32)<<32 | args->countLo32));
#else
runSend<ProtoSimple<1,1>>(tid, nthreads, group, args);
#endif
Expand Down
2 changes: 0 additions & 2 deletions src/include/device.h
Original file line number Diff line number Diff line change
Expand Up @@ -387,8 +387,6 @@ struct ncclCollTrace {
uint64_t data_1;
struct {
uint8_t nWarps;
uint8_t bid;
uint8_t nChannels;
} coll;
struct {
int16_t peer;
Expand Down
6 changes: 3 additions & 3 deletions src/init.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ static constexpr int64_t defaultEnableMscclpp = 0;
RCCL_PARAM(MscclppEnabled, "MSCCLPP_ENABLE", defaultEnableMscclpp);

// GDRCOPY support: Off by default
NCCL_PARAM(GdrCopyEnable, "GDRCOPY_ENABLE", 0);
NCCL_PARAM(GdrCopyEnable, "GDRCOPY_ENABLE", 1);

// GDRCOPY support
gdr_t ncclGdrCopy = NULL;
Expand Down Expand Up @@ -260,7 +260,7 @@ void *ncclCommThreadMain(void *arg) {
sprintf(line, "## [%012.6f] [%02d:%02d] %06lx", (double)(td->timeStamp)/vega_gpu_rtc_freq, comm->rank, td->bid, td->opCount);
offset = strlen(line);
if (type == ncclCollTraceCollElemType) {
sprintf(line+offset, " CE %s nw %d bi %d nc %d busId %lx nRanks %d", funcNames[fIdx], td->coll.nWarps, td->coll.bid, td->coll.nChannels, comm->busId, comm->nRanks);
sprintf(line+offset, " CE %s nw %d busId %lx nRanks %d", funcNames[fIdx], td->coll.nWarps, comm->busId, comm->nRanks);
} else if (type == ncclCollTraceP2pElemType) {
sprintf(line+offset, " PE %s %d -> %d/%d/%d/%d conn/nw/ws/ng %d/%d/%d/%d -> %d busId %lx nRanks %d", funcNames[fIdx],
td->p2p[0].peer, td->p2p[0].connIndex, td->p2p[0].nWarps, td->p2p[0].warpStart, td->p2p[0].ngroups,
Expand All @@ -275,7 +275,7 @@ void *ncclCommThreadMain(void *arg) {
sprintf(line+offset, " CL %s", funcNames[fIdx]);
offset = strlen(line);
if ((type&0xf0) == ncclCollTraceCollElemType)
sprintf(line+offset, " nw %d bi %d nc %d busId %lx nRanks %d", td->coll.nWarps, td->coll.bid, td->coll.nChannels, comm->busId, comm->nRanks);
sprintf(line+offset, " nw %d busId %lx nRanks %d", td->coll.nWarps, comm->busId, comm->nRanks);
else if ((type&0xf0) == ncclCollTraceP2pElemType)
sprintf(line+offset, " %d -> %d/%d/%d/%d conn/nw/ws/ng %d/%d/%d/%d -> %d busId %lx nRanks %d",
td->p2p[0].peer, td->p2p[0].connIndex, td->p2p[0].nWarps, td->p2p[0].warpStart, td->p2p[0].ngroups,
Expand Down
31 changes: 29 additions & 2 deletions src/transport/net.cc
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,11 @@ static ncclResult_t recvSetup(struct ncclComm* comm, struct ncclTopoGraph* graph
NCCLCHECK(ncclTopoCheckGdr(comm->topo, myInfo->busId, netId, 0, &req.useGdr));

// Determine whether we need to flush the GDR buffer on recv or not
if (req.useGdr) NCCLCHECK(ncclTopoNeedFlush(comm->topo, myInfo->busId, &req.needFlush));
if (req.useGdr) {
NCCLCHECK(ncclTopoNeedFlush(comm->topo, myInfo->busId, &req.needFlush));
CUDACHECK(hipDeviceGetAttribute((int*)&req.curr_hdp_reg, hipDeviceAttributeHdpMemFlushCntl, myInfo->cudaDev));
recv->conn.curr_hdp_reg = req.curr_hdp_reg;
}

// We don't support PXN on receive yet
tpProxyRank = comm->topParentRanks[myInfo->rank];
Expand Down Expand Up @@ -654,6 +658,7 @@ static ncclResult_t recvProxySetup(struct ncclProxyConnection* connection, struc
resources->needFlush = req->needFlush;
resources->channelId = req->channelId;
resources->connIndex = req->connIndex;
resources->curr_hdp_reg = req->curr_hdp_reg;
ncclNetProperties_t props;
NCCLCHECK(proxyState->ncclNet->getProperties(req->netDev, &props));
/* DMA-BUF support */
Expand Down Expand Up @@ -1349,6 +1354,9 @@ static ncclResult_t sendProxyProgress(struct ncclProxyState* proxyState, struct
return ncclSuccess;
}

RCCL_PARAM(NetHdpFlush, "NET_HDP_FLUSH", 1);
RCCL_PARAM(NetGdrFlush, "NET_GDR_FLUSH", 1);

static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct ncclProxyArgs* args) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT)
g_npkit_net_poll_cnt++;
Expand Down Expand Up @@ -1538,16 +1546,31 @@ static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct
if (totalSize > 0 && p == NCCL_PROTO_SIMPLE && needFlush) {
// GDRCOPY support
struct recvNetResources* resources = (struct recvNetResources*) (subGroup->connection->transportResources);
if (rcclParamNetHdpFlush() && resources->curr_hdp_reg) {
static bool once = true;
__atomic_store_n(resources->curr_hdp_reg, 0x1, __ATOMIC_RELAXED);
int val = __atomic_load_n(resources->curr_hdp_reg, __ATOMIC_ACQUIRE);
if (once) {
once = false;
INFO(NCCL_INIT, "%s: flushed HDP %p val %d", __func__, resources->curr_hdp_reg, val);
}
}
if (resources->gdcFlush) {
#if defined (__x86_64__)
// Force a PCI-E read from GPU memory
static bool once = true;
asm volatile ("mov (%0), %%eax" :: "l"(resources->gdcFlush) : "%eax");
if (once) {
once = false;
INFO(NCCL_INIT, "%s: issued GDC flush", __func__);
}
#else
WARN("NET: GDR Flush only supported on x86_64");
return ncclInternalError;
#endif
} else {
} else if (rcclParamNetGdrFlush()) {
int subCount = 0;
static bool once = true;
for (int i=0; i<subGroup->groupSize; i++) {
struct ncclProxySubArgs* sub = subGroup + i;
if (step < sub->nsteps) {
Expand All @@ -1564,6 +1587,10 @@ static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct
}
struct recvNetResources* resources = (struct recvNetResources*) (subGroup->connection->transportResources);
NCCLCHECK(proxyState->ncclNet->iflush(resources->netRecvComm, subCount, ptrs, sizes, mhandles, subGroup->requests+(step%NCCL_STEPS)));
if (once) {
once = false;
INFO(NCCL_INIT, "%s: issued GDR flush", __func__);
}
}
}
args->idle = 0;
Expand Down