Add HDFS operation jmx stats #17078

Merged (2 commits) on May 7, 2023
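This change threads a `TrinoHdfsFileSystemStats` object through the HDFS `TrinoFileSystem` implementation so that each operation (open, create, delete file, delete directory, rename, list) is counted, timed, and has its failures recorded, and exports those statistics over JMX. The `TrinoHdfsFileSystemStats` and `CallStats` classes themselves are added in `io.trino.hdfs` and are not shown in the excerpt below. Judging from how they are used here (`newCall()`, `time()` returning a `TimeStat.BlockTimer`, `recordException(...)`, one `CallStats` per operation), a minimal sketch built on Airlift's stats primitives might look like the following; the field names and JMX annotations are assumptions, not the PR's actual code.

```java
// Sketch only: the real classes live in io.trino.hdfs and may differ in detail.
// Each class would go in its own source file.
import io.airlift.stats.CounterStat;
import io.airlift.stats.TimeStat;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

import java.util.concurrent.TimeUnit;

public class CallStats
{
    private final TimeStat time = new TimeStat(TimeUnit.MILLISECONDS);
    private final CounterStat totalCalls = new CounterStat();
    private final CounterStat totalFailures = new CounterStat();

    public TimeStat.BlockTimer time()
    {
        // Used in try-with-resources; closing the timer records the elapsed time
        return time.time();
    }

    public void newCall()
    {
        totalCalls.update(1);
    }

    public void recordException(Exception exception)
    {
        totalFailures.update(1);
    }

    @Managed
    @Nested
    public TimeStat getTime()
    {
        return time;
    }

    @Managed
    @Nested
    public CounterStat getTotalCalls()
    {
        return totalCalls;
    }

    @Managed
    @Nested
    public CounterStat getTotalFailures()
    {
        return totalFailures;
    }
}

public class TrinoHdfsFileSystemStats
{
    private final CallStats openFileCalls = new CallStats();
    private final CallStats createFileCalls = new CallStats();
    private final CallStats deleteFileCalls = new CallStats();
    private final CallStats deleteDirectoryCalls = new CallStats();
    private final CallStats renameFileCalls = new CallStats();
    private final CallStats listFilesCalls = new CallStats();

    @Managed
    @Nested
    public CallStats getOpenFileCalls()
    {
        return openFileCalls;
    }

    // getCreateFileCalls(), getDeleteFileCalls(), getDeleteDirectoryCalls(),
    // getRenameFileCalls(), and getListFilesCalls() follow the same pattern.
}
```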
HdfsFileSystem.java
@@ -13,6 +13,7 @@
*/
package io.trino.filesystem.hdfs;

import io.airlift.stats.TimeStat;
import io.trino.filesystem.FileIterator;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
@@ -21,6 +22,7 @@
import io.trino.hdfs.FileSystemWithBatchDelete;
import io.trino.hdfs.HdfsContext;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.hdfs.TrinoHdfsFileSystemStats;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

@@ -44,42 +46,51 @@ class HdfsFileSystem
{
private final HdfsEnvironment environment;
private final HdfsContext context;
private final TrinoHdfsFileSystemStats stats;

public HdfsFileSystem(HdfsEnvironment environment, HdfsContext context)
public HdfsFileSystem(HdfsEnvironment environment, HdfsContext context, TrinoHdfsFileSystemStats stats)
{
this.environment = requireNonNull(environment, "environment is null");
this.context = requireNonNull(context, "context is null");
this.stats = requireNonNull(stats, "stats is null");
}

@Override
public TrinoInputFile newInputFile(Location location)
{
return new HdfsInputFile(location, null, environment, context);
return new HdfsInputFile(location, null, environment, context, stats.getOpenFileCalls());
}

@Override
public TrinoInputFile newInputFile(Location location, long length)
{
return new HdfsInputFile(location, length, environment, context);
return new HdfsInputFile(location, length, environment, context, stats.getOpenFileCalls());
}

@Override
public TrinoOutputFile newOutputFile(Location location)
{
return new HdfsOutputFile(location, environment, context);
return new HdfsOutputFile(location, environment, context, stats.getCreateFileCalls());
}

@Override
public void deleteFile(Location location)
throws IOException
{
stats.getDeleteFileCalls().newCall();
Path file = hadoopPath(location);
FileSystem fileSystem = environment.getFileSystem(context, file);
environment.doAs(context.getIdentity(), () -> {
if (!fileSystem.delete(file, false)) {
throw new IOException("Failed to delete file: " + file);
try (TimeStat.BlockTimer ignored = stats.getDeleteFileCalls().time()) {
if (!fileSystem.delete(file, false)) {
throw new IOException("Failed to delete file: " + file);
}
return null;
}
catch (IOException e) {
stats.getDeleteFileCalls().recordException(e);
throw e;
}
return null;
});
}
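The pattern above is repeated for every operation in this file: call `newCall()` before touching HDFS, run the operation inside try-with-resources on `CallStats.time()` so that closing the `TimeStat.BlockTimer` records the elapsed time, and on `IOException` call `recordException(e)` before rethrowing.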

@@ -99,7 +110,14 @@ public void deleteFiles(Collection<Location> locations)
}
else {
for (Path path : directoryWithPaths.getValue()) {
rawFileSystem.delete(path, false);
stats.getDeleteFileCalls().newCall();
try (TimeStat.BlockTimer ignored = stats.getDeleteFileCalls().time()) {
rawFileSystem.delete(path, false);
}
catch (IOException e) {
stats.getDeleteFileCalls().recordException(e);
throw e;
}
}
}
return null;
@@ -111,43 +129,62 @@ public void deleteFiles(Collection<Location> locations)
public void deleteDirectory(Location location)
throws IOException
{
stats.getDeleteDirectoryCalls().newCall();
Path directory = hadoopPath(location);
FileSystem fileSystem = environment.getFileSystem(context, directory);
environment.doAs(context.getIdentity(), () -> {
if (!fileSystem.delete(directory, true) && fileSystem.exists(directory)) {
throw new IOException("Failed to delete directory: " + directory);
try (TimeStat.BlockTimer ignored = stats.getDeleteDirectoryCalls().time()) {
if (!fileSystem.delete(directory, true) && fileSystem.exists(directory)) {
throw new IOException("Failed to delete directory: " + directory);
}
return null;
}
catch (IOException e) {
stats.getDeleteDirectoryCalls().recordException(e);
throw e;
}
return null;
});
}

@Override
public void renameFile(Location source, Location target)
throws IOException
{
stats.getRenameFileCalls().newCall();
Path sourcePath = hadoopPath(source);
Path targetPath = hadoopPath(target);
FileSystem fileSystem = environment.getFileSystem(context, sourcePath);
environment.doAs(context.getIdentity(), () -> {
if (!fileSystem.rename(sourcePath, targetPath)) {
throw new IOException(format("Failed to rename [%s] to [%s]", source, target));
try (TimeStat.BlockTimer ignored = stats.getRenameFileCalls().time()) {
if (!fileSystem.rename(sourcePath, targetPath)) {
throw new IOException(format("Failed to rename [%s] to [%s]", source, target));
}
return null;
}
catch (IOException e) {
stats.getRenameFileCalls().recordException(e);
throw e;
}
return null;
});
}

@Override
public FileIterator listFiles(Location location)
throws IOException
{
stats.getListFilesCalls().newCall();
Path directory = hadoopPath(location);
FileSystem fileSystem = environment.getFileSystem(context, directory);
return environment.doAs(context.getIdentity(), () -> {
try {
try (TimeStat.BlockTimer ignored = stats.getListFilesCalls().time()) {
return new HdfsFileIterator(location, directory, fileSystem.listFiles(directory, true));
}
catch (FileNotFoundException e) {
return FileIterator.empty();
catch (IOException e) {
stats.getListFilesCalls().recordException(e);
if (e instanceof FileNotFoundException) {
return FileIterator.empty();
}
throw e;
}
});
}
HdfsFileSystemFactory.java
@@ -17,6 +17,7 @@
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.hdfs.HdfsContext;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.hdfs.TrinoHdfsFileSystemStats;
import io.trino.spi.security.ConnectorIdentity;

import javax.inject.Inject;
@@ -27,16 +28,23 @@ public class HdfsFileSystemFactory
implements TrinoFileSystemFactory
{
private final HdfsEnvironment environment;
private final TrinoHdfsFileSystemStats fileSystemStats;

@Inject
public HdfsFileSystemFactory(HdfsEnvironment environment)
public HdfsFileSystemFactory(HdfsEnvironment environment, TrinoHdfsFileSystemStats fileSystemStats)
{
this.environment = requireNonNull(environment, "environment is null");
this.fileSystemStats = requireNonNull(fileSystemStats, "fileSystemStats is null");
}

@Override
public TrinoFileSystem create(ConnectorIdentity identity)
{
return new HdfsFileSystem(environment, new HdfsContext(identity));
return new HdfsFileSystem(environment, new HdfsContext(identity), fileSystemStats);
}

public TrinoHdfsFileSystemStats getFileSystemStats()
{
return fileSystemStats;
}
}
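Because the stats object is now a constructor argument, call sites outside Guice (for example tests) must supply one explicitly. A hypothetical wiring, assuming an already configured `HdfsEnvironment` named `hdfsEnvironment` and an illustrative HDFS location, might look like:

```java
// Hypothetical usage; hdfsEnvironment and the HDFS URI are assumptions for illustration.
static TrinoHdfsFileSystemStats listWithStats(HdfsEnvironment hdfsEnvironment)
        throws IOException
{
    TrinoHdfsFileSystemStats stats = new TrinoHdfsFileSystemStats();
    HdfsFileSystemFactory factory = new HdfsFileSystemFactory(hdfsEnvironment, stats);
    TrinoFileSystem fileSystem = factory.create(ConnectorIdentity.ofUser("test"));

    fileSystem.listFiles(Location.of("hdfs://namenode/tmp/"));
    // stats.getListFilesCalls() now reflects one timed call (or a recorded failure)
    return stats;
}
```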
HdfsFileSystemModule.java
@@ -20,8 +20,10 @@
import io.opentelemetry.api.trace.Tracer;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.tracing.TracingFileSystemFactory;
import io.trino.hdfs.TrinoHdfsFileSystemStats;

import static com.google.inject.Scopes.SINGLETON;
import static org.weakref.jmx.guice.ExportBinder.newExporter;

public class HdfsFileSystemModule
implements Module
@@ -30,6 +32,8 @@ public class HdfsFileSystemModule
public void configure(Binder binder)
{
binder.bind(HdfsFileSystemFactory.class).in(SINGLETON);
binder.bind(TrinoHdfsFileSystemStats.class).in(SINGLETON);
newExporter(binder).export(TrinoHdfsFileSystemStats.class).withGeneratedName();
}

@Provides
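With these bindings in place, the exporter publishes `TrinoHdfsFileSystemStats` as a JMX MBean under a generated object name, so the per-operation counters and timers become visible to standard JMX tooling (and, for example, Trino's `jmx` catalog).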
HdfsInputFile.java
@@ -13,10 +13,12 @@
*/
package io.trino.filesystem.hdfs;

import io.airlift.stats.TimeStat;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoInput;
import io.trino.filesystem.TrinoInputFile;
import io.trino.filesystem.TrinoInputStream;
import io.trino.hdfs.CallStats;
import io.trino.hdfs.HdfsContext;
import io.trino.hdfs.HdfsEnvironment;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -40,14 +42,16 @@ class HdfsInputFile
private final Path file;
private Long length;
private FileStatus status;
private CallStats openFileCallStat;

public HdfsInputFile(Location location, Long length, HdfsEnvironment environment, HdfsContext context)
public HdfsInputFile(Location location, Long length, HdfsEnvironment environment, HdfsContext context, CallStats openFileCallStat)
{
this.location = requireNonNull(location, "location is null");
this.environment = requireNonNull(environment, "environment is null");
this.context = requireNonNull(context, "context is null");
this.file = hadoopPath(location);
this.length = length;
this.openFileCallStat = requireNonNull(openFileCallStat, "openFileCallStat is null");
checkArgument(length == null || length >= 0, "length is negative");
}

@@ -105,8 +109,17 @@ public String toString()
private FSDataInputStream openFile()
throws IOException
{
openFileCallStat.newCall();
FileSystem fileSystem = environment.getFileSystem(context, file);
return environment.doAs(context.getIdentity(), () -> fileSystem.open(file));
return environment.doAs(context.getIdentity(), () -> {
try (TimeStat.BlockTimer ignored = openFileCallStat.time()) {
return fileSystem.open(file);
}
catch (IOException e) {
openFileCallStat.recordException(e);
throw e;
}
});
}

private FileStatus lazyStatus()
HdfsOutputFile.java
@@ -13,8 +13,10 @@
*/
package io.trino.filesystem.hdfs;

import io.airlift.stats.TimeStat;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoOutputFile;
import io.trino.hdfs.CallStats;
import io.trino.hdfs.HdfsContext;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.hdfs.MemoryAwareFileSystem;
@@ -35,12 +37,14 @@ class HdfsOutputFile
private final Location location;
private final HdfsEnvironment environment;
private final HdfsContext context;
private final CallStats createFileCallStat;

public HdfsOutputFile(Location location, HdfsEnvironment environment, HdfsContext context)
public HdfsOutputFile(Location location, HdfsEnvironment environment, HdfsContext context, CallStats createFileCallStat)
{
this.location = requireNonNull(location, "location is null");
this.environment = requireNonNull(environment, "environment is null");
this.context = requireNonNull(context, "context is null");
this.createFileCallStat = requireNonNull(createFileCallStat, "createFileCallStat is null");
}

@Override
@@ -60,13 +64,20 @@ public OutputStream createOrOverwrite(AggregatedMemoryContext memoryContext)
private OutputStream create(boolean overwrite, AggregatedMemoryContext memoryContext)
throws IOException
{
createFileCallStat.newCall();
Path file = hadoopPath(location);
FileSystem fileSystem = environment.getFileSystem(context, file);
FileSystem rawFileSystem = getRawFileSystem(fileSystem);
if (rawFileSystem instanceof MemoryAwareFileSystem memoryAwareFileSystem) {
return environment.doAs(context.getIdentity(), () -> memoryAwareFileSystem.create(file, memoryContext));
try (TimeStat.BlockTimer ignored = createFileCallStat.time()) {
if (rawFileSystem instanceof MemoryAwareFileSystem memoryAwareFileSystem) {
return environment.doAs(context.getIdentity(), () -> memoryAwareFileSystem.create(file, memoryContext));
}
return environment.doAs(context.getIdentity(), () -> fileSystem.create(file, overwrite));
}
catch (IOException e) {
createFileCallStat.recordException(e);
throw e;
}
return environment.doAs(context.getIdentity(), () -> fileSystem.create(file, overwrite));
}

@Override
Expand Down
Loading