Skip to content

Commit

Permalink
[SPARK-49791][SQL] Make DelegatingCatalogExtension more extendable
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

This PR updates `DelegatingCatalogExtension` so that it's more extendable
- `initialize` becomes not final, so that sub-classes can overwrite it
- `delegate` becomes `protected`, so that sub-classes can access it

In addition, this PR fixes a mistake that `DelegatingCatalogExtension` is just a convenient default implementation, it's actually the `CatalogExtension` interface that indicates this catalog implementation will delegate requests to the Spark session catalog. apache/spark#47724 should use `CatalogExtension` instead.

### Why are the changes needed?

Unblock the Iceberg extension.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests

### Was this patch authored or co-authored using generative AI tooling?

no

Closes #48257 from cloud-fan/catalog.

Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
  • Loading branch information
a0x8o and cloud-fan committed Sep 26, 2024
1 parent 2b32664 commit 4f56fae
Show file tree
Hide file tree
Showing 879 changed files with 36,864 additions and 9,989 deletions.
4 changes: 2 additions & 2 deletions assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@
<!--
Because we don't shade dependencies anymore, we need to restore Guava to compile scope so
that the libraries Spark depend on have it available. We'll package the version that Spark
uses (14.0.1) which is not the same as Hadoop dependencies, but works.
uses which is not the same as Hadoop dependencies, but works.
-->
<dependency>
<groupId>com.google.guava</groupId>
Expand Down Expand Up @@ -200,7 +200,7 @@
<configuration>
<executable>cp</executable>
<arguments>
<argument>${basedir}/../connector/connect/client/jvm/target/spark-connect-client-jvm_${scala.binary.version}-${version}.jar</argument>
<argument>${basedir}/../connector/connect/client/jvm/target/spark-connect-client-jvm_${scala.binary.version}-${project.version}.jar</argument>
<argument>${basedir}/target/scala-${scala.binary.version}/jars/connect-repl</argument>
</arguments>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ private static int lowercaseMatchLengthFrom(
}
// Compare the characters in the target and pattern strings.
int matchLength = 0, codePointBuffer = -1, targetCodePoint, patternCodePoint;
while (targetIterator.hasNext() && patternIterator.hasNext()) {
while ((targetIterator.hasNext() || codePointBuffer != -1) && patternIterator.hasNext()) {
if (codePointBuffer != -1) {
targetCodePoint = codePointBuffer;
codePointBuffer = -1;
Expand Down Expand Up @@ -211,7 +211,7 @@ private static int lowercaseMatchLengthUntil(
}
// Compare the characters in the target and pattern strings.
int matchLength = 0, codePointBuffer = -1, targetCodePoint, patternCodePoint;
while (targetIterator.hasNext() && patternIterator.hasNext()) {
while ((targetIterator.hasNext() || codePointBuffer != -1) && patternIterator.hasNext()) {
if (codePointBuffer != -1) {
targetCodePoint = codePointBuffer;
codePointBuffer = -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
import java.util.function.Function;
import java.util.function.BiFunction;
import java.util.function.ToLongFunction;
import java.util.stream.Stream;

import com.ibm.icu.text.CollationKey;
import com.ibm.icu.text.Collator;
import com.ibm.icu.text.RuleBasedCollator;
import com.ibm.icu.text.StringSearch;
import com.ibm.icu.util.ULocale;
import com.ibm.icu.text.CollationKey;
import com.ibm.icu.text.Collator;
import com.ibm.icu.util.VersionInfo;

import org.apache.spark.SparkException;
import org.apache.spark.unsafe.types.UTF8String;
Expand Down Expand Up @@ -88,6 +90,17 @@ public Optional<String> getVersion() {
}
}

public record CollationMeta(
String catalog,
String schema,
String collationName,
String language,
String country,
String icuVersion,
String padAttribute,
boolean accentSensitivity,
boolean caseSensitivity) { }

/**
* Entry encapsulating all information about a collation.
*/
Expand Down Expand Up @@ -342,6 +355,23 @@ private static int collationNameToId(String collationName) throws SparkException
}

protected abstract Collation buildCollation();

protected abstract CollationMeta buildCollationMeta();

static List<CollationIdentifier> listCollations() {
return Stream.concat(
CollationSpecUTF8.listCollations().stream(),
CollationSpecICU.listCollations().stream()).toList();
}

static CollationMeta loadCollationMeta(CollationIdentifier collationIdentifier) {
CollationMeta collationSpecUTF8 =
CollationSpecUTF8.loadCollationMeta(collationIdentifier);
if (collationSpecUTF8 == null) {
return CollationSpecICU.loadCollationMeta(collationIdentifier);
}
return collationSpecUTF8;
}
}

private static class CollationSpecUTF8 extends CollationSpec {
Expand All @@ -364,6 +394,9 @@ private enum CaseSensitivity {
*/
private static final int CASE_SENSITIVITY_MASK = 0b1;

private static final String UTF8_BINARY_COLLATION_NAME = "UTF8_BINARY";
private static final String UTF8_LCASE_COLLATION_NAME = "UTF8_LCASE";

private static final int UTF8_BINARY_COLLATION_ID =
new CollationSpecUTF8(CaseSensitivity.UNSPECIFIED).collationId;
private static final int UTF8_LCASE_COLLATION_ID =
Expand Down Expand Up @@ -406,7 +439,7 @@ private static CollationSpecUTF8 fromCollationId(int collationId) {
protected Collation buildCollation() {
if (collationId == UTF8_BINARY_COLLATION_ID) {
return new Collation(
"UTF8_BINARY",
UTF8_BINARY_COLLATION_NAME,
PROVIDER_SPARK,
null,
UTF8String::binaryCompare,
Expand All @@ -417,7 +450,7 @@ protected Collation buildCollation() {
/* supportsLowercaseEquality = */ false);
} else {
return new Collation(
"UTF8_LCASE",
UTF8_LCASE_COLLATION_NAME,
PROVIDER_SPARK,
null,
CollationAwareUTF8String::compareLowerCase,
Expand All @@ -428,6 +461,52 @@ protected Collation buildCollation() {
/* supportsLowercaseEquality = */ true);
}
}

@Override
protected CollationMeta buildCollationMeta() {
if (collationId == UTF8_BINARY_COLLATION_ID) {
return new CollationMeta(
CATALOG,
SCHEMA,
UTF8_BINARY_COLLATION_NAME,
/* language = */ null,
/* country = */ null,
/* icuVersion = */ null,
COLLATION_PAD_ATTRIBUTE,
/* accentSensitivity = */ true,
/* caseSensitivity = */ true);
} else {
return new CollationMeta(
CATALOG,
SCHEMA,
UTF8_LCASE_COLLATION_NAME,
/* language = */ null,
/* country = */ null,
/* icuVersion = */ null,
COLLATION_PAD_ATTRIBUTE,
/* accentSensitivity = */ true,
/* caseSensitivity = */ false);
}
}

static List<CollationIdentifier> listCollations() {
CollationIdentifier UTF8_BINARY_COLLATION_IDENT =
new CollationIdentifier(PROVIDER_SPARK, UTF8_BINARY_COLLATION_NAME, "1.0");
CollationIdentifier UTF8_LCASE_COLLATION_IDENT =
new CollationIdentifier(PROVIDER_SPARK, UTF8_LCASE_COLLATION_NAME, "1.0");
return Arrays.asList(UTF8_BINARY_COLLATION_IDENT, UTF8_LCASE_COLLATION_IDENT);
}

static CollationMeta loadCollationMeta(CollationIdentifier collationIdentifier) {
try {
int collationId = CollationSpecUTF8.collationNameToId(
collationIdentifier.name, collationIdentifier.name.toUpperCase());
return CollationSpecUTF8.fromCollationId(collationId).buildCollationMeta();
} catch (SparkException ignored) {
// ignore
return null;
}
}
}

private static class CollationSpecICU extends CollationSpec {
Expand Down Expand Up @@ -684,6 +763,20 @@ protected Collation buildCollation() {
/* supportsLowercaseEquality = */ false);
}

@Override
protected CollationMeta buildCollationMeta() {
return new CollationMeta(
CATALOG,
SCHEMA,
collationName(),
ICULocaleMap.get(locale).getDisplayLanguage(),
ICULocaleMap.get(locale).getDisplayCountry(),
VersionInfo.ICU_VERSION.toString(),
COLLATION_PAD_ATTRIBUTE,
accentSensitivity == AccentSensitivity.AS,
caseSensitivity == CaseSensitivity.CS);
}

/**
* Compute normalized collation name. Components of collation name are given in order:
* - Locale name
Expand All @@ -704,6 +797,37 @@ private String collationName() {
}
return builder.toString();
}

private static List<String> allCollationNames() {
List<String> collationNames = new ArrayList<>();
for (String locale: ICULocaleToId.keySet()) {
// CaseSensitivity.CS + AccentSensitivity.AS
collationNames.add(locale);
// CaseSensitivity.CS + AccentSensitivity.AI
collationNames.add(locale + "_AI");
// CaseSensitivity.CI + AccentSensitivity.AS
collationNames.add(locale + "_CI");
// CaseSensitivity.CI + AccentSensitivity.AI
collationNames.add(locale + "_CI_AI");
}
return collationNames.stream().sorted().toList();
}

static List<CollationIdentifier> listCollations() {
return allCollationNames().stream().map(name ->
new CollationIdentifier(PROVIDER_ICU, name, VersionInfo.ICU_VERSION.toString())).toList();
}

static CollationMeta loadCollationMeta(CollationIdentifier collationIdentifier) {
try {
int collationId = CollationSpecICU.collationNameToId(
collationIdentifier.name, collationIdentifier.name.toUpperCase());
return CollationSpecICU.fromCollationId(collationId).buildCollationMeta();
} catch (SparkException ignored) {
// ignore
return null;
}
}
}

/**
Expand All @@ -730,9 +854,12 @@ public CollationIdentifier identifier() {
}
}

public static final String CATALOG = "SYSTEM";
public static final String SCHEMA = "BUILTIN";
public static final String PROVIDER_SPARK = "spark";
public static final String PROVIDER_ICU = "icu";
public static final List<String> SUPPORTED_PROVIDERS = List.of(PROVIDER_SPARK, PROVIDER_ICU);
public static final String COLLATION_PAD_ATTRIBUTE = "NO_PAD";

public static final int UTF8_BINARY_COLLATION_ID =
Collation.CollationSpecUTF8.UTF8_BINARY_COLLATION_ID;
Expand Down Expand Up @@ -794,6 +921,18 @@ public static int collationNameToId(String collationName) throws SparkException
return Collation.CollationSpec.collationNameToId(collationName);
}

/**
* Returns whether the ICU collation is not Case Sensitive Accent Insensitive
* for the given collation id.
* This method is used in expressions which do not support CS_AI collations.
*/
public static boolean isCaseSensitiveAndAccentInsensitive(int collationId) {
return Collation.CollationSpecICU.fromCollationId(collationId).caseSensitivity ==
Collation.CollationSpecICU.CaseSensitivity.CS &&
Collation.CollationSpecICU.fromCollationId(collationId).accentSensitivity ==
Collation.CollationSpecICU.AccentSensitivity.AI;
}

public static void assertValidProvider(String provider) throws SparkException {
if (!SUPPORTED_PROVIDERS.contains(provider.toLowerCase())) {
Map<String, String> params = Map.of(
Expand Down Expand Up @@ -923,4 +1062,12 @@ public static String getClosestSuggestionsOnInvalidName(

return String.join(", ", suggestions);
}

public static List<CollationIdentifier> listCollations() {
return Collation.CollationSpec.listCollations();
}

public static CollationMeta loadCollationMeta(CollationIdentifier collationIdentifier) {
return Collation.CollationSpec.loadCollationMeta(collationIdentifier);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,8 @@ public void testStartsWith() throws SparkException {
assertStartsWith("İonic", "Io", "UTF8_LCASE", false);
assertStartsWith("İonic", "i\u0307o", "UTF8_LCASE", true);
assertStartsWith("İonic", "İo", "UTF8_LCASE", true);
assertStartsWith("oİ", "oİ", "UTF8_LCASE", true);
assertStartsWith("oİ", "oi̇", "UTF8_LCASE", true);
// Conditional case mapping (e.g. Greek sigmas).
assertStartsWith("σ", "σ", "UTF8_BINARY", true);
assertStartsWith("σ", "ς", "UTF8_BINARY", false);
Expand Down Expand Up @@ -880,6 +882,8 @@ public void testEndsWith() throws SparkException {
assertEndsWith("the İo", "Io", "UTF8_LCASE", false);
assertEndsWith("the İo", "i\u0307o", "UTF8_LCASE", true);
assertEndsWith("the İo", "İo", "UTF8_LCASE", true);
assertEndsWith("İo", "İo", "UTF8_LCASE", true);
assertEndsWith("İo", "i̇o", "UTF8_LCASE", true);
// Conditional case mapping (e.g. Greek sigmas).
assertEndsWith("σ", "σ", "UTF8_BINARY", true);
assertEndsWith("σ", "ς", "UTF8_BINARY", false);
Expand Down
Loading

0 comments on commit 4f56fae

Please sign in to comment.