Skip to content

Commit

Permalink
Temporary solutions in order to work around last errors, new issue re…
Browse files Browse the repository at this point in the history
…lated to arguments to HitProcessingLoop
  • Loading branch information
JuanGonzalezCaminero committed Dec 16, 2024
1 parent c2df2ed commit 6f60851
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 88 deletions.
138 changes: 78 additions & 60 deletions include/AdePT/core/AsyncAdePTTransport.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@

///////////////////////////////////////////////////////////////////////////////////////////////////

// TODO: This include can't be here: it also pulls in the implementation, which can't be
// compiled with nvcc. All implementations of AsyncAdePTTransport members need to become
// free functions.
// #include <AdePT/core/AsyncAdePTTransport.hh>

///////////////////////////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////////////////////////

#include <AdePT/core/AsyncAdePTTransportStruct.cuh>
#include <AdePT/core/AsyncAdePTTransportStruct.hh>
Expand Down Expand Up @@ -436,10 +436,11 @@ void FlushScoring(AdePTScoring &scoring)
/// If successful, this returns a pointer to the newly initialised GPUstate.
/// If memory allocation fails, an exception is thrown. In this case, the caller has to
/// try again after some wait time or with fewer transport slots.
GPUstate *InitializeGPU(int trackCapacity, int scoringCapacity, int numThreads, TrackBuffer& trackBuffer)
GPUstate *InitializeGPU(int trackCapacity, int scoringCapacity, int numThreads, TrackBuffer &trackBuffer,
std::vector<AdePTScoring *> &scoring)
{
// auto gpuState_ptr = std::make_unique<GPUstate>();
auto gpuState_ptr = new GPUstate();
auto gpuState_ptr = new GPUstate();
GPUstate &gpuState = *gpuState_ptr;

// Allocate structures to manage tracks of an implicit type:
Expand Down Expand Up @@ -519,10 +520,10 @@ GPUstate *InitializeGPU(int trackCapacity, int scoringCapacity, int numThreads,
// init scoring structures
gpuMalloc(gpuState.fScoring_dev, numThreads);

fScoring->clear();
fScoring->reserve(numThreads);
scoring.clear();
scoring.reserve(numThreads);
for (unsigned int i = 0; i < numThreads; ++i) {
fScoring->emplace_back(gpuState.fScoring_dev + i);
scoring.emplace_back(gpuState.fScoring_dev + i);
}
gpuState.fHitScoring.reset(new HitScoring(scoringCapacity, numThreads));

Expand Down Expand Up @@ -577,8 +578,9 @@ void ReturnTracksToG4(TrackBuffer &trackBuffer, GPUstate &gpuState, std::vector<
#endif
}

void HitProcessingLoop(HitProcessingContext *const context, GPUstate *gpuState, std::vector<std::atomic<EventState>> &eventStates,
std::condition_variable& cvG4Workers)
void HitProcessingLoop(HitProcessingContext *const context, GPUstate *gpuState,
std::vector<std::atomic<EventState>> &eventStates,
std::condition_variable &cvG4Workers)
{
while (context->keepRunning) {
std::unique_lock lock(context->mutex);
Expand All @@ -594,15 +596,17 @@ void HitProcessingLoop(HitProcessingContext *const context, GPUstate *gpuState,
}
}

void TransportLoop(int trackCapacity, int scoringCapacity, int numThreads, TrackBuffer& trackBuffer, GPUstate *gpuStatePtr,
std::vector<std::atomic<EventState>> &eventStates, std::condition_variable& cvG4Workers, int adeptSeed)
void TransportLoop(int trackCapacity, int scoringCapacity, int numThreads, TrackBuffer &trackBuffer,
GPUstate *gpuStatePtr, std::vector<std::atomic<EventState>> &eventStates,
std::condition_variable &cvG4Workers, std::vector<AdePTScoring *> &scoring,
int adeptSeed)
{
// NVTXTracer tracer{"TransportLoop"};

// Initialise the transport engine:
do {
try {
gpuStatePtr = InitializeGPU(trackCapacity, scoringCapacity, numThreads, trackBuffer);
gpuStatePtr = InitializeGPU(trackCapacity, scoringCapacity, numThreads, trackBuffer, scoring);
} catch (std::invalid_argument &exc) {
// Clear error state:
auto result = cudaGetLastError();
Expand Down Expand Up @@ -643,7 +647,7 @@ void TransportLoop(int trackCapacity, int scoringCapacity, int numThreads, Track
};

std::unique_ptr<HitProcessingContext> hitProcessing{new HitProcessingContext{transferStream}};
std::thread hitProcessingThread{&HitProcessingLoop, hitProcessing.get(), gpuState, eventStates, cvG4Workers};
std::thread hitProcessingThread{&HitProcessingLoop, hitProcessing.get(), &gpuState, std::ref(eventStates), std::ref(cvG4Workers)};

auto computeThreadsAndBlocks = [](unsigned int nParticles) -> std::pair<unsigned int, unsigned int> {
constexpr int TransportThreads = 256;
Expand Down Expand Up @@ -682,30 +686,30 @@ void TransportLoop(int trackCapacity, int scoringCapacity, int numThreads, Track

COPCORE_CUDA_CHECK(cudaStreamSynchronize(gpuState.stream));

// #ifdef USE_NVTX
// std::map<AsyncAdePTTransport::EventState, std::string> stateMap{
// {EventState::NewTracksFromG4, "NewTracksFromG4"},
// {EventState::G4RequestsFlush, "G4RequestsFlush"},
// {EventState::Inject, "Inject"},
// {EventState::InjectionCompleted, "InjectionCompleted"},
// {EventState::Transporting, "Transporting"},
// {EventState::WaitingForTransportToFinish, "WaitingForTransportToFinish"},
// {EventState::RequestHitFlush, "RequestHitFlush"},
// {EventState::HitsFlushed, "HitsFlushed"},
// {EventState::FlushingTracks, "FlushingTracks"},
// {EventState::DeviceFlushed, "DeviceFlushed"},
// {EventState::LeakedTracksRetrieved, "LeakedTracksRetrieved"},
// {EventState::ScoringRetrieved, "ScoringRetrieved"}};
// #endif
// #ifdef USE_NVTX
// std::map<AsyncAdePTTransport::EventState, std::string> stateMap{
// {EventState::NewTracksFromG4, "NewTracksFromG4"},
// {EventState::G4RequestsFlush, "G4RequestsFlush"},
// {EventState::Inject, "Inject"},
// {EventState::InjectionCompleted, "InjectionCompleted"},
// {EventState::Transporting, "Transporting"},
// {EventState::WaitingForTransportToFinish, "WaitingForTransportToFinish"},
// {EventState::RequestHitFlush, "RequestHitFlush"},
// {EventState::HitsFlushed, "HitsFlushed"},
// {EventState::FlushingTracks, "FlushingTracks"},
// {EventState::DeviceFlushed, "DeviceFlushed"},
// {EventState::LeakedTracksRetrieved, "LeakedTracksRetrieved"},
// {EventState::ScoringRetrieved, "ScoringRetrieved"}};
// #endif

for (unsigned int iteration = 0;
inFlight > 0 || gpuState.injectState != InjectState::Idle || gpuState.extractState != ExtractState::Idle ||
std::any_of(eventStates.begin(), eventStates.end(), needTransport);
++iteration) {
// #ifdef USE_NVTX
// nvtx1.setTag(stateMap[eventStates[0].load()].data());
// nvtx2.setTag(stateMap[eventStates[1].load()].data());
// #endif
// #ifdef USE_NVTX
// nvtx1.setTag(stateMap[eventStates[0].load()].data());
// nvtx2.setTag(stateMap[eventStates[1].load()].data());
// #endif

// Swap the queues for the next iteration.
electrons.queues.SwapActive();
Expand Down Expand Up @@ -746,10 +750,10 @@ void TransportLoop(int trackCapacity, int scoringCapacity, int numThreads, Track

// copy buffer of tracks to device
// TODO: ResourceManager
COPCORE_CUDA_CHECK(cudaMemcpyAsync(trackBuffer.toDevice_dev, toDevice.tracks,
// COPCORE_CUDA_CHECK(cudaMemcpyAsync(trackBuffer.toDevice_dev.get(), toDevice.tracks,
nInject * sizeof(TrackDataWithIDs), cudaMemcpyHostToDevice,
transferStream));
COPCORE_CUDA_CHECK(
cudaMemcpyAsync(trackBuffer.toDevice_dev, toDevice.tracks,
// COPCORE_CUDA_CHECK(cudaMemcpyAsync(trackBuffer.toDevice_dev.get(), toDevice.tracks,
nInject * sizeof(TrackDataWithIDs), cudaMemcpyHostToDevice, transferStream));
// Mark end of copy operation:
COPCORE_CUDA_CHECK(cudaEventRecord(cudaEvent, transferStream));

Expand Down Expand Up @@ -873,13 +877,16 @@ void TransportLoop(int trackCapacity, int scoringCapacity, int numThreads, Track
constexpr unsigned int block_size = 128;
const unsigned int grid_size = (trackBuffer.fNumFromDevice + block_size - 1) / block_size;
// TODO: ResourceManager
FillFromDeviceBuffer<<<grid_size, block_size, 0, transferStream>>>(allLeaked, trackBuffer.fromDevice_dev,
// FillFromDeviceBuffer<<<grid_size, block_size, 0, transferStream>>>(allLeaked, trackBuffer.fromDevice_dev.get(),
trackBuffer.fNumFromDevice);
FillFromDeviceBuffer<<<grid_size, block_size, 0, transferStream>>>(
allLeaked, trackBuffer.fromDevice_dev,
// FillFromDeviceBuffer<<<grid_size, block_size, 0, transferStream>>>(allLeaked,
// trackBuffer.fromDevice_dev.get(),
trackBuffer.fNumFromDevice);
// TODO: ResourceManager
COPCORE_CUDA_CHECK(cudaMemcpyFromSymbolAsync(trackBuffer.nFromDevice_host, nFromDevice_dev,
// COPCORE_CUDA_CHECK(cudaMemcpyFromSymbolAsync(trackBuffer.nFromDevice_host.get(), nFromDevice_dev,
sizeof(unsigned int), 0, cudaMemcpyDeviceToHost, transferStream));
COPCORE_CUDA_CHECK(cudaMemcpyFromSymbolAsync(
trackBuffer.nFromDevice_host, nFromDevice_dev,
// COPCORE_CUDA_CHECK(cudaMemcpyFromSymbolAsync(trackBuffer.nFromDevice_host.get(), nFromDevice_dev,
sizeof(unsigned int), 0, cudaMemcpyDeviceToHost, transferStream));
COPCORE_CUDA_CHECK(cudaLaunchHostFunc(
transferStream,
[](void *arg) { (*static_cast<decltype(GPUstate::extractState) *>(arg)) = ExtractState::ReadyToCopy; },
Expand All @@ -896,14 +903,24 @@ void TransportLoop(int trackCapacity, int scoringCapacity, int numThreads, Track
if (gpuState.extractState == ExtractState::ReadyToCopy) {
gpuState.extractState = ExtractState::CopyToHost;
// TODO: ResourceManager
COPCORE_CUDA_CHECK(cudaMemcpyAsync(trackBuffer.fromDevice_host, trackBuffer.fromDevice_dev,
// COPCORE_CUDA_CHECK(cudaMemcpyAsync(trackBuffer.fromDevice_host.get(), trackBuffer.fromDevice_dev.get(),
(*trackBuffer.nFromDevice_host) * sizeof(TrackDataWithIDs),
cudaMemcpyDeviceToHost, transferStream));
COPCORE_CUDA_CHECK(cudaMemcpyAsync(
trackBuffer.fromDevice_host, trackBuffer.fromDevice_dev,
// COPCORE_CUDA_CHECK(cudaMemcpyAsync(trackBuffer.fromDevice_host.get(), trackBuffer.fromDevice_dev.get(),
(*trackBuffer.nFromDevice_host) * sizeof(TrackDataWithIDs), cudaMemcpyDeviceToHost, transferStream));


ReturnAux temp{&trackBuffer, &gpuState, &eventStates};


COPCORE_CUDA_CHECK(cudaLaunchHostFunc(
transferStream,
[&trackBuffer, &gpuState, &eventStates](void *unused) { ReturnTracksToG4(trackBuffer, gpuState, eventStates); },
nullptr));
transferStream,
[](void *temp) {
// static_cast<ReturnAux*>(temp);
ReturnTracksToG4(*static_cast<ReturnAux*>(temp)->trackBuffer,
*static_cast<ReturnAux*>(temp)->gpuState,
*static_cast<ReturnAux*>(temp)->eventStates);
},
&temp));
}

// -------------------------
Expand Down Expand Up @@ -1067,10 +1084,12 @@ std::shared_ptr<const std::vector<GPUHit>> GetGPUHits(unsigned int threadId, GPU
return gpuState->fHitScoring->GetNextHitsVector(threadId);
}

/// Launch the GPU worker thread, which runs TransportLoop().
///
/// All arguments are forwarded to TransportLoop() in the order of its parameter list.
/// std::thread decay-copies its arguments, so everything TransportLoop() takes by non-const
/// reference (trackBuffer, eventStates, cvG4Workers, scoring) is wrapped in std::ref. The
/// referenced objects must therefore outlive the returned thread.
///
/// NOTE(review): gpuStatePtr is handed over by value. TransportLoop() assigns the result of
/// InitializeGPU() to its own copy of the pointer, so the caller's pointer is not updated by
/// this call -- confirm how the caller is supposed to obtain the GPUstate created by the worker.
///
/// Returns the thread object that executes TransportLoop().
std::thread LaunchGPUWorker(int trackCapacity, int scoringCapacity, int numThreads, TrackBuffer &trackBuffer,
                            GPUstate *gpuStatePtr, std::vector<std::atomic<EventState>> &eventStates,
                            std::condition_variable &cvG4Workers, std::vector<AdePTScoring *> &scoring, int adeptSeed)
{
  // Pass the GPUstate pointer itself (not a copy of the pointee), include cvG4Workers, and use
  // std::ref for every by-reference parameter: std::vector<std::atomic<...>> and
  // std::condition_variable are not copyable, and copies would not be shared with the caller.
  return std::thread{&TransportLoop,
                     trackCapacity,
                     scoringCapacity,
                     numThreads,
                     std::ref(trackBuffer),
                     gpuStatePtr,
                     std::ref(eventStates),
                     std::ref(cvG4Workers),
                     std::ref(scoring),
                     adeptSeed};
}

void FreeGPU(GPUstate &gpuState, G4HepEmState &g4hepem_state, std::thread &gpuWorker)
Expand All @@ -1085,7 +1104,7 @@ void FreeGPU(GPUstate &gpuState, G4HepEmState &g4hepem_state, std::thread &gpuWo
// Free resources.
// TODO: Try to use ResourceManager for this pointer
// gpuState.reset();
cudaFree(gpuState);
cudaFree(&gpuState);

// TODO: GPUstate is no longer a unique_ptr inside AsyncAdePTTransport,
// check if there's any further cleanup required
Expand Down Expand Up @@ -1124,17 +1143,17 @@ TrackBuffer::TrackBuffer(unsigned int numToDevice, unsigned int numFromDevice, u
// Double buffer for lock-free host runs:
COPCORE_CUDA_CHECK(cudaMallocHost(&hostPtr, 2 * numToDevice * sizeof(TrackDataWithIDs)));
COPCORE_CUDA_CHECK(cudaMalloc(&devPtr, numToDevice * sizeof(TrackDataWithIDs)));
//TODO: Check whether we can use ResourceManager

// TODO: Check whether we can use ResourceManager
toDevice_host = hostPtr;
toDevice_dev = devPtr;
toDevice_dev = devPtr;
// toDevice_host.reset(hostPtr);
// toDevice_dev.reset(devPtr);

COPCORE_CUDA_CHECK(cudaMallocHost(&hostPtr, numFromDevice * sizeof(TrackDataWithIDs)));
COPCORE_CUDA_CHECK(cudaMalloc(&devPtr, numFromDevice * sizeof(TrackDataWithIDs)));
fromDevice_host = hostPtr;
fromDevice_dev = devPtr;
fromDevice_dev = devPtr;
// fromDevice_host.reset(hostPtr);
// fromDevice_dev.reset(devPtr);

Expand All @@ -1143,7 +1162,7 @@ TrackBuffer::TrackBuffer(unsigned int numToDevice, unsigned int numFromDevice, u
nFromDevice_host = nFromDevice;
// nFromDevice_host.reset(nFromDevice);

toDeviceBuffer[0].tracks = toDevice_host;
toDeviceBuffer[0].tracks = toDevice_host;
// toDeviceBuffer[0].tracks = toDevice_host.get();
toDeviceBuffer[0].maxTracks = numToDevice;
toDeviceBuffer[0].nTrack = 0;
Expand All @@ -1160,5 +1179,4 @@ TrackBuffer::TrackBuffer(unsigned int numToDevice, unsigned int numFromDevice, u
// FreeGPU();
// }


#endif // ASYNC_ADEPT_TRANSPORT_CUH
Loading

0 comments on commit 6f60851

Please sign in to comment.