
great_expectations.exceptions.exceptions.MetricResolutionError: 'NoneType' object has no attribute 'setCallSite' #3622

Closed
oscaratnc opened this issue Nov 1, 2021 · 7 comments
Labels
community devrel This item is being addressed by the Developer Relations Team

Comments

@oscaratnc

Describe the bug
When trying to add an expectation to an expectation suite using a DataFrame coming from a SparkDFExecutionEngine datasource, it fails and throws the error in the title of this issue, specifically when calling res = df.agg(*aggregate_cols).collect() in the resolve_metric_bundle function within sparkdf_execution_engine.py.

This happens when using the databricks-connect setup for working with the Databricks cluster from a PyCharm instance with a GE context created, but it doesn't fail if the context is not created. I also tried running a .head() call (which includes a .collect()) directly and got the same result: it fails with a GE context created and works without one.

To Reproduce
Steps to reproduce the behavior:

  1. Create a DataSource for Spark:
my_spark_datasource_config = DatasourceConfig(
     name='SparkDataSource',
     class_name="Datasource",
     execution_engine={"class_name": "SparkDFExecutionEngine"},
     data_connectors={
         "spark_connector": {
             "module_name": "great_expectations.datasource.data_connector",
             "class_name": "RuntimeDataConnector",
             "batch_identifiers": ['batch_data']
         }
     }
 )
  2. Generate a context (local storage for Expectations et al.):
base = os.getcwd()

local_stores_config = {
    'expectations_lcl_store': {
        'class_name': 'ExpectationsStore',
        'store_backend': {
            'class_name': 'TupleFilesystemStoreBackend',
            'base_directory': base+'/great_expectations/expectations/',
        }
    },
    'validations_lcl_store': {
        'class_name': 'ValidationsStore',
        'store_backend': {
            'class_name': 'TupleFilesystemStoreBackend',
            'base_directory': base+'/great_expectations/uncommited/validations/',
        }
    },
    'checkpoint_lcl_store': {
        'class_name': 'CheckpointStore',
        'store_backend': {
            'class_name': 'TupleFilesystemStoreBackend',
            'base_directory': base+'/great_expectations/checkpoints/',
        }
    },
    'evaluation_parameter_store': {'class_name': 'EvaluationParameterStore'}
}

data_docs_config_lcl = {
    "lcl_site": {
        "class_name": "SiteBuilder",
        'show_how_to_buttons': False,
        "store_backend": {
            "class_name": "TupleFilesystemStoreBackend",
            "base_directory": base+"/great_expectations/uncommitted/data_docs/local_site/"
        },
        "site_index_builder": {
            "class_name": "DefaultSiteIndexBuilder",
        },
    }
}

stores = local_stores_config
expectations_store_name = "expectations_lcl_store"
validations_store_name = "validations_lcl_store"
data_docs_sites = data_docs_config_lcl

validations_operators = {
    "action_list_operator": {
        "class_name": "ActionListValidationOperator",
        "action_list": [
            {
                "name": "store_validation_result",
                "action": {"class_name": "StoreValidationResultAction"},
            },
            {
                "name": "store_evaluation_params",
                "action": {"class_name": "StoreEvaluationParametersAction"},
            },
            {
                "name": "update_data_docs",
                "action": {"class_name": "UpdateDataDocsAction"},
            },
        ],
    }
}

# DataContext Configuration for YAML of Data Context
data_context_config = DataContextConfig(
    config_version=2,
    plugins_directory=None,
    config_variables_file_path=None,
    datasources={"SparkDataSource": my_spark_datasource_config},
    validation_operators=validations_operators,
    anonymous_usage_statistics={
        "enabled": False
    },

    stores=stores,
    expectations_store_name=expectations_store_name,
    validations_store_name=validations_store_name,
    evaluation_parameter_store_name="evaluation_parameter_store",
    data_docs_sites=data_docs_sites
)
context = BaseDataContext(project_config=data_context_config)
  3. Create a Spark instance using databricks-connect in WSL and add the expectation test logic with an is_not_null expectation:

@sparkConnect  # creates spark session and parameters
def run_pipeline(spark, parameters):
    # wraps all Databricks and Spark functionality (GE included)
    '''
    1. Create a GreatExpectations Context: use function get_GE_context(parameters)
    '''
    context = get_GE_context(parameters)
    print(context)
    sc = spark.sparkContext

    '''
    2. Get your data to analyze (Table DF): We will use an existing table in the Gold layer for this case
    '''
    df = spark.read.format('delta').load(table_url).limit(200)

    '''
    3. Create a Batch Request:
    '''
    batch_request = RuntimeBatchRequest(
        datasource_name="SparkDataSource",
        data_connector_name="spark_connector",
        data_asset_name='data_asset_name', #CHANGE THIS ACCORDINGLY
        runtime_parameters={
            'batch_data': df.select('*'), #CHANGE FOR YOUR BATCH
        }, batch_identifiers={"batch_data": "table_name_identifier"}) 

    '''
    4. Create or Use an expectations_suite:
    '''
    from great_expectations.exceptions import DataContextError
    expectation_suite_name = f"{table_name}.{table_name}_ES_final_test_pycharm"
    try:
        suite = context.create_expectation_suite(expectation_suite_name)
    except DataContextError:
        suite = context.get_expectation_suite(expectation_suite_name)

    '''
    5. Create a Validator Object
    '''
    my_validator = context.get_validator(
        batch_request=batch_request,
        expectation_suite=suite
    )

    '''
    6. Add Expectations
    '''
    my_validator.expect_column_values_to_be_null(df.columns[0])
  4. Run the expectation and see error:

When running the expectation logic, it shows the following error:

message=str(e), failed_metrics=[x[0] for x in metric_fn_bundle]
great_expectations.exceptions.exceptions.MetricResolutionError: 'NoneType' object has no attribute 'setCallSite'
Calculating Metrics:  25%|██▌       | 2/8 [00:24<01:13, 12.18s/it]
21/11/01 10:37:07 WARN JavaUtils: Attempt to delete using native Unix OS command failed for path = /tmp/blockmgr-e37d0da3-84f6-4979-914a-6166490d57fe. Falling back to Java IO way

I stepped through the code and traced it back to a df.collect() call in the resolve_metric_bundle function within sparkdf_execution_engine.py.

aggregate_cols = []
for idx in range(len(aggregate["column_aggregates"])):
    column_aggregate = aggregate["column_aggregates"][idx]
    aggregate_id = str(uuid.uuid4())
    condition_ids.append(aggregate_id)
    aggregate_cols.append(column_aggregate)
res = df.agg(*aggregate_cols).collect() <------------- HERE

This seemed odd to me, so I tried a df.collect() directly before running the expectation logic, and it failed at the same line. BUT, when I tried it without creating the context, it worked perfectly. So I wonder whether GE generates its own Spark session. How can I use this kind of connection (PyCharm-Databricks) from the GE perspective? Do you have an example of something like this?

Expected behavior
Everything runs smoothly, the results for the expectations are shown, and we can save the expectation suite.

Environment (please complete the following information):

  • Operating System: Windows 10 using PyCharm and WSL Ubuntu 18, Python 3.7, and databricks-connect configured for a Databricks cluster.
  • great_expectations, version 0.13.38

Additional context
When a collect() is run independently there are two scenarios:
- GE Data Context created: df.head() (which includes a collect()) FAILS
- No GE Data Context: df.head() (which includes a collect()) SUCCEEDS
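
A minimal sketch of those two scenarios, reusing the names from the reproduction steps above (spark, table_url and data_context_config are defined there):

from great_expectations.data_context import BaseDataContext

df = spark.read.format('delta').load(table_url).limit(200)

df.head()  # no GE Data Context yet: SUCCESS

context = BaseDataContext(project_config=data_context_config)

df.head()  # GE Data Context created: fails with
           # 'NoneType' object has no attribute 'setCallSite'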

Found this but I don't know how applicable it is.... https://issues.apache.org/jira/browse/SPARK-27335?jql=text%20~%20%22setcallsite%22

@oscaratnc
Author

UPDATE:

If you run my_validator.head(), it fails with the following:

Traceback (most recent call last):
  File "/home/sesa573626/.local/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3444, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-1-563cda95a3bb>", line 1, in <module>
    my_validator.head()
  File "/home/sesa573626/.local/lib/python3.7/site-packages/great_expectations/validator/validator.py", line 1620, in head
    "fetch_all": fetch_all,
  File "/home/sesa573626/.local/lib/python3.7/site-packages/great_expectations/validator/validator.py", line 364, in get_metric
    return self.get_metrics({"_": metric})["_"]
  File "/home/sesa573626/.local/lib/python3.7/site-packages/great_expectations/validator/validator.py", line 359, in get_metrics
    for (metric_name, metric_configuration) in metrics.items()
  File "/home/sesa573626/.local/lib/python3.7/site-packages/great_expectations/validator/validator.py", line 359, in <dictcomp>
    for (metric_name, metric_configuration) in metrics.items()
KeyError: ('table.head', 'batch_id=259f89f71939a89d48df516d3692e50d', '04166707abe073177c1dd922d3584468')

@artemsachuk

Hi @oscaratnc
This one helped me solve a similar issue with running GE in the scope of an existing Spark session on databricks-connect and PyCharm:
#3330
https://github.com/great-expectations/great_expectations/pull/3541/commits

So now, when I add "force_reuse_spark_context: true" to my execution_engine config, everything works fine:
<...>
execution_engine:
    module_name: great_expectations.execution_engine
    class_name: SparkDFExecutionEngine
    force_reuse_spark_context: true
<...>
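
For reference, a sketch of the same setting in the Python DatasourceConfig style from the top of this issue (names copied from the original report; I have only verified the YAML form myself):

from great_expectations.data_context.types.base import DatasourceConfig

my_spark_datasource_config = DatasourceConfig(
    name="SparkDataSource",
    class_name="Datasource",
    execution_engine={
        "module_name": "great_expectations.execution_engine",
        "class_name": "SparkDFExecutionEngine",
        # reuse the session databricks-connect already created instead of
        # letting GE build (and later tear down) its own SparkContext
        "force_reuse_spark_context": True,
    },
    data_connectors={
        "spark_connector": {
            "module_name": "great_expectations.datasource.data_connector",
            "class_name": "RuntimeDataConnector",
            "batch_identifiers": ["batch_data"],
        }
    },
)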

Hope it helps.

@talagluck talagluck added community devrel This item is being addressed by the Developer Relations Team labels Nov 3, 2021
@talagluck
Contributor

Thanks for opening this issue, @oscaratnc, and thanks for your response, @artemsachuk!

Please let us know whether this resolves the issue. Otherwise we will review internally and be in touch soon.

@oscaratnc
Author

oscaratnc commented Nov 3, 2021

Hello,

@artemsachuk Thanks for the direction. I made the change in the base.py file (class ExecutionEngine), but I am still getting the same result.
@talagluck this is what I am running now with the configs:

        base = os.getcwd()
        local_stores_config = {

            'expectations_lcl_store': {'class_name': 'ExpectationsStore',
                                       'store_backend': {
                                           'class_name': 'TupleFilesystemStoreBackend',
                                           'base_directory': base+'/great_expectations/expectations/',
                                       }
                                       },
            'validations_lcl_store': {'class_name': 'ValidationsStore',
                                      'store_backend': {
                                          'class_name': 'TupleFilesystemStoreBackend',
                                          'base_directory': base+'/great_expectations/uncommited/validations/',
                                      }
                                      },
            'checkpoint_lcl_store':
                {'class_name': 'CheckpointStore',
                 'store_backend': {
                     'class_name': 'TupleFilesystemStoreBackend',
                     'base_directory': base+'/great_expectations/checkpoints/',
                 }
                 },
            'evaluation_parameter_store': {'class_name': 'EvaluationParameterStore'}
        }

        data_docs_config_lcl = {
            "lcl_site": {
                "class_name": "SiteBuilder",
                'show_how_to_buttons': False,
                "store_backend": {
                    "class_name": "TupleFilesystemStoreBackend",
                    "base_directory": base+"/great_expectations/uncommitted/data_docs/local_site/"
                },
                "site_index_builder": {
                    "class_name": "DefaultSiteIndexBuilder",
                },
            }
        }

        stores = local_stores_config
        expectations_store_name = "expectations_lcl_store"
        validations_store_name = "validations_lcl_store"
        data_docs_sites = data_docs_config_lcl

        validations_operators = {
            "action_list_operator": {
                "class_name": "ActionListValidationOperator",
                "action_list": [
                    {
                        "name": "store_validation_result",
                        "action": {"class_name": "StoreValidationResultAction"},
                    },
                    {
                        "name": "store_evaluation_params",
                        "action": {"class_name": "StoreEvaluationParametersAction"},
                    },
                    {
                        "name": "update_data_docs",
                        "action": {"class_name": "UpdateDataDocsAction"},
                    },
                ],
            }
        }

        # DataContext Configuration for YAML of Data Context
        data_context_config = DataContextConfig(
            config_version=2,
            plugins_directory=None,
            config_variables_file_path=None,
           # datasources={"SparkDataSource": my_spark_datasource_config},
            validation_operators=validations_operators,
            anonymous_usage_statistics={
                "enabled": True
            },

            stores=stores,
            expectations_store_name=expectations_store_name,
            validations_store_name=validations_store_name,
            evaluation_parameter_store_name="evaluation_parameter_store",
            data_docs_sites=data_docs_sites
        )
        context = BaseDataContext(project_config=data_context_config)

        # Datasource configuration for spark runtime

        datasource_yaml = fr"""
            name: SparkDataSource
            class_name: Datasource
            execution_engine:
                module_name: great_expectations.execution_engine
                class_name: SparkDFExecutionEngine
                force_reuse_spark_context: true
            data_connectors:
                spark_connector:
                    module_name: great_expectations.datasource.data_connector
                    class_name: RuntimeDataConnector
                    batch_identifiers:
                        - batch_data

        """
        context.test_yaml_config(datasource_yaml)
        context.add_datasource(**yaml.load(datasource_yaml))

This is the expectation I am trying to run:
my_validator.expect_column_values_to_be_null('SRCE_LOC')

and this is the result:

Traceback (most recent call last):
  File "/home/sesa573626/.local/lib/python3.7/site-packages/great_expectations/validator/validator.py", line 301, in inst_expectation
    raise err
  File "/home/sesa573626/.local/lib/python3.7/site-packages/great_expectations/validator/validator.py", line 263, in inst_expectation
    runtime_configuration=basic_runtime_configuration,
  File "/home/sesa573626/.local/lib/python3.7/site-packages/great_expectations/expectations/expectation.py", line 734, in validate
    runtime_configuration=runtime_configuration,
  File "/home/sesa573626/.local/lib/python3.7/site-packages/great_expectations/validator/validator.py", line 526, in graph_validate
    raise err
  File "/home/sesa573626/.local/lib/python3.7/site-packages/great_expectations/validator/validator.py", line 477, in graph_validate
    runtime_configuration=runtime_configuration,
  File "/home/sesa573626/.local/lib/python3.7/site-packages/great_expectations/validator/validator.py", line 704, in resolve_validation_graph
    raise err
  File "/home/sesa573626/.local/lib/python3.7/site-packages/great_expectations/validator/validator.py", line 674, in resolve_validation_graph
    runtime_configuration=runtime_configuration,
  File "/home/sesa573626/.local/lib/python3.7/site-packages/great_expectations/validator/validator.py", line 762, in _resolve_metrics
    runtime_configuration=runtime_configuration,
  File "/home/sesa573626/.local/lib/python3.7/site-packages/great_expectations/execution_engine/execution_engine.py", line 319, in resolve_metrics
    message=str(e), failed_metrics=[x[0] for x in metric_fn_bundle]
great_expectations.exceptions.exceptions.MetricResolutionError: 'NoneType' object has no attribute 'setCallSite'
Calculating Metrics:  25%|██▌       | 2/8 [00:04<00:14,  2.48s/it]
21/11/03 14:59:11 WARN JavaUtils: Attempt to delete using native Unix OS command failed for path = /tmp/blockmgr-a04e14af-3844-4af5-a0aa-3658f3d093dd. Falling back to Java IO way
java.io.IOException: Failed to delete: /tmp/blockmgr-a04e14af-3844-4af5-a0aa-3658f3d093dd
	at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingUnixNative(JavaUtils.java:171)
	at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:110)
	at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:91)
	at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1242)
	at org.apache.spark.storage.DiskBlockManager.$anonfun$doStop$1(DiskBlockManager.scala:181)
	at org.apache.spark.storage.DiskBlockManager.$anonfun$doStop$1$adapted(DiskBlockManager.scala:177)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at org.apache.spark.storage.DiskBlockManager.doStop(DiskBlockManager.scala:177)
	at org.apache.spark.storage.DiskBlockManager.stop(DiskBlockManager.scala:172)
	at org.apache.spark.storage.BlockManager.stop(BlockManager.scala:1840)
	at org.apache.spark.SparkEnv.stop(SparkEnv.scala:165)
	at org.apache.spark.SparkContext.$anonfun$stop$23(SparkContext.scala:2219)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1561)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:2219)
	at org.apache.spark.SparkContext.$anonfun$new$38(SparkContext.scala:692)
	at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:214)
	at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$2(ShutdownHookManager.scala:188)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2146)
	at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$1(ShutdownHookManager.scala:188)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
	at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
	at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)

There's the setCallSite thing again. Do you notice anything wrong with my configuration?

@talagluck
Contributor

Hi @oscaratnc, it appears that you are not passing your Spark config into your ExecutionEngine. This is not a direct example, as it applies to V2, but you can take a look at this description to get a sense of how to do this.
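
For the V3-style YAML used above, a rough sketch of what that could look like, assuming SparkDFExecutionEngine accepts a spark_config mapping in your version (the exact settings to forward depend on your databricks-connect setup, so treat this as an outline; spark, context and yaml are the objects already defined in your code):

import json

# Sketch: copy the conf of the Spark session databricks-connect created and hand
# it to the execution engine as spark_config, alongside force_reuse_spark_context.
spark_config = dict(spark.sparkContext.getConf().getAll())

datasource_yaml = f"""
name: SparkDataSource
class_name: Datasource
execution_engine:
    module_name: great_expectations.execution_engine
    class_name: SparkDFExecutionEngine
    force_reuse_spark_context: true
    spark_config: {json.dumps(spark_config)}
data_connectors:
    spark_connector:
        module_name: great_expectations.datasource.data_connector
        class_name: RuntimeDataConnector
        batch_identifiers:
            - batch_data
"""
context.test_yaml_config(datasource_yaml)
context.add_datasource(**yaml.load(datasource_yaml))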

@oscaratnc
Author

WOW, you're the best! It's working now, thanks a lot! I think we can close this issue :)

@talagluck
Contributor

Great - thanks for letting us know!
