import requests import getpass import re from urllib3.exceptions import InsecureRequestWarning from requests.auth import HTTPBasicAuth # 禁用不安全HTTPS警告 requests.packages.urllib3.disable_warnings(InsecureRequestWarning) HARBOR_URL = "https://harbor.seahi.me" API_BASE = f"{HARBOR_URL}/api/v2.0" def get_admin_credentials(): """获取Harbor管理员凭据""" print("请输入Harbor管理员账号信息:") username = input("用户名: ") password = getpass.getpass("密码: ") return username, password def test_api_connection(auth): """测试API连接和认证""" try: response = requests.get(f"{API_BASE}/projects", auth=auth, verify=False, timeout=10) if response.status_code == 401: print("认证失败:用户名或密码错误") return False elif response.status_code == 200: print("API连接成功") return True else: print(f"API连接失败: {response.status_code} - {response.text}") return False except Exception as e: print(f"API连接出错: {str(e)}") return False def get_all_projects(auth): """获取所有项目列表(支持分页)""" all_projects = [] page = 1 page_size = 100 # 每页获取100个项目 while True: try: params = { 'page': page, 'page_size': page_size } response = requests.get(f"{API_BASE}/projects", params=params, auth=auth, verify=False, timeout=10) if response.status_code == 200: projects = response.json() if not projects: # 没有更多数据 break all_projects.extend(projects) print(f" 已获取第 {page} 页,共 {len(projects)} 个项目") page += 1 else: print(f"获取项目列表失败: {response.status_code} - {response.text}") break except Exception as e: print(f"获取项目列表出错: {str(e)}") break print(f"总共获取 {len(all_projects)} 个项目") return all_projects def get_student_projects(auth): """获取所有学生项目 (stu01~stu49)""" all_projects = get_all_projects(auth) student_projects = [] # 匹配 stu01 到 stu49 开头的项目名 pattern = re.compile(r'^stu(0[1-9]|[1-4][0-9]).*') for project in all_projects: project_name = project.get("name", "") if pattern.match(project_name): student_projects.append(project) return student_projects def get_project_repositories(auth, project_name): """获取项目下的所有仓库(支持分页)""" all_repositories = [] page = 1 page_size = 100 # 每页获取100个仓库 while True: try: params = { 'page': page, 'page_size': page_size } response = requests.get(f"{API_BASE}/projects/{project_name}/repositories", params=params, auth=auth, verify=False, timeout=10) if response.status_code == 200: repositories = response.json() if not repositories: # 没有更多数据 break all_repositories.extend(repositories) page += 1 else: print(f" 获取项目 {project_name} 仓库列表失败: {response.status_code}") break except Exception as e: print(f" 获取项目 {project_name} 仓库列表出错: {str(e)}") break return all_repositories def delete_repository(auth, project_name, repo_name): """删除仓库""" try: # 仓库名格式为 "project_name/repository_name",需要提取仓库名部分 if '/' in repo_name: actual_repo_name = repo_name.split('/', 1)[1] # 取第一个/后面的部分 else: actual_repo_name = repo_name response = requests.delete(f"{API_BASE}/projects/{project_name}/repositories/{actual_repo_name}", auth=auth, verify=False, timeout=30) if response.status_code == 200: print(f" ✓ 删除仓库: {repo_name}") return True else: print(f" ✗ 删除仓库失败 {repo_name}: {response.status_code}") if response.text: print(f" 错误详情: {response.text}") return False except Exception as e: print(f" ✗ 删除仓库出错 {repo_name}: {str(e)}") return False def delete_project(auth, project_name): """删除项目""" try: response = requests.delete(f"{API_BASE}/projects/{project_name}", auth=auth, verify=False, timeout=30) if response.status_code == 200: print(f" ✓ 删除项目: {project_name}") return True else: print(f" ✗ 删除项目失败 {project_name}: {response.status_code}") if response.text: print(f" 错误详情: {response.text}") return False except Exception as e: print(f" ✗ 删除项目出错 {project_name}: {str(e)}") return False def delete_student_projects_and_repositories(auth): """删除所有学生项目和仓库""" print("正在查找学生项目 (stu01~stu49)...") student_projects = get_student_projects(auth) if not student_projects: print("未找到任何学生项目") return print(f"找到 {len(student_projects)} 个学生项目:") for project in student_projects: print(f" - {project.get('name')} (ID: {project.get('project_id')})") # 确认删除 print(f"\n警告:即将删除 {len(student_projects)} 个项目及其所有仓库!") confirm = input("确认要继续吗?(输入 'DELETE' 确认): ").strip() if confirm != 'DELETE': print("已取消操作") return success_count = 0 total_count = len(student_projects) for i, project in enumerate(student_projects, 1): project_name = project.get("name") print(f"\n[{i}/{total_count}] 处理项目: {project_name}") # 获取项目下的仓库 repositories = get_project_repositories(auth, project_name) if repositories: print(f" 发现 {len(repositories)} 个仓库,开始删除...") repo_success = 0 for repo in repositories: repo_name = repo.get("name") if delete_repository(auth, project_name, repo_name): repo_success += 1 print(f" 仓库删除完成: {repo_success}/{len(repositories)}") else: print(f" 项目 {project_name} 中没有仓库") # 删除项目 if delete_project(auth, project_name): success_count += 1 print(f"\n" + "="*60) print(f"删除完成!") print(f"成功删除项目: {success_count}/{total_count}") def list_student_projects(auth): """列出所有学生项目(预览模式)""" print("正在查找学生项目 (stu01~stu49)...") student_projects = get_student_projects(auth) if not student_projects: print("未找到任何学生项目") return print(f"\n找到 {len(student_projects)} 个学生项目:") print("-" * 80) for project in student_projects: project_name = project.get("name") project_id = project.get("project_id") owner_name = project.get("owner_name", "N/A") creation_time = project.get("creation_time", "N/A") print(f"项目: {project_name}") print(f" ID: {project_id}") print(f" 所有者: {owner_name}") print(f" 创建时间: {creation_time}") # 获取仓库信息 repositories = get_project_repositories(auth, project_name) if repositories: print(f" 仓库 ({len(repositories)}个):") for repo in repositories: print(f" - {repo.get('name')} (拉取次数: {repo.get('pull_count', 0)})") else: print(f" 仓库: 无") print() def main(): print("Harbor 学生项目删除工具") print("=" * 40) print("此工具专门删除 stu01~stu49 开头的项目和仓库") # 获取管理员凭据 admin_username, admin_password = get_admin_credentials() auth = HTTPBasicAuth(admin_username, admin_password) # 测试API连接 if not test_api_connection(auth): return while True: print("\n请选择操作:") print("1. 预览学生项目(不删除)") print("2. 删除所有学生项目和仓库") print("3. 退出") choice = input("请输入选项 (1-3): ").strip() if choice == '1': list_student_projects(auth) elif choice == '2': delete_student_projects_and_repositories(auth) break elif choice == '3': print("退出程序") break else: print("无效选项,请重试") if __name__ == "__main__": main()