1.为什么定时器的时间设置为,窗口的end值+1ms就可以呢?
因为定时器是下游,水位线是取的多个上游的最小的, 水位线是跟在数据后面的,所以当定时器的时间到达时,上游一定计算完成了,并且数据已经在水位线之前到下游了,所以可以触发计算,就是当前窗口所有的数据,比如窗口是[8:00~9:00)
2.为什么要用定时器呢?
不用也可以计算,但是是来一条计算一条,假如有10万条,效率低,用定时器计算,可以在数据到齐时,一起计算,效率高。
3.不用window,但是一定要keyBy
不用window的原因:不是取有限的数据,而是取所有end是9:00的数据
一定要keyBy的原因:因为上游计算完成的有可能有[8:05~9:05)的数据,所以需要根据end分组
4.为什么读取文件,没到5分钟就触发计算了?
因为用的是事件时间
5.定义的flag变量,计算完需要置null吗?
不需要。每个key都有自己的ValueState
6.定时器触发的时候,上游一定都计算完了吗?
一定计算完了。
因为上游是先keyBy,再window,计算的是A商品在 [8:00~9:00)时间段内的数据,B商品在 [8:00~9:00)时间段内的数据,当A的水位线到达9:00的时候,触发了计算,但是B的水位线才到8:30,这时候定时器会取上游最小的8:30,所以不会触发,当B的水位线推进到9:00的时候,现在最小的就是9:00,所以定时器会触发计算,这样,A和B都被计算了,没有丢失数据。
7.flink的定时器 如果重复注册相同的 会触发多次吗?
不会
“答案是不会,应为Flink内部使用的HeapPriorityQueueSet来存储定时器,一个注册请求到来时,其add()方法会检查是否已经存在,如果存在则不会加入。 ”
但是最好在外面手动控制,比如用一个Boolean值,只在第一个时注册。
8.下游定时器需要等所有上游时间都到达后计算,等的是哪些上游?
可以通过webui界面看上下游
对于topn,下游是process算子, 上游是aggregate算子,当A、B、C三个商品都完成之后,下游定时器计算。
9.对于水位线
源头是周期性产生的,但是之后是:水位线是跟在数据屁股后面的,所以等aggregate算子计算完后,定时器再计算。

package com.atguigu.flink.state;
import com.atguigu.flink.func.WaterSensorMapFunction;
import com.atguigu.flink.pojo.UserBehavior;
import com.atguigu.flink.pojo.WaterSensor;
import com.atguigu.flink.utils.MyUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
/**
 * Created by Smexy on 2023/6/21
 *
 *  需求: 每隔5min输出最近1h内点击量(pv)最多的前3个商品
 *
 *  数据:    543462,1715,1464116,pv,1511658000
 *          userId,商品id,商品类别id,行为类型,ts
 *
 *   输入:  543462,1715,1464116,pv,1511658000
 *          粒度: 一个用户点击一个商品的一次是一行
 *
 *
 *   推理计算过程:   聚合,keyBy  商品id
 *          第一次聚合:  统计最近1h(窗口)内,各个商品的点击总次数
 *              size: 范围,1h
 *              slide: 计算时机,5min
 *                  滑动的时间窗口。
 *
 *              输入:  543462,1715,1464116,pv,1511658000
 *
 *              输出:
 *                    [8:00,9:00):
 *                          A---120
 *                    [8:05,9:05)
 *                          A---150
 *                    [8:00,9:00):
 *                          B---130
 *                    [8:00,9:00):
 *                          C---132
 *                    [8:00,9:00):
 *                          D---131
 *
 *          第二次聚合:  将每个时间段窗口中各个商品的点击量,排序再取前3
 *                  用不用开窗? 不用
 *                  需要keyBy,按照窗口的统计的时间范围keyBy
 *
 *                  等同一个窗口的所有数据全部到达后,再一次性计算。
 *                  如何知道当前要计算的数据已经全部到达,可以触发运算?
 *                      使用定时器,将窗口的endTime作为触发时间,只要下游的时间到了endTime证明上游endTime之前的所有数据都已经到达了下游,可以进行运算。
 *
 *
 *
 *    输出:
 *              [8:00,9:00):
 *                      A--120
 *                      B--119
 *                      C--118
 *              [8:05,9:05):
 *                       E--120
 *                       B--119
 *                       C--118
 */
public class Flink12_TopN
{
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        FileSource<String> fileSource = FileSource.forRecordStreamFormat(
            new TextLineInputFormat(StandardCharsets.UTF_8.name())
            ,
            new Path("input/UserBehavior.csv")
        ).build();
         WatermarkStrategy<UserBehavior> watermarkStrategy = WatermarkStrategy
                     .<UserBehavior>forMonotonousTimestamps()
                     .withTimestampAssigner( (e, ts) -> e.getTimestamp() * 1000);
         //1.读数据,封装bean,过滤pv,生成水印
        SingleOutputStreamOperator<UserBehavior> ds = env
            .fromSource(fileSource, WatermarkStrategy.noWatermarks(), "source")
            .map(line -> {
                String[] words = line.split(",");
                return new UserBehavior(
                    Long.valueOf(words[0]),
                    Long.valueOf(words[1]),
                    Integer.valueOf(words[2]),
                    words[3],
                    Long.valueOf(words[4])
                );
            })
            .filter(bean -> "pv".equals(bean.getBehavior()))
            .assignTimestampsAndWatermarks(watermarkStrategy);
        /*
            2.开窗,统计每种商品的点击次数
         */
        SingleOutputStreamOperator<HotItem> ds1 = ds
            .keyBy(UserBehavior::getItemId)
            .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
            .aggregate(new AggregateFunction<UserBehavior, Long, HotItem>()
            {
                @Override
                public Long createAccumulator() {
                    return 0l;
                }
                @Override
                public Long add(UserBehavior value, Long accumulator) {
                    return accumulator + 1;
                }
                @Override
                public HotItem getResult(Long accumulator) {
                    return new HotItem(null, null, null, accumulator);
                }
                @Override
                public Long merge(Long a, Long b) {
                    return null;
                }
            }, new ProcessWindowFunction<HotItem, HotItem, Long, TimeWindow>()
            {
                @Override
                public void process(Long key, ProcessWindowFunction<HotItem, HotItem, Long, TimeWindow>.Context context, Iterable<HotItem> iterable, Collector<HotItem> collector) throws Exception {
                    HotItem hotItem = iterable.iterator().next();
                    TimeWindow window = context.window();
                    //赋值
                    hotItem.setStart(window.getStart());
                    hotItem.setEnd(window.getEnd());
                    hotItem.setItemId(key);
                    collector.collect(hotItem);
                }
            });
        //3.在下游按照窗口的时间范围分组,top3统计。使用定时器触发运算。
        ds1
            .keyBy(HotItem::getStart)
            .process(new KeyedProcessFunction<Long, HotItem, String>()
            {
                private ValueState<Boolean> flag;
                private ListState<HotItem> listState;
                /*
                    没来一条数据,先存起来,等定时器到点了,再触发top3
                 */
                @Override
                public void open(Configuration parameters) throws Exception {
                    listState = getRuntimeContext().getListState(new ListStateDescriptor<>("hot3", HotItem.class));
                    flag = getRuntimeContext().getState(new ValueStateDescriptor<>("flag", Boolean.class));
                }
                //进行top3计算
                @Override
                public void onTimer(long timestamp, KeyedProcessFunction<Long, HotItem, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
                    List<HotItem> top3 = StreamSupport.stream(listState.get().spliterator(), true)
                                                         .sorted((h1, h2) -> -h1.getCount().compareTo(h2.getCount()))
                                                         .limit(3)
                                                         .collect(Collectors.toList());
                    //整理数据的格式
                    String resultStr = top3.stream().map(item -> item.getItemId() + ":" + item.getCount()).collect(Collectors.joining(","));
                    String timeStr = MyUtil.parseTimeWindow(new TimeWindow(top3.get(0).getStart(), top3.get(0).getEnd()));
                    out.collect(timeStr + ": top3  : " + resultStr);
                }
                @Override
                public void processElement(HotItem hotItem, KeyedProcessFunction<Long, HotItem, String>.Context context, Collector<String> collector) throws Exception {
                    listState.add(hotItem);
                    //在当前组中第一条数据来的时候,定定时器
                    if (flag.value() == null){
                        //定定时器
                        context.timerService().registerEventTimeTimer(hotItem.getEnd());
                        flag.update(false);
                    }
                }
            })
            .print();
        try {
                            env.execute();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class HotItem{
        //定义窗口范围 时间窗口
        private Long start;
        private Long end;
        //定义统计的指标
        private Long itemId;
        private Long count;
    }
}
 
                


















