diff --git a/examples/sdk_examples/device_management/admin_account_lock_device.py b/examples/sdk_examples/device_management/admin_account_lock_device.py new file mode 100644 index 0000000..acf106a --- /dev/null +++ b/examples/sdk_examples/device_management/admin_account_lock_device.py @@ -0,0 +1,549 @@ +import getpass +import sqlite3 +import json +import logging +from typing import Dict, Optional + +import fido2 +import webbrowser + +from keepersdk import errors, utils +from keepersdk.authentication import ( + device_management, + configuration, + endpoint, + keeper_auth, + login_auth, +) +from keepersdk.authentication.yubikey import ( + IKeeperUserInteraction, + yubikey_authenticate, +) +from keepersdk.constants import KEEPER_PUBLIC_HOSTS +from keepersdk.vault import sqlite_storage, vault_online, ksm_management + +try: + import pyperclip +except ImportError: + pyperclip = None + +logger = utils.get_logger() +logger.setLevel(logging.INFO) +if not logger.handlers: + _handler = logging.StreamHandler() + _handler.setLevel(logging.INFO) + _handler.setFormatter( + logging.Formatter("%(asctime)s - %(levelname)s - %(name)s - %(message)s") + ) + logger.addHandler(_handler) + + +class FidoCliInteraction(fido2.client.UserInteraction, IKeeperUserInteraction): + def output_text(self, text: str) -> None: + print(text) + + def prompt_up(self) -> None: + print( + "\nTouch the flashing Security key to authenticate or " + "press Ctrl-C to resume with the primary two factor authentication..." + ) + + def request_pin(self, permissions, rd_id): + return getpass.getpass("Enter Security Key PIN: ") + + def request_uv(self, permissions, rd_id): + print("User Verification required.") + return True + + +# Two-factor duration codes (used by LoginFlow) +_TWO_FACTOR_DURATION_CODES: Dict[login_auth.TwoFactorDuration, str] = { + login_auth.TwoFactorDuration.EveryLogin: "login", + login_auth.TwoFactorDuration.Every12Hours: "12_hours", + login_auth.TwoFactorDuration.EveryDay: "24_hours", + login_auth.TwoFactorDuration.Every30Days: "30_days", + login_auth.TwoFactorDuration.Forever: "forever", +} + + +class LoginFlow: + """ + Handles the full login process: server selection, username, password, + device approval, 2FA, SSO data key, and SSO token. + """ + + def __init__(self) -> None: + self._config = configuration.JsonConfigurationStorage() + self._logged_in_with_persistent = True + self._endpoint: Optional[endpoint.KeeperEndpoint] = None + + @property + def endpoint(self) -> Optional[endpoint.KeeperEndpoint]: + return self._endpoint + + @property + def logged_in_with_persistent(self) -> bool: + """True if login succeeded by resuming an existing persistent session (no step loop).""" + return self._logged_in_with_persistent + + def run(self) -> Optional[keeper_auth.KeeperAuth]: + """ + Run the login flow. + + Returns: + Authenticated Keeper context, or None if login fails. + """ + server = self._ensure_server() + keeper_endpoint = endpoint.KeeperEndpoint(self._config, server) + self._endpoint = keeper_endpoint + login_auth_context = login_auth.LoginAuth(keeper_endpoint) + + username = self._config.get().last_login or input("Enter username: ") + login_auth_context.resume_session = True + login_auth_context.login(username) + + while not login_auth_context.login_step.is_final(): + step = login_auth_context.login_step + if isinstance(step, login_auth.LoginStepDeviceApproval): + self._handle_device_approval(step) + elif isinstance(step, login_auth.LoginStepTwoFactor): + self._handle_two_factor(step) + elif isinstance(step, login_auth.LoginStepPassword): + self._handle_password(step) + elif isinstance(step, login_auth.LoginStepSsoToken): + self._handle_sso_token(step) + elif isinstance(step, login_auth.LoginStepSsoDataKey): + self._handle_sso_data_key(step) + elif isinstance(step, login_auth.LoginStepError): + print(f"Login error: ({step.code}) {step.message}") + return None + else: + raise NotImplementedError( + f"Unsupported login step type: {type(step).__name__}" + ) + self._logged_in_with_persistent = False + + if self._logged_in_with_persistent: + print("Successfully logged in with persistent login") + + if isinstance(login_auth_context.login_step, login_auth.LoginStepConnected): + return login_auth_context.login_step.take_keeper_auth() + + return None + + def _ensure_server(self) -> str: + if not self._config.get().last_server: + print("Available server options:") + for region, host in KEEPER_PUBLIC_HOSTS.items(): + print(f" {region}: {host}") + server = ( + input("Enter server (default: keepersecurity.com): ").strip() + or "keepersecurity.com" + ) + self._config.get().last_server = server + else: + server = self._config.get().last_server + return server + + def _handle_device_approval( + self, step: login_auth.LoginStepDeviceApproval + ) -> None: + """Device approval: same options as keepercli verify_device (email, keeper push, 2FA, resume).""" + menu = [ + ("email_send", "to send email"), + ("email_code=", "to validate verification code sent via email"), + ("keeper_push", "to send Keeper Push notification"), + ("2fa_send", "to send 2FA code"), + ("2fa_code=", "to validate a code provided by 2FA application"), + ("", "to resume"), + ] + lines = ["Approve by selecting a method below"] + lines.extend(f" {cmd} {desc}" for cmd, desc in menu) + print("\n".join(lines)) + + selection = input("Type your selection or to resume: ").strip() + if selection is None: + return + if selection in ("email_send", "es"): + step.send_push(channel=login_auth.DeviceApprovalChannel.Email) + print("An email with instructions has been sent. Press when approved.") + elif selection.startswith("email_code="): + code = selection[len("email_code=") :] + step.send_code(channel=login_auth.DeviceApprovalChannel.Email, code=code) + print("Successfully verified email code.") + elif selection in ("keeper_push", "kp"): + step.send_push(channel=login_auth.DeviceApprovalChannel.KeeperPush) + print( + "Successfully made a push notification to the approved device. " + "Press when approved." + ) + elif selection in ("2fa_send", "2fs"): + step.send_push(channel=login_auth.DeviceApprovalChannel.TwoFactor) + print("2FA code was sent.") + elif selection.startswith("2fa_code="): + code = selection[len("2fa_code=") :] + step.send_code(channel=login_auth.DeviceApprovalChannel.TwoFactor, code=code) + print("Successfully verified 2FA code.") + else: + step.resume() + + def _handle_password(self, step: login_auth.LoginStepPassword) -> None: + """Password step: prompt for password and retry on auth_failed (aligned with keepercli handle_verify_password).""" + print(f"\nEnter password for {step.username}") + while True: + password = getpass.getpass("Password: ") + if not password: + raise KeyboardInterrupt() + try: + step.verify_password(password) + break + except errors.KeeperApiError as kae: + print( + "Invalid email or password combination, please re-enter." + if kae.result_code == "auth_failed" + else kae.message + ) + + def _handle_two_factor(self, step: login_auth.LoginStepTwoFactor) -> None: + channels = [ + x + for x in step.get_channels() + if x.channel_type != login_auth.TwoFactorChannel.Other + ] + menu = [] + for i, channel in enumerate(channels): + desc = self._two_factor_channel_desc(channel.channel_type) + menu.append( + ( + str(i + 1), + f"{desc} {channel.channel_name} {channel.phone}", + ) + ) + menu.append(("q", "Quit authentication attempt and return to Commander prompt.")) + + lines = ["", "This account requires 2FA Authentication"] + lines.extend(f" {a}. {t}" for a, t in menu) + print("\n".join(lines)) + + while True: + selection = input("Selection: ") + if selection is None: + return + if selection in ("q", "Q"): + raise KeyboardInterrupt() + try: + assert selection.isnumeric() + idx = 1 if not selection else int(selection) + assert 1 <= idx <= len(channels) + channel = channels[idx - 1] + desc = self._two_factor_channel_desc(channel.channel_type) + print(f"Selected {idx}. {desc}") + except AssertionError: + print( + "Invalid entry, additional factors of authentication shown " + "may be configured if not currently enabled." + ) + continue + + if channel.channel_type in ( + login_auth.TwoFactorChannel.TextMessage, + login_auth.TwoFactorChannel.KeeperDNA, + login_auth.TwoFactorChannel.DuoSecurity, + ): + action = next( + ( + x + for x in step.get_channel_push_actions(channel.channel_uid) + if x + in ( + login_auth.TwoFactorPushAction.TextMessage, + login_auth.TwoFactorPushAction.KeeperDna, + ) + ), + None, + ) + if action: + step.send_push(channel.channel_uid, action) + + if channel.channel_type == login_auth.TwoFactorChannel.SecurityKey: + try: + challenge = json.loads(channel.challenge) + signature = yubikey_authenticate(challenge, FidoCliInteraction()) + if signature: + print("Verified Security Key.") + step.send_code(channel.channel_uid, signature) + return + except Exception as e: + logger.error(e) + continue + + # 2FA code path + step.duration = min(step.duration, channel.max_expiration) + available_dura = sorted( + x for x in _TWO_FACTOR_DURATION_CODES if x <= channel.max_expiration + ) + available_codes = [ + _TWO_FACTOR_DURATION_CODES.get(x) or "login" for x in available_dura + ] + + while True: + mfa_desc = self._two_factor_duration_desc(step.duration) + prompt_exp = ( + f"\n2FA Code Duration: {mfa_desc}.\n" + f"To change duration: 2fa_duration={'|'.join(available_codes)}" + ) + print(prompt_exp) + + selection = input("\nEnter 2FA Code or Duration: ") + if not selection: + return + if selection in available_codes: + step.duration = self._two_factor_code_to_duration(selection) + elif selection.startswith("2fa_duration="): + code = selection[len("2fa_duration=") :] + if code in available_codes: + step.duration = self._two_factor_code_to_duration(code) + else: + print(f"Invalid 2FA duration: {code}") + else: + try: + step.send_code(channel.channel_uid, selection) + print("Successfully verified 2FA Code.") + return + except errors.KeeperApiError as kae: + print(f"Invalid 2FA code: ({kae.result_code}) {kae.message}") + + def _handle_sso_data_key( + self, step: login_auth.LoginStepSsoDataKey + ) -> None: + menu = [ + ("1", "Keeper Push. Send a push notification to your device."), + ("2", "Admin Approval. Request your admin to approve this device."), + ("r", "Resume SSO authentication after device is approved."), + ("q", "Quit SSO authentication attempt and return to Commander prompt."), + ] + lines = ["Approve this device by selecting a method below:"] + lines.extend(f" {cmd:>3}. {text}" for cmd, text in menu) + print("\n".join(lines)) + + while True: + answer = input("Selection: ") + if answer is None: + return + if answer == "q": + raise KeyboardInterrupt() + if answer == "r": + step.resume() + break + if answer in ("1", "2"): + step.request_data_key( + login_auth.DataKeyShareChannel.KeeperPush + if answer == "1" + else login_auth.DataKeyShareChannel.AdminApproval + ) + else: + print(f'Action "{answer}" is not supported.') + + def _handle_sso_token(self, step: login_auth.LoginStepSsoToken) -> None: + menu = [ + ("a", "SSO User with a Master Password."), + ] + if pyperclip: + menu.append(("c", "Copy SSO Login URL to clipboard.")) + else: + menu.append(("u", "Show SSO Login URL.")) + try: + wb = webbrowser.get() + menu.append(("o", "Navigate to SSO Login URL with the default web browser.")) + except Exception: + wb = None + if pyperclip: + menu.append(("p", "Paste SSO Token from clipboard.")) + menu.append(("t", "Enter SSO Token manually.")) + menu.append(("q", "Quit SSO authentication attempt and return to Commander prompt.")) + + lines = [ + "", + "SSO Login URL:", + step.sso_login_url, + "Navigate to SSO Login URL with your browser and complete authentication.", + "Copy a returned SSO Token into clipboard." + + (" Paste that token into Commander." if pyperclip else " Then use option 't' to enter the token manually."), + 'NOTE: To copy SSO Token please click "Copy authentication token" ' + 'button on "SSO Connect" page.', + "", + ] + lines.extend(f" {a:>3}. {t}" for a, t in menu) + print("\n".join(lines)) + + while True: + token = input("Selection: ") + if token == "q": + raise KeyboardInterrupt() + if token == "a": + step.login_with_password() + return + if token == "c": + token = None + if pyperclip: + try: + pyperclip.copy(step.sso_login_url) + print("SSO Login URL is copied to clipboard.") + except Exception: + print("Failed to copy SSO Login URL to clipboard.") + else: + print("Clipboard not available (install pyperclip).") + elif token == "u": + token = None + if not pyperclip: + print("\nSSO Login URL:", step.sso_login_url, "\n") + else: + print("Unsupported menu option (use 'c' to copy URL).") + elif token == "o": + token = None + if wb: + try: + wb.open_new_tab(step.sso_login_url) + except Exception: + print("Failed to open web browser.") + elif token == "p": + if pyperclip: + try: + token = pyperclip.paste() + except Exception: + token = "" + print("Failed to paste from clipboard") + else: + token = None + print("Clipboard not available (use 't' to enter token manually).") + elif token == "t": + token = getpass.getpass("Enter SSO Token: ").strip() + else: + if len(token) < 10: + print(f"Unsupported menu option: {token}") + continue + + if token: + try: + step.set_sso_token(token) + break + except errors.KeeperApiError as kae: + print(f"SSO Login error: ({kae.result_code}) {kae.message}") + + @staticmethod + def _two_factor_channel_desc( + channel_type: login_auth.TwoFactorChannel, + ) -> str: + return { + login_auth.TwoFactorChannel.Authenticator: "TOTP (Google and Microsoft Authenticator)", + login_auth.TwoFactorChannel.TextMessage: "Send SMS Code", + login_auth.TwoFactorChannel.DuoSecurity: "DUO", + login_auth.TwoFactorChannel.RSASecurID: "RSA SecurID", + login_auth.TwoFactorChannel.SecurityKey: "WebAuthN (FIDO2 Security Key)", + login_auth.TwoFactorChannel.KeeperDNA: "Keeper DNA (Watch)", + login_auth.TwoFactorChannel.Backup: "Backup Code", + }.get(channel_type, "Not Supported") + + @staticmethod + def _two_factor_duration_desc( + duration: login_auth.TwoFactorDuration, + ) -> str: + return { + login_auth.TwoFactorDuration.EveryLogin: "Require Every Login", + login_auth.TwoFactorDuration.Forever: "Save on this Device Forever", + login_auth.TwoFactorDuration.Every12Hours: "Ask Every 12 hours", + login_auth.TwoFactorDuration.EveryDay: "Ask Every 24 hours", + login_auth.TwoFactorDuration.Every30Days: "Ask Every 30 days", + }.get(duration, "Require Every Login") + + @staticmethod + def _two_factor_code_to_duration( + text: str, + ) -> login_auth.TwoFactorDuration: + for dura, code in _TWO_FACTOR_DURATION_CODES.items(): + if code == text: + return dura + return login_auth.TwoFactorDuration.EveryLogin + + +def enable_persistent_login(keeper_auth_context: keeper_auth.KeeperAuth) -> None: + """ + Enable persistent login and register data key for device. + Sets persistent_login to on and logout_timer to 30 days. + """ + keeper_auth.set_user_setting(keeper_auth_context, 'persistent_login', '1') + keeper_auth.register_data_key_for_device(keeper_auth_context) + mins_per_day = 60 * 24 + timeout_in_minutes = mins_per_day * 30 # 30 days + keeper_auth.set_user_setting(keeper_auth_context, 'logout_timer', str(timeout_in_minutes)) + print("Persistent login turned on successfully and device registered") + + +def login(): + """ + Handle the login process including server selection, authentication, + and multi-factor authentication steps (device approval, password, 2FA + with channel selection and Security Key, SSO data key, SSO token). + + Returns: + tuple: (keeper_auth_context, keeper_endpoint) on success, or (None, None) if login fails. + """ + flow = LoginFlow() + keeper_auth_context = flow.run() + if keeper_auth_context and not flow.logged_in_with_persistent: + enable_persistent_login(keeper_auth_context) + keeper_endpoint = flow.endpoint if keeper_auth_context else None + return keeper_auth_context, keeper_endpoint + + +def print_admin_devices_table(devices): + if not devices: + print('\nNo devices found.') + return + print(f'\nAdmin Device List ({len(devices)} devices found)') + print('=' * 120) + print( + f"{'ID':<4} {'Enterprise User ID':<20} {'Device Name':<22} " + f"{'UI Category':<18} {'Device Status':<16} {'Login Status':<14} {'Last Accessed':<20}" + ) + print('-' * 120) + for d in devices: + last = d.last_accessed.strftime('%Y-%m-%d %H:%M:%S') if d.last_accessed else 'N/A' + print( + f"{d.list_index:<4} {d.enterprise_user_id:<20} {d.name[:21]:<22} " + f"{d.ui_category[:17]:<18} {d.device_status[:15]:<16} " + f"{d.login_status[:13]:<14} {last:<20}" + ) + print('-' * 120) + + +def main(): + keeper_auth_context, _ = login() + if not keeper_auth_context: + return + + # Fill in your values here (enterprise admin required). + enterprise_user_id = 0 + device_identifiers = [''] + + try: + print(f'Account-locking {len(device_identifiers)} device(s) for user {enterprise_user_id}...') + for name in device_management.account_lock_admin_user_devices( + keeper_auth_context, enterprise_user_id, device_identifiers + ): + print( + f"Device action successfully completed: '{name}' account locked " + f'for user {enterprise_user_id}' + ) + print(f'\nUpdated device list for user {enterprise_user_id}:') + print_admin_devices_table( + device_management.list_admin_devices(keeper_auth_context, [enterprise_user_id]) + ) + except Exception as e: + print(f'Error account-locking admin devices: {e}') + finally: + keeper_auth_context.close() + + +if __name__ == '__main__': + main() diff --git a/examples/sdk_examples/device_management/admin_account_unlock_device.py b/examples/sdk_examples/device_management/admin_account_unlock_device.py new file mode 100644 index 0000000..dc1eadb --- /dev/null +++ b/examples/sdk_examples/device_management/admin_account_unlock_device.py @@ -0,0 +1,549 @@ +import getpass +import sqlite3 +import json +import logging +from typing import Dict, Optional + +import fido2 +import webbrowser + +from keepersdk import errors, utils +from keepersdk.authentication import ( + device_management, + configuration, + endpoint, + keeper_auth, + login_auth, +) +from keepersdk.authentication.yubikey import ( + IKeeperUserInteraction, + yubikey_authenticate, +) +from keepersdk.constants import KEEPER_PUBLIC_HOSTS +from keepersdk.vault import sqlite_storage, vault_online, ksm_management + +try: + import pyperclip +except ImportError: + pyperclip = None + +logger = utils.get_logger() +logger.setLevel(logging.INFO) +if not logger.handlers: + _handler = logging.StreamHandler() + _handler.setLevel(logging.INFO) + _handler.setFormatter( + logging.Formatter("%(asctime)s - %(levelname)s - %(name)s - %(message)s") + ) + logger.addHandler(_handler) + + +class FidoCliInteraction(fido2.client.UserInteraction, IKeeperUserInteraction): + def output_text(self, text: str) -> None: + print(text) + + def prompt_up(self) -> None: + print( + "\nTouch the flashing Security key to authenticate or " + "press Ctrl-C to resume with the primary two factor authentication..." + ) + + def request_pin(self, permissions, rd_id): + return getpass.getpass("Enter Security Key PIN: ") + + def request_uv(self, permissions, rd_id): + print("User Verification required.") + return True + + +# Two-factor duration codes (used by LoginFlow) +_TWO_FACTOR_DURATION_CODES: Dict[login_auth.TwoFactorDuration, str] = { + login_auth.TwoFactorDuration.EveryLogin: "login", + login_auth.TwoFactorDuration.Every12Hours: "12_hours", + login_auth.TwoFactorDuration.EveryDay: "24_hours", + login_auth.TwoFactorDuration.Every30Days: "30_days", + login_auth.TwoFactorDuration.Forever: "forever", +} + + +class LoginFlow: + """ + Handles the full login process: server selection, username, password, + device approval, 2FA, SSO data key, and SSO token. + """ + + def __init__(self) -> None: + self._config = configuration.JsonConfigurationStorage() + self._logged_in_with_persistent = True + self._endpoint: Optional[endpoint.KeeperEndpoint] = None + + @property + def endpoint(self) -> Optional[endpoint.KeeperEndpoint]: + return self._endpoint + + @property + def logged_in_with_persistent(self) -> bool: + """True if login succeeded by resuming an existing persistent session (no step loop).""" + return self._logged_in_with_persistent + + def run(self) -> Optional[keeper_auth.KeeperAuth]: + """ + Run the login flow. + + Returns: + Authenticated Keeper context, or None if login fails. + """ + server = self._ensure_server() + keeper_endpoint = endpoint.KeeperEndpoint(self._config, server) + self._endpoint = keeper_endpoint + login_auth_context = login_auth.LoginAuth(keeper_endpoint) + + username = self._config.get().last_login or input("Enter username: ") + login_auth_context.resume_session = True + login_auth_context.login(username) + + while not login_auth_context.login_step.is_final(): + step = login_auth_context.login_step + if isinstance(step, login_auth.LoginStepDeviceApproval): + self._handle_device_approval(step) + elif isinstance(step, login_auth.LoginStepTwoFactor): + self._handle_two_factor(step) + elif isinstance(step, login_auth.LoginStepPassword): + self._handle_password(step) + elif isinstance(step, login_auth.LoginStepSsoToken): + self._handle_sso_token(step) + elif isinstance(step, login_auth.LoginStepSsoDataKey): + self._handle_sso_data_key(step) + elif isinstance(step, login_auth.LoginStepError): + print(f"Login error: ({step.code}) {step.message}") + return None + else: + raise NotImplementedError( + f"Unsupported login step type: {type(step).__name__}" + ) + self._logged_in_with_persistent = False + + if self._logged_in_with_persistent: + print("Successfully logged in with persistent login") + + if isinstance(login_auth_context.login_step, login_auth.LoginStepConnected): + return login_auth_context.login_step.take_keeper_auth() + + return None + + def _ensure_server(self) -> str: + if not self._config.get().last_server: + print("Available server options:") + for region, host in KEEPER_PUBLIC_HOSTS.items(): + print(f" {region}: {host}") + server = ( + input("Enter server (default: keepersecurity.com): ").strip() + or "keepersecurity.com" + ) + self._config.get().last_server = server + else: + server = self._config.get().last_server + return server + + def _handle_device_approval( + self, step: login_auth.LoginStepDeviceApproval + ) -> None: + """Device approval: same options as keepercli verify_device (email, keeper push, 2FA, resume).""" + menu = [ + ("email_send", "to send email"), + ("email_code=", "to validate verification code sent via email"), + ("keeper_push", "to send Keeper Push notification"), + ("2fa_send", "to send 2FA code"), + ("2fa_code=", "to validate a code provided by 2FA application"), + ("", "to resume"), + ] + lines = ["Approve by selecting a method below"] + lines.extend(f" {cmd} {desc}" for cmd, desc in menu) + print("\n".join(lines)) + + selection = input("Type your selection or to resume: ").strip() + if selection is None: + return + if selection in ("email_send", "es"): + step.send_push(channel=login_auth.DeviceApprovalChannel.Email) + print("An email with instructions has been sent. Press when approved.") + elif selection.startswith("email_code="): + code = selection[len("email_code=") :] + step.send_code(channel=login_auth.DeviceApprovalChannel.Email, code=code) + print("Successfully verified email code.") + elif selection in ("keeper_push", "kp"): + step.send_push(channel=login_auth.DeviceApprovalChannel.KeeperPush) + print( + "Successfully made a push notification to the approved device. " + "Press when approved." + ) + elif selection in ("2fa_send", "2fs"): + step.send_push(channel=login_auth.DeviceApprovalChannel.TwoFactor) + print("2FA code was sent.") + elif selection.startswith("2fa_code="): + code = selection[len("2fa_code=") :] + step.send_code(channel=login_auth.DeviceApprovalChannel.TwoFactor, code=code) + print("Successfully verified 2FA code.") + else: + step.resume() + + def _handle_password(self, step: login_auth.LoginStepPassword) -> None: + """Password step: prompt for password and retry on auth_failed (aligned with keepercli handle_verify_password).""" + print(f"\nEnter password for {step.username}") + while True: + password = getpass.getpass("Password: ") + if not password: + raise KeyboardInterrupt() + try: + step.verify_password(password) + break + except errors.KeeperApiError as kae: + print( + "Invalid email or password combination, please re-enter." + if kae.result_code == "auth_failed" + else kae.message + ) + + def _handle_two_factor(self, step: login_auth.LoginStepTwoFactor) -> None: + channels = [ + x + for x in step.get_channels() + if x.channel_type != login_auth.TwoFactorChannel.Other + ] + menu = [] + for i, channel in enumerate(channels): + desc = self._two_factor_channel_desc(channel.channel_type) + menu.append( + ( + str(i + 1), + f"{desc} {channel.channel_name} {channel.phone}", + ) + ) + menu.append(("q", "Quit authentication attempt and return to Commander prompt.")) + + lines = ["", "This account requires 2FA Authentication"] + lines.extend(f" {a}. {t}" for a, t in menu) + print("\n".join(lines)) + + while True: + selection = input("Selection: ") + if selection is None: + return + if selection in ("q", "Q"): + raise KeyboardInterrupt() + try: + assert selection.isnumeric() + idx = 1 if not selection else int(selection) + assert 1 <= idx <= len(channels) + channel = channels[idx - 1] + desc = self._two_factor_channel_desc(channel.channel_type) + print(f"Selected {idx}. {desc}") + except AssertionError: + print( + "Invalid entry, additional factors of authentication shown " + "may be configured if not currently enabled." + ) + continue + + if channel.channel_type in ( + login_auth.TwoFactorChannel.TextMessage, + login_auth.TwoFactorChannel.KeeperDNA, + login_auth.TwoFactorChannel.DuoSecurity, + ): + action = next( + ( + x + for x in step.get_channel_push_actions(channel.channel_uid) + if x + in ( + login_auth.TwoFactorPushAction.TextMessage, + login_auth.TwoFactorPushAction.KeeperDna, + ) + ), + None, + ) + if action: + step.send_push(channel.channel_uid, action) + + if channel.channel_type == login_auth.TwoFactorChannel.SecurityKey: + try: + challenge = json.loads(channel.challenge) + signature = yubikey_authenticate(challenge, FidoCliInteraction()) + if signature: + print("Verified Security Key.") + step.send_code(channel.channel_uid, signature) + return + except Exception as e: + logger.error(e) + continue + + # 2FA code path + step.duration = min(step.duration, channel.max_expiration) + available_dura = sorted( + x for x in _TWO_FACTOR_DURATION_CODES if x <= channel.max_expiration + ) + available_codes = [ + _TWO_FACTOR_DURATION_CODES.get(x) or "login" for x in available_dura + ] + + while True: + mfa_desc = self._two_factor_duration_desc(step.duration) + prompt_exp = ( + f"\n2FA Code Duration: {mfa_desc}.\n" + f"To change duration: 2fa_duration={'|'.join(available_codes)}" + ) + print(prompt_exp) + + selection = input("\nEnter 2FA Code or Duration: ") + if not selection: + return + if selection in available_codes: + step.duration = self._two_factor_code_to_duration(selection) + elif selection.startswith("2fa_duration="): + code = selection[len("2fa_duration=") :] + if code in available_codes: + step.duration = self._two_factor_code_to_duration(code) + else: + print(f"Invalid 2FA duration: {code}") + else: + try: + step.send_code(channel.channel_uid, selection) + print("Successfully verified 2FA Code.") + return + except errors.KeeperApiError as kae: + print(f"Invalid 2FA code: ({kae.result_code}) {kae.message}") + + def _handle_sso_data_key( + self, step: login_auth.LoginStepSsoDataKey + ) -> None: + menu = [ + ("1", "Keeper Push. Send a push notification to your device."), + ("2", "Admin Approval. Request your admin to approve this device."), + ("r", "Resume SSO authentication after device is approved."), + ("q", "Quit SSO authentication attempt and return to Commander prompt."), + ] + lines = ["Approve this device by selecting a method below:"] + lines.extend(f" {cmd:>3}. {text}" for cmd, text in menu) + print("\n".join(lines)) + + while True: + answer = input("Selection: ") + if answer is None: + return + if answer == "q": + raise KeyboardInterrupt() + if answer == "r": + step.resume() + break + if answer in ("1", "2"): + step.request_data_key( + login_auth.DataKeyShareChannel.KeeperPush + if answer == "1" + else login_auth.DataKeyShareChannel.AdminApproval + ) + else: + print(f'Action "{answer}" is not supported.') + + def _handle_sso_token(self, step: login_auth.LoginStepSsoToken) -> None: + menu = [ + ("a", "SSO User with a Master Password."), + ] + if pyperclip: + menu.append(("c", "Copy SSO Login URL to clipboard.")) + else: + menu.append(("u", "Show SSO Login URL.")) + try: + wb = webbrowser.get() + menu.append(("o", "Navigate to SSO Login URL with the default web browser.")) + except Exception: + wb = None + if pyperclip: + menu.append(("p", "Paste SSO Token from clipboard.")) + menu.append(("t", "Enter SSO Token manually.")) + menu.append(("q", "Quit SSO authentication attempt and return to Commander prompt.")) + + lines = [ + "", + "SSO Login URL:", + step.sso_login_url, + "Navigate to SSO Login URL with your browser and complete authentication.", + "Copy a returned SSO Token into clipboard." + + (" Paste that token into Commander." if pyperclip else " Then use option 't' to enter the token manually."), + 'NOTE: To copy SSO Token please click "Copy authentication token" ' + 'button on "SSO Connect" page.', + "", + ] + lines.extend(f" {a:>3}. {t}" for a, t in menu) + print("\n".join(lines)) + + while True: + token = input("Selection: ") + if token == "q": + raise KeyboardInterrupt() + if token == "a": + step.login_with_password() + return + if token == "c": + token = None + if pyperclip: + try: + pyperclip.copy(step.sso_login_url) + print("SSO Login URL is copied to clipboard.") + except Exception: + print("Failed to copy SSO Login URL to clipboard.") + else: + print("Clipboard not available (install pyperclip).") + elif token == "u": + token = None + if not pyperclip: + print("\nSSO Login URL:", step.sso_login_url, "\n") + else: + print("Unsupported menu option (use 'c' to copy URL).") + elif token == "o": + token = None + if wb: + try: + wb.open_new_tab(step.sso_login_url) + except Exception: + print("Failed to open web browser.") + elif token == "p": + if pyperclip: + try: + token = pyperclip.paste() + except Exception: + token = "" + print("Failed to paste from clipboard") + else: + token = None + print("Clipboard not available (use 't' to enter token manually).") + elif token == "t": + token = getpass.getpass("Enter SSO Token: ").strip() + else: + if len(token) < 10: + print(f"Unsupported menu option: {token}") + continue + + if token: + try: + step.set_sso_token(token) + break + except errors.KeeperApiError as kae: + print(f"SSO Login error: ({kae.result_code}) {kae.message}") + + @staticmethod + def _two_factor_channel_desc( + channel_type: login_auth.TwoFactorChannel, + ) -> str: + return { + login_auth.TwoFactorChannel.Authenticator: "TOTP (Google and Microsoft Authenticator)", + login_auth.TwoFactorChannel.TextMessage: "Send SMS Code", + login_auth.TwoFactorChannel.DuoSecurity: "DUO", + login_auth.TwoFactorChannel.RSASecurID: "RSA SecurID", + login_auth.TwoFactorChannel.SecurityKey: "WebAuthN (FIDO2 Security Key)", + login_auth.TwoFactorChannel.KeeperDNA: "Keeper DNA (Watch)", + login_auth.TwoFactorChannel.Backup: "Backup Code", + }.get(channel_type, "Not Supported") + + @staticmethod + def _two_factor_duration_desc( + duration: login_auth.TwoFactorDuration, + ) -> str: + return { + login_auth.TwoFactorDuration.EveryLogin: "Require Every Login", + login_auth.TwoFactorDuration.Forever: "Save on this Device Forever", + login_auth.TwoFactorDuration.Every12Hours: "Ask Every 12 hours", + login_auth.TwoFactorDuration.EveryDay: "Ask Every 24 hours", + login_auth.TwoFactorDuration.Every30Days: "Ask Every 30 days", + }.get(duration, "Require Every Login") + + @staticmethod + def _two_factor_code_to_duration( + text: str, + ) -> login_auth.TwoFactorDuration: + for dura, code in _TWO_FACTOR_DURATION_CODES.items(): + if code == text: + return dura + return login_auth.TwoFactorDuration.EveryLogin + + +def enable_persistent_login(keeper_auth_context: keeper_auth.KeeperAuth) -> None: + """ + Enable persistent login and register data key for device. + Sets persistent_login to on and logout_timer to 30 days. + """ + keeper_auth.set_user_setting(keeper_auth_context, 'persistent_login', '1') + keeper_auth.register_data_key_for_device(keeper_auth_context) + mins_per_day = 60 * 24 + timeout_in_minutes = mins_per_day * 30 # 30 days + keeper_auth.set_user_setting(keeper_auth_context, 'logout_timer', str(timeout_in_minutes)) + print("Persistent login turned on successfully and device registered") + + +def login(): + """ + Handle the login process including server selection, authentication, + and multi-factor authentication steps (device approval, password, 2FA + with channel selection and Security Key, SSO data key, SSO token). + + Returns: + tuple: (keeper_auth_context, keeper_endpoint) on success, or (None, None) if login fails. + """ + flow = LoginFlow() + keeper_auth_context = flow.run() + if keeper_auth_context and not flow.logged_in_with_persistent: + enable_persistent_login(keeper_auth_context) + keeper_endpoint = flow.endpoint if keeper_auth_context else None + return keeper_auth_context, keeper_endpoint + + +def print_admin_devices_table(devices): + if not devices: + print('\nNo devices found.') + return + print(f'\nAdmin Device List ({len(devices)} devices found)') + print('=' * 120) + print( + f"{'ID':<4} {'Enterprise User ID':<20} {'Device Name':<22} " + f"{'UI Category':<18} {'Device Status':<16} {'Login Status':<14} {'Last Accessed':<20}" + ) + print('-' * 120) + for d in devices: + last = d.last_accessed.strftime('%Y-%m-%d %H:%M:%S') if d.last_accessed else 'N/A' + print( + f"{d.list_index:<4} {d.enterprise_user_id:<20} {d.name[:21]:<22} " + f"{d.ui_category[:17]:<18} {d.device_status[:15]:<16} " + f"{d.login_status[:13]:<14} {last:<20}" + ) + print('-' * 120) + + +def main(): + keeper_auth_context, _ = login() + if not keeper_auth_context: + return + + # Fill in your values here (enterprise admin required). + enterprise_user_id = 0 + device_identifiers = [''] + + try: + print(f'Account-unlocking {len(device_identifiers)} device(s) for user {enterprise_user_id}...') + for name in device_management.account_unlock_admin_user_devices( + keeper_auth_context, enterprise_user_id, device_identifiers + ): + print( + f"Device action successfully completed: '{name}' account unlocked " + f'for user {enterprise_user_id}' + ) + print(f'\nUpdated device list for user {enterprise_user_id}:') + print_admin_devices_table( + device_management.list_admin_devices(keeper_auth_context, [enterprise_user_id]) + ) + except Exception as e: + print(f'Error account-unlocking admin devices: {e}') + finally: + keeper_auth_context.close() + + +if __name__ == '__main__': + main() diff --git a/examples/sdk_examples/device_management/admin_lock_device.py b/examples/sdk_examples/device_management/admin_lock_device.py new file mode 100644 index 0000000..7c16c64 --- /dev/null +++ b/examples/sdk_examples/device_management/admin_lock_device.py @@ -0,0 +1,549 @@ +import getpass +import sqlite3 +import json +import logging +from typing import Dict, Optional + +import fido2 +import webbrowser + +from keepersdk import errors, utils +from keepersdk.authentication import ( + device_management, + configuration, + endpoint, + keeper_auth, + login_auth, +) +from keepersdk.authentication.yubikey import ( + IKeeperUserInteraction, + yubikey_authenticate, +) +from keepersdk.constants import KEEPER_PUBLIC_HOSTS +from keepersdk.vault import sqlite_storage, vault_online, ksm_management + +try: + import pyperclip +except ImportError: + pyperclip = None + +logger = utils.get_logger() +logger.setLevel(logging.INFO) +if not logger.handlers: + _handler = logging.StreamHandler() + _handler.setLevel(logging.INFO) + _handler.setFormatter( + logging.Formatter("%(asctime)s - %(levelname)s - %(name)s - %(message)s") + ) + logger.addHandler(_handler) + + +class FidoCliInteraction(fido2.client.UserInteraction, IKeeperUserInteraction): + def output_text(self, text: str) -> None: + print(text) + + def prompt_up(self) -> None: + print( + "\nTouch the flashing Security key to authenticate or " + "press Ctrl-C to resume with the primary two factor authentication..." + ) + + def request_pin(self, permissions, rd_id): + return getpass.getpass("Enter Security Key PIN: ") + + def request_uv(self, permissions, rd_id): + print("User Verification required.") + return True + + +# Two-factor duration codes (used by LoginFlow) +_TWO_FACTOR_DURATION_CODES: Dict[login_auth.TwoFactorDuration, str] = { + login_auth.TwoFactorDuration.EveryLogin: "login", + login_auth.TwoFactorDuration.Every12Hours: "12_hours", + login_auth.TwoFactorDuration.EveryDay: "24_hours", + login_auth.TwoFactorDuration.Every30Days: "30_days", + login_auth.TwoFactorDuration.Forever: "forever", +} + + +class LoginFlow: + """ + Handles the full login process: server selection, username, password, + device approval, 2FA, SSO data key, and SSO token. + """ + + def __init__(self) -> None: + self._config = configuration.JsonConfigurationStorage() + self._logged_in_with_persistent = True + self._endpoint: Optional[endpoint.KeeperEndpoint] = None + + @property + def endpoint(self) -> Optional[endpoint.KeeperEndpoint]: + return self._endpoint + + @property + def logged_in_with_persistent(self) -> bool: + """True if login succeeded by resuming an existing persistent session (no step loop).""" + return self._logged_in_with_persistent + + def run(self) -> Optional[keeper_auth.KeeperAuth]: + """ + Run the login flow. + + Returns: + Authenticated Keeper context, or None if login fails. + """ + server = self._ensure_server() + keeper_endpoint = endpoint.KeeperEndpoint(self._config, server) + self._endpoint = keeper_endpoint + login_auth_context = login_auth.LoginAuth(keeper_endpoint) + + username = self._config.get().last_login or input("Enter username: ") + login_auth_context.resume_session = True + login_auth_context.login(username) + + while not login_auth_context.login_step.is_final(): + step = login_auth_context.login_step + if isinstance(step, login_auth.LoginStepDeviceApproval): + self._handle_device_approval(step) + elif isinstance(step, login_auth.LoginStepTwoFactor): + self._handle_two_factor(step) + elif isinstance(step, login_auth.LoginStepPassword): + self._handle_password(step) + elif isinstance(step, login_auth.LoginStepSsoToken): + self._handle_sso_token(step) + elif isinstance(step, login_auth.LoginStepSsoDataKey): + self._handle_sso_data_key(step) + elif isinstance(step, login_auth.LoginStepError): + print(f"Login error: ({step.code}) {step.message}") + return None + else: + raise NotImplementedError( + f"Unsupported login step type: {type(step).__name__}" + ) + self._logged_in_with_persistent = False + + if self._logged_in_with_persistent: + print("Successfully logged in with persistent login") + + if isinstance(login_auth_context.login_step, login_auth.LoginStepConnected): + return login_auth_context.login_step.take_keeper_auth() + + return None + + def _ensure_server(self) -> str: + if not self._config.get().last_server: + print("Available server options:") + for region, host in KEEPER_PUBLIC_HOSTS.items(): + print(f" {region}: {host}") + server = ( + input("Enter server (default: keepersecurity.com): ").strip() + or "keepersecurity.com" + ) + self._config.get().last_server = server + else: + server = self._config.get().last_server + return server + + def _handle_device_approval( + self, step: login_auth.LoginStepDeviceApproval + ) -> None: + """Device approval: same options as keepercli verify_device (email, keeper push, 2FA, resume).""" + menu = [ + ("email_send", "to send email"), + ("email_code=", "to validate verification code sent via email"), + ("keeper_push", "to send Keeper Push notification"), + ("2fa_send", "to send 2FA code"), + ("2fa_code=", "to validate a code provided by 2FA application"), + ("", "to resume"), + ] + lines = ["Approve by selecting a method below"] + lines.extend(f" {cmd} {desc}" for cmd, desc in menu) + print("\n".join(lines)) + + selection = input("Type your selection or to resume: ").strip() + if selection is None: + return + if selection in ("email_send", "es"): + step.send_push(channel=login_auth.DeviceApprovalChannel.Email) + print("An email with instructions has been sent. Press when approved.") + elif selection.startswith("email_code="): + code = selection[len("email_code=") :] + step.send_code(channel=login_auth.DeviceApprovalChannel.Email, code=code) + print("Successfully verified email code.") + elif selection in ("keeper_push", "kp"): + step.send_push(channel=login_auth.DeviceApprovalChannel.KeeperPush) + print( + "Successfully made a push notification to the approved device. " + "Press when approved." + ) + elif selection in ("2fa_send", "2fs"): + step.send_push(channel=login_auth.DeviceApprovalChannel.TwoFactor) + print("2FA code was sent.") + elif selection.startswith("2fa_code="): + code = selection[len("2fa_code=") :] + step.send_code(channel=login_auth.DeviceApprovalChannel.TwoFactor, code=code) + print("Successfully verified 2FA code.") + else: + step.resume() + + def _handle_password(self, step: login_auth.LoginStepPassword) -> None: + """Password step: prompt for password and retry on auth_failed (aligned with keepercli handle_verify_password).""" + print(f"\nEnter password for {step.username}") + while True: + password = getpass.getpass("Password: ") + if not password: + raise KeyboardInterrupt() + try: + step.verify_password(password) + break + except errors.KeeperApiError as kae: + print( + "Invalid email or password combination, please re-enter." + if kae.result_code == "auth_failed" + else kae.message + ) + + def _handle_two_factor(self, step: login_auth.LoginStepTwoFactor) -> None: + channels = [ + x + for x in step.get_channels() + if x.channel_type != login_auth.TwoFactorChannel.Other + ] + menu = [] + for i, channel in enumerate(channels): + desc = self._two_factor_channel_desc(channel.channel_type) + menu.append( + ( + str(i + 1), + f"{desc} {channel.channel_name} {channel.phone}", + ) + ) + menu.append(("q", "Quit authentication attempt and return to Commander prompt.")) + + lines = ["", "This account requires 2FA Authentication"] + lines.extend(f" {a}. {t}" for a, t in menu) + print("\n".join(lines)) + + while True: + selection = input("Selection: ") + if selection is None: + return + if selection in ("q", "Q"): + raise KeyboardInterrupt() + try: + assert selection.isnumeric() + idx = 1 if not selection else int(selection) + assert 1 <= idx <= len(channels) + channel = channels[idx - 1] + desc = self._two_factor_channel_desc(channel.channel_type) + print(f"Selected {idx}. {desc}") + except AssertionError: + print( + "Invalid entry, additional factors of authentication shown " + "may be configured if not currently enabled." + ) + continue + + if channel.channel_type in ( + login_auth.TwoFactorChannel.TextMessage, + login_auth.TwoFactorChannel.KeeperDNA, + login_auth.TwoFactorChannel.DuoSecurity, + ): + action = next( + ( + x + for x in step.get_channel_push_actions(channel.channel_uid) + if x + in ( + login_auth.TwoFactorPushAction.TextMessage, + login_auth.TwoFactorPushAction.KeeperDna, + ) + ), + None, + ) + if action: + step.send_push(channel.channel_uid, action) + + if channel.channel_type == login_auth.TwoFactorChannel.SecurityKey: + try: + challenge = json.loads(channel.challenge) + signature = yubikey_authenticate(challenge, FidoCliInteraction()) + if signature: + print("Verified Security Key.") + step.send_code(channel.channel_uid, signature) + return + except Exception as e: + logger.error(e) + continue + + # 2FA code path + step.duration = min(step.duration, channel.max_expiration) + available_dura = sorted( + x for x in _TWO_FACTOR_DURATION_CODES if x <= channel.max_expiration + ) + available_codes = [ + _TWO_FACTOR_DURATION_CODES.get(x) or "login" for x in available_dura + ] + + while True: + mfa_desc = self._two_factor_duration_desc(step.duration) + prompt_exp = ( + f"\n2FA Code Duration: {mfa_desc}.\n" + f"To change duration: 2fa_duration={'|'.join(available_codes)}" + ) + print(prompt_exp) + + selection = input("\nEnter 2FA Code or Duration: ") + if not selection: + return + if selection in available_codes: + step.duration = self._two_factor_code_to_duration(selection) + elif selection.startswith("2fa_duration="): + code = selection[len("2fa_duration=") :] + if code in available_codes: + step.duration = self._two_factor_code_to_duration(code) + else: + print(f"Invalid 2FA duration: {code}") + else: + try: + step.send_code(channel.channel_uid, selection) + print("Successfully verified 2FA Code.") + return + except errors.KeeperApiError as kae: + print(f"Invalid 2FA code: ({kae.result_code}) {kae.message}") + + def _handle_sso_data_key( + self, step: login_auth.LoginStepSsoDataKey + ) -> None: + menu = [ + ("1", "Keeper Push. Send a push notification to your device."), + ("2", "Admin Approval. Request your admin to approve this device."), + ("r", "Resume SSO authentication after device is approved."), + ("q", "Quit SSO authentication attempt and return to Commander prompt."), + ] + lines = ["Approve this device by selecting a method below:"] + lines.extend(f" {cmd:>3}. {text}" for cmd, text in menu) + print("\n".join(lines)) + + while True: + answer = input("Selection: ") + if answer is None: + return + if answer == "q": + raise KeyboardInterrupt() + if answer == "r": + step.resume() + break + if answer in ("1", "2"): + step.request_data_key( + login_auth.DataKeyShareChannel.KeeperPush + if answer == "1" + else login_auth.DataKeyShareChannel.AdminApproval + ) + else: + print(f'Action "{answer}" is not supported.') + + def _handle_sso_token(self, step: login_auth.LoginStepSsoToken) -> None: + menu = [ + ("a", "SSO User with a Master Password."), + ] + if pyperclip: + menu.append(("c", "Copy SSO Login URL to clipboard.")) + else: + menu.append(("u", "Show SSO Login URL.")) + try: + wb = webbrowser.get() + menu.append(("o", "Navigate to SSO Login URL with the default web browser.")) + except Exception: + wb = None + if pyperclip: + menu.append(("p", "Paste SSO Token from clipboard.")) + menu.append(("t", "Enter SSO Token manually.")) + menu.append(("q", "Quit SSO authentication attempt and return to Commander prompt.")) + + lines = [ + "", + "SSO Login URL:", + step.sso_login_url, + "Navigate to SSO Login URL with your browser and complete authentication.", + "Copy a returned SSO Token into clipboard." + + (" Paste that token into Commander." if pyperclip else " Then use option 't' to enter the token manually."), + 'NOTE: To copy SSO Token please click "Copy authentication token" ' + 'button on "SSO Connect" page.', + "", + ] + lines.extend(f" {a:>3}. {t}" for a, t in menu) + print("\n".join(lines)) + + while True: + token = input("Selection: ") + if token == "q": + raise KeyboardInterrupt() + if token == "a": + step.login_with_password() + return + if token == "c": + token = None + if pyperclip: + try: + pyperclip.copy(step.sso_login_url) + print("SSO Login URL is copied to clipboard.") + except Exception: + print("Failed to copy SSO Login URL to clipboard.") + else: + print("Clipboard not available (install pyperclip).") + elif token == "u": + token = None + if not pyperclip: + print("\nSSO Login URL:", step.sso_login_url, "\n") + else: + print("Unsupported menu option (use 'c' to copy URL).") + elif token == "o": + token = None + if wb: + try: + wb.open_new_tab(step.sso_login_url) + except Exception: + print("Failed to open web browser.") + elif token == "p": + if pyperclip: + try: + token = pyperclip.paste() + except Exception: + token = "" + print("Failed to paste from clipboard") + else: + token = None + print("Clipboard not available (use 't' to enter token manually).") + elif token == "t": + token = getpass.getpass("Enter SSO Token: ").strip() + else: + if len(token) < 10: + print(f"Unsupported menu option: {token}") + continue + + if token: + try: + step.set_sso_token(token) + break + except errors.KeeperApiError as kae: + print(f"SSO Login error: ({kae.result_code}) {kae.message}") + + @staticmethod + def _two_factor_channel_desc( + channel_type: login_auth.TwoFactorChannel, + ) -> str: + return { + login_auth.TwoFactorChannel.Authenticator: "TOTP (Google and Microsoft Authenticator)", + login_auth.TwoFactorChannel.TextMessage: "Send SMS Code", + login_auth.TwoFactorChannel.DuoSecurity: "DUO", + login_auth.TwoFactorChannel.RSASecurID: "RSA SecurID", + login_auth.TwoFactorChannel.SecurityKey: "WebAuthN (FIDO2 Security Key)", + login_auth.TwoFactorChannel.KeeperDNA: "Keeper DNA (Watch)", + login_auth.TwoFactorChannel.Backup: "Backup Code", + }.get(channel_type, "Not Supported") + + @staticmethod + def _two_factor_duration_desc( + duration: login_auth.TwoFactorDuration, + ) -> str: + return { + login_auth.TwoFactorDuration.EveryLogin: "Require Every Login", + login_auth.TwoFactorDuration.Forever: "Save on this Device Forever", + login_auth.TwoFactorDuration.Every12Hours: "Ask Every 12 hours", + login_auth.TwoFactorDuration.EveryDay: "Ask Every 24 hours", + login_auth.TwoFactorDuration.Every30Days: "Ask Every 30 days", + }.get(duration, "Require Every Login") + + @staticmethod + def _two_factor_code_to_duration( + text: str, + ) -> login_auth.TwoFactorDuration: + for dura, code in _TWO_FACTOR_DURATION_CODES.items(): + if code == text: + return dura + return login_auth.TwoFactorDuration.EveryLogin + + +def enable_persistent_login(keeper_auth_context: keeper_auth.KeeperAuth) -> None: + """ + Enable persistent login and register data key for device. + Sets persistent_login to on and logout_timer to 30 days. + """ + keeper_auth.set_user_setting(keeper_auth_context, 'persistent_login', '1') + keeper_auth.register_data_key_for_device(keeper_auth_context) + mins_per_day = 60 * 24 + timeout_in_minutes = mins_per_day * 30 # 30 days + keeper_auth.set_user_setting(keeper_auth_context, 'logout_timer', str(timeout_in_minutes)) + print("Persistent login turned on successfully and device registered") + + +def login(): + """ + Handle the login process including server selection, authentication, + and multi-factor authentication steps (device approval, password, 2FA + with channel selection and Security Key, SSO data key, SSO token). + + Returns: + tuple: (keeper_auth_context, keeper_endpoint) on success, or (None, None) if login fails. + """ + flow = LoginFlow() + keeper_auth_context = flow.run() + if keeper_auth_context and not flow.logged_in_with_persistent: + enable_persistent_login(keeper_auth_context) + keeper_endpoint = flow.endpoint if keeper_auth_context else None + return keeper_auth_context, keeper_endpoint + + +def print_admin_devices_table(devices): + if not devices: + print('\nNo devices found.') + return + print(f'\nAdmin Device List ({len(devices)} devices found)') + print('=' * 120) + print( + f"{'ID':<4} {'Enterprise User ID':<20} {'Device Name':<22} " + f"{'UI Category':<18} {'Device Status':<16} {'Login Status':<14} {'Last Accessed':<20}" + ) + print('-' * 120) + for d in devices: + last = d.last_accessed.strftime('%Y-%m-%d %H:%M:%S') if d.last_accessed else 'N/A' + print( + f"{d.list_index:<4} {d.enterprise_user_id:<20} {d.name[:21]:<22} " + f"{d.ui_category[:17]:<18} {d.device_status[:15]:<16} " + f"{d.login_status[:13]:<14} {last:<20}" + ) + print('-' * 120) + + +def main(): + keeper_auth_context, _ = login() + if not keeper_auth_context: + return + + # Fill in your values here (enterprise admin required). + enterprise_user_id = 0 + device_identifiers = [''] + + try: + print(f'Locking {len(device_identifiers)} device(s) for user {enterprise_user_id}...') + for name in device_management.lock_admin_user_devices( + keeper_auth_context, enterprise_user_id, device_identifiers + ): + print( + f"Device action successfully completed: '{name}' locked " + f'for user {enterprise_user_id}' + ) + print(f'\nUpdated device list for user {enterprise_user_id}:') + print_admin_devices_table( + device_management.list_admin_devices(keeper_auth_context, [enterprise_user_id]) + ) + except Exception as e: + print(f'Error locking admin devices: {e}') + finally: + keeper_auth_context.close() + + +if __name__ == '__main__': + main() diff --git a/examples/sdk_examples/device_management/admin_unlock_device.py b/examples/sdk_examples/device_management/admin_unlock_device.py new file mode 100644 index 0000000..04fa79e --- /dev/null +++ b/examples/sdk_examples/device_management/admin_unlock_device.py @@ -0,0 +1,549 @@ +import getpass +import sqlite3 +import json +import logging +from typing import Dict, Optional + +import fido2 +import webbrowser + +from keepersdk import errors, utils +from keepersdk.authentication import ( + device_management, + configuration, + endpoint, + keeper_auth, + login_auth, +) +from keepersdk.authentication.yubikey import ( + IKeeperUserInteraction, + yubikey_authenticate, +) +from keepersdk.constants import KEEPER_PUBLIC_HOSTS +from keepersdk.vault import sqlite_storage, vault_online, ksm_management + +try: + import pyperclip +except ImportError: + pyperclip = None + +logger = utils.get_logger() +logger.setLevel(logging.INFO) +if not logger.handlers: + _handler = logging.StreamHandler() + _handler.setLevel(logging.INFO) + _handler.setFormatter( + logging.Formatter("%(asctime)s - %(levelname)s - %(name)s - %(message)s") + ) + logger.addHandler(_handler) + + +class FidoCliInteraction(fido2.client.UserInteraction, IKeeperUserInteraction): + def output_text(self, text: str) -> None: + print(text) + + def prompt_up(self) -> None: + print( + "\nTouch the flashing Security key to authenticate or " + "press Ctrl-C to resume with the primary two factor authentication..." + ) + + def request_pin(self, permissions, rd_id): + return getpass.getpass("Enter Security Key PIN: ") + + def request_uv(self, permissions, rd_id): + print("User Verification required.") + return True + + +# Two-factor duration codes (used by LoginFlow) +_TWO_FACTOR_DURATION_CODES: Dict[login_auth.TwoFactorDuration, str] = { + login_auth.TwoFactorDuration.EveryLogin: "login", + login_auth.TwoFactorDuration.Every12Hours: "12_hours", + login_auth.TwoFactorDuration.EveryDay: "24_hours", + login_auth.TwoFactorDuration.Every30Days: "30_days", + login_auth.TwoFactorDuration.Forever: "forever", +} + + +class LoginFlow: + """ + Handles the full login process: server selection, username, password, + device approval, 2FA, SSO data key, and SSO token. + """ + + def __init__(self) -> None: + self._config = configuration.JsonConfigurationStorage() + self._logged_in_with_persistent = True + self._endpoint: Optional[endpoint.KeeperEndpoint] = None + + @property + def endpoint(self) -> Optional[endpoint.KeeperEndpoint]: + return self._endpoint + + @property + def logged_in_with_persistent(self) -> bool: + """True if login succeeded by resuming an existing persistent session (no step loop).""" + return self._logged_in_with_persistent + + def run(self) -> Optional[keeper_auth.KeeperAuth]: + """ + Run the login flow. + + Returns: + Authenticated Keeper context, or None if login fails. + """ + server = self._ensure_server() + keeper_endpoint = endpoint.KeeperEndpoint(self._config, server) + self._endpoint = keeper_endpoint + login_auth_context = login_auth.LoginAuth(keeper_endpoint) + + username = self._config.get().last_login or input("Enter username: ") + login_auth_context.resume_session = True + login_auth_context.login(username) + + while not login_auth_context.login_step.is_final(): + step = login_auth_context.login_step + if isinstance(step, login_auth.LoginStepDeviceApproval): + self._handle_device_approval(step) + elif isinstance(step, login_auth.LoginStepTwoFactor): + self._handle_two_factor(step) + elif isinstance(step, login_auth.LoginStepPassword): + self._handle_password(step) + elif isinstance(step, login_auth.LoginStepSsoToken): + self._handle_sso_token(step) + elif isinstance(step, login_auth.LoginStepSsoDataKey): + self._handle_sso_data_key(step) + elif isinstance(step, login_auth.LoginStepError): + print(f"Login error: ({step.code}) {step.message}") + return None + else: + raise NotImplementedError( + f"Unsupported login step type: {type(step).__name__}" + ) + self._logged_in_with_persistent = False + + if self._logged_in_with_persistent: + print("Successfully logged in with persistent login") + + if isinstance(login_auth_context.login_step, login_auth.LoginStepConnected): + return login_auth_context.login_step.take_keeper_auth() + + return None + + def _ensure_server(self) -> str: + if not self._config.get().last_server: + print("Available server options:") + for region, host in KEEPER_PUBLIC_HOSTS.items(): + print(f" {region}: {host}") + server = ( + input("Enter server (default: keepersecurity.com): ").strip() + or "keepersecurity.com" + ) + self._config.get().last_server = server + else: + server = self._config.get().last_server + return server + + def _handle_device_approval( + self, step: login_auth.LoginStepDeviceApproval + ) -> None: + """Device approval: same options as keepercli verify_device (email, keeper push, 2FA, resume).""" + menu = [ + ("email_send", "to send email"), + ("email_code=", "to validate verification code sent via email"), + ("keeper_push", "to send Keeper Push notification"), + ("2fa_send", "to send 2FA code"), + ("2fa_code=", "to validate a code provided by 2FA application"), + ("", "to resume"), + ] + lines = ["Approve by selecting a method below"] + lines.extend(f" {cmd} {desc}" for cmd, desc in menu) + print("\n".join(lines)) + + selection = input("Type your selection or to resume: ").strip() + if selection is None: + return + if selection in ("email_send", "es"): + step.send_push(channel=login_auth.DeviceApprovalChannel.Email) + print("An email with instructions has been sent. Press when approved.") + elif selection.startswith("email_code="): + code = selection[len("email_code=") :] + step.send_code(channel=login_auth.DeviceApprovalChannel.Email, code=code) + print("Successfully verified email code.") + elif selection in ("keeper_push", "kp"): + step.send_push(channel=login_auth.DeviceApprovalChannel.KeeperPush) + print( + "Successfully made a push notification to the approved device. " + "Press when approved." + ) + elif selection in ("2fa_send", "2fs"): + step.send_push(channel=login_auth.DeviceApprovalChannel.TwoFactor) + print("2FA code was sent.") + elif selection.startswith("2fa_code="): + code = selection[len("2fa_code=") :] + step.send_code(channel=login_auth.DeviceApprovalChannel.TwoFactor, code=code) + print("Successfully verified 2FA code.") + else: + step.resume() + + def _handle_password(self, step: login_auth.LoginStepPassword) -> None: + """Password step: prompt for password and retry on auth_failed (aligned with keepercli handle_verify_password).""" + print(f"\nEnter password for {step.username}") + while True: + password = getpass.getpass("Password: ") + if not password: + raise KeyboardInterrupt() + try: + step.verify_password(password) + break + except errors.KeeperApiError as kae: + print( + "Invalid email or password combination, please re-enter." + if kae.result_code == "auth_failed" + else kae.message + ) + + def _handle_two_factor(self, step: login_auth.LoginStepTwoFactor) -> None: + channels = [ + x + for x in step.get_channels() + if x.channel_type != login_auth.TwoFactorChannel.Other + ] + menu = [] + for i, channel in enumerate(channels): + desc = self._two_factor_channel_desc(channel.channel_type) + menu.append( + ( + str(i + 1), + f"{desc} {channel.channel_name} {channel.phone}", + ) + ) + menu.append(("q", "Quit authentication attempt and return to Commander prompt.")) + + lines = ["", "This account requires 2FA Authentication"] + lines.extend(f" {a}. {t}" for a, t in menu) + print("\n".join(lines)) + + while True: + selection = input("Selection: ") + if selection is None: + return + if selection in ("q", "Q"): + raise KeyboardInterrupt() + try: + assert selection.isnumeric() + idx = 1 if not selection else int(selection) + assert 1 <= idx <= len(channels) + channel = channels[idx - 1] + desc = self._two_factor_channel_desc(channel.channel_type) + print(f"Selected {idx}. {desc}") + except AssertionError: + print( + "Invalid entry, additional factors of authentication shown " + "may be configured if not currently enabled." + ) + continue + + if channel.channel_type in ( + login_auth.TwoFactorChannel.TextMessage, + login_auth.TwoFactorChannel.KeeperDNA, + login_auth.TwoFactorChannel.DuoSecurity, + ): + action = next( + ( + x + for x in step.get_channel_push_actions(channel.channel_uid) + if x + in ( + login_auth.TwoFactorPushAction.TextMessage, + login_auth.TwoFactorPushAction.KeeperDna, + ) + ), + None, + ) + if action: + step.send_push(channel.channel_uid, action) + + if channel.channel_type == login_auth.TwoFactorChannel.SecurityKey: + try: + challenge = json.loads(channel.challenge) + signature = yubikey_authenticate(challenge, FidoCliInteraction()) + if signature: + print("Verified Security Key.") + step.send_code(channel.channel_uid, signature) + return + except Exception as e: + logger.error(e) + continue + + # 2FA code path + step.duration = min(step.duration, channel.max_expiration) + available_dura = sorted( + x for x in _TWO_FACTOR_DURATION_CODES if x <= channel.max_expiration + ) + available_codes = [ + _TWO_FACTOR_DURATION_CODES.get(x) or "login" for x in available_dura + ] + + while True: + mfa_desc = self._two_factor_duration_desc(step.duration) + prompt_exp = ( + f"\n2FA Code Duration: {mfa_desc}.\n" + f"To change duration: 2fa_duration={'|'.join(available_codes)}" + ) + print(prompt_exp) + + selection = input("\nEnter 2FA Code or Duration: ") + if not selection: + return + if selection in available_codes: + step.duration = self._two_factor_code_to_duration(selection) + elif selection.startswith("2fa_duration="): + code = selection[len("2fa_duration=") :] + if code in available_codes: + step.duration = self._two_factor_code_to_duration(code) + else: + print(f"Invalid 2FA duration: {code}") + else: + try: + step.send_code(channel.channel_uid, selection) + print("Successfully verified 2FA Code.") + return + except errors.KeeperApiError as kae: + print(f"Invalid 2FA code: ({kae.result_code}) {kae.message}") + + def _handle_sso_data_key( + self, step: login_auth.LoginStepSsoDataKey + ) -> None: + menu = [ + ("1", "Keeper Push. Send a push notification to your device."), + ("2", "Admin Approval. Request your admin to approve this device."), + ("r", "Resume SSO authentication after device is approved."), + ("q", "Quit SSO authentication attempt and return to Commander prompt."), + ] + lines = ["Approve this device by selecting a method below:"] + lines.extend(f" {cmd:>3}. {text}" for cmd, text in menu) + print("\n".join(lines)) + + while True: + answer = input("Selection: ") + if answer is None: + return + if answer == "q": + raise KeyboardInterrupt() + if answer == "r": + step.resume() + break + if answer in ("1", "2"): + step.request_data_key( + login_auth.DataKeyShareChannel.KeeperPush + if answer == "1" + else login_auth.DataKeyShareChannel.AdminApproval + ) + else: + print(f'Action "{answer}" is not supported.') + + def _handle_sso_token(self, step: login_auth.LoginStepSsoToken) -> None: + menu = [ + ("a", "SSO User with a Master Password."), + ] + if pyperclip: + menu.append(("c", "Copy SSO Login URL to clipboard.")) + else: + menu.append(("u", "Show SSO Login URL.")) + try: + wb = webbrowser.get() + menu.append(("o", "Navigate to SSO Login URL with the default web browser.")) + except Exception: + wb = None + if pyperclip: + menu.append(("p", "Paste SSO Token from clipboard.")) + menu.append(("t", "Enter SSO Token manually.")) + menu.append(("q", "Quit SSO authentication attempt and return to Commander prompt.")) + + lines = [ + "", + "SSO Login URL:", + step.sso_login_url, + "Navigate to SSO Login URL with your browser and complete authentication.", + "Copy a returned SSO Token into clipboard." + + (" Paste that token into Commander." if pyperclip else " Then use option 't' to enter the token manually."), + 'NOTE: To copy SSO Token please click "Copy authentication token" ' + 'button on "SSO Connect" page.', + "", + ] + lines.extend(f" {a:>3}. {t}" for a, t in menu) + print("\n".join(lines)) + + while True: + token = input("Selection: ") + if token == "q": + raise KeyboardInterrupt() + if token == "a": + step.login_with_password() + return + if token == "c": + token = None + if pyperclip: + try: + pyperclip.copy(step.sso_login_url) + print("SSO Login URL is copied to clipboard.") + except Exception: + print("Failed to copy SSO Login URL to clipboard.") + else: + print("Clipboard not available (install pyperclip).") + elif token == "u": + token = None + if not pyperclip: + print("\nSSO Login URL:", step.sso_login_url, "\n") + else: + print("Unsupported menu option (use 'c' to copy URL).") + elif token == "o": + token = None + if wb: + try: + wb.open_new_tab(step.sso_login_url) + except Exception: + print("Failed to open web browser.") + elif token == "p": + if pyperclip: + try: + token = pyperclip.paste() + except Exception: + token = "" + print("Failed to paste from clipboard") + else: + token = None + print("Clipboard not available (use 't' to enter token manually).") + elif token == "t": + token = getpass.getpass("Enter SSO Token: ").strip() + else: + if len(token) < 10: + print(f"Unsupported menu option: {token}") + continue + + if token: + try: + step.set_sso_token(token) + break + except errors.KeeperApiError as kae: + print(f"SSO Login error: ({kae.result_code}) {kae.message}") + + @staticmethod + def _two_factor_channel_desc( + channel_type: login_auth.TwoFactorChannel, + ) -> str: + return { + login_auth.TwoFactorChannel.Authenticator: "TOTP (Google and Microsoft Authenticator)", + login_auth.TwoFactorChannel.TextMessage: "Send SMS Code", + login_auth.TwoFactorChannel.DuoSecurity: "DUO", + login_auth.TwoFactorChannel.RSASecurID: "RSA SecurID", + login_auth.TwoFactorChannel.SecurityKey: "WebAuthN (FIDO2 Security Key)", + login_auth.TwoFactorChannel.KeeperDNA: "Keeper DNA (Watch)", + login_auth.TwoFactorChannel.Backup: "Backup Code", + }.get(channel_type, "Not Supported") + + @staticmethod + def _two_factor_duration_desc( + duration: login_auth.TwoFactorDuration, + ) -> str: + return { + login_auth.TwoFactorDuration.EveryLogin: "Require Every Login", + login_auth.TwoFactorDuration.Forever: "Save on this Device Forever", + login_auth.TwoFactorDuration.Every12Hours: "Ask Every 12 hours", + login_auth.TwoFactorDuration.EveryDay: "Ask Every 24 hours", + login_auth.TwoFactorDuration.Every30Days: "Ask Every 30 days", + }.get(duration, "Require Every Login") + + @staticmethod + def _two_factor_code_to_duration( + text: str, + ) -> login_auth.TwoFactorDuration: + for dura, code in _TWO_FACTOR_DURATION_CODES.items(): + if code == text: + return dura + return login_auth.TwoFactorDuration.EveryLogin + + +def enable_persistent_login(keeper_auth_context: keeper_auth.KeeperAuth) -> None: + """ + Enable persistent login and register data key for device. + Sets persistent_login to on and logout_timer to 30 days. + """ + keeper_auth.set_user_setting(keeper_auth_context, 'persistent_login', '1') + keeper_auth.register_data_key_for_device(keeper_auth_context) + mins_per_day = 60 * 24 + timeout_in_minutes = mins_per_day * 30 # 30 days + keeper_auth.set_user_setting(keeper_auth_context, 'logout_timer', str(timeout_in_minutes)) + print("Persistent login turned on successfully and device registered") + + +def login(): + """ + Handle the login process including server selection, authentication, + and multi-factor authentication steps (device approval, password, 2FA + with channel selection and Security Key, SSO data key, SSO token). + + Returns: + tuple: (keeper_auth_context, keeper_endpoint) on success, or (None, None) if login fails. + """ + flow = LoginFlow() + keeper_auth_context = flow.run() + if keeper_auth_context and not flow.logged_in_with_persistent: + enable_persistent_login(keeper_auth_context) + keeper_endpoint = flow.endpoint if keeper_auth_context else None + return keeper_auth_context, keeper_endpoint + + +def print_admin_devices_table(devices): + if not devices: + print('\nNo devices found.') + return + print(f'\nAdmin Device List ({len(devices)} devices found)') + print('=' * 120) + print( + f"{'ID':<4} {'Enterprise User ID':<20} {'Device Name':<22} " + f"{'UI Category':<18} {'Device Status':<16} {'Login Status':<14} {'Last Accessed':<20}" + ) + print('-' * 120) + for d in devices: + last = d.last_accessed.strftime('%Y-%m-%d %H:%M:%S') if d.last_accessed else 'N/A' + print( + f"{d.list_index:<4} {d.enterprise_user_id:<20} {d.name[:21]:<22} " + f"{d.ui_category[:17]:<18} {d.device_status[:15]:<16} " + f"{d.login_status[:13]:<14} {last:<20}" + ) + print('-' * 120) + + +def main(): + keeper_auth_context, _ = login() + if not keeper_auth_context: + return + + # Fill in your values here (enterprise admin required). + enterprise_user_id = 0 + device_identifiers = [''] + + try: + print(f'Unlocking {len(device_identifiers)} device(s) for user {enterprise_user_id}...') + for name in device_management.unlock_admin_user_devices( + keeper_auth_context, enterprise_user_id, device_identifiers + ): + print( + f"Device action successfully completed: '{name}' unlocked " + f'for user {enterprise_user_id}' + ) + print(f'\nUpdated device list for user {enterprise_user_id}:') + print_admin_devices_table( + device_management.list_admin_devices(keeper_auth_context, [enterprise_user_id]) + ) + except Exception as e: + print(f'Error unlocking admin devices: {e}') + finally: + keeper_auth_context.close() + + +if __name__ == '__main__': + main() diff --git a/keepercli-package/src/keepercli/cli.py b/keepercli-package/src/keepercli/cli.py index 2b7e24f..2c4cc91 100644 --- a/keepercli-package/src/keepercli/cli.py +++ b/keepercli-package/src/keepercli/cli.py @@ -1,12 +1,12 @@ import logging import sys -from typing import Optional, Any, Iterable, List +from typing import Optional, Any, Iterable, List, Callable from prompt_toolkit import PromptSession from prompt_toolkit.history import History from . import prompt_utils, api, autocomplete -from .commands import command_completer, base, command_history +from .commands import command_completer, base, command_history, command_visibility from .helpers import report_utils from .params import KeeperParams, KeeperConfig from keepersdk import constants @@ -30,6 +30,27 @@ def store_string(self, string: str) -> None: command_history.append(string) +_SCOPE_DISPLAY_NAMES = { + base.CommandScope.Account: 'Account Commands', + base.CommandScope.Vault: 'Vault Commands', + base.CommandScope.DeviceManagement: 'Device Management Commands', + base.CommandScope.Enterprise: 'Enterprise Commands', + base.CommandScope.MSP: 'MSP Commands', + base.CommandScope.Distributor: 'Distributor Commands', + base.CommandScope.Common: 'Miscellaneous Commands', +} + +_SCOPE_DISPLAY_ORDER = ( + base.CommandScope.Account, + base.CommandScope.Vault, + base.CommandScope.DeviceManagement, + base.CommandScope.Enterprise, + base.CommandScope.MSP, + base.CommandScope.Distributor, + base.CommandScope.Common, +) + + def do_command(command_line: str, context: KeeperParams, commands: base.CliCommands) -> Any: cmd, sep, args = command_line.partition(' ') orig_cmd = cmd @@ -44,7 +65,7 @@ def do_command(command_line: str, context: KeeperParams, commands: base.CliComma command, _ = commands.commands[cmd] return command.execute_args(context, args.strip(), command=orig_cmd) else: - display_command_help(commands) + display_command_help(commands, context) return None @@ -81,7 +102,8 @@ def get_prompt() -> str: if sys.stdin.isatty() and sys.stdout.isatty(): from prompt_toolkit.enums import EditingMode from prompt_toolkit.shortcuts import CompleteStyle - completer = command_completer.CommandCompleter(commands, autocomplete.standard_completer(context)) + completer = command_completer.CommandCompleter( + commands, autocomplete.standard_completer(context), context_getter=lambda: context) prompt_session = PromptSession( multiline=False, editing_mode=EditingMode.EMACS, complete_style=CompleteStyle.MULTI_COLUMN, complete_while_typing=False, completer=completer, auto_suggest=None, key_bindings=prompt_utils.kb, @@ -174,18 +196,31 @@ def get_prompt() -> str: context.clear_session() return 0 -def display_command_help(commands: base.CliCommands): +def display_command_help(commands: base.CliCommands, context: Optional[KeeperParams] = None): alias_lookup = {x[1]: x[0] for x in commands.aliases.items()} - all_scopes = {x[1]: x[1].name for x in commands.commands.values()} - scopes = sorted(all_scopes.keys()) + available_scopes = {value[1] for value in commands.commands.values()} headers = ['', 'Command', 'Alias', '', 'Description'] table = [] - for scope in scopes: - scope_commands = [key for key, value in commands.commands.items() if value[1] == scope] + for scope in _SCOPE_DISPLAY_ORDER: + if scope not in available_scopes: + continue + scope_commands = [ + key for key, value in commands.commands.items() + if value[1] == scope and command_visibility.is_command_visible(key, context) + ] + if not scope_commands: + continue + scope_name = _SCOPE_DISPLAY_NAMES.get(scope, scope.name) idx = 0 for cmd in sorted(scope_commands): c = commands.commands[cmd][0] - table.append([all_scopes[scope] if idx == 0 else '', cmd, alias_lookup.get(cmd) or '', '...', c.description()]) + table.append([ + scope_name if idx == 0 else '', + cmd, + alias_lookup.get(cmd) or '', + '...', + c.description(), + ]) idx += 1 prompt_utils.output_text('\nCommands:') diff --git a/keepercli-package/src/keepercli/commands/base.py b/keepercli-package/src/keepercli/commands/base.py index 624b060..925ae48 100644 --- a/keepercli-package/src/keepercli/commands/base.py +++ b/keepercli-package/src/keepercli/commands/base.py @@ -63,6 +63,7 @@ def description(self): class CommandScope(enum.IntFlag): Account = enum.auto() Vault = enum.auto() + DeviceManagement = enum.auto() Enterprise = enum.auto() MSP = enum.auto() Distributor = enum.auto() diff --git a/keepercli-package/src/keepercli/commands/command_completer.py b/keepercli-package/src/keepercli/commands/command_completer.py index db7fb32..3110536 100644 --- a/keepercli-package/src/keepercli/commands/command_completer.py +++ b/keepercli-package/src/keepercli/commands/command_completer.py @@ -4,14 +4,17 @@ from . import base from .. import autocomplete +from .command_visibility import is_command_visible class CommandCompleter(completion.Completer): def __init__(self, command_collection: base.CommandCollection, - on_complete: Optional[Callable[[str, str], Iterable[str]]] = None) -> None: + on_complete: Optional[Callable[[str, str], Iterable[str]]] = None, + context_getter: Optional[Callable[[], object]] = None) -> None: self.commands = command_collection self.on_complete = on_complete + self.context_getter = context_getter def get_completions(self, document, complete_event): if not document.is_cursor_at_the_end: @@ -33,7 +36,11 @@ def get_completions(self, document, complete_event): command = self.commands.get_command_by_name(cmd) if command is None: if len(tokens) == 0 and document.char_before_cursor != ' ': - cmds = [x for x in self.commands.query_commands(cmd)] + context = self.context_getter() if self.context_getter else None + cmds = [ + x for x in self.commands.query_commands(cmd) + if is_command_visible(x, context) + ] cmds.sort() for c in cmds: yield completion.Completion(c, start_position=-len(document.text)) diff --git a/keepercli-package/src/keepercli/commands/command_visibility.py b/keepercli-package/src/keepercli/commands/command_visibility.py new file mode 100644 index 0000000..597159e --- /dev/null +++ b/keepercli-package/src/keepercli/commands/command_visibility.py @@ -0,0 +1,20 @@ +from typing import Optional, TYPE_CHECKING + +if TYPE_CHECKING: + from ..params import KeeperParams + +DEVICE_ADMIN_COMMANDS = frozenset({'device-admin-list', 'device-admin-action'}) + + +def is_enterprise_admin(context: Optional['KeeperParams']) -> bool: + return bool( + context + and context.auth + and context.auth.auth_context.is_enterprise_admin + ) + + +def is_command_visible(command: str, context: Optional['KeeperParams']) -> bool: + if command in DEVICE_ADMIN_COMMANDS: + return is_enterprise_admin(context) + return True diff --git a/keepercli-package/src/keepercli/commands/device_management.py b/keepercli-package/src/keepercli/commands/device_management.py index 319d432..5be6aca 100644 --- a/keepercli-package/src/keepercli/commands/device_management.py +++ b/keepercli-package/src/keepercli/commands/device_management.py @@ -4,6 +4,7 @@ from datetime import datetime from typing import Callable, Dict, List, Optional +from keepersdk import errors from keepersdk.authentication import device_management from . import base @@ -34,9 +35,35 @@ def _format_timestamp(dt: Optional[datetime]) -> str: def _sdk_error(exc: Exception) -> base.CommandError: + if isinstance(exc, errors.KeeperApiError): + if device_management.is_device_api_unavailable(exc): + return base.CommandError(device_management.DEVICE_FEATURE_UNAVAILABLE_MESSAGE) return base.CommandError(str(exc)) +def _validate_admin_enterprise_user_ids( + context: KeeperParams, + enterprise_user_ids: List[int], +) -> List[int]: + """Return enterprise user IDs known to enterprise data; warn when IDs are not found.""" + base.require_enterprise_admin(context) + resolved: List[int] = [] + seen: set[int] = set() + for user_id in enterprise_user_ids: + if user_id in seen: + continue + seen.add(user_id) + if context.enterprise_data.users.get_entity(user_id) is None: + logger.warning( + "Warning: No enterprise_user_id found matching '%s'", user_id + ) + else: + resolved.append(user_id) + if not resolved and enterprise_user_ids: + logger.info('No matching enterprise_user_id found') + return resolved + + def _run_device_action_command( context: KeeperParams, device_identifiers: List[str], @@ -47,7 +74,7 @@ def _run_device_action_command( try: for name in action_fn(context.auth, device_identifiers): logger.info(success_message, name) - except ValueError as e: + except (ValueError, errors.KeeperApiError) as e: raise _sdk_error(e) from e @@ -79,7 +106,7 @@ def _display_admin_devices( """Fetch and print the admin device list table for the given enterprise user IDs.""" try: devices = device_management.list_admin_devices(context.auth, enterprise_user_ids) - except ValueError as e: + except (ValueError, errors.KeeperApiError) as e: raise _sdk_error(e) from e if not devices: @@ -186,6 +213,31 @@ def _display_admin_devices( 'handler': device_management.remove_admin_user_devices, 'action_verb': 'removed', }, + 'lock': { + 'description': ( + 'Lock the device for all users on the devices and the associated auto linked devices. ' + 'Logout all users from the device' + ), + 'handler': device_management.lock_admin_user_devices, + 'action_verb': 'locked', + }, + 'unlock': { + 'description': ( + 'Unlock the devices and the associated auto linked devices for the calling user' + ), + 'handler': device_management.unlock_admin_user_devices, + 'action_verb': 'unlocked', + }, + 'account-lock': { + 'description': 'Lock the device for the user only. If user is logged in, logout', + 'handler': device_management.account_lock_admin_user_devices, + 'action_verb': 'account locked', + }, + 'account-unlock': { + 'description': 'Unlock the device for the user', + 'handler': device_management.account_unlock_admin_user_devices, + 'action_verb': 'account unlocked', + }, } DEVICE_ADMIN_ACTION_CHOICES = list(DEVICE_ADMIN_ACTION_DEFINITIONS.keys()) @@ -231,7 +283,7 @@ def execute(self, context: KeeperParams, **kwargs): base.require_login(context) try: devices = device_management.list_user_devices(context.auth) - except ValueError as e: + except (ValueError, errors.KeeperApiError) as e: raise _sdk_error(e) from e if not devices: @@ -277,7 +329,7 @@ class DeviceRenameCommand(base.ArgparseCommand): def __init__(self): parser = argparse.ArgumentParser( prog='device-rename', - description='Rename a device for the current user', + description='Rename user devices', ) DeviceRenameCommand.add_arguments_to_parser(parser) super().__init__(parser) @@ -302,7 +354,7 @@ def execute(self, context: KeeperParams, **kwargs): logger.info("Device name updated from '%s' to '%s'", old_name, updated_name) logger.info('') _display_user_devices(context, title_prefix='Updated ') - except ValueError as e: + except (ValueError, errors.KeeperApiError) as e: raise _sdk_error(e) from e @@ -391,7 +443,7 @@ def add_arguments_to_parser(parser: argparse.ArgumentParser): 'enterprise_user_ids', nargs='+', type=int, - help='List of Enterprise User IDs (required). You can get enterprise user IDs by running "ei --users" command', + help='List of Enterprise User IDs (required). You can get enterprise user IDs by running "enterprise-info user"', ) parser.error = base.ArgparseCommand.raise_parse_exception parser.exit = base.ArgparseCommand.suppress_exit @@ -399,11 +451,15 @@ def add_arguments_to_parser(parser: argparse.ArgumentParser): def execute(self, context: KeeperParams, **kwargs): """Display admin device list in table or JSON format for the given enterprise user IDs.""" base.require_enterprise_admin(context) - enterprise_user_ids = kwargs.get('enterprise_user_ids') or [] + enterprise_user_ids = _validate_admin_enterprise_user_ids( + context, kwargs.get('enterprise_user_ids') or [] + ) + if not enterprise_user_ids: + return try: devices = device_management.list_admin_devices(context.auth, enterprise_user_ids) - except ValueError as e: + except (ValueError, errors.KeeperApiError) as e: raise _sdk_error(e) from e if not devices: @@ -437,7 +493,7 @@ class DeviceAdminActionCommand(base.ArgparseCommand): def __init__(self): parser = argparse.ArgumentParser( prog='device-admin-action', - description='Perform various action on one or more devices that the Admin has control of.', + description='Perform actions on devices across enterprise users', ) DeviceAdminActionCommand.add_arguments_to_parser(parser) super().__init__(parser) @@ -488,7 +544,12 @@ def execute(self, context: KeeperParams, **kwargs): """Run the requested admin device action and refresh the device list.""" base.require_enterprise_admin(context) action = kwargs.get('action') - enterprise_user_id = kwargs.get('enterprise_user_id') + validated_user_ids = _validate_admin_enterprise_user_ids( + context, [kwargs.get('enterprise_user_id')] + ) + if not validated_user_ids: + return + enterprise_user_id = validated_user_ids[0] devices = kwargs.get('devices') or [] config = DEVICE_ADMIN_ACTION_DEFINITIONS.get(action or '') if not config: @@ -506,7 +567,7 @@ def execute(self, context: KeeperParams, **kwargs): "Device action successfully completed: '%s' %s for user %s", name, action_verb, enterprise_user_id, ) - except ValueError as e: + except (ValueError, errors.KeeperApiError) as e: raise _sdk_error(e) from e logger.info('Updated device list for user %s:', enterprise_user_id) diff --git a/keepercli-package/src/keepercli/register_commands.py b/keepercli-package/src/keepercli/register_commands.py index b7e87e2..2013888 100644 --- a/keepercli-package/src/keepercli/register_commands.py +++ b/keepercli-package/src/keepercli/register_commands.py @@ -23,9 +23,9 @@ def register_commands(commands: base.CliCommands, scopes: Optional[base.CommandS commands.register_command('biometric', BiometricCommand(), base.CommandScope.Account) commands.register_command('logout', account_commands.LogoutCommand(), base.CommandScope.Account) commands.register_command('this-device', account_commands.ThisDeviceCommand(), base.CommandScope.Account) - commands.register_command('device-list', device_management.DeviceListCommand(), base.CommandScope.Account) - commands.register_command('device-action', device_management.DeviceActionCommand(), base.CommandScope.Account) - commands.register_command('device-rename', device_management.DeviceRenameCommand(), base.CommandScope.Account) + commands.register_command('device-list', device_management.DeviceListCommand(), base.CommandScope.DeviceManagement) + commands.register_command('device-action', device_management.DeviceActionCommand(), base.CommandScope.DeviceManagement) + commands.register_command('device-rename', device_management.DeviceRenameCommand(), base.CommandScope.DeviceManagement) commands.register_command('whoami', account_commands.WhoamiCommand(), base.CommandScope.Account) commands.register_command('reset-password', account_commands.ResetPasswordCommand(), base.CommandScope.Account) commands.register_command('2fa', two_fa.TwoFaCommand(), base.CommandScope.Account) @@ -127,8 +127,8 @@ def register_commands(commands: base.CliCommands, scopes: Optional[base.CommandS commands.register_command('download-membership', importer_commands.DownloadMembershipCommand(), base.CommandScope.Enterprise) commands.register_command('apply-membership', importer_commands.ApplyMembershipCommand(), base.CommandScope.Enterprise) commands.register_command('device-approve', enterprise_user.EnterpriseDeviceApprovalCommand(), base.CommandScope.Enterprise) - commands.register_command('device-admin-list', device_management.DeviceAdminListCommand(), base.CommandScope.Enterprise) - commands.register_command('device-admin-action', device_management.DeviceAdminActionCommand(), base.CommandScope.Enterprise) + commands.register_command('device-admin-list', device_management.DeviceAdminListCommand(), base.CommandScope.DeviceManagement) + commands.register_command('device-admin-action', device_management.DeviceAdminActionCommand(), base.CommandScope.DeviceManagement) commands.register_command('pedm', pedm_admin.PedmCommand(), base.CommandScope.Enterprise) commands.register_command('msp-down', msp.MspDownCommand(), base.CommandScope.Enterprise, 'md') commands.register_command('msp-info', msp.MspInfoCommand(), base.CommandScope.Enterprise, 'mi') diff --git a/keepersdk-package/src/keepersdk/authentication/device_management.py b/keepersdk-package/src/keepersdk/authentication/device_management.py index 871d411..7eb0322 100644 --- a/keepersdk-package/src/keepersdk/authentication/device_management.py +++ b/keepersdk-package/src/keepersdk/authentication/device_management.py @@ -12,7 +12,7 @@ from datetime import datetime from typing import Callable, List, Optional, Tuple -from .. import utils +from .. import errors, utils from ..proto import APIRequest_pb2, DeviceManagement_pb2 from . import keeper_auth @@ -24,6 +24,33 @@ URL_DEVICE_ADMIN_LIST = 'dm/device_admin_list' URL_DEVICE_ADMIN_ACTION = 'dm/device_admin_action' +DEVICE_FEATURE_UNAVAILABLE_MESSAGE = ( + 'Notice: This feature is not in production yet. It will be available soon.' +) + + +def is_device_api_unavailable(error: errors.KeeperApiError) -> bool: + return error.result_code in (404, '404', 'invalid_path_or_method') + + +def _execute_device_rest( + auth: keeper_auth.KeeperAuth, + rest_endpoint: str, + request, + response_type, +): + """Call a device-management REST endpoint with Commander-aligned unavailable-API handling.""" + try: + return auth.execute_auth_rest( + rest_endpoint=rest_endpoint, + request=request, + response_type=response_type, + ) + except errors.KeeperApiError as exc: + if is_device_api_unavailable(exc): + raise ValueError(DEVICE_FEATURE_UNAVAILABLE_MESSAGE) from exc + raise + @dataclass(frozen=True) class UserDeviceInfo: @@ -92,10 +119,11 @@ def rename_user_device( dr.encryptedDeviceToken = device_token dr.deviceNewName = sanitized - rs = auth.execute_auth_rest( - rest_endpoint=URL_DEVICE_USER_RENAME, - request=rq, - response_type=DeviceManagement_pb2.DeviceRenameResponse, + rs = _execute_device_rest( + auth, + URL_DEVICE_USER_RENAME, + rq, + DeviceManagement_pb2.DeviceRenameResponse, ) if not rs or not rs.deviceRenameResult: raise ValueError('No response returned from device rename') @@ -161,7 +189,7 @@ def list_admin_devices( """ if not enterprise_user_ids: raise ValueError( - 'Enterprise User ID is required. You can get enterprise user IDs by running: ei --users' + 'Enterprise User ID is required. You can get enterprise user IDs by running: enterprise-info user' ) for user_id in enterprise_user_ids: _validate_enterprise_user_id(user_id) @@ -304,11 +332,56 @@ def _validate_link_unlink_identifiers(device_identifiers: List[str]) -> None: raise ValueError('At least two device identifiers are required for link/unlink') +def lock_admin_user_devices( + auth: keeper_auth.KeeperAuth, + enterprise_user_id: int, + device_identifiers: List[str], +) -> List[str]: + """Lock devices for all users and linked devices; log out all users (enterprise admin).""" + return _execute_admin_device_action( + auth, enterprise_user_id, device_identifiers, DeviceManagement_pb2.DA_LOCK + ) + + +def unlock_admin_user_devices( + auth: keeper_auth.KeeperAuth, + enterprise_user_id: int, + device_identifiers: List[str], +) -> List[str]: + """Unlock devices and linked devices for the enterprise user (enterprise admin).""" + return _execute_admin_device_action( + auth, enterprise_user_id, device_identifiers, DeviceManagement_pb2.DA_UNLOCK + ) + + +def account_lock_admin_user_devices( + auth: keeper_auth.KeeperAuth, + enterprise_user_id: int, + device_identifiers: List[str], +) -> List[str]: + """Account-lock devices for the enterprise user only (enterprise admin).""" + return _execute_admin_device_action( + auth, enterprise_user_id, device_identifiers, DeviceManagement_pb2.DA_DEVICE_ACCOUNT_LOCK + ) + + +def account_unlock_admin_user_devices( + auth: keeper_auth.KeeperAuth, + enterprise_user_id: int, + device_identifiers: List[str], +) -> List[str]: + """Account-unlock devices for the enterprise user (enterprise admin).""" + return _execute_admin_device_action( + auth, enterprise_user_id, device_identifiers, DeviceManagement_pb2.DA_DEVICE_ACCOUNT_UNLOCK + ) + + def _fetch_devices(auth: keeper_auth.KeeperAuth) -> List[DeviceManagement_pb2.Device]: - rs = auth.execute_auth_rest( - rest_endpoint=URL_DEVICE_USER_LIST, - request=None, - response_type=DeviceManagement_pb2.DeviceUserResponse, + rs = _execute_device_rest( + auth, + URL_DEVICE_USER_LIST, + None, + DeviceManagement_pb2.DeviceUserResponse, ) if not rs: return [] @@ -351,10 +424,11 @@ def _fetch_admin_device_entries( ) -> List[Tuple[int, DeviceManagement_pb2.Device]]: rq = DeviceManagement_pb2.DeviceAdminRequest() rq.enterpriseUserIds.extend(enterprise_user_ids) - rs = auth.execute_auth_rest( - rest_endpoint=URL_DEVICE_ADMIN_LIST, - request=rq, - response_type=DeviceManagement_pb2.DeviceAdminResponse, + rs = _execute_device_rest( + auth, + URL_DEVICE_ADMIN_LIST, + rq, + DeviceManagement_pb2.DeviceAdminResponse, ) if not rs: return [] @@ -507,10 +581,11 @@ def _execute_device_action( device_action.deviceActionType = action_type device_action.encryptedDeviceToken.extend(list(token_to_device.keys())) - rs = auth.execute_auth_rest( - rest_endpoint=URL_DEVICE_USER_ACTION, - request=rq, - response_type=DeviceManagement_pb2.DeviceActionResponse, + rs = _execute_device_rest( + auth, + URL_DEVICE_USER_ACTION, + rq, + DeviceManagement_pb2.DeviceActionResponse, ) if not rs or not rs.deviceActionResult: raise ValueError('No response returned from device action') @@ -557,10 +632,11 @@ def _execute_admin_device_action( admin_action.enterpriseUserId = enterprise_user_id admin_action.encryptedDeviceToken.extend(list(token_to_device.keys())) - rs = auth.execute_auth_rest( - rest_endpoint=URL_DEVICE_ADMIN_ACTION, - request=rq, - response_type=DeviceManagement_pb2.DeviceAdminActionResponse, + rs = _execute_device_rest( + auth, + URL_DEVICE_ADMIN_ACTION, + rq, + DeviceManagement_pb2.DeviceAdminActionResponse, ) if not rs or not rs.deviceAdminActionResults: raise ValueError('No response returned from device admin action') diff --git a/keepersdk-package/src/keepersdk/authentication/login_auth.py b/keepersdk-package/src/keepersdk/authentication/login_auth.py index 2ef6cea..2ff77e4 100644 --- a/keepersdk-package/src/keepersdk/authentication/login_auth.py +++ b/keepersdk-package/src/keepersdk/authentication/login_auth.py @@ -570,8 +570,12 @@ def decrypt_with_device_key(encrypted_data_key): _on_sso_redirect(login, sso_login_info, response.encryptedLoginToken) elif response.loginState == APIRequest_pb2.REQUIRES_DEVICE_ENCRYPTED_DATA_KEY: _on_request_data_key(login, response.encryptedLoginToken) - elif response.loginState in (APIRequest_pb2.DEVICE_ACCOUNT_LOCKED, APIRequest_pb2.DEVICE_LOCKED): - raise errors.InvalidDeviceTokenError(response.message) + elif response.loginState == APIRequest_pb2.DEVICE_ACCOUNT_LOCKED: + login.login_step = LoginStepError( + 'device_account_locked', 'Device for this account is locked') + elif response.loginState == APIRequest_pb2.DEVICE_LOCKED: + login.login_step = LoginStepError( + 'device_locked', 'This device is locked') else: state = APIRequest_pb2.LoginState.Name(response.loginState) # type: ignore message = f'State {state}: Not implemented: {response.message}' diff --git a/keepersdk-package/unit_tests/test_device_management.py b/keepersdk-package/unit_tests/test_device_management.py index 7e7b012..5fc8b9a 100644 --- a/keepersdk-package/unit_tests/test_device_management.py +++ b/keepersdk-package/unit_tests/test_device_management.py @@ -133,6 +133,19 @@ def test_list_admin_devices_rejects_bool_user_id(self): with self.assertRaises(ValueError): device_management.list_admin_devices(auth, [True]) + def test_list_admin_devices_feature_unavailable(self): + from keepersdk import errors + + auth = MagicMock() + auth.execute_auth_rest.side_effect = errors.KeeperApiError( + 'invalid_path_or_method', + 'An error has occurred. (bad_path)', + ) + with self.assertRaisesRegex( + ValueError, device_management.DEVICE_FEATURE_UNAVAILABLE_MESSAGE + ): + device_management.list_admin_devices(auth, [12345]) + def test_logout_admin_user_devices(self): auth = MagicMock() list_rs = _admin_list_response(12345, _device('Laptop', 100)) @@ -301,6 +314,41 @@ def test_ambiguous_device_name_lists_matches(self): with self.assertRaisesRegex(ValueError, 'No matching devices found'): device_management.unlock_user_devices(auth, ['Web Vault Chrome']) + def _admin_action_test(self, fn, action_type, user_id=12345, ident='1', device_name='Laptop'): + auth = MagicMock() + list_rs = _admin_list_response(user_id, _device(device_name, 100)) + action_rs = DeviceManagement_pb2.DeviceAdminActionResponse() + ar = action_rs.deviceAdminActionResults.add() + ar.deviceActionStatus = DeviceManagement_pb2.SUCCESS + ar.encryptedDeviceToken.append(b'\x01\x02') + auth.execute_auth_rest.side_effect = [list_rs, action_rs] + names = fn(auth, user_id, [ident]) + self.assertEqual(names, [device_name]) + admin_action = auth.execute_auth_rest.call_args_list[1].kwargs.get('request').deviceAdminAction[0] + self.assertEqual(admin_action.deviceActionType, action_type) + self.assertEqual(admin_action.enterpriseUserId, user_id) + + def test_lock_admin_user_devices(self): + self._admin_action_test( + device_management.lock_admin_user_devices, DeviceManagement_pb2.DA_LOCK + ) + + def test_unlock_admin_user_devices(self): + self._admin_action_test( + device_management.unlock_admin_user_devices, DeviceManagement_pb2.DA_UNLOCK + ) + + def test_account_lock_admin_user_devices(self): + self._admin_action_test( + device_management.account_lock_admin_user_devices, + DeviceManagement_pb2.DA_DEVICE_ACCOUNT_LOCK, + ) + + def test_account_unlock_admin_user_devices(self): + self._admin_action_test( + device_management.account_unlock_admin_user_devices, + DeviceManagement_pb2.DA_DEVICE_ACCOUNT_UNLOCK, + ) if __name__ == '__main__': unittest.main() diff --git a/keepersdk-package/unit_tests/test_login.py b/keepersdk-package/unit_tests/test_login.py index f805082..fbfdba7 100644 --- a/keepersdk-package/unit_tests/test_login.py +++ b/keepersdk-package/unit_tests/test_login.py @@ -14,6 +14,7 @@ class TestLogin(TestCase): StopAtDeviceApproval = False StopAtTwoFactor = False StopAtPassword = False + StopAtDeviceAccountLocked = False @staticmethod def mock_execute_rest(keeper_endpoint, rest_endpoint, request=None, response_type=None, session_token=None, payload_version=None): @@ -32,6 +33,9 @@ def mock_execute_rest(keeper_endpoint, rest_endpoint, request=None, response_typ lrq: APIRequest_pb2.StartLoginRequest = request lrs = response_type() lrs.encryptedLoginToken = data_vault.EncryptedLoginToken + if TestLogin.StopAtDeviceAccountLocked: + lrs.loginState = APIRequest_pb2.DEVICE_ACCOUNT_LOCKED + return lrs if TestLogin.StopAtDeviceApproval: lrs.loginState = APIRequest_pb2.DEVICE_APPROVAL_REQUIRED elif TestLogin.StopAtTwoFactor: @@ -169,6 +173,7 @@ def reset_stops(): TestLogin.StopAtDeviceApproval = False TestLogin.StopAtTwoFactor = False TestLogin.StopAtPassword = False + TestLogin.StopAtDeviceAccountLocked = False def test_success_flow(self): TestLogin.reset_stops() @@ -353,3 +358,24 @@ def test_invalid_password(self): self.assertIsInstance(step, login_auth.LoginStepPassword) with self.assertRaises(errors.KeeperApiError): step.verify_password('wrong password') + + def test_device_account_locked(self): + TestLogin.reset_stops() + TestLogin.StopAtDeviceAccountLocked = True + + auth = self.get_auth_sync() + config = auth.keeper_endpoint.get_configuration_storage().get() + device_count_before = len(list(config.devices().list())) + + auth.login(data_vault.UserName) + + step = auth.login_step + self.assertIsInstance(step, login_auth.LoginStepError) + self.assertEqual(step.code, 'device_account_locked') + config = auth.keeper_endpoint.get_configuration_storage().get() + self.assertEqual(len(list(config.devices().list())), device_count_before) + register_calls = [ + c for c in auth.keeper_endpoint.execute_rest.call_args_list + if c.args and c.args[0] == 'authentication/register_device' + ] + self.assertEqual(register_calls, [])