Flink output to ES, Redis, MySQL, Kafka, and file
Contents
- Configuring the pom file
- Common entity class
- KafkaSink
- ElasticsearchSink(EsSink)
- RedisSink
- MysqlSink(JdbcSink)
- FileSink
Prepare the relevant environment (Kafka, Elasticsearch, Redis, MySQL) yourself first.
Configuring the pom file
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.13.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<slf4j.version>1.7.30</slf4j.version>
</properties>
<dependencies>
<!-- Flink core dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Logging dependencies -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
</dependency>
<!-- Kafka connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.58</version>
</dependency>
<!-- Redis connector (Bahir) -->
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
<!-- Elasticsearch connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- MySQL / JDBC dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
Common entity class
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
@Data
@NoArgsConstructor
@ToString
@AllArgsConstructor
public class UserEvent {
private String userName;
private String url;
private Long timestamp;
}
KafkaSink
Output data to Kafka. Start a Kafka console consumer first, then run the program.
import com.event.UserEvent;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.Arrays;
import java.util.Properties;
public class KafkaSinkTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
Properties properties = new Properties();
//Kafka consumer configuration; it is not used by the FlinkKafkaProducer below,
//but it is what a FlinkKafkaConsumer reading this topic back would need
properties.setProperty("bootstrap.servers", "hadoop01:9092");
properties.setProperty("group.id", "consumer-group");
properties.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "latest");
DataStreamSource<String> stream = env.fromCollection(Arrays.asList(
"xiaoming,www.baidu.com,1287538716253",
"Mr Li,www.baidu.com,1287538710000",
"Mr Zhang,www.baidu.com,1287538710900"
));
SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
//map each CSV line to a UserEvent and use its toString() as the output record
String[] split = value.split(",");
return new UserEvent(split[0].trim(), split[1].trim(), Long.valueOf(split[2].trim())).toString();
}
});
//command to start a console consumer for checking the output:
// ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic events
result.addSink(new FlinkKafkaProducer<String>(
//Kafka broker address
"hadoop01:9092",
//target topic
"events",
new SimpleStringSchema()
));
env.execute();
}
}
Run result
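If you want to verify the topic from Flink itself instead of the console consumer, a minimal read-back job can reuse the consumer properties shown above. This is only a sketch (the class name is illustrative); it assumes the broker hadoop01:9092 and the topic events from the example are reachable.
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class KafkaReadBackTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        Properties properties = new Properties();
        //same broker and group as the producer example above
        properties.setProperty("bootstrap.servers", "hadoop01:9092");
        properties.setProperty("group.id", "consumer-group");
        //read the events topic and print each record to stdout
        env.addSource(new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), properties))
                .print();
        env.execute();
    }
}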
ElasticsearchSink(EsSink)
Output data to Elasticsearch.
Example code
import com.event.UserEvent;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
public class EsSinkTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<UserEvent> userEventDataStreamSource =
env.fromCollection(
Arrays.asList(
new UserEvent("zhangsan", "path?test123", System.currentTimeMillis() - 2000L),
new UserEvent("zhangsan", "path?test", System.currentTimeMillis() + 2000L),
new UserEvent("lisi", "path?checkParam", System.currentTimeMillis()),
new UserEvent("bob", "path?test", System.currentTimeMillis() + 2000L),
new UserEvent("mary", "path?checkParam", System.currentTimeMillis()),
new UserEvent("lisi", "path?checkParam123", System.currentTimeMillis() - 2000L)
));
//define the Elasticsearch host list
List<HttpHost> hosts = Arrays.asList(new HttpHost("hadoop01", 9200));
//define the ElasticsearchSinkFunction that turns each record into an IndexRequest
ElasticsearchSinkFunction<UserEvent> elasticsearchSinkFunction = new ElasticsearchSinkFunction<UserEvent>() {
@Override
public void process(UserEvent userEvent, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
IndexRequest indexRequest = Requests.indexRequest()
.index("events")
//Elasticsearch 7 no longer needs an explicit document type, so .type(...) is omitted
.source(new HashMap<String, String>() {{
//use the user name as the field name and the url as the value
put(userEvent.getUserName(), userEvent.getUrl());
}});
requestIndexer.add(indexRequest);
}
};
//write to Elasticsearch
userEventDataStreamSource.addSink(new ElasticsearchSink.Builder<>(hosts, elasticsearchSinkFunction).build());
env.execute();
}
}
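The builder buffers documents and sends them in bulk, so a short bounded job can finish before anything is flushed. A common tweak for small demos is to flush after every action; this is a sketch that replaces the addSink line above and reuses the same hosts and elasticsearchSinkFunction variables:
//flush every single action immediately (fine for demos; use a larger batch in production)
ElasticsearchSink.Builder<UserEvent> builder =
        new ElasticsearchSink.Builder<>(hosts, elasticsearchSinkFunction);
builder.setBulkFlushMaxActions(1);
userEventDataStreamSource.addSink(builder.build());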
Commands to check the result:
GET _cat/indices
GET _cat/indices/events
GET events/_search
Run result
RedisSink
Output data to Redis.
Example code
import com.event.UserEvent;
import my.test.source.CustomSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
public class RedisSinkTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<UserEvent> streamSource = env.addSource(new CustomSource());
//create the Jedis connection config
FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
.setHost("master")
.setTimeout(10000)
.setPort(6379)
.build();
//write to Redis
streamSource.addSink(new RedisSink<>(config, new MyRedisMapper()));
env.execute();
}
public static class MyRedisMapper implements RedisMapper<UserEvent>{
@Override
public RedisCommandDescription getCommandDescription() {
//write with HSET; the additionalKey ("events") is the Redis hash the entries are stored in
return new RedisCommandDescription(RedisCommand.HSET, "events");
}
@Override
public String getKeyFromData(UserEvent userEvent) {
return userEvent.getUserName();
}
@Override
public String getValueFromData(UserEvent userEvent) {
return userEvent.getUrl();
}
}
}
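Since the mapper writes every record into the Redis hash named events with HSET, the result can be inspected from redis-cli:
HGETALL events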
Custom source (used by the Redis example above)
import com.event.UserEvent;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Calendar;
import java.util.Random;
public class CustomSource implements SourceFunction<UserEvent> {
// flag that controls whether the source keeps generating data; volatile because cancel() is called from another thread
private volatile boolean running = true;
@Override
public void run(SourceContext<UserEvent> ctx) throws Exception {
Random random = new Random(); // randomly pick from the predefined data sets
String[] users = {"Mary", "Alice", "Bob", "Cary"};
String[] urls = {"./home", "./cart", "./fav", "./prod?id=1",
"./prod?id=2"};
while (running) {
ctx.collect(new UserEvent(
users[random.nextInt(users.length)],
urls[random.nextInt(urls.length)],
Calendar.getInstance().getTimeInMillis()
));
// emit one click event per second so the output is easy to observe
Thread.sleep(1000);
}
}
@Override
public void cancel() {
running = false;
}
}
Run result
Because this source is an unbounded stream, the data in Redis keeps changing.
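A plain SourceFunction like this one can only run with parallelism 1. If a parallel test source is needed, the same body can implement ParallelSourceFunction instead; a sketch (the class name ParallelCustomSource is just for illustration):
import com.event.UserEvent;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import java.util.Calendar;
import java.util.Random;
//each parallel subtask runs its own copy of run(), so several generators emit events at once
public class ParallelCustomSource implements ParallelSourceFunction<UserEvent> {
    private volatile boolean running = true;
    @Override
    public void run(SourceContext<UserEvent> ctx) throws Exception {
        Random random = new Random();
        String[] users = {"Mary", "Alice", "Bob", "Cary"};
        String[] urls = {"./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2"};
        while (running) {
            ctx.collect(new UserEvent(
                    users[random.nextInt(users.length)],
                    urls[random.nextInt(urls.length)],
                    Calendar.getInstance().getTimeInMillis()));
            Thread.sleep(1000);
        }
    }
    @Override
    public void cancel() {
        running = false;
    }
}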
MysqlSink(JdbcSink)
Output data to MySQL.
Table structure
create table events(
user_name varchar(20) not null,
url varchar(100) not null
);
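The JdbcSink used in the example below provides at-least-once delivery, so a failed and restarted job may write the same row twice. If duplicates matter, a unique key plus an upsert-style statement (e.g. INSERT ... ON DUPLICATE KEY UPDATE) is the usual fix; a sketch of the adjusted DDL, where the key choice is just an assumption:
create table events(
user_name varchar(20) not null,
url varchar(100) not null,
primary key (user_name, url)
);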
Example code
import com.event.UserEvent;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
public class MysqlSinkTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//a batch of test data
DataStreamSource<UserEvent> userEventDataStreamSource =
env.fromCollection(
Arrays.asList(
new UserEvent("zhangsan", "/path?test123", System.currentTimeMillis() - 2000L),
new UserEvent("zhangsan", "/path?test", System.currentTimeMillis() + 2000L),
new UserEvent("lisi", "/path?checkParam", System.currentTimeMillis()),
new UserEvent("bob", "/path?test", System.currentTimeMillis() + 2000L),
new UserEvent("mary", "/path?checkParam", System.currentTimeMillis()),
new UserEvent("lisi", "/path?checkParam123", System.currentTimeMillis() - 2000L)
));
userEventDataStreamSource.addSink(JdbcSink.sink(
//SQL statement to execute
"INSERT INTO events (user_name, url) VALUES(?, ?)",
new JdbcStatementBuilder<UserEvent>() {
@Override
public void accept(PreparedStatement preparedStatement, UserEvent userEvent) throws SQLException {
//fill in the SQL placeholders
preparedStatement.setString(1, userEvent.getUserName());
preparedStatement.setString(2, userEvent.getUrl());
}
},
//JDBC connection options
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://hadoop01:3306/mysql")
.withUsername("root")
.withPassword("123456")
.withDriverName("com.mysql.jdbc.Driver")
.build()
));
env.execute();
}
}
After the program finishes, the MySQL events table contains the inserted rows.
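JdbcSink batches writes internally; the batch size, flush interval and retry count can be set explicitly with JdbcExecutionOptions. This is a sketch against the flink-connector-jdbc 1.13 API that replaces the JdbcSink.sink(...) call above; the connection settings are the ones from the example and the batch values are arbitrary:
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
//same sink as above, but with explicit batching and retry behaviour
userEventDataStreamSource.addSink(JdbcSink.sink(
        "INSERT INTO events (user_name, url) VALUES(?, ?)",
        (JdbcStatementBuilder<UserEvent>) (ps, event) -> {
            ps.setString(1, event.getUserName());
            ps.setString(2, event.getUrl());
        },
        JdbcExecutionOptions.builder()
                .withBatchSize(100)        //flush after 100 rows
                .withBatchIntervalMs(200)  //...or after 200 ms, whichever comes first
                .withMaxRetries(3)         //retry transient failures
                .build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:mysql://hadoop01:3306/mysql")
                .withUsername("root")
                .withPassword("123456")
                .withDriverName("com.mysql.jdbc.Driver")
                .build()
));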
FileSink
Output data to files (bucketed/partitioned part files are supported).
import com.event.UserEvent;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
public class FileSinkTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<UserEvent> userEventDataStreamSource =
env.fromCollection(
Arrays.asList(
new UserEvent("zhangsan", "path?test123", System.currentTimeMillis() - 2000L),
new UserEvent("zhangsan", "path?test", System.currentTimeMillis() + 2000L),
new UserEvent("lisi", "path?checkParam", System.currentTimeMillis()),
new UserEvent("bob", "path?test", System.currentTimeMillis() + 2000L),
new UserEvent("mary", "path?checkParam", System.currentTimeMillis()),
new UserEvent("lisi", "path?checkParam123", System.currentTimeMillis() - 2000L)
));
StreamingFileSink<String> streamingFileSink = StreamingFileSink.
<String>forRowFormat(new Path("./output/"), new SimpleStringEncoder<>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
//roll when a part file reaches 1 GB
.withMaxPartSize(1024 * 1024 * 1024)
//roll at the latest every 15 minutes
.withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
//roll after 5 minutes without new data
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
.build()
).build();
userEventDataStreamSource.map(data -> data.getUserName()).addSink(streamingFileSink);
env.execute();
}
}
After the run finishes, new part files appear under the ./output/ directory.
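Note that StreamingFileSink only finalizes part files when a checkpoint completes, so with checkpointing disabled the output tends to stay in in-progress/pending files. Enabling checkpointing in the example above is a one-line change (the 10-second interval is an arbitrary choice):
//checkpoint every 10 seconds so StreamingFileSink can commit its part files
env.enableCheckpointing(10_000L);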