Skip to content

Commit

Permalink
Remove synchronization of getters and default to async prestosql clus…
Browse files Browse the repository at this point in the history
…ter manager
  • Loading branch information
James Taylor committed Nov 5, 2020
1 parent bb22e73 commit 9e0b728
Show file tree
Hide file tree
Showing 9 changed files with 170 additions and 56 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* Copyright (c) 2019. Qubole Inc
* Licensed under the Apache License, Version 2.0 (the License);
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License. See accompanying LICENSE file.
*/
package com.qubole.rubix.prestosql;

import io.prestosql.spi.Node;
import io.prestosql.spi.NodeManager;
import java.util.Set;
import java.util.stream.Collectors;

import static java.util.Objects.requireNonNull;

public class ClusterManagerNodeGetter {
public static Set<String> getNodesInternal(NodeManager nodeManager)
{
requireNonNull(nodeManager, "nodeManager is null");
return nodeManager.getWorkerNodes().stream()
.map(Node::getHost)
.collect(Collectors.toSet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,70 +12,46 @@
*/
package com.qubole.rubix.prestosql;

import com.qubole.rubix.spi.AsyncClusterManager;
import com.qubole.rubix.spi.ClusterType;
import com.qubole.rubix.spi.SyncClusterManager;
import io.prestosql.spi.Node;
import io.prestosql.spi.NodeManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;

import java.net.UnknownHostException;
import java.util.Set;
import java.util.stream.Collectors;

import static java.util.Objects.requireNonNull;

/**
* Created by stagra on 14/1/16.
*/
public class PrestoClusterManager extends SyncClusterManager
public class PrestoClusterManager extends AsyncClusterManager
{
private static Log log = LogFactory.getLog(PrestoClusterManager.class);
private static volatile NodeManager nodeManager;
private volatile Set<Node> workerNodes;
static volatile NodeManager NODE_MANAGER;

public static void setNodeManager(NodeManager nodeManager)
{
PrestoClusterManager.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
PrestoClusterManager.NODE_MANAGER = nodeManager;
}

@Override
protected boolean hasStateChanged() {
requireNonNull(nodeManager, "nodeManager is null");
Set<Node> workerNodes = nodeManager.getWorkerNodes();
boolean hasChanged = !workerNodes.equals(this.workerNodes);
this.workerNodes = workerNodes;
return hasChanged;
}
private volatile NodeManager nodeManager;

@Override
public Set<String> getNodesInternal()
public void initialize(Configuration conf) throws UnknownHostException
{
requireNonNull(nodeManager, "nodeManager is null");
return nodeManager.getWorkerNodes().stream()
.map(Node::getHost)
.collect(Collectors.toSet());
}

@Override
protected String getCurrentNodeHostname()
{
requireNonNull(nodeManager, "nodeManager is null");
return nodeManager.getCurrentNode().getHost();
super.initialize(conf);
nodeManager = NODE_MANAGER;
if (nodeManager == null) {
nodeManager = new StandaloneNodeManager(conf);
}
}

@Override
protected String getCurrentNodeHostAddress()
public Set<String> getNodesInternal()
{
requireNonNull(nodeManager, "nodeManager is null");
try {
return nodeManager.getCurrentNode().getHostAndPort().toInetAddress().getHostAddress();
}
catch (UnknownHostException e) {
log.warn("Could not get HostAddress from NodeManager", e);
}

return super.getCurrentNodeHostAddress();
return ClusterManagerNodeGetter.getNodesInternal(nodeManager);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import io.prestosql.spi.HostAddress;
import io.prestosql.spi.Node;
import io.prestosql.spi.NodeManager;
import java.net.InetAddress;
import java.net.UnknownHostException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -39,8 +41,9 @@

import static java.util.Objects.requireNonNull;

public class TestingNodeManager implements NodeManager {
private static Log LOG = LogFactory.getLog(TestingNodeManager.class);
public class StandaloneNodeManager
implements NodeManager {
private static Log LOG = LogFactory.getLog(StandaloneNodeManager.class);
private static final int DEFAULT_SERVER_PORT = 8081;
private static final String DEFAULT_USER = "rubix";
public static final String SERVER_PORT_CONF_KEY = "caching.fs.presto-server-port";
Expand All @@ -49,10 +52,16 @@ public class TestingNodeManager implements NodeManager {
private final Node currentNode;
private final int serverPort;

public TestingNodeManager(Configuration conf, Node currentNode)
{
public StandaloneNodeManager(Configuration conf) {
this.serverPort = conf.getInt(SERVER_PORT_CONF_KEY, DEFAULT_SERVER_PORT);
this.serverAddress = ClusterUtil.getMasterHostname(conf);
Node currentNode = null;
try {
currentNode = new StandaloneNode(URI.create("http://" + InetAddress.getLocalHost().getHostAddress()));
}
catch (UnknownHostException e) {
LOG.warn("Unable to set current node", e);
}
this.currentNode = currentNode;
}

Expand Down Expand Up @@ -145,7 +154,7 @@ public Set<Node> getWorkerNodes() {

Set<Node> hosts = new HashSet<Node>();
for (Stats node : allNodes) {
hosts.add(new TestingNode(node.getUri()));
hosts.add(new StandaloneNode(node.getUri()));
}

return hosts;
Expand Down Expand Up @@ -188,11 +197,12 @@ private URL getFailedNodeUrl()
return new URL("http://" + serverAddress + ":" + serverPort + "/v1/node/failed");
}

public static class TestingNode implements Node
public static class StandaloneNode
implements Node
{
private final URI uri;

public TestingNode(URI uri) {
public StandaloneNode(URI uri) {
this.uri = uri;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* Copyright (c) 2019. Qubole Inc
* Licensed under the Apache License, Version 2.0 (the License);
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License. See accompanying LICENSE file.
*/
package com.qubole.rubix.prestosql;

import com.qubole.rubix.spi.ClusterType;
import com.qubole.rubix.spi.SyncClusterManager;
import io.prestosql.spi.Node;
import io.prestosql.spi.NodeManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.Set;

import static java.util.Objects.requireNonNull;

public class SyncPrestoClusterManager extends SyncClusterManager
{
private static Log log = LogFactory.getLog(SyncPrestoClusterManager.class);
private volatile Set<Node> workerNodes;

@Override
protected boolean hasStateChanged() {
requireNonNull(PrestoClusterManager.NODE_MANAGER, "nodeManager is null");
Set<Node> workerNodes = PrestoClusterManager.NODE_MANAGER.getWorkerNodes();
boolean hasChanged = !workerNodes.equals(this.workerNodes);
this.workerNodes = workerNodes;
return hasChanged;
}

@Override
public Set<String> getNodesInternal()
{
return ClusterManagerNodeGetter.getNodesInternal(PrestoClusterManager.NODE_MANAGER);
}

@Override
public ClusterType getClusterType()
{
return ClusterType.PRESTOSQL_CLUSTER_MANAGER;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/**
* Copyright (c) 2019. Qubole Inc
* Licensed under the Apache License, Version 2.0 (the License);
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License. See accompanying LICENSE file.
*/
package com.qubole.rubix.prestosql;

import com.qubole.rubix.spi.ClusterManager;
import org.apache.hadoop.conf.Configuration;

public class TestAsyncClusterManager extends TestClusterManager {
@Override
protected ClusterManager newPrestoClusterManager(Configuration configuration) {
return new PrestoClusterManager();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.Set;

Expand All @@ -36,7 +35,7 @@
*/

@Test(singleThreaded = true)
public class TestClusterManager
public abstract class TestClusterManager
{
private Log log = LogFactory.getLog(TestClusterManager.class);

Expand Down Expand Up @@ -123,14 +122,15 @@ private HttpServer createServer(String endpoint1, HttpHandler handler1, String e
return server;
}

abstract protected ClusterManager newPrestoClusterManager(Configuration conf);

private ClusterManager getPrestoClusterManager()
throws UnknownHostException
{
PrestoClusterManager clusterManager = new PrestoClusterManager();
Configuration conf = new Configuration();
conf.setInt(TestingNodeManager.SERVER_PORT_CONF_KEY, 45326);
conf.setInt(StandaloneNodeManager.SERVER_PORT_CONF_KEY, 45326);
ClusterManager clusterManager = newPrestoClusterManager(conf);
clusterManager.initialize(conf);
PrestoClusterManager.setNodeManager(new TestingNodeManager(conf, new TestingNodeManager.TestingNode(URI.create("http://" + InetAddress.getLocalHost().getHostAddress()))));
return clusterManager;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/**
* Copyright (c) 2019. Qubole Inc
* Licensed under the Apache License, Version 2.0 (the License);
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License. See accompanying LICENSE file.
*/
package com.qubole.rubix.prestosql;

import com.qubole.rubix.spi.ClusterManager;
import org.apache.hadoop.conf.Configuration;

public class TestSyncClusterManager extends TestClusterManager {
@Override
protected ClusterManager newPrestoClusterManager(Configuration conf) {
PrestoClusterManager.setNodeManager(new StandaloneNodeManager(conf));
return new SyncPrestoClusterManager();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ protected String getCurrentNodeHostAddress()
return nodeHostAddress;
}

protected Set<String> getNodesAndUpdateState()
protected synchronized Set<String> getNodesAndUpdateState()
{
Set<String> nodes = getNodesInternal();
if (nodes == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,32 +21,32 @@ public abstract class SyncClusterManager extends ClusterManager
{
private static Log log = LogFactory.getLog(SyncClusterManager.class);

private Set<String> currentNodes;
private volatile Set<String> currentNodes;

protected abstract boolean hasStateChanged();

private synchronized void updateStateIfChanged() {
private void updateStateIfChanged() {
if (hasStateChanged()) {
currentNodes = getNodesAndUpdateState();
}
}

@Override
public synchronized String locateKey(String key)
public String locateKey(String key)
{
updateStateIfChanged();
return super.locateKey(key);
}

@Override
public synchronized String getCurrentNodeName()
public String getCurrentNodeName()
{
updateStateIfChanged();
return super.getCurrentNodeName();
}
// Returns sorted list of nodes in the cluster
@Override
public synchronized Set<String> getNodes()
public Set<String> getNodes()
{
updateStateIfChanged();
return currentNodes;
Expand Down

0 comments on commit 9e0b728

Please sign in to comment.