# Reading the Flink Source: Two-Stream Operations
## Window Join

Let's first review how a window join is used:

```java
DataStream<Tuple2<String, Double>> result =
        source1.join(source2)
                .where(record -> record.f0)
                .equalTo(record -> record.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(2L)))
                .apply(new JoinFunction<Tuple2<String, Double>, Tuple2<String, Double>, Tuple2<String, Double>>() {
                    @Override
                    public Tuple2<String, Double> join(
                            Tuple2<String, Double> record1, Tuple2<String, Double> record2)
                            throws Exception {
                        return Tuple2.of(record1.f0, record1.f1);
                    }
                });
```

*(Figure: class flow along the window-join call chain.)*

In `WithWindow`'s `apply` method, a `coGroupedWindowedStream` is built, and its `apply` method is then called:

```java
public <T> SingleOutputStreamOperator<T> apply(
        JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
    // clean the closure
    function = input1.getExecutionEnvironment().clean(function);

    coGroupedWindowedStream =
            input1.coGroup(input2)
                    .where(keySelector1)
                    .equalTo(keySelector2)
                    .window(windowAssigner)
                    .trigger(trigger)
                    .evictor(evictor)
                    .allowedLateness(allowedLateness);

    return coGroupedWindowedStream.apply(new JoinCoGroupFunction<>(function), resultType);
}
```

This shows that a window join is translated into a coGroup under the hood. In `JoinCoGroupFunction`, the `coGroup` method simply iterates over the two groups in two nested loops and applies our user-defined `JoinFunction` to each pair:

```java
private static class JoinCoGroupFunction<T1, T2, T>
        extends WrappingFunction<JoinFunction<T1, T2, T>>
        implements CoGroupFunction<T1, T2, T> {
    private static final long serialVersionUID = 1L;

    public JoinCoGroupFunction(JoinFunction<T1, T2, T> wrappedFunction) {
        super(wrappedFunction);
    }

    @Override
    public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out)
            throws Exception {
        for (T1 val1 : first) {
            for (T2 val2 : second) {
                out.collect(wrappedFunction.join(val1, val2));
            }
        }
    }
}
```

## CoGroup

CoGroup's overall usage and flow closely mirror Join's, so we won't repeat each step; let's go straight to its `apply` method:

```java
public <T> SingleOutputStreamOperator<T> apply(
        CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
    // clean the closure
    function = input1.getExecutionEnvironment().clean(function);

    UnionTypeInfo<T1, T2> unionType = new UnionTypeInfo<>(input1.getType(), input2.getType());
    UnionKeySelector<T1, T2, KEY> unionKeySelector =
            new UnionKeySelector<>(keySelector1, keySelector2);

    SingleOutputStreamOperator<TaggedUnion<T1, T2>> taggedInput1 =
            input1.map(new Input1Tagger<T1, T2>());
    taggedInput1.getTransformation().setParallelism(input1.getParallelism(), false);
    taggedInput1.returns(unionType);

    SingleOutputStreamOperator<TaggedUnion<T1, T2>> taggedInput2 =
            input2.map(new Input2Tagger<T1, T2>());
    taggedInput2.getTransformation().setParallelism(input2.getParallelism(), false);
    taggedInput2.returns(unionType);

    DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);

    // we explicitly create the keyed stream to manually pass the key type information in
    windowedStream =
            new KeyedStream<TaggedUnion<T1, T2>, KEY>(unionStream, unionKeySelector, keyType)
                    .window(windowAssigner);

    if (trigger != null) {
        windowedStream.trigger(trigger);
    }
    if (evictor != null) {
        windowedStream.evictor(evictor);
    }
    if (allowedLateness != null) {
        windowedStream.allowedLateness(allowedLateness);
    }

    return windowedStream.apply(
            new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
}
```

In `apply`, the two streams are first tagged and unioned into one, then a `windowedStream` is created and its window-related properties are set; finally `windowedStream.apply` is called. In that call, the user function is wrapped in a `CoGroupWindowFunction`:

```java
private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window>
        extends WrappingFunction<CoGroupFunction<T1, T2, T>>
        implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> {
    private static final long serialVersionUID = 1L;

    public CoGroupWindowFunction(CoGroupFunction<T1, T2, T> userFunction) {
        super(userFunction);
    }

    @Override
    public void apply(KEY key, W window, Iterable<TaggedUnion<T1, T2>> values, Collector<T> out)
            throws Exception {
        List<T1> oneValues = new ArrayList<>();
        List<T2> twoValues = new ArrayList<>();

        for (TaggedUnion<T1, T2> val : values) {
            if (val.isOne()) {
                oneValues.add(val.getOne());
            } else {
                twoValues.add(val.getTwo());
            }
        }
        wrappedFunction.coGroup(oneValues, twoValues, out);
    }
}
```

In `CoGroupWindowFunction#apply`, the elements that share a key are split back into the two original streams, which are then handed to `JoinCoGroupFunction#coGroup`. The `values` here all carry the same key because the window operator's `windowState` is backed by a `stateTable`, in which the window namespace and the key together identify one piece of state; when a window fires, `apply` is invoked once per key over that key's data.

## Interval Join

Having gone through Window Join and CoGroup, let's move on to Interval Join. Again, a quick review of the usage first:

```java
DataStream<Tuple2<String, Double>> intervalJoinResult =
        source1.keyBy(record -> record.f0)
                .intervalJoin(source2.keyBy(record -> record.f0))
                .between(Time.seconds(-2), Time.seconds(2))
                .process(new ProcessJoinFunction<Tuple2<String, Double>, Tuple2<String, Double>, Tuple2<String, Double>>() {
                    @Override
                    public void processElement(
                            Tuple2<String, Double> record1,
                            Tuple2<String, Double> record2,
                            ProcessJoinFunction<Tuple2<String, Double>, Tuple2<String, Double>, Tuple2<String, Double>>.Context context,
                            Collector<Tuple2<String, Double>> out)
                            throws Exception {
                        out.collect(Tuple2.of(record1.f0, record1.f1 + record2.f1));
                    }
                });
```

As the usage shows, an interval join operates on two `KeyedStream`s; the `between` method defines the join's lower and upper bounds, and `process` supplies the computation logic. *(Figure: type conversions along the interval-join call chain.)* We mainly care about the logic of `process`:

```java
public <OUT> SingleOutputStreamOperator<OUT> process(
        ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction,
        TypeInformation<OUT> outputType) {
    Preconditions.checkNotNull(processJoinFunction);
    Preconditions.checkNotNull(outputType);

    final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf =
            left.getExecutionEnvironment().clean(processJoinFunction);

    if (isEnableAsyncState) {
        final AsyncIntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
                new AsyncIntervalJoinOperator<>(
                        lowerBound,
                        upperBound,
                        lowerBoundInclusive,
                        upperBoundInclusive,
                        leftLateDataOutputTag,
                        rightLateDataOutputTag,
                        left.getType()
                                .createSerializer(
                                        left.getExecutionConfig().getSerializerConfig()),
                        right.getType()
                                .createSerializer(
                                        right.getExecutionConfig().getSerializerConfig()),
                        cleanedUdf);
        return left.connect(right)
                .keyBy(keySelector1, keySelector2)
                .transform("Interval Join [Async]", outputType, operator);
    } else {
        final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
                new IntervalJoinOperator<>(
                        lowerBound,
                        upperBound,
                        lowerBoundInclusive,
                        upperBoundInclusive,
                        leftLateDataOutputTag,
                        rightLateDataOutputTag,
                        left.getType()
                                .createSerializer(
                                        left.getExecutionConfig().getSerializerConfig()),
                        right.getType()
                                .createSerializer(
                                        right.getExecutionConfig().getSerializerConfig()),
                        cleanedUdf);
        return left.connect(right)
                .keyBy(keySelector1, keySelector2)
                .transform("Interval Join", outputType, operator);
    }
}
```

An interval join is built on `ConnectedStreams`, which provide a more general two-stream primitive: the two streams are combined into one `TwoInputTransformation` and added to the execution graph. The concrete operator is `IntervalJoinOperator` (or `AsyncIntervalJoinOperator`); both implement `TwoInputStreamOperator`, providing `processElement1` and `processElement2` to handle the two inputs, and both ultimately delegate to `processElement`:

```java
private <THIS, OTHER> void processElement(
        final StreamRecord<THIS> record,
        final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,
        final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,
        final long relativeLowerBound,
        final long relativeUpperBound,
        final boolean isLeft)
        throws Exception {

    final THIS ourValue = record.getValue();
    final long ourTimestamp = record.getTimestamp();

    if (ourTimestamp == Long.MIN_VALUE) {
        throw new FlinkException(
                "Long.MIN_VALUE timestamp: Elements used in "
                        + "interval stream joins need to have timestamps meaningful timestamps.");
    }

    if (isLate(ourTimestamp)) {
        sideOutput(ourValue, ourTimestamp, isLeft);
        return;
    }

    addToBuffer(ourBuffer, ourValue, ourTimestamp);

    for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket : otherBuffer.entries()) {
        final long timestamp = bucket.getKey();

        if (timestamp < ourTimestamp + relativeLowerBound
                || timestamp > ourTimestamp + relativeUpperBound) {
            continue;
        }

        for (BufferEntry<OTHER> entry : bucket.getValue()) {
            if (isLeft) {
                collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
            } else {
                collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
            }
        }
    }

    long cleanupTime =
            (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
    if (isLeft) {
        internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
    } else {
        internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
    }
}
```

`IntervalJoinOperator` maintains two `MapState`s. Each incoming record is added to its own side's `MapState`, whose key is the timestamp and whose value is a list of elements. The operator then iterates over the other side's `MapState` and collects the records that fall within the bounds. Finally, an event-time timer is registered for each record so that, once the record's valid range has passed, the data for that timestamp is removed from the `MapState`.

## Summary

In this post we walked through the source of Flink's three two-stream operations. We saw that Window Join is implemented on top of CoGroup; CoGroup itself unions the two streams into a `WindowedStream` and relies on the window state to bring matching data together; and Interval Join is built on `ConnectedStreams`, where the `IntervalJoinOperator` maintains two `MapState`s and performs the matching through them.
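To make the nested-loop semantics of `JoinCoGroupFunction#coGroup` concrete, here is a minimal, Flink-free Java sketch of the same idea. The `coGroupJoin` helper and its `BiFunction` parameter are stand-ins of my own for Flink's `CoGroupFunction`/`JoinFunction` pair, not Flink API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

public class NestedLoopJoinSketch {

    // Mirrors the shape of JoinCoGroupFunction.coGroup: two nested loops over
    // the per-key, per-window groups, one join call per pair (a cross product).
    static <T1, T2, T> List<T> coGroupJoin(
            Iterable<T1> first, Iterable<T2> second, BiFunction<T1, T2, T> join) {
        List<T> out = new ArrayList<>();
        for (T1 val1 : first) {
            for (T2 val2 : second) {
                out.add(join.apply(val1, val2));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Two groups that landed in the same window for the same key.
        List<String> left = List.of("a", "b");
        List<Integer> right = List.of(1, 2);
        System.out.println(coGroupJoin(left, right, (l, r) -> l + r)); // [a1, a2, b1, b2]
    }
}
```

Because every pair is emitted, a window holding n left and m right elements for one key produces n × m results, which is one reason large windows make window joins expensive.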
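The tag-and-split trick in `CoGroupWindowFunction#apply` can likewise be sketched without Flink. The `Tagged` class below is a hypothetical stand-in for Flink's internal `TaggedUnion`, and `split` mirrors how one key's window contents are partitioned back into the two original streams before `coGroup` runs:

```java
import java.util.ArrayList;
import java.util.List;

public class TaggedUnionSketch {

    // Hypothetical stand-in for Flink's internal TaggedUnion: each element of
    // the unioned stream holds a value from exactly one of the two inputs.
    static final class Tagged<T1, T2> {
        final T1 one;
        final T2 two;

        private Tagged(T1 one, T2 two) {
            this.one = one;
            this.two = two;
        }

        static <T1, T2> Tagged<T1, T2> one(T1 v) { return new Tagged<>(v, null); }
        static <T1, T2> Tagged<T1, T2> two(T2 v) { return new Tagged<>(null, v); }
        boolean isOne() { return one != null; }
    }

    // Mirrors CoGroupWindowFunction.apply: partition the unioned window
    // contents back into the two original streams' element lists.
    static <T1, T2> List<List<Object>> split(Iterable<Tagged<T1, T2>> values) {
        List<Object> oneValues = new ArrayList<>();
        List<Object> twoValues = new ArrayList<>();
        for (Tagged<T1, T2> val : values) {
            if (val.isOne()) {
                oneValues.add(val.one);
            } else {
                twoValues.add(val.two);
            }
        }
        return List.of(oneValues, twoValues);
    }

    public static void main(String[] args) {
        List<Tagged<String, Integer>> window =
                List.of(Tagged.one("a"), Tagged.two(1), Tagged.one("b"));
        System.out.println(split(window)); // [[a, b], [1]]
    }
}
```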
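Finally, the buffering in `IntervalJoinOperator#processElement` can be sketched with plain `TreeMap`s standing in for the two `MapState<Long, List<...>>` buffers. This is a simplification of my own: lateness handling, serialization, and the cleanup timers are omitted, and `TreeMap.subMap` replaces Flink's full scan of the other buffer:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class IntervalJoinSketch {

    // Stand-ins for the operator's two MapState<Long, List<...>> buffers,
    // one per input side, keyed by element timestamp.
    final TreeMap<Long, List<String>> leftBuffer = new TreeMap<>();
    final TreeMap<Long, List<String>> rightBuffer = new TreeMap<>();
    final long lowerBound;
    final long upperBound;
    final List<String> joined = new ArrayList<>();

    IntervalJoinSketch(long lowerBound, long upperBound) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    // Mirrors processElement for the left side: buffer the record, then probe
    // the right buffer for timestamps within [ts + lowerBound, ts + upperBound].
    void processLeft(long ts, String value) {
        leftBuffer.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);
        for (Map.Entry<Long, List<String>> bucket :
                rightBuffer.subMap(ts + lowerBound, true, ts + upperBound, true).entrySet()) {
            for (String other : bucket.getValue()) {
                joined.add(value + "|" + other);
            }
        }
    }

    // Symmetric probe: a left timestamp lts matches when
    // ts in [lts + lowerBound, lts + upperBound], i.e.
    // lts in [ts - upperBound, ts - lowerBound].
    void processRight(long ts, String value) {
        rightBuffer.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);
        for (Map.Entry<Long, List<String>> bucket :
                leftBuffer.subMap(ts - upperBound, true, ts - lowerBound, true).entrySet()) {
            for (String other : bucket.getValue()) {
                joined.add(other + "|" + value);
            }
        }
    }

    public static void main(String[] args) {
        IntervalJoinSketch j = new IntervalJoinSketch(-2, 2);
        j.processLeft(10, "L10");
        j.processRight(11, "R11"); // 11 lies in [8, 12], so it joins with L10
        j.processRight(20, "R20"); // 20 lies outside every left interval: no join
        System.out.println(j.joined); // [L10|R11]
    }
}
```

In the real operator these buffers live in keyed `MapState`, so they survive failures and are partitioned by key, and the registered event-time timers bound how long each timestamp's bucket is retained.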