Skip to content

Commit

Permalink
HDDS-11650. ContainerId list to track all containers created in a dat…
Browse files Browse the repository at this point in the history
…anode (#7402)
  • Loading branch information
swamirishi authored Nov 20, 2024
1 parent a8db9cd commit fc6a2ea
Show file tree
Hide file tree
Showing 43 changed files with 1,000 additions and 255 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ public final class OzoneConsts {
public static final String OM_DB_BACKUP_PREFIX = "om.db.backup.";
public static final String SCM_DB_BACKUP_PREFIX = "scm.db.backup.";
public static final String CONTAINER_DB_NAME = "container.db";
public static final String WITNESSED_CONTAINER_DB_NAME = "witnessed_container.db";

public static final String STORAGE_DIR_CHUNKS = "chunks";
public static final String OZONE_DB_CHECKPOINT_REQUEST_FLUSH =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,18 +293,31 @@ public static Builder newReadChunkRequestBuilder(Pipeline pipeline,
*/
public static ContainerCommandRequestProto getCreateContainerRequest(
long containerID, Pipeline pipeline) throws IOException {
return getCreateContainerRequest(containerID, pipeline, ContainerProtos.ContainerDataProto.State.OPEN);
}


/**
* Returns a create container command for test purposes. There are a bunch of
* tests where we need to just send a request and get a reply.
*
* @return ContainerCommandRequestProto.
*/
public static ContainerCommandRequestProto getCreateContainerRequest(
long containerID, Pipeline pipeline, ContainerProtos.ContainerDataProto.State state) throws IOException {
LOG.trace("addContainer: {}", containerID);
return getContainerCommandRequestBuilder(containerID, pipeline).build();
return getContainerCommandRequestBuilder(containerID, pipeline, state)
.build();
}

private static Builder getContainerCommandRequestBuilder(long containerID,
Pipeline pipeline) throws IOException {
Pipeline pipeline, ContainerProtos.ContainerDataProto.State state) throws IOException {
Builder request =
ContainerCommandRequestProto.newBuilder();
request.setCmdType(ContainerProtos.Type.CreateContainer);
request.setContainerID(containerID);
request.setCreateContainer(
ContainerProtos.CreateContainerRequestProto.getDefaultInstance());
ContainerProtos.CreateContainerRequestProto.getDefaultInstance().toBuilder().setState(state).build());
request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());

return request;
Expand All @@ -320,7 +333,8 @@ public static ContainerCommandRequestProto getCreateContainerSecureRequest(
long containerID, Pipeline pipeline, Token<?> token) throws IOException {
LOG.trace("addContainer: {}", containerID);

Builder request = getContainerCommandRequestBuilder(containerID, pipeline);
Builder request = getContainerCommandRequestBuilder(containerID, pipeline,
ContainerProtos.ContainerDataProto.State.OPEN);
if (token != null) {
request.setEncodedToken(token.encodeToUrlString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Message;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;

import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.utils.db.InMemoryTestTable;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.utils.ContainerLogger;
Expand Down Expand Up @@ -65,10 +69,24 @@ public class ContainerSet implements Iterable<Container<?>> {
new ConcurrentSkipListMap<>();
private Clock clock;
private long recoveringTimeout;
private final Table<Long, String> containerIdsTable;

@VisibleForTesting
public ContainerSet(long recoveringTimeout) {
this(new InMemoryTestTable<>(), recoveringTimeout);
}

public ContainerSet(Table<Long, String> continerIdsTable, long recoveringTimeout) {
this(continerIdsTable, recoveringTimeout, false);
}

public ContainerSet(Table<Long, String> continerIdsTable, long recoveringTimeout, boolean readOnly) {
this.clock = Clock.system(ZoneOffset.UTC);
this.containerIdsTable = continerIdsTable;
this.recoveringTimeout = recoveringTimeout;
if (!readOnly && containerIdsTable == null) {
throw new IllegalArgumentException("Container table cannot be null when container set is not read only");
}
}

public long getCurrentTime() {
Expand All @@ -85,22 +103,64 @@ public void setRecoveringTimeout(long recoveringTimeout) {
this.recoveringTimeout = recoveringTimeout;
}

/**
* Add Container to container map. This would fail if the container is already present or has been marked as missing.
* @param container container to be added
* @return If container is added to containerMap returns true, otherwise
* false
*/
public boolean addContainer(Container<?> container) throws StorageContainerException {
return addContainer(container, false);
}

/**
* Add Container to container map. This would overwrite the container even if it is missing. But would fail if the
* container is already present.
* @param container container to be added
* @return If container is added to containerMap returns true, otherwise
* false
*/
public boolean addContainerByOverwriteMissingContainer(Container<?> container) throws StorageContainerException {
return addContainer(container, true);
}

public void ensureContainerNotMissing(long containerId, State state) throws StorageContainerException {
if (missingContainerSet.contains(containerId)) {
throw new StorageContainerException(String.format("Container with container Id %d with state : %s is missing in" +
" the DN.", containerId, state),
ContainerProtos.Result.CONTAINER_MISSING);
}
}

/**
* Add Container to container map.
* @param container container to be added
* @param overwrite if true should overwrite the container if the container was missing.
* @return If container is added to containerMap returns true, otherwise
* false
*/
public boolean addContainer(Container<?> container) throws
private boolean addContainer(Container<?> container, boolean overwrite) throws
StorageContainerException {
Preconditions.checkNotNull(container, "container cannot be null");

long containerId = container.getContainerData().getContainerID();
State containerState = container.getContainerData().getState();
if (!overwrite) {
ensureContainerNotMissing(containerId, containerState);
}
if (containerMap.putIfAbsent(containerId, container) == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Container with container Id {} is added to containerMap",
containerId);
}
try {
if (containerIdsTable != null) {
containerIdsTable.put(containerId, containerState.toString());
}
} catch (IOException e) {
throw new StorageContainerException(e, ContainerProtos.Result.IO_EXCEPTION);
}
missingContainerSet.remove(containerId);
// wish we could have done this from ContainerData.setState
container.getContainerData().commitSpace();
if (container.getContainerData().getState() == RECOVERING) {
Expand All @@ -122,21 +182,69 @@ public boolean addContainer(Container<?> container) throws
* @return Container
*/
public Container<?> getContainer(long containerId) {
Preconditions.checkState(containerId >= 0,
"Container Id cannot be negative.");
Preconditions.checkState(containerId >= 0, "Container Id cannot be negative.");
return containerMap.get(containerId);
}

/**
* Removes container from both memory and database. This should be used when the containerData on disk has been
* removed completely from the node.
* @param containerId
* @return True if container is removed from containerMap.
* @throws StorageContainerException
*/
public boolean removeContainer(long containerId) throws StorageContainerException {
return removeContainer(containerId, false, true);
}

/**
* Removes containerId from memory. This needs to be used when the container is still present on disk, and the
* inmemory state of the container needs to be updated.
* @param containerId
* @return True if container is removed from containerMap.
* @throws StorageContainerException
*/
public boolean removeContainerOnlyFromMemory(long containerId) throws StorageContainerException {
return removeContainer(containerId, false, false);
}

/**
* Marks a container to be missing, thus it removes the container from inmemory containerMap and marks the
* container as missing.
* @param containerId
* @return True if container is removed from containerMap.
* @throws StorageContainerException
*/
public boolean removeMissingContainer(long containerId) throws StorageContainerException {
return removeContainer(containerId, true, false);
}

/**
* Removes the Container matching with specified containerId.
* @param containerId ID of the container to remove
* @return If container is removed from containerMap returns true, otherwise
* false
*/
public boolean removeContainer(long containerId) {
private boolean removeContainer(long containerId, boolean markMissing, boolean removeFromDB)
throws StorageContainerException {
Preconditions.checkState(containerId >= 0,
"Container Id cannot be negative.");
//We need to add to missing container set before removing containerMap since there could be write chunk operation
// that could recreate the container in another volume if we remove it from the map before adding to missing
// container.
if (markMissing) {
missingContainerSet.add(containerId);
}
Container<?> removed = containerMap.remove(containerId);
if (removeFromDB) {
try {
if (containerIdsTable != null) {
containerIdsTable.delete(containerId);
}
} catch (IOException e) {
throw new StorageContainerException(e, ContainerProtos.Result.IO_EXCEPTION);
}
}
if (removed == null) {
LOG.debug("Container with containerId {} is not present in " +
"containerMap", containerId);
Expand Down Expand Up @@ -190,20 +298,20 @@ public int containerCount() {
*
* @param context StateContext
*/
public void handleVolumeFailures(StateContext context) {
public void handleVolumeFailures(StateContext context) throws StorageContainerException {
AtomicBoolean failedVolume = new AtomicBoolean(false);
AtomicInteger containerCount = new AtomicInteger(0);
containerMap.values().forEach(c -> {
for (Container<?> c : containerMap.values()) {
ContainerData data = c.getContainerData();
if (data.getVolume().isFailed()) {
removeContainer(data.getContainerID());
removeMissingContainer(data.getContainerID());
LOG.debug("Removing Container {} as the Volume {} " +
"has failed", data.getContainerID(), data.getVolume());
"has failed", data.getContainerID(), data.getVolume());
failedVolume.set(true);
containerCount.incrementAndGet();
ContainerLogger.logLost(data, "Volume failure");
}
});
}

if (failedVolume.get()) {
try {
Expand Down Expand Up @@ -362,6 +470,10 @@ public Set<Long> getMissingContainerSet() {
return missingContainerSet;
}

public Table<Long, String> getContainerIdsTable() {
return containerIdsTable;
}

/**
* Builds the missing container set by taking a diff between total no
* containers actually found and number of containers which actually
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ private boolean canIgnoreException(Result result) {
case CONTAINER_UNHEALTHY:
case CLOSED_CONTAINER_IO:
case DELETE_ON_OPEN_CONTAINER:
case UNSUPPORTED_REQUEST: // Blame client for sending unsupported request.
case UNSUPPORTED_REQUEST:// Blame client for sending unsupported request.
case CONTAINER_MISSING:
return true;
default:
return false;
Expand Down Expand Up @@ -276,7 +277,8 @@ private ContainerCommandResponseProto dispatchRequest(
getMissingContainerSet().remove(containerID);
}
}
if (getMissingContainerSet().contains(containerID)) {
if (cmdType != Type.CreateContainer && !HddsUtils.isReadOnly(msg)
&& getMissingContainerSet().contains(containerID)) {
StorageContainerException sce = new StorageContainerException(
"ContainerID " + containerID
+ " has been lost and cannot be recreated on this DataNode",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import org.apache.ratis.util.function.CheckedRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -84,7 +85,7 @@ public class MutableVolumeSet implements VolumeSet {
private String clusterID;

private final StorageVolumeChecker volumeChecker;
private Runnable failedVolumeListener;
private CheckedRunnable<IOException> failedVolumeListener;
private StateContext context;
private final StorageVolumeFactory volumeFactory;
private final StorageVolume.VolumeType volumeType;
Expand Down Expand Up @@ -132,7 +133,7 @@ public MutableVolumeSet(String dnUuid, String clusterID,
initializeVolumeSet();
}

public void setFailedVolumeListener(Runnable runnable) {
public void setFailedVolumeListener(CheckedRunnable<IOException> runnable) {
failedVolumeListener = runnable;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;

import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.RECOVERING;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CLOSED_CONTAINER_IO;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_ALREADY_EXISTS;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_INTERNAL_ERROR;
Expand Down Expand Up @@ -119,8 +121,6 @@
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.putBlockResponseSuccess;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.unsupportedRequest;
import static org.apache.hadoop.hdds.scm.utils.ClientCommandsUtils.getReadChunkVersion;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerDataProto.State.RECOVERING;
import static org.apache.hadoop.ozone.OzoneConsts.INCREMENTAL_CHUNK_LIST;
import static org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult;

Expand Down Expand Up @@ -354,6 +354,15 @@ ContainerCommandResponseProto handleCreateContainer(
}

long containerID = request.getContainerID();
State containerState = request.getCreateContainer().getState();

if (containerState != RECOVERING) {
try {
containerSet.ensureContainerNotMissing(containerID, containerState);
} catch (StorageContainerException ex) {
return ContainerUtils.logAndReturnError(LOG, ex, request);
}
}

ContainerLayoutVersion layoutVersion =
ContainerLayoutVersion.getConfiguredVersion(conf);
Expand All @@ -378,7 +387,11 @@ ContainerCommandResponseProto handleCreateContainer(
try {
if (containerSet.getContainer(containerID) == null) {
newContainer.create(volumeSet, volumeChoosingPolicy, clusterId);
created = containerSet.addContainer(newContainer);
if (RECOVERING == newContainer.getContainerState()) {
created = containerSet.addContainerByOverwriteMissingContainer(newContainer);
} else {
created = containerSet.addContainer(newContainer);
}
} else {
// The create container request for an already existing container can
// arrive in case the ContainerStateMachine reapplies the transaction
Expand Down Expand Up @@ -1070,7 +1083,7 @@ private void checkContainerOpen(KeyValueContainer kvContainer)
* might already be in closing state here.
*/
if (containerState == State.OPEN || containerState == State.CLOSING
|| containerState == State.RECOVERING) {
|| containerState == RECOVERING) {
return;
}

Expand Down
Loading

0 comments on commit fc6a2ea

Please sign in to comment.