diff --git a/.travis.yml b/.travis.yml
index 055e62e1e..f8dfa42d1 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -46,88 +46,104 @@ jobs:
fast_finish: true
allow_failures:
- env: TEST_TYPE=docker
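+ # Jobs against Cassandra trunk track a moving target upstream, so they may fail without failing the build.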
+ - env: TEST_TYPE=ccm CASSANDRA_VERSION=github:apache/trunk
+ - env: TEST_TYPE=ccm CASSANDRA_VERSION=github:apache/trunk GRIM_MIN=1 GRIM_MAX=1
+ - env: TEST_TYPE=ccm CASSANDRA_VERSION=github:apache/trunk GRIM_MIN=2 GRIM_MAX=2
+ - env: TEST_TYPE=ccm CASSANDRA_VERSION=github:apache/trunk GRIM_MIN=2 GRIM_MAX=4
include:
- stage: Integration Tests
- name: Memory, H2 and Postgresql reaping Cassandra 1.2.19
- env: TEST_TYPE=ccm CASSANDRA_VERSION=1.2.19 CUCUMBER_OPTIONS="--tags ~@cassandra_2_1_onwards --tags ~@cassandra_4_0_onwards"
+ name: Memory and H2 reaping Cassandra 1.2.19
+ env: TEST_TYPE=ccm STORAGE_TYPE=local CASSANDRA_VERSION=1.2.19 CUCUMBER_OPTIONS="--tags ~@cassandra_2_1_onwards --tags ~@cassandra_4_0_onwards"
- stage: Integration Tests
- name: Memory, H2 and Postgresql reaping Cassandra 2.0.17
- env: TEST_TYPE=ccm CASSANDRA_VERSION=2.0.17 CUCUMBER_OPTIONS="--tags ~@cassandra_2_1_onwards --tags ~@cassandra_4_0_onwards"
+ name: Memory and H2 reaping Cassandra 2.0.17
+ env: TEST_TYPE=ccm STORAGE_TYPE=local CASSANDRA_VERSION=2.0.17 CUCUMBER_OPTIONS="--tags ~@cassandra_2_1_onwards --tags ~@cassandra_4_0_onwards"
- stage: Integration Tests
- name: Memory, H2 and Postgresql reaping Cassandra 2.1.20
- env: TEST_TYPE=ccm CASSANDRA_VERSION=2.1.20 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards"
+ name: Memory and H2 reaping Cassandra 2.1.20
+ env: TEST_TYPE=ccm STORAGE_TYPE=local CASSANDRA_VERSION=2.1.20 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards"
- stage: Integration Tests
- name: Memory, H2 and Postgresql reaping Cassandra 2.2.13
- env: TEST_TYPE=ccm CASSANDRA_VERSION=2.2.13 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards"
+ name: Memory and H2 reaping Cassandra 2.2.13
+ env: TEST_TYPE=ccm STORAGE_TYPE=local CASSANDRA_VERSION=2.2.13 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards"
- stage: Integration Tests
- name: Memory, H2 and Postgresql reaping Cassandra 3.0.17
- env: TEST_TYPE=ccm CASSANDRA_VERSION=3.0.17 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards"
+ name: Memory and H2 reaping Cassandra 3.0.17
+ env: TEST_TYPE=ccm STORAGE_TYPE=local CASSANDRA_VERSION=3.0.17 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards"
- stage: Integration Tests
- name: Memory, H2 and Postgresql reaping Cassandra 3.11.4
- env: TEST_TYPE=ccm CASSANDRA_VERSION=3.11.4 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards"
+ name: Memory and H2 reaping Cassandra 3.11.4
+ env: TEST_TYPE=ccm STORAGE_TYPE=local CASSANDRA_VERSION=3.11.4 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards"
- stage: Integration Tests
- name: Memory, H2 and Postgresql reaping Cassandra 4.0
- env: TEST_TYPE=ccm CASSANDRA_VERSION=github:apache/trunk
+ name: Memory and H2 reaping Cassandra 4.0
+ env: TEST_TYPE=ccm STORAGE_TYPE=local CASSANDRA_VERSION=github:apache/trunk
- stage: Integration Tests
- name: Cassandra 2.1.20
- env: TEST_TYPE=ccm CASSANDRA_VERSION=2.1.20 GRIM_MIN=1 GRIM_MAX=1 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards"
+ name: Cassandra reaping Cassandra 2.1.20
+ env: TEST_TYPE=ccm STORAGE_TYPE=cassandra CASSANDRA_VERSION=2.1.20 GRIM_MIN=1 GRIM_MAX=1 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards"
- stage: Integration Tests
- name: Cassandra 2.2.13
- env: TEST_TYPE=ccm CASSANDRA_VERSION=2.2.13 GRIM_MIN=1 GRIM_MAX=1 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards"
+ name: Cassandra reaping Cassandra 2.2.13
+ env: TEST_TYPE=ccm STORAGE_TYPE=cassandra CASSANDRA_VERSION=2.2.13 GRIM_MIN=1 GRIM_MAX=1 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards"
- stage: Integration Tests
- name: Cassandra 3.0.17
- env: TEST_TYPE=ccm CASSANDRA_VERSION=3.0.17 GRIM_MIN=1 GRIM_MAX=1 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards"
+ name: Cassandra reaping Cassandra 3.0.17
+ env: TEST_TYPE=ccm STORAGE_TYPE=cassandra CASSANDRA_VERSION=3.0.17 GRIM_MIN=1 GRIM_MAX=1 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards"
- stage: Integration Tests
- name: Cassandra 3.11.3
- env: TEST_TYPE=ccm CASSANDRA_VERSION=3.11.3 GRIM_MIN=1 GRIM_MAX=1 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards"
+ name: Cassandra reaping Cassandra 3.11.3
+ env: TEST_TYPE=ccm STORAGE_TYPE=cassandra CASSANDRA_VERSION=3.11.3 GRIM_MIN=1 GRIM_MAX=1 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards"
- stage: Integration Tests
- name: Cassandra 4.0
- env: TEST_TYPE=ccm CASSANDRA_VERSION=github:apache/trunk GRIM_MIN=1 GRIM_MAX=1
+ name: Cassandra reaping Cassandra 4.0
+ env: TEST_TYPE=ccm STORAGE_TYPE=cassandra CASSANDRA_VERSION=github:apache/trunk GRIM_MIN=1 GRIM_MAX=1
+ - stage: Integration Tests
+ name: Postgresql reaping Cassandra 3.11.3
+ env: TEST_TYPE=ccm STORAGE_TYPE=postgresql CASSANDRA_VERSION=3.11.3 GRIM_MIN=1 GRIM_MAX=1 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards"
- stage: Distributed Reaper Integration Tests
name: Two Reapers on Cassandra 2.1.20
- env: TEST_TYPE=ccm CASSANDRA_VERSION=2.1.20 GRIM_MIN=2 GRIM_MAX=2 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards"
+ env: TEST_TYPE=ccm STORAGE_TYPE=cassandra CASSANDRA_VERSION=2.1.20 GRIM_MIN=2 GRIM_MAX=2 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards"
- stage: Distributed Reaper Integration Tests
name: Two Reapers on Cassandra 2.2.13
- env: TEST_TYPE=ccm CASSANDRA_VERSION=2.2.13 GRIM_MIN=2 GRIM_MAX=2 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards"
+ env: TEST_TYPE=ccm STORAGE_TYPE=cassandra CASSANDRA_VERSION=2.2.13 GRIM_MIN=2 GRIM_MAX=2 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards"
- stage: Distributed Reaper Integration Tests
name: Two Reapers on Cassandra 3.0.17
- env: TEST_TYPE=ccm CASSANDRA_VERSION=3.0.17 GRIM_MIN=2 GRIM_MAX=2 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards"
+ env: TEST_TYPE=ccm STORAGE_TYPE=cassandra CASSANDRA_VERSION=3.0.17 GRIM_MIN=2 GRIM_MAX=2 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards"
- stage: Distributed Reaper Integration Tests
name: Two Reapers on Cassandra 3.11.3
- env: TEST_TYPE=ccm CASSANDRA_VERSION=3.11.3 GRIM_MIN=2 GRIM_MAX=2 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards"
+ env: TEST_TYPE=ccm STORAGE_TYPE=cassandra CASSANDRA_VERSION=3.11.3 GRIM_MIN=2 GRIM_MAX=2 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards"
- stage: Distributed Reaper Integration Tests
name: Two Reapers on Cassandra 4.0
- env: TEST_TYPE=ccm CASSANDRA_VERSION=github:apache/trunk GRIM_MIN=2 GRIM_MAX=2
+ env: TEST_TYPE=ccm STORAGE_TYPE=cassandra CASSANDRA_VERSION=github:apache/trunk GRIM_MIN=2 GRIM_MAX=2
+ - stage: Distributed Reaper Integration Tests
+ name: Two Reapers on Cassandra 3.11.3 with Postgresql backend
+ env: TEST_TYPE=ccm STORAGE_TYPE=postgresql CASSANDRA_VERSION=3.11.3 GRIM_MIN=2 GRIM_MAX=2 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards"
- stage: Sidecar Integration Tests
name: Sidecar on Cassandra 2.1.20
- env: TEST_TYPE=sidecar CASSANDRA_VERSION=2.1.20 GRIM_MIN=1 GRIM_MAX=1 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards --tags @sidecar"
+ env: TEST_TYPE=sidecar STORAGE_TYPE=cassandra CASSANDRA_VERSION=2.1.20 GRIM_MIN=1 GRIM_MAX=1 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards --tags @sidecar"
- stage: Sidecar Integration Tests
name: Sidecar on Cassandra 2.2.13
- env: TEST_TYPE=sidecar CASSANDRA_VERSION=2.2.13 GRIM_MIN=1 GRIM_MAX=1 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards --tags @sidecar"
+ env: TEST_TYPE=sidecar STORAGE_TYPE=cassandra CASSANDRA_VERSION=2.2.13 GRIM_MIN=1 GRIM_MAX=1 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards --tags @sidecar"
- stage: Sidecar Integration Tests
name: Sidecar on Cassandra 3.0.17
- env: TEST_TYPE=sidecar CASSANDRA_VERSION=3.0.17 GRIM_MIN=1 GRIM_MAX=1 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards --tags @sidecar"
+ env: TEST_TYPE=sidecar STORAGE_TYPE=cassandra CASSANDRA_VERSION=3.0.17 GRIM_MIN=1 GRIM_MAX=1 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards --tags @sidecar"
- stage: Sidecar Integration Tests
name: Sidecar on Cassandra 3.11.3
- env: TEST_TYPE=sidecar CASSANDRA_VERSION=3.11.3 GRIM_MIN=1 GRIM_MAX=1 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards --tags @sidecar"
+ env: TEST_TYPE=sidecar STORAGE_TYPE=cassandra CASSANDRA_VERSION=3.11.3 GRIM_MIN=1 GRIM_MAX=1 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards --tags @sidecar"
- stage: Sidecar Integration Tests
name: Sidecar on Cassandra 4.0
- env: TEST_TYPE=sidecar CASSANDRA_VERSION=github:apache/trunk GRIM_MIN=1 GRIM_MAX=1 CUCUMBER_OPTIONS="--tags @sidecar"
+ env: TEST_TYPE=sidecar STORAGE_TYPE=cassandra CASSANDRA_VERSION=github:apache/trunk GRIM_MIN=1 GRIM_MAX=1 CUCUMBER_OPTIONS="--tags @sidecar"
+ - stage: Sidecar Integration Tests
+ name: Sidecar on Cassandra 3.11.3 with Postgresql backend
+ env: TEST_TYPE=sidecar STORAGE_TYPE=postgresql CASSANDRA_VERSION=3.11.3 GRIM_MIN=1 GRIM_MAX=1 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards --tags @sidecar"
- stage: Flapping Distributed Reaper Integration Tests
name: Four Flapping Reaper on Cassandra 2.1.20
- env: TEST_TYPE=ccm CASSANDRA_VERSION=2.1.20 GRIM_MIN=2 GRIM_MAX=4 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards"
+ env: TEST_TYPE=ccm STORAGE_TYPE=cassandra CASSANDRA_VERSION=2.1.20 GRIM_MIN=2 GRIM_MAX=4 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards"
- stage: Flapping Distributed Reaper Integration Tests
name: Four Flapping Reaper on Cassandra 2.2.13
- env: TEST_TYPE=ccm CASSANDRA_VERSION=2.2.13 GRIM_MIN=2 GRIM_MAX=4 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards"
+ env: TEST_TYPE=ccm STORAGE_TYPE=cassandra CASSANDRA_VERSION=2.2.13 GRIM_MIN=2 GRIM_MAX=4 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards"
- stage: Flapping Distributed Reaper Integration Tests
name: Four Flapping Reaper on Cassandra 3.0.17
- env: TEST_TYPE=ccm CASSANDRA_VERSION=3.0.17 GRIM_MIN=2 GRIM_MAX=4 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards"
+ env: TEST_TYPE=ccm STORAGE_TYPE=cassandra CASSANDRA_VERSION=3.0.17 GRIM_MIN=2 GRIM_MAX=4 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards"
- stage: Flapping Distributed Reaper Integration Tests
name: Four Flapping Reaper on Cassandra 3.11.3
- env: TEST_TYPE=ccm CASSANDRA_VERSION=3.11.3 GRIM_MIN=2 GRIM_MAX=4 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards"
+ env: TEST_TYPE=ccm STORAGE_TYPE=cassandra CASSANDRA_VERSION=3.11.3 GRIM_MIN=2 GRIM_MAX=4 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards"
- stage: Flapping Distributed Reaper Integration Tests
name: Four Flapping Reaper on Cassandra 4.0
- env: TEST_TYPE=ccm CASSANDRA_VERSION=github:apache/trunk GRIM_MIN=2 GRIM_MAX=4
+ env: TEST_TYPE=ccm STORAGE_TYPE=cassandra CASSANDRA_VERSION=github:apache/trunk GRIM_MIN=2 GRIM_MAX=4
+ - stage: Flapping Distributed Reaper Integration Tests
+ name: Four Flapping Reaper on Cassandra 3.11.3 with Postgresql backend
+ env: TEST_TYPE=ccm STORAGE_TYPE=postgresql CASSANDRA_VERSION=3.11.3 GRIM_MIN=2 GRIM_MAX=4 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards"
# Upgrade Integration Tests are broken due to classloading issues around org.glassfish.jersey.internal.spi.AutoDiscoverable
# - stage: Upgrade Integration Tests
# name: Upgrading Reaper on Cassandra 2.1.20 (all_nodes_reachable)
diff --git a/src/ci/before_script.sh b/src/ci/before_script.sh
index 3980d61eb..ac8da21a4 100755
--- a/src/ci/before_script.sh
+++ b/src/ci/before_script.sh
@@ -22,10 +22,8 @@ case "${TEST_TYPE}" in
echo "ERROR: Environment variable TEST_TYPE is unspecified."
exit 1
;;
- "ccm")
- if [ "x${GRIM_MIN}" = "x" ] ; then
- psql -c 'create database reaper;' -U postgres
- fi
+ "ccm"|"sidecar")
+ psql -c 'create database reaper;' -U postgres
;;
*)
echo "Skipping, no actions for TEST_TYPE=${TEST_TYPE}."
diff --git a/src/ci/script.sh b/src/ci/script.sh
index 7cc55f580..4d1364959 100755
--- a/src/ci/script.sh
+++ b/src/ci/script.sh
@@ -41,26 +41,54 @@ case "${TEST_TYPE}" in
sleep 30
ccm status
- if [ "x${GRIM_MIN}" = "x" ]
- then
- mvn -B package
- mvn -B surefire:test -DsurefireArgLine="-Xmx256m" -Dtest=ReaperShiroIT
- mvn -B surefire:test -DsurefireArgLine="-Xmx256m" -Dtest=ReaperIT
- mvn -B surefire:test -DsurefireArgLine="-Xmx256m" -Dtest=ReaperH2IT
- mvn -B surefire:test -DsurefireArgLine="-Xmx256m" -Dtest=ReaperPostgresIT
- else
- mvn -B package -DskipTests
- mvn -B surefire:test -DsurefireArgLine="-Xmx384m" -Dtest=ReaperCassandraIT -Dgrim.reaper.min=${GRIM_MIN} -Dgrim.reaper.max=${GRIM_MAX}
- fi
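+ # Run the test suite that matches the storage backend configured for this job.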
+ case "${STORAGE_TYPE}" in
+ "")
+ echo "ERROR: Environment variable STORAGE_TYPE is unspecified."
+ exit 1
+ ;;
+ "local")
+ mvn -B package
+ mvn -B surefire:test -DsurefireArgLine="-Xmx256m" -Dtest=ReaperShiroIT
+ mvn -B surefire:test -DsurefireArgLine="-Xmx256m" -Dtest=ReaperIT
+ mvn -B surefire:test -DsurefireArgLine="-Xmx256m" -Dtest=ReaperH2IT
+ ;;
+ "postgresql")
+ mvn -B package -DskipTests
+ mvn -B surefire:test -DsurefireArgLine="-Xmx384m" -Dtest=ReaperPostgresIT -Dgrim.reaper.min=${GRIM_MIN} -Dgrim.reaper.max=${GRIM_MAX}
+ ;;
+ "cassandra")
+ mvn -B package -DskipTests
+ mvn -B surefire:test -DsurefireArgLine="-Xmx384m" -Dtest=ReaperCassandraIT -Dgrim.reaper.min=${GRIM_MIN} -Dgrim.reaper.max=${GRIM_MAX}
+ ;;
+ *)
+ echo "Skipping, no actions for STORAGE_TYPE=${STORAGE_TYPE}."
+ ;;
+ esac
+
;;
"sidecar")
mvn --version -B
+ mvn -B package -DskipTests
ccm start
sleep 30
ccm status
- mvn -B package -DskipTests
- mvn -B surefire:test -DsurefireArgLine="-Xmx512m" -Dtest=ReaperCassandraSidecarIT
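+ # Sidecar tests run against either distributed backend, selected by STORAGE_TYPE.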
+ case "${STORAGE_TYPE}" in
+ "")
+ echo "ERROR: Environment variable STORAGE_TYPE is unspecified."
+ exit 1
+ ;;
+ "postgresql")
+ mvn -B surefire:test -DsurefireArgLine="-Xmx512m" -Dtest=ReaperPostgresSidecarIT -Dcucumber.options="-t @sidecar"
+ ;;
+ "cassandra")
+ mvn -B surefire:test -DsurefireArgLine="-Xmx512m" -Dtest=ReaperCassandraSidecarIT -Dcucumber.options="-t @sidecar"
+ ;;
+ *)
+ echo "Skipping, no actions for STORAGE_TYPE=${STORAGE_TYPE}."
+ ;;
+ esac
+
;;
"upgrade")
mvn --version -B
diff --git a/src/server/pom.xml b/src/server/pom.xml
index 629c4ae94..7d1a5845b 100644
--- a/src/server/pom.xml
+++ b/src/server/pom.xml
@@ -296,6 +296,11 @@
<version>0.7</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.ibatis</groupId>
+ <artifactId>ibatis2-common</artifactId>
+ <version>2.1.7.597</version>
+ </dependency>
diff --git a/src/server/src/main/java/io/cassandrareaper/ReaperApplication.java b/src/server/src/main/java/io/cassandrareaper/ReaperApplication.java
index 04c51bec1..73735bab3 100644
--- a/src/server/src/main/java/io/cassandrareaper/ReaperApplication.java
+++ b/src/server/src/main/java/io/cassandrareaper/ReaperApplication.java
@@ -75,6 +75,7 @@
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.dropwizard.DropwizardExports;
import io.prometheus.client.exporter.MetricsServlet;
+import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.HttpClient;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.session.SessionHandler;
@@ -441,9 +442,18 @@ private IStorage initializeStorage(ReaperApplicationConfiguration config, Enviro
|| "database".equalsIgnoreCase(config.getStorageType())) {
// create DBI instance
final DBIFactory factory = new DBIFactory();
-
- // instanciate store
- storage = new PostgresStorage(factory.build(environment, config.getDataSourceFactory(), "postgresql"));
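+ // default the JDBC driver class from the configured storage type when the config omits it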
+ if (StringUtils.isEmpty(config.getDataSourceFactory().getDriverClass())
+ && "postgres".equalsIgnoreCase(config.getStorageType())) {
+ config.getDataSourceFactory().setDriverClass("org.postgresql.Driver");
+ } else if (StringUtils.isEmpty(config.getDataSourceFactory().getDriverClass())
+ && "h2".equalsIgnoreCase(config.getStorageType())) {
+ config.getDataSourceFactory().setDriverClass("org.h2.Driver");
+ }
+ // instantiate store
+ storage = new PostgresStorage(
+ context.reaperInstanceId,
+ factory.build(environment, config.getDataSourceFactory(), "postgresql")
+ );
initDatabase(config);
} else {
LOG.error("invalid storageType: {}", config.getStorageType());
diff --git a/src/server/src/main/java/io/cassandrareaper/service/PurgeService.java b/src/server/src/main/java/io/cassandrareaper/service/PurgeService.java
index 6e7db6d45..bc1dab417 100644
--- a/src/server/src/main/java/io/cassandrareaper/service/PurgeService.java
+++ b/src/server/src/main/java/io/cassandrareaper/service/PurgeService.java
@@ -21,6 +21,7 @@
import io.cassandrareaper.ReaperException;
import io.cassandrareaper.core.Cluster;
import io.cassandrareaper.core.RepairRun;
+import io.cassandrareaper.storage.IDistributedStorage;
import java.util.Collection;
import java.util.List;
@@ -73,6 +74,7 @@ public Integer purgeDatabase() throws ReaperException {
}
}
}
+ purgeMetrics();
return purgedRuns;
}
@@ -130,4 +132,19 @@ private int purgeRepairRunsByDate(Collection<RepairRun> repairRuns) {
return purgedRuns.get();
}
+
+ /**
+ * Purges all expired metrics from storage. The expiration time is a property of the storage backend: it is
+ * enforced by the schema itself for databases with TTL support, and tracked by the storage instance for
+ * databases that must be purged manually.
+ */
+ private void purgeMetrics() {
+ if (context.storage instanceof IDistributedStorage) {
+ IDistributedStorage storage = ((IDistributedStorage) context.storage);
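+ // generic metrics and node operations are only collected in sidecar mode, so only purge them there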
+ if (context.config.isInSidecarMode()) {
+ storage.purgeMetrics();
+ storage.purgeNodeOperations();
+ }
+ storage.purgeNodeMetrics();
+ }
+ }
}
diff --git a/src/server/src/main/java/io/cassandrareaper/storage/CassandraStorage.java b/src/server/src/main/java/io/cassandrareaper/storage/CassandraStorage.java
index 77cc8cc5a..f1e77f7cb 100644
--- a/src/server/src/main/java/io/cassandrareaper/storage/CassandraStorage.java
+++ b/src/server/src/main/java/io/cassandrareaper/storage/CassandraStorage.java
@@ -1522,6 +1522,9 @@ public void deleteNodeMetrics(UUID runId, String node) {
session.executeAsync(delNodeMetricsByNodePrepStmt.bind(minute, runId, node));
}
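+ // no-op: node metrics expire via TTL in the Cassandra schema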
+ @Override
+ public void purgeNodeMetrics() {}
+
private static NodeMetrics createNodeMetrics(Row row) {
return NodeMetrics.builder()
.withNode(row.getString("node"))
@@ -1830,6 +1833,9 @@ public void storeMetric(GenericMetric metric) {
metric.getValue()));
}
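+ // no-op: metrics expire via TTL in the Cassandra schema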
+ @Override
+ public void purgeMetrics() {}
+
@Override
public void storeOperations(String clusterName, OpType operationType, String host, String operationsJson) {
session.executeAsync(
@@ -1853,4 +1859,7 @@ public String listOperations(String clusterName, OpType operationType, String ho
: operations.one().getString("data");
}
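+ // no-op: node operations expire via TTL in the Cassandra schema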
+ @Override
+ public void purgeNodeOperations() {}
+
}
diff --git a/src/server/src/main/java/io/cassandrareaper/storage/IDistributedStorage.java b/src/server/src/main/java/io/cassandrareaper/storage/IDistributedStorage.java
index 12584e3ed..2906e7948 100644
--- a/src/server/src/main/java/io/cassandrareaper/storage/IDistributedStorage.java
+++ b/src/server/src/main/java/io/cassandrareaper/storage/IDistributedStorage.java
@@ -80,4 +80,19 @@ List<GenericMetric> getMetrics(
void storeOperations(String clusterName, OpType operationType, String host, String operationsJson);
String listOperations(String clusterName, OpType operationType, String host);
+
+ /**
+ * Purges old node metrics from the database (no-op for databases with TTL)
+ */
+ void purgeNodeMetrics();
+
+ /**
+ * Purges old metrics from the database (no-op for databases with TTL)
+ */
+ void purgeMetrics();
+
+ /**
+ * Purges old node operation info from the database (no-op for databases with TTL)
+ */
+ void purgeNodeOperations();
}
diff --git a/src/server/src/main/java/io/cassandrareaper/storage/PostgresStorage.java b/src/server/src/main/java/io/cassandrareaper/storage/PostgresStorage.java
index 1cdeb7b10..6ff9564ff 100644
--- a/src/server/src/main/java/io/cassandrareaper/storage/PostgresStorage.java
+++ b/src/server/src/main/java/io/cassandrareaper/storage/PostgresStorage.java
@@ -17,8 +17,11 @@
package io.cassandrareaper.storage;
+import io.cassandrareaper.AppContext;
import io.cassandrareaper.core.Cluster;
import io.cassandrareaper.core.DiagEventSubscription;
+import io.cassandrareaper.core.GenericMetric;
+import io.cassandrareaper.core.NodeMetrics;
import io.cassandrareaper.core.RepairRun;
import io.cassandrareaper.core.RepairSchedule;
import io.cassandrareaper.core.RepairSegment;
@@ -30,6 +33,8 @@
import io.cassandrareaper.service.RingRange;
import io.cassandrareaper.storage.postgresql.BigIntegerArgumentFactory;
import io.cassandrareaper.storage.postgresql.IStoragePostgreSql;
+import io.cassandrareaper.storage.postgresql.InstantArgumentFactory;
+import io.cassandrareaper.storage.postgresql.JdbiExceptionUtil;
import io.cassandrareaper.storage.postgresql.LongCollectionSqlTypeArgumentFactory;
import io.cassandrareaper.storage.postgresql.PostgresArrayArgumentFactory;
import io.cassandrareaper.storage.postgresql.PostgresRepairSegment;
@@ -40,6 +45,8 @@
import io.cassandrareaper.storage.postgresql.UuidArgumentFactory;
import io.cassandrareaper.storage.postgresql.UuidUtil;
+import java.time.Duration;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -48,9 +55,11 @@
import java.util.Set;
import java.util.SortedSet;
import java.util.UUID;
+import java.util.stream.Collectors;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -59,23 +68,56 @@
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.exceptions.DBIException;
+import org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implements the StorageAPI using PostgreSQL database.
*/
-public final class PostgresStorage implements IStorage {
+public class PostgresStorage implements IStorage, IDistributedStorage {
private static final Logger LOG = LoggerFactory.getLogger(PostgresStorage.class);
+ private static final int DEFAULT_LEADER_TIMEOUT_MIN = 10; // pulled value from Cassandra DDL
+ private static final int DEFAULT_REAPER_TIMEOUT_MIN = 3; // pulled value from Cassandra DDL
+ private static final int DEFAULT_METRICS_TIMEOUT_MIN = 14400; // pulled value from Cassandra DDL (10 days in minutes)
+ private static final int DEFAULT_NODE_OPERATIONS_TIMEOUT_MIN = 5; // pulled value from Cassandra DDL
+
+ protected final DBI jdbi;
+ private final Duration leaderTimeout;
+ private final Duration reaperTimeout;
+ private final Duration metricsTimeout;
+ private final Duration nodeOperationsTimeout;
+ private final UUID reaperInstanceId;
+
+
+ public PostgresStorage(UUID reaperInstanceId, DBI jdbi) {
+ this(
+ reaperInstanceId,
+ jdbi,
+ DEFAULT_LEADER_TIMEOUT_MIN,
+ DEFAULT_REAPER_TIMEOUT_MIN,
+ DEFAULT_METRICS_TIMEOUT_MIN,
+ DEFAULT_NODE_OPERATIONS_TIMEOUT_MIN
+ );
+ }
- private final DBI jdbi;
-
- public PostgresStorage(DBI jdbi) {
+ @VisibleForTesting
+ public PostgresStorage(UUID reaperInstanceId,
+ DBI jdbi,
+ int leaderTimeoutInMinutes,
+ int reaperTimeoutInMinutes,
+ int metricsTimeoutInMinutes,
+ int nodeOperationsTimeoutInMinutes) {
+ this.reaperInstanceId = reaperInstanceId;
this.jdbi = jdbi;
+ leaderTimeout = Duration.ofMinutes(leaderTimeoutInMinutes);
+ reaperTimeout = Duration.ofMinutes(reaperTimeoutInMinutes);
+ metricsTimeout = Duration.ofMinutes(metricsTimeoutInMinutes);
+ nodeOperationsTimeout = Duration.ofMinutes(nodeOperationsTimeoutInMinutes);
}
- private static IStoragePostgreSql getPostgresStorage(Handle handle) {
+ protected static IStoragePostgreSql getPostgresStorage(Handle handle) {
handle.registerArgumentFactory(new LongCollectionSqlTypeArgumentFactory());
handle.registerArgumentFactory(new PostgresArrayArgumentFactory());
handle.registerArgumentFactory(new RunStateArgumentFactory());
@@ -84,6 +126,7 @@ private static IStoragePostgreSql getPostgresStorage(Handle handle) {
handle.registerArgumentFactory(new BigIntegerArgumentFactory());
handle.registerArgumentFactory(new ScheduleStateArgumentFactory());
handle.registerArgumentFactory(new UuidArgumentFactory());
+ handle.registerArgumentFactory(new InstantArgumentFactory());
return handle.attach(IStoragePostgreSql.class);
}
@@ -148,19 +191,26 @@ public boolean addCluster(Cluster cluster) {
Cluster result = null;
try (Handle h = jdbi.open()) {
String properties = new ObjectMapper().writeValueAsString(cluster.getProperties());
-
- int rowsAdded = getPostgresStorage(h).insertCluster(
- cluster.getName(),
- cluster.getPartitioner().get(),
- cluster.getSeedHosts(),
- properties,
- cluster.getState().name(),
- java.sql.Date.valueOf(cluster.getLastContact()));
-
- if (rowsAdded < 1) {
+ try {
+ int rowsAdded = getPostgresStorage(h).insertCluster(
+ cluster.getName(),
+ cluster.getPartitioner().get(),
+ cluster.getSeedHosts(),
+ properties,
+ cluster.getState().name(),
+ java.sql.Date.valueOf(cluster.getLastContact()));
+
+ if (rowsAdded < 1) {
+ LOG.warn("failed inserting cluster with name: {}", cluster.getName());
+ } else {
+ result = cluster; // no created id, as cluster name used for primary key
+ }
+ } catch (UnableToExecuteStatementException e) {
+ if (JdbiExceptionUtil.isDuplicateKeyError(e)) {
+ LOG.warn("cluster with name: {} already exists; updating instead", cluster.getName());
+ return updateCluster(cluster);
+ }
LOG.warn("failed inserting cluster with name: {}", cluster.getName());
- } else {
- result = cluster; // no created id, as cluster name used for primary key
}
} catch (JsonProcessingException e) {
throw new IllegalStateException(e);
@@ -632,4 +682,394 @@ public boolean deleteEventSubscription(UUID id) {
return getPostgresStorage(h).deleteEventSubscription(UuidUtil.toSequenceId(id)) > 0;
}
}
+
+ @Override
+ public boolean takeLead(UUID leaderId) {
+ return takeLead(leaderId, leaderTimeout);
+ }
+
+ @Override
+ public boolean takeLead(UUID leaderId, int ttl) {
+ Duration leadTimeout = Duration.ofSeconds(ttl);
+ return takeLead(leaderId, leadTimeout);
+ }
+
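+ // Emulates Cassandra's LWT-with-TTL leader election: an INSERT takes the lead, and a
+ // conditional UPDATE steals it once the previous holder's heartbeat has expired.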
+ private boolean takeLead(UUID leaderId, Duration ttl) {
+ if (null != jdbi) {
+ try (Handle h = jdbi.open()) {
+ try {
+ int rowsInserted = getPostgresStorage(h).insertLeaderEntry(
+ leaderId,
+ reaperInstanceId,
+ AppContext.REAPER_INSTANCE_ADDRESS
+ );
+ if (rowsInserted == 1) { // insert should modify exactly 1 row
+ return true;
+ }
+ } catch (UnableToExecuteStatementException e) {
+ if (JdbiExceptionUtil.isDuplicateKeyError(e)) {
+ // if it's a duplicate key error, then try to update it
+ int rowsUpdated = getPostgresStorage(h).updateLeaderEntry(
+ leaderId,
+ reaperInstanceId,
+ AppContext.REAPER_INSTANCE_ADDRESS,
+ getExpirationTime(ttl)
+ );
+ if (rowsUpdated == 1) { // if row updated, took ownership from an expired leader
+ LOG.debug("Took lead from expired entry for segment {}", leaderId);
+ return true;
+ }
+ }
+ return false;
+ }
+ }
+ }
+ LOG.warn("Unknown error occurred while taking lead on segment {}", leaderId);
+ return false;
+ }
+
+ @Override
+ public boolean renewLead(UUID leaderId) {
+ if (null != jdbi) {
+ try (Handle h = jdbi.open()) {
+ int rowsUpdated = getPostgresStorage(h).renewLead(
+ leaderId,
+ reaperInstanceId,
+ AppContext.REAPER_INSTANCE_ADDRESS
+ );
+
+ if (rowsUpdated == 1) {
+ LOG.debug("Renewed lead on segment {}", leaderId);
+ return true;
+ }
+ LOG.error("Failed to renew lead on segment {}", leaderId);
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public boolean renewLead(UUID leaderId, int ttl) {
+ return renewLead(leaderId);
+ }
+
+ @Override
+ public List<UUID> getLeaders() {
+ if (null != jdbi) {
+ try (Handle h = jdbi.open()) {
+ List<Long> leaderSequenceIds = getPostgresStorage(h).getLeaders(getExpirationTime(leaderTimeout));
+ return leaderSequenceIds
+ .stream()
+ .map(UuidUtil::fromSequenceId)
+ .collect(Collectors.toList());
+ }
+ }
+ return new ArrayList<>();
+ }
+
+ @Override
+ public void releaseLead(UUID leaderId) {
+ if (null != jdbi) {
+ try (Handle h = jdbi.open()) {
+ int rowsDeleted = getPostgresStorage(h).releaseLead(
+ leaderId,
+ reaperInstanceId
+ );
+ if (rowsDeleted == 1) {
+ LOG.debug("Released lead on segment {}", leaderId);
+ } else {
+ LOG.error("Could not release lead on segment {}", leaderId);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void saveHeartbeat() {
+ beat();
+ deleteOldReapers();
+ }
+
+ @Override
+ public int countRunningReapers() {
+ if (null != jdbi) {
+ try (Handle h = jdbi.open()) {
+ return getPostgresStorage(h).countRunningReapers(getExpirationTime(reaperTimeout));
+ }
+ }
+ LOG.warn("Failed to get running reaper count from storage");
+ return 1;
+ }
+
+ @Override
+ public void storeNodeMetrics(UUID runId, NodeMetrics nodeMetrics) {
+ if (null != jdbi) {
+ try (Handle h = jdbi.open()) {
+ getPostgresStorage(h).storeNodeMetrics(
+ UuidUtil.toSequenceId(runId),
+ nodeMetrics.getNode(),
+ nodeMetrics.getCluster(),
+ nodeMetrics.getDatacenter(),
+ nodeMetrics.isRequested(),
+ nodeMetrics.getPendingCompactions(),
+ nodeMetrics.hasRepairRunning(),
+ nodeMetrics.getActiveAnticompactions()
+ );
+ }
+ }
+ }
+
+ @Override
+ public Collection<NodeMetrics> getNodeMetrics(UUID runId) {
+ if (null != jdbi) {
+ try (Handle h = jdbi.open()) {
+ Instant expirationTime = getExpirationTime(reaperTimeout);
+ return getPostgresStorage(h).getNodeMetrics(
+ UuidUtil.toSequenceId(runId),
+ expirationTime
+ );
+ }
+ }
+ return new ArrayList<>();
+ }
+
+ @Override
+ public Optional<NodeMetrics> getNodeMetrics(UUID runId, String node) {
+ if (null != jdbi) {
+ try (Handle h = jdbi.open()) {
+ Instant expirationTime = getExpirationTime(reaperTimeout);
+ NodeMetrics nm = getPostgresStorage(h).getNodeMetricsByNode(
+ UuidUtil.toSequenceId(runId),
+ expirationTime,
+ node
+ );
+ if (nm != null) {
+ return Optional.of(nm);
+ }
+ }
+ }
+ return Optional.empty();
+ }
+
+ @Override
+ public void deleteNodeMetrics(UUID runId, String node) {
+ if (null != jdbi) {
+ try (Handle h = jdbi.open()) {
+ getPostgresStorage(h).deleteNodeMetricsByNode(
+ UuidUtil.toSequenceId(runId),
+ node
+ );
+ }
+ }
+ }
+
+ @Override
+ public void purgeNodeMetrics() {
+ if (null != jdbi) {
+ try (Handle h = jdbi.open()) {
+ Instant expirationTime = getExpirationTime(reaperTimeout);
+ getPostgresStorage(h).purgeOldNodeMetrics(expirationTime);
+ }
+ }
+ }
+
+ @Override
+ public Optional<RepairSegment> getNextFreeSegmentForRanges(
+ UUID runId,
+ Optional<RingRange> parallelRange,
+ List<RingRange> ranges) {
+ List<RepairSegment> segments
+ = Lists.newArrayList(getRepairSegmentsForRun(runId));
+ Collections.shuffle(segments);
+
+ for (RepairSegment seg : segments) {
+ if (seg.getState().equals(RepairSegment.State.NOT_STARTED) && withinRange(seg, parallelRange)) {
+ for (RingRange range : ranges) {
+ if (segmentIsWithinRange(seg, range)) {
+ LOG.debug(
+ "Segment [{}, {}] is within range [{}, {}]",
+ seg.getStartToken(),
+ seg.getEndToken(),
+ range.getStart(),
+ range.getEnd());
+ return Optional.of(seg);
+ }
+ }
+ }
+ }
+ return Optional.empty();
+ }
+
+ @Override
+ public void storeMetric(GenericMetric metric) {
+ if (null != jdbi) {
+ try (Handle h = jdbi.open()) {
+ Instant metricTs = Instant.ofEpochMilli(metric.getTs().getMillis());
+ IStoragePostgreSql storage = getPostgresStorage(h);
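+ // upsert the source node's last_updated timestamp: UPDATE first, INSERT if no row matched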
+ int rowsUpdated = storage.updateMetricSourceNodeTimestamp(
+ metric.getClusterName(),
+ metric.getHost(),
+ metricTs
+ );
+ if (rowsUpdated == 0) {
+ try {
+ storage.insertMetricSourceNode(metric.getClusterName(), metric.getHost(), metricTs);
+ } catch (UnableToExecuteStatementException e) {
+ if (!JdbiExceptionUtil.isDuplicateKeyError(e)) {
+ LOG.error("Unable to update GenericMetric source nodes table");
+ throw e;
+ }
+ }
+ }
+
+ try {
+ storage.insertMetricType(
+ metric.getMetricDomain(),
+ metric.getMetricType(),
+ metric.getMetricScope(),
+ metric.getMetricName(),
+ metric.getMetricAttribute()
+ );
+ } catch (UnableToExecuteStatementException e) {
+ if (!JdbiExceptionUtil.isDuplicateKeyError(e)) {
+ LOG.error("Unable to update GenericMetric metric types table");
+ throw e;
+ }
+ }
+ storage.insertMetric(
+ metric.getClusterName(),
+ metric.getHost(),
+ metricTs,
+ metric.getMetricDomain(),
+ metric.getMetricType(),
+ metric.getMetricScope(),
+ metric.getMetricName(),
+ metric.getMetricAttribute(),
+ metric.getValue()
+ );
+ }
+ }
+ }
+
+ @Override
+ public List<GenericMetric> getMetrics(
+ String clusterName,
+ Optional host,
+ String metricDomain,
+ String metricType,
+ long since) {
+ if (null != jdbi) {
+ try (Handle h = jdbi.open()) {
+ if (host.isPresent()) {
+ return new ArrayList<>(
+ getPostgresStorage(h).getMetricsForHost(
+ clusterName,
+ host.get(),
+ metricDomain,
+ metricType,
+ Instant.ofEpochMilli(since)
+ )
+ );
+ } else {
+ return new ArrayList<>(
+ getPostgresStorage(h).getMetricsForCluster(
+ clusterName,
+ metricDomain,
+ metricType,
+ Instant.ofEpochMilli(since)
+ )
+ );
+ }
+ }
+ }
+ return new ArrayList<>();
+ }
+
+ @Override
+ public void purgeMetrics() {
+ if (null != jdbi) {
+ try (Handle h = jdbi.open()) {
+ IStoragePostgreSql storage = getPostgresStorage(h);
+ Instant expirationTime = getExpirationTime(metricsTimeout);
+ storage.purgeOldMetrics(expirationTime);
+ storage.purgeOldSourceNodes(expirationTime);
+ }
+ }
+ }
+
+ @Override
+ public void storeOperations(String clusterName, OpType operationType, String host, String operationsJson) {
+ if (null != jdbi) {
+ try (Handle h = jdbi.open()) {
+ getPostgresStorage(h).insertOperations(clusterName, operationType.getName(), host, operationsJson);
+ }
+ }
+ }
+
+ @Override
+ public String listOperations(String clusterName, OpType operationType, String host) {
+ if (null != jdbi) {
+ try (Handle h = jdbi.open()) {
+ String opString = getPostgresStorage(h).listOperations(clusterName, operationType.getName(), host);
+ return (opString != null) ? opString : "[]";
+ }
+ }
+ LOG.error("Failed retrieving node operations for cluster {}, node {}", clusterName, host);
+ return "[]";
+ }
+
+ @Override
+ public void purgeNodeOperations() {
+ if (null != jdbi) {
+ try (Handle h = jdbi.open()) {
+ Instant expirationTime = getExpirationTime(nodeOperationsTimeout);
+ getPostgresStorage(h).purgeOldNodeOperations(expirationTime);
+ }
+ }
+ }
+
+ private void beat() {
+ if (null != jdbi) {
+ try (Handle h = jdbi.open()) {
+ int rowsUpdated = getPostgresStorage(h).updateHeartbeat(
+ reaperInstanceId,
+ AppContext.REAPER_INSTANCE_ADDRESS
+ );
+ if (rowsUpdated == 0) {
+ LOG.debug("Creating new entry for reaper {}", reaperInstanceId);
+ int rowsInserted = getPostgresStorage(h).insertHeartbeat(
+ reaperInstanceId,
+ AppContext.REAPER_INSTANCE_ADDRESS
+ );
+ if (rowsInserted != 1) {
+ LOG.error("Failed to create entry for reaper {}", reaperInstanceId);
+ }
+ }
+ }
+ }
+ }
+
+ private void deleteOldReapers() {
+ if (null != jdbi) {
+ try (Handle h = jdbi.open()) {
+ getPostgresStorage(h).deleteOldReapers(getExpirationTime(reaperTimeout));
+ }
+ }
+ }
+
+ private static Instant getExpirationTime(Duration timeout) {
+ return Instant.now().minus(timeout);
+ }
+
+ private static boolean segmentIsWithinRange(RepairSegment segment, RingRange range) {
+ return range.encloses(new RingRange(segment.getStartToken(), segment.getEndToken()));
+ }
+
+ private static boolean withinRange(RepairSegment segment, Optional<RingRange> range) {
+ return !range.isPresent() || segmentIsWithinRange(segment, range.get());
+ }
}
diff --git a/src/server/src/main/java/io/cassandrareaper/storage/postgresql/GenericMetricMapper.java b/src/server/src/main/java/io/cassandrareaper/storage/postgresql/GenericMetricMapper.java
new file mode 100644
index 000000000..d9c3808c0
--- /dev/null
+++ b/src/server/src/main/java/io/cassandrareaper/storage/postgresql/GenericMetricMapper.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2015-2017 Spotify AB
+ * Copyright 2016-2018 The Last Pickle Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.cassandrareaper.storage.postgresql;
+
+import io.cassandrareaper.core.GenericMetric;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.StatementContext;
+import org.skife.jdbi.v2.tweak.ResultSetMapper;
+
+public final class GenericMetricMapper implements ResultSetMapper<GenericMetric> {
+
+ @Override
+ public GenericMetric map(int index, ResultSet rs, StatementContext ctx) throws SQLException {
+ return GenericMetric.builder()
+ .withClusterName(rs.getString("cluster"))
+ .withHost(rs.getString("host"))
+ .withMetricDomain(rs.getString("metric_domain"))
+ .withMetricType(rs.getString("metric_type"))
+ .withMetricScope(rs.getString("metric_scope"))
+ .withMetricName(rs.getString("metric_name"))
+ .withMetricAttribute(rs.getString("metric_attribute"))
+ .withValue(rs.getDouble("value"))
+ .withTs(new DateTime(rs.getTimestamp("ts")))
+ .build();
+ }
+}
\ No newline at end of file
diff --git a/src/server/src/main/java/io/cassandrareaper/storage/postgresql/IStoragePostgreSql.java b/src/server/src/main/java/io/cassandrareaper/storage/postgresql/IStoragePostgreSql.java
index f0c256ef9..5782e2a6c 100644
--- a/src/server/src/main/java/io/cassandrareaper/storage/postgresql/IStoragePostgreSql.java
+++ b/src/server/src/main/java/io/cassandrareaper/storage/postgresql/IStoragePostgreSql.java
@@ -19,6 +19,8 @@
import io.cassandrareaper.core.Cluster;
import io.cassandrareaper.core.DiagEventSubscription;
+import io.cassandrareaper.core.GenericMetric;
+import io.cassandrareaper.core.NodeMetrics;
import io.cassandrareaper.core.RepairRun;
import io.cassandrareaper.core.RepairSchedule;
import io.cassandrareaper.core.RepairSegment;
@@ -29,12 +31,14 @@
import io.cassandrareaper.service.RepairParameters;
import java.math.BigInteger;
+import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import java.util.UUID;
import org.skife.jdbi.v2.sqlobject.Bind;
import org.skife.jdbi.v2.sqlobject.BindBean;
@@ -263,6 +267,136 @@ public interface IStoragePostgreSql {
String SQL_DELETE_EVENT_SUBSCRIPTION_BY_ID
= "DELETE FROM diag_event_subscription WHERE id = :id";
+ // leader election
+ //
+ String SQL_INSERT_LEAD = "INSERT INTO leader (leader_id, reaper_instance_id, reaper_instance_host, last_heartbeat)"
+ + " VALUES "
+ + "(:leaderId, :reaperInstanceId, :reaperInstanceHost, now())";
+
+ String SQL_UPDATE_LEAD = "UPDATE leader "
+ + " SET "
+ + "reaper_instance_id = :reaperInstanceId, reaper_instance_host = :reaperInstanceHost, last_heartbeat = now()"
+ + " WHERE "
+ + "leader_id = :leaderId AND last_heartbeat < :expirationTime";
+
+ String SQL_RENEW_LEAD = "UPDATE leader "
+ + " SET "
+ + "reaper_instance_id = :reaperInstanceId, reaper_instance_host = :reaperInstanceHost, last_heartbeat = now()"
+ + " WHERE "
+ + "leader_id = :leaderId AND reaper_instance_id = :reaperInstanceId";
+
+ String SQL_SELECT_ACTIVE_LEADERS = "SELECT leader_id from leader"
+ + " WHERE "
+ + " last_heartbeat >= :expirationTime";
+
+ String SQL_RELEASE_LEAD = "DELETE FROM leader"
+ + " WHERE "
+ + "leader_id = :leaderId AND reaper_instance_id = :reaperInstanceId";
+
+ String SQL_FORCE_RELEASE_LEAD = "DELETE FROM leader WHERE leader_id = :leaderId";
+
+ String SQL_INSERT_HEARTBEAT = "INSERT INTO running_reapers(reaper_instance_id, reaper_instance_host, last_heartbeat)"
+ + " VALUES "
+ + "(:reaperInstanceId, :reaperInstanceHost, now())";
+
+ String SQL_UPDATE_HEARTBEAT = "UPDATE running_reapers"
+ + " SET "
+ + "reaper_instance_id = :reaperInstanceId, reaper_instance_host = :reaperInstanceHost, last_heartbeat = now()"
+ + " WHERE "
+ + "reaper_instance_id = :reaperInstanceId";
+
+ String SQL_DELETE_OLD_REAPERS = "DELETE FROM running_reapers"
+ + " WHERE "
+ + "last_heartbeat < :expirationTime";
+
+ String SQL_COUNT_RUNNING_REAPERS = "SELECT COUNT(*) FROM running_reapers"
+ + " WHERE "
+ + "last_heartbeat >= :expirationTime";
+
+ String SQL_STORE_NODE_METRICS = "INSERT INTO node_metrics_v1 (run_id,ts,node,datacenter,"
+ + "cluster,requested,pending_compactions,has_repair_running,active_anticompactions)"
+ + " VALUES "
+ + "(:runId, now(), :node, :datacenter, :cluster, :requested, :pendingCompactions, :hasRepairRunning, "
+ + ":activeAntiCompactions)";
+
+ String SQL_GET_NODE_METRICS = "SELECT * FROM node_metrics_v1"
+ + " WHERE "
+ + "run_id = :runId AND ts > :expirationTime";
+
+ String SQL_GET_NODE_METRICS_BY_NODE = "SELECT * FROM node_metrics_v1"
+ + " WHERE"
+ + " run_id = :runId AND ts > :expirationTime AND node = :node"
+ + " ORDER BY ts DESC LIMIT 1";
+
+ String SQL_DELETE_NODE_METRICS_BY_NODE = "DELETE FROM node_metrics_v1"
+ + " WHERE "
+ + " run_id = :runId AND node = :node";
+
+ String SQL_PURGE_OLD_NODE_METRICS = "DELETE FROM node_metrics_v1"
+ + " WHERE"
+ + " ts < :expirationTime";
+
+ // sidecar-mode metrics
+ //
+ String SQL_UPDATE_SOURCE_NODE_TIMESTAMP = "UPDATE node_metrics_v2_source_nodes"
+ + " SET last_updated = :timestamp"
+ + " WHERE cluster = :cluster AND host = :host";
+
+ String SQL_ADD_SOURCE_NODE = "INSERT INTO node_metrics_v2_source_nodes (cluster, host, last_updated)"
+ + " VALUES (:cluster, :host, :timestamp)";
+
+ String SQL_ADD_METRIC_TYPE = "INSERT INTO node_metrics_v2_metric_types"
+ + " (metric_domain, metric_type, metric_scope, metric_name, metric_attribute)"
+ + " VALUES"
+ + " (:metricDomain, :metricType, :metricScope, :metricName, :metricAttribute)";
+
+ String SQL_GET_SOURCE_NODE_ID = "SELECT source_node_id FROM node_metrics_v2_source_nodes"
+ + " WHERE cluster = :cluster AND host = :host";
+
+ String SQL_GET_METRIC_TYPE_ID = "SELECT metric_type_id FROM node_metrics_v2_metric_types"
+ + " WHERE"
+ + " metric_domain = :metricDomain AND metric_type = :metricType AND metric_scope = :metricScope"
+ + " AND metric_name = :metricName AND metric_attribute = :metricAttribute";
+
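+ // the nested SELECTs resolve the metric-type and source-node foreign keys within a single INSERT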
+ String SQL_INSERT_METRIC = "INSERT INTO node_metrics_v2 (metric_type_id, source_node_id, ts, value)"
+ + " VALUES ("
+ + " (" + SQL_GET_METRIC_TYPE_ID + "),"
+ + " (" + SQL_GET_SOURCE_NODE_ID + "),"
+ + " :timestamp,"
+ + " :value)";
+
+ String SQL_GET_METRICS_FOR_HOST = "SELECT cluster, host, metric_domain, metric_type, metric_scope, metric_name,"
+ + " metric_attribute, ts, value"
+ + " FROM node_metrics_v2"
+ + " NATURAL JOIN node_metrics_v2_metric_types"
+ + " NATURAL JOIN node_metrics_v2_source_nodes"
+ + " WHERE"
+ + " cluster = :cluster AND host = :host AND metric_domain = :metricDomain AND metric_type = :metricType"
+ + " AND ts >= :since";
+
+ String SQL_GET_METRICS_FOR_CLUSTER = "SELECT cluster, host, metric_domain, metric_type, metric_scope, metric_name,"
+ + " metric_attribute, ts, value"
+ + " FROM node_metrics_v2"
+ + " NATURAL JOIN node_metrics_v2_metric_types"
+ + " NATURAL JOIN node_metrics_v2_source_nodes"
+ + " WHERE"
+ + " cluster = :cluster AND metric_domain = :metricDomain AND metric_type = :metricType"
+ + " AND ts >= :since";
+
+ String SQL_PURGE_OLD_METRICS = "DELETE FROM node_metrics_v2 WHERE ts < :expirationTime";
+
+ String SQL_PURGE_OLD_SOURCE_NODES = "DELETE FROM node_metrics_v2_source_nodes WHERE last_updated < :expirationTime";
+
+ String SQL_INSERT_OPERATIONS = "INSERT INTO node_operations (cluster, type, host, data, ts)"
+ + " VALUES (:cluster, :type, :host, :data, now())";
+
+ String SQL_LIST_OPERATIONS = "SELECT data FROM node_operations"
+ + " WHERE"
+ + " cluster = :cluster AND type = :operationType AND host = :host"
+ + " ORDER BY ts DESC LIMIT 1";
+
+ String SQL_PURGE_OLD_NODE_OPERATIONS = "DELETE from node_operations WHERE ts < :expirationTime";
+
static String[] parseStringArray(Object obj) {
String[] values = null;
if (obj instanceof String[]) {
@@ -515,4 +649,187 @@ long insertDiagEventSubscription(
@SqlUpdate(SQL_DELETE_EVENT_SUBSCRIPTION_BY_ID)
int deleteEventSubscription(
@Bind("id") long subscriptionId);
+
+ @SqlUpdate(SQL_INSERT_LEAD)
+ int insertLeaderEntry(
+ @Bind("leaderId") UUID leaderId,
+ @Bind("reaperInstanceId") UUID reaperInstanceId,
+ @Bind("reaperInstanceHost") String reaperInstanceHost
+ );
+
+ @SqlUpdate(SQL_UPDATE_LEAD)
+ int updateLeaderEntry(
+ @Bind("leaderId") UUID leaderId,
+ @Bind("reaperInstanceId") UUID reaperInstanceId,
+ @Bind("reaperInstanceHost") String reaperInstanceHost,
+ @Bind("expirationTime") Instant expirationTime
+ );
+
+ @SqlUpdate(SQL_RENEW_LEAD)
+ int renewLead(
+ @Bind("leaderId") UUID leaderId,
+ @Bind("reaperInstanceId") UUID reaperInstanceId,
+ @Bind("reaperInstanceHost") String reaperInstanceHost
+ );
+
+ @SqlQuery(SQL_SELECT_ACTIVE_LEADERS)
+ List<Long> getLeaders(
+ @Bind("expirationTime") Instant expirationTime
+ );
+
+ @SqlUpdate(SQL_RELEASE_LEAD)
+ int releaseLead(
+ @Bind("leaderId") UUID leaderId,
+ @Bind("reaperInstanceId") UUID reaperInstanceId
+ );
+
+ @SqlUpdate(SQL_FORCE_RELEASE_LEAD)
+ int forceReleaseLead(
+ @Bind("leaderId") UUID leaderId
+ );
+
+ @SqlUpdate(SQL_INSERT_HEARTBEAT)
+ int insertHeartbeat(
+ @Bind("reaperInstanceId") UUID reaperInstanceId,
+ @Bind("reaperInstanceHost") String reaperInstanceHost
+ );
+
+ @SqlUpdate(SQL_UPDATE_HEARTBEAT)
+ int updateHeartbeat(
+ @Bind("reaperInstanceId") UUID reaperInstanceId,
+ @Bind("reaperInstanceHost") String reaperInstanceHost
+ );
+
+ @SqlUpdate(SQL_DELETE_OLD_REAPERS)
+ int deleteOldReapers(
+ @Bind("expirationTime") Instant expirationTime
+ );
+
+ @SqlQuery(SQL_COUNT_RUNNING_REAPERS)
+ int countRunningReapers(
+ @Bind("expirationTime") Instant expirationTime
+ );
+
+ @SqlUpdate(SQL_STORE_NODE_METRICS)
+ int storeNodeMetrics(
+ @Bind("runId") long runId,
+ @Bind("node") String node,
+ @Bind("cluster") String cluster,
+ @Bind("datacenter") String datacenter,
+ @Bind("requested") Boolean requested,
+ @Bind("pendingCompactions") int pendingCompactions,
+ @Bind("hasRepairRunning") Boolean hasRepairRunning,
+ @Bind("activeAntiCompactions") int activeAntiCompactions
+ );
+
+ @SqlQuery(SQL_GET_NODE_METRICS)
+ @Mapper(NodeMetricsMapper.class)
+ Collection<NodeMetrics> getNodeMetrics(
+ @Bind("runId") long runId,
+ @Bind("expirationTime") Instant expirationTime
+ );
+
+ @SqlQuery(SQL_GET_NODE_METRICS_BY_NODE)
+ @Mapper(NodeMetricsMapper.class)
+ NodeMetrics getNodeMetricsByNode(
+ @Bind("runId") long runId,
+ @Bind("expirationTime") Instant expirationTime,
+ @Bind("node") String node
+ );
+
+ @SqlUpdate(SQL_DELETE_NODE_METRICS_BY_NODE)
+ int deleteNodeMetricsByNode(
+ @Bind("runId") long runId,
+ @Bind("node") String node
+ );
+
+ @SqlUpdate(SQL_PURGE_OLD_NODE_METRICS)
+ int purgeOldNodeMetrics(
+ @Bind("expirationTime") Instant expirationTime
+ );
+
+ @SqlUpdate(SQL_ADD_SOURCE_NODE)
+ int insertMetricSourceNode(
+ @Bind("cluster") String cluster,
+ @Bind("host") String host,
+ @Bind("timestamp") Instant timestamp
+ );
+
+ @SqlUpdate(SQL_UPDATE_SOURCE_NODE_TIMESTAMP)
+ int updateMetricSourceNodeTimestamp(
+ @Bind("cluster") String cluster,
+ @Bind("host") String host,
+ @Bind("timestamp") Instant timestamp
+ );
+
+ @SqlUpdate(SQL_ADD_METRIC_TYPE)
+ int insertMetricType(
+ @Bind("metricDomain") String metricDomain,
+ @Bind("metricType") String metricType,
+ @Bind("metricScope") String metricScope,
+ @Bind("metricName") String metricName,
+ @Bind("metricAttribute") String metricAttribute
+ );
+
+ @SqlUpdate(SQL_INSERT_METRIC)
+ int insertMetric(
+ @Bind("cluster") String cluster,
+ @Bind("host") String host,
+ @Bind("timestamp") Instant timestamp,
+ @Bind("metricDomain") String metricDomain,
+ @Bind("metricType") String metricType,
+ @Bind("metricScope") String metricScope,
+ @Bind("metricName") String metricName,
+ @Bind("metricAttribute") String metricAttribute,
+ @Bind("value") double value
+ );
+
+ @SqlQuery(SQL_GET_METRICS_FOR_HOST)
+ @Mapper(GenericMetricMapper.class)
+ Collection<GenericMetric> getMetricsForHost(
+ @Bind("cluster") String cluster,
+ @Bind("host") String host,
+ @Bind("metricDomain") String metricDomain,
+ @Bind("metricType") String metricType,
+ @Bind("since") Instant since
+ );
+
+ @SqlQuery(SQL_GET_METRICS_FOR_CLUSTER)
+ @Mapper(GenericMetricMapper.class)
+ Collection<GenericMetric> getMetricsForCluster(
+ @Bind("cluster") String cluster,
+ @Bind("metricDomain") String metricDomain,
+ @Bind("metricType") String metricType,
+ @Bind("since") Instant since
+ );
+
+ @SqlUpdate(SQL_PURGE_OLD_METRICS)
+ int purgeOldMetrics(
+ @Bind("expirationTime") Instant expirationTime
+ );
+
+ @SqlUpdate(SQL_PURGE_OLD_SOURCE_NODES)
+ int purgeOldSourceNodes(
+ @Bind("expirationTime") Instant expirationTime
+ );
+
+ @SqlUpdate(SQL_INSERT_OPERATIONS)
+ int insertOperations(
+ @Bind("cluster") String cluster,
+ @Bind("type") String operationType,
+ @Bind("host") String host,
+ @Bind("data") String data
+ );
+
+ @SqlQuery(SQL_LIST_OPERATIONS)
+ String listOperations(
+ @Bind("cluster") String cluster,
+ @Bind("operationType") String operationType,
+ @Bind("host") String host
+ );
+
+ @SqlUpdate(SQL_PURGE_OLD_NODE_OPERATIONS)
+ int purgeOldNodeOperations(
+ @Bind("expirationTime") Instant expirationTime
+ );
}
diff --git a/src/server/src/main/java/io/cassandrareaper/storage/postgresql/InstantArgumentFactory.java b/src/server/src/main/java/io/cassandrareaper/storage/postgresql/InstantArgumentFactory.java
new file mode 100644
index 000000000..ecc4dcc4e
--- /dev/null
+++ b/src/server/src/main/java/io/cassandrareaper/storage/postgresql/InstantArgumentFactory.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Copyright 2019-2019 The Last Pickle Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.cassandrareaper.storage.postgresql;
+
+import java.sql.Timestamp;
+import java.time.Instant;
+
+import org.skife.jdbi.v2.StatementContext;
+import org.skife.jdbi.v2.tweak.Argument;
+import org.skife.jdbi.v2.tweak.ArgumentFactory;
+
+public class InstantArgumentFactory implements ArgumentFactory<Instant> {
+
+ @Override
+ public boolean accepts(Class<?> expectedType, Object value, StatementContext ctx) {
+ return value instanceof Instant;
+ }
+
+ @Override
+ public Argument build(Class<?> expectedType, final Instant value, StatementContext ctx) {
+ return (pos, stmt, sc) -> stmt.setTimestamp(pos, toSqlTimestamp(value));
+ }
+
+ private static Timestamp toSqlTimestamp(Instant instant) {
+ return Timestamp.from(instant);
+ }
+
+}
diff --git a/src/server/src/main/java/io/cassandrareaper/storage/postgresql/JdbiExceptionUtil.java b/src/server/src/main/java/io/cassandrareaper/storage/postgresql/JdbiExceptionUtil.java
new file mode 100644
index 000000000..f9ab2a293
--- /dev/null
+++ b/src/server/src/main/java/io/cassandrareaper/storage/postgresql/JdbiExceptionUtil.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * Copyright 2019-2019 The Last Pickle Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.cassandrareaper.storage.postgresql;
+
+import org.postgresql.util.PSQLException;
+import org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException;
+
+public final class JdbiExceptionUtil {
+
+ private static final String DUPLICATE_KEY_CODE = "23505";
+
+ private JdbiExceptionUtil() {
+ }
+
+ public static boolean isDuplicateKeyError(UnableToExecuteStatementException exc) {
+ if (exc.getCause() instanceof PSQLException) {
+ return ((PSQLException) exc.getCause()).getSQLState().equals(DUPLICATE_KEY_CODE);
+ }
+ return false;
+ }
+}
diff --git a/src/server/src/main/java/io/cassandrareaper/storage/postgresql/NodeMetricsMapper.java b/src/server/src/main/java/io/cassandrareaper/storage/postgresql/NodeMetricsMapper.java
new file mode 100644
index 000000000..21d99983b
--- /dev/null
+++ b/src/server/src/main/java/io/cassandrareaper/storage/postgresql/NodeMetricsMapper.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Copyright 2019-2019 The Last Pickle Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.cassandrareaper.storage.postgresql;
+
+import io.cassandrareaper.core.NodeMetrics;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.skife.jdbi.v2.StatementContext;
+import org.skife.jdbi.v2.tweak.ResultSetMapper;
+
+public final class NodeMetricsMapper implements ResultSetMapper<NodeMetrics> {
+
+ @Override
+ public NodeMetrics map(int index, ResultSet rs, StatementContext ctx) throws SQLException {
+ return NodeMetrics.builder()
+ .withNode(rs.getString("node"))
+ .withCluster(rs.getString("cluster"))
+ .withDatacenter(rs.getString("datacenter"))
+ .withPendingCompactions(rs.getInt("pending_compactions"))
+ .withHasRepairRunning(rs.getBoolean("has_repair_running"))
+ .withActiveAnticompactions(rs.getInt("active_anticompactions"))
+ .build();
+ }
+}
\ No newline at end of file
diff --git a/src/server/src/main/resources/db/h2/V17_0_0__multi_instance.sql b/src/server/src/main/resources/db/h2/V17_0_0__multi_instance.sql
new file mode 100644
index 000000000..a8bd0a139
--- /dev/null
+++ b/src/server/src/main/resources/db/h2/V17_0_0__multi_instance.sql
@@ -0,0 +1,29 @@
+-- H2-compatible version of multi-instance reaper Postgres DDL
+-- CHANGES:
+-- "node" TEXT --> "node" VARCHAR(255) because H2 doesn't support index on TEXT
+
+CREATE TABLE IF NOT EXISTS leader (
+ leader_id BIGINT PRIMARY KEY,
+ reaper_instance_id BIGINT,
+ reaper_instance_host TEXT,
+ last_heartbeat TIMESTAMP WITH TIME ZONE
+);
+
+CREATE TABLE IF NOT EXISTS running_reapers (
+ reaper_instance_id BIGINT PRIMARY KEY,
+ reaper_instance_host TEXT,
+ last_heartbeat TIMESTAMP WITH TIME ZONE
+);
+
+CREATE TABLE IF NOT EXISTS node_metrics_v1 (
+ time_partition BIGINT,
+ run_id BIGINT,
+ node VARCHAR(255),
+ cluster TEXT,
+ datacenter TEXT,
+ requested BOOLEAN,
+ pending_compactions INT,
+ has_repair_running BOOLEAN,
+ active_anticompactions INT,
+ PRIMARY KEY(run_id, time_partition, node)
+);
\ No newline at end of file
diff --git a/src/server/src/main/resources/db/postgres/V17_0_0__multi_instance.sql b/src/server/src/main/resources/db/postgres/V17_0_0__multi_instance.sql
new file mode 100644
index 000000000..fe39e7891
--- /dev/null
+++ b/src/server/src/main/resources/db/postgres/V17_0_0__multi_instance.sql
@@ -0,0 +1,25 @@
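+-- Tables backing multi-instance Reaper on Postgres: leader election, running-reaper
+-- heartbeats, and per-run node metrics.
+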
+CREATE TABLE IF NOT EXISTS "leader" (
+ "leader_id" BIGINT PRIMARY KEY,
+ "reaper_instance_id" BIGINT,
+ "reaper_instance_host" TEXT,
+ "last_heartbeat" TIMESTAMP WITH TIME ZONE
+);
+
+CREATE TABLE IF NOT EXISTS "running_reapers" (
+ "reaper_instance_id" BIGINT PRIMARY KEY,
+ "reaper_instance_host" TEXT,
+ "last_heartbeat" TIMESTAMP WITH TIME ZONE
+);
+
+CREATE TABLE IF NOT EXISTS "node_metrics_v1" (
+ "run_id" BIGINT,
+ "ts" TIMESTAMP WITH TIME ZONE,
+ "node" TEXT,
+ "cluster" TEXT,
+ "datacenter" TEXT,
+ "requested" BOOLEAN,
+ "pending_compactions" INT,
+ "has_repair_running" BOOLEAN,
+ "active_anticompactions" INT,
+ PRIMARY KEY("run_id", "ts", "node")
+);
diff --git a/src/server/src/main/resources/db/postgres/V18_0_0__sidecar_mode.sql b/src/server/src/main/resources/db/postgres/V18_0_0__sidecar_mode.sql
new file mode 100644
index 000000000..761d9e934
--- /dev/null
+++ b/src/server/src/main/resources/db/postgres/V18_0_0__sidecar_mode.sql
@@ -0,0 +1,34 @@
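+-- Sidecar-mode metrics schema: node_metrics_v2 is a narrow fact table referencing
+-- dimension tables for metric types and source nodes, keeping the hot table compact.
+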
+CREATE TABLE IF NOT EXISTS "node_metrics_v2_source_nodes" (
+ "source_node_id" SERIAL UNIQUE,
+ "cluster" TEXT,
+ "host" TEXT,
+ "last_updated" TIMESTAMP WITH TIME ZONE,
+ PRIMARY KEY ("cluster", "host")
+);
+
+CREATE TABLE IF NOT EXISTS "node_metrics_v2_metric_types" (
+ "metric_type_id" SERIAL UNIQUE,
+ "metric_domain" TEXT,
+ "metric_type" TEXT,
+ "metric_scope" TEXT,
+ "metric_name" TEXT,
+ "metric_attribute" TEXT,
+ PRIMARY KEY ("metric_domain", "metric_type", "metric_scope", "metric_name", "metric_attribute")
+);
+
+CREATE TABLE IF NOT EXISTS "node_metrics_v2" (
+ "metric_type_id" INT REFERENCES "node_metrics_v2_metric_types"("metric_type_id"),
+ "source_node_id" INT REFERENCES "node_metrics_v2_source_nodes"("source_node_id"),
+ "ts" TIMESTAMP WITH TIME ZONE,
+ "value" DOUBLE PRECISION,
+ PRIMARY KEY ("metric_type_id", "source_node_id", "ts")
+);
+
+CREATE TABLE IF NOT EXISTS "node_operations" (
+ "cluster" TEXT,
+ "type" TEXT,
+ "host" TEXT,
+ "ts" TIMESTAMP WITH TIME ZONE,
+ "data" TEXT,
+ PRIMARY KEY ("cluster", "type", "host", "ts")
+);
diff --git a/src/server/src/test/java/io/cassandrareaper/acceptance/BasicSteps.java b/src/server/src/test/java/io/cassandrareaper/acceptance/BasicSteps.java
index 55ea00105..2968df3af 100644
--- a/src/server/src/test/java/io/cassandrareaper/acceptance/BasicSteps.java
+++ b/src/server/src/test/java/io/cassandrareaper/acceptance/BasicSteps.java
@@ -30,6 +30,7 @@
import io.cassandrareaper.resources.view.RepairScheduleStatus;
import io.cassandrareaper.service.RepairRunService;
import io.cassandrareaper.storage.CassandraStorage;
+import io.cassandrareaper.storage.PostgresStorage;
import io.cassandrareaper.storage.postgresql.DiagEventSubscriptionMapper;
import java.util.Arrays;
@@ -107,13 +108,13 @@ public final class BasicSteps {
private TestContext testContext;
public static synchronized void addReaperRunner(ReaperTestJettyRunner runner) {
-
- String csCls = CassandraStorage.class.getName();
if (!CLIENTS.isEmpty()) {
- Preconditions.checkState(csCls.equals(runner.runnerInstance.getContextStorageClassname()));
-
+ Preconditions.checkState(isInstanceOfDistributedStorage(runner.runnerInstance.getContextStorageClassname()));
RUNNERS.stream()
- .forEach(r -> Preconditions.checkState(csCls.equals(runner.runnerInstance.getContextStorageClassname())));
+ .forEach(r ->
+ Preconditions.checkState(
+ isInstanceOfDistributedStorage(r.runnerInstance.getContextStorageClassname())
+ ));
}
RUNNERS.add(runner);
CLIENTS.add(runner.getClient());
@@ -1976,4 +1977,10 @@ private List callSubscription(
? ImmutableList.of(SimpleReaperClient.parseEventSubscriptionJSON(responseData))
: SimpleReaperClient.parseEventSubscriptionsListJSON(responseData);
}
+
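+ // multi-instance acceptance tests may back Reaper with either Cassandra or Postgres storage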
+ private static boolean isInstanceOfDistributedStorage(String storageClassname) {
+ String csCls = CassandraStorage.class.getName();
+ String pgCls = PostgresStorage.class.getName();
+ return csCls.equals(storageClassname) || pgCls.equals(storageClassname);
+ }
}
diff --git a/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperPostgresIT.java b/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperPostgresIT.java
index c6dd25000..3fff60e40 100644
--- a/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperPostgresIT.java
+++ b/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperPostgresIT.java
@@ -18,7 +18,10 @@
package io.cassandrareaper.acceptance;
+import java.util.List;
import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.CopyOnWriteArrayList;
import cucumber.api.CucumberOptions;
import cucumber.api.junit.Cucumber;
@@ -39,8 +42,10 @@
public class ReaperPostgresIT {
private static final Logger LOG = LoggerFactory.getLogger(ReaperPostgresIT.class);
- private static ReaperTestJettyRunner runner;
+ private static final List<ReaperTestJettyRunner> RUNNER_INSTANCES = new CopyOnWriteArrayList<>();
private static final String POSTGRES_CONFIG_FILE = "cassandra-reaper-postgres-at.yaml";
+ private static final Random RAND = new Random(System.nanoTime());
+ private static Thread GRIM_REAPER;
protected ReaperPostgresIT() {}
@@ -49,14 +54,52 @@ protected ReaperPostgresIT() {}
public static void setUp() throws Exception {
LOG.info("setting up testing Reaper runner with {} seed hosts defined and Postgres storage",
TestContext.TEST_CLUSTER_SEED_HOSTS.size());
- runner = new ReaperTestJettyRunner(POSTGRES_CONFIG_FILE, Optional.empty());
- BasicSteps.addReaperRunner(runner);
+
+ int minReaperInstances = Integer.getInteger("grim.reaper.min", 1);
+ int maxReaperInstances = Integer.getInteger("grim.reaper.max", minReaperInstances);
+
+ for (int i = 0; i < minReaperInstances; ++i) {
+ createReaperTestJettyRunner(Optional.empty());
+ }
+
+ GRIM_REAPER = new Thread(() -> {
+ Thread.currentThread().setName("GRIM REAPER");
+ // keep adding/removing reaper instances while the test is running
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ if (maxReaperInstances > RUNNER_INSTANCES.size()) {
+ createReaperTestJettyRunner(Optional.empty());
+ } else {
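+ // at the cap: pick a random victim among the instances above the guaranteed minimum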
+ int remove = minReaperInstances + RAND.nextInt(maxReaperInstances - minReaperInstances);
+ removeReaperTestJettyRunner(RUNNER_INSTANCES.get(remove));
+ }
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt(); // restore the flag so the while loop can exit
+ } catch (RuntimeException ex) {
+ LOG.error("failed adding/removing reaper instance", ex);
+ }
+ }
+ });
+ if (minReaperInstances < maxReaperInstances) {
+ GRIM_REAPER.start();
+ }
}
@AfterClass
public static void tearDown() {
LOG.info("Stopping reaper service...");
- runner.runnerInstance.after();
+ GRIM_REAPER.interrupt();
+ RUNNER_INSTANCES.forEach(r -> r.runnerInstance.after());
}
+ private static void createReaperTestJettyRunner(Optional<String> version) throws InterruptedException {
+ ReaperTestJettyRunner runner = new ReaperTestJettyRunner(POSTGRES_CONFIG_FILE, version);
+ RUNNER_INSTANCES.add(runner);
+ Thread.sleep(100);
+ BasicSteps.addReaperRunner(runner);
+ }
+
+ private static void removeReaperTestJettyRunner(ReaperTestJettyRunner runner) throws InterruptedException {
+ BasicSteps.removeReaperRunner(runner);
+ Thread.sleep(200);
+ runner.runnerInstance.after();
+ RUNNER_INSTANCES.remove(runner);
+ }
}
diff --git a/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperPostgresSidecarIT.java b/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperPostgresSidecarIT.java
new file mode 100644
index 000000000..8f5a04a7c
--- /dev/null
+++ b/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperPostgresSidecarIT.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2014-2017 Spotify AB
+ * Copyright 2016-2019 The Last Pickle Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.cassandrareaper.acceptance;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import cucumber.api.CucumberOptions;
+import cucumber.api.junit.Cucumber;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(Cucumber.class)
+@CucumberOptions(
+ features = {
+ "classpath:io.cassandrareaper.acceptance/integration_reaper_functionality.feature",
+ "classpath:io.cassandrareaper.acceptance/event_subscriptions.feature"
+ },
+ plugin = {"pretty"}
+ )
+public class ReaperPostgresSidecarIT {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ReaperPostgresSidecarIT.class);
+ private static final List<ReaperTestJettyRunner> RUNNER_INSTANCES = new CopyOnWriteArrayList<>();
+ private static final String[] POSTGRES_CONFIG_FILE = {
+ "reaper-postgres-sidecar1-at.yaml",
+ "reaper-postgres-sidecar2-at.yaml",
+ };
+
+ protected ReaperPostgresSidecarIT() {}
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ LOG.info("setting up testing Reaper runner with {} seed hosts defined and Postgres storage",
+ TestContext.TEST_CLUSTER_SEED_HOSTS.size());
+
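+ // one runner per entry in POSTGRES_CONFIG_FILE, so at most two sidecar instances can start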
+ int reaperInstances = Integer.getInteger("grim.reaper.min", 2);
+ for (int i = 0; i < reaperInstances; i++) {
+ createReaperTestJettyRunner(Optional.empty());
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ LOG.info("Stopping reaper service...");
+ RUNNER_INSTANCES.forEach(r -> r.runnerInstance.after());
+ }
+
+ private static void createReaperTestJettyRunner(Optional<String> version) throws InterruptedException {
+ ReaperTestJettyRunner runner = new ReaperTestJettyRunner(POSTGRES_CONFIG_FILE[RUNNER_INSTANCES.size()], version);
+ RUNNER_INSTANCES.add(runner);
+ Thread.sleep(100);
+ BasicSteps.addReaperRunner(runner);
+ }
+}
diff --git a/src/server/src/test/java/io/cassandrareaper/storage/PostgresStorageTest.java b/src/server/src/test/java/io/cassandrareaper/storage/PostgresStorageTest.java
new file mode 100644
index 000000000..dea08e47b
--- /dev/null
+++ b/src/server/src/test/java/io/cassandrareaper/storage/PostgresStorageTest.java
@@ -0,0 +1,565 @@
+/*
+ *
+ * Copyright 2019-2019 The Last Pickle Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.cassandrareaper.storage;
+
+import io.cassandrareaper.AppContext;
+import io.cassandrareaper.core.GenericMetric;
+import io.cassandrareaper.core.NodeMetrics;
+import io.cassandrareaper.storage.postgresql.IStoragePostgreSql;
+import io.cassandrareaper.storage.postgresql.UuidUtil;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.io.Reader;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.ibatis.common.jdbc.ScriptRunner;
+import org.fest.assertions.api.Assertions;
+import org.h2.tools.Server;
+import org.joda.time.DateTime;
+import org.junit.Before;
+import org.junit.Test;
+import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.Handle;
+
+public class PostgresStorageTest {
+
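+ // in-memory H2 running in PostgreSQL compatibility mode stands in for a real Postgres server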
+ private static final String DB_URL = "jdbc:h2:mem:test_mem;MODE=PostgreSQL;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false";
+
+ @Before
+ public void setUp() throws SQLException, IOException {
+ Server.createTcpServer().start();
+
+ DBI dbi = new DBI(DB_URL);
+ Handle handle = dbi.open();
+ Connection conn = handle.getConnection();
+
+ // to suppress output of ScriptRunner
+ PrintStream tmp = new PrintStream(new OutputStream() {
+ @Override
+ public void write(int buff) throws IOException {
+ // do nothing
+ }
+ });
+ PrintStream console = System.out;
+ System.setOut(tmp);
+
+ String cwd = Paths.get("").toAbsolutePath().toString();
+ String path = cwd + "/../src/test/resources/db/postgres/V17_0_0__multi_instance.sql";
+ ScriptRunner scriptExecutor = new ScriptRunner(conn, false, true);
+ Reader reader = new BufferedReader(new FileReader(path));
+ scriptExecutor.runScript(reader);
+
+ System.setOut(console);
+ }
+
+ @Test
+ public void testTakeLead() {
+ DBI dbi = new DBI(DB_URL);
+ UUID reaperInstanceId = UUID.randomUUID();
+ PostgresStorage storage = new PostgresStorage(reaperInstanceId, dbi);
+ Assertions.assertThat(storage.isStorageConnected()).isTrue();
+
+ Handle handle = dbi.open();
+ handle.execute("DELETE from leader");
+
+ int numEntries = 5;
+ Set<UUID> leaderIds = new HashSet<>();
+ for (int i = 0; i < numEntries; i++) {
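+ // leader_id is persisted as a BIGINT sequence id, so roundtrip the UUID through that form first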
+ UUID msbLeaderId = UuidUtil.fromSequenceId(UuidUtil.toSequenceId(UUID.randomUUID()));
+ leaderIds.add(msbLeaderId);
+ }
+
+ // insert all five leader entries
+ for (UUID leaderId : leaderIds) {
+ boolean result = storage.takeLead(leaderId);
+ Assertions.assertThat(result).isTrue();
+ }
+
+ // make sure fetched leaders has all the inserted leaders
+ List<UUID> fetchedLeaderIds = storage.getLeaders();
+ for (UUID fetchedLeaderId : fetchedLeaderIds) {
+ Assertions.assertThat(leaderIds.contains(fetchedLeaderId)).isTrue();
+ }
+ }
+
+ @Test
+ public void testNoLeaders() {
+ DBI dbi = new DBI(DB_URL);
+ UUID reaperInstanceId = UUID.randomUUID();
+ PostgresStorage storage = new PostgresStorage(reaperInstanceId, dbi);
+ Assertions.assertThat(storage.isStorageConnected()).isTrue();
+
+ Handle handle = dbi.open();
+ handle.execute("DELETE from leader");
+
+ List<UUID> fetchedLeaderIds = storage.getLeaders();
+ Assertions.assertThat(fetchedLeaderIds.size()).isEqualTo(0);
+ }
+
+ @Test
+ public void testRenewLead() throws InterruptedException {
+ DBI dbi = new DBI(DB_URL);
+ UUID reaperInstanceId = UUID.randomUUID();
+ PostgresStorage storage = new PostgresStorage(reaperInstanceId, dbi);
+ Assertions.assertThat(storage.isStorageConnected()).isTrue();
+
+ Handle handle = dbi.open();
+ handle.execute("DELETE from leader");
+
+ UUID leaderId = UUID.randomUUID();
+ int sleepTime = 3;
+
+ final Instant initialTime = Instant.now();
+ storage.takeLead(leaderId);
+
+ // sleep 3 seconds, then renew lead
+ TimeUnit.SECONDS.sleep(sleepTime);
+ Assertions.assertThat(storage.renewLead(leaderId)).isTrue();
+
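+ // renewLead must have bumped last_heartbeat to at least sleepTime after the lead was taken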
+ Instant hbTime = handle.createQuery("SELECT last_heartbeat FROM leader")
+ .mapTo(Timestamp.class)
+ .first()
+ .toInstant();
+
+ Duration between = Duration.between(initialTime, hbTime);
+ Assertions.assertThat(between.getSeconds()).isGreaterThanOrEqualTo(sleepTime);
+ }
+
+ @Test
+ public void testReleaseLead() {
+ DBI dbi = new DBI(DB_URL);
+ UUID reaperInstanceId = UUID.randomUUID();
+ PostgresStorage storage = new PostgresStorage(reaperInstanceId, dbi);
+ Assertions.assertThat(storage.isStorageConnected()).isTrue();
+
+ Handle handle = dbi.open();
+ handle.execute("DELETE from leader");
+
+ UUID leaderIdForSelf = UUID.randomUUID();
+ UUID leaderIdForOther = UUID.randomUUID();
+
+ storage.takeLead(leaderIdForSelf);
+ storage.takeLead(leaderIdForOther);
+
+ List<UUID> fetchedLeaderIds = storage.getLeaders();
+ Assertions.assertThat(fetchedLeaderIds.size()).isEqualTo(2);
+
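+ // hand the second lead entry to a different (fake) reaper_instance_id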
+ handle.createStatement("UPDATE leader SET reaper_instance_id = 0 WHERE leader_id = :id")
+ .bind("id", UuidUtil.toSequenceId(leaderIdForOther))
+ .execute();
+
+ // test that releaseLead succeeds for entry where instance_id = self
+ storage.releaseLead(leaderIdForSelf);
+ fetchedLeaderIds = storage.getLeaders();
+ Assertions.assertThat(fetchedLeaderIds.size()).isEqualTo(1);
+
+ // test that releaseLead fails for entry where instance_id != self
+ storage.releaseLead(leaderIdForOther);
+ fetchedLeaderIds = storage.getLeaders();
+ Assertions.assertThat(fetchedLeaderIds.size()).isEqualTo(1);
+ }
+
+ @Test
+ public void testSaveHeartbeat() {
+ DBI dbi = new DBI(DB_URL);
+ UUID reaperInstanceId = UUID.randomUUID();
+ PostgresStorage storage = new PostgresStorage(reaperInstanceId, dbi);
+ Assertions.assertThat(storage.isStorageConnected()).isTrue();
+
+ Handle handle = dbi.open();
+ handle.execute("DELETE from running_reapers");
+
+ storage.saveHeartbeat();
+ int numReapers = storage.countRunningReapers();
+ Assertions.assertThat(numReapers).isEqualTo(1);
+ }
+
+ @Test
+ public void testNodeMetrics() {
+ DBI dbi = new DBI(DB_URL);
+ UUID reaperInstanceId = UUID.randomUUID();
+ PostgresStorage storage = new PostgresStorage(reaperInstanceId, dbi);
+ Assertions.assertThat(storage.isStorageConnected()).isTrue();
+
+ Handle handle = dbi.open();
+ handle.execute("DELETE from node_metrics_v1");
+
+ UUID runId = UUID.randomUUID();
+
+ // test empty result set
+ ArrayList<NodeMetrics> emptyNmList = (ArrayList<NodeMetrics>) storage.getNodeMetrics(runId);
+ Assertions.assertThat(emptyNmList.size()).isEqualTo(0);
+
+ NodeMetrics originalNm = NodeMetrics.builder()
+ .withNode("fake_node")
+ .withCluster("fake_cluster")
+ .withDatacenter("NYDC")
+ .withHasRepairRunning(true)
+ .withPendingCompactions(4)
+ .withActiveAnticompactions(1)
+ .build();
+
+ storage.storeNodeMetrics(runId, originalNm);
+ ArrayList<NodeMetrics> nodeMetricsList = (ArrayList<NodeMetrics>) storage.getNodeMetrics(runId);
+ Assertions.assertThat(nodeMetricsList.size()).isEqualTo(1);
+
+ NodeMetrics fetchedNm = nodeMetricsList.get(0);
+ Assertions.assertThat(fetchedNm.getNode()).isEqualTo(originalNm.getNode());
+ Assertions.assertThat(fetchedNm.getCluster()).isEqualTo(originalNm.getCluster());
+ Assertions.assertThat(fetchedNm.getDatacenter()).isEqualTo(originalNm.getDatacenter());
+ Assertions.assertThat(fetchedNm.hasRepairRunning()).isEqualTo(originalNm.hasRepairRunning());
+ Assertions.assertThat(fetchedNm.getPendingCompactions()).isEqualTo(originalNm.getPendingCompactions());
+ Assertions.assertThat(fetchedNm.getActiveAnticompactions()).isEqualTo(originalNm.getActiveAnticompactions());
+ }
+
+ @Test
+ public void testNodeMetricsByNode() throws InterruptedException {
+ DBI dbi = new DBI(DB_URL);
+ UUID reaperInstanceId = UUID.randomUUID();
+ PostgresStorage storage = new PostgresStorage(reaperInstanceId, dbi);
+ Assertions.assertThat(storage.isStorageConnected()).isTrue();
+
+ Handle handle = dbi.open();
+ handle.execute("DELETE from node_metrics_v1");
+
+ UUID runId = UUID.randomUUID();
+
+ NodeMetrics nmRequest = NodeMetrics.builder()
+ .withNode("fake_node1")
+ .withCluster("fake_cluster")
+ .withDatacenter("NYDC")
+ .withRequested(true)
+ .build();
+
+ NodeMetrics nm1 = NodeMetrics.builder()
+ .withNode("fake_node1")
+ .withCluster("fake_cluster")
+ .withDatacenter("NYDC")
+ .withHasRepairRunning(true)
+ .withPendingCompactions(4)
+ .withActiveAnticompactions(1)
+ .build();
+
+ // store a metric request and a metric response
+ storage.storeNodeMetrics(runId, nmRequest);
+ TimeUnit.MILLISECONDS.sleep(100);
+ storage.storeNodeMetrics(runId, nm1);
+
+ Optional<NodeMetrics> fetchedNm1Opt = storage.getNodeMetrics(runId, "fake_node1");
+ Assertions.assertThat(fetchedNm1Opt.isPresent()).isTrue();
+ NodeMetrics fetchedNm1 = fetchedNm1Opt.get();
+ Assertions.assertThat(fetchedNm1.getNode()).isEqualTo(nm1.getNode());
+ Assertions.assertThat(fetchedNm1.getCluster()).isEqualTo(nm1.getCluster());
+ Assertions.assertThat(fetchedNm1.getDatacenter()).isEqualTo(nm1.getDatacenter());
+ Assertions.assertThat(fetchedNm1.hasRepairRunning()).isEqualTo(nm1.hasRepairRunning());
+ Assertions.assertThat(fetchedNm1.getPendingCompactions()).isEqualTo(nm1.getPendingCompactions());
+ Assertions.assertThat(fetchedNm1.getActiveAnticompactions()).isEqualTo(nm1.getActiveAnticompactions());
+
+ // test that fetching a non-existent metric returns Optional.empty()
+ Optional<NodeMetrics> fetchedNm2Opt = storage.getNodeMetrics(runId, "fake_node2");
+ Assertions.assertThat(fetchedNm2Opt.isPresent()).isFalse();
+ }
+
+ @Test
+ public void testNodeOperations() {
+ DBI dbi = new DBI(DB_URL);
+ UUID reaperInstanceId = UUID.randomUUID();
+ PostgresStorage storage = new PostgresStorage(reaperInstanceId, dbi);
+ Assertions.assertThat(storage.isStorageConnected()).isTrue();
+
+ Handle handle = dbi.open();
+ handle.execute("DELETE from node_operations");
+
+ storage.storeOperations("fake_cluster", OpType.OP_STREAMING, "fake_host", "data1");
+ String data = storage.listOperations("fake_cluster", OpType.OP_STREAMING, "fake_host");
+ Assertions.assertThat(data.equals("data1"));
+ storage.storeOperations("fake_cluster", OpType.OP_STREAMING, "fake_host", "data2");
+
+ data = storage.listOperations("fake_cluster", OpType.OP_STREAMING, "fake_host");
+ Assertions.assertThat(data.equals("data2"));
+ }
+
+ @Test
+ public void testGenericMetricsByHostandCluster() {
+ DBI dbi = new DBI(DB_URL);
+ UUID reaperInstanceId = UUID.randomUUID();
+ PostgresStorage storage = new PostgresStorage(reaperInstanceId, dbi);
+ Assertions.assertThat(storage.isStorageConnected()).isTrue();
+
+ Handle handle = dbi.open();
+ handle.execute("DELETE from node_metrics_v2");
+ handle.execute("DELETE from node_metrics_v2_source_nodes");
+ handle.execute("DELETE from node_metrics_v2_metric_types");
+
+ DateTime now = DateTime.now();
+ GenericMetric metric1 = GenericMetric.builder()
+ .withClusterName("fake_cluster")
+ .withHost("fake_host1")
+ .withTs(now)
+ .withMetricDomain("org.apache.cassandra.metrics")
+ .withMetricType("ThreadPool")
+ .withMetricName("PendingTasks")
+ .withMetricScope("MutationStage")
+ .withMetricAttribute("fake_attribute")
+ .withValue(12)
+ .build();
+ GenericMetric metric2 = GenericMetric.builder() // different metric, different host
+ .withClusterName("fake_cluster")
+ .withHost("fake_host2")
+ .withTs(now)
+ .withMetricDomain("org.apache.cassandra.metrics")
+ .withMetricType("ThreadPool")
+ .withMetricName("ActiveTasks")
+ .withMetricScope("MutationStage")
+ .withMetricAttribute("fake_attribute")
+ .withValue(14)
+ .build();
+
+ storage.storeMetric(metric1);
+ storage.storeMetric(metric2);
+
+ // verify that the two metrics above can be queried by cluster name
+ Set<String> expectedMetrics = new HashSet<>();
+ expectedMetrics.add("PendingTasks");
+ expectedMetrics.add("ActiveTasks");
+ List<GenericMetric> retrievedMetrics = storage.getMetrics(
+ "fake_cluster",
+ Optional.empty(),
+ "org.apache.cassandra.metrics",
+ "ThreadPool",
+ now.getMillis()
+ );
+ for (GenericMetric retrievedMetric : retrievedMetrics) {
+ Assertions.assertThat(expectedMetrics.contains(retrievedMetric.getMetricName())).isTrue();
+ expectedMetrics.remove(retrievedMetric.getMetricName());
+ }
+
+ // verify that metrics can be queried by host
+ retrievedMetrics = storage.getMetrics(
+ "fake_cluster",
+ Optional.of("fake_host2"),
+ "org.apache.cassandra.metrics",
+ "ThreadPool",
+ now.getMillis()
+ );
+ Assertions.assertThat(retrievedMetrics.size()).isEqualTo(1);
+ GenericMetric retrievedMetric = retrievedMetrics.get(0);
+ Assertions.assertThat(retrievedMetric.getMetricName()).isEqualTo("ActiveTasks");
+ }
+
+ @Test
+ public void testGenericMetricExpiration() {
+ DBI dbi = new DBI(DB_URL);
+ UUID reaperInstanceId = UUID.randomUUID();
+ PostgresStorage storage = new PostgresStorage(reaperInstanceId, dbi);
+ Assertions.assertThat(storage.isStorageConnected()).isTrue();
+
+ Handle handle = dbi.open();
+ handle.execute("DELETE from node_metrics_v2");
+ handle.execute("DELETE from node_metrics_v2_source_nodes");
+ handle.execute("DELETE from node_metrics_v2_metric_types");
+
+
+ DateTime expirationTime = DateTime.now().minusMinutes(3);
+ GenericMetric expiredMetric = GenericMetric.builder()
+ .withClusterName("fake_cluster")
+ .withHost("fake_host1")
+ .withTs(expirationTime)
+ .withMetricDomain("org.apache.cassandra.metrics")
+ .withMetricType("ThreadPool")
+ .withMetricName("PendingTasks")
+ .withMetricScope("MutationStage")
+ .withMetricAttribute("fake_attribute")
+ .withValue(12)
+ .build();
+ storage.storeMetric(expiredMetric);
+
+ // verify that the metric was stored in the DB
+ List<GenericMetric> retrievedMetrics = storage.getMetrics(
+ "fake_cluster",
+ Optional.empty(),
+ "org.apache.cassandra.metrics",
+ "ThreadPool",
+ expirationTime.getMillis()
+ );
+ Assertions.assertThat(retrievedMetrics.size()).isEqualTo(1);
+ List