Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve logging with open telemetry traces #9083

Merged
merged 13 commits into from
Nov 1, 2024
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ dependencies = [
"netCDF4",
"numpy<2",
"openpyxl", # extra dependency for pandas (excel)
"opentelemetry-api",
"opentelemetry-sdk",
"opentelemetry-instrumentation-threading",
"orjson",
"packaging",
"pandas",
Expand Down
18 changes: 16 additions & 2 deletions src/ert/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
from uuid import UUID

import yaml
from opentelemetry.instrumentation.threading import ThreadingInstrumentor
from opentelemetry.trace import Status, StatusCode

import ert.shared
from _ert.threading import set_signal_handler
Expand All @@ -33,6 +35,7 @@
from ert.run_models.multiple_data_assimilation import MultipleDataAssimilation
from ert.services import StorageService, WebvizErt
from ert.shared.storage.command import add_parser_options as ert_api_add_parser_options
from ert.trace import trace, tracer, tracer_provider
from ert.validation import (
IntegerArgument,
NumberListStringArgument,
Expand Down Expand Up @@ -646,12 +649,15 @@ def log_process_usage() -> None:
)


@tracer.start_as_current_span("ert.application.start")
def main() -> None:
span = trace.get_current_span()
warnings.filterwarnings("ignore", category=DeprecationWarning)
locale.setlocale(locale.LC_NUMERIC, "C")

# Have ErtThread re-raise uncaught exceptions on main thread
set_signal_handler()
ThreadingInstrumentor().instrument()

args = ert_parser(None, sys.argv[1:])

Expand All @@ -676,19 +682,26 @@ def main() -> None:
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.INFO)
root_logger.addHandler(handler)

try:
with ErtPluginContext(logger=logging.getLogger()) as context:
with ErtPluginContext(
logger=logging.getLogger(), trace_provider=tracer_provider
) as context:
logger.info(f"Running ert with {args}")
args.func(args, context.plugin_manager)
except ErtCliError as err:
span.set_status(Status(StatusCode.ERROR))
span.record_exception(err)
logger.debug(str(err))
sys.exit(str(err))
except ConfigValidationError as err:
span.set_status(Status(StatusCode.ERROR))
span.record_exception(err)
err_msg = err.cli_message()
logger.debug(err_msg)
sys.exit(err_msg)
except BaseException as err:
span.set_status(Status(StatusCode.ERROR))
span.record_exception(err)
logger.exception(f'ERT crashed unexpectedly with "{err}"')

logfiles = set() # Use set to avoid duplicates...
Expand All @@ -703,6 +716,7 @@ def main() -> None:
finally:
log_process_usage()
os.environ.pop("ERT_LOG_DIR")
ThreadingInstrumentor().uninstrument()


if __name__ == "__main__":
Expand Down
6 changes: 5 additions & 1 deletion src/ert/plugins/hook_specifications/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@
job_documentation,
legacy_ertscript_workflow,
)
from .logging import add_log_handle_to_root
from .logging import (
add_log_handle_to_root,
add_span_processor,
)
from .site_config import site_config_lines

__all__ = [
"add_log_handle_to_root",
"add_span_processor",
"ecl100_config_path",
"ecl300_config_path",
"flow_config_path",
Expand Down
12 changes: 12 additions & 0 deletions src/ert/plugins/hook_specifications/logging.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import logging

from opentelemetry.sdk.trace.export import BatchSpanProcessor

from ert.plugins.plugin_manager import hook_specification


Expand All @@ -11,3 +13,13 @@ def add_log_handle_to_root() -> logging.Handler: # type: ignore

:return: A log handle that will be added to the root logger
"""


@hook_specification
def add_span_processor() -> BatchSpanProcessor: # type: ignore
"""
Create a BatchSpanProcessor which will be added to the trace provider
in ert.

:return: A BatchSpanProcessor that will be added to the trace provider in ert
"""
14 changes: 14 additions & 0 deletions src/ert/plugins/plugin_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
)

import pluggy
from opentelemetry.sdk.trace import TracerProvider

from .workflow_config import WorkflowConfigs

Expand Down Expand Up @@ -322,17 +323,26 @@ def add_logging_handle_to_root(self, logger: logging.Logger) -> None:
for handle in handles:
logger.addHandler(handle)

def add_span_processor_to_trace_provider(
self, trace_provider: TracerProvider
) -> None:
span_processors = self.hook.add_span_processor()
for span_processor in span_processors:
trace_provider.add_span_processor(span_processor)


class ErtPluginContext:
def __init__(
self,
plugins: Optional[List[object]] = None,
logger: Optional[logging.Logger] = None,
trace_provider: Optional[TracerProvider] = None,
) -> None:
self.plugin_manager = ErtPluginManager(plugins=plugins)
self.tmp_dir: Optional[str] = None
self.tmp_site_config_filename: Optional[str] = None
self._logger = logger
self._trace_provider = trace_provider

def _create_site_config(self, tmp_dir: str) -> Optional[str]:
site_config_content = self.plugin_manager.get_site_config_content()
Expand All @@ -348,6 +358,10 @@ def _create_site_config(self, tmp_dir: str) -> Optional[str]:
def __enter__(self) -> ErtPluginContext:
if self._logger is not None:
self.plugin_manager.add_logging_handle_to_root(logger=self._logger)
if self._trace_provider is not None:
self.plugin_manager.add_span_processor_to_trace_provider(
trace_provider=self._trace_provider
)
logger.debug(str(self.plugin_manager))
logger.debug("Creating temporary directory for site-config")
self.tmp_dir = tempfile.mkdtemp()
Expand Down
4 changes: 4 additions & 0 deletions src/ert/run_models/base_run_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
from ert.mode_definitions import MODULE_MODE
from ert.runpaths import Runpaths
from ert.storage import Ensemble, Storage
from ert.trace import tracer
from ert.workflow_runner import WorkflowRunner

from ..config.analysis_config import UpdateSettings
Expand Down Expand Up @@ -318,6 +319,7 @@ def _clean_env_context(self) -> None:
self._context_env.pop(key)
os.environ.pop(key, None)

@tracer.start_as_current_span(f"{__name__}.start_simulations_thread")
def start_simulations_thread(
self,
evaluator_server_config: EvaluatorServerConfig,
Expand Down Expand Up @@ -573,6 +575,7 @@ async def run_ensemble_evaluator_async(
return evaluator_task.result()

# This function needs to be there for the sake of testing that expects sync ee run
@tracer.start_as_current_span(f"{__name__}.run_ensemble_evaluator")
def run_ensemble_evaluator(
self,
run_args: List[RunArg],
Expand Down Expand Up @@ -654,6 +657,7 @@ def validate(self) -> None:
f"({min_realization_count})"
)

@tracer.start_as_current_span(f"{__name__}.run_workflows")
def run_workflows(
self, runtime: HookRuntime, storage: Storage, ensemble: Ensemble
) -> None:
Expand Down
2 changes: 2 additions & 0 deletions src/ert/run_models/ensemble_experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from ert.enkf_main import sample_prior
from ert.ensemble_evaluator import EvaluatorServerConfig
from ert.storage import Ensemble, Experiment, Storage
from ert.trace import tracer

from ..run_arg import create_run_arguments
from .base_run_model import BaseRunModel, StatusEvents
Expand Down Expand Up @@ -55,6 +56,7 @@ def __init__(
minimum_required_realizations=minimum_required_realizations,
)

@tracer.start_as_current_span(f"{__name__}.run_experiment")
def run_experiment(
self,
evaluator_server_config: EvaluatorServerConfig,
Expand Down
2 changes: 2 additions & 0 deletions src/ert/run_models/ensemble_smoother.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from ert.enkf_main import sample_prior
from ert.ensemble_evaluator import EvaluatorServerConfig
from ert.storage import Storage
from ert.trace import tracer

from ..config.analysis_config import UpdateSettings
from ..config.analysis_module import ESSettings
Expand Down Expand Up @@ -56,6 +57,7 @@ def __init__(

self.support_restart = False

@tracer.start_as_current_span(f"{__name__}.run_experiment")
def run_experiment(
self, evaluator_server_config: EvaluatorServerConfig, restart: bool = False
) -> None:
Expand Down
2 changes: 2 additions & 0 deletions src/ert/run_models/evaluate_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from ert.ensemble_evaluator import EvaluatorServerConfig
from ert.storage import Storage
from ert.trace import tracer

from ..run_arg import create_run_arguments
from . import BaseRunModel
Expand Down Expand Up @@ -58,6 +59,7 @@ def __init__(
random_seed=random_seed,
)

@tracer.start_as_current_span(f"{__name__}.run_experiment")
def run_experiment(
self, evaluator_server_config: EvaluatorServerConfig, restart: bool = False
) -> None:
Expand Down
2 changes: 2 additions & 0 deletions src/ert/run_models/iterated_ensemble_smoother.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from ert.enkf_main import sample_prior
from ert.ensemble_evaluator import EvaluatorServerConfig
from ert.storage import Ensemble, Storage
from ert.trace import tracer

from ..config.analysis_config import UpdateSettings
from ..config.analysis_module import IESSettings
Expand Down Expand Up @@ -117,6 +118,7 @@ def analyzeStep(
) from e
self.run_workflows(HookRuntime.POST_UPDATE, self._storage, posterior_storage)

@tracer.start_as_current_span(f"{__name__}.run_experiment")
def run_experiment(
self, evaluator_server_config: EvaluatorServerConfig, restart: bool = False
) -> None:
Expand Down
2 changes: 2 additions & 0 deletions src/ert/run_models/multiple_data_assimilation.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from ert.enkf_main import sample_prior
from ert.ensemble_evaluator import EvaluatorServerConfig
from ert.storage import Ensemble, Storage
from ert.trace import tracer

from ..config.analysis_config import UpdateSettings
from ..config.analysis_module import ESSettings
Expand Down Expand Up @@ -79,6 +80,7 @@ def __init__(
minimum_required_realizations=minimum_required_realizations,
)

@tracer.start_as_current_span(f"{__name__}.run_experiment")
def run_experiment(
self, evaluator_server_config: EvaluatorServerConfig, restart: bool = False
) -> None:
Expand Down
50 changes: 26 additions & 24 deletions src/ert/scheduler/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from ert.constant_filenames import ERROR_file
from ert.load_status import LoadStatus
from ert.storage.realization_storage_state import RealizationStorageState
from ert.trace import tracer

from .driver import Driver, FailedSubmit

Expand Down Expand Up @@ -147,30 +148,31 @@ async def run(
checksum_lock: asyncio.Lock,
max_submit: int = 1,
) -> None:
self._requested_max_submit = max_submit
for attempt in range(max_submit):
await self._submit_and_run_once(sem)

if self.returncode.cancelled() or self._scheduler._cancelled:
break

if self.returncode.result() == 0:
if self._scheduler._manifest_queue is not None:
await self._verify_checksum(checksum_lock)
async with forward_model_ok_lock:
await self._handle_finished_forward_model()
break

if attempt < max_submit - 1:
message = (
f"Realization {self.iens} failed, "
f"resubmitting for attempt {attempt+2} of {max_submit}"
)
logger.warning(message)
self.returncode = asyncio.Future()
self.started.clear()
else:
await self._send(JobState.FAILED)
with tracer.start_as_current_span(f"{__name__}.run.realization_{self.iens}"):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we might be a little lucky here since the scope will cover most, if not all, of the functions in this class. 👍

self._requested_max_submit = max_submit
for attempt in range(max_submit):
await self._submit_and_run_once(sem)

if self.returncode.cancelled() or self._scheduler._cancelled:
break

if self.returncode.result() == 0:
if self._scheduler._manifest_queue is not None:
await self._verify_checksum(checksum_lock)
async with forward_model_ok_lock:
await self._handle_finished_forward_model()
break

if attempt < max_submit - 1:
message = (
f"Realization {self.iens} failed, "
f"resubmitting for attempt {attempt+2} of {max_submit}"
)
logger.warning(message)
self.returncode = asyncio.Future()
self.started.clear()
else:
await self._send(JobState.FAILED)

async def _max_runtime_task(self) -> None:
assert self.real.max_runtime is not None
Expand Down
11 changes: 11 additions & 0 deletions src/ert/trace.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from opentelemetry import trace
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import SpanLimits, TracerProvider

resource = Resource(attributes={SERVICE_NAME: "ert"})
tracer_provider = TracerProvider(
resource=resource, span_limits=SpanLimits(max_events=128 * 16)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect this number is much higher than what we really need, but let's leave it like this for the time being.
Events within a span are meant for noticeable changes like exceptions, outcomes and such.

)
trace.set_tracer_provider(tracer_provider)

tracer = trace.get_tracer("ert.main")
11 changes: 11 additions & 0 deletions tests/ert/unit_tests/plugins/dummy_plugins.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import logging
from io import StringIO

from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

from ert import ForwardModelStepPlugin, plugin

Expand Down Expand Up @@ -76,6 +79,14 @@ def add_log_handle_to_root():
return fh


span_output = StringIO()


@plugin(name="dummy")
def add_span_processor():
return BatchSpanProcessor(ConsoleSpanExporter(out=span_output))


class DummyFMStep(ForwardModelStepPlugin):
def __init__(self):
super().__init__(name="DummyForwardModel", command=["touch", "dummy.out"])
Expand Down
Loading