diff --git a/datasets/bluetopo/package/BlueTopoBathyRaster.cpp b/datasets/bluetopo/package/BlueTopoBathyRaster.cpp index bfee12bb..9ff6ad40 100644 --- a/datasets/bluetopo/package/BlueTopoBathyRaster.cpp +++ b/datasets/bluetopo/package/BlueTopoBathyRaster.cpp @@ -71,9 +71,18 @@ BlueTopoBathyRaster::~BlueTopoBathyRaster(void) = default; /*---------------------------------------------------------------------------- * getIndexFile *----------------------------------------------------------------------------*/ -void BlueTopoBathyRaster::getIndexFile(const OGRGeometry* geo, std::string& file, const std::vector* points) +void BlueTopoBathyRaster::getIndexFile(const OGRGeometry* geo, std::string& file) { static_cast(geo); + file = indexFile; + mlog(DEBUG, "Using %s", file.c_str()); +} + +/*---------------------------------------------------------------------------- + * getIndexFile + *----------------------------------------------------------------------------*/ +void BlueTopoBathyRaster::getIndexFile(const std::vector* points, std::string& file) +{ static_cast(points); file = indexFile; mlog(DEBUG, "Using %s", file.c_str()); diff --git a/datasets/bluetopo/package/BlueTopoBathyRaster.h b/datasets/bluetopo/package/BlueTopoBathyRaster.h index 6a81daea..47a83bfe 100644 --- a/datasets/bluetopo/package/BlueTopoBathyRaster.h +++ b/datasets/bluetopo/package/BlueTopoBathyRaster.h @@ -73,7 +73,8 @@ class BlueTopoBathyRaster: public GeoIndexedRaster BlueTopoBathyRaster (lua_State* L, RequestFields* rqst_parms, const char* key); ~BlueTopoBathyRaster (void) override; - void getIndexFile (const OGRGeometry* geo, std::string& file, const std::vector* points) final; + void getIndexFile (const OGRGeometry* geo, std::string& file) final; + void getIndexFile (const std::vector* points, std::string& file) final; bool findRasters (raster_finder_t* finder) final; /*-------------------------------------------------------------------- diff --git a/datasets/gebco/package/GebcoBathyRaster.cpp b/datasets/gebco/package/GebcoBathyRaster.cpp index a06e6c41..f466eb5a 100644 --- a/datasets/gebco/package/GebcoBathyRaster.cpp +++ b/datasets/gebco/package/GebcoBathyRaster.cpp @@ -83,14 +83,22 @@ GebcoBathyRaster::~GebcoBathyRaster(void) = default; /*---------------------------------------------------------------------------- * getIndexFile *----------------------------------------------------------------------------*/ -void GebcoBathyRaster::getIndexFile(const OGRGeometry* geo, std::string& file, const std::vector* points) +void GebcoBathyRaster::getIndexFile(const OGRGeometry* geo, std::string& file) { static_cast(geo); - static_cast(points); file = filePath + "/" + indexFile; mlog(DEBUG, "Using index file: %s", file.c_str()); } +/*---------------------------------------------------------------------------- + * getIndexFile + *----------------------------------------------------------------------------*/ +void GebcoBathyRaster::getIndexFile(const std::vector* points, std::string& file) +{ + static_cast(points); + file = filePath + "/" + indexFile; + mlog(DEBUG, "Using index file: %s", file.c_str()); +} /*---------------------------------------------------------------------------- * findRasters diff --git a/datasets/gebco/package/GebcoBathyRaster.h b/datasets/gebco/package/GebcoBathyRaster.h index 5b2f5427..d4c45907 100644 --- a/datasets/gebco/package/GebcoBathyRaster.h +++ b/datasets/gebco/package/GebcoBathyRaster.h @@ -71,7 +71,8 @@ class GebcoBathyRaster: public GeoIndexedRaster GebcoBathyRaster (lua_State* L, RequestFields* rqst_parms, const char* key); ~GebcoBathyRaster (void) override; - void getIndexFile (const OGRGeometry* geo, std::string& file, const std::vector* points) final; + void getIndexFile (const OGRGeometry* geo, std::string& file) final; + void getIndexFile (const std::vector* points, std::string& file) final; bool findRasters (raster_finder_t* finder) final; /*-------------------------------------------------------------------- diff --git a/datasets/landsat/package/LandsatHlsRaster.cpp b/datasets/landsat/package/LandsatHlsRaster.cpp index 78de2718..5169677d 100644 --- a/datasets/landsat/package/LandsatHlsRaster.cpp +++ b/datasets/landsat/package/LandsatHlsRaster.cpp @@ -143,9 +143,18 @@ LandsatHlsRaster::~LandsatHlsRaster(void) /*---------------------------------------------------------------------------- * getIndexFile *----------------------------------------------------------------------------*/ -void LandsatHlsRaster::getIndexFile(const OGRGeometry* geo, std::string& file, const std::vector* points) +void LandsatHlsRaster::getIndexFile(const OGRGeometry* geo, std::string& file) { static_cast(geo); + file = indexFile; + // mlog(DEBUG, "Using %s", file.c_str()); +} + +/*---------------------------------------------------------------------------- + * getIndexFile + *----------------------------------------------------------------------------*/ +void LandsatHlsRaster::getIndexFile(const std::vector* points, std::string& file) +{ static_cast(points); file = indexFile; // mlog(DEBUG, "Using %s", file.c_str()); diff --git a/datasets/landsat/package/LandsatHlsRaster.h b/datasets/landsat/package/LandsatHlsRaster.h index 2b27e2be..c4befb4e 100644 --- a/datasets/landsat/package/LandsatHlsRaster.h +++ b/datasets/landsat/package/LandsatHlsRaster.h @@ -91,10 +91,11 @@ class LandsatHlsRaster: public GeoIndexedRaster LandsatHlsRaster (lua_State* L, RequestFields* rqst_parms, const char* key); ~LandsatHlsRaster (void) override; - void getIndexFile (const OGRGeometry* geo, std::string& file, const std::vector* points) final; + void getIndexFile (const OGRGeometry* geo, std::string& file) final; + void getIndexFile (const std::vector* points, std::string& file) final; bool findRasters (raster_finder_t* finder) final; - void getGroupSamples (const rasters_group_t* rgroup, List& slist, uint32_t flags) final + void getSerialGroupSamples(const rasters_group_t* rgroup, List& slist, uint32_t flags) final { _getGroupSamples(SERIAL, rgroup, &slist, flags);} uint32_t getBatchGroupSamples(const rasters_group_t* rgroup, List* slist, uint32_t flags, uint32_t pointIndx) final diff --git a/datasets/pgc/package/PgcDemStripsRaster.cpp b/datasets/pgc/package/PgcDemStripsRaster.cpp index 68583c55..6a1bdd36 100644 --- a/datasets/pgc/package/PgcDemStripsRaster.cpp +++ b/datasets/pgc/package/PgcDemStripsRaster.cpp @@ -91,19 +91,17 @@ bool PgcDemStripsRaster::getFeatureDate(const OGRFeature* feature, TimeLib::gmt_ /*---------------------------------------------------------------------------- * getIndexFile *----------------------------------------------------------------------------*/ -void PgcDemStripsRaster::getIndexFile(const OGRGeometry* geo, std::string& file, const std::vector* points) +void PgcDemStripsRaster::getIndexFile(const OGRGeometry* geo, std::string& file) { - const OGRPolygon* poly = NULL; - - if(geo == NULL && points == NULL) + if(geo == NULL) { - mlog(ERROR, "Both geo and points are NULL"); + mlog(ERROR, "geo param is NULL"); ssErrors |= SS_INDEX_FILE_ERROR; return; } /* Determine if we have a point */ - if(geo && GdalRaster::ispoint(geo)) + if(GdalRaster::ispoint(geo)) { /* Only one index file for a point from one of the geocells */ const OGRPoint* poi = geo->toPoint(); @@ -111,14 +109,14 @@ void PgcDemStripsRaster::getIndexFile(const OGRGeometry* geo, std::string& file, return; } - /* Vector holding all geojson files from all geocells */ + /* Vector holding all geojson index files from all geocells */ std::vector files; /* Determine if we have a polygon */ - if(geo && GdalRaster::ispoly(geo)) + if(GdalRaster::ispoly(geo)) { OGREnvelope env; - poly = geo->toPolygon(); + const OGRPolygon* poly = geo->toPolygon(); poly->getEnvelope(&env); const double minx = floor(env.MinX); @@ -141,25 +139,69 @@ void PgcDemStripsRaster::getIndexFile(const OGRGeometry* geo, std::string& file, mlog(INFO, "Found %ld geojson files in polygon", files.size()); } - /* If we don't have a polygon but we have points get all files for the points */ - if(poly == NULL && points != NULL) + /* If we have only one file, use it as the index file */ + if(files.size() == 1) { - for(const auto& p : *points) + file = files[0]; + return; + } + + /* Combine all geojson files into a single file stored in vsimem */ + if(!combinedGeoJSON.empty()) + { + /* Remove previous combined geojson file */ + VSIUnlink(combinedGeoJSON.c_str()); + } + + combinedGeoJSON = "/vsimem/" + GdalRaster::getUUID() + "_combined.geojson"; + if(combineGeoJSONFiles(files)) + { + /* Set the combined geojson file as the index file */ + file = combinedGeoJSON; + } + else + { + mlog(ERROR, "Failed to combine geojson files"); + ssErrors |= SS_INDEX_FILE_ERROR; + } +} + +/*---------------------------------------------------------------------------- + * getIndexFile + *----------------------------------------------------------------------------*/ +void PgcDemStripsRaster::getIndexFile(const std::vector* points, std::string& file) +{ + if(points == NULL) + { + mlog(ERROR, "points param is NULL"); + ssErrors |= SS_INDEX_FILE_ERROR; + return; + } + + /* Get all geojson files for all points */ + std::vector files; + for(const auto& p : *points) + { + std::string newFile; + _getIndexFile(p.point.x, p.point.y, newFile); + if(!newFile.empty()) { - std::string newFile; - _getIndexFile(p.point.x, p.point.y, newFile); - if(!newFile.empty()) - { - files.push_back(newFile); - } + files.push_back(newFile); } - mlog(INFO, "Found %zu geojson files with %zu points", files.size(), points->size()); } + mlog(INFO, "Found %zu geojson files with %zu points", files.size(), points->size()); /* Remove any duplicate files */ std::sort(files.begin(), files.end()); files.erase(std::unique(files.begin(), files.end()), files.end()); + /* If we have only one file, use it as the index file */ + if(files.size() == 1) + { + file = files[0]; + return; + } + /* Combine all geojson files into a single file stored in vsimem */ if(!combinedGeoJSON.empty()) { diff --git a/datasets/pgc/package/PgcDemStripsRaster.h b/datasets/pgc/package/PgcDemStripsRaster.h index 6be3c262..2287a753 100644 --- a/datasets/pgc/package/PgcDemStripsRaster.h +++ b/datasets/pgc/package/PgcDemStripsRaster.h @@ -53,7 +53,8 @@ class PgcDemStripsRaster: public GeoIndexedRaster PgcDemStripsRaster (lua_State* L, RequestFields* rqst_parms, const char* key, const char* dem_name, const char* geo_suffix, GdalRaster::overrideCRS_t cb); ~PgcDemStripsRaster (void) override; bool getFeatureDate (const OGRFeature* feature, TimeLib::gmt_time_t& gmtDate) final; - void getIndexFile (const OGRGeometry* geo, std::string& file, const std::vector* points) final; + void getIndexFile (const OGRGeometry* geo, std::string& file) final; + void getIndexFile (const std::vector* points, std::string& file) final; bool findRasters (raster_finder_t* finder) final; private: diff --git a/datasets/usgs3dep/package/Usgs3dep1meterDemRaster.cpp b/datasets/usgs3dep/package/Usgs3dep1meterDemRaster.cpp index 7aaa5770..740795b0 100644 --- a/datasets/usgs3dep/package/Usgs3dep1meterDemRaster.cpp +++ b/datasets/usgs3dep/package/Usgs3dep1meterDemRaster.cpp @@ -82,14 +82,22 @@ Usgs3dep1meterDemRaster::~Usgs3dep1meterDemRaster(void) /*---------------------------------------------------------------------------- * getIndexFile *----------------------------------------------------------------------------*/ -void Usgs3dep1meterDemRaster::getIndexFile(const OGRGeometry* geo, std::string& file, const std::vector* points) +void Usgs3dep1meterDemRaster::getIndexFile(const OGRGeometry* geo, std::string& file) { static_cast(geo); - static_cast(points); file = indexFile; mlog(DEBUG, "Using %s", file.c_str()); } +/*---------------------------------------------------------------------------- + * getIndexFile + *----------------------------------------------------------------------------*/ +void Usgs3dep1meterDemRaster::getIndexFile(const std::vector* points, std::string& file) +{ + static_cast(points); + file = indexFile; + mlog(DEBUG, "Using %s", file.c_str()); +} /*---------------------------------------------------------------------------- * findRasters diff --git a/datasets/usgs3dep/package/Usgs3dep1meterDemRaster.h b/datasets/usgs3dep/package/Usgs3dep1meterDemRaster.h index 511c0976..af737a0c 100644 --- a/datasets/usgs3dep/package/Usgs3dep1meterDemRaster.h +++ b/datasets/usgs3dep/package/Usgs3dep1meterDemRaster.h @@ -69,7 +69,8 @@ class Usgs3dep1meterDemRaster: public GeoIndexedRaster Usgs3dep1meterDemRaster (lua_State* L, RequestFields* rqst_parms, const char* key); ~Usgs3dep1meterDemRaster (void) override; - void getIndexFile (const OGRGeometry* geo, std::string& file, const std::vector* points) final; + void getIndexFile (const OGRGeometry* geo, std::string& file) final; + void getIndexFile (const std::vector* points, std::string& file) final; bool findRasters (raster_finder_t* finder) final; static OGRErr overrideTargetCRS(OGRSpatialReference& target); diff --git a/packages/geo/CMakeLists.txt b/packages/geo/CMakeLists.txt index 88a6a5ad..70a464c2 100644 --- a/packages/geo/CMakeLists.txt +++ b/packages/geo/CMakeLists.txt @@ -31,6 +31,8 @@ if (GDAL_FOUND AND PROJ_FOUND AND TIFF_FOUND) ${CMAKE_CURRENT_LIST_DIR}/GdalRaster.cpp ${CMAKE_CURRENT_LIST_DIR}/GeoRaster.cpp ${CMAKE_CURRENT_LIST_DIR}/GeoIndexedRaster.cpp + ${CMAKE_CURRENT_LIST_DIR}/GeoIndexedRasterSerial.cpp + ${CMAKE_CURRENT_LIST_DIR}/GeoIndexedRasterBatch.cpp ${CMAKE_CURRENT_LIST_DIR}/GeoJsonRaster.cpp ${CMAKE_CURRENT_LIST_DIR}/GeoUserRaster.cpp ${CMAKE_CURRENT_LIST_DIR}/RasterObject.cpp diff --git a/packages/geo/GeoIndexedRaster.cpp b/packages/geo/GeoIndexedRaster.cpp index b2ace1d4..172f710e 100644 --- a/packages/geo/GeoIndexedRaster.cpp +++ b/packages/geo/GeoIndexedRaster.cpp @@ -36,20 +36,11 @@ #include "GeoRaster.h" #include "GeoIndexedRaster.h" -#include -#include -#include -#include -#include -#include - - /****************************************************************************** * STATIC DATA ******************************************************************************/ const double GeoIndexedRaster::TOLERANCE = 0.01; /* Tolerance for simplification */ - const char* GeoIndexedRaster::FLAGS_TAG = "Fmask"; const char* GeoIndexedRaster::VALUE_TAG = "Value"; const char* GeoIndexedRaster::DATE_TAG = "datetime"; @@ -58,66 +49,6 @@ const char* GeoIndexedRaster::DATE_TAG = "datetime"; * PUBLIC METHODS ******************************************************************************/ -/*---------------------------------------------------------------------------- - * PointSample Constructor - *----------------------------------------------------------------------------*/ -GeoIndexedRaster::PointSample::PointSample(const OGRPoint& _point, int64_t _pointIndex): - point(_point), pointIndex(_pointIndex), ssErrors(SS_NO_ERRORS) {} - - -/*---------------------------------------------------------------------------- - * PointSample Copy Constructor - *----------------------------------------------------------------------------*/ -GeoIndexedRaster::PointSample::PointSample(const PointSample& ps): - point(ps.point), pointIndex(ps.pointIndex), bandSample(ps.bandSample), ssErrors(ps.ssErrors) -{ - bandSampleReturned.resize(ps.bandSampleReturned.size()); - - for (size_t i = 0; i < ps.bandSampleReturned.size(); ++i) - { - if(ps.bandSampleReturned[i]) - { - /* Create a new atomic with the value loaded from the original */ - bandSampleReturned[i] = std::make_unique>(ps.bandSampleReturned[i]->load()); - } - else - { - bandSampleReturned[i] = NULL; - } - } -} - -/*---------------------------------------------------------------------------- - * Reader Constructor - *----------------------------------------------------------------------------*/ -GeoIndexedRaster::Reader::Reader (GeoIndexedRaster* _obj): - obj(_obj), - geo(NULL), - entry(NULL), - sync(NUM_SYNC_SIGNALS), - run(true) -{ - thread = new Thread(readerThread, this); -} - -/*---------------------------------------------------------------------------- - * Reader Destructor - *----------------------------------------------------------------------------*/ -GeoIndexedRaster::Reader::~Reader (void) -{ - sync.lock(); - { - run = false; /* Set run flag to false */ - sync.signal(DATA_TO_SAMPLE, Cond::NOTIFY_ONE); - } - sync.unlock(); - - delete thread; /* delete thread waits on thread to join */ - - /* geometry geo is cloned not 'newed' on GDAL heap. Use this call to free it */ - OGRGeometryFactory::destroyGeometry(geo); -} - /*---------------------------------------------------------------------------- * RasterFinder Constructor *----------------------------------------------------------------------------*/ @@ -130,27 +61,6 @@ GeoIndexedRaster::RasterFinder::RasterFinder (const OGRGeometry* _geo, { } -/*---------------------------------------------------------------------------- - * SampleCollector Constructor - *----------------------------------------------------------------------------*/ -GeoIndexedRaster::SampleCollector::SampleCollector(GeoIndexedRaster* _obj, const std::vector& _pointsGroups): - obj(_obj), - pGroupsRange({0, 0}), - pointsGroups(_pointsGroups), - ssErrors(SS_NO_ERRORS) -{ -} - -/*---------------------------------------------------------------------------- - * GroupsFinder Constructor - *----------------------------------------------------------------------------*/ -GeoIndexedRaster::GroupsFinder::GroupsFinder(GeoIndexedRaster* _obj, const std::vector* _points): - obj(_obj), - pointsRange({0, 0}), - points(_points) -{ -} - /*---------------------------------------------------------------------------- * init *----------------------------------------------------------------------------*/ @@ -166,238 +76,11 @@ void GeoIndexedRaster::deinit (void) } -/*---------------------------------------------------------------------------- - * getSamples - serial sampling - *----------------------------------------------------------------------------*/ -uint32_t GeoIndexedRaster::getSamples(const MathLib::point_3d_t& point, int64_t gps, List& slist, void* param) -{ - static_cast(param); - - lockSampling(); - try - { - GroupOrdering groupList; - OGRPoint ogrPoint(point.x, point.y, point.z); - - ssErrors = SS_NO_ERRORS; - - /* Sample Rasters */ - if(sample(&ogrPoint, gps, &groupList)) - { - /* Populate Return List of Samples (slist) */ - const GroupOrdering::Iterator iter(groupList); - for(int i = 0; i < iter.length; i++) - { - const rasters_group_t* rgroup = iter[i].value; - uint32_t flags = 0; - - /* Get flags value for this group of rasters */ - if(parms->flags_file) - flags = getGroupFlags(rgroup); - - getGroupSamples(rgroup, slist, flags); - } - } - - /* Update file dictionary */ - fileDictSetSamples(&slist); - } - catch (const RunTimeException &e) - { - mlog(e.level(), "Error getting samples: %s", e.what()); - } - - /* Free Unreturned Results */ - cacheitem_t* item; - const char* key = cache.first(&item); - while(key != NULL) - { - for(uint32_t i = 0; i < item->bandSample.size(); i++) - { - if(item->bandSample[i]) - { - delete item->bandSample[i]; - item->bandSample[i] = NULL; - } - } - - for(uint32_t i = 0; i < item->bandSubset.size(); i++) - { - if(item->bandSubset[i]) - { - delete item->bandSubset[i]; - item->bandSubset[i] = NULL; - } - } - key = cache.next(&item); - } - unlockSampling(); - - return ssErrors; -} - -/*---------------------------------------------------------------------------- - * getSamples - batch sampling - *----------------------------------------------------------------------------*/ -uint32_t GeoIndexedRaster::getSamples(const std::vector& points, List& sllist, void* param) -{ - static_cast(param); - - lockSampling(); - - perfStats.clear(); - cache.clear(); /* Clear cache used by serial sampling */ - fileDict.clear(); /* Start with empty file dictionary */ - - /* Vector of points and their associated raster groups */ - std::vector pointsGroups; - - /* Vector of rasters and all points they contain */ - std::vector uniqueRasters; - - try - { - ssErrors = SS_NO_ERRORS; - - /* Open the index file for all points */ - if(!openGeoIndex(NULL, &points)) - { - throw RunTimeException(CRITICAL, RTE_ERROR, "Error opening index file"); - } - - { - /* Rasters to points map */ - raster_points_map_t rasterToPointsMap; - - /* For all points create a vector of raster group lists */ - if(!findAllGroups(&points, pointsGroups, rasterToPointsMap)) - { - throw RunTimeException(CRITICAL, RTE_ERROR, "Error creating groups"); - } - - /* For all points create a vector of unique rasters */ - if(!findUniqueRasters(uniqueRasters, pointsGroups, rasterToPointsMap)) - { - throw RunTimeException(CRITICAL, RTE_ERROR, "Error finding unique rasters"); - } - - /* rastersToPointsMap is no longer needed */ - } - - /* Sample all unique rasters */ - if(!sampleUniqueRasters(uniqueRasters)) - { - throw RunTimeException(CRITICAL, RTE_ERROR, "Error sampling unique rasters"); - } - - /* Populate sllist with samples */ - if(!collectSamples(pointsGroups, sllist)) - { - throw RunTimeException(CRITICAL, RTE_ERROR, "Error collecting samples"); - } - } - catch (const RunTimeException &e) - { - mlog(e.level(), "Error getting samples: %s", e.what()); - } - - /* Clean up pointsGroups */ - for(const point_groups_t& pg : pointsGroups) - delete pg.groupList; - - /* Clean up unique rasters */ - for(unique_raster_t* ur : uniqueRasters) - { - for(const point_sample_t& ps : ur->pointSamples) - { - /* Delete samples which have not been returned (quality masks, etc) */ - for(size_t i = 0; i < ps.bandSample.size(); i++) - { - if(!ps.bandSampleReturned[i]->load()) - { - delete ps.bandSample[i]; - } - } - } - - delete ur; - } - - unlockSampling(); - - /* Print performance stats */ - perfStats.log(INFO); - - return ssErrors; -} - - -/*---------------------------------------------------------------------------- - * getSubset - *----------------------------------------------------------------------------*/ -uint32_t GeoIndexedRaster::getSubsets(const MathLib::extent_t& extent, int64_t gps, List& slist, void* param) -{ - static_cast(param); - - lockSampling(); - try - { - GroupOrdering groupList; - OGRPolygon poly = GdalRaster::makeRectangle(extent.ll.x, extent.ll.y, extent.ur.x, extent.ur.y); - ssErrors = SS_NO_ERRORS; - - /* Sample Subsets */ - if(sample(&poly, gps, &groupList)) - { - /* Populate Return Vector of Subsets (slist) */ - const GroupOrdering::Iterator iter(groupList); - for(int i = 0; i < iter.length; i++) - { - const rasters_group_t* rgroup = iter[i].value; - getGroupSubsets(rgroup, slist); - } - } - } - catch (const RunTimeException &e) - { - mlog(e.level(), "Error subsetting raster: %s", e.what()); - } - unlockSampling(); - - return ssErrors; -} /****************************************************************************** * PROTECTED METHODS ******************************************************************************/ -/*---------------------------------------------------------------------------- - * BatchReader Constructor - *----------------------------------------------------------------------------*/ -GeoIndexedRaster::BatchReader::BatchReader(GeoIndexedRaster* _obj) : - obj(_obj), - uraster(NULL), - sync(NUM_SYNC_SIGNALS), - run(true) -{ - thread = new Thread(GeoIndexedRaster::batchReaderThread, this); -} - -/*---------------------------------------------------------------------------- - * BatchReader Destructor - *----------------------------------------------------------------------------*/ -GeoIndexedRaster::BatchReader::~BatchReader(void) -{ - sync.lock(); - { - run = false; /* Set run flag to false */ - sync.signal(DATA_TO_SAMPLE, Cond::NOTIFY_ONE); - } - sync.unlock(); - - delete thread; /* delete thread waits on thread to join */ -} - /*---------------------------------------------------------------------------- * Constructor *----------------------------------------------------------------------------*/ @@ -421,201 +104,6 @@ GeoIndexedRaster::GeoIndexedRaster(lua_State *L, RequestFields* rqst_parms, cons } -/*---------------------------------------------------------------------------- - * getBatchGroupSamples - *----------------------------------------------------------------------------*/ -uint32_t GeoIndexedRaster::getBatchGroupSamples(const rasters_group_t* rgroup, List* slist, uint32_t flags, uint32_t pointIndx) -{ - uint32_t errors = SS_NO_ERRORS; - - for(const auto& rinfo: rgroup->infovect) - { - if(strcmp(VALUE_TAG, rinfo.tag.c_str()) != 0) continue; - - /* This is the unique raster we are looking for, it cannot be NULL */ - unique_raster_t* ur = rinfo.uraster; - assert(ur); - - /* Get the sample for this point from unique raster */ - for(point_sample_t& ps : ur->pointSamples) - { - if(ps.pointIndex == pointIndx) - { - for(size_t i = 0; i < ps.bandSample.size(); i++) - { - /* sample can be NULL if raster read failed, (e.g. point out of bounds) */ - if(ps.bandSample[i] == NULL) continue;; - - RasterSample* sample; - if(!ps.bandSampleReturned[i]->exchange(true)) - { - sample = ps.bandSample[i]; - } - else - { - /* Sample has already been returned, must create a copy */ - sample = new RasterSample(*ps.bandSample[i]); - } - - /* Set time for this sample */ - sample->time = rgroup->gpsTime; - - /* Set flags for this sample, add it to the list */ - sample->flags = flags; - slist->add(sample); - errors |= ps.ssErrors; - } - - /* - * This function assumes that there is only one raster with VALUE_TAG in a group. - * If group has other value rasters the dataset must override this function. - */ - return errors; - } - } - } - - return errors; -} - -/*---------------------------------------------------------------------------- - * getBatchGroupFlags - *----------------------------------------------------------------------------*/ -uint32_t GeoIndexedRaster::getBatchGroupFlags(const rasters_group_t* rgroup, uint32_t pointIndx) -{ - for(const auto& rinfo: rgroup->infovect) - { - if(strcmp(FLAGS_TAG, rinfo.tag.c_str()) != 0) continue; - - /* This is the unique raster we are looking for, it cannot be NULL */ - unique_raster_t* ur = rinfo.uraster; - assert(ur); - - /* Get the sample for this point from unique raster */ - for(const point_sample_t& ps : ur->pointSamples) - { - if(ps.pointIndex == pointIndx) - { - /* - * This function assumes that there is only one raster with FLAGS_TAG in a group. - * The flags value must be in the first band. - * If these assumptions are not met the dataset must override this function. - */ - RasterSample* sample = NULL; - - /* bandSample can be empty if raster failed to open */ - if(!ps.bandSample.empty()) - { - sample = ps.bandSample[0]; - } - - - /* sample can be NULL if raster read failed, (e.g. point out of bounds) */ - if(sample == NULL) break;; - - return sample->value; - } - } - } - - return 0; -} - - -/*---------------------------------------------------------------------------- - * getGroupSamples - *----------------------------------------------------------------------------*/ -void GeoIndexedRaster::getGroupSamples(const rasters_group_t* rgroup, List& slist, uint32_t flags) -{ - for(const auto& rinfo: rgroup->infovect) - { - if(strcmp(VALUE_TAG, rinfo.tag.c_str()) != 0) continue; - - const char* key = fileDict.get(rinfo.fileId); - cacheitem_t* item; - if(cache.find(key, &item)) - { - for(uint32_t i = 0; i < item->bandSample.size(); i++) - { - RasterSample* _sample = item->bandSample[i]; - if(_sample) - { - _sample->flags = flags; - slist.add(_sample); - item->bandSample[i] = NULL; - } - } - - /* Get sampling/subset error status */ - ssErrors |= item->raster->getSSerror(); - - /* - * This function assumes that there is only one raster with VALUE_TAG in a group. - * If group has other value rasters the dataset must override this function. - */ - break; - } - } -} - - -/*---------------------------------------------------------------------------- - * getGroupSubsets - *----------------------------------------------------------------------------*/ -void GeoIndexedRaster::getGroupSubsets(const rasters_group_t* rgroup, List& slist) -{ - for(const auto& rinfo: rgroup->infovect) - { - const char* key = fileDict.get(rinfo.fileId); - cacheitem_t* item; - if(cache.find(key, &item)) - { - for(uint32_t i = 0; i < item->bandSubset.size(); i++) - { - RasterSubset* subset = item->bandSubset[i]; - if(subset) - { - slist.add(subset); - item->bandSubset[i] = NULL; - } - } - - /* Get sampling/subset error status */ - ssErrors |= item->raster->getSSerror(); - } - } -} - - -/*---------------------------------------------------------------------------- - * getGroupFlags - *----------------------------------------------------------------------------*/ -uint32_t GeoIndexedRaster::getGroupFlags(const rasters_group_t* rgroup) -{ - for(const auto& rinfo: rgroup->infovect) - { - if(strcmp(FLAGS_TAG, rinfo.tag.c_str()) != 0) continue; - - cacheitem_t* item; - const char* key = fileDict.get(rinfo.fileId); - if(cache.find(key, &item) && !item->bandSample.empty()) - { - /* - * This function assumes that there is only one raster with FLAGS_TAG in a group. - * The flags value must be in the first band. - * If these assumptions are not met the dataset must override this function. - */ - const RasterSample* _sample = item->bandSample[0]; - if(_sample) - { - return _sample->value; - } - } - } - - return 0; -} - /*---------------------------------------------------------------------------- * getGmtDate *----------------------------------------------------------------------------*/ @@ -675,11 +163,8 @@ bool GeoIndexedRaster::getFeatureDate(const OGRFeature* feature, TimeLib::gmt_ti /*---------------------------------------------------------------------------- * openGeoIndex *----------------------------------------------------------------------------*/ -bool GeoIndexedRaster::openGeoIndex(const OGRGeometry* geo, const std::vector* points) +bool GeoIndexedRaster::openGeoIndex(const std::string& newFile, OGRGeometry* filter) { - std::string newFile; - getIndexFile(geo, newFile, points); - /* Trying to open the same file? */ if(!geoRtree.empty() && newFile == indexFile) return true; @@ -701,10 +186,9 @@ bool GeoIndexedRaster::openGeoIndex(const OGRGeometry* geo, const std::vectorGetLayer(0); CHECKPTR(layer); - /* If caller provided points, use spatial filter to reduce number of features */ - if(points) + if(filter) { - applySpatialFilter(layer, points); + applySpatialFilter(layer, filter); } /* @@ -761,103 +245,17 @@ bool GeoIndexedRaster::openGeoIndex(const OGRGeometry* geo, const std::vector rows, cols *----------------------------------------------------------------------------*/ -void GeoIndexedRaster::sampleRasters(OGRGeometry* geo) +int GeoIndexedRaster::luaDimensions(lua_State *L) { - /* For each raster which is marked to be sampled, give it to the reader thread */ - int signaledReaders = 0; - int i = 0; - cacheitem_t* item; - - const char* key = cache.first(&item); - while(key != NULL) - { - if(item->enabled) - { - reader_t* reader = readers[i++]; - reader->sync.lock(); - { - reader->entry = item; - OGRGeometryFactory::destroyGeometry(reader->geo); - reader->geo = geo->clone(); - reader->sync.signal(DATA_TO_SAMPLE, Cond::NOTIFY_ONE); - signaledReaders++; - } - reader->sync.unlock(); - } - key = cache.next(&item); - } - - /* Wait for readers to finish sampling */ - for(int j = 0; j < signaledReaders; j++) - { - reader_t* reader = readers[j]; - reader->sync.lock(); - { - while(reader->entry != NULL) - reader->sync.wait(DATA_SAMPLED, SYS_TIMEOUT); - } - reader->sync.unlock(); - } -} - - -/*---------------------------------------------------------------------------- - * sample - *----------------------------------------------------------------------------*/ -bool GeoIndexedRaster::sample(OGRGeometry* geo, int64_t gps_secs, GroupOrdering* groupList) -{ - /* Open the index file, if not already open */ - if(!openGeoIndex(geo, NULL)) - return false; - - /* Find rasters that intersect with the geometry */ - std::vector foundFeatures; - - /* Query the R-tree with the OGRPoint and get the result features */ - geoRtree.query(geo, foundFeatures); - - raster_finder_t finder(geo, &foundFeatures, fileDict); - if(!findRasters(&finder)) - return false; - - /* Copy finder's raster groups to groupList */ - for(uint32_t j = 0; j < finder.rasterGroups.size(); j++) - { - rasters_group_t* rgroup = finder.rasterGroups[j]; - groupList->add(groupList->length(), rgroup); - } - - if(!filterRasters(gps_secs, groupList, fileDict)) - return false; - - uint32_t rasters2sample = 0; - if(!updateCache(rasters2sample, groupList)) - return false; - - /* Create additional reader threads if needed */ - if(!createReaderThreads(rasters2sample)) - return false; - - sampleRasters(geo); - - return true; -} - - -/****************************************************************************** - * PRIVATE METHODS - ******************************************************************************/ - -/*---------------------------------------------------------------------------- - * luaDimensions - :dim() --> rows, cols - *----------------------------------------------------------------------------*/ -int GeoIndexedRaster::luaDimensions(lua_State *L) -{ - bool status = false; - int num_ret = 1; + bool status = false; + int num_ret = 1; try { @@ -942,481 +340,6 @@ int GeoIndexedRaster::luaCellSize(lua_State *L) return returnLuaStatus(L, status, num_ret); } -/*---------------------------------------------------------------------------- - * readerThread - *----------------------------------------------------------------------------*/ -void* GeoIndexedRaster::readerThread(void *param) -{ - reader_t *reader = static_cast(param); - - while(reader->run) - { - reader->sync.lock(); - { - /* Wait for raster to work on */ - while((reader->entry == NULL) && reader->run) - reader->sync.wait(DATA_TO_SAMPLE, SYS_TIMEOUT); - } - reader->sync.unlock(); - - cacheitem_t* entry = reader->entry; - if(entry != NULL) - { - GdalRaster* raster = entry->raster; - - try - { - CHECKPTR(raster); - - /* Open raster so we can get inner bands from it */ - raster->open(); - - std::vector bands; - reader->obj->getInnerBands(raster, bands); - - if(GdalRaster::ispoint(reader->geo)) - { - /* Sample raster bands */ - const bool oneBand = bands.size() == 1; - if(oneBand) - { - OGRPoint* p = (dynamic_cast(reader->geo)); - RasterSample* sample = raster->samplePOI(p, bands[0]); - entry->bandSample.push_back(sample); - } - else - { - /* Multiple bands */ - for(const int bandNum : bands) - { - /* Use local copy of point, it will be projected in samplePOI. We do not want to project it again */ - OGRPoint* p = (dynamic_cast(reader->geo)); - OGRPoint point(*p); - - RasterSample* sample = raster->samplePOI(&point, bandNum); - entry->bandSample.push_back(sample); - mlog(DEBUG, "Band: %d, %s", bandNum, sample ? sample->toString().c_str() : "NULL"); - } - } - } - else if(GdalRaster::ispoly(reader->geo)) - { - /* Subset raster bands */ - for(const int bandNum : bands) - { - /* No need to use local copy of polygon, subsetAOI will use it's envelope and not project it */ - RasterSubset* subset = raster->subsetAOI(dynamic_cast(reader->geo), bandNum); - if(subset) - { - /* - * Create new GeoRaster object for subsetted raster - * Use NULL for LuaState, using parent's causes memory corruption - * NOTE: cannot use RasterObject::cppCreate(parms) here, it would create - * new GeoIndexRaster with the same file path as parent raster. - */ - subset->robj = new GeoRaster(NULL, - reader->obj->rqstParms, - reader->obj->samplerKey, - subset->rasterName, - raster->getGpsTime(), - raster->getElevationBandNum(), - raster->getFLagsBandNum(), - raster->getOverrideCRS()); - - entry->bandSubset.push_back(subset); - - /* RequestFields are shared with subsseted raster and other readers */ - GeoIndexedRaster::referenceLuaObject(reader->obj->rqstParms); - } - } - } - } - catch(const RunTimeException& e) - { - mlog(e.level(), "%s", e.what()); - } - - entry->enabled = false; /* raster samples/subsetted */ - - reader->sync.lock(); - { - reader->entry = NULL; /* Done with this raster */ - reader->sync.signal(DATA_SAMPLED, Cond::NOTIFY_ONE); - } - reader->sync.unlock(); - } - } - - return NULL; -} - -/*---------------------------------------------------------------------------- - * batchReaderThread - *----------------------------------------------------------------------------*/ -void* GeoIndexedRaster::batchReaderThread(void *param) -{ - batch_reader_t *breader = static_cast(param); - - while(breader->run) - { - breader->sync.lock(); - { - /* Wait for raster to work on */ - while((breader->uraster == NULL) && breader->run) - breader->sync.wait(DATA_TO_SAMPLE, SYS_TIMEOUT); - } - breader->sync.unlock(); - - if(breader->uraster != NULL) - { - unique_raster_t* ur = breader->uraster; - GdalRaster* raster = NULL; - - try - { - raster = new GdalRaster(breader->obj->parms, - breader->obj->fileDict.get(ur->rinfo->fileId), - 0, /* Sample collecting code will set it to group's gpsTime */ - ur->rinfo->fileId, - ur->rinfo->elevationBandNum, - ur->rinfo->flagsBandNum, - breader->obj->crscb); - - CHECKPTR(raster); - - /* Open raster so we can get inner bands from it */ - raster->open(); - - std::vector bands; - breader->obj->getInnerBands(raster, bands); - - /* Sample all points for this raster */ - for(point_sample_t& ps : ur->pointSamples) - { - /* Sample raster bands */ - const bool oneBand = bands.size() == 1; - if(oneBand) - { - RasterSample* sample = raster->samplePOI(&ps.point, bands[0]); - ps.bandSample.push_back(sample); - ps.bandSampleReturned.emplace_back(std::make_unique>(false)); - } - else - { - /* Multiple bands */ - for(const int bandNum : bands) - { - /* Use local copy of point, it will be projected in samplePOI. We do not want to project it again */ - OGRPoint point(ps.point); - - RasterSample* sample = raster->samplePOI(&point, bandNum); - ps.bandSample.push_back(sample); - ps.bandSampleReturned.emplace_back(std::make_unique>(false)); - ps.ssErrors |= raster->getSSerror(); - } - } - } - } - catch(const RunTimeException& e) - { - mlog(e.level(), "%s", e.what()); - } - - delete raster; - breader->sync.lock(); - { - breader->uraster = NULL; /* Done with this raster and all points */ - breader->sync.signal(DATA_SAMPLED, Cond::NOTIFY_ONE); - } - breader->sync.unlock(); - } - } - - return NULL; -} - -/*---------------------------------------------------------------------------- - * groupsFinderThread - *----------------------------------------------------------------------------*/ -void* GeoIndexedRaster::groupsFinderThread(void *param) -{ - groups_finder_t* gf = static_cast(param); - - /* Thread must initialize GEOS context */ - GEOSContextHandle_t threadGeosContext = GeoRtree::init(); - - const uint32_t start = gf->pointsRange.start; - const uint32_t end = gf->pointsRange.end; - - mlog(DEBUG, "Finding groups for points range: %u - %u", start, end); - - for(uint32_t i = start; i < end; i++) - { - if(!gf->obj->sampling()) - { - mlog(WARNING, "Sampling has been stopped, exiting groups finder thread"); - break; - } - - const point_info_t& pinfo = gf->points->at(i); - const OGRPoint ogrPoint(pinfo.point.x, pinfo.point.y, pinfo.point.z); - - /* Query the R-tree with the OGRPoint and get the result features */ - std::vector foundFeatures; - gf->obj->geoRtree.query(&ogrPoint, threadGeosContext, foundFeatures); - // mlog(DEBUG, "Found %zu features for point %u", foundFeatures.size(), i); - - /* Clone found features since OGRFeature is not thread safe */ - std::vector threadFeatures; - threadFeatures.reserve(foundFeatures.size()); - - for(OGRFeature* feature : foundFeatures) - { - threadFeatures.push_back(feature->Clone()); - } - - /* Set finder for the found features */ - RasterFinder finder(&ogrPoint, &threadFeatures, gf->threadFileDict); - - /* Find rasters intersecting with ogrPoint */ - gf->obj->findRasters(&finder); - - /* Destroy cloned features */ - for(OGRFeature* feature : threadFeatures) - { - OGRFeature::DestroyFeature(feature); - } - - /* Copy rasterGroups from finder to local groupList */ - GroupOrdering* groupList = new GroupOrdering(); - for(rasters_group_t* rgroup : finder.rasterGroups) - { - groupList->add(groupList->length(), rgroup); - } - - /* Filter rasters based on POI time */ - const int64_t gps = gf->obj->usePOItime() ? pinfo.gps : 0.0; - gf->obj->filterRasters(gps, groupList, gf->threadFileDict); - - /* Add found rasters which passed the filter to pointsGroups */ - gf->pointsGroups.emplace_back(point_groups_t{ogrPoint, i, groupList}); - - /* Add raster file names from this groupList to raster to points map */ - const GroupOrdering::Iterator iter(*groupList); - for(int j = 0; j < iter.length; j++) - { - const rasters_group_t* rgroup = iter[j].value; - for(const raster_info_t& rinfo : rgroup->infovect) - { - const char* fileName = gf->threadFileDict.get(rinfo.fileId); - gf->rasterToPointsMap[fileName].insert(i); - } - } - } - - mlog(DEBUG, "Found %zu point groups for range: %u - %u", gf->pointsGroups.size(), start, end); - - /* Thread must initialize GEOS context */ - GeoRtree::deinit(threadGeosContext); - - return NULL; -} - -/*---------------------------------------------------------------------------- - * samplesCollectThread - *----------------------------------------------------------------------------*/ -void* GeoIndexedRaster::samplesCollectThread(void* param) -{ - sample_collector_t* sc = static_cast(param); - - const uint32_t start = sc->pGroupsRange.start; - const uint32_t end = sc->pGroupsRange.end; - - mlog(DEBUG, "Collecting samples for range: %u - %u", start, end); - - u_int32_t numSamples = 0; - for(uint32_t pointIndx = start; pointIndx < end; pointIndx++) - { - if(!sc->obj->sampling()) - { - mlog(WARNING, "Sampling has been stopped, exiting samples collect thread"); - break; - } - - const point_groups_t& pg = sc->pointsGroups[pointIndx]; - - /* Allocate a new sample list for groupList */ - sample_list_t* slist = new sample_list_t(); - - const GroupOrdering::Iterator iter(*pg.groupList); - for(int i = 0; i < iter.length; i++) - { - const rasters_group_t* rgroup = iter[i].value; - uint32_t flags = 0; - - /* Get flags value for this group of rasters */ - if(sc->obj->parms->flags_file) - flags = getBatchGroupFlags(rgroup, pointIndx); - - sc->ssErrors |= sc->obj->getBatchGroupSamples(rgroup, slist, flags, pointIndx); - numSamples += slist->length(); - } - - sc->slvector.push_back(slist); - } - - mlog(DEBUG, "Collected %u samples for range: %u - %u", numSamples, start, end); - - return NULL; -} - - -/*---------------------------------------------------------------------------- - * createReaderThreads - *----------------------------------------------------------------------------*/ -bool GeoIndexedRaster::createReaderThreads(uint32_t rasters2sample) -{ - const int threadsNeeded = rasters2sample; - const int threadsNow = readers.length(); - const int newThreadsCnt = threadsNeeded - threadsNow; - - if(threadsNeeded <= threadsNow) - return true; - - try - { - for(int i = 0; i < newThreadsCnt; i++) - { - Reader* r = new Reader(this); - readers.add(r); - } - } - catch (const RunTimeException &e) - { - ssErrors |= SS_RESOURCE_LIMIT_ERROR; - mlog(CRITICAL, "Failed to create reader threads, needed: %d, created: %d", newThreadsCnt, readers.length() - threadsNow); - } - - return readers.length() == threadsNeeded; -} - -/*---------------------------------------------------------------------------- - * createBatchReaderThreads - *----------------------------------------------------------------------------*/ -bool GeoIndexedRaster::createBatchReaderThreads(uint32_t rasters2sample) -{ - const int threadsNeeded = rasters2sample; - const int threadsNow = batchReaders.length(); - const int newThreadsCnt = threadsNeeded - threadsNow; - - if(threadsNeeded <= threadsNow) - return true; - - try - { - for(int i = 0; i < newThreadsCnt; i++) - { - BatchReader* r = new BatchReader(this); - batchReaders.add(r); - } - } - catch (const RunTimeException &e) - { - ssErrors |= SS_RESOURCE_LIMIT_ERROR; - mlog(CRITICAL, "Failed to create batch reader threads"); - } - - return batchReaders.length() == threadsNeeded; -} - -/*---------------------------------------------------------------------------- - * updateCache - *----------------------------------------------------------------------------*/ -bool GeoIndexedRaster::updateCache(uint32_t& rasters2sample, const GroupOrdering* groupList) -{ - /* Mark all items in cache as not enabled */ - { - cacheitem_t* item; - const char* key = cache.first(&item); - while(key != NULL) - { - item->enabled = false; - key = cache.next(&item); - } - } - - /* Cache contains items/rasters from previous sample run */ - const GroupOrdering::Iterator group_iter(*groupList); - for(int i = 0; i < group_iter.length; i++) - { - const rasters_group_t* rgroup = group_iter[i].value; - for(const auto& rinfo : rgroup->infovect) - { - const char* key = fileDict.get(rinfo.fileId); - cacheitem_t* item; - const bool inCache = cache.find(key, &item); - if(!inCache) - { - /* Create new cache item with raster - note use of bbox in construcutor - it limits area - of interest to the extent of vector index file */ - item = new cacheitem_t; - item->raster = new GdalRaster(parms, - key, - static_cast(rgroup->gpsTime), - rinfo.fileId, - rinfo.elevationBandNum, - rinfo.flagsBandNum, - crscb, - &bbox); - const bool status = cache.add(key, item); - assert(status); (void)status; // cannot fail; prevents linter warnings - } - - /* Clear from previous run */ - item->bandSample.clear(); - item->bandSample.clear(); - - /* Mark as Enabled */ - item->enabled = true; - rasters2sample++; - } - } - - /* Maintain cache from getting too big. */ - if(cache.length() > MAX_CACHE_SIZE) - { - std::vector keys_to_remove; - { - cacheitem_t* item; - const char* key = cache.first(&item); - while(key != NULL) - { - if(!item->enabled) - { - keys_to_remove.push_back(key); - } - key = cache.next(&item); - } - } - - /* Remove cache items found above */ - for(const char* key : keys_to_remove) - { - cache.remove(key); - } - } - - /* Check for max limit of concurent reading raster threads */ - if(rasters2sample > MAX_READER_THREADS) - { - ssErrors |= SS_THREADS_LIMIT_ERROR; - mlog(ERROR, "Too many rasters to read: %d, max allowed: %d", cache.length(), MAX_READER_THREADS); - return false; - } - - return true; -} /*---------------------------------------------------------------------------- * filterRasters @@ -1482,7 +405,7 @@ bool GeoIndexedRaster::filterRasters(int64_t gps_secs, GroupOrdering* groupList, /* Caller provided gps time, use it insead of time from params */ closestGps = gps_secs; } - else if (parms->filter_closest_time) + else if(parms->filter_closest_time) { /* Params provided closest time */ closestGps = TimeLib::gmt2gpstime(parms->closest_time) / 1000; @@ -1521,51 +444,10 @@ bool GeoIndexedRaster::filterRasters(int64_t gps_secs, GroupOrdering* groupList, return (!groupList->empty()); } -/*---------------------------------------------------------------------------- - * getConvexHull - *----------------------------------------------------------------------------*/ -OGRGeometry* GeoIndexedRaster::getConvexHull(const std::vector* points) -{ - if(points->empty()) - return NULL; - - /* Create an empty geometry collection to hold all points */ - OGRGeometryCollection geometryCollection; - - mlog(INFO, "Creating convex hull from %zu points", points->size()); - - /* Collect all points into a geometry collection */ - for(const point_info_t& point_info : *points) - { - const double lon = point_info.point.x; - const double lat = point_info.point.y; - - OGRPoint* point = new OGRPoint(lon, lat); - geometryCollection.addGeometryDirectly(point); - } - - /* Create a convex hull that wraps around all the points */ - OGRGeometry* convexHull = geometryCollection.ConvexHull(); - if(convexHull == NULL) - { - mlog(ERROR, "Failed to create a convex hull around points."); - return NULL; - } - - /* Add a buffer around the convex hull to avoid missing edge points */ - OGRGeometry* bufferedConvexHull = convexHull->Buffer(TOLERANCE); - if(bufferedConvexHull) - { - OGRGeometryFactory::destroyGeometry(convexHull); - } - - return bufferedConvexHull; -} - /*---------------------------------------------------------------------------- * applySpatialFilter *----------------------------------------------------------------------------*/ -void GeoIndexedRaster::applySpatialFilter(OGRLayer* layer, const std::vector* points) +void GeoIndexedRaster::applySpatialFilter(OGRLayer* layer, OGRGeometry* filter) { mlog(INFO, "Features before spatial filter: %lld", layer->GetFeatureCount()); @@ -1574,369 +456,8 @@ void GeoIndexedRaster::applySpatialFilter(OGRLayer* layer, const std::vectorSetSpatialFilter(filter); - OGRGeometryFactory::destroyGeometry(filter); - } + layer->SetSpatialFilter(filter); perfStats.spatialFilterTime = TimeLib::latchtime() - startTime; - mlog(INFO, "Features after spatial filter: %lld", layer->GetFeatureCount()); mlog(DEBUG, "Spatial filter time: %.3lf", perfStats.spatialFilterTime); -} - -/*---------------------------------------------------------------------------- - * findAllGroups - *----------------------------------------------------------------------------*/ -bool GeoIndexedRaster::findAllGroups(const std::vector* points, - std::vector& pointsGroups, - raster_points_map_t& rasterToPointsMap) -{ - /* Do not find groups if sampling stopped */ - if(!sampling()) return true; - - bool status = false; - const double startTime = TimeLib::latchtime(); - - try - { - /* Start rasters groups finder threads */ - std::vector pids; - std::vector rgroupFinders; - - const uint32_t numMaxThreads = std::thread::hardware_concurrency(); - const uint32_t minPointsPerThread = 100; - - mlog(INFO, "Finding rasters groups for all points with %u threads", numMaxThreads); - - std::vector pointsRanges; - getThreadsRanges(pointsRanges, points->size(), minPointsPerThread, numMaxThreads); - const uint32_t numThreads = pointsRanges.size(); - - for(uint32_t i = 0; i < numThreads; i++) - { - GroupsFinder* gf = new GroupsFinder(this, points); - gf->pointsRange = pointsRanges[i]; - rgroupFinders.push_back(gf); - Thread* pid = new Thread(groupsFinderThread, gf); - pids.push_back(pid); - } - - /* Wait for all groups finder threads to finish */ - for(Thread* pid : pids) - { - delete pid; - } - - mlog(INFO, "All groups finders time: %lf", TimeLib::latchtime() - startTime); - - /* Merge the pointGroups from each thread */ - mlog(INFO, "Merging point groups from all threads"); - for(GroupsFinder* gf : rgroupFinders) - { - /* Threads used local file dictionary, combine them and update fileId */ - for(const point_groups_t& pg: gf->pointsGroups) - { - const GroupOrdering::Iterator iter(*pg.groupList); - for (int64_t i = 0; i < iter.length; i++) - { - rasters_group_t *rgroup = iter[i].value; - for (raster_info_t &rinfo : rgroup->infovect) - { - /* Get file from thread file dictionary */ - const std::string fileName = gf->threadFileDict.get(rinfo.fileId); - - /* Add to main file dictionary */ - rinfo.fileId = fileDict.add(fileName); - } - } - - pointsGroups.push_back(pg); - } - - /* Merge the rasterToPointsMap for each thread */ - for (const raster_points_map_t::value_type &pair : gf->rasterToPointsMap) - { - rasterToPointsMap[pair.first].insert(pair.second.begin(), pair.second.end()); - } - - delete gf; - } - - /* Verify that the number of points groups is the same as the number of points */ - if(pointsGroups.size() != points->size()) - { - mlog(ERROR, "Number of points groups: %zu does not match number of points: %zu", pointsGroups.size(), points->size()); - throw RunTimeException(CRITICAL, RTE_ERROR, "Number of points groups does not match number of points"); - } - - /* Reduce memory usage */ - pointsGroups.shrink_to_fit(); - rasterToPointsMap.rehash(rasterToPointsMap.size()); - - status = true; - } - catch (const RunTimeException &e) - { - mlog(e.level(), "Error creating groups: %s", e.what()); - } - - perfStats.findRastersTime = TimeLib::latchtime() - startTime; - return status; -} - - -/*---------------------------------------------------------------------------- - * findUniqueRasters - *----------------------------------------------------------------------------*/ -bool GeoIndexedRaster::findUniqueRasters(std::vector& uniqueRasters, - const std::vector& pointsGroups, - raster_points_map_t& rasterToPointsMap) -{ - /* Do not find unique rasters if sampling stopped */ - if(!sampling()) return true; - - bool status = false; - const double startTime = TimeLib::latchtime(); - - try - { - /* Map to track the index of each unique raster in the uniqueRasters vector */ - std::unordered_map fileIndexMap; - - /* Create vector of unique rasters. */ - mlog(DEBUG, "Finding unique rasters"); - for(const point_groups_t& pg : pointsGroups) - { - const GroupOrdering::Iterator iter(*pg.groupList); - for(int64_t i = 0; i < iter.length; i++) - { - rasters_group_t* rgroup = iter[i].value; - for(raster_info_t& rinfo : rgroup->infovect) - { - /* Is this raster already in the list of unique rasters? */ - const std::string fileName = fileDict.get(rinfo.fileId); - auto it = fileIndexMap.find(fileName); - if(it != fileIndexMap.end()) - { - /* Raster is already in the vector of unique rasters, get index from map and update uraster pointer */ - rinfo.uraster = uniqueRasters[it->second]; - } - else - { - /* Raster is not in the vector of unique rasters */ - unique_raster_t* ur = new unique_raster_t(&rinfo); - uniqueRasters.push_back(ur); - - /* Set pointer in rinfo to new unique raster */ - rinfo.uraster = ur; - - /* Update index map */ - fileIndexMap[fileName] = uniqueRasters.size() - 1; - } - } - } - } - - /* For each unique raster, find the points that belong to it */ - mlog(DEBUG, "Finding points for unique rasters"); - for(unique_raster_t* ur : uniqueRasters) - { - auto it = rasterToPointsMap.find(fileDict.get(ur->rinfo->fileId)); - if(it != rasterToPointsMap.end()) - { - for(const uint32_t pointIndx : it->second) - { - const point_groups_t& pg = pointsGroups[pointIndx]; - ur->pointSamples.emplace_back(pg.point, pg.pointIndex); - } - ur->pointSamples.shrink_to_fit(); - } - } - - /* Reduce memory usage */ - uniqueRasters.shrink_to_fit(); - status = true; - } - catch(const RunTimeException& e) - { - mlog(e.level(), "Error creating groups: %s", e.what()); - } - - perfStats.findUniqueRastersTime = TimeLib::latchtime() - startTime; - mlog(INFO, "Unique rasters time: %lf", perfStats.findUniqueRastersTime); - - return status; -} - - -/*---------------------------------------------------------------------------- - * sampleUniqueRasters - *----------------------------------------------------------------------------*/ -bool GeoIndexedRaster::sampleUniqueRasters(const std::vector& uniqueRasters) -{ - /* Do not sample rasters if sampling stopped */ - if(!sampling()) return true; - - bool status = false; - const double startTime = TimeLib::latchtime(); - - try - { - /* Testing has shown that 20 threads performs twice as fast on a 8 core system than 50 or 100 threads. */ - const uint32_t maxThreads = 20; - - /* Create batch reader threads */ - const uint32_t numRasters = uniqueRasters.size(); - createBatchReaderThreads(std::min(maxThreads, numRasters)); - - const uint32_t numThreads = batchReaders.length(); - mlog(INFO, "Sampling %u rasters with %u threads", numRasters, numThreads); - - /* Sample unique rasters utilizing numThreads */ - uint32_t currentRaster = 0; - - while(currentRaster < numRasters) - { - /* Calculate how many rasters we can process in this batch */ - const uint32_t batchSize = std::min(numThreads, numRasters - currentRaster); - - /* Keep track of how many threads have been assigned work */ - uint32_t activeReaders = 0; - - /* Assign rasters to batch readers as soon as they are free */ - while(currentRaster < numRasters || activeReaders > 0) - { - for(uint32_t i = 0; i < batchSize; i++) - { - BatchReader* breader = batchReaders[i]; - - breader->sync.lock(); - { - /* If this thread is done with its previous raster, assign a new one */ - if(breader->uraster == NULL && currentRaster < numRasters) - { - breader->uraster = uniqueRasters[currentRaster++]; - breader->sync.signal(DATA_TO_SAMPLE, Cond::NOTIFY_ONE); - activeReaders++; - } - } - breader->sync.unlock(); - - if(!sampling()) - { - /* Sampling has been stopped, stop assigning new rasters */ - activeReaders = 0; - currentRaster = numRasters; - break; - } - - /* Check if the current breader has completed its work */ - breader->sync.lock(); - { - if(breader->uraster == NULL && activeReaders > 0) - { - /* Mark one reader as free */ - activeReaders--; - } - } - breader->sync.unlock(); - } - - /* Short wait before checking again to avoid busy waiting */ - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - } - } - - /* Wait for all batch readers to finish sampling */ - for(int32_t i = 0; i < batchReaders.length(); i++) - { - BatchReader* breader = batchReaders[i]; - - breader->sync.lock(); - { - while(breader->uraster != NULL) - breader->sync.wait(DATA_SAMPLED, SYS_TIMEOUT); - } - breader->sync.unlock(); - } - - status = true; - } - catch(const RunTimeException& e) - { - mlog(e.level(), "Error creating groups: %s", e.what()); - } - - perfStats.samplesTime = TimeLib::latchtime() - startTime; - mlog(INFO, "Done Sampling, time: %lf", perfStats.samplesTime); - return status; -} - - -/*---------------------------------------------------------------------------- - * collectSamples - *----------------------------------------------------------------------------*/ -bool GeoIndexedRaster::collectSamples(const std::vector& pointsGroups, List& sllist) -{ - /* Do not collect samples if sampling stopped */ - if(!sampling()) return true; - - const double start = TimeLib::latchtime(); - - /* Sanity check for pointsGroups, internal pointIndex should be the same as the index in pointsGroups vector */ - assert(std::all_of(pointsGroups.cbegin(), pointsGroups.cend(), - [&pointsGroups](const point_groups_t& pg) { return pg.pointIndex == &pg - pointsGroups.data(); })); - - /* Start sample collection threads */ - std::vector pids; - std::vector sampleCollectors; - - const uint32_t numMaxThreads = std::thread::hardware_concurrency(); - const uint32_t minPointGroupsPerThread = 100; - - std::vector pGroupRanges; - getThreadsRanges(pGroupRanges, pointsGroups.size(), minPointGroupsPerThread, numMaxThreads); - const uint32_t numThreads = pGroupRanges.size(); - - mlog(INFO, "Collecting samples for %zu points with %u threads", pointsGroups.size(), numThreads); - - for(uint32_t i = 0; i < numThreads; i++) - { - SampleCollector* sc = new SampleCollector(this, pointsGroups); - sc->pGroupsRange = pGroupRanges[i]; - sampleCollectors.push_back(sc); - Thread* pid = new Thread(samplesCollectThread, sc); - pids.push_back(pid); - } - - /* Wait for all sample collection threads to finish */ - for(Thread* pid : pids) - { - delete pid; - } - - /* Merge sample lists from all sample collection threads */ - const double mergeStart = TimeLib::latchtime(); - mlog(DEBUG, "Merging sample lists"); - for(SampleCollector* sc : sampleCollectors) - { - const std::vector& slvector = sc->slvector; - for(sample_list_t* slist : slvector) - { - /* Update file dictionary */ - fileDictSetSamples(slist); - - sllist.add(slist); - } - ssErrors |= sc->ssErrors; - delete sc; - } - mlog(DEBUG, "Merged %d sample lists, time: %lf", sllist.length(), TimeLib::latchtime() - mergeStart); - - perfStats.collectSamplesTime = TimeLib::latchtime() - start; - mlog(INFO, "Populated sllist with %d lists of samples, time: %lf", sllist.length(), perfStats.collectSamplesTime); - - return true; } \ No newline at end of file diff --git a/packages/geo/GeoIndexedRaster.h b/packages/geo/GeoIndexedRaster.h index 81a27e9c..8eacc121 100644 --- a/packages/geo/GeoIndexedRaster.h +++ b/packages/geo/GeoIndexedRaster.h @@ -220,17 +220,18 @@ class GeoIndexedRaster: public RasterObject virtual uint32_t getBatchGroupSamples (const rasters_group_t* rgroup, List* slist, uint32_t flags, uint32_t pointIndx); static uint32_t getBatchGroupFlags (const rasters_group_t* rgroup, uint32_t pointIndx); - virtual void getGroupSamples (const rasters_group_t* rgroup, List& slist, uint32_t flags); - virtual void getGroupSubsets (const rasters_group_t* rgroup, List& slist); - uint32_t getGroupFlags (const rasters_group_t* rgroup); + virtual void getSerialGroupSamples (const rasters_group_t* rgroup, List& slist, uint32_t flags); + virtual void getSerialGroupSubsets (const rasters_group_t* rgroup, List& slist); + uint32_t getSerialGroupFlags (const rasters_group_t* rgroup); virtual double getGmtDate (const OGRFeature* feature, const char* field, TimeLib::gmt_time_t& gmtDate); - bool openGeoIndex (const OGRGeometry* geo, const std::vector* points); + bool openGeoIndex (const std::string& newFile, OGRGeometry* filter=NULL); virtual bool getFeatureDate (const OGRFeature* feature, TimeLib::gmt_time_t& gmtDate); - virtual void getIndexFile (const OGRGeometry* geo, std::string& file, const std::vector* points=NULL) = 0; + virtual void getIndexFile (const OGRGeometry* geo, std::string& file) = 0; + virtual void getIndexFile (const std::vector* points, std::string& file) = 0; virtual bool findRasters (raster_finder_t* finder) = 0; void sampleRasters (OGRGeometry* geo); - bool sample (OGRGeometry* geo, int64_t gps, GroupOrdering* groupList); + bool serialSample (OGRGeometry* geo, int64_t gps, GroupOrdering* groupList); /*-------------------------------------------------------------------- * Data @@ -277,7 +278,7 @@ class GeoIndexedRaster: public RasterObject * Data *--------------------------------------------------------------------*/ - List readers; + List serialReaders; List batchReaders; perf_stats_t perfStats; GdalRaster::overrideCRS_t crscb; @@ -297,19 +298,19 @@ class GeoIndexedRaster: public RasterObject static int luaBoundingBox (lua_State* L); static int luaCellSize (lua_State* L); - static void* readerThread (void *param); + static void* serialReaderThread (void *param); static void* batchReaderThread (void *param); static void* groupsFinderThread (void *param); static void* samplesCollectThread(void *param); - bool createReaderThreads (uint32_t rasters2sample); - bool createBatchReaderThreads(uint32_t rasters2sample); + bool createSerialReaderThreads (uint32_t rasters2sample); + bool createBatchReaderThreads (uint32_t rasters2sample); - bool updateCache (uint32_t& rasters2sample, const GroupOrdering* groupList); + bool updateSerialCache (uint32_t& rasters2sample, const GroupOrdering* groupList); bool filterRasters (int64_t gps_secs, GroupOrdering* groupList, RasterFileDictionary& dict); static OGRGeometry* getConvexHull (const std::vector* points); - void applySpatialFilter (OGRLayer* layer, const std::vector* points); + void applySpatialFilter (OGRLayer* layer, OGRGeometry* filter); bool findAllGroups (const std::vector* points, std::vector& pointsGroups, diff --git a/packages/geo/GeoIndexedRasterBatch.cpp b/packages/geo/GeoIndexedRasterBatch.cpp new file mode 100644 index 00000000..a095c2c5 --- /dev/null +++ b/packages/geo/GeoIndexedRasterBatch.cpp @@ -0,0 +1,984 @@ +/* + * Copyright (c) 2021, University of Washington + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * 3. Neither the name of the University of Washington nor the names of its + * contributors may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE UNIVERSITY OF WASHINGTON AND CONTRIBUTORS + * “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE UNIVERSITY OF WASHINGTON OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +/****************************************************************************** + * INCLUDES + ******************************************************************************/ + +#include "GeoRaster.h" +#include "GeoIndexedRaster.h" + + +/****************************************************************************** + * STATIC DATA + ******************************************************************************/ + +/****************************************************************************** + * PUBLIC METHODS + ******************************************************************************/ + +/*---------------------------------------------------------------------------- + * PointSample Constructor + *----------------------------------------------------------------------------*/ +GeoIndexedRaster::PointSample::PointSample(const OGRPoint& _point, int64_t _pointIndex): + point(_point), pointIndex(_pointIndex), ssErrors(SS_NO_ERRORS) {} + + +/*---------------------------------------------------------------------------- + * PointSample Copy Constructor + *----------------------------------------------------------------------------*/ +GeoIndexedRaster::PointSample::PointSample(const PointSample& ps): + point(ps.point), pointIndex(ps.pointIndex), bandSample(ps.bandSample), ssErrors(ps.ssErrors) +{ + bandSampleReturned.resize(ps.bandSampleReturned.size()); + + for (size_t i = 0; i < ps.bandSampleReturned.size(); ++i) + { + if(ps.bandSampleReturned[i]) + { + /* Create a new atomic with the value loaded from the original */ + bandSampleReturned[i] = std::make_unique>(ps.bandSampleReturned[i]->load()); + } + else + { + bandSampleReturned[i] = NULL; + } + } +} + +/*---------------------------------------------------------------------------- + * SampleCollector Constructor + *----------------------------------------------------------------------------*/ +GeoIndexedRaster::SampleCollector::SampleCollector(GeoIndexedRaster* _obj, const std::vector& _pointsGroups): + obj(_obj), + pGroupsRange({0, 0}), + pointsGroups(_pointsGroups), + ssErrors(SS_NO_ERRORS) +{ +} + +/*---------------------------------------------------------------------------- + * GroupsFinder Constructor + *----------------------------------------------------------------------------*/ +GeoIndexedRaster::GroupsFinder::GroupsFinder(GeoIndexedRaster* _obj, const std::vector* _points): + obj(_obj), + pointsRange({0, 0}), + points(_points) +{ +} + +/*---------------------------------------------------------------------------- + * getSamples - batch sampling + *----------------------------------------------------------------------------*/ +uint32_t GeoIndexedRaster::getSamples(const std::vector& points, List& sllist, void* param) +{ + static_cast(param); + + lockSampling(); + + perfStats.clear(); + cache.clear(); /* Clear cache used by serial sampling */ + fileDict.clear(); /* Start with empty file dictionary */ + + /* Vector of points and their associated raster groups */ + std::vector pointsGroups; + + /* Vector of rasters and all points they contain */ + std::vector uniqueRasters; + + try + { + ssErrors = SS_NO_ERRORS; + + /* Get index file for the points */ + std::string ifile; + getIndexFile(&points, ifile); + + /* Create a convex hull that wraps around all the points, used for spatial filter */ + OGRGeometry* filter = getConvexHull(&points); + + /* Open the index file */ + const bool indexOpenedOk = openGeoIndex(ifile, filter); + + /* Clean up convex hull */ + OGRGeometryFactory::destroyGeometry(filter); + + if(!indexOpenedOk) + { + throw RunTimeException(CRITICAL, RTE_ERROR, "Error opening index file"); + } + + { + /* Rasters to points map */ + raster_points_map_t rasterToPointsMap; + + /* For all points create a vector of raster group lists */ + if(!findAllGroups(&points, pointsGroups, rasterToPointsMap)) + { + throw RunTimeException(CRITICAL, RTE_ERROR, "Error creating groups"); + } + + /* For all points create a vector of unique rasters */ + if(!findUniqueRasters(uniqueRasters, pointsGroups, rasterToPointsMap)) + { + throw RunTimeException(CRITICAL, RTE_ERROR, "Error finding unique rasters"); + } + + /* rastersToPointsMap is no longer needed */ + } + + /* Sample all unique rasters */ + if(!sampleUniqueRasters(uniqueRasters)) + { + throw RunTimeException(CRITICAL, RTE_ERROR, "Error sampling unique rasters"); + } + + /* Populate sllist with samples */ + if(!collectSamples(pointsGroups, sllist)) + { + throw RunTimeException(CRITICAL, RTE_ERROR, "Error collecting samples"); + } + } + catch (const RunTimeException &e) + { + mlog(e.level(), "Error getting samples: %s", e.what()); + } + + /* Clean up pointsGroups */ + for(const point_groups_t& pg : pointsGroups) + delete pg.groupList; + + /* Clean up unique rasters */ + for(unique_raster_t* ur : uniqueRasters) + { + for(const point_sample_t& ps : ur->pointSamples) + { + /* Delete samples which have not been returned (quality masks, etc) */ + for(size_t i = 0; i < ps.bandSample.size(); i++) + { + if(!ps.bandSampleReturned[i]->load()) + { + delete ps.bandSample[i]; + } + } + } + + delete ur; + } + + unlockSampling(); + + /* Print performance stats */ + perfStats.log(INFO); + + return ssErrors; +} + +/****************************************************************************** + * PROTECTED METHODS + ******************************************************************************/ + +/*---------------------------------------------------------------------------- + * BatchReader Constructor + *----------------------------------------------------------------------------*/ +GeoIndexedRaster::BatchReader::BatchReader(GeoIndexedRaster* _obj) : + obj(_obj), + uraster(NULL), + sync(NUM_SYNC_SIGNALS), + run(true) +{ + thread = new Thread(GeoIndexedRaster::batchReaderThread, this); +} + +/*---------------------------------------------------------------------------- + * BatchReader Destructor + *----------------------------------------------------------------------------*/ +GeoIndexedRaster::BatchReader::~BatchReader(void) +{ + sync.lock(); + { + run = false; /* Set run flag to false */ + sync.signal(DATA_TO_SAMPLE, Cond::NOTIFY_ONE); + } + sync.unlock(); + + delete thread; /* delete thread waits on thread to join */ +} + +/*---------------------------------------------------------------------------- + * getBatchGroupSamples + *----------------------------------------------------------------------------*/ +uint32_t GeoIndexedRaster::getBatchGroupSamples(const rasters_group_t* rgroup, List* slist, uint32_t flags, uint32_t pointIndx) +{ + uint32_t errors = SS_NO_ERRORS; + + for(const auto& rinfo: rgroup->infovect) + { + if(strcmp(VALUE_TAG, rinfo.tag.c_str()) != 0) continue; + + /* This is the unique raster we are looking for, it cannot be NULL */ + unique_raster_t* ur = rinfo.uraster; + assert(ur); + + /* Get the sample for this point from unique raster */ + for(point_sample_t& ps : ur->pointSamples) + { + if(ps.pointIndex == pointIndx) + { + for(size_t i = 0; i < ps.bandSample.size(); i++) + { + /* sample can be NULL if raster read failed, (e.g. point out of bounds) */ + if(ps.bandSample[i] == NULL) continue;; + + RasterSample* sample; + if(!ps.bandSampleReturned[i]->exchange(true)) + { + sample = ps.bandSample[i]; + } + else + { + /* Sample has already been returned, must create a copy */ + sample = new RasterSample(*ps.bandSample[i]); + } + + /* Set time for this sample */ + sample->time = rgroup->gpsTime; + + /* Set flags for this sample, add it to the list */ + sample->flags = flags; + slist->add(sample); + errors |= ps.ssErrors; + } + + /* + * This function assumes that there is only one raster with VALUE_TAG in a group. + * If group has other value rasters the dataset must override this function. + */ + return errors; + } + } + } + + return errors; +} + +/*---------------------------------------------------------------------------- + * getBatchGroupFlags + *----------------------------------------------------------------------------*/ +uint32_t GeoIndexedRaster::getBatchGroupFlags(const rasters_group_t* rgroup, uint32_t pointIndx) +{ + for(const auto& rinfo: rgroup->infovect) + { + if(strcmp(FLAGS_TAG, rinfo.tag.c_str()) != 0) continue; + + /* This is the unique raster we are looking for, it cannot be NULL */ + unique_raster_t* ur = rinfo.uraster; + assert(ur); + + /* Get the sample for this point from unique raster */ + for(const point_sample_t& ps : ur->pointSamples) + { + if(ps.pointIndex == pointIndx) + { + /* + * This function assumes that there is only one raster with FLAGS_TAG in a group. + * The flags value must be in the first band. + * If these assumptions are not met the dataset must override this function. + */ + RasterSample* sample = NULL; + + /* bandSample can be empty if raster failed to open */ + if(!ps.bandSample.empty()) + { + sample = ps.bandSample[0]; + } + + + /* sample can be NULL if raster read failed, (e.g. point out of bounds) */ + if(sample == NULL) break;; + + return sample->value; + } + } + } + + return 0; +} + +/****************************************************************************** + * PRIVATE METHODS + ******************************************************************************/ + +/*---------------------------------------------------------------------------- + * batchReaderThread + *----------------------------------------------------------------------------*/ +void* GeoIndexedRaster::batchReaderThread(void *param) +{ + batch_reader_t *breader = static_cast(param); + + while(breader->run) + { + breader->sync.lock(); + { + /* Wait for raster to work on */ + while((breader->uraster == NULL) && breader->run) + breader->sync.wait(DATA_TO_SAMPLE, SYS_TIMEOUT); + } + breader->sync.unlock(); + + if(breader->uraster != NULL) + { + unique_raster_t* ur = breader->uraster; + GdalRaster* raster = NULL; + + try + { + raster = new GdalRaster(breader->obj->parms, + breader->obj->fileDict.get(ur->rinfo->fileId), + 0, /* Sample collecting code will set it to group's gpsTime */ + ur->rinfo->fileId, + ur->rinfo->elevationBandNum, + ur->rinfo->flagsBandNum, + breader->obj->crscb); + + CHECKPTR(raster); + + /* Open raster so we can get inner bands from it */ + raster->open(); + + std::vector bands; + breader->obj->getInnerBands(raster, bands); + + /* Sample all points for this raster */ + for(point_sample_t& ps : ur->pointSamples) + { + /* Sample raster bands */ + const bool oneBand = bands.size() == 1; + if(oneBand) + { + RasterSample* sample = raster->samplePOI(&ps.point, bands[0]); + ps.bandSample.push_back(sample); + ps.bandSampleReturned.emplace_back(std::make_unique>(false)); + } + else + { + /* Multiple bands */ + for(const int bandNum : bands) + { + /* Use local copy of point, it will be projected in samplePOI. We do not want to project it again */ + OGRPoint point(ps.point); + + RasterSample* sample = raster->samplePOI(&point, bandNum); + ps.bandSample.push_back(sample); + ps.bandSampleReturned.emplace_back(std::make_unique>(false)); + ps.ssErrors |= raster->getSSerror(); + } + } + } + } + catch(const RunTimeException& e) + { + mlog(e.level(), "%s", e.what()); + } + + delete raster; + breader->sync.lock(); + { + breader->uraster = NULL; /* Done with this raster and all points */ + breader->sync.signal(DATA_SAMPLED, Cond::NOTIFY_ONE); + } + breader->sync.unlock(); + } + } + + return NULL; +} + +/*---------------------------------------------------------------------------- + * groupsFinderThread + *----------------------------------------------------------------------------*/ +void* GeoIndexedRaster::groupsFinderThread(void *param) +{ + groups_finder_t* gf = static_cast(param); + + /* Thread must initialize GEOS context */ + GEOSContextHandle_t threadGeosContext = GeoRtree::init(); + + const uint32_t start = gf->pointsRange.start; + const uint32_t end = gf->pointsRange.end; + + mlog(DEBUG, "Finding groups for points range: %u - %u", start, end); + + for(uint32_t i = start; i < end; i++) + { + if(!gf->obj->sampling()) + { + mlog(WARNING, "Sampling has been stopped, exiting groups finder thread"); + break; + } + + const point_info_t& pinfo = gf->points->at(i); + const OGRPoint ogrPoint(pinfo.point.x, pinfo.point.y, pinfo.point.z); + + /* Query the R-tree with the OGRPoint and get the result features */ + std::vector foundFeatures; + gf->obj->geoRtree.query(&ogrPoint, threadGeosContext, foundFeatures); + // mlog(DEBUG, "Found %zu features for point %u", foundFeatures.size(), i); + + /* Clone found features since OGRFeature is not thread safe */ + std::vector threadFeatures; + threadFeatures.reserve(foundFeatures.size()); + + for(OGRFeature* feature : foundFeatures) + { + threadFeatures.push_back(feature->Clone()); + } + + /* Set finder for the found features */ + RasterFinder finder(&ogrPoint, &threadFeatures, gf->threadFileDict); + + /* Find rasters intersecting with ogrPoint */ + gf->obj->findRasters(&finder); + + /* Destroy cloned features */ + for(OGRFeature* feature : threadFeatures) + { + OGRFeature::DestroyFeature(feature); + } + + /* Copy rasterGroups from finder to local groupList */ + GroupOrdering* groupList = new GroupOrdering(); + for(rasters_group_t* rgroup : finder.rasterGroups) + { + groupList->add(groupList->length(), rgroup); + } + + /* Filter rasters based on POI time */ + const int64_t gps = gf->obj->usePOItime() ? pinfo.gps : 0.0; + gf->obj->filterRasters(gps, groupList, gf->threadFileDict); + + /* Add found rasters which passed the filter to pointsGroups */ + gf->pointsGroups.emplace_back(point_groups_t{ogrPoint, i, groupList}); + + /* Add raster file names from this groupList to raster to points map */ + const GroupOrdering::Iterator iter(*groupList); + for(int j = 0; j < iter.length; j++) + { + const rasters_group_t* rgroup = iter[j].value; + for(const raster_info_t& rinfo : rgroup->infovect) + { + const char* fileName = gf->threadFileDict.get(rinfo.fileId); + gf->rasterToPointsMap[fileName].insert(i); + } + } + } + + mlog(DEBUG, "Found %zu point groups for range: %u - %u", gf->pointsGroups.size(), start, end); + + /* Thread must initialize GEOS context */ + GeoRtree::deinit(threadGeosContext); + + return NULL; +} + +/*---------------------------------------------------------------------------- + * samplesCollectThread + *----------------------------------------------------------------------------*/ +void* GeoIndexedRaster::samplesCollectThread(void* param) +{ + sample_collector_t* sc = static_cast(param); + + const uint32_t start = sc->pGroupsRange.start; + const uint32_t end = sc->pGroupsRange.end; + + mlog(DEBUG, "Collecting samples for range: %u - %u", start, end); + + u_int32_t numSamples = 0; + for(uint32_t pointIndx = start; pointIndx < end; pointIndx++) + { + if(!sc->obj->sampling()) + { + mlog(WARNING, "Sampling has been stopped, exiting samples collect thread"); + break; + } + + const point_groups_t& pg = sc->pointsGroups[pointIndx]; + + /* Allocate a new sample list for groupList */ + sample_list_t* slist = new sample_list_t(); + + const GroupOrdering::Iterator iter(*pg.groupList); + for(int i = 0; i < iter.length; i++) + { + const rasters_group_t* rgroup = iter[i].value; + uint32_t flags = 0; + + /* Get flags value for this group of rasters */ + if(sc->obj->parms->flags_file) + flags = getBatchGroupFlags(rgroup, pointIndx); + + sc->ssErrors |= sc->obj->getBatchGroupSamples(rgroup, slist, flags, pointIndx); + numSamples += slist->length(); + } + + sc->slvector.push_back(slist); + } + + mlog(DEBUG, "Collected %u samples for range: %u - %u", numSamples, start, end); + + return NULL; +} + +/*---------------------------------------------------------------------------- + * createBatchReaderThreads + *----------------------------------------------------------------------------*/ +bool GeoIndexedRaster::createBatchReaderThreads(uint32_t rasters2sample) +{ + const int threadsNeeded = rasters2sample; + const int threadsNow = batchReaders.length(); + const int newThreadsCnt = threadsNeeded - threadsNow; + + if(threadsNeeded <= threadsNow) + return true; + + try + { + for(int i = 0; i < newThreadsCnt; i++) + { + BatchReader* r = new BatchReader(this); + batchReaders.add(r); + } + } + catch (const RunTimeException &e) + { + ssErrors |= SS_RESOURCE_LIMIT_ERROR; + mlog(CRITICAL, "Failed to create batch reader threads"); + } + + return batchReaders.length() == threadsNeeded; +} + +/*---------------------------------------------------------------------------- + * findAllGroups + *----------------------------------------------------------------------------*/ +bool GeoIndexedRaster::findAllGroups(const std::vector* points, + std::vector& pointsGroups, + raster_points_map_t& rasterToPointsMap) +{ + /* Do not find groups if sampling stopped */ + if(!sampling()) return true; + + bool status = false; + const double startTime = TimeLib::latchtime(); + + try + { + /* Start rasters groups finder threads */ + std::vector pids; + std::vector rgroupFinders; + + const uint32_t numMaxThreads = std::thread::hardware_concurrency(); + const uint32_t minPointsPerThread = 100; + + mlog(INFO, "Finding rasters groups for all points with %u threads", numMaxThreads); + + std::vector pointsRanges; + getThreadsRanges(pointsRanges, points->size(), minPointsPerThread, numMaxThreads); + const uint32_t numThreads = pointsRanges.size(); + + for(uint32_t i = 0; i < numThreads; i++) + { + GroupsFinder* gf = new GroupsFinder(this, points); + gf->pointsRange = pointsRanges[i]; + rgroupFinders.push_back(gf); + Thread* pid = new Thread(groupsFinderThread, gf); + pids.push_back(pid); + } + + /* Wait for all groups finder threads to finish */ + for(Thread* pid : pids) + { + delete pid; + } + + mlog(INFO, "All groups finders time: %lf", TimeLib::latchtime() - startTime); + + /* Merge the pointGroups from each thread */ + mlog(INFO, "Merging point groups from all threads"); + for(GroupsFinder* gf : rgroupFinders) + { + /* Threads used local file dictionary, combine them and update fileId */ + for(const point_groups_t& pg: gf->pointsGroups) + { + const GroupOrdering::Iterator iter(*pg.groupList); + for (int64_t i = 0; i < iter.length; i++) + { + rasters_group_t *rgroup = iter[i].value; + for (raster_info_t &rinfo : rgroup->infovect) + { + /* Get file from thread file dictionary */ + const std::string fileName = gf->threadFileDict.get(rinfo.fileId); + + /* Add to main file dictionary */ + rinfo.fileId = fileDict.add(fileName); + } + } + + pointsGroups.push_back(pg); + } + + /* Merge the rasterToPointsMap for each thread */ + for (const raster_points_map_t::value_type &pair : gf->rasterToPointsMap) + { + rasterToPointsMap[pair.first].insert(pair.second.begin(), pair.second.end()); + } + + delete gf; + } + + /* Verify that the number of points groups is the same as the number of points */ + if(pointsGroups.size() != points->size()) + { + mlog(ERROR, "Number of points groups: %zu does not match number of points: %zu", pointsGroups.size(), points->size()); + throw RunTimeException(CRITICAL, RTE_ERROR, "Number of points groups does not match number of points"); + } + + /* Reduce memory usage */ + pointsGroups.shrink_to_fit(); + rasterToPointsMap.rehash(rasterToPointsMap.size()); + + status = true; + } + catch (const RunTimeException &e) + { + mlog(e.level(), "Error creating groups: %s", e.what()); + } + + perfStats.findRastersTime = TimeLib::latchtime() - startTime; + return status; +} + + +/*---------------------------------------------------------------------------- + * findUniqueRasters + *----------------------------------------------------------------------------*/ +bool GeoIndexedRaster::findUniqueRasters(std::vector& uniqueRasters, + const std::vector& pointsGroups, + raster_points_map_t& rasterToPointsMap) +{ + /* Do not find unique rasters if sampling stopped */ + if(!sampling()) return true; + + bool status = false; + const double startTime = TimeLib::latchtime(); + + try + { + /* Map to track the index of each unique raster in the uniqueRasters vector */ + std::unordered_map fileIndexMap; + + /* Create vector of unique rasters. */ + mlog(DEBUG, "Finding unique rasters"); + for(const point_groups_t& pg : pointsGroups) + { + const GroupOrdering::Iterator iter(*pg.groupList); + for(int64_t i = 0; i < iter.length; i++) + { + rasters_group_t* rgroup = iter[i].value; + for(raster_info_t& rinfo : rgroup->infovect) + { + /* Is this raster already in the list of unique rasters? */ + const std::string fileName = fileDict.get(rinfo.fileId); + auto it = fileIndexMap.find(fileName); + if(it != fileIndexMap.end()) + { + /* Raster is already in the vector of unique rasters, get index from map and update uraster pointer */ + rinfo.uraster = uniqueRasters[it->second]; + } + else + { + /* Raster is not in the vector of unique rasters */ + unique_raster_t* ur = new unique_raster_t(&rinfo); + uniqueRasters.push_back(ur); + + /* Set pointer in rinfo to new unique raster */ + rinfo.uraster = ur; + + /* Update index map */ + fileIndexMap[fileName] = uniqueRasters.size() - 1; + } + } + } + } + + /* For each unique raster, find the points that belong to it */ + mlog(DEBUG, "Finding points for unique rasters"); + for(unique_raster_t* ur : uniqueRasters) + { + auto it = rasterToPointsMap.find(fileDict.get(ur->rinfo->fileId)); + if(it != rasterToPointsMap.end()) + { + for(const uint32_t pointIndx : it->second) + { + const point_groups_t& pg = pointsGroups[pointIndx]; + ur->pointSamples.emplace_back(pg.point, pg.pointIndex); + } + ur->pointSamples.shrink_to_fit(); + } + } + + /* Reduce memory usage */ + uniqueRasters.shrink_to_fit(); + status = true; + } + catch(const RunTimeException& e) + { + mlog(e.level(), "Error creating groups: %s", e.what()); + } + + perfStats.findUniqueRastersTime = TimeLib::latchtime() - startTime; + mlog(INFO, "Unique rasters time: %lf", perfStats.findUniqueRastersTime); + + return status; +} + + +/*---------------------------------------------------------------------------- + * sampleUniqueRasters + *----------------------------------------------------------------------------*/ +bool GeoIndexedRaster::sampleUniqueRasters(const std::vector& uniqueRasters) +{ + /* Do not sample rasters if sampling stopped */ + if(!sampling()) return true; + + bool status = false; + const double startTime = TimeLib::latchtime(); + + try + { + /* Testing has shown that 20 threads performs twice as fast on a 8 core system than 50 or 100 threads. */ + const uint32_t maxThreads = 20; + + /* Create batch reader threads */ + const uint32_t numRasters = uniqueRasters.size(); + createBatchReaderThreads(std::min(maxThreads, numRasters)); + + const uint32_t numThreads = batchReaders.length(); + mlog(INFO, "Sampling %u rasters with %u threads", numRasters, numThreads); + + /* Sample unique rasters utilizing numThreads */ + uint32_t currentRaster = 0; + + while(currentRaster < numRasters) + { + /* Calculate how many rasters we can process in this batch */ + const uint32_t batchSize = std::min(numThreads, numRasters - currentRaster); + + /* Keep track of how many threads have been assigned work */ + uint32_t activeReaders = 0; + + /* Assign rasters to batch readers as soon as they are free */ + while(currentRaster < numRasters || activeReaders > 0) + { + for(uint32_t i = 0; i < batchSize; i++) + { + BatchReader* breader = batchReaders[i]; + + breader->sync.lock(); + { + /* If this thread is done with its previous raster, assign a new one */ + if(breader->uraster == NULL && currentRaster < numRasters) + { + breader->uraster = uniqueRasters[currentRaster++]; + breader->sync.signal(DATA_TO_SAMPLE, Cond::NOTIFY_ONE); + activeReaders++; + } + } + breader->sync.unlock(); + + if(!sampling()) + { + /* Sampling has been stopped, stop assigning new rasters */ + activeReaders = 0; + currentRaster = numRasters; + break; + } + + /* Check if the current breader has completed its work */ + breader->sync.lock(); + { + if(breader->uraster == NULL && activeReaders > 0) + { + /* Mark one reader as free */ + activeReaders--; + } + } + breader->sync.unlock(); + } + + /* Short wait before checking again to avoid busy waiting */ + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + } + + /* Wait for all batch readers to finish sampling */ + for(int32_t i = 0; i < batchReaders.length(); i++) + { + BatchReader* breader = batchReaders[i]; + + breader->sync.lock(); + { + while(breader->uraster != NULL) + breader->sync.wait(DATA_SAMPLED, SYS_TIMEOUT); + } + breader->sync.unlock(); + } + + status = true; + } + catch(const RunTimeException& e) + { + mlog(e.level(), "Error creating groups: %s", e.what()); + } + + perfStats.samplesTime = TimeLib::latchtime() - startTime; + mlog(INFO, "Done Sampling, time: %lf", perfStats.samplesTime); + return status; +} + + +/*---------------------------------------------------------------------------- + * collectSamples + *----------------------------------------------------------------------------*/ +bool GeoIndexedRaster::collectSamples(const std::vector& pointsGroups, List& sllist) +{ + /* Do not collect samples if sampling stopped */ + if(!sampling()) return true; + + const double start = TimeLib::latchtime(); + + /* Sanity check for pointsGroups, internal pointIndex should be the same as the index in pointsGroups vector */ + assert(std::all_of(pointsGroups.cbegin(), pointsGroups.cend(), + [&pointsGroups](const point_groups_t& pg) { return pg.pointIndex == &pg - pointsGroups.data(); })); + + /* Start sample collection threads */ + std::vector pids; + std::vector sampleCollectors; + + const uint32_t numMaxThreads = std::thread::hardware_concurrency(); + const uint32_t minPointGroupsPerThread = 100; + + std::vector pGroupRanges; + getThreadsRanges(pGroupRanges, pointsGroups.size(), minPointGroupsPerThread, numMaxThreads); + const uint32_t numThreads = pGroupRanges.size(); + + mlog(INFO, "Collecting samples for %zu points with %u threads", pointsGroups.size(), numThreads); + + for(uint32_t i = 0; i < numThreads; i++) + { + SampleCollector* sc = new SampleCollector(this, pointsGroups); + sc->pGroupsRange = pGroupRanges[i]; + sampleCollectors.push_back(sc); + Thread* pid = new Thread(samplesCollectThread, sc); + pids.push_back(pid); + } + + /* Wait for all sample collection threads to finish */ + for(Thread* pid : pids) + { + delete pid; + } + + /* Merge sample lists from all sample collection threads */ + const double mergeStart = TimeLib::latchtime(); + mlog(DEBUG, "Merging sample lists"); + for(SampleCollector* sc : sampleCollectors) + { + const std::vector& slvector = sc->slvector; + for(sample_list_t* slist : slvector) + { + /* Update file dictionary */ + fileDictSetSamples(slist); + + sllist.add(slist); + } + ssErrors |= sc->ssErrors; + delete sc; + } + mlog(DEBUG, "Merged %d sample lists, time: %lf", sllist.length(), TimeLib::latchtime() - mergeStart); + + perfStats.collectSamplesTime = TimeLib::latchtime() - start; + mlog(INFO, "Populated sllist with %d lists of samples, time: %lf", sllist.length(), perfStats.collectSamplesTime); + + return true; +} + +/*---------------------------------------------------------------------------- + * getConvexHull + *----------------------------------------------------------------------------*/ +OGRGeometry* GeoIndexedRaster::getConvexHull(const std::vector* points) +{ + if(points->empty()) + return NULL; + + /* Create an empty geometry collection to hold all points */ + OGRGeometryCollection geometryCollection; + + mlog(INFO, "Creating convex hull from %zu points", points->size()); + + /* Collect all points into a geometry collection */ + for(const point_info_t& point_info : *points) + { + const double lon = point_info.point.x; + const double lat = point_info.point.y; + + OGRPoint* point = new OGRPoint(lon, lat); + geometryCollection.addGeometryDirectly(point); + } + + /* Create a convex hull that wraps around all the points */ + OGRGeometry* convexHull = geometryCollection.ConvexHull(); + if(convexHull == NULL) + { + mlog(ERROR, "Failed to create a convex hull around points."); + return NULL; + } + + /* Add a buffer around the convex hull to avoid missing edge points */ + OGRGeometry* bufferedConvexHull = convexHull->Buffer(TOLERANCE); + if(bufferedConvexHull) + { + OGRGeometryFactory::destroyGeometry(convexHull); + } + + return bufferedConvexHull; +} + diff --git a/packages/geo/GeoIndexedRasterSerial.cpp b/packages/geo/GeoIndexedRasterSerial.cpp new file mode 100644 index 00000000..20f48ebd --- /dev/null +++ b/packages/geo/GeoIndexedRasterSerial.cpp @@ -0,0 +1,597 @@ +/* + * Copyright (c) 2021, University of Washington + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * 3. Neither the name of the University of Washington nor the names of its + * contributors may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE UNIVERSITY OF WASHINGTON AND CONTRIBUTORS + * “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE UNIVERSITY OF WASHINGTON OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +/****************************************************************************** + * INCLUDES + ******************************************************************************/ + +#include "GeoRaster.h" +#include "GeoIndexedRaster.h" + +/****************************************************************************** + * STATIC DATA + ******************************************************************************/ + +/****************************************************************************** + * PUBLIC METHODS + ******************************************************************************/ + +/*---------------------------------------------------------------------------- + * Reader Constructor + *----------------------------------------------------------------------------*/ +GeoIndexedRaster::Reader::Reader (GeoIndexedRaster* _obj): + obj(_obj), + geo(NULL), + entry(NULL), + sync(NUM_SYNC_SIGNALS), + run(true) +{ + thread = new Thread(serialReaderThread, this); +} + +/*---------------------------------------------------------------------------- + * Reader Destructor + *----------------------------------------------------------------------------*/ +GeoIndexedRaster::Reader::~Reader (void) +{ + sync.lock(); + { + run = false; /* Set run flag to false */ + sync.signal(DATA_TO_SAMPLE, Cond::NOTIFY_ONE); + } + sync.unlock(); + + delete thread; /* delete thread waits on thread to join */ + + /* geometry geo is cloned not 'newed' on GDAL heap. Use this call to free it */ + OGRGeometryFactory::destroyGeometry(geo); +} + +/*---------------------------------------------------------------------------- + * getSamples - serial sampling + *----------------------------------------------------------------------------*/ +uint32_t GeoIndexedRaster::getSamples(const MathLib::point_3d_t& point, int64_t gps, List& slist, void* param) +{ + static_cast(param); + + lockSampling(); + try + { + GroupOrdering groupList; + OGRPoint ogrPoint(point.x, point.y, point.z); + + ssErrors = SS_NO_ERRORS; + + /* Sample Rasters */ + if(serialSample(&ogrPoint, gps, &groupList)) + { + /* Populate Return List of Samples (slist) */ + const GroupOrdering::Iterator iter(groupList); + for(int i = 0; i < iter.length; i++) + { + const rasters_group_t* rgroup = iter[i].value; + uint32_t flags = 0; + + /* Get flags value for this group of rasters */ + if(parms->flags_file) + flags = getSerialGroupFlags(rgroup); + + getSerialGroupSamples(rgroup, slist, flags); + } + } + + /* Update file dictionary */ + fileDictSetSamples(&slist); + } + catch (const RunTimeException &e) + { + mlog(e.level(), "Error getting samples: %s", e.what()); + } + + /* Free Unreturned Results */ + cacheitem_t* item; + const char* key = cache.first(&item); + while(key != NULL) + { + for(uint32_t i = 0; i < item->bandSample.size(); i++) + { + if(item->bandSample[i]) + { + delete item->bandSample[i]; + item->bandSample[i] = NULL; + } + } + + for(uint32_t i = 0; i < item->bandSubset.size(); i++) + { + if(item->bandSubset[i]) + { + delete item->bandSubset[i]; + item->bandSubset[i] = NULL; + } + } + key = cache.next(&item); + } + unlockSampling(); + + return ssErrors; +} + +/*---------------------------------------------------------------------------- + * getSubset + *----------------------------------------------------------------------------*/ +uint32_t GeoIndexedRaster::getSubsets(const MathLib::extent_t& extent, int64_t gps, List& slist, void* param) +{ + static_cast(param); + + lockSampling(); + try + { + GroupOrdering groupList; + OGRPolygon poly = GdalRaster::makeRectangle(extent.ll.x, extent.ll.y, extent.ur.x, extent.ur.y); + ssErrors = SS_NO_ERRORS; + + /* Sample Subsets */ + if(serialSample(&poly, gps, &groupList)) + { + /* Populate Return Vector of Subsets (slist) */ + const GroupOrdering::Iterator iter(groupList); + for(int i = 0; i < iter.length; i++) + { + const rasters_group_t* rgroup = iter[i].value; + getSerialGroupSubsets(rgroup, slist); + } + } + } + catch (const RunTimeException &e) + { + mlog(e.level(), "Error subsetting raster: %s", e.what()); + } + unlockSampling(); + + return ssErrors; +} + +/****************************************************************************** + * PROTECTED METHODS + ******************************************************************************/ + +/*---------------------------------------------------------------------------- + * getSerialGroupSamples + *----------------------------------------------------------------------------*/ +void GeoIndexedRaster::getSerialGroupSamples(const rasters_group_t* rgroup, List& slist, uint32_t flags) +{ + for(const auto& rinfo: rgroup->infovect) + { + if(strcmp(VALUE_TAG, rinfo.tag.c_str()) != 0) continue; + + const char* key = fileDict.get(rinfo.fileId); + cacheitem_t* item; + if(cache.find(key, &item)) + { + for(uint32_t i = 0; i < item->bandSample.size(); i++) + { + RasterSample* _sample = item->bandSample[i]; + if(_sample) + { + _sample->flags = flags; + slist.add(_sample); + item->bandSample[i] = NULL; + } + } + + /* Get sampling/subset error status */ + ssErrors |= item->raster->getSSerror(); + + /* + * This function assumes that there is only one raster with VALUE_TAG in a group. + * If group has other value rasters the dataset must override this function. + */ + break; + } + } +} + + +/*---------------------------------------------------------------------------- + * getSerialGroupSubsets + *----------------------------------------------------------------------------*/ +void GeoIndexedRaster::getSerialGroupSubsets(const rasters_group_t* rgroup, List& slist) +{ + for(const auto& rinfo: rgroup->infovect) + { + const char* key = fileDict.get(rinfo.fileId); + cacheitem_t* item; + if(cache.find(key, &item)) + { + for(uint32_t i = 0; i < item->bandSubset.size(); i++) + { + RasterSubset* subset = item->bandSubset[i]; + if(subset) + { + slist.add(subset); + item->bandSubset[i] = NULL; + } + } + + /* Get sampling/subset error status */ + ssErrors |= item->raster->getSSerror(); + } + } +} + + +/*---------------------------------------------------------------------------- + * getSerialGroupFlags + *----------------------------------------------------------------------------*/ +uint32_t GeoIndexedRaster::getSerialGroupFlags(const rasters_group_t* rgroup) +{ + for(const auto& rinfo: rgroup->infovect) + { + if(strcmp(FLAGS_TAG, rinfo.tag.c_str()) != 0) continue; + + cacheitem_t* item; + const char* key = fileDict.get(rinfo.fileId); + if(cache.find(key, &item) && !item->bandSample.empty()) + { + /* + * This function assumes that there is only one raster with FLAGS_TAG in a group. + * The flags value must be in the first band. + * If these assumptions are not met the dataset must override this function. + */ + const RasterSample* _sample = item->bandSample[0]; + if(_sample) + { + return _sample->value; + } + } + } + + return 0; +} + +/*---------------------------------------------------------------------------- + * sampleRasters + *----------------------------------------------------------------------------*/ +void GeoIndexedRaster::sampleRasters(OGRGeometry* geo) +{ + /* For each raster which is marked to be sampled, give it to the reader thread */ + int signaledReaders = 0; + int i = 0; + cacheitem_t* item; + + const char* key = cache.first(&item); + while(key != NULL) + { + if(item->enabled) + { + reader_t* reader = serialReaders[i++]; + reader->sync.lock(); + { + reader->entry = item; + OGRGeometryFactory::destroyGeometry(reader->geo); + reader->geo = geo->clone(); + reader->sync.signal(DATA_TO_SAMPLE, Cond::NOTIFY_ONE); + signaledReaders++; + } + reader->sync.unlock(); + } + key = cache.next(&item); + } + + /* Wait for readers to finish sampling */ + for(int j = 0; j < signaledReaders; j++) + { + reader_t* reader = serialReaders[j]; + reader->sync.lock(); + { + while(reader->entry != NULL) + reader->sync.wait(DATA_SAMPLED, SYS_TIMEOUT); + } + reader->sync.unlock(); + } +} + + +/*---------------------------------------------------------------------------- + * serialSample + *----------------------------------------------------------------------------*/ +bool GeoIndexedRaster::serialSample(OGRGeometry* geo, int64_t gps_secs, GroupOrdering* groupList) +{ + /* Open the index file, if not already open */ + std::string ifile;; + getIndexFile(geo, ifile); + + if(!openGeoIndex(ifile)) + return false; + + /* Find rasters that intersect with the geometry */ + std::vector foundFeatures; + + /* Query the R-tree with the OGRPoint and get the result features */ + geoRtree.query(geo, foundFeatures); + + raster_finder_t finder(geo, &foundFeatures, fileDict); + if(!findRasters(&finder)) + return false; + + /* Copy finder's raster groups to groupList */ + for(uint32_t j = 0; j < finder.rasterGroups.size(); j++) + { + rasters_group_t* rgroup = finder.rasterGroups[j]; + groupList->add(groupList->length(), rgroup); + } + + if(!filterRasters(gps_secs, groupList, fileDict)) + return false; + + uint32_t rasters2sample = 0; + if(!updateSerialCache(rasters2sample, groupList)) + return false; + + /* Create additional reader threads if needed */ + if(!createSerialReaderThreads(rasters2sample)) + return false; + + sampleRasters(geo); + + return true; +} + + +/****************************************************************************** + * PRIVATE METHODS + ******************************************************************************/ + +/*---------------------------------------------------------------------------- + * serialReaderThread + *----------------------------------------------------------------------------*/ +void* GeoIndexedRaster::serialReaderThread(void *param) +{ + reader_t *reader = static_cast(param); + + while(reader->run) + { + reader->sync.lock(); + { + /* Wait for raster to work on */ + while((reader->entry == NULL) && reader->run) + reader->sync.wait(DATA_TO_SAMPLE, SYS_TIMEOUT); + } + reader->sync.unlock(); + + cacheitem_t* entry = reader->entry; + if(entry != NULL) + { + GdalRaster* raster = entry->raster; + + try + { + CHECKPTR(raster); + + /* Open raster so we can get inner bands from it */ + raster->open(); + + std::vector bands; + reader->obj->getInnerBands(raster, bands); + + if(GdalRaster::ispoint(reader->geo)) + { + /* Sample raster bands */ + const bool oneBand = bands.size() == 1; + if(oneBand) + { + OGRPoint* p = (dynamic_cast(reader->geo)); + RasterSample* sample = raster->samplePOI(p, bands[0]); + entry->bandSample.push_back(sample); + } + else + { + /* Multiple bands */ + for(const int bandNum : bands) + { + /* Use local copy of point, it will be projected in samplePOI. We do not want to project it again */ + OGRPoint* p = (dynamic_cast(reader->geo)); + OGRPoint point(*p); + + RasterSample* sample = raster->samplePOI(&point, bandNum); + entry->bandSample.push_back(sample); + mlog(DEBUG, "Band: %d, %s", bandNum, sample ? sample->toString().c_str() : "NULL"); + } + } + } + else if(GdalRaster::ispoly(reader->geo)) + { + /* Subset raster bands */ + for(const int bandNum : bands) + { + /* No need to use local copy of polygon, subsetAOI will use it's envelope and not project it */ + RasterSubset* subset = raster->subsetAOI(dynamic_cast(reader->geo), bandNum); + if(subset) + { + /* + * Create new GeoRaster object for subsetted raster + * Use NULL for LuaState, using parent's causes memory corruption + * NOTE: cannot use RasterObject::cppCreate(parms) here, it would create + * new GeoIndexRaster with the same file path as parent raster. + */ + subset->robj = new GeoRaster(NULL, + reader->obj->rqstParms, + reader->obj->samplerKey, + subset->rasterName, + raster->getGpsTime(), + raster->getElevationBandNum(), + raster->getFLagsBandNum(), + raster->getOverrideCRS()); + + entry->bandSubset.push_back(subset); + + /* RequestFields are shared with subsseted raster and other readers */ + GeoIndexedRaster::referenceLuaObject(reader->obj->rqstParms); + } + } + } + } + catch(const RunTimeException& e) + { + mlog(e.level(), "%s", e.what()); + } + + entry->enabled = false; /* raster samples/subsetted */ + + reader->sync.lock(); + { + reader->entry = NULL; /* Done with this raster */ + reader->sync.signal(DATA_SAMPLED, Cond::NOTIFY_ONE); + } + reader->sync.unlock(); + } + } + + return NULL; +} + +/*---------------------------------------------------------------------------- + * createSerialReaderThreads + *----------------------------------------------------------------------------*/ +bool GeoIndexedRaster::createSerialReaderThreads(uint32_t rasters2sample) +{ + const int threadsNeeded = rasters2sample; + const int threadsNow = serialReaders.length(); + const int newThreadsCnt = threadsNeeded - threadsNow; + + if(threadsNeeded <= threadsNow) + return true; + + try + { + for(int i = 0; i < newThreadsCnt; i++) + { + Reader* r = new Reader(this); + serialReaders.add(r); + } + } + catch (const RunTimeException &e) + { + ssErrors |= SS_RESOURCE_LIMIT_ERROR; + mlog(CRITICAL, "Failed to create reader threads, needed: %d, created: %d", newThreadsCnt, serialReaders.length() - threadsNow); + } + + return serialReaders.length() == threadsNeeded; +} + +/*---------------------------------------------------------------------------- + * updateSerialCache + *----------------------------------------------------------------------------*/ +bool GeoIndexedRaster::updateSerialCache(uint32_t& rasters2sample, const GroupOrdering* groupList) +{ + /* Mark all items in cache as not enabled */ + { + cacheitem_t* item; + const char* key = cache.first(&item); + while(key != NULL) + { + item->enabled = false; + key = cache.next(&item); + } + } + + /* Cache contains items/rasters from previous sample run */ + const GroupOrdering::Iterator group_iter(*groupList); + for(int i = 0; i < group_iter.length; i++) + { + const rasters_group_t* rgroup = group_iter[i].value; + for(const auto& rinfo : rgroup->infovect) + { + const char* key = fileDict.get(rinfo.fileId); + cacheitem_t* item; + const bool inCache = cache.find(key, &item); + if(!inCache) + { + /* Create new cache item with raster + note use of bbox in construcutor - it limits area + of interest to the extent of vector index file */ + item = new cacheitem_t; + item->raster = new GdalRaster(parms, + key, + static_cast(rgroup->gpsTime), + rinfo.fileId, + rinfo.elevationBandNum, + rinfo.flagsBandNum, + crscb, + &bbox); + const bool status = cache.add(key, item); + assert(status); (void)status; // cannot fail; prevents linter warnings + } + + /* Clear from previous run */ + item->bandSample.clear(); + + /* Mark as Enabled */ + item->enabled = true; + rasters2sample++; + } + } + + /* Maintain cache from getting too big. */ + if(cache.length() > MAX_CACHE_SIZE) + { + std::vector keys_to_remove; + { + cacheitem_t* item; + const char* key = cache.first(&item); + while(key != NULL) + { + if(!item->enabled) + { + keys_to_remove.push_back(key); + } + key = cache.next(&item); + } + } + + /* Remove cache items found above */ + for(const char* key : keys_to_remove) + { + cache.remove(key); + } + } + + /* Check for max limit of concurent reading raster threads */ + if(rasters2sample > MAX_READER_THREADS) + { + ssErrors |= SS_THREADS_LIMIT_ERROR; + mlog(ERROR, "Too many rasters to read: %d, max allowed: %d", cache.length(), MAX_READER_THREADS); + return false; + } + + return true; +} \ No newline at end of file diff --git a/scripts/systests/parquet_sampler_3dep_perf_test.lua b/scripts/systests/parquet_sampler_3dep_perf_test.lua index dcd224b9..9d75a8f6 100644 --- a/scripts/systests/parquet_sampler_3dep_perf_test.lua +++ b/scripts/systests/parquet_sampler_3dep_perf_test.lua @@ -10,8 +10,8 @@ local td = runner.rootdir(arg[0]) local outq_name = "outq-luatest" -local in_parquet = '/data/3dep/wrzesien_snow_64k.parquet' --- local in_parquet = '/data/3dep/wrzesien_snow_525k.parquet' +-- local in_parquet = '/data/3dep/wrzesien_snow_64k.parquet' +local in_parquet = '/data/3dep/wrzesien_snow_525k.parquet' -- local in_parquet = '/data/3dep/wrzesien_snow_2618k.parquet' -- Indicates local file system (no s3 or client)