Cache the topology and the endpoint mappings to speed up cstar on subsequent calls

nodetool status is now used to check the state of each node, but the topology is not recomputed unless the list of hosts changes or the schema gets modified (replication factors could have been changed).
Make FQDN resolution optional.
Remove nodetool ring calls and drop the tokens from the Host objects.
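
As a rough sketch of the cache-key scheme described above (illustrative only; it mirrors what get_cache_file_path and maybe_get_data_from_cache do in cstar/job.py, and the helper names here are hypothetical):

    import os
    import pickle

    def cache_file_path(cache_dir, cache_type, schema_versions, topology_hashes):
        # The key combines the schema versions and a hash of the nodetool status
        # topology, so cached data is ignored whenever the schema changes or the
        # set of hosts changes.
        key = "{}-{}-{}".format(cache_type,
                                "-".join(sorted(schema_versions)),
                                "-".join(sorted(topology_hashes)))
        return os.path.join(cache_dir, key)

    def load_cached(path):
        # Return the previously pickled object if a cache entry exists, else None.
        if os.path.exists(path):
            with open(path, 'rb') as f:
                return pickle.load(f)
        return None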
adejanovski committed Apr 30, 2020
1 parent 5967ff8 commit 3a8dd81
Showing 27 changed files with 314 additions and 3,358 deletions.
2 changes: 2 additions & 0 deletions cstar/args.py
@@ -64,6 +64,8 @@ def _add_strategy_arguments(parser):
help='Only run in hosts belonging to the specified data center')
parser.add_argument('--key-space', '--keyspace',
help='The keyspace to use for endpoint mapping calculation. Uses all keyspaces by default.')
parser.add_argument('-r', '--resolve-hostnames', dest='resolve_hostnames', action="store_true", default=False,
help='Resolve hostnames of cluster nodes.')

parser.add_argument('--one', dest='strategy_one', action="store_true", default=False, required=False,
help='Run on one node of one data center at the time and for each cluster')
3 changes: 2 additions & 1 deletion cstar/cstarcli.py
@@ -128,7 +128,8 @@ def execute_command(args):
ssh_identity_file=args.ssh_identity_file,
ssh_lib=args.ssh_lib,
jmx_username=args.jmx_username,
jmx_password=args.jmx_password)
jmx_password=args.jmx_password,
resolve_hostnames=args.resolve_hostnames)
job.run()

def validate_uuid4(uuid_string):
3 changes: 2 additions & 1 deletion cstar/cstarparcli.py
@@ -99,7 +99,8 @@ def main():
ssh_identity_file = namespace.ssh_identity_file,
ssh_lib=namespace.ssh_lib,
jmx_username=namespace.jmx_username,
jmx_password=namespace.jmx_password)
jmx_password=namespace.jmx_password,
resolve_hostnames=namespace.resolve_hostnames)
job.run()


85 changes: 67 additions & 18 deletions cstar/job.py
@@ -12,16 +12,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import queue
import time
import socket
import os
import threading
import functools
import pickle
import sys

import cstar.remote
import cstar.endpoint_mapping
import cstar.topology
from cstar.topology import Topology
import cstar.nodetoolparser
import cstar.state
import cstar.strategy
@@ -30,7 +34,7 @@
import cstar.jobwriter
from cstar.exceptions import BadSSHHost, NoHostsSpecified, HostIsDown, \
NoDefaultKeyspace, UnknownHost, FailedExecution
from cstar.output import msg, debug, emph, info, error
from cstar.output import msg, debug, emph, info, error, warn

MAX_ATTEMPTS = 3

@@ -70,6 +74,10 @@ def __init__(self):
self.jmx_username = None
self.jmx_password = None
self.returned_jobs = list()
self.schema_versions = list()
self.status_topology_hash = list()
self.resolve_hostnames = False


def __enter__(self):
return self
@@ -85,26 +93,54 @@ def __exit__(self, exc_type, exc_value, exc_traceback):


def get_cluster_topology(self, seed_nodes):
cluster_names = list()
self.schema_versions = list()
self.status_topology_hash = list()
topologies = list()
count = 0
tried_hosts = []
for host in seed_nodes:
tried_hosts.append(host)
conn = self._connection(host)

describe_res = self.run_nodetool(conn, "describecluster")
topology_res = self.run_nodetool(conn, "ring")

if (describe_res.status == 0) and (topology_res.status == 0):
cluster_name = cstar.nodetoolparser.parse_describe_cluster(describe_res.out)
topology = cstar.nodetoolparser.parse_nodetool_ring(topology_res.out, cluster_name, self.reverse_dns_preheat)
return topology
status_res = self.run_nodetool(conn, "status")
if (describe_res.status == 0) and (status_res.status == 0):
(cluster_name, schema_version) = cstar.nodetoolparser.parse_describe_cluster(describe_res.out)
if cluster_name not in cluster_names:
cluster_names.append(cluster_name)
topologies.append(cstar.nodetoolparser.parse_nodetool_status(status_res.out, cluster_name, self.reverse_dns_preheat, self.resolve_hostnames))
self.schema_versions.append(schema_version)
self.status_topology_hash.append(topologies[len(topologies) - 1].get_hash())


count += 1
if count >= MAX_ATTEMPTS:
break
if len(topologies) > 0:
final_topology = set()
for i in range(len(topologies)):
final_topology.update(topologies[i].hosts)
return Topology(final_topology)
raise HostIsDown("Could not find any working host while fetching topology. Is Cassandra actually running? Tried the following hosts:",
", ".join([x.ip for x in tried_hosts]))

def get_cache_file_path(self, cache_type):
debug("Cache file: {}-{}-{}".format(cache_type, "-".join(sorted(self.schema_versions)), "-".join(sorted(self.status_topology_hash))))
return os.path.join(self.cache_directory, "{}-{}-{}".format(cache_type, "-".join(sorted(self.schema_versions)), "-".join(sorted(self.status_topology_hash))))

def maybe_get_data_from_cache(self, cache_type):
try:
cache_file = self.get_cache_file_path(cache_type)
if os.path.exists(cache_file):
debug("Getting {} from cache".format(cache_type))
cached_data = pickle.load(open(cache_file, 'rb'))
return cached_data
except Exception:
warn("Failed getting data from cache : {}".format(sys.exc_info()[2]))
debug("Cache miss for {}".format(cache_type))
return None

def reverse_dns_preheat(self, ips):
if self.is_preheated:
return
@@ -138,14 +174,18 @@ def get_endpoint_mapping(self, topology):
clusters = []
failed_hosts = []
mappings = []
count = 0

endpoint_mappings = self.maybe_get_data_from_cache("endpoint_mapping")
if endpoint_mappings is not None:
return endpoint_mappings

for host in topology.get_up():
if host.cluster in clusters:
# We need to fetch keyspaces on one node per cluster, no more.
continue

count = 0
clusters.append(host.cluster)
count = 0
conn = self._connection(host)

if self.key_space:
@@ -154,7 +194,7 @@
keyspaces = self.get_keyspaces(conn)
has_error = True
for keyspace in keyspaces:
if not keyspace in ['system', 'system_schema']:
if not keyspace.startswith("system"):
debug("Fetching endpoint mapping for keyspace", keyspace)
res = self.run_nodetool(conn, *("describering", keyspace))
has_error = False
@@ -166,15 +206,21 @@
range_mapping = cstar.nodetoolparser.convert_describering_to_range_mapping(describering)
mappings.append(cstar.endpoint_mapping.parse(range_mapping, topology, lookup=ip_lookup))

if count >= MAX_ATTEMPTS:
failed_hosts += host
break
if has_error:
if count >= MAX_ATTEMPTS:
failed_hosts += host
break
else:
clusters.append(host.cluster)

count += 1

if failed_hosts:
raise HostIsDown("Following hosts couldn't be reached: {}".format(', '.join(host.fqdn for host in failed_hosts)))

return cstar.endpoint_mapping.merge(mappings)

endpoint_mappings = cstar.endpoint_mapping.merge(mappings)
pickle.dump(dict(endpoint_mappings), open(self.get_cache_file_path("endpoint_mapping"), 'wb'))
return endpoint_mappings

def run_nodetool(self, conn, *cmds):
if self.jmx_username and self.jmx_password:
@@ -187,7 +233,7 @@ def setup(self, hosts, seeds, command, job_id, strategy, cluster_parallel, dc_pa
ignore_down_nodes, dc_filter,
sleep_on_new_runner, sleep_after_done,
ssh_username, ssh_password, ssh_identity_file, ssh_lib,
jmx_username, jmx_password):
jmx_username, jmx_password, resolve_hostnames):

msg("Starting setup")

@@ -202,6 +248,7 @@ def setup(self, hosts, seeds, command, job_id, strategy, cluster_parallel, dc_pa
self.job_runner = job_runner
self.key_space = key_space
self.output_directory = output_directory or os.path.expanduser("~/.cstar/jobs/" + job_id)
self.cache_directory = os.path.expanduser("~/.cstar/cache")
self.sleep_on_new_runner = sleep_on_new_runner
self.sleep_after_done = sleep_after_done
self.ssh_username = ssh_username
@@ -210,14 +257,16 @@ def setup(self, hosts, seeds, command, job_id, strategy, cluster_parallel, dc_pa
self.ssh_lib = ssh_lib
self.jmx_username = jmx_username
self.jmx_password = jmx_password
self.resolve_hostnames = resolve_hostnames
if not os.path.exists(self.output_directory):
os.makedirs(self.output_directory)
if not os.path.exists(self.cache_directory):
os.makedirs(self.cache_directory)

msg("Loading cluster topology")
if seeds:
current_topology = cstar.topology.Topology([])
for seed in seeds:
current_topology = current_topology | self.get_cluster_topology((seed,))
current_topology = current_topology | self.get_cluster_topology(seeds)
original_topology = current_topology
if dc_filter:
original_topology = original_topology.with_dc_filter(dc_filter)
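Note on the topology handling above: get_cluster_topology now merges the per-seed topologies into a single Topology via a set union of their hosts, rather than returning the first successful nodetool ring parse. A minimal sketch of that merge, assuming the Topology API shown in this diff (a hosts attribute and a constructor taking an iterable of Host objects):

    from cstar.topology import Topology

    def merge_topologies(topologies):
        # Union the hosts reported by every responsive seed so the job sees one
        # consolidated view of the cluster.
        merged_hosts = set()
        for topology in topologies:
            merged_hosts.update(topology.hosts)
        return Topology(merged_hosts)
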
21 changes: 19 additions & 2 deletions cstar/jobprinter.py
@@ -34,6 +34,15 @@ def get_status(host):
if host in down:
return ":"
return '.'

def get_ordered_status(host):
if host in progress.done:
return 10
if host in progress.running:
return 100
if host in progress.failed:
return 50
return 1000

lines = [" + Done, up * Executing, up ! Failed, up . Waiting, up",
" - Done, down / Executing, down X Failed, down : Waiting, down"]
@@ -48,10 +57,18 @@ def get_status(host):
if len(dcs):
lines.append("DC: " + dc)
dc_topology = cluster_topology.with_dc(cluster, dc)
hosts = sorted(dc_topology, key=lambda x: x.token)
hosts = sorted(dc_topology, key=lambda x: (get_ordered_status(x), x.rack, x.ip))
status = "".join([get_status(host) for host in hosts])
if len(status) >= 6:
status = status[0:len(status):3] + "\n" + status[1:len(status):3] + "\n" + status[2:len(status):3]
splitStatus = list(chunks(status, 3))
status = splitStatus[0] + "\n" + splitStatus[1] + "\n" + splitStatus[2]
lines.append(status)
lines.append("%d done, %d failed, %d executing" % (len(progress.done), len(progress.failed), len(progress.running)))
printer("\n".join(lines))

def chunks(l, n):
"""Yield n number of sequential chunks from l."""
d, r = divmod(len(l), n)
for i in range(n):
si = (d+1)*(i if i < r else r) + d*(0 if i < r else i - r)
yield l[si:si+(d+1 if i < r else d)]
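
A quick usage illustration of the new chunks helper (the values are made up; it shows how the per-host status string is now split into three sequential display rows instead of the old interleaved slicing):

    from cstar.jobprinter import chunks

    status = "+*!..:-/X"              # nine per-host status glyphs for one DC
    rows = list(chunks(status, 3))
    # rows == ['+*!', '..:', '-/X']   # printed as three lines of the DC block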
1 change: 1 addition & 0 deletions cstar/jobreader.py
@@ -72,6 +72,7 @@ def _parse(input, file, output_directory, job, job_id, stop_after, max_days, end
dc_parallel = state['dc_parallel']
dc_filter = state['dc_filter'] if 'dc_filter' in state else None
max_concurrency = state['max_concurrency']
resolve_hostnames = state['resolve_hostnames'] if 'resolve_hostnames' in state.keys() else False

progress = cstar.progress.Progress(
running=[cstar.topology.Host(*arr) for arr in state['progress']['running']],
3 changes: 2 additions & 1 deletion cstar/nodetoolparser/__init__.py
@@ -1,2 +1,3 @@
from .describering import parse as parse_nodetool_describering, convert_describering_to_range_mapping
from .simple import parse_describe_cluster, parse_nodetool_ring, extract_keyspaces_from_cfstats
from .simple import parse_describe_cluster, extract_keyspaces_from_cfstats
from .status import parse_nodetool_status
40 changes: 2 additions & 38 deletions cstar/nodetoolparser/simple.py
@@ -18,47 +18,11 @@
from cstar.topology import Topology, Host

_cluster_name_re = re.compile(r"^\s*Name:\s*(.*)$", re.MULTILINE)
_ip_re = re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$")
_token_re = re.compile(r"^\-?\d+$")
_status_re = re.compile(r"^[A-Za-z]+$")
_state_re = re.compile(r"^[A-Za-z]+$")
_schema_version_re = re.compile(r"([0-9A-Fa-f]{8}(?:-[0-9A-Fa-f]{4}){3}-[0-9A-Fa-f]{12}): ", re.MULTILINE)
_keyspace_name_re = re.compile(r"^\s*Keyspace\s*:\s*(.*)$", re.MULTILINE)

def parse_describe_cluster(text):
return _cluster_name_re.search(text).group(1)


def _parse_node(line):
words = line.split()
if len(words) == 8 and re.match(_ip_re, words[0]) and re.match(_status_re, words[2]) and re.match(_state_re, words[3]) and re.match(_token_re, words[7]):
return words
else:
return None


def _parse_datacenter_name_and_nodes(datacenter_section):
lines = datacenter_section.split("\n")
name = lines[0]
nodes = [_parse_node(line) for line in lines[1:]]
return (name, [node for node in nodes if node is not None])


def parse_nodetool_ring(text, cluster_name, reverse_dns_preheat):
topology = []
datacenter_sections = text.split("Datacenter: ")[1:]
datacenter_names_and_nodes = [_parse_datacenter_name_and_nodes(section) for section in datacenter_sections]
reverse_dns_preheat([node[0] for (_, nodes) in datacenter_names_and_nodes for node in nodes])
for (datacenterName, nodes) in datacenter_names_and_nodes:
for node in nodes:
fqdn = node[0]
try:
fqdn=socket.gethostbyaddr(node[0])[0]
except socket.herror:
pass
topology.append(Host(fqdn=fqdn, ip=node[0], dc=datacenterName, cluster=cluster_name,
is_up=(node[2] == "Up" and node[3] == "Normal"), token=int(node[7])))

return Topology(topology)
return (_cluster_name_re.search(text).group(1), _schema_version_re.search(text).group(1))


def extract_keyspaces_from_cfstats(text):
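parse_describe_cluster now returns a (cluster_name, schema_version) tuple instead of just the name. A small usage sketch against hypothetical nodetool describecluster output (the sample text is illustrative, not captured from a real cluster):

    from cstar.nodetoolparser import parse_describe_cluster

    sample = """Cluster Information:
        Name: Test Cluster
        Snitch: org.apache.cassandra.locator.SimpleSnitch
        Schema versions:
            86afa796-d883-3932-aa73-6b017cef0d19: [10.0.0.1, 10.0.0.2]
    """
    name, schema_version = parse_describe_cluster(sample)
    # name == 'Test Cluster'
    # schema_version == '86afa796-d883-3932-aa73-6b017cef0d19'
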
67 changes: 67 additions & 0 deletions cstar/nodetoolparser/status.py
@@ -0,0 +1,67 @@
# Copyright 2017 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import socket
import re

from cstar.topology import Topology, Host

_state_re = re.compile(r"^[A-Za-z]{2}$")
_ip_re = re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$")
_load_re = re.compile(r"^(([0-9]+([,.][0-9]+)?)(\s+)([a-zA-Z]{1,2}))$")
_tokens_re = re.compile(r"^\d+$")
_owns_re = re.compile(r"^\d+\.\d+\%$")
_host_id_re = re.compile(r"^[0-9A-Fa-f]{8}(?:-[0-9A-Fa-f]{4}){3}-[0-9A-Fa-f]{12}$")
_rack_re = re.compile(r"^\w+$")
_keyspace_name_re = re.compile(r"^\s*Keyspace\s*:\s*(.*)$", re.MULTILINE)


def _parse_node(line):
words = line.split()
if len(words) == 8 \
and re.match(_state_re, words[0]) \
and re.match(_ip_re, words[1]) \
and re.match(_tokens_re, words[4]) \
and re.match(_host_id_re, words[6]) \
and re.match(_rack_re, words[7]):
return words
else:
return None


def _parse_datacenter_name_and_nodes(datacenter_section):
lines = datacenter_section.split("\n")
name = lines[0]
nodes = [_parse_node(line) for line in lines[1:]]
return (name, [node for node in nodes if node is not None])


def parse_nodetool_status(text, cluster_name, reverse_dns_preheat, resolve_hostnames=False):
topology = []
datacenter_sections = text.split("Datacenter: ")[1:]
datacenter_names_and_nodes = [_parse_datacenter_name_and_nodes(section) for section in datacenter_sections]
if resolve_hostnames:
reverse_dns_preheat([node[1] for (_, nodes) in datacenter_names_and_nodes for node in nodes])
for (datacenter_name, nodes) in datacenter_names_and_nodes:
for node in nodes:
fqdn = node[1]
if resolve_hostnames:
try:
fqdn=socket.gethostbyaddr(node[1])[0]
except socket.herror:
pass
topology.append(Host(fqdn=fqdn, ip=node[1], dc=datacenter_name, cluster=cluster_name,
is_up=(node[0] == "UN"), rack=node[7], host_id=node[6]))

return Topology(topology)
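
For context, a usage sketch of parse_nodetool_status with hypothetical nodetool status output (the sample rows below are made up, but follow the eight-column layout the parser expects: state, address, two-word load, tokens, owns, host ID, rack):

    from cstar.nodetoolparser import parse_nodetool_status

    sample = """Datacenter: dc1
    ===============
    Status=Up/Down
    |/ State=Normal/Leaving/Joining/Moving
    --  Address   Load       Tokens  Owns   Host ID                               Rack
    UN  10.0.0.1  103.2 KiB  256     33.3%  e3b1c480-6f3f-11ea-bc55-0242ac130003  rack1
    UN  10.0.0.2  98.6 KiB   256     33.3%  f6a7d2a0-6f3f-11ea-bc55-0242ac130003  rack2
    """

    topology = parse_nodetool_status(sample, "Test Cluster",
                                     reverse_dns_preheat=lambda ips: None)
    # Two Host entries, both marked up, carrying rack and host_id; with
    # resolve_hostnames left False, fqdn is simply set to the IP address.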