diff --git a/ompi/mca/coll/basic/coll_basic_alltoallw.c b/ompi/mca/coll/basic/coll_basic_alltoallw.c index 9470d4ac11c..fe753b34e74 100644 --- a/ompi/mca/coll/basic/coll_basic_alltoallw.c +++ b/ompi/mca/coll/basic/coll_basic_alltoallw.c @@ -31,6 +31,7 @@ #include "mpi.h" #include "ompi/constants.h" #include "ompi/datatype/ompi_datatype.h" +#include "opal/datatype/opal_convertor_internal.h" #include "ompi/mca/coll/coll.h" #include "ompi/mca/coll/base/coll_tags.h" #include "ompi/mca/pml/pml.h" @@ -42,12 +43,11 @@ mca_coll_basic_alltoallw_intra_inplace(const void *rbuf, const int *rcounts, con struct ompi_communicator_t *comm, mca_coll_base_module_t *module) { - int i, j, size, rank, err = MPI_SUCCESS, max_size; + int i, j, size, rank, err = MPI_SUCCESS; ompi_request_t *req; char *save_buffer = NULL; - ptrdiff_t ext, gap = 0; - - /* Initialize. */ + size_t max_size = 0, packed_size; + opal_convertor_t convertor; size = ompi_comm_size(comm); rank = ompi_comm_rank(comm); @@ -57,11 +57,14 @@ mca_coll_basic_alltoallw_intra_inplace(const void *rbuf, const int *rcounts, con return MPI_SUCCESS; } - /* Find the largest receive amount */ + /* Find the largest amount of packed send/recv data */ for (i = 0, max_size = 0 ; i < size ; ++i) { - ext = opal_datatype_span(&rdtypes[i]->super, rcounts[i], &gap); + ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(comm, i); - max_size = ext > max_size ? ext : max_size; + packed_size = opal_datatype_compute_remote_size(&rdtypes[i]->super, + ompi_proc->super.proc_convertor->master->remote_sizes); + packed_size *= rcounts[i]; + max_size = packed_size > max_size ? packed_size : max_size; } /* Allocate a temporary buffer */ @@ -77,45 +80,45 @@ mca_coll_basic_alltoallw_intra_inplace(const void *rbuf, const int *rcounts, con msg_size_i *= rcounts[i]; for (j = i+1 ; j < size ; ++j) { size_t msg_size_j; + struct iovec iov = {.iov_base = save_buffer, .iov_len = max_size}; + uint32_t iov_count = 1; ompi_datatype_type_size(rdtypes[j], &msg_size_j); msg_size_j *= rcounts[j]; /* Initiate all send/recv to/from others. */ if (i == rank && msg_size_j != 0) { - char * tmp_buffer; - /* Shift the temporary buffer according to the current datatype */ - (void)opal_datatype_span(&rdtypes[j]->super, rcounts[j], &gap); - tmp_buffer = save_buffer - gap; - /* Copy the data into the temporary buffer */ - err = ompi_datatype_copy_content_same_ddt (rdtypes[j], rcounts[j], - tmp_buffer, (char *) rbuf + rdisps[j]); - if (MPI_SUCCESS != err) { goto error_hndl; } + ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(comm, j); + opal_convertor_clone(&convertor, ompi_proc->super.proc_convertor, 0); + opal_convertor_prepare_for_send(&convertor, &rdtypes[j]->super, rcounts[j], + (char *) rbuf + rdisps[j]); + packed_size = max_size; + err = opal_convertor_pack(&convertor, &iov, &iov_count, &packed_size); + if (1 != err) { goto error_hndl; } /* Exchange data with the peer */ err = MCA_PML_CALL(irecv ((char *) rbuf + rdisps[j], rcounts[j], rdtypes[j], j, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req)); if (MPI_SUCCESS != err) { goto error_hndl; } - err = MCA_PML_CALL(send ((void *) tmp_buffer, rcounts[j], rdtypes[j], + err = MCA_PML_CALL(send ((void *) save_buffer, packed_size, MPI_PACKED, j, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD, comm)); if (MPI_SUCCESS != err) { goto error_hndl; } } else if (j == rank && msg_size_i != 0) { - char * tmp_buffer; - /* Shift the temporary buffer according to the current datatype */ - (void)opal_datatype_span(&rdtypes[i]->super, rcounts[i], &gap); - tmp_buffer = save_buffer - gap; - /* Copy the data into the temporary buffer */ - err = ompi_datatype_copy_content_same_ddt (rdtypes[i], rcounts[i], - tmp_buffer, (char *) rbuf + rdisps[i]); - if (MPI_SUCCESS != err) { goto error_hndl; } + ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(comm, i); + opal_convertor_clone(&convertor, ompi_proc->super.proc_convertor, 0); + opal_convertor_prepare_for_send(&convertor, &rdtypes[i]->super, rcounts[i], + (char *) rbuf + rdisps[i]); + packed_size = max_size; + err = opal_convertor_pack(&convertor, &iov, &iov_count, &packed_size); + if (1 != err) { goto error_hndl; } /* Exchange data with the peer */ err = MCA_PML_CALL(irecv ((char *) rbuf + rdisps[i], rcounts[i], rdtypes[i], i, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req)); if (MPI_SUCCESS != err) { goto error_hndl; } - err = MCA_PML_CALL(send ((void *) tmp_buffer, rcounts[i], rdtypes[i], + err = MCA_PML_CALL(send ((void *) save_buffer, packed_size, MPI_PACKED, i, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD, comm)); if (MPI_SUCCESS != err) { goto error_hndl; }