Flink SQL DataGen Connector 示例
1、概述
使用 Flink SQL DataGen Connector,可以快速地生成符合规则的测试数据,可以在不依赖真实数据的情况下进行开发和测试。
2、使用示例
创建一个名为 “users” 的表,包含 6 个字段:id、name、age、email、created_at 和 updated_at。
在表的定义中,指定各个字段的规则:
id 字段使用序列生成器,生成的范围从 1 到 1000;
name 字段的长度为 10 个字符;
age 字段的范围从 18 到 60 岁;
email 字段的长度为随机的10个字符;
created_at 和 updated_at 字段使用随机时间生成器,时间范围从 2022 年 1 月 1 日到 2022 年 12 月 31 日。
3、官网参数介绍
1)数据类型注释
| Type | Supported Generators | Notes | 
|---|---|---|
| BOOLEAN | random | |
| CHAR | random / sequence | |
| VARCHAR | random / sequence | |
| STRING | random / sequence | |
| DECIMAL | random / sequence | |
| TINYINT | random / sequence | |
| SMALLINT | random / sequence | |
| INT | random / sequence | |
| BIGINT | random / sequence | |
| FLOAT | random / sequence | |
| DOUBLE | random / sequence | |
| DATE | random | Always resolves to the current date of the local machine. | 
| TIME | random | Always resolves to the current time of the local machine. | 
| TIMESTAMP | random | Always resolves to the current timestamp of the local machine. | 
| TIMESTAMP_LTZ | random | Always resolves to the current timestamp of the local machine. | 
| INTERVAL YEAR TO MONTH | random | |
| INTERVAL DAY TO MONTH | random | |
| ROW | random | Generates a row with random subfields. | 
| ARRAY | random | Generates an array with random entries. | 
| MAP | random | Generates a map with random entries. | 
| MULTISET | random | Generates a multiset with random entries. | 
2)连接器参数:
| 参数 | 是否必选 | 默认值 | 数据类型 | 描述 | 
|---|---|---|---|---|
| connector | 必须 | (none) | String | 指定要使用的连接器,这里是 ‘datagen’。 | 
| rows-per-second | 可选 | 10000 | Long | 每秒生成的行数,用以控制数据发出速率。 | 
| fields.#.kind | 可选 | random | String | 指定 ‘#’ 字段的生成器。可以是 ‘sequence’ 或 ‘random’。 | 
| fields.#.min | 可选 | (Minimum value of type) | (Type of field) | 随机生成器的最小值,适用于数字类型。 | 
| fields.#.max | 可选 | (Maximum value of type) | (Type of field) | 随机生成器的最大值,适用于数字类型。 | 
| fields.#.length | 可选 | 100 | Integer | 随机生成器生成字符的长度,适用于 char、varchar、string。 | 
| fields.#.start | 可选 | (none) | (Type of field) | 序列生成器的起始值。 | 
| fields.#.end | 可选 | (none) | (Type of field) | 序列生成器的结束值。 | 
4、代码示例
CREATE TABLE users (
  id BIGINT,
  name STRING,
  age INT,
  text STRING,
  created_at TIMESTAMP(3),
  updated_at TIMESTAMP(3)
) WITH (
  'connector' = 'datagen',
  'fields.id.kind' = 'sequence',
  'fields.id.start' = '1',
  'fields.id.end' = '1000',
  'fields.name.length' = '10',
  'fields.age.min' = '18',
  'fields.age.max' = '60',
  'fields.text.length' = '5'
);
测试结果:
select * from users;




















