flinkSql 将流和表的互相转换

流——>表

方式一

方式二

方式一:写sql 
DataStreamSource<String> source = env.socketTextStream("localhost", 8881);
// 表名,流,字段名称
tableEnv.createTemporaryView("t_1",source,$("word"));

方式二:使用dsl
DataStreamSource<String> source = env.socketTextStream("localhost", 8881);
// 表名,流,字段名称
Table table = tableEnv.fromDataStream(source,$("word"));

 表——>流

Table table = tEnv.sqlQuery("select word,count(1) wordCount from t_1 group by word");

// 方式一:toAppendStream
DataStream<Row> appendStream = tEnv.toAppendStream(table, Row.class);

// 报错:toAppendStream doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[word], select=[word, SUM(num) AS sumNum])

// 这个不支持分组和聚合操作,若出现聚合操作使用方式二将表转为流

//方式二:toRetractStream
DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(table, Row.class);

 wordCount案例

方式一:使用sql

package com.bigdata.day07;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @基本功能:
 * @program:flinkProject
 * @author: 堇年
 * @create:2024-11-28 14:42:27
 **/
public class _06_flink_wordcounnt {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 获取tableEnv对象
        // 通过env 获取一个table 环境
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStreamSource<String> source = env.socketTextStream("localhost", 8881);
        SingleOutputStreamOperator<String> flatMap = source.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] split = value.split(",");
                for (String s : split) {
                    out.collect(s);
                }
            }
        });
        //2. 创建表对象
        tEnv.createTemporaryView("t_1",flatMap,$("word"));
        //3. 编写sql语句
        Table table = tEnv.sqlQuery("select word,count(1) wordCount from t_1 group by word");
        //4. 将Table变为stream流
        //使用toAppendStream时会报错 因为有聚合操作
        //DataStream<Row> appendStream = tEnv.toAppendStream(table, Row.class);
        // toAppendStream doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[word], select=[word, SUM(num) AS sumNum])
        // 在这里可以映射为ROW对象,也可以映射为自己定义的实体类
        DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(table, Row.class);
        retractStream.filter(new FilterFunction<Tuple2<Boolean, Row>>() {
            @Override
            public boolean filter(Tuple2<Boolean, Row> value) throws Exception {
                return value.f0;
            }
        }).print();


        //5. execute-执行
        env.execute();
    }
}

方式二:使用dsl语句 

package com.bigdata.day07;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import static org.apache.flink.table.api.Expressions.$;

public class _06_flink_wordcounnt_dsl {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 获取tableEnv对象
        // 通过env 获取一个table 环境
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStreamSource<String> source = env.socketTextStream("localhost", 8881);
        SingleOutputStreamOperator<String> flatMap = source.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] split = value.split(",");
                for (String s : split) {
                    out.collect(s);
                }
            }
        });
        //2. 创建表对象
        Table table = tEnv.fromDataStream(flatMap,$("word"));

        //3. 编写sql语句
        Table rsTable = table.groupBy($("word")).select($("word"),$("word").count().as("wordcount"));
        rsTable.printSchema();

        //4. 将Table变为stream流

        DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(rsTable, Row.class);
        retractStream.filter(new FilterFunction<Tuple2<Boolean, Row>>() {
            @Override
            public boolean filter(Tuple2<Boolean, Row> value) throws Exception {
                return value.f0;
            }
        }).print();


        //5. execute-执行
        env.execute();
    }
}

结果展示 

+I 表示有一条新数据进行了插入
+U 表示有一条已存在的数据有插入了一条,需要进行更新
-U 在+U前表示,先删除原本的,在update新的

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/930919.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

React性能优化

三个可以优化的地方 避免过度多次渲染 组件会在以下情况下重新渲染 注意&#xff1a;例如组件组合的形式&#xff0c;<Test><Counter></Counter></Test>,即使Test发生了重新渲染&#xff0c;Counter也不会重新渲染。另外使用React这样的库或框架时&a…

分布式事务的前世今生-纯理论

一个可用的复杂的系统总是从可用的简单系统进化而来。反过来这句话也正确: 从零开始设计的复杂的系统从来都用不了&#xff0c;也没办法让它变的可用。 --John Gal 《系统学》 1975 1. 事务的概念 百科&#xff1a; 事务&#xff08;Transaction&#xff09;&#xff0c;一般是…

k8s-编写CSI插件(3)

1、概述 在 Kubernetes 中&#xff0c;存储插件的开发主要有以下几种方式&#xff1a; CSI插件&#xff1a;Container Storage Interface (CSI) 是 Kubernetes 的标准插件接口&#xff0c;是全新的插件方案&#xff0c;插件和驱动调用通过grpc协议&#xff0c;功能丰富&#x…

Linux系统:网络

目录 一、网络协议 1.网络协议概念 2.协议分层 3.OSI七层模型和TCP/IP五层&#xff08;或四层&#xff09;模型 4.为什么要有网络协议&#xff1f; 5.网络通信协议的原理 二、网络传输的基本流程 1.局域网的网络传输流程 1.MAC地址 2.局域网通信原理&#xff08;以太网…

电子应用设计方案-45:智能火锅系统方案设计

智能火锅系统方案设计 一、引言 随着人们生活水平的提高和对饮食体验的追求&#xff0c;智能火锅系统应运而生。本方案旨在设计一款集智能化控制、高效加热、安全保障和个性化体验于一体的智能火锅系统。 二、系统概述 1. 系统目标 - 实现精准的温度控制&#xff0c;满足不同…

论文概览 |《Urban Analytics and City Science》2023.03 Vol.50 Issue.3

本次给大家整理的是《Environment and Planning B: Urban Analytics and City Science》杂志2023年3月第50卷第3期的论文的题目和摘要&#xff0c;一共包括18篇SCI论文&#xff01; 论文1 A new kind of search 一种新型的搜索 【摘要】 ChatGPT (2022) was first launched o…

jwt简介和在go中的简单使用

什么是 JSON Web 令牌&#xff1f; JSON Web 令牌 &#xff08;JWT&#xff09; 是一种开放标准 &#xff08;RFC 7519&#xff09;&#xff0c;它定义了一种紧凑且自包含的方式&#xff0c;用于将信息作为 JSON 对象在各方之间安全地传输。此信息是经过数字签名的&#xff0c…

机器学习详解(3):线性回归之代码详解

文章目录 1 数据预处理2 构建线性回归模型并绘制回归线初始化方法前向传播&#xff1a;forward_propagation代价函数&#xff1a;cost_function反向传播&#xff1a;backward_propagation参数更新&#xff1a;update_parameters训练方法&#xff1a;train代码运行结果 3 使用Py…

iOS如何自定义一个类似UITextView的本文编辑View

对于IOS涉及文本输入常用的两个View是UITextView和UITextField&#xff0c;一个用于复杂文本输入&#xff0c;一个用于简单文本输入&#xff0c;在大多数开发中涉及文本输入的场景使用这两个View能够满足需求。但是对于富文本编辑相关的开发&#xff0c;这两个View就无法满足自…

唇形同步视频生成工具:Wav2Lip

一、模型介绍 今天介绍一个唇形同步的工具-Wav2Lip&#xff1b;Wav2Lip是一种用于生成唇形同步&#xff08;lip-sync&#xff09;视频的深度学习算法&#xff0c;它能够根据输入的音频流自动为给定的人脸视频添加准确的口型动作。 &#xff08;Paper&#xff09; Wav2Lip模型…

轻量化特征融合 | YOLOv11 引入一种基于增强层间特征相关性的轻量级特征融合网络 | 北理工新作

本改进已同步到Magic框架 摘要—无人机图像中的小目标检测由于分辨率低和背景融合等因素具有挑战性,导致特征信息有限。多尺度特征融合可以通过捕获不同尺度的信息来增强检测,但传统策略效果不佳。简单的连接或加法操作无法充分利用多尺度融合的优势,导致特征之间的相关性不…

数学课上的囚徒问题(2)

#include<bits/stdc.h> using namespace std; int main() {int n; cin>>n;double res0;for(int in/21;i<n;i)res1./i;cout<<fixed<<setprecision(12)<<(1-res); } 赛后看到别人那么短的代码直接破防了&#xff0c;才发现思考起来也不是很简单…

21Java之多线程、线程池、并发、并行

一、多线程常用方法 下面我们演示一下getName()、setName(String name)、currentThread()、sleep(long time)这些方法的使用效果。 public class MyThread extends Thread{public MyThread(String name){super(name); //1.执行父类Thread(String name)构造器&#xff0c;为当前…

HttpClient介绍

1. 介绍 2. 发送Get方式请求 public void httpGetTest() throws Exception{// 创建HttpClient对象CloseableHttpClient httpClient HttpClients.createDefault();// 创建请求方式对象HttpGet httpGet new HttpGet("http://localhost/user/shop/status");// 发送请…

矩阵的乘(包括乘方)和除

矩阵的乘分为两种&#xff1a; 一种是高等代数中对矩阵的乘的定义&#xff1a;可以去这里看看包含矩阵的乘。总的来说&#xff0c;若矩阵 A s ∗ n A_{s*n} As∗n​列数和矩阵 B n ∗ t B_{n*t} Bn∗t​的行数相等&#xff0c;则 A A A和 B B B可相乘&#xff0c;得到一个矩阵 …

智能跳转 - 短链接高级特性详解

看到标题&#xff0c;我只想说短链接工具已经卷疯了。很多人都知道&#xff0c;短链接的基础特性就是将长链接变短&#xff0c;更加简洁美观便于传播推广&#xff1b;高级一点的功能还有数据统计&#xff0c;便于运营进行分析决策&#xff1b;更高级的还能绑定企业自己的域名&a…

离线写博客(失败) - 用Markdown来离线写博客

因为想控制一下用网&#xff0c;但是又有写博客的需求&#xff0c;所以想研究一下离线写博客。 我看CSDN上面好像有很多介绍&#xff0c;Windows Live Writer 啦&#xff0c;Markdown啦&#xff0c;还有一些其他的&#xff0c;我看了一下&#xff0c;好像 Markdown还有点儿靠谱…

第三节、电机定速转动【51单片机-TB6600驱动器-步进电机教程】

摘要&#xff1a;本节介绍用定时器定时的方式&#xff0c;精准控制脉冲时间&#xff0c;从而控制步进电机速度 一、计算过程 1.1 电机每一步的角速度等于走这一步所花费的时间&#xff0c;走一步角度等于步距角&#xff0c;走一步的时间等于一个脉冲的时间 w s t e p t … ……

夏普MX-4608N复印机维修模式进入方法及载体初始化方法

夏普MX-4608N复印机载体型号&#xff08;图&#xff09;&#xff1a; 型 号&#xff1a;载体&#xff08;黑色&#xff09;MX-561CV 净含量&#xff1a;395克 下面图片中分别有载体、刮板、鼓芯、上纸盒搓纸轮一套&#xff0c;均原装正品&#xff1b; 保养周期将至的时候建…

头歌 Linux之线程管理

第1关&#xff1a;创建线程 任务描述 通常我们编写的程序都是单进程&#xff0c;如果在一个进程中没有创建新的线程&#xff0c;则这个单进程程序也就是单线程程序。本关我们将介绍如何在一个进程中创建多个线程。 本关任务&#xff1a;学会使用C语言在Linux系统中使用pthrea…