RocketMQ Streams 1.1.0: 轻量级流处理再出发
本文作者倪泽Apache RocketMQ committer、RSQLDB/RocketMQ Streams Maintainer01 背景RocketMQ Streams是一款基于RocketMQ为基础的轻量级流计算引擎具有资源消耗少、部署简单、功能全面的特点目前已经在社区开源。RocketMQ Streams在阿里云内部被使用在对资源比较敏感同时又强烈需要流计算的场景比如在自建机房的云安全场景下。自RocketMQ Streams开源以来吸引了大量用户调研和试用。但是也存在一些问题在RocketMQ Streams 1.1.0中主要针对以下问题做出了改进和优化。1、面向用户API不够友好不能使用泛型不支持自定义序列化/反序列化2、代码冗余在RocketMQ Streams中存在将流处理拓扑序列化反序列化模块RocketMQ Streams作为轻量级流处理SDK构建好流处理节点之后应该可以直接处理数据不存在将流处理拓扑图本地保存或者网络传输需求。3、流处理过程不容易理解含有大量缓存、刷新逻辑4、存在大量支持SQL的代码这部分和SDK方式运行流处理任务的逻辑无关在RocketMQ Streams 1.1.0中对上述问题做出了改进期望能带来更好的使用体验。同时重新设计了流处理拓扑构建过程、去掉冗余代码使得代码更容易被理解。从今天起将推出系列文章介绍RocketMQ Streams 1.1.0版本本次文章主要介绍RocketMQ Streams 1.1.0的API如何使用如何利用RocketMQ Streams快速构建流处理应用。02 典型使用示例本地运行下列示例的步骤1、部署RocketMQ 5.02、使用mqAdmin创建topic3、构建示例工程添加依赖启动示例。RocketMQ Streams 坐标dependency groupIdorg.apache.rocketmq/groupId artifactIdrocketmq-streams/artifactId version1.1.0/version /dependency4、向topic中写入相应数据并观察结果。更详细文档请参考https://github.com/apache/roc...WordCountpublic class WordCount { public static void main(String[] args) { StreamBuilder builder new StreamBuilder(wordCount); builder.source(sourceTopic, total - { String value new String(total, StandardCharsets.UTF_8); return new Pair(null, value); }) .flatMap((ValueMapperActionString, ListString) value - { String[] splits value.toLowerCase().split(\W); return Arrays.asList(splits); }) .keyBy(value - value) .count() .toRStream() .print(); TopologyBuilder topologyBuilder builder.build(); Properties properties new Properties(); properties.put(MixAll.NAMESRV_ADDR_PROPERTY, 127.0.0.1:9876); RocketMQStream rocketMQStream new RocketMQStream(topologyBuilder, properties); final CountDownLatch latch new CountDownLatch(1); Runtime.getRuntime().addShutdownHook(new Thread(wordcount-shutdown-hook) { Override public void run() { rocketMQStream.stop(); latch.countDown(); } }); try { rocketMQStream.start(); latch.await(); } catch (final Throwable e) { System.exit(1); } System.exit(0); } }WordCount示例要点1、JobId wordCount唯一标识流处理任务2、自定义的反序列化3、一对多转化4、lambda形式从数据中指定Key5、支持有状态计算窗口聚合public class WindowCount { public static void main(String[] args) { StreamBuilder builder new StreamBuilder(windowCountUser); AggregateActionString, User, Num aggregateAction (key, value, accumulator) - new Num(value.getName(), 100); builder.source(user, source - { User user1 JSON.parseObject(source, User.class); return new Pair(null, user1); }) .selectTimestamp(User::getTimestamp) .filter(value - value.getAge() 0) .keyBy(value - key) .window(WindowBuilder.tumblingWindow(Time.seconds(15))) .aggregate(aggregateAction) .toRStream() .print(); TopologyBuilder topologyBuilder builder.build(); Properties properties new Properties(); properties.putIfAbsent(MixAll.NAMESRV_ADDR_PROPERTY, 127.0.0.1:9876); properties.put(Constant.TIME_TYPE, TimeType.EVENT_TIME); properties.put(Constant.ALLOW_LATENESS_MILLISECOND, 2000); RocketMQStream rocketMQStream new RocketMQStream(topologyBuilder, properties); rocketMQStream.start(); } }窗口聚合示例要点1、支持指定时间字段2、支持滑动、滚动、会话多种类型window3、支持自定义UDAF类型聚合4、支持自定义时间类型和数据最大迟到时间双流JOINpublic class JoinWindow { public static void main(String[] args) { StreamBuilder builder new StreamBuilder(joinWindow); //左流 RStreamUser user builder.source(user, total - { User user1 JSON.parseObject(total, User.class); return new Pair(null, user1); }); //右流 RStreamNum num builder.source(num, source - { Num user12 JSON.parseObject(source, Num.class); return new Pair(null, user12); }); //自定义join后的运算 ValueJoinActionUser, Num, Union action new ValueJoinActionUser, Num, Union() { Override public Union apply(User value1, Num value2) { ... } }; user.join(num) .where(User::getName) .equalTo(Num::getName) .window(WindowBuilder.tumblingWindow(Time.seconds(30))) .apply(action) .print(); TopologyBuilder topologyBuilder builder.build(); Properties properties new Properties(); properties.put(MixAll.NAMESRV_ADDR_PROPERTY, 127.0.0.1:9876); RocketMQStream rocketMQStream new RocketMQStream(topologyBuilder, properties); rocketMQStream.start(); } }双流聚合示例要点1、支持window join和非window join对于非window join只需要在上述及连表达式中去掉window即可2、支持多种窗口类型的window join3、支持对join后数据自定义操作03 参与贡献RocketMQ Streams是Apache RocketMQ的子项目已经在社区开源参与RocketMQ Streams相关工作请参考以下资源1、试用RocketMQ Streams并阅读相关文档以了解更多信息maven仓库坐标dependency groupIdorg.apache.rocketmq/groupId artifactIdrocketmq-streams/artifactId version1.1.0/version /dependencyRocketMQ Streams文档https://rocketmq.apache.org/z...2、参与贡献如果你有任何功能请求或错误报告请随时提交 Pull Request 来分享你的反馈和想法社区仓库https://github.com/apache/roc...3、联系我们可以在 GitHub上创建 Issue向 RocketMQ 邮件列表发送电子邮件或在 RocketMQ Streams SIG 交流群与专家共同探讨RocketMQ Streams SIG加入方式添加“小火箭”微信回复RocketMQ Streams。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2579136.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!