From aa833d8cd69e27655155e40bfe971e13a3cc4e47 Mon Sep 17 00:00:00 2001 From: Saleil Bhat Date: Wed, 6 Feb 2019 12:58:11 -0500 Subject: [PATCH] support use of Postgresql as storage backend for multiple reapers --- .travis.yml | 94 +-- src/ci/before_script.sh | 6 +- src/ci/script.sh | 54 +- src/server/pom.xml | 5 + .../io/cassandrareaper/ReaperApplication.java | 16 +- .../cassandrareaper/service/PurgeService.java | 17 + .../storage/CassandraStorage.java | 9 + .../storage/IDistributedStorage.java | 15 + .../storage/PostgresStorage.java | 474 ++++++++++++++- .../postgresql/GenericMetricMapper.java | 45 ++ .../postgresql/IStoragePostgreSql.java | 317 ++++++++++ .../postgresql/InstantArgumentFactory.java | 43 ++ .../storage/postgresql/JdbiExceptionUtil.java | 36 ++ .../storage/postgresql/NodeMetricsMapper.java | 41 ++ .../db/h2/V17_0_0__multi_instance.sql | 29 + .../db/postgres/V17_0_0__multi_instance.sql | 25 + .../db/postgres/V18_0_0__sidecar_mode.sql | 34 ++ .../acceptance/BasicSteps.java | 17 +- .../acceptance/ReaperPostgresIT.java | 51 +- .../acceptance/ReaperPostgresSidecarIT.java | 74 +++ .../storage/PostgresStorageTest.java | 565 ++++++++++++++++++ .../db/postgres/V17_0_0__multi_instance.sql | 66 ++ .../reaper-postgres-sidecar1-at.yaml | 70 +++ .../reaper-postgres-sidecar2-at.yaml | 70 +++ 24 files changed, 2088 insertions(+), 85 deletions(-) create mode 100644 src/server/src/main/java/io/cassandrareaper/storage/postgresql/GenericMetricMapper.java create mode 100644 src/server/src/main/java/io/cassandrareaper/storage/postgresql/InstantArgumentFactory.java create mode 100644 src/server/src/main/java/io/cassandrareaper/storage/postgresql/JdbiExceptionUtil.java create mode 100644 src/server/src/main/java/io/cassandrareaper/storage/postgresql/NodeMetricsMapper.java create mode 100644 src/server/src/main/resources/db/h2/V17_0_0__multi_instance.sql create mode 100644 src/server/src/main/resources/db/postgres/V17_0_0__multi_instance.sql create mode 100644 src/server/src/main/resources/db/postgres/V18_0_0__sidecar_mode.sql create mode 100644 src/server/src/test/java/io/cassandrareaper/acceptance/ReaperPostgresSidecarIT.java create mode 100644 src/server/src/test/java/io/cassandrareaper/storage/PostgresStorageTest.java create mode 100644 src/server/src/test/resources/db/postgres/V17_0_0__multi_instance.sql create mode 100644 src/server/src/test/resources/reaper-postgres-sidecar1-at.yaml create mode 100644 src/server/src/test/resources/reaper-postgres-sidecar2-at.yaml diff --git a/.travis.yml b/.travis.yml index 055e62e1e..f8dfa42d1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -46,88 +46,104 @@ jobs: fast_finish: true allow_failures: - env: TEST_TYPE=docker + - env: TEST_TYPE=ccm CASSANDRA_VERSION=github:apache/trunk + - env: TEST_TYPE=ccm CASSANDRA_VERSION=github:apache/trunk GRIM_MIN=1 GRIM_MAX=1 + - env: TEST_TYPE=ccm CASSANDRA_VERSION=github:apache/trunk GRIM_MIN=2 GRIM_MAX=2 + - env: TEST_TYPE=ccm CASSANDRA_VERSION=github:apache/trunk GRIM_MIN=2 GRIM_MAX=4 include: - stage: Integration Tests - name: Memory, H2 and Postgresql reaping Cassandra 1.2.19 - env: TEST_TYPE=ccm CASSANDRA_VERSION=1.2.19 CUCUMBER_OPTIONS="--tags ~@cassandra_2_1_onwards --tags ~@cassandra_4_0_onwards" + name: Memory and H2 reaping Cassandra 1.2.19 + env: TEST_TYPE=ccm STORAGE_TYPE=local CASSANDRA_VERSION=1.2.19 CUCUMBER_OPTIONS="--tags ~@cassandra_2_1_onwards --tags ~@cassandra_4_0_onwards" - stage: Integration Tests - name: Memory, H2 and Postgresql reaping Cassandra 2.0.17 - env: TEST_TYPE=ccm 
CASSANDRA_VERSION=2.0.17 CUCUMBER_OPTIONS="--tags ~@cassandra_2_1_onwards --tags ~@cassandra_4_0_onwards" + name: Memory and H2 reaping Cassandra 2.0.17 + env: TEST_TYPE=ccm STORAGE_TYPE=local CASSANDRA_VERSION=2.0.17 CUCUMBER_OPTIONS="--tags ~@cassandra_2_1_onwards --tags ~@cassandra_4_0_onwards" - stage: Integration Tests - name: Memory, H2 and Postgresql reaping Cassandra 2.1.20 - env: TEST_TYPE=ccm CASSANDRA_VERSION=2.1.20 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards" + name: Memory and H2 reaping Cassandra 2.1.20 + env: TEST_TYPE=ccm STORAGE_TYPE=local CASSANDRA_VERSION=2.1.20 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards" - stage: Integration Tests - name: Memory, H2 and Postgresql reaping Cassandra 2.2.13 - env: TEST_TYPE=ccm CASSANDRA_VERSION=2.2.13 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards" + name: Memory and H2 reaping Cassandra 2.2.13 + env: TEST_TYPE=ccm STORAGE_TYPE=local CASSANDRA_VERSION=2.2.13 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards" - stage: Integration Tests - name: Memory, H2 and Postgresql reaping Cassandra 3.0.17 - env: TEST_TYPE=ccm CASSANDRA_VERSION=3.0.17 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards" + name: Memory and H2 reaping Cassandra 3.0.17 + env: TEST_TYPE=ccm STORAGE_TYPE=local CASSANDRA_VERSION=3.0.17 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards" - stage: Integration Tests - name: Memory, H2 and Postgresql reaping Cassandra 3.11.4 - env: TEST_TYPE=ccm CASSANDRA_VERSION=3.11.4 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards" + name: Memory and H2 reaping Cassandra 3.11.4 + env: TEST_TYPE=ccm STORAGE_TYPE=local CASSANDRA_VERSION=3.11.4 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards" - stage: Integration Tests - name: Memory, H2 and Postgresql reaping Cassandra 4.0 - env: TEST_TYPE=ccm CASSANDRA_VERSION=github:apache/trunk + name: Memory and H2 reaping Cassandra 4.0 + env: TEST_TYPE=ccm STORAGE_TYPE=local CASSANDRA_VERSION=github:apache/trunk - stage: Integration Tests - name: Cassandra 2.1.20 - env: TEST_TYPE=ccm CASSANDRA_VERSION=2.1.20 GRIM_MIN=1 GRIM_MAX=1 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards" + name: Cassandra reaping Cassandra 2.1.20 + env: TEST_TYPE=ccm STORAGE_TYPE=cassandra CASSANDRA_VERSION=2.1.20 GRIM_MIN=1 GRIM_MAX=1 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards" - stage: Integration Tests - name: Cassandra 2.2.13 - env: TEST_TYPE=ccm CASSANDRA_VERSION=2.2.13 GRIM_MIN=1 GRIM_MAX=1 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards" + name: Cassandra reaping Cassandra 2.2.13 + env: TEST_TYPE=ccm STORAGE_TYPE=cassandra CASSANDRA_VERSION=2.2.13 GRIM_MIN=1 GRIM_MAX=1 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards" - stage: Integration Tests - name: Cassandra 3.0.17 - env: TEST_TYPE=ccm CASSANDRA_VERSION=3.0.17 GRIM_MIN=1 GRIM_MAX=1 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards" + name: Cassandra reaping Cassandra 3.0.17 + env: TEST_TYPE=ccm STORAGE_TYPE=cassandra CASSANDRA_VERSION=3.0.17 GRIM_MIN=1 GRIM_MAX=1 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards" - stage: Integration Tests - name: Cassandra 3.11.3 - env: TEST_TYPE=ccm CASSANDRA_VERSION=3.11.3 GRIM_MIN=1 GRIM_MAX=1 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards" + name: Cassandra reaping Cassandra 3.11.3 + env: TEST_TYPE=ccm STORAGE_TYPE=cassandra CASSANDRA_VERSION=3.11.3 GRIM_MIN=1 GRIM_MAX=1 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards" - stage: Integration Tests - name: Cassandra 4.0 - env: TEST_TYPE=ccm CASSANDRA_VERSION=github:apache/trunk GRIM_MIN=1 GRIM_MAX=1 + name: Cassandra reaping Cassandra 4.0 + env: TEST_TYPE=ccm 
STORAGE_TYPE=cassandra CASSANDRA_VERSION=github:apache/trunk GRIM_MIN=1 GRIM_MAX=1 + - stage: Integration Tests + name: Postgresql reaping Cassandra 3.11.3 + env: TEST_TYPE=ccm STORAGE_TYPE=postgresql CASSANDRA_VERSION=3.11.3 GRIM_MIN=1 GRIM_MAX=1 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards" - stage: Distributed Reaper Integration Tests name: Two Reapers on Cassandra 2.1.20 - env: TEST_TYPE=ccm CASSANDRA_VERSION=2.1.20 GRIM_MIN=2 GRIM_MAX=2 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards" + env: TEST_TYPE=ccm STORAGE_TYPE=cassandra CASSANDRA_VERSION=2.1.20 GRIM_MIN=2 GRIM_MAX=2 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards" - stage: Distributed Reaper Integration Tests name: Two Reapers on Cassandra 2.2.13 - env: TEST_TYPE=ccm CASSANDRA_VERSION=2.2.13 GRIM_MIN=2 GRIM_MAX=2 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards" + env: TEST_TYPE=ccm STORAGE_TYPE=cassandra CASSANDRA_VERSION=2.2.13 GRIM_MIN=2 GRIM_MAX=2 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards" - stage: Distributed Reaper Integration Tests name: Two Reapers on Cassandra 3.0.17 - env: TEST_TYPE=ccm CASSANDRA_VERSION=3.0.17 GRIM_MIN=2 GRIM_MAX=2 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards" + env: TEST_TYPE=ccm STORAGE_TYPE=cassandra CASSANDRA_VERSION=3.0.17 GRIM_MIN=2 GRIM_MAX=2 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards" - stage: Distributed Reaper Integration Tests name: Two Reapers on Cassandra 3.11.3 - env: TEST_TYPE=ccm CASSANDRA_VERSION=3.11.3 GRIM_MIN=2 GRIM_MAX=2 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards" + env: TEST_TYPE=ccm STORAGE_TYPE=cassandra CASSANDRA_VERSION=3.11.3 GRIM_MIN=2 GRIM_MAX=2 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards" - stage: Distributed Reaper Integration Tests name: Two Reapers on Cassandra 4.0 - env: TEST_TYPE=ccm CASSANDRA_VERSION=github:apache/trunk GRIM_MIN=2 GRIM_MAX=2 + env: TEST_TYPE=ccm STORAGE_TYPE=cassandra CASSANDRA_VERSION=github:apache/trunk GRIM_MIN=2 GRIM_MAX=2 + - stage: Distributed Reaper Integration Tests + name: Two Reapers on Cassandra 3.11.3 with Postgresql backend + env: TEST_TYPE=ccm STORAGE_TYPE=postgresql CASSANDRA_VERSION=3.11.3 GRIM_MIN=2 GRIM_MAX=2 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards" - stage: Sidecar Integration Tests name: Sidecar on Cassandra 2.1.20 - env: TEST_TYPE=sidecar CASSANDRA_VERSION=2.1.20 GRIM_MIN=1 GRIM_MAX=1 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards --tags @sidecar" + env: TEST_TYPE=sidecar STORAGE_TYPE=cassandra CASSANDRA_VERSION=2.1.20 GRIM_MIN=1 GRIM_MAX=1 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards --tags @sidecar" - stage: Sidecar Integration Tests name: Sidecar on Cassandra 2.2.13 - env: TEST_TYPE=sidecar CASSANDRA_VERSION=2.2.13 GRIM_MIN=1 GRIM_MAX=1 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards --tags @sidecar" + env: TEST_TYPE=sidecar STORAGE_TYPE=cassandra CASSANDRA_VERSION=2.2.13 GRIM_MIN=1 GRIM_MAX=1 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards --tags @sidecar" - stage: Sidecar Integration Tests name: Sidecar on Cassandra 3.0.17 - env: TEST_TYPE=sidecar CASSANDRA_VERSION=3.0.17 GRIM_MIN=1 GRIM_MAX=1 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards --tags @sidecar" + env: TEST_TYPE=sidecar STORAGE_TYPE=cassandra CASSANDRA_VERSION=3.0.17 GRIM_MIN=1 GRIM_MAX=1 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards --tags @sidecar" - stage: Sidecar Integration Tests name: Sidecar on Cassandra 3.11.3 - env: TEST_TYPE=sidecar CASSANDRA_VERSION=3.11.3 GRIM_MIN=1 GRIM_MAX=1 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards --tags @sidecar" + env: TEST_TYPE=sidecar STORAGE_TYPE=cassandra 
CASSANDRA_VERSION=3.11.3 GRIM_MIN=1 GRIM_MAX=1 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards --tags @sidecar" - stage: Sidecar Integration Tests name: Sidecar on Cassandra 4.0 - env: TEST_TYPE=sidecar CASSANDRA_VERSION=github:apache/trunk GRIM_MIN=1 GRIM_MAX=1 CUCUMBER_OPTIONS="--tags @sidecar" + env: TEST_TYPE=sidecar STORAGE_TYPE=cassandra CASSANDRA_VERSION=github:apache/trunk GRIM_MIN=1 GRIM_MAX=1 CUCUMBER_OPTIONS="--tags @sidecar" + - stage: Sidecar Integration Tests + name: Sidecar on Cassandra 3.11.3 with Postgresql backend + env: TEST_TYPE=sidecar STORAGE_TYPE=postgresql CASSANDRA_VERSION=3.11.3 GRIM_MIN=1 GRIM_MAX=1 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards --tags @sidecar" - stage: Flapping Distributed Reaper Integration Tests name: Four Flapping Reaper on Cassandra 2.1.20 - env: TEST_TYPE=ccm CASSANDRA_VERSION=2.1.20 GRIM_MIN=2 GRIM_MAX=4 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards" + env: TEST_TYPE=ccm STORAGE_TYPE=cassandra CASSANDRA_VERSION=2.1.20 GRIM_MIN=2 GRIM_MAX=4 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards" - stage: Flapping Distributed Reaper Integration Tests name: Four Flapping Reaper on Cassandra 2.2.13 - env: TEST_TYPE=ccm CASSANDRA_VERSION=2.2.13 GRIM_MIN=2 GRIM_MAX=4 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards" + env: TEST_TYPE=ccm STORAGE_TYPE=cassandra CASSANDRA_VERSION=2.2.13 GRIM_MIN=2 GRIM_MAX=4 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards" - stage: Flapping Distributed Reaper Integration Tests name: Four Flapping Reaper on Cassandra 3.0.17 - env: TEST_TYPE=ccm CASSANDRA_VERSION=3.0.17 GRIM_MIN=2 GRIM_MAX=4 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards" + env: TEST_TYPE=ccm STORAGE_TYPE=cassandra CASSANDRA_VERSION=3.0.17 GRIM_MIN=2 GRIM_MAX=4 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards" - stage: Flapping Distributed Reaper Integration Tests name: Four Flapping Reaper on Cassandra 3.11.3 - env: TEST_TYPE=ccm CASSANDRA_VERSION=3.11.3 GRIM_MIN=2 GRIM_MAX=4 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards" + env: TEST_TYPE=ccm STORAGE_TYPE=cassandra CASSANDRA_VERSION=3.11.3 GRIM_MIN=2 GRIM_MAX=4 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards" - stage: Flapping Distributed Reaper Integration Tests name: Four Flapping Reaper on Cassandra 4.0 - env: TEST_TYPE=ccm CASSANDRA_VERSION=github:apache/trunk GRIM_MIN=2 GRIM_MAX=4 + env: TEST_TYPE=ccm STORAGE_TYPE=cassandra CASSANDRA_VERSION=github:apache/trunk GRIM_MIN=2 GRIM_MAX=4 + - stage: Flapping Distributed Reaper Integration Tests + name: Four Flapping Reaper on Cassandra 3.11.3 with Postgresql backend + env: TEST_TYPE=ccm STORAGE_TYPE=postgresql CASSANDRA_VERSION=3.11.3 GRIM_MIN=2 GRIM_MAX=4 CUCUMBER_OPTIONS="--tags ~@cassandra_4_0_onwards" # Upgrade Integration Tests are broken due to classloading issues around org.glassfish.jersey.internal.spi.AutoDiscoverable # - stage: Upgrade Integration Tests # name: Upgrading Reaper on Cassandra 2.1.20 (all_nodes_reachable) diff --git a/src/ci/before_script.sh b/src/ci/before_script.sh index 3980d61eb..ac8da21a4 100755 --- a/src/ci/before_script.sh +++ b/src/ci/before_script.sh @@ -22,10 +22,8 @@ case "${TEST_TYPE}" in echo "ERROR: Environment variable TEST_TYPE is unspecified." exit 1 ;; - "ccm") - if [ "x${GRIM_MIN}" = "x" ] ; then - psql -c 'create database reaper;' -U postgres - fi + "ccm"|"sidecar") + psql -c 'create database reaper;' -U postgres ;; *) echo "Skipping, no actions for TEST_TYPE=${TEST_TYPE}." 
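The STORAGE_TYPE dispatch in src/ci/script.sh below hands the Postgres runs the same GRIM_MIN/GRIM_MAX knobs as the Cassandra runs, forwarded to the JVM as -Dgrim.reaper.min/-Dgrim.reaper.max. A minimal sketch of how the integration-test runners consume them (this mirrors the ReaperPostgresIT change further down; defaults apply when the properties are unset):

```java
// How the ITs size the Reaper fleet from the -Dgrim.reaper.* properties
// set by src/ci/script.sh; mirrors ReaperPostgresIT.setUp() below.
int minReaperInstances = Integer.getInteger("grim.reaper.min", 1);
int maxReaperInstances = Integer.getInteger("grim.reaper.max", minReaperInstances);
```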
diff --git a/src/ci/script.sh b/src/ci/script.sh index 7cc55f580..4d1364959 100755 --- a/src/ci/script.sh +++ b/src/ci/script.sh @@ -41,26 +41,54 @@ case "${TEST_TYPE}" in sleep 30 ccm status - if [ "x${GRIM_MIN}" = "x" ] - then - mvn -B package - mvn -B surefire:test -DsurefireArgLine="-Xmx256m" -Dtest=ReaperShiroIT - mvn -B surefire:test -DsurefireArgLine="-Xmx256m" -Dtest=ReaperIT - mvn -B surefire:test -DsurefireArgLine="-Xmx256m" -Dtest=ReaperH2IT - mvn -B surefire:test -DsurefireArgLine="-Xmx256m" -Dtest=ReaperPostgresIT - else - mvn -B package -DskipTests - mvn -B surefire:test -DsurefireArgLine="-Xmx384m" -Dtest=ReaperCassandraIT -Dgrim.reaper.min=${GRIM_MIN} -Dgrim.reaper.max=${GRIM_MAX} - fi + case "${STORAGE_TYPE}" in + "") + echo "ERROR: Environment variable STORAGE_TYPE is unspecified." + exit 1 + ;; + "local") + mvn -B package + mvn -B surefire:test -DsurefireArgLine="-Xmx256m" -Dtest=ReaperShiroIT + mvn -B surefire:test -DsurefireArgLine="-Xmx256m" -Dtest=ReaperIT + mvn -B surefire:test -DsurefireArgLine="-Xmx256m" -Dtest=ReaperH2IT + ;; + "postgresql") + mvn -B package -DskipTests + mvn -B surefire:test -DsurefireArgLine="-Xmx384m" -Dtest=ReaperPostgresIT -Dgrim.reaper.min=${GRIM_MIN} -Dgrim.reaper.max=${GRIM_MAX} + ;; + "cassandra") + mvn -B package -DskipTests + mvn -B surefire:test -DsurefireArgLine="-Xmx384m" -Dtest=ReaperCassandraIT -Dgrim.reaper.min=${GRIM_MIN} -Dgrim.reaper.max=${GRIM_MAX} + ;; + *) + echo "Skipping, no actions for STORAGE_TYPE=${STORAGE_TYPE}." + ;; + esac + ;; "sidecar") mvn --version -B + mvn -B package -DskipTests ccm start sleep 30 ccm status - mvn -B package -DskipTests - mvn -B surefire:test -DsurefireArgLine="-Xmx512m" -Dtest=ReaperCassandraSidecarIT + case "${STORAGE_TYPE}" in + "") + echo "ERROR: Environment variable STORAGE_TYPE is unspecified." + exit 1 + ;; + "postgresql") + mvn -B surefire:test -DsurefireArgLine="-Xmx512m" -Dtest=ReaperPostgresSidecarIT -Dcucumber.options="-t @sidecar" + ;; + "cassandra") + mvn -B surefire:test -DsurefireArgLine="-Xmx512m" -Dtest=ReaperCassandraSidecarIT -Dcucumber.options="-t @sidecar" + ;; + *) + echo "Skipping, no actions for STORAGE_TYPE=${STORAGE_TYPE}." 
+ ;; + esac + ;; "upgrade") mvn --version -B diff --git a/src/server/pom.xml b/src/server/pom.xml index 629c4ae94..7d1a5845b 100644 --- a/src/server/pom.xml +++ b/src/server/pom.xml @@ -296,6 +296,11 @@ 0.7 test + + com.ibatis + ibatis2-common + 2.1.7.597 + diff --git a/src/server/src/main/java/io/cassandrareaper/ReaperApplication.java b/src/server/src/main/java/io/cassandrareaper/ReaperApplication.java index 04c51bec1..73735bab3 100644 --- a/src/server/src/main/java/io/cassandrareaper/ReaperApplication.java +++ b/src/server/src/main/java/io/cassandrareaper/ReaperApplication.java @@ -75,6 +75,7 @@ import io.prometheus.client.CollectorRegistry; import io.prometheus.client.dropwizard.DropwizardExports; import io.prometheus.client.exporter.MetricsServlet; +import org.apache.commons.lang3.StringUtils; import org.apache.http.client.HttpClient; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.session.SessionHandler; @@ -441,9 +442,18 @@ private IStorage initializeStorage(ReaperApplicationConfiguration config, Enviro || "database".equalsIgnoreCase(config.getStorageType())) { // create DBI instance final DBIFactory factory = new DBIFactory(); - - // instanciate store - storage = new PostgresStorage(factory.build(environment, config.getDataSourceFactory(), "postgresql")); + if (StringUtils.isEmpty(config.getDataSourceFactory().getDriverClass()) + && "postgres".equalsIgnoreCase(config.getStorageType())) { + config.getDataSourceFactory().setDriverClass("org.postgresql.Driver"); + } else if (StringUtils.isEmpty(config.getDataSourceFactory().getDriverClass()) + && "h2".equalsIgnoreCase(config.getStorageType())) { + config.getDataSourceFactory().setDriverClass("org.h2.Driver"); + } + // instantiate store + storage = new PostgresStorage( + context.reaperInstanceId, + factory.build(environment, config.getDataSourceFactory(), "postgresql") + ); initDatabase(config); } else { LOG.error("invalid storageType: {}", config.getStorageType()); diff --git a/src/server/src/main/java/io/cassandrareaper/service/PurgeService.java b/src/server/src/main/java/io/cassandrareaper/service/PurgeService.java index 6e7db6d45..bc1dab417 100644 --- a/src/server/src/main/java/io/cassandrareaper/service/PurgeService.java +++ b/src/server/src/main/java/io/cassandrareaper/service/PurgeService.java @@ -21,6 +21,7 @@ import io.cassandrareaper.ReaperException; import io.cassandrareaper.core.Cluster; import io.cassandrareaper.core.RepairRun; +import io.cassandrareaper.storage.IDistributedStorage; import java.util.Collection; import java.util.List; @@ -73,6 +74,7 @@ public Integer purgeDatabase() throws ReaperException { } } } + purgeMetrics(); return purgedRuns; } @@ -130,4 +132,19 @@ private int purgeRepairRunsByDate(Collection repairRuns) { return purgedRuns.get(); } + + /** + * Purges all expired metrics from storage. 
Expiration time is a property of the storage, stored either in + * the schema itself for databases with TTL or in the storage instance for databases which must be purged manually + */ + private void purgeMetrics() { + if (context.storage instanceof IDistributedStorage) { + IDistributedStorage storage = ((IDistributedStorage) context.storage); + if (context.config.isInSidecarMode()) { + storage.purgeMetrics(); + storage.purgeNodeOperations(); + } + storage.purgeNodeMetrics(); + } + } } diff --git a/src/server/src/main/java/io/cassandrareaper/storage/CassandraStorage.java b/src/server/src/main/java/io/cassandrareaper/storage/CassandraStorage.java index 77cc8cc5a..f1e77f7cb 100644 --- a/src/server/src/main/java/io/cassandrareaper/storage/CassandraStorage.java +++ b/src/server/src/main/java/io/cassandrareaper/storage/CassandraStorage.java @@ -1522,6 +1522,9 @@ public void deleteNodeMetrics(UUID runId, String node) { session.executeAsync(delNodeMetricsByNodePrepStmt.bind(minute, runId, node)); } + @Override + public void purgeNodeMetrics() {} + private static NodeMetrics createNodeMetrics(Row row) { return NodeMetrics.builder() .withNode(row.getString("node")) @@ -1830,6 +1833,9 @@ public void storeMetric(GenericMetric metric) { metric.getValue())); } + @Override + public void purgeMetrics() {} + @Override public void storeOperations(String clusterName, OpType operationType, String host, String operationsJson) { session.executeAsync( @@ -1853,4 +1859,7 @@ public String listOperations(String clusterName, OpType operationType, String ho : operations.one().getString("data"); } + @Override + public void purgeNodeOperations() {} + } diff --git a/src/server/src/main/java/io/cassandrareaper/storage/IDistributedStorage.java b/src/server/src/main/java/io/cassandrareaper/storage/IDistributedStorage.java index 12584e3ed..2906e7948 100644 --- a/src/server/src/main/java/io/cassandrareaper/storage/IDistributedStorage.java +++ b/src/server/src/main/java/io/cassandrareaper/storage/IDistributedStorage.java @@ -80,4 +80,19 @@ List getMetrics( void storeOperations(String clusterName, OpType operationType, String host, String operationsJson); String listOperations(String clusterName, OpType operationType, String host); + + /** + * Purges old node metrics from the database (no-op for databases with TTL) + */ + void purgeNodeMetrics(); + + /** + * Purges old metrics from the database (no-op for databases w/ TTL) + */ + void purgeMetrics(); + + /** + * Purges old node operation info from the database (no-op for databases w/ TTL) + */ + void purgeNodeOperations(); } diff --git a/src/server/src/main/java/io/cassandrareaper/storage/PostgresStorage.java b/src/server/src/main/java/io/cassandrareaper/storage/PostgresStorage.java index 1cdeb7b10..6ff9564ff 100644 --- a/src/server/src/main/java/io/cassandrareaper/storage/PostgresStorage.java +++ b/src/server/src/main/java/io/cassandrareaper/storage/PostgresStorage.java @@ -17,8 +17,11 @@ package io.cassandrareaper.storage; +import io.cassandrareaper.AppContext; import io.cassandrareaper.core.Cluster; import io.cassandrareaper.core.DiagEventSubscription; +import io.cassandrareaper.core.GenericMetric; +import io.cassandrareaper.core.NodeMetrics; import io.cassandrareaper.core.RepairRun; import io.cassandrareaper.core.RepairSchedule; import io.cassandrareaper.core.RepairSegment; @@ -30,6 +33,8 @@ import io.cassandrareaper.service.RingRange; import io.cassandrareaper.storage.postgresql.BigIntegerArgumentFactory; import io.cassandrareaper.storage.postgresql.IStoragePostgreSql; 
+import io.cassandrareaper.storage.postgresql.InstantArgumentFactory; +import io.cassandrareaper.storage.postgresql.JdbiExceptionUtil; import io.cassandrareaper.storage.postgresql.LongCollectionSqlTypeArgumentFactory; import io.cassandrareaper.storage.postgresql.PostgresArrayArgumentFactory; import io.cassandrareaper.storage.postgresql.PostgresRepairSegment; @@ -40,6 +45,8 @@ import io.cassandrareaper.storage.postgresql.UuidArgumentFactory; import io.cassandrareaper.storage.postgresql.UuidUtil; +import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -48,9 +55,11 @@ import java.util.Set; import java.util.SortedSet; import java.util.UUID; +import java.util.stream.Collectors; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -59,23 +68,56 @@ import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.exceptions.DBIException; +import org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Implements the StorageAPI using PostgreSQL database. */ -public final class PostgresStorage implements IStorage { +public class PostgresStorage implements IStorage, IDistributedStorage { private static final Logger LOG = LoggerFactory.getLogger(PostgresStorage.class); + private static final int DEFAULT_LEADER_TIMEOUT_MIN = 10; // pulled value from Cassandra DDL + private static final int DEFAULT_REAPER_TIMEOUT_MIN = 3; // pulled value from Cassandra DDL + private static final int DEFAULT_METRICS_TIMEOUT_MIN = 14400; // pulled value from Cassandra DDL (10 days in minutes) + private static final int DEFAULT_NODE_OPERATIONS_TIMEOUT_MIN = 5; // pulled value from Cassandra DDL + + protected final DBI jdbi; + private final Duration leaderTimeout; + private final Duration reaperTimeout; + private final Duration metricsTimeout; + private final Duration nodeOperationsTimeout; + private final UUID reaperInstanceId; + + + public PostgresStorage(UUID reaperInstanceId, DBI jdbi) { + this( + reaperInstanceId, + jdbi, + DEFAULT_LEADER_TIMEOUT_MIN, + DEFAULT_REAPER_TIMEOUT_MIN, + DEFAULT_METRICS_TIMEOUT_MIN, + DEFAULT_NODE_OPERATIONS_TIMEOUT_MIN + ); + } - private final DBI jdbi; - - public PostgresStorage(DBI jdbi) { + @VisibleForTesting + public PostgresStorage(UUID reaperInstanceId, + DBI jdbi, + int leaderTimeoutInMinutes, + int reaperTimeoutInMinutes, + int metricsTimeoutInMinutes, + int nodeOperationsTimeoutInMinutes) { + this.reaperInstanceId = reaperInstanceId; this.jdbi = jdbi; + leaderTimeout = Duration.ofMinutes(leaderTimeoutInMinutes); + reaperTimeout = Duration.ofMinutes(reaperTimeoutInMinutes); + metricsTimeout = Duration.ofMinutes(metricsTimeoutInMinutes); + nodeOperationsTimeout = Duration.ofMinutes(nodeOperationsTimeoutInMinutes); } - private static IStoragePostgreSql getPostgresStorage(Handle handle) { + protected static IStoragePostgreSql getPostgresStorage(Handle handle) { handle.registerArgumentFactory(new LongCollectionSqlTypeArgumentFactory()); handle.registerArgumentFactory(new PostgresArrayArgumentFactory()); handle.registerArgumentFactory(new RunStateArgumentFactory()); @@ -84,6 +126,7 @@ private static IStoragePostgreSql getPostgresStorage(Handle handle) { 
handle.registerArgumentFactory(new BigIntegerArgumentFactory()); handle.registerArgumentFactory(new ScheduleStateArgumentFactory()); handle.registerArgumentFactory(new UuidArgumentFactory()); + handle.registerArgumentFactory(new InstantArgumentFactory()); return handle.attach(IStoragePostgreSql.class); } @@ -148,19 +191,26 @@ public boolean addCluster(Cluster cluster) { Cluster result = null; try (Handle h = jdbi.open()) { String properties = new ObjectMapper().writeValueAsString(cluster.getProperties()); - - int rowsAdded = getPostgresStorage(h).insertCluster( - cluster.getName(), - cluster.getPartitioner().get(), - cluster.getSeedHosts(), - properties, - cluster.getState().name(), - java.sql.Date.valueOf(cluster.getLastContact())); - - if (rowsAdded < 1) { + try { + int rowsAdded = getPostgresStorage(h).insertCluster( + cluster.getName(), + cluster.getPartitioner().get(), + cluster.getSeedHosts(), + properties, + cluster.getState().name(), + java.sql.Date.valueOf(cluster.getLastContact())); + + if (rowsAdded < 1) { + LOG.warn("failed inserting cluster with name: {}", cluster.getName()); + } else { + result = cluster; // no created id, as cluster name used for primary key + } + } catch (UnableToExecuteStatementException e) { + if (JdbiExceptionUtil.isDuplicateKeyError(e)) { + LOG.warn("cluster with name: {} already exists; updating instead", cluster.getName()); + return updateCluster(cluster); + } LOG.warn("failed inserting cluster with name: {}", cluster.getName()); - } else { - result = cluster; // no created id, as cluster name used for primary key } } catch (JsonProcessingException e) { throw new IllegalStateException(e); @@ -632,4 +682,394 @@ public boolean deleteEventSubscription(UUID id) { return getPostgresStorage(h).deleteEventSubscription(UuidUtil.toSequenceId(id)) > 0; } } + + @Override + public boolean takeLead(UUID leaderId) { + return takeLead(leaderId, leaderTimeout); + } + + @Override + public boolean takeLead(UUID leaderId, int ttl) { + Duration leadTimeout = Duration.ofSeconds(ttl); + return takeLead(leaderId, leadTimeout); + } + + private boolean takeLead(UUID leaderId, Duration ttl) { + if (null != jdbi) { + try (Handle h = jdbi.open()) { + try { + int rowsInserted = getPostgresStorage(h).insertLeaderEntry( + leaderId, + reaperInstanceId, + AppContext.REAPER_INSTANCE_ADDRESS + ); + if (rowsInserted == 1) { // insert should modify exactly 1 row + return true; + } + } catch (UnableToExecuteStatementException e) { + if (JdbiExceptionUtil.isDuplicateKeyError(e)) { + // if it's a duplicate key error, then try to update it + int rowsUpdated = getPostgresStorage(h).updateLeaderEntry( + leaderId, + reaperInstanceId, + AppContext.REAPER_INSTANCE_ADDRESS, + getExpirationTime(ttl) + ); + if (rowsUpdated == 1) { // if row updated, took ownership from an expired leader + LOG.debug("Took lead from expired entry for segment {}", leaderId); + return true; + } + } + return false; + } + } + } + LOG.warn("Unknown error occurred while taking lead on segment {}", leaderId); + return false; + } + + @Override + public boolean renewLead(UUID leaderId) { + if (null != jdbi) { + try (Handle h = jdbi.open()) { + int rowsUpdated = getPostgresStorage(h).renewLead( + leaderId, + reaperInstanceId, + AppContext.REAPER_INSTANCE_ADDRESS + ); + + if (rowsUpdated == 1) { + LOG.debug("Renewed lead on segment {}", leaderId); + return true; + } + LOG.error("Failed to renew lead on segment {}", leaderId); + } + } + return false; + } + + @Override + public boolean renewLead(UUID leaderId, int ttl) { + return 
renewLead(leaderId);
+  }
+
+  @Override
+  public List<UUID> getLeaders() {
+    if (null != jdbi) {
+      try (Handle h = jdbi.open()) {
+        List<Long> leaderSequenceIds = getPostgresStorage(h).getLeaders(getExpirationTime(leaderTimeout));
+        return leaderSequenceIds
+            .stream()
+            .map(UuidUtil::fromSequenceId)
+            .collect(Collectors.toList());
+      }
+    }
+    return new ArrayList<>();
+  }
+
+  @Override
+  public void releaseLead(UUID leaderId) {
+    if (null != jdbi) {
+      try (Handle h = jdbi.open()) {
+        int rowsDeleted = getPostgresStorage(h).releaseLead(
+            leaderId,
+            reaperInstanceId
+        );
+        if (rowsDeleted == 1) {
+          LOG.debug("Released lead on segment {}", leaderId);
+        } else {
+          LOG.error("Could not release lead on segment {}", leaderId);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void saveHeartbeat() {
+    beat();
+    deleteOldReapers();
+  }
+
+  @Override
+  public int countRunningReapers() {
+    if (null != jdbi) {
+      try (Handle h = jdbi.open()) {
+        return getPostgresStorage(h).countRunningReapers(getExpirationTime(reaperTimeout));
+      }
+    }
+    LOG.warn("Failed to get running reaper count from storage");
+    return 1;
+  }
+
+  @Override
+  public void storeNodeMetrics(UUID runId, NodeMetrics nodeMetrics) {
+    if (null != jdbi) {
+      try (Handle h = jdbi.open()) {
+        getPostgresStorage(h).storeNodeMetrics(
+            UuidUtil.toSequenceId(runId),
+            nodeMetrics.getNode(),
+            nodeMetrics.getCluster(),
+            nodeMetrics.getDatacenter(),
+            nodeMetrics.isRequested(),
+            nodeMetrics.getPendingCompactions(),
+            nodeMetrics.hasRepairRunning(),
+            nodeMetrics.getActiveAnticompactions()
+        );
+      }
+    }
+  }
+
+  @Override
+  public Collection<NodeMetrics> getNodeMetrics(UUID runId) {
+    if (null != jdbi) {
+      try (Handle h = jdbi.open()) {
+        Instant expirationTime = getExpirationTime(reaperTimeout);
+        return getPostgresStorage(h).getNodeMetrics(
+            UuidUtil.toSequenceId(runId),
+            expirationTime
+        );
+      }
+    }
+    return new ArrayList<>();
+  }
+
+  @Override
+  public Optional<NodeMetrics> getNodeMetrics(UUID runId, String node) {
+    if (null != jdbi) {
+      try (Handle h = jdbi.open()) {
+        Instant expirationTime = getExpirationTime(reaperTimeout);
+        NodeMetrics nm = getPostgresStorage(h).getNodeMetricsByNode(
+            UuidUtil.toSequenceId(runId),
+            expirationTime,
+            node
+        );
+        if (nm != null) {
+          return Optional.of(nm);
+        }
+      }
+    }
+    return Optional.empty();
+  }
+
+  @Override
+  public void deleteNodeMetrics(UUID runId, String node) {
+    if (null != jdbi) {
+      try (Handle h = jdbi.open()) {
+        getPostgresStorage(h).deleteNodeMetricsByNode(
+            UuidUtil.toSequenceId(runId),
+            node
+        );
+      }
+    }
+  }
+
+  @Override
+  public void purgeNodeMetrics() {
+    if (null != jdbi) {
+      try (Handle h = jdbi.open()) {
+        Instant expirationTime = getExpirationTime(reaperTimeout);
+        getPostgresStorage(h).purgeOldNodeMetrics(expirationTime);
+      }
+    }
+  }
+
+  @Override
+  public Optional<RepairSegment> getNextFreeSegmentForRanges(
+      UUID runId,
+      Optional<RingRange> parallelRange,
+      List<RingRange> ranges) {
+    List<RepairSegment> segments
+        = Lists.newArrayList(getRepairSegmentsForRun(runId));
+    Collections.shuffle(segments);
+
+    for (RepairSegment seg : segments) {
+      if (seg.getState().equals(RepairSegment.State.NOT_STARTED) && withinRange(seg, parallelRange)) {
+        for (RingRange range : ranges) {
+          if (segmentIsWithinRange(seg, range)) {
+            LOG.debug(
+                "Segment [{}, {}] is within range [{}, {}]",
+                seg.getStartToken(),
+                seg.getEndToken(),
+                range.getStart(),
+                range.getEnd());
+            return Optional.of(seg);
+          }
+        }
+      }
+    }
+    return Optional.empty();
+  }
+
+  @Override
+  public void storeMetric(GenericMetric metric) {
+    if (null != jdbi) {
+      try (Handle h = jdbi.open()) {
+        Instant metricTs = Instant.ofEpochMilli(metric.getTs().getMillis());
+        IStoragePostgreSql storage = getPostgresStorage(h);
+        int rowsUpdated = storage.updateMetricSourceNodeTimestamp(
+            metric.getClusterName(),
+            metric.getHost(),
+            metricTs
+        );
+        if (rowsUpdated == 0) {
+          try {
+            storage.insertMetricSourceNode(metric.getClusterName(), metric.getHost(), metricTs);
+          } catch (UnableToExecuteStatementException e) {
+            if (!JdbiExceptionUtil.isDuplicateKeyError(e)) {
+              LOG.error("Unable to update GenericMetric source nodes table");
+              throw e;
+            }
+          }
+        }
+
+        try {
+          storage.insertMetricType(
+              metric.getMetricDomain(),
+              metric.getMetricType(),
+              metric.getMetricScope(),
+              metric.getMetricName(),
+              metric.getMetricAttribute()
+          );
+        } catch (UnableToExecuteStatementException e) {
+          if (!JdbiExceptionUtil.isDuplicateKeyError(e)) {
+            LOG.error("Unable to update GenericMetric metric types table");
+            throw e;
+          }
+        }
+        storage.insertMetric(
+            metric.getClusterName(),
+            metric.getHost(),
+            metricTs,
+            metric.getMetricDomain(),
+            metric.getMetricType(),
+            metric.getMetricScope(),
+            metric.getMetricName(),
+            metric.getMetricAttribute(),
+            metric.getValue()
+        );
+      }
+    }
+  }
+
+  @Override
+  public List<GenericMetric> getMetrics(
+      String clusterName,
+      Optional<String> host,
+      String metricDomain,
+      String metricType,
+      long since) {
+    if (null != jdbi) {
+      try (Handle h = jdbi.open()) {
+        if (host.isPresent()) {
+          return new ArrayList<>(
+              getPostgresStorage(h).getMetricsForHost(
+                  clusterName,
+                  host.get(),
+                  metricDomain,
+                  metricType,
+                  Instant.ofEpochMilli(since)
+              )
+          );
+        } else {
+          return new ArrayList<>(
+              getPostgresStorage(h).getMetricsForCluster(
+                  clusterName,
+                  metricDomain,
+                  metricType,
+                  Instant.ofEpochMilli(since)
+              )
+          );
+        }
+      }
+    }
+    return new ArrayList<>();
+  }
+
+  @Override
+  public void purgeMetrics() {
+    if (null != jdbi) {
+      try (Handle h = jdbi.open()) {
+        IStoragePostgreSql storage = getPostgresStorage(h);
+        Instant expirationTime = getExpirationTime(metricsTimeout);
+        storage.purgeOldMetrics(expirationTime);
+        storage.purgeOldSourceNodes(expirationTime);
+      }
+    }
+  }
+
+  @Override
+  public void storeOperations(String clusterName, OpType operationType, String host, String operationsJson) {
+    if (null != jdbi) {
+      try (Handle h = jdbi.open()) {
+        getPostgresStorage(h).insertOperations(clusterName, operationType.getName(), host, operationsJson);
+      }
+    }
+  }
+
+  @Override
+  public String listOperations(String clusterName, OpType operationType, String host) {
+    if (null != jdbi) {
+      try (Handle h = jdbi.open()) {
+        String opString = getPostgresStorage(h).listOperations(clusterName, operationType.getName(), host);
+        return (opString != null) ? opString : "[]";
+      }
+    }
+    LOG.error("Failed retrieving node operations for cluster {}, node {}", clusterName, host);
+    return "[]";
+  }
+
+  @Override
+  public void purgeNodeOperations() {
+    if (null != jdbi) {
+      try (Handle h = jdbi.open()) {
+        Instant expirationTime = getExpirationTime(nodeOperationsTimeout);
+        getPostgresStorage(h).purgeOldNodeOperations(expirationTime);
+      }
+    }
+  }
+
+  private void beat() {
+    if (null != jdbi) {
+      try (Handle h = jdbi.open()) {
+        int rowsUpdated = getPostgresStorage(h).updateHeartbeat(
+            reaperInstanceId,
+            AppContext.REAPER_INSTANCE_ADDRESS
+        );
+        if (rowsUpdated == 0) {
+          LOG.debug("Creating new entry for reaper {}", reaperInstanceId);
+          int rowsInserted = getPostgresStorage(h).insertHeartbeat(
+              reaperInstanceId,
+              AppContext.REAPER_INSTANCE_ADDRESS
+          );
+          if (rowsInserted != 1) {
+            LOG.error("Failed to create entry for reaper {}", reaperInstanceId);
+          }
+        }
+      }
+    }
+  }
+
+  private void deleteOldReapers() {
+    if (null != jdbi) {
+      try (Handle h = jdbi.open()) {
+        getPostgresStorage(h).deleteOldReapers(getExpirationTime(reaperTimeout));
+      }
+    }
+  }
+
+  private void insertGenericMetricSourceNode(GenericMetric metric) {
+
+  }
+
+  private static Instant getExpirationTime(Duration timeout) {
+    return Instant.now().minus(timeout);
+  }
+
+  private static boolean segmentIsWithinRange(RepairSegment segment, RingRange range) {
+    return range.encloses(new RingRange(segment.getStartToken(), segment.getEndToken()));
+  }
+
+  private static boolean withinRange(RepairSegment segment, Optional<RingRange> range) {
+    return !range.isPresent() || segmentIsWithinRange(segment, range.get());
+  }
+}
diff --git a/src/server/src/main/java/io/cassandrareaper/storage/postgresql/GenericMetricMapper.java b/src/server/src/main/java/io/cassandrareaper/storage/postgresql/GenericMetricMapper.java
new file mode 100644
index 000000000..d9c3808c0
--- /dev/null
+++ b/src/server/src/main/java/io/cassandrareaper/storage/postgresql/GenericMetricMapper.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2015-2017 Spotify AB
+ * Copyright 2016-2018 The Last Pickle Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package io.cassandrareaper.storage.postgresql; + +import io.cassandrareaper.core.GenericMetric; + +import java.sql.ResultSet; +import java.sql.SQLException; + +import org.joda.time.DateTime; +import org.skife.jdbi.v2.StatementContext; +import org.skife.jdbi.v2.tweak.ResultSetMapper; + +public final class GenericMetricMapper implements ResultSetMapper { + + @Override + public GenericMetric map(int index, ResultSet rs, StatementContext ctx) throws SQLException { + return GenericMetric.builder() + .withClusterName(rs.getString("cluster")) + .withHost(rs.getString("host")) + .withMetricDomain(rs.getString("metric_domain")) + .withMetricType(rs.getString("metric_type")) + .withMetricScope(rs.getString("metric_scope")) + .withMetricName(rs.getString("metric_name")) + .withMetricAttribute(rs.getString("metric_attribute")) + .withValue(rs.getDouble("value")) + .withTs(new DateTime(rs.getTimestamp("ts"))) + .build(); + } +} \ No newline at end of file diff --git a/src/server/src/main/java/io/cassandrareaper/storage/postgresql/IStoragePostgreSql.java b/src/server/src/main/java/io/cassandrareaper/storage/postgresql/IStoragePostgreSql.java index f0c256ef9..5782e2a6c 100644 --- a/src/server/src/main/java/io/cassandrareaper/storage/postgresql/IStoragePostgreSql.java +++ b/src/server/src/main/java/io/cassandrareaper/storage/postgresql/IStoragePostgreSql.java @@ -19,6 +19,8 @@ import io.cassandrareaper.core.Cluster; import io.cassandrareaper.core.DiagEventSubscription; +import io.cassandrareaper.core.GenericMetric; +import io.cassandrareaper.core.NodeMetrics; import io.cassandrareaper.core.RepairRun; import io.cassandrareaper.core.RepairSchedule; import io.cassandrareaper.core.RepairSegment; @@ -29,12 +31,14 @@ import io.cassandrareaper.service.RepairParameters; import java.math.BigInteger; +import java.time.Instant; import java.util.Arrays; import java.util.Collection; import java.util.Date; import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.UUID; import org.skife.jdbi.v2.sqlobject.Bind; import org.skife.jdbi.v2.sqlobject.BindBean; @@ -263,6 +267,136 @@ public interface IStoragePostgreSql { String SQL_DELETE_EVENT_SUBSCRIPTION_BY_ID = "DELETE FROM diag_event_subscription WHERE id = :id"; + // leader election + // + String SQL_INSERT_LEAD = "INSERT INTO leader (leader_id, reaper_instance_id, reaper_instance_host, last_heartbeat)" + + " VALUES " + + "(:leaderId, :reaperInstanceId, :reaperInstanceHost, now())"; + + String SQL_UPDATE_LEAD = "UPDATE leader " + + " SET " + + "reaper_instance_id = :reaperInstanceId, reaper_instance_host = :reaperInstanceHost, last_heartbeat = now()" + + " WHERE " + + "leader_id = :leaderId AND last_heartbeat < :expirationTime"; + + String SQL_RENEW_LEAD = "UPDATE leader " + + " SET " + + "reaper_instance_id = :reaperInstanceId, reaper_instance_host = :reaperInstanceHost, last_heartbeat = now()" + + " WHERE " + + "leader_id = :leaderId AND reaper_instance_id = :reaperInstanceId"; + + String SQL_SELECT_ACTIVE_LEADERS = "SELECT leader_id from leader" + + " WHERE " + + " last_heartbeat >= :expirationTime"; + + String SQL_RELEASE_LEAD = "DELETE FROM leader" + + " WHERE " + + "leader_id = :leaderId AND reaper_instance_id = :reaperInstanceId"; + + String SQL_FORCE_RELEASE_LEAD = "DELETE FROM leader WHERE leader_id = :leaderId"; + + String SQL_INSERT_HEARTBEAT = "INSERT INTO running_reapers(reaper_instance_id, reaper_instance_host, last_heartbeat)" + + " VALUES " + + "(:reaperInstanceId, :reaperInstanceHost, now())"; + + String 
SQL_UPDATE_HEARTBEAT = "UPDATE running_reapers" + + " SET " + + "reaper_instance_id = :reaperInstanceId, reaper_instance_host = :reaperInstanceHost, last_heartbeat = now()" + + " WHERE " + + "reaper_instance_id = :reaperInstanceId"; + + String SQL_DELETE_OLD_REAPERS = "DELETE FROM running_reapers" + + " WHERE " + + "last_heartbeat < :expirationTime"; + + String SQL_COUNT_RUNNING_REAPERS = "SELECT COUNT(*) FROM running_reapers" + + " WHERE " + + "last_heartbeat >= :expirationTime"; + + String SQL_STORE_NODE_METRICS = "INSERT INTO node_metrics_v1 (run_id,ts,node,datacenter," + + "cluster,requested,pending_compactions,has_repair_running,active_anticompactions)" + + " VALUES " + + "(:runId, now(), :node, :datacenter, :cluster, :requested, :pendingCompactions, :hasRepairRunning, " + + ":activeAntiCompactions)"; + + String SQL_GET_NODE_METRICS = "SELECT * FROM node_metrics_v1" + + " WHERE " + + "run_id = :runId AND ts > :expirationTime"; + + String SQL_GET_NODE_METRICS_BY_NODE = "SELECT * FROM node_metrics_v1" + + " WHERE" + + " run_id = :runId AND ts > :expirationTime AND node = :node" + + " ORDER BY ts DESC LIMIT 1"; + + String SQL_DELETE_NODE_METRICS_BY_NODE = "DELETE FROM node_metrics_v1" + + " WHERE " + + " run_id = :runId AND node = :node"; + + String SQL_PURGE_OLD_NODE_METRICS = "DELETE FROM node_metrics_v1" + + " WHERE" + + " ts < :expirationTime"; + + // sidecar-mode metrics + // + String SQL_UPDATE_SOURCE_NODE_TIMESTAMP = "UPDATE node_metrics_v2_source_nodes" + + " SET last_updated = :timestamp" + + " WHERE cluster = :cluster AND host = :host"; + + String SQL_ADD_SOURCE_NODE = "INSERT INTO node_metrics_v2_source_nodes (cluster, host, last_updated)" + + " VALUES (:cluster, :host, :timestamp)"; + + String SQL_ADD_METRIC_TYPE = "INSERT INTO node_metrics_v2_metric_types" + + " (metric_domain, metric_type, metric_scope, metric_name, metric_attribute)" + + " VALUES" + + " (:metricDomain, :metricType, :metricScope, :metricName, :metricAttribute)"; + + String SQL_GET_SOURCE_NODE_ID = "SELECT source_node_id FROM node_metrics_v2_source_nodes" + + " WHERE cluster = :cluster AND host = :host"; + + String SQL_GET_METRIC_TYPE_ID = "SELECT metric_type_id FROM node_metrics_v2_metric_types" + + " WHERE" + + " metric_domain = :metricDomain AND metric_type = :metricType AND metric_scope = :metricScope" + + " AND metric_name = :metricName AND metric_attribute = :metricAttribute"; + + String SQL_INSERT_METRIC = "INSERT INTO node_metrics_v2 (metric_type_id, source_node_id, ts, value)" + + " VALUES (" + + " (" + SQL_GET_METRIC_TYPE_ID + ")," + + " (" + SQL_GET_SOURCE_NODE_ID + ")," + + " :timestamp," + + " :value)"; + + String SQL_GET_METRICS_FOR_HOST = "SELECT cluster, host, metric_domain, metric_type, metric_scope, metric_name," + + " metric_attribute, ts, value" + + " FROM node_metrics_v2" + + " NATURAL JOIN node_metrics_v2_metric_types" + + " NATURAL JOIN node_metrics_v2_source_nodes" + + " WHERE" + + " cluster = :cluster AND host = :host AND metric_domain = :metricDomain AND metric_type = :metricType" + + " AND ts >= :since"; + + String SQL_GET_METRICS_FOR_CLUSTER = "SELECT cluster, host, metric_domain, metric_type, metric_scope, metric_name," + + " metric_attribute, ts, value" + + " FROM node_metrics_v2" + + " NATURAL JOIN node_metrics_v2_metric_types" + + " NATURAL JOIN node_metrics_v2_source_nodes" + + " WHERE" + + " cluster = :cluster AND metric_domain = :metricDomain AND metric_type = :metricType" + + " AND ts >= :since"; + + String SQL_PURGE_OLD_METRICS = "DELETE FROM node_metrics_v2 WHERE ts < 
:expirationTime"; + + String SQL_PURGE_OLD_SOURCE_NODES = "DELETE FROM node_metrics_v2_source_nodes WHERE last_updated < :expirationTime"; + + String SQL_INSERT_OPERATIONS = "INSERT INTO node_operations (cluster, type, host, data, ts)" + + " VALUES (:cluster, :type, :host, :data, now())"; + + String SQL_LIST_OPERATIONS = "SELECT data FROM node_operations" + + " WHERE" + + " cluster = :cluster AND type = :operationType AND host = :host" + + " ORDER BY ts DESC LIMIT 1"; + + String SQL_PURGE_OLD_NODE_OPERATIONS = "DELETE from node_operations WHERE ts < :expirationTime"; + static String[] parseStringArray(Object obj) { String[] values = null; if (obj instanceof String[]) { @@ -515,4 +649,187 @@ long insertDiagEventSubscription( @SqlUpdate(SQL_DELETE_EVENT_SUBSCRIPTION_BY_ID) int deleteEventSubscription( @Bind("id") long subscriptionId); + + @SqlUpdate(SQL_INSERT_LEAD) + int insertLeaderEntry( + @Bind("leaderId") UUID leaderId, + @Bind("reaperInstanceId") UUID reaperInstanceId, + @Bind("reaperInstanceHost") String reaperInstanceHost + ); + + @SqlUpdate(SQL_UPDATE_LEAD) + int updateLeaderEntry( + @Bind("leaderId") UUID leaderId, + @Bind("reaperInstanceId") UUID reaperInstanceId, + @Bind("reaperInstanceHost") String reaperInstanceHost, + @Bind("expirationTime") Instant expirationTime + ); + + @SqlUpdate(SQL_RENEW_LEAD) + int renewLead( + @Bind("leaderId") UUID leaderId, + @Bind("reaperInstanceId") UUID reaperInstanceId, + @Bind("reaperInstanceHost") String reaperInstanceHost + ); + + @SqlQuery(SQL_SELECT_ACTIVE_LEADERS) + List getLeaders( + @Bind("expirationTime") Instant expirationTime + ); + + @SqlUpdate(SQL_RELEASE_LEAD) + int releaseLead( + @Bind("leaderId") UUID leaderId, + @Bind("reaperInstanceId") UUID reaperInstanceId + ); + + @SqlUpdate(SQL_FORCE_RELEASE_LEAD) + int forceReleaseLead( + @Bind("leaderId") UUID leaderId + ); + + @SqlUpdate(SQL_INSERT_HEARTBEAT) + int insertHeartbeat( + @Bind("reaperInstanceId") UUID reaperInstanceId, + @Bind("reaperInstanceHost") String reaperInstanceHost + ); + + @SqlUpdate(SQL_UPDATE_HEARTBEAT) + int updateHeartbeat( + @Bind("reaperInstanceId") UUID reaperInstanceId, + @Bind("reaperInstanceHost") String reaperInstanceHost + ); + + @SqlUpdate(SQL_DELETE_OLD_REAPERS) + int deleteOldReapers( + @Bind("expirationTime") Instant expirationTime + ); + + @SqlQuery(SQL_COUNT_RUNNING_REAPERS) + int countRunningReapers( + @Bind("expirationTime") Instant expirationTime + ); + + @SqlUpdate(SQL_STORE_NODE_METRICS) + int storeNodeMetrics( + @Bind("runId") long runId, + @Bind("node") String node, + @Bind("cluster") String cluster, + @Bind("datacenter") String datacenter, + @Bind("requested") Boolean requested, + @Bind("pendingCompactions") int pendingCompactions, + @Bind("hasRepairRunning") Boolean hasRepairRunning, + @Bind("activeAntiCompactions") int activeAntiCompactions + ); + + @SqlQuery(SQL_GET_NODE_METRICS) + @Mapper(NodeMetricsMapper.class) + Collection getNodeMetrics( + @Bind("runId") long runId, + @Bind("expirationTime") Instant expirationTime + ); + + @SqlQuery(SQL_GET_NODE_METRICS_BY_NODE) + @Mapper(NodeMetricsMapper.class) + NodeMetrics getNodeMetricsByNode( + @Bind("runId") long runId, + @Bind("expirationTime") Instant expirationTime, + @Bind("node") String node + ); + + @SqlUpdate(SQL_DELETE_NODE_METRICS_BY_NODE) + int deleteNodeMetricsByNode( + @Bind("runId") long runId, + @Bind("node") String node + ); + + @SqlUpdate(SQL_PURGE_OLD_NODE_METRICS) + int purgeOldNodeMetrics( + @Bind("expirationTime") Instant expirationTime + ); + + 
@SqlUpdate(SQL_ADD_SOURCE_NODE) + int insertMetricSourceNode( + @Bind("cluster") String cluster, + @Bind("host") String host, + @Bind("timestamp") Instant timestamp + ); + + @SqlUpdate(SQL_UPDATE_SOURCE_NODE_TIMESTAMP) + int updateMetricSourceNodeTimestamp( + @Bind("cluster") String cluster, + @Bind("host") String host, + @Bind("timestamp") Instant timestamp + ); + + @SqlUpdate(SQL_ADD_METRIC_TYPE) + int insertMetricType( + @Bind("metricDomain") String metricDomain, + @Bind("metricType") String metricType, + @Bind("metricScope") String metricScope, + @Bind("metricName") String metricName, + @Bind("metricAttribute") String metricAttribute + ); + + @SqlUpdate(SQL_INSERT_METRIC) + int insertMetric( + @Bind("cluster") String cluster, + @Bind("host") String host, + @Bind("timestamp") Instant timestamp, + @Bind("metricDomain") String metricDomain, + @Bind("metricType") String metricType, + @Bind("metricScope") String metricScope, + @Bind("metricName") String metricName, + @Bind("metricAttribute") String metricAttribute, + @Bind("value") double value + ); + + @SqlQuery(SQL_GET_METRICS_FOR_HOST) + @Mapper(GenericMetricMapper.class) + Collection getMetricsForHost( + @Bind("cluster") String cluster, + @Bind("host") String host, + @Bind("metricDomain") String metricDomain, + @Bind("metricType") String metricType, + @Bind("since") Instant since + ); + + @SqlQuery(SQL_GET_METRICS_FOR_CLUSTER) + @Mapper(GenericMetricMapper.class) + Collection getMetricsForCluster( + @Bind("cluster") String cluster, + @Bind("metricDomain") String metricDomain, + @Bind("metricType") String metricType, + @Bind("since") Instant since + ); + + @SqlUpdate(SQL_PURGE_OLD_METRICS) + int purgeOldMetrics( + @Bind("expirationTime") Instant expirationTime + ); + + @SqlUpdate(SQL_PURGE_OLD_SOURCE_NODES) + int purgeOldSourceNodes( + @Bind("expirationTime") Instant expirationTime + ); + + @SqlUpdate(SQL_INSERT_OPERATIONS) + int insertOperations( + @Bind("cluster") String cluster, + @Bind("type") String operationType, + @Bind("host") String host, + @Bind("data") String data + ); + + @SqlQuery(SQL_LIST_OPERATIONS) + String listOperations( + @Bind("cluster") String cluster, + @Bind("operationType") String operationType, + @Bind("host") String host + ); + + @SqlUpdate(SQL_PURGE_OLD_NODE_OPERATIONS) + int purgeOldNodeOperations( + @Bind("expirationTime") Instant expirationTime + ); } diff --git a/src/server/src/main/java/io/cassandrareaper/storage/postgresql/InstantArgumentFactory.java b/src/server/src/main/java/io/cassandrareaper/storage/postgresql/InstantArgumentFactory.java new file mode 100644 index 000000000..ecc4dcc4e --- /dev/null +++ b/src/server/src/main/java/io/cassandrareaper/storage/postgresql/InstantArgumentFactory.java @@ -0,0 +1,43 @@ +/* + * + * Copyright 2019-2019 The Last Pickle Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.cassandrareaper.storage.postgresql; + +import java.sql.Timestamp; +import java.time.Instant; + +import org.skife.jdbi.v2.StatementContext; +import org.skife.jdbi.v2.tweak.Argument; +import org.skife.jdbi.v2.tweak.ArgumentFactory; + +public class InstantArgumentFactory implements ArgumentFactory { + + @Override + public boolean accepts(Class expectedType, Object value, StatementContext ctx) { + return value instanceof Instant; + } + + @Override + public Argument build(Class expectedType, final Instant value, StatementContext ctx) { + return (pos, stmt, sc) -> stmt.setTimestamp(pos, toSqlTimestamp(value)); + } + + private static Timestamp toSqlTimestamp(Instant instant) { + return Timestamp.from(instant); + } + +} diff --git a/src/server/src/main/java/io/cassandrareaper/storage/postgresql/JdbiExceptionUtil.java b/src/server/src/main/java/io/cassandrareaper/storage/postgresql/JdbiExceptionUtil.java new file mode 100644 index 000000000..f9ab2a293 --- /dev/null +++ b/src/server/src/main/java/io/cassandrareaper/storage/postgresql/JdbiExceptionUtil.java @@ -0,0 +1,36 @@ +/* + * + * Copyright 2019-2019 The Last Pickle Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cassandrareaper.storage.postgresql; + +import org.postgresql.util.PSQLException; +import org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException; + +public final class JdbiExceptionUtil { + + private static final String DUPLICATE_KEY_CODE = "23505"; + + private JdbiExceptionUtil() { + } + + public static boolean isDuplicateKeyError(UnableToExecuteStatementException exc) { + if (exc.getCause() instanceof PSQLException) { + return ((PSQLException) exc.getCause()).getSQLState().equals(DUPLICATE_KEY_CODE); + } + return false; + } +} diff --git a/src/server/src/main/java/io/cassandrareaper/storage/postgresql/NodeMetricsMapper.java b/src/server/src/main/java/io/cassandrareaper/storage/postgresql/NodeMetricsMapper.java new file mode 100644 index 000000000..21d99983b --- /dev/null +++ b/src/server/src/main/java/io/cassandrareaper/storage/postgresql/NodeMetricsMapper.java @@ -0,0 +1,41 @@ +/* + * + * Copyright 2019-2019 The Last Pickle Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.cassandrareaper.storage.postgresql; + +import io.cassandrareaper.core.NodeMetrics; + +import java.sql.ResultSet; +import java.sql.SQLException; + +import org.skife.jdbi.v2.StatementContext; +import org.skife.jdbi.v2.tweak.ResultSetMapper; + +public final class NodeMetricsMapper implements ResultSetMapper { + + @Override + public NodeMetrics map(int index, ResultSet rs, StatementContext ctx) throws SQLException { + return NodeMetrics.builder() + .withNode(rs.getString("node")) + .withCluster(rs.getString("cluster")) + .withDatacenter(rs.getString("datacenter")) + .withPendingCompactions(rs.getInt("pending_compactions")) + .withHasRepairRunning(rs.getBoolean("has_repair_running")) + .withActiveAnticompactions(rs.getInt("active_anticompactions")) + .build(); + } +} \ No newline at end of file diff --git a/src/server/src/main/resources/db/h2/V17_0_0__multi_instance.sql b/src/server/src/main/resources/db/h2/V17_0_0__multi_instance.sql new file mode 100644 index 000000000..a8bd0a139 --- /dev/null +++ b/src/server/src/main/resources/db/h2/V17_0_0__multi_instance.sql @@ -0,0 +1,29 @@ +-- H2-compatible version of multi-instance reaper Postgres DDL +-- CHANGES: +-- "node" TEXT --> "node" VARCHAR(255) because H2 doesn't support index on TEXT + +CREATE TABLE IF NOT EXISTS leader ( + leader_id BIGINT PRIMARY KEY, + reaper_instance_id BIGINT, + reaper_instance_host TEXT, + last_heartbeat TIMESTAMP WITH TIME ZONE +); + +CREATE TABLE IF NOT EXISTS running_reapers ( + reaper_instance_id BIGINT PRIMARY KEY, + reaper_instance_host TEXT, + last_heartbeat TIMESTAMP WITH TIME ZONE +); + +CREATE TABLE IF NOT EXISTS node_metrics_v1 ( + time_partition BIGINT, + run_id BIGINT, + node VARCHAR(255), + cluster TEXT, + datacenter TEXT, + requested BOOLEAN, + pending_compactions INT, + has_repair_running BOOLEAN, + active_anticompactions INT, + PRIMARY KEY(run_id, time_partition, node) +); \ No newline at end of file diff --git a/src/server/src/main/resources/db/postgres/V17_0_0__multi_instance.sql b/src/server/src/main/resources/db/postgres/V17_0_0__multi_instance.sql new file mode 100644 index 000000000..fe39e7891 --- /dev/null +++ b/src/server/src/main/resources/db/postgres/V17_0_0__multi_instance.sql @@ -0,0 +1,25 @@ +CREATE TABLE IF NOT EXISTS "leader" ( + "leader_id" BIGINT PRIMARY KEY, + "reaper_instance_id" BIGINT, + "reaper_instance_host" TEXT, + "last_heartbeat" TIMESTAMP WITH TIME ZONE +); + +CREATE TABLE IF NOT EXISTS "running_reapers" ( + "reaper_instance_id" BIGINT PRIMARY KEY, + "reaper_instance_host" TEXT, + "last_heartbeat" TIMESTAMP WITH TIME ZONE +); + +CREATE TABLE IF NOT EXISTS "node_metrics_v1" ( + "run_id" BIGINT, + "ts" TIMESTAMP WITH TIME ZONE, + "node" TEXT, + "cluster" TEXT, + "datacenter" TEXT, + "requested" BOOLEAN, + "pending_compactions" INT, + "has_repair_running" BOOLEAN, + "active_anticompactions" INT, + PRIMARY KEY("run_id", "ts", "node") +); diff --git a/src/server/src/main/resources/db/postgres/V18_0_0__sidecar_mode.sql b/src/server/src/main/resources/db/postgres/V18_0_0__sidecar_mode.sql new file mode 100644 index 000000000..761d9e934 --- /dev/null +++ b/src/server/src/main/resources/db/postgres/V18_0_0__sidecar_mode.sql @@ -0,0 +1,34 @@ +CREATE TABLE IF NOT EXISTS "node_metrics_v2_source_nodes" ( + "source_node_id" SERIAL UNIQUE, + "cluster" TEXT, + "host" TEXT, + "last_updated" TIMESTAMP WITH TIME ZONE, + PRIMARY KEY ("cluster", "host") +); + +CREATE TABLE IF NOT EXISTS "node_metrics_v2_metric_types" ( + "metric_type_id" SERIAL UNIQUE, + "metric_domain" TEXT, + 
"metric_type" TEXT, + "metric_scope" TEXT, + "metric_name" TEXT, + "metric_attribute" TEXT, + PRIMARY KEY ("metric_domain", "metric_type", "metric_scope", "metric_name", "metric_attribute") +); + +CREATE TABLE IF NOT EXISTS "node_metrics_v2" ( + "metric_type_id" INT REFERENCES "node_metrics_v2_metric_types"("metric_type_id"), + "source_node_id" INT REFERENCES "node_metrics_v2_source_nodes"("source_node_id"), + "ts" TIMESTAMP WITH TIME ZONE, + "value" DOUBLE PRECISION, + PRIMARY KEY ("metric_type_id", "source_node_id", "ts") +); + +CREATE TABLE IF NOT EXISTS "node_operations" ( + "cluster" TEXT, + "type" TEXT, + "host" TEXT, + "ts" TIMESTAMP WITH TIME ZONE, + "data" TEXT, + PRIMARY KEY ("cluster", "type", "host", "ts") +); diff --git a/src/server/src/test/java/io/cassandrareaper/acceptance/BasicSteps.java b/src/server/src/test/java/io/cassandrareaper/acceptance/BasicSteps.java index 55ea00105..2968df3af 100644 --- a/src/server/src/test/java/io/cassandrareaper/acceptance/BasicSteps.java +++ b/src/server/src/test/java/io/cassandrareaper/acceptance/BasicSteps.java @@ -30,6 +30,7 @@ import io.cassandrareaper.resources.view.RepairScheduleStatus; import io.cassandrareaper.service.RepairRunService; import io.cassandrareaper.storage.CassandraStorage; +import io.cassandrareaper.storage.PostgresStorage; import io.cassandrareaper.storage.postgresql.DiagEventSubscriptionMapper; import java.util.Arrays; @@ -107,13 +108,13 @@ public final class BasicSteps { private TestContext testContext; public static synchronized void addReaperRunner(ReaperTestJettyRunner runner) { - - String csCls = CassandraStorage.class.getName(); if (!CLIENTS.isEmpty()) { - Preconditions.checkState(csCls.equals(runner.runnerInstance.getContextStorageClassname())); - + Preconditions.checkState(isInstanceOfDistributedStorage(runner.runnerInstance.getContextStorageClassname())); RUNNERS.stream() - .forEach(r -> Preconditions.checkState(csCls.equals(runner.runnerInstance.getContextStorageClassname()))); + .forEach(r -> + Preconditions.checkState( + isInstanceOfDistributedStorage(runner.runnerInstance.getContextStorageClassname()) + )); } RUNNERS.add(runner); CLIENTS.add(runner.getClient()); @@ -1976,4 +1977,10 @@ private List callSubscription( ? 
         ? ImmutableList.of(SimpleReaperClient.parseEventSubscriptionJSON(responseData))
         : SimpleReaperClient.parseEventSubscriptionsListJSON(responseData);
   }
+
+  private static boolean isInstanceOfDistributedStorage(String storageClassname) {
+    String csCls = CassandraStorage.class.getName();
+    String pgCls = PostgresStorage.class.getName();
+    return csCls.equals(storageClassname) || pgCls.equals(storageClassname);
+  }
 }
diff --git a/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperPostgresIT.java b/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperPostgresIT.java
index c6dd25000..3fff60e40 100644
--- a/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperPostgresIT.java
+++ b/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperPostgresIT.java
@@ -18,7 +18,10 @@
 
 package io.cassandrareaper.acceptance;
 
+import java.util.List;
 import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import cucumber.api.CucumberOptions;
 import cucumber.api.junit.Cucumber;
@@ -39,8 +42,10 @@
 public class ReaperPostgresIT {
 
   private static final Logger LOG = LoggerFactory.getLogger(ReaperPostgresIT.class);
-  private static ReaperTestJettyRunner runner;
+  private static final List<ReaperTestJettyRunner> RUNNER_INSTANCES = new CopyOnWriteArrayList<>();
   private static final String POSTGRES_CONFIG_FILE = "cassandra-reaper-postgres-at.yaml";
+  private static final Random RAND = new Random(System.nanoTime());
+  private static Thread GRIM_REAPER;
 
   protected ReaperPostgresIT() {}
 
@@ -49,14 +54,52 @@
   public static void setUp() throws Exception {
     LOG.info("setting up testing Reaper runner with {} seed hosts defined and Postgres storage",
         TestContext.TEST_CLUSTER_SEED_HOSTS.size());
-    runner = new ReaperTestJettyRunner(POSTGRES_CONFIG_FILE, Optional.empty());
-    BasicSteps.addReaperRunner(runner);
+
+    int minReaperInstances = Integer.getInteger("grim.reaper.min", 1);
+    int maxReaperInstances = Integer.getInteger("grim.reaper.max", minReaperInstances);
+
+    for (int i = 0; i < minReaperInstances; ++i) {
+      createReaperTestJettyRunner(Optional.empty());
+    }
+
+    GRIM_REAPER = new Thread(() -> {
+      Thread.currentThread().setName("GRIM REAPER");
+      // keep adding/removing reaper instances while the test is running
+      while (!Thread.currentThread().isInterrupted()) {
+        try {
+          if (maxReaperInstances > RUNNER_INSTANCES.size()) {
+            createReaperTestJettyRunner(Optional.empty());
+          } else {
+            int remove = minReaperInstances + RAND.nextInt(maxReaperInstances - minReaperInstances);
+            removeReaperTestJettyRunner(RUNNER_INSTANCES.get(remove));
+          }
+        } catch (RuntimeException | InterruptedException ex) {
+          LOG.error("failed adding/removing reaper instance", ex);
+        }
+      }
+    });
+    if (minReaperInstances < maxReaperInstances) {
+      GRIM_REAPER.start();
+    }
   }
 
   @AfterClass
   public static void tearDown() {
     LOG.info("Stopping reaper service...");
-    runner.runnerInstance.after();
+    GRIM_REAPER.interrupt();
+    RUNNER_INSTANCES.forEach(r -> r.runnerInstance.after());
   }
 
+  private static void createReaperTestJettyRunner(Optional<String> version) throws InterruptedException {
+    ReaperTestJettyRunner runner = new ReaperTestJettyRunner(POSTGRES_CONFIG_FILE, version);
+    RUNNER_INSTANCES.add(runner);
+    Thread.sleep(100);
+    BasicSteps.addReaperRunner(runner);
+  }
+
+  private static void removeReaperTestJettyRunner(ReaperTestJettyRunner runner) throws InterruptedException {
+    BasicSteps.removeReaperRunner(runner);
+    Thread.sleep(200);
+    runner.runnerInstance.after();
+    RUNNER_INSTANCES.remove(runner);
+  }
 }
diff --git a/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperPostgresSidecarIT.java b/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperPostgresSidecarIT.java
new file mode 100644
index 000000000..8f5a04a7c
--- /dev/null
+++ b/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperPostgresSidecarIT.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2014-2017 Spotify AB
+ * Copyright 2016-2019 The Last Pickle Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.cassandrareaper.acceptance;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import cucumber.api.CucumberOptions;
+import cucumber.api.junit.Cucumber;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(Cucumber.class)
+@CucumberOptions(
+    features = {
+        "classpath:io.cassandrareaper.acceptance/integration_reaper_functionality.feature",
+        "classpath:io.cassandrareaper.acceptance/event_subscriptions.feature"
+    },
+    plugin = {"pretty"}
+    )
+public class ReaperPostgresSidecarIT {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ReaperPostgresSidecarIT.class);
+  private static final List<ReaperTestJettyRunner> RUNNER_INSTANCES = new CopyOnWriteArrayList<>();
+  private static final String[] POSTGRES_CONFIG_FILE = {
+      "reaper-postgres-sidecar1-at.yaml",
+      "reaper-postgres-sidecar2-at.yaml",
+  };
+
+  protected ReaperPostgresSidecarIT() {}
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    LOG.info("setting up testing Reaper runner with {} seed hosts defined and Postgres storage",
+        TestContext.TEST_CLUSTER_SEED_HOSTS.size());
+
+    int reaperInstances = Integer.getInteger("grim.reaper.min", 2);
+    for (int i = 0; i < reaperInstances; i++) {
+      createReaperTestJettyRunner(Optional.empty());
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    LOG.info("Stopping reaper service...");
+    RUNNER_INSTANCES.forEach(r -> r.runnerInstance.after());
+  }
+
+  private static void createReaperTestJettyRunner(Optional<String> version) throws InterruptedException {
+    ReaperTestJettyRunner runner = new ReaperTestJettyRunner(POSTGRES_CONFIG_FILE[RUNNER_INSTANCES.size()], version);
+    RUNNER_INSTANCES.add(runner);
+    Thread.sleep(100);
+    BasicSteps.addReaperRunner(runner);
+  }
+}
diff --git a/src/server/src/test/java/io/cassandrareaper/storage/PostgresStorageTest.java b/src/server/src/test/java/io/cassandrareaper/storage/PostgresStorageTest.java
new file mode 100644
index 000000000..dea08e47b
--- /dev/null
+++ b/src/server/src/test/java/io/cassandrareaper/storage/PostgresStorageTest.java
@@ -0,0 +1,565 @@
+/*
+ *
+ * Copyright 2019-2019 The Last Pickle Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.cassandrareaper.storage;
+
+import io.cassandrareaper.AppContext;
+import io.cassandrareaper.core.GenericMetric;
+import io.cassandrareaper.core.NodeMetrics;
+import io.cassandrareaper.storage.postgresql.IStoragePostgreSql;
+import io.cassandrareaper.storage.postgresql.UuidUtil;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.io.Reader;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.ibatis.common.jdbc.ScriptRunner;
+import org.fest.assertions.api.Assertions;
+import org.h2.tools.Server;
+import org.joda.time.DateTime;
+import org.junit.Before;
+import org.junit.Test;
+import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.Handle;
+
+public class PostgresStorageTest {
+
+  private static final String DB_URL = "jdbc:h2:mem:test_mem;MODE=PostgreSQL;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false";
+
+  @Before
+  public void setUp() throws SQLException, IOException {
+    Server.createTcpServer().start();
+
+    DBI dbi = new DBI(DB_URL);
+    Handle handle = dbi.open();
+    Connection conn = handle.getConnection();
+
+    // to suppress output of ScriptRunner
+    PrintStream tmp = new PrintStream(new OutputStream() {
+      @Override
+      public void write(int buff) throws IOException {
+        // do nothing
+      }
+    });
+    PrintStream console = System.out;
+    System.setOut(tmp);
+
+    String cwd = Paths.get("").toAbsolutePath().toString();
+    String path = cwd + "/../src/test/resources/db/postgres/V17_0_0__multi_instance.sql";
+    ScriptRunner scriptExecutor = new ScriptRunner(conn, false, true);
+    Reader reader = new BufferedReader(new FileReader(path));
+    scriptExecutor.runScript(reader);
+
+    System.setOut(console);
+  }
+
+  @Test
+  public void testTakeLead() {
+    DBI dbi = new DBI(DB_URL);
+    UUID reaperInstanceId = UUID.randomUUID();
+    PostgresStorage storage = new PostgresStorage(reaperInstanceId, dbi);
+    Assertions.assertThat(storage.isStorageConnected()).isTrue();
+
+    Handle handle = dbi.open();
+    handle.execute("DELETE from leader");
+
+    int numEntries = 5;
+    Set<UUID> leaderIds = new HashSet<>();
+    for (int i = 0; i < numEntries; i++) {
+      UUID msbLeaderId = UuidUtil.fromSequenceId(UuidUtil.toSequenceId(UUID.randomUUID()));
+      leaderIds.add(msbLeaderId);
+    }
+
+    // insert all five leader entries
+    for (UUID leaderId : leaderIds) {
+      boolean result = storage.takeLead(leaderId);
+      Assertions.assertThat(result).isTrue();
+    }
+
+    // make sure the fetched leaders contain all the inserted leaders
+    List<UUID> fetchedLeaderIds = storage.getLeaders();
+    for (UUID fetchedLeaderId : fetchedLeaderIds) {
+      Assertions.assertThat(leaderIds.contains(fetchedLeaderId)).isTrue();
+    }
+  }
+
+  @Test
+  public void testNoLeaders() {
+    DBI dbi = new DBI(DB_URL);
+    UUID reaperInstanceId = UUID.randomUUID();
+    PostgresStorage storage = new PostgresStorage(reaperInstanceId, dbi);
+    Assertions.assertThat(storage.isStorageConnected()).isTrue();
+
+    Handle handle = dbi.open();
+    handle.execute("DELETE from leader");
+
+    List<UUID> fetchedLeaderIds = storage.getLeaders();
+    Assertions.assertThat(fetchedLeaderIds.size()).isEqualTo(0);
+  }
+
+  @Test
+  public void testRenewLead() throws InterruptedException {
+    DBI dbi = new DBI(DB_URL);
+    UUID reaperInstanceId = UUID.randomUUID();
+    PostgresStorage storage = new PostgresStorage(reaperInstanceId, dbi);
+    Assertions.assertThat(storage.isStorageConnected()).isTrue();
+
+    Handle handle = dbi.open();
+    handle.execute("DELETE from leader");
+
+    UUID leaderId = UUID.randomUUID();
+    int sleepTime = 3;
+
+    final Instant initialTime = Instant.now();
+    storage.takeLead(leaderId);
+
+    // sleep 3 seconds, then renew lead
+    TimeUnit.SECONDS.sleep(sleepTime);
+    Assertions.assertThat(storage.renewLead(leaderId)).isTrue();
+
+    Instant hbTime = handle.createQuery("SELECT last_heartbeat FROM leader")
+        .mapTo(Timestamp.class)
+        .first()
+        .toInstant();
+
+    Duration between = Duration.between(initialTime, hbTime);
+    Assertions.assertThat(between.getSeconds()).isGreaterThanOrEqualTo(sleepTime);
+  }
+
+  @Test
+  public void testReleaseLead() {
+    DBI dbi = new DBI(DB_URL);
+    UUID reaperInstanceId = UUID.randomUUID();
+    PostgresStorage storage = new PostgresStorage(reaperInstanceId, dbi);
+    Assertions.assertThat(storage.isStorageConnected()).isTrue();
+
+    Handle handle = dbi.open();
+    handle.execute("DELETE from leader");
+
+    UUID leaderIdForSelf = UUID.randomUUID();
+    UUID leaderIdForOther = UUID.randomUUID();
+
+    storage.takeLead(leaderIdForSelf);
+    storage.takeLead(leaderIdForOther);
+
+    List<UUID> fetchedLeaderIds = storage.getLeaders();
+    Assertions.assertThat(fetchedLeaderIds.size()).isEqualTo(2);
+
+    handle.createStatement("UPDATE leader SET reaper_instance_id = 0 WHERE leader_id = :id")
+        .bind("id", UuidUtil.toSequenceId(leaderIdForOther))
+        .execute();
+
+    // test that releaseLead succeeds for entry where instance_id = self
+    storage.releaseLead(leaderIdForSelf);
+    fetchedLeaderIds = storage.getLeaders();
+    Assertions.assertThat(fetchedLeaderIds.size()).isEqualTo(1);
+
+    // test that releaseLead fails for entry where instance_id != self
+    storage.releaseLead(leaderIdForOther);
+    fetchedLeaderIds = storage.getLeaders();
+    Assertions.assertThat(fetchedLeaderIds.size()).isEqualTo(1);
+  }
+
+  @Test
+  public void testSaveHeartbeat() {
+    DBI dbi = new DBI(DB_URL);
+    UUID reaperInstanceId = UUID.randomUUID();
+    PostgresStorage storage = new PostgresStorage(reaperInstanceId, dbi);
+    Assertions.assertThat(storage.isStorageConnected()).isTrue();
+
+    Handle handle = dbi.open();
+    handle.execute("DELETE from running_reapers");
+
+    storage.saveHeartbeat();
+    int numReapers = storage.countRunningReapers();
+    Assertions.assertThat(numReapers).isEqualTo(1);
+  }
+
+  @Test
+  public void testNodeMetrics() {
+    DBI dbi = new DBI(DB_URL);
+    UUID reaperInstanceId = UUID.randomUUID();
+    PostgresStorage storage = new PostgresStorage(reaperInstanceId, dbi);
+    Assertions.assertThat(storage.isStorageConnected()).isTrue();
+
+    Handle handle = dbi.open();
+    handle.execute("DELETE from node_metrics_v1");
+
+    UUID runId = UUID.randomUUID();
+
+    // test empty result set
+    ArrayList<NodeMetrics> emptyNmList = (ArrayList<NodeMetrics>) storage.getNodeMetrics(runId);
+    Assertions.assertThat(emptyNmList.size()).isEqualTo(0);
+
+    NodeMetrics originalNm = NodeMetrics.builder()
+        .withNode("fake_node")
.withCluster("fake_cluster") + .withDatacenter("NYDC") + .withHasRepairRunning(true) + .withPendingCompactions(4) + .withActiveAnticompactions(1) + .build(); + + storage.storeNodeMetrics(runId, originalNm); + ArrayList nodeMetricsList = (ArrayList) storage.getNodeMetrics(runId); + Assertions.assertThat(nodeMetricsList.size()).isEqualTo(1); + + NodeMetrics fetchedNm = nodeMetricsList.get(0); + Assertions.assertThat(fetchedNm.getNode()).isEqualTo(originalNm.getNode()); + Assertions.assertThat(fetchedNm.getCluster()).isEqualTo(originalNm.getCluster()); + Assertions.assertThat(fetchedNm.getDatacenter()).isEqualTo(originalNm.getDatacenter()); + Assertions.assertThat(fetchedNm.hasRepairRunning()).isEqualTo(originalNm.hasRepairRunning()); + Assertions.assertThat(fetchedNm.getPendingCompactions()).isEqualTo(originalNm.getPendingCompactions()); + Assertions.assertThat(fetchedNm.getActiveAnticompactions()).isEqualTo(originalNm.getActiveAnticompactions()); + } + + @Test + public void testNodeMetricsByNode() throws InterruptedException { + DBI dbi = new DBI(DB_URL); + UUID reaperInstanceId = UUID.randomUUID(); + PostgresStorage storage = new PostgresStorage(reaperInstanceId, dbi); + Assertions.assertThat(storage.isStorageConnected()).isTrue(); + + Handle handle = dbi.open(); + handle.execute("DELETE from node_metrics_v1"); + + UUID runId = UUID.randomUUID(); + + NodeMetrics nmRequest = NodeMetrics.builder() + .withNode("fake_node1") + .withCluster("fake_cluster") + .withDatacenter("NYDC") + .withRequested(true) + .build(); + + NodeMetrics nm1 = NodeMetrics.builder() + .withNode("fake_node1") + .withCluster("fake_cluster") + .withDatacenter("NYDC") + .withHasRepairRunning(true) + .withPendingCompactions(4) + .withActiveAnticompactions(1) + .build(); + + // store a metric request and a metric response + storage.storeNodeMetrics(runId, nmRequest); + TimeUnit.MILLISECONDS.sleep(100); + storage.storeNodeMetrics(runId, nm1); + + Optional fetchedNm1Opt = storage.getNodeMetrics(runId, "fake_node1"); + Assertions.assertThat(fetchedNm1Opt.isPresent()).isTrue(); + NodeMetrics fetchedNm1 = fetchedNm1Opt.get(); + Assertions.assertThat(fetchedNm1.getNode()).isEqualTo(nm1.getNode()); + Assertions.assertThat(fetchedNm1.getCluster()).isEqualTo(nm1.getCluster()); + Assertions.assertThat(fetchedNm1.getDatacenter()).isEqualTo(nm1.getDatacenter()); + Assertions.assertThat(fetchedNm1.hasRepairRunning()).isEqualTo(nm1.hasRepairRunning()); + Assertions.assertThat(fetchedNm1.getPendingCompactions()).isEqualTo(nm1.getPendingCompactions()); + Assertions.assertThat(fetchedNm1.getActiveAnticompactions()).isEqualTo(nm1.getActiveAnticompactions()); + + // test that fetching a non-existent metric returns Optional.Empty() + Optional fetchedNm2Opt = storage.getNodeMetrics(runId, "fake_node2"); + Assertions.assertThat(fetchedNm2Opt.isPresent()).isFalse(); + } + + @Test + public void testNodeOperations() { + DBI dbi = new DBI(DB_URL); + UUID reaperInstanceId = UUID.randomUUID(); + PostgresStorage storage = new PostgresStorage(reaperInstanceId, dbi); + Assertions.assertThat(storage.isStorageConnected()).isTrue(); + + Handle handle = dbi.open(); + handle.execute("DELETE from node_operations"); + + storage.storeOperations("fake_cluster", OpType.OP_STREAMING, "fake_host", "data1"); + String data = storage.listOperations("fake_cluster", OpType.OP_STREAMING, "fake_host"); + Assertions.assertThat(data.equals("data1")); + storage.storeOperations("fake_cluster", OpType.OP_STREAMING, "fake_host", "data2"); + + data = 
+    data = storage.listOperations("fake_cluster", OpType.OP_STREAMING, "fake_host");
+    Assertions.assertThat(data).isEqualTo("data2");
+  }
+
+  @Test
+  public void testGenericMetricsByHostAndCluster() {
+    DBI dbi = new DBI(DB_URL);
+    UUID reaperInstanceId = UUID.randomUUID();
+    PostgresStorage storage = new PostgresStorage(reaperInstanceId, dbi);
+    Assertions.assertThat(storage.isStorageConnected()).isTrue();
+
+    Handle handle = dbi.open();
+    handle.execute("DELETE from node_metrics_v2");
+    handle.execute("DELETE from node_metrics_v2_source_nodes");
+    handle.execute("DELETE from node_metrics_v2_metric_types");
+
+    DateTime now = DateTime.now();
+    GenericMetric metric1 = GenericMetric.builder()
+        .withClusterName("fake_cluster")
+        .withHost("fake_host1")
+        .withTs(now)
+        .withMetricDomain("org.apache.cassandra.metrics")
+        .withMetricType("ThreadPool")
+        .withMetricName("PendingTasks")
+        .withMetricScope("MutationStage")
+        .withMetricAttribute("fake_attribute")
+        .withValue(12)
+        .build();
+    GenericMetric metric2 = GenericMetric.builder() // different metric, different host
+        .withClusterName("fake_cluster")
+        .withHost("fake_host2")
+        .withTs(now)
+        .withMetricDomain("org.apache.cassandra.metrics")
+        .withMetricType("ThreadPool")
+        .withMetricName("ActiveTasks")
+        .withMetricScope("MutationStage")
+        .withMetricAttribute("fake_attribute")
+        .withValue(14)
+        .build();
+
+    storage.storeMetric(metric1);
+    storage.storeMetric(metric2);
+
+    // verify that the two metrics above can be queried by cluster name
+    Set<String> expectedMetrics = new HashSet<>();
+    expectedMetrics.add("PendingTasks");
+    expectedMetrics.add("ActiveTasks");
+    List<GenericMetric> retrievedMetrics = storage.getMetrics(
+        "fake_cluster",
+        Optional.empty(),
+        "org.apache.cassandra.metrics",
+        "ThreadPool",
+        now.getMillis()
+    );
+    for (GenericMetric retrievedMetric : retrievedMetrics) {
+      Assertions.assertThat(expectedMetrics.contains(retrievedMetric.getMetricName())).isTrue();
+      expectedMetrics.remove(retrievedMetric.getMetricName());
+    }
+
+    // verify that metrics can be queried by host
+    retrievedMetrics = storage.getMetrics(
+        "fake_cluster",
+        Optional.of("fake_host2"),
+        "org.apache.cassandra.metrics",
+        "ThreadPool",
+        now.getMillis()
+    );
+    Assertions.assertThat(retrievedMetrics.size()).isEqualTo(1);
+    GenericMetric retrievedMetric = retrievedMetrics.get(0);
+    Assertions.assertThat(retrievedMetric.getMetricName()).isEqualTo("ActiveTasks");
+  }
+
+  @Test
+  public void testGenericMetricExpiration() {
+    DBI dbi = new DBI(DB_URL);
+    UUID reaperInstanceId = UUID.randomUUID();
+    PostgresStorage storage = new PostgresStorage(reaperInstanceId, dbi);
+    Assertions.assertThat(storage.isStorageConnected()).isTrue();
+
+    Handle handle = dbi.open();
+    handle.execute("DELETE from node_metrics_v2");
+    handle.execute("DELETE from node_metrics_v2_source_nodes");
+    handle.execute("DELETE from node_metrics_v2_metric_types");
+
+    DateTime expirationTime = DateTime.now().minusMinutes(3);
+    GenericMetric expiredMetric = GenericMetric.builder()
+        .withClusterName("fake_cluster")
+        .withHost("fake_host1")
+        .withTs(expirationTime)
+        .withMetricDomain("org.apache.cassandra.metrics")
+        .withMetricType("ThreadPool")
+        .withMetricName("PendingTasks")
+        .withMetricScope("MutationStage")
+        .withMetricAttribute("fake_attribute")
+        .withValue(12)
+        .build();
+    storage.storeMetric(expiredMetric);
+
+    // verify that the metric was stored in the DB
+    List<GenericMetric> retrievedMetrics = storage.getMetrics(
+        "fake_cluster",
+        Optional.empty(),
+        "org.apache.cassandra.metrics",
+        "ThreadPool",
+        expirationTime.getMillis()
+    );
+    Assertions.assertThat(retrievedMetrics.size()).isEqualTo(1);
+    List<Map<String, Object>> rs = handle.select("SELECT COUNT(*) AS count FROM node_metrics_v2_source_nodes");
+    long numSourceNodes = (long) rs.get(0).get("count");
+    Assertions.assertThat(numSourceNodes).isEqualTo(1);
+
+    // verify that on purgeMetrics(), the metric is purged since it's older than 3 minutes
+    storage.purgeMetrics();
+    retrievedMetrics = storage.getMetrics(
+        "fake_cluster",
+        Optional.empty(),
+        "org.apache.cassandra.metrics",
+        "ThreadPool",
+        expirationTime.getMillis()
+    );
+    Assertions.assertThat(retrievedMetrics.size()).isEqualTo(0);
+
+    // verify that source nodes have also been purged
+    rs = handle.select("SELECT COUNT(*) AS count FROM node_metrics_v2_source_nodes");
+    numSourceNodes = (long) rs.get(0).get("count");
+    Assertions.assertThat(numSourceNodes).isEqualTo(0);
+  }
+
+  /*
+   The following tests rely on timeouts and will take a few minutes to complete
+  */
+  @Test
+  public void testUpdateLeaderEntry() throws InterruptedException {
+    System.out.println("Testing leader timeout (this will take a minute)...");
+    DBI dbi = new DBI(DB_URL);
+    UUID reaperInstanceId = UUID.randomUUID();
+    PostgresStorage storage = new PostgresStorage(reaperInstanceId, dbi, 1, 1, 1, 1);
+    Assertions.assertThat(storage.isStorageConnected()).isTrue();
+
+    Handle handle = dbi.open();
+    handle.execute("DELETE from leader");
+
+    UUID leaderId = UUID.randomUUID();
+
+    storage.takeLead(leaderId);
+    List<UUID> fetchedLeaderIds = storage.getLeaders();
+    Assertions.assertThat(fetchedLeaderIds.size()).isEqualTo(1);
+
+    boolean result = storage.takeLead(leaderId); // should fail because the entry already exists
+    Assertions.assertThat(result).isFalse();
+
+    int rowsUpdated = handle.createStatement(IStoragePostgreSql.SQL_UPDATE_LEAD)
+        .bind("reaperInstanceId", UuidUtil.toSequenceId(reaperInstanceId))
+        .bind("reaperInstanceHost", AppContext.REAPER_INSTANCE_ADDRESS)
+        .bind("leaderId", UuidUtil.toSequenceId(leaderId))
+        .bind("expirationTime", Instant.now().minus(Duration.ofSeconds(60)))
+        .execute();
+
+    Assertions.assertThat(rowsUpdated).isEqualTo(0); // should not update because the original entry hasn't expired yet
+
+    TimeUnit.SECONDS.sleep(60);
+
+    rowsUpdated = handle.createStatement(IStoragePostgreSql.SQL_UPDATE_LEAD)
+        .bind("reaperInstanceId", UuidUtil.toSequenceId(reaperInstanceId))
+        .bind("reaperInstanceHost", AppContext.REAPER_INSTANCE_ADDRESS)
+        .bind("leaderId", UuidUtil.toSequenceId(leaderId))
+        .bind("expirationTime", Instant.now().minus(Duration.ofSeconds(60)))
+        .execute();
+
+    Assertions.assertThat(rowsUpdated).isEqualTo(1); // should update because the original entry has expired
+  }
+
+  @Test
+  public void testDeleteOldNodeMetrics() throws InterruptedException {
+    System.out.println("Testing metrics timeout (this will take a minute)...");
+    DBI dbi = new DBI(DB_URL);
+    UUID reaperInstanceId = UUID.randomUUID();
+    PostgresStorage storage = new PostgresStorage(reaperInstanceId, dbi, 1, 1, 1, 1);
+    Assertions.assertThat(storage.isStorageConnected()).isTrue();
+
+    Handle handle = dbi.open();
+    handle.execute("DELETE from node_metrics_v1");
+
+    UUID runId = UUID.randomUUID();
+    NodeMetrics originalNm = NodeMetrics.builder()
+        .withNode("fake_node")
+        .withCluster("fake_cluster")
+        .withDatacenter("NYDC")
+        .withHasRepairRunning(true)
+        .withPendingCompactions(4)
+        .withActiveAnticompactions(1)
+        .build();
+    storage.storeNodeMetrics(runId, originalNm);
+
+    // first delete attempt shouldn't do anything because the entry hasn't passed its expiration time
+    storage.purgeNodeMetrics();
+    int numMetrics = handle.createQuery("SELECT COUNT(*) FROM node_metrics_v1")
+        .mapTo(Integer.class)
+        .first();
+    Assertions.assertThat(numMetrics).isEqualTo(1);
+
+    TimeUnit.SECONDS.sleep(61);
+
+    // second delete attempt should work because the entry has passed its expiration time
+    storage.purgeNodeMetrics();
+    numMetrics = handle.createQuery("SELECT COUNT(*) FROM node_metrics_v1")
+        .mapTo(Integer.class)
+        .first();
+    Assertions.assertThat(numMetrics).isEqualTo(0);
+  }
+
+  @Test
+  public void testManualDeleteNodeMetrics() {
+    DBI dbi = new DBI(DB_URL);
+    UUID reaperInstanceId = UUID.randomUUID();
+    PostgresStorage storage = new PostgresStorage(reaperInstanceId, dbi);
+    Assertions.assertThat(storage.isStorageConnected()).isTrue();
+
+    Handle handle = dbi.open();
+    handle.execute("DELETE from node_metrics_v1");
+
+    UUID runId = UUID.randomUUID();
+    NodeMetrics nm1 = NodeMetrics.builder()
+        .withNode("fake_node_1")
+        .withCluster("fake_cluster")
+        .withDatacenter("NYDC")
+        .withHasRepairRunning(true)
+        .withPendingCompactions(4)
+        .withActiveAnticompactions(1)
+        .build();
+    NodeMetrics nm2 = NodeMetrics.builder()
+        .withNode("fake_node_2")
+        .withCluster("fake_cluster")
+        .withDatacenter("NYDC")
+        .withHasRepairRunning(true)
+        .withPendingCompactions(4)
+        .withActiveAnticompactions(1)
+        .build();
+    storage.storeNodeMetrics(runId, nm1);
+    storage.storeNodeMetrics(runId, nm2);
+
+    int numMetrics = handle.createQuery("SELECT COUNT(*) FROM node_metrics_v1")
+        .mapTo(Integer.class)
+        .first();
+    Assertions.assertThat(numMetrics).isEqualTo(2);
+
+    // delete metrics from table for fake_node_1 and verify delete succeeds
+    storage.deleteNodeMetrics(runId, "fake_node_1");
+    numMetrics = handle.createQuery("SELECT COUNT(*) FROM node_metrics_v1")
+        .mapTo(Integer.class)
+        .first();
+    Assertions.assertThat(numMetrics).isEqualTo(1);
+  }
+}
diff --git a/src/server/src/test/resources/db/postgres/V17_0_0__multi_instance.sql b/src/server/src/test/resources/db/postgres/V17_0_0__multi_instance.sql
new file mode 100644
index 000000000..2e07d6eff
--- /dev/null
+++ b/src/server/src/test/resources/db/postgres/V17_0_0__multi_instance.sql
@@ -0,0 +1,66 @@
+-- H2-compatible version of multi-instance reaper Postgres DDL
+-- CHANGES:
+--   TEXT --> VARCHAR(255) because H2 doesn't support index on TEXT
+
+CREATE TABLE IF NOT EXISTS "leader" (
+  "leader_id" BIGINT PRIMARY KEY,
+  "reaper_instance_id" BIGINT,
+  "reaper_instance_host" TEXT,
+  "last_heartbeat" TIMESTAMP WITH TIME ZONE
+);
+
+CREATE TABLE IF NOT EXISTS "running_reapers" (
+  "reaper_instance_id" BIGINT PRIMARY KEY,
+  "reaper_instance_host" TEXT,
+  "last_heartbeat" TIMESTAMP WITH TIME ZONE
+);
+
+CREATE TABLE IF NOT EXISTS "node_metrics_v1" (
+  "run_id" BIGINT,
+  "ts" TIMESTAMP WITH TIME ZONE,
+  "node" VARCHAR(255),
+  "cluster" TEXT,
+  "datacenter" TEXT,
+  "requested" BOOLEAN,
+  "pending_compactions" INT,
+  "has_repair_running" BOOLEAN,
+  "active_anticompactions" INT,
+  PRIMARY KEY("run_id", "ts", "node")
+);
+
+--- Sidecar mode
+
+CREATE TABLE IF NOT EXISTS "node_metrics_v2_source_nodes" (
+  "source_node_id" SERIAL UNIQUE,
+  "cluster" VARCHAR(255),
+  "host" VARCHAR(255),
+  "last_updated" TIMESTAMP WITH TIME ZONE,
+  PRIMARY KEY ("cluster", "host")
+);
+
+CREATE TABLE IF NOT EXISTS "node_metrics_v2_metric_types" (
+  "metric_type_id" SERIAL UNIQUE,
+  "metric_domain" VARCHAR(255),
+  "metric_type" VARCHAR(255),
+  "metric_scope" VARCHAR(255),
+  "metric_name" VARCHAR(255),
+  "metric_attribute" VARCHAR(255),
"metric_type", "metric_scope", "metric_name", "metric_attribute") +); + +CREATE TABLE IF NOT EXISTS "node_metrics_v2" ( + "metric_type_id" INT REFERENCES "node_metrics_v2_metric_types"("metric_type_id"), + "source_node_id" INT REFERENCES "node_metrics_v2_source_nodes"("source_node_id"), + "ts" TIMESTAMP WITH TIME ZONE, + "value" DOUBLE PRECISION, + PRIMARY KEY ("metric_type_id", "source_node_id", "ts") +); + +CREATE TABLE IF NOT EXISTS "node_operations" ( + "cluster" VARCHAR(255), + "type" VARCHAR(255), + "host" VARCHAR(255), + "ts" TIMESTAMP WITH TIME ZONE, + "data" TEXT, + PRIMARY KEY ("cluster", "type", "host", "ts") +); diff --git a/src/server/src/test/resources/reaper-postgres-sidecar1-at.yaml b/src/server/src/test/resources/reaper-postgres-sidecar1-at.yaml new file mode 100644 index 000000000..93587a558 --- /dev/null +++ b/src/server/src/test/resources/reaper-postgres-sidecar1-at.yaml @@ -0,0 +1,70 @@ +# Copyright 2017-2017 Spotify AB +# Copyright 2017-2019 The Last Pickle Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Cassandra Reaper Configuration for Acceptance Tests. +# +segmentCountPerNode: 16 +repairParallelism: SEQUENTIAL +repairIntensity: 0.95 +scheduleDaysBetween: 7 +repairRunThreadCount: 15 +hangingRepairTimeoutMins: 1 +storageType: postgres +incrementalRepair: false +blacklistTwcsTables: true +jmxConnectionTimeoutInSeconds: 300 +activateQueryLogger: true +datacenterAvailability: SIDECAR +enforcedLocalNode: 127.0.0.1 +enforcedLocalClusterName: test +enforcedLocalDatacenter: dc1 +enableConcurrentMigrations: false + +logging: + level: WARN + appenders: + - type: console + +server: + type: default + applicationConnectors: + - type: http + port: 8083 + bindHost: 127.0.0.1 + adminConnectors: + - type: http + port: 8084 + bindHost: 127.0.0.1 + +jmxPorts: + 127.0.0.1: 7100 + +jmxCredentials: + test: + username: cassandra + password: cassandrapassword + +# database section will be ignored if storageType is set to "memory" +postgres: + driverClass: org.postgresql.Driver + user: postgres + password: + url: jdbc:postgresql://127.0.0.1/reaper + +metrics: + frequency: 1 second + reporters: + - type: csv + file: target/dropwizard-metrics diff --git a/src/server/src/test/resources/reaper-postgres-sidecar2-at.yaml b/src/server/src/test/resources/reaper-postgres-sidecar2-at.yaml new file mode 100644 index 000000000..242b74fe3 --- /dev/null +++ b/src/server/src/test/resources/reaper-postgres-sidecar2-at.yaml @@ -0,0 +1,70 @@ +# Copyright 2017-2017 Spotify AB +# Copyright 2017-2019 The Last Pickle Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Cassandra Reaper Configuration for Acceptance Tests.
+#
+segmentCountPerNode: 16
+repairParallelism: SEQUENTIAL
+repairIntensity: 0.95
+scheduleDaysBetween: 7
+repairRunThreadCount: 15
+hangingRepairTimeoutMins: 1
+storageType: postgres
+incrementalRepair: false
+blacklistTwcsTables: true
+jmxConnectionTimeoutInSeconds: 300
+activateQueryLogger: true
+datacenterAvailability: SIDECAR
+enforcedLocalNode: 127.0.0.2
+enforcedLocalClusterName: test
+enforcedLocalDatacenter: dc1
+enableConcurrentMigrations: false
+
+logging:
+  level: WARN
+  appenders:
+    - type: console
+
+server:
+  type: default
+  applicationConnectors:
+    - type: http
+      port: 8083
+      bindHost: 127.0.0.1
+  adminConnectors:
+    - type: http
+      port: 8084
+      bindHost: 127.0.0.1
+
+jmxPorts:
+  127.0.0.2: 7200
+
+jmxCredentials:
+  test:
+    username: cassandra
+    password: cassandrapassword
+
+# database section will be ignored if storageType is set to "memory"
+postgres:
+  driverClass: org.postgresql.Driver
+  user: postgres
+  password:
+  url: jdbc:postgresql://127.0.0.1/reaper
+
+metrics:
+  frequency: 1 second
+  reporters:
+    - type: csv
+      file: target/dropwizard-metrics
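
Note on the mapper wiring: NodeMetricsMapper above implements JDBI v2's ResultSetMapper, so it plugs directly into a handle's query pipeline. Below is a minimal sketch of that mechanics against the node_metrics_v1 table from the V17_0_0 migration; the class name, method name, and long-typed run id are illustrative assumptions, not code from this change (PostgresStorage itself likely drives the equivalent query through the IStoragePostgreSql SQL constants).

import java.util.List;

import io.cassandrareaper.core.NodeMetrics;
import io.cassandrareaper.storage.postgresql.NodeMetricsMapper;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;

public final class NodeMetricsQuerySketch {

  // Fetch all node metrics rows for one repair run and map each row to a NodeMetrics value.
  public static List<NodeMetrics> fetchForRun(DBI dbi, long runId) {
    Handle handle = dbi.open();
    try {
      return handle.createQuery("SELECT * FROM node_metrics_v1 WHERE run_id = :runId")
          .bind("runId", runId)
          .map(new NodeMetricsMapper()) // ResultSetMapper turns each ResultSet row into a NodeMetrics
          .list();
    } finally {
      handle.close();
    }
  }
}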
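
The leader table added in V17_0_0 backs the takeLead/renewLead/releaseLead calls that PostgresStorageTest exercises, giving Postgres the same distributed-locking contract that CassandraStorage already provides. A sketch of how a caller might guard an exclusive unit of work with those calls; the helper class and the periodic-renewal comment are illustrative assumptions, not the actual call sites in this change.

import java.util.UUID;

import io.cassandrareaper.storage.PostgresStorage;

public final class LeaderElectionSketch {

  // Run work only if this reaper instance can take the lead on the given resource.
  public static void runExclusively(PostgresStorage storage, UUID resourceId, Runnable work) {
    if (!storage.takeLead(resourceId)) {
      return; // another reaper instance currently holds the lead for this resource
    }
    try {
      // long-running work should call renewLead periodically so last_heartbeat
      // stays fresh and the entry isn't reclaimed as expired by another instance
      work.run();
      storage.renewLead(resourceId);
    } finally {
      storage.releaseLead(resourceId);
    }
  }
}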
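
testNodeMetricsByNode encodes a request/response handshake on node_metrics_v1: a coordinating instance stores a row with requested = true, and the instance that can actually reach the node answers by storing a fully populated row for the same run and node. A sketch of the requesting side, under the method signatures the test exercises; the run id, node address, and cluster names are hypothetical.

import java.util.Optional;
import java.util.UUID;

import io.cassandrareaper.core.NodeMetrics;
import io.cassandrareaper.storage.PostgresStorage;

public final class NodeMetricsRequestSketch {

  public static Optional<NodeMetrics> requestMetrics(PostgresStorage storage, UUID runId) {
    // publish a request row; the reaper colocated with the node fills in the real values
    storage.storeNodeMetrics(runId, NodeMetrics.builder()
        .withNode("10.0.0.1")
        .withCluster("test_cluster")
        .withDatacenter("dc1")
        .withRequested(true)
        .build());

    // poll later; the Optional is present once the owning instance has answered
    return storage.getNodeMetrics(runId, "10.0.0.1");
  }
}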
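
The V18_0_0 sidecar schema normalizes metrics into two SERIAL-keyed dimension tables (source nodes and metric types) so that node_metrics_v2 itself stores only compact (type id, node id, ts, value) rows; testGenericMetricExpiration then checks that purgeMetrics() drops expired rows along with their now-orphaned source-node entries. A sketch of the write/read path over that schema, based on the signatures the tests exercise; all literal values are hypothetical.

import java.util.List;
import java.util.Optional;

import io.cassandrareaper.core.GenericMetric;
import io.cassandrareaper.storage.PostgresStorage;
import org.joda.time.DateTime;

public final class GenericMetricsSketch {

  public static List<GenericMetric> storeAndQuery(PostgresStorage storage) {
    DateTime now = DateTime.now();

    // inserting a metric upserts its source node and metric type dimension rows as a side effect
    storage.storeMetric(GenericMetric.builder()
        .withClusterName("test_cluster")
        .withHost("10.0.0.1")
        .withTs(now)
        .withMetricDomain("org.apache.cassandra.metrics")
        .withMetricType("ThreadPool")
        .withMetricScope("MutationStage")
        .withMetricName("PendingTasks")
        .withMetricAttribute("Value")
        .withValue(0)
        .build());

    // fetch every metric of this domain/type for the cluster since the given timestamp;
    // pass Optional.of(host) instead of Optional.empty() to restrict the query to one node
    return storage.getMetrics(
        "test_cluster",
        Optional.empty(),
        "org.apache.cassandra.metrics",
        "ThreadPool",
        now.getMillis());
  }
}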