Skip to content

Commit

Permalink
Add new intercept for QueryHandler.parse which is new in 4.0
Browse files Browse the repository at this point in the history
  • Loading branch information
tjake authored and Eduard Tudenhöfner committed Jul 22, 2020
1 parent 84e691d commit 3efdc84
Show file tree
Hide file tree
Showing 13 changed files with 233 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package com.datastax.mgmtapi;

import com.datastax.mgmtapi.interceptors.QueryHandlerInterceptor4x;
import com.datastax.mgmtapi.interceptors.SystemDistributedReplicationInterceptor;
import com.datastax.mgmtapi.interceptors.CassandraDaemonInterceptor;
import com.datastax.mgmtapi.interceptors.CassandraRoleManagerInterceptor;
Expand Down Expand Up @@ -32,6 +33,9 @@ public static void premain(String arg, Instrumentation inst) throws Exception {
//Query Handler
.type(QueryHandlerInterceptor.type())
.transform(QueryHandlerInterceptor.transformer())
//Query Handler 4.0
.type(QueryHandlerInterceptor4x.type())
.transform(QueryHandlerInterceptor4x.transformer())
//Seed Reload support
.type(GossiperInterceptor.type())
.transform(GossiperInterceptor.transformer())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ private static Optional<CassandraAPI> maybeGetDseAPI()
}
catch (Exception e)
{
LOGGER.warn(String.format("No DSE API Shim found for DSE Version %s. Error was: %s", dseVersion, e.getMessage()), e);
if (!(e instanceof NoSuchMethodException))
LOGGER.warn(String.format("No DSE API Shim found for DSE Version %s. Error was: %s", dseVersion, e.getMessage()), e);
}
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.datastax.mgmtapi.ShimLoader;
import com.datastax.mgmtapi.rpc.RpcMethod;
import com.datastax.mgmtapi.rpc.RpcRegistry;
import com.datastax.mgmtapi.shims.RpcStatementShim;
import net.bytebuddy.agent.builder.AgentBuilder;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.dynamic.DynamicType;
Expand All @@ -40,8 +41,8 @@
public class QueryHandlerInterceptor
{
private static final Logger logger = LoggerFactory.getLogger(QueryHandlerInterceptor.class);
private static final String handlePrefix = "CALL " + NodeOpsProvider.RPC_CLASS_NAME + ".";
private static final Pattern opsPattern = Pattern.compile("^CALL NodeOps\\.([^\\(]+)\\(([^\\)]*)\\)");
static final String handlePrefix = "CALL " + NodeOpsProvider.RPC_CLASS_NAME + ".";
static final Pattern opsPattern = Pattern.compile("^CALL NodeOps\\.([^\\(]+)\\(([^\\)]*)\\)");

public static ElementMatcher<? super TypeDescription> type()
{
Expand All @@ -63,25 +64,37 @@ public DynamicType.Builder<?> transform(DynamicType.Builder<?> builder, TypeDesc
@RuntimeType
public static Object intercept(@AllArguments Object[] allArguments, @SuperCall Callable<Object> zuper) throws Throwable
{
if (allArguments.length > 0 && allArguments[0] != null && allArguments[0] instanceof String)
if (allArguments.length > 0 && allArguments[0] != null)
{
String query = (String) allArguments[0];
if (query.startsWith(handlePrefix))
if (allArguments[0] instanceof String)
{
QueryState state = (QueryState) allArguments[1];
QueryOptions options = (QueryOptions) allArguments[2];

if (state.getClientState().isInternal)
String query = (String) allArguments[0];
if (query.startsWith(handlePrefix))
{
Matcher m = opsPattern.matcher(query);
if (m.matches())
QueryState state = (QueryState) allArguments[1];
QueryOptions options = (QueryOptions) allArguments[2];

if (state.getClientState().isInternal)
{
return handle(state.getClientState(), options, NodeOpsProvider.RPC_CLASS_NAME, m.group(1), m.group(2).trim().isEmpty() ? new String[]{} : m.group(2).split("\\s*,\\s*"));
Matcher m = opsPattern.matcher(query);
if (m.matches())
{
return handle(state.getClientState(), options, NodeOpsProvider.RPC_CLASS_NAME, m.group(1), m.group(2).trim().isEmpty() ? new String[]{} : m.group(2).split("\\s*,\\s*"));
}
}
}
}
else if (allArguments[0] instanceof RpcStatementShim)
{
RpcStatementShim statement = (RpcStatementShim) allArguments[0];
QueryState state = (QueryState) allArguments[1];
QueryOptions options = (QueryOptions) allArguments[2];

return handle(state.getClientState(), options, NodeOpsProvider.RPC_CLASS_NAME, statement.getMethod(), statement.getParams());
}
}


return zuper.call();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/**
* Copyright DataStax, Inc.
*
* Please see the included license file for details.
*/
package com.datastax.mgmtapi.interceptors;

import java.util.concurrent.Callable;
import java.util.regex.Matcher;

import com.datastax.mgmtapi.ShimLoader;
import net.bytebuddy.agent.builder.AgentBuilder;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.dynamic.DynamicType;
import net.bytebuddy.implementation.MethodDelegation;
import net.bytebuddy.implementation.bind.annotation.AllArguments;
import net.bytebuddy.implementation.bind.annotation.RuntimeType;
import net.bytebuddy.implementation.bind.annotation.SuperCall;
import net.bytebuddy.matcher.ElementMatcher;
import net.bytebuddy.matcher.ElementMatchers;
import net.bytebuddy.utility.JavaModule;
import org.apache.cassandra.cql3.QueryHandler;
import org.apache.cassandra.service.QueryState;

import static com.datastax.mgmtapi.interceptors.QueryHandlerInterceptor.handlePrefix;
import static com.datastax.mgmtapi.interceptors.QueryHandlerInterceptor.opsPattern;

public class QueryHandlerInterceptor4x
{
public static ElementMatcher<? super TypeDescription> type()
{
return ElementMatchers.isSubTypeOf(QueryHandler.class);
}

public static AgentBuilder.Transformer transformer()
{
return new AgentBuilder.Transformer()
{
@Override
public DynamicType.Builder<?> transform(DynamicType.Builder<?> builder, TypeDescription typeDescription, ClassLoader classLoader, JavaModule javaModule)
{
return builder.method(ElementMatchers.named("parse")).intercept(MethodDelegation.to(QueryHandlerInterceptor4x.class));
}
};
}

@RuntimeType
public static Object intercept(@AllArguments Object[] allArguments, @SuperCall Callable<Object> zuper) throws Throwable
{
if (allArguments.length > 0 && allArguments[0] != null && allArguments[0] instanceof String)
{
String query = (String) allArguments[0];
if (query.startsWith(handlePrefix))
{
QueryState state = (QueryState) allArguments[1];

if (state.getClientState().isInternal)
{
Matcher m = opsPattern.matcher(query);
if (m.matches())
{
return ShimLoader.instance.get().makeRpcStatement(m.group(1), m.group(2).trim().isEmpty() ? new String[]{} : m.group(2).split("\\s*,\\s*"));
}
}
}
}

return zuper.call();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.netty.channel.ChannelInitializer;
import org.apache.cassandra.auth.IRoleManager;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.CQLStatement;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.ConsistencyLevel;
Expand Down Expand Up @@ -66,4 +67,6 @@ default Object handleRpcResult(Callable<Object> rpcResult) throws Exception
}

String getLocalDataCenter();

RpcStatementShim makeRpcStatement(String method, String[] params);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/**
* Copyright DataStax, Inc.
*
* Please see the included license file for details.
*/
package com.datastax.mgmtapi.shims;

import org.apache.cassandra.cql3.CQLStatement;

public interface RpcStatementShim extends CQLStatement
{
String getMethod();
String[] getParams();
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,17 @@ public static Iterable<String[]> functions()
return l;
}

public BaseDockerIntegrationTest(String version)
public BaseDockerIntegrationTest(String version) throws IOException
{
this.version = version;

//If run without forking we need to start a new version
if (docker != null)
{
temporaryFolder.delete();
temporaryFolder.create();
docker.startManagementAPI(version, getEnvironmentVars());
}
}

@BeforeClass
Expand Down Expand Up @@ -126,8 +134,11 @@ public static void teardown()
@Before
public void before()
{
systemLogGrabber.dockerHelper = docker;
docker.startManagementAPI(version, getEnvironmentVars());
if (!docker.started())
{
systemLogGrabber.dockerHelper = docker;
docker.startManagementAPI(version, getEnvironmentVars());
}
}

protected ArrayList<String> getEnvironmentVars()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ public static boolean waitForPort(String hostname, int port, Duration timeout, L
return false;
}

public boolean started()
{
return container != null;
}

private String startDocker(File dockerFile, File baseDir, String name, List<Integer> ports, List<String> volumeDescList, List<String> envList, List<String> cmdList)
{
ListContainersCmd listContainersCmd = dockerClient.listContainersCmd();
Expand Down Expand Up @@ -281,6 +286,7 @@ public void stopManagementAPI()
{
dockerClient.stopContainerCmd(container).exec();
dockerClient.removeContainerCmd(container).exec();
container = null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
import org.slf4j.LoggerFactory;

import com.datastax.mgmtapi.shims.CassandraAPI;
import com.datastax.mgmtapi.shims.RpcStatementShim;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import org.apache.cassandra.auth.IRoleManager;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.CQLStatement;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.HintedHandOffManager;
import org.apache.cassandra.db.Keyspace;
Expand Down Expand Up @@ -297,4 +299,10 @@ public String getLocalDataCenter()
{
return DatabaseDescriptor.getLocalDataCenter();
}

@Override
public RpcStatementShim makeRpcStatement(String method, String[] params)
{
throw new UnsupportedOperationException();
}
}
2 changes: 1 addition & 1 deletion management-api-shim-4.x/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<artifactId>datastax-mgmtapi-shim-4.x</artifactId>

<properties>
<cassandra.version>4.0-alpha2</cassandra.version>
<cassandra.version>4.0-beta1</cassandra.version>
<bytebuddy.version>1.9.15</bytebuddy.version>
<docker.java.version>3.1.5</docker.java.version>
<driver.version>4.4.0</driver.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
import org.slf4j.LoggerFactory;

import com.datastax.mgmtapi.shims.CassandraAPI;
import com.datastax.mgmtapi.shims.RpcStatementShim;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import org.apache.cassandra.auth.IRoleManager;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.CQLStatement;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.HintedHandOffManager;
import org.apache.cassandra.db.Keyspace;
Expand Down Expand Up @@ -301,4 +303,10 @@ public String getLocalDataCenter()
{
return DatabaseDescriptor.getLocalDataCenter();
}

@Override
public RpcStatementShim makeRpcStatement(String method, String[] params)
{
return new RpcStatement(method, params);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.datastax.mgmtapi.shim;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import com.datastax.mgmtapi.shims.RpcStatementShim;
import org.apache.cassandra.audit.AuditLogContext;
import org.apache.cassandra.cql3.CQLStatement;
import org.apache.cassandra.cql3.ColumnSpecification;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.transport.messages.ResultMessage;

public class RpcStatement implements RpcStatementShim
{
private final String method;
private final String[] params;

public RpcStatement(String method, String[] params)
{
this.method = method;
this.params = params;
}

@Override
public void authorize(ClientState clientState)
{

}

@Override
public void validate(ClientState clientState)
{

}

@Override
public ResultMessage execute(QueryState queryState, QueryOptions queryOptions, long l)
{
return new ResultMessage.Void();
}

@Override
public ResultMessage executeLocally(QueryState queryState, QueryOptions queryOptions)
{
return new ResultMessage.Void();
}

@Override
public AuditLogContext getAuditLogContext()
{
return null;
}

@Override
public String getMethod()
{
return method;
}

@Override
public String[] getParams()
{
return params;
}
}
Loading

0 comments on commit 3efdc84

Please sign in to comment.