feat: add catalog & database support to create_table #9038

Closed
mark-druffel opened this issue Apr 22, 2024 · 12 comments · Fixed by #9042
Assignees: gforsyth
Labels: feature (Features or general enhancements), pyspark (The Apache PySpark backend)

Comments

@mark-druffel

mark-druffel commented Apr 22, 2024

What happened?

TLDR

I'm wondering if it's intended that the database argument in create_table works differently from the one in drop_table? create_table only accepts a str, while drop_table accepts a tuple.

If I set the catalog and database via pyspark, create_table works as expected, but I can't figure out a way to do that through create_table itself; I had to do it through the pyspark session directly:

from pyspark.sql import SparkSession
import ibis
spark = SparkSession.builder.getOrCreate()
ispark = ibis.pyspark.connect(session = spark)
ispark._session.catalog.setCurrentCatalog("comms_media_dev")
ispark._session.catalog.setCurrentDatabase("dart_extensions")
ispark.create_table(name = "raw_camp_info", obj = df, overwrite = True, format="delta")

I can drop a table without accessing the pyspark session:

ispark.drop_table(name = "raw_camp_info", database=tuple(["comms_media_dev", "dart_extensions"]))

Additional Details

To drop my table I can just specify the catalog and database in my call:

from pyspark.sql import SparkSession
import ibis
spark = SparkSession.builder.getOrCreate()
ispark = ibis.pyspark.connect(session = spark)
ispark.drop_table(name = "raw_camp_info", database=tuple(["comms_media_dev", "dart_extensions"]))

Trying the same approach with create_table fails:

ispark.create_table(name = "raw_camp_info", obj = df, overwrite = True, format="delta", database=tuple(["comms_media_dev", "dart_extensions"]))

py4j.Py4JException: Method setCurrentDatabase([class java.util.ArrayList]) does not exist

I also tried with dot separator:

ispark.create_table(name = "raw_camp_info", obj = df, overwrite = True, format="delta", database="comms_media_dev.dart_extensions")

AnalysisException: too many name parts for schema name: comms_media_dev.dart_extensions

I then tried to set the catalog and provide the database name, and got a permissions error. Looking through the error, it looks like create_table didn't pass the database argument because the database was set to default (i.e. comms_media_dev.default):

ispark._session.catalog.setCurrentCatalog("comms_media_dev")
ispark.create_table(name = "raw_camp_info", obj = df, overwrite = True, format="delta", database="dart_extensions")

com.databricks.sql.managedcatalog.acl.UnauthorizedAccessException: PERMISSION_DENIED: User does not have USE SCHEMA on Schema 'comms_media_dev.default'.

Py4JJavaError Traceback (most recent call last)
File :10
8 df = ispark.read_parquet(source = "abfss://my_parquet")
9 df = df.select(_.CAMP_START_DATE, _.CAMP_END_DATE, _.KPM_PROJECT_ID)
---> 10 ispark.create_table(name = "raw_camp_info", obj = df, overwrite = True, format="delta", database="dart_extensions")

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-c1f2e765-cb45-4a96-a1dc-bd25ecd4f460/lib/python3.9/site-packages/ibis/backends/pyspark/__init__.py:453, in Backend.create_table(self, name, obj, schema, database, temp, overwrite, format)
451 self._run_pre_execute_hooks(table)
452 df = self._session.sql(query)
--> 453 df.write.saveAsTable(name, format=format, mode=mode)
454 elif schema is not None:
455 schema = PySparkSchema.from_ibis(schema)

File /usr/lib/python3.9/contextlib.py:124, in _GeneratorContextManager.__exit__(self, type, value, traceback)
122 if type is None:
123 try:
--> 124 next(self.gen)
125 except StopIteration:
126 return False

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-c1f2e765-cb45-4a96-a1dc-bd25ecd4f460/lib/python3.9/site-packages/ibis/backends/pyspark/__init__.py:234, in Backend._active_database(self, name)
232 yield
233 finally:
--> 234 self._session.catalog.setCurrentDatabase(current)

File /databricks/spark/python/pyspark/instrumentation_utils.py:48, in _wrap_function.<locals>.wrapper(*args, **kwargs)
46 start = time.perf_counter()
47 try:
---> 48 res = func(*args, **kwargs)
49 logger.log_success(
50 module_name, class_name, function_name, time.perf_counter() - start, signature
51 )
52 return res

File /databricks/spark/python/pyspark/sql/catalog.py:185, in Catalog.setCurrentDatabase(self, dbName)
172 def setCurrentDatabase(self, dbName: str) -> None:
173 """
174 Sets the current default database in this session.
175
(...)
183 >>> spark.catalog.setCurrentDatabase("default")
184 """
--> 185 return self._jcatalog.setCurrentDatabase(dbName)

File /databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
1315 command = proto.CALL_COMMAND_NAME +\
1316 self.command_header +\
1317 args_command +\
1318 proto.END_COMMAND_PART
1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
1322 answer, self.gateway_client, self.target_id, self.name)
1324 for temp_arg in temp_args:
1325 temp_arg._detach()

File /databricks/spark/python/pyspark/errors/exceptions.py:228, in capture_sql_exception.<locals>.deco(*a, **kw)
226 def deco(*a: Any, **kw: Any) -> Any:
227 try:
--> 228 return f(*a, **kw)
229 except Py4JJavaError as e:
230 converted = convert_exception(e.java_exception)

File /databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
332 format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o569.setCurrentDatabase.
: com.databricks.sql.managedcatalog.acl.UnauthorizedAccessException: PERMISSION_DENIED: User does not have USE SCHEMA on Schema 'comms_media_dev.default'.
at com.databricks.managedcatalog.UCReliableHttpClient.reliablyAndTranslateExceptions(UCReliableHttpClient.scala:84)
at com.databricks.managedcatalog.UCReliableHttpClient.get(UCReliableHttpClient.scala:136)
at com.databricks.managedcatalog.ManagedCatalogClientImpl.$anonfun$getSchema$1(ManagedCatalogClientImpl.scala:378)
at com.databricks.managedcatalog.ManagedCatalogClientImpl.$anonfun$recordAndWrapException$2(ManagedCatalogClientImpl.scala:3401)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
at com.databricks.managedcatalog.ManagedCatalogClientImpl.$anonfun$recordAndWrapException$1(ManagedCatalogClientImpl.scala:3400)
at com.databricks.managedcatalog.ErrorDetailsHandler.wrapServiceException(ErrorDetailsHandler.scala:25)
at com.databricks.managedcatalog.ErrorDetailsHandler.wrapServiceException$(ErrorDetailsHandler.scala:23)
at com.databricks.managedcatalog.ManagedCatalogClientImpl.wrapServiceException(ManagedCatalogClientImpl.scala:106)
at com.databricks.managedcatalog.ManagedCatalogClientImpl.recordAndWrapException(ManagedCatalogClientImpl.scala:3397)
at com.databricks.managedcatalog.ManagedCatalogClientImpl.getSchema(ManagedCatalogClientImpl.scala:374)
at com.databricks.sql.managedcatalog.ManagedCatalogCommon.$anonfun$getSchemaMetadata$5(ManagedCatalogCommon.scala:227)
at scala.Option.getOrElse(Option.scala:189)
at com.databricks.sql.managedcatalog.ManagedCatalogCommon.getSchemaMetadata(ManagedCatalogCommon.scala:227)
at com.databricks.sql.managedcatalog.ManagedCatalogCommon.schemaExists(ManagedCatalogCommon.scala:233)
at com.databricks.sql.managedcatalog.ManagedCatalogSessionCatalog.databaseExists(ManagedCatalogSessionCatalog.scala:631)
at com.databricks.sql.managedcatalog.ManagedCatalogSessionCatalog.requireScExists(ManagedCatalogSessionCatalog.scala:274)
at com.databricks.sql.managedcatalog.ManagedCatalogSessionCatalog.setCurrentDatabase(ManagedCatalogSessionCatalog.scala:492)
at com.databricks.sql.DatabricksCatalogManager.setCurrentNamespace(DatabricksCatalogManager.scala:159)
at org.apache.spark.sql.internal.CatalogImpl.setCurrentDatabase(CatalogImpl.scala:98)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:306)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195)
at py4j.ClientServerConnection.run(ClientServerConnection.java:115)
at java.lang.Thread.run(Thread.java:750)

If I set the catalog and database via pyspark, create_table works as expected:

ispark._session.catalog.setCurrentCatalog("comms_media_dev")
ispark._session.catalog.setCurrentDatabase("dart_extensions")
ispark.create_table(name = "raw_camp_info", obj = df, overwrite = True, format="delta")

What version of ibis are you using?

https://github.com/ibis-project/ibis.git@93552812ee9e8e0e3397bc226cc20c381fcd173b

What backend(s) are you using, if any?

pyspark

Relevant log output

No response

Code of Conduct

  • I agree to follow this project's Code of Conduct
@mark-druffel mark-druffel added the bug (Incorrect behavior inside of ibis) label Apr 22, 2024
@gforsyth gforsyth added the bug (Incorrect behavior inside of ibis), feature (Features or general enhancements), and pyspark (The Apache PySpark backend) labels and removed the bug (Incorrect behavior inside of ibis) label Apr 22, 2024
@gforsyth gforsyth self-assigned this Apr 22, 2024
@gforsyth gforsyth moved this from backlog to cooking in Ibis planning and roadmap Apr 22, 2024
@gforsyth gforsyth changed the title from "bug: add catalog & database support to create_table" to "feat: add catalog & database support to create_table" Apr 23, 2024
@gforsyth gforsyth removed the bug (Incorrect behavior inside of ibis) label Apr 23, 2024
cpcloud pushed a commit that referenced this issue Apr 25, 2024
Resolves #9038 

Adds support for specifying the `catalog` in various `pyspark` calls.  

BREAKING CHANGE: Arguments to `create_database`, `drop_database`, and `get_schema` are now keyword-only except for the `name` args.  Calls to these functions that have relied on positional argument ordering need to be updated.
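
For example, a call that previously relied on positional ordering of the arguments after name now needs keywords. A minimal sketch of the update (the force parameter here is an assumption used purely for illustration, not necessarily the exact signature):

import ibis
from pyspark.sql import SparkSession

con = ibis.pyspark.connect(session=SparkSession.builder.getOrCreate())

# Before this change, positional arguments after `name` were accepted, e.g.
#   con.drop_database("dart_extensions", True)
# After this change, arguments other than `name` must be passed by keyword
# (the `force` keyword is assumed here for illustration):
con.drop_database("dart_extensions", force=True)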
@github-project-automation github-project-automation bot moved this from cooking to done in Ibis planning and roadmap Apr 25, 2024
@mark-druffel
Author

mark-druffel commented Apr 26, 2024

@gforsyth I'm still trying to debug this myself to understand it, but I wanted to post here as well. Let me know if I should open a new issue instead of posting on the closed one.

The fix doesn't seem to be working on my end, if I'm using it correctly. I double-checked my ibis version to make sure I'm on the right one; I installed with ibis-framework[pyspark] @ git+https://github.com/ibis-project/ibis.git@2c1a58e25575f9f0d9876e37b49154e276558526 and the version shown in the environment was:

Just now (2s)
%pip show ibis-framework
Name: ibis-framework
Version: 9.0.0.dev677
Summary: The portable Python dataframe library
Home-page: https://ibis-project.org
Author: Ibis Maintainers
Author-email: maintainers@ibis-project.org
License: Apache-2.0
Location: /local_disk0/.ephemeral_nfs/envs/pythonEnv-59b1aa40-b629-4cf3-82ae-6f44d5b1b2f6/lib/python3.10/site-packages
Requires: atpublic, bidict, numpy, pandas, parsy, pyarrow, pyarrow-hotfix, python-dateutil, pytz, rich, sqlglot, toolz, typing-extensions
Required-by: 

I tested with the following code and got an error that made me think it was trying to split a string into a tuple, but I had already provided a tuple:

import ibis
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
ispark = ibis.pyspark.connect(session = spark)

df = ispark.read_parquet("abfss://media_meas_campaign_info/")
ispark.create_table(name = "raw_media_meas_campaign_info", obj = df, database=["comms_media_dev", "dart_extensions"], overwrite=True)

ValueError: oops
File , line 7
4 ispark = ibis.pyspark.connect(session = spark)
6 df = ispark.read_parquet("abfss://media_meas_campaign_info/")
----> 7 ispark.create_table(name = "raw_media_meas_campaign_info", obj = df, database=["comms_media_dev", "dart_extensions"], overwrite=True)
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-59b1aa40-b629-4cf3-82ae-6f44d5b1b2f6/lib/python3.10/site-packages/ibis/backends/pyspark/__init__.py:497, in Backend.create_table(self, name, obj, schema, database, temp, overwrite, format)
492 if temp is True:
493 raise NotImplementedError(
494 "PySpark backend does not yet support temporary tables"
495 )
--> 497 table_loc = self._to_sqlglot_table(database)
498 catalog, db = self._to_catalog_db_tuple(table_loc)
500 if obj is not None:
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-59b1aa40-b629-4cf3-82ae-6f44d5b1b2f6/lib/python3.10/site-packages/ibis/backends/sql/__init__.py:561, in SQLBackend._to_sqlglot_table(self, database)
559 database = sg.exp.Table(catalog=catalog, db=db)
560 else:
--> 561 raise ValueError("oops")
563 return database

So I tried providing catalog and database as a string with a dot separator, and the error looks similar to the one I got when I opened the issue initially. It seems like it accepted my catalog argument, but dropped my database argument and substituted default:

import ibis
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
ispark = ibis.pyspark.connect(session = spark)

df = ispark.read_parquet("abfss://media_meas_campaign_info.parquet")
ispark.create_table(name = "raw_media_meas_campaign_info", obj = df, database="comms_media_dev.dart_extensions", overwrite=True)

Py4JJavaError: An error occurred while calling o435.setCurrentDatabase.
: com.databricks.sql.managedcatalog.acl.UnauthorizedAccessException: PERMISSION_DENIED: User does not have USE SCHEMA on Schema 'comms_media_dev.default'.
at com.databricks.managedcatalog.UCReliableHttpClient.reliablyAndTranslateExceptions(UCReliableHttpClient.scala:87)
at com.databricks.managedcatalog.UCReliableHttpClient.get(UCReliableHttpClient.scala:139)
at com.databricks.managedcatalog.ManagedCatalogClientImpl.$anonfun$getSchema$1(ManagedCatalogClientImpl.scala:540)
at com.databricks.managedcatalog.ManagedCatalogClientImpl.$anonfun$recordAndWrapException$2(ManagedCatalogClientImpl.scala:4400)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
at com.databricks.managedcatalog.ManagedCatalogClientImpl.$anonfun$recordAndWrapException$1(ManagedCatalogClientImpl.scala:4399)
at com.databricks.managedcatalog.ErrorDetailsHandler.wrapServiceException(ErrorDetailsHandler.scala:25)
at com.databricks.managedcatalog.ErrorDetailsHandler.wrapServiceException$(ErrorDetailsHandler.scala:23)
at com.databricks.managedcatalog.ManagedCatalogClientImpl.wrapServiceException(ManagedCatalogClientImpl.scala:151)
at com.databricks.managedcatalog.ManagedCatalogClientImpl.recordAndWrapException(ManagedCatalogClientImpl.scala:4396)
at com.databricks.managedcatalog.ManagedCatalogClientImpl.getSchema(ManagedCatalogClientImpl.scala:533)
at com.databricks.sql.managedcatalog.ManagedCatalogCommon.shouldUpdateSchemaMetadata(ManagedCatalogCommon.scala:2199)
at com.databricks.sql.managedcatalog.ManagedCatalogCommon.getSchemaMetadataInternal(ManagedCatalogCommon.scala:2652)
at com.databricks.sql.managedcatalog.ManagedCatalogCommon.$anonfun$getSchemaMetadata$3(ManagedCatalogCommon.scala:282)
at scala.Option.getOrElse(Option.scala:189)
at com.databricks.sql.managedcatalog.ManagedCatalogCommon.getSchemaMetadata(ManagedCatalogCommon.scala:282)
at com.databricks.sql.managedcatalog.ManagedCatalogCommon.schemaExists(ManagedCatalogCommon.scala:287)
at com.databricks.sql.managedcatalog.ProfiledManagedCatalog.$anonfun$schemaExists$1(ProfiledManagedCatalog.scala:143)
at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
at org.apache.spark.sql.catalyst.MetricKeyUtils$.measure(MetricKey.scala:672)
at com.databricks.sql.managedcatalog.ProfiledManagedCatalog.$anonfun$profile$1(ProfiledManagedCatalog.scala:60)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
at com.databricks.sql.managedcatalog.ProfiledManagedCatalog.profile(ProfiledManagedCatalog.scala:59)
at com.databricks.sql.managedcatalog.ProfiledManagedCatalog.schemaExists(ProfiledManagedCatalog.scala:143)
at com.databricks.sql.managedcatalog.ManagedCatalogSessionCatalog.databaseExists(ManagedCatalogSessionCatalog.scala:625)
at com.databricks.sql.managedcatalog.ManagedCatalogSessionCatalog.requireScExists(ManagedCatalogSessionCatalog.scala:275)
at com.databricks.sql.managedcatalog.ManagedCatalogSessionCatalog.setCurrentDatabase(ManagedCatalogSessionCatalog.scala:486)
at com.databricks.sql.DatabricksCatalogManager.setCurrentNamespace(DatabricksCatalogManager.scala:156)
at org.apache.spark.sql.internal.CatalogImpl.setCurrentDatabase(CatalogImpl.scala:100)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
at py4j.Gateway.invoke(Gateway.java:306)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:199)
at py4j.ClientServerConnection.run(ClientServerConnection.java:119)
at java.lang.Thread.run(Thread.java:750)
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-59b1aa40-b629-4cf3-82ae-6f44d5b1b2f6/lib/python3.10/site-packages/ibis/backends/pyspark/__init__.py:240, in Backend._active_database(self, name)
239 self._session.catalog.setCurrentDatabase(name)
--> 240 yield
241 finally:
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-59b1aa40-b629-4cf3-82ae-6f44d5b1b2f6/lib/python3.10/site-packages/ibis/backends/pyspark/__init__.py:507, in Backend.create_table(self, name, obj, schema, database, temp, overwrite, format)
506 df = self._session.sql(query)
--> 507 df.write.saveAsTable(name, format=format, mode=mode)
508 elif schema is not None:
File /databricks/spark/python/pyspark/instrumentation_utils.py:47, in _wrap_function.<locals>.wrapper(*args, **kwargs)
46 try:
---> 47 res = func(*args, **kwargs)
48 logger.log_success(
49 module_name, class_name, function_name, time.perf_counter() - start, signature
50 )
File /databricks/spark/python/pyspark/sql/readwriter.py:1841, in DataFrameWriter.saveAsTable(self, name, format, mode, partitionBy, **options)
1840 self.format(format)
-> 1841 self._jwrite.saveAsTable(name)
File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1355, in JavaMember.__call__(self, *args)
1354 answer = self.gateway_client.send_command(command)
-> 1355 return_value = get_return_value(
1356 answer, self.gateway_client, self.target_id, self.name)
1358 for temp_arg in temp_args:
File /databricks/spark/python/pyspark/errors/exceptions/captured.py:230, in capture_sql_exception.<locals>.deco(*a, **kw)
227 if not isinstance(converted, UnknownException):
228 # Hide where the exception came from that shows a non-Pythonic
229 # JVM exception message.
--> 230 raise converted from None
231 else:
AnalysisException: [RequestId=5899623e-983f-4972-812a-dbdc7706c8a3 ErrorClass=INVALID_PARAMETER_VALUE.MANAGED_TABLE_FORMAT] Only Delta is supported for managed tables

During handling of the above exception, another exception occurred:
Py4JJavaError Traceback (most recent call last)
File , line 7
4 ispark = ibis.pyspark.connect(session = spark)
6 df = ispark.read_parquet("abfss://lmedia_meas_campaign_info/")
----> 7 ispark.create_table(name = "raw_media_meas_campaign_info", obj = df, database="comms_media_dev.dart_extensions", overwrite=True)
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-59b1aa40-b629-4cf3-82ae-6f44d5b1b2f6/lib/python3.10/site-packages/ibis/backends/pyspark/__init__.py:504, in Backend.create_table(self, name, obj, schema, database, temp, overwrite, format)
502 query = self.compile(table)
503 mode = "overwrite" if overwrite else "error"
--> 504 with self._active_catalog(catalog), self._active_database(db):
505 self._run_pre_execute_hooks(table)
506 df = self._session.sql(query)
File /usr/lib/python3.10/contextlib.py:153, in _GeneratorContextManager.__exit__(self, typ, value, traceback)
151 value = typ()
152 try:
--> 153 self.gen.throw(typ, value, traceback)
154 except StopIteration as exc:
155 # Suppress StopIteration unless it's the same exception that
156 # was passed to throw(). This prevents a StopIteration
157 # raised inside the "with" statement from being suppressed.
158 return exc is not value
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-59b1aa40-b629-4cf3-82ae-6f44d5b1b2f6/lib/python3.10/site-packages/ibis/backends/pyspark/__init__.py:242, in Backend._active_database(self, name)
240 yield
241 finally:
--> 242 self._session.catalog.setCurrentDatabase(current)
File /databricks/spark/python/pyspark/instrumentation_utils.py:47, in _wrap_function.<locals>.wrapper(*args, **kwargs)
45 start = time.perf_counter()
46 try:
---> 47 res = func(*args, **kwargs)
48 logger.log_success(
49 module_name, class_name, function_name, time.perf_counter() - start, signature
50 )
51 return res
File /databricks/spark/python/pyspark/sql/catalog.py:193, in Catalog.setCurrentDatabase(self, dbName)
183 def setCurrentDatabase(self, dbName: str) -> None:
184 """
185 Sets the current default database in this session.
186
(...)
191 >>> spark.catalog.setCurrentDatabase("default")
192 """
--> 193 return self._jcatalog.setCurrentDatabase(dbName)
File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1355, in JavaMember.__call__(self, *args)
1349 command = proto.CALL_COMMAND_NAME +\
1350 self.command_header +\
1351 args_command +\
1352 proto.END_COMMAND_PART
1354 answer = self.gateway_client.send_command(command)
-> 1355 return_value = get_return_value(
1356 answer, self.gateway_client, self.target_id, self.name)
1358 for temp_arg in temp_args:
1359 if hasattr(temp_arg, "_detach"):
File /databricks/spark/python/pyspark/errors/exceptions/captured.py:224, in capture_sql_exception.<locals>.deco(*a, **kw)
222 def deco(*a: Any, **kw: Any) -> Any:
223 try:
--> 224 return f(*a, **kw)
225 except Py4JJavaError as e:
226 converted = convert_exception(e.java_exception)
File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
332 format(target_id, ".", name, value))

@gforsyth
Member

Hey @mark-druffel -- I don't have multiple catalogs set up, so it's very possible I missed something.

That first error you got is because it needs to be a tuple, not a list -- we can almost certainly relax that requirement (and also provide a much better error message). So it would be:

ispark.create_table(name = "raw_media_meas_campaign_info", obj = df, database=("comms_media_dev", "dart_extensions"), overwrite=True)

That said, the second way should be equivalent to the tuple way, so something is a bit wrong. I will try to figure out what's going sideways.
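
To illustrate the kind of relaxed handling mentioned above, here is a minimal, hypothetical sketch of normalizing the database argument so a tuple, a list, or a dotted string all resolve to a (catalog, database) pair; this is illustrative only, not the actual ibis implementation:

def normalize_database(database):
    # Split ("catalog", "db"), ["catalog", "db"], "catalog.db", or "db"
    # into a (catalog, db) pair; catalog is None when only a db is given.
    parts = database.split(".") if isinstance(database, str) else list(database)
    if len(parts) == 1:
        return None, parts[0]
    if len(parts) == 2:
        catalog, db = parts
        return catalog, db
    raise ValueError(
        f"Expected at most a catalog and a database, got {len(parts)} parts: {parts!r}"
    )

# normalize_database(("comms_media_dev", "dart_extensions")) -> ("comms_media_dev", "dart_extensions")
# normalize_database("comms_media_dev.dart_extensions")      -> ("comms_media_dev", "dart_extensions")
# normalize_database("dart_extensions")                      -> (None, "dart_extensions")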

@mark-druffel
Author

mark-druffel commented Apr 26, 2024

That makes sense. I just tried that and the error looks the same as the second attempt from above. Please let me know if there's anything I can do to help, and thanks so much for your quick response!

ispark.create_table(name = "raw_media_meas_campaign_info", obj = df, database=("comms_media_dev", "dart_extensions"), overwrite=True)
> Py4JJavaError: An error occurred while calling o435.setCurrentDatabase.
> : com.databricks.sql.managedcatalog.acl.UnauthorizedAccessException: PERMISSION_DENIED: User does not have USE SCHEMA on Schema 'comms_media_dev.default'.
> 	at com.databricks.managedcatalog.UCReliableHttpClient.reliablyAndTranslateExceptions(UCReliableHttpClient.scala:87)
> 	at com.databricks.managedcatalog.UCReliableHttpClient.get(UCReliableHttpClient.scala:139)
> 	at com.databricks.managedcatalog.ManagedCatalogClientImpl.$anonfun$getSchema$1(ManagedCatalogClientImpl.scala:540)
> 	at com.databricks.managedcatalog.ManagedCatalogClientImpl.$anonfun$recordAndWrapException$2(ManagedCatalogClientImpl.scala:4400)
> 	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
> 	at com.databricks.managedcatalog.ManagedCatalogClientImpl.$anonfun$recordAndWrapException$1(ManagedCatalogClientImpl.scala:4399)
> 	at com.databricks.managedcatalog.ErrorDetailsHandler.wrapServiceException(ErrorDetailsHandler.scala:25)
> 	at com.databricks.managedcatalog.ErrorDetailsHandler.wrapServiceException$(ErrorDetailsHandler.scala:23)
> 	at com.databricks.managedcatalog.ManagedCatalogClientImpl.wrapServiceException(ManagedCatalogClientImpl.scala:151)
> 	at com.databricks.managedcatalog.ManagedCatalogClientImpl.recordAndWrapException(ManagedCatalogClientImpl.scala:4396)
> 	at com.databricks.managedcatalog.ManagedCatalogClientImpl.getSchema(ManagedCatalogClientImpl.scala:533)
> 	at com.databricks.sql.managedcatalog.ManagedCatalogCommon.shouldUpdateSchemaMetadata(ManagedCatalogCommon.scala:2199)
> 	at com.databricks.sql.managedcatalog.ManagedCatalogCommon.getSchemaMetadataInternal(ManagedCatalogCommon.scala:2652)
> 	at com.databricks.sql.managedcatalog.ManagedCatalogCommon.$anonfun$getSchemaMetadata$3(ManagedCatalogCommon.scala:282)
> 	at scala.Option.getOrElse(Option.scala:189)
> 	at com.databricks.sql.managedcatalog.ManagedCatalogCommon.getSchemaMetadata(ManagedCatalogCommon.scala:282)
> 	at com.databricks.sql.managedcatalog.ManagedCatalogCommon.schemaExists(ManagedCatalogCommon.scala:287)
> 	at com.databricks.sql.managedcatalog.ProfiledManagedCatalog.$anonfun$schemaExists$1(ProfiledManagedCatalog.scala:143)
> 	at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
> 	at org.apache.spark.sql.catalyst.MetricKeyUtils$.measure(MetricKey.scala:672)
> 	at com.databricks.sql.managedcatalog.ProfiledManagedCatalog.$anonfun$profile$1(ProfiledManagedCatalog.scala:60)
> 	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
> 	at com.databricks.sql.managedcatalog.ProfiledManagedCatalog.profile(ProfiledManagedCatalog.scala:59)
> 	at com.databricks.sql.managedcatalog.ProfiledManagedCatalog.schemaExists(ProfiledManagedCatalog.scala:143)
> 	at com.databricks.sql.managedcatalog.ManagedCatalogSessionCatalog.databaseExists(ManagedCatalogSessionCatalog.scala:625)
> 	at com.databricks.sql.managedcatalog.ManagedCatalogSessionCatalog.requireScExists(ManagedCatalogSessionCatalog.scala:275)
> 	at com.databricks.sql.managedcatalog.ManagedCatalogSessionCatalog.setCurrentDatabase(ManagedCatalogSessionCatalog.scala:486)
> 	at com.databricks.sql.DatabricksCatalogManager.setCurrentNamespace(DatabricksCatalogManager.scala:156)
> 	at org.apache.spark.sql.internal.CatalogImpl.setCurrentDatabase(CatalogImpl.scala:100)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> 	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
> 	at py4j.Gateway.invoke(Gateway.java:306)
> 	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> 	at py4j.commands.CallCommand.execute(CallCommand.java:79)
> 	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:199)
> 	at py4j.ClientServerConnection.run(ClientServerConnection.java:119)
> 	at java.lang.Thread.run(Thread.java:750)
> File /local_disk0/.ephemeral_nfs/envs/pythonEnv-59b1aa40-b629-4cf3-82ae-6f44d5b1b2f6/lib/python3.10/site-packages/ibis/backends/pyspark/__init__.py:240, in Backend._active_database(self, name)
>     239     self._session.catalog.setCurrentDatabase(name)
> --> 240     yield
>     241 finally:
> File /local_disk0/.ephemeral_nfs/envs/pythonEnv-59b1aa40-b629-4cf3-82ae-6f44d5b1b2f6/lib/python3.10/site-packages/ibis/backends/pyspark/__init__.py:507, in Backend.create_table(self, name, obj, schema, database, temp, overwrite, format)
>     506         df = self._session.sql(query)
> --> 507         df.write.saveAsTable(name, format=format, mode=mode)
>     508 elif schema is not None:
> File /databricks/spark/python/pyspark/instrumentation_utils.py:47, in _wrap_function.<locals>.wrapper(*args, **kwargs)
>      46 try:
> ---> 47     res = func(*args, **kwargs)
>      48     logger.log_success(
>      49         module_name, class_name, function_name, time.perf_counter() - start, signature
>      50     )
> File /databricks/spark/python/pyspark/sql/readwriter.py:1841, in DataFrameWriter.saveAsTable(self, name, format, mode, partitionBy, **options)
>    1840     self.format(format)
> -> 1841 self._jwrite.saveAsTable(name)
> File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1355, in JavaMember.__call__(self, *args)
>    1354 answer = self.gateway_client.send_command(command)
> -> 1355 return_value = get_return_value(
>    1356     answer, self.gateway_client, self.target_id, self.name)
>    1358 for temp_arg in temp_args:
> File /databricks/spark/python/pyspark/errors/exceptions/captured.py:230, in capture_sql_exception.<locals>.deco(*a, **kw)
>     227 if not isinstance(converted, UnknownException):
>     228     # Hide where the exception came from that shows a non-Pythonic
>     229     # JVM exception message.
> --> 230     raise converted from None
>     231 else:
> AnalysisException: [RequestId=952d4d82-e41a-4892-83ba-d52cbbfce80e ErrorClass=INVALID_PARAMETER_VALUE.MANAGED_TABLE_FORMAT] Only Delta is supported for managed tables
> 
> During handling of the above exception, another exception occurred:
> Py4JJavaError                             Traceback (most recent call last)
> File <command-1657592028371427>, line 7
>       4 ispark = ibis.pyspark.connect(session = spark)
>       6 df = ispark.read_parquet("abfss://media_meas_campaign_info")
> ----> 7 ispark.create_table(name = "raw_media_meas_campaign_info", obj = df, database=("comms_media_dev", "dart_extensions"), overwrite=True)
> File /local_disk0/.ephemeral_nfs/envs/pythonEnv-59b1aa40-b629-4cf3-82ae-6f44d5b1b2f6/lib/python3.10/site-packages/ibis/backends/pyspark/__init__.py:504, in Backend.create_table(self, name, obj, schema, database, temp, overwrite, format)
>     502 query = self.compile(table)
>     503 mode = "overwrite" if overwrite else "error"
> --> 504 with self._active_catalog(catalog), self._active_database(db):
>     505     self._run_pre_execute_hooks(table)
>     506     df = self._session.sql(query)
> File /usr/lib/python3.10/contextlib.py:153, in _GeneratorContextManager.__exit__(self, typ, value, traceback)
>     151     value = typ()
>     152 try:
> --> 153     self.gen.throw(typ, value, traceback)
>     154 except StopIteration as exc:
>     155     # Suppress StopIteration *unless* it's the same exception that
>     156     # was passed to throw().  This prevents a StopIteration
>     157     # raised inside the "with" statement from being suppressed.
>     158     return exc is not value
> File /local_disk0/.ephemeral_nfs/envs/pythonEnv-59b1aa40-b629-4cf3-82ae-6f44d5b1b2f6/lib/python3.10/site-packages/ibis/backends/pyspark/__init__.py:242, in Backend._active_database(self, name)
>     240     yield
>     241 finally:
> --> 242     self._session.catalog.setCurrentDatabase(current)
> File /databricks/spark/python/pyspark/instrumentation_utils.py:47, in _wrap_function.<locals>.wrapper(*args, **kwargs)
>      45 start = time.perf_counter()
>      46 try:
> ---> 47     res = func(*args, **kwargs)
>      48     logger.log_success(
>      49         module_name, class_name, function_name, time.perf_counter() - start, signature
>      50     )
>      51     return res
> File /databricks/spark/python/pyspark/sql/catalog.py:193, in Catalog.setCurrentDatabase(self, dbName)
>     183 def setCurrentDatabase(self, dbName: str) -> None:
>     184     """
>     185     Sets the current default database in this session.
>     186 
>    (...)
>     191     >>> spark.catalog.setCurrentDatabase("default")
>     192     """
> --> 193     return self._jcatalog.setCurrentDatabase(dbName)
> File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1355, in JavaMember.__call__(self, *args)
>    1349 command = proto.CALL_COMMAND_NAME +\
>    1350     self.command_header +\
>    1351     args_command +\
>    1352     proto.END_COMMAND_PART
>    1354 answer = self.gateway_client.send_command(command)
> -> 1355 return_value = get_return_value(
>    1356     answer, self.gateway_client, self.target_id, self.name)
>    1358 for temp_arg in temp_args:
>    1359     if hasattr(temp_arg, "_detach"):
> File /databricks/spark/python/pyspark/errors/exceptions/captured.py:224, in capture_sql_exception.<locals>.deco(*a, **kw)
>     222 def deco(*a: Any, **kw: Any) -> Any:
>     223     try:
> --> 224         return f(*a, **kw)
>     225     except Py4JJavaError as e:
>     226         converted = convert_exception(e.java_exception)
> File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
>     324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
>     325 if answer[1] == REFERENCE_TYPE:
> --> 326     raise Py4JJavaError(
>     327         "An error occurred while calling {0}{1}{2}.\n".
>     328         format(target_id, ".", name), value)
>     329 else:
>     330     raise Py4JError(
>     331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
>     332         format(target_id, ".", name, value))

@gforsyth
Member

Question that might seem a bit odd, but does the table show up in the appropriate place in spite of the error message?
In the output of something like:

ispark.list_tables(database=("comms_media_dev", "dart_extensions"))

@mark-druffel
Author

It does show up
[screenshot]

@gforsyth
Member

Ok, I think I have a fix for this.

This is a horrible bit of bookkeeping. For context, this is what is happening:

We set the catalog using a context manager, and we set the database also using a context manager. Currently what is happening to you is this weird edge case:

set catalog to comms_media_dev (succeeds)
set database to dart_extensions within comms_media_dev (succeeds)

then we write the table, great! Now we try to change catalog and database back in reverse order and...

set database to default (the previous value that we saved) within comms_media_dev (fails, you do not have permission to access that)

set catalog back to spark_catalog (or the previous value), but we never get here because of the previous error.

So I think what we need to do is instead:

set catalog
set database
write table
set catalog back
set database back

It would be really great if spark would allow for setting both of these values at the same time, but that is apparently not a thing.
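
A rough sketch of what a single combined context manager could look like, assuming a pyspark SparkSession where setCurrentCatalog and currentCatalog are available (pyspark >= 3.4); the names here are illustrative and not the actual ibis code:

from contextlib import contextmanager

from pyspark.sql import SparkSession

@contextmanager
def active_catalog_database(session: SparkSession, catalog, db):
    # Remember the current location before switching.
    prev_catalog = session.catalog.currentCatalog()
    prev_db = session.catalog.currentDatabase()
    try:
        if catalog is not None:
            session.catalog.setCurrentCatalog(catalog)
        session.catalog.setCurrentDatabase(db)
        yield
    finally:
        # Restore the catalog before the database, so the old database is
        # looked up in the catalog it actually belongs to.
        if catalog is not None:
            session.catalog.setCurrentCatalog(prev_catalog)
        session.catalog.setCurrentDatabase(prev_db)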

@mark-druffel
Author

Not sure if this helps at all, but if I set the catalog & db from the spark session and pass the parameter, it appears the db switches back to default.
[screenshot]

@mark-druffel
Author

^ Sorry, disregard -- I didn't see your last comment pop through. Yeah, spark not allowing both at the same time is really annoying imho.

@gforsyth
Member

If you want to try out that PR, @mark-druffel, that would be a huge help until I can get a much more complicated pyspark testing setup put together.

@mark-druffel
Author

mark-druffel commented Apr 26, 2024

Sorry for the delay, databricks takes forever to start... Now it says the schema can't be found, even though I provided an obj parameter 🤔 I also added format='delta' because this time I got an error without it saying managed tables must be in delta format. I had forgotten to add it in prior tests, but that's a valid error which I wasn't getting before.

import ibis
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
ispark = ibis.pyspark.connect(session = spark)
df = ispark.read_parquet("abfss://media_meas_campaign_info/")

print(f"Tables: {ispark.list_tables(database = ('comms_media_dev','dart_extensions'))}\n")
print(f"Current Catalog: {ispark._session.catalog.currentCatalog()}\n")
print(f"Current Database: {ispark._session.catalog.currentDatabase()}\n")
ispark.create_table(name = "raw_media_meas_campaign_info", obj = df, database=('comms_media_dev','dart_extensions'), overwrite=True, format = "delta")
print(f"Current Catalog: {ispark._session.catalog.currentCatalog()}\n")
print(f"Current Database: {ispark._session.catalog.currentDatabase()}\n")
Tables: ['ibis_read_parquet_472gdhsajjakrgoq2mzf7ffz7u', 'ibis_read_parquet_73xgg7oaunet5oyv5rmderp7wa', 'ibis_read_parquet_dzbw5jngqngsxpg6ug7u266w2i', 'ibis_read_parquet_g2kop6usdncf3k67qgk4i7igpi', 'ibis_read_parquet_j6q3xnj7uzcg5ecfsmdty6l4xa', 'ibis_read_parquet_wifrr4hijbevvdhhlv5kivn2ey', 'ibis_read_parquet_xzhqoneiorfqhfiqdk7nqmpe4u', 'raw_media_meas_offer_info', 'raw_target_history', 'standardized_media_meas_campaign_info', 'standardized_media_meas_offer_info', 'standardized_target_history']

Current Catalog: hive_metastore

Current Database: default

[[SCHEMA_NOT_FOUND](https://docs.microsoft.com/azure/databricks/error-messages/error-classes#schema_not_found)] The schema `dart_extensions` cannot be found. Verify the spelling and correctness of the schema and catalog.
If you did not qualify the name with a catalog, verify the current_schema() output, or qualify the name with the correct catalog.
To tolerate the error on drop use DROP SCHEMA IF EXISTS. SQLSTATE: 42704
File <command-4437199335976496>, line 10
      8 print(f"Current Catalog: {ispark._session.catalog.currentCatalog()}\n")
      9 print(f"Current Database: {ispark._session.catalog.currentDatabase()}\n")
---> 10 ispark.create_table(name = "raw_media_meas_campaign_info", obj = df, database=('comms_media_dev','dart_extensions'), overwrite=True, format = "delta")
     11 print(f"Current Catalog: {ispark._session.catalog.currentCatalog()}\n")
     12 print(f"Current Database: {ispark._session.catalog.currentDatabase()}\n")
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-10d3656f-1fae-4528-917f-49d0869552d4/lib/python3.10/site-packages/ibis/backends/pyspark/__init__.py:532, in Backend.create_table(self, name, obj, schema, database, temp, overwrite, format)
    529 else:
    530     raise com.IbisError("The schema or obj parameter is required")
--> 532 return self.table(name, database=db)
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-10d3656f-1fae-4528-917f-49d0869552d4/lib/python3.10/site-packages/ibis/backends/sql/__init__.py:137, in SQLBackend.table(self, name, schema, database)
    134     catalog = table_loc.catalog or None
    135     database = table_loc.db or None
--> 137 table_schema = self.get_schema(name, catalog=catalog, database=database)
    138 return ops.DatabaseTable(
    139     name,
    140     schema=table_schema,
    141     source=self,
    142     namespace=ops.Namespace(catalog=catalog, database=database),
    143 ).to_expr()
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-10d3656f-1fae-4528-917f-49d0869552d4/lib/python3.10/site-packages/ibis/backends/pyspark/__init__.py:459, in Backend.get_schema(self, table_name, catalog, database)
    457 table_loc = self._to_sqlglot_table((catalog, database))
    458 catalog, db = self._to_catalog_db_tuple(table_loc)
--> 459 with self._active_catalog_database(catalog, db):
    460     df = self._session.table(table_name)
    461     struct = PySparkType.to_ibis(df.schema)
File /usr/lib/python3.10/contextlib.py:135, in _GeneratorContextManager.__enter__(self)
    133 del self.args, self.kwds, self.func
    134 try:
--> 135     return next(self.gen)
    136 except StopIteration:
    137     raise RuntimeError("generator didn't yield") from None
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-10d3656f-1fae-4528-917f-49d0869552d4/lib/python3.10/site-packages/ibis/backends/pyspark/__init__.py:254, in Backend._active_catalog_database(self, catalog, db)
    252     if not PYSPARK_LT_34 and catalog is not None:
    253         self._session.catalog.setCurrentCatalog(catalog)
--> 254     self._session.catalog.setCurrentDatabase(db)
    255     yield
    256 finally:
File /databricks/spark/python/pyspark/instrumentation_utils.py:47, in _wrap_function.<locals>.wrapper(*args, **kwargs)
     45 start = time.perf_counter()
     46 try:
---> 47     res = func(*args, **kwargs)
     48     logger.log_success(
     49         module_name, class_name, function_name, time.perf_counter() - start, signature
     50     )
     51     return res
File /databricks/spark/python/pyspark/sql/catalog.py:193, in Catalog.setCurrentDatabase(self, dbName)
    183 def setCurrentDatabase(self, dbName: str) -> None:
    184     """
    185     Sets the current default database in this session.
    186 
   (...)
    191     >>> spark.catalog.setCurrentDatabase("default")
    192     """
--> 193     return self._jcatalog.setCurrentDatabase(dbName)
File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1355, in JavaMember.__call__(self, *args)
   1349 command = proto.CALL_COMMAND_NAME +\
   1350     self.command_header +\
   1351     args_command +\
   1352     proto.END_COMMAND_PART
   1354 answer = self.gateway_client.send_command(command)
-> 1355 return_value = get_return_value(
   1356     answer, self.gateway_client, self.target_id, self.name)
   1358 for temp_arg in temp_args:
   1359     if hasattr(temp_arg, "_detach"):
File /databricks/spark/python/pyspark/errors/exceptions/captured.py:230, in capture_sql_exception.<locals>.deco(*a, **kw)
    226 converted = convert_exception(e.java_exception)
    227 if not isinstance(converted, UnknownException):
    228     # Hide where the exception came from that shows a non-Pythonic
    229     # JVM exception message.
--> 230     raise converted from None
    231 else:
    232     raise

@gforsyth
Member

gforsyth commented Apr 29, 2024

Hey @mark-druffel -- let's take the conversation over to #9042 -- I think I know what I missed in that PR that is still causing you errors. Thanks for helping us test this out!

EDIT: let's continue over in #9067 where I'm trying to fix this

@gforsyth
Member

Hey @mark-druffel -- we've merged in my fixes from #9067 so hopefully main will be working now -- definitely let us know if things are still failing, and thanks for your help in testing this out!
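
For anyone landing here later, a quick way to retest against main is roughly the following (a sketch assuming the same Databricks setup as in the snippets above):

# %pip install --force-reinstall 'ibis-framework[pyspark] @ git+https://github.com/ibis-project/ibis.git'
import ibis
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
ispark = ibis.pyspark.connect(session=spark)

df = ispark.read_parquet("abfss://media_meas_campaign_info/")
ispark.create_table(
    name="raw_media_meas_campaign_info",
    obj=df,
    database=("comms_media_dev", "dart_extensions"),
    overwrite=True,
    format="delta",
)
ispark.list_tables(database=("comms_media_dev", "dart_extensions"))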
