HDDS-11013. Ensure version is always set in ContainerCommandRequestProto (apache#6812)
swamirishi authored Jun 17, 2024
1 parent b20ceeb commit 81bc179
Showing 6 changed files with 44 additions and 13 deletions.
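
All six files apply the same defaulting rule: if a ContainerCommandRequestProto is built or forwarded without a client version, fill in ClientVersion.CURRENT before the request is serialized or sent. A minimal standalone sketch of that rule, for orientation only (the helper class and method name below are illustrative and are not part of this commit):

    import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
    import org.apache.hadoop.ozone.ClientVersion;

    final class RequestVersionDefaulter {
      private RequestVersionDefaulter() { }

      // If the request already carries a version, leave it untouched;
      // otherwise rebuild it with the current client version set.
      static ContainerCommandRequestProto withVersion(ContainerCommandRequestProto request) {
        if (request.hasVersion()) {
          return request;
        }
        return ContainerCommandRequestProto.newBuilder(request)
            .setVersion(ClientVersion.CURRENT.toProtoValue())
            .build();
      }
    }

The diff below inlines the equivalent check at each call site (client send paths, the Ratis request message builder, and test helpers) rather than routing through a shared helper.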
File 1 of 6
@@ -52,6 +52,7 @@
 import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import org.apache.hadoop.hdds.tracing.GrpcClientInterceptor;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
+import org.apache.hadoop.ozone.ClientVersion;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import java.util.concurrent.TimeoutException;
@@ -277,6 +278,11 @@ public ContainerCommandResponseProto sendCommand(
     List<DatanodeDetails> datanodeList = pipeline.getNodes();
     HashMap<DatanodeDetails, CompletableFuture<ContainerCommandResponseProto>>
         futureHashMap = new HashMap<>();
+    if (!request.hasVersion()) {
+      ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto.newBuilder(request);
+      builder.setVersion(ClientVersion.CURRENT.toProtoValue());
+      request = builder.build();
+    }
     for (DatanodeDetails dn : datanodeList) {
       try {
         futureHashMap.put(dn, sendCommandAsync(request, dn).getResponse());
@@ -337,10 +343,13 @@ private XceiverClientReply sendCommandWithTraceIDAndRetry(
 
     return TracingUtil.executeInNewSpan(spanName,
         () -> {
-          ContainerCommandRequestProto finalPayload =
+          ContainerCommandRequestProto.Builder builder =
               ContainerCommandRequestProto.newBuilder(request)
-                  .setTraceID(TracingUtil.exportCurrentSpan()).build();
-          return sendCommandWithRetry(finalPayload, validators);
+                  .setTraceID(TracingUtil.exportCurrentSpan());
+          if (!request.hasVersion()) {
+            builder.setVersion(ClientVersion.CURRENT.toProtoValue());
+          }
+          return sendCommandWithRetry(builder.build(), validators);
         });
   }

@@ -490,12 +499,14 @@ public XceiverClientReply sendCommandAsync(
 
     try (Scope ignored = GlobalTracer.get().activateSpan(span)) {
 
-      ContainerCommandRequestProto finalPayload =
+      ContainerCommandRequestProto.Builder builder =
           ContainerCommandRequestProto.newBuilder(request)
-              .setTraceID(TracingUtil.exportCurrentSpan())
-              .build();
+              .setTraceID(TracingUtil.exportCurrentSpan());
+      if (!request.hasVersion()) {
+        builder.setVersion(ClientVersion.CURRENT.toProtoValue());
+      }
       XceiverClientReply asyncReply =
-          sendCommandAsync(finalPayload, pipeline.getFirstNode());
+          sendCommandAsync(builder.build(), pipeline.getFirstNode());
       if (shouldBlockAndWaitAsyncReply(request)) {
        asyncReply.getResponse().get();
       }
File 2 of 6
@@ -47,6 +47,7 @@
 import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.ozone.ClientVersion;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
@@ -228,9 +229,12 @@ public Pipeline getPipeline() {
   }
 
   @Override
-  public XceiverClientReply sendCommandAsync(
-      ContainerCommandRequestProto request
-  ) {
+  public XceiverClientReply sendCommandAsync(ContainerCommandRequestProto request) {
+
+    if (!request.hasVersion()) {
+      request = ContainerCommandRequestProto.newBuilder(request)
+          .setVersion(ClientVersion.CURRENT.toProtoValue()).build();
+    }
     final ContainerCommandResponseProto.Builder builder =
         ContainerCommandResponseProto.newBuilder()
             .setResult(Result.SUCCESS)
File 3 of 6
@@ -24,6 +24,7 @@
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
+import org.apache.hadoop.ozone.ClientVersion;
 import org.apache.hadoop.ozone.common.Checksum;
 
 import org.apache.ratis.protocol.Message;
@@ -44,6 +45,9 @@ public static ContainerCommandRequestMessage toMessage(
     if (traceId != null) {
       b.setTraceID(traceId);
     }
+    if (!request.hasVersion()) {
+      b.setVersion(ClientVersion.CURRENT.toProtoValue());
+    }
 
     ByteString data = ByteString.EMPTY;
     if (request.getCmdType() == Type.WriteChunk) {
File 4 of 6
@@ -31,6 +31,7 @@
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
+import org.apache.hadoop.ozone.ClientVersion;
 import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.common.ChecksumData;
 import org.apache.hadoop.ozone.common.OzoneChecksumException;
@@ -91,6 +92,7 @@ static ContainerCommandRequestProto newPutSmallFile(
         .setContainerID(blockID.getContainerID())
         .setDatanodeUuid(UUID.randomUUID().toString())
         .setPutSmallFile(putSmallFileRequest)
+        .setVersion(ClientVersion.CURRENT.toProtoValue())
         .build();
   }
 
@@ -113,6 +115,7 @@ static ContainerCommandRequestProto newWriteChunk(
         .setContainerID(blockID.getContainerID())
         .setDatanodeUuid(UUID.randomUUID().toString())
         .setWriteChunk(writeChunkRequest)
+        .setVersion(ClientVersion.CURRENT.toProtoValue())
         .build();
   }
 
File 5 of 6
@@ -38,6 +38,7 @@
 import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.utils.UniqueId;
+import org.apache.hadoop.ozone.ClientVersion;
 import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
 import org.apache.hadoop.ozone.common.OzoneChecksumException;
@@ -540,6 +541,11 @@ public static byte[] generateData(int length, boolean random) {
     return data;
   }
 
+  public static ContainerCommandRequestProto getDummyCommandRequestProto(
+      ContainerProtos.Type cmdType) {
+    return getDummyCommandRequestProto(ClientVersion.CURRENT, cmdType, 0);
+  }
+
   /**
    * Construct fake protobuf messages for various types of requests.
    * This is tedious, however necessary to test. Protobuf classes are final
@@ -549,16 +555,17 @@ public static byte[] generateData(int length, boolean random) {
    * @return
    */
   public static ContainerCommandRequestProto getDummyCommandRequestProto(
-      ContainerProtos.Type cmdType) {
+      ClientVersion clientVersion, ContainerProtos.Type cmdType, int replicaIndex) {
     final Builder builder =
         ContainerCommandRequestProto.newBuilder()
+            .setVersion(clientVersion.toProtoValue())
             .setCmdType(cmdType)
             .setContainerID(DUMMY_CONTAINER_ID)
             .setDatanodeUuid(DATANODE_UUID);
 
     final DatanodeBlockID fakeBlockId =
         DatanodeBlockID.newBuilder()
-            .setContainerID(DUMMY_CONTAINER_ID).setLocalID(1)
+            .setContainerID(DUMMY_CONTAINER_ID).setLocalID(1).setReplicaIndex(replicaIndex)
             .setBlockCommitSequenceId(101).build();
 
     final ContainerProtos.ChunkInfo fakeChunkInfo =
File 6 of 6
@@ -24,6 +24,7 @@
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutBlockRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
 import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
+import org.apache.hadoop.ozone.ClientVersion;
 import org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.Buffers;
 import org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.WriteMethod;
 import org.apache.ratis.client.api.DataStreamOutput;
@@ -70,14 +71,15 @@ public class TestKeyValueStreamDataChannel {
   public static final Logger LOG =
       LoggerFactory.getLogger(TestKeyValueStreamDataChannel.class);
 
-  static final ContainerCommandRequestProto PUT_BLOCK_PROTO
+  private static final ContainerCommandRequestProto PUT_BLOCK_PROTO
       = ContainerCommandRequestProto.newBuilder()
       .setCmdType(Type.PutBlock)
       .setPutBlock(PutBlockRequestProto.newBuilder().setBlockData(
          BlockData.newBuilder().setBlockID(DatanodeBlockID.newBuilder()
              .setContainerID(222).setLocalID(333).build()).build()))
      .setDatanodeUuid("datanodeId")
      .setContainerID(111L)
+      .setVersion(ClientVersion.CURRENT.toProtoValue())
      .build();
   static final int PUT_BLOCK_PROTO_SIZE = PUT_BLOCK_PROTO.toByteString().size();
   static {
