diff --git a/great_expectations/core/util.py b/great_expectations/core/util.py index e915ee83fd34..6c71c790d787 100644 --- a/great_expectations/core/util.py +++ b/great_expectations/core/util.py @@ -459,6 +459,7 @@ def sniff_s3_compression(s3_url: S3Url) -> str: # noinspection PyPep8Naming def get_or_create_spark_application( spark_config: Optional[Dict[str, str]] = None, + force_reuse_spark_context: Optional[bool] = False, ): # Due to the uniqueness of SparkContext per JVM, it is impossible to change SparkSession configuration dynamically. # Attempts to circumvent this constraint cause "ValueError: Cannot run multiple SparkContexts at once" to be thrown. @@ -491,7 +492,7 @@ def get_or_create_spark_application( # noinspection PyProtectedMember sc_stopped: bool = spark_session.sparkContext._jsc.sc().isStopped() - if spark_restart_required( + if not force_reuse_spark_context and spark_restart_required( current_spark_config=spark_session.sparkContext.getConf().getAll(), desired_spark_config=spark_config, ): @@ -576,7 +577,7 @@ def spark_restart_required( ): return True - if not set([(k, v) for k, v in desired_spark_config.items()]).issubset( + if not {(k, v) for k, v in desired_spark_config.items()}.issubset( current_spark_config ): return True diff --git a/great_expectations/datasource/sparkdf_datasource.py b/great_expectations/datasource/sparkdf_datasource.py index 19bab7632031..1bba75c09248 100644 --- a/great_expectations/datasource/sparkdf_datasource.py +++ b/great_expectations/datasource/sparkdf_datasource.py @@ -67,6 +67,7 @@ def build_configuration( data_asset_type=None, batch_kwargs_generators=None, spark_config=None, + force_reuse_spark_context=False, **kwargs ): """ @@ -96,8 +97,13 @@ def build_configuration( configuration = kwargs configuration.update( - {"data_asset_type": data_asset_type, "spark_config": spark_config} + { + "data_asset_type": data_asset_type, + "spark_config": spark_config, + "force_reuse_spark_context": force_reuse_spark_context, + } ) + if batch_kwargs_generators: configuration["batch_kwargs_generators"] = batch_kwargs_generators @@ -110,6 +116,7 @@ def __init__( data_asset_type=None, batch_kwargs_generators=None, spark_config=None, + force_reuse_spark_context=False, **kwargs ): """Build a new SparkDFDatasource instance. @@ -123,7 +130,11 @@ def __init__( **kwargs: Additional """ configuration_with_defaults = SparkDFDatasource.build_configuration( - data_asset_type, batch_kwargs_generators, spark_config, **kwargs + data_asset_type, + batch_kwargs_generators, + spark_config, + force_reuse_spark_context, + **kwargs ) data_asset_type = configuration_with_defaults.pop("data_asset_type") batch_kwargs_generators = configuration_with_defaults.pop( @@ -139,7 +150,10 @@ def __init__( if spark_config is None: spark_config = {} - spark = get_or_create_spark_application(spark_config=spark_config) + spark = get_or_create_spark_application( + spark_config=spark_config, + force_reuse_spark_context=force_reuse_spark_context, + ) self.spark = spark self._build_generators() diff --git a/great_expectations/execution_engine/sparkdf_execution_engine.py b/great_expectations/execution_engine/sparkdf_execution_engine.py index d02e8258503c..f6d8f53ef7bb 100644 --- a/great_expectations/execution_engine/sparkdf_execution_engine.py +++ b/great_expectations/execution_engine/sparkdf_execution_engine.py @@ -142,14 +142,24 @@ class SparkDFExecutionEngine(ExecutionEngine): "reader_options", } - def __init__(self, *args, persist=True, spark_config=None, **kwargs): + def __init__( + self, + *args, + persist=True, + spark_config=None, + force_reuse_spark_context=False, + **kwargs, + ): # Creation of the Spark DataFrame is done outside this class self._persist = persist if spark_config is None: spark_config = {} - spark: SparkSession = get_or_create_spark_application(spark_config=spark_config) + spark: SparkSession = get_or_create_spark_application( + spark_config=spark_config, + force_reuse_spark_context=force_reuse_spark_context, + ) spark_config = dict(spark_config) spark_config.update({k: v for (k, v) in spark.sparkContext.getConf().getAll()})