1.创建表的执行环境
第一种
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<Event> streamOperator = env.addSource(new ClickSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event event, long l) {
return event.timestamp;
}
}));
//创建表的执行环境
StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env);
//转化为table
Table table = streamTableEnvironment.fromDataStream(streamOperator);
第二种
EnvironmentSettings settings =EnvironmentSettings.newInstance()
.inStreamingMode()
.useBlinkPlanner()
.build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
2.注册表用于把数据载入输入
String inputDDL = " CREATE TABLE clicktable ( "+
"url STRING ,"+
"user_name STRING," +
"timestamp BIGINT "+
" ) with ("+
" 'connector' = 'filesystem' ,"+
" 'path' = 'file/click.txt', "+
" 'format' = 'csv' )";
TableResult tableResult = tableEnv.executeSql(inputDDL);
3.通过sql查询语句得到一张结果表result
Table result= tableEnv.sqlQuery("select url,count(1) as cnt from clicktable group by url");
4.表输出
String createDDL = " CREATE TABLE output ( "+
"url STRING ,"+
"cnt BIGINT ) with ("+
" 'connector' = 'filesystem' ,"+
" 'path' = 'output', "+
" 'format' = 'csv' )";
tableEnv.executeSql(createDDL);
result.executeInsert("output");

流表转换:
1.表转流
StreamTableEnvironment
todataStream
toChangelogStream
2.流转表



![[Element]调整select样式](https://img-blog.csdnimg.cn/7610054fcf444eb1bd5362402d95ef9a.png)
















