harbor_batch/delete_users.py
seahi e3401ec380 Initial commit: Harbor batch management tools
Add scripts for Harbor user and project management:
- register.py: Bulk user registration from Excel
- delete_users.py: Complete user deletion with resource cleanup
- delete_projects.py: Targeted deletion of student projects (stu01-stu49)
- CLAUDE.md: Project documentation and API guidance
- requirements.txt: Python dependencies

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-06-17 22:06:31 +08:00

258 lines
8.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pandas as pd
import requests
import getpass
import sys
from urllib3.exceptions import InsecureRequestWarning
from requests.auth import HTTPBasicAuth
# 禁用不安全HTTPS警告
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
HARBOR_URL = "https://harbor.seahi.me"
API_BASE = f"{HARBOR_URL}/api/v2.0"
def get_admin_credentials():
"""获取Harbor管理员凭据"""
print("请输入Harbor管理员账号信息")
username = input("用户名: ")
password = getpass.getpass("密码: ")
return username, password
def test_api_connection(auth):
"""测试API连接和认证"""
try:
response = requests.get(f"{API_BASE}/users", auth=auth, verify=False, timeout=10)
if response.status_code == 401:
print("认证失败:用户名或密码错误")
return False
elif response.status_code == 200:
print("API连接成功")
return True
else:
print(f"API连接失败: {response.status_code} - {response.text}")
return False
except Exception as e:
print(f"API连接出错: {str(e)}")
return False
def get_all_users(auth):
"""获取所有用户列表"""
try:
response = requests.get(f"{API_BASE}/users", auth=auth, verify=False, timeout=10)
if response.status_code == 200:
return response.json()
else:
print(f"获取用户列表失败: {response.status_code} - {response.text}")
return None
except Exception as e:
print(f"获取用户列表出错: {str(e)}")
return None
def get_user_by_username(auth, username):
"""根据用户名获取用户信息"""
users = get_all_users(auth)
if users is None:
return None
for user in users:
if user.get("username") == username:
return user
return None
def get_user_projects(auth, user_id):
"""获取用户拥有的所有项目"""
try:
response = requests.get(f"{API_BASE}/projects", auth=auth, verify=False, timeout=10)
if response.status_code == 200:
projects = response.json()
# 过滤出属于该用户的项目
user_projects = [p for p in projects if p.get("owner_id") == user_id]
return user_projects
else:
print(f"获取项目列表失败: {response.status_code} - {response.text}")
return []
except Exception as e:
print(f"获取项目列表出错: {str(e)}")
return []
def get_project_repositories(auth, project_name):
"""获取项目下的所有仓库"""
try:
response = requests.get(f"{API_BASE}/projects/{project_name}/repositories",
auth=auth, verify=False, timeout=10)
if response.status_code == 200:
return response.json()
else:
print(f"获取项目 {project_name} 仓库列表失败: {response.status_code}")
return []
except Exception as e:
print(f"获取项目 {project_name} 仓库列表出错: {str(e)}")
return []
def delete_repository(auth, project_name, repo_name):
"""删除仓库"""
try:
# 仓库名称可能包含项目名,需要提取仓库名部分
if '/' in repo_name:
repo_name = repo_name.split('/')[-1]
response = requests.delete(f"{API_BASE}/projects/{project_name}/repositories/{repo_name}",
auth=auth, verify=False, timeout=10)
if response.status_code == 200:
print(f" ✓ 删除仓库: {project_name}/{repo_name}")
return True
else:
print(f" ✗ 删除仓库失败 {project_name}/{repo_name}: {response.status_code}")
return False
except Exception as e:
print(f" ✗ 删除仓库出错 {project_name}/{repo_name}: {str(e)}")
return False
def delete_project(auth, project_name):
"""删除项目"""
try:
response = requests.delete(f"{API_BASE}/projects/{project_name}",
auth=auth, verify=False, timeout=10)
if response.status_code == 200:
print(f" ✓ 删除项目: {project_name}")
return True
else:
print(f" ✗ 删除项目失败 {project_name}: {response.status_code} - {response.text}")
return False
except Exception as e:
print(f" ✗ 删除项目出错 {project_name}: {str(e)}")
return False
def delete_user_resources(auth, user):
"""删除用户的所有资源(项目和仓库)"""
user_id = user.get("user_id")
username = user.get("username")
print(f"正在清理用户 {username} 的资源...")
# 获取用户拥有的项目
projects = get_user_projects(auth, user_id)
if not projects:
print(f" 用户 {username} 没有拥有的项目")
return True
print(f" 发现 {len(projects)} 个项目需要删除")
success = True
for project in projects:
project_name = project.get("name")
print(f" 处理项目: {project_name}")
# 获取项目下的仓库
repositories = get_project_repositories(auth, project_name)
# 删除所有仓库
for repo in repositories:
repo_name = repo.get("name")
if not delete_repository(auth, project_name, repo_name):
success = False
# 删除项目
if not delete_project(auth, project_name):
success = False
return success
def delete_harbor_user(auth, username):
"""删除Harbor用户及其所有资源"""
# 获取用户信息
user = get_user_by_username(auth, username)
if user is None:
print(f"! 用户不存在: {username}")
return False
user_id = user.get("user_id")
print(f"开始删除用户: {username} (ID: {user_id})")
# 1. 先删除用户的所有资源
if not delete_user_resources(auth, user):
print(f"! 清理用户 {username} 的资源时遇到问题,但继续尝试删除用户")
# 2. 删除用户
try:
response = requests.delete(f"{API_BASE}/users/{user_id}",
auth=auth, verify=False, timeout=10)
if response.status_code == 200:
print(f"✓ 成功删除用户: {username}")
return True
elif response.status_code == 404:
print(f"! 用户不存在: {username}")
return False
elif response.status_code == 412:
print(f"! 无法删除用户 {username}:用户仍有关联资源")
return False
else:
print(f"✗ 删除用户失败 {username}: {response.status_code} - {response.text}")
return False
except Exception as e:
print(f"✗ 删除用户出错 {username}: {str(e)}")
return False
def delete_users_from_excel():
"""从Excel文件中读取用户并删除"""
try:
# 读取Excel文件跳过第一行标题行
df = pd.read_excel('users.xlsx', skiprows=1)
# 获取管理员凭据
admin_username, admin_password = get_admin_credentials()
# 创建认证对象
auth = HTTPBasicAuth(admin_username, admin_password)
# 测试API连接
if not test_api_connection(auth):
return
print("\n开始删除用户...")
success_count = 0
total_count = 0
failed_users = []
# 遍历Excel中的用户并删除
for _, row in df.iterrows():
username = str(row.iloc[0]).strip()
# 跳过空行
if username == 'nan' or not username:
continue
total_count += 1
print(f"\n[{total_count}] 处理用户: {username}")
if delete_harbor_user(auth, username):
success_count += 1
else:
failed_users.append(username)
print(f"\n" + "="*50)
print(f"删除完成!")
print(f"成功删除: {success_count}/{total_count} 个用户")
if failed_users:
print(f"删除失败的用户: {', '.join(failed_users)}")
except FileNotFoundError:
print("错误:找不到 users.xlsx 文件,请确保文件在当前目录中")
except Exception as e:
print(f"发生错误: {e}")
def main():
print("Harbor 用户删除工具")
print("=" * 30)
print("注意:此工具会删除用户及其所有项目和仓库!")
confirm = input("\n确认要继续吗?(y/N): ").strip().lower()
if confirm != 'y':
print("已取消操作")
return
delete_users_from_excel()
if __name__ == "__main__":
main()