merge standalone climate service into shared context #5

Open · wants to merge 2 commits into base: main
3 changes: 1 addition & 2 deletions docker-compose.yaml
@@ -20,7 +20,6 @@ services:
    env_file:
      - .env
    volumes:
-     - ./climate-python:/jupyter/climate-python
-     - ./mimi-api:/jupyter/mimi-api
+     - ./src/beaker_climate:/jupyter/beaker_climate
    working_dir: /jupyter
    command: ["beaker", "dev", "watch", "--ip", "0.0.0.0"]
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -47,7 +47,7 @@ dependencies = [
"h5netcdf~=1.3.0",
"netcdf4~=1.6.5",
"cftime~=1.6.3",
"esgf-pyclient~=0.3.1",
"dask",
]

[project.urls]
205 changes: 205 additions & 0 deletions src/beaker_climate/beaker_climate/agent.py
@@ -0,0 +1,205 @@
import json
import logging
from pathlib import Path
from time import sleep
from typing import Optional

from archytas.tool_utils import AgentRef, LoopControllerRef, ReactContextRef, tool

from beaker_kernel.lib import BeakerAgent
from beaker_kernel.lib.context import BaseContext

from .search.esgf_search import ESGFProvider

logger = logging.getLogger(__name__)

class ClimateDataUtilityAgent(BeakerAgent):
    """
    You are assisting us in modifying geo-temporal datasets.

    The main things you are going to do are regridding spatial datasets, temporally rescaling datasets, and clipping the extent of geo-temporal datasets.

    If you don't have the details necessary to use a tool, you should use the ask_user tool to ask the user for them.
    """
    def __init__(self, context: BaseContext = None, tools: list = None, **kwargs):
        self.logger = logger
        super().__init__(context, tools, **kwargs)

        documentation_path = Path(__file__).parent / "api_documentation" / "climate_search.md"
        initial_context_msg_added = False
        while not initial_context_msg_added:
            try:
                # The documentation file may not be readable yet on first
                # context launch, so retry until it becomes available.
                with open(documentation_path, 'r') as f:
                    documentation = f.read()
                self.add_context(f'''\
The Earth System Grid Federation (ESGF) is a global collaboration that manages and distributes climate and environmental science data.
It serves as the primary platform for accessing CMIP (Coupled Model Intercomparison Project) data and other climate model outputs.
The federation provides a distributed database and delivery system for climate science data, particularly model outputs and observational data.
Through ESGF, users can search, discover and access climate datasets from major modeling centers and research institutions worldwide.
The system supports authentication, search capabilities, and data transfer protocols optimized for large scientific datasets.

If datasets are loaded, use xarray with the OpenDAP URL.
If the user asks to download a dataset, ask them if they are sure they want to download it.

Additionally, any data downloaded should be downloaded to the './data/' directory.
Please ensure the code makes sure this location exists, and all downloaded data is saved to this location.

Provided below is the comprehensive documentation of the climate-search tools that you have access to.
ALWAYS reference this when using the climate-search tools.
```
{documentation}
```
''')
                initial_context_msg_added = True
            except Exception:
                sleep(0.5)
        self.esgf = ESGFProvider(self.oneshot)


    @tool()
    async def search(self, query: str, agent: AgentRef, loop: LoopControllerRef, react_context: ReactContextRef) -> dict:
        """
        This tool searches ESGF for datasets.
        Save the UNMODIFIED JSON output to a variable in the user's notebook.

        Args:
            query (str): The user's query to pass to the climate search tool.
        Returns:
            dict: ESGF unmodified JSON output to be saved to a variable in the notebook.
        """
        try:
            return await self.esgf.tool_search(query)
        except Exception as e:
            self.add_context(f"The tool failed with this error: {str(e)}. I need to inform the user about this immediately before deciding what to do next. I need to tell the user the exact error with zero summarization.")
            return {}


    @tool()
    async def fetch(self, dataset_id: str, agent: AgentRef, loop: LoopControllerRef, react_context: ReactContextRef) -> dict:
        """
        This tool fetches download URLs for a dataset.

        Args:
            dataset_id (str): The ID of the dataset to fetch URLs for, as returned by the search tool.
        Returns:
            dict: ESGF fetch results
        """
        try:
            return self.esgf.tool_fetch(dataset_id)
        except Exception as e:
            self.add_context(f"The tool failed with this error: {str(e)}. I should inform the user immediately with the full text of the error.")
            return {}

    @tool()
    async def regrid_dataset(
        self,
        dataset: str,
        target_resolution: tuple,
        agent: AgentRef,
        loop: LoopControllerRef,
        aggregation: Optional[str] = "interp_or_mean",
    ) -> str:
        """
        This tool should be used to show the user code to regrid a NetCDF dataset with detectable geo-resolution.

        If a user asks to regrid a dataset, use this tool to return them code to regrid the dataset.

        If you are given a NetCDF dataset, use this tool instead of any other regridding tool.

        If you are asked about what is needed to regrid a dataset, please provide information about the arguments of this tool.

        Args:
            dataset (str): The name of the dataset instantiated in the jupyter notebook.
            target_resolution (tuple): The target resolution to regrid to, e.g. (0.5, 0.5). This is in degrees longitude and latitude.
            aggregation (Optional): The aggregation function to be used in the regridding. The options are as follows:
                'conserve'
                'min'
                'max'
                'mean'
                'median'
                'mode'
                'interp_or_mean'
                'nearest_or_mode'

        Returns:
            str: A JSON-encoded code-cell payload containing the regridding code.
        """

        loop.set_state(loop.STOP_SUCCESS)
        code = agent.context.get_code(
            "flowcast_regridding",
            {
                "dataset": dataset,
                "target_resolution": target_resolution,
                "aggregation": aggregation,
            },
        )

        result = json.dumps(
            {
                "action": "code_cell",
                "language": "python3",
                "content": code.strip(),
            }
        )

        return result

    @tool()
    async def get_netcdf_plot(
        self,
        dataset_variable_name: str,
        agent: AgentRef,
        loop: LoopControllerRef,
        plot_variable_name: Optional[str] = None,
        lat_col: Optional[str] = "lat",
        lon_col: Optional[str] = "lon",
        time_slice_index: Optional[int] = 1,
    ) -> str:
        """
        This function should be used to plot or preview a NetCDF dataset.

        If the user asks to plot or preview a dataset, use this tool to generate and run the plotting code.

        You should also ask if the user wants to specify the optional arguments by telling them what each argument does.

        Args:
            dataset_variable_name (str): The name of the dataset instantiated in the jupyter notebook.
            plot_variable_name (Optional): The name of the variable to plot. Defaults to None.
                If None is provided, the first variable in the dataset will be plotted.
            lat_col (Optional): The name of the latitude column. Defaults to 'lat'.
            lon_col (Optional): The name of the longitude column. Defaults to 'lon'.
            time_slice_index (Optional): The index of the time slice to visualize. Defaults to 1.

        Returns:
            str: The output of evaluating the generated plotting code.
        """

        code = agent.context.get_code(
            "get_netcdf_plot",
            {
                "dataset": dataset_variable_name,
                "plot_variable_name": plot_variable_name,
                "lat_col": lat_col,
                "lon_col": lon_col,
                "time_slice_index": time_slice_index,
            },
        )

        result = await agent.context.evaluate(
            code,
            parent_header={},
        )

        output = result.get("return")

        return output
87 changes: 87 additions & 0 deletions src/beaker_climate/beaker_climate/api_documentation/climate_search.md
@@ -0,0 +1,87 @@
# climate-data

On first context launch, caching data for search will be generated; this may take around a minute.

## Structure

This set of tools follows a very specific workflow:
* Search for datasets with the `search` tool.
* Pass the **dataset ID**, and no other fields, to the `fetch` tool to download.
* `fetch` returns the URLs to download from; if you are downloading a dataset for the user, use the HTTP set of URLs it returns.

Do not use the tools in any other way.

The `search` tool returns a JSON payload. Inside the response body, `"results"` is a list containing dataset metadata bundles.

To fetch one of them, take an entry in `"results"` and read the `"id"` field inside its `"metadata"` field. **The `"id"` field is what should be passed to the `fetch` tool to download a file.**
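
For illustration, a minimal Python sketch of this extraction (the `search_payload` variable is hypothetical; its shape follows the example output documented under the Search Tool below):

```python
# A minimal sketch, assuming `search_payload` holds the unmodified JSON
# saved from the `search` tool.
search_payload = {
    "results": [
        {"metadata": {"id": "CMIP6.CMIP.NCAR.CESM2.historical.r11i1p1f1.CFday.ua.gn.v20190514|aims3.llnl.gov"}},
    ]
}

# Results are sorted by relevance; the `id` inside `metadata` is the only
# value that needs to be passed on to the `fetch` tool.
dataset_id = search_payload["results"][0]["metadata"]["id"]
print(dataset_id)
```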

### CMIP6 (ESGF)

By default, climate-data searches all available mirrors for reliability. Dataset IDs that carry a mirror suffix (e.g. `CMIP6.CMIP.NCAR.CESM2.historical.r11i1p1f1.CFday.ua.gn.v20190514|esgf-data.ucar.edu`) should be considered **interchangeable** with their mirrorless versions (`CMIP6.CMIP.NCAR.CESM2.historical.r11i1p1f1.CFday.ua.gn.v20190514`), and the mirrorless version is the preferred form.
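
Since the two forms are interchangeable and the mirrorless form is preferred, a one-line normalization is enough (a sketch; the variable names are illustrative):

```python
# Drop everything after the "|" mirror separator to get the preferred,
# mirrorless dataset ID (a no-op when no mirror suffix is present).
dataset_id = "CMIP6.CMIP.NCAR.CESM2.historical.r11i1p1f1.CFday.ua.gn.v20190514|esgf-data.ucar.edu"
mirrorless_id = dataset_id.split("|")[0]
print(mirrorless_id)
```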

#### Search Tool

Required Parameters:
* `query`: Natural language string with search terms to retrieve datasets for.

Example: `/search/esgf?query=historical eastward wind 100 km cesm2 r11i1p1f1 cfday`

Output:
```json
{
  "results": [
    {
      "metadata": {
        "id": "CMIP6.CMIP.NCAR.CESM2.historical.r11i1p1f1.CFday.ua.gn.v20190514|aims3.llnl.gov",
        "version": "20190514"...
      }
    }, ...
  ]
}
```

`results` is a list of datasets, sorted by relevance.

Each dataset contains a `metadata` field.

`metadata` contains all of the stored metadata for the data set, provided by ESGF, such as experiment name, title, variables, geospatial coordinates, time, frequency, resolution, and more.

The dataset's filesize in bytes is in the `size` field of the metadata. Listing metadata attributes about datasets to the user is very useful: convert sizes to human-readable values such as MB or GB, and when asked to describe the dataset, mention coordinates, frequency, and resolution as important details.

**If the user asks for information, mention filesize in human readable units, frequency, resolution, and variable. Summarize the metadata, DO NOT print it to stdout.**
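
A small helper along these lines covers the unit conversion (a sketch, not part of the provided tools):

```python
def human_readable_size(num_bytes: float) -> str:
    """Convert a byte count (e.g. the `size` metadata field) to a readable string."""
    for unit in ("B", "KB", "MB", "GB"):
        if num_bytes < 1024:
            return f"{num_bytes:.1f} {unit}"
        num_bytes /= 1024
    return f"{num_bytes:.1f} TB"

print(human_readable_size(3_221_225_472))  # -> "3.0 GB"
```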

The `metadata` field contains an `id` field that is used for subsequent processing and lookups, containing the full dataset ID with revision and node information, such as: `CMIP6.CMIP.NCAR.CESM2.historical.r11i1p1f1.CFday.ua.gn.v20190514|esgf-data.ucar.edu`

#### Fetch Tool

Required Parameters:
* `dataset_id`: ID of the dataset provided by search in full format.

Example:
`/fetch/esgf?dataset_id=CMIP6.CMIP.NCAR.CESM2.historical.r11i1p1f1.CFday.ua.gn.v20190514|esgf-data.ucar.edu`

Output:
```json
{
  "dataset": "CMIP6.CMIP....",
  "urls": [
    {
      "http": [
        "http://esgf-data.node.example/http/part1...",
        "http://esgf-data.node.example/http/part2..."
      ],
      "opendap": [
        "http://esgf-data.node.example/opendap/part1...",
        "http://esgf-data.node.example/opendap/part2..."
      ]
    },
  ],
  "metadata": {}
}
```

The `urls` field is a list of dicts mapping **protocol** to **a list of URLs** that together comprise the download for the dataset. These files may be large, so a download may be a single URL or split into multiple parts.

HTTP URLs are provided for plain downloads.
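
A sketch of the plain-HTTP path (assuming the `requests` package is available; the URLs are the placeholder values from the example above), saving every part under `./data/` as the agent context requires:

```python
import os
import requests

http_urls = [
    "http://esgf-data.node.example/http/part1",
    "http://esgf-data.node.example/http/part2",
]

os.makedirs("./data", exist_ok=True)  # all downloads must land in ./data/
for url in http_urls:
    destination = os.path.join("./data", url.rsplit("/", 1)[-1])
    with requests.get(url, stream=True, timeout=60) as response:
        response.raise_for_status()
        with open(destination, "wb") as f:
            for chunk in response.iter_content(chunk_size=1 << 20):  # 1 MiB chunks
                f.write(chunk)
```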

OpenDAP URLs can be opened with `xarray.open_mfdataset()` for lazy network and disk usage.
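
For example (a sketch using the placeholder URLs above), all OpenDAP parts can be combined into one lazily loaded dataset; note that this PR adds `dask` to the dependencies, which `open_mfdataset` uses for lazy loading:

```python
import xarray as xr

opendap_urls = [
    "http://esgf-data.node.example/opendap/part1",
    "http://esgf-data.node.example/opendap/part2",
]

# Combine all parts into a single dataset; values are only transferred
# over the network when they are actually computed or plotted.
ds = xr.open_mfdataset(opendap_urls, combine="by_coords")
print(ds)
```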
22 changes: 22 additions & 0 deletions src/beaker_climate/beaker_climate/context.py
@@ -0,0 +1,22 @@
import logging
from typing import TYPE_CHECKING, Any, Dict

from beaker_kernel.lib import BeakerContext

from .agent import ClimateDataUtilityAgent

if TYPE_CHECKING:
    # "BeakerKernel" is referenced only in annotations below; the import
    # path here is assumed.
    from beaker_kernel.kernel import BeakerKernel

logger = logging.getLogger(__name__)

class ClimateDataUtilityContext(BeakerContext):
    compatible_subkernels = ["python3"]
    SLUG = "beaker_climate"

    def __init__(self, beaker_kernel: "BeakerKernel", config: Dict[str, Any]) -> None:
        super().__init__(beaker_kernel, ClimateDataUtilityAgent, config)