Skip to content

Commit

Permalink
Add Duo Universal Prompt push support
Browse files Browse the repository at this point in the history
Duo's Universal Prompt Okta integration involves a new type of MFA
factor, claims_provider. This change treats claims_provider as Duo
Universal Prompt and implements the interactions required to trigger a
push to the user's preferred device, then complete the transaction and
obtain a valid Okta session.

Supported authentication includes Duo Push, Phone Call, and Passcode.
  • Loading branch information
aogail committed Nov 9, 2023
1 parent 094a329 commit 9f61e8f
Show file tree
Hide file tree
Showing 11 changed files with 1,449 additions and 33 deletions.
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,11 @@ A configuration wizard will prompt you to enter the necessary configuration para
- email - OTP via email
- web - DUO uses localhost webbrowser to support push|call|passcode
- passcode - DUO uses `OKTA_MFA_CODE` or `--mfa-code` if set, or prompts user for passcode(OTP).
- claims_provider - DUO Universal Prompt
- duo_universal_factor - (optional) Configure which type of factor to use with Duo Universal Prompt. Must be one of (case-sensitive):
- `Duo Push` (default)
- `Passcode`
- `Phone Call`
- resolve_aws_alias - y or n. If yes, gimme-aws-creds will try to resolve AWS account ids with respective alias names (default: n). This option can also be set interactively in the command line using `-r` or `--resolve` parameter
- include_path - (optional) Includes full role path to the role name in AWS credential profile name. (default: n). If `y`: `<acct>-/some/path/administrator`. If `n`: `<acct>-administrator`
- remember_device - y or n. If yes, the MFA device will be remembered by Okta service for a limited time. This option can also be set interactively in the command line using `-m` or `--remember-device`
Expand Down
197 changes: 197 additions & 0 deletions gimme_aws_creds/duo_universal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
import time

import html5lib
from furl import furl

from . import version


class DuoMfaDenied(BaseException):
""" Duo MFA was denied """

def __init__(self, response):
super(DuoMfaDenied, self).__init__(f'Duo MFA denied: {response}')


class OktaDuoUniversal:
""" Handles interaction with the Duo Universal Prompt """

def __init__(self, ui, session, state_token, okta_factor, remember_device, duo_factor='Duo Push', duo_passcode=None):
self.ui = ui
self.state_token = state_token
self.okta_factor = okta_factor
self.remember_device = remember_device
self.session = session
if duo_factor not in ['Duo Push', 'Passcode', 'Phone Call']:
raise Exception('Preferred Duo Universal factor must be one of: Duo Push, Passcode, Phone Call')
self.duo_factor = duo_factor
self.duo_passcode = duo_passcode

def do_auth(self):
""" Follow Duo Universal Prompt flow through to an active Okta user session """

duo_prompt_url, okta_profile_login = self._initiate_okta_factor_verification()
duo_origin, duo_plugin_form_response = self._handle_duo_plugin_form(duo_prompt_url)

# Submit second Duo form (login-form), which triggers a Duo Push, phone call, or accepts the Passcode
login_form_action, duo_login_form_data = self._get_duo_universal_login_form_data(duo_plugin_form_response)
login_form_action_url = furl(duo_origin) / login_form_action
duo_factor, duo_sid, duo_txid, duo_xsrf = self._submit_duo_login_form(duo_login_form_data,
login_form_action_url)

self.ui.info(f"Duo Universal: Using {self.duo_factor}...")

self._wait_for_duo_universal_transaction(duo_origin, duo_txid, duo_sid)

# Once Duo has been approved, load the OIDC exit URL to be redirected to Okta and gain a user session
oidc_exit_url = furl(duo_origin) / 'frame/v4/oidc/exit'
exit_headers = self._get_form_headers()
exit_response = self.session.post(
oidc_exit_url.url,
data={
'txid': duo_txid,
'sid': duo_sid,
'factor': duo_factor,
'_xsrf': duo_xsrf,
'device_key': '',
'dampen_choice': 'false',
},
headers=exit_headers,
)
exit_response.raise_for_status()

# The claims_provider factor immediately yields an active user session, no subsequent request for SID required.
return {
'apiResponse': {
'status': 'SUCCESS',
'userSession': {
"username": okta_profile_login,
"session": self.session.cookies['sid'],
"device_token": self.session.cookies['DT']
}
},
}

def _submit_duo_login_form(self, duo_login_form_data, login_form_action_url):
# Submit Duo's form id=login-form, which triggers a Duo Push, phone call, or accepts a Passcode.
duo_login_form_response = self.session.post(
login_form_action_url.url,
data=duo_login_form_data,
headers=self._get_form_headers(),
)
duo_login_form_response.raise_for_status()
duo_sid = duo_login_form_data['sid']
duo_factor = duo_login_form_data['factor']
duo_xsrf = duo_login_form_data['_xsrf']
duo_login_response_data = duo_login_form_response.json()
if duo_login_response_data['stat'] != 'OK':
raise Exception(f"Triggering Duo MFA failed: {duo_login_form_response.content}")
duo_txid = duo_login_response_data['response']['txid']
return duo_factor, duo_sid, duo_txid, duo_xsrf

def _handle_duo_plugin_form(self, duo_prompt_url):
# Request Duo prompt
verify_get_response = self.session.get(
duo_prompt_url,
)
verify_get_response.raise_for_status()
duo_origin = furl(verify_get_response.url).origin
# Submit first Duo form (plugin_form)
form_data = self._get_duo_universal_plugin_form_data(verify_get_response)
duo_plugin_form_response = self.session.post(
verify_get_response.url,
data=form_data,
headers=self._get_form_headers(),
)
duo_plugin_form_response.raise_for_status()
return duo_origin, duo_plugin_form_response

def _initiate_okta_factor_verification(self):
# POST to the Okta factor verify URL gives us the URL to request to load Duo
verify_post_response = self.session.post(
self.okta_factor['_links']['verify']['href'],
params={'rememberDevice': self.remember_device},
json={'stateToken': self.state_token},
)
verify_post_response.raise_for_status()
verify_response_data = verify_post_response.json()
duo_prompt_url = verify_response_data['_links']['next']['href']
okta_profile_login = verify_response_data['_embedded']['user']['profile']['login']
return duo_prompt_url, okta_profile_login

def _wait_for_duo_universal_transaction(self, duo_host, txid, sid):
status_url = furl(duo_host) / 'frame/v4/status'
status_data = {
'txid': txid,
'sid': sid
}
headers = self._get_form_headers()

tries = 0
while tries < 16:
tries += 1
time.sleep(0.5)

status_response = self.session.post(
status_url.url,
data=status_data,
headers=headers,
)
status_response.raise_for_status()

json_response = status_response.json()
if json_response['stat'] != 'OK':
raise Exception(f"Error checking Duo MFA status: {status_response.text}")

if json_response['response']['status_code'] == 'allow':
return txid
if json_response['response']['status_code'] == 'deny':
raise DuoMfaDenied(json_response)

raise Exception('Timed out waiting for Duo MFA')

@staticmethod
def _get_form_headers():
form_headers = {
'User-Agent': "gimme-aws-creds {}".format(version),
'Accept': 'application/json',
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8'
}
return form_headers

def _get_duo_universal_login_form_data(self, plugin_form_response):
""" Get form data to post when submitting the Duo login-form """

doc = html5lib.parse(plugin_form_response.content, namespaceHTMLElements=False)
form_action = doc.find('.//form[@id="login-form"]').get('action')
form_data = {}
for field in doc.iterfind('.//form[@id="login-form"]/input'):
form_data[field.get('name')] = field.get('value')

preferred_device = self._find_device_to_use(doc)

form_data['factor'] = self.duo_factor
form_data['device'] = preferred_device
form_data['postAuthDestination'] = 'OIDC_EXIT'
if self.duo_passcode:
form_data['passcode'] = self.duo_passcode

return form_action, form_data

@staticmethod
def _find_device_to_use(doc):
device = doc.find('.//input[@name="preferred_device"]').get('value')
if device is None or device == '':
device = doc.find('.//select[@name="device"]/option').get('value')
return device

@staticmethod
def _get_duo_universal_plugin_form_data(response):
""" Get form data to post when submitting the Duo plugin_form """

doc = html5lib.parse(response.content, namespaceHTMLElements=False)
form_data = {}
for field in doc.iterfind('.//form[@id="plugin_form"]/input'):
form_data[field.get('name')] = field.get('value')

return form_data
3 changes: 3 additions & 0 deletions gimme_aws_creds/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,9 @@ def okta(self):
if self.conf_dict.get('preferred_mfa_type'):
okta.set_preferred_mfa_type(self.conf_dict['preferred_mfa_type'])

if self.conf_dict.get('duo_universal_factor'):
okta.set_duo_universal_factor(self.conf_dict.get('duo_universal_factor'))

if self.config.mfa_code is not None:
okta.set_mfa_code(self.config.mfa_code)
elif self.conf_dict.get('okta_mfa_code'):
Expand Down
73 changes: 49 additions & 24 deletions gimme_aws_creds/okta_classic.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from gimme_aws_creds.u2f import FactorU2F
from gimme_aws_creds.webauthn import WebAuthnClient, FakeAssertion
from . import errors, ui, version, duo
from .duo_universal import OktaDuoUniversal
from .errors import GimmeAWSCredsMFAEnrollStatus
from .registered_authenticators import RegisteredAuthenticators

Expand Down Expand Up @@ -62,6 +63,7 @@ def __init__(self, gac_ui, okta_org_url, verify_ssl_certs=True, device_token=Non
self._username = None
self._password = None
self._preferred_mfa_type = None
self._duo_universal_factor = 'Duo Push'
self._mfa_code = None
self._remember_device = None

Expand Down Expand Up @@ -104,6 +106,9 @@ def set_preferred_mfa_type(self, preferred_mfa_type):
def set_mfa_code(self, mfa_code):
self._mfa_code = mfa_code

def set_duo_universal_factor(self, duo_universal_factor):
self._duo_universal_factor = duo_universal_factor

def set_remember_device(self, remember_device):
self._remember_device = bool(remember_device)

Expand Down Expand Up @@ -158,30 +163,33 @@ def auth_session(self, **kwargs):
""" Authenticate the user and return the Okta Session ID and username"""
login_response = self.auth()

session_url = self._okta_org_url + '/login/sessionCookieRedirect'

if 'redirect_uri' not in kwargs:
redirect_uri = 'http://localhost:8080/login'
if 'userSession' in login_response:
return login_response['userSession']
else:
redirect_uri = kwargs['redirect_uri']
session_url = self._okta_org_url + '/login/sessionCookieRedirect'

params = {
'token': login_response['sessionToken'],
'redirectUrl': redirect_uri
}
if 'redirect_uri' not in kwargs:
redirect_uri = 'http://localhost:8080/login'
else:
redirect_uri = kwargs['redirect_uri']

response = self._http_client.get(
session_url,
params=params,
headers=self._get_headers(),
verify=self._verify_ssl_certs,
allow_redirects=False
)
return {
"username": login_response['_embedded']['user']['profile']['login'],
"session": response.cookies['sid'],
"device_token": self._http_client.cookies['DT']
}
params = {
'token': login_response['sessionToken'],
'redirectUrl': redirect_uri
}

response = self._http_client.get(
session_url,
params=params,
headers=self._get_headers(),
verify=self._verify_ssl_certs,
allow_redirects=False
)
return {
"username": login_response['_embedded']['user']['profile']['login'],
"session": response.cookies['sid'],
"device_token": self._http_client.cookies['DT']
}

def auth_oauth(self, client_id, **kwargs):
""" Login to Okta and retrieve access token, ID token or both """
Expand Down Expand Up @@ -451,6 +459,19 @@ def _login_send_push(self, state_token, factor):
if 'sessionToken' in response_data:
return {'stateToken': None, 'sessionToken': response_data['sessionToken'], 'apiResponse': response_data}

def _login_duo_universal(self, state_token, factor):
duo_passcode = None
if self._duo_universal_factor == 'Passcode':
duo_passcode = self.ui.input(message='Duo Passcode: ')
duo_client = OktaDuoUniversal(self.ui,
self._http_client,
state_token,
factor,
self._remember_device,
self._duo_universal_factor,
duo_passcode)
return duo_client.do_auth()

def _login_input_webauthn_challenge(self, state_token, factor):
""" Retrieve nonce """
response = self._http_client.post(
Expand Down Expand Up @@ -593,6 +614,8 @@ def _login_multi_factor(self, state_token, login_data):
return self._login_input_webauthn_challenge(state_token, factor)
elif factor['factorType'] == 'token:hardware':
return self._login_input_mfa_challenge(state_token, factor['_links']['verify']['href'])
elif factor['factorType'] == 'claims_provider':
return self._login_duo_universal(state_token, factor)

def _login_input_mfa_challenge(self, state_token, next_url):
""" Submit verification code for SMS or TOTP authentication methods"""
Expand Down Expand Up @@ -717,7 +740,7 @@ def _check_webauthn_result(self, state_token, login_data):
else:
return {'stateToken': None, 'sessionToken': None, 'apiResponse': response_data}

def get_saml_response(self, url, auth_session = None):
def get_saml_response(self, url, auth_session=None):
""" return the base64 SAML value object from the SAML Response"""
response = self._http_client.get(url, verify=self._verify_ssl_certs)
response.raise_for_status()
Expand Down Expand Up @@ -888,6 +911,8 @@ def _build_factor_name(self, factor):
return factor['factorType'] + ": " + factor_name
elif factor['factorType'] == 'token:hardware':
return factor['factorType'] + ": " + factor['provider']
elif factor['factorType'] == 'claims_provider':
return factor['factorType'] + ": " + factor['vendorName']

else:
return "Unknown MFA type: " + factor['factorType']
Expand Down Expand Up @@ -1063,7 +1088,7 @@ def _introspect_factors(self, state_token):
@staticmethod
def _extract_state_token_from_http_response(http_res):
# extract the stateToken from a javascript variable
state_token_re = re.search(r"var stateToken = '(.*)';", http_res.text)
state_token_re = re.search(r"var stateToken = '(.*)';", http_res.text)
if state_token_re is not None:
return decode(state_token_re.group(1), "unicode-escape")

Expand All @@ -1074,4 +1099,4 @@ def _extract_state_token_from_http_response(http_res):
state_token_re = re.search(r"stateToken=(.*?[ \"])", http_res.text)
if state_token_re is not None:
pre_state_token = decode(state_token_re.group(1), "unicode-escape")
return pre_state_token.rstrip('\"')
return pre_state_token.rstrip('\"')
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ okta>=0.0.4,<1.0.0
ctap-keyring-device==1.0.6
pyjwt>=2.4.0,<3.0.0
urllib3>=1.26.0,<2.0.0
html5lib>=1.1,<2.0.0
furl>=2.1.3,<3.0.0
8 changes: 8 additions & 0 deletions tests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import os


def read_fixture(file_name):
"""Read a fixture file"""
fixture_path = os.path.join(os.path.dirname(__file__), 'fixtures', file_name)
with open(fixture_path, 'r', encoding='utf-8') as file:
return file.read()
Loading

0 comments on commit 9f61e8f

Please sign in to comment.