MCP项目笔记三(server)
MCP Server服务端设计请求分发、回调注册、通知发送与 transport 连接Server实现了一个典型 MCP 服务端骨架的核心结构。它并不直接承载具体业务而是提供了一组通用能力包括请求接收与 JSON 解析方法分发回调覆盖异步通知发送transport 抽象与连接管理同步 / 异步运行模式从职责划分上看这个Server更接近一个通用框架通信层由transport负责协议处理与分发由Server负责具体业务逻辑则通过默认命令函数或回调覆盖注入。一、Server 的整体角色该Server的主要职责可以概括为以下几部分持有并管理transport_通过functionMap建立 method 与 handler 的映射关系通过HandleRequest()统一处理客户端请求通过OverrideCallback()提供运行时覆盖默认行为的能力通过notification_queue_与WriterLoop()实现异步通知发送通过同步或异步连接函数驱动整个生命周期从结构上看可以将它抽象为Server 协议层 分发层 生命周期管理 transport 底层通信通道 callback / plugin 业务逻辑实现在这种设计中Server不直接依赖底层通信细节也不将业务逻辑硬编码在主流程中而是将协议处理、传输层和业务扩展点区分开来。二、请求分发请求分发是整个Server的核心处理机制主要由两部分构成functionMap方法注册表HandleRequest()统一分发入口客户端请求进入系统后的基本路径如下客户端发送 JSON ↓ transport_-Read() / ReadAsync() ↓ json::parse() ↓ HandleRequest() ↓ 根据 method 查找 functionMap ↓ 调用对应 XxxCmd() ↓ 返回 json response ↓ transport_-Write()这条链路说明HandleRequest()是服务端请求处理的中心入口。1. functionMap 的作用在构造阶段Server会注册一组 method 与对应处理函数的映射例如initializepingresources/listresources/readtools/listtools/callprompts/listnotifications/...其本质是method 字符串 → std::functionjson(const json)即通过 method 字符串定位具体 handler。例如initialize对应InitializeCmdping对应PingCmdtools/call对应ToolsCallCmd这里使用 lambda 的原因在于成员函数需要绑定this而functionMap需要保存统一签名的可调用对象因此 lambda 充当了包装层。2. HandleRequest 的处理过程HandleRequest()的职责包括可选地输出请求日志校验请求中是否包含method在functionMap中查找对应 handler调用 handler 并返回其结果在找不到 method 时构造标准错误响应其核心流程可表示为JSON Request ↓ HandleRequest() ↓ 是否包含 method? ↓ ↓ 否 是 ↓ ↓ InvalidRequest functionMap.find(method) ↓ 是否找到 handler? ↓ ↓ 否 是 ↓ ↓ MethodNotFound handler(request) ↓ 返回 response这种分发方式避免了大量if-else或switch逻辑使 method 扩展与默认处理的管理集中在统一路由表中。3. request 与 notification 的区别在读循环中处理结果通常会经过如下判断if(response!nullptr){transport_-Write(response.dump());}这意味着handler 返回非空 JSON表示该消息是 request需要返回 responsehandler 返回nullptr表示该消息是 notification不返回任何响应因此请求与通知的区分并不依赖额外的独立流程而是依赖 handler 的返回值语义。三、回调注册回调注册机制用于在运行时覆盖默认 method 的处理逻辑。它的核心接口是boolServer::OverrideCallback(conststd::stringmethod,std::functionjson(constjson)function)其本质是修改functionMap中指定 method 的 handler。1. OverrideCallback 的工作方式其核心行为可以概括为functionMap[method]std::move(function);但现有实现只允许覆盖已存在的 method即先检查该 method 是否已经注册。因此它的语义不是“动态添加任意新方法”而是“替换框架已有方法的默认实现”。2. 在整体流程中的位置覆盖完成后请求链路变为客户端请求 ↓ HandleRequest() ↓ 查 functionMap ↓ 调用 handler其中 handler 既可能是构造函数中预注册的默认命令函数也可能是通过OverrideCallback()替换后的函数。也就是说回调注册改变的是“查表后的落点”。3. 框架与业务的分工这种设计体现了清晰的分工框架负责transport 生命周期管理JSON 解析请求分发线程管理默认协议方法实现业务层负责具体的工具调用逻辑资源读取逻辑prompt 获取逻辑对默认方法进行覆盖例如tools/call默认实现只是一个占位逻辑提示需要在插件中重写这表明该方法本身就是为业务层覆盖预留的。4. 设计边界该实现中的回调注册有几个明确边界默认仅允许覆盖已有 method对functionMap的修改未显式加锁运行期动态覆盖不具备线程安全保障通知类 method 若被覆盖仍应遵循 notification 语义返回nullptr因此该机制更适合在服务启动前完成注册而不是在高并发运行过程中频繁调整。四、通知发送通知发送采用的是异步生产者-消费者模型而不是调用方直接执行transport_-Write()。其基本链路如下业务代码 / 插件 ↓ SendNotification(pluginName, notification) ↓ notification_queue_ ↓ queue_cv_.notify_one() ↓ WriterLoop() ↓ transport_-Write(notification)这意味着通知发送被拆分为两个阶段业务线程提交通知写线程统一串行发送通知1. SendNotification 的职责SendNotification()的职责主要包括检查服务是否处于停止状态将通知写入notification_queue_通过条件变量唤醒写线程它本身并不直接执行底层写操作而只是充当通知生产者。2. WriterLoop 的职责WriterLoop()是发送通知的消费者线程。其典型逻辑为等待队列中出现待发送通知从队列中取出一条通知在适当的锁保护下调用transport_-Write()当前实现中WriterLoop只消费通知队列因此它在语义上是“通知专用写线程”。3. 为什么通知采用队列而不是直接写采用通知队列有三个直接原因第一避免并发写 transport多个线程若同时调用transport_-Write()容易导致输出交叉或线程安全问题。第二保证通知顺序队列天然提供 FIFO 顺序能够保证通知按入队顺序发送。第三避免业务线程阻塞如果底层 transport 写操作较慢直接写会拖慢调用方线程改为入队后由写线程统一消费可以将发送阻塞与业务线程解耦。4. response 与 notification 的写路径差异该实现中存在两类输出路径请求响应由读循环在处理 request 后直接写回transport_-Write(response.dump());服务端主动通知由SendNotification()提交到队列再由WriterLoop()发送transport_-Write(notification_to_send);因此当前实现并没有将所有输出统一到单写线程而是将 notification 单独放入异步写路径。5. output_mutex_ 的作用由于 response 与 notification 可能来自不同线程底层写操作可能并发发生因此output_mutex_同时保护队列访问transport 写操作这样可以避免响应输出与通知输出在底层通道中发生交叉。虽然这种设计的锁粒度较大但实现相对简单。五、连接 transporttransport是Server与外部世界之间的抽象通信层。Server并不直接依赖具体通信方式而是通过ITransport接口与底层 IO 解耦。1. transport 的抽象角色在类中transport_的类型为std::shared_ptrITransporttransport_;这意味着Server只依赖如下抽象能力Start()Read()/ReadAsync()Write()Stop()至于底层是标准输入输出、SSE 还是其他传输方式都由具体ITransport实现决定。这体现的是协议层与传输层分离的设计。2. Connect同步连接模式同步模式入口为boolConnect(conststd::shared_ptrITransporttransport);其主要流程为校验 transport 非空保存到transport_重置停止状态启动writer_thread_调用transport_-Start()进入循环调用transport-Read()对读到的 JSON 执行解析、分发与响应写回在结束时执行停止流程在该模式下Connect()调用后当前线程会被阻塞在读循环中。3. ConnectAsync异步连接模式异步模式入口为boolConnectAsync(conststd::shared_ptrITransporttransport);它与同步模式的区别在于同样保存 transport、初始化状态并启动写线程额外启动reader_thread_在读线程中调用transport_-ReadAsync()ConnectAsync()自身会快速返回因此异步模式中主线程负责启动读线程负责接收请求写线程负责发送通知4. Stoptransport 生命周期结束点Stop()的典型逻辑包括设置isStopping_ true调用transport_-Stop()释放transport_停止写线程并唤醒条件变量join 写线程停止读线程并 join 读线程因此transport 生命周期是由Server统一驱动的连接时注入并启动停止时关闭并释放从这个角度看Connect()/ConnectAsync()的含义更接近“将当前 Server 绑定到一个通信通道并启动运行”而不是传统意义上的主动建立网络连接。六、完整调用链将上述四部分合并后可以得到整个Server从启动到处理消息的完整调用链。1. 客户端 request → 服务端 responseServer::Connect / ConnectAsync ↓ 绑定 transport_初始化状态 ↓ 启动 WriterLoop ↓ 同步模式调用 Read() 异步模式读线程中调用 ReadAsync() ↓ 得到 json_string ↓ json::parse() ↓ HandleRequest() ↓ 根据 method 查找 functionMap ↓ 调用对应 XxxCmd() 或覆盖后的 callback ↓ 返回 response ↓ if (response ! nullptr) ↓ transport_-Write(response.dump())这条链路对应标准 request-response 处理过程。2. 客户端 notification → 服务端处理但不回包客户端发送 notification ↓ Read / ReadAsync ↓ json::parse() ↓ HandleRequest() ↓ 调用 NotificationXXXCmd() ↓ 返回 nullptr ↓ 不执行 transport_-Write()在这条链路中notification 与 request 的处理入口相同但由于返回值为空因此不会产生回包。3. 服务端主动 notification → 客户端接收通知业务代码 / 插件 ↓ SendNotification() ↓ notification_queue_ ↓ queue_cv_.notify_one() ↓ WriterLoop() ↓ transport_-Write(notification)这条链路独立于请求分发流程属于服务端主动推送机制。七、总结这份Server实现的整体结构可以概括为通过Connect()/ConnectAsync()绑定并启动 transport通过Read()/ReadAsync()接收客户端消息通过json::parse()与HandleRequest()完成请求分发通过functionMap建立 method 到 handler 的映射通过OverrideCallback()提供运行时覆盖默认逻辑的能力通过SendNotification()、通知队列和WriterLoop()实现异步主动通知通过Stop()统一收束 transport 与线程生命周期从设计上看这是一套将传输层、协议处理层与业务扩展层分离的服务端骨架实现。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2425879.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!