Skip to content

Commit

Permalink
Merge pull request #65 from bcwaldon/async-jobs
Browse files Browse the repository at this point in the history
dbus: methods return chan, do not block
  • Loading branch information
bcwaldon committed Sep 25, 2014
2 parents 7a81740 + b5b3eb9 commit cf3cdf7
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 45 deletions.
4 changes: 2 additions & 2 deletions dbus/dbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type Conn struct {
sigobj *dbus.Object

jobListener struct {
jobs map[dbus.ObjectPath]chan string
jobs map[dbus.ObjectPath]chan<- string
sync.Mutex
}
subscriber struct {
Expand Down Expand Up @@ -115,7 +115,7 @@ func newConnection(createBus func() (*dbus.Conn, error)) (*Conn, error) {
}

c.subscriber.ignore = make(map[dbus.ObjectPath]int64)
c.jobListener.jobs = make(map[dbus.ObjectPath]chan string)
c.jobListener.jobs = make(map[dbus.ObjectPath]chan<- string)

// Setup the listeners on jobs so that we can get completions
c.sigconn.BusObject().Call("org.freedesktop.DBus.AddMatch", 0,
Expand Down
74 changes: 42 additions & 32 deletions dbus/methods.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package dbus

import (
"errors"
"path"
"strconv"

"github.com/godbus/dbus"
)
Expand All @@ -37,26 +39,26 @@ func (c *Conn) jobComplete(signal *dbus.Signal) {
c.jobListener.Unlock()
}

func (c *Conn) startJob(job string, args ...interface{}) (<-chan string, error) {
c.jobListener.Lock()
defer c.jobListener.Unlock()
func (c *Conn) startJob(ch chan<- string, job string, args ...interface{}) (int, error) {
if ch != nil {
c.jobListener.Lock()
defer c.jobListener.Unlock()
}

ch := make(chan string, 1)
var path dbus.ObjectPath
err := c.sysobj.Call(job, 0, args...).Store(&path)
var p dbus.ObjectPath
err := c.sysobj.Call(job, 0, args...).Store(&p)
if err != nil {
return nil, err
return 0, err
}
c.jobListener.jobs[path] = ch
return ch, nil
}

func (c *Conn) runJob(job string, args ...interface{}) (string, error) {
respCh, err := c.startJob(job, args...)
if err != nil {
return "", err
if ch != nil {
c.jobListener.jobs[p] = ch
}
return <-respCh, nil

// ignore error since 0 is fine if conversion fails
jobID, _ := strconv.Atoi(path.Base(string(p)))

return jobID, nil
}

// StartUnit enqueues a start job and depending jobs, if any (unless otherwise
Expand All @@ -74,59 +76,67 @@ func (c *Conn) runJob(job string, args ...interface{}) (string, error) {
// requirement dependencies. It is not recommended to make use of the latter
// two options.
//
// Result string: one of done, canceled, timeout, failed, dependency, skipped.
// If the provided channel is non-nil, a result string will be sent to it upon
// job completion: one of done, canceled, timeout, failed, dependency, skipped.
// done indicates successful execution of a job. canceled indicates that a job
// has been canceled before it finished execution. timeout indicates that the
// job timeout was reached. failed indicates that the job failed. dependency
// indicates that a job this job has been depending on failed and the job hence
// has been removed too. skipped indicates that a job was skipped because it
// didn't apply to the units current state.
func (c *Conn) StartUnit(name string, mode string) (string, error) {
return c.runJob("org.freedesktop.systemd1.Manager.StartUnit", name, mode)
//
// If no error occurs, the ID of the underlying systemd job will be returned. There
// does exist the possibility for no error to be returned, but for the returned job
// ID to be 0. In this case, the actual underlying ID is not 0 and this datapoint
// should not be considered authoritative.
//
// If an error does occur, it will be returned to the user alongside a job ID of 0.
func (c *Conn) StartUnit(name string, mode string, ch chan<- string) (int, error) {
return c.startJob(ch, "org.freedesktop.systemd1.Manager.StartUnit", name, mode)
}

// StopUnit is similar to StartUnit but stops the specified unit rather
// than starting it.
func (c *Conn) StopUnit(name string, mode string) (string, error) {
return c.runJob("org.freedesktop.systemd1.Manager.StopUnit", name, mode)
func (c *Conn) StopUnit(name string, mode string, ch chan<- string) (int, error) {
return c.startJob(ch, "org.freedesktop.systemd1.Manager.StopUnit", name, mode)
}

// ReloadUnit reloads a unit. Reloading is done only if the unit is already running and fails otherwise.
func (c *Conn) ReloadUnit(name string, mode string) (string, error) {
return c.runJob("org.freedesktop.systemd1.Manager.ReloadUnit", name, mode)
func (c *Conn) ReloadUnit(name string, mode string, ch chan<- string) (int, error) {
return c.startJob(ch, "org.freedesktop.systemd1.Manager.ReloadUnit", name, mode)
}

// RestartUnit restarts a service. If a service is restarted that isn't
// running it will be started.
func (c *Conn) RestartUnit(name string, mode string) (string, error) {
return c.runJob("org.freedesktop.systemd1.Manager.RestartUnit", name, mode)
func (c *Conn) RestartUnit(name string, mode string, ch chan<- string) (int, error) {
return c.startJob(ch, "org.freedesktop.systemd1.Manager.RestartUnit", name, mode)
}

// TryRestartUnit is like RestartUnit, except that a service that isn't running
// is not affected by the restart.
func (c *Conn) TryRestartUnit(name string, mode string) (string, error) {
return c.runJob("org.freedesktop.systemd1.Manager.TryRestartUnit", name, mode)
func (c *Conn) TryRestartUnit(name string, mode string, ch chan<- string) (int, error) {
return c.startJob(ch, "org.freedesktop.systemd1.Manager.TryRestartUnit", name, mode)
}

// ReloadOrRestart attempts a reload if the unit supports it and use a restart
// otherwise.
func (c *Conn) ReloadOrRestartUnit(name string, mode string) (string, error) {
return c.runJob("org.freedesktop.systemd1.Manager.ReloadOrRestartUnit", name, mode)
func (c *Conn) ReloadOrRestartUnit(name string, mode string, ch chan<- string) (int, error) {
return c.startJob(ch, "org.freedesktop.systemd1.Manager.ReloadOrRestartUnit", name, mode)
}

// ReloadOrTryRestart attempts a reload if the unit supports it and use a "Try"
// flavored restart otherwise.
func (c *Conn) ReloadOrTryRestartUnit(name string, mode string) (string, error) {
return c.runJob("org.freedesktop.systemd1.Manager.ReloadOrTryRestartUnit", name, mode)
func (c *Conn) ReloadOrTryRestartUnit(name string, mode string, ch chan<- string) (int, error) {
return c.startJob(ch, "org.freedesktop.systemd1.Manager.ReloadOrTryRestartUnit", name, mode)
}

// StartTransientUnit() may be used to create and start a transient unit, which
// will be released as soon as it is not running or referenced anymore or the
// system is rebooted. name is the unit name including suffix, and must be
// unique. mode is the same as in StartUnit(), properties contains properties
// of the unit.
func (c *Conn) StartTransientUnit(name string, mode string, properties ...Property) (string, error) {
return c.runJob("org.freedesktop.systemd1.Manager.StartTransientUnit", name, mode, properties, make([]PropertyCollection, 0))
func (c *Conn) StartTransientUnit(name string, mode string, properties []Property, ch chan<- string) (int, error) {
return c.startJob(ch, "org.freedesktop.systemd1.Manager.StartTransientUnit", name, mode, properties, make([]PropertyCollection, 0))
}

// KillUnit takes the unit name and a UNIX signal number to send. All of the unit's
Expand Down
29 changes: 22 additions & 7 deletions dbus/methods_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func findFixture(target string, t *testing.T) string {

func setupUnit(target string, conn *Conn, t *testing.T) {
// Blindly stop the unit in case it is running
conn.StopUnit(target, "replace")
conn.StopUnit(target, "replace", nil)

// Blindly remove the symlink in case it exists
targetRun := filepath.Join("/run/systemd/system/", target)
Expand Down Expand Up @@ -81,11 +81,13 @@ func TestStartStopUnit(t *testing.T) {
linkUnit(target, conn, t)

// 2. Start the unit
job, err := conn.StartUnit(target, "replace")
reschan := make(chan string)
_, err := conn.StartUnit(target, "replace", reschan)
if err != nil {
t.Fatal(err)
}

job := <-reschan
if job != "done" {
t.Fatal("Job is not done:", job)
}
Expand All @@ -108,11 +110,14 @@ func TestStartStopUnit(t *testing.T) {
}

// 3. Stop the unit
job, err = conn.StopUnit(target, "replace")
_, err = conn.StopUnit(target, "replace", reschan)
if err != nil {
t.Fatal(err)
}

// wait for StopUnit job to complete
<-reschan

units, err = conn.ListUnits()

unit = nil
Expand Down Expand Up @@ -260,11 +265,13 @@ func TestStartStopTransientUnit(t *testing.T) {
target := fmt.Sprintf("testing-transient-%d.service", rand.Int())

// Start the unit
job, err := conn.StartTransientUnit(target, "replace", props...)
reschan := make(chan string)
_, err := conn.StartTransientUnit(target, "replace", props, reschan)
if err != nil {
t.Fatal(err)
}

job := <-reschan
if job != "done" {
t.Fatal("Job is not done:", job)
}
Expand All @@ -287,11 +294,14 @@ func TestStartStopTransientUnit(t *testing.T) {
}

// 3. Stop the unit
job, err = conn.StopUnit(target, "replace")
_, err = conn.StopUnit(target, "replace", reschan)
if err != nil {
t.Fatal(err)
}

// wait for StopUnit job to complete
<-reschan

units, err = conn.ListUnits()

unit = nil
Expand All @@ -315,16 +325,21 @@ func TestConnJobListener(t *testing.T) {

jobSize := len(conn.jobListener.jobs)

_, err := conn.StartUnit(target, "replace")
reschan := make(chan string)
_, err := conn.StartUnit(target, "replace", reschan)
if err != nil {
t.Fatal(err)
}

_, err = conn.StopUnit(target, "replace")
<-reschan

_, err = conn.StopUnit(target, "replace", reschan)
if err != nil {
t.Fatal(err)
}

<-reschan

currentJobSize := len(conn.jobListener.jobs)
if jobSize != currentJobSize {
t.Fatal("JobListener jobs leaked")
Expand Down
4 changes: 3 additions & 1 deletion dbus/subscription_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ func TestSubscriptionSetUnit(t *testing.T) {
setupUnit(target, conn, t)
linkUnit(target, conn, t)

job, err := conn.StartUnit(target, "replace")
reschan := make(chan string)
_, err = conn.StartUnit(target, "replace", reschan)
if err != nil {
t.Fatal(err)
}

job := <-reschan
if job != "done" {
t.Fatal("Couldn't start", target)
}
Expand Down
6 changes: 3 additions & 3 deletions dbus/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,13 @@ func TestSubscribeUnit(t *testing.T) {
setupUnit(target, conn, t)
linkUnit(target, conn, t)

job, err := conn.StartUnit(target, "replace")
reschan := make(chan string)
_, err = conn.StartUnit(target, "replace", reschan)
if err != nil {
t.Fatal(err)
}

job := <-reschan
if job != "done" {
t.Fatal("Couldn't start", target)
}
Expand Down Expand Up @@ -87,5 +89,3 @@ func TestSubscribeUnit(t *testing.T) {
success:
return
}


0 comments on commit cf3cdf7

Please sign in to comment.