Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement selection vector I/O with collective chunk filling #3826

Merged
merged 14 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
264 changes: 101 additions & 163 deletions src/H5Dchunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -5536,11 +5536,9 @@ H5D__chunk_update_old_edge_chunks(H5D_t *dset, hsize_t old_dim[])
/*-------------------------------------------------------------------------
* Function: H5D__chunk_collective_fill
*
* Purpose: Use MPIO collective write to fill the chunks (if number of
* chunks to fill is greater than the number of MPI procs;
* otherwise use independent I/O).
* Purpose: Use MPIO selection vector I/O for writing fill chunks
*
* Return: Non-negative on success/Negative on failure
* Return: Non-negative on success/Negative on failure
*
*-------------------------------------------------------------------------
*/
Expand All @@ -5554,19 +5552,24 @@ H5D__chunk_collective_fill(const H5D_t *dset, H5D_chunk_coll_fill_info_t *chunk_
int mpi_code; /* MPI return code */
size_t num_blocks; /* Number of blocks between processes. */
size_t leftover_blocks; /* Number of leftover blocks to handle */
int blocks, leftover; /* converted to int for MPI */
MPI_Aint *chunk_disp_array = NULL;
MPI_Aint *block_disps = NULL;
int *block_lens = NULL;
MPI_Datatype mem_type = MPI_BYTE, file_type = MPI_BYTE;
H5FD_mpio_xfer_t prev_xfer_mode; /* Previous data xfer mode */
bool have_xfer_mode = false; /* Whether the previous xffer mode has been retrieved */
bool need_sort = false;
size_t i; /* Local index variable */
int blocks; /* converted to int for MPI */
int leftover; /* converted to int for MPI */
H5FD_mpio_xfer_t prev_xfer_mode; /* Previous data xfer mode */
bool have_xfer_mode = false; /* Whether the previous xffer mode has been retrieved */
size_t i; /* Local index variable */
haddr_t *io_addrs = NULL;
size_t *io_sizes = NULL;
const void **io_wbufs = NULL;
H5FD_mem_t io_types[2];
bool all_same_block_len = true;
bool need_sort = false;
size_t io_2sizes[2];
herr_t ret_value = SUCCEED; /* Return value */

FUNC_ENTER_PACKAGE

assert(chunk_fill_info->num_chunks != 0);

/*
* If a separate fill buffer is provided for partial chunks, ensure
* that the "don't filter partial edge chunks" flag is set.
Expand All @@ -5589,6 +5592,7 @@ H5D__chunk_collective_fill(const H5D_t *dset, H5D_chunk_coll_fill_info_t *chunk_
/* Distribute evenly the number of blocks between processes. */
if (mpi_size == 0)
HGOTO_ERROR(H5E_DATASET, H5E_BADVALUE, FAIL, "Resulted in division by zero");

num_blocks =
(size_t)(chunk_fill_info->num_chunks / (size_t)mpi_size); /* value should be the same on all procs */

Expand All @@ -5602,157 +5606,97 @@ H5D__chunk_collective_fill(const H5D_t *dset, H5D_chunk_coll_fill_info_t *chunk_
H5_CHECKED_ASSIGN(leftover, int, leftover_blocks, size_t);

/* Check if we have any chunks to write on this rank */
if (num_blocks > 0 || (leftover && leftover > mpi_rank)) {
MPI_Aint partial_fill_buf_disp = 0;
bool all_same_block_len = true;

/* Allocate buffers */
if (NULL == (chunk_disp_array = (MPI_Aint *)H5MM_malloc((size_t)(blocks + 1) * sizeof(MPI_Aint))))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk file displacement buffer");

if (partial_chunk_fill_buf) {
MPI_Aint fill_buf_addr;
MPI_Aint partial_fill_buf_addr;

/* Calculate the displacement between the fill buffer and partial chunk fill buffer */
if (MPI_SUCCESS != (mpi_code = MPI_Get_address(fill_buf, &fill_buf_addr)))
HMPI_GOTO_ERROR(FAIL, "MPI_Get_address failed", mpi_code)
if (MPI_SUCCESS != (mpi_code = MPI_Get_address(partial_chunk_fill_buf, &partial_fill_buf_addr)))
HMPI_GOTO_ERROR(FAIL, "MPI_Get_address failed", mpi_code)

#if H5_CHECK_MPI_VERSION(3, 1)
partial_fill_buf_disp = MPI_Aint_diff(partial_fill_buf_addr, fill_buf_addr);
#else
partial_fill_buf_disp = partial_fill_buf_addr - fill_buf_addr;
#endif
if (num_blocks > 0 || leftover > mpi_rank) {

/*
* Allocate all-zero block displacements array. If a block's displacement
* is left as zero, that block will be written to from the regular fill
* buffer. If a block represents an unfiltered partial edge chunk, its
* displacement will be set so that the block is written to from the
* unfiltered fill buffer.
*/
if (NULL == (block_disps = (MPI_Aint *)H5MM_calloc((size_t)(blocks + 1) * sizeof(MPI_Aint))))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate block displacements buffer");
}
if (NULL == (io_addrs = H5MM_malloc((size_t)(blocks + 1) * sizeof(*io_addrs))))
HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL,
"couldn't allocate space for I/O addresses vector");

/*
* Perform initial scan of chunk info list to:
* - make sure that chunk addresses are monotonically non-decreasing
* - check if all blocks have the same length
*/
for (i = 1; i < chunk_fill_info->num_chunks; i++) {
if (chunk_fill_info->chunk_info[i].addr < chunk_fill_info->chunk_info[i - 1].addr)
need_sort = true;

if (chunk_fill_info->chunk_info[i].chunk_size != chunk_fill_info->chunk_info[i - 1].chunk_size)
all_same_block_len = false;
}
if (NULL == (io_wbufs = H5MM_malloc((size_t)(blocks + 1) * sizeof(*io_wbufs))))
HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "couldn't allocate space for I/O buffers vector");
}

if (need_sort)
qsort(chunk_fill_info->chunk_info, chunk_fill_info->num_chunks,
sizeof(struct chunk_coll_fill_info), H5D__chunk_cmp_coll_fill_info);
/*
* Perform initial scan of chunk info list to:
* - make sure that chunk addresses are monotonically non-decreasing
* - check if all blocks have the same length
*/
for (i = 1; i < chunk_fill_info->num_chunks; i++) {
if (chunk_fill_info->chunk_info[i].addr < chunk_fill_info->chunk_info[i - 1].addr)
need_sort = true;

/* Allocate buffer for block lengths if necessary */
if (!all_same_block_len)
if (NULL == (block_lens = (int *)H5MM_malloc((size_t)(blocks + 1) * sizeof(int))))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk lengths buffer");
if (chunk_fill_info->chunk_info[i].chunk_size != chunk_fill_info->chunk_info[i - 1].chunk_size)
all_same_block_len = false;
}

for (i = 0; i < (size_t)blocks; i++) {
size_t idx = i + (size_t)(mpi_rank * blocks);
/*
* Note that we sort all of the chunks here, and not just a subset
* corresponding to this rank. We do this since we have found MPI I/O to work
* better when each rank writes blocks that are contiguous in the file,
* and by sorting the full list we maximize the chance of that happening.
*/
if (need_sort)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add some more comments to the blocks that don't have them, explaining what the code is doing?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, for this block in particular, add something like this note (I just discussed this with Scot):

Note that we sort all of the chunks here, and not just a subset corresponding to this rank. We do this since we have found MPI I/O to work better when each rank writes blocks that are contiguous in the file, and by sorting the full list we maximize the chance of that happening.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like we could potentially be making extra work for ourselves now by sorting here since the MPI I/O VFD is already scanning the chunk list to check for this anyway and we don't necessarily need to impose the monotonically non-decreasing file displacement requirement on other parallel VFDs. That said, since the sort has to happen at some point anyway if the addresses are out of order, I'm not sure if the above note is really needed here.

qsort(chunk_fill_info->chunk_info, chunk_fill_info->num_chunks, sizeof(struct chunk_coll_fill_info),
H5D__chunk_cmp_coll_fill_info);

/* store the chunk address as an MPI_Aint */
chunk_disp_array[i] = (MPI_Aint)(chunk_fill_info->chunk_info[idx].addr);
/*
* If all the chunks have the same length, use the compressed feature
* to store the size.
* Otherwise, allocate the array of sizes for storing chunk sizes.
*/
if (all_same_block_len) {
io_2sizes[0] = chunk_fill_info->chunk_info[0].chunk_size;
io_2sizes[1] = 0;
}
else {
if (NULL == (io_sizes = H5MM_malloc((size_t)(blocks + 1) * sizeof(*io_sizes))))
HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "couldn't allocate space for I/O sizes vector");
}

if (!all_same_block_len)
H5_CHECKED_ASSIGN(block_lens[i], int, chunk_fill_info->chunk_info[idx].chunk_size, size_t);
/*
* Since the type of all chunks is raw data, use the compressed feature
* to store the chunk type.
*/
io_types[0] = H5FD_MEM_DRAW;
io_types[1] = H5FD_MEM_NOLIST;

if (chunk_fill_info->chunk_info[idx].unfiltered_partial_chunk) {
assert(partial_chunk_fill_buf);
block_disps[i] = partial_fill_buf_disp;
}
} /* end for */
/*
* For the chunks corresponding to this rank, fill in the
* address, size and buf pointer for each chunk.
*/
for (i = 0; i < (size_t)blocks; i++) {
size_t idx = i + (size_t)(mpi_rank * blocks);

/* Calculate if there are any leftover blocks after evenly
* distributing. If there are, then round-robin the distribution
* to processes 0 -> leftover.
*/
if (leftover && leftover > mpi_rank) {
chunk_disp_array[blocks] =
(MPI_Aint)chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].addr;

if (!all_same_block_len)
H5_CHECKED_ASSIGN(block_lens[blocks], int,
chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].chunk_size,
size_t);

if (chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].unfiltered_partial_chunk) {
assert(partial_chunk_fill_buf);
block_disps[blocks] = partial_fill_buf_disp;
}
io_addrs[i] = chunk_fill_info->chunk_info[idx].addr;

blocks++;
}
if (!all_same_block_len)
io_sizes[i] = chunk_fill_info->chunk_info[idx].chunk_size;

/* Create file and memory types for the write operation */
if (all_same_block_len) {
int block_len;
if (chunk_fill_info->chunk_info[idx].unfiltered_partial_chunk)
io_wbufs[i] = partial_chunk_fill_buf;
else
io_wbufs[i] = fill_buf;
}

H5_CHECKED_ASSIGN(block_len, int, chunk_fill_info->chunk_info[0].chunk_size, size_t);
/*
* For the leftover chunk corresponding to this rank, fill in the
* address, size and buf pointer for the chunk.
*/
if (leftover > mpi_rank) {
io_addrs[blocks] = chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].addr;

mpi_code =
MPI_Type_create_hindexed_block(blocks, block_len, chunk_disp_array, MPI_BYTE, &file_type);
if (mpi_code != MPI_SUCCESS)
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed_block failed", mpi_code)
if (!all_same_block_len)
io_sizes[blocks] = chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].chunk_size;

if (partial_chunk_fill_buf) {
/*
* If filters are disabled for partial edge chunks, those chunks could
* potentially have the same block length as the other chunks, but still
* need to be written to using the unfiltered fill buffer. Use an hindexed
* block type rather than an hvector.
*/
mpi_code =
MPI_Type_create_hindexed_block(blocks, block_len, block_disps, MPI_BYTE, &mem_type);
if (mpi_code != MPI_SUCCESS)
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed_block failed", mpi_code)
}
else {
mpi_code = MPI_Type_create_hvector(blocks, block_len, 0, MPI_BYTE, &mem_type);
if (mpi_code != MPI_SUCCESS)
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hvector failed", mpi_code)
}
}
else {
/*
* Currently, different block lengths implies that there are partial
* edge chunks and the "don't filter partial edge chunks" flag is set.
*/
if (chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].unfiltered_partial_chunk) {
assert(partial_chunk_fill_buf);
assert(block_lens);
assert(block_disps);

mpi_code = MPI_Type_create_hindexed(blocks, block_lens, chunk_disp_array, MPI_BYTE, &file_type);
if (mpi_code != MPI_SUCCESS)
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code)

mpi_code = MPI_Type_create_hindexed(blocks, block_lens, block_disps, MPI_BYTE, &mem_type);
if (mpi_code != MPI_SUCCESS)
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code)
io_wbufs[blocks] = partial_chunk_fill_buf;
}
else
io_wbufs[blocks] = fill_buf;

if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(&file_type)))
HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(&mem_type)))
HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
} /* end if */

/* Set MPI-IO VFD properties */

/* Set MPI datatypes for operation */
if (H5CX_set_mpi_coll_datatypes(mem_type, file_type) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "can't set MPI-I/O properties");
blocks++;
}

/* Get current transfer mode */
if (H5CX_get_io_xfer_mode(&prev_xfer_mode) < 0)
Expand All @@ -5763,31 +5707,24 @@ H5D__chunk_collective_fill(const H5D_t *dset, H5D_chunk_coll_fill_info_t *chunk_
if (H5CX_set_io_xfer_mode(H5FD_MPIO_COLLECTIVE) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "can't set transfer mode");

/* Low-level write (collective) */
if (H5F_shared_block_write(H5F_SHARED(dset->oloc.file), H5FD_MEM_DRAW, (haddr_t)0,
(blocks) ? (size_t)1 : (size_t)0, fill_buf) < 0)
HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "unable to write raw data to file");

/* Barrier so processes don't race ahead */
if (MPI_SUCCESS != (mpi_code = MPI_Barrier(mpi_comm)))
HMPI_GOTO_ERROR(FAIL, "MPI_Barrier failed", mpi_code)

/* Perform the selection vector I/O for the chunks */
if (H5F_shared_vector_write(H5F_SHARED(dset->oloc.file), (uint32_t)blocks, io_types, io_addrs,
all_same_block_len ? io_2sizes : io_sizes, io_wbufs) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "vector write call failed");

done:
if (have_xfer_mode)
/* Set transfer mode */
/* Restore transfer mode */
if (H5CX_set_io_xfer_mode(prev_xfer_mode) < 0)
HDONE_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "can't set transfer mode");

/* free things */
if (MPI_BYTE != file_type)
if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type)))
HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
if (MPI_BYTE != mem_type)
if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type)))
HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
H5MM_xfree(chunk_disp_array);
H5MM_xfree(block_disps);
H5MM_xfree(block_lens);
H5MM_xfree(io_addrs);
H5MM_xfree(io_wbufs);
H5MM_xfree(io_sizes);

FUNC_LEAVE_NOAPI(ret_value)
} /* end H5D__chunk_collective_fill() */
Expand All @@ -5805,6 +5742,7 @@ H5D__chunk_cmp_coll_fill_info(const void *_entry1, const void *_entry2)

FUNC_LEAVE_NOAPI(H5_addr_cmp(entry1->addr, entry2->addr))
} /* end H5D__chunk_cmp_coll_fill_info() */

#endif /* H5_HAVE_PARALLEL */

/*-------------------------------------------------------------------------
Expand Down
Loading