Skip to content

Commit

Permalink
adding execution_stats to python metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
blublinsky committed Oct 1, 2024
1 parent 83cec8f commit 2069f57
Showing 1 changed file with 24 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import os
import time
import traceback
import psutil
from datetime import datetime
from multiprocessing import Pool
from typing import Any
Expand All @@ -24,12 +25,31 @@
PythonTransformRuntimeConfiguration,
)
from data_processing.transform import AbstractBinaryTransform, TransformStatistics
from data_processing.utils import get_logger
from data_processing.utils import GB, get_logger


logger = get_logger(__name__)


@staticmethod
def _execution_resources() -> dict[str, Any]:
"""
Get Execution resource
:return: tuple of cpu/memory usage
"""
# Getting loadover15 minutes
load1, load5, load15 = psutil.getloadavg()
# Getting memory used
mused = round(psutil.virtual_memory()[3] / GB, 2)
return {
"cpus": round((load15/os.cpu_count()) * 100, 1),
"gpus": 0,
"memory": mused,
"object_store": 0,
}



def orchestrate(
data_access_factory: DataAccessFactoryBase,
runtime_config: PythonTransformRuntimeConfiguration,
Expand All @@ -43,6 +63,7 @@ def orchestrate(
:return: 0 - success or 1 - failure
"""
start_ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
start_time = time.time()
logger.info(f"orchestrator {runtime_config.get_name()} started at {start_ts}")
# create statistics
statistics = TransformStatistics()
Expand Down Expand Up @@ -118,6 +139,7 @@ def orchestrate(
"job_input_params": input_params
| data_access_factory.get_input_params()
| execution_config.get_input_params(),
"execution_stats": _execution_resources() | {"execution time, min": round((time.time() - start_time) / 60.0, 3)},
"job_output_stats": stats,
}
logger.debug(f"Saving job metadata: {metadata}.")
Expand Down

0 comments on commit 2069f57

Please sign in to comment.