What is RisingWave
RisingWave is a distributed streaming database, open-sourced under the Apache 2.0 license. It lets users process streaming data the same way they would operate a traditional database: by creating real-time materialized views, users can express stream-processing logic easily and then query the materialized views for timely, consistent results of the computation.
Installation and Startup
Docker environment
docker run -it --pull=always -p 4566:4566 -p 5691:5691 risingwavelabs/risingwave:latest playground
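Playground mode starts a single-node, in-memory instance, so any data is lost when the container exits. Port 4566 is the SQL endpoint (PostgreSQL wire protocol) and port 5691 serves the built-in dashboard.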
Connecting via DataGrip
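RisingWave speaks the PostgreSQL protocol, so a regular PostgreSQL driver works. Assuming the playground defaults, connect DataGrip with host localhost, port 4566, database dev, user root, and an empty password; the same settings apply to psql:
psql -h localhost -p 4566 -d dev -U root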
Creating tables and materialized views
create table t(v1 int, v2 int);
insert into t values(1,10),(2,20),(3,30);
create materialized view mv as select sum(v1) from t;
Querying the materialized view
select * from mv;
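With the three rows inserted above, this should return a single row with sum = 6 (1 + 2 + 3); the value is maintained incrementally as new rows are inserted into t.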
Ingesting data
Generating data with datagen
-- t1 persists the generated rows: v1 is a sequence starting at 1,
-- v2 is a random integer in [-10, 10]; 10 rows are generated per second
CREATE TABLE t1 (v1 int, v2 int)
WITH (
    connector = 'datagen',
    fields.v1.kind = 'sequence',
    fields.v1.start = '1',
    fields.v2.kind = 'random',
    fields.v2.min = '-10',
    fields.v2.max = '10',
    fields.v2.seed = '1',
    datagen.rows.per.second = '10'
) ROW FORMAT JSON;
-- s1 uses the same generator settings, but as a source it does not persist data
CREATE SOURCE s1 (w1 int, w2 int)
WITH (
    connector = 'datagen',
    fields.w1.kind = 'sequence',
    fields.w1.start = '1',
    fields.w2.kind = 'random',
    fields.w2.min = '-10',
    fields.w2.max = '10',
    fields.w2.seed = '1',
    datagen.rows.per.second = '10'
) ROW FORMAT JSON;
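Since t1 is a table, the generated rows are persisted and can be queried directly; run the query below a few times and the count should grow by roughly 10 per second:
select count(*) from t1;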
Listing the created tables
show tables;
Listing the sources
show sources;
Query results
A datagen source cannot be queried directly; its data only becomes accessible through a materialized view defined on top of it (created below).
Running stream computations
create materialized view mv_t1 as select count(*) from t1;
create materialized view mv_s1 as select count(*) from s1;
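Once the materialized views exist, the source's data becomes observable as well; querying them repeatedly should show both counts increasing at the datagen rate configured above:
select * from mv_t1;
select * from mv_s1;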
Creating a source from a Kafka topic
-- Create a Kafka source
CREATE SOURCE from_kafka2 (
    name2 string,
    id2 string,
    table2 string
)
INCLUDE offset
INCLUDE timestamp
INCLUDE partition
WITH (
    connector = 'kafka',
    topic = 'kafka-send2',
    properties.bootstrap.server = '192.168.5.xxx:9092'
) FORMAT PLAIN ENCODE JSON;
Data format in Kafka
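The source expects each Kafka message to be a flat JSON object matching the declared columns, for example (field values are hypothetical):
{"id2": "1", "table2": "orders", "name2": "alice"}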
Query results
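A sketch of what the query might look like, assuming the setup above; the INCLUDE clauses expose the message metadata as the _rw_kafka_offset, _rw_kafka_partition, and _rw_kafka_timestamp columns:
select name2, id2, table2, _rw_kafka_offset, _rw_kafka_partition, _rw_kafka_timestamp
from from_kafka2;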
-- Create a source without declaring a schema
CREATE SOURCE from_kafka5 (
    content2 bytea
)
INCLUDE offset
INCLUDE timestamp
INCLUDE partition
WITH (
    connector = 'kafka',
    topic = 'kafka-send2',
    properties.bootstrap.server = '192.168.5.xxx:9092'
) FORMAT PLAIN ENCODE BYTES;
-- JSON processing functions
-- Navigate nested JSON: -> returns jsonb, ->> returns text
SELECT ('{"a": {"b":"foo"}}'::jsonb -> 'a')::jsonb ->> 'b';

-- Decode the raw bytes as UTF-8 text
select convert_from(content2, 'utf8') from from_kafka5;

-- These wrap a text value with to_jsonb, producing a JSON string (not an object)
select to_jsonb(encode(content2, 'base64')) from from_kafka5;
select to_jsonb(convert_from(content2, 'utf8')) from from_kafka5;
select to_jsonb(content2) from from_kafka5;

-- Cast the decoded text to jsonb, then extract individual fields
select convert_from(content2, 'utf8')::jsonb ->> 'id2',
       convert_from(content2, 'utf8')::jsonb ->> 'table2',
       convert_from(content2, 'utf8')::jsonb ->> 'name2'
from from_kafka5;

-- 'escape' decoding also yields text that can be cast to jsonb
select jsonb_typeof(encode(content2, 'escape')::jsonb -> 'id2') from from_kafka5;

-- Note: to_jsonb(text) yields a JSON string, so path extraction returns NULL here;
-- use the ::jsonb cast shown above to get an object instead
select jsonb_extract_path(to_jsonb(convert_from(content2, 'utf8')), 'id2') from from_kafka5;
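To avoid repeating the cast in every query, the parsing can be folded into a materialized view (a sketch; the view name mv_kafka5 is made up):
create materialized view mv_kafka5 as
select convert_from(content2, 'utf8')::jsonb ->> 'id2'    as id2,
       convert_from(content2, 'utf8')::jsonb ->> 'table2' as table2,
       convert_from(content2, 'utf8')::jsonb ->> 'name2'  as name2
from from_kafka5;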
-- Statistical queries
-- Messages that arrived during a given minute
SELECT content2
FROM from_kafka5
WHERE to_char(_rw_kafka_timestamp, 'YYYYMMDDHH24MI') = '202405090846'
ORDER BY _rw_kafka_timestamp;

-- Per-minute message counts over the last 24 hours, derived from the offset range
-- (_rw_kafka_offset is varchar, so it is cast to INTEGER for the arithmetic)
SELECT to_char(date_trunc('minute', _rw_kafka_timestamp), 'YYYYMMDDHH24MISS') AS period,
       max(_rw_kafka_offset) AS logEndOffset,
       min(_rw_kafka_offset),
       (max(_rw_kafka_offset)::INTEGER - min(_rw_kafka_offset)::INTEGER + 1) AS addOffset,
       'from_kafka5' AS topic
FROM from_kafka5
WHERE _rw_kafka_timestamp + INTERVAL '24 hour' > now()
GROUP BY date_trunc('minute', _rw_kafka_timestamp)
ORDER BY period;