Skip to content

Commit

Permalink
Allow polygon filtering of manual input
Browse files Browse the repository at this point in the history
Use manually fed input files in priority, still applying
any provided polygon.

Closes #28
  • Loading branch information
agrenott committed Nov 6, 2023
1 parent 7b0f30d commit c1c1c9b
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 33 deletions.
11 changes: 9 additions & 2 deletions pyhgtmap/NASASRTMUtil.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import base64
import os
import sys
from typing import List, Optional, Tuple
import urllib
import zipfile
from http import cookiejar as cookielib
Expand Down Expand Up @@ -876,7 +877,13 @@ def getFile(opener, area, source):
return downloadAndUnzip(opener, url, area, source)


def getFiles(area, polygon, corrx, corry, sources):
def getFiles(
area: str,
polygon: Optional[List[List[Tuple[float, float]]]],
corrx: float,
corry: float,
sources: List[str],
) -> List[Tuple[str, bool]]:
initDirs(sources)
bbox = calcBbox(area, corrx, corry)
areaPrefixes = makeFileNamePrefixes(bbox, polygon, corrx, corry)
Expand All @@ -898,7 +905,7 @@ def getFiles(area, polygon, corrx, corry, sources):
return files


def anySRTMsources(sources):
def anySRTMsources(sources: List[str]) -> bool:
"""
Returns True if any of the given sources start with 'srtm', False otherwise.
Expand Down
2 changes: 1 addition & 1 deletion pyhgtmap/hgt/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ def polygon_mask(
maskArray = numpy.ma.array(numpy.empty((len(x_data) * len(y_data), 1)))
for p in clipped_polygons:
# run through all polygons and combine masks
mask = PolygonPath(p).contains_points(xyPoints)
mask = PolygonPath(p).contains_points(xyPoints) # type: ignore
maskArray = numpy.ma.array(maskArray, mask=mask, keep_mask=True)
return numpy.invert(maskArray.mask.reshape(len(y_data), len(x_data)))

Expand Down
53 changes: 25 additions & 28 deletions pyhgtmap/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
import logging
import os
import sys
from optparse import OptionParser
from optparse import OptionParser, Values
from typing import List, Tuple

from pyhgtmap import NASASRTMUtil, __version__, configUtil
from pyhgtmap.hgt.file import calcHgtArea, parsePolygon
Expand All @@ -16,7 +17,7 @@
logger = logging.getLogger(__name__)


def parseCommandLine():
def parseCommandLine(sys_args: List[str]) -> Tuple[Values, List[str]]:
"""parses the command line."""
parser = OptionParser(
usage="%prog [options] [<hgt or GeoTiff file>] [<hgt or GeoTiff files>]"
Expand Down Expand Up @@ -439,7 +440,7 @@ def parseCommandLine():
default="WARNING",
help="Set this tool's debug logging level",
)
opts, args = parser.parse_args()
opts, args = parser.parse_args(sys_args)
if opts.version:
print("pyhgtmap {0:s}".format(__version__))
sys.exit(0)
Expand Down Expand Up @@ -553,44 +554,40 @@ def parseCommandLine():
return opts, args


def main() -> None:
opts, args = parseCommandLine()
def main(sys_args: List[str]) -> None:
opts, args = parseCommandLine(sys_args)
configure_logging(opts.logLevel)
if opts.area:

hgtDataFiles: List[Tuple[str, bool]]
if args:
# Prefer using any manually provided source file
use_poly_flag = opts.polygon is not None
hgtDataFiles = [
(arg, use_poly_flag)
for arg in args
if os.path.splitext(arg)[1].lower() in (".hgt", ".tif", ".tiff", ".vrt")
]
opts.area = ":".join(
[str(i) for i in calcHgtArea(hgtDataFiles, opts.srtmCorrx, opts.srtmCorry)]
)
else:
# Download from area or polygon
logger.debug(f"Downloading HGT files for area {opts.area}")
hgtDataFiles = NASASRTMUtil.getFiles(
opts.area, opts.polygon, opts.srtmCorrx, opts.srtmCorry, opts.dataSource
)
if len(hgtDataFiles) == 0:
if len(opts.dataSource) == 1:
print(
"No files for this area {0:s} from desired source.".format(
opts.area
)
)
else:
print(
"No files for this area {0:s} from desired sources.".format(
opts.area
)
)
print(
"No files for this area {0:s} from desired source(s).".format(opts.area)
)
sys.exit(0)
elif opts.downloadOnly:
sys.exit(0)
else:
hgtDataFiles = [
(arg, False)
for arg in args
if os.path.splitext(arg)[1].lower() in (".hgt", ".tif", ".tiff", ".vrt")
]
opts.area = ":".join(
[str(i) for i in calcHgtArea(hgtDataFiles, opts.srtmCorrx, opts.srtmCorry)]
)

HgtFilesProcessor(opts.nJobs, opts.startId, opts.startWayId, opts).process_files(
hgtDataFiles
)


if __name__ == "__main__":
main()
main(sys.argv[1:])
4 changes: 2 additions & 2 deletions tests/hgt/test_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ def test_way_id_overflow(default_options: SimpleNamespace) -> None:

@staticmethod
def test_process_tile_internal_empty_contour(
default_options: SimpleNamespace, caplog
default_options: SimpleNamespace, caplog: pytest.LogCaptureFixture
) -> None:
"""Ensure no empty output file is generated when there's no contour."""
processor = HgtFilesProcessor(
Expand All @@ -345,7 +345,7 @@ def test_process_tile_internal_empty_contour(
tile_mock.__str__.return_value = "Tile (28.00, 42.50, 29.00, 43.00)" # type: ignore
with tempfile.TemporaryDirectory() as tempdir_name:
with cwd(tempdir_name):
caplog.set_level(logging.INFO)
caplog.set_level(logging.INFO, logger="pyhgtmap.hgt.processor")
processor.process_tile_internal("empty.pbf", tile_mock)
# NO file must be generated
assert not os.path.exists("empty.pbf")
Expand Down
108 changes: 108 additions & 0 deletions tests/test_main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import optparse
import os
from unittest.mock import patch
from pyhgtmap import main

from . import TEST_DATA_PATH


@patch("pyhgtmap.main.NASASRTMUtil")
@patch("pyhgtmap.main.HgtFilesProcessor")
def test_main_download_from_poly(HgtFilesProcessor_mock, NASASRTMUtil_mock) -> None:
"""Only polygon option is used, without files; download tiles."""
# Prepare
sys_args = [
"--pbf",
"--source=view1",
f"--polygon={os.path.join(TEST_DATA_PATH, 'france.poly')}",
]
NASASRTMUtil_mock.getFiles.return_value = [
("hgt/VIEW1/N45E006.hgt", True),
("hgt/VIEW1/N46E006.hgt", True),
]

# Test
main.main(sys_args)

# Check
NASASRTMUtil_mock.getFiles.assert_called_once()
assert (
NASASRTMUtil_mock.getFiles.call_args[0][0]
== "-6.9372070:41.2386600:9.9000000:51.4288000"
)
assert NASASRTMUtil_mock.getFiles.call_args[0][1][0][0:5] == [
(9.9, 42.43788),
(9.9, 41.41346),
(9.328765, 41.32062),
(9.286847, 41.28319),
(8.798805, 41.23866),
]
assert NASASRTMUtil_mock.getFiles.call_args[0][2] == 0
assert NASASRTMUtil_mock.getFiles.call_args[0][3] == 0
assert NASASRTMUtil_mock.getFiles.call_args[0][4] == ["view1"]

HgtFilesProcessor_mock.assert_called_once()
parsed_options: optparse.Values = HgtFilesProcessor_mock.call_args.args[3]
assert parsed_options.area == "-6.9372070:41.2386600:9.9000000:51.4288000"

HgtFilesProcessor_mock.return_value.process_files.assert_called_once_with(
[("hgt/VIEW1/N45E006.hgt", True), ("hgt/VIEW1/N46E006.hgt", True)]
)


@patch("pyhgtmap.main.NASASRTMUtil")
@patch("pyhgtmap.main.HgtFilesProcessor")
def test_main_manual_input_poly(HgtFilesProcessor_mock, NASASRTMUtil_mock) -> None:
"""Polygon option is used, with manual files; polygon must be applied to files."""
# Prepare
sys_args = [
"--pbf",
"--source=view1",
f"--polygon={os.path.join(TEST_DATA_PATH, 'france.poly')}",
"N45E007.hgt",
"N46E007.hgt",
"N47E007.hgt",
]

# Test
main.main(sys_args)

# Check
NASASRTMUtil_mock.getFiles.assert_not_called()

HgtFilesProcessor_mock.assert_called_once()
parsed_options: optparse.Values = HgtFilesProcessor_mock.call_args.args[3]
# area must be properly computed from files names
assert parsed_options.area == "7:45:8:48"
# Polygon check must be enabled for all files
HgtFilesProcessor_mock.return_value.process_files.assert_called_once_with(
[("N45E007.hgt", True), ("N46E007.hgt", True), ("N47E007.hgt", True)]
)


@patch("pyhgtmap.main.NASASRTMUtil")
@patch("pyhgtmap.main.HgtFilesProcessor")
def test_main_manual_input_no_poly(HgtFilesProcessor_mock, NASASRTMUtil_mock) -> None:
"""Polygon option is NOT used, with manual files."""
# Prepare
sys_args = [
"--pbf",
"N45E007.hgt",
"N46E007.hgt",
"N47E007.hgt",
]

# Test
main.main(sys_args)

# Check
NASASRTMUtil_mock.getFiles.assert_not_called()

HgtFilesProcessor_mock.assert_called_once()
parsed_options: optparse.Values = HgtFilesProcessor_mock.call_args.args[3]
# area must be properly computed from files names
assert parsed_options.area == "7:45:8:48"
# Polygon check must NOT be enabled for any files
HgtFilesProcessor_mock.return_value.process_files.assert_called_once_with(
[("N45E007.hgt", False), ("N46E007.hgt", False), ("N47E007.hgt", False)]
)

0 comments on commit c1c1c9b

Please sign in to comment.