Skip to content

Commit

Permalink
Releasing version 3.3.4.1.2.0
Browse files Browse the repository at this point in the history
Releasing version 3.3.4.1.2.0
  • Loading branch information
yanhaizhongyu authored Jun 22, 2023
2 parents e2e0574 + 78a5f7a commit 958b852
Show file tree
Hide file tree
Showing 13 changed files with 300 additions and 55 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,19 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](http://keepachangelog.com/).

## 3.3.4.1.2.0 - 2023-06-22
### Added
- Added support for namespace-prefixed domains in the Object Storage service

### Changed
- Updated OCI Java SDK version to `3.17.1`
- Updated `guava` version from `30.1-jre` to `32.0.1-jre`
- Replaced LinkedBlockingQueue with SynchronousQueue to hand off tasks to the executor
- Added relocation for shaded package `javax.servlet`

### Fixed
- Fixed the issue that caused object loss when performing a renaming operation, in case the target already existed

## 3.3.4.1.1.0 - 2023-05-09
### Added
- Added support for parallel ranged GET requests in read-ahead mode
Expand Down
6 changes: 3 additions & 3 deletions hdfs-addons/hdfs-smartparquet/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<parent>
<artifactId>oci-hdfs-addons</artifactId>
<groupId>com.oracle.oci.sdk</groupId>
<version>3.3.4.1.1.0</version>
<version>3.3.4.1.2.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down Expand Up @@ -149,7 +149,7 @@
<dependency>
<groupId>com.oracle.oci.sdk</groupId>
<artifactId>oci-hdfs-connector</artifactId>
<version>3.3.4.1.1.0</version>
<version>3.3.4.1.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
Expand Down Expand Up @@ -251,7 +251,7 @@
<dependency>
<groupId>com.oracle.oci.sdk</groupId>
<artifactId>oci-hdfs-connector</artifactId>
<version>3.3.4.1.1.0</version>
<version>3.3.4.1.2.0</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
Expand Down
2 changes: 1 addition & 1 deletion hdfs-addons/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<parent>
<artifactId>oci-hdfs</artifactId>
<groupId>com.oracle.oci.sdk</groupId>
<version>3.3.4.1.1.0</version>
<version>3.3.4.1.2.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
4 changes: 2 additions & 2 deletions hdfs-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<parent>
<groupId>com.oracle.oci.sdk</groupId>
<artifactId>oci-hdfs</artifactId>
<version>3.3.4.1.1.0</version>
<version>3.3.4.1.2.0</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down Expand Up @@ -550,7 +550,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.1-jre</version>
<version>32.0.1-jre</version>
</dependency>
<!--
Deal with CVE-2021-28165:
Expand Down
47 changes: 27 additions & 20 deletions hdfs-connector/src/main/java/com/oracle/bmc/hdfs/BmcFilesystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -768,8 +768,8 @@ public boolean rename(final Path source, final Path destination) throws IOExcept
}
}

final FileStatus destinationStatus = this.getNullableFileStatus(absoluteDestination);
final Path destinationPathToUse;
FileStatus destinationStatus = this.getNullableFileStatus(absoluteDestination);
Path destinationPathToUse = absoluteDestination;
if (destinationStatus == null) {
final FileStatus destinationParentStatus =
this.getNullableFileStatus(absoluteDestination.getParent());
Expand All @@ -779,17 +779,11 @@ public boolean rename(final Path source, final Path destination) throws IOExcept
LOG.debug("Destination parent directory does not exist, or is a file");
return false;
}

// destination at this point must be a filename, so this is a move + rename operation
destinationPathToUse = absoluteDestination;
} else if (destinationStatus.isFile()) {
// spec says to throw FileAlreadyExistsException or IOException, but most cloud providers
// return false instead, staying consistent here too
LOG.debug("Destination exists and is a file");
return false;
} else {
// destination is a directory, copy file name of source
} else if (destinationStatus.isDirectory()) {
// destination is a directory, need down one level.
// copy file/dir name of source, we have to check if the source name does exist in this directory
destinationPathToUse = new Path(absoluteDestination, absoluteSource.getName());
destinationStatus = this.getNullableFileStatus(destinationPathToUse);
}

// test again now that it's resolved
Expand All @@ -813,14 +807,27 @@ public boolean rename(final Path source, final Path destination) throws IOExcept
return false;
}

if (sourceStatus.isFile()) {
// file rename
LOG.debug("Renaming file {} to {}", absoluteSource, destinationPathToUse);
this.dataStore.renameFile(absoluteSource, destinationPathToUse);
} else {
// directory rename
LOG.debug("Renaming directory {} to {}", absoluteSource, destinationPathToUse);
this.dataStore.renameDirectory(absoluteSource, destinationPathToUse);

// destination should not exist, no matter it is a file or directory
if (destinationStatus != null ) {
// spec says to throw FileAlreadyExistsException or IOException, but most cloud providers
// return false instead, staying consistent here too
LOG.debug("Destination {} {} already exists", (destinationStatus.isFile() ? "file" : "directory"), destinationPathToUse.toString() );
return false;
}

try {
if (sourceStatus.isFile()) {
// file rename
LOG.debug("Renaming file {} to {}", absoluteSource, destinationPathToUse);
this.dataStore.renameFile(absoluteSource, destinationPathToUse);
} else {
// directory rename
LOG.debug("Renaming directory {} to {}", absoluteSource, destinationPathToUse);
this.dataStore.renameDirectory(absoluteSource, destinationPathToUse);
}
} catch (final FileAlreadyExistsException e) {
return false;
}

return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -71,6 +72,7 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.Path;
Expand All @@ -83,6 +85,8 @@
*/
@Slf4j
public class BmcDataStore {
private static final int ERROR_CODE_FILE_EXISTS = 412;

private static final int MiB = 1024 * 1024;

// http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html#Data_Replication
Expand Down Expand Up @@ -258,7 +262,7 @@ private ExecutorService createParallelMd5Executor(BmcPropertyAccessor propertyAc
numThreadsForParallelMd5Operation,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(numThreadsForParallelMd5Operation),
new SynchronousQueue<>(),
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("bmcs-hdfs-multipart-md5-%d")
Expand Down Expand Up @@ -686,6 +690,18 @@ private void awaitRenameOperationTermination(List<RenameResponse> renameResponse
LOG.debug("Thread interrupted while waiting for rename completion", e);
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
if (e.getCause() instanceof BmcException) {
BmcException bmcException = (BmcException) e.getCause();
if (bmcException.getStatusCode() == ERROR_CODE_FILE_EXISTS) {
LOG.debug(
"Failed to rename {} to {}",
renameResponse.getOldName(),
renameResponse.getNewName(),
e);
throw new FileAlreadyExistsException(
"Cannot rename file, destination file already exists : " + renameResponse.getNewName());
}
}
LOG.debug("Execution exception while waiting for rename completion", e);
} catch (Exception e) {
LOG.debug(
Expand All @@ -704,12 +720,19 @@ private void rename(final String sourceObject, final String destinationObject)
try {
final String newEntityTag =
new RenameOperation(
this.objectStorage,
this.requestBuilder.renameObject(
sourceObject, destinationObject))
this.objectStorage,
this.requestBuilder.renameObject(
sourceObject, destinationObject))
.call();
this.statistics.incrementWriteOps(1); // 1 put
LOG.debug("Newly renamed object has eTag {}", newEntityTag);
} catch (final BmcException e) {
LOG.debug("Failed to rename {} to {}", sourceObject, destinationObject, e);
if (e.getStatusCode() == ERROR_CODE_FILE_EXISTS) {
throw new FileAlreadyExistsException(
"Cannot rename file, destination file already exists : " + destinationObject);
}
throw new IOException("Unable to perform rename", e);
} catch (final Exception e) {
LOG.debug("Failed to rename {} to {}", sourceObject, destinationObject, e);
throw new IOException("Unable to perform rename", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public class BmcDataStoreFactory {
private static final String OCI_PROPERTIES_FILE_NAME = "oci.properties";
private final Configuration configuration;
private final String OCI_DELEGATION_TOKEN_FILE = "OCI_DELEGATION_TOKEN_FILE";
private String namespaceName;

/**
* Creates a new {@link BmcDataStore} for the given namespace and bucket.
Expand All @@ -98,6 +99,7 @@ public class BmcDataStoreFactory {
*/
public BmcDataStore createDataStore(
final String namespace, final String bucket, final Statistics statistics) {
namespaceName = namespace;
this.setConnectorVersion();
// override matches the same order as the filesystem name, ie, "oci://bucket@namespace"
// so overriding property foobar is done by specifying foobar.bucket.namespace
Expand Down Expand Up @@ -134,30 +136,53 @@ private String getEndpoint(final BmcPropertyAccessor propertyAccessor) {
return propertyAccessor.asString().get(BmcProperties.HOST_NAME);
}

if (propertyAccessor.asBoolean().get(BmcProperties.REALM_SPECIFIC_ENDPOINT_TEMPLATES_ENABLED)) {
LOG.info("Getting realm-specific endpoint template as {} flag is enabled", BmcConstants.REALM_SPECIFIC_ENDPOINT_TEMPLATES_ENABLED_KEY);
String regionCodeOrId = propertyAccessor.asString().get(BmcProperties.REGION_CODE_OR_ID);
if (propertyAccessor
.asBoolean()
.get(BmcProperties.REALM_SPECIFIC_ENDPOINT_TEMPLATES_ENABLED)) {
LOG.info(
"Getting realm-specific endpoint template as {} flag is enabled",
BmcConstants.REALM_SPECIFIC_ENDPOINT_TEMPLATES_ENABLED_KEY);
String regionCodeOrId =
propertyAccessor.asString().get(BmcProperties.REGION_CODE_OR_ID);
if (regionCodeOrId != null) {
LOG.info("Region code or id set to {} using {}", regionCodeOrId, BmcConstants.REGION_CODE_OR_ID_KEY);
Region region =
Region.fromRegionCodeOrId(regionCodeOrId);
Map<String, String> realmSpecificEndpointTemplateMap = ObjectStorageClient.SERVICE.getServiceEndpointTemplateForRealmMap();
if (realmSpecificEndpointTemplateMap != null && !realmSpecificEndpointTemplateMap.isEmpty()) {
LOG.info(
"Region code or id set to {} using {}",
regionCodeOrId,
BmcConstants.REGION_CODE_OR_ID_KEY);
Region region = Region.fromRegionCodeOrId(regionCodeOrId);
Map<String, String> realmSpecificEndpointTemplateMap =
ObjectStorageClient.SERVICE.getServiceEndpointTemplateForRealmMap();
if (realmSpecificEndpointTemplateMap != null
&& !realmSpecificEndpointTemplateMap.isEmpty()) {
String realmId = region.getRealm().getRealmId();
String realmSpecificEndpointTemplate = realmSpecificEndpointTemplateMap.get(realmId.toLowerCase(Locale.ROOT));
String realmSpecificEndpointTemplate =
realmSpecificEndpointTemplateMap.get(realmId.toLowerCase(Locale.ROOT));
if (StringUtils.isNotBlank(realmSpecificEndpointTemplate)) {
LOG.info("Using realm-specific endpoint template {}", realmSpecificEndpointTemplate);
return realmSpecificEndpointTemplate.replace("{region}", region.getRegionId());
LOG.info(
"Using realm-specific endpoint template {}",
realmSpecificEndpointTemplate);
return realmSpecificEndpointTemplate
.replace("{region}", region.getRegionId())
.replace("{namespaceName+Dot}", namespaceName + ".");
} else {
LOG.info("{} property was enabled but realm-specific endpoint template is not defined for {} realm",
BmcConstants.REALM_SPECIFIC_ENDPOINT_TEMPLATES_ENABLED_KEY, realmId);
LOG.info(
"{} property was enabled but realm-specific endpoint template is not defined for {} realm",
BmcConstants.REALM_SPECIFIC_ENDPOINT_TEMPLATES_ENABLED_KEY,
realmId);
}
} else {
LOG.info("Not using realm-specific endpoint template, because no realm-specific endpoint template map was set, or the map was empty");
LOG.info(
"Not using realm-specific endpoint template, because no realm-specific endpoint template map was set, or the map was empty");
}
} else {
throw new IllegalArgumentException(
"Property `" + BmcConstants.REALM_SPECIFIC_ENDPOINT_TEMPLATES_ENABLED_KEY + "` was enabled without setting the property `" + BmcConstants.REGION_CODE_OR_ID_KEY + "`. Please set the region code or id using `" + BmcConstants.REGION_CODE_OR_ID_KEY + "` property to enable use of realm-specific endpoint template");
"Property `"
+ BmcConstants.REALM_SPECIFIC_ENDPOINT_TEMPLATES_ENABLED_KEY
+ "` was enabled without setting the property `"
+ BmcConstants.REGION_CODE_OR_ID_KEY
+ "`. Please set the region code or id using `"
+ BmcConstants.REGION_CODE_OR_ID_KEY
+ "` property to enable use of realm-specific endpoint template");
}
}

Expand All @@ -171,7 +196,10 @@ private String getEndpoint(final BmcPropertyAccessor propertyAccessor) {
return endpoint.get();
} else {
throw new IllegalArgumentException(
"Endpoint for " + ObjectStorageClient.SERVICE + " is not known in region " + region);
"Endpoint for "
+ ObjectStorageClient.SERVICE
+ " is not known in region "
+ region);
}
}

Expand All @@ -191,7 +219,8 @@ private String getEndpoint(final BmcPropertyAccessor propertyAccessor) {
},
METADATA_SERVICE_BASE_URL,
"region");
String endpoint = Region.formatDefaultRegionEndpoint(ObjectStorageClient.SERVICE, regionCode);
String endpoint =
Region.formatDefaultRegionEndpoint(ObjectStorageClient.SERVICE, regionCode);
LOG.info("Endpoint using Instance Metadata Service is {}", endpoint);
return endpoint;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ private synchronized void initializeExecutorService() {
numThreadsForParallelUpload,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(numThreadsForParallelUpload),
new SynchronousQueue<>(),
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("bmcs-hdfs-multipart-upload-%d")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ RenameObjectRequest renameObject(final String sourceName, final String newName)
RenameObjectDetails.builder()
.sourceName(sourceName)
.newName(newName)
.newObjIfNoneMatchETag(getIfNoneMatchHeader(false))
.build())
.opcClientRequestId(createClientRequestId("renameObject"))
.build();
Expand Down Expand Up @@ -169,6 +170,10 @@ private static String createClientRequestId(final String operation) {
return uuid;
}

private static String getIfNoneMatchHeader(boolean allowOverwrite) {
return allowOverwrite ? null : "*";
}

@Slf4j
@RequiredArgsConstructor
private static final class HadoopProgressReporter implements ProgressReporter {
Expand Down
Loading

0 comments on commit 958b852

Please sign in to comment.