-
Notifications
You must be signed in to change notification settings - Fork 120
/
service.py
122 lines (86 loc) · 3.42 KB
/
service.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# -*- coding:utf-8 -*-
from optparse import OptionParser, OptionGroup
from tornadoweb import *
from json import loads, dumps
from os import wait, fork, getpid, getppid, killpg, waitpid
from multiprocessing import cpu_count
from signal import signal, pause, SIGCHLD, SIGINT, SIGTERM, SIGUSR1, SIGUSR2, SIG_IGN
from redis import Redis, ConnectionError
#from settings import *
def unpack(data):
    """Decode a JSON-encoded message and return its ``(cmd, data)`` pair.

    ``data`` is the serialized message; the decoded object must provide the
    keys ``"cmd"`` and ``"data"``, otherwise the lookup raises.
    """
    message = loads(data)
    command = message["cmd"]
    payload = message["data"]
    return command, payload
class Consumer(object):
    """Reads command messages from the Redis list named by ``__conf__.REDIS_CHANNEL``."""

    def __init__(self):
        """Create the Redis client from the ``REDIS_*`` values on ``__conf__``."""
        self._redis = Redis(
            host=__conf__.REDIS_HOST,
            port=__conf__.REDIS_PORT,
            password=__conf__.REDIS_PASS,
            db=__conf__.REDIS_DB,
        )

    def consume(self):
        """Pop the next message with ``BLPOP`` and return its ``(cmd, data)`` pair."""
        popped = self._redis.blpop(__conf__.REDIS_CHANNEL)
        # Element 1 of the BLPOP reply is the popped value (element 0 is the key).
        raw_message = popped[1]
        return unpack(raw_message)

    def close(self):
        """Disconnect the client's connection pool, if a client was ever created."""
        if not hasattr(self, "_redis"):
            return
        self._redis.connection_pool.disconnect()
class Service(object):
    """Pre-fork worker pool that dispatches consumed commands to handlers.

    The parent process forks ``cpu_count()`` children. Each child repeatedly
    takes a ``(cmd, data)`` pair from the shared ``Consumer`` and calls the
    ``service_*`` attribute of ``logic.service`` whose name equals ``cmd``.
    The parent then sleeps in ``pause()`` and reacts to signals only.
    """

    def __init__(self):
        # One worker process per CPU reported by multiprocessing.
        self._processes = cpu_count()
        self._consumer = Consumer()
        # PID of the process that built the service; signal handlers compare
        # against it to tell the parent apart from forked workers.
        self._parent = getpid()
        self._services = self._get_services()

    def _signal(self):
        """Install one handler for SIGINT, SIGTERM, SIGCHLD and SIGUSR1.

        Handlers are installed before forking (see ``run``), so workers
        inherit them; the parent/child distinction is made at delivery time.
        """

        def sig_handler(signum, frame):
            pid = getpid()
            is_parent = pid == self._parent
            if signum in (SIGINT, SIGTERM):
                if is_parent:
                    # Stop reacting to child exits during shutdown, then tell
                    # the whole process group to exit via SIGUSR1.
                    # NOTE(review): killpg() is given the parent's PID as the
                    # group id, i.e. this assumes the parent is its own
                    # process-group leader -- confirm for daemonized launches.
                    signal(SIGCHLD, SIG_IGN)
                    killpg(self._parent, SIGUSR1)
            elif signum == SIGCHLD:
                if is_parent:
                    self._reap_children()
            elif signum == SIGUSR1:
                print("process {0} exit...".format(
                    pid if is_parent else (pid, self._parent)))
                raise SystemExit(0)

        for sig in (SIGINT, SIGTERM, SIGCHLD, SIGUSR1):
            signal(sig, sig_handler)

    def _reap_children(self):
        """Reap every child that has already exited, without blocking.

        Several child exits can be delivered as a single SIGCHLD, so one
        ``wait()`` per signal may leave zombies behind; loop until nothing
        more is ready instead.
        """
        from os import WNOHANG

        while True:
            try:
                reaped = waitpid(-1, WNOHANG)
            except OSError:
                # No child processes exist any more.
                break
            if reaped[0] == 0:
                # Children exist, but none of them has exited yet.
                break
            print("sub process {0} exit...".format(reaped))

    def _get_services(self):
        """Return a ``{name: attribute}`` dict of ``logic.service``'s ``service_*`` members.

        Each discovered entry is printed as it is found.
        """
        from logic import service

        services = {
            name: getattr(service, name)
            for name in dir(service)
            if name.startswith("service_")
        }
        for name, handler in services.items():
            print(name, handler)
        return services

    def _work_loop(self):
        """Consume and dispatch messages until the consumer's connection fails.

        Returns normally after a ``ConnectionError`` from ``consume()``.
        A message that cannot be decoded, or a handler that raises, is logged
        with a traceback and skipped so that one bad message does not
        terminate the worker.
        """
        import traceback

        while True:
            try:
                cmd, data = self._consumer.consume()
            except ConnectionError as e:
                traceback.print_exc()
                # str(e) rather than e.message: the latter does not exist on
                # Python 3 exceptions and would raise inside this handler.
                print("Exception {0}".format(getpid()), ":", str(e))
                return
            except (ValueError, KeyError, TypeError):
                # Malformed payload (invalid JSON, missing "cmd"/"data").
                traceback.print_exc()
                continue

            print(cmd, data)
            try:
                srv_func = self._services.get(cmd)
                if srv_func:
                    srv_func(data)
            except Exception:
                traceback.print_exc()

    def _run(self):
        """Fork the worker processes; only the parent returns from this call."""
        for _ in range(self._processes):
            if fork() > 0:
                # Parent: keep spawning the remaining workers.
                continue
            # Child: serve until the connection is lost, then exit so the
            # child never falls through into the parent's fork loop.
            self._work_loop()
            raise SystemExit(0)

    def run(self):
        """Install signal handlers, start the workers, then idle in the parent."""
        self._signal()
        try:
            self._run()
        except RuntimeError:
            print("ERROR", ": Is redis running?")
            raise SystemExit(-1)
        # Parent: sleep until a signal arrives; all work happens in handlers.
        while True:
            pause()
def _get_opt():
parser = OptionParser("%prog [options]", version="%prog v0.9")
parser.add_option("--config", dest = "config", default = "settings.py", help = "config")
return parser.parse_args()
if __name__ == "__main__":
    # Script entry point: read the CLI options, pass the --config value to
    # ConfigLoader.load, then build the service and run it.
    cli_options, _positional = _get_opt()
    ConfigLoader.load(cli_options.config)
    worker_pool = Service()
    worker_pool.run()