diff --git a/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_orchestrator.py b/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_orchestrator.py
index 710adb62c9..8692da29e3 100644
--- a/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_orchestrator.py
+++ b/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_orchestrator.py
@@ -9,9 +9,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-
+import os
 import time
 import traceback
+import psutil
 from datetime import datetime
 from multiprocessing import Pool
 from typing import Any
@@ -24,12 +25,30 @@
     PythonTransformRuntimeConfiguration,
 )
 from data_processing.transform import AbstractBinaryTransform, TransformStatistics
-from data_processing.utils import get_logger
+from data_processing.utils import GB, get_logger
 
 
 logger = get_logger(__name__)
 
 
+def _execution_resources() -> dict[str, Any]:
+    """
+    Get a snapshot of the execution resources used by the orchestrator.
+    :return: dict of cpu/memory usage
+    """
+    # get the system load averages; only the 15-minute value is used
+    load1, load5, load15 = psutil.getloadavg()
+    # get used memory in GB
+    mused = round(psutil.virtual_memory()[3] / GB, 2)
+    return {
+        "cpus": round((load15 / os.cpu_count()) * 100, 1),
+        "gpus": 0,
+        "memory": mused,
+        "object_store": 0,
+    }
+
+
+
 def orchestrate(
     data_access_factory: DataAccessFactoryBase,
     runtime_config: PythonTransformRuntimeConfiguration,
@@ -43,6 +62,7 @@ def orchestrate(
     :return: 0 - success or 1 - failure
     """
     start_ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+    start_time = time.time()
    logger.info(f"orchestrator {runtime_config.get_name()} started at {start_ts}")
     # create statistics
     statistics = TransformStatistics()
@@ -118,6 +138,7 @@ def orchestrate(
         "job_input_params": input_params
         | data_access_factory.get_input_params()
         | execution_config.get_input_params(),
+        "execution_stats": _execution_resources() | {"execution time, min": round((time.time() - start_time) / 60.0, 3)},
         "job_output_stats": stats,
     }
     logger.debug(f"Saving job metadata: {metadata}.")
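
For reference, below is a minimal standalone sketch of what the new _execution_resources() helper computes: the 15-minute system load average expressed as a percentage of available cores, and used memory in GB. The constant GB and the name resource_snapshot are illustrative here; the patch itself relies on data_processing.utils.GB, which is assumed to be 1024**3 bytes. psutil >= 5.6.2 is assumed for getloadavg().

# Sketch of the resource snapshot logic added by this patch (names and GB value are assumptions).
import os
import psutil

GB = 1024 * 1024 * 1024  # assumed to mirror data_processing.utils.GB

def resource_snapshot() -> dict:
    # 15-minute system load average, reported as a percentage of available cores
    _, _, load15 = psutil.getloadavg()
    # memory currently in use, in GB (same field as psutil.virtual_memory()[3])
    used_gb = round(psutil.virtual_memory().used / GB, 2)
    return {
        "cpus": round((load15 / os.cpu_count()) * 100, 1),
        "gpus": 0,
        "memory": used_gb,
        "object_store": 0,
    }

if __name__ == "__main__":
    print(resource_snapshot())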
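
The "execution_stats" metadata entry is assembled with the PEP 584 dict union operator (|, Python 3.9+), merging the resource snapshot with the elapsed wall-clock time in minutes. The fragment below sketches the resulting shape; the resource values shown are made up for illustration.

# Sketch of how the "execution_stats" entry is merged (values are illustrative).
import time

start_time = time.time()
# ... transform execution would happen here ...
execution_stats = {"cpus": 12.5, "gpus": 0, "memory": 3.42, "object_store": 0} | {
    "execution time, min": round((time.time() - start_time) / 60.0, 3)
}
# e.g. {'cpus': 12.5, 'gpus': 0, 'memory': 3.42, 'object_store': 0, 'execution time, min': 0.0}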