基于docker安装flink

文章目录

  • 环境准备
    • Flink
      • docker-compose方式
      • 二进制部署
    • Kafka
    • Mysql
  • Flink 执行 SQL命令
    • 进入SQL客户端CLI
    • 执行SQL查询
      • 表格模式
      • 变更日志模式
      • Tableau模式
      • 窗口计算
    • 窗口计算
      • 滚动窗口demo
      • 滑动窗口
  • 踩坑

环境准备

Flink

docker-compose方式

version: "3"
services:
  jobmanager:
    image: flink:latest
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

  taskmanager:
    image: flink:latest
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

前端访问地址: http://192.168.56.112:8081/#/overview

二进制部署

wget https://archive.apache.org/dist/flink/flink-1.13.3/flink-1.13.3-bin-scala_2.11.tgz

vim conf/flink-conf.yaml

jobmanager.rpc.address: 192.168.56.112 # 修改为本机ip

./bin/start-cluster.sh

Kafka

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper   ## 镜像
    ports:
      - "2181:2181"                 ## 对外暴露的端口号
  kafka:
    image: wurstmeister/kafka       ## 镜像
    volumes:
        - /etc/localtime:/etc/localtime ## 挂载位置(kafka镜像和宿主机器之间时间保持一直)
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.56.112    ## 修改:宿主机IP
      KAFKA_ZOOKEEPER_CONNECT: 192.168.56.112:2181       ## 卡夫卡运行是基于zookeeper的
  kafka-manager:
    image: sheepkiller/kafka-manager                ## 镜像:开源的web管理kafka集群的界面
    environment:
        ZK_HOSTS:                    ## 修改:宿主机IP
    ports:
      - "9000:9000"

Mysql

docker run -d -p3306:3306 --name=mysql57 -e MYSQL_ROOT_PASSWORD=111111 mysql:5.7

在这里插入图片描述

Flink 执行 SQL命令

进入SQL客户端CLI

docker exec  -it flink_jobmanager_1  /bin/bash

./bin/sql-client.sh

在这里插入图片描述

执行SQL查询

SELECT 'Hello World';

在这里插入图片描述

表格模式

表格模式(table mode)在内存中物化结果,并将结果用规则的分页表格的形式可视化展示出来。执行如下命令启用:

SET sql-client.execution.result-mode = table;

可以使用如下查询语句查看不同模式的的运行结果:

SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;

在这里插入图片描述

变更日志模式

变更日志模式(changelog mode)不会物化结果。可视化展示由插入(+)和撤销(-)组成的持续查询结果流。

SET sql-client.execution.result-mode = changelog;

在这里插入图片描述

Tableau模式

Tableau模式(tableau mode)更接近传统的数据库,会将执行的结果以制表的形式直接打在屏幕之上。具体显示的内容取决于作业执行模式(execution.type):

SET sql-client.execution.result-mode = tableau;

在这里插入图片描述

注意:当你在流式查询上使用这种模式时,Flink 会将结果持续的打印在当前的控制台上。如果流式查询的输入是有限数据集,那么 Flink 在处理完所有的输入数据之后,作业会自动停止,同时控制台上的打印也会自动停止。如果你想提前结束这个查询,那么可以直接使用 CTRL-C 按键,这个会停止作业同时停止在控制台上的打印。

窗口计算

TUMBLE(time_attr, interval) 定义一个滚动窗口。滚动窗口把行分配到有固定持续时间( interval )的不重叠的连续窗口。比如,5 分钟的滚动窗口以 5 分钟为间隔对行进行分组。滚动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。

窗口计算

TUMBLE(time_attr, interval) 定义一个滚动窗口。滚动窗口把行分配到有固定持续时间( interval )的不重叠的连续窗口。比如,5 分钟的滚动窗口以 5 分钟为间隔对行进行分组。滚动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。

滚动窗口demo

根据订单信息使用kafka作为数据源表,JDBC作为数据结果表统计用户在5秒内的订单数量,并根据窗口的订单id和窗口开启时间作为主键,将结果实时统计到JDBC中:

  1. 在MySQL的flink数据库下创建表order_count,创建语句如下:
CREATE TABLE `flink`.`order_count` (
        `user_id` VARCHAR(32) NOT NULL,
        `window_start` TIMESTAMP NOT NULL,
        `window_end` TIMESTAMP NULL,
        `total_num` BIGINT UNSIGNED NULL,
        PRIMARY KEY (`user_id`, `window_start`)
)        ENGINE = InnoDB
        DEFAULT CHARACTER SET = utf8mb4
        COLLATE = utf8mb4_general_ci;
  1. 创建flink opensource sql作业,并提交运行作业
CREATE TABLE orders (
  order_id string,
  order_channel string,
  order_time timestamp(3),
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string,
  watermark for order_time as order_time - INTERVAL '3' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'order_topic',
  'properties.bootstrap.servers' = '192.168.56.112:9092',
  'properties.group.id' = 'order_group',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

CREATE TABLE jdbcSink (
  user_id string,
  window_start timestamp(3),
  window_end timestamp(3),
  total_num BIGINT,
  primary key (user_id, window_start) not enforced
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.56.112:3306/flink',
  'table-name' = 'order_count',
  'username' = 'root',
  'password' = '111111',
  'sink.buffer-flush.max-rows' = '1'
);

SELECT 
    'WINDOW',-- window_start,window_end,
    group_key,record_num,create_time,
    SUM(record_num) OVER w AS sum_amount
FROM temp
WINDOW w AS (
  PARTITION BY group_key
  ORDER BY rowtime
  RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW)
  
select 
    user_id,
    TUMBLE_START(order_time, INTERVAL '5' SECOND),
    TUMBLE_END(order_time, INTERVAL '5' SECOND),
    COUNT(*) from orders
    GROUP BY user_id, TUMBLE(order_time, INTERVAL '5' SECOND) having count(*) > 3;
    
SELECT 
    'WINDOW',
    user_id,order_id,real_pay,order_time
    COUNT(*) OVER w AS sum_amount
FROM orders
WINDOW w AS (
  PARTITION BY user_id
  ORDER BY order_time
  RANGE BETWEEN INTERVAL '60' SECOND PRECEDING AND CURRENT ROW) 
    

insert into jdbcSink select 
    user_id,
    TUMBLE_START(order_time, INTERVAL '5' SECOND),
    TUMBLE_END(order_time, INTERVAL '5' SECOND),
    COUNT(*) from orders
    GROUP BY user_id, TUMBLE(order_time, INTERVAL '5' SECOND) having count(*) > 3;
  1. Kafka 相关操作
bin/kafka-topics.sh --zookeeper 192.168.56.112:2181 --list

bin/kafka-topics.sh --zookeeper 192.168.56.112:2181 --create --replication-factor 1 --partitions 1 --topic order_topic

bin/kafka-console-producer.sh --broker-list 192.168.56.112:9092 --topic order_topic

bin/kafka-console-consumer.sh --bootstrap-server 192.168.56.112:9092 --topic order_topic --from-beginning

bin/kafka-topics.sh --zookeeper 192.168.56.112:2181 --describe --topic order_topic 

bin/kafka-topics.sh --zookeeper 192.168.56.112:2181 --delete --topic order_topic 

发送数据样例

{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-09-26 15:20:11", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:28:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:29:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:29:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:29:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:30:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:30:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}

滑动窗口

SELECT * FROM TABLE(
    HOP(TABLE orders, DESCRIPTOR(order_time), INTERVAL '2' SECOND, INTERVAL '10' SECOND));

SELECT * FROM TABLE(
    HOP(
      DATA => TABLE orders,
      TIMECOL => DESCRIPTOR(order_time),
      SLIDE => INTERVAL '5' MINUTES,
      SIZE => INTERVAL '10' MINUTES));
      
      
SELECT window_start, window_end, SUM(pay_amount)
  FROM TABLE(
    HOP(TABLE orders, DESCRIPTOR(order_time), INTERVAL '2' SECOND, INTERVAL '10' SECOND))
  GROUP BY window_start, window_end;

踩坑

  1. Could not find any factory for identifier ‘kafka’ that implements ‘org.apache.flink.table.factories.DynamicTableFactory’ in the classpath.

查看flink version

flink-sql-connector-kafka-1.17.1.jar

https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka/1.17.1

下载对应版本jar,放到lib目录下,重启

  1. Could not find any factory for identifier ‘jdbc’ that implements 'org.apache.flink.table.factories.DynamicTableFactory
    flink-connector-jdbc-3.1.0-1.17.jar
    https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar

  2. Caused by: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver

https://mvnrepository.com/artifact/com.mysql/mysql-connector-j/8.0.31

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/610797.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

乡村振兴与城乡融合发展:加强城乡间经济、文化、社会等方面的交流与合作,推动城乡一体化发展,实现美丽乡村共荣

目录 一、引言 二、乡村振兴与城乡融合发展的意义 三、城乡交流合作的现状与挑战 (一)现状 (二)挑战 四、加强城乡交流合作的策略与路径 (一)完善城乡交流合作机制 (二)推动…

electron-vite工具打包后通过内置配置文件动态修改接口地址实现方法

系列文章目录 electronvitevue3 快速入门教程 文章目录 系列文章目录前言一、实现过程二、代码演示1.resources/env.json2.App.vue3.main/index.js4.request.js5.安装后修改 前言 使用electron-vite 工具开发项目打包完后每次要改接口地址都要重新打包,对于多环境…

后端的一些科普文章

后端开发一般有4个方面 后端开发流程 1阶段 域名认证 是每一个计算机在网络上有一个ip地址,可以通过这个地址来访问102.305.122.5(举例), 但是这个公网ip地址,比较难记忆,所以大家使用域名来更好的记忆…

越秀城投·星汇城 | 看得再多,都不如实景现房更安心

对于大多数家庭而言,买房是人生大事。经历了前几年房企暴雷、楼盘停工烂尾的风波,“现房”成为买房人心中最安心的代名词。无需再等待,所见即所得。 越秀城投星汇城位于平度南部新城核芯片区,不仅享受区域发展的利好,…

ArcGIS10.2系列许可到期解决方案

本文手机码字,不排版了。 昨晚(2021\12\17)12点后,收到很多学员反馈 ArcGIS10.2系列软件突然崩溃。更有的,今天全单位崩溃。 ​ 提示许可15天内到期。之前大部分许可是到2021年1月1日的。 ​ 后续的版本许可都是永久的…

[Kubernetes] 云原生 Istio 介绍

文章目录 1.Istio 介绍2.Istio 特征3.Istio 与服务治理4.Istio与Kubernetes4.1 Istio是Kubernetes的好帮手4.2 Kubernetes是Istio的好基座 5.Istio与服务网格5.1 时代选择服务网格5.2 服务网格选择Istio 1.Istio 介绍 服务网格是一个独立的基础设施层,用来处理服务之…

FastReID使用教程、踩坑记录

近期在尝试使用FastReID,期间对FastReID架构、损失函数、数据集准备、模型训练/评估/可视化/特征向量输出、调试debug记录等进行记录。 FastReID架构理解 关于FastReID的介绍,可点击此链接前往查询。 ReID和FastReID架构 对于模型架构、损失函数、实验…

鸿蒙ArkUI-X跨平台开发电商应用

一、ArkUI-X 简介 ArkUI-X 是由 OpenHarmony TSC - 跨平台应用开发框架 TSG 所孵化的开源项目,使用ArkUI-X可以让开发者基于一套主代码, 就可以构建支持多平台的精美、高性能应用。目前支持OpenHarmony、HarmonyOS、Android、 iOS,后续会逐步增加更多平台支持。 ArKUI跨平台…

Smma-net:一种基于音频线索的目标说话人提取网络,具有谱图匹配和相互关注功能

SMMA-NET: AN AUDIO CLUE-BASED TARGET SPEAKER EXTRACTION NETWORK WITH SPECTROGRAM MATCHING AND MUTUAL ATTENTION 第二章 目标说话人提取之《Smma-net:一种基于音频线索的目标说话人提取网络,具有谱图匹配和相互关注功能》 文章目录 SMMA-NET: AN AUDIO CLUE-…

星途重启:244亿公里外的「旅行者1号」,修好了

2024年4月20日,旅行者1号工程团队时隔5个月,终于重新收到了来自47年前所发射的探测器传回的有效数据。 ▲收到数据当天,工程团队成员在NASA喷气动力实验室的会议室中欢呼。 01.关于旅行者1号 在当下5G和WIFI已经普及的时代,NASA喷…

力扣2105---给植物浇水II(Java、模拟、双指针)

题目描述: Alice 和 Bob 打算给花园里的 n 株植物浇水。植物排成一行,从左到右进行标记,编号从 0 到 n - 1 。其中,第 i 株植物的位置是 x i 。 每一株植物都需要浇特定量的水。Alice 和 Bob 每人有一个水罐,最初是…

debian testing (预计13版本)wps字体无法正常显示

背 景 本人使用debian办公,原来使用的是debian 12,由于“生命不息,折腾不止“,终于将稳定版的debian 12升级为testing. 结果发现,debian 12能够正常使用的wps存在部分字体无法正常显示,经研究发现,原来是w…

The Sandbox 与 Cuisinia 合作推出全新体验!

与 Cuisinia 一起吃 Voxel! 召唤所有美食家和游戏玩家!准备好在 Cuisinia x The Sandbox Moodie 挑战赛中挑逗你的味蕾,考验你的技能!加入我们的美味探险,品尝充满活力的泰国美食。 为什么选择 Cuisinia? …

图像锐化——非锐化掩膜USM和锐化掩膜SM(附代码)

非锐化掩膜 (USM) 和锐化掩膜 (SM) 都是常用的图像锐化技术。它们都可以通过增强图像的边缘信息来提高图像的清晰度。 目录 一、非锐化掩膜USM1.1 USM原理1.2 USM实现步骤1.3 优点1.4 代码 二、锐化掩膜SM2.1 SM原理2.2 SM实现步骤2.3 优点2.4 代码 三、锐化效果四、总结4.1 效…

vue 代码样式问题

部分电脑存在样式错乱问题&#xff0c;部分电脑样式正常。最后发现是样式写在 el-col 里面导致的。 注意&#xff1a;写样式不要放在 el-row 或者 el-row &#xff0c;导致部分电脑会出现莫名其妙的样式问题 <el-row class"detail"><el-col class"it…

在RK3588开发板使用FFMpeg 结合云服务器加SRS实现摄像头数据推流到云端拱其他设备查看

今天测试了一把在开发板把摄像头数据推流到云端服务器&#xff0c;然后给其他电脑通过val软件拉取显示摄像头画面&#xff0c;浅浅记录一下大概步骤 1.开发板端先下载ffmpeg apt install ffmpeg2.云服务器先安装SRS的库 云服务器我使用ubuntu系统&#xff0c;SRS是个什么东西&…

扫码查看文件是如何实现的?文件活码在线生成的方法

现在很多场景下会通过扫码的方式来查看文件&#xff0c;这种方式可以让更多的人同时通过扫码的方式来查看二维码&#xff0c;有利于文件的快速分享以及用户获取内容的个人体验&#xff0c;而且可以保护文件的安全性&#xff0c;那么如何制作文件二维码呢&#xff1f; 文件二维…

车辆管理|基于SprinBoot+vue的4S店车辆管理系统(源码+数据库+文档)

4S店车辆管理系统 目录 基于SprinBootvue的4S店车辆管理系统 一、前言 二、系统设计 三、系统功能设计 系统实现 1管理员功能模块 2销售员功能模块 3维修员功能模块 四、数据库设计 五、核心代码 六、论文参考 七、最新计算机毕设选题推荐 八、源码获取&#xf…

傻傻分不清楚:JDK/JRE/JVM的区别和联系

在Java开发的世界里&#xff0c;JDK、JRE和JVM是三个经常听到的术语。 对于初学者来说&#xff0c;它们的概念和区别可能会让人感到困惑。 这篇文章详细解释下三个组件的含义、它们之间的区别和联系。 一&#xff0c;JDK&#xff1a;Java Development Kit JDK是Java开发工具…

k8s个人认知理解

pod的定义 pod里面有容器&#xff0c;所以pod就是一个容器组&#xff0c;一个pod里面可以有多个容器也可以有一个容器&#xff0c;最低只能有一个容器&#xff0c;目前现在主流使用的都是一个pod里面一个容器&#xff0c;同一个pod里面的容器&#xff0c;需要紧耦合。配置文件…