前言
A2A(Agent2Agent)是 Google 推出的一项新协议,旨在解决多智能体(Multi-Agent)系统中跨平台、跨组织协作的难题。它为 AI 代理之间的通信、协作和任务分工提供了一个统一的标准,可以类比为网页世界的 HTTP 协议——即 AI 代理之间的“通用语言”
项目地址:https://github.com/google/A2A/tree/main/samples/python
一、项目架构介绍
1. 项目概述
基于 A2A (Agent-to-Agent) 协议的示例项目,展示了如何使用不同的 AI 框架来实现智能代理之间的通信和协作。
2. 项目结构
samples/python/
├── agents/ # 各种AI代理实现
│ ├── ag2/ # AG2框架实现的代理
│ ├── crewai/ # CrewAI框架实现的代理
│ ├── langgraph/ # LangGraph框架实现的代理
│ ├── google_adk/ # Google ADK框架实现的代理
│ ├── llama_index_file_chat/ # LlamaIndex实现的文件聊天代理
│ ├── marvin/ # Marvin框架实现的代理
│ ├── mindsdb/ # MindsDB实现的代理
│ └── semantickernel/ # Semantic Kernel实现的代理
├── hosts/ # 客户端实现
├── common/ # 公共代码
└── .vscode/ # VS Code配置
3. 技术特点
-
多框架支持:
- 支持多种流行的AI框架(LangGraph、CrewAI、AG2等)
- 每个框架都有独立的代理实现
-
标准化通信:
- 使用A2A协议进行代理间通信
- 基于HTTP的通信机制
- 统一的请求/响应格式
-
客户端实现:
- 提供CLI命令行客户端
- 支持与多个代理的交互
- 包含任务编排功能
4. 运行环境要求
- Python 3.13 或更高版本
- UV 包管理器
- 各框架所需的API密钥(如Google API Key等)
5. 使用流程
-
启动代理服务器:
cd samples/python/agents/[agent_name] uv run .
-
启动客户端:
cd samples/python/hosts/cli uv run .
6. 主要功能模块
6.1 代理实现(agents/)
- AG2代理:基于AG2框架的代理实现
- CrewAI代理:使用CrewAI框架的代理
- LangGraph代理:基于LangGraph的代理
- Google ADK代理:使用Google Agent Development Kit的代理
- LlamaIndex代理:文件聊天功能
- Marvin代理:基于Marvin框架的代理
- MindsDB代理:数据库交互代理
- Semantic Kernel代理:基于Microsoft Semantic Kernel的代理
6.2 客户端实现(hosts/)
- CLI命令行界面
- 支持多代理交互
- 任务编排功能
6.3 公共模块(common/)
- A2A协议实现
- 共享工具类
- 通用接口定义
7. 项目特点
-
模块化设计:
- 每个代理都是独立的模块
- 可以单独运行和测试
-
标准化接口:
- 统一的A2A协议
- 一致的通信格式
-
可扩展性:
- 易于添加新的代理实现
- 支持不同的AI框架
-
示例性质:
- 主要用于演示A2A功能
- 非生产级代码
二、场景介绍
1. 核心场景代理实现
1.1 LangGraph 货币转换代理
- 功能:提供货币汇率转换服务
- 技术特点:
- 使用 LangGraph 框架
- 集成 Google Gemini 模型
- 支持多轮对话
- 实时流式响应
- 使用 Frankfurter API 获取实时汇率
- 通信流程:
1.2 CrewAI 图像生成代理
- 功能:基于文本描述生成图像
- 技术特点:
- 使用 CrewAI 框架
- 集成 Google Gemini API
- 支持图像修改
- 缓存系统
- 通信流程:
2. 通信协议(A2A)
2.1 请求格式
{
"jsonrpc": "2.0",
"id": "unique_id",
"method": "tasks/send",
"params": {
"id": "task_id",
"sessionId": "session_id",
"acceptedOutputModes": ["text"],
"message": {
"role": "user",
"parts": [{
"type": "text",
"text": "query"
}]
}
}
}
2.2 响应格式
{
"jsonrpc": "2.0",
"id": "unique_id",
"result": {
"id": "task_id",
"status": {
"state": "completed",
"timestamp": "timestamp"
},
"artifacts": [{
"parts": [{
"type": "text",
"text": "response"
}],
"index": 0
}]
}
}
三、Client代码实现
- 核心类:A2AClient
a) 初始化
def __init__(self, agent_card: AgentCard = None, url: str = None, timeout: TimeoutTypes = 60.0):
# 支持两种初始化方式:
# 1. 通过 AgentCard 初始化
# 2. 直接通过 URL 初始化
# 默认超时时间 60 秒
b) 主要方法
# 1. 发送任务
async def send_task(self, payload: dict[str, Any]) -> SendTaskResponse:
# 同步任务发送
# 返回任务响应
# 2. 流式任务
async def send_task_streaming(self, payload: dict[str, Any]) -> AsyncIterable[SendTaskStreamingResponse]:
# 支持 Server-Sent Events (SSE)
# 返回流式响应
# 3. 获取任务
async def get_task(self, payload: dict[str, Any]) -> GetTaskResponse:
# 获取任务状态和结果
# 4. 取消任务
async def cancel_task(self, payload: dict[str, Any]) -> CancelTaskResponse:
# 取消正在执行的任务
# 5. 设置回调
async def set_task_callback(self, payload: dict[str, Any]) -> SetTaskPushNotificationResponse:
# 设置任务完成后的回调通知
# 6. 获取回调
async def get_task_callback(self, payload: dict[str, Any]) -> GetTaskPushNotificationResponse:
# 获取当前任务的回调配置
- 辅助类:A2ACardResolver
class A2ACardResolver:
def __init__(self, base_url, agent_card_path='/.well-known/agent.json'):
# 初始化解析器
# 默认从 /.well-known/agent.json 获取代理卡片
def get_agent_card(self) -> AgentCard:
# 获取并解析代理卡片
# 返回 AgentCard 对象
- 使用示例
# 1. 基本使用
client = A2AClient(url="http://agent-server")
response = await client.send_task({
"message": "生成一张图片",
"sessionId": "session-123"
})
# 2. 流式响应
async for event in client.send_task_streaming({
"message": "生成一张图片",
"sessionId": "session-123"
}):
print(event)
# 3. 获取代理卡片
resolver = A2ACardResolver("http://agent-server")
card = resolver.get_agent_card()
客户端实现提供了:
- 完整的 A2A 协议支持
- 异步操作支持
- 类型安全
- 错误处理
- 流式响应
- 代理发现
四、Server实现
- 核心类:A2AServer
a) 初始化
def __init__(self, host='0.0.0.0', port=5000, endpoint='/', agent_card: AgentCard = None, task_manager: TaskManager = None):
# 配置服务器参数
# 初始化路由
# 设置代理卡片和任务管理器
b) 主要功能
# 1. 启动服务器
def start(self):
# 验证必要组件
# 启动 uvicorn 服务器
# 2. 处理请求
async def _process_request(self, request: Request):
# 解析请求
# 路由到对应的处理方法
# 返回响应
# 3. 获取代理卡片
def _get_agent_card(self, request: Request) -> JSONResponse:
# 返回代理卡片信息
- 任务管理器:TaskManager
a) 抽象基类
class TaskManager(ABC):
# 定义任务管理接口
@abstractmethod
async def on_get_task(self, request: GetTaskRequest) -> GetTaskResponse
@abstractmethod
async def on_cancel_task(self, request: CancelTaskRequest) -> CancelTaskResponse
@abstractmethod
async def on_send_task(self, request: SendTaskRequest) -> SendTaskResponse
# ... 其他抽象方法
b) 内存实现
class InMemoryTaskManager(TaskManager):
def __init__(self):
# 初始化存储
self.tasks = {} # 任务存储
self.push_notification_infos = {} # 推送通知配置
self.task_sse_subscribers = {} # SSE 订阅者
- 关键功能实现
a) 任务管理
# 1. 获取任务
async def on_get_task(self, request: GetTaskRequest) -> GetTaskResponse:
# 获取任务状态和结果
# 2. 取消任务
async def on_cancel_task(self, request: CancelTaskRequest) -> CancelTaskResponse:
# 取消正在执行的任务
# 3. 更新任务
async def update_store(self, task_id: str, status: TaskStatus, artifacts: list[Artifact]) -> Task:
# 更新任务状态和结果
b) 推送通知
# 1. 设置通知
async def set_push_notification_info(self, task_id: str, notification_config: PushNotificationConfig):
# 配置任务完成通知
# 2. 获取通知
async def get_push_notification_info(self, task_id: str) -> PushNotificationConfig:
# 获取任务通知配置
c) SSE 支持
# 1. 设置 SSE 消费者
async def setup_sse_consumer(self, task_id: str, is_resubscribe: bool = False):
# 创建 SSE 事件队列
# 2. 事件入队
async def enqueue_events_for_sse(self, task_id, task_update_event):
# 将事件发送给订阅者
# 3. 事件出队
async def dequeue_events_for_sse(self, request_id, task_id, sse_event_queue):
# 从队列获取事件并发送
- 错误处理
def _handle_exception(self, e: Exception) -> JSONResponse:
# 处理不同类型的错误
if isinstance(e, json.decoder.JSONDecodeError):
json_rpc_error = JSONParseError()
elif isinstance(e, ValidationError):
json_rpc_error = InvalidRequestError()
else:
json_rpc_error = InternalError()
这个服务器实现提供了:
- 完整的 A2A 协议支持
- 异步操作支持
- 任务管理
- 推送通知
- 流式响应
- 错误处理
一个功能完整的 A2A 服务器实现,可以作为其他语言实现的参考。
五、LangGraph 货币转换Agent
/samples/python/agents/langgraph
- 整体架构
- 采用三层架构:
__main__.py
: 服务器入口点agent.py
: 核心代理实现task_manager.py
: 任务管理实现
- 核心组件详解
a) 入口点 (__main__.py
)
- 配置服务器参数(host, port)
- 设置代理能力(streaming, pushNotifications)
- 定义代理技能(convert_currency)
- 创建代理卡片(AgentCard)
- 初始化服务器和任务管理器
b) 代理实现 (agent.py
)
- 使用 LangGraph 框架实现 ReAct 模式
- 核心组件:
1. 工具函数:get_exchange_rate
- 调用 Frankfurter API 获取实时汇率
- 支持指定日期查询
- 错误处理和响应验证
2. 响应格式:ResponseFormat
- status: input_required/completed/error
- message: 响应消息
3. CurrencyAgent 类
- 使用 Google Gemini 2.0 Flash 模型
- 支持同步调用(invoke)和流式响应(stream)
- 状态管理和会话追踪
c) 任务管理器 (task_manager.py
)
- 继承自 InMemoryTaskManager
- 主要功能:
1. 任务验证
- 检查输出模式兼容性
- 验证推送通知配置
2. 任务处理
- 同步任务处理(on_send_task)
- 流式任务处理(on_send_task_subscribe)
- 任务状态更新和通知
3. 推送通知
- 支持任务状态变更通知
- 验证通知 URL 所有权
- 关键流程
a) 同步请求流程
1. 客户端发送请求
2. 任务管理器验证请求
3. 代理处理请求
4. 更新任务状态
5. 发送响应
b) 流式请求流程
1. 客户端订阅任务
2. 创建 SSE 事件队列
3. 异步处理代理响应
4. 实时更新任务状态
5. 发送流式事件
六、CrewAI 图像生成Agent
- 整体架构
- 采用三层架构:
__main__.py
: 服务器入口点agent.py
: 核心代理实现task_manager.py
: 任务管理实现
- 核心组件详解
a) 入口点 (__main__.py
)
- 配置服务器参数(host, port)
- 设置代理能力(streaming=False)
- 定义代理技能(image_generator)
- 创建代理卡片(AgentCard)
- 初始化服务器和任务管理器
b) 代理实现 (agent.py
)
- 使用 CrewAI 框架实现图像生成
- 核心组件:
1. 数据模型:Imagedata
- id: 图像唯一标识
- name: 图像名称
- mime_type: MIME类型
- bytes: Base64编码的图像数据
- error: 错误信息
2. 工具函数:generate_image_tool
- 使用 Google Gemini API 生成图像
- 支持图像修改
- 缓存管理
- 错误处理
3. ImageGenerationAgent 类
- 支持文本和图像输入
- 使用 CrewAI 框架
- 图像生成和修改功能
- 会话状态管理
c) 任务管理器 (task_manager.py
)
- 继承自 InMemoryTaskManager
- 主要功能:
1. 任务处理
- 验证输出模式
- 处理任务请求
- 管理任务状态
2. 图像处理
- 获取图像数据
- 处理图像响应
- 错误处理
- 关键流程
a) 图像生成流程
1. 接收用户提示
2. 创建 CrewAI 任务
3. 调用 Gemini API
4. 处理生成的图像
5. 返回图像数据
b) 图像修改流程
1. 接收修改请求
2. 获取参考图像
3. 生成新图像
4. 更新缓存
5. 返回结果