多个source、多个sink
关键配置sink的plugin_input [source_data1, source_data2]对应模型┌──────────┐│ Source A │──┐└──────────┘ │├──▶ Sink┌──────────┐ ││ Source B │──┘└──────────┘执行语句# ds-st-demo10-2-mysql2pgsql.confsh /data/tools/seatunnel/seatunnel-2.3.12/bin/seatunnel.sh --config /data/tools/seatunnel/myconf/ds-st-demo10-2-mysql2pgsql.conf -i -DJvmOption-Xms2G -Xmx2G -m local建表-- ds-st-demo10-2-mysql2pgsql.confCREATE TABLE public.t_8_100w_imp_st_ds_demo10 (id BIGINT PRIMARY KEY,user_name VARCHAR(2000),sex VARCHAR(20),decimal_f NUMERIC(32, 6),phone_number VARCHAR(20),age INT,create_time TIMESTAMP,description TEXT,address VARCHAR(2000) DEFAULT 未知,my_status INT);COMMENT ON COLUMN public.t_8_100w_imp_st_ds_demo10.id IS 主键;COMMENT ON COLUMN public.t_8_100w_imp_st_ds_demo10.user_name IS 名字;COMMENT ON COLUMN public.t_8_100w_imp_st_ds_demo10.sex IS 性别男女;COMMENT ON COLUMN public.t_8_100w_imp_st_ds_demo10.decimal_f IS 大数字;COMMENT ON COLUMN public.t_8_100w_imp_st_ds_demo10.phone_number IS 电话;COMMENT ON COLUMN public.t_8_100w_imp_st_ds_demo10.age IS 字符串年龄转数字;COMMENT ON COLUMN public.t_8_100w_imp_st_ds_demo10.create_time IS 新增时间;COMMENT ON COLUMN public.t_8_100w_imp_st_ds_demo10.description IS 大文本;COMMENT ON COLUMN public.t_8_100w_imp_st_ds_demo10.address IS 空地址转默认值未知;COMMENT ON COLUMN public.t_8_100w_imp_st_ds_demo10.my_status IS 状态;conf配置env {# 任务名字业务中可以弄表idjob.name ds-st-demo10.conf# 最大批线程数并行度线程数parallelism 5# 任务模式BATCH:批处理模式STREAMING:流处理模式job.mode BATCH}source {# 第一个数据集jdbc {# 给这个数据集起个名字plugin_output source_data1url jdbc:mysql://ip:port/cs1driver com.mysql.cj.jdbc.Driveruser rootpassword ***# sqlquery select id,name as user_name,sex,decimal_f,phone_number,CAST(age AS SIGNED) as age,create_time,description,address from t_8_100w where id 10# 并行读取配置# 分片的字段支持String、Number(int, bigint, decimal, ...)、Datepartition_column id# 表的分割大小行数每个分片的数据行默认8096行。最后分片数表的总行数 / split.sizesplit.size 50000# 分片数匹配并行度parallelism2.3.12已不推荐配置了用split.size来代替# partition_num 5# 最大批处理数:查询的行提取大小(指定当前任务每次执行时读取数据条数,该值(默认1000)受运行内存影响,若该值较大或单条数据量较大需适当调整运行内存大小。)fetch_size 10000# 连接参数# 连接超时时间300msconnection_check_timeout_sec 300# 其他jdbc的参数properties {useUnicode truecharacterEncoding utf8# 时区不同数据库参数不一样serverTimezone Asia/Shanghai# 使用游标提高大结果集性能useCursorFetch true# 每次获取行数defaultFetchSize 10000}}# 第二个数据集jdbc {# 给这个数据集起个名字plugin_output source_data2url jdbc:mysql://ip:port/cs1driver com.mysql.cj.jdbc.Driveruser rootpassword ***#query select id,name as user_name,sex,decimal_f,phone_number,CAST(age AS SIGNED) as age,create_time,description,address from t_8_100w where id 10 and id 20# 并行读取配置# 分片的字段支持String、Number(int, bigint, decimal, ...)、Datepartition_column id# 表的分割大小行数每个分片的数据行默认8096行。最后分片数表的总行数 / split.sizesplit.size 50000# 分片数匹配并行度parallelism2.3.12已不推荐配置了用split.size来代替# partition_num 5# 最大批处理数:查询的行提取大小(指定当前任务每次执行时读取数据条数,该值(默认1000)受运行内存影响,若该值较大或单条数据量较大需适当调整运行内存大小。)fetch_size 10000# 连接参数# 连接超时时间300msconnection_check_timeout_sec 300# 其他jdbc的参数properties {useUnicode truecharacterEncoding utf8# 时区不同数据库参数不一样serverTimezone Asia/Shanghai# 使用游标提高大结果集性能useCursorFetch true# 每次获取行数defaultFetchSize 10000}}}# 清洗转换简单的清洗转换直接在source的query的sql中处理了就行transform {# 1. 字段映射sql中做了实际生成中不在这里处理。直接在source的query的sql中处理了就行# 还可以用FieldMapper 插件来映射字段# 转换age为数字类型pgsql必须转# 2. 手机号脱敏13812341234 - 138****1234# 3. 年龄转换字符串转整数实际生产中不用转换也没有内置的转换插件可以直接保存成功# 4. 性别转换1-男2-女# 5. 数据过滤只保留 age 25 的记录。# 6. 地址默认值空地址设为未知}sink {jdbc {# 接收的最终数据集汇聚到一个结果中plugin_input [source_data1, source_data2]url jdbc:postgresql://ip:5432/source_dbdriver org.postgresql.Driveruser postgrespassword 123456## query # 自动生成sql的配置和query参数互斥# 生成自动插入sql。如果目标库没有表也会自动建表generate_sink_sql true# database必须要因为generate_sink_sqltrue。database source_db# 自动生成sql时table必须要。table public.t_8_100w_imp_st_ds_demo10# 生成类似INSERT INTO …… ON CONFLICT (主键) DO UPDATE SET …… 的sql# enable_upsert true# 判断值唯一的健此选项用于支持在自动生成 SQL 时进行 insertdelete 和 update 操作。# primary_keys [id]# 表结构处理策略表不存在时报错任务失败一般用CREATE_SCHEMA_WHEN_NOT_EXIST表不存在时创建表表存在时跳过操作保留数据schema_save_mode ERROR_WHEN_SCHEMA_NOT_EXIST# 插入数据的处理策略# APPEND_DATA保留表结构和数据追加新数据不删除现有数据(一般用这个)# DROP_DATA保留表结构删除表中所有数据清空表——实现清空重灌# CUSTOM_PROCESSING :用户定义处理。需要配合custom_sql使用data_save_mode DROP_DATA# 当 data_save_mode 选择 CUSTOM_PROCESSING 时您应该填写 CUSTOM_SQL 参数。此参数通常填入可执行的 SQL。SQL 将在同步任务之前执行。#可以实现同步删除执行前置update、truncate的sql等#这个sql未执行不知道为啥。#这个sql已经执行。原因因为generate_sink_sqltrue的原因。才会执行custom_sql。只有自动生成sql的时候这个才会执行custom_sql update source_db.public.t_8_100w_imp_st_ds_demo10 set my_status 23# 批量写入条数batch_size 10000# 批次提交间隔batch_interval_ms 500# 重试次数max_retries 3# 连接参数# 连接超时时间300msconnection_check_timeout_sec 300# 其他jdbc的参数properties {# PostgreSQL专用参数# PostgreSQL的批量优化注意大小写reWriteBatchedInserts true# 如果需要时区设置options -c timezoneAsia/Shanghai}}}结果(汇聚了19条数据)2026-01-15 14:28:15,952 INFO [s.c.s.s.c.ClientExecuteCommand] [main] -***********************************************Job Statistic Information***********************************************Start Time : 2026-01-15 14:28:11End Time : 2026-01-15 14:28:15Total Time(s) : 4Total Read Count : 19Total Write Count : 19Total Failed Count : 0***********************************************
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2473370.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!