Skip to content

Commit

Permalink
Add parallel region
Browse files (browse the repository at this point in the history)
  • Loading branch information
aaronsms committed Aug 9, 2021
1 parent 5fcbb01 commit ccb937e
Showing 1 changed file with 125 additions and 52 deletions.
177 changes: 125 additions & 52 deletions raster/r.series/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
*
* MODULE: r.series
* AUTHOR(S): Glynn Clements <glynn gclements.plus.com> (original contributor)
* Hamish Bowman <hamish_b yahoo.com>,
* Hamish Bowman <hamish_b yahoo.com>,
* Jachym Cepicky <jachym les-ejk.cz>,
* Martin Wegmann <wegmann biozentrum.uni-wuerzburg.de>
* PURPOSE:
Expand Down Expand Up @@ -125,9 +125,14 @@ int main(int argc, char *argv[])
} flag;
int i, t;
int nprocs;
bool threaded;
int num_inputs;
struct input **inputs = NULL;
int bufrows;
#if defined(_OPENMP)
omp_lock_t fd_lock;
#endif

int num_outputs;
struct output *outputs = NULL;
struct History history;
Expand Down Expand Up @@ -210,15 +215,15 @@ int main(int argc, char *argv[])
exit(EXIT_FAILURE);

sscanf(parm.nprocs->answer, "%d", &nprocs);
if (nprocs < 1)
{
if (nprocs < 1) {
G_fatal_error(_("<%d> is not valid number of nprocs."), nprocs);
}
#if defined(_OPENMP)
omp_set_num_threads(nprocs);
#else
nprocs = 1;
#endif
threaded = nprocs > 1;

lo = -1.0 / 0.0; /* -inf */
hi = 1.0 / 0.0; /* inf */
Expand Down Expand Up @@ -393,6 +398,13 @@ int main(int argc, char *argv[])
bufrows = nprocs;
}

/* set the locks for lazily opening raster files */
#if defined(_OPENMP)
if (flag.lazy->answer && threaded) {
omp_init_lock(&fd_lock);
}
#endif

/* process the output maps */
for (i = 0; parm.output->answers[i]; i++) ;
num_outputs = i;
Expand Down Expand Up @@ -465,69 +477,130 @@ int main(int argc, char *argv[])
/* process the data */
G_verbose_message(_("Percent complete..."));

int t_id = 0;
struct input *in = inputs[t_id];

for (row = 0; row < nrows; row++) {
G_percent(row, nrows, 2);
int computed = 0;
int written = 0;

if (flag.lazy->answer) {
/* Open the files only on run time */
for (i = 0; i < num_inputs; i++) {
in[i].fd = Rast_open_old(in[i].name, "");
Rast_get_d_row(in[i].fd, in[i].buf, row);
Rast_close(in[i].fd);
}
} else {
for (i = 0; i < num_inputs; i++)
Rast_get_d_row(in[i].fd, in[i].buf, row);
while (written < nrows) {
int range = bufrows;
if (range > nrows - written) {
range = nrows - written;
}
int start = written;
int end = written + range;

for (col = 0; col < ncols; col++) {
int null = 0;
#pragma omp parallel if(threaded) private(row, col, i)
{
int t_id = 0;
#if defined(_OPENMP)
t_id = omp_get_thread_num();
#endif
struct input *in = inputs[t_id];
DCELL *val = values[t_id];
DCELL *val_tmp = values_tmp[t_id];
DCELL (*val_w)[2] = NULL;
DCELL (*val_w_tmp)[2] = NULL;
if (have_weights) {
val_w = values_w[t_id];
val_w_tmp = values_w_tmp[t_id];
}

for (i = 0; i < num_inputs; i++) {
DCELL v = in[i].buf[col];
#pragma omp for schedule(static)
for (row = start; row < end; row++) {
G_percent(computed, nrows, 2);

if (Rast_is_d_null_value(&v))
null = 1;
else if (parm.range->answer && (v < lo || v > hi)) {
Rast_set_d_null_value(&v, 1);
null = 1;
}
values[t_id][i] = v;
if (have_weights) {
values_w[t_id][i][0] = v;
values_w[t_id][i][1] = in[i].weight;
if (flag.lazy->answer) {
/* Open the files only on run time */
for (i = 0; i < num_inputs; i++) {
#if defined(_OPENMP)
if(threaded) {
omp_set_lock(&fd_lock);
in[i].fd = Rast_open_old(in[i].name, "");
omp_unset_lock(&fd_lock);

Rast_get_d_row(in[i].fd, in[i].buf, row);

omp_set_lock(&fd_lock);
Rast_close(in[i].fd);
omp_unset_lock(&fd_lock);
} else {
in[i].fd = Rast_open_old(in[i].name, "");
Rast_get_d_row(in[i].fd, in[i].buf, row);
Rast_close(in[i].fd);
}
#else
in[i].fd = Rast_open_old(in[i].name, "");
Rast_get_d_row(in[i].fd, in[i].buf, row);
Rast_close(in[i].fd);
#endif
}
} else {
for (i = 0; i < num_inputs; i++)
Rast_get_d_row(in[i].fd, in[i].buf, row);
}
}

for (i = 0; i < num_outputs; i++) {
struct output *out = &outputs[i];
for (col = 0; col < ncols; col++) {
int null = 0;
size_t s = (size_t) (row - start) * ncols + col;

for (i = 0; i < num_inputs; i++) {
DCELL v = in[i].buf[col];

if (Rast_is_d_null_value(&v))
null = 1;
else if (parm.range->answer && (v < lo || v > hi)) {
Rast_set_d_null_value(&v, 1);
null = 1;
}
values[t_id][i] = v;
if (have_weights) {
val_w[i][0] = v;
val_w[i][1] = in[i].weight;
}
}

if (null && flag.nulls->answer)
Rast_set_d_null_value(&out->buf[col], 1);
else {
if (out->method_fn_w) {
memcpy(values_w_tmp[t_id], values_w[t_id],
num_inputs * 2 * sizeof(DCELL));
(*out->method_fn_w)(&out->buf[col], values_w_tmp[t_id],
num_inputs, &out->quantile);
} else {
memcpy(values_tmp[t_id], values[t_id], num_inputs * sizeof(DCELL));
(*out->method_fn)(&out->buf[col], values_tmp[t_id],
num_inputs, &out->quantile);
for (i = 0; i < num_outputs; i++) {
struct output *out = &outputs[i];

if (null && flag.nulls->answer)
Rast_set_d_null_value(&out->buf[s], 1);
else {
if (out->method_fn_w) {
memcpy(val_w_tmp, val_w,
num_inputs * 2 * sizeof(DCELL));
(*out->method_fn_w)(&out->buf[s], val_w_tmp,
num_inputs, &out->quantile);
} else {
memcpy(val_tmp, val, num_inputs * sizeof(DCELL));
(*out->method_fn)(&out->buf[s], val_tmp,
num_inputs, &out->quantile);
}
}
}
}
}

computed++;
} /* end for loop */
} /* end parallel region */

/* write output buffer to disk */
for (i = 0; i < num_outputs; i++) {
struct output *out = &outputs[i];
for (row = start; row < end; row++)
Rast_put_d_row(out->fd, &out->buf[(size_t) (row - start) * ncols]);
}
written = end;

for (i = 0; i < num_outputs; i++)
Rast_put_d_row(outputs[i].fd, outputs[i].buf);
}
} /* end while loop */

G_percent(row, nrows, 2);

/* destroy locks */
#if defined(_OPENMP)
if (flag.lazy->answer && nprocs > 1) {
omp_destroy_lock(&fd_lock);
}
#endif

/* close output maps */
for (i = 0; i < num_outputs; i++) {
struct output *out = &outputs[i];
Expand All @@ -539,7 +612,7 @@ int main(int argc, char *argv[])
Rast_write_history(out->name, &history);
}

/* Close input maps */
/* close input maps */
if (!flag.lazy->answer) {
for (t = 0; t < nprocs; t++)
for (i = 0; i < num_inputs; i++)
Expand Down

0 comments on commit ccb937e

Please sign in to comment.