Add a new version in the ReplicaTokenSerde class (#1266)
Add a new version in the ReplicaTokenSerde to account for persisting the replica type.
Write tests for DiskTokenPersistor.
Note that this fixes a bug in commit 9487e8b related to deserializing existing replica token files with the new replica token serialization format.
ankagrawal authored and jsjtzyy committed Sep 25, 2019
1 parent da7c0b6 commit 362ffcf
Showing 5 changed files with 224 additions and 40 deletions.
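For orientation, here is the replica token file layout that the serde below reads and writes, reconstructed from this diff (the field names are descriptive labels, not verbatim Ambry identifiers):

// VERSION_0 layout (files already on disk):
//   [version: short = 0]
//   repeated per remote replica:
//     [partitionId bytes]
//     [hostnameLength: int][hostname bytes]
//     [replicaPathLength: int][replicaPath bytes]
//     [port: int]
//     [totalBytesReadFromLocalStore: long]
//     [findToken bytes]
//   [crc: long]
//
// VERSION_1 layout (written by this change): identical, except that a
// [replicaType: short] field is inserted immediately before the findToken bytes,
// so the deserializer can pick the FindTokenFactory matching the replica type
// (disk vs. cloud) when parsing the token.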
@@ -25,9 +25,9 @@ public class ReplicationConfig {
public final String replicationStoreTokenFactory;

/**
* The factory class the replicatio uses to create cloud token
* The factory class the replication uses to create cloud token
*/
@Config("replcation.cloudtoken.factory")
@Config("replication.cloudtoken.factory")
@Default("com.github.ambry.cloud.CloudFindTokenFactory")
public final String replicationCloudTokenFactory;

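With the key corrected, an illustrative properties entry for a replication config, using the default factory named in the annotation above:

replication.cloudtoken.factory=com.github.ambry.cloud.CloudFindTokenFactory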
@@ -17,5 +17,5 @@
* The type of replica token
*/
public enum FindTokenType {
Uninitialized, JournalBased, IndexBased, CloudBased;
}
Uninitialized, JournalBased, IndexBased, CloudBased
}
@@ -130,7 +130,9 @@ public static class ReplicaTokenSerde {
private static final short Crc_Size = 8;
private final ClusterMap clusterMap;
private final FindTokenHelper tokenHelper;
private final short version = 0;
private static final short VERSION_0 = 0;
private static final short VERSION_1 = 1;
private static final short CURRENT_VERSION = VERSION_1;

// Map<String, FindToken>
public ReplicaTokenSerde(ClusterMap clusterMap, FindTokenHelper tokenHelper) {
@@ -143,7 +145,7 @@ public void serializeTokens(List<ReplicaTokenInfo> tokenInfoList, OutputStream o
DataOutputStream writer = new DataOutputStream(crcOutputStream);
try {
// write the current version
writer.writeShort(version);
writer.writeShort(CURRENT_VERSION);
for (ReplicaTokenInfo replicaTokenInfo : tokenInfoList) {
writer.write(replicaTokenInfo.getPartitionId().getBytes());
// Write hostname
@@ -181,39 +183,40 @@ public List<ReplicaTokenInfo> deserializeTokens(InputStream inputStream) throws
List<ReplicaTokenInfo> tokenInfoList = new ArrayList<>();
try {
short version = stream.readShort();
switch (version) {
case 0:
while (stream.available() > Crc_Size) {
// read partition id
PartitionId partitionId = clusterMap.getPartitionIdFromStream(stream);
// read remote node host name
String hostname = Utils.readIntString(stream);
// read remote replica path
String replicaPath = Utils.readIntString(stream);
// read remote port
int port = stream.readInt();
// read total bytes read from local store
long totalBytesReadFromLocalStore = stream.readLong();
//read replica type
ReplicaType replicaType = ReplicaType.values()[stream.readShort()];
// read replica token
FindTokenFactory findTokenFactory = tokenHelper.getFindTokenFactoryFromReplicaType(replicaType);
FindToken token = findTokenFactory.getFindToken(stream);

tokenInfoList.add(
new ReplicaTokenInfo(partitionId, hostname, replicaPath, port, totalBytesReadFromLocalStore, token));
}

long computedCrc = crcStream.getValue();
long readCrc = stream.readLong();
if (computedCrc != readCrc) {
throw new ReplicationException(
"Crc mismatch during replica token deserialization, computed " + computedCrc + ", read " + readCrc);
}
return tokenInfoList;
default:
throw new ReplicationException("Invalid version found during replica token deserialization: " + version);
if (version < VERSION_0 || version > VERSION_1) {
throw new ReplicationException("Invalid version found during replica token deserialization: " + version);
}
while (stream.available() > Crc_Size) {
// read partition id
PartitionId partitionId = clusterMap.getPartitionIdFromStream(stream);
// read remote node host name
String hostname = Utils.readIntString(stream);
// read remote replica path
String replicaPath = Utils.readIntString(stream);
// read remote port
int port = stream.readInt();
// read total bytes read from local store
long totalBytesReadFromLocalStore = stream.readLong();
//read replica type; prior to VERSION_1 all the replicas were DISK_BACKED only
ReplicaType replicaType = ReplicaType.DISK_BACKED;
if (version > VERSION_0) {
replicaType = ReplicaType.values()[stream.readShort()];
}
// read replica token
FindTokenFactory findTokenFactory = tokenHelper.getFindTokenFactoryFromReplicaType(replicaType);
FindToken token = findTokenFactory.getFindToken(stream);

tokenInfoList.add(
new ReplicaTokenInfo(partitionId, hostname, replicaPath, port, totalBytesReadFromLocalStore, token));
}

long computedCrc = crcStream.getValue();
long readCrc = stream.readLong();
if (computedCrc != readCrc) {
throw new ReplicationException(
"Crc mismatch during replica token deserialization, computed " + computedCrc + ", read " + readCrc);
}
return tokenInfoList;
} catch (IOException e) {
throw new ReplicationException("IO error deserializing replica tokens", e);
} finally {
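A sketch of why the version guard in the new deserialization path matters (editorial illustration, not verbatim code from commit 9487e8b): the previous code read the replica type unconditionally, so for a legacy VERSION_0 file the extra readShort() consumed two bytes that actually belong to the serialized FindToken, corrupting the token parse (or failing outright if the value is not a valid ReplicaType ordinal).

// Before (the removed case 0 branch): always consumes a short, even though
// VERSION_0 files never wrote a replica type.
// ReplicaType replicaType = ReplicaType.values()[stream.readShort()];

// After (the added code): only VERSION_1 files carry the replica type;
// legacy files fall back to DISK_BACKED, the only type they could contain.
ReplicaType replicaType = ReplicaType.DISK_BACKED;
if (version > VERSION_0) {
  replicaType = ReplicaType.values()[stream.readShort()];
}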
@@ -0,0 +1,182 @@
/**
* Copyright 2019 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.replication;

import com.codahale.metrics.MetricRegistry;
import com.github.ambry.clustermap.ClusterMap;
import com.github.ambry.clustermap.MockClusterMap;
import com.github.ambry.clustermap.PartitionId;
import com.github.ambry.clustermap.ReplicaId;
import com.github.ambry.commons.BlobIdFactory;
import com.github.ambry.config.ReplicationConfig;
import com.github.ambry.config.VerifiableProperties;
import com.github.ambry.store.StoreFindTokenFactory;
import com.github.ambry.utils.CrcOutputStream;
import com.github.ambry.utils.SystemTime;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;


/**
* Test for {@link DiskTokenPersistor}.
*/
public class DiskTokenPersistorTest {
private static Map<String, List<PartitionInfo>> mountPathToPartitionInfoList;
private static ClusterMap clusterMap;
private static ReplicaId replicaId;
private static List<RemoteReplicaInfo.ReplicaTokenInfo> replicaTokenInfos;
private static FindTokenHelper findTokenHelper;
private static String REPLICA_TOKEN_FILENAME = "replicaTokens";

/**
* Create the one time setup for the tests.
* @throws Exception if Exception happens during setup.
*/
@BeforeClass
public static void setup() throws Exception {
clusterMap = new MockClusterMap();
mountPathToPartitionInfoList = new HashMap<>();
BlobIdFactory blobIdFactory = new BlobIdFactory(clusterMap);
StoreFindTokenFactory factory = new StoreFindTokenFactory(blobIdFactory);
PartitionId partitionId = clusterMap.getAllPartitionIds(null).get(0);

replicaId = partitionId.getReplicaIds().get(0);
List<? extends ReplicaId> peerReplicas = replicaId.getPeerReplicaIds();
List<RemoteReplicaInfo> remoteReplicas = new ArrayList<>();
replicaTokenInfos = new ArrayList<>();
for (ReplicaId remoteReplica : peerReplicas) {
RemoteReplicaInfo remoteReplicaInfo =
new RemoteReplicaInfo(remoteReplica, replicaId, null, factory.getNewFindToken(), 10, SystemTime.getInstance(),
remoteReplica.getDataNodeId().getPortToConnectTo());
remoteReplicas.add(remoteReplicaInfo);
replicaTokenInfos.add(new RemoteReplicaInfo.ReplicaTokenInfo(remoteReplicaInfo));
}
PartitionInfo partitionInfo = new PartitionInfo(remoteReplicas, partitionId, null, replicaId);
mountPathToPartitionInfoList.computeIfAbsent(replicaId.getMountPath(), key -> new ArrayList<>()).add(partitionInfo);

Properties replicationProperties = new Properties();
replicationProperties.setProperty("replication.cloudtoken.factory",
MockFindToken.MockFindTokenFactory.class.getName());
ReplicationConfig replicationConfig = new ReplicationConfig(new VerifiableProperties(replicationProperties));
findTokenHelper = new FindTokenHelper(blobIdFactory, replicationConfig);
}

/**
* Basic test to persist and retrieve disk tokens.
* @throws Exception if an Exception happens.
*/
@Test
public void basicTest() throws Exception {
DiskTokenPersistor diskTokenPersistor = new DiskTokenPersistor(REPLICA_TOKEN_FILENAME, mountPathToPartitionInfoList,
new ReplicationMetrics(new MetricRegistry(), Collections.emptyList()), clusterMap, findTokenHelper);

//Simple persist and retrieve should pass
diskTokenPersistor.persist(replicaId.getMountPath(), replicaTokenInfos);
List<RemoteReplicaInfo.ReplicaTokenInfo> retrievedReplicaTokenInfos =
diskTokenPersistor.retrieve(replicaId.getMountPath());

Assert.assertEquals("Number of tokens doesn't match.", replicaTokenInfos.size(), retrievedReplicaTokenInfos.size());
for (int i = 0; i < replicaTokenInfos.size(); i++) {
Assert.assertArrayEquals("Token is not correct.", replicaTokenInfos.get(i).getReplicaToken().toBytes(),
retrievedReplicaTokenInfos.get(i).getReplicaToken().toBytes());
}
}

/**
* Persist a replica token file with ReplicaTokenSerde version 0 (VERSION_0) and retrieve it with the current version (VERSION_1).
* This tests the case where a replica token file was persisted by an older version (VERSION_0) and new code is then deployed that bumps the serialization version to VERSION_1.
* @throws Exception if an exception happens
*/
@Test
public void testForVersion0AndCurrentVersionRetrieve() throws Exception {
DiskTokenPersistor diskTokenPersistor = new DiskTokenPersistor(REPLICA_TOKEN_FILENAME, mountPathToPartitionInfoList,
new ReplicationMetrics(new MetricRegistry(), Collections.emptyList()), clusterMap, findTokenHelper);

persistVersion0(replicaId.getMountPath(), replicaTokenInfos);
List<RemoteReplicaInfo.ReplicaTokenInfo> retrievedReplicaTokenInfos =
diskTokenPersistor.retrieve(replicaId.getMountPath());
Assert.assertEquals("Number of tokens doesn't match.", replicaTokenInfos.size(), retrievedReplicaTokenInfos.size());
for (int i = 0; i < replicaTokenInfos.size(); i++) {
Assert.assertArrayEquals("Token is not correct.", replicaTokenInfos.get(i).getReplicaToken().toBytes(),
retrievedReplicaTokenInfos.get(i).getReplicaToken().toBytes());
}
}

/**
* Persist token in VERSION_0 format.
* @param mountPath Path where persisted tokens will be saved.
* @param tokenInfoList {@link RemoteReplicaInfo.ReplicaTokenInfo} list to serialize.
* @throws IOException if an exception happens while persisting.
*/
private void persistVersion0(String mountPath, List<RemoteReplicaInfo.ReplicaTokenInfo> tokenInfoList)
throws IOException {
File temp = new File(mountPath, REPLICA_TOKEN_FILENAME + ".tmp");
File actual = new File(mountPath, REPLICA_TOKEN_FILENAME);
try (FileOutputStream fileStream = new FileOutputStream(temp)) {
serializeVersion0Tokens(tokenInfoList, fileStream);

// swap temp file with the original file
temp.renameTo(actual);
}
}

/**
* Serialize token in VERSION_0 format.
* @param tokenInfoList {@link RemoteReplicaInfo.ReplicaTokenInfo} list to serialize.
* @param outputStream {@link FileOutputStream} to persist the tokens to.
* @throws IOException if an exception happens during serialization.
*/
private void serializeVersion0Tokens(List<RemoteReplicaInfo.ReplicaTokenInfo> tokenInfoList,
FileOutputStream outputStream) throws IOException {
CrcOutputStream crcOutputStream = new CrcOutputStream(outputStream);
DataOutputStream writer = new DataOutputStream(crcOutputStream);
try {
// write the current version
writer.writeShort(0);
for (RemoteReplicaInfo.ReplicaTokenInfo replicaTokenInfo : tokenInfoList) {
writer.write(replicaTokenInfo.getPartitionId().getBytes());
// Write hostname
writer.writeInt(replicaTokenInfo.getHostname().getBytes().length);
writer.write(replicaTokenInfo.getHostname().getBytes());
// Write replica path
writer.writeInt(replicaTokenInfo.getReplicaPath().getBytes().length);
writer.write(replicaTokenInfo.getReplicaPath().getBytes());
// Write port
writer.writeInt(replicaTokenInfo.getPort());
//Write total bytes read from local store
writer.writeLong(replicaTokenInfo.getTotalBytesReadFromLocalStore());
// Write replica token
writer.write(replicaTokenInfo.getReplicaToken().toBytes());
}
long crcValue = crcOutputStream.getValue();
writer.writeLong(crcValue);
} finally {
if (outputStream != null) {
// flush and overwrite file
outputStream.getChannel().force(true);
}
}
}
}
@@ -2003,9 +2003,8 @@ private static void checkReplicaTokens(MockClusterMap clusterMap, DataNodeId dat
DataInputStream dataInputStream = new DataInputStream(crcStream);
try {
short version = dataInputStream.readShort();
assertEquals(0, version);
assertEquals(1, version);

System.out.println("setToCheck" + setToCheck.size());
while (dataInputStream.available() > 8) {
// read partition id
PartitionId partitionId = clusterMap.getPartitionIdFromStream(dataInputStream);
