Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove arguments related to cost-savings #1230

Merged
merged 5 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,12 @@ def _init_nodes(self):
# construct worker nodes info when cluster is inactive
executors_cnt = len(worker_nodes_from_conf) if worker_nodes_from_conf else 0
if num_workers != executors_cnt:
self.logger.warning('Cluster configuration: `executors` count %d does not match the '
'`num_workers` value %d. Using generated names.', executors_cnt,
num_workers)
if not self.is_inferred:
# this warning should be raised only when the cluster is not inferred, i.e. user has provided the
# cluster configuration with num_workers explicitly set
self.logger.warning('Cluster configuration: `executors` count %d does not match the '
'`num_workers` value %d. Using the `num_workers` value.', executors_cnt,
num_workers)
worker_nodes_from_conf = self.generate_node_configurations(num_workers)
if num_workers == 0 and self.props.get_value('node_type_id') is None:
# if there are no worker nodes and no node_type_id, then we cannot proceed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,9 +307,12 @@ def _init_nodes(self):
# construct worker nodes info when cluster is inactive
executors_cnt = len(worker_nodes_from_conf) if worker_nodes_from_conf else 0
if num_workers != executors_cnt:
self.logger.warning('Cluster configuration: `executors` count %d does not match the '
'`num_workers` value %d. Using generated names.', executors_cnt,
num_workers)
if not self.is_inferred:
# this warning should be raised only when the cluster is not inferred, i.e. user has provided the
# cluster configuration with num_workers explicitly set
self.logger.warning('Cluster configuration: `executors` count %d does not match the '
'`num_workers` value %d. Using generated names.', executors_cnt,
num_workers)
worker_nodes_from_conf = self.generate_node_configurations(num_workers)
if num_workers == 0 and self.props.get_value('node_type_id') is None:
# if there are no worker nodes and no node_type_id, then we cannot proceed
Expand Down
9 changes: 6 additions & 3 deletions user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,9 +505,12 @@ def _init_nodes(self):
worker_nodes_from_conf = self.props.get_value_silent('config', 'workerConfig', 'instanceNames')
instance_names_cnt = len(worker_nodes_from_conf) if worker_nodes_from_conf else 0
if worker_cnt != instance_names_cnt:
self.logger.warning('Cluster configuration: `instanceNames` count %d does not '
'match the `numInstances` value %d. Using generated names.',
instance_names_cnt, worker_cnt)
if not self.is_inferred:
# this warning should be raised only when the cluster is not inferred, i.e. user has provided the
# cluster configuration with num_workers explicitly set
self.logger.warning('Cluster configuration: `instanceNames` count %d does not '
'match the `numInstances` value %d. Using generated names.',
instance_names_cnt, worker_cnt)
worker_nodes_from_conf = self.generate_node_configurations(worker_cnt)
# create workers array
for worker_node in worker_nodes_from_conf:
Expand Down
9 changes: 9 additions & 0 deletions user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -1259,6 +1259,15 @@ def generate_node_configurations(self, num_executors: int, render_args: dict = N
node_config = self._generate_node_configuration(render_args)
return [node_config for _ in range(num_executors)]

def get_cluster_shape_str(self) -> str:
    """
    Build a human-readable summary of the cluster shape,
    e.g. '<Driver: m5.xlarge, Executor: 2 X m5.2xlarge>'.

    :return: formatted string with the driver instance type, executor
             count, and executor instance type.
    """
    driver_type = self.get_master_node().instance_type
    # assumes all executors share one instance type — TODO confirm for heterogeneous clusters
    exec_type = self.get_worker_node(0).instance_type
    exec_count = self.get_nodes_cnt(SparkNodeType.WORKER)
    return f'<Driver: {driver_type}, Executor: {exec_count} X {exec_type}>'


@dataclass
class ClusterReshape(ClusterGetAccessor):
Expand Down
58 changes: 40 additions & 18 deletions user_tools/src/spark_rapids_pytools/common/cluster_inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"""This module provides functionality for cluster inference"""

from dataclasses import dataclass, field

from enum import Enum
from typing import Optional
from logging import Logger

Expand All @@ -26,17 +26,29 @@
from spark_rapids_pytools.common.utilities import ToolLogging


class ClusterType(Enum):
    """Closed set of cluster kinds handled by cluster inference."""
    CPU = 'CPU'
    GPU = 'GPU'

    def __str__(self) -> str:
        # Render as the bare value (e.g. 'CPU') so log messages read naturally.
        return self.value


@dataclass
class ClusterInference:
"""
Class for inferring cluster information and constructing CPU clusters.
Class for inferring cluster information and constructing CPU or GPU clusters.

:param platform: The platform on which the cluster inference is performed.
"""
platform: PlatformBase = field(default=None, init=True)
cluster_type: ClusterType = field(default=ClusterType.CPU, init=True)
logger: Logger = field(default=ToolLogging.get_and_setup_logger('rapids.tools.cluster_inference'), init=False)

def get_cluster_template_args(self, cluster_info_df: pd.Series) -> Optional[dict]:
def _get_cluster_template_args(self, cluster_info_df: pd.Series) -> Optional[dict]:
"""
Extract information about drivers and executors from input json
"""
Expand All @@ -53,10 +65,15 @@ def get_cluster_template_args(self, cluster_info_df: pd.Series) -> Optional[dict
cores_per_executor = cluster_info_df.get('Cores Per Executor')
execs_per_node = cluster_info_df.get('Num Executors Per Node')
total_cores_per_node = execs_per_node * cores_per_executor
if pd.isna(total_cores_per_node):
self.logger.info('For App ID: %s, Unable to infer %s cluster. Reason - Total cores per node cannot'
' be determined.', cluster_info_df['App ID'], self.cluster_type)
return None
# TODO - need to account for number of GPUs per executor
executor_instance = self.platform.get_matching_executor_instance(total_cores_per_node)
if pd.isna(executor_instance):
self.logger.info('Unable to infer CPU cluster. No matching executor instance found for vCPUs = %s',
self.logger.info('For App ID: %s, Unable to infer %s cluster. Reason - No matching executor instance '
'found for num cores = %d', cluster_info_df['App ID'], self.cluster_type,
total_cores_per_node)
return None
return {
Expand All @@ -66,21 +83,26 @@ def get_cluster_template_args(self, cluster_info_df: pd.Series) -> Optional[dict
'NUM_EXECUTOR_NODES': int(num_executor_nodes)
}

def infer_cpu_cluster(self, cluster_info_df: pd.DataFrame) -> Optional[ClusterBase]:
def infer_cluster(self, cluster_info_df: pd.DataFrame) -> Optional[ClusterBase]:
"""
Infer CPU cluster configuration based on json input and return the constructed cluster object.
Infer CPU or GPU cluster configuration based input cluster df and return the constructed cluster object.
"""
if len(cluster_info_df) != 1:
self.logger.info('Cannot infer CPU cluster from event logs. Only single cluster is supported.')
return None
try:
if len(cluster_info_df) != 1:
self.logger.info('Cannot infer %s cluster from event logs. Only single cluster is supported.',
self.cluster_type)
return None

# Extract cluster information from parsed logs. Above check ensures df contains single row.
cluster_template_args = self.get_cluster_template_args(cluster_info_df.iloc[0])
if cluster_template_args is None:
return None
# Construct cluster configuration using platform-specific logic
cluster_conf = self.platform.generate_cluster_configuration(cluster_template_args)
if cluster_conf is None:
# Extract cluster information from parsed logs. Above check ensures df contains single row.
cluster_template_args = self._get_cluster_template_args(cluster_info_df.iloc[0])
if cluster_template_args is None:
return None
# Construct cluster configuration using platform-specific logic
cluster_conf = self.platform.generate_cluster_configuration(cluster_template_args)
if cluster_conf is None:
return None
cluster_props_new = JSONPropertiesContainer(cluster_conf, file_load=False)
return self.platform.load_cluster_by_prop(cluster_props_new, is_inferred=True)
except Exception as e: # pylint: disable=broad-except
self.logger.error('Error while inferring cluster: %s', str(e))
return None
cluster_props_new = JSONPropertiesContainer(cluster_conf, file_load=False)
return self.platform.load_cluster_by_prop(cluster_props_new, is_inferred=True)
Loading