Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
288 changes: 218 additions & 70 deletions keepercli-package/src/keepercli/commands/enterprise_team.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,190 @@
logger = api.get_logger()


def _add_team_membership_arguments(parser: argparse.ArgumentParser) -> None:
parser.add_argument('-au', '--add-user', action='append', help='add user to team')
parser.add_argument('-ru', '--remove-user', action='append', help='remove user from team. @all')
parser.add_argument('-ar', '--add-role', action='append', help='add role to team')
parser.add_argument('-rr', '--remove-role', action='append', help='remove role from team. @all')
parser.add_argument(
'-hsf', '--hide-shared-folders', dest='hide_shared_folders', action='store',
choices=['on', 'off'], help='User does not see shared folders. --add-user only',
)


def _add_edit_team_membership_arguments(parser: argparse.ArgumentParser) -> None:
parser.add_argument('-au', '--add-user', action='append', help='add user to team')
parser.add_argument('-ru', '--remove-user', action='append', help='remove user from team')
parser.add_argument(
'-hsf', '--hide-shared-folders', dest='hide_shared_folders', action='store',
choices=['on', 'off'], help='User does not see shared folders. --add-user only',
)


def _validate_add_edit_membership(kwargs: Dict[str, Any], *, has_queued_teams: bool = False) -> None:
if kwargs.get('add_role') or kwargs.get('remove_role'):
raise base.CommandError(
'Role membership is not supported on enterprise-team add/edit. '
'Use enterprise-team membership with -ar/--add-role or -rr/--remove-role.')
remove_users = kwargs.get('remove_user')
if isinstance(remove_users, list) and any(x == '@all' for x in remove_users):
raise base.CommandError(
'@all is not supported on enterprise-team add/edit. '
'Use enterprise-team membership with -ru @all.')
if has_queued_teams:
raise base.CommandError(
'Membership changes are not supported when adding queued teams. '
'Use enterprise-team membership.')


def _has_membership_changes(kwargs: Dict[str, Any]) -> bool:
return any((
kwargs.get('add_user'),
kwargs.get('remove_user'),
kwargs.get('add_role'),
kwargs.get('remove_role'),
))


class _TeamMembershipTarget:
__slots__ = ('team_uid', 'name', 'queued')

def __init__(self, team_uid: str, name: str, *, queued: bool = False) -> None:
self.team_uid = team_uid
self.name = name
self.queued = queued

@classmethod
def from_team(cls, team: enterprise_types.Team) -> '_TeamMembershipTarget':
return cls(team.team_uid, team.name or '')

@classmethod
def from_queued_team(cls, team: enterprise_types.QueuedTeam) -> '_TeamMembershipTarget':
return cls(team.team_uid, team.name or '', queued=True)

@classmethod
def from_team_edit(cls, team: enterprise_management.TeamEdit) -> '_TeamMembershipTarget':
return cls(team.team_uid, team.name or '')


def _queue_team_membership_changes(
batch: batch_management.BatchManagement,
context: KeeperParams,
logger: enterprise_management.IEnterpriseManagementLogger,
kwargs: Dict[str, Any],
active_teams: List[_TeamMembershipTarget],
queued_teams: Optional[List[_TeamMembershipTarget]] = None,
*,
users_only: bool = False,
) -> None:
users_to_add: Optional[List[enterprise_types.User]] = None
roles_to_add: Optional[List[enterprise_types.Role]] = None
users_to_remove: Optional[List[enterprise_types.User]] = None
roles_to_remove: Optional[List[enterprise_types.Role]] = None
has_remove_all_users = False
has_remove_all_roles = False

add_users = kwargs.get('add_user')
if isinstance(add_users, list):
users_to_add = enterprise_utils.UserUtils.resolve_existing_users(
context.enterprise_data, add_users)
add_roles = kwargs.get('add_role')
if not users_only and isinstance(add_roles, list):
resolved_roles = enterprise_utils.RoleUtils.resolve_existing_roles(
context.enterprise_data, add_roles)
roles_to_add = []
for role in resolved_roles:
if enterprise_utils.RoleUtils.is_admin_role(context.enterprise_data, role.role_id):
logger.warning(
'Teams cannot be assigned to roles with administrative permissions.')
else:
roles_to_add.append(role)
if not roles_to_add:
roles_to_add = None
remove_users = kwargs.get('remove_user')
if isinstance(remove_users, list):
has_remove_all_users = not users_only and any(x == '@all' for x in remove_users)
if not has_remove_all_users:
users_to_remove = enterprise_utils.UserUtils.resolve_existing_users(
context.enterprise_data, remove_users)
remove_roles = kwargs.get('remove_role')
if not users_only and isinstance(remove_roles, list):
has_remove_all_roles = any(x == '@all' for x in remove_roles)
if not has_remove_all_roles:
roles_to_remove = enterprise_utils.RoleUtils.resolve_existing_roles(
context.enterprise_data, remove_roles)

user_type = enterprise_management.team_user_type_from_hsf_flag(kwargs.get('hide_shared_folders'))

for team in active_teams:
existing_users = {
x.enterprise_user_id
for x in context.enterprise_data.team_users.get_links_by_subject(team.team_uid)
}
existing_roles = {
x.role_id
for x in context.enterprise_data.role_teams.get_links_by_object(team.team_uid)
}
if users_to_add:
for user in users_to_add:
if user.enterprise_user_id in existing_users:
if user_type is None:
logger.warning(
'User \"%s\" is already a member of team \"%s\"',
user.username, team.name)
continue
batch.modify_team_users(to_add=[enterprise_management.TeamUserEdit(
team_uid=team.team_uid,
enterprise_user_id=user.enterprise_user_id,
user_type=user_type)])
if roles_to_add:
team_roles_to_add = [x for x in roles_to_add if x.role_id not in existing_roles]
if team_roles_to_add:
batch.modify_role_teams(to_add=[enterprise_management.RoleTeamEdit(
role_id=x.role_id, team_uid=team.team_uid) for x in team_roles_to_add])
if has_remove_all_users:
batch.modify_team_users(to_remove=[enterprise_management.TeamUserEdit(
team_uid=team.team_uid, enterprise_user_id=x) for x in existing_users])
elif users_to_remove:
batch.modify_team_users(to_remove=[enterprise_management.TeamUserEdit(
team_uid=team.team_uid, enterprise_user_id=x.enterprise_user_id)
for x in users_to_remove])
if has_remove_all_roles:
batch.modify_role_teams(to_remove=[enterprise_management.RoleTeamEdit(
role_id=x, team_uid=team.team_uid) for x in existing_roles])
elif roles_to_remove:
batch.modify_role_teams(to_remove=[enterprise_management.RoleTeamEdit(
role_id=x.role_id, team_uid=team.team_uid) for x in roles_to_remove])

if users_only:
return

for team in queued_teams or []:
existing_users = {
x.enterprise_user_id
for x in context.enterprise_data.queued_team_users.get_links_by_subject(team.team_uid)
}
if users_to_add:
for user in users_to_add:
if user.enterprise_user_id in existing_users:
if user_type is None:
logger.warning(
'User \"%s\" is already queued for team \"%s\"',
user.username, team.name)
continue
batch.modify_team_users(to_add=[enterprise_management.TeamUserEdit(
team_uid=team.team_uid,
enterprise_user_id=user.enterprise_user_id,
user_type=user_type)])
if has_remove_all_users:
batch.modify_team_users(to_remove=[enterprise_management.TeamUserEdit(
team_uid=team.team_uid, enterprise_user_id=x) for x in existing_users])
elif users_to_remove:
batch.modify_team_users(to_remove=[enterprise_management.TeamUserEdit(
team_uid=team.team_uid, enterprise_user_id=x.enterprise_user_id)
for x in users_to_remove])


class EnterpriseTeamCommand(base.GroupCommand):
def __init__(self):
super().__init__('Manage an enterprise team(s)')
Expand Down Expand Up @@ -143,6 +327,7 @@ def __init__(self):
action='store', help='disable record re-shares')
parser.add_argument('--restrict-view', dest='restrict_view', choices=['on', 'off'],
action='store', help='disable view/copy passwords')
_add_edit_team_membership_arguments(parser)
parser.add_argument('team', type=str, nargs='+', help='Team Name or Queued Team UID. Can be repeated.')
super().__init__(parser)
self.logger = api.get_logger()
Expand Down Expand Up @@ -203,11 +388,13 @@ def execute(self, context: KeeperParams, **kwargs) -> None:
restrict_view = r_view == 'on'

batch = batch_management.BatchManagement(loader=context.enterprise_loader, logger=self)
new_team_edits: List[enterprise_management.TeamEdit] = []
if team_names:
teams_to_add = [enterprise_management.TeamEdit(
team_uid=utils.generate_uid(), name=x, node_id=parent_id,
restrict_edit=restrict_edit, restrict_share=restrict_share, restrict_view=restrict_view)
for x in team_names.values()]
new_team_edits = teams_to_add
batch.modify_teams(to_add=teams_to_add)

if queued_teams:
Expand All @@ -217,6 +404,14 @@ def execute(self, context: KeeperParams, **kwargs) -> None:
for x in queued_teams]
batch.modify_teams(to_add=teams_to_add)

if _has_membership_changes(kwargs):
_validate_add_edit_membership(kwargs, has_queued_teams=bool(queued_teams))
_queue_team_membership_changes(
batch, context, self, kwargs,
[_TeamMembershipTarget.from_team_edit(x) for x in new_team_edits],
users_only=True,
)

batch.apply()

class EnterpriseTeamEditCommand(base.ArgparseCommand, enterprise_management.IEnterpriseManagementLogger):
Expand All @@ -232,6 +427,7 @@ def __init__(self):
action='store', help='disable record re-shares')
parser.add_argument('--restrict-view', dest='restrict_view', choices=['on', 'off'],
action='store', help='disable view/copy passwords')
_add_edit_team_membership_arguments(parser)
parser.add_argument('team', type=str, nargs='+', help='Team Name or UID. Can be repeated.')
super().__init__(parser)
self.logger = api.get_logger()
Expand Down Expand Up @@ -281,6 +477,13 @@ def execute(self, context: KeeperParams, **kwargs) -> None:

batch = batch_management.BatchManagement(loader=context.enterprise_loader, logger=self)
batch.modify_teams(to_update=teams_to_edit)
if _has_membership_changes(kwargs):
_validate_add_edit_membership(kwargs)
_queue_team_membership_changes(
batch, context, self, kwargs,
[_TeamMembershipTarget.from_team(x) for x in team_list],
users_only=True,
)
batch.apply()


Expand Down Expand Up @@ -309,10 +512,7 @@ def execute(self, context: KeeperParams, **kwargs) -> None:
class EnterpriseTeamMembershipCommand(base.ArgparseCommand, enterprise_management.IEnterpriseManagementLogger):
def __init__(self):
parser = argparse.ArgumentParser(prog='enterprise-team membership', description='Manage enterprise team membership.')
parser.add_argument('-au', '--add-user', action='append', help='add user to team')
parser.add_argument('-ru', '--remove-user', action='append', help='remove user from team. @all')
parser.add_argument('-ar', '--add-role', action='append', help='add user to team')
parser.add_argument('-rr', '--remove-role', action='append', help='remove user from team, @all')
_add_team_membership_arguments(parser)
parser.add_argument('team', type=str, nargs='+', help='Team Name or UID. Can be repeated.')
super().__init__(parser)
self.logger = api.get_logger()
Expand All @@ -323,81 +523,29 @@ def warning(self, message: str) -> None:
def execute(self, context: KeeperParams, **kwargs) -> None:
base.require_enterprise_admin(context)

team_list, missing_names = enterprise_utils.TeamUtils.resolve_existing_teams(context.enterprise_data, kwargs.get('team'))
if not _has_membership_changes(kwargs):
raise base.CommandError(
'No membership changes specified. Use -au/--add-user, -ru/--remove-user, '
'-ar/--add-role, or -rr/--remove-role.')

team_list, missing_names = enterprise_utils.TeamUtils.resolve_existing_teams(
context.enterprise_data, kwargs.get('team'))
queued_team_list: List[enterprise_types.QueuedTeam]
if missing_names:
queued_team_list, missing_names = enterprise_utils.TeamUtils.resolve_queued_teams(context.enterprise_data, missing_names)
queued_team_list, missing_names = enterprise_utils.TeamUtils.resolve_queued_teams(
context.enterprise_data, missing_names)
else:
queued_team_list = []
if isinstance(missing_names, list) and len(missing_names) > 0:
mn = ', '.join((str(x) for x in missing_names))
raise base.CommandError(f'Team name(s) \"{mn}\" could not be resolved')

users_to_add: Optional[List[enterprise_types.User]] = None
roles_to_add: Optional[List[enterprise_types.Role]] = None
users_to_remove: Optional[List[enterprise_types.User]] = None
roles_to_remove: Optional[List[enterprise_types.Role]] = None
has_remove_all_users: bool = False
has_remove_all_roles: bool = False

add_users = kwargs.get('add_user')
if isinstance(add_users, list):
users_to_add = enterprise_utils.UserUtils.resolve_existing_users(context.enterprise_data, add_users)
add_roles = kwargs.get('add_role')
if isinstance(add_roles, list):
roles_to_add = enterprise_utils.RoleUtils.resolve_existing_roles(context.enterprise_data, add_roles)
remove_users = kwargs.get('remove_user')
if isinstance(remove_users, list):
has_remove_all_users = any((True for x in remove_users if x == '@all'))
if not has_remove_all_users:
users_to_remove = enterprise_utils.UserUtils.resolve_existing_users(context.enterprise_data, remove_users)
remove_roles = kwargs.get('remove_role')
if isinstance(remove_roles, list):
has_remove_all_roles = any((True for x in remove_roles if x == '@all'))
if not has_remove_all_roles:
roles_to_remove = enterprise_utils.RoleUtils.resolve_existing_roles(context.enterprise_data, remove_roles)

batch = batch_management.BatchManagement(loader=context.enterprise_loader, logger=self)
for team in team_list:
existing_users = {x.enterprise_user_id for x in context.enterprise_data.team_users.get_links_by_subject(team.team_uid)}
existing_roles = {x.role_id for x in context.enterprise_data.role_teams.get_links_by_object(team.team_uid)}
if users_to_add:
users_to_add = [x for x in users_to_add if x.enterprise_user_id not in existing_users]
if users_to_add:
batch.modify_team_users(to_add=[enterprise_management.TeamUserEdit(
team_uid=team.team_uid, enterprise_user_id=x.enterprise_user_id) for x in users_to_add])
if roles_to_add:
roles_to_add = [x for x in roles_to_add if x.role_id not in existing_roles]
if roles_to_add:
batch.modify_role_teams(to_add=[enterprise_management.RoleTeamEdit(
role_id=x.role_id, team_uid=team.team_uid) for x in roles_to_add])
if has_remove_all_users:
batch.modify_team_users(to_remove=[enterprise_management.TeamUserEdit(
team_uid=team.team_uid, enterprise_user_id=x) for x in existing_users])
elif users_to_remove:
batch.modify_team_users(to_remove=[enterprise_management.TeamUserEdit(
team_uid=team.team_uid, enterprise_user_id=x.enterprise_user_id) for x in users_to_remove])
if has_remove_all_roles:
batch.modify_role_teams(to_remove=[enterprise_management.RoleTeamEdit(
role_id=x, team_uid=team.team_uid) for x in existing_roles])
elif roles_to_remove:
batch.modify_role_teams(to_remove=[enterprise_management.RoleTeamEdit(
role_id=x.role_id, team_uid=team.team_uid) for x in roles_to_remove])

for queued_team in queued_team_list:
existing_users = {x.enterprise_user_id for x in context.enterprise_data.queued_team_users.get_links_by_subject(queued_team.team_uid)}
if users_to_add:
users_to_add = [x for x in users_to_add if x.enterprise_user_id not in existing_users]
if users_to_add:
batch.modify_team_users(to_add=[enterprise_management.TeamUserEdit(
team_uid=queued_team.team_uid, enterprise_user_id=x.enterprise_user_id) for x in users_to_add])
if has_remove_all_users:
batch.modify_team_users(to_remove=[enterprise_management.TeamUserEdit(
team_uid=queued_team.team_uid, enterprise_user_id=x) for x in existing_users])
elif users_to_remove:
batch.modify_team_users(to_remove=[enterprise_management.TeamUserEdit(
team_uid=queued_team.team_uid, enterprise_user_id=x.enterprise_user_id) for x in users_to_remove])

_queue_team_membership_changes(
batch, context, self, kwargs,
[_TeamMembershipTarget.from_team(x) for x in team_list],
[_TeamMembershipTarget.from_queued_team(x) for x in queued_team_list],
)
batch.apply()


Expand Down
6 changes: 2 additions & 4 deletions keepercli-package/src/keepercli/commands/enterprise_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -624,10 +624,8 @@ def execute(self, context: KeeperParams, **kwargs) -> None:
hide_shared_folders: Optional[bool] = None
hsf = kwargs.get('hide_shared_folders')
if isinstance(hsf, str) and len(hsf) > 0:
hide_shared_folders = True if hsf == 'on' else False
user_type: Optional[int] = None
if isinstance(hide_shared_folders, bool):
user_type = 0 if hide_shared_folders else 2
hide_shared_folders = hsf == 'on'
user_type = enterprise_management.team_user_type_from_hide_shared_folders(hide_shared_folders)
for user in users:
for team_uid in teams_to_add:
team_membership_to_add.append(enterprise_management.TeamUserEdit(
Expand Down
4 changes: 4 additions & 0 deletions keepercli-package/src/keepercli/commands/enterprise_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,10 @@ def resolve_single_role(e_data: enterprise_types.IEnterpriseData, role_name: Any
raise base.CommandError(f'Role name \"{role_name}\" does not exist')
return role

@staticmethod
def is_admin_role(e_data: enterprise_types.IEnterpriseData, role_id: int) -> bool:
return any(e_data.managed_nodes.get_links_by_subject(role_id))


@staticmethod
def enforcement_value_from_file(filepath: str) -> str:
Expand Down
Loading