原课程代码是用Anthropic写的,下面代码是用OpenAI改写的,模型则用阿里巴巴的模型做测试
.env 文件为:
OPENAI_API_KEY=sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
OPENAI_API_BASE=https://dashscope.aliyuncs.com/compatible-mode/v1
目录
- 代码
- 代码解释
- 1. 导入和初始化
- 2. MCP_ChatBot类初始化
- 3. 查询处理核心方法
- 3.1 消息处理循环
- 3.2 工具调用处理
- 4. 聊天循环
- 5. 服务器连接和工具初始化
- 6. 程序入口
- 核心特性
- 示例
代码
from dotenv import load_dotenv
import openai
from mcp import ClientSession, StdioServerParameters, types
from mcp.client.stdio import stdio_client
from typing import List
import asyncio
import json
import os
load_dotenv()
class MCP_ChatBot:
def __init__(self):
# Initialize session and client objects
self.session: ClientSession = None
self.client = openai.OpenAI(
api_key=os.getenv("OPENAI_API_KEY"),
base_url=os.getenv("OPENAI_API_BASE")
)
self.available_tools: List[dict] = []
async def process_query(self, query):
messages = [{'role':'user', 'content':query}]
response = self.client.chat.completions.create(
model='qwen-turbo',
max_tokens=2024,
tools=self.available_tools,
messages=messages
)
process_query = True
while process_query:
# 获取助手的回复
message = response.choices[0].message
# 检查是否有普通文本内容
if message.content:
print(message.content)
process_query = False
# 检查是否有工具调用
elif message.tool_calls:
# 添加助手消息到历史
messages.append({
"role": "assistant",
"content": None,
"tool_calls": message.tool_calls
})
# 处理每个工具调用
for tool_call in message.tool_calls:
tool_id = tool_call.id
tool_name = tool_call.function.name
tool_args = json.loads(tool_call.function.arguments)
print(f"Calling tool {tool_name} with args {tool_args}")
# 执行工具调用
result = await self.session.call_tool(tool_name, arguments=tool_args)
# 添加工具结果到消息历史
messages.append({
"role": "tool",
"tool_call_id": tool_id,
"content": result.content
})
# 获取下一个回复
response = self.client.chat.completions.create(
model='qwen-turbo',
max_tokens=2024,
tools=self.available_tools,
messages=messages
)
# 如果只有文本回复,则结束处理
if response.choices[0].message.content and not response.choices[0].message.tool_calls:
print(response.choices[0].message.content)
process_query = False
async def chat_loop(self):
"""Run an interactive chat loop"""
print("\nMCP Chatbot Started!")
print("Type your queries or 'quit' to exit.")
while True:
try:
query = input("\nQuery: ").strip()
if query.lower() == 'quit':
break
await self.process_query(query)
print("\n")
except Exception as e:
print(f"\nError: {str(e)}")
async def connect_to_server_and_run(self):
# Create server parameters for stdio connection
server_params = StdioServerParameters(
command="uv", # Executable
args=["run", "research_server.py"], # Optional command line arguments
env=None, # Optional environment variables
)
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
self.session = session
# Initialize the connection
await session.initialize()
# List available tools
response = await session.list_tools()
tools = response.tools
print("\nConnected to server with tools:", [tool.name for tool in tools])
self.available_tools = [{
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": tool.inputSchema
}
} for tool in response.tools]
await self.chat_loop()
async def main():
chatbot = MCP_ChatBot()
await chatbot.connect_to_server_and_run()
if __name__ == "__main__":
asyncio.run(main())
代码解释
1. 导入和初始化
from dotenv import load_dotenv
import openai
from mcp import ClientSession, StdioServerParameters, types
from mcp.client.stdio import stdio_client
from typing import List
import asyncio
import json
import os
load_dotenv()
load_dotenv()
: 加载环境变量文件(.env)中的配置openai
: OpenAI API客户端库mcp
: Model Context Protocol相关模块,用于与MCP服务器通信asyncio
: 异步编程支持json
: JSON数据处理
2. MCP_ChatBot类初始化
class MCP_ChatBot:
def __init__(self):
self.session: ClientSession = None
self.client = openai.OpenAI(
api_key=os.getenv("OPENAI_API_KEY"),
base_url=os.getenv("OPENAI_API_BASE")
)
self.available_tools: List[dict] = []
session
: MCP客户端会话,用于与MCP服务器通信client
: OpenAI客户端,配置API密钥和基础URLavailable_tools
: 存储从MCP服务器获取的可用工具列表
3. 查询处理核心方法
async def process_query(self, query):
messages = [{'role':'user', 'content':query}]
response = self.client.chat.completions.create(
model='qwen-turbo',
max_tokens=2024,
tools=self.available_tools,
messages=messages
)
这个方法是整个系统的核心,处理用户查询的完整流程:
3.1 消息处理循环
process_query = True
while process_query:
message = response.choices[0].message
if message.content:
print(message.content)
process_query = False
- 检查AI回复是否包含文本内容
- 如果有文本内容,直接输出并结束处理
3.2 工具调用处理
elif message.tool_calls:
messages.append({
"role": "assistant",
"content": None,
"tool_calls": message.tool_calls
})
for tool_call in message.tool_calls:
tool_id = tool_call.id
tool_name = tool_call.function.name
tool_args = json.loads(tool_call.function.arguments)
result = await self.session.call_tool(tool_name, arguments=tool_args)
messages.append({
"role": "tool",
"tool_call_id": tool_id,
"content": result.content
})
工具调用处理流程:
- 将助手的工具调用请求添加到消息历史
- 遍历每个工具调用请求
- 提取工具名称、参数和调用ID
- 通过MCP会话执行实际的工具调用
- 将工具执行结果添加到消息历史
- 继续与AI对话,获取基于工具结果的最终回复
4. 聊天循环
async def chat_loop(self):
print("\nMCP Chatbot Started!")
print("Type your queries or 'quit' to exit.")
while True:
try:
query = input("\nQuery: ").strip()
if query.lower() == 'quit':
break
await self.process_query(query)
print("\n")
except Exception as e:
print(f"\nError: {str(e)}")
- 提供交互式命令行界面
- 持续接收用户输入
- 调用
process_query
处理每个查询 - 包含异常处理机制
5. 服务器连接和工具初始化
async def connect_to_server_and_run(self):
server_params = StdioServerParameters(
command="uv",
args=["run", "research_server.py"],
env=None,
)
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
self.session = session
await session.initialize()
response = await session.list_tools()
tools = response.tools
self.available_tools = [{
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": tool.inputSchema
}
} for tool in response.tools]
await self.chat_loop()
服务器连接流程:
- 配置MCP服务器参数(使用uv运行research_server.py)
- 建立stdio通信连接
- 创建客户端会话并初始化
- 获取服务器提供的工具列表
- 将工具格式转换为OpenAI兼容格式
- 启动聊天循环
6. 程序入口
async def main():
chatbot = MCP_ChatBot()
await chatbot.connect_to_server_and_run()
if __name__ == "__main__":
asyncio.run(main())
- 创建聊天机器人实例
- 启动异步主程序
核心特性
- 异步处理: 全程使用async/await,支持高并发
- 工具集成: 通过MCP协议动态获取和调用外部工具
- 对话连续性: 维护完整的对话历史,支持多轮对话
- 错误处理: 包含异常捕获和错误提示
- 灵活配置: 通过环境变量配置API密钥和服务器地址
示例
uv run mcp_chatbot.py
其他相关链接:
吴恩达MCP课程(1):chat_bot
吴恩达MCP课程(2):research_server