Skip to content

Commit

Permalink
Refactor(admin-endpoints)!: refactor admin endpoints and add new two …
Browse files Browse the repository at this point in the history
…get admin depends
  • Loading branch information
erfjab committed Aug 17, 2024
1 parent cf6d8ac commit e87e15b
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 83 deletions.
29 changes: 29 additions & 0 deletions app/dependencies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from typing import Optional
from app.models.admin import AdminInDB, AdminValidationResult
from app.db import Session, crud, get_db
from config import SUDOERS
from fastapi import Depends, HTTPException


def validate_admin(db: Session, username: str, password: str) -> Optional[AdminValidationResult]:
"""
Validate admin credentials with environment variables or database.
"""
if SUDOERS.get(username) == password:
return AdminValidationResult(username, True)

dbadmin = crud.get_admin(db, username)
if dbadmin and AdminInDB.from_orm(dbadmin).verify_password(password):
return AdminValidationResult(dbadmin.username, dbadmin.is_sudo)

return None


def get_admin_by_username(username: str, db: Session = Depends(get_db)):
"""
Fetch an admin by username from the database.
"""
dbadmin = crud.get_admin(db, username)
if not dbadmin:
raise HTTPException(status_code=404, detail="Admin not found")
return dbadmin
21 changes: 21 additions & 0 deletions app/models/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,23 @@ def get_current(cls,

return admin

@classmethod
def check_sudo_admin(cls,
db: Session,
token: str) -> 'Admin':
admin = cls.get_admin(token, db)
if not admin:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
if not admin.is_sudo:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You're not allowed"
)
return admin

class AdminCreate(Admin):
password: str
Expand Down Expand Up @@ -107,3 +124,7 @@ class AdminInDB(Admin):

def verify_password(self, plain_password):
return pwd_context.verify(plain_password, self.hashed_password)

class AdminValidationResult(BaseModel):
username: str
is_sudo: bool
147 changes: 64 additions & 83 deletions app/routers/admin.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,20 @@
from typing import List, Optional

import sqlalchemy
from sqlalchemy.exc import IntegrityError
from app.db import Session, crud, get_db
from app.models.admin import Admin, AdminCreate, AdminInDB, AdminModify, Token
from app.models.admin import Admin, AdminCreate, AdminModify, Token
from app.utils.jwt import create_admin_token
from config import SUDOERS
from fastapi import Depends, HTTPException, status, Request, APIRouter
from fastapi.security import OAuth2PasswordRequestForm
from app.utils import report

router = APIRouter(tags=['Admin'], prefix='/api')

def authenticate_env_sudo(username: str, password: str) -> bool:
try:
return password == SUDOERS[username]
except KeyError:
return False
from app.dependencies import validate_admin, get_admin_by_username


def authenticate_admin(db: Session, username: str, password: str) -> Optional[Admin]:
dbadmin = crud.get_admin(db, username)
if not dbadmin:
return None

return dbadmin if AdminInDB.from_orm(dbadmin).verify_password(password) else None
router = APIRouter(tags=['Admin'], prefix='/api')


def get_client_ip(request: Request) -> str:
"""Extract the client's IP address from the request headers or client."""
forwarded_for = request.headers.get("X-Forwarded-For")
if forwarded_for:
return forwarded_for.split(",")[0].strip()
Expand All @@ -41,102 +29,95 @@ def admin_token(
form_data: OAuth2PasswordRequestForm = Depends(),
db: Session = Depends(get_db)
):
"""Authenticate an admin and issue a token."""
client_ip = get_client_ip(request)
dbadmin = validate_admin(db, form_data.username, form_data.password)

if not dbadmin:
report.login(form_data.username, form_data.password, client_ip, False)
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)

if authenticate_env_sudo(form_data.username, form_data.password):
report.login(form_data.username, '🔒', client_ip, True)
return Token(access_token=create_admin_token(form_data.username, is_sudo=True))

if dbadmin := authenticate_admin(db, form_data.username, form_data.password):
report.login(form_data.username, '🔒', client_ip, True)
return Token(access_token=create_admin_token(form_data.username, is_sudo=dbadmin.is_sudo))

report.login(form_data.username, form_data.password, client_ip, False)
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
report.login(form_data.username, '🔒', client_ip, True)
return Token(
access_token=create_admin_token(
form_data.username,
dbadmin.is_sudo
)
)


@router.post("/admin", response_model=Admin)
def create_admin(new_admin: AdminCreate,
db: Session = Depends(get_db),
admin: Admin = Depends(Admin.get_current)):

if not admin.is_sudo:
raise HTTPException(status_code=403, detail="You're not allowed")

def create_admin(
new_admin: AdminCreate,
db: Session = Depends(get_db),
admin: Admin = Depends(Admin.check_sudo_admin)
):
"""Create a new admin if the current admin has sudo privileges."""
try:
dbadmin = crud.create_admin(db, new_admin)
except sqlalchemy.exc.IntegrityError:
except IntegrityError:
db.rollback()
raise HTTPException(status_code=409, detail="Admin already exists")

return dbadmin


@router.put("/admin/{username}", response_model=Admin)
def modify_admin(username: str,
modified_admin: AdminModify,
db: Session = Depends(get_db),
admin: Admin = Depends(Admin.get_current)):
if not (admin.is_sudo or admin.username == username):
raise HTTPException(status_code=403, detail="You're not allowed")

# If a non-sudoer admin is making itself a sudoer
if (admin.username == username) and (modified_admin.is_sudo and not admin.is_sudo):
raise HTTPException(status_code=403, detail="You can't make yourself sudoer!")

dbadmin = crud.get_admin(db, username)
if not dbadmin:
raise HTTPException(status_code=404, detail="Admin not found")

# If a sudoer admin wants to edit another sudoer
if (username != admin.username) and dbadmin.is_sudo:
def modify_admin(
modified_admin: AdminModify,
dbadmin: Admin = Depends(get_admin_by_username),
db: Session = Depends(get_db),
current_admin: Admin = Depends(Admin.check_sudo_admin)
):
"""Modify an existing admin's details."""
if (dbadmin.username != current_admin.username) and dbadmin.is_sudo:
raise HTTPException(
status_code=403,
detail=("You're not allowed to edit another sudoers account. Use marzban-cli instead.",),
detail="You're not allowed to edit another sudoer's account. Use marzban-cli instead."
)

dbadmin = crud.update_admin(db, dbadmin, modified_admin)
return dbadmin

updated_admin = crud.update_admin(db, dbadmin, modified_admin)

return updated_admin


@router.delete("/admin/{username}")
def remove_admin(username: str,
db: Session = Depends(get_db),
admin: Admin = Depends(Admin.get_current)):
if not admin.is_sudo:
raise HTTPException(status_code=403, detail="You're not allowed")

dbadmin = crud.get_admin(db, username)
if not dbadmin:
raise HTTPException(status_code=404, detail="Admin not found")

def remove_admin(
dbadmin: Admin = Depends(get_admin_by_username),
db: Session = Depends(get_db),
current_admin: Admin = Depends(Admin.check_sudo_admin)
):
"""Remove an admin from the database."""
if dbadmin.is_sudo:
raise HTTPException(
status_code=403,
detail=("You're not allowed to delete sudoers accounts. Use marzban-cli instead."),
detail="You're not allowed to delete sudo accounts. Use marzban-cli instead."
)

dbadmin = crud.remove_admin(db, dbadmin)
return {}
crud.remove_admin(db, dbadmin)

return {"detail": "Admin removed successfully"}


@router.get("/admin", response_model=Admin)
def get_current_admin(admin: Admin = Depends(Admin.get_current)):
def get_current_admin(
admin: Admin = Depends(Admin.get_current)
):
"""Retrieve the current authenticated admin."""
return admin


@router.get("/admins", response_model=List[Admin])
def get_admins(offset: int = None,
limit: int = None,
username: str = None,
db: Session = Depends(get_db),
admin: Admin = Depends(Admin.get_current)):

if not admin.is_sudo:
raise HTTPException(status_code=403, detail="You're not allowed")

return crud.get_admins(db, offset, limit, username)
def get_admins(
offset: Optional[int] = None,
limit: Optional[int] = None,
username: Optional[str] = None,
db: Session = Depends(get_db),
admin: Admin = Depends(Admin.check_sudo_admin)
):
"""Fetch a list of admins with optional filters for pagination and username."""
return crud.get_admins(db, offset, limit, username)

0 comments on commit e87e15b

Please sign in to comment.