Skip to content

Commit

Permalink
r.univar: add parallel support
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronsms committed Jun 11, 2021
1 parent cd5879f commit 6e6581b
Show file tree
Hide file tree
Showing 3 changed files with 255 additions and 11 deletions.
5 changes: 3 additions & 2 deletions raster/r.univar/Makefile
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@

MODULE_TOPDIR = ../..

LIBES2 = $(RASTERLIB) $(GISLIB) $(MATHLIB)
LIBES3 = $(RASTER3DLIB) $(RASTERLIB) $(GISLIB) $(MATHLIB)
LIBES2 = $(RASTERLIB) $(GISLIB) $(MATHLIB) $(OMPLIB)
LIBES3 = $(RASTER3DLIB) $(RASTERLIB) $(GISLIB) $(MATHLIB) $(OMPLIB)
DEPENDENCIES = $(RASTER3DDEP) $(GISDEP) $(RASTERDEP)
EXTRA_CFLAGS = $(OMPCFLAGS)

PROGRAMS = r.univar r3.univar

Expand Down
2 changes: 1 addition & 1 deletion raster/r.univar/globals.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ typedef struct
/* command line options are the same for raster and raster3d maps */
typedef struct
{
struct Option *inputfile, *zonefile, *percentile, *output_file, *separator;
struct Option *inputfile, *zonefile, *percentile, *output_file, *separator, *threads;
struct Flag *shell_style, *extended, *table, *use_rast_region;
} param_type;

Expand Down
259 changes: 251 additions & 8 deletions raster/r.univar/r.univar_main.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,13 @@
* This program is a replacement for the r.univar shell script
*/

#if defined(_OPENMP)
#include <omp.h>
#endif

#include <assert.h>
#include <string.h>
#include <float.h>
#include "globals.h"

param_type param;
Expand Down Expand Up @@ -52,6 +57,14 @@ void set_params()
_("Percentile to calculate (requires extended statistics flag)");
param.percentile->guisection = _("Extended");

param.threads = G_define_option();
param.threads->key = "nprocs";
param.threads->type = TYPE_INTEGER;
param.threads->answer = "1";
param.threads->options = "1-1000";
param.threads->required = NO;
param.threads->description = _("Number of threads which will be used for parallel computing");

param.separator = G_define_standard_option(G_OPT_F_SEP);
param.separator->guisection = _("Formatting");

Expand Down Expand Up @@ -82,6 +95,8 @@ static int open_raster(const char *infile);
static univar_stat *univar_stat_with_percentiles(int map_type);
static void process_raster(univar_stat * stats, int fd, int fdz,
const struct Cell_head *region);
static void process_raster_threaded(univar_stat *stats, char *fname,
char *zone_fname, const struct Cell_head *region, const int threads);

/* *************************************************************** */
/* **** the main functions for r.univar ************************** */
Expand All @@ -98,6 +113,8 @@ int main(int argc, char *argv[])
struct Range zone_range;
const char *mapset, *name;

int threads;

G_gisinit(argv[0]);

module = G_define_module();
Expand Down Expand Up @@ -175,14 +192,14 @@ int main(int argc, char *argv[])

/* Check if the native extent and resolution
of the input map should be used */
if(param.use_rast_region->answer) {
mapset = G_find_raster2(*p, "");
Rast_get_cellhd(*p, mapset, &region);
/* Set the computational region */
Rast_set_window(&region);
} else {
G_get_window(&region);
}
if(param.use_rast_region->answer) {
mapset = G_find_raster2(*p, "");
Rast_get_cellhd(*p, mapset, &region);
/* Set the computational region */
Rast_set_window(&region);
} else {
G_get_window(&region);
}

fd = open_raster(*p);

Expand All @@ -202,7 +219,24 @@ int main(int argc, char *argv[])
}
}

sscanf(param.threads->answer, "%d", &threads);
if (threads < 1)
{
G_warning(_("<%d> is not valid number of threads. Number of threads will be set to <%d>"),
threads, abs(threads));
threads = abs(threads);
}
#if defined(_OPENMP)
if (!param.extended->answer) {
omp_set_num_threads(threads);
process_raster_threaded(stats, *p, z, &region, threads);
} else {
process_raster(stats, fd, fdz, &region);
}
#else
process_raster(stats, fd, fdz, &region);
#endif


/* close input raster */
Rast_close(fd);
Expand Down Expand Up @@ -235,6 +269,7 @@ static int open_raster(const char *infile)
}

fd = Rast_open_old(infile, mapset);
G_free((void *) mapset);

return fd;
}
Expand Down Expand Up @@ -379,7 +414,215 @@ process_raster(univar_stat * stats, int fd, int fdz, const struct Cell_head *reg
if (!(param.shell_style->answer))
G_percent(row, rows, 2);
}
G_free(raster_row);
if (n_zones)
G_free(zoneraster_row);
if (!(param.shell_style->answer))
G_percent(rows, rows, 2); /* finish it off */

}

static void
process_raster_threaded(univar_stat * stats, char *fname, char *zone_fname, const struct Cell_head *region, const int threads)
{
/* use G_window_rows(), G_window_cols() here? */
const unsigned int rows = region->rows;
const unsigned int cols = region->cols;
const int n_zones = zone_info.n_zones;

const char *mapset;
const char *zone_mapset;
mapset = G_find_raster2(fname, "");
if (n_zones) {
zone_mapset = G_find_raster2(zone_fname, "");
}

/* open file per thread */
int *fd, *fdz;
fd = G_malloc(threads * sizeof(int));
if (n_zones) {
fdz = G_malloc(threads * sizeof(int));
}
for (int i = 0; i < threads; i++) {
fd[i] = Rast_open_old(fname, mapset);
if (n_zones)
fdz[i] = Rast_open_old(zone_fname, zone_mapset);
}

const RASTER_MAP_TYPE map_type = Rast_get_map_type(fd[0]);
const size_t value_sz = Rast_cell_size(map_type);
void **raster_row;
CELL **zoneraster_row = NULL;
raster_row = G_malloc(threads * sizeof(void*));
if (n_zones) {
zoneraster_row = G_malloc(threads * sizeof(CELL*));
}
for (int i = 0; i < threads; i++) {
if (NULL == (raster_row[i] = Rast_allocate_buf(map_type))) {
return;
}
if (n_zones != 0 && (NULL == (zoneraster_row[i] = Rast_allocate_c_buf()))) {
return;
}
}

int n_alloc = n_zones ? n_zones : 1;
size_t **n = G_malloc(threads * sizeof(size_t*));
double **sum = G_malloc(threads * sizeof(double*));
double **sumsq = G_malloc(threads * sizeof(double*));
double **sum_abs = G_malloc(threads * sizeof(double*));
double **min = G_malloc(threads * sizeof(double*));
double **max = G_malloc(threads * sizeof(double*));
size_t **size = G_malloc(threads * sizeof(double*));

for (int i = 0; i < threads; i++) {
n[i] = G_calloc(n_alloc, sizeof(size_t));
sum[i] = G_calloc(n_alloc, sizeof(double));
sumsq[i] = G_calloc(n_alloc, sizeof(double));
sum_abs[i] = G_calloc(n_alloc, sizeof(double));
size[i] = G_calloc((n_alloc), sizeof(size_t));
min[i] = G_malloc((n_alloc) * sizeof(double));
max[i] = G_malloc((n_alloc) * sizeof(double));
for (int j = 0; j < n_alloc; j++) {
max[i][j] = DBL_MIN;
min[i][j] = DBL_MAX;
}
}

omp_lock_t *minmax = G_malloc(n_alloc * sizeof(omp_lock_t));
for (int i = 0; i < n_alloc; i++) {
omp_init_lock(&(minmax[i]));
}

unsigned int work = 0;
#pragma omp parallel default(none) \
shared(stats, fd, fdz, raster_row, zoneraster_row, n, sum, sumsq, sum_abs, min, max, size, region, \
rows, cols, n_zones, zone_info, param, work, mapset, map_type, value_sz, n_alloc, minmax)
{
int id = omp_get_thread_num();
unsigned int row;

#pragma omp for schedule(static)
for (row = 0; row < rows; row++) {
void *ptr;
CELL *zptr = NULL;
unsigned int col;

Rast_get_row(fd[id], raster_row[id], row, map_type);
if (n_zones) {
Rast_get_c_row(fdz[id], zoneraster_row[id], row);
zptr = zoneraster_row[id];
}

ptr = raster_row[id];

for (col = 0; col < cols; col++) {
double val;
int zone = 0;

if (n_zones) {
/* skip NULL cells in zone map */
if (Rast_is_c_null_value(zptr)) {
ptr = G_incr_void_ptr(ptr, value_sz);
zptr++;
continue;
}
zone = *zptr - zone_info.min;
}

/* count all including NULL cells in input map */
/* stats[zone].size++; */
size[id][zone]++;

/* can't do stats with NULL cells in input map */
if (Rast_is_null_value(ptr, map_type)) {
ptr = G_incr_void_ptr(ptr, value_sz);
if (n_zones)
zptr++;
continue;
}

val = ((map_type == DCELL_TYPE) ? *((DCELL *) ptr)
: (map_type == FCELL_TYPE) ? *((FCELL *) ptr)
: *((CELL *) ptr));

sum[id][zone] += val;
sumsq[id][zone] += val * val;
sum_abs[id][zone] += fabs(val);

if (val > max[id][zone])
max[id][zone] = val;
if (val < min[id][zone])
min[id][zone] = val;

ptr = G_incr_void_ptr(ptr, value_sz);
if (n_zones)
zptr++;
n[id][zone]++;
} /* end of inner loop */
if (!(param.shell_style->answer)) {
#pragma omp atomic update
work++;
G_percent(work, rows, 2);
}
} /* end of loop region */

id = omp_get_thread_num();
for (int i = 0; i < n_alloc; i++) {
#pragma omp atomic update
stats[i].n += n[id][i];
#pragma omp atomic update
stats[i].size += size[id][i];
#pragma omp atomic update
stats[i].sum += sum[id][i];
#pragma omp atomic update
stats[i].sumsq += sumsq[id][i];
#pragma omp atomic update
stats[i].sum_abs += sum_abs[id][i];

omp_set_lock(&(minmax[i]));
if (stats[i].max < max[id][i] || (stats[i].max != stats[i].max && max[id][i] != DBL_MIN)) {
stats[i].max = max[id][i];
}
if (stats[i].min > min[id][i] || (stats[i].min != stats[i].min && min[id][i] != DBL_MAX)) {
stats[i].min = min[id][i];
}
omp_unset_lock(&(minmax[i]));
}

Rast_close(fd[id]);
if (n_zones)
Rast_close(fdz[id]);
G_free(n[id]);
G_free(size[id]);
G_free(sum[id]);
G_free(sumsq[id]);
G_free(sum_abs[id]);
G_free(max[id]);
G_free(min[id]);
G_free(raster_row[id]);
if (n_zones)
G_free(zoneraster_row[id]);
} /* end of parallel region */
if (!(param.shell_style->answer))
G_percent(rows, rows, 2); /* finish it off */
for (int i = 0; i < n_alloc; i++) {
omp_destroy_lock(&(minmax[i]));
}
G_free((void *) mapset);
if (n_zones)
G_free((void *) zone_mapset);
G_free(fd);
if (n_zones)
G_free(fdz);
G_free(n);
G_free(size);
G_free(sum);
G_free(sumsq);
G_free(sum_abs);
G_free(max);
G_free(min);
G_free(raster_row);
if (n_zones)
G_free(zoneraster_row);
}

0 comments on commit 6e6581b

Please sign in to comment.