Merge pull request apache#99 from mesosphere/spark-332-cni
[SPARK-332] Adds CNI support
mgummelt authored Nov 16, 2016
2 parents d8cf493 + d9e819f commit a9efef8
Showing 4 changed files with 82 additions and 77 deletions.
bin/make-docker.sh (3 changes: 2 additions & 1 deletion)
@@ -12,8 +12,9 @@ DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
SPARK_BUILD_DIR="${DIR}/.."

function fetch_spark() {
    rm -rf build/dist
    mkdir -p build/dist
    [ -f "build/dist/${DIST_TGZ}" ] || curl -o "build/dist/${DIST_TGZ}" "${SPARK_DIST_URI}"
    curl -o "build/dist/${DIST_TGZ}" "${SPARK_DIST_URI}"
    tar xvf "build/dist/${DIST_TGZ}" -C build/dist
}

bin/test.sh (2 changes: 1 addition & 1 deletion)
@@ -100,7 +100,7 @@ run_tests() {
    fi
    source env/bin/activate
    pip install -r requirements.txt
    python test.py
    py.test test.py
    if [ $? -ne 0 ]; then
        notify_github failure "Tests failed"
        exit 1
conf/spark-env.sh (11 changes: 8 additions & 3 deletions)
@@ -16,9 +16,14 @@ cd $MESOS_SANDBOX

MESOS_NATIVE_JAVA_LIBRARY=/usr/lib/libmesos.so

# Support environments without DNS
if [ -n "$LIBPROCESS_IP" ]; then
    SPARK_LOCAL_IP=${LIBPROCESS_IP}
# For non-CNI deployments, tell the Spark driver to bind to LIBPROCESS_IP.
#
# For CNI, LIBPROCESS_IP is 0.0.0.0, so we can't do this. Instead, we
# let Spark perform its default behavior of doing a DNS lookup on the
# hostname, which should work because the container is set up with an
# entry for $(hostname) in /etc/hosts.
if ! getent hosts $(hostname) > /dev/null && [ -n "$LIBPROCESS_IP" ]; then
    SPARK_LOCAL_IP=${LIBPROCESS_IP}
fi

# I first set this to MESOS_SANDBOX, as a workaround for MESOS-5866
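The hostname check above is the core of the spark-env.sh change. As a rough illustration only (this sketch is not part of the commit, and choose_spark_local_ip is a hypothetical helper name), the equivalent decision in Python looks like this:

# Illustrative sketch of the spark-env.sh decision above; not part of this commit.
import os
import socket

def choose_spark_local_ip():
    libprocess_ip = os.environ.get("LIBPROCESS_IP", "")
    try:
        # Equivalent of `getent hosts $(hostname)`: does the container's hostname resolve?
        socket.gethostbyname(socket.gethostname())
        hostname_resolves = True
    except socket.gaierror:
        hostname_resolves = False

    if not hostname_resolves and libprocess_ip:
        # Non-CNI case: the hostname has no DNS entry, so bind to LIBPROCESS_IP.
        return libprocess_ip
    # CNI case: LIBPROCESS_IP is 0.0.0.0, so leave SPARK_LOCAL_IP unset and let
    # Spark resolve $(hostname), which the container's /etc/hosts entry satisfies.
    return None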
tests/test.py (143 changes: 71 additions & 72 deletions)
@@ -13,50 +13,57 @@
import shakedown


def get_content_type(basename):
    if basename.endswith('.jar'):
        content_type = 'application/java-archive'
    elif basename.endswith('.py'):
        content_type = 'application/x-python'
    elif basename.endswith('.R'):
        content_type = 'application/R'
    else:
        raise ValueError("Unexpected file type: {}. Expected .jar, .py, or .R file.".format(basename))
    return content_type
def test_jar():
    spark_job_runner_args = 'http://leader.mesos:5050 dcos \\"*\\" spark:only 2'
    jar_url = _upload_file(os.getenv('TEST_JAR_PATH'))
    _run_tests(jar_url,
               spark_job_runner_args,
               "All tests passed",
               {"--class": 'com.typesafe.spark.test.mesos.framework.runners.SparkJobRunner'})


def upload_file(file_path):
    conn = S3Connection(os.environ['AWS_ACCESS_KEY_ID'], os.environ['AWS_SECRET_ACCESS_KEY'])
    bucket = conn.get_bucket(os.environ['S3_BUCKET'])
    basename = os.path.basename(file_path)
def test_python():
    script_dir = os.path.dirname(os.path.abspath(__file__))
    python_script_path = os.path.join(script_dir, 'jobs', 'pi_with_include.py')
    python_script_url = _upload_file(python_script_path)
    py_file_path = os.path.join(script_dir, 'jobs', 'PySparkTestInclude.py')
    py_file_url = _upload_file(py_file_path)
    _run_tests(python_script_url,
               "30",
               "Pi is roughly 3",
               {"--py-files": py_file_url})

    content_type = get_content_type(basename)
def test_r():
    # TODO: enable R test when R is enabled in Spark (2.1)
    #r_script_path = os.path.join(script_dir, 'jobs', 'dataframe.R')
    #run_tests(r_script_path,
    #          '',
    #          "1 Justin")
    pass

    key = Key(bucket, '{}/{}'.format(os.environ['S3_PREFIX'], basename))
    key.metadata = {'Content-Type': content_type}
    key.set_contents_from_filename(file_path)
    key.make_public()

    jar_url = "http://{0}.s3.amazonaws.com/{1}/{2}".format(
        os.environ['S3_BUCKET'],
        os.environ['S3_PREFIX'],
        basename)
def test_cni():
    SPARK_EXAMPLES="http://downloads.mesosphere.com/spark/assets/spark-examples_2.11-2.0.1.jar"
    _run_tests(SPARK_EXAMPLES,
               "",
               "Pi is roughly 3",
               {"--class": "org.apache.spark.examples.SparkPi"},
               {"spark.mesos.network.name": "dcos"})

    return jar_url

def _run_tests(app_url, app_args, expected_output, args={}, config={}):
    task_id = _submit_job(app_url, app_args, args, config)
    print('Waiting for task id={} to complete'.format(task_id))
    shakedown.wait_for_task_completion(task_id)
    log = _task_log(task_id)
    print(log)
    assert expected_output in log

def submit_job(app_resource_url, app_args, app_class, py_files):
    if app_class is not None:
        app_class_option = '--class {} '.format(app_class)
    else:
        app_class_option = ''
    if py_files is not None:
        py_files_option = '--py-files {} '.format(py_files)
    else:
        py_files_option = ''

    submit_args = "-Dspark.driver.memory=2g {0}{1}{2} {3}".format(
        app_class_option, py_files_option, app_resource_url, app_args)
def _submit_job(app_url, app_args, args={}, config={}):
    args_str = ' '.join('{0} {1}'.format(k, v) for k,v in args.items())
    config_str = ' '.join('-D{0}={1}'.format(k, v) for k,v in config.items())
    submit_args = ' '.join(arg for arg in ["-Dspark.driver.memory=2g", args_str, app_url, app_args, config_str] if arg != "")
    cmd = 'dcos --log-level=DEBUG spark --verbose run --submit-args="{0}"'.format(submit_args)
    print('Running {}'.format(cmd))
    stdout = subprocess.check_output(cmd, shell=True).decode('utf-8')
@@ -67,48 +67,40 @@ def submit_job(app_resource_url, app_args, app_class, py_files):
    return match.group(1)


def task_log(task_id):
    cmd = "dcos task log --completed --lines=1000 {}".format(task_id)
    print('Running {}'.format(cmd))
    stdout = subprocess.check_output(cmd, shell=True).decode('utf-8')
    return stdout
def _upload_file(file_path):
    conn = S3Connection(os.environ['AWS_ACCESS_KEY_ID'], os.environ['AWS_SECRET_ACCESS_KEY'])
    bucket = conn.get_bucket(os.environ['S3_BUCKET'])
    basename = os.path.basename(file_path)

    content_type = _get_content_type(basename)

def run_tests(app_path, app_args, expected_output, app_class=None, py_file_path=None):
    app_resource_url = upload_file(app_path)
    if py_file_path is not None:
        py_file_url = upload_file(py_file_path)
    else:
        py_file_url = None
    task_id = submit_job(app_resource_url, app_args, app_class, py_file_url)
    print('Waiting for task id={} to complete'.format(task_id))
    shakedown.wait_for_task_completion(task_id)
    log = task_log(task_id)
    print(log)
    assert expected_output in log
    key = Key(bucket, '{}/{}'.format(os.environ['S3_PREFIX'], basename))
    key.metadata = {'Content-Type': content_type}
    key.set_contents_from_filename(file_path)
    key.make_public()

    jar_url = "http://{0}.s3.amazonaws.com/{1}/{2}".format(
        os.environ['S3_BUCKET'],
        os.environ['S3_PREFIX'],
        basename)

def main():
    spark_job_runner_args = 'http://leader.mesos:5050 dcos \\"*\\" spark:only 2'
    run_tests(os.getenv('TEST_JAR_PATH'),
              spark_job_runner_args,
              "All tests passed",
              app_class='com.typesafe.spark.test.mesos.framework.runners.SparkJobRunner')
    return jar_url

    script_dir = os.path.dirname(os.path.abspath(__file__))
    python_script_path = os.path.join(script_dir, 'jobs', 'pi_with_include.py')
    py_file_path = os.path.join(script_dir, 'jobs', 'PySparkTestInclude.py')
    run_tests(python_script_path,
              '30',
              "Pi is roughly 3",
              py_file_path=py_file_path)

    # TODO: enable R test when R is enabled in Spark (2.1)
    #r_script_path = os.path.join(script_dir, 'jobs', 'dataframe.R')
    #run_tests(r_script_path,
    #          '',
    #          "1 Justin")
def _get_content_type(basename):
    if basename.endswith('.jar'):
        content_type = 'application/java-archive'
    elif basename.endswith('.py'):
        content_type = 'application/x-python'
    elif basename.endswith('.R'):
        content_type = 'application/R'
    else:
        raise ValueError("Unexpected file type: {}. Expected .jar, .py, or .R file.".format(basename))
    return content_type


if __name__ == '__main__':
    main()
def _task_log(task_id):
    cmd = "dcos task log --completed --lines=1000 {}".format(task_id)
    print('Running {}'.format(cmd))
    stdout = subprocess.check_output(cmd, shell=True).decode('utf-8')
    return stdout
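
As a quick sanity check (illustrative only, not part of the committed test suite), the values passed by test_cni above produce the following submit-args string when run through the _submit_job string assembly shown in this diff:

# Illustrative walkthrough of _submit_job's string assembly for the test_cni inputs;
# not part of the committed test suite.
args = {"--class": "org.apache.spark.examples.SparkPi"}
config = {"spark.mesos.network.name": "dcos"}
app_url = "http://downloads.mesosphere.com/spark/assets/spark-examples_2.11-2.0.1.jar"
app_args = ""

args_str = ' '.join('{0} {1}'.format(k, v) for k, v in args.items())
config_str = ' '.join('-D{0}={1}'.format(k, v) for k, v in config.items())
submit_args = ' '.join(arg for arg in ["-Dspark.driver.memory=2g", args_str, app_url, app_args, config_str] if arg != "")

# Expected value of submit_args (a single line):
# -Dspark.driver.memory=2g --class org.apache.spark.examples.SparkPi
# http://downloads.mesosphere.com/spark/assets/spark-examples_2.11-2.0.1.jar -Dspark.mesos.network.name=dcos
print(submit_args)

Note that the empty app_args is dropped by the `if arg != ""` filter, so no stray spaces end up in the generated dcos spark run command.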
