diff --git a/fleetctl/fleetctl.go b/fleetctl/fleetctl.go index 30cd7beb8..48150e08c 100644 --- a/fleetctl/fleetctl.go +++ b/fleetctl/fleetctl.go @@ -1246,3 +1246,35 @@ func waitForState(stateCheckFunc func() error) (time.Duration, error) { } } } + +// cmdGlobalMachineState runs a specific fleetctl command on each target machine +// where global units are started. To avoid unnecessary ssh connections being +// alive, it filters out the list of machines as much as possible. +func cmdGlobalMachineState(cCmd *cobra.Command, globalUnits []schema.Unit) (err error) { + cmd := cCmd.Name() + mapUNs := map[string]string{} + for _, unit := range globalUnits { + m := cachedMachineState(unit.MachineID) + if m == nil || m.ID == "" || m.PublicIP == "" { + continue + } + mapUNs[m.ID] = unit.Name + } + + // create a list of unique unit names + resultIDs := map[string]string{} + for id, name := range mapUNs { + resultIDs[id] = name + } + + for id, name := range resultIDs { + // run a correspondent systemctl command + if exitVal := runCommand(cCmd, id, "systemctl", cmd, name); exitVal != 0 { + err = fmt.Errorf("Error running systemctl %s. machine id=%v, unit name=%s", + cmd, id, name) + break + } + } + + return err +} diff --git a/fleetctl/restart.go b/fleetctl/restart.go new file mode 100644 index 000000000..df1809636 --- /dev/null +++ b/fleetctl/restart.go @@ -0,0 +1,125 @@ +// Copyright 2016 The fleet Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "os" + + "github.com/spf13/cobra" + + "github.com/coreos/fleet/job" + "github.com/coreos/fleet/log" + "github.com/coreos/fleet/schema" +) + +var cmdRestart = &cobra.Command{ + Use: "restart [--block-attempts=N] UNIT...", + Short: "Instruct systemd to rolling restart one or more units in the cluster.", + Long: `Restarts one or more units in the cluster. If they are stopped it starts them. + +Instructs systemd on the host machine to stop then start the unit, deferring to systemd +completely for any custom restart directives (i.e. ExecStop options in the unit +file). + +For units which are not global, restart operations are performed synchronously, +which means fleetctl will block until it detects that the unit(s) have +transitioned to a stopped state and then back to a started state. This behaviour can be configured with the +respective --block-attempts options. Restart operations on global +units are always non-blocking. + +Restart a single unit: + fleetctl restart foo.service + +Restart an entire directory of units with glob matching, without waiting: + fleetctl restart myservice/*`, + Run: runWrapper(runRestartUnit), +} + +func init() { + cmdFleet.AddCommand(cmdRestart) + + cmdRestart.Flags().IntVar(&sharedFlags.BlockAttempts, "block-attempts", 0, "Wait until the units are stopped/started, performing up to N attempts before giving up. A value of 0 indicates no limit. Does not apply to global units.") +} + +func runRestartUnit(cCmd *cobra.Command, args []string) (exit int) { + if len(args) == 0 { + stderr("No units given") + return 0 + } + units, err := findUnits(args) + if err != nil { + stderr("%v", err) + return 1 + } + + if err := lazyCreateUnits(cCmd, args); err != nil { + stderr("Error creating units: %v", err) + return 1 + } + + globalUnits := make([]schema.Unit, 0) + for _, unit := range units { + if suToGlobal(unit) { + globalUnits = append(globalUnits, unit) + continue + } + if job.JobState(unit.CurrentState) == job.JobStateInactive { + stderr("Unable to restart unit %s in state %s", unit.Name, job.JobStateInactive) + continue + } else if job.JobState(unit.CurrentState) == job.JobStateLoaded { + log.Infof("Unit(%s) already %s, starting.", unit.Name, job.JobStateLoaded) + + exit = setUnitStateAndWait(unit, job.JobStateLaunched, getBlockAttempts(cCmd)) + if exit == 1 { + return exit + } + continue + } else { + //stop and start it + exit = setUnitStateAndWait(unit, job.JobStateLoaded, getBlockAttempts(cCmd)) + if exit == 1 { + return exit + } + exit = setUnitStateAndWait(unit, job.JobStateLaunched, getBlockAttempts(cCmd)) + if exit == 1 { + return exit + } + } + log.Infof("Unit(%s) was restarted.", unit.Name) + } + + if err := cmdGlobalMachineState(cCmd, globalUnits); err != nil { + stderr("Error restarting global units %v err:%v", globalUnits, err) + return 1 + } + + return +} + +func setUnitStateAndWait(unit schema.Unit, targetState job.JobState, blockAttempts int) (exit int) { + err := cAPI.SetUnitTargetState(unit.Name, string(targetState)) + if err != nil { + stderr("Error setting target state for unit %s: %v", unit.Name, err) + return 1 + } + + err = tryWaitForUnitStates([]string{unit.Name}, "restart", job.JobStateLaunched, blockAttempts, os.Stdout) + if err != nil { + stderr("Error waiting for unit states, exit status: %v", err) + return 1 + } + + return 0 +} diff --git a/fleetctl/restart_test.go b/fleetctl/restart_test.go new file mode 100644 index 000000000..cffb2ba2f --- /dev/null +++ b/fleetctl/restart_test.go @@ -0,0 +1,136 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "fmt" + "sync" + "testing" + + "github.com/coreos/fleet/schema" +) + +func checkRestartUnitState(unit schema.Unit, restartRet int, errchan chan error) { + if restartRet != 0 && unit.DesiredState != "" { + // if the whole restart operation failed, then no unit + // should have a DesiredState set + errchan <- fmt.Errorf("Error: Unit(%s) DesiredState was set to (%s)", unit.Name, unit.DesiredState) + } +} + +func doRestartUnits(t *testing.T, r commandTestResults, errchan chan error) { + sharedFlags.NoBlock = true + exit := runRestartUnit(cmdRestart, r.units) + if exit != r.expectedExit { + errchan <- fmt.Errorf("%s: expected exit code %d but received %d", r.description, r.expectedExit, exit) + return + } + + real_units, err := findUnits(r.units) + if err != nil { + errchan <- err + return + } + + for _, v := range real_units { + checkRestartUnitState(v, r.expectedExit, errchan) + } +} + +func runRestartUnits(t *testing.T, unitPrefix string, results []commandTestResults, template bool) { + unitsCount := 0 + for _, r := range results { + var wg_res sync.WaitGroup + + if !template { + unitsCount = len(r.units) + } + + cAPI = newFakeRegistryForCommands(unitPrefix, unitsCount, template) + + errchan_res := make(chan error) + wg_res.Add(1) + go func() { + defer wg_res.Done() + doRestartUnits(t, r, errchan_res) + }() + + go func() { + wg_res.Wait() + close(errchan_res) + }() + + for err := range errchan_res { + t.Errorf("%v", err) + } + } +} + +func TestRunRestartUnits(t *testing.T) { + unitPrefix := "restart" + oldNoBlock := sharedFlags.NoBlock + defer func() { + sharedFlags.NoBlock = oldNoBlock + }() + + results := []commandTestResults{ + { + "restart available units", + []string{"restart1", "restart2", "restart3", "restart4", "restart5", "restart6"}, + 0, + }, + { + "restart non-available units", + []string{"y1", "y2"}, + 1, + }, + { + "restart available and non-available units", + []string{"y1", "y2", "y3", "y4", "restart1", "restart2", "restart3", "restart4", "restart5", "restart6", "y0"}, + 1, + }, + { + "restart a unit from a non-available template", + []string{"foo-template@1"}, + 1, + }, + { + "restart null input", + []string{}, + 0, + }, + } + + templateResults := []commandTestResults{ + { + "restart a unit from a non-available template", + []string{"restart-foo@1"}, + 1, + }, + { + "restart units from an available template", + []string{"restart@1", "restart@100", "restart@1000"}, + 0, + }, + { + "restart same unit from an available template", + []string{"restart@1", "restart@1", "restart@1"}, + 0, + }, + } + + runRestartUnits(t, unitPrefix, results, false) + runRestartUnits(t, unitPrefix, templateResults, true) +} diff --git a/fleetctl/status.go b/fleetctl/status.go index 20f15864a..c24e299ca 100644 --- a/fleetctl/status.go +++ b/fleetctl/status.go @@ -20,6 +20,7 @@ import ( "github.com/spf13/cobra" "github.com/coreos/fleet/job" + "github.com/coreos/fleet/schema" ) var cmdStatus = &cobra.Command{ @@ -46,6 +47,7 @@ func init() { } func runStatusUnit(cCmd *cobra.Command, args []string) (exit int) { + globalUnits := make([]schema.Unit, 0) for i, arg := range args { name := unitNameMangle(arg) unit, err := cAPI.Unit(name) @@ -58,8 +60,8 @@ func runStatusUnit(cCmd *cobra.Command, args []string) (exit int) { stderr("Unit %s does not exist.", name) return 1 } else if suToGlobal(*unit) { - stderr("Unable to determine status of global unit %s.", unit.Name) - return 1 + globalUnits = append(globalUnits, *unit) + continue } else if job.JobState(unit.CurrentState) == job.JobStateInactive { stderr("Unit %s does not appear to be loaded.", unit.Name) return 1 @@ -76,5 +78,10 @@ func runStatusUnit(cCmd *cobra.Command, args []string) (exit int) { } } + if err := cmdGlobalMachineState(cCmd, globalUnits); err != nil { + stderr("Error retrieving machine state for global units: %v", err) + return 1 + } + return } diff --git a/functional/scheduling_test.go b/functional/scheduling_test.go index 6771df2ee..0dd386f70 100644 --- a/functional/scheduling_test.go +++ b/functional/scheduling_test.go @@ -685,12 +685,14 @@ func TestScheduleGlobalUnits(t *testing.T) { t.Fatal(err) } defer cluster.Destroy(t) - members, err := platform.CreateNClusterMembers(cluster, 3) + numGUnits := 3 + numAllUnits := 5 + members, err := platform.CreateNClusterMembers(cluster, numGUnits) if err != nil { t.Fatal(err) } m0 := members[0] - machines, err := cluster.WaitForNMachines(m0, 3) + machines, err := cluster.WaitForNMachines(m0, numGUnits) if err != nil { t.Fatal(err) } @@ -708,19 +710,21 @@ func TestScheduleGlobalUnits(t *testing.T) { } // Now add a global unit - stdout, stderr, err = cluster.Fleetctl(m0, "start", "--no-block", "fixtures/units/global.service") + globalLongPath := "fixtures/units/global.service" + stdout, stderr, err = cluster.Fleetctl(m0, "start", "--no-block", globalLongPath) if err != nil { t.Fatalf("Failed starting unit: \nstdout: %s\nstderr: %s\nerr: %v", stdout, stderr, err) } // Should see 2 + 3 units - states, err := cluster.WaitForNActiveUnits(m0, 5) + states, err := cluster.WaitForNActiveUnits(m0, numAllUnits) if err != nil { t.Fatal(err) } // Each machine should have a single global unit - us := states["global.service"] + globalBase := path.Base(globalLongPath) + us := states[globalBase] for _, mach := range machines { var found bool for _, state := range us { @@ -737,6 +741,27 @@ func TestScheduleGlobalUnits(t *testing.T) { } } } + + stdout, stderr, err = cluster.Fleetctl(m0, "status", "--no-block", globalBase) + if err != nil { + t.Fatalf("Failed getting unit status: \nstdout: %s\nstderr: %s\nerr: %v", stdout, stderr, err) + } + + // restarting global units + cmd := "restart" + stdout, stderr, err = cluster.Fleetctl(m0, cmd, globalBase) + if err != nil { + t.Fatalf("Failed restarting global unit: \nstdout: %s\nstderr: %s\nerr: %v", stdout, stderr, err) + } + + outmap, err := waitForNUnitsStart(cluster, m0, numAllUnits) + if err != nil { + t.Fatalf("Failed listing global units: %v", err) + } + glist, _ := outmap[globalBase] + if len(glist) != numGUnits { + t.Fatalf("Did not find %d global units: got %d", numGUnits, len(glist)) + } } // TestScheduleGlobalConflicts starts 2 global units that conflict with each diff --git a/functional/unit_action_test.go b/functional/unit_action_test.go index 613161d45..602e5cc42 100644 --- a/functional/unit_action_test.go +++ b/functional/unit_action_test.go @@ -35,9 +35,10 @@ const ( ) var cleanCmd = map[string]string{ - "submit": "destroy", - "load": "unload", - "start": "stop", + "submit": "destroy", + "load": "unload", + "start": "stop", + "restart": "stop", } // TestUnitRunnable is the simplest test possible, deplying a single-node @@ -165,6 +166,41 @@ func TestUnitStartReplace(t *testing.T) { } } +// TestUnitRestart checks if a unit becomes started and restarted successfully. +// First it starts a unit, and restarts the unit, verifies it's restarted. +func TestUnitRestart(t *testing.T) { + cluster, err := platform.NewNspawnCluster("smoke") + if err != nil { + t.Fatal(err) + } + defer cluster.Destroy(t) + + m, err := cluster.CreateMember() + if err != nil { + t.Fatal(err) + } + _, err = cluster.WaitForNMachines(m, 1) + if err != nil { + t.Fatal(err) + } + + numUnits := 3 + + // first start units before restarting them + unitFiles, err := launchUnitsCmd(cluster, m, "start", numUnits) + if err != nil { + t.Fatal(err) + } + if err := checkListUnits(cluster, m, "start", unitFiles, numUnits); err != nil { + t.Fatal(err) + } + + // now restart + if err := unitStartCommon(cluster, m, "restart", numUnits); err != nil { + t.Fatal(err) + } +} + func TestUnitSSHActions(t *testing.T) { cluster, err := platform.NewNspawnCluster("smoke") if err != nil { @@ -582,6 +618,8 @@ func checkListUnits(cl platform.Cluster, m platform.Member, cmd string, ufs []st lenLists = len(lus) break case "start": + fallthrough + case "restart": lus, err = waitForNUnitsStart(cl, m, nu) lenLists = len(lus) break @@ -604,18 +642,6 @@ func checkListUnits(cl platform.Cluster, m platform.Member, cmd string, ufs []st if lenLists != nu || !found { return fmt.Errorf("Expected %s to be unit file", ufs[i]) } - - if cmd == "start" { - // Check expected systemd state after starting units - stdout, stderr, err := cl.MemberCommand(m, "systemctl", "show", "--property=ActiveState", ufs[i]) - if strings.TrimSpace(stdout) != "ActiveState=active" { - return fmt.Errorf("Fleet unit not reported as active:\nstdout: %s\nstderr: %s\nerr: %v", stdout, stderr, err) - } - stdout, stderr, err = cl.MemberCommand(m, "systemctl", "show", "--property=Result", ufs[i]) - if strings.TrimSpace(stdout) != "Result=success" { - return fmt.Errorf("Result for fleet unit not reported as success:\nstdout: %s\nstderr: %s\nerr: %v", stdout, stderr, err) - } - } } return err } @@ -654,6 +680,8 @@ func waitForNUnitsCmd(cl platform.Cluster, m platform.Member, cmd string, nu int _, err = waitForNUnitsLoad(cl, m, nu) break case "start": + fallthrough + case "restart": _, err = waitForNUnitsStart(cl, m, nu) break default: