Skip to content
This repository has been archived by the owner on Apr 23, 2018. It is now read-only.

Commit

Permalink
Voldemort instrumentation/modification
Browse files Browse the repository at this point in the history
  • Loading branch information
nruth committed Jul 10, 2012
1 parent 3c3a1bd commit 8b30494
Show file tree
Hide file tree
Showing 17 changed files with 824 additions and 12 deletions.
3 changes: 3 additions & 0 deletions voldemort-0.90.1-nruth/.classpath
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,8 @@
<classpathentry kind="lib" path="lib/mockito-all-1.8.5.jar"/>
<classpathentry kind="lib" path="lib/avro-1.4.0.jar"/>
<classpathentry kind="lib" path="lib/libthrift-0.5.0.jar"/>
<classpathentry kind="lib" path="lib/commons-math3-3.0-javadoc.jar"/>
<classpathentry kind="lib" path="lib/commons-math3-3.0-sources.jar"/>
<classpathentry kind="lib" path="lib/commons-math3-3.0.jar"/>
<classpathentry kind="output" path="classes"/>
</classpath>
Empty file modified voldemort-0.90.1-nruth/bin/generate_cluster_xml.py
100755 → 100644
Empty file.
1 change: 1 addition & 0 deletions voldemort-0.90.1-nruth/config/lakka/config/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
server.properties
58 changes: 58 additions & 0 deletions voldemort-0.90.1-nruth/config/lakka/config/cluster.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
<cluster>
<!-- Cluster topology for the five-node "Lakka" test cluster.
     All twelve data partitions (0-11) are assigned to nodes 0 and 1;
     nodes 2-4 currently hold no partitions.
     NOTE(review): the empty partition lists look intentional (spare nodes
     or rebalancing targets for the experiment) - confirm before reuse. -->
<name>Lakka</name>

<!-- Every node declares the same port numbers (http 8081, socket 6666,
     admin 6667); each server runs on a different host. -->
<server>
<!-- The node id is a unique, sequential id beginning with 0 that identifies each server in the cluster-->
<id>0</id>
<host>Lakka-1.it.kth.se</host>
<http-port>8081</http-port>
<socket-port>6666</socket-port>
<admin-port>6667</admin-port>
<!-- A list of data partitions assigned to this server -->
<partitions>0,1,2,3,4</partitions>
</server>

<server>
<!-- The node id is a unique, sequential id beginning with 0 that identifies each server in the cluster-->
<id>1</id>
<host>Lakka-2.it.kth.se</host>
<http-port>8081</http-port>
<socket-port>6666</socket-port>
<admin-port>6667</admin-port>
<!-- A list of data partitions assigned to this server -->
<partitions>5,6,7,8,9,10,11</partitions>
</server>

<server>
<!-- The node id is a unique, sequential id beginning with 0 that identifies each server in the cluster-->
<id>2</id>
<host>Lakka-3.it.kth.se</host>
<http-port>8081</http-port>
<socket-port>6666</socket-port>
<admin-port>6667</admin-port>
<!-- A list of data partitions assigned to this server -->
<!-- Intentionally empty: this node holds no partitions (see note above) -->
<partitions></partitions>
</server>

<server>
<!-- The node id is a unique, sequential id beginning with 0 that identifies each server in the cluster-->
<id>3</id>
<host>Lakka-4.it.kth.se</host>
<http-port>8081</http-port>
<socket-port>6666</socket-port>
<admin-port>6667</admin-port>
<!-- A list of data partitions assigned to this server -->
<!-- Intentionally empty: this node holds no partitions (see note above) -->
<partitions></partitions>
</server>

<server>
<!-- The node id is a unique, sequential id beginning with 0 that identifies each server in the cluster-->
<id>4</id>
<host>Lakka-5.it.kth.se</host>
<http-port>8081</http-port>
<socket-port>6666</socket-port>
<admin-port>6667</admin-port>
<!-- A list of data partitions assigned to this server -->
<!-- Intentionally empty: this node holds no partitions (see note above) -->
<partitions></partitions>
</server>
</cluster>
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
node.id=0
21 changes: 21 additions & 0 deletions voldemort-0.90.1-nruth/config/lakka/config/stores.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<stores>
<!-- Single store definition used for the instrumentation experiment. -->
<store>
<!-- Name clients pass when opening a StoreClient for this store. -->
<name>trickystore2</name>
<!-- One replica and all read/write quorums set to 1: every operation
     touches exactly one node and there is no redundancy.
     NOTE(review): presumably chosen so each key maps to exactly one
     partition for per-partition request counting - confirm. -->
<replication-factor>1</replication-factor>
<preferred-reads>1</preferred-reads>
<required-reads>1</required-reads>
<preferred-writes>1</preferred-writes>
<required-writes>1</required-writes>
<!-- Persist with BDB; route on the client side using consistent hashing. -->
<persistence>bdb</persistence>
<routing>client</routing>
<routing-strategy>consistent-routing</routing-strategy>
<!-- Keys and values are both UTF-8 strings. -->
<key-serializer>
<type>string</type>
<schema-info>utf8</schema-info>
</key-serializer>
<value-serializer>
<type>string</type>
<schema-info>utf8</schema-info>
</value-serializer>
</store>
</stores>
Binary file not shown.
Binary file not shown.
Binary file added voldemort-0.90.1-nruth/lib/commons-math3-3.0.jar
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ public <K, V, T> Store<K, V, T> getRawStore(String storeName,
boolean repairReads = !storeDef.isView();

// construct mapping
// this is to determine which nodes hold the key we're looking for,
// via partition positions
Map<Integer, Store<ByteArray, byte[], byte[]>> clientMapping = Maps.newHashMap();
Map<Integer, NonblockingStore> nonblockingStores = Maps.newHashMap();
Map<Integer, NonblockingStore> nonblockingSlopStores = Maps.newHashMap();
Expand All @@ -164,12 +166,21 @@ public <K, V, T> Store<K, V, T> getRawStore(String storeName,
slopStores = Maps.newHashMap();

for(Node node: cluster.getNodes()) {

// access to store on this node, with working RMI/RPC connection
// (e.g. tcp or http)
Store<ByteArray, byte[], byte[]> store = getStore(storeDef.getName(),
node.getHost(),
getPort(node),
this.requestFormatType);

// lookup table to get a node's RMI connection to the store we're
// processing
clientMapping.put(node.getId(), store);

// nonblocking stores are just what they sound like - async stores
// supporting callbacks,
// so single-threads can send requests to multiple nodes in parallel
NonblockingStore nonblockingStore = routedStoreFactory.toNonblockingStore(store);
nonblockingStores.put(node.getId(), nonblockingStore);

Expand Down Expand Up @@ -197,6 +208,7 @@ public <K, V, T> Store<K, V, T> getRawStore(String storeName,
repairReads,
clientZoneId,
getFailureDetector());
// decorator pattern, add logging
store = new LoggingStore(store);

if(isJmxEnabled) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
package voldemort.client;

import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.log4j.Logger;

import voldemort.cluster.Node;
import voldemort.routing.RoutingStrategy;
import voldemort.serialization.Serializer;
import voldemort.store.Store;
import voldemort.store.StoreCapabilityType;
import voldemort.versioning.InconsistencyResolver;
import voldemort.versioning.ObsoleteVersionException;
import voldemort.versioning.Version;
import voldemort.versioning.Versioned;

/**
 * Wraps a {@link DefaultStoreClient} instance; method calls are instrumented
 * and then delegated to the wrapped client.
 *
 * <p>Instrumentation: every get/put/delete resolves the key's partition via
 * the delegate's routing strategy and increments a per-partition,
 * per-request-type counter (see {@code PartitionAccessHistogram}). A progress
 * line is logged every 5000 requests of one type on one partition.
 *
 * <p>NOTE(review): the counters use a plain {@code HashMap} with unguarded
 * read-modify-write, so increments may be lost if a single instance is
 * shared across threads - confirm single-threaded use or add synchronization.
 *
 * @author Nicholas Trevor Rutherford
 *
 * @param <K> The key type
 * @param <V> The value type
 */
public class InstrumentedDefaultStoreClient<K, V> implements StoreClient<K, V> {

    /** Retry budget for {@link #applyUpdate(UpdateAction)} (matches DefaultStoreClient). */
    private static final int DEFAULT_UPDATE_ATTEMPTS = 3;

    private static final Logger logger = Logger.getLogger(InstrumentedDefaultStoreClient.class);

    // all real work is delegated to this client
    private final DefaultStoreClient<K, V> client;

    // the delegate's raw store, obtained by reflection in the constructor;
    // used only to look up routing/serializer capabilities in partitionForKey
    private volatile Store<K, V, Object> store;

    // holds {partition: {request-type: count}}
    private final PartitionAccessHistogram partitionUsage;

    /**
     * Builds the delegate {@link DefaultStoreClient} and grabs a reference to
     * its private {@code store} field so keys can be mapped to partitions.
     *
     * @param storeName name of the store to open
     * @param resolver inconsistency resolver, passed through to the delegate
     * @param storeFactory factory the delegate uses to build the raw store
     * @param maxMetadataRefreshAttempts passed through to the delegate
     * @throws UnsupportedOperationException if reflective access to the
     *         delegate's {@code store} field fails; the original exception is
     *         preserved as the cause
     */
    @SuppressWarnings("unchecked")
    public InstrumentedDefaultStoreClient(String storeName,
                                          InconsistencyResolver<Versioned<V>> resolver,
                                          StoreClientFactory storeFactory,
                                          int maxMetadataRefreshAttempts) {

        // to store request counts
        this.partitionUsage = new PartitionAccessHistogram();

        // delegate actual request processing to the default store client
        this.client = new DefaultStoreClient<K, V>(storeName,
                                                   resolver,
                                                   storeFactory,
                                                   maxMetadataRefreshAttempts);

        // make the default client's raw store accessible, for routing info
        try {
            Field clientStoreField = this.client.getClass().getDeclaredField("store");
            clientStoreField.setAccessible(true);
            this.store = (Store<K, V, Object>) clientStoreField.get(client);
        } catch(Exception e) {
            // Fail fast: without the raw store we cannot resolve partitions.
            // Chain the original exception as the cause instead of flattening
            // it to a string (the previous code lost the stack trace).
            throw new UnsupportedOperationException("cannot access DefaultStoreClient#store by reflection",
                                                    e);
        }

        logger.info("booted instrumented client");
    }

    @Override
    public Versioned<V> get(K key, Object transforms) {
        partitionUsage.recordGet(partitionForKey(key));
        return client.get(key, transforms);
    }

    @Override
    public Versioned<V> get(K key, Versioned<V> defaultValue) {
        partitionUsage.recordGet(partitionForKey(key));
        return client.get(key, defaultValue);
    }

    @Override
    public Versioned<V> get(K key) {
        partitionUsage.recordGet(partitionForKey(key));
        return client.get(key);
    }

    @Override
    public Version put(K key, V value, Object transforms) {
        partitionUsage.recordPut(partitionForKey(key));
        return client.put(key, value, transforms);
    }

    @Override
    public Version put(K key, V value) {
        partitionUsage.recordPut(partitionForKey(key));
        return client.put(key, value);
    }

    @Override
    public Version put(K key, Versioned<V> versioned) throws ObsoleteVersionException {
        partitionUsage.recordPut(partitionForKey(key));
        return client.put(key, versioned);
    }

    @Override
    public boolean delete(K key) {
        partitionUsage.recordDelete(partitionForKey(key));
        return client.delete(key);
    }

    @Override
    public boolean delete(K key, Version version) {
        partitionUsage.recordDelete(partitionForKey(key));
        return client.delete(key, version);
    }

    @Override
    public boolean applyUpdate(UpdateAction<K, V> action) {
        return applyUpdate(action, DEFAULT_UPDATE_ATTEMPTS);
    }

    /**
     * Retries {@code action} up to {@code maxTries} times, rolling back if no
     * attempt succeeds.
     *
     * <p>Copy-pasted from DefaultStoreClient because the action must run its
     * methods against THIS client, so its requests are counted; we cannot
     * hard-code the counts because the action does not tell us which
     * partitions or keys it touches (could be several).
     *
     * @return true if an attempt succeeded, false if every attempt hit an
     *         {@link ObsoleteVersionException} and the action was rolled back
     */
    @Override
    public boolean applyUpdate(UpdateAction<K, V> action, int maxTries) {
        boolean success = false;
        try {
            for(int i = 0; i < maxTries; i++) {
                try {
                    action.update(this);
                    success = true;
                    return success;
                } catch(ObsoleteVersionException ignored) {
                    // expected under contention: retry with fresh state
                }
            }
        } finally {
            if(!success)
                action.rollback();
        }

        // if we got here we have seen too many ObsoleteVersionExceptions
        // and have rolled back the updates
        return false;
    }

    @Override
    public V getValue(K key) {
        throw new UnsupportedOperationException("Not yet implemented.");
    }

    @Override
    public V getValue(K key, V defaultValue) {
        throw new UnsupportedOperationException("Not yet implemented.");
    }

    @Override
    public Map<K, Versioned<V>> getAll(Iterable<K> keys) {
        throw new UnsupportedOperationException("Not yet implemented.");
    }

    @Override
    public Map<K, Versioned<V>> getAll(Iterable<K> keys, Map<K, Object> transforms) {
        throw new UnsupportedOperationException("Not yet implemented.");
    }

    @Override
    public boolean putIfNotObsolete(K key, Versioned<V> versioned) {
        throw new UnsupportedOperationException("Not yet implemented.");
    }

    @Override
    public List<Node> getResponsibleNodes(K key) {
        return client.getResponsibleNodes(key);
    }

    /** Request types counted by the histogram. */
    private enum Request {
        GET,
        PUT,
        DELETE
    }

    /**
     * Maps a key to the partition that stores it, via the delegate store's
     * routing strategy and key serializer (both reached through the
     * reflection hack in the constructor).
     *
     * <p>Returns only the first partition in the routing list - assumes a
     * replication factor of 1, as configured for the experiment.
     */
    private int partitionForKey(K key) {
        RoutingStrategy routing = (RoutingStrategy) store.getCapability(StoreCapabilityType.ROUTING_STRATEGY);
        @SuppressWarnings("unchecked")
        Serializer<K> keySerializer = (Serializer<K>) store.getCapability(StoreCapabilityType.KEY_SERIALIZER);

        // key -> bytes, and look up its partitions
        return routing.getPartitionList(keySerializer.toBytes(key)).get(0);
    }

    /**
     * Per-partition, per-request-type counters: {partition: {request: count}}.
     * Logs a progress line every 5000 requests of one type on one partition.
     * Not thread-safe (see class note).
     */
    private static class PartitionAccessHistogram {

        private final Map<Integer, Map<Request, Integer>> partitionHistograms;

        public PartitionAccessHistogram() {
            partitionHistograms = new HashMap<Integer, Map<Request, Integer>>();
        }

        public void recordGet(Integer partition) {
            incrementRequestCount(partition, Request.GET);
        }

        public void recordPut(Integer partition) {
            incrementRequestCount(partition, Request.PUT);
        }

        public void recordDelete(Integer partition) {
            incrementRequestCount(partition, Request.DELETE);
        }

        private void incrementRequestCount(Integer partition, Request request) {
            // find or lazily create the partition's request histogram,
            // with all counters starting at 0
            Map<Request, Integer> partitionCounts = partitionHistograms.get(partition);
            if(partitionCounts == null) {
                HashMap<Request, Integer> newReqCounter = new HashMap<Request, Integer>();
                newReqCounter.put(Request.GET, 0);
                newReqCounter.put(Request.PUT, 0);
                newReqCounter.put(Request.DELETE, 0);
                partitionHistograms.put(partition, newReqCounter);
                partitionCounts = newReqCounter;
            }

            // n = n + 1, logging progress every 5000 requests
            int nextN = partitionCounts.get(request) + 1;
            if((nextN % 5000) == 0) {
                logger.info("Received " + nextN + " " + request + " requests for partition "
                            + partition);
            }
            partitionCounts.put(request, nextN);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ public Node[] getPartitionToNode() {
public ConsistentRoutingStrategy(HashFunction hash, Collection<Node> nodes, int numReplicas) {
this.numReplicas = numReplicas;
this.hash = hash;

// create lookup table for all the (primary?) partitions stored in the
// cluster
// * by asking each node for its partitions (cluster.xml)
// * using a tree for efficient contains-lookup for raising error on
// duplicate
SortedMap<Integer, Node> m = new TreeMap<Integer, Node>();
for(Node n: nodes) {
for(Integer partition: n.getPartitionIds()) {
Expand All @@ -74,6 +80,8 @@ public ConsistentRoutingStrategy(HashFunction hash, Collection<Node> nodes, int
}
}

// convert the tree to an array lookup-table
// all partitions from 0 to num_partitions-1 must be included
this.partitionToNode = new Node[m.size()];
for(int i = 0; i < m.size(); i++) {
if(!m.containsKey(i))
Expand Down
Loading

0 comments on commit 8b30494

Please sign in to comment.