diff --git a/data-processing-lib/python/pyproject.toml b/data-processing-lib/python/pyproject.toml
index b29c889e3..595a1805a 100644
--- a/data-processing-lib/python/pyproject.toml
+++ b/data-processing-lib/python/pyproject.toml
@@ -16,6 +16,7 @@ dependencies = [
     "boto3==1.34.69",
     "argparse",
     "mmh3",
+    "psutil",
 ]
 
 [project_urls]
diff --git a/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_orchestrator.py b/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_orchestrator.py
index 710adb62c..8692da29e 100644
--- a/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_orchestrator.py
+++ b/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_orchestrator.py
@@ -9,9 +9,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-
+import os
 import time
 import traceback
+import psutil
 from datetime import datetime
 from multiprocessing import Pool
 from typing import Any
@@ -24,12 +25,30 @@
     PythonTransformRuntimeConfiguration,
 )
 from data_processing.transform import AbstractBinaryTransform, TransformStatistics
-from data_processing.utils import get_logger
+from data_processing.utils import GB, get_logger
 
 
 logger = get_logger(__name__)
 
 
+def _execution_resources() -> dict[str, Any]:
+    """
+    Get the resources used by this execution.
+    :return: dictionary of CPU utilization, memory used, and placeholder GPU/object store entries
+    """
+    # 1-, 5- and 15-minute load averages; only the 15-minute value is used
+    load1, load5, load15 = psutil.getloadavg()
+    # Physical memory currently in use, in GB
+    mused = round(psutil.virtual_memory().used / GB, 2)
+    return {
+        # 15-minute load average as a percentage of the available cores
+        "cpus": round((load15 / os.cpu_count()) * 100, 1),
+        "gpus": 0,
+        "memory": mused,
+        "object_store": 0,
+    }
+
+
 def orchestrate(
     data_access_factory: DataAccessFactoryBase,
     runtime_config: PythonTransformRuntimeConfiguration,
@@ -43,6 +62,7 @@ def orchestrate(
     :return: 0 - success or 1 - failure
     """
     start_ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+    start_time = time.time()
    logger.info(f"orchestrator {runtime_config.get_name()} started at {start_ts}")
     # create statistics
     statistics = TransformStatistics()
@@ -118,6 +138,7 @@
         "job_input_params": input_params
         | data_access_factory.get_input_params()
         | execution_config.get_input_params(),
+        "execution_stats": _execution_resources() | {"execution time, min": round((time.time() - start_time) / 60.0, 3)},
         "job_output_stats": stats,
     }
     logger.debug(f"Saving job metadata: {metadata}.")
diff --git a/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_orchestrator.py b/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_orchestrator.py
index 69699f16a..57a6c58fc 100644
--- a/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_orchestrator.py
+++ b/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_orchestrator.py
@@ -16,7 +16,7 @@
 
 from data_processing.data_access import DataAccessFactoryBase
 from data_processing.transform import TransformStatistics
-from data_processing.utils import get_logger
+from data_processing.utils import GB, get_logger
 from data_processing_spark.runtime.spark import (
     SparkTransformFileProcessor,
     SparkTransformRuntimeConfiguration,
@@ -123,6 +123,13 @@ def process_partition(iterator):
     try:
         # build and save metadata
         logger.debug("Building job metadata")
+        cpus = sc.defaultParallelism
+        # sum the max memory of all executors, reported per executor in bytes
+        executors = sc._jsc.sc().getExecutorMemoryStatus()
+        memory = 0.0
+        for i in range(executors.size()):
+            memory += executors.toList().apply(i)._2()._1()
+        resources = {"cpus": cpus, "gpus": 0, "memory": round(memory / GB, 2), "object_store": 0}
         input_params = runtime_config.get_transform_metadata() | execution_configuration.get_input_params()
         metadata = {
             "pipeline": execution_configuration.pipeline_id,
@@ -136,8 +143,8 @@ def process_partition(iterator):
             "job_input_params": input_params | data_access_factory.get_input_params(),
             "execution_stats": {
                 "num partitions": num_partitions,
-                "execution time, min": (time.time() - start_time) / 60,
-            },
+                "execution time, min": round((time.time() - start_time) / 60, 3),
+            } | resources,
             "job_output_stats": stats,
         }
         logger.debug(f"Saving job metadata: {metadata}.")
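For reference, here is a minimal, self-contained sketch of the resource sampling that the pure-Python orchestrator gains above. It is an illustration only: the `GB` constant and the public `execution_resources` name are assumptions standing in for `data_processing.utils.GB` and the private `_execution_resources` helper in the diff.

```python
# Standalone sketch of the pure-Python resource sampling added above.
# GB is an assumption standing in for data_processing.utils.GB.
import os

import psutil

GB = 1024 * 1024 * 1024


def execution_resources() -> dict:
    # getloadavg() returns the 1-, 5- and 15-minute load averages
    _, _, load15 = psutil.getloadavg()
    return {
        # 15-minute load expressed as a percentage of the available cores
        "cpus": round((load15 / os.cpu_count()) * 100, 1),
        "gpus": 0,
        # physical memory currently in use, in GB
        "memory": round(psutil.virtual_memory().used / GB, 2),
        "object_store": 0,
    }


if __name__ == "__main__":
    print(execution_resources())
```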
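And a similar sketch of the Spark-side probe, assuming `pyspark` is installed and a local `SparkSession` can be created. The py4j calls mirror the ones in the diff: `getExecutorMemoryStatus()` returns a Scala map from executor address to a `(max memory, remaining memory)` tuple in bytes.

```python
# Sketch of the Spark resource probe used in the orchestrator change above;
# assumes a locally creatable SparkSession.
from pyspark.sql import SparkSession

GB = 1024 * 1024 * 1024  # assumption standing in for data_processing.utils.GB

spark = SparkSession.builder.master("local[*]").appName("resource-probe").getOrCreate()
sc = spark.sparkContext

# Scala Map[String, (Long, Long)]: executor address -> (max, remaining) bytes
executors = sc._jsc.sc().getExecutorMemoryStatus()
memory = 0.0
for i in range(executors.size()):
    # _2() is the (max, remaining) tuple; _1() is the executor's max memory
    memory += executors.toList().apply(i)._2()._1()

print({
    "cpus": sc.defaultParallelism,
    "gpus": 0,
    "memory": round(memory / GB, 2),
    "object_store": 0,
})
spark.stop()
```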