基于用户行为的动态标签与SOP触发引擎
一、问题背景技术背景说明教育私域运营中用户从加好友到正价课成交通常经历多个阶段兴趣期→咨询期→试听期→犹豫期→成交期。每个阶段需要不同的运营策略。例如咨询3次未购买 → 标记“高意向-未转化”并推送限时优惠试听完成3天未购课 → 触发“限时优惠往期学员案例”组合消息30天未登录 → 发送“专属学习报告续费礼包”激活企微官方限制官方API仅提供基础的客户标签读写能力无法实现实时行为捕获无法获取用户在微信端的“微行为”如阅读公众号文章、点击朋友圈链接动态规则计算不支持基于多条件组合如“咨询次数3且最近咨询24h且未付费”的自动标签变更SOP任务编排缺乏内置的定时触发、条件分支等自动化工作流引擎为什么需要技术手段需要构建独立的“用户行为分析引擎”通过API对接第三方协议层获取全量行为数据利用规则引擎Drools/EasyRules或脚本实现实时计算并通过消息队列驱动SOP任务执行。二、技术方案架构设计行为数据源iPad协议/小程序/公众号 →Kafka行为流→Flink/Spark实时计算→规则引擎→标签更新→SOP触发→企微触达技术选型消息队列Kafka高吞吐、持久化实时计算Apache FlinkCEP复杂事件处理规则引擎Groovy动态脚本热加载、灵活配置数据存储Redis实时状态 MySQL历史记录方案对比方案实时性规则灵活性开发复杂度适用场景定时SQL扫描小时级低低离线分析非实时运营Python脚本轮询分钟级中中中小机构规则简单FlinkGroovy秒级高高大型机构复杂动态规则三、实现步骤步骤1行为数据采集通过企销宝iPad协议API订阅用户行为事件python# behavior_subscriber.py - 订阅用户行为 import requests import json import pika import time QXB_API http://your-qxb-server:8080/api APP_KEY your_app_key # 获取最近的行为事件实际场景可使用长轮询或WebSocket def poll_behaviors(): url f{QXB_API}/behavior/pull params { app_key: APP_KEY, last_seq: get_last_seq(), # 从Redis获取上次处理的位置 limit: 100 } resp requests.get(url, paramsparams) events resp.json().get(events, []) for event in events: # 发送到Kafka send_to_kafka(user_behaviors, json.dumps(event)) update_last_seq(events[-1][seq] if events else get_last_seq()) def send_to_kafka(topic, message): # Kafka生产者代码略 pass步骤2Flink实时计算任务java// BehaviorRuleJob.java - Flink实时规则计算 import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.cep.CEP; import org.apache.flink.cep.PatternStream; import org.apache.flink.cep.pattern.Pattern; import org.apache.flink.cep.pattern.conditions.SimpleCondition; public class BehaviorRuleJob { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); // 从Kafka消费行为数据 FlinkKafkaConsumerString consumer new FlinkKafkaConsumer( user_behaviors, new SimpleStringSchema(), getKafkaProps() ); DataStreamString stream env.addSource(consumer); // 定义规则连续3次咨询未购买 PatternString, ? pattern Pattern .Stringbegin(first) .where(new SimpleConditionString() { Override public boolean filter(String event) throws Exception { JSONObject obj new JSONObject(event); return consult.equals(obj.getString(type)); } }) .times(3) .followedBy(no_purchase) .where(new SimpleConditionString() { Override public boolean filter(String event) throws Exception { JSONObject obj new JSONObject(event); return no_purchase.equals(obj.getString(user_status)); } }); // 应用规则 PatternStreamString patternStream CEP.pattern(stream, pattern); // 匹配结果处理 patternStream.select(new PatternSelectFunctionString, String() { Override public String select(MapString, ListString pattern) throws Exception { // 触发规则发送到SOP执行队列 String userId extractUserId(pattern); sendToSopQueue(high_intent_no_purchase, userId); return matched; } }); env.execute(Behavior Rule Engine); } }步骤3SOP任务执行器python# sop_executor.py - SOP任务执行 import redis import json import time import requests from threading import Thread r redis.Redis(hostlocalhost, port6379, decode_responsesTrue) # SOP任务配置 SOP_TASKS { high_intent_no_purchase: { name: 高意向未转化-限时优惠, delay: 300, # 触发后延迟5分钟发送 content: 注意到您最近多次咨询{course}课程特别为您准备了一张限时优惠券..., tags: [高意向-待转化] }, trial_complete_3d_no_buy: { name: 试听完成3天未购-案例推送, delay: 259200, # 3天 content: 您试听的课程已经过去3天了很多学员反馈通过这门课..., tags: [试听-需跟进] } } def sop_worker(): SOP任务队列消费者 while True: # 从Redis阻塞获取任务 task_data r.blpop(sop_queue, timeout0) if not task_data: continue task json.loads(task_data[1]) rule_id task[rule_id] user_id task[user_id] params task.get(params, {}) sop_config SOP_TASKS[rule_id] # 延迟执行 time.sleep(sop_config.get(delay, 0)) # 1. 打标签 tag_user(user_id, sop_config[tags]) # 2. 发送消息 content sop_config[content].format(**params) send_msg(user_id, content) # 3. 记录执行日志 log_execution(rule_id, user_id) def send_msg(user_id, content): 通过企销宝API发送消息 requests.post(f{QXB_API}/message/send_text, json{ app_key: APP_KEY, wxid: assistant_001, to_wxid: user_id, content: content }) # 启动多个worker线程 for i in range(5): t Thread(targetsop_worker) t.daemon True t.start()四、最佳实践规则管理动态热加载将规则存储在数据库通过Groovy脚本动态编译支持运营人员在线调整规则而无需重启服务规则版本控制每条规则记录版本号执行时记录使用的规则版本便于效果回溯性能优化状态存储使用Redis存储用户实时状态如咨询次数、最后互动时间避免每次计算都扫描历史数据窗口计算对于“最近24小时”等时间窗口条件使用Flink的滑动窗口功能避免全量扫描踩坑经验规则冲突多条规则可能同时匹配同一用户需定义优先级策略如只执行优先级最高的或按顺序执行死循环风险避免规则触发后产生的行为又触发了同一条规则如发送消息→用户回复→再次触发需在规则条件中排除机器人发送的消息五、工具推荐企销宝在本方案中作为行为数据源和执行终端数据采集提供完整的用户行为API包括消息收发、朋友圈互动、群事件等为规则引擎提供丰富输入多账号并发支持数百个账号同时在线满足大规模私域运营的数据采集需求执行终端规则触发后通过其API完成打标签、发消息等操作形成闭环
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2410419.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!