nsqd: support deferred PUB #293

Merged
merged 2 commits into from
May 3, 2015

Conversation

mreiferson
Member

This came up on the mailing list: https://groups.google.com/forum/#!topic/nsq-users/HkoOMygaHK0 - just opening the issue here for others to weigh in and gauge interest.

This would allow you to PUB a message that goes directly to the deferred priority queue with the specified delay.

Also, it's complicated a bit by #34.

@mreiferson
Member Author

@jayridge always wanted this

@jayridge

this feature might persuade me to use nsq for my current project.

@mreiferson
Member Author

😈

There are many questions. Would be curious to hear about the use case @jayridge.

Given the fact that you PUB to a topic and topics do not currently have any concept of a deferred queue, what would be the expected behavior?

Option 1

  • Add a deferred queue to topics.
  • If a message is published with a defer it goes directly to said queue.
  • Every channel for that topic would receive the message delayed.

Is this desirable?

Option 2

  • Specify a defer delay at the channel level (ala #ephemeral)
  • All messages on that channel are deferred
  • No way for a publisher to specify a defer
  • Some channels on a topic could receive the message immediately

The problem with this approach is that, when publishing, there is no concept of channel and thus no way for a publisher to be able to say, on a per-channel level, to defer. This leaves channel creation as the only obvious implementation.

@jayridge

The publisher should be able to specify defer timeout per message.

The primary use case is checking on a task's progress where the task is created by another party. I have also used this feature (visibility?) in SMS, where I needed a one shot after.

If there are only two options, I like option un.

@mreiferson mreiferson mentioned this pull request Feb 6, 2014
@deedubs

deedubs commented Feb 21, 2014

@mreiferson A use case:

We have tasks we assign to people, I'd like to be able to push a message onto the queue to check if the user has completed it in the allotted time or if it should be unlocked and made available again.

@mreiferson
Member Author

thanks @deedubs

For the record, you can already implement what you describe "outside the core" by embedding your defer timeout in your message body and having your consumer read that field and, if the deadline hasn't expired, REQ the message with the appropriate defer.

The downside of this approach is the overhead of having to receive the message the first time in order to defer it (and because it is consumer controlled rather than producer controlled).

In my mind, this issue is about justifying making this functionality "first class"... i.e. are the downsides I just listed really that bad vs. the complexity of the proposed changes inside nsqd?

@tj
Contributor

tj commented Mar 17, 2014

+1, we could use this as well. I was planning on going with the REQ technique, which should work fine. I guess the only upside to adding official support is some visibility in nsqadmin. Not sure if it's worth it or not, but thought I would mention we'll be using it for deferred work as well :D

@AlekSi
Contributor

AlekSi commented May 19, 2014

+1

@chaosue

chaosue commented Jul 31, 2014

+1, we currently work with beanstalkd, which has this feature; it is very useful in our production.

@victorquinn
Contributor

FYI, a workaround we used was to create a delay topic and push messages into it containing the name of the topic we actually want the message to end up in, the data it needs, and the time until which it should be delayed. For example, we would send a message like this to our delay topic:

{
    "timestamp": 1409763037301,
    "topic": "<actual_topic_for_message>",
    "data": "<data_for_this_message>"
}

The timestamp is absolute, in milliseconds, and is calculated when we put the item into the queue; it is the time until which the message should be delayed. It is in GMT, so it is timezone-agnostic. It has to be computed up front because the data in the queued item is immutable: a relative value (e.g. "do this in 15000ms") could not be updated when the message is requeued, so it would lose its reference point.
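
Editor's sketch of how a producer might build such an envelope. The helper name `makeDelayedMessage` and the injectable `nowMs` parameter are hypothetical and not part of the workaround as posted; only the three envelope fields come from the message above.

```javascript
// Hypothetical helper: wraps a payload in the delay-topic envelope.
// nowMs is injectable so the result is deterministic in tests.
function makeDelayedMessage(topic, data, delayMs, nowMs = Date.now()) {
  if (!Number.isFinite(delayMs) || delayMs < 0) {
    throw new RangeError("delayMs must be a non-negative number");
  }
  return {
    timestamp: nowMs + delayMs, // absolute deadline, ms since the Unix epoch
    topic: topic,               // topic the message should eventually reach
    data: data                  // payload to publish once the deadline passes
  };
}

// Delay an email by 15 minutes from a fixed "now".
const delayed = makeDelayedMessage(
  "email",
  { subject: "Test" },
  15 * 60 * 1000,
  1409762137301
);
console.log(JSON.stringify(delayed));
```

Because the deadline is absolute, the consumer never has to rewrite the message body when it requeues.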

When a channel listening on the delay topic gets one of these messages, the first thing it does is inspect the body to see if the timestamp for this message is later than the current time. If so, it triggers a requeue for the difference. If not, it publishes the new message to the end topic.

Perhaps best illustrated by example. So if it is currently 2:45pm and I want to send an email in 15 minutes (by sending a message to the email topic), I actually send the following message to the delay topic:

{
    "timestamp": 1409763037301,
    "topic": "email",
    "data": {
        "to": "victor@socialradar.com",
        "subject": "Test",
        "message": "Hey, this is a test"
    }
}

Where the timestamp is 3:00pm in GMT, the time until which this message is supposed to be delayed.

The consumer will get it, inspect it, notice that it is currently 2:45pm and the message is meant to go out at 3:00pm so the consumer will requeue the message for 15 minutes in the future. Then at 3:00pm when that requeued message is picked up again by this consumer, it will publish it to the appropriate topic, in this case email.

So a simplified version of our code that listens to the delay topic looks a bit like this (this happens to be Node.js, using the nsq.js module):

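reader.on("message", function(message) {
    // Parse the envelope: { timestamp, topic, data }
    var body = message.json(), now = new Date().getTime();

    if (body.timestamp > now) {
        // Not due yet: requeue for the remaining time (in ms)
        message.requeue(body.timestamp - now);
    } else {
        // Due: forward the payload to its real topic
        writer.publish(body.topic, body.data);
        // Acknowledge the envelope so nsqd does not redeliver it
        message.finish();
    }
});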

This is how we implemented an "initial delay" and it may be useful for anyone else trying to do the same unless or until it arrives in core NSQ. This is basically a more concrete example of what @mreiferson described above.
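
Editor's sketch of one refinement to the handler's decision, under an assumption not stated in the thread: nsqd limits how long a single requeue may defer a message (the `--max-req-timeout` option, which I believe defaults to one hour). For deadlines further out than that, a consumer could requeue in capped steps until the deadline arrives. The names `nextStep` and `MAX_REQUEUE_MS` are hypothetical.

```javascript
// Assumed cap on a single requeue delay; match it to your nsqd configuration.
const MAX_REQUEUE_MS = 60 * 60 * 1000;

// Decide what to do with a delayed message.
// Returns { action: "publish" } or { action: "requeue", delayMs }.
function nextStep(deadlineMs, nowMs, maxRequeueMs = MAX_REQUEUE_MS) {
  const remaining = deadlineMs - nowMs;
  if (remaining <= 0) {
    return { action: "publish" };
  }
  // Never ask for more than the cap; the message comes back and is re-evaluated.
  return { action: "requeue", delayMs: Math.min(remaining, maxRequeueMs) };
}

console.log(nextStep(3 * 60 * 60 * 1000, 0)); // deadline three hours away
console.log(nextStep(5000, 0));               // deadline five seconds away
console.log(nextStep(1000, 2000));            // deadline already passed
```

Keeping the decision in a pure function makes it easy to test without a running nsqd.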

@zygis

zygis commented Nov 25, 2014

+1

@jnankin

jnankin commented Jan 27, 2015

+1

@mreiferson
Member Author

Here goes nothing... thoughts on this approach?

@mreiferson
Member Author

(the tests are failing because of a dependency on go-nsq changes, which is unfortunate)

@jehiah
Member

jehiah commented May 1, 2015

actually comes out pretty cleanly. 👍

@mreiferson
Member Author

@jehiah added HTTP support. Note, we're explicitly not exposing deferred pub for MPUB varieties. These tests should pass.

@mreiferson mreiferson force-pushed the dpub_293 branch 3 times, most recently from ba691d7 to 287d57b Compare May 3, 2015 14:23
@jehiah
Member

jehiah commented May 3, 2015

👍 thoughts on if/how this should be exposed in nsq_to_nsq?

jehiah added a commit that referenced this pull request May 3, 2015
@jehiah jehiah merged commit a63ec33 into nsqio:master May 3, 2015
@mreiferson mreiferson deleted the dpub_293 branch May 3, 2015 18:51
@mreiferson
Member Author

thoughts on if/how this should be exposed in nsq_to_nsq?

Do we need to support it there? Arguably, it's already got too many knobs. Supporting this at all was already a question, having to write your own consumer to use it when copying streams around is too much to ask? 😁

@jehiah
Member

jehiah commented May 3, 2015

Yeah i hear ya. I mean the obvious answer is it's the one spot where you would want to have a --defer-msg=60s type option.

Since this isn't a "reader option" it would need to be exposed directly, and we don't (yet?) support config files there so ... I'm in favor of adding there.

@jayridge

jayridge commented May 3, 2015

its like christmas boys

@mreiferson
Member Author

we've been had

@zygis

zygis commented May 6, 2015

How about MDPUB or DMPUB (Multi Deferred Publish)?

@mreiferson
Member Author

@zygis I alluded to this in my comment above, but I intentionally didn't want to expose a means to defer en masse a large set of messages. In the current implementation, nsqd keeps deferred messages entirely in memory (i.e. they do not overflow to disk).

@zygis

zygis commented May 7, 2015

Well...
1000xDPUB - same result with ~1000 network calls
1xMDPUB - same result with ~1 network call

When the deferred duration is a few seconds maybe this limitation makes sense, but if the duration is minutes or even hours I think this limitation solves nothing.

@mreiferson
Member Author

@zygis I realize that. I'm concerned with the behaviors that we encourage here. I can be convinced, it just didn't seem like the obvious use case.

@jehiah
Member

jehiah commented May 7, 2015

@zygis if the client connection buffers the socket there is little network difference between those two different protocol messages. You can pipeline a thousand DPUBs, which is effectively what an MDPUB would provide. (There is minimal difference in command overhead between the two approaches.)
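
Editor's sketch of what pipelining means at the byte level, assuming the DPUB wire format from the NSQ TCP protocol spec: the line `DPUB <topic> <defer_ms>\n`, then a 4-byte big-endian body size, then the body. The function names are hypothetical. A real client would also need the protocol handshake and would have to read one response per command; both are omitted here.

```javascript
// Encode a single DPUB command frame.
function encodeDpub(topic, deferMs, body) {
  const header = Buffer.from(`DPUB ${topic} ${deferMs}\n`, "utf8");
  const payload = Buffer.isBuffer(body) ? body : Buffer.from(String(body), "utf8");
  const size = Buffer.alloc(4);
  size.writeUInt32BE(payload.length, 0); // 4-byte big-endian body length
  return Buffer.concat([header, size, payload]);
}

// Pipelining: concatenate many frames so they can go out in one socket write.
function encodeDpubBatch(topic, deferMs, bodies) {
  return Buffer.concat(bodies.map((b) => encodeDpub(topic, deferMs, b)));
}

const batch = encodeDpubBatch("events", 5000, ["a", "bb", "ccc"]);
console.log(batch.length + " bytes for 3 pipelined DPUB commands");
```

The per-message cost over a hypothetical multi-publish command is just the repeated command line, which is the small difference in command overhead mentioned above.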

@zygis

zygis commented May 7, 2015

@mreiferson I do a lot of writes to a DB and, via nsq, notify the rest of the system to do some computations. The main problem is that reads are done from slaves on another continent, and sometimes the written data is still not available because of cross-continent replication lag. So DPUB is an acceptable solution. But a few weeks ago I switched from PUB (client nsq.js) to MPUB (client go-nsq) because of networking problems between servers... MDPUB could be the perfect solution.

@jehiah not sure how it can be done with go-nsq client

@jehiah
Member

jehiah commented May 7, 2015

@zygis What go-nsq needs to expose for you is the equivalent of Producer.DeferredPublishAsync. The *Async commands allow you to pipeline multiple requests for throughput.

@zygis

zygis commented May 7, 2015

@jehiah Thanks, I will try this.

@jonathannorris

I've been interested in using nsq for a while. We have a potential use case coming up that requires queuing millions of deferred messages for up to a day or so. I was wondering if there are any potential performance pitfalls with this, other than nsq storing millions of messages in memory (and not backing them up on disk). It's probably not the right solution for the problem, but I'm interested to hear from @jehiah @mreiferson on this.

@ploxiln
Member

ploxiln commented Sep 17, 2015

just a general purpose PSA: For scheduling functionality, you could use a database, with an "expires" field/column, with an index on that field/column. Have a process query the database once a minute or so for "expires < $now", process those results, then delete those rows/documents. You can limit the query to return, say, 100 results, and you can re-run the query immediately after processing&deleting if there were any results.
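
Editor's sketch of the polling approach described above, with an in-memory array standing in for the database table. `createStore`, `findExpired` and `drainExpired` are hypothetical names; in practice `findExpired` would be an indexed query along the lines of "expires < $now LIMIT 100".

```javascript
// In-memory stand-in for a table with an indexed "expires" column.
function createStore() {
  const rows = [];
  return {
    insert(row) { rows.push(row); },
    // Stand-in for: WHERE expires < now ORDER BY expires LIMIT limit
    findExpired(nowMs, limit) {
      return rows
        .filter((r) => r.expires < nowMs)
        .sort((a, b) => a.expires - b.expires)
        .slice(0, limit);
    },
    remove(row) { rows.splice(rows.indexOf(row), 1); },
    size() { return rows.length; }
  };
}

// One polling pass: process and delete due rows in batches, and re-run the
// query immediately for as long as it keeps returning results.
function drainExpired(store, nowMs, handle, limit = 100) {
  let processed = 0;
  for (;;) {
    const due = store.findExpired(nowMs, limit);
    if (due.length === 0) return processed;
    for (const row of due) {
      handle(row);
      store.remove(row);
      processed++;
    }
  }
}

const store = createStore();
for (let i = 0; i < 250; i++) store.insert({ id: i, expires: 1000 + i });
store.insert({ id: "later", expires: 999999 });

const handledIds = [];
const processedCount = drainExpired(store, 5000, (row) => handledIds.push(row.id));
console.log(processedCount + " processed, " + store.size() + " still pending");
```

A scheduler would call `drainExpired` on a timer (once a minute or so, as suggested above); the rows stay durable in the database between passes.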

That's roughly what's done for a few parts of @jayridge's current project, and he grudgingly uses nsq as well anyway ;)

@mreiferson
Member Author

@jonathannorris there aren't any other performance implications, there's just the durability question. Whether it's the right solution to the problem depends on your requirements... i.e. can you afford to lose messages that are in memory?

@mreiferson
Member Author

That's roughly what's done for a few parts of @jayridge's current project, and he grudgingly uses nsq as well anyway ;)

I think that's a valid statement for any technology @jayridge uses.

@jayridge

true that

@captainblue2013

So ? Can anyone tell me how to use DPUB ?
There is no concept of DPUB in http://nsq.io/components/nsqd.html#post-pub

@mreiferson
Member Author

@captainblue2013 we should obviously document this - just pass the ?defer=x param to /pub, see https://github.com/nsqio/nsq/blob/master/nsqd/http.go#L226-L237
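
Editor's sketch of building such a request URL. The helper name `deferredPubUrl` is hypothetical, and the sketch assumes the `defer` value is an integer number of milliseconds, which is my reading of the linked handler rather than something stated in this thread. The host and port are the local nsqd HTTP defaults used elsewhere in this conversation.

```javascript
// Build the /pub URL for a deferred publish over nsqd's HTTP API.
function deferredPubUrl(baseUrl, topic, deferMs) {
  if (!Number.isInteger(deferMs) || deferMs < 0) {
    throw new RangeError("deferMs must be a non-negative integer");
  }
  const url = new URL("/pub", baseUrl);
  url.searchParams.set("topic", topic);
  url.searchParams.set("defer", String(deferMs)); // assumed unit: milliseconds
  return url.toString();
}

const pubUrl = deferredPubUrl("http://127.0.0.1:4151", "aa", 1000);
console.log(pubUrl);
```

The message body is then sent as the POST body to that URL.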

@captainblue2013

@mreiferson I did it after reading the source code

@deoxen0n2 deoxen0n2 mentioned this pull request Oct 9, 2016
@kgdev

kgdev commented Jan 12, 2017

I found a potential bug. I recently switched our code from PUB to DPUB, and the message got handled by the client within the specified period successfully, which was good.
However, when I check the status at localhost:4171, the "Requeued" count for that channel keeps increasing (which is not the case for PUB).

Can someone take a look into it?

image

@sakop

sakop commented Jan 12, 2017

@kgdev It is very easy to reproduce this bug, just exec this

curl -d "aDa" 'http://127.0.0.1:4151/pub?defer=1000&topic=aa'
and you will see Requeued keeps increasing while Messages remains 0

@ploxiln
Member

ploxiln commented Jan 12, 2017

This makes sense because nsqd treats it just like a message on which the consumer called "requeue(delay=...)". It could be improved - but for that you should definitely open a new separate issue.

(Also, nsqd was not designed as a scheduling service, and thus has a few other problems with this use case.)

@kgdev

kgdev commented Jan 12, 2017 via email

@mreiferson
Member Author

@kgdev I believe this accounting issue was resolved in #805
