
Adds distributed row gatherer #1589

Open
wants to merge 10 commits into neighborhood-communicator from distributed-row-gatherer

Conversation

@MarcelKoch (Member) commented on Apr 4, 2024

This PR adds a distributed row gatherer. This operator essentially provides the communication required in our matrix apply.

Besides the normal apply (which is blocking), it also provides two asynchronous calls. One version has an additional workspace parameter that is used as the send buffer. This version can be called multiple times without restriction, as long as a different workspace is used for each call. The other version doesn't have a workspace parameter and instead uses an internal buffer. As a consequence, it can only be called again after the request of the previous call has been waited on; otherwise, it will throw.
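For illustration, a rough usage sketch of the three variants just described. The creation call and exact signatures are assumptions based on this description (exec, coll_comm, imap, b, and x are presumed to be set up already), not the final API:

// Sketch only: creation and apply_async signatures may differ from the PR.
auto rg = RowGatherer<gko::int32>::create(exec, coll_comm, imap);

// 1) blocking apply: returns once the gathered rows have arrived in x
rg->apply(b, x);

// 2) async apply with an explicit workspace used as the send buffer;
//    can be issued repeatedly as long as each call gets its own workspace
gko::array<char> workspace(exec);
auto future1 = rg->apply_async(b, x, workspace);
future1.wait();

// 3) async apply using the internal buffer; calling it again before the
//    previous request has been waited on throws
auto future2 = rg->apply_async(b, x);
future2.wait();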

This is the second part of splitting up #1546.

It also introduces some intermediate changes, which could be extracted beforehand:

PR Stack:

@MarcelKoch MarcelKoch self-assigned this Apr 4, 2024
@ginkgo-bot ginkgo-bot added the reg:build, reg:testing, mod:core, and type:matrix-format labels on Apr 4, 2024
@MarcelKoch MarcelKoch requested a review from pratikvn April 4, 2024 10:49
@MarcelKoch MarcelKoch force-pushed the distributed-row-gatherer branch 2 times, most recently from 49557f1 to 4a79442 on April 5, 2024 08:18
@MarcelKoch MarcelKoch modified the milestone: Ginkgo 1.8.0 Apr 5, 2024
@MarcelKoch MarcelKoch requested a review from upsj April 19, 2024 09:20
@MarcelKoch MarcelKoch mentioned this pull request Apr 19, 2024
@MarcelKoch MarcelKoch force-pushed the distributed-row-gatherer branch 2 times, most recently from 98fa10a to 79de4c3 on April 19, 2024 16:19
@MarcelKoch (Member Author) commented:

One issue that I have is with the constructor. It takes a collective_communicator and an index_map. The index_map already defines the communication pattern, so the collective_communicator has to match it.
One option might be to have a virtual function like

std::unique_ptr<collective_communicator> create_with_same_type(communicator, index_map);

If I can't come up with anything better, I guess I will use that.
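A minimal sketch of what such a virtual function could look like on the collective_communicator interface; the parameter types below (mpi::communicator, index_map_type) are placeholders following the comment above, not the final design:

class collective_communicator {
public:
    virtual ~collective_communicator() = default;

    // Creates a communicator of the same dynamic type (e.g. a neighborhood
    // communicator), with the communication pattern defined by imap.
    virtual std::unique_ptr<collective_communicator> create_with_same_type(
        mpi::communicator base, const index_map_type& imap) const = 0;

    // ... the existing non-blocking collective interface ...
};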

@pratikvn (Member) commented:

Do we need to have the std::future setup for the release? Can we remove that for now and just use a normal synchronous approach? I think that is a significant change that maybe needs more thought, and probably a separate PR.

@MarcelKoch MarcelKoch force-pushed the distributed-row-gatherer branch 2 times, most recently from 98dcc4f to 5e970e9 on July 18, 2024 17:00
@pratikvn (Member) left a comment:

Really nice work! LGTM!

core/distributed/row_gatherer.cpp (outdated comment, resolved)
int is_inactive;
MPI_Status status;
GKO_ASSERT_NO_MPI_ERRORS(
MPI_Request_get_status(req_listener_, &is_inactive, &status));
Member:

Can we maybe move this MPI function into mpi.hpp and create a wrapper for it?

Member Author:

That doesn't really work here: the wrapper would be a member function of request, but I'm using a bare MPI_Request (and can't use request, because it would try to free the request in its destructor), so it would not be applicable.


mutable array<char> send_workspace_;

mutable MPI_Request req_listener_{MPI_REQUEST_NULL};
Member:

This can be of type mpi::request?

Member Author:

No, because the destructor of mpi::request will try to free the request. But req_listener_ doesn't own any request, so the program would crash.
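For context, a small sketch of the pattern used here: a bare, non-owning MPI_Request that is only polled, never freed (the helper name is made up for illustration):

// Query whether the previous asynchronous call has completed, without
// freeing or resetting the request. Unlike MPI_Test, MPI_Request_get_status
// does not deallocate the request, which is exactly what is needed for a
// non-owning handle like req_listener_.
bool previous_call_completed(MPI_Request req)
{
    int is_inactive = 0;
    MPI_Status status;
    GKO_ASSERT_NO_MPI_ERRORS(
        MPI_Request_get_status(req, &is_inactive, &status));
    return is_inactive != 0;  // also true for MPI_REQUEST_NULL
}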

template <typename LocalIndexType>
void RowGatherer<LocalIndexType>::apply_impl(const LinOp* alpha, const LinOp* b,
const LinOp* beta, LinOp* x) const
GKO_NOT_IMPLEMENTED;
Member:

I think you can also implement the advanced apply by replacing b_local->row_gather(idxs, buffer) with b_local->row_gather(alpha, idxs, beta, buffer)?
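For illustration, the difference between the two gather calls this comment refers to (variable names are taken from the comment and are only illustrative):

// simple apply: buffer = b_local(idxs, :)
b_local->row_gather(idxs, buffer);

// advanced apply (sketch): buffer = alpha * b_local(idxs, :) + beta * buffer
b_local->row_gather(alpha, idxs, beta, buffer);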

send_sizes.data(), send_offsets.data(), type, recv_ptr,
recv_sizes.data(), recv_offsets.data(), type);
coll_comm
->i_all_to_all_v(use_host_buffer ? exec->get_master() : exec, send_ptr,
Member:

Is there any difference between using all_to_all_v vs. i_all_to_all_v? I assume adding all_to_all_v would also mean updating the interface.

Member Author:

all_to_all_v is a blocking call, while i_all_to_all_v is non-blocking. Right now the collective_communicator only provides the non-blocking interface, since it is more general.
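In other words, the blocking behavior can always be recovered from the non-blocking interface by waiting on the returned request. A sketch, assuming the call shown later in this PR and that the returned request exposes a wait():

// non-blocking: returns immediately, communication finishes on wait()
auto req = coll_comm->i_all_to_all_v(exec, send_ptr, type.get(), recv_ptr,
                                     type.get());
// ... overlap other work here ...
req.wait();

// equivalent "blocking" usage: wait right away
coll_comm->i_all_to_all_v(exec, send_ptr, type.get(), recv_ptr, type.get())
    .wait();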

include/ginkgo/core/distributed/row_gatherer.hpp (outdated comment, resolved)
* auto x = matrix::Dense<double>::create(...);
*
* auto future = rg->apply_async(b, x);
* // do some computation that doesn't modify b, or access x
Member:

I think it does access x, but it is unclear when it will be accessed before the wait.

Member Author:

I guess this is just meant to say that you can't expect any meaningful data when accessing x before the wait has completed.

Member:

I think I got it wrong.
Is the comment here meant to describe what the user can safely do after the call, or the behavior of apply_async?
My comment was based on reading it as describing the behavior of apply_async, because apply_async definitely accesses x.
If it describes what the user may do between the async call and the wait, then it is correct.
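Putting the two readings together, the intended usage pattern would be (a sketch extending the documentation example quoted above):

auto future = rg->apply_async(b, x);
// safe here: any computation that neither modifies b nor reads x, because
// apply_async may still be reading from b and writing into x
// ...
future.wait();
// after the wait, x contains the gathered rows and may be read safely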

core/distributed/row_gatherer.cpp (outdated comment, resolved)
Comment on lines +98 to +102
workspace.set_executor(mpi_exec);
if (send_size_in_bytes > workspace.get_size()) {
workspace.resize_and_reset(sizeof(ValueType) *
send_size[0] * send_size[1]);
}
Member:

Could these be combined to assign the workspace directly?

Member Author:

Combine how? Do you mean like

workspace = array<char>(mpi_exec, sizeof(ValueType) * send_size[0] * send_size[1]);

Comment on lines +118 to +119
req = coll_comm_->i_all_to_all_v(
mpi_exec, send_ptr, type.get(), recv_ptr, type.get());
Member:

The send buffer might be on the host, but the recv_ptr (x_local) might be on the device.

Member Author:

I have a check above to ensure that the memory space of the recv buffer is accessible from the MPI executor. So if GPU-aware MPI is used, it should work (even if the send buffer is on the host and the recv buffer is on the device, or vice versa). Otherwise, an exception will be thrown.
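A sketch of the kind of check described here, using gko::Executor::memory_accessible; the exact error handling in the PR may differ:

// mpi_exec is the executor the MPI calls are issued on (the host executor
// if no GPU-aware MPI is available); the recv buffer must be reachable
// from it, otherwise fail loudly instead of corrupting data.
if (!mpi_exec->memory_accessible(x_local->get_executor())) {
    GKO_INVALID_STATE(
        "The receive buffer is not accessible from the executor used "
        "for MPI communication");
}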

core/test/mpi/distributed/row_gatherer.cpp (two outdated comments, resolved)
MarcelKoch and others added 10 commits October 23, 2024 15:32
- only allocate if necessary
- synchronize correct executor

Co-authored-by: Pratik Nayak <pratik.nayak@kit.edu>
- split tests into core and backend part
- fix formatting
- fix openmpi pre 4.1.x macro

Co-authored-by: Pratik Nayak <pratik.nayak4@gmail.com>
Co-authored-by: Yu-Hsiang M. Tsai <yhmtsai@gmail.com>
Labels
1:ST:ready-for-review: This PR is ready for review
mod:core: This is related to the core module.
reg:build: This is related to the build system.
reg:testing: This is related to testing.
type:matrix-format: This is related to the Matrix formats

4 participants