docker安装flink

docker安装flink

5.1、拉取flink镜像,创建网络

docker pull flink
docker network create flink-network

5.2、创建 jobmanager

# 创建 JobManager 
docker run \
 -itd \
 --name=jobmanager \
 --publish 8081:8081 \
 --network flink-network \
 --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \
 flink:latest jobmanager 

5.3、创建 TaskManager

# 创建 TaskManager 
 docker run \
  -itd \
  --name=taskmanager \
  --network flink-network \
  --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \
  flink:latest taskmanager 

5.4、访问公网ip

http://localhost:8081/

访问 http://150.158.119.225/:8081/

5.5 修改Task Slots

默认的Slots num是1,我们可以修改为5:
修改的目录是jobmanager和taskmanager的/opt/flink/confflink-conf.yaml文件:

修改taskmanager.numberOfTaskSlots:即可。
注意:默认的docker容器中没有vi/vim命令,可以使用docker cp命令,复制出来修改,然后在复制回去,如下:

docker cp taskmanager:/opt/flink/conf/flink-conf.yaml .
docker cp flink-conf.yaml taskmanager:/opt/flink/conf/

5.6、通过flinksql消费Kafka

Docker安装kafka 3.5
并且通过python,简单写一个生产者
Python生产、消费Kafka

5.7 导入flink-sql-connector-kafka jar

顾名思义,用于连接flinksql和kafka。
进入flink

docker exec -it jobmanager /bin/bash

进入 flink的bin目录

cd /opt/flink/bin

查看flink版本:

flink --version

根据自己的flink版本,下载对应的 flink-sql-connector-kafka jar包
https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka
因为我是1.18.0,所以选择下图的版本包:

将下载的jar包,分别在jobmanager,taskmanager /opt/flink/lib目录下,注意,是两个都要放,如下图:

可以使用docker cp test.txt jobmanager:/opt/flink/lib命令,用户宿主机和docker容器文件传输。把test.txt换成对应的jar包即可

docker cp test.txt jobmanager:/opt/flink/lib
docker cp test.txt taskmanager:/opt/flink/lib

5.8 flinksql消费kafka

java结合日志

kafka.send("GatewayLog", JSONUtil.toJsonStr(gatewayLog));

GatewayLog是topic

yaml的服务配置

spring:
  kafka:
    bootstrap-servers: "10.10.10.155:9092"
    consumer:
      group-id: "teleGatewayGroup"

我在本地生成了一条log,将使用flinksql处理这个数据。

进入jobmanager中,执行

cd /opt/flink/bin
sql-client.sh

Flink SQL执行以下语句:

CREATE TABLE GatewayLog (
    platform VARCHAR,
    serviceId VARCHAR,
    targetServer VARCHAR,
    requestPath VARCHAR,
    requestMethod VARCHAR,
    schema VARCHAR,
    requestContentType VARCHAR,
    headers VARCHAR,
    requestBody VARCHAR,
    ip VARCHAR,
    startTime TIMESTAMP,
    endTime VARCHAR,
    executeTime VARCHAR,
    status VARCHAR,
    nickName VARCHAR,
    account VARCHAR,
    accountType VARCHAR,
    serviceName VARCHAR,
    orgCode VARCHAR
) WITH (
  'connector' = 'kafka',
  'topic' = 'GatewayLog',
  'properties.bootstrap.servers' = '150.158.119.225:9092',
  'properties.group.id' = 'flinKGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

select * from GatewayLog;

可以看到Flink在消费kafka数据,如下图:

中间缺少很多包。
在这里插入图片描述

flink-connector-kafka

https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka/3.1.0-1.18

依赖的kafka-clients

https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients/3.6.1

然后在Linux需要看权限问题。

chmod -R 777 /lib 

把文件夹都改成777 所有人。

然后执行

sql最好先改成varchar,变成成功。

最后select * from table

执行成功。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/408072.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【全网首发】上周申请的谷歌Gemini 1.5 Pro已通过!百万token的Gemini 1.5 Pro开箱测试(一)

大家好,我是木易,一个持续关注AI领域的互联网技术产品经理,国内Top2本科,美国Top10 CS研究生,MBA。我坚信AI是普通人变强的“外挂”,所以创建了“AI信息Gap”这个公众号,专注于分享AI全维度知识…

C# WPF 桌面应用程序使用 SQlite 数据库

我们在开发 WPF 桌面应用程序时,数据库存的使用是必不可少的,除非你的应用没有数据存储的需求,有了数据存储需求,我们就会面临使用什么样的数据库的选择问题,我的选择方案是,单机版的应用我优先选择 Sqlite…

Mac安装Appium

一、环境依赖 一、JDK环境二、Android-SDK环境(android自动化)三、Homebrew环境四、Nodejs 安装cnpm 五、安装appium六、安装appium-doctor来确认安装环境是否完成七、安装相关依赖 二、重头大戏, 配置wda(WebDriverAgent&#x…

小苯的IDE括号问题(CD) -----牛客小白月赛87(双链表)

C题&#xff1a;C-小苯的IDE括号问题&#xff08;easy&#xff09;_牛客小白月赛87 (nowcoder.com) D题&#xff1a; D-小苯的IDE括号问题&#xff08;hard&#xff09;_牛客小白月赛87 (nowcoder.com) C题代码&#xff1a; #include<bits/stdc.h>using namespace std…

番外篇 | YOLOv5+DeepSort实现行人目标跟踪检测

前言:Hello大家好,我是小哥谈。DeepSort是一种用于目标跟踪的深度学习算法。它结合了目标检测和目标跟踪的技术,能够在视频中准确地跟踪多个目标,并为每个目标分配一个唯一的ID。DeepSort的核心思想是将目标检测和目标跟踪两个任务进行联合训练,以提高跟踪的准确性和稳定性…

Sentinel微服务流量治理组件实战上

目录 分布式系统遇到的问题 解决方案 Sentinel 是什么&#xff1f; Sentinel 工作原理 Sentinel 功能和设计理念 流量控制 熔断降级 Sentinel工作主流程 Sentinel快速开始 Sentinel资源保护的方式 基于API实现 SentinelResource注解实现 Spring Cloud Alibaba整合…

一键生成PDF即刻呈现:轻松创建无忧体验

在信息爆炸的时代&#xff0c;我们每天都在与各种文件、资料打交道。无论是工作中的报告、合同&#xff0c;还是学习中的笔记、论文&#xff0c;如何高效、安全地管理这些珍贵的资料&#xff0c;成为了我们迫切的需求。幸运的是&#xff0c;随着科技的发展&#xff0c;我们不再…

Mysql--索引分类

Mysql--索引分类 1. 索引分类2. 聚集索引&二级索引 1. 索引分类 在MySQL数据库&#xff0c;将索引的具体类型主要分为以下几类&#xff1a;主键索引、唯一索引、常规索引、全文索引。 2. 聚集索引&二级索引 而在在InnoDB存储引擎中&#xff0c;根据索引的存储形式&am…

Mysql运维篇(五) 部署MHA--主机环境配置

一路走来&#xff0c;所有遇到的人&#xff0c;帮助过我的、伤害过我的都是朋友&#xff0c;没有一个是敌人。如有侵权&#xff0c;请留言&#xff0c;我及时删除&#xff01; 大佬博文 https://www.cnblogs.com/gomysql/p/3675429.html MySQL 高可用&#xff08;MHA&#x…

【算法与数据结构】417、LeetCode太平洋大西洋水流问题

文章目录 一、题目二、解法三、完整代码 所有的LeetCode题解索引&#xff0c;可以看这篇文章——【算法和数据结构】LeetCode题解。 一、题目 二、解法 思路分析&#xff1a;题目要求雨水既能流向太平洋也能流向大西洋的网格。雨水流向取决于网格的高度。一个比较直接的方式是对…

LangChain Agent v0.2.0简明教程 (中)

4. Retrieval4.1 Document loaders4.2 Text Splitter4.3 Text embedding models4.4 Vector stores4.5 Retrievers4.6 Indexing4. Retrieval 许多LLM需要使用特定用户的外部数据,这些数据不属于模型训练集的一部分。实现这一目标的主要方法是通过检索增强生成(RAG)。在此过程…

window: C++ 获取自己写的dll的地址

我自己用C写了一个插件,插件是dll形式的,我的插件式在dll的目录下有个config文件夹,里面是我用json写的插件配置文件,当插件运行的时候我需要读取到json配置文件,所有最重要的就是如何获取dll的路径. 大概就是这么个结构, 我自己封装了一个函数.只适用于window编程,因为里面用…

【ArcGIS】利用DEM进行水文分析:流向/流量等

利用DEM进行水文分析 ArcGIS实例参考 水文分析通过建立地表水文模型&#xff0c;研究与地表水流相关的各种自然现象&#xff0c;在城市和区域规划、农业及森林、交通道路等许多领域具有广泛的应用。 ArcGIS实例 某流域30m分辨率DEM如下&#xff1a; &#xff08;1&#xff09…

【大数据】Flink 内存管理(一):设置 Flink 进程内存

Flink 内存管理&#xff08;一&#xff09;&#xff1a;设置 Flink 进程内存 1.配置 Total Memory2.JVM 参数3.根据比例限制的组件&#xff08;Capped Fractionated Components&#xff09; Apache Flink 通过严格控制各种组件的内存使用&#xff0c;在 JVM 上提供高效的工作负…

LabVIEW储氢材料循环寿命测试系统

LabVIEW储氢材料循环寿命测试系统 随着氢能技术的发展&#xff0c;固态储氢技术因其高密度和安全性成为研究热点。储氢材料的循环寿命是衡量其工程应用的关键。然而&#xff0c;传统的循环寿命测试设备存在成本高、测试效率低、数据处理复杂等问题。设计了一种基于LabVIEW软件…

SQL-Labs靶场“46-50”关通关教程

君衍. 一、四十六关 ORDER BY数字型注入1、源码分析2、rand()盲注3、if语句盲注4、时间盲注5、报错注入6、Limit注入 二、四十七关 ORDER BY单引号报错注入1、源码分析2、报错注入3、时间盲注 三、四十八关 ODRER BY数字型盲注1、源码分析2、rand()盲注3、if语句盲注4、时间盲注…

汽车大灯尾灯灯罩裂了可以修复吗?汽车大灯尾灯裂缝修复用什么胶?拆开的灯罩用什么胶合壳密封?

随着科学技术的不断发展&#xff0c;汽车大灯尾灯破损是可以修的。 TADHE车灯无痕修复专用UV胶是一种经过处理的UV树脂胶&#xff0c;主要成份是改性丙烯酸UV树脂。应用在车灯的专业无痕修复领域。 其具备环氧树脂胶优点的同时&#xff0c;还有如下特点&#xff1a; 固化时间…

独立版表情包小程序完整版源码前后端源码,附带系统搭建教程

搭建要求&#xff1a; 1.系统要求Nginx 1.18.0PHP-7.2mysql5.6&#xff0c;开启 ssl&#xff0c;php需要安装 sg11 扩展 2.设置伪静态 location / { index index.php index.html index.htm; if (!-e $request_filename) { rewrite ^/(.*)$ /index.php?s$1; } } location /a…

华为OD机试真题-虚拟游戏理财-2023年OD统一考试(C卷)---Python3--开源

题目&#xff1a; 考察内容&#xff1a; for if max 代码&#xff1a; """ 题目分析&#xff1a;投资额*回报率投资回报 要在可接受范围内选择最优的投资方式获得最大回报最多投资2个理财产品输入&#xff1a; 产品数int; 总投资额int; 总风险int 产品投资…

C#中的dynamic怎么用

在C#中&#xff0c;dynamic 关键字允许您在编译时推迟类型检查&#xff0c;而是将类型检查推迟到运行时。这意味着可以在运行时确定对象的类型&#xff0c;而不是在编译时。这种行为与使用静态类型的编程方式有所不同&#xff0c;因为在编译时会对类型进行检查。 使用 dynamic …