Skip to content

Commit

Permalink
Refactor code base
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanzlenko committed Sep 9, 2024
1 parent d024248 commit 04de724
Show file tree
Hide file tree
Showing 39 changed files with 4,218 additions and 4,784 deletions.
537 changes: 244 additions & 293 deletions hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Objects;

import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
Expand Down Expand Up @@ -54,12 +56,10 @@ public abstract class Handler {
protected String clusterId;
protected final ContainerMetrics metrics;
protected String datanodeId;
private IncrementalReportSender<Container> icrSender;
private final IncrementalReportSender<Container> icrSender;

protected Handler(ConfigurationSource config, String datanodeId,
ContainerSet contSet, VolumeSet volumeSet,
ContainerMetrics containerMetrics,
IncrementalReportSender<Container> icrSender) {
protected Handler(ConfigurationSource config, String datanodeId, ContainerSet contSet, VolumeSet volumeSet,
ContainerMetrics containerMetrics, IncrementalReportSender<Container> icrSender) {
this.conf = config;
this.containerSet = contSet;
this.volumeSet = volumeSet;
Expand All @@ -68,28 +68,41 @@ protected Handler(ConfigurationSource config, String datanodeId,
this.icrSender = icrSender;
}

public static Handler getHandlerForContainerType(
final ContainerType containerType, final ConfigurationSource config,
final String datanodeId, final ContainerSet contSet,
final VolumeSet volumeSet, final ContainerMetrics metrics,
/**
* Returns a handler for the specified container type.
*
* @param containerType the type of container for which the handler is required
* @param config the configuration source
* @param datanodeId the ID of the data node
* @param contSet the set of containers
* @param volumeSet the set of volumes
* @param metrics metrics for the container
* @param icrSender the incremental report sender
* @return a Handler for the specified container type
* @throws IllegalArgumentException if the container type does not exist
*/
public static Handler getHandlerForContainerType(ContainerType containerType, ConfigurationSource config,
String datanodeId, ContainerSet contSet, VolumeSet volumeSet, ContainerMetrics metrics,
IncrementalReportSender<Container> icrSender) {
switch (containerType) {
case KeyValueContainer:
return new KeyValueHandler(config,
datanodeId, contSet, volumeSet, metrics,
icrSender);
default:
throw new IllegalArgumentException("Handler for ContainerType: " +
containerType + "doesn't exist.");
if (Objects.requireNonNull(containerType) == ContainerType.KeyValueContainer) {
return new KeyValueHandler(config, datanodeId, contSet, volumeSet, metrics, icrSender);
}
throw new IllegalArgumentException("Handler for ContainerType: " + containerType + "doesn't exist.");
}

public abstract StateMachine.DataChannel getStreamDataChannel(
Container container, ContainerCommandRequestProto msg)
throws StorageContainerException;
/**
* Retrieves the data channel stream for a given container based on the specified request message.
*
* @param container the container for which the data channel will be retrieved
* @param msg the command request message associated with the data channel retrieval
* @return the data channel stream corresponding to the given container and request message
* @throws StorageContainerException if an error occurs while retrieving the data channel
*/
public abstract StateMachine.DataChannel getStreamDataChannel(Container container, ContainerCommandRequestProto msg)
throws StorageContainerException;

/**
* Returns the Id of this datanode.
* Returns the id of this datanode.
*
* @return datanode Id
*/
Expand All @@ -98,41 +111,39 @@ protected String getDatanodeId() {
}

/**
* This should be called whenever there is state change. It will trigger
* an ICR to SCM.
* This should be called whenever there is state change. It will trigger an ICR to SCM.
*
* @param container Container for which ICR has to be sent
*/
protected void sendICR(final Container container)
throws StorageContainerException {
if (container
.getContainerState() == ContainerProtos.ContainerDataProto
.State.RECOVERING) {
protected void sendICR(final Container container) throws StorageContainerException {
if (container.getContainerState() == State.RECOVERING) {
// Ignoring the recovering containers reports for now.
return;
}
icrSender.send(container);
}

public abstract ContainerCommandResponseProto handle(
ContainerCommandRequestProto msg, Container container,
/**
* Handles the given container command request.
*
* @param msg the container command request protocol message
* @param container the container to be handled
* @param dispatcherContext the context of the dispatcher handling the command
* @return the response protocol for the executed command
*/
public abstract ContainerCommandResponseProto handle(ContainerCommandRequestProto msg, Container container,
DispatcherContext dispatcherContext);

/**
* Imports container from a raw input stream.
*/
public abstract Container importContainer(
ContainerData containerData, InputStream rawContainerStream,
TarContainerPacker packer)
throws IOException;
public abstract Container importContainer(ContainerData containerData, InputStream rawContainerStream,
TarContainerPacker packer) throws IOException;

/**
* Exports container to the output stream.
*/
public abstract void exportContainer(
Container container,
OutputStream outputStream,
TarContainerPacker packer)
public abstract void exportContainer(Container container, OutputStream outputStream, TarContainerPacker packer)
throws IOException;

/**
Expand All @@ -141,84 +152,84 @@ public abstract void exportContainer(
public abstract void stop();

/**
* Marks the container for closing. Moves the container to CLOSING state.
* Marks the container for closing. Moves the container to {@link State#CLOSING} state.
*
* @param container container to update
* @throws IOException in case of exception
*/
public abstract void markContainerForClose(Container container)
throws IOException;
public abstract void markContainerForClose(Container container) throws IOException;

/**
* Marks the container Unhealthy. Moves the container to UNHEALTHY state.
* Marks the container Unhealthy. Moves the container to {@link State#UNHEALTHY} state.
*
* @param container container to update
* @param reason The reason the container was marked unhealthy
* @throws IOException in case of exception
*/
public abstract void markContainerUnhealthy(Container container,
ScanResult reason)
throws IOException;
public abstract void markContainerUnhealthy(Container container, ScanResult reason) throws IOException;

/**
* Moves the Container to QUASI_CLOSED state.
* Moves the Container to {@link State#QUASI_CLOSED} state.
*
* @param container container to be quasi closed
* @param reason The reason the container was quasi closed, for logging
* purposes.
* @throws IOException
* @param reason The reason the container was quasi closed, for logging purposes.
*/
public abstract void quasiCloseContainer(Container container, String reason)
throws IOException;
public abstract void quasiCloseContainer(Container container, String reason) throws IOException;

/**
* Moves the Container to CLOSED state.
* Moves the Container to {@link State#CLOSED} state.
*
* @param container container to be closed
* @throws IOException
*/
public abstract void closeContainer(Container container)
throws IOException;
public abstract void closeContainer(Container container) throws IOException;

/**
* Deletes the given container.
*
* @param container container to be deleted
* @param force if this is set to true, we delete container without
* checking
* state of the container.
* @throws IOException
* @param force if this is set to true, we delete container without checking state of the container.
*/
public abstract void deleteContainer(Container container, boolean force)
throws IOException;
public abstract void deleteContainer(Container container, boolean force) throws IOException;

/**
* Deletes the given files associated with a block of the container.
*
* @param container container whose block is to be deleted
* @param blockData block to be deleted
* @throws IOException
*/
public abstract void deleteBlock(Container container, BlockData blockData)
throws IOException;
public abstract void deleteBlock(Container container, BlockData blockData) throws IOException;

/**
* Deletes the possible onDisk but unreferenced blocks/chunks with localID
* in the container.
* Deletes the possible onDisk but unreferenced blocks/chunks with localID in the container.
*
* @param container container whose block/chunk is to be deleted
* @param localID localId of the block/chunk
* @throws IOException
*/
public abstract void deleteUnreferenced(Container container, long localID)
throws IOException;
public abstract void deleteUnreferenced(Container container, long localID) throws IOException;

/**
* Adds a finalized block to a container.
*
* @param container The container to which the finalized block will be added.
* @param localID The local identifier for the block.
*/
public abstract void addFinalizedBlock(Container container, long localID);

/**
* Checks if a finalized block exists in the specified container with the given local ID.
*
* @param container the container to be checked
* @param localID the local ID of the block to be verified
* @return true if the finalized block exists, false otherwise
*/
public abstract boolean isFinalizedBlockExist(Container container, long localID);

/**
* Sets the cluster ID for this handler.
*
* @param clusterID the new cluster ID to be set
*/
public void setClusterID(String clusterID) {
this.clusterId = clusterID;
}

}
Loading

0 comments on commit 04de724

Please sign in to comment.