-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathoauth2.py
129 lines (98 loc) · 3.1 KB
/
oauth2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
"""
JWT in httpOnly cookies with OAuth2 password flow.
"""
from calendar import timegm
from datetime import datetime, timedelta
from typing import Optional

import jwt
from fastapi import HTTPException, Security
from fastapi.security import OAuth2PasswordBearer
from passlib.context import CryptContext
from starlette.requests import Request

from config import JWT_SECRET
from schemas import UserData
# Shared passlib context used by get_password_hash / verify_password below;
# bcrypt is the only configured scheme.
pwd_context = CryptContext(
    schemes=["bcrypt"],
    deprecated="auto",
)
class OAuth2PasswordCookie(OAuth2PasswordBearer):
    """
    OAuth2 password flow with the token carried in a httpOnly cookie.

    Behaves like ``OAuth2PasswordBearer`` except that the token is read from
    a request cookie rather than from the ``Authorization`` header.
    """

    def __init__(self, *args, token_name: Optional[str] = None, **kwargs):
        """Create the scheme.

        Args:
            token_name: Name of the cookie holding the JWT. Falls back to
                ``"my-jwt-token"`` when omitted or empty.
            *args, **kwargs: Forwarded unchanged to ``OAuth2PasswordBearer``.
        """
        super().__init__(*args, **kwargs)
        self._token_name = token_name or "my-jwt-token"

    @property
    def token_name(self) -> str:
        """Get the name of the token's cookie."""
        return self._token_name

    async def __call__(self, request: Request) -> Optional[str]:
        """Extract and return a JWT from the request cookies.

        Returns:
            The cookie value, or ``None`` when the cookie is absent and the
            scheme was created with ``auto_error=False``.

        Raises:
            HTTPException: 403 error if no token cookie is present and
                ``auto_error`` is enabled (the default).
        """
        token = request.cookies.get(self._token_name)
        if token:
            return token
        # Honour the parent's ``auto_error`` switch so optional authentication
        # works; default to raising if the attribute is unavailable.
        if getattr(self, "auto_error", True):
            raise HTTPException(status_code=403, detail="Not authenticated")
        return None
# Module-wide cookie-based scheme instance; is_authenticated depends on it.
oauth2_schema = OAuth2PasswordCookie(
    tokenUrl="/auth/token",
    scopes={
        "user": "User",
        "admin": "Admin",
    },
)
def validate_jwt_payload(token: str) -> dict:
    """
    Decode and validate a JSON web token.

    The token is verified against ``JWT_SECRET`` using HS256 and must carry
    an ``exp`` claim that lies in the future.

    Args:
        token (str): JWT token.

    Returns:
        dict: The decoded payload.

    Raises:
        HTTPException: 401 error if the credentials have expired or failed
            validation (bad signature, malformed token, missing ``exp``).
    """
    try:
        payload = jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
    except jwt.ExpiredSignatureError as err:
        # Must precede the generic handler: it is a PyJWTError subclass and
        # would otherwise be reported as a plain validation failure.
        raise HTTPException(401, detail="Credentials have expired") from err
    except jwt.PyJWTError as err:
        raise HTTPException(401, detail="Could not validate credentials") from err

    expires_at = payload.get("exp")
    if expires_at is None:
        # A token without an expiry must not be accepted (and must not
        # surface as a KeyError / 500).
        raise HTTPException(401, detail="Could not validate credentials")

    # Explicit check kept so a token expiring exactly "now" is rejected too.
    utc_now = timegm(datetime.utcnow().utctimetuple())
    if expires_at <= utc_now:
        raise HTTPException(401, detail="Credentials have expired")
    return payload
def encode_token(**kwargs) -> str:
    """
    Encode the given claims as a JSON web token.

    Every keyword argument becomes a claim in the payload, e.g.
    ``encode_token(user="example-user", user_id=1, scopes=["user", "admin"])``.
    The ``exp``, ``iat`` and ``sub`` claims are always set here and override
    any caller-supplied values: ``iat`` is the current UTC time, ``exp`` is
    15 minutes later, and ``sub`` is ``"jwt-cookies-test"``.

    Returns:
        str (JSON web token)
    """
    now = datetime.utcnow()
    claims = {
        **kwargs,
        "exp": now + timedelta(minutes=15),
        "iat": now,
        "sub": "jwt-cookies-test",
    }
    return jwt.encode(claims, JWT_SECRET, algorithm="HS256")
def is_authenticated(token: str = Security(oauth2_schema)) -> UserData:
    """Dependency on user being authenticated.

    Args:
        token: JWT supplied by the ``oauth2_schema`` security dependency.

    Returns:
        UserData built from the ``user``, ``user_id`` and ``scopes`` claims.

    Raises:
        HTTPException: 401 error if the token fails validation or lacks one
            of the required claims.
    """
    payload = validate_jwt_payload(token)
    try:
        user = payload["user"]
        user_id = payload["user_id"]
        scopes = payload["scopes"]
    except KeyError as err:
        # A correctly signed token with missing claims is still unusable
        # credentials; report it as 401 instead of an unhandled KeyError.
        raise HTTPException(401, detail="Could not validate credentials") from err
    return UserData(user=user, user_id=user_id, scopes=scopes)
def get_password_hash(password):
    """
    Return the hash of the plain-text ``password``, produced by the
    module-level ``pwd_context``.
    """
    hashed = pwd_context.hash(password)
    return hashed
def verify_password(plain_password, hashed_password):
    """
    Check ``plain_password`` against ``hashed_password`` (likely loaded from
    the database) and return the result of ``pwd_context.verify``.
    """
    matches = pwd_context.verify(plain_password, hashed_password)
    return matches