diff --git a/ambry-api/src/main/java/com.github.ambry/clustermap/ClusterMapSnapshotConstants.java b/ambry-api/src/main/java/com.github.ambry/clustermap/ClusterMapSnapshotConstants.java index c337f763f4..133add28f7 100644 --- a/ambry-api/src/main/java/com.github.ambry/clustermap/ClusterMapSnapshotConstants.java +++ b/ambry-api/src/main/java/com.github.ambry/clustermap/ClusterMapSnapshotConstants.java @@ -68,4 +68,7 @@ public class ClusterMapSnapshotConstants { public static final String REPLICA_DISK = "disk"; // values public static final String REPLICA_STOPPED = "stopped"; + + //the virtual mount point for the cloud replica in the cluster map + public static final String CLOUD_REPLICA_MOUNT = "/vcr"; } diff --git a/ambry-api/src/main/java/com.github.ambry/clustermap/ReplicaId.java b/ambry-api/src/main/java/com.github.ambry/clustermap/ReplicaId.java index 5be7b9d65e..b50693beb2 100644 --- a/ambry-api/src/main/java/com.github.ambry/clustermap/ReplicaId.java +++ b/ambry-api/src/main/java/com.github.ambry/clustermap/ReplicaId.java @@ -89,4 +89,9 @@ public interface ReplicaId extends Resource { * @return true if this replica is in sealed state. */ boolean isSealed(); + + /** + * @return the {@code ReplicaType} for this replica. + */ + ReplicaType getReplicaType(); } diff --git a/ambry-api/src/main/java/com.github.ambry/clustermap/ReplicaType.java b/ambry-api/src/main/java/com.github.ambry/clustermap/ReplicaType.java new file mode 100644 index 0000000000..10b4684cc9 --- /dev/null +++ b/ambry-api/src/main/java/com.github.ambry/clustermap/ReplicaType.java @@ -0,0 +1,22 @@ +/* + * Copyright 2019 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ + +package com.github.ambry.clustermap; + +/** + * The type of replica. + */ +public enum ReplicaType { + DISK_BACKED, CLOUD_BACKED +} diff --git a/ambry-api/src/main/java/com.github.ambry/config/ReplicationConfig.java b/ambry-api/src/main/java/com.github.ambry/config/ReplicationConfig.java index 46c1165bed..e91e2d77eb 100644 --- a/ambry-api/src/main/java/com.github.ambry/config/ReplicationConfig.java +++ b/ambry-api/src/main/java/com.github.ambry/config/ReplicationConfig.java @@ -22,7 +22,14 @@ public class ReplicationConfig { */ @Config("replication.token.factory") @Default("com.github.ambry.store.StoreFindTokenFactory") - public final String replicationTokenFactory; + public final String replicationStoreTokenFactory; + + /** + * The factory class the replicatio uses to create cloud token + */ + @Config("replcation.cloudtoken.factory") + @Default("com.github.ambry.cloud.CloudFindTokenFactory") + public final String replicationCloudTokenFactory; /** * The number of replica threads on each server that runs the replication protocol for intra dc replication @@ -124,10 +131,19 @@ public class ReplicationConfig { @Default("false") public final boolean replicationTrackPerPartitionLagFromRemote; + /** + * The version of metadata request to be used for replication. + */ + @Config("replication.metadatarequest.version") + @Default("1") + public final short replicaMetadataRequestVersion; + public ReplicationConfig(VerifiableProperties verifiableProperties) { - replicationTokenFactory = + replicationStoreTokenFactory = verifiableProperties.getString("replication.token.factory", "com.github.ambry.store.StoreFindTokenFactory"); + replicationCloudTokenFactory = verifiableProperties.getString("replication.cloudtoken.factory", + "com.github.ambry.cloud.CloudFindTokenFactory"); replicationNumOfIntraDCReplicaThreads = verifiableProperties.getInt("replication.no.of.intra.dc.replica.threads", 1); replicationNumOfInterDCReplicaThreads = @@ -155,5 +171,7 @@ public ReplicationConfig(VerifiableProperties verifiableProperties) { verifiableProperties.getBoolean("replication.persist.token.on.shutdown.or.replica.remove", true); replicationTrackPerPartitionLagFromRemote = verifiableProperties.getBoolean("replication.track.per.partition.lag.from.remote", false); + replicaMetadataRequestVersion = + verifiableProperties.getShortInRange("replication.metadatarequest.version", (short) 1, (short) 1, (short) 2); } } diff --git a/ambry-api/src/main/java/com.github.ambry/store/FindToken.java b/ambry-api/src/main/java/com.github.ambry/replication/FindToken.java similarity index 74% rename from ambry-api/src/main/java/com.github.ambry/store/FindToken.java rename to ambry-api/src/main/java/com.github.ambry/replication/FindToken.java index f36e82b65e..ed04ade443 100644 --- a/ambry-api/src/main/java/com.github.ambry/store/FindToken.java +++ b/ambry-api/src/main/java/com.github.ambry/replication/FindToken.java @@ -11,7 +11,7 @@ * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */ -package com.github.ambry.store; +package com.github.ambry.replication; /** * The find token used to search entries in the store @@ -28,4 +28,16 @@ public interface FindToken { * @return The total bytes read so far until this token */ public long getBytesRead(); + + /** + * Returns the type of {@code FindToken} + * @return the type of the token + */ + public FindTokenType getType(); + + /** + * Returns the version of the {@link FindToken} + * @return the version of the {@link FindToken} + */ + public short getVersion(); } diff --git a/ambry-api/src/main/java/com.github.ambry/store/FindTokenFactory.java b/ambry-api/src/main/java/com.github.ambry/replication/FindTokenFactory.java similarity index 96% rename from ambry-api/src/main/java/com.github.ambry/store/FindTokenFactory.java rename to ambry-api/src/main/java/com.github.ambry/replication/FindTokenFactory.java index c060fcd9dd..a7eddafdbb 100644 --- a/ambry-api/src/main/java/com.github.ambry/store/FindTokenFactory.java +++ b/ambry-api/src/main/java/com.github.ambry/replication/FindTokenFactory.java @@ -11,7 +11,7 @@ * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */ -package com.github.ambry.store; +package com.github.ambry.replication; import java.io.DataInputStream; import java.io.IOException; diff --git a/ambry-api/src/main/java/com.github.ambry/replication/FindTokenHelper.java b/ambry-api/src/main/java/com.github.ambry/replication/FindTokenHelper.java new file mode 100644 index 0000000000..a31e09be57 --- /dev/null +++ b/ambry-api/src/main/java/com.github.ambry/replication/FindTokenHelper.java @@ -0,0 +1,68 @@ +/** + * Copyright 2019 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.replication; + +import com.github.ambry.clustermap.ReplicaType; +import com.github.ambry.config.ReplicationConfig; +import com.github.ambry.store.StoreKeyFactory; +import com.github.ambry.utils.Utils; +import java.util.HashMap; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Helper class to get findtoken based on replica type or input stream. + */ +public class FindTokenHelper { + private static final Logger logger = LoggerFactory.getLogger(FindTokenHelper.class); + private final StoreKeyFactory storeKeyFactory; + private final ReplicationConfig replicationConfig; + private final Map findTokenFactoryMap; + + public FindTokenHelper() { + storeKeyFactory = null; + replicationConfig = null; + findTokenFactoryMap = null; + } + + /** + * Create a {@code FindTokenHelper} object. + * @param storeKeyFactory + * @param replicationConfig + */ + public FindTokenHelper(StoreKeyFactory storeKeyFactory, ReplicationConfig replicationConfig) + throws ReflectiveOperationException { + this.storeKeyFactory = storeKeyFactory; + this.replicationConfig = replicationConfig; + findTokenFactoryMap = new HashMap<>(); + findTokenFactoryMap.put(ReplicaType.DISK_BACKED, + Utils.getObj(replicationConfig.replicationStoreTokenFactory, storeKeyFactory)); + findTokenFactoryMap.put(ReplicaType.CLOUD_BACKED, Utils.getObj(replicationConfig.replicationCloudTokenFactory)); + } + + /** + * Get {@code FindTokenFactory} object based on {@code ReplicaType} + * @param replicaType for which to get the {@code FindTokenFactory} object + * @return {@code FindTokenFactory} object. + * @throws ReflectiveOperationException + */ + public FindTokenFactory getFindTokenFactoryFromReplicaType(ReplicaType replicaType) { + if (!findTokenFactoryMap.containsKey(replicaType)) { + throw new IllegalArgumentException("Invalid replica type " + replicaType.getClass()); + } + return findTokenFactoryMap.get(replicaType); + } +} diff --git a/ambry-api/src/main/java/com.github.ambry/replication/FindTokenType.java b/ambry-api/src/main/java/com.github.ambry/replication/FindTokenType.java new file mode 100644 index 0000000000..b46e1efc09 --- /dev/null +++ b/ambry-api/src/main/java/com.github.ambry/replication/FindTokenType.java @@ -0,0 +1,21 @@ +/** + * Copyright 2019 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.replication; + +/** + * The type of replica token + */ +public enum FindTokenType { + Uninitialized, JournalBased, IndexBased, CloudBased; +} \ No newline at end of file diff --git a/ambry-api/src/main/java/com.github.ambry/store/FindInfo.java b/ambry-api/src/main/java/com.github.ambry/store/FindInfo.java index c19823e8c2..0204ccdf7d 100644 --- a/ambry-api/src/main/java/com.github.ambry/store/FindInfo.java +++ b/ambry-api/src/main/java/com.github.ambry/store/FindInfo.java @@ -13,6 +13,7 @@ */ package com.github.ambry.store; +import com.github.ambry.replication.FindToken; import java.util.List; diff --git a/ambry-api/src/main/java/com.github.ambry/store/Store.java b/ambry-api/src/main/java/com.github.ambry/store/Store.java index 066a0e405d..f000fe10d7 100644 --- a/ambry-api/src/main/java/com.github.ambry/store/Store.java +++ b/ambry-api/src/main/java/com.github.ambry/store/Store.java @@ -13,6 +13,7 @@ */ package com.github.ambry.store; +import com.github.ambry.replication.FindToken; import java.util.EnumSet; import java.util.List; import java.util.Set; diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBackupManager.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBackupManager.java index bdb30d2cee..01629b5ddd 100644 --- a/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBackupManager.java +++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBackupManager.java @@ -26,6 +26,7 @@ import com.github.ambry.config.VerifiableProperties; import com.github.ambry.network.ConnectionPool; import com.github.ambry.notification.NotificationSystem; +import com.github.ambry.replication.FindTokenFactory; import com.github.ambry.replication.PartitionInfo; import com.github.ambry.replication.RemoteReplicaInfo; import com.github.ambry.replication.ReplicationEngine; @@ -77,7 +78,7 @@ public CloudBackupManager(VerifiableProperties properties, CloudConfig cloudConf this.cloudDestination = cloudDestinationFactory.getCloudDestination(); this.persistor = new CloudTokenPersistor(replicaTokenFileName, mountPathToPartitionInfos, replicationMetrics, clusterMap, - factory, cloudDestination); + tokenHelper, cloudDestination); this.cloudStorageCompactor = new CloudStorageCompactor(cloudDestination, partitionToPartitionInfo.keySet(), vcrMetrics, false); } @@ -184,8 +185,10 @@ void addPartition(PartitionId partitionId) throws ReplicationException { // We need to ensure that a replica token gets persisted only after the corresponding data in the // store gets flushed to cloud. We use the store flush interval multiplied by a constant factor // to determine the token flush interval + FindTokenFactory findTokenFactory = + tokenHelper.getFindTokenFactoryFromReplicaType(peerReplica.getReplicaType()); RemoteReplicaInfo remoteReplicaInfo = - new RemoteReplicaInfo(peerReplica, cloudReplica, cloudStore, factory.getNewFindToken(), + new RemoteReplicaInfo(peerReplica, cloudReplica, cloudStore, findTokenFactory.getNewFindToken(), storeConfig.storeDataFlushIntervalSeconds * SystemTime.MsPerSec * Replication_Delay_Multiplier, SystemTime.getInstance(), peerReplica.getDataNodeId().getPortToConnectTo()); replicationMetrics.addMetricsForRemoteReplicaInfo(remoteReplicaInfo); @@ -243,5 +246,3 @@ public VcrMetrics getVcrMetrics() { return vcrMetrics; } } - - diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBlobStore.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBlobStore.java index d48746a087..8aff7ce35e 100644 --- a/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBlobStore.java +++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBlobStore.java @@ -20,8 +20,8 @@ import com.github.ambry.config.CloudConfig; import com.github.ambry.config.ClusterMapConfig; import com.github.ambry.config.VerifiableProperties; +import com.github.ambry.replication.FindToken; import com.github.ambry.store.FindInfo; -import com.github.ambry.store.FindToken; import com.github.ambry.store.MessageInfo; import com.github.ambry.store.MessageWriteSet; import com.github.ambry.store.Store; @@ -150,7 +150,7 @@ public void downloadBlob(BlobId blobId, OutputStream outputStream) throws StoreE try { cloudDestination.downloadBlob(blobId, outputStream); } catch (CloudStorageException e) { - throw new StoreException("Error occured in downloading blob for blobid :" + blobId, StoreErrorCodes.IOError); + throw new StoreException("Error occurred in downloading blob for blobid :" + blobId, StoreErrorCodes.IOError); } } diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudFindToken.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudFindToken.java index 4180687af6..7492443a99 100644 --- a/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudFindToken.java +++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudFindToken.java @@ -13,7 +13,10 @@ */ package com.github.ambry.cloud; -import com.github.ambry.store.FindToken; +import com.github.ambry.replication.FindToken; +import com.github.ambry.replication.FindTokenType; +import java.io.DataInputStream; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; import java.util.Objects; @@ -27,6 +30,7 @@ public class CloudFindToken implements FindToken { static final short VERSION_0 = 0; static final short CURRENT_VERSION = VERSION_0; private final short version; + private final FindTokenType type; private final long latestUploadTime; private final String latestBlobId; private final long bytesRead; @@ -39,11 +43,21 @@ public CloudFindToken() { /** Constructor for in-progress token */ public CloudFindToken(long latestUploadTime, String latestBlobId, long bytesRead) { this.version = CURRENT_VERSION; + this.type = FindTokenType.CloudBased; this.latestUploadTime = latestUploadTime; this.latestBlobId = latestBlobId; this.bytesRead = bytesRead; } + /** Constructor for reading token that can have older version*/ + public CloudFindToken(short version, long latestUploadTime, String latestBlobId, long bytesRead) { + this.version = version; + this.type = FindTokenType.CloudBased; + this.latestBlobId = latestBlobId; + this.latestUploadTime = latestUploadTime; + this.bytesRead = bytesRead; + } + /** * Utility to construct a new CloudFindToken from a previous instance and the results of a findEntriesSince query. * @param prevToken the previous CloudFindToken. @@ -56,7 +70,8 @@ public static CloudFindToken getUpdatedToken(CloudFindToken prevToken, List> partitionGroupedByMountPath, - ReplicationMetrics replicationMetrics, ClusterMap clusterMap, FindTokenFactory tokenFactory, + ReplicationMetrics replicationMetrics, ClusterMap clusterMap, FindTokenHelper tokenHelper, CloudDestination cloudDestination) { - super(partitionGroupedByMountPath, replicationMetrics, clusterMap, tokenFactory); + super(partitionGroupedByMountPath, replicationMetrics, clusterMap, tokenHelper); this.replicaTokenFileName = replicaTokenFileName; this.cloudDestination = cloudDestination; } diff --git a/ambry-cloud/src/test/java/com.github.ambry.cloud/CloudBlobStoreTest.java b/ambry-cloud/src/test/java/com.github.ambry.cloud/CloudBlobStoreTest.java index 1024b198a2..fac10af34a 100644 --- a/ambry-cloud/src/test/java/com.github.ambry.cloud/CloudBlobStoreTest.java +++ b/ambry-cloud/src/test/java/com.github.ambry.cloud/CloudBlobStoreTest.java @@ -32,14 +32,15 @@ import com.github.ambry.network.Port; import com.github.ambry.network.PortType; import com.github.ambry.replication.BlobIdTransformer; +import com.github.ambry.replication.FindToken; import com.github.ambry.replication.MockConnectionPool; import com.github.ambry.replication.MockFindToken; +import com.github.ambry.replication.MockFindTokenHelper; import com.github.ambry.replication.MockHost; import com.github.ambry.replication.RemoteReplicaInfo; import com.github.ambry.replication.ReplicaThread; import com.github.ambry.replication.ReplicationMetrics; import com.github.ambry.store.FindInfo; -import com.github.ambry.store.FindToken; import com.github.ambry.store.MessageInfo; import com.github.ambry.store.MockMessageWriteSet; import com.github.ambry.store.MockStoreKeyConverterFactory; @@ -457,10 +458,10 @@ public void testPutWithTtl() throws Exception { // Create ReplicaThread and add RemoteReplicaInfo to it. ReplicationMetrics replicationMetrics = new ReplicationMetrics(new MetricRegistry(), Collections.emptyList()); ReplicaThread replicaThread = - new ReplicaThread("threadtest", new MockFindToken.MockFindTokenFactory(), clusterMap, new AtomicInteger(0), - cloudDataNode, connectionPool, replicationConfig, replicationMetrics, null, storeKeyConverter, transformer, - clusterMap.getMetricRegistry(), false, cloudDataNode.getDatacenterName(), new ResponseHandler(clusterMap), - new MockTime()); + new ReplicaThread("threadtest", new MockFindTokenHelper(storeKeyFactory, replicationConfig), clusterMap, + new AtomicInteger(0), cloudDataNode, connectionPool, replicationConfig, replicationMetrics, null, + storeKeyConverter, transformer, clusterMap.getMetricRegistry(), false, cloudDataNode.getDatacenterName(), + new ResponseHandler(clusterMap), new MockTime()); for (ReplicaId replica : partitionId.getReplicaIds()) { if (replica.getDataNodeId() == remoteHost.dataNodeId) { diff --git a/ambry-cloud/src/test/java/com.github.ambry.cloud/CloudFindTokenFactoryTest.java b/ambry-cloud/src/test/java/com.github.ambry.cloud/CloudFindTokenFactoryTest.java new file mode 100644 index 0000000000..d64424001f --- /dev/null +++ b/ambry-cloud/src/test/java/com.github.ambry.cloud/CloudFindTokenFactoryTest.java @@ -0,0 +1,60 @@ +/** + * Copyright 2019 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.cloud; + +import com.github.ambry.replication.FindTokenType; +import com.github.ambry.utils.UtilsTest; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.util.Random; +import org.junit.Test; + +import static org.junit.Assert.*; + + +/** + * Test for {@link CloudFindTokenFactory} + */ +public class CloudFindTokenFactoryTest { + + /** + * test get find token from stream + * @throws IOException if an IO exception happens during deserialization + */ + @Test + public void getFindTokenTest() throws IOException { + short version = 0; + FindTokenType findTokenType = FindTokenType.CloudBased; + Random random = new Random(); + long latestBlobUploadTime = random.nextLong(); + String latestBlobId = UtilsTest.getRandomString(10); + long bytesRead = random.nextLong(); + + CloudFindToken cloudFindToken1 = new CloudFindToken(version, latestBlobUploadTime, latestBlobId, bytesRead); + DataInputStream stream = new DataInputStream(new ByteArrayInputStream(cloudFindToken1.toBytes())); + CloudFindToken cloudFindToken2 = (CloudFindToken) new CloudFindTokenFactory().getFindToken(stream); + assertEquals("incorrect token returned from factory", cloudFindToken1, cloudFindToken2); + } + + /** + * test get new find token + */ + @Test + public void getNewFindTokenTest() { + CloudFindToken cloudFindToken1 = (CloudFindToken) new CloudFindTokenFactory().getNewFindToken(); + CloudFindToken cloudFindToken2 = new CloudFindToken(); + assertEquals("tokens should be equal", cloudFindToken1, cloudFindToken2); + } +} diff --git a/ambry-cloud/src/test/java/com.github.ambry.cloud/CloudFindTokenTest.java b/ambry-cloud/src/test/java/com.github.ambry.cloud/CloudFindTokenTest.java new file mode 100644 index 0000000000..ab24c8bdd6 --- /dev/null +++ b/ambry-cloud/src/test/java/com.github.ambry.cloud/CloudFindTokenTest.java @@ -0,0 +1,169 @@ +/** + * Copyright 2019 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.cloud; + +import com.github.ambry.replication.FindTokenType; +import com.github.ambry.utils.UtilsTest; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Random; +import org.junit.Test; + +import static org.junit.Assert.*; + + +/** + * Test for {@link CloudFindToken} + */ +public class CloudFindTokenTest { + + /** + * Test for correctness of {@code CloudFindToken#equals(Object)} + */ + @Test + public void equalityTest() { + short version = 0; + FindTokenType findTokenType = FindTokenType.CloudBased; + Random random = new Random(); + long latestBlobUploadTime = random.nextLong(); + String latestBlobId = UtilsTest.getRandomString(10); + long bytesRead = random.nextLong(); + + //compare empty tokens + ensureEqual(new CloudFindToken(), new CloudFindToken()); + + //compare token constructed from all constructors + CloudFindToken token1 = new CloudFindToken(latestBlobUploadTime, latestBlobId, bytesRead); + CloudFindToken token2 = new CloudFindToken(latestBlobUploadTime, latestBlobId, bytesRead); + ensureEqual(token1, token2); + + token1 = new CloudFindToken(version, latestBlobUploadTime, latestBlobId, bytesRead); + token2 = new CloudFindToken(version, latestBlobUploadTime, latestBlobId, bytesRead); + ensureEqual(token1, token2); + + //ensure inequality for any unequal field + token2 = new CloudFindToken((short) 1, latestBlobUploadTime, latestBlobId, bytesRead); + ensureUnequal(token1, token2); + + token2 = new CloudFindToken(version, latestBlobUploadTime + 100, latestBlobId, bytesRead); + ensureUnequal(token1, token2); + + token2 = new CloudFindToken(version, latestBlobUploadTime, "", bytesRead); + ensureUnequal(token1, token2); + + token2 = new CloudFindToken(version, latestBlobUploadTime, latestBlobId, bytesRead + 10); + ensureUnequal(token1, token2); + + token2 = new CloudFindToken(); + ensureUnequal(token1, token2); + } + + /** + * Test for serialization and deserialization of cloud token + * @throws IOException if an IO exception happens during deserialization + */ + @Test + public void serdeTest() throws IOException { + short version = 0; + FindTokenType findTokenType = FindTokenType.CloudBased; + Random random = new Random(); + long latestBlobUploadTime = random.nextLong(); + String latestBlobId = UtilsTest.getRandomString(10); + long bytesRead = random.nextLong(); + + //Deserialization test + + //token with invalid version + CloudFindToken invalidToken = new CloudFindToken((short) 1, latestBlobUploadTime, latestBlobId, bytesRead); + DataInputStream tokenStream = getSerializedStream(invalidToken); + try { + CloudFindToken deSerToken = CloudFindToken.fromBytes(tokenStream); + fail("deserialization of token with invalid version should have failed"); + } catch (IllegalStateException ise) { + } + + //valid token + CloudFindToken token = new CloudFindToken(version, latestBlobUploadTime, latestBlobId, bytesRead); + tokenStream = getSerializedStream(token); + CloudFindToken deSerToken = CloudFindToken.fromBytes(tokenStream); + assertEquals("Stream should have ended ", 0, tokenStream.available()); + assertEquals(token, deSerToken); + + //Serialization test + + //token with invalid version + DataInputStream serializedStream = getSerializedStream(invalidToken); + try { + deSerToken = CloudFindToken.fromBytes(serializedStream); + fail("serialization of token with invalid version should have failed"); + } catch (IllegalStateException ise) { + } + + //valid token + serializedStream = new DataInputStream(new ByteArrayInputStream(token.toBytes())); + deSerToken = CloudFindToken.fromBytes(serializedStream); + assertEquals("Stream should have ended ", 0, serializedStream.available()); + assertEquals(token, deSerToken); + } + + /** + * helper to ensure that token passed are equal + * @param token1 + * @param token2 + */ + private void ensureEqual(CloudFindToken token1, CloudFindToken token2) { + assertEquals("Tokens should match", token1, token2); + assertEquals("Hashcode of tokens should match", token1.hashCode(), token2.hashCode()); + } + + /** + * helper to ensure that token passed are not equal + * @param token1 + * @param token2 + */ + private void ensureUnequal(CloudFindToken token1, CloudFindToken token2) { + assertFalse("Tokens should match", token1.equals(token2)); + } + + /** + * helper to seriliaze token. + * @param token {@code CloudFindToken} object to serialize + * @return DataInputStream serialized stream + */ + private DataInputStream getSerializedStream(CloudFindToken token) { + int size = 3 * Short.BYTES + 2 * Long.BYTES; + if (token.getLatestBlobId() != null) { + size += token.getLatestBlobId().length(); + } + byte[] buf = new byte[size]; + ByteBuffer bufWrap = ByteBuffer.wrap(buf); + // add version + bufWrap.putShort(token.getVersion()); + // add type + bufWrap.putShort((short) token.getType().ordinal()); + // add latestUploadTime + bufWrap.putLong(token.getLatestUploadTime()); + // add bytesRead + bufWrap.putLong(token.getBytesRead()); + if (token.getLatestBlobId() != null) { + bufWrap.putShort((short) token.getLatestBlobId().length()); + bufWrap.put(token.getLatestBlobId().getBytes()); + } else { + bufWrap.putShort((short) 0); + } + return new DataInputStream(new ByteArrayInputStream(buf)); + } +} diff --git a/ambry-cloud/src/test/java/com.github.ambry.cloud/CloudTokenPersistorTest.java b/ambry-cloud/src/test/java/com.github.ambry.cloud/CloudTokenPersistorTest.java index 503556a330..50938c4a56 100644 --- a/ambry-cloud/src/test/java/com.github.ambry.cloud/CloudTokenPersistorTest.java +++ b/ambry-cloud/src/test/java/com.github.ambry.cloud/CloudTokenPersistorTest.java @@ -22,7 +22,9 @@ import com.github.ambry.commons.BlobIdFactory; import com.github.ambry.config.CloudConfig; import com.github.ambry.config.ClusterMapConfig; +import com.github.ambry.config.ReplicationConfig; import com.github.ambry.config.VerifiableProperties; +import com.github.ambry.replication.FindTokenHelper; import com.github.ambry.replication.PartitionInfo; import com.github.ambry.replication.RemoteReplicaInfo; import com.github.ambry.replication.ReplicationMetrics; @@ -51,7 +53,8 @@ public void basicTest() throws Exception { ClusterMap clusterMap = new MockClusterMap(); DataNodeId dataNodeId = new CloudDataNode(cloudConfig, clusterMapConfig); Map> mountPathToPartitionInfoList = new HashMap<>(); - StoreFindTokenFactory factory = new StoreFindTokenFactory(new BlobIdFactory(clusterMap)); + BlobIdFactory blobIdFactory = new BlobIdFactory(clusterMap); + StoreFindTokenFactory factory = new StoreFindTokenFactory(blobIdFactory); PartitionId partitionId = clusterMap.getAllPartitionIds(null).get(0); ReplicaId cloudReplicaId = new CloudReplica(cloudConfig, partitionId, dataNodeId); @@ -72,14 +75,15 @@ public void basicTest() throws Exception { LatchBasedInMemoryCloudDestination cloudDestination = new LatchBasedInMemoryCloudDestination(Collections.emptyList()); + ReplicationConfig replicationConfig = new ReplicationConfig(new VerifiableProperties(props)); CloudTokenPersistor cloudTokenPersistor = new CloudTokenPersistor("replicaTokens", mountPathToPartitionInfoList, - new ReplicationMetrics(new MetricRegistry(), Collections.emptyList()), clusterMap, factory, cloudDestination); + new ReplicationMetrics(new MetricRegistry(), Collections.emptyList()), clusterMap, + new FindTokenHelper(blobIdFactory, replicationConfig), cloudDestination); cloudTokenPersistor.persist(cloudReplicaId.getMountPath(), replicaTokenInfos); List retrievedReplicaTokenInfos = cloudTokenPersistor.retrieve(cloudReplicaId.getMountPath()); - Assert.assertEquals("Number of tokens doesn't match.", replicaTokenInfos.size(), - retrievedReplicaTokenInfos.size()); + Assert.assertEquals("Number of tokens doesn't match.", replicaTokenInfos.size(), retrievedReplicaTokenInfos.size()); for (int i = 0; i < replicaTokenInfos.size(); i++) { Assert.assertArrayEquals("Token is not correct.", replicaTokenInfos.get(i).getReplicaToken().toBytes(), retrievedReplicaTokenInfos.get(i).getReplicaToken().toBytes()); diff --git a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/AmbryReplica.java b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/AmbryReplica.java index 7730b0c900..4ea174f77b 100644 --- a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/AmbryReplica.java +++ b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/AmbryReplica.java @@ -164,6 +164,11 @@ public void markDiskUp() { disk.onDiskOk(); } + @Override + public ReplicaType getReplicaType() { + return ReplicaType.DISK_BACKED; + } + /** * Take actions, if any, when this replica is unavailable. */ diff --git a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/Replica.java b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/Replica.java index 8a868fc3f5..00e3b3a509 100644 --- a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/Replica.java +++ b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/Replica.java @@ -38,6 +38,7 @@ class Replica implements ReplicaId { private Disk disk; private volatile boolean isStopped = false; private final ResourceStatePolicy resourceStatePolicy; + private final ReplicaType replicaType; private Logger logger = LoggerFactory.getLogger(getClass()); @@ -57,6 +58,11 @@ class Replica implements ReplicaId { throw new IllegalStateException("Error creating resource state policy when instantiating a replica " + partition, e); } + if (disk.getMountPath().startsWith(CLOUD_REPLICA_MOUNT)) { + replicaType = ReplicaType.CLOUD_BACKED; + } else { + replicaType = ReplicaType.DISK_BACKED; + } validate(); } @@ -146,6 +152,11 @@ public void markDiskUp() { disk.onDiskOk(); } + @Override + public ReplicaType getReplicaType() { + return replicaType; + } + Partition getPartition() { return partition; } diff --git a/ambry-clustermap/src/test/java/com.github.ambry.clustermap/MockReplicaId.java b/ambry-clustermap/src/test/java/com.github.ambry.clustermap/MockReplicaId.java index b7e975694f..7a4f9735c8 100644 --- a/ambry-clustermap/src/test/java/com.github.ambry.clustermap/MockReplicaId.java +++ b/ambry-clustermap/src/test/java/com.github.ambry.clustermap/MockReplicaId.java @@ -146,6 +146,11 @@ public void markDiskUp() { diskId.onDiskOk(); } + @Override + public ReplicaType getReplicaType() { + return ReplicaType.DISK_BACKED; + } + public void cleanup() { File replicaDir = new File(replicaPath); File[] replicaDirFiles = replicaDir.listFiles(); diff --git a/ambry-protocol/src/main/java/com.github.ambry.protocol/ReplicaMetadataRequest.java b/ambry-protocol/src/main/java/com.github.ambry.protocol/ReplicaMetadataRequest.java index 81af23a838..5a2b56ab7b 100644 --- a/ambry-protocol/src/main/java/com.github.ambry.protocol/ReplicaMetadataRequest.java +++ b/ambry-protocol/src/main/java/com.github.ambry.protocol/ReplicaMetadataRequest.java @@ -14,7 +14,7 @@ package com.github.ambry.protocol; import com.github.ambry.clustermap.ClusterMap; -import com.github.ambry.store.FindTokenFactory; +import com.github.ambry.replication.FindTokenHelper; import com.github.ambry.utils.Utils; import java.io.DataInputStream; import java.io.IOException; @@ -34,14 +34,17 @@ public class ReplicaMetadataRequest extends RequestOrResponse { private static final int Max_Entries_Size_In_Bytes = 8; private static final int Replica_Metadata_Request_Info_List_Size_In_Bytes = 4; - private static final short Replica_Metadata_Request_Version_V1 = 1; + public static final short Replica_Metadata_Request_Version_V1 = 1; + public static final short Replica_Metadata_Request_Version_V2 = 2; public ReplicaMetadataRequest(int correlationId, String clientId, - List replicaMetadataRequestInfoList, long maxTotalSizeOfEntriesInBytes) { - super(RequestOrResponseType.ReplicaMetadataRequest, Replica_Metadata_Request_Version_V1, correlationId, clientId); + List replicaMetadataRequestInfoList, long maxTotalSizeOfEntriesInBytes, + short version) { + super(RequestOrResponseType.ReplicaMetadataRequest, version, correlationId, clientId); if (replicaMetadataRequestInfoList == null) { throw new IllegalArgumentException("replicaMetadataRequestInfoList cannot be null"); } + validateVersion(version); this.replicaMetadataRequestInfoList = replicaMetadataRequestInfoList; this.maxTotalSizeOfEntriesInBytes = maxTotalSizeOfEntriesInBytes; this.replicaMetadataRequestInfoListSizeInBytes = 0; @@ -50,10 +53,10 @@ public ReplicaMetadataRequest(int correlationId, String clientId, } } - public static ReplicaMetadataRequest readFrom(DataInputStream stream, ClusterMap clusterMap, FindTokenFactory factory) - throws IOException { - RequestOrResponseType type = RequestOrResponseType.ReplicaMetadataRequest; + public static ReplicaMetadataRequest readFrom(DataInputStream stream, ClusterMap clusterMap, + FindTokenHelper findTokenHelper) throws IOException { Short versionId = stream.readShort(); + validateVersion(versionId); int correlationId = stream.readInt(); String clientId = Utils.readIntString(stream); int replicaMetadataRequestInfoListCount = stream.readInt(); @@ -61,12 +64,13 @@ public static ReplicaMetadataRequest readFrom(DataInputStream stream, ClusterMap new ArrayList(replicaMetadataRequestInfoListCount); for (int i = 0; i < replicaMetadataRequestInfoListCount; i++) { ReplicaMetadataRequestInfo replicaMetadataRequestInfo = - ReplicaMetadataRequestInfo.readFrom(stream, clusterMap, factory); + ReplicaMetadataRequestInfo.readFrom(stream, clusterMap, findTokenHelper, versionId); replicaMetadataRequestInfoList.add(replicaMetadataRequestInfo); } long maxTotalSizeOfEntries = stream.readLong(); // ignore version for now - return new ReplicaMetadataRequest(correlationId, clientId, replicaMetadataRequestInfoList, maxTotalSizeOfEntries); + return new ReplicaMetadataRequest(correlationId, clientId, replicaMetadataRequestInfoList, maxTotalSizeOfEntries, + versionId); } public List getReplicaMetadataRequestInfoList() { @@ -116,4 +120,10 @@ public String toString() { sb.append("]"); return sb.toString(); } + + static void validateVersion(short version) { + if (version < Replica_Metadata_Request_Version_V1 || version > Replica_Metadata_Request_Version_V2) { + throw new IllegalArgumentException("Invalid replicametadata request version: " + version); + } + } } diff --git a/ambry-protocol/src/main/java/com.github.ambry.protocol/ReplicaMetadataRequestInfo.java b/ambry-protocol/src/main/java/com.github.ambry.protocol/ReplicaMetadataRequestInfo.java index 3ee4c9c3f6..2e6a7c92f7 100644 --- a/ambry-protocol/src/main/java/com.github.ambry.protocol/ReplicaMetadataRequestInfo.java +++ b/ambry-protocol/src/main/java/com.github.ambry.protocol/ReplicaMetadataRequestInfo.java @@ -15,8 +15,10 @@ import com.github.ambry.clustermap.ClusterMap; import com.github.ambry.clustermap.PartitionId; -import com.github.ambry.store.FindToken; -import com.github.ambry.store.FindTokenFactory; +import com.github.ambry.clustermap.ReplicaType; +import com.github.ambry.replication.FindToken; +import com.github.ambry.replication.FindTokenFactory; +import com.github.ambry.replication.FindTokenHelper; import com.github.ambry.utils.Utils; import java.io.DataInputStream; import java.io.IOException; @@ -34,14 +36,18 @@ public class ReplicaMetadataRequestInfo { private FindToken token; private String hostName; private String replicaPath; + private ReplicaType replicaType; private PartitionId partitionId; + private short requestVersion; private static final int ReplicaPath_Field_Size_In_Bytes = 4; private static final int HostName_Field_Size_In_Bytes = 4; + private static final int ReplicaType_Size_In_Bytes = Short.BYTES; private final Logger logger = LoggerFactory.getLogger(getClass()); - public ReplicaMetadataRequestInfo(PartitionId partitionId, FindToken token, String hostName, String replicaPath) { + public ReplicaMetadataRequestInfo(PartitionId partitionId, FindToken token, String hostName, String replicaPath, + ReplicaType replicaType, short requestVersion) { if (partitionId == null || token == null || hostName == null || replicaPath == null) { throw new IllegalArgumentException( "A parameter in the replica metadata request is null: " + "[Partition: " + partitionId + ", token: " + token @@ -51,27 +57,45 @@ public ReplicaMetadataRequestInfo(PartitionId partitionId, FindToken token, Stri this.token = token; this.hostName = hostName; this.replicaPath = replicaPath; + this.replicaType = replicaType; + this.requestVersion = requestVersion; + ReplicaMetadataRequest.validateVersion(this.requestVersion); } public static ReplicaMetadataRequestInfo readFrom(DataInputStream stream, ClusterMap clusterMap, - FindTokenFactory factory) throws IOException { + FindTokenHelper findTokenHelper, short requestVersion) throws IOException { String hostName = Utils.readIntString(stream); String replicaPath = Utils.readIntString(stream); + ReplicaType replicaType; + if (requestVersion == ReplicaMetadataRequest.Replica_Metadata_Request_Version_V2) { + replicaType = ReplicaType.values()[stream.readShort()]; + } else { + //before version 2 we only have disk based replicas + replicaType = ReplicaType.DISK_BACKED; + } PartitionId partitionId = clusterMap.getPartitionIdFromStream(stream); - FindToken token = factory.getFindToken(stream); - return new ReplicaMetadataRequestInfo(partitionId, token, hostName, replicaPath); + FindTokenFactory findTokenFactory = findTokenHelper.getFindTokenFactoryFromReplicaType(replicaType); + FindToken token = findTokenFactory.getFindToken(stream); + return new ReplicaMetadataRequestInfo(partitionId, token, hostName, replicaPath, replicaType, requestVersion); } public void writeTo(ByteBuffer buffer) { Utils.serializeString(buffer, hostName, Charset.defaultCharset()); Utils.serializeString(buffer, replicaPath, Charset.defaultCharset()); + if (requestVersion == ReplicaMetadataRequest.Replica_Metadata_Request_Version_V2) { + buffer.putShort((short) replicaType.ordinal()); + } buffer.put(partitionId.getBytes()); buffer.put(token.toBytes()); } public long sizeInBytes() { - return HostName_Field_Size_In_Bytes + hostName.getBytes().length + ReplicaPath_Field_Size_In_Bytes + long size = HostName_Field_Size_In_Bytes + hostName.getBytes().length + ReplicaPath_Field_Size_In_Bytes + replicaPath.getBytes().length + +partitionId.getBytes().length + token.toBytes().length; + if (requestVersion == ReplicaMetadataRequest.Replica_Metadata_Request_Version_V2) { + size += ReplicaType_Size_In_Bytes; + } + return size; } public String toString() { @@ -79,7 +103,8 @@ public String toString() { sb.append("[Token=").append(token); sb.append(", ").append(" PartitionId=").append(partitionId); sb.append(", ").append(" HostName=").append(hostName); - sb.append(", ").append(" ReplicaPath=").append(replicaPath).append("]"); + sb.append(", ").append(" ReplicaPath=").append(replicaPath); + sb.append(", ").append(" ReplicaType=").append(replicaType).append("]"); return sb.toString(); } @@ -98,4 +123,8 @@ public String getReplicaPath() { public PartitionId getPartitionId() { return partitionId; } + + public ReplicaType getReplicaType() { + return replicaType; + } } diff --git a/ambry-protocol/src/main/java/com.github.ambry.protocol/ReplicaMetadataResponse.java b/ambry-protocol/src/main/java/com.github.ambry.protocol/ReplicaMetadataResponse.java index 931641c53e..90b216ddfe 100644 --- a/ambry-protocol/src/main/java/com.github.ambry.protocol/ReplicaMetadataResponse.java +++ b/ambry-protocol/src/main/java/com.github.ambry.protocol/ReplicaMetadataResponse.java @@ -15,7 +15,7 @@ import com.github.ambry.clustermap.ClusterMap; import com.github.ambry.commons.ServerErrorCode; -import com.github.ambry.store.FindTokenFactory; +import com.github.ambry.replication.FindTokenHelper; import com.github.ambry.utils.Utils; import java.io.DataInputStream; import java.io.IOException; @@ -36,17 +36,17 @@ public class ReplicaMetadataResponse extends Response { private static int Replica_Metadata_Response_Info_List_Size_In_Bytes = 4; - static final short REPLICA_METADATA_RESPONSE_VERSION_V_1 = 1; - static final short REPLICA_METADATA_RESPONSE_VERSION_V_2 = 2; - static final short REPLICA_METADATA_RESPONSE_VERSION_V_3 = 3; - static final short REPLICA_METADATA_RESPONSE_VERSION_V_4 = 4; - static final short REPLICA_METADATA_RESPONSE_VERSION_V_5 = 5; - - static short CURRENT_VERSION = REPLICA_METADATA_RESPONSE_VERSION_V_5; + public static final short REPLICA_METADATA_RESPONSE_VERSION_V_1 = 1; + public static final short REPLICA_METADATA_RESPONSE_VERSION_V_2 = 2; + public static final short REPLICA_METADATA_RESPONSE_VERSION_V_3 = 3; + public static final short REPLICA_METADATA_RESPONSE_VERSION_V_4 = 4; + public static final short REPLICA_METADATA_RESPONSE_VERSION_V_5 = 5; + public static final short REPLICA_METADATA_RESPONSE_VERSION_V_6 = 6; public ReplicaMetadataResponse(int correlationId, String clientId, ServerErrorCode error, - List replicaMetadataResponseInfoList) { - super(RequestOrResponseType.ReplicaMetadataResponse, CURRENT_VERSION, correlationId, clientId, error); + List replicaMetadataResponseInfoList, short version) { + super(RequestOrResponseType.ReplicaMetadataResponse, version, correlationId, clientId, error); + validateVersion(version); this.replicaMetadataResponseInfoList = replicaMetadataResponseInfoList; this.replicaMetadataResponseInfoListSizeInBytes = 0; for (ReplicaMetadataResponseInfo replicaMetadataResponseInfo : replicaMetadataResponseInfoList) { @@ -54,8 +54,9 @@ public ReplicaMetadataResponse(int correlationId, String clientId, ServerErrorCo } } - public ReplicaMetadataResponse(int correlationId, String clientId, ServerErrorCode error) { - super(RequestOrResponseType.ReplicaMetadataResponse, CURRENT_VERSION, correlationId, clientId, error); + public ReplicaMetadataResponse(int correlationId, String clientId, ServerErrorCode error, short version) { + super(RequestOrResponseType.ReplicaMetadataResponse, version, correlationId, clientId, error); + validateVersion(version); replicaMetadataResponseInfoList = null; replicaMetadataResponseInfoListSizeInBytes = 0; } @@ -64,8 +65,8 @@ public List getReplicaMetadataResponseInfoList() { return replicaMetadataResponseInfoList; } - public static ReplicaMetadataResponse readFrom(DataInputStream stream, FindTokenFactory factory, - ClusterMap clusterMap) throws IOException { + public static ReplicaMetadataResponse readFrom(DataInputStream stream, FindTokenHelper helper, ClusterMap clusterMap) + throws IOException { RequestOrResponseType type = RequestOrResponseType.values()[stream.readShort()]; if (type != RequestOrResponseType.ReplicaMetadataResponse) { throw new IllegalArgumentException("The type of request response is not compatible"); @@ -79,14 +80,13 @@ public static ReplicaMetadataResponse readFrom(DataInputStream stream, FindToken new ArrayList(replicaMetadataResponseInfoListCount); for (int i = 0; i < replicaMetadataResponseInfoListCount; i++) { ReplicaMetadataResponseInfo replicaMetadataResponseInfo = - ReplicaMetadataResponseInfo.readFrom(stream, factory, clusterMap, versionId); + ReplicaMetadataResponseInfo.readFrom(stream, helper, clusterMap, versionId); replicaMetadataResponseInfoList.add(replicaMetadataResponseInfo); } if (error != ServerErrorCode.No_Error) { - return new ReplicaMetadataResponse(correlationId, clientId, error); + return new ReplicaMetadataResponse(correlationId, clientId, error, versionId); } else { - // ignore version for now - return new ReplicaMetadataResponse(correlationId, clientId, error, replicaMetadataResponseInfoList); + return new ReplicaMetadataResponse(correlationId, clientId, error, replicaMetadataResponseInfoList, versionId); } } @@ -135,9 +135,29 @@ public String toString() { } /** - * @return the current version in which new ReplicaMetadataResponse objects are created. + * validate that the version is valid. + * @param version to validate */ - static short getCurrentVersion() { - return CURRENT_VERSION; + static void validateVersion(short version) { + if (version < REPLICA_METADATA_RESPONSE_VERSION_V_1 || version > REPLICA_METADATA_RESPONSE_VERSION_V_6) { + throw new IllegalArgumentException("Invalid replica metadata response version: " + version); + } + } + + /** + * Get the compatible response version for the given request version. + * @param requestVersion for which to get the compatible response version. + * @return compatible responseVersion + */ + public static short getCompatibleResponseVersion(short requestVersion) { + switch (requestVersion) { + case ReplicaMetadataRequest.Replica_Metadata_Request_Version_V1: + return REPLICA_METADATA_RESPONSE_VERSION_V_5; + case ReplicaMetadataRequest.Replica_Metadata_Request_Version_V2: + return REPLICA_METADATA_RESPONSE_VERSION_V_6; + default: + throw new IllegalArgumentException("Invalid replica metadata request version: " + requestVersion + + " No compatible replica metadata response version found"); + } } } diff --git a/ambry-protocol/src/main/java/com.github.ambry.protocol/ReplicaMetadataResponseInfo.java b/ambry-protocol/src/main/java/com.github.ambry.protocol/ReplicaMetadataResponseInfo.java index ef1424d632..2ee670bd66 100644 --- a/ambry-protocol/src/main/java/com.github.ambry.protocol/ReplicaMetadataResponseInfo.java +++ b/ambry-protocol/src/main/java/com.github.ambry.protocol/ReplicaMetadataResponseInfo.java @@ -15,9 +15,11 @@ import com.github.ambry.clustermap.ClusterMap; import com.github.ambry.clustermap.PartitionId; +import com.github.ambry.clustermap.ReplicaType; import com.github.ambry.commons.ServerErrorCode; -import com.github.ambry.store.FindToken; -import com.github.ambry.store.FindTokenFactory; +import com.github.ambry.replication.FindToken; +import com.github.ambry.replication.FindTokenFactory; +import com.github.ambry.replication.FindTokenHelper; import com.github.ambry.store.MessageInfo; import java.io.DataInputStream; import java.io.IOException; @@ -35,20 +37,24 @@ public class ReplicaMetadataResponseInfo { private final int messageInfoListSize; private final long remoteReplicaLagInBytes; private final PartitionId partitionId; + private final ReplicaType replicaType; private final ServerErrorCode errorCode; + private final short responseVersion; private long totalSizeOfAllMessages = 0; private static final int Error_Size_InBytes = 2; private static final int Remote_Replica_Lag_Size_In_Bytes = 8; + private static final int ReplicaType_Size_In_Bytes = Short.BYTES; - private ReplicaMetadataResponseInfo(PartitionId partitionId, FindToken findToken, List messageInfoList, - long remoteReplicaLagInBytes, short replicaMetadataResponseVersion) { + public ReplicaMetadataResponseInfo(PartitionId partitionId, ReplicaType replicaType, FindToken findToken, + List messageInfoList, long remoteReplicaLagInBytes, short replicaMetadataResponseVersion) { if (partitionId == null || findToken == null || messageInfoList == null) { throw new IllegalArgumentException( "Invalid partition or token or message info list for ReplicaMetadataResponseInfo"); } this.partitionId = partitionId; + this.replicaType = replicaType; this.remoteReplicaLagInBytes = remoteReplicaLagInBytes; messageInfoAndMetadataListSerde = new MessageInfoAndMetadataListSerde(messageInfoList, getMessageInfoAndMetadataListSerDeVersion(replicaMetadataResponseVersion)); @@ -56,23 +62,22 @@ private ReplicaMetadataResponseInfo(PartitionId partitionId, FindToken findToken this.token = findToken; this.errorCode = ServerErrorCode.No_Error; messageInfoList.forEach(info -> totalSizeOfAllMessages += info.getSize()); + responseVersion = replicaMetadataResponseVersion; } - public ReplicaMetadataResponseInfo(PartitionId partitionId, ServerErrorCode errorCode) { + public ReplicaMetadataResponseInfo(PartitionId partitionId, ReplicaType replicaType, ServerErrorCode errorCode, + short replicaMetadataResponseVersion) { if (partitionId == null) { throw new IllegalArgumentException("Invalid partition for ReplicaMetadataResponseInfo"); } this.partitionId = partitionId; + this.replicaType = replicaType; this.errorCode = errorCode; this.token = null; this.messageInfoAndMetadataListSerde = null; this.messageInfoListSize = 0; this.remoteReplicaLagInBytes = 0; - } - - public ReplicaMetadataResponseInfo(PartitionId partitionId, FindToken findToken, List messageInfoList, - long remoteReplicaLagInBytes) { - this(partitionId, findToken, messageInfoList, remoteReplicaLagInBytes, ReplicaMetadataResponse.getCurrentVersion()); + this.responseVersion = replicaMetadataResponseVersion; } public PartitionId getPartitionId() { @@ -95,25 +100,36 @@ public ServerErrorCode getError() { return errorCode; } - public static ReplicaMetadataResponseInfo readFrom(DataInputStream stream, FindTokenFactory factory, + public static ReplicaMetadataResponseInfo readFrom(DataInputStream stream, FindTokenHelper helper, ClusterMap clusterMap, short replicaMetadataResponseVersion) throws IOException { PartitionId partitionId = clusterMap.getPartitionIdFromStream(stream); + ReplicaType replicaType; + if (replicaMetadataResponseVersion == ReplicaMetadataResponse.REPLICA_METADATA_RESPONSE_VERSION_V_6) { + replicaType = ReplicaType.values()[stream.readShort()]; + } else { + //before REPLICA_METADATA_RESPONSE_VERSION_V_6 there were only disk based replicas + replicaType = ReplicaType.DISK_BACKED; + } ServerErrorCode error = ServerErrorCode.values()[stream.readShort()]; if (error != ServerErrorCode.No_Error) { - return new ReplicaMetadataResponseInfo(partitionId, error); + return new ReplicaMetadataResponseInfo(partitionId, replicaType, error, replicaMetadataResponseVersion); } else { - FindToken token = factory.getFindToken(stream); + FindTokenFactory findTokenFactory = helper.getFindTokenFactoryFromReplicaType(replicaType); + FindToken token = findTokenFactory.getFindToken(stream); MessageInfoAndMetadataListSerde messageInfoAndMetadataList = MessageInfoAndMetadataListSerde.deserializeMessageInfoAndMetadataList(stream, clusterMap, getMessageInfoAndMetadataListSerDeVersion(replicaMetadataResponseVersion)); long remoteReplicaLag = stream.readLong(); - return new ReplicaMetadataResponseInfo(partitionId, token, messageInfoAndMetadataList.getMessageInfoList(), - remoteReplicaLag, replicaMetadataResponseVersion); + return new ReplicaMetadataResponseInfo(partitionId, replicaType, token, + messageInfoAndMetadataList.getMessageInfoList(), remoteReplicaLag, replicaMetadataResponseVersion); } } public void writeTo(ByteBuffer buffer) { buffer.put(partitionId.getBytes()); + if (responseVersion == ReplicaMetadataResponse.REPLICA_METADATA_RESPONSE_VERSION_V_6) { + buffer.putShort((short) replicaType.ordinal()); + } buffer.putShort((short) errorCode.ordinal()); if (errorCode == ServerErrorCode.No_Error) { buffer.put(token.toBytes()); @@ -123,8 +139,12 @@ public void writeTo(ByteBuffer buffer) { } public long sizeInBytes() { - return (token == null ? 0 : (token.toBytes().length + Remote_Replica_Lag_Size_In_Bytes + messageInfoListSize)) + long size = (token == null ? 0 : (token.toBytes().length + Remote_Replica_Lag_Size_In_Bytes + messageInfoListSize)) + +partitionId.getBytes().length + Error_Size_InBytes; + if (responseVersion == ReplicaMetadataResponse.REPLICA_METADATA_RESPONSE_VERSION_V_6) { + size += ReplicaType_Size_In_Bytes; + } + return size; } /** @@ -138,6 +158,7 @@ public long getTotalSizeOfAllMessages() { public String toString() { StringBuilder sb = new StringBuilder(); sb.append(partitionId); + sb.append(replicaType.name()); sb.append(" ServerErrorCode=").append(errorCode); if (errorCode == ServerErrorCode.No_Error) { List messageInfos = messageInfoAndMetadataListSerde.getMessageInfoList(); @@ -172,6 +193,7 @@ private static short getMessageInfoAndMetadataListSerDeVersion(short replicaMeta case ReplicaMetadataResponse.REPLICA_METADATA_RESPONSE_VERSION_V_4: return MessageInfoAndMetadataListSerde.VERSION_4; case ReplicaMetadataResponse.REPLICA_METADATA_RESPONSE_VERSION_V_5: + case ReplicaMetadataResponse.REPLICA_METADATA_RESPONSE_VERSION_V_6: return MessageInfoAndMetadataListSerde.DETERMINE_VERSION; default: throw new IllegalArgumentException( diff --git a/ambry-protocol/src/test/java/com.github.ambry.protocol/RequestResponseTest.java b/ambry-protocol/src/test/java/com.github.ambry.protocol/RequestResponseTest.java index 4b91c8ac68..f1bcc7b205 100644 --- a/ambry-protocol/src/test/java/com.github.ambry.protocol/RequestResponseTest.java +++ b/ambry-protocol/src/test/java/com.github.ambry.protocol/RequestResponseTest.java @@ -18,6 +18,7 @@ import com.github.ambry.clustermap.MockClusterMap; import com.github.ambry.clustermap.MockPartitionId; import com.github.ambry.clustermap.PartitionId; +import com.github.ambry.clustermap.ReplicaType; import com.github.ambry.commons.BlobId; import com.github.ambry.commons.CommonTestUtils; import com.github.ambry.commons.ServerErrorCode; @@ -25,9 +26,12 @@ import com.github.ambry.messageformat.BlobType; import com.github.ambry.messageformat.MessageFormatFlags; import com.github.ambry.messageformat.MessageMetadata; -import com.github.ambry.store.FindToken; -import com.github.ambry.store.FindTokenFactory; +import com.github.ambry.replication.FindToken; +import com.github.ambry.replication.FindTokenFactory; +import com.github.ambry.replication.FindTokenHelper; +import com.github.ambry.replication.FindTokenType; import com.github.ambry.store.MessageInfo; +import com.github.ambry.store.StoreKeyFactory; import com.github.ambry.utils.ByteBufferChannel; import com.github.ambry.utils.SystemTime; import com.github.ambry.utils.TestUtils; @@ -50,8 +54,27 @@ import static com.github.ambry.account.Container.*; +class MockFindTokenHelper extends FindTokenHelper { + public MockFindTokenHelper() { + super(); + } + + @Override + public FindTokenFactory getFindTokenFactoryFromReplicaType(ReplicaType replicaType) { + return new MockFindTokenFactory(); + } +} + class MockFindTokenFactory implements FindTokenFactory { + public MockFindTokenFactory(StoreKeyFactory factory) { + + } + + public MockFindTokenFactory() { + + } + @Override public FindToken getFindToken(DataInputStream stream) throws IOException { return new MockFindToken(stream); @@ -64,22 +87,30 @@ public FindToken getNewFindToken() { } class MockFindToken implements FindToken { + short version; + FindTokenType type; int index; long bytesRead; public MockFindToken(int index, long bytesRead) { + this.version = 0; + this.type = FindTokenType.IndexBased; this.index = index; this.bytesRead = bytesRead; } public MockFindToken(DataInputStream stream) throws IOException { + this.version = stream.readShort(); + this.type = FindTokenType.values()[stream.readShort()]; this.index = stream.readInt(); this.bytesRead = stream.readLong(); } @Override public byte[] toBytes() { - ByteBuffer byteBuffer = ByteBuffer.allocate(12); + ByteBuffer byteBuffer = ByteBuffer.allocate(2 * Short.BYTES + Integer.BYTES + Long.BYTES); + byteBuffer.putShort(version); + byteBuffer.putShort((short) type.ordinal()); byteBuffer.putInt(index); byteBuffer.putLong(bytesRead); return byteBuffer.array(); @@ -92,6 +123,16 @@ public int getIndex() { public long getBytesRead() { return this.bytesRead; } + + @Override + public FindTokenType getType() { + return type; + } + + @Override + public short getVersion() { + return version; + } } class InvalidVersionPutRequest extends PutRequest { @@ -394,33 +435,41 @@ public void deleteRequestResponseTest() throws IOException { @Test public void replicaMetadataRequestTest() throws IOException { - doReplicaMetadataRequestTest(ReplicaMetadataResponse.CURRENT_VERSION); - doReplicaMetadataRequestTest(ReplicaMetadataResponse.REPLICA_METADATA_RESPONSE_VERSION_V_4); - doReplicaMetadataRequestTest(ReplicaMetadataResponse.REPLICA_METADATA_RESPONSE_VERSION_V_5); + for (ReplicaType replicaType : ReplicaType.values()) { + doReplicaMetadataRequestTest(ReplicaMetadataResponse.REPLICA_METADATA_RESPONSE_VERSION_V_6, + ReplicaMetadataRequest.Replica_Metadata_Request_Version_V2, replicaType); + doReplicaMetadataRequestTest(ReplicaMetadataResponse.REPLICA_METADATA_RESPONSE_VERSION_V_4, + ReplicaMetadataRequest.Replica_Metadata_Request_Version_V1, replicaType); + doReplicaMetadataRequestTest(ReplicaMetadataResponse.REPLICA_METADATA_RESPONSE_VERSION_V_5, + ReplicaMetadataRequest.Replica_Metadata_Request_Version_V1, replicaType); + } } - private void doReplicaMetadataRequestTest(short responseVersionToUse) throws IOException { - ReplicaMetadataResponse.CURRENT_VERSION = responseVersionToUse; + private void doReplicaMetadataRequestTest(short responseVersionToUse, short requestVersionToUse, + ReplicaType replicaType) throws IOException { MockClusterMap clusterMap = new MockClusterMap(); List replicaMetadataRequestInfoList = new ArrayList(); ReplicaMetadataRequestInfo replicaMetadataRequestInfo = - new ReplicaMetadataRequestInfo(new MockPartitionId(), new MockFindToken(0, 1000), "localhost", "path"); + new ReplicaMetadataRequestInfo(new MockPartitionId(), new MockFindToken(0, 1000), "localhost", "path", + replicaType, requestVersionToUse); replicaMetadataRequestInfoList.add(replicaMetadataRequestInfo); - ReplicaMetadataRequest request = new ReplicaMetadataRequest(1, "id", replicaMetadataRequestInfoList, 1000); + ReplicaMetadataRequest request = + new ReplicaMetadataRequest(1, "id", replicaMetadataRequestInfoList, 1000, requestVersionToUse); DataInputStream requestStream = serAndPrepForRead(request, -1, true); ReplicaMetadataRequest replicaMetadataRequestFromBytes = - ReplicaMetadataRequest.readFrom(requestStream, new MockClusterMap(), new MockFindTokenFactory()); + ReplicaMetadataRequest.readFrom(requestStream, new MockClusterMap(), new MockFindTokenHelper()); Assert.assertEquals(replicaMetadataRequestFromBytes.getMaxTotalSizeOfEntriesInBytes(), 1000); Assert.assertEquals(replicaMetadataRequestFromBytes.getReplicaMetadataRequestInfoList().size(), 1); try { - new ReplicaMetadataRequest(1, "id", null, 12); + new ReplicaMetadataRequest(1, "id", null, 12, requestVersionToUse); Assert.fail("Serializing should have failed"); } catch (IllegalArgumentException e) { // expected. Nothing to do } try { - new ReplicaMetadataRequestInfo(new MockPartitionId(), null, "localhost", "path"); + new ReplicaMetadataRequestInfo(new MockPartitionId(), null, "localhost", "path", replicaType, + requestVersionToUse); Assert.fail("Construction should have failed"); } catch (IllegalArgumentException e) { // expected. Nothing to do @@ -446,17 +495,18 @@ private void doReplicaMetadataRequestTest(short responseVersionToUse) throws IOE totalSizeOfAllMessages += msgSize; } ReplicaMetadataResponseInfo responseInfo = new ReplicaMetadataResponseInfo( - clusterMap.getWritablePartitionIds(MockClusterMap.DEFAULT_PARTITION_CLASS).get(0), new MockFindToken(0, 1000), - messageInfoList, 1000); + clusterMap.getWritablePartitionIds(MockClusterMap.DEFAULT_PARTITION_CLASS).get(0), replicaType, + new MockFindToken(0, 1000), messageInfoList, 1000, responseVersionToUse); Assert.assertEquals("Total size of messages not as expected", totalSizeOfAllMessages, responseInfo.getTotalSizeOfAllMessages()); replicaMetadataResponseInfoList.add(responseInfo); } ReplicaMetadataResponse response = - new ReplicaMetadataResponse(1234, "clientId", ServerErrorCode.No_Error, replicaMetadataResponseInfoList); + new ReplicaMetadataResponse(1234, "clientId", ServerErrorCode.No_Error, replicaMetadataResponseInfoList, + responseVersionToUse); requestStream = serAndPrepForRead(response, -1, false); ReplicaMetadataResponse deserializedReplicaMetadataResponse = - ReplicaMetadataResponse.readFrom(requestStream, new MockFindTokenFactory(), clusterMap); + ReplicaMetadataResponse.readFrom(requestStream, new MockFindTokenHelper(), clusterMap); Assert.assertEquals(deserializedReplicaMetadataResponse.getCorrelationId(), 1234); Assert.assertEquals(deserializedReplicaMetadataResponse.getError(), ServerErrorCode.No_Error); Assert.assertEquals("ReplicaMetadataResponse list size mismatch ", numResponseInfos, @@ -477,8 +527,7 @@ private void doReplicaMetadataRequestTest(short responseVersionToUse) throws IOE Assert.assertEquals("MsgInfo size mismatch ", originalMsgInfo.getSize(), msgInfo.getSize()); Assert.assertEquals("MsgInfo key mismatch ", originalMsgInfo.getStoreKey(), msgInfo.getStoreKey()); Assert.assertEquals("MsgInfo expiration value mismatch ", Utils.Infinite_Time, msgInfo.getExpirationTimeInMs()); - if (ReplicaMetadataResponse.getCurrentVersion() - >= ReplicaMetadataResponse.REPLICA_METADATA_RESPONSE_VERSION_V_3) { + if (response.getVersionId() >= ReplicaMetadataResponse.REPLICA_METADATA_RESPONSE_VERSION_V_3) { Assert.assertEquals("AccountId mismatch ", originalMsgInfo.getAccountId(), msgInfo.getAccountId()); Assert.assertEquals("ContainerId mismatch ", originalMsgInfo.getContainerId(), msgInfo.getContainerId()); Assert.assertEquals("OperationTime mismatch ", operationTimeMs, msgInfo.getOperationTimeMs()); @@ -496,14 +545,40 @@ private void doReplicaMetadataRequestTest(short responseVersionToUse) throws IOE response.toString().length() < maxLength); // test toString() of a ReplicaMetadataResponseInfo without any messages ReplicaMetadataResponseInfo responseInfo = new ReplicaMetadataResponseInfo( - clusterMap.getWritablePartitionIds(MockClusterMap.DEFAULT_PARTITION_CLASS).get(0), new MockFindToken(0, 1000), - Collections.emptyList(), 1000); + clusterMap.getWritablePartitionIds(MockClusterMap.DEFAULT_PARTITION_CLASS).get(0), replicaType, + new MockFindToken(0, 1000), Collections.emptyList(), 1000, responseVersionToUse); Assert.assertTrue("Length of toString() should be > 0", responseInfo.toString().length() > 0); // test toString() of a ReplicaMetadataResponse without any ReplicaMetadataResponseInfo - response = new ReplicaMetadataResponse(1234, "clientId", ServerErrorCode.No_Error, Collections.emptyList()); + response = new ReplicaMetadataResponse(1234, "clientId", ServerErrorCode.No_Error, Collections.emptyList(), + responseVersionToUse); Assert.assertTrue("Length of toString() should be > 0", response.toString().length() > 0); } + /** + * ReplicaMetadataRequestResonse compatibility test. Tests {@code ReplicaMetadataResponse#getCompatibleResponseVersion} + */ + @Test + public void getCompatibleResponseVersionTest() { + Assert.assertEquals( + "Request version Replica_Metadata_Request_Version_V1 should be compatible with REPLICA_METADATA_RESPONSE_VERSION_V_5", + ReplicaMetadataResponse.getCompatibleResponseVersion( + ReplicaMetadataRequest.Replica_Metadata_Request_Version_V1), + ReplicaMetadataResponse.REPLICA_METADATA_RESPONSE_VERSION_V_5); + Assert.assertEquals( + "Request version Replica_Metadata_Request_Version_V6 should be compatible with REPLICA_METADATA_RESPONSE_VERSION_V_6", + ReplicaMetadataResponse.getCompatibleResponseVersion( + ReplicaMetadataRequest.Replica_Metadata_Request_Version_V2), + ReplicaMetadataResponse.REPLICA_METADATA_RESPONSE_VERSION_V_6); + Assert.assertFalse( + "Request version Replica_Metadata_Request_Version_V1 should not be compatible with REPLICA_METADATA_RESPONSE_VERSION_V_6", + ReplicaMetadataResponse.getCompatibleResponseVersion(ReplicaMetadataRequest.Replica_Metadata_Request_Version_V1) + == ReplicaMetadataResponse.REPLICA_METADATA_RESPONSE_VERSION_V_6); + Assert.assertFalse( + "Request version Replica_Metadata_Request_Version_V1 should not be compatible with REPLICA_METADATA_RESPONSE_VERSION_V_6", + ReplicaMetadataResponse.getCompatibleResponseVersion(ReplicaMetadataRequest.Replica_Metadata_Request_Version_V2) + == ReplicaMetadataResponse.REPLICA_METADATA_RESPONSE_VERSION_V_5); + } + /** * Tests the ser/de of {@link AdminRequest} and {@link AdminResponse} and checks for equality of fields with * reference data. diff --git a/ambry-replication/src/main/java/com.github.ambry.replication/DiskTokenPersistor.java b/ambry-replication/src/main/java/com.github.ambry.replication/DiskTokenPersistor.java index 6cb6102b46..9bcc9ac983 100644 --- a/ambry-replication/src/main/java/com.github.ambry.replication/DiskTokenPersistor.java +++ b/ambry-replication/src/main/java/com.github.ambry.replication/DiskTokenPersistor.java @@ -14,7 +14,6 @@ package com.github.ambry.replication; import com.github.ambry.clustermap.ClusterMap; -import com.github.ambry.store.FindTokenFactory; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; @@ -42,11 +41,11 @@ public class DiskTokenPersistor extends ReplicaTokenPersistor { * @param partitionGroupedByMountPath A map between mount path and list of partitions under this mount path. * @param replicationMetrics metrics including token persist time. * @param clusterMap the {@link ClusterMap} to deserialize tokens. - * @param tokenfactory the {@link FindTokenFactory} to deserialize tokens. + * @param tokenHelper the {@link FindTokenHelper} to deserialize tokens. */ public DiskTokenPersistor(String replicaTokenFileName, Map> partitionGroupedByMountPath, - ReplicationMetrics replicationMetrics, ClusterMap clusterMap, FindTokenFactory tokenfactory) { - super(partitionGroupedByMountPath, replicationMetrics, clusterMap, tokenfactory); + ReplicationMetrics replicationMetrics, ClusterMap clusterMap, FindTokenHelper tokenHelper) { + super(partitionGroupedByMountPath, replicationMetrics, clusterMap, tokenHelper); this.replicaTokenFileName = replicaTokenFileName; } diff --git a/ambry-replication/src/main/java/com.github.ambry.replication/RemoteReplicaInfo.java b/ambry-replication/src/main/java/com.github.ambry.replication/RemoteReplicaInfo.java index d1a2ff5e88..5122118756 100644 --- a/ambry-replication/src/main/java/com.github.ambry.replication/RemoteReplicaInfo.java +++ b/ambry-replication/src/main/java/com.github.ambry.replication/RemoteReplicaInfo.java @@ -16,7 +16,6 @@ import com.github.ambry.clustermap.PartitionId; import com.github.ambry.clustermap.ReplicaId; import com.github.ambry.network.Port; -import com.github.ambry.store.FindToken; import com.github.ambry.store.Store; import com.github.ambry.utils.Time; import java.util.Objects; diff --git a/ambry-replication/src/main/java/com.github.ambry.replication/ReplicaThread.java b/ambry-replication/src/main/java/com.github.ambry.replication/ReplicaThread.java index 32841e130d..832b924e12 100644 --- a/ambry-replication/src/main/java/com.github.ambry.replication/ReplicaThread.java +++ b/ambry-replication/src/main/java/com.github.ambry.replication/ReplicaThread.java @@ -46,8 +46,6 @@ import com.github.ambry.protocol.ReplicaMetadataRequestInfo; import com.github.ambry.protocol.ReplicaMetadataResponse; import com.github.ambry.protocol.ReplicaMetadataResponseInfo; -import com.github.ambry.store.FindToken; -import com.github.ambry.store.FindTokenFactory; import com.github.ambry.store.MessageInfo; import com.github.ambry.store.StoreErrorCodes; import com.github.ambry.store.StoreException; @@ -88,7 +86,7 @@ public class ReplicaThread implements Runnable { private final Set allReplicatedPartitions = new HashSet<>(); private final CountDownLatch shutdownLatch = new CountDownLatch(1); private volatile boolean running; - private final FindTokenFactory findTokenFactory; + private final FindTokenHelper findTokenHelper; private final ClusterMap clusterMap; private final AtomicInteger correlationIdGenerator; private final DataNodeId dataNodeId; @@ -115,14 +113,14 @@ public class ReplicaThread implements Runnable { private volatile boolean allDisabled = false; - public ReplicaThread(String threadName, FindTokenFactory findTokenFactory, ClusterMap clusterMap, + public ReplicaThread(String threadName, FindTokenHelper findTokenHelper, ClusterMap clusterMap, AtomicInteger correlationIdGenerator, DataNodeId dataNodeId, ConnectionPool connectionPool, ReplicationConfig replicationConfig, ReplicationMetrics replicationMetrics, NotificationSystem notification, StoreKeyConverter storeKeyConverter, Transformer transformer, MetricRegistry metricRegistry, boolean replicatingOverSsl, String datacenterName, ResponseHandler responseHandler, Time time) { this.threadName = threadName; this.running = true; - this.findTokenFactory = findTokenFactory; + this.findTokenHelper = findTokenHelper; this.clusterMap = clusterMap; this.correlationIdGenerator = correlationIdGenerator; this.dataNodeId = dataNodeId; @@ -523,7 +521,8 @@ private ReplicaMetadataResponse getReplicaMetadataResponse(List> partitionGroupedByMountPath, - ReplicationMetrics replicationMetrics, ClusterMap clusterMap, FindTokenFactory tokenfactory) { + ReplicationMetrics replicationMetrics, ClusterMap clusterMap, FindTokenHelper findTokenHelper) { this.partitionGroupedByMountPath = partitionGroupedByMountPath; this.replicationMetrics = replicationMetrics; - this.replicaTokenSerde = new ReplicaTokenSerde(clusterMap, tokenfactory); + this.replicaTokenSerde = new ReplicaTokenSerde(clusterMap, findTokenHelper); } /** @@ -130,13 +129,13 @@ public void run() { public static class ReplicaTokenSerde { private static final short Crc_Size = 8; private final ClusterMap clusterMap; - private final FindTokenFactory tokenfactory; + private final FindTokenHelper tokenHelper; private final short version = 0; // Map - public ReplicaTokenSerde(ClusterMap clusterMap, FindTokenFactory tokenfactory) { + public ReplicaTokenSerde(ClusterMap clusterMap, FindTokenHelper tokenHelper) { this.clusterMap = clusterMap; - this.tokenfactory = tokenfactory; + this.tokenHelper = tokenHelper; } public void serializeTokens(List tokenInfoList, OutputStream outputStream) throws IOException { @@ -155,7 +154,11 @@ public void serializeTokens(List tokenInfoList, OutputStream o writer.write(replicaTokenInfo.getReplicaPath().getBytes()); // Write port writer.writeInt(replicaTokenInfo.getPort()); + // Write total bytes read from local store writer.writeLong(replicaTokenInfo.getTotalBytesReadFromLocalStore()); + // Write replica type + writer.writeShort((short) replicaTokenInfo.getReplicaInfo().getReplicaId().getReplicaType().ordinal()); + // Write replica token writer.write(replicaTokenInfo.getReplicaToken().toBytes()); } long crcValue = crcOutputStream.getValue(); @@ -191,8 +194,12 @@ public List deserializeTokens(InputStream inputStream) throws int port = stream.readInt(); // read total bytes read from local store long totalBytesReadFromLocalStore = stream.readLong(); + //read replica type + ReplicaType replicaType = ReplicaType.values()[stream.readShort()]; // read replica token - FindToken token = tokenfactory.getFindToken(stream); + FindTokenFactory findTokenFactory = tokenHelper.getFindTokenFactoryFromReplicaType(replicaType); + FindToken token = findTokenFactory.getFindToken(stream); + tokenInfoList.add( new ReplicaTokenInfo(partitionId, hostname, replicaPath, port, totalBytesReadFromLocalStore, token)); } diff --git a/ambry-replication/src/main/java/com.github.ambry.replication/ReplicationEngine.java b/ambry-replication/src/main/java/com.github.ambry.replication/ReplicationEngine.java index bf5eb69e8c..330d482e1d 100644 --- a/ambry-replication/src/main/java/com.github.ambry.replication/ReplicationEngine.java +++ b/ambry-replication/src/main/java/com.github.ambry.replication/ReplicationEngine.java @@ -24,8 +24,6 @@ import com.github.ambry.network.ConnectionPool; import com.github.ambry.notification.NotificationSystem; import com.github.ambry.protocol.GetRequest; -import com.github.ambry.store.FindToken; -import com.github.ambry.store.FindTokenFactory; import com.github.ambry.store.StoreKeyConverter; import com.github.ambry.store.StoreKeyConverterFactory; import com.github.ambry.store.StoreKeyFactory; @@ -71,7 +69,7 @@ public abstract class ReplicationEngine { protected final DataNodeId dataNodeId; protected final MetricRegistry metricRegistry; protected final ReplicationMetrics replicationMetrics; - protected final FindTokenFactory factory; + protected final FindTokenHelper tokenHelper; protected final Logger logger = LoggerFactory.getLogger(getClass()); protected final Map partitionToPartitionInfo; protected final Map> mountPathToPartitionInfos; @@ -88,10 +86,10 @@ public ReplicationEngine(ReplicationConfig replicationConfig, ClusterMapConfig c this.replicationConfig = replicationConfig; this.storeKeyFactory = storeKeyFactory; try { - this.factory = Utils.getObj(replicationConfig.replicationTokenFactory, storeKeyFactory); - } catch (ReflectiveOperationException e) { - logger.error("Error on getting replicationTokenFactory", e); - throw new ReplicationException("Error on getting replicationTokenFactory"); + this.tokenHelper = new FindTokenHelper(this.storeKeyFactory, this.replicationConfig); + } catch (ReflectiveOperationException roe) { + logger.error("Error on getting ReplicaTokenHelper", roe); + throw new ReplicationException("Error on getting ReplicaTokenHelper"); } this.replicaThreadPoolByDc = new ConcurrentHashMap<>(); this.replicationMetrics = new ReplicationMetrics(metricRegistry, replicaIds); @@ -306,8 +304,8 @@ private List createThreadPool(String datacenter, int numberOfThre Transformer threadSpecificTransformer = Utils.getObj(transformerClassName, storeKeyFactory, threadSpecificKeyConverter); ReplicaThread replicaThread = - new ReplicaThread(threadIdentity, factory, clusterMap, correlationIdGenerator, dataNodeId, connectionPool, - replicationConfig, replicationMetrics, notification, threadSpecificKeyConverter, + new ReplicaThread(threadIdentity, tokenHelper, clusterMap, correlationIdGenerator, dataNodeId, + connectionPool, replicationConfig, replicationMetrics, notification, threadSpecificKeyConverter, threadSpecificTransformer, metricRegistry, replicatingOverSsl, datacenter, responseHandler, SystemTime.getInstance()); replicaThreads.add(replicaThread); diff --git a/ambry-replication/src/main/java/com.github.ambry.replication/ReplicationManager.java b/ambry-replication/src/main/java/com.github.ambry.replication/ReplicationManager.java index 99e0e04bc2..9b03df7b20 100644 --- a/ambry-replication/src/main/java/com.github.ambry.replication/ReplicationManager.java +++ b/ambry-replication/src/main/java/com.github.ambry.replication/ReplicationManager.java @@ -62,10 +62,11 @@ public ReplicationManager(ReplicationConfig replicationConfig, ClusterMapConfig // We need to ensure that a replica token gets persisted only after the corresponding data in the // store gets flushed to disk. We use the store flush interval multiplied by a constant factor // to determine the token flush interval - RemoteReplicaInfo remoteReplicaInfo = - new RemoteReplicaInfo(remoteReplica, replicaId, store, factory.getNewFindToken(), - storeConfig.storeDataFlushIntervalSeconds * SystemTime.MsPerSec * Replication_Delay_Multiplier, - SystemTime.getInstance(), remoteReplica.getDataNodeId().getPortToConnectTo()); + FindToken findToken = + this.tokenHelper.getFindTokenFactoryFromReplicaType(remoteReplica.getReplicaType()).getNewFindToken(); + RemoteReplicaInfo remoteReplicaInfo = new RemoteReplicaInfo(remoteReplica, replicaId, store, findToken, + storeConfig.storeDataFlushIntervalSeconds * SystemTime.MsPerSec * Replication_Delay_Multiplier, + SystemTime.getInstance(), remoteReplica.getDataNodeId().getPortToConnectTo()); replicationMetrics.addMetricsForRemoteReplicaInfo(remoteReplicaInfo); remoteReplicas.add(remoteReplicaInfo); } @@ -83,7 +84,7 @@ public ReplicationManager(ReplicationConfig replicationConfig, ClusterMapConfig } } persistor = new DiskTokenPersistor(replicaTokenFileName, mountPathToPartitionInfos, replicationMetrics, clusterMap, - factory); + tokenHelper); } @Override diff --git a/ambry-replication/src/test/java/com.github.ambry.replication/InMemoryStore.java b/ambry-replication/src/test/java/com.github.ambry.replication/InMemoryStore.java index baede1a75c..b19c65ec1b 100644 --- a/ambry-replication/src/test/java/com.github.ambry.replication/InMemoryStore.java +++ b/ambry-replication/src/test/java/com.github.ambry.replication/InMemoryStore.java @@ -15,7 +15,6 @@ import com.github.ambry.clustermap.PartitionId; import com.github.ambry.store.FindInfo; -import com.github.ambry.store.FindToken; import com.github.ambry.store.MessageInfo; import com.github.ambry.store.MessageReadSet; import com.github.ambry.store.MessageWriteSet; diff --git a/ambry-replication/src/test/java/com.github.ambry.replication/MockConnectionPool.java b/ambry-replication/src/test/java/com.github.ambry.replication/MockConnectionPool.java index 1ff7ebe3db..a548a6f166 100644 --- a/ambry-replication/src/test/java/com.github.ambry.replication/MockConnectionPool.java +++ b/ambry-replication/src/test/java/com.github.ambry.replication/MockConnectionPool.java @@ -225,11 +225,13 @@ public ChannelOutput receive() throws IOException { long bytesRead = allMessageInfos.subList(0, indexRequested + 1).stream().mapToLong(i -> i.getSize()).sum(); long total = allMessageInfos.stream().mapToLong(i -> i.getSize()).sum(); ReplicaMetadataResponseInfo replicaMetadataResponseInfo = - new ReplicaMetadataResponseInfo(requestInfo.getPartitionId(), - new MockFindToken(indexRequested, bytesRead), messageInfosToReturn, total - bytesRead); + new ReplicaMetadataResponseInfo(requestInfo.getPartitionId(), requestInfo.getReplicaType(), + new MockFindToken(indexRequested, bytesRead), messageInfosToReturn, total - bytesRead, + ReplicaMetadataResponse.getCompatibleResponseVersion(metadataRequest.getVersionId())); responseInfoList.add(replicaMetadataResponseInfo); } - response = new ReplicaMetadataResponse(1, "replicametadata", ServerErrorCode.No_Error, responseInfoList); + response = new ReplicaMetadataResponse(1, "replicametadata", ServerErrorCode.No_Error, responseInfoList, + ReplicaMetadataResponse.getCompatibleResponseVersion(metadataRequest.getVersionId())); metadataRequest = null; } else { List responseInfoList = new ArrayList<>(); diff --git a/ambry-replication/src/test/java/com.github.ambry.replication/MockFindToken.java b/ambry-replication/src/test/java/com.github.ambry.replication/MockFindToken.java index 6ad5b7c7a5..bf8a958abe 100644 --- a/ambry-replication/src/test/java/com.github.ambry.replication/MockFindToken.java +++ b/ambry-replication/src/test/java/com.github.ambry.replication/MockFindToken.java @@ -13,30 +13,36 @@ */ package com.github.ambry.replication; -import com.github.ambry.store.FindToken; -import com.github.ambry.store.FindTokenFactory; import java.io.DataInputStream; import java.io.IOException; import java.nio.ByteBuffer; public class MockFindToken implements FindToken { + short version; + FindTokenType type; int index; long bytesRead; public MockFindToken(int index, long bytesRead) { + this.version = 0; + this.type = FindTokenType.IndexBased; this.index = index; this.bytesRead = bytesRead; } public MockFindToken(DataInputStream stream) throws IOException { + this.version = stream.readShort(); + this.type = FindTokenType.values()[stream.readShort()]; this.index = stream.readInt(); this.bytesRead = stream.readLong(); } @Override public byte[] toBytes() { - ByteBuffer byteBuffer = ByteBuffer.allocate(12); + ByteBuffer byteBuffer = ByteBuffer.allocate(Short.BYTES * 2 + Integer.BYTES + Long.BYTES); + byteBuffer.putShort(version); + byteBuffer.putShort((short) type.ordinal()); byteBuffer.putInt(index); byteBuffer.putLong(bytesRead); return byteBuffer.array(); @@ -54,6 +60,16 @@ public boolean equals(Object o) { return index == that.index && bytesRead == that.bytesRead; } + @Override + public FindTokenType getType() { + return type; + } + + @Override + public short getVersion() { + return version; + } + public int getIndex() { return index; } diff --git a/ambry-replication/src/test/java/com.github.ambry.replication/MockFindTokenHelper.java b/ambry-replication/src/test/java/com.github.ambry.replication/MockFindTokenHelper.java new file mode 100644 index 0000000000..bb388adcff --- /dev/null +++ b/ambry-replication/src/test/java/com.github.ambry.replication/MockFindTokenHelper.java @@ -0,0 +1,31 @@ +/** + * Copyright 2019 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.replication; + +import com.github.ambry.clustermap.ReplicaType; +import com.github.ambry.config.ReplicationConfig; +import com.github.ambry.store.StoreKeyFactory; + + +public class MockFindTokenHelper extends FindTokenHelper { + + public MockFindTokenHelper(StoreKeyFactory storeKeyFactory, ReplicationConfig replicationConfig) + throws ReflectiveOperationException { + } + + @Override + public FindTokenFactory getFindTokenFactoryFromReplicaType(ReplicaType replicaType) { + return new MockFindToken.MockFindTokenFactory(); + } +} \ No newline at end of file diff --git a/ambry-replication/src/test/java/com.github.ambry.replication/ReplicationTest.java b/ambry-replication/src/test/java/com.github.ambry.replication/ReplicationTest.java index f74b69863d..a643a2bae9 100644 --- a/ambry-replication/src/test/java/com.github.ambry.replication/ReplicationTest.java +++ b/ambry-replication/src/test/java/com.github.ambry.replication/ReplicationTest.java @@ -36,6 +36,8 @@ import com.github.ambry.network.ConnectedChannel; import com.github.ambry.network.Port; import com.github.ambry.network.PortType; +import com.github.ambry.protocol.ReplicaMetadataRequest; +import com.github.ambry.protocol.ReplicaMetadataResponse; import com.github.ambry.store.Message; import com.github.ambry.store.MessageInfo; import com.github.ambry.store.MockStoreKeyConverterFactory; @@ -73,13 +75,18 @@ import java.util.stream.Collectors; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import static org.junit.Assert.*; /** - * Tests for ReplicaThread + * Tests for ReplicaThread for both pairs of compatible ReplicaMetadataRequest and ReplicaMetadataResponse + * {@code ReplicaMetadataRequest#Replica_Metadata_Request_Version_V1}, {@code ReplicaMetadataResponse#REPLICA_METADATA_RESPONSE_VERSION_V_5} + * {@code ReplicaMetadataRequest#Replica_Metadata_Request_Version_V2}, {@code ReplicaMetadataResponse#REPLICA_METADATA_RESPONSE_VERSION_V_6} */ +@RunWith(Parameterized.class) public class ReplicationTest { private static int CONSTANT_TIME_MS = 100000; @@ -90,11 +97,25 @@ public class ReplicationTest { private final MockTime time = new MockTime(); private ReplicationConfig config; + /** + * Running for the two sets of compatible ReplicaMetadataRequest and ReplicaMetadataResponse, + * viz {{@code ReplicaMetadataRequest#Replica_Metadata_Request_Version_V1}, {@code ReplicaMetadataResponse#REPLICA_METADATA_RESPONSE_VERSION_V_5}} + * & {{@code ReplicaMetadataRequest#Replica_Metadata_Request_Version_V2}, {@code ReplicaMetadataResponse#REPLICA_METADATA_RESPONSE_VERSION_V_6}} + * @return an array with both pairs of compatible request and response. + */ + @Parameterized.Parameters + public static List data() { + return Arrays.asList(new Object[][]{{ReplicaMetadataRequest.Replica_Metadata_Request_Version_V1, + ReplicaMetadataResponse.REPLICA_METADATA_RESPONSE_VERSION_V_5}, {ReplicaMetadataRequest.Replica_Metadata_Request_Version_V2, ReplicaMetadataResponse.REPLICA_METADATA_RESPONSE_VERSION_V_6}}); + } + /** * Constructor to set the configs */ - public ReplicationTest() { + public ReplicationTest(short requestVersion, short responseVersion) { Properties properties = new Properties(); + properties.setProperty("replication.metadatarequest.version", Short.toString(requestVersion)); + properties.setProperty("replication.metadataresponse.version", Short.toString(responseVersion)); properties.setProperty("replication.synced.replica.backoff.duration.ms", "3000"); properties.setProperty("replication.intra.replica.thread.throttle.sleep.duration.ms", "100"); properties.setProperty("replication.inter.replica.thread.throttle.sleep.duration.ms", "200"); @@ -127,8 +148,8 @@ public void remoteReplicaInfoAddRemoveTest() throws Exception { hosts.put(remoteHost.dataNodeId, remoteHost); MockConnectionPool connectionPool = new MockConnectionPool(hosts, clusterMap, 4); ReplicaThread replicaThread = - new ReplicaThread("threadtest", new MockFindToken.MockFindTokenFactory(), clusterMap, new AtomicInteger(0), - localHost.dataNodeId, connectionPool, config, replicationMetrics, null, + new ReplicaThread("threadtest", new MockFindTokenHelper(storeKeyFactory, config), clusterMap, + new AtomicInteger(0), localHost.dataNodeId, connectionPool, config, replicationMetrics, null, mockStoreKeyConverterFactory.getStoreKeyConverter(), transformer, clusterMap.getMetricRegistry(), false, localHost.dataNodeId.getDatacenterName(), new ResponseHandler(clusterMap), time); for (RemoteReplicaInfo remoteReplicaInfo : remoteReplicaInfoList) { @@ -1327,22 +1348,22 @@ public void replicaTokenTest() throws InterruptedException { */ private Pair>, ReplicaThread> getRemoteReplicasAndReplicaThread(int batchSize, ClusterMap clusterMap, MockHost localHost, MockHost remoteHost, StoreKeyConverter storeKeyConverter, - Transformer transformer, StoreEventListener listener) { + Transformer transformer, StoreEventListener listener) throws ReflectiveOperationException { ReplicationMetrics replicationMetrics = new ReplicationMetrics(new MetricRegistry(), clusterMap.getReplicaIds(localHost.dataNodeId)); replicationMetrics.populateSingleColoMetrics(remoteHost.dataNodeId.getDatacenterName()); List remoteReplicaInfoList = localHost.getRemoteReplicaInfos(remoteHost, listener); Map> replicasToReplicate = - Collections.singletonMap(remoteHost.dataNodeId, remoteReplicaInfoList); + StoreKeyFactory storeKeyFactory = Utils.getObj("com.github.ambry.commons.BlobIdFactory", clusterMap); Map hosts = new HashMap<>(); hosts.put(remoteHost.dataNodeId, remoteHost); MockConnectionPool connectionPool = new MockConnectionPool(hosts, clusterMap, batchSize); ReplicaThread replicaThread = - new ReplicaThread("threadtest", new MockFindToken.MockFindTokenFactory(), clusterMap, new AtomicInteger(0), - localHost.dataNodeId, connectionPool, config, replicationMetrics, null, storeKeyConverter, transformer, - clusterMap.getMetricRegistry(), false, localHost.dataNodeId.getDatacenterName(), - new ResponseHandler(clusterMap), time); + new ReplicaThread("threadtest", new MockFindTokenHelper(storeKeyFactory, config), clusterMap, + new AtomicInteger(0), localHost.dataNodeId, connectionPool, config, replicationMetrics, null, + storeKeyConverter, transformer, clusterMap.getMetricRegistry(), false, + localHost.dataNodeId.getDatacenterName(), new ResponseHandler(clusterMap), time); for (RemoteReplicaInfo remoteReplicaInfo : remoteReplicaInfoList) { replicaThread.addRemoteReplicaInfo(remoteReplicaInfo); } diff --git a/ambry-server/src/integration-test/java/com.github.ambry.server/ServerHardDeleteTest.java b/ambry-server/src/integration-test/java/com.github.ambry.server/ServerHardDeleteTest.java index d3ec265775..f0c3319921 100644 --- a/ambry-server/src/integration-test/java/com.github.ambry.server/ServerHardDeleteTest.java +++ b/ambry-server/src/integration-test/java/com.github.ambry.server/ServerHardDeleteTest.java @@ -37,7 +37,7 @@ import com.github.ambry.protocol.PartitionRequestInfo; import com.github.ambry.protocol.PutRequest; import com.github.ambry.protocol.PutResponse; -import com.github.ambry.store.FindTokenFactory; +import com.github.ambry.replication.FindTokenFactory; import com.github.ambry.store.HardDeleter; import com.github.ambry.store.Offset; import com.github.ambry.store.StoreFindToken; diff --git a/ambry-server/src/integration-test/java/com.github.ambry.server/ServerTestUtil.java b/ambry-server/src/integration-test/java/com.github.ambry.server/ServerTestUtil.java index 1948ec87fb..e05713d30f 100644 --- a/ambry-server/src/integration-test/java/com.github.ambry.server/ServerTestUtil.java +++ b/ambry-server/src/integration-test/java/com.github.ambry.server/ServerTestUtil.java @@ -34,6 +34,7 @@ import com.github.ambry.clustermap.MockReplicaId; import com.github.ambry.clustermap.PartitionId; import com.github.ambry.clustermap.ReplicaId; +import com.github.ambry.clustermap.ReplicaType; import com.github.ambry.commons.BlobId; import com.github.ambry.commons.BlobIdFactory; import com.github.ambry.commons.ByteBufferReadableStreamChannel; @@ -75,6 +76,7 @@ import com.github.ambry.protocol.PutResponse; import com.github.ambry.protocol.TtlUpdateRequest; import com.github.ambry.protocol.TtlUpdateResponse; +import com.github.ambry.replication.FindTokenFactory; import com.github.ambry.router.Callback; import com.github.ambry.router.GetBlobOptionsBuilder; import com.github.ambry.router.GetBlobResult; @@ -82,7 +84,6 @@ import com.github.ambry.router.PutBlobOptionsBuilder; import com.github.ambry.router.ReadableStreamChannel; import com.github.ambry.router.Router; -import com.github.ambry.store.FindTokenFactory; import com.github.ambry.store.MessageInfo; import com.github.ambry.store.Offset; import com.github.ambry.store.StoreFindToken; @@ -2019,6 +2020,8 @@ private static void checkReplicaTokens(MockClusterMap clusterMap, DataNodeId dat setToCheck.remove(partitionId.toString() + hostname + port); // read total bytes read from local store dataInputStream.readLong(); + // read replica type + ReplicaType replicaType = ReplicaType.values()[dataInputStream.readShort()]; // read replica token StoreFindToken token = (StoreFindToken) factory.getFindToken(dataInputStream); System.out.println( diff --git a/ambry-server/src/main/java/com.github.ambry.server/AmbryRequests.java b/ambry-server/src/main/java/com.github.ambry.server/AmbryRequests.java index 583ddfc284..1ade86dcd1 100644 --- a/ambry-server/src/main/java/com.github.ambry.server/AmbryRequests.java +++ b/ambry-server/src/main/java/com.github.ambry.server/AmbryRequests.java @@ -21,6 +21,7 @@ import com.github.ambry.clustermap.PartitionId; import com.github.ambry.clustermap.PartitionState; import com.github.ambry.clustermap.ReplicaId; +import com.github.ambry.clustermap.ReplicaType; import com.github.ambry.commons.BlobId; import com.github.ambry.commons.ServerErrorCode; import com.github.ambry.messageformat.DeleteMessageFormatInputStream; @@ -64,10 +65,10 @@ import com.github.ambry.protocol.RequestOrResponseType; import com.github.ambry.protocol.TtlUpdateRequest; import com.github.ambry.protocol.TtlUpdateResponse; +import com.github.ambry.replication.FindToken; +import com.github.ambry.replication.FindTokenHelper; import com.github.ambry.replication.ReplicationManager; import com.github.ambry.store.FindInfo; -import com.github.ambry.store.FindToken; -import com.github.ambry.store.FindTokenFactory; import com.github.ambry.store.MessageInfo; import com.github.ambry.store.StorageManager; import com.github.ambry.store.Store; @@ -113,7 +114,7 @@ public class AmbryRequests implements RequestAPI { private final Map localPartitionToReplicaMap; private final ServerMetrics metrics; private final MessageFormatMetrics messageFormatMetrics; - private final FindTokenFactory findTokenFactory; + private final FindTokenHelper findTokenHelper; private final NotificationSystem notification; private final ReplicationManager replicationManager; private final StoreKeyFactory storeKeyFactory; @@ -123,7 +124,7 @@ public class AmbryRequests implements RequestAPI { private final StoreKeyConverterFactory storeKeyConverterFactory; public AmbryRequests(StorageManager storageManager, RequestResponseChannel requestResponseChannel, - ClusterMap clusterMap, DataNodeId nodeId, MetricRegistry registry, FindTokenFactory findTokenFactory, + ClusterMap clusterMap, DataNodeId nodeId, MetricRegistry registry, FindTokenHelper findTokenHelper, NotificationSystem operationNotification, ReplicationManager replicationManager, StoreKeyFactory storeKeyFactory, boolean enableDataPrefetch, StoreKeyConverterFactory storeKeyConverterFactory) { this.storageManager = storageManager; @@ -132,7 +133,7 @@ public AmbryRequests(StorageManager storageManager, RequestResponseChannel reque this.currentNode = nodeId; this.metrics = new ServerMetrics(registry); this.messageFormatMetrics = new MessageFormatMetrics(registry); - this.findTokenFactory = findTokenFactory; + this.findTokenHelper = findTokenHelper; this.notification = operationNotification; this.replicationManager = replicationManager; this.storeKeyFactory = storeKeyFactory; @@ -569,7 +570,7 @@ public void handleTtlUpdateRequest(Request request) throws IOException, Interrup public void handleReplicaMetadataRequest(Request request) throws IOException, InterruptedException { ReplicaMetadataRequest replicaMetadataRequest = - ReplicaMetadataRequest.readFrom(new DataInputStream(request.getInputStream()), clusterMap, findTokenFactory); + ReplicaMetadataRequest.readFrom(new DataInputStream(request.getInputStream()), clusterMap, findTokenHelper); long requestQueueTime = SystemTime.getInstance().milliseconds() - request.getStartTimeInMs(); long totalTimeSpent = requestQueueTime; metrics.replicaMetadataRequestQueueTimeInMs.update(requestQueueTime); @@ -586,13 +587,16 @@ public void handleReplicaMetadataRequest(Request request) throws IOException, In for (ReplicaMetadataRequestInfo replicaMetadataRequestInfo : replicaMetadataRequestInfoList) { long partitionStartTimeInMs = SystemTime.getInstance().milliseconds(); PartitionId partitionId = replicaMetadataRequestInfo.getPartitionId(); + ReplicaType replicaType = replicaMetadataRequestInfo.getReplicaType(); ServerErrorCode error = validateRequest(partitionId, RequestOrResponseType.ReplicaMetadataRequest, false); logger.trace("{} Time used to validate metadata request: {}", partitionId, (SystemTime.getInstance().milliseconds() - partitionStartTimeInMs)); if (error != ServerErrorCode.No_Error) { logger.error("Validating replica metadata request failed with error {} for partition {}", error, partitionId); - ReplicaMetadataResponseInfo replicaMetadataResponseInfo = new ReplicaMetadataResponseInfo(partitionId, error); + ReplicaMetadataResponseInfo replicaMetadataResponseInfo = + new ReplicaMetadataResponseInfo(partitionId, replicaType, error, + ReplicaMetadataResponse.getCompatibleResponseVersion(replicaMetadataRequest.getVersionId())); replicaMetadataResponseList.add(replicaMetadataResponseInfo); } else { try { @@ -618,8 +622,9 @@ public void handleReplicaMetadataRequest(Request request) throws IOException, In (SystemTime.getInstance().milliseconds() - partitionStartTimeInMs)); ReplicaMetadataResponseInfo replicaMetadataResponseInfo = - new ReplicaMetadataResponseInfo(partitionId, findInfo.getFindToken(), findInfo.getMessageEntries(), - store.getSizeInBytes() - totalBytesRead); + new ReplicaMetadataResponseInfo(partitionId, replicaType, findInfo.getFindToken(), + findInfo.getMessageEntries(), store.getSizeInBytes() - totalBytesRead, + ReplicaMetadataResponse.getCompatibleResponseVersion(replicaMetadataRequest.getVersionId())); if (replicaMetadataResponseInfo.getTotalSizeOfAllMessages() > 5 * replicaMetadataRequest.getMaxTotalSizeOfEntriesInBytes()) { logger.debug("{} generated a metadata response {} where the cumulative size of messages is {}", @@ -639,19 +644,23 @@ public void handleReplicaMetadataRequest(Request request) throws IOException, In metrics.unExpectedStoreFindEntriesError.inc(); } ReplicaMetadataResponseInfo replicaMetadataResponseInfo = - new ReplicaMetadataResponseInfo(partitionId, ErrorMapping.getStoreErrorMapping(e.getErrorCode())); + new ReplicaMetadataResponseInfo(partitionId, replicaType, + ErrorMapping.getStoreErrorMapping(e.getErrorCode()), + ReplicaMetadataResponse.getCompatibleResponseVersion(replicaMetadataRequest.getVersionId())); replicaMetadataResponseList.add(replicaMetadataResponseInfo); } } } response = new ReplicaMetadataResponse(replicaMetadataRequest.getCorrelationId(), replicaMetadataRequest.getClientId(), - ServerErrorCode.No_Error, replicaMetadataResponseList); + ServerErrorCode.No_Error, replicaMetadataResponseList, + ReplicaMetadataResponse.getCompatibleResponseVersion(replicaMetadataRequest.getVersionId())); } catch (Exception e) { logger.error("Unknown exception for request " + replicaMetadataRequest, e); response = new ReplicaMetadataResponse(replicaMetadataRequest.getCorrelationId(), replicaMetadataRequest.getClientId(), - ServerErrorCode.Unknown_Error); + ServerErrorCode.Unknown_Error, + ReplicaMetadataResponse.getCompatibleResponseVersion(replicaMetadataRequest.getVersionId())); } finally { long processingTime = SystemTime.getInstance().milliseconds() - startTimeInMs; totalTimeSpent += processingTime; diff --git a/ambry-server/src/main/java/com.github.ambry.server/AmbryServer.java b/ambry-server/src/main/java/com.github.ambry.server/AmbryServer.java index 4df66b78d4..d031b21174 100644 --- a/ambry-server/src/main/java/com.github.ambry.server/AmbryServer.java +++ b/ambry-server/src/main/java/com.github.ambry.server/AmbryServer.java @@ -40,8 +40,8 @@ import com.github.ambry.network.PortType; import com.github.ambry.network.SocketServer; import com.github.ambry.notification.NotificationSystem; +import com.github.ambry.replication.FindTokenHelper; import com.github.ambry.replication.ReplicationManager; -import com.github.ambry.store.FindTokenFactory; import com.github.ambry.store.StorageManager; import com.github.ambry.store.StoreKeyConverterFactory; import com.github.ambry.store.StoreKeyFactory; @@ -135,7 +135,6 @@ public void startup() throws InstantiationException { } StoreKeyFactory storeKeyFactory = Utils.getObj(storeConfig.storeKeyFactory, clusterMap); - FindTokenFactory findTokenFactory = Utils.getObj(replicationConfig.replicationTokenFactory, storeKeyFactory); storageManager = new StorageManager(storeConfig, diskManagerConfig, scheduler, registry, clusterMap.getReplicaIds(nodeId), storeKeyFactory, new BlobStoreRecovery(), new BlobStoreHardDelete(), @@ -161,9 +160,10 @@ storeKeyFactory, new BlobStoreRecovery(), new BlobStoreHardDelete(), } networkServer = new SocketServer(networkConfig, sslConfig, registry, ports); + FindTokenHelper findTokenHelper = new FindTokenHelper(storeKeyFactory, replicationConfig); requests = new AmbryRequests(storageManager, networkServer.getRequestResponseChannel(), clusterMap, nodeId, registry, - findTokenFactory, notificationSystem, replicationManager, storeKeyFactory, + findTokenHelper, notificationSystem, replicationManager, storeKeyFactory, serverConfig.serverEnableStoreDataPrefetch, storeKeyConverterFactory); requestHandlerPool = new RequestHandlerPool(serverConfig.serverRequestHandlerNumOfThreads, networkServer.getRequestResponseChannel(), requests); diff --git a/ambry-server/src/test/java/com.github.ambry.server/AmbryRequestsTest.java b/ambry-server/src/test/java/com.github.ambry.server/AmbryRequestsTest.java index fdf98f8ce1..6ac24c1ad9 100644 --- a/ambry-server/src/test/java/com.github.ambry.server/AmbryRequestsTest.java +++ b/ambry-server/src/test/java/com.github.ambry.server/AmbryRequestsTest.java @@ -23,6 +23,7 @@ import com.github.ambry.clustermap.PartitionId; import com.github.ambry.clustermap.ReplicaEventType; import com.github.ambry.clustermap.ReplicaId; +import com.github.ambry.clustermap.ReplicaType; import com.github.ambry.commons.BlobId; import com.github.ambry.commons.CommonTestUtils; import com.github.ambry.commons.ServerErrorCode; @@ -64,12 +65,12 @@ import com.github.ambry.protocol.Response; import com.github.ambry.protocol.TtlUpdateRequest; import com.github.ambry.replication.BlobIdTransformer; -import com.github.ambry.replication.MockFindToken; +import com.github.ambry.replication.FindToken; +import com.github.ambry.replication.FindTokenHelper; +import com.github.ambry.replication.MockFindTokenHelper; import com.github.ambry.replication.ReplicationException; import com.github.ambry.replication.ReplicationManager; import com.github.ambry.store.FindInfo; -import com.github.ambry.store.FindToken; -import com.github.ambry.store.FindTokenFactory; import com.github.ambry.store.MessageInfo; import com.github.ambry.store.MessageInfoTest; import com.github.ambry.store.MessageReadSet; @@ -119,8 +120,8 @@ * Tests for {@link AmbryRequests}. */ public class AmbryRequestsTest { - private static final FindTokenFactory FIND_TOKEN_FACTORY = new MockFindToken.MockFindTokenFactory(); + private final FindTokenHelper findTokenHelper; private final MockClusterMap clusterMap; private final DataNodeId dataNodeId; private final MockStorageManager storageManager; @@ -130,8 +131,10 @@ public class AmbryRequestsTest { private final Set validKeysInStore = new HashSet<>(); private final Map conversionMap = new HashMap<>(); private final MockStoreKeyConverterFactory storeKeyConverterFactory; + private final ReplicationConfig replicationConfig; - public AmbryRequestsTest() throws IOException, ReplicationException, StoreException, InterruptedException { + public AmbryRequestsTest() + throws IOException, ReplicationException, StoreException, InterruptedException, ReflectiveOperationException { clusterMap = new MockClusterMap(); Properties properties = new Properties(); properties.setProperty("clustermap.cluster.name", "test"); @@ -141,15 +144,18 @@ public AmbryRequestsTest() throws IOException, ReplicationException, StoreExcept properties.setProperty("replication.no.of.intra.dc.replica.threads", "1"); properties.setProperty("replication.no.of.inter.dc.replica.threads", "1"); VerifiableProperties verifiableProperties = new VerifiableProperties(properties); + replicationConfig = new ReplicationConfig(verifiableProperties); dataNodeId = clusterMap.getDataNodeIds().get(0); - storageManager = new MockStorageManager(validKeysInStore, clusterMap.getReplicaIds(dataNodeId)); + StoreKeyFactory storeKeyFactory = Utils.getObj("com.github.ambry.commons.BlobIdFactory", clusterMap); + findTokenHelper = new MockFindTokenHelper(storeKeyFactory, replicationConfig); + storageManager = new MockStorageManager(validKeysInStore, clusterMap.getReplicaIds(dataNodeId), findTokenHelper); storeKeyConverterFactory = new MockStoreKeyConverterFactory(null, null); storeKeyConverterFactory.setConversionMap(conversionMap); replicationManager = MockReplicationManager.getReplicationManager(verifiableProperties, storageManager, clusterMap, dataNodeId, storeKeyConverterFactory); ambryRequests = new AmbryRequests(storageManager, requestResponseChannel, clusterMap, dataNodeId, - clusterMap.getMetricRegistry(), FIND_TOKEN_FACTORY, null, replicationManager, null, false, + clusterMap.getMetricRegistry(), findTokenHelper, null, replicationManager, null, false, storeKeyConverterFactory); storageManager.start(); } @@ -873,10 +879,11 @@ private void sendAndVerifyOperationRequest(RequestOrResponseType requestType, Li GetOption.Include_All); break; case ReplicaMetadataRequest: - ReplicaMetadataRequestInfo rRequestInfo = - new ReplicaMetadataRequestInfo(id, FIND_TOKEN_FACTORY.getNewFindToken(), "localhost", "/tmp"); + ReplicaMetadataRequestInfo rRequestInfo = new ReplicaMetadataRequestInfo(id, + findTokenHelper.getFindTokenFactoryFromReplicaType(ReplicaType.DISK_BACKED).getNewFindToken(), + "localhost", "/tmp", ReplicaType.DISK_BACKED, replicationConfig.replicaMetadataRequestVersion); request = new ReplicaMetadataRequest(correlationId, clientId, Collections.singletonList(rRequestInfo), - Long.MAX_VALUE); + Long.MAX_VALUE, replicationConfig.replicaMetadataRequestVersion); break; case TtlUpdateRequest: request = new TtlUpdateRequest(correlationId, clientId, originalBlobId, Utils.Infinite_Time, @@ -1319,7 +1326,8 @@ public FindInfo findEntriesSince(FindToken token, long maxTotalSizeOfEntries) th tokenReceived = token; maxTotalSizeOfEntriesReceived = maxTotalSizeOfEntries; throwExceptionIfRequired(); - return new FindInfo(Collections.emptyList(), FIND_TOKEN_FACTORY.getNewFindToken()); + return new FindInfo(Collections.emptyList(), + findTokenHelper.getFindTokenFactoryFromReplicaType(ReplicaType.DISK_BACKED).getNewFindToken()); } @Override @@ -1446,10 +1454,14 @@ private void checkValidityOfIds(Collection ids) throws Store private final Set validKeysInStore; - MockStorageManager(Set validKeysInStore, List replicas) throws StoreException { + private final FindTokenHelper findTokenHelper; + + MockStorageManager(Set validKeysInStore, List replicas, + FindTokenHelper findTokenHelper) throws StoreException { super(new StoreConfig(VPROPS), new DiskManagerConfig(VPROPS), Utils.newScheduler(1, true), new MetricRegistry(), replicas, null, null, null, null, new MockTime()); this.validKeysInStore = validKeysInStore; + this.findTokenHelper = findTokenHelper; } @Override diff --git a/ambry-server/src/test/java/com.github.ambry.server/StatsManagerTest.java b/ambry-server/src/test/java/com.github.ambry.server/StatsManagerTest.java index 519de212b9..298a0bb174 100644 --- a/ambry-server/src/test/java/com.github.ambry.server/StatsManagerTest.java +++ b/ambry-server/src/test/java/com.github.ambry.server/StatsManagerTest.java @@ -27,8 +27,8 @@ import com.github.ambry.config.VerifiableProperties; import com.github.ambry.network.Port; import com.github.ambry.network.PortType; +import com.github.ambry.replication.FindToken; import com.github.ambry.store.FindInfo; -import com.github.ambry.store.FindToken; import com.github.ambry.store.MessageWriteSet; import com.github.ambry.store.StorageManager; import com.github.ambry.store.Store; diff --git a/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java b/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java index a3d5208702..5bf3ec83e1 100644 --- a/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java +++ b/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java @@ -17,6 +17,7 @@ import com.github.ambry.clustermap.ReplicaId; import com.github.ambry.clustermap.ReplicaStatusDelegate; import com.github.ambry.config.StoreConfig; +import com.github.ambry.replication.FindToken; import com.github.ambry.utils.FileLock; import com.github.ambry.utils.Time; import com.github.ambry.utils.Utils; diff --git a/ambry-store/src/main/java/com.github.ambry.store/HardDeleter.java b/ambry-store/src/main/java/com.github.ambry.store/HardDeleter.java index 4b54ac490d..dac0da4fbf 100644 --- a/ambry-store/src/main/java/com.github.ambry.store/HardDeleter.java +++ b/ambry-store/src/main/java/com.github.ambry.store/HardDeleter.java @@ -15,6 +15,8 @@ import com.codahale.metrics.Timer; import com.github.ambry.config.StoreConfig; +import com.github.ambry.replication.FindToken; +import com.github.ambry.replication.FindTokenType; import com.github.ambry.utils.CrcInputStream; import com.github.ambry.utils.CrcOutputStream; import com.github.ambry.utils.Time; @@ -258,7 +260,7 @@ void performRecovery() throws StoreException { */ void pruneHardDeleteRecoveryRange() { StoreFindToken logFlushedTillToken = (StoreFindToken) startTokenSafeToPersist; - if (logFlushedTillToken != null && !logFlushedTillToken.getType().equals(StoreFindToken.Type.Uninitialized)) { + if (logFlushedTillToken != null && !logFlushedTillToken.getType().equals(FindTokenType.Uninitialized)) { if (logFlushedTillToken.equals(endToken)) { hardDeleteRecoveryRange.clear(); } else if (logFlushedTillToken.getStoreKey() != null) { @@ -359,7 +361,7 @@ void postLogFlush() { */ long getProgress() { StoreFindToken token = (StoreFindToken) startToken; - return token.getType().equals(StoreFindToken.Type.Uninitialized) ? 0 + return token.getType().equals(FindTokenType.Uninitialized) ? 0 : index.getAbsolutePositionInLogForOffset(token.getOffset()); } diff --git a/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java b/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java index 880cc27f6e..f17179f258 100644 --- a/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java +++ b/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java @@ -15,6 +15,8 @@ import com.codahale.metrics.Timer; import com.github.ambry.config.StoreConfig; +import com.github.ambry.replication.FindToken; +import com.github.ambry.replication.FindTokenType; import com.github.ambry.utils.Time; import com.github.ambry.utils.Utils; import java.io.File; @@ -875,10 +877,10 @@ FindInfo findEntriesSince(FindToken token, long maxTotalSizeOfEntries) throws St storeToken = revalidateStoreFindToken(storeToken, indexSegments); List messageEntries = new ArrayList(); - if (!storeToken.getType().equals(StoreFindToken.Type.IndexBased)) { + if (!storeToken.getType().equals(FindTokenType.IndexBased)) { startTimeInMs = time.milliseconds(); Offset offsetToStart = storeToken.getOffset(); - if (storeToken.getType().equals(StoreFindToken.Type.Uninitialized)) { + if (storeToken.getType().equals(FindTokenType.Uninitialized)) { offsetToStart = getStartOffset(); } logger.trace("Index : " + dataDir + " getting entries since " + offsetToStart); @@ -929,7 +931,7 @@ FindInfo findEntriesSince(FindToken token, long maxTotalSizeOfEntries) throws St return new FindInfo(messageEntries, storeFindToken); } else { storeToken = revalidateStoreFindToken(storeToken, indexSegments); - offsetToStart = storeToken.getType().equals(StoreFindToken.Type.Uninitialized) ? getStartOffset(indexSegments) + offsetToStart = storeToken.getType().equals(FindTokenType.Uninitialized) ? getStartOffset(indexSegments) : storeToken.getOffset(); // Find index segment closest to the token offset. // Get entries starting from the first key in this offset. @@ -997,8 +999,8 @@ private StoreFindToken resetTokenIfRequired(StoreFindToken storeToken) { UUID remoteIncarnationId = storeToken.getIncarnationId(); // if incarnationId is null, for backwards compatibility purposes, the token is considered as good. /// if not null, we check for a match - if (!storeToken.getType().equals(StoreFindToken.Type.Uninitialized) && remoteIncarnationId != null - && !remoteIncarnationId.equals(incarnationId)) { + if (!storeToken.getType().equals(FindTokenType.Uninitialized) && remoteIncarnationId != null && !remoteIncarnationId + .equals(incarnationId)) { // incarnationId mismatch, hence resetting the token to beginning logger.info("Index : {} resetting offset after incarnation, new incarnation Id {}, " + "incarnationId from store token {}", dataDir, incarnationId, remoteIncarnationId); @@ -1008,13 +1010,13 @@ private StoreFindToken resetTokenIfRequired(StoreFindToken storeToken) { if (!cleanShutdown) { // if we had an unclean shutdown and the token offset is larger than the logEndOffsetOnStartup // we reset the token to logEndOffsetOnStartup - if (!storeToken.getType().equals(StoreFindToken.Type.Uninitialized) + if (!storeToken.getType().equals(FindTokenType.Uninitialized) && storeToken.getOffset().compareTo(logEndOffsetOnStartup) > 0) { logger.info("Index : " + dataDir + " resetting offset after not clean shutdown " + logEndOffsetOnStartup + " before offset " + storeToken.getOffset()); storeToken = new StoreFindToken(logEndOffsetOnStartup, sessionId, incarnationId, true); } - } else if (!storeToken.getType().equals(StoreFindToken.Type.Uninitialized) + } else if (!storeToken.getType().equals(FindTokenType.Uninitialized) && storeToken.getOffset().compareTo(logEndOffsetOnStartup) > 0) { logger.error( "Index : " + dataDir + " invalid token. Provided offset is outside the log range after clean shutdown"); @@ -1038,9 +1040,9 @@ private StoreFindToken resetTokenIfRequired(StoreFindToken storeToken) { private long getTotalBytesRead(StoreFindToken token, List messageEntries, Offset logEndOffsetBeforeFind, ConcurrentSkipListMap indexSegments) { long bytesRead = 0; - if (token.getType().equals(StoreFindToken.Type.IndexBased)) { + if (token.getType().equals(FindTokenType.IndexBased)) { bytesRead = getAbsolutePositionInLogForOffset(token.getOffset(), indexSegments); - } else if (token.getType().equals(StoreFindToken.Type.JournalBased)) { + } else if (token.getType().equals(FindTokenType.JournalBased)) { if (messageEntries.size() > 0) { bytesRead = getAbsolutePositionInLogForOffset(token.getOffset(), indexSegments) + messageEntries.get( messageEntries.size() - 1).getSize(); @@ -1445,20 +1447,20 @@ FindInfo findDeletedEntriesSince(FindToken token, long maxTotalSizeOfEntries, lo StoreFindToken newToken; List messageEntries = new ArrayList(); - if (storeToken.getType().equals(StoreFindToken.Type.IndexBased)) { + if (storeToken.getType().equals(FindTokenType.IndexBased)) { // Case 1: index based // Find the index segment corresponding to the token indexStartOffset. // Get entries starting from the token Key in this index. newToken = findEntriesFromSegmentStartOffset(storeToken.getOffset(), storeToken.getStoreKey(), messageEntries, new FindEntriesCondition(maxTotalSizeOfEntries, endTimeSeconds), validIndexSegments); - if (newToken.getType().equals(StoreFindToken.Type.Uninitialized)) { + if (newToken.getType().equals(FindTokenType.Uninitialized)) { newToken = storeToken; } } else { // journal based or empty Offset offsetToStart = storeToken.getOffset(); boolean inclusive = false; - if (storeToken.getType().equals(StoreFindToken.Type.Uninitialized)) { + if (storeToken.getType().equals(FindTokenType.Uninitialized)) { offsetToStart = getStartOffset(); inclusive = true; } @@ -1502,7 +1504,7 @@ FindInfo findDeletedEntriesSince(FindToken token, long maxTotalSizeOfEntries, lo if (entry != null && entry.getKey() != indexSegments.lastKey()) { newToken = findEntriesFromSegmentStartOffset(entry.getKey(), null, messageEntries, new FindEntriesCondition(maxTotalSizeOfEntries, endTimeSeconds), indexSegments); - if (newToken.getType().equals(StoreFindToken.Type.Uninitialized)) { + if (newToken.getType().equals(FindTokenType.Uninitialized)) { newToken = storeToken; } } else { @@ -1525,10 +1527,10 @@ void persistIndex() throws StoreException { /** * Re-validates the {@code token} to ensure that it is consistent with the current set of index segments. If it is - * not, a {@link StoreFindToken.Type#Uninitialized} token is returned. + * not, a {@link FindTokenType#Uninitialized} token is returned. * @param token the {@link StoreFindToken} to revalidate. * @return {@code token} if is consistent with the current set of index segments, a new - * {@link StoreFindToken.Type#Uninitialized} token otherwise. + * {@link FindTokenType#Uninitialized} token otherwise. */ FindToken revalidateFindToken(FindToken token) { return revalidateStoreFindToken((StoreFindToken) token, validIndexSegments); @@ -1536,10 +1538,10 @@ FindToken revalidateFindToken(FindToken token) { /** * Re-validates the {@code token} to ensure that it is consistent with the given view of {@code indexSegments}. If it - * is not, a {@link StoreFindToken.Type#Uninitialized} token is returned. + * is not, a {@link FindTokenType#Uninitialized} token is returned. * @param token the {@link StoreFindToken} to revalidate. * @param indexSegments the view of the index segments to revalidate against. - * @return {@code token} if is consistent with {@code indexSegments}, a new {@link StoreFindToken.Type#Uninitialized} + * @return {@code token} if is consistent with {@code indexSegments}, a new {@link FindTokenType#Uninitialized} * token otherwise. */ private StoreFindToken revalidateStoreFindToken(StoreFindToken token, diff --git a/ambry-store/src/main/java/com.github.ambry.store/StoreFindToken.java b/ambry-store/src/main/java/com.github.ambry.store/StoreFindToken.java index ca31d0f0a5..dd85a43aa9 100644 --- a/ambry-store/src/main/java/com.github.ambry.store/StoreFindToken.java +++ b/ambry-store/src/main/java/com.github.ambry.store/StoreFindToken.java @@ -13,6 +13,8 @@ */ package com.github.ambry.store; +import com.github.ambry.replication.FindToken; +import com.github.ambry.replication.FindTokenType; import com.github.ambry.utils.Utils; import java.io.DataInputStream; import java.io.IOException; @@ -31,12 +33,6 @@ * always equal or larger than the writable segment. */ public class StoreFindToken implements FindToken { - /** - * The type of the store token. - */ - public enum Type { - Uninitialized, JournalBased, IndexBased - } static final short VERSION_0 = 0; static final short VERSION_1 = 1; @@ -52,7 +48,7 @@ public enum Type { private static final byte[] ZERO_LENGTH_ARRAY = new byte[0]; private static final int UNINITIALIZED_OFFSET = -1; // refers to the type of the token - private final Type type; + private final FindTokenType type; // refers to the offset in the log. Could be either of Journal or Index based token private final Offset offset; // refers to the store key incase of Index based token @@ -73,7 +69,7 @@ public enum Type { * Uninitialized token. Refers to the starting of the log. */ StoreFindToken() { - this(Type.Uninitialized, null, null, null, null, true, CURRENT_VERSION); + this(FindTokenType.Uninitialized, null, null, null, null, true, CURRENT_VERSION); } /** @@ -84,7 +80,7 @@ public enum Type { * @param incarnationId the incarnationId of the store */ StoreFindToken(StoreKey key, Offset indexSegmentStartOffset, UUID sessionId, UUID incarnationId) { - this(Type.IndexBased, indexSegmentStartOffset, key, sessionId, incarnationId, false, CURRENT_VERSION); + this(FindTokenType.IndexBased, indexSegmentStartOffset, key, sessionId, incarnationId, false, CURRENT_VERSION); } /** @@ -96,12 +92,12 @@ public enum Type { * {@code false} otherwise */ StoreFindToken(Offset offset, UUID sessionId, UUID incarnationId, boolean inclusive) { - this(Type.JournalBased, offset, null, sessionId, incarnationId, inclusive, CURRENT_VERSION); + this(FindTokenType.JournalBased, offset, null, sessionId, incarnationId, inclusive, CURRENT_VERSION); } /** * Instantiating {@link StoreFindToken} - * @param type the {@link Type} of the token + * @param type the {@link FindTokenType} of the token * @param offset the offset that this token refers to * @param key The {@link StoreKey} that the token refers to * @param sessionId the sessionId of the store that this token refers to @@ -110,12 +106,12 @@ public enum Type { * {@code false} otherwise * @param version refers to the version of the token */ - private StoreFindToken(Type type, Offset offset, StoreKey key, UUID sessionId, UUID incarnationId, boolean inclusive, - short version) { - if (!type.equals(Type.Uninitialized)) { + private StoreFindToken(FindTokenType type, Offset offset, StoreKey key, UUID sessionId, UUID incarnationId, + boolean inclusive, short version) { + if (!type.equals(FindTokenType.Uninitialized)) { if (offset == null || sessionId == null) { throw new IllegalArgumentException("Offset [" + offset + "] or SessionId [" + sessionId + "] cannot be null"); - } else if (type.equals(Type.IndexBased) && key == null) { + } else if (type.equals(FindTokenType.IndexBased) && key == null) { throw new IllegalArgumentException("StoreKey cannot be null for an index based token"); } if (version == VERSION_2 && incarnationId == null) { @@ -156,14 +152,14 @@ static StoreFindToken fromBytes(DataInputStream stream, StoreKeyFactory factory) long indexStartOffset = stream.readLong(); if (indexStartOffset != UNINITIALIZED_OFFSET) { // read store key if needed - storeFindToken = new StoreFindToken(Type.IndexBased, new Offset(logSegmentName, indexStartOffset), + storeFindToken = new StoreFindToken(FindTokenType.IndexBased, new Offset(logSegmentName, indexStartOffset), factory.getStoreKey(stream), sessionIdUUID, null, false, VERSION_0); } else if (offset != UNINITIALIZED_OFFSET) { storeFindToken = - new StoreFindToken(Type.JournalBased, new Offset(logSegmentName, offset), null, sessionIdUUID, null, - false, VERSION_0); + new StoreFindToken(FindTokenType.JournalBased, new Offset(logSegmentName, offset), null, sessionIdUUID, + null, false, VERSION_0); } else { - storeFindToken = new StoreFindToken(Type.Uninitialized, null, null, null, null, true, VERSION_0); + storeFindToken = new StoreFindToken(FindTokenType.Uninitialized, null, null, null, null, true, VERSION_0); } break; case VERSION_1: @@ -174,21 +170,21 @@ static StoreFindToken fromBytes(DataInputStream stream, StoreKeyFactory factory) sessionIdUUID = UUID.fromString(sessionId); } // read type - Type type = Type.values()[stream.readShort()]; + FindTokenType type = FindTokenType.values()[stream.readShort()]; switch (type) { case Uninitialized: - storeFindToken = new StoreFindToken(Type.Uninitialized, null, null, null, null, true, VERSION_1); + storeFindToken = new StoreFindToken(FindTokenType.Uninitialized, null, null, null, null, true, VERSION_1); break; case JournalBased: Offset logOffset = Offset.fromBytes(stream); storeFindToken = - new StoreFindToken(Type.JournalBased, logOffset, null, sessionIdUUID, null, false, VERSION_1); + new StoreFindToken(FindTokenType.JournalBased, logOffset, null, sessionIdUUID, null, false, VERSION_1); break; case IndexBased: Offset indexSegmentStartOffset = Offset.fromBytes(stream); storeFindToken = - new StoreFindToken(Type.IndexBased, indexSegmentStartOffset, factory.getStoreKey(stream), sessionIdUUID, - null, false, VERSION_1); + new StoreFindToken(FindTokenType.IndexBased, indexSegmentStartOffset, factory.getStoreKey(stream), + sessionIdUUID, null, false, VERSION_1); break; default: throw new IllegalStateException("Unknown store find token type: " + type); @@ -196,7 +192,7 @@ static StoreFindToken fromBytes(DataInputStream stream, StoreKeyFactory factory) break; case VERSION_2: // read type - type = Type.values()[stream.readShort()]; + type = FindTokenType.values()[stream.readShort()]; switch (type) { case Uninitialized: storeFindToken = new StoreFindToken(); @@ -233,7 +229,7 @@ static StoreFindToken fromBytes(DataInputStream stream, StoreKeyFactory factory) return storeFindToken; } - public Type getType() { + public FindTokenType getType() { return type; } @@ -257,11 +253,8 @@ boolean getInclusive() { return inclusive == (byte) 1; } - /** - * Returns the version of the {@link StoreFindToken} - * @return the version of the {}@link {@link StoreFindToken} - */ - short getVersion() { + @Override + public short getVersion() { return version; } @@ -292,9 +285,9 @@ public byte[] toBytes() { bufWrap.putInt(sessionIdBytes.length); bufWrap.put(sessionIdBytes); // add offset for journal based token - bufWrap.putLong((type == Type.JournalBased) ? offset.getOffset() : UNINITIALIZED_OFFSET); + bufWrap.putLong((type == FindTokenType.JournalBased) ? offset.getOffset() : UNINITIALIZED_OFFSET); // add index start offset for Index based token - bufWrap.putLong((type == Type.IndexBased) ? offset.getOffset() : UNINITIALIZED_OFFSET); + bufWrap.putLong((type == FindTokenType.IndexBased) ? offset.getOffset() : UNINITIALIZED_OFFSET); // add storekey bufWrap.put(storeKeyBytes); break; @@ -324,13 +317,13 @@ public byte[] toBytes() { byte[] incarnationIdBytes = incarnationId != null ? incarnationId.toString().getBytes() : ZERO_LENGTH_ARRAY; storeKeyBytes = storeKey != null ? storeKey.toBytes() : ZERO_LENGTH_ARRAY; size = VERSION_SIZE + TYPE_SIZE; - if (type != Type.Uninitialized) { + if (type != FindTokenType.Uninitialized) { size += INCARNATION_ID_LENGTH_SIZE + incarnationIdBytes.length + SESSION_ID_LENGTH_SIZE + sessionIdBytes.length + offsetBytes.length; - if (type == Type.JournalBased) { + if (type == FindTokenType.JournalBased) { size += INCLUSIVE_BYTE_SIZE; - } else if (type == Type.IndexBased) { + } else if (type == FindTokenType.IndexBased) { size += storeKeyBytes.length; } } @@ -340,7 +333,7 @@ public byte[] toBytes() { bufWrap.putShort(VERSION_2); // add type bufWrap.putShort((short) type.ordinal()); - if (type != Type.Uninitialized) { + if (type != FindTokenType.Uninitialized) { // add incarnationId bufWrap.putInt(incarnationIdBytes.length); bufWrap.put(incarnationIdBytes); @@ -349,9 +342,9 @@ public byte[] toBytes() { bufWrap.put(sessionIdBytes); // add offset bufWrap.put(offsetBytes); - if (type == Type.JournalBased) { + if (type == FindTokenType.JournalBased) { bufWrap.put(getInclusive() ? (byte) 1 : (byte) 0); - } else if (type == Type.IndexBased) { + } else if (type == FindTokenType.IndexBased) { bufWrap.put(storeKeyBytes); } } @@ -369,7 +362,7 @@ public String toString() { sb.append(" incarnationId ").append(incarnationId); } sb.append(" inclusiveness ").append(inclusive == 1); - if (!type.equals(Type.Uninitialized)) { + if (!type.equals(FindTokenType.Uninitialized)) { if (sessionId != null) { sb.append(" sessionId ").append(sessionId); } diff --git a/ambry-store/src/main/java/com.github.ambry.store/StoreFindTokenFactory.java b/ambry-store/src/main/java/com.github.ambry.store/StoreFindTokenFactory.java index 5e9d637095..f4d0826e1e 100644 --- a/ambry-store/src/main/java/com.github.ambry.store/StoreFindTokenFactory.java +++ b/ambry-store/src/main/java/com.github.ambry.store/StoreFindTokenFactory.java @@ -13,6 +13,8 @@ */ package com.github.ambry.store; +import com.github.ambry.replication.FindToken; +import com.github.ambry.replication.FindTokenFactory; import java.io.DataInputStream; import java.io.IOException; diff --git a/ambry-store/src/test/java/com.github.ambry.store/BlobStoreTest.java b/ambry-store/src/test/java/com.github.ambry.store/BlobStoreTest.java index 1c7b4248d5..856cb244fd 100644 --- a/ambry-store/src/test/java/com.github.ambry.store/BlobStoreTest.java +++ b/ambry-store/src/test/java/com.github.ambry.store/BlobStoreTest.java @@ -18,6 +18,7 @@ import com.github.ambry.clustermap.ReplicaStatusDelegate; import com.github.ambry.config.StoreConfig; import com.github.ambry.config.VerifiableProperties; +import com.github.ambry.replication.FindToken; import com.github.ambry.utils.ByteBufferOutputStream; import com.github.ambry.utils.MockTime; import com.github.ambry.utils.Pair; diff --git a/ambry-store/src/test/java/com.github.ambry.store/HardDeleterTest.java b/ambry-store/src/test/java/com.github.ambry.store/HardDeleterTest.java index 471f99de2c..c481d015a5 100644 --- a/ambry-store/src/test/java/com.github.ambry.store/HardDeleterTest.java +++ b/ambry-store/src/test/java/com.github.ambry.store/HardDeleterTest.java @@ -16,6 +16,8 @@ import com.codahale.metrics.MetricRegistry; import com.github.ambry.config.StoreConfig; import com.github.ambry.config.VerifiableProperties; +import com.github.ambry.replication.FindToken; +import com.github.ambry.replication.FindTokenType; import com.github.ambry.utils.ByteBufferInputStream; import com.github.ambry.utils.MockTime; import com.github.ambry.utils.SystemTime; @@ -374,9 +376,9 @@ private class MockIndex extends PersistentIndex { } /** - * Always returns a {@link StoreFindToken.Type#Uninitialized} token. + * Always returns a {@link FindTokenType#Uninitialized} token. * @param token the {@link StoreFindToken} to revalidate. - * @return a {@link StoreFindToken.Type#Uninitialized} token. + * @return a {@link FindTokenType#Uninitialized} token. */ @Override FindToken revalidateFindToken(FindToken token) { diff --git a/ambry-store/src/test/java/com.github.ambry.store/IndexTest.java b/ambry-store/src/test/java/com.github.ambry.store/IndexTest.java index 7e1f8f5cab..295bf3b1cc 100644 --- a/ambry-store/src/test/java/com.github.ambry.store/IndexTest.java +++ b/ambry-store/src/test/java/com.github.ambry.store/IndexTest.java @@ -18,6 +18,7 @@ import com.github.ambry.account.Container; import com.github.ambry.config.StoreConfig; import com.github.ambry.config.VerifiableProperties; +import com.github.ambry.replication.FindToken; import com.github.ambry.utils.Pair; import com.github.ambry.utils.SystemTime; import com.github.ambry.utils.TestUtils; diff --git a/ambry-store/src/test/java/com.github.ambry.store/StoreFindTokenTest.java b/ambry-store/src/test/java/com.github.ambry.store/StoreFindTokenTest.java index a9970681c0..5638515bbb 100644 --- a/ambry-store/src/test/java/com.github.ambry.store/StoreFindTokenTest.java +++ b/ambry-store/src/test/java/com.github.ambry.store/StoreFindTokenTest.java @@ -13,6 +13,7 @@ */ package com.github.ambry.store; +import com.github.ambry.replication.FindTokenType; import com.github.ambry.utils.ByteBufferInputStream; import com.github.ambry.utils.Pair; import com.github.ambry.utils.Utils; @@ -262,10 +263,10 @@ static DataInputStream getSerializedStream(StoreFindToken token, short version) Utils.serializeNullableString(bufWrap, sessionId == null ? null : sessionId.toString()); long logOffset = -1; long indexStartOffset = -1; - StoreFindToken.Type type = token.getType(); - if (type.equals(StoreFindToken.Type.JournalBased)) { + FindTokenType type = token.getType(); + if (type.equals(FindTokenType.JournalBased)) { logOffset = token.getOffset().getOffset(); - } else if (type.equals(StoreFindToken.Type.IndexBased)) { + } else if (type.equals(FindTokenType.IndexBased)) { indexStartOffset = token.getOffset().getOffset(); } // add offset diff --git a/ambry-store/src/test/java/com.github.ambry.store/StoreTestUtils.java b/ambry-store/src/test/java/com.github.ambry.store/StoreTestUtils.java index 1a208e9ce6..5aa701d093 100644 --- a/ambry-store/src/test/java/com.github.ambry.store/StoreTestUtils.java +++ b/ambry-store/src/test/java/com.github.ambry.store/StoreTestUtils.java @@ -18,6 +18,7 @@ import com.github.ambry.clustermap.DiskId; import com.github.ambry.clustermap.PartitionId; import com.github.ambry.clustermap.ReplicaId; +import com.github.ambry.clustermap.ReplicaType; import com.github.ambry.config.StoreConfig; import com.github.ambry.config.VerifiableProperties; import java.io.File; @@ -129,6 +130,11 @@ public void markDiskUp() { // Null OK } + @Override + public ReplicaType getReplicaType() { + return ReplicaType.DISK_BACKED; + } + public void setSealedState(boolean isSealed) { this.isSealed = isSealed; } diff --git a/ambry-tools/src/main/java/com.github.ambry/store/DumpReplicaTokenTool.java b/ambry-tools/src/main/java/com.github.ambry/store/DumpReplicaTokenTool.java index 8bf5771abd..dfb39a9b11 100644 --- a/ambry-tools/src/main/java/com.github.ambry/store/DumpReplicaTokenTool.java +++ b/ambry-tools/src/main/java/com.github.ambry/store/DumpReplicaTokenTool.java @@ -18,6 +18,8 @@ import com.github.ambry.clustermap.PartitionId; import com.github.ambry.config.ClusterMapConfig; import com.github.ambry.config.VerifiableProperties; +import com.github.ambry.replication.FindToken; +import com.github.ambry.replication.FindTokenFactory; import com.github.ambry.tools.util.ToolUtils; import com.github.ambry.utils.Utils; import java.io.DataInputStream; diff --git a/ambry-tools/src/main/java/com.github.ambry/store/HardDeleteVerifier.java b/ambry-tools/src/main/java/com.github.ambry/store/HardDeleteVerifier.java index 485b283248..2878601800 100644 --- a/ambry-tools/src/main/java/com.github.ambry/store/HardDeleteVerifier.java +++ b/ambry-tools/src/main/java/com.github.ambry/store/HardDeleteVerifier.java @@ -23,6 +23,8 @@ import com.github.ambry.messageformat.MessageFormatException; import com.github.ambry.messageformat.MessageFormatRecord; import com.github.ambry.messageformat.UpdateRecord; +import com.github.ambry.replication.FindToken; +import com.github.ambry.replication.FindTokenFactory; import com.github.ambry.tools.util.ToolUtils; import com.github.ambry.utils.CrcInputStream; import com.github.ambry.utils.Utils; diff --git a/ambry-tools/src/main/java/com.github.ambry/store/StoreCopier.java b/ambry-tools/src/main/java/com.github.ambry/store/StoreCopier.java index 03745040a4..0b9ff82747 100644 --- a/ambry-tools/src/main/java/com.github.ambry/store/StoreCopier.java +++ b/ambry-tools/src/main/java/com.github.ambry/store/StoreCopier.java @@ -23,6 +23,7 @@ import com.github.ambry.messageformat.BlobStoreRecovery; import com.github.ambry.messageformat.MessageFormatWriteSet; import com.github.ambry.messageformat.TtlUpdateMessageFormatInputStream; +import com.github.ambry.replication.FindToken; import com.github.ambry.tools.util.ToolUtils; import com.github.ambry.utils.ByteBufferChannel; import com.github.ambry.utils.Pair; diff --git a/ambry-tools/src/test/java/com.github.ambry/store/StoreCopierTest.java b/ambry-tools/src/test/java/com.github.ambry/store/StoreCopierTest.java index 00414eacbc..b0b3673479 100644 --- a/ambry-tools/src/test/java/com.github.ambry/store/StoreCopierTest.java +++ b/ambry-tools/src/test/java/com.github.ambry/store/StoreCopierTest.java @@ -19,6 +19,7 @@ import com.github.ambry.config.StoreConfig; import com.github.ambry.config.VerifiableProperties; import com.github.ambry.messageformat.MessageFormatWriteSet; +import com.github.ambry.replication.FindToken; import com.github.ambry.utils.ByteBufferChannel; import com.github.ambry.utils.MockTime; import com.github.ambry.utils.Pair; diff --git a/build.gradle b/build.gradle index 165e209bd4..8fca960588 100644 --- a/build.gradle +++ b/build.gradle @@ -401,7 +401,7 @@ task allJar(type: Jar, dependsOn: subprojects.assemble) { task allJarVcr(type: Jar, dependsOn: subprojects.assemble) { manifest { - attributes 'Implementation-Title': 'Ambry', + attributes 'Implementation-Title': 'Vcr', 'Main-Class': 'com.github.ambry.cloud.VcrMain' } zip64 true