Hdds 11784 #1

Open · wants to merge 2 commits into master
@@ -21,6 +21,7 @@
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.io.IOException;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
@@ -47,6 +48,7 @@
import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.om.OMMetrics;
import org.apache.hadoop.ozone.om.OzoneConfigUtil;
import org.apache.hadoop.ozone.om.PrefixManager;
import org.apache.hadoop.ozone.om.ResolvedBucket;
import org.apache.hadoop.ozone.om.OMConfigKeys;
@@ -67,6 +69,7 @@
import org.apache.hadoop.ozone.om.lock.OzoneLockStrategy;
import org.apache.hadoop.ozone.om.request.OMClientRequestUtils;
import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.UserInfo;
import org.apache.hadoop.ozone.protocolPB.OMPBHelper;
import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
@@ -1373,4 +1376,116 @@ protected void validateEncryptionKeyInfo(OmBucketInfo bucketInfo, KeyArgs keyArg
keyArgs.getKeyName() + " in encrypted bucket " + keyArgs.getBucketName(), INVALID_REQUEST);
}
}

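/**
 * Charges the bucket namespace quota for the missing parent directories
 * and adds cache entries for them in the directory table.
 */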
protected void addMissingParentsToCache(OmBucketInfo omBucketInfo,
List<OmDirectoryInfo> missingParentInfos,
OMMetadataManager omMetadataManager,
long volumeId,
long bucketId,
long transactionLogIndex) throws IOException {

// Validate and update namespace for the missing parent directories.
checkBucketQuotaInNamespace(omBucketInfo, missingParentInfos.size());
omBucketInfo.incrUsedNamespace(missingParentInfos.size());

// Add cache entries for the missing parent directories.
OMFileRequest.addDirectoryTableCacheEntries(omMetadataManager,
volumeId, bucketId, transactionLogIndex,
missingParentInfos, null);
}

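/**
 * Looks up the multipart open key in the open key table; returns null if
 * no entry exists. The keyName parameter is unused in this base
 * implementation.
 */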
protected OmKeyInfo getOmKeyInfoFromOpenKeyTable(String dbMultipartKey,
String keyName,
OMMetadataManager omMetadataManager) throws IOException {
return omMetadataManager.getOpenKeyTable(getBucketLayout())
.get(dbMultipartKey);
}

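/**
 * Adds the given key info to the open file table cache under the
 * multipart open key, making it visible before the DB flush.
 */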
protected void addMultiPartToCache(
OMMetadataManager omMetadataManager, String multipartOpenKey,
OMFileRequest.OMPathInfoWithFSO pathInfoFSO, OmKeyInfo omKeyInfo,
String keyName, long transactionLogIndex
) {

// Add the multipart open key entry to the open file table cache.
OMFileRequest.addOpenFileTableCacheEntry(omMetadataManager,
multipartOpenKey, omKeyInfo, pathInfoFSO.getLeafNodeName(),
keyName, transactionLogIndex);

}

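/**
 * Whether addOrGetMissingDirectories should also populate the directory
 * and open key table caches. Defaults to false; subclasses that need the
 * missing parents (and the multipart open key) cached override this to
 * return true.
 */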
protected boolean addMissingDirectoriesToCacheEnabled() {
return false;
}

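/**
 * Computes the missing parent directories for the key in keyArgs and
 * returns them. When addMissingDirectoriesToCacheEnabled() returns true,
 * also adds those parents to the directory table cache and, if the
 * multipart open key is absent from the open key table, rebuilds a
 * placeholder OmKeyInfo from keyArgs and caches it.
 */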
protected List<OmDirectoryInfo> addOrGetMissingDirectories(OzoneManager ozoneManager,
OzoneManagerProtocolProtos.KeyArgs keyArgs,
long trxnLogIndex) throws IOException {
OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
final String volumeName = keyArgs.getVolumeName();
final String bucketName = keyArgs.getBucketName();
final String keyName = keyArgs.getKeyName();
OmBucketInfo omBucketInfo = getBucketInfo(omMetadataManager,
volumeName, bucketName);
OMFileRequest.OMPathInfoWithFSO pathInfoFSO = OMFileRequest
.verifyDirectoryKeysInPath(omMetadataManager, volumeName, bucketName,
keyName, Paths.get(keyName));
List<OmDirectoryInfo> missingParentInfos =
getAllMissingParentDirInfo(ozoneManager, keyArgs, omBucketInfo,
pathInfoFSO, trxnLogIndex);
if (!addMissingDirectoriesToCacheEnabled()) {
return missingParentInfos;
}

if (missingParentInfos != null && !missingParentInfos.isEmpty()) {
final long volumeId = omMetadataManager.getVolumeId(volumeName);
final long bucketId = omMetadataManager.getBucketId(volumeName,
bucketName);

// Add all missing parents to the directory table cache.
addMissingParentsToCache(omBucketInfo, missingParentInfos,
omMetadataManager, volumeId, bucketId, trxnLogIndex);

String multipartOpenKey = omMetadataManager
.getMultipartKey(volumeId, bucketId,
pathInfoFSO.getLastKnownParentId(),
pathInfoFSO.getLeafNodeName(),
keyArgs.getMultipartUploadID());

if (getOmKeyInfoFromOpenKeyTable(multipartOpenKey,
keyName, omMetadataManager) == null) {

final ReplicationConfig replicationConfig = OzoneConfigUtil
.resolveReplicationConfigPreference(keyArgs.getType(),
keyArgs.getFactor(), keyArgs.getEcReplicationConfig(),
omBucketInfo != null ?
omBucketInfo.getDefaultReplicationConfig() :
null, ozoneManager);

OmKeyInfo keyInfoFromArgs = new OmKeyInfo.Builder()
.setVolumeName(volumeName)
.setBucketName(bucketName)
.setKeyName(keyName)
.setCreationTime(keyArgs.getModificationTime())
.setModificationTime(keyArgs.getModificationTime())
.setReplicationConfig(replicationConfig)
.setOmKeyLocationInfos(Collections.singletonList(
new OmKeyLocationInfoGroup(0, new ArrayList<>(), true)))
.setAcls(getAclsForKey(keyArgs, omBucketInfo, pathInfoFSO,
ozoneManager.getPrefixManager(), ozoneManager.getConfiguration()))
.setObjectID(pathInfoFSO.getLeafNodeObjectId())
.setUpdateID(trxnLogIndex)
.setFileEncryptionInfo(keyArgs.hasFileEncryptionInfo() ?
OMPBHelper.convert(keyArgs.getFileEncryptionInfo()) : null)
.setParentObjectID(pathInfoFSO.getLastKnownParentId())
.build();

// Add the rebuilt multipart key info to the open key table cache.
addMultiPartToCache(omMetadataManager, multipartOpenKey,
pathInfoFSO, keyInfoFromArgs, keyName, trxnLogIndex);
}
}
return missingParentInfos;
}
}
@@ -21,6 +21,10 @@

import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.hadoop.ozone.audit.OMAction;
@@ -49,6 +53,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -190,6 +195,30 @@ private void processResults(OMMetrics omMetrics,

}

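/**
 * Rebuilds a minimal KeyArgs (volume, bucket, key, replication config and
 * modification time) for an expired multipart upload, so that
 * addOrGetMissingDirectories can recompute its parent directory state.
 */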
private KeyArgs buildKeyArgs(final OmMultipartUpload multipartUpload) throws OMException {
final long now = Instant.now().toEpochMilli();
final ReplicationConfig replicationConfig =
multipartUpload.getReplicationConfig();
KeyArgs.Builder builder =
KeyArgs.newBuilder()
.setVolumeName(multipartUpload.getVolumeName())
.setBucketName(multipartUpload.getBucketName())
.setKeyName(multipartUpload.getKeyName())
.setType(replicationConfig.getReplicationType())
.setModificationTime(now);

if (replicationConfig instanceof ECReplicationConfig) {
builder.setEcReplicationConfig(((ECReplicationConfig) replicationConfig).toProto());
} else if (replicationConfig instanceof RatisReplicationConfig) {
builder.setFactor(((RatisReplicationConfig) replicationConfig).getReplicationFactor());
} else if (replicationConfig instanceof StandaloneReplicationConfig) {
builder.setFactor(((StandaloneReplicationConfig) replicationConfig).getReplicationFactor());
} else {
throw new OMException("Unsupported replication config type: "
+ replicationConfig.getReplicationType(),
OMException.ResultCodes.INVALID_REQUEST);
}
return builder.build();
}

private void updateTableCache(OzoneManager ozoneManager,
long trxnLogIndex, ExpiredMultipartUploadsBucket mpusPerBucket,
Map<OmBucketInfo, List<OmMultipartAbortInfo>> abortedMultipartUploads)
@@ -250,6 +279,7 @@ private void updateTableCache(OzoneManager ozoneManager,
try {
multipartUpload =
OmMultipartUpload.from(expiredMPUKeyName);
multipartUpload.setReplicationConfig(omMultipartKeyInfo.getReplicationConfig());
} catch (IllegalArgumentException e) {
LOG.warn("Aborting expired MPU failed: MPU key: " +
expiredMPUKeyName + " has invalid structure, " +
@@ -259,6 +289,8 @@

String multipartOpenKey;
try {
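// Rebuild KeyArgs for the expired MPU and restore its missing parent
// directories before resolving the open key.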
KeyArgs keyArgs = buildKeyArgs(multipartUpload);
addOrGetMissingDirectories(ozoneManager, keyArgs, trxnLogIndex);
multipartOpenKey =
OMMultipartUploadUtils
.getMultipartOpenKey(multipartUpload.getVolumeName(),
@@ -271,7 +303,7 @@
multipartUpload.getVolumeName() + ", bucket: " +
multipartUpload.getBucketName() + ", key: " +
multipartUpload.getKeyName() + ". Cannot parse the open key" +
"for this MPU, skipping this MPU.");
"for this MPU, skipping this MPU.", ome);
continue;
}

@@ -336,6 +368,10 @@ private void updateTableCache(OzoneManager ozoneManager,
.releaseWriteLock(BUCKET_LOCK, volumeName, bucketName));
}
}
}

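// Aborting expired MPUs needs the missing parent directories restored
// into the cache, so enable cache updates in addOrGetMissingDirectories.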
@Override
protected boolean addMissingDirectoriesToCacheEnabled() {
return true;
}
}
@@ -109,6 +109,8 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn

validateBucketAndVolume(omMetadataManager, volumeName, bucketName);

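// Compute the missing parent directories and add them to the cache
// (see addMissingDirectoriesToCacheEnabled) before proceeding.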
missingParentInfos = addOrGetMissingDirectories(ozoneManager, keyArgs, transactionLogIndex);

OMFileRequest.OMPathInfoWithFSO pathInfoFSO = OMFileRequest
.verifyDirectoryKeysInPath(omMetadataManager, volumeName, bucketName,
keyName, Paths.get(keyName));
@@ -119,10 +121,6 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
final OmBucketInfo bucketInfo = getBucketInfo(omMetadataManager,
volumeName, bucketName);

// add all missing parents to dir table
missingParentInfos = getAllMissingParentDirInfo(ozoneManager, keyArgs, bucketInfo,
pathInfoFSO, transactionLogIndex);

// We are adding uploadId to key, because if multiple users try to
// perform multipart upload on the same key, each will try to upload;
// whoever finally commits the key is the one we see in Ozone. Suppose if we
@@ -192,18 +190,6 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
.addAllMetadata(KeyValueUtil.getFromProtobuf(keyArgs.getMetadataList()))
.addAllTags(KeyValueUtil.getFromProtobuf(keyArgs.getTagsList()))
.build();

// validate and update namespace for missing parent directory
if (null != missingParentInfos) {
checkBucketQuotaInNamespace(bucketInfo, missingParentInfos.size());
bucketInfo.incrUsedNamespace(missingParentInfos.size());
}

// Add cache entries for the prefix directories.
// Skip adding for the file key itself, until Key Commit.
OMFileRequest.addDirectoryTableCacheEntries(omMetadataManager,
volumeId, bucketId, transactionLogIndex,
missingParentInfos, null);

OMFileRequest.addOpenFileTableCacheEntry(omMetadataManager,
multipartOpenKey, omKeyInfo, pathInfoFSO.getLeafNodeName(), keyName,
@@ -213,6 +199,9 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
omMetadataManager.getMultipartInfoTable().addCacheEntry(
multipartKey, multipartKeyInfo, transactionLogIndex);

if (bucketInfo == null) {
throw new OMException("Bucket info not found for bucket: " + volumeName
+ "/" + bucketName, OMException.ResultCodes.BUCKET_NOT_FOUND);
}
omClientResponse =
new S3InitiateMultipartUploadResponseWithFSO(
omResponse.setInitiateMultiPartUploadResponse(
@@ -246,6 +235,11 @@ missingParentInfos, getBucketLayout(), volumeId, bucketId,
return omClientResponse;
}

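// Initiating an MPU on an FSO bucket creates the missing parent
// directories up front, so cache updates are enabled here.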
@Override
protected boolean addMissingDirectoriesToCacheEnabled() {
return true;
}

/**
* Verify om directory result.
*
@@ -24,6 +24,8 @@

import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.QuotaUtil;
@@ -47,8 +49,7 @@
import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
import org.apache.hadoop.ozone.om.request.key.OMKeyRequest;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
import org.apache.hadoop.ozone.om.response.s3.multipart
.S3MultipartUploadAbortResponse;
import org.apache.hadoop.ozone.om.response.s3.multipart.S3MultipartUploadAbortResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartUploadAbortRequest;
@@ -57,8 +58,6 @@
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;

import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;

@@ -140,6 +139,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn

String multipartOpenKey;
try {
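// Restore any missing parent directories to the cache before resolving
// the multipart open key for this abort.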
addOrGetMissingDirectories(ozoneManager, keyArgs, trxnLogIndex);
multipartOpenKey =
getMultipartOpenKey(keyArgs.getMultipartUploadID(), volumeName,
bucketName, keyName, omMetadataManager);
@@ -157,13 +157,18 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
// If there is no entry in openKeyTable, then there is no multipart
// upload initiated for this key.
if (omKeyInfo == null) {
throw new OMException("Abort Multipart Upload Failed: volume: " +
throw new OMException("Abort Multipart Upload Failed (omKeyInfo not found): volume: " +
requestedVolume + "bucket: " + requestedBucket + "key: " + keyName,
OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR);
}

multipartKeyInfo = omMetadataManager.getMultipartInfoTable()
.get(multipartKey);
if (multipartKeyInfo == null) {
throw new OMException("Abort Multipart Upload Failed (multipartKeyInfo not found): volume: " +
requestedVolume + "bucket: " + requestedBucket + "key: " + keyName,
OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR);
}
multipartKeyInfo.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled());

// When abort uploaded key, we need to subtract the PartKey length from
@@ -60,4 +60,9 @@ protected OMClientResponse getOmClientResponse(OzoneManager ozoneManager,
omBucketInfo.copyObject(), getBucketLayout());
return omClientResp;
}

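// This request path also needs addOrGetMissingDirectories to populate
// the table caches.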
@Override
protected boolean addMissingDirectoriesToCacheEnabled() {
return true;
}
}