上图~
import os
import time
import threading
import requests
import subprocess
import importlib
import tkinter as tk
from tkinter import ttk, messagebox, scrolledtext
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urljoin
class PackageInstallerApp:
def __init__(self, root):
self.root = root
self.root.title("Design By Tim")
self.root.geometry("800x600")
# 默认设置
self.install_dir = r"C:\Users\AAA\Python_Package"
self.default_packages = ["numpy", "opencv-python", "pyttsx3"]
self.mirrors = [
"https://pypi.tuna.tsinghua.edu.cn/simple/",
"https://mirrors.aliyun.com/pypi/simple/",
"https://pypi.mirrors.ustc.edu.cn/simple/",
"https://mirrors.bfsu.edu.cn/pypi/web/simple/",
"https://mirrors.cloud.tencent.com/pypi/simple/",
"https://mirrors.nju.edu.cn/pypi/web/simple/",
"https://mirrors.hit.edu.cn/pypi/web/simple/",
"https://mirror.sjtu.edu.cn/pypi/web/simple/",
"https://pypi.doubanio.com/simple/",
"https://mirrors.zju.edu.cn/pypi/web/simple/",
"https://mirrors.pku.edu.cn/pypi/simple/",
"https://mirrors.yun-idc.com/pypi/simple/",
"https://mirrors.neusoft.edu.cn/pypi/web/simple/",
"https://mirrors.xjtu.edu.cn/pypi/web/simple/",
"https://mirrors.huaweicloud.com/repository/pypi/simple/"
]
# UI控件引用
self.start_button = None
self.cancel_button = None
# 创建UI
self.create_widgets()
def create_widgets(self):
# 主框架
main_frame = ttk.Frame(self.root, padding="10")
main_frame.pack(fill=tk.BOTH, expand=True)
# 包列表输入
ttk.Label(main_frame, text="要下载的包(逗号分隔):").grid(row=0, column=0, sticky=tk.W)
self.pkg_entry = ttk.Entry(main_frame, width=50)
self.pkg_entry.grid(row=0, column=1, sticky=tk.EW)
self.pkg_entry.insert(0, ",".join(self.default_packages))
# 安装目录
ttk.Label(main_frame, text="安装目录:").grid(row=1, column=0, sticky=tk.W)
self.dir_entry = ttk.Entry(main_frame, width=50)
self.dir_entry.grid(row=1, column=1, sticky=tk.EW)
self.dir_entry.insert(0, self.install_dir)
# 按钮框架
btn_frame = ttk.Frame(main_frame)
btn_frame.grid(row=2, column=0, columnspan=2, pady=10)
# 开始按钮
self.start_button = ttk.Button(btn_frame, text="开始下载安装", command=self.start_process)
self.start_button.pack(side=tk.LEFT, padx=5)
# 取消按钮
self.cancel_button = ttk.Button(btn_frame, text="取消", command=self.root.quit)
self.cancel_button.pack(side=tk.LEFT, padx=5)
# 进度条
self.progress = ttk.Progressbar(main_frame, orient=tk.HORIZONTAL, length=500, mode='determinate')
self.progress.grid(row=3, column=0, columnspan=2, pady=10)
# 状态标签
self.status_label = ttk.Label(main_frame, text="准备就绪")
self.status_label.grid(row=4, column=0, columnspan=2)
# 日志输出
ttk.Label(main_frame, text="进度日志:").grid(row=5, column=0, sticky=tk.W)
self.log_text = scrolledtext.ScrolledText(main_frame, width=80, height=20, state='normal')
self.log_text.grid(row=6, column=0, columnspan=2, sticky=tk.NSEW)
# 配置网格权重
main_frame.columnconfigure(1, weight=1)
main_frame.rowconfigure(6, weight=1)
def log_message(self, message):
self.log_text.config(state='normal')
self.log_text.insert(tk.END, message + "\n")
self.log_text.config(state='disabled')
self.log_text.see(tk.END)
self.root.update()
def update_progress(self, value):
self.progress['value'] = value
self.root.update()
def update_status(self, message):
self.status_label.config(text=message)
self.root.update()
def set_ui_state(self, enabled):
state = tk.NORMAL if enabled else tk.DISABLED
self.start_button.config(state=state)
self.cancel_button.config(state=state)
self.root.update()
def start_process(self):
# 获取用户输入
packages = [pkg.strip() for pkg in self.pkg_entry.get().split(",") if pkg.strip()]
self.install_dir = self.dir_entry.get().strip()
if not packages:
messagebox.showerror("错误", "请输入至少一个包名")
return
os.makedirs(self.install_dir, exist_ok=True)
# 禁用UI
self.set_ui_state(False)
self.log_text.config(state='normal')
self.log_text.delete(1.0, tk.END)
self.log_text.config(state='disabled')
self.update_progress(0)
# 开始下载安装流程
threading.Thread(target=self.download_and_install, args=(packages,), daemon=True).start()
def download_and_install(self, packages):
overall_success = True # 跟踪整体成功状态
try:
# 1. 测试镜像源速度
self.update_status("正在测试镜像源速度...")
self.log_message("="*50)
self.log_message("开始测试镜像源速度")
fastest_mirrors = self.find_fastest_mirrors()
# 2. 下载包
self.update_status("开始下载包...")
self.log_message("="*50)
self.log_message("开始下载包")
downloaded_files = []
total_packages = len(packages)
download_success = True
for i, package in enumerate(packages):
self.update_progress((i/total_packages)*50)
mirror = fastest_mirrors[i % len(fastest_mirrors)]
success, files = self.download_package(mirror, package)
if not success:
download_success = False
overall_success = False
self.log_message(f"⚠️ 包 {package} 下载失败,将继续尝试其他包")
else:
downloaded_files.extend(files)
if not download_success:
self.log_message("警告: 部分包下载失败")
# 3. 安装包
self.update_status("开始安装包...")
self.log_message("="*50)
self.log_message("开始安装包")
install_success = True
for i, file in enumerate(downloaded_files):
self.update_progress(50 + (i/len(downloaded_files))*40)
if not self.install_package(file):
install_success = False
overall_success = False
self.log_message(f"⚠️ 文件 {file} 安装失败")
if not install_success:
self.log_message("警告: 部分包安装失败")
# 4. 验证安装
self.update_status("验证安装...")
self.log_message("="*50)
self.log_message("开始验证安装")
verify_success = True
for i, package in enumerate(packages):
self.update_progress(90 + (i/len(packages))*10)
if not self.test_installation(package):
verify_success = False
overall_success = False
self.log_message(f"⚠️ 包 {package} 验证失败")
if not verify_success:
self.log_message("警告: 部分包验证失败")
self.update_progress(100)
# 显示最终结果
if overall_success:
self.update_status("所有操作成功完成!")
self.log_message("="*50)
self.log_message("✅ 所有包下载安装完成并验证成功!")
messagebox.showinfo("完成", "所有包下载安装完成并验证成功!")
else:
self.update_status("操作完成,但有错误发生")
self.log_message("="*50)
self.log_message("⚠️ 操作完成,但部分步骤失败,请检查日志")
messagebox.showwarning("完成但有错误",
"操作完成,但部分步骤失败,请检查日志了解详情")
except Exception as e:
self.log_message(f"❌ 发生严重错误: {str(e)}")
self.update_status("操作因错误中止")
messagebox.showerror("错误", f"处理过程中发生严重错误: {str(e)}")
overall_success = False
finally:
# 重新启用UI
self.set_ui_state(True)
def find_fastest_mirrors(self):
"""找出最快的3个镜像源"""
with ThreadPoolExecutor(max_workers=15) as executor:
futures = [executor.submit(self.test_mirror_speed, mirror) for mirror in self.mirrors]
results = []
for future in as_completed(futures):
speed, mirror = future.result()
if speed != float('inf'):
results.append((speed, mirror))
self.log_message(f"测试镜像源 {mirror} 速度: {speed:.2f}秒")
results.sort()
fastest_mirrors = [mirror for speed, mirror in results[:3]]
self.log_message(f"最快的3个镜像源: {', '.join(fastest_mirrors)}")
return fastest_mirrors
def test_mirror_speed(self, mirror):
"""测试镜像源速度"""
try:
start = time.time()
response = requests.get(urljoin(mirror, "simple/"), timeout=5)
if response.status_code == 200:
return time.time() - start, mirror
except:
pass
return float('inf'), mirror
def download_package(self, mirror, package):
"""下载单个包"""
try:
self.log_message(f"正在从 {mirror} 下载 {package}...")
cmd = [
"python", "-m", "pip", "download",
package,
"-d", self.install_dir,
"-i", mirror,
"--trusted-host", mirror.split('//')[1].split('/')[0]
]
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
stdout, stderr = process.communicate()
if process.returncode == 0:
# 获取下载的文件列表
files = [f for f in os.listdir(self.install_dir) if f.startswith(package.replace("-", "_"))]
self.log_message(f"✅ 成功下载 {package}: {', '.join(files)}")
return True, files
else:
self.log_message(f"❌ 下载 {package} 失败: {stderr.strip()}")
return False, []
except Exception as e:
self.log_message(f"❌ 下载 {package} 时发生错误: {str(e)}")
return False, []
def install_package(self, filename):
"""安装单个包"""
try:
filepath = os.path.join(self.install_dir, filename)
self.log_message(f"正在安装 {filename}...")
cmd = ["python", "-m", "pip", "install", filepath]
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
stdout, stderr = process.communicate()
if process.returncode == 0:
self.log_message(f"✅ 成功安装 {filename}")
return True
else:
self.log_message(f"❌ 安装 {filename} 失败: {stderr.strip()}")
return False
except Exception as e:
self.log_message(f"❌ 安装 {filename} 时发生错误: {str(e)}")
return False
def test_installation(self, package):
"""测试包是否安装成功"""
try:
# 转换包名(如opencv-python -> opencv_python)
import_name = package.replace("-", "_")
self.log_message(f"正在验证 {package} 是否可以导入...")
module = importlib.import_module(import_name)
version = getattr(module, "__version__", "未知版本")
self.log_message(f"✅ 验证成功: {package} (版本: {version})")
return True
except Exception as e:
self.log_message(f"❌ 验证失败: 无法导入 {package} - {str(e)}")
return False
if __name__ == "__main__":
root = tk.Tk()
app = PackageInstallerApp(root)
root.mainloop()