add job database implementation that uses stac #619
base: master
Conversation
this is quite a large PR and I haven't gone through all of it yet; just some initial comments
openeo/extra/job_management.py
Outdated
@@ -70,14 +70,6 @@ def exists(self) -> bool:
        """Does the job database already exist, to read job data from?"""
        ...

    @abc.abstractmethod
    def read(self) -> pd.DataFrame:
I don't think we can already remove this as a required method to implement: it's still being called from start_job_thread:

    df = job_db.read()

However, it seems to be unused there, so we could actually remove this.
With 29d9b16 (in the master branch) I have now moved read from JobDatabaseInterface to FullDataFrameJobDatabase.
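The move described above can be sketched roughly like this (a simplified sketch; the real classes live in openeo.extra.job_management and work with pandas DataFrames, which is omitted here):

```python
import abc

# Simplified sketch of the refactor: `read` is no longer required by the
# generic interface, only by the full-dataframe base class.
class JobDatabaseInterface(abc.ABC):
    """Contract between a job database implementation and the job manager."""

    @abc.abstractmethod
    def exists(self) -> bool:
        """Does the job database already exist, to read job data from?"""
        ...


class FullDataFrameJobDatabase(JobDatabaseInterface):
    """Base for databases that keep the full job dataframe in memory."""

    @abc.abstractmethod
    def read(self):
        """Read the job data (a pandas DataFrame in the real code)."""
        ...
```

Implementers that don't subclass FullDataFrameJobDatabase are then no longer forced to provide a full read().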
openeo/extra/job_management.py
Outdated
@@ -112,6 +104,20 @@ def get_by_status(self, statuses: List[str], max=None) -> pd.DataFrame:
        """
        ...

    @abc.abstractmethod
    def initialize_from_df(self, df: pd.DataFrame, on_exists: str = "error") -> "JobDatabaseInterface":
I'm not sure this method must be a required part of the JobDatabaseInterface interface. There are still some conceptual issues with initialize_from_df at the moment (e.g. see #667), so we might want to avoid painting ourselves into a corner here.
initialize_from_df is not something that should be called automatically from within the job manager. It is explicitly intended for end users to call on their job database. So in that sense, it should not be part of the JobDatabaseInterface interface, which is a contract between the developer of the database and the job manager, not a contract between user and job manager.
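The split being argued for could look roughly like this (all names and behaviour here are illustrative, not the actual openeo code):

```python
import abc


class JobDatabaseInterface(abc.ABC):
    """Contract between the database implementer and the job manager."""

    @abc.abstractmethod
    def exists(self) -> bool:
        ...


class InMemoryJobDatabase(JobDatabaseInterface):
    """Hypothetical concrete database with a user-facing initializer."""

    def __init__(self):
        self._rows = None

    def exists(self) -> bool:
        return self._rows is not None

    def initialize_from_df(self, rows, on_exists: str = "error"):
        # Not part of the interface: the end user calls this explicitly, once.
        if self.exists() and on_exists == "error":
            raise ValueError("Job database already exists")
        self._rows = list(rows)
        return self


db = InMemoryJobDatabase().initialize_from_df([{"status": "not_started"}])
```

The job manager only ever sees the JobDatabaseInterface contract; the initializer stays a convenience on the concrete class.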
openeo/extra/stac_job_db.py
Outdated
@@ -0,0 +1,303 @@
import concurrent
I think we should start making job_management a package instead of a module, so that files are organised like:
- openeo
  - extra
    - job_management
      - __init__.py
      - stac_job_db.py
With a811bff I have now made openeo.extra.job_management a package, so you can move this new module to openeo.extra.job_management.stac_job_db.
I also merged master into this feature branch now to resolve all conflicts, so make sure to pull first before continuing on this feature branch.
openeo/extra/stac_job_db.py
Outdated
def exists(self) -> bool:
    return len([c.id for c in self.client.get_collections() if c.id == self.collection_id]) > 0
FYI: this is a bit simpler and cheaper:
- return len([c.id for c in self.client.get_collections() if c.id == self.collection_id]) > 0
+ return any(c.id == self.collection_id for c in self.client.get_collections())
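Beyond brevity, any() short-circuits on the first match, while the list comprehension always walks all collections. A quick illustration:

```python
# any() stops consuming the iterator as soon as it finds a match;
# the list-comprehension variant would have consumed everything.
collection_ids = iter(["a", "b", "target", "zzz"])
found = any(cid == "target" for cid in collection_ids)
print(found)                  # True
print(next(collection_ids))   # "zzz": iteration stopped right after the match
```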
openeo/extra/stac_job_db.py
Outdated
dt = item_dict["properties"]["datetime"]
item_dict["datetime"] = pystac.utils.str_to_datetime(dt)

return pd.Series(item_dict["properties"], name=item_id)
You only use item_dict["properties"] here, so the line above that sets item_dict["datetime"] has no use?
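The dead store is easy to see with a stripped-down stand-in for the item dict (a sketch; the real code builds a pandas Series from the properties):

```python
# item_dict["datetime"] is written at the top level, but only the nested
# "properties" dict feeds the returned row, so the write is never read.
item_dict = {
    "id": "job-1",
    "properties": {"status": "queued", "datetime": "2024-01-01T00:00:00Z"},
}
item_dict["datetime"] = "parsed-datetime-object"  # dead store

row = item_dict["properties"]  # pd.Series(item_dict["properties"], ...) in the PR
print(row["datetime"])  # 2024-01-01T00:00:00Z (the raw string, untouched)
```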
openeo/extra/stac_job_db.py
Outdated
import pystac
import requests
from pystac import Collection, Item
from pystac_client import Client
As discussed, pystac_client is an optional dependency at the moment. You should document that at openeo-python-client/docs/installation.rst (lines 82 to 94 in 47172cd):

Optional dependencies
=====================

Depending on your use case, you might also want to install some additional libraries.
For example:

- ``netCDF4`` or ``h5netcdf`` for loading and writing NetCDF files (e.g. integrated in ``xarray.load_dataset()``)
- ``matplotlib`` for visualisation (e.g. integrated plot functionality in ``xarray``)
- ``pyarrow`` for (read/write) support of Parquet files
  (e.g. with :py:class:`~openeo.extra.job_management.MultiBackendJobManager`)
- ``rioxarray`` for GeoTIFF support in the assert helpers from ``openeo.testing.results``
- ``geopandas`` for working with dataframes with geospatial support
  (e.g. with :py:class:`~openeo.extra.job_management.MultiBackendJobManager`)
tests/extra/test_stac_jobdb.py
Outdated
@pytest.fixture
def mock_stac_api_job_database(mock_auth) -> STACAPIJobDatabase:
    return STACAPIJobDatabase(collection_id="test_id", stac_root_url="http://fake-stac-api", auth=mock_auth)
For dummy/fake URLs, use the .test TLD, which is designed especially for test situations,
to prepare for future extensions, e.g. #619
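The .test TLD is reserved (RFC 2606) precisely so it can never resolve to a real host; a fixture URL along these lines (hypothetical values) is safe:

```python
from urllib.parse import urlparse

# Reserved-for-testing root URL: a request to it can never hit a real STAC API.
FAKE_STAC_ROOT = "http://stac.test"

assert urlparse(FAKE_STAC_ROOT).hostname.endswith(".test")
```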
some more notes
@@ -0,0 +1,300 @@
import concurrent
- import concurrent
+ import concurrent.futures
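`import concurrent` on its own does not reliably expose the futures submodule; importing the submodule explicitly is what the code actually needs:

```python
import concurrent.futures

# ThreadPoolExecutor lives in the concurrent.futures submodule, which is only
# guaranteed to be loaded when it is imported explicitly.
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(lambda x: x * x, [1, 2, 3]))
print(results)  # [1, 4, 9]
```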
    return item

def count_by_status(self, statuses: Iterable[str] = ()) -> dict:
    items = self.get_by_status(statuses, max=200)
statuses: Iterable[str] only guarantees that you can iterate through it once. You are using statuses twice in this function, so the second time it might be an exhausted/empty collection. It's best to first make a copy of the statuses that you can use twice, e.g. statuses = set(statuses).
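The exhaustion problem is easy to reproduce with a generator, and the set() copy fixes it (function name here is illustrative):

```python
def count_twice(statuses):
    # Copy first, so the argument can safely be a one-shot iterator.
    statuses = set(statuses)
    first_pass = sorted(statuses)
    second_pass = sorted(statuses)  # without the copy, this could be empty
    return first_pass, second_pass


gen = (s for s in ["queued", "running", "queued"])
first, second = count_twice(gen)
print(first)   # ['queued', 'running']
print(second)  # ['queued', 'running']
```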
item.add_link(pystac.Link(rel=pystac.RelType.COLLECTION, target=item.collection_id))

def _ingest_bulk(self, items: Iterable[Item]) -> dict:
    collection_id = items[0].collection_id
Because of items: Iterable[Item], it's not really valid to do items[0] (Iterable only allows a for loop, once, not random access with e.g. [0]). In this case, I think you should just use the type annotation List[Item].
df = pd.DataFrame(series)
if len(series) == 0:
    # TODO: What if default columns are overwritten by the user?
    df = MultiBackendJobManager._normalize_df(
Using the private MultiBackendJobManager._normalize_df from get_by_status looks a bit problematic.
It's also used from initialize_from_df, which is OK at the moment but subject to change (see #667). Using it in another context than initialize_from_df, like here, seems to indicate that we have to rethink all this initialization business. Not sure yet what to do instead.
import pandas as pd
import pystac
import requests
from pystac import Collection, Item
I guess it's a matter of taste, but I would avoid importing generic names like Collection and Item into the global namespace like this. I would just use pystac.Collection() and the like in the code, which is a bit more typing work but a lot clearer reading-wise (same with Client from pystac_client).
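The trade-off, illustrated with a stdlib module instead of pystac (same idea):

```python
# Module-qualified access: slightly more typing, but every use site says
# exactly where the name comes from.
import datetime
d1 = datetime.date(2024, 1, 1)

# Bare import of a generic name: shorter, but "date" alone is ambiguous
# in a large module.
from datetime import date
d2 = date(2024, 1, 1)

assert d1 == d2
```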
Extra advantage of import foo over from foo import bar: it reduces the risk of circular import issues.
Still to solve: