
Merge remote-tracking branch 'origin' into HDDS-10261
sarvekshayr committed Feb 1, 2024
2 parents fbbc9c6 + baae750 commit e13167e
Showing 78 changed files with 724 additions and 1,236 deletions.
@@ -18,11 +18,6 @@

package org.apache.hadoop.hdds.scm.client;

import java.text.ParseException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -72,37 +67,6 @@ private HddsClientUtils() {
.add(NotReplicatedException.class)
.build();

/**
* Date format used in Ozone. The format is thread-safe to use.
*/
private static final ThreadLocal<DateTimeFormatter> DATE_FORMAT =
ThreadLocal.withInitial(() -> {
DateTimeFormatter format =
DateTimeFormatter.ofPattern(OzoneConsts.OZONE_DATE_FORMAT);
return format.withZone(ZoneId.of(OzoneConsts.OZONE_TIME_ZONE));
});


/**
* Convert a time in milliseconds to the human-readable format used in Ozone.
* @return a human-readable string for the input time
*/
public static String formatDateTime(long millis) {
ZonedDateTime dateTime = ZonedDateTime.ofInstant(
Instant.ofEpochMilli(millis), DATE_FORMAT.get().getZone());
return DATE_FORMAT.get().format(dateTime);
}

/**
* Convert a time in Ozone date format to milliseconds.
* @return time in milliseconds
*/
public static long formatDateTime(String date) throws ParseException {
Preconditions.checkNotNull(date, "Date string should not be null.");
return ZonedDateTime.parse(date, DATE_FORMAT.get())
.toInstant().toEpochMilli();
}
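
For reference, the helpers removed above converted between epoch milliseconds and Ozone's date format through a per-thread formatter. Below is a minimal standalone sketch of the same pattern; the pattern string and zone are illustrative assumptions standing in for the OzoneConsts values, which this diff does not show.

import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

public final class DateFormatSketch {
  // Illustrative stand-ins for OzoneConsts.OZONE_DATE_FORMAT / OZONE_TIME_ZONE.
  private static final String PATTERN = "EEE, dd MMM yyyy HH:mm:ss zzz";
  private static final String ZONE = "GMT";

  // One formatter per thread, mirroring the removed DATE_FORMAT field.
  private static final ThreadLocal<DateTimeFormatter> DATE_FORMAT =
      ThreadLocal.withInitial(() ->
          DateTimeFormatter.ofPattern(PATTERN).withZone(ZoneId.of(ZONE)));

  // millis -> formatted string
  public static String formatDateTime(long millis) {
    ZonedDateTime dateTime = ZonedDateTime.ofInstant(
        Instant.ofEpochMilli(millis), DATE_FORMAT.get().getZone());
    return DATE_FORMAT.get().format(dateTime);
  }

  // formatted string -> millis
  public static long parseDateTime(String date) {
    return ZonedDateTime.parse(date, DATE_FORMAT.get())
        .toInstant().toEpochMilli();
  }

  private DateFormatSketch() { }
}

Since java.time's DateTimeFormatter is immutable and thread-safe (unlike the legacy SimpleDateFormat), the ThreadLocal wrapper here is defensive rather than strictly required.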

private static void doNameChecks(String resName) {
if (resName == null) {
throw new IllegalArgumentException("Bucket or Volume name is null");
@@ -208,17 +172,6 @@ public static void verifyResourceName(String resName, boolean isStrictS3) {
}
}

/**
* Verifies that each bucket / volume name is a valid DNS name.
*
* @param resourceNames Array of bucket / volume names to be verified.
*/
public static void verifyResourceName(String... resourceNames) {
for (String resourceName : resourceNames) {
HddsClientUtils.verifyResourceName(resourceName);
}
}

/**
* Verifies that the key name is a valid name.
*
(next file)
@@ -18,7 +18,6 @@

package org.apache.hadoop.hdds.scm.storage;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -239,11 +238,6 @@ public List<DatanodeDetails> getFailedServers() {
return failedServers;
}

@VisibleForTesting
public XceiverClientRatis getXceiverClient() {
return xceiverClient;
}

public IOException getIoException() {
return ioException.get();
}
@@ -331,10 +325,6 @@ private void updateFlushLength() {
totalDataFlushedLength = writtenDataLength;
}

@VisibleForTesting
public long getTotalDataFlushedLength() {
return totalDataFlushedLength;
}
/**
* Will be called on the retryPath in case closedContainerException/
* TimeoutException.
@@ -703,11 +693,6 @@ private void writeChunkToContainer(ByteBuffer buf)
containerBlockData.addChunks(chunkInfo);
}

@VisibleForTesting
public void setXceiverClient(XceiverClientRatis xceiverClient) {
this.xceiverClient = xceiverClient;
}

/**
* Handles InterruptedExecution.
*
(next file)
@@ -581,8 +581,4 @@ public synchronized List<ChunkInputStream> getChunkStreams() {
return chunkStreams;
}

@VisibleForTesting
public static Logger getLog() {
return LOG;
}
}
(next file)
@@ -235,10 +235,6 @@ public IOException getIoException() {
return ioException.get();
}

XceiverClientSpi getXceiverClientSpi() {
return this.xceiverClient;
}

public BlockData.Builder getContainerBlockData() {
return this.containerBlockData;
}
@@ -327,10 +323,6 @@ private void updateFlushLength() {
totalDataFlushedLength = writtenDataLength;
}

private boolean isBufferPoolFull() {
return bufferPool.computeBufferData() == streamBufferArgs.getStreamBufferMaxSize();
}

/**
* Will be called on the retryPath in case closedContainerException/
* TimeoutException.
@@ -758,11 +750,6 @@ CompletableFuture<ContainerCommandResponseProto> writeChunkToContainer(
return null;
}

@VisibleForTesting
public void setXceiverClient(XceiverClientSpi xceiverClient) {
this.xceiverClient = xceiverClient;
}

/**
* Handles InterruptedExecution.
*
(next file)
@@ -30,8 +30,6 @@
import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.OutputStream;
@@ -58,8 +56,6 @@
*/
public class RatisBlockOutputStream extends BlockOutputStream
implements Syncable {
public static final Logger LOG = LoggerFactory.getLogger(
RatisBlockOutputStream.class);

// This object will maintain the commitIndexes and byteBufferList in order
// Also, corresponding to the logIndex, the corresponding list of buffers will
(next file)
@@ -107,17 +107,6 @@ protected int availableDataLocations(int expectedLocations) {
return count;
}

protected int availableParityLocations() {
int count = 0;
for (int i = repConfig.getData();
i < repConfig.getData() + repConfig.getParity(); i++) {
if (dataLocations[i] != null) {
count++;
}
}
return count;
}

public ECBlockInputStream(ECReplicationConfig repConfig,
BlockLocationInfo blockInfo, boolean verifyChecksum,
XceiverClientFactory xceiverClientFactory,
(next file)
@@ -102,7 +102,6 @@ public void testGetScmClientAddressForHA() {
conf.set(ScmConfigKeys.OZONE_SCM_NODE_ID_KEY, "scm1");

int port = 9880;
int i = 1;
for (String nodeId : nodes) {
conf.setInt(ConfUtils.addKeySuffixes(OZONE_SCM_CLIENT_PORT_KEY,
scmServiceId, nodeId), port);
(next file)
@@ -22,7 +22,6 @@
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ContainerBlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
@@ -47,7 +46,6 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -381,12 +379,6 @@ public void testReadNotRetriedOnOtherException(IOException ex)
}
}

private Pipeline samePipelineWithNewId(Pipeline pipeline) {
List<DatanodeDetails> reverseOrder = new ArrayList<>(pipeline.getNodes());
Collections.reverse(reverseOrder);
return MockPipeline.createPipeline(reverseOrder);
}

@ParameterizedTest
@MethodSource("exceptionsTriggersRefresh")
public void testRefreshOnReadFailureAfterUnbuffer(IOException ex)
(next file)
@@ -281,7 +281,6 @@ public synchronized BlockExtendedInputStream create(
public static class TestBlockInputStream extends BlockExtendedInputStream {

private ByteBuffer data;
private boolean closed = false;
private BlockID blockID;
private long length;
private boolean shouldError = false;
@@ -304,10 +303,6 @@ public static class TestBlockInputStream extends BlockExtendedInputStream {
data.position(0);
}

public boolean isClosed() {
return closed;
}

public void setShouldErrorOnSeek(boolean val) {
this.shouldErrorOnSeek = val;
}
@@ -377,9 +372,7 @@ protected int readWithStrategy(ByteReaderStrategy strategy) throws
}

@Override
public void close() {
closed = true;
}
public void close() { }

@Override
public void unbuffer() {
(next file)
@@ -169,7 +169,7 @@ public void testCorrectStreamCreatedDependingOnDataLocations()
BlockLocationInfo blockInfo =
ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);

try (ECBlockInputStreamProxy bis = createBISProxy(repConfig, blockInfo)) {
try (ECBlockInputStreamProxy ignored = createBISProxy(repConfig, blockInfo)) {
// Not all locations present, so we expect only the "missing=true" stream
// to be present.
assertThat(streamFactory.getStreams()).containsKey(false);
@@ -181,7 +181,7 @@
dnMap = ECStreamTestUtil.createIndexMap(2, 3, 4, 5);
blockInfo = ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);

try (ECBlockInputStreamProxy bis = createBISProxy(repConfig, blockInfo)) {
try (ECBlockInputStreamProxy ignored = createBISProxy(repConfig, blockInfo)) {
// Not all locations present, so we expect only the "missing=true" stream
// to be present.
assertThat(streamFactory.getStreams()).doesNotContainKey(false);
(next file)
@@ -78,18 +78,18 @@ public class ContainerReader implements Runnable {
private final ConfigurationSource config;
private final File hddsVolumeDir;
private final MutableVolumeSet volumeSet;
private final boolean shouldDeleteRecovering;
private final boolean shouldDelete;

public ContainerReader(
MutableVolumeSet volSet, HddsVolume volume, ContainerSet cset,
ConfigurationSource conf, boolean shouldDeleteRecovering) {
ConfigurationSource conf, boolean shouldDelete) {
Preconditions.checkNotNull(volume);
this.hddsVolume = volume;
this.hddsVolumeDir = hddsVolume.getHddsRootDir();
this.containerSet = cset;
this.config = conf;
this.volumeSet = volSet;
this.shouldDeleteRecovering = shouldDeleteRecovering;
this.shouldDelete = shouldDelete;
}

@Override
@@ -148,7 +148,7 @@ public void readVolume(File hddsVolumeRootDir) {
LOG.info("Start to verify containers on volume {}", hddsVolumeRootDir);
File currentDir = new File(idDir, Storage.STORAGE_DIR_CURRENT);
File[] containerTopDirs = currentDir.listFiles();
if (containerTopDirs != null) {
if (containerTopDirs != null && containerTopDirs.length > 0) {
for (File containerTopDir : containerTopDirs) {
if (containerTopDir.isDirectory()) {
File[] containerDirs = containerTopDir.listFiles();
@@ -214,7 +214,7 @@ public void verifyAndFixupContainerData(ContainerData containerData)
KeyValueContainer kvContainer = new KeyValueContainer(kvContainerData,
config);
if (kvContainer.getContainerState() == RECOVERING) {
if (shouldDeleteRecovering) {
if (shouldDelete) {
kvContainer.markContainerUnhealthy();
LOG.info("Stale recovering container {} marked UNHEALTHY",
kvContainerData.getContainerID());
@@ -223,7 +223,9 @@
return;
}
if (kvContainer.getContainerState() == DELETED) {
cleanupContainer(hddsVolume, kvContainer);
if (shouldDelete) {
cleanupContainer(hddsVolume, kvContainer);
}
return;
}
try {
@@ -232,8 +234,10 @@
if (e.getResult() != ContainerProtos.Result.CONTAINER_EXISTS) {
throw e;
}
resolveDuplicate((KeyValueContainer) containerSet.getContainer(
kvContainer.getContainerData().getContainerID()), kvContainer);
if (shouldDelete) {
resolveDuplicate((KeyValueContainer) containerSet.getContainer(
kvContainer.getContainerData().getContainerID()), kvContainer);
}
}
} else {
throw new StorageContainerException("Container File is corrupted. " +
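
Taken together, the ContainerReader changes above broaden the old shouldDeleteRecovering flag into a general shouldDelete switch: stale RECOVERING containers, DELETED containers, and duplicate resolution are now all mutated only when the flag is set. A self-contained sketch of that gating, with simplified stand-in types (the real code operates on KeyValueContainer and calls markContainerUnhealthy, cleanupContainer, and resolveDuplicate):

// Simplified stand-in for the ContainerProtos container states.
enum ContainerState { RECOVERING, DELETED, NORMAL }

final class ShouldDeleteGateSketch {
  // Action a startup scan takes for one on-disk container.
  static String actionFor(ContainerState state, boolean shouldDelete) {
    switch (state) {
      case RECOVERING:
        // A recovering container found at startup is stale; only mark it
        // UNHEALTHY on a pass that is allowed to mutate state.
        return shouldDelete ? "mark UNHEALTHY" : "skip";
      case DELETED:
        // Finish an interrupted delete only when mutation is allowed.
        return shouldDelete ? "cleanup container" : "skip";
      default:
        // Normal containers are added; a CONTAINER_EXISTS collision is
        // resolved as a duplicate only when shouldDelete is set.
        return "add to container set";
    }
  }

  private ShouldDeleteGateSketch() { }
}
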
(next file)
@@ -169,7 +169,6 @@ public OzoneContainer(
containerSet = new ContainerSet(recoveringContainerTimeout);
metadataScanner = null;

buildContainerSet();
metrics = ContainerMetrics.create(conf);
handlers = Maps.newHashMap();

@@ -286,9 +285,10 @@ public GrpcTlsConfig getTlsClientConfig() {
}

/**
* Builds the container map.
* Builds the container map after volume format.
*/
private void buildContainerSet() {
@VisibleForTesting
public void buildContainerSet() {
Iterator<StorageVolume> volumeSetIterator = volumeSet.getVolumesList()
.iterator();
ArrayList<Thread> volumeThreads = new ArrayList<>();
@@ -442,6 +442,8 @@ public void start(String clusterId) throws IOException {
return;
}

buildContainerSet();

// Start background volume checks, which will begin after the configured
// delay.
volumeChecker.start();
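
The OzoneContainer hunks move buildContainerSet() out of the constructor and into start(clusterId), so the container map is only built once the volumes are formatted, and expose the method for tests. A condensed sketch of the new ordering; the constructor and start() bodies here are paraphrased stand-ins for the elided code, not the actual implementation:

final class OzoneContainerOrderSketch {
  private boolean volumesFormatted;

  OzoneContainerOrderSketch() {
    // containerSet, metrics, handlers, ... are created here, but the
    // container set is no longer populated in the constructor.
  }

  void start(String clusterId) {
    formatVolumes(clusterId); // stand-in for the elided volume setup
    buildContainerSet();      // now runs here, after volume format
    // volumeChecker.start(); and the rest of startup follow
  }

  private void formatVolumes(String clusterId) {
    volumesFormatted = true;
  }

  // public (and @VisibleForTesting) in the real class
  void buildContainerSet() {
    assert volumesFormatted : "must run after volume format";
    // scan each volume with a reader thread and populate the container set
  }
}
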
(next file)
@@ -72,24 +72,20 @@ RegisteredCommand register(DatanodeDetails datanodeDetails,
* TODO: Cleanup and update tests, HDDS-9642.
*
* @param datanodeDetails - Datanode ID.
* @param layoutVersionInfo - Layout Version Proto.
* @return Commands to be sent to the datanode.
*/
default List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails,
LayoutVersionProto layoutVersionInfo) {
return processHeartbeat(datanodeDetails, layoutVersionInfo, null);
default List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails) {
return processHeartbeat(datanodeDetails, null);
};

/**
* Send heartbeat to indicate the datanode is alive and doing well.
* @param datanodeDetails - Datanode ID.
* @param layoutVersionInfo - Layout Version Proto.
* @param queueReport - The CommandQueueReportProto report from the
* heartbeating datanode.
* @return Commands to be sent to the datanode.
*/
List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails,
LayoutVersionProto layoutVersionInfo,
CommandQueueReportProto queueReport);
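
With the layout-version parameter gone, the convenience overload simply forwards a null queue report to the full method. A minimal sketch of that delegation pattern, using String and List<String> as simplified stand-ins for DatanodeDetails, CommandQueueReportProto, and List<SCMCommand>:

import java.util.Collections;
import java.util.List;

interface NodeManagerSketch {
  // Full variant: the queue report may be null.
  List<String> processHeartbeat(String datanodeId, String queueReport);

  // Convenience overload delegates with no queue report.
  default List<String> processHeartbeat(String datanodeId) {
    return processHeartbeat(datanodeId, null);
  }
}

// Example implementation returning no commands.
class NoOpNodeManager implements NodeManagerSketch {
  @Override
  public List<String> processHeartbeat(String datanodeId, String queueReport) {
    return Collections.emptyList();
  }
}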

(remaining changed files not shown)
