Skip to content

Commit

Permalink
Improve logging with open telemetry traces (#9083)
Browse files Browse the repository at this point in the history
Add a span processor through the add_span_processor pluggin hook
to export trace information to e.g. azure
---------

Co-authored-by: Andreas Eknes Lie <andrli@equinor.com>
  • Loading branch information
HakonSohoel and andreas-el authored Nov 1, 2024
1 parent 7ee89c1 commit c34060c
Show file tree
Hide file tree
Showing 15 changed files with 131 additions and 27 deletions.
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ dependencies = [
"netCDF4",
"numpy<2",
"openpyxl", # extra dependency for pandas (excel)
"opentelemetry-api",
"opentelemetry-sdk",
"opentelemetry-instrumentation-threading",
"orjson",
"packaging",
"pandas",
Expand Down
18 changes: 16 additions & 2 deletions src/ert/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
from uuid import UUID

import yaml
from opentelemetry.instrumentation.threading import ThreadingInstrumentor
from opentelemetry.trace import Status, StatusCode

import ert.shared
from _ert.threading import set_signal_handler
Expand All @@ -33,6 +35,7 @@
from ert.run_models.multiple_data_assimilation import MultipleDataAssimilation
from ert.services import StorageService, WebvizErt
from ert.shared.storage.command import add_parser_options as ert_api_add_parser_options
from ert.trace import trace, tracer, tracer_provider
from ert.validation import (
IntegerArgument,
NumberListStringArgument,
Expand Down Expand Up @@ -646,12 +649,15 @@ def log_process_usage() -> None:
)


@tracer.start_as_current_span("ert.application.start")
def main() -> None:
span = trace.get_current_span()
warnings.filterwarnings("ignore", category=DeprecationWarning)
locale.setlocale(locale.LC_NUMERIC, "C")

# Have ErtThread re-raise uncaught exceptions on main thread
set_signal_handler()
ThreadingInstrumentor().instrument()

args = ert_parser(None, sys.argv[1:])

Expand All @@ -676,19 +682,26 @@ def main() -> None:
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.INFO)
root_logger.addHandler(handler)

try:
with ErtPluginContext(logger=logging.getLogger()) as context:
with ErtPluginContext(
logger=logging.getLogger(), trace_provider=tracer_provider
) as context:
logger.info(f"Running ert with {args}")
args.func(args, context.plugin_manager)
except ErtCliError as err:
span.set_status(Status(StatusCode.ERROR))
span.record_exception(err)
logger.debug(str(err))
sys.exit(str(err))
except ConfigValidationError as err:
span.set_status(Status(StatusCode.ERROR))
span.record_exception(err)
err_msg = err.cli_message()
logger.debug(err_msg)
sys.exit(err_msg)
except BaseException as err:
span.set_status(Status(StatusCode.ERROR))
span.record_exception(err)
logger.exception(f'ERT crashed unexpectedly with "{err}"')

logfiles = set() # Use set to avoid duplicates...
Expand All @@ -703,6 +716,7 @@ def main() -> None:
finally:
log_process_usage()
os.environ.pop("ERT_LOG_DIR")
ThreadingInstrumentor().uninstrument()


if __name__ == "__main__":
Expand Down
6 changes: 5 additions & 1 deletion src/ert/plugins/hook_specifications/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@
job_documentation,
legacy_ertscript_workflow,
)
from .logging import add_log_handle_to_root
from .logging import (
add_log_handle_to_root,
add_span_processor,
)
from .site_config import site_config_lines

__all__ = [
"add_log_handle_to_root",
"add_span_processor",
"ecl100_config_path",
"ecl300_config_path",
"flow_config_path",
Expand Down
12 changes: 12 additions & 0 deletions src/ert/plugins/hook_specifications/logging.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import logging

from opentelemetry.sdk.trace.export import BatchSpanProcessor

from ert.plugins.plugin_manager import hook_specification


Expand All @@ -11,3 +13,13 @@ def add_log_handle_to_root() -> logging.Handler: # type: ignore
:return: A log handle that will be added to the root logger
"""


@hook_specification
def add_span_processor() -> BatchSpanProcessor: # type: ignore
"""
Create a BatchSpanProcessor which will be added to the trace provider
in ert.
:return: A BatchSpanProcessor that will be added to the trace provider in ert
"""
14 changes: 14 additions & 0 deletions src/ert/plugins/plugin_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
)

import pluggy
from opentelemetry.sdk.trace import TracerProvider

from .workflow_config import WorkflowConfigs

Expand Down Expand Up @@ -322,17 +323,26 @@ def add_logging_handle_to_root(self, logger: logging.Logger) -> None:
for handle in handles:
logger.addHandler(handle)

def add_span_processor_to_trace_provider(
self, trace_provider: TracerProvider
) -> None:
span_processors = self.hook.add_span_processor()
for span_processor in span_processors:
trace_provider.add_span_processor(span_processor)


class ErtPluginContext:
def __init__(
self,
plugins: Optional[List[object]] = None,
logger: Optional[logging.Logger] = None,
trace_provider: Optional[TracerProvider] = None,
) -> None:
self.plugin_manager = ErtPluginManager(plugins=plugins)
self.tmp_dir: Optional[str] = None
self.tmp_site_config_filename: Optional[str] = None
self._logger = logger
self._trace_provider = trace_provider

def _create_site_config(self, tmp_dir: str) -> Optional[str]:
site_config_content = self.plugin_manager.get_site_config_content()
Expand All @@ -348,6 +358,10 @@ def _create_site_config(self, tmp_dir: str) -> Optional[str]:
def __enter__(self) -> ErtPluginContext:
if self._logger is not None:
self.plugin_manager.add_logging_handle_to_root(logger=self._logger)
if self._trace_provider is not None:
self.plugin_manager.add_span_processor_to_trace_provider(
trace_provider=self._trace_provider
)
logger.debug(str(self.plugin_manager))
logger.debug("Creating temporary directory for site-config")
self.tmp_dir = tempfile.mkdtemp()
Expand Down
4 changes: 4 additions & 0 deletions src/ert/run_models/base_run_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
from ert.mode_definitions import MODULE_MODE
from ert.runpaths import Runpaths
from ert.storage import Ensemble, Storage
from ert.trace import tracer
from ert.workflow_runner import WorkflowRunner

from ..config.analysis_config import UpdateSettings
Expand Down Expand Up @@ -318,6 +319,7 @@ def _clean_env_context(self) -> None:
self._context_env.pop(key)
os.environ.pop(key, None)

@tracer.start_as_current_span(f"{__name__}.start_simulations_thread")
def start_simulations_thread(
self,
evaluator_server_config: EvaluatorServerConfig,
Expand Down Expand Up @@ -573,6 +575,7 @@ async def run_ensemble_evaluator_async(
return evaluator_task.result()

# This function needs to be there for the sake of testing that expects sync ee run
@tracer.start_as_current_span(f"{__name__}.run_ensemble_evaluator")
def run_ensemble_evaluator(
self,
run_args: List[RunArg],
Expand Down Expand Up @@ -654,6 +657,7 @@ def validate(self) -> None:
f"({min_realization_count})"
)

@tracer.start_as_current_span(f"{__name__}.run_workflows")
def run_workflows(
self, runtime: HookRuntime, storage: Storage, ensemble: Ensemble
) -> None:
Expand Down
2 changes: 2 additions & 0 deletions src/ert/run_models/ensemble_experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from ert.enkf_main import sample_prior
from ert.ensemble_evaluator import EvaluatorServerConfig
from ert.storage import Ensemble, Experiment, Storage
from ert.trace import tracer

from ..run_arg import create_run_arguments
from .base_run_model import BaseRunModel, StatusEvents
Expand Down Expand Up @@ -55,6 +56,7 @@ def __init__(
minimum_required_realizations=minimum_required_realizations,
)

@tracer.start_as_current_span(f"{__name__}.run_experiment")
def run_experiment(
self,
evaluator_server_config: EvaluatorServerConfig,
Expand Down
2 changes: 2 additions & 0 deletions src/ert/run_models/ensemble_smoother.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from ert.enkf_main import sample_prior
from ert.ensemble_evaluator import EvaluatorServerConfig
from ert.storage import Storage
from ert.trace import tracer

from ..config.analysis_config import UpdateSettings
from ..config.analysis_module import ESSettings
Expand Down Expand Up @@ -56,6 +57,7 @@ def __init__(

self.support_restart = False

@tracer.start_as_current_span(f"{__name__}.run_experiment")
def run_experiment(
self, evaluator_server_config: EvaluatorServerConfig, restart: bool = False
) -> None:
Expand Down
2 changes: 2 additions & 0 deletions src/ert/run_models/evaluate_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from ert.ensemble_evaluator import EvaluatorServerConfig
from ert.storage import Storage
from ert.trace import tracer

from ..run_arg import create_run_arguments
from . import BaseRunModel
Expand Down Expand Up @@ -58,6 +59,7 @@ def __init__(
random_seed=random_seed,
)

@tracer.start_as_current_span(f"{__name__}.run_experiment")
def run_experiment(
self, evaluator_server_config: EvaluatorServerConfig, restart: bool = False
) -> None:
Expand Down
2 changes: 2 additions & 0 deletions src/ert/run_models/iterated_ensemble_smoother.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from ert.enkf_main import sample_prior
from ert.ensemble_evaluator import EvaluatorServerConfig
from ert.storage import Ensemble, Storage
from ert.trace import tracer

from ..config.analysis_config import UpdateSettings
from ..config.analysis_module import IESSettings
Expand Down Expand Up @@ -117,6 +118,7 @@ def analyzeStep(
) from e
self.run_workflows(HookRuntime.POST_UPDATE, self._storage, posterior_storage)

@tracer.start_as_current_span(f"{__name__}.run_experiment")
def run_experiment(
self, evaluator_server_config: EvaluatorServerConfig, restart: bool = False
) -> None:
Expand Down
2 changes: 2 additions & 0 deletions src/ert/run_models/multiple_data_assimilation.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from ert.enkf_main import sample_prior
from ert.ensemble_evaluator import EvaluatorServerConfig
from ert.storage import Ensemble, Storage
from ert.trace import tracer

from ..config.analysis_config import UpdateSettings
from ..config.analysis_module import ESSettings
Expand Down Expand Up @@ -79,6 +80,7 @@ def __init__(
minimum_required_realizations=minimum_required_realizations,
)

@tracer.start_as_current_span(f"{__name__}.run_experiment")
def run_experiment(
self, evaluator_server_config: EvaluatorServerConfig, restart: bool = False
) -> None:
Expand Down
50 changes: 26 additions & 24 deletions src/ert/scheduler/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from ert.constant_filenames import ERROR_file
from ert.load_status import LoadStatus
from ert.storage.realization_storage_state import RealizationStorageState
from ert.trace import tracer

from .driver import Driver, FailedSubmit

Expand Down Expand Up @@ -147,30 +148,31 @@ async def run(
checksum_lock: asyncio.Lock,
max_submit: int = 1,
) -> None:
self._requested_max_submit = max_submit
for attempt in range(max_submit):
await self._submit_and_run_once(sem)

if self.returncode.cancelled() or self._scheduler._cancelled:
break

if self.returncode.result() == 0:
if self._scheduler._manifest_queue is not None:
await self._verify_checksum(checksum_lock)
async with forward_model_ok_lock:
await self._handle_finished_forward_model()
break

if attempt < max_submit - 1:
message = (
f"Realization {self.iens} failed, "
f"resubmitting for attempt {attempt+2} of {max_submit}"
)
logger.warning(message)
self.returncode = asyncio.Future()
self.started.clear()
else:
await self._send(JobState.FAILED)
with tracer.start_as_current_span(f"{__name__}.run.realization_{self.iens}"):
self._requested_max_submit = max_submit
for attempt in range(max_submit):
await self._submit_and_run_once(sem)

if self.returncode.cancelled() or self._scheduler._cancelled:
break

if self.returncode.result() == 0:
if self._scheduler._manifest_queue is not None:
await self._verify_checksum(checksum_lock)
async with forward_model_ok_lock:
await self._handle_finished_forward_model()
break

if attempt < max_submit - 1:
message = (
f"Realization {self.iens} failed, "
f"resubmitting for attempt {attempt+2} of {max_submit}"
)
logger.warning(message)
self.returncode = asyncio.Future()
self.started.clear()
else:
await self._send(JobState.FAILED)

async def _max_runtime_task(self) -> None:
assert self.real.max_runtime is not None
Expand Down
11 changes: 11 additions & 0 deletions src/ert/trace.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from opentelemetry import trace
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import SpanLimits, TracerProvider

resource = Resource(attributes={SERVICE_NAME: "ert"})
tracer_provider = TracerProvider(
resource=resource, span_limits=SpanLimits(max_events=128 * 16)
)
trace.set_tracer_provider(tracer_provider)

tracer = trace.get_tracer("ert.main")
11 changes: 11 additions & 0 deletions tests/ert/unit_tests/plugins/dummy_plugins.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import logging
from io import StringIO

from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

from ert import ForwardModelStepPlugin, plugin

Expand Down Expand Up @@ -76,6 +79,14 @@ def add_log_handle_to_root():
return fh


span_output = StringIO()


@plugin(name="dummy")
def add_span_processor():
return BatchSpanProcessor(ConsoleSpanExporter(out=span_output))


class DummyFMStep(ForwardModelStepPlugin):
def __init__(self):
super().__init__(name="DummyForwardModel", command=["touch", "dummy.out"])
Expand Down
Loading

0 comments on commit c34060c

Please sign in to comment.