Skip to content

Commit

Permalink
ATL fix for verbs provider and some minor fixes
Browse files: browse the repository at this point in the history
  • Loading branch information
rajakrishi committed Oct 21, 2021
1 parent 42fc582 commit 90e0a46
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 51 deletions.
2 changes: 2 additions & 0 deletions src/atl_counters.tbl
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
ATL_COUNTER(fam_get_atomic)
ATL_COUNTER(fam_put_atomic)
ATL_COUNTER(fam_scatter_atomic)
ATL_COUNTER(fam_gather_atomic)
61 changes: 31 additions & 30 deletions src/fam_atl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ namespace openfam {
#define RETURN_WITH_FAM_EXCEPTION \
} \
catch (Fam_Exception & e) { \
throw e; \
throw e; \
}

class ATLib::ATLimpl_ {
Expand Down Expand Up @@ -501,8 +501,8 @@ int atl_finalize() {

if (serverAddrName) free(serverAddrName);

if (defaultCtx != NULL)
delete defaultCtx;
//if (defaultCtx != NULL)
//delete defaultCtx;
return 0;
}

Expand Down Expand Up @@ -599,10 +599,11 @@ int fam_get_atomic(void *local, Fam_Descriptor *descriptor,

uint64_t nodeId = descriptor->get_memserver_id();
Fam_Context *ATLCtx = get_defaultCtx(nodeId);
//uint64_t ATLBaseAddr = (uint64_t)descriptor->get_base_address();
fi_context *ctx = fabric_post_response_buff(&retStatus,(*fiAddrs)[nodeId], ATLCtx,sizeof(retStatus));
ret = famCIS->get_atomic(globalDescriptor.regionId & REGIONID_MASK,
globalDescriptor.offset, offset, nbytes,
key, get_selfAddr(nodeId), get_selfAddrLen(nodeId),
key, (uint64_t)local, get_selfAddr(nodeId), get_selfAddrLen(nodeId),
nodeId, uid, gid);

if (ret == 0) {
Expand Down Expand Up @@ -664,7 +665,7 @@ int fam_put_atomic(void *local, Fam_Descriptor *descriptor,

ret = famCIS->put_atomic(globalDescriptor.regionId & REGIONID_MASK,
globalDescriptor.offset, offset, nbytes,
key, get_selfAddr(nodeId),get_selfAddrLen(nodeId),
key, (uint64_t) local, get_selfAddr(nodeId),get_selfAddrLen(nodeId),
(const char *)local, nodeId, uid, gid);

if ((ret == 0) && (nbytes > MAX_DATA_IN_MSG)) {
Expand All @@ -691,16 +692,16 @@ int fam_scatter_atomic(void *local, Fam_Descriptor *descriptor,
int32_t retStatus = -1;
fi_context *ctx = NULL;
Fam_Global_Descriptor globalDescriptor;
// FAM_CNTR_INC_API(fam_put_atomic);
// FAM_PROFILE_START_ALLOCATOR(fam_put_atomic);
ATL_CNTR_INC_API(fam_scatter_atomic);
ATL_PROFILE_START_ALLOCATOR(fam_scatter_atomic);
if ((local == NULL) || (descriptor == NULL) || (nElements == 0)) {
message << "Invalid Options";
THROW_ATL_ERR_MSG(ATL_Exception, message.str().c_str());
}

ret = validate_item(descriptor);
// FAM_PROFILE_END_ALLOCATOR(fam_put_atomic);
// FAM_PROFILE_START_OPS(fam_put_atomic);
ATL_PROFILE_END_ALLOCATOR(fam_scatter_atomic);
ATL_PROFILE_START_OPS(fam_scatter_atomic);
if (ret == 0) {
// Read data from FAM region with this key
globalDescriptor = descriptor->get_global_descriptor();
Expand All @@ -722,16 +723,16 @@ int fam_scatter_atomic(void *local, Fam_Descriptor *descriptor,

ret = famCIS->scatter_strided_atomic(
globalDescriptor.regionId & REGIONID_MASK, globalDescriptor.offset,
nElements, firstElement, stride, elementSize, key, get_selfAddr(nodeId),
nElements, firstElement, stride, elementSize, key, (uint64_t) local, get_selfAddr(nodeId),
get_selfAddrLen(nodeId), nodeId, uid, gid);

if (ret == 0) {
fabric_completion_wait(ATLCtx, ctx, 1);
ret = retStatus;
fabric_deregister_mr(mr);
}
// FAM_PROFILE_END_OPS(fam_put_atomic);
} // validate_item
ATL_PROFILE_END_OPS(fam_scatter_atomic);
}
return ret;
}

Expand All @@ -746,16 +747,16 @@ int fam_gather_atomic(void *local, Fam_Descriptor *descriptor,
int32_t retStatus = -1;
fi_context *ctx = NULL;
Fam_Global_Descriptor globalDescriptor;
// FAM_CNTR_INC_API(fam_put_atomic);
// FAM_PROFILE_START_ALLOCATOR(fam_put_atomic);
ATL_CNTR_INC_API(fam_gather_atomic);
ATL_PROFILE_START_ALLOCATOR(fam_gather_atomic);
if ((local == NULL) || (descriptor == NULL) || (nElements == 0)) {
message << "Invalid Options";
THROW_ATL_ERR_MSG(ATL_Exception, message.str().c_str());
}

ret = validate_item(descriptor);
// FAM_PROFILE_END_ALLOCATOR(fam_put_atomic);
// FAM_PROFILE_START_OPS(fam_put_atomic);
ATL_PROFILE_END_ALLOCATOR(fam_gather_atomic);
ATL_PROFILE_START_OPS(fam_gather_atomic);
if (ret == 0) {
// Read data from FAM region with this key
globalDescriptor = descriptor->get_global_descriptor();
Expand All @@ -777,14 +778,15 @@ int fam_gather_atomic(void *local, Fam_Descriptor *descriptor,

ret = famCIS->gather_strided_atomic(
globalDescriptor.regionId & REGIONID_MASK, globalDescriptor.offset,
nElements, firstElement, stride, elementSize, key, get_selfAddr(nodeId),
nElements, firstElement, stride, elementSize, key, (uint64_t) local, get_selfAddr(nodeId),
get_selfAddrLen(nodeId), nodeId, uid, gid);

if (ret == 0) {
fabric_completion_wait(ATLCtx, ctx, 1);
ret = retStatus;
fabric_deregister_mr(mr);
}
ATL_PROFILE_END_OPS(fam_gather_atomic);
// FAM_PROFILE_END_OPS(fam_put_atomic);
} // validate_item
return ret;
Expand All @@ -801,16 +803,16 @@ int fam_scatter_atomic(void *local, Fam_Descriptor *descriptor,
int32_t retStatus = -1;
fi_context *ctx = NULL;
Fam_Global_Descriptor globalDescriptor;
// FAM_CNTR_INC_API(fam_put_atomic);
// FAM_PROFILE_START_ALLOCATOR(fam_put_atomic);
ATL_CNTR_INC_API(fam_scatter_atomic);
ATL_PROFILE_START_ALLOCATOR(fam_scatter_atomic);
if ((local == NULL) || (descriptor == NULL) || (nElements == 0)) {
message << "Invalid Options";
THROW_ATL_ERR_MSG(ATL_Exception, message.str().c_str());
}

ret = validate_item(descriptor);
// FAM_PROFILE_END_ALLOCATOR(fam_put_atomic);
// FAM_PROFILE_START_OPS(fam_put_atomic);
ATL_PROFILE_END_ALLOCATOR(fam_scatter_atomic);
ATL_PROFILE_START_OPS(fam_scatter_atomic);
if (ret == 0) {
// Read data from FAM region with this key
globalDescriptor = descriptor->get_global_descriptor();
Expand Down Expand Up @@ -842,15 +844,15 @@ int fam_scatter_atomic(void *local, Fam_Descriptor *descriptor,

ret = famCIS->scatter_indexed_atomic(
globalDescriptor.regionId & REGIONID_MASK, globalDescriptor.offset,
nElements, string(indexStr.str()).c_str(), elementSize, key,
nElements, string(indexStr.str()).c_str(), elementSize, key, (uint64_t) local,
get_selfAddr(nodeId), get_selfAddrLen(nodeId), nodeId, uid, gid);

if (ret == 0) {
fabric_completion_wait(ATLCtx, ctx, 1);
ret = retStatus;
fabric_deregister_mr(mr);
}
// FAM_PROFILE_END_OPS(fam_put_atomic);
ATL_PROFILE_END_OPS(fam_scatter_atomic);
} // validate_item
return ret;
}
Expand All @@ -865,16 +867,16 @@ int fam_gather_atomic(void *local, Fam_Descriptor *descriptor,
int32_t retStatus = -1;
fi_context *ctx = NULL;
Fam_Global_Descriptor globalDescriptor;
// FAM_CNTR_INC_API(fam_put_atomic);
// FAM_PROFILE_START_ALLOCATOR(fam_put_atomic);
ATL_CNTR_INC_API(fam_gather_atomic);
ATL_PROFILE_START_ALLOCATOR(fam_gather_atomic);
if ((local == NULL) || (descriptor == NULL) || (nElements == 0)) {
message << "Invalid Options";
THROW_ATL_ERR_MSG(ATL_Exception, message.str().c_str());
}

ret = validate_item(descriptor);
// FAM_PROFILE_END_ALLOCATOR(fam_put_atomic);
// FAM_PROFILE_START_OPS(fam_put_atomic);
ATL_PROFILE_END_ALLOCATOR(fam_gather_atomic);
ATL_PROFILE_START_OPS(fam_gather_atomic);
if (ret == 0) {
// Read data from FAM region with this key
globalDescriptor = descriptor->get_global_descriptor();
Expand Down Expand Up @@ -906,15 +908,15 @@ int fam_gather_atomic(void *local, Fam_Descriptor *descriptor,

ret = famCIS->gather_indexed_atomic(
globalDescriptor.regionId & REGIONID_MASK, globalDescriptor.offset,
nElements, string(indexStr.str()).c_str(), elementSize, key,
nElements, string(indexStr.str()).c_str(), elementSize, key, (uint64_t) local,
get_selfAddr(nodeId), get_selfAddrLen(nodeId), nodeId, uid, gid);

if (ret == 0) {
fabric_completion_wait(ATLCtx, ctx, 1);
ret = retStatus;
fabric_deregister_mr(mr);
}
// FAM_PROFILE_END_OPS(fam_put_atomic);
ATL_PROFILE_END_OPS(fam_gather_atomic);
} // validate_item
return ret;
}
Expand Down Expand Up @@ -1017,4 +1019,3 @@ ATLib::~ATLib() {
delete pATLimpl_;
}
} //namespace

15 changes: 12 additions & 3 deletions test/atl_multi_get_put.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ int main() {
dataRegion = my_fam->fam_lookup_region(DATA_REGION);
} catch (Fam_Exception &e) {
cout << "data Region not found" << endl;
dataRegion = my_fam->fam_create_region(DATA_REGION, 1048576, 0777, RAID1);
dataRegion = my_fam->fam_create_region(DATA_REGION, 1048576, 0777, NULL);
}
char msg1[200] = {0};
char msg2[200] = {0};
Expand All @@ -115,12 +115,21 @@ int main() {
for (i = 0; i < 200; i++)
msg1[i] = 'X';
auto start = std::chrono::high_resolution_clock::now();
myatlib->fam_put_atomic((void *)msg1, item1, 0, 200);
try {
myatlib->fam_put_atomic((void *)msg1, item1, 0, 200);
} catch(Fam_Exception &e) {
cout << "fam_put_atomic failed" << e.fam_error_msg() << endl;
}

auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> elapsed_seconds = end - start;
cout << "put atomic elapsed time: " << elapsed_seconds.count() << endl;
start = std::chrono::high_resolution_clock::now();
myatlib->fam_get_atomic((void *)msg2, item1, 0, 200); // strlen(msg1));
try {
myatlib->fam_get_atomic((void *)msg2, item1, 0, 200); // strlen(msg1));
} catch(Fam_Exception &e) {
cout << "fam_get_atomic failed" << e.fam_error_msg() << endl;
}
end = std::chrono::high_resolution_clock::now();
elapsed_seconds = end - start;
cout << "get atomic elapsed time: " << elapsed_seconds.count() << endl;
Expand Down
15 changes: 12 additions & 3 deletions test/atl_parallel_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ int main(int argc, char *argv[]) {
dataRegion = my_fam->fam_lookup_region(DATA_REGION);
} catch (Fam_Exception &e) {
cout << "data Region not found" << endl;
dataRegion = my_fam->fam_create_region(DATA_REGION, 1048576, 0777, RAID1);
dataRegion = my_fam->fam_create_region(DATA_REGION, 1048576, 0777, NULL);
}
char msg1[200] = {0};
char msg2[200] = {0};
Expand All @@ -120,12 +120,21 @@ int main(int argc, char *argv[]) {
auto start = std::chrono::high_resolution_clock::now();
for (i = 0; i < NUM_ITERATIONS; i++) {
compflag = false;
myatlib->fam_put_atomic((void *)msg1, item1, 0, 200);
try {
myatlib->fam_put_atomic((void *)msg1, item1, 0, 200);
} catch(Fam_Exception &e) {
cout << "fam_put_atomic failed" << e.fam_error_msg() << endl;
}
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> elapsed_seconds = end - start;
cout << "put atomic elapsed time: " << elapsed_seconds.count() << endl;
start = std::chrono::high_resolution_clock::now();
myatlib->fam_get_atomic((void *)msg2, item1, 0, 200);
try {
myatlib->fam_get_atomic((void *)msg2, item1, 0, 200);
} catch(Fam_Exception &e) {
cout << "fam_get_atomic failed" << e.fam_error_msg() << endl;
}

end = std::chrono::high_resolution_clock::now();
elapsed_seconds = end - start;
cout << "get atomic elapsed time: " << elapsed_seconds.count() << endl;
Expand Down
46 changes: 38 additions & 8 deletions test/fam_SG_ATL.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ int main() {
dataRegion = my_fam->fam_lookup_region(DATA_REGION);
} catch (Fam_Exception &e) {
cout << "data Region not found" << endl;
dataRegion = my_fam->fam_create_region(DATA_REGION, 1048576, 0777, RAID1);
dataRegion = my_fam->fam_create_region(DATA_REGION, 1048576, 0777, NULL);
}
char msg1[200] = {0};
char msg2[200] = {0};
Expand All @@ -108,17 +108,26 @@ int main() {
for (i = 0; i < 200; i++)
msg1[i] = 'X';
auto start = std::chrono::high_resolution_clock::now();
myatlib->fam_put_atomic((void *)msg1, item1, 0, 200); // strlen(msg1));
try {
myatlib->fam_put_atomic((void *)msg1, item1, 0, 200); // strlen(msg1));
} catch(Fam_Exception &e) {
cout << "fam_put_atomic failed" << e.fam_error_msg() << endl;
}

auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> elapsed_seconds = end - start;
cout << "put atomic elapsed time: " << elapsed_seconds.count() << endl;
start = std::chrono::high_resolution_clock::now();
myatlib->fam_get_atomic((void *)msg2, item1, 0, 200); // strlen(msg1));
try {
myatlib->fam_get_atomic((void *)msg2, item1, 0, 200); // strlen(msg1));
} catch(Fam_Exception &e) {
cout << "fam_get_atomic failed" << e.fam_error_msg() << endl;
}
end = std::chrono::high_resolution_clock::now();
elapsed_seconds = end - start;
cout << "get atomic elapsed time: " << elapsed_seconds.count() << endl;
cout << msg2 << endl;
if (strcmp(msg1, msg2) != 0)
if (strncmp(msg1, msg2, 200) != 0)
cout << "Test1: Comparison of full string failed" << endl;
else
cout << "Test1: Comparison of full string successful" << endl;
Expand All @@ -128,17 +137,31 @@ int main() {
// sleep(30);
cout << "Scatter atomic - strided" << endl;
start = std::chrono::high_resolution_clock::now();
myatlib->fam_scatter_atomic(msg1, item1, 5, 1, 2, 2);
try {
myatlib->fam_scatter_atomic(msg1, item1, 5, 1, 2, 2);
} catch (Fam_Exception &e) {
cout << "Scatter atomic not found" << e.fam_error_msg() << endl;
}

end = std::chrono::high_resolution_clock::now();
elapsed_seconds = end - start;
cout << "Scatter strided atomic elapsed time: " << elapsed_seconds.count()
<< endl;
myatlib->fam_get_atomic((void *)msg2, item1, 0, 200);
try {
myatlib->fam_get_atomic((void *)msg2, item1, 0, 200);
} catch(Fam_Exception &e) {
cout << "fam_get_atomic failed" << e.fam_error_msg() << endl;
}

cout << msg2 << endl;
cout << "Gather atomic - strided" << endl;
memset(msg2, 0, 200);
start = std::chrono::high_resolution_clock::now();
myatlib->fam_gather_atomic(msg2, item1, 5, 1, 2, 2);
try {
myatlib->fam_gather_atomic(msg2, item1, 5, 1, 2, 2);
} catch (Fam_Exception &e) {
cout << " Gather Failed" << e.fam_error_msg() << endl;
}
end = std::chrono::high_resolution_clock::now();
elapsed_seconds = end - start;
cout << "Gather strided atomic elapsed time: " << elapsed_seconds.count()
Expand All @@ -153,7 +176,11 @@ int main() {
msg1[i] = 'Z';
uint64_t indexes[] = {10, 17, 13, 15, 30};
start = std::chrono::high_resolution_clock::now();
myatlib->fam_scatter_atomic(msg1, item1, 5, indexes, 2);
try {
myatlib->fam_scatter_atomic(msg1, item1, 5, indexes, 2);
} catch (Fam_Exception &e) {
cout << " Gather Failed" << e.fam_error_msg() << endl;
}
end = std::chrono::high_resolution_clock::now();
elapsed_seconds = end - start;
cout << "Scatter indexed atomic elapsed time: " << elapsed_seconds.count()
Expand All @@ -164,6 +191,9 @@ int main() {
memset(msg2, 0, 200);
start = std::chrono::high_resolution_clock::now();
myatlib->fam_gather_atomic(msg2, item1, 5, indexes, 2);
} catch (Fam_Exception &e) {
cout << " Gather Failed" << e.fam_error_msg() << endl;
}
end = std::chrono::high_resolution_clock::now();
elapsed_seconds = end - start;
cout << "Gather indexed atomic elapsed time: " << elapsed_seconds.count()
Expand Down
Loading

0 comments on commit 90e0a46

Please sign in to comment.