Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: implement new google auth workflow with TOTP #281

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 195 additions & 0 deletions aws_google_auth/google.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,118 @@ def do_login(self):

# Collect information from the page source
first_page = BeautifulSoup(sess.text, 'html.parser')

if first_page.find('form', {'id': 'gaia_loginform'}):
logging.info("Handling old-style login flow")
self.do_login_old(first_page)
else:
logging.info("Handling new-style login flow")
self.do_login_new(first_page)


def do_login_new(self, first_page):
payload = {}

form = first_page.find('form')
self.cont = first_page.find('input', {'name': 'identifier'}).get('value')
account_login_url = 'https://accounts.google.com' + form.get('action')
payload['identifier'] = self.config.username

for tag in form.find_all('input'):
if tag.get('name') is None:
continue
if tag.get('name') == 'identifier':
continue

payload[tag.get('name')] = tag.get('value')


if self.config.bg_response:
payload['bgresponse'] = self.config.bg_response

if payload.get('PersistentCookie', None) is not None:
payload['PersistentCookie'] = 'yes'

if payload.get('TrustDevice', None) is not None:
payload['TrustDevice'] = 'on'

# POST to account login info page, to solve captcha
sess = self.post(account_login_url, data=payload)

self.session.headers['Referer'] = sess.url

# Collect ProfileInformation, SessionState, signIn, and Password Challenge URL
challenge_page = BeautifulSoup(sess.text, 'html.parser')

if challenge_page.find('img', {'id': 'captchaimg'}):
self.session.headers['Referer'] = sess.url

sess = self.handle_captcha_new(challenge_page, payload)

challenge_page = BeautifulSoup(sess.text, 'html.parser')
form = challenge_page.find('form')
passwd_challenge_url = 'https://accounts.google.com' + form.get('action')

for tag in form.find_all('input'):
if tag.get('name') is None:
continue

payload[tag.get('name')] = tag.get('value')

# Update the payload
payload['Passwd'] = self.config.password

# Set bg_response in request payload to passwd challenge
if self.config.bg_response:
payload['bgresponse'] = self.config.bg_response

# POST to Authenticate Password
sess = self.post(passwd_challenge_url, data=payload)

response_page = BeautifulSoup(sess.text, 'html.parser')
error = response_page.find(class_='error-msg')

# Were there any errors logging in? Could be invalid username or password
# There could also sometimes be a Captcha, which means Google thinks you,
# or someone using the same outbound IP address as you, is a bot.
if error is not None:
raise ExpectedGoogleException('Invalid username or password')

if "signin/rejected" in sess.url:
raise ExpectedGoogleException(u'''Default value of parameter `bgresponse` has not accepted.
Please visit login URL {}, open the web inspector and execute document.bg.invoke() in the console.
Then, set --bg-response to the function output.'''.format(self.login_url))

self.check_extra_step(response_page)

self.session.headers['Referer'] = sess.url

if "selectchallenge/" in sess.url:
raise NotImplementedError('handle_selectchallenge not updated')

# Was there an MFA challenge?
if "challenge/totp?" in sess.url:
sess = self.handle_totp_new(sess, payload)
elif "challenge/ipp?" in sess.url:
raise NotImplementedError('handle_ipp not updated')
elif "challenge/az?" in sess.url:
raise NotImplementedError('handle_az not updated')
elif "challenge/sk?" in sess.url:
raise NotImplementedError('handle_sk not updated')
elif "challenge/iap?" in sess.url:
raise NotImplementedError('handle_iap not updated')
elif "challenge/dp?" in sess.url:
sess = self.handle_dp_new(sess)
elif "challenge/ootp/5" in sess.url:
raise NotImplementedError(
'Offline Google App OOTP not implemented')

# ... there are different URLs for backup codes (printed)
# and security keys (eg yubikey) as well
# save for later
self.session_state = sess

def do_login_old(self, first_page):
# gxf = first_page.find('input', {'name': 'gxf'}).get('value')
self.cont = first_page.find('input', {'name': 'continue'}).get('value')
# page = first_page.find('input', {'name': 'Page'}).get('value')
Expand Down Expand Up @@ -368,6 +480,48 @@ def parse_saml(self):

return base64.b64decode(saml_element)

def handle_captcha_new(self, response_page, payload):
passwd_challenge_url = "https://accounts.google.com" + response_page.find('form').get('action')
captcha_url = "https://accounts.google.com" + response_page.find('img', {'id': 'captchaimg'}).get('src')

open_image = True

# Check if there is a display utility installed as Image.open(f).show() do not raise any exception if not
# if neither xv or display are available just display the URL for the user to visit.
if os.name == 'posix' and sys.platform != 'darwin':
if find_executable('xv') is None and find_executable('display') is None:
open_image = False

print("Please visit the following URL to view your CAPTCHA: {}".format(captcha_url))

if open_image:
try:
with requests.get(captcha_url) as url:
with io.BytesIO(url.content) as f:
Image.open(f).show()
except Exception:
pass

try:
captcha_input = raw_input("Captcha (case insensitive): ") or None
except NameError:
captcha_input = input("Captcha (case insensitive): ") or None

# Add form inputs
for tag in response_page.find_all('input'):
if tag.get('name') is None:
continue
payload[tag.get('name')] = tag.get('value')

# Set bg_response in request payload to passwd challenge
if self.config.bg_response:
payload['bgresponse'] = self.config.bg_response

# Update the payload
payload['ca'] = captcha_input

return self.post(passwd_challenge_url, data=payload)

def handle_captcha(self, sess, payload):
response_page = BeautifulSoup(sess.text, 'html.parser')

Expand Down Expand Up @@ -660,6 +814,29 @@ def check_prompt_code(response):
if num_code:
print("numerical code for prompt: {}".format(num_code.string))

def handle_totp_new(self, sess, payload):
while True:
response_page = BeautifulSoup(sess.text, 'html.parser')
challenge_url = 'https://accounts.google.com' + response_page.find('form').get('action')

mfa_token = input("MFA token: ") or None

if not mfa_token:
raise ValueError(
"MFA token required for {} but none supplied.".format(
self.config.username))

# Update the payload
payload['Pin'] = mfa_token

# Submit TOTP
sess = self.post(challenge_url, data=payload)

if "SAMLResponse" in sess.text:
break

return sess

def handle_totp(self, sess):
response_page = BeautifulSoup(sess.text, 'html.parser')
tl = response_page.find('input', {'name': 'TL'}).get('value')
Expand Down Expand Up @@ -709,6 +886,24 @@ def handle_dp(self, sess):
# Submit Configuration
return self.post(challenge_url, data=payload)

def handle_dp_new(self, sess):
response_page = BeautifulSoup(sess.text, 'html.parser')

input("Check your phone - after you have confirmed response press ENTER to continue.") or None

form = response_page.find('form')
challenge_url = 'https://accounts.google.com' + form.get('action')

payload = {}
for tag in form.find_all('input'):
if tag.get('name') is None:
continue

payload[tag.get('name')] = tag.get('value')

# Submit Configuration
return self.post(challenge_url, data=payload)

def handle_iap(self, sess):
response_page = BeautifulSoup(sess.text, 'html.parser')
challenge_url = sess.url.split("?")[0]
Expand Down