-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
150 lines (128 loc) · 4.8 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import logging
import os
from pathlib import Path
import uvicorn
from fastapi import FastAPI
from fastapi import Request
from fastapi import UploadFile
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from db import LiteDB
# Default log file: "log.txt" in the working directory at import time.
log_file_path = os.path.join(os.getcwd(), 'log.txt')


class MyLog(logging.Logger):
    """Logger that mirrors every record to a log file and to the console.

    Both handlers share a single formatter, so a record is rendered the same
    way in the file and on the console.
    """

    def __init__(self, name, level=logging.DEBUG, file=log_file_path):
        """Create the logger and attach its file and console handlers.

        Args:
            name: Logger name; it is printed in every record.
            level: Threshold level of the logger (DEBUG by default).
            file: Path of the log file, opened with UTF-8 encoding.
        """
        super().__init__(name, level)
        layout = '%(asctime)s| %(levelname)-8s | %(name)-12s | %(filename)s [%(lineno)04d] :%(message)s'
        shared_formatter = logging.Formatter(layout, datefmt='%Y-%m-%d %H:%M:%S')
        # File handler first, console handler second.
        sinks = (
            logging.FileHandler(file, encoding='utf-8'),
            logging.StreamHandler(),
        )
        for sink in sinks:
            sink.setFormatter(shared_formatter)
            self.addHandler(sink)
# Application-wide logger instance.
logger = MyLog('FastPicBed')

# Address and port the server is started on by the __main__ section.
IP = '127.0.0.1'
PORT = 30001

# Every path below is anchored at the working directory of the process.
_BASE_DIR = os.getcwd()
ICON_FILE = os.path.normpath(os.path.join(_BASE_DIR, 'static/ico/favicon.ico'))
DB_FILE = os.path.normpath(os.path.join(_BASE_DIR, 'fast_pic_bed.db'))

# The upload directory must exist before it is mounted as a static directory.
os.makedirs('saves', exist_ok=True)
UPLOAD_DIR = os.path.normpath(os.path.join(_BASE_DIR, 'saves'))

# Extensions consulted by allowed_file().
ALLOWED_EXTENSIONS = ['txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif']

# Record the effective configuration once at start-up.
logger.debug(f'service_host_ip: {IP}')
logger.debug(f'service_host_port: {PORT}')
logger.debug(f'icon: {ICON_FILE}')
logger.debug(f'db_file: {DB_FILE}')
logger.debug(f'upload_dir: {UPLOAD_DIR}')
logger.debug(f'allowed_extensions: {ALLOWED_EXTENSIONS}')

# ASGI application, its two static mounts, and the template loader.
app = FastAPI()
app.mount('/static', StaticFiles(directory='static'), name='static')
app.mount('/saves', StaticFiles(directory='saves'), name='saves')
templates = Jinja2Templates(directory='templates')
def allowed_file(filename):
    """Report whether *filename* carries an extension listed in ALLOWED_EXTENSIONS.

    The extension is the text after the last dot; it is lower-cased before the
    lookup. A name that contains no dot at all is rejected.
    """
    _, dot, extension = filename.rpartition('.')
    # An empty separator means the name has no dot, so there is no extension.
    return bool(dot) and extension.lower() in ALLOWED_EXTENSIONS
@app.api_route('/', methods=['GET'])
async def root(request: Request):
    """Render ``index.html`` for the landing page.

    The template receives the request and ``pic_num``, the value returned by
    ``LiteDB(DB_FILE).counts('pics')`` (presumably the number of rows in the
    ``pics`` table; verify against LiteDB).
    """
    context = {
        'request': request,
        'pic_num': LiteDB(DB_FILE).counts('pics'),
    }
    return templates.TemplateResponse('index.html', context)
@app.api_route('/', methods=['POST'])
async def root(request: Request, file: UploadFile):
    """Store an uploaded file and re-render the index page with the result.

    The file is written into UPLOAD_DIR, its name is recorded in the ``pics``
    table, and ``index.html`` is rendered with ``message`` set to the file's
    URL. When the upload cannot be stored, the same page is rendered with the
    message ``'File upload failed'``.

    Args:
        request: Incoming request, forwarded to the template.
        file: The uploaded file from the multipart form.

    Returns:
        A TemplateResponse for ``index.html`` in every case.
    """
    logger.debug(f'File name: {file.filename}')
    logger.debug(f'File type: {file.content_type}')
    pic_num = LiteDB(DB_FILE).counts('pics')
    failure_context = {
        'request': request,
        'pic_num': pic_num,
        'message': 'File upload failed',
    }

    # The client controls the file name. Keep only its last path component so
    # names like "../../x" or "C:\\x" cannot escape the upload directory.
    file_name = os.path.basename((file.filename or '').replace('\\', '/'))
    if file_name in ('', '.', '..'):
        return templates.TemplateResponse('index.html', failure_context)

    # NOTE(review): the allowed_file() extension check is not enforced here, so
    # files of any extension are accepted -- confirm this is intended.
    try:
        file_path = os.path.join(UPLOAD_DIR, file_name)
        content = await file.read()
        with open(file_path, 'wb') as fp:
            fp.write(content)
        LiteDB(DB_FILE).execute(
            'INSERT INTO pics (filename) VALUES (?)',
            (file_name,))
        pic_num = LiteDB(DB_FILE).counts('pics')
    except Exception:
        # Report the failure to the user instead of falling through and
        # returning None, which would be sent as an empty success response.
        logger.exception('Upload of %r failed', file_name)
        return templates.TemplateResponse('index.html', failure_context)

    # NOTE(review): this URL omits PORT; presumably a proxy on port 80 fronts
    # the service -- confirm.
    url = 'http://127.0.0.1/uploads/' + file_name
    return templates.TemplateResponse('index.html', {
        'request': request,
        'pic_num': pic_num,
        'message': url,
    })
@app.api_route('/uploads/{filename}', methods=['GET', 'POST'])
async def uploaded_file(filename):
    """Serve a previously uploaded file from the upload directory.

    Args:
        filename: Name of the stored file, taken from the URL path.

    Returns:
        A FileResponse for the stored file.

    Raises:
        HTTPException: 404 when no regular file with that name exists.
    """
    # Imported here so the handler brings its own dependency into scope.
    from fastapi import HTTPException

    # Drop directory components so the lookup stays inside the upload
    # directory, the same one the upload handler writes to.
    safe_name = os.path.basename(filename.replace('\\', '/'))
    file_path = os.path.join(UPLOAD_DIR, safe_name)

    # isfile rather than exists: a directory (e.g. "..") must not be passed to
    # FileResponse.
    if not os.path.isfile(file_path):
        logger.error(f'Not exist: [{file_path}]')
        # Returning None would be sent as a successful "null" body.
        raise HTTPException(status_code=404, detail='File not found')
    return FileResponse(path=file_path)
@app.get('/file_view')
async def file_view(request: Request, page: int = 1):
    """Render one page of the stored-file listing with ``file_view.html``.

    Args:
        request: Incoming request, forwarded to the template.
        page: Requested page number; values below 1 are raised to 1.

    Returns:
        A TemplateResponse whose context holds the request, the page number
        and ``images``, one dict per row with the keys ``id``, ``name``,
        ``create`` and ``url``.
    """
    page = max(1, page)
    rows = LiteDB(DB_FILE).get_paginated_data(page, 'pics', page_size=20)
    # Rows are read by position: 0 -> id, 1 -> file name, 2 -> creation time.
    images = [
        {
            "id": row[0],
            "name": row[1],
            "create": row[2],
            "url": f"uploads/{row[1]}",
        }
        for row in rows
    ]
    context = {
        "request": request,
        "images": images,
        "page": page,
    }
    return templates.TemplateResponse("file_view.html", context)
if __name__ == '__main__':
    # Script entry point: serve this module's ``app`` with uvicorn.
    # The application is passed as an import string ("<module name>:app")
    # derived from this file's name.
    # NOTE(review): ``workers`` appears to be honoured only by ``uvicorn.run()``;
    # calling ``Server.run()`` directly looks like it serves from a single
    # process -- confirm before relying on 4 workers.
    app_import_path = f'{Path(__file__).stem}:app'
    server_config = uvicorn.Config(
        app=app_import_path,
        host=IP,
        port=int(PORT),
        access_log=True,
        workers=4,
    )
    uvicorn.Server(server_config).run()