[SPARK-478] Make driver failover_timeout configurable, to allow for temporary disconnection between driver and Mesos master. (apache#161)
susanxhuynh authored Jul 19, 2017
1 parent d694c6e commit aa6783f
Showing 4 changed files with 156 additions and 19 deletions.
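The change makes the driver's Mesos failover_timeout configurable through the spark.mesos.driver.failoverTimeout property, so a driver that temporarily loses its connection to the master is not torn down before it can re-register. As a minimal sketch of the usage pattern the new recovery test exercises (the script URL below is a placeholder; the --conf keys and the submit_job/check_job_output helpers are the ones added in this commit):

import utils

# Placeholder URL; the recovery test below builds one with utils.upload_file().
script_url = "https://example.com/long_running.py"

# Raise the failover timeout to 30 minutes (1800 s) so a temporarily
# disconnected driver is not torn down by the Mesos master right away.
# App args "1 1800" = 1 partition, 1800-second run time (see long_running.py usage).
task_id = utils.submit_job(script_url,
                           "1 1800",
                           ["--conf", "spark.mesos.driver.failoverTimeout=1800",
                            "--conf", "spark.cores.max=1"])
utils.check_job_output(task_id, "Job completed successfully")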
33 changes: 33 additions & 0 deletions tests/jobs/python/long_running.py
@@ -0,0 +1,33 @@
from __future__ import print_function


import sys
import time
from pyspark.sql import SparkSession


if __name__ == "__main__":
"""
Usage: long_running [partitions] [run_time_sec]
"""
partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 5
run_time_sec = int(sys.argv[2]) if len(sys.argv) > 2 else 600

spark = SparkSession \
.builder \
.appName("Long-Running Spark Job") \
.getOrCreate()

n = 100000 * partitions
data = spark.sparkContext.parallelize(range(1, n + 1), partitions)

def processPartition(partition):
"""Sleep for run_time_sec"""
print('Start processing partition')
time.sleep(run_time_sec)
print('Done processing partition')

data.foreachPartition(processPartition)
print('Job completed successfully')

spark.stop()
89 changes: 89 additions & 0 deletions tests/test_recovery.py
@@ -0,0 +1,89 @@
import logging
import os
import pytest
import re
import shakedown
import time

import utils
from utils import SPARK_PACKAGE_NAME


LOGGER = logging.getLogger(__name__)
THIS_DIR = os.path.dirname(os.path.abspath(__file__))
LONG_RUNNING_FW_NAME = "Long-Running Spark Job"
LONG_RUNNING_FW_NUM_TASKS = 1
MASTER_CONNECTION_TIMEOUT_SEC = 15 * 60
LONG_RUNNING_RUN_TIME_SEC = MASTER_CONNECTION_TIMEOUT_SEC + (15 * 60)


def setup_module(module):
    utils.require_spark()


def teardown_module(module):
    shakedown.uninstall_package_and_wait(SPARK_PACKAGE_NAME)


@pytest.mark.skip(reason="Waiting for upstream change, https://issues.apache.org/jira/browse/SPARK-21419")
@pytest.mark.recovery
def test_disconnect_from_master():
    python_script_path = os.path.join(THIS_DIR, 'jobs', 'python', 'long_running.py')
    python_script_url = utils.upload_file(python_script_path)
    task_id = utils.submit_job(python_script_url,
                               "{} {}".format(LONG_RUNNING_FW_NUM_TASKS, LONG_RUNNING_RUN_TIME_SEC),
                               ["--conf", "spark.mesos.driver.failoverTimeout=1800",
                                "--conf", "spark.cores.max=1"])

    # Wait until the executor is running
    LOGGER.info("Waiting for executor task to be RUNNING...")
    shakedown.wait_for(lambda: utils.is_service_ready(LONG_RUNNING_FW_NAME, LONG_RUNNING_FW_NUM_TASKS),
                       ignore_exceptions=False,
                       timeout_seconds=600)

    # Block the driver's connection to the Mesos master
    framework_info = shakedown.get_service(LONG_RUNNING_FW_NAME)
    (driver_host, port) = _parse_fw_pid_host_port(framework_info["pid"])
    _block_master_connection(driver_host, port)

    # The connection will time out after 15 minutes of inactivity.
    # Wait an extra 5 minutes to make sure the master has detected the disconnection.
    # The framework will then be considered disconnected => failover_timeout kicks in.
    LOGGER.info("Waiting {} seconds for connection with master to timeout...".format(MASTER_CONNECTION_TIMEOUT_SEC))
    time.sleep(MASTER_CONNECTION_TIMEOUT_SEC + 5 * 60)

    # Restore the connection. The driver should reconnect.
    _unblock_master_connection(driver_host)

    # The executor and driver should finish.
    utils.check_job_output(task_id, "Job completed successfully")

    # Due to https://issues.apache.org/jira/browse/MESOS-5180, the driver does not re-register, so
    # teardown won't occur until the failover_timeout period ends. The framework remains "Inactive".
    # Uncomment when the bug is fixed:
    #_wait_for_completed_framework(LONG_RUNNING_FW_NAME, 60)


def _parse_fw_pid_host_port(pid):
    # Framework pid looks like: "scheduler-cd28f2eb-3aec-4060-a731-f5be1f5186c4@10.0.1.7:37085"
    regex = r"([^@]+)@([^:]+):(\d+)"
    match = re.search(regex, pid)
    return match.group(2), int(match.group(3))


def _block_master_connection(host, port):
LOGGER.info("Blocking connection with master")
shakedown.network.save_iptables(host)
# Reject incoming packets from master
shakedown.network.run_iptables(host, '-I INPUT -p tcp --dport {} -j REJECT'.format(port))


def _unblock_master_connection(host):
LOGGER.info("Unblocking connection with master")
shakedown.network.restore_iptables(host)


def _wait_for_completed_framework(fw_name, timeout_seconds):
    shakedown.wait_for(lambda: utils.is_framework_completed(fw_name),
                       ignore_exceptions=False,
                       timeout_seconds=timeout_seconds)
21 changes: 5 additions & 16 deletions tests/test_spark.py
@@ -26,7 +26,7 @@ def setup_module(module):
    if utils.hdfs_enabled():
        utils.require_hdfs()
    utils.require_spark()
-    _upload_file(os.environ["SCALA_TEST_JAR_PATH"])
+    utils.upload_file(os.environ["SCALA_TEST_JAR_PATH"])


def teardown_module(module):
@@ -42,7 +42,7 @@ def test_jar():
    spark_job_runner_args = '{} dcos \\"*\\" spark:only 2 --auth-token={}'.format(
        master_url,
        shakedown.dcos_acs_token())
-    jar_url = _upload_file(os.getenv('TEST_JAR_PATH'))
+    jar_url = utils.upload_file(os.getenv('TEST_JAR_PATH'))
    utils.run_tests(jar_url,
                    spark_job_runner_args,
                    "All tests passed",
@@ -62,9 +62,9 @@ def test_teragen():
@pytest.mark.sanity
def test_python():
    python_script_path = os.path.join(THIS_DIR, 'jobs', 'python', 'pi_with_include.py')
-    python_script_url = _upload_file(python_script_path)
+    python_script_url = utils.upload_file(python_script_path)
    py_file_path = os.path.join(THIS_DIR, 'jobs', 'python', 'PySparkTestInclude.py')
-    py_file_url = _upload_file(py_file_path)
+    py_file_url = utils.upload_file(py_file_path)
    utils.run_tests(python_script_url,
                    "30",
                    "Pi is roughly 3",
@@ -97,7 +97,7 @@ def test_kerberos():
@pytest.mark.sanity
def test_r():
    r_script_path = os.path.join(THIS_DIR, 'jobs', 'R', 'dataframe.R')
-    r_script_url = _upload_file(r_script_path)
+    r_script_url = utils.upload_file(r_script_path)
    utils.run_tests(r_script_url,
                    '',
                    "Justin")
@@ -186,16 +186,5 @@ def _run_janitor(service_name):
        auth=shakedown.dcos_acs_token()))


-def _upload_file(file_path):
-    LOGGER.info("Uploading {} to s3://{}/{}".format(
-        file_path,
-        os.environ['S3_BUCKET'],
-        os.environ['S3_PREFIX']))
-
-    s3.upload_file(file_path)
-
-    basename = os.path.basename(file_path)
-    return s3.http_url(basename)
-
def _scala_test_jar_url():
    return s3.http_url(os.path.basename(os.environ["SCALA_TEST_JAR_PATH"]))
32 changes: 29 additions & 3 deletions tests/utils.py
@@ -7,6 +7,7 @@
import os
import re
import requests
+import s3
import shakedown
import subprocess
import urllib
@@ -111,7 +112,11 @@ def _wait_for_hdfs():


def _is_hdfs_ready(expected_tasks = DEFAULT_HDFS_TASK_COUNT):
-    running_tasks = [t for t in shakedown.get_service_tasks(HDFS_SERVICE_NAME) \
+    return is_service_ready(HDFS_SERVICE_NAME, expected_tasks)
+
+
+def is_service_ready(service_name, expected_tasks):
+    running_tasks = [t for t in shakedown.get_service_tasks(service_name) \
                     if t['state'] == 'TASK_RUNNING']
    return len(running_tasks) >= expected_tasks

@@ -138,7 +143,11 @@ def _get_spark_options(options = None):


def run_tests(app_url, app_args, expected_output, args=[]):
-    task_id = _submit_job(app_url, app_args, args)
+    task_id = submit_job(app_url, app_args, args)
+    check_job_output(task_id, expected_output)
+
+
+def check_job_output(task_id, expected_output):
    LOGGER.info('Waiting for task id={} to complete'.format(task_id))
    shakedown.wait_for_task_completion(task_id)
    stdout = _task_log(task_id)
@@ -167,7 +176,19 @@ def create_secret(name, value):
    dcos.http.put(url, data=json.dumps(data))


-def _submit_job(app_url, app_args, args=[]):
+def upload_file(file_path):
+    LOGGER.info("Uploading {} to s3://{}/{}".format(
+        file_path,
+        os.environ['S3_BUCKET'],
+        os.environ['S3_PREFIX']))
+
+    s3.upload_file(file_path)
+
+    basename = os.path.basename(file_path)
+    return s3.http_url(basename)
+
+
+def submit_job(app_url, app_args, args=[]):
    if is_strict():
        args += ["--conf", 'spark.mesos.driverEnv.MESOS_MODULES=file:///opt/mesosphere/etc/mesos-scheduler-modules/dcos_authenticatee_module.json']
        args += ["--conf", 'spark.mesos.driverEnv.MESOS_AUTHENTICATEE=com_mesosphere_dcos_ClassicRPCAuthenticatee']
@@ -193,3 +214,8 @@ def _task_log(task_id, filename=None):
    LOGGER.info("Running {}".format(cmd))
    stdout = subprocess.check_output(cmd, shell=True).decode('utf-8')
    return stdout
+
+
+def is_framework_completed(fw_name):
+    # The framework is neither Active nor Inactive, i.e., it has completed
+    return shakedown.get_service(fw_name, True) is None
