Python中线程和进程详解:从入门到高并发实战
目录一、进程与线程基础概念1.1 什么是进程1.2 什么是线程1.3 进程 vs 线程核心区别二、Python中的线程threading模块2.1 创建线程的两种方式2.2 线程同步锁Lock2.3 线程间通信queue.Queue2.4 线程池concurrent.futures.ThreadPoolExecutor2.5 GIL全局解释器锁Python线程的紧箍咒三、Python中的进程multiprocessing模块3.1 创建进程3.2 进程间通信IPC3.3 进程同步3.4 进程池Pool3.5 共享内存Value和Array四、线程 vs 进程如何选择4.1 任务类型决定一切4.2 实战案例对比五、高级主题与最佳实践5.1 协程轻量级并发5.2 多进程多线程混合架构5.3 性能对比:5.4 常见陷阱与避坑指南5.5 小结与注意事项六、总结一、进程与线程基础概念1.1 什么是进程进程Process是操作系统分配资源的最小单位是一个正在执行的程序实例。每个进程拥有独立的内存空间、文件描述符、数据栈等系统资源。打个比方进程就像一家独立的公司有自己的办公室、设备、员工与其他公司互不干扰。python import os print(f当前进程ID: {os.getpid()})1.2 什么是线程线程Thread是操作系统调度执行的最小单位是进程内的一个执行流。一个进程至少包含一个线程主线程多个线程共享进程的资源内存、文件等。线程就像公司里的员工共享公司的办公室、打印机但每个人负责不同的任务。python import threading print(f当前线程名: {threading.current_thread().name})1.3 进程 vs 线程核心区别对比维度进程线程资源占用独立内存开销大共享内存开销小创建销毁速度慢快比进程快10~100倍数据同步复杂需要IPC简单共享变量但需加锁健壮性一个进程崩溃不影响其他一个线程崩溃可能导致整个进程崩溃利用多核天然支持不同进程在不同核心Python中受GIL限制二、Python中的线程threading模块2.1 创建线程的两种方式方式一直接实例化Threadpython import threading import time def worker(name, delay): print(f线程 {name} 开始工作) time.sleep(delay) print(f线程 {name} 完成) # 创建线程 t1 threading.Thread(targetworker, args(A, 2)) t2 threading.Thread(targetworker, args(B, 1)) t1.start() t2.start() t1.join() # 等待线程结束 t2.join() print(所有线程执行完毕)方式二继承Thread类python class MyThread(threading.Thread): def __init__(self, name, delay): super().__init__() self.name name self.delay delay def run(self): print(f线程 {self.name} 开始) time.sleep(self.delay) print(f线程 {self.name} 结束) t MyThread(Worker, 2) t.start() t.join()2.2 线程同步锁Lock多个线程同时修改共享变量会导致竞态条件python counter 0 def unsafe_increment(): global counter for _ in range(100000): counter 1 # 不是原子操作 threads [threading.Thread(targetunsafe_increment) for _ in range(10)] for t in threads: t.start() for t in threads: t.join() print(f预期 1,000,000实际 {counter}) # 通常小于预期解决方案使用锁python lock threading.Lock() counter 0 def safe_increment(): global counter for _ in range(100000): with lock: # 上下文管理器自动获取和释放锁 counter 1 # 执行后 counter 正好等于 1,000,000其他同步原语RLock可重入锁、Semaphore信号量、Event事件、Condition条件变量。2.3 线程间通信queue.QueueQueue是线程安全的FIFO队列完美解决生产者-消费者问题python from queue import Queue import threading import random def producer(q, id): for i in range(3): item f产品_{id}_{i} q.put(item) print(f生产者{id} 生产 {item}) time.sleep(random.random()) def consumer(q, id): while True: item q.get() if item is None: # 终止信号 break print(f消费者{id} 消费 {item}) q.task_done() time.sleep(random.random()) q Queue(maxsize5) producers [threading.Thread(targetproducer, args(q, i)) for i in range(2)] consumers [threading.Thread(targetconsumer, args(q, i)) for i in range(3)] for t in producers consumers: t.start() for t in producers: t.join() # 发送终止信号 for _ in consumers: q.put(None) for t in consumers: t.join()2.4 线程池concurrent.futures.ThreadPoolExecutor手动管理大量线程很麻烦线程池复用线程提高效率python from concurrent.futures import ThreadPoolExecutor, as_completed def task(n): time.sleep(1) return n * n with ThreadPoolExecutor(max_workers5) as executor: futures [executor.submit(task, i) for i in range(10)] for future in as_completed(futures): print(f结果: {future.result()})2.5 GIL全局解释器锁Python线程的紧箍咒GIL是CPython解释器的一个互斥锁它确保同一时刻只有一个线程执行Python字节码。这意味着即使在多核CPU上Python的多线程也无法并行执行CPU密集型任务。python # CPU密集型计算多线程 vs 单线程 def cpu_heavy(n): while n 0: n - 1 # 单线程执行 start time.time() cpu_heavy(50_000_000) print(f单线程耗时: {time.time()-start:.2f}s) # 多线程执行4个线程各计算一半 t1 threading.Thread(targetcpu_heavy, args(25_000_000,)) t2 threading.Thread(targetcpu_heavy, args(25_000_000,)) start time.time() t1.start(); t2.start() t1.join(); t2.join() print(f2线程耗时: {time.time()-start:.2f}s) # 反而更慢因为GIL竞争GIL只影响CPU密集型任务对于I/O密集型网络请求、文件读写、数据库操作线程在I/O等待时会释放GIL多线程依然有效。三、Python中的进程multiprocessing模块3.1 创建进程multiprocessing绕过GIL每个进程有独立解释器真正利用多核。python from multiprocessing import Process import os def worker(name): print(f进程 {name} (PID{os.getpid()}) 开始工作) if __name__ __main__: p1 Process(targetworker, args(A,)) p2 Process(targetworker, args(B,)) p1.start(); p2.start() p1.join(); p2.join()注意Windows上必须将启动代码放在 if __name__ __main__: 内否则会无限递归创建进程。3.2 进程间通信IPC进程内存独立不能直接共享变量。multiprocessing提供多种IPC方式方式一Queuepython from multiprocessing import Process, Queue def producer(q): for i in range(5): q.put(f数据{i}) q.put(None) # 结束标志 def consumer(q): while True: item q.get() if item is None: break print(f消费: {item}) if __name__ __main__: q Queue() p1 Process(targetproducer, args(q,)) p2 Process(targetconsumer, args(q,)) p1.start(); p2.start() p1.join(); p2.join()方式二Pipe双向管道python from multiprocessing import Process, Pipe def worker(conn): conn.send(Hello from child) print(fChild received: {conn.recv()}) conn.close() if __name__ __main__: parent_conn, child_conn Pipe() p Process(targetworker, args(child_conn,)) p.start() print(fParent received: {parent_conn.recv()}) parent_conn.send(Hello from parent) p.join()方式三Manager共享复杂对象Manager可以创建代理对象让多个进程共享列表、字典等python from multiprocessing import Process, Manager def worker(shared_list, shared_dict, key, value): shared_list.append(key) shared_dict[key] value if __name__ __main__: with Manager() as manager: shared_list manager.list() shared_dict manager.dict() processes [Process(targetworker, args(shared_list, shared_dict, i, i*2)) for i in range(5)] for p in processes: p.start() for p in processes: p.join() print(f列表: {shared_list}) print(f字典: {shared_dict})3.3 进程同步与线程类似multiprocessing也提供Lock、Semaphore等防止多进程同时访问共享资源如文件、终端python from multiprocessing import Process, Lock def print_with_lock(lock, msg): with lock: print(msg) # 避免多个进程的输出交错 if __name__ __main__: lock Lock() ps [Process(targetprint_with_lock, args(lock, f来自进程{i})) for i in range(5)] for p in ps: p.start() for p in ps: p.join()3.4 进程池Pool创建大量进程开销大使用进程池复用进程python from multiprocessing import Pool def f(x): return x * x if __name__ __main__: with Pool(processes4) as pool: # map 方法 results pool.map(f, range(10)) print(results) # 异步方法 async_result pool.apply_async(f, (10,)) print(async_result.get())3.5 共享内存Value和Array对于简单数据类型可以使用共享内存比Manager快得多python from multiprocessing import Process, Value, Array def worker(val, arr): val.value 1 for i in range(len(arr)): arr[i] * 2 if __name__ __main__: num Value(i, 0) # i 表示有符号整数 arr Array(i, range(5)) # 整数数组 [0,1,2,3,4] p Process(targetworker, args(num, arr)) p.start() p.join() print(fnum {num.value}) # 1 print(farr {arr[:]}) # [0,2,4,6,8]四、线程 vs 进程如何选择4.1 任务类型决定一切任务类型推荐方案原因CPU密集型大量计算、图像处理、机器学习多进程绕过GIL真正并行利用多核I/O密集型网络爬虫、Web服务、文件读写多线程线程轻量切换开销小GIL影响小高并发Web服务协程 多进程异步IO 多核扩展4.2 实战案例对比案例1计算素数CPU密集型python # 计算0~N之间的素数个数 def count_primes(n): count 0 for i in range(2, n1): if all(i % j ! 0 for j in range(2, int(i**0.5)1)): count 1 return count # 多线程版 def threaded_version(): with ThreadPoolExecutor(max_workers4) as ex: futures [ex.submit(count_primes, 50000) for _ in range(4)] return sum(f.result() for f in futures) # 多进程版 def multiprocess_version(): with Pool(4) as p: results p.map(count_primes, [50000]*4) return sum(results) # 单线程版 def single_version(): return sum(count_primes(50000) for _ in range(4)) # 执行时间参考值 # 单线程: 8.2s # 多线程: 8.5s 甚至更慢 # 多进程: 2.3s 接近4倍提升案例2下载100个网页I/O密集型python import requests urls [https://httpbin.org/delay/1] * 100 # 模拟延迟 def download(url): return requests.get(url).status_code # 多线程版推荐 with ThreadPoolExecutor(max_workers20) as ex: results list(ex.map(download, urls)) # 约1.2秒 # 多进程版不推荐开销大 with Pool(20) as p: results p.map(download, urls) # 约1.5秒 进程创建开销 # 单线程版 results [download(url) for url in urls] # 约100秒五、高级主题与最佳实践5.1 协程轻量级并发当需要成千上万个并发任务时线程和进程都显得沉重。此时可用协程asyncio它在单线程内实现用户态调度开销极小。python import asyncio async def f.sleep(delay) # 模拟异步IO return data async def main(): tasks [fetch_data(i) for i in range(1000)] results await asyncio.gather(*tasks) print(len(results)) asyncio.run(main()) # 0.1秒内完成1000个任务推荐组合协程负责高并发网络请求多进程负责利用多核计算各司其职。5.2 多进程多线程混合架构典型的Web后端架构主进程负责管理多个工作进程每个工作进程内用线程池处理请求。python from multiprocessing import Process from concurrent.futures import ThreadPoolExecutor def worker_process(): with ThreadPoolExecutor(max_workers10) as executor: # 处理请求循环... pass if __name__ __main__: processes [Process(targetworker_process) for _ in range(4)] for p in processes: p.start() for p in processes: p.join()5.3 性能对比:以下代码对比单线程、多线程、多进程在CPU和I/O任务下的表现python import time import threading import multiprocessing as mp from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor # CPU密集型计算斐波那契数列 def fib(n): if n 1: return n return fib(n-1) fib(n-2) # I/O密集型模拟休眠 def io_task(duration): time.sleep(duration) return duration def benchmark(func, args, mode, workers4): start time.time() if mode thread: with ThreadPoolExecutor(workers) as ex: results list(ex.map(func, args)) elif mode process: with ProcessPoolExecutor(workers) as ex: results list(ex.map(func, args)) else: # single results [func(arg) for arg in args] elapsed time.time() - start return elapsed, results if __name__ __main__: # CPU密集型任务计算 fib(35) 4次 cpu_args [35] * 4 t_cpu_single, _ benchmark(fib, cpu_args, single) t_cpu_thread, _ benchmark(fib, cpu_args, thread, 4) t_cpu_process, _ benchmark(fib, cpu_args, process, 4) print(fCPU任务单线程 {t_cpu_single:.2f}s | 多线程 {t_cpu_thread:.2f}s | 多进程 {t_cpu_process:.2f}s) # I/O密集型任务sleep 0.5秒 20次 io_args [0.5] * 20 t_io_single, _ benchmark(io_task, io_args, single) t_io_thread, _ benchmark(io_task, io_args, thread, 10) t_io_process, _ benchmark(io_task, io_args, process, 10) print(fI/O任务单线程 {t_io_single:.2f}s | 多线程 {t_io_thread:.2f}s | 多进程 {t_io_process:.2f}s) # 输出 CPU任务单线程 12.34s | 多线程 12.89s | 多进程 3.45s I/O任务单线程 10.02s | 多线程 1.05s | 多进程 1.12s5.4 常见陷阱与避坑指南陷阱描述解决方案死锁线程A等锁B线程B等锁A统一锁顺序或使用try-finally超时机制虚假唤醒wait()可能无故返回始终在循环中检查条件进程启动慢Windows上spawn方式启动慢使用forkLinux或减少进程数量共享数据不一致多进程修改同一文件使用文件锁或multiprocessing.Lock内存爆炸进程池任务数据太大使用mmap或队列传递引用5.5 小结与注意事项I/O密集 首选ThreadPoolExecutor简单高效CPU密集 首选ProcessPoolExecutor接近线性加速需要共享大量状态使用Manager但注意性能瓶颈高并发网络使用asyncio aiohttp始终使用with管理锁和池避免资源泄漏注意不要在多线程中做CPU密集型计算不要为每个小任务创建新进程进程创建开销大不要在多个进程中频繁通过Manager传递大数据序列化开销不要忘记join()等待子进程/线程导致僵尸进程或主线程提前结束六、总结特性多线程多进程协程适用场景I/O密集型CPU密集型高并发I/O资源开销小MB级大独立内存极小KB级数据共享容易需加锁困难IPC容易单线程多核利用不可以受GIL限制可以不可以最大并发量数百~数千数十~数百数十万编程复杂度中注意锁高IPC管理中async/await建议CPU任务用进程IO任务用线程万级并发用协程混合场景灵活组合。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2486124.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!