新建project:
- pom文件
 
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>test.wyh</groupId>
    <artifactId>Flink117_Test_01</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
</project> 
- Main类(先用批的方式测试)
 
package test01;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class EnvDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        //指定端口,默认8081
        conf.set(RestOptions.BIND_PORT, "8082");
        //会自动识别是远程集群还是本地IDEA环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        //设置流/批,默认是流
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        env.readTextFile("test_input/test_word.txt")
                .flatMap(
                        (String value, Collector< Tuple2<String, Integer>> out) -> {
                            String[] words = value.split(" ");
                            for (String word : words){
                                out.collect(Tuple2.of(word, 1));
                            }
                        })
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(value -> value.f0)
                .sum(1)
                .print();
        env.execute();
    }
}
 
- 创建测试文件
 

- 运行程序
 

-------------------------------------------------
- Main类(用批的方式测试)
 
package test01;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class EnvDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        //指定端口,默认8081
        conf.set(RestOptions.BIND_PORT, "8082");
        //会自动识别是远程集群还是本地IDEA环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        //设置流/批,默认是流
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.readTextFile("test_input/test_word.txt")
                .flatMap(
                        (String value, Collector< Tuple2<String, Integer>> out) -> {
                            String[] words = value.split(" ");
                            for (String word : words){
                                out.collect(Tuple2.of(word, 1));
                            }
                        })
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(value -> value.f0)
                .sum(1)
                .print();
        env.execute();
    }
}
 
- 运行程序
 




















