I. Environment preparation
1. Prepare two servers, server115 and server116, with Hadoop installed. Configure the HDFS NameNode on server115, and the HDFS SecondaryNameNode and the YARN ResourceManager on server116. Then start the Hadoop cluster.

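2. Configure the Hadoop environment variable (add the export line below to /etc/profile so that Flink can find the Hadoop classes)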
vim /etc/profile
export HADOOP_CLASSPATH=`hadoop classpath` 
II. Configure the Flink cluster environment
Flink architecture diagram

Deployment layout:
| Server | server115 | server116 |
|---|---|---|
| Flink components | TaskManager | JobManager, TaskManager |
(1) On server115, edit the Flink configuration file flink-conf.yaml
# JobManager runs.
jobmanager.rpc.address: server116
jobmanager.bind-host: server116
taskmanager.bind-host: server115
taskmanager.host: server115
rest.port: 8081
rest.address: server116
 
(2) On server116, edit the Flink configuration file flink-conf.yaml
# JobManager runs.
jobmanager.rpc.address: server116
jobmanager.bind-host: server116
taskmanager.bind-host: server116
taskmanager.host: server116
rest.port: 8081
rest.address: server116
 
(3) On both server115 and server116, list the worker hosts in the workers file
server115
server116
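
A missing space after the colon is an easy mistake in a flat "key: value" file like the ones above. The following is a minimal sketch, in plain Java with no Flink dependency, of a check for that layout. The class name FlinkConfCheck and the method parse are hypothetical helpers written for this illustration, and the check only assumes the simple one-line "key: value" form shown in this article, not the full YAML syntax.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: checks flat "key: value" lines such as those shown above. */
class FlinkConfCheck {

    /**
     * Collects "key: value" pairs in order. Blank lines and '#' comments are skipped.
     * Lines that do not contain ": " after a non-empty key are reported in problems.
     */
    static Map<String, String> parse(List<String> lines, List<String> problems) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String raw : lines) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue;
            }
            int sep = line.indexOf(": ");
            if (sep <= 0) {
                // For example "taskmanager.host:server116" has no space after the colon.
                problems.add("not a 'key: value' pair: " + line);
                continue;
            }
            conf.put(line.substring(0, sep), line.substring(sep + 2).trim());
        }
        return conf;
    }

    public static void main(String[] args) {
        List<String> problems = new ArrayList<>();
        Map<String, String> conf = parse(List.of(
                "# JobManager runs.",
                "jobmanager.rpc.address: server116",
                "taskmanager.host:server116",
                "rest.port: 8081"), problems);
        System.out.println("parsed:   " + conf);
        System.out.println("problems: " + problems);
    }
}
```

Running the check on each node before starting the cluster is one way to catch a malformed line early; it does not verify that the keys themselves are valid Flink options.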
 
III. Start Flink
./flink/bin/yarn-session.sh -d
 

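After the session starts, the console prints the JobManager address. Note the host and port, because the test job in the next section connects to them:
JobManager Web Interface: http://server116:40617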
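
Because the port in this line has to be copied into the job code, it can help to extract it programmatically rather than by hand. Below is a minimal sketch using only the Java standard library. The class JobManagerAddress and the method fromLogLine are hypothetical names introduced for this illustration, and the sketch assumes the log line has exactly the "JobManager Web Interface: <url>" shape shown above.

```java
import java.net.URI;

/** Hypothetical holder for the host and port printed by yarn-session.sh. */
class JobManagerAddress {
    final String host;
    final int port;

    JobManagerAddress(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /** Extracts host and port from a line such as "JobManager Web Interface: http://server116:40617". */
    static JobManagerAddress fromLogLine(String logLine) {
        String marker = "JobManager Web Interface:";
        int idx = logLine.indexOf(marker);
        if (idx < 0) {
            throw new IllegalArgumentException("no JobManager address in: " + logLine);
        }
        URI uri = URI.create(logLine.substring(idx + marker.length()).trim());
        if (uri.getHost() == null || uri.getPort() < 0) {
            throw new IllegalArgumentException("address has no host or port: " + uri);
        }
        return new JobManagerAddress(uri.getHost(), uri.getPort());
    }

    public static void main(String[] args) {
        JobManagerAddress address =
                fromLogLine("JobManager Web Interface: http://server116:40617");
        System.out.println(address.host + ":" + address.port); // prints "server116:40617"
    }
}
```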

IV. Test a job
1. Start a TCP service on server116, the host the job below connects to
nc -l -p 9999
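
For readers who want to see what the TCP service does, here is a minimal sketch of the same idea in plain Java: a server accepts one client and sends it newline-terminated text lines, and a client reads them until the connection closes. All names (LineServerSketch, serveOnce, readAll, roundTrip) are hypothetical, and the demo binds a free loopback port instead of 9999 so that it can run in a single process. It is an illustration of the line protocol, not a replacement for nc.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class LineServerSketch {

    /** Waits for one client, sends every line terminated by '\n', then closes the connection. */
    static void serveOnce(ServerSocket server, List<String> lines) throws IOException {
        try (Socket client = server.accept();
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8))) {
            for (String line : lines) {
                out.print(line + "\n");
            }
            out.flush();
        }
    }

    /** Connects as a consumer would and reads lines until the server closes the connection. */
    static List<String> readAll(String host, int port) throws IOException {
        List<String> received = new ArrayList<>();
        try (Socket socket = new Socket(host, port);
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = in.readLine()) != null) {
                received.add(line);
            }
        }
        return received;
    }

    /** Runs the server and the client in one process on a free loopback port. */
    static List<String> roundTrip(List<String> lines) {
        try (ServerSocket server = new ServerSocket(0)) { // 0 = any free port; the article uses 9999
            Thread sender = new Thread(() -> {
                try {
                    serveOnce(server, lines);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            sender.start();
            List<String> received = readAll("127.0.0.1", server.getLocalPort());
            sender.join();
            return received;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(List.of("hello flink", "hello yarn")));
    }
}
```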

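2. Write the Java listener code
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketFlinkExecute {
    public static void main(String[] args) {
        // Host and port come from the "JobManager Web Interface" line printed by yarn-session.sh.
        // If the cluster cannot load MySplitter, pass the job JAR path as an extra argument.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("server116", 40617);
        DataStream<Tuple2<String, Integer>> dataStream = env
                // Read text lines from the TCP service started in step 1
                .socketTextStream("server116", 9999)
                // Split each line into (word, 1) pairs
                .flatMap(new MySplitter())
                // Group by word and sum the counts over 5-second tumbling processing-time windows
                .keyBy(value -> value.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum(1);
        dataStream.print();
        try {
            env.execute("Window WordCount Remote");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    // Emits one (word, 1) tuple for every space-separated token in the input line
    public static class MySplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word: sentence.split(" ")) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}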
 
Run the program, then type text lines into the TCP service so that they are sent to the job
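
To make the expected result easier to reason about, the sketch below reproduces in plain Java what the job computes for the lines that fall into a single 5-second window: split each line on spaces, then sum a count of 1 per word. The class WindowWordCountSketch and the method countWindow are hypothetical names for this illustration, the sample input is invented, and the print format and ordering of the real Flink job may differ.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class WindowWordCountSketch {

    /** Counts words across all lines that arrive within the same window. */
    static Map<String, Integer> countWindow(List<String> linesInWindow) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String sentence : linesInWindow) {
            // Same split rule as MySplitter: one token per space-separated word
            for (String word : sentence.split(" ")) {
                // Equivalent of keyBy(word) followed by sum over the count field
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Two sample lines assumed to arrive within one window
        System.out.println(countWindow(List.of("hello flink", "hello yarn")));
        // prints {hello=2, flink=1, yarn=1}
    }
}
```

Lines that arrive in different windows are counted separately by the real job, so the same word typed more than 5 seconds apart produces two results rather than one combined count.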
