Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support 2FA in Bitwarden #178

Merged
merged 1 commit into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions skyvern/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,11 @@ def __init__(self, message: str) -> None:
super().__init__(f"Error listing items in Bitwarden: {message}")


class BitwardenTOTPError(BitwardenBaseError):
def __init__(self, message: str) -> None:
super().__init__(f"Error generating TOTP in Bitwarden: {message}")


class BitwardenLogoutError(BitwardenBaseError):
def __init__(self, message: str) -> None:
super().__init__(f"Error logging out of Bitwarden: {message}")
135 changes: 84 additions & 51 deletions skyvern/forge/sdk/services/bitwarden.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,32 @@
import json
import os
import subprocess
from enum import StrEnum

import structlog

from skyvern.exceptions import BitwardenListItemsError, BitwardenLoginError, BitwardenLogoutError, BitwardenUnlockError
from skyvern.exceptions import (
BitwardenListItemsError,
BitwardenLoginError,
BitwardenLogoutError,
BitwardenTOTPError,
BitwardenUnlockError,
)

LOG = structlog.get_logger()


class BitwardenConstants(StrEnum):
CLIENT_ID = "BW_CLIENT_ID"
CLIENT_SECRET = "BW_CLIENT_SECRET"
MASTER_PASSWORD = "BW_MASTER_PASSWORD"
URL = "BW_URL"

USERNAME = "BW_USERNAME"
PASSWORD = "BW_PASSWORD"
TOTP = "BW_TOTP"


class BitwardenService:
@staticmethod
def run_command(command: list[str], additional_env: dict[str, str] | None = None) -> subprocess.CompletedProcess:
Expand All @@ -34,57 +52,72 @@ def get_secret_value_from_url(
Get the secret value from the Bitwarden CLI.
"""
# Step 1: Set up environment variables and log in
env = {"BW_CLIENTID": client_id, "BW_CLIENTSECRET": client_secret, "BW_PASSWORD": master_password}
login_command = ["bw", "login", "--apikey"]
login_result = BitwardenService.run_command(login_command, env)

# Print both stdout and stderr for debugging
if login_result.stderr:
raise BitwardenLoginError(login_result.stderr)

# Step 2: Unlock the vault
unlock_command = ["bw", "unlock", "--passwordenv", "BW_PASSWORD"]
unlock_result = BitwardenService.run_command(unlock_command, env)

if unlock_result.stderr:
raise BitwardenUnlockError(unlock_result.stderr)

# Extract session key
try:
session_key = unlock_result.stdout.split('"')[1]
except IndexError:
raise BitwardenUnlockError("Unable to extract session key.")

if not session_key:
raise BitwardenUnlockError("Session key is empty.")

# Step 3: Retrieve the items
list_command = ["bw", "list", "items", "--url", url, "--session", session_key]
items_result = BitwardenService.run_command(list_command)

if items_result.stderr:
raise BitwardenListItemsError(items_result.stderr)

# Parse the items and extract credentials
try:
items = json.loads(items_result.stdout)
except json.JSONDecodeError:
raise BitwardenListItemsError("Failed to parse items JSON. Output: " + items_result.stdout)

if not items:
raise BitwardenListItemsError("No items found in Bitwarden.")

credentials = [
{"username": item["login"]["username"], "password": item["login"]["password"]}
for item in items
if "login" in item
]

# Step 4: Log out
BitwardenService.logout()

# Todo: Handle multiple credentials, for now just return the last one
return credentials[-1] if credentials else {}
env = {"BW_CLIENTID": client_id, "BW_CLIENTSECRET": client_secret, "BW_PASSWORD": master_password}
login_command = ["bw", "login", "--apikey"]
login_result = BitwardenService.run_command(login_command, env)

# Print both stdout and stderr for debugging
if login_result.stderr:
raise BitwardenLoginError(login_result.stderr)

# Step 2: Unlock the vault
unlock_command = ["bw", "unlock", "--passwordenv", "BW_PASSWORD"]
unlock_result = BitwardenService.run_command(unlock_command, env)

# This is a part of Bitwarden's client-side telemetry
# TODO -- figure out how to disable this telemetry so we never get this error
# https://github.com/bitwarden/clients/blob/9d10825dbd891c0f41fe1b4c4dd3ca4171f63be5/libs/common/src/services/api.service.ts#L1473
if unlock_result.stderr and "Event post failed" not in unlock_result.stderr:
raise BitwardenUnlockError(unlock_result.stderr)

# Extract session key
try:
session_key = unlock_result.stdout.split('"')[1]
except IndexError:
raise BitwardenUnlockError("Unable to extract session key.")

if not session_key:
raise BitwardenUnlockError("Session key is empty.")

# Step 3: Retrieve the items
list_command = ["bw", "list", "items", "--url", url, "--session", session_key]
items_result = BitwardenService.run_command(list_command)

if items_result.stderr and "Event post failed" not in items_result.stderr:
raise BitwardenListItemsError(items_result.stderr)

# Parse the items and extract credentials
try:
items = json.loads(items_result.stdout)
except json.JSONDecodeError:
raise BitwardenListItemsError("Failed to parse items JSON. Output: " + items_result.stdout)

if not items:
raise BitwardenListItemsError("No items found in Bitwarden.")

totp_command = ["bw", "get", "totp", url, "--session", session_key]
totp_result = BitwardenService.run_command(totp_command)

if totp_result.stderr and "Event post failed" not in totp_result.stderr:
LOG.warning("Bitwarden TOTP Error", error=totp_result.stderr, e=BitwardenTOTPError(totp_result.stderr))
totp_code = totp_result.stdout

credentials: list[dict[str, str]] = [
{
BitwardenConstants.USERNAME: item["login"]["username"],
BitwardenConstants.PASSWORD: item["login"]["password"],
BitwardenConstants.TOTP: totp_code,
}
for item in items
if "login" in item
]

# Todo: Handle multiple credentials, for now just return the last one
return credentials[-1] if credentials else {}
finally:
# Step 4: Log out
BitwardenService.logout()

@staticmethod
def logout() -> None:
Expand Down
27 changes: 24 additions & 3 deletions skyvern/forge/sdk/workflow/context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from skyvern.exceptions import BitwardenBaseError, WorkflowRunContextNotInitialized
from skyvern.forge.sdk.api.aws import AsyncAWSClient
from skyvern.forge.sdk.services.bitwarden import BitwardenService
from skyvern.forge.sdk.services.bitwarden import BitwardenConstants, BitwardenService
from skyvern.forge.sdk.workflow.exceptions import OutputParameterKeyCollisionError
from skyvern.forge.sdk.workflow.models.parameter import (
PARAMETER_TYPE,
Expand Down Expand Up @@ -88,6 +88,18 @@ def get_original_secret_value_or_none(self, secret_id_or_value: Any) -> Any:
return self.secrets.get(secret_id_or_value)
return None

def get_secrets_from_password_manager(self) -> dict[str, Any]:
"""
Get the secrets from the password manager. The secrets dict will contain the actual secret values.
"""
secret_credentials = BitwardenService.get_secret_value_from_url(
url=self.secrets[BitwardenConstants.URL],
client_secret=self.secrets[BitwardenConstants.CLIENT_SECRET],
client_id=self.secrets[BitwardenConstants.CLIENT_ID],
master_password=self.secrets[BitwardenConstants.MASTER_PASSWORD],
)
return secret_credentials

@staticmethod
def generate_random_secret_id() -> str:
return f"secret_{uuid.uuid4()}"
Expand Down Expand Up @@ -138,17 +150,26 @@ async def register_parameter_value(
url,
)
if secret_credentials:
self.secrets[BitwardenConstants.URL] = url
self.secrets[BitwardenConstants.CLIENT_SECRET] = client_secret
self.secrets[BitwardenConstants.CLIENT_ID] = client_id
self.secrets[BitwardenConstants.MASTER_PASSWORD] = master_password

random_secret_id = self.generate_random_secret_id()
# username secret
username_secret_id = f"{random_secret_id}_username"
self.secrets[username_secret_id] = secret_credentials["username"]
self.secrets[username_secret_id] = secret_credentials[BitwardenConstants.USERNAME]
# password secret
password_secret_id = f"{random_secret_id}_password"
self.secrets[password_secret_id] = secret_credentials["password"]
self.secrets[password_secret_id] = secret_credentials[BitwardenConstants.PASSWORD]

totp_secret_id = f"{random_secret_id}_totp"
self.secrets[totp_secret_id] = BitwardenConstants.TOTP

self.values[parameter.key] = {
"username": username_secret_id,
"password": password_secret_id,
"totp": totp_secret_id,
}
except BitwardenBaseError as e:
BitwardenService.logout()
Expand Down
5 changes: 5 additions & 0 deletions skyvern/webeye/actions/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from skyvern.forge.sdk.api.files import download_file
from skyvern.forge.sdk.models import Step
from skyvern.forge.sdk.schemas.tasks import Task
from skyvern.forge.sdk.services.bitwarden import BitwardenConstants
from skyvern.forge.sdk.settings_manager import SettingsManager
from skyvern.webeye.actions import actions
from skyvern.webeye.actions.actions import Action, ActionType, ClickAction, ScrapeResult, UploadFileAction, WebAction
Expand Down Expand Up @@ -434,6 +435,10 @@ def get_actual_value_of_parameter_if_secret(task: Task, parameter: str) -> Any:

workflow_run_context = app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context(task.workflow_run_id)
secret_value = workflow_run_context.get_original_secret_value_or_none(parameter)

if secret_value == BitwardenConstants.TOTP:
secrets = workflow_run_context.get_secrets_from_password_manager()
secret_value = secrets[BitwardenConstants.TOTP]
return secret_value if secret_value is not None else parameter


Expand Down
5 changes: 3 additions & 2 deletions skyvern/webeye/scraper/domUtils.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Commands for manipulating rects.
// Want to debug this? Run chromium, go to sources, and create a new snippet with the code in domUtils.js
class Rect {
// Create a rect given the top left and bottom right corners.
static create(x1, y1, x2, y2) {
Expand Down Expand Up @@ -273,8 +274,8 @@ function hasWidgetRole(element) {

function isInteractableInput(element) {
const tagName = element.tagName.toLowerCase();
const type = element.getAttribute("type");
if (tagName !== "input" || !type) {
const type = element.getAttribute("type") ?? "text"; // Default is text: https://www.w3schools.com/html/html_form_input_types.asp
if (tagName !== "input") {
// let other checks decide
return false;
}
Expand Down
Loading