[MAINTENANCE] Add force_reuse_spark_context to DatasourceConfigSchema (#3126)

* [MAINTENANCE] Adds spark_config and force_reuse_spark_context fields to DatasourceConfigSchema

* Adds changes bullet point to docs_rtd/changelog.rst

* remove spark_config from DatasourceConfigSchema

* allow None for force_reuse_spark_context

* fix linting

* remove missing entry for force_reuse_spark_context

* adding back spark_config

* adding unittests for `force_reuse_spark_context`

* adding unittest for `spark_config` pass through

* adding unittest for `spark_config` pass through + run black

* add docstrings to unittests to clarify intent

* add isort:skip for spark imports

* rename test to be more meaningful

* fix TypeError in unittest

* adding a non-default config in order to hopefully see the Py4JError in CI

* remove explicit Py4JError test

* add explicit test for sparkdf kwargs pass-through

* update changelog entry to reflect this is a BUGFIX

* fix linting

Co-authored-by: Micha BA Kunze <michabenachim.kunze@maersk.com>
pasmavie and mbakunze authored Aug 20, 2021
1 parent 99a5437 commit 5d818ee
Showing 3 changed files with 97 additions and 1 deletion.
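For context, the option this commit wires into the schema is the one exercised by the new tests further down: a SparkDFDatasource can be told to attach to a SparkSession that already exists in the process instead of creating (and later stopping) its own. A minimal sketch of that usage, based on the tests added below; it assumes an existing Great Expectations project on disk so `DataContext()` can load it, and the datasource name is illustrative:

```python
from pyspark.sql import SparkSession

from great_expectations.data_context import DataContext

# An externally managed SparkSession, e.g. one provided by a notebook or job runner.
spark = SparkSession.builder.appName("local").master("local[1]").getOrCreate()

context = DataContext()  # loads the project config from great_expectations/

# force_reuse_spark_context=True tells the SparkDFDatasource to reuse the
# session above rather than building a new one; batches can then be created
# from DataFrames on that session, as the tests in this commit do.
context.add_datasource(
    "my_spark_datasource",
    class_name="SparkDFDatasource",
    module_name="great_expectations.datasource",
    force_reuse_spark_context=True,
    batch_kwargs_generators={},
)
```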
5 changes: 5 additions & 0 deletions docs_rtd/changelog.rst
@@ -7,6 +7,10 @@ Changelog

develop
-----------------

* [BUGFIX] Add force_reuse_spark_context to DatasourceConfigSchema
* [BUGFIX] Remove fixture parameterization for Cloud DBs (Snowflake and BigQuery) #3182
* [MAINTENANCE] Add force_reuse_spark_context to DatasourceConfigSchema
* [FEATURE] Implement V3 expect_column_pair_values_to_be_in_set expectation for SQL Alchemy execution engine (#3278, #3281)
* [FEATURE] Implement V3 expect_column_pair_values_to_be_equal expectation for SQL Alchemy execution engine (#3267)
* [FEATURE] Implement V3 expect_column_pair_values_to_be_equal expectation for Pandas execution engine (#3252)
@@ -41,6 +45,7 @@ develop
* [MAINTENANCE] Allow `PandasExecutionEngine` to accept `Azure DataConnectors` (#3214)
* [MAINTENANCE] Standardize Arguments to MetricConfiguration Constructor; Use {} instead of dict(). (#3246)


0.13.28
-----------------
* [FEATURE] Implement ColumnPairValuesInSet metric for PandasExecutionEngine
5 changes: 4 additions & 1 deletion great_expectations/data_context/types/base.py
@@ -788,7 +788,10 @@ class Meta:

class_name = fields.String(missing="Datasource")
module_name = fields.String(missing="great_expectations.datasource")

force_reuse_spark_context = fields.Bool(required=False, allow_none=True)
spark_config = fields.Dict(
keys=fields.Str(), values=fields.Str(), required=False, allow_none=True
)
execution_engine = fields.Nested(
ExecutionEngineConfigSchema, required=False, allow_none=True
)
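The two new marshmallow fields are what let the Spark-related options survive validation of a datasource config: `force_reuse_spark_context` is an optional, nullable boolean, and `spark_config` an optional, nullable string-to-string mapping. Roughly, a config dict carrying them can now be loaded through the schema (a minimal sketch, assuming standard marshmallow `load()` behaviour; the config values are illustrative):

```python
from great_expectations.data_context.types.base import DatasourceConfigSchema

datasource_config = {
    "class_name": "SparkDFDatasource",
    "module_name": "great_expectations.datasource",
    "force_reuse_spark_context": True,  # newly declared fields.Bool
    "spark_config": {  # str -> str, matching fields.Dict(keys=Str, values=Str)
        "spark.app.name": "local",
        "spark.master": "local[1]",
    },
}

# load() validates the dict against the schema; the two Spark-related keys are
# now declared fields rather than undeclared extras.
validated = DatasourceConfigSchema().load(datasource_config)
```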
88 changes: 88 additions & 0 deletions tests/datasource/test_sparkdf_datasource.py
@@ -98,6 +98,94 @@ def test_sparkdf_datasource_custom_data_asset(
    assert res.success is True


def test_force_reuse_spark_context(
    data_context_parameterized_expectation_suite, tmp_path_factory, test_backends
):
    """
    Ensure that an external SparkSession can be reused by specifying the
    force_reuse_spark_context argument.
    """
    if "SparkDFDataset" not in test_backends:
        pytest.skip("No spark backend selected.")
    from pyspark.sql import SparkSession  # isort:skip

    dataset_name = "test_spark_dataset"

    spark = SparkSession.builder.appName("local").master("local[1]").getOrCreate()
    data = {"col1": [0, 1, 2], "col2": ["a", "b", "c"]}

    spark_df = spark.createDataFrame(pd.DataFrame(data))
    tmp_parquet_filename = os.path.join(
        tmp_path_factory.mktemp(dataset_name).as_posix(), dataset_name
    )
    spark_df.write.format("parquet").save(tmp_parquet_filename)

    data_context_parameterized_expectation_suite.add_datasource(
        dataset_name,
        class_name="SparkDFDatasource",
        force_reuse_spark_context=True,
        module_name="great_expectations.datasource",
        batch_kwargs_generators={},
    )

    df = spark.read.format("parquet").load(tmp_parquet_filename)
    batch_kwargs = {"dataset": df, "datasource": dataset_name}
    _ = data_context_parameterized_expectation_suite.create_expectation_suite(
        dataset_name
    )
    batch = data_context_parameterized_expectation_suite.get_batch(
        batch_kwargs=batch_kwargs, expectation_suite_name=dataset_name
    )
    results = batch.expect_column_max_to_be_between("col1", min_value=1, max_value=100)
    assert results.success, "Failed to use external SparkSession"
    spark.stop()


def test_spark_kwargs_are_passed_through(
    data_context_parameterized_expectation_suite,
    tmp_path_factory,
    test_backends,
    spark_session,
):
    """
    Ensure that an external SparkSession is not stopped when the spark_config matches
    the one specified in the GE Context.
    """
    if "SparkDFDataset" not in test_backends:
        pytest.skip("No spark backend selected.")
    dataset_name = "test_spark_dataset"
    data_context_parameterized_expectation_suite.add_datasource(
        dataset_name,
        class_name="SparkDFDatasource",
        spark_config=dict(spark_session.sparkContext.getConf().getAll()),
        force_reuse_spark_context=False,
        module_name="great_expectations.datasource",
        batch_kwargs_generators={},
    )
    datasource_config = data_context_parameterized_expectation_suite.get_datasource(
        dataset_name
    ).config
    assert datasource_config["spark_config"] == dict(
        spark_session.sparkContext.getConf().getAll()
    )
    assert datasource_config["force_reuse_spark_context"] == False

    dataset_name = "test_spark_dataset_2"
    data_context_parameterized_expectation_suite.add_datasource(
        dataset_name,
        class_name="SparkDFDatasource",
        spark_config={},
        force_reuse_spark_context=True,
        module_name="great_expectations.datasource",
        batch_kwargs_generators={},
    )
    datasource_config = data_context_parameterized_expectation_suite.get_datasource(
        dataset_name
    ).config
    assert datasource_config["spark_config"] == {}
    assert datasource_config["force_reuse_spark_context"] == True

def test_create_sparkdf_datasource(
    data_context_parameterized_expectation_suite, tmp_path_factory, test_backends
):
