-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathworker.py
executable file
·184 lines (159 loc) · 5.77 KB
/
worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
#!/usr/bin/env python2
import uuid
import videooperator.encoder
import socket
from socket import error as SocketException
import os
import json
import logging
import time
class logfile(object):
'''
File-alike object to handle log information from FFmpeg
'''
def write(self, message):
print message
#pass
def close(self):
pass
class Worker(object):
'''
Worker class
'''
RECONNECTION_TIMEOUT = 10
def __init__(self, addr, port):
self._addr = addr
self._port = port
self._server = None
self._server_file = None
self._upload = None
self._connect_to_server()
def _send_message(self, socket_, message, raw=False):
if not raw:
encoded_message = json.dumps(message) + '\n'
else:
encoded_message = message
socket_.sendall(encoded_message)
def _unpack_message(self, message):
try:
# Unpack JSON
command_json = json.loads(message)
except:
logger.error('Mailformed message received')
logger.error(command)
return False
else:
return command_json
def _close_socket(self, socket_):
try:
socket_.shutdown(socket.SHUT_RDWR)
except SocketException:
result = False
else:
result = True
socket_.close()
return result
def _connect_to_server(self):
'''
Make connection to server
'''
logger = logging.getLogger(self.__class__.__name__)
self._server = self._server_file = None
while not self._server:
try:
self._server = socket.create_connection((self._addr, self._port))
except SocketException as e:
logger.error('Cannot connect to server')
logger.error(repr(e))
time.sleep(Worker.RECONNECTION_TIMEOUT)
self._server_file = self._server.makefile()
try:
self._send_message(self._server, {"action": "getjob", "worker_id": worker_id})
except SocketException:
logger.error('Cannot send message to server')
return False
logger.info('Connected to server')
return True
def _update_job_status(self, job_id, status):
sock = socket.create_connection((self._addr, self._port))
self._send_message(sock, {"action": "updatejob", "worker_id": worker_id, "job_id": job_id, "status": status})
self._close_socket(sock)
def terminate(self):
for thissocket in (self._server, self._upload):
if thissocket is not None:
self._close_socket(thissocket)
def run(self):
'''
Main worker loop
'''
logger = logging.getLogger(self.__class__.__name__)
while True:
print 'cycle'
if self._server is None:
logger.info("Reconnecting to server")
self._connect_to_server()
command = self._server_file.readline().strip()
if not command:
# socket closed, reconnect
logger.error('Connection lost')
self._close_socket(self._server)
self._server_file.close()
self._server = self._server_file = None
continue
command_json = self._unpack_message(command)
if not command_json:
logger.error('Mailformed command received')
logger.error(command)
break
action = command_json.get('action')
job_id = command_json.get('job_id')
encoding_arguments = command_json.get('encoding_arguments', '')
if action == 'ping':
# PING from server, do nothing
logger.info('Got ping command')
continue
elif action == 'do':
# Encode command from server
logger.info('Got encode command')
logger.info(command)
try:
# Creating upload socket
self._upload = socket.create_connection((self._addr, self._port))
self._send_message(self._upload, {
"action": "putjob",
"worker_id": worker_id,
"job_id": job_id
})
except Exception as e:
logger.error("Cannot create upload socket")
logger.error(repr(e))
break
logger.info('Starting encoder')
encoder = videooperator.encoder.Encoder()
encoder.encode(self._server_file, self._upload.makefile(), logfile(),
encoding_arguments = encoding_arguments)
logger.info('Encoder started')
encoder.join()
logger.info('Encoder joined')
map(self._close_socket, (self._upload, self._server))
self._upload = self._server = None
logger.info('Sockets closed')
result = encoder.get_result()
logger.info(result)
sendresult = 'success' if result['returncode'] == 0 else 'fail'
logger.info('Status upadting...')
self._update_job_status(job_id, sendresult)
print job_id
if __name__ == "__main__":
# Logging initialization
logging.basicConfig(
format='%(asctime)s %(levelname)s %(name)s: %(message)s',
datefmt='%d.%m.%Y %H:%M:%S',
level=logging.DEBUG)
# Generate worker id
worker_id = str(uuid.uuid4())
worker = Worker('localhost', 8000)
try:
worker.run()
except KeyboardInterrupt:
worker.terminate()