dc-serial always is default, 'one' strategy now considers dc-parallel flag

DC now identified by cluster.dc, avoid collisions in multi-cluster + add tests
arodrime authored and adejanovski committed Feb 14, 2020
1 parent 6ff4d90 commit f749b36
Showing 10 changed files with 76 additions and 38 deletions.
26 changes: 13 additions & 13 deletions README.md
@@ -39,7 +39,7 @@ It's also possible to install straight from repo. This installs the latest version

## Code of conduct

-This project adheres to the 
+This project adheres to the
[Open Code of Conduct](https://github.com/spotify/code-of-conduct/blob/master/code-of-conduct.md).
By participating, you are expected to honor this code.

@@ -62,13 +62,13 @@ The HOST-SPEC specifies what nodes to run the script on. There are three ways to

The command is the name of a script located in either `/usr/lib/cstar/commands` or in
`~/.cstar/commands`. This script will be uploaded to all nodes in the cluster and executed. File suffixes
-are stripped. The requirements of the script are described below. Cstar comes pre-packaged with one script file 
+are stripped. The requirements of the script are described below. Cstar comes pre-packaged with one script file
called ``run`` which takes a single parameter ``--command`` - see examples below.

Some additional switches to control cstar:

* One can override the parallelism specified in a script by setting the switches
-  `--cluster-parallelism`, `--dc-parallelism` and `--strategy`.
+  `--cluster-parallel`, `--dc-parallel` and `--strategy`.

There are two special case invocations:

@@ -87,7 +87,7 @@ A script file can specify additional parameters.

## Command syntax

-In order to run a command, it is first uploaded to the relevant host, and then executed from there. 
+In order to run a command, it is first uploaded to the relevant host, and then executed from there.

Commands can be written in any scripting language in which the hash symbol starts a line comment, e.g.
shell-script, python, perl or ruby.
@@ -98,9 +98,9 @@ the script, e.g. `# C* dc-parallel: true`

The possible keys are:

-`cluster-parallelism`, can the script be run on multiple clusters in parallel. Default value is `true`.
+`cluster-parallel`, can the script be run on multiple clusters in parallel. Default value is `true`.

-`dc-parallelism`, can the script be run on multiple data centers in the same cluster in parallel. Default value is `false`.
+`dc-parallel`, can the script be run on multiple data centers in the same cluster in parallel. Default value is `false`.

`strategy`, how many nodes within one data center can the script be run on. Default is `topology`.
Can be one of:
@@ -112,14 +112,14 @@ Can be one of:
`description`, specifies a description for the script used in the help message.

`argument`, specifies an additional input parameter for the script, as well as a help text and an
-optional default value. 
+optional default value.

## Job output

-Cstar automatically saves the job status to file during operation. 
+Cstar automatically saves the job status to file during operation.

Standard output, standard error and exit status of each command run against a Cassandra host is
-saved locally on machine where cstar is running. They are available under the users home directory in 
+saved locally on the machine where cstar is running. They are available under the user's home directory in
`.cstar/jobs/JOB_ID/HOSTNAME`

## How jobs are run
@@ -152,13 +152,13 @@ maximum age of a job before it's deleted by using the `--max-job-age` parameter.

# cstar run --command='service cassandra restart' --seed-host some-host

-Explanation: Run the local cli command ``service cassandra restart`` on a cluster. If necessary, add ``sudo`` to the 
+Explanation: Run the local CLI command ``service cassandra restart`` on a cluster. If necessary, add ``sudo`` to the
command.

# cstar puppet-upgrade-cassandra --seed-host some-host --puppet-branch=cass-2.2-upgrade

Explanation: Run the command puppet-upgrade-cassandra on a cluster. The puppet-upgrade-cassandra
-command expects a parameter, the puppet branch to run in order to perform the Cassandra upgrade. See the 
+command expects a parameter, the puppet branch to run in order to perform the Cassandra upgrade. See the
puppet-upgrade-cassandra example [below](#Example-script-file).

# cstar puppet-upgrade-cassandra --help
@@ -173,7 +173,7 @@ The job id is the first line written on any executed job.

## Example script file

-This is an example script file that would be saved to `~/.cstar/commands/puppet-upgrade-cassandra.sh`. It upgrades a 
+This is an example script file that would be saved to `~/.cstar/commands/puppet-upgrade-cassandra.sh`. It upgrades a
Cassandra cluster by running puppet on a different branch, then restarting the node, then upgrading the sstables.

#!/usr/bin/env bash
@@ -183,7 +183,7 @@ Cassandra cluster by running puppet on a different branch, then restarting the node
# C* description: Upgrade one or more clusters by switching to a different puppet branch
# C* argument: {"option":"--snapshot-name", "name":"SNAPSHOT_NAME", "description":"Name of pre-upgrade snapshot", "default":"preupgrade"}
# C* argument: {"option":"--puppet-branch", "name":"PUPPET_BRANCH", "description":"Name of puppet branch to switch to", "required":true}

nodetool snapshot -t $SNAPSHOT_NAME
sudo puppet --branch $PUPPET_BRANCH
sudo service cassandra restart
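For reference, the renamed switches combine with the packaged ``run`` command in the same style as the README examples above. A sketch of such an invocation (the boolean-flag syntax shown here is an assumption, not something this diff confirms):

    # cstar run --command='service cassandra restart' --seed-host some-host --dc-parallel --strategy=one
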
4 changes: 2 additions & 2 deletions cstar/job.py
@@ -160,7 +160,7 @@ def get_endpoint_mapping(self, topology):
            describering = cstar.nodetoolparser.parse_nodetool_describering(res.out)
            range_mapping = cstar.nodetoolparser.convert_describering_to_range_mapping(describering)
            mappings.append(cstar.endpoint_mapping.parse(range_mapping, topology, lookup=ip_lookup))

        if not has_error:
            return cstar.endpoint_mapping.merge(mappings)

@@ -336,7 +336,7 @@ def wait_for_any_job(self):
        host, result = self.results.get(timeout=self.timeout)
        self.returned_jobs.append((host, result))
        self.handle_finished_jobs(self.returned_jobs)

        self.wait_for_node_to_return(returned_job[0] for returned_job in self.returned_jobs)
        self.returned_jobs = list()

Expand Down
4 changes: 2 additions & 2 deletions cstar/jobprinter.py
@@ -44,10 +44,10 @@ def get_status(host):
lines.append("Cluster: " + cluster)
cluster_topology = original_topology.with_cluster(cluster)
dcs = sorted(cluster_topology.get_dcs())
for dc in dcs:
for cluster, dc in dcs:
if len(dcs):
lines.append("DC: " + dc)
dc_topology = cluster_topology.with_dc(dc)
dc_topology = cluster_topology.with_dc(cluster, dc)
hosts = sorted(dc_topology, key=lambda x: x.token)
status = "".join([get_status(host) for host in hosts])
if len(status) >= 6:
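The new loop header above works because get_dcs() now returns Datacenter namedtuples, which unpack like plain tuples. A quick self-contained illustration (the values are made up):

    from collections import namedtuple

    Datacenter = namedtuple("Datacenter", "cluster dc")

    dcs = sorted({Datacenter("cluster1", "eu"), Datacenter("cluster1", "us")})
    for cluster, dc in dcs:  # namedtuples unpack positionally
        print("DC: " + dc, "in cluster", cluster)
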
2 changes: 1 addition & 1 deletion cstar/resources/commands/run.sh
@@ -1,6 +1,6 @@
#! /bin/bash
# C* cluster-parallel: true
-# C* dc-parallel: true
+# C* dc-parallel: false
# C* strategy: topology
# C* description: Run an arbitrary shell command
# C* argument: {"option":"--command", "name":"COMMAND", "description":"Command to run", "required":true}
6 changes: 3 additions & 3 deletions cstar/strategy.py
@@ -53,7 +53,8 @@ def find_next_host(strategy, topology, endpoint_mapping, progress, cluster_parallel
        remaining = remaining.with_cluster(next(iter(progress.running)).cluster)

    if progress.running and not dc_parallel:
-        remaining = remaining.with_dc(next(iter(progress.running)).dc)
+        running_host = next(iter(progress.running))
+        remaining = remaining.with_dc(running_host.cluster, running_host.dc)

    if not remaining:
        return None
@@ -74,8 +75,7 @@ def _all_find_next_host(remaining, endpoint_mapping, running):


def _one_find_next_host(remaining, endpoint_mapping, running):
-    if running:
-        return None
+    remaining = remaining.without_dcs(remaining.get_dcs(running))
    return remaining.first()


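To make the strategy change concrete, here is a minimal self-contained sketch (assumed, simplified logic; the real code operates on cstar's Topology objects) of how the 'one' strategy now schedules when dc-parallel is enabled: instead of refusing to start work while any job is running, it only skips data centers that already have a running host, so each DC advances one node at a time:

    from collections import namedtuple

    Host = namedtuple("Host", "fqdn ip dc cluster token is_up")

    def one_find_next_host(remaining, running):
        """Pick one host from any data center that has nothing running yet."""
        # Busy DCs are keyed by the (cluster, dc) pair, mirroring the new
        # Datacenter namedtuple, so same-named DCs in different clusters stay apart.
        busy = {(h.cluster, h.dc) for h in running}
        for host in remaining:
            if (host.cluster, host.dc) not in busy:
                return host  # one node per idle DC at a time
        return None

    hosts = [Host("a0", "1.2.3.0", "eu", "cluster1", 0, True),
             Host("b0", "2.2.3.0", "us", "cluster1", 1, True),
             Host("c0", "3.2.3.0", "eu", "cluster2", 0, True)]
    # cluster1/eu is busy, so the next pick comes from cluster1/us:
    assert one_find_next_host(hosts, [hosts[0]]) is hosts[1]
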
22 changes: 17 additions & 5 deletions cstar/topology.py
@@ -19,6 +19,7 @@
Host = namedtuple("Host", "fqdn ip dc cluster token is_up")
Host.__hash__ = lambda self: self.ip.__hash__()

+Datacenter = namedtuple("Datacenter", "cluster dc")

def _host_eq(self, other):
    if hasattr(other, 'ip'):
@@ -54,10 +55,20 @@ def with_cluster(self, cluster):
"""Return subtopology filtered on cluster"""
return Topology(filter(lambda host: cluster == host.cluster, self.hosts))

-    def with_dc(self, dc):
-        """Return subtopology filtered on dc"""
+    def with_dc(self, cluster, dc):
+        """Return subtopology filtered on the cluster/dc pair, for uniqueness concerns"""
+        return Topology(filter(lambda host: dc == host.dc and cluster == host.cluster, self.hosts))

+    def with_dc_filter(self, dc):
+        """Return subtopology filtered on the dc name only; if clusters share a
+        DC name, hosts from all of them will be considered. Prefer the 'with_dc()' function."""
+        return Topology(filter(lambda host: dc == host.dc, self.hosts))
+
+    def without_dcs(self, dcs):
+        """Return subtopology with the specified DCs filtered out"""
+        return Topology(filter(lambda host: Datacenter(host.cluster, host.dc) not in dcs, self.hosts))

    def without_host(self, host):
        """Return subtopology without specified host"""
        return Topology(self.hosts - set((host,)))
@@ -70,9 +81,10 @@ def get_clusters(self):
"""Returns a set containing all the individual clusters in this topology"""
return set(host.cluster for host in self.hosts)

def get_dcs(self):
"""Returns a set containing all the individual data centers in this topology"""
return set(host.dc for host in self.hosts)
def get_dcs(self, hosts=None):
"""Returns a set containing all the individual data centers for given hosts"""
subtopology = self if hosts is None else Topology(hosts)
return set(Datacenter(host.cluster,host.dc) for host in subtopology.hosts)

    def get_down(self):
        """Returns a set of all nodes that are down in this topology"""
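The crux of the topology change, as a small self-contained sketch (the Host fields mirror cstar.topology; the data is made up): filtering on the DC name alone conflates same-named DCs across clusters, while the (cluster, dc) pair is unambiguous, which is what with_dc(cluster, dc) and without_dcs() now rely on:

    from collections import namedtuple

    Host = namedtuple("Host", "fqdn ip dc cluster token is_up")
    Datacenter = namedtuple("Datacenter", "cluster dc")

    hosts = [Host("a0", "1.2.3.0", "eu", "cluster1", 0, True),
             Host("c0", "3.2.3.0", "eu", "cluster2", 0, True)]

    # By name alone, "eu" matches hosts in both clusters ...
    assert len([h for h in hosts if h.dc == "eu"]) == 2

    # ... but keyed by (cluster, dc) it matches exactly one host.
    eu1 = Datacenter("cluster1", "eu")
    assert len([h for h in hosts if Datacenter(h.cluster, h.dc) == eu1]) == 1
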
2 changes: 1 addition & 1 deletion setup.py
@@ -20,7 +20,7 @@
import sys

# circleci.py version
-VERSION = '0.7.3'
+VERSION = '0.7.4'

class VerifyVersionCommand(install):
"""Custom command to verify that the git tag matches our version"""
4 changes: 2 additions & 2 deletions tests/command_test.py
@@ -30,7 +30,7 @@
# C* description: Upgrade one or more clusters by switching to a different puppet branch
# C* argument: {"option":"--snapshot-name", "name":"SNAPSHOT_NAME", "description":"Name of pre-upgrade snapshot", "default":"preupgrade"}
# C* argument: {"option":"--puppet-branch", "name":"PUPPET_BRANCH", "description":"Name of puppet branch to switch to", "required":true}
-#C* argument: {"option":"--bork", "name":"BORK", "description":"Bork bork bork"}
+# C* argument: {"option":"--bork", "name":"BORK", "description":"Bork bork bork"}
# argument: {"option":"--bork2", "name":"BORK2", "description":"Bork bork bork"}
# C argument: {"option":"--bork3", "name":"BORK3", "description":"Bork bork bork"}
# C** argument: {"option":"--bork4", "name":"BORK4", "description":"Bork bork bork"}
@@ -62,7 +62,7 @@ def test_parse(self):
        self.assertEqual(command.dc_parallel, True)
        self.assertEqual(command.cluster_parallel, False)

-        self.assertEqual(len(command.arguments), 2)
+        self.assertEqual(len(command.arguments), 3)
        self.assertEqual(command.arguments[0].option, "--snapshot-name")
        self.assertEqual(command.arguments[0].name, "SNAPSHOT_NAME")
        self.assertEqual(command.arguments[0].default, "preupgrade")
39 changes: 32 additions & 7 deletions tests/strategy_test.py
@@ -23,10 +23,10 @@
def make_topology(size, has_down_host=False):
    test_topology = []
    for i in range(size):
-        test_topology.append(Host("a", "1.2.3.%d" % i, "eu", "cluster1", i * 100, not has_down_host))
-        test_topology.append(Host("b", "2.2.3.%d" % i, "us", "cluster1", (i * 100) + 1, True))
-        test_topology.append(Host("c", "3.2.3.%d" % i, "eu", "cluster2", i * 100, True))
-        test_topology.append(Host("d", "4.2.3.%d" % i, "us", "cluster2", (i * 100) + 1, True))
+        test_topology.append(Host("a%d" % i, "1.2.3.%d" % i, "eu", "cluster1", i * 100, not has_down_host))
+        test_topology.append(Host("b%d" % i, "2.2.3.%d" % i, "us", "cluster1", (i * 100) + 1, True))
+        test_topology.append(Host("c%d" % i, "3.2.3.%d" % i, "eu", "cluster2", i * 100, True))
+        test_topology.append(Host("d%d" % i, "4.2.3.%d" % i, "us", "cluster2", (i * 100) + 1, True))
    return Topology(test_topology)


@@ -91,11 +91,20 @@ def test_all_per_dc(self):
        state = State(top, Strategy.ALL, None, True, False)

        state = add_work(state)
-        self.assertEqual(len(state.progress.running), 6)
+        self.assertEqual(len(state.progress.running), 3)
        state = finish_work(state)

        state = add_work(state)
-        self.assertEqual(len(state.progress.running), 6)
+        self.assertEqual(len(state.progress.running), 3)
        state = finish_work(state)
+
+        state = add_work(state)
+        self.assertEqual(len(state.progress.running), 3)
+        state = finish_work(state)
+
+        state = add_work(state)
+        self.assertEqual(len(state.progress.running), 3)
+        state = finish_work(state)

    def test_all_per_cluster(self):
        top = make_topology(size=3)
@@ -110,7 +119,7 @@

    def test_one(self):
        top = make_topology(size=3)
-        state = State(top, Strategy.ONE, None, True, True)
+        state = State(top, Strategy.ONE, None, False, False)

        state = add_work(state)
        self.assertEqual(len(state.progress.running), 1)
@@ -119,6 +128,22 @@
        state = add_work(state)
        self.assertEqual(len(state.progress.running), 1)

+    def test_one_per_dc(self):
+        top = make_topology(size=3)
+        state = State(top, Strategy.ONE, None, True, True)
+
+        state = add_work(state)
+        self.assertEqual(len(state.progress.running), 4)
+        state = finish_work(state)
+
+        state = add_work(state)
+        self.assertEqual(len(state.progress.running), 4)
+        state = finish_work(state)
+
+        state = add_work(state)
+        self.assertEqual(len(state.progress.running), 4)
+        state = finish_work(state)

    def test_topology_parallel(self):
        top = make_topology(size=12)
        mapping = make_mapping(top)
5 changes: 3 additions & 2 deletions tests/topology_test.py
@@ -31,9 +31,10 @@
class TopologyTest(unittest.TestCase):

    def test_with_dc(self):
-        sub = test_topology.with_dc("us")
-        self.assertEqual(len(sub), 3)
+        sub = test_topology.with_dc("cluster1", "us")
+        self.assertEqual(len(sub), 2)
        [self.assertEqual(host.dc, "us") for host in sub]
+        [self.assertEqual(host.cluster, "cluster1") for host in sub]

    def test_with_cluster(self):
        sub = test_topology.with_cluster("cluster1")
