-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_pyqueued.py
109 lines (94 loc) · 3.66 KB
/
test_pyqueued.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# coding: utf-8
import pyqueued
import random
import requests
import time
import unittest
def random_queue_name():
    """Return a queue name of the form ``test_pyqueued_<n>``.

    ``<n>`` is a random integer between 0 and 99999999 inclusive, rendered
    in decimal without padding.
    """
    max_suffix = 99999999
    return "test_pyqueued_%d" % random.randint(0, max_suffix)
class TestClient(unittest.TestCase):
    """Tests for the ``pyqueued.Client`` API.

    Every test builds a default ``pyqueued.Client()`` and operates on a
    freshly generated random queue name.

    NOTE(review): these look like integration tests that need a live queue
    server reachable by the default client -- confirm before running in CI.
    """

    def test_init(self):
        """A client can be constructed with default arguments."""
        client = pyqueued.Client()
        self.assertIsNotNone(client)

    def test_enqueue(self):
        """enqueue() returns a location under the queue URL; stats follow."""
        queue = random_queue_name()
        client = pyqueued.Client()
        location = client.enqueue(queue, "Hello")
        self.assertTrue(location.startswith(client.queue_url(queue)))
        # Take the message back off so the expected depth below is zero.
        _, _ = client.dequeue(queue)
        self.assertEqual(
            client.stats(queue),
            {u'timeouts': 0, u'depth': 0, u'enqueued': 1, u'dequeued': 1},
        )

    def test_dequeue(self):
        """dequeue() yields the enqueued body and a location under the queue."""
        queue = random_queue_name()
        client = pyqueued.Client()
        client.enqueue(queue, "Hello")
        msg, location = client.dequeue(queue)
        self.assertEqual(msg, "Hello")
        self.assertTrue(location.startswith(client.queue_url(queue)))

    def test_dequeue_timeout(self):
        """A message dequeued with a timeout can be completed by queue and id."""
        queue = random_queue_name()
        client = pyqueued.Client()
        client.enqueue(queue, "Hello")
        msg, location = client.dequeue(queue, timeout=1)
        self.assertEqual(msg, "Hello")
        self.assertTrue(location.startswith(client.queue_url(queue)))
        # The message id is the last path segment of the returned location.
        msg_id = location.split('/')[-1]
        self.assertEqual(
            client.stats(queue),
            {u'timeouts': 0, u'depth': 0, u'enqueued': 1, u'dequeued': 1},
        )
        client.complete(queue, msg_id)
        # Completing is expected to leave the counters unchanged.
        self.assertEqual(
            client.stats(queue),
            {u'timeouts': 0, u'depth': 0, u'enqueued': 1, u'dequeued': 1},
        )

    def test_dequeue_timeout_expire(self):
        """Stats change once a timed dequeue is left uncompleted too long."""
        queue = random_queue_name()
        client = pyqueued.Client()
        client.enqueue(queue, "Hello")
        msg, location = client.dequeue(queue, timeout=1)
        self.assertEqual(msg, "Hello")
        self.assertTrue(location.startswith(client.queue_url(queue)))
        self.assertEqual(
            client.stats(queue),
            {u'timeouts': 0, u'depth': 0, u'enqueued': 1, u'dequeued': 1},
        )
        # Sleep just past the 1-second timeout passed to dequeue() above.
        time.sleep(1.1)
        # Expected afterwards: one timeout recorded, depth back at 1 and the
        # enqueued counter at 2 (presumably the server re-enqueues the
        # expired message -- verify against the server implementation).
        self.assertEqual(
            client.stats(queue),
            {u'timeouts': 1, u'depth': 1, u'enqueued': 2, u'dequeued': 1},
        )

    def test_dequeue_wait(self):
        """dequeue() with ``wait`` on a queue with no messages raises."""
        queue = random_queue_name()
        client = pyqueued.Client()
        # Nothing was enqueued under this name by the test.
        with self.assertRaises(RuntimeError):
            client.dequeue(queue, wait=0.2)

    def test_get_requests(self):
        """The location returned by enqueue() serves the body over HTTP GET."""
        queue = random_queue_name()
        client = pyqueued.Client()
        location = client.enqueue(queue, "Hello")
        response = requests.get(location)
        self.assertEqual(response.status_code, 200)
        self.assertEqual(response.text, "Hello")

    def test_get(self):
        """get() returns the body of an enqueued message by queue and id."""
        queue = random_queue_name()
        client = pyqueued.Client()
        location = client.enqueue(queue, "Hello")
        # The message id is the last path segment of the returned location.
        msg_id = location.split('/')[-1]
        self.assertEqual(client.get(queue, msg_id), "Hello")

    def test_complete_by_url(self):
        """complete_by_url() raises after a dequeue made without a timeout."""
        queue = random_queue_name()
        client = pyqueued.Client()
        client.enqueue(queue, "Hello")
        _, location = client.dequeue(queue)
        # NOTE(review): presumably a dequeue without a timeout leaves nothing
        # to complete -- confirm against the server semantics.
        with self.assertRaises(RuntimeError):
            client.complete_by_url(location)

    def test_complete_by_url_mismatch(self):
        """complete_by_url() raises RuntimeError for an unrelated URL."""
        queue = random_queue_name()
        client = pyqueued.Client()
        client.enqueue(queue, "Hello")
        client.dequeue(queue, timeout=1)
        # NOTE(review): presumably this URL does not match the client's own
        # server or queue -- confirm what "mismatch" covers.
        with self.assertRaises(RuntimeError):
            client.complete_by_url('http://localhost:1111/q/123')

    def test_complete_by_url_bad_input(self):
        """complete_by_url() raises ValueError for a URL ending at the queue."""
        queue = random_queue_name()
        client = pyqueued.Client()
        client.enqueue(queue, "Hello")
        client.dequeue(queue, timeout=1)
        # This URL has one path segment fewer than the one used in
        # test_complete_by_url_mismatch.
        with self.assertRaises(ValueError):
            client.complete_by_url('http://localhost:1111/q')

    def test_complete(self):
        """complete() raises after a dequeue made without a timeout."""
        queue = random_queue_name()
        client = pyqueued.Client()
        client.enqueue(queue, "Hello")
        _, location = client.dequeue(queue)
        # The last two path segments of the location are passed as the two
        # positional arguments of complete() (queue name, then message id).
        with self.assertRaises(RuntimeError):
            client.complete(*location.split('/')[-2:])