
Add support for Cloudtik to mount storage as a file system with s3, b… #129

Closed
wants to merge 7 commits
4 changes: 3 additions & 1 deletion python/cloudtik/core/_private/docker.py
@@ -110,10 +110,12 @@ def docker_start_cmds(user, image, mount_dict, data_disks, container_name, user_
env_flags = " ".join(
["-e {name}={val}".format(name=k, val=v) for k, v in env_vars.items()])

fuse_flags = "--cap-add SYS_ADMIN --device /dev/fuse --security-opt apparmor:unconfined"

user_options_str = " ".join(user_options)
docker_run = [
docker_cmd, "run", "--rm", "--name {}".format(container_name), "-d",
"-it", mount_flags, env_flags, user_options_str, "--net=host", image,
"-it", mount_flags, env_flags, fuse_flags, user_options_str, "--net=host", image,
"bash"
]
return " ".join(docker_run)
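For context, the added fuse_flags mean every generated docker run command requests the capabilities FUSE needs inside the container. A minimal sketch of the assembled command, with illustrative placeholder values for the container name, mounts, environment and image (none of these come from this PR):

docker run --rm --name cloudtik-worker-1 -d -it \
    -v /mnt/cloudtik:/mnt/cloudtik \
    -e CLOUDTIK_NODE_TYPE=worker \
    --cap-add SYS_ADMIN --device /dev/fuse --security-opt apparmor:unconfined \
    --net=host example/spark-runtime:latest bash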
5 changes: 5 additions & 0 deletions python/cloudtik/core/config-schema.json
@@ -56,6 +56,11 @@
"description": "Mark whether the config file is already bootstrapped or not",
"default": false
},
"cloud_storage_mount_enabled": {
"type": "boolean",
"description": "If enabled, Cloudtik will mount cloud storage as a file system using s3fs, blobfuse or gcsfuse",
"default": false
},
"provider": {
"type": "object",
"description": "Cloud-provider specific configuration.",
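For usage, a cluster config opts in by setting the new key. A minimal sketch, assuming the flag sits at the top level of the cluster YAML as its placement in the schema suggests (the provider section is an illustrative placeholder):

# cluster config (illustrative)
cloud_storage_mount_enabled: true
provider:
    type: aws
    region: us-west-2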
5 changes: 4 additions & 1 deletion python/cloudtik/runtime/spark/scripts.py
@@ -8,7 +8,8 @@

from shlex import quote

from cloudtik.runtime.spark.utils import CLOUDTIK_RUNTIME_SPARK_PATH, update_spark_configurations
from cloudtik.runtime.spark.utils import CLOUDTIK_RUNTIME_SPARK_PATH, update_spark_configurations, is_cloud_storage_mount_enabled


RUNTIME_SPARK_SCRIPTS_PATH = os.path.join(
CLOUDTIK_RUNTIME_SPARK_PATH, "scripts")
@@ -208,6 +209,8 @@ def configure(head, provider, head_address, aws_s3_bucket, aws_s3_access_key_id,
cmds += ["--azure_container={}".format(azure_container)]
if azure_account_key:
cmds += ["--azure_account_key={}".format(azure_account_key)]
if is_cloud_storage_mount_enabled():
cmds += ["--fuse_flag"]

if script_args:
cmds += list(script_args)
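The net effect is one extra flag passed to the configure script when mounting is enabled. A rough sketch of the resulting invocation for an AWS head node (bucket and credential values are illustrative placeholders):

bash configure.sh --head --provider=aws \
    --aws_s3_bucket=my-bucket \
    --aws_s3_access_key_id=AKIAEXAMPLE \
    --aws_s3_secret_access_key=secret-example \
    --fuse_flag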
16 changes: 15 additions & 1 deletion python/cloudtik/runtime/spark/utils.py
@@ -12,7 +12,11 @@
["proc_nodemanager", False, "NodeManager", "worker"],
]


CLOUDTIK_RUNTIME_SPARK_PATH = os.path.abspath(os.path.dirname(__file__))
SPARK_OUT_CONF = os.path.join(CLOUDTIK_RUNTIME_SPARK_PATH, "conf/outconf/spark/spark-defaults.conf")
CLOUDTIK_BOOTSTRAP_CONFIG_PATH = "~/cloudtik_bootstrap_config.yaml"


YARN_RESOURCE_MEMORY_RATIO = 0.8
SPARK_EXECUTOR_MEMORY_RATIO = 1
@@ -140,7 +144,7 @@ def _get_spark_config(config: Dict[str, Any]):

def update_spark_configurations():
# Merge user specified configuration and default configuration
bootstrap_config = os.path.expanduser("~/cloudtik_bootstrap_config.yaml")
bootstrap_config = os.path.expanduser(CLOUDTIK_BOOTSTRAP_CONFIG_PATH)
if not os.path.exists(bootstrap_config):
return

@@ -172,6 +176,16 @@ def update_spark_configurations():
f.write("{} {}\n".format(key, value))


def is_cloud_storage_mount_enabled() -> bool:
bootstrap_config = os.path.expanduser(CLOUDTIK_BOOTSTRAP_CONFIG_PATH)

if not os.path.exists(bootstrap_config):
return False

with open(bootstrap_config) as f:
    config = yaml.safe_load(f)
return config.get("cloud_storage_mount_enabled", False)


def with_spark_runtime_environment_variables(runtime_config, provider):
runtime_envs = {}
if runtime_config and runtime_config.get("spark", {}).get("enable_hdfs", False):
109 changes: 108 additions & 1 deletion runtime/spark/scripts/configure.sh
@@ -1,16 +1,23 @@
#!/bin/bash

args=$(getopt -a -o h::p: -l head::,head_address::,provider:,aws_s3_bucket::,aws_s3_access_key_id::,aws_s3_secret_access_key::,project_id::,gcs_bucket::,gcs_service_account_client_email::,gcs_service_account_private_key_id::,gcs_service_account_private_key::,azure_storage_type::,azure_storage_account::,azure_container::,azure_account_key:: -- "$@")

args=$(getopt -a -o h::p: -l head::,fuse_flag::,head_address::,provider:,aws_s3_bucket::,aws_s3_access_key_id::,aws_s3_secret_access_key::,project_id::,gcs_bucket::,gcs_service_account_client_email::,gcs_service_account_private_key_id::,gcs_service_account_private_key::,azure_storage_type::,azure_storage_account::,azure_container::,azure_account_key:: -- "$@")
eval set -- "${args}"

IS_HEAD_NODE=false
FUSE_FLAG=false
export USER_HOME=/home/$(whoami)
export MOUNT_PATH=$USER_HOME/share

while true
do
case "$1" in
--head)
IS_HEAD_NODE=true
;;
--fuse_flag)
FUSE_FLAG=true
;;
-h|--head_address)
HEAD_ADDRESS=$2
shift
@@ -368,10 +375,110 @@ function configure_ganglia() {
fi
}


function s3_fuse() {
if [ ! -n "${AWS_S3A_BUCKET}" ]; then
echo "AWS_S3A_BUCKET environment variable is not set."
exit 1
fi

if [ ! -n "${FS_S3A_ACCESS_KEY}" ]; then
echo "FS_S3A_ACCESS_KEY environment variable is not set."
exit 1
fi

if [ ! -n "${FS_S3A_SECRET_KEY}" ]; then
echo "FS_S3A_SECRET_KEY environment variable is not set."
exit 1
fi

sudo apt-get update
sudo apt-get install s3fs -y

echo "${FS_S3A_ACCESS_KEY}:${FS_S3A_SECRET_KEY}" > ${USER_HOME}/.passwd-s3fs
chmod 600 ${USER_HOME}/.passwd-s3fs

mkdir -p ${MOUNT_PATH}
s3fs ${AWS_S3A_BUCKET} -o use_cache=/tmp -o mp_umask=002 -o multireq_max=5 ${MOUNT_PATH}
}


function blob_fuse() {
if [ ! -n "${AZURE_CONTAINER}" ]; then
echo "AZURE_CONTAINER environment variable is not set."
exit 1
fi

if [ ! -n "${AZURE_ACCOUNT_KEY}" ]; then
echo "AZURE_ACCOUNT_KEY environment variable is not set."
exit 1
fi

if [ ! -n "${AZURE_STORAGE_ACCOUNT}" ]; then
echo "AZURE_STORAGE_ACCOUNT environment variable is not set."
exit 1
fi

# Install blobfuse
wget https://packages.microsoft.com/config/ubuntu/20.04/packages-microsoft-prod.deb
sudo dpkg -i packages-microsoft-prod.deb
sudo apt-get update
sudo apt-get install -y blobfuse
# Use a ramdisk for the temporary path
sudo mkdir -p /mnt/ramdisk
sudo mount -t tmpfs -o size=16g tmpfs /mnt/ramdisk
sudo mkdir -p /mnt/ramdisk/blobfusetmp
sudo chown $(whoami) /mnt/ramdisk/blobfusetmp


echo "accountName ${AZURE_STORAGE_ACCOUNT}" > ${USER_HOME}/fuse_connection.cfg
echo "accountKey ${AZURE_ACCOUNT_KEY}" >> ${USER_HOME}/fuse_connection.cfg
echo "containerName ${AZURE_CONTAINER}" >> ${USER_HOME}/fuse_connection.cfg
chmod 600 ${USER_HOME}/fuse_connection.cfg
mkdir -p ${MOUNT_PATH}
blobfuse ${MOUNT_PATH} --tmp-path=/mnt/ramdisk/blobfusetmp --config-file=${USER_HOME}/fuse_connection.cfg -o attr_timeout=240 -o entry_timeout=240 -o negative_timeout=120

}


function gcs_fuse() {
if [ ! -n "${GCP_GCS_BUCKET}" ]; then
echo "GCP_GCS_BUCKET environment variable is not set."
exit 1
fi
sudo apt-get update
sudo apt-get install -y curl
echo "deb http://packages.cloud.google.com/apt gcsfuse-$(lsb_release -c -s) main" | sudo tee /etc/apt/sources.list.d/gcsfuse.list
curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add -
sudo apt-get update
sudo apt-get install gcsfuse -y
mkdir -p ${MOUNT_PATH}
gcsfuse ${GCP_GCS_BUCKET} ${MOUNT_PATH}
}


function mount_cloud_storage_for_cloudtik() {
if [ "$FUSE_FLAG" == "true" ]; then
if [ "$provider" == "aws" ]; then
s3_fuse
fi

if [ "$provider" == "gcp" ]; then
gcs_fuse
fi

if [ "$provider" == "azure" ]; then
blob_fuse
fi
fi
}


check_spark_installed
set_head_address
set_resources_for_spark
configure_system_folders
configure_hadoop_and_spark
configure_jupyter_for_spark
configure_ganglia
mount_cloud_storage_for_cloudtik
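
Not part of this PR, but as a quick sanity check the mount created by these functions can be inspected and detached with standard FUSE tooling, e.g.:

# verify the bucket is mounted at $USER_HOME/share
findmnt ${MOUNT_PATH}
ls ${MOUNT_PATH}

# detach the FUSE mount when it is no longer needed
fusermount -u ${MOUNT_PATH}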