diff --git a/compare_and_swap.go b/compare_and_swap.go new file mode 100644 index 0000000..e34e11e --- /dev/null +++ b/compare_and_swap.go @@ -0,0 +1,42 @@ +package main + +import ( + "flag" + "fmt" +) + +const CompareAndSwapUsage = `usage: etcdctl [etcd flags] compareAndSwap [testAndSet flags] +either prevvalue or previndex needs to be given +special flags: --ttl to set a key with ttl + --prevvalue to set the previous value + --previndex to set the previous index` + +var ( + compareAndSwapFlag = flag.NewFlagSet("testAndSet", flag.ExitOnError) + compareAndSwapTtl = compareAndSwapFlag.Uint64("ttl", 0, "ttl of the key") + compareAndSwapPvalue = compareAndSwapFlag.String("prevvalue", "", "previous value") + compareAndSwapPindex = compareAndSwapFlag.Uint64("previndex", 0, "previous index") +) + +func init() { + // The minimum number of arguments is 3 because + // there needs to be either pvalue or pindex + registerCommand("compareAndSwap", CompareAndSwapUsage, 3, 6, compareAndSwap) +} + +func compareAndSwap(args []string) error { + key := args[0] + value := args[1] + compareAndSwapFlag.Parse(args[2:]) + resp, err := client.CompareAndSwap(key, value, + *compareAndSwapTtl, *compareAndSwapPvalue, *compareAndSwapPindex) + if debug { + fmt.Println(<-curlChan) + } + if err != nil { + return err + } + + output(resp) + return nil +} diff --git a/delete.go b/delete.go index 12ab285..025ca61 100644 --- a/delete.go +++ b/delete.go @@ -5,19 +5,39 @@ import ( ) const DeleteUsage = `usage: etcdctl [etcd flags] delete ` +const DeleteAllUsage = `usage: etcdctl [etcd flags] deleteAll ` func init() { - registerCommand("delete", DeleteUsage, 2, 2, delete) + registerCommand("delete", DeleteUsage, 1, 1, delete) + registerCommand("deleteAll", DeleteAllUsage, 1, 1, deleteAll) } func delete(args []string) error { - key := args[1] + key := args[0] resp, err := client.Delete(key) + if debug { + fmt.Println(<-curlChan) + } + if err != nil { + return err + } + output(resp) + + return nil +} + +func deleteAll(args []string) error { + key := args[0] + + resp, err := client.DeleteAll(key) + if debug { + fmt.Println(<-curlChan) + } if err != nil { return err } - fmt.Println(resp.PrevValue) + output(resp) return nil } diff --git a/etcdctl.go b/etcdctl.go index 2ffd378..5021a65 100644 --- a/etcdctl.go +++ b/etcdctl.go @@ -1,6 +1,7 @@ package main import ( + "encoding/json" "flag" "fmt" "github.com/coreos/go-etcd/etcd" @@ -8,16 +9,34 @@ import ( ) var ( - client *etcd.Client - + client *etcd.Client printVersion bool + outputFormat string + debug bool + cluster = ClusterValue{"http://localhost:4001"} + curlChan chan string ) +func output(resp *etcd.Response) { + switch outputFormat { + case "json": + b, err := json.Marshal(resp) + if err != nil { + panic(err) + } + fmt.Printf("%s\n", b) + case "full": + fmt.Printf("Index: %v\nValue: %v\n", resp.Index, resp.Value) + case "value-only": + fmt.Println(resp.Value) + } +} + func main() { flag.BoolVar(&printVersion, "version", false, "print the version and exit") - - cluster := ClusterValue{"http://localhost:4001"} flag.Var(&cluster, "C", "a comma seperated list of machine addresses in the cluster e.g. 127.0.0.1:4001,127.0.0.1:4002") + flag.StringVar(&outputFormat, "format", "json", "Output server response in the given format, either `json`, `full`, or `value-only`") + flag.BoolVar(&debug, "debug", false, "Output cURL commands which can be used to re-produce the request") flag.Parse() if printVersion { @@ -25,11 +44,22 @@ func main() { os.Exit(0) } + if debug { + // Making the channel buffered to avoid potential deadlocks + curlChan = make(chan string, 10) + etcd.SetCurlChan(curlChan) + } + client = etcd.NewClient(cluster.GetMachines()) args := flag.Args() if len(args) == 0 { + fmt.Println("Usage: etcdctl [flags] [command] [flags for command]\n") + fmt.Println("Available flags include:\n") + flag.PrintDefaults() + fmt.Println() + fmt.Println(`To see the usage for a specific command, run "etcdctl [command]"`) os.Exit(1) } @@ -42,8 +72,9 @@ func main() { os.Exit(MalformedEtcdctlArguments) } - if len(args) > command.maxArgs || len(args) < command.minArgs { - fmt.Println("wrong arguments provided") + commandArgs := args[1:] + + if len(commandArgs) > command.maxArgs || len(commandArgs) < command.minArgs { fmt.Println(command.usage) os.Exit(MalformedEtcdctlArguments) } @@ -53,9 +84,10 @@ func main() { os.Exit(FailedToConnectToHost) } - err := command.f(args) + err := command.f(commandArgs) if err != nil { + fmt.Print("Error: ") fmt.Println(err) os.Exit(ErrorFromEtcd) } diff --git a/get.go b/get.go index 99d347b..8dbaedd 100644 --- a/get.go +++ b/get.go @@ -1,25 +1,74 @@ package main import ( + "flag" "fmt" + "github.com/coreos/go-etcd/etcd" ) -const GetUsage = `usage: etcdctl [etcd flags] get ` +const GetUsage = `usage: etcdctl [etcd flags] get [get flags] +special flags: --sort to return the result in sorted order + --consistent to send request to the leader, thereby guranteeing that any earlier writes will be seen by the read` + +const GetAllUsage = `usage: etcdctl [etcd flags] getAll [getAll flags] +special flags: --sort set to true to return the result in sorted order + --consistent to send request to the leader, thereby guranteeing that any earlier writes will be seen by the read` + +var ( + getFlag = flag.NewFlagSet("get", flag.ExitOnError) + sorted = getFlag.Bool("sort", false, + "Return the results in sorted order or not (default to false)") + consistent = getFlag.Bool("consistent", false, + "Send the request to the leader or not (default to false)") +) func init() { - registerCommand("get", GetUsage, 2, 2, get) + registerCommand("get", GetUsage, 1, 2, get) + registerCommand("getAll", GetAllUsage, 1, 2, getAll) } func get(args []string) error { - key := args[1] - resps, err := client.Get(key) + if *consistent { + client.SetConsistency(etcd.STRONG_CONSISTENCY) + } else { + client.SetConsistency(etcd.WEAK_CONSISTENCY) + } + + key := args[0] + getFlag.Parse(args[1:]) + resp, err := client.Get(key, *sorted) + if debug { + fmt.Println(<-curlChan) + } + if err != nil { return err } - for _, resp := range resps { - if resp.Value != "" { - fmt.Println(resp.Value) - } + + output(resp) + + return nil +} + +func getAll(args []string) error { + if *consistent { + client.SetConsistency(etcd.STRONG_CONSISTENCY) + } else { + client.SetConsistency(etcd.WEAK_CONSISTENCY) + } + + key := args[0] + getFlag.Parse(args[1:]) + resp, err := client.GetAll(key, *sorted) + if debug { + fmt.Println(<-curlChan) + } + + if err != nil { + return err } + + output(resp) + return nil } diff --git a/set.go b/set.go deleted file mode 100644 index edad0c6..0000000 --- a/set.go +++ /dev/null @@ -1,31 +0,0 @@ -package main - -import ( - "flag" - "fmt" -) - -const SetUsage = `usage: etcdctl [etcd flags] set [set flags] -special flags: --ttl to set a key with ttl` - -var ( - setFlag = flag.NewFlagSet("set", flag.ExitOnError) - ttl = setFlag.Int64("ttl", 0, "ttl of the key") -) - -func init() { - registerCommand("set", SetUsage, 3, 5, set) -} - -func set(args []string) error { - key := args[1] - value := args[2] - setFlag.Parse(args[3:]) - resp, err := client.Set(key, value, uint64(*ttl)) - if err != nil { - return err - } - fmt.Println(resp.Value) - - return nil -} diff --git a/set_update_create.go b/set_update_create.go new file mode 100644 index 0000000..7259da9 --- /dev/null +++ b/set_update_create.go @@ -0,0 +1,131 @@ +package main + +import ( + "flag" + "fmt" +) + +const SetUsage = `usage: etcdctl [etcd flags] set [set flags] +special flags: --ttl to set a key with ttl` + +const CreateUsage = `usage: etcdctl [etcd flags] create [create flags] +special flags: --ttl to create a key with ttl` + +const UpdateUsage = `usage: etcdctl [etcd flags] update [udpate flags] +special flags: --ttl to update a key with ttl` + +const SetDirUsage = `usage: etcdctl [etcd flags] setDir [setDir flags] +special flags: --ttl to set a directory with ttl` + +const CreateDirUsage = `usage: etcdctl [etcd flags] createDir [createDir flags] +special flags: --ttl to create a directory with ttl` + +const UpdateDirUsage = `usage: etcdctl [etcd flags] updateDir [udpateDir flags] +special flags: --ttl to update a directory with ttl` + +var ( + setFlag = flag.NewFlagSet("set", flag.ExitOnError) + ttl = setFlag.Int64("ttl", 0, "ttl of the key") +) + +func init() { + registerCommand("set", SetUsage, 2, 3, set) + registerCommand("create", CreateUsage, 2, 3, create) + registerCommand("update", UpdateUsage, 2, 3, update) + registerCommand("setDir", SetDirUsage, 1, 2, setDir) + registerCommand("createDir", CreateDirUsage, 1, 2, createDir) + registerCommand("updateDir", UpdateDirUsage, 1, 2, updateDir) +} + +func set(args []string) error { + key := args[0] + value := args[1] + setFlag.Parse(args[2:]) + resp, err := client.Set(key, value, uint64(*ttl)) + if debug { + fmt.Println(<-curlChan) + } + if err != nil { + return err + } + output(resp) + + return nil +} + +func create(args []string) error { + key := args[0] + value := args[1] + setFlag.Parse(args[2:]) + resp, err := client.Create(key, value, uint64(*ttl)) + if debug { + fmt.Println(<-curlChan) + } + if err != nil { + return err + } + output(resp) + + return nil +} + +func update(args []string) error { + key := args[0] + value := args[1] + setFlag.Parse(args[2:]) + resp, err := client.Update(key, value, uint64(*ttl)) + if debug { + fmt.Println(<-curlChan) + } + if err != nil { + return err + } + output(resp) + + return nil +} + +func setDir(args []string) error { + key := args[0] + setFlag.Parse(args[1:]) + resp, err := client.SetDir(key, uint64(*ttl)) + if debug { + fmt.Println(<-curlChan) + } + if err != nil { + return err + } + output(resp) + + return nil +} + +func createDir(args []string) error { + key := args[0] + setFlag.Parse(args[1:]) + resp, err := client.CreateDir(key, uint64(*ttl)) + if debug { + fmt.Println(<-curlChan) + } + if err != nil { + return err + } + output(resp) + + return nil +} + +func updateDir(args []string) error { + key := args[0] + setFlag.Parse(args[1:]) + resp, err := client.UpdateDir(key, uint64(*ttl)) + if debug { + fmt.Println(<-curlChan) + } + if err != nil { + return err + } + output(resp) + + return nil +} diff --git a/sets.go b/sets.go deleted file mode 100644 index 4561def..0000000 --- a/sets.go +++ /dev/null @@ -1,145 +0,0 @@ -package main - -import ( - "flag" - "fmt" - "crypto/md5" - "io" - "os" - "github.com/coreos/go-etcd/etcd" -) - -const sUsage = `usage: etcdctl [etcd flags] - -Commands: - - sadd [flags] - --ttl to set add a value with a ttl to the set - sdel - smembers - sismember - -` - -var ( - saddFlag = flag.NewFlagSet("sadd", flag.ExitOnError) - saddTtl = saddFlag.Int64("ttl", 0, "ttl of the key") -) - -func hash(str string) string { - h := md5.New() - io.WriteString(h, str) - return fmt.Sprintf("%x", h.Sum(nil)) -} - -func getHeadKey(key string) string { - return fmt.Sprintf("%s/set-%s", key, hash(key)) -} - -func setExists(key string) bool { - headKey := getHeadKey(key) - _, err := client.Get(headKey) - return err == nil -} - -func init() { - registerCommand("sadd", sUsage, 3, 4, sadd) - registerCommand("sdel", sUsage, 3, 3, sdel) - registerCommand("sismember", sUsage, 3, 3, sismember) - registerCommand("smembers", sUsage, 2, 2, smembers) -} - -func sadd(args []string) error { - - setKey := args[1] - value := args[2] - saddFlag.Parse(args[3:]) - - // Create the set unless it exists - if ! setExists(setKey) { - headKey := getHeadKey(setKey) - _, err := client.Set(headKey, "1", 0) - if err != nil { - return err - } - } - - key := fmt.Sprintf("%s/%s", setKey, hash(value)) - _, err := client.Set(key, value, uint64(*saddTtl)) - if err != nil { - return err - } - - fmt.Println(value) - - return nil -} - -func sdel(args []string) error { - - setKey := args[1] - - if ! setExists(setKey) { - return fmt.Errorf("%s is not a set", setKey) - } - - value := args[2] - key := fmt.Sprintf("%s/%s", setKey, hash(value)) - _, err := client.Delete(key) - if err != nil { - err := err.(etcd.EtcdError) - if err.ErrorCode == 100 { - return etcd.EtcdError{ - ErrorCode: 100, - Message: "Not In Set", - Cause: setKey, - } - } - return err - } - - return nil -} - -func smembers(args []string) error { - setKey := args[1] - - if ! setExists(setKey) { - return fmt.Errorf("%s is not a set", setKey) - } - - resps, err := client.Get(setKey) - if err != nil { - return err - } - - headKey := getHeadKey(setKey) - for _, resp := range resps { - if resp.Key != headKey { - fmt.Printf("%s\n", resp.Value) - } - } - - return nil -} - -func sismember(args []string) error { - setKey := args[1] - value := args[2] - - if ! setExists(setKey) { - return fmt.Errorf("%s is not a set", setKey) - } - - key := fmt.Sprintf("%s/%s", setKey, hash(value)) - _, err := client.Get(key) - if err != nil { - fmt.Println("false") - os.Exit(1) - } else { - fmt.Println("true") - os.Exit(0) - } - - return nil -} diff --git a/test_and_set.go b/test_and_set.go deleted file mode 100644 index 601ff9c..0000000 --- a/test_and_set.go +++ /dev/null @@ -1,38 +0,0 @@ -package main - -import ( - "errors" - "flag" - "fmt" -) - -const TestAndSetUsage = `usage: etcdctl [etcd flags] testAndSet [testAndSet flags] -special flags: --ttl to set a key with ttl` - -var ( - testAndSetFlag = flag.NewFlagSet("testAndSet", flag.ExitOnError) - testAndSetTtl = testAndSetFlag.Int64("ttl", 0, "ttl of the key") -) - -func init() { - registerCommand("testAndSet", TestAndSetUsage, 4, 6, testAndSet) -} - -func testAndSet(args []string) error { - key := args[1] - prevValue := args[2] - value := args[3] - testAndSetFlag.Parse(args[4:]) - resp, success, err := client.TestAndSet(key, prevValue, value, uint64(*testAndSetTtl)) - - if err != nil { - return err - } - - if success { - fmt.Println(resp.Value) - return nil - } - - return errors.New("TestAndSet: prevValue does not match the current value of the given key") -} diff --git a/third_party/bitbucket.org/kardianos/osext/osext_sysctl.go b/third_party/bitbucket.org/kardianos/osext/osext_sysctl.go index d764646..e4d228e 100644 --- a/third_party/bitbucket.org/kardianos/osext/osext_sysctl.go +++ b/third_party/bitbucket.org/kardianos/osext/osext_sysctl.go @@ -8,6 +8,7 @@ package osext import ( "os" + "path/filepath" "runtime" "syscall" "unsafe" @@ -47,18 +48,35 @@ func executable() (string, error) { break } } + var strpath string if buf[0] != '/' { - if getwdError != nil { - return string(buf), getwdError - } else { - if buf[0] == '.' { - buf = buf[1:] - } - if startUpcwd[len(startUpcwd)-1] != '/' { - return startUpcwd + "/" + string(buf), nil - } - return startUpcwd + string(buf), nil + var e error + if strpath, e = getAbs(buf); e != nil { + return strpath, e } + } else { + strpath = string(buf) + } + // darwin KERN_PROCARGS may return the path to a symlink rather than the + // actual executable + if runtime.GOOS == "darwin" { + if strpath, err := filepath.EvalSymlinks(strpath); err != nil { + return strpath, err + } + } + return strpath, nil +} + +func getAbs(buf []byte) (string, error) { + if getwdError != nil { + return string(buf), getwdError + } else { + if buf[0] == '.' { + buf = buf[1:] + } + if startUpcwd[len(startUpcwd)-1] != '/' && buf[0] != '/' { + return startUpcwd + "/" + string(buf), nil + } + return startUpcwd + string(buf), nil } - return string(buf), nil } diff --git a/third_party/github.com/coreos/go-etcd/etcd/add_child.go b/third_party/github.com/coreos/go-etcd/etcd/add_child.go new file mode 100644 index 0000000..f275599 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/add_child.go @@ -0,0 +1,11 @@ +package etcd + +// Add a new directory with a random etcd-generated key under the given path. +func (c *Client) AddChildDir(key string, ttl uint64) (*Response, error) { + return c.post(key, "", ttl) +} + +// Add a new file with a random etcd-generated key under the given path. +func (c *Client) AddChild(key string, value string, ttl uint64) (*Response, error) { + return c.post(key, value, ttl) +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/add_child_test.go b/third_party/github.com/coreos/go-etcd/etcd/add_child_test.go new file mode 100644 index 0000000..efe1554 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/add_child_test.go @@ -0,0 +1,73 @@ +package etcd + +import "testing" + +func TestAddChild(t *testing.T) { + c := NewClient(nil) + defer func() { + c.DeleteAll("fooDir") + c.DeleteAll("nonexistentDir") + }() + + c.SetDir("fooDir", 5) + + _, err := c.AddChild("fooDir", "v0", 5) + if err != nil { + t.Fatal(err) + } + + _, err = c.AddChild("fooDir", "v1", 5) + if err != nil { + t.Fatal(err) + } + + resp, err := c.Get("fooDir", true) + // The child with v0 should proceed the child with v1 because it's added + // earlier, so it should have a lower key. + if !(len(resp.Kvs) == 2 && (resp.Kvs[0].Value == "v0" && resp.Kvs[1].Value == "v1")) { + t.Fatalf("AddChild 1 failed. There should be two chlidren whose values are v0 and v1, respectively."+ + " The response was: %#v", resp) + } + + // Creating a child under a nonexistent directory should succeed. + // The directory should be created. + resp, err = c.AddChild("nonexistentDir", "foo", 5) + if err != nil { + t.Fatal(err) + } +} + +func TestAddChildDir(t *testing.T) { + c := NewClient(nil) + defer func() { + c.DeleteAll("fooDir") + c.DeleteAll("nonexistentDir") + }() + + c.SetDir("fooDir", 5) + + _, err := c.AddChildDir("fooDir", 5) + if err != nil { + t.Fatal(err) + } + + _, err = c.AddChildDir("fooDir", 5) + if err != nil { + t.Fatal(err) + } + + resp, err := c.Get("fooDir", true) + // The child with v0 should proceed the child with v1 because it's added + // earlier, so it should have a lower key. + if !(len(resp.Kvs) == 2 && (len(resp.Kvs[0].KVPairs) == 0 && len(resp.Kvs[1].KVPairs) == 0)) { + t.Fatalf("AddChildDir 1 failed. There should be two chlidren whose values are v0 and v1, respectively."+ + " The response was: %#v", resp) + } + + // Creating a child under a nonexistent directory should succeed. + // The directory should be created. + resp, err = c.AddChildDir("nonexistentDir", 5) + if err != nil { + t.Fatal(err) + } +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/client.go b/third_party/github.com/coreos/go-etcd/etcd/client.go index 31d3c2a..63ce6ab 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/client.go +++ b/third_party/github.com/coreos/go-etcd/etcd/client.go @@ -2,12 +2,16 @@ package etcd import ( "crypto/tls" + "encoding/json" "errors" + "io" "io/ioutil" "net" "net/http" "net/url" + "os" "path" + "reflect" "strings" "time" ) @@ -17,25 +21,43 @@ const ( HTTPS ) +// See SetConsistency for how to use these constants. +const ( + // Using strings rather than iota because the consistency level + // could be persisted to disk, so it'd be better to use + // human-readable values. + STRONG_CONSISTENCY = "STRONG" + WEAK_CONSISTENCY = "WEAK" +) + type Cluster struct { - Leader string - Machines []string + Leader string `json:"leader"` + Machines []string `json:"machines"` } type Config struct { - CertFile string - KeyFile string - Scheme string - Timeout time.Duration + CertFile string `json:"certFile"` + KeyFile string `json:"keyFile"` + Scheme string `json:"scheme"` + Timeout time.Duration `json:"timeout"` + Consistency string `json: "consistency"` } type Client struct { - cluster Cluster - config Config - httpClient *http.Client + cluster Cluster `json:"cluster"` + config Config `json:"config"` + httpClient *http.Client + persistence io.Writer } -// Setup a basic conf and cluster +type options map[string]interface{} + +// An internally-used data structure that represents a mapping +// between valid options and their kinds +type validOptions map[string]reflect.Kind + +// NewClient create a basic client that is configured to be used +// with the given machine list. func NewClient(machines []string) *Client { // if an empty slice was sent in then just assume localhost if len(machines) == 0 { @@ -53,30 +75,168 @@ func NewClient(machines []string) *Client { Scheme: "http", // default timeout is one second Timeout: time.Second, + // default consistency level is STRONG + Consistency: STRONG_CONSISTENCY, + } + + client := &Client{ + cluster: cluster, + config: config, + } + + err := setupHttpClient(client) + if err != nil { + panic(err) + } + + return client +} + +// NewClientFile creates a client from a given file path. +// The given file is expected to use the JSON format. +func NewClientFile(fpath string) (*Client, error) { + fi, err := os.Open(fpath) + if err != nil { + return nil, err + } + defer func() { + if err := fi.Close(); err != nil { + panic(err) + } + }() + + return NewClientReader(fi) +} + +// NewClientReader creates a Client configured from a given reader. +// The config is expected to use the JSON format. +func NewClientReader(reader io.Reader) (*Client, error) { + var client Client + + b, err := ioutil.ReadAll(reader) + if err != nil { + return nil, err + } + + err = json.Unmarshal(b, &client) + if err != nil { + return nil, err } - tr := &http.Transport{ - Dial: dialTimeout, - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: true, - }, + err = setupHttpClient(&client) + if err != nil { + return nil, err + } + + return &client, nil +} + +func setupHttpClient(client *Client) error { + if client.config.CertFile != "" && client.config.KeyFile != "" { + err := client.SetCertAndKey(client.config.CertFile, client.config.KeyFile) + if err != nil { + return err + } + } else { + client.config.CertFile = "" + client.config.KeyFile = "" + tr := &http.Transport{ + Dial: dialTimeout, + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + }, + } + client.httpClient = &http.Client{Transport: tr} + } + + return nil +} + +// SetPersistence sets a writer to which the config will be +// written every time it's changed. +func (c *Client) SetPersistence(writer io.Writer) { + c.persistence = writer +} + +// SetConsistency changes the consistency level of the client. +// +// When consistency is set to STRONG_CONSISTENCY, all requests, +// including GET, are sent to the leader. This means that, assuming +// the absence of leader failures, GET requests are guranteed to see +// the changes made by previous requests. +// +// When consistency is set to WEAK_CONSISTENCY, other requests +// are still sent to the leader, but GET requests are sent to a +// random server from the server pool. This reduces the read +// load on the leader, but it's not guranteed that the GET requests +// will see changes made by previous requests (they might have not +// yet been commited on non-leader servers). +func (c *Client) SetConsistency(consistency string) error { + if !(consistency == STRONG_CONSISTENCY || consistency == WEAK_CONSISTENCY) { + return errors.New("The argument must be either STRONG_CONSISTENCY or WEAK_CONSISTENCY.") + } + c.config.Consistency = consistency + return nil +} + +// MarshalJSON implements the Marshaller interface +// as defined by the standard JSON package. +func (c *Client) MarshalJSON() ([]byte, error) { + b, err := json.Marshal(struct { + Config Config `json:"config"` + Cluster Cluster `json:"cluster"` + }{ + Config: c.config, + Cluster: c.cluster, + }) + + if err != nil { + return nil, err } - return &Client{ - cluster: cluster, - config: config, - httpClient: &http.Client{Transport: tr}, + return b, nil +} + +// UnmarshalJSON implements the Unmarshaller interface +// as defined by the standard JSON package. +func (c *Client) UnmarshalJSON(b []byte) error { + temp := struct { + Config Config `json: "config"` + Cluster Cluster `json: "cluster"` + }{} + err := json.Unmarshal(b, &temp) + if err != nil { + return err } + c.cluster = temp.Cluster + c.config = temp.Config + return nil } -func (c *Client) SetCertAndKey(cert string, key string) (bool, error) { +// saveConfig saves the current config using c.persistence. +func (c *Client) saveConfig() error { + if c.persistence != nil { + b, err := json.Marshal(c) + if err != nil { + return err + } + + _, err = c.persistence.Write(b) + if err != nil { + return err + } + } + return nil +} + +func (c *Client) SetCertAndKey(cert string, key string) error { if cert != "" && key != "" { tlsCert, err := tls.LoadX509KeyPair(cert, key) if err != nil { - return false, err + return err } tr := &http.Transport{ @@ -88,24 +248,27 @@ func (c *Client) SetCertAndKey(cert string, key string) (bool, error) { } c.httpClient = &http.Client{Transport: tr} - return true, nil + c.saveConfig() + return nil } - return false, errors.New("Require both cert and key path") + return errors.New("Require both cert and key path") } -func (c *Client) SetScheme(scheme int) (bool, error) { +func (c *Client) SetScheme(scheme int) error { if scheme == HTTP { c.config.Scheme = "http" - return true, nil + c.saveConfig() + return nil } if scheme == HTTPS { c.config.Scheme = "https" - return true, nil + c.saveConfig() + return nil } - return false, errors.New("Unknown Scheme") + return errors.New("Unknown Scheme") } -// Try to sync from the given machine +// SetCluster updates config using the given machine list. func (c *Client) SetCluster(machines []string) bool { success := c.internalSyncCluster(machines) return success @@ -115,13 +278,13 @@ func (c *Client) GetCluster() []string { return c.cluster.Machines } -// sycn cluster information using the existing machine list +// SyncCluster updates config using the internal machine list. func (c *Client) SyncCluster() bool { success := c.internalSyncCluster(c.cluster.Machines) return success } -// sync cluster information by providing machine list +// internalSyncCluster syncs cluster information using the given machine list. func (c *Client) internalSyncCluster(machines []string) bool { for _, machine := range machines { httpPath := c.createHttpPath(machine, version+"/machines") @@ -146,16 +309,19 @@ func (c *Client) internalSyncCluster(machines []string) bool { c.cluster.Leader = c.cluster.Machines[0] logger.Debug("sync.machines ", c.cluster.Machines) + c.saveConfig() return true } } return false } -// serverName should contain both hostName and port +// createHttpPath creates a complete HTTP URL. +// serverName should contain both the host name and a port number, if any. func (c *Client) createHttpPath(serverName string, _path string) string { u, _ := url.Parse(serverName) u.Path = path.Join(u.Path, "/", _path) + if u.Scheme == "" { u.Scheme = "http" } @@ -167,18 +333,6 @@ func dialTimeout(network, addr string) (net.Conn, error) { return net.DialTimeout(network, addr, time.Second) } -func (c *Client) getHttpPath(s ...string) string { - u, _ := url.Parse(c.cluster.Leader) - - u.Path = path.Join(u.Path, "/", version) - - for _, seg := range s { - u.Path = path.Join(u.Path, seg) - } - - return u.String() -} - func (c *Client) updateLeader(httpPath string) { u, _ := url.Parse(httpPath) @@ -191,77 +345,5 @@ func (c *Client) updateLeader(httpPath string) { logger.Debugf("update.leader[%s,%s]", c.cluster.Leader, leader) c.cluster.Leader = leader -} - -// Wrap GET, POST and internal error handling -func (c *Client) sendRequest(method string, _path string, body string) (*http.Response, error) { - - var resp *http.Response - var err error - var req *http.Request - - retry := 0 - // if we connect to a follower, we will retry until we found a leader - for { - - httpPath := c.getHttpPath(_path) - - logger.Debug("send.request.to ", httpPath) - if body == "" { - - req, _ = http.NewRequest(method, httpPath, nil) - - } else { - req, _ = http.NewRequest(method, httpPath, strings.NewReader(body)) - req.Header.Set("Content-Type", "application/x-www-form-urlencoded; param=value") - } - - resp, err = c.httpClient.Do(req) - - logger.Debug("recv.response.from ", httpPath) - // network error, change a machine! - if err != nil { - retry++ - if retry > 2*len(c.cluster.Machines) { - return nil, errors.New("Cannot reach servers") - } - num := retry % len(c.cluster.Machines) - logger.Debug("update.leader[", c.cluster.Leader, ",", c.cluster.Machines[num], "]") - c.cluster.Leader = c.cluster.Machines[num] - time.Sleep(time.Millisecond * 200) - continue - } - - if resp != nil { - if resp.StatusCode == http.StatusTemporaryRedirect { - httpPath := resp.Header.Get("Location") - - resp.Body.Close() - - if httpPath == "" { - return nil, errors.New("Cannot get redirection location") - } - - c.updateLeader(httpPath) - logger.Debug("send.redirect") - // try to connect the leader - continue - } else if resp.StatusCode == http.StatusInternalServerError { - resp.Body.Close() - - retry++ - if retry > 2*len(c.cluster.Machines) { - return nil, errors.New("Cannot reach servers") - } - continue - } else { - logger.Debug("send.return.response ", httpPath) - break - } - - } - logger.Debug("error.from ", httpPath, " ", err.Error()) - return nil, err - } - return resp, nil + c.saveConfig() } diff --git a/third_party/github.com/coreos/go-etcd/etcd/client_test.go b/third_party/github.com/coreos/go-etcd/etcd/client_test.go index 29f1381..b25611b 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/client_test.go +++ b/third_party/github.com/coreos/go-etcd/etcd/client_test.go @@ -1,10 +1,12 @@ package etcd import ( + "encoding/json" "fmt" - "testing" - "net/url" "net" + "net/url" + "os" + "testing" ) // To pass this test, we need to create a cluster of 3 machines @@ -19,7 +21,7 @@ func TestSync(t *testing.T) { t.Fatal("cannot sync machines") } - for _, m := range(c.GetCluster()) { + for _, m := range c.GetCluster() { u, err := url.Parse(m) if err != nil { t.Fatal(err) @@ -27,7 +29,7 @@ func TestSync(t *testing.T) { if u.Scheme != "http" { t.Fatal("scheme must be http") } - + host, _, err := net.SplitHostPort(u.Host) if err != nil { t.Fatal(err) @@ -56,3 +58,37 @@ func TestSync(t *testing.T) { } } + +func TestPersistence(t *testing.T) { + c := NewClient(nil) + c.SyncCluster() + + fo, err := os.Create("config.json") + if err != nil { + t.Fatal(err) + } + defer func() { + if err := fo.Close(); err != nil { + panic(err) + } + }() + + c.SetPersistence(fo) + err = c.saveConfig() + if err != nil { + t.Fatal(err) + } + + c2, err := NewClientFile("config.json") + if err != nil { + t.Fatal(err) + } + + // Verify that the two clients have the same config + b1, _ := json.Marshal(c) + b2, _ := json.Marshal(c2) + + if string(b1) != string(b2) { + t.Fatalf("The two configs should be equal!") + } +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/compare_and_swap.go b/third_party/github.com/coreos/go-etcd/etcd/compare_and_swap.go new file mode 100644 index 0000000..565a03e --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/compare_and_swap.go @@ -0,0 +1,18 @@ +package etcd + +import "fmt" + +func (c *Client) CompareAndSwap(key string, value string, ttl uint64, prevValue string, prevIndex uint64) (*Response, error) { + if prevValue == "" && prevIndex == 0 { + return nil, fmt.Errorf("You must give either prevValue or prevIndex.") + } + + options := options{} + if prevValue != "" { + options["prevValue"] = prevValue + } + if prevIndex != 0 { + options["prevIndex"] = prevIndex + } + return c.put(key, value, ttl, options) +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/compare_and_swap_test.go b/third_party/github.com/coreos/go-etcd/etcd/compare_and_swap_test.go new file mode 100644 index 0000000..647aadf --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/compare_and_swap_test.go @@ -0,0 +1,51 @@ +package etcd + +import ( + "testing" +) + +func TestCompareAndSwap(t *testing.T) { + c := NewClient(nil) + defer func() { + c.DeleteAll("foo") + }() + + c.Set("foo", "bar", 5) + + // This should succeed + resp, err := c.CompareAndSwap("foo", "bar2", 5, "bar", 0) + if err != nil { + t.Fatal(err) + } + if !(resp.Value == "bar2" && resp.PrevValue == "bar" && + resp.Key == "/foo" && resp.TTL == 5) { + t.Fatalf("CompareAndSwap 1 failed: %#v", resp) + } + + // This should fail because it gives an incorrect prevValue + resp, err = c.CompareAndSwap("foo", "bar3", 5, "xxx", 0) + if err == nil { + t.Fatalf("CompareAndSwap 2 should have failed. The response is: %#v", resp) + } + + resp, err = c.Set("foo", "bar", 5) + if err != nil { + t.Fatal(err) + } + + // This should succeed + resp, err = c.CompareAndSwap("foo", "bar2", 5, "", resp.Index) + if err != nil { + t.Fatal(err) + } + if !(resp.Value == "bar2" && resp.PrevValue == "bar" && + resp.Key == "/foo" && resp.TTL == 5) { + t.Fatalf("CompareAndSwap 1 failed: %#v", resp) + } + + // This should fail because it gives an incorrect prevIndex + resp, err = c.CompareAndSwap("foo", "bar3", 5, "", 29817514) + if err == nil { + t.Fatalf("CompareAndSwap 2 should have failed. The response is: %#v", resp) + } +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/config.json b/third_party/github.com/coreos/go-etcd/etcd/config.json new file mode 100644 index 0000000..9251142 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/config.json @@ -0,0 +1 @@ +{"config":{"certFile":"","keyFile":"","scheme":"http","timeout":1000000000,"Consistency":"STRONG"},"cluster":{"leader":"http://127.0.0.1:4001","machines":["http://127.0.0.1:4001","http://127.0.0.1:4002"]}} \ No newline at end of file diff --git a/third_party/github.com/coreos/go-etcd/etcd/debug.go b/third_party/github.com/coreos/go-etcd/etcd/debug.go index d82d9b2..bd67398 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/debug.go +++ b/third_party/github.com/coreos/go-etcd/etcd/debug.go @@ -9,6 +9,8 @@ var logger *log.Logger func init() { setLogger(log.PriErr) + // Uncomment the following line if you want to see lots of logs + // OpenDebug() } func OpenDebug() { diff --git a/third_party/github.com/coreos/go-etcd/etcd/delete.go b/third_party/github.com/coreos/go-etcd/etcd/delete.go index fea1695..00348f6 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/delete.go +++ b/third_party/github.com/coreos/go-etcd/etcd/delete.go @@ -1,41 +1,17 @@ package etcd -import ( - "encoding/json" - "github.com/coreos/etcd/store" - "io/ioutil" - "net/http" - "path" -) - -func (c *Client) Delete(key string) (*store.Response, error) { - - resp, err := c.sendRequest("DELETE", path.Join("keys", key), "") - - if err != nil { - return nil, err - } - - b, err := ioutil.ReadAll(resp.Body) - - resp.Body.Close() - - if err != nil { - return nil, err - } - - if resp.StatusCode != http.StatusOK { - return nil, handleError(b) - } - - var result store.Response - - err = json.Unmarshal(b, &result) - - if err != nil { - return nil, err - } - - return &result, nil +// DeleteAll deletes everything under the given key. If the key +// points to a file, the file will be deleted. If the key points +// to a directory, then everything under the directory, include +// all child directories, will be deleted. +func (c *Client) DeleteAll(key string) (*Response, error) { + return c.delete(key, options{ + "recursive": true, + }) +} +// Delete deletes the given key. If the key points to a +// directory, the method will fail. +func (c *Client) Delete(key string) (*Response, error) { + return c.delete(key, nil) } diff --git a/third_party/github.com/coreos/go-etcd/etcd/delete_test.go b/third_party/github.com/coreos/go-etcd/etcd/delete_test.go index 52756d0..0f8475a 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/delete_test.go +++ b/third_party/github.com/coreos/go-etcd/etcd/delete_test.go @@ -5,18 +5,60 @@ import ( ) func TestDelete(t *testing.T) { + c := NewClient(nil) + defer func() { + c.DeleteAll("foo") + }() + + c.Set("foo", "bar", 5) + resp, err := c.Delete("foo") + if err != nil { + t.Fatal(err) + } + if !(resp.PrevValue == "bar" && resp.Value == "") { + t.Fatalf("Delete failed with %s %s", resp.PrevValue, + resp.Value) + } + + resp, err = c.Delete("foo") + if err == nil { + t.Fatalf("Delete should have failed because the key foo did not exist. "+ + "The response was: %v", resp) + } +} + +func TestDeleteAll(t *testing.T) { c := NewClient(nil) + defer func() { + c.DeleteAll("foo") + c.DeleteAll("fooDir") + }() + + c.Set("foo", "bar", 5) + resp, err := c.DeleteAll("foo") + if err != nil { + t.Fatal(err) + } + + if !(resp.PrevValue == "bar" && resp.Value == "") { + t.Fatalf("DeleteAll 1 failed: %#v", resp) + } - c.Set("foo", "bar", 100) - result, err := c.Delete("foo") + c.SetDir("fooDir", 5) + c.Set("fooDir/foo", "bar", 5) + resp, err = c.DeleteAll("fooDir") if err != nil { t.Fatal(err) } - if result.PrevValue != "bar" || result.Value != "" { - t.Fatalf("Delete failed with %s %s", result.PrevValue, - result.Value) + if !(resp.PrevValue == "" && resp.Value == "") { + t.Fatalf("DeleteAll 2 failed: %#v", resp) } + resp, err = c.DeleteAll("foo") + if err == nil { + t.Fatalf("DeleteAll should have failed because the key foo did not exist. "+ + "The response was: %v", resp) + } } diff --git a/third_party/github.com/coreos/go-etcd/etcd/get.go b/third_party/github.com/coreos/go-etcd/etcd/get.go index b0d16fe..d42a83c 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/get.go +++ b/third_party/github.com/coreos/go-etcd/etcd/get.go @@ -1,83 +1,23 @@ package etcd -import ( - "encoding/json" - "github.com/coreos/etcd/store" - "io/ioutil" - "net/http" - "path" -) - -func (c *Client) Get(key string) ([]*store.Response, error) { - logger.Debugf("get %s [%s]", key, c.cluster.Leader) - resp, err := c.sendRequest("GET", path.Join("keys", key), "") - - if err != nil { - return nil, err - } - - b, err := ioutil.ReadAll(resp.Body) - - resp.Body.Close() - - if err != nil { - return nil, err - } - - if resp.StatusCode != http.StatusOK { - - return nil, handleError(b) - } - - return convertGetResponse(b) - +// GetDir gets the all contents under the given key. +// If the key points to a file, the file is returned. +// If the key points to a directory, everything under it is returnd, +// including all contents under all child directories. +func (c *Client) GetAll(key string, sort bool) (*Response, error) { + return c.get(key, options{ + "recursive": true, + "sorted": sort, + }) } -// GetTo gets the value of the key from a given machine address. -// If the given machine is not available it returns an error. -// Mainly use for testing purpose -func (c *Client) GetFrom(key string, addr string) ([]*store.Response, error) { - httpPath := c.createHttpPath(addr, path.Join(version, "keys", key)) - - resp, err := c.httpClient.Get(httpPath) - - if err != nil { - return nil, err - } - - b, err := ioutil.ReadAll(resp.Body) - - resp.Body.Close() - - if err != nil { - return nil, err - } - - if resp.StatusCode != http.StatusOK { - return nil, handleError(b) - } - - return convertGetResponse(b) -} - -// Convert byte stream to response. -func convertGetResponse(b []byte) ([]*store.Response, error) { - - var results []*store.Response - var result *store.Response - - err := json.Unmarshal(b, &result) - - if err != nil { - err = json.Unmarshal(b, &results) - - if err != nil { - return nil, err - } - - } else { - results = make([]*store.Response, 1) - results[0] = result - } - return results, nil +// Get gets the file or directory associated with the given key. +// If the key points to a directory, files and directories under +// it will be returned in sorted or unsorted order, depending on +// the sort flag. Note that contents under child directories +// will not be returned. To get those contents, use GetAll. +func (c *Client) Get(key string, sort bool) (*Response, error) { + return c.get(key, options{ + "sorted": sort, + }) } diff --git a/third_party/github.com/coreos/go-etcd/etcd/get_test.go b/third_party/github.com/coreos/go-etcd/etcd/get_test.go index ff81374..a34946c 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/get_test.go +++ b/third_party/github.com/coreos/go-etcd/etcd/get_test.go @@ -1,46 +1,99 @@ package etcd import ( + "reflect" "testing" - "time" ) func TestGet(t *testing.T) { - c := NewClient(nil) + defer func() { + c.DeleteAll("foo") + }() - c.Set("foo", "bar", 100) - - // wait for commit - time.Sleep(100 * time.Millisecond) + c.Set("foo", "bar", 5) - results, err := c.Get("foo") + result, err := c.Get("foo", false) - if err != nil || results[0].Key != "/foo" || results[0].Value != "bar" { - if err != nil { - t.Fatal(err) - } - t.Fatalf("Get failed with %s %s %v", results[0].Key, results[0].Value, results[0].TTL) + if err != nil { + t.Fatal(err) } - results, err = c.Get("goo") + if result.Key != "/foo" || result.Value != "bar" { + t.Fatalf("Get failed with %s %s %v", result.Key, result.Value, result.TTL) + } + result, err = c.Get("goo", false) if err == nil { t.Fatalf("should not be able to get non-exist key") } +} - results, err = c.GetFrom("foo", "0.0.0.0:4001") +func TestGetAll(t *testing.T) { + c := NewClient(nil) + defer func() { + c.DeleteAll("fooDir") + }() - if err != nil || results[0].Key != "/foo" || results[0].Value != "bar" { - if err != nil { - t.Fatal(err) - } - t.Fatalf("Get failed with %s %s %v", results[0].Key, results[0].Value, results[0].TTL) + c.SetDir("fooDir", 5) + c.Set("fooDir/k0", "v0", 5) + c.Set("fooDir/k1", "v1", 5) + + // Return kv-pairs in sorted order + result, err := c.Get("fooDir", true) + + if err != nil { + t.Fatal(err) } - results, err = c.GetFrom("foo", "0.0.0.0:4009") + expected := kvPairs{ + KeyValuePair{ + Key: "/fooDir/k0", + Value: "v0", + }, + KeyValuePair{ + Key: "/fooDir/k1", + Value: "v1", + }, + } - if err == nil { - t.Fatal("should not get from port 4009") + if !reflect.DeepEqual(result.Kvs, expected) { + t.Fatalf("(actual) %v != (expected) %v", result.Kvs, expected) + } + + // Test the `recursive` option + c.SetDir("fooDir/childDir", 5) + c.Set("fooDir/childDir/k2", "v2", 5) + + // Return kv-pairs in sorted order + result, err = c.GetAll("fooDir", true) + + if err != nil { + t.Fatal(err) + } + + expected = kvPairs{ + KeyValuePair{ + Key: "/fooDir/childDir", + Dir: true, + KVPairs: kvPairs{ + KeyValuePair{ + Key: "/fooDir/childDir/k2", + Value: "v2", + }, + }, + }, + KeyValuePair{ + Key: "/fooDir/k0", + Value: "v0", + }, + KeyValuePair{ + Key: "/fooDir/k1", + Value: "v1", + }, + } + + if !reflect.DeepEqual(result.Kvs, expected) { + t.Fatalf("(actual) %v != (expected) %v", result.Kvs) } } diff --git a/third_party/github.com/coreos/go-etcd/etcd/list_test.go b/third_party/github.com/coreos/go-etcd/etcd/list_test.go deleted file mode 100644 index 382bb35..0000000 --- a/third_party/github.com/coreos/go-etcd/etcd/list_test.go +++ /dev/null @@ -1,23 +0,0 @@ -package etcd - -import ( - "testing" - "time" -) - -func TestList(t *testing.T) { - c := NewClient(nil) - - c.Set("foo_list/foo", "bar", 100) - c.Set("foo_list/fooo", "barbar", 100) - c.Set("foo_list/foooo/foo", "barbarbar", 100) - // wait for commit - time.Sleep(time.Second) - - _, err := c.Get("foo_list") - - if err != nil { - t.Fatal(err) - } - -} diff --git a/third_party/github.com/coreos/go-etcd/etcd/requests.go b/third_party/github.com/coreos/go-etcd/etcd/requests.go new file mode 100644 index 0000000..83e3b51 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/requests.go @@ -0,0 +1,290 @@ +package etcd + +import ( + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "math/rand" + "net/http" + "net/url" + "path" + "reflect" + "strings" + "time" +) + +// Valid options for GET, PUT, POST, DELETE +// Using CAPITALIZED_UNDERSCORE to emphasize that these +// values are meant to be used as constants. +var ( + VALID_GET_OPTIONS = validOptions{ + "recursive": reflect.Bool, + "consistent": reflect.Bool, + "sorted": reflect.Bool, + "wait": reflect.Bool, + "waitIndex": reflect.Uint64, + } + + VALID_PUT_OPTIONS = validOptions{ + "prevValue": reflect.String, + "prevIndex": reflect.Uint64, + "prevExist": reflect.Bool, + } + + VALID_POST_OPTIONS = validOptions{} + + VALID_DELETE_OPTIONS = validOptions{ + "recursive": reflect.Bool, + } + + curlChan chan string +) + +// SetCurlChan sets a channel to which cURL commands which can be used to +// re-produce requests are sent. This is useful for debugging. +func SetCurlChan(c chan string) { + curlChan = c +} + +// get issues a GET request +func (c *Client) get(key string, options options) (*Response, error) { + logger.Debugf("get %s [%s]", key, c.cluster.Leader) + + p := path.Join("keys", key) + // If consistency level is set to STRONG, append + // the `consistent` query string. + if c.config.Consistency == STRONG_CONSISTENCY { + options["consistent"] = true + } + if options != nil { + str, err := optionsToString(options, VALID_GET_OPTIONS) + if err != nil { + return nil, err + } + p += str + } + + resp, err := c.sendRequest("GET", p, url.Values{}) + + if err != nil { + return nil, err + } + + return resp, nil +} + +// put issues a PUT request +func (c *Client) put(key string, value string, ttl uint64, options options) (*Response, error) { + logger.Debugf("put %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.Leader) + v := url.Values{} + + if value != "" { + v.Set("value", value) + } + + if ttl > 0 { + v.Set("ttl", fmt.Sprintf("%v", ttl)) + } + + p := path.Join("keys", key) + if options != nil { + str, err := optionsToString(options, VALID_PUT_OPTIONS) + if err != nil { + return nil, err + } + p += str + } + + resp, err := c.sendRequest("PUT", p, v) + + if err != nil { + return nil, err + } + + return resp, nil +} + +// post issues a POST request +func (c *Client) post(key string, value string, ttl uint64) (*Response, error) { + logger.Debugf("post %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.Leader) + v := url.Values{} + + if value != "" { + v.Set("value", value) + } + + if ttl > 0 { + v.Set("ttl", fmt.Sprintf("%v", ttl)) + } + + resp, err := c.sendRequest("POST", path.Join("keys", key), v) + + if err != nil { + return nil, err + } + + return resp, nil +} + +// delete issues a DELETE request +func (c *Client) delete(key string, options options) (*Response, error) { + logger.Debugf("delete %s [%s]", key, c.cluster.Leader) + v := url.Values{} + + p := path.Join("keys", key) + if options != nil { + str, err := optionsToString(options, VALID_DELETE_OPTIONS) + if err != nil { + return nil, err + } + p += str + } + + resp, err := c.sendRequest("DELETE", p, v) + + if err != nil { + return nil, err + } + + return resp, nil +} + +// sendRequest sends a HTTP request and returns a Response as defined by etcd +func (c *Client) sendRequest(method string, _path string, values url.Values) (*Response, error) { + var body string = values.Encode() + var resp *http.Response + var req *http.Request + + retry := 0 + // if we connect to a follower, we will retry until we found a leader + for { + var httpPath string + + // If _path has schema already, then it's assumed to be + // a complete URL and therefore needs no further processing. + u, err := url.Parse(_path) + if err != nil { + return nil, err + } + + if u.Scheme != "" { + httpPath = _path + } else { + if method == "GET" && c.config.Consistency == WEAK_CONSISTENCY { + // If it's a GET and consistency level is set to WEAK, + // then use a random machine. + httpPath = c.getHttpPath(true, _path) + } else { + // Else use the leader. + httpPath = c.getHttpPath(false, _path) + } + } + + // Return a cURL command if curlChan is set + if curlChan != nil { + command := fmt.Sprintf("curl -X %s %s", method, httpPath) + for key, value := range values { + command += fmt.Sprintf(" -d %s=%s", key, value[0]) + } + curlChan <- command + } + + logger.Debug("send.request.to ", httpPath, " | method ", method) + if body == "" { + + req, _ = http.NewRequest(method, httpPath, nil) + + } else { + req, _ = http.NewRequest(method, httpPath, strings.NewReader(body)) + req.Header.Set("Content-Type", "application/x-www-form-urlencoded; param=value") + } + + resp, err = c.httpClient.Do(req) + + logger.Debug("recv.response.from ", httpPath) + // network error, change a machine! + if err != nil { + retry++ + if retry > 2*len(c.cluster.Machines) { + return nil, errors.New("Cannot reach servers") + } + num := retry % len(c.cluster.Machines) + logger.Debug("update.leader[", c.cluster.Leader, ",", c.cluster.Machines[num], "]") + c.cluster.Leader = c.cluster.Machines[num] + time.Sleep(time.Millisecond * 200) + continue + } + + if resp != nil { + if resp.StatusCode == http.StatusTemporaryRedirect { + httpPath := resp.Header.Get("Location") + + resp.Body.Close() + + if httpPath == "" { + return nil, errors.New("Cannot get redirection location") + } + + c.updateLeader(httpPath) + logger.Debug("send.redirect") + // try to connect the leader + continue + } else if resp.StatusCode == http.StatusInternalServerError { + resp.Body.Close() + + retry++ + if retry > 2*len(c.cluster.Machines) { + return nil, errors.New("Cannot reach servers") + } + continue + } else { + logger.Debug("send.return.response ", httpPath) + break + } + + } + logger.Debug("error.from ", httpPath, " ", err.Error()) + return nil, err + } + + // Convert HTTP response to etcd response + b, err := ioutil.ReadAll(resp.Body) + + resp.Body.Close() + + if err != nil { + return nil, err + } + + if !(resp.StatusCode == http.StatusOK || + resp.StatusCode == http.StatusCreated) { + return nil, handleError(b) + } + + var result Response + + err = json.Unmarshal(b, &result) + + if err != nil { + return nil, err + } + + return &result, nil +} + +func (c *Client) getHttpPath(random bool, s ...string) string { + var machine string + if random { + machine = c.cluster.Machines[rand.Intn(len(c.cluster.Machines))] + } else { + machine = c.cluster.Leader + } + + fullPath := machine + "/" + version + for _, seg := range s { + fullPath = fullPath + "/" + seg + } + + return fullPath +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/response.go b/third_party/github.com/coreos/go-etcd/etcd/response.go new file mode 100644 index 0000000..4bf0c88 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/response.go @@ -0,0 +1,50 @@ +package etcd + +import ( + "time" +) + +// The response object from the server. +type Response struct { + Action string `json:"action"` + Key string `json:"key"` + Dir bool `json:"dir,omitempty"` + PrevValue string `json:"prevValue,omitempty"` + Value string `json:"value,omitempty"` + Kvs kvPairs `json:"kvs,omitempty"` + + // If the key did not exist before the action, + // this field should be set to true + NewKey bool `json:"newKey,omitempty"` + + Expiration *time.Time `json:"expiration,omitempty"` + + // Time to live in second + TTL int64 `json:"ttl,omitempty"` + + // The command index of the raft machine when the command is executed + Index uint64 `json:"index"` +} + +// When user list a directory, we add all the node into key-value pair slice +type KeyValuePair struct { + Key string `json:"key, omitempty"` + Value string `json:"value,omitempty"` + Dir bool `json:"dir,omitempty"` + KVPairs kvPairs `json:"kvs,omitempty"` +} + +type kvPairs []KeyValuePair + +// interfaces for sorting +func (kvs kvPairs) Len() int { + return len(kvs) +} + +func (kvs kvPairs) Less(i, j int) bool { + return kvs[i].Key < kvs[j].Key +} + +func (kvs kvPairs) Swap(i, j int) { + kvs[i], kvs[j] = kvs[j], kvs[i] +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/set.go b/third_party/github.com/coreos/go-etcd/etcd/set.go deleted file mode 100644 index 78acb90..0000000 --- a/third_party/github.com/coreos/go-etcd/etcd/set.go +++ /dev/null @@ -1,90 +0,0 @@ -package etcd - -import ( - "encoding/json" - "fmt" - "github.com/coreos/etcd/store" - "io/ioutil" - "net/http" - "net/url" - "path" -) - -func (c *Client) Set(key string, value string, ttl uint64) (*store.Response, error) { - logger.Debugf("set %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.Leader) - v := url.Values{} - v.Set("value", value) - - if ttl > 0 { - v.Set("ttl", fmt.Sprintf("%v", ttl)) - } - - resp, err := c.sendRequest("POST", path.Join("keys", key), v.Encode()) - - if err != nil { - return nil, err - } - - b, err := ioutil.ReadAll(resp.Body) - - resp.Body.Close() - - if err != nil { - return nil, err - } - - if resp.StatusCode != http.StatusOK { - - return nil, handleError(b) - } - - return convertSetResponse(b) - -} - -// SetTo sets the value of the key to a given machine address. -// If the given machine is not available or is not leader it returns an error -// Mainly use for testing purpose. -func (c *Client) SetTo(key string, value string, ttl uint64, addr string) (*store.Response, error) { - v := url.Values{} - v.Set("value", value) - - if ttl > 0 { - v.Set("ttl", fmt.Sprintf("%v", ttl)) - } - - httpPath := c.createHttpPath(addr, path.Join(version, "keys", key)) - - resp, err := c.httpClient.PostForm(httpPath, v) - - if err != nil { - return nil, err - } - - b, err := ioutil.ReadAll(resp.Body) - - resp.Body.Close() - - if err != nil { - return nil, err - } - - if resp.StatusCode != http.StatusOK { - return nil, handleError(b) - } - - return convertSetResponse(b) -} - -// Convert byte stream to response. -func convertSetResponse(b []byte) (*store.Response, error) { - var result store.Response - - err := json.Unmarshal(b, &result) - - if err != nil { - return nil, err - } - - return &result, nil -} diff --git a/third_party/github.com/coreos/go-etcd/etcd/set_test.go b/third_party/github.com/coreos/go-etcd/etcd/set_test.go deleted file mode 100644 index 3809ee9..0000000 --- a/third_party/github.com/coreos/go-etcd/etcd/set_test.go +++ /dev/null @@ -1,42 +0,0 @@ -package etcd - -import ( - "testing" - "time" -) - -func TestSet(t *testing.T) { - c := NewClient(nil) - - result, err := c.Set("foo", "bar", 100) - - if err != nil || result.Key != "/foo" || result.Value != "bar" || result.TTL != 99 { - if err != nil { - t.Fatal(err) - } - - t.Fatalf("Set 1 failed with %s %s %v", result.Key, result.Value, result.TTL) - } - - time.Sleep(time.Second) - - result, err = c.Set("foo", "bar", 100) - - if err != nil || result.Key != "/foo" || result.Value != "bar" || result.PrevValue != "bar" || result.TTL != 99 { - if err != nil { - t.Fatal(err) - } - t.Fatalf("Set 2 failed with %s %s %v", result.Key, result.Value, result.TTL) - } - - result, err = c.SetTo("toFoo", "bar", 100, "0.0.0.0:4001") - - if err != nil || result.Key != "/toFoo" || result.Value != "bar" || result.TTL != 99 { - if err != nil { - t.Fatal(err) - } - - t.Fatalf("SetTo failed with %s %s %v", result.Key, result.Value, result.TTL) - } - -} diff --git a/third_party/github.com/coreos/go-etcd/etcd/set_update_create.go b/third_party/github.com/coreos/go-etcd/etcd/set_update_create.go new file mode 100644 index 0000000..281cd57 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/set_update_create.go @@ -0,0 +1,43 @@ +package etcd + +// SetDir sets the given key to a directory. +func (c *Client) SetDir(key string, ttl uint64) (*Response, error) { + return c.put(key, "", ttl, nil) +} + +// UpdateDir updates the given key to a directory. It succeeds only if the +// given key already exists. +func (c *Client) UpdateDir(key string, ttl uint64) (*Response, error) { + return c.put(key, "", ttl, options{ + "prevExist": true, + }) +} + +// UpdateDir creates a directory under the given key. It succeeds only if +// the given key does not yet exist. +func (c *Client) CreateDir(key string, ttl uint64) (*Response, error) { + return c.put(key, "", ttl, options{ + "prevExist": false, + }) +} + +// Set sets the given key to the given value. +func (c *Client) Set(key string, value string, ttl uint64) (*Response, error) { + return c.put(key, value, ttl, nil) +} + +// Update updates the given key to the given value. It succeeds only if the +// given key already exists. +func (c *Client) Update(key string, value string, ttl uint64) (*Response, error) { + return c.put(key, value, ttl, options{ + "prevExist": true, + }) +} + +// Create creates a file with the given value under the given key. It succeeds +// only if the given key does not yet exist. +func (c *Client) Create(key string, value string, ttl uint64) (*Response, error) { + return c.put(key, value, ttl, options{ + "prevExist": false, + }) +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/set_update_create_test.go b/third_party/github.com/coreos/go-etcd/etcd/set_update_create_test.go new file mode 100644 index 0000000..6f27fdf --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/set_update_create_test.go @@ -0,0 +1,183 @@ +package etcd + +import ( + "testing" +) + +func TestSet(t *testing.T) { + c := NewClient(nil) + defer func() { + c.DeleteAll("foo") + }() + + resp, err := c.Set("foo", "bar", 5) + if err != nil { + t.Fatal(err) + } + if resp.Key != "/foo" || resp.Value != "bar" || resp.TTL != 5 { + t.Fatalf("Set 1 failed: %#v", resp) + } + + resp, err = c.Set("foo", "bar2", 5) + if err != nil { + t.Fatal(err) + } + if !(resp.Key == "/foo" && resp.Value == "bar2" && + resp.PrevValue == "bar" && resp.TTL == 5) { + t.Fatalf("Set 2 failed: %#v", resp) + } +} + +func TestUpdate(t *testing.T) { + c := NewClient(nil) + defer func() { + c.DeleteAll("foo") + c.DeleteAll("nonexistent") + }() + + resp, err := c.Set("foo", "bar", 5) + t.Logf("%#v", resp) + if err != nil { + t.Fatal(err) + } + + // This should succeed. + resp, err = c.Update("foo", "wakawaka", 5) + if err != nil { + t.Fatal(err) + } + + if !(resp.Action == "update" && resp.Key == "/foo" && + resp.PrevValue == "bar" && resp.TTL == 5) { + t.Fatalf("Update 1 failed: %#v", resp) + } + + // This should fail because the key does not exist. + resp, err = c.Update("nonexistent", "whatever", 5) + if err == nil { + t.Fatalf("The key %v did not exist, so the update should have failed."+ + "The response was: %#v", resp.Key, resp) + } +} + +func TestCreate(t *testing.T) { + c := NewClient(nil) + defer func() { + c.DeleteAll("newKey") + }() + + newKey := "/newKey" + newValue := "/newValue" + + // This should succeed + resp, err := c.Create(newKey, newValue, 5) + if err != nil { + t.Fatal(err) + } + + if !(resp.Action == "create" && resp.Key == newKey && + resp.Value == newValue && resp.PrevValue == "" && resp.TTL == 5) { + t.Fatalf("Create 1 failed: %#v", resp) + } + + // This should fail, because the key is already there + resp, err = c.Create(newKey, newValue, 5) + if err == nil { + t.Fatalf("The key %v did exist, so the creation should have failed."+ + "The response was: %#v", resp.Key, resp) + } +} + +func TestSetDir(t *testing.T) { + c := NewClient(nil) + defer func() { + c.DeleteAll("foo") + c.DeleteAll("fooDir") + }() + + resp, err := c.SetDir("fooDir", 5) + if err != nil { + t.Fatal(err) + } + if !(resp.Key == "/fooDir" && resp.Value == "" && resp.TTL == 5) { + t.Fatalf("SetDir 1 failed: %#v", resp) + } + + // This should fail because /fooDir already points to a directory + resp, err = c.SetDir("/fooDir", 5) + if err == nil { + t.Fatalf("fooDir already points to a directory, so SetDir should have failed."+ + "The response was: %#v", resp) + } + + _, err = c.Set("foo", "bar", 5) + if err != nil { + t.Fatal(err) + } + + // This should succeed + resp, err = c.SetDir("foo", 5) + if err != nil { + t.Fatal(err) + } + if !(resp.Key == "/foo" && resp.Value == "" && + resp.PrevValue == "bar" && resp.TTL == 5) { + t.Fatalf("SetDir 2 failed: %#v", resp) + } +} + +func TestUpdateDir(t *testing.T) { + c := NewClient(nil) + defer func() { + c.DeleteAll("fooDir") + }() + + resp, err := c.SetDir("fooDir", 5) + t.Logf("%#v", resp) + if err != nil { + t.Fatal(err) + } + + // This should succeed. + resp, err = c.UpdateDir("fooDir", 5) + if err != nil { + t.Fatal(err) + } + + if !(resp.Action == "update" && resp.Key == "/fooDir" && + resp.Value == "" && resp.PrevValue == "" && resp.TTL == 5) { + t.Fatalf("UpdateDir 1 failed: %#v", resp) + } + + // This should fail because the key does not exist. + resp, err = c.UpdateDir("nonexistentDir", 5) + if err == nil { + t.Fatalf("The key %v did not exist, so the update should have failed."+ + "The response was: %#v", resp.Key, resp) + } +} + +func TestCreateDir(t *testing.T) { + c := NewClient(nil) + defer func() { + c.DeleteAll("fooDir") + }() + + // This should succeed + resp, err := c.CreateDir("fooDir", 5) + if err != nil { + t.Fatal(err) + } + + if !(resp.Action == "create" && resp.Key == "/fooDir" && + resp.Value == "" && resp.PrevValue == "" && resp.TTL == 5) { + t.Fatalf("CreateDir 1 failed: %#v", resp) + } + + // This should fail, because the key is already there + resp, err = c.CreateDir("fooDir", 5) + if err == nil { + t.Fatalf("The key %v did exist, so the creation should have failed."+ + "The response was: %#v", resp.Key, resp) + } +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/testAndSet.go b/third_party/github.com/coreos/go-etcd/etcd/testAndSet.go deleted file mode 100644 index 0bd8672..0000000 --- a/third_party/github.com/coreos/go-etcd/etcd/testAndSet.go +++ /dev/null @@ -1,57 +0,0 @@ -package etcd - -import ( - "encoding/json" - "fmt" - "github.com/coreos/etcd/store" - "io/ioutil" - "net/http" - "net/url" - "path" -) - -func (c *Client) TestAndSet(key string, prevValue string, value string, ttl uint64) (*store.Response, bool, error) { - logger.Debugf("set %s, %s[%s], ttl: %d, [%s]", key, value, prevValue, ttl, c.cluster.Leader) - v := url.Values{} - v.Set("value", value) - v.Set("prevValue", prevValue) - - if ttl > 0 { - v.Set("ttl", fmt.Sprintf("%v", ttl)) - } - - resp, err := c.sendRequest("POST", path.Join("keys", key), v.Encode()) - - if err != nil { - return nil, false, err - } - - b, err := ioutil.ReadAll(resp.Body) - - resp.Body.Close() - - if err != nil { - - return nil, false, err - } - - if resp.StatusCode != http.StatusOK { - return nil, false, handleError(b) - } - - var result store.Response - - err = json.Unmarshal(b, &result) - - if err != nil { - return nil, false, err - } - - if result.PrevValue == prevValue && result.Value == value { - - return &result, true, nil - } - - return &result, false, nil - -} diff --git a/third_party/github.com/coreos/go-etcd/etcd/testAndSet_test.go b/third_party/github.com/coreos/go-etcd/etcd/testAndSet_test.go deleted file mode 100644 index 5dbd854..0000000 --- a/third_party/github.com/coreos/go-etcd/etcd/testAndSet_test.go +++ /dev/null @@ -1,39 +0,0 @@ -package etcd - -import ( - "testing" - "time" -) - -func TestTestAndSet(t *testing.T) { - c := NewClient(nil) - - c.Set("foo_testAndSet", "bar", 100) - - time.Sleep(time.Second) - - results := make(chan bool, 3) - - for i := 0; i < 3; i++ { - testAndSet("foo_testAndSet", "bar", "barbar", results, c) - } - - count := 0 - - for i := 0; i < 3; i++ { - result := <-results - if result { - count++ - } - } - - if count != 1 { - t.Fatalf("test and set fails %v", count) - } - -} - -func testAndSet(key string, prevValue string, value string, ch chan bool, c *Client) { - _, success, _ := c.TestAndSet(key, prevValue, value, 0) - ch <- success -} diff --git a/third_party/github.com/coreos/go-etcd/etcd/utils.go b/third_party/github.com/coreos/go-etcd/etcd/utils.go new file mode 100644 index 0000000..eb2f604 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/utils.go @@ -0,0 +1,33 @@ +// Utility functions + +package etcd + +import ( + "fmt" + "net/url" + "reflect" +) + +// Convert options to a string of HTML parameters +func optionsToString(options options, vops validOptions) (string, error) { + p := "?" + v := url.Values{} + for opKey, opVal := range options { + // Check if the given option is valid (that it exists) + kind := vops[opKey] + if kind == reflect.Invalid { + return "", fmt.Errorf("Invalid option: %v", opKey) + } + + // Check if the given option is of the valid type + t := reflect.TypeOf(opVal) + if kind != t.Kind() { + return "", fmt.Errorf("Option %s should be of %v kind, not of %v kind.", + opKey, kind, t.Kind()) + } + + v.Set(opKey, fmt.Sprintf("%v", opVal)) + } + p += v.Encode() + return p, nil +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/version.go b/third_party/github.com/coreos/go-etcd/etcd/version.go index e84e7b5..b3d05df 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/version.go +++ b/third_party/github.com/coreos/go-etcd/etcd/version.go @@ -1,3 +1,3 @@ package etcd -const version = "v1" +const version = "v2" diff --git a/third_party/github.com/coreos/go-etcd/etcd/watch.go b/third_party/github.com/coreos/go-etcd/etcd/watch.go index 7f59ed0..e770f1a 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/watch.go +++ b/third_party/github.com/coreos/go-etcd/etcd/watch.go @@ -1,43 +1,47 @@ package etcd import ( - "encoding/json" "errors" - "fmt" - "github.com/coreos/etcd/store" - "io/ioutil" - "net/http" - "net/url" - "path" ) -type respAndErr struct { - resp *http.Response - err error -} - // Errors introduced by the Watch command. var ( ErrWatchStoppedByUser = errors.New("Watch stopped by the user via stop channel") ) -// Watch any change under the given prefix. -// When a sinceIndex is given, watch will try to scan from that index to the last index -// and will return any changes under the given prefix during the history +// WatchAll returns the first change under the given prefix since the given index. To +// watch for the latest change, set waitIndex = 0. +// +// If the prefix points to a directory, any change under it, including all child directories, +// will be returned. +// // If a receiver channel is given, it will be a long-term watch. Watch will block at the // channel. And after someone receive the channel, it will go on to watch that prefix. // If a stop channel is given, client can close long-term watch using the stop channel +func (c *Client) WatchAll(prefix string, waitIndex uint64, receiver chan *Response, stop chan bool) (*Response, error) { + return c.watch(prefix, waitIndex, true, receiver, stop) +} -func (c *Client) Watch(prefix string, sinceIndex uint64, receiver chan *store.Response, stop chan bool) (*store.Response, error) { +// Watch returns the first change to the given key since the given index. To +// watch for the latest change, set waitIndex = 0. +// +// If a receiver channel is given, it will be a long-term watch. Watch will block at the +// channel. And after someone receive the channel, it will go on to watch that +// prefix. If a stop channel is given, client can close long-term watch using +// the stop channel +func (c *Client) Watch(key string, waitIndex uint64, receiver chan *Response, stop chan bool) (*Response, error) { + return c.watch(key, waitIndex, false, receiver, stop) +} + +func (c *Client) watch(prefix string, waitIndex uint64, recursive bool, receiver chan *Response, stop chan bool) (*Response, error) { logger.Debugf("watch %s [%s]", prefix, c.cluster.Leader) if receiver == nil { - return c.watchOnce(prefix, sinceIndex, stop) - + return c.watchOnce(prefix, waitIndex, recursive, stop) } else { for { - resp, err := c.watchOnce(prefix, sinceIndex, stop) + resp, err := c.watchOnce(prefix, waitIndex, recursive, stop) if resp != nil { - sinceIndex = resp.Index + 1 + waitIndex = resp.Index + 1 receiver <- resp } else { return nil, err @@ -50,70 +54,37 @@ func (c *Client) Watch(prefix string, sinceIndex uint64, receiver chan *store.Re // helper func // return when there is change under the given prefix -func (c *Client) watchOnce(key string, sinceIndex uint64, stop chan bool) (*store.Response, error) { - - var resp *http.Response - var err error - - if stop != nil { - ch := make(chan respAndErr) +func (c *Client) watchOnce(key string, waitIndex uint64, recursive bool, stop chan bool) (*Response, error) { - go func() { - resp, err = c.sendWatchRequest(key, sinceIndex) + respChan := make(chan *Response) + errChan := make(chan error) - ch <- respAndErr{resp, err} - }() - - // select at stop or continue to receive - select { - - case res := <-ch: - resp, err = res.resp, res.err - - case <-stop: - resp, err = nil, ErrWatchStoppedByUser + go func() { + options := options{ + "wait": true, + } + if waitIndex > 0 { + options["waitIndex"] = waitIndex + } + if recursive { + options["recursive"] = true } - } else { - resp, err = c.sendWatchRequest(key, sinceIndex) - } - - if err != nil { - return nil, err - } - - b, err := ioutil.ReadAll(resp.Body) - - resp.Body.Close() - - if err != nil { - return nil, err - } - - if resp.StatusCode != http.StatusOK { - return nil, handleError(b) - } + resp, err := c.get(key, options) - var result store.Response + if err != nil { + errChan <- err + } - err = json.Unmarshal(b, &result) + respChan <- resp + }() - if err != nil { + select { + case resp := <-respChan: + return resp, nil + case err := <-errChan: return nil, err + case <-stop: + return nil, ErrWatchStoppedByUser } - - return &result, nil -} - -func (c *Client) sendWatchRequest(key string, sinceIndex uint64) (*http.Response, error) { - if sinceIndex == 0 { - resp, err := c.sendRequest("GET", path.Join("watch", key), "") - return resp, err - } else { - v := url.Values{} - v.Set("index", fmt.Sprintf("%v", sinceIndex)) - resp, err := c.sendRequest("POST", path.Join("watch", key), v.Encode()) - return resp, err - } - } diff --git a/third_party/github.com/coreos/go-etcd/etcd/watch_test.go b/third_party/github.com/coreos/go-etcd/etcd/watch_test.go index 0d93485..f4efd9f 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/watch_test.go +++ b/third_party/github.com/coreos/go-etcd/etcd/watch_test.go @@ -2,38 +2,40 @@ package etcd import ( "fmt" - "github.com/coreos/etcd/store" "testing" "time" ) func TestWatch(t *testing.T) { c := NewClient(nil) + defer func() { + c.DeleteAll("watch_foo") + }() - go setHelper("bar", c) + go setHelper("watch_foo", "bar", c) - result, err := c.Watch("watch_foo", 0, nil, nil) - - if err != nil || result.Key != "/watch_foo/foo" || result.Value != "bar" { - if err != nil { - t.Fatal(err) - } - t.Fatalf("Watch failed with %s %s %v %v", result.Key, result.Value, result.TTL, result.Index) + resp, err := c.Watch("watch_foo", 0, nil, nil) + if err != nil { + t.Fatal(err) + } + if !(resp.Key == "/watch_foo" && resp.Value == "bar") { + t.Fatalf("Watch 1 failed: %#v", resp) } - result, err = c.Watch("watch_foo", result.Index, nil, nil) + go setHelper("watch_foo", "bar", c) - if err != nil || result.Key != "/watch_foo/foo" || result.Value != "bar" { - if err != nil { - t.Fatal(err) - } - t.Fatalf("Watch with Index failed with %s %s %v %v", result.Key, result.Value, result.TTL, result.Index) + resp, err = c.Watch("watch_foo", resp.Index, nil, nil) + if err != nil { + t.Fatal(err) + } + if !(resp.Key == "/watch_foo" && resp.Value == "bar") { + t.Fatalf("Watch 2 failed: %#v", resp) } - ch := make(chan *store.Response, 10) + ch := make(chan *Response, 10) stop := make(chan bool, 1) - go setLoop("bar", c) + go setLoop("watch_foo", "bar", c) go receiver(ch, stop) @@ -43,21 +45,60 @@ func TestWatch(t *testing.T) { } } -func setHelper(value string, c *Client) { +func TestWatchAll(t *testing.T) { + c := NewClient(nil) + defer func() { + c.DeleteAll("watch_foo") + }() + + go setHelper("watch_foo/foo", "bar", c) + + resp, err := c.WatchAll("watch_foo", 0, nil, nil) + if err != nil { + t.Fatal(err) + } + if !(resp.Key == "/watch_foo/foo" && resp.Value == "bar") { + t.Fatalf("WatchAll 1 failed: %#v", resp) + } + + go setHelper("watch_foo/foo", "bar", c) + + resp, err = c.WatchAll("watch_foo", resp.Index, nil, nil) + if err != nil { + t.Fatal(err) + } + if !(resp.Key == "/watch_foo/foo" && resp.Value == "bar") { + t.Fatalf("WatchAll 2 failed: %#v", resp) + } + + ch := make(chan *Response, 10) + stop := make(chan bool, 1) + + go setLoop("watch_foo/foo", "bar", c) + + go receiver(ch, stop) + + _, err = c.WatchAll("watch_foo", 0, ch, stop) + if err != ErrWatchStoppedByUser { + t.Fatalf("Watch returned a non-user stop error") + } +} + +func setHelper(key, value string, c *Client) { time.Sleep(time.Second) - c.Set("watch_foo/foo", value, 100) + c.Set(key, value, 100) } -func setLoop(value string, c *Client) { +func setLoop(key, value string, c *Client) { time.Sleep(time.Second) for i := 0; i < 10; i++ { newValue := fmt.Sprintf("%s_%v", value, i) - c.Set("watch_foo/foo", newValue, 100) + c.Set(key, newValue, 100) time.Sleep(time.Second / 10) } } -func receiver(c chan *store.Response, stop chan bool) { +func receiver(c chan *Response, stop chan bool) { for i := 0; i < 10; i++ { <-c } diff --git a/third_party/github.com/coreos/go-systemd/journal/send.go b/third_party/github.com/coreos/go-systemd/journal/send.go index 51caa11..e288bb0 100644 --- a/third_party/github.com/coreos/go-systemd/journal/send.go +++ b/third_party/github.com/coreos/go-systemd/journal/send.go @@ -3,6 +3,7 @@ package journal import ( "bytes" + "encoding/binary" "errors" "fmt" "io" @@ -12,7 +13,6 @@ import ( "strconv" "strings" "syscall" - "encoding/binary" ) // Priority of a journal message @@ -32,7 +32,11 @@ const ( var conn net.Conn func init() { - conn, _ = net.Dial("unixgram", "/run/systemd/journal/socket") + var err error + conn, err = net.Dial("unixgram", "/run/systemd/journal/socket") + if err != nil { + conn = nil + } } // Enabled returns true iff the systemd journal is available for logging diff --git a/watch.go b/watch.go index aeb4892..aedb88f 100644 --- a/watch.go +++ b/watch.go @@ -3,7 +3,7 @@ package main import ( "flag" "fmt" - "github.com/coreos/etcd/store" + "github.com/coreos/go-etcd/etcd" "os" "os/signal" ) @@ -12,6 +12,10 @@ const WatchUsage = `usage: etcdctl [etcd flags] watch [watch flags] special flags: -f forever watch a key until CTRL+C -i watch from the given index` +const WatchAllUsage = `usage: etcdctl [etcd flags] watchAll [watchAll flags] +special flags: -f forever watch a key until CTRL+C + -i watch from the given index` + var ( watchFlag = flag.NewFlagSet("watch", flag.ExitOnError) forever = watchFlag.Bool("f", false, "forever watch at the key") @@ -19,12 +23,13 @@ var ( ) func init() { - registerCommand("watch", WatchUsage, 2, 6, watch) + registerCommand("watch", WatchUsage, 1, 3, watch) + registerCommand("watchAll", WatchAllUsage, 1, 3, watchAll) } func watch(args []string) error { - key := args[1] - watchFlag.Parse(args[2:]) + key := args[0] + watchFlag.Parse(args[1:]) if *forever { @@ -39,20 +44,68 @@ func watch(args []string) error { os.Exit(0) }() - receiver := make(chan *store.Response) + receiver := make(chan *etcd.Response) go client.Watch(key, uint64(*index), receiver, stop) for { resp := <-receiver - fmt.Println(resp.Action, " ", resp.Key, " ", resp.Value) + if debug { + fmt.Println(<-curlChan) + } + output(resp) } } else { resp, err := client.Watch(key, uint64(*index), nil, nil) + if debug { + fmt.Println(<-curlChan) + } + if err != nil { + return err + } + output(resp) + } + + return nil +} + +func watchAll(args []string) error { + key := args[0] + watchFlag.Parse(args[1:]) + + if *forever { + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + + stop := make(chan bool) + + go func() { + <-c + stop <- true + os.Exit(0) + }() + + receiver := make(chan *etcd.Response) + go client.WatchAll(key, uint64(*index), receiver, stop) + + for { + resp := <-receiver + if debug { + fmt.Println(<-curlChan) + } + output(resp) + } + + } else { + resp, err := client.WatchAll(key, uint64(*index), nil, nil) + if debug { + fmt.Println(<-curlChan) + } if err != nil { return err } - fmt.Println(resp.Action, " ", resp.Key, " ", resp.Value) + output(resp) } return nil