Add endpoint that allows access to streaming info (#14)
Eduard Tudenhöfner committed May 14, 2020
1 parent ce146ec commit a2a6397
Showing 9 changed files with 306 additions and 78 deletions.
@@ -16,6 +16,12 @@
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datastax.mgmtapi.rpc.Rpc;
import com.datastax.mgmtapi.rpc.RpcParam;
import com.datastax.mgmtapi.rpc.RpcRegistry;
@@ -25,33 +31,10 @@
import org.apache.cassandra.auth.RoleOptions;
import org.apache.cassandra.auth.RoleResource;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.HintedHandOffManager;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.GossiperInterceptor;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.NetworkTopologyStrategy;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.service.StorageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

/**
 * Replace JMX calls with CQL 'CALL' methods via the Rpc framework
@@ -336,4 +319,9 @@ public List<Map<String,String>> getEndpointStates()
return ShimLoader.instance.get().getEndpointStates();
}

@Rpc(name = "getStreamInfo", permission = Permission.EXECUTE)
public List<Map<String, List<Map<String, String>>>> getStreamInfo()
{
return ShimLoader.instance.get().getStreamInfo();
}
}
@@ -31,4 +31,6 @@ public interface CassandraAPI
ChannelInitializer<Channel> makeSocketInitializer(final Server.ConnectionTracker connectionTracker);

List<Map<String,String>> getEndpointStates();

List<Map<String, List<Map<String, String>>>> getStreamInfo();
}
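
The interface only fixes the return shape; the concrete per-version shim implementations live elsewhere in this commit and are not shown here. Purely to illustrate the nested List<Map<String, List<Map<String, String>>>> structure, a hypothetical implementation could assemble it as in the sketch below. The grouping key, attribute names, and values are invented for the example and are not taken from the real shims.

import java.util.List;
import java.util.Map;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

// Illustrative sketch only: demonstrates the nested return shape of getStreamInfo().
// All identifiers and attribute values here are made up; the real shims derive them
// from Cassandra's streaming state.
public class ExampleStreamInfoShim
{
    public List<Map<String, List<Map<String, String>>>> getStreamInfo()
    {
        // Attributes of a single streaming session with one peer (hypothetical names).
        Map<String, String> session = ImmutableMap.of(
                "peer", "10.0.0.12",
                "state", "STREAMING",
                "bytes_received", "1048576");

        // All sessions belonging to one stream plan.
        List<Map<String, String>> sessions = ImmutableList.of(session);

        // One outer entry per plan, keyed by a plan identifier (hypothetical).
        Map<String, List<Map<String, String>>> plan = ImmutableMap.of("plan-3c8e2d40", sessions);

        return ImmutableList.of(plan);
    }
}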
122 changes: 68 additions & 54 deletions management-api-server/doc/openapi.json
@@ -118,6 +118,29 @@
}
}
},
"/api/v0/ops/keyspace/cleanup" : {
"post" : {
"summary" : "Triggers the immediate cleanup of keys no longer belonging to a node. By default, clean all keyspaces",
"operationId" : "cleanup",
"requestBody" : {
"content" : {
"application/json" : {
"schema" : {
"$ref" : "#/components/schemas/KeyspaceRequest"
}
}
}
},
"responses" : {
"default" : {
"description" : "default response",
"content" : {
"text/plain" : { }
}
}
}
}
},
"/api/v0/ops/keyspace/refresh" : {
"post" : {
"summary" : "Load newly placed SSTables to the system without restart",
@@ -145,29 +168,6 @@
}
}
},
"/api/v0/ops/keyspace/cleanup" : {
"post" : {
"summary" : "Triggers the immediate cleanup of keys no longer belonging to a node. By default, clean all keyspaces",
"operationId" : "cleanup",
"requestBody" : {
"content" : {
"application/json" : {
"schema" : {
"$ref" : "#/components/schemas/KeyspaceRequest"
}
}
}
},
"responses" : {
"default" : {
"description" : "default response",
"content" : {
"text/plain" : { }
}
}
}
}
},
"/api/v0/lifecycle/start" : {
"post" : {
"description" : "Starts Cassandra",
@@ -281,10 +281,17 @@
}
}
},
"/api/v0/ops/node/drain" : {
"/api/v0/ops/node/decommission" : {
"post" : {
"summary" : "Drain the node (stop accepting writes and flush all tables)",
"operationId" : "drain",
"summary" : "Decommission the *node I am connecting to*",
"operationId" : "decommission",
"parameters" : [ {
"name" : "force",
"in" : "query",
"schema" : {
"type" : "boolean"
}
} ],
"responses" : {
"default" : {
"description" : "default response",
@@ -414,17 +421,24 @@
}
}
},
"/api/v0/ops/node/decommission" : {
"post" : {
"summary" : "Decommission the *node I am connecting to*",
"operationId" : "decommission",
"parameters" : [ {
"name" : "force",
"in" : "query",
"schema" : {
"type" : "boolean"
"/api/v0/ops/node/streaminfo" : {
"get" : {
"summary" : "Retrieve Streaming status information",
"operationId" : "getStreamInfo",
"responses" : {
"default" : {
"description" : "default response",
"content" : {
"application/json" : { }
}
}
} ],
}
}
},
"/api/v0/ops/node/drain" : {
"post" : {
"summary" : "Drain the node (stop accepting writes and flush all tables)",
"operationId" : "drain",
"responses" : {
"default" : {
"description" : "default response",
@@ -435,15 +449,15 @@
}
}
},
"/api/v0/ops/tables/compact" : {
"/api/v0/ops/tables/flush" : {
"post" : {
"summary" : "Force a (major) compaction on one or more tables or user-defined compaction on given SSTables",
"operationId" : "compact",
"summary" : "Flush one or more tables",
"operationId" : "flush",
"requestBody" : {
"content" : {
"application/json" : {
"schema" : {
"$ref" : "#/components/schemas/CompactRequest"
"$ref" : "#/components/schemas/KeyspaceRequest"
}
}
}
@@ -458,22 +472,15 @@
}
}
},
"/api/v0/ops/tables/garbagecollect" : {
"/api/v0/ops/tables/compact" : {
"post" : {
"summary" : "Remove deleted data from one or more tables",
"operationId" : "garbageCollect",
"parameters" : [ {
"name" : "tombstoneOption",
"in" : "query",
"schema" : {
"type" : "string"
}
} ],
"summary" : "Force a (major) compaction on one or more tables or user-defined compaction on given SSTables",
"operationId" : "compact",
"requestBody" : {
"content" : {
"application/json" : {
"schema" : {
"$ref" : "#/components/schemas/KeyspaceRequest"
"$ref" : "#/components/schemas/CompactRequest"
}
}
}
@@ -541,10 +548,17 @@
}
}
},
"/api/v0/ops/tables/flush" : {
"/api/v0/ops/tables/garbagecollect" : {
"post" : {
"summary" : "Flush one or more tables",
"operationId" : "flush",
"summary" : "Remove deleted data from one or more tables",
"operationId" : "garbageCollect",
"parameters" : [ {
"name" : "tombstoneOption",
"in" : "query",
"schema" : {
"type" : "string"
}
} ],
"requestBody" : {
"content" : {
"application/json" : {
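
Besides reordering existing entries, the regenerated spec documents the new read-only endpoint at /api/v0/ops/node/streaminfo. A minimal sketch of calling it with the JDK 11 HttpClient, assuming the management API is listening over plain HTTP on localhost:8080 (adjust to your deployment; the API can also be exposed over a unix socket):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class StreamInfoCall
{
    public static void main(String[] args) throws Exception
    {
        HttpClient client = HttpClient.newHttpClient();

        // Base URL is an assumption; substitute the address your management API listens on.
        HttpRequest request = HttpRequest.newBuilder(
                URI.create("http://localhost:8080/api/v0/ops/node/streaminfo"))
                .GET()
                .build();

        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());

        // The endpoint responds with the current streaming status as application/json.
        System.out.println(response.statusCode());
        System.out.println(response.body());
    }
}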
@@ -344,7 +344,7 @@ else if (System.getenv("CASSANDRA_HOME") != null)
}
catch (IllegalArgumentException e)
{
logger.debug("Error encountered:", e);
logger.error("Error encountered:", e);
logger.error("Unable to start: unable to find or execute bin/cassandra " + (cassandra_home == null ? "use -C" : cassandra_home));
System.exit(3);
}
@@ -5,10 +5,12 @@
*/
package com.datastax.mgmtapi.resources;

import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

@@ -19,6 +21,7 @@
import org.apache.commons.lang3.StringUtils;

import com.datastax.oss.driver.api.core.NoNodeAvailableException;
import com.datastax.oss.driver.api.core.cql.Row;
import org.apache.http.ConnectionClosedException;
import org.apache.http.HttpStatus;
import org.slf4j.Logger;
@@ -225,6 +228,25 @@ public Response reloadLocalSchema()
});
}

@GET
@Path("/streaminfo")
@Produces(MediaType.APPLICATION_JSON)
@Operation(summary = "Retrieve Streaming status information")
public Response getStreamInfo()
{
return handle(() ->
{
Row row = cqlService.executeCql(app.cassandraUnixSocketFile, "CALL NodeOps.getStreamInfo()").one();

Object queryResponse = null;
if (row != null)
{
queryResponse = row.getObject(0);
}
return Response.ok(Entity.json(queryResponse)).build();
});
}

static Response handle(Callable<Response> action)
{
try
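
The handler forwards the result of CALL NodeOps.getStreamInfo() as a JSON entity. Client code that wants typed access to that payload can mirror the nested shape declared by the RPC; a hedged sketch using Jackson follows. Jackson on the client side is an assumption, not something this commit introduces, and the sketch presumes the JSON body matches the List<Map<String, List<Map<String, String>>>> type.

import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

public class StreamInfoParser
{
    // Parses the JSON body returned by GET /api/v0/ops/node/streaminfo into the
    // nested structure declared by NodeOps.getStreamInfo(). Assumes the body is a
    // plain JSON rendering of that List/Map shape.
    public static List<Map<String, List<Map<String, String>>>> parse(String json) throws Exception
    {
        ObjectMapper mapper = new ObjectMapper();
        return mapper.readValue(
                json,
                new TypeReference<List<Map<String, List<Map<String, String>>>>>() { });
    }
}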