Skip to content

Commit

Permalink
HDDS-11784 adding missing parent directories for MPU abort and
Browse files Browse the repository at this point in the history
expired abort request
  • Loading branch information
zhiheng xie committed Dec 13, 2024
1 parent e8f3b25 commit 437f595
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 154 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.io.IOException;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
Expand All @@ -47,6 +48,7 @@
import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.om.OMMetrics;
import org.apache.hadoop.ozone.om.OzoneConfigUtil;
import org.apache.hadoop.ozone.om.PrefixManager;
import org.apache.hadoop.ozone.om.ResolvedBucket;
import org.apache.hadoop.ozone.om.OMConfigKeys;
Expand All @@ -67,6 +69,7 @@
import org.apache.hadoop.ozone.om.lock.OzoneLockStrategy;
import org.apache.hadoop.ozone.om.request.OMClientRequestUtils;
import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.UserInfo;
import org.apache.hadoop.ozone.protocolPB.OMPBHelper;
import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
Expand Down Expand Up @@ -1373,4 +1376,109 @@ protected void validateEncryptionKeyInfo(OmBucketInfo bucketInfo, KeyArgs keyArg
keyArgs.getKeyName() + " in encrypted bucket " + keyArgs.getBucketName(), INVALID_REQUEST);
}
}

protected void addMissingParentsToCache(OmBucketInfo omBucketInfo,
List<OmDirectoryInfo> missingParentInfos,
OMMetadataManager omMetadataManager,
long volumeId,
long bucketId,
long transactionLogIndex) throws IOException {

// validate and update namespace for missing parent directory.
checkBucketQuotaInNamespace(omBucketInfo, missingParentInfos.size());
omBucketInfo.incrUsedNamespace(missingParentInfos.size());

// Add cache entries for the missing parent directories.
OMFileRequest.addDirectoryTableCacheEntries(omMetadataManager,
volumeId, bucketId, transactionLogIndex,
missingParentInfos, null);
}

protected OmKeyInfo getOmKeyInfoFromOpenKeyTable(String dbMultipartKey,
String keyName,
OMMetadataManager omMetadataManager) throws IOException {
return omMetadataManager.getOpenKeyTable(getBucketLayout())
.get(dbMultipartKey);
}

protected void addMultiPartToCache(
OMMetadataManager omMetadataManager, String multipartOpenKey,
OMFileRequest.OMPathInfoWithFSO pathInfoFSO, OmKeyInfo omKeyInfo,
String keyName, long transactionLogIndex
) {

// Add multi part to cache
OMFileRequest.addOpenFileTableCacheEntry(omMetadataManager,
multipartOpenKey, omKeyInfo, pathInfoFSO.getLeafNodeName(),
keyName, transactionLogIndex);

}

protected List<OmDirectoryInfo> addMissingDirectories(OzoneManager ozoneManager,
OzoneManagerProtocolProtos.KeyArgs keyArgs,
long trxnLogIndex) throws
IOException {
OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
final String volumeName = keyArgs.getVolumeName();
final String bucketName = keyArgs.getBucketName();
final String keyName = keyArgs.getKeyName();
OmBucketInfo omBucketInfo = getBucketInfo(omMetadataManager,
volumeName, bucketName);
List<OmDirectoryInfo> missingParentInfos;
OMFileRequest.OMPathInfoWithFSO pathInfoFSO = OMFileRequest
.verifyDirectoryKeysInPath(omMetadataManager, volumeName, bucketName,
keyName, Paths.get(keyName));
missingParentInfos = getAllMissingParentDirInfo(ozoneManager, keyArgs, omBucketInfo,
pathInfoFSO, trxnLogIndex);

if (missingParentInfos != null && !missingParentInfos.isEmpty()) {
final long volumeId = omMetadataManager.getVolumeId(volumeName);
final long bucketId = omMetadataManager.getBucketId(volumeName,
bucketName);

// add all missing parents to directory table
addMissingParentsToCache(omBucketInfo, missingParentInfos,
omMetadataManager, volumeId, bucketId, trxnLogIndex);

String multipartOpenKey = omMetadataManager
.getMultipartKey(volumeId, bucketId,
pathInfoFSO.getLastKnownParentId(),
pathInfoFSO.getLeafNodeName(),
keyArgs.getMultipartUploadID());

if (getOmKeyInfoFromOpenKeyTable(multipartOpenKey,
keyName, omMetadataManager) == null) {

final ReplicationConfig replicationConfig = OzoneConfigUtil
.resolveReplicationConfigPreference(keyArgs.getType(),
keyArgs.getFactor(), keyArgs.getEcReplicationConfig(),
omBucketInfo != null ?
omBucketInfo.getDefaultReplicationConfig() :
null, ozoneManager);

OmKeyInfo keyInfoFromArgs = new OmKeyInfo.Builder()
.setVolumeName(volumeName)
.setBucketName(bucketName)
.setKeyName(keyName)
.setCreationTime(keyArgs.getModificationTime())
.setModificationTime(keyArgs.getModificationTime())
.setReplicationConfig(replicationConfig)
.setOmKeyLocationInfos(Collections.singletonList(
new OmKeyLocationInfoGroup(0, new ArrayList<>(), true)))
.setAcls(getAclsForKey(keyArgs, omBucketInfo, pathInfoFSO,
ozoneManager.getPrefixManager(), ozoneManager.getConfiguration()))
.setObjectID(pathInfoFSO.getLeafNodeObjectId())
.setUpdateID(trxnLogIndex)
.setFileEncryptionInfo(keyArgs.hasFileEncryptionInfo() ?
OMPBHelper.convert(keyArgs.getFileEncryptionInfo()) : null)
.setParentObjectID(pathInfoFSO.getLastKnownParentId())
.build();

// Add missing multi part info to open key table
addMultiPartToCache(omMetadataManager, multipartOpenKey,
pathInfoFSO, keyInfoFromArgs, keyName, trxnLogIndex);
}
}
return missingParentInfos;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@

import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.hadoop.ozone.audit.OMAction;
Expand Down Expand Up @@ -49,6 +53,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -190,6 +195,30 @@ private void processResults(OMMetrics omMetrics,

}

private KeyArgs buildKeyArgs(final OmMultipartUpload multipartUpload) throws OMException {
final long now = Instant.now().toEpochMilli();
final ReplicationConfig replicationConfig =
multipartUpload.getReplicationConfig();
KeyArgs.Builder builder =
KeyArgs.newBuilder()
.setVolumeName(multipartUpload.getVolumeName())
.setBucketName(multipartUpload.getBucketName())
.setKeyName(multipartUpload.getKeyName())
.setType(replicationConfig.getReplicationType())
.setModificationTime(now);

if (replicationConfig instanceof ECReplicationConfig) {
builder.setEcReplicationConfig(((ECReplicationConfig) replicationConfig).toProto());
} else if (replicationConfig instanceof RatisReplicationConfig) {
builder.setFactor(((RatisReplicationConfig) replicationConfig).getReplicationFactor());
} else if (replicationConfig instanceof StandaloneReplicationConfig) {
builder.setFactor(((StandaloneReplicationConfig) replicationConfig).getReplicationFactor());
} else {
throw new OMException(OMException.ResultCodes.INVALID_REQUEST);
}
return builder.build();
}

private void updateTableCache(OzoneManager ozoneManager,
long trxnLogIndex, ExpiredMultipartUploadsBucket mpusPerBucket,
Map<OmBucketInfo, List<OmMultipartAbortInfo>> abortedMultipartUploads)
Expand Down Expand Up @@ -250,6 +279,7 @@ private void updateTableCache(OzoneManager ozoneManager,
try {
multipartUpload =
OmMultipartUpload.from(expiredMPUKeyName);
multipartUpload.setReplicationConfig(omMultipartKeyInfo.getReplicationConfig());
} catch (IllegalArgumentException e) {
LOG.warn("Aborting expired MPU failed: MPU key: " +
expiredMPUKeyName + " has invalid structure, " +
Expand All @@ -259,6 +289,8 @@ private void updateTableCache(OzoneManager ozoneManager,

String multipartOpenKey;
try {
KeyArgs keyArgs = buildKeyArgs(multipartUpload);
addMissingDirectories(ozoneManager, keyArgs, trxnLogIndex);
multipartOpenKey =
OMMultipartUploadUtils
.getMultipartOpenKey(multipartUpload.getVolumeName(),
Expand All @@ -271,7 +303,7 @@ private void updateTableCache(OzoneManager ozoneManager,
multipartUpload.getVolumeName() + ", bucket: " +
multipartUpload.getBucketName() + ", key: " +
multipartUpload.getKeyName() + ". Cannot parse the open key" +
"for this MPU, skipping this MPU.");
"for this MPU, skipping this MPU.", ome);
continue;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn

validateBucketAndVolume(omMetadataManager, volumeName, bucketName);

missingParentInfos = addMissingDirectories(ozoneManager, keyArgs, transactionLogIndex);

OMFileRequest.OMPathInfoWithFSO pathInfoFSO = OMFileRequest
.verifyDirectoryKeysInPath(omMetadataManager, volumeName, bucketName,
keyName, Paths.get(keyName));
Expand All @@ -119,10 +121,6 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
final OmBucketInfo bucketInfo = getBucketInfo(omMetadataManager,
volumeName, bucketName);

// add all missing parents to dir table
missingParentInfos = getAllMissingParentDirInfo(ozoneManager, keyArgs, bucketInfo,
pathInfoFSO, transactionLogIndex);

// We are adding uploadId to key, because if multiple users try to
// perform multipart upload on the same key, each will try to upload, who
// ever finally commit the key, we see that key in ozone. Suppose if we
Expand Down Expand Up @@ -192,18 +190,6 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
.addAllMetadata(KeyValueUtil.getFromProtobuf(keyArgs.getMetadataList()))
.addAllTags(KeyValueUtil.getFromProtobuf(keyArgs.getTagsList()))
.build();

// validate and update namespace for missing parent directory
if (null != missingParentInfos) {
checkBucketQuotaInNamespace(bucketInfo, missingParentInfos.size());
bucketInfo.incrUsedNamespace(missingParentInfos.size());
}

// Add cache entries for the prefix directories.
// Skip adding for the file key itself, until Key Commit.
OMFileRequest.addDirectoryTableCacheEntries(omMetadataManager,
volumeId, bucketId, transactionLogIndex,
missingParentInfos, null);

OMFileRequest.addOpenFileTableCacheEntry(omMetadataManager,
multipartOpenKey, omKeyInfo, pathInfoFSO.getLeafNodeName(), keyName,
Expand All @@ -213,6 +199,9 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
omMetadataManager.getMultipartInfoTable().addCacheEntry(
multipartKey, multipartKeyInfo, transactionLogIndex);

if (bucketInfo == null) {
throw new IOException("bucketInfo is null");
}
omClientResponse =
new S3InitiateMultipartUploadResponseWithFSO(
omResponse.setInitiateMultiPartUploadResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.QuotaUtil;
Expand All @@ -47,8 +49,7 @@
import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
import org.apache.hadoop.ozone.om.request.key.OMKeyRequest;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
import org.apache.hadoop.ozone.om.response.s3.multipart
.S3MultipartUploadAbortResponse;
import org.apache.hadoop.ozone.om.response.s3.multipart.S3MultipartUploadAbortResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartUploadAbortRequest;
Expand All @@ -57,8 +58,6 @@
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;

import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;

Expand Down Expand Up @@ -140,6 +139,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn

String multipartOpenKey;
try {
addMissingDirectories(ozoneManager, keyArgs, trxnLogIndex);
multipartOpenKey =
getMultipartOpenKey(keyArgs.getMultipartUploadID(), volumeName,
bucketName, keyName, omMetadataManager);
Expand All @@ -157,13 +157,18 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
// If there is no entry in openKeyTable, then there is no multipart
// upload initiated for this key.
if (omKeyInfo == null) {
throw new OMException("Abort Multipart Upload Failed: volume: " +
throw new OMException("Abort Multipart Upload Failed (omKeyInfo not found): volume: " +
requestedVolume + "bucket: " + requestedBucket + "key: " + keyName,
OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR);
}

multipartKeyInfo = omMetadataManager.getMultipartInfoTable()
.get(multipartKey);
if (multipartKeyInfo == null) {
throw new OMException("Abort Multipart Upload Failed (multipartKeyInfo not found): volume: " +
requestedVolume + "bucket: " + requestedBucket + "key: " + keyName,
OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR);
}
multipartKeyInfo.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled());

// When abort uploaded key, we need to subtract the PartKey length from
Expand Down
Loading

0 comments on commit 437f595

Please sign in to comment.