Skip to content

Commit

Permalink
Merge pull request #91 from oracle/release_2023-05-09
Browse files Browse the repository at this point in the history
Releasing version 3.3.4.1.1.0
  • Loading branch information
y-chandra authored May 9, 2023
2 parents 121ca2e + 9ae2bc8 commit e2e0574
Show file tree
Hide file tree
Showing 16 changed files with 1,390 additions and 78 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](http://keepachangelog.com/).

## 3.3.4.1.1.0 - 2023-05-09
### Added
- Added support for parallel ranged GET requests in read-ahead mode

### Changed
- Optimized recursive calls for list files and delete path
- Optimized implementation for `FileSystem.getContentSummary`
- Replaced multipart request with PutObject for small object writes in `BmcMultipartOutputStream`

## 3.3.4.1.0.0 - 2023-04-25
### Added
- Added support for OCI Java SDK 3.x
Expand Down
6 changes: 3 additions & 3 deletions hdfs-addons/hdfs-smartparquet/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<parent>
<artifactId>oci-hdfs-addons</artifactId>
<groupId>com.oracle.oci.sdk</groupId>
<version>3.3.4.1.0.0</version>
<version>3.3.4.1.1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down Expand Up @@ -149,7 +149,7 @@
<dependency>
<groupId>com.oracle.oci.sdk</groupId>
<artifactId>oci-hdfs-connector</artifactId>
<version>3.3.4.1.0.0</version>
<version>3.3.4.1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
Expand Down Expand Up @@ -251,7 +251,7 @@
<dependency>
<groupId>com.oracle.oci.sdk</groupId>
<artifactId>oci-hdfs-connector</artifactId>
<version>3.3.4.1.0.0</version>
<version>3.3.4.1.1.0</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
Expand Down
2 changes: 1 addition & 1 deletion hdfs-addons/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<parent>
<artifactId>oci-hdfs</artifactId>
<groupId>com.oracle.oci.sdk</groupId>
<version>3.3.4.1.0.0</version>
<version>3.3.4.1.1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion hdfs-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<parent>
<groupId>com.oracle.oci.sdk</groupId>
<artifactId>oci-hdfs</artifactId>
<version>3.3.4.1.0.0</version>
<version>3.3.4.1.1.0</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public final class BmcConstants {

public static final String READ_STREAM_CLASS_KEY = "fs.oci.io.read.custom.stream";

public static final String READ_AHEAD_BLOCK_COUNT_KEY = "fs.oci.io.read.ahead.blockcount";

public static final String NUM_READ_AHEAD_THREADS_KEY = "fs.oci.io.read.ahead.numthreads";

public static final String WRITE_STREAM_CLASS_KEY = "fs.oci.io.write.custom.stream";

public static final String OBJECT_STORE_CLIENT_CLASS_KEY = "fs.oci.client.custom.client";
Expand Down Expand Up @@ -181,6 +185,8 @@ public final class BmcConstants {
public static final String OCI_DELEGATION_TOKEN_FILEPATH_KEY =
"fs.oci.delegation.token.filepath";

public static final String RECURSIVE_DIR_LISTING_FETCH_SIZE_KEY = "fs.oci.dir.recursive.list.fetch.size";

/**
* This class contains constants with deprecated values. The HDFS connector will first try the current values
* in {@link com.oracle.bmc.hdfs.BmcConstants}.
Expand Down
213 changes: 163 additions & 50 deletions hdfs-connector/src/main/java/com/oracle/bmc/hdfs/BmcFilesystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,30 @@
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.google.common.annotations.VisibleForTesting;
import com.oracle.bmc.hdfs.store.BmcPropertyAccessor;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;

Expand Down Expand Up @@ -220,6 +227,11 @@ public boolean delete(final Path path, final boolean recursive) throws IOExcepti
return delegate == null ? false : delegate.delete(path, recursive);
}

@Override
public ContentSummary getContentSummary(final Path path) throws IOException {
    // Forward to the wrapped filesystem; an uninitialized delegate yields null.
    if (delegate == null) {
        return null;
    }
    return delegate.getContentSummary(path);
}

@Override
public FileStatus getFileStatus(final Path path) throws IOException {
return delegate == null ? null : delegate.getFileStatus(path);
Expand Down Expand Up @@ -293,6 +305,16 @@ public BmcDataStore getDataStore() {
public Configuration getConf() {
return delegate == null ? null : delegate.getConf();
}

@Override
public RemoteIterator<LocatedFileStatus> listFiles(final Path f,
        final boolean recursive) throws FileNotFoundException, IOException {
    // Non-recursive listings use Hadoop's default directory walk; recursive
    // listings are routed to the delegate's optimized flat-listing path.
    if (!recursive) {
        return super.listFiles(f, false);
    }
    return delegate == null ? null : delegate.listFiles(f, true);
}
}

/**
Expand Down Expand Up @@ -552,70 +574,70 @@ public boolean delete(final Path path, final boolean recursive) throws IOExcepti
return true;
}

// else, it must be a directory
if (!recursive) {
// Try to find if it's an empty dir. If yes, delete the empty dir.
// Find one candidate file or dir inside the given Path to determine emptiness of the dir.
RemoteIterator<LocatedFileStatus> iter = listFilesAndZeroByteDirs(path);
boolean empty = true;
while (iter.hasNext()) {
LocatedFileStatus lfs = iter.next();
if (lfs.isDirectory()) {
Path nPath = ensureAbsolutePath(lfs.getPath());
// Do not consider the Path itself in empty determination.
if (!nPath.equals(path)) {
empty = false;
break;
}
} else {
empty = false;
break;
}
}

final boolean isEmptyDirectory = this.dataStore.isEmptyDirectory(path);
// handle empty directories first
if (isEmptyDirectory) {
// removing empty root directory means nothing, can return true or
// false per spec
if (status.getPath().isRoot()) {
LOG.info("Empty root directory, nothing to delete");
if (!empty) {
throw new IOException(
"Attempting to delete a directory that is not empty, and recursive delete not specified: "
+ path);
} else {
// If the path is root, it won't be deleted by this deleteDirectory method.
// Delete the empty dir since it's empty.
dataStore.deleteDirectory(path);
return true;
}
LOG.info("Deleting empty directory");
// else remove the placeholder file
this.dataStore.deleteDirectory(path);
return true;
}

// everything else is a non-empty directory
List<Path> zeroByteDirsToDelete = new ArrayList<>();
RemoteIterator<LocatedFileStatus> iter = listFilesAndZeroByteDirs(path);
while (iter.hasNext()) {
LocatedFileStatus lfs = iter.next();
Path absPath = ensureAbsolutePath(lfs.getPath());

// non-empty and !recursive, cannot continue
if (!recursive) {
throw new IOException(
"Attempting to delete a directory that is not empty, and recursive delete not specified: "
+ path);
}

final List<FileStatus> directories = new ArrayList<>();
directories.add(status);

final List<Path> directoriesToDelete = new ArrayList<>();

LOG.debug("Recursively deleting directory");
// breadth-first recursive delete everything except for directory placeholders.
// leave those until the end to try to maintain some sort of directory
// structure if sub files fail to delete
while (!directories.isEmpty()) {
final FileStatus directory = directories.remove(0);
final Path directoryPath = this.ensureAbsolutePath(directory.getPath());
final List<FileStatus> entries = this.dataStore.listDirectory(directoryPath);
for (final FileStatus entry : entries) {
if (entry.isDirectory()) {
directories.add(entry);
} else {
this.dataStore.delete(this.ensureAbsolutePath(entry.getPath()));
}
if (lfs.isDirectory()) {
// This path is traversed only for zero byte dir markers.
// Store the zero byte dir paths to a list and only delete once all the file paths
// are done with deletion. This is to preserve the directory structure in case the process
// of this delete is interrupted somehow.
zeroByteDirsToDelete.add(absPath);
} else {
dataStore.delete(absPath);
}
// track this to delete later
directoriesToDelete.add(directoryPath);
}

// now that all objects under this directory have been deleted, delete
// all of the individual directory objects we found

// sort by length, effectively to delete child directories before parent directories. doing this
// in case a delete fails midway, then we done our best not to create unreachable directories
Collections.sort(directoriesToDelete, PATH_LENGTH_COMPARATOR);

for (final Path directoryToDelete : directoriesToDelete) {
this.dataStore.deleteDirectory(directoryToDelete);
// Delete the longest paths first to retain structure in case of interruptions.
Collections.sort(zeroByteDirsToDelete, PATH_LENGTH_COMPARATOR);
for (Path dir : zeroByteDirsToDelete) {
dataStore.deleteDirectory(dir);
}

return true;
}

@Override
public ContentSummary getContentSummary(Path path) throws IOException {
    // Resolve to an absolute path before asking the data store to aggregate
    // counts and sizes under it.
    return this.dataStore.getContentSummary(this.ensureAbsolutePath(path));
}

@Override
public FileStatus getFileStatus(final Path path) throws IOException {
LOG.debug("Requested file status for {}", path);
Expand Down Expand Up @@ -875,4 +897,95 @@ public synchronized void close() throws IOException {
public synchronized void addOwner(BmcFilesystem fs) {
owners.add(fs);
}

/**
 * Returns a flat (non-hierarchical) listing of everything under {@code path},
 * including zero-byte directory marker objects. Used by recursive delete so
 * that the directory markers themselves can also be cleaned up.
 */
private RemoteIterator<LocatedFileStatus> listFilesAndZeroByteDirs(Path path) throws IOException {
    final boolean includeZeroByteDirs = true;
    return new FlatListingRemoteIterator(path, includeZeroByteDirs);
}

@Override
public RemoteIterator<LocatedFileStatus> listFiles(final Path f,
        final boolean recursive) throws FileNotFoundException, IOException {
    // A recursive listing is served by a single flat listing of the object
    // store (files only, no directory markers); the non-recursive case falls
    // back to Hadoop's default implementation.
    return recursive
            ? new FlatListingRemoteIterator(f, false)
            : super.listFiles(f, false);
}

/**
 * A {@link RemoteIterator} that walks every object under a prefix using the
 * object store's flat (non-hierarchical) listing API, fetching one page of
 * results at a time. Because the listing is done without a delimiter, all
 * objects and all zero-byte directory markers beneath the prefix are visible.
 * When {@code includeZeroByteDirs} is false, only files are surfaced (the
 * recursive {@code listFiles} case); when true, the zero-byte directory
 * markers are surfaced as well (the recursive delete case, which must also
 * remove the markers themselves).
 */
class FlatListingRemoteIterator implements RemoteIterator<LocatedFileStatus> {
    // Statuses already fetched from the store but not yet handed to the caller.
    private final List<LocatedFileStatus> pending = new LinkedList<>();
    // True once the object store has no further pages for this prefix.
    private boolean exhausted = false;
    // Continuation token for the next page; null means start at the beginning.
    private String continuationToken = null;
    private final Path path;
    private final boolean includeZeroByteDirs;

    public FlatListingRemoteIterator(Path path, boolean includeZeroByteDirs) {
        this.path = path;
        this.includeZeroByteDirs = includeZeroByteDirs;
    }

    public boolean hasNext() throws IOException {
        // Keep pulling pages until something is buffered or the listing is
        // exhausted. A page may contribute nothing to the buffer (e.g. it
        // held only directory markers while includeZeroByteDirs is false),
        // so we must continue with the next continuation token rather than
        // stop on an empty buffer alone.
        while (pending.isEmpty()) {
            if (exhausted) {
                return false;
            }
            final BmcDataStore store = BmcFilesystemImpl.this.getDataStore();
            final Pair<List<FileStatus>, String> page =
                    store.flatListDirectoryRecursive(path, continuationToken);
            final List<FileStatus> entries = page.getLeft();
            if (entries.isEmpty()) {
                // An empty page means the listing is finished.
                exhausted = true;
                return false;
            }
            for (final FileStatus entry : entries) {
                if (entry.isFile()) {
                    final BlockLocation[] locations =
                            BmcFilesystemImpl.this.getFileBlockLocations(
                                    entry, 0L, entry.getLen());
                    pending.add(new LocatedFileStatus(entry, locations));
                } else if (includeZeroByteDirs) {
                    // Directory markers carry no data, hence no block locations.
                    pending.add(new LocatedFileStatus(entry, null));
                }
            }
            continuationToken = page.getRight();
            if (continuationToken == null) {
                exhausted = true;
            }
        }
        return true;
    }

    public LocatedFileStatus next() throws IOException {
        if (!hasNext()) {
            throw new NoSuchElementException("No more elements exist.");
        }
        return pending.remove(0);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,19 @@ public enum BmcProperties {
*/
READ_AHEAD_BLOCK_SIZE(READ_AHEAD_BLOCK_SIZE_KEY, 6 * 1024 * 1024),

/**
* (int, optional) Number of blocks to read if READ_AHEAD is enabled.
* See {@link BmcConstants#READ_AHEAD_BLOCK_COUNT_KEY} for config key name. Default is 1.
*/
READ_AHEAD_BLOCK_COUNT(READ_AHEAD_BLOCK_COUNT_KEY, 1),

/**
* (int, optional) The number of threads to use for parallel GET operations when reading ahead.
* See {@link BmcConstants#NUM_READ_AHEAD_THREADS_KEY} for the config key name. Note that any value
* smaller than 0 is interpreted as using the default value. Default is 16.
*/
NUM_READ_AHEAD_THREADS(NUM_READ_AHEAD_THREADS_KEY, 16),

/**
* (boolean, optional) Flag to enable parquet caching. See
* {@link BmcConstants#OBJECT_PARQUET_CACHING_ENABLED_KEY} for config key name. Default is false.
Expand Down Expand Up @@ -474,6 +487,14 @@ public enum BmcProperties {
* (String, optional) File path that has the delegation token. Default value is null
*/
OCI_DELEGATION_TOKEN_FILEPATH(OCI_DELEGATION_TOKEN_FILEPATH_KEY, null),

/**
* (int, optional) Whenever listFiles is invoked with recursive set to true on a directory, a flat
* listing API call is made to OSS that returns all the files under the prefix. This property determines
* the maximum number of files to be listed in one call to the OSS service (essentially the page size of
* the OSS listing API).
* See {@link BmcConstants#RECURSIVE_DIR_LISTING_FETCH_SIZE_KEY} for config key name.
*/
RECURSIVE_DIR_LISTING_FETCH_SIZE(RECURSIVE_DIR_LISTING_FETCH_SIZE_KEY, 1000)
;

@Getter private final String propertyName;
Expand Down
Loading

0 comments on commit e2e0574

Please sign in to comment.