-
Notifications
You must be signed in to change notification settings - Fork 34
/
processors.py
510 lines (436 loc) · 20.4 KB
/
processors.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
# processors.py
from transformers import pipeline
import subprocess
import numpy as np
from PIL import Image
import fitz
import io
import logging
import tempfile
import os
import shutil
import glob
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from utils import ArchiveHandler, can_process_file, sort_files_by_priority
from config import (
MAX_FILE_SIZE, IMAGE_EXTENSIONS, VIDEO_EXTENSIONS,
NSFW_THRESHOLD, FFMPEG_MAX_FRAMES, FFMPEG_TIMEOUT,ARCHIVE_EXTENSIONS
)
# 配置日志
logger = logging.getLogger(__name__)
# 初始化模型
pipe = pipeline("image-classification", model="Falconsai/nsfw_image_detection", device=-1)
class VideoProcessor:
def __init__(self, video_path):
self.video_path = video_path
self.temp_dir = None
self.duration = None
self.frame_rate = None
self.total_frames = None
def _get_video_info(self):
"""获取视频基本信息"""
try:
# 使用 ffprobe 而不是 ffmpeg
duration_cmd = [
'ffprobe',
'-v', 'error',
'-show_entries', 'format=duration',
'-show_entries', 'stream=r_frame_rate',
'-select_streams', 'v',
'-of', 'json',
self.video_path
]
result = subprocess.run(
duration_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=FFMPEG_TIMEOUT
)
if result.returncode != 0:
raise Exception(f"Failed to get video info: {result.stderr.decode()}")
# 解析视频信息
import json
info = json.loads(result.stdout.decode())
# 获取时长
if 'format' in info and 'duration' in info['format']:
self.duration = float(info['format']['duration'])
else:
# 如果无法获取时长,使用替代命令
alt_duration_cmd = [
'ffmpeg',
'-i', self.video_path,
'-f', 'null',
'-'
]
result = subprocess.run(
alt_duration_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=FFMPEG_TIMEOUT
)
# 从stderr中解析时长信息
duration_str = result.stderr.decode()
import re
duration_match = re.search(r"Duration: (\d{2}):(\d{2}):(\d{2}.\d{2})", duration_str)
if duration_match:
hours, minutes, seconds = duration_match.groups()
self.duration = float(hours) * 3600 + float(minutes) * 60 + float(seconds)
else:
self.duration = 0
# 获取帧率
if 'streams' in info and info['streams'] and 'r_frame_rate' in info['streams'][0]:
fr_str = info['streams'][0]['r_frame_rate']
if '/' in fr_str:
fr_num, fr_den = map(int, fr_str.split('/'))
self.frame_rate = fr_num / fr_den if fr_den != 0 else 0
else:
self.frame_rate = float(fr_str)
else:
self.frame_rate = 25.0 # 默认帧率
# 计算总帧数
self.total_frames = int(self.duration * self.frame_rate) if self.duration and self.frame_rate else 0
logger.info(f"视频信息: 时长={self.duration:.2f}秒, "
f"帧率={self.frame_rate:.2f}fps, "
f"总帧数={self.total_frames}")
except subprocess.TimeoutExpired:
raise Exception("获取视频信息超时")
except Exception as e:
raise Exception(f"获取视频信息失败: {str(e)}")
def _extract_keyframes(self):
"""提取视频帧,使用固定帧率策略"""
try:
# 创建临时目录
self.temp_dir = tempfile.mkdtemp()
logger.info("开始提取视频帧...")
if not self.duration:
raise ValueError("视频信息不完整,请先调用 _get_video_info()")
# 计算采样帧率,添加安全检查
if self.duration < FFMPEG_MAX_FRAMES:
# 如果视频时长小于预期提取的帧数,则每秒提取一帧
fps = "1"
frames_to_extract = min(int(self.duration), FFMPEG_MAX_FRAMES)
else:
# 正常情况下的帧率计算
interval_seconds = max(1, int(self.duration / FFMPEG_MAX_FRAMES))
fps = f"1/{interval_seconds}"
frames_to_extract = FFMPEG_MAX_FRAMES
logger.info(f"视频总长: {self.duration:.2f}秒, FPS: {fps}, 计划提取帧数: {frames_to_extract}")
# 使用 fps filter 提取帧
extract_cmd = [
'ffmpeg',
'-i', self.video_path,
'-vf', f'fps={fps}', # 使用固定帧率
'-frame_pts', '1', # 输出时间戳
'-vframes', str(frames_to_extract), # 限制提取帧数
'-q:v', '2', # 高质量(1-31,1最好)
'-y', # 覆盖已存在文件
os.path.join(self.temp_dir, 'frame-%d.jpg')
]
# 执行提取命令
result = subprocess.run(
extract_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=FFMPEG_TIMEOUT,
text=True
)
if result.returncode != 0:
logger.error(f"提取帧失败,FFMPEG输出: {result.stderr}")
# 如果第一次提取失败,尝试使用更保守的设置
conservative_cmd = [
'ffmpeg',
'-i', self.video_path,
'-r', '1', # 强制输出帧率为1fps
'-vframes', str(frames_to_extract),
'-q:v', '2',
'-y',
os.path.join(self.temp_dir, 'frame-%d.jpg')
]
logger.info("尝试使用备选提取方法...")
result = subprocess.run(
conservative_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=FFMPEG_TIMEOUT,
text=True
)
if result.returncode != 0:
raise Exception(f"提取帧失败(备选方法): {result.stderr}")
# 获取所有提取的帧文件并排序
frames = sorted(glob.glob(os.path.join(self.temp_dir, 'frame-*.jpg')))
extracted_count = len(frames)
if extracted_count == 0:
raise Exception("未能提取到任何帧")
if extracted_count < frames_to_extract:
logger.warning(f"实际提取帧数({extracted_count})小于计划帧数({frames_to_extract})")
logger.info(f"成功提取 {extracted_count} 个帧")
return frames
except subprocess.TimeoutExpired:
logger.error("提取帧操作超时")
raise Exception(f"提取帧操作超时(超过 {FFMPEG_TIMEOUT} 秒)")
except Exception as e:
logger.error(f"提取帧失败: {str(e)}")
raise
finally:
# 注意:这里不要清理临时目录,因为返回的帧路径还需要被使用
# 清理工作应该在帧处理完成后进行
pass
def _process_frame(self, frame_path):
"""处理单个帧"""
try:
with Image.open(frame_path) as img:
result = process_image(img)
frame_num = int(Path(frame_path).stem.split('-')[1])
return frame_num, result
except Exception as e:
logger.error(f"处理帧 {frame_path} 失败: {str(e)}")
return None, None
def process(self):
"""按顺序处理视频文件"""
try:
# 获取视频信息
self._get_video_info()
# 提取关键帧
frame_files = self._extract_keyframes()
if not frame_files:
logger.warning("未能提取到任何关键帧")
return None
# 按顺序处理帧
last_result = None
for frame in sorted(frame_files):
frame_num, result = self._process_frame(frame)
if result is not None:
last_result = result
if result['nsfw'] > NSFW_THRESHOLD:
logger.info(f"在帧 {frame_num} 发现匹配内容")
return result
return last_result
except Exception as e:
logger.error(f"处理视频失败: {str(e)}")
raise
finally:
# 清理临时文件
if self.temp_dir and os.path.exists(self.temp_dir):
try:
shutil.rmtree(self.temp_dir)
logger.info("清理临时文件完成")
except Exception as e:
logger.error(f"清理临时文件失败: {str(e)}")
def process_image(image):
"""处理单张图片并返回检测结果"""
try:
logger.info("开始处理图片")
result = pipe(image)
nsfw_score = next((item['score'] for item in result if item['label'] == 'nsfw'), 0)
normal_score = next((item['score'] for item in result if item['label'] == 'normal'), 1)
logger.info(f"图片处理完成: NSFW={nsfw_score:.3f}, Normal={normal_score:.3f}")
return {
'nsfw': nsfw_score,
'normal': normal_score
}
except Exception as e:
logger.error(f"图片处理失败: {str(e)}")
raise Exception(f"Image processing failed: {str(e)}")
def process_pdf_file(pdf_stream):
"""处理PDF文件并检查内容"""
try:
logger.info("开始处理PDF文件")
doc = fitz.open(stream=pdf_stream, filetype="pdf")
total_pages = len(doc)
logger.info(f"PDF共有 {total_pages} 页")
last_result = None # 保存最后一次处理结果
for page_num in range(total_pages):
logger.info(f"正在处理第 {page_num + 1} 页")
page = doc[page_num]
image_list = page.get_images()
if len(image_list) > 0:
logger.info(f"第 {page_num + 1} 页发现 {len(image_list)} 张图片")
for img_index, img in enumerate(image_list):
try:
xref = img[0]
base_image = doc.extract_image(xref)
image_bytes = base_image["image"]
image = Image.open(io.BytesIO(image_bytes))
result = process_image(image)
last_result = result # 保存每次的处理结果
if result['nsfw'] > NSFW_THRESHOLD:
logger.info(f"在第 {page_num + 1} 页发现匹配内容")
return result
except Exception as e:
logger.error(f"处理PDF中的图片失败: {str(e)}")
continue
logger.info("PDF处理完成,返回最后一次处理结果")
return last_result # 返回最后一次处理结果,如果没有处理过任何图片则为None
except Exception as e:
logger.error(f"PDF处理失败: {str(e)}")
raise Exception(f"PDF processing failed: {str(e)}")
def process_video_file(video_path):
"""处理视频文件的入口函数"""
processor = VideoProcessor(video_path)
return processor.process()
def process_archive(filepath, filename, depth=0, max_depth=100):
"""处理压缩文件,支持嵌套压缩包
Args:
filepath: 压缩文件路径
filename: 原始文件名
depth: 当前递归深度
max_depth: 最大递归深度,防止过深的嵌套
"""
temp_dir = None
try:
# 确保 filename 正确编码
encoded_filename = filename # 保存原始文件名
if isinstance(filename, bytes):
with ArchiveHandler(filepath) as temp_handler:
encoded_filename = temp_handler.__encode_filename(filename)
# 检查递归深度
if depth > max_depth:
logger.warning(f"达到最大递归深度 {max_depth}")
return {
'status': 'error',
'message': f'Maximum archive nesting depth ({max_depth}) exceeded'
}, 400
# 创建临时目录
temp_dir = tempfile.mkdtemp()
logger.info(f"处理压缩文件: {encoded_filename}, 深度: {depth}, 临时文件路径: {filepath}")
# 检查文件大小
file_size = os.path.getsize(filepath)
if file_size > MAX_FILE_SIZE:
return {
'status': 'error',
'message': 'File too large'
}, 400
with ArchiveHandler(filepath) as handler:
# 获取文件列表
files = handler.list_files()
# 分离可直接处理的文件和嵌套压缩包
processable_files = []
nested_archives = []
for f in files:
# 确保文件名已正确编码
if isinstance(f, bytes):
f = handler.__encode_filename(f)
ext = os.path.splitext(f)[1].lower()
if ext in ARCHIVE_EXTENSIONS:
nested_archives.append(f)
elif can_process_file(f):
processable_files.append(f)
if not processable_files and not nested_archives:
return {
'status': 'error',
'message': 'No processable files found in archive'
}, 400
# 先处理可直接处理的文件
if processable_files:
sorted_files = sort_files_by_priority(handler, processable_files)
last_result = None
matched_content = None
for inner_filename in sorted_files:
try:
# 确保内部文件名已正确编码
if isinstance(inner_filename, bytes):
inner_filename = handler.__encode_filename(inner_filename)
content = handler.extract_file(inner_filename)
ext = os.path.splitext(inner_filename)[1].lower()
if ext in IMAGE_EXTENSIONS:
img = Image.open(io.BytesIO(content))
result = process_image(img)
last_result = {
'matched_file': inner_filename,
'result': result
}
if result['nsfw'] > NSFW_THRESHOLD:
matched_content = last_result
break
elif ext == '.pdf':
result = process_pdf_file(content)
if result:
last_result = {
'matched_file': inner_filename,
'result': result
}
if result['nsfw'] > NSFW_THRESHOLD:
matched_content = last_result
break
elif ext in VIDEO_EXTENSIONS:
temp_video = tempfile.NamedTemporaryFile(delete=False, suffix=ext)
try:
with open(temp_video.name, 'wb') as f:
f.write(content)
result = process_video_file(temp_video.name)
if result:
last_result = {
'matched_file': inner_filename,
'result': result
}
if result['nsfw'] > NSFW_THRESHOLD:
matched_content = last_result
break
finally:
if os.path.exists(temp_video.name):
os.unlink(temp_video.name)
except Exception as e:
logger.error(f"处理文件 {inner_filename} 时出错: {str(e)}")
continue
if matched_content:
logger.info(f"在压缩包 {encoded_filename} 中发现匹配内容: {matched_content['matched_file']}")
return {
'status': 'success',
'filename': encoded_filename,
'result': matched_content['result']
}
# 处理嵌套的压缩包
for nested_archive in nested_archives:
try:
# 确保嵌套压缩包文件名已正确编码
if isinstance(nested_archive, bytes):
nested_archive = handler.__encode_filename(nested_archive)
temp_nested = tempfile.NamedTemporaryFile(delete=False)
content = handler.extract_file(nested_archive)
with open(temp_nested.name, 'wb') as f:
f.write(content)
# 递归处理嵌套压缩包
nested_result = process_archive(
temp_nested.name,
nested_archive,
depth + 1,
max_depth
)
# 如果找到匹配内容,直接返回
if isinstance(nested_result, tuple):
status_code = nested_result[1]
if status_code == 200:
return nested_result[0]
elif nested_result.get('status') == 'success':
return nested_result
except Exception as e:
logger.error(f"处理嵌套压缩包 {nested_archive} 时出错: {str(e)}")
continue
finally:
if os.path.exists(temp_nested.name):
os.unlink(temp_nested.name)
# 如果所有文件都处理完还没有返回,返回最后一个结果
if last_result:
logger.info(f"处理压缩包 {encoded_filename} 完成,最后处理的文件: {last_result['matched_file']}")
return {
'status': 'success',
'filename': encoded_filename,
'result': last_result['result']
}
return {
'status': 'error',
'message': 'No files could be processed successfully'
}, 400
except Exception as e:
logger.error(f"处理压缩包时出错: {str(e)}")
return {
'status': 'error',
'message': str(e)
}, 500
finally:
if temp_dir and os.path.exists(temp_dir):
try:
shutil.rmtree(temp_dir)
except Exception as e:
logger.error(f"清理临时目录时出错: {str(e)}")