diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml index 7139ea91..8240bc53 100644 --- a/.github/workflows/main.yaml +++ b/.github/workflows/main.yaml @@ -26,7 +26,7 @@ jobs: runs-on: ubuntu-22.04 strategy: matrix: - suite: [functions, robustness, operators, reboot] + suite: [functions, robustness, operators, reboot, repair] env: SUITE: ${{ matrix.suite }} CLUSTER: "cke-cluster.yml" diff --git a/CHANGELOG.md b/CHANGELOG.md index ec1d7765..486c34e9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,10 @@ This project employs a versioning scheme described in [RELEASE.md](RELEASE.md#ve ## [Unreleased] +### Added + +- Implement repair queue in [#692](https://github.com/cybozu-go/cke/pull/692) + ## [1.27.3] ### Changed diff --git a/cluster.go b/cluster.go index 2a575165..b5e14a50 100644 --- a/cluster.go +++ b/cluster.go @@ -285,6 +285,41 @@ type Reboot struct { const DefaultRebootEvictionTimeoutSeconds = 600 const DefaultMaxConcurrentReboots = 1 +type Repair struct { + RepairProcedures []RepairProcedure `json:"repair_procedures"` + MaxConcurrentRepairs *int `json:"max_concurrent_repairs,omitempty"` + ProtectedNamespaces *metav1.LabelSelector `json:"protected_namespaces,omitempty"` + EvictRetries *int `json:"evict_retries,omitempty"` + EvictInterval *int `json:"evict_interval,omitempty"` + EvictionTimeoutSeconds *int `json:"eviction_timeout_seconds,omitempty"` +} + +type RepairProcedure struct { + MachineTypes []string `json:"machine_types"` + RepairOperations []RepairOperation `json:"repair_operations"` +} + +type RepairOperation struct { + Operation string `json:"operation"` + RepairSteps []RepairStep `json:"repair_steps"` + HealthCheckCommand []string `json:"health_check_command"` + CommandTimeoutSeconds *int `json:"command_timeout_seconds,omitempty"` +} + +type RepairStep struct { + RepairCommand []string `json:"repair_command"` + CommandTimeoutSeconds *int `json:"command_timeout_seconds,omitempty"` + CommandRetries *int `json:"command_retries,omitempty"` + CommandInterval *int `json:"command_interval,omitempty"` + NeedDrain bool `json:"need_drain,omitempty"` + WatchSeconds *int `json:"watch_seconds,omitempty"` +} + +const DefaultMaxConcurrentRepairs = 1 +const DefaultRepairEvictionTimeoutSeconds = 600 +const DefaultRepairHealthCheckCommandTimeoutSeconds = 30 +const DefaultRepairCommandTimeoutSeconds = 30 + // Options is a set of optional parameters for k8s components. 
type Options struct { Etcd EtcdParams `json:"etcd"` @@ -307,6 +342,7 @@ type Cluster struct { DNSServers []string `json:"dns_servers"` DNSService string `json:"dns_service"` Reboot Reboot `json:"reboot"` + Repair Repair `json:"repair"` Options Options `json:"options"` } diff --git a/cluster_test.go b/cluster_test.go index 56cba618..41a4bc47 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -2,6 +2,7 @@ package cke import ( "os" + "slices" "testing" "github.com/google/go-cmp/cmp" @@ -131,6 +132,90 @@ func testClusterYAML(t *testing.T) { if c.Reboot.ProtectedNamespaces.MatchLabels["app"] != "sample" { t.Error(`c.Reboot.ProtectedNamespaces.MatchLabels["app"] != "sample"`) } + if len(c.Repair.RepairProcedures) != 1 { + t.Fatal(`len(c.Repair.RepairProcedures) != 1`) + } + if !slices.Equal(c.Repair.RepairProcedures[0].MachineTypes, []string{"Cray-1", "Cray-2"}) { + t.Error(`c.Repair.RepairProcedures[0].MachineTypes != {"Cray-1", "Cray-2"}`) + } + if len(c.Repair.RepairProcedures[0].RepairOperations) != 1 { + t.Fatal(`len(c.Repair.RepairProcedures[0].RepairOperations) != 1`) + } + if c.Repair.RepairProcedures[0].RepairOperations[0].Operation != "unreachable" { + t.Error(`c.Repair.RepairProcedures[0].RepairOperations[0].OperationName != "unreachable"`) + } + if len(c.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps) != 2 { + t.Fatal(`len(c.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps) != 2`) + } + if !slices.Equal(c.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].RepairCommand, []string{"reset", "remotely"}) { + t.Error(`c.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].RepairCommand != {"reset", "remotely"}`) + } + if c.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].CommandTimeoutSeconds == nil { + t.Fatal(`c.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].CommandTimeoutSeconds == nil`) + } + if *c.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].CommandTimeoutSeconds != 10 { + t.Error(`*c.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].CommandTimeoutSeconds != 10`) + } + if c.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].CommandRetries == nil { + t.Fatal(`c.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].CommandRetries == nil`) + } + if *c.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].CommandRetries != 1 { + t.Error(`*c.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].CommandRetries != 1`) + } + if c.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].CommandInterval == nil { + t.Fatal(`c.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].CommandInterval == nil`) + } + if *c.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].CommandInterval != 5 { + t.Error(`*c.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].CommandInterval != 5`) + } + if !c.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].NeedDrain { + t.Fatal(`!c.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].NeedDrain`) + } + if c.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].WatchSeconds == nil { + t.Fatal(`c.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].WatchSeconds == nil`) + } + if *c.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].WatchSeconds != 60 { + t.Error(`*c.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].WatchSeconds != 60`) + } + if !slices.Equal(c.Repair.RepairProcedures[0].RepairOperations[0].HealthCheckCommand, 
[]string{"knock"}) { + t.Error(`c.Repair.RepairProcedures[0].RepairOperations[0].HealthCheckCommand != {"knock"}`) + } + if c.Repair.RepairProcedures[0].RepairOperations[0].CommandTimeoutSeconds == nil { + t.Fatal(`c.Repair.RepairProcedures[0].RepairOperations[0].CommandTimeoutSeconds == nil`) + } + if *c.Repair.RepairProcedures[0].RepairOperations[0].CommandTimeoutSeconds != 30 { + t.Error(`*c.Repair.RepairProcedures[0].RepairOperations[0].CommandTimeoutSeconds != 30`) + } + if c.Repair.MaxConcurrentRepairs == nil { + t.Fatal(`c.Repair.MaxConcurrentRepairs == nil`) + } + if *c.Repair.MaxConcurrentRepairs != 2 { + t.Error(`*c.Repair.MaxConcurrentRepairs != 2`) + } + if c.Repair.ProtectedNamespaces == nil { + t.Fatal(`c.Repair.ProtectedNamespaces == nil`) + } + if c.Repair.ProtectedNamespaces.MatchLabels["app"] != "protected" { + t.Error(`c.Repair.ProtectedNamespaces.MatchLabels["app"] != "protected"`) + } + if c.Repair.EvictRetries == nil { + t.Fatal(`c.Repair.EvictRetries == nil`) + } + if *c.Repair.EvictRetries != 3 { + t.Error(`*c.Repair.EvictRetries != 3`) + } + if c.Repair.EvictInterval == nil { + t.Fatal(`c.Repair.EvictInterval == nil`) + } + if *c.Repair.EvictInterval != 5 { + t.Error(`*c.Repair.EvictInterval != 5`) + } + if c.Repair.EvictionTimeoutSeconds == nil { + t.Fatal(`c.Repair.EvictionTimeoutSeconds == nil`) + } + if *c.Repair.EvictionTimeoutSeconds != 120 { + t.Error(`*c.Repair.EvictionTimeoutSeconds != 120`) + } if c.Options.Etcd.VolumeName != "myetcd" { t.Error(`c.Options.Etcd.VolumeName != "myetcd"`) } diff --git a/docs/ckecli.md b/docs/ckecli.md index 6c84e298..fd325dc0 100644 --- a/docs/ckecli.md +++ b/docs/ckecli.md @@ -49,6 +49,15 @@ $ ckecli [--config FILE] args... - [`ckecli reboot-queue cancel INDEX`](#ckecli-reboot-queue-cancel-index) - [`ckecli reboot-queue cancel-all`](#ckecli-reboot-queue-cancel-all) - [`ckecli reboot-queue reset-backoff`](#ckecli-reboot-queue-reset-backoff) +- [`ckecli repair-queue`](#ckecli-repair-queue) + - [`ckecli repair-queue enable|disable`](#ckecli-repair-queue-enabledisable) + - [`ckecli repair-queue is-enabled`](#ckecli-repair-queue-is-enabled) + - [`ckecli repair-queue add OPERATION MACHINE_TYPE ADDRESS`](#ckecli-repair-queue-add-operation-machine_type-address) + - [`ckecli repair-queue list`](#ckecli-repair-queue-list) + - [`ckecli repair-queue delete INDEX`](#ckecli-repair-queue-delete-index) + - [`ckecli repair-queue delete-finished`](#ckecli-repair-queue-delete-finished) + - [`ckecli repair-queue delete-unfinished`](#ckecli-repair-queue-delete-unfinished) + - [`ckecli repair-queue reset-backoff`](#ckecli-repair-queue-reset-backoff) - [`ckecli sabakan`](#ckecli-sabakan) - [`ckecli sabakan enable|disable`](#ckecli-sabakan-enabledisable) - [`ckecli sabakan is-enabled`](#ckecli-sabakan-is-enabled) @@ -311,6 +320,53 @@ Cancel all the reboot queue entries. Reset `drain_backoff_count` and `drain_backoff_expire` of the entries in reboot queue. Resetting these values makes CKE try to reboot nodes again immediately. +## `ckecli repair-queue` + +Control a queue of repair requests. + +### `ckecli repair-queue enable|disable` + +Enable/Disable processing repair queue entries. + +### `ckecli repair-queue is-enabled` + +Show repair queue is enabled or disabled. +This displays `true` or `false`. + +### `ckecli repair-queue add OPERATION MACHINE_TYPE ADDRESS` + +Append a repair request to the repair queue. +The repair target is a machine with an IP address `ADDRESS` and a machine type `MACHINE_TYPE`. 
+The machine is repaired using the operation `OPERATION`. + +### `ckecli repair-queue list` + +List the entries in the repair queue. + +### `ckecli repair-queue delete INDEX` + +Delete the specified repair queue entry. +This has two effects: it cleans up an old entry if the specified entry has finished, and cancels an ongoing entry otherwise. + +Unlike the reboot queue, repair queue entries remain in the queue even after they finish. + +### `ckecli repair-queue delete-finished` + +Delete all finished repair queue entries. +Entries in `succeeded` or `failed` status are deleted. +This displays the index numbers of deleted entries, one per line. + +### `ckecli repair-queue delete-unfinished` + +Delete all unfinished repair queue entries. +Entries not in `succeeded` or `failed` status are deleted. +This displays the index numbers of deleted entries, one per line. + +### `ckecli repair-queue reset-backoff` + +Reset `drain_backoff_count` and `drain_backoff_expire` of the entries in repair queue. +Resetting these values makes CKE try to drain machines again immediately. + ## `ckecli sabakan` Control [sabakan integration feature](sabakan-integration.md). diff --git a/docs/cluster.md b/docs/cluster.md index dc1caa63..30f0f96a 100644 --- a/docs/cluster.md +++ b/docs/cluster.md @@ -8,6 +8,8 @@ a YAML or JSON object with these fields: - [Node](#node) - [Taint](#taint) - [Reboot](#reboot) +- [Repair](#repair) + - [RepairProcedure](#repairprocedure) - [Options](#options) - [ServiceParams](#serviceparams) - [Mount](#mount) @@ -27,6 +29,7 @@ a YAML or JSON object with these fields: | `dns_servers` | false | array | List of upstream DNS server IP addresses. | | `dns_service` | false | string | Upstream DNS service name with namespace as `namespace/service`. | | `reboot` | false | `Reboot` | See [Reboot](#reboot). | +| `repair` | false | `Repair` | See [Repair](#repair). | | `options` | false | `Options` | See [Options](#options). | * `control_plane_tolerations` is used in [sabakan integration](sabakan-integration.md#strategy). @@ -68,7 +71,7 @@ Reboot ------ | Name | Required | Type | Description | -|----------------------------| -------- | -------------------------------- |-------------------------------------------------------------------------| +| -------------------------- | -------- | -------------------------------- | ----------------------------------------------------------------------- | | `reboot_command` | true | array | A command to reboot. List of strings. | | `boot_check_command` | true | array | A command to check nodes booted. List of strings. | | `eviction_timeout_seconds` | false | *int | Deadline for eviction. Must be positive. Default: 600 (10 minutes). | @@ -98,6 +101,47 @@ The Pods in the non-protected namespaces are also tried to be deleted gracefully If `protected_namespaces` is not given, all namespaces are protected. +Repair +------ + +| Name | Required | Type | Description | +| -------------------------- | -------- | -------------------------------- | --------------------------------------------------------------------- | +| `repair_procedures` | true | `[]RepairProcedure` | List of [repair procedures](#repairprocedure). | +| `max_concurrent_repairs` | false | \*int | Maximum number of machines to be repaired concurrently. Default: 1 | +| `protected_namespaces` | false | [`LabelSelector`][LabelSelector] | A label selector to protect namespaces. | +| `evict_retries` | false | \*int | Number of eviction retries, not including initial attempt.
Default: 0 | +| `evict_interval` | false | \*int | Interval of time between eviction retries in seconds. Default: 0 | +| `eviction_timeout_seconds` | false | \*int | Deadline for eviction. Must be positive. Default: 600 (10 minutes) | + +The repair configurations control the [repair functionality](repair.md). + +### RepairProcedure + +| Name | Required | Type | Description | +| ------------------- | -------- | ------------------- | ------------------------------------------------------------------------------------ | +| `machine_types` | true | array | Type names of the target machines to be repaired by this procedure. List of strings. | +| `repair_operations` | true | `[]RepairOperation` | List of [repair operations](#repairoperation). | + +#### RepairOperation + +| Name | Required | Type | Description | +| ------------------------- | -------- | -------------- | --------------------------------------------------------------- | +| `operation` | true | string | Name of repair operation. | +| `repair_steps` | true | `[]RepairStep` | Sequences of [repair steps](#repairstep). | +| `health_check_command` | true | array | A command to check repaired machine's health. List of strings. | +| `command_timeout_seconds` | false | \*int | Deadline for health retrieval. Zero means infinity. Default: 30 | + +##### RepairStep + +| Name | Required | Type | Description | +| ------------------------- | -------- | ----- | -------------------------------------------------------------------------------------------------------------------------------- | +| `repair_command` | true | array | A command and its arguments to repair the target machine. List of strings. | +| `command_timeout_seconds` | false | \*int | Deadline for repairing. Zero means infinity. Default: 30 | +| `command_retries` | false | \*int | Number of repair retries, not including initial attempt. Default: 0 | +| `command_interval` | false | \*int | Interval of time between repair retries in seconds. Default: 0 | +| `need_drain` | false | bool | If true, perform drain of Pods on the target machine prior to the execution of the repair command. Default: false | +| `watch_seconds` | false | \*int | Follow-up duration in seconds to watch whether the machine becomes healthy after the execution of the repair command. Default: 0 | + Options ------- diff --git a/docs/metrics.md b/docs/metrics.md index f1b8a01d..79329145 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -3,20 +3,23 @@ Metrics CKE exposes the following metrics with the Prometheus format at `/metrics` REST API endpoint. All these metrics are prefixed with `cke_` -| Name | Description | Type | Labels | -| ------------------------------------- | -------------------------------------------------------------------------- | ----- | ---------------- | -| leader | True (=1) if this server is the leader of CKE. | Gauge | | -| node_reboot_status | The reboot status of a node. | Gauge | `node`, `status` | -| operation_phase | 1 if CKE is operating in the phase specified by the `phase` label. | Gauge | `phase` | -| operation_phase_timestamp_seconds | The Unix timestamp when `operation_phase` was last updated. | Gauge | | -| reboot_queue_enabled | True (=1) if reboot queue is enabled. | Gauge | | -| reboot_queue_entries | The number of reboot queue entries remaining. | Gauge | | -| reboot_queue_items | The number reboot queue entries remaining per status. | Gauge | `status` | -| reboot_queue_running | True (=1) if reboot queue is running.
| Gauge | | -| sabakan_integration_successful | True (=1) if sabakan-integration satisfies constraints. | Gauge | | -| sabakan_integration_timestamp_seconds | The Unix timestamp when `sabakan_integration_successful` was last updated. | Gauge | | -| sabakan_workers | The number of worker nodes for each role. | Gauge | `role` | -| sabakan_unused_machines | The number of unused machines. | Gauge | | +| Name | Description | Type | Labels | +| ------------------------------------- | -------------------------------------------------------------------------- | ----- | ------------------- | +| leader | True (=1) if this server is the leader of CKE. | Gauge | | +| node_reboot_status | The reboot status of a node. | Gauge | `node`, `status` | +| machine_repair_status | The repair status of a machine. | Gauge | `address`, `status` | +| operation_phase | 1 if CKE is operating in the phase specified by the `phase` label. | Gauge | `phase` | +| operation_phase_timestamp_seconds | The Unix timestamp when `operation_phase` was last updated. | Gauge | | +| reboot_queue_enabled | True (=1) if reboot queue is enabled. | Gauge | | +| reboot_queue_entries | The number of reboot queue entries remaining. | Gauge | | +| reboot_queue_items | The number of reboot queue entries remaining per status. | Gauge | `status` | +| reboot_queue_running | True (=1) if reboot queue is running. | Gauge | | +| repair_queue_enabled | True (=1) if repair queue is enabled. | Gauge | | +| repair_queue_items | The number of repair queue entries remaining per status. | Gauge | `status` | +| sabakan_integration_successful | True (=1) if sabakan-integration satisfies constraints. | Gauge | | +| sabakan_integration_timestamp_seconds | The Unix timestamp when `sabakan_integration_successful` was last updated. | Gauge | | +| sabakan_workers | The number of worker nodes for each role. | Gauge | `role` | +| sabakan_unused_machines | The number of unused machines. | Gauge | | All metrics but `leader` are available only when the server is the leader of CKE. `sabakan_*` metrics are available only when [Sabakan integration](sabakan-integration.md) is enabled. diff --git a/docs/repair.md b/docs/repair.md new file mode 100644 index 00000000..906399b5 --- /dev/null +++ b/docs/repair.md @@ -0,0 +1,137 @@ +Repair Machines +=============== + +CKE provides the functionality of managing operation steps to repair a machine. +This functionality has similar data structures and configuration parameters to the [reboot functionality](reboot.md). + +Unlike other functionalities of CKE, this repair functionality manages a specified machine even if the machine is not a member of the Kubernetes cluster. + +Description +----------- + +First see the [description of the reboot functionality](reboot.md#description). +The behavior of the repair functionality is almost the same as that of the reboot functionality. +A significant difference is that the repair functionality issues a series of repair commands instead of one reboot command. + +An administrator can request CKE to repair a machine via `ckecli repair-queue add OPERATION MACHINE_TYPE ADDRESS`. +The request is appended to the repair queue. +Each request entry corresponds to a machine. + +The command `ckecli repair-queue add` takes two extra arguments in addition to the IP address of the target machine: the operation name and the type of the target machine. + +CKE watches the repair queue and handles the repair requests. +CKE processes a repair request in the following manner: +1.
determines a sequence of repair steps to apply according to the machine type and the operation name. +2. executes the steps sequentially: + 1. if Pod eviction is required in the step and the machine is used as a Node of the Kubernetes cluster: + 1. cordons the Node to mark it as unschedulable. + 2. checks the existence of Job-managed Pods on the Node. If such Pods exist on the Node, uncordons the Node immediately and processes it again later. + 3. evicts (and/or deletes) non-DaemonSet-managed Pods on the Node. + 2. executes a repair command specified in the step. + 3. watches whether the machine becomes healthy by running a check command specified for the machine type. + 4. if the node becomes healthy, uncordons the node, recovers it, and finishes repairing. +3. if the node is not healthy even after all steps are executed, marks the entry as failed. + +Unlike the reboot queue, repair queue entries remain in the queue even after they have finished, whether they succeeded or failed. +An administrator can delete a finished queue entry by `ckecli repair-queue delete INDEX`. + +Data Schema +----------- + +### `RepairQueueEntry` + +| Name | Type | Description | +| ---------------------- | --------- | ---------------------------------------------------------------- | +| `index` | string | Index number of the entry, formatted as a string. | +| `address` | string | Address of the machine to be repaired. | +| `nodename` | string | Name of the Kubernetes Node corresponding to the target machine. | +| `machine_type` | string | Type name of the target machine. | +| `operation` | string | Operation name to be applied for the target machine. | +| `status` | string | One of `queued`, `processing`, `succeeded`, `failed`. | +| `step` | int | Index number of the current step. | +| `step_status` | string | One of `waiting`, `draining`, `watching`. | +| `last_transition_time` | time.Time | Time of the last transition of `status`+`step`+`step_status`. | +| `drain_backoff_count` | int | Count of drain retries, used for linear backoff algorithm. | +| `drain_backoff_expire` | time.Time | Expiration time of drain retry wait. | + +Detailed Behavior and Parameters +-------------------------------- + +The behavior of the repair functionality is configurable mainly through the [cluster configuration](cluster.md#repair). +The following are the detailed descriptions of the repair functionality and its parameters. + +### Repair configuration + +`repair_procedures` is a list of [repair procedures](#repairprocedures) for various types of machines. +CKE selects an appropriate repair procedure according to the `TYPE` parameter specified in the `ckecli repair-queue add` command line. + +At most `max_concurrent_repairs` entries are repaired concurrently. + +Other parameters under `repair` are used for [Pod eviction](#podeviction). + +#### Pod eviction + +If Pod eviction is required in the current repair step of the selected repair procedure, CKE tries to delete Pods running on the target machine before it executes a repair command. + +If a Pod to be deleted belongs to one of the Namespaces selected by `protected_namespaces`, CKE tries to delete that Pod gracefully with the Kubernetes Eviction API. +If `protected_namespaces` is not given, all namespaces are protected. + +If the Eviction API call has failed, i.e., if CKE fails to start the Pod deletion, CKE retries it up to `evict_retries` times with an `evict_interval`-second interval.
+If CKE finally fails to start the Pod deletion, it interrupts the deletion, uncordons the target machine, waits for a while using a linear backoff algorithm, and retries the deletion. + +Once CKE succeeds in starting the Pod deletion for all Pods, it waits for the completion of the deletion. +If the Pod deletion does not finish during `eviction_timeout_seconds`, CKE interrupts the deletion, uncordons the target machine, waits for a while using a linear backoff algorithm, and retries the deletion. +The number of retries of the whole process of the Pod deletion is unlimited. + +### Repair procedures + +A repair procedure is a set of [repair operations](#repairoperations) for a certain type of machine. + +`machine_types` is a list of machine type names. +CKE decides to apply a repair procedure if its `machine_types` contains `TYPE` of a repair queue entry, where `TYPE` is specified in the `ckecli repair-queue add` command line. + +`repair_operations` maps operation names to [repair operations](#repairoperations). +More precisely, this is not implemented as a mapping but as a list for readability; each element has its name as a property. +CKE decides to execute a repair operation if its name matches `OPERATION` of a repair queue entry, where `OPERATION` is specified in the `ckecli repair-queue add` command line. + +### Repair operations + +A repair operation is a sequence of [repair steps](#repairsteps) and their parameters. + +`operation` is the name of a repair operation. +CKE decides to execute a repair operation if its `operation` matches `OPERATION` of a repair queue entry, where `OPERATION` is specified in the `ckecli repair-queue add` command line. + +`repair_steps` is a sequence of [repair steps](#repairsteps). + +`health_check_command` and its timeout are used during the execution of the repair steps. +When CKE executes the check command, it appends the IP address of the target machine to the command. +The command should return a string `true` if it evaluates the machine as healthy. + +### Repair steps + +A repair step is a combination of: +1. (optional) eviction operation +2. repair command +3. (optional) watch duration + +If `need_drain` is true and the target machine is used as a Kubernetes Node, CKE tries to [drain the Node](#podeviction) before starting a repair command. + +`repair_command` is a command to repair a machine. +When CKE executes the repair command, it appends the IP address of the target machine to the command. +If the command fails, CKE changes the status of the queue entry to `failed` and aborts the repair steps. + +After executing `repair_command`, CKE watches whether the machine becomes healthy. +If the health check command returns `true`, CKE finishes repairing and changes the status of the queue entry to `succeeded`. +If the command does not return `true` during `watch_seconds`, CKE proceeds to the next step if one exists. +If CKE reaches the end of the steps, it changes the status of the queue entry to `failed`. + +Enabling/Disabling +------------------ + +An administrator can enable/disable the processing of the repair queue by `ckecli repair-queue enable|disable`.
+If the queue is disabled, CKE: +* does not proceed to the [Pod eviction](#podeviction) nor the repair command execution +* abandons ongoing Pod eviction +* still runs health check commands and migrates entries to succeeded/failed +* still dequeues entries if instructed by `ckecli repair-queue delete` diff --git a/metrics/collector.go b/metrics/collector.go index 6efe25ed..accd29ec 100644 --- a/metrics/collector.go +++ b/metrics/collector.go @@ -42,6 +42,8 @@ type storage interface { IsRebootQueueDisabled(ctx context.Context) (bool, error) IsRebootQueueRunning(ctx context.Context) (bool, error) GetRebootsEntries(ctx context.Context) ([]*cke.RebootQueueEntry, error) + IsRepairQueueDisabled(ctx context.Context) (bool, error) + GetRepairsEntries(ctx context.Context) ([]*cke.RepairQueueEntry, error) GetCluster(ctx context.Context) (*cke.Cluster, error) } @@ -58,9 +60,9 @@ func NewCollector(storage storage) prometheus.Collector { collectors: []prometheus.Collector{operationPhase, operationPhaseTimestampSeconds}, isAvailable: isOperationPhaseAvailable, }, - "reboot": { + "node": { collectors: []prometheus.Collector{nodeMetricsCollector{storage}}, - isAvailable: isRebootAvailable, + isAvailable: isNodeAvailable, }, "sabakan_integration": { collectors: []prometheus.Collector{sabakanIntegrationSuccessful, sabakanIntegrationTimestampSeconds, sabakanWorkers, sabakanUnusedMachines}, @@ -138,9 +140,18 @@ func (c nodeMetricsCollector) Describe(ch chan<- *prometheus.Desc) { ch <- rebootQueueItems ch <- rebootQueueRunning ch <- nodeRebootStatus + + ch <- repairQueueEnabled + ch <- repairQueueItems + ch <- machineRepairStatus } func (c nodeMetricsCollector) Collect(ch chan<- prometheus.Metric) { + c.collectReboot(ch) + c.collectRepair(ch) +} + +func (c nodeMetricsCollector) collectReboot(ch chan<- prometheus.Metric) { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() @@ -225,3 +236,67 @@ func (c nodeMetricsCollector) Collect(ch chan<- prometheus.Metric) { } } } + +func (c nodeMetricsCollector) collectRepair(ch chan<- prometheus.Metric) { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + disabled, err := c.storage.IsRepairQueueDisabled(ctx) + if err != nil { + log.Error("failed to get if repair queue is enabled", map[string]interface{}{ + log.FnError: err, + }) + return + } + var enabled float64 + if !disabled { + enabled = 1 + } + + entries, err := c.storage.GetRepairsEntries(ctx) + if err != nil { + log.Error("failed to get repairs entries", map[string]interface{}{ + log.FnError: err, + }) + return + } + + cluster, err := c.storage.GetCluster(ctx) + if err != nil { + log.Error("failed to get cluster", map[string]interface{}{ + log.FnError: err, + }) + return + } + itemCounts := cke.CountRepairQueueEntries(entries) + machineStatus := cke.BuildMachineRepairStatus(cluster.Nodes, entries) + + ch <- prometheus.MustNewConstMetric( + repairQueueEnabled, + prometheus.GaugeValue, + enabled, + ) + for status, count := range itemCounts { + ch <- prometheus.MustNewConstMetric( + repairQueueItems, + prometheus.GaugeValue, + float64(count), + status, + ) + } + for address, statuses := range machineStatus { + for status, matches := range statuses { + value := float64(0) + if matches { + value = 1 + } + ch <- prometheus.MustNewConstMetric( + machineRepairStatus, + prometheus.GaugeValue, + value, + address, + status, + ) + } + } +} diff --git a/metrics/metrics.go b/metrics/metrics.go index dd3397a6..9aba8b56 100644 --- a/metrics/metrics.go 
+++ b/metrics/metrics.go @@ -68,6 +68,27 @@ var nodeRebootStatus = prometheus.NewDesc( nil, ) +var repairQueueEnabled = prometheus.NewDesc( + prometheus.BuildFQName(namespace, "", "repair_queue_enabled"), + "1 if repair queue is enabled.", + nil, + nil, +) + +var repairQueueItems = prometheus.NewDesc( + prometheus.BuildFQName(namespace, "", "repair_queue_items"), + "The number of repair queue entries remaining per status.", + []string{"status"}, + nil, +) + +var machineRepairStatus = prometheus.NewDesc( + prometheus.BuildFQName(namespace, "", "machine_repair_status"), + "The repair status of a machine.", + []string{"address", "status"}, + nil, +) + var sabakanIntegrationSuccessful = prometheus.NewGauge( prometheus.GaugeOpts{ Namespace: namespace, diff --git a/metrics/updater.go b/metrics/updater.go index e805b04d..b8a1e1fc 100644 --- a/metrics/updater.go +++ b/metrics/updater.go @@ -39,7 +39,7 @@ func isOperationPhaseAvailable(_ context.Context, _ storage) (bool, error) { return isLeader, nil } -func isRebootAvailable(_ context.Context, _ storage) (bool, error) { +func isNodeAvailable(_ context.Context, _ storage) (bool, error) { return isLeader, nil } diff --git a/metrics/updater_test.go b/metrics/updater_test.go index d9fdba04..86ea785d 100644 --- a/metrics/updater_test.go +++ b/metrics/updater_test.go @@ -57,6 +57,23 @@ type updateRebootQueueItemsTestCase struct { expected map[string]float64 } +type repairInput struct { + enabled bool + entries []*cke.RepairQueueEntry +} + +type repairExpected struct { + enabled float64 + items map[string]float64 + machineStatus map[string]map[string]float64 +} + +type repairTestCase struct { + name string + input repairInput + expected repairExpected +} + type sabakanInput struct { isLeader bool enabled bool @@ -84,6 +101,7 @@ func TestMetricsUpdater(t *testing.T) { t.Run("UpdateRebootQueueEntries", testUpdateRebootQueueEntries) t.Run("UpdateRebootQueueItems", testUpdateRebootQueueItems) t.Run("UpdateNodeRebootStatus", testUpdateNodeRebootStatus) + t.Run("UpdateRepair", testRepair) t.Run("UpdateSabakanIntegration", testUpdateSabakanIntegration) } @@ -528,6 +546,177 @@ func testUpdateNodeRebootStatus(t *testing.T) { } } +func testRepair(t *testing.T) { + cluster := &cke.Cluster{ + Nodes: []*cke.Node{ + { + Address: "1.1.1.1", + Hostname: "node1", + }, + { + Address: "2.2.2.2", + Hostname: "node2", + }, + }, + } + + testCases := []repairTestCase{ + { + name: "disabled", + input: repairInput{ + enabled: false, + entries: nil, + }, + expected: repairExpected{ + enabled: 0, + items: map[string]float64{ + "queued": 0, + "processing": 0, + "succeeded": 0, + "failed": 0, + }, + machineStatus: map[string]map[string]float64{ + "1.1.1.1": { + "queued": 0, + "processing": 0, + "succeeded": 0, + "failed": 0, + }, + "2.2.2.2": { + "queued": 0, + "processing": 0, + "succeeded": 0, + "failed": 0, + }, + }, + }, + }, + { + name: "enabled", + input: repairInput{ + enabled: true, + entries: []*cke.RepairQueueEntry{ + { + Address: "1.1.1.1", + Status: cke.RepairStatusSucceeded, + }, + { + Address: "10.10.10.10", + Status: cke.RepairStatusSucceeded, + }, + { + Address: "10.10.10.11", + Status: cke.RepairStatusProcessing, + }, + }, + }, + expected: repairExpected{ + enabled: 1, + items: map[string]float64{ + "queued": 0, + "processing": 1, + "succeeded": 2, + "failed": 0, + }, + machineStatus: map[string]map[string]float64{ + "1.1.1.1": { + "queued": 0, + "processing": 0, + "succeeded": 1, + "failed": 0, + }, + "2.2.2.2": { + "queued": 0, + "processing": 0, + "succeeded": 
0, + "failed": 0, + }, + "10.10.10.10": { + "queued": 0, + "processing": 0, + "succeeded": 1, + "failed": 0, + }, + "10.10.10.11": { + "queued": 0, + "processing": 1, + "succeeded": 0, + "failed": 0, + }, + }, + }, + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + defer ctx.Done() + + collector, storage := newTestCollector() + storage.setCluster(cluster) + storage.enableRepairQueue(tt.input.enabled) + storage.setRepairsEntries(tt.input.entries) + handler := GetHandler(collector) + + w := httptest.NewRecorder() + req := httptest.NewRequest("GET", "/metrics", nil) + handler.ServeHTTP(w, req) + + metricsFamily, err := parseMetrics(w.Result()) + if err != nil { + t.Fatal(err) + } + + metricsEnabledFound := false + metricsItems := make(map[string]float64) + metricsStatus := make(map[string]map[string]float64) + for _, mf := range metricsFamily { + switch *mf.Name { + case "cke_repair_queue_enabled": + for _, m := range mf.Metric { + metricsEnabledFound = true + if *m.Gauge.Value != tt.expected.enabled { + t.Errorf("value for cke_repair_queue_enabled is wrong. expected: %f, actual %f", tt.expected.enabled, *m.Gauge.Value) + } + } + case "cke_repair_queue_items": + for _, m := range mf.Metric { + labels := labelToMap(m.Label) + if len(labels) != 1 { + t.Error("cke_repair_queue_items should have exactly one label", labels) + } + status := labels["status"] + metricsItems[status] = *m.Gauge.Value + } + case "cke_machine_repair_status": + for _, m := range mf.Metric { + labels := labelToMap(m.Label) + if len(labels) != 2 { + t.Error("cke_machine_repair_status should have exactly two labels", labels) + } + address := labels["address"] + status := labels["status"] + if _, ok := metricsStatus[address]; !ok { + metricsStatus[address] = make(map[string]float64) + } + metricsStatus[address][status] = *m.Gauge.Value + } + } + } + if !metricsEnabledFound { + t.Error("metrics cke_repair_queue_enabled was not found") + } + if !cmp.Equal(metricsItems, tt.expected.items) { + t.Errorf("metrics cke_repair_queue_items is wrong. expected: %v, actual: %v", tt.expected.items, metricsItems) + } + if !cmp.Equal(metricsStatus, tt.expected.machineStatus) { + t.Errorf("metrics cke_machine_repair_status is wrong. 
expected: %v, actual: %v", tt.expected.machineStatus, metricsStatus) + } + }) + } +} + func testUpdateSabakanIntegration(t *testing.T) { testCases := []updateSabakanIntegrationTestCase{ { @@ -691,6 +880,8 @@ type testStorage struct { rebootQueueEnabled bool rebootQueueRunning bool rebootEntries []*cke.RebootQueueEntry + repairQueueEnabled bool + repairEntries []*cke.RepairQueueEntry cluster *cke.Cluster } @@ -726,6 +917,22 @@ func (s *testStorage) GetRebootsEntries(ctx context.Context) ([]*cke.RebootQueue return s.rebootEntries, nil } +func (s *testStorage) enableRepairQueue(flag bool) { + s.repairQueueEnabled = flag +} + +func (s *testStorage) IsRepairQueueDisabled(_ context.Context) (bool, error) { + return !s.repairQueueEnabled, nil +} + +func (s *testStorage) setRepairsEntries(entries []*cke.RepairQueueEntry) { + s.repairEntries = entries +} + +func (s *testStorage) GetRepairsEntries(_ context.Context) ([]*cke.RepairQueueEntry, error) { + return s.repairEntries, nil +} + func (s *testStorage) setCluster(cluster *cke.Cluster) { s.cluster = cluster } diff --git a/mtest/assets_test.go b/mtest/assets_test.go index 23e7d014..0c6a1bcc 100644 --- a/mtest/assets_test.go +++ b/mtest/assets_test.go @@ -20,5 +20,8 @@ var rebootSlowEvictionDeploymentYAML []byte //go:embed reboot-alittleslow-eviction-deployment.yaml var rebootALittleSlowEvictionDeploymentYAML []byte +//go:embed repair-deployment.yaml +var repairDeploymentYAML []byte + //go:embed webhook-resources.yaml var webhookYAML []byte diff --git a/mtest/cke-cluster.yml b/mtest/cke-cluster.yml index dc71ce00..fe56e9d2 100644 --- a/mtest/cke-cluster.yml +++ b/mtest/cke-cluster.yml @@ -18,6 +18,16 @@ reboot: boot_check_command: ["bash", "-c", "echo 'true'"] eviction_timeout_seconds: 30 command_timeout_seconds: 30 +repair: + repair_procedures: + - machine_types: ["type1"] + repair_operations: + - operation: "op1" + repair_steps: + - repair_command: ["sh", "-c", "touch /tmp/mtest-repair-$1", "repair"] + need_drain: true + watch_seconds: 30 + health_check_command: ["sh", "-c", "test -f /tmp/mtest-repair-$1 && echo true", "health_check"] options: kube-api: extra_binds: diff --git a/mtest/repair-deployment.yaml b/mtest/repair-deployment.yaml new file mode 100644 index 00000000..1a8db810 --- /dev/null +++ b/mtest/repair-deployment.yaml @@ -0,0 +1,29 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + namespace: repair-test + name: sample +spec: + replicas: 3 + selector: + matchLabels: + app: sample + template: + metadata: + labels: + app: sample + spec: + containers: + - name: httpd + image: ghcr.io/cybozu/testhttpd:0 +--- +apiVersion: policy/v1 +kind: PodDisruptionBudget +metadata: + namespace: repair-test + name: sample +spec: + maxUnavailable: 0 + selector: + matchLabels: + app: sample diff --git a/mtest/repair_test.go b/mtest/repair_test.go new file mode 100644 index 00000000..1b6bd32b --- /dev/null +++ b/mtest/repair_test.go @@ -0,0 +1,254 @@ +package mtest + +import ( + "bytes" + "encoding/json" + "fmt" + "strconv" + "time" + + "github.com/cybozu-go/cke" + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func getRepairEntries() ([]*cke.RepairQueueEntry, error) { + var entries []*cke.RepairQueueEntry + data, stderr, err := ckecli("repair-queue", "list") + if err != nil { + return nil, fmt.Errorf("%w, stdout: %s, stderr: %s", err, data, stderr) + } + err = json.Unmarshal(data, &entries) + if err != nil { + return nil, err + } + return entries, nil +} + +func waitRepairCompletion(cluster *cke.Cluster, statuses []cke.RepairStatus) { + ts := time.Now() + EventuallyWithOffset(2, func(g Gomega) { + entries, err := getRepairEntries() + g.Expect(err).NotTo(HaveOccurred()) + for _, entry := range entries { + g.Expect(entry.Status).To(BeElementOf(statuses)) + } + g.Expect(checkCluster(cluster, ts)).NotTo(HaveOccurred()) + }).Should(Succeed()) +} + +func waitRepairSuccess(cluster *cke.Cluster) { + waitRepairCompletion(cluster, []cke.RepairStatus{cke.RepairStatusSucceeded}) +} + +func waitRepairFailure(cluster *cke.Cluster) { + waitRepairCompletion(cluster, []cke.RepairStatus{cke.RepairStatusFailed}) +} + +func waitRepairEmpty(cluster *cke.Cluster) { + waitRepairCompletion(cluster, nil) +} + +func repairShouldNotProceed() { + ConsistentlyWithOffset(1, func(g Gomega) { + entries, err := getRepairEntries() + g.Expect(err).NotTo(HaveOccurred()) + for _, entry := range entries { + g.Expect(entry.Status).NotTo(BeElementOf(cke.RepairStatusSucceeded, cke.RepairStatusFailed)) + } + }).WithTimeout(time.Second * 60).Should(Succeed()) +} + +func testRepairOperations() { + // this will run: + // - RepairDrainStartOp + // - RepairExecuteOp + // - RepairDrainTimeoutOp + // - RepairFinishOp + // - RepairDequeueOp + + // This test examines status gathering and CLI commands as well as operations. + // It is not necessary to test the behaviors examined in "server/strategy_test.go". + + // This test uses "touch" and "test -f" for repair_command and health_check_command. + // "true" and "echo true" are insufficient for repair queue test because + // CKE first checks health and never calls "RepairDrainStartOp" for healthy machines. 
+ It("should execute repair commands", func() { + cluster := getCluster() + for i := 0; i < 3; i++ { + cluster.Nodes[i].ControlPlane = true + } + + currentWriteIndex := 0 + repairQueueAdd := func(address string) { + execSafeAt(host1, "docker", "exec", "cke", "find", "/tmp", "-maxdepth", "1", "-name", "mtest-repair-*", "-delete") + execSafeAt(host2, "docker", "exec", "cke", "find", "/tmp", "-maxdepth", "1", "-name", "mtest-repair-*", "-delete") + _, stderr, err := ckecli("repair-queue", "add", "op1", "type1", address) + ExpectWithOffset(1, err).NotTo(HaveOccurred(), "stderr: %s", stderr) + currentWriteIndex++ + } + + By("disabling repair queue") + ckecliSafe("repair-queue", "disable") + stdout := ckecliSafe("repair-queue", "is-enabled") + Expect(bytes.TrimSpace(stdout)).To(Equal([]byte("false"))) + + repairQueueAdd(node1) + repairShouldNotProceed() + + ckecliSafe("repair-queue", "delete-unfinished") + waitRepairEmpty(cluster) + + By("enabling repair queue") + ckecliSafe("repair-queue", "enable") + stdout = ckecliSafe("repair-queue", "is-enabled") + Expect(bytes.TrimSpace(stdout)).To(Equal([]byte("true"))) + + repairQueueAdd(node1) + waitRepairSuccess(cluster) + nodesShouldBeSchedulable(node1) + + ckecliSafe("repair-queue", "delete-finished") + waitRepairEmpty(cluster) + + By("setting erroneous repair command") + originalRepairCommand := cluster.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].RepairCommand + + cluster.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].RepairCommand = []string{"false"} + _, err := ckecliClusterSet(cluster) + Expect(err).NotTo(HaveOccurred()) + time.Sleep(time.Second * 3) + + repairQueueAdd(node1) + waitRepairFailure(cluster) + + ckecliSafe("repair-queue", "delete-finished") + waitRepairEmpty(cluster) + + By("setting noop repair command") + cluster.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].RepairCommand = []string{"true"} + _, err = ckecliClusterSet(cluster) + Expect(err).NotTo(HaveOccurred()) + time.Sleep(time.Second * 3) + + repairQueueAdd(node1) + waitRepairFailure(cluster) + + ckecliSafe("repair-queue", "delete-finished") + waitRepairEmpty(cluster) + + By("setting noop repair command and long watch duration") + originalWatchSeconds := cluster.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].WatchSeconds + + longWatch := 600 + cluster.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].WatchSeconds = &longWatch + _, err = ckecliClusterSet(cluster) + Expect(err).NotTo(HaveOccurred()) + time.Sleep(time.Second * 3) + + repairQueueAdd(node1) + repairShouldNotProceed() + + ckecliSafe("repair-queue", "delete", strconv.Itoa(currentWriteIndex-1)) + waitRepairEmpty(cluster) + + By("restoring repair command and watch duration") + cluster.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].RepairCommand = originalRepairCommand + cluster.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].WatchSeconds = originalWatchSeconds + _, err = ckecliClusterSet(cluster) + Expect(err).NotTo(HaveOccurred()) + time.Sleep(time.Second * 3) + + By("deploying drain-blocking workload") + _, stderr, err := kubectl("create", "namespace", "repair-test") + Expect(err).NotTo(HaveOccurred(), "stderr: %s", stderr) + _, stderr, err = kubectl("label", "namespace", "repair-test", "protected=true") + Expect(err).NotTo(HaveOccurred(), "stderr: %s", stderr) + _, stderr, err = kubectlWithInput(repairDeploymentYAML, "apply", "-f", "-") + Expect(err).NotTo(HaveOccurred(), "stderr: %s", stderr) + nodeNames := 
make([]string, 3) + Eventually(func(g Gomega) { + stdout, stderr, err := kubectl("get", "-n=repair-test", "deployment", "sample", "-o=json") + g.Expect(err).NotTo(HaveOccurred(), "stderr: %s", stderr) + var deploy appsv1.Deployment + err = json.Unmarshal(stdout, &deploy) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(deploy.Status.ReadyReplicas).To(Equal(int32(3))) + + stdout, stderr, err = kubectl("get", "-n=repair-test", "pod", "-l=app=sample", "-o=json") + g.Expect(err).NotTo(HaveOccurred(), "stderr: %s", stderr) + var pods corev1.PodList + err = json.Unmarshal(stdout, &pods) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(pods.Items).To(HaveLen(3)) + for i, pod := range pods.Items { + nodeNames[i] = pod.Spec.NodeName + g.Expect(nodeNames[i]).NotTo(BeEmpty()) + } + }).Should(Succeed()) + + repairQueueAdd(nodeNames[0]) + repairShouldNotProceed() + + entries, err := getRepairEntries() + Expect(err).NotTo(HaveOccurred()) + Expect(entries).To(HaveLen(1)) + Expect(entries[0].Status).To(Equal(cke.RepairStatusProcessing)) + Expect(entries[0].StepStatus).To(Equal(cke.RepairStepStatusWaiting)) + Expect(entries[0].DrainBackOffExpire).NotTo(Equal(time.Time{})) + Expect(entries[0].DrainBackOffCount).NotTo(BeZero()) + + ckecliSafe("repair-queue", "reset-backoff") + entries, err = getRepairEntries() + Expect(err).NotTo(HaveOccurred()) + Expect(entries).To(HaveLen(1)) + Expect(entries[0].DrainBackOffExpire).To(Equal(time.Time{})) + Expect(entries[0].DrainBackOffCount).To(BeZero()) + + ckecliSafe("repair-queue", "delete-unfinished") + waitRepairEmpty(cluster) + + By("setting protected_namespace to include workload") + cluster.Repair.ProtectedNamespaces = &metav1.LabelSelector{ + MatchLabels: map[string]string{"protected": "true"}, + } + _, err = ckecliClusterSet(cluster) + Expect(err).NotTo(HaveOccurred()) + time.Sleep(time.Second * 3) + + repairQueueAdd(nodeNames[0]) + repairShouldNotProceed() + + ckecliSafe("repair-queue", "delete-unfinished") + waitRepairEmpty(cluster) + + By("setting protected_namespace not to include workload") + cluster.Repair.ProtectedNamespaces = &metav1.LabelSelector{ + MatchLabels: map[string]string{"foo": "bar"}, + } + _, err = ckecliClusterSet(cluster) + Expect(err).NotTo(HaveOccurred()) + time.Sleep(time.Second * 3) + + repairQueueAdd(nodeNames[0]) + waitRepairSuccess(cluster) + nodesShouldBeSchedulable(nodeNames[0]) + + ckecliSafe("repair-queue", "delete-finished") + waitRepairEmpty(cluster) + + By("restoring protected_namespace and disabling need_drain") + cluster.Repair.ProtectedNamespaces = nil + cluster.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].NeedDrain = false + _, err = ckecliClusterSet(cluster) + Expect(err).NotTo(HaveOccurred()) + time.Sleep(time.Second * 3) + + repairQueueAdd(nodeNames[1]) + waitRepairSuccess(cluster) + nodesShouldBeSchedulable(nodeNames[1]) + }) +} diff --git a/mtest/suite_test.go b/mtest/suite_test.go index 7f70a801..786ce348 100644 --- a/mtest/suite_test.go +++ b/mtest/suite_test.go @@ -153,6 +153,10 @@ var _ = Describe("Test CKE", func() { Context("reboot", func() { testRebootOperations() }) + case "repair": + Context("repair", func() { + testRepairOperations() + }) case "upgrade": Context("upgrade", Ordered, func() { testUpgrade() diff --git a/op/repair_dequeue.go b/op/repair_dequeue.go new file mode 100644 index 00000000..52a6c065 --- /dev/null +++ b/op/repair_dequeue.go @@ -0,0 +1,53 @@ +package op + +import ( + "context" + + "github.com/cybozu-go/cke" +) + +type repairDequeueOp struct { + finished bool + + entry 
*cke.RepairQueueEntry +} + +func RepairDequeueOp(entry *cke.RepairQueueEntry) cke.Operator { + return &repairDequeueOp{ + entry: entry, + } +} + +func (o *repairDequeueOp) Name() string { + return "repair-dequeue" +} + +func (o *repairDequeueOp) NextCommand() cke.Commander { + if o.finished { + return nil + } + + o.finished = true + return repairDequeueCommand{ + entry: o.entry, + } +} + +func (o *repairDequeueOp) Targets() []string { + return []string{o.entry.Address} +} + +type repairDequeueCommand struct { + entry *cke.RepairQueueEntry +} + +func (c repairDequeueCommand) Run(ctx context.Context, inf cke.Infrastructure, leaderKey string) error { + return inf.Storage().DeleteRepairsEntry(ctx, leaderKey, c.entry.Index) +} + +func (c repairDequeueCommand) Command() cke.Command { + return cke.Command{ + Name: "repairDequeueCommand", + Target: c.entry.Address, + } +} diff --git a/op/repair_drain_start.go b/op/repair_drain_start.go new file mode 100644 index 00000000..b537146b --- /dev/null +++ b/op/repair_drain_start.go @@ -0,0 +1,125 @@ +package op + +import ( + "context" + "fmt" + "time" + + "github.com/cybozu-go/cke" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" +) + +type repairDrainStartOp struct { + finished bool + + entry *cke.RepairQueueEntry + config *cke.Repair + apiserver *cke.Node +} + +func RepairDrainStartOp(apiserver *cke.Node, entry *cke.RepairQueueEntry, config *cke.Repair) cke.Operator { + return &repairDrainStartOp{ + entry: entry, + config: config, + apiserver: apiserver, + } +} + +func (o *repairDrainStartOp) Name() string { + return "repair-drain-start" +} + +func (o *repairDrainStartOp) NextCommand() cke.Commander { + if o.finished { + return nil + } + o.finished = true + + attempts := 1 + if o.config.EvictRetries != nil { + attempts = *o.config.EvictRetries + 1 + } + interval := 0 * time.Second + if o.config.EvictInterval != nil { + interval = time.Second * time.Duration(*o.config.EvictInterval) + } + + return repairDrainStartCommand{ + entry: o.entry, + protectedNamespaces: o.config.ProtectedNamespaces, + apiserver: o.apiserver, + evictAttempts: attempts, + evictInterval: interval, + } +} + +func (o *repairDrainStartOp) Targets() []string { + return []string{o.entry.Address} +} + +type repairDrainStartCommand struct { + entry *cke.RepairQueueEntry + protectedNamespaces *metav1.LabelSelector + apiserver *cke.Node + evictAttempts int + evictInterval time.Duration +} + +func (c repairDrainStartCommand) Run(ctx context.Context, inf cke.Infrastructure, _ string) error { + cs, err := inf.K8sClient(ctx, c.apiserver) + if err != nil { + return err + } + nodesAPI := cs.CoreV1().Nodes() + + protected, err := listProtectedNamespaces(ctx, cs, c.protectedNamespaces) + if err != nil { + return err + } + + err = func() error { + c.entry.Status = cke.RepairStatusProcessing + c.entry.StepStatus = cke.RepairStepStatusDraining + c.entry.LastTransitionTime = time.Now().Truncate(time.Second).UTC() + err := inf.Storage().UpdateRepairsEntry(ctx, c.entry) + if err != nil { + return err + } + + err = checkJobPodNotExist(ctx, cs, c.entry.Nodename) + if err != nil { + return err + } + + // Note: The annotation name is shared with reboot operations. 
+ _, err = nodesAPI.Patch(ctx, c.entry.Nodename, types.StrategicMergePatchType, []byte(` +{ + "metadata":{"annotations":{"`+CKEAnnotationReboot+`": "true"}}, + "spec":{"unschedulable": true} +} +`), metav1.PatchOptions{}) + if err != nil { + return fmt.Errorf("failed to cordon node %s: %v", c.entry.Address, err) + } + + return nil + }() + if err != nil { + return repairDrainBackOff(ctx, inf, c.entry, err) + } + + err = evictOrDeleteNodePod(ctx, cs, c.entry.Nodename, protected, c.evictAttempts, c.evictInterval) + if err != nil { + return repairDrainBackOff(ctx, inf, c.entry, err) + } + + return nil +} + +func (c repairDrainStartCommand) Command() cke.Command { + return cke.Command{ + Name: "repairDrainStartCommand", + Target: c.entry.Address, + } +} diff --git a/op/repair_drain_timeout.go b/op/repair_drain_timeout.go new file mode 100644 index 00000000..4bf6a45f --- /dev/null +++ b/op/repair_drain_timeout.go @@ -0,0 +1,70 @@ +package op + +import ( + "context" + "fmt" + "math/rand" + "time" + + "github.com/cybozu-go/cke" + "github.com/cybozu-go/log" +) + +type repairDrainTimeoutOp struct { + finished bool + + entry *cke.RepairQueueEntry +} + +func RepairDrainTimeoutOp(entry *cke.RepairQueueEntry) cke.Operator { + return &repairDrainTimeoutOp{ + entry: entry, + } +} + +func (o *repairDrainTimeoutOp) Name() string { + return "repair-drain-timeout" +} + +func (o *repairDrainTimeoutOp) NextCommand() cke.Commander { + if o.finished { + return nil + } + o.finished = true + + return repairDrainTimeoutCommand{ + entry: o.entry, + } +} + +func (o *repairDrainTimeoutOp) Targets() []string { + return []string{o.entry.Address} +} + +type repairDrainTimeoutCommand struct { + entry *cke.RepairQueueEntry +} + +func (c repairDrainTimeoutCommand) Run(ctx context.Context, inf cke.Infrastructure, _ string) error { + return repairDrainBackOff(ctx, inf, c.entry, fmt.Errorf("drain timed out: %s", c.entry.Address)) +} + +func (c repairDrainTimeoutCommand) Command() cke.Command { + return cke.Command{ + Name: "repairDrainTimeoutCommand", + Target: c.entry.Address, + } +} + +func repairDrainBackOff(ctx context.Context, inf cke.Infrastructure, entry *cke.RepairQueueEntry, err error) error { + log.Warn("failed to drain node for repair", map[string]interface{}{ + "address": entry.Address, + log.FnError: err, + }) + entry.Status = cke.RepairStatusProcessing + entry.StepStatus = cke.RepairStepStatusWaiting + entry.LastTransitionTime = time.Now().Truncate(time.Second).UTC() + entry.DrainBackOffCount++ + entry.DrainBackOffExpire = entry.LastTransitionTime.Add(time.Second * time.Duration(drainBackOffBaseSeconds+rand.Int63n(int64(drainBackOffBaseSeconds*entry.DrainBackOffCount)))) + return inf.Storage().UpdateRepairsEntry(ctx, entry) +} diff --git a/op/repair_execute.go b/op/repair_execute.go new file mode 100644 index 00000000..d855551a --- /dev/null +++ b/op/repair_execute.go @@ -0,0 +1,121 @@ +package op + +import ( + "context" + "strings" + "time" + + "github.com/cybozu-go/cke" + "github.com/cybozu-go/log" + "github.com/cybozu-go/well" +) + +type repairExecuteOp struct { + finished bool + + entry *cke.RepairQueueEntry + step *cke.RepairStep +} + +func RepairExecuteOp(entry *cke.RepairQueueEntry, step *cke.RepairStep) cke.Operator { + return &repairExecuteOp{ + entry: entry, + step: step, + } +} + +func (o *repairExecuteOp) Name() string { + return "repair-execute" +} + +func (o *repairExecuteOp) NextCommand() cke.Commander { + if o.finished { + return nil + } + o.finished = true + + return repairExecuteCommand{ + entry: 
o.entry, + command: o.step.RepairCommand, + timeoutSeconds: o.step.CommandTimeoutSeconds, + retries: o.step.CommandRetries, + interval: o.step.CommandInterval, + } +} + +func (o *repairExecuteOp) Targets() []string { + return []string{o.entry.Address} +} + +type repairExecuteCommand struct { + entry *cke.RepairQueueEntry + command []string + timeoutSeconds *int + retries *int + interval *int +} + +func (c repairExecuteCommand) Run(ctx context.Context, inf cke.Infrastructure, _ string) error { + c.entry.Status = cke.RepairStatusProcessing + c.entry.StepStatus = cke.RepairStepStatusWatching + c.entry.LastTransitionTime = time.Now().Truncate(time.Second).UTC() + err := inf.Storage().UpdateRepairsEntry(ctx, c.entry) + if err != nil { + return err + } + + attempts := 1 + if c.retries != nil { + attempts = *c.retries + 1 + } +RETRY: + for i := 0; i < attempts; i++ { + err := func() error { + ctx := ctx + timeout := cke.DefaultRepairCommandTimeoutSeconds + if c.timeoutSeconds != nil { + timeout = *c.timeoutSeconds + } + if timeout != 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, time.Second*time.Duration(timeout)) + defer cancel() + } + + args := append(c.command[1:], c.entry.Address) + command := well.CommandContext(ctx, c.command[0], args...) + return command.Run() + }() + if err == nil { + return nil + } + + log.Warn("failed on executing repair command", map[string]interface{}{ + log.FnError: err, + "address": c.entry.Address, + "command": strings.Join(c.command, " "), + "attempts": i, + }) + if c.interval != nil && *c.interval != 0 { + select { + case <-time.After(time.Second * time.Duration(*c.interval)): + case <-ctx.Done(): + break RETRY + } + } + } + + // The failure of a repair command should not be considered as a serious error of CKE. 
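+	// Log a warning and mark the queue entry as failed instead of propagating the command error.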
+ log.Warn("given up repairing machine", map[string]interface{}{ + "address": c.entry.Address, + "command": strings.Join(c.command, " "), + }) + return repairFinish(ctx, inf, c.entry, false) +} + +func (c repairExecuteCommand) Command() cke.Command { + return cke.Command{ + Name: "repairExecuteCommand", + Target: c.entry.Address, + } +} diff --git a/op/repair_finish.go b/op/repair_finish.go new file mode 100644 index 00000000..fe889dbc --- /dev/null +++ b/op/repair_finish.go @@ -0,0 +1,68 @@ +package op + +import ( + "context" + "time" + + "github.com/cybozu-go/cke" +) + +type repairFinishOp struct { + finished bool + + entry *cke.RepairQueueEntry + succeeded bool +} + +func RepairFinishOp(entry *cke.RepairQueueEntry, succeeded bool) cke.Operator { + return &repairFinishOp{ + entry: entry, + succeeded: succeeded, + } +} + +func (o *repairFinishOp) Name() string { + return "repair-finish" +} + +func (o *repairFinishOp) NextCommand() cke.Commander { + if o.finished { + return nil + } + o.finished = true + + return repairFinishCommand{ + entry: o.entry, + succeeded: o.succeeded, + } +} + +func (o *repairFinishOp) Targets() []string { + return []string{o.entry.Address} +} + +type repairFinishCommand struct { + entry *cke.RepairQueueEntry + succeeded bool +} + +func (c repairFinishCommand) Run(ctx context.Context, inf cke.Infrastructure, _ string) error { + return repairFinish(ctx, inf, c.entry, c.succeeded) +} + +func (c repairFinishCommand) Command() cke.Command { + return cke.Command{ + Name: "repairFinishCommand", + Target: c.entry.Address, + } +} + +func repairFinish(ctx context.Context, inf cke.Infrastructure, entry *cke.RepairQueueEntry, succeeded bool) error { + if succeeded { + entry.Status = cke.RepairStatusSucceeded + } else { + entry.Status = cke.RepairStatusFailed + } + entry.LastTransitionTime = time.Now().Truncate(time.Second).UTC() + return inf.Storage().UpdateRepairsEntry(ctx, entry) +} diff --git a/op/status.go b/op/status.go index 81d27f89..30db44ec 100644 --- a/op/status.go +++ b/op/status.go @@ -10,10 +10,12 @@ import ( "net/url" "strconv" "strings" + "time" "github.com/cybozu-go/cke" "github.com/cybozu-go/cke/static" "github.com/cybozu-go/log" + "github.com/cybozu-go/well" "go.etcd.io/etcd/api/v3/etcdserverpb" clientv3 "go.etcd.io/etcd/client/v3" k8serr "k8s.io/apimachinery/pkg/api/errors" @@ -563,3 +565,102 @@ func containCommandOption(slice []string, optionName string) bool { } return false } + +func GetRepairQueueStatus(ctx context.Context, inf cke.Infrastructure, n *cke.Node, cluster *cke.Cluster) (cke.RepairQueueStatus, error) { + rqs := cke.RepairQueueStatus{} + + clientset, err := inf.K8sClient(ctx, n) + if err != nil { + return cke.RepairQueueStatus{}, err + } + + disabled, err := inf.Storage().IsRepairQueueDisabled(ctx) + if err != nil { + return cke.RepairQueueStatus{}, err + } + rqs.Enabled = !disabled + + entries, err := inf.Storage().GetRepairsEntries(ctx) + if err != nil { + return cke.RepairQueueStatus{}, err + } + rqs.Entries = entries + + for _, entry := range entries { + // Update Nodename every time. + // Though the nodename of a machine in a Kubernetes cluster will not change, + // the machine can join/leave the cluster dynamically. 
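+ // FillNodename resolves the address to the machine's current node name and clears it when the machine is out of the cluster.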
+ entry.FillNodename(cluster) + } + + rqs.RepairCompleted = make(map[string]bool) + for _, entry := range entries { + if entry.HasFinished() { + // not "!(RepairStatusProcessing && repairStepStatusWatching)" + // Though the repair completion will happen a little later after the execution of + // the repair command in most cases, it is useful to check the health for all + // unfinished entries. + continue + } + healthy, err := isRepairTargetHealthy(ctx, entry, cluster) + if err != nil { + log.Warn("health check failed", map[string]interface{}{ + log.FnError: err, + "index": entry.Index, + "address": entry.Address, + }) + continue + } + if healthy { + rqs.RepairCompleted[entry.Address] = true + } + } + + rqs.DrainCompleted = make(map[string]bool) + for _, entry := range entries { + if !(entry.Status == cke.RepairStatusProcessing && entry.StepStatus == cke.RepairStepStatusDraining) { + continue + } + if !entry.IsInCluster() { + // The target machine has been removed from the Kubernetes cluster while being drained. + // The drain operation should be treated as succeeded. + // Unlike the reboot queue, the repair queue continues to manage the out-of-cluster machine. + rqs.DrainCompleted[entry.Address] = true + continue + } + err := checkPodDeletion(ctx, clientset, entry.Nodename) + if err == nil { + rqs.DrainCompleted[entry.Address] = true + } + } + + return rqs, nil +} + +func isRepairTargetHealthy(ctx context.Context, entry *cke.RepairQueueEntry, cluster *cke.Cluster) (bool, error) { + op, err := entry.GetMatchingRepairOperation(cluster) + if err != nil { + return false, err + } + if len(op.HealthCheckCommand) == 0 { + return false, errors.New("health check command not defined") + } + + timeout := cke.DefaultRepairHealthCheckCommandTimeoutSeconds + if op.CommandTimeoutSeconds != nil { + timeout = *op.CommandTimeoutSeconds + } + if timeout != 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, time.Second*time.Duration(timeout)) + defer cancel() + } + + args := append(op.HealthCheckCommand[1:], entry.Address) + command := well.CommandContext(ctx, op.HealthCheckCommand[0], args...) 
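+ // The target address is appended as the last argument of the health check command.
+ // Only a command that prints "true" to stdout marks the machine as healthy, i.e., repaired.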
+ stdout, err := command.Output() + if err != nil { + return false, err + } + return strings.TrimSpace(string(stdout)) == "true", nil +} diff --git a/phase.go b/phase.go index fa73fa6e..db1b248e 100644 --- a/phase.go +++ b/phase.go @@ -18,6 +18,7 @@ const ( PhaseEtcdMaintain = OperationPhase("etcd-maintain") PhaseK8sMaintain = OperationPhase("k8s-maintain") PhaseStopCP = OperationPhase("stop-control-plane") + PhaseRepairMachines = OperationPhase("repair-machines") PhaseUncordonNodes = OperationPhase("uncordon-nodes") PhaseRebootNodes = OperationPhase("reboot-nodes") PhaseCompleted = OperationPhase("completed") @@ -36,6 +37,7 @@ var AllOperationPhases = []OperationPhase{ PhaseEtcdMaintain, PhaseK8sMaintain, PhaseStopCP, + PhaseRepairMachines, PhaseUncordonNodes, PhaseRebootNodes, PhaseCompleted, diff --git a/pkg/ckecli/cmd/repair_queue.go b/pkg/ckecli/cmd/repair_queue.go new file mode 100644 index 00000000..12410598 --- /dev/null +++ b/pkg/ckecli/cmd/repair_queue.go @@ -0,0 +1,15 @@ +package cmd + +import ( + "github.com/spf13/cobra" +) + +var repairQueueCmd = &cobra.Command{ + Use: "repair-queue", + Short: "repair-queue subcommand", + Long: "repair-queue subcommand", +} + +func init() { + rootCmd.AddCommand(repairQueueCmd) +} diff --git a/pkg/ckecli/cmd/repair_queue_add.go b/pkg/ckecli/cmd/repair_queue_add.go new file mode 100644 index 00000000..49ff3405 --- /dev/null +++ b/pkg/ckecli/cmd/repair_queue_add.go @@ -0,0 +1,43 @@ +package cmd + +import ( + "context" + + "github.com/cybozu-go/cke" + "github.com/cybozu-go/well" + "github.com/spf13/cobra" +) + +var repairQueueAddCmd = &cobra.Command{ + Use: "add OPERATION MACHINE_TYPE ADDRESS", + Short: "append a repair request to the repair queue", + Long: `Append a repair request to the repair queue. + +The repair target is a machine with an IP address ADDRESS and a machine type MACHINE_TYPE. 
+The machine should be processed with an operation OPERATION.`, + Args: cobra.ExactArgs(3), + RunE: func(cmd *cobra.Command, args []string) error { + operation := args[0] + machineType := args[1] + address := args[2] + + well.Go(func(ctx context.Context) error { + entry := cke.NewRepairQueueEntry(operation, machineType, address) + cluster, err := storage.GetCluster(ctx) + if err != nil { + return err + } + if _, err := entry.GetMatchingRepairOperation(cluster); err != nil { + return err + } + + return storage.RegisterRepairsEntry(ctx, entry) + }) + well.Stop() + return well.Wait() + }, +} + +func init() { + repairQueueCmd.AddCommand(repairQueueAddCmd) +} diff --git a/pkg/ckecli/cmd/repair_queue_delete.go b/pkg/ckecli/cmd/repair_queue_delete.go new file mode 100644 index 00000000..955ea31b --- /dev/null +++ b/pkg/ckecli/cmd/repair_queue_delete.go @@ -0,0 +1,42 @@ +package cmd + +import ( + "context" + "strconv" + + "github.com/cybozu-go/well" + "github.com/spf13/cobra" +) + +var repairQueueDeleteCmd = &cobra.Command{ + Use: "delete INDEX", + Short: "delete a repair queue entry", + Long: `Delete the specified repair queue entry.`, + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + index, err := strconv.ParseInt(args[0], 10, 64) + if err != nil { + return err + } + + well.Go(func(ctx context.Context) error { + entry, err := storage.GetRepairsEntry(ctx, index) + if err != nil { + return err + } + + if entry.Deleted { + return nil + } + + entry.Deleted = true + return storage.UpdateRepairsEntry(ctx, entry) + }) + well.Stop() + return well.Wait() + }, +} + +func init() { + repairQueueCmd.AddCommand(repairQueueDeleteCmd) +} diff --git a/pkg/ckecli/cmd/repair_queue_delete_finished.go b/pkg/ckecli/cmd/repair_queue_delete_finished.go new file mode 100644 index 00000000..8c00f1d7 --- /dev/null +++ b/pkg/ckecli/cmd/repair_queue_delete_finished.go @@ -0,0 +1,54 @@ +package cmd + +import ( + "context" + "fmt" + + "github.com/cybozu-go/cke" + "github.com/cybozu-go/well" + "github.com/spf13/cobra" +) + +var repairQueueDeleteFinishedCmd = &cobra.Command{ + Use: "delete-finished", + Short: "delete all finished repair queue entries", + Long: `Delete all finished repair queue entries. + +Entries in "succeeded" or "failed" status are deleted. +This displays the index numbers of deleted entries, one per line.`, + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, args []string) error { + well.Go(func(ctx context.Context) error { + entries, err := storage.GetRepairsEntries(ctx) + if err != nil { + return err + } + + for _, entry := range entries { + if entry.Deleted || !entry.HasFinished() { + continue + } + + entry.Deleted = true + err := storage.UpdateRepairsEntry(ctx, entry) + if err == cke.ErrNotFound { + // The entry has just been dequeued. 
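+ // Deleting an already-dequeued entry is a no-op, so skip it silently.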
+ continue + } + if err != nil { + return err + } + + fmt.Println(entry.Index) + } + + return nil + }) + well.Stop() + return well.Wait() + }, +} + +func init() { + repairQueueCmd.AddCommand(repairQueueDeleteFinishedCmd) +} diff --git a/pkg/ckecli/cmd/repair_queue_delete_unfinished.go b/pkg/ckecli/cmd/repair_queue_delete_unfinished.go new file mode 100644 index 00000000..1b7e9ff8 --- /dev/null +++ b/pkg/ckecli/cmd/repair_queue_delete_unfinished.go @@ -0,0 +1,54 @@ +package cmd + +import ( + "context" + "fmt" + + "github.com/cybozu-go/cke" + "github.com/cybozu-go/well" + "github.com/spf13/cobra" +) + +var repairQueueDeleteUnfinishedCmd = &cobra.Command{ + Use: "delete-unfinished", + Short: "delete all unfinished repair queue entries", + Long: `Delete all unfinished repair queue entries. + +Entries not in "succeeded" or "failed" status are deleted. +This displays the index numbers of deleted entries, one per line.`, + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, args []string) error { + well.Go(func(ctx context.Context) error { + entries, err := storage.GetRepairsEntries(ctx) + if err != nil { + return err + } + + for _, entry := range entries { + if entry.Deleted || entry.HasFinished() { + continue + } + + entry.Deleted = true + err := storage.UpdateRepairsEntry(ctx, entry) + if err == cke.ErrNotFound { + // The entry has just been dequeued. + continue + } + if err != nil { + return err + } + + fmt.Println(entry.Index) + } + + return nil + }) + well.Stop() + return well.Wait() + }, +} + +func init() { + repairQueueCmd.AddCommand(repairQueueDeleteUnfinishedCmd) +} diff --git a/pkg/ckecli/cmd/repair_queue_disable.go b/pkg/ckecli/cmd/repair_queue_disable.go new file mode 100644 index 00000000..4756084b --- /dev/null +++ b/pkg/ckecli/cmd/repair_queue_disable.go @@ -0,0 +1,26 @@ +package cmd + +import ( + "context" + + "github.com/cybozu-go/well" + "github.com/spf13/cobra" +) + +var repairQueueDisableCmd = &cobra.Command{ + Use: "disable", + Short: "disable repair queue processing", + Long: `Disable repair queue processing.`, + + RunE: func(cmd *cobra.Command, args []string) error { + well.Go(func(ctx context.Context) error { + return storage.EnableRepairQueue(ctx, false) + }) + well.Stop() + return well.Wait() + }, +} + +func init() { + repairQueueCmd.AddCommand(repairQueueDisableCmd) +} diff --git a/pkg/ckecli/cmd/repair_queue_enable.go b/pkg/ckecli/cmd/repair_queue_enable.go new file mode 100644 index 00000000..77b48351 --- /dev/null +++ b/pkg/ckecli/cmd/repair_queue_enable.go @@ -0,0 +1,26 @@ +package cmd + +import ( + "context" + + "github.com/cybozu-go/well" + "github.com/spf13/cobra" +) + +var repairQueueEnableCmd = &cobra.Command{ + Use: "enable", + Short: "enable repair queue processing", + Long: `Enable repair queue processing.`, + + RunE: func(cmd *cobra.Command, args []string) error { + well.Go(func(ctx context.Context) error { + return storage.EnableRepairQueue(ctx, true) + }) + well.Stop() + return well.Wait() + }, +} + +func init() { + repairQueueCmd.AddCommand(repairQueueEnableCmd) +} diff --git a/pkg/ckecli/cmd/repair_queue_is_enabled.go b/pkg/ckecli/cmd/repair_queue_is_enabled.go new file mode 100644 index 00000000..c8b4b903 --- /dev/null +++ b/pkg/ckecli/cmd/repair_queue_is_enabled.go @@ -0,0 +1,32 @@ +package cmd + +import ( + "context" + "fmt" + + "github.com/cybozu-go/well" + "github.com/spf13/cobra" +) + +var repairQueueIsEnabledCmd = &cobra.Command{ + Use: "is-enabled", + Short: "show repair queue status", + Long: `Show whether the processing of the repair queue 
is enabled or not. "true" if enabled.`, + + RunE: func(cmd *cobra.Command, args []string) error { + well.Go(func(ctx context.Context) error { + disabled, err := storage.IsRepairQueueDisabled(ctx) + if err != nil { + return err + } + fmt.Println(!disabled) + return nil + }) + well.Stop() + return well.Wait() + }, +} + +func init() { + repairQueueCmd.AddCommand(repairQueueIsEnabledCmd) +} diff --git a/pkg/ckecli/cmd/repair_queue_list.go b/pkg/ckecli/cmd/repair_queue_list.go new file mode 100644 index 00000000..d1c9e6d7 --- /dev/null +++ b/pkg/ckecli/cmd/repair_queue_list.go @@ -0,0 +1,37 @@ +package cmd + +import ( + "context" + "encoding/json" + "os" + + "github.com/cybozu-go/well" + "github.com/spf13/cobra" +) + +var repairQueueListCmd = &cobra.Command{ + Use: "list", + Short: "list the entries in the repair queue", + Long: `List the entries in the repair queue. + +The output is a list of RepairQueueEntry formatted in JSON.`, + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, args []string) error { + well.Go(func(ctx context.Context) error { + entries, err := storage.GetRepairsEntries(ctx) + if err != nil { + return err + } + + enc := json.NewEncoder(os.Stdout) + enc.SetIndent("", " ") + return enc.Encode(entries) + }) + well.Stop() + return well.Wait() + }, +} + +func init() { + repairQueueCmd.AddCommand(repairQueueListCmd) +} diff --git a/pkg/ckecli/cmd/repair_queue_reset_backoff.go b/pkg/ckecli/cmd/repair_queue_reset_backoff.go new file mode 100644 index 00000000..6576a035 --- /dev/null +++ b/pkg/ckecli/cmd/repair_queue_reset_backoff.go @@ -0,0 +1,43 @@ +package cmd + +import ( + "context" + "time" + + "github.com/cybozu-go/cke" + "github.com/cybozu-go/well" + "github.com/spf13/cobra" +) + +var repairQueueResetBackoffCmd = &cobra.Command{ + Use: "reset-backoff", + Short: "Reset drain backoff of the entries in repair queue", + Long: `Reset drain_backoff_count and drain_backoff_expire of the entries in repair queue`, + RunE: func(cmd *cobra.Command, args []string) error { + well.Go(func(ctx context.Context) error { + entries, err := storage.GetRepairsEntries(ctx) + if err != nil { + return err + } + for _, entry := range entries { + entry.DrainBackOffCount = 0 + entry.DrainBackOffExpire = time.Time{} + err := storage.UpdateRepairsEntry(ctx, entry) + if err == cke.ErrNotFound { + // The entry has just been dequeued. 
+ continue + } + if err != nil { + return err + } + } + return nil + }) + well.Stop() + return well.Wait() + }, +} + +func init() { + repairQueueCmd.AddCommand(repairQueueResetBackoffCmd) +} diff --git a/repair.go b/repair.go new file mode 100644 index 00000000..8fa32490 --- /dev/null +++ b/repair.go @@ -0,0 +1,154 @@ +package cke + +import ( + "errors" + "slices" + "time" +) + +// RepairStatus is the status of a repair operation +type RepairStatus string + +const ( + RepairStatusQueued = RepairStatus("queued") + RepairStatusProcessing = RepairStatus("processing") + RepairStatusSucceeded = RepairStatus("succeeded") + RepairStatusFailed = RepairStatus("failed") +) + +var repairStatuses = []RepairStatus{RepairStatusQueued, RepairStatusProcessing, RepairStatusSucceeded, RepairStatusFailed} + +// RepairStepStatus is the status of the current step in a repair operation +type RepairStepStatus string + +const ( + RepairStepStatusWaiting = RepairStepStatus("waiting") + RepairStepStatusDraining = RepairStepStatus("draining") + RepairStepStatusWatching = RepairStepStatus("watching") +) + +// RepairQueueEntry represents a queue entry of a repair operation +type RepairQueueEntry struct { + Index int64 `json:"index,string"` + Address string `json:"address"` + Nodename string `json:"nodename"` + MachineType string `json:"machine_type"` + Operation string `json:"operation"` + Status RepairStatus `json:"status"` + Step int `json:"step"` + StepStatus RepairStepStatus `json:"step_status"` + Deleted bool `json:"deleted"` + LastTransitionTime time.Time `json:"last_transition_time,omitempty"` + DrainBackOffCount int `json:"drain_backoff_count,omitempty"` + DrainBackOffExpire time.Time `json:"drain_backoff_expire,omitempty"` +} + +var ( + ErrRepairProcedureNotFound = errors.New("repair procedure not found for repair queue entry") + ErrRepairOperationNotFound = errors.New("repair operation not found for repair queue entry") + ErrRepairStepOutOfRange = errors.New("repair step of repair queue entry is out of range") +) + +func NewRepairQueueEntry(operation, machineType, address string) *RepairQueueEntry { + return &RepairQueueEntry{ + Operation: operation, + MachineType: machineType, + Address: address, + Status: RepairStatusQueued, + StepStatus: RepairStepStatusWaiting, + } +} + +func (entry *RepairQueueEntry) FillNodename(cluster *Cluster) { + for _, node := range cluster.Nodes { + if node.Address == entry.Address { + entry.Nodename = node.Nodename() + return + } + } + entry.Nodename = "" +} + +func (entry *RepairQueueEntry) IsInCluster() bool { + return entry.Nodename != "" +} + +func (entry *RepairQueueEntry) HasFinished() bool { + return entry.Status == RepairStatusSucceeded || entry.Status == RepairStatusFailed +} + +func (entry *RepairQueueEntry) getMatchingRepairProcedure(cluster *Cluster) (*RepairProcedure, error) { + for i, proc := range cluster.Repair.RepairProcedures { + if slices.Contains(proc.MachineTypes, entry.MachineType) { + return &cluster.Repair.RepairProcedures[i], nil + } + } + return nil, ErrRepairProcedureNotFound +} + +func (entry *RepairQueueEntry) GetMatchingRepairOperation(cluster *Cluster) (*RepairOperation, error) { + proc, err := entry.getMatchingRepairProcedure(cluster) + if err != nil { + return nil, err + } + for i, op := range proc.RepairOperations { + if op.Operation == entry.Operation { + return &proc.RepairOperations[i], nil + } + } + return nil, ErrRepairOperationNotFound +} + +func (entry *RepairQueueEntry) GetCurrentRepairStep(cluster *Cluster) (*RepairStep, error) { + op, 
err := entry.GetMatchingRepairOperation(cluster) + if err != nil { + return nil, err + } + if entry.Step >= len(op.RepairSteps) { + return nil, ErrRepairStepOutOfRange + } + return &op.RepairSteps[entry.Step], nil +} + +func CountRepairQueueEntries(entries []*RepairQueueEntry) map[string]int { + ret := make(map[string]int) + for _, status := range repairStatuses { + // initialize explicitly to provide list of possible statuses + ret[string(status)] = 0 + } + + for _, entry := range entries { + ret[string(entry.Status)]++ + } + + return ret +} + +func BuildMachineRepairStatus(nodes []*Node, entries []*RepairQueueEntry) map[string]map[string]bool { + ret := make(map[string]map[string]bool) + + // (keys of ret) == union of (addresses of nodes) and (addresses of entries) + for _, node := range nodes { + ret[node.Address] = make(map[string]bool) + } + for _, entry := range entries { + if _, ok := ret[entry.Address]; ok { + continue + } + ret[entry.Address] = make(map[string]bool) + } + + for address := range ret { + for _, status := range repairStatuses { + // initialize explicitly to provide list of possible statuses + ret[address][string(status)] = false + } + } + + for _, entry := range entries { + ret[entry.Address][string(entry.Status)] = true + } + + return ret + +} diff --git a/repair_test.go b/repair_test.go new file mode 100644 index 00000000..650cee85 --- /dev/null +++ b/repair_test.go @@ -0,0 +1,214 @@ +package cke + +import ( + "slices" + "testing" + + "github.com/google/go-cmp/cmp" +) + +func TestRepairQueueEntry(t *testing.T) { + cluster := &Cluster{ + Nodes: []*Node{ + { + Address: "1.1.1.1", + Hostname: "node1", + }, + }, + Repair: Repair{ + RepairProcedures: []RepairProcedure{ + { + MachineTypes: []string{"type1", "type2"}, + RepairOperations: []RepairOperation{ + { + Operation: "unreachable", + RepairSteps: []RepairStep{ + {RepairCommand: []string{"type12", "unreachable0"}}, + {RepairCommand: []string{"type12", "unreachable1"}}, + }, + HealthCheckCommand: []string{"check12-unreachable"}, + }, + { + Operation: "unhealthy", + RepairSteps: []RepairStep{ + {RepairCommand: []string{"type12", "unhealthy0"}}, + }, + HealthCheckCommand: []string{"check12-unhealthy"}, + }, + }, + }, + { + MachineTypes: []string{"type3"}, + RepairOperations: []RepairOperation{ + { + RepairSteps: []RepairStep{ + {RepairCommand: []string{"type3", "unreachable0"}}, + }, + HealthCheckCommand: []string{"check3"}, + }, + }, + }, + }, + }, + } + + // for in-cluster machine + entry := NewRepairQueueEntry("unreachable", "type2", "1.1.1.1") + entry.FillNodename(cluster) + if entry.Nodename != "node1" { + t.Error("FillNodename() failed to fill Nodename:", entry.Nodename) + } + if !entry.IsInCluster() { + t.Error("IsInCluster() returned false incorrectly") + } + + // for out-of-cluster machine + // GetCorrespondingNode should fail for bad address + entry = NewRepairQueueEntry("unreachable", "type2", "2.2.2.2") + entry.FillNodename(cluster) + if entry.Nodename != "" { + t.Error("FillNodename() filled wrong Nodename:", entry.Nodename) + } + if entry.IsInCluster() { + t.Error("IsInCluster() returned true incorrectly") + } + + // HaveFinished should return true iff entry has succeeded or failed + entry = NewRepairQueueEntry("unreachable", "type2", "1.1.1.1") + for _, testCase := range []struct { + status RepairStatus + finished bool + }{ + {RepairStatusQueued, false}, + {RepairStatusProcessing, false}, + {RepairStatusSucceeded, true}, + {RepairStatusFailed, true}, + } { + entry.Status = testCase.status + if 
entry.HasFinished() != testCase.finished { + t.Errorf("HaveFinished() returned %v incorrectly for %q", testCase.finished, testCase.status) + } + } + + // GetMatchingRepairOperation should succeed + entry = NewRepairQueueEntry("unreachable", "type2", "1.1.1.1") + op, err := entry.GetMatchingRepairOperation(cluster) + if err != nil { + t.Fatal("GetMatchingRepairOperation() failed:", err) + } + if !slices.Equal(op.HealthCheckCommand, []string{"check12-unreachable"}) { + t.Error("GetMatchingRepairOperation() returned wrong repair operation:", op) + } + + // GetMatchingRepairOperation should fail for bad machine type + entry = NewRepairQueueEntry("unreachable", "type4", "1.1.1.1") + _, err = entry.GetMatchingRepairOperation(cluster) + if err != ErrRepairProcedureNotFound { + t.Error("GetMatchingRepairOperation() returned wrong error:", err) + } + + // GetMatchingRepairOperation should fail for bad operation + entry = NewRepairQueueEntry("noop", "type2", "1.1.1.1") + _, err = entry.GetMatchingRepairOperation(cluster) + if err != ErrRepairOperationNotFound { + t.Error("GetMatchingRepairOperation() returned wrong error:", err) + } + + // GetCurrentRepairStep should succeed + entry = NewRepairQueueEntry("unreachable", "type2", "1.1.1.1") + entry.Status = RepairStatusProcessing + entry.Step = 1 + entry.StepStatus = RepairStepStatusWatching + step, err := entry.GetCurrentRepairStep(cluster) + if err != nil { + t.Fatal("GetCurrentRepairStep() failed:", err) + } + if !slices.Equal(step.RepairCommand, []string{"type12", "unreachable1"}) { + t.Error("GetCurrentRepairStep() returned wrong repair step:", step) + } + + // GetCurrentRepairStep should fail for end of steps + entry.Step++ + _, err = entry.GetCurrentRepairStep(cluster) + if err != ErrRepairStepOutOfRange { + t.Error("GetCurrentRepairStep() returned wrong error:", err) + } + + // GetCurrentRepairStep should fail for bad machine type + entry = NewRepairQueueEntry("unreachable", "type4", "1.1.1.1") + entry.Status = RepairStatusProcessing + entry.Step = 1 + entry.StepStatus = RepairStepStatusWatching + _, err = entry.GetCurrentRepairStep(cluster) + if err != ErrRepairProcedureNotFound { + t.Error("GetCurrentRepairStep() returned wrong error:", err) + } + + // GetCurrentRepairStep should fail for bad operation + entry = NewRepairQueueEntry("noop", "type2", "1.1.1.1") + entry.Status = RepairStatusProcessing + entry.Step = 1 + entry.StepStatus = RepairStepStatusWatching + _, err = entry.GetCurrentRepairStep(cluster) + if err != ErrRepairOperationNotFound { + t.Error("GetCurrentRepairStep() returned wrong error:", err) + } +} + +func TestCountRepairQueueEntries(t *testing.T) { + input := []*RepairQueueEntry{ + {Status: RepairStatusQueued}, + {Status: RepairStatusProcessing}, + {Status: RepairStatusSucceeded}, + {Status: RepairStatusProcessing}, + {Status: RepairStatusSucceeded}, + {Status: RepairStatusSucceeded}, + } + expected := map[string]int{ + "queued": 1, + "processing": 2, + "succeeded": 3, + "failed": 0, + } + actual := CountRepairQueueEntries(input) + + if !cmp.Equal(actual, expected) { + t.Errorf("expected: %v, actual: %v", expected, actual) + } +} + +func TestBuildMachineRepairStatus(t *testing.T) { + inputNodes := []*Node{ + {Address: "1.1.1.1"}, + {Address: "2.2.2.2"}, + } + inputEntries := []*RepairQueueEntry{ + {Address: "1.1.1.1", Status: RepairStatusQueued}, + {Address: "10.10.10.10", Status: RepairStatusFailed}, + } + expected := map[string]map[string]bool{ + "1.1.1.1": { + "queued": true, + "processing": false, + "succeeded": false, 
+ "failed": false, + }, + "2.2.2.2": { + "queued": false, + "processing": false, + "succeeded": false, + "failed": false, + }, + "10.10.10.10": { + "queued": false, + "processing": false, + "succeeded": false, + "failed": true, + }, + } + actual := BuildMachineRepairStatus(inputNodes, inputEntries) + + if !cmp.Equal(actual, expected) { + t.Errorf("expected: %v, actual: %v", expected, actual) + } +} diff --git a/server/get_status.go b/server/get_status.go index ed0452b3..279291ce 100644 --- a/server/get_status.go +++ b/server/get_status.go @@ -88,5 +88,11 @@ func (c Controller) GetClusterStatus(ctx context.Context, cluster *cke.Cluster, } cs.Kubernetes = kcs + repairQueueStatus, err := op.GetRepairQueueStatus(ctx, inf, livingMaster, cluster) + if err != nil { + return nil, err + } + cs.RepairQueue = repairQueueStatus + return cs, nil } diff --git a/server/node_filter.go b/server/node_filter.go index f52c1e68..c9f07a39 100644 --- a/server/node_filter.go +++ b/server/node_filter.go @@ -831,7 +831,7 @@ func nodeIsOutdated(n *cke.Node, current *corev1.Node, taintCP bool) bool { return false } -// CordonedNodes returns nodes that are cordoned and annotated as reboot operation targets. +// CordonedNodes returns nodes that are cordoned and annotated as reboot/repair operation targets. func (nf *NodeFilter) CordonedNodes() (nodes []*corev1.Node) { for i := range nf.status.Kubernetes.Nodes { n := &nf.status.Kubernetes.Nodes[i] diff --git a/server/strategy.go b/server/strategy.go index 3ad1faee..360bf51e 100644 --- a/server/strategy.go +++ b/server/strategy.go @@ -1,6 +1,8 @@ package server import ( + "time" + "github.com/cybozu-go/cke" "github.com/cybozu-go/cke/op" "github.com/cybozu-go/cke/op/clusterdns" @@ -87,11 +89,20 @@ func DecideOps(c *cke.Cluster, cs *cke.ClusterStatus, constraints *cke.Constrain } // 9. Uncordon nodes if nodes are cordoned by CKE. - if o := rebootUncordonOp(rebootArgs.RQEntries, nf); o != nil { + if o := rebootUncordonOp(cs, rebootArgs.RQEntries, nf); o != nil { return []cke.Operator{o}, cke.PhaseUncordonNodes } - // 10. Reboot nodes if reboot request has been arrived to the reboot queue, and the number of unreachable nodes is less than a threshold. + // 10. Repair machines if repair requests have been arrived to the repair queue, and the number of unreachable nodes is less than a threshold. + if ops, phaseRepair := repairOps(c, cs, constraints, rebootArgs, nf); phaseRepair { + if !nf.EtcdIsGood() { + log.Warn("cannot repair machines because etcd cluster is not responding and in-sync", nil) + return nil, cke.PhaseRepairMachines + } + return ops, cke.PhaseRepairMachines + } + + // 11. Reboot nodes if reboot request has been arrived to the reboot queue, and the number of unreachable nodes is less than a threshold. if ops, phaseReboot := rebootOps(c, constraints, rebootArgs, nf); phaseReboot { if !nf.EtcdIsGood() { log.Warn("cannot reboot nodes because etcd cluster is not responding and in-sync", nil) @@ -687,6 +698,178 @@ func cleanOps(c *cke.Cluster, nf *NodeFilter) (ops []cke.Operator) { return ops } +func repairOps(c *cke.Cluster, cs *cke.ClusterStatus, constraints *cke.Constraints, rebootArgs DecideOpsRebootArgs, nf *NodeFilter) (ops []cke.Operator, phaseRepair bool) { + rqs := &cs.RepairQueue + + // Sort/filter entries to limit the number of concurrent repairs. + // - Entries being deleted are dequeued unconditionally. + // - Entries just repaired are moved to succeeded status unconditionally. + // - Succeeded/failed entries are left unchanged. 
+ // - Entries already being processed have higher priority than newly queued entries. + // - Entries waiting for unexpired drain-retry-timeout are filtered out. + // - Other types of timeout-wait are considered as "being processed" and + // taken into account for the concurrency limits. + // - Entries for the API servers have higher priority. + apiServers := make(map[string]bool) + for _, cp := range nf.ControlPlane() { + apiServers[cp.Address] = true + } + + now := time.Now() + + processingApiEntries := []*cke.RepairQueueEntry{} + processingOtherEntries := []*cke.RepairQueueEntry{} + queuedApiEntries := []*cke.RepairQueueEntry{} + queuedOtherEntries := []*cke.RepairQueueEntry{} + for _, entry := range rqs.Entries { + if entry.Deleted { + ops = append(ops, op.RepairDequeueOp(entry)) + continue + } + if rqs.RepairCompleted[entry.Address] { + ops = append(ops, op.RepairFinishOp(entry, true)) + continue + } + switch entry.Status { + case cke.RepairStatusQueued: + if apiServers[entry.Address] { + queuedApiEntries = append(queuedApiEntries, entry) + } else { + queuedOtherEntries = append(queuedOtherEntries, entry) + } + case cke.RepairStatusProcessing: + if entry.StepStatus == cke.RepairStepStatusWaiting && entry.DrainBackOffExpire.After(now) { + continue + } + if apiServers[entry.Address] { + processingApiEntries = append(processingApiEntries, entry) + } else { + processingOtherEntries = append(processingOtherEntries, entry) + } + } + } + + sortedEntries := []*cke.RepairQueueEntry{} + sortedEntries = append(sortedEntries, processingApiEntries...) + sortedEntries = append(sortedEntries, processingOtherEntries...) + sortedEntries = append(sortedEntries, queuedApiEntries...) + sortedEntries = append(sortedEntries, queuedOtherEntries...) + + // Rules: + // - One machine must not be repaired by two or more entries at a time. + // - API servers must be repaired one by one. + // - API server must not be repaired while another API server is being rebooted. + // - This rule can be satisfied by this repair decision function alone, + // because reboot is blocked when this function execute repair operation. + // - API server should be repaired with higher priority than worker/non-cluster nodes. + // - This rule is not so important because a seriously unreachable API server + // will be replaced before being repaired. + // - API server may be repaired simultaneously with worker/non-cluster nodes. 
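+ // Walk sortedEntries in priority order and admit repairs until the concurrency limits are reached.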
+ processed := make(map[string]bool) + + const maxConcurrentApiServerRepairs = 1 + maxConcurrentRepairs := cke.DefaultMaxConcurrentRepairs + if c.Repair.MaxConcurrentRepairs != nil { + maxConcurrentRepairs = *c.Repair.MaxConcurrentRepairs + } + concurrentApiServerRepairs := 0 + concurrentRepairs := 0 + + rebootingApiServers := make(map[string]bool) + for _, cp := range nf.ControlPlane() { + if rebootProcessing(rebootArgs.RQEntries, cp.Nodename()) { + rebootingApiServers[cp.Address] = true + } + } + + evictionTimeoutSeconds := cke.DefaultRepairEvictionTimeoutSeconds + if c.Repair.EvictionTimeoutSeconds != nil { + evictionTimeoutSeconds = *c.Repair.EvictionTimeoutSeconds + } + evictionStartLimit := now.Add(time.Duration(-evictionTimeoutSeconds) * time.Second) + + for _, entry := range sortedEntries { + if concurrentRepairs >= maxConcurrentRepairs { + break + } + if processed[entry.Address] { + continue + } + if apiServers[entry.Address] { + if concurrentApiServerRepairs >= maxConcurrentApiServerRepairs || + len(rebootingApiServers) >= 2 || + (len(rebootingApiServers) == 1 && !rebootingApiServers[entry.Address]) { + continue + } + concurrentApiServerRepairs++ + } + concurrentRepairs++ + + RUN_STEP: + step, err := entry.GetCurrentRepairStep(c) + if err != nil { + if err != cke.ErrRepairStepOutOfRange { + log.Warn("failed to get executing repair step", map[string]interface{}{ + log.FnError: err, + "index": entry.Index, + "address": entry.Address, + "operation": entry.Operation, + "machine_type": entry.MachineType, + "step": entry.Step, + }) + continue + } + // Though ErrRepairStepOutOfRange may be caused by real misconfiguration, + // e.g., by decreasing "repair_steps" in cluster.yaml, we treat the error + // as the end of the steps for simplicity. + ops = append(ops, op.RepairFinishOp(entry, false)) + continue + } + + phaseRepair = true // true even when op is not appended + + switch entry.StepStatus { + case cke.RepairStepStatusWaiting: + if !rqs.Enabled { + continue + } + if !(step.NeedDrain && entry.IsInCluster()) { + ops = append(ops, op.RepairExecuteOp(entry, step)) + continue + } + // DrainBackOffExpire has been confirmed, so start drain now. + ops = append(ops, op.RepairDrainStartOp(nf.HealthyAPIServer(), entry, &c.Repair)) + case cke.RepairStepStatusDraining: + if !rqs.Enabled { + ops = append(ops, op.RepairDrainTimeoutOp(entry)) + continue + } + if rqs.DrainCompleted[entry.Address] { + ops = append(ops, op.RepairExecuteOp(entry, step)) + continue + } + if entry.LastTransitionTime.Before(evictionStartLimit) { + ops = append(ops, op.RepairDrainTimeoutOp(entry)) + } + // Wait for drain completion until timeout. + case cke.RepairStepStatusWatching: + // Repair incompletion has been confirmed. + if step.WatchSeconds == nil || + entry.LastTransitionTime.Add(time.Duration(*step.WatchSeconds)*time.Second).Before(now) { + entry.Step++ + entry.StepStatus = cke.RepairStepStatusWaiting + goto RUN_STEP + } + // Wait for repair completion until timeout. 
+ } + } + + if len(ops) > 0 { + phaseRepair = true + } + return ops, phaseRepair +} + func rebootOps(c *cke.Cluster, constraints *cke.Constraints, rebootArgs DecideOpsRebootArgs, nf *NodeFilter) (ops []cke.Operator, phaseReboot bool) { if len(rebootArgs.RQEntries) == 0 { return nil, false @@ -733,14 +916,14 @@ func rebootOps(c *cke.Cluster, constraints *cke.Constraints, rebootArgs DecideOp return ops, phaseReboot } -func rebootUncordonOp(rqEntries []*cke.RebootQueueEntry, nf *NodeFilter) cke.Operator { +func rebootUncordonOp(cs *cke.ClusterStatus, rqEntries []*cke.RebootQueueEntry, nf *NodeFilter) cke.Operator { attrNodes := nf.CordonedNodes() if len(attrNodes) == 0 { return nil } nodes := make([]string, 0, len(attrNodes)) for _, n := range attrNodes { - if !rebootProcessing(rqEntries, n.Name) { + if !(rebootProcessing(rqEntries, n.Name) || repairProcessing(cs.RepairQueue.Entries, n.Name)) { nodes = append(nodes, n.Name) } } @@ -761,3 +944,14 @@ func rebootProcessing(rqEntries []*cke.RebootQueueEntry, node string) bool { } return false } + +func repairProcessing(entries []*cke.RepairQueueEntry, nodename string) bool { + for _, entry := range entries { + if entry.IsInCluster() && entry.Nodename == nodename && + entry.Status == cke.RepairStatusProcessing && + (entry.StepStatus == cke.RepairStepStatusDraining || entry.StepStatus == cke.RepairStepStatusWatching) { + return true + } + } + return false +} diff --git a/server/strategy_test.go b/server/strategy_test.go index 5b45ba9d..77615847 100644 --- a/server/strategy_test.go +++ b/server/strategy_test.go @@ -1,6 +1,7 @@ package server import ( + "slices" "testing" "time" @@ -169,6 +170,7 @@ func newData() testData { }, Nodes: nodeList, }, + RepairQueue: cke.RepairQueueStatus{Enabled: true}, } return testData{ @@ -506,6 +508,22 @@ func (d testData) withK8sResourceReady() testData { return d } +func (d testData) withNotReadyEndpoint(i int) testData { + masterAddresses := d.Status.Kubernetes.MasterEndpoints.Subsets[0].Addresses + etcdAddresses := d.Status.Kubernetes.EtcdEndpoints.Subsets[0].Addresses + if i < 0 || i >= len(masterAddresses) { + return d + } + d.Status.Kubernetes.MasterEndpoints.Subsets[0].Addresses = masterAddresses[0:i] + d.Status.Kubernetes.MasterEndpoints.Subsets[0].NotReadyAddresses = masterAddresses[i:3] + d.Status.Kubernetes.EtcdEndpoints.Subsets[0].Addresses = etcdAddresses[0:i] + d.Status.Kubernetes.EtcdEndpoints.Subsets[0].NotReadyAddresses = etcdAddresses[i:3] + endpointReady := false + d.Status.Kubernetes.MasterEndpointSlice.Endpoints[i].Conditions.Ready = &endpointReady + d.Status.Kubernetes.EtcdEndpointSlice.Endpoints[i].Conditions.Ready = &endpointReady + return d +} + func (d testData) withNodes(nodes ...corev1.Node) testData { d.withK8sResourceReady() OUTER: @@ -556,6 +574,59 @@ func (d testData) withSSHNotConnectedNodes() testData { return d } +func (d testData) withRebootCordon(i int) testData { + if i < 0 || i >= len(d.Status.Kubernetes.Nodes) { + return d + } + d.Status.Kubernetes.Nodes[i].Spec.Unschedulable = true + d.Status.Kubernetes.Nodes[i].Annotations = map[string]string{ + op.CKEAnnotationReboot: "true", + } + return d +} + +func (d testData) withRepairConfig() testData { + d.Cluster.Repair = cke.Repair{ + RepairProcedures: []cke.RepairProcedure{ + { + MachineTypes: []string{"type1"}, + RepairOperations: []cke.RepairOperation{ + { + Operation: "op1", + RepairSteps: []cke.RepairStep{ + {RepairCommand: []string{"repair0"}}, + {RepairCommand: []string{"repair1"}}, + }, + }, + }, + }, + }, + } + return d +} 
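+ +// withRepairEntries puts the given entries into the simulated repair queue status, assigning indices and default status fields.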
+ +func (d testData) withRepairEntries(entries []*cke.RepairQueueEntry) testData { + for i, entry := range entries { + entry.Index = int64(i) + if slices.Contains(nodeNames, entry.Address) { + entry.Nodename = entry.Address + } + if entry.Status == "" { + entry.Status = cke.RepairStatusQueued + } + if entry.StepStatus == "" { + entry.StepStatus = cke.RepairStepStatusWaiting + } + } + d.Status.RepairQueue.Entries = entries + return d +} + +func (d testData) withRepairDisabled() testData { + d.Status.RepairQueue.Enabled = false + return d +} + func (d testData) withRebootConfig() testData { d.Cluster.Reboot.RebootCommand = []string{"reboot"} d.Cluster.Reboot.BootCheckCommand = []string{"true"} @@ -1192,17 +1263,7 @@ func TestDecideOps(t *testing.T) { Node: nodeNames[2], Status: cke.RebootStatusQueued, }, - }).with(func(d testData) { - masterAddresses := d.Status.Kubernetes.MasterEndpoints.Subsets[0].Addresses - d.Status.Kubernetes.MasterEndpoints.Subsets[0].Addresses = masterAddresses[0:2] - d.Status.Kubernetes.MasterEndpoints.Subsets[0].NotReadyAddresses = masterAddresses[2:3] - etcdAddresses := d.Status.Kubernetes.EtcdEndpoints.Subsets[0].Addresses - d.Status.Kubernetes.EtcdEndpoints.Subsets[0].Addresses = etcdAddresses[0:2] - d.Status.Kubernetes.EtcdEndpoints.Subsets[0].NotReadyAddresses = etcdAddresses[2:3] - endpointReady := false - d.Status.Kubernetes.MasterEndpointSlice.Endpoints[2].Conditions.Ready = &endpointReady - d.Status.Kubernetes.EtcdEndpointSlice.Endpoints[2].Conditions.Ready = &endpointReady - }), + }).withNotReadyEndpoint(2), ExpectedOps: []opData{{"reboot-drain-start", 1}}, }, { @@ -2010,13 +2071,8 @@ func TestDecideOps(t *testing.T) { ExpectedOps: nil, }, { - Name: "UncordonNodes", - Input: newData().withK8sResourceReady().withRebootConfig().with(func(d testData) { - d.Status.Kubernetes.Nodes[0].Spec.Unschedulable = true - d.Status.Kubernetes.Nodes[0].Annotations = map[string]string{ - op.CKEAnnotationReboot: "true", - } - }), + Name: "UncordonNodes", + Input: newData().withK8sResourceReady().withRebootConfig().withRebootCordon(0), ExpectedOps: []opData{ {"reboot-uncordon", 1}, }, @@ -2028,6 +2084,486 @@ func TestDecideOps(t *testing.T) { }), ExpectedOps: nil, }, + { + Name: "Repair", + Input: newData().withK8sResourceReady().withRepairConfig().withRepairEntries([]*cke.RepairQueueEntry{ + {Address: nodeNames[4], MachineType: "type1", Operation: "op1"}, + }), + ExpectedOps: []opData{ + {"repair-execute", 1}, + }, + }, + { + Name: "RepairWithoutConfig", + Input: newData().withK8sResourceReady().withRepairEntries([]*cke.RepairQueueEntry{ + {Address: nodeNames[4], MachineType: "type1", Operation: "op1"}, + }), + ExpectedOps: nil, + }, + { + Name: "RepairBadMachineType", + Input: newData().withK8sResourceReady().withRepairConfig().withRepairEntries([]*cke.RepairQueueEntry{ + {Address: nodeNames[4], MachineType: "type2", Operation: "op1"}, + }), + ExpectedOps: nil, + }, + { + Name: "RepairBadOperation", + Input: newData().withK8sResourceReady().withRepairConfig().withRepairEntries([]*cke.RepairQueueEntry{ + {Address: nodeNames[4], MachineType: "type1", Operation: "noop"}, + {Address: nodeNames[5], MachineType: "type1", Operation: "op1"}, + }), + ExpectedOps: nil, // implementation dependent; bad entry consumes concurrency slot + }, + { + Name: "RepairOutOfCluster", + Input: newData().withK8sResourceReady().withRepairConfig().withRepairEntries([]*cke.RepairQueueEntry{ + {Address: "10.0.99.99", MachineType: "type1", Operation: "op1"}, + }), + ExpectedOps: []opData{ + 
{"repair-execute", 1}, + }, + }, + { + Name: "RepairApiServerHighPriority", + Input: newData().withK8sResourceReady().withRepairConfig().withRepairEntries([]*cke.RepairQueueEntry{ + {Address: nodeNames[4], MachineType: "type1", Operation: "noop"}, + {Address: nodeNames[0], MachineType: "type1", Operation: "op1"}, + }), + ExpectedOps: []opData{ + {"repair-execute", 1}, // implementation dependent; cf. RepairBadOperation + }, + }, + { + Name: "RepairMaxConcurrent", + Input: newData().withK8sResourceReady().withRepairConfig().withRepairEntries([]*cke.RepairQueueEntry{ + {Address: nodeNames[0], MachineType: "type1", Operation: "op1"}, + {Address: nodeNames[4], MachineType: "type1", Operation: "op1"}, + {Address: nodeNames[5], MachineType: "type1", Operation: "op1"}, + }).with(func(d testData) { + max := 2 + d.Cluster.Repair.MaxConcurrentRepairs = &max + }), + ExpectedOps: []opData{ + {"repair-execute", 1}, + {"repair-execute", 1}, + }, + }, + { + Name: "RepairMaxConcurrentSameMachine", + Input: newData().withK8sResourceReady().withRepairConfig().withRepairEntries([]*cke.RepairQueueEntry{ + {Address: nodeNames[0], MachineType: "type1", Operation: "op1"}, + {Address: nodeNames[0], MachineType: "type1", Operation: "op1"}, + }).with(func(d testData) { + max := 2 + d.Cluster.Repair.MaxConcurrentRepairs = &max + }), + ExpectedOps: []opData{ + {"repair-execute", 1}, + }, + }, + { + Name: "RepairMaxConcurrentApiServer", + Input: newData().withK8sResourceReady().withRepairConfig().withRepairEntries([]*cke.RepairQueueEntry{ + {Address: nodeNames[0], MachineType: "type1", Operation: "op1"}, + {Address: nodeNames[1], MachineType: "type1", Operation: "op1"}, + }).with(func(d testData) { + max := 2 + d.Cluster.Repair.MaxConcurrentRepairs = &max + }), + ExpectedOps: []opData{ + {"repair-execute", 1}, + }, + }, + { + Name: "RepairApiServerAnotherRebooting", + Input: newData().withK8sResourceReady().withRepairConfig().withRepairEntries([]*cke.RepairQueueEntry{ + {Address: nodeNames[0], MachineType: "type1", Operation: "op1"}, + }).withRebootConfig().withRebootEntries([]*cke.RebootQueueEntry{ + {Index: 1, Node: nodeNames[2], Status: cke.RebootStatusRebooting}, + }).withRebootCordon(2).withNotReadyEndpoint(2), + ExpectedOps: nil, + }, + { + Name: "RepairRebootingApiServer", + Input: newData().withK8sResourceReady().withRepairConfig().withRepairEntries([]*cke.RepairQueueEntry{ + {Address: nodeNames[2], MachineType: "type1", Operation: "op1"}, + }).withRebootConfig().withRebootEntries([]*cke.RebootQueueEntry{ + {Index: 1, Node: nodeNames[2], Status: cke.RebootStatusRebooting}, + }).withRebootCordon(2).withNotReadyEndpoint(2), + ExpectedOps: []opData{ + {"repair-execute", 1}, + }, + }, + { + Name: "RepairDrain", + Input: newData().withK8sResourceReady().withRepairConfig().withRepairEntries([]*cke.RepairQueueEntry{ + {Address: nodeNames[4], MachineType: "type1", Operation: "op1"}, + }).with(func(d testData) { + d.Cluster.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].NeedDrain = true + }), + ExpectedOps: []opData{ + {"repair-drain-start", 1}, + }, + }, + { + Name: "RepairDrainWaitCompletion", + Input: newData().withK8sResourceReady().withRepairConfig().withRepairEntries([]*cke.RepairQueueEntry{ + {Address: nodeNames[4], MachineType: "type1", Operation: "op1", + Status: cke.RepairStatusProcessing, StepStatus: cke.RepairStepStatusDraining, + LastTransitionTime: time.Now().Add(-time.Duration(60) * time.Second / 2)}, + }).with(func(d testData) { + timeout := 60 + d.Cluster.Repair.EvictionTimeoutSeconds = 
&timeout + d.Cluster.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].NeedDrain = true + }).withRebootCordon(4), + ExpectedOps: nil, + }, + { + Name: "RepairDrainWaitCompletionExpire", + Input: newData().withK8sResourceReady().withRepairConfig().withRepairEntries([]*cke.RepairQueueEntry{ + {Address: nodeNames[4], MachineType: "type1", Operation: "op1", + Status: cke.RepairStatusProcessing, StepStatus: cke.RepairStepStatusDraining, + LastTransitionTime: time.Now().Add(-time.Duration(60) * time.Second * 2)}, + }).with(func(d testData) { + timeout := 60 + d.Cluster.Repair.EvictionTimeoutSeconds = &timeout + d.Cluster.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].NeedDrain = true + }).withRebootCordon(4), + ExpectedOps: []opData{ + {"repair-drain-timeout", 1}, + }, + }, + { + Name: "RepairDrainWaitCompletionDefaultTimeout", + Input: newData().withK8sResourceReady().withRepairConfig().withRepairEntries([]*cke.RepairQueueEntry{ + {Address: nodeNames[4], MachineType: "type1", Operation: "op1", + Status: cke.RepairStatusProcessing, StepStatus: cke.RepairStepStatusDraining, + LastTransitionTime: time.Now().Add(-time.Duration(cke.DefaultRepairEvictionTimeoutSeconds) * time.Second / 2)}, + }).with(func(d testData) { + d.Cluster.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].NeedDrain = true + }).withRebootCordon(4), + ExpectedOps: nil, + }, + { + Name: "RepairDrainWaitCompletionExpireDefaultTimeout", + Input: newData().withK8sResourceReady().withRepairConfig().withRepairEntries([]*cke.RepairQueueEntry{ + {Address: nodeNames[4], MachineType: "type1", Operation: "op1", + Status: cke.RepairStatusProcessing, StepStatus: cke.RepairStepStatusDraining, + LastTransitionTime: time.Now().Add(-time.Duration(cke.DefaultRepairEvictionTimeoutSeconds) * time.Second * 2)}, + }).with(func(d testData) { + d.Cluster.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].NeedDrain = true + }).withRebootCordon(4), + ExpectedOps: []opData{ + {"repair-drain-timeout", 1}, + }, + }, + { + Name: "RepairDrainWaitRetryUncordon", + Input: newData().withK8sResourceReady().withRepairConfig().withRepairEntries([]*cke.RepairQueueEntry{ + {Address: nodeNames[4], MachineType: "type1", Operation: "op1", + Status: cke.RepairStatusProcessing, StepStatus: cke.RepairStepStatusWaiting, + DrainBackOffExpire: time.Now().Add(time.Duration(60) * time.Second)}, + }).with(func(d testData) { + d.Cluster.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].NeedDrain = true + }).withRebootCordon(4), + ExpectedOps: []opData{ + {"reboot-uncordon", 1}, + }, + }, + { + Name: "RepairDrainWaitRetry", + Input: newData().withK8sResourceReady().withRepairConfig().withRepairEntries([]*cke.RepairQueueEntry{ + {Address: nodeNames[4], MachineType: "type1", Operation: "op1", + Status: cke.RepairStatusProcessing, StepStatus: cke.RepairStepStatusWaiting, + DrainBackOffExpire: time.Now().Add(time.Duration(60) * time.Second)}, + }).with(func(d testData) { + d.Cluster.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].NeedDrain = true + }), + ExpectedOps: nil, + }, + { + Name: "RepairDrainWaitRetryExpire", + Input: newData().withK8sResourceReady().withRepairConfig().withRepairEntries([]*cke.RepairQueueEntry{ + {Address: nodeNames[4], MachineType: "type1", Operation: "op1", + Status: cke.RepairStatusProcessing, StepStatus: cke.RepairStepStatusWaiting, + DrainBackOffExpire: time.Now().Add(-time.Duration(60) * time.Second)}, + }).with(func(d testData) { + 
d.Cluster.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].NeedDrain = true + }), + ExpectedOps: []opData{ + {"repair-drain-start", 1}, + }, + }, + { + Name: "RepairDrainCompleted", + Input: newData().withK8sResourceReady().withRepairConfig().withRepairEntries([]*cke.RepairQueueEntry{ + {Address: nodeNames[4], MachineType: "type1", Operation: "op1", + Status: cke.RepairStatusProcessing, StepStatus: cke.RepairStepStatusDraining}, + }).with(func(d testData) { + d.Cluster.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].NeedDrain = true + d.Status.RepairQueue.DrainCompleted = map[string]bool{nodeNames[4]: true} + }).withRebootCordon(4), + ExpectedOps: []opData{ + {"repair-execute", 1}, + }, + }, + { + Name: "RepairWatch", + Input: newData().withK8sResourceReady().withRepairConfig().withRepairEntries([]*cke.RepairQueueEntry{ + {Address: nodeNames[4], MachineType: "type1", Operation: "op1", + Status: cke.RepairStatusProcessing, StepStatus: cke.RepairStepStatusWatching, + LastTransitionTime: time.Now().Add(-time.Duration(60) * time.Second / 2)}, + }).with(func(d testData) { + watch := 60 + d.Cluster.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].WatchSeconds = &watch + }).withRebootCordon(4), + ExpectedOps: nil, + }, + { + Name: "RepairWatchExpire", + Input: newData().withK8sResourceReady().withRepairConfig().withRepairEntries([]*cke.RepairQueueEntry{ + {Address: nodeNames[4], MachineType: "type1", Operation: "op1", + Status: cke.RepairStatusProcessing, StepStatus: cke.RepairStepStatusWatching, + LastTransitionTime: time.Now().Add(-time.Duration(60) * time.Second * 2)}, + }).with(func(d testData) { + watch := 60 + d.Cluster.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].WatchSeconds = &watch + }).withRebootCordon(4), + ExpectedOps: []opData{ + {"repair-execute", 1}, // next step + }, + }, + { + Name: "RepairWatchExpireDefaultTimeout", + Input: newData().withK8sResourceReady().withRepairConfig().withRepairEntries([]*cke.RepairQueueEntry{ + {Address: nodeNames[4], MachineType: "type1", Operation: "op1", + Status: cke.RepairStatusProcessing, StepStatus: cke.RepairStepStatusWatching, + LastTransitionTime: time.Now().Add(-time.Duration(60) * time.Second / 2)}, + }).withRebootCordon(4), + ExpectedOps: []opData{ + {"repair-execute", 1}, // next step + }, + }, + { + Name: "RepairWatchExpireLastStep", + Input: newData().withK8sResourceReady().withRepairConfig().withRepairEntries([]*cke.RepairQueueEntry{ + {Address: nodeNames[4], MachineType: "type1", Operation: "op1", + Status: cke.RepairStatusProcessing, StepStatus: cke.RepairStepStatusWatching, Step: 1, + LastTransitionTime: time.Now().Add(-time.Duration(60) * time.Second / 2)}, + }).withRebootCordon(4), + ExpectedOps: []opData{ + {"repair-finish", 1}, // failed + }, + }, + { + Name: "RepairCompleted", + Input: newData().withK8sResourceReady().withRepairConfig().withRepairEntries([]*cke.RepairQueueEntry{ + {Address: nodeNames[4], MachineType: "type1", Operation: "op1", + Status: cke.RepairStatusProcessing, StepStatus: cke.RepairStepStatusWatching, + LastTransitionTime: time.Now().Add(-time.Duration(60) * time.Second / 2)}, + }).with(func(d testData) { + d.Status.RepairQueue.RepairCompleted = map[string]bool{nodeNames[4]: true} + }).withRebootCordon(4), + ExpectedOps: []opData{ + {"repair-finish", 1}, // succeeded + }, + }, + { + Name: "RepairSucceededUncordon", + Input: newData().withK8sResourceReady().withRepairConfig().withRepairEntries([]*cke.RepairQueueEntry{ + {Address: nodeNames[4], 
MachineType: "type1", Operation: "op1", + Status: cke.RepairStatusSucceeded}, + }).withRebootCordon(4), + ExpectedOps: []opData{ + {"reboot-uncordon", 1}, + }, + }, + { + Name: "RepairSucceeded", + Input: newData().withK8sResourceReady().withRepairConfig().withRepairEntries([]*cke.RepairQueueEntry{ + {Address: nodeNames[4], MachineType: "type1", Operation: "op1", + Status: cke.RepairStatusSucceeded}, + }), + ExpectedOps: nil, + }, + { + Name: "RepairFailedUncordon", + Input: newData().withK8sResourceReady().withRepairConfig().withRepairEntries([]*cke.RepairQueueEntry{ + {Address: nodeNames[4], MachineType: "type1", Operation: "op1", + Status: cke.RepairStatusFailed}, + }).withRebootCordon(4), + ExpectedOps: []opData{ + {"reboot-uncordon", 1}, + }, + }, + { + Name: "RepairFailed", + Input: newData().withK8sResourceReady().withRepairConfig().withRepairEntries([]*cke.RepairQueueEntry{ + {Address: nodeNames[4], MachineType: "type1", Operation: "op1", + Status: cke.RepairStatusFailed}, + }), + ExpectedOps: nil, + }, + { + Name: "RepairDeletedUncordon", + Input: newData().withK8sResourceReady().withRepairConfig().withRepairEntries([]*cke.RepairQueueEntry{ + {Address: nodeNames[4], MachineType: "type1", Operation: "op1", + Deleted: true}, + }).withRebootCordon(4), + ExpectedOps: []opData{ + {"reboot-uncordon", 1}, + }, + }, + { + Name: "RepairDeleted", + Input: newData().withK8sResourceReady().withRepairConfig().withRepairEntries([]*cke.RepairQueueEntry{ + {Address: nodeNames[4], MachineType: "type1", Operation: "op1", + Deleted: true}, + }), + ExpectedOps: []opData{ + {"repair-dequeue", 1}, + }, + }, + { + Name: "RepairDisabled", + Input: newData().withK8sResourceReady().withRepairConfig().withRepairDisabled().withRepairEntries([]*cke.RepairQueueEntry{ + {Address: nodeNames[4], MachineType: "type1", Operation: "op1"}, + }), + ExpectedOps: nil, + }, + { + Name: "RepairDisabledDrain", + Input: newData().withK8sResourceReady().withRepairConfig().withRepairDisabled().withRepairEntries([]*cke.RepairQueueEntry{ + {Address: nodeNames[4], MachineType: "type1", Operation: "op1"}, + }).with(func(d testData) { + d.Cluster.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].NeedDrain = true + }), + ExpectedOps: nil, + }, + { + Name: "RepairDisabledDrainWaitCompletion", + Input: newData().withK8sResourceReady().withRepairConfig().withRepairDisabled().withRepairEntries([]*cke.RepairQueueEntry{ + {Address: nodeNames[4], MachineType: "type1", Operation: "op1", + Status: cke.RepairStatusProcessing, StepStatus: cke.RepairStepStatusDraining, + LastTransitionTime: time.Now().Add(-time.Duration(60) * time.Second / 2)}, + }).with(func(d testData) { + timeout := 60 + d.Cluster.Repair.EvictionTimeoutSeconds = &timeout + d.Cluster.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].NeedDrain = true + }).withRebootCordon(4), + ExpectedOps: []opData{ + {"repair-drain-timeout", 1}, + }, + }, + { + Name: "RepairDisabledDrainWaitCompletionExpire", + Input: newData().withK8sResourceReady().withRepairConfig().withRepairDisabled().withRepairEntries([]*cke.RepairQueueEntry{ + {Address: nodeNames[4], MachineType: "type1", Operation: "op1", + Status: cke.RepairStatusProcessing, StepStatus: cke.RepairStepStatusDraining, + LastTransitionTime: time.Now().Add(-time.Duration(60) * time.Second * 2)}, + }).with(func(d testData) { + timeout := 60 + d.Cluster.Repair.EvictionTimeoutSeconds = &timeout + d.Cluster.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].NeedDrain = true + }).withRebootCordon(4), + 
ExpectedOps: []opData{ + {"repair-drain-timeout", 1}, + }, + }, + { + Name: "RepairDisabledDrainWaitRetry", + Input: newData().withK8sResourceReady().withRepairConfig().withRepairDisabled().withRepairEntries([]*cke.RepairQueueEntry{ + {Address: nodeNames[4], MachineType: "type1", Operation: "op1", + Status: cke.RepairStatusProcessing, StepStatus: cke.RepairStepStatusWaiting, + DrainBackOffExpire: time.Now().Add(time.Duration(60) * time.Second)}, + }).with(func(d testData) { + d.Cluster.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].NeedDrain = true + }), + ExpectedOps: nil, + }, + { + Name: "RepairDisabledDrainWaitRetryExpire", + Input: newData().withK8sResourceReady().withRepairConfig().withRepairDisabled().withRepairEntries([]*cke.RepairQueueEntry{ + {Address: nodeNames[4], MachineType: "type1", Operation: "op1", + Status: cke.RepairStatusProcessing, StepStatus: cke.RepairStepStatusWaiting, + DrainBackOffExpire: time.Now().Add(-time.Duration(60) * time.Second)}, + }).with(func(d testData) { + d.Cluster.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].NeedDrain = true + }), + ExpectedOps: nil, + }, + { + Name: "RepairDisabledDrainCompleted", + Input: newData().withK8sResourceReady().withRepairConfig().withRepairDisabled().withRepairEntries([]*cke.RepairQueueEntry{ + {Address: nodeNames[4], MachineType: "type1", Operation: "op1", + Status: cke.RepairStatusProcessing, StepStatus: cke.RepairStepStatusDraining}, + }).with(func(d testData) { + d.Cluster.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].NeedDrain = true + d.Status.RepairQueue.DrainCompleted = map[string]bool{nodeNames[4]: true} + }).withRebootCordon(4), + ExpectedOps: []opData{ + {"repair-drain-timeout", 1}, + }, + }, + { + Name: "RepairDisabledWatch", + Input: newData().withK8sResourceReady().withRepairConfig().withRepairDisabled().withRepairEntries([]*cke.RepairQueueEntry{ + {Address: nodeNames[4], MachineType: "type1", Operation: "op1", + Status: cke.RepairStatusProcessing, StepStatus: cke.RepairStepStatusWatching, + LastTransitionTime: time.Now().Add(-time.Duration(60) * time.Second / 2)}, + }).with(func(d testData) { + watch := 60 + d.Cluster.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].WatchSeconds = &watch + }).withRebootCordon(4), + ExpectedOps: nil, + }, + { + Name: "RepairDisabledWatchExpire", + Input: newData().withK8sResourceReady().withRepairConfig().withRepairDisabled().withRepairEntries([]*cke.RepairQueueEntry{ + {Address: nodeNames[4], MachineType: "type1", Operation: "op1", + Status: cke.RepairStatusProcessing, StepStatus: cke.RepairStepStatusWatching, + LastTransitionTime: time.Now().Add(-time.Duration(60) * time.Second * 2)}, + }).with(func(d testData) { + watch := 60 + d.Cluster.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].WatchSeconds = &watch + }).withRebootCordon(4), + ExpectedOps: nil, + }, + { + Name: "RepairDisabledWatchExpireLastStep", + Input: newData().withK8sResourceReady().withRepairConfig().withRepairDisabled().withRepairEntries([]*cke.RepairQueueEntry{ + {Address: nodeNames[4], MachineType: "type1", Operation: "op1", + Status: cke.RepairStatusProcessing, StepStatus: cke.RepairStepStatusWatching, Step: 1, + LastTransitionTime: time.Now().Add(-time.Duration(60) * time.Second / 2)}, + }).withRebootCordon(4), + ExpectedOps: []opData{ + {"repair-finish", 1}, // failed + }, + }, + { + Name: "RepairDisabledCompleted", + Input: 
+			Input: newData().withK8sResourceReady().withRepairConfig().withRepairDisabled().withRepairEntries([]*cke.RepairQueueEntry{
+				{Address: nodeNames[4], MachineType: "type1", Operation: "op1",
+					Status: cke.RepairStatusProcessing, StepStatus: cke.RepairStepStatusWatching,
+					LastTransitionTime: time.Now().Add(-time.Duration(60) * time.Second / 2)},
+			}).with(func(d testData) {
+				d.Status.RepairQueue.RepairCompleted = map[string]bool{nodeNames[4]: true}
+			}).withRebootCordon(4),
+			ExpectedOps: []opData{
+				{"repair-finish", 1}, // succeeded
+			},
+		},
+		{
+			Name: "RepairDisabledDeleted",
+			Input: newData().withK8sResourceReady().withRepairConfig().withRepairDisabled().withRepairEntries([]*cke.RepairQueueEntry{
+				{Address: nodeNames[4], MachineType: "type1", Operation: "op1",
+					Deleted: true},
+			}),
+			ExpectedOps: []opData{
+				{"repair-dequeue", 1},
+			},
+		},
 		{
 			Name: "RebootWithoutConfig",
 			Input: newData().withK8sResourceReady().withRebootEntries([]*cke.RebootQueueEntry{
diff --git a/status.go b/status.go
index 5415f091..d475d89f 100644
--- a/status.go
+++ b/status.go
@@ -86,8 +86,9 @@ type ClusterStatus struct {
 	Name         string
 	NodeStatuses map[string]*NodeStatus // keys are IP address strings.
 
-	Etcd       EtcdClusterStatus
-	Kubernetes KubernetesClusterStatus
+	Etcd        EtcdClusterStatus
+	Kubernetes  KubernetesClusterStatus
+	RepairQueue RepairQueueStatus
 }
 
 // NodeStatus status of a node.
@@ -148,3 +149,11 @@ type ProxyStatus struct {
 	IsHealthy bool
 	Config    *proxyv1alpha1.KubeProxyConfiguration
 }
+
+// RepairQueueStatus represents the repair queue status.
+type RepairQueueStatus struct {
+	Enabled         bool
+	Entries         []*RepairQueueEntry
+	RepairCompleted map[string]bool
+	DrainCompleted  map[string]bool
+}
diff --git a/storage.go b/storage.go
index 65f714b0..832d1b25 100644
--- a/storage.go
+++ b/storage.go
@@ -36,6 +36,9 @@ const (
 	KeyRebootsWriteIndex     = "reboots/write-index"
 	KeyRecords               = "records/"
 	KeyRecordID              = "records"
+	KeyRepairsDisabled       = "repairs/disabled"
+	KeyRepairsPrefix         = "repairs/data/"
+	KeyRepairsWriteIndex     = "repairs/write-index"
 	KeyResourcePrefix        = "resource/"
 	KeySabakanDisabled       = "sabakan/disabled"
 	KeySabakanQueryVariables = "sabakan/query-variables"
@@ -882,6 +885,183 @@ func (s Storage) DeleteRebootsEntry(ctx context.Context, leaderKey string, index
 	return nil
 }
 
+// IsRepairQueueDisabled returns true if the repair queue is disabled.
+func (s Storage) IsRepairQueueDisabled(ctx context.Context) (bool, error) {
+	resp, err := s.Get(ctx, KeyRepairsDisabled)
+	if err != nil {
+		return false, err
+	}
+	if resp.Count == 0 {
+		return false, nil
+	}
+
+	return bytes.Equal([]byte("true"), resp.Kvs[0].Value), nil
+}
+
+// EnableRepairQueue enables repair queue processing when enable is true.
+// When enable is false, the repair queue is not processed.
+func (s Storage) EnableRepairQueue(ctx context.Context, enable bool) error {
+	var disabled string
+	if enable {
+		disabled = "false"
+	} else {
+		disabled = "true"
+	}
+	_, err := s.Put(ctx, KeyRepairsDisabled, disabled)
+	return err
+}
+
+func repairsEntryKey(index int64) string {
+	return fmt.Sprintf("%s%016x", KeyRepairsPrefix, index)
+}
+
+// RegisterRepairsEntry enqueues a repair queue entry to the repair queue.
+// "Index" of the entry is retrieved and updated in this method. The given value is ignored.
+func (s Storage) RegisterRepairsEntry(ctx context.Context, r *RepairQueueEntry) error {
+RETRY:
+	var writeIndex, writeIndexRev int64
+	resp, err := s.Get(ctx, KeyRepairsWriteIndex)
+	if err != nil {
+		return err
+	}
+	if resp.Count != 0 {
+		value, err := strconv.ParseInt(string(resp.Kvs[0].Value), 10, 64)
+		if err != nil {
+			return err
+		}
+		writeIndex = value
+		writeIndexRev = resp.Kvs[0].ModRevision
+	}
+
+	r.Index = writeIndex
+	data, err := json.Marshal(r)
+	if err != nil {
+		return err
+	}
+
+	newWriteIndex := strconv.FormatInt(writeIndex+1, 10)
+	txnResp, err := s.Txn(ctx).
+		If(
+			clientv3.Compare(clientv3.ModRevision(KeyRepairsWriteIndex), "=", writeIndexRev),
+		).
+		Then(
+			clientv3.OpPut(repairsEntryKey(writeIndex), string(data)),
+			clientv3.OpPut(KeyRepairsWriteIndex, newWriteIndex),
+		).
+		Commit()
+	if err != nil {
+		return err
+	}
+	if !txnResp.Succeeded {
+		goto RETRY
+	}
+
+	return nil
+}
+
+// UpdateRepairsEntry updates an existing repair queue entry.
+// It overwrites the whole contents unconditionally, using a CAS loop.
+// If the entry is not found in the repair queue, this returns ErrNotFound.
+func (s Storage) UpdateRepairsEntry(ctx context.Context, r *RepairQueueEntry) error {
+	key := repairsEntryKey(r.Index)
+	data, err := json.Marshal(r)
+	if err != nil {
+		return err
+	}
+
+RETRY:
+	resp, err := s.Get(ctx, key)
+	if err != nil {
+		return err
+	}
+	if resp.Count == 0 {
+		return ErrNotFound
+	}
+
+	rev := resp.Kvs[0].ModRevision
+	txnResp, err := s.Txn(ctx).
+		If(
+			clientv3.Compare(clientv3.ModRevision(key), "=", rev),
+		).
+		Then(
+			clientv3.OpPut(key, string(data)),
+		).
+		Commit()
+	if err != nil {
+		return err
+	}
+	if !txnResp.Succeeded {
+		goto RETRY
+	}
+
+	return nil
+}
+
+// GetRepairsEntry loads the entry specified by the index from the repair queue.
+// If the specified entry is not found, this returns ErrNotFound.
+func (s Storage) GetRepairsEntry(ctx context.Context, index int64) (*RepairQueueEntry, error) {
+	resp, err := s.Get(ctx, repairsEntryKey(index))
+	if err != nil {
+		return nil, err
+	}
+
+	if len(resp.Kvs) == 0 {
+		return nil, ErrNotFound
+	}
+
+	r := new(RepairQueueEntry)
+	err = json.Unmarshal(resp.Kvs[0].Value, r)
+	if err != nil {
+		return nil, err
+	}
+
+	return r, nil
+}
+
+// GetRepairsEntries loads all the entries in the repair queue.
+func (s Storage) GetRepairsEntries(ctx context.Context) ([]*RepairQueueEntry, error) {
+	opts := []clientv3.OpOption{
+		clientv3.WithPrefix(),
+		clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend),
+	}
+	resp, err := s.Get(ctx, KeyRepairsPrefix, opts...)
+	if err != nil {
+		return nil, err
+	}
+
+	if len(resp.Kvs) == 0 {
+		return nil, nil
+	}
+
+	repairs := make([]*RepairQueueEntry, len(resp.Kvs))
+	for i, kv := range resp.Kvs {
+		r := new(RepairQueueEntry)
+		err = json.Unmarshal(kv.Value, r)
+		if err != nil {
+			return nil, err
+		}
+		repairs[i] = r
+	}
+
+	return repairs, nil
+}
+
+// DeleteRepairsEntry deletes the entry specified by the index from the repair queue.
+func (s Storage) DeleteRepairsEntry(ctx context.Context, leaderKey string, index int64) error {
+	resp, err := s.Txn(ctx).
+		If(clientv3util.KeyExists(leaderKey)).
+		Then(clientv3.OpDelete(repairsEntryKey(index))).
+		Commit()
+	if err != nil {
+		return err
+	}
+	if !resp.Succeeded {
+		return ErrNoLeader
+	}
+
+	return nil
+}
+
 // SetStatus stores the server status.
 func (s Storage) SetStatus(ctx context.Context, lease clientv3.LeaseID, st *ServerStatus) error {
 	data, err := json.Marshal(st)
diff --git a/storage_test.go b/storage_test.go
index 8ab7a5f5..43785e5d 100644
--- a/storage_test.go
+++ b/storage_test.go
@@ -813,6 +813,148 @@ func testStorageReboot(t *testing.T) {
 	}
 }
 
+func testStorageRepair(t *testing.T) {
+	t.Parallel()
+
+	client := newEtcdClient(t)
+	defer client.Close()
+	storage := Storage{client}
+	ctx := context.Background()
+
+	s, err := concurrency.NewSession(client)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer s.Close()
+	e := concurrency.NewElection(s, KeyLeader)
+	err = e.Campaign(ctx, "test")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	leaderKey := e.Key()
+
+	// initial state = there are no entries && repair-queue is enabled
+
+	// index 0 does not exist
+	_, err = storage.GetRepairsEntry(ctx, 0)
+	if err != ErrNotFound {
+		t.Error("unexpected error:", err)
+	}
+
+	// get all - no entries
+	ents, err := storage.GetRepairsEntries(ctx)
+	if err != nil {
+		t.Fatal("GetRepairsEntries failed:", err)
+	}
+	if len(ents) != 0 {
+		t.Error("Unknown entries:", ents)
+	}
+
+	// index 0 does not exist
+	err = storage.DeleteRepairsEntry(ctx, leaderKey, 0)
+	if err != nil {
+		t.Fatal("DeleteRepairsEntry failed:", err)
+	}
+
+	// first write - index is 0
+	entry := NewRepairQueueEntry("operation1", "machine1", "1.2.3.4")
+	err = storage.RegisterRepairsEntry(ctx, entry)
+	if err != nil {
+		t.Fatal("RegisterRepairsEntry failed:", err)
+	}
+
+	// second write - index is 1
+	entry2 := NewRepairQueueEntry("operation2", "machine2", "12.34.56.78")
+	err = storage.RegisterRepairsEntry(ctx, entry2)
+	if err != nil {
+		t.Fatal("RegisterRepairsEntry failed:", err)
+	}
+
+	// get index 1 - the second written entry is returned
+	ent, err := storage.GetRepairsEntry(ctx, 1)
+	if err != nil {
+		t.Fatal("GetRepairsEntry failed:", err)
+	}
+	if !cmp.Equal(ent, entry2) {
+		t.Error("GetRepairsEntry returned unexpected result:", cmp.Diff(ent, entry2))
+	}
+
+	// get all - entries are returned in written order
+	entries := []*RepairQueueEntry{entry, entry2}
+	ents, err = storage.GetRepairsEntries(ctx)
+	if err != nil {
+		t.Fatal("GetRepairsEntries failed:", err)
+	}
+	if !cmp.Equal(ents, entries) {
+		t.Error("GetRepairsEntries returned unexpected result:", cmp.Diff(ents, entries))
+	}
+
+	// update index 0 and get index 0 - updated entry is returned
+	entry.Status = RepairStatusProcessing
+	err = storage.UpdateRepairsEntry(ctx, entry)
+	if err != nil {
+		t.Fatal("UpdateRepairsEntry failed:", err)
+	}
+	ent, err = storage.GetRepairsEntry(ctx, 0)
+	if err != nil {
+		t.Fatal("GetRepairsEntry failed:", err)
+	}
+	if !cmp.Equal(ent, entry) {
+		t.Error("GetRepairsEntry returned unexpected result:", cmp.Diff(ent, entry))
+	}
+
+	// delete index 0 - the entry can no longer be retrieved or updated
+	err = storage.DeleteRepairsEntry(ctx, leaderKey, 0)
+	if err != nil {
+		t.Fatal("DeleteRepairsEntry failed:", err)
+	}
+	_, err = storage.GetRepairsEntry(ctx, 0)
+	if err != ErrNotFound {
+		t.Error("unexpected error:", err)
+	}
+	err = storage.UpdateRepairsEntry(ctx, entry)
+	if err == nil {
+		t.Error("UpdateRepairsEntry succeeded for deleted entry")
+	}
+
+	// repair-queue is enabled by default
+	disabled, err := storage.IsRepairQueueDisabled(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if disabled {
+		t.Error("repair queue should not be disabled by default")
+	}
+
+	// disable repair-queue and get its state
+	err = storage.EnableRepairQueue(ctx, false)
+	if err != nil {
+		t.Fatal(err)
+	}
+	disabled, err = storage.IsRepairQueueDisabled(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if !disabled {
+		t.Error("repair queue could not be disabled")
+	}
+
+	// re-enable repair-queue and get its state
+	err = storage.EnableRepairQueue(ctx, true)
+	if err != nil {
+		t.Fatal(err)
+	}
+	disabled, err = storage.IsRepairQueueDisabled(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if disabled {
+		t.Error("repair queue could not be re-enabled")
+	}
+
+}
+
 func testStatus(t *testing.T) {
 	t.Parallel()
 
@@ -864,5 +1006,6 @@ func TestStorage(t *testing.T) {
 	t.Run("Resource", testStorageResource)
 	t.Run("Sabakan", testStorageSabakan)
 	t.Run("Reboot", testStorageReboot)
+	t.Run("Repair", testStorageRepair)
 	t.Run("Status", testStatus)
 }
diff --git a/testdata/cluster.yaml b/testdata/cluster.yaml
index 2c73026f..a5c787e0 100644
--- a/testdata/cluster.yaml
+++ b/testdata/cluster.yaml
@@ -23,6 +23,29 @@ reboot:
   protected_namespaces:
     matchLabels:
       app: sample
+repair:
+  repair_procedures:
+    - machine_types: ["Cray-1", "Cray-2"]
+      repair_operations:
+        - operation: "unreachable"
+          repair_steps:
+            - repair_command: ["reset", "remotely"]
+              command_timeout_seconds: 10
+              command_retries: 1
+              command_interval: 5
+              need_drain: true
+              watch_seconds: 60
+            - repair_command: ["apply", "hammer"]
+              watch_seconds: 10
+          health_check_command: ["knock"]
+          command_timeout_seconds: 30
+  max_concurrent_repairs: 2
+  protected_namespaces:
+    matchLabels:
+      app: protected
+  evict_retries: 3
+  evict_interval: 5
+  eviction_timeout_seconds: 120
 options:
   etcd:
     volume_name: myetcd
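For reference, here is a minimal usage sketch of the storage API added in this patch, written as if it lived inside package cke and mirroring the Storage{client} and NewRepairQueueEntry usage from testStorageRepair above. The etcd endpoint, the helper name exampleRepairEnqueue, and the entry values ("unreachable", "Cray-1", "10.0.0.4") are illustrative assumptions, not part of the change.

package cke

import (
	"context"
	"log"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// exampleRepairEnqueue is a hypothetical sketch, not part of the patch.
// It enqueues one repair request and then pauses queue processing.
func exampleRepairEnqueue() {
	// Assumption: a reachable etcd at 127.0.0.1:2379, as the package tests use.
	client, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	st := Storage{client}
	ctx := context.Background()

	// RegisterRepairsEntry assigns Index itself; the caller-provided value is ignored.
	entry := NewRepairQueueEntry("unreachable", "Cray-1", "10.0.0.4")
	if err := st.RegisterRepairsEntry(ctx, entry); err != nil {
		log.Fatal(err)
	}

	// Stop the repair queue from being processed until it is re-enabled.
	if err := st.EnableRepairQueue(ctx, false); err != nil {
		log.Fatal(err)
	}
}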