Skip to content

Commit

Permalink
Accommodate Hive API changes.
Browse files Browse the repository at this point in the history
  • Loading branch information
jnturton committed Sep 15, 2024
1 parent b7fd0ab commit ec503de
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public synchronized void registerSchemas(SchemaConfig schemaConfig, SchemaPlus p
public Set<StoragePluginOptimizerRule> getOptimizerRules(OptimizerRulesContext optimizerContext, PlannerPhase phase) {
switch (phase) {
case PARTITION_PRUNING:
final String defaultPartitionValue = hiveConf.get(ConfVars.DEFAULTPARTITIONNAME.varname);
final String defaultPartitionValue = hiveConf.get(ConfVars.DEFAULT_PARTITION_NAME.varname);
ImmutableSet.Builder<StoragePluginOptimizerRule> ruleBuilder = ImmutableSet.builder();
ruleBuilder.add(HivePushPartitionFilterIntoScan.getFilterOnProject(optimizerContext, defaultPartitionValue));
ruleBuilder.add(HivePushPartitionFilterIntoScan.getFilterOnScan(optimizerContext, defaultPartitionValue));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.work.ExecErrorConstants;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
Expand Down Expand Up @@ -582,7 +583,7 @@ public static boolean hasHeaderOrFooter(HiveTableWithColumnCache table) {
*/
public static void verifyAndAddTransactionalProperties(JobConf job, StorageDescriptor sd) {

if (AcidUtils.isTablePropertyTransactional(job)) {
if (AcidUtils.isTablePropertyTransactional(job.getPropsWithPrefix("transactional"))) {
HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true);

// No work is needed, if schema evolution is used
Expand All @@ -594,7 +595,7 @@ public static void verifyAndAddTransactionalProperties(JobConf job, StorageDescr
String colNames;
String colTypes;

// Try to get get column names and types from table or partition properties. If they are absent there, get columns
// Try to get column names and types from table or partition properties. If they are absent there, get columns
// data from storage descriptor of the table
colNames = job.get(serdeConstants.LIST_COLUMNS);
colTypes = job.get(serdeConstants.LIST_COLUMN_TYPES);
Expand Down Expand Up @@ -749,6 +750,7 @@ public static HiveConf generateHiveConf(HiveConf hiveConf, Map<String, String> p
return newHiveConf;
}


/**
* Helper method which stores partition columns in table columnListCache. If table columnListCache has exactly the
* same columns as partition, in partition stores columns index that corresponds to identical column list.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeException;
Expand Down Expand Up @@ -261,8 +262,9 @@ public void setup(OperatorContext context, OutputMutator output) throws Executio
private Callable<Void> getInitTask(OutputMutator output) {
return () -> {
this.job = new JobConf(hiveConf);
Properties partitionProperties = HiveUtilities.getPartitionMetadata(partition, hiveTable);
Properties hiveTableProperties = HiveUtilities.getTableMetadata(hiveTable);
final Deserializer tableDeserializer = createDeserializer(job, hiveTable.getSd(), hiveTableProperties);
final Deserializer tableDeserializer = createDeserializer(job, hiveTable.getSd(), hiveTableProperties, partitionProperties);
final StructObjectInspector tableObjInspector = getStructOI(tableDeserializer);

if (partition == null) {
Expand All @@ -275,9 +277,8 @@ private Callable<Void> getInitTask(OutputMutator output) {
job.setInputFormat(HiveUtilities.getInputFormatClass(job, hiveTable.getSd(), hiveTable));
HiveUtilities.verifyAndAddTransactionalProperties(job, hiveTable.getSd());
} else {
Properties partitionProperties = HiveUtilities.getPartitionMetadata(partition, hiveTable);
HiveUtilities.addConfToJob(job, partitionProperties);
this.partitionDeserializer = createDeserializer(job, partition.getSd(), partitionProperties);
this.partitionDeserializer = createDeserializer(job, partition.getSd(), hiveTableProperties, partitionProperties);
this.partitionObjInspector = getStructOI(partitionDeserializer);

this.finalObjInspector = (StructObjectInspector) ObjectInspectorConverters.getConvertedOI(partitionObjInspector, tableObjInspector);
Expand Down Expand Up @@ -326,7 +327,8 @@ private Callable<Void> getInitTask(OutputMutator output) {
List<String> nestedColumnPaths = getColumns().stream()
.map(SchemaPath::getRootSegmentPath)
.collect(Collectors.toList());
ColumnProjectionUtils.appendReadColumns(job, idsOfProjectedColumns, selectedColumnNames, nestedColumnPaths);

ColumnProjectionUtils.appendReadColumns(job, idsOfProjectedColumns, selectedColumnNames, nestedColumnPaths, false);

// Initialize selectedStructFieldRefs and columnValueWriters, which are two key collections of
// objects used to read and save columns row data into Drill's value vectors
Expand All @@ -345,7 +347,7 @@ private Callable<Void> getInitTask(OutputMutator output) {
if (partition != null && selectedPartitionColumnNames.size() > 0) {
List<ValueVector> partitionVectorList = new ArrayList<>(selectedPartitionColumnNames.size());
List<Object> partitionValueList = new ArrayList<>(selectedPartitionColumnNames.size());
String defaultPartitionValue = hiveConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname);
String defaultPartitionValue = hiveConf.get(HiveConf.ConfVars.DEFAULT_PARTITION_NAME.varname);
OptionManager options = fragmentContext.getOptions();
for (int i = 0; i < partitionKeyFields.size(); i++) {
FieldSchema field = partitionKeyFields.get(i);
Expand Down Expand Up @@ -447,10 +449,10 @@ public void close() {
closeMapredReader();
}

private static Deserializer createDeserializer(JobConf job, StorageDescriptor sd, Properties properties) throws Exception {
private static Deserializer createDeserializer(JobConf job, StorageDescriptor sd, Properties hiveTableproperties, Properties partitionProperties) throws Exception {
final Class<? extends Deserializer> c = Class.forName(sd.getSerdeInfo().getSerializationLib()).asSubclass(Deserializer.class);
final Deserializer deserializer = c.getConstructor().newInstance();
deserializer.initialize(job, properties);
((AbstractSerDe) deserializer).initialize(job, hiveTableproperties, partitionProperties);

return deserializer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ public HiveDriverManager getDriverManager() {
* from pluginConf or driverConf
*/
public String getWarehouseDir() {
String warehouseDir = pluginConf.get(ConfVars.METASTOREWAREHOUSE.varname);
return nonNull(warehouseDir) ? warehouseDir : driverConf.get(ConfVars.METASTOREWAREHOUSE.varname);
String warehouseDir = pluginConf.get(ConfVars.METASTORE_WAREHOUSE.varname);
return nonNull(warehouseDir) ? warehouseDir : driverConf.get(ConfVars.METASTORE_WAREHOUSE.varname);
}

public static class Builder {
Expand All @@ -153,22 +153,22 @@ private Builder(File baseDir) {
String warehouseDir = new File(baseDir, "warehouse").getAbsolutePath();
// Drill Hive Storage plugin defaults
pluginName("hive");
pluginOption(ConfVars.METASTOREURIS, "");
pluginOption(ConfVars.METASTORECONNECTURLKEY, jdbcUrl);
pluginOption(ConfVars.METASTOREWAREHOUSE, warehouseDir);
pluginOption(ConfVars.METASTORE_URIS, "");
pluginOption(ConfVars.METASTORE_CONNECT_URL_KEY, jdbcUrl);
pluginOption(ConfVars.METASTORE_WAREHOUSE, warehouseDir);
pluginOption(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
// Hive Driver defaults
driverOption(ConfVars.METASTORECONNECTURLKEY, jdbcUrl);
driverOption(ConfVars.METASTORE_CONNECT_URL_KEY, jdbcUrl);
driverOption(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
driverOption(ConfVars.METASTOREWAREHOUSE, warehouseDir);
driverOption(ConfVars.METASTORE_URIS, warehouseDir);
driverOption("mapred.job.tracker", "local");
driverOption(ConfVars.SCRATCHDIR, createDirWithPosixPermissions(baseDir, "scratch_dir").getAbsolutePath());
driverOption(ConfVars.LOCALSCRATCHDIR, createDirWithPosixPermissions(baseDir, "local_scratch_dir").getAbsolutePath());
driverOption(ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
driverOption(ConfVars.SCRATCH_DIR, createDirWithPosixPermissions(baseDir, "scratch_dir").getAbsolutePath());
driverOption(ConfVars.LOCAL_SCRATCH_DIR, createDirWithPosixPermissions(baseDir, "local_scratch_dir").getAbsolutePath());
driverOption(ConfVars.DYNAMIC_PARTITIONING_MODE, "nonstrict");
driverOption(ConfVars.METASTORE_AUTO_CREATE_ALL, Boolean.toString(true));
driverOption(ConfVars.METASTORE_SCHEMA_VERIFICATION, Boolean.toString(false));
driverOption(ConfVars.HIVE_MATERIALIZED_VIEW_ENABLE_AUTO_REWRITING, Boolean.toString(false));
driverOption(HiveConf.ConfVars.HIVESESSIONSILENT, Boolean.toString(true));
driverOption(ConfVars.HIVE_SESSION_SILENT, Boolean.toString(true));
driverOption(ConfVars.HIVE_CBO_ENABLED, Boolean.toString(false));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
import java.util.EnumSet;
import java.util.Set;

import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.test.QueryBuilder;
import org.apache.drill.test.TestTools;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.util.ComparableVersion;
import org.apache.hive.common.util.HiveVersionInfo;
Expand All @@ -50,16 +52,10 @@ public class HiveTestUtilities {
* Execute the give <i>query</i> on given <i>hiveDriver</i> instance.
*/
public static void executeQuery(Driver hiveDriver, String query) {
CommandProcessorResponse response;
try {
response = hiveDriver.run(query);
} catch (Exception e) {
throw new RuntimeException(e);
}

if (response.getResponseCode() != 0 ) {
throw new RuntimeException(String.format("Failed to execute command '%s', errorMsg = '%s'",
query, response.getErrorMessage()));
hiveDriver.run(query);
} catch (CommandProcessorException e) {
throw new DrillRuntimeException(String.format("Failed to execute command '%s'", query, e));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import static org.apache.drill.exec.hive.HiveTestUtilities.createDirWithPosixPermissions;
import static org.apache.drill.exec.hive.HiveTestUtilities.executeQuery;
import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORE_URIS;

public class BaseTestHiveImpersonation extends BaseTestImpersonation {
protected static final String hivePluginName = "hive";
Expand Down Expand Up @@ -79,21 +79,21 @@ protected static void prepHiveConfAndData() throws Exception {

// Configure metastore persistence db location on local filesystem
final String dbUrl = String.format("jdbc:derby:;databaseName=%s;create=true", metaStoreDBDir.getAbsolutePath());
hiveConf.set(ConfVars.METASTORECONNECTURLKEY.varname, dbUrl);
hiveConf.set(ConfVars.METASTORE_CONNECT_URL_KEY.varname, dbUrl);

hiveConf.set(ConfVars.SCRATCHDIR.varname, "file://" + scratchDir.getAbsolutePath());
hiveConf.set(ConfVars.LOCALSCRATCHDIR.varname, localScratchDir.getAbsolutePath());
hiveConf.set(ConfVars.SCRATCH_DIR.varname, "file://" + scratchDir.getAbsolutePath());
hiveConf.set(ConfVars.LOCAL_SCRATCH_DIR.varname, localScratchDir.getAbsolutePath());
hiveConf.set(ConfVars.METASTORE_SCHEMA_VERIFICATION.varname, "false");
hiveConf.set(ConfVars.METASTORE_AUTO_CREATE_ALL.varname, "true");
hiveConf.set(ConfVars.HIVE_CBO_ENABLED.varname, "false");
hiveConf.set(ConfVars.HIVESTATSAUTOGATHER.varname, "false");
hiveConf.set(ConfVars.HIVESTATSCOLAUTOGATHER.varname, "false");
hiveConf.set(ConfVars.HIVESESSIONSILENT.varname, "true");
hiveConf.set(ConfVars.HIVE_SESSION_SILENT.varname, "true");

// Set MiniDFS conf in HiveConf
hiveConf.set(FS_DEFAULT_NAME_KEY, dfsConf.get(FS_DEFAULT_NAME_KEY));

whDir = hiveConf.get(ConfVars.METASTOREWAREHOUSE.varname);
whDir = hiveConf.get(ConfVars.METASTORE_WAREHOUSE.varname);
FileSystem.mkdirs(fs, new Path(whDir), new FsPermission((short) 0777));

dirTestWatcher.copyResourceToRoot(Paths.get("student.txt"));
Expand Down Expand Up @@ -122,7 +122,7 @@ protected static void startHiveMetaStore() throws Exception {
}
final int port = (int) metaStoreUtilsClass.getDeclaredMethod("findFreePort").invoke(null);

hiveConf.set(METASTOREURIS.varname, "thrift://localhost:" + port);
hiveConf.set(METASTORE_URIS.varname, "thrift://localhost:" + port);

metaStoreUtilsClass.getDeclaredMethod("startMetaStore", int.class, hadoopThriftAuthBridgeClass, confClass)
.invoke(null, port, hadoopThriftAuthBridge, hiveConf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_CBO_ENABLED;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORE_URIS;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORE_AUTO_CREATE_ALL;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORE_SCHEMA_VERIFICATION;
Expand Down Expand Up @@ -100,7 +100,7 @@ private static void setSqlStdBasedAuthorizationInHiveConf() {

private static Map<String, String> getHivePluginConfig() {
final Map<String, String> hiveConfig = Maps.newHashMap();
hiveConfig.put(METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname));
hiveConfig.put(METASTORE_URIS.varname, hiveConf.get(METASTORE_URIS.varname));
hiveConfig.put(FS_DEFAULT_NAME_KEY, dfsConf.get(FS_DEFAULT_NAME_KEY));
hiveConfig.put(HIVE_SERVER2_ENABLE_DOAS.varname, hiveConf.get(HIVE_SERVER2_ENABLE_DOAS.varname));
hiveConfig.put(METASTORE_EXECUTE_SET_UGI.varname, hiveConf.get(METASTORE_EXECUTE_SET_UGI.varname));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,7 @@
import static org.apache.drill.exec.hive.HiveTestUtilities.executeQuery;
import static com.google.common.collect.Lists.newArrayList;
import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.DYNAMICPARTITIONINGMODE;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_CBO_ENABLED;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_METASTORE_AUTHENTICATOR_MANAGER;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_METASTORE_AUTHORIZATION_AUTH_READS;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_METASTORE_AUTHORIZATION_MANAGER;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORE_AUTO_CREATE_ALL;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORE_PRE_EVENT_LISTENERS;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORE_SCHEMA_VERIFICATION;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.*;

@Category({SlowTest.class, HiveStorageTest.class})
public class TestStorageBasedHiveAuthorization extends BaseTestHiveImpersonation {
Expand Down Expand Up @@ -184,12 +174,12 @@ private static void setStorabaseBasedAuthorizationInHiveConf() {
hiveConf.set(HIVE_METASTORE_AUTHORIZATION_MANAGER.varname, StorageBasedAuthorizationProvider.class.getName());
hiveConf.set(HIVE_METASTORE_AUTHORIZATION_AUTH_READS.varname, "true");
hiveConf.set(METASTORE_EXECUTE_SET_UGI.varname, "true");
hiveConf.set(DYNAMICPARTITIONINGMODE.varname, "nonstrict");
hiveConf.set(DYNAMIC_PARTITIONING_MODE.varname, "nonstrict");
}

private static Map<String, String> getHivePluginConfig() {
final Map<String, String> hiveConfig = Maps.newHashMap();
hiveConfig.put(METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname));
hiveConfig.put(METASTORE_URIS.varname, hiveConf.get(METASTORE_URIS.varname));
hiveConfig.put(FS_DEFAULT_NAME_KEY, dfsConf.get(FS_DEFAULT_NAME_KEY));
hiveConfig.put(HIVE_SERVER2_ENABLE_DOAS.varname, hiveConf.get(HIVE_SERVER2_ENABLE_DOAS.varname));
hiveConfig.put(METASTORE_EXECUTE_SET_UGI.varname, hiveConf.get(METASTORE_EXECUTE_SET_UGI.varname));
Expand Down

0 comments on commit ec503de

Please sign in to comment.