Remove most usages of Hadoop from HiveMetadata
electrum committed Aug 12, 2023
1 parent 83f5aff commit 304b5fa
Showing 2 changed files with 45 additions and 66 deletions.
109 changes: 44 additions & 65 deletions plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java
@@ -28,7 +28,9 @@
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
import io.airlift.units.DataSize;
import io.trino.filesystem.FileIterator;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.hdfs.HdfsContext;
import io.trino.hdfs.HdfsEnvironment;
@@ -123,15 +125,10 @@
import io.trino.spi.type.TypeManager;
import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -293,10 +290,8 @@
import static io.trino.plugin.hive.util.HiveUtil.toPartitionValues;
import static io.trino.plugin.hive.util.HiveUtil.verifyPartitionTypeSupported;
import static io.trino.plugin.hive.util.HiveWriteUtils.checkTableIsWritable;
import static io.trino.plugin.hive.util.HiveWriteUtils.checkedDelete;
import static io.trino.plugin.hive.util.HiveWriteUtils.createPartitionValues;
import static io.trino.plugin.hive.util.HiveWriteUtils.isFileCreatedByQuery;
import static io.trino.plugin.hive.util.HiveWriteUtils.isS3FileSystem;
import static io.trino.plugin.hive.util.HiveWriteUtils.isWritableType;
import static io.trino.plugin.hive.util.RetryDriver.retry;
import static io.trino.plugin.hive.util.Statistics.ReduceOperator.ADD;
@@ -935,9 +930,9 @@ public void createSchema(ConnectorSession session, String schemaName, Map<String
{
Optional<String> location = HiveSchemaProperties.getLocation(properties).map(locationUri -> {
try {
hdfsEnvironment.getFileSystem(new HdfsContext(session), new Path(locationUri));
fileSystemFactory.create(session).directoryExists(Location.of(locationUri));
}
catch (IOException e) {
catch (IOException | IllegalArgumentException e) {
throw new TrinoException(INVALID_SCHEMA_PROPERTY, "Invalid location URI: " + locationUri, e);
}
return locationUri;
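
For reference, the new createSchema check amounts to the following standalone sketch. The class and method names are illustrative; Location.of() rejects a malformed URI with IllegalArgumentException before any I/O happens, while directoryExists() performs the actual file system probe and can throw IOException, which is why the catch now covers both.

```java
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.spi.TrinoException;

import java.io.IOException;
import java.util.Optional;

import static io.trino.spi.StandardErrorCode.INVALID_SCHEMA_PROPERTY;

class SchemaLocationCheck
{
    // Hypothetical standalone version of the new createSchema validation:
    // URI syntax errors (IllegalArgumentException from Location.of) and
    // probe failures (IOException from directoryExists) both surface as
    // the same user-facing INVALID_SCHEMA_PROPERTY error.
    static Optional<String> validateLocation(TrinoFileSystem fileSystem, Optional<String> locationUri)
    {
        return locationUri.map(uri -> {
            try {
                fileSystem.directoryExists(Location.of(uri));
            }
            catch (IOException | IllegalArgumentException e) {
                throw new TrinoException(INVALID_SCHEMA_PROPERTY, "Invalid location URI: " + uri, e);
            }
            return uri;
        });
    }
}
```
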
@@ -997,7 +992,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
validateTimestampColumns(tableMetadata.getColumns(), getTimestampPrecision(session));
List<HiveColumnHandle> columnHandles = getColumnHandles(tableMetadata, ImmutableSet.copyOf(partitionedBy));
HiveStorageFormat hiveStorageFormat = getHiveStorageFormat(tableMetadata.getProperties());
Map<String, String> tableProperties = getEmptyTableProperties(tableMetadata, bucketProperty, new HdfsContext(session));
Map<String, String> tableProperties = getEmptyTableProperties(tableMetadata, bucketProperty, session);

hiveStorageFormat.validateColumns(columnHandles);

@@ -1017,7 +1012,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe

external = true;
targetPath = Optional.of(getValidatedExternalLocation(externalLocation));
checkExternalPath(new HdfsContext(session), new Path(targetPath.get().toString()));
checkExternalPath(session, targetPath.get());
}
else {
external = false;
@@ -1056,7 +1051,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
false);
}

private Map<String, String> getEmptyTableProperties(ConnectorTableMetadata tableMetadata, Optional<HiveBucketProperty> bucketProperty, HdfsContext hdfsContext)
private Map<String, String> getEmptyTableProperties(ConnectorTableMetadata tableMetadata, Optional<HiveBucketProperty> bucketProperty, ConnectorSession session)
{
HiveStorageFormat hiveStorageFormat = getHiveStorageFormat(tableMetadata.getProperties());
ImmutableMap.Builder<String, String> tableProperties = ImmutableMap.builder();
@@ -1087,7 +1082,7 @@ private Map<String, String> getEmptyTableProperties(ConnectorTableMetadata table
checkAvroSchemaProperties(avroSchemaUrl, avroSchemaLiteral);
if (avroSchemaUrl != null) {
checkFormatForProperty(hiveStorageFormat, HiveStorageFormat.AVRO, AVRO_SCHEMA_URL);
tableProperties.put(AVRO_SCHEMA_URL_KEY, validateAndNormalizeAvroSchemaUrl(avroSchemaUrl, hdfsContext));
tableProperties.put(AVRO_SCHEMA_URL_KEY, validateAvroSchemaUrl(session, avroSchemaUrl));
}
else if (avroSchemaLiteral != null) {
checkFormatForProperty(hiveStorageFormat, HiveStorageFormat.AVRO, AVRO_SCHEMA_LITERAL);
@@ -1238,28 +1233,17 @@ private void validateOrcBloomFilterColumns(ConnectorTableMetadata tableMetadata,
}
}

private String validateAndNormalizeAvroSchemaUrl(String url, HdfsContext context)
private String validateAvroSchemaUrl(ConnectorSession session, String url)
{
try {
new URL(url).openStream().close();
return url;
}
catch (MalformedURLException e) {
// try locally
if (new File(url).exists()) {
// Hive needs the URL to have a protocol
return new File(url).toURI().toString();
}
// try hdfs
try {
if (!hdfsEnvironment.getFileSystem(context, new Path(url)).exists(new Path(url))) {
throw new TrinoException(INVALID_TABLE_PROPERTY, "Cannot locate Avro schema file: " + url);
}
return url;
}
catch (IOException ex) {
throw new TrinoException(INVALID_TABLE_PROPERTY, "Avro schema file is not a valid file system URI: " + url, ex);
Location location = Location.of(url);
if (!fileSystemFactory.create(session).newInputFile(location).exists()) {
throw new TrinoException(INVALID_TABLE_PROPERTY, "Cannot locate Avro schema file: " + url);
}
return location.toString();
}
catch (IllegalArgumentException e) {
throw new TrinoException(INVALID_TABLE_PROPERTY, "Avro schema file is not a valid file system URI: " + url, e);
}
catch (IOException e) {
throw new TrinoException(INVALID_TABLE_PROPERTY, "Cannot open Avro schema file: " + url, e);
@@ -1306,17 +1290,15 @@ private static Location getValidatedExternalLocation(String location)
return validated;
}

private void checkExternalPath(HdfsContext context, Path path)
private void checkExternalPath(ConnectorSession session, Location location)
{
try {
if (!isS3FileSystem(context, hdfsEnvironment, path)) {
if (!hdfsEnvironment.getFileSystem(context, path).isDirectory(path)) {
throw new TrinoException(INVALID_TABLE_PROPERTY, "External location must be a directory: " + path);
}
if (!fileSystemFactory.create(session).directoryExists(location).orElse(true)) {
throw new TrinoException(INVALID_TABLE_PROPERTY, "External location must be a directory: " + location);
}
}
catch (IOException e) {
throw new TrinoException(INVALID_TABLE_PROPERTY, "External location is not a valid file system URI: " + path, e);
catch (IOException | IllegalArgumentException e) {
throw new TrinoException(INVALID_TABLE_PROPERTY, "External location is not a valid file system URI: " + location, e);
}
}
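
A subtlety in the new check: directoryExists() returns Optional<Boolean>, where an empty value means the file system cannot answer (typical for object stores, whose directories are synthetic). The orElse(true) deliberately lets "unknown" pass, which replaces the old isS3FileSystem special case. A minimal sketch, with an illustrative class name:

```java
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.spi.TrinoException;

import java.io.IOException;

import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY;

class ExternalPathCheck
{
    // Reject only when the file system positively reports "not a
    // directory"; treat Optional.empty() ("cannot tell", e.g. S3) as
    // acceptable instead of branching on the URI scheme.
    static void checkExternalPath(TrinoFileSystem fileSystem, Location location)
            throws IOException
    {
        if (!fileSystem.directoryExists(location).orElse(true)) {
            throw new TrinoException(INVALID_TABLE_PROPERTY, "External location must be a directory: " + location);
        }
    }
}
```
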

@@ -1687,7 +1669,7 @@ public HiveOutputTableHandle beginCreateTable(ConnectorSession session, Connecto
String schemaName = schemaTableName.getSchemaName();
String tableName = schemaTableName.getTableName();

Map<String, String> tableProperties = getEmptyTableProperties(tableMetadata, bucketProperty, new HdfsContext(session));
Map<String, String> tableProperties = getEmptyTableProperties(tableMetadata, bucketProperty, session);
List<HiveColumnHandle> columnHandles = getColumnHandles(tableMetadata, ImmutableSet.copyOf(partitionedBy));
HiveStorageFormat partitionStorageFormat = isRespectTableFormat(session) ? tableStorageFormat : getHiveStorageFormat(session);

@@ -2262,14 +2244,14 @@ else if (partitionUpdate.getUpdateMode() == NEW || partitionUpdate.getUpdateMode
private void removeNonCurrentQueryFiles(ConnectorSession session, Location partitionLocation)
{
String queryId = session.getQueryId();
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
try {
Path partitionPath = new Path(partitionLocation.toString());
FileSystem fileSystem = hdfsEnvironment.getFileSystem(new HdfsContext(session), partitionPath);
RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(partitionPath, false);
// use TrinoFileSystem instead of Hadoop file system
FileIterator iterator = fileSystem.listFiles(partitionLocation);
while (iterator.hasNext()) {
Path file = iterator.next().getPath();
if (!isFileCreatedByQuery(file.getName(), queryId)) {
checkedDelete(fileSystem, file, false);
Location location = iterator.next().location();
if (!isFileCreatedByQuery(location.fileName(), queryId)) {
fileSystem.deleteFile(location);
}
}
}
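
Extracted as a standalone sketch, the new listing loop looks roughly like this. The contains() check is a simplified stand-in for the real isFileCreatedByQuery helper, and FileIterator's hasNext()/next() can throw IOException, unlike a java.util.Iterator:

```java
import io.trino.filesystem.FileIterator;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;

import java.io.IOException;

class QueryFileCleanup
{
    // FileIterator replaces Hadoop's RemoteIterator<LocatedFileStatus>,
    // and Location.fileName() replaces Path.getName(). The contains()
    // check below stands in for isFileCreatedByQuery(fileName, queryId).
    static void deleteFilesNotCreatedBy(TrinoFileSystem fileSystem, Location directory, String queryId)
            throws IOException
    {
        FileIterator iterator = fileSystem.listFiles(directory);
        while (iterator.hasNext()) {
            Location location = iterator.next().location();
            if (!location.fileName().contains(queryId)) {
                fileSystem.deleteFile(location);
            }
        }
    }
}
```
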
@@ -2546,40 +2528,37 @@ private void finishOptimize(ConnectorSession session, ConnectorTableExecuteHandl
handle.isRetriesEnabled());
}

// get filesystem
FileSystem fs;
try {
fs = hdfsEnvironment.getFileSystem(new HdfsContext(session), new Path(table.getStorage().getLocation()));
}
catch (IOException e) {
throw new TrinoException(HIVE_FILESYSTEM_ERROR, e);
}

// paths to be deleted
Set<Path> scannedPaths = splitSourceInfo.stream()
.map(file -> new Path((String) file))
Set<Location> scannedPaths = splitSourceInfo.stream()
.map(file -> Location.of((String) file))
.collect(toImmutableSet());
// track remaining files to be deleted for error reporting
Set<Path> remainingFilesToDelete = new HashSet<>(scannedPaths);
Set<Location> remainingFilesToDelete = new HashSet<>(scannedPaths);

// delete loop
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
boolean someDeleted = false;
Optional<Path> firstScannedPath = Optional.empty();
Optional<Location> firstScannedPath = Optional.empty();
try {
for (Path scannedPath : scannedPaths) {
for (Location scannedPath : scannedPaths) {
if (firstScannedPath.isEmpty()) {
firstScannedPath = Optional.of(scannedPath);
}
retry().run("delete " + scannedPath, () -> {
checkedDelete(fs, scannedPath, false);
try {
fileSystem.deleteFile(scannedPath);
}
catch (FileNotFoundException e) {
// ignore missing files
}
return null;
});
someDeleted = true;
remainingFilesToDelete.remove(scannedPath);
}
}
catch (Exception e) {
if (!someDeleted && (firstScannedPath.isEmpty() || exists(fs, firstScannedPath.get()))) {
if (!someDeleted && (firstScannedPath.isEmpty() || exists(fileSystem, firstScannedPath.get()))) {
// we are good - we did not delete any source files, so we can just throw the error and allow rollback to happen
// if the someDeleted flag is false, we do extra checking to see whether the first file we tried to delete is still there. There is a chance that
// fs.delete above could throw an exception even though the file was actually deleted.
@@ -2596,10 +2575,10 @@
}
}
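
The retry-and-tolerate-missing pattern in the loop above, as a sketch with an illustrative class name; retry() is the existing RetryDriver helper already imported by this file:

```java
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;

import java.io.FileNotFoundException;
import java.io.IOException;

import static io.trino.plugin.hive.util.RetryDriver.retry;

class BestEffortDelete
{
    // One iteration of the delete loop: retry transient failures, and
    // treat an already-missing file as success, since a prior attempt
    // may have deleted it even though its call appeared to fail.
    static void deleteWithRetry(TrinoFileSystem fileSystem, Location path)
            throws Exception
    {
        retry().run("delete " + path, () -> {
            try {
                fileSystem.deleteFile(path);
            }
            catch (FileNotFoundException e) {
                // ignore missing files
            }
            return null;
        });
    }
}
```
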

private boolean exists(FileSystem fs, Path path)
private static boolean exists(TrinoFileSystem fs, Location location)
{
try {
return fs.exists(path);
return fs.newInputFile(location).exists();
}
catch (IOException e) {
// on failure pessimistically assume file does not exist
2 changes: 1 addition & 1 deletion
@@ -7625,7 +7625,7 @@ public void testCreateAvroTableWithSchemaUrl()
File schemaFile = createAvroSchemaFile();

String createTableSql = getAvroCreateTableSql(tableName, schemaFile.getAbsolutePath());
String expectedShowCreateTable = getAvroCreateTableSql(tableName, schemaFile.toURI().toString());
String expectedShowCreateTable = getAvroCreateTableSql(tableName, schemaFile.getPath());

assertUpdate(createTableSql);

