Parameterize and fix TestFunctionCatalog
kbendick committed Jul 26, 2022
1 parent b437f33 commit 1e21a4b
Showing 2 changed files with 29 additions and 30 deletions.
TestFunctionCatalog.java
@@ -21,7 +21,6 @@

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.catalog.Namespace;
@@ -31,7 +30,6 @@
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.parser.ParseException;
import org.apache.spark.sql.connector.catalog.FunctionCatalog;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
@@ -41,18 +39,29 @@
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import scala.collection.JavaConverters;

@RunWith(Parameterized.class)
public class TestFunctionCatalog extends SparkCatalogTestBase {
  // TODO - Add tests for SparkCatalogConfig.SPARK once the `system` namespace is resolvable from the session catalog.
  @Parameterized.Parameters(name = "catalogConfig = {0}")
  public static Object[][] parameters() {
    return new Object[][] {
      {SparkCatalogConfig.HADOOP},
      {SparkCatalogConfig.HIVE}
    };
  }

  private static final Namespace NS = Namespace.of("db");
  private final String fullNamespace;
  private final FunctionCatalog asFunctionCatalog;

  public TestFunctionCatalog(String catalogName, String implementation, Map<String, String> config) {
    super(catalogName, implementation, config);
  public TestFunctionCatalog(SparkCatalogConfig catalogConfig) {
    super(catalogConfig);
    this.fullNamespace = ("spark_catalog".equals(catalogName) ? "" : catalogName + ".") + NS;
    this.asFunctionCatalog = castToFunctionCatalog(catalogName);
  }
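For context on the pattern this commit adopts: JUnit 4's Parameterized runner calls parameters() once and constructs the test class for each returned row, so every @Test method runs once per catalog configuration. A minimal, self-contained sketch of that mechanism (class, method, and parameter names here are illustrative, not part of this commit):

import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class ParameterizedRunnerSketch {

  @Parameterized.Parameters(name = "config = {0}")
  public static Object[][] parameters() {
    // One row per configuration; each row becomes one constructor invocation.
    return new Object[][] {{"hadoop"}, {"hive"}};
  }

  private final String config;

  public ParameterizedRunnerSketch(String config) {
    this.config = config;
  }

  @Test
  public void runsOncePerConfig() {
    // Runs twice: once with "hadoop", once with "hive".
    Assert.assertNotNull(config);
  }
}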
@@ -70,14 +79,28 @@ public void cleanNamespaces() {
  @Test
  public void testLoadAndListFunctionsFromSystemNamespaces() throws NoSuchFunctionException, NoSuchNamespaceException {
    String[] namespace = {"system"};
    UnboundFunction truncateFunc = asFunctionCatalog.loadFunction(Identifier.of(namespace, "truncate"));
    Identifier identifier = Identifier.of(new String[] {"system"}, "truncate");

    UnboundFunction truncateFunc = asFunctionCatalog.loadFunction(identifier);
    Assert.assertNotNull("truncate function should be loadable via the FunctionCatalog", truncateFunc);
    Identifier[] identifiers = asFunctionCatalog.listFunctions(namespace);
    Assert.assertTrue("Functions listed from the system namespace should not be empty",
        identifiers.length > 0);
    List<String> functionNames = Arrays.stream(identifiers).map(Identifier::name).collect(Collectors.toList());
    Assertions.assertThat(functionNames).hasSameElementsAs(SparkFunctions.list());

    ScalarFunction<Integer> boundTruncate = (ScalarFunction<Integer>) truncateFunc.bind(
        new StructType()
            .add("width", DataTypes.IntegerType)
            .add("value", DataTypes.IntegerType));

    Object width = Integer.valueOf(10);
    Object toTruncate = Integer.valueOf(9);
    Assert.assertEquals("Binding the truncate function from the function catalog should produce a useable function",
        Integer.valueOf(0),
        boundTruncate.produceResult(
            InternalRow.fromSeq(
                JavaConverters.asScalaBufferConverter(ImmutableList.of(width, toTruncate)).asScala().toSeq())));
  }
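The new assertions exercise the full load-bind-invoke path and expect truncate(width = 10, value = 9) to return 0. Iceberg's truncate transform floors an integer to the lower bound of its width-sized bucket; a small sketch of that arithmetic (the helper name is illustrative, not Iceberg's implementation):

// Floor value to a multiple of width; Math.floorMod keeps the remainder
// non-negative, so negative values also round down rather than toward zero.
static int truncateInt(int width, int value) {
  return value - Math.floorMod(value, width);
}

// truncateInt(10, 9)  -> 0   (matches the assertion above)
// truncateInt(10, 15) -> 10
// truncateInt(10, -1) -> -10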

  @Test
@@ -100,29 +123,6 @@ public void testUndefinedFunction() {
    );
  }

  @Test
  public void testSessionCatalogCanUseFunctionCatalogIfNotUsingSQL() throws NoSuchFunctionException, ParseException {
    Assume.assumeTrue("spark_catalog".equals(catalogName));

    SparkSessionCatalog sessionCatalog =
        (SparkSessionCatalog) Spark3Util.catalogAndIdentifier(spark, "spark_catalog").catalog();

    Identifier identifier = Identifier.of(new String[] {"system"}, "truncate");

    ScalarFunction<Integer> integerTruncateFunc = (ScalarFunction<Integer>) sessionCatalog.loadFunction(identifier)
        .bind(new StructType()
            .add("width", DataTypes.IntegerType)
            .add("value", DataTypes.IntegerType));

    Object width = Integer.valueOf(10);
    Object toTruncate = Integer.valueOf(9);
    Assert.assertEquals("Using the truncate function from the session catalog should work if not using SQL",
        Integer.valueOf(0),
        integerTruncateFunc.produceResult(
            InternalRow.fromSeq(
                JavaConverters.asScalaBufferConverter(ImmutableList.of(width, toTruncate)).asScala().toSeq())));
  }

  private FunctionCatalog castToFunctionCatalog(String name) {
    return (FunctionCatalog) spark.sessionState().catalogManager().catalog(name);
  }
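The cast in this helper relies on Spark's CatalogManager returning every registered catalog as a CatalogPlugin, while Iceberg's Spark catalogs also implement FunctionCatalog, so the plugin can be narrowed to the function-lookup interface. A defensive variant that fails with a clearer message when a catalog does not support functions (method name is illustrative):

import org.apache.spark.sql.connector.catalog.CatalogPlugin;
import org.apache.spark.sql.connector.catalog.FunctionCatalog;

private FunctionCatalog loadFunctionCatalog(String name) {
  CatalogPlugin plugin = spark.sessionState().catalogManager().catalog(name);
  Assert.assertTrue(name + " should implement FunctionCatalog", plugin instanceof FunctionCatalog);
  return (FunctionCatalog) plugin;
}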
TestSparkTruncateFunction.java
@@ -38,7 +38,6 @@
@RunWith(Parameterized.class)
public class TestSparkTruncateFunction extends SparkTestBaseWithCatalog {

  // When using the SparkSessionCatalog, Spark's analyzer requires that the database be resolvable.
  // TODO - Add tests for SparkCatalogConfig.SPARK once the `system` namespace is resolvable from the session catalog.
  @Parameterized.Parameters(name = "catalogConfig = {0}")
  public static Object[][] parameters() {
