changes to support db 13.3+ #716

Merged
merged 3 commits into from Aug 25, 2024

Changes from all commits
2 changes: 1 addition & 1 deletion notebooks/databricks/README.md
@@ -51,7 +51,7 @@ If you already have a Databricks account, you can run the example notebooks on a
spark.task.resource.gpu.amount 1
spark.databricks.delta.preview.enabled true
spark.python.worker.reuse true
spark.executorEnv.PYTHONPATH /databricks/jars/rapids-4-spark_2.12-24.04.1.jar:/databricks/spark/python
spark.executorEnv.PYTHONPATH /databricks/jars/rapids-4-spark_2.12-24.06.1.jar:/databricks/spark/python
spark.sql.execution.arrow.maxRecordsPerBatch 100000
spark.rapids.memory.gpu.minAllocFraction 0.0001
spark.plugins com.nvidia.spark.SQLPlugin
2 changes: 1 addition & 1 deletion notebooks/databricks/init-pip-cuda-11.8.sh
@@ -5,7 +5,7 @@ SPARK_RAPIDS_ML_ZIP=/dbfs/path/to/zip/file
# also in general, RAPIDS_VERSION (python) fields should omit any leading 0 in month/minor field (i.e. 23.8.0 and not 23.08.0)
# while SPARK_RAPIDS_VERSION (jar) should have leading 0 in month/minor (e.g. 23.08.2 and not 23.8.2)
RAPIDS_VERSION=24.8.0
SPARK_RAPIDS_VERSION=24.04.1
SPARK_RAPIDS_VERSION=24.06.1

curl -L https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/${SPARK_RAPIDS_VERSION}/rapids-4-spark_2.12-${SPARK_RAPIDS_VERSION}-cuda11.jar -o /databricks/jars/rapids-4-spark_2.12-${SPARK_RAPIDS_VERSION}.jar
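As a side note, a tiny illustrative Python sketch of the month/minor formatting convention described in the comment above; it is not part of the init script, and the two variables track different artifacts (the RAPIDS python packages vs. the spark-rapids jar), so only the format difference is shown:

```python
# Illustrative only: jar-style versions keep a leading zero in the month/minor
# field (e.g. "24.06.1"); python-package-style versions drop it (e.g. "24.6.1").
def drop_leading_zero(jar_style_version: str) -> str:
    major, minor, patch = jar_style_version.split(".")
    return f"{major}.{int(minor)}.{patch}"

assert drop_leading_zero("24.06.1") == "24.6.1"
assert drop_leading_zero("23.08.2") == "23.8.2"
```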

12 changes: 2 additions & 10 deletions notebooks/logistic-regression.ipynb
@@ -124,7 +124,7 @@
"metadata": {},
"source": [
"### Classifier builder\n",
"We will use this function to build both the Spark RAPIDS ML (GPU) and Spark ML (CPU) logistic regression classifier objects, demonstrating the common API, and verify they yield similar performance on our synthetic dataset. NOTE: GPU LogisisticRegression does not yet support `standardization=True`"
"We will use this function to build both the Spark RAPIDS ML (GPU) and Spark ML (CPU) logistic regression classifier objects, demonstrating the common API, and verify they yield similar performance on our synthetic dataset."
]
},
{
@@ -134,7 +134,7 @@
"outputs": [],
"source": [
"def build_lr_classifier(estimator_class):\n",
" return ( estimator_class(standardization=False)\n",
" return ( estimator_class()\n",
" .setFeaturesCol(\"features\")\n",
" .setLabelCol(\"label\")\n",
" .setRegParam(0.001)\n",
@@ -655,13 +655,6 @@
"## Sparse Vectors"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Standardization needs to be false for now. Will be fixed in 24.02."
]
},
{
"cell_type": "code",
"execution_count": null,
@@ -715,7 +708,6 @@
" regParam=0.01,\n",
" maxIter=100,\n",
" fitIntercept=True,\n",
" standardization=False,\n",
" featuresCol=\"features\",\n",
" labelCol=\"label\",\n",
" )\n",
4 changes: 2 additions & 2 deletions notebooks/umap.ipynb
@@ -652,8 +652,8 @@
"import os\n",
"import requests\n",
"\n",
"SPARK_RAPIDS_VERSION = \"23.12.1\"\n",
"cuda_version = \"12\"\n",
"SPARK_RAPIDS_VERSION = \"24.06.1\"\n",
"cuda_version = \"11\"\n",
"rapids_jar = f\"rapids-4-spark_2.12-{SPARK_RAPIDS_VERSION}.jar\"\n",
"\n",
"if not os.path.exists(rapids_jar):\n",
15 changes: 14 additions & 1 deletion python/benchmark/benchmark/bench_kmeans.py
@@ -13,11 +13,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import time
from typing import Any, Dict, Iterator, List, Optional, Union

import numpy as np
import pandas as pd
import pyspark.sql.functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.functions import array_to_vector, vector_to_array
from pyspark.sql import DataFrame, SparkSession
@@ -185,11 +187,22 @@ def gpu_cache_df(df: DataFrame) -> DataFrame:
)
elif is_vector_col:
df_for_scoring = transformed_df.select(
vector_to_array(col(feature_col)), output_col
vector_to_array(col(feature_col)).alias(feature_col), output_col
)

cluster_centers = gpu_model.cluster_centers_

# temporary patch for DB with spark-rapids plugin
# this part is not timed so overhead is not critical, but should be reverted
# once https://github.com/NVIDIA/spark-rapids/issues/10770 is fixed
db_version = os.environ.get("DATABRICKS_RUNTIME_VERSION")
if db_version:
dim = len(cluster_centers[0])
# inject unsupported expr (slice) that is essentially a noop
df_for_scoring = df_for_scoring.select(
F.slice(feature_col, 1, dim).alias(feature_col), output_col
)

Collaborator: Interesting. Any intuition as to why slice can get the hanging resolved?

Collaborator (Author): Yes. It is an unsupported expression in spark-rapids, so it falls back to the CPU and injects ColumnarToRow and RowToColumnar transformations, which do some batching that doesn't (but should, after patching) happen otherwise.
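To make the mechanism concrete, here is a minimal standalone sketch of the same noop-slice fallback trick; it assumes a DataFrame with an array-typed features column on a cluster running the spark-rapids plugin, and the function name `inject_cpu_fallback` is illustrative, not part of this PR:

```python
import pyspark.sql.functions as F
from pyspark.sql import DataFrame


def inject_cpu_fallback(df: DataFrame, feature_col: str, dim: int) -> DataFrame:
    # slice(col, 1, dim) keeps elements 1..dim, i.e. the whole array, so the
    # projection is semantically a noop. spark-rapids does not support the
    # slice expression, so this projection falls back to the CPU, inserting
    # ColumnarToRow/RowToColumnar transitions that batch the data.
    other_cols = [c for c in df.columns if c != feature_col]
    return df.select(F.slice(feature_col, 1, dim).alias(feature_col), *other_cols)


# Illustrative usage: with spark.rapids.sql.explain=ALL set, the CPU fallback for
# the slice expression is reported when the patched DataFrame is evaluated.
# df_for_scoring = inject_cpu_fallback(df_for_scoring, "features", dim)
```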

if num_cpus > 0:
from pyspark.ml.clustering import KMeans as SparkKMeans

6 changes: 4 additions & 2 deletions python/benchmark/databricks/README.md
@@ -41,10 +41,12 @@ This directory contains shell scripts for running larger scale benchmarks on Dat

2. The benchmarks can be run as
```bash
./run_benchmark.sh [cpu|gpu] >> benchmark_log
./run_benchmark.sh [cpu|gpu|gpu_etl] [[12.2|13.3|14.3]] >> benchmark_log
```

The script creates a cpu or gpu cluster, respectively using the cluster specs in [cpu_cluster_spec](./cpu_cluster_spec.sh) and [gpu_cluster_spec](./gpu_cluster_spec.sh), depending on the supplied argument. In gpu mode each algorithm benchmark is run 3 times, and similarly in cpu mode, except for kmeans and random forest classifier and regressor which are each run 1 time due to their long running times.
The script creates a cpu, gpu, or gpu_etl cluster, depending on the supplied argument, using the corresponding cluster spec in [cpu_cluster_spec](./cpu_cluster_spec.sh), [gpu_cluster_spec](./gpu_cluster_spec.sh), or [gpu_etl_cluster_spec](./gpu_etl_cluster_spec.sh). In gpu and gpu_etl modes each algorithm benchmark is run 3 times, and similarly in cpu mode, except for kmeans and the random forest classifier and regressor, which are each run once due to their long running times. gpu_etl mode also enables the [spark-rapids](https://github.com/NVIDIA/spark-rapids) gpu-accelerated plugin.

An optional Databricks runtime version can be supplied as a second argument; 13.3 is the default if not specified. Runtimes higher than 13.3 are only compatible with the cpu and gpu modes (i.e. not gpu_etl), as they are not yet supported by the spark-rapids plugin.

3. The file `benchmark_log` will have the fit/train/transform running times and accuracy scores. A simple convenience script has been provided to extract timing information for each run:
```bash
1 change: 1 addition & 0 deletions python/benchmark/databricks/benchmark_utils.sh
@@ -31,6 +31,7 @@ create_cluster() {
INIT_SCRIPT_DIR="${WS_BENCHMARK_HOME}/init_scripts"

# sourcing allows variable substitution (e.g. cluster name) into cluster json specs

cluster_spec=`source ${cluster_type}_cluster_spec.sh`
echo $cluster_spec

2 changes: 1 addition & 1 deletion python/benchmark/databricks/cpu_cluster_spec.sh
@@ -3,7 +3,7 @@ cat <<EOF
{
"num_workers": $(( $num_cpus / 8)),
"cluster_name": "$cluster_name",
"spark_version": "11.3.x-cpu-ml-scala2.12",
"spark_version": "${db_version}.x-cpu-ml-scala2.12",
"spark_conf": {},
"aws_attributes": {
"first_on_demand": 1,
21 changes: 3 additions & 18 deletions python/benchmark/databricks/gpu_cluster_spec.sh
@@ -3,33 +3,22 @@ cat <<EOF
{
"num_workers": $num_gpus,
"cluster_name": "$cluster_name",
"spark_version": "12.2.x-gpu-ml-scala2.12",
"spark_version": "${db_version}.x-gpu-ml-scala2.12",
"spark_conf": {
"spark.task.resource.gpu.amount": "0.25",
"spark.task.cpus": "1",
"spark.databricks.delta.preview.enabled": "true",
"spark.python.worker.reuse": "true",
"spark.executorEnv.PYTHONPATH": "/databricks/jars/rapids-4-spark_2.12-24.04.1.jar:/databricks/spark/python",
"spark.executorEnv.PYTHONPATH": "/databricks/spark/python",
"spark.sql.files.minPartitionNum": "2",
"spark.sql.execution.arrow.maxRecordsPerBatch": "10000",
"spark.executor.cores": "8",
"spark.executor.memory": "5g",
"spark.rapids.memory.gpu.minAllocFraction": "0.0001",
"spark.plugins": "com.nvidia.spark.SQLPlugin",
"spark.locality.wait": "0s",
"spark.sql.cache.serializer": "com.nvidia.spark.ParquetCachedBatchSerializer",
"spark.rapids.memory.gpu.pooling.enabled": "false",
"spark.rapids.sql.explain": "ALL",
"spark.sql.execution.sortBeforeRepartition": "false",
"spark.rapids.sql.python.gpu.enabled": "true",
"spark.rapids.memory.pinnedPool.size": "2G",
"spark.python.daemon.module": "rapids.daemon_databricks",
"spark.rapids.sql.batchSizeBytes": "512m",
"spark.sql.adaptive.enabled": "false",
"spark.sql.execution.arrow.pyspark.enabled": "true",
"spark.sql.files.maxPartitionBytes": "2000000000000",
"spark.databricks.delta.optimizeWrite.enabled": "false",
"spark.rapids.sql.concurrentGpuTasks": "2"
"spark.databricks.delta.optimizeWrite.enabled": "false"
},
"aws_attributes": {
"first_on_demand": 1,
@@ -46,10 +35,6 @@
"destination": "dbfs:${BENCHMARK_HOME}/cluster_logs/${cluster_name}"
}
},
"spark_env_vars": {
"LIBCUDF_CUFILE_POLICY": "OFF",
"NCCL_DEBUG": "INFO"
},
"autotermination_minutes": 30,
"enable_elastic_disk": false,
"init_scripts": [
65 changes: 65 additions & 0 deletions python/benchmark/databricks/gpu_etl_cluster_spec.sh
@@ -0,0 +1,65 @@
# needed for bm script arguments
cat <<EOF
{
"num_workers": $num_gpus,
"cluster_name": "$cluster_name",
"spark_version": "${db_version}.x-gpu-ml-scala2.12",
"spark_conf": {
"spark.task.resource.gpu.amount": "0.25",
"spark.task.cpus": "1",
"spark.databricks.delta.preview.enabled": "true",
"spark.python.worker.reuse": "true",
"spark.executorEnv.PYTHONPATH": "/databricks/jars/rapids-4-spark_2.12-24.06.1.jar:/databricks/spark/python",
"spark.sql.files.minPartitionNum": "2",
"spark.sql.execution.arrow.maxRecordsPerBatch": "10000",
"spark.executor.cores": "8",
"spark.executor.memory": "5g",
"spark.rapids.memory.gpu.minAllocFraction": "0.0001",
"spark.plugins": "com.nvidia.spark.SQLPlugin",
"spark.locality.wait": "0s",
"spark.sql.cache.serializer": "com.nvidia.spark.ParquetCachedBatchSerializer",
"spark.rapids.memory.gpu.pooling.enabled": "false",
"spark.rapids.sql.explain": "ALL",
"spark.sql.execution.sortBeforeRepartition": "false",
"spark.rapids.sql.python.gpu.enabled": "true",
"spark.rapids.memory.pinnedPool.size": "2G",
"spark.python.daemon.module": "rapids.daemon_databricks",
"spark.rapids.sql.batchSizeBytes": "512m",
"spark.sql.adaptive.enabled": "false",
"spark.sql.execution.arrow.pyspark.enabled": "true",
"spark.sql.files.maxPartitionBytes": "2000000000000",
"spark.databricks.delta.optimizeWrite.enabled": "false",
"spark.rapids.sql.concurrentGpuTasks": "2"
Collaborator: While we keep this here, do we need to remove it from gpu_cluster_spec.sh (the one without GPU ETL)?

Collaborator (Author): Yes. Will delete there. Good catch.
},
"aws_attributes": {
"first_on_demand": 1,
"availability": "SPOT_WITH_FALLBACK",
"zone_id": "us-west-2a",
"spot_bid_price_percent": 100,
"ebs_volume_count": 0
},
"node_type_id": "g5.2xlarge",
"driver_node_type_id": "g4dn.xlarge",
"custom_tags": {},
"cluster_log_conf": {
"dbfs": {
"destination": "dbfs:${BENCHMARK_HOME}/cluster_logs/${cluster_name}"
}
},
"spark_env_vars": {
"LIBCUDF_CUFILE_POLICY": "OFF",
"NCCL_DEBUG": "INFO"
},
"autotermination_minutes": 30,
"enable_elastic_disk": false,
"init_scripts": [
{
"workspace": {
"destination": "${INIT_SCRIPT_DIR}/init-pip-cuda-11.8.sh"
}
}
],
"enable_local_disk_encryption": false,
"runtime_engine": "STANDARD"
}
EOF
2 changes: 1 addition & 1 deletion python/benchmark/databricks/init-pip-cuda-11.8.sh
@@ -6,7 +6,7 @@ BENCHMARK_ZIP=/dbfs/path/to/benchmark.zip
# also, in general, RAPIDS_VERSION (python) fields should omit any leading 0 in month/minor field (i.e. 23.8.0 and not 23.08.0)
# while SPARK_RAPIDS_VERSION (jar) should have leading 0 in month/minor (e.g. 23.08.2 and not 23.8.2)
RAPIDS_VERSION=24.8.0
SPARK_RAPIDS_VERSION=24.04.1
SPARK_RAPIDS_VERSION=24.06.1

curl -L https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/${SPARK_RAPIDS_VERSION}/rapids-4-spark_2.12-${SPARK_RAPIDS_VERSION}-cuda11.jar -o /databricks/jars/rapids-4-spark_2.12-${SPARK_RAPIDS_VERSION}.jar

16 changes: 11 additions & 5 deletions python/benchmark/databricks/run_benchmark.sh
@@ -1,22 +1,28 @@
#!/bin/bash

cluster_type=$1
cluster_type=${1:-gpu_etl}
db_version=${2:-13.3}

if [[ $cluster_type == "gpu" ]]; then
if [[ $cluster_type == "gpu" || $cluster_type == "gpu_etl" ]]; then
num_cpus=0
num_gpus=2
elif [[ $cluster_type == "cpu" ]]; then
num_cpus=16
num_gpus=0
else
echo "unknown cluster type $cluster_type"
echo "usage: $0 cpu|gpu"
echo "usage: $0 cpu|gpu|gpu_etl [12.2|13.3|14.3|15.4]"
exit 1
fi

if [[ $db_version > 13.3 && $cluster_type == "gpu_etl" ]]; then
echo "spark rapids etl plugin is not supported on databricks ${db_version}"
echo "please specify db_version 12.2 or 13.3 for cluster type gpu_etl"
exit 1
fi

source benchmark_utils.sh

#BENCHMARK_DATA_HOME=/spark-rapids-ml/benchmarking/datasets
BENCHMARK_DATA_HOME=s3a://spark-rapids-ml-bm-datasets-public

# creates cluster and sets CLUSTER_ID equal to created cluster's id
@@ -31,7 +37,7 @@ kmeans_runs=1
rf_runs=1
rf_cpu_options="--subsamplingRate=0.5"

if [[ $cluster_type == "gpu" ]]
if [[ $cluster_type =~ "gpu" ]]
then
n_streams="--n_streams 4"
kmeans_runs=3
2 changes: 1 addition & 1 deletion python/run_benchmark.sh
@@ -106,7 +106,7 @@ EOF

if [[ $cluster_type == "gpu_etl" ]]
then
SPARK_RAPIDS_VERSION=24.04.1
SPARK_RAPIDS_VERSION=24.06.1
rapids_jar=${rapids_jar:-rapids-4-spark_2.12-$SPARK_RAPIDS_VERSION.jar}
if [ ! -f $rapids_jar ]; then
echo "downloading spark rapids jar"