-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[v1.3.0] Merge pull request #41 from KageRyo/develop
Update to RyoURL v1.3.0
- Loading branch information
Showing
16 changed files
with
393 additions
and
269 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,6 +38,7 @@ | |
ALLOWED_HOSTS = [ | ||
'.ngrok-free.app', | ||
'127.0.0.1', | ||
'172.21.0.2', | ||
'localhost', | ||
'0.0.0.0' | ||
] | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,5 @@ | ||
from .short_url_basic_api import short_url_router | ||
from .short_url_with_auth_api import auth_short_url_router | ||
from .auth_api import auth_router | ||
from .short_api import url_router | ||
from .user_api import user_router | ||
from .admin_api import admin_router |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
from http import HTTPStatus | ||
from ninja import Router | ||
from typing import List | ||
import datetime | ||
from django.shortcuts import get_object_or_404 | ||
from ninja.errors import HttpError | ||
|
||
from ..models import Url, User | ||
from .schemas import UrlSchema, ErrorSchema, UserInfoSchema | ||
|
||
admin_router = Router() | ||
|
||
@admin_router.get('all-urls', response={HTTPStatus.OK: List[UrlSchema], HTTPStatus.FORBIDDEN: ErrorSchema}) | ||
def get_all_url(request): | ||
urls = Url.objects.all() | ||
return HTTPStatus.OK, [UrlSchema.from_orm(url) for url in urls] | ||
|
||
@admin_router.delete('expire-urls', response={HTTPStatus.NO_CONTENT: None, HTTPStatus.FORBIDDEN: ErrorSchema}) | ||
def delete_expire_url(request): | ||
Url.objects.filter(expire_date__lt=datetime.datetime.now()).delete() | ||
return HTTPStatus.NO_CONTENT, None | ||
|
||
@admin_router.get('users', response={HTTPStatus.OK: List[UserInfoSchema], HTTPStatus.FORBIDDEN: ErrorSchema}) | ||
def get_all_users(request): | ||
users = User.objects.all() | ||
return HTTPStatus.OK, [UserInfoSchema(username=user.username, user_type=user.user_type) for user in users] | ||
|
||
@admin_router.put('user/{username}', response={HTTPStatus.OK: UserInfoSchema, HTTPStatus.FORBIDDEN: ErrorSchema, HTTPStatus.NOT_FOUND: ErrorSchema}) | ||
def update_user_type(request, username: str, user_type: int): | ||
user = get_object_or_404(User, username=username) | ||
user.user_type = user_type | ||
user.save() | ||
return HTTPStatus.OK, UserInfoSchema(username=user.username, user_type=user.user_type) | ||
|
||
@admin_router.delete('user/{username}', response={HTTPStatus.NO_CONTENT: None, HTTPStatus.FORBIDDEN: ErrorSchema, HTTPStatus.NOT_FOUND: ErrorSchema}) | ||
def delete_user(request, username: str): | ||
user = get_object_or_404(User, username=username) | ||
user.delete() | ||
return HTTPStatus.NO_CONTENT, None |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
from ninja.security import HttpBearer | ||
from rest_framework_simplejwt.tokens import AccessToken | ||
from ninja.errors import HttpError | ||
from http import HTTPStatus | ||
from ..models import User | ||
|
||
class JWTAuth(HttpBearer): | ||
def authenticate(self, request, token): | ||
if not token: | ||
return None | ||
try: | ||
access_token = AccessToken(token) | ||
user = User.objects.get(id=access_token['user_id']) | ||
return { | ||
'user': user, | ||
'user_type': user.user_type | ||
} | ||
except Exception: | ||
return None | ||
|
||
class AnonymousAuth: | ||
def __call__(self, request): | ||
return {'user': None, 'user_type': 0} | ||
|
||
class AdminJWTAuth(JWTAuth): | ||
def authenticate(self, request, token): | ||
auth = super().authenticate(request, token) | ||
if not auth or auth['user_type'] != 2: | ||
raise HttpError(HTTPStatus.FORBIDDEN, "需要管理員權限") | ||
return auth |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,93 +1,49 @@ | ||
from ninja import Router, Schema | ||
from ninja.security import HttpBearer | ||
from http import HTTPStatus | ||
from django.contrib.auth import authenticate | ||
|
||
from ninja import Router | ||
from rest_framework_simplejwt.tokens import RefreshToken | ||
from rest_framework_simplejwt.exceptions import TokenError | ||
from rest_framework_simplejwt.tokens import AccessToken, RefreshToken | ||
from ninja.errors import HttpError | ||
from django.db import IntegrityError | ||
|
||
from .schemas import UserSchema, UserResponseSchema, ErrorSchema | ||
from ..models import User | ||
from .auth import JWTAuth | ||
|
||
auth_router = Router(tags=["auth"]) | ||
|
||
# 定義 User 的 Schema 類別 | ||
class UserSchema(Schema): | ||
username: str | ||
password: str | ||
|
||
# 定義 Token 的 Schema 類別 | ||
class TokenSchema(Schema): | ||
refresh: str | ||
|
||
# 定義 Token 回應的 Schema 類別 | ||
class TokenResponseSchema(Schema): | ||
access: str | ||
|
||
# 定義 User 註冊或登入回應的 Schema 類別 | ||
class UserResponseSchema(Schema): | ||
username: str | ||
access: str | ||
refresh: str | ||
|
||
# 定義錯誤回應的 Schema 類別 | ||
class ErrorSchema(Schema): | ||
message: str | ||
|
||
# JWT 認證類別 | ||
class JWTAuth(HttpBearer): | ||
def authenticate(self, request, token): | ||
try: | ||
access_token = AccessToken(token) | ||
user_id = access_token['user_id'] | ||
user = User.objects.get(id=user_id) | ||
request.auth = { | ||
'user': user, | ||
'user_type': user.user_type | ||
} | ||
return request.auth | ||
except Exception: | ||
request.auth = None | ||
return None | ||
|
||
# POST : 註冊 API /api/auth/register | ||
@auth_router.post("register", auth=None, response={201: UserResponseSchema, 400: ErrorSchema}) | ||
@auth_router.post("register", auth=None, response={HTTPStatus.CREATED: UserResponseSchema, HTTPStatus.BAD_REQUEST: ErrorSchema}) | ||
def register_user(request, user_data: UserSchema): | ||
try: | ||
user = User.objects.create_user( | ||
username=user_data.username, | ||
password=user_data.password | ||
) | ||
refresh = RefreshToken.for_user(user) | ||
return 201, { | ||
"username": user.username, | ||
"access": str(refresh.access_token), | ||
"refresh": str(refresh) | ||
} | ||
except: | ||
return 400, {"message": "註冊失敗"} | ||
except IntegrityError: | ||
raise HttpError(HTTPStatus.BAD_REQUEST, "用戶名已存在") | ||
except Exception as e: | ||
raise HttpError(HTTPStatus.BAD_REQUEST, f"註冊失敗: {str(e)}") | ||
|
||
refresh = RefreshToken.for_user(user) | ||
return HTTPStatus.CREATED, UserResponseSchema( | ||
username=user.username, | ||
user_type=user.user_type, | ||
access=str(refresh.access_token), | ||
refresh=str(refresh) | ||
) | ||
|
||
# POST : 登入 API /api/auth/login | ||
@auth_router.post("login", auth=None, response={200: UserResponseSchema, 400: ErrorSchema}) | ||
@auth_router.post("login", auth=None, response={HTTPStatus.OK: UserResponseSchema, HTTPStatus.BAD_REQUEST: ErrorSchema}) | ||
def login_user(request, user_data: UserSchema): | ||
user = authenticate( | ||
username=user_data.username, | ||
password=user_data.password | ||
) | ||
if user: | ||
refresh = RefreshToken.for_user(user) | ||
return 200, { | ||
"username": user.username, | ||
"access": str(refresh.access_token), | ||
"refresh": str(refresh) | ||
} | ||
else: | ||
return 400, {"message": "登入失敗"} | ||
|
||
# POST : 更新 TOKEN API /api/auth/refresh-token | ||
@auth_router.post("refresh-token", auth=None, response={200: TokenResponseSchema, 400: ErrorSchema}) | ||
def refresh_token(request, token_data: TokenSchema): | ||
try: | ||
refresh = RefreshToken(token_data.refresh) | ||
return 200, {"access": str(refresh.access_token)} | ||
except TokenError: | ||
return 400, {"message": "無效的更新權杖"} | ||
if not user: | ||
raise HttpError(HTTPStatus.BAD_REQUEST, "登入失敗") | ||
|
||
refresh = RefreshToken.for_user(user) | ||
return HTTPStatus.OK, UserResponseSchema( | ||
username=user.username, | ||
user_type=user.user_type, | ||
access=str(refresh.access_token), | ||
refresh=str(refresh) | ||
) |
Oops, something went wrong.