From 7201b7722bbfb3768dc0b47885552c5c8deffb59 Mon Sep 17 00:00:00 2001 From: gipaetusb Date: Wed, 28 Jul 2021 10:18:00 +0200 Subject: [PATCH 01/19] [MAINTENANCE] Adds spark_config and force_reuse_spark_context fields to DataSourceConfigSchema --- great_expectations/data_context/types/base.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/great_expectations/data_context/types/base.py b/great_expectations/data_context/types/base.py index 44f0727e57d9..91558693c1d9 100644 --- a/great_expectations/data_context/types/base.py +++ b/great_expectations/data_context/types/base.py @@ -634,6 +634,11 @@ class Meta: class_name = fields.String(missing="Datasource") module_name = fields.String(missing="great_expectations.datasource") + spark_config = fields.Dict( + keys=fields.Str(), values=fields.Str(), required=False, allow_none=True + ) + force_reuse_spark_context = fields.Bool(required=False, missing=False) + execution_engine = fields.Nested( ExecutionEngineConfigSchema, required=False, allow_none=True ) From d2c80fa81e73061407cffc877e97dbd5d595fa12 Mon Sep 17 00:00:00 2001 From: gipaetusb <26484685+gipaetusb@users.noreply.github.com> Date: Fri, 6 Aug 2021 09:40:33 +0200 Subject: [PATCH 02/19] Adds changes bullet point to docs_rtd/changelog.rst --- docs_rtd/changelog.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/docs_rtd/changelog.rst b/docs_rtd/changelog.rst index b6152746d9c6..7538be5c7a8e 100644 --- a/docs_rtd/changelog.rst +++ b/docs_rtd/changelog.rst @@ -8,6 +8,7 @@ Changelog develop ----------------- * [BUGFIX] Remove fixture parameterization for Cloud DBs (Snowflake and BigQuery) #3182 +* [MAINTENANCE] Add force_reuse_spark_context to DatasourceConfigSchema 0.13.26 ----------------- From 01e5e51d5bdfb81fb84936fe91d837499ad28035 Mon Sep 17 00:00:00 2001 From: Micha BA Kunze Date: Fri, 13 Aug 2021 23:22:35 +0200 Subject: [PATCH 03/19] remove spark_config from DatasourceConfigSchema --- great_expectations/data_context/types/base.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/great_expectations/data_context/types/base.py b/great_expectations/data_context/types/base.py index aad52b5fa808..af1375899d1f 100644 --- a/great_expectations/data_context/types/base.py +++ b/great_expectations/data_context/types/base.py @@ -716,9 +716,6 @@ class Meta: class_name = fields.String(missing="Datasource") module_name = fields.String(missing="great_expectations.datasource") - spark_config = fields.Dict( - keys=fields.Str(), values=fields.Str(), required=False, allow_none=True - ) force_reuse_spark_context = fields.Bool(required=False, missing=False) execution_engine = fields.Nested( From 851cd04b879f071ffdecc59cee447bdfc7d1eafb Mon Sep 17 00:00:00 2001 From: Micha BA Kunze Date: Fri, 13 Aug 2021 23:32:59 +0200 Subject: [PATCH 04/19] allow None for force_reuse_spark_context --- great_expectations/data_context/types/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/great_expectations/data_context/types/base.py b/great_expectations/data_context/types/base.py index af1375899d1f..72e2e6c97a47 100644 --- a/great_expectations/data_context/types/base.py +++ b/great_expectations/data_context/types/base.py @@ -716,7 +716,7 @@ class Meta: class_name = fields.String(missing="Datasource") module_name = fields.String(missing="great_expectations.datasource") - force_reuse_spark_context = fields.Bool(required=False, missing=False) + force_reuse_spark_context = fields.Bool(required=False, missing=False, allow_none=True) execution_engine = fields.Nested( ExecutionEngineConfigSchema, 
required=False, allow_none=True From 9ec37356e17ab7f504acbc2a520fc7c8625c7807 Mon Sep 17 00:00:00 2001 From: Micha BA Kunze Date: Fri, 13 Aug 2021 23:58:46 +0200 Subject: [PATCH 05/19] fix linting --- great_expectations/data_context/types/base.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/great_expectations/data_context/types/base.py b/great_expectations/data_context/types/base.py index 72e2e6c97a47..ca612013053c 100644 --- a/great_expectations/data_context/types/base.py +++ b/great_expectations/data_context/types/base.py @@ -715,9 +715,9 @@ class Meta: class_name = fields.String(missing="Datasource") module_name = fields.String(missing="great_expectations.datasource") - - force_reuse_spark_context = fields.Bool(required=False, missing=False, allow_none=True) - + force_reuse_spark_context = fields.Bool( + required=False, missing=False, allow_none=True + ) execution_engine = fields.Nested( ExecutionEngineConfigSchema, required=False, allow_none=True ) From c170c9b248fd5041a7d7695b3c83526eaf41b23b Mon Sep 17 00:00:00 2001 From: Micha BA Kunze Date: Sat, 14 Aug 2021 00:20:54 +0200 Subject: [PATCH 06/19] remove missing entry for force_reuse_spark_context --- great_expectations/data_context/types/base.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/great_expectations/data_context/types/base.py b/great_expectations/data_context/types/base.py index ca612013053c..80d77631a3fc 100644 --- a/great_expectations/data_context/types/base.py +++ b/great_expectations/data_context/types/base.py @@ -715,9 +715,7 @@ class Meta: class_name = fields.String(missing="Datasource") module_name = fields.String(missing="great_expectations.datasource") - force_reuse_spark_context = fields.Bool( - required=False, missing=False, allow_none=True - ) + force_reuse_spark_context = fields.Bool(required=False, allow_none=True) execution_engine = fields.Nested( ExecutionEngineConfigSchema, required=False, allow_none=True ) From c5a6c665846bf079768ca0e1847a294eaf650c93 Mon Sep 17 00:00:00 2001 From: Micha BA Kunze Date: Sat, 14 Aug 2021 10:20:44 +0200 Subject: [PATCH 07/19] adding back spark_config --- great_expectations/data_context/types/base.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/great_expectations/data_context/types/base.py b/great_expectations/data_context/types/base.py index 80d77631a3fc..12cb91ae3c8f 100644 --- a/great_expectations/data_context/types/base.py +++ b/great_expectations/data_context/types/base.py @@ -716,6 +716,9 @@ class Meta: class_name = fields.String(missing="Datasource") module_name = fields.String(missing="great_expectations.datasource") force_reuse_spark_context = fields.Bool(required=False, allow_none=True) + spark_config = fields.Dict( + keys=fields.Str(), values=fields.Str(), required=False, allow_none=True + ) execution_engine = fields.Nested( ExecutionEngineConfigSchema, required=False, allow_none=True ) From 10aca2ac01c0092f2c4e040a8d424789b7435120 Mon Sep 17 00:00:00 2001 From: Micha BA Kunze Date: Mon, 16 Aug 2021 20:33:22 +0200 Subject: [PATCH 08/19] adding unittests for `force_reuse_spark_context` --- tests/datasource/test_sparkdf_datasource.py | 91 +++++++++++++++++++++ 1 file changed, 91 insertions(+) diff --git a/tests/datasource/test_sparkdf_datasource.py b/tests/datasource/test_sparkdf_datasource.py index 98612b5b47f3..282cdf41195f 100644 --- a/tests/datasource/test_sparkdf_datasource.py +++ b/tests/datasource/test_sparkdf_datasource.py @@ -98,6 +98,97 @@ def test_sparkdf_datasource_custom_data_asset( assert 
res.success is True +def test_force_reuse_spark_context( + data_context_parameterized_expectation_suite, + tmp_path_factory, + test_backends +): + if "SparkDFDataset" not in test_backends: + pytest.skip("No spark backend selected.") + + from pyspark.sql import SparkSession + dataset_name = "test_spark_dataset" + + spark = SparkSession.builder.appName("local").master("local[1]").getOrCreate() + data = { + "col1": [0, 1, 2], + "col2": ["a", "b", "c"] + } + + spark_df = spark.createDataFrame(pd.DataFrame(data)) + tmp_parquet_filename=os.path.join( + tmp_path_factory.mktemp(dataset_name).as_posix(), + dataset_name + ) + spark_df.write.format("parquet").save(tmp_parquet_filename) + + data_context_parameterized_expectation_suite.add_datasource( + dataset_name, + class_name="SparkDFDatasource", + force_reuse_spark_context=True, + module_name="great_expectations.datasource", + batch_kwargs_generators={} + ) + + df = spark.read.format("parquet").load(tmp_parquet_filename) + batch_kwargs = {"dataset": df, "datasource": dataset_name} + _ = ( + data_context_parameterized_expectation_suite + .create_expectation_suite(dataset_name) + ) + batch = data_context_parameterized_expectation_suite.get_batch( + batch_kwargs=batch_kwargs, + expectation_suite_name=dataset_name + ) + results = batch.expect_column_max_to_be_between("col1", min_value=1, max_value=100) + assert results.success, "Failed to use external SparkSession" + spark.stop() + + +def test_erroring_force_reuse_spark_context( + data_context_parameterized_expectation_suite, + tmp_path_factory, + test_backends +): + if "SparkDFDataset" not in test_backends: + pytest.skip("No spark backend selected.") + from pyspark.sql import SparkSession + from py4j.java_collections import Py4JError + dataset_name = "test_dataset" + tmp_parquet_filename = os.path.join( + tmp_path_factory.mktemp(dataset_name).as_posix(), + dataset_name + ) + spark = SparkSession.builder.appName("local").master("local[1]").getOrCreate() + data = { + "col1": [0, 1, 2], + "col2": ["a", "b", "c"] + } + + spark_df = spark.createDataFrame(pd.DataFrame(data)) + spark_df.write.format("parquet").save(tmp_parquet_filename) + + data_context_parameterized_expectation_suite.add_datasource( + dataset_name, + class_name="SparkDFDatasource", + # not using to force_reuse_spark_context to create the expected Py4JError + module_name="great_expectations.datasource", + batch_kwargs_generators={} + ) + with pytest.raises(Py4JError): + df = spark.read.format("parquet").load(tmp_parquet_filename) + batch_kwargs = {"dataset": df, "datasource": dataset_name} + _ = ( + data_context_parameterized_expectation_suite + .create_expectation_suite(dataset_name) + ) + batch = data_context_parameterized_expectation_suite.get_batch( + batch_kwargs=batch_kwargs, + expectation_suite_name=dataset_name + ) + _ = batch.expect_column_max_to_be_between("col1", min=1, max=100) + + def test_create_sparkdf_datasource( data_context_parameterized_expectation_suite, tmp_path_factory, test_backends ): From cb39f1b40a63e9bd2b4ff2c28237657078c95cda Mon Sep 17 00:00:00 2001 From: Micha BA Kunze Date: Mon, 16 Aug 2021 20:41:56 +0200 Subject: [PATCH 09/19] adding unittest for `spark_config` pass through --- tests/datasource/test_sparkdf_datasource.py | 48 +++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/tests/datasource/test_sparkdf_datasource.py b/tests/datasource/test_sparkdf_datasource.py index 282cdf41195f..b639991f5d22 100644 --- a/tests/datasource/test_sparkdf_datasource.py +++ 
b/tests/datasource/test_sparkdf_datasource.py @@ -145,6 +145,54 @@ def test_force_reuse_spark_context( spark.stop() +def test_spark_config_is_passed_through( + data_context_parameterized_expectation_suite, + tmp_path_factory, + test_backends +): + if "SparkDFDataset" not in test_backends: + pytest.skip("No spark backend selected.") + + from pyspark.sql import SparkSession + dataset_name = "test_spark_dataset" + + spark = SparkSession.builder.appName("local").master("local[1]").getOrCreate() + spark.conf.set("spark.sql.shuffle.partitions", "2") + data = { + "col1": [0, 1, 2], + "col2": ["a", "b", "c"] + } + + spark_df = spark.createDataFrame(pd.DataFrame(data)) + tmp_parquet_filename=os.path.join( + tmp_path_factory.mktemp(dataset_name).as_posix(), + dataset_name + ) + spark_df.write.format("parquet").save(tmp_parquet_filename) + + data_context_parameterized_expectation_suite.add_datasource( + dataset_name, + class_name="SparkDFDatasource", + spark_config=dict(spark.sparkContext.getConf().getAll()), + module_name="great_expectations.datasource", + batch_kwargs_generators={} + ) + + df = spark.read.format("parquet").load(tmp_parquet_filename) + batch_kwargs = {"dataset": df, "datasource": dataset_name} + _ = ( + data_context_parameterized_expectation_suite + .create_expectation_suite(dataset_name) + ) + batch = data_context_parameterized_expectation_suite.get_batch( + batch_kwargs=batch_kwargs, + expectation_suite_name=dataset_name + ) + results = batch.expect_column_max_to_be_between("col1", min_value=1, max_value=100) + assert results.success, "Failed to use external SparkSession eventhough the config is matching." + spark.stop() + + def test_erroring_force_reuse_spark_context( data_context_parameterized_expectation_suite, tmp_path_factory, From cfb8c98060f80973d6534440dabea4761c330f17 Mon Sep 17 00:00:00 2001 From: Micha BA Kunze Date: Mon, 16 Aug 2021 20:48:58 +0200 Subject: [PATCH 10/19] adding unittest for `spark_config` pass through + run black --- tests/datasource/test_sparkdf_datasource.py | 77 ++++++++------------- 1 file changed, 29 insertions(+), 48 deletions(-) diff --git a/tests/datasource/test_sparkdf_datasource.py b/tests/datasource/test_sparkdf_datasource.py index b639991f5d22..33fd64672d05 100644 --- a/tests/datasource/test_sparkdf_datasource.py +++ b/tests/datasource/test_sparkdf_datasource.py @@ -99,26 +99,21 @@ def test_sparkdf_datasource_custom_data_asset( def test_force_reuse_spark_context( - data_context_parameterized_expectation_suite, - tmp_path_factory, - test_backends + data_context_parameterized_expectation_suite, tmp_path_factory, test_backends ): if "SparkDFDataset" not in test_backends: pytest.skip("No spark backend selected.") from pyspark.sql import SparkSession + dataset_name = "test_spark_dataset" spark = SparkSession.builder.appName("local").master("local[1]").getOrCreate() - data = { - "col1": [0, 1, 2], - "col2": ["a", "b", "c"] - } + data = {"col1": [0, 1, 2], "col2": ["a", "b", "c"]} spark_df = spark.createDataFrame(pd.DataFrame(data)) - tmp_parquet_filename=os.path.join( - tmp_path_factory.mktemp(dataset_name).as_posix(), - dataset_name + tmp_parquet_filename = os.path.join( + tmp_path_factory.mktemp(dataset_name).as_posix(), dataset_name ) spark_df.write.format("parquet").save(tmp_parquet_filename) @@ -127,18 +122,16 @@ def test_force_reuse_spark_context( class_name="SparkDFDatasource", force_reuse_spark_context=True, module_name="great_expectations.datasource", - batch_kwargs_generators={} + batch_kwargs_generators={}, ) df = 
spark.read.format("parquet").load(tmp_parquet_filename) batch_kwargs = {"dataset": df, "datasource": dataset_name} - _ = ( - data_context_parameterized_expectation_suite - .create_expectation_suite(dataset_name) + _ = data_context_parameterized_expectation_suite.create_expectation_suite( + dataset_name ) batch = data_context_parameterized_expectation_suite.get_batch( - batch_kwargs=batch_kwargs, - expectation_suite_name=dataset_name + batch_kwargs=batch_kwargs, expectation_suite_name=dataset_name ) results = batch.expect_column_max_to_be_between("col1", min_value=1, max_value=100) assert results.success, "Failed to use external SparkSession" @@ -146,27 +139,22 @@ def test_force_reuse_spark_context( def test_spark_config_is_passed_through( - data_context_parameterized_expectation_suite, - tmp_path_factory, - test_backends + data_context_parameterized_expectation_suite, tmp_path_factory, test_backends ): if "SparkDFDataset" not in test_backends: pytest.skip("No spark backend selected.") from pyspark.sql import SparkSession + dataset_name = "test_spark_dataset" spark = SparkSession.builder.appName("local").master("local[1]").getOrCreate() spark.conf.set("spark.sql.shuffle.partitions", "2") - data = { - "col1": [0, 1, 2], - "col2": ["a", "b", "c"] - } + data = {"col1": [0, 1, 2], "col2": ["a", "b", "c"]} spark_df = spark.createDataFrame(pd.DataFrame(data)) - tmp_parquet_filename=os.path.join( - tmp_path_factory.mktemp(dataset_name).as_posix(), - dataset_name + tmp_parquet_filename = os.path.join( + tmp_path_factory.mktemp(dataset_name).as_posix(), dataset_name ) spark_df.write.format("parquet").save(tmp_parquet_filename) @@ -175,43 +163,38 @@ def test_spark_config_is_passed_through( class_name="SparkDFDatasource", spark_config=dict(spark.sparkContext.getConf().getAll()), module_name="great_expectations.datasource", - batch_kwargs_generators={} + batch_kwargs_generators={}, ) df = spark.read.format("parquet").load(tmp_parquet_filename) batch_kwargs = {"dataset": df, "datasource": dataset_name} - _ = ( - data_context_parameterized_expectation_suite - .create_expectation_suite(dataset_name) + _ = data_context_parameterized_expectation_suite.create_expectation_suite( + dataset_name ) batch = data_context_parameterized_expectation_suite.get_batch( - batch_kwargs=batch_kwargs, - expectation_suite_name=dataset_name + batch_kwargs=batch_kwargs, expectation_suite_name=dataset_name ) results = batch.expect_column_max_to_be_between("col1", min_value=1, max_value=100) - assert results.success, "Failed to use external SparkSession eventhough the config is matching." + assert ( + results.success + ), "Failed to use external SparkSession eventhough the config is matching." 
spark.stop() def test_erroring_force_reuse_spark_context( - data_context_parameterized_expectation_suite, - tmp_path_factory, - test_backends + data_context_parameterized_expectation_suite, tmp_path_factory, test_backends ): if "SparkDFDataset" not in test_backends: pytest.skip("No spark backend selected.") from pyspark.sql import SparkSession from py4j.java_collections import Py4JError + dataset_name = "test_dataset" tmp_parquet_filename = os.path.join( - tmp_path_factory.mktemp(dataset_name).as_posix(), - dataset_name + tmp_path_factory.mktemp(dataset_name).as_posix(), dataset_name ) spark = SparkSession.builder.appName("local").master("local[1]").getOrCreate() - data = { - "col1": [0, 1, 2], - "col2": ["a", "b", "c"] - } + data = {"col1": [0, 1, 2], "col2": ["a", "b", "c"]} spark_df = spark.createDataFrame(pd.DataFrame(data)) spark_df.write.format("parquet").save(tmp_parquet_filename) @@ -221,18 +204,16 @@ def test_erroring_force_reuse_spark_context( class_name="SparkDFDatasource", # not using to force_reuse_spark_context to create the expected Py4JError module_name="great_expectations.datasource", - batch_kwargs_generators={} + batch_kwargs_generators={}, ) with pytest.raises(Py4JError): df = spark.read.format("parquet").load(tmp_parquet_filename) batch_kwargs = {"dataset": df, "datasource": dataset_name} - _ = ( - data_context_parameterized_expectation_suite - .create_expectation_suite(dataset_name) + _ = data_context_parameterized_expectation_suite.create_expectation_suite( + dataset_name ) batch = data_context_parameterized_expectation_suite.get_batch( - batch_kwargs=batch_kwargs, - expectation_suite_name=dataset_name + batch_kwargs=batch_kwargs, expectation_suite_name=dataset_name ) _ = batch.expect_column_max_to_be_between("col1", min=1, max=100) From 82abefe62cbb60107db5fe25cbdc463849aafba6 Mon Sep 17 00:00:00 2001 From: Micha BA Kunze Date: Mon, 16 Aug 2021 20:58:11 +0200 Subject: [PATCH 11/19] add docstrings to unittests to clarify intent --- tests/datasource/test_sparkdf_datasource.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/tests/datasource/test_sparkdf_datasource.py b/tests/datasource/test_sparkdf_datasource.py index 33fd64672d05..5c26d80d500d 100644 --- a/tests/datasource/test_sparkdf_datasource.py +++ b/tests/datasource/test_sparkdf_datasource.py @@ -101,6 +101,10 @@ def test_sparkdf_datasource_custom_data_asset( def test_force_reuse_spark_context( data_context_parameterized_expectation_suite, tmp_path_factory, test_backends ): + """ + Ensure that using an external sparkSession can be used by specfying the + force_reuse_spark_context argument. + """ if "SparkDFDataset" not in test_backends: pytest.skip("No spark backend selected.") @@ -141,6 +145,10 @@ def test_force_reuse_spark_context( def test_spark_config_is_passed_through( data_context_parameterized_expectation_suite, tmp_path_factory, test_backends ): + """ + Ensure that an external SparkSession is not stopped when the spark_config matches + the one specfied in the GE Context. 
+ """ if "SparkDFDataset" not in test_backends: pytest.skip("No spark backend selected.") @@ -184,6 +192,11 @@ def test_spark_config_is_passed_through( def test_erroring_force_reuse_spark_context( data_context_parameterized_expectation_suite, tmp_path_factory, test_backends ): + """ + Ensure an external sparkSession is stopped/re-created when not fitting the + spark_config or not having force_reuse_spark_context=True + """ + if "SparkDFDataset" not in test_backends: pytest.skip("No spark backend selected.") from pyspark.sql import SparkSession From d336103b7f6684647c2ed4562595b0f23a43765e Mon Sep 17 00:00:00 2001 From: Micha BA Kunze Date: Mon, 16 Aug 2021 21:08:53 +0200 Subject: [PATCH 12/19] add isort:skip for spark imports --- tests/datasource/test_sparkdf_datasource.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/tests/datasource/test_sparkdf_datasource.py b/tests/datasource/test_sparkdf_datasource.py index 5c26d80d500d..ad58521ef5b4 100644 --- a/tests/datasource/test_sparkdf_datasource.py +++ b/tests/datasource/test_sparkdf_datasource.py @@ -107,8 +107,7 @@ def test_force_reuse_spark_context( """ if "SparkDFDataset" not in test_backends: pytest.skip("No spark backend selected.") - - from pyspark.sql import SparkSession + from pyspark.sql import SparkSession # isort:skip dataset_name = "test_spark_dataset" @@ -151,8 +150,7 @@ def test_spark_config_is_passed_through( """ if "SparkDFDataset" not in test_backends: pytest.skip("No spark backend selected.") - - from pyspark.sql import SparkSession + from pyspark.sql import SparkSession # isort:skip dataset_name = "test_spark_dataset" @@ -199,8 +197,8 @@ def test_erroring_force_reuse_spark_context( if "SparkDFDataset" not in test_backends: pytest.skip("No spark backend selected.") - from pyspark.sql import SparkSession - from py4j.java_collections import Py4JError + from pyspark.sql import SparkSession # isort:skip + from py4j.java_collections import Py4JError # isort:skip dataset_name = "test_dataset" tmp_parquet_filename = os.path.join( From 3ae38057e53b21a845a09385f7323033beab5c05 Mon Sep 17 00:00:00 2001 From: Micha BA Kunze Date: Mon, 16 Aug 2021 21:09:40 +0200 Subject: [PATCH 13/19] rename test to be more meaningful --- tests/datasource/test_sparkdf_datasource.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/datasource/test_sparkdf_datasource.py b/tests/datasource/test_sparkdf_datasource.py index ad58521ef5b4..17ab6d87e5d5 100644 --- a/tests/datasource/test_sparkdf_datasource.py +++ b/tests/datasource/test_sparkdf_datasource.py @@ -187,7 +187,7 @@ def test_spark_config_is_passed_through( spark.stop() -def test_erroring_force_reuse_spark_context( +def test_erroring_when_using_external_spark_context( data_context_parameterized_expectation_suite, tmp_path_factory, test_backends ): """ From 16f1c71891943a7944cd430c81202569c7c20102 Mon Sep 17 00:00:00 2001 From: Micha BA Kunze Date: Mon, 16 Aug 2021 22:13:14 +0200 Subject: [PATCH 14/19] fix TypeError in unittest --- tests/datasource/test_sparkdf_datasource.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/datasource/test_sparkdf_datasource.py b/tests/datasource/test_sparkdf_datasource.py index 17ab6d87e5d5..2fb29370ebe4 100644 --- a/tests/datasource/test_sparkdf_datasource.py +++ b/tests/datasource/test_sparkdf_datasource.py @@ -226,7 +226,7 @@ def test_erroring_when_using_external_spark_context( batch = data_context_parameterized_expectation_suite.get_batch( batch_kwargs=batch_kwargs, 
expectation_suite_name=dataset_name ) - _ = batch.expect_column_max_to_be_between("col1", min=1, max=100) + _ = batch.expect_column_max_to_be_between("col1", min_value=1, max_value=100) def test_create_sparkdf_datasource( From 0a4ff1d377314f882cb0841eea86a05f05c82b8c Mon Sep 17 00:00:00 2001 From: Micha BA Kunze Date: Mon, 16 Aug 2021 22:47:12 +0200 Subject: [PATCH 15/19] adding a non-default config in order to hopefully see the Py4jError in CI --- tests/datasource/test_sparkdf_datasource.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/datasource/test_sparkdf_datasource.py b/tests/datasource/test_sparkdf_datasource.py index 2fb29370ebe4..5eaeb85678ae 100644 --- a/tests/datasource/test_sparkdf_datasource.py +++ b/tests/datasource/test_sparkdf_datasource.py @@ -205,6 +205,7 @@ def test_erroring_when_using_external_spark_context( tmp_path_factory.mktemp(dataset_name).as_posix(), dataset_name ) spark = SparkSession.builder.appName("local").master("local[1]").getOrCreate() + spark.conf.set("spark.sql.shuffle.partitions", "2") # set some non default value data = {"col1": [0, 1, 2], "col2": ["a", "b", "c"]} spark_df = spark.createDataFrame(pd.DataFrame(data)) From 6bc4ac717937f2eed66c5e2e09cbc9a80808fcc7 Mon Sep 17 00:00:00 2001 From: Micha BA Kunze Date: Mon, 16 Aug 2021 23:27:43 +0200 Subject: [PATCH 16/19] remove explicit P4jError test --- tests/datasource/test_sparkdf_datasource.py | 43 --------------------- 1 file changed, 43 deletions(-) diff --git a/tests/datasource/test_sparkdf_datasource.py b/tests/datasource/test_sparkdf_datasource.py index 5eaeb85678ae..886add9d5722 100644 --- a/tests/datasource/test_sparkdf_datasource.py +++ b/tests/datasource/test_sparkdf_datasource.py @@ -187,49 +187,6 @@ def test_spark_config_is_passed_through( spark.stop() -def test_erroring_when_using_external_spark_context( - data_context_parameterized_expectation_suite, tmp_path_factory, test_backends -): - """ - Ensure an external sparkSession is stopped/re-created when not fitting the - spark_config or not having force_reuse_spark_context=True - """ - - if "SparkDFDataset" not in test_backends: - pytest.skip("No spark backend selected.") - from pyspark.sql import SparkSession # isort:skip - from py4j.java_collections import Py4JError # isort:skip - - dataset_name = "test_dataset" - tmp_parquet_filename = os.path.join( - tmp_path_factory.mktemp(dataset_name).as_posix(), dataset_name - ) - spark = SparkSession.builder.appName("local").master("local[1]").getOrCreate() - spark.conf.set("spark.sql.shuffle.partitions", "2") # set some non default value - data = {"col1": [0, 1, 2], "col2": ["a", "b", "c"]} - - spark_df = spark.createDataFrame(pd.DataFrame(data)) - spark_df.write.format("parquet").save(tmp_parquet_filename) - - data_context_parameterized_expectation_suite.add_datasource( - dataset_name, - class_name="SparkDFDatasource", - # not using to force_reuse_spark_context to create the expected Py4JError - module_name="great_expectations.datasource", - batch_kwargs_generators={}, - ) - with pytest.raises(Py4JError): - df = spark.read.format("parquet").load(tmp_parquet_filename) - batch_kwargs = {"dataset": df, "datasource": dataset_name} - _ = data_context_parameterized_expectation_suite.create_expectation_suite( - dataset_name - ) - batch = data_context_parameterized_expectation_suite.get_batch( - batch_kwargs=batch_kwargs, expectation_suite_name=dataset_name - ) - _ = batch.expect_column_max_to_be_between("col1", min_value=1, max_value=100) - - def test_create_sparkdf_datasource( 
data_context_parameterized_expectation_suite, tmp_path_factory, test_backends ): From e0a128e58fa724ab7a884a1babee29a01c5ddc82 Mon Sep 17 00:00:00 2001 From: Micha BA Kunze Date: Tue, 17 Aug 2021 10:58:41 +0200 Subject: [PATCH 17/19] add explicit test for sparkdf kwargs pass-through --- tests/datasource/test_sparkdf_datasource.py | 46 ++++++++------------- 1 file changed, 18 insertions(+), 28 deletions(-) diff --git a/tests/datasource/test_sparkdf_datasource.py b/tests/datasource/test_sparkdf_datasource.py index 886add9d5722..513c134eff88 100644 --- a/tests/datasource/test_sparkdf_datasource.py +++ b/tests/datasource/test_sparkdf_datasource.py @@ -141,8 +141,8 @@ def test_force_reuse_spark_context( spark.stop() -def test_spark_config_is_passed_through( - data_context_parameterized_expectation_suite, tmp_path_factory, test_backends +def test_spark_kwargs_are_passed_through( + data_context_parameterized_expectation_suite, tmp_path_factory, test_backends, spark_session ): """ Ensure that an external SparkSession is not stopped when the spark_config matches @@ -150,41 +150,31 @@ def test_spark_config_is_passed_through( """ if "SparkDFDataset" not in test_backends: pytest.skip("No spark backend selected.") - from pyspark.sql import SparkSession # isort:skip - dataset_name = "test_spark_dataset" - - spark = SparkSession.builder.appName("local").master("local[1]").getOrCreate() - spark.conf.set("spark.sql.shuffle.partitions", "2") - data = {"col1": [0, 1, 2], "col2": ["a", "b", "c"]} - - spark_df = spark.createDataFrame(pd.DataFrame(data)) - tmp_parquet_filename = os.path.join( - tmp_path_factory.mktemp(dataset_name).as_posix(), dataset_name - ) - spark_df.write.format("parquet").save(tmp_parquet_filename) - data_context_parameterized_expectation_suite.add_datasource( dataset_name, class_name="SparkDFDatasource", - spark_config=dict(spark.sparkContext.getConf().getAll()), + spark_config=dict(spark_session.sparkContext.getConf().getAll()), + force_reuse_spark_context=False, module_name="great_expectations.datasource", batch_kwargs_generators={}, ) + datasource_config = data_context_parameterized_expectation_suite.get_datasource(dataset_name).config + assert datasource_config["spark_config"] == dict(spark_session.sparkContext.getConf().getAll()) + assert datasource_config["force_reuse_spark_context"] == False - df = spark.read.format("parquet").load(tmp_parquet_filename) - batch_kwargs = {"dataset": df, "datasource": dataset_name} - _ = data_context_parameterized_expectation_suite.create_expectation_suite( - dataset_name - ) - batch = data_context_parameterized_expectation_suite.get_batch( - batch_kwargs=batch_kwargs, expectation_suite_name=dataset_name + dataset_name = "test_spark_dataset_2" + data_context_parameterized_expectation_suite.add_datasource( + dataset_name, + class_name="SparkDFDatasource", + spark_config={}, + force_reuse_spark_context=True, + module_name="great_expectations.datasource", + batch_kwargs_generators={}, ) - results = batch.expect_column_max_to_be_between("col1", min_value=1, max_value=100) - assert ( - results.success - ), "Failed to use external SparkSession eventhough the config is matching." 
- spark.stop() + datasource_config = data_context_parameterized_expectation_suite.get_datasource(dataset_name).config + assert datasource_config["spark_config"] == {} + assert datasource_config["force_reuse_spark_context"] == True def test_create_sparkdf_datasource( From ff93e0b2c384b85ae8ed762c589909db9265c3c5 Mon Sep 17 00:00:00 2001 From: Micha BA Kunze Date: Tue, 17 Aug 2021 11:01:51 +0200 Subject: [PATCH 18/19] update changelog entry to reflect this is a BUGFIX --- docs_rtd/changelog.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs_rtd/changelog.rst b/docs_rtd/changelog.rst index 13448f49e34b..adcbfbc3e2e6 100644 --- a/docs_rtd/changelog.rst +++ b/docs_rtd/changelog.rst @@ -8,7 +8,7 @@ Changelog develop ----------------- -* [MAINTENANCE] Add force_reuse_spark_context to DatasourceConfigSchema +* [BUGFIX] Add force_reuse_spark_context to DatasourceConfigSchema * [FEATURE] Implement V3 expect_column_pair_values_to_be_equal expectation for Pandas execution engine (#3252) From 3ca3fb18ba4b2c8b99d222b51f6bd5a63d8d0723 Mon Sep 17 00:00:00 2001 From: Micha BA Kunze Date: Tue, 17 Aug 2021 11:05:56 +0200 Subject: [PATCH 19/19] fix linting --- tests/datasource/test_sparkdf_datasource.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/tests/datasource/test_sparkdf_datasource.py b/tests/datasource/test_sparkdf_datasource.py index 513c134eff88..82b2aa2ad11f 100644 --- a/tests/datasource/test_sparkdf_datasource.py +++ b/tests/datasource/test_sparkdf_datasource.py @@ -142,7 +142,10 @@ def test_force_reuse_spark_context( def test_spark_kwargs_are_passed_through( - data_context_parameterized_expectation_suite, tmp_path_factory, test_backends, spark_session + data_context_parameterized_expectation_suite, + tmp_path_factory, + test_backends, + spark_session, ): """ Ensure that an external SparkSession is not stopped when the spark_config matches @@ -159,8 +162,12 @@ def test_spark_kwargs_are_passed_through( module_name="great_expectations.datasource", batch_kwargs_generators={}, ) - datasource_config = data_context_parameterized_expectation_suite.get_datasource(dataset_name).config - assert datasource_config["spark_config"] == dict(spark_session.sparkContext.getConf().getAll()) + datasource_config = data_context_parameterized_expectation_suite.get_datasource( + dataset_name + ).config + assert datasource_config["spark_config"] == dict( + spark_session.sparkContext.getConf().getAll() + ) assert datasource_config["force_reuse_spark_context"] == False dataset_name = "test_spark_dataset_2" @@ -172,7 +179,9 @@ def test_spark_kwargs_are_passed_through( module_name="great_expectations.datasource", batch_kwargs_generators={}, ) - datasource_config = data_context_parameterized_expectation_suite.get_datasource(dataset_name).config + datasource_config = data_context_parameterized_expectation_suite.get_datasource( + dataset_name + ).config assert datasource_config["spark_config"] == {} assert datasource_config["force_reuse_spark_context"] == True
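
For reference, here is a minimal usage sketch of the two fields this patch series adds (spark_config and force_reuse_spark_context), modelled on the tests above. The datasource name and the DataContext construction are illustrative assumptions, not part of the patches themselves:

    import great_expectations as ge
    from pyspark.sql import SparkSession

    # An externally managed SparkSession that GE should reuse rather than stop and recreate.
    spark = SparkSession.builder.appName("my_app").master("local[1]").getOrCreate()

    # Assumes an existing project with a great_expectations.yml (hypothetical setup).
    context = ge.data_context.DataContext()
    context.add_datasource(
        "my_spark_datasource",  # hypothetical datasource name
        class_name="SparkDFDatasource",
        module_name="great_expectations.datasource",
        # Passing the live session's config, together with force_reuse_spark_context=True,
        # tells the datasource to reuse the session instead of tearing it down.
        spark_config=dict(spark.sparkContext.getConf().getAll()),
        force_reuse_spark_context=True,
        batch_kwargs_generators={},
    )

With this in place, batches built from DataFrames created by the external session (as in test_force_reuse_spark_context above) validate against that same session.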