diff --git a/internal/test/assertions.go b/internal/test/assertions.go index 4054f942d..8e27acd17 100644 --- a/internal/test/assertions.go +++ b/internal/test/assertions.go @@ -17,10 +17,10 @@ func Equal(t *testing.T, expected, actual interface{}) { } func NotEqual(t *testing.T, expected, actual interface{}) { - if !reflect.DeepEqual(expected, actual) { + if reflect.DeepEqual(expected, actual) { _, file, line, _ := runtime.Caller(1) - t.Logf("\033[31m%s:%d:\n\n\tvalue should not equal %#v\033[39m\n\n", - filepath.Base(file), line, actual) + t.Logf("\033[31m%s:%d:\n\n\tnexp: %#v\n\n\tgot: %#v\033[39m\n\n", + filepath.Base(file), line, expected, actual) t.FailNow() } } diff --git a/internal/test/logger.go b/internal/test/logger.go new file mode 100644 index 000000000..8ca542c5d --- /dev/null +++ b/internal/test/logger.go @@ -0,0 +1,22 @@ +package test + +import ( + "github.com/nsqio/nsq/internal/app" +) + +type tbLog interface { + Log(...interface{}) +} + +type testLogger struct { + tbLog +} + +func (tl *testLogger) Output(maxdepth int, s string) error { + tl.Log(s) + return nil +} + +func NewTestLogger(tbl tbLog) app.Logger { + return &testLogger{tbl} +} diff --git a/nsqlookupd/http_test.go b/nsqlookupd/http_test.go new file mode 100644 index 000000000..b5e4a439f --- /dev/null +++ b/nsqlookupd/http_test.go @@ -0,0 +1,494 @@ +package nsqlookupd + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "os" + "strconv" + "testing" + "time" + + "github.com/nsqio/nsq/internal/test" + "github.com/nsqio/nsq/internal/version" + "github.com/nsqio/nsq/nsqd" +) + +type Version struct { + Version string `json:"version"` +} + +type InfoDoc struct { + Code int `json:"status_code"` + Status string `json:"status_txt"` + Data *Version `json:"data"` +} + +type ChannelsDoc struct { + Channels []interface{} `json:"channels"` +} + +type OldErrMessage struct { + Message string `json:"status_txt"` +} + +type ErrMessage struct { + Message string `json:"message"` +} + +func bootstrapNSQCluster(t *testing.T) (string, []*nsqd.NSQD, *NSQLookupd) { + lgr := test.NewTestLogger(t) + + nsqlookupdOpts := NewOptions() + nsqlookupdOpts.TCPAddress = "127.0.0.1:0" + nsqlookupdOpts.HTTPAddress = "127.0.0.1:0" + nsqlookupdOpts.BroadcastAddress = "127.0.0.1" + nsqlookupdOpts.Logger = lgr + nsqlookupd1 := New(nsqlookupdOpts) + go nsqlookupd1.Main() + + time.Sleep(100 * time.Millisecond) + + nsqdOpts := nsqd.NewOptions() + nsqdOpts.TCPAddress = "127.0.0.1:0" + nsqdOpts.HTTPAddress = "127.0.0.1:0" + nsqdOpts.BroadcastAddress = "127.0.0.1" + nsqdOpts.NSQLookupdTCPAddresses = []string{nsqlookupd1.RealTCPAddr().String()} + nsqdOpts.Logger = lgr + tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + if err != nil { + panic(err) + } + nsqdOpts.DataPath = tmpDir + nsqd1 := nsqd.New(nsqdOpts) + go nsqd1.Main() + + time.Sleep(100 * time.Millisecond) + + return tmpDir, []*nsqd.NSQD{nsqd1}, nsqlookupd1 +} + +func makeTopic(nsqlookupd *NSQLookupd, topicName string) { + key := Registration{"topic", topicName, ""} + nsqlookupd.DB.AddRegistration(key) +} + +func makeChannel(nsqlookupd *NSQLookupd, topicName string, channelName string) { + key := Registration{"channel", topicName, channelName} + nsqlookupd.DB.AddRegistration(key) + makeTopic(nsqlookupd, topicName) +} + +func TestPing(t *testing.T) { + dataPath, nsqds, nsqlookupd1 := bootstrapNSQCluster(t) + defer os.RemoveAll(dataPath) + defer nsqds[0].Exit() + defer nsqlookupd1.Exit() + + client := http.Client{} + url := fmt.Sprintf("http://%s/ping", nsqlookupd1.RealHTTPAddr()) + req, _ := http.NewRequest("GET", url, nil) + resp, err := client.Do(req) + test.Nil(t, err) + test.Equal(t, 200, resp.StatusCode) + body, _ := ioutil.ReadAll(resp.Body) + resp.Body.Close() + + test.Equal(t, []byte("OK"), body) +} + +func TestInfo(t *testing.T) { + dataPath, nsqds, nsqlookupd1 := bootstrapNSQCluster(t) + defer os.RemoveAll(dataPath) + defer nsqds[0].Exit() + defer nsqlookupd1.Exit() + + client := http.Client{} + url := fmt.Sprintf("http://%s/info", nsqlookupd1.RealHTTPAddr()) + req, _ := http.NewRequest("GET", url, nil) + resp, err := client.Do(req) + test.Nil(t, err) + test.Equal(t, 200, resp.StatusCode) + body, _ := ioutil.ReadAll(resp.Body) + resp.Body.Close() + + t.Logf("%s", body) + info := InfoDoc{} + err = json.Unmarshal(body, &info) + test.Nil(t, err) + test.Equal(t, version.Binary, info.Data.Version) +} + +func TestCreateTopic(t *testing.T) { + dataPath, nsqds, nsqlookupd1 := bootstrapNSQCluster(t) + defer os.RemoveAll(dataPath) + defer nsqds[0].Exit() + defer nsqlookupd1.Exit() + + em := ErrMessage{} + client := http.Client{} + url := fmt.Sprintf("http://%s/topic/create", nsqlookupd1.RealHTTPAddr()) + + req, _ := http.NewRequest("POST", url, nil) + resp, err := client.Do(req) + test.Nil(t, err) + test.Equal(t, 400, resp.StatusCode) + test.Equal(t, "Bad Request", http.StatusText(resp.StatusCode)) + body, _ := ioutil.ReadAll(resp.Body) + resp.Body.Close() + + t.Logf("%s", body) + err = json.Unmarshal(body, &em) + test.Nil(t, err) + test.Equal(t, "MISSING_ARG_TOPIC", em.Message) + + topicName := "sampletopicA" + strconv.Itoa(int(time.Now().Unix())) + "$" + url = fmt.Sprintf("http://%s/topic/create?topic=%s", nsqlookupd1.RealHTTPAddr(), topicName) + + req, _ = http.NewRequest("POST", url, nil) + resp, err = client.Do(req) + test.Nil(t, err) + test.Equal(t, 400, resp.StatusCode) + test.Equal(t, "Bad Request", http.StatusText(resp.StatusCode)) + body, _ = ioutil.ReadAll(resp.Body) + resp.Body.Close() + + t.Logf("%s", body) + err = json.Unmarshal(body, &em) + test.Nil(t, err) + test.Equal(t, "INVALID_ARG_TOPIC", em.Message) + + topicName = "sampletopicA" + strconv.Itoa(int(time.Now().Unix())) + url = fmt.Sprintf("http://%s/topic/create?topic=%s", nsqlookupd1.RealHTTPAddr(), topicName) + + req, _ = http.NewRequest("POST", url, nil) + resp, err = client.Do(req) + test.Nil(t, err) + test.Equal(t, 200, resp.StatusCode) + body, _ = ioutil.ReadAll(resp.Body) + resp.Body.Close() + + t.Logf("%s", body) + test.Equal(t, []byte(""), body) +} + +func TestDeleteTopic(t *testing.T) { + dataPath, nsqds, nsqlookupd1 := bootstrapNSQCluster(t) + defer os.RemoveAll(dataPath) + defer nsqds[0].Exit() + defer nsqlookupd1.Exit() + + em := ErrMessage{} + client := http.Client{} + url := fmt.Sprintf("http://%s/topic/delete", nsqlookupd1.RealHTTPAddr()) + + req, _ := http.NewRequest("POST", url, nil) + resp, err := client.Do(req) + test.Nil(t, err) + test.Equal(t, 400, resp.StatusCode) + test.Equal(t, "Bad Request", http.StatusText(resp.StatusCode)) + body, _ := ioutil.ReadAll(resp.Body) + resp.Body.Close() + + t.Logf("%s", body) + err = json.Unmarshal(body, &em) + test.Nil(t, err) + test.Equal(t, "MISSING_ARG_TOPIC", em.Message) + + topicName := "sampletopicA" + strconv.Itoa(int(time.Now().Unix())) + makeTopic(nsqlookupd1, topicName) + + url = fmt.Sprintf("http://%s/topic/delete?topic=%s", nsqlookupd1.RealHTTPAddr(), topicName) + + req, _ = http.NewRequest("POST", url, nil) + resp, err = client.Do(req) + test.Nil(t, err) + test.Equal(t, 200, resp.StatusCode) + body, _ = ioutil.ReadAll(resp.Body) + resp.Body.Close() + + t.Logf("%s", body) + test.Equal(t, []byte(""), body) + + topicName = "sampletopicB" + strconv.Itoa(int(time.Now().Unix())) + channelName := "foobar" + strconv.Itoa(int(time.Now().Unix())) + makeChannel(nsqlookupd1, topicName, channelName) + + url = fmt.Sprintf("http://%s/topic/delete?topic=%s", nsqlookupd1.RealHTTPAddr(), topicName) + + req, _ = http.NewRequest("POST", url, nil) + resp, err = client.Do(req) + test.Nil(t, err) + test.Equal(t, 200, resp.StatusCode) + body, _ = ioutil.ReadAll(resp.Body) + resp.Body.Close() + + t.Logf("%s", body) + test.Equal(t, []byte(""), body) +} + +func TestGetChannels(t *testing.T) { + dataPath, nsqds, nsqlookupd1 := bootstrapNSQCluster(t) + defer os.RemoveAll(dataPath) + defer nsqds[0].Exit() + defer nsqlookupd1.Exit() + + client := http.Client{} + url := fmt.Sprintf("http://%s/channels", nsqlookupd1.RealHTTPAddr()) + + oem := OldErrMessage{} + // pre-version 1 + req, _ := http.NewRequest("GET", url, nil) + resp, err := client.Do(req) + test.Nil(t, err) + test.Equal(t, 500, resp.StatusCode) + test.Equal(t, "Internal Server Error", http.StatusText(resp.StatusCode)) + body, _ := ioutil.ReadAll(resp.Body) + resp.Body.Close() + + t.Logf("%s", body) + err = json.Unmarshal(body, &oem) + test.Nil(t, err) + test.Equal(t, "MISSING_ARG_TOPIC", oem.Message) + + // version 1 + em := ErrMessage{} + req, _ = http.NewRequest("GET", url, nil) + req.Header.Add("Accept", "application/vnd.nsq; version=1.0") + resp, err = client.Do(req) + test.Nil(t, err) + test.Equal(t, 400, resp.StatusCode) + test.Equal(t, "Bad Request", http.StatusText(resp.StatusCode)) + body, _ = ioutil.ReadAll(resp.Body) + resp.Body.Close() + + t.Logf("%s", body) + err = json.Unmarshal(body, &em) + test.Nil(t, err) + test.Equal(t, "MISSING_ARG_TOPIC", em.Message) + + ch := ChannelsDoc{} + topicName := "sampletopicA" + strconv.Itoa(int(time.Now().Unix())) + makeTopic(nsqlookupd1, topicName) + + url = fmt.Sprintf("http://%s/channels?topic=%s", nsqlookupd1.RealHTTPAddr(), topicName) + + req, _ = http.NewRequest("GET", url, nil) + req.Header.Add("Accept", "application/vnd.nsq; version=1.0") + resp, err = client.Do(req) + test.Nil(t, err) + test.Equal(t, 200, resp.StatusCode) + body, _ = ioutil.ReadAll(resp.Body) + resp.Body.Close() + + t.Logf("%s", body) + err = json.Unmarshal(body, &ch) + test.Nil(t, err) + test.Equal(t, 0, len(ch.Channels)) + + topicName = "sampletopicB" + strconv.Itoa(int(time.Now().Unix())) + channelName := "foobar" + strconv.Itoa(int(time.Now().Unix())) + makeChannel(nsqlookupd1, topicName, channelName) + + url = fmt.Sprintf("http://%s/channels?topic=%s", nsqlookupd1.RealHTTPAddr(), topicName) + + req, _ = http.NewRequest("GET", url, nil) + req.Header.Add("Accept", "application/vnd.nsq; version=1.0") + resp, err = client.Do(req) + test.Nil(t, err) + test.Equal(t, 200, resp.StatusCode) + body, _ = ioutil.ReadAll(resp.Body) + resp.Body.Close() + + t.Logf("%s", body) + err = json.Unmarshal(body, &ch) + test.Nil(t, err) + test.Equal(t, 1, len(ch.Channels)) + test.Equal(t, channelName, ch.Channels[0]) +} + +func TestCreateChannel(t *testing.T) { + dataPath, nsqds, nsqlookupd1 := bootstrapNSQCluster(t) + defer os.RemoveAll(dataPath) + defer nsqds[0].Exit() + defer nsqlookupd1.Exit() + + em := ErrMessage{} + client := http.Client{} + url := fmt.Sprintf("http://%s/channel/create", nsqlookupd1.RealHTTPAddr()) + + req, _ := http.NewRequest("POST", url, nil) + resp, err := client.Do(req) + test.Nil(t, err) + test.Equal(t, 400, resp.StatusCode) + test.Equal(t, "Bad Request", http.StatusText(resp.StatusCode)) + body, _ := ioutil.ReadAll(resp.Body) + resp.Body.Close() + + t.Logf("%s", body) + err = json.Unmarshal(body, &em) + test.Nil(t, err) + test.Equal(t, "MISSING_ARG_TOPIC", em.Message) + + topicName := "sampletopicB" + strconv.Itoa(int(time.Now().Unix())) + "$" + url = fmt.Sprintf("http://%s/channel/create?topic=%s", nsqlookupd1.RealHTTPAddr(), topicName) + + req, _ = http.NewRequest("POST", url, nil) + resp, err = client.Do(req) + test.Nil(t, err) + test.Equal(t, 400, resp.StatusCode) + test.Equal(t, "Bad Request", http.StatusText(resp.StatusCode)) + body, _ = ioutil.ReadAll(resp.Body) + resp.Body.Close() + + t.Logf("%s", body) + err = json.Unmarshal(body, &em) + test.Nil(t, err) + test.Equal(t, "INVALID_ARG_TOPIC", em.Message) + + topicName = "sampletopicB" + strconv.Itoa(int(time.Now().Unix())) + url = fmt.Sprintf("http://%s/channel/create?topic=%s", nsqlookupd1.RealHTTPAddr(), topicName) + + req, _ = http.NewRequest("POST", url, nil) + resp, err = client.Do(req) + test.Nil(t, err) + test.Equal(t, 400, resp.StatusCode) + test.Equal(t, "Bad Request", http.StatusText(resp.StatusCode)) + body, _ = ioutil.ReadAll(resp.Body) + resp.Body.Close() + + t.Logf("%s", body) + err = json.Unmarshal(body, &em) + test.Nil(t, err) + test.Equal(t, "MISSING_ARG_CHANNEL", em.Message) + + channelName := "foobar" + strconv.Itoa(int(time.Now().Unix())) + "$" + url = fmt.Sprintf("http://%s/channel/create?topic=%s&channel=%s", nsqlookupd1.RealHTTPAddr(), topicName, channelName) + + req, _ = http.NewRequest("POST", url, nil) + resp, err = client.Do(req) + test.Nil(t, err) + test.Equal(t, 400, resp.StatusCode) + test.Equal(t, "Bad Request", http.StatusText(resp.StatusCode)) + body, _ = ioutil.ReadAll(resp.Body) + resp.Body.Close() + + t.Logf("%s", body) + err = json.Unmarshal(body, &em) + test.Nil(t, err) + test.Equal(t, "INVALID_ARG_CHANNEL", em.Message) + + channelName = "foobar" + strconv.Itoa(int(time.Now().Unix())) + url = fmt.Sprintf("http://%s/channel/create?topic=%s&channel=%s", nsqlookupd1.RealHTTPAddr(), topicName, channelName) + + req, _ = http.NewRequest("POST", url, nil) + resp, err = client.Do(req) + test.Nil(t, err) + test.Equal(t, 200, resp.StatusCode) + body, _ = ioutil.ReadAll(resp.Body) + resp.Body.Close() + + t.Logf("%s", body) + test.Equal(t, []byte(""), body) +} + +func TestDeleteChannel(t *testing.T) { + dataPath, nsqds, nsqlookupd1 := bootstrapNSQCluster(t) + defer os.RemoveAll(dataPath) + defer nsqds[0].Exit() + defer nsqlookupd1.Exit() + + em := ErrMessage{} + client := http.Client{} + url := fmt.Sprintf("http://%s/channel/delete", nsqlookupd1.RealHTTPAddr()) + + req, _ := http.NewRequest("POST", url, nil) + resp, err := client.Do(req) + test.Nil(t, err) + test.Equal(t, 400, resp.StatusCode) + test.Equal(t, "Bad Request", http.StatusText(resp.StatusCode)) + body, _ := ioutil.ReadAll(resp.Body) + resp.Body.Close() + + t.Logf("%s", body) + err = json.Unmarshal(body, &em) + test.Nil(t, err) + test.Equal(t, "MISSING_ARG_TOPIC", em.Message) + + topicName := "sampletopicB" + strconv.Itoa(int(time.Now().Unix())) + "$" + url = fmt.Sprintf("http://%s/channel/delete?topic=%s", nsqlookupd1.RealHTTPAddr(), topicName) + + req, _ = http.NewRequest("POST", url, nil) + resp, err = client.Do(req) + test.Nil(t, err) + test.Equal(t, 400, resp.StatusCode) + test.Equal(t, "Bad Request", http.StatusText(resp.StatusCode)) + body, _ = ioutil.ReadAll(resp.Body) + resp.Body.Close() + + t.Logf("%s", body) + err = json.Unmarshal(body, &em) + test.Nil(t, err) + test.Equal(t, "INVALID_ARG_TOPIC", em.Message) + + topicName = "sampletopicB" + strconv.Itoa(int(time.Now().Unix())) + url = fmt.Sprintf("http://%s/channel/delete?topic=%s", nsqlookupd1.RealHTTPAddr(), topicName) + + req, _ = http.NewRequest("POST", url, nil) + resp, err = client.Do(req) + test.Nil(t, err) + test.Equal(t, 400, resp.StatusCode) + test.Equal(t, "Bad Request", http.StatusText(resp.StatusCode)) + body, _ = ioutil.ReadAll(resp.Body) + resp.Body.Close() + + t.Logf("%s", body) + err = json.Unmarshal(body, &em) + test.Nil(t, err) + test.Equal(t, "MISSING_ARG_CHANNEL", em.Message) + + channelName := "foobar" + strconv.Itoa(int(time.Now().Unix())) + "$" + url = fmt.Sprintf("http://%s/channel/delete?topic=%s&channel=%s", nsqlookupd1.RealHTTPAddr(), topicName, channelName) + + req, _ = http.NewRequest("POST", url, nil) + resp, err = client.Do(req) + test.Nil(t, err) + test.Equal(t, 400, resp.StatusCode) + test.Equal(t, "Bad Request", http.StatusText(resp.StatusCode)) + body, _ = ioutil.ReadAll(resp.Body) + resp.Body.Close() + + t.Logf("%s", body) + err = json.Unmarshal(body, &em) + test.Nil(t, err) + test.Equal(t, "INVALID_ARG_CHANNEL", em.Message) + + channelName = "foobar" + strconv.Itoa(int(time.Now().Unix())) + url = fmt.Sprintf("http://%s/channel/delete?topic=%s&channel=%s", nsqlookupd1.RealHTTPAddr(), topicName, channelName) + + req, _ = http.NewRequest("POST", url, nil) + resp, err = client.Do(req) + test.Nil(t, err) + test.Equal(t, 404, resp.StatusCode) + test.Equal(t, "Not Found", http.StatusText(resp.StatusCode)) + body, _ = ioutil.ReadAll(resp.Body) + resp.Body.Close() + + t.Logf("%s", body) + err = json.Unmarshal(body, &em) + test.Nil(t, err) + test.Equal(t, "CHANNEL_NOT_FOUND", em.Message) + + makeChannel(nsqlookupd1, topicName, channelName) + + req, _ = http.NewRequest("POST", url, nil) + resp, err = client.Do(req) + test.Nil(t, err) + test.Equal(t, 200, resp.StatusCode) + body, _ = ioutil.ReadAll(resp.Body) + resp.Body.Close() + + t.Logf("%s", body) + test.Equal(t, []byte(""), body) +} diff --git a/nsqlookupd/logger.go b/nsqlookupd/logger.go index 8b18f1496..960e11188 100644 --- a/nsqlookupd/logger.go +++ b/nsqlookupd/logger.go @@ -1,5 +1,5 @@ package nsqlookupd -type logger interface { +type Logger interface { Output(maxdepth int, s string) error } diff --git a/nsqlookupd/lookup_protocol_v1_test.go b/nsqlookupd/lookup_protocol_v1_test.go index c395031f8..b2e2feb35 100644 --- a/nsqlookupd/lookup_protocol_v1_test.go +++ b/nsqlookupd/lookup_protocol_v1_test.go @@ -33,17 +33,17 @@ func testIOLoopReturnsClientErr(t *testing.T, fakeConn test.FakeNetConn) { } opts := NewOptions() - opts.Logger = newTestLogger(t) + opts.Logger = test.NewTestLogger(t) opts.Verbose = true prot := &LookupProtocolV1{ctx: &Context{nsqlookupd: New(opts)}} errChan := make(chan error) - test := func() { + testIOLoop := func() { errChan <- prot.IOLoop(fakeConn) defer prot.ctx.nsqlookupd.Exit() } - go test() + go testIOLoop() var err error var timeout bool @@ -54,9 +54,9 @@ func testIOLoopReturnsClientErr(t *testing.T, fakeConn test.FakeNetConn) { timeout = true } - equal(t, timeout, false) + test.Equal(t, false, timeout) - nequal(t, err, nil) - equal(t, err.Error(), "E_INVALID invalid command INVALID_COMMAND") - nequal(t, err.(*protocol.FatalClientErr), nil) + test.NotNil(t, err) + test.Equal(t, "E_INVALID invalid command INVALID_COMMAND", err.Error()) + test.NotNil(t, err.(*protocol.FatalClientErr)) } diff --git a/nsqlookupd/nsqlookupd_test.go b/nsqlookupd/nsqlookupd_test.go index cba554af5..853df791d 100644 --- a/nsqlookupd/nsqlookupd_test.go +++ b/nsqlookupd/nsqlookupd_test.go @@ -3,56 +3,35 @@ package nsqlookupd import ( "fmt" "net" - "path/filepath" - "reflect" - "runtime" "testing" "time" - "github.com/bitly/go-simplejson" "github.com/nsqio/go-nsq" "github.com/nsqio/nsq/internal/clusterinfo" "github.com/nsqio/nsq/internal/http_api" + "github.com/nsqio/nsq/internal/test" ) const ( ConnectTimeout = 2 * time.Second RequestTimeout = 5 * time.Second + TCPPort = 5000 + HTTPPort = 5555 + HostAddr = "ip.address" + NSQDVersion = "fake-version" ) -func equal(t *testing.T, act, exp interface{}) { - if !reflect.DeepEqual(exp, act) { - _, file, line, _ := runtime.Caller(1) - t.Logf("\033[31m%s:%d:\n\n\texp: %#v\n\n\tgot: %#v\033[39m\n\n", - filepath.Base(file), line, exp, act) - t.FailNow() - } -} - -func nequal(t *testing.T, act, exp interface{}) { - if reflect.DeepEqual(exp, act) { - _, file, line, _ := runtime.Caller(1) - t.Logf("\033[31m%s:%d:\n\n\tnexp: %#v\n\n\tgot: %#v\033[39m\n\n", - filepath.Base(file), line, exp, act) - t.FailNow() - } -} - -type tbLog interface { - Log(...interface{}) -} - -type testLogger struct { - tbLog +type ProducersDoc struct { + Producers []interface{} `json:"producers"` } -func (tl *testLogger) Output(maxdepth int, s string) error { - tl.Log(s) - return nil +type TopicsDoc struct { + Topics []interface{} `json:"topics"` } -func newTestLogger(tbl tbLog) logger { - return &testLogger{tbl} +type LookupDoc struct { + Channels []interface{} `json:"channels"` + Producers []*PeerInfo `json:"producers"` } func mustStartLookupd(opts *Options) (*net.TCPAddr, *net.TCPAddr, *NSQLookupd) { @@ -72,180 +51,155 @@ func mustConnectLookupd(t *testing.T, tcpAddr *net.TCPAddr) net.Conn { return conn } -func identify(t *testing.T, conn net.Conn, address string, tcpPort int, httpPort int, version string) { +func identify(t *testing.T, conn net.Conn) { ci := make(map[string]interface{}) - ci["tcp_port"] = tcpPort - ci["http_port"] = httpPort - ci["broadcast_address"] = address - ci["hostname"] = address - ci["version"] = version + ci["tcp_port"] = TCPPort + ci["http_port"] = HTTPPort + ci["broadcast_address"] = HostAddr + ci["hostname"] = HostAddr + ci["version"] = NSQDVersion cmd, _ := nsq.Identify(ci) _, err := cmd.WriteTo(conn) - equal(t, err, nil) + test.Nil(t, err) _, err = nsq.ReadResponse(conn) - equal(t, err, nil) + test.Nil(t, err) } -func API(endpoint string) (data *simplejson.Json, err error) { - d := make(map[string]interface{}) - err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).NegotiateV1(endpoint, &d) - data = simplejson.New() - data.SetPath(nil, d) - return +func TestNoLogger(t *testing.T) { + opts := NewOptions() + opts.Logger = nil + opts.TCPAddress = "127.0.0.1:0" + opts.HTTPAddress = "127.0.0.1:0" + nsqlookupd := New(opts) + + nsqlookupd.logf("should never be logged") } func TestBasicLookupd(t *testing.T) { opts := NewOptions() - opts.Logger = newTestLogger(t) + opts.Logger = test.NewTestLogger(t) tcpAddr, httpAddr, nsqlookupd := mustStartLookupd(opts) defer nsqlookupd.Exit() topics := nsqlookupd.DB.FindRegistrations("topic", "*", "*") - equal(t, len(topics), 0) + test.Equal(t, 0, len(topics)) topicName := "connectmsg" conn := mustConnectLookupd(t, tcpAddr) - tcpPort := 5000 - httpPort := 5555 - identify(t, conn, "ip.address", tcpPort, httpPort, "fake-version") + identify(t, conn) nsq.Register(topicName, "channel1").WriteTo(conn) v, err := nsq.ReadResponse(conn) - equal(t, err, nil) - equal(t, v, []byte("OK")) + test.Nil(t, err) + test.Equal(t, []byte("OK"), v) + pr := ProducersDoc{} endpoint := fmt.Sprintf("http://%s/nodes", httpAddr) - data, err := API(endpoint) + err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).NegotiateV1(endpoint, &pr) + test.Nil(t, err) - t.Logf("got %v", data) - returnedProducers, err := data.Get("producers").Array() - equal(t, err, nil) - equal(t, len(returnedProducers), 1) + t.Logf("got %v", pr) + test.Equal(t, 1, len(pr.Producers)) topics = nsqlookupd.DB.FindRegistrations("topic", topicName, "") - equal(t, len(topics), 1) + test.Equal(t, 1, len(topics)) producers := nsqlookupd.DB.FindProducers("topic", topicName, "") - equal(t, len(producers), 1) + test.Equal(t, 1, len(producers)) producer := producers[0] - equal(t, producer.peerInfo.BroadcastAddress, "ip.address") - equal(t, producer.peerInfo.Hostname, "ip.address") - equal(t, producer.peerInfo.TCPPort, tcpPort) - equal(t, producer.peerInfo.HTTPPort, httpPort) + test.Equal(t, HostAddr, producer.peerInfo.BroadcastAddress) + test.Equal(t, HostAddr, producer.peerInfo.Hostname) + test.Equal(t, TCPPort, producer.peerInfo.TCPPort) + test.Equal(t, HTTPPort, producer.peerInfo.HTTPPort) + tr := TopicsDoc{} endpoint = fmt.Sprintf("http://%s/topics", httpAddr) - data, err = API(endpoint) + err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).NegotiateV1(endpoint, &tr) + test.Nil(t, err) - equal(t, err, nil) - returnedTopics, err := data.Get("topics").Array() - t.Logf("got returnedTopics %v", returnedTopics) - equal(t, err, nil) - equal(t, len(returnedTopics), 1) + t.Logf("got %v", tr) + test.Equal(t, 1, len(tr.Topics)) + lr := LookupDoc{} endpoint = fmt.Sprintf("http://%s/lookup?topic=%s", httpAddr, topicName) - data, err = API(endpoint) - - equal(t, err, nil) - returnedChannels, err := data.Get("channels").Array() - equal(t, err, nil) - equal(t, len(returnedChannels), 1) - - returnedProducers, err = data.Get("producers").Array() - t.Logf("got returnedProducers %v", returnedProducers) - equal(t, err, nil) - equal(t, len(returnedProducers), 1) - for i := range returnedProducers { - producer := data.Get("producers").GetIndex(i) - t.Logf("producer %v", producer) - - port, err := producer.Get("tcp_port").Int() - equal(t, err, nil) - equal(t, port, tcpPort) - - port, err = producer.Get("http_port").Int() - equal(t, err, nil) - equal(t, port, httpPort) - - broadcastaddress, err := producer.Get("broadcast_address").String() - equal(t, err, nil) - equal(t, broadcastaddress, "ip.address") - - ver, err := producer.Get("version").String() - equal(t, err, nil) - equal(t, ver, "fake-version") + err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).NegotiateV1(endpoint, &lr) + test.Nil(t, err) + + t.Logf("got %v", lr) + test.Equal(t, 1, len(lr.Channels)) + test.Equal(t, 1, len(lr.Producers)) + for _, p := range lr.Producers { + test.Equal(t, TCPPort, p.TCPPort) + test.Equal(t, HTTPPort, p.HTTPPort) + test.Equal(t, HostAddr, p.BroadcastAddress) + test.Equal(t, NSQDVersion, p.Version) } conn.Close() time.Sleep(10 * time.Millisecond) // now there should be no producers, but still topic/channel entries - data, err = API(endpoint) - - equal(t, err, nil) - returnedChannels, err = data.Get("channels").Array() - equal(t, err, nil) - equal(t, len(returnedChannels), 1) - returnedProducers, err = data.Get("producers").Array() - equal(t, err, nil) - equal(t, len(returnedProducers), 0) + err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).NegotiateV1(endpoint, &lr) + test.Nil(t, err) + + test.Equal(t, 1, len(lr.Channels)) + test.Equal(t, 0, len(lr.Producers)) } func TestChannelUnregister(t *testing.T) { opts := NewOptions() - opts.Logger = newTestLogger(t) + opts.Logger = test.NewTestLogger(t) tcpAddr, httpAddr, nsqlookupd := mustStartLookupd(opts) defer nsqlookupd.Exit() topics := nsqlookupd.DB.FindRegistrations("topic", "*", "*") - equal(t, len(topics), 0) + test.Equal(t, 0, len(topics)) topicName := "channel_unregister" conn := mustConnectLookupd(t, tcpAddr) defer conn.Close() - tcpPort := 5000 - httpPort := 5555 - identify(t, conn, "ip.address", tcpPort, httpPort, "fake-version") + identify(t, conn) nsq.Register(topicName, "ch1").WriteTo(conn) v, err := nsq.ReadResponse(conn) - equal(t, err, nil) - equal(t, v, []byte("OK")) + test.Nil(t, err) + test.Equal(t, []byte("OK"), v) topics = nsqlookupd.DB.FindRegistrations("topic", topicName, "") - equal(t, len(topics), 1) + test.Equal(t, 1, len(topics)) channels := nsqlookupd.DB.FindRegistrations("channel", topicName, "*") - equal(t, len(channels), 1) + test.Equal(t, 1, len(channels)) nsq.UnRegister(topicName, "ch1").WriteTo(conn) v, err = nsq.ReadResponse(conn) - equal(t, err, nil) - equal(t, v, []byte("OK")) + test.Nil(t, err) + test.Equal(t, []byte("OK"), v) topics = nsqlookupd.DB.FindRegistrations("topic", topicName, "") - equal(t, len(topics), 1) + test.Equal(t, 1, len(topics)) // we should still have mention of the topic even though there is no producer // (ie. we haven't *deleted* the channel, just unregistered as a producer) channels = nsqlookupd.DB.FindRegistrations("channel", topicName, "*") - equal(t, len(channels), 1) + test.Equal(t, 1, len(channels)) + pr := ProducersDoc{} endpoint := fmt.Sprintf("http://%s/lookup?topic=%s", httpAddr, topicName) - data, err := API(endpoint) - equal(t, err, nil) - returnedProducers, err := data.Get("producers").Array() - equal(t, err, nil) - equal(t, len(returnedProducers), 1) + err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).NegotiateV1(endpoint, &pr) + test.Nil(t, err) + t.Logf("got %v", pr) + test.Equal(t, 1, len(pr.Producers)) } func TestTombstoneRecover(t *testing.T) { opts := NewOptions() - opts.Logger = newTestLogger(t) + opts.Logger = test.NewTestLogger(t) opts.TombstoneLifetime = 50 * time.Millisecond tcpAddr, httpAddr, nsqlookupd := mustStartLookupd(opts) defer nsqlookupd.Exit() @@ -256,45 +210,44 @@ func TestTombstoneRecover(t *testing.T) { conn := mustConnectLookupd(t, tcpAddr) defer conn.Close() - identify(t, conn, "ip.address", 5000, 5555, "fake-version") + identify(t, conn) nsq.Register(topicName, "channel1").WriteTo(conn) _, err := nsq.ReadResponse(conn) - equal(t, err, nil) + test.Nil(t, err) nsq.Register(topicName2, "channel2").WriteTo(conn) _, err = nsq.ReadResponse(conn) - equal(t, err, nil) + test.Nil(t, err) - endpoint := fmt.Sprintf("http://%s/topic/tombstone?topic=%s&node=%s", - httpAddr, topicName, "ip.address:5555") + endpoint := fmt.Sprintf("http://%s/topic/tombstone?topic=%s&node=%s:%d", + httpAddr, topicName, HostAddr, HTTPPort) err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).POSTV1(endpoint) - equal(t, err, nil) + test.Nil(t, err) + + pr := ProducersDoc{} endpoint = fmt.Sprintf("http://%s/lookup?topic=%s", httpAddr, topicName) - data, err := API(endpoint) - equal(t, err, nil) - producers, _ := data.Get("producers").Array() - equal(t, len(producers), 0) + err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).NegotiateV1(endpoint, &pr) + test.Nil(t, err) + test.Equal(t, 0, len(pr.Producers)) endpoint = fmt.Sprintf("http://%s/lookup?topic=%s", httpAddr, topicName2) - data, err = API(endpoint) - equal(t, err, nil) - producers, _ = data.Get("producers").Array() - equal(t, len(producers), 1) + err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).NegotiateV1(endpoint, &pr) + test.Nil(t, err) + test.Equal(t, 1, len(pr.Producers)) time.Sleep(75 * time.Millisecond) endpoint = fmt.Sprintf("http://%s/lookup?topic=%s", httpAddr, topicName) - data, err = API(endpoint) - equal(t, err, nil) - producers, _ = data.Get("producers").Array() - equal(t, len(producers), 1) + err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).NegotiateV1(endpoint, &pr) + test.Nil(t, err) + test.Equal(t, 1, len(pr.Producers)) } func TestTombstoneUnregister(t *testing.T) { opts := NewOptions() - opts.Logger = newTestLogger(t) + opts.Logger = test.NewTestLogger(t) opts.TombstoneLifetime = 50 * time.Millisecond tcpAddr, httpAddr, nsqlookupd := mustStartLookupd(opts) defer nsqlookupd.Exit() @@ -304,39 +257,39 @@ func TestTombstoneUnregister(t *testing.T) { conn := mustConnectLookupd(t, tcpAddr) defer conn.Close() - identify(t, conn, "ip.address", 5000, 5555, "fake-version") + identify(t, conn) nsq.Register(topicName, "channel1").WriteTo(conn) _, err := nsq.ReadResponse(conn) - equal(t, err, nil) + test.Nil(t, err) - endpoint := fmt.Sprintf("http://%s/topic/tombstone?topic=%s&node=%s", - httpAddr, topicName, "ip.address:5555") + endpoint := fmt.Sprintf("http://%s/topic/tombstone?topic=%s&node=%s:%d", + httpAddr, topicName, HostAddr, HTTPPort) err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).POSTV1(endpoint) - equal(t, err, nil) + test.Nil(t, err) + + pr := ProducersDoc{} endpoint = fmt.Sprintf("http://%s/lookup?topic=%s", httpAddr, topicName) - data, err := API(endpoint) - equal(t, err, nil) - producers, _ := data.Get("producers").Array() - equal(t, len(producers), 0) + err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).NegotiateV1(endpoint, &pr) + test.Nil(t, err) + test.Equal(t, 0, len(pr.Producers)) nsq.UnRegister(topicName, "").WriteTo(conn) _, err = nsq.ReadResponse(conn) - equal(t, err, nil) + test.Nil(t, err) time.Sleep(55 * time.Millisecond) endpoint = fmt.Sprintf("http://%s/lookup?topic=%s", httpAddr, topicName) - data, err = API(endpoint) - equal(t, err, nil) - producers, _ = data.Get("producers").Array() - equal(t, len(producers), 0) + err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).NegotiateV1(endpoint, &pr) + test.Nil(t, err) + test.Equal(t, 0, len(pr.Producers)) } func TestInactiveNodes(t *testing.T) { opts := NewOptions() - opts.Logger = newTestLogger(t) + opts.Logger = test.NewTestLogger(t) opts.InactiveProducerTimeout = 200 * time.Millisecond tcpAddr, httpAddr, nsqlookupd := mustStartLookupd(opts) defer nsqlookupd.Exit() @@ -348,29 +301,29 @@ func TestInactiveNodes(t *testing.T) { conn := mustConnectLookupd(t, tcpAddr) defer conn.Close() - identify(t, conn, "ip.address", 5000, 5555, "fake-version") + identify(t, conn) nsq.Register(topicName, "channel1").WriteTo(conn) _, err := nsq.ReadResponse(conn) - equal(t, err, nil) + test.Nil(t, err) ci := clusterinfo.New(nil, http_api.NewClient(nil, ConnectTimeout, RequestTimeout)) producers, _ := ci.GetLookupdProducers(lookupdHTTPAddrs) - equal(t, len(producers), 1) - equal(t, len(producers[0].Topics), 1) - equal(t, producers[0].Topics[0].Topic, topicName) - equal(t, producers[0].Topics[0].Tombstoned, false) + test.Equal(t, 1, len(producers)) + test.Equal(t, 1, len(producers[0].Topics)) + test.Equal(t, topicName, producers[0].Topics[0].Topic) + test.Equal(t, false, producers[0].Topics[0].Tombstoned) time.Sleep(250 * time.Millisecond) producers, _ = ci.GetLookupdProducers(lookupdHTTPAddrs) - equal(t, len(producers), 0) + test.Equal(t, 0, len(producers)) } func TestTombstonedNodes(t *testing.T) { opts := NewOptions() - opts.Logger = newTestLogger(t) + opts.Logger = test.NewTestLogger(t) tcpAddr, httpAddr, nsqlookupd := mustStartLookupd(opts) defer nsqlookupd.Exit() @@ -381,28 +334,28 @@ func TestTombstonedNodes(t *testing.T) { conn := mustConnectLookupd(t, tcpAddr) defer conn.Close() - identify(t, conn, "ip.address", 5000, 5555, "fake-version") + identify(t, conn) nsq.Register(topicName, "channel1").WriteTo(conn) _, err := nsq.ReadResponse(conn) - equal(t, err, nil) + test.Nil(t, err) ci := clusterinfo.New(nil, http_api.NewClient(nil, ConnectTimeout, RequestTimeout)) producers, _ := ci.GetLookupdProducers(lookupdHTTPAddrs) - equal(t, len(producers), 1) - equal(t, len(producers[0].Topics), 1) - equal(t, producers[0].Topics[0].Topic, topicName) - equal(t, producers[0].Topics[0].Tombstoned, false) + test.Equal(t, 1, len(producers)) + test.Equal(t, 1, len(producers[0].Topics)) + test.Equal(t, topicName, producers[0].Topics[0].Topic) + test.Equal(t, false, producers[0].Topics[0].Tombstoned) - endpoint := fmt.Sprintf("http://%s/topic/tombstone?topic=%s&node=%s", - httpAddr, topicName, "ip.address:5555") + endpoint := fmt.Sprintf("http://%s/topic/tombstone?topic=%s&node=%s:%d", + httpAddr, topicName, HostAddr, HTTPPort) err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).POSTV1(endpoint) - equal(t, err, nil) + test.Nil(t, err) producers, _ = ci.GetLookupdProducers(lookupdHTTPAddrs) - equal(t, len(producers), 1) - equal(t, len(producers[0].Topics), 1) - equal(t, producers[0].Topics[0].Topic, topicName) - equal(t, producers[0].Topics[0].Tombstoned, true) + test.Equal(t, 1, len(producers)) + test.Equal(t, 1, len(producers[0].Topics)) + test.Equal(t, topicName, producers[0].Topics[0].Topic) + test.Equal(t, true, producers[0].Topics[0].Tombstoned) } diff --git a/nsqlookupd/options.go b/nsqlookupd/options.go index 2a3c826c3..730d02563 100644 --- a/nsqlookupd/options.go +++ b/nsqlookupd/options.go @@ -16,7 +16,7 @@ type Options struct { InactiveProducerTimeout time.Duration `flag:"inactive-producer-timeout"` TombstoneLifetime time.Duration `flag:"tombstone-lifetime"` - Logger logger + Logger Logger } func NewOptions() *Options { diff --git a/nsqlookupd/registration_db_test.go b/nsqlookupd/registration_db_test.go index b24a80dd2..b963d1624 100644 --- a/nsqlookupd/registration_db_test.go +++ b/nsqlookupd/registration_db_test.go @@ -3,6 +3,8 @@ package nsqlookupd import ( "testing" "time" + + "github.com/nsqio/nsq/internal/test" ) func TestRegistrationDB(t *testing.T) { @@ -27,63 +29,70 @@ func TestRegistrationDB(t *testing.T) { // find producers r := db.FindRegistrations("c", "*", "").Keys() - equal(t, len(r), 1) - equal(t, r[0], "a") + test.Equal(t, 1, len(r)) + test.Equal(t, "a", r[0]) p := db.FindProducers("t", "*", "") - equal(t, len(p), 1) + t.Logf("%s", p) + test.Equal(t, 1, len(p)) p = db.FindProducers("c", "*", "") - equal(t, len(p), 2) + t.Logf("%s", p) + test.Equal(t, 2, len(p)) p = db.FindProducers("c", "a", "") - equal(t, len(p), 2) + t.Logf("%s", p) + test.Equal(t, 2, len(p)) p = db.FindProducers("c", "*", "b") - equal(t, len(p), 1) - equal(t, p[0].peerInfo.id, p2.peerInfo.id) + t.Logf("%s", p) + test.Equal(t, 1, len(p)) + test.Equal(t, p2.peerInfo.id, p[0].peerInfo.id) // filter by active - equal(t, len(p.FilterByActive(sec30, sec30)), 0) + test.Equal(t, 0, len(p.FilterByActive(sec30, sec30))) p2.peerInfo.lastUpdate = time.Now().UnixNano() - equal(t, len(p.FilterByActive(sec30, sec30)), 1) + test.Equal(t, 1, len(p.FilterByActive(sec30, sec30))) p = db.FindProducers("c", "*", "") - equal(t, len(p.FilterByActive(sec30, sec30)), 1) + t.Logf("%s", p) + test.Equal(t, 1, len(p.FilterByActive(sec30, sec30))) // tombstoning fewSecAgo := time.Now().Add(-5 * time.Second).UnixNano() p1.peerInfo.lastUpdate = fewSecAgo p2.peerInfo.lastUpdate = fewSecAgo - equal(t, len(p.FilterByActive(sec30, sec30)), 2) + test.Equal(t, 2, len(p.FilterByActive(sec30, sec30))) p1.Tombstone() - equal(t, len(p.FilterByActive(sec30, sec30)), 1) + test.Equal(t, 1, len(p.FilterByActive(sec30, sec30))) time.Sleep(10 * time.Millisecond) - equal(t, len(p.FilterByActive(sec30, 5*time.Millisecond)), 2) + test.Equal(t, 2, len(p.FilterByActive(sec30, 5*time.Millisecond))) // make sure we can still retrieve p1 from another registration see #148 - equal(t, len(db.FindProducers("t", "*", "").FilterByActive(sec30, sec30)), 1) + test.Equal(t, 1, len(db.FindProducers("t", "*", "").FilterByActive(sec30, sec30))) // keys and subkeys k := db.FindRegistrations("c", "b", "").Keys() - equal(t, len(k), 0) + test.Equal(t, 0, len(k)) k = db.FindRegistrations("c", "a", "").Keys() - equal(t, len(k), 1) - equal(t, k[0], "a") + test.Equal(t, 1, len(k)) + test.Equal(t, "a", k[0]) k = db.FindRegistrations("c", "*", "b").SubKeys() - equal(t, len(k), 1) - equal(t, k[0], "b") + test.Equal(t, 1, len(k)) + test.Equal(t, "b", k[0]) // removing producers db.RemoveProducer(Registration{"c", "a", ""}, p1.peerInfo.id) p = db.FindProducers("c", "*", "*") - equal(t, len(p), 1) + t.Logf("%s", p) + test.Equal(t, 1, len(p)) db.RemoveProducer(Registration{"c", "a", ""}, p2.peerInfo.id) db.RemoveProducer(Registration{"c", "a", "b"}, p2.peerInfo.id) p = db.FindProducers("c", "*", "*") - equal(t, len(p), 0) + t.Logf("%s", p) + test.Equal(t, 0, len(p)) // do some key removals k = db.FindRegistrations("c", "*", "*").Keys() - equal(t, len(k), 2) + test.Equal(t, 2, len(k)) db.RemoveRegistration(Registration{"c", "a", ""}) db.RemoveRegistration(Registration{"c", "a", "b"}) k = db.FindRegistrations("c", "*", "*").Keys() - equal(t, len(k), 0) + test.Equal(t, 0, len(k)) }