diff --git a/keepercli-package/src/keepercli/commands/enterprise_team.py b/keepercli-package/src/keepercli/commands/enterprise_team.py index f450d802..a6e819f4 100644 --- a/keepercli-package/src/keepercli/commands/enterprise_team.py +++ b/keepercli-package/src/keepercli/commands/enterprise_team.py @@ -13,6 +13,190 @@ logger = api.get_logger() +def _add_team_membership_arguments(parser: argparse.ArgumentParser) -> None: + parser.add_argument('-au', '--add-user', action='append', help='add user to team') + parser.add_argument('-ru', '--remove-user', action='append', help='remove user from team. @all') + parser.add_argument('-ar', '--add-role', action='append', help='add role to team') + parser.add_argument('-rr', '--remove-role', action='append', help='remove role from team. @all') + parser.add_argument( + '-hsf', '--hide-shared-folders', dest='hide_shared_folders', action='store', + choices=['on', 'off'], help='User does not see shared folders. --add-user only', + ) + + +def _add_edit_team_membership_arguments(parser: argparse.ArgumentParser) -> None: + parser.add_argument('-au', '--add-user', action='append', help='add user to team') + parser.add_argument('-ru', '--remove-user', action='append', help='remove user from team') + parser.add_argument( + '-hsf', '--hide-shared-folders', dest='hide_shared_folders', action='store', + choices=['on', 'off'], help='User does not see shared folders. --add-user only', + ) + + +def _validate_add_edit_membership(kwargs: Dict[str, Any], *, has_queued_teams: bool = False) -> None: + if kwargs.get('add_role') or kwargs.get('remove_role'): + raise base.CommandError( + 'Role membership is not supported on enterprise-team add/edit. ' + 'Use enterprise-team membership with -ar/--add-role or -rr/--remove-role.') + remove_users = kwargs.get('remove_user') + if isinstance(remove_users, list) and any(x == '@all' for x in remove_users): + raise base.CommandError( + '@all is not supported on enterprise-team add/edit. ' + 'Use enterprise-team membership with -ru @all.') + if has_queued_teams: + raise base.CommandError( + 'Membership changes are not supported when adding queued teams. ' + 'Use enterprise-team membership.') + + +def _has_membership_changes(kwargs: Dict[str, Any]) -> bool: + return any(( + kwargs.get('add_user'), + kwargs.get('remove_user'), + kwargs.get('add_role'), + kwargs.get('remove_role'), + )) + + +class _TeamMembershipTarget: + __slots__ = ('team_uid', 'name', 'queued') + + def __init__(self, team_uid: str, name: str, *, queued: bool = False) -> None: + self.team_uid = team_uid + self.name = name + self.queued = queued + + @classmethod + def from_team(cls, team: enterprise_types.Team) -> '_TeamMembershipTarget': + return cls(team.team_uid, team.name or '') + + @classmethod + def from_queued_team(cls, team: enterprise_types.QueuedTeam) -> '_TeamMembershipTarget': + return cls(team.team_uid, team.name or '', queued=True) + + @classmethod + def from_team_edit(cls, team: enterprise_management.TeamEdit) -> '_TeamMembershipTarget': + return cls(team.team_uid, team.name or '') + + +def _queue_team_membership_changes( + batch: batch_management.BatchManagement, + context: KeeperParams, + logger: enterprise_management.IEnterpriseManagementLogger, + kwargs: Dict[str, Any], + active_teams: List[_TeamMembershipTarget], + queued_teams: Optional[List[_TeamMembershipTarget]] = None, + *, + users_only: bool = False, +) -> None: + users_to_add: Optional[List[enterprise_types.User]] = None + roles_to_add: Optional[List[enterprise_types.Role]] = None + users_to_remove: Optional[List[enterprise_types.User]] = None + roles_to_remove: Optional[List[enterprise_types.Role]] = None + has_remove_all_users = False + has_remove_all_roles = False + + add_users = kwargs.get('add_user') + if isinstance(add_users, list): + users_to_add = enterprise_utils.UserUtils.resolve_existing_users( + context.enterprise_data, add_users) + add_roles = kwargs.get('add_role') + if not users_only and isinstance(add_roles, list): + resolved_roles = enterprise_utils.RoleUtils.resolve_existing_roles( + context.enterprise_data, add_roles) + roles_to_add = [] + for role in resolved_roles: + if enterprise_utils.RoleUtils.is_admin_role(context.enterprise_data, role.role_id): + logger.warning( + 'Teams cannot be assigned to roles with administrative permissions.') + else: + roles_to_add.append(role) + if not roles_to_add: + roles_to_add = None + remove_users = kwargs.get('remove_user') + if isinstance(remove_users, list): + has_remove_all_users = not users_only and any(x == '@all' for x in remove_users) + if not has_remove_all_users: + users_to_remove = enterprise_utils.UserUtils.resolve_existing_users( + context.enterprise_data, remove_users) + remove_roles = kwargs.get('remove_role') + if not users_only and isinstance(remove_roles, list): + has_remove_all_roles = any(x == '@all' for x in remove_roles) + if not has_remove_all_roles: + roles_to_remove = enterprise_utils.RoleUtils.resolve_existing_roles( + context.enterprise_data, remove_roles) + + user_type = enterprise_management.team_user_type_from_hsf_flag(kwargs.get('hide_shared_folders')) + + for team in active_teams: + existing_users = { + x.enterprise_user_id + for x in context.enterprise_data.team_users.get_links_by_subject(team.team_uid) + } + existing_roles = { + x.role_id + for x in context.enterprise_data.role_teams.get_links_by_object(team.team_uid) + } + if users_to_add: + for user in users_to_add: + if user.enterprise_user_id in existing_users: + if user_type is None: + logger.warning( + 'User \"%s\" is already a member of team \"%s\"', + user.username, team.name) + continue + batch.modify_team_users(to_add=[enterprise_management.TeamUserEdit( + team_uid=team.team_uid, + enterprise_user_id=user.enterprise_user_id, + user_type=user_type)]) + if roles_to_add: + team_roles_to_add = [x for x in roles_to_add if x.role_id not in existing_roles] + if team_roles_to_add: + batch.modify_role_teams(to_add=[enterprise_management.RoleTeamEdit( + role_id=x.role_id, team_uid=team.team_uid) for x in team_roles_to_add]) + if has_remove_all_users: + batch.modify_team_users(to_remove=[enterprise_management.TeamUserEdit( + team_uid=team.team_uid, enterprise_user_id=x) for x in existing_users]) + elif users_to_remove: + batch.modify_team_users(to_remove=[enterprise_management.TeamUserEdit( + team_uid=team.team_uid, enterprise_user_id=x.enterprise_user_id) + for x in users_to_remove]) + if has_remove_all_roles: + batch.modify_role_teams(to_remove=[enterprise_management.RoleTeamEdit( + role_id=x, team_uid=team.team_uid) for x in existing_roles]) + elif roles_to_remove: + batch.modify_role_teams(to_remove=[enterprise_management.RoleTeamEdit( + role_id=x.role_id, team_uid=team.team_uid) for x in roles_to_remove]) + + if users_only: + return + + for team in queued_teams or []: + existing_users = { + x.enterprise_user_id + for x in context.enterprise_data.queued_team_users.get_links_by_subject(team.team_uid) + } + if users_to_add: + for user in users_to_add: + if user.enterprise_user_id in existing_users: + if user_type is None: + logger.warning( + 'User \"%s\" is already queued for team \"%s\"', + user.username, team.name) + continue + batch.modify_team_users(to_add=[enterprise_management.TeamUserEdit( + team_uid=team.team_uid, + enterprise_user_id=user.enterprise_user_id, + user_type=user_type)]) + if has_remove_all_users: + batch.modify_team_users(to_remove=[enterprise_management.TeamUserEdit( + team_uid=team.team_uid, enterprise_user_id=x) for x in existing_users]) + elif users_to_remove: + batch.modify_team_users(to_remove=[enterprise_management.TeamUserEdit( + team_uid=team.team_uid, enterprise_user_id=x.enterprise_user_id) + for x in users_to_remove]) + + class EnterpriseTeamCommand(base.GroupCommand): def __init__(self): super().__init__('Manage an enterprise team(s)') @@ -143,6 +327,7 @@ def __init__(self): action='store', help='disable record re-shares') parser.add_argument('--restrict-view', dest='restrict_view', choices=['on', 'off'], action='store', help='disable view/copy passwords') + _add_edit_team_membership_arguments(parser) parser.add_argument('team', type=str, nargs='+', help='Team Name or Queued Team UID. Can be repeated.') super().__init__(parser) self.logger = api.get_logger() @@ -203,11 +388,13 @@ def execute(self, context: KeeperParams, **kwargs) -> None: restrict_view = r_view == 'on' batch = batch_management.BatchManagement(loader=context.enterprise_loader, logger=self) + new_team_edits: List[enterprise_management.TeamEdit] = [] if team_names: teams_to_add = [enterprise_management.TeamEdit( team_uid=utils.generate_uid(), name=x, node_id=parent_id, restrict_edit=restrict_edit, restrict_share=restrict_share, restrict_view=restrict_view) for x in team_names.values()] + new_team_edits = teams_to_add batch.modify_teams(to_add=teams_to_add) if queued_teams: @@ -217,6 +404,14 @@ def execute(self, context: KeeperParams, **kwargs) -> None: for x in queued_teams] batch.modify_teams(to_add=teams_to_add) + if _has_membership_changes(kwargs): + _validate_add_edit_membership(kwargs, has_queued_teams=bool(queued_teams)) + _queue_team_membership_changes( + batch, context, self, kwargs, + [_TeamMembershipTarget.from_team_edit(x) for x in new_team_edits], + users_only=True, + ) + batch.apply() class EnterpriseTeamEditCommand(base.ArgparseCommand, enterprise_management.IEnterpriseManagementLogger): @@ -232,6 +427,7 @@ def __init__(self): action='store', help='disable record re-shares') parser.add_argument('--restrict-view', dest='restrict_view', choices=['on', 'off'], action='store', help='disable view/copy passwords') + _add_edit_team_membership_arguments(parser) parser.add_argument('team', type=str, nargs='+', help='Team Name or UID. Can be repeated.') super().__init__(parser) self.logger = api.get_logger() @@ -281,6 +477,13 @@ def execute(self, context: KeeperParams, **kwargs) -> None: batch = batch_management.BatchManagement(loader=context.enterprise_loader, logger=self) batch.modify_teams(to_update=teams_to_edit) + if _has_membership_changes(kwargs): + _validate_add_edit_membership(kwargs) + _queue_team_membership_changes( + batch, context, self, kwargs, + [_TeamMembershipTarget.from_team(x) for x in team_list], + users_only=True, + ) batch.apply() @@ -309,10 +512,7 @@ def execute(self, context: KeeperParams, **kwargs) -> None: class EnterpriseTeamMembershipCommand(base.ArgparseCommand, enterprise_management.IEnterpriseManagementLogger): def __init__(self): parser = argparse.ArgumentParser(prog='enterprise-team membership', description='Manage enterprise team membership.') - parser.add_argument('-au', '--add-user', action='append', help='add user to team') - parser.add_argument('-ru', '--remove-user', action='append', help='remove user from team. @all') - parser.add_argument('-ar', '--add-role', action='append', help='add user to team') - parser.add_argument('-rr', '--remove-role', action='append', help='remove user from team, @all') + _add_team_membership_arguments(parser) parser.add_argument('team', type=str, nargs='+', help='Team Name or UID. Can be repeated.') super().__init__(parser) self.logger = api.get_logger() @@ -323,81 +523,29 @@ def warning(self, message: str) -> None: def execute(self, context: KeeperParams, **kwargs) -> None: base.require_enterprise_admin(context) - team_list, missing_names = enterprise_utils.TeamUtils.resolve_existing_teams(context.enterprise_data, kwargs.get('team')) + if not _has_membership_changes(kwargs): + raise base.CommandError( + 'No membership changes specified. Use -au/--add-user, -ru/--remove-user, ' + '-ar/--add-role, or -rr/--remove-role.') + + team_list, missing_names = enterprise_utils.TeamUtils.resolve_existing_teams( + context.enterprise_data, kwargs.get('team')) queued_team_list: List[enterprise_types.QueuedTeam] if missing_names: - queued_team_list, missing_names = enterprise_utils.TeamUtils.resolve_queued_teams(context.enterprise_data, missing_names) + queued_team_list, missing_names = enterprise_utils.TeamUtils.resolve_queued_teams( + context.enterprise_data, missing_names) else: queued_team_list = [] if isinstance(missing_names, list) and len(missing_names) > 0: mn = ', '.join((str(x) for x in missing_names)) raise base.CommandError(f'Team name(s) \"{mn}\" could not be resolved') - users_to_add: Optional[List[enterprise_types.User]] = None - roles_to_add: Optional[List[enterprise_types.Role]] = None - users_to_remove: Optional[List[enterprise_types.User]] = None - roles_to_remove: Optional[List[enterprise_types.Role]] = None - has_remove_all_users: bool = False - has_remove_all_roles: bool = False - - add_users = kwargs.get('add_user') - if isinstance(add_users, list): - users_to_add = enterprise_utils.UserUtils.resolve_existing_users(context.enterprise_data, add_users) - add_roles = kwargs.get('add_role') - if isinstance(add_roles, list): - roles_to_add = enterprise_utils.RoleUtils.resolve_existing_roles(context.enterprise_data, add_roles) - remove_users = kwargs.get('remove_user') - if isinstance(remove_users, list): - has_remove_all_users = any((True for x in remove_users if x == '@all')) - if not has_remove_all_users: - users_to_remove = enterprise_utils.UserUtils.resolve_existing_users(context.enterprise_data, remove_users) - remove_roles = kwargs.get('remove_role') - if isinstance(remove_roles, list): - has_remove_all_roles = any((True for x in remove_roles if x == '@all')) - if not has_remove_all_roles: - roles_to_remove = enterprise_utils.RoleUtils.resolve_existing_roles(context.enterprise_data, remove_roles) - batch = batch_management.BatchManagement(loader=context.enterprise_loader, logger=self) - for team in team_list: - existing_users = {x.enterprise_user_id for x in context.enterprise_data.team_users.get_links_by_subject(team.team_uid)} - existing_roles = {x.role_id for x in context.enterprise_data.role_teams.get_links_by_object(team.team_uid)} - if users_to_add: - users_to_add = [x for x in users_to_add if x.enterprise_user_id not in existing_users] - if users_to_add: - batch.modify_team_users(to_add=[enterprise_management.TeamUserEdit( - team_uid=team.team_uid, enterprise_user_id=x.enterprise_user_id) for x in users_to_add]) - if roles_to_add: - roles_to_add = [x for x in roles_to_add if x.role_id not in existing_roles] - if roles_to_add: - batch.modify_role_teams(to_add=[enterprise_management.RoleTeamEdit( - role_id=x.role_id, team_uid=team.team_uid) for x in roles_to_add]) - if has_remove_all_users: - batch.modify_team_users(to_remove=[enterprise_management.TeamUserEdit( - team_uid=team.team_uid, enterprise_user_id=x) for x in existing_users]) - elif users_to_remove: - batch.modify_team_users(to_remove=[enterprise_management.TeamUserEdit( - team_uid=team.team_uid, enterprise_user_id=x.enterprise_user_id) for x in users_to_remove]) - if has_remove_all_roles: - batch.modify_role_teams(to_remove=[enterprise_management.RoleTeamEdit( - role_id=x, team_uid=team.team_uid) for x in existing_roles]) - elif roles_to_remove: - batch.modify_role_teams(to_remove=[enterprise_management.RoleTeamEdit( - role_id=x.role_id, team_uid=team.team_uid) for x in roles_to_remove]) - - for queued_team in queued_team_list: - existing_users = {x.enterprise_user_id for x in context.enterprise_data.queued_team_users.get_links_by_subject(queued_team.team_uid)} - if users_to_add: - users_to_add = [x for x in users_to_add if x.enterprise_user_id not in existing_users] - if users_to_add: - batch.modify_team_users(to_add=[enterprise_management.TeamUserEdit( - team_uid=queued_team.team_uid, enterprise_user_id=x.enterprise_user_id) for x in users_to_add]) - if has_remove_all_users: - batch.modify_team_users(to_remove=[enterprise_management.TeamUserEdit( - team_uid=queued_team.team_uid, enterprise_user_id=x) for x in existing_users]) - elif users_to_remove: - batch.modify_team_users(to_remove=[enterprise_management.TeamUserEdit( - team_uid=queued_team.team_uid, enterprise_user_id=x.enterprise_user_id) for x in users_to_remove]) - + _queue_team_membership_changes( + batch, context, self, kwargs, + [_TeamMembershipTarget.from_team(x) for x in team_list], + [_TeamMembershipTarget.from_queued_team(x) for x in queued_team_list], + ) batch.apply() diff --git a/keepercli-package/src/keepercli/commands/enterprise_user.py b/keepercli-package/src/keepercli/commands/enterprise_user.py index 1c77a374..16fbd01b 100644 --- a/keepercli-package/src/keepercli/commands/enterprise_user.py +++ b/keepercli-package/src/keepercli/commands/enterprise_user.py @@ -624,10 +624,8 @@ def execute(self, context: KeeperParams, **kwargs) -> None: hide_shared_folders: Optional[bool] = None hsf = kwargs.get('hide_shared_folders') if isinstance(hsf, str) and len(hsf) > 0: - hide_shared_folders = True if hsf == 'on' else False - user_type: Optional[int] = None - if isinstance(hide_shared_folders, bool): - user_type = 0 if hide_shared_folders else 2 + hide_shared_folders = hsf == 'on' + user_type = enterprise_management.team_user_type_from_hide_shared_folders(hide_shared_folders) for user in users: for team_uid in teams_to_add: team_membership_to_add.append(enterprise_management.TeamUserEdit( diff --git a/keepercli-package/src/keepercli/commands/enterprise_utils.py b/keepercli-package/src/keepercli/commands/enterprise_utils.py index 5050fe6a..6226038d 100644 --- a/keepercli-package/src/keepercli/commands/enterprise_utils.py +++ b/keepercli-package/src/keepercli/commands/enterprise_utils.py @@ -189,6 +189,10 @@ def resolve_single_role(e_data: enterprise_types.IEnterpriseData, role_name: Any raise base.CommandError(f'Role name \"{role_name}\" does not exist') return role + @staticmethod + def is_admin_role(e_data: enterprise_types.IEnterpriseData, role_id: int) -> bool: + return any(e_data.managed_nodes.get_links_by_subject(role_id)) + @staticmethod def enforcement_value_from_file(filepath: str) -> str: diff --git a/keepersdk-package/src/keepersdk/enterprise/batch_management.py b/keepersdk-package/src/keepersdk/enterprise/batch_management.py index 81d95a68..2a05fd49 100644 --- a/keepersdk-package/src/keepersdk/enterprise/batch_management.py +++ b/keepersdk-package/src/keepersdk/enterprise/batch_management.py @@ -911,34 +911,47 @@ def _to_team_user_requests(self) -> Tuple[List[Dict[str, Any]], List[Dict[str, A if not t and not qt: raise Exception('team not found') if u.status == 'active' and t: - team_keys: Optional[keeper_auth.UserKeys] - if self._team_keys and team_user.team_uid in self._team_keys: - team_keys = self._team_keys[team_user.team_uid] + is_member = enterprise_data.team_users.get_link( + team_user.team_uid, team_user.enterprise_user_id) is not None + user_type = team_user.user_type if team_user.user_type is not None else 0 + if is_member: + if team_user.user_type is None: + raise Exception('user is already a team member') + rq['command'] = 'team_enterprise_user_update' + rq['user_type'] = team_user.user_type else: - team_keys = self.loader.keeper_auth.get_team_keys(team_user.team_uid) - if not team_keys: - raise Exception('team key is not loaded') - if not team_keys.aes: - team = enterprise_data.teams.get_entity(team_user.team_uid) - if team: - team_keys.aes = team.encrypted_team_key - user_keys = self.loader.keeper_auth.get_user_keys(u.username) - if not user_keys: - raise Exception('user key is not loaded') - rq['command'] = 'team_enterprise_user_add' - rq['user_type'] = 0 - if self.loader.keeper_auth.auth_context.forbid_rsa: - if user_keys.ec: - ec_public_key = crypto.load_ec_public_key(user_keys.ec) - team_key = crypto.encrypt_ec(team_keys.aes, ec_public_key) - rq['team_key'] = utils.base64_url_encode(team_key) - rq['team_key_type'] = 'encrypted_by_public_key_ecc' - else: - if user_keys.rsa: - rsa_public_key = crypto.load_rsa_public_key(user_keys.rsa) - team_key = crypto.encrypt_rsa(team_keys.aes, rsa_public_key) - rq['team_key'] = utils.base64_url_encode(team_key) - rq['team_key_type'] = 'encrypted_by_public_key' + team_keys: Optional[keeper_auth.UserKeys] + if self._team_keys and team_user.team_uid in self._team_keys: + team_keys = self._team_keys[team_user.team_uid] + else: + team_keys = self.loader.keeper_auth.get_team_keys(team_user.team_uid) + if not team_keys: + raise Exception('team key is not loaded') + if not team_keys.aes: + team = enterprise_data.teams.get_entity(team_user.team_uid) + if team: + team_keys.aes = team.encrypted_team_key + user_keys = self.loader.keeper_auth.get_user_keys(u.username) + if not user_keys: + raise Exception('user key is not loaded') + rq['command'] = 'team_enterprise_user_add' + rq['user_type'] = user_type + if self.loader.keeper_auth.auth_context.forbid_rsa: + if user_keys.ec: + ec_public_key = crypto.load_ec_public_key(user_keys.ec) + team_key = crypto.encrypt_ec(team_keys.aes, ec_public_key) + rq['team_key'] = utils.base64_url_encode(team_key) + rq['team_key_type'] = 'encrypted_by_public_key_ecc' + else: + raise Exception('user does not have EC key') + else: + if user_keys.rsa: + rsa_public_key = crypto.load_rsa_public_key(user_keys.rsa) + team_key = crypto.encrypt_rsa(team_keys.aes, rsa_public_key) + rq['team_key'] = utils.base64_url_encode(team_key) + rq['team_key_type'] = 'encrypted_by_public_key' + else: + raise Exception('user does not have RSA key') else: rq['command'] = 'team_queue_user' elif action == EntityAction.Remove: @@ -1026,7 +1039,14 @@ def _to_role_team_requests(self) -> Tuple[List[enterprise_pb2.RoleTeam], List[en add_rt_requests: List[enterprise_pb2.RoleTeam] = [] remove_rt_requests: List[enterprise_pb2.RoleTeam] = [] if self._role_teams: + enterprise_data = self.loader.enterprise_data for action, role_team in self._role_teams.values(): + if action == EntityAction.Add: + is_admin_role = any(enterprise_data.managed_nodes.get_links_by_subject(role_team.role_id)) + if is_admin_role: + self.logger.warning( + 'Teams cannot be assigned to roles with administrative permissions.') + continue rqs = add_rt_requests if action == EntityAction.Add else remove_rt_requests rt = enterprise_pb2.RoleTeam() rt.role_id = role_team.role_id diff --git a/keepersdk-package/src/keepersdk/enterprise/enterprise_management.py b/keepersdk-package/src/keepersdk/enterprise/enterprise_management.py index 6cff5db2..1df0e10e 100644 --- a/keepersdk-package/src/keepersdk/enterprise/enterprise_management.py +++ b/keepersdk-package/src/keepersdk/enterprise/enterprise_management.py @@ -8,6 +8,18 @@ from . import enterprise_types +def team_user_type_from_hide_shared_folders(hide_shared_folders: Optional[bool]) -> Optional[int]: + if hide_shared_folders is None: + return None + return 2 if hide_shared_folders else 1 + + +def team_user_type_from_hsf_flag(hsf_flag: Optional[str]) -> Optional[int]: + if not hsf_flag: + return None + return 2 if hsf_flag == 'on' else 1 + + @attrs.define(kw_only=True) class NodeEdit: _node_id: int diff --git a/keepersdk-package/src/keepersdk/enterprise/enterprise_user_management.py b/keepersdk-package/src/keepersdk/enterprise/enterprise_user_management.py index 4757cf9b..bf8cb4cb 100644 --- a/keepersdk-package/src/keepersdk/enterprise/enterprise_user_management.py +++ b/keepersdk-package/src/keepersdk/enterprise/enterprise_user_management.py @@ -574,9 +574,9 @@ def add_users_to_teams( enterprise_data = loader.enterprise_data - user_type: Optional[int] = None - if isinstance(hide_shared_folders, bool): - user_type = 0 if hide_shared_folders else 2 + user_type = enterprise_management.team_user_type_from_hide_shared_folders( + hide_shared_folders if isinstance(hide_shared_folders, bool) else None + ) batch = batch_management.BatchManagement(loader=loader, logger=logger)