This repository has been archived by the owner on Apr 4, 2023. It is now read-only.

WIP: Use hostnames rather than IP addresses for cassandra nodes #330

Closed
wants to merge 19 commits
3 changes: 2 additions & 1 deletion Dockerfile.pilot-cassandra
@@ -9,7 +9,8 @@ RUN chmod a+r /jmx_prometheus_javaagent.jar && touch /jmx_prometheus_javaagent.j
RUN chmod a+r /jmx_prometheus_javaagent.yaml && touch /jmx_prometheus_javaagent.yaml

# note: temporarily pulled directly from kubernetes/examples until we find a better place to put it
ADD https://github.com/kubernetes/examples/raw/master/cassandra/image/files/kubernetes-cassandra.jar /kubernetes-cassandra.jar
# Use the commit before https://github.com/kubernetes/examples/pull/201, because that broke everything.
ADD https://github.com/kubernetes/examples/raw/7ac4ceb28b98b78c5be337a141c2db10b862e31e/cassandra/image/files/kubernetes-cassandra.jar /kubernetes-cassandra.jar
RUN chmod a+r /kubernetes-cassandra.jar && touch /kubernetes-cassandra.jar

ADD navigator-pilot-cassandra_linux_amd64 /pilot
41 changes: 37 additions & 4 deletions hack/e2e.sh
@@ -366,13 +366,13 @@ function test_cassandracluster() {
fail_test "A ScaleOut event was not recorded"
fi

if ! retry TIMEOUT=300 stdout_equals 2 kubectl \
if ! retry TIMEOUT=600 stdout_equals 2 kubectl \
--namespace "${namespace}" \
get cassandracluster \
"${CASS_NAME}" \
"-o=jsonpath={ .status.nodePools['${CASS_NODEPOOL1_NAME}'].readyReplicas }"
then
fail_test "Second cassandra node did not become ready"
fail_test "The extra cassandra nodes did not become ready"
fi

# TODO: A better test would be to query the endpoints and check that only
@@ -395,7 +395,7 @@ function test_cassandracluster() {
--debug \
--execute='CONSISTENCY ALL; SELECT * FROM space1.testtable1'
then
fail_test "Data was not replicated to second node"
fail_test "Data was not replicated to all nodes"
fi

simulate_unresponsive_cassandra_process \
@@ -413,12 +413,45 @@ function test_cassandracluster() {
then
fail_test "Cassandra liveness probe failed to restart dead node"
fi

echo "Print a map of pod name to IP address before deleting the node"
kubectl --namespace "${namespace}" get pods \
--output 'jsonpath={range .items[*]}{.spec.hostname}: {.status.podIP} {"\n"}{end}'

retry TIMEOUT=120 kube_delete_pod_and_test_for_new_ip "${namespace}" "${pod}"

echo "Print a map of pod name to IP address after the pod IP has changed"
kubectl --namespace "${namespace}" get pods \
--output 'jsonpath={range .items[*]}{.spec.hostname}: {.status.podIP} {"\n"}{end}'

echo "nodepool status should show a node joining..."
kubectl --namespace "${namespace}" exec \
"cass-${CASS_NAME}-${CASS_NODEPOOL1_NAME}-0" \
-- /bin/sh -c 'JVM_OPTS="" nodetool status'

echo "Wait for test data to be available on all nodes again"
if ! retry TIMEOUT=300 \
stdout_matches "testvalue1" \
cql_connect \
"${namespace}" \
"cass-${CASS_NAME}-nodes" \
"${CASS_CQL_PORT}" \
--debug \
--execute='CONSISTENCY ALL; SELECT * FROM space1.testtable1'
then
fail_test "Data was not restored to the restarted node"
fi

echo "nodepool status should now show all nodes Up and Normal (UN)"
kubectl --namespace "${namespace}" exec \
"cass-${CASS_NAME}-${CASS_NODEPOOL1_NAME}-0" \
-- /bin/sh -c 'JVM_OPTS="" nodetool status'
}

if [[ "test_cassandracluster" = "${TEST_PREFIX}"* ]]; then
CASS_TEST_NS="test-cassandra-${TEST_ID}"

for i in {1..2}; do
for i in {1..5}; do
kube_create_pv "${CASS_TEST_NS}-pv${i}" 5Gi default
done

109 changes: 106 additions & 3 deletions hack/libe2e.sh
@@ -160,7 +160,7 @@ function stdout_matches() {
shift
local actual
actual=$("${@}")
grep --quiet "${expected}" <<<"${actual}"
grep "${expected}" <<<"${actual}"
}

function stdout_gt() {
@@ -253,6 +253,16 @@ function kube_create_pv() {
local name="${1}"
local capacity="${2}"
local storage_class="${3}"
local schedulable_nodes
schedulable_nodes=$(
kubectl get nodes \
--output \
'jsonpath={range $.items[*]}{.metadata.name} {.spec.taints[*].effect}{"\n"}{end}' \
| grep -v NoSchedule)
local node_name
node_name=$(python -c 'import random,sys; print(random.choice(sys.argv[1:]))' $schedulable_nodes)

local path="hostpath_pvs/${name}/"

kubectl create --filename - <<EOF
apiVersion: v1
@@ -261,15 +271,108 @@ metadata:
name: ${name}
labels:
purpose: test
annotations:
"volume.alpha.kubernetes.io/node-affinity": '{
"requiredDuringSchedulingIgnoredDuringExecution": {
"nodeSelectorTerms": [
{ "matchExpressions": [
{ "key": "kubernetes.io/hostname",
"operator": "In",
"values": ["${node_name}"]
}
]}
]}
}'
spec:
accessModes:
- ReadWriteOnce
capacity:
storage: ${capacity}
hostPath:
path: /tmp/hostpath_pvs/${name}/
local:
path: "/tmp/${path}"
storageClassName: ${storage_class}
persistentVolumeReclaimPolicy: Delete
EOF

# Run a job (on the target node) to create the host directory.
kubectl create --namespace kube-system --filename - <<EOF
apiVersion: batch/v1
kind: Job
metadata:
name: "navigator-e2e-create-pv-${name}"
labels:
purpose: test
spec:
template:
spec:
restartPolicy: "OnFailure"
nodeSelector:
kubernetes.io/hostname: "${node_name}"
containers:
- name: "mkdir"
image: "busybox:latest"
resources:
limits:
cpu: "10m"
memory: "8Mi"
requests:
cpu: "10m"
memory: "8Mi"
securityContext:
privileged: true
command:
- "/bin/mkdir"
- "-p"
- "/HOST_TMP/${path}"
volumeMounts:
- mountPath: /HOST_TMP
name: host-tmp
volumes:
- name: host-tmp
hostPath:
path: /tmp
EOF
}

function kube_get_pod_uid() {
local namespace="${1}"
local pod="${2}"

kubectl --namespace "${namespace}" get pod \
"${pod}" \
--output "jsonpath={ .metadata.uid }" \
| egrep '^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$'
}

function kube_get_pod_ip() {
local namespace="${1}"
local pod="${2}"

kubectl --namespace "${namespace}" get pod \
"${pod}" \
--output "jsonpath={ .status.podIP }" | \
egrep '^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$'
}

function kube_delete_pod_and_test_for_new_ip() {
local namespace="${1}"
local pod="${2}"

local original_ip
original_ip=$(retry kube_get_pod_ip "${namespace}" "${pod}")

local original_uid
original_uid=$(retry kube_get_pod_uid "${namespace}" "${pod}")

# Delete the pod without grace period
kubectl --namespace "${namespace}" delete pod "${pod}" --grace-period=0 --force || true
# Run another pod immediately, to hopefully take the IP address of the deleted pod
in_cluster_command "${namespace}" "busybox:latest" "/bin/sleep" "2"
retry not stdout_equals "${original_uid}" \
retry kube_get_pod_uid "${namespace}" "${pod}"

local new_ip
new_ip=$(retry kube_get_pod_ip "${namespace}" "${pod}")

[[ "${original_ip}" != "${new_ip}" ]]
}
8 changes: 4 additions & 4 deletions hack/testdata/cass-cluster-test.template.yaml
@@ -16,11 +16,11 @@ spec:
nodeSelector: {}
resources:
requests:
cpu: "500m"
memory: "2Gi"
cpu: "250m"
memory: "1Gi"
limits:
cpu: "500m"
memory: "2Gi"
cpu: "250m"
memory: "1Gi"
pilotImage:
repository: "${NAVIGATOR_IMAGE_REPOSITORY}/navigator-pilot-cassandra"
tag: "${NAVIGATOR_IMAGE_TAG}"
13 changes: 13 additions & 0 deletions internal/test/util/generate/generate.go
@@ -143,3 +143,16 @@ func CassandraClusterNodePool(c CassandraClusterNodePoolConfig) *v1alpha1.Cassan
Replicas: c.Replicas,
}
}

type ServiceConfig struct {
Name, Namespace string
}

func Service(c ServiceConfig) *core.Service {
return &core.Service{
ObjectMeta: metav1.ObjectMeta{
Name: c.Name,
Namespace: c.Namespace,
},
}
}
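
For orientation, a unit test could consume the new Service fixture roughly as in the sketch below. The import path, the example function, and the literal name/namespace values are assumptions for illustration; they are not taken from this pull request.

package example

import (
	"fmt"

	"github.com/jetstack/navigator/internal/test/util/generate"
)

// Illustrative only: build a Service fixture the way a table-driven test might.
func ExampleService() {
	svc := generate.Service(generate.ServiceConfig{
		Name:      "cass-demo-ringnodes", // hypothetical service name
		Namespace: "test-cassandra",      // hypothetical namespace
	})
	fmt.Println(svc.Namespace + "/" + svc.Name)
	// Output: test-cassandra/cass-demo-ringnodes
}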
22 changes: 0 additions & 22 deletions pkg/cassandra/nodetool/nodetool.go
@@ -90,9 +90,7 @@ func (t *tool) Status() (NodeMap, error) {
}

nodes := NodeMap{}
mappedNodes := sets.NewString()
for host, id := range ssInfo.HostIdMap {
mappedNodes.Insert(host)
nodes[host] = &Node{
Host: host,
ID: id,
@@ -111,15 +109,6 @@
liveNodes, unreachableNodes,
)
}
if !mappedNodes.IsSuperset(liveNodes.Union(unreachableNodes)) {
return nil, fmt.Errorf(
"mapped nodes must be a superset of Live and Unreachable nodes. "+
"Live: %v, "+
"Unreachable: %v, "+
"Mapped: %v",
liveNodes, unreachableNodes, mappedNodes,
)
}

leavingNodes := sets.NewString(ssInfo.LeavingNodes...)
joiningNodes := sets.NewString(ssInfo.JoiningNodes...)
@@ -135,17 +124,6 @@
)
}

if !mappedNodes.IsSuperset(leavingNodes.Union(joiningNodes).Union(movingNodes)) {
return nil, fmt.Errorf(
"mapped nodes must be a superset of leaving, joining and moving nodes. "+
"Leaving: %v, "+
"Joining: %v, "+
"Moving: %v, "+
"Mapped: %v",
leavingNodes, joiningNodes, movingNodes, mappedNodes,
)
}

for host, node := range nodes {
switch {
case liveNodes.Has(host):
24 changes: 0 additions & 24 deletions pkg/cassandra/nodetool/nodetool_test.go
@@ -231,18 +231,6 @@ func TestNodeToolStatus(t *testing.T) {
},
},
},
{
title: "Live node not in HostIdMap",
handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
_, err := w.Write([]byte(
`{"value": {"HostIdMap": {}, "LiveNodes": ["192.0.2.254"]}}`,
))
if err != nil {
t.Fatal(err)
}
},
expectedError: true,
},
{
title: "Live intersects with unreachable",
handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
@@ -258,18 +246,6 @@
},
expectedError: true,
},
{
title: "Leaving node not in HostIdMap",
handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
_, err := w.Write([]byte(
`{"value": {"HostIdMap": {}, "LeavingNodes": ["192.0.2.254"]}}`,
))
if err != nil {
t.Fatal(err)
}
},
expectedError: true,
},
{
title: "Leaving intersects with joining",
handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
9 changes: 8 additions & 1 deletion pkg/controllers/cassandra/actions/create_nodepool.go
@@ -4,6 +4,8 @@ import (
corev1 "k8s.io/api/core/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"

"github.com/pkg/errors"

"github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1"
"github.com/jetstack/navigator/pkg/controllers"
"github.com/jetstack/navigator/pkg/controllers/cassandra/nodepool"
@@ -21,8 +23,13 @@ func (a *CreateNodePool) Name() string {
}

func (a *CreateNodePool) Execute(s *controllers.State) error {
headlessService := nodepool.HeadlessServiceForClusterNodePool(a.Cluster, a.NodePool)
_, err := s.Clientset.CoreV1().Services(headlessService.Namespace).Create(headlessService)
if err != nil && !k8sErrors.IsAlreadyExists(err) {
return errors.Wrap(err, "unable to create headless service for nodepool")
}
ss := nodepool.StatefulSetForCluster(a.Cluster, a.NodePool)
_, err := s.Clientset.AppsV1beta1().StatefulSets(ss.Namespace).Create(ss)
_, err = s.Clientset.AppsV1beta1().StatefulSets(ss.Namespace).Create(ss)
if k8sErrors.IsAlreadyExists(err) {
return nil
}
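
HeadlessServiceForClusterNodePool itself is not shown in this diff. As a rough sketch of the pattern it appears to rely on (assumed here, not copied from Navigator): a Service with ClusterIP set to "None" in front of the nodepool's StatefulSet gives each Cassandra pod a stable DNS name of the form pod.service.namespace.svc, which is what lets nodes address each other by hostname rather than pod IP. The naming convention, selector labels, and port below are illustrative assumptions.

package example

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// headlessServiceSketch is a hedged approximation of a per-nodepool headless
// Service; the name, labels, and port are assumptions, not Navigator's actual
// implementation.
func headlessServiceSketch(namespace, cluster, nodePool string) *corev1.Service {
	return &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "cass-" + cluster + "-" + nodePool,
			Namespace: namespace,
		},
		Spec: corev1.ServiceSpec{
			// ClusterIP "None" marks the Service as headless: DNS resolves to
			// the individual pod addresses instead of a single virtual IP.
			ClusterIP: "None",
			Selector: map[string]string{
				"app":      "cassandra",
				"cluster":  cluster,
				"nodepool": nodePool,
			},
			Ports: []corev1.ServicePort{
				{Name: "cql", Port: 9042},
			},
		},
	}
}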