-
Notifications
You must be signed in to change notification settings - Fork 50
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature request: peek head item #34
Comments
Something along the lines of: diff --git persistqueue/queue.py persistqueue/queue.py
index 8c27a46..51f588a 100644
--- persistqueue/queue.py
+++ persistqueue/queue.py
@@ -127,7 +127,7 @@ class Queue(object):
def put_nowait(self, item):
return self.put(item, False)
- def get(self, block=True, timeout=None):
+ def get(self, block=True, timeout=None, peek=False):
self.not_empty.acquire()
try:
if not block:
@@ -145,32 +145,33 @@ class Queue(object):
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
- item = self._get()
+ item = self._get(peek)
self.not_full.notify()
return item
finally:
self.not_empty.release()
- def get_nowait(self):
- return self.get(False)
+ def get_nowait(self, peek=False):
+ return self.get(False, peek=peek)
- def _get(self):
+ def _get(self, peek=False):
tnum, tcnt, toffset = self.info['tail']
hnum, hcnt, _ = self.info['head']
if [tnum, tcnt] >= [hnum, hcnt]:
return None
data = pickle.load(self.tailf)
- toffset = self.tailf.tell()
- tcnt += 1
- if tcnt == self.info['chunksize'] and tnum <= hnum:
- tcnt = toffset = 0
- tnum += 1
- self.tailf.close()
- os.remove(self.tailf.name)
- self.tailf = self._openchunk(tnum)
- self.info['size'] -= 1
- self.info['tail'] = [tnum, tcnt, toffset]
- self.update_info = True
+ if not peek:
+ toffset = self.tailf.tell()
+ tcnt += 1
+ if tcnt == self.info['chunksize'] and tnum <= hnum:
+ tcnt = toffset = 0
+ tnum += 1
+ self.tailf.close()
+ os.remove(self.tailf.name)
+ self.tailf = self._openchunk(tnum)
+ self.info['size'] -= 1
+ self.info['tail'] = [tnum, tcnt, toffset]
+ self.update_info = True
return data
def task_done(self): |
Thanks for the great suggestion. Looks like you have make some code changes locally, It's a pleasing thing that you can send the PR to me. I am OK to add this feature. Thanks |
Cheers. Even nicer might be to add explicit |
Agreed, looking forward to your change:) |
Hi Peter. I'm happy to supply the queue.py part but I've no idea how to do the SQL version (which is why I didn't raise a PR). Is that something you can help with? |
no problem, you could send the file part, i will later add the sql part. |
I am closing this now. Please reopen it if this doesn't address your concern. |
@peter-wangxu Can you explain how |
Hi. It'd be handy to be able to peek the head item in the queue so you can deal with it and only pop off the queue when you've finished.
The value of a persistent queue is lessened without this feature. For example, if the reader crashes while uploading the thing it read into the cloud, or the cloud service is offline so it fails to upload, etc. I believe this is how more monolithic stuff like Kafka does things - you commit when you've finished consuming so Kafka knows it can move on.
The text was updated successfully, but these errors were encountered: