Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tests/benchmarks should clean up after themselves #553

Merged
merged 1 commit into from
May 1, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions nsqd/channel_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nsqd

import (
"os"
"strconv"
"testing"
"time"
Expand All @@ -11,6 +12,7 @@ func TestPutMessage(t *testing.T) {
opts := NewNSQDOptions()
opts.Logger = newTestLogger(t)
_, _, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

topicName := "test_put_message" + strconv.Itoa(int(time.Now().Unix()))
Expand All @@ -31,6 +33,7 @@ func TestPutMessage2Chan(t *testing.T) {
opts := NewNSQDOptions()
opts.Logger = newTestLogger(t)
_, _, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

topicName := "test_put_message_2chan" + strconv.Itoa(int(time.Now().Unix()))
Expand Down Expand Up @@ -58,6 +61,7 @@ func TestInFlightWorker(t *testing.T) {
opts.Logger = newTestLogger(t)
opts.MsgTimeout = 100 * time.Millisecond
_, _, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

topicName := "test_in_flight_worker" + strconv.Itoa(int(time.Now().Unix()))
Expand Down Expand Up @@ -98,6 +102,7 @@ func TestChannelEmpty(t *testing.T) {
opts := NewNSQDOptions()
opts.Logger = newTestLogger(t)
_, _, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

topicName := "test_channel_empty" + strconv.Itoa(int(time.Now().Unix()))
Expand Down Expand Up @@ -130,6 +135,7 @@ func TestChannelEmptyConsumer(t *testing.T) {
opts := NewNSQDOptions()
opts.Logger = newTestLogger(t)
tcpAddr, _, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

conn, _ := mustConnectNSQD(tcpAddr)
Expand Down
69 changes: 58 additions & 11 deletions nsqd/diskqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package nsqd

import (
"bufio"
"fmt"
"io/ioutil"
"os"
"path"
"strconv"
Expand All @@ -15,12 +17,17 @@ func TestDiskQueue(t *testing.T) {
l := newTestLogger(t)

dqName := "test_disk_queue" + strconv.Itoa(int(time.Now().Unix()))
dq := newDiskQueue(dqName, os.TempDir(), 1024, 2500, 2*time.Second, l)
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
if err != nil {
panic(err)
}
defer os.RemoveAll(tmpDir)
dq := newDiskQueue(dqName, tmpDir, 1024, 2500, 2*time.Second, l)
nequal(t, dq, nil)
equal(t, dq.Depth(), int64(0))

msg := []byte("test")
err := dq.Put(msg)
err = dq.Put(msg)
equal(t, err, nil)
equal(t, dq.Depth(), int64(1))

Expand All @@ -31,7 +38,12 @@ func TestDiskQueue(t *testing.T) {
func TestDiskQueueRoll(t *testing.T) {
l := newTestLogger(t)
dqName := "test_disk_queue_roll" + strconv.Itoa(int(time.Now().Unix()))
dq := newDiskQueue(dqName, os.TempDir(), 100, 2500, 2*time.Second, l)
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
if err != nil {
panic(err)
}
defer os.RemoveAll(tmpDir)
dq := newDiskQueue(dqName, tmpDir, 100, 2500, 2*time.Second, l)
nequal(t, dq, nil)
equal(t, dq.Depth(), int64(0))

Expand All @@ -55,7 +67,12 @@ func assertFileNotExist(t *testing.T, fn string) {
func TestDiskQueueEmpty(t *testing.T) {
l := newTestLogger(t)
dqName := "test_disk_queue_empty" + strconv.Itoa(int(time.Now().Unix()))
dq := newDiskQueue(dqName, os.TempDir(), 100, 2500, 2*time.Second, l)
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
if err != nil {
panic(err)
}
defer os.RemoveAll(tmpDir)
dq := newDiskQueue(dqName, tmpDir, 100, 2500, 2*time.Second, l)
nequal(t, dq, nil)
equal(t, dq.Depth(), int64(0))

Expand Down Expand Up @@ -118,7 +135,12 @@ func TestDiskQueueEmpty(t *testing.T) {
func TestDiskQueueCorruption(t *testing.T) {
l := newTestLogger(t)
dqName := "test_disk_queue_corruption" + strconv.Itoa(int(time.Now().Unix()))
dq := newDiskQueue(dqName, os.TempDir(), 1000, 5, 2*time.Second, l)
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
if err != nil {
panic(err)
}
defer os.RemoveAll(tmpDir)
dq := newDiskQueue(dqName, tmpDir, 1000, 5, 2*time.Second, l)

msg := make([]byte, 123)
for i := 0; i < 25; i++ {
Expand Down Expand Up @@ -149,7 +171,12 @@ func TestDiskQueueTorture(t *testing.T) {

l := newTestLogger(t)
dqName := "test_disk_queue_torture" + strconv.Itoa(int(time.Now().Unix()))
dq := newDiskQueue(dqName, os.TempDir(), 262144, 2500, 2*time.Second, l)
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
if err != nil {
panic(err)
}
defer os.RemoveAll(tmpDir)
dq := newDiskQueue(dqName, tmpDir, 262144, 2500, 2*time.Second, l)
nequal(t, dq, nil)
equal(t, dq.Depth(), int64(0))

Expand Down Expand Up @@ -190,7 +217,7 @@ func TestDiskQueueTorture(t *testing.T) {

t.Logf("restarting diskqueue")

dq = newDiskQueue(dqName, os.TempDir(), 262144, 2500, 2*time.Second, l)
dq = newDiskQueue(dqName, tmpDir, 262144, 2500, 2*time.Second, l)
nequal(t, dq, nil)
equal(t, dq.Depth(), depth)

Expand Down Expand Up @@ -233,7 +260,12 @@ func BenchmarkDiskQueuePut(b *testing.B) {
b.StopTimer()
l := newTestLogger(b)
dqName := "bench_disk_queue_put" + strconv.Itoa(b.N) + strconv.Itoa(int(time.Now().Unix()))
dq := newDiskQueue(dqName, os.TempDir(), 1024768*100, 2500, 2*time.Second, l)
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
if err != nil {
panic(err)
}
defer os.RemoveAll(tmpDir)
dq := newDiskQueue(dqName, tmpDir, 1024768*100, 2500, 2*time.Second, l)
size := 1024
b.SetBytes(int64(size))
data := make([]byte, size)
Expand All @@ -247,7 +279,12 @@ func BenchmarkDiskQueuePut(b *testing.B) {
func BenchmarkDiskWrite(b *testing.B) {
b.StopTimer()
fileName := "bench_disk_queue_put" + strconv.Itoa(b.N) + strconv.Itoa(int(time.Now().Unix()))
f, _ := os.OpenFile(path.Join(os.TempDir(), fileName), os.O_RDWR|os.O_CREATE, 0600)
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
if err != nil {
panic(err)
}
defer os.RemoveAll(tmpDir)
f, _ := os.OpenFile(path.Join(tmpDir, fileName), os.O_RDWR|os.O_CREATE, 0600)
size := 256
b.SetBytes(int64(size))
data := make([]byte, size)
Expand All @@ -262,7 +299,12 @@ func BenchmarkDiskWrite(b *testing.B) {
func BenchmarkDiskWriteBuffered(b *testing.B) {
b.StopTimer()
fileName := "bench_disk_queue_put" + strconv.Itoa(b.N) + strconv.Itoa(int(time.Now().Unix()))
f, _ := os.OpenFile(path.Join(os.TempDir(), fileName), os.O_RDWR|os.O_CREATE, 0600)
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
if err != nil {
panic(err)
}
defer os.RemoveAll(tmpDir)
f, _ := os.OpenFile(path.Join(tmpDir, fileName), os.O_RDWR|os.O_CREATE, 0600)
size := 256
b.SetBytes(int64(size))
data := make([]byte, size)
Expand All @@ -286,7 +328,12 @@ func BenchmarkDiskQueueGet(b *testing.B) {
b.StopTimer()
l := newTestLogger(b)
dqName := "bench_disk_queue_get" + strconv.Itoa(b.N) + strconv.Itoa(int(time.Now().Unix()))
dq := newDiskQueue(dqName, os.TempDir(), 1024768, 2500, 2*time.Second, l)
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
if err != nil {
panic(err)
}
defer os.RemoveAll(tmpDir)
dq := newDiskQueue(dqName, tmpDir, 1024768, 2500, 2*time.Second, l)
for i := 0; i < b.N; i++ {
dq.Put([]byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa"))
}
Expand Down
25 changes: 20 additions & 5 deletions nsqd/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io/ioutil"
"net"
"net/http"
"os"
"runtime"
"strconv"
"sync"
Expand All @@ -21,6 +22,7 @@ func TestHTTPput(t *testing.T) {
opts := NewNSQDOptions()
opts.Logger = newTestLogger(t)
_, httpAddr, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

topicName := "test_http_put" + strconv.Itoa(int(time.Now().Unix()))
Expand All @@ -43,6 +45,7 @@ func TestHTTPputEmpty(t *testing.T) {
opts := NewNSQDOptions()
opts.Logger = newTestLogger(t)
_, httpAddr, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

topicName := "test_http_put_empty" + strconv.Itoa(int(time.Now().Unix()))
Expand All @@ -66,6 +69,7 @@ func TestHTTPmput(t *testing.T) {
opts := NewNSQDOptions()
opts.Logger = newTestLogger(t)
_, httpAddr, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

topicName := "test_http_mput" + strconv.Itoa(int(time.Now().Unix()))
Expand Down Expand Up @@ -94,6 +98,7 @@ func TestHTTPmputEmpty(t *testing.T) {
opts := NewNSQDOptions()
opts.Logger = newTestLogger(t)
_, httpAddr, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

topicName := "test_http_mput_empty" + strconv.Itoa(int(time.Now().Unix()))
Expand Down Expand Up @@ -124,6 +129,7 @@ func TestHTTPmputBinary(t *testing.T) {
opts := NewNSQDOptions()
opts.Logger = newTestLogger(t)
_, httpAddr, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

topicName := "test_http_mput_bin" + strconv.Itoa(int(time.Now().Unix()))
Expand Down Expand Up @@ -156,6 +162,7 @@ func TestHTTPSRequire(t *testing.T) {
opts.TLSKey = "./test/certs/server.key"
opts.TLSClientAuthPolicy = "require"
_, httpAddr, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

topicName := "test_http_put_req" + strconv.Itoa(int(time.Now().Unix()))
Expand Down Expand Up @@ -201,10 +208,10 @@ func TestHTTPSRequireVerify(t *testing.T) {
opts.TLSRootCAFile = "./test/certs/ca.pem"
opts.TLSClientAuthPolicy = "require-verify"
_, httpAddr, nsqd := mustStartNSQD(opts)
httpsAddr := nsqd.httpsListener.Addr().(*net.TCPAddr)

defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

httpsAddr := nsqd.httpsListener.Addr().(*net.TCPAddr)
topicName := "test_http_put_req_verf" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetTopic(topicName)

Expand Down Expand Up @@ -266,7 +273,7 @@ func TestTLSRequireVerifyExceptHTTP(t *testing.T) {
opts.TLSClientAuthPolicy = "require-verify"
opts.TLSRequired = TLSRequiredExceptHTTP
_, httpAddr, nsqd := mustStartNSQD(opts)

defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

topicName := "test_http_req_verf_except_http" + strconv.Itoa(int(time.Now().Unix()))
Expand All @@ -290,6 +297,7 @@ func TestHTTPDeprecatedTopicChannel(t *testing.T) {
opts := NewNSQDOptions()
opts.Logger = newTestLogger(t)
_, httpAddr, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

topicName := "test_http_topic_channel" + strconv.Itoa(int(time.Now().Unix()))
Expand Down Expand Up @@ -386,6 +394,7 @@ func TestHTTPTransitionTopicChannel(t *testing.T) {
opts := NewNSQDOptions()
opts.Logger = newTestLogger(t)
_, httpAddr, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

client := http.Client{}
Expand Down Expand Up @@ -507,6 +516,7 @@ func TestHTTPV1TopicChannel(t *testing.T) {
opts := NewNSQDOptions()
opts.Logger = newTestLogger(t)
_, httpAddr, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

topicName := "test_http_topic_channel2" + strconv.Itoa(int(time.Now().Unix()))
Expand Down Expand Up @@ -614,6 +624,7 @@ func BenchmarkHTTPput(b *testing.B) {
opts.Logger = newTestLogger(b)
opts.MemQueueSize = int64(b.N)
_, httpAddr, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
msg := make([]byte, 256)
topicName := "bench_http_put" + strconv.Itoa(int(time.Now().Unix()))
url := fmt.Sprintf("http://%s/put?topic=%s", httpAddr, topicName)
Expand Down Expand Up @@ -653,9 +664,11 @@ func TestHTTPgetStatusJSON(t *testing.T) {
opts := NewNSQDOptions()
opts.Logger = newTestLogger(t)
_, httpAddr, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

nsqd.startTime = testTime
expectedJSON := fmt.Sprintf(`{"status_code":200,"status_txt":"OK","data":{"version":"%v","health":"OK","start_time":%v,"topics":[]}}`, version.Binary, testTime.Unix())
defer nsqd.Exit()

url := fmt.Sprintf("http://%s/stats?format=json", httpAddr)
resp, err := http.Get(url)
Expand All @@ -671,9 +684,11 @@ func TestHTTPgetStatusText(t *testing.T) {
opts := NewNSQDOptions()
opts.Logger = newTestLogger(t)
_, httpAddr, nsqd := mustStartNSQD(opts)
nsqd.startTime = testTime
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

nsqd.startTime = testTime

url := fmt.Sprintf("http://%s/stats?format=text", httpAddr)
resp, err := http.Get(url)
equal(t, err, nil)
Expand Down
7 changes: 7 additions & 0 deletions nsqd/nsqd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package nsqd
import (
"fmt"
"io/ioutil"
"os"
"path"
"path/filepath"
"reflect"
Expand Down Expand Up @@ -81,6 +82,9 @@ func TestStartup(t *testing.T) {
opts.MemQueueSize = 100
opts.MaxBytesPerFile = 10240
_, _, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)

origDataPath := opts.DataPath

topicName := "nsqd_test" + strconv.Itoa(int(time.Now().Unix()))

Expand Down Expand Up @@ -147,6 +151,7 @@ func TestStartup(t *testing.T) {
opts.Logger = newTestLogger(t)
opts.MemQueueSize = 100
opts.MaxBytesPerFile = 10240
opts.DataPath = origDataPath
_, _, nsqd = mustStartNSQD(opts)

go func() {
Expand Down Expand Up @@ -190,6 +195,7 @@ func TestEphemeralTopicsAndChannels(t *testing.T) {
opts.Logger = newTestLogger(t)
opts.MemQueueSize = 100
_, _, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)

topicName := "ephemeral_topic" + strconv.Itoa(int(time.Now().Unix())) + "#ephemeral"
doneExitChan := make(chan int)
Expand Down Expand Up @@ -240,6 +246,7 @@ func TestPauseMetadata(t *testing.T) {
opts := NewNSQDOptions()
opts.Logger = newTestLogger(t)
_, _, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

// avoid concurrency issue of async PersistMetadata() calls
Expand Down
Loading