-
Notifications
You must be signed in to change notification settings - Fork 0
/
player.py
158 lines (118 loc) · 4.8 KB
/
player.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
from multiprocessing import Queue, Process
from time import perf_counter
from os import get_terminal_size, system
from platform import platform
from time import sleep
from cv2 import COLOR_BGR2GRAY, VideoCapture, cvtColor, resize, CAP_PROP_FPS
# ANSI escape sequences used to reposition the cursor between rendered frames.
LINE_UP = '\033[1A'
LINE_CLEAR = '\x1b[2K'

# Glyphs used to draw the video in the terminal, listed from "@" down to blank.
Ascii = tuple("@&#¤MN£$%OXL|/=!;:*~-,. ")
# The same ramp reversed: index 0 is the blank glyph, the last index is "@".
ASCII = tuple(reversed(Ascii))
# Scale factor that maps a 0-255 grey level onto a valid index into ASCII.
color_const = (len(ASCII) - 1) / 255
# Seconds to sleep after each drawn frame; 0 until render() calibrates it.
render_delay = 0
def read_frames(video_path: str, queue: Queue, init: bool = False) -> "float | None":
    """Decode a video into ASCII-art frames and push them onto ``queue``.

    Every item put on the queue is a ``(height, padding, rows)`` tuple, where
    ``rows`` is a list of strings (one per terminal line). The end of the
    stream is always signalled with a ``(None, None, None)`` sentinel, also
    when the video cannot be read.

    Args:
        video_path: Path (or other source) handed to ``VideoCapture``.
        queue: Queue that receives the frames and the final sentinel.
        init: When true, no video frames are converted. Instead 30 pairs of
            synthetic frames (all "@" followed by all blank, 60 in total) are
            queued so the caller can time how long drawing takes.

    Returns:
        The frame rate reported by the stream when ``init`` is true and a
        first frame could be read; ``None`` in every other case.
    """
    try:
        # Attempt reading the first frame.
        stream = VideoCapture(video_path)
        grabbed, frame = stream.read()
    except Exception:
        print("Exception reading video stream")
        queue.put((None, None, None))
        return None

    try:
        if grabbed:
            # Scale to the current terminal window, keeping the aspect ratio.
            terminal = get_terminal_size()
            aspect = frame.shape[1] / frame.shape[0]
            # Clamp to 1 so a tiny terminal cannot produce a zero-sized
            # target, which resize() would reject.
            height = max(1, int(terminal.lines * 0.9))
            # NOTE(review): the 1.5 factor presumably compensates for
            # terminal cells being taller than wide - confirm.
            width = max(1, int(height * aspect * 1.5))
            # Left padding (20% of the terminal width) to stop line overflow.
            padding = " " * int(terminal.columns * 0.2)
            dsize = (width, height)

            if init:
                fps = stream.get(CAP_PROP_FPS)
                full_ascii_frame = ["@" * width for _ in range(height)]
                empty_ascii_frame = [" " * width for _ in range(height)]
                for _ in range(30):
                    queue.put((height, padding, full_ascii_frame))
                    queue.put((height, padding, empty_ascii_frame))
                queue.put((None, None, None))
                return fps

            # Grey level -> glyph table, built once instead of redoing the
            # float multiply and int() truncation for every pixel.
            glyph_for_level = [ASCII[int(level * color_const)]
                               for level in range(256)]
            while grabbed:
                frame = resize(frame, dsize)
                frame = cvtColor(frame, COLOR_BGR2GRAY)
                whole_ascii_frame = [
                    ''.join(glyph_for_level[pixel] for pixel in row)
                    for row in frame
                ]
                queue.put((height, padding, whole_ascii_frame))
                grabbed, frame = stream.read()

        queue.put((None, None, None))
        return None
    finally:
        # Release the capture on every exit path, including the init path.
        stream.release()
def draw_frames(queue: Queue) -> None:
    """Print frames taken from ``queue`` until the end-of-stream sentinel.

    Each queue item is unpacked as ``(row_count, left_pad, rows)``. An item
    whose first element is ``None`` stops the loop. After a frame is printed
    the cursor is moved back up ``row_count`` lines so the next frame
    overwrites it, then the function sleeps for the module-level
    ``render_delay`` seconds.

    Args:
        queue: Queue supplying the frame tuples and the final sentinel.
    """
    while True:
        row_count, left_pad, rows = queue.get()
        if row_count is None:
            return

        # Emit the whole frame with a single print call; each row clears its
        # terminal line before being written.
        print("\n".join(f"{LINE_CLEAR} {left_pad}{row}" for row in rows))
        # Walk the cursor back up to the first line of the frame.
        print(f"{LINE_UP} \r" * row_count, end='\r', flush=True)
        sleep(render_delay)
def _draw_frames_with_delay(queue: Queue, delay: float) -> None:
    """Set this process's ``render_delay`` to ``delay``, then draw frames."""
    global render_delay
    render_delay = delay
    draw_frames(queue)


def render(video_path: str) -> None:
    """Play ``video_path`` as ASCII art in the terminal.

    The function first calibrates: it asks ``read_frames`` for 60 synthetic
    frames, times how long ``draw_frames`` needs to print them, and stores
    the difference between the target frame time (``1 / fps``) and the
    measured time in the module-level ``render_delay``. It then starts one
    producer process (``read_frames``) and one consumer process (drawing)
    connected by a bounded queue, and waits for both to finish.

    Args:
        video_path: Path (or other source) of the video to play.

    Returns:
        None. If calibration fails a message is printed and the function
        returns without starting playback.
    """
    global render_delay
    # Video frame queue
    queue = Queue(maxsize=600)

    # platform() already returns a descriptive string (it is not the module),
    # so inspect that string directly to detect Windows.
    try:
        current_os = platform()
    except Exception:
        current_os = "N/A"

    # On Windows, calling system("") makes the console honour ANSI codes, see
    # https://stackoverflow.com/questions/12492810/python-how-can-i-make-the-ansi-escape-codes-to-work-also-in-windows
    if "windows" in current_os.lower():
        system("")
        try:
            from colorama import just_fix_windows_console_colors
        except ImportError:
            # Best effort: system("") above is the fallback when colorama is
            # not installed.
            pass
        else:
            just_fix_windows_console_colors()

    # Initialize the player by timing how long drawing a frame takes.
    try:
        fps = read_frames(video_path, queue, True)
        # Raises when fps is None or 0; handled by the except clause below.
        target_frame_delay = 1.0/fps
        start = perf_counter()
        draw_frames(queue)
        end = perf_counter()
        # read_frames queues 60 calibration frames in init mode.
        avg_render_time = (end-start)/60
        print(f"FPS: {fps}\nAverage render time: {avg_render_time:0.4f} seconds\nTarget render delay: {target_frame_delay:.04f} seconds\n")
        if target_frame_delay > avg_render_time:
            render_delay = target_frame_delay - avg_render_time
    except Exception:
        print("Exception reading video stream - calibration section")
        # Frames may still be buffered in the queue; do not let flushing them
        # block interpreter exit.
        queue.cancel_join_thread()
        return None

    # Producer process
    producer = Process(target=read_frames, args=(video_path, queue,))
    producer.start()
    # Consumer process - 1.0 second delay to allow producer a buffer.
    sleep(1.0)
    # Pass the calibrated delay explicitly: with the "spawn" start method the
    # child re-imports this module and would otherwise see render_delay == 0.
    consumer = Process(target=_draw_frames_with_delay,
                       args=(queue, render_delay))
    consumer.start()
    # Process join
    producer.join()
    consumer.join()