-
Notifications
You must be signed in to change notification settings - Fork 82
/
Copy pathserver.py
192 lines (164 loc) · 6.71 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
"""Python Flask API Auth0 integration example
"""
from functools import wraps
import json
from os import environ as env
from typing import Dict
from six.moves.urllib.request import urlopen
from dotenv import load_dotenv, find_dotenv
from flask import Flask, request, jsonify, _request_ctx_stack, Response
from flask_cors import cross_origin
from jose import jwt
ENV_FILE = find_dotenv()
if ENV_FILE:
load_dotenv(ENV_FILE)
AUTH0_DOMAIN = env.get("AUTH0_DOMAIN")
API_IDENTIFIER = env.get("API_IDENTIFIER")
ALGORITHMS = ["RS256"]
APP = Flask(__name__)
# Format error response and append status code.
class AuthError(Exception):
"""
An AuthError is raised whenever the authentication failed.
"""
def __init__(self, error: Dict[str, str], status_code: int):
super().__init__()
self.error = error
self.status_code = status_code
@APP.errorhandler(AuthError)
def handle_auth_error(ex: AuthError) -> Response:
"""
serializes the given AuthError as json and sets the response status code accordingly.
:param ex: an auth error
:return: json serialized ex response
"""
response = jsonify(ex.error)
response.status_code = ex.status_code
return response
def get_token_auth_header() -> str:
"""Obtains the access token from the Authorization Header
"""
auth = request.headers.get("Authorization", None)
if not auth:
raise AuthError({"code": "authorization_header_missing",
"description":
"Authorization header is expected"}, 401)
parts = auth.split()
if parts[0].lower() != "bearer":
raise AuthError({"code": "invalid_header",
"description":
"Authorization header must start with"
" Bearer"}, 401)
if len(parts) == 1:
raise AuthError({"code": "invalid_header",
"description": "Token not found"}, 401)
if len(parts) > 2:
raise AuthError({"code": "invalid_header",
"description":
"Authorization header must be"
" Bearer token"}, 401)
token = parts[1]
return token
def requires_scope(required_scope: str) -> bool:
"""Determines if the required scope is present in the access token
Args:
required_scope (str): The scope required to access the resource
"""
token = get_token_auth_header()
unverified_claims = jwt.get_unverified_claims(token)
if unverified_claims.get("scope"):
token_scopes = unverified_claims["scope"].split()
for token_scope in token_scopes:
if token_scope == required_scope:
return True
return False
def requires_auth(func):
"""Determines if the access token is valid
"""
@wraps(func)
def decorated(*args, **kwargs):
token = get_token_auth_header()
jsonurl = urlopen("https://" + AUTH0_DOMAIN + "/.well-known/jwks.json")
jwks = json.loads(jsonurl.read())
try:
unverified_header = jwt.get_unverified_header(token)
except jwt.JWTError as jwt_error:
raise AuthError({"code": "invalid_header",
"description":
"Invalid header. "
"Use an RS256 signed JWT Access Token"}, 401) from jwt_error
if unverified_header["alg"] == "HS256":
raise AuthError({"code": "invalid_header",
"description":
"Invalid header. "
"Use an RS256 signed JWT Access Token"}, 401)
rsa_key = {}
for key in jwks["keys"]:
if key["kid"] == unverified_header["kid"]:
rsa_key = {
"kty": key["kty"],
"kid": key["kid"],
"use": key["use"],
"n": key["n"],
"e": key["e"]
}
if rsa_key:
try:
payload = jwt.decode(
token,
rsa_key,
algorithms=ALGORITHMS,
audience=API_IDENTIFIER,
issuer="https://" + AUTH0_DOMAIN + "/"
)
except jwt.ExpiredSignatureError as expired_sign_error:
raise AuthError({"code": "token_expired",
"description": "token is expired"}, 401) from expired_sign_error
except jwt.JWTClaimsError as jwt_claims_error:
raise AuthError({"code": "invalid_claims",
"description":
"incorrect claims,"
" please check the audience and issuer"}, 401) from jwt_claims_error
except Exception as exc:
raise AuthError({"code": "invalid_header",
"description":
"Unable to parse authentication"
" token."}, 401) from exc
_request_ctx_stack.top.current_user = payload
return func(*args, **kwargs)
raise AuthError({"code": "invalid_header",
"description": "Unable to find appropriate key"}, 401)
return decorated
# Controllers API
@APP.route("/api/public")
@cross_origin(headers=["Content-Type", "Authorization"])
def public():
"""No access token required to access this route
"""
response = "Hello from a public endpoint! You don't need to be authenticated to see this."
return jsonify(message=response)
@APP.route("/api/private")
@cross_origin(headers=["Content-Type", "Authorization"])
@cross_origin(headers=["Access-Control-Allow-Origin", "http://localhost:3000"])
@requires_auth
def private():
"""A valid access token is required to access this route
"""
response = "Hello from a private endpoint! You need to be authenticated to see this."
return jsonify(message=response)
@APP.route("/api/private-scoped")
@cross_origin(headers=["Content-Type", "Authorization"])
@cross_origin(headers=["Access-Control-Allow-Origin", "http://localhost:3000"])
@requires_auth
def private_scoped():
"""A valid access token and an appropriate scope are required to access this route
"""
if requires_scope("read:messages"):
response = "Hello from a private endpoint! You need to be authenticated and have a scope of read:messages to see this."
return jsonify(message=response)
raise AuthError({
"code": "Unauthorized",
"description": "You don't have access to this resource"
}, 403)
if __name__ == "__main__":
APP.run(host="0.0.0.0", port=env.get("PORT", 3010))