编写程序让智能燃气检测仪检测到燃气泄漏时,不仅报警,还能模拟语音提示,关闭燃气阀门。
智能燃气检测与安全防护系统一、实际应用场景描述现代家庭厨房普遍使用天然气、液化气等燃气作为烹饪能源。虽然燃气为生活带来便利但其泄漏可能导致中毒、爆炸等严重安全事故。特别是在用户外出、夜间熟睡或老人独居时往往难以及时发现燃气泄漏错过最佳处置时机。本系统基于智能仪器课程中的气体传感器技术、实时数据处理、执行器控制和物联网安全理念设计一套完整的智能燃气安全防护系统。系统集成了MQ系列燃气传感器、蜂鸣器报警、语音提示和电动阀门控制实现检测-报警-语音播报-自动断气的四重防护机制特别适用于家庭厨房、餐厅后厨、燃气锅炉房等场景。二、引入痛点1. 发现滞后传统燃气报警器仅发出刺耳蜂鸣用户可能不在现场或未察觉2. 误报困扰单纯蜂鸣无法说明具体情况用户难以判断是否真泄漏3. 处置不及时即使发现报警手动关闭阀门可能因慌乱而延误4. 缺乏联动传统报警器无法与其他安全设备协同工作5. 状态不明用户无法远程了解燃气阀门当前状态6. 维护缺失缺乏传感器校准和故障自检机制三、核心逻辑讲解┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐│ 燃气传感器 │────▶│ 数据处理 │────▶│ 泄漏判定 ││ MQ-2/MQ-5 │ │ AD转换滤波 │ │ 浓度阈值? │└─────────────────┘ └─────────────────┘ └─────────────────┘│┌─────────────────┐ ││ 安全执行 │◀─────────────────┘│ (报警语音关阀)│└─────────────────┘│┌────────────────────┼────────────────────┐▼ ▼ ▼┌─────────────┐ ┌─────────────┐ ┌─────────────┐│ 蜂鸣器报警 │ │ TTS语音提示 │ │ 电动关断阀 ││ 声光警示 │ │ 状态播报 │ │ 切断气源 │└─────────────┘ └─────────────┘ └─────────────┘核心逻辑流程1. 系统初始化传感器校准、阀门复位、语音引擎加载2. 周期性采样每500ms读取MQ传感器ADC值3. 信号处理滑动窗口滤波去除噪声干扰4. 泄漏判定浓度ppm 阈值(1000ppm)判定为泄漏5. 安全执行触发三级响应 - 声光报警 语音播报 自动关阀6. 状态同步记录事件日志更新系统状态7. 恢复机制确认安全后需手动复位才能重新通气四、代码模块化项目结构smart_gas_detector/├── main.py # 主程序入口├── config.py # 系统配置文件├── gas_sensor/ # 燃气检测模块│ ├── __init__.py│ └── mq_sensor.py # MQ系列传感器驱动├── alert/ # 报警模块│ ├── __init__.py│ ├── buzzer.py # 蜂鸣器控制│ └── voice_prompt.py # 语音提示模块├── actuator/ # 执行器模块│ ├── __init__.py│ └── gas_valve.py # 燃气阀门控制├── safety/ # 安全逻辑模块│ ├── __init__.py│ └── safety_manager.py # 安全管理器├── utils/ # 工具模块│ ├── __init__.py│ ├── logger.py # 日志记录│ └── filter.py # 信号滤波├── data/ # 数据存储│ └── gas_log.csv├── requirements.txt└── README.md1. config.py - 系统配置文件系统配置文件存储所有硬件参数和安全阈值配置# 燃气传感器配置SENSOR_CONFIG {adc_channel: 0, # ADS1115 ADC通道adc_address: 0x48, # I2C地址ro_clean_air: 9.83, # 洁净空气中Ro值需校准rl: 10.0, # 负载电阻(kΩ)min_ppm: 200, # 最小检测浓度(ppm)max_ppm: 10000, # 最大检测浓度(ppm)sample_interval: 0.5, # 采样间隔(秒)filter_window: 10 # 滤波窗口大小}# 安全阈值配置SAFETY_THRESHOLDS {warning_ppm: 500, # 预警浓度(ppm)danger_ppm: 1000, # 危险浓度(ppm)critical_ppm: 3000, # 严重泄漏浓度(ppm)hysteresis: 100 # 迟滞值防止频繁切换}# 报警配置ALERT_CONFIG {buzzer_pin: 18, # 蜂鸣器GPIO引脚led_pin: 23, # 警示LED引脚alarm_duration: 30, # 报警持续时间(秒)voice_enabled: True, # 启用语音提示voice_volume: 80 # 语音音量(0-100)}# 阀门配置VALVE_CONFIG {valve_pin: 24, # 阀门控制引脚valve_type: normally_closed, # 常闭型阀门close_duration: 2.0, # 关阀动作时长(秒)manual_override: True # 允许手动超控}# 系统配置SYSTEM_CONFIG {log_file: data/gas_log.csv,calibration_required: True, # 首次运行需要校准auto_restart: False, # 禁止自动重启安全考虑heartbeat_interval: 60 # 心跳检测间隔(秒)}2. gas_sensor/mq_sensor.py - 燃气传感器模块MQ系列燃气传感器模块基于智能仪器课程中的半导体气体传感器原理支持MQ-2/MQ-5/MQ-135等多种型号import timeimport mathfrom config import SENSOR_CONFIGclass MGSensor:MQ系列燃气传感器类基于半导体气敏元件工作原理- 金属氧化物表面吸附氧气形成耗尽层- 可燃气体分子与氧反应改变耗尽层厚度- 电导率变化反映气体浓度核心公式Rs/Ro A * (ppm)^-B其中Rs为传感器电阻Ro为洁净空气电阻# 不同气体的特性曲线参数A, BGAS_CURVES {lpg: {a: 447.71, b: -2.95}, # 液化石油气co: {a: 599.65, b: -2.85}, # 一氧化碳ch4: {a: 1012.7, b: -2.79}, # 甲烷h2: {a: 987.99, b: -2.62}, # 氢气alcohol: {a: 200.77, b: -2.76}, # 酒精smoke: {a: 300.0, b: -2.5} # 烟雾}def __init__(self, sensor_typemq2):初始化燃气传感器Args:sensor_type: 传感器型号 (mq2, mq5, mq135)self.sensor_type sensor_type.lower()self.adc_channel SENSOR_CONFIG[adc_channel]self.adc_address SENSOR_CONFIG[adc_address]self.rl SENSOR_CONFIG[rl]self.ro None # 洁净空气电阻需校准获得self.calibration_samples []print(f[燃气传感器] 初始化 {sensor_type.upper()} 传感器...)self._initialize_adc()print([燃气传感器] 初始化完成)def _initialize_adc(self):初始化ADC转换器实际环境中使用ADS1115进行模数转换# 模拟ADC初始化实际项目中替换为真实I2C通信# import Adafruit_ADS1x15# self.adc Adafruit_ADS1x15.ADS1115(addressself.adc_address)# self.adc.start_adc(self.adc_channel, gain1)print(f[ADC] 初始化ADS1115 0x{self.adc_address:02X})def calibrate_ro(self, samples50, delay0.1):校准洁净空气中的Ro值必须在洁净空气中执行此校准Args:samples: 采样次数delay: 采样间隔(秒)Returns:float: 校准后的Ro值(kΩ)print([校准] 开始校准Ro值请确保环境空气洁净...)print([校准] 倒计时 5 秒...)time.sleep(5)raw_values []for i in range(samples):raw self._read_raw_adc()raw_values.append(raw)print(f 采样 {i1}/{samples}: {raw})time.sleep(delay)avg_raw sum(raw_values) / len(raw_values)self.ro self._calculate_rs(avg_raw) / SENSOR_CONFIG[ro_clean_air]print(f[校准] Ro {self.ro:.2f} kΩ)print([校准] 校准完成)return self.rodef _read_raw_adc(self):读取ADC原始值Returns:int: ADC原始读数 (0-32767)# 模拟ADC读取实际项目中替换为真实读取# value self.adc.get_last_result()import random# 模拟洁净空气值 ~15000有燃气时降低base_value random.uniform(12000, 18000)noise random.gauss(0, 500)return int(base_value noise)def _calculate_rs(self, raw_adc):计算传感器电阻Rs分压电路原理Vout Vcc * RL / (RS RL)RS RL * (Vcc - Vout) / VoutArgs:raw_adc: ADC原始读数Returns:float: 传感器电阻Rs(kΩ)# ADC参考电压通常为3.3V或5Vv_ref 3.3max_adc 32767# 计算输出电压v_out (raw_adc / max_adc) * v_ref# 防止除零if v_out 0.01:v_out 0.01# 计算Rsrs self.rl * (v_ref - v_out) / v_outreturn rsdef read_ppm(self, gas_typelpg):读取指定气体的浓度值Args:gas_type: 气体类型 (lpg, co, ch4, h2, alcohol, smoke)Returns:dict: 包含浓度数据的字典{ppm: float, # 浓度值(ppm)rs_ro_ratio: float, # Rs/Ro比值raw_adc: int, # 原始ADC值timestamp: str # 时间戳}if self.ro is None:raise RuntimeError(传感器未校准请先调用calibrate_ro())raw_adc self._read_raw_adc()rs self._calculate_rs(raw_adc)rs_ro_ratio rs / self.ro# 根据气体特性曲线计算ppmcurve self.GAS_CURVES.get(gas_type, self.GAS_CURVES[lpg])a curve[a]b curve[b]# ppm (Rs/Ro)^(1/-b) / aif rs_ro_ratio 0:ppm math.pow(rs_ro_ratio, 1.0 / b) / aelse:ppm 0.0# 限制合理范围ppm max(0, min(ppm, SENSOR_CONFIG[max_ppm]))from datetime import datetimereturn {ppm: round(ppm, 2),rs_ro_ratio: round(rs_ro_ratio, 4),raw_adc: raw_adc,timestamp: datetime.now().strftime(%Y-%m-%d %H:%M:%S)}def read_all_gases(self):读取所有支持气体的浓度Returns:dict: 各气体浓度字典results {}for gas in self.GAS_CURVES.keys():results[gas] self.read_ppm(gas)[ppm]return resultsdef get_resistance(self):获取当前传感器电阻Returns:float: 传感器电阻值(kΩ)raw_adc self._read_raw_adc()return self._calculate_rs(raw_adc)def is_warming_up(self):检查传感器是否处于预热状态MQ传感器上电后需要1-2分钟预热Returns:bool: 是否正在预热# 模拟预热检查return self.ro is Nonedef cleanup(self):清理传感器资源print([燃气传感器] 清理资源...)# 实际项目中关闭ADC连接# self.adc.stop_adc()print([燃气传感器] 资源清理完成)# 测试代码if __name__ __main__:sensor MGSensor(mq2)# 模拟校准sensor.ro 9.83 # 假设已校准print(\n开始读取气体浓度10次:\n)for i in range(10):data sensor.read_ppm(lpg)print(f#{i1} | 时间: {data[timestamp]} | fLPG: {data[ppm]:6.1f} ppm | fRs/Ro: {data[rs_ro_ratio]:.4f} | fADC: {data[raw_adc]})time.sleep(0.5)sensor.cleanup()3. alert/buzzer.py - 蜂鸣器报警模块蜂鸣器报警模块提供声光报警功能import timeimport threadingfrom config import ALERT_CONFIGclass BuzzerAlert:蜂鸣器报警控制器支持多种报警模式- 连续音持续蜂鸣- 脉冲音间歇蜂鸣- 渐变音频率变化- SOS音国际求救信号def __init__(self):初始化蜂鸣器self.buzzer_pin ALERT_CONFIG[buzzer_pin]self.led_pin ALERT_CONFIG[led_pin]self.is_alarming Falseself.alarm_thread Noneself._setup_gpio()print(f[蜂鸣器] 初始化完成引脚: {self.buzzer_pin})def _setup_gpio(self):设置GPIO引脚# 模拟GPIO初始化实际项目中替换为真实GPIO操作# import RPi.GPIO as GPIO# GPIO.setmode(GPIO.BCM)# GPIO.setup(self.buzzer_pin, GPIO.OUT)# GPIO.setup(self.led_pin, GPIO.OUT)print(f[GPIO] 设置引脚 {self.buzzer_pin}(蜂鸣器), {self.led_pin}(LED))def start_alarm(self, alarm_typecontinuous):启动报警Args:alarm_type: 报警类型 (continuous, pulse, sos, ramp)if self.is_alarming:returnself.is_alarming Trueself.alarm_thread threading.Thread(targetself._alarm_loop,args(alarm_type,),daemonTrue)self.alarm_thread.start()print(f[报警] 启动 {alarm_type} 报警模式)def stop_alarm(self):停止报警self.is_alarming Falseif self.alarm_thread:self.alarm_thread.join(timeout1)self._turn_off()print([报警] 报警已停止)def _alarm_loop(self, alarm_type):报警循环while self.is_alarming:if alarm_type continuous:self._continuous_beep()elif alarm_type pulse:self._pulse_beep()elif alarm_type sos:self._sos_beep()elif alarm_type ramp:self._ramp_beep()def _continuous_beep(self):连续蜂鸣self._turn_on()time.sleep(0.5)def _pulse_beep(self):脉冲蜂鸣self._turn_on()time.sleep(0.2)self._turn_off()time.sleep(0.3)def _sos_beep(self):SOS求救信号# S: ··· (短长短短长)for _ in range(3):self._turn_on(); time.sleep(0.15)self._turn_off(); time.sleep(0.15)time.sleep(0.15)# O: --- (长短长短长)for _ in range(3):self._turn_on(); time.sleep(0.45)self._turn_off(); time.sleep(0.15)time.sleep(0.15)# S: ···for _ in range(3):self._turn_on(); time.sleep(0.15)self._turn_off(); time.sleep(0.15)time.sleep(1.5)def _ramp_beep(self):渐变蜂鸣for freq in range(1, 5):self._turn_on()time.sleep(0.1 * freq)self._turn_off()time.sleep(0.1)def _turn_on(self):打开蜂鸣器和LED# 实际GPIO操作:# GPIO.output(self.buzzer_pin, GPIO.HIGH)# GPIO.output(self.led_pin, GPIO.HIGH)pass # 模拟模式下不实际操作def _turn_off(self):关闭蜂鸣器和LED# 实际GPIO操作:# GPIO.output(self.buzzer_pin, GPIO.LOW)# GPIO.output(self.led_pin, GPIO.LOW)pass # 模拟模式下不实际操作def beep_once(self, duration0.2):单次蜂鸣Args:duration: 蜂鸣时长(秒)self._turn_on()time.sleep(duration)self._turn_off()def cleanup(self):清理资源self.stop_alarm()print([蜂鸣器] 资源清理完成)if __name__ __main__:buzzer BuzzerAlert()print(测试连续报警...)buzzer.start_alarm(continuous)time.sleep(3)buzzer.stop_alarm()print(\n测试SOS报警...)buzzer.start_alarm(sos)time.sleep(6)buzzer.stop_alarm()buzzer.cleanup()4. alert/voice_prompt.py - 语音提示模块语音提示模块基于TTS技术实现燃气泄漏语音播报import timeimport threadingfrom config import ALERT_CONFIGclass VoicePrompt:语音提示控制器使用文本转语音(TTS)技术播报安全提示支持中文语音合成# 预定义语音提示模板VOICE_TEMPLATES {leak_detected: 警告检测到燃气泄漏浓度超标请立即撤离,valve_closed: 燃气阀门已自动关闭请开窗通风切勿开关电器,safe_now: 燃气浓度已恢复正常请检查泄漏原因后再通气。,calibration: 燃气检测器校准中请确保环境空气洁净。,system_ready: 燃气检测系统就绪正在实时监控中。,low_battery: 燃气检测器电量不足请及时充电或更换电池。}def __init__(self):初始化语音引擎self.enabled ALERT_CONFIG[voice_enabled]self.volume ALERT_CONFIG[voice_volume]self.is_speaking Falseself.tts_engine Noneif self.enabled:self._initialize_tts()print(f[语音] 初始化完成状态: {已启用 if self.enabled else 已禁用})def _initialize_tts(self):初始化TTS引擎支持多种TTS后端gTTS, pyttsx3, pico2wavetry:# 尝试使用pyttsx3离线TTSimport pyttsx3self.tts_engine pyttsx3.init()# 设置语速self.tts_engine.setProperty(rate, 150)# 设置音量self.tts_engine.setProperty(volume, self.volume / 100.0)# 设置中文语音如果可用voices self.tts_engine.getProperty(voices)for voice in voices:if chinese in voice.languages[0].decode().lower():self.tts_engine.setProperty(voice, voice.id)breakprint([TTS] 使用pyttsx3引擎)except ImportError:print([TTS] pyttsx3未安装使用模拟模式)def speak(self, text, prioritynormal):播报文本Args:text: 要播报的文本priority: 优先级 (low, normal, high, critical)if not self.enabled:print(f[语音-静音] {text})returnif self.is_speaking:if priority critical:# 关键提示可打断当前播报self._stop_speaking()else:returnself.is_speaking Trueif self.tts_engine:# 使用真实TTS引擎self._speak_thread(text)else:# 模拟模式self._simulate_speech(text)def _speak_thread(self, text):TTS播报线程def tts_worker():self.tts_engine.say(text)self.tts_engine.runAndWait()self.is_speaking Falsethread threading.Thread(targettts_worker, daemonTrue)thread.start()def _simulate_speech(self, text):模拟语音播报无TTS引擎时print(f\n{*50})print( [语音播报])print(f {text})print(f{*50}\n)# 模拟播报时长time.sleep(len(text) * 0.15)self.is_speaking Falsedef play_template(self, template_name, **kwargs):播放预定义语音模板Args:template_name: 模板名称**kwargs: 模板变量if template_name in self.VOICE_TEMPLATES:text self.VOICE_TEMPLATES[template_name]# 替换变量for key, value in kwargs.items():text text.replace(f{{{key}}}, str(value))self.speak(text, priorityhigh)def stop_speaking(self):停止当前播报if self.tts_engine:self._stop_speaking()self.is_speaking Falsedef _stop_speaking(self):停止TTS引擎try:self.tts_engine.stop()except:passdef set_volume(self, volume):设置音量Args:volume: 音量值(0-100)self.volume max(0, min(100, volume))if self.tts_engine:self.tts_engine.setProperty(volume, self.volume / 100.0)def cleanup(self):清理资源self.stop_speaking()if self.tts_engine:self.tts_engine.stop()print([语音] 资源清理完成)# 测试代码if __name__ __main__:voice VoicePrompt()print(测试语音播报:\n)voice.play_template(leak_detected)time.sleep(2)voice.play_template(valve_closed)time.sleep(2)voice.speak(这是一条普通优先级的测试消息。)time.sleep(2)voice.speak(这是一条关键提示会打断之前的播报, prioritycritical)voice.cleanup()5. actuator/gas_valve.py - 燃气阀门控制模块燃气阀门控制模块控制电动燃气阀的开关实现气源自动切断import timeimport threadingimport loggingfrom config import VALVE_CONFIG, SYSTEM_CONFIGclass GasValveController:燃气阀门控制器支持常闭(NC)和常开(NO)两种阀门类型具备安全联锁保护机制VALVE_STATES {OPEN: open,CLOSED: closed,OPENING: opening,CLOSING: closing,FAULT: fault}def __init__(self):初始化阀门控制器self.valve_pin VALVE_CONFIG[valve_pin]self.valve_type VALVE_CONFIG[valve_type]self.close_duration VALVE_CONFIG[close_duration]self.manual_override VALVE_CONFIG[manual_override]self.current_state self.VALVE_STATES[CLOSED]self.target_state self.VALVE_STATES[CLOSED]self.is_moving Falseself.last_operation_time Noneself.fault_count 0self._setup_gpio()print(f[阀门] 初始化完成引脚: {self.valve_pin}, f类型: {常闭 if self.valve_type normally_closed else 常开})def _setup_gpio(self):设置GPIO引脚# 模拟GPIO初始化实际项目中替换为真实GPIO操作# import RPi.GPIO as GPIO# GPIO.setmode(GPIO.BCM)# GPIO.setup(self.valve_pin, GPIO.OUT)# GPIO.output(self.valve_pin, GPIO.LOW if self.valve_type normally_closed else GPIO.HIGH)print(f[GPIO] 设置阀门引脚 {self.valve_pin})def get_state(self):获取阀门当前状态Returns:dict: 阀门状态信息return {state: self.current_state,target_state: self.target_state,is_moving: self.is_moving利用AI解决实际问题如果你觉得这个工具好用欢迎关注长安牧笛
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2434605.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!