diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java
index 4639af7155bf..943b40d8be29 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java
@@ -28,7 +28,9 @@
 import io.airlift.log.Logger;
 import io.airlift.slice.Slice;
 import io.airlift.units.DataSize;
+import io.trino.filesystem.FileIterator;
 import io.trino.filesystem.Location;
+import io.trino.filesystem.TrinoFileSystem;
 import io.trino.filesystem.TrinoFileSystemFactory;
 import io.trino.hdfs.HdfsContext;
 import io.trino.hdfs.HdfsEnvironment;
@@ -123,15 +125,10 @@
 import io.trino.spi.type.TypeManager;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaParseException;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;

-import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -293,10 +290,8 @@
 import static io.trino.plugin.hive.util.HiveUtil.toPartitionValues;
 import static io.trino.plugin.hive.util.HiveUtil.verifyPartitionTypeSupported;
 import static io.trino.plugin.hive.util.HiveWriteUtils.checkTableIsWritable;
-import static io.trino.plugin.hive.util.HiveWriteUtils.checkedDelete;
 import static io.trino.plugin.hive.util.HiveWriteUtils.createPartitionValues;
 import static io.trino.plugin.hive.util.HiveWriteUtils.isFileCreatedByQuery;
-import static io.trino.plugin.hive.util.HiveWriteUtils.isS3FileSystem;
 import static io.trino.plugin.hive.util.HiveWriteUtils.isWritableType;
 import static io.trino.plugin.hive.util.RetryDriver.retry;
 import static io.trino.plugin.hive.util.Statistics.ReduceOperator.ADD;
@@ -935,9 +930,9 @@ public void createSchema(ConnectorSession session, String schemaName, Map
         Optional<String> location = HiveSchemaProperties.getLocation(properties).map(locationUri -> {
             try {
-                hdfsEnvironment.getFileSystem(new HdfsContext(session), new Path(locationUri));
-            }
-            catch (IOException e) {
+                fileSystemFactory.create(session).directoryExists(Location.of(locationUri));
+            }
+            catch (IOException | IllegalArgumentException e) {
                 throw new TrinoException(INVALID_SCHEMA_PROPERTY, "Invalid location URI: " + locationUri, e);
             }
             return locationUri;
         });
@@ -997,7 +992,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
         validateTimestampColumns(tableMetadata.getColumns(), getTimestampPrecision(session));
         List<HiveColumnHandle> columnHandles = getColumnHandles(tableMetadata, ImmutableSet.copyOf(partitionedBy));
         HiveStorageFormat hiveStorageFormat = getHiveStorageFormat(tableMetadata.getProperties());
-        Map<String, String> tableProperties = getEmptyTableProperties(tableMetadata, bucketProperty, new HdfsContext(session));
+        Map<String, String> tableProperties = getEmptyTableProperties(tableMetadata, bucketProperty, session);

         hiveStorageFormat.validateColumns(columnHandles);
@@ -1017,7 +1012,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
             external = true;
             targetPath = Optional.of(getValidatedExternalLocation(externalLocation));
-            checkExternalPath(new HdfsContext(session), new Path(targetPath.get().toString()));
+            checkExternalPath(session, targetPath.get());
         }
         else {
             external = false;
@@ -1056,7 +1051,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
                 false);
     }

-    private Map<String, String> getEmptyTableProperties(ConnectorTableMetadata tableMetadata, Optional<HiveBucketProperty> bucketProperty, HdfsContext hdfsContext)
+    private Map<String, String> getEmptyTableProperties(ConnectorTableMetadata tableMetadata, Optional<HiveBucketProperty> bucketProperty, ConnectorSession session)
     {
         HiveStorageFormat hiveStorageFormat = getHiveStorageFormat(tableMetadata.getProperties());
         ImmutableMap.Builder<String, String> tableProperties = ImmutableMap.builder();
@@ -1087,7 +1082,7 @@ private Map getEmptyTableProperties(ConnectorTableMetadata table
         checkAvroSchemaProperties(avroSchemaUrl, avroSchemaLiteral);
         if (avroSchemaUrl != null) {
             checkFormatForProperty(hiveStorageFormat, HiveStorageFormat.AVRO, AVRO_SCHEMA_URL);
-            tableProperties.put(AVRO_SCHEMA_URL_KEY, validateAndNormalizeAvroSchemaUrl(avroSchemaUrl, hdfsContext));
+            tableProperties.put(AVRO_SCHEMA_URL_KEY, validateAvroSchemaUrl(session, avroSchemaUrl));
         }
         else if (avroSchemaLiteral != null) {
             checkFormatForProperty(hiveStorageFormat, HiveStorageFormat.AVRO, AVRO_SCHEMA_LITERAL);
@@ -1238,28 +1233,17 @@ private void validateOrcBloomFilterColumns(ConnectorTableMetadata tableMetadata,
         }
     }

-    private String validateAndNormalizeAvroSchemaUrl(String url, HdfsContext context)
+    private String validateAvroSchemaUrl(ConnectorSession session, String url)
     {
         try {
-            new URL(url).openStream().close();
-            return url;
-        }
-        catch (MalformedURLException e) {
-            // try locally
-            if (new File(url).exists()) {
-                // hive needs url to have a protocol
-                return new File(url).toURI().toString();
-            }
-            // try hdfs
-            try {
-                if (!hdfsEnvironment.getFileSystem(context, new Path(url)).exists(new Path(url))) {
-                    throw new TrinoException(INVALID_TABLE_PROPERTY, "Cannot locate Avro schema file: " + url);
-                }
-                return url;
-            }
-            catch (IOException ex) {
-                throw new TrinoException(INVALID_TABLE_PROPERTY, "Avro schema file is not a valid file system URI: " + url, ex);
+            Location location = Location.of(url);
+            if (!fileSystemFactory.create(session).newInputFile(location).exists()) {
+                throw new TrinoException(INVALID_TABLE_PROPERTY, "Cannot locate Avro schema file: " + url);
             }
+            return location.toString();
+        }
+        catch (IllegalArgumentException e) {
+            throw new TrinoException(INVALID_TABLE_PROPERTY, "Avro schema file is not a valid file system URI: " + url, e);
         }
         catch (IOException e) {
             throw new TrinoException(INVALID_TABLE_PROPERTY, "Cannot open Avro schema file: " + url, e);
@@ -1306,17 +1290,15 @@ private static Location getValidatedExternalLocation(String location)
         return validated;
     }

-    private void checkExternalPath(HdfsContext context, Path path)
+    private void checkExternalPath(ConnectorSession session, Location location)
     {
         try {
-            if (!isS3FileSystem(context, hdfsEnvironment, path)) {
-                if (!hdfsEnvironment.getFileSystem(context, path).isDirectory(path)) {
-                    throw new TrinoException(INVALID_TABLE_PROPERTY, "External location must be a directory: " + path);
-                }
+            if (!fileSystemFactory.create(session).directoryExists(location).orElse(true)) {
+                throw new TrinoException(INVALID_TABLE_PROPERTY, "External location must be a directory: " + location);
             }
         }
-        catch (IOException e) {
-            throw new TrinoException(INVALID_TABLE_PROPERTY, "External location is not a valid file system URI: " + path, e);
+        catch (IOException | IllegalArgumentException e) {
+            throw new TrinoException(INVALID_TABLE_PROPERTY, "External location is not a valid file system URI: " + location, e);
         }
     }
@@ -1687,7 +1669,7 @@ public HiveOutputTableHandle beginCreateTable(ConnectorSession session, Connecto
         String schemaName = schemaTableName.getSchemaName();
         String tableName = schemaTableName.getTableName();

-        Map<String, String> tableProperties = getEmptyTableProperties(tableMetadata, bucketProperty, new HdfsContext(session));
+        Map<String, String> tableProperties = getEmptyTableProperties(tableMetadata, bucketProperty, session);
         List<HiveColumnHandle> columnHandles = getColumnHandles(tableMetadata, ImmutableSet.copyOf(partitionedBy));
         HiveStorageFormat partitionStorageFormat = isRespectTableFormat(session) ? tableStorageFormat : getHiveStorageFormat(session);
@@ -2262,14 +2244,14 @@ else if (partitionUpdate.getUpdateMode() == NEW || partitionUpdate.getUpdateMode
     private void removeNonCurrentQueryFiles(ConnectorSession session, Location partitionLocation)
     {
         String queryId = session.getQueryId();
+        TrinoFileSystem fileSystem = fileSystemFactory.create(session);
         try {
-            Path partitionPath = new Path(partitionLocation.toString());
-            FileSystem fileSystem = hdfsEnvironment.getFileSystem(new HdfsContext(session), partitionPath);
-            RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(partitionPath, false);
+            // use TrinoFileSystem instead of Hadoop file system
+            FileIterator iterator = fileSystem.listFiles(partitionLocation);
             while (iterator.hasNext()) {
-                Path file = iterator.next().getPath();
-                if (!isFileCreatedByQuery(file.getName(), queryId)) {
-                    checkedDelete(fileSystem, file, false);
+                Location location = iterator.next().location();
+                if (!isFileCreatedByQuery(location.fileName(), queryId)) {
+                    fileSystem.deleteFile(location);
                 }
             }
         }
@@ -2546,32 +2528,29 @@ private void finishOptimize(ConnectorSession session, ConnectorTableExecuteHandl
                 handle.isRetriesEnabled());
         }

-        // get filesystem
-        FileSystem fs;
-        try {
-            fs = hdfsEnvironment.getFileSystem(new HdfsContext(session), new Path(table.getStorage().getLocation()));
-        }
-        catch (IOException e) {
-            throw new TrinoException(HIVE_FILESYSTEM_ERROR, e);
-        }
-
         // path to be deleted
-        Set<Path> scannedPaths = splitSourceInfo.stream()
-                .map(file -> new Path((String) file))
+        Set<Location> scannedPaths = splitSourceInfo.stream()
+                .map(file -> Location.of((String) file))
                 .collect(toImmutableSet());
         // track remaining files to be deleted for error reporting
-        Set<Path> remainingFilesToDelete = new HashSet<>(scannedPaths);
+        Set<Location> remainingFilesToDelete = new HashSet<>(scannedPaths);

         // delete loop
+        TrinoFileSystem fileSystem = fileSystemFactory.create(session);
         boolean someDeleted = false;
-        Optional<Path> firstScannedPath = Optional.empty();
+        Optional<Location> firstScannedPath = Optional.empty();
         try {
-            for (Path scannedPath : scannedPaths) {
+            for (Location scannedPath : scannedPaths) {
                 if (firstScannedPath.isEmpty()) {
                     firstScannedPath = Optional.of(scannedPath);
                 }
                 retry().run("delete " + scannedPath, () -> {
-                    checkedDelete(fs, scannedPath, false);
+                    try {
+                        fileSystem.deleteFile(scannedPath);
+                    }
+                    catch (FileNotFoundException e) {
+                        // ignore missing files
+                    }
                     return null;
                 });
                 someDeleted = true;
@@ -2579,7 +2558,7 @@ private void finishOptimize(ConnectorSession session, ConnectorTableExecuteHandl
             }
         }
         catch (Exception e) {
-            if (!someDeleted && (firstScannedPath.isEmpty() || exists(fs, firstScannedPath.get()))) {
+            if (!someDeleted && (firstScannedPath.isEmpty() || exists(fileSystem, firstScannedPath.get()))) {
                 // we are good - we did not delete any source files so we can just throw error and allow rollback to happen
                 // if someDeleted flag is false we do extra checking if first file we tried to delete is still there. There is a chance that
                 // fs.delete above could throw exception but file was actually deleted.
@@ -2596,10 +2575,10 @@ private void finishOptimize(ConnectorSession session, ConnectorTableExecuteHandl
         }
     }

-    private boolean exists(FileSystem fs, Path path)
+    private static boolean exists(TrinoFileSystem fs, Location location)
    {
         try {
-            return fs.exists(path);
+            return fs.newInputFile(location).exists();
         }
         catch (IOException e) {
             // on failure pessimistically assume file does not exist
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java
index 9e58658d6186..fa6700b225bf 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java
@@ -7625,7 +7625,7 @@ public void testCreateAvroTableWithSchemaUrl()
         File schemaFile = createAvroSchemaFile();

         String createTableSql = getAvroCreateTableSql(tableName, schemaFile.getAbsolutePath());
-        String expectedShowCreateTable = getAvroCreateTableSql(tableName, schemaFile.toURI().toString());
+        String expectedShowCreateTable = getAvroCreateTableSql(tableName, schemaFile.getPath());

         assertUpdate(createTableSql);
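
For reviewers less familiar with the io.trino.filesystem API this patch migrates to, here is a minimal sketch (illustrative only, not part of the patch) of the probing pattern used by the new checkExternalPath and exists helpers: Location.of rejects malformed URIs with IllegalArgumentException, directoryExists returns Optional<Boolean> (empty when the store cannot answer, which the patch treats as acceptable), and newInputFile(...).exists() checks a single file. The class name and the use of IllegalStateException are assumptions made to keep the sketch self-contained.

import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;

import java.io.IOException;

// Illustrative helper, not part of the patch: shows how HiveMetadata now probes
// locations through TrinoFileSystem instead of a Hadoop FileSystem.
final class LocationChecks
{
    private LocationChecks() {}

    // Mirrors checkExternalPath: only a definite "not a directory" answer is rejected;
    // Optional.empty() (file system cannot tell) is treated as acceptable.
    static void ensureDirectory(TrinoFileSystem fileSystem, String uri)
            throws IOException
    {
        // Location.of throws IllegalArgumentException for invalid URIs
        Location location = Location.of(uri);
        if (!fileSystem.directoryExists(location).orElse(true)) {
            throw new IllegalStateException("External location must be a directory: " + location);
        }
    }

    // Mirrors the exists(TrinoFileSystem, Location) helper: on failure,
    // pessimistically assume the file does not exist.
    static boolean fileExists(TrinoFileSystem fileSystem, Location location)
    {
        try {
            return fileSystem.newInputFile(location).exists();
        }
        catch (IOException e) {
            return false;
        }
    }
}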
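
The cleanup paths (removeNonCurrentQueryFiles and the finishOptimize delete loop) follow the same shape: enumerate with listFiles, filter by file name, and delete, tolerating files that are already gone. A sketch under the same caveats; createdByQuery is a simplified, hypothetical stand-in for HiveWriteUtils.isFileCreatedByQuery.

import io.trino.filesystem.FileIterator;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;

import java.io.FileNotFoundException;
import java.io.IOException;

// Illustrative sketch of the new cleanup pattern, not part of the patch.
final class QueryFileCleanup
{
    private QueryFileCleanup() {}

    // Delete every file under 'location' whose name does not belong to 'queryId'.
    static void deleteFilesNotCreatedBy(TrinoFileSystem fileSystem, Location location, String queryId)
            throws IOException
    {
        FileIterator iterator = fileSystem.listFiles(location);
        while (iterator.hasNext()) {
            Location file = iterator.next().location();
            if (!createdByQuery(file.fileName(), queryId)) {
                try {
                    fileSystem.deleteFile(file);
                }
                catch (FileNotFoundException e) {
                    // already gone - nothing to do, matching the finishOptimize delete loop
                }
            }
        }
    }

    // Simplified stand-in for HiveWriteUtils.isFileCreatedByQuery.
    private static boolean createdByQuery(String fileName, String queryId)
    {
        return fileName.contains(queryId);
    }
}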