import pandas as pd import requests import getpass import sys from urllib3.exceptions import InsecureRequestWarning from requests.auth import HTTPBasicAuth # 禁用不安全HTTPS警告 requests.packages.urllib3.disable_warnings(InsecureRequestWarning) HARBOR_URL = "https://harbor.seahi.me" API_BASE = f"{HARBOR_URL}/api/v2.0" def get_admin_credentials(): """获取Harbor管理员凭据""" print("请输入Harbor管理员账号信息:") username = input("用户名: ") password = getpass.getpass("密码: ") return username, password def test_api_connection(auth): """测试API连接和认证""" try: response = requests.get(f"{API_BASE}/users", auth=auth, verify=False, timeout=10) if response.status_code == 401: print("认证失败:用户名或密码错误") return False elif response.status_code == 200: print("API连接成功") return True else: print(f"API连接失败: {response.status_code} - {response.text}") return False except Exception as e: print(f"API连接出错: {str(e)}") return False def get_all_users(auth): """获取所有用户列表""" try: response = requests.get(f"{API_BASE}/users", auth=auth, verify=False, timeout=10) if response.status_code == 200: return response.json() else: print(f"获取用户列表失败: {response.status_code} - {response.text}") return None except Exception as e: print(f"获取用户列表出错: {str(e)}") return None def get_user_by_username(auth, username): """根据用户名获取用户信息""" users = get_all_users(auth) if users is None: return None for user in users: if user.get("username") == username: return user return None def get_user_projects(auth, user_id): """获取用户拥有的所有项目""" try: response = requests.get(f"{API_BASE}/projects", auth=auth, verify=False, timeout=10) if response.status_code == 200: projects = response.json() # 过滤出属于该用户的项目 user_projects = [p for p in projects if p.get("owner_id") == user_id] return user_projects else: print(f"获取项目列表失败: {response.status_code} - {response.text}") return [] except Exception as e: print(f"获取项目列表出错: {str(e)}") return [] def get_project_repositories(auth, project_name): """获取项目下的所有仓库""" try: response = requests.get(f"{API_BASE}/projects/{project_name}/repositories", auth=auth, verify=False, timeout=10) if response.status_code == 200: return response.json() else: print(f"获取项目 {project_name} 仓库列表失败: {response.status_code}") return [] except Exception as e: print(f"获取项目 {project_name} 仓库列表出错: {str(e)}") return [] def delete_repository(auth, project_name, repo_name): """删除仓库""" try: # 仓库名称可能包含项目名,需要提取仓库名部分 if '/' in repo_name: repo_name = repo_name.split('/')[-1] response = requests.delete(f"{API_BASE}/projects/{project_name}/repositories/{repo_name}", auth=auth, verify=False, timeout=10) if response.status_code == 200: print(f" ✓ 删除仓库: {project_name}/{repo_name}") return True else: print(f" ✗ 删除仓库失败 {project_name}/{repo_name}: {response.status_code}") return False except Exception as e: print(f" ✗ 删除仓库出错 {project_name}/{repo_name}: {str(e)}") return False def delete_project(auth, project_name): """删除项目""" try: response = requests.delete(f"{API_BASE}/projects/{project_name}", auth=auth, verify=False, timeout=10) if response.status_code == 200: print(f" ✓ 删除项目: {project_name}") return True else: print(f" ✗ 删除项目失败 {project_name}: {response.status_code} - {response.text}") return False except Exception as e: print(f" ✗ 删除项目出错 {project_name}: {str(e)}") return False def delete_user_resources(auth, user): """删除用户的所有资源(项目和仓库)""" user_id = user.get("user_id") username = user.get("username") print(f"正在清理用户 {username} 的资源...") # 获取用户拥有的项目 projects = get_user_projects(auth, user_id) if not projects: print(f" 用户 {username} 没有拥有的项目") return True print(f" 发现 {len(projects)} 个项目需要删除") success = True for project in projects: project_name = project.get("name") print(f" 处理项目: {project_name}") # 获取项目下的仓库 repositories = get_project_repositories(auth, project_name) # 删除所有仓库 for repo in repositories: repo_name = repo.get("name") if not delete_repository(auth, project_name, repo_name): success = False # 删除项目 if not delete_project(auth, project_name): success = False return success def delete_harbor_user(auth, username): """删除Harbor用户及其所有资源""" # 获取用户信息 user = get_user_by_username(auth, username) if user is None: print(f"! 用户不存在: {username}") return False user_id = user.get("user_id") print(f"开始删除用户: {username} (ID: {user_id})") # 1. 先删除用户的所有资源 if not delete_user_resources(auth, user): print(f"! 清理用户 {username} 的资源时遇到问题,但继续尝试删除用户") # 2. 删除用户 try: response = requests.delete(f"{API_BASE}/users/{user_id}", auth=auth, verify=False, timeout=10) if response.status_code == 200: print(f"✓ 成功删除用户: {username}") return True elif response.status_code == 404: print(f"! 用户不存在: {username}") return False elif response.status_code == 412: print(f"! 无法删除用户 {username}:用户仍有关联资源") return False else: print(f"✗ 删除用户失败 {username}: {response.status_code} - {response.text}") return False except Exception as e: print(f"✗ 删除用户出错 {username}: {str(e)}") return False def delete_users_from_excel(): """从Excel文件中读取用户并删除""" try: # 读取Excel文件,跳过第一行(标题行) df = pd.read_excel('users.xlsx', skiprows=1) # 获取管理员凭据 admin_username, admin_password = get_admin_credentials() # 创建认证对象 auth = HTTPBasicAuth(admin_username, admin_password) # 测试API连接 if not test_api_connection(auth): return print("\n开始删除用户...") success_count = 0 total_count = 0 failed_users = [] # 遍历Excel中的用户并删除 for _, row in df.iterrows(): username = str(row.iloc[0]).strip() # 跳过空行 if username == 'nan' or not username: continue total_count += 1 print(f"\n[{total_count}] 处理用户: {username}") if delete_harbor_user(auth, username): success_count += 1 else: failed_users.append(username) print(f"\n" + "="*50) print(f"删除完成!") print(f"成功删除: {success_count}/{total_count} 个用户") if failed_users: print(f"删除失败的用户: {', '.join(failed_users)}") except FileNotFoundError: print("错误:找不到 users.xlsx 文件,请确保文件在当前目录中") except Exception as e: print(f"发生错误: {e}") def main(): print("Harbor 用户删除工具") print("=" * 30) print("注意:此工具会删除用户及其所有项目和仓库!") confirm = input("\n确认要继续吗?(y/N): ").strip().lower() if confirm != 'y': print("已取消操作") return delete_users_from_excel() if __name__ == "__main__": main()