[Sample PR|WIP] Spark View Catalog with HMS and Coral #297

Draft · wants to merge 1 commit into master
2 changes: 1 addition & 1 deletion build.gradle
@@ -37,7 +37,7 @@ allprojects {
url 'https://linkedin.jfrog.io/artifactory/avro-util/'
}
maven {
url 'https://linkedin.bintray.com/maven/'
url uri('/Users/jiefli/.m2/repository')
}
}

25 changes: 25 additions & 0 deletions coral-spark-catalog/build.gradle
@@ -0,0 +1,25 @@
dependencies {
compile project(':coral-spark')
compile(deps.'linkedin-calcite-core') {
artifact {
name = 'calcite-core'
extension = 'jar'
type = 'jar'
classifier = 'shaded'
}
}

compile 'org.apache.spark:spark-hive_2.12:3.1.3-SNAPSHOT'

testCompile('org.apache.hive:hive-metastore:2.3.7') {
// avro-tools brings in a whole bunch of Hadoop classes, causing duplicates and conflicts
exclude group: 'org.apache.avro', module: 'avro-tools'
}
testCompile('org.apache.hive:hive-exec:2.3.7:core') {
exclude group: 'org.apache.avro', module: 'avro-tools'
// These exclusions are required to prevent duplicate classes since we include
// shaded jar above
exclude group: 'org.apache.calcite', module: 'calcite-core'
exclude group: 'org.apache.calcite', module: 'calcite-avatica'
}
}
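
Since this adds a new Gradle module, the root settings.gradle would presumably need to include it as well; a one-line sketch, assuming the repo's standard multi-module layout:

include 'coral-spark-catalog'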
71 changes: 71 additions & 0 deletions coral-spark-catalog/src/main/java/com/linkedin/coral/spark/catalog/CoralSparkView.java
@@ -0,0 +1,71 @@
/**
* Copyright 2022 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
package com.linkedin.coral.spark.catalog;

import java.util.Map;

import com.google.common.collect.ImmutableMap;

import org.apache.spark.sql.connector.catalog.View;
import org.apache.spark.sql.types.StructType;


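/**
 * Immutable {@link View} implementation returned by the Coral view catalog. It carries the
 * Coral-generated Spark SQL for a Hive view along with the Spark schema derived from the view's
 * Avro schema.
 */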
public class CoralSparkView implements View {

private final String name;
private final String sql;
private final StructType viewSchema;
private final String catalogName;
private final String[] namespace;

public CoralSparkView(String name, String sql, StructType viewSchema, String catalogName, String[] namespace) {
this.name = name;
this.sql = sql;
this.viewSchema = viewSchema;
this.catalogName = catalogName;
this.namespace = namespace;
}

@Override
public String name() {
return name;
}

@Override
public String sql() {
return sql;
}

@Override
public StructType schema() {
return viewSchema;
}

@Override
public String[] columnAliases() {
return new String[0];
}

@Override
public String[] columnComments() {
return new String[0];
}

@Override
public String currentCatalog() {
return catalogName;
}

@Override
public String[] currentNamespace() {
return namespace;
}

@Override
public Map<String, String> properties() {
return ImmutableMap.of();
}
}
253 changes: 253 additions & 0 deletions coral-spark-catalog/src/main/java/com/linkedin/coral/spark/catalog/CoralSparkViewCatalog.java
@@ -0,0 +1,253 @@
/**
* Copyright 2018-2022 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
package com.linkedin.coral.spark.catalog;

import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.avro.Schema;
import org.apache.calcite.rel.RelNode;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.avro.SchemaConverters;
import org.apache.spark.sql.catalyst.FunctionIdentifier;
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.analysis.NoSuchViewException;
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException;
import org.apache.spark.sql.catalyst.catalog.CatalogFunction;
import org.apache.spark.sql.catalyst.catalog.FunctionResource;
import org.apache.spark.sql.catalyst.catalog.FunctionResourceType;
import org.apache.spark.sql.connector.catalog.CatalogExtension;
import org.apache.spark.sql.connector.catalog.CatalogPlugin;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.NamespaceChange;
import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.catalog.TableChange;
import org.apache.spark.sql.connector.catalog.View;
import org.apache.spark.sql.connector.catalog.ViewCatalog;
import org.apache.spark.sql.connector.catalog.ViewChange;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.apache.spark.util.Utils;
import org.apache.thrift.TException;

import scala.Option;
import scala.collection.JavaConverters;

import com.linkedin.coral.common.HiveMscAdapter;
import com.linkedin.coral.hive.hive2rel.HiveToRelConverter;
import com.linkedin.coral.schema.avro.ViewToAvroSchemaConverter;
import com.linkedin.coral.spark.CoralSpark;
import com.linkedin.coral.spark.containers.SparkUDFInfo;


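/**
 * Spark session catalog extension that resolves Hive views through Coral. View lookups read the
 * view definition from the Hive Metastore, translate it to Spark SQL via Coral, register any UDFs
 * the translated query needs, and return a {@link CoralSparkView}. Table lookups are delegated to
 * the underlying session catalog.
 */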
public class CoralSparkViewCatalog<T extends TableCatalog & SupportsNamespaces>
implements ViewCatalog, CatalogExtension {
private String catalogName;
private T sessionCatalog;
private IMetaStoreClient hiveClient;
private HiveToRelConverter hiveToRelConverter;
private ViewToAvroSchemaConverter viewToAvroSchemaConverter;

@Override
public Identifier[] listViews(String... namespace) throws NoSuchNamespaceException {
return new Identifier[0];
}

@Override
public View loadView(Identifier identifier) {
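// Hive identifiers are two-level (database.view), so only the first namespace element is used.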
String db = identifier.namespace()[0];
String tbl = identifier.name();

final Table table;
try {
table = hiveClient.getTable(db, tbl);
} catch (TException e) {
return null;
}
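// Anything other than a Hive virtual view is not handled here; returning null lets resolution
// fall back to the table path.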
if (!table.getTableType().equalsIgnoreCase("VIRTUAL_VIEW")) {
return null;
}

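// Translate the view: derive its Avro schema, convert the Hive SQL to a RelNode, and let
// CoralSpark produce the equivalent Spark SQL plus the UDFs it references.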
try {
boolean forceLowercaseSchema = (Boolean) SparkSession.active().conf().get(SQLConf.FORCE_LOWERCASE_DALI_SCHEMA());
final Schema avroSchema = viewToAvroSchemaConverter.toAvroSchema(db, tbl, false, forceLowercaseSchema);
final DataType schema = SchemaConverters.toSqlType(avroSchema).dataType();
final RelNode relNode = hiveToRelConverter.convertView(db, tbl);
final CoralSpark coralSpark = CoralSpark.create(relNode, avroSchema);
registerUDFs(coralSpark.getSparkUDFInfoList());
return new CoralSparkView(tbl, coralSpark.getSparkSql(), (StructType) schema, name(), new String[] { db });
} catch (Exception e) {
e.printStackTrace();
return null;
}
}

@Override
public View createView(Identifier identifier, String sql, String currentCatalog, String[] currentNamespace,
StructType schema, String[] columnAliases, String[] columnComments, Map<String, String> properties)
throws ViewAlreadyExistsException, NoSuchNamespaceException {
return null;
}

@Override
public View alterView(Identifier identifier, ViewChange... viewChanges)
throws NoSuchViewException, IllegalArgumentException {
return null;
}

@Override
public boolean dropView(Identifier identifier) {
return false;
}

@Override
public void renameView(Identifier oldIdent, Identifier newIdent)
throws NoSuchViewException, ViewAlreadyExistsException {

}

private void registerUDFs(List<SparkUDFInfo> sparkUDFInfoList) {
final SparkSession sparkSession = SparkSession.active();
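// Collect the artifact URLs from all UDFs into a set so each jar is resolved and loaded only once.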
Set<URI> mavenDependencies = new HashSet<>();
sparkUDFInfoList.forEach(sparkUDFInfo -> mavenDependencies.addAll(sparkUDFInfo.getArtifactoryUrls()));

final List<FunctionResource> resolvedFunctionResources =
mavenDependencies.stream().map(f -> new FunctionResource(FunctionResourceType.fromString("jar"), f.toString()))
.collect(Collectors.toList());
try {
sparkSession.sessionState().catalog()
.loadFunctionResources(JavaConverters.asScalaBuffer(resolvedFunctionResources));
} catch (Exception e) {
e.printStackTrace();
}
sparkUDFInfoList.forEach(udf -> {
switch (udf.getUdfType()) {
case HIVE_CUSTOM_UDF:
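// Hive UDFs are registered with the session catalog's function registry, backed by the jars
// resolved above.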
sparkSession.sessionState().catalog()
.registerFunction(new CatalogFunction(new FunctionIdentifier(udf.getFunctionName()), udf.getClassName(),
JavaConverters.asScalaBuffer(resolvedFunctionResources)), true, Option.apply(null));
break;
case TRANSPORTABLE_UDF:
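// Transportable UDFs expose a static register(String) method; invoke it reflectively with the
// function name.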
try {
Utils.classForName(udf.getClassName(), true, false).getMethod("register", String.class).invoke(null,
udf.getFunctionName());
} catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
e.printStackTrace();
}
break;
default:
throw new RuntimeException("Unsupported UDF type: " + udf.getUdfType());
}
});
}

@Override
@SuppressWarnings("unchecked")
public void setDelegateCatalog(CatalogPlugin sparkSessionCatalog) {
if (sparkSessionCatalog instanceof TableCatalog && sparkSessionCatalog instanceof SupportsNamespaces) {
this.sessionCatalog = (T) sparkSessionCatalog;
} else {
throw new IllegalArgumentException("Invalid session catalog: " + sparkSessionCatalog);
}
}

@Override
public String[][] listNamespaces() throws NoSuchNamespaceException {
return new String[0][];
}

@Override
public String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceException {
return new String[0][];
}

@Override
public Map<String, String> loadNamespaceMetadata(String[] namespace) throws NoSuchNamespaceException {
return null;
}

@Override
public void createNamespace(String[] namespace, Map<String, String> metadata) throws NamespaceAlreadyExistsException {

}

@Override
public void alterNamespace(String[] namespace, NamespaceChange... changes) throws NoSuchNamespaceException {

}

@Override
public boolean dropNamespace(String[] namespace) throws NoSuchNamespaceException {
return false;
}

@Override
public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException {
return new Identifier[0];
}

@Override
public org.apache.spark.sql.connector.catalog.Table loadTable(Identifier ident) throws NoSuchTableException {
return sessionCatalog.loadTable(ident);
}

@Override
public org.apache.spark.sql.connector.catalog.Table createTable(Identifier ident, StructType schema,
Transform[] partitions, Map<String, String> properties)
throws TableAlreadyExistsException, NoSuchNamespaceException {
return null;
}

@Override
public org.apache.spark.sql.connector.catalog.Table alterTable(Identifier ident, TableChange... changes)
throws NoSuchTableException {
return null;
}

@Override
public boolean dropTable(Identifier ident) {
return false;
}

@Override
public void renameTable(Identifier oldIdent, Identifier newIdent)
throws NoSuchTableException, TableAlreadyExistsException {

}

@Override
public void initialize(String name, CaseInsensitiveStringMap options) {
this.catalogName = name;
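// Build the HMS client and the Coral converters once per catalog instance; HiveConf picks up
// hive-site.xml from the classpath.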
try {
hiveClient = new HiveMetaStoreClient(new HiveConf());
} catch (MetaException e) {
e.printStackTrace();
}
hiveToRelConverter = new HiveToRelConverter(new HiveMscAdapter(hiveClient));
viewToAvroSchemaConverter = ViewToAvroSchemaConverter.create(new HiveMscAdapter(hiveClient));
}

@Override
public String name() {
return catalogName;
}
}
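
For reviewers who want to try this locally: a minimal, hypothetical sketch of wiring the catalog into a Spark 3 session. It relies on Spark's standard catalog-plugin config key; because the class implements CatalogExtension, it is intended to replace spark_catalog and delegate table resolution to the built-in session catalog. The database and view names below are placeholders.

// Hypothetical usage sketch; not part of this PR.
SparkSession spark = SparkSession.builder()
    .appName("coral-view-catalog-demo")
    .enableHiveSupport()
    .config("spark.sql.catalog.spark_catalog", "com.linkedin.coral.spark.catalog.CoralSparkViewCatalog")
    .getOrCreate();

// "db.sample_view" stands in for an existing Hive view registered in the metastore.
spark.sql("SELECT * FROM db.sample_view").show();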