目录
一、Watchdog示例
二、aiohttp服务配置热更新
在同事的golang代码中学习到了config.json热更新的功能,这里自己也学习了一下python写web服务的时候怎么来实现配置的热更新。主要是利用Watchdog这个第三方python库,来监控文件系统的改变,从而实现不用重启代码热更新的功能。
一、Watchdog示例
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
class HotUpdateEventHandler(FileSystemEventHandler):
    def on_created(self, event):
        if event.is_directory:
            return
        print(f'File created: {event.src_path}')
    def on_modified(self, event):
        if event.is_directory:
            return
        print(f'File modified: {event.src_path}')
if __name__ == "__main":
    path = "/root"  # 检测根目录下文件和目录的改变
    # 处理器
    event_handler = HotUpdateEventHandler()
    # 监控器
    observer = Observer()
    observer.schedule(event_handler, path, recursive=False)
    observer.start()
    try:
        while True:
            # 其他的功能逻辑
            do_something()
    except KeyboardInterrupt:
        observer.stop()
    observer.join() 
流程:
1、定义事件处理器(文件的修改、删除等对应逻辑)
2、定义监控器
3、监控器中执行调度,传入事件处理器、监控路径和是否递归监控
重要的两个class
watchdog.observers.Obsever
watchdog.events.FileSystemEventHandler
Obsever发起监控线程,FileSystemEventHandler实现具体的文件系统变化的逻辑
在实现具体的功能的时候,需要自己重写FileSystemEventHandler中的方法

主要常用的就是以上的方法
其中event具有的属性如下

二、aiohttp服务配置热更新
import json
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import aiohttp
from aiohttp import web
import os
config_path = "./test.json"
config = json.load(open(config_path))
def reload_config():
    """热重载配置"""
    global config
    try:
        new_config = json.load(open(config_path))
        print('Config reloaded successfully.')
        config = new_config
    except Exception as e:
        print(f'Failed to reload config: {e}')
class ConfigReloadEventHandler(FileSystemEventHandler):
    def on_modified(self, event):
        if event.src_path == config_path:
            print(f"Config file modified: {event.src_path}. Reloading...")
            reload_config()
async def post(request:web.Request):
    req = await request.json()
    send_data = config
    return web.json_response(send_data)
async def main():
    app = web.Application()
    app.add_routes([
        web.post('/nlp', post)
    ])
    return app
# 使用方法
if __name__ == "__main__":
    config_event_handler = ConfigReloadEventHandler()
    path, filename = os.path.split(config_path)
    observer = Observer()
    observer.schedule(config_event_handler,path,recursive=False)
    observer.start()
    try:
        local_ip = "0.0.0.0"
        port = "6666"
        web.run_app(main(), port=port)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
 
使用aiohttp启动一个web 服务,同时热更新test.json配置文件,作为全局变量config在web服务中使用

test.json内容
{"name":"huangyang"} 
请求结果

修改test.json文件内容
{"name":"huangyang_new"} 
再次请求结果为

服务端日志

参考文章
watchdog 2.1.5 documentation
解密Python Watchdog:实时监控文件系统的终极解决方案

















