diff --git a/tests/jobs/python/long_running.py b/tests/jobs/python/long_running.py
new file mode 100644
index 0000000000000..bd8133f30ecec
--- /dev/null
+++ b/tests/jobs/python/long_running.py
@@ -0,0 +1,33 @@
+from __future__ import print_function
+
+
+import sys
+import time
+from pyspark.sql import SparkSession
+
+
+if __name__ == "__main__":
+    """
+        Usage: long_running [partitions] [run_time_sec]
+    """
+    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 5
+    run_time_sec = int(sys.argv[2]) if len(sys.argv) > 2 else 600
+
+    spark = SparkSession \
+        .builder \
+        .appName("Long-Running Spark Job") \
+        .getOrCreate()
+
+    n = 100000 * partitions
+    data = spark.sparkContext.parallelize(range(1, n + 1), partitions)
+
+    def processPartition(partition):
+        """Sleep for run_time_sec"""
+        print('Start processing partition')
+        time.sleep(run_time_sec)
+        print('Done processing partition')
+
+    data.foreachPartition(processPartition)
+    print('Job completed successfully')
+
+    spark.stop()
diff --git a/tests/test_recovery.py b/tests/test_recovery.py
new file mode 100644
index 0000000000000..b29763fe12bf9
--- /dev/null
+++ b/tests/test_recovery.py
@@ -0,0 +1,99 @@
+import logging
+import os
+import pytest
+import re
+import shakedown
+import time
+
+import utils
+from utils import SPARK_PACKAGE_NAME
+
+
+LOGGER = logging.getLogger(__name__)
+THIS_DIR = os.path.dirname(os.path.abspath(__file__))
+LONG_RUNNING_FW_NAME = "Long-Running Spark Job"
+LONG_RUNNING_FW_NUM_TASKS = 1
+MASTER_CONNECTION_TIMEOUT_SEC = 15 * 60
+LONG_RUNNING_RUN_TIME_SEC = MASTER_CONNECTION_TIMEOUT_SEC + (15 * 60)
+
+
+def setup_module(module):
+    utils.require_spark()
+
+
+def teardown_module(module):
+    shakedown.uninstall_package_and_wait(SPARK_PACKAGE_NAME)
+
+
+@pytest.mark.skip(reason="Waiting for upstream change, https://issues.apache.org/jira/browse/SPARK-21419")
+@pytest.mark.recovery
+def test_disconnect_from_master():
+    """Check that a long-running job still completes after the driver's link to the master is cut."""
+    python_script_path = os.path.join(THIS_DIR, 'jobs', 'python', 'long_running.py')
+    python_script_url = utils.upload_file(python_script_path)
+    task_id = utils.submit_job(python_script_url,
+                               "{} {}".format(LONG_RUNNING_FW_NUM_TASKS, LONG_RUNNING_RUN_TIME_SEC),
+                               ["--conf", "spark.mesos.driver.failoverTimeout=1800",
+                                "--conf", "spark.cores.max=1"])
+
+    # Wait until executor is running
+    LOGGER.info("Waiting for executor task to be RUNNING...")
+    shakedown.wait_for(lambda: utils.is_service_ready(LONG_RUNNING_FW_NAME, LONG_RUNNING_FW_NUM_TASKS),
+                       ignore_exceptions=False,
+                       timeout_seconds=600)
+
+    # Block the driver's connection to Mesos master
+    framework_info = shakedown.get_service(LONG_RUNNING_FW_NAME)
+    (driver_host, port) = _parse_fw_pid_host_port(framework_info["pid"])
+    _block_master_connection(driver_host, port)
+
+    # The connection will time out after 15 minutes of inactivity.
+    # Add 5 minutes to make sure the master has detected the disconnection.
+    # The framework will be considered disconnected => failover_timeout kicks in.
+    wait_sec = MASTER_CONNECTION_TIMEOUT_SEC + 5 * 60
+    try:
+        LOGGER.info("Waiting {} seconds for connection with master to timeout...".format(wait_sec))
+        time.sleep(wait_sec)
+    finally:
+        # Restore the connection even if the wait is interrupted, so the host's
+        # iptables rules are never left modified. The driver should reconnect.
+        _unblock_master_connection(driver_host)
+
+    # The executor and driver should finish.
+    utils.check_job_output(task_id, "Job completed successfully")
+
+    # Due to https://issues.apache.org/jira/browse/MESOS-5180, the driver does not re-register, so
+    # teardown won't occur until the failover_timeout period ends. The framework remains "Inactive".
+    # Uncomment when the bug is fixed:
+    #_wait_for_completed_framework(LONG_RUNNING_FW_NAME, 60)
+
+
+def _parse_fw_pid_host_port(pid):
+    """Return (host, port) parsed from a framework pid; raise ValueError if it is malformed."""
+    # Framework pid looks like: "scheduler-cd28f2eb-3aec-4060-a731-f5be1f5186c4@10.0.1.7:37085"
+    regex = r"([^@]+)@([^:]+):(\d+)"
+    match = re.search(regex, pid)
+    if match is None:
+        raise ValueError("Unexpected framework pid format: {!r}".format(pid))
+    return match.group(2), int(match.group(3))
+
+
+def _block_master_connection(host, port):
+    """Save the iptables state on host, then reject incoming TCP traffic to port."""
+    LOGGER.info("Blocking connection with master")
+    shakedown.network.save_iptables(host)
+    # Reject incoming packets from master
+    shakedown.network.run_iptables(host, '-I INPUT -p tcp --dport {} -j REJECT'.format(port))
+
+
+def _unblock_master_connection(host):
+    """Restore the iptables rules on host through shakedown (undoes _block_master_connection)."""
+    LOGGER.info("Unblocking connection with master")
+    shakedown.network.restore_iptables(host)
+
+
+def _wait_for_completed_framework(fw_name, timeout_seconds):
+    """Wait up to timeout_seconds for utils.is_framework_completed(fw_name) to hold."""
+    shakedown.wait_for(lambda: utils.is_framework_completed(fw_name),
+                       ignore_exceptions=False,
+                       timeout_seconds=timeout_seconds)
diff --git a/tests/test_spark.py b/tests/test_spark.py
index e1a61d730d3d0..d7cf68fa4e784 100644
--- a/tests/test_spark.py
+++ b/tests/test_spark.py
@@ -26,7 +26,7 @@ def setup_module(module):
     if utils.hdfs_enabled():
         utils.require_hdfs()
     utils.require_spark()
-    _upload_file(os.environ["SCALA_TEST_JAR_PATH"])
+    utils.upload_file(os.environ["SCALA_TEST_JAR_PATH"])
 
 
 def teardown_module(module):
@@ -42,7 +42,7 @@ def test_jar():
     spark_job_runner_args = '{} dcos \\"*\\" spark:only 2 --auth-token={}'.format(
         master_url,
         shakedown.dcos_acs_token())
-    jar_url = _upload_file(os.getenv('TEST_JAR_PATH'))
+    jar_url = utils.upload_file(os.getenv('TEST_JAR_PATH'))
     utils.run_tests(jar_url,
                     spark_job_runner_args,
                     "All tests passed",
@@ -62,9 +62,9 @@ def test_teragen():
 @pytest.mark.sanity
 def test_python():
     python_script_path = os.path.join(THIS_DIR, 'jobs', 'python', 'pi_with_include.py')
-    python_script_url = _upload_file(python_script_path)
+    python_script_url = utils.upload_file(python_script_path)
     py_file_path = os.path.join(THIS_DIR, 'jobs', 'python', 'PySparkTestInclude.py')
-    py_file_url = _upload_file(py_file_path)
+    py_file_url = utils.upload_file(py_file_path)
     utils.run_tests(python_script_url,
                     "30",
                     "Pi is roughly 3",
@@ -97,7 +97,7 @@ def test_kerberos():
 @pytest.mark.sanity
 def test_r():
     r_script_path = os.path.join(THIS_DIR, 'jobs', 'R', 'dataframe.R')
-    r_script_url = _upload_file(r_script_path)
+    r_script_url = utils.upload_file(r_script_path)
     utils.run_tests(r_script_url,
                     '',
                     "Justin")
@@ -186,16 +186,5 @@ def _run_janitor(service_name):
         auth=shakedown.dcos_acs_token()))
 
 
-def _upload_file(file_path):
-    LOGGER.info("Uploading {} to s3://{}/{}".format(
-        file_path,
-        os.environ['S3_BUCKET'],
-        os.environ['S3_PREFIX']))
-
-    s3.upload_file(file_path)
-
-    basename = os.path.basename(file_path)
-    return s3.http_url(basename)
-
 def _scala_test_jar_url():
     return s3.http_url(os.path.basename(os.environ["SCALA_TEST_JAR_PATH"]))
diff --git a/tests/utils.py b/tests/utils.py
index ee40371164437..d7692e8dc5c2b 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -7,6 +7,7 @@
 import os
 import re
 import requests
+import s3
 import shakedown
 import subprocess
 import urllib
@@ -111,7 +112,11 @@ def _wait_for_hdfs():
 
 
 def _is_hdfs_ready(expected_tasks = DEFAULT_HDFS_TASK_COUNT):
-    running_tasks = [t for t in shakedown.get_service_tasks(HDFS_SERVICE_NAME) \
+    return is_service_ready(HDFS_SERVICE_NAME, expected_tasks)
+
+
+def is_service_ready(service_name, expected_tasks):
+    running_tasks = [t for t in shakedown.get_service_tasks(service_name) \
                      if t['state'] == 'TASK_RUNNING']
     return len(running_tasks) >= expected_tasks
 
@@ -138,7 +143,11 @@ def _get_spark_options(options = None):
 
 
 def run_tests(app_url, app_args, expected_output, args=[]):
-    task_id = _submit_job(app_url, app_args, args)
+    task_id = submit_job(app_url, app_args, args)
+    check_job_output(task_id, expected_output)
+
+
+def check_job_output(task_id, expected_output):
     LOGGER.info('Waiting for task id={} to complete'.format(task_id))
     shakedown.wait_for_task_completion(task_id)
     stdout = _task_log(task_id)
@@ -167,7 +176,19 @@ def create_secret(name, value):
     dcos.http.put(url, data=json.dumps(data))
 
 
-def _submit_job(app_url, app_args, args=[]):
+def upload_file(file_path):
+    LOGGER.info("Uploading {} to s3://{}/{}".format(
+        file_path,
+        os.environ['S3_BUCKET'],
+        os.environ['S3_PREFIX']))
+
+    s3.upload_file(file_path)
+
+    basename = os.path.basename(file_path)
+    return s3.http_url(basename)
+
+
+def submit_job(app_url, app_args, args=[]):
     if is_strict():
         args += ["--conf", 'spark.mesos.driverEnv.MESOS_MODULES=file:///opt/mesosphere/etc/mesos-scheduler-modules/dcos_authenticatee_module.json']
         args += ["--conf", 'spark.mesos.driverEnv.MESOS_AUTHENTICATEE=com_mesosphere_dcos_ClassicRPCAuthenticatee']
@@ -193,3 +214,8 @@ def _task_log(task_id, filename=None):
     LOGGER.info("Running {}".format(cmd))
     stdout = subprocess.check_output(cmd, shell=True).decode('utf-8')
     return stdout
+
+
+def is_framework_completed(fw_name):
+    # The framework is not Active or Inactive
+    return shakedown.get_service(fw_name, True) is None