diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java index 0507a27de61..971c129e96d 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java @@ -69,7 +69,7 @@ private static Codec newCodec(boolean ignorePipeline) { } public static Codec getCodec(boolean ignorePipeline) { - LOG.info("OmKeyInfo.getCodec ignorePipeline = {}", ignorePipeline); + // LOG.info("OmKeyInfo.getCodec ignorePipeline = {}", ignorePipeline); return ignorePipeline ? CODEC_TRUE : CODEC_FALSE; } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java index cbe5205c10b..b1e850a0ba9 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java @@ -27,6 +27,7 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.MutableCounterLong; import org.apache.hadoop.metrics2.lib.MutableGaugeInt; +import org.apache.hadoop.metrics2.lib.MutableRate; /** * This class is for maintaining Ozone Manager statistics. 
@@ -235,7 +236,16 @@ public class OMMetrics implements OmMetadataReaderMetrics { private @Metric MutableCounterLong ecKeyCreateFailsTotal; private @Metric MutableCounterLong ecBucketCreateTotal; private @Metric MutableCounterLong ecBucketCreateFailsTotal; - + @Metric(about = "request commit request get key") + private MutableRate keyCommitGetKeyRate; + public MutableRate getKeyCommitGetKeyRate() { + return keyCommitGetKeyRate; + } + @Metric(about = "request commit request get open key") + private MutableRate keyCommitGetOpenKeyRate; + public MutableRate getKeyCommitGetOpenKeyRate() { + return keyCommitGetOpenKeyRate; + } private final DBCheckpointMetrics dbCheckpointMetrics; public OMMetrics() { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index e0a91059daf..39b15c4a1e2 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -477,6 +477,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl private final BucketUtilizationMetrics bucketUtilizationMetrics; private boolean fsSnapshotEnabled; + private final boolean isLeaderExecutorFlag; /** * OM Startup mode. @@ -582,6 +583,8 @@ private OzoneManager(OzoneConfiguration conf, StartupOption startupOption) this.isStrictS3 = conf.getBoolean( OZONE_OM_NAMESPACE_STRICT_S3, OZONE_OM_NAMESPACE_STRICT_S3_DEFAULT); + this.isLeaderExecutorFlag = configuration.getBoolean(OMConfigKeys.OZONE_OM_LEADER_EXECUTOR_ENABLE, + OMConfigKeys.OZONE_OM_LEADER_EXECUTOR_ENABLE_DEFAULT); // TODO: This is a temporary check. 
Once fully implemented, all OM state // change should go through Ratis - be it standalone (for non-HA) or @@ -5053,7 +5056,6 @@ public void checkFeatureEnabled(OzoneManagerVersion feature) throws OMException } public boolean isLeaderExecutorEnabled() { - return configuration.getBoolean(OMConfigKeys.OZONE_OM_LEADER_EXECUTOR_ENABLE, - OMConfigKeys.OZONE_OM_LEADER_EXECUTOR_ENABLE_DEFAULT); + return isLeaderExecutorFlag; } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/helpers/OMAuditLogger.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/helpers/OMAuditLogger.java index 18ee42756ef..0116925d7fe 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/helpers/OMAuditLogger.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/helpers/OMAuditLogger.java @@ -26,6 +26,7 @@ import org.apache.hadoop.ozone.audit.AuditMessage; import org.apache.hadoop.ozone.audit.OMAction; import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.ratis.execution.request.OmRequestBase; import org.apache.hadoop.ozone.om.request.OMClientRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.ratis.server.protocol.TermIndex; @@ -159,6 +160,34 @@ public static void log(OMAuditLogger.Builder builder, OMClientRequest request, O } } + public static void log(OMAuditLogger.Builder builder, OmRequestBase request, OzoneManager om, + TermIndex termIndex, Throwable th) { + if (builder.isLog.get()) { + builder.getAuditLogger().logWrite(builder.getMessageBuilder().build()); + return; + } + + OMAction action = getAction(request.getOmRequest()); + if (null == action) { + // no audit log defined + return; + } + if (builder.getAuditMap() == null) { + builder.setAuditMap(new HashMap<>()); + } + try { + builder.getAuditMap().put("Command", request.getOmRequest().getCmdType().name()); + builder.getAuditMap().put("Transaction", "" + 
termIndex.getIndex()); + request.buildAuditMessage(action, builder.getAuditMap(), + th, request.getOmRequest().getUserInfo()); + builder.setLog(true); + builder.setAuditLogger(om.getAuditLogger()); + log(builder); + } catch (Exception ex) { + LOG.error("Exception occurred while write audit log, ", ex); + } + } + public static Builder newBuilder() { return new Builder(); } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/KeyLocking.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/KeyLocking.java index ad6ddc6dc1a..f6d6f51b3e5 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/KeyLocking.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/KeyLocking.java @@ -33,14 +33,18 @@ */ public class KeyLocking { private static final Logger LOG = LoggerFactory.getLogger(KeyLocking.class); - private static final int DEFAULT_FILE_LOCK_STRIPED_SIZE = 10240; + private static final int DEFAULT_FILE_LOCK_STRIPED_SIZE = 102400; private static final long LOCK_TIMEOUT = 10 * 60 * 1000; - private Striped fileStripedLock = Striped.readWriteLock(DEFAULT_FILE_LOCK_STRIPED_SIZE); + private final Striped fileStripedLock; private AtomicLong writeLockCount = new AtomicLong(); private AtomicLong readLockCount = new AtomicLong(); private AtomicLong failedLockCount = new AtomicLong(); private AtomicLong failedUnlockCount = new AtomicLong(); + public KeyLocking(int stripLockSize) { + fileStripedLock = Striped.readWriteLock(stripLockSize); + } + public void lock(List keyList) throws IOException { for (String key : keyList) { lock(key); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockOpr.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockOpr.java index 8ce7855bc59..2c8e9da8f2e 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockOpr.java +++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockOpr.java @@ -96,10 +96,10 @@ public OmLockOpr(LockType type, String volume, String bucket, List keyNa Collections.sort(this.keyNameList); } public static void init(String threadNamePrefix) { - keyLocking = new KeyLocking(); - volumeLocking = new KeyLocking(); - snapshotLocking = new KeyLocking(); - bucketLocking = new KeyLocking(); + keyLocking = new KeyLocking(102400); + volumeLocking = new KeyLocking(1024); + snapshotLocking = new KeyLocking(1024); + bucketLocking = new KeyLocking(1024); prefixLocking = new FSOPrefixLocking(threadNamePrefix); lockedObjMap = new ConcurrentHashMap<>(); // init scheduler to check and monitor diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/BucketQuotaResource.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/BucketQuotaResource.java index 8a31e22690f..e03927ecd33 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/BucketQuotaResource.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/BucketQuotaResource.java @@ -59,16 +59,16 @@ public static class BucketQuota { private AtomicLong incUsedBytes = new AtomicLong(); private AtomicLong incUsedNamespace = new AtomicLong(); - public void addUsedBytes(long bytes) { - incUsedBytes.addAndGet(bytes); + public long addUsedBytes(long bytes) { + return incUsedBytes.addAndGet(bytes); } public long getUsedBytes() { return incUsedBytes.get(); } - public void addUsedNamespace(long bytes) { - incUsedNamespace.addAndGet(bytes); + public long addUsedNamespace(long bytes) { + return incUsedNamespace.addAndGet(bytes); } public long getUsedNamespace() { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/DbChangesRecorder.java 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/DbChangesRecorder.java new file mode 100644 index 00000000000..fa4fe82e213 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/DbChangesRecorder.java @@ -0,0 +1,47 @@ +package org.apache.hadoop.ozone.om.ratis.execution; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.hdds.utils.db.CodecBuffer; + +/** + * Records db changes. + */ +public class DbChangesRecorder { + private Map> tableRecordsMap = new HashMap<>(); + private Map> bucketUsedQuotaMap = new HashMap<>(); + + public void add(String name, String dbOpenKeyName, CodecBuffer omKeyCodecBuffer) { + Map recordMap = tableRecordsMap.computeIfAbsent(name, k -> new HashMap<>()); + recordMap.put(dbOpenKeyName, omKeyCodecBuffer); + } + public void add(String bucketName, long incUsedBytes, long incNamespace) { + Pair quotaPair = bucketUsedQuotaMap.get(bucketName); + if (null == quotaPair) { + bucketUsedQuotaMap.put(bucketName, Pair.of(incUsedBytes, incNamespace)); + } else { + bucketUsedQuotaMap.put(bucketName, Pair.of(incUsedBytes + quotaPair.getLeft(), + incNamespace + quotaPair.getRight())); + } + } + public Map> getTableRecordsMap() { + return tableRecordsMap; + } + public Map> getBucketUsedQuotaMap() { + return bucketUsedQuotaMap; + } + + public void clear() { + for (Map records : tableRecordsMap.values()) { + records.values().forEach(e -> { + if (e != null) { + e.release(); + } + }); + } + tableRecordsMap.clear(); + bucketUsedQuotaMap.clear(); + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/LeaderCompatibleRequestExecutor.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/LeaderCompatibleRequestExecutor.java new file mode 100644 index 00000000000..6027011db56 --- /dev/null +++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/LeaderCompatibleRequestExecutor.java @@ -0,0 +1,441 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.om.ratis.execution; + +import com.google.protobuf.ByteString; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.hadoop.hdds.conf.StorageUnit; +import org.apache.hadoop.hdds.utils.db.BatchOperation; +import org.apache.hadoop.hdds.utils.db.RDBBatchOperation; +import org.apache.hadoop.ozone.om.OMConfigKeys; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException; +import org.apache.hadoop.ozone.om.helpers.OMAuditLogger; +import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; +import org.apache.hadoop.ozone.om.request.OMClientRequest; +import org.apache.hadoop.ozone.om.response.OMClientResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; +import org.apache.hadoop.ozone.protocolPB.OzoneManagerRequestHandler; +import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.server.protocol.TermIndex; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * om executor. 
+ */ +public class LeaderCompatibleRequestExecutor { + private static final Logger LOG = LoggerFactory.getLogger(LeaderCompatibleRequestExecutor.class); + private static final int REQUEST_EXECUTOR_POOL_SIZE = 1; + private static final int REQUEST_EXECUTOR_QUEUE_SIZE = 1000; + private static final int MERGE_TASK_POOL_SIZE = 1; + private static final int MERGE_TASK_QUEUE_SIZE = 1000; + private static final int RATIS_TASK_POOL_SIZE = 10; + private static final int RATIS_TASK_QUEUE_SIZE = 1000; + private static final long DUMMY_TERM = -1; + private final AtomicLong uniqueIndex; + private final int ratisByteLimit; + private final OzoneManager ozoneManager; + private final PoolExecutor ratisSubmitter; + private final PoolExecutor requestMerger; + private final PoolExecutor leaderExecutor; + private final OzoneManagerRequestHandler handler; + private final AtomicBoolean isEnabled = new AtomicBoolean(true); + private final AtomicInteger ratisCurrentPool = new AtomicInteger(0); + + public LeaderCompatibleRequestExecutor(OzoneManager om, AtomicLong uniqueIndex) { + this.ozoneManager = om; + this.handler = new OzoneManagerRequestHandler(ozoneManager); + ratisSubmitter = new PoolExecutor<>(RATIS_TASK_POOL_SIZE, RATIS_TASK_QUEUE_SIZE, + ozoneManager.getThreadNamePrefix() + "-LeaderRatis", this::ratisCommand, null); + requestMerger = new PoolExecutor<>(MERGE_TASK_POOL_SIZE, MERGE_TASK_QUEUE_SIZE, + ozoneManager.getThreadNamePrefix() + "-LeaderMerger", this::requestMergeCommand, this::ratisSubmit); + leaderExecutor = new PoolExecutor<>(REQUEST_EXECUTOR_POOL_SIZE, REQUEST_EXECUTOR_QUEUE_SIZE, + ozoneManager.getThreadNamePrefix() + "-LeaderExecutor", this::runExecuteCommand, this::mergeSubmit); + int limit = (int) ozoneManager.getConfiguration().getStorageSize( + OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT, + OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT, + StorageUnit.BYTES); + // always go to 80% of max limit for request as other header will be
added + this.ratisByteLimit = (int) (limit * 0.8); + this.uniqueIndex = uniqueIndex; + } + public void stop() { + leaderExecutor.stop(); + requestMerger.stop(); + ratisSubmitter.stop(); + } + public int batchSize() { + return REQUEST_EXECUTOR_POOL_SIZE; + } + public boolean isProcessing() { + return isEnabled.get(); + } + public void disableProcessing() { + isEnabled.set(false); + } + public void enableProcessing() { + isEnabled.set(true); + } + + public void submit(int idx, RequestContext ctx) throws InterruptedException { + if (!isEnabled.get()) { + rejectRequest(Collections.singletonList(ctx)); + return; + } + executeRequest(ctx, this::mergeSubmit); + //leaderExecutor.submit(idx, ctx); + } + + private void rejectRequest(Collection ctxs) { + Throwable th; + if (!ozoneManager.isLeaderReady()) { + String peerId = ozoneManager.isRatisEnabled() ? ozoneManager.getOmRatisServer().getRaftPeerId().toString() + : ozoneManager.getOMNodeId(); + th = new OMLeaderNotReadyException(peerId + " is not ready to process request yet."); + } else { + th = new OMException("Request processing is disabled due to error", OMException.ResultCodes.INTERNAL_ERROR); + } + handleBatchUpdateComplete(ctxs, th, null); + } + + private void runExecuteCommand( + Collection ctxs, PoolExecutor.CheckedConsumer nxtPool) { + if (!isEnabled.get()) { + rejectRequest(ctxs); + return; + } + for (RequestContext ctx : ctxs) { + if (!isEnabled.get()) { + rejectRequest(Collections.singletonList(ctx)); + continue; + } + executeRequest(ctx, nxtPool); + } + } + + private void mergeSubmit(RequestContext ctx) throws InterruptedException { + requestMerger.submit(0, ctx); + } + + private void executeRequest(RequestContext ctx, PoolExecutor.CheckedConsumer nxtPool) { + OMRequest request = ctx.getRequest(); + TermIndex termIndex = TermIndex.valueOf(DUMMY_TERM, uniqueIndex.incrementAndGet()); + ctx.setIndex(termIndex); + try { + handleRequest(ctx, termIndex); + } catch (IOException e) { + LOG.warn("Failed to write, 
Exception occurred ", e); + ctx.setResponse(createErrorResponse(request, e)); + } catch (Throwable e) { + LOG.warn("Failed to write, Exception occurred ", e); + ctx.setResponse(createErrorResponse(request, new IOException(e))); + } finally { + if (ctx.getNextRequest() != null) { + try { + nxtPool.accept(ctx); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } else { + handleBatchUpdateComplete(Collections.singletonList(ctx), null, null); + } + } + } + + private void handleRequest(RequestContext ctx, TermIndex termIndex) throws IOException { + OMClientRequest omClientRequest = ctx.getClientRequest(); + try { + OMClientResponse omClientResponse = handler.handleLeaderWriteRequest(omClientRequest, termIndex); + ctx.setResponse(omClientResponse.getOMResponse()); + if (!omClientResponse.getOMResponse().getSuccess()) { + OMAuditLogger.log(omClientRequest.getAuditBuilder(), termIndex); + } else { + OzoneManagerProtocolProtos.PersistDbRequest.Builder nxtRequest + = retrieveDbChanges(ctx, termIndex, omClientResponse); + if (nxtRequest != null) { + OMRequest.Builder omReqBuilder = OMRequest.newBuilder().setPersistDbRequest(nxtRequest.build()) + .setCmdType(OzoneManagerProtocolProtos.Type.PersistDb); + omReqBuilder.setClientId(ctx.getRequest().getClientId()); + ctx.setNextRequest(nxtRequest); + } else { + OMAuditLogger.log(omClientRequest.getAuditBuilder(), termIndex); + } + } + } catch (Throwable th) { + OMAuditLogger.log(omClientRequest.getAuditBuilder(), omClientRequest, ozoneManager, termIndex, th); + throw th; + } + } + + private OzoneManagerProtocolProtos.PersistDbRequest.Builder retrieveDbChanges( + RequestContext ctx, TermIndex termIndex, OMClientResponse omClientResponse) throws IOException { + OMMetadataManager metadataManager = ozoneManager.getMetadataManager(); + String name = metadataManager.getBucketTable().getName(); + boolean isDbChanged = false; + try (BatchOperation batchOperation = metadataManager.getStore() + 
.initBatchOperation()) { + omClientResponse.checkAndUpdateDB(metadataManager, batchOperation); + // get db update and create request to flush + OzoneManagerProtocolProtos.PersistDbRequest.Builder reqBuilder + = OzoneManagerProtocolProtos.PersistDbRequest.newBuilder(); + Map> cachedDbTxs + = ((RDBBatchOperation) batchOperation).getCachedTransaction(); + for (Map.Entry> tblEntry : cachedDbTxs.entrySet()) { + isDbChanged = true; + if (tblEntry.getKey().equals(name)) { + if (ctx.getClientRequest().getWrappedBucketInfo() instanceof OmBucketInfoQuotaTracker) { + continue; + } + } + OzoneManagerProtocolProtos.DBTableUpdate.Builder tblBuilder + = OzoneManagerProtocolProtos.DBTableUpdate.newBuilder(); + tblBuilder.setTableName(tblEntry.getKey()); + for (Map.Entry kvEntry : tblEntry.getValue().entrySet()) { + OzoneManagerProtocolProtos.DBTableRecord.Builder kvBuild + = OzoneManagerProtocolProtos.DBTableRecord.newBuilder(); + kvBuild.setKey(ByteString.copyFrom(kvEntry.getKey())); + if (kvEntry.getValue() != null) { + kvBuild.setValue(ByteString.copyFrom(kvEntry.getValue())); + } + tblBuilder.addRecords(kvBuild.build()); + } + reqBuilder.addTableUpdates(tblBuilder.build()); + } + if (!isDbChanged) { + return null; + } + reqBuilder.addIndex(termIndex.getIndex()); + return reqBuilder; + } + } + + private void requestMergeCommand( + Collection ctxs, PoolExecutor.CheckedConsumer nxtPool) { + if (!isEnabled.get()) { + rejectRequest(ctxs); + return; + } + Map bucketChangeMap = new HashMap<>(); + List sendList = new ArrayList<>(); + OzoneManagerProtocolProtos.PersistDbRequest.Builder reqBuilder + = OzoneManagerProtocolProtos.PersistDbRequest.newBuilder(); + long size = 0; + for (RequestContext ctx : ctxs) { + List tblList = ctx.getNextRequest().getTableUpdatesList(); + int tmpSize = 0; + for (OzoneManagerProtocolProtos.DBTableUpdate tblUpdates : tblList) { + tmpSize += tblUpdates.getSerializedSize(); + } + if ((tmpSize + size) > ratisByteLimit) { + // send current batched request + 
appendBucketQuotaChanges(reqBuilder, bucketChangeMap); + prepareAndSendRequest(sendList, reqBuilder, nxtPool); + + // reinit and continue + reqBuilder = OzoneManagerProtocolProtos.PersistDbRequest.newBuilder(); + size = 0; + sendList.clear(); + bucketChangeMap.clear(); + } + + // keep adding to batch list + size += tmpSize; + addBucketQuotaChanges(ctx, bucketChangeMap); + for (OzoneManagerProtocolProtos.DBTableUpdate tblUpdates : tblList) { + OzoneManagerProtocolProtos.DBTableUpdate.Builder tblBuilder + = OzoneManagerProtocolProtos.DBTableUpdate.newBuilder(); + tblBuilder.setTableName(tblUpdates.getTableName()); + tblBuilder.addAllRecords(tblUpdates.getRecordsList()); + reqBuilder.addTableUpdates(tblBuilder.build()); + } + reqBuilder.addIndex(ctx.getIndex().getIndex()); + sendList.add(ctx); + } + if (sendList.size() > 0) { + appendBucketQuotaChanges(reqBuilder, bucketChangeMap); + prepareAndSendRequest(sendList, reqBuilder, nxtPool); + } + } + + private void ratisSubmit(RatisContext ctx) throws InterruptedException { + // follow simple strategy to submit to ratis for next set of merge request + int nxtIndex = Math.abs(ratisCurrentPool.getAndIncrement() % RATIS_TASK_POOL_SIZE); + ratisSubmitter.submit(nxtIndex, ctx); + } + + private void addBucketQuotaChanges( + RequestContext ctx, Map quotaMap) { + if (ctx.getClientRequest().getWrappedBucketInfo() instanceof OmBucketInfoQuotaTracker) { + OmBucketInfoQuotaTracker info = (OmBucketInfoQuotaTracker) ctx.getClientRequest().getWrappedBucketInfo(); + OzoneManagerProtocolProtos.BucketQuotaCount.Builder quotaBuilder = quotaMap.computeIfAbsent( + info.getObjectID(), k -> OzoneManagerProtocolProtos.BucketQuotaCount.newBuilder() + .setVolName(info.getVolumeName()).setBucketName(info.getBucketName()) + .setBucketObjectId(info.getObjectID()).setSupportOldQuota(false)); + quotaBuilder.setDiffUsedBytes(quotaBuilder.getDiffUsedBytes() + info.getIncUsedBytes()); + quotaBuilder.setDiffUsedNamespace(quotaBuilder.getDiffUsedNamespace() 
+ info.getIncUsedNamespace()); + } + } + + private void appendBucketQuotaChanges( + OzoneManagerProtocolProtos.PersistDbRequest.Builder req, + Map quotaMap) { + for (Map.Entry entry : quotaMap.entrySet()) { + if (entry.getValue().getDiffUsedBytes() == 0 && entry.getValue().getDiffUsedNamespace() == 0) { + continue; + } + req.addBucketQuotaCount(entry.getValue().build()); + } + } + + private void prepareAndSendRequest( + List sendList, OzoneManagerProtocolProtos.PersistDbRequest.Builder reqBuilder, + PoolExecutor.CheckedConsumer nxtPool) { + RequestContext lastReqCtx = sendList.get(sendList.size() - 1); + OMRequest.Builder omReqBuilder = OMRequest.newBuilder().setPersistDbRequest(reqBuilder.build()) + .setCmdType(OzoneManagerProtocolProtos.Type.PersistDb) + .setClientId(lastReqCtx.getRequest().getClientId()); + OMRequest reqBatch = omReqBuilder.build(); + try { + nxtPool.accept(new RatisContext(sendList, reqBatch)); + } catch (InterruptedException e) { + handleBatchUpdateComplete(sendList, e, null); + Thread.currentThread().interrupt(); + } + } + + private void ratisCommand(Collection ctxs, PoolExecutor.CheckedConsumer nxtPool) { + if (!isEnabled.get()) { + for (RatisContext ctx : ctxs) { + rejectRequest(ctx.getRequestContexts()); + } + return; + } + for (RatisContext ctx : ctxs) { + List sendList = ctx.getRequestContexts(); + RequestContext lastReqCtx = sendList.get(sendList.size() - 1); + OMRequest reqBatch = ctx.getRequest(); + try { + OMResponse dbUpdateRsp = sendDbUpdateRequest(reqBatch, lastReqCtx.getIndex()); + if (!dbUpdateRsp.getSuccess()) { + throw new OMException(dbUpdateRsp.getMessage(), + OMException.ResultCodes.values()[dbUpdateRsp.getStatus().ordinal()]); + } + handleBatchUpdateComplete(sendList, null, dbUpdateRsp.getLeaderOMNodeId()); + } catch (Throwable e) { + LOG.warn("Failed to write, Exception occurred ", e); + handleBatchUpdateComplete(sendList, e, null); + } + } + } + private OMResponse sendDbUpdateRequest(OMRequest nextRequest, TermIndex 
termIndex) throws Exception { + try { + if (!ozoneManager.isRatisEnabled()) { + return OMBasicStateMachine.runCommand(nextRequest, termIndex, handler, ozoneManager); + } + OMResponse response = ozoneManager.getOmRatisServer().submitRequest(nextRequest, ClientId.randomId(), + termIndex.getIndex()); + return response; + } catch (Exception ex) { + throw ex; + } + } + private OMResponse createErrorResponse(OMRequest omRequest, IOException exception) { + OMResponse.Builder omResponseBuilder = OMResponse.newBuilder() + .setStatus(OzoneManagerRatisUtils.exceptionToResponseStatus(exception)) + .setCmdType(omRequest.getCmdType()) + .setTraceID(omRequest.getTraceID()) + .setSuccess(false); + if (exception.getMessage() != null) { + omResponseBuilder.setMessage(exception.getMessage()); + } + OMResponse omResponse = omResponseBuilder.build(); + return omResponse; + } + private void handleBatchUpdateComplete(Collection ctxs, Throwable th, String leaderOMNodeId) { + // TODO: no-cache switch, no need cleanup cache + Map> cleanupMap = new HashMap<>(); + for (RequestContext ctx : ctxs) { + // cache cleanup + if (null != ctx.getNextRequest()) { + List tblList = ctx.getNextRequest().getTableUpdatesList(); + for (OzoneManagerProtocolProtos.DBTableUpdate tblUpdate : tblList) { + List epochs = cleanupMap.computeIfAbsent(tblUpdate.getTableName(), k -> new ArrayList<>()); + epochs.add(ctx.getIndex().getIndex()); + } + } + for (Map.Entry> entry : cleanupMap.entrySet()) { + ozoneManager.getMetadataManager().getTable(entry.getKey()).cleanupCache(entry.getValue()); + } + } + + for (RequestContext ctx : ctxs) { + if (ctx.getClientRequest().getWrappedBucketInfo() instanceof OmBucketInfoQuotaTracker) { + // reset to be done to update resource quota for both success and failure + // for success also, its added here as it's difficult to ensure its execution at nodes + ((OmBucketInfoQuotaTracker) ctx.getClientRequest().getWrappedBucketInfo()).reset(); + } + + if (th != null) { + 
OMAuditLogger.log(ctx.getClientRequest().getAuditBuilder(), ctx.getClientRequest(), ozoneManager, + ctx.getIndex(), th); + if (th instanceof IOException) { + ctx.getFuture().complete(createErrorResponse(ctx.getRequest(), (IOException)th)); + } else { + ctx.getFuture().complete(createErrorResponse(ctx.getRequest(), new IOException(th))); + } + } else { + OMAuditLogger.log(ctx.getClientRequest().getAuditBuilder(), ctx.getIndex()); + OMResponse newRsp = ctx.getResponse(); + if (leaderOMNodeId != null) { + newRsp = OMResponse.newBuilder(newRsp).setLeaderOMNodeId(leaderOMNodeId).build(); + } + ctx.getFuture().complete(newRsp); + } + } + } + + static class RatisContext { + private List ctxs; + private OMRequest req; + RatisContext(List ctxs, OMRequest req) { + this.ctxs = ctxs; + this.req = req; + } + public List getRequestContexts() { + return ctxs; + } + + public OMRequest getRequest() { + return req; + } + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/LeaderRequestExecutor.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/LeaderRequestExecutor.java index f213c0a514f..04d6947c0b6 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/LeaderRequestExecutor.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/LeaderRequestExecutor.java @@ -18,7 +18,6 @@ import com.google.protobuf.ByteString; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -28,17 +27,17 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.conf.StorageUnit; -import org.apache.hadoop.hdds.utils.db.BatchOperation; -import 
org.apache.hadoop.hdds.utils.db.RDBBatchOperation; +import org.apache.hadoop.hdds.utils.db.CodecBuffer; import org.apache.hadoop.ozone.om.OMConfigKeys; -import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException; import org.apache.hadoop.ozone.om.helpers.OMAuditLogger; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.ratis.execution.request.OmRequestBase; import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; -import org.apache.hadoop.ozone.om.request.OMClientRequest; import org.apache.hadoop.ozone.om.response.OMClientResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; @@ -49,6 +48,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.ozone.util.MetricUtil.captureLatencyNs; + /** * om executor. 
*/ @@ -56,27 +57,34 @@ public class LeaderRequestExecutor { private static final Logger LOG = LoggerFactory.getLogger(LeaderRequestExecutor.class); private static final int REQUEST_EXECUTOR_POOL_SIZE = 1; private static final int REQUEST_EXECUTOR_QUEUE_SIZE = 1000; - private static final int MERGE_TASK_POOL_SIZE = 1; - private static final int MERGE_TASK_QUEUE_SIZE = 1000; - private static final int RATIS_TASK_POOL_SIZE = 10; - private static final int RATIS_TASK_QUEUE_SIZE = 1000; + private final int mergeTaskPoolSize; + private final int mergeTaskQueueSize; + private final int ratisTaskPoolSize; + private final int ratisTaskQueueSize; private static final long DUMMY_TERM = -1; private final AtomicLong uniqueIndex; private final int ratisByteLimit; private final OzoneManager ozoneManager; + private OMGatewayMetrics omGatewayMetrics; private final PoolExecutor ratisSubmitter; private final PoolExecutor requestMerger; private final PoolExecutor leaderExecutor; private final OzoneManagerRequestHandler handler; private final AtomicBoolean isEnabled = new AtomicBoolean(true); private final AtomicInteger ratisCurrentPool = new AtomicInteger(0); + private final AtomicInteger mergeCurrentPool = new AtomicInteger(0); - public LeaderRequestExecutor(OzoneManager om, AtomicLong uniqueIndex) { + public LeaderRequestExecutor(OzoneManager om, AtomicLong uniqueIndex, OMGatewayMetrics omGatewayMetrics) { this.ozoneManager = om; + this.omGatewayMetrics = omGatewayMetrics; + this.mergeTaskPoolSize = om.getConfiguration().getInt("ozone.om.leader.merge.pool.size", 1); + this.mergeTaskQueueSize = om.getConfiguration().getInt("ozone.om.leader.merge.queue.size", 1000); + this.ratisTaskPoolSize = om.getConfiguration().getInt("ozone.om.leader.ratis.pool.size", 10); + this.ratisTaskQueueSize = om.getConfiguration().getInt("ozone.om.leader.ratis.queue.size", 1000); this.handler = new OzoneManagerRequestHandler(ozoneManager); - ratisSubmitter = new PoolExecutor<>(RATIS_TASK_POOL_SIZE, 
RATIS_TASK_QUEUE_SIZE, + ratisSubmitter = new PoolExecutor<>(ratisTaskPoolSize, ratisTaskQueueSize, ozoneManager.getThreadNamePrefix() + "-LeaderRatis", this::ratisCommand, null); - requestMerger = new PoolExecutor<>(MERGE_TASK_POOL_SIZE, MERGE_TASK_QUEUE_SIZE, + requestMerger = new PoolExecutor<>(mergeTaskPoolSize, mergeTaskQueueSize, ozoneManager.getThreadNamePrefix() + "-LeaderMerger", this::requestMergeCommand, this::ratisSubmit); leaderExecutor = new PoolExecutor<>(REQUEST_EXECUTOR_POOL_SIZE, REQUEST_EXECUTOR_QUEUE_SIZE, ozoneManager.getThreadNamePrefix() + "-LeaderExecutor", this::runExecuteCommand, this::mergeSubmit); @@ -106,12 +114,13 @@ public void enableProcessing() { isEnabled.set(true); } - public void submit(int idx, RequestContext ctx) throws InterruptedException { + public void submit(int idx, RequestContext ctx) throws InterruptedException, IOException { if (!isEnabled.get()) { rejectRequest(Collections.singletonList(ctx)); return; } - executeRequest(ctx, this::mergeSubmit); + captureLatencyNs(omGatewayMetrics.getGatewayRequestExecute(), () -> executeRequest(ctx, this::mergeSubmit)); + //executeRequest(ctx, this::mergeSubmit); //leaderExecutor.submit(idx, ctx); } @@ -143,7 +152,12 @@ private void runExecuteCommand( } private void mergeSubmit(RequestContext ctx) throws InterruptedException { - requestMerger.submit(0, ctx); + if (mergeTaskPoolSize == 0) { + requestMergeCommand(Collections.singletonList(ctx), this::ratisSubmit); + return; + } + int nxtIndex = Math.abs(mergeCurrentPool.getAndIncrement() % mergeTaskPoolSize); + requestMerger.submit(nxtIndex, ctx); } private void executeRequest(RequestContext ctx, PoolExecutor.CheckedConsumer nxtPool) { @@ -159,7 +173,7 @@ private void executeRequest(RequestContext ctx, PoolExecutor.CheckedConsumer 0) { try { nxtPool.accept(ctx); } catch (InterruptedException e) { @@ -172,73 +186,35 @@ private void executeRequest(RequestContext ctx, PoolExecutor.CheckedConsumer 0)) { + // if no update, audit log the 
response OMAuditLogger.log(omClientRequest.getAuditBuilder(), termIndex); } } } catch (Throwable th) { + omClientRequest.changeRecorder().clear(); OMAuditLogger.log(omClientRequest.getAuditBuilder(), omClientRequest, ozoneManager, termIndex, th); throw th; } } - private OzoneManagerProtocolProtos.PersistDbRequest.Builder retrieveDbChanges( - RequestContext ctx, TermIndex termIndex, OMClientResponse omClientResponse) throws IOException { - OMMetadataManager metadataManager = ozoneManager.getMetadataManager(); - String name = metadataManager.getBucketTable().getName(); - boolean isDbChanged = false; - try (BatchOperation batchOperation = metadataManager.getStore() - .initBatchOperation()) { - omClientResponse.checkAndUpdateDB(metadataManager, batchOperation); - // get db update and create request to flush - OzoneManagerProtocolProtos.PersistDbRequest.Builder reqBuilder - = OzoneManagerProtocolProtos.PersistDbRequest.newBuilder(); - Map> cachedDbTxs - = ((RDBBatchOperation) batchOperation).getCachedTransaction(); - for (Map.Entry> tblEntry : cachedDbTxs.entrySet()) { - isDbChanged = true; - if (tblEntry.getKey().equals(name)) { - if (ctx.getClientRequest().getWrappedBucketInfo() instanceof OmBucketInfoQuotaTracker) { - continue; - } - } - OzoneManagerProtocolProtos.DBTableUpdate.Builder tblBuilder - = OzoneManagerProtocolProtos.DBTableUpdate.newBuilder(); - tblBuilder.setTableName(tblEntry.getKey()); - for (Map.Entry kvEntry : tblEntry.getValue().entrySet()) { - OzoneManagerProtocolProtos.DBTableRecord.Builder kvBuild - = OzoneManagerProtocolProtos.DBTableRecord.newBuilder(); - kvBuild.setKey(ByteString.copyFrom(kvEntry.getKey())); - if (kvEntry.getValue() != null) { - kvBuild.setValue(ByteString.copyFrom(kvEntry.getValue())); - } - tblBuilder.addRecords(kvBuild.build()); - } - reqBuilder.addTableUpdates(tblBuilder.build()); - } - if (!isDbChanged) { - return null; - } - reqBuilder.addIndex(termIndex.getIndex()); - return reqBuilder; + private void 
requestMergeCommand( + Collection ctxs, PoolExecutor.CheckedConsumer nxtPool) { + try { + captureLatencyNs(omGatewayMetrics.getGatewayMergeWait(), () -> requestMergeCommandInternal(ctxs, nxtPool)); + } catch (IOException e) { + // do nothing } } - - private void requestMergeCommand( + private void requestMergeCommandInternal( Collection ctxs, PoolExecutor.CheckedConsumer nxtPool) { if (!isEnabled.get()) { rejectRequest(ctxs); @@ -249,16 +225,23 @@ private void requestMergeCommand( OzoneManagerProtocolProtos.PersistDbRequest.Builder reqBuilder = OzoneManagerProtocolProtos.PersistDbRequest.newBuilder(); long size = 0; + omGatewayMetrics.incRequestMergeCombineCount(ctxs.size()); + omGatewayMetrics.incRequestMergeCallCount(); for (RequestContext ctx : ctxs) { - List tblList = ctx.getNextRequest().getTableUpdatesList(); + DbChangesRecorder recorder = ctx.getRequestBase().changeRecorder(); int tmpSize = 0; - for (OzoneManagerProtocolProtos.DBTableUpdate tblUpdates : tblList) { - tmpSize += tblUpdates.getSerializedSize(); + for (Map.Entry> tblRecords : recorder.getTableRecordsMap().entrySet()) { + tmpSize += tblRecords.getKey().length(); + for (Map.Entry record : tblRecords.getValue().entrySet()) { + tmpSize += record.getKey().length(); + tmpSize += record.getValue() != null ? 
record.getValue().readableBytes() : 0; + } } if ((tmpSize + size) > ratisByteLimit) { // send current batched request appendBucketQuotaChanges(reqBuilder, bucketChangeMap); prepareAndSendRequest(sendList, reqBuilder, nxtPool); + omGatewayMetrics.incRequestMergeOverflowCount(); // reinit and continue reqBuilder = OzoneManagerProtocolProtos.PersistDbRequest.newBuilder(); @@ -270,11 +253,19 @@ private void requestMergeCommand( // keep adding to batch list size += tmpSize; addBucketQuotaChanges(ctx, bucketChangeMap); - for (OzoneManagerProtocolProtos.DBTableUpdate tblUpdates : tblList) { + for (Map.Entry> tblRecords : recorder.getTableRecordsMap().entrySet()) { OzoneManagerProtocolProtos.DBTableUpdate.Builder tblBuilder = OzoneManagerProtocolProtos.DBTableUpdate.newBuilder(); - tblBuilder.setTableName(tblUpdates.getTableName()); - tblBuilder.addAllRecords(tblUpdates.getRecordsList()); + tblBuilder.setTableName(tblRecords.getKey()); + for (Map.Entry record : tblRecords.getValue().entrySet()) { + OzoneManagerProtocolProtos.DBTableRecord.Builder kvBuild + = OzoneManagerProtocolProtos.DBTableRecord.newBuilder(); + kvBuild.setKey(ByteString.copyFromUtf8(record.getKey())); + if (record.getValue() != null) { + kvBuild.setValue(ByteString.copyFrom(record.getValue().asReadOnlyByteBuffer())); + } + tblBuilder.addRecords(kvBuild.build()); + } reqBuilder.addTableUpdates(tblBuilder.build()); } reqBuilder.addIndex(ctx.getIndex().getIndex()); @@ -288,21 +279,28 @@ private void requestMergeCommand( private void ratisSubmit(RatisContext ctx) throws InterruptedException { // follow simple strategy to submit to ratis for next set of merge request - int nxtIndex = Math.abs(ratisCurrentPool.getAndIncrement() % RATIS_TASK_POOL_SIZE); + if (ratisTaskPoolSize == 0) { + ratisCommand(Collections.singletonList(ctx), null); + return; + } + int nxtIndex = Math.abs(ratisCurrentPool.getAndIncrement() % ratisTaskPoolSize); ratisSubmitter.submit(nxtIndex, ctx); } private void addBucketQuotaChanges( 
RequestContext ctx, Map quotaMap) { - if (ctx.getClientRequest().getWrappedBucketInfo() instanceof OmBucketInfoQuotaTracker) { - OmBucketInfoQuotaTracker info = (OmBucketInfoQuotaTracker) ctx.getClientRequest().getWrappedBucketInfo(); - OzoneManagerProtocolProtos.BucketQuotaCount.Builder quotaBuilder = quotaMap.computeIfAbsent( - info.getObjectID(), k -> OzoneManagerProtocolProtos.BucketQuotaCount.newBuilder() - .setVolName(info.getVolumeName()).setBucketName(info.getBucketName()) - .setBucketObjectId(info.getObjectID()).setSupportOldQuota(false)); - quotaBuilder.setDiffUsedBytes(quotaBuilder.getDiffUsedBytes() + info.getIncUsedBytes()); - quotaBuilder.setDiffUsedNamespace(quotaBuilder.getDiffUsedNamespace() + info.getIncUsedNamespace()); + DbChangesRecorder recorder = ctx.getRequestBase().changeRecorder(); + if (recorder.getBucketUsedQuotaMap().size() == 0) { + return; } + Pair usedByteNameSpace = recorder.getBucketUsedQuotaMap().values().stream().findFirst().get(); + OmBucketInfo info = ctx.getRequestBase().getBucketInfo(); + OzoneManagerProtocolProtos.BucketQuotaCount.Builder quotaBuilder = quotaMap.computeIfAbsent( + info.getObjectID(), k -> OzoneManagerProtocolProtos.BucketQuotaCount.newBuilder() + .setVolName(info.getVolumeName()).setBucketName(info.getBucketName()) + .setBucketObjectId(info.getObjectID()).setSupportOldQuota(false)); + quotaBuilder.setDiffUsedBytes(quotaBuilder.getDiffUsedBytes() + usedByteNameSpace.getLeft()); + quotaBuilder.setDiffUsedNamespace(quotaBuilder.getDiffUsedNamespace() + usedByteNameSpace.getRight()); } private void appendBucketQuotaChanges( @@ -339,21 +337,27 @@ private void ratisCommand(Collection ctxs, PoolExecutor.CheckedCon } return; } - for (RatisContext ctx : ctxs) { - List sendList = ctx.getRequestContexts(); - RequestContext lastReqCtx = sendList.get(sendList.size() - 1); - OMRequest reqBatch = ctx.getRequest(); - try { - OMResponse dbUpdateRsp = sendDbUpdateRequest(reqBatch, lastReqCtx.getIndex()); - if 
(!dbUpdateRsp.getSuccess()) { - throw new OMException(dbUpdateRsp.getMessage(), - OMException.ResultCodes.values()[dbUpdateRsp.getStatus().ordinal()]); + try { + captureLatencyNs(omGatewayMetrics.getGatewayRatisWait(), () -> { + for (RatisContext ctx : ctxs) { + List sendList = ctx.getRequestContexts(); + RequestContext lastReqCtx = sendList.get(sendList.size() - 1); + OMRequest reqBatch = ctx.getRequest(); + try { + OMResponse dbUpdateRsp = sendDbUpdateRequest(reqBatch, lastReqCtx.getIndex()); + if (!dbUpdateRsp.getSuccess()) { + throw new OMException(dbUpdateRsp.getMessage(), + OMException.ResultCodes.values()[dbUpdateRsp.getStatus().ordinal()]); + } + handleBatchUpdateComplete(sendList, null, dbUpdateRsp.getLeaderOMNodeId()); + } catch (Throwable e) { + LOG.warn("Failed to write, Exception occurred ", e); + handleBatchUpdateComplete(sendList, e, null); + } } - handleBatchUpdateComplete(sendList, null, dbUpdateRsp.getLeaderOMNodeId()); - } catch (Throwable e) { - LOG.warn("Failed to write, Exception occurred ", e); - handleBatchUpdateComplete(sendList, e, null); - } + }); + } catch (IOException e) { + // do nothing } } private OMResponse sendDbUpdateRequest(OMRequest nextRequest, TermIndex termIndex) throws Exception { @@ -381,31 +385,20 @@ private OMResponse createErrorResponse(OMRequest omRequest, IOException exceptio return omResponse; } private void handleBatchUpdateComplete(Collection ctxs, Throwable th, String leaderOMNodeId) { - // TODO: no-cache switch, no need cleanup cache - Map> cleanupMap = new HashMap<>(); - for (RequestContext ctx : ctxs) { - // cache cleanup - if (null != ctx.getNextRequest()) { - List tblList = ctx.getNextRequest().getTableUpdatesList(); - for (OzoneManagerProtocolProtos.DBTableUpdate tblUpdate : tblList) { - List epochs = cleanupMap.computeIfAbsent(tblUpdate.getTableName(), k -> new ArrayList<>()); - epochs.add(ctx.getIndex().getIndex()); - } - } - for (Map.Entry> entry : cleanupMap.entrySet()) { - 
ozoneManager.getMetadataManager().getTable(entry.getKey()).cleanupCache(entry.getValue()); - } - } - for (RequestContext ctx : ctxs) { - if (ctx.getClientRequest().getWrappedBucketInfo() instanceof OmBucketInfoQuotaTracker) { - // reset to be done to update resource quota for both success and failure - // for success also, its added here as it's difficult to ensure its execution at nodes - ((OmBucketInfoQuotaTracker) ctx.getClientRequest().getWrappedBucketInfo()).reset(); + // reset quota resource and release memory for change update + Map> usedQuotaMap = ctx.getRequestBase().changeRecorder().getBucketUsedQuotaMap(); + if (null != ctx.getRequestBase().getBucketInfo() && usedQuotaMap.size() > 0) { + BucketQuotaResource.BucketQuota bucketQuota = BucketQuotaResource.instance().get( + ctx.getRequestBase().getBucketInfo().getObjectID()); + Pair quotaPair = usedQuotaMap.values().stream().findFirst().get(); + bucketQuota.addUsedBytes(-quotaPair.getLeft()); + bucketQuota.addUsedNamespace(-quotaPair.getRight()); } + ctx.getRequestBase().changeRecorder().clear(); if (th != null) { - OMAuditLogger.log(ctx.getClientRequest().getAuditBuilder(), ctx.getClientRequest(), ozoneManager, + OMAuditLogger.log(ctx.getRequestBase().getAuditBuilder(), ctx.getRequestBase(), ozoneManager, ctx.getIndex(), th); if (th instanceof IOException) { ctx.getFuture().complete(createErrorResponse(ctx.getRequest(), (IOException)th)); @@ -413,7 +406,7 @@ private void handleBatchUpdateComplete(Collection ctxs, Throwabl ctx.getFuture().complete(createErrorResponse(ctx.getRequest(), new IOException(th))); } } else { - OMAuditLogger.log(ctx.getClientRequest().getAuditBuilder(), ctx.getIndex()); + OMAuditLogger.log(ctx.getRequestBase().getAuditBuilder(), ctx.getIndex()); OMResponse newRsp = ctx.getResponse(); if (leaderOMNodeId != null) { newRsp = OMResponse.newBuilder(newRsp).setLeaderOMNodeId(leaderOMNodeId).build(); diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/OMGateway.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/OMGateway.java index c2c9333f6de..b9392fead8f 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/OMGateway.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/OMGateway.java @@ -16,10 +16,14 @@ */ package org.apache.hadoop.ozone.om.ratis.execution; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.protobuf.ServiceException; import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -33,6 +37,7 @@ import org.apache.hadoop.ozone.om.lock.OmLockOpr; import org.apache.hadoop.ozone.om.lock.OmRequestLockUtils; import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer; +import org.apache.hadoop.ozone.om.ratis.execution.request.OmRequestBase; import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; import org.apache.hadoop.ozone.om.request.OMClientRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; @@ -43,12 +48,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.ozone.util.MetricUtil.captureLatencyNs; + /** * entry for request execution. 
*/ public class OMGateway { private static final Logger LOG = LoggerFactory.getLogger(OMGateway.class); private final LeaderRequestExecutor leaderExecutor; + private final LeaderCompatibleRequestExecutor leaderCompatibleExecutor; private final FollowerRequestExecutor followerExecutor; private final OzoneManager om; private final AtomicLong requestInProgress = new AtomicLong(0); @@ -58,12 +66,18 @@ public class OMGateway { * This makes use of termIndex for init shifted within 54 bits. */ private AtomicLong uniqueIndex = new AtomicLong(); + private OMGatewayMetrics omGatewayMetrics; + private final ScheduledExecutorService executorService; + private boolean isAuthorize; public OMGateway(OzoneManager om) throws IOException { this.om = om; + omGatewayMetrics = OMGatewayMetrics.create(); OmLockOpr.init(om.getThreadNamePrefix()); OmRequestLockUtils.init(); - this.leaderExecutor = new LeaderRequestExecutor(om, uniqueIndex); + isAuthorize = om.getConfiguration().getBoolean("ozone.om.leader.request.is.authorize", true); + this.leaderExecutor = new LeaderRequestExecutor(om, uniqueIndex, omGatewayMetrics); + this.leaderCompatibleExecutor = new LeaderCompatibleRequestExecutor(om, uniqueIndex); this.followerExecutor = new FollowerRequestExecutor(om, uniqueIndex); if (om.isLeaderExecutorEnabled() && om.isRatisEnabled()) { OzoneManagerRatisServer ratisServer = om.getOmRatisServer(); @@ -83,13 +97,27 @@ public OMGateway(OzoneManager om) throws IOException { if (om.isLeaderExecutorEnabled()) { BucketQuotaResource.instance().enableTrack(); } + ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat(om.getThreadNamePrefix() + "OmGateway-monitor-%d").build(); + executorService = Executors.newScheduledThreadPool(1, threadFactory); + executorService.scheduleWithFixedDelay(() -> monitor(), 1000L, 1000L, TimeUnit.MILLISECONDS); } public void stop() { leaderExecutor.stop(); + leaderCompatibleExecutor.stop(); followerExecutor.stop(); OmLockOpr.stop(); + 
executorService.shutdown(); + omGatewayMetrics.unRegister(); + } + public void monitor() { + omGatewayMetrics.getGatewayRequestInProgress().add(requestInProgress.intValue()); } public OMResponse submit(OMRequest omRequest) throws ServiceException { + return captureLatencyNs(omGatewayMetrics.getGatewayExecution(), () -> submitInternal(omRequest)); + } + public OMResponse submitInternal(OMRequest omRequest) throws ServiceException { + omGatewayMetrics.incRequestCount(); if (!om.isLeaderReady()) { try { om.checkLeaderStatus(); @@ -104,21 +132,44 @@ public OMResponse submit(OMRequest omRequest) throws ServiceException { requestContext.setFuture(new CompletableFuture<>()); CompletableFuture f = requestContext.getFuture() .whenComplete((r, th) -> handleAfterExecution(requestContext, th)); - OmLockOpr lockOperation = OmRequestLockUtils.getLockOperation(om, omRequest); + OmLockOpr lockOperation = null; try { // TODO scheduling of request to pool - OMClientRequest omClientRequest = OzoneManagerRatisUtils.createClientRequest(omRequest, om); - lockOperation.lock(om); - requestContext.setClientRequest(omClientRequest); + ExecutorType executorType = executorSelector(omRequest); + if (executorType == ExecutorType.LEADER_OPTIMIZED) { + executorType = captureLatencyNs(omGatewayMetrics.getGatewayPreExecute(), () -> { + OmRequestBase requestBase = OzoneManagerRatisUtils.createLeaderClientRequest(omRequest, om); + if (null != requestBase) { + OMRequest request = requestBase.preProcess(om); + requestContext.setRequest(request); + requestContext.setRequestBase(requestBase); + return ExecutorType.LEADER_OPTIMIZED; + } else { + return ExecutorType.LEADER_COMPATIBLE; + } + }); + if (executorType == ExecutorType.LEADER_OPTIMIZED && isAuthorize) { + captureLatencyNs(omGatewayMetrics.getGatewayAuthorize(), () -> requestContext.getRequestBase().authorize(om)); + } + } + if (executorType == ExecutorType.LEADER_COMPATIBLE) { + OMClientRequest omClientRequest = 
OzoneManagerRatisUtils.createClientRequest(omRequest, om); + OMRequest request = omClientRequest.preExecute(om); + omClientRequest = OzoneManagerRatisUtils.createClientRequest(request, om); + requestContext.setClientRequest(omClientRequest); + requestContext.setRequest(request); + } + lockOperation = OmRequestLockUtils.getLockOperation(om, requestContext.getRequest()); + final OmLockOpr tmpLockOpr = lockOperation; + captureLatencyNs(omGatewayMetrics.getGatewayLock(), () -> tmpLockOpr.lock(om)); - validate(omRequest); - ensurePreviousRequestCompletionForPrepare(omRequest); + validate(requestContext.getRequest()); + ensurePreviousRequestCompletionForPrepare(requestContext.getRequest()); // submit request - ExecutorType executorType = executorSelector(omRequest); if (executorType == ExecutorType.LEADER_COMPATIBLE) { - int idx = Math.abs(leaderExecCurrIdx.getAndIncrement() % leaderExecutor.batchSize()); - leaderExecutor.submit(idx, requestContext); + int idx = Math.abs(leaderExecCurrIdx.getAndIncrement() % leaderCompatibleExecutor.batchSize()); + leaderCompatibleExecutor.submit(idx, requestContext); } else if (executorType == ExecutorType.FOLLOWER) { followerExecutor.submit(0, requestContext); } else { @@ -126,7 +177,8 @@ public OMResponse submit(OMRequest omRequest) throws ServiceException { } try { - return f.get(); + return captureLatencyNs(omGatewayMetrics.getGatewayRequestResponse(), () -> f.get()); + //return f.get(); } catch (ExecutionException ex) { if (ex.getCause() != null) { throw new ServiceException(ex.getMessage(), ex.getCause()); @@ -144,16 +196,18 @@ public OMResponse submit(OMRequest omRequest) throws ServiceException { LOG.error("Exception occurred while handling request", e); throw new ServiceException(e.getMessage(), e); } finally { - lockOperation.unlock(); - Server.Call call = Server.getCurCall().get(); - if (null != call) { - OMLockDetails lockDetails = lockOperation.getLockDetails(); - 
call.getProcessingDetails().add(ProcessingDetails.Timing.LOCKWAIT, - lockDetails.getWaitLockNanos(), TimeUnit.NANOSECONDS); - call.getProcessingDetails().add(ProcessingDetails.Timing.LOCKSHARED, - lockDetails.getReadLockNanos(), TimeUnit.NANOSECONDS); - call.getProcessingDetails().add(ProcessingDetails.Timing.LOCKEXCLUSIVE, - lockDetails.getWriteLockNanos(), TimeUnit.NANOSECONDS); + if (null != lockOperation) { + lockOperation.unlock(); + Server.Call call = Server.getCurCall().get(); + if (null != call) { + OMLockDetails lockDetails = lockOperation.getLockDetails(); + call.getProcessingDetails().add(ProcessingDetails.Timing.LOCKWAIT, + lockDetails.getWaitLockNanos(), TimeUnit.NANOSECONDS); + call.getProcessingDetails().add(ProcessingDetails.Timing.LOCKSHARED, + lockDetails.getReadLockNanos(), TimeUnit.NANOSECONDS); + call.getProcessingDetails().add(ProcessingDetails.Timing.LOCKEXCLUSIVE, + lockDetails.getWriteLockNanos(), TimeUnit.NANOSECONDS); + } } } } @@ -207,7 +261,7 @@ public void leaderChangeNotifier(String newLeaderId) { if (isLeader) { resetUniqueIndex(); } else { - leaderExecutor.disableProcessing(); + leaderCompatibleExecutor.disableProcessing(); } } @@ -227,11 +281,11 @@ private void resetUniqueIndex() { } public void executorEnable() throws ServiceException { - if (leaderExecutor.isProcessing()) { + if (leaderCompatibleExecutor.isProcessing()) { return; } if (requestInProgress.get() == 0) { - leaderExecutor.enableProcessing(); + leaderCompatibleExecutor.enableProcessing(); } else { LOG.warn("Executor is not enabled, previous request {} is still not cleaned", requestInProgress.get()); String msg = "Request processing is disabled due to error"; @@ -241,9 +295,11 @@ public void executorEnable() throws ServiceException { private ExecutorType executorSelector(OMRequest req) { switch (req.getCmdType()) { - case EchoRPC: + case CreateKey: + case CommitKey: return ExecutorType.LEADER_OPTIMIZED; /* cases with Secret manager cache */ + case EchoRPC: case 
GetS3Secret: case SetS3Secret: case RevokeS3Secret: diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/OMGatewayMetrics.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/OMGatewayMetrics.java new file mode 100644 index 00000000000..324ebfa97bb --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/OMGatewayMetrics.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.om.ratis.execution; + +import org.apache.hadoop.hdds.annotation.InterfaceAudience; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableRate; + +/** + * This class is for maintaining Ozone Manager statistics. 
+ */ +@InterfaceAudience.Private +@Metrics(about = "Ozone Manager Gateway Metrics", context = "dfs") +public class OMGatewayMetrics { + private static final String SOURCE_NAME = + OMGatewayMetrics.class.getSimpleName(); + + @Metric(about = "request gateway execution") + private MutableRate gatewayExecution; + @Metric(about = "request gateway pre-execute") + private MutableRate gatewayPreExecute; + @Metric(about = "request gateway lock") + private MutableRate gatewayLock; + @Metric(about = "request gateway request execute") + private MutableRate gatewayRequestExecute; + @Metric(about = "request gateway wait response") + private MutableRate gatewayRequestResponse; + @Metric(about = "request gateway ratis wait") + private MutableRate gatewayRatisWait; + @Metric(about = "request gateway merge wait") + private MutableRate gatewayMergeWait; + @Metric(about = "request gateway authorize") + private MutableRate gatewayAuthorize; + @Metric(about = "request gateway request at any time captured per sec") + private MutableRate gatewayRequestInProgress; + private @Metric MutableCounterLong requestCount; + @Metric(about = "request gateway merge call count") + private MutableCounterLong gatewayMergeCombineCount; + @Metric(about = "request gateway merge combine count") + private MutableCounterLong gatewayMergeCallCount; + @Metric(about = "request gateway merge overflow count") + private MutableCounterLong gatewayMergeOverflowCount; + + public OMGatewayMetrics() { + } + + public static OMGatewayMetrics create() { + MetricsSystem ms = DefaultMetricsSystem.instance(); + return ms.register(SOURCE_NAME, + "Ozone Manager Gateway Metrics", + new OMGatewayMetrics()); + } + + + + public void unRegister() { + MetricsSystem ms = DefaultMetricsSystem.instance(); + ms.unregisterSource(SOURCE_NAME); + } + + public MutableRate getGatewayExecution() { + return gatewayExecution; + } + + public MutableRate getGatewayPreExecute() { + return gatewayPreExecute; + } + + public MutableRate 
getGatewayLock() { + return gatewayLock; + } + + public MutableRate getGatewayRequestExecute() { + return gatewayRequestExecute; + } + + public MutableRate getGatewayRequestResponse() { + return gatewayRequestResponse; + } + + public MutableRate getGatewayRatisWait() { + return gatewayRatisWait; + } + + public MutableRate getGatewayAuthorize() { + return gatewayAuthorize; + } + + public MutableCounterLong getRequestCount() { + return requestCount; + } + public void incRequestCount() { + requestCount.incr(); + } + + public MutableRate getGatewayRequestInProgress() { + return gatewayRequestInProgress; + } + + public MutableRate getGatewayMergeWait() { + return gatewayMergeWait; + } + + public void incRequestMergeCombineCount(int size) { + gatewayMergeCombineCount.incr(size); + } + + public void incRequestMergeCallCount() { + gatewayMergeCallCount.incr(); + } + + public void incRequestMergeOverflowCount() { + gatewayMergeOverflowCount.incr(); + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/RequestContext.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/RequestContext.java index 31994a06e3c..a2ac57c473d 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/RequestContext.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/RequestContext.java @@ -17,6 +17,7 @@ package org.apache.hadoop.ozone.om.ratis.execution; import java.util.concurrent.CompletableFuture; +import org.apache.hadoop.ozone.om.ratis.execution.request.OmRequestBase; import org.apache.hadoop.ozone.om.request.OMClientRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; @@ -29,6 +30,7 @@ public final class RequestContext { private OMRequest request; private OMClientRequest clientRequest; + private OmRequestBase 
requestBase; private OMResponse response; private TermIndex index; private CompletableFuture future; @@ -84,4 +86,10 @@ public OMClientRequest getClientRequest() { public void setClientRequest(OMClientRequest clientRequest) { this.clientRequest = clientRequest; } + public void setRequestBase(OmRequestBase requestBase) { + this.requestBase = requestBase; + } + public OmRequestBase getRequestBase() { + return requestBase; + } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/request/OMKeyCommitRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/request/OMKeyCommitRequest.java new file mode 100644 index 00000000000..3ac79a9d40c --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/request/OMKeyCommitRequest.java @@ -0,0 +1,537 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.ratis.execution.request; + +import com.google.common.base.Preconditions; +import jakarta.annotation.Nonnull; +import java.io.IOException; +import java.nio.file.InvalidPathException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hdds.utils.db.CodecBuffer; +import org.apache.hadoop.ozone.OmUtils; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.OzoneManagerVersion; +import org.apache.hadoop.ozone.audit.AuditLogger; +import org.apache.hadoop.ozone.audit.OMAction; +import org.apache.hadoop.ozone.om.OMConfigKeys; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.OMMetrics; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.helpers.BucketLayout; +import org.apache.hadoop.ozone.om.helpers.KeyValueUtil; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; +import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils; +import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.WithMetadata; +import org.apache.hadoop.ozone.om.request.OMClientRequest; +import org.apache.hadoop.ozone.om.request.util.OmKeyHSyncUtil; +import org.apache.hadoop.ozone.om.request.util.OmResponseUtil; +import org.apache.hadoop.ozone.om.request.validation.RequestFeatureValidator; +import 
org.apache.hadoop.ozone.om.request.validation.RequestProcessingPhase; +import org.apache.hadoop.ozone.om.request.validation.ValidationCondition; +import org.apache.hadoop.ozone.om.request.validation.ValidationContext; +import org.apache.hadoop.ozone.om.response.DummyOMClientResponse; +import org.apache.hadoop.ozone.om.response.OMClientResponse; +import org.apache.hadoop.ozone.om.response.key.OMKeyCommitResponse; +import org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CommitKeyRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyLocation; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; +import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer; +import org.apache.hadoop.ozone.security.acl.OzoneObj; +import org.apache.hadoop.util.Time; +import org.apache.ratis.server.protocol.TermIndex; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_ALREADY_CLOSED; +import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND; +import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_UNDER_LEASE_RECOVERY; +import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_SUPPORTED_OPERATION; +import static org.apache.hadoop.ozone.util.MetricUtil.captureLatencyNs; + +/** + * Handles CommitKey request. 
+ */ +public class OMKeyCommitRequest extends OmRequestBase { + private static final Logger LOG = LoggerFactory.getLogger(OMKeyCommitRequest.class); + + public OMKeyCommitRequest(OMRequest omRequest, OmBucketInfo bucketInfo) { + super(omRequest, bucketInfo); + } + + @Override + public OMRequest preProcess(OzoneManager ozoneManager) throws IOException { + OMRequest request = super.preProcess(ozoneManager); + CommitKeyRequest commitKeyRequest = request.getCommitKeyRequest(); + Preconditions.checkNotNull(commitKeyRequest); + + KeyArgs keyArgs = commitKeyRequest.getKeyArgs(); + + if (keyArgs.hasExpectedDataGeneration()) { + ozoneManager.checkFeatureEnabled(OzoneManagerVersion.ATOMIC_REWRITE_KEY); + } + + // Verify key name + final boolean checkKeyNameEnabled = ozoneManager.getConfiguration().getBoolean( + OMConfigKeys.OZONE_OM_KEYNAME_CHARACTER_CHECK_ENABLED_KEY, + OMConfigKeys.OZONE_OM_KEYNAME_CHARACTER_CHECK_ENABLED_DEFAULT); + if (checkKeyNameEnabled) { + OmUtils.validateKeyName(StringUtils.removeEnd(keyArgs.getKeyName(), OzoneConsts.FS_FILE_COPYING_TEMP_SUFFIX)); + } + boolean isHsync = commitKeyRequest.hasHsync() && commitKeyRequest.getHsync(); + boolean isRecovery = commitKeyRequest.hasRecovery() && commitKeyRequest.getRecovery(); + boolean enableHsync = OzoneFSUtils.canEnableHsync(ozoneManager.getConfiguration(), false); + + // If hsynced is called for a file, then this file is hsynced, otherwise it's not hsynced. + // Currently, file lease recovery by design only supports recover hsynced file + if ((isHsync || isRecovery) && !enableHsync) { + throw new OMException("Hsync is not enabled. 
To enable, set ozone.fs.hsync.enabled = true", + NOT_SUPPORTED_OPERATION); + } + + String keyPath = keyArgs.getKeyName(); + keyPath = OMClientRequest.validateAndNormalizeKey(ozoneManager.getEnableFileSystemPaths(), keyPath, + getBucketLayout()); + + KeyArgs.Builder newKeyArgs = keyArgs.toBuilder().setVolumeName(getBucketInfo().getVolumeName()) + .setBucketName(getBucketInfo().getBucketName()).setModificationTime(Time.now()).setKeyName(keyPath); + return request.toBuilder().setCommitKeyRequest(commitKeyRequest.toBuilder().setKeyArgs(newKeyArgs)).build(); + } + public void authorize(OzoneManager ozoneManager) throws IOException { + KeyArgs keyArgs = getOmRequest().getCommitKeyRequest().getKeyArgs(); + OmKeyUtils.checkOpenKeyAcls(ozoneManager, keyArgs.getVolumeName(), keyArgs.getBucketName(), keyArgs.getKeyName(), + IAccessAuthorizer.ACLType.WRITE, OzoneObj.ResourceType.KEY, getOmRequest().getCommitKeyRequest().getClientID(), + getOmRequest()); + } + @Override + @SuppressWarnings("methodlength") + public OMClientResponse process(OzoneManager ozoneManager, TermIndex termIndex) throws IOException { + final long trxnLogIndex = termIndex.getIndex(); + CommitKeyRequest commitKeyRequest = getOmRequest().getCommitKeyRequest(); + KeyArgs commitKeyArgs = commitKeyRequest.getKeyArgs(); + + String volumeName = commitKeyArgs.getVolumeName(); + String bucketName = commitKeyArgs.getBucketName(); + String keyName = commitKeyArgs.getKeyName(); + + OMMetrics omMetrics = ozoneManager.getMetrics(); + AuditLogger auditLogger = ozoneManager.getAuditLogger(); + Map auditMap = buildKeyArgsAuditMap(commitKeyArgs); + + OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(getOmRequest()); + + Exception exception = null; + OmKeyInfo omKeyInfo = null; + OmBucketInfo omBucketInfo = null; + OMClientResponse omClientResponse = null; + OMClientRequest.Result result; + + OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager(); + + boolean isHSync = 
commitKeyRequest.hasHsync() && commitKeyRequest.getHsync(); + boolean isRecovery = commitKeyRequest.hasRecovery() && commitKeyRequest.getRecovery(); + // isHsync = true, a commit request as a result of client side hsync call + // isRecovery = true, a commit request as a result of client side recoverLease call + // none of isHsync and isRecovery is true, a commit request as a result of client side normal + // outputStream#close call. + if (isHSync) { + omMetrics.incNumKeyHSyncs(); + } else { + omMetrics.incNumKeyCommits(); + } + + LOG.debug("isHSync = {}, isRecovery = {}, volumeName = {}, bucketName = {}, keyName = {}", + isHSync, isRecovery, volumeName, bucketName, keyName); + + try { + String dbOzoneKey = omMetadataManager.getOzoneKey(volumeName, bucketName, keyName); + + List locationInfoList = getOmKeyLocationInfos(ozoneManager, commitKeyArgs); + omBucketInfo = resolveBucket(ozoneManager, commitKeyArgs.getVolumeName(), commitKeyArgs.getBucketName()); + + // Check for directory exists with same name, if it exists throw error. + if (LOG.isDebugEnabled()) { + LOG.debug("BucketName: {}, BucketLayout: {}", omBucketInfo.getBucketName(), omBucketInfo.getBucketLayout()); + } + + // If bucket versioning is turned on during the update, between key + // creation and key commit, old versions will be just overwritten and + // not kept. Bucket versioning will be effective from the first key + // creation after the knob turned on. 
+ OmKeyInfo keyToDelete = null; + if (ozoneManager.getConfiguration().getBoolean("ozone.om.leader.commit.request.old.key.get", true)) { + keyToDelete = captureLatencyNs(omMetrics.getKeyCommitGetKeyRate(), + () -> omMetadataManager.getKeyTable(getBucketLayout()).get(dbOzoneKey)); + } + // OmKeyInfo keyToDelete = omMetadataManager.getKeyTable(getBucketLayout()).get(dbOzoneKey); + long writerClientId = commitKeyRequest.getClientID(); + boolean isSameHsyncKey = false; + boolean isOverwrittenHsyncKey = false; + final String clientIdString = String.valueOf(writerClientId); + if (null != keyToDelete) { + isSameHsyncKey = java.util.Optional.of(keyToDelete).map(WithMetadata::getMetadata) + .map(meta -> meta.get(OzoneConsts.HSYNC_CLIENT_ID)).filter(id -> id.equals(clientIdString)).isPresent(); + if (!isSameHsyncKey) { + isOverwrittenHsyncKey = java.util.Optional.of(keyToDelete).map(WithMetadata::getMetadata) + .map(meta -> meta.get(OzoneConsts.HSYNC_CLIENT_ID)).isPresent() && !isRecovery; + } + } + + if (isRecovery && keyToDelete != null) { + String clientId = keyToDelete.getMetadata().get(OzoneConsts.HSYNC_CLIENT_ID); + if (clientId == null) { + throw new OMException("Failed to recovery key, as " + dbOzoneKey + " is already closed", KEY_ALREADY_CLOSED); + } + writerClientId = Long.parseLong(clientId); + } + String dbOpenKey = omMetadataManager.getOpenKey(volumeName, bucketName, keyName, writerClientId); + omKeyInfo = captureLatencyNs(omMetrics.getKeyCommitGetOpenKeyRate(), + () -> omMetadataManager.getOpenKeyTable(getBucketLayout()).get(dbOpenKey)); + if (omKeyInfo == null) { + String action = isRecovery ? "recovery" : isHSync ? 
"hsync" : "commit"; + throw new OMException("Failed to " + action + " key, as " + dbOpenKey + + " entry is not found in the OpenKey table", KEY_NOT_FOUND); + } else if (omKeyInfo.getMetadata().containsKey(OzoneConsts.DELETED_HSYNC_KEY) || + omKeyInfo.getMetadata().containsKey(OzoneConsts.OVERWRITTEN_HSYNC_KEY)) { + throw new OMException("Open Key " + keyName + " is already deleted/overwritten", KEY_NOT_FOUND); + } + + if (omKeyInfo.getMetadata().containsKey(OzoneConsts.LEASE_RECOVERY) && + omKeyInfo.getMetadata().containsKey(OzoneConsts.HSYNC_CLIENT_ID)) { + if (!isRecovery) { + throw new OMException("Cannot commit key " + dbOpenKey + " with " + OzoneConsts.LEASE_RECOVERY + + " metadata while recovery flag is not set in request", KEY_UNDER_LEASE_RECOVERY); + } + } + + if (isOverwrittenHsyncKey) { + // find the overwritten openKey and add OVERWRITTEN_HSYNC_KEY to it. + String dbOpenKeyName = omMetadataManager.getOpenKey(volumeName, bucketName, + keyName, Long.parseLong(keyToDelete.getMetadata().get(OzoneConsts.HSYNC_CLIENT_ID))); + OmKeyInfo dbOpenKeyUpdate = omMetadataManager.getOpenKeyTable(getBucketLayout()).get(dbOpenKeyName); + dbOpenKeyUpdate.getMetadata().put(OzoneConsts.OVERWRITTEN_HSYNC_KEY, "true"); + dbOpenKeyUpdate.setModificationTime(Time.now()); + dbOpenKeyUpdate.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled()); + CodecBuffer openKeyBuf = OmKeyInfo.getCodec(true).toDirectCodecBuffer(dbOpenKeyUpdate); + changeRecorder().add(omMetadataManager.getOpenKeyTable(getBucketLayout()).getName(), dbOpenKeyName, openKeyBuf); + } + + omKeyInfo.setModificationTime(commitKeyArgs.getModificationTime()); + // non-null indicates it is necessary to update the open key + OmKeyInfo newOpenKeyInfo = null; + if (isHSync) { + if (!OmKeyHSyncUtil.isHSyncedPreviously(omKeyInfo, clientIdString, dbOpenKey)) { + // Update open key as well if it is the first hsync of this key + omKeyInfo.getMetadata().put(OzoneConsts.HSYNC_CLIENT_ID, clientIdString); + newOpenKeyInfo = 
omKeyInfo.copyObject(); + } + } + + validateAtomicRewrite(keyToDelete, omKeyInfo, auditMap); + // Optimistic locking validation has passed. Now set the rewrite fields to null so they are + // not persisted in the key table. + omKeyInfo.setExpectedDataGeneration(null); + + omKeyInfo.getMetadata().putAll(KeyValueUtil.getFromProtobuf(commitKeyArgs.getMetadataList())); + omKeyInfo.setDataSize(commitKeyArgs.getDataSize()); + // Update the block length for each block, return the allocated but uncommitted blocks + List uncommitted = omKeyInfo.updateLocationInfoList(locationInfoList, false); + + // Set the UpdateID to current transactionLogIndex + omKeyInfo.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled()); + if (keyToDelete != null) { + omKeyInfo.setObjectID(0); + omKeyInfo.setObjectID(keyToDelete.getObjectID()); + } + + RepeatedOmKeyInfo blocksToRemove = new RepeatedOmKeyInfo(); + long correctedSpace = omKeyInfo.getReplicatedSize(); + long nameSpaceUsed = 0; + if (keyToDelete != null && (isSameHsyncKey)) { + correctedSpace -= keyToDelete.getReplicatedSize(); + } else if (keyToDelete != null && !omBucketInfo.getIsVersionEnabled()) { + // Subtract the size of blocks to be overwritten. + correctedSpace -= keyToDelete.getReplicatedSize(); + blocksToRemove.addOmKeyInfo(OmUtils.prepareKeyForDelete(keyToDelete, trxnLogIndex, + ozoneManager.isRatisEnabled()).getOmKeyInfoList().get(0)); + } else { + // if keyToDelete isn't null, usedNamespace needn't check and increase. + nameSpaceUsed = 1; + } + + // let the uncommitted blocks pretend as key's old version blocks which will be deleted as RepeatedOmKeyInfo + final OmKeyInfo pseudoKeyInfo = isHSync ? 
null + : OmKeyUtils.wrapUncommittedBlocksAsPseudoKey(uncommitted, omKeyInfo); + if (pseudoKeyInfo != null) { + blocksToRemove.addOmKeyInfo(pseudoKeyInfo); + } + if (blocksToRemove.getOmKeyInfoList().size() > 0) { + long pseudoObjId = ozoneManager.getObjectIdFromTxId(trxnLogIndex); + String delKeyName = omMetadataManager.getOzoneDeletePathKey(pseudoObjId, dbOzoneKey); + // filterOutBlocksStillInUse(omKeyInfo, blocksToRemove); + CodecBuffer oldVerInfoBuf = RepeatedOmKeyInfo.getCodec(true).toDirectCodecBuffer(blocksToRemove); + changeRecorder().add(omMetadataManager.getDeletedTable().getName(), delKeyName, oldVerInfoBuf); + } + + omBucketInfo.incrUsedNamespace(nameSpaceUsed); + omBucketInfo.incrUsedBytes(correctedSpace); + OmKeyUtils.checkUpdateBucketQuota(omBucketInfo, correctedSpace, nameSpaceUsed); + changeRecorder().add(omMetadataManager.getBucketTable().getName(), correctedSpace, nameSpaceUsed); + + // Add to cache of open key table and key table. + if (!isHSync) { + changeRecorder().add(omMetadataManager.getKeyTable(getBucketLayout()).getName(), dbOpenKey, null); + // Prevent hsync metadata from getting committed to the final key + omKeyInfo.getMetadata().remove(OzoneConsts.HSYNC_CLIENT_ID); + if (isRecovery) { + omKeyInfo.getMetadata().remove(OzoneConsts.LEASE_RECOVERY); + } + } else if (newOpenKeyInfo != null) { + // isHSync is true and newOpenKeyInfo is set, update OpenKeyTable + CodecBuffer newOpenKeyBuf = OmKeyInfo.getCodec(true).toDirectCodecBuffer(newOpenKeyInfo); + changeRecorder().add(omMetadataManager.getOpenKeyTable(getBucketLayout()).getName(), dbOpenKey, newOpenKeyBuf); + } + + CodecBuffer omKeyBuf = OmKeyInfo.getCodec(true).toDirectCodecBuffer(omKeyInfo); + changeRecorder().add(omMetadataManager.getKeyTable(getBucketLayout()).getName(), dbOzoneKey, omKeyBuf); + + omClientResponse = new DummyOMClientResponse(omResponse.build()); + result = OMClientRequest.Result.SUCCESS; + } catch (IOException | InvalidPathException ex) { + result = 
OMClientRequest.Result.FAILURE; + exception = ex; + omClientResponse = new OMKeyCommitResponse(OmKeyUtils.createErrorOMResponse( + omResponse, exception), getBucketLayout()); + } + + // Debug logging for any key commit operation, successful or not + LOG.debug("Key commit {} with isHSync = {}, isRecovery = {}, omKeyInfo = {}", + result == OMClientRequest.Result.SUCCESS ? "succeeded" : "failed", isHSync, isRecovery, omKeyInfo); + + if (!isHSync) { + markForAudit(auditLogger, buildAuditMessage(OMAction.COMMIT_KEY, auditMap, + exception, getOmRequest().getUserInfo())); + processResult(commitKeyRequest, volumeName, bucketName, keyName, + omMetrics, exception, omKeyInfo, result); + } + + return omClientResponse; + } + + @Nonnull + protected List getOmKeyLocationInfos( + OzoneManager ozoneManager, KeyArgs commitKeyArgs) { + List locationInfoList = new ArrayList<>(); + for (KeyLocation keyLocation : commitKeyArgs.getKeyLocationsList()) { + OmKeyLocationInfo locationInfo = + OmKeyLocationInfo.getFromProtobuf(keyLocation); + + // Strip out tokens before adding to cache. + // This way during listStatus token information does not pass on to + // client when returning from cache. + if (ozoneManager.isGrpcBlockTokenEnabled()) { + locationInfo.setToken(null); + } + locationInfoList.add(locationInfo); + } + return locationInfoList; + } + + /** + * Process result of om request execution. 
+ * + * @param commitKeyRequest commit key request + * @param volumeName volume name + * @param bucketName bucket name + * @param keyName key name + * @param omMetrics om metrics + * @param exception exception trace + * @param omKeyInfo omKeyInfo + * @param result stores the result of the execution + */ + @SuppressWarnings("parameternumber") + protected void processResult(CommitKeyRequest commitKeyRequest, + String volumeName, String bucketName, + String keyName, OMMetrics omMetrics, + Exception exception, OmKeyInfo omKeyInfo, + OMClientRequest.Result result) { + switch (result) { + case SUCCESS: + // As when we commit the key, then it is visible in ozone, so we should + // increment here. + // As key also can have multiple versions, we need to increment keys + // only if version is 0. Currently we have not complete support of + // versioning of keys. So, this can be revisited later. + if (omKeyInfo.getKeyLocationVersions().size() == 1) { + omMetrics.incNumKeys(); + } + if (commitKeyRequest.getKeyArgs().hasEcReplicationConfig()) { + omMetrics.incEcKeysTotal(); + } + omMetrics.incDataCommittedBytes(omKeyInfo.getDataSize()); + LOG.debug("Key committed. Volume:{}, Bucket:{}, Key:{}", volumeName, + bucketName, keyName); + break; + case FAILURE: + LOG.error("Key committed failed. Volume:{}, Bucket:{}, Key:{}. 
" + + "Exception:{}", volumeName, bucketName, keyName, exception); + if (commitKeyRequest.getKeyArgs().hasEcReplicationConfig()) { + omMetrics.incEcKeyCreateFailsTotal(); + } + omMetrics.incNumKeyCommitFails(); + break; + default: + LOG.error("Unrecognized Result for OMKeyCommitRequest: {}", + commitKeyRequest); + } + } + + @RequestFeatureValidator( + conditions = ValidationCondition.CLUSTER_NEEDS_FINALIZATION, + processingPhase = RequestProcessingPhase.PRE_PROCESS, + requestType = Type.CommitKey + ) + public static OMRequest disallowCommitKeyWithECReplicationConfig( + OMRequest req, ValidationContext ctx) throws OMException { + if (!ctx.versionManager() + .isAllowed(OMLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT)) { + if (req.getCommitKeyRequest().getKeyArgs().hasEcReplicationConfig()) { + throw new OMException("Cluster does not have the Erasure Coded" + + " Storage support feature finalized yet, but the request contains" + + " an Erasure Coded replication type. Rejecting the request," + + " please finalize the cluster upgrade and then try again.", + OMException.ResultCodes.NOT_SUPPORTED_OPERATION_PRIOR_FINALIZATION); + } + } + return req; + } + + /** + * Validates key commit requests. + * We do not want to allow older clients to commit keys associated with + * buckets which use non LEGACY layouts. 
+ * + * @param req - the request to validate + * @param ctx - the validation context + * @return the validated request + * @throws OMException if the request is invalid + */ + @RequestFeatureValidator( + conditions = ValidationCondition.OLDER_CLIENT_REQUESTS, + processingPhase = RequestProcessingPhase.PRE_PROCESS, + requestType = Type.CommitKey + ) + public static OMRequest blockCommitKeyWithBucketLayoutFromOldClient( + OMRequest req, ValidationContext ctx) throws IOException { + if (req.getCommitKeyRequest().hasKeyArgs()) { + KeyArgs keyArgs = req.getCommitKeyRequest().getKeyArgs(); + + if (keyArgs.hasVolumeName() && keyArgs.hasBucketName()) { + BucketLayout bucketLayout = ctx.getBucketLayout( + keyArgs.getVolumeName(), keyArgs.getBucketName()); + bucketLayout.validateSupportedOperation(); + } + } + return req; + } + + @RequestFeatureValidator( + conditions = ValidationCondition.CLUSTER_NEEDS_FINALIZATION, + processingPhase = RequestProcessingPhase.PRE_PROCESS, + requestType = Type.CommitKey + ) + public static OMRequest disallowHsync( + OMRequest req, ValidationContext ctx) throws OMException { + if (!ctx.versionManager() + .isAllowed(OMLayoutFeature.HBASE_SUPPORT)) { + CommitKeyRequest commitKeyRequest = req.getCommitKeyRequest(); + boolean isHSync = commitKeyRequest.hasHsync() && + commitKeyRequest.getHsync(); + if (isHSync) { + throw new OMException("Cluster does not have the hsync support " + + "feature finalized yet, but the request contains" + + " an hsync field. Rejecting the request," + + " please finalize the cluster upgrade and then try again.", + OMException.ResultCodes.NOT_SUPPORTED_OPERATION_PRIOR_FINALIZATION); + } + } + return req; + } + + /** + * Validates key commit requests. + * We do not want to allow clients to perform lease recovery requests + * until the cluster has finalized the HBase support feature. 
+ * + * @param req - the request to validate + * @param ctx - the validation context + * @return the validated request + * @throws OMException if the request is invalid + */ + @RequestFeatureValidator( + conditions = ValidationCondition.CLUSTER_NEEDS_FINALIZATION, + processingPhase = RequestProcessingPhase.PRE_PROCESS, + requestType = Type.CommitKey + ) + public static OMRequest disallowRecovery( + OMRequest req, ValidationContext ctx) throws OMException { + if (!ctx.versionManager() + .isAllowed(OMLayoutFeature.HBASE_SUPPORT)) { + CommitKeyRequest commitKeyRequest = req.getCommitKeyRequest(); + boolean isRecovery = commitKeyRequest.hasRecovery() && + commitKeyRequest.getRecovery(); + if (isRecovery) { + throw new OMException("Cluster does not have the HBase support " + + "feature finalized yet, but the request contains" + + " an recovery field. Rejecting the request," + + " please finalize the cluster upgrade and then try again.", + OMException.ResultCodes.NOT_SUPPORTED_OPERATION_PRIOR_FINALIZATION); + } + } + return req; + } + + protected void validateAtomicRewrite(OmKeyInfo existing, OmKeyInfo toCommit, Map auditMap) + throws OMException { + if (toCommit.getExpectedDataGeneration() != null) { + // These values are not passed in the request keyArgs, so add them into the auditMap if they are present + // in the open key entry. 
+ auditMap.put(OzoneConsts.REWRITE_GENERATION, String.valueOf(toCommit.getExpectedDataGeneration())); + if (existing == null) { + throw new OMException("Atomic rewrite is not allowed for a new key", KEY_NOT_FOUND); + } + if (!toCommit.getExpectedDataGeneration().equals(existing.getUpdateID())) { + throw new OMException("Cannot commit as current generation (" + existing.getUpdateID() + + ") does not match the expected generation to rewrite (" + toCommit.getExpectedDataGeneration() + ")", + KEY_NOT_FOUND); + } + } + } + +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/request/OMKeyCreateRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/request/OMKeyCreateRequest.java new file mode 100644 index 00000000000..343908925d6 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/request/OMKeyCreateRequest.java @@ -0,0 +1,332 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.ratis.execution.request; + +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.hadoop.fs.FileEncryptionInfo; +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.client.ReplicationConfig; +import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.apache.hadoop.hdds.utils.UniqueId; +import org.apache.hadoop.hdds.utils.db.CodecBuffer; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.ozone.OmUtils; +import org.apache.hadoop.ozone.OzoneManagerVersion; +import org.apache.hadoop.ozone.audit.OMAction; +import org.apache.hadoop.ozone.om.OMConfigKeys; +import org.apache.hadoop.ozone.om.OMMetrics; +import org.apache.hadoop.ozone.om.OzoneConfigUtil; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.helpers.BucketLayout; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; +import org.apache.hadoop.ozone.om.request.OMClientRequest; +import org.apache.hadoop.ozone.om.request.OMClientRequestUtils; +import org.apache.hadoop.ozone.om.request.util.OmResponseUtil; +import 
org.apache.hadoop.ozone.om.request.validation.RequestFeatureValidator; +import org.apache.hadoop.ozone.om.request.validation.RequestProcessingPhase; +import org.apache.hadoop.ozone.om.request.validation.ValidationCondition; +import org.apache.hadoop.ozone.om.request.validation.ValidationContext; +import org.apache.hadoop.ozone.om.response.DummyOMClientResponse; +import org.apache.hadoop.ozone.om.response.OMClientResponse; +import org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateKeyRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateKeyResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.UserInfo; +import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer; +import org.apache.hadoop.ozone.security.acl.OzoneObj; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.Time; +import org.apache.ratis.server.protocol.TermIndex; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto.READ; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto.WRITE; +import static org.apache.hadoop.ozone.om.request.OMClientRequest.validateAndNormalizeKey; + +/** + * Handles CreateKey request. 
+ */ +public class OMKeyCreateRequest extends OmRequestBase { + private static final Logger LOG = + LoggerFactory.getLogger(OMKeyCreateRequest.class); + + public OMKeyCreateRequest(OMRequest omRequest, OmBucketInfo bucketInfo) { + super(omRequest, bucketInfo); + } + + public OMRequest preProcess(OzoneManager ozoneManager) throws IOException { + OMRequest request = super.preProcess(ozoneManager); + + CreateKeyRequest createKeyRequest = request.getCreateKeyRequest(); + Preconditions.checkNotNull(createKeyRequest); + + KeyArgs keyArgs = createKeyRequest.getKeyArgs(); + if (keyArgs.hasExpectedDataGeneration()) { + ozoneManager.checkFeatureEnabled(OzoneManagerVersion.ATOMIC_REWRITE_KEY); + } + + OmUtils.verifyKeyNameWithSnapshotReservedWord(keyArgs.getKeyName()); + final boolean checkKeyNameEnabled = ozoneManager.getConfiguration() + .getBoolean(OMConfigKeys.OZONE_OM_KEYNAME_CHARACTER_CHECK_ENABLED_KEY, + OMConfigKeys.OZONE_OM_KEYNAME_CHARACTER_CHECK_ENABLED_DEFAULT); + if (checkKeyNameEnabled) { + OmUtils.validateKeyName(keyArgs.getKeyName()); + } + + String keyPath = keyArgs.getKeyName(); + //OmBucketInfo bucketInfo = OmKeyUtils.resolveBucket(ozoneManager, keyArgs); + keyPath = validateAndNormalizeKey(ozoneManager.getEnableFileSystemPaths(), keyPath, getBucketLayout()); + keyArgs = keyArgs.toBuilder().setVolumeName(getBucketInfo().getVolumeName()) + .setBucketName(getBucketInfo().getBucketName()).setKeyName(keyPath).setModificationTime(Time.now()).build(); + + createKeyRequest = createKeyRequest.toBuilder().setKeyArgs(keyArgs).setClientID(UniqueId.next()).build(); + return request.toBuilder().setCreateKeyRequest(createKeyRequest).build(); + } + + public void authorize(OzoneManager ozoneManager) throws IOException { + KeyArgs keyArgs = getOmRequest().getCreateKeyRequest().getKeyArgs(); + OmKeyUtils.checkKeyAcls(ozoneManager, keyArgs.getVolumeName(), keyArgs.getBucketName(), keyArgs.getKeyName(), + IAccessAuthorizer.ACLType.CREATE, OzoneObj.ResourceType.KEY, 
getOmRequest()); + } + + public OMClientResponse process(OzoneManager ozoneManager, TermIndex termIndex) throws IOException { + CreateKeyRequest createKeyRequest = getOmRequest().getCreateKeyRequest(); + KeyArgs keyArgs = createKeyRequest.getKeyArgs(); + OMClientResponse omClientResponse = null; + Exception exception = null; + + OMMetrics omMetrics = ozoneManager.getMetrics(); + try { + BucketLayout bucketLayout = getBucketLayout(); + OmBucketInfo bucketInfo = resolveBucket(ozoneManager, keyArgs.getVolumeName(), keyArgs.getBucketName()); + OMClientRequestUtils.checkClientRequestPrecondition(bucketInfo.getBucketLayout(), bucketLayout); + + // Check if Key already exists + /*String dbKeyName = ozoneManager.getMetadataManager().getOzoneKey(keyArgs.getVolumeName(), + keyArgs.getBucketName(), keyArgs.getKeyName()); + OmKeyInfo dbKeyInfo = ozoneManager.getMetadataManager().getKeyTable(getBucketLayout()).getIfExist(dbKeyName); + validateAtomicRewrite(dbKeyInfo, keyArgs);*/ + + // authorize + //OmKeyUtils.checkKeyAcls(ozoneManager, keyArgs.getVolumeName(), keyArgs.getBucketName(), keyArgs.getKeyName(), + // IAccessAuthorizer.ACLType.CREATE, OzoneObj.ResourceType.KEY, getOmRequest()); + + // prepare + FileEncryptionInfo encInfo; + if (keyArgs.getIsMultipartKey()) { + encInfo = OmKeyUtils.getFileEncryptionInfoForMpuKey(keyArgs, ozoneManager, bucketLayout); + } else { + encInfo = OmKeyUtils.getFileEncryptionInfo(ozoneManager, bucketInfo).orElse(null); + } + + long trxnLogIndex = termIndex.getIndex(); + final ReplicationConfig repConfig = OzoneConfigUtil.resolveReplicationConfigPreference(keyArgs.getType(), + keyArgs.getFactor(), keyArgs.getEcReplicationConfig(), bucketInfo.getDefaultReplicationConfig(), + ozoneManager); + OmKeyInfo omKeyInfo = OmKeyUtils.prepareKeyInfo(ozoneManager.getMetadataManager(), keyArgs, null, + 0, Collections.emptyList(), encInfo, + ozoneManager.getPrefixManager(), bucketInfo, null, trxnLogIndex, + ozoneManager.getObjectIdFromTxId(trxnLogIndex), + 
ozoneManager.isRatisEnabled(), repConfig); + if (!keyArgs.getIsMultipartKey()) { + addBlockInfo(ozoneManager, keyArgs, repConfig, omKeyInfo); + // check bucket and volume quota + OmKeyUtils.checkBucketQuotaInBytes(bucketInfo, omKeyInfo.getReplicatedSize()); + } + + // add changes + long clientID = createKeyRequest.getClientID(); + String dbOpenKeyName = ozoneManager.getMetadataManager().getOpenKey(keyArgs.getVolumeName(), + keyArgs.getBucketName(), keyArgs.getKeyName(), clientID); + CodecBuffer omKeyCodecBuffer = OmKeyInfo.getCodec(true).toDirectCodecBuffer(omKeyInfo); + changeRecorder().add(ozoneManager.getMetadataManager().getOpenKeyTable(bucketLayout).getName(), dbOpenKeyName, + omKeyCodecBuffer); + + // Prepare response + OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(getOmRequest()); + long openVersion = omKeyInfo.getLatestVersionLocations().getVersion(); + omResponse.setCreateKeyResponse(CreateKeyResponse.newBuilder() + .setKeyInfo(omKeyInfo.getNetworkProtobuf(getOmRequest().getVersion(), keyArgs.getLatestVersionLocation())) + .setID(clientID).setOpenVersion(openVersion).build()) + .setCmdType(Type.CreateKey); + omClientResponse = new DummyOMClientResponse(omResponse.build()); + omMetrics.incNumKeyAllocates(); + } catch (Exception ex) { + omMetrics.incNumKeyAllocateFails(); + exception = ex; + OMResponse rsp = OmKeyUtils.createErrorOMResponse(OmResponseUtil.getOMResponseBuilder(getOmRequest()), ex); + omClientResponse = new DummyOMClientResponse(rsp); + } + + // Audit Log outside the lock + Map auditMap = buildKeyArgsAuditMap(keyArgs); + markForAudit(ozoneManager.getAuditLogger(), buildAuditMessage( + OMAction.ALLOCATE_KEY, auditMap, exception, + getOmRequest().getUserInfo())); + + logResult(createKeyRequest, omMetrics, exception, OMClientRequest.Result.SUCCESS, 0); + return omClientResponse; + } + + public void addBlockInfo(OzoneManager ozoneManager, KeyArgs keyArgs, + ReplicationConfig repConfig, OmKeyInfo omKeyInfo) throws IOException 
{ + long scmBlockSize = ozoneManager.getScmBlockSize(); + // NOTE size of a key is not a hard limit on anything, it is a value that + // client should expect, in terms of current size of key. If client sets + // a value, then this value is used, otherwise, we allocate a single + // block which is the current size, if read by the client. + final long requestedSize = keyArgs.getDataSize() > 0 ? keyArgs.getDataSize() : scmBlockSize; + + List newLocationList = null; + if (!ozoneManager.getConfiguration().getBoolean("ozone.om.leader.request.dummy.block", true)) { + UserInfo userInfo = getOmRequest().getUserInfo(); + List omKeyLocationInfoList = OmKeyUtils.allocateBlock(ozoneManager.getScmClient(), + ozoneManager.getBlockTokenSecretManager(), repConfig, + new ExcludeList(), requestedSize, scmBlockSize, + ozoneManager.getPreallocateBlocksMax(), ozoneManager.isGrpcBlockTokenEnabled(), + ozoneManager.getOMServiceId(), ozoneManager.getMetrics(), + keyArgs.getSortDatanodes(), userInfo); + // convert to proto and convert back as to filter out in existing logic + newLocationList = omKeyLocationInfoList.stream() + .map(info -> info.getProtobuf(false, getOmRequest().getVersion())).map(OmKeyLocationInfo::getFromProtobuf) + .collect(Collectors.toList()); + } else { + newLocationList = allocateDummyBlocks(scmBlockSize, ozoneManager); + } + omKeyInfo.appendNewBlocks(newLocationList, false); + omKeyInfo.setDataSize(requestedSize + omKeyInfo.getDataSize()); + } + + private List allocateDummyBlocks(long scmBlockSize, OzoneManager ozoneManager) throws IOException { + BlockID blockID = new BlockID(1L, 1L); + OmKeyLocationInfo.Builder builder = new OmKeyLocationInfo.Builder() + .setBlockID(blockID) + .setLength(scmBlockSize) + .setOffset(0) + .setPipeline(Pipeline.newBuilder().setId(PipelineID.randomId()) + .setReplicationConfig(ozoneManager.getDefaultReplicationConfig()).setState(Pipeline.PipelineState.OPEN) + .setNodes(Collections.emptyList()).build()); + if 
(ozoneManager.isGrpcBlockTokenEnabled()) { + UserGroupInformation ugi = Server.getRemoteUser(); + builder.setToken(ozoneManager.getBlockTokenSecretManager().generateToken( + ((ugi != null) ? ugi : UserGroupInformation.getCurrentUser()).getShortUserName(), blockID, + EnumSet.of(READ, WRITE), scmBlockSize)); + } + List locationInfos = new ArrayList<>(); + locationInfos.add(builder.build()); + return locationInfos; + } + + protected void logResult(CreateKeyRequest createKeyRequest, + OMMetrics omMetrics, Exception exception, OMClientRequest.Result result, + int numMissingParents) { + switch (result) { + case SUCCESS: + // Missing directories are created immediately, counting that here. + // The metric for the key is incremented as part of the key commit. + omMetrics.incNumKeys(numMissingParents); + LOG.debug("Key created. Volume:{}, Bucket:{}, Key:{}", + createKeyRequest.getKeyArgs().getVolumeName(), + createKeyRequest.getKeyArgs().getBucketName(), + createKeyRequest.getKeyArgs().getKeyName()); + break; + case FAILURE: + if (createKeyRequest.getKeyArgs().hasEcReplicationConfig()) { + omMetrics.incEcKeyCreateFailsTotal(); + } + LOG.error("Key creation failed. Volume:{}, Bucket:{}, Key:{}. 
", + createKeyRequest.getKeyArgs().getVolumeName(), + createKeyRequest.getKeyArgs().getBucketName(), + createKeyRequest.getKeyArgs().getKeyName(), exception); + break; + default: + LOG.error("Unrecognized Result for OMKeyCreateRequest: {}", + createKeyRequest); + } + } + + @RequestFeatureValidator( + conditions = ValidationCondition.CLUSTER_NEEDS_FINALIZATION, + processingPhase = RequestProcessingPhase.PRE_PROCESS, + requestType = Type.CreateKey + ) + public static OMRequest disallowCreateKeyWithECReplicationConfig( + OMRequest req, ValidationContext ctx) throws OMException { + if (!ctx.versionManager() + .isAllowed(OMLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT)) { + if (req.getCreateKeyRequest().getKeyArgs().hasEcReplicationConfig()) { + throw new OMException("Cluster does not have the Erasure Coded" + + " Storage support feature finalized yet, but the request contains" + + " an Erasure Coded replication type. Rejecting the request," + + " please finalize the cluster upgrade and then try again.", + OMException.ResultCodes.NOT_SUPPORTED_OPERATION_PRIOR_FINALIZATION); + } + } + return req; + } + + @RequestFeatureValidator( + conditions = ValidationCondition.OLDER_CLIENT_REQUESTS, + processingPhase = RequestProcessingPhase.PRE_PROCESS, + requestType = Type.CreateKey + ) + public static OMRequest blockCreateKeyWithBucketLayoutFromOldClient( + OMRequest req, ValidationContext ctx) throws IOException { + if (req.getCreateKeyRequest().hasKeyArgs()) { + KeyArgs keyArgs = req.getCreateKeyRequest().getKeyArgs(); + + if (keyArgs.hasVolumeName() && keyArgs.hasBucketName()) { + BucketLayout bucketLayout = ctx.getBucketLayout( + keyArgs.getVolumeName(), keyArgs.getBucketName()); + bucketLayout.validateSupportedOperation(); + } + } + return req; + } + + /*protected void validateAtomicRewrite(OmKeyInfo dbKeyInfo, KeyArgs keyArgs) + throws OMException { + if (keyArgs.hasExpectedDataGeneration()) { + // If a key does not exist, or if it exists but the updateID do not match, then 
fail this request. + if (dbKeyInfo == null) { + throw new OMException("Key not found during expected rewrite", OMException.ResultCodes.KEY_NOT_FOUND); + } + if (dbKeyInfo.getUpdateID() != keyArgs.getExpectedDataGeneration()) { + throw new OMException("Generation mismatch during expected rewrite", OMException.ResultCodes.KEY_NOT_FOUND); + } + } + }*/ +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/request/OmKeyUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/request/OmKeyUtils.java new file mode 100644 index 00000000000..154f40ae5f4 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/request/OmKeyUtils.java @@ -0,0 +1,718 @@ +package org.apache.hadoop.ozone.om.ratis.execution.request; + +import com.google.common.base.Preconditions; +import jakarta.annotation.Nonnull; +import jakarta.annotation.Nullable; +import java.io.IOException; +import java.net.InetAddress; +import java.nio.file.InvalidPathException; +import java.security.GeneralSecurityException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.List; +import java.util.Optional; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension; +import org.apache.hadoop.fs.FileEncryptionInfo; +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.client.ECReplicationConfig; +import org.apache.hadoop.hdds.client.ReplicationConfig; +import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock; +import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.security.token.OzoneBlockTokenSecretManager; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.Server; 
+import org.apache.hadoop.ozone.OzoneAcl; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.om.IOmMetadataReader; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.OMMetrics; +import org.apache.hadoop.ozone.om.OmMetadataReader; +import org.apache.hadoop.ozone.om.OzoneAclUtils; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.PrefixManager; +import org.apache.hadoop.ozone.om.ScmClient; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo; +import org.apache.hadoop.ozone.om.helpers.BucketLayout; +import org.apache.hadoop.ozone.om.helpers.KeyValueUtil; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; +import org.apache.hadoop.ozone.om.helpers.OmPrefixInfo; +import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil; +import org.apache.hadoop.ozone.om.protocolPB.grpc.GrpcClientConstants; +import org.apache.hadoop.ozone.om.ratis.execution.BucketQuotaResource; +import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; +import org.apache.hadoop.ozone.om.request.file.OMFileRequest; +import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; +import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer; +import org.apache.hadoop.ozone.security.acl.OzoneObj; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authentication.client.AuthenticationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto.READ; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto.WRITE; +import static org.apache.hadoop.hdds.utils.HddsServerUtil.getRemoteUser; +import static org.apache.hadoop.ozone.OzoneAcl.AclScope.ACCESS; +import static org.apache.hadoop.ozone.OzoneConsts.OBJECT_ID_RECLAIM_BLOCKS; +import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER; +import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.UNAUTHORIZED; +import static org.apache.hadoop.util.Time.monotonicNow; + +/** + * key utils for key operations. + */ +public final class OmKeyUtils { + private static final Logger LOG = LoggerFactory.getLogger(OmKeyUtils.class); + + private OmKeyUtils() { + } + + public static void checkKeyAcls( + OzoneManager ozoneManager, String volume, String bucket, String key, IAccessAuthorizer.ACLType aclType, + OzoneObj.ResourceType resourceType, OMRequest omRequest) throws IOException { + if (ozoneManager.getAclsEnabled()) { + checkAcls(ozoneManager, resourceType, OzoneObj.StoreType.OZONE, aclType, volume, bucket, key, omRequest); + // Additional bucket ACL check for create key + try (ReferenceCounted rcMetadataReader = ozoneManager.getOmMetadataReader()) { + OzoneAclUtils.checkAllAcls((OmMetadataReader) rcMetadataReader.get(), OzoneObj.ResourceType.BUCKET, + OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.READ, volume, bucket, null, + ozoneManager.getVolumeOwner(volume, aclType, OzoneObj.ResourceType.BUCKET), + ozoneManager.getBucketOwner(volume, bucket, aclType, OzoneObj.ResourceType.BUCKET), + createUGIForApi(omRequest), getRemoteAddress(omRequest), getHostName(omRequest)); + } + } + } + @SuppressWarnings("parameternumber") + public static void checkOpenKeyAcls( + OzoneManager ozoneManager, String volume, String bucket, String key, IAccessAuthorizer.ACLType aclType, + OzoneObj.ResourceType resourceType, long 
clientId, OMRequest omRequest) throws IOException { + if (ozoneManager.getAclsEnabled()) { + // Native authorizer requires client id as part of key name to check + // write ACL on key. Add client id to key name if ozone native + // authorizer is configured. + if (ozoneManager.getAccessAuthorizer().isNative()) { + key = key + "/" + clientId; + } + checkAcls(ozoneManager, resourceType, OzoneObj.StoreType.OZONE, aclType, + volume, bucket, key, omRequest); + } + } + @SuppressWarnings("parameternumber") + public static void checkAcls( + OzoneManager ozoneManager, OzoneObj.ResourceType resType, OzoneObj.StoreType storeType, + IAccessAuthorizer.ACLType aclType, String vol, String bucket, String key, + OMRequest omRequest) throws IOException { + checkAcls(ozoneManager, resType, storeType, aclType, vol, bucket, key, + ozoneManager.getVolumeOwner(vol, aclType, resType), + ozoneManager.getBucketOwner(vol, bucket, aclType, resType), omRequest); + } + @SuppressWarnings("parameternumber") + public static void checkAcls( + OzoneManager ozoneManager, OzoneObj.ResourceType resType, OzoneObj.StoreType storeType, + IAccessAuthorizer.ACLType aclType, String vol, String bucket, String key, String volOwner, String bucketOwner, + OMRequest omRequest) throws IOException { + try (ReferenceCounted rcMetadataReader = ozoneManager.getOmMetadataReader()) { + OzoneAclUtils.checkAllAcls((OmMetadataReader) rcMetadataReader.get(), resType, storeType, aclType, vol, bucket, + key, volOwner, bucketOwner, createUGIForApi(omRequest), getRemoteAddress(omRequest), getHostName(omRequest)); + } + } + + /** + * For non-rpc internal calls Server.getRemoteUser() + * and Server.getRemoteIp() will be null. + * Passing getCurrentUser() and Ip of the Om node that started it. + * @return User Info. 
+ */ + public static OzoneManagerProtocolProtos.UserInfo getUserIfNotExists( + OzoneManager ozoneManager, OMRequest omRequest) throws IOException { + OzoneManagerProtocolProtos.UserInfo userInfo = getUserInfo(omRequest); + if (!userInfo.hasRemoteAddress() || !userInfo.hasUserName()) { + OzoneManagerProtocolProtos.UserInfo.Builder newuserInfo = + OzoneManagerProtocolProtos.UserInfo.newBuilder(); + UserGroupInformation user; + InetAddress remoteAddress; + try { + user = UserGroupInformation.getCurrentUser(); + remoteAddress = ozoneManager.getOmRpcServerAddr() + .getAddress(); + } catch (Exception e) { + LOG.debug("Couldn't get om Rpc server address", e); + return getUserInfo(omRequest); + } + newuserInfo.setUserName(user.getUserName()); + newuserInfo.setHostName(remoteAddress.getHostName()); + newuserInfo.setRemoteAddress(remoteAddress.getHostAddress()); + return newuserInfo.build(); + } + return getUserInfo(omRequest); + } + + /** + * Get User information which needs to be set in the OMRequest object. + * @return User Info. + */ + public static OzoneManagerProtocolProtos.UserInfo getUserInfo( + OzoneManagerProtocolProtos.OMRequest omRequest) throws IOException { + UserGroupInformation user = ProtobufRpcEngine.Server.getRemoteUser(); + InetAddress remoteAddress = ProtobufRpcEngine.Server.getRemoteIp(); + OzoneManagerProtocolProtos.UserInfo.Builder userInfo = + OzoneManagerProtocolProtos.UserInfo.newBuilder(); + + // If S3 Authentication is set, determine user based on access ID. + if (omRequest.hasS3Authentication()) { + String principal = OzoneAclUtils.accessIdToUserPrincipal( + omRequest.getS3Authentication().getAccessId()); + userInfo.setUserName(principal); + } else if (user != null) { + // Added not null checks, as in UT's these values might be null. 
+ userInfo.setUserName(user.getUserName()); + } + + // for gRPC s3g omRequests that contain user name + if (user == null && omRequest.hasUserInfo()) { + userInfo.setUserName(omRequest.getUserInfo().getUserName()); + } + + String grpcContextClientIpAddress = + GrpcClientConstants.CLIENT_IP_ADDRESS_CTX_KEY.get(); + String grpcContextClientHostname = + GrpcClientConstants.CLIENT_HOSTNAME_CTX_KEY.get(); + if (remoteAddress != null) { + userInfo.setHostName(remoteAddress.getHostName()); + userInfo.setRemoteAddress(remoteAddress.getHostAddress()).build(); + } else if (grpcContextClientHostname != null + && grpcContextClientIpAddress != null) { + userInfo.setHostName(grpcContextClientHostname); + userInfo.setRemoteAddress(grpcContextClientIpAddress); + } + + return userInfo.build(); + } + /** + * Return InetAddress created from OMRequest userInfo. If userInfo is not + * set, returns null. + * @return InetAddress + * @throws IOException + */ + public static InetAddress getRemoteAddress(OzoneManagerProtocolProtos.OMRequest omRequest) throws IOException { + if (omRequest.hasUserInfo()) { + return InetAddress.getByName(omRequest.getUserInfo() + .getRemoteAddress()); + } else { + return null; + } + } + /** + * Return String created from OMRequest userInfo. If userInfo is not + * set, returns null. + * @return String + */ + public static String getHostName(OzoneManagerProtocolProtos.OMRequest omRequest) { + if (omRequest.hasUserInfo()) { + return omRequest.getUserInfo().getHostName(); + } else { + return null; + } + } + /** + * Return UGI object created from OMRequest userInfo. If userInfo is not + * set, returns null. + * @return UserGroupInformation. 
+ */ + public static UserGroupInformation createUGI(OzoneManagerProtocolProtos.OMRequest omRequest) + throws AuthenticationException { + if (omRequest.hasUserInfo() && + !StringUtils.isBlank(omRequest.getUserInfo().getUserName())) { + return UserGroupInformation.createRemoteUser( + omRequest.getUserInfo().getUserName()); + } else { + throw new AuthenticationException("User info is not set." + + " Please check client auth credentials"); + } + } + + /** + * Create a UGI from request and wrap the AuthenticationException + * to OMException in case of empty credentials. + * @return UserGroupInformation + * @throws OMException exception about an empty user credential + * (unauthorized request) + */ + public static UserGroupInformation createUGIForApi(OzoneManagerProtocolProtos.OMRequest omRequest) + throws OMException { + UserGroupInformation ugi; + try { + ugi = createUGI(omRequest); + } catch (AuthenticationException e) { + throw new OMException(e, UNAUTHORIZED); + } + return ugi; + } + /* Optimize ugi lookup for RPC operations to avoid a trip through + * UGI.getCurrentUser which is synch'ed. + */ + private static UserGroupInformation getRemoteUser() throws IOException { + UserGroupInformation ugi = Server.getRemoteUser(); + return (ugi != null) ? 
ugi : UserGroupInformation.getCurrentUser(); + } + + public static Optional getFileEncryptionInfo( + OzoneManager ozoneManager, OmBucketInfo bucketInfo) throws IOException { + Optional encInfo = Optional.empty(); + if (ozoneManager.getKmsProvider() == null) { + return encInfo; + } + BucketEncryptionKeyInfo ezInfo = bucketInfo.getEncryptionKeyInfo(); + if (ezInfo != null) { + final String ezKeyName = ezInfo.getKeyName(); + KeyProviderCryptoExtension.EncryptedKeyVersion edek = generateEDEK(ozoneManager, ezKeyName); + encInfo = Optional.of(new FileEncryptionInfo(ezInfo.getSuite(), + ezInfo.getVersion(), + edek.getEncryptedKeyVersion().getMaterial(), + edek.getEncryptedKeyIv(), ezKeyName, + edek.getEncryptionKeyVersionName())); + } + return encInfo; + } + + private static KeyProviderCryptoExtension.EncryptedKeyVersion generateEDEK( + OzoneManager ozoneManager, String ezKeyName) throws IOException { + if (ezKeyName == null) { + return null; + } + long generateEDEKStartTime = monotonicNow(); + KeyProviderCryptoExtension.EncryptedKeyVersion edek = SecurityUtil.doAsLoginUser( + new PrivilegedExceptionAction() { + @Override + public KeyProviderCryptoExtension.EncryptedKeyVersion run() throws IOException { + try { + return ozoneManager.getKmsProvider() + .generateEncryptedKey(ezKeyName); + } catch (GeneralSecurityException e) { + throw new IOException(e); + } + } + }); + long generateEDEKTime = monotonicNow() - generateEDEKStartTime; + LOG.debug("generateEDEK takes {} ms", generateEDEKTime); + Preconditions.checkNotNull(edek); + return edek; + } + + public static FileEncryptionInfo getFileEncryptionInfoForMpuKey( + OzoneManagerProtocolProtos.KeyArgs keyArgs, OzoneManager ozoneManager, BucketLayout layout) throws IOException { + if (ozoneManager.getKmsProvider() != null) { + OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager(); + String dbMultipartOpenKey = omMetadataManager.getMultipartKey(keyArgs.getVolumeName(), + keyArgs.getBucketName(), 
keyArgs.getKeyName(), keyArgs.getMultipartUploadID()); + OmKeyInfo omKeyInfo = omMetadataManager.getOpenKeyTable(layout).get(dbMultipartOpenKey); + if (omKeyInfo != null) { + return omKeyInfo.getFileEncryptionInfo(); + } else { + LOG.warn("omKeyInfo not found. Key: " + dbMultipartOpenKey + + ". The upload id " + keyArgs.getMultipartUploadID() + " may be invalid."); + } + } + return null; + } + /** + * Check bucket quota in bytes. + * @param omBucketInfo + * @param allocateSize + * @throws IOException + */ + public static void checkBucketQuotaInBytes(OmBucketInfo omBucketInfo, long allocateSize) throws IOException { + if (omBucketInfo.getQuotaInBytes() > OzoneConsts.QUOTA_RESET) { + BucketQuotaResource.BucketQuota bucketQuota = BucketQuotaResource.instance().get(omBucketInfo.getObjectID()); + long curUsedBytes = bucketQuota.addUsedBytes(allocateSize); + curUsedBytes += omBucketInfo.getUsedBytes(); + long quotaInBytes = omBucketInfo.getQuotaInBytes(); + if (quotaInBytes < curUsedBytes) { + bucketQuota.addUsedBytes(-allocateSize); + throw new OMException("The DiskSpace quota of bucket:" + omBucketInfo.getBucketName() + + " exceeded quotaInBytes: " + quotaInBytes + " Bytes but diskspace consumed: " + + curUsedBytes + " Bytes.", OMException.ResultCodes.QUOTA_EXCEEDED); + } + bucketQuota.addUsedBytes(-allocateSize); + } + } + /** + * Check and update bucket quota in bytes and in namespace. 
+ * @param omBucketInfo + * @param allocateSize + * @param allocateNamespace + * @throws IOException + */ + public static void checkUpdateBucketQuota(OmBucketInfo omBucketInfo, long allocateSize, long allocateNamespace) + throws IOException { + BucketQuotaResource.BucketQuota bucketQuota = BucketQuotaResource.instance().get(omBucketInfo.getObjectID()); + if (omBucketInfo.getQuotaInBytes() > OzoneConsts.QUOTA_RESET) { + long curUsedBytes = bucketQuota.addUsedBytes(allocateSize); + curUsedBytes += omBucketInfo.getUsedBytes(); + if (omBucketInfo.getQuotaInBytes() < curUsedBytes) { + bucketQuota.addUsedBytes(-allocateSize); + throw new OMException("The DiskSpace quota of bucket:" + omBucketInfo.getBucketName() + + " exceeded quotaInBytes: " + omBucketInfo.getQuotaInBytes() + " Bytes but diskspace consumed: " + + curUsedBytes + " Bytes.", OMException.ResultCodes.QUOTA_EXCEEDED); + } + } + if (omBucketInfo.getQuotaInNamespace() > OzoneConsts.QUOTA_RESET) { + long curUsedNamespace = bucketQuota.addUsedNamespace(allocateNamespace); + curUsedNamespace += omBucketInfo.getUsedNamespace(); + if (omBucketInfo.getQuotaInNamespace() < curUsedNamespace) { + bucketQuota.addUsedBytes(-allocateSize); + bucketQuota.addUsedNamespace(-allocateNamespace); + throw new OMException("The namespace quota of Bucket:" + omBucketInfo.getBucketName() + + " exceeded: quotaInNamespace: " + omBucketInfo.getQuotaInNamespace() + " but namespace consumed: " + + (curUsedNamespace + allocateNamespace) + ".", OMException.ResultCodes.QUOTA_EXCEEDED); + } + } + } + /** + * Wrap the uncommitted blocks as pseudoKeyInfo. 
+ * + * @param uncommitted Uncommitted OmKeyLocationInfo + * @param omKeyInfo Args for key block + * @return pseudoKeyInfo + */ + public static OmKeyInfo wrapUncommittedBlocksAsPseudoKey(List uncommitted, OmKeyInfo omKeyInfo) { + if (uncommitted.isEmpty()) { + return null; + } + LOG.debug("Detect allocated but uncommitted blocks {} in key {}.", uncommitted, omKeyInfo.getKeyName()); + OmKeyInfo pseudoKeyInfo = omKeyInfo.copyObject(); + // This is a special marker to indicate that SnapshotDeletingService + // can reclaim this key's blocks unconditionally. + pseudoKeyInfo.setObjectID(OBJECT_ID_RECLAIM_BLOCKS); + // TODO dataSize of pseudoKey is not real here + List uncommittedGroups = new ArrayList<>(); + // version not matters in the current logic of keyDeletingService, + // all versions of blocks will be deleted. + uncommittedGroups.add(new OmKeyLocationInfoGroup(0, uncommitted)); + pseudoKeyInfo.setKeyLocationVersions(uncommittedGroups); + return pseudoKeyInfo; + } + /** + * Prepare OmKeyInfo which will be persisted to openKeyTable. + * @return OmKeyInfo + * @throws IOException + */ + @SuppressWarnings("parameternumber") + public static OmKeyInfo prepareKeyInfo( + @Nonnull OMMetadataManager omMetadataManager, + @Nonnull OzoneManagerProtocolProtos.KeyArgs keyArgs, OmKeyInfo dbKeyInfo, long size, + @Nonnull List locations, + @Nullable FileEncryptionInfo encInfo, + @Nonnull PrefixManager prefixManager, + @Nullable OmBucketInfo omBucketInfo, + OMFileRequest.OMPathInfo omPathInfo, + long transactionLogIndex, long objectID, boolean isRatisEnabled, + ReplicationConfig replicationConfig) + throws IOException { + + return prepareFileInfo(omMetadataManager, keyArgs, dbKeyInfo, size, + locations, encInfo, prefixManager, omBucketInfo, omPathInfo, + transactionLogIndex, objectID, isRatisEnabled, replicationConfig); + } + + /** + * Prepare OmKeyInfo which will be persisted to openKeyTable. 
+ * @return OmKeyInfo + * @throws IOException + */ + @SuppressWarnings("parameternumber") + private static OmKeyInfo prepareFileInfo( + @Nonnull OMMetadataManager omMetadataManager, + @Nonnull OzoneManagerProtocolProtos.KeyArgs keyArgs, OmKeyInfo dbKeyInfo, long size, + @Nonnull List locations, + @Nullable FileEncryptionInfo encInfo, + @Nonnull PrefixManager prefixManager, + @Nullable OmBucketInfo omBucketInfo, + OMFileRequest.OMPathInfo omPathInfo, + long transactionLogIndex, long objectID, + boolean isRatisEnabled, ReplicationConfig replicationConfig) + throws IOException { + if (keyArgs.getIsMultipartKey()) { + return prepareMultipartFileInfo(omMetadataManager, keyArgs, + size, locations, encInfo, prefixManager, omBucketInfo, + omPathInfo, transactionLogIndex, objectID); + //TODO args.getMetadata + } + if (dbKeyInfo != null) { + // The key already exist, the new blocks will replace old ones + // as new versions unless the bucket does not have versioning + // turned on. + dbKeyInfo.addNewVersion(locations, false, + omBucketInfo.getIsVersionEnabled()); + long newSize = size; + if (omBucketInfo.getIsVersionEnabled()) { + newSize += dbKeyInfo.getDataSize(); + } + dbKeyInfo.setDataSize(newSize); + // The modification time is set in preExecute. Use the same + // modification time. + dbKeyInfo.setModificationTime(keyArgs.getModificationTime()); + dbKeyInfo.setUpdateID(transactionLogIndex, isRatisEnabled); + dbKeyInfo.setReplicationConfig(replicationConfig); + + // Construct a new metadata map from KeyArgs. 
+ dbKeyInfo.getMetadata().clear(); + dbKeyInfo.getMetadata().putAll(KeyValueUtil.getFromProtobuf( + keyArgs.getMetadataList())); + + // Construct a new tags from KeyArgs + // Clear the old one when the key is overwritten + dbKeyInfo.getTags().clear(); + dbKeyInfo.getTags().putAll(KeyValueUtil.getFromProtobuf( + keyArgs.getTagsList())); + + if (keyArgs.hasExpectedDataGeneration()) { + dbKeyInfo.setExpectedDataGeneration(keyArgs.getExpectedDataGeneration()); + } + + dbKeyInfo.setFileEncryptionInfo(encInfo); + return dbKeyInfo; + } + + // the key does not exist, create a new object. + // Blocks will be appended as version 0. + return createFileInfo(keyArgs, locations, replicationConfig, + keyArgs.getDataSize(), encInfo, prefixManager, + omBucketInfo, omPathInfo, transactionLogIndex, objectID); + } + + /** + * Create OmKeyInfo object. + * @return OmKeyInfo + */ + @SuppressWarnings("parameterNumber") + public static OmKeyInfo createFileInfo( + @Nonnull OzoneManagerProtocolProtos.KeyArgs keyArgs, + @Nonnull List locations, + @Nonnull ReplicationConfig replicationConfig, + long size, + @Nullable FileEncryptionInfo encInfo, + @Nonnull PrefixManager prefixManager, + @Nullable OmBucketInfo omBucketInfo, + OMFileRequest.OMPathInfo omPathInfo, + long transactionLogIndex, long objectID) { + OmKeyInfo.Builder builder = new OmKeyInfo.Builder(); + builder.setVolumeName(keyArgs.getVolumeName()) + .setBucketName(keyArgs.getBucketName()) + .setKeyName(keyArgs.getKeyName()) + .setOmKeyLocationInfos(Collections.singletonList(new OmKeyLocationInfoGroup(0, locations))) + .setCreationTime(keyArgs.getModificationTime()) + .setModificationTime(keyArgs.getModificationTime()) + .setDataSize(size) + .setReplicationConfig(replicationConfig) + .setFileEncryptionInfo(encInfo) + .setAcls(getAclsForKey(keyArgs, omBucketInfo, omPathInfo, prefixManager)) + .addAllMetadata(KeyValueUtil.getFromProtobuf(keyArgs.getMetadataList())) + .addAllTags(KeyValueUtil.getFromProtobuf(keyArgs.getTagsList())) + 
.setUpdateID(transactionLogIndex) + .setOwnerName(keyArgs.getOwnerName()) + .setFile(true); + if (omPathInfo instanceof OMFileRequest.OMPathInfoWithFSO) { + // FileTable metadata format + OMFileRequest.OMPathInfoWithFSO omPathInfoFSO = (OMFileRequest.OMPathInfoWithFSO) omPathInfo; + objectID = omPathInfoFSO.getLeafNodeObjectId(); + builder.setParentObjectID(omPathInfoFSO.getLastKnownParentId()); + builder.setFileName(omPathInfoFSO.getLeafNodeName()); + } + builder.setObjectID(objectID); + return builder.build(); + } + + /** + * Prepare OmKeyInfo for multi-part upload part key which will be persisted + * to openKeyTable. + * @return OmKeyInfo + * @throws IOException + */ + @SuppressWarnings("parameternumber") + private static OmKeyInfo prepareMultipartFileInfo( + @Nonnull OMMetadataManager omMetadataManager, + @Nonnull OzoneManagerProtocolProtos.KeyArgs args, long size, + @Nonnull List locations, + FileEncryptionInfo encInfo, @Nonnull PrefixManager prefixManager, + @Nullable OmBucketInfo omBucketInfo, + OMFileRequest.OMPathInfo omPathInfo, + @Nonnull long transactionLogIndex, long objectID) + throws IOException { + + Preconditions.checkArgument(args.getMultipartNumber() > 0, + "PartNumber Should be greater than zero"); + // When key is multipart upload part key, we should take replication + // type and replication factor from original key which has done + // initiate multipart upload. If we have not found any such, we throw + // error no such multipart upload. 
+ String uploadID = args.getMultipartUploadID(); + Preconditions.checkNotNull(uploadID); + String multipartKey = ""; + if (omPathInfo instanceof OMFileRequest.OMPathInfoWithFSO) { + OMFileRequest.OMPathInfoWithFSO omPathInfoFSO + = (OMFileRequest.OMPathInfoWithFSO) omPathInfo; + final long volumeId = omMetadataManager.getVolumeId( + args.getVolumeName()); + final long bucketId = omMetadataManager.getBucketId( + args.getVolumeName(), args.getBucketName()); + // FileTable metadata format + multipartKey = omMetadataManager.getMultipartKey(volumeId, bucketId, + omPathInfoFSO.getLastKnownParentId(), + omPathInfoFSO.getLeafNodeName(), uploadID); + } else { + multipartKey = omMetadataManager + .getMultipartKey(args.getVolumeName(), args.getBucketName(), + args.getKeyName(), uploadID); + } + OmKeyInfo partKeyInfo = + omMetadataManager.getOpenKeyTable(omBucketInfo.getBucketLayout()).get(multipartKey); + if (partKeyInfo == null) { + throw new OMException("No such Multipart upload is with specified " + + "uploadId " + uploadID, + OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR); + } + // For this upload part we don't need to check in KeyTable. As this + // is not an actual key, it is a part of the key. + return createFileInfo(args, locations, partKeyInfo.getReplicationConfig(), + size, encInfo, prefixManager, omBucketInfo, omPathInfo, + transactionLogIndex, objectID); + } + + private static List getAclsForKey(OzoneManagerProtocolProtos.KeyArgs keyArgs, + OmBucketInfo bucketInfo, OMFileRequest.OMPathInfo omPathInfo, + PrefixManager prefixManager) { + + List acls = new ArrayList<>(); + if (keyArgs.getAclsList() != null) { + acls.addAll(OzoneAclUtil.fromProtobuf(keyArgs.getAclsList())); + } + + // Inherit DEFAULT acls from prefix. 
+ if (prefixManager != null) { + List prefixList = prefixManager.getLongestPrefixPath( + OZONE_URI_DELIMITER + + keyArgs.getVolumeName() + OZONE_URI_DELIMITER + + keyArgs.getBucketName() + OZONE_URI_DELIMITER + + keyArgs.getKeyName()); + + if (prefixList.size() > 0) { + // Add all acls from direct parent to key. + OmPrefixInfo prefixInfo = prefixList.get(prefixList.size() - 1); + if (prefixInfo != null) { + if (OzoneAclUtil.inheritDefaultAcls(acls, prefixInfo.getAcls(), ACCESS)) { + return acls; + } + } + } + } + + // Inherit DEFAULT acls from parent-dir only if DEFAULT acls for + // prefix are not set + if (omPathInfo != null) { + if (OzoneAclUtil.inheritDefaultAcls(acls, omPathInfo.getAcls(), ACCESS)) { + return acls; + } + } + + // Inherit DEFAULT acls from bucket only if DEFAULT acls for + // parent-dir are not set. + if (bucketInfo != null) { + if (OzoneAclUtil.inheritDefaultAcls(acls, bucketInfo.getAcls(), ACCESS)) { + return acls; + } + } + + return acls; + } + + /** + * This methods avoids multiple rpc calls to SCM by allocating multiple blocks + * in one rpc call. + * @throws IOException + */ + @SuppressWarnings("parameternumber") + public static List< OmKeyLocationInfo > allocateBlock( + ScmClient scmClient, OzoneBlockTokenSecretManager secretManager, ReplicationConfig replicationConfig, + ExcludeList excludeList, long requestedSize, long scmBlockSize, int preallocateBlocksMax, + boolean grpcBlockTokenEnabled, String serviceID, OMMetrics omMetrics, boolean shouldSortDatanodes, + OzoneManagerProtocolProtos.UserInfo userInfo) + throws IOException { + int dataGroupSize = replicationConfig instanceof ECReplicationConfig + ? 
((ECReplicationConfig) replicationConfig).getData() : 1; + int numBlocks = (int) Math.min(preallocateBlocksMax, + (requestedSize - 1) / (scmBlockSize * dataGroupSize) + 1); + + String clientMachine = ""; + if (shouldSortDatanodes) { + clientMachine = userInfo.getRemoteAddress(); + } + + List locationInfos = new ArrayList<>(numBlocks); + String remoteUser = getRemoteUser().getShortUserName(); + List allocatedBlocks; + try { + allocatedBlocks = scmClient.getBlockClient() + .allocateBlock(scmBlockSize, numBlocks, replicationConfig, serviceID, + excludeList, clientMachine); + } catch (SCMException ex) { + omMetrics.incNumBlockAllocateCallFails(); + if (ex.getResult() + .equals(SCMException.ResultCodes.SAFE_MODE_EXCEPTION)) { + throw new OMException(ex.getMessage(), + OMException.ResultCodes.SCM_IN_SAFE_MODE); + } + throw ex; + } + for (AllocatedBlock allocatedBlock : allocatedBlocks) { + BlockID blockID = new BlockID(allocatedBlock.getBlockID()); + OmKeyLocationInfo.Builder builder = new OmKeyLocationInfo.Builder() + .setBlockID(blockID) + .setLength(scmBlockSize) + .setOffset(0) + .setPipeline(allocatedBlock.getPipeline()); + if (grpcBlockTokenEnabled) { + builder.setToken(secretManager.generateToken(remoteUser, blockID, + EnumSet.of(READ, WRITE), scmBlockSize)); + } + locationInfos.add(builder.build()); + } + return locationInfos; + } + + /** + * Set parameters needed for return error response to client. + * @param omResponse + * @param ex - IOException + * @return error response need to be returned to client - OMResponse. 
+ */ + public static OzoneManagerProtocolProtos.OMResponse createErrorOMResponse( + @Nonnull OzoneManagerProtocolProtos.OMResponse.Builder omResponse, @Nonnull Exception ex) { + + omResponse.setSuccess(false); + String errorMsg = exceptionErrorMessage(ex); + if (errorMsg != null) { + omResponse.setMessage(errorMsg); + } + omResponse.setStatus(OzoneManagerRatisUtils.exceptionToResponseStatus(ex)); + return omResponse.build(); + } + private static String exceptionErrorMessage(Exception ex) { + if (ex instanceof OMException || ex instanceof InvalidPathException) { + return ex.getMessage(); + } else { + return org.apache.hadoop.util.StringUtils.stringifyException(ex); + } + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/request/OmRequestBase.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/request/OmRequestBase.java new file mode 100644 index 00000000000..a1d16c7a787 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/request/OmRequestBase.java @@ -0,0 +1,121 @@ +package org.apache.hadoop.ozone.om.ratis.execution.request; + +import java.io.IOException; +import java.util.LinkedHashMap; +import java.util.Map; +import org.apache.hadoop.hdds.utils.db.cache.CacheKey; +import org.apache.hadoop.hdds.utils.db.cache.CacheValue; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.audit.AuditAction; +import org.apache.hadoop.ozone.audit.AuditEventStatus; +import org.apache.hadoop.ozone.audit.AuditLogger; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.helpers.BucketLayout; +import org.apache.hadoop.ozone.om.helpers.OMAuditLogger; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.ratis.execution.DbChangesRecorder; +import org.apache.hadoop.ozone.om.request.RequestAuditor; 
+import org.apache.hadoop.ozone.om.response.OMClientResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; +import org.apache.ratis.server.protocol.TermIndex; + +/** + * define methods for request to handle. + */ +public class OmRequestBase implements RequestAuditor { + private final OMAuditLogger.Builder auditBuilder = OMAuditLogger.newBuilder(); + private DbChangesRecorder recorder = new DbChangesRecorder(); + private OMRequest omRequest; + private OmBucketInfo bucketInfo; + + public OmRequestBase(OMRequest omRequest, OmBucketInfo bucketInfo) { + this.omRequest = omRequest; + this.bucketInfo = bucketInfo; + } + /** + * Perform request validation, bucket type check, update parameter like user Info, update time and others. + */ + public OzoneManagerProtocolProtos.OMRequest preProcess(OzoneManager ozoneManager) + throws IOException { + OzoneManagerProtocolProtos.LayoutVersion layoutVersion = OzoneManagerProtocolProtos.LayoutVersion.newBuilder() + .setVersion(ozoneManager.getVersionManager().getMetadataLayoutVersion()) + .build(); + omRequest = getOmRequest().toBuilder() + .setUserInfo(OmKeyUtils.getUserIfNotExists(ozoneManager, getOmRequest())) + .setLayoutVersion(layoutVersion).build(); + return omRequest; + } + + public void authorize(OzoneManager ozoneManager) throws IOException { + } + /** + * perform request processing such as prepare changes, resource validation. 
+ */ + public OMClientResponse process(OzoneManager ozoneManager, TermIndex termIndex) throws IOException { + return null; + } + public DbChangesRecorder changeRecorder() { + return recorder; + } + public OmBucketInfo resolveBucket(OzoneManager ozoneManager, String volume, String bucket) throws IOException { + String bucketKey = ozoneManager.getMetadataManager().getBucketKey(volume, bucket); + + CacheValue value = ozoneManager.getMetadataManager().getBucketTable() + .getCacheValue(new CacheKey<>(bucketKey)); + if (value == null || value.getCacheValue() == null) { + throw new OMException("Bucket not found: " + volume + "/" + bucket, OMException.ResultCodes.BUCKET_NOT_FOUND); + } + bucketInfo = value.getCacheValue(); + return bucketInfo; + } + + + protected BucketLayout getBucketLayout() { + return bucketInfo == null ? BucketLayout.DEFAULT : bucketInfo.getBucketLayout(); + } + public OmBucketInfo getBucketInfo() { + return bucketInfo; + } + + public OMRequest getOmRequest() { + return omRequest; + } + + @Override + public OMAuditLogger.Builder buildAuditMessage( + AuditAction op, Map< String, String > auditMap, Throwable throwable, + OzoneManagerProtocolProtos.UserInfo userInfo) { + auditBuilder.getMessageBuilder() + .setUser(userInfo != null ? userInfo.getUserName() : null) + .atIp(userInfo != null ? userInfo.getRemoteAddress() : null) + .forOperation(op) + .withParams(auditMap) + .withResult(throwable != null ? AuditEventStatus.FAILURE : + AuditEventStatus.SUCCESS) + .withException(throwable); + auditBuilder.setAuditMap(auditMap); + return auditBuilder; + } + + @Override + public Map buildVolumeAuditMap(String volume) { + Map auditMap = new LinkedHashMap<>(); + auditMap.put(OzoneConsts.VOLUME, volume); + return auditMap; + } + /** + * Mark ready for log audit. 
+ * @param auditLogger + * @param builder + */ + protected void markForAudit(AuditLogger auditLogger, OMAuditLogger.Builder builder) { + builder.setLog(true); + builder.setAuditLogger(auditLogger); + } + + public OMAuditLogger.Builder getAuditBuilder() { + return auditBuilder; + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java index c3dbed9e672..b46d4eeff1b 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java @@ -39,6 +39,7 @@ import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException; import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException; +import org.apache.hadoop.ozone.om.ratis.execution.request.OmRequestBase; import org.apache.hadoop.ozone.om.request.BucketLayoutAwareOMKeyRequestFactory; import org.apache.hadoop.ozone.om.request.OMPersistDbRequest; import org.apache.hadoop.ozone.om.request.bucket.OMBucketCreateRequest; @@ -352,6 +353,46 @@ public static OMClientRequest createClientRequest(OMRequest omRequest, volumeName, bucketName, omRequest, ozoneManager.getMetadataManager()); } + /** + * Create OmRequestBase which encapsulates the OMRequest. 
+ * @param omRequest + * @return OMClientRequest + * @throws IOException + */ + @SuppressWarnings("checkstyle:methodlength") + public static OmRequestBase createLeaderClientRequest( + OMRequest omRequest, OzoneManager ozoneManager) throws IOException { + + // Handling of exception by createClientRequest(OMRequest, OzoneManger): + // Either the code will take FSO or non FSO path, both classes has a + // validateAndUpdateCache() function which also contains + // validateBucketAndVolume() function which validates bucket and volume and + // throws necessary exceptions if required. validateAndUpdateCache() + // function has catch block which catches the exception if required and + // handles it appropriately. + Type cmdType = omRequest.getCmdType(); + OzoneManagerProtocolProtos.KeyArgs keyArgs; + String volumeName = ""; + String bucketName = ""; + + switch (cmdType) { + case CreateKey: + keyArgs = omRequest.getCreateKeyRequest().getKeyArgs(); + volumeName = keyArgs.getVolumeName(); + bucketName = keyArgs.getBucketName(); + break; + case CommitKey: + keyArgs = omRequest.getCommitKeyRequest().getKeyArgs(); + volumeName = keyArgs.getVolumeName(); + bucketName = keyArgs.getBucketName(); + break; + default: + return null; + } + return BucketLayoutAwareOMKeyRequestFactory.createLeaderRequest( + volumeName, bucketName, omRequest, ozoneManager.getMetadataManager()); + } + private static OMClientRequest getOMAclRequest(OMRequest omRequest, OzoneManager ozoneManager) { Type cmdType = omRequest.getCmdType(); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/BucketLayoutAwareOMKeyRequestFactory.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/BucketLayoutAwareOMKeyRequestFactory.java index 4a5558ed7f1..c2e0b75236c 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/BucketLayoutAwareOMKeyRequestFactory.java +++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/BucketLayoutAwareOMKeyRequestFactory.java @@ -19,8 +19,11 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.OzoneManagerUtils; import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.helpers.BucketLayout; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.ratis.execution.request.OmRequestBase; import org.apache.hadoop.ozone.om.request.file.OMDirectoryCreateRequest; import org.apache.hadoop.ozone.om.request.file.OMDirectoryCreateRequestWithFSO; import org.apache.hadoop.ozone.om.request.file.OMFileCreateRequest; @@ -72,6 +75,8 @@ public final class BucketLayoutAwareOMKeyRequestFactory { static final HashMap> OM_KEY_REQUEST_CLASSES = new HashMap<>(); + static final HashMap> + OM_LEADER_KEY_REQUEST_CLASSES = new HashMap<>(); static { // CreateDirectory @@ -98,6 +103,10 @@ public final class BucketLayoutAwareOMKeyRequestFactory { OMKeyCreateRequest.class, BucketLayout.OBJECT_STORE ); + addLeaderRequestClass(Type.CreateKey, + org.apache.hadoop.ozone.om.ratis.execution.request.OMKeyCreateRequest.class, + BucketLayout.OBJECT_STORE + ); addRequestClass(Type.CreateKey, OMKeyCreateRequestWithFSO.class, BucketLayout.FILE_SYSTEM_OPTIMIZED); @@ -117,6 +126,10 @@ public final class BucketLayoutAwareOMKeyRequestFactory { OMKeyCommitRequest.class, BucketLayout.OBJECT_STORE ); + addLeaderRequestClass(Type.CommitKey, + org.apache.hadoop.ozone.om.ratis.execution.request.OMKeyCommitRequest.class, + BucketLayout.OBJECT_STORE + ); addRequestClass(Type.CommitKey, OMKeyCommitRequestWithFSO.class, BucketLayout.FILE_SYSTEM_OPTIMIZED @@ -259,6 +272,64 @@ public static OMKeyRequest createRequest(String volumeName, String bucketName, } + /** + * Generates a request object for the given request based on the bucket + * layout. 
+ * + * @param volumeName volume name + * @param bucketName bucket name + * @param omRequest OMRequest object + * @param omMetadataManager ozone metadata manager instance + * @return OMKeyRequest object + * @throws IOException if the request type is not supported. + */ + public static OmRequestBase createLeaderRequest(String volumeName, String bucketName, + OMRequest omRequest, + OMMetadataManager omMetadataManager) + + throws IOException { + if (StringUtils.isBlank(volumeName)) { + throw new OMException("Invalid, volume name is empty", + OMException.ResultCodes.INVALID_VOLUME_NAME); + } + + if (StringUtils.isBlank(bucketName)) { + throw new OMException("Invalid, Bucket name is empty", + OMException.ResultCodes.INVALID_BUCKET_NAME); + } + + // Get the bucket layout of the bucket being accessed by this request. + // While doing this we make sure we are resolving the real bucket in case of + // link buckets. + OmBucketInfo bucketInfo = OzoneManagerUtils.getResolvedBucketInfo(omMetadataManager, volumeName, bucketName); + BucketLayout bucketLayout = bucketInfo.getBucketLayout(); + + // Get the CmdType. + Type requestType = omRequest.getCmdType(); + + // If the request class is associated to FSO bucket layout, + // we add a suffix to its key in the map. + String classKey = getKey(requestType, bucketLayout); + + // Check if the key is present in the map. + if (OM_LEADER_KEY_REQUEST_CLASSES.containsKey(classKey)) { + try { + return getRequestInstanceFromMap(omRequest, classKey, bucketInfo); + } catch (NoSuchMethodException | InvocationTargetException | + InstantiationException | IllegalAccessException e) { + String errMsg = "Exception while instantiating OMKeyRequest of type " + + requestType + " for bucket layout " + bucketLayout + + ". 
Please check the OMKeyRequest class constructor."; + LOG.error(errMsg, e); + throw new OMException(errMsg, + OMException.ResultCodes.INTERNAL_ERROR); + } + } + + // We did not find this key in the map, it means this request type is not supported. + return null; + } + /** * Helper method to add a request class to the omKeyReqClasses map. * @@ -277,6 +348,24 @@ static void addRequestClass(Type requestType, requestClass); } + /** + * Helper method to add a request class to the omKeyReqClasses map. + * + * @param requestType type of the request + * @param requestClass Request class to be added. + * @param associatedBucketLayout BucketLayout the request class is associated + * with. + */ + static void addLeaderRequestClass(Type requestType, + Class + requestClass, + BucketLayout associatedBucketLayout) { + // If the request class is associated to FSO bucket layout, + // we add a suffix to its key in the map. + OM_LEADER_KEY_REQUEST_CLASSES.put(getKey(requestType, associatedBucketLayout), + requestClass); + } + /** * Finds the Request class associated with the given OMRequest and bucket * layout, and returns an instance of the same. @@ -309,6 +398,18 @@ static OMKeyRequest getRequestInstanceFromMap(OMRequest omRequest, // Invoke the constructor. return declaredConstructor.newInstance(omRequest, bucketLayout); } + @Nonnull + static OmRequestBase getRequestInstanceFromMap( + OMRequest omRequest, String classKey, OmBucketInfo bucketInfo) + throws NoSuchMethodException, InstantiationException, + IllegalAccessException, InvocationTargetException { + // Get the constructor of the request class. + // The constructor takes OMRequest and BucketLayout as parameters. + Constructor leaderReqCtr = OM_LEADER_KEY_REQUEST_CLASSES.get(classKey) + .getDeclaredConstructor(OMRequest.class, OmBucketInfo.class); + // Invoke the constructor. + return leaderReqCtr.newInstance(omRequest, bucketInfo); +} /** * Generates a key name for a request type and bucket layout. 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java index 43c855ebda9..d0d6097ea3e 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java @@ -215,13 +215,17 @@ private OMResponse internalProcessRequest(OMRequest request) throws } OMRequest requestToSubmit; try { - omClientRequest = createClientRequest(request, ozoneManager); - // TODO: Note: Due to HDDS-6055, createClientRequest() could now - // return null, which triggered the findbugs warning. - // Added the assertion. - assert (omClientRequest != null); - OMClientRequest finalOmClientRequest = omClientRequest; - requestToSubmit = preExecute(finalOmClientRequest); + if (ozoneManager.isLeaderExecutorEnabled()) { + requestToSubmit = request; + } else { + omClientRequest = createClientRequest(request, ozoneManager); + // TODO: Note: Due to HDDS-6055, createClientRequest() could now + // return null, which triggered the findbugs warning. + // Added the assertion. + assert (omClientRequest != null); + OMClientRequest finalOmClientRequest = omClientRequest; + requestToSubmit = preExecute(finalOmClientRequest); + } this.lastRequestToSubmit = requestToSubmit; } catch (IOException ex) { if (omClientRequest != null) { @@ -232,7 +236,7 @@ private OMResponse internalProcessRequest(OMRequest request) throws } OMResponse response = submitRequestToRatis(requestToSubmit); - if (!response.getSuccess()) { + if (!response.getSuccess() && omClientRequest != null) { omClientRequest.handleRequestFailure(ozoneManager); } return response;