forked from gleicon/restmq
-
Notifications
You must be signed in to change notification settings - Fork 1
/
restmq_engine.py
executable file
·141 lines (116 loc) · 5 KB
/
restmq_engine.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import simplejson
import cyclone.redis
from restmq import core
from optparse import OptionParser
from twisted.internet import defer, reactor
QUEUENAME = 'test'
@defer.inlineCallbacks
def test_operations(opt, args):
"""
test and docs for a redis based queue engine in python
based on tx-redis
"""
try:
rd = yield cyclone.redis.RedisConnectionPool()
except Exception, e:
print "Error creating redis pool %s" % e
defer.returnValue(None)
ro = core.RedisOperations(rd)
if opt.producer == True:
print "Running as producer"
uuid = yield ro.queue_add(QUEUENAME, simplejson.dumps({'value':'a value'}))
print 'uuid: %s' % uuid
if opt.consumer == True:
print "Running as consumer"
(policy, ret) = yield ro.queue_get( QUEUENAME)
if ret != None:
print "value: %s" % ret['value'] #simplejson.loads(ret['value'])
print "policy: %s" % policy
else:
print 'empty queue'
if opt.stats == True:
ll = yield ro.queue_stats(QUEUENAME)
print "list len: %s" % ll
sm = yield ro.queue_all()
print "all queues: %s" % sm
if opt.non_consumer == True:
print "Running as consumer"
(policy, ret) = yield ro.queue_get( QUEUENAME, softget=True)
if ret != None:
print "value: %s" % ret['value'] #simplejson.loads(ret['value'])
print "policy: %s" % policy
else:
print 'empty queue'
if opt.get_policy == True:
print "GET queue policy"
ret = yield ro.queue_policy_get(QUEUENAME)
print repr(ret)
if ret != None:
print "value: %s" % ret['value'] #simplejson.loads(ret['value'])
else:
print 'empty queue policy'
if opt.set_policy == True:
print "SET queue policy"
resp = yield ro.queue_policy_set(QUEUENAME, "roundrobin")
print 'resp: %s' % resp
if opt.get_del == True:
print "Running as getdel consumer"
(policy, ret) = yield ro.queue_getdel(QUEUENAME)
if ret != None and ret != False:
print "value: %s" % ret['value'] #simplejson.loads(ret['value'])
print "policy: %s" % policy
else:
print 'empty queue'
if opt.tail_mget == True:
print "Running as tail multiget"
(policy, ret) = yield ro.queue_tail(QUEUENAME)
if ret != None and ret != False:
print "value: %s" % repr(ret) #simplejson.loads(ret['value'])
print "policy: %s" % policy
else:
print 'empty queue'
if opt.count_objects == True:
print "Running as count object"
ret = yield ro.queue_count_elements(QUEUENAME)
if ret != None and ret != False:
print "value: %s" % repr(ret) #simplejson.loads(ret['value'])
else:
print 'empty queue'
if opt.queue_last_items == True:
print "Running as count object"
ret = yield ro.queue_last_items(QUEUENAME)
if ret != None and ret != False:
print "value: %s" % repr(ret) #simplejson.loads(ret['value'])
else:
print 'empty queue'
if opt.authorize == True:
print "Running authorization"
ret = yield ro.authorize(QUEUENAME, 'aaa123')
print ret
if opt.create_auth == True:
print "Creating auth record"
ret = yield ro._create_auth_record('aaa123', [QUEUENAME], ['create'])
print ret
def main():
p = OptionParser()
p.add_option("-p", "--producer", action="store_true", dest="producer", help="Run as producer")
p.add_option("-c", "--consumer", action="store_true", dest="consumer", help="Run as consumer")
p.add_option("-g", "--non-consumer", action="store_true", dest="non_consumer", help="Run as a non destructive consumer")
p.add_option("-s", "--stats", action="store_true", dest="stats", help="Stats")
p.add_option("-q", "--get_policy", action="store_true", dest="get_policy", help="Get queue policy")
p.add_option("-j", "--set_policy", action="store_true", dest="set_policy", help="Set queue policy")
p.add_option("-k", "--get_delete", action="store_true", dest="get_del", help="Consumer get del")
p.add_option("-t", "--tail_multiget", action="store_true", dest="tail_mget", help="Multi get 10 keys")
p.add_option("-u", "--count_objects", action="store_true", dest="count_objects", help="Count objects of a given queue")
p.add_option("-i", "--list_last_items", action="store_true", dest="queue_last_items", help="List the latest queue items")
# authorization tests
p.add_option("-a", "--authorize", action="store_true", dest="authorize", help="authorize a key for queues/privileges")
p.add_option("-r", "--create_auth", action="store_true", dest="create_auth", help="Create an authorization record")
(opt, args)=p.parse_args(sys.argv[1:])
test_operations(opt, args).addCallback(lambda ign: reactor.stop())
if __name__ == "__main__":
main()
reactor.run()