Skip to content

Commit

Permalink
PROTOTYPE: Implement S3 putIfAbsent in OSS LogStore without the need to upgrade S3/Hadoop libs
Browse files Browse the repository at this point in the history

Co-authored-by: Prakhar Jain <prakharjain09@gmail.com>
  • Loading branch information
vkorukanti and prakharjain09 committed Sep 17, 2024
1 parent fbf0f9b commit 7314edf
Showing 1 changed file with 386 additions and 0 deletions.
386 changes: 386 additions & 0 deletions storage/src/main/java/io/delta/storage/S3LogStore.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,386 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.storage;

import java.io.*;
import java.lang.reflect.Method;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3URI;
import com.amazonaws.services.s3.model.*;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.io.CountingOutputStream;
import io.delta.storage.internal.FileNameUtils;
import io.delta.storage.internal.PathLock;
import io.delta.storage.internal.S3LogStoreUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.fs.s3a.S3AFileSystem;

/**
* Single Spark-driver/JVM LogStore implementation for S3.
* <p>
* We assume the following from S3's {@link FileSystem} implementations:
* <ul>
* <li>File writing on S3 is all-or-nothing, whether overwrite or not.</li>
* <li>List-after-write can be inconsistent.</li>
* </ul>
* <p>
* Regarding file creation, this implementation:
* <ul>
* <li>Opens a stream to write to S3 (regardless of the overwrite option).</li>
* <li>Failures during stream write may leak resources, but may never result in partial
* writes.</li>
* </ul>
* <p>
* Regarding directory listing, this implementation:
* <ul>
* <li>returns a list by merging the files listed from S3 and recently-written files from the
* cache.</li>
* </ul>
*/
public class S3LogStore extends HadoopFileSystemLogStore {

    /**
     * Enables a faster implementation of listFrom by setting the startAfter parameter in S3 list
     * requests. The feature is enabled by setting the property delta.enableFastS3AListFrom in the
     * Hadoop configuration.
     *
     * This feature requires the Hadoop file system used for S3 paths to be castable to
     * org.apache.hadoop.fs.s3a.S3AFileSystem.
     */
    private final boolean enableFastListFrom =
        initHadoopConf().getBoolean("delta.enableFastS3AListFrom", false);

    ///////////////////////////
    // Static Helper Methods //
    ///////////////////////////

    /**
     * A global cache that records the metadata of the files recently written.
     * As list-after-write may be inconsistent on S3, we can use the files in the cache
     * to fix the inconsistent file listing.
     *
     * NOTE(review): original code carried a "TODO: do we need this cache?" — with S3's
     * server-side conditional PUT this may be removable, but confirm list-after-write
     * guarantees for all supported S3 endpoints before deleting it.
     */
    private static final Cache<Path, FileMetadata> writtenPathCache =
        CacheBuilder.newBuilder()
            .expireAfterAccess(120, TimeUnit.MINUTES)
            .build();

    /////////////////////////////////////////////
    // Constructor and Instance Helper Methods //
    /////////////////////////////////////////////

    public S3LogStore(Configuration hadoopConf) {
        super(hadoopConf);
    }

    /**
     * Check if the path is an initial version (version 0) of a Delta log.
     */
    private boolean isInitialVersion(Path path) {
        return FileNameUtils.isDeltaFile(path) && FileNameUtils.deltaVersion(path) == 0L;
    }

    /**
     * Fully qualifies {@code path} against {@code fs} and strips any user-info
     * component so paths can be used as stable cache keys.
     */
    private Path resolvePath(FileSystem fs, Path path) {
        return stripUserInfo(fs.makeQualified(path));
    }

    /**
     * Returns {@code path} with the URI user-info component removed; all other
     * URI components are preserved.
     */
    private Path stripUserInfo(Path path) {
        final URI uri = path.toUri();

        try {
            final URI newUri = new URI(
                uri.getScheme(),
                null, // userInfo
                uri.getHost(),
                uri.getPort(),
                uri.getPath(),
                uri.getQuery(),
                uri.getFragment()
            );

            return new Path(newUri);
        } catch (URISyntaxException e) {
            // Propagating this URISyntaxException to callers would mean we would have to either
            // include it in the public LogStore.java interface or wrap it in an
            // IllegalArgumentException somewhere else. Instead, catch and wrap it here.
            throw new IllegalArgumentException(e);
        }
    }

    /**
     * Merge two lists of {@link FileStatus} into a single list ordered by file path name.
     * In case both lists have {@link FileStatus}'s for the same file path, keep the one from
     * `listWithPrecedence` and discard the other from `list`.
     */
    private Iterator<FileStatus> mergeFileLists(
            List<FileStatus> list,
            List<FileStatus> listWithPrecedence) {
        final Map<Path, FileStatus> fileStatusMap = new HashMap<>();

        // insert all elements from `listWithPrecedence` (highest priority)
        // and then insert elements from `list` if and only if that key doesn't already exist
        Stream.concat(listWithPrecedence.stream(), list.stream())
            .forEach(fs -> fileStatusMap.putIfAbsent(fs.getPath(), fs));

        return fileStatusMap
            .values()
            .stream()
            .sorted(Comparator.comparing(a -> a.getPath().getName()))
            .iterator();
    }

    /**
     * List files starting from `resolvedPath` (inclusive) in the same directory,
     * drawing only on the recently-written-files cache.
     */
    private List<FileStatus> listFromCache(
            FileSystem fs,
            Path resolvedPath) {
        final Path pathKey = stripUserInfo(resolvedPath);

        return writtenPathCache
            .asMap()
            .entrySet()
            .stream()
            .filter(e -> {
                final Path path = e.getKey();
                return path.getParent().equals(pathKey.getParent()) &&
                    path.getName().compareTo(pathKey.getName()) >= 0;
            }).map(e -> {
                final Path path = e.getKey();
                final FileMetadata fileMetadata = e.getValue();
                return new FileStatus(
                    fileMetadata.length,
                    false, // isDir
                    1, // block_replication
                    fs.getDefaultBlockSize(path),
                    fileMetadata.modificationTime,
                    path);
            }).collect(Collectors.toList());
    }

    /**
     * List files starting from `resolvedPath` (inclusive) in the same directory, which merges
     * the file system list and the cache list when `useCache` is on, otherwise
     * use file system list only.
     *
     * @throws FileNotFoundException if the parent directory does not exist
     */
    private Iterator<FileStatus> listFromInternal(
            FileSystem fs,
            Path resolvedPath,
            boolean useCache) throws IOException {
        final Path parentPath = resolvedPath.getParent();
        if (!fs.exists(parentPath)) {
            throw new FileNotFoundException(
                String.format("No such file or directory: %s", parentPath)
            );
        }

        final FileStatus[] statuses;
        if (
            // LocalFileSystem and RawLocalFileSystem checks are needed for tests to pass
            fs instanceof LocalFileSystem || fs instanceof RawLocalFileSystem || !enableFastListFrom
        ) {
            statuses = fs.listStatus(parentPath);
        } else {
            statuses = S3LogStoreUtil.s3ListFromArray(fs, resolvedPath, parentPath);
        }

        final List<FileStatus> listedFromFs = Arrays
            .stream(statuses)
            .filter(s -> s.getPath().getName().compareTo(resolvedPath.getName()) >= 0)
            .collect(Collectors.toList());

        final List<FileStatus> listedFromCache = useCache ?
            listFromCache(fs, resolvedPath) : Collections.emptyList();

        // File statuses listed from file system take precedence
        return mergeFileLists(listedFromCache, listedFromFs);
    }

    /**
     * Check if a path exists. Normally we check both the file system and the cache, but when the
     * path is the first version of a Delta log, we ignore the cache.
     */
    private boolean exists(
            FileSystem fs,
            Path resolvedPath) throws IOException {
        final boolean useCache = !isInitialVersion(resolvedPath);
        final Iterator<FileStatus> iter = listFromInternal(fs, resolvedPath, useCache);
        if (!iter.hasNext()) return false;

        // The merged listing is sorted by name, so the first entry is the smallest
        // path >= resolvedPath; it matches iff resolvedPath itself exists.
        return iter.next().getPath().getName().equals(resolvedPath.getName());
    }

    ////////////////////////
    // Public API Methods //
    ////////////////////////

    @Override
    public void write(
            Path path,
            Iterator<String> actions,
            Boolean overwrite,
            Configuration hadoopConf) throws IOException {
        final FileSystem fs = path.getFileSystem(hadoopConf);
        final Path resolvedPath = resolvePath(fs, path);
        try {
            if (fs instanceof S3AFileSystem) {
                // Fast path: use the AWS SDK client directly so we can attach the
                // "If-None-Match: *" header and get a server-side putIfAbsent.
                writeViaS3Client((S3AFileSystem) fs, resolvedPath, actions, overwrite);
            } else {
                writeViaFileSystem(fs, resolvedPath, actions, overwrite);
            }
        } catch (org.apache.hadoop.fs.FileAlreadyExistsException e) {
            // Convert Hadoop's FileAlreadyExistsException to Java's FileAlreadyExistsException
            throw new java.nio.file.FileAlreadyExistsException(e.getMessage());
        }
    }

    /**
     * Uploads the actions with the raw AWS SDK client. When {@code overwrite} is false,
     * the "If-None-Match: *" header asks S3 to reject the PUT with HTTP 412
     * (Precondition Failed) if the object already exists, giving atomic putIfAbsent
     * semantics without a client-side existence check.
     */
    private void writeViaS3Client(
            S3AFileSystem s3aFs,
            Path resolvedPath,
            Iterator<String> actions,
            boolean overwrite) throws IOException {
        try {
            final AmazonS3 s3Client = getAmazonS3Client(s3aFs);

            final ObjectMetadata metadata = new ObjectMetadata();
            metadata.setContentType("text/plain");
            if (!overwrite) {
                // Conditional PUT: S3 fails with 412 if the key already exists.
                metadata.setHeader("If-None-Match", "*");
            }

            // Content length is unknown up front (actions is a one-shot iterator),
            // so the SDK buffers/chunks the stream itself.
            final String key = s3aFs.pathToKey(resolvedPath);
            final StringIteratorInputStream inputStream = new StringIteratorInputStream(actions);
            s3Client.putObject(s3aFs.getBucket(), key, inputStream, metadata);

            // Fix: the original fast path returned without touching the cache,
            // so stale S3 listings could hide a file this JVM just wrote.
            updateWrittenPathCache(resolvedPath, inputStream.getBytesRead());
        } catch (AmazonS3Exception s3Exception) {
            // 412 is the documented response to a failed If-None-Match precondition;
            // the exists() re-check is kept as a fallback for older endpoints.
            if (!overwrite &&
                    (s3Exception.getStatusCode() == 412 || exists(s3aFs, resolvedPath))) {
                throw new java.nio.file.FileAlreadyExistsException(
                    resolvedPath.toUri().toString()
                );
            }
            throw new IOException(s3Exception);
        } catch (IOException e) {
            throw e;
        } catch (Exception e) {
            // Reflection failures (NoSuchMethodException etc.) end up here.
            throw new IOException(e);
        }
    }

    /**
     * Obtains the underlying AWS SDK client from an {@link S3AFileSystem} via
     * reflection. getAmazonS3Client() is not publicly accessible in the Hadoop
     * versions we must support; reflection avoids upgrading the S3/Hadoop libs
     * (this class is a PROTOTYPE — see commit message).
     */
    private static AmazonS3 getAmazonS3Client(S3AFileSystem s3aFs) throws Exception {
        final Method method = s3aFs.getClass().getDeclaredMethod("getAmazonS3Client");
        method.setAccessible(true);
        return (AmazonS3) method.invoke(s3aFs);
    }

    /**
     * Fallback path for non-S3A file systems: write through the Hadoop stream API.
     */
    private void writeViaFileSystem(
            FileSystem fs,
            Path resolvedPath,
            Iterator<String> actions,
            boolean overwrite) throws IOException {
        // Deliberately NOT try-with-resources: if a write fails mid-stream we leak
        // the stream rather than close() it, because closing an S3 output stream
        // commits the upload and could publish a partial file (see class javadoc:
        // "Failures during stream write may leak resources, but may never result
        // in partial writes").
        final CountingOutputStream stream =
            new CountingOutputStream(fs.create(resolvedPath, overwrite));

        while (actions.hasNext()) {
            stream.write((actions.next() + "\n").getBytes(StandardCharsets.UTF_8));
        }
        stream.close();

        updateWrittenPathCache(resolvedPath, stream.getCount());
    }

    /**
     * Records a successful write in the cache so later listings can observe it even
     * when the S3 listing is stale. When a Delta log starts afresh (version 0), all
     * previously cached files for that log become obsolete and are invalidated first.
     */
    private void updateWrittenPathCache(Path resolvedPath, long length) {
        if (isInitialVersion(resolvedPath)) {
            final List<Path> obsoleteFiles = writtenPathCache
                .asMap()
                .keySet()
                .stream()
                .filter(p -> p.getParent().equals(resolvedPath.getParent()))
                .collect(Collectors.toList());

            writtenPathCache.invalidateAll(obsoleteFiles);
        }

        writtenPathCache.put(
            resolvedPath,
            new FileMetadata(length, System.currentTimeMillis())
        );
    }

    @Override
    public Iterator<FileStatus> listFrom(Path path, Configuration hadoopConf) throws IOException {
        final FileSystem fs = path.getFileSystem(hadoopConf);
        final Path resolvedPath = resolvePath(fs, path);
        return listFromInternal(fs, resolvedPath, true); // useCache=true
    }

    @Override
    public Boolean isPartialWriteVisible(Path path, Configuration hadoopConf) {
        return false;
    }

    //////////////////
    // Helper Class //
    //////////////////

    /**
     * The file metadata to be stored in the cache. Static nested class so cache
     * entries do not retain a reference to the enclosing LogStore.
     */
    private static class FileMetadata {
        private final long length;
        private final long modificationTime;

        public FileMetadata(long length, long modificationTime) {
            this.length = length;
            this.modificationTime = modificationTime;
        }
    }

    /**
     * Presents an iterator of strings as an InputStream of UTF-8 bytes, appending
     * a '\n' after each element (matching the stream-based write path). Tracks the
     * number of bytes served so the caller can record the final object length.
     * Static nested class to avoid a hidden enclosing-instance reference.
     */
    public static class StringIteratorInputStream extends InputStream {
        private final Iterator<String> iterator;
        private byte[] currentStringBytes;
        private int currentIndex;
        private long bytesRead;

        public StringIteratorInputStream(Iterator<String> iterator) {
            this.iterator = iterator;
            this.currentStringBytes = new byte[0];
            this.currentIndex = 0;
            this.bytesRead = 0L;
        }

        /** Total number of bytes returned by {@link #read()} so far. */
        public long getBytesRead() {
            return bytesRead;
        }

        @Override
        public int read() throws IOException {
            if (currentIndex >= currentStringBytes.length) {
                if (!iterator.hasNext()) {
                    return -1; // End of stream
                }
                final String nextString = iterator.next() + "\n"; // newline between strings
                currentStringBytes = nextString.getBytes(StandardCharsets.UTF_8);
                currentIndex = 0;
            }
            bytesRead++;
            return currentStringBytes[currentIndex++] & 0xFF;
        }
    }
}

0 comments on commit 7314edf

Please sign in to comment.