从分布式计算考试题到实战:用Python模拟Ricart-Agrawala互斥算法(附完整代码)

news2026/4/14 13:17:36
从理论到实践用Python实现Ricart-Agrawala分布式互斥算法分布式系统中最具挑战性的问题之一是如何在多个进程间实现互斥访问共享资源。Ricart-Agrawala算法作为经典的分布式互斥解决方案不仅理论优雅更能通过代码实现直观展示其工作原理。本文将带你从零开始用Python模拟这一算法并通过可视化工具观察其运行过程。1. 理解Ricart-Agrawala算法的核心思想Ricart-Agrawala算法由Glenn Ricart和Ashok Agrawala于1981年提出它解决了分布式环境中无中心协调器情况下的互斥访问问题。该算法基于以下三个基本原则完全分布式没有单点故障风险所有节点地位平等基于请求优先级使用逻辑时钟和时间戳确定请求的先后顺序多数同意原则不需要所有节点同意只需获得足够数量的许可算法的工作流程可以分为四个阶段请求阶段当进程需要访问临界区时向所有其他进程发送请求消息决策阶段收到请求的进程根据自身状态决定是否立即回复或推迟回复执行阶段当收集到足够数量的回复后进入临界区执行释放阶段退出临界区后向所有被推迟回复的进程发送许可消息class Process: def __init__(self, pid, all_processes): self.pid pid # 进程唯一标识 self.clock 0 # 逻辑时钟 self.state RELEASED # 进程状态RELEASED, WANTED, HELD self.deferred set() # 被推迟回复的请求集合 self.all_processes all_processes # 系统中所有进程的引用2. 构建分布式模拟环境在真实分布式环境中进程运行在不同的物理机器上通过网络通信。为了模拟这一环境我们将使用Python的multiprocessing模块创建多个独立进程并通过队列实现进程间通信。2.1 进程间通信设计每个进程需要维护以下核心数据结构请求队列存储收到的其他进程的请求回复队列存储收到的回复延迟队列存储需要延迟处理的请求from multiprocessing import Process, Queue import time import random class DistributedSystem: def __init__(self, num_processes): self.num_processes num_processes self.processes [] self.queues [Queue() for _ in range(num_processes)] def start(self): for i in range(self.num_processes): p Process(targetrun_process, args(i, self.queues, self.num_processes)) p.start() self.processes.append(p)2.2 逻辑时钟实现分布式系统中没有全局时钟Ricart-Agrawala算法使用Lamport逻辑时钟来解决事件排序问题def increment_clock(self): self.clock 1 return self.clock def update_clock(self, received_clock): self.clock max(self.clock, received_clock) 13. 算法核心实现3.1 请求临界区当进程需要进入临界区时它会执行以下操作更新自身状态为WANTED递增逻辑时钟向所有其他进程发送请求消息等待足够数量的回复def request_critical_section(self): self.state WANTED self.increment_clock() request_msg { type: REQUEST, pid: self.pid, timestamp: self.clock } # 向所有其他进程发送请求 for q in self.queues: if q ! self.queues[self.pid]: q.put(request_msg) # 等待N-1个回复 replies_needed self.num_processes - 1 while replies_received replies_needed: if not self.queues[self.pid].empty(): msg self.queues[self.pid].get() if msg[type] REPLY: replies_received 1 self.state HELD3.2 处理接收到的请求当进程收到其他进程的请求时会根据自身状态和请求的时间戳决定立即回复还是推迟回复当前进程状态请求时间戳比较动作HELD任何推迟回复WANTED较早的时间戳推迟回复WANTED较晚的时间戳立即回复RELEASED任何立即回复def handle_request(self, request_msg): self.update_clock(request_msg[timestamp]) if self.state HELD or \ (self.state WANTED and (self.timestamp, self.pid) (request_msg[timestamp], request_msg[pid])): # 推迟回复 self.deferred.add(request_msg[pid]) else: # 立即回复 reply_msg { type: REPLY, pid: self.pid } self.queues[request_msg[pid]].put(reply_msg)4. 可视化与性能分析为了更直观地理解算法行为我们可以使用matplotlib库创建可视化工具展示进程状态变化和消息传递。4.1 状态转换图import matplotlib.pyplot as plt from matplotlib.animation import FuncAnimation def visualize_process_states(process_states): fig, ax plt.subplots() def update(frame): ax.clear() for pid, states in process_states.items(): ax.plot(range(len(states)), states, labelfProcess {pid}) ax.legend() ani FuncAnimation(fig, update, frameslen(process_states[0]), interval500) plt.show()4.2 性能指标测量Ricart-Agrawala算法的关键性能指标包括完成一次互斥所需的报文数量理论值为2(n-1)等待时间从发出请求到进入临界区的时间系统吞吐量单位时间内能够完成的互斥操作次数def measure_performance(run_time): start_time time.time() operations 0 while time.time() - start_time run_time: # 模拟进程请求临界区 operations 1 throughput operations / run_time print(f系统吞吐量: {throughput:.2f} 操作/秒)5. 实际应用中的优化策略基础实现虽然正确但在实际分布式环境中还需要考虑以下优化5.1 容错处理分布式系统中节点可能失败我们需要增加超时机制和重试逻辑def request_with_retry(self, max_retries3): retries 0 while retries max_retries: try: return self.request_critical_section() except TimeoutError: retries 1 time.sleep(2 ** retries) # 指数退避 raise Exception(Max retries exceeded)5.2 部分连接优化不是所有进程都需要互相连接可以设计拓扑结构减少通信开销进程通信拓扑示例 P0 —— P1 —— P2 \ / \ / P3 —— P4 —— P55.3 性能对比数据下表比较了不同互斥算法的关键指标算法报文复杂度延迟容错性公平性集中式O(1)低差好Ricart-AgrawalaO(N)中好好令牌环O(∞)高中中MaekawaO(√N)中好中6. 完整代码实现与测试以下是整合了所有组件的完整实现import multiprocessing as mp import time import random from collections import defaultdict class RicartAgrawalaProcess: def __init__(self, pid, queues, num_processes): self.pid pid self.queues queues self.num_processes num_processes self.clock 0 self.state RELEASED self.deferred set() self.replies_received 0 self.request_time None def run(self): while True: # 随机决定是否请求临界区 if random.random() 0.3 and self.state RELEASED: self.request_cs() # 处理收到的消息 self.handle_messages() # 如果在临界区随机决定是否退出 if self.state HELD and random.random() 0.5: self.release_cs() time.sleep(random.uniform(0.1, 0.5)) def request_cs(self): self.state WANTED self.clock 1 self.request_time time.time() self.replies_received 0 request {type: REQUEST, pid: self.pid, timestamp: self.clock} for i in range(self.num_processes): if i ! self.pid: self.queues[i].put(request) def release_cs(self): self.state RELEASED # 向所有被推迟的进程发送回复 for pid in self.deferred: reply {type: REPLY, pid: self.pid} self.queues[pid].put(reply) self.deferred.clear() def handle_messages(self): while not self.queues[self.pid].empty(): msg self.queues[self.pid].get() if msg[type] REQUEST: self.clock max(self.clock, msg[timestamp]) 1 if self.state HELD or \ (self.state WANTED and (self.clock, self.pid) (msg[timestamp], msg[pid])): self.deferred.add(msg[pid]) else: reply {type: REPLY, pid: self.pid} self.queues[msg[pid]].put(reply) elif msg[type] REPLY: self.replies_received 1 if self.replies_received self.num_processes - 1 and self.state WANTED: self.state HELD wait_time time.time() - self.request_time print(fProcess {self.pid} entered CS after {wait_time:.2f}s) def run_process(pid, queues, num_processes): process RicartAgrawalaProcess(pid, queues, num_processes) process.run() if __name__ __main__: num_processes 4 system DistributedSystem(num_processes) system.start() try: while True: time.sleep(1) except KeyboardInterrupt: print(Terminating processes...) for p in system.processes: p.terminate()7. 常见问题与调试技巧在实现分布式算法时经常会遇到以下典型问题死锁确保所有推迟的请求最终都能得到回复活锁引入随机延迟避免进程持续冲突消息丢失添加序列号和确认机制时钟同步严格遵循逻辑时钟更新规则调试分布式系统时可以采用以下策略日志记录为每个重要事件添加详细日志确定性测试固定随机种子重现问题逐步验证先验证双进程场景再扩展到多进程可视化工具实时显示进程状态和消息流# 示例调试日志 def log_event(self, event): with open(fprocess_{self.pid}.log, a) as f: f.write(f[{time.ctime()}] Clock {self.clock} - {event}\n)通过这个完整的实现我们不仅验证了Ricart-Agrawala算法的正确性还展示了如何将分布式系统理论转化为实际可运行的代码。这种从理论到实践的转换能力正是分布式系统开发者需要掌握的核心技能。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2516541.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SpringBoot-17-MyBatis动态SQL标签之常用标签

文章目录 1 代码1.1 实体User.java1.2 接口UserMapper.java1.3 映射UserMapper.xml1.3.1 标签if1.3.2 标签if和where1.3.3 标签choose和when和otherwise1.4 UserController.java2 常用动态SQL标签2.1 标签set2.1.1 UserMapper.java2.1.2 UserMapper.xml2.1.3 UserController.ja…

wordpress后台更新后 前端没变化的解决方法

使用siteground主机的wordpress网站,会出现更新了网站内容和修改了php模板文件、js文件、css文件、图片文件后,网站没有变化的情况。 不熟悉siteground主机的新手,遇到这个问题,就很抓狂,明明是哪都没操作错误&#x…

网络编程(Modbus进阶)

思维导图 Modbus RTU(先学一点理论) 概念 Modbus RTU 是工业自动化领域 最广泛应用的串行通信协议,由 Modicon 公司(现施耐德电气)于 1979 年推出。它以 高效率、强健性、易实现的特点成为工业控制系统的通信标准。 包…

UE5 学习系列(二)用户操作界面及介绍

这篇博客是 UE5 学习系列博客的第二篇,在第一篇的基础上展开这篇内容。博客参考的 B 站视频资料和第一篇的链接如下: 【Note】:如果你已经完成安装等操作,可以只执行第一篇博客中 2. 新建一个空白游戏项目 章节操作,重…

IDEA运行Tomcat出现乱码问题解决汇总

最近正值期末周,有很多同学在写期末Java web作业时,运行tomcat出现乱码问题,经过多次解决与研究,我做了如下整理: 原因: IDEA本身编码与tomcat的编码与Windows编码不同导致,Windows 系统控制台…

利用最小二乘法找圆心和半径

#include <iostream> #include <vector> #include <cmath> #include <Eigen/Dense> // 需安装Eigen库用于矩阵运算 // 定义点结构 struct Point { double x, y; Point(double x_, double y_) : x(x_), y(y_) {} }; // 最小二乘法求圆心和半径 …

使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式

一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明&#xff1a;假设每台服务器已…

XML Group端口详解

在XML数据映射过程中&#xff0c;经常需要对数据进行分组聚合操作。例如&#xff0c;当处理包含多个物料明细的XML文件时&#xff0c;可能需要将相同物料号的明细归为一组&#xff0c;或对相同物料号的数量进行求和计算。传统实现方式通常需要编写脚本代码&#xff0c;增加了开…

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造&#xff0c;完美适配AGV和无人叉车。同时&#xff0c;集成以太网与语音合成技术&#xff0c;为各类高级系统&#xff08;如MES、调度系统、库位管理、立库等&#xff09;提供高效便捷的语音交互体验。 L…

(LeetCode 每日一题) 3442. 奇偶频次间的最大差值 I (哈希、字符串)

题目&#xff1a;3442. 奇偶频次间的最大差值 I 思路 &#xff1a;哈希&#xff0c;时间复杂度0(n)。 用哈希表来记录每个字符串中字符的分布情况&#xff0c;哈希表这里用数组即可实现。 C版本&#xff1a; class Solution { public:int maxDifference(string s) {int a[26]…

【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型

摘要 拍照搜题系统采用“三层管道&#xff08;多模态 OCR → 语义检索 → 答案渲染&#xff09;、两级检索&#xff08;倒排 BM25 向量 HNSW&#xff09;并以大语言模型兜底”的整体框架&#xff1a; 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后&#xff0c;分别用…

【Axure高保真原型】引导弹窗

今天和大家中分享引导弹窗的原型模板&#xff0c;载入页面后&#xff0c;会显示引导弹窗&#xff0c;适用于引导用户使用页面&#xff0c;点击完成后&#xff0c;会显示下一个引导弹窗&#xff0c;直至最后一个引导弹窗完成后进入首页。具体效果可以点击下方视频观看或打开下方…

接口测试中缓存处理策略

在接口测试中&#xff0c;缓存处理策略是一个关键环节&#xff0c;直接影响测试结果的准确性和可靠性。合理的缓存处理策略能够确保测试环境的一致性&#xff0c;避免因缓存数据导致的测试偏差。以下是接口测试中常见的缓存处理策略及其详细说明&#xff1a; 一、缓存处理的核…

龙虎榜——20250610

上证指数放量收阴线&#xff0c;个股多数下跌&#xff0c;盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型&#xff0c;指数短线有调整的需求&#xff0c;大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的&#xff1a;御银股份、雄帝科技 驱动…

观成科技:隐蔽隧道工具Ligolo-ng加密流量分析

1.工具介绍 Ligolo-ng是一款由go编写的高效隧道工具&#xff0c;该工具基于TUN接口实现其功能&#xff0c;利用反向TCP/TLS连接建立一条隐蔽的通信信道&#xff0c;支持使用Let’s Encrypt自动生成证书。Ligolo-ng的通信隐蔽性体现在其支持多种连接方式&#xff0c;适应复杂网…

铭豹扩展坞 USB转网口 突然无法识别解决方法

当 USB 转网口扩展坞在一台笔记本上无法识别,但在其他电脑上正常工作时,问题通常出在笔记本自身或其与扩展坞的兼容性上。以下是系统化的定位思路和排查步骤,帮助你快速找到故障原因: 背景: 一个M-pard(铭豹)扩展坞的网卡突然无法识别了,扩展出来的三个USB接口正常。…

未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?

编辑&#xff1a;陈萍萍的公主一点人工一点智能 未来机器人的大脑&#xff1a;如何用神经网络模拟器实现更智能的决策&#xff1f;RWM通过双自回归机制有效解决了复合误差、部分可观测性和随机动力学等关键挑战&#xff0c;在不依赖领域特定归纳偏见的条件下实现了卓越的预测准…

Linux应用开发之网络套接字编程(实例篇)

服务端与客户端单连接 服务端代码 #include <sys/socket.h> #include <sys/types.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <arpa/inet.h> #include <pthread.h> …

华为云AI开发平台ModelArts

华为云ModelArts&#xff1a;重塑AI开发流程的“智能引擎”与“创新加速器”&#xff01; 在人工智能浪潮席卷全球的2025年&#xff0c;企业拥抱AI的意愿空前高涨&#xff0c;但技术门槛高、流程复杂、资源投入巨大的现实&#xff0c;却让许多创新构想止步于实验室。数据科学家…

深度学习在微纳光子学中的应用

深度学习在微纳光子学中的主要应用方向 深度学习与微纳光子学的结合主要集中在以下几个方向&#xff1a; 逆向设计 通过神经网络快速预测微纳结构的光学响应&#xff0c;替代传统耗时的数值模拟方法。例如设计超表面、光子晶体等结构。 特征提取与优化 从复杂的光学数据中自…