Duplicated message ID when RDY is larger than a certain value #645

Closed
chaosue opened this issue Sep 7, 2015 · 1 comment

@chaosue

chaosue commented Sep 7, 2015

Steps

1. Consumer code

package main

import (
	"os"
	"sync"

	gonsq "github.com/bitly/go-nsq"
)

// MF wraps the log file and serializes writes so it can be shared as a logger.
type MF struct {
	*os.File
	lock *sync.RWMutex
}

func (f *MF) Write(b []byte) (int, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
	return f.File.Write(b)
}

// Output is the method SetLogger requires; calldepth is ignored.
func (f *MF) Output(calldepth int, s string) error {
	_, e := f.Write([]byte(s + "\n"))
	return e
}

func main() {
	mf, _ := os.OpenFile("/tmp/2.log", os.O_TRUNC|os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666)
	f := &MF{File: mf, lock: &sync.RWMutex{}}
	// queue_channels := []string{"chn_x1", "chn_x2"}
	queue_channels := []string{"chn_x1"}
	maxInFlight := 100
	for _, qchan := range queue_channels {
		// coc is the number of consumers to start per channel.
		coc := 1
		for coc > 0 {
			go func(qchan string) {
				cfg := gonsq.NewConfig()
				cfg.MaxAttempts = 1000

				c, e := gonsq.NewConsumer("test_topic", qchan, cfg)
				if e != nil {
					panic(e)
				}
				c.SetLogger(f, gonsq.LogLevelError)

				msgHandler := &TestHandler{}
				c.AddConcurrentHandlers(msgHandler, maxInFlight)
				c.ChangeMaxInFlight(maxInFlight)

				e = c.ConnectToNSQLookupds([]string{"127.0.0.1:4161"})
				if e != nil {
					panic(e)
				}
			}(qchan)
			coc -= 1
		}
	}

	// Block forever so the consumers keep running.
	ch := make(chan int)
	<-ch
}

// TestHandler finishes every message immediately by returning nil.
type TestHandler struct{}

func (*TestHandler) HandleMessage(message *gonsq.Message) error {
	return nil
}

2. NSQD configuration

node 1

data_path="/var/lib/nsq/"
http_address="127.0.0.1:2151"
tcp_address="0.0.0.0:2150"
nsqlookupd_tcp_addresses="127.0.0.1:4160"
sync_every=100 # number of messages per diskqueue fsync
mem_queue_size=200000
msg_timeout="10s"

node 2

data_path="/var/lib/nsq/"
http_address="127.0.0.1:2251"
tcp_address="0.0.0.0:2250"
nsqlookupd_tcp_addresses="127.0.0.1:4160"
sync_every=100 # number of messages per diskqueue fsync
mem_queue_size=200000
msg_timeout="10s"

3. Start NSQD

GOMAXPROCS=5 nsqd -config=/etc/nsq/node-1.conf -max-rdy-count 10000 -verbose 2>/tmp/3.log
GOMAXPROCS=5 nsqd -config=/etc/nsq/node-2.conf -max-rdy-count 10000 -verbose 2>/tmp/3.log

4. Start consumer and run for a while

./consumer

5. Check logs

Consumer logs
grep in /tmp/2.log
As shown below, the consumer logs contain "ID not in flight" errors.

ERR    1 [test_topic/chn_x1] (chaos-hot.local:2150) protocol error - E_FIN_FAILED FIN 08e4491335b0d004 failed ID not in flight
ERR    1 [test_topic/chn_x1] (chaos-hot.local:2150) protocol error - E_FIN_FAILED FIN 08e4491335b0d003 failed ID not in flight
ERR    1 [test_topic/chn_x1] (chaos-hot.local:2150) protocol error - E_FIN_FAILED FIN 08e449133770d003 failed ID not in flight
ERR    1 [test_topic/chn_x1] (chaos-hot.local:2150) protocol error - E_FIN_FAILED FIN 08e449133770d004 failed ID not in flight
ERR    1 [test_topic/chn_x1] (chaos-hot.local:2150) protocol error - E_FIN_FAILED FIN 08e449133770d005 failed ID not in flight
ERR    1 [test_topic/chn_x1] (chaos-hot.local:2150) protocol error - E_FIN_FAILED FIN 08e449133770d006 failed ID not in flight
ERR    1 [test_topic/chn_x1] (chaos-hot.local:2250) protocol error - E_FIN_FAILED FIN 08e449144ef0d005 failed ID not in flight

Server-side (nsqd) logs
grep 08e449144ef0d005 /tmp/3.log
In the output below, the message ID "08e449144ef0d005" was assigned to two different messages (the message bodies differ), so the FIN command for the second message failed.

[nsqd] 2015/09/07 11:03:33.311022 PROTOCOL(V2): writing msg(08e449144ef0d005) to client(127.0.0.1:55167) - TestMessageBlaBla:1441593906.8762381076812744
[nsqd] 2015/09/07 11:03:33.311144 PROTOCOL(V2): writing msg(08e449144ef0d005) to client(127.0.0.1:55167) - TestMessageBlaBla:1441593906.8771169185638428
[nsqd] 2015/09/07 11:03:33.311382 PROTOCOL(V2): [127.0.0.1:55167] [FIN 08e449144ef0d005]
[nsqd] 2015/09/07 11:03:33.311438 PROTOCOL(V2): [127.0.0.1:55167] [FIN 08e449144ef0d005]
[nsqd] 2015/09/07 11:03:33.311456 ERROR: [127.0.0.1:55167] - E_FIN_FAILED FIN 08e449144ef0d005 failed ID not in flight - ID not in flight

6. Versions

Go: go version go1.5 darwin/amd64
nsq: nsqd v0.3.6-alpha (built w/go1.5); git revision: 1746cc6

@mreiferson
Member

@chaosue I see you closed the issue - did you figure out what the problem was?

If nothing else, it looks like you're configuring two nsqd on the same host with the same data_path, which could cause problems (see #583)
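
A quick way to catch the shared data_path before starting both nodes could look like the sketch below. It only handles the simple key="value" lines used in the two configs in this report and is not a full config parser; `parseOption` and `sharedDataPath` are hypothetical helpers, not part of nsq.

```go
package main

import (
	"bufio"
	"fmt"
	"path/filepath"
	"strings"
)

// Configs copied from the report (abbreviated to the relevant options).
const node1Conf = `data_path="/var/lib/nsq/"
http_address="127.0.0.1:2151"
tcp_address="0.0.0.0:2150"`

const node2Conf = `data_path="/var/lib/nsq/"
http_address="127.0.0.1:2251"
tcp_address="0.0.0.0:2250"`

// parseOption returns the value of key from a config made of key="value"
// lines. Anything after '#' is treated as a comment (a simplification).
func parseOption(cfg, key string) (string, bool) {
	sc := bufio.NewScanner(strings.NewReader(cfg))
	for sc.Scan() {
		line := sc.Text()
		if i := strings.Index(line, "#"); i >= 0 {
			line = line[:i]
		}
		k, v, ok := strings.Cut(line, "=")
		if !ok || strings.TrimSpace(k) != key {
			continue
		}
		return strings.Trim(strings.TrimSpace(v), `"`), true
	}
	return "", false
}

// sharedDataPath reports whether both configs set data_path to the same
// directory after path cleaning.
func sharedDataPath(a, b string) bool {
	pa, okA := parseOption(a, "data_path")
	pb, okB := parseOption(b, "data_path")
	return okA && okB && filepath.Clean(pa) == filepath.Clean(pb)
}

func main() {
	fmt.Println("shared data_path:", sharedDataPath(node1Conf, node2Conf))
}
```

If the check reports a shared directory, giving each node its own data_path is the change suggested above.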
