EnKF updates to support RRFS 3-km NA domain (NOAA-EMC#501)
WIP: see issue NOAA-EMC#500

Co-authored-by: russ.treadon <Russ.Treadon@noaa.gov>
Co-authored-by: jswhit2 <Jeffrey.S.Whitaker@noaa.gov>
3 people authored Dec 1, 2022
1 parent 4a7c652 commit 8ef658d
Showing 3 changed files with 210 additions and 107 deletions.
22 changes: 16 additions & 6 deletions src/enkf/controlvec.f90
@@ -289,7 +289,7 @@ subroutine write_control(no_inflate_flag)
logical, intent(in) :: no_inflate_flag

real(r_double) :: t1,t2
integer(i_kind) :: nb, nvar, ne
integer(i_kind) :: nb, nvar,ne,nn
integer(i_kind) :: q_ind, ierr
real(r_single), allocatable, dimension(:,:) :: grdin_mean_tmp
real(r_single), allocatable, dimension(:,:,:,:) :: grdin_mean
@@ -309,11 +309,21 @@ subroutine write_control(no_inflate_flag)
print *,'--------------'
endif
! gather ensmean increment on root.
do ne=1,nanals_per_iotask
call mpi_reduce(grdin(:,:,nb,ne), grdin_mean_tmp, npts*ncdim, mpi_real4,&
mpi_sum,0,mpi_comm_io,ierr)
if (nproc == 0) grdin_mean(:,:,nb,1) = grdin_mean(:,:,nb,1) + grdin_mean_tmp
enddo
if (real(npts)*real(ncdim) < 2_r_kind**32/2_r_kind - 1_r_kind) then
do ne=1,nanals_per_iotask
call mpi_reduce(grdin(:,:,nb,ne), grdin_mean_tmp, npts*ncdim, mpi_real4,&
mpi_sum,0,mpi_comm_io,ierr)
if (nproc == 0) grdin_mean(:,:,nb,1) = grdin_mean(:,:,nb,1) + grdin_mean_tmp
enddo
else
do nn=1,ncdim
do ne=1,nanals_per_iotask
call mpi_reduce(grdin(:,nn,nb,ne), grdin_mean_tmp(:,nn), npts, mpi_real4,&
mpi_sum,0,mpi_comm_io,ierr)
if (nproc == 0) grdin_mean(:,nn,nb,1) = grdin_mean(:,nn,nb,1) + grdin_mean_tmp(:,nn)
enddo
enddo
endif
! print out ens mean increment info
if (nproc == 0) then
grdin_mean(:,:,nb,1) = grdin_mean(:,:,nb,1)/real(nanals)
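Note (illustrative, not part of the diff): the write_control change above guards the mpi_reduce count against the 32-bit default-integer limit and falls back to one reduce per control-vector column when npts*ncdim is too large. Below is a minimal, self-contained sketch of that pattern; the sizes and names (npts, ncdim, field) are assumptions for illustration, not values from the commit.

program reduce_guard_sketch
  ! Illustrative only: mirrors the overflow guard used in write_control.
  use mpi
  use, intrinsic :: iso_fortran_env, only: real32, real64
  implicit none
  integer :: npts, ncdim, nn, ierr, myrank
  real(real32), allocatable :: field(:,:), field_sum(:,:)

  call mpi_init(ierr)
  call mpi_comm_rank(mpi_comm_world, myrank, ierr)

  npts  = 1000      ! grid points held by this task (assumed, small here)
  ncdim = 10        ! number of control-vector columns (assumed)
  allocate(field(npts,ncdim), field_sum(npts,ncdim))
  field = 1.0_real32
  field_sum = 0.0_real32

  ! The MPI count argument is a default (32-bit) integer, so npts*ncdim must
  ! stay below 2**31 - 1; compare in real arithmetic so the test itself
  ! cannot overflow.
  if (real(npts,real64)*real(ncdim,real64) < 2.0_real64**31 - 1.0_real64) then
     ! One reduce over the whole array.
     call mpi_reduce(field, field_sum, npts*ncdim, mpi_real4, mpi_sum, 0, &
                     mpi_comm_world, ierr)
  else
     ! Count too large: one reduce per column, each of length npts only.
     do nn = 1, ncdim
        call mpi_reduce(field(:,nn), field_sum(:,nn), npts, mpi_real4, &
                        mpi_sum, 0, mpi_comm_world, ierr)
     enddo
  endif

  if (myrank == 0) print *, 'reduced value at (1,1):', field_sum(1,1)
  call mpi_finalize(ierr)
end program reduce_guard_sketch

The same guard, applied to mpi_alltoallv, drives the larger loadbal.f90 changes below.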
293 changes: 193 additions & 100 deletions src/enkf/loadbal.f90
@@ -130,7 +130,6 @@ module loadbal
numptsperproc, numobsperproc
integer(i_kind),public, allocatable, dimension(:,:) :: indxproc, indxproc_obs
integer(i_kind),public :: npts_min, npts_max, nobs_min, nobs_max
integer(8) totsize
! kd-tree structures.
type(kdtree2),public,pointer :: kdtree_obs, kdtree_grid, kdtree_obs2

@@ -356,73 +355,126 @@ subroutine scatter_chunks
allocate(scounts(0:numproc-1))
allocate(displs(0:numproc-1))
allocate(rcounts(0:numproc-1))
! allocate array to hold pieces of state vector on each proc.
allocate(anal_chunk(nanals,npts_max,ncdim,nbackgrounds))
if (nproc == 0) print *,'anal_chunk size = ',size(anal_chunk,kind=8)

! only IO tasks send any data.
! scounts is number of data elements to send to processor np.
! rcounts is number of data elements to recv from processor np.
! displs is displacement into send array for data to go to proc np
do np=0,numproc-1
displs(np) = np*nanals_per_iotask*npts_max*ncdim
enddo
if (nproc <= ntasks_io-1) then
scounts = nanals_per_iotask*npts_max*ncdim

if (real(numproc)*real(nanals_per_iotask)*real(npts_max)*real(ncdim) < 2_r_kind**32/2_r_kind - 1_r_kind) then
do np=0,numproc-1
displs(np) = np*nanals_per_iotask*npts_max*ncdim
enddo
if (nproc <= ntasks_io-1) then
scounts = nanals_per_iotask*npts_max*ncdim
else
scounts = 0
endif
! displs is also the displacement into recv array for data to go into anal_chunk
! on task np.
do np=0,numproc-1
if (np <= ntasks_io-1) then
rcounts(np) = nanals_per_iotask*npts_max*ncdim
else
rcounts(np) = 0
end if
enddo
allocate(sendbuf(numproc*nanals_per_iotask*npts_max*ncdim))
allocate(recvbuf(numproc*nanals_per_iotask*npts_max*ncdim))

! send and receive buffers.
do nb=1,nbackgrounds ! loop over time levels in background

if (nproc <= ntasks_io-1) then
! fill up send buffer.
do np=1,numproc
do ne=1,nanals_per_iotask
do nn=1,ncdim
do i=1,numptsperproc(np)
n = ((np-1)*ncdim*nanals_per_iotask + (ne-1)*ncdim + (nn-1))*npts_max + i
sendbuf(n) = grdin(indxproc(np,i),nn,nb,ne)
enddo
enddo
enddo
enddo
end if
call mpi_alltoallv(sendbuf, scounts, displs, mpi_real4, recvbuf, rcounts, displs,&
mpi_real4, mpi_comm_world, ierr)

!==> compute ensemble of first guesses on each task, remove mean from anal.
!$omp parallel do schedule(dynamic,1) private(nn,i,nanal,n)
do nn=1,ncdim
do i=1,numptsperproc(nproc+1)
do nanal=1,nanals
n = ((nanal-1)*ncdim + (nn-1))*npts_max + i
anal_chunk(nanal,i,nn,nb) = recvbuf(n)
enddo
end do
end do
!$omp end parallel do

enddo ! loop over nbackgrounds
else
scounts = 0
endif
! displs is also the displacement into recv array for data to go into anal_chunk
! on
! task np.
do np=0,numproc-1
if (np <= ntasks_io-1) then
rcounts(np) = nanals_per_iotask*npts_max*ncdim
do np=0,numproc-1
displs(np) = np*nanals_per_iotask*npts_max
enddo
if (nproc <= ntasks_io-1) then
scounts = nanals_per_iotask*npts_max
else
rcounts(np) = 0
end if
enddo
allocate(sendbuf(numproc*nanals_per_iotask*npts_max*ncdim))
allocate(recvbuf(numproc*nanals_per_iotask*npts_max*ncdim))

! allocate array to hold pieces of state vector on each proc.
allocate(anal_chunk(nanals,npts_max,ncdim,nbackgrounds))
if (nproc == 0) print *,'anal_chunk size = ',size(anal_chunk,kind=8)
scounts = 0
endif
! displs is also the displacement into recv array for data to go into anal_chunk
! on task np.
do np=0,numproc-1
if (np <= ntasks_io-1) then
rcounts(np) = nanals_per_iotask*npts_max
else
rcounts(np) = 0
end if
enddo
allocate(sendbuf(numproc*nanals_per_iotask*npts_max))
allocate(recvbuf(numproc*nanals_per_iotask*npts_max))

! send and receive buffers.
do nb=1,nbackgrounds ! loop over time levels in background
do nn=1,ncdim ! loop over levels

if (nproc <= ntasks_io-1) then
! fill up send buffer.
do np=1,numproc
do ne=1,nanals_per_iotask
do i=1,numptsperproc(np)
n = ((ne-1)*nanals_per_iotask + (np-1))*npts_max + i
sendbuf(n) = grdin(indxproc(np,i),nn,nb,ne)
enddo
enddo
enddo
end if
call mpi_alltoallv(sendbuf, scounts, displs, mpi_real4, recvbuf, rcounts, displs,&
mpi_real4, mpi_comm_world, ierr)
!==> compute ensemble of first guesses on each task, remove mean from anal.
!$omp parallel do schedule(dynamic,1) private(i,nanal,n)
do i=1,numptsperproc(nproc+1)
do nanal=1,nanals
n = (nanal-1)*npts_max + i
anal_chunk(nanal,i,nn,nb) = recvbuf(n)
enddo
end do
!$omp end parallel do

enddo ! end loop over levels
enddo ! loop over nbackgrounds
endif

deallocate(sendbuf, recvbuf)
allocate(anal_chunk_prior(nanals,npts_max,ncdim,nbackgrounds))
allocate(ensmean_chunk(npts_max,ncdim,nbackgrounds))
allocate(ensmean_chunk_prior(npts_max,ncdim,nbackgrounds))
ensmean_chunk = 0_r_single

! send and receive buffers.
do nb=1,nbackgrounds ! loop over time levels in background

if (nproc <= ntasks_io-1) then
! fill up send buffer.
do np=1,numproc
do ne=1,nanals_per_iotask
do nn=1,ncdim
do i=1,numptsperproc(np)
n = ((np-1)*ncdim*nanals_per_iotask + (ne-1)*ncdim + (nn-1))*npts_max + i
sendbuf(n) = grdin(indxproc(np,i),nn,nb,ne)
enddo
enddo
enddo
enddo
end if
call mpi_alltoallv(sendbuf, scounts, displs, mpi_real4, recvbuf, rcounts, displs,&
mpi_real4, mpi_comm_world, ierr)

!==> compute ensemble of first guesses on each task, remove mean from anal.
!$omp parallel do schedule(dynamic,1) private(nn,i,nanal,n)
do nn=1,ncdim
do i=1,numptsperproc(nproc+1)
do nanal=1,nanals
n = ((nanal-1)*ncdim + (nn-1))*npts_max + i
anal_chunk(nanal,i,nn,nb) = recvbuf(n)
enddo
end do
end do
!$omp end parallel do

enddo ! loop over nbackgrounds

!==> compute mean, remove it from anal_chunk
!$omp parallel do schedule(dynamic,1) private(nn,i,n,nb)
do nb=1,nbackgrounds
@@ -440,7 +492,6 @@ subroutine scatter_chunks
end do
!$omp end parallel do

deallocate(sendbuf, recvbuf)

end subroutine scatter_chunks
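Note (illustrative, not part of the diff): in scatter_chunks the same overflow concern applies to mpi_alltoallv, where the per-task counts and the displacements are both default integers; the largest displacement grows roughly as numproc times the per-task count, which is why numproc appears in the guard. The sketch below, with assumed sizes loosely scaled to a 3-km NA grid, shows how moving the exchange inside the level loop shrinks both quantities by a factor of ncdim. gather_chunks applies the same split in the reverse direction.

program alltoallv_count_sketch
  ! Illustrative only: compare message sizes of the single-call and
  ! per-level mpi_alltoallv exchanges.  All sizes below are assumptions.
  use, intrinsic :: iso_fortran_env, only: int64
  implicit none
  integer(int64), parameter :: i32_max = 2_int64**31 - 1_int64
  integer(int64) :: numproc, nanals_per_iotask, npts_max, ncdim
  integer(int64) :: count_full, displ_full, count_lvl, displ_lvl

  numproc           = 1200_int64    ! total MPI tasks (assumed)
  nanals_per_iotask = 1_int64       ! members handled per IO task (assumed)
  npts_max          = 9000_int64    ! max grid points per task (assumed)
  ncdim             = 700_int64     ! control-vector columns (assumed)

  ! Single alltoallv over all columns.
  count_full = nanals_per_iotask*npts_max*ncdim   ! elements per task pair
  displ_full = (numproc-1)*count_full             ! largest displacement
  ! One alltoallv per column.
  count_lvl  = nanals_per_iotask*npts_max
  displ_lvl  = (numproc-1)*count_lvl

  print *, 'full exchange: count, max displacement =', count_full, displ_full
  print *, 'per-level:     count, max displacement =', count_lvl,  displ_lvl
  print *, 'full exchange fits default integer?    ', displ_full < i32_max
  print *, 'per-level fits default integer?        ', displ_lvl  < i32_max
end program alltoallv_count_sketch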

@@ -460,52 +511,94 @@ subroutine gather_chunks
! scounts is number of data elements to send to processor np.
! rcounts is number of data elements to recv from processor np.
! displs is displacement into send array for data to go to proc np
if (nproc <= ntasks_io-1) then
rcounts = nanals_per_iotask*npts_max*ncdim
else
rcounts = 0
endif
do np=0,numproc-1
displs(np) = np*nanals_per_iotask*npts_max*ncdim
if (np <= ntasks_io-1) then
scounts(np) = nanals_per_iotask*npts_max*ncdim
else
scounts(np) = 0
end if
enddo
allocate(recvbuf(numproc*nanals_per_iotask*npts_max*ncdim))
allocate(sendbuf(numproc*nanals_per_iotask*npts_max*ncdim))


do nb=1,nbackgrounds ! loop over time levels in background
do nn=1,ncdim
do i=1,numptsperproc(nproc+1)
do nanal=1,nanals
n = ((nanal-1)*ncdim + (nn-1))*npts_max + i
! add ensemble mean back in.
sendbuf(n) = anal_chunk(nanal,i,nn,nb)+ensmean_chunk(i,nn,nb)
! convert to increment (A-F).
sendbuf(n) = sendbuf(n)-(anal_chunk_prior(nanal,i,nn,nb)+ensmean_chunk_prior(i,nn,nb))
enddo
enddo
enddo
call mpi_alltoallv(sendbuf, scounts, displs, mpi_real4, recvbuf, rcounts, displs,&
mpi_real4, mpi_comm_world, ierr)
if (nproc <= ntasks_io-1) then
do np=1,numproc
do ne=1,nanals_per_iotask
do nn=1,ncdim
do i=1,numptsperproc(np)
n = ((np-1)*ncdim*nanals_per_iotask + (ne-1)*ncdim + (nn-1))*npts_max + i
grdin(indxproc(np,i),nn,nb,ne) = recvbuf(n)
if (real(numproc)*real(nanals_per_iotask)*real(npts_max)*real(ncdim) < 2_r_kind**32/2_r_kind - 1_i_kind) then
if (nproc <= ntasks_io-1) then
rcounts = nanals_per_iotask*npts_max*ncdim
else
rcounts = 0
endif
do np=0,numproc-1
displs(np) = np*nanals_per_iotask*npts_max*ncdim
if (np <= ntasks_io-1) then
scounts(np) = nanals_per_iotask*npts_max*ncdim
else
scounts(np) = 0
end if
enddo
allocate(recvbuf(numproc*nanals_per_iotask*npts_max*ncdim))
allocate(sendbuf(numproc*nanals_per_iotask*npts_max*ncdim))
do nb=1,nbackgrounds ! loop over time levels in background
do nn=1,ncdim
do i=1,numptsperproc(nproc+1)
do nanal=1,nanals
n = ((nanal-1)*ncdim + (nn-1))*npts_max + i
! add ensemble mean back in.
sendbuf(n) = anal_chunk(nanal,i,nn,nb)+ensmean_chunk(i,nn,nb)
! convert to increment (A-F).
sendbuf(n) = sendbuf(n)-(anal_chunk_prior(nanal,i,nn,nb)+ensmean_chunk_prior(i,nn,nb))
enddo
enddo
enddo
enddo
enddo
!print *,nproc,'min/max ps',minval(grdin(:,ncdim)),maxval(grdin(:,ncdim))
end if
enddo ! end loop over background time levels

call mpi_alltoallv(sendbuf, scounts, displs, mpi_real4, recvbuf, rcounts, displs,&
mpi_real4, mpi_comm_world, ierr)
if (nproc <= ntasks_io-1) then
do np=1,numproc
do ne=1,nanals_per_iotask
do nn=1,ncdim
do i=1,numptsperproc(np)
n = ((np-1)*ncdim*nanals_per_iotask + (ne-1)*ncdim + (nn-1))*npts_max + i
grdin(indxproc(np,i),nn,nb,ne) = recvbuf(n)
enddo
enddo
enddo
enddo
!print *,nproc,'min/max ps',minval(grdin(:,ncdim)),maxval(grdin(:,ncdim))
end if
enddo ! end loop over background time levels
else
if (nproc <= ntasks_io-1) then
rcounts = nanals_per_iotask*npts_max
else
rcounts = 0
endif
do np=0,numproc-1
displs(np) = np*nanals_per_iotask*npts_max
if (np <= ntasks_io-1) then
scounts(np) = nanals_per_iotask*npts_max
else
scounts(np) = 0
end if
enddo
allocate(recvbuf(numproc*nanals_per_iotask*npts_max))
allocate(sendbuf(numproc*nanals_per_iotask*npts_max))
do nb=1,nbackgrounds ! loop over time levels in background
do nn=1,ncdim ! loop over levels
do i=1,numptsperproc(nproc+1)
do nanal=1,nanals
n = (nanal-1)*npts_max + i
! add ensemble mean back in.
sendbuf(n) = anal_chunk(nanal,i,nn,nb)+ensmean_chunk(i,nn,nb)
! convert to increment (A-F).
sendbuf(n) = sendbuf(n)-(anal_chunk_prior(nanal,i,nn,nb)+ensmean_chunk_prior(i,nn,nb))
enddo
enddo
call mpi_alltoallv(sendbuf, scounts, displs, mpi_real4, recvbuf, rcounts, displs,&
mpi_real4, mpi_comm_world, ierr)
if (nproc <= ntasks_io-1) then
do np=1,numproc
do ne=1,nanals_per_iotask
do i=1,numptsperproc(np)
n = ((ne-1)*nanals_per_iotask + (np-1))*npts_max + i
grdin(indxproc(np,i),nn,nb,ne) = recvbuf(n)
enddo
enddo
enddo
!print *,nproc,'min/max ps',minval(grdin(:,ncdim)),maxval(grdin(:,ncdim))
end if
enddo ! end loop over levels
enddo ! end loop over background time levels
endif
deallocate(sendbuf, recvbuf)

end subroutine gather_chunks
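Note (illustrative, not part of the diff): the same four-factor guard now appears in write_control, scatter_chunks, and gather_chunks; a small helper like the one sketched below could keep the three checks consistent. The module and function names are assumptions, not code from the commit.

module mpi_count_check
  ! Illustrative only: a possible home for the shared overflow guard.
  use, intrinsic :: iso_fortran_env, only: real64
  implicit none
  private
  public :: fits_default_mpi_count
contains
  logical function fits_default_mpi_count(n1, n2, n3, n4) result(ok)
    ! True when the product of the extents stays below the largest value a
    ! default-integer MPI count or displacement can hold (2**31 - 1).
    integer, intent(in) :: n1, n2, n3, n4
    ok = real(n1,real64)*real(n2,real64)*real(n3,real64)*real(n4,real64) &
         < 2.0_real64**31 - 1.0_real64
  end function fits_default_mpi_count
end module mpi_count_check

A call such as fits_default_mpi_count(numproc, nanals_per_iotask, npts_max, ncdim) would then replace the inline real-arithmetic comparison in both loadbal routines (and, with a unit fourth argument, the npts*ncdim test in write_control).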
2 changes: 1 addition & 1 deletion src/enkf/mpi_readobs.f90
@@ -250,7 +250,7 @@ subroutine mpi_getobs(obspath, datestring, nobs_conv, nobs_oz, nobs_sat, nobs_to
! exchange obs prior ensemble members across all tasks to fully populate shared
! memory array pointer on each node.
if (nproc_shm == 0) then
if (real(nanals)*real(nobs_tot) < 2**32/2. - 1) then
if (real(nanals)*real(nobs_tot) < 2_r_kind**32/2_r_kind - 1_r_kind) then
call mpi_allreduce(mpi_in_place,anal_ob,nanals*nobs_tot,mpi_real4,mpi_sum,mpi_comm_shmemroot,ierr)
else
! count won't fit in 32-bit integer and mpi_allreduce doesn't handle
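Note (illustrative, not part of the diff): the one-line mpi_readobs.f90 change replaces literals that do not fit a default 32-bit integer (2**32) with kind-parameterized ones, so the limit is formed in a wider type before the comparison. A minimal sketch of the same test, using standard kinds instead of GSI's r_kind and assumed observation counts, is below.

program obs_count_limit_sketch
  ! Illustrative only: build the 2**31 - 1 limit with 64-bit arithmetic so
  ! the comparison itself cannot overflow a default integer.
  use, intrinsic :: iso_fortran_env, only: int64, real64
  implicit none
  integer(int64) :: nanals, nobs_tot

  nanals   = 80_int64          ! ensemble size (assumed)
  nobs_tot = 40000000_int64    ! total observation count (assumed)

  if (real(nanals,real64)*real(nobs_tot,real64) < 2.0_real64**31 - 1.0_real64) then
     print *, 'nanals*nobs_tot fits a default-integer MPI count'
  else
     print *, 'count too large: exchange the observation priors in pieces'
  endif
end program obs_count_limit_sketch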
