什么是RisingWave
RisingWave 是一款基于 Apache 2.0 协议开源的分布式流数据库。RisingWave 让用户使用操作传统数据库的方式来处理流数据。通过创建实时物化视图,RisingWave 可以让用户轻松编写流计算逻辑,并通过访问物化视图来对流计算结果进行及时、一致的查询。
安装与启动
Docker 环境
docker run -it --pull=always -p 4566:4566 -p 5691:5691 risingwavelabs/risingwave:latest playground

通过DataGrip连接

创建表格与物化视图
create table t(v1 int, v2 int);
insert into t values(1,10),(2,20),(3,30);
create materialized view mv as select sum(v1) from t;

查询试图
select * from mv;

导入数据
通过datagen生成数据
CREATE TABLE t1 (v1 int, v2 int)
WITH (
connector = 'datagen',
fields.v1.kind = 'sequence',
fields.v1.start = '1',
fields.v2.kind = 'random',
fields.v2.min = '-10',
fields.v2.max = '10',
fields.v2.seed = '1',
datagen.rows.per.second = '10'
) ROW FORMAT JSON;
CREATE SOURCE s1 (w1 int, w2 int)
WITH (
connector = 'datagen',
fields.w1.kind = 'sequence',
fields.w1.start = '1',
fields.w2.kind = 'random',
fields.w2.min = '-10',
fields.w2.max = '10',
fields.w2.seed = '1',
datagen.rows.per.second = '10'
) ROW FORMAT JSON;
查询创建的表
show tables;

查询source
show sources;

查询结果

Source不支持直接查询

进行流计算
create materialized view mv_t1 as select count(*) from t1;
create materialized view mv_s1 as select count(*) from s1;


从kafka topic中创建source数据
-- 创建kafka source
CREATE SOURCE from_kafka2 (
name2 string,
id2 string,
table2 string
)
INCLUDE offset
include timestamp
include partition
WITH (
connector = 'kafka',
topic = 'kafka-send2',
properties.bootstrap.server = '192.168.5.54:9092'
)
FORMAT PLAIN ENCODE json;
kafka中数据格式

查询结果




















