1. 自定义多线程程序:
import concurrent.futures
import threading
class CustomThreadPool:
def __init__(self, max_workers):
self.max_workers = max_workers
self.pool = concurrent.futures.ThreadPoolExecutor(max_workers)
self.running_num = 0
self.semaphore = threading.Semaphore(max_workers)
def submit(self, fn, *args, **kwargs):
self.semaphore.acquire() # 获取信号量(如果已满则阻塞)
self.running_num += 1
future = self.pool.submit(fn, *args, **kwargs)
future.add_done_callback(self._callback)
return future
def _callback(self, future):
self.running_num -= 1
self.semaphore.release() # 释放信号量,允许新任务提交
def shutdown(self):
self.pool.shutdown()
以上程序实现:使用这个自定义线程池时,当并发任务数达到上限后,新提交的任务会被阻塞,直到有任务完成。
使用程序:
import time
import random
from custom_threadpool import CustomThreadPool # 假设上面的类保存在 custom_threadpool.py 中
# 模拟下载图片的函数(实际应用中可替换为 requests.get 等真实下载逻辑)
def download_image(url):
# 模拟网络请求延迟
delay = random.uniform(0.5, 2.0)
time.sleep(delay)
# 模拟下载结果
size = random.randint(100, 1000)
print(f"下载完成: {url} (大小: {size}KB, 耗时: {delay:.2f}s)")
return {"url": url, "size": size, "status": "success"}
# 主程序
def main():
# 创建一个最大并发数为3的线程池
with CustomThreadPool(max_workers=3) as pool:
# 模拟10个图片下载任务
image_urls = [f"https://example.com/image_{i}.jpg" for i in range(1, 11)]
# 提交所有下载任务
futures = []
for url in image_urls:
future = pool.submit(download_image, url)
futures.append(future)
# 获取所有任务的结果(按完成顺序输出)
for future in concurrent.futures.as_completed(futures):
try:
result = future.result()
print(f"处理结果: {result['url']} ({result['size']}KB)")
except Exception as e:
print(f"下载失败: {e}")
if __name__ == "__main__":
start_time = time.time()
main()
print(f"总耗时: {time.time() - start_time:.2f}秒")
2.* ** 参数:
def example(a, b, *args, **kwargs):
print(f"固定参数:a={a}, b={b}")
print(f"位置参数:{args}")
print(f"关键字参数:{kwargs}")
example(1, 2, 3, 4, x=5, y=6)
# 输出:
# 固定参数:a=1, b=2
# 位置参数:(3, 4)
# 关键字参数:{'x': 5, 'y': 6}