
HDDS-11650. ContainerId list to track all containers created in a datanode #4

Merged: 23 commits, Nov 18, 2024

Changes from all commits

Commits
4d2ea78
HDDS-11650. ContainerId list to track all containers created in a dat…
swamirishi Nov 7, 2024
87a4809
HDDS-11650. Fix test cases
swamirishi Nov 7, 2024
5603b56
Merge remote-tracking branch 'apache/master' into HEAD
swamirishi Nov 7, 2024
c048a5e
HDDS-11650. Add comments
swamirishi Nov 7, 2024
f4c538f
HDDS-11650. Fix tests
swamirishi Nov 7, 2024
f22f0d1
HDDS-11650. Remove from rocksdb
swamirishi Nov 7, 2024
c94734a
HDDS-11650. Fix checkstyle
swamirishi Nov 7, 2024
e579d0e
HDDS-11650. Fix Issues
swamirishi Nov 7, 2024
2c376bc
HDDS-11650. Fix checkstyle & rat
swamirishi Nov 7, 2024
4b77481
HDDS-11650. Fix checkstyle & rat
swamirishi Nov 7, 2024
5027029
HDDS-11650. Fix tests failures
swamirishi Nov 8, 2024
c5392d0
HDDS-11650. Fix tests failures
swamirishi Nov 8, 2024
7c4837a
HDDS-11650. Fix MasterVolumeMetaStore cache
swamirishi Nov 8, 2024
1ae494b
HDDS-11650. Fix MasterVolumeMetaStore cache
swamirishi Nov 8, 2024
bdc2e50
HDDS-11650. Fix MasterVolumeMetaStore cache
swamirishi Nov 8, 2024
06ca347
HDDS-11650. Fix acceptance tests
swamirishi Nov 8, 2024
8f98ab9
HDDS-11650. Fix acceptance tests
swamirishi Nov 9, 2024
7d7f078
HDDS-11650. Add an integration test to test dn restarts with missing …
swamirishi Nov 9, 2024
108bf82
HDDS-11650. Address review comments
swamirishi Nov 15, 2024
5b3d27a
Merge remote-tracking branch 'apache/master' into HEAD
swamirishi Nov 17, 2024
af0f757
HDDS-11650. Address review comments
swamirishi Nov 17, 2024
b97d874
HDDS-11650. Reduce number of files changed
swamirishi Nov 17, 2024
09b2dfe
HDDS-11650. Fix checkstyle
swamirishi Nov 17, 2024
Proto2EnumCodec.java (new file, package org.apache.hadoop.hdds.utils.db)
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.utils.db;

import org.apache.ratis.thirdparty.com.google.protobuf.ProtocolMessageEnum;
import jakarta.annotation.Nonnull;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* Codecs to serialize/deserialize Protobuf v2 enums.
*/
public final class Proto2EnumCodec<M extends ProtocolMessageEnum>
implements Codec<M> {
private static final ConcurrentMap<Class<? extends ProtocolMessageEnum>,
Codec<? extends ProtocolMessageEnum>> CODECS
= new ConcurrentHashMap<>();
private static final IntegerCodec INTEGER_CODEC = IntegerCodec.get();

/**
* @return the {@link Codec} for the given class.
*/
public static <T extends ProtocolMessageEnum> Codec<T> get(T t) {
final Codec<?> codec = CODECS.computeIfAbsent(t.getClass(),
key -> new Proto2EnumCodec<>(t));
return (Codec<T>) codec;
}

private final Class<M> clazz;

private Proto2EnumCodec(M m) {
this.clazz = (Class<M>) m.getClass();
}

@Override
public Class<M> getTypeClass() {
return clazz;
}

@Override
public boolean supportCodecBuffer() {
return INTEGER_CODEC.supportCodecBuffer();
}

@Override
public CodecBuffer toCodecBuffer(@Nonnull M value,
CodecBuffer.Allocator allocator) throws IOException {
return INTEGER_CODEC.toCodecBuffer(value.getNumber(), allocator);
}

private M parseFrom(Integer value) throws IOException {
try {
return (M) this.clazz.getDeclaredMethod("forNumber", int.class).invoke(null, value);
} catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
throw new IOException(e);
}
}

@Override
public M fromCodecBuffer(@Nonnull CodecBuffer buffer)
throws IOException {
return parseFrom(INTEGER_CODEC.fromCodecBuffer(buffer));
}

@Override
public byte[] toPersistedFormat(M value) {
return INTEGER_CODEC.toPersistedFormat(value.getNumber());
}

@Override
public M fromPersistedFormat(byte[] bytes) throws IOException {
return parseFrom(INTEGER_CODEC.fromPersistedFormat(bytes));
}

@Override
public M copyObject(M message) {
// proto messages are immutable
return message;
}
}
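
As a quick illustration of the new codec, here is a minimal round-trip sketch. The wrapper class and main method are hypothetical, not part of this PR; State is ContainerProtos.ContainerDataProto.State, the enum this codec is paired with later in the PR.

import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
import org.apache.hadoop.hdds.utils.db.Codec;
import org.apache.hadoop.hdds.utils.db.Proto2EnumCodec;

import java.io.IOException;

public final class Proto2EnumCodecExample {
  public static void main(String[] args) throws IOException {
    // One codec instance per enum class, cached inside Proto2EnumCodec.
    Codec<State> codec = Proto2EnumCodec.get(State.OPEN);
    // Encoding stores the enum as its proto field number via IntegerCodec.
    byte[] bytes = codec.toPersistedFormat(State.CLOSED);
    // Decoding reflectively invokes State.forNumber(int).
    State decoded = codec.fromPersistedFormat(bytes);
    System.out.println(decoded); // CLOSED
  }
}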
OzoneConsts.java
@@ -122,6 +122,7 @@ public final class OzoneConsts {
public static final String OM_DB_BACKUP_PREFIX = "om.db.backup.";
public static final String SCM_DB_BACKUP_PREFIX = "scm.db.backup.";
public static final String CONTAINER_DB_NAME = "container.db";
public static final String CONTAINER_META_DB_NAME = "container_meta.db";

public static final String STORAGE_DIR_CHUNKS = "chunks";
public static final String OZONE_DB_CHECKPOINT_REQUEST_FLUSH =
ContainerSet.java
@@ -23,8 +23,12 @@
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Message;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;

import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.utils.db.DBTestUtils;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.utils.ContainerLogger;
@@ -65,10 +69,24 @@ public class ContainerSet implements Iterable<Container<?>> {
new ConcurrentSkipListMap<>();
private Clock clock;
private long recoveringTimeout;
private final Table<Long, State> containerIdsTable;

@VisibleForTesting
public ContainerSet(long recoveringTimeout) {
this(DBTestUtils.getInMemoryTableForTest(), recoveringTimeout);
}

public ContainerSet(Table<Long, State> containerIdsTable, long recoveringTimeout) {
this(containerIdsTable, recoveringTimeout, false);
}

public ContainerSet(Table<Long, State> containerIdsTable, long recoveringTimeout, boolean readOnly) {
this.clock = Clock.system(ZoneOffset.UTC);
this.containerIdsTable = containerIdsTable;
this.recoveringTimeout = recoveringTimeout;
if (!readOnly && containerIdsTable == null) {
throw new IllegalArgumentException("Container table cannot be null when container set is not read only");
}
}

public long getCurrentTime() {
@@ -85,22 +103,46 @@ public void setRecoveringTimeout(long recoveringTimeout) {
this.recoveringTimeout = recoveringTimeout;
}

public boolean addContainer(Container<?> container) throws StorageContainerException {
return addContainer(container, false);
}

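/**
* Throws StorageContainerException (CONTAINER_MISSING) if the given container ID
* is currently in the missing-container set.
*/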
public void validateContainerIsMissing(long containerId, State state) throws StorageContainerException {
if (missingContainerSet.contains(containerId)) {
throw new StorageContainerException(String.format("Container with container Id %d with state : %s is missing in" +
" the DN.", containerId, state),
ContainerProtos.Result.CONTAINER_MISSING);
}
}

/**
* Add Container to container map.
* @param container container to be added
* @param overwriteMissingContainers if true, skip the missing-container check and re-add the container
* @return true if the container was added to containerMap, otherwise false
*/
- public boolean addContainer(Container<?> container) throws
+ public boolean addContainer(Container<?> container, boolean overwriteMissingContainers) throws
StorageContainerException {
Preconditions.checkNotNull(container, "container cannot be null");

long containerId = container.getContainerData().getContainerID();
State containerState = container.getContainerData().getState();
if (!overwriteMissingContainers) {
validateContainerIsMissing(containerId, containerState);
}
if (containerMap.putIfAbsent(containerId, container) == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Container with container Id {} is added to containerMap",
containerId);
}
try {
if (containerIdsTable != null) {
containerIdsTable.put(containerId, containerState);
}
} catch (IOException e) {
throw new StorageContainerException(e, ContainerProtos.Result.IO_EXCEPTION);
}
missingContainerSet.remove(containerId);
// wish we could have done this from ContainerData.setState
container.getContainerData().commitSpace();
if (container.getContainerData().getState() == RECOVERING) {
@@ -122,21 +164,40 @@ public boolean addContainer(Container<?> container) throws
* @return Container
*/
public Container<?> getContainer(long containerId) {
- Preconditions.checkState(containerId >= 0,
-     "Container Id cannot be negative.");
+ Preconditions.checkState(containerId >= 0, "Container Id cannot be negative.");
return containerMap.get(containerId);
}

public boolean removeContainer(long containerId) throws StorageContainerException {
return removeContainer(containerId, false, true);
}

/**
* Removes the Container matching the specified containerId.
* @param containerId ID of the container to remove
* @param markMissing if true, add the container to the missing set before removing it from the map
* @param removeFromDB if true, also delete the container ID from the container IDs table
* @return true if the container was removed from containerMap, otherwise false
*/
- public boolean removeContainer(long containerId) {
+ public boolean removeContainer(long containerId, boolean markMissing, boolean removeFromDB)
+     throws StorageContainerException {
Preconditions.checkState(containerId >= 0,
"Container Id cannot be negative.");
// We need to add the container to the missing set before removing it from containerMap: otherwise a
// concurrent write chunk operation could recreate the container on another volume in the window between
// the map removal and the missing-set update.
if (markMissing) {
missingContainerSet.add(containerId);
}
Container<?> removed = containerMap.remove(containerId);
if (removeFromDB) {
try {
if (containerIdsTable != null) {
containerIdsTable.delete(containerId);
}
} catch (IOException e) {
throw new StorageContainerException(e, ContainerProtos.Result.IO_EXCEPTION);
}
}
if (removed == null) {
LOG.debug("Container with containerId {} is not present in " +
"containerMap", containerId);
@@ -190,20 +251,20 @@ public int containerCount() {
*
* @param context StateContext
*/
- public void handleVolumeFailures(StateContext context) {
+ public void handleVolumeFailures(StateContext context) throws StorageContainerException {
AtomicBoolean failedVolume = new AtomicBoolean(false);
AtomicInteger containerCount = new AtomicInteger(0);
- containerMap.values().forEach(c -> {
+ for (Container<?> c : containerMap.values()) {
ContainerData data = c.getContainerData();
if (data.getVolume().isFailed()) {
- removeContainer(data.getContainerID());
+ removeContainer(data.getContainerID(), true, false);
LOG.debug("Removing Container {} as the Volume {} " +
"has failed", data.getContainerID(), data.getVolume());
"has failed", data.getContainerID(), data.getVolume());
failedVolume.set(true);
containerCount.incrementAndGet();
ContainerLogger.logLost(data, "Volume failure");
}
- });
+ }

if (failedVolume.get()) {
try {
@@ -362,6 +423,10 @@ public Set<Long> getMissingContainerSet() {
return missingContainerSet;
}

public Table<Long, State> getContainerIdsTable() {
return containerIdsTable;
}

/**
* Builds the missing container set by taking a diff between total no
* containers actually found and number of containers which actually
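
To make the new lifecycle concrete, here is a hypothetical usage sketch of the table-backed ContainerSet. The helper class, the 60-second timeout, and the demo flow are illustrative only; the constructor and the addContainer/removeContainer signatures are the ones added in this PR, and DBTestUtils.getInMemoryTableForTest() is the test-only stand-in the PR itself uses for the persistent table.

import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.utils.db.DBTestUtils;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.Container;

public final class ContainerSetUsageSketch {
  static void demo(Container<?> container) throws StorageContainerException {
    // In tests, an in-memory table stands in for the persistent
    // container ID table (container_meta.db in production).
    Table<Long, State> idsTable = DBTestUtils.getInMemoryTableForTest();
    ContainerSet containerSet = new ContainerSet(idsTable, 60_000L);

    long id = container.getContainerData().getContainerID();
    // addContainer also persists (containerId -> state) into the table.
    containerSet.addContainer(container);

    // Volume failure path: mark missing and drop from the in-memory map,
    // but keep the table row so a restart can detect the lost container.
    containerSet.removeContainer(id, true, false);

    // Full delete path: remove the table row as well.
    containerSet.removeContainer(id, false, true);
  }
}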
HddsDispatcher.java
@@ -175,7 +175,8 @@ private boolean canIgnoreException(Result result) {
case CONTAINER_UNHEALTHY:
case CLOSED_CONTAINER_IO:
case DELETE_ON_OPEN_CONTAINER:
- case UNSUPPORTED_REQUEST: // Blame client for sending unsupported request.
+ case UNSUPPORTED_REQUEST:
+ case CONTAINER_MISSING: // Blame client for sending unsupported request.
return true;
default:
return false;
@@ -276,7 +277,8 @@ private ContainerCommandResponseProto dispatchRequest(
getMissingContainerSet().remove(containerID);
}
}
- if (getMissingContainerSet().contains(containerID)) {
+ if (cmdType != Type.CreateContainer && !HddsUtils.isReadOnly(msg)
+     && getMissingContainerSet().contains(containerID)) {
StorageContainerException sce = new StorageContainerException(
"ContainerID " + containerID
+ " has been lost and cannot be recreated on this DataNode",
BaseDBHandle.java (new file, package org.apache.hadoop.ozone.container.common.interfaces)
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.interfaces;

import org.apache.hadoop.ozone.container.metadata.AbstractStore;

import java.io.Closeable;

/**
* DB handle abstract class.
*/
public abstract class BaseDBHandle<STORE extends AbstractStore> implements Closeable {

private final STORE store;
private final String containerDBPath;

public BaseDBHandle(STORE store, String containerDBPath) {
this.store = store;
this.containerDBPath = containerDBPath;
}

public STORE getStore() {
return this.store;
}

public String getContainerDBPath() {
return this.containerDBPath;
}

public boolean cleanup() {
return true;
}

@Override
public String toString() {
return "DBHandle{" +
"containerDBPath='" + containerDBPath + '\'' +
", store=" + store +
'}';
}
}
DBHandle.java
@@ -19,30 +19,11 @@

import org.apache.hadoop.ozone.container.metadata.DatanodeStore;

- import java.io.Closeable;

/**
- * DB handle abstract class.
+ * DB handle abstract class for datanode store.
*/
- public abstract class DBHandle implements Closeable {
-
-   private final DatanodeStore store;
-   private final String containerDBPath;
-
+ public abstract class DBHandle extends BaseDBHandle<DatanodeStore> {
public DBHandle(DatanodeStore store, String containerDBPath) {
-   this.store = store;
-   this.containerDBPath = containerDBPath;
- }
-
- public DatanodeStore getStore() {
-   return this.store;
- }
-
- public String getContainerDBPath() {
-   return this.containerDBPath;
- }
-
- public boolean cleanup() {
-   return true;
+   super(store, containerDBPath);
}
}