
更多Python学习内容:ipengtao.com
大家好,我是涛哥,今天为大家分享 Python多线程优化:提升程序性能的实例,全文5600字,阅读大约16钟。
多线程是一种有效的并发编程方式,能够提高程序的性能。本文将通过详细的实例代码,探讨如何优化Python多线程程序,以充分发挥多核处理器的潜力,提升程序的执行效率。
1. 多线程基础
首先,看一个简单的多线程示例,通过Python的threading模块创建两个线程并同时执行任务。
import threading
import time
def task1():
    for _ in range(5):
        print("Task 1")
        time.sleep(1)
def task2():
    for _ in range(5):
        print("Task 2")
        time.sleep(1)
if __name__ == "__main__":
    thread1 = threading.Thread(target=task1)
    thread2 = threading.Thread(target=task2)
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()2. 线程同步与互斥锁
在多线程环境中,为了避免竞争条件和保证数据一致性,需要使用互斥锁。以下是一个使用threading.Lock的例子:
import threading
counter = 0
counter_lock = threading.Lock()
def update_counter():
    global counter
    with counter_lock:
        for _ in range(100000):
            counter += 1
if __name__ == "__main__":
    thread1 = threading.Thread(target=update_counter)
    thread2 = threading.Thread(target=update_counter)
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()
    print("Counter:", counter)3. 线程池优化
使用线程池可以更好地管理线程的生命周期,减少线程的创建和销毁开销。以下是一个使用concurrent.futures.ThreadPoolExecutor的例子:
from concurrent.futures import ThreadPoolExecutor
import time
def task(num):
    print(f"Task {num} started")
    time.sleep(2)
    print(f"Task {num} completed")
if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=3) as executor:
        for i in range(5):
            executor.submit(task, i)4. 多线程与I/O密集型任务
对于I/O密集型任务,使用异步编程更为高效。以下是一个使用asyncio的例子:
import asyncio
async def task1():
    print("Task 1 started")
    await asyncio.sleep(2)
    print("Task 1 completed")
async def task2():
    print("Task 2 started")
    await asyncio.sleep(2)
    print("Task 2 completed")
if __name__ == "__main__":
    asyncio.run(asyncio.gather(task1(), task2()))5. 避免全局解释器锁(GIL)
在CPython解释器中,全局解释器锁(GIL)限制了同一时刻只能有一个线程执行Python字节码。对于CPU密集型任务,可以考虑使用concurrent.futures.ProcessPoolExecutor,利用多进程来避免GIL。
from concurrent.futures import ProcessPoolExecutor
def square(n):
    return n * n
if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        result = list(executor.map(square, [1, 2, 3, 4, 5]))
    print("Result:", result)6. 线程安全的数据结构
在多线程环境中,选择线程安全的数据结构是至关重要的。以下是一个使用queue.Queue实现线程安全队列的例子:
import threading
import queue
import time
def producer(q):
    for i in range(5):
        time.sleep(1)
        item = f"Item {i}"
        print(f"Producing {item}")
        q.put(item)
def consumer(q):
    while True:
        time.sleep(2)
        item = q.get()
        if item is None:
            break
        print(f"Consuming {item}")
if __name__ == "__main__":
    my_queue = queue.Queue()
    producer_thread = threading.Thread(target=producer, args=(my_queue,))
    consumer_thread = threading.Thread(target=consumer, args=(my_queue,))
    producer_thread.start()
    consumer_thread.start()
    producer_thread.join()
    my_queue.put(None)  # Signal consumer to exit
    consumer_thread.join()7. 自定义线程池
有时候,可能需要更多的线程控制权,这时可以考虑实现自定义线程池。以下是一个简单的自定义线程池示例:
import threading
import queue
import time
class CustomThreadPool:
    def __init__(self, max_workers):
        self.max_workers = max_workers
        self.work_queue = queue.Queue()
        self.workers = []
    def submit(self, func, args):
        self.work_queue.put((func, args))
    def worker(self):
        while True:
            func, args = self.work_queue.get()
            if func is None:
                break
            func(*args)
    def start(self):
        for _ in range(self.max_workers):
            worker_thread = threading.Thread(target=self.worker)
            worker_thread.start()
            self.workers.append(worker_thread)
    def join(self):
        for _ in range(self.max_workers):
            self.work_queue.put((None, None))
        for worker_thread in self.workers:
            worker_thread.join()
def task(num):
    print(f"Task {num} started")
    time.sleep(2)
    print(f"Task {num} completed")
if __name__ == "__main__":
    custom_pool = CustomThreadPool(max_workers=3)
    for i in range(5):
        custom_pool.submit(task, (i,))
    custom_pool.start()
    custom_pool.join()8. 使用threading.Event进行线程间通信
 
 在多线程编程中,线程间通信是一个重要的话题。使用threading.Event可以实现简单而有效的线程间通信。以下是一个示例:
import threading
import time
def worker(event, thread_num):
    print(f"Thread {thread_num} waiting for event.")
    event.wait()  # 等待事件被设置
    print(f"Thread {thread_num} received the event.")
if __name__ == "__main__":
    event = threading.Event()
    threads = []
    for i in range(3):
        thread = threading.Thread(target=worker, args=(event, i))
        threads.append(thread)
        thread.start()
    print("Main thread sleeping for 2 seconds.")
    time.sleep(2)
    event.set()  # 设置事件,通知所有等待的线程
    for thread in threads:
        thread.join()9. 使用threading.Condition进行复杂线程同步
 
 在某些情况下,需要更复杂的线程同步机制。threading.Condition提供了这样的功能,以下是一个生产者-消费者问题的示例:
import threading
import time
MAX_BUFFER_SIZE = 3
buffer = []
buffer_lock = threading.Lock()
buffer_not_full = threading.Condition(lock=buffer_lock)
buffer_not_empty = threading.Condition(lock=buffer_lock)
def producer():
    global buffer
    for i in range(5):
        time.sleep(1)
        with buffer_not_full:
            while len(buffer) == MAX_BUFFER_SIZE:
                buffer_not_full.wait()  # 缓冲区已满,等待通知
            buffer.append(i)
            print(f"Produced {i}")
            buffer_not_empty.notify()  # 通知消费者缓冲区非空
def consumer():
    global buffer
    for i in range(5):
        time.sleep(2)
        with buffer_not_empty:
            while not buffer:
                buffer_not_empty.wait()  # 缓冲区为空,等待通知
            item = buffer.pop(0)
            print(f"Consumed {item}")
            buffer_not_full.notify()  # 通知生产者缓冲区未满
if __name__ == "__main__":
    producer_thread = threading.Thread(target=producer)
    consumer_thread = threading.Thread(target=consumer)
    producer_thread.start()
    consumer_thread.start()
    producer_thread.join()
    consumer_thread.join()10. 使用threading.Timer进行定时任务
 
 threading.Timer可以用于执行定时任务,以下是一个简单的定时任务的示例:
import threading
def print_hello():
    print("Hello, Timer!")
if __name__ == "__main__":
    timer = threading.Timer(5, print_hello)  # 5秒后执行print_hello函数
    timer.start()
    timer.join()总结
通过本文,深入探讨了Python中多线程编程的各个方面,并提供了丰富的示例代码来演示不同的技术和最佳实践。在多线程编程中,学习如何创建和启动线程,处理线程间通信,使用线程锁进行同步,以及通过队列实现线程安全的数据交换。还深入了解了线程池的概念和实现,展示了如何自定义线程池以及处理线程池中的任务。进一步地,介绍了线程间通信的不同方式,包括使用threading.Event进行简单通信和使用threading.Condition进行复杂的线程同步。还演示了如何利用threading.Timer实现定时任务,以及在多线程环境中的异常处理和安全性考虑。
通过这些例子,可以更全面地理解和应用多线程编程,更好地解决实际问题,并提高Python程序的效率。在设计和优化多线程程序时,根据具体需求选择适当的线程同步机制和通信方式是至关重要的。
如果你觉得文章还不错,请大家 点赞、分享、留言 下,因为这将是我持续输出更多优质文章的最强动力!
更多Python学习内容:ipengtao.com
干货笔记整理
100个爬虫常见问题.pdf ,太全了!
Python 自动化运维 100个常见问题.pdf
Python Web 开发常见的100个问题.pdf
124个Python案例,完整源代码!
PYTHON 3.10中文版官方文档
耗时三个月整理的《Python之路2.0.pdf》开放下载
最经典的编程教材《Think Python》开源中文版.PDF下载

点击“阅读原文”,获取更多学习内容








![[操作系统] 文件管理](https://img-blog.csdnimg.cn/img_convert/3ccd0a8a6df362ee98c82451ecdf9451.png)

![BUUCTF [ACTF新生赛2020]swp 1](https://img-blog.csdnimg.cn/direct/fb6afa02cddb4c9282c6cd8f858e850f.png)








