Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add wait wrapper around Batch V1/V2 user action calls until the status changes #549

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion sentinelhub/api/batch/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
# do not use `from __future__ import annotations`, it clashes with `dataclass_json`
import datetime as dt
import logging
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Iterator, List, Optional, Tuple, Union
from functools import wraps
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, TypeVar, Union

from dataclasses_json import CatchAll, LetterCase, Undefined, dataclass_json
from dataclasses_json import config as dataclass_config
Expand All @@ -25,10 +27,33 @@

LOGGER = logging.getLogger(__name__)

T = TypeVar("T", bound="SentinelHubBatch")

BatchRequestType = Union[str, dict, "BatchRequest"]
BatchCollectionType = Union[str, dict, "BatchCollection"]


def batch_user_action_wait_for_status_change(func: Callable) -> Callable:
"""Decorator function for waiting for a status change after a user action."""

@wraps(func)
def retrying_func(self: T, batch_request: "BatchRequest") -> Json:
status = batch_request.status
output = func(self, batch_request)
for wait_time in [1, 2, 5, 10, 20, 100]:
time.sleep(wait_time)
new_status = self.get_request(batch_request).status
if new_status != status:
return output

LOGGER.warning(
"Status of batch request %s did not change from %s in time!", batch_request.request_id, status.value
)
return output

return retrying_func


class BatchTileStatus(Enum):
"""An enum class with all possible batch tile statuses"""

Expand Down Expand Up @@ -280,6 +305,7 @@ def delete_request(self, batch_request: BatchRequestType) -> Json:
url=self._get_processing_url(request_id), request_type=RequestType.DELETE, use_session=True
)

@batch_user_action_wait_for_status_change
def start_analysis(self, batch_request: BatchRequestType) -> Json:
"""Starts analysis of a batch job request

Expand All @@ -290,6 +316,7 @@ def start_analysis(self, batch_request: BatchRequestType) -> Json:
"""
return self._call_job(batch_request, "analyse")

@batch_user_action_wait_for_status_change
def start_job(self, batch_request: BatchRequestType) -> Json:
"""Starts running a batch job

Expand All @@ -300,6 +327,7 @@ def start_job(self, batch_request: BatchRequestType) -> Json:
"""
return self._call_job(batch_request, "start")

@batch_user_action_wait_for_status_change
def cancel_job(self, batch_request: BatchRequestType) -> Json:
"""Cancels a batch job

Expand All @@ -311,6 +339,7 @@ def cancel_job(self, batch_request: BatchRequestType) -> Json:
"""
return self._call_job(batch_request, "cancel")

@batch_user_action_wait_for_status_change
def restart_job(self, batch_request: BatchRequestType) -> Json:
"""Restarts only those parts of a job that failed

Expand Down
4 changes: 4 additions & 0 deletions sentinelhub/api/batch/process_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from ..process import SentinelHubRequest
from ..utils import AccessSpecification, datetime_config, enum_config, remove_undefined, s3_specification
from .base import BaseBatchClient, BaseBatchRequest, BatchRequestStatus, BatchUserAction, StoppedStatusReason
from .process import batch_user_action_wait_for_status_change

LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -249,20 +250,23 @@ def update_request(self, batch_request: BatchRequestType, description: str) -> J
use_session=True,
)

@batch_user_action_wait_for_status_change
def start_analysis(self, batch_request: BatchRequestType) -> Json:
"""Starts analysis of a batch job request

:param batch_request: Batch request ID, a dictionary containing an "ID" field, or a BatchProcessRequest.
"""
return self._call_job(batch_request, "analyse")

@batch_user_action_wait_for_status_change
def start_job(self, batch_request: BatchRequestType) -> Json:
"""Starts running a batch job

:param batch_request: Batch request ID, a dictionary containing an "ID" field, or a BatchProcessRequest.
"""
return self._call_job(batch_request, "start")

@batch_user_action_wait_for_status_change
def stop_job(self, batch_request: BatchRequestType) -> Json:
"""Stops a batch job

Expand Down
85 changes: 64 additions & 21 deletions tests/api/batch/test_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from sentinelhub import (
CRS,
BatchRequest,
BatchRequestStatus,
BBox,
DataCollection,
MimeType,
Expand Down Expand Up @@ -44,6 +45,10 @@ def test_single_tiling_grid(batch_client: SentinelHubBatch) -> None:
assert isinstance(tiling_grid, dict)


def count_batch_get_requests(requests_list: list, request_id: str) -> int:
return len([r for r in requests_list if r.url.endswith(request_id)])


def test_create_and_run_batch_request(batch_client: SentinelHubBatch, requests_mock: Mocker) -> None:
"""A test that mocks creation and execution of a new batch request"""
evalscript = "some evalscript"
Expand All @@ -65,26 +70,20 @@ def test_create_and_run_batch_request(batch_client: SentinelHubBatch, requests_m

requests_mock.post("/oauth/token", real_http=True)
request_id = "mocked-id"
requests_mock.post(
"/api/v1/batch/process",
[
{
"json": {
"id": request_id,
"processRequest": {
"input": {
"bounds": {
"bbox": list(bbox),
"properties": {"crs": "http://www.opengis.net/def/crs/OGC/1.3/CRS84"},
}
}
},
"tileCount": 42,
"status": "CREATED",
request_payload = {
"id": request_id,
"processRequest": {
"input": {
"bounds": {
"bbox": list(bbox),
"properties": {"crs": "http://www.opengis.net/def/crs/OGC/1.3/CRS84"},
}
}
],
)
},
"tileCount": 42,
"status": "CREATED",
}
requests_mock.post("/api/v1/batch/process", [{"json": request_payload}])

batch_request = batch_client.create(
sentinelhub_request,
Expand All @@ -104,18 +103,62 @@ def test_create_and_run_batch_request(batch_client: SentinelHubBatch, requests_m
batch_client.delete_request(batch_request)
requests_mock.request_history[-1].url.endswith(delete_endpoint)

endpoints = ["analyse", "start", "cancel", "restartpartial"]
endpoints = ["analyse", "start", "restartpartial", "cancel"]
full_endpoints = [f"/api/v1/batch/process/{request_id}/{endpoint}" for endpoint in endpoints]
for full_endpoint in full_endpoints:
requests_mock.post(full_endpoint, [{"json": ""}])

# test start analysis
batch_request.status = BatchRequestStatus.CREATED
requests_mock.get(
f"/api/v1/batch/process/{request_id}",
[
{"json": {**request_payload, "status": "CREATED"}},
{"json": {**request_payload, "status": "ANALYSIS_DONE"}},
],
)
batch_client.start_analysis(batch_request)
assert count_batch_get_requests(requests_mock.request_history, request_id) == 3

# test start job
batch_request.status = BatchRequestStatus.ANALYSIS_DONE
requests_mock.get(
f"/api/v1/batch/process/{request_id}",
[
{"json": {**request_payload, "status": "ANALYSIS_DONE"}},
{"json": {**request_payload, "status": "PROCESSING"}},
],
)
batch_client.start_job(batch_request)
batch_client.cancel_job(batch_request)
assert count_batch_get_requests(requests_mock.request_history, request_id) == 5

# test restart job
batch_request.status = BatchRequestStatus.PARTIAL
requests_mock.get(
f"/api/v1/batch/process/{request_id}",
[
{"json": {**request_payload, "status": "PARTIAL"}},
{"json": {**request_payload, "status": "PROCESSING"}},
],
)
batch_client.restart_job(batch_request)
assert count_batch_get_requests(requests_mock.request_history, request_id) == 7

# test cancel job
batch_request.status = BatchRequestStatus.PROCESSING
requests_mock.get(
f"/api/v1/batch/process/{request_id}",
[
{"json": {**request_payload, "status": "PROCESSING"}},
{"json": {**request_payload, "status": "CANCELED"}},
],
)
batch_client.cancel_job(batch_request)
assert count_batch_get_requests(requests_mock.request_history, request_id) == 9

requests_history = [r for r in requests_mock.request_history if not r.url.endswith(request_id)]
for index, full_endpoint in enumerate(full_endpoints):
assert requests_mock.request_history[index - len(full_endpoints)].url.endswith(full_endpoint)
assert requests_history[index - len(full_endpoints)].url.endswith(full_endpoint)


def test_iter_requests(batch_client: SentinelHubBatch) -> None:
Expand Down
70 changes: 51 additions & 19 deletions tests/api/batch/test_process_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
CRS,
BatchProcessClient,
BatchProcessRequest,
BatchRequestStatus,
BBox,
DataCollection,
MimeType,
Expand Down Expand Up @@ -40,6 +41,10 @@ def test_single_tiling_grid(batch_client: BatchProcessClient) -> None:
assert isinstance(tiling_grid, dict)


def count_batch_get_requests(requests_list: list, request_id: str) -> int:
return len([r for r in requests_list if r.url.endswith(request_id)])


def test_create_and_run_batch_request(batch_client: BatchProcessClient, requests_mock: Mocker) -> None:
"""A test that mocks creation and execution of a new batch request"""
evalscript = "some evalscript"
Expand All @@ -61,26 +66,20 @@ def test_create_and_run_batch_request(batch_client: BatchProcessClient, requests

requests_mock.post("/oauth/token", real_http=True)
request_id = "mocked-id"
requests_mock.post(
"/api/v2/batch/process",
[
{
"json": {
"id": request_id,
"domainAccountId": 0,
"request": {
"input": {
"bounds": {
"bbox": list(bbox),
"properties": {"crs": "http://www.opengis.net/def/crs/OGC/1.3/CRS84"},
}
}
},
"status": "CREATED",
request_payload = {
"id": request_id,
"domainAccountId": 0,
"request": {
"input": {
"bounds": {
"bbox": list(bbox),
"properties": {"crs": "http://www.opengis.net/def/crs/OGC/1.3/CRS84"},
}
}
],
)
},
"status": "CREATED",
}
requests_mock.post("/api/v2/batch/process", [{"json": request_payload}])

batch_request = batch_client.create(
sentinelhub_request,
Expand All @@ -98,12 +97,45 @@ def test_create_and_run_batch_request(batch_client: BatchProcessClient, requests
for full_endpoint in full_endpoints:
requests_mock.post(full_endpoint, [{"json": ""}])

# test start analysis
batch_request.status = BatchRequestStatus.CREATED
requests_mock.get(
f"/api/v2/batch/process/{request_id}",
[
{"json": {**request_payload, "status": "CREATED"}},
{"json": {**request_payload, "status": "ANALYSIS_DONE"}},
],
)
batch_client.start_analysis(batch_request)
assert count_batch_get_requests(requests_mock.request_history, request_id) == 2

# test start job
batch_request.status = BatchRequestStatus.ANALYSIS_DONE
requests_mock.get(
f"/api/v2/batch/process/{request_id}",
[
{"json": {**request_payload, "status": "ANALYSIS_DONE"}},
{"json": {**request_payload, "status": "PROCESSING"}},
],
)
batch_client.start_job(batch_request)
assert count_batch_get_requests(requests_mock.request_history, request_id) == 4

# test stop job
batch_request.status = BatchRequestStatus.PROCESSING
requests_mock.get(
f"/api/v2/batch/process/{request_id}",
[
{"json": {**request_payload, "status": "PROCESSING"}},
{"json": {**request_payload, "status": "STOPPED"}},
],
)
batch_client.stop_job(batch_request)
assert count_batch_get_requests(requests_mock.request_history, request_id) == 6

requests_history = [r for r in requests_mock.request_history if not r.url.endswith(request_id)]
for index, full_endpoint in enumerate(full_endpoints):
assert requests_mock.request_history[index - len(full_endpoints)].url.endswith(full_endpoint)
assert requests_history[index - len(full_endpoints)].url.endswith(full_endpoint)


def test_iter_requests(batch_client: BatchProcessClient) -> None:
Expand Down
Loading