Initial commit: Harbor batch management tools

Add scripts for Harbor user and project management:
- register.py: Bulk user registration from Excel
- delete_users.py: Complete user deletion with resource cleanup
- delete_projects.py: Targeted deletion of student projects (stu01-stu49)
- CLAUDE.md: Project documentation and API guidance
- requirements.txt: Python dependencies

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
seahi 2025-06-17 22:06:31 +08:00
commit e3401ec380
8 changed files with 810 additions and 0 deletions

View File

@ -0,0 +1,10 @@
{
"permissions": {
"allow": [
"Bash(git init:*)",
"Bash(git remote add:*)",
"Bash(git add:*)"
],
"deny": []
}
}

54
.gitignore vendored Normal file
View File

@ -0,0 +1,54 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# Virtual Environment
venv/
env/
.env
.venv
# IDE
.vscode/
.idea/
*.swp
*.swo
*~
# OS
.DS_Store
.DS_Store?
._*
.Spotlight-V100
.Trashes
ehthumbs.db
Thumbs.db
# Excel files (sensitive data)
*.xlsx
*.xls
# Logs
*.log
# Temporary files
*.tmp
*.temp

110
CLAUDE.md Normal file
View File

@ -0,0 +1,110 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Project Overview
This is a Harbor container registry user management tool written in Python. It provides functionality to:
- Register/create Harbor users from Excel data (`register.py`)
- Delete Harbor users individually or in batches (`delete_users.py`, `delete_users_fixed.py`)
The application interacts with Harbor's REST API v2.0 at `https://harbor.seahi.me`.
## Setup and Environment
### Dependencies Installation
```bash
pip install -r requirements.txt
```
### Virtual Environment Setup
```bash
# Activate existing virtual environment
source venv/bin/activate # On Unix/macOS
# or
venv\Scripts\activate # On Windows
```
## Core Architecture
### Data Flow
1. **Input**: Excel file (`users.xlsx`) with user data (username, realname, password, email)
2. **Authentication**: HTTP Basic Auth with Harbor admin credentials
3. **API Operations**: REST API calls to Harbor's `/api/v2.0/users` endpoint
4. **Error Handling**: Comprehensive exception handling with Chinese language user feedback
### Key Components
- **Authentication**: `get_admin_credentials()` - Interactive credential collection with getpass
- **User Operations**:
- `create_harbor_user()` - User creation via POST (register.py:20)
- `delete_harbor_user()` - User deletion via DELETE (requires user ID lookup)
- `get_user_id_by_username()` - User ID resolution from username
- **Excel Processing**: `pd.read_excel()` with `skiprows=1` to handle header row
- **Connection Testing**: `test_api_connection()` validates API connectivity and auth (delete_users_fixed.py:20)
- **Interactive Modes**: `delete_users_fixed.py` provides menu-driven user interaction
### File Structure
- `register.py` - User registration from Excel
- `delete_users.py` - Basic user deletion tool
- `delete_users_fixed.py` - Enhanced deletion tool with better error handling and interactive modes
- `users.xlsx` - Excel data source (not in version control)
- `requirements.txt` - Python dependencies
## Common Commands
### Environment Setup
```bash
# Activate virtual environment (required before running scripts)
source venv/bin/activate
# Install dependencies
pip install -r requirements.txt
```
### Run Scripts
```bash
# Register users from Excel file
python register.py
# Delete users (recommended enhanced version)
python delete_users_fixed.py
# Delete users (basic version)
python delete_users.py
```
### Development
```bash
# Check Python syntax
python -m py_compile register.py
python -m py_compile delete_users_fixed.py
# Run with verbose output for debugging
python -v register.py
```
## API Configuration
- Harbor URL: `https://harbor.seahi.me`
- API Version: v2.0
- Authentication: HTTP Basic Auth
- SSL Verification: Disabled (`verify=False`)
- HTTPS Warnings: Suppressed via urllib3
## Excel File Format
Expected structure for `users.xlsx`:
- Row 1: Headers (skipped)
- Column 0: Username
- Column 1: Real name
- Column 2: Password
- Column 4: Email address
## Error Handling Patterns
- HTTP 409: User already exists (treated as success for registration)
- HTTP 401: Authentication failure
- HTTP 404: User not found
- HTTP 412: User has dependencies (cannot delete)
- Connection timeouts and network errors are handled gracefully

5
api.md Normal file
View File

@ -0,0 +1,5 @@
## 用户相关
Base URL: harbor.seahi.me/api/v2.0
GET /users

271
delete_projects.py Normal file
View File

@ -0,0 +1,271 @@
import requests
import getpass
import re
from urllib3.exceptions import InsecureRequestWarning
from requests.auth import HTTPBasicAuth
# 禁用不安全HTTPS警告
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
HARBOR_URL = "https://harbor.seahi.me"
API_BASE = f"{HARBOR_URL}/api/v2.0"
def get_admin_credentials():
"""获取Harbor管理员凭据"""
print("请输入Harbor管理员账号信息")
username = input("用户名: ")
password = getpass.getpass("密码: ")
return username, password
def test_api_connection(auth):
"""测试API连接和认证"""
try:
response = requests.get(f"{API_BASE}/projects", auth=auth, verify=False, timeout=10)
if response.status_code == 401:
print("认证失败:用户名或密码错误")
return False
elif response.status_code == 200:
print("API连接成功")
return True
else:
print(f"API连接失败: {response.status_code} - {response.text}")
return False
except Exception as e:
print(f"API连接出错: {str(e)}")
return False
def get_all_projects(auth):
"""获取所有项目列表(支持分页)"""
all_projects = []
page = 1
page_size = 100 # 每页获取100个项目
while True:
try:
params = {
'page': page,
'page_size': page_size
}
response = requests.get(f"{API_BASE}/projects", params=params,
auth=auth, verify=False, timeout=10)
if response.status_code == 200:
projects = response.json()
if not projects: # 没有更多数据
break
all_projects.extend(projects)
print(f" 已获取第 {page} 页,共 {len(projects)} 个项目")
page += 1
else:
print(f"获取项目列表失败: {response.status_code} - {response.text}")
break
except Exception as e:
print(f"获取项目列表出错: {str(e)}")
break
print(f"总共获取 {len(all_projects)} 个项目")
return all_projects
def get_student_projects(auth):
"""获取所有学生项目 (stu01~stu49)"""
all_projects = get_all_projects(auth)
student_projects = []
# 匹配 stu01 到 stu49 开头的项目名
pattern = re.compile(r'^stu(0[1-9]|[1-4][0-9]).*')
for project in all_projects:
project_name = project.get("name", "")
if pattern.match(project_name):
student_projects.append(project)
return student_projects
def get_project_repositories(auth, project_name):
"""获取项目下的所有仓库(支持分页)"""
all_repositories = []
page = 1
page_size = 100 # 每页获取100个仓库
while True:
try:
params = {
'page': page,
'page_size': page_size
}
response = requests.get(f"{API_BASE}/projects/{project_name}/repositories",
params=params, auth=auth, verify=False, timeout=10)
if response.status_code == 200:
repositories = response.json()
if not repositories: # 没有更多数据
break
all_repositories.extend(repositories)
page += 1
else:
print(f" 获取项目 {project_name} 仓库列表失败: {response.status_code}")
break
except Exception as e:
print(f" 获取项目 {project_name} 仓库列表出错: {str(e)}")
break
return all_repositories
def delete_repository(auth, project_name, repo_name):
"""删除仓库"""
try:
# 仓库名格式为 "project_name/repository_name",需要提取仓库名部分
if '/' in repo_name:
actual_repo_name = repo_name.split('/', 1)[1] # 取第一个/后面的部分
else:
actual_repo_name = repo_name
response = requests.delete(f"{API_BASE}/projects/{project_name}/repositories/{actual_repo_name}",
auth=auth, verify=False, timeout=30)
if response.status_code == 200:
print(f" ✓ 删除仓库: {repo_name}")
return True
else:
print(f" ✗ 删除仓库失败 {repo_name}: {response.status_code}")
if response.text:
print(f" 错误详情: {response.text}")
return False
except Exception as e:
print(f" ✗ 删除仓库出错 {repo_name}: {str(e)}")
return False
def delete_project(auth, project_name):
"""删除项目"""
try:
response = requests.delete(f"{API_BASE}/projects/{project_name}",
auth=auth, verify=False, timeout=30)
if response.status_code == 200:
print(f" ✓ 删除项目: {project_name}")
return True
else:
print(f" ✗ 删除项目失败 {project_name}: {response.status_code}")
if response.text:
print(f" 错误详情: {response.text}")
return False
except Exception as e:
print(f" ✗ 删除项目出错 {project_name}: {str(e)}")
return False
def delete_student_projects_and_repositories(auth):
"""删除所有学生项目和仓库"""
print("正在查找学生项目 (stu01~stu49)...")
student_projects = get_student_projects(auth)
if not student_projects:
print("未找到任何学生项目")
return
print(f"找到 {len(student_projects)} 个学生项目:")
for project in student_projects:
print(f" - {project.get('name')} (ID: {project.get('project_id')})")
# 确认删除
print(f"\n警告:即将删除 {len(student_projects)} 个项目及其所有仓库!")
confirm = input("确认要继续吗?(输入 'DELETE' 确认): ").strip()
if confirm != 'DELETE':
print("已取消操作")
return
success_count = 0
total_count = len(student_projects)
for i, project in enumerate(student_projects, 1):
project_name = project.get("name")
print(f"\n[{i}/{total_count}] 处理项目: {project_name}")
# 获取项目下的仓库
repositories = get_project_repositories(auth, project_name)
if repositories:
print(f" 发现 {len(repositories)} 个仓库,开始删除...")
repo_success = 0
for repo in repositories:
repo_name = repo.get("name")
if delete_repository(auth, project_name, repo_name):
repo_success += 1
print(f" 仓库删除完成: {repo_success}/{len(repositories)}")
else:
print(f" 项目 {project_name} 中没有仓库")
# 删除项目
if delete_project(auth, project_name):
success_count += 1
print(f"\n" + "="*60)
print(f"删除完成!")
print(f"成功删除项目: {success_count}/{total_count}")
def list_student_projects(auth):
"""列出所有学生项目(预览模式)"""
print("正在查找学生项目 (stu01~stu49)...")
student_projects = get_student_projects(auth)
if not student_projects:
print("未找到任何学生项目")
return
print(f"\n找到 {len(student_projects)} 个学生项目:")
print("-" * 80)
for project in student_projects:
project_name = project.get("name")
project_id = project.get("project_id")
owner_name = project.get("owner_name", "N/A")
creation_time = project.get("creation_time", "N/A")
print(f"项目: {project_name}")
print(f" ID: {project_id}")
print(f" 所有者: {owner_name}")
print(f" 创建时间: {creation_time}")
# 获取仓库信息
repositories = get_project_repositories(auth, project_name)
if repositories:
print(f" 仓库 ({len(repositories)}个):")
for repo in repositories:
print(f" - {repo.get('name')} (拉取次数: {repo.get('pull_count', 0)})")
else:
print(f" 仓库: 无")
print()
def main():
print("Harbor 学生项目删除工具")
print("=" * 40)
print("此工具专门删除 stu01~stu49 开头的项目和仓库")
# 获取管理员凭据
admin_username, admin_password = get_admin_credentials()
auth = HTTPBasicAuth(admin_username, admin_password)
# 测试API连接
if not test_api_connection(auth):
return
while True:
print("\n请选择操作:")
print("1. 预览学生项目(不删除)")
print("2. 删除所有学生项目和仓库")
print("3. 退出")
choice = input("请输入选项 (1-3): ").strip()
if choice == '1':
list_student_projects(auth)
elif choice == '2':
delete_student_projects_and_repositories(auth)
break
elif choice == '3':
print("退出程序")
break
else:
print("无效选项,请重试")
if __name__ == "__main__":
main()

258
delete_users.py Normal file
View File

@ -0,0 +1,258 @@
import pandas as pd
import requests
import getpass
import sys
from urllib3.exceptions import InsecureRequestWarning
from requests.auth import HTTPBasicAuth
# 禁用不安全HTTPS警告
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
HARBOR_URL = "https://harbor.seahi.me"
API_BASE = f"{HARBOR_URL}/api/v2.0"
def get_admin_credentials():
"""获取Harbor管理员凭据"""
print("请输入Harbor管理员账号信息")
username = input("用户名: ")
password = getpass.getpass("密码: ")
return username, password
def test_api_connection(auth):
"""测试API连接和认证"""
try:
response = requests.get(f"{API_BASE}/users", auth=auth, verify=False, timeout=10)
if response.status_code == 401:
print("认证失败:用户名或密码错误")
return False
elif response.status_code == 200:
print("API连接成功")
return True
else:
print(f"API连接失败: {response.status_code} - {response.text}")
return False
except Exception as e:
print(f"API连接出错: {str(e)}")
return False
def get_all_users(auth):
"""获取所有用户列表"""
try:
response = requests.get(f"{API_BASE}/users", auth=auth, verify=False, timeout=10)
if response.status_code == 200:
return response.json()
else:
print(f"获取用户列表失败: {response.status_code} - {response.text}")
return None
except Exception as e:
print(f"获取用户列表出错: {str(e)}")
return None
def get_user_by_username(auth, username):
"""根据用户名获取用户信息"""
users = get_all_users(auth)
if users is None:
return None
for user in users:
if user.get("username") == username:
return user
return None
def get_user_projects(auth, user_id):
"""获取用户拥有的所有项目"""
try:
response = requests.get(f"{API_BASE}/projects", auth=auth, verify=False, timeout=10)
if response.status_code == 200:
projects = response.json()
# 过滤出属于该用户的项目
user_projects = [p for p in projects if p.get("owner_id") == user_id]
return user_projects
else:
print(f"获取项目列表失败: {response.status_code} - {response.text}")
return []
except Exception as e:
print(f"获取项目列表出错: {str(e)}")
return []
def get_project_repositories(auth, project_name):
"""获取项目下的所有仓库"""
try:
response = requests.get(f"{API_BASE}/projects/{project_name}/repositories",
auth=auth, verify=False, timeout=10)
if response.status_code == 200:
return response.json()
else:
print(f"获取项目 {project_name} 仓库列表失败: {response.status_code}")
return []
except Exception as e:
print(f"获取项目 {project_name} 仓库列表出错: {str(e)}")
return []
def delete_repository(auth, project_name, repo_name):
"""删除仓库"""
try:
# 仓库名称可能包含项目名,需要提取仓库名部分
if '/' in repo_name:
repo_name = repo_name.split('/')[-1]
response = requests.delete(f"{API_BASE}/projects/{project_name}/repositories/{repo_name}",
auth=auth, verify=False, timeout=10)
if response.status_code == 200:
print(f" ✓ 删除仓库: {project_name}/{repo_name}")
return True
else:
print(f" ✗ 删除仓库失败 {project_name}/{repo_name}: {response.status_code}")
return False
except Exception as e:
print(f" ✗ 删除仓库出错 {project_name}/{repo_name}: {str(e)}")
return False
def delete_project(auth, project_name):
"""删除项目"""
try:
response = requests.delete(f"{API_BASE}/projects/{project_name}",
auth=auth, verify=False, timeout=10)
if response.status_code == 200:
print(f" ✓ 删除项目: {project_name}")
return True
else:
print(f" ✗ 删除项目失败 {project_name}: {response.status_code} - {response.text}")
return False
except Exception as e:
print(f" ✗ 删除项目出错 {project_name}: {str(e)}")
return False
def delete_user_resources(auth, user):
"""删除用户的所有资源(项目和仓库)"""
user_id = user.get("user_id")
username = user.get("username")
print(f"正在清理用户 {username} 的资源...")
# 获取用户拥有的项目
projects = get_user_projects(auth, user_id)
if not projects:
print(f" 用户 {username} 没有拥有的项目")
return True
print(f" 发现 {len(projects)} 个项目需要删除")
success = True
for project in projects:
project_name = project.get("name")
print(f" 处理项目: {project_name}")
# 获取项目下的仓库
repositories = get_project_repositories(auth, project_name)
# 删除所有仓库
for repo in repositories:
repo_name = repo.get("name")
if not delete_repository(auth, project_name, repo_name):
success = False
# 删除项目
if not delete_project(auth, project_name):
success = False
return success
def delete_harbor_user(auth, username):
"""删除Harbor用户及其所有资源"""
# 获取用户信息
user = get_user_by_username(auth, username)
if user is None:
print(f"! 用户不存在: {username}")
return False
user_id = user.get("user_id")
print(f"开始删除用户: {username} (ID: {user_id})")
# 1. 先删除用户的所有资源
if not delete_user_resources(auth, user):
print(f"! 清理用户 {username} 的资源时遇到问题,但继续尝试删除用户")
# 2. 删除用户
try:
response = requests.delete(f"{API_BASE}/users/{user_id}",
auth=auth, verify=False, timeout=10)
if response.status_code == 200:
print(f"✓ 成功删除用户: {username}")
return True
elif response.status_code == 404:
print(f"! 用户不存在: {username}")
return False
elif response.status_code == 412:
print(f"! 无法删除用户 {username}:用户仍有关联资源")
return False
else:
print(f"✗ 删除用户失败 {username}: {response.status_code} - {response.text}")
return False
except Exception as e:
print(f"✗ 删除用户出错 {username}: {str(e)}")
return False
def delete_users_from_excel():
"""从Excel文件中读取用户并删除"""
try:
# 读取Excel文件跳过第一行标题行
df = pd.read_excel('users.xlsx', skiprows=1)
# 获取管理员凭据
admin_username, admin_password = get_admin_credentials()
# 创建认证对象
auth = HTTPBasicAuth(admin_username, admin_password)
# 测试API连接
if not test_api_connection(auth):
return
print("\n开始删除用户...")
success_count = 0
total_count = 0
failed_users = []
# 遍历Excel中的用户并删除
for _, row in df.iterrows():
username = str(row.iloc[0]).strip()
# 跳过空行
if username == 'nan' or not username:
continue
total_count += 1
print(f"\n[{total_count}] 处理用户: {username}")
if delete_harbor_user(auth, username):
success_count += 1
else:
failed_users.append(username)
print(f"\n" + "="*50)
print(f"删除完成!")
print(f"成功删除: {success_count}/{total_count} 个用户")
if failed_users:
print(f"删除失败的用户: {', '.join(failed_users)}")
except FileNotFoundError:
print("错误:找不到 users.xlsx 文件,请确保文件在当前目录中")
except Exception as e:
print(f"发生错误: {e}")
def main():
print("Harbor 用户删除工具")
print("=" * 30)
print("注意:此工具会删除用户及其所有项目和仓库!")
confirm = input("\n确认要继续吗?(y/N): ").strip().lower()
if confirm != 'y':
print("已取消操作")
return
delete_users_from_excel()
if __name__ == "__main__":
main()

98
register.py Normal file
View File

@ -0,0 +1,98 @@
import pandas as pd
import json
import requests
import getpass
import sys
from urllib3.exceptions import InsecureRequestWarning
from requests.auth import HTTPBasicAuth
# 禁用不安全HTTPS警告
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
HARBOR_URL = "https://harbor.seahi.me"
def get_admin_credentials():
print("请输入Harbor管理员账号信息")
username = input("用户名: ")
password = getpass.getpass("密码: ")
return username, password
def create_harbor_user(auth, user_data):
"""创建Harbor用户"""
url = f"{HARBOR_URL}/api/v2.0/users"
headers = {
'Content-Type': 'application/json'
}
payload = {
"username": user_data["username"],
"email": user_data["email"],
"realname": user_data["realname"],
"password": user_data["password"],
}
try:
response = requests.post(url, json=payload, headers=headers, auth=auth, verify=False)
if response.status_code == 201:
print(f"✓ 成功创建用户: {user_data['username']}")
return True
elif response.status_code == 409:
print(f"! 用户已存在: {user_data['username']}")
return True
else:
print(f"✗ 创建用户失败 {user_data['username']}: {response.status_code} - {response.text}")
return False
except Exception as e:
print(f"✗ 创建用户出错 {user_data['username']}: {str(e)}")
return False
def convert_excel_to_json():
try:
# 读取Excel文件跳过第一行标题行
df = pd.read_excel('users.xlsx', skiprows=1)
# 获取管理员凭据
admin_username, admin_password = get_admin_credentials()
# 创建认证对象
auth = HTTPBasicAuth(admin_username, admin_password)
# 测试认证
test_response = requests.get(f"{HARBOR_URL}/api/v2.0/users", auth=auth, verify=False)
if test_response.status_code == 401:
print("认证失败:用户名或密码错误")
return
print("\n开始创建用户...")
success_count = 0
total_count = 0
# 将DataFrame转换为JSON格式并创建用户
for _, row in df.iterrows():
# 确保所有值都转换为字符串,并处理 NaN 值
username = str(row.iloc[0]).strip()
realname = str(row.iloc[1]).strip()
password = str(row.iloc[2]).strip()
email = str(row.iloc[4]).strip()
# 跳过空行
if username == 'nan' or realname == 'nan':
continue
user_dict = {
"username": username,
"realname": realname,
"password": password,
"email": email
}
total_count += 1
if create_harbor_user(auth, user_dict):
success_count += 1
print(f"\n完成!成功创建 {success_count}/{total_count} 个用户")
except Exception as e:
print(f"发生错误: {e}")
if __name__ == "__main__":
convert_excel_to_json()

4
requirements.txt Normal file
View File

@ -0,0 +1,4 @@
pandas>=1.3.0
requests>=2.26.0
urllib3>=1.26.7
openpyxl>=3.0.9