互斥锁与消息队列的架构哲学

news2025/6/9 20:16:05

更多精彩内容请访问:通义灵码2.5——基于编程智能体开发Wiki多功能搜索引擎更多精彩内容请访问:更多精彩内容请访问:通义灵码2.5——基于编程智能体开发Wiki多功能搜索引擎

一、资源争用的现实镜像

当多个ATM机共用一个现金库时,出纳员们需要:

  1. 检查库门状态(锁状态检测)

  2. 挂上"使用中"标牌(acquire)

  3. 完成现金交接(临界区操作)

  4. 取下标牌(release)

import threading

cash_vault = 1000000
vault_lock = threading.Lock()

def withdraw(amount):
    global cash_vault
    with vault_lock:  # 自动管理锁周期
        if cash_vault >= amount:
            cash_vault -= amount
            return True
        return False

二、锁机制的进化图谱

2.1 互斥锁的局限性

传统Lock在复杂场景暴露出问题:

  • 嵌套调用导致死锁

  • 无法区分读写操作

  • 长时间阻塞影响系统响应

2.2 读写锁(RWLock)解决方案

from threading import RLock

class Account:
    def __init__(self):
        self._balance = 0
        self._lock = RLock()
    
    def transfer(self, amount):
        with self._lock:  # 可重入锁
            self._balance += amount
    
    def audit(self):
        with self._lock:  # 读操作同样保护
            return self._balance

2.3 条件变量实现精准唤醒

class BoundedBuffer:
    def __init__(self, capacity):
        self.capacity = capacity
        self.queue = []
        self.lock = threading.Lock()
        self.not_empty = threading.Condition(self.lock)
        self.not_full = threading.Condition(self.lock)
    
    def put(self, item):
        with self.not_full:
            while len(self.queue) >= self.capacity:
                self.not_full.wait()
            self.queue.append(item)
            self.not_empty.notify()
    
    def get(self):
        with self.not_empty:
            while not self.queue:
                self.not_empty.wait()
            item = self.queue.pop(0)
            self.not_full.notify()
            return item

三、消息队列的异步革命

3.1 生产者-消费者模式重构

对比传统锁方案与队列方案:

维度锁方案队列方案耦合度高(直接竞争)低(缓冲区解耦)吞吐量依赖锁粒度依赖队列深度错误隔离容易连锁崩溃失败消息可重试

3.2 Python队列实现

import queue
import random

task_queue = queue.Queue(maxsize=5)

def producer():
    while True:
        item = random.randint(1,100)
        task_queue.put(item)  # 自动阻塞直到有空位
        print(f"生产: {item}")

def consumer():
    while True:
        item = task_queue.get()  # 自动阻塞直到有数据
        print(f"消费: {item}")
        task_queue.task_done()

# 启动线程
threading.Thread(target=producer, daemon=True).start()
threading.Thread(target=consumer, daemon=True).start()

四、分布式环境下的进阶方案

4.1 Redis实现分布式锁

import redis
from contextlib import contextmanager

redis_cli = redis.Redis()

@contextmanager
def dist_lock(lock_name, timeout=10):
    identifier = str(uuid.uuid4())
    # 获取锁
    if redis_cli.setnx(lock_name, identifier):
        redis_cli.expire(lock_name, timeout)
        try:
            yield
        finally:
            # Lua脚本保证原子性
            script = """
            if redis.call('get',KEYS[1]) == ARGV[1] then
                return redis.call('del',KEYS[1])
            else
                return 0
            end"""
            redis_cli.eval(script, 1, lock_name, identifier)
    else:
        raise Exception("获取锁失败")

4.2 Kafka式消息队列

from kafka import KafkaProducer, KafkaConsumer

producer = KafkaProducer(bootstrap_servers='localhost:9092')
consumer = KafkaConsumer('my_topic',
                         group_id='my_group',
                         bootstrap_servers='localhost:9092')

# 生产消息
producer.send('my_topic', b'raw_bytes')  

# 消费消息
for msg in consumer:
    print(f"收到: {msg.value}")

五、性能调优实战

5.1 锁竞争热点检测

import threading
import time

class ProfiledLock:
    def __init__(self):
        self._lock = threading.Lock()
        self.wait_stats = []
    
    def acquire(self):
        start = time.monotonic()
        self._lock.acquire()
        wait_time = time.monotonic() - start
        self.wait_stats.append(wait_time)
        return wait_time
    
    def release(self):
        self._lock.release()
    
    def stats(self):
        return {
            'max': max(self.wait_stats),
            'avg': sum(self.wait_stats)/len(self.wait_stats)
        }

5.2 队列水位监控

class MonitoredQueue(queue.Queue):
    def __init__(self, maxsize=0):
        super().__init__(maxsize)
        self.put_history = []
        self.get_history = []
    
    def put(self, item, block=True, timeout=None):
        super().put(item, block, timeout)
        self.put_history.append((time.time(), self.qsize()))
    
    def get(self, block=True, timeout=None):
        item = super().get(block, timeout)
        self.get_history.append((time.time(), self.qsize()))
        return item
    
    def plot_usage(self):
        import matplotlib.pyplot as plt
        plt.plot([t[0] for t in self.put_history], 
                [t[1] for t in self.put_history], label='puts')
        plt.plot([t[0] for t in self.get_history],
                [t[1] for t in self.get_history], label='gets')
        plt.legend()
        plt.show()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2405792.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

每日Prompt:治愈动漫插画

提示词 现代都市治愈动漫插画风格,现代女子,漂亮,长直发,20岁,豆沙唇,白皙,气质,清纯现代都市背景下,夕阳西下,一位穿着白色露脐短袖,粉色工装裤…

6.8 note

paxos算法_初步感知 Paxos算法保证一致性主要通过以下几个关键步骤和机制: 准备阶段 - 提议者向所有接受者发送准备请求,请求中包含一个唯一的编号。 - 接受者收到请求后,会检查编号,如果编号比它之前见过的都大,就会承…

面试心得 --- 车载诊断测试常见的一些面试问题

我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 做到欲望极简,了解自己的真实欲望,不受外在潮流的影响,不盲从,不跟风。把自己的精力全部用在自己。一是去掉多余,凡事找规律,基础是诚信;二是…

跟进一下目前最新的大数据技术

搭建最新平台 40C64G服务器,搭建3节点kvm,8C12G。 apache-hive-4.0.1-bin apache-tez-0.10.4-bin flink-1.20.1 hadoop-3.4.1 hbase-2.6.2 jdk-11.0.276 jdk8u452-b09 jdk8终于可以不用了 spark-3.5.5-bin-hadoop3 zookeeper-3.9.3 trino…

系统模块与功能设计框架

系统模块与功能设计框架,严格遵循专业架构设计原则,基于行业标准(如微服务架构、DDD领域驱动设计)构建。设计采用分层解耦模式,确保可扩展性和可维护性,适用于电商、企业服务、数字平台等中大型系统。 系统…

我爱学算法之—— 前缀和(中)

一、724. 寻找数组的中心下标 题目解析 这道题,给定数组nums,要求我们找出这个数组的中心下标。 **中心下标:**指左侧所有元素的和等于右侧所有元素的和。 如果存在多个中心数组下标,就返回最左侧的中心数组下标。 算法思路 暴…

Elasticsearch从安装到实战、kibana安装以及自定义IK分词器/集成整合SpringBoot详细的教程ES(三)

DSL官方地址: DSL查询分类 Elasticsearch提供了基于JSON的DSL(https://www.elastic.co/docs/explore-analyze/query-filter/languages/querydsl)来定义查询。常见的查询类型包括: 查询所有:查询出所有数据&#xff0…

React Hooks 指南:何时使用 useEffect ?

在 React 的函数组件中,useEffect Hook 是一个强大且不可或缺的工具。它允许我们处理副作用 (side effects)——那些在组件渲染之外发生的操作。但是,什么时候才是使用 useEffect 的正确时机呢?让我们深入探讨一下! 什么是副作用…

API标准的本质与演进:从 REST 架构到 AI 服务集成

在当今数字化浪潮中,API 已成为系统之间沟通与协作的“语言”,REST(Representational State Transfer,表述性状态转移)是一种基于 HTTP 协议的 Web 架构风格。它不仅改变了 Web 应用开发的方式,也成为构建现…

html - <mark>标签

<mark> 标签在HTML中用于高亮显示文本&#xff0c;通常用于突出显示某些重要的部分。它的默认样式通常是背景色为黄色&#xff0c;但你可以通过CSS自定义其外观。 1. 基本用法 <mark> 标签用于标记文本的高亮显示。它常用于搜索结果中&#xff0c;突出显示匹配的…

JavaWeb:前端工程化-Vue

Vue工程化 介绍 什么是Vue? 小白眼里前端开发 前端工程化 环境准备 D:\Program Files\nodejs Vue项目-快速入门 步骤 D:\front\vue 安装依赖 目录结构 code . vscode打开 启动 VScode侧边栏左下角&#xff0c;没有NPM脚本&#xff0c;如何打开&#xff1f;&…

AT_abc409_e [ABC409E] Pair Annihilation

AT_abc409_e [ABC409E] Pair Annihilation 赛时没开longlong挂了。 思路 首先我们可以把这棵树转化为一颗有根树&#xff0c;且所有电子的都朝根节点移动。 那么接下来我们就需要选择一个最优的树根。 考虑换根dp。 但是可以发现换根时答案其实是没有变化的。 我们设 f…

开疆智能Ethernet/IP转Modbus网关连接西门子BW500积算仪配置案例

本案例是通过Ethernet转Modbus网关将皮带秤数据接入到罗克韦尔1769L32E型PLC中。 首先进行ABB PLC的设置 1&#xff0c; 运行 RSLogix 5000 程序加载Ethernet转Modbus网关的EDS 文件&#xff1a; 2&#xff0c;新建工程并添加PLC 3&#xff0c;New Module添加网关&#xff…

【五子棋在线对战】三.数据管理模块实现

数据管理模块实现 1.数据库表的设计2.数据管理模块的封装和实现2.1 user_table() && ~user_table()2.2 insert() 注册时新增用户2.3 login() 登录验证&#xff0c;并返回详细的用户信息2.4 通过用户名获取用户信息 && 通过用户id获取用户信息2.5 win() &&a…

【JMeter】后置处理器 - 提取器

文章目录 概览边界提取器正则提取器JSON提取器 概览 CSS/JQuery提取器&#xff1b;给网页使用JSON提取器&#xff1a;给JSON数据使用★边界提取器&#xff1a;给字符串使用★正则表达式提取器&#xff1a;更加高级的字符使用★Xpath提取器&#xff1a;给网页使用 边界提取器…

OpenAI技术路线急转:从TypeScript到Rust的Codex CLI重构内幕

目录 前言&#xff1a;OpenAI的技术抉择引发业界思考 Codex CLI&#xff1a;OpenAI的终端AI编程利器 语言抉择的戏剧性反转&#xff1a;从TypeScript到Rust Rust重写的四大技术动因 1. 零依赖部署&#xff1a;消除环境配置痛点 2. 内存安全与沙箱隔离 3. 性能的全面碾压 …

window下配置ssh免密登录服务器

window下配置ssh免密登录服务器 本地windows远程登录我的ssh服务器10.10.101.xx服务器&#xff0c;想要每次都免密登录这个服务器. 记录下教程&#xff0c;防止后期忘记&#xff0c;指导我实现这个过程。 教程 二、实践步骤&#xff1a;Windows 上配置 SSH 免密登录 2.1 确…

nginx部署

配置阿里云yum源 安装如下编译工具 yum install -y gcc gcc-c autoconf automake make #安装使用nginx还得安装nginx所需的一些第三方系统库的支持&#xff0c;比如nginx的静态资源压缩功能所需的gzip lib库&#xff0c;nginx需要支持URL重写&#xff0c;所需的pcre库&…

线性规划饮食问题求解:FastAPI作为服务端+libhv作为客户端实现

之前在 Pyomo介绍-CSDN博客 中介绍过通过Pyomo求解线性规划问题&#xff0c;这里使用FastAPI作为服务端&#xff0c;开源网络库libhv作为客户端&#xff0c;求解饮食成本最小化问题。 服务端测试代码test_fastapi_pyomo_server.py如下&#xff1a; from fastapi import FastAP…

前端验证下跨域问题(npm验证)

文章目录 一、背景二、效果展示三、代码展示3.1&#xff09;index.html3.2&#xff09;package.json3.3&#xff09; service.js3.4&#xff09;service2.js 四、使用说明4.1&#xff09;安装依赖4.2&#xff09;启动服务器4.3&#xff09;访问前端页面 五、跨域解决方案说明六…