Skip to content

Commit

Permalink
FindToken changes (#1254)
Browse files Browse the repository at this point in the history
This change enables cloud find token and store find token to work together for two way replication.
The main changes are
1. Since the tokens for a cloud replica and store replica are different, this change introduces replicatype to differentiate between the two. For now the determination about replica type is based on mount point (cloud replicas have a dummy mount point of /vcr/partitionId).
2. Implementation of a factoryfactory to get findtokenfactory based on an input stream (during deserialization) or based on replica type.
3. Implementation of cloud token factory.
4. Bug fixes in cloud find token.
5. Refactoring some common code used by both ambry-store, ambry-cloud and ambry-replication.
6. Change the replication code to recognize both cloud replicas and store replicas and serialize/deserialize their token accordingly.
7. Change tests where token factory was used.
  • Loading branch information
ankagrawal authored and lightningrob committed Sep 12, 2019
1 parent a1af48c commit 9487e8b
Show file tree
Hide file tree
Showing 60 changed files with 1,024 additions and 249 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,7 @@ public class ClusterMapSnapshotConstants {
public static final String REPLICA_DISK = "disk";
// values
public static final String REPLICA_STOPPED = "stopped";

//the virtual mount point for the cloud replica in the cluster map
public static final String CLOUD_REPLICA_MOUNT = "/vcr";
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,9 @@ public interface ReplicaId extends Resource {
* @return true if this replica is in sealed state.
*/
boolean isSealed();

/**
* @return the {@code ReplicaType} for this replica.
*/
ReplicaType getReplicaType();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2019 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/

package com.github.ambry.clustermap;

/**
* The type of replica.
*/
public enum ReplicaType {
DISK_BACKED, CLOUD_BACKED
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,14 @@ public class ReplicationConfig {
*/
@Config("replication.token.factory")
@Default("com.github.ambry.store.StoreFindTokenFactory")
public final String replicationTokenFactory;
public final String replicationStoreTokenFactory;

/**
* The factory class the replicatio uses to create cloud token
*/
@Config("replcation.cloudtoken.factory")
@Default("com.github.ambry.cloud.CloudFindTokenFactory")
public final String replicationCloudTokenFactory;

/**
* The number of replica threads on each server that runs the replication protocol for intra dc replication
Expand Down Expand Up @@ -124,10 +131,19 @@ public class ReplicationConfig {
@Default("false")
public final boolean replicationTrackPerPartitionLagFromRemote;

/**
* The version of metadata request to be used for replication.
*/
@Config("replication.metadatarequest.version")
@Default("1")
public final short replicaMetadataRequestVersion;

public ReplicationConfig(VerifiableProperties verifiableProperties) {

replicationTokenFactory =
replicationStoreTokenFactory =
verifiableProperties.getString("replication.token.factory", "com.github.ambry.store.StoreFindTokenFactory");
replicationCloudTokenFactory = verifiableProperties.getString("replication.cloudtoken.factory",
"com.github.ambry.cloud.CloudFindTokenFactory");
replicationNumOfIntraDCReplicaThreads =
verifiableProperties.getInt("replication.no.of.intra.dc.replica.threads", 1);
replicationNumOfInterDCReplicaThreads =
Expand Down Expand Up @@ -155,5 +171,7 @@ public ReplicationConfig(VerifiableProperties verifiableProperties) {
verifiableProperties.getBoolean("replication.persist.token.on.shutdown.or.replica.remove", true);
replicationTrackPerPartitionLagFromRemote =
verifiableProperties.getBoolean("replication.track.per.partition.lag.from.remote", false);
replicaMetadataRequestVersion =
verifiableProperties.getShortInRange("replication.metadatarequest.version", (short) 1, (short) 1, (short) 2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.store;
package com.github.ambry.replication;

/**
* The find token used to search entries in the store
Expand All @@ -28,4 +28,16 @@ public interface FindToken {
* @return The total bytes read so far until this token
*/
public long getBytesRead();

/**
* Returns the type of {@code FindToken}
* @return the type of the token
*/
public FindTokenType getType();

/**
* Returns the version of the {@link FindToken}
* @return the version of the {@link FindToken}
*/
public short getVersion();
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.store;
package com.github.ambry.replication;

import java.io.DataInputStream;
import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/**
* Copyright 2019 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.replication;

import com.github.ambry.clustermap.ReplicaType;
import com.github.ambry.config.ReplicationConfig;
import com.github.ambry.store.StoreKeyFactory;
import com.github.ambry.utils.Utils;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Helper class to get findtoken based on replica type or input stream.
*/
public class FindTokenHelper {
private static final Logger logger = LoggerFactory.getLogger(FindTokenHelper.class);
private final StoreKeyFactory storeKeyFactory;
private final ReplicationConfig replicationConfig;
private final Map<ReplicaType, FindTokenFactory> findTokenFactoryMap;

public FindTokenHelper() {
storeKeyFactory = null;
replicationConfig = null;
findTokenFactoryMap = null;
}

/**
* Create a {@code FindTokenHelper} object.
* @param storeKeyFactory
* @param replicationConfig
*/
public FindTokenHelper(StoreKeyFactory storeKeyFactory, ReplicationConfig replicationConfig)
throws ReflectiveOperationException {
this.storeKeyFactory = storeKeyFactory;
this.replicationConfig = replicationConfig;
findTokenFactoryMap = new HashMap<>();
findTokenFactoryMap.put(ReplicaType.DISK_BACKED,
Utils.getObj(replicationConfig.replicationStoreTokenFactory, storeKeyFactory));
findTokenFactoryMap.put(ReplicaType.CLOUD_BACKED, Utils.getObj(replicationConfig.replicationCloudTokenFactory));
}

/**
* Get {@code FindTokenFactory} object based on {@code ReplicaType}
* @param replicaType for which to get the {@code FindTokenFactory} object
* @return {@code FindTokenFactory} object.
* @throws ReflectiveOperationException
*/
public FindTokenFactory getFindTokenFactoryFromReplicaType(ReplicaType replicaType) {
if (!findTokenFactoryMap.containsKey(replicaType)) {
throw new IllegalArgumentException("Invalid replica type " + replicaType.getClass());
}
return findTokenFactoryMap.get(replicaType);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/**
* Copyright 2019 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.replication;

/**
* The type of replica token
*/
public enum FindTokenType {
Uninitialized, JournalBased, IndexBased, CloudBased;
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.github.ambry.store;

import com.github.ambry.replication.FindToken;
import java.util.List;


Expand Down
1 change: 1 addition & 0 deletions ambry-api/src/main/java/com.github.ambry/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.github.ambry.store;

import com.github.ambry.replication.FindToken;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.github.ambry.config.VerifiableProperties;
import com.github.ambry.network.ConnectionPool;
import com.github.ambry.notification.NotificationSystem;
import com.github.ambry.replication.FindTokenFactory;
import com.github.ambry.replication.PartitionInfo;
import com.github.ambry.replication.RemoteReplicaInfo;
import com.github.ambry.replication.ReplicationEngine;
Expand Down Expand Up @@ -77,7 +78,7 @@ public CloudBackupManager(VerifiableProperties properties, CloudConfig cloudConf
this.cloudDestination = cloudDestinationFactory.getCloudDestination();
this.persistor =
new CloudTokenPersistor(replicaTokenFileName, mountPathToPartitionInfos, replicationMetrics, clusterMap,
factory, cloudDestination);
tokenHelper, cloudDestination);
this.cloudStorageCompactor =
new CloudStorageCompactor(cloudDestination, partitionToPartitionInfo.keySet(), vcrMetrics, false);
}
Expand Down Expand Up @@ -184,8 +185,10 @@ void addPartition(PartitionId partitionId) throws ReplicationException {
// We need to ensure that a replica token gets persisted only after the corresponding data in the
// store gets flushed to cloud. We use the store flush interval multiplied by a constant factor
// to determine the token flush interval
FindTokenFactory findTokenFactory =
tokenHelper.getFindTokenFactoryFromReplicaType(peerReplica.getReplicaType());
RemoteReplicaInfo remoteReplicaInfo =
new RemoteReplicaInfo(peerReplica, cloudReplica, cloudStore, factory.getNewFindToken(),
new RemoteReplicaInfo(peerReplica, cloudReplica, cloudStore, findTokenFactory.getNewFindToken(),
storeConfig.storeDataFlushIntervalSeconds * SystemTime.MsPerSec * Replication_Delay_Multiplier,
SystemTime.getInstance(), peerReplica.getDataNodeId().getPortToConnectTo());
replicationMetrics.addMetricsForRemoteReplicaInfo(remoteReplicaInfo);
Expand Down Expand Up @@ -243,5 +246,3 @@ public VcrMetrics getVcrMetrics() {
return vcrMetrics;
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import com.github.ambry.config.CloudConfig;
import com.github.ambry.config.ClusterMapConfig;
import com.github.ambry.config.VerifiableProperties;
import com.github.ambry.replication.FindToken;
import com.github.ambry.store.FindInfo;
import com.github.ambry.store.FindToken;
import com.github.ambry.store.MessageInfo;
import com.github.ambry.store.MessageWriteSet;
import com.github.ambry.store.Store;
Expand Down Expand Up @@ -150,7 +150,7 @@ public void downloadBlob(BlobId blobId, OutputStream outputStream) throws StoreE
try {
cloudDestination.downloadBlob(blobId, outputStream);
} catch (CloudStorageException e) {
throw new StoreException("Error occured in downloading blob for blobid :" + blobId, StoreErrorCodes.IOError);
throw new StoreException("Error occurred in downloading blob for blobid :" + blobId, StoreErrorCodes.IOError);
}
}

Expand Down
Loading

0 comments on commit 9487e8b

Please sign in to comment.