-
Notifications
You must be signed in to change notification settings - Fork 0
/
lambda_function.py
85 lines (66 loc) · 2.83 KB
/
lambda_function.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import globus_sdk
import boto3
def get_secret():
secret_name = "arn:aws:secretsmanager:us-east-2:509474786919:secret:GlobusAuthHelloWorldSecret-0q5j66"
region_name = "us-east-2"
# Create a Secrets Manager client
session = boto3.session.Session()
client = session.client(
service_name='secretsmanager',
region_name=region_name
)
get_secret_value_response = client.get_secret_value(
SecretId=secret_name
)
return eval(get_secret_value_response['SecretString'])
def generate_policy(principalId, effect, resource, message="", name=None, identities=[],
user_id=None, dependent_token=None, user_email=None):
authResponse = {}
authResponse['principalId'] = principalId
if effect and resource:
policyDocument = {}
policyDocument['Version'] = '2012-10-17'
policyDocument['Statement'] = [
{'Action': 'execute-api:Invoke',
'Effect': effect,
'Resource': resource
}
]
authResponse['policyDocument'] = policyDocument
authResponse['context'] = {
'name': name,
'user_id': user_id,
'identities': str(identities),
'globus_dependent_token': str(dependent_token),
'user_email': user_email
}
print("AuthResponse", authResponse)
return authResponse
def lambda_handler(event, context):
globus_secrets = get_secret()
auth_client = globus_sdk.ConfidentialAppAuthClient(
globus_secrets['API_CLIENT_ID'], globus_secrets['API_CLIENT_SECRET'])
print("established auth_client\n")
print("Event received: {} \n".format(event))
token = event['headers']['authorization'].replace("Bearer ", "")
auth_res = auth_client.oauth2_token_introspect(token, include="identities_set")
try:
dependent_token = auth_client.oauth2_get_dependent_tokens(token)
print("Dependent token ", dependent_token)
if not auth_res:
return generate_policy(None, 'Deny', event['routeArn'],
message='User not found')
if not auth_res['active']:
return generate_policy(None, 'Deny', event['routeArn'],
message='User account not active')
print("auth_res", auth_res)
user_email = auth_res.get("email", "nobody@nowhere.com")
return generate_policy(auth_res['username'], 'Allow', event['routeArn'],
name=auth_res["name"],
identities=auth_res["identities_set"],
user_id=auth_res['sub'],
dependent_token=dependent_token,
user_email=user_email)
except:
return generate_policy(None, 'Deny', event['routeArn'],
message='Invalid auth token')