Apache Burr框架:构建可观测有状态数据应用的核心原理与实践

news2026/5/17 5:31:40
1. 项目概述一个用于构建和评估数据产品的Python框架如果你正在处理数据密集型应用比如推荐系统、个性化广告或者任何需要根据用户行为实时调整策略的场景你肯定遇到过这样的困境模型训练和离线评估做得再好一旦上线面对真实、动态、充满噪音的数据流效果往往大打折扣。传统的批处理流水线在这里显得笨重且滞后。今天要聊的apache/burr通常简称为Burr就是为解决这类问题而生的一个开源Python框架。它不是一个机器学习模型库而是一个应用状态管理框架核心目标是帮你更优雅地构建、调试和评估那些有状态的、基于事件的应用程序。简单来说Burr帮你把复杂的、多步骤的、有状态的数据处理逻辑比如一个对话机器人的多轮交互、一个推荐系统的实时排序流水线拆解成一个个清晰、可测试的“步骤”Action并自动管理步骤之间的状态流转。它的价值在于将应用逻辑从杂乱的状态管理代码中解放出来让你能像搭积木一样构建应用并且能轻松地追踪每一次状态变化为后续的分析、评估和调试提供了前所未有的便利。无论你是数据科学家、机器学习工程师还是后端开发者只要你在构建需要维护内部状态并对外部事件做出响应的应用Burr都值得你深入了解。2. 核心设计理念状态机与声明式编程Burr的设计哲学深深植根于有限状态机Finite State Machine, FSM理论和声明式编程思想。理解这一点是掌握Burr的关键。2.1 将应用视为状态机在Burr的视角里任何一个有状态的应用程序都可以被建模为一个状态机。这个状态机包含几个核心要素状态State在任意时间点你的应用所“知道”的一切信息。这可以是一个简单的字典包含用户ID、会话历史、当前查询、模型预测结果、计数器等等。动作Action导致状态发生变化的唯一原因。一个动作是一个纯函数或类方法它读取当前状态和可能的输入执行一些逻辑如调用模型API、查询数据库然后产生一个新的状态和可选的输出。转移Transition由动作触发的从一个状态到另一个状态的转变。Burr的核心职责就是根据你定义的规则决定在给定状态下执行哪个动作并管理状态转移。例如一个简单的聊天机器人状态机可能包含状态{session_id: abc, conversation_history: [], awaiting_response: False}以及动作process_user_input、call_llm_api、format_response。用户输入触发process_user_input动作该动作更新历史并设置awaiting_response为True然后条件触发call_llm_api动作依此类推。2.2 声明式与可观测性与传统命令式编程写一堆if-else和直接修改全局变量不同Burr鼓励你声明式地描述应用逻辑。你定义好动作和它们之间的运行条件“在什么状态下什么动作可以执行”Burr的引擎负责按正确的顺序执行它们。这种方式带来了几个巨大优势逻辑清晰应用流程一目了然不再是面条代码。易于测试每个动作都是独立的、功能单一的单元可以单独进行单元测试。你可以轻松地给定一个输入状态断言动作的输出状态和结果。强大的可观测性由于Burr严格管理所有状态变更它可以自动记录每一次状态转移的完整轨迹。这个轨迹包含了每一步的状态快照、执行了哪个动作、输入输出是什么、耗时多少。这对于调试复杂流程、理解生产环境中的用户行为、以及进行事后的效果评估比如分析推荐系统在哪个环节导致用户流失是黄金般的数据。注意Burr本身不强制规定状态的存储方式内存、Redis、数据库或执行环境本地、服务器、分布式任务队列。它提供接口和工具让你能将这些“轨迹”记录下来并与你现有的监控、实验追踪平台如MLflow、Weights Biases集成。3. 核心概念深度解析与实操入门让我们通过一个具体的例子来拆解Burr的核心概念。假设我们要构建一个简化版的“内容审核助手”用户输入一段文本系统先检查长度然后调用情感分析模型最后根据情感分数决定是直接发布、转人工审核还是拒绝。3.1 定义状态与动作首先我们需要定义应用的初始状态和一系列动作。from burr.core import action, State, ApplicationBuilder from burr.core.persistence import SQLLitePersister import pandas as pd # 假设我们有一个简单的情感分析函数 def analyze_sentiment(text: str) - float: # 这里可以替换为真实的模型调用如调用Transformers库或API # 返回一个介于-1负面到1正面的分数 return 0.5 # 示例值 # 1. 定义动作 (使用装饰器) action(reads[user_input], writes[input_length, is_valid]) def validate_input(state: State, user_input: str) - tuple[State, dict]: 检查用户输入是否有效。 length len(user_input) is_valid 10 length 1000 new_state state.update(input_lengthlength, is_validis_valid) # 返回更新后的状态和一个结果字典可选 return new_state, {message: fInput validated. Length: {length}, Valid: {is_valid}} action(reads[user_input, is_valid], writes[sentiment_score]) def run_sentiment_analysis(state: State) - tuple[State, dict]: 运行情感分析。 if not state.get(is_valid): # 如果输入无效可以跳过此动作或设置默认值 new_state state.update(sentiment_scoreNone) return new_state, {error: Invalid input, skipping analysis} score analyze_sentiment(state[user_input]) new_state state.update(sentiment_scorescore) return new_state, {sentiment_score: score} action(reads[sentiment_score, is_valid], writes[decision]) def make_decision(state: State) - tuple[State, dict]: 根据情感分数做出决定。 if not state.get(is_valid): decision reject else: score state.get(sentiment_score) if score is None: decision pending elif score 0.3: decision approve elif score -0.3: decision human_review else: decision reject new_state state.update(decisiondecision) return new_state, {final_decision: decision}代码解读action装饰器用于声明一个函数是一个Burr动作。reads参数指明这个动作需要读取状态的哪些字段writes参数指明它会写入创建或更新哪些字段。这是一种声明有助于Burr进行优化和可视化。每个动作函数都以state: State作为第一个参数并返回一个元组(new_state, result)。State对象类似于一个字典但不可变。你必须通过state.update()方法来创建新的状态这符合函数式编程的原则避免了副作用。result字典可以包含任何你想在这一步输出的信息比如日志、中间结果或错误信息。3.2 构建应用程序与可视化定义了动作后我们需要用它们来构建一个完整的应用程序并定义运行逻辑即状态转移图。# 2. 使用ApplicationBuilder构建应用 app ( ApplicationBuilder() .with_state( # 设置初始状态 user_input, # 初始为空运行时注入 input_length0, is_validFalse, sentiment_scoreNone, decisionpending ) .with_actions( # 注册所有动作 validatevalidate_input, analyzerun_sentiment_analysis, decidemake_decision, ) .with_transitions( # 定义动作之间的转移关系 # 从initial状态开始执行validate动作 (initial, validate), # validate完成后总是执行analyze (validate, analyze), # analyze完成后总是执行decide (analyze, decide), # decide完成后进入terminal状态应用结束 (decide, terminal), ) .with_entrypoint(initial) # 设置入口点 .build() )现在我们已经定义好了一个简单的线性工作流validate - analyze - decide。Burr的一个强大功能是可以可视化这个流程。# 3. 可视化应用流图 (需要安装graphviz) app.visualize(output_file_pathcontent_moderator_flow.png, include_conditionsFalse, viewTrue)这行代码会生成一张PNG图片清晰地展示出从initial到terminal经过三个动作的完整路径。对于更复杂的、有条件分支的流程例如如果输入无效则直接跳到决定步骤可视化工具能帮你快速理解逻辑。3.3 运行应用与追踪状态构建好应用后我们可以运行它并观察状态的变化。# 4. 运行应用 # 首先我们需要一个“驱动函数”来启动应用并注入初始输入。 def run_moderation(user_text: str): # 从构建好的app创建一个新的运行实例 app_instance app.build() # 初始化状态注入用户输入 initial_state app_instance.initialize(state_kwargs{user_input: user_text}) # 运行应用直到结束到达terminal状态 # action_result包含最后执行的动作名和结果 action_result, final_state, _ app_instance.run( initial_stateinitial_state, halt_before[], # 不在任何动作前停止 halt_after[], # 不在任何动作后停止 ) print(f最终决定: {final_state[decision]}) print(f情感分数: {final_state[sentiment_score]}) return final_state # 测试一下 final_state run_moderation(This product is absolutely amazing! I love it!)但仅仅得到最终结果还不够。Burr的杀手锏在于全程追踪。我们需要配置一个持久化存储来记录每一次状态变化。# 5. 配置持久化追踪以SQLite为例 persister SQLLitePersister(db_path:memory:) # 使用内存数据库也可用文件路径 app_with_tracking ( ApplicationBuilder() .with_state(...) # 同上 .with_actions(...) # 同上 .with_transitions(...) # 同上 .with_entrypoint(initial) .with_identifiers(app_iddemo_app) # 为这次运行设置一个ID .with_tracker(persisterpersister, projectcontent_moderation_demo) .build() ) # 现在运行会自动记录轨迹 app_instance app_with_tracking.build() initial_state app_instance.initialize(state_kwargs{user_input: This is terrible.}) action_result, final_state, _ app_instance.run(initial_stateinitial_state) # 之后可以从持久化器中查询这次运行的完整轨迹 tracker app_instance.tracker if tracker: history tracker.list_app_ids(partition_keydemo_app) print(f记录的应用运行ID: {history}) # 可以加载任意一次运行的历史进行回放或分析4. 高级模式与实战技巧掌握了基础之后我们来看看Burr如何应对更复杂的现实场景。4.1 条件转移与循环现实中的应用很少是简单的线性流程。Burr允许你基于状态动态决定下一步执行哪个动作。from burr.core import condition condition def needs_human_review(state: State) - bool: 判断是否需要人工审核。 return state.get(decision) human_review action(reads[], writes[reviewed_by]) def human_review_action(state: State) - tuple[State, dict]: # 模拟人工审核逻辑 new_state state.update(reviewed_byoperator_123) return new_state, {review_status: completed} app_complex ( ApplicationBuilder() .with_state(...) .with_actions( validatevalidate_input, analyzerun_sentiment_analysis, decidemake_decision, human_reviewhuman_review_action, # 可以定义一个“重新分析”的动作 reanalyzesome_reanalysis_action, ) .with_transitions( (initial, validate), (validate, analyze), (analyze, decide), # 关键条件转移。如果needs_human_review条件为True则从“decide”转到“human_review” (decide, human_review, condition_exprneeds_human_review), # 否则直接结束 (decide, terminal, condition_exprlambda s: not needs_human_review(s)), # 假设人工审核后可能需要重新分析 (human_review, reanalyze, condition_exprsome_condition), (reanalyze, decide), # 形成循环 ) .build() )通过condition_expr参数你可以为转移添加布尔条件。这使得构建带有分支、循环比如多轮对话的复杂工作流变得非常简单和清晰。4.2 与异步任务和外部系统集成Burr动作是同步函数但现实世界充满了异步操作如HTTP API调用、数据库查询。最佳实践是将这些IO密集型操作封装在动作内部并使用异步编程。import asyncio import aiohttp action(reads[user_input], writes[api_response]) async def call_external_api(state: State) - tuple[State, dict]: 异步调用外部情感分析API。 async with aiohttp.ClientSession() as session: async with session.post( https://api.example.com/sentiment, json{text: state[user_input]} ) as resp: result await resp.json() score result.get(score, 0.0) new_state state.update(api_responseresult, sentiment_scorescore) return new_state, result # 运行异步应用需要使用异步的ApplicationRunner from burr.core import ApplicationRunner async def run_async_app(): app_async (...).build() # 构建包含异步动作的应用 runner ApplicationRunner(app_async, initial_state_kwargs{user_input: test}) final_state await runner.arun() return final_state asyncio.run(run_async_app())实操心得对于生产环境建议将Burr应用部署为独立的服务如使用FastAPI包装或者将其动作作为任务提交到分布式队列如Celery、RQ。Burr的State对象可以被序列化Pickle因此你可以轻松地将整个应用的状态暂停、持久化到数据库然后在另一个工作进程中恢复执行。这对于处理长时间运行或需要断点续跑的流程非常有用。4.3 测试与调试策略Burr的架构让测试变得异常简单。单元测试动作直接调用动作函数传入模拟的State对象断言返回的新状态和结果。def test_validate_input(): state State({user_input: Hello}) new_state, result validate_input.run(state, user_inputHello World) assert new_state[input_length] 11 assert new_state[is_valid] is True assert validated in result[message]集成测试应用流使用ApplicationBuilder构建测试应用注入初始状态运行并断言最终状态。调试与复现利用持久化追踪器你可以获取任何一次历史运行的app_id然后使用app.replay_from(application_id, resumeTrue)来精确复现当时的运行过程这对于排查线上bug至关重要。5. 常见问题、性能考量与架构建议在实际项目中应用Burr你会遇到一些典型的选择和挑战。5.1 状态存储与序列化问题状态应该包含什么如何存储内容状态应只包含驱动应用逻辑所必需的最小数据。避免将庞大的中间数据如整个机器学习模型的嵌入向量直接塞进状态。可以存储对这些数据的引用如ID、路径。序列化默认使用Pickle。对于生产环境尤其是分布式环境需要考虑安全性Pickle不安全。考虑使用dill兼容性更好或自定义序列化如转JSON但可能丢失类型信息。大小大状态会影响存储和网络传输。可以使用压缩或外部存储如将大对象存S3状态里只存URL。存储后端Burr提供了SQLLitePersister和PostgreSQLPersister。你也可以实现自己的Persister接口连接到Redis、MongoDB或你公司的内部存储。5.2 错误处理与重试Burr框架本身不提供复杂的错误处理机制。动作中抛出的异常会直接向上传播导致应用运行中断。策略在动作内部实现健壮的错误处理。例如调用外部API时使用重试机制如tenacity库。状态回滚Burr没有内置的事务回滚。如果一个动作失败当前状态就是失败前的状态。你可以设计一个“补偿动作”或利用追踪日志手动修复状态。超时控制对于可能长时间运行的动作务必设置超时避免整个应用卡死。5.3 性能与扩展性单个应用实例Burr应用本身是单线程同步执行的除非你在动作内自己开线程/异步。对于高并发请求你需要水平扩展部署多个应用实例并通过负载均衡器分发请求。由于状态通常与单个用户/会话绑定这很直接。分布式状态如果应用状态非常大或需要在多个实例间共享则需要使用外部共享存储如Redis作为状态后端。你需要实现一个自定义的State存储层。追踪开销持久化每一次状态变更会有IO开销。在生产环境中可以考虑抽样记录只记录一部分请求的完整轨迹。使用更快的持久化后端如Redis。将追踪日志异步写入例如先写入内存队列再由后台线程批量落盘。5.4 与现有MLOps生态集成Burr不是来取代你的MLOps工具链的而是来增强它的。实验追踪将Burr的tracker与MLflow、WB集成。你可以把每次运行的轨迹包含所有输入、输出、状态作为一个MLflow Run来记录方便比较不同算法或参数下的应用行为。特征存储动作可以从Tecton、Feast等特征存储中实时读取特征。模型服务动作可以调用部署在Seldon Core、Triton或简单HTTP端点上的模型。架构建议将Burr视为编排层Orchestration Layer。它位于你的业务逻辑/模型服务之上负责协调工作流、管理状态和提供可观测性。下层是具体的服务模型推理、数据库、API上层是用户界面或触发器。这样的分层清晰职责分明。从我个人的使用经验来看Burr最大的价值在于它迫使你以一种结构化、可测试、可观测的方式来思考有状态应用。初期学习曲线存在但一旦适应开发调试效率会显著提升尤其是在处理复杂、多阶段的AI产品逻辑时。它可能不是所有场景的银弹但对于那些状态复杂、逻辑分支多、且需要对决策过程进行深度分析和审计的应用Burr无疑是一个强大的工具。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2620567.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SpringBoot-17-MyBatis动态SQL标签之常用标签

文章目录 1 代码1.1 实体User.java1.2 接口UserMapper.java1.3 映射UserMapper.xml1.3.1 标签if1.3.2 标签if和where1.3.3 标签choose和when和otherwise1.4 UserController.java2 常用动态SQL标签2.1 标签set2.1.1 UserMapper.java2.1.2 UserMapper.xml2.1.3 UserController.ja…

wordpress后台更新后 前端没变化的解决方法

使用siteground主机的wordpress网站,会出现更新了网站内容和修改了php模板文件、js文件、css文件、图片文件后,网站没有变化的情况。 不熟悉siteground主机的新手,遇到这个问题,就很抓狂,明明是哪都没操作错误&#x…

网络编程(Modbus进阶)

思维导图 Modbus RTU(先学一点理论) 概念 Modbus RTU 是工业自动化领域 最广泛应用的串行通信协议,由 Modicon 公司(现施耐德电气)于 1979 年推出。它以 高效率、强健性、易实现的特点成为工业控制系统的通信标准。 包…

UE5 学习系列(二)用户操作界面及介绍

这篇博客是 UE5 学习系列博客的第二篇,在第一篇的基础上展开这篇内容。博客参考的 B 站视频资料和第一篇的链接如下: 【Note】:如果你已经完成安装等操作,可以只执行第一篇博客中 2. 新建一个空白游戏项目 章节操作,重…

IDEA运行Tomcat出现乱码问题解决汇总

最近正值期末周,有很多同学在写期末Java web作业时,运行tomcat出现乱码问题,经过多次解决与研究,我做了如下整理: 原因: IDEA本身编码与tomcat的编码与Windows编码不同导致,Windows 系统控制台…

利用最小二乘法找圆心和半径

#include <iostream> #include <vector> #include <cmath> #include <Eigen/Dense> // 需安装Eigen库用于矩阵运算 // 定义点结构 struct Point { double x, y; Point(double x_, double y_) : x(x_), y(y_) {} }; // 最小二乘法求圆心和半径 …

使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式

一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明&#xff1a;假设每台服务器已…

XML Group端口详解

在XML数据映射过程中&#xff0c;经常需要对数据进行分组聚合操作。例如&#xff0c;当处理包含多个物料明细的XML文件时&#xff0c;可能需要将相同物料号的明细归为一组&#xff0c;或对相同物料号的数量进行求和计算。传统实现方式通常需要编写脚本代码&#xff0c;增加了开…

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造&#xff0c;完美适配AGV和无人叉车。同时&#xff0c;集成以太网与语音合成技术&#xff0c;为各类高级系统&#xff08;如MES、调度系统、库位管理、立库等&#xff09;提供高效便捷的语音交互体验。 L…

(LeetCode 每日一题) 3442. 奇偶频次间的最大差值 I (哈希、字符串)

题目&#xff1a;3442. 奇偶频次间的最大差值 I 思路 &#xff1a;哈希&#xff0c;时间复杂度0(n)。 用哈希表来记录每个字符串中字符的分布情况&#xff0c;哈希表这里用数组即可实现。 C版本&#xff1a; class Solution { public:int maxDifference(string s) {int a[26]…

【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型

摘要 拍照搜题系统采用“三层管道&#xff08;多模态 OCR → 语义检索 → 答案渲染&#xff09;、两级检索&#xff08;倒排 BM25 向量 HNSW&#xff09;并以大语言模型兜底”的整体框架&#xff1a; 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后&#xff0c;分别用…

【Axure高保真原型】引导弹窗

今天和大家中分享引导弹窗的原型模板&#xff0c;载入页面后&#xff0c;会显示引导弹窗&#xff0c;适用于引导用户使用页面&#xff0c;点击完成后&#xff0c;会显示下一个引导弹窗&#xff0c;直至最后一个引导弹窗完成后进入首页。具体效果可以点击下方视频观看或打开下方…

接口测试中缓存处理策略

在接口测试中&#xff0c;缓存处理策略是一个关键环节&#xff0c;直接影响测试结果的准确性和可靠性。合理的缓存处理策略能够确保测试环境的一致性&#xff0c;避免因缓存数据导致的测试偏差。以下是接口测试中常见的缓存处理策略及其详细说明&#xff1a; 一、缓存处理的核…

龙虎榜——20250610

上证指数放量收阴线&#xff0c;个股多数下跌&#xff0c;盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型&#xff0c;指数短线有调整的需求&#xff0c;大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的&#xff1a;御银股份、雄帝科技 驱动…

观成科技:隐蔽隧道工具Ligolo-ng加密流量分析

1.工具介绍 Ligolo-ng是一款由go编写的高效隧道工具&#xff0c;该工具基于TUN接口实现其功能&#xff0c;利用反向TCP/TLS连接建立一条隐蔽的通信信道&#xff0c;支持使用Let’s Encrypt自动生成证书。Ligolo-ng的通信隐蔽性体现在其支持多种连接方式&#xff0c;适应复杂网…

铭豹扩展坞 USB转网口 突然无法识别解决方法

当 USB 转网口扩展坞在一台笔记本上无法识别,但在其他电脑上正常工作时,问题通常出在笔记本自身或其与扩展坞的兼容性上。以下是系统化的定位思路和排查步骤,帮助你快速找到故障原因: 背景: 一个M-pard(铭豹)扩展坞的网卡突然无法识别了,扩展出来的三个USB接口正常。…

未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?

编辑&#xff1a;陈萍萍的公主一点人工一点智能 未来机器人的大脑&#xff1a;如何用神经网络模拟器实现更智能的决策&#xff1f;RWM通过双自回归机制有效解决了复合误差、部分可观测性和随机动力学等关键挑战&#xff0c;在不依赖领域特定归纳偏见的条件下实现了卓越的预测准…

Linux应用开发之网络套接字编程(实例篇)

服务端与客户端单连接 服务端代码 #include <sys/socket.h> #include <sys/types.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <arpa/inet.h> #include <pthread.h> …

华为云AI开发平台ModelArts

华为云ModelArts&#xff1a;重塑AI开发流程的“智能引擎”与“创新加速器”&#xff01; 在人工智能浪潮席卷全球的2025年&#xff0c;企业拥抱AI的意愿空前高涨&#xff0c;但技术门槛高、流程复杂、资源投入巨大的现实&#xff0c;却让许多创新构想止步于实验室。数据科学家…

深度学习在微纳光子学中的应用

深度学习在微纳光子学中的主要应用方向 深度学习与微纳光子学的结合主要集中在以下几个方向&#xff1a; 逆向设计 通过神经网络快速预测微纳结构的光学响应&#xff0c;替代传统耗时的数值模拟方法。例如设计超表面、光子晶体等结构。 特征提取与优化 从复杂的光学数据中自…