Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDDS-11418. leader execution flow #7344

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ private static Codec<OmKeyInfo> newCodec(boolean ignorePipeline) {
}

/**
 * Returns the shared {@link OmKeyInfo} codec matching {@code ignorePipeline}.
 *
 * @param ignorePipeline selects which cached codec variant to return
 *                       (presumably the one that skips pipeline serialization
 *                       — TODO confirm against newCodec())
 * @return the cached codec instance; never null
 */
public static Codec<OmKeyInfo> getCodec(boolean ignorePipeline) {
  // Commented-out per-call debug logging removed: this method is on a hot
  // path and dead code should not linger in the source.
  return ignorePipeline ? CODEC_TRUE : CODEC_FALSE;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.metrics2.lib.MutableRate;

/**
* This class is for maintaining Ozone Manager statistics.
Expand Down Expand Up @@ -235,7 +236,16 @@ public class OMMetrics implements OmMetadataReaderMetrics {
private @Metric MutableCounterLong ecKeyCreateFailsTotal;
private @Metric MutableCounterLong ecBucketCreateTotal;
private @Metric MutableCounterLong ecBucketCreateFailsTotal;

// MutableRate metric sampling the DB get-key lookup done while committing a key.
@Metric(about = "request commit request get key")
private MutableRate keyCommitGetKeyRate;
// Accessor used by request handlers to record samples for the key lookup.
public MutableRate getKeyCommitGetKeyRate() {
return keyCommitGetKeyRate;
}
// MutableRate metric sampling the open-key table lookup done while committing a key.
@Metric(about = "request commit request get open key")
private MutableRate keyCommitGetOpenKeyRate;
// Accessor used by request handlers to record samples for the open-key lookup.
public MutableRate getKeyCommitGetOpenKeyRate() {
return keyCommitGetOpenKeyRate;
}
private final DBCheckpointMetrics dbCheckpointMetrics;

public OMMetrics() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
private final BucketUtilizationMetrics bucketUtilizationMetrics;

private boolean fsSnapshotEnabled;
private final boolean isLeaderExecutorFlag;

/**
* OM Startup mode.
Expand Down Expand Up @@ -582,6 +583,8 @@ private OzoneManager(OzoneConfiguration conf, StartupOption startupOption)
this.isStrictS3 = conf.getBoolean(
OZONE_OM_NAMESPACE_STRICT_S3,
OZONE_OM_NAMESPACE_STRICT_S3_DEFAULT);
this.isLeaderExecutorFlag = configuration.getBoolean(OMConfigKeys.OZONE_OM_LEADER_EXECUTOR_ENABLE,
OMConfigKeys.OZONE_OM_LEADER_EXECUTOR_ENABLE_DEFAULT);

// TODO: This is a temporary check. Once fully implemented, all OM state
// change should go through Ratis - be it standalone (for non-HA) or
Expand Down Expand Up @@ -5053,7 +5056,6 @@ public void checkFeatureEnabled(OzoneManagerVersion feature) throws OMException
}

/**
 * Whether the leader-side execution flow is enabled.
 *
 * <p>The value of {@code OMConfigKeys.OZONE_OM_LEADER_EXECUTOR_ENABLE} is
 * read once in the constructor and cached in {@code isLeaderExecutorFlag},
 * so this accessor avoids re-parsing the configuration on every call.
 *
 * @return true if leader executor mode is enabled
 */
public boolean isLeaderExecutorEnabled() {
  return isLeaderExecutorFlag;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.hadoop.ozone.audit.AuditMessage;
import org.apache.hadoop.ozone.audit.OMAction;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.ratis.execution.request.OmRequestBase;
import org.apache.hadoop.ozone.om.request.OMClientRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.ratis.server.protocol.TermIndex;
Expand Down Expand Up @@ -159,6 +160,34 @@ public static void log(OMAuditLogger.Builder builder, OMClientRequest request, O
}
}

/**
 * Writes an audit log entry for a leader-execution request ({@link OmRequestBase}).
 *
 * <p>If the builder was already fully prepared (its {@code isLog} flag is set),
 * the pre-built message is emitted directly. Otherwise the entry is assembled
 * here from the request's command type, the Ratis term index, and the request's
 * own audit-message builder. Requests whose command type maps to no
 * {@link OMAction} are silently skipped.
 *
 * @param builder   audit builder, possibly pre-populated by the request
 * @param request   the executed request supplying command type and user info
 * @param om        OzoneManager used to obtain the audit logger
 * @param termIndex Ratis term/index recorded as the "Transaction" field
 * @param th        failure cause, or null on success
 */
public static void log(OMAuditLogger.Builder builder, OmRequestBase request, OzoneManager om,
TermIndex termIndex, Throwable th) {
// Fast path: the request already built its audit message; just emit it.
if (builder.isLog.get()) {
builder.getAuditLogger().logWrite(builder.getMessageBuilder().build());
return;
}

OMAction action = getAction(request.getOmRequest());
if (null == action) {
// no audit log defined
return;
}
// Lazily create the audit map so the fast path above pays no cost.
if (builder.getAuditMap() == null) {
builder.setAuditMap(new HashMap<>());
}
try {
builder.getAuditMap().put("Command", request.getOmRequest().getCmdType().name());
builder.getAuditMap().put("Transaction", "" + termIndex.getIndex());
// Let the request populate the message (params, user, outcome from th).
request.buildAuditMessage(action, builder.getAuditMap(),
th, request.getOmRequest().getUserInfo());
builder.setLog(true);
builder.setAuditLogger(om.getAuditLogger());
log(builder);
} catch (Exception ex) {
// Audit failures must never fail the request itself; record and continue.
LOG.error("Exception occurred while write audit log, ", ex);
}
}

public static Builder newBuilder() {
return new Builder();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,18 @@
*/
public class KeyLocking {
private static final Logger LOG = LoggerFactory.getLogger(KeyLocking.class);
private static final int DEFAULT_FILE_LOCK_STRIPED_SIZE = 10240;
private static final int DEFAULT_FILE_LOCK_STRIPED_SIZE = 102400;
private static final long LOCK_TIMEOUT = 10 * 60 * 1000;
private Striped<ReadWriteLock> fileStripedLock = Striped.readWriteLock(DEFAULT_FILE_LOCK_STRIPED_SIZE);
private final Striped<ReadWriteLock> fileStripedLock;
private AtomicLong writeLockCount = new AtomicLong();
private AtomicLong readLockCount = new AtomicLong();
private AtomicLong failedLockCount = new AtomicLong();
private AtomicLong failedUnlockCount = new AtomicLong();

/**
 * Creates a key-lock manager backed by a striped read-write lock.
 *
 * @param stripLockSize number of stripes for the Guava {@code Striped} lock;
 *                      larger values lower stripe collisions between distinct
 *                      keys at the cost of memory
 */
public KeyLocking(int stripLockSize) {
fileStripedLock = Striped.readWriteLock(stripLockSize);
}

public void lock(List<String> keyList) throws IOException {
for (String key : keyList) {
lock(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,10 @@ public OmLockOpr(LockType type, String volume, String bucket, List<String> keyNa
Collections.sort(this.keyNameList);
}
public static void init(String threadNamePrefix) {
keyLocking = new KeyLocking();
volumeLocking = new KeyLocking();
snapshotLocking = new KeyLocking();
bucketLocking = new KeyLocking();
keyLocking = new KeyLocking(102400);
volumeLocking = new KeyLocking(1024);
snapshotLocking = new KeyLocking(1024);
bucketLocking = new KeyLocking(1024);
prefixLocking = new FSOPrefixLocking(threadNamePrefix);
lockedObjMap = new ConcurrentHashMap<>();
// init scheduler to check and monitor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,16 @@ public static class BucketQuota {
private AtomicLong incUsedBytes = new AtomicLong();
private AtomicLong incUsedNamespace = new AtomicLong();

public void addUsedBytes(long bytes) {
incUsedBytes.addAndGet(bytes);
public long addUsedBytes(long bytes) {
return incUsedBytes.addAndGet(bytes);
}

/** @return the used-bytes delta accumulated so far for this bucket. */
public long getUsedBytes() {
return incUsedBytes.get();
}

public void addUsedNamespace(long bytes) {
incUsedNamespace.addAndGet(bytes);
public long addUsedNamespace(long bytes) {
return incUsedNamespace.addAndGet(bytes);
}

public long getUsedNamespace() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package org.apache.hadoop.ozone.om.ratis.execution;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.utils.db.CodecBuffer;

/**
 * Records db changes staged by a request before they are applied:
 * per-table record mutations (serialized as {@link CodecBuffer}s) and
 * per-bucket used-quota deltas (used bytes, used namespace).
 *
 * <p>NOTE(review): not synchronized — assumes single-threaded use by one
 * request executor; confirm with callers.
 */
public class DbChangesRecorder {
  // table name -> (db key -> serialized record). Values may be null
  // (clear() tolerates nulls — presumably a delete marker; TODO confirm).
  private final Map<String, Map<String, CodecBuffer>> tableRecordsMap = new HashMap<>();
  // bucket name -> (used-bytes delta, used-namespace delta)
  private final Map<String, Pair<Long, Long>> bucketUsedQuotaMap = new HashMap<>();

  /**
   * Stages a record change for a table.
   *
   * <p>Takes ownership of {@code omKeyCodecBuffer}; if an entry for the same
   * key was already staged, the replaced buffer is released immediately so it
   * is not leaked (clear() only releases buffers still present in the map).
   *
   * @param name             table name
   * @param dbOpenKeyName    db key of the record
   * @param omKeyCodecBuffer serialized record, may be null
   */
  public void add(String name, String dbOpenKeyName, CodecBuffer omKeyCodecBuffer) {
    Map<String, CodecBuffer> recordMap = tableRecordsMap.computeIfAbsent(name, k -> new HashMap<>());
    CodecBuffer previous = recordMap.put(dbOpenKeyName, omKeyCodecBuffer);
    if (previous != null && previous != omKeyCodecBuffer) {
      // Fix: the overwritten buffer would otherwise never be released.
      previous.release();
    }
  }

  /**
   * Accumulates used-quota deltas for a bucket.
   *
   * @param bucketName   bucket whose quota deltas are updated
   * @param incUsedBytes used-bytes delta to add
   * @param incNamespace used-namespace delta to add
   */
  public void add(String bucketName, long incUsedBytes, long incNamespace) {
    bucketUsedQuotaMap.merge(bucketName, Pair.of(incUsedBytes, incNamespace),
        (prev, inc) -> Pair.of(prev.getLeft() + inc.getLeft(), prev.getRight() + inc.getRight()));
  }

  /** @return staged record changes, keyed by table name then db key. */
  public Map<String, Map<String, CodecBuffer>> getTableRecordsMap() {
    return tableRecordsMap;
  }

  /** @return staged per-bucket (used-bytes, used-namespace) deltas. */
  public Map<String, Pair<Long, Long>> getBucketUsedQuotaMap() {
    return bucketUsedQuotaMap;
  }

  /**
   * Releases every staged {@link CodecBuffer} and resets both maps,
   * returning the recorder to its initial empty state.
   */
  public void clear() {
    for (Map<String, CodecBuffer> records : tableRecordsMap.values()) {
      for (CodecBuffer buffer : records.values()) {
        if (buffer != null) {
          buffer.release();
        }
      }
    }
    tableRecordsMap.clear();
    bucketUsedQuotaMap.clear();
  }
}
Loading
Loading