吴恩达MCP课程(5):mcp_chatbot_prompt_resource.py

news2025/7/21 18:02:28

前提条件:
1、吴恩达MCP课程(5):research_server_prompt_resource.py
2、server_config_prompt_resource.json文件

{
    "mcpServers": {
        "filesystem": {
            "command": "npx",
            "args": [
                "--y",
                "@modelcontextprotocol/server-filesystem",
                "."
            ]
        },
        "research": {
            "command": "uv",
            "args": ["run", "research_server_prompt_resource.py"]
        },
        "fetch": {
            "command": "uvx",
            "args": ["mcp-server-fetch"]
        }
    }
}

代码

原课程用的anthropic的,下面改成openai,并用千问模型做测试

from dotenv import load_dotenv
import openai
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from contextlib import AsyncExitStack
import json
import asyncio
import os


load_dotenv()

class MCP_ChatBot:
    def __init__(self):
        self.exit_stack = AsyncExitStack()
        self.client = openai.OpenAI(
            api_key=os.getenv("OPENAI_API_KEY"),
            base_url=os.getenv("OPENAI_API_BASE")
        )
        # Tools list required for OpenAI API
        self.available_tools = []
        # Prompts list for quick display 
        self.available_prompts = []
        # Sessions dict maps tool/prompt names or resource URIs to MCP client sessions
        self.sessions = {}

    async def connect_to_server(self, server_name, server_config):
        try:
            server_params = StdioServerParameters(**server_config)
            stdio_transport = await self.exit_stack.enter_async_context(
                stdio_client(server_params)
            )
            read, write = stdio_transport
            session = await self.exit_stack.enter_async_context(
                ClientSession(read, write)
            )
            await session.initialize()
            
            
            try:
                # List available tools
                response = await session.list_tools()
                for tool in response.tools:
                    self.sessions[tool.name] = session
                    self.available_tools.append({
                        "type": "function",
                        "function": {
                            "name": tool.name,
                            "description": tool.description,
                            "parameters": tool.inputSchema
                        }
                    })
            
                # List available prompts
                prompts_response = await session.list_prompts()
                if prompts_response and prompts_response.prompts:
                    for prompt in prompts_response.prompts:
                        self.sessions[prompt.name] = session
                        self.available_prompts.append({
                            "name": prompt.name,
                            "description": prompt.description,
                            "arguments": prompt.arguments
                        })
                # List available resources
                resources_response = await session.list_resources()
                if resources_response and resources_response.resources:
                    for resource in resources_response.resources:
                        resource_uri = str(resource.uri)
                        self.sessions[resource_uri] = session
            
            except Exception as e:
                print(f"Error {e}")
                
        except Exception as e:
            print(f"Error connecting to {server_name}: {e}")

    async def connect_to_servers(self):
        try:
            with open("server_config_prompt_resource.json", "r") as file:
                data = json.load(file)
            servers = data.get("mcpServers", {})
            for server_name, server_config in servers.items():
                await self.connect_to_server(server_name, server_config)
        except Exception as e:
            print(f"Error loading server config: {e}")
            raise
    
    async def process_query(self, query):
        messages = [{"role":"user", "content":query}]
        
        while True:
            response = self.client.chat.completions.create(
                model="qwen-turbo",
                tools=self.available_tools,
                messages=messages
            )
            
            message = response.choices[0].message
            
            # 检查是否有普通文本内容
            if message.content:
                print(message.content)
                messages.append({"role": "assistant", "content": message.content})
                break
                
            # 检查是否有工具调用
            elif message.tool_calls:
                # 添加助手消息到历史
                messages.append({
                    "role": "assistant", 
                    "content": None,
                    "tool_calls": message.tool_calls
                })
                
                # 处理每个工具调用
                for tool_call in message.tool_calls:
                    tool_id = tool_call.id
                    tool_name = tool_call.function.name
                    tool_args = json.loads(tool_call.function.arguments)
                    
                    print(f"Calling tool {tool_name} with args {tool_args}")
                    
                    # 获取session并调用工具
                    session = self.sessions.get(tool_name)
                    if not session:
                        print(f"Tool '{tool_name}' not found.")
                        break
                        
                    result = await session.call_tool(tool_name, arguments=tool_args)
                    
                    # 添加工具结果到消息历史
                    messages.append({
                        "role": "tool",
                        "tool_call_id": tool_id,
                        "content": result.content
                    })
            else:
                break

    async def get_resource(self, resource_uri):
        session = self.sessions.get(resource_uri)
        
        # Fallback for papers URIs - try any papers resource session
        if not session and resource_uri.startswith("papers://"):
            for uri, sess in self.sessions.items():
                if uri.startswith("papers://"):
                    session = sess
                    break
            
        if not session:
            print(f"Resource '{resource_uri}' not found.")
            return
        
        try:
            result = await session.read_resource(uri=resource_uri)
            if result and result.contents:
                print(f"\nResource: {resource_uri}")
                print("Content:")
                print(result.contents[0].text)
            else:
                print("No content available.")
        except Exception as e:
            print(f"Error: {e}")
    
    async def list_prompts(self):
        """List all available prompts."""
        if not self.available_prompts:
            print("No prompts available.")
            return
        
        print("\nAvailable prompts:")
        for prompt in self.available_prompts:
            print(f"- {prompt['name']}: {prompt['description']}")
            if prompt['arguments']:
                print(f"  Arguments:")
                for arg in prompt['arguments']:
                    arg_name = arg.name if hasattr(arg, 'name') else arg.get('name', '')
                    print(f"    - {arg_name}")
    
    async def execute_prompt(self, prompt_name, args):
        """Execute a prompt with the given arguments."""
        session = self.sessions.get(prompt_name)
        if not session:
            print(f"Prompt '{prompt_name}' not found.")
            return
        
        try:
            result = await session.get_prompt(prompt_name, arguments=args)
            if result and result.messages:
                prompt_content = result.messages[0].content
                
                # Extract text from content (handles different formats)
                if isinstance(prompt_content, str):
                    text = prompt_content
                elif hasattr(prompt_content, 'text'):
                    text = prompt_content.text
                else:
                    # Handle list of content items
                    text = " ".join(item.text if hasattr(item, 'text') else str(item) 
                                  for item in prompt_content)
                
                print(f"\nExecuting prompt '{prompt_name}'...")
                await self.process_query(text)
        except Exception as e:
            print(f"Error: {e}")
    
    async def chat_loop(self):
        print("\nMCP Chatbot Started!")
        print("Type your queries or 'quit' to exit.")
        print("Use @folders to see available topics")
        print("Use @<topic> to search papers in that topic")
        print("Use /prompts to list available prompts")
        print("Use /prompt <name> <arg1=value1> to execute a prompt")
        
        while True:
            try:
                query = input("\nQuery: ").strip()
                if not query:
                    continue
        
                if query.lower() == 'quit':
                    break
                
                # Check for @resource syntax first
                if query.startswith('@'):
                    # Remove @ sign  
                    topic = query[1:]
                    if topic == "folders":
                        resource_uri = "papers://folders"
                    else:
                        resource_uri = f"papers://{topic}"
                    await self.get_resource(resource_uri)
                    continue
                
                # Check for /command syntax
                if query.startswith('/'):
                    parts = query.split()
                    command = parts[0].lower()
                    
                    if command == '/prompts':
                        await self.list_prompts()
                    elif command == '/prompt':
                        if len(parts) < 2:
                            print("Usage: /prompt <name> <arg1=value1> <arg2=value2>")
                            continue
                        
                        prompt_name = parts[1]
                        args = {}
                        
                        # Parse arguments
                        for arg in parts[2:]:
                            if '=' in arg:
                                key, value = arg.split('=', 1)
                                args[key] = value
                        
                        await self.execute_prompt(prompt_name, args)
                    else:
                        print(f"Unknown command: {command}")
                    continue
                
                await self.process_query(query)
                    
            except Exception as e:
                print(f"\nError: {str(e)}")
    
    async def cleanup(self):
        await self.exit_stack.aclose()


async def main():
    chatbot = MCP_ChatBot()
    try:
        await chatbot.connect_to_servers()
        await chatbot.chat_loop()
    finally:
        await chatbot.cleanup()


if __name__ == "__main__":
    asyncio.run(main())

代码解释

这段代码实现了一个基于MCP(Model Context Protocol)的聊天机器人,能够连接到多个MCP服务器,使用它们提供的工具、提示和资源。下面是对代码的详细解释:

类结构与初始化

class MCP_ChatBot:
    def __init__(self):
        self.exit_stack = AsyncExitStack()
        self.client = openai.OpenAI(
            api_key=os.getenv("OPENAI_API_KEY"),
            base_url=os.getenv("OPENAI_API_BASE")
        )
        # Tools list required for OpenAI API
        self.available_tools = []
        # Prompts list for quick display 
        self.available_prompts = []
        # Sessions dict maps tool/prompt names or resource URIs to MCP client sessions
        self.sessions = {}
  • AsyncExitStack:用于管理多个异步上下文管理器,确保资源正确释放
  • openai.OpenAI:创建OpenAI客户端,使用环境变量中的API密钥和基础URL
  • available_tools:存储可用工具列表,用于OpenAI API的工具调用功能
  • available_prompts:存储可用提示列表,用于快速显示
  • sessions:字典,将工具名称、提示名称或资源URI映射到对应的MCP客户端会话

服务器连接

async def connect_to_server(self, server_name, server_config):
    # 创建服务器参数、建立连接并初始化会话
    # 获取可用工具、提示和资源

这个方法负责:

  1. 使用提供的配置创建StdioServerParameters
  2. 通过stdio_client建立与服务器的连接
  3. 创建并初始化ClientSession
  4. 获取服务器提供的工具、提示和资源
  5. 将它们添加到相应的列表和字典中
async def connect_to_servers(self):
    # 从配置文件加载服务器信息并连接到每个服务器

这个方法从server_config_prompt_resource.json文件加载服务器配置,并为每个服务器调用connect_to_server方法。

查询处理

async def process_query(self, query):
    # 处理用户查询,使用OpenAI API和可用工具

这个方法是聊天机器人的核心,它:

  1. 创建包含用户查询的消息列表
  2. 调用OpenAI API,传递消息和可用工具
  3. 处理API的响应:
    • 如果有普通文本内容,打印并添加到消息历史
    • 如果有工具调用,执行每个工具调用并将结果添加到消息历史

工具调用的处理流程:

  1. 从响应中提取工具ID、名称和参数
  2. sessions字典中获取对应的会话
  3. 调用工具并获取结果
  4. 将结果添加到消息历史中

资源和提示管理

async def get_resource(self, resource_uri):
    # 获取并显示指定URI的资源内容

这个方法用于获取和显示资源内容,特别是论文资源。它会:

  1. sessions字典中获取对应的会话
  2. 对于以papers://开头的URI,如果找不到对应会话,会尝试使用任何处理papers的会话
  3. 调用read_resource方法获取资源内容
  4. 打印资源内容
async def list_prompts(self):
    # 列出所有可用的提示

这个方法列出所有可用的提示及其描述和参数。

async def execute_prompt(self, prompt_name, args):
    # 执行指定的提示,并处理结果

这个方法执行指定的提示,并将结果传递给process_query方法进行处理。它处理不同格式的提示内容,确保能够正确提取文本。

聊天循环

async def chat_loop(self):
    # 主聊天循环,处理用户输入

这个方法是聊天机器人的主循环,它:

  1. 打印欢迎信息和使用说明
  2. 循环接收用户输入
  3. 根据输入的不同格式执行不同操作:
    • 如果输入是quit,退出循环
    • 如果输入以@开头,调用get_resource方法获取资源
    • 如果输入以/开头,执行命令(如列出提示或执行提示)
    • 否则,调用process_query方法处理普通查询

资源清理

async def cleanup(self):
    # 清理资源

这个方法调用exit_stack.aclose()来清理所有资源。

主函数

async def main():
    chatbot = MCP_ChatBot()
    try:
        await chatbot.connect_to_servers()
        await chatbot.chat_loop()
    finally:
        await chatbot.cleanup()

主函数创建MCP_ChatBot实例,连接到服务器,运行聊天循环,并确保在结束时清理资源。

代码运行结果

uv run mcp_chatbot_prompt_resource.py

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2401694.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

设计模式——抽象工厂设计模式(创建型)

摘要 抽象工厂设计模式是一种创建型设计模式&#xff0c;旨在提供一个接口&#xff0c;用于创建一系列相关或依赖的对象&#xff0c;无需指定具体类。它通过抽象工厂、具体工厂、抽象产品和具体产品等组件构建&#xff0c;相比工厂方法模式&#xff0c;能创建一个产品族。该模…

基于LocalAI与cpolar技术协同的本地化AI模型部署与远程访问方案解析

文章目录 前言1. Docker部署2. 简单使用演示3. 安装cpolar内网穿透4. 配置公网地址5. 配置固定公网地址前言 各位极客朋友们!今天要向大家推荐一套创新性的本地部署方案——LocalAI技术架构。这款开源工具包能够将普通配置的笔记本电脑转化为具备强大算力的AI工作站,轻松实现…

霍尔效应传感器的革新突破:铟化铟晶体与结构演进驱动汽车点火系统升级

一、半导体材料革新&#xff1a;铟化铟晶体的电压放大机制 铟化铟&#xff08;InSb&#xff09;晶体因其独特的能带结构&#xff0c;成为提升霍尔电压的关键材料。相较于传统硅基材料&#xff0c;其载流子迁移率高出3-5倍&#xff0c;在相同磁场强度下可显著放大霍尔电压。其作…

无法运用pytorch环境、改环境路径、隔离环境

一.未建虚拟环境时 1.创建新项目后&#xff0c;直接运行是这样的。 2.设置中Virtualenv找不到pytorch环境&#xff1f;因为此时没有创建新虚拟环境。 3.选择conda环境&#xff08;全局环境&#xff09;时&#xff0c;是可以下载环境的。 运行结果如下&#xff1a; 是全局环境…

从0开始学vue:pnpm怎么安装

一、什么是 pnpm&#xff1f; pnpm&#xff08;Performant npm&#xff09;是新一代 JavaScript 包管理器&#xff0c;优势包括&#xff1a; 节省磁盘空间&#xff1a;通过硬链接和符号链接实现高效存储安装速度更快&#xff1a;比 npm/yarn 快 2-3 倍内置工作区支持&#xf…

Python 网络编程 -- WebSocket编程

作者主要是为了用python构建实时网络通信程序。 概念性的东西越简单越好理解,因此,下面我从晚上摘抄的概念 我的理解。 什么是网络通信? 更确切地说&#xff0c;网络通信是两台计算机上的两个进程之间的通信。比如&#xff0c;浏览器进程和新浪服务器上的某个Web服务进程在通…

边缘计算应用实践心得

当数据中心的光纤开始承载不了爆炸式增长的物联网数据流时&#xff0c;边缘计算就像毛细血管般渗透进现代数字肌理的末梢。这种将算力下沉到数据源头的技术范式&#xff0c;本质上是对传统云计算中心化架构的叛逆与补充——在智能制造车间里&#xff0c;实时质检算法直接在工业…

EXCEL如何快速批量给两字姓名中间加空格

EXCEL如何快速批量给姓名中间加空格 优点&#xff1a;不会导致排版混乱 缺点&#xff1a;无法输出在原有单元格上&#xff0c;若需要保留原始数据&#xff0c;可将公式结果复制后“选择性粘贴为值” 使用场景&#xff1a;在EXCEL中想要快速批量给两字姓名中间加入空格使姓名对…

Jenkins | Linux环境部署Jenkins与部署java项目

1. 部署jenkins 1.1 下载war包 依赖环境 jdk 11 下载地址: https://www.jenkins.io/ 依赖环境 1.2 启动服务 启动命令 需要注意使用jdk11以上的版本 直接启动 # httpPort 指定端口 #-Xms2048m -Xmx4096m 指定java 堆内存初始大小 与最大大小 /usr/java/jdk17/bin/java…

react私有样式处理

react私有样式处理 Nav.jsx Menu.jsx vue中通过scoped来实现样式私有化。加上scoped&#xff0c;就属于当前组件的私有样式。 给视图中的元素都加了一个属性data-v-xxx&#xff0c;然后给这些样式都加上属性选择器。&#xff08;deep就是不加属性也不加属性选择器&#xff09; …

UDP/TCP协议全解

目录 一. UDP协议 1.UDP协议概念 2.UDP数据报格式 3.UDP协议差错控制 二. TCP协议 1.TCP协议概念 2.三次握手与四次挥手 3.TCP报文段格式&#xff08;重点&#xff09; 4.流量控制 5.拥塞控制 一. UDP协议 1.UDP协议概念 当应用层的进程1要向进程2传输报文&#xff…

Duix.HeyGem:以“离线+开源”重构数字人创作生态

在AI技术快速演进的今天,虚拟数字人正从高成本、高门槛的专业领域走向大众化应用。Duix.HeyGem 数字人项目正是这一趋势下的杰出代表。该项目由一支拥有七年AI研发经验的团队打造,通过放弃传统3D建模路径,转向真人视频驱动的AI训练模型,成功实现了低成本、高质量、本地化的…

ubuntu22.04安装megaton

前置 sudo apt-get install git cmake ninja-build generate-ninja安装devkitPro https://blog.csdn.net/qq_39942341/article/details/148388639?spm1001.2014.3001.5502 安装cargo https://blog.csdn.net/qq_39942341/article/details/148387783?spm1001.2014.3001.5501 …

Windows应用-GUID工具

下载本应用 我们在DirectShow和媒体基础程序的调试中&#xff0c;将会遇到大量的GUID&#xff0c;调试窗口大部分情况下只给出GUID字符串&#xff0c;此GUID代表什么&#xff0c;我们无从得知。这时&#xff0c;就需要此“GUID工具”&#xff0c;将GUID字符串翻译为GUID定义&am…

vue+element-ui一个页面有多个子组件组成。子组件里面有各种表单,实现点击enter实现跳转到下一个表单元素的功能。

一个父组件里面是有各个子组件的form表单组成的。 我想实现点击enter。焦点直接跳转到下一个表单元素。 父组件就是由各个子组件构成 子组件就像下图一样的都有个el-form的表单。 enterToTab.js let enterToTab {}; (function() {// 返回随机数enterToTab.addEnterListener …

Vehicle HAL(5)--vhal 实现设置属性的流程

目录 1. ard11 vhal 设置属性的时序图 CarService > vhal > CarService 2. EmulatedVehicleHal::set(xxx) 的实现 本文介绍ard11的vhal属性设置流程图。 1. ard11 vhal 设置属性的时序图 CarService > vhal > CarService 2. EmulatedVehicleHal::set(xxx) 的实现…

WebRTC中的几个Rtp*Sender

一、问题&#xff1a; webrtc当中有几个比较相似的类&#xff0c;看着都是发送RTP数据包的&#xff0c;分别是&#xff1a;RtpPacketToSend 和RtpSenderVideo还有RtpVideoSender以及RTPSender&#xff0c;这说明什么呢&#xff1f;首先&#xff0c;说明我会很多连词&#xff0…

代码随想录算法训练营第十一天 | 150. 逆波兰表达式求值、239. 滑动窗口最大值、347.前 K 个高频元素、栈与队列总结

150. 逆波兰表达式求值--后缀表达式 力扣题目链接(opens new window) 根据 逆波兰表示法&#xff0c;求表达式的值。 有效的运算符包括 , - , * , / 。每个运算对象可以是整数&#xff0c;也可以是另一个逆波兰表达式。 说明&#xff1a; 整数除法只保留整数部分。 给…

IDEA202403 设置主题和护眼色

文章目录 背景一、设置主题二、设置背景豆沙绿三、设置控制台颜色 背景 在用IDEA进行开发时&#xff0c;长时间对着屏幕&#xff0c;不费眼是及其重要 一、设置主题 默认的主题是 Dark 暗黑&#xff0c;可更改为其他&#xff0c;如Light 高亮 位置&#xff1a;编辑栏【files…

无人机螺旋桨平衡方法详解

螺旋桨平衡对于优化无人机性能、可靠性和使用寿命至关重要。不平衡的螺旋桨会产生过度振动&#xff0c;导致推力效率降低、噪音增大&#xff0c;并加速轴承和传感器的磨损。 螺旋桨平衡可通过三种方式实现&#xff1a;静态平衡、动态平衡和气动平衡。 静态与动态平衡是通过在…