ABP VNext + Apache Flink 实时流计算:打造高可用“交易风控”系统

news2025/6/2 6:38:08

ABP VNext + Apache Flink 实时流计算:打造高可用“交易风控”系统 🌐


📚 目录

  • ABP VNext + Apache Flink 实时流计算:打造高可用“交易风控”系统 🌐
    • 一、背景🚀
    • 二、系统整体架构 🏗️
    • 三、实战展示 🛠️:交易行为告警系统
      • 3.1 ABP 采集交易事件 📝
        • CAP + Outbox 配置示例 💼
      • 3.2 Flink CEP 模式与 Exactly-Once ⚡
      • 3.3 Redis Stream + SignalR 实时推送 🔔
    • 四、生产级部署和监控 📈
    • 五、自动化测试 🧪


一、背景🚀

在金融 💰、电商 🛒、IoT 🌐 等高频交互系统中,越来越多的场景需要“实时发现问题并响应”。


二、系统整体架构 🏗️

Publish Event
消费 Transaction
写入警报
推送警报
读取警报
实时推送
ABP VNext API
Kafka: transactions
Flink CEP Job
PostgreSQL Sink
Redis Stream
RiskAlertWorker
SignalR Hub

💡 图示展示了各组件之间的数据流向,实现消息解耦和高可用。


三、实战展示 🛠️:交易行为告警系统

3.1 ABP 采集交易事件 📝

using System;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Volo.Abp.EventBus;
using Volo.Abp.EventBus.Distributed;

public class TransactionCreatedDomainEvent : DomainEvent
{
    public Guid UserId { get; set; }
    public decimal Amount { get; set; }
    public string Location { get; set; }
}

public class TransactionCreatedHandler : IDistributedEventHandler<TransactionCreatedDomainEvent>
{
    private readonly IDistributedEventBus _eventBus;
    private readonly ILogger<TransactionCreatedHandler> _logger;

    public TransactionCreatedHandler(IDistributedEventBus eventBus,
                                     ILogger<TransactionCreatedHandler> logger)
    {
        _eventBus = eventBus;
        _logger = logger;
    }

    public async Task HandleEventAsync(TransactionCreatedDomainEvent eventData)
    {
        var eto = new TransactionCreatedEto
        {
            UserId = eventData.UserId,
            Amount = eventData.Amount,
            Location = eventData.Location,
            OccurredAt = Clock.Now
        };
        try
        {
            await _eventBus.PublishAsync(eto);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "发布交易事件失败:{UserId}", eventData.UserId);
            throw;
        }
    }
}
CAP + Outbox 配置示例 💼
// appsettings.json
"Cap": {
  "UseEntityFramework": true,
  "UseDashboard": true,
  "Producer": {
    "Kafka": { "Servers": "localhost:9092" }
  },
  "Outbox": { "TableName": "CapOutboxMessages" }
}

3.2 Flink CEP 模式与 Exactly-Once ⚡

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.common.eventtime._
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.cep.scala.CEP
import java.time.Duration

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(10000)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.setStateBackend(new RocksDBStateBackend("file:///flink-checkpoints"))
env.getCheckpointConfig.setExternalizedCheckpointCleanup(
  ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

val watermarkStrategy = WatermarkStrategy
  .forBoundedOutOfOrderness[Transaction](Duration.ofSeconds(5))
  .withTimestampAssigner((event, _) => event.timestamp.toEpochMilli)

val stream = env
  .addSource(new FlinkKafkaConsumer[Transaction]("transactions", deserializer, props))
  .assignTimestampsAndWatermarks(watermarkStrategy)

val pattern = Pattern.begin[Transaction]("first")
  .where(_.amount > 10000)
  .next("second")
  .where(new IterativeCondition[Transaction] {
    override def filter(event: Transaction, ctx: IterativeCondition.Context[Transaction]) = {
      val first = ctx.getEventsForPattern("first").iterator().next()
      event.location != first.location
    }
  })
  .within(Time.minutes(5))

pattern.handleTimeout(new PatternTimeoutFunction[Transaction, Unit] {
  override def timeout(map: java.util.Map[String, java.util.List[Transaction]], timestamp: Long, out: Collector[Unit]): Unit = {
    // 超时清理逻辑
  }
}, Time.minutes(5))
ABP API Kafka Flink Redis Worker SignalR 发布交易事件 消费并处理流 推送警报 StreamReadGroup SignalR 推送 ABP API Kafka Flink Redis Worker SignalR

💡 建议全链路使用 Schema Registry 管理消息格式,防止兼容性问题。


3.3 Redis Stream + SignalR 实时推送 🔔

using System;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using StackExchange.Redis;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.SignalR;

public class RiskAlertWorker : BackgroundService
{
    private readonly IConnectionMultiplexer _redis;
    private readonly IHubContext<RiskAlertHub> _hubContext;
    private readonly ILogger<RiskAlertWorker> _logger;

    public RiskAlertWorker(IConnectionMultiplexer redis,
                           IHubContext<RiskAlertHub> hubContext,
                           ILogger<RiskAlertWorker> logger)
    {
        _redis = redis;
        _hubContext = hubContext;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var db = _redis.GetDatabase();
        try { await db.StreamCreateConsumerGroupAsync("risk-alerts", "alert-group", "$", true); }
        catch { /* 忽略 BUSYGROUP */ }

        int backoff = 1000;
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                var entries = await db.StreamReadGroupAsync(
                    "risk-alerts", "alert-group", "consumer-1",
                    count: 10, flags: CommandFlags.Block(5000));

                foreach (var entry in entries)
                {
                    var alert = JsonSerializer.Deserialize<RiskEventDto>(entry["data"]!);
                    await _hubContext.Clients.Group(alert.UserId.ToString())
                               .SendAsync("ReceiveAlert", alert, stoppingToken);
                    await db.StreamAcknowledgeAsync("risk-alerts", "alert-group", entry.Id);
                }
                backoff = 1000;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "处理 Redis 告警失败");
                await Task.Delay(backoff, stoppingToken);
                backoff = Math.Min(backoff * 2, 16000);
            }
        }
    }
}

[Authorize]
public class RiskAlertHub : Hub { }

四、生产级部署和监控 📈

组件推荐配置
ABP 后端Pod 存活/就绪探针 ✅ + HTTPS 🔒 + Serilog→Elasticsearch Sink 📝 + CAP Outbox
Kafkaenable.idempotence=true 🔁, acks=all ✅, TLS/SASL 🔐
FlinkRocksDBStateBackend ⚙️ + EXACTLY_ONCE ⚡ + State TTL 🕒 + HA 🌟
RedisRedis Cluster 🔄 + AOF 📝 + ACL 🔑 + 阻塞消费 ⏳
PostgreSQL主从流复制 🛠️ + WAL 日志 📜 + TimescaleDB 插件 📊
SignalRAzure SignalR ☁️ / Redis Backplane 🔄 + JWT 鉴权 🔏
# Flink YAML 示例
state.backend: rocksdb
checkpointing:
  interval: 10s
  mode: EXACTLY_ONCE
  externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
# Flink Prometheus Reporter
metrics.reporters: prom
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9250

📊 在 Grafana 中可视化:Kafka TPS、Flink 延迟分位、Redis 消费速率、ABP 请求成功率/错误率。


五、自动化测试 🧪

// Testcontainers 启动依赖
var kafka = new KafkaContainer().StartAsync().GetAwaiter().GetResult();
var redis = new RedisContainer().StartAsync().GetAwaiter().GetResult();
var postgres = new PostgreSqlContainer().StartAsync().GetAwaiter().GetResult();

// 注入到 ABP 测试模块
context.Services.Configure<CapOptions>(opts => {
    opts.ProducerConnectionString = kafka.GetBootstrapAddress();
    opts.OutboxTableName = "CapOutboxMessages";
});

// Flink MiniCluster
var flinkCluster = new MiniClusterWithClientResource(
    new MiniClusterResourceConfiguration.Builder().Build());
flinkCluster.Start();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2393291.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

人工智能与机器学习从理论、技术与实践的多维对比

人工智能(Artificial Intelligence, AI)提出“让机器像人类一样思考”的目标,其核心理论围绕符号系统假设展开——认为智能行为可通过逻辑符号系统(如谓词逻辑、产生式规则)建模。 机器学习(Machine Learning, ML)是人工智能的子集,聚焦于通过数据自动改进算法性能的理…

什么是 WPF 技术?什么是 WPF 样式?下载、安装、配置、基本语法简介教程

什么是 WPF 技术&#xff1f;什么是 WPF 样式&#xff1f;下载、安装、配置、基本语法简介教程 摘要 WPF教程、WPF开发、.NET 8 WPF、Visual Studio 2022 WPF、WPF下载、WPF安装、WPF配置、WPF样式、WPF样式详解、XAML语法、XAML基础、MVVM架构、数据绑定、依赖属性、资源字典…

流程自动化引擎:让业务自己奔跑

在当今竞争激烈的商业环境中&#xff0c;企业面临着快速变化的市场需求、日益复杂的业务流程以及不断增长的运营成本。如何优化业务流程、提升效率并降低成本&#xff0c;成为企业持续发展的关键问题。 流程自动化引擎&#xff08;Process Automation Engine&#xff09;作为一…

AI炼丹日志-23 - MCP 自动操作 自动进行联网检索 扩展MCP能力

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; Java篇&#xff1a; MyBatis 更新完毕目前开始更新 Spring&#xff0c;一起深入浅出&#xff01; 大数据篇 300&#xff1a; Hadoop&…

用 Python 模拟雪花飘落效果

用 Python 模拟雪花飘落效果 雪花轻轻飘落&#xff0c;给冬日带来一份浪漫与宁静。本文将带你用一份简单的 Python 脚本&#xff0c;手把手实现「雪花飘落效果」动画。文章深入浅出&#xff0c;零基础也能快速上手&#xff0c;完整代码仅需一个脚本文件即可运行。 目录 前言…

基于定制开发开源AI智能名片S2B2C商城小程序的大零售渗透策略研究

摘要&#xff1a;本文聚焦“一切皆零售”理念下的大零售渗透趋势&#xff0c;提出以定制开发开源AI智能名片S2B2C商城小程序为核心工具的渗透策略。通过分析该小程序在需求感应、场景融合、数据驱动等方面的技术优势&#xff0c;结合零售渗透率提升的关键路径&#xff0c;揭示其…

XPlifeapp:高效打印,便捷生活

在数字化时代&#xff0c;虽然电子设备的使用越来越普遍&#xff0c;但打印的需求依然存在。无论是学生需要打印课表、资料&#xff0c;还是职场人士需要打印名片、报告&#xff0c;一个高效便捷的打印软件都能大大提高工作效率。XPlifeapp就是这样一款超级好用的手机打印软件&…

等保测评-Mysql数据库测评篇

Mysql数据库测评 0x01 前言 "没有网络安全、就没有国家安全" 等保测评是什么&#xff1f; 等保测评&#xff08;网络安全等级保护测评&#xff09;是根据中国《网络安全法》及相关标准&#xff0c;对信息系统安全防护能力进行检测评估的法定流程。其核心依据《信…

02.K8S核心概念

服务的分类 有状态服务&#xff1a;会对本地环境产生依赖&#xff0c;例如需要把数据存储到本地磁盘&#xff0c;如mysql、redis&#xff1b; 无状态服务&#xff1a;不会对本地环境产生任何依赖&#xff0c;例如不会存储数据到本地磁盘&#xff0c;如nginx、apache&#xff…

一篇文章玩转CAP原理

CAP 原理是分布式系统设计的核心理论之一&#xff0c;揭示了系统设计中的 根本性权衡。 一、CAP 的定义 CAP 由三个核心属性组成&#xff0c;任何分布式系统最多只能同时满足其中两个&#xff1a; 一致性&#xff08;Consistency&#xff09; 所有节点在同一时刻看到的数据完全…

Vue-收集表单信息

收集表单信息 Input label for 和 input id 关联, 点击账号标签 也能聚焦 input 代码 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><title>表单数据</title><!-- 引入Vue --><scrip…

vscode开发stm32,main.c文件中出现很多报错影响开发解决日志

本质上为 .vscode/c_cpp_properties.json文件和Makefile文件中冲突&#xff0c;两者没有同步。 将makefile文件中的内容同步过来即可&#xff0c;下面给出一个json文件的模板&#xff0c;每个人的情况不同&#xff0c;针对性修改即可 {"configurations": [{"na…

嵌入式鸿蒙系统中水平和垂直以及图片调用方法

利用openharmony操作的具体现象: 第一:Column 作用:沿垂直方向布局的容器。 第二:常用接口 Column(value?: {space?: string | number}) 参数: 参数名参数类型必填参数描述spacestring | number否纵向布局元素垂直方向间距。 从API version 9开始,space为负数或者ju…

【海康USB相机被HALCON助手连接过后,MVS显示无法连接故障。】

在Halcon里使用助手调用海康USB相机时&#xff0c;如果这个界面点击了【是】 那么恭喜你&#xff0c;相机只能被HALCON调用使用&#xff0c;使用MVS或者海康开发库&#xff0c;将查找不到相机 解决方式&#xff1a; 右键桌面【此电脑】图标 ->选择【管理】 ->选择【设备…

2025年电气工程与轨道交通国际会议:绿色能源与智能交通的创新之路

2025年电气工程与轨道交通国际会议&#xff08;ICEERT 2025&#xff09;是一场电气工程与轨道交通领域的国际盛会&#xff0c;将于2025年在武汉隆重召开。此次会议汇聚了全球顶尖的专家学者和行业精英&#xff0c;共同探讨电气工程与轨道交通的最新研究成果和技术趋势。会议将围…

WPF log4net用法

WPF log4net用法 一、在工程中管理NuGet程序包&#xff0c;找到log4net&#xff0c;点击安装&#xff0c;如下图已成功安装&#xff1b; 二、在工程中右键添加新建项&#xff0c;选择应用程序配置文件&#xff08;后缀为.config&#xff09;,然后设置名称&#xff0c;这里设置…

数字孪生数据监控如何提升汽车零部件工厂产品质量

一、汽车零部件工厂的质量挑战 汽车零部件作为汽车制造的基础&#xff0c;其质量直接关系到整车的性能、可靠性和安全性。在传统的汽车零部件生产过程中&#xff0c;质量问题往往难以在早期阶段被发现和解决&#xff0c;导致生产效率低下、生产成本上升&#xff0c;甚至影响到…

贪心算法实战3

文章目录 前言区间问题跳跃游戏跳跃游戏II用最少数量的箭引爆气球无重叠区间划分字母区间合并区间 最大子序和加油站监控二叉树 前言 今天继续带大家进行贪心算法的实战篇3&#xff0c;本章注意来解答一些运用贪心算法的比较难的问题&#xff0c;大家好好体会&#xff0c;怎么…

实测,大模型谁更懂数据可视化?

大家好&#xff0c;我是 Ai 学习的老章 看论文时&#xff0c;经常看到漂亮的图表&#xff0c;很多不知道是用什么工具绘制的&#xff0c;或者很想复刻类似图表。 实测&#xff0c;大模型 LaTeX 公式识别&#xff0c;出乎预料 前文&#xff0c;我用 Kimi、Qwen-3-235B-A22B、…

Linux入门(十一)进程管理

Linux 中每个执行的程序都称为一个进程&#xff0c;每个进程都分配一个ID号&#xff08;PID&#xff09; 每个进程都可能以两种方式存在&#xff0c;前台&#xff08;屏幕上可以操作的&#xff09;和后台&#xff08;屏幕上无法看到的&#xff09;&#xff0c;一般系统的服务都…